Optimizing Airflow: Efficient resource clean-up techniques and code

Apache Airflow

Airflow is an open-source platform used to programmatically author, schedule and monitor workflows. It’s known for its rich feature set, extensibility, and active community.

Despite these advantages, handling resources effectively in Airflow can sometimes be a challenge. Cleaning up and maintaining a healthy Airflow environment is crucial to prevent bottlenecks and ensure the smooth execution of tasks. This article presents efficient methods to clean up resources in Apache Airflow, complete with sample code for easy implementation.

1. DAG Runs and Task Instances Clean-up

Over time, as the number of DAG runs and task instances increases, it might be necessary to clean them up to free up resources and maintain a tidy database. You can automate this process by creating a maintenance DAG that cleans up these older entries.

Here is an example of how you can use a PythonOperator to remove old DAG runs:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import DagRun
from airflow import settings
from datetime import datetime, timedelta

def cleanup_dagruns():
    MAX_AGE = timedelta(days=30)
    now = datetime.now()
    session = settings.Session()  # open a session against the Airflow metadata database
    try:
        # delete all DAG runs older than MAX_AGE
        session.query(DagRun).filter(
            DagRun.execution_date <= (now - MAX_AGE)
        ).delete(synchronize_session=False)
        session.commit()
    finally:
        session.close()

dag = DAG('clean_up', schedule_interval="@daily", start_date=datetime(2023, 1, 1))

clean_up = PythonOperator(
    task_id='clean_up',
    python_callable=cleanup_dagruns,
    dag=dag)

In this code, we define a maintenance DAG that runs every day. It calls a Python function, cleanup_dagruns, which deletes all DAG runs older than 30 days.
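The section heading also mentions task instances, which accumulate in the metadata database alongside DAG runs. The sketch below shows one way to purge old TaskInstance rows with the same pattern; it reuses the imports, session handling, and dag object from the block above, filters on TaskInstance.start_date (column names differ slightly across Airflow versions), and the 30-day window and task_id are illustrative assumptions.

from airflow.models import TaskInstance

def cleanup_task_instances():
    MAX_AGE = timedelta(days=30)  # assumed retention window, adjust as needed
    now = datetime.now()
    session = settings.Session()
    try:
        # delete task instance rows that started more than MAX_AGE ago
        session.query(TaskInstance).filter(
            TaskInstance.start_date <= (now - MAX_AGE)
        ).delete(synchronize_session=False)
        session.commit()
    finally:
        session.close()

clean_up_task_instances = PythonOperator(
    task_id='clean_up_task_instances',
    python_callable=cleanup_task_instances,
    dag=dag)

On recent Airflow versions, deleting a DagRun may already cascade to its task instances, so treat this as a complement to the DAG run clean-up rather than a requirement.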

2. Log Files Clean-up

Airflow creates log files for all the tasks it runs. Over time, these log files can consume a significant amount of disk space. To address this, you can configure a maintenance DAG to clean up old log files.

import os
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

BASE_LOG_FOLDER = "/path/to/logs"

def cleanup_logs():
    MAX_AGE = timedelta(days=30)
    now = datetime.now()
    # walk the log tree and delete any file not modified within MAX_AGE
    for dirpath, dirnames, filenames in os.walk(BASE_LOG_FOLDER):
        for file in filenames:
            file_path = os.path.join(dirpath, file)
            if os.path.getmtime(file_path) <= (now - MAX_AGE).timestamp():
                os.remove(file_path)

dag = DAG('clean_up_logs', schedule_interval="@daily", start_date=datetime(2023, 1, 1))

clean_up_logs = PythonOperator(
    task_id='clean_up_logs',
    python_callable=cleanup_logs,
    dag=dag)

This code walks through the log directory and removes any file that hasn’t been modified in the last 30 days.
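Deleting files can leave behind empty per-DAG and per-task directories under the log folder. If you also want to prune those, a minimal sketch (not part of the code above; the task name and ordering are assumptions) is a second, bottom-up pass that runs after the file clean-up task:

def cleanup_empty_log_dirs():
    # walk bottom-up so child directories are checked before their parents
    for dirpath, dirnames, filenames in os.walk(BASE_LOG_FOLDER, topdown=False):
        if dirpath == BASE_LOG_FOLDER:
            continue  # never remove the root log folder itself
        if not os.listdir(dirpath):
            os.rmdir(dirpath)  # only succeeds on empty directories

clean_up_empty_dirs = PythonOperator(
    task_id='clean_up_empty_dirs',
    python_callable=cleanup_empty_log_dirs,
    dag=dag)

clean_up_logs >> clean_up_empty_dirs  # prune directories only after files are removed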

3. Cleaning up Zombie Tasks

Sometimes, tasks hang around without making any progress, wasting resources. To detect and clean up such “zombie tasks”, you can set a timeout on your tasks: if a task doesn’t complete within the given time, Airflow terminates it.

Here is how you can set up a timeout for your tasks:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

dag = DAG('handle_zombies', start_date=datetime(2023, 1, 1))

task_with_timeout = DummyOperator(
    task_id='task_with_timeout',
    dag=dag,
    execution_timeout=timedelta(minutes=30))  # task is killed after 30 minutes

This code adds an execution_timeout parameter to your tasks. If a task doesn’t complete within the given time, it will be killed, freeing up the resources it was using.
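execution_timeout bounds a single task. A related, complementary setting is dagrun_timeout, which bounds an entire DAG run: once a run has been active longer than the given duration, Airflow marks it as failed. A minimal sketch, where the two-hour value is an illustrative assumption:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

# dagrun_timeout bounds the whole run; execution_timeout bounds a single task
dag = DAG(
    'handle_zombies_dag_level',
    start_date=datetime(2023, 1, 1),
    dagrun_timeout=timedelta(hours=2))  # the run is marked failed after 2 hours

task_with_timeout = DummyOperator(
    task_id='task_with_timeout',
    dag=dag,
    execution_timeout=timedelta(minutes=30))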

Cleaning up resources in Apache Airflow is vital to ensure optimal performance. By taking care of old DAG runs, cleaning up log files, and handling zombie tasks, you can maintain a healthy and efficient Airflow environment.
