
Ingesting Google Cloud Storage files to BigQuery using Cloud Functions and Serverless Spark

Apache Spark has become a popular platform because it can serve data engineering, data exploration, and machine learning use cases alike. However, Spark still requires the on-premises way of managing clusters and tuning infrastructure for each job. This increases costs, reduces agility, and makes governance harder, preventing enterprises from making insights available to the right users at the right time.

Dataproc Serverless lets you run Spark batch workloads without provisioning and managing your own cluster. You specify the workload parameters and submit the workload to the Dataproc Serverless service; the service runs the workload on managed compute infrastructure, autoscaling resources as needed. Dataproc Serverless charges apply only to the time when the workload is executing.
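As a minimal sketch of what submitting a batch looks like, a standalone PySpark script can be launched with a single gcloud command (the bucket, script name, and region below are placeholders, not part of this solution):

# Minimal sketch: submit a standalone PySpark script as a Serverless batch.
# The bucket, script name, and region are placeholders.
gcloud dataproc batches submit pyspark gs://<<some_bucket>>/wordcount.py \
    --region=us-central1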

You can run the following Spark workload types on the Dataproc Serverless for Spark service:

PySpark

Spark SQL

SparkR

Spark Java/Scala

This post walks you through the process of ingesting files into BigQuery using serverless services: Cloud Functions, Pub/Sub, and Serverless Spark. We use the Daily Shelter Occupancy data in this example.

You can find the complete source code for this solution in our GitHub repository.

Here’s a look at the architecture we’ll be using:

Here’s how to get started with ingesting GCS files to BigQuery using Cloud Functions and Serverless Spark:

1. Create a bucket; this bucket holds the data to be ingested into GCP. Once an object is uploaded to the bucket, a notification is published to a Pub/Sub topic.

PROJECT_ID=<<project_id>>
GCS_BUCKET_NAME=<<bucket_name>>
gsutil mb gs://${GCS_BUCKET_NAME}
gsutil notification create \
    -t projects/${PROJECT_ID}/topics/create_notification_${GCS_BUCKET_NAME} \
    -e OBJECT_FINALIZE \
    -f json gs://${GCS_BUCKET_NAME}
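Optionally, you can confirm that the notification and topic were created (this check is not part of the original steps):

# Optional verification: list the notification config attached to the bucket
gsutil notification list gs://${GCS_BUCKET_NAME}
# Confirm the Pub/Sub topic exists
gcloud pubsub topics describe create_notification_${GCS_BUCKET_NAME}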

2. Build the jar and copy it to a GCS bucket (create a GCS bucket to store the jar if you don't have one).

Follow these steps to create a GCS bucket and copy the JAR to it:

GCS_ARTIFACT_REPO=<<artifact_repo_name>>
gsutil mb gs://${GCS_ARTIFACT_REPO}
cd gcs2bq-spark
mvn clean install
gsutil cp target/GCS2BQWithSpark-1.0-SNAPSHOT.jar gs://${GCS_ARTIFACT_REPO}/
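A quick optional check that the jar landed in the artifact bucket:

# Optional verification: confirm the jar was copied
gsutil ls gs://${GCS_ARTIFACT_REPO}/GCS2BQWithSpark-1.0-SNAPSHOT.jar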

3. Enable the network configuration required to run Serverless Spark.

Open subnet connectivity: The subnet must allow internal subnet communication on all ports. The following gcloud command creates a firewall rule that allows ingress communication on all protocols and all ports when both the source and destination are tagged with “serverless-spark”:

gcloud compute firewall-rules create allow-internal-ingress \
    --network="default" \
    --source-tags="serverless-spark" \
    --target-tags="serverless-spark" \
    --direction="ingress" \
    --action="allow" \
    --rules="all"

Note: The default VPC network in a project, with its default-allow-internal firewall rule (which allows ingress communication on all ports: tcp:0-65535, udp:0-65535, and all ICMP), meets this requirement. However, that rule also allows ingress from any VM instance on the network.

Private Google Access: The subnet must have Private Google Access enabled.

External network access: Drivers and executors have internal IP addresses. You can set up Cloud NAT to allow outbound traffic using internal IPs on your VPC network.
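For reference, both settings can be configured with gcloud. This is a sketch assuming the default network and subnet in us-central1; adjust the subnet name, router name, and region to your environment:

# Enable Private Google Access on the subnet used by Serverless Spark
gcloud compute networks subnets update default \
    --region=us-central1 \
    --enable-private-ip-google-access

# Optional: Cloud NAT for outbound internet access from internal IPs
gcloud compute routers create serverless-spark-router \
    --network=default --region=us-central1
gcloud compute routers nats create serverless-spark-nat \
    --router=serverless-spark-router --region=us-central1 \
    --auto-allocate-nat-external-ips --nat-all-subnet-ip-ranges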

4. Create the GCP resources required by Serverless Spark.

Create a BQ dataset

Create a dataset into which the CSV files will be loaded:

DATASET_NAME=<<dataset_name>>
bq --location=US mk -d \
    --default_table_expiration 3600 \
    ${DATASET_NAME}

Create a BQ table

Create a table using the schema in schema/schema.json:

TABLE_NAME=<<table_name>>
bq mk --table ${PROJECT_ID}:${DATASET_NAME}.${TABLE_NAME} \
    ./schema/schema.json
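Optionally, confirm the table was created with the expected schema:

bq show --schema --format=prettyjson ${PROJECT_ID}:${DATASET_NAME}.${TABLE_NAME}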

Create a service account and grant permissions

Create a service account with the permissions required to read from the GCS bucket and write to the BigQuery table:

SERVICE_ACCOUNT_ID="gcs-to-bq-sa"
gcloud iam service-accounts create ${SERVICE_ACCOUNT_ID} \
    --description="GCS to BQ service account for Serverless Spark" \
    --display-name="GCS2BQ-SA"

roles=("roles/dataproc.worker" "roles/bigquery.dataEditor" "roles/bigquery.jobUser" "roles/storage.objectViewer")
for role in "${roles[@]}"; do
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member="serviceAccount:${SERVICE_ACCOUNT_ID}@${PROJECT_ID}.iam.gserviceaccount.com" \
        --role="$role"
done
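To verify that the role bindings were applied (an optional check):

# List the roles granted to the new service account
gcloud projects get-iam-policy ${PROJECT_ID} \
    --flatten="bindings[].members" \
    --filter="bindings.members:${SERVICE_ACCOUNT_ID}@${PROJECT_ID}.iam.gserviceaccount.com" \
    --format="table(bindings.role)"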

Create a temporary GCS bucket

Create a GCS bucket that Spark uses as a temporary bucket when loading data into BigQuery:

GCS_TEMP_BUCKET=<<temp_bucket>>
gsutil mb gs://${GCS_TEMP_BUCKET}

Create a dead-letter topic and subscription:

ERROR_TOPIC=err_gcs2bq_${GCS_BUCKET_NAME}
gcloud pubsub topics create ${ERROR_TOPIC}
gcloud pubsub subscriptions create err_sub_${GCS_BUCKET_NAME} \
    --topic=${ERROR_TOPIC}

Note: Once all resources are created, update the variable values in trigger-serverless-spark-fxn/main.py (lines 27 to 31). The code for the function is in GitHub.

5. Deploy the Cloud Function.

The Cloud Function is triggered once an object is copied to the bucket. The function then triggers Serverless Spark, which loads the data into BigQuery.

cd trigger-serverless-spark-fxn
gcloud functions deploy trigger-serverless-spark-fxn \
    --entry-point invoke_sreverless_spark \
    --runtime python37 \
    --trigger-resource create_notification_${GCS_BUCKET_NAME} \
    --trigger-event google.pubsub.topic.publish
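Under the hood, the function submits a Dataproc Serverless batch. The equivalent gcloud invocation looks roughly like the sketch below; the main class, region, and application arguments are illustrative placeholders, so see the repo's code for the actual values the Cloud Function passes:

# Illustrative only: submit the GCS-to-BQ Spark jar as a Serverless batch.
# The main class, region, and the argument list are placeholders.
gcloud dataproc batches submit spark \
    --region=us-central1 \
    --jars=gs://${GCS_ARTIFACT_REPO}/GCS2BQWithSpark-1.0-SNAPSHOT.jar \
    --class=com.example.GCS2BQ \
    --subnet=default \
    --service-account=${SERVICE_ACCOUNT_ID}@${PROJECT_ID}.iam.gserviceaccount.com \
    -- gs://${GCS_BUCKET_NAME}/<object_name> ${PROJECT_ID}:${DATASET_NAME}.${TABLE_NAME} ${GCS_TEMP_BUCKET}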

6. Invoke the end-to-end pipeline.

Invoke the end-to-end pipeline by downloading the 2020 Daily Center Data and uploading it to the GCS bucket (GCS_BUCKET_NAME; use the bucket name created in Step 1).
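For example, assuming the downloaded file is named daily-center-data-2020.csv (a placeholder; the actual file name may differ), the upload and a quick row-count check look like this:

# Upload the downloaded CSV to trigger the pipeline
gsutil cp daily-center-data-2020.csv gs://${GCS_BUCKET_NAME}/

# After the batch finishes, spot-check that rows landed in BigQuery
bq query --use_legacy_sql=false \
    "SELECT COUNT(*) AS row_count FROM \`${PROJECT_ID}.${DATASET_NAME}.${TABLE_NAME}\`"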

Debugging Pipelines

Error messages for failed data pipelines are published to the Pub/Sub topic (ERROR_TOPIC) created in Step 4 (Create a dead-letter topic and subscription). Errors from both the Cloud Function and Spark are forwarded to Pub/Sub. The topic might contain multiple entries for the same data-pipeline instance. Messages in the topic can be filtered using the “oid” attribute, which is unique to each pipeline run and holds the full object name with its generation id.
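To inspect failures, you can pull messages (including their attributes, such as “oid”) from the dead-letter subscription and read the function's logs (the subscription name matches the one created in Step 4):

# Pull error messages from the dead-letter subscription
gcloud pubsub subscriptions pull err_sub_${GCS_BUCKET_NAME} --limit=10 --auto-ack

# Check the Cloud Function's logs for trigger-side errors
gcloud functions logs read trigger-serverless-spark-fxn --limit=50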

Conclusion

In this post, we’ve shown you how to ingest GCS files to BigQuery using Cloud Functions and Serverless Spark. Register interest here to request early access to the new solutions for Spark on Google Cloud.

