Conditional triggering of Airflow DAG runs based on previous run status : Hold the DAG until the previous failure is fixed

Apache Airflow

This Apache Airflow tutorial demonstrates how to implement conditional triggering of DAG runs based on the status of the previous run. In this setup, the DAG is scheduled to run at regular intervals (e.g., every 5 minutes), but it will only trigger the next run if the previous run succeeded. If the previous run failed, it will skip the next run until the issue is resolved. This approach helps ensure that the DAG continues to run only when the previous execution was successful, preventing unnecessary runs in case of failures.

In Apache Airflow, you can control the scheduling and execution of DAGs using the TriggerDagRunOperator. To achieve the behavior you described, where a DAG should only be triggered again after it has been fixed, you can implement a mechanism to check for the previous run’s status and conditionally trigger the next run only if the previous run succeeded. Here’s how you can do it:

  1. Create a custom operator or Python callable that checks the status of the previous DAG run.
  2. In your DAG definition, use this custom operator or callable to check the status of the previous run.
  3. If the previous run succeeded, trigger the new run using the TriggerDagRunOperator. If it failed, do not trigger the new run.

Here’s an example of how you can structure your DAG to achieve this:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

def check_previous_run_status(context):
    # Use context to access the previous run's status
    prev_run_status = context.get_previous_dagrun().get_state()
    
    if prev_run_status == "success":
        return "trigger_next_dag_run"
    else:
        return "skip_next_dag_run"

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'schedule_interval': '*/5 * * * *',  # Run every 5 minutes
}

dag = DAG(
    'my_dag',
    default_args=default_args,
    catchup=False,
    max_active_runs=1,  # Ensure only one instance is running at a time
    is_paused_upon_creation=False,
)

start = DummyOperator(task_id='start', dag=dag)

check_run_status = PythonOperator(
    task_id='check_previous_run_status',
    python_callable=check_previous_run_status,
    provide_context=True,
    dag=dag,
)

trigger_next_run = TriggerDagRunOperator(
    task_id='trigger_next_run',
    trigger_dag_id='my_dag',  # This triggers the same DAG
    dag=dag,
    allowed_states=["trigger_next_dag_run"],
)

skip_next_run = DummyOperator(
    task_id='skip_next_run',
    dag=dag,
)

start >> check_run_status >> [trigger_next_run, skip_next_run]

 

The check_previous_run_status PythonOperator checks the status of the previous DAG run.

Depending on the previous run’s status, it either triggers the next run using the TriggerDagRunOperator or skips the next run using a DummyOperator.

The DAG runs every 5 minutes, but it will only trigger the next run if the previous run succeeded. If the previous run failed, it will skip the next run until the issue is resolved.

Read more on Airflow here :

Author: user