Thursday, December 5, 2024

Working with Incoming Data in Cloud Workflows

In this post I look at how we can use Google Workflows to orchestrate tasks and processes, and specifically at how to process data within a Workflow. This is useful when we need to parse incoming data for information that one or more later steps in the Workflow depend on.

Google Workflows orchestrates hosted or custom services, such as Cloud Functions, Cloud Run, BigQuery or any HTTP-based API, in a defined order of steps.

Workflows are therefore useful when we want to control the order in which steps execute.

Workflows are also useful when we want to pass information such as event data, error codes or response bodies on to other orchestrated services.

But how do we capture and examine the incoming data to the Workflow and use it in the Workflow?

Setting the scene

Let’s consider a simple online flower store. In this scenario, our online store accepts orders for flowers – name, productID, quantity – and generates an orderID.

To process each order successfully, we want to capture the orderID of each flower order and pass that value on to any other services in our online store's process.

The sample order data we want to process is a simple single-element array:

```json
{"flowers": [
  {"name": "roses", "productID": "rr001", "quantity": 25, "orderID": "001233"}
]}
```
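Before wiring anything up, it can help to check the payload locally. The following Python sketch, using only the standard library, parses the sample order and reads the first orderID, which is the same path the Workflow reads later. It also shows that a trailing comma after the last array element makes the payload invalid for strict JSON parsers such as Python's `json` module. The helper names `first_order_id` and `is_valid_json` are hypothetical, chosen for illustration.

```python
import json

# The sample order from above, as a JSON string (no trailing comma).
SAMPLE_ORDER = (
    '{"flowers": ['
    ' {"name": "roses", "productID": "rr001", "quantity": 25, "orderID": "001233"}'
    ']}'
)


def first_order_id(raw: str) -> str:
    """Parse an order payload and return the orderID of its first item."""
    order = json.loads(raw)
    return order["flowers"][0]["orderID"]


def is_valid_json(raw: str) -> bool:
    """Return True if raw parses as strict JSON, False otherwise."""
    try:
        json.loads(raw)
    except json.JSONDecodeError:
        return False
    return True


if __name__ == "__main__":
    print(first_order_id(SAMPLE_ORDER))  # prints 001233
    # A trailing comma after the last array element is rejected.
    print(is_valid_json(SAMPLE_ORDER.replace("}]}", "},]}")))  # prints False
```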

To achieve this using Google Workflows we have set up the following architecture:

First, we publish our flower order to a Pub/Sub topic. Publishing to this topic then triggers the Workflow through an Eventarc trigger.

The Workflow then processes the input data in the Pub/Sub message, that is, our flower order information.

The extracted data can then be passed as input to other services orchestrated by the Workflow, for example a Cloud Function or a Cloud Run service.

Let’s set this up by first creating the Pub/Sub topic, then the Workflow and finally the Eventarc trigger to link them together.

Create the Pub/Sub Topic

First, let's enable the relevant APIs in Cloud Shell:

```shell
gcloud services enable eventarc.googleapis.com pubsub.googleapis.com workflows.googleapis.com workflowexecutions.googleapis.com run.googleapis.com
```

Create the Pub/Sub topic:

```shell
gcloud pubsub topics create new_pubsub
```

where new_pubsub is the name of your Pub/Sub topic.

Building the Workflow

To create a Workflow, we need a configuration file. This is where all the logic for the Workflow steps and data processing we require is written.

Create a file called workflow.yaml and paste in the following content. Save the file.

```yaml
main:
  params: [event]
  steps:
    - init_vars:
        assign:
          - project: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - inbound_message: ""
          - full_msg: ""
          - input: ""
    - decode_pubsub_message: # decode the incoming message
        assign:
          - inbound_message: ${base64.decode(event.data.message.data)}
          - full_msg: ${json.decode(text.decode(inbound_message))}
          - input: ${full_msg.flowers[0].orderID}
    - finish_workflow: # end of the Workflow; return the declared variables
        return:
          - ${full_msg}
          - ${input}
```

Understanding the Workflow syntax

The Workflows service receives incoming events and converts them to a JSON object that follows the CloudEvents specification. This object is passed to the Workflow execution as a runtime argument and is then available to the Workflow's steps.
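To make the shape of that runtime argument concrete, here is a minimal Python sketch that builds a dictionary resembling the CloudEvent delivered for a Pub/Sub messagePublished event. Only the fields this post relies on are modelled; the `source`, `id` and `messageId` values are hypothetical placeholders, the function name `make_pubsub_event` is illustrative, and real events carry additional attributes. The detail that matters for the Workflow is that the published payload arrives base64-encoded in `data.message.data`.

```python
import base64
import json


def make_pubsub_event(payload: dict) -> dict:
    """Wrap a JSON payload roughly the way a Pub/Sub CloudEvent carries it.

    Assumption: a simplified model; placeholder values are hypothetical and
    only the fields used by the Workflow in this post are included.
    """
    body = json.dumps(payload).encode("utf-8")
    encoded = base64.b64encode(body).decode("ascii")
    return {
        "specversion": "1.0",
        "type": "google.cloud.pubsub.topic.v1.messagePublished",
        # Hypothetical project name in the source attribute.
        "source": "//pubsub.googleapis.com/projects/my-project/topics/new_pubsub",
        "id": "example-event-id",  # hypothetical
        "data": {
            "message": {
                "data": encoded,  # the published message body, base64-encoded
                "messageId": "example-message-id",  # hypothetical
            }
        },
    }


if __name__ == "__main__":
    order = {"flowers": [{"name": "roses", "productID": "rr001",
                          "quantity": 25, "orderID": "001233"}]}
    event = make_pubsub_event(order)
    print(event["data"]["message"]["data"])
```

In the Workflow definition, this whole object is what the `event` parameter names, so `event.data.message.data` refers to the base64 string shown here.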

Note the following key steps in our Workflow:

params – runtime arguments passed to the Workflow as part of the execution request. This names the variable that stores the data, in this case ‘event’.

init_vars – declaring and initializing the variables used by later steps

decode_pubsub_message – decoding and parsing the incoming event data. This is the key step for our use case and is described below.

finish_workflow – returning the Workflow's output

The data processing logic

This logic is built into the decode_pubsub_message step in a few stages. Let's go through each one:

First, the ‘inbound_message’ variable is assigned the decoded content of the Pub/Sub message, which arrives base64-encoded in the event's data.message.data field: ${base64.decode(event.data.message.data)}

The JSON format of the event is documented in the CloudEvents JSON Event Format document.

Next, the ‘full_msg’ variable is assigned the parsed message: text.decode converts the bytes in ‘inbound_message’ to a string, and json.decode parses that string as JSON: ${json.decode(text.decode(inbound_message))}

Lastly, the ‘input’ variable is assigned the orderID value that we want in the Workflow. We take the first element of the flowers array in the parsed message (flowers[0]) and read its orderID field: ${full_msg.flowers[0].orderID}
This value can then be used as input to later steps in the Workflow. Here, we simply return it as the Workflow's output (${input}).
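The three assignments in decode_pubsub_message map closely onto Python standard-library calls, which makes it easy to try the logic locally. The sketch below illustrates the same steps outside Workflows; it is not how the Workflows runtime implements them. `base64.b64decode` stands in for `base64.decode`, `bytes.decode` for `text.decode`, and `json.loads` for `json.decode`. The function name simply mirrors the step name, the hand-built `event` only contains the field the step reads, and `order_id` replaces `input` to avoid shadowing Python's built-in.

```python
import base64
import json


def decode_pubsub_message(event: dict) -> list:
    """Mirror the Workflow's decode_pubsub_message and finish_workflow steps."""
    # inbound_message: ${base64.decode(event.data.message.data)} -> raw bytes
    inbound_message = base64.b64decode(event["data"]["message"]["data"])
    # full_msg: ${json.decode(text.decode(inbound_message))} -> parsed object
    full_msg = json.loads(inbound_message.decode("utf-8"))
    # input: ${full_msg.flowers[0].orderID} -> orderID of the first item
    order_id = full_msg["flowers"][0]["orderID"]
    # finish_workflow returns both values as a list
    return [full_msg, order_id]


if __name__ == "__main__":
    raw = ('{"flowers": [{"name": "roses", "productID": "rr001", '
           '"quantity": 25, "orderID": "001233"}]}')
    encoded = base64.b64encode(raw.encode("utf-8")).decode("ascii")
    event = {"data": {"message": {"data": encoded}}}
    full_msg, order_id = decode_pubsub_message(event)
    print(order_id)  # prints 001233
```

Indexing with flowers[0] assumes the array is non-empty: an empty order raises an IndexError here, and the Workflow step would likewise fail, so a production version might validate the payload before reading from it.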

So, let's create the Workflow.

Create the Workflow:

```shell
gcloud workflows deploy new-workflow --source=workflow.yaml --location=europe-west1
```

where new-workflow is the Workflow name. We deploy it to europe-west1, the same region as the Eventarc trigger created below, because the trigger looks for its destination Workflow in the trigger's own region by default.

Now that we have looked at the required logic and built the core of our simple architecture, let's see it in action.

Putting it all together

Create a service account that the Eventarc trigger will use to invoke our Workflow:

```shell
gcloud iam service-accounts create workflow-sa --description="workflow service account" --display-name="workflow-sa"
```

Grant the relevant roles to the service account. The add-iam-policy-binding command binds one role per call, so we loop over the roles:

```shell
for role in roles/workflows.invoker roles/pubsub.publisher roles/logging.logWriter roles/eventarc.admin; do
  gcloud projects add-iam-policy-binding "${PROJECT_ID}" \
    --member="serviceAccount:workflow-sa@${PROJECT_ID}.iam.gserviceaccount.com" \
    --role="${role}"
done
```

where ${PROJECT_ID} is the ID of your GCP project

Next, create an Eventarc trigger that starts a Workflow execution whenever we publish a message to the Pub/Sub topic.

Create the Eventarc trigger:

```shell
gcloud eventarc triggers create events-pubsub-trigger \
  --destination-workflow=new-workflow \
  --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
  --service-account="workflow-sa@${PROJECT_ID}.iam.gserviceaccount.com" \
  --transport-topic=new_pubsub \
  --location=europe-west1
```

where ${PROJECT_ID} is the ID of your GCP project

Now that we have everything we need, let's run it.

Publish a message to the Pub/Sub topic:

```shell
gcloud pubsub topics publish new_pubsub --message='{"flowers":[{"name":"roses", "productID":"rr001", "quantity":25, "orderID": "001233"}]}'
```

From the console, open Workflows / new-workflow / Executions. Here we can see that the Workflow executed successfully.

We can also see the output of the latest execution by clicking its execution ID link.

Note the output on the right-hand side of the panel, showing our full flower order and, separately, its orderID (001233).
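If you want to check the output programmatically rather than in the console, note that a Workflows execution reports its result as a JSON-serialized string. Assuming the two-element list returned by finish_workflow, the small Python sketch below splits that string back into the full order and the orderID. The `result_text` value is a hand-written stand-in for such a string, not captured output, and `split_execution_result` is a hypothetical helper name.

```python
import json


def split_execution_result(result_text: str) -> tuple:
    """Split the [full_msg, input] list returned by finish_workflow.

    Assumption: result_text is the JSON string an execution reports as its
    result, and the Workflow returns exactly two values in that order.
    """
    full_msg, order_id = json.loads(result_text)
    return full_msg, order_id


if __name__ == "__main__":
    # Hand-written stand-in for an execution result, not real output.
    result_text = (
        '[{"flowers": [{"name": "roses", "productID": "rr001", '
        '"quantity": 25, "orderID": "001233"}]}, "001233"]'
    )
    order, order_id = split_execution_result(result_text)
    print(order_id, len(order["flowers"]))  # prints 001233 1
```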

So, to wrap up, we have passed input data to a Workflow, looked at the logic required to extract the event data within the Workflow, and finally returned it as output. Because data in a Workflow can be shared across all of its steps, Workflows can orchestrate complex solutions with end-to-end observability.

Learn more

Interested in finding out more about Workflows? Check out the Google Codelab Triggering Workflows with Eventarc. There are also lots of Workflow code examples on the All Workflows code samples page. Finally, a comprehensive tutorial on building event-driven systems with Workflows and Eventarc is available on YouTube: Build an event-driven orchestration with Eventarc and Workflows.


Source: Cloud Blog
