
Airflow Scheduling Mastery

Module Level: Core Foundation
Prerequisites: Modules 1-3 (Overview, Core Concepts, Operators)
Duration: 3-4 hours
Key Concepts: Cron, execution dates, catchup, backfill, SLAs, trigger rules

Understanding Airflow’s Scheduling Model

Airflow’s scheduling is based on a crucial concept: DAGs run at the END of the data interval, not at the beginning.

The Data Interval Concept

"""
KEY CONCEPT: execution_date represents the START of the data interval

Example: DAG with schedule='@daily' (runs at midnight)

Data Interval                execution_date        Actual Run Time      Processes Data For
-------------------------    -------------------   ------------------   ------------------
Jan 1 00:00 - Jan 2 00:00    2024-01-01 00:00:00   Jan 2, 00:00:00      January 1st data

Jan 2 00:00 - Jan 3 00:00    2024-01-02 00:00:00   Jan 3, 00:00:00      January 2nd data

WHY? Because you can't process Jan 1st data until Jan 1st is complete!
"""

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule='@daily',  # Runs daily at midnight
    start_date=datetime(2024, 1, 1),
    catchup=False
)
def scheduling_concept():

    @task
    def show_dates(**context):
        print(f"execution_date (data_interval_start): {context['data_interval_start']}")
        print(f"data_interval_end: {context['data_interval_end']}")
        print(f"logical_date: {context['logical_date']}")  # Same as execution_date in Airflow 2.2+
        print(f"Current time (when actually running): {datetime.now()}")

        """
        If this DAG runs on Jan 2 at 00:05:
        - data_interval_start: 2024-01-01 00:00:00
        - data_interval_end: 2024-01-02 00:00:00
        - logical_date: 2024-01-01 00:00:00
        - Current time: 2024-01-02 00:05:12

        Use {{ ds }} (execution_date as YYYY-MM-DD) in queries:
        SELECT * FROM orders WHERE order_date = '2024-01-01'
        """

    show_dates()

scheduling_concept()
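
To make the {{ ds }} templating above concrete, here is a minimal sketch of a task that renders the data interval into a shell command (the orders table is illustrative):

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG('templated_dates', schedule='@daily', start_date=datetime(2024, 1, 1), catchup=False) as dag:
    # Jinja templates are rendered per run, so the query always targets the
    # run's data interval, regardless of when the task actually executes
    show_query = BashOperator(
        task_id='show_query',
        bash_command=(
            "echo \"Interval: {{ data_interval_start }} -> {{ data_interval_end }}\" && "
            "echo \"SELECT * FROM orders WHERE order_date = '{{ ds }}'\""
        ),
    )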

Schedule Interval Formats

Cron Expressions

from airflow import DAG
from datetime import datetime

# ========== CRON FORMAT ==========
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12)
# │ │ │ │ ┌───────────── day of week (0 - 6) (Sunday=0)
# │ │ │ │ │
# * * * * *

# ========== COMMON SCHEDULES ==========

# Every minute
DAG('every_minute', schedule='* * * * *', start_date=datetime(2024, 1, 1))

# Every 5 minutes
DAG('every_5_min', schedule='*/5 * * * *', start_date=datetime(2024, 1, 1))

# Every hour at minute 0
DAG('hourly', schedule='0 * * * *', start_date=datetime(2024, 1, 1))
# OR use preset
DAG('hourly_preset', schedule='@hourly', start_date=datetime(2024, 1, 1))

# Daily at 2 AM
DAG('daily_2am', schedule='0 2 * * *', start_date=datetime(2024, 1, 1))

# Every weekday at 6 AM (Mon-Fri)
DAG('weekdays', schedule='0 6 * * 1-5', start_date=datetime(2024, 1, 1))

# Every Monday at 9 AM
DAG('monday', schedule='0 9 * * 1', start_date=datetime(2024, 1, 1))

# First day of month at midnight
DAG('monthly', schedule='0 0 1 * *', start_date=datetime(2024, 1, 1))

# Every Sunday at 3 AM
DAG('weekly_sunday', schedule='0 3 * * 0', start_date=datetime(2024, 1, 1))

# Every 4 hours
DAG('every_4_hours', schedule='0 */4 * * *', start_date=datetime(2024, 1, 1))

# Hourly (at minute 0) during business hours (9 AM - 5 PM, Mon-Fri)
DAG('business_hours', schedule='0 9-17 * * 1-5', start_date=datetime(2024, 1, 1))


# ========== PRESET SCHEDULES ==========

DAG('manual_only', schedule=None, start_date=datetime(2024, 1, 1))  # Manual trigger only
DAG('preset_once', schedule='@once', start_date=datetime(2024, 1, 1))  # Run a single time, then never again
DAG('preset_hourly', schedule='@hourly', start_date=datetime(2024, 1, 1))  # 0 * * * *
DAG('preset_daily', schedule='@daily', start_date=datetime(2024, 1, 1))  # 0 0 * * *
DAG('preset_weekly', schedule='@weekly', start_date=datetime(2024, 1, 1))  # 0 0 * * 0
DAG('preset_monthly', schedule='@monthly', start_date=datetime(2024, 1, 1))  # 0 0 1 * *
DAG('preset_yearly', schedule='@yearly', start_date=datetime(2024, 1, 1))  # 0 0 1 1 *


# ========== TIMEDELTA SCHEDULES ==========

from datetime import timedelta

# Every 30 minutes
DAG('every_30_min', schedule=timedelta(minutes=30), start_date=datetime(2024, 1, 1))

# Every 6 hours
DAG('every_6_hours', schedule=timedelta(hours=6), start_date=datetime(2024, 1, 1))

# Every 2 days
DAG('every_2_days', schedule=timedelta(days=2), start_date=datetime(2024, 1, 1))
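
# NOTE: timedelta schedules are anchored to start_date rather than aligned to
# clock boundaries the way cron is, so timedelta(days=1) with a noon start_date
# yields a run at noon every day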


# ========== COMPLEX CRON EXAMPLES ==========

# Every 15 minutes during business hours (9 AM - 5 PM, Mon-Fri)
DAG('business_15min', schedule='*/15 9-17 * * 1-5', start_date=datetime(2024, 1, 1))

# Twice daily: 8 AM and 8 PM
DAG('twice_daily', schedule='0 8,20 * * *', start_date=datetime(2024, 1, 1))

# Every quarter (Jan 1, Apr 1, Jul 1, Oct 1)
DAG('quarterly', schedule='0 0 1 1,4,7,10 *', start_date=datetime(2024, 1, 1))

# Days 28-31 at 11 PM - fires on every existing day from the 28th through the 31st,
# so a true "last day of month" run needs an extra check (see the sketch below)
DAG('month_end', schedule='0 23 28-31 * *', start_date=datetime(2024, 1, 1))
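
For a true last-day-of-month run, a common pattern is to keep the days-28-31 cron and gate the work with a ShortCircuitOperator; a minimal sketch (DAG and task names are illustrative):

from airflow import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from datetime import datetime, timedelta

def is_last_day_of_month(**context):
    # Interval-based schedules fire at data_interval_end, so the run executing
    # on the month's last day is the one where the next calendar day is the 1st
    return (context['data_interval_end'] + timedelta(days=1)).day == 1

with DAG('true_month_end', schedule='0 23 28-31 * *',
         start_date=datetime(2024, 1, 1), catchup=False) as dag:
    gate = ShortCircuitOperator(task_id='only_last_day', python_callable=is_last_day_of_month)
    report = PythonOperator(task_id='month_end_report', python_callable=lambda: print("Month-end report"))
    gate >> report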

Testing Cron Expressions

from airflow import DAG
from airflow.operators.python import PythonOperator
from croniter import croniter
from datetime import datetime

def test_cron_schedule(**context):
    """Test when a cron expression will run"""

    cron_expr = '0 2 * * 1-5'  # 2 AM weekdays
    start_date = datetime(2024, 1, 1)

    # Generate next 10 execution times
    cron = croniter(cron_expr, start_date)
    for i in range(10):
        next_run = cron.get_next(datetime)
        print(f"Run {i+1}: {next_run.strftime('%Y-%m-%d %H:%M %A')}")

    """
    Output:
    Run 1: 2024-01-01 02:00 Monday
    Run 2: 2024-01-02 02:00 Tuesday
    Run 3: 2024-01-03 02:00 Wednesday
    Run 4: 2024-01-04 02:00 Thursday
    Run 5: 2024-01-05 02:00 Friday
    Run 6: 2024-01-08 02:00 Monday
    ...
    """

with DAG('test_cron', schedule=None, start_date=datetime(2024, 1, 1)) as dag:
    test = PythonOperator(task_id='test', python_callable=test_cron_schedule)

Catchup and Backfill

Catchup Behavior

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# ========== CATCHUP=TRUE (default is governed by catchup_by_default in airflow.cfg) ==========

# If start_date is 30 days ago and catchup=True,
# Airflow creates 30 DAG runs immediately!

with DAG(
    'catchup_true',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),  # 30 days ago
    catchup=True,  # Run all missed intervals
    max_active_runs=3  # But only run 3 at a time
) as dag:

    def process_data(**context):
        execution_date = context['ds']
        print(f"Processing data for {execution_date}")

    process = PythonOperator(task_id='process', python_callable=process_data)

    """
    Behavior:
    - Creates DAG runs for Jan 1, Jan 2, Jan 3, ..., Jan 30
    - Runs up to 3 concurrently (max_active_runs=3)
    - Useful for: Historical data processing, backfilling
    - WARNING: Can overwhelm system if start_date is far in past
    """


# ========== CATCHUP=FALSE (Recommended for most cases) ==========

with DAG(
    'catchup_false',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False  # Only run for current interval
) as dag:

    process = PythonOperator(task_id='process', python_callable=process_data)

    """
    Behavior:
    - Only creates DAG run for current/latest interval
    - Skips all past intervals
    - Useful for: Most production pipelines, real-time processing
    - BEST PRACTICE: Use catchup=False unless you need backfill
    """


# ========== SELECTIVE CATCHUP ==========

with DAG(
    'selective_catchup',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,  # Don't auto-backfill
    tags=['backfill-capable']
) as dag:

    def process_data_idempotent(**context):
        """
        Idempotent processing - safe to run multiple times
        Can be manually backfilled if needed
        """
        execution_date = context['ds']

        # Delete existing data for this date (idempotency)
        delete_sql = f"DELETE FROM results WHERE date = '{execution_date}'"

        # Process and insert
        process_sql = f"""
        INSERT INTO results
        SELECT * FROM source_data WHERE date = '{execution_date}'
        """

        print(f"Processed {execution_date}")

    process = PythonOperator(task_id='process', python_callable=process_data_idempotent)

    """
    Best of both worlds:
    - catchup=False prevents automatic backfill
    - Idempotent logic allows manual backfill via CLI:
      airflow dags backfill selective_catchup -s 2024-01-01 -e 2024-01-31
    """

Manual Backfill via CLI

# ========== BACKFILL COMMANDS ==========

# Backfill specific date range
airflow dags backfill my_dag \
  --start-date 2024-01-01 \
  --end-date 2024-01-31

# Backfill with task subset
airflow dags backfill my_dag \
  --start-date 2024-01-01 \
  --end-date 2024-01-31 \
  --task-regex 'extract.*'  # Only tasks matching regex

# Dry run (don't actually execute)
airflow dags backfill my_dag \
  --start-date 2024-01-01 \
  --end-date 2024-01-31 \
  --dry-run

# Backfill with reset (clear existing runs first)
airflow dags backfill my_dag \
  --start-date 2024-01-01 \
  --end-date 2024-01-31 \
  --reset-dagruns

# Backfill specific run
airflow dags backfill my_dag \
  --start-date 2024-01-15 \
  --end-date 2024-01-15


# ========== PROGRAMMATIC BACKFILL ==========

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def trigger_backfill(**context):
    """Trigger backfill via API"""
    from airflow.api.client.local_client import Client

    client = Client(api_base_url=None, auth=None)

    # Trigger backfill for last 7 days
    end_date = datetime.now()
    start_date = end_date - timedelta(days=7)

    # This creates individual DAG runs
    current = start_date
    while current <= end_date:
        client.trigger_dag(
            dag_id='target_dag',
            run_id=f'backfill_{current.strftime("%Y%m%d")}',
            execution_date=current
        )
        current += timedelta(days=1)

with DAG('trigger_backfill_dag', schedule=None, start_date=datetime(2024, 1, 1)) as dag:
    backfill = PythonOperator(task_id='backfill', python_callable=trigger_backfill)
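
In newer Airflow releases the stable REST API or TriggerDagRunOperator is generally preferred over the local API client shown above; as an alternative sketch, the same per-date fan-out could look like this (the target DAG would read target_date from dag_run.conf; names are illustrative):

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime

with DAG('trigger_backfill_via_operator', schedule=None,
         start_date=datetime(2024, 1, 1), catchup=False) as dag:
    # One trigger task per date to re-run
    for day in range(1, 8):
        TriggerDagRunOperator(
            task_id=f'trigger_target_2024_01_{day:02d}',
            trigger_dag_id='target_dag',
            conf={'target_date': f'2024-01-{day:02d}'},
        )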

Depends on Past

Control whether tasks wait for previous runs to complete.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# ========== DEPENDS_ON_PAST=FALSE (Default) ==========

with DAG(
    'independent_runs',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=True
) as dag:

    task = PythonOperator(
        task_id='process',
        python_callable=lambda: print("Processing"),
        depends_on_past=False  # Each run is independent
    )

    """
    Behavior:
    - Jan 1 run can start immediately
    - Jan 2 run can start immediately (doesn't wait for Jan 1)
    - Jan 3 run can start immediately (doesn't wait for Jan 2)

    Use when: Data for each day is independent
    """


# ========== DEPENDS_ON_PAST=TRUE ==========

with DAG(
    'sequential_runs',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=True
) as dag:

    task = PythonOperator(
        task_id='process',
        python_callable=lambda: print("Processing"),
        depends_on_past=True  # Wait for previous run
    )

    """
    Behavior:
    - Jan 1 run starts immediately
    - Jan 2 run waits for Jan 1 to succeed
    - Jan 3 run waits for Jan 2 to succeed
    - Forms a sequential chain across runs

    Use when:
    - Each day's data depends on previous day
    - Incremental processing
    - Order matters
    """


# ========== WAIT_FOR_DOWNSTREAM ==========

with DAG(
    'wait_downstream',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=True
) as dag:

    extract = PythonOperator(
        task_id='extract',
        python_callable=lambda: print("Extract"),
        wait_for_downstream=True  # Also wait for the previous run's immediate downstream task ('transform')
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=lambda: print("Transform")
    )

    load = PythonOperator(
        task_id='load',
        python_callable=lambda: print("Load")
    )

    extract >> transform >> load

    """
    Behavior:
    - Jan 2's 'extract' waits for Jan 1's 'extract', 'transform', AND 'load'
    - Ensures complete pipeline before starting next

    Use when:
    - Need guarantee that entire previous pipeline completed
    - Downstream tasks modify same data
    """


# ========== PRACTICAL EXAMPLE: INCREMENTAL ETL ==========

with DAG(
    'incremental_etl',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=True
) as dag:

    def extract_incremental(**context):
        """Extract new records since last successful run"""
        execution_date = context['ds']

        # Get last successful run's end time
        # (prev_execution_date is None on the very first run)
        from airflow.models import DagRun

        prev_logical_date = context.get('prev_execution_date')
        last_run = DagRun.find(
            dag_id=context['dag'].dag_id,
            execution_date=prev_logical_date,
            state='success'
        ) if prev_logical_date else []

        if last_run:
            last_time = last_run[0].end_date
            sql = f"SELECT * FROM source WHERE created_at > '{last_time}'"
        else:
            # First run - get all data for date
            sql = f"SELECT * FROM source WHERE DATE(created_at) = '{execution_date}'"

        print(f"Query: {sql}")

    extract = PythonOperator(
        task_id='extract_incremental',
        python_callable=extract_incremental,
        depends_on_past=True  # Ensures sequential processing
    )

SLAs (Service Level Agreements)

Define expected task completion times and get alerted on breaches.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """
    Called when SLA is missed

    Args:
        dag: DAG object
        task_list: Newline-separated string of tasks that missed their SLA
        blocking_task_list: Newline-separated string of tasks blocking them
        slas: List of SlaMiss objects
        blocking_tis: Blocking task instances
    """
    print(f"SLA MISSED!")
    print(f"Tasks: {[t.task_id for t in task_list]}")

    for sla in slas:
        print(f"Task {sla.task_id} missed SLA by {sla.execution_date}")

    # Send alert to PagerDuty, Slack, etc.
    send_pagerduty_alert(f"SLA breach in {dag.dag_id}")


with DAG(
    'sla_example',
    schedule='@hourly',
    start_date=datetime(2024, 1, 1),
    sla_miss_callback=sla_miss_callback,  # DAG-level callback
    catchup=False
) as dag:

    # ========== TASK-LEVEL SLA ==========

    # Task must complete within 30 minutes of the scheduled run time (data interval end)
    quick_task = PythonOperator(
        task_id='quick_task',
        python_callable=lambda: print("Quick processing"),
        sla=timedelta(minutes=30)
    )

    # Task must complete within 2 hours of the scheduled run time
    long_task = PythonOperator(
        task_id='long_task',
        python_callable=lambda: print("Long processing"),
        sla=timedelta(hours=2)
    )
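
    # Note: 'sla' is set per task, but the miss callback itself is DAG-level
    # (sla_miss_callback above); there is no per-task sla_miss_callback parameter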


# ========== REAL-WORLD EXAMPLE: PRODUCTION SLAs ==========

with DAG(
    'production_pipeline_sla',
    schedule='0 2 * * *',  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    sla_miss_callback=sla_miss_callback,
    catchup=False,
    tags=['production', 'critical']
) as dag:

    def send_sla_alert(context):
        """Custom SLA alert"""
        from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

        task = context['task']
        execution_date = context['execution_date']

        SlackWebhookOperator(
            task_id='sla_alert',
            http_conn_id='slack_webhook',
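            # NOTE: parameter names vary across Slack provider versions; newer
            # releases use slack_webhook_conn_id in place of http_conn_id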
            message=f"""
            :warning: *SLA Breach*
            Task: `{task.task_id}`
            DAG: `{task.dag_id}`
            Execution: {execution_date}
            Expected completion: {task.sla}
            """,
            channel='#data-alerts'
        ).execute(context)

    # Critical: Must complete by 3 AM (1 hour after start)
    extract = PythonOperator(
        task_id='extract',
        python_callable=lambda: print("Extract"),
        sla=timedelta(hours=1),
        on_execute_callback=lambda ctx: print(f"Started at {datetime.now()}")
    )

    # Must complete by 4 AM (2 hours after start)
    transform = PythonOperator(
        task_id='transform',
        python_callable=lambda: print("Transform"),
        sla=timedelta(hours=2)
    )

    # Must complete by 5 AM (3 hours) - pipeline must be done before business hours.
    # SLA misses are handled by the DAG-level sla_miss_callback; the Slack alert
    # above is reused here for outright task failures
    load = PythonOperator(
        task_id='load',
        python_callable=lambda: print("Load"),
        sla=timedelta(hours=3),
        on_failure_callback=send_sla_alert
    )

    extract >> transform >> load


# ========== CHECKING SLA MISSES ==========

from airflow.providers.postgres.hooks.postgres import PostgresHook

def check_sla_misses(**context):
    """Query SLA misses from metadata DB"""

    hook = PostgresHook(postgres_conn_id='airflow_db')

    # Get recent SLA misses
    sla_misses = hook.get_pandas_df("""
        SELECT
            dag_id,
            task_id,
            execution_date,
            timestamp as sla_miss_time,
            email_sent,
            notification_sent
        FROM sla_miss
        WHERE timestamp >= NOW() - INTERVAL '7 days'
        ORDER BY timestamp DESC
    """)

    print(f"SLA misses in last 7 days: {len(sla_misses)}")
    print(sla_misses)

with DAG('check_slas', schedule=None, start_date=datetime(2024, 1, 1)) as dag:
    check = PythonOperator(task_id='check', python_callable=check_sla_misses)

DAG Run States and Timeouts

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import time

with DAG(
    'timeouts_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    dagrun_timeout=timedelta(hours=4),  # Kill entire DAG run after 4 hours
    catchup=False
) as dag:

    # ========== TASK-LEVEL TIMEOUT ==========

    def long_running_task():
        """Simulates long-running task"""
        print("Starting long task...")
        time.sleep(7200)  # 2 hours
        print("Completed")

    quick_timeout = PythonOperator(
        task_id='quick_timeout',
        python_callable=long_running_task,
        execution_timeout=timedelta(minutes=30)  # Kill after 30 min
        # Task will fail with AirflowTaskTimeout
    )

    # ========== RETRY CONFIGURATION ==========

    def flaky_task():
        """Might fail randomly"""
        import random
        if random.random() < 0.7:
            raise Exception("Temporary failure")
        return "Success"

    with_retries = PythonOperator(
        task_id='with_retries',
        python_callable=flaky_task,
        retries=5,
        retry_delay=timedelta(minutes=5),
        retry_exponential_backoff=True,  # Delay roughly doubles each retry (5, 10, 20, 40 min, ...)
        max_retry_delay=timedelta(hours=1)  # Cap backoff at 1 hour
    )

    # ========== POOL SLOTS FOR RESOURCE MANAGEMENT ==========

    # Create pool in UI: Admin → Pools → Add
    # Name: database_connections, Slots: 5

    db_task_1 = PythonOperator(
        task_id='db_task_1',
        python_callable=lambda: print("Using DB connection"),
        pool='database_connections',
        pool_slots=2  # Uses 2 of 5 available slots
    )

    db_task_2 = PythonOperator(
        task_id='db_task_2',
        python_callable=lambda: print("Using DB connection"),
        pool='database_connections',
        pool_slots=2
    )

    # Only 5 slots available, so if 3 tasks need 2 slots each,
    # third task waits for slot to free up
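
    # Pools can also be created from the CLI, e.g.:
    #   airflow pools set database_connections 5 "Shared DB connection pool"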


# ========== DAG-LEVEL CONCURRENCY ==========

with DAG(
    'concurrency_control',
    schedule='@hourly',
    start_date=datetime(2024, 1, 1),
    max_active_runs=3,  # Max 3 concurrent DAG runs
    max_active_tasks=10,  # Max 10 concurrent tasks across all runs
    catchup=True
) as dag:

    tasks = []
    for i in range(20):
        task = PythonOperator(
            task_id=f'task_{i}',
            python_callable=lambda: print(f"Task {i}")
        )
        tasks.append(task)

    """
    Behavior:
    - Only 3 DAG runs execute concurrently
    - Within those 3 runs, max 10 tasks run concurrently
    - Remaining tasks queue until slots available
    """

Timetables (Airflow 2.2+)

Advanced scheduling beyond cron and timedelta.

from airflow import DAG
from airflow.timetables.interval import CronDataIntervalTimetable
from airflow.operators.python import PythonOperator
from datetime import datetime

# ========== CUSTOM BUSINESS DAY SCHEDULE ==========

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from pendulum import DateTime
import pendulum

class BusinessDayTimetable(Timetable):
    """Run on business days only (Monday-Friday)"""

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        # For manually triggered runs, treat the preceding 24 hours as the interval
        return DataInterval(start=run_after.subtract(days=1), end=run_after)

    def next_dagrun_info(self, *, last_automated_data_interval, restriction: TimeRestriction):
        if last_automated_data_interval is not None:
            next_start = last_automated_data_interval.end
        else:
            if restriction.earliest is None:
                return None  # No start_date, nothing to schedule
            next_start = restriction.earliest
        if not restriction.catchup:
            # Without catchup, don't schedule intervals earlier than today
            next_start = max(next_start, pendulum.today("UTC"))

        # Skip weekends (isoweekday: Saturday=6, Sunday=7)
        while next_start.isoweekday() in (6, 7):
            next_start = next_start.add(days=1)

        return DagRunInfo.interval(start=next_start, end=next_start.add(days=1))


# Custom timetables must be registered through a plugin so the scheduler
# and webserver can deserialize them
class BusinessDayTimetablePlugin(AirflowPlugin):
    name = "business_day_timetable_plugin"
    timetables = [BusinessDayTimetable]


# Use custom timetable
with DAG(
    'business_days_only',
    timetable=BusinessDayTimetable(),
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:

    task = PythonOperator(
        task_id='business_task',
        python_callable=lambda: print("Running on business day")
    )


# ========== CRON WITH DATA INTERVAL ==========

with DAG(
    'custom_interval',
    timetable=CronDataIntervalTimetable(
        cron='0 2 * * *',  # Run at 2 AM
        timezone='America/New_York'
    ),
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:

    task = PythonOperator(
        task_id='task',
        python_callable=lambda **ctx: print(f"Data interval: {ctx['data_interval_start']} to {ctx['data_interval_end']}")
    )
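
For completeness, Airflow 2.3+ also ships CronTriggerTimetable, which triggers at the cron time with no shifted data interval (the logical date equals the trigger time); a brief sketch, assuming your Airflow version includes it:

from airflow.timetables.trigger import CronTriggerTimetable

with DAG(
    'cron_trigger_style',
    timetable=CronTriggerTimetable('0 2 * * *', timezone='America/New_York'),
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:

    task = PythonOperator(
        task_id='task',
        python_callable=lambda **ctx: print(f"Triggered at logical date {ctx['logical_date']}")
    )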

Best Practices

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# ========== BEST PRACTICE DAG ==========

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,  # Usually False for independence
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,  # Retry failed tasks
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'execution_timeout': timedelta(hours=1),  # Task timeout
}

with DAG(
    'best_practice_scheduling',
    default_args=default_args,
    description='Production-ready scheduling configuration',
    schedule='0 2 * * *',  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,  # Don't backfill by default
    max_active_runs=1,  # One run at a time
    dagrun_timeout=timedelta(hours=3),  # DAG timeout
    tags=['production', 'critical', 'daily'],
    doc_md="""
    # Daily ETL Pipeline

    Runs at 2 AM daily to process previous day's data.

    **SLA**: Must complete by 5 AM
    **Owner**: Data Platform Team
    **On-call**: #data-oncall
    """
) as dag:

    def idempotent_processing(**context):
        """
        BEST PRACTICE: Make tasks idempotent
        Can be re-run safely without duplicating data
        """
        execution_date = context['ds']

        # Clear existing data for date
        print(f"DELETE FROM results WHERE date = '{execution_date}'")

        # Process and insert
        print(f"INSERT INTO results SELECT * FROM source WHERE date = '{execution_date}'")

    process = PythonOperator(
        task_id='process',
        python_callable=idempotent_processing,
        sla=timedelta(hours=3)  # Alert if not done by 5 AM
    )

Summary

You now understand:
  • Scheduling Model: execution_date vs actual run time
  • Cron Expressions: All common patterns and custom schedules
  • Catchup & Backfill: When to use and how to control
  • SLAs: Setting expectations and alerting on breaches
  • Timeouts: Task and DAG-level execution limits
  • Concurrency: Controlling parallel execution
  • Best Practices: Production-ready scheduling configuration
Key Takeaways:
  1. execution_date is the start of the data interval, not when the DAG runs
  2. Use catchup=False unless you need automatic backfilling
  3. Set SLAs for critical pipelines to catch slowdowns early
  4. Make tasks idempotent so they can be safely retried/backfilled
  5. Use pools to limit resource consumption
  6. Set timeouts to prevent hung tasks from blocking pipelines

Next Steps

Module 5: Sensors and Hooks - Waiting and Connecting

Master sensors for waiting, hooks for connections, and custom implementations