Wednesday, May 1, 2024

Buffer workflow executions with a Cloud Tasks queue

Introduction

In my previous post, I talked about how you can use a parent workflow to execute child workflows in parallel for faster overall processing time and easier detection of errors. Another useful pattern is to use a Cloud Tasks queue to create Workflows executions, and that's the topic of this post.

When your application experiences a sudden surge of traffic, it's natural to want to handle the increased load by creating a high number of concurrent workflow executions. However, Google Cloud's Workflows enforces quotas to prevent abuse and ensure fair resource allocation. These quotas limit the maximum number of concurrent workflow executions per region, per project. For example, Workflows currently enforces a default maximum of 2,000 concurrent executions. Once this limit is reached, any new executions beyond the quota fail with an HTTP 429 error.

A Cloud Tasks queue can help. Rather than creating Workflow executions directly, you can add Workflows execution tasks to the Cloud Tasks queue and let Cloud Tasks drain the queue at a rate that you define. This allows for better utilization of your workflow quota and ensures the smooth execution of workflows.
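To make the buffering idea concrete, here is a small, self-contained Python sketch. This is not Cloud Tasks itself, just an illustration of the same principle: tasks enter a queue immediately, but a dispatcher releases them no faster than a configured rate.

```python
from collections import deque

def dispatch_times(num_tasks: int, max_dispatches_per_second: float) -> list[float]:
    """Simulate when a rate-limited queue would dispatch each task.

    All tasks are enqueued at t=0; the dispatcher releases at most
    `max_dispatches_per_second` tasks per second.
    """
    queue = deque(range(num_tasks))
    interval = 1.0 / max_dispatches_per_second
    times = []
    t = 0.0
    while queue:
        queue.popleft()  # hand one task to a worker
        times.append(t)
        t += interval
    return times

# 5 tasks at 2 dispatches/second: one task every 0.5 seconds.
print(dispatch_times(5, 2))  # [0.0, 0.5, 1.0, 1.5, 2.0]
```

However bursty the enqueue side is, the dispatch side stays at the configured rate; that is exactly the smoothing effect we want from the queue.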

Let’s dive into how to set this up. 

Create a Cloud Tasks queue

We’ll start by creating a Cloud Tasks queue. The Cloud Tasks queue acts as a buffer between the parent workflow and the child workflows, allowing us to regulate the rate of executions.

Create the Cloud Tasks queue (initially with no dispatch rate limits) with the desired name and location:

QUEUE=queue-workflow-child
LOCATION=us-central1

gcloud tasks queues create $QUEUE --location=$LOCATION

Now that we have our queue in place, let’s proceed to set up the child workflow.

Create and deploy a child workflow

The child workflow performs a specific task and returns a result to the parent workflow.

Create workflow-child.yaml to define the child workflow:

main:
  params: [args]
  steps:
    - init:
        assign:
          - iteration: ${args.iteration}
    - wait:
        call: sys.sleep
        args:
          seconds: 10
    - return_message:
        return: ${"Hello world " + string(iteration)}

In this example, the child workflow receives an iteration argument from the parent workflow, simulates work by waiting for 10 seconds, and returns a string as the result.

Deploy the child workflow:

gcloud workflows deploy workflow-child --source=workflow-child.yaml --location=$LOCATION

Create and deploy a parent workflow

Next, create a parent workflow in workflow-parent.yaml.

The workflow first assigns some constants. Note that it refers to the name of the child workflow and the name of the queue that sits between the parent and child workflows:

main:
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - project_number: ${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}
          - location: ${sys.get_env("GOOGLE_CLOUD_LOCATION")}
          - workflow_child_name: "workflow-child"
          - queue_name: "queue-workflow-child"

In the next step, the workflow creates a large number of tasks (each one an HTTP request that executes the child workflow) and adds them to the Cloud Tasks queue:

    - enqueue_tasks_to_execute_child_workflow:
        for:
          value: iteration
          range: [1, 100]
          steps:
            - iterate:
                assign:
                  - data:
                      iteration: ${iteration}
                  - exec:
                      # Need to wrap into argument for Workflows args.
                      argument: ${json.encode_to_string(data)}
            - create_task_to_execute_child_workflow:
                call: googleapis.cloudtasks.v2.projects.locations.queues.tasks.create
                args:
                  parent: ${"projects/" + project_id + "/locations/" + location + "/queues/" + queue_name}
                  body:
                    task:
                      httpRequest:
                        body: ${base64.encode(json.encode(exec))}
                        url: ${"https://workflowexecutions.googleapis.com/v1/projects/" + project_id + "/locations/" + location + "/workflows/" + workflow_child_name + "/executions"}
                        oauthToken:
                          serviceAccountEmail: ${project_number + "-compute@developer.gserviceaccount.com"}

Note that task creation is a non-blocking call in Workflows. Cloud Tasks takes care of running those tasks to execute child workflows asynchronously.
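If you want to sanity-check what each task carries, the following Python sketch reproduces the body that the parent workflow builds: the execution argument is JSON-encoded into a string, wrapped in an `argument` field, and the whole payload is base64-encoded, since Cloud Tasks expects the `httpRequest` body as base64. The variable names mirror the workflow's `data` and `exec` variables; this is just a local illustration, not code you need to deploy.

```python
import base64
import json

iteration = 1
data = {"iteration": iteration}

# Workflows expects the execution argument as a JSON *string*
# inside the "argument" field, hence the double encoding.
exec_payload = {"argument": json.dumps(data)}

# Cloud Tasks expects httpRequest.body as base64-encoded bytes.
task_body = base64.b64encode(json.dumps(exec_payload).encode("utf-8")).decode("ascii")
print(task_body)
```

Decoding `task_body` gives back `{"argument": "{\"iteration\": 1}"}`, which is exactly the request body the Workflow Executions API expects.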

Deploy the parent workflow:

gcloud workflows deploy workflow-parent --source=workflow-parent.yaml --location=$LOCATION

Execute the parent workflow with no dispatch rate limits

Time to execute the parent workflow:

gcloud workflows run workflow-parent --location=$LOCATION

As the parent workflow runs, you can see parallel executions of the child workflow, all starting at roughly the same time.

In this case, 100 executions is well under the concurrency limit for Workflows. Quota issues may arise if you submit thousands of executions all at once. This is when a Cloud Tasks queue and its rate limits become useful.
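The dispatch rate you choose also determines how many child workflows run at once. As a rough rule of thumb (Little's law), steady-state concurrency ≈ dispatch rate × average execution duration. A quick sketch of the arithmetic, assuming the default 2,000-execution limit and the 10-second child workflow from this post:

```python
def steady_state_concurrency(dispatches_per_second: float, avg_duration_s: float) -> float:
    """Approximate number of concurrent executions once the queue drains steadily."""
    return dispatches_per_second * avg_duration_s

def max_safe_rate(concurrency_limit: int, avg_duration_s: float) -> float:
    """Highest dispatch rate that keeps steady-state concurrency at the limit."""
    return concurrency_limit / avg_duration_s

# With 10-second child workflows:
print(steady_state_concurrency(1, 10))  # 1 dispatch/s -> about 10 concurrent executions
print(max_safe_rate(2000, 10))          # a 2,000 limit allows up to ~200 dispatches/s
```

These are back-of-the-envelope numbers, not guarantees; real execution durations vary, so leave headroom below the quota.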

Execute the parent workflow with dispatch rate limits

Let’s now apply a rate limit to the Cloud Tasks queue. In this case, 1 dispatch per second:

gcloud tasks queues update $QUEUE --max-dispatches-per-second=1 --location=$LOCATION

Execute the parent workflow again:

gcloud workflows run workflow-parent --location=$LOCATION

This time, you see a smoother execution rate of 1 execution request per second.
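One trade-off to keep in mind: rate limiting stretches total processing time. A minimal sketch of the arithmetic, using the numbers from this example:

```python
def drain_seconds(num_tasks: int, max_dispatches_per_second: float) -> float:
    """Time for a rate-limited queue to dispatch all of its tasks."""
    return num_tasks / max_dispatches_per_second

# 100 tasks at 1 dispatch/second: the last child workflow starts
# roughly 100 seconds after the first one.
print(drain_seconds(100, 1))  # 100.0
```

Pick a dispatch rate that balances staying under quota against how long you can tolerate the queue taking to drain.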

Summary

By introducing a Cloud Tasks queue in front of workflow executions and experimenting with different dispatch rate and concurrency settings, you can make better use of your Workflows quota and stay below the limits without triggering unnecessary quota-related failures.

Check out the Buffer HTTP requests with Cloud Tasks codelab, if you want to get more hands-on experience with Cloud Tasks. As always, feel free to contact me on Twitter @meteatamel for any questions or feedback.

Related Article

Workflows executing other parallel workflows: A practical guide

Explore how you can configure Google Cloud Workflows to run parallel tasks by using parent and child workflows.

