Airflow Core Concepts: DAGs, Tasks, and Dependencies

Module Level: Core Foundation
Prerequisites: Module 1 (Airflow Overview), Python basics
Duration: 3-4 hours
Key Concepts: DAG definition, TaskFlow API, dependencies, dynamic generation

DAGs: The Workflow Container

A DAG (Directed Acyclic Graph) is the fundamental concept in Airflow. It defines the workflow structure, schedule, and execution parameters.

DAG Anatomy

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# DAG configuration parameters
default_args = {
    'owner': 'data_team',           # Who owns this DAG
    'depends_on_past': False,        # Don't wait for previous runs
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,                    # Retry failed tasks 3 times
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2)  # Kill task after 2 hours
}

There are three common ways to define a DAG.

Method 1: Context Manager (Recommended)

# Modern, clean syntax with automatic task registration

with DAG(
    dag_id='my_pipeline',                    # REQUIRED: Unique identifier
    default_args=default_args,
    description='ETL pipeline for customer data',
    schedule='@daily',                       # When to run (None = manual trigger only)
    start_date=datetime(2024, 1, 1),         # Required for scheduled DAGs: when it becomes active
    catchup=False,                           # Don't backfill past runs
    max_active_runs=1,                       # Only 1 concurrent DAG run
    tags=['production', 'etl', 'customers'], # Organizational tags
    dagrun_timeout=timedelta(hours=4),       # Kill entire DAG run after 4h
    doc_md="""
    # Customer ETL Pipeline

    This pipeline extracts customer data from Postgres,
    transforms it with dbt, and loads to Snowflake.

    **Owner**: Data Platform Team
    **SLA**: Must complete by 6 AM
    """
) as dag:

    # Tasks automatically register to this DAG
    # (extract_func and transform_func are placeholder callables defined elsewhere)
    extract = PythonOperator(task_id='extract', python_callable=extract_func)
    transform = PythonOperator(task_id='transform', python_callable=transform_func)

    extract >> transform

Method 2: Standard Constructor

# Explicit DAG object creation

dag = DAG(
    'my_pipeline',
    default_args=default_args,
    schedule='@daily',
    start_date=datetime(2024, 1, 1)
)

# Must explicitly pass dag parameter to each task
extract = PythonOperator(
    task_id='extract',
    python_callable=extract_func,
    dag=dag  # Explicit assignment
)

transform = PythonOperator(
    task_id='transform',
    python_callable=transform_func,
    dag=dag
)

extract >> transform

Method 3: Decorator Pattern (TaskFlow API)

# Modern Python decorators - most concise

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['taskflow']
)
def my_pipeline():
    """
    Pipeline using TaskFlow API
    Functions become tasks automatically
    """

    @task
    def extract():
        return {'data': [1, 2, 3]}

    @task
    def transform(data):
        return [x * 2 for x in data['data']]

    @task
    def load(data):
        print(f"Loading {data}")

    # XCom passing is automatic
    data = extract()
    transformed = transform(data)
    load(transformed)

# Must call function to register DAG
my_pipeline()

DAG Configuration Deep Dive

with DAG(
    dag_id='advanced_config',

    # ========== SCHEDULING ==========
    schedule='0 2 * * *',        # Cron: 2 AM daily
    # OR: schedule=timedelta(hours=6)  # Every 6 hours
    # OR: schedule='@hourly', '@daily', '@weekly', '@monthly'
    # OR: schedule=None  # Manual trigger only

    start_date=datetime(2024, 1, 1),  # When DAG activates
    end_date=datetime(2024, 12, 31),  # When DAG deactivates (optional)

    catchup=True,  # Backfill runs between start_date and now
    # If True and start_date is 30 days ago, creates 30 DAG runs immediately

    # ========== CONCURRENCY ==========
    max_active_runs=3,  # Max concurrent DAG runs
    max_active_tasks=10,  # Max concurrent tasks across all DAG runs
    # concurrency=10,  # Deprecated alias for max_active_tasks; don't pass both

    # ========== TIMEOUTS ==========
    dagrun_timeout=timedelta(hours=2),  # Kill DAG run if exceeds
    # Note: execution_timeout is a task-level setting; set it per task or in default_args below

    # ========== SLA & CALLBACKS ==========
    sla_miss_callback=notify_sla_miss,  # Function called on SLA breach
    on_success_callback=notify_success,
    on_failure_callback=notify_failure,

    # ========== ORGANIZATION ==========
    tags=['env:prod', 'team:data', 'priority:high'],
    owner_links={  # Links shown in UI
        'data_team': 'https://wiki.company.com/data-team',
        'on_call': 'https://pagerduty.com/...'
    },
    doc_md=__doc__,  # Documentation (supports Markdown)

    # ========== TASK DEFAULTS ==========
    default_args={
        'owner': 'data_team',
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'retry_exponential_backoff': True,
        'max_retry_delay': timedelta(hours=1),
        'email': ['[email protected]'],
        'email_on_failure': True,
        'email_on_retry': False,
        'depends_on_past': False,
        'wait_for_downstream': False,
        'pool': 'default_pool',  # Resource pool
        'priority_weight': 1,  # Higher = runs first
        'queue': 'default',  # Celery queue
        'sla': timedelta(hours=2),  # Task-level SLA
    },

    # ========== ACCESS CONTROL ==========
    access_control={
        'data_team': {'can_read', 'can_edit', 'can_delete'},
        'analysts': {'can_read'}
    },

    # ========== MISC ==========
    is_paused_upon_creation=True,  # Start paused
    render_template_as_native_obj=True,  # Return native Python types from templates

) as dag:
    pass
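
The callback parameters above expect plain Python callables. A minimal sketch of what notify_failure, notify_success, and notify_sla_miss might look like (the print calls stand in for real Slack/PagerDuty integrations, which you would wire up yourself):

def notify_failure(context):
    """Task/DAG-run callbacks receive the Airflow context dict."""
    print(f"FAILURE: {context['dag'].dag_id} run {context['run_id']}")

def notify_success(context):
    print(f"SUCCESS: {context['dag'].dag_id} run {context['run_id']}")

def notify_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    """sla_miss_callback uses a different signature than the other callbacks."""
    print(f"SLA missed in {dag.dag_id}: {task_list}")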

Understanding execution_date vs logical_date

"""
CRITICAL CONCEPT: execution_date is NOT when the task runs!

execution_date = logical_date = start of the data interval
actual_run_time = when the task actually executes

Example: daily DAG (schedule='@daily')
- execution_date: 2024-01-15 00:00:00 (start of the data interval)
- DAG runs at: 2024-01-16 00:00:00 (after the interval ends)
- Processes data for: 2024-01-15 (the execution_date)
"""

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule='@daily',  # Runs at midnight
    start_date=datetime(2024, 1, 1),
    catchup=False
)
def date_concepts():

    @task
    def show_dates(**context):
        print(f"execution_date (logical_date): {context['execution_date']}")
        print(f"Next execution_date: {context['next_execution_date']}")
        print(f"Previous execution_date: {context['prev_execution_date']}")
        print(f"Current time: {datetime.now()}")

        # Common date macros
        print(f"ds (YYYY-MM-DD): {context['ds']}")  # execution_date as string
        print(f"ds_nodash: {context['ds_nodash']}")  # 20240115
        print(f"ts (timestamp): {context['ts']}")     # Full ISO timestamp

        """
        Output when run on Jan 16, 2024 at 12:05 AM:
        execution_date: 2024-01-15 00:00:00  (yesterday!)
        Next execution_date: 2024-01-16 00:00:00
        Previous execution_date: 2024-01-14 00:00:00
        Current time: 2024-01-16 00:05:12
        ds: 2024-01-15
        """

    show_dates()

date_concepts()
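
Since Airflow 2.2, execution_date is deprecated in favor of logical_date, and the recommended way to reason about "which data does this run cover" is the data interval. A short sketch of the newer context keys, which could sit inside the same date_concepts DAG:

    @task
    def show_data_interval(**context):
        # Preferred names in Airflow 2.2+; execution_date still works but warns
        print(f"logical_date:        {context['logical_date']}")
        print(f"data_interval_start: {context['data_interval_start']}")
        print(f"data_interval_end:   {context['data_interval_end']}")
        # For a @daily DAG, data_interval_start equals logical_date and
        # data_interval_end is 24 hours later (when the run actually starts).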

Tasks: The Work Units

Tasks are individual units of work within a DAG. Each task is an instance of an Operator.

Task Definition Patterns

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

with DAG('task_patterns', start_date=datetime(2024, 1, 1), schedule=None) as dag:

    # ========== PATTERN 1: Simple Task ==========
    def simple_function():
        print("Hello Airflow!")

    simple_task = PythonOperator(
        task_id='simple',
        python_callable=simple_function
    )

    # ========== PATTERN 2: Task with Arguments ==========
    def process_data(file_path, output_format):
        print(f"Processing {file_path} to {output_format}")

    task_with_args = PythonOperator(
        task_id='with_args',
        python_callable=process_data,
        op_kwargs={
            'file_path': '/data/input.csv',
            'output_format': 'parquet'
        }
    )

    # ========== PATTERN 3: Task with Context ==========
    def use_context(**context):
        # Access Airflow context variables
        execution_date = context['execution_date']
        dag_id = context['dag'].dag_id
        task_id = context['task'].task_id

        print(f"Running {dag_id}.{task_id} for {execution_date}")

        # Pull XCom from another task
        previous_result = context['ti'].xcom_pull(task_ids='previous_task')

    context_task = PythonOperator(
        task_id='with_context',
        python_callable=use_context
        # provide_context was removed in Airflow 2.0+; context is always available via **kwargs
    )

    # ========== PATTERN 4: Task with Templating ==========
    bash_task = BashOperator(
        task_id='templated',
        bash_command="""
        echo "Processing data for {{ ds }}"
        python /scripts/process.py \
            --date {{ ds }} \
            --output /data/{{ ds }}/output.csv
        """,
        env={
            'EXECUTION_DATE': '{{ ds }}',
            'DAG_ID': '{{ dag.dag_id }}'
        }
    )

    # ========== PATTERN 5: Task with Retries & Timeouts ==========
    critical_task = PythonOperator(
        task_id='critical',
        python_callable=simple_function,
        retries=5,
        retry_delay=timedelta(minutes=10),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(hours=1),
        execution_timeout=timedelta(minutes=30),
        email_on_failure=True,
        email=['[email protected]']
    )

    # ========== PATTERN 6: Task with Pool & Priority ==========
    # Pools limit concurrency for resource management
    pooled_task = PythonOperator(
        task_id='pooled',
        python_callable=simple_function,
        pool='database_connections',  # Created in UI: Admin → Pools
        pool_slots=2,  # Uses 2 slots from pool
        priority_weight=10,  # Higher priority = runs first
        weight_rule='downstream'  # Consider downstream tasks for priority
    )

Task Configuration Options

with DAG('task_config', start_date=datetime(2024, 1, 1), schedule=None) as dag:

    configured_task = PythonOperator(
        # ========== IDENTIFICATION ==========
        task_id='my_task',  # REQUIRED: Unique within DAG
        owner='data_team',

        # ========== EXECUTION ==========
        python_callable=my_function,
        op_args=[arg1, arg2],  # Positional arguments
        op_kwargs={'key': 'value'},  # Keyword arguments

        # ========== RETRY BEHAVIOR ==========
        retries=3,
        retry_delay=timedelta(minutes=5),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(hours=1),

        # ========== TIMEOUTS ==========
        execution_timeout=timedelta(hours=2),

        # ========== DEPENDENCIES ==========
        depends_on_past=False,  # Wait for previous run's same task
        wait_for_downstream=False,  # Previous run's downstream tasks must complete
        trigger_rule='all_success',  # When to trigger (see below)

        # ========== RESOURCE MANAGEMENT ==========
        pool='default_pool',
        pool_slots=1,
        priority_weight=1,
        weight_rule='downstream',  # 'downstream', 'upstream', 'absolute'
        queue='default',  # Celery queue

        # ========== SLA & NOTIFICATIONS ==========
        sla=timedelta(hours=2),
        email=['[email protected]'],
        email_on_failure=True,
        email_on_retry=False,
        on_failure_callback=lambda context: notify_slack(context),
        on_success_callback=None,
        on_retry_callback=None,

        # ========== TASK GROUPS ==========
        task_group=None,  # Parent task group

        # ========== RENDERING ==========
        do_xcom_push=True,  # Push return value to XCom
        multiple_outputs=False,  # TaskFlow: return dict pushes multiple XComs

        # ========== UI ==========
        doc_md="Task documentation in Markdown",
        ui_color='#ffefeb',  # Task color in graph view
        ui_fgcolor='#000'    # Text color
    )

TaskFlow API: Modern Airflow

The TaskFlow API (introduced in Airflow 2.0) simplifies DAG authoring with decorators and automatic XCom handling.

Traditional vs TaskFlow

# ========== TRADITIONAL APPROACH ==========

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    data = {'value': 42}
    return data  # Automatically pushed to XCom

def transform_data(**context):
    ti = context['ti']
    data = ti.xcom_pull(task_ids='extract')  # Manual XCom pull
    transformed = data['value'] * 2
    return transformed

def load_data(**context):
    ti = context['ti']
    result = ti.xcom_pull(task_ids='transform')
    print(f"Loading {result}")

with DAG('traditional', start_date=datetime(2024, 1, 1), schedule=None) as dag:
    extract = PythonOperator(task_id='extract', python_callable=extract_data)
    transform = PythonOperator(task_id='transform', python_callable=transform_data)
    load = PythonOperator(task_id='load', python_callable=load_data)

    extract >> transform >> load


# ========== TASKFLOW APPROACH ==========

from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def taskflow_example():

    @task
    def extract():
        return {'value': 42}

    @task
    def transform(data: dict) -> int:  # Type hints for clarity
        return data['value'] * 2

    @task
    def load(result: int):
        print(f"Loading {result}")

    # XCom passing is automatic based on return/parameters
    data = extract()
    result = transform(data)
    load(result)

taskflow_example()
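
TaskFlow tasks and traditional operators can be mixed freely in one DAG: every classic operator exposes its pushed XCom as an .output attribute, and passing that into a TaskFlow task wires up both the data and the dependency. A minimal sketch (the echoed path is purely illustrative):

from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from datetime import datetime

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def mixed_style():

    # Classic operator; its XCom (last line of stdout) is exposed as .output
    list_files = BashOperator(task_id='list_files', bash_command='echo /data/input.csv')

    @task
    def report(path: str):
        print(f"Bash task produced: {path}")

    # Passing .output resolves the XCom at runtime and sets list_files >> report
    report(list_files.output)

mixed_style()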

TaskFlow Advanced Patterns

from airflow.decorators import dag, task
from datetime import datetime, timedelta
from typing import Any, Dict, List

@dag(start_date=datetime(2024, 1, 1), schedule='@daily', catchup=False)
def advanced_taskflow():

    # ========== PATTERN 1: Multiple Outputs ==========
    @task(multiple_outputs=True)
    def extract_multiple() -> Dict[str, Any]:
        """
        Returns dict - each key becomes separate XCom
        Accessible as: extract_multiple()['customers']
        """
        return {
            'customers': [1, 2, 3],
            'orders': [10, 20, 30],
            'metadata': {'count': 6}
        }

    # ========== PATTERN 2: Task Configuration ==========
    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        execution_timeout=timedelta(hours=1),
        pool='api_calls',
        email_on_failure=True
    )
    def configured_task():
        return "Done"

    # ========== PATTERN 3: Virtualenv Isolation ==========
    @task.virtualenv(
        requirements=['pandas==2.0.0', 'numpy==1.24.0'],
        system_site_packages=False  # Isolated environment
    )
    def analyze_with_pandas():
        import pandas as pd
        import numpy as np

        df = pd.DataFrame({'A': [1, 2, 3]})
        return df.sum().to_dict()

    # ========== PATTERN 4: Docker Isolation ==========
    # (requires the apache-airflow-providers-docker package)
    @task.docker(
        image='python:3.11-slim',
        api_version='auto',
        auto_remove=True,
        mount_tmp_dir=False
    )
    def run_in_container():
        return "Executed in Docker"

    # ========== PATTERN 5: Task Groups ==========
    from airflow.utils.task_group import TaskGroup

    @task
    def process_customer(customer_id: int):
        return f"Processed customer {customer_id}"

    with TaskGroup('process_customers') as customer_group:
        customers = [1, 2, 3, 4, 5]
        tasks = [process_customer.override(task_id=f'customer_{i}')(i) for i in customers]

    # ========== PATTERN 6: Dynamic Task Mapping (Airflow 2.3+) ==========
    @task
    def get_customer_ids():
        return [1, 2, 3, 4, 5]

    @task
    def process_single_customer(customer_id: int):
        print(f"Processing customer {customer_id}")
        return customer_id * 10

    @task
    def aggregate_results(results: List[int]):
        print(f"Total: {sum(results)}")

    # Dynamic task mapping - creates 5 parallel tasks automatically
    customer_ids = get_customer_ids()
    results = process_single_customer.expand(customer_id=customer_ids)
    aggregate_results(results)

    # ========== PATTERN 7: Branching ==========
    @task.branch
    def choose_branch(**context):
        """
        Returns task_id(s) to execute
        Other downstream tasks are skipped
        """
        hour = datetime.now().hour
        if hour < 12:
            return 'morning_task'
        else:
            return 'afternoon_task'

    @task
    def morning_task():
        print("Good morning!")

    @task
    def afternoon_task():
        print("Good afternoon!")

    @task(trigger_rule='none_failed_min_one_success')  # Runs after any branch
    def final_task():
        print("Pipeline complete")

    branch = choose_branch()
    morning = morning_task()
    afternoon = afternoon_task()
    final = final_task()

    branch >> [morning, afternoon] >> final

    # ========== PATTERN 8: Sensor Decorator ==========
    @task.sensor(poke_interval=30, timeout=3600, mode='poke')
    def wait_for_file(filepath: str) -> bool:
        """
        Returns True when condition met, False to keep waiting
        """
        import os
        return os.path.exists(filepath)

    wait_for_file('/data/input.csv')

advanced_taskflow()
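
Dynamic task mapping (Pattern 6) also supports fixed arguments via .partial(), useful when every mapped task shares the same configuration alongside the varying input. A brief sketch, assuming it lives inside a DAG like the one above (the table and partition values are illustrative):

    @task
    def load_partition(table: str, partition: str):
        print(f"Loading {table} partition {partition}")

    # One mapped task instance per partition, all sharing table='orders'
    load_partition.partial(table='orders').expand(partition=['2024-01-01', '2024-01-02'])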

TaskFlow with External Data

from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date=datetime(2024, 1, 1), schedule='@daily', catchup=False)
def etl_pipeline():

    @task
    def extract_from_api(**context):
        """Extract data from REST API"""
        import requests

        execution_date = context['ds']
        response = requests.get(
            'https://api.example.com/data',
            params={'date': execution_date}
        )
        response.raise_for_status()

        return response.json()

    @task
    def transform_data(raw_data: dict) -> dict:
        """Transform and enrich data"""
        records = raw_data['records']

        transformed = []
        for record in records:
            transformed.append({
                'id': record['id'],
                'value': record['amount'] * 1.1,  # Add 10%
                'category': record['type'].upper(),
                'processed_at': datetime.now().isoformat()
            })

        return {'records': transformed, 'count': len(transformed)}

    @task
    def validate_data(data: dict) -> dict:
        """Data quality checks"""
        records = data['records']

        # Check for required fields
        for record in records:
            assert 'id' in record, f"Missing id in {record}"
            assert record['value'] > 0, f"Invalid value: {record['value']}"

        # Check count
        assert data['count'] > 0, "No records to process"

        print(f"Validation passed: {data['count']} records")
        return data

    @task
    def load_to_database(data: dict):
        """Load to PostgreSQL"""
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id='my_postgres')
        conn = hook.get_conn()
        cursor = conn.cursor()

        insert_sql = """
        INSERT INTO processed_data (id, value, category, processed_at)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (id) DO UPDATE SET
            value = EXCLUDED.value,
            category = EXCLUDED.category,
            processed_at = EXCLUDED.processed_at
        """

        for record in data['records']:
            cursor.execute(insert_sql, (
                record['id'],
                record['value'],
                record['category'],
                record['processed_at']
            ))

        conn.commit()
        cursor.close()
        conn.close()

        print(f"Loaded {data['count']} records")

    # Define pipeline
    raw = extract_from_api()
    transformed = transform_data(raw)
    validated = validate_data(transformed)
    load_to_database(validated)

etl_pipeline()
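
As a design note, the manual cursor loop in load_to_database can often be replaced with the hook's bulk helper. A hedged sketch using DbApiHook.insert_rows (plain insert only; it does not reproduce the ON CONFLICT upsert above, and batching options vary by provider version):

    @task
    def load_to_database_bulk(data: dict):
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id='my_postgres')
        rows = [
            (r['id'], r['value'], r['category'], r['processed_at'])
            for r in data['records']
        ]
        # insert_rows handles connection management and commits in batches
        hook.insert_rows(
            table='processed_data',
            rows=rows,
            target_fields=['id', 'value', 'category', 'processed_at'],
        )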

Task Dependencies

Dependencies define the execution order of tasks in a DAG.

Dependency Operators

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG('dependencies', start_date=datetime(2024, 1, 1), schedule=None) as dag:

    task_a = PythonOperator(task_id='task_a', python_callable=lambda: print('A'))
    task_b = PythonOperator(task_id='task_b', python_callable=lambda: print('B'))
    task_c = PythonOperator(task_id='task_c', python_callable=lambda: print('C'))
    task_d = PythonOperator(task_id='task_d', python_callable=lambda: print('D'))
    task_e = PythonOperator(task_id='task_e', python_callable=lambda: print('E'))

    # ========== LINEAR DEPENDENCIES ==========
    # A → B → C
    task_a >> task_b >> task_c

    # Equivalent syntax:
    task_a.set_downstream(task_b)
    task_b.set_downstream(task_c)

    # Or reverse:
    task_c << task_b << task_a

    # ========== FAN-OUT (Parallel) ==========
    #     ┌─→ B
    # A ──┼─→ C
    #     └─→ D
    task_a >> [task_b, task_c, task_d]

    # ========== FAN-IN (Join) ==========
    # B ──┐
    # C ──┼─→ E
    # D ──┘
    [task_b, task_c, task_d] >> task_e

    # ========== COMPLEX DEPENDENCIES ==========
    #     ┌─→ B ──┐
    # A ──┤       ├─→ D
    #     └─→ C ──┘
    task_a >> [task_b, task_c] >> task_d

    # ========== CROSS DEPENDENCIES ==========
    task_a >> task_b
    task_a >> task_c
    task_b >> task_d
    task_c >> task_d

    # ========== CHAIN HELPER ==========
    from airflow.models.baseoperator import chain

    # chain(A, B, C, D) equivalent to A >> B >> C >> D
    chain(task_a, task_b, task_c, task_d)

    # Multiple chains in parallel
    chain(task_a, [task_b, task_c], task_d)
    # A → B → D
    # A → C → D

    # ========== CROSS DOWNSTREAM ==========
    from airflow.models.baseoperator import cross_downstream

    # Connect all upstream to all downstream
    cross_downstream([task_a, task_b], [task_c, task_d])
    # A → C, A → D, B → C, B → D
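
    # ========== EDGE LABELS (cosmetic) ==========
    # Labels annotate the arrows in the Graph view without changing execution;
    # the label text here is illustrative.
    from airflow.utils.edgemodifier import Label

    task_a >> Label('raw rows') >> task_b
    task_b >> Label('validated rows') >> task_e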

Trigger Rules

Trigger rules determine when a task should run based on upstream task states.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG('trigger_rules', start_date=datetime(2024, 1, 1), schedule=None) as dag:

    # ========== ALL_SUCCESS (Default) ==========
    # Runs only if ALL upstream tasks succeeded
    task_default = PythonOperator(
        task_id='all_success',
        python_callable=lambda: print('All upstream succeeded'),
        trigger_rule='all_success'  # This is the default
    )

    # ========== ALL_FAILED ==========
    # Runs only if ALL upstream tasks failed
    task_all_failed = PythonOperator(
        task_id='all_failed',
        python_callable=lambda: print('All upstream failed'),
        trigger_rule='all_failed'
    )

    # ========== ALL_DONE ==========
    # Runs when ALL upstream tasks finished (success or failed)
    # Useful for cleanup tasks
    cleanup = PythonOperator(
        task_id='cleanup',
        python_callable=lambda: print('Cleaning up...'),
        trigger_rule='all_done'
    )

    # ========== ONE_SUCCESS ==========
    # Runs if AT LEAST ONE upstream task succeeded
    task_one_success = PythonOperator(
        task_id='one_success',
        python_callable=lambda: print('At least one succeeded'),
        trigger_rule='one_success'
    )

    # ========== ONE_FAILED ==========
    # Runs if AT LEAST ONE upstream task failed
    alert_failure = PythonOperator(
        task_id='alert_failure',
        python_callable=lambda: print('Alert: failure detected'),
        trigger_rule='one_failed'
    )

    # ========== NONE_FAILED ==========
    # Runs if NO upstream tasks failed (some may be skipped)
    # Useful after branching
    task_none_failed = PythonOperator(
        task_id='none_failed',
        python_callable=lambda: print('No failures'),
        trigger_rule='none_failed'
    )

    # ========== NONE_FAILED_MIN_ONE_SUCCESS ==========
    # Runs if no tasks failed AND at least one succeeded
    task_none_failed_min_success = PythonOperator(
        task_id='none_failed_min_success',
        python_callable=lambda: print('No failures, at least one success'),
        trigger_rule='none_failed_min_one_success'
    )

    # ========== NONE_SKIPPED ==========
    # Runs only if NO upstream tasks were skipped
    task_none_skipped = PythonOperator(
        task_id='none_skipped',
        python_callable=lambda: print('None skipped'),
        trigger_rule='none_skipped'
    )

    # ========== ALWAYS ==========
    # Always runs, regardless of upstream state
    # Useful for notifications, logging
    always_run = PythonOperator(
        task_id='always_run',
        python_callable=lambda: print('Always executes'),
        trigger_rule='always'
    )


# ========== PRACTICAL EXAMPLE: Error Handling Pipeline ==========

with DAG('error_handling', start_date=datetime(2024, 1, 1), schedule=None) as dag:

    def task_that_might_fail():
        import random
        if random.random() < 0.5:
            raise Exception("Random failure!")
        return "Success"

    risky_task_1 = PythonOperator(task_id='risky_1', python_callable=task_that_might_fail)
    risky_task_2 = PythonOperator(task_id='risky_2', python_callable=task_that_might_fail)
    risky_task_3 = PythonOperator(task_id='risky_3', python_callable=task_that_might_fail)

    # Runs if at least one risky task succeeded
    partial_success = PythonOperator(
        task_id='partial_success',
        python_callable=lambda: print("At least partial success!"),
        trigger_rule='one_success'
    )

    # Runs if any task failed (for alerting)
    send_alert = PythonOperator(
        task_id='send_alert',
        python_callable=lambda: print("ALERT: Task failure detected"),
        trigger_rule='one_failed'
    )

    # Cleanup runs regardless of outcome
    cleanup = PythonOperator(
        task_id='cleanup',
        python_callable=lambda: print("Cleanup completed"),
        trigger_rule='all_done'
    )

    [risky_task_1, risky_task_2, risky_task_3] >> partial_success
    [risky_task_1, risky_task_2, risky_task_3] >> send_alert
    [risky_task_1, risky_task_2, risky_task_3] >> cleanup

Dynamic DAG Generation

Create DAGs programmatically based on configuration, databases, or external sources.

Pattern 1: Loop-Based Generation

# Generate similar DAGs for different environments

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

ENVIRONMENTS = ['dev', 'staging', 'prod']

def create_environment_dag(env):
    """Factory function to create DAG for each environment"""

    dag = DAG(
        dag_id=f'etl_{env}',
        start_date=datetime(2024, 1, 1),
        schedule='@daily' if env == 'prod' else None,
        catchup=False,
        tags=[env, 'etl']
    )

    with dag:
        extract = PythonOperator(
            task_id='extract',
            python_callable=lambda: print(f"Extracting for {env}")
        )

        transform = PythonOperator(
            task_id='transform',
            python_callable=lambda: print(f"Transforming for {env}")
        )

        load = PythonOperator(
            task_id='load',
            python_callable=lambda: print(f"Loading to {env}")
        )

        extract >> transform >> load

    return dag

# Create DAGs for all environments
for env in ENVIRONMENTS:
    dag_id = f'etl_{env}'
    globals()[dag_id] = create_environment_dag(env)
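
The factory function matters here: building the operators directly inside the for loop with a bare lambda would hit Python's late-binding rule, and every task would print the last value of env. If you do generate DAGs inline, bind the loop variable as a default argument; a short illustration (the etl_inline_* DAG ids exist only for this example):

for env in ENVIRONMENTS:
    with DAG(dag_id=f'etl_inline_{env}', start_date=datetime(2024, 1, 1),
             schedule=None, catchup=False) as inline_dag:
        PythonOperator(
            task_id='extract',
            # lambda env=env captures the current value, not the final one
            python_callable=lambda env=env: print(f"Extracting for {env}"),
        )
    globals()[inline_dag.dag_id] = inline_dag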

Pattern 2: Configuration-Driven DAGs

# Generate DAGs from YAML configuration

import yaml
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

# config.yaml
"""
pipelines:
  - name: sales_pipeline
    schedule: '@daily'
    tasks:
      - task_id: extract_sales
        type: bash
        command: python /scripts/extract_sales.py
      - task_id: transform_sales
        type: python
        function: transform_sales
      - task_id: load_sales
        type: bash
        command: python /scripts/load_sales.py

  - name: inventory_pipeline
    schedule: '@hourly'
    tasks:
      - task_id: sync_inventory
        type: bash
        command: python /scripts/sync_inventory.py
"""

def load_config():
    with open('/opt/airflow/config/pipelines.yaml') as f:
        return yaml.safe_load(f)

def create_dag_from_config(pipeline_config):
    dag = DAG(
        dag_id=pipeline_config['name'],
        start_date=datetime(2024, 1, 1),
        schedule=pipeline_config['schedule'],
        catchup=False
    )

    with dag:
        tasks = {}
        for task_config in pipeline_config['tasks']:
            if task_config['type'] == 'bash':
                task = BashOperator(
                    task_id=task_config['task_id'],
                    bash_command=task_config['command']
                )
            elif task_config['type'] == 'python':
                task = PythonOperator(
                    task_id=task_config['task_id'],
                    python_callable=globals()[task_config['function']]
                )

            tasks[task_config['task_id']] = task

        # Set dependencies (assume sequential for simplicity)
        task_list = list(tasks.values())
        for i in range(len(task_list) - 1):
            task_list[i] >> task_list[i + 1]

    return dag

# Generate DAGs from config
config = load_config()
for pipeline in config['pipelines']:
    dag_id = pipeline['name']
    globals()[dag_id] = create_dag_from_config(pipeline)

Pattern 3: Database-Driven DAGs

# Generate DAGs from database table

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime

def get_pipeline_configs():
    """
    Fetch pipeline configurations from database

    Table: pipeline_configs
    Columns: name, schedule, source_table, destination_table, enabled
    """
    hook = PostgresHook(postgres_conn_id='metadata_db')
    sql = "SELECT name, schedule, source_table, destination_table FROM pipeline_configs WHERE enabled = true"
    return hook.get_records(sql)

def create_etl_dag(name, schedule, source_table, dest_table):
    def extract_data(**context):
        hook = PostgresHook(postgres_conn_id='source_db')
        df = hook.get_pandas_df(f"SELECT * FROM {source_table} WHERE date = '{context['ds']}'")
        return df.to_dict('records')

    def load_data(data, **context):
        hook = PostgresHook(postgres_conn_id='dest_db')
        # Load logic here
        print(f"Loading {len(data)} records to {dest_table}")

    dag = DAG(
        dag_id=f'etl_{name}',
        start_date=datetime(2024, 1, 1),
        schedule=schedule,
        catchup=False,
        tags=['auto-generated', 'etl']
    )

    with dag:
        extract = PythonOperator(task_id='extract', python_callable=extract_data)
        load = PythonOperator(task_id='load', python_callable=load_data, op_kwargs={'data': extract.output})

        extract >> load

    return dag

# Generate DAGs from database
# Note: this query runs at parse time on every scheduler refresh, so keep it fast
for name, schedule, source, dest in get_pipeline_configs():
    dag_id = f'etl_{name}'
    globals()[dag_id] = create_etl_dag(name, schedule, source, dest)

Pattern 4: Dynamic Task Generation

# Generate tasks dynamically based on runtime conditions

from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date=datetime(2024, 1, 1), schedule='@daily', catchup=False)
def dynamic_tasks():

    @task
    def get_tables_to_process():
        """
        Determine which tables need processing
        Could query database, read config, call API, etc.
        """
        return ['customers', 'orders', 'products', 'inventory']

    @task
    def process_table(table_name: str):
        """Process a single table"""
        print(f"Processing table: {table_name}")
        # ETL logic here
        return f"{table_name}_processed"

    @task
    def aggregate_results(processed_tables: list):
        """Combine results from all table processing"""
        print(f"Processed tables: {processed_tables}")

    # Dynamic task mapping (Airflow 2.3+)
    tables = get_tables_to_process()
    results = process_table.expand(table_name=tables)
    aggregate_results(results)

dynamic_tasks()

Task Groups

Organize tasks visually in the UI without affecting execution.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

with DAG('task_groups', start_date=datetime(2024, 1, 1), schedule=None) as dag:

    start = PythonOperator(task_id='start', python_callable=lambda: print('Start'))

    # ========== TASK GROUP 1: Data Extraction ==========
    with TaskGroup('extract_sources') as extract_group:
        extract_db1 = PythonOperator(task_id='extract_postgres', python_callable=lambda: print('Extract PG'))
        extract_db2 = PythonOperator(task_id='extract_mysql', python_callable=lambda: print('Extract MySQL'))
        extract_api = PythonOperator(task_id='extract_api', python_callable=lambda: print('Extract API'))

        # No dependencies between these tasks; they run in parallel

    # ========== TASK GROUP 2: Transformation ==========
    with TaskGroup('transform') as transform_group:
        clean_data = PythonOperator(task_id='clean', python_callable=lambda: print('Clean'))
        enrich_data = PythonOperator(task_id='enrich', python_callable=lambda: print('Enrich'))
        aggregate = PythonOperator(task_id='aggregate', python_callable=lambda: print('Aggregate'))

        clean_data >> enrich_data >> aggregate

    # ========== TASK GROUP 3: Loading ==========
    with TaskGroup('load_destinations') as load_group:
        load_warehouse = PythonOperator(task_id='load_snowflake', python_callable=lambda: print('Load Snowflake'))
        load_datalake = PythonOperator(task_id='load_s3', python_callable=lambda: print('Load S3'))

        # Both loads run in parallel; no dependencies needed

    end = PythonOperator(task_id='end', python_callable=lambda: print('End'))

    # High-level dependencies
    start >> extract_group >> transform_group >> load_group >> end


# ========== NESTED TASK GROUPS ==========

with DAG('nested_task_groups', start_date=datetime(2024, 1, 1), schedule=None) as dag:

    with TaskGroup('processing') as processing:
        with TaskGroup('data_quality') as quality:
            check_nulls = PythonOperator(task_id='check_nulls', python_callable=lambda: print('Check nulls'))
            check_duplicates = PythonOperator(task_id='check_duplicates', python_callable=lambda: print('Check dupes'))

            check_nulls >> check_duplicates

        with TaskGroup('transformations') as transforms:
            normalize = PythonOperator(task_id='normalize', python_callable=lambda: print('Normalize'))
            denormalize = PythonOperator(task_id='denormalize', python_callable=lambda: print('Denormalize'))

            normalize >> denormalize

        quality >> transforms
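
Task groups also have a decorator form, @task_group, which pairs naturally with the TaskFlow API. A minimal sketch:

from airflow.decorators import dag, task, task_group
from datetime import datetime

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def decorated_groups():

    @task
    def clean():
        print('Clean')

    @task
    def enrich():
        print('Enrich')

    @task_group(group_id='transform')
    def transform_group():
        clean() >> enrich()

    transform_group()

decorated_groups()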

Summary: Core Concepts Mastery

You now understand:
  • DAGs: Container for workflows with schedule and configuration
  • Tasks: Individual units of work (operator instances)
  • TaskFlow API: Modern decorator-based DAG authoring
  • Dependencies: Controlling task execution order
  • Trigger Rules: Conditional task execution based on upstream states
  • Dynamic Generation: Creating DAGs and tasks programmatically
  • Task Groups: Organizing tasks visually in the UI
Key Takeaways:
  1. Use TaskFlow API for new DAGs - cleaner syntax, automatic XCom
  2. Dependencies define execution order, not data flow (use XCom for data)
  3. Trigger rules enable complex conditional logic
  4. Dynamic DAGs reduce code duplication and enable configuration-driven pipelines
  5. Task groups improve UI organization without affecting execution

Next Steps

Now that you’ve mastered core concepts, let’s explore the vast ecosystem of Airflow operators.

Module 3: Operators - The Building Blocks

Learn built-in operators, create custom operators, and master best practices