
Reduce Airflow DAG parse times in Cloud Composer

This comprehensive blog presents various approaches for monitoring, troubleshooting, and minimizing DAG parse times, leading to notable performance improvements in Cloud Composer / Airflow:

Increase environment scalability by efficiently handling larger workloads and accommodating more DAGs.

Improve environment stability by limiting the chance of task overlaps and resource contention. 

Enhance productivity and overall efficiency for developers through faster feedback loops and reduced processing time.

A low DAG parse time serves as a reliable indicator of a healthy Cloud Composer / Airflow environment.

Getting started

What is an Airflow DAG?

An Airflow DAG (Directed Acyclic Graph) is a collection of tasks that are organized in a way that reflects their relationships and dependencies. DAGs are defined in Python scripts, and they are the core concept of Airflow.

A DAG defines four things:

The tasks that need to be run

The order in which the tasks need to be run

The dependencies between the tasks

The schedule for running the tasks

DAGs are a powerful way to define and manage complex workflows. They can be used to automate tasks, schedule tasks, and monitor the execution of tasks.
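
To make this concrete, here is a minimal sketch of a DAG (the DAG ID, task IDs, and schedule are illustrative) that defines two tasks, the order and dependency between them, and a daily schedule:

    import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    # A minimal DAG: two tasks, a dependency between them, and a daily schedule.
    with DAG(
        dag_id="example_minimal_dag",
        start_date=datetime.datetime(2024, 1, 1),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        extract = BashOperator(task_id="extract", bash_command="echo extract")
        load = BashOperator(task_id="load", bash_command="echo load")

        # load runs only after extract completes successfully
        extract >> load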

What is the Airflow Scheduler?

The Airflow Scheduler monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete. Once every 30 seconds by default, the Scheduler collects DAG parsing results and checks whether any active tasks can be triggered.

What is the DAG Processor? 

As of Airflow 2.3.0, the DAG Processor is separate from the Airflow Scheduler. For more information about this change, check out AIP-43 DAG Processor separation.

Monitoring and alerting

Monitoring DAG parse times

In the Google Cloud console, you can use the Monitoring page and the Logs tab to inspect DAG parse times.

On Cloud Composer environment

Run the following command to check DAG parse times on a Cloud Composer environment:

    gcloud composer environments run $ENVIRONMENT_NAME \
        --location $LOCATION \
        dags report

Locally, using the time command

    time python airflow/example_dags/example_python_operator.py

Make sure to run it several times in succession to account for caching effects. Compare the results before and after the optimization (under the same conditions: the same machine, environment, and so on) in order to assess the impact of any optimization.

Sample output:

    real    0m0.699s
    user    0m0.590s
    sys     0m0.108s

The important metric is the "real time", which tells you how long it took to process the DAG. Note that when loading the file this way, you are starting a new interpreter, so there is an initial loading time that is not present when Airflow parses the DAG. You can assess the time of initialization by running:

    time python -c ''

Result:

    real    0m0.073s
    user    0m0.037s
    sys     0m0.039s

In this case the initial interpreter startup time is ~0.07 s, which is about 10% of the time needed to parse example_python_operator.py above, so the actual parsing time is about 0.62 s for the example DAG.

What is an ideal parse time metric?

On the Monitoring dashboard, in the DAG Statistics section, observe graphs for the total DAG parse time. If the number exceeds about 10 seconds, your Schedulers might be overloaded with DAG parsing and cannot run DAGs effectively.

How can I receive alerts for long parse times?

You can create alerting policies to monitor the values of metrics and to notify you when those metrics violate a condition. This can also be done through the Composer Monitoring Dashboard.

DAG code optimization

Generalized DAG code improvements

Check out Optimize Cloud Composer via Better Airflow DAGs to view a generalized checklist of activities when authoring Apache Airflow DAGs. These items follow best practices determined by Google Cloud and the open source community. A collection of performant DAGs enables Cloud Composer to work optimally, and standardized authoring helps developers manage hundreds or thousands of DAGs. Each item will benefit your Cloud Composer environment and your development process. The two highest priorities should be limiting top-level code and avoiding the use of Variables/XComs in top-level code.

Limit top-level code

Follow established best practices. Avoid writing top-level code that is not necessary to create Operators and build DAG relations between them. This matters because of the design decisions behind the Airflow Scheduler, and because the speed at which top-level code is parsed affects both the performance and the scalability of Airflow.

One important factor impacting DAG loading time that Python developers might overlook is that top-level imports can take a surprisingly long time (on the order of seconds) and generate a lot of overhead. This can easily be avoided by converting them to local imports inside Python callables, as shown in the sketch below.
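
As a minimal sketch (the pandas import is just an illustrative stand-in for any expensive dependency), the heavy import is kept inside the callable so it only runs at task execution time:

    import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    # Expensive imports (e.g. pandas, numpy, SDK clients) stay out of the top
    # level so the Scheduler does not pay their cost on every parse.

    def transform(**context):
        import pandas as pd  # local import: only runs when the task executes

        return pd.DataFrame({"value": [1, 2, 3]})["value"].sum()

    with DAG(
        dag_id="local_imports_example",
        start_date=datetime.datetime(2024, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        PythonOperator(task_id="transform", python_callable=transform)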

Avoid the use of Variables and Xcoms in top-level code

If you are using Variable.get() in top-level code, then every time the .py file is parsed, Airflow executes a Variable.get(), which opens a session to the database. This can dramatically slow down parse times.

If you absolutely must read values in top-level code, use JSON dictionaries or Jinja templates as values, so that a single connection retrieves many values from one dict (see the sketch below).
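
A minimal sketch (the Variable name my_config and its bucket key are hypothetical) of reading a Variable at task runtime through a Jinja template, rather than at the top level:

    import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    # Avoid at the top level: this would open a database session on every parse.
    # config = Variable.get("my_config", deserialize_json=True)

    with DAG(
        dag_id="variables_example",
        start_date=datetime.datetime(2024, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        # The Jinja template is resolved at task runtime, not at parse time,
        # and a JSON Variable lets one lookup return many values.
        BashOperator(
            task_id="print_bucket",
            bash_command="echo {{ var.json.my_config.bucket }}",
        )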

DAG folder cleanup

Remove unused DAGs and unnecessary files from the DAGs folder.

The Airflow Scheduler wastes time and resources parsing files in the DAGs folder that aren't used.

Use .airflowignore

 An .airflowignore file specifies the directories or files in DAG_FOLDER or PLUGINS_FOLDER that Airflow should intentionally ignore. Airflow supports two syntax flavors for patterns in the file, as specified by the DAG_IGNORE_FILE_SYNTAX configuration parameter (added in Airflow 2.3): regexp and glob.

More files ignored = fewer files parsed by the Airflow Scheduler.
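
For example, assuming DAG_IGNORE_FILE_SYNTAX is set to glob, an .airflowignore file in the DAGs folder might look like the following (the directory and file names are hypothetical):

    # Helper code and scaffolding that contain no DAG definitions
    common/
    scripts/
    *_test.py
    README.md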

Review paused DAGs

Paused DAGs are still continuously parsed by the Airflow Scheduler.  Determine why each DAG is paused and whether it should be removed, ignored, or unpaused.

Airflow configurations

min_file_process_interval

The Scheduler parses your DAG files every min_file_process_interval number of seconds. Airflow starts using your updated DAG code only after this interval ends.

Consider increasing this interval when you have a high number of DAGs that do not change too often, or observe a high Scheduler load in general. Consider decreasing this interval to parse your DAGs faster. Updates to DAGs are reflected after this interval. Keeping this number low will increase CPU usage.

For example, if you have more than 1,000 DAG files, raise min_file_process_interval to 600 (10 minutes), 6000 (100 minutes), or a higher value.
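
As a sketch, one way to apply this on Cloud Composer (the environment name and location are placeholders) is to override the Airflow configuration option:

    gcloud composer environments update $ENVIRONMENT_NAME \
        --location $LOCATION \
        --update-airflow-configs=scheduler-min_file_process_interval=600

The same --update-airflow-configs approach works for the other [scheduler] options discussed below.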

dag_dir_list_interval

dag_dir_list_interval determines how often (in seconds) Airflow scans the DAGs directory. A lower value means that new DAGs are processed faster, but this comes at the cost of CPU usage.

Increasing the DAG directory listing interval reduces the Scheduler load associated with discovery of new DAGs in the environment’s bucket. Consider increasing this interval if you deploy new DAGs infrequently. Consider decreasing this interval if you want Airflow to react faster to newly deployed DAG files.

parsing_processes

The DAG Processor can run multiple processes in parallel to parse DAGs, and parsing_processes (formerly max_threads) determines how many of those processes can run in parallel. Increasing this value can help parse and serialize DAGs faster if you have a large number of them. By default, this is set to 2.

    [scheduler]
    parsing_processes = <NUMBER_OF_CORES_IN_MACHINE - 1>

file_parsing_sort_mode 

Evaluate the following file_parsing_sort_mode options if you are running more than one Airflow Scheduler. The Scheduler lists and sorts the DAG files to decide the parsing order.

modified_time: Sort by modified time of the files. This is useful on a large scale to parse the recently modified DAGs first. (default)

random_seeded_by_host: Sort randomly across multiple Schedulers but with the same order on the same host. This is useful when running with Scheduler in HA mode where each Scheduler can parse different DAG files.

alphabetical: Sort by filename.

When there are a lot (>1000) of DAG files, you can prioritize parsing of new files by setting file_parsing_sort_mode to modified_time.
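
For example, the corresponding setting in airflow.cfg (or the equivalent Cloud Composer configuration override) would look like:

    [scheduler]
    file_parsing_sort_mode = modified_time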

Cloud Composer upgrades

If you’ve gotten this far and still observe long DAG parse times, you’ll need to consider adding more resources to your Cloud Composer Environment. Note: this will add to the overall cost of your Cloud Composer environment.

Change/Increase the number of Airflow Schedulers

Adjusting the number of Schedulers improves the Scheduler capacity and resilience of Airflow scheduling. Caution: Don’t configure more than three Airflow Schedulers in your Cloud Composer environment without special consideration.

If you increase the number of Schedulers, this increases the traffic to and from the Airflow database. We recommend using two Airflow Schedulers in most scenarios.
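
As a sketch for Cloud Composer 2 (the environment name and location are placeholders), the Scheduler count can be adjusted with:

    gcloud composer environments update $ENVIRONMENT_NAME \
        --location $LOCATION \
        --scheduler-count=2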

Increase CPU/Memory of Airflow Schedulers

You can specify the amount of CPU, memory, and disk space used by your environment. In this way, you can increase the performance of your environment, in addition to the horizontal scaling provided by using multiple workers and Schedulers.
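
For example, in Cloud Composer 2 (the environment name, location, and resource values are placeholders), Scheduler resources can be adjusted with:

    gcloud composer environments update $ENVIRONMENT_NAME \
        --location $LOCATION \
        --scheduler-cpu=2 \
        --scheduler-memory=7.5GB \
        --scheduler-storage=5GB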

Conclusion

By following these steps, you can maximize the benefits of Cloud Composer / Airflow, enhance the performance of your environment, and create a smoother development experience.
