Airflow Scheduling Mastery
Module Level: Core Foundation
Prerequisites: Modules 1-3 (Overview, Core Concepts, Operators)
Duration: 3-4 hours
Key Concepts: Cron, execution dates, catchup, backfill, SLAs, trigger rules
Understanding Airflow’s Scheduling Model
Airflow’s scheduling is based on a crucial concept: DAGs run at the END of the data interval, not at the beginning.
The Data Interval Concept
"""
KEY CONCEPT: execution_date represents the START of the data interval
Example: DAG with schedule='@daily' (runs at midnight)
Data Interval                 execution_date   Actual Run Time    Processes Data For
-------------                 --------------   ---------------    ------------------
Jan 1 00:00 - Jan 2 00:00     2024-01-01       Jan 2, 00:00:00    January 1st data
Jan 2 00:00 - Jan 3 00:00     2024-01-02       Jan 3, 00:00:00    January 2nd data
WHY? Because you can't process Jan 1st data until Jan 1st is complete!
"""
from airflow.decorators import dag, task
from datetime import datetime
@dag(
schedule='@daily', # Runs daily at midnight
start_date=datetime(2024, 1, 1),
catchup=False
)
def scheduling_concept():
@task
def show_dates(**context):
print(f"execution_date (data_interval_start): {context['data_interval_start']}")
print(f"data_interval_end: {context['data_interval_end']}")
print(f"logical_date: {context['logical_date']}") # Same as execution_date in Airflow 2.2+
print(f"Current time (when actually running): {datetime.now()}")
"""
If this DAG runs on Jan 2 at 00:05:
- data_interval_start: 2024-01-01 00:00:00
- data_interval_end: 2024-01-02 00:00:00
- logical_date: 2024-01-01 00:00:00
- Current time: 2024-01-02 00:05:12
Use {{ ds }} (execution_date as YYYY-MM-DD) in queries:
SELECT * FROM orders WHERE order_date = '2024-01-01'
"""
show_dates()
scheduling_concept()
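The same dates are available to classic operators through Jinja templating. A minimal sketch, reusing the orders query from the docstring above (the DAG and task names are illustrative): the {{ ds }} macro renders to the logical date, so each run targets exactly its own interval.
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG('templated_dates', schedule='@daily',
         start_date=datetime(2024, 1, 1), catchup=False) as dag:
    # {{ ds }} renders to the logical date (data_interval_start) as YYYY-MM-DD,
    # so the run executing on Jan 2 echoes a query for 2024-01-01
    show_query = BashOperator(
        task_id='show_query',
        bash_command="echo \"SELECT * FROM orders WHERE order_date = '{{ ds }}'\"",
    )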
Schedule Interval Formats
Cron Expressions
from airflow import DAG
from datetime import datetime
# ========== CRON FORMAT ==========
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12)
# │ │ │ │ ┌───────────── day of week (0 - 6) (Sunday=0)
# │ │ │ │ │
# * * * * *
# ========== COMMON SCHEDULES ==========
# Every minute
DAG('every_minute', schedule='* * * * *', start_date=datetime(2024, 1, 1))
# Every 5 minutes
DAG('every_5_min', schedule='*/5 * * * *', start_date=datetime(2024, 1, 1))
# Every hour at minute 0
DAG('hourly', schedule='0 * * * *', start_date=datetime(2024, 1, 1))
# OR use preset
DAG('hourly_preset', schedule='@hourly', start_date=datetime(2024, 1, 1))
# Daily at 2 AM
DAG('daily_2am', schedule='0 2 * * *', start_date=datetime(2024, 1, 1))
# Every weekday at 6 AM (Mon-Fri)
DAG('weekdays', schedule='0 6 * * 1-5', start_date=datetime(2024, 1, 1))
# Every Monday at 9 AM
DAG('monday', schedule='0 9 * * 1', start_date=datetime(2024, 1, 1))
# First day of month at midnight
DAG('monthly', schedule='0 0 1 * *', start_date=datetime(2024, 1, 1))
# Every Sunday at 3 AM
DAG('weekly_sunday', schedule='0 3 * * 0', start_date=datetime(2024, 1, 1))
# Every 4 hours
DAG('every_4_hours', schedule='0 */4 * * *', start_date=datetime(2024, 1, 1))
# Business hours only (9 AM - 5 PM, Mon-Fri)
DAG('business_hours', schedule='0 9-17 * * 1-5', start_date=datetime(2024, 1, 1))
# ========== PRESET SCHEDULES ==========
DAG('none', schedule=None, start_date=datetime(2024, 1, 1)) # Manual trigger only
DAG('once', schedule='@once', start_date=datetime(2024, 1, 1)) # Run once then never
DAG('preset_hourly', schedule='@hourly', start_date=datetime(2024, 1, 1))  # 0 * * * * (dag_id renamed to avoid clashing with 'hourly' above)
DAG('daily', schedule='@daily', start_date=datetime(2024, 1, 1)) # 0 0 * * *
DAG('weekly', schedule='@weekly', start_date=datetime(2024, 1, 1)) # 0 0 * * 0
DAG('preset_monthly', schedule='@monthly', start_date=datetime(2024, 1, 1))  # 0 0 1 * * (dag_id renamed to avoid clashing with 'monthly' above)
DAG('yearly', schedule='@yearly', start_date=datetime(2024, 1, 1)) # 0 0 1 1 *
# ========== TIMEDELTA SCHEDULES ==========
from datetime import timedelta
# Every 30 minutes
DAG('every_30_min', schedule=timedelta(minutes=30), start_date=datetime(2024, 1, 1))
# Every 6 hours
DAG('every_6_hours', schedule=timedelta(hours=6), start_date=datetime(2024, 1, 1))
# Every 2 days
DAG('every_2_days', schedule=timedelta(days=2), start_date=datetime(2024, 1, 1))
# ========== COMPLEX CRON EXAMPLES ==========
# Every 15 minutes during business hours (9 AM - 5 PM, Mon-Fri)
DAG('business_15min', schedule='*/15 9-17 * * 1-5', start_date=datetime(2024, 1, 1))
# Twice daily: 8 AM and 8 PM
DAG('twice_daily', schedule='0 8,20 * * *', start_date=datetime(2024, 1, 1))
# Every quarter (Jan 1, Apr 1, Jul 1, Oct 1)
DAG('quarterly', schedule='0 0 1 1,4,7,10 *', start_date=datetime(2024, 1, 1))
# Near month end at 11 PM (cron cannot express "last day of month": this
# matches the 28th through the 31st, so it fires several times per month;
# see the guard-task sketch after this code block)
DAG('month_end', schedule='0 23 28-31 * *', start_date=datetime(2024, 1, 1))
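Since cron cannot express "last day of month", pair the 28th-31st schedule above with a guard task that short-circuits on every day except the true month end. A minimal sketch (DAG and task names illustrative); ShortCircuitOperator skips all downstream tasks when the callable returns False:
from airflow import DAG
from airflow.operators.python import ShortCircuitOperator
from datetime import datetime, timedelta
def is_last_day_of_month(ds=None, **_):
    # ds is the run's logical date as 'YYYY-MM-DD'; if tomorrow falls in a
    # different month, today is the last day of this month
    today = datetime.strptime(ds, '%Y-%m-%d')
    return (today + timedelta(days=1)).month != today.month
with DAG('true_month_end', schedule='0 23 28-31 * *',
         start_date=datetime(2024, 1, 1), catchup=False) as dag:
    guard = ShortCircuitOperator(
        task_id='only_last_day',
        python_callable=is_last_day_of_month,
    )
    # guard >> load_monthly_report  (downstream tasks run only on the last day)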
Testing Cron Expressions
from airflow import DAG
from airflow.operators.python import PythonOperator
from croniter import croniter
from datetime import datetime
def test_cron_schedule(**context):
"""Test when a cron expression will run"""
cron_expr = '0 2 * * 1-5' # 2 AM weekdays
start_date = datetime(2024, 1, 1)
# Generate next 10 execution times
cron = croniter(cron_expr, start_date)
for i in range(10):
next_run = cron.get_next(datetime)
print(f"Run {i+1}: {next_run.strftime('%Y-%m-%d %H:%M %A')}")
"""
Output:
Run 1: 2024-01-01 02:00 Monday
Run 2: 2024-01-02 02:00 Tuesday
Run 3: 2024-01-03 02:00 Wednesday
Run 4: 2024-01-04 02:00 Thursday
Run 5: 2024-01-05 02:00 Friday
Run 6: 2024-01-08 02:00 Monday
...
"""
with DAG('test_cron', schedule=None, start_date=datetime(2024, 1, 1)) as dag:
test = PythonOperator(task_id='test', python_callable=test_cron_schedule)
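croniter can also validate a specific timestamp against an expression and walk backwards, which helps when debugging why a run did or did not fire. A small sketch using the same weekday expression:
from croniter import croniter
from datetime import datetime
# Does this timestamp satisfy the expression? (Jan 6, 2024 is a Saturday)
print(croniter.match('0 2 * * 1-5', datetime(2024, 1, 6, 2, 0)))  # False
# Walk backwards from a reference time to the most recent scheduled run
cron = croniter('0 2 * * 1-5', datetime(2024, 1, 6, 12, 0))
print(cron.get_prev(datetime))  # 2024-01-05 02:00:00 (Friday)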
Catchup and Backfill
Catchup Behavior
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# ========== CATCHUP=TRUE (default in Airflow 1.x and 2.x; Airflow 3 flips the default to False) ==========
# If start_date is 30 days ago and catchup=True,
# Airflow creates 30 DAG runs immediately!
with DAG(
'catchup_true',
schedule='@daily',
start_date=datetime(2024, 1, 1), # 30 days ago
catchup=True, # Run all missed intervals
max_active_runs=3 # But only run 3 at a time
) as dag:
def process_data(**context):
execution_date = context['ds']
print(f"Processing data for {execution_date}")
process = PythonOperator(task_id='process', python_callable=process_data)
"""
Behavior:
- Creates DAG runs for Jan 1, Jan 2, Jan 3, ..., Jan 30
- Runs up to 3 concurrently (max_active_runs=3)
- Useful for: Historical data processing, backfilling
- WARNING: Can overwhelm system if start_date is far in past
"""
# ========== CATCHUP=FALSE (Recommended for most cases) ==========
with DAG(
'catchup_false',
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=False # Only run for current interval
) as dag:
process = PythonOperator(task_id='process', python_callable=process_data)
"""
Behavior:
- Only creates DAG run for current/latest interval
- Skips all past intervals
- Useful for: Most production pipelines, real-time processing
- BEST PRACTICE: Use catchup=False unless you need backfill
"""
# ========== SELECTIVE CATCHUP ==========
with DAG(
'selective_catchup',
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=False, # Don't auto-backfill
tags=['backfill-capable']
) as dag:
def process_data_idempotent(**context):
"""
Idempotent processing - safe to run multiple times
Can be manually backfilled if needed
"""
execution_date = context['ds']
# Delete existing data for this date (idempotency)
delete_sql = f"DELETE FROM results WHERE date = '{execution_date}'"
        # Process and insert (in a real task, execute these statements via a DB hook)
process_sql = f"""
INSERT INTO results
SELECT * FROM source_data WHERE date = '{execution_date}'
"""
print(f"Processed {execution_date}")
process = PythonOperator(task_id='process', python_callable=process_data_idempotent)
"""
Best of both worlds:
- catchup=False prevents automatic backfill
- Idempotent logic allows manual backfill via CLI:
airflow dags backfill selective_catchup -s 2024-01-01 -e 2024-01-31
"""
Manual Backfill via CLI
# ========== BACKFILL COMMANDS ==========
# Backfill specific date range
airflow dags backfill my_dag \
--start-date 2024-01-01 \
--end-date 2024-01-31
# Backfill with task subset
airflow dags backfill my_dag \
--start-date 2024-01-01 \
--end-date 2024-01-31 \
--task-regex 'extract.*' # Only tasks matching regex
# Dry run (don't actually execute)
airflow dags backfill my_dag \
--start-date 2024-01-01 \
--end-date 2024-01-31 \
--dry-run
# Backfill with reset (clear existing runs first)
airflow dags backfill my_dag \
--start-date 2024-01-01 \
--end-date 2024-01-31 \
--reset-dagruns
# Backfill specific run
airflow dags backfill my_dag \
--start-date 2024-01-15 \
--end-date 2024-01-15
# ========== PROGRAMMATIC BACKFILL ==========
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def trigger_backfill(**context):
"""Trigger backfill via API"""
from airflow.api.client.local_client import Client
client = Client(api_base_url=None, auth=None)
# Trigger backfill for last 7 days
end_date = datetime.now()
start_date = end_date - timedelta(days=7)
# This creates individual DAG runs
current = start_date
while current <= end_date:
client.trigger_dag(
dag_id='target_dag',
run_id=f'backfill_{current.strftime("%Y%m%d")}',
execution_date=current
)
current += timedelta(days=1)
with DAG('trigger_backfill_dag', schedule=None, start_date=datetime(2024, 1, 1)) as dag:
backfill = PythonOperator(task_id='backfill', python_callable=trigger_backfill)
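Note that newer Airflow releases deprecate the local API client used above. A sketch of an alternative that stays inside the scheduler, using TriggerDagRunOperator (one trigger task per day; 'target_dag' as above, date range hard-coded for illustration):
from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime, timedelta
with DAG('trigger_backfill_dag_v2', schedule=None, start_date=datetime(2024, 1, 1)) as dag:
    for day in range(7):
        date = datetime(2024, 1, 1) + timedelta(days=day)
        TriggerDagRunOperator(
            task_id=f'backfill_{date:%Y%m%d}',
            trigger_dag_id='target_dag',
            execution_date=date.isoformat(),  # logical date for the triggered run
            reset_dag_run=True,  # re-run even if a run already exists for this date
        )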
Depends on Past
Control whether tasks wait for previous runs to complete.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# ========== DEPENDS_ON_PAST=FALSE (Default) ==========
with DAG(
'independent_runs',
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=True
) as dag:
task = PythonOperator(
task_id='process',
python_callable=lambda: print("Processing"),
depends_on_past=False # Each run is independent
)
"""
Behavior:
- Jan 1 run can start immediately
- Jan 2 run can start immediately (doesn't wait for Jan 1)
- Jan 3 run can start immediately (doesn't wait for Jan 2)
Use when: Data for each day is independent
"""
# ========== DEPENDS_ON_PAST=TRUE ==========
with DAG(
'sequential_runs',
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=True
) as dag:
task = PythonOperator(
task_id='process',
python_callable=lambda: print("Processing"),
depends_on_past=True # Wait for previous run
)
"""
Behavior:
- Jan 1 run starts immediately
- Jan 2 run waits for Jan 1 to succeed
- Jan 3 run waits for Jan 2 to succeed
- Forms a sequential chain across runs
Use when:
- Each day's data depends on previous day
- Incremental processing
- Order matters
"""
# ========== WAIT_FOR_DOWNSTREAM ==========
with DAG(
'wait_downstream',
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=True
) as dag:
extract = PythonOperator(
task_id='extract',
python_callable=lambda: print("Extract"),
        wait_for_downstream=True  # Wait for tasks immediately downstream of this task in the previous run
)
transform = PythonOperator(
task_id='transform',
python_callable=lambda: print("Transform")
)
load = PythonOperator(
task_id='load',
python_callable=lambda: print("Load")
)
extract >> transform >> load
"""
Behavior:
- Jan 2's 'extract' waits for Jan 1's 'extract', 'transform', AND 'load'
- Ensures complete pipeline before starting next
Use when:
- Need guarantee that entire previous pipeline completed
- Downstream tasks modify same data
"""
# ========== PRACTICAL EXAMPLE: INCREMENTAL ETL ==========
with DAG(
'incremental_etl',
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=True
) as dag:
def extract_incremental(**context):
"""Extract new records since last successful run"""
execution_date = context['ds']
# Get last successful run's end time
from airflow.models import DagRun
last_run = DagRun.find(
dag_id=context['dag'].dag_id,
execution_date=context['prev_execution_date'],
state='success'
)
if last_run:
last_time = last_run[0].end_date
sql = f"SELECT * FROM source WHERE created_at > '{last_time}'"
else:
# First run - get all data for date
sql = f"SELECT * FROM source WHERE DATE(created_at) = '{execution_date}'"
print(f"Query: {sql}")
extract = PythonOperator(
task_id='extract_incremental',
python_callable=extract_incremental,
depends_on_past=True # Ensures sequential processing
)
SLAs (Service Level Agreements)
Define expected task completion times and get alerted on breaches.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """
    Called when an SLA is missed
    Args:
        dag: DAG object
        task_list: newline-separated string of task IDs that missed the SLA
        blocking_task_list: newline-separated string of tasks blocking them
        slas: list of SlaMiss objects
        blocking_tis: blocking TaskInstance objects
    """
    print("SLA MISSED!")
    print(f"Tasks: {task_list}")  # already a formatted string, not task objects
    for sla in slas:
        print(f"Task {sla.task_id} missed its SLA for run {sla.execution_date}")
    # Send alert to PagerDuty, Slack, etc. (send_pagerduty_alert is a placeholder)
    # send_pagerduty_alert(f"SLA breach in {dag.dag_id}")
with DAG(
'sla_example',
schedule='@hourly',
start_date=datetime(2024, 1, 1),
sla_miss_callback=sla_miss_callback, # DAG-level callback
catchup=False
) as dag:
# ========== TASK-LEVEL SLA ==========
# Task must complete within 30 minutes of DAG start
quick_task = PythonOperator(
task_id='quick_task',
python_callable=lambda: print("Quick processing"),
sla=timedelta(minutes=30)
)
# Task must complete within 2 hours of DAG start
long_task = PythonOperator(
task_id='long_task',
python_callable=lambda: print("Long processing"),
sla=timedelta(hours=2)
)
# ========== REAL-WORLD EXAMPLE: PRODUCTION SLAs ==========
with DAG(
'production_pipeline_sla',
schedule='0 2 * * *', # 2 AM daily
start_date=datetime(2024, 1, 1),
sla_miss_callback=sla_miss_callback,
catchup=False,
tags=['production', 'critical']
) as dag:
def send_sla_alert(context):
"""Custom SLA alert"""
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
task = context['task']
execution_date = context['execution_date']
SlackWebhookOperator(
task_id='sla_alert',
http_conn_id='slack_webhook',
message=f"""
:warning: *SLA Breach*
Task: `{task.task_id}`
DAG: `{task.dag_id}`
Execution: {execution_date}
Expected completion: {task.sla}
""",
channel='#data-alerts'
).execute(context)
# Critical: Must complete by 3 AM (1 hour after start)
extract = PythonOperator(
task_id='extract',
python_callable=lambda: print("Extract"),
sla=timedelta(hours=1),
on_execute_callback=lambda ctx: print(f"Started at {datetime.now()}")
)
# Must complete by 4 AM (2 hours after start)
transform = PythonOperator(
task_id='transform',
python_callable=lambda: print("Transform"),
sla=timedelta(hours=2)
)
# Must complete by 5 AM (3 hours) - pipeline must be done before business hours
    load = PythonOperator(
        task_id='load',
        python_callable=lambda: print("Load"),
        sla=timedelta(hours=3)
        # Breaches are reported through the DAG-level sla_miss_callback above;
        # on_execute_callback would fire on every start, not on SLA misses
    )
extract >> transform >> load
# ========== CHECKING SLA MISSES ==========
from airflow.providers.postgres.hooks.postgres import PostgresHook
def check_sla_misses(**context):
"""Query SLA misses from metadata DB"""
hook = PostgresHook(postgres_conn_id='airflow_db')
# Get recent SLA misses
sla_misses = hook.get_pandas_df("""
SELECT
dag_id,
task_id,
execution_date,
timestamp as sla_miss_time,
email_sent,
notification_sent
FROM sla_miss
WHERE timestamp >= NOW() - INTERVAL '7 days'
ORDER BY timestamp DESC
""")
print(f"SLA misses in last 7 days: {len(sla_misses)}")
print(sla_misses)
with DAG('check_slas', schedule=None, start_date=datetime(2024, 1, 1)) as dag:
check = PythonOperator(task_id='check', python_callable=check_sla_misses)
DAG Run States and Timeouts
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import time
with DAG(
'timeouts_example',
schedule='@daily',
start_date=datetime(2024, 1, 1),
dagrun_timeout=timedelta(hours=4), # Kill entire DAG run after 4 hours
catchup=False
) as dag:
# ========== TASK-LEVEL TIMEOUT ==========
def long_running_task():
"""Simulates long-running task"""
print("Starting long task...")
time.sleep(7200) # 2 hours
print("Completed")
quick_timeout = PythonOperator(
task_id='quick_timeout',
python_callable=long_running_task,
execution_timeout=timedelta(minutes=30) # Kill after 30 min
# Task will fail with AirflowTaskTimeout
)
# ========== RETRY CONFIGURATION ==========
def flaky_task():
"""Might fail randomly"""
import random
if random.random() < 0.7:
raise Exception("Temporary failure")
return "Success"
with_retries = PythonOperator(
task_id='with_retries',
python_callable=flaky_task,
retries=5,
retry_delay=timedelta(minutes=5),
        retry_exponential_backoff=True,  # delays grow roughly 5min, 10min, 20min, 40min, ...
        max_retry_delay=timedelta(hours=1)  # ...with the backoff capped at 1 hour
)
# ========== POOL SLOTS FOR RESOURCE MANAGEMENT ==========
    # Create pool in UI: Admin → Pools → Add
    # (or via CLI: airflow pools set database_connections 5 "DB connections")
    # Name: database_connections, Slots: 5
db_task_1 = PythonOperator(
task_id='db_task_1',
python_callable=lambda: print("Using DB connection"),
pool='database_connections',
pool_slots=2 # Uses 2 of 5 available slots
)
db_task_2 = PythonOperator(
task_id='db_task_2',
python_callable=lambda: print("Using DB connection"),
pool='database_connections',
pool_slots=2
)
# Only 5 slots available, so if 3 tasks need 2 slots each,
# third task waits for slot to free up
# ========== DAG-LEVEL CONCURRENCY ==========
with DAG(
'concurrency_control',
schedule='@hourly',
start_date=datetime(2024, 1, 1),
max_active_runs=3, # Max 3 concurrent DAG runs
max_active_tasks=10, # Max 10 concurrent tasks across all runs
catchup=True
) as dag:
tasks = []
    for i in range(20):
        task = PythonOperator(
            task_id=f'task_{i}',
            # Bind i at definition time; a bare lambda would capture the loop
            # variable by reference, so every task would print "Task 19"
            python_callable=lambda i=i: print(f"Task {i}")
        )
        tasks.append(task)
"""
Behavior:
- Only 3 DAG runs execute concurrently
- Within those 3 runs, max 10 tasks run concurrently
- Remaining tasks queue until slots available
"""
Timetables (Airflow 2.2+)
Advanced scheduling beyond cron and timedelta.
from airflow import DAG
from airflow.timetables.interval import CronDataIntervalTimetable
from airflow.operators.python import PythonOperator
from datetime import datetime
# ========== CUSTOM BUSINESS DAY SCHEDULE ==========
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from pendulum import DateTime
import pendulum
class BusinessDayTimetable(Timetable):
    """Run on business days only (Monday-Friday)"""
    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        # Required for manually triggered runs: treat the preceding day as the interval
        return DataInterval(start=run_after.subtract(days=1), end=run_after)
    def next_dagrun_info(self, *, last_automated_data_interval, restriction: TimeRestriction):
        if last_automated_data_interval is not None:
            next_start = last_automated_data_interval.end
        else:
            next_start = restriction.earliest
        if next_start is None:  # no start_date: never schedule
            return None
        # Skip weekends (use pendulum's constants; the raw weekday numbers
        # differ between pendulum versions)
        while next_start.day_of_week in (pendulum.SATURDAY, pendulum.SUNDAY):
            next_start = next_start.add(days=1)
        next_end = next_start.add(days=1)
        return DagRunInfo.interval(start=next_start, end=next_end)
# Custom timetables must be registered through a plugin before DAGs can use them
class BusinessDayTimetablePlugin(AirflowPlugin):
    name = "business_day_timetable_plugin"
    timetables = [BusinessDayTimetable]
# Use custom timetable
with DAG(
'business_days_only',
timetable=BusinessDayTimetable(),
start_date=datetime(2024, 1, 1),
catchup=False
) as dag:
task = PythonOperator(
task_id='business_task',
python_callable=lambda: print("Running on business day")
)
# ========== CRON WITH DATA INTERVAL ==========
with DAG(
'custom_interval',
timetable=CronDataIntervalTimetable(
cron='0 2 * * *', # Run at 2 AM
timezone='America/New_York'
),
start_date=datetime(2024, 1, 1),
catchup=False
) as dag:
task = PythonOperator(
task_id='task',
python_callable=lambda **ctx: print(f"Data interval: {ctx['data_interval_start']} to {ctx['data_interval_end']}")
)
Best Practices
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# ========== BEST PRACTICE DAG ==========
default_args = {
'owner': 'data_team',
'depends_on_past': False, # Usually False for independence
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2, # Retry failed tasks
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True,
'execution_timeout': timedelta(hours=1), # Task timeout
}
with DAG(
'best_practice_scheduling',
default_args=default_args,
description='Production-ready scheduling configuration',
schedule='0 2 * * *', # 2 AM daily
start_date=datetime(2024, 1, 1),
catchup=False, # Don't backfill by default
max_active_runs=1, # One run at a time
dagrun_timeout=timedelta(hours=3), # DAG timeout
tags=['production', 'critical', 'daily'],
doc_md="""
# Daily ETL Pipeline
Runs at 2 AM daily to process previous day's data.
**SLA**: Must complete by 5 AM
**Owner**: Data Platform Team
**On-call**: #data-oncall
"""
) as dag:
def idempotent_processing(**context):
"""
BEST PRACTICE: Make tasks idempotent
Can be re-run safely without duplicating data
"""
execution_date = context['ds']
# Clear existing data for date
print(f"DELETE FROM results WHERE date = '{execution_date}'")
# Process and insert
print(f"INSERT INTO results SELECT * FROM source WHERE date = '{execution_date}'")
process = PythonOperator(
task_id='process',
python_callable=idempotent_processing,
sla=timedelta(hours=3) # Alert if not done by 5 AM
)
Summary
You now understand:
- Scheduling Model: execution_date vs actual run time
- Cron Expressions: All common patterns and custom schedules
- Catchup & Backfill: When to use and how to control
- SLAs: Setting expectations and alerting on breaches
- Timeouts: Task and DAG-level execution limits
- Concurrency: Controlling parallel execution
- Best Practices: Production-ready scheduling configuration
Key Takeaways:
- execution_date is the start of the data interval, not when the DAG runs
- Use catchup=False unless you need automatic backfilling
- Set SLAs for critical pipelines to catch slowdowns early
- Make tasks idempotent so they can be safely retried/backfilled
- Use pools to limit resource consumption
- Set timeouts to prevent hung tasks from blocking pipelines
Next Steps
Module 5: Sensors and Hooks - Waiting and Connecting
Master sensors for waiting, hooks for connections, and custom implementations