Dynamic custom arguments in Airflow: A step-by-step guide

With the flexibility Airflow offers, users can incorporate custom parameters into their DAGs to make them more dynamic and adaptable to varying requirements. This article walks you through the process of adding custom arguments to your DAG.

Setting up the Environment

Before diving in, make sure Airflow is installed and running. If it is not, follow the official Airflow installation guide first.

Begin with a Basic DAG

Let’s start by defining a simple DAG named freshers_viewership.

from datetime import datetime

from airflow import DAG

# Arguments applied to every task in the DAG unless a task overrides them
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}

dag = DAG('freshers_viewership',
          default_args=default_args,
          description='A simple viewership DAG',
          # On Airflow 2.4+, `schedule` is the preferred name for this argument
          schedule_interval='@daily',
          start_date=datetime(2023, 8, 27),
          catchup=False)

Dynamically Add Custom Arguments

To add custom arguments to our DAG dynamically, we can read them from an Airflow Variable inside a small Python function and merge the result into default_args.

from airflow.models import Variable


def fetch_custom_args():
    # Read custom args from an Airflow Variable; fall back to an empty dict if it is not set
    return Variable.get(
        "freshers_viewership_custom_args",
        deserialize_json=True,
        default_var={},
    )


# Merge custom args over the defaults (custom values win on duplicate keys)
args_with_custom = {**default_args, **fetch_custom_args()}

# Update the DAG's default_args before any tasks are defined
dag.default_args = args_with_custom

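This step fetches the custom arguments stored in Airflow’s metadata database under the Variable freshers_viewership_custom_args and merges them over default_args, so a key present in both takes the custom value. If the Variable does not exist, the empty-dictionary fallback leaves the defaults unchanged. Because this code runs at module level, the Variable is read every time Airflow parses the DAG file.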
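
To make the merge precedence concrete, here is a minimal sketch in plain Python. The custom_args dictionary is a hard-coded stand-in for the value the Variable would return (an assumption, so the snippet runs without Airflow); the values are illustrative only.

```python
# Defaults as defined in the DAG file.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
}

# Stand-in for the deserialized Variable value (hypothetical values).
custom_args = {
    "email": ["your-email@example.com"],
    "email_on_failure": True,
}

# Later dictionaries win on duplicate keys, so custom values override defaults.
args_with_custom = {**default_args, **custom_args}

print(args_with_custom["email_on_failure"])  # True (overridden by the custom value)
print(args_with_custom["retries"])           # 1 (default kept, no override supplied)
```

The unpacking builds a new dictionary, so default_args itself is left untouched and can still be inspected or reused.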

Store Custom Arguments in Airflow Variables

To use the above setup:

Open the Airflow web UI.
Navigate to the Admin tab and select Variables.
Add a new variable with the key freshers_viewership_custom_args and the desired JSON dictionary as its value. For instance:

{
  "email": ["your-email@example.com"],
  "email_on_failure": true
}
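
Since the value is deserialized as JSON and then merged as a dictionary, it can help to check the text before saving it. The sketch below is one possible check in plain Python; parse_custom_args is a hypothetical helper for illustration, not part of Airflow.

```python
import json


def parse_custom_args(raw_value: str) -> dict:
    """Parse a Variable value and make sure it is a JSON object (hypothetical helper)."""
    parsed = json.loads(raw_value)  # raises json.JSONDecodeError on invalid JSON
    if not isinstance(parsed, dict):
        # A list or a bare string is valid JSON but cannot be merged into default_args.
        raise ValueError("custom args must be a JSON object")
    return parsed


raw = '{"email": ["your-email@example.com"], "email_on_failure": true}'
custom_args = parse_custom_args(raw)
print(custom_args)
```

Note that JSON true becomes Python True after parsing, which is the form default_args expects.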

Implementing Tasks in the DAG

With the dynamic arguments integrated, you can add tasks to the DAG as you would in any other Airflow DAG, using operators such as PythonOperator and BashOperator. Each task inherits the merged default_args unless it overrides a value explicitly.

Author: user
