Saturday, May 11, 2024

Workflows executing other parallel workflows: A practical guide

Introduction

There are numerous scenarios where you might want to execute tasks in parallel. One common use case involves dividing data into batches, processing each batch in parallel, and combining the results at the end. This approach not only speeds up the overall processing but also makes it easier to detect errors in the smaller tasks.

On the other hand, setting up parallel tasks, monitoring them, handling errors in each task, and combining the results in the end is not trivial. Thankfully, Google Cloud’s Workflows can help. In this post, we will explore how you can use a parent workflow to set up and execute parallel child workflows.

Let’s get started!

Setting up the child workflow

To begin, let’s create a child workflow that will serve as the foundation for our parallel execution. 

The child workflow receives arguments from the parent workflow. In our example, we’ll use a simple iteration integer, but in real-world scenarios, it could represent a data chunk passed from the parent workflow.

```yaml
main:
  params: [args]
  steps:
    - init:
        assign:
          - iteration: ${args.iteration}
```

The child workflow starts performing some work. In this example, it simply waits 10 seconds to simulate doing some work.

```yaml
- wait:
    call: sys.sleep
    args:
      seconds: 10
```

Afterwards, it returns the result or failure of the work. In this case, it just uses whether the iteration is even or odd to simulate success and failure:

```yaml
- check_iteration_even_or_odd:
    switch:
      - condition: ${iteration % 2 == 0}
        next: raise_error
- return_message:
    return: ${"Hello world" + iteration}
- raise_error:
    raise: ${"Error with iteration " + iteration}
```
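Assembled, the three snippets above compose into a single child workflow definition. The exact indentation below is my reconstruction of how the steps nest under `main`, following the Workflows YAML syntax:

```yaml
main:
  params: [args]
  steps:
    - init:
        assign:
          - iteration: ${args.iteration}
    - wait:
        call: sys.sleep
        args:
          seconds: 10
    - check_iteration_even_or_odd:
        switch:
          - condition: ${iteration % 2 == 0}
            next: raise_error
    - return_message:
        return: ${"Hello world" + iteration}
    - raise_error:
        raise: ${"Error with iteration " + iteration}
```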

You can see the full definition in the workflow-child.yaml file. Deploy the child workflow:

```shell
gcloud workflows deploy workflow-child --source=workflow-child.yaml
```

Setting up the parent workflow

Now, let’s create the parent workflow, which orchestrates the parallel execution of the child workflows. The parent workflow starts by initializing a map to store the results of successful and failed executions.

```yaml
main:
  steps:
    - init:
        assign:
          - execution_results: {} # results from each execution
          - execution_results.success: {} # successful executions saved under 'success' key
          - execution_results.failure: {} # failed executions saved under 'failure' key
```

Next, the parent workflow employs a parallel for-loop to execute the child workflows with data chunks. In our example, we pass integers from 1 to 4 to simulate data. As each iteration is independent, we parallelize them using the parallel keyword. Note that each iteration runs in its own thread, and the loop does not wait for one iteration to finish before starting the next.

```yaml
- execute_child_workflows:
    parallel:
      shared: [execution_results]
      for:
        value: iteration
        in: [1, 2, 3, 4]
        steps:
          - iterate:
```

Within each iteration, the child workflow is executed with the iteration argument. The parent workflow then waits for the success or failure of the child workflow execution and captures the results/failures in the map.

```yaml
try:
  steps:
    - execute_child_workflow:
        call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
        args:
          workflow_id: workflow-child
          #location: ...
          #project_id: ...
          argument:
            iteration: ${iteration}
        result: execution_result
    - save_successful_execution:
        assign:
          - execution_results.success[string(iteration)]: ${execution_result}
except:
  as: e
  steps:
    - save_failed_execution:
        assign:
          - execution_results.failure[string(iteration)]: ${e}
```

Finally, the parent workflow returns the results/failures map.

```yaml
- return_execution_results:
    return: ${execution_results}
```
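Putting the parent's snippets together, the pieces nest as follows. The key detail, which I am inferring from how the snippets fit, is that the try/except block sits under the `iterate` step inside the parallel for-loop:

```yaml
main:
  steps:
    - init:
        assign:
          - execution_results: {}
          - execution_results.success: {}
          - execution_results.failure: {}
    - execute_child_workflows:
        parallel:
          shared: [execution_results]
          for:
            value: iteration
            in: [1, 2, 3, 4]
            steps:
              - iterate:
                  try:
                    steps:
                      - execute_child_workflow:
                          call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                          args:
                            workflow_id: workflow-child
                            argument:
                              iteration: ${iteration}
                          result: execution_result
                      - save_successful_execution:
                          assign:
                            - execution_results.success[string(iteration)]: ${execution_result}
                  except:
                    as: e
                    steps:
                      - save_failed_execution:
                          assign:
                            - execution_results.failure[string(iteration)]: ${e}
    - return_execution_results:
        return: ${execution_results}
```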

You can see the full definition in the workflow-parent.yaml file. Deploy the parent workflow:

```shell
gcloud workflows deploy workflow-parent --source=workflow-parent.yaml
```

Execute the workflow

With both workflows deployed, it’s time to execute the parent workflow:

```shell
gcloud workflows run workflow-parent
```

As the parent workflow runs, you will observe four parallel executions of the child workflow. Each child workflow represents a different batch of data being processed simultaneously.

Since they all run in parallel, after about 10 seconds you should see that two of them succeeded and two failed.

The parent workflow returns the results of the successful executions along with the errors of the failed ones.
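Illustratively, the returned `execution_results` map has a shape along these lines. The values are my sketch, not the exact output: a real success entry is the full execution response from the connector, and a real failure entry is the caught error map:

```yaml
# Illustrative only; actual field names and values come from the
# Executions API response and the error raised by the child workflow.
success:
  "1":
    state: SUCCEEDED
    result: '"Hello world1"'
  "3":
    state: SUCCEEDED
    result: '"Hello world3"'
failure:
  "2":
    message: "Error with iteration 2"
  "4":
    message: "Error with iteration 4"
```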

At this point, the parent workflow has the option to retry the failed executions or proceed with the successful ones, depending on your requirements.
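The original post does not show a retry step, but as a hedged sketch, one way to retry the failures is to loop over the failed keys and re-run the child workflow. This assumes the `keys()` and `int()` expression helpers and the string keys used above:

```yaml
# Hypothetical retry step, appended after execute_child_workflows.
# Failed iterations were saved under their string keys in
# execution_results.failure, so convert each key back to an integer.
- retry_failed_executions:
    for:
      value: failed_key
      in: ${keys(execution_results.failure)}
      steps:
        - retry_child_workflow:
            try:
              steps:
                - run_again:
                    call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                    args:
                      workflow_id: workflow-child
                      argument:
                        iteration: ${int(failed_key)}
                    result: retry_result
                - record_success:
                    assign:
                      # note: the stale failure entry is left in place here
                      - execution_results.success[failed_key]: ${retry_result}
            except:
              as: e
              steps:
                - record_failure:
                    assign:
                      - execution_results.failure[failed_key]: ${e}
```

Since the child workflow fails deterministically on even iterations, a blind retry like this would fail again in our example; in a real pipeline it would make sense for transient errors.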

Summary

By dividing data into batches and executing them simultaneously, we can enhance overall processing speed and detect failures more easily in each execution. In this post, we explored how to execute workflows in parallel and combine the results using Google Cloud Workflows.

Check out the following video for more information on parallel steps in Workflows:

And as always, feel free to contact me on Twitter @meteatamel for any questions or feedback.

