
Airflow Operators: The Building Blocks

Module Level: Core Foundation
Prerequisites: Module 2 (Core Concepts)
Duration: 3-4 hours
Key Concepts: Built-in operators, custom operators, operator patterns, best practices

What are Operators?

Operators are the fundamental building blocks of Airflow tasks. They define what a task does - whether it’s running a Python function, executing a bash command, transferring data, or waiting for a condition.
"""
Operator Hierarchy:

BaseOperator (abstract)
├── BashOperator
├── PythonOperator
├── EmailOperator
├── SQLOperator
│   ├── PostgresOperator
│   ├── MySqlOperator
│   └── SnowflakeOperator
├── TransferOperator
│   ├── S3ToRedshiftOperator
│   └── PostgresToGCSOperator
└── Sensor (special operator type)
    ├── FileSensor
    ├── S3KeySensor
    └── HttpSensor
"""

Operator vs Task

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG('operator_vs_task', start_date=datetime(2024, 1, 1), schedule=None) as dag:

    # PythonOperator is the OPERATOR CLASS
    # my_task is the TASK INSTANCE (instantiated operator)

    my_task = PythonOperator(
        task_id='my_task',
        python_callable=lambda: print('Hello')
    )

    # The operator defines behavior
    # The task is the execution unit in the DAG
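
Because every operator inherits from BaseOperator, all operators accept the same core arguments: retries, retry_delay, owner, email_on_failure, and so on. A common pattern is to set these once via default_args so every task in the DAG inherits them; a minimal sketch:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'default_args_example',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    default_args=default_args
) as dag:

    # Both tasks inherit owner/retries/retry_delay from default_args
    step_1 = BashOperator(task_id='step_1', bash_command='echo "step 1"')
    step_2 = BashOperator(task_id='step_2', bash_command='echo "step 2"')

    step_1 >> step_2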

Core Operators

BashOperator: Execute Shell Commands

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

with DAG('bash_examples', start_date=datetime(2024, 1, 1), schedule=None) as dag:

    # ========== SIMPLE COMMAND ==========
    simple_bash = BashOperator(
        task_id='simple',
        bash_command='echo "Hello from Bash"'
    )

    # ========== MULTI-LINE SCRIPT ==========
    multi_line = BashOperator(
        task_id='multi_line',
        bash_command="""
        set -e  # Exit on error

        echo "Step 1: Creating directory"
        mkdir -p /tmp/airflow_test

        echo "Step 2: Writing file"
        echo "Data for {{ ds }}" > /tmp/airflow_test/data_{{ ds }}.txt

        echo "Step 3: Listing files"
        ls -la /tmp/airflow_test

        echo "Completed successfully"
        """
    )

    # ========== WITH ENVIRONMENT VARIABLES ==========
    with_env = BashOperator(
        task_id='with_env',
        bash_command='echo "Execution date: $EXEC_DATE, DAG: $DAG_ID"',
        env={
            'EXEC_DATE': '{{ ds }}',
            'DAG_ID': '{{ dag.dag_id }}',
            'CUSTOM_VAR': 'my_value'
        }
    )

    # ========== RUNNING EXTERNAL SCRIPT ==========
    run_script = BashOperator(
        task_id='run_script',
        bash_command='/opt/scripts/process_data.sh {{ ds }} {{ execution_date.hour }}',
        cwd='/opt/scripts'  # Working directory
    )

    # ========== WITH RETURN VALUE ==========
    # BashOperator returns last line of stdout
    get_value = BashOperator(
        task_id='get_value',
        bash_command='echo "result: 42"',
        do_xcom_push=True  # Push to XCom
    )

    use_value = BashOperator(
        task_id='use_value',
        bash_command='echo "Previous result: {{ ti.xcom_pull(task_ids=\"get_value\") }}"'
    )

    # ========== ERROR HANDLING ==========
    with_retry = BashOperator(
        task_id='with_retry',
        bash_command="""
        # Script that might fail
        if [ $(( RANDOM % 2 )) -eq 0 ]; then
            echo "Success!"
            exit 0
        else
            echo "Failed!"
            exit 1
        fi
        """,
        retries=3,
        retry_delay=timedelta(seconds=30)
    )

    # ========== CONDITIONAL EXECUTION ==========
    conditional = BashOperator(
        task_id='conditional',
        bash_command="""
        if [ -f /tmp/data_ready.flag ]; then
            echo "Data is ready, processing..."
            python /scripts/process.py
        else
            echo "Data not ready, skipping..."
            exit 0
        fi
        """
    )

    get_value >> use_value


# ========== REAL-WORLD EXAMPLE: ETL WITH BASH ==========

with DAG('etl_bash', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    extract = BashOperator(
        task_id='extract_data',
        bash_command="""
        # Extract data from PostgreSQL
        psql -h $DB_HOST -U $DB_USER -d $DB_NAME -c \
        "COPY (SELECT * FROM orders WHERE date = '{{ ds }}') \
         TO '/tmp/orders_{{ ds }}.csv' WITH CSV HEADER"
        """,
        env={
            'DB_HOST': 'prod-db.company.com',
            'DB_USER': 'airflow',
            'DB_NAME': 'production',
            'PGPASSWORD': '{{ var.value.db_password }}'
        }
    )

    compress = BashOperator(
        task_id='compress_data',
        bash_command='gzip /tmp/orders_{{ ds }}.csv'
    )

    upload_s3 = BashOperator(
        task_id='upload_to_s3',
        bash_command="""
        aws s3 cp /tmp/orders_{{ ds }}.csv.gz \
        s3://data-lake/raw/orders/{{ ds }}/orders.csv.gz
        """,
        env={'AWS_PROFILE': 'production'}
    )

    cleanup = BashOperator(
        task_id='cleanup',
        bash_command='rm -f /tmp/orders_{{ ds }}.csv.gz',
        trigger_rule='all_done'  # Run even if upstream fails
    )

    extract >> compress >> upload_s3 >> cleanup

PythonOperator: Execute Python Functions

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import json

with DAG('python_examples', start_date=datetime(2024, 1, 1), schedule=None) as dag:

    # ========== SIMPLE FUNCTION ==========
    def simple_function():
        print("Hello from Python!")
        return "Task completed"

    simple_task = PythonOperator(
        task_id='simple',
        python_callable=simple_function
    )

    # ========== FUNCTION WITH ARGUMENTS ==========
    def process_data(input_file, output_format, multiplier=1):
        print(f"Processing {input_file}")
        print(f"Output format: {output_format}")
        print(f"Multiplier: {multiplier}")
        return {"status": "success", "records": 100}

    with_args = PythonOperator(
        task_id='with_args',
        python_callable=process_data,
        op_kwargs={
            'input_file': '/data/input.csv',
            'output_format': 'parquet',
            'multiplier': 2
        }
    )

    # ========== FUNCTION WITH CONTEXT ==========
    def use_context(**context):
        """Access Airflow context variables"""
        execution_date = context['execution_date']
        dag_id = context['dag'].dag_id
        task_id = context['task'].task_id
        ti = context['ti']  # Task instance

        print(f"Running {dag_id}.{task_id}")
        print(f"Execution date: {execution_date}")

        # Pull XCom from previous task
        previous_result = ti.xcom_pull(task_ids='previous_task')
        print(f"Previous result: {previous_result}")

        # Push to XCom
        ti.xcom_push(key='my_key', value={'data': [1, 2, 3]})

        return "Done"

    context_task = PythonOperator(
        task_id='with_context',
        python_callable=use_context
    )

    # ========== FUNCTION WITH MIXED ARGS AND CONTEXT ==========
    def mixed_function(param1, param2, **context):
        execution_date = context['ds']
        print(f"Params: {param1}, {param2}")
        print(f"Date: {execution_date}")

    mixed_task = PythonOperator(
        task_id='mixed',
        python_callable=mixed_function,
        op_kwargs={'param1': 'value1', 'param2': 'value2'}
    )

    # ========== LAMBDA FUNCTION ==========
    lambda_task = PythonOperator(
        task_id='lambda',
        python_callable=lambda: print("Quick task")
    )

    # ========== RETURN VALUE FOR XCOM ==========
    def return_data():
        return {
            'records': 1000,
            'timestamp': datetime.now().isoformat(),
            'status': 'completed'
        }

    return_task = PythonOperator(
        task_id='return_data',
        python_callable=return_data,
        do_xcom_push=True  # Default is True
    )


# ========== REAL-WORLD EXAMPLE: DATA PROCESSING ==========

with DAG('data_processing_python', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    def extract_from_api(**context):
        """Extract data from REST API"""
        import requests

        execution_date = context['ds']
        response = requests.get(
            'https://api.example.com/data',
            params={
                'date': execution_date,
                'api_key': context['var']['value']['api_key']
            },
            timeout=30
        )
        response.raise_for_status()

        data = response.json()
        print(f"Extracted {len(data['records'])} records")

        # Save to file
        with open(f'/tmp/raw_data_{execution_date}.json', 'w') as f:
            json.dump(data, f)

        return len(data['records'])

    def transform_data(**context):
        """Transform and clean data"""
        import pandas as pd

        execution_date = context['ds']
        record_count = context['ti'].xcom_pull(task_ids='extract')

        # Read raw data
        with open(f'/tmp/raw_data_{execution_date}.json', 'r') as f:
            data = json.load(f)

        # Transform with pandas
        df = pd.DataFrame(data['records'])

        # Data cleaning
        df = df.dropna(subset=['id', 'amount'])
        df['amount'] = df['amount'].astype(float)
        df['processed_at'] = datetime.now()

        # Save processed data
        df.to_parquet(f'/tmp/processed_data_{execution_date}.parquet')

        print(f"Transformed {len(df)} records (from {record_count})")
        return len(df)

    def validate_data(**context):
        """Data quality checks"""
        import pandas as pd

        execution_date = context['ds']
        df = pd.read_parquet(f'/tmp/processed_data_{execution_date}.parquet')

        # Validation rules
        assert len(df) > 0, "No data to process"
        assert df['amount'].min() >= 0, "Negative amounts found"
        assert df['id'].is_unique, "Duplicate IDs found"

        print(f"Validation passed: {len(df)} records")
        return True

    def load_to_database(**context):
        """Load to PostgreSQL"""
        import pandas as pd
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        execution_date = context['ds']
        df = pd.read_parquet(f'/tmp/processed_data_{execution_date}.parquet')

        hook = PostgresHook(postgres_conn_id='warehouse')
        engine = hook.get_sqlalchemy_engine()

        df.to_sql(
            'processed_data',
            engine,
            if_exists='append',
            index=False,
            method='multi',
            chunksize=1000
        )

        print(f"Loaded {len(df)} records to database")

    extract = PythonOperator(task_id='extract', python_callable=extract_from_api)
    transform = PythonOperator(task_id='transform', python_callable=transform_data)
    validate = PythonOperator(task_id='validate', python_callable=validate_data)
    load = PythonOperator(task_id='load', python_callable=load_to_database)

    extract >> transform >> validate >> load

EmailOperator: Send Notifications

from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG('email_examples', start_date=datetime(2024, 1, 1), schedule=None) as dag:

    # ========== SIMPLE EMAIL ==========
    simple_email = EmailOperator(
        task_id='simple_email',
        to='team@example.com',
        subject='Airflow Notification',
        html_content='<p>Task completed successfully!</p>'
    )

    # ========== EMAIL WITH TEMPLATES ==========
    templated_email = EmailOperator(
        task_id='templated_email',
        to=['team@example.com', 'manager@example.com'],
        cc=['data-alerts@example.com'],
        subject='Daily Report for {{ ds }}',
        html_content="""
        <h2>Daily ETL Report</h2>
        <p><strong>Execution Date:</strong> {{ ds }}</p>
        <p><strong>DAG:</strong> {{ dag.dag_id }}</p>
        <p><strong>Status:</strong> Completed</p>

        <h3>Summary</h3>
        <ul>
            <li>Records processed: {{ ti.xcom_pull(task_ids='process_data') }}</li>
            <li>Duration: {{ ti.duration }}s</li>
        </ul>
        """
    )

    # ========== EMAIL WITH ATTACHMENTS ==========
    email_with_attachment = EmailOperator(
        task_id='with_attachment',
        to='reports@example.com',
        subject='Daily Report with CSV',
        html_content='Please find attached the daily report.',
        files=['/tmp/report_{{ ds }}.csv']
    )

    # ========== CONDITIONAL EMAIL ON FAILURE ==========
    def task_that_might_fail():
        import random
        if random.random() < 0.5:
            raise Exception("Random failure!")

    risky_task = PythonOperator(
        task_id='risky_task',
        python_callable=task_that_might_fail,
        email_on_failure=True,
        email=['oncall@example.com']
    )

    # ========== EMAIL IN CALLBACK ==========
    def send_custom_email(context):
        """Custom email on failure"""
        task_instance = context['task_instance']
        exception = context.get('exception')

        EmailOperator(
            task_id='failure_email',
            to='oncall@example.com',
            subject=f'ALERT: Task Failed - {task_instance.task_id}',
            html_content=f"""
            <h2>Task Failure Alert</h2>
            <p><strong>Task:</strong> {task_instance.task_id}</p>
            <p><strong>DAG:</strong> {task_instance.dag_id}</p>
            <p><strong>Execution Date:</strong> {task_instance.execution_date}</p>
            <p><strong>Error:</strong> {exception}</p>
            <p><strong>Log:</strong> <a href="{task_instance.log_url}">View Logs</a></p>
            """
        ).execute(context)

    task_with_callback = PythonOperator(
        task_id='task_with_callback',
        python_callable=task_that_might_fail,
        on_failure_callback=send_custom_email
    )
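
Building an EmailOperator inside a callback works, but a lighter option is Airflow's send_email utility, which uses the configured email backend (SMTP by default). A sketch of the same failure callback (the address is a placeholder):

from airflow.utils.email import send_email

def notify_failure(context):
    """Failure callback using the send_email utility instead of EmailOperator."""
    ti = context['task_instance']
    send_email(
        to='oncall@example.com',
        subject=f'ALERT: {ti.dag_id}.{ti.task_id} failed',
        html_content=f'<p>View logs: <a href="{ti.log_url}">{ti.log_url}</a></p>'
    )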

SQL Operators: Database Operations

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.python import PythonOperator
from datetime import datetime

# ========== PostgreSQL ==========

with DAG('postgres_examples', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='my_postgres',
        sql="""
        CREATE TABLE IF NOT EXISTS daily_sales (
            id SERIAL PRIMARY KEY,
            date DATE NOT NULL,
            product VARCHAR(100),
            amount DECIMAL(10, 2),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """
    )

    insert_data = PostgresOperator(
        task_id='insert_data',
        postgres_conn_id='my_postgres',
        sql="""
        INSERT INTO daily_sales (date, product, amount)
        SELECT
            '{{ ds }}'::date,
            product_name,
            SUM(sale_amount)
        FROM raw_sales
        WHERE sale_date = '{{ ds }}'
        GROUP BY product_name;
        """
    )

    # SQL from file (.sql files are rendered with Jinja, so the file itself
    # can reference {{ ds }} directly; params are for static values only)
    run_sql_file = PostgresOperator(
        task_id='run_sql_file',
        postgres_conn_id='my_postgres',
        sql='/opt/airflow/sql/daily_aggregation.sql',
        params={'schema': 'analytics'}  # available in the file as {{ params.schema }}
    )

    create_table >> insert_data >> run_sql_file


# ========== DYNAMIC SQL GENERATION ==========

with DAG('dynamic_sql', start_date=datetime(2024, 1, 1), schedule=None) as dag:

    def generate_insert_queries(**context):
        """Generate and run SQL for each table.

        Note: operators executed manually with .execute() skip Jinja
        rendering, so context values are substituted directly here.
        """
        tables = ['customers', 'orders', 'products']
        ds = context['ds']

        for table in tables:
            PostgresOperator(
                task_id=f'truncate_{table}',
                postgres_conn_id='my_postgres',
                sql=f'TRUNCATE TABLE staging.{table};'
            ).execute(context)

            PostgresOperator(
                task_id=f'load_{table}',
                postgres_conn_id='my_postgres',
                sql=f"""
                INSERT INTO staging.{table}
                SELECT * FROM source.{table}
                WHERE updated_at >= '{ds}'::date;
                """
            ).execute(context)

    dynamic_sql = PythonOperator(
        task_id='dynamic_sql',
        python_callable=generate_insert_queries
    )


# ========== SQL CHECK OPERATORS ==========

from airflow.providers.common.sql.operators.sql import (
    SQLCheckOperator,
    SQLValueCheckOperator,
    SQLThresholdCheckOperator
)

with DAG('sql_checks', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    # Check that query returns True
    check_data_exists = SQLCheckOperator(
        task_id='check_data_exists',
        conn_id='my_postgres',
        sql="""
        SELECT COUNT(*) > 0
        FROM orders
        WHERE order_date = '{{ ds }}'
        """
    )

    # Check specific value
    check_record_count = SQLValueCheckOperator(
        task_id='check_record_count',
        conn_id='my_postgres',
        sql="SELECT COUNT(*) FROM orders WHERE order_date = '{{ ds }}'",
        pass_value=1000,  # Expected value
        tolerance=0.1  # 10% tolerance
    )

    # Check threshold
    check_revenue = SQLThresholdCheckOperator(
        task_id='check_revenue',
        conn_id='my_postgres',
        sql="SELECT SUM(amount) FROM orders WHERE order_date = '{{ ds }}'",
        min_threshold=10000,  # Minimum expected
        max_threshold=1000000  # Maximum expected
    )

    [check_data_exists, check_record_count, check_revenue]  # independent checks, run in parallel

Transfer Operators: Moving Data

from airflow import DAG
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
from datetime import datetime

with DAG('transfer_examples', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    # ========== S3 to Redshift ==========
    s3_to_redshift = S3ToRedshiftOperator(
        task_id='s3_to_redshift',
        s3_bucket='data-lake',
        s3_key='processed/sales/{{ ds }}/sales.parquet',
        redshift_conn_id='redshift',
        schema='analytics',
        table='sales',
        copy_options=['PARQUET', 'COMPUPDATE OFF'],
        method='REPLACE'  # or 'APPEND', 'UPSERT'
    )

    # ========== GCS to BigQuery ==========
    gcs_to_bigquery = GCSToBigQueryOperator(
        task_id='gcs_to_bigquery',
        bucket='data-bucket',
        source_objects=['data/{{ ds }}/*.parquet'],
        destination_project_dataset_table='project.dataset.table',
        source_format='PARQUET',
        write_disposition='WRITE_APPEND',
        create_disposition='CREATE_IF_NEEDED',
        autodetect=True
    )

    # ========== Postgres to GCS ==========
    postgres_to_gcs = PostgresToGCSOperator(
        task_id='postgres_to_gcs',
        postgres_conn_id='my_postgres',
        sql="SELECT * FROM orders WHERE date = '{{ ds }}'",
        bucket='data-bucket',
        filename='exports/orders/{{ ds }}/orders.json',
        export_format='json',
        gzip=True
    )

Custom Operators

Create reusable operators for common patterns.

Creating a Custom Operator

from airflow import DAG
from airflow.models import BaseOperator
from datetime import datetime
from typing import Any, Dict, Optional
import requests

class SlackNotificationOperator(BaseOperator):
    """
    Custom operator to send Slack notifications

    :param slack_webhook_url: Slack webhook URL
    :param message: Message to send
    :param channel: Slack channel (optional)
    :param username: Bot username (optional)
    """

    # Define UI color
    ui_color = '#1F77B4'
    ui_fgcolor = '#FFFFFF'

    # Define template fields (can use Jinja templates)
    template_fields = ('message', 'channel')

    def __init__(
        self,
        slack_webhook_url: str,
        message: str,
        channel: Optional[str] = None,
        username: str = 'Airflow Bot',
        icon_emoji: str = ':robot_face:',
        **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.slack_webhook_url = slack_webhook_url
        self.message = message
        self.channel = channel
        self.username = username
        self.icon_emoji = icon_emoji

    def execute(self, context: Dict[str, Any]) -> None:
        """
        Main execution method
        Called by Airflow when task runs
        """
        payload = {
            'text': self.message,
            'username': self.username,
            'icon_emoji': self.icon_emoji
        }

        if self.channel:
            payload['channel'] = self.channel

        # Log execution
        self.log.info(f"Sending Slack notification to {self.channel or 'default channel'}")
        self.log.info(f"Message: {self.message}")

        # Send request
        response = requests.post(
            self.slack_webhook_url,
            json=payload,
            timeout=10
        )
        response.raise_for_status()

        self.log.info("Slack notification sent successfully")


# Usage
with DAG('custom_operator', start_date=datetime(2024, 1, 1), schedule=None) as dag:

    notify = SlackNotificationOperator(
        task_id='notify_slack',
        slack_webhook_url='https://hooks.slack.com/services/YOUR/WEBHOOK/URL',
        message='DAG {{ dag.dag_id }} completed for {{ ds }}',
        channel='#data-alerts'
    )

Advanced Custom Operator with Hook

from airflow import DAG
from airflow.models import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime

class PostgresToS3Operator(BaseOperator):
    """
    Transfer data from Postgres to S3 as Parquet

    :param postgres_conn_id: Postgres connection ID
    :param sql: SQL query to extract data
    :param s3_bucket: Target S3 bucket
    :param s3_key: Target S3 key (supports templating)
    :param chunk_size: Number of rows per chunk
    """

    template_fields = ('sql', 's3_key')
    template_ext = ('.sql',)
    ui_color = '#ededed'

    def __init__(
        self,
        *,
        postgres_conn_id: str,
        sql: str,
        s3_bucket: str,
        s3_key: str,
        s3_conn_id: str = 'aws_default',
        chunk_size: int = 10000,
        **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.postgres_conn_id = postgres_conn_id
        self.sql = sql
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.s3_conn_id = s3_conn_id
        self.chunk_size = chunk_size

    def execute(self, context: dict) -> str:
        """Extract from Postgres and load to S3"""

        self.log.info(f"Extracting data from Postgres using query: {self.sql}")

        # Use Postgres Hook
        postgres_hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        df = postgres_hook.get_pandas_df(sql=self.sql)

        self.log.info(f"Extracted {len(df)} rows")

        # Convert to Parquet
        import io
        buffer = io.BytesIO()
        df.to_parquet(buffer, index=False, compression='snappy')
        buffer.seek(0)

        # Upload to S3
        s3_hook = S3Hook(aws_conn_id=self.s3_conn_id)
        s3_hook.load_bytes(
            bytes_data=buffer.getvalue(),
            key=self.s3_key,
            bucket_name=self.s3_bucket,
            replace=True
        )

        s3_path = f"s3://{self.s3_bucket}/{self.s3_key}"
        self.log.info(f"Uploaded data to {s3_path}")

        return s3_path


# Usage
with DAG('postgres_to_s3', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    export_sales = PostgresToS3Operator(
        task_id='export_sales',
        postgres_conn_id='prod_db',
        sql="""
        SELECT *
        FROM sales
        WHERE sale_date = '{{ ds }}'
        """,
        s3_bucket='data-lake',
        s3_key='exports/sales/{{ ds }}/sales.parquet'
    )

Provider Packages

Airflow providers offer hundreds of pre-built operators for popular services.

Installing Providers

# Install specific provider
pip install apache-airflow-providers-amazon
pip install apache-airflow-providers-google
pip install apache-airflow-providers-snowflake
pip install apache-airflow-providers-apache-spark

# Install multiple providers
pip install apache-airflow-providers-amazon \
            apache-airflow-providers-google \
            apache-airflow-providers-postgres

AWS Providers

from airflow import DAG
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.providers.amazon.aws.operators.emr import (
    EmrCreateJobFlowOperator,
    EmrAddStepsOperator,
    EmrTerminateJobFlowOperator
)
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datetime import datetime

with DAG('aws_example', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    # Wait for file in S3
    wait_for_file = S3KeySensor(
        task_id='wait_for_file',
        bucket_name='input-bucket',
        bucket_key='data/{{ ds }}/input.csv',
        aws_conn_id='aws_default',
        timeout=3600,  # 1 hour
        poke_interval=60  # Check every minute
    )

    # Create EMR cluster
    create_emr = EmrCreateJobFlowOperator(
        task_id='create_emr',
        job_flow_overrides={
            'Name': 'airflow-emr-{{ ds }}',
            'ReleaseLabel': 'emr-6.9.0',
            'Applications': [{'Name': 'Spark'}],
            'Instances': {
                'InstanceGroups': [
                    {
                        'Name': 'Master',
                        'Market': 'SPOT',
                        'InstanceRole': 'MASTER',
                        'InstanceType': 'm5.xlarge',
                        'InstanceCount': 1
                    },
                    {
                        'Name': 'Worker',
                        'Market': 'SPOT',
                        'InstanceRole': 'CORE',
                        'InstanceType': 'm5.xlarge',
                        'InstanceCount': 2
                    }
                ],
                'KeepJobFlowAliveWhenNoSteps': True,
                'TerminationProtected': False
            }
        }
    )

    # Add Spark job
    add_steps = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr', key='return_value') }}",
        steps=[
            {
                'Name': 'Process Data',
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': [
                        'spark-submit',
                        '--deploy-mode', 'cluster',
                        's3://scripts/process_data.py',
                        '--input', 's3://input-bucket/data/{{ ds }}/',
                        '--output', 's3://output-bucket/processed/{{ ds }}/'
                    ]
                }
            }
        ]
    )

    # Terminate cluster
    terminate_emr = EmrTerminateJobFlowOperator(
        task_id='terminate_emr',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr', key='return_value') }}"
    )

    wait_for_file >> create_emr >> add_steps >> terminate_emr

Google Cloud Providers

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyTableOperator,
    BigQueryInsertJobOperator
)
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocSubmitJobOperator,
    DataprocDeleteClusterOperator
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from datetime import datetime

with DAG('gcp_example', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    PROJECT_ID = 'my-project'
    REGION = 'us-central1'
    CLUSTER_NAME = 'airflow-cluster-{{ ds_nodash }}'

    # Create Dataproc cluster
    create_cluster = DataprocCreateClusterOperator(
        task_id='create_cluster',
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        cluster_config={
            'master_config': {
                'num_instances': 1,
                'machine_type_uri': 'n1-standard-2'
            },
            'worker_config': {
                'num_instances': 2,
                'machine_type_uri': 'n1-standard-2'
            }
        }
    )

    # Submit PySpark job
    pyspark_job = DataprocSubmitJobOperator(
        task_id='pyspark_job',
        project_id=PROJECT_ID,
        region=REGION,
        job={
            'reference': {'project_id': PROJECT_ID},
            'placement': {'cluster_name': CLUSTER_NAME},
            'pyspark_job': {
                'main_python_file_uri': 'gs://bucket/scripts/process.py',
                'args': ['--date', '{{ ds }}']
            }
        }
    )

    # Load to BigQuery
    load_bigquery = GCSToBigQueryOperator(
        task_id='load_bigquery',
        bucket='data-bucket',
        source_objects=['processed/{{ ds }}/*.parquet'],
        destination_project_dataset_table=f'{PROJECT_ID}.analytics.sales',
        source_format='PARQUET',
        write_disposition='WRITE_TRUNCATE'
    )

    # Delete cluster
    delete_cluster = DataprocDeleteClusterOperator(
        task_id='delete_cluster',
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        trigger_rule='all_done'
    )

    create_cluster >> pyspark_job >> load_bigquery >> delete_cluster

Snowflake Provider

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator
from datetime import datetime

with DAG('snowflake_example', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    # Create table
    create_table = SnowflakeOperator(
        task_id='create_table',
        snowflake_conn_id='snowflake_default',
        sql="""
        CREATE TABLE IF NOT EXISTS SALES (
            sale_id NUMBER,
            sale_date DATE,
            amount NUMBER(10,2),
            product VARCHAR(100)
        );
        """
    )

    # Load from S3
    load_data = S3ToSnowflakeOperator(
        task_id='load_data',
        snowflake_conn_id='snowflake_default',
        s3_keys=['data/{{ ds }}/sales.csv'],  # keys are relative to the stage's S3 location
        table='SALES',
        stage='MY_STAGE',
        file_format='(TYPE=CSV, SKIP_HEADER=1)'
    )

    # Run analytics query
    analyze = SnowflakeOperator(
        task_id='analyze',
        snowflake_conn_id='snowflake_default',
        sql="""
        INSERT INTO DAILY_SUMMARY
        SELECT
            sale_date,
            COUNT(*) as num_sales,
            SUM(amount) as total_revenue,
            AVG(amount) as avg_sale
        FROM SALES
        WHERE sale_date = '{{ ds }}'
        GROUP BY sale_date;
        """
    )

    create_table >> load_data >> analyze

Operator Best Practices

1. Idempotency

# BAD: Not idempotent - running twice creates duplicates
bad_insert = PostgresOperator(
    task_id='bad_insert',
    sql="INSERT INTO sales SELECT * FROM staging_sales WHERE date = '{{ ds }}'"
)

# GOOD: Idempotent - can run multiple times safely
good_insert = PostgresOperator(
    task_id='good_insert',
    sql="""
    DELETE FROM sales WHERE date = '{{ ds }}';
    INSERT INTO sales SELECT * FROM staging_sales WHERE date = '{{ ds }}';
    """
)

# BETTER: Upsert pattern
upsert = PostgresOperator(
    task_id='upsert',
    sql="""
    INSERT INTO sales (id, date, amount)
    SELECT id, date, amount FROM staging_sales WHERE date = '{{ ds }}'
    ON CONFLICT (id) DO UPDATE SET
        amount = EXCLUDED.amount,
        updated_at = CURRENT_TIMESTAMP;
    """
)

2. Resource Management

# Use pools to limit concurrent resource usage
heavy_task = PythonOperator(
    task_id='heavy_task',
    python_callable=process_large_file,
    pool='database_connections',  # create in the UI (Admin → Pools) or with "airflow pools set"
    pool_slots=2  # Uses 2 of the pool's slots
)

3. Error Handling

def robust_function(**context):
    """Function with proper error handling"""
    try:
        # Main logic
        result = process_data()

        # Validate result
        if result is None:
            raise ValueError("Process returned no data")

        return result

    except Exception as e:
        # Log error details
        context['task_instance'].log.error(f"Error processing data: {str(e)}")

        # Optionally send alert
        send_alert(f"Task failed: {str(e)}")

        # Re-raise to mark task as failed
        raise

robust_task = PythonOperator(
    task_id='robust',
    python_callable=robust_function,
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True
)

4. Testing Operators

# test_operators.py
import pytest
from airflow.models import DagBag
from datetime import datetime

def test_dag_loaded():
    """Test that DAG loads without errors"""
    dagbag = DagBag(dag_folder='dags/', include_examples=False)
    assert len(dagbag.import_errors) == 0, "DAG import errors"

def test_task_count():
    """Test expected number of tasks"""
    dagbag = DagBag(dag_folder='dags/')
    dag = dagbag.get_dag('my_pipeline')
    assert len(dag.tasks) == 5

def test_task_dependencies():
    """Test task dependency structure"""
    dagbag = DagBag(dag_folder='dags/')
    dag = dagbag.get_dag('my_pipeline')

    extract = dag.get_task('extract')
    transform = dag.get_task('transform')

    assert transform in extract.downstream_list
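
The tests above verify DAG structure; you can also unit test a custom operator's execute() method directly by mocking its external calls. A sketch for the SlackNotificationOperator defined earlier, assuming the class lives in a module named slack_operator (adjust the import and patch target to your layout):

# test_slack_operator.py
from unittest import mock

from slack_operator import SlackNotificationOperator  # hypothetical module path

def test_slack_operator_sends_payload():
    operator = SlackNotificationOperator(
        task_id='notify',
        slack_webhook_url='https://hooks.slack.com/services/TEST/TEST/TEST',
        message='hello',
        channel='#data-alerts'
    )

    # Patch requests.post as imported inside the operator's module
    with mock.patch('slack_operator.requests.post') as mock_post:
        operator.execute(context={})

    payload = mock_post.call_args.kwargs['json']
    assert payload['text'] == 'hello'
    assert payload['channel'] == '#data-alerts'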

Summary

You now understand:
  • Core Operators: BashOperator, PythonOperator, EmailOperator, SQL operators
  • Transfer Operators: Moving data between systems
  • Custom Operators: Creating reusable operator classes
  • Provider Packages: AWS, GCP, Snowflake, and 100+ integrations
  • Best Practices: Idempotency, resource management, error handling
Key Takeaways:
  1. Choose the right operator for the job - don’t force Python for everything
  2. Use provider packages instead of reinventing the wheel
  3. Make operators idempotent - safe to retry
  4. Implement proper error handling and logging
  5. Create custom operators for common patterns in your organization

Next Steps

Module 4: Scheduling - Cron, Catchup, and Trigger Rules

Master cron expressions, execution_date, backfilling, and SLAs