Airflow Operators: The Building Blocks
Module Level: Core Foundation
Prerequisites: Module 2 (Core Concepts)
Duration: 3-4 hours
Key Concepts: Built-in operators, custom operators, operator patterns, best practices
What are Operators?
Operators are the fundamental building blocks of Airflow tasks. They define what a task does, whether that's running a Python function, executing a bash command, transferring data, or waiting for a condition.
"""
Operator Hierarchy:
BaseOperator (abstract)
├── BashOperator
├── PythonOperator
├── EmailOperator
├── SQLOperator
│ ├── PostgresOperator
│ ├── MySqlOperator
│ └── SnowflakeOperator
├── TransferOperator
│ ├── S3ToRedshiftOperator
│ └── PostgresToGcsOperator
└── Sensor (special operator type)
├── FileSensor
├── S3KeySensor
└── HttpSensor
"""
Operator vs Task
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG('operator_vs_task', start_date=datetime(2024, 1, 1), schedule=None) as dag:

    # PythonOperator is the OPERATOR CLASS
    # my_task is the TASK (an instantiated operator);
    # a TaskInstance is that task for one specific DAG run
    my_task = PythonOperator(
        task_id='my_task',
        python_callable=lambda: print('Hello')
    )

    # The operator defines behavior
    # The task is the execution unit in the DAG
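Since Airflow 2.0, the TaskFlow API offers a decorator-based alternative to instantiating PythonOperator yourself: a function decorated with @task becomes a task when called inside a DAG. A minimal sketch for comparison (the DAG and function names here are illustrative):

from airflow.decorators import dag, task
from datetime import datetime


@dag(dag_id='taskflow_equivalent', start_date=datetime(2024, 1, 1), schedule=None)
def taskflow_equivalent():

    @task
    def say_hello():
        # The decorated function becomes a PythonOperator-backed task
        print('Hello')

    say_hello()


taskflow_equivalent()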
Core Operators
BashOperator: Execute Shell Commands
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

with DAG('bash_examples', start_date=datetime(2024, 1, 1), schedule=None) as dag:

    # ========== SIMPLE COMMAND ==========
    simple_bash = BashOperator(
        task_id='simple',
        bash_command='echo "Hello from Bash"'
    )

    # ========== MULTI-LINE SCRIPT ==========
    multi_line = BashOperator(
        task_id='multi_line',
        bash_command="""
        set -e  # Exit on error

        echo "Step 1: Creating directory"
        mkdir -p /tmp/airflow_test

        echo "Step 2: Writing file"
        echo "Data for {{ ds }}" > /tmp/airflow_test/data_{{ ds }}.txt

        echo "Step 3: Listing files"
        ls -la /tmp/airflow_test

        echo "Completed successfully"
        """
    )

    # ========== WITH ENVIRONMENT VARIABLES ==========
    with_env = BashOperator(
        task_id='with_env',
        bash_command='echo "Execution date: $EXEC_DATE, DAG: $DAG_ID"',
        env={
            'EXEC_DATE': '{{ ds }}',
            'DAG_ID': '{{ dag.dag_id }}',
            'CUSTOM_VAR': 'my_value'
        }
    )

    # ========== RUNNING EXTERNAL SCRIPT ==========
    run_script = BashOperator(
        task_id='run_script',
        bash_command='/opt/scripts/process_data.sh {{ ds }} {{ execution_date.hour }}',
        cwd='/opt/scripts'  # Working directory
    )

    # ========== WITH RETURN VALUE ==========
    # BashOperator pushes the last line of stdout to XCom
    get_value = BashOperator(
        task_id='get_value',
        bash_command='echo "result: 42"',
        do_xcom_push=True  # Push to XCom (this is the default)
    )

    use_value = BashOperator(
        task_id='use_value',
        bash_command='echo "Previous result: {{ ti.xcom_pull(task_ids=\"get_value\") }}"'
    )

    # ========== ERROR HANDLING ==========
    with_retry = BashOperator(
        task_id='with_retry',
        bash_command="""
        # Script that might fail
        if [ $(( RANDOM % 2 )) -eq 0 ]; then
            echo "Success!"
            exit 0
        else
            echo "Failed!"
            exit 1
        fi
        """,
        retries=3,
        retry_delay=timedelta(seconds=30)
    )

    # ========== CONDITIONAL EXECUTION ==========
    conditional = BashOperator(
        task_id='conditional',
        bash_command="""
        if [ -f /tmp/data_ready.flag ]; then
            echo "Data is ready, processing..."
            python /scripts/process.py
        else
            echo "Data not ready, skipping..."
            exit 0
        fi
        """
    )

    get_value >> use_value


# ========== REAL-WORLD EXAMPLE: ETL WITH BASH ==========
with DAG('etl_bash', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    extract = BashOperator(
        task_id='extract_data',
        bash_command="""
        # Extract data from PostgreSQL
        psql -h $DB_HOST -U $DB_USER -d $DB_NAME -c \
            "COPY (SELECT * FROM orders WHERE date = '{{ ds }}') \
             TO '/tmp/orders_{{ ds }}.csv' WITH CSV HEADER"
        """,
        env={
            'DB_HOST': 'prod-db.company.com',
            'DB_USER': 'airflow',
            'DB_NAME': 'production',
            'PGPASSWORD': '{{ var.value.db_password }}'
        },
        append_env=True  # Keep the existing environment (PATH, etc.) and add these
    )

    compress = BashOperator(
        task_id='compress_data',
        bash_command='gzip /tmp/orders_{{ ds }}.csv'
    )

    upload_s3 = BashOperator(
        task_id='upload_to_s3',
        bash_command="""
        aws s3 cp /tmp/orders_{{ ds }}.csv.gz \
            s3://data-lake/raw/orders/{{ ds }}/orders.csv.gz
        """,
        env={'AWS_PROFILE': 'production'},
        append_env=True
    )

    cleanup = BashOperator(
        task_id='cleanup',
        bash_command='rm -f /tmp/orders_{{ ds }}.csv.gz',
        trigger_rule='all_done'  # Run even if upstream fails
    )

    extract >> compress >> upload_s3 >> cleanup
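One BashOperator quirk worth knowing when calling scripts like run_script above: '.sh' and '.bash' are in the operator's template_ext, so a bash_command that ends with one of those extensions is treated as a Jinja template file to load rather than a command to run, and it fails with a template-not-found error unless the script sits on the DAG's template search path. Passing arguments after the path (as run_script does) avoids the problem; when there are no arguments, the usual workaround is a trailing space. The script path below is hypothetical:

from airflow.operators.bash import BashOperator

# Treated as a Jinja template file -> jinja2 TemplateNotFound unless the .sh
# file is on the DAG's template search path:
# broken = BashOperator(task_id='broken', bash_command='/opt/scripts/cleanup.sh')

# The trailing space makes Airflow treat it as a plain command string
works = BashOperator(
    task_id='run_cleanup_script',
    bash_command='/opt/scripts/cleanup.sh ',  # note the trailing space
)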
PythonOperator: Execute Python Functions
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from datetime import datetime
import json

with DAG('python_examples', start_date=datetime(2024, 1, 1), schedule=None) as dag:

    # ========== SIMPLE FUNCTION ==========
    def simple_function():
        print("Hello from Python!")
        return "Task completed"

    simple_task = PythonOperator(
        task_id='simple',
        python_callable=simple_function
    )

    # ========== FUNCTION WITH ARGUMENTS ==========
    def process_data(input_file, output_format, multiplier=1):
        print(f"Processing {input_file}")
        print(f"Output format: {output_format}")
        print(f"Multiplier: {multiplier}")
        return {"status": "success", "records": 100}

    with_args = PythonOperator(
        task_id='with_args',
        python_callable=process_data,
        op_kwargs={
            'input_file': '/data/input.csv',
            'output_format': 'parquet',
            'multiplier': 2
        }
    )

    # ========== FUNCTION WITH CONTEXT ==========
    def use_context(**context):
        """Access Airflow context variables"""
        execution_date = context['execution_date']
        dag_id = context['dag'].dag_id
        task_id = context['task'].task_id
        ti = context['ti']  # Task instance

        print(f"Running {dag_id}.{task_id}")
        print(f"Execution date: {execution_date}")

        # Pull XCom from a previous task
        previous_result = ti.xcom_pull(task_ids='previous_task')
        print(f"Previous result: {previous_result}")

        # Push to XCom
        ti.xcom_push(key='my_key', value={'data': [1, 2, 3]})

        return "Done"

    context_task = PythonOperator(
        task_id='with_context',
        python_callable=use_context
    )

    # ========== FUNCTION WITH MIXED ARGS AND CONTEXT ==========
    def mixed_function(param1, param2, **context):
        execution_date = context['ds']
        print(f"Params: {param1}, {param2}")
        print(f"Date: {execution_date}")

    mixed_task = PythonOperator(
        task_id='mixed',
        python_callable=mixed_function,
        op_kwargs={'param1': 'value1', 'param2': 'value2'}
    )

    # ========== LAMBDA FUNCTION ==========
    lambda_task = PythonOperator(
        task_id='lambda',
        python_callable=lambda: print("Quick task")
    )

    # ========== RETURN VALUE FOR XCOM ==========
    def return_data():
        return {
            'records': 1000,
            'timestamp': datetime.now().isoformat(),
            'status': 'completed'
        }

    return_task = PythonOperator(
        task_id='return_data',
        python_callable=return_data,
        do_xcom_push=True  # Default is True
    )


# ========== REAL-WORLD EXAMPLE: DATA PROCESSING ==========
with DAG('data_processing_python', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    def extract_from_api(**context):
        """Extract data from a REST API"""
        import requests

        execution_date = context['ds']

        response = requests.get(
            'https://api.example.com/data',
            params={
                'date': execution_date,
                'api_key': Variable.get('api_key')
            },
            timeout=30
        )
        response.raise_for_status()

        data = response.json()
        print(f"Extracted {len(data['records'])} records")

        # Save to file
        with open(f'/tmp/raw_data_{execution_date}.json', 'w') as f:
            json.dump(data, f)

        return len(data['records'])

    def transform_data(**context):
        """Transform and clean data"""
        import pandas as pd

        execution_date = context['ds']
        record_count = context['ti'].xcom_pull(task_ids='extract')

        # Read raw data
        with open(f'/tmp/raw_data_{execution_date}.json', 'r') as f:
            data = json.load(f)

        # Transform with pandas
        df = pd.DataFrame(data['records'])

        # Data cleaning
        df = df.dropna(subset=['id', 'amount'])
        df['amount'] = df['amount'].astype(float)
        df['processed_at'] = datetime.now()

        # Save processed data
        df.to_parquet(f'/tmp/processed_data_{execution_date}.parquet')

        print(f"Transformed {len(df)} records (from {record_count})")
        return len(df)

    def validate_data(**context):
        """Data quality checks"""
        import pandas as pd

        execution_date = context['ds']
        df = pd.read_parquet(f'/tmp/processed_data_{execution_date}.parquet')

        # Validation rules
        assert len(df) > 0, "No data to process"
        assert df['amount'].min() >= 0, "Negative amounts found"
        assert df['id'].is_unique, "Duplicate IDs found"

        print(f"Validation passed: {len(df)} records")
        return True

    def load_to_database(**context):
        """Load to PostgreSQL"""
        import pandas as pd
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        execution_date = context['ds']
        df = pd.read_parquet(f'/tmp/processed_data_{execution_date}.parquet')

        hook = PostgresHook(postgres_conn_id='warehouse')
        engine = hook.get_sqlalchemy_engine()

        df.to_sql(
            'processed_data',
            engine,
            if_exists='append',
            index=False,
            method='multi',
            chunksize=1000
        )
        print(f"Loaded {len(df)} records to database")

    extract = PythonOperator(task_id='extract', python_callable=extract_from_api)
    transform = PythonOperator(task_id='transform', python_callable=transform_data)
    validate = PythonOperator(task_id='validate', python_callable=validate_data)
    load = PythonOperator(task_id='load', python_callable=load_to_database)

    extract >> transform >> validate >> load
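When a callable needs libraries (or library versions) that are not installed on the Airflow workers, PythonVirtualenvOperator can run it inside a throwaway virtualenv. A minimal sketch, assuming the worker can install the pinned package (the version pin is illustrative); note the callable must be self-contained: do its imports inside the function and avoid closing over outer variables:

from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator
from datetime import datetime


def summarize_in_isolated_env():
    # Imports happen inside the function because it runs in a separate venv
    import pandas as pd
    df = pd.DataFrame({'amount': [10.0, 20.0, 12.5]})
    print(f"Total: {df['amount'].sum()}")


with DAG('virtualenv_example', start_date=datetime(2024, 1, 1), schedule=None) as dag:
    isolated_task = PythonVirtualenvOperator(
        task_id='isolated_task',
        python_callable=summarize_in_isolated_env,
        requirements=['pandas==2.1.4'],   # installed into the temporary venv
        system_site_packages=False,
    )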
EmailOperator: Send Notifications
from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG('email_examples', start_date=datetime(2024, 1, 1), schedule=None) as dag:

    # ========== SIMPLE EMAIL ==========
    simple_email = EmailOperator(
        task_id='simple_email',
        to='[email protected]',
        subject='Airflow Notification',
        html_content='<p>Task completed successfully!</p>'
    )

    # ========== EMAIL WITH TEMPLATES ==========
    templated_email = EmailOperator(
        task_id='templated_email',
        to=['[email protected]', '[email protected]'],
        cc=['[email protected]'],
        subject='Daily Report for {{ ds }}',
        html_content="""
        <h2>Daily ETL Report</h2>
        <p><strong>Execution Date:</strong> {{ ds }}</p>
        <p><strong>DAG:</strong> {{ dag.dag_id }}</p>
        <p><strong>Status:</strong> Completed</p>
        <h3>Summary</h3>
        <ul>
            <li>Records processed: {{ ti.xcom_pull(task_ids='process_data') }}</li>
            <li>Duration: {{ ti.duration }}s</li>
        </ul>
        """
    )

    # ========== EMAIL WITH ATTACHMENTS ==========
    email_with_attachment = EmailOperator(
        task_id='with_attachment',
        to='[email protected]',
        subject='Daily Report with CSV',
        html_content='Please find attached the daily report.',
        files=['/tmp/report_{{ ds }}.csv']
    )

    # ========== CONDITIONAL EMAIL ON FAILURE ==========
    def task_that_might_fail():
        import random
        if random.random() < 0.5:
            raise Exception("Random failure!")

    risky_task = PythonOperator(
        task_id='risky_task',
        python_callable=task_that_might_fail,
        email_on_failure=True,
        email=['[email protected]']
    )

    # ========== EMAIL IN CALLBACK ==========
    def send_custom_email(context):
        """Custom email on failure"""
        task_instance = context['task_instance']
        exception = context.get('exception')

        EmailOperator(
            task_id='failure_email',
            to='[email protected]',
            subject=f'ALERT: Task Failed - {task_instance.task_id}',
            html_content=f"""
            <h2>Task Failure Alert</h2>
            <p><strong>Task:</strong> {task_instance.task_id}</p>
            <p><strong>DAG:</strong> {task_instance.dag_id}</p>
            <p><strong>Execution Date:</strong> {task_instance.execution_date}</p>
            <p><strong>Error:</strong> {exception}</p>
            <p><strong>Log:</strong> <a href="{task_instance.log_url}">View Logs</a></p>
            """
        ).execute(context)

    task_with_callback = PythonOperator(
        task_id='task_with_callback',
        python_callable=task_that_might_fail,
        on_failure_callback=send_custom_email
    )
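Instantiating an EmailOperator just to call .execute() inside a callback works, but the airflow.utils.email.send_email helper does the same thing with less ceremony and uses the same configured SMTP backend. A sketch of the failure callback rewritten with it; the recipient address is a placeholder:

from airflow.utils.email import send_email


def notify_failure(context):
    """on_failure_callback that emails via Airflow's configured SMTP backend."""
    ti = context['task_instance']
    send_email(
        to='[email protected]',  # placeholder recipient
        subject=f'ALERT: Task Failed - {ti.task_id}',
        html_content=(
            f'<p><strong>DAG:</strong> {ti.dag_id}</p>'
            f'<p><strong>Task:</strong> {ti.task_id}</p>'
            f'<p><strong>Error:</strong> {context.get("exception")}</p>'
            f'<p><a href="{ti.log_url}">View logs</a></p>'
        ),
    )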
SQL Operators: Database Operations
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.python import PythonOperator
from datetime import datetime

# ========== PostgreSQL ==========
with DAG('postgres_examples', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='my_postgres',
        sql="""
        CREATE TABLE IF NOT EXISTS daily_sales (
            id SERIAL PRIMARY KEY,
            date DATE NOT NULL,
            product VARCHAR(100),
            amount DECIMAL(10, 2),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """
    )

    insert_data = PostgresOperator(
        task_id='insert_data',
        postgres_conn_id='my_postgres',
        sql="""
        INSERT INTO daily_sales (date, product, amount)
        SELECT
            '{{ ds }}'::date,
            product_name,
            SUM(sale_amount)
        FROM raw_sales
        WHERE sale_date = '{{ ds }}'
        GROUP BY product_name;
        """
    )

    # SQL from a file on the Jinja template search path
    run_sql_file = PostgresOperator(
        task_id='run_sql_file',
        postgres_conn_id='my_postgres',
        sql='/opt/airflow/sql/daily_aggregation.sql',
        # Available in the file as {{ params.execution_date }}; note that params
        # values are not themselves Jinja-rendered, so prefer {{ ds }} directly in the SQL
        params={'execution_date': '{{ ds }}'}
    )

    create_table >> insert_data >> run_sql_file


# ========== DYNAMIC SQL GENERATION ==========
with DAG('dynamic_sql', start_date=datetime(2024, 1, 1), schedule=None) as dag:

    def generate_insert_queries(**context):
        """Generate and run SQL for several tables.

        Instantiating operators inside a callable and calling .execute() runs
        everything within this single task (no separate tasks in the UI), and
        the SQL is not Jinja-rendered, so template values are filled in manually.
        """
        ds = context['ds']
        tables = ['customers', 'orders', 'products']

        for table in tables:
            PostgresOperator(
                task_id=f'truncate_{table}',
                postgres_conn_id='my_postgres',
                sql=f'TRUNCATE TABLE staging.{table};'
            ).execute(context)

            PostgresOperator(
                task_id=f'load_{table}',
                postgres_conn_id='my_postgres',
                sql=f"""
                INSERT INTO staging.{table}
                SELECT * FROM source.{table}
                WHERE updated_at >= '{ds}'::date;
                """
            ).execute(context)

    dynamic_sql = PythonOperator(
        task_id='dynamic_sql',
        python_callable=generate_insert_queries
    )


# ========== SQL CHECK OPERATORS ==========
from airflow.providers.common.sql.operators.sql import (
    SQLCheckOperator,
    SQLValueCheckOperator,
    SQLThresholdCheckOperator
)

with DAG('sql_checks', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    # Check that the query returns a truthy row
    check_data_exists = SQLCheckOperator(
        task_id='check_data_exists',
        conn_id='my_postgres',
        sql="""
        SELECT COUNT(*) > 0
        FROM orders
        WHERE order_date = '{{ ds }}'
        """
    )

    # Check a specific value
    check_record_count = SQLValueCheckOperator(
        task_id='check_record_count',
        conn_id='my_postgres',
        sql="SELECT COUNT(*) FROM orders WHERE order_date = '{{ ds }}'",
        pass_value=1000,  # Expected value
        tolerance=0.1     # 10% tolerance
    )

    # Check a threshold range
    check_revenue = SQLThresholdCheckOperator(
        task_id='check_revenue',
        conn_id='my_postgres',
        sql="SELECT SUM(amount) FROM orders WHERE order_date = '{{ ds }}'",
        min_threshold=10000,    # Minimum expected
        max_threshold=1000000   # Maximum expected
    )

    [check_data_exists, check_record_count, check_revenue]
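Newer releases of the common SQL provider steer database work toward the generic SQLExecuteQueryOperator, which runs against any connection whose provider ships a DB-API hook; the dialect-specific operators above are thin wrappers around the same idea. A minimal sketch, reusing the my_postgres connection from the examples above:

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime

with DAG('generic_sql_example', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:
    refresh_summary = SQLExecuteQueryOperator(
        task_id='refresh_summary',
        conn_id='my_postgres',    # any connection backed by a DB-API hook
        sql="""
        DELETE FROM daily_sales WHERE date = '{{ ds }}';
        INSERT INTO daily_sales (date, product, amount)
        SELECT '{{ ds }}'::date, product_name, SUM(sale_amount)
        FROM raw_sales
        WHERE sale_date = '{{ ds }}'
        GROUP BY product_name;
        """,
        split_statements=True,    # run the two statements separately
        autocommit=True,
    )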
Transfer Operators: Moving Data
from airflow import DAG
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
from datetime import datetime

with DAG('transfer_examples', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    # ========== S3 to Redshift ==========
    s3_to_redshift = S3ToRedshiftOperator(
        task_id='s3_to_redshift',
        s3_bucket='data-lake',
        s3_key='processed/sales/{{ ds }}/sales.parquet',
        redshift_conn_id='redshift',
        schema='analytics',
        table='sales',
        copy_options=['PARQUET', 'COMPUPDATE OFF'],
        method='REPLACE'  # or 'APPEND', 'UPSERT'
    )

    # ========== GCS to BigQuery ==========
    gcs_to_bigquery = GCSToBigQueryOperator(
        task_id='gcs_to_bigquery',
        bucket='data-bucket',
        source_objects=['data/{{ ds }}/*.parquet'],
        destination_project_dataset_table='project.dataset.table',
        source_format='PARQUET',
        write_disposition='WRITE_APPEND',
        create_disposition='CREATE_IF_NEEDED',
        autodetect=True
    )

    # ========== Postgres to GCS ==========
    postgres_to_gcs = PostgresToGCSOperator(
        task_id='postgres_to_gcs',
        postgres_conn_id='my_postgres',
        sql="SELECT * FROM orders WHERE date = '{{ ds }}'",
        bucket='data-bucket',
        filename='exports/orders/{{ ds }}/orders.json',
        export_format='json',
        gzip=True
    )
Custom Operators
Create reusable operators for common patterns.
Creating a Custom Operator
from airflow import DAG
from airflow.models import BaseOperator
from datetime import datetime
from typing import Any, Dict
import requests


class SlackNotificationOperator(BaseOperator):
    """
    Custom operator to send Slack notifications.

    :param slack_webhook_url: Slack webhook URL
    :param message: Message to send
    :param channel: Slack channel (optional)
    :param username: Bot username (optional)
    """

    # UI colors for the Graph/Grid views
    ui_color = '#1F77B4'
    ui_fgcolor = '#FFFFFF'

    # Template fields (rendered with Jinja before execute() runs)
    template_fields = ('message', 'channel')

    def __init__(
        self,
        slack_webhook_url: str,
        message: str,
        channel: str = None,
        username: str = 'Airflow Bot',
        icon_emoji: str = ':robot_face:',
        **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.slack_webhook_url = slack_webhook_url
        self.message = message
        self.channel = channel
        self.username = username
        self.icon_emoji = icon_emoji

    def execute(self, context: Dict[str, Any]) -> None:
        """
        Main execution method, called by Airflow when the task runs.
        """
        payload = {
            'text': self.message,
            'username': self.username,
            'icon_emoji': self.icon_emoji
        }
        if self.channel:
            payload['channel'] = self.channel

        # Log execution
        self.log.info(f"Sending Slack notification to {self.channel or 'default channel'}")
        self.log.info(f"Message: {self.message}")

        # Send request
        response = requests.post(
            self.slack_webhook_url,
            json=payload,
            timeout=10
        )
        response.raise_for_status()

        self.log.info("Slack notification sent successfully")


# Usage
with DAG('custom_operator', start_date=datetime(2024, 1, 1), schedule=None) as dag:
    notify = SlackNotificationOperator(
        task_id='notify_slack',
        slack_webhook_url='https://hooks.slack.com/services/YOUR/WEBHOOK/URL',
        message='DAG {{ dag.dag_id }} completed for {{ ds }}',
        channel='#data-alerts'
    )
Advanced Custom Operator with Hook
from airflow import DAG
from airflow.models import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime


class PostgresToS3Operator(BaseOperator):
    """
    Transfer data from Postgres to S3 as Parquet.

    :param postgres_conn_id: Postgres connection ID
    :param sql: SQL query to extract data
    :param s3_bucket: Target S3 bucket
    :param s3_key: Target S3 key (supports templating)
    :param chunk_size: Number of rows per chunk
    """

    template_fields = ('sql', 's3_key')
    template_ext = ('.sql',)
    ui_color = '#ededed'

    def __init__(
        self,
        *,
        postgres_conn_id: str,
        sql: str,
        s3_bucket: str,
        s3_key: str,
        s3_conn_id: str = 'aws_default',
        chunk_size: int = 10000,
        **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.postgres_conn_id = postgres_conn_id
        self.sql = sql
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.s3_conn_id = s3_conn_id
        self.chunk_size = chunk_size

    def execute(self, context: dict) -> str:
        """Extract from Postgres and load to S3"""
        self.log.info(f"Extracting data from Postgres using query: {self.sql}")

        # Use the Postgres hook
        postgres_hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        df = postgres_hook.get_pandas_df(sql=self.sql)
        self.log.info(f"Extracted {len(df)} rows")

        # Convert to Parquet in memory
        import io
        buffer = io.BytesIO()
        df.to_parquet(buffer, index=False, compression='snappy')
        buffer.seek(0)

        # Upload to S3
        s3_hook = S3Hook(aws_conn_id=self.s3_conn_id)
        s3_hook.load_bytes(
            bytes_data=buffer.getvalue(),
            key=self.s3_key,
            bucket_name=self.s3_bucket,
            replace=True
        )

        s3_path = f"s3://{self.s3_bucket}/{self.s3_key}"
        self.log.info(f"Uploaded data to {s3_path}")
        return s3_path


# Usage
with DAG('postgres_to_s3', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:
    export_sales = PostgresToS3Operator(
        task_id='export_sales',
        postgres_conn_id='prod_db',
        sql="""
        SELECT *
        FROM sales
        WHERE sale_date = '{{ ds }}'
        """,
        s3_bucket='data-lake',
        s3_key='exports/sales/{{ ds }}/sales.parquet'
    )
Provider Packages
Airflow providers offer hundreds of pre-built operators for popular services.
Installing Providers
# Install specific provider
pip install apache-airflow-providers-amazon
pip install apache-airflow-providers-google
pip install apache-airflow-providers-snowflake
pip install apache-airflow-providers-apache-spark

# Install multiple providers
pip install apache-airflow-providers-amazon \
            apache-airflow-providers-google \
            apache-airflow-providers-postgres
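To see what is already installed, `airflow providers list` on the command line prints every registered provider and its version; roughly the same information is available in Python through the providers manager. A small sketch, assuming the ProvidersManager API exposes installed packages this way (verify against your Airflow version):

from airflow.providers_manager import ProvidersManager

# Print each installed provider package and its version
for package_name, provider_info in ProvidersManager().providers.items():
    print(package_name, provider_info.version)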
AWS Providers
from airflow import DAG
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.providers.amazon.aws.operators.emr import (
    EmrCreateJobFlowOperator,
    EmrAddStepsOperator,
    EmrTerminateJobFlowOperator
)
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datetime import datetime

with DAG('aws_example', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    # Wait for a file to land in S3
    wait_for_file = S3KeySensor(
        task_id='wait_for_file',
        bucket_name='input-bucket',
        bucket_key='data/{{ ds }}/input.csv',
        aws_conn_id='aws_default',
        timeout=3600,      # Give up after 1 hour
        poke_interval=60   # Check every minute
    )

    # Create an EMR cluster
    create_emr = EmrCreateJobFlowOperator(
        task_id='create_emr',
        job_flow_overrides={
            'Name': 'airflow-emr-{{ ds }}',
            'ReleaseLabel': 'emr-6.9.0',
            'Applications': [{'Name': 'Spark'}],
            'Instances': {
                'InstanceGroups': [
                    {
                        'Name': 'Master',
                        'Market': 'SPOT',
                        'InstanceRole': 'MASTER',
                        'InstanceType': 'm5.xlarge',
                        'InstanceCount': 1
                    },
                    {
                        'Name': 'Worker',
                        'Market': 'SPOT',
                        'InstanceRole': 'CORE',
                        'InstanceType': 'm5.xlarge',
                        'InstanceCount': 2
                    }
                ],
                'KeepJobFlowAliveWhenNoSteps': True,
                'TerminationProtected': False
            }
        }
    )

    # Add a Spark job
    add_steps = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr', key='return_value') }}",
        steps=[
            {
                'Name': 'Process Data',
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': [
                        'spark-submit',
                        '--deploy-mode', 'cluster',
                        's3://scripts/process_data.py',
                        '--input', 's3://input-bucket/data/{{ ds }}/',
                        '--output', 's3://output-bucket/processed/{{ ds }}/'
                    ]
                }
            }
        ]
    )

    # Terminate the cluster
    terminate_emr = EmrTerminateJobFlowOperator(
        task_id='terminate_emr',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr', key='return_value') }}",
        trigger_rule='all_done'  # Terminate even if the Spark step fails
    )

    wait_for_file >> create_emr >> add_steps >> terminate_emr
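As written, terminate_emr can run as soon as the step has been submitted. EmrAddStepsOperator returns the new step IDs via XCom, so adding an EmrStepSensor inside the same DAG block is the usual way to hold the pipeline until the Spark job actually finishes. A sketch that would slot between add_steps and terminate_emr, reusing the IDs from the example above:

from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

# Wait for the first (and only) step submitted by add_steps to complete
watch_step = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
    poke_interval=60,
)

# Revised ordering:
# wait_for_file >> create_emr >> add_steps >> watch_step >> terminate_emr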
Google Cloud Providers
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyTableOperator,
    BigQueryInsertJobOperator
)
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocSubmitJobOperator,
    DataprocDeleteClusterOperator
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from datetime import datetime

with DAG('gcp_example', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    PROJECT_ID = 'my-project'
    REGION = 'us-central1'
    CLUSTER_NAME = 'airflow-cluster-{{ ds_nodash }}'

    # Create Dataproc cluster
    create_cluster = DataprocCreateClusterOperator(
        task_id='create_cluster',
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        cluster_config={
            'master_config': {
                'num_instances': 1,
                'machine_type_uri': 'n1-standard-2'
            },
            'worker_config': {
                'num_instances': 2,
                'machine_type_uri': 'n1-standard-2'
            }
        }
    )

    # Submit PySpark job
    pyspark_job = DataprocSubmitJobOperator(
        task_id='pyspark_job',
        project_id=PROJECT_ID,
        region=REGION,
        job={
            'reference': {'project_id': PROJECT_ID},
            'placement': {'cluster_name': CLUSTER_NAME},
            'pyspark_job': {
                'main_python_file_uri': 'gs://bucket/scripts/process.py',
                'args': ['--date', '{{ ds }}']
            }
        }
    )

    # Load to BigQuery
    load_bigquery = GCSToBigQueryOperator(
        task_id='load_bigquery',
        bucket='data-bucket',
        source_objects=['processed/{{ ds }}/*.parquet'],
        destination_project_dataset_table=f'{PROJECT_ID}.analytics.sales',
        source_format='PARQUET',
        write_disposition='WRITE_TRUNCATE'
    )

    # Delete cluster
    delete_cluster = DataprocDeleteClusterOperator(
        task_id='delete_cluster',
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        trigger_rule='all_done'
    )

    create_cluster >> pyspark_job >> load_bigquery >> delete_cluster
Snowflake Provider
from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator
from datetime import datetime

with DAG('snowflake_example', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    # Create table
    create_table = SnowflakeOperator(
        task_id='create_table',
        snowflake_conn_id='snowflake_default',
        sql="""
        CREATE TABLE IF NOT EXISTS SALES (
            sale_id NUMBER,
            sale_date DATE,
            amount NUMBER(10,2),
            product VARCHAR(100)
        );
        """
    )

    # Load from S3 via an external stage
    load_data = S3ToSnowflakeOperator(
        task_id='load_data',
        snowflake_conn_id='snowflake_default',
        s3_keys=['data/{{ ds }}/sales.csv'],  # keys are relative to the stage location
        table='SALES',
        stage='MY_STAGE',
        file_format='(TYPE=CSV, SKIP_HEADER=1)'
    )

    # Run analytics query
    analyze = SnowflakeOperator(
        task_id='analyze',
        snowflake_conn_id='snowflake_default',
        sql="""
        INSERT INTO DAILY_SUMMARY
        SELECT
            sale_date,
            COUNT(*) AS num_sales,
            SUM(amount) AS total_revenue,
            AVG(amount) AS avg_sale
        FROM SALES
        WHERE sale_date = '{{ ds }}'
        GROUP BY sale_date;
        """
    )

    create_table >> load_data >> analyze
Operator Best Practices
1. Idempotency
# BAD: Not idempotent - running twice creates duplicates
bad_insert = PostgresOperator(
    task_id='bad_insert',
    postgres_conn_id='my_postgres',
    sql="INSERT INTO sales SELECT * FROM staging_sales WHERE date = '{{ ds }}'"
)

# GOOD: Idempotent - can run multiple times safely
good_insert = PostgresOperator(
    task_id='good_insert',
    postgres_conn_id='my_postgres',
    sql="""
    DELETE FROM sales WHERE date = '{{ ds }}';
    INSERT INTO sales SELECT * FROM staging_sales WHERE date = '{{ ds }}';
    """
)

# BETTER: Upsert pattern
upsert = PostgresOperator(
    task_id='upsert',
    postgres_conn_id='my_postgres',
    sql="""
    INSERT INTO sales (id, date, amount)
    SELECT id, date, amount FROM staging_sales WHERE date = '{{ ds }}'
    ON CONFLICT (id) DO UPDATE SET
        amount = EXCLUDED.amount,
        updated_at = CURRENT_TIMESTAMP;
    """
)
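The same rule applies to PythonOperator tasks: key outputs by the logical date and overwrite them, so a retry or backfill produces the same result instead of appending a second copy. A small sketch (the output directory and row data are illustrative):

from pathlib import Path


def write_daily_extract(**context):
    """Idempotent: the output file is keyed by the logical date and overwritten."""
    ds = context['ds']
    output = Path(f'/tmp/exports/orders_{ds}.csv')
    output.parent.mkdir(parents=True, exist_ok=True)

    rows = ['id,amount', '1,10.50', '2,99.00']  # stand-in for the real extract
    output.write_text('\n'.join(rows))          # overwrite, never append
    return str(output)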
2. Resource Management
# Use pools to limit concurrent resource usage
heavy_task = PythonOperator(
    task_id='heavy_task',
    python_callable=process_large_file,
    pool='database_connections',  # Create in UI: Admin → Pools
    pool_slots=2                  # Uses 2 of the pool's slots
)
3. Error Handling
from datetime import timedelta


def robust_function(**context):
    """Function with proper error handling.

    process_data() and send_alert() stand in for your own logic.
    """
    try:
        # Main logic
        result = process_data()

        # Validate result
        if result is None:
            raise ValueError("Process returned no data")

        return result

    except Exception as e:
        # Log error details
        context['task_instance'].log.error(f"Error processing data: {str(e)}")

        # Optionally send an alert
        send_alert(f"Task failed: {str(e)}")

        # Re-raise to mark the task as failed
        raise


robust_task = PythonOperator(
    task_id='robust',
    python_callable=robust_function,
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True
)
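When you can tell from the error that retrying is pointless (bad credentials, malformed input) or that the run should simply be skipped (no data for the day), Airflow's own exceptions express that intent more precisely than a bare raise. A short sketch using AirflowFailException and AirflowSkipException; fetch_records is a placeholder for your own extract:

from airflow.exceptions import AirflowFailException, AirflowSkipException


def process_if_possible(**context):
    records = fetch_records(context['ds'])  # placeholder for your own extract

    if records is None:
        # Configuration/auth problems won't fix themselves: fail without retrying
        raise AirflowFailException("Source unreachable with current credentials")

    if len(records) == 0:
        # Nothing to do for this date: mark the task as skipped, not failed
        raise AirflowSkipException("No records for this logical date")

    return len(records)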
4. Testing Operators
# test_operators.py
import pytest
from airflow.models import DagBag
from datetime import datetime


def test_dag_loaded():
    """Test that DAG loads without errors"""
    dagbag = DagBag(dag_folder='dags/', include_examples=False)
    assert len(dagbag.import_errors) == 0, "DAG import errors"


def test_task_count():
    """Test expected number of tasks"""
    dagbag = DagBag(dag_folder='dags/')
    dag = dagbag.get_dag('my_pipeline')
    assert len(dag.tasks) == 5


def test_task_dependencies():
    """Test task dependency structure"""
    dagbag = DagBag(dag_folder='dags/')
    dag = dagbag.get_dag('my_pipeline')

    extract = dag.get_task('extract')
    transform = dag.get_task('transform')

    assert transform in extract.downstream_list
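DagBag tests catch import and wiring problems; custom operators deserve their own unit tests too, since execute() is just a method you can call with a fake context. A sketch for the SlackNotificationOperator defined earlier, assuming it lives in an importable module such as plugins/slack_operator.py (that path is an assumption) and mocking out the HTTP call:

# test_slack_operator.py
from unittest import mock

from plugins.slack_operator import SlackNotificationOperator  # assumed module path


def test_slack_operator_posts_payload():
    operator = SlackNotificationOperator(
        task_id='notify',
        slack_webhook_url='https://hooks.slack.com/services/TEST/TEST/TEST',
        message='hello',
        channel='#data-alerts',
    )

    with mock.patch('plugins.slack_operator.requests.post') as mock_post:
        mock_post.return_value.raise_for_status.return_value = None
        operator.execute(context={})

    # The operator should POST once, with the message and channel in the payload
    mock_post.assert_called_once()
    payload = mock_post.call_args.kwargs['json']
    assert payload['text'] == 'hello'
    assert payload['channel'] == '#data-alerts'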
Summary
You now understand:
- Core Operators: BashOperator, PythonOperator, EmailOperator, SQL operators
- Transfer Operators: Moving data between systems
- Custom Operators: Creating reusable operator classes
- Provider Packages: AWS, GCP, Snowflake, and 100+ integrations
- Best Practices: Idempotency, resource management, error handling
Key Takeaways:
- Choose the right operator for the job - don’t force Python for everything
- Use provider packages instead of reinventing the wheel
- Make operators idempotent - safe to retry
- Implement proper error handling and logging
- Create custom operators for common patterns in your organization
Next Steps
Module 4: Scheduling - Cron, Catchup, and Trigger Rules
Master cron expressions, execution_date, backfilling, and SLAs