Airflow Core Concepts: DAGs, Tasks, and Dependencies
Module Level: Core Foundation
Prerequisites: Module 1 (Airflow Overview), Python basics
Duration: 3-4 hours
Key Concepts: DAG definition, TaskFlow API, dependencies, dynamic generation
DAGs: The Workflow Container
A DAG (Directed Acyclic Graph) is the fundamental concept in Airflow. It defines the workflow structure, schedule, and execution parameters.
DAG Anatomy
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# DAG configuration parameters
default_args = {
'owner': 'data_team', # Who owns this DAG
'depends_on_past': False, # Don't wait for previous runs
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3, # Retry failed tasks 3 times
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=2) # Kill task after 2 hours
}
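These default_args are applied to every task in the DAG, but any individual task can override them. A minimal sketch (extract_func is a placeholder callable, not defined here):
# Overrides the retries and execution_timeout defaults for this one task only
slow_extract = PythonOperator(
    task_id='slow_extract',
    python_callable=extract_func,   # placeholder - assumed to be defined elsewhere
    retries=10,                     # overrides default_args['retries'] = 3
    execution_timeout=timedelta(hours=6),
)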
Three Ways to Define a DAG
Method 1: Context Manager (Recommended)
# Modern, clean syntax with automatic task registration
with DAG(
dag_id='my_pipeline', # REQUIRED: Unique identifier
default_args=default_args,
description='ETL pipeline for customer data',
schedule='@daily', # When to run (cron, preset, timedelta, or None)
start_date=datetime(2024, 1, 1), # REQUIRED: When DAG becomes active
catchup=False, # Don't backfill past runs
max_active_runs=1, # Only 1 concurrent DAG run
tags=['production', 'etl', 'customers'], # Organizational tags
dagrun_timeout=timedelta(hours=4), # Kill entire DAG run after 4h
doc_md="""
# Customer ETL Pipeline
This pipeline extracts customer data from Postgres,
transforms it with dbt, and loads to Snowflake.
**Owner**: Data Platform Team
**SLA**: Must complete by 6 AM
"""
) as dag:
# Tasks automatically register to this DAG
extract = PythonOperator(task_id='extract', python_callable=extract_func)
transform = PythonOperator(task_id='transform', python_callable=transform_func)
extract >> transform
Method 2: Standard Constructor
# Explicit DAG object creation
dag = DAG(
'my_pipeline',
default_args=default_args,
schedule='@daily',
start_date=datetime(2024, 1, 1)
)
# Must explicitly pass dag parameter to each task
extract = PythonOperator(
task_id='extract',
python_callable=extract_func,
dag=dag # Explicit assignment
)
transform = PythonOperator(
task_id='transform',
python_callable=transform_func,
dag=dag
)
extract >> transform
Method 3: Decorator Pattern (TaskFlow API)
# Modern Python decorators - most concise
from airflow.decorators import dag, task
from datetime import datetime
@dag(
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['taskflow']
)
def my_pipeline():
"""
Pipeline using TaskFlow API
Functions become tasks automatically
"""
@task
def extract():
return {'data': [1, 2, 3]}
@task
def transform(data):
return [x * 2 for x in data['data']]
@task
def load(data):
print(f"Loading {data}")
# XCom passing is automatic
data = extract()
transformed = transform(data)
load(transformed)
# Must call function to register DAG
my_pipeline()
DAG Configuration Deep Dive
with DAG(
dag_id='advanced_config',
# ========== SCHEDULING ==========
schedule='0 2 * * *', # Cron: 2 AM daily
# OR: schedule=timedelta(hours=6) # Every 6 hours
# OR: schedule='@hourly', '@daily', '@weekly', '@monthly'
# OR: schedule=None # Manual trigger only
start_date=datetime(2024, 1, 1), # When DAG activates
end_date=datetime(2024, 12, 31), # When DAG deactivates (optional)
catchup=True, # Backfill runs between start_date and now
# If True and start_date is 30 days ago, creates 30 DAG runs immediately
# ========== CONCURRENCY ==========
max_active_runs=3, # Max concurrent DAG runs
max_active_tasks=10, # Max concurrent tasks across all DAG runs
# concurrency=10, # Deprecated alias for max_active_tasks - do not set both
# ========== TIMEOUTS ==========
dagrun_timeout=timedelta(hours=2), # Kill DAG run if exceeds
# Note: execution_timeout is a task-level setting - set it in default_args below, not on the DAG
# ========== SLA & CALLBACKS ==========
sla_miss_callback=notify_sla_miss, # Function called on SLA breach
on_success_callback=notify_success,
on_failure_callback=notify_failure,
# ========== ORGANIZATION ==========
tags=['env:prod', 'team:data', 'priority:high'],
owner_links={ # Links shown in UI
'data_team': 'https://wiki.company.com/data-team',
'on_call': 'https://pagerduty.com/...'
},
doc_md=__doc__, # Documentation (supports Markdown)
# ========== TASK DEFAULTS ==========
default_args={
'owner': 'data_team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True,
'max_retry_delay': timedelta(hours=1),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': False,
'depends_on_past': False,
'wait_for_downstream': False,
'pool': 'default_pool', # Resource pool
'priority_weight': 1, # Higher = runs first
'queue': 'default', # Celery queue
'sla': timedelta(hours=2), # Task-level SLA
},
# ========== ACCESS CONTROL ==========
access_control={
'data_team': {'can_read', 'can_edit', 'can_delete'},
'analysts': {'can_read'}
},
# ========== OTHER ==========
is_paused_upon_creation=True, # Start paused
render_template_as_native_obj=True, # Return native Python types from templates
) as dag:
pass
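The callbacks referenced above (notify_failure, notify_success, notify_sla_miss) are not Airflow built-ins; they are plain Python functions you define yourself. A minimal sketch of the expected signatures:
def notify_failure(context):
    # DAG- and task-level callbacks receive the task instance context dict
    ti = context['task_instance']
    print(f"FAILED: {ti.dag_id}.{ti.task_id} for {context['ds']}")

def notify_success(context):
    print(f"SUCCESS: {context['dag'].dag_id}")

def notify_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    # sla_miss_callback uses its own five-argument signature
    print(f"SLA missed in {dag.dag_id}: {task_list}")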
Understanding execution_date vs logical_date
"""
CRITICAL CONCEPT: execution_date is NOT when the task runs!
execution_date = logical_date = start of the data interval
actual_run_time = when the task actually executes
Example: @daily DAG
- execution_date: 2024-01-15 00:00:00 (start of the data interval)
- DAG runs at: 2024-01-16 00:00:00 (just after the interval closes)
- Processes data for: 2024-01-15 (the execution_date)
"""
from airflow.decorators import dag, task
from datetime import datetime
@dag(
schedule='@daily', # Runs at midnight
start_date=datetime(2024, 1, 1),
catchup=False
)
def date_concepts():
@task
def show_dates(**context):
print(f"execution_date (logical_date): {context['execution_date']}")
print(f"Next execution_date: {context['next_execution_date']}")
print(f"Previous execution_date: {context['prev_execution_date']}")
print(f"Current time: {datetime.now()}")
# Common date macros
print(f"ds (YYYY-MM-DD): {context['ds']}") # execution_date as string
print(f"ds_nodash: {context['ds_nodash']}") # 20240115
print(f"ts (timestamp): {context['ts']}") # Full ISO timestamp
"""
Output when run on Jan 16, 2024 at 12:05 AM:
execution_date: 2024-01-15 00:00:00 (yesterday!)
Next execution_date: 2024-01-16 00:00:00
Previous execution_date: 2024-01-14 00:00:00
Current time: 2024-01-16 00:05:12
ds: 2024-01-15
"""
show_dates()
date_concepts()
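Airflow 2.2+ also exposes the data interval directly in the context; these fields are the preferred replacements for the deprecated *_execution_date names. A small sketch of a task body (it would live inside a DAG like the one above):
@task
def show_interval(**context):
    print(f"logical_date: {context['logical_date']}")                # same value as execution_date
    print(f"data_interval_start: {context['data_interval_start']}")  # start of the processed interval
    print(f"data_interval_end: {context['data_interval_end']}")      # end of the processed interval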
Tasks: The Work Units
Tasks are individual units of work within a DAG. Each task is an instance of an Operator.
Task Definition Patterns
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
with DAG('task_patterns', start_date=datetime(2024, 1, 1), schedule=None) as dag:
# ========== PATTERN 1: Simple Task ==========
def simple_function():
print("Hello Airflow!")
simple_task = PythonOperator(
task_id='simple',
python_callable=simple_function
)
# ========== PATTERN 2: Task with Arguments ==========
def process_data(file_path, output_format):
print(f"Processing {file_path} to {output_format}")
task_with_args = PythonOperator(
task_id='with_args',
python_callable=process_data,
op_kwargs={
'file_path': '/data/input.csv',
'output_format': 'parquet'
}
)
# ========== PATTERN 3: Task with Context ==========
def use_context(**context):
# Access Airflow context variables
execution_date = context['execution_date']
dag_id = context['dag'].dag_id
task_id = context['task'].task_id
print(f"Running {dag_id}.{task_id} for {execution_date}")
# Pull XCom from another task
previous_result = context['ti'].xcom_pull(task_ids='previous_task')
context_task = PythonOperator(
task_id='with_context',
python_callable=use_context
# provide_context was removed in Airflow 2.x; the context is always passed to **kwargs
)
# ========== PATTERN 4: Task with Templating ==========
bash_task = BashOperator(
task_id='templated',
bash_command="""
echo "Processing data for {{ ds }}"
python /scripts/process.py \
--date {{ ds }} \
--output /data/{{ ds }}/output.csv
""",
env={
'EXECUTION_DATE': '{{ ds }}',
'DAG_ID': '{{ dag.dag_id }}'
}
)
# ========== PATTERN 5: Task with Retries & Timeouts ==========
critical_task = PythonOperator(
task_id='critical',
python_callable=simple_function,
retries=5,
retry_delay=timedelta(minutes=10),
retry_exponential_backoff=True,
max_retry_delay=timedelta(hours=1),
execution_timeout=timedelta(minutes=30),
email_on_failure=True,
email=['[email protected]']
)
# ========== PATTERN 6: Task with Pool & Priority ==========
# Pools limit concurrency for resource management
pooled_task = PythonOperator(
task_id='pooled',
python_callable=simple_function,
pool='database_connections', # Created in UI: Admin → Pools
pool_slots=2, # Uses 2 slots from pool
priority_weight=10, # Higher priority = runs first
weight_rule='downstream' # Consider downstream tasks for priority
)
Task Configuration Options
with DAG('task_config', start_date=datetime(2024, 1, 1), schedule=None) as dag:
configured_task = PythonOperator(
# ========== IDENTIFICATION ==========
task_id='my_task', # REQUIRED: Unique within DAG
owner='data_team',
# ========== EXECUTION ==========
python_callable=my_function,
op_args=[arg1, arg2], # Positional arguments
op_kwargs={'key': 'value'}, # Keyword arguments
# ========== RETRY BEHAVIOR ==========
retries=3,
retry_delay=timedelta(minutes=5),
retry_exponential_backoff=True,
max_retry_delay=timedelta(hours=1),
# ========== TIMEOUTS ==========
execution_timeout=timedelta(hours=2),
# Note: `timeout` is a sensor-only parameter; regular operators use execution_timeout
# ========== DEPENDENCIES ==========
depends_on_past=False, # Wait for previous run's same task
wait_for_downstream=False, # Previous run's downstream tasks must complete
trigger_rule='all_success', # When to trigger (see below)
# ========== RESOURCE MANAGEMENT ==========
pool='default_pool',
pool_slots=1,
priority_weight=1,
weight_rule='downstream', # 'downstream', 'upstream', 'absolute'
queue='default', # Celery queue
# ========== SLA & NOTIFICATIONS ==========
sla=timedelta(hours=2),
email=['[email protected]'],
email_on_failure=True,
email_on_retry=False,
on_failure_callback=lambda context: notify_slack(context),
on_success_callback=None,
on_retry_callback=None,
# ========== TASK GROUPS ==========
task_group=None, # Parent task group
# ========== RENDERING ==========
do_xcom_push=True, # Push return value to XCom
multiple_outputs=False, # TaskFlow: return dict pushes multiple XComs
# ========== UI ==========
doc_md="Task documentation in Markdown",
ui_color='#ffefeb', # Task color in graph view
ui_fgcolor='#000' # Text color
)
TaskFlow API: Modern Airflow
The TaskFlow API (introduced in Airflow 2.0) simplifies DAG authoring with decorators and automatic XCom handling.
Traditional vs TaskFlow
# ========== TRADITIONAL APPROACH ==========
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract_data():
data = {'value': 42}
return data # Automatically pushed to XCom
def transform_data(**context):
ti = context['ti']
data = ti.xcom_pull(task_ids='extract') # Manual XCom pull
transformed = data['value'] * 2
return transformed
def load_data(**context):
ti = context['ti']
result = ti.xcom_pull(task_ids='transform')
print(f"Loading {result}")
with DAG('traditional', start_date=datetime(2024, 1, 1), schedule=None) as dag:
extract = PythonOperator(task_id='extract', python_callable=extract_data)
transform = PythonOperator(task_id='transform', python_callable=transform_data)
load = PythonOperator(task_id='load', python_callable=load_data)
extract >> transform >> load
# ========== TASKFLOW APPROACH ==========
from airflow.decorators import dag, task
from datetime import datetime
@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def taskflow_example():
@task
def extract():
return {'value': 42}
@task
def transform(data: dict) -> int: # Type hints for clarity
return data['value'] * 2
@task
def load(result: int):
print(f"Loading {result}")
# XCom passing is automatic based on return/parameters
data = extract()
result = transform(data)
load(result)
taskflow_example()
TaskFlow Advanced Patterns
from airflow.decorators import dag, task
from datetime import datetime, timedelta
from typing import Any, Dict, List
@dag(start_date=datetime(2024, 1, 1), schedule='@daily', catchup=False)
def advanced_taskflow():
# ========== PATTERN 1: Multiple Outputs ==========
@task(multiple_outputs=True)
def extract_multiple() -> Dict[str, Any]:
"""
Returns dict - each key becomes separate XCom
Accessible as: extract_multiple()['customers']
"""
return {
'customers': [1, 2, 3],
'orders': [10, 20, 30],
'metadata': {'count': 6}
}
# ========== PATTERN 2: Task Configuration ==========
@task(
retries=3,
retry_delay=timedelta(minutes=5),
execution_timeout=timedelta(hours=1),
pool='api_calls',
email_on_failure=True
)
def configured_task():
return "Done"
# ========== PATTERN 3: Virtualenv Isolation ==========
@task.virtualenv(
requirements=['pandas==2.0.0', 'numpy==1.24.0'],
system_site_packages=False # Isolated environment
)
def analyze_with_pandas():
import pandas as pd
import numpy as np
df = pd.DataFrame({'A': [1, 2, 3]})
return df.sum().to_dict()
# ========== PATTERN 4: Docker Isolation ==========
@task.docker(
image='python:3.11-slim',
api_version='auto',
auto_remove=True,
mount_tmp_dir=False
)
def run_in_container():
return "Executed in Docker"
# ========== PATTERN 5: Task Groups ==========
from airflow.utils.task_group import TaskGroup
@task
def process_customer(customer_id: int):
return f"Processed customer {customer_id}"
with TaskGroup('process_customers') as customer_group:
customers = [1, 2, 3, 4, 5]
tasks = [process_customer.override(task_id=f'customer_{i}')(i) for i in customers]
# ========== PATTERN 6: Dynamic Task Mapping (Airflow 2.3+) ==========
@task
def get_customer_ids():
return [1, 2, 3, 4, 5]
@task
def process_single_customer(customer_id: int):
print(f"Processing customer {customer_id}")
return customer_id * 10
@task
def aggregate_results(results: List[int]):
print(f"Total: {sum(results)}")
# Dynamic task mapping - creates 5 parallel tasks automatically
customer_ids = get_customer_ids()
results = process_single_customer.expand(customer_id=customer_ids)
aggregate_results(results)
# ========== PATTERN 7: Branching ==========
@task.branch
def choose_branch(**context):
"""
Returns task_id(s) to execute
Other downstream tasks are skipped
"""
hour = datetime.now().hour
if hour < 12:
return 'morning_task'
else:
return 'afternoon_task'
@task
def morning_task():
print("Good morning!")
@task
def afternoon_task():
print("Good afternoon!")
@task(trigger_rule='none_failed_min_one_success') # Runs after any branch
def final_task():
print("Pipeline complete")
branch = choose_branch()
morning = morning_task()
afternoon = afternoon_task()
final = final_task()
branch >> [morning, afternoon] >> final
# ========== PATTERN 8: Sensor Decorator ==========
@task.sensor(poke_interval=30, timeout=3600, mode='poke')
def wait_for_file(filepath: str) -> bool:
"""
Returns True when condition met, False to keep waiting
"""
import os
return os.path.exists(filepath)
wait_for_file('/data/input.csv')
advanced_taskflow()
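If the sensor should also push a value to XCom once it succeeds, @task.sensor can return a PokeReturnValue instead of a plain bool (Airflow 2.3+); a minimal sketch:
from airflow.sensors.base import PokeReturnValue

@task.sensor(poke_interval=30, timeout=3600, mode='reschedule')
def wait_for_file_with_xcom(filepath: str) -> PokeReturnValue:
    import os
    done = os.path.exists(filepath)
    # xcom_value is pushed only when is_done becomes True
    return PokeReturnValue(is_done=done, xcom_value=filepath)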
TaskFlow with External Data
@dag(start_date=datetime(2024, 1, 1), schedule='@daily', catchup=False)
def etl_pipeline():
@task
def extract_from_api(**context):
"""Extract data from REST API"""
import requests
execution_date = context['ds']
response = requests.get(
'https://api.example.com/data',
params={'date': execution_date}
)
response.raise_for_status()
return response.json()
@task
def transform_data(raw_data: dict) -> dict:
"""Transform and enrich data"""
records = raw_data['records']
transformed = []
for record in records:
transformed.append({
'id': record['id'],
'value': record['amount'] * 1.1, # Add 10%
'category': record['type'].upper(),
'processed_at': datetime.now().isoformat()
})
return {'records': transformed, 'count': len(transformed)}
@task
def validate_data(data: dict) -> dict:
"""Data quality checks"""
records = data['records']
# Check for required fields
for record in records:
assert 'id' in record, f"Missing id in {record}"
assert record['value'] > 0, f"Invalid value: {record['value']}"
# Check count
assert data['count'] > 0, "No records to process"
print(f"Validation passed: {data['count']} records")
return data
@task
def load_to_database(data: dict):
"""Load to PostgreSQL"""
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook(postgres_conn_id='my_postgres')
conn = hook.get_conn()
cursor = conn.cursor()
insert_sql = """
INSERT INTO processed_data (id, value, category, processed_at)
VALUES (%s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE SET
value = EXCLUDED.value,
category = EXCLUDED.category,
processed_at = EXCLUDED.processed_at
"""
for record in data['records']:
cursor.execute(insert_sql, (
record['id'],
record['value'],
record['category'],
record['processed_at']
))
conn.commit()
cursor.close()
conn.close()
print(f"Loaded {data['count']} records")
# Define pipeline
raw = extract_from_api()
transformed = transform_data(raw)
validated = validate_data(transformed)
load_to_database(validated)
etl_pipeline()
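For straightforward append-only loads, the manual cursor loop above can be replaced with the hook's bulk helper; a sketch assuming the same processed_data table (insert_rows issues plain INSERTs, so the ON CONFLICT upsert logic would be lost):
@task
def load_to_database_bulk(data: dict):
    """Append-only bulk load variant (no upsert)."""
    from airflow.providers.postgres.hooks.postgres import PostgresHook
    hook = PostgresHook(postgres_conn_id='my_postgres')
    rows = [
        (r['id'], r['value'], r['category'], r['processed_at'])
        for r in data['records']
    ]
    hook.insert_rows(
        table='processed_data',
        rows=rows,
        target_fields=['id', 'value', 'category', 'processed_at'],
        commit_every=1000,
    )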
Task Dependencies
Dependencies define the execution order of tasks in a DAG.
Dependency Operators
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
with DAG('dependencies', start_date=datetime(2024, 1, 1), schedule=None) as dag:
task_a = PythonOperator(task_id='task_a', python_callable=lambda: print('A'))
task_b = PythonOperator(task_id='task_b', python_callable=lambda: print('B'))
task_c = PythonOperator(task_id='task_c', python_callable=lambda: print('C'))
task_d = PythonOperator(task_id='task_d', python_callable=lambda: print('D'))
task_e = PythonOperator(task_id='task_e', python_callable=lambda: print('E'))
# ========== LINEAR DEPENDENCIES ==========
# A → B → C
task_a >> task_b >> task_c
# Equivalent syntax:
task_a.set_downstream(task_b)
task_b.set_downstream(task_c)
# Or reverse:
task_c << task_b << task_a
# ========== FAN-OUT (Parallel) ==========
# ┌─→ B
# A ──┼─→ C
# └─→ D
task_a >> [task_b, task_c, task_d]
# ========== FAN-IN (Join) ==========
# B ──┐
# C ──┼─→ E
# D ──┘
[task_b, task_c, task_d] >> task_e
# ========== COMPLEX DEPENDENCIES ==========
# ┌─→ B ──┐
# A ──┤ ├─→ D
# └─→ C ──┘
task_a >> [task_b, task_c] >> task_d
# ========== CROSS DEPENDENCIES ==========
task_a >> task_b
task_a >> task_c
task_b >> task_d
task_c >> task_d
# ========== CHAIN HELPER ==========
from airflow.models.baseoperator import chain
# chain(A, B, C, D) equivalent to A >> B >> C >> D
chain(task_a, task_b, task_c, task_d)
# Multiple chains in parallel
chain(task_a, [task_b, task_c], task_d)
# A → B → D
# A → C → D
# ========== CROSS DOWNSTREAM ==========
from airflow.models.baseoperator import cross_downstream
# Connect all upstream to all downstream
cross_downstream([task_a, task_b], [task_c, task_d])
# A → C, A → D, B → C, B → D
Trigger Rules
Trigger rules determine when a task should run based on upstream task states.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
with DAG('trigger_rules', start_date=datetime(2024, 1, 1), schedule=None) as dag:
# ========== ALL_SUCCESS (Default) ==========
# Runs only if ALL upstream tasks succeeded
task_default = PythonOperator(
task_id='all_success',
python_callable=lambda: print('All upstream succeeded'),
trigger_rule='all_success' # This is the default
)
# ========== ALL_FAILED ==========
# Runs only if ALL upstream tasks failed
task_all_failed = PythonOperator(
task_id='all_failed',
python_callable=lambda: print('All upstream failed'),
trigger_rule='all_failed'
)
# ========== ALL_DONE ==========
# Runs when ALL upstream tasks finished (success or failed)
# Useful for cleanup tasks
cleanup = PythonOperator(
task_id='cleanup',
python_callable=lambda: print('Cleaning up...'),
trigger_rule='all_done'
)
# ========== ONE_SUCCESS ==========
# Runs if AT LEAST ONE upstream task succeeded
task_one_success = PythonOperator(
task_id='one_success',
python_callable=lambda: print('At least one succeeded'),
trigger_rule='one_success'
)
# ========== ONE_FAILED ==========
# Runs if AT LEAST ONE upstream task failed
alert_failure = PythonOperator(
task_id='alert_failure',
python_callable=lambda: print('Alert: failure detected'),
trigger_rule='one_failed'
)
# ========== NONE_FAILED ==========
# Runs if NO upstream tasks failed (some may be skipped)
# Useful after branching
task_none_failed = PythonOperator(
task_id='none_failed',
python_callable=lambda: print('No failures'),
trigger_rule='none_failed'
)
# ========== NONE_FAILED_MIN_ONE_SUCCESS ==========
# Runs if no tasks failed AND at least one succeeded
task_none_failed_min_success = PythonOperator(
task_id='none_failed_min_success',
python_callable=lambda: print('No failures, at least one success'),
trigger_rule='none_failed_min_one_success'
)
# ========== NONE_SKIPPED ==========
# Runs only if NO upstream tasks were skipped
task_none_skipped = PythonOperator(
task_id='none_skipped',
python_callable=lambda: print('None skipped'),
trigger_rule='none_skipped'
)
# ========== ALWAYS ==========
# Always runs, regardless of upstream state
# Useful for notifications, logging
always_run = PythonOperator(
task_id='always_run',
python_callable=lambda: print('Always executes'),
trigger_rule='always'
)
# ========== PRACTICAL EXAMPLE: Error Handling Pipeline ==========
with DAG('error_handling', start_date=datetime(2024, 1, 1), schedule=None) as dag:
def task_that_might_fail():
import random
if random.random() < 0.5:
raise Exception("Random failure!")
return "Success"
risky_task_1 = PythonOperator(task_id='risky_1', python_callable=task_that_might_fail)
risky_task_2 = PythonOperator(task_id='risky_2', python_callable=task_that_might_fail)
risky_task_3 = PythonOperator(task_id='risky_3', python_callable=task_that_might_fail)
# Runs if at least one risky task succeeded
partial_success = PythonOperator(
task_id='partial_success',
python_callable=lambda: print("At least partial success!"),
trigger_rule='one_success'
)
# Runs if any task failed (for alerting)
send_alert = PythonOperator(
task_id='send_alert',
python_callable=lambda: print("ALERT: Task failure detected"),
trigger_rule='one_failed'
)
# Cleanup runs regardless of outcome
cleanup = PythonOperator(
task_id='cleanup',
python_callable=lambda: print("Cleanup completed"),
trigger_rule='all_done'
)
[risky_task_1, risky_task_2, risky_task_3] >> partial_success
[risky_task_1, risky_task_2, risky_task_3] >> send_alert
[risky_task_1, risky_task_2, risky_task_3] >> cleanup
Dynamic DAG Generation
Create DAGs programmatically based on configuration, databases, or external sources.
Pattern 1: Loop-Based Generation
# Generate similar DAGs for different environments
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
ENVIRONMENTS = ['dev', 'staging', 'prod']
def create_environment_dag(env):
"""Factory function to create DAG for each environment"""
dag = DAG(
dag_id=f'etl_{env}',
start_date=datetime(2024, 1, 1),
schedule='@daily' if env == 'prod' else None,
catchup=False,
tags=[env, 'etl']
)
with dag:
extract = PythonOperator(
task_id='extract',
python_callable=lambda: print(f"Extracting for {env}")
)
transform = PythonOperator(
task_id='transform',
python_callable=lambda: print(f"Transforming for {env}")
)
load = PythonOperator(
task_id='load',
python_callable=lambda: print(f"Loading to {env}")
)
extract >> transform >> load
return dag
# Create DAGs for all environments
for env in ENVIRONMENTS:
dag_id = f'etl_{env}'
globals()[dag_id] = create_environment_dag(env)
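Note that env is bound as a parameter of create_environment_dag, so each lambda captures its own value. If the operators were instead defined directly inside the for loop with lambdas referencing the loop variable, Python's late binding would make every DAG print the last environment in the list.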
Pattern 2: Configuration-Driven DAGs
# Generate DAGs from YAML configuration
import yaml
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
# config.yaml
"""
pipelines:
- name: sales_pipeline
schedule: '@daily'
tasks:
- task_id: extract_sales
type: bash
command: python /scripts/extract_sales.py
- task_id: transform_sales
type: python
function: transform_sales
- task_id: load_sales
type: bash
command: python /scripts/load_sales.py
- name: inventory_pipeline
schedule: '@hourly'
tasks:
- task_id: sync_inventory
type: bash
command: python /scripts/sync_inventory.py
"""
def load_config():
with open('/opt/airflow/config/pipelines.yaml') as f:
return yaml.safe_load(f)
def create_dag_from_config(pipeline_config):
dag = DAG(
dag_id=pipeline_config['name'],
start_date=datetime(2024, 1, 1),
schedule=pipeline_config['schedule'],
catchup=False
)
with dag:
tasks = {}
for task_config in pipeline_config['tasks']:
if task_config['type'] == 'bash':
task = BashOperator(
task_id=task_config['task_id'],
bash_command=task_config['command']
)
elif task_config['type'] == 'python':
task = PythonOperator(
task_id=task_config['task_id'],
python_callable=globals()[task_config['function']]
)
tasks[task_config['task_id']] = task
# Set dependencies (assume sequential for simplicity)
task_list = list(tasks.values())
for i in range(len(task_list) - 1):
task_list[i] >> task_list[i + 1]
return dag
# Generate DAGs from config
config = load_config()
for pipeline in config['pipelines']:
dag_id = pipeline['name']
globals()[dag_id] = create_dag_from_config(pipeline)
Pattern 3: Database-Driven DAGs
# Generate DAGs from database table
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
def get_pipeline_configs():
"""
Fetch pipeline configurations from database
Table: pipeline_configs
Columns: name, schedule, source_table, destination_table, enabled
"""
hook = PostgresHook(postgres_conn_id='metadata_db')
sql = "SELECT name, schedule, source_table, destination_table FROM pipeline_configs WHERE enabled = true"
return hook.get_records(sql)
def create_etl_dag(name, schedule, source_table, dest_table):
def extract_data(**context):
hook = PostgresHook(postgres_conn_id='source_db')
df = hook.get_pandas_df(f"SELECT * FROM {source_table} WHERE date = '{context['ds']}'")
return df.to_dict('records')
def load_data(data, **context):
hook = PostgresHook(postgres_conn_id='dest_db')
# Load logic here
print(f"Loading {len(data)} records to {dest_table}")
dag = DAG(
dag_id=f'etl_{name}',
start_date=datetime(2024, 1, 1),
schedule=schedule,
catchup=False,
tags=['auto-generated', 'etl']
)
with dag:
extract = PythonOperator(task_id='extract', python_callable=extract_data)
load = PythonOperator(task_id='load', python_callable=load_data, op_kwargs={'data': extract.output})
extract >> load
return dag
# Generate DAGs from database
for name, schedule, source, dest in get_pipeline_configs():
dag_id = f'etl_{name}'
globals()[dag_id] = create_etl_dag(name, schedule, source, dest)
Pattern 4: Dynamic Task Generation
# Generate tasks dynamically based on runtime conditions
from airflow.decorators import dag, task
from datetime import datetime
@dag(start_date=datetime(2024, 1, 1), schedule='@daily', catchup=False)
def dynamic_tasks():
@task
def get_tables_to_process():
"""
Determine which tables need processing
Could query database, read config, call API, etc.
"""
return ['customers', 'orders', 'products', 'inventory']
@task
def process_table(table_name: str):
"""Process a single table"""
print(f"Processing table: {table_name}")
# ETL logic here
return f"{table_name}_processed"
@task
def aggregate_results(processed_tables: list):
"""Combine results from all table processing"""
print(f"Processed tables: {processed_tables}")
# Dynamic task mapping (Airflow 2.3+)
tables = get_tables_to_process()
results = process_table.expand(table_name=tables)
aggregate_results(results)
dynamic_tasks()
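When some arguments are constant across all mapped tasks, partial() supplies the fixed values and expand() supplies the per-task ones. A small sketch that would sit inside a DAG like the one above (schema is a hypothetical extra parameter):
@task
def process_table_in_schema(table_name: str, schema: str):
    print(f"Processing {schema}.{table_name}")

# One mapped task instance per table, all sharing schema='public'
process_table_in_schema.partial(schema='public').expand(
    table_name=['customers', 'orders', 'products']
)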
Task Groups
Organize tasks visually in the UI without affecting execution.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime
with DAG('task_groups', start_date=datetime(2024, 1, 1), schedule=None) as dag:
start = PythonOperator(task_id='start', python_callable=lambda: print('Start'))
# ========== TASK GROUP 1: Data Extraction ==========
with TaskGroup('extract_sources') as extract_group:
extract_db1 = PythonOperator(task_id='extract_postgres', python_callable=lambda: print('Extract PG'))
extract_db2 = PythonOperator(task_id='extract_mysql', python_callable=lambda: print('Extract MySQL'))
extract_api = PythonOperator(task_id='extract_api', python_callable=lambda: print('Extract API'))
# No explicit dependencies inside this group - all three extracts run in parallel
# ========== TASK GROUP 2: Transformation ==========
with TaskGroup('transform') as transform_group:
clean_data = PythonOperator(task_id='clean', python_callable=lambda: print('Clean'))
enrich_data = PythonOperator(task_id='enrich', python_callable=lambda: print('Enrich'))
aggregate = PythonOperator(task_id='aggregate', python_callable=lambda: print('Aggregate'))
clean_data >> enrich_data >> aggregate
# ========== TASK GROUP 3: Loading ==========
with TaskGroup('load_destinations') as load_group:
load_warehouse = PythonOperator(task_id='load_snowflake', python_callable=lambda: print('Load Snowflake'))
load_datalake = PythonOperator(task_id='load_s3', python_callable=lambda: print('Load S3'))
# Both loads run in parallel
end = PythonOperator(task_id='end', python_callable=lambda: print('End'))
# High-level dependencies
start >> extract_group >> transform_group >> load_group >> end
# ========== NESTED TASK GROUPS ==========
with DAG('nested_task_groups', start_date=datetime(2024, 1, 1), schedule=None) as dag:
with TaskGroup('processing') as processing:
with TaskGroup('data_quality') as quality:
check_nulls = PythonOperator(task_id='check_nulls', python_callable=lambda: print('Check nulls'))
check_duplicates = PythonOperator(task_id='check_duplicates', python_callable=lambda: print('Check dupes'))
check_nulls >> check_duplicates
with TaskGroup('transformations') as transforms:
normalize = PythonOperator(task_id='normalize', python_callable=lambda: print('Normalize'))
denormalize = PythonOperator(task_id='denormalize', python_callable=lambda: print('Denormalize'))
normalize >> denormalize
quality >> transforms
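Keep in mind that a TaskGroup prefixes its group_id onto every child task_id by default, so cross-group references such as XCom pulls must use the full dotted id. A small sketch assuming the groups defined above:
# Inside some downstream task of the 'task_groups' DAG above (hypothetical):
def read_extract_result(**context):
    value = context['ti'].xcom_pull(task_ids='extract_sources.extract_postgres')
    print(value)

# Prefixing can be turned off per group if flat task_ids are preferred:
# with TaskGroup('extract_sources', prefix_group_id=False) as extract_group: ...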
Summary: Core Concepts Mastery
You now understand:
- DAGs: Container for workflows with schedule and configuration
- Tasks: Individual units of work (operator instances)
- TaskFlow API: Modern decorator-based DAG authoring
- Dependencies: Controlling task execution order
- Trigger Rules: Conditional task execution based on upstream states
- Dynamic Generation: Creating DAGs and tasks programmatically
- Task Groups: Organizing tasks visually in the UI
Key Takeaways:
- Use TaskFlow API for new DAGs - cleaner syntax, automatic XCom
- Dependencies define execution order, not data flow (use XCom for data)
- Trigger rules enable complex conditional logic
- Dynamic DAGs reduce code duplication and enable configuration-driven pipelines
- Task groups improve UI organization without affecting execution
Next Steps
Now that you've mastered core concepts, let's explore the vast ecosystem of Airflow operators.
Module 3: Operators - The Building Blocks
Learn built-in operators, create custom operators, and master best practices