Module Overview
Estimated Time: 4-5 hours | Difficulty: Intermediate-Advanced | Prerequisites: Compute, Networking
This module covers:
- SQS queues and message processing patterns
- SNS pub/sub and fan-out architectures
- EventBridge for event-driven applications
- Step Functions for workflow orchestration
- API Gateway advanced patterns
- Choosing the right integration pattern
Integration Patterns Overview
┌────────────────────────────────────────────────────────────────────────┐
│ AWS Integration Services │
├────────────────────────────────────────────────────────────────────────┤
│ │
│ SQS (Simple Queue Service) │
│ ───────────────────────────── │
│ • Point-to-point messaging │
│ • Decouple producers from consumers │
│ • Buffer for traffic spikes │
│ • Guaranteed delivery │
│ │
│ SNS (Simple Notification Service) │
│ ─────────────────────────────────── │
│ • Pub/sub messaging │
│ • Fan-out to multiple subscribers │
│ • Push-based delivery │
│ • Mobile push, email, SMS, HTTP │
│ │
│ EventBridge │
│ ───────────── │
│ • Event bus for event-driven apps │
│ • Content-based filtering │
│ • 200+ AWS service integrations │
│ • Schema registry │
│ │
│ Step Functions │
│ ───────────── │
│ • Workflow orchestration │
│ • Visual workflow designer │
│ • Error handling & retries │
│ • Long-running processes │
│ │
│ API Gateway │
│ ───────────── │
│ • REST & WebSocket APIs │
│ • Request transformation │
│ • Authorization & throttling │
│ • Direct service integrations │
│ │
└────────────────────────────────────────────────────────────────────────┘
SQS Deep Dive
Queue Types
┌────────────────────────────────────────────────────────────────────────┐
│ SQS Queue Types │
├────────────────────────────────────────────────────────────────────────┤
│ │
│ STANDARD QUEUE │ FIFO QUEUE │
│ ────────────────── │ ────────────────── │
│ • Unlimited throughput │ • 3,000 msg/sec (batching) │
│ • At-least-once delivery │ • Exactly-once processing │
│ • Best-effort ordering │ • Strict ordering │
│ • Lower cost │ • Higher cost (~20% more) │
│ │ • Deduplication │
│ │
│ Use Standard for: │ Use FIFO for: │
│ • High throughput needs │ • Order-sensitive workflows │
│ • Duplicate handling OK │ • Financial transactions │
│ • General decoupling │ • Sequential processing │
│ │
│ MESSAGE LIFECYCLE: │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Producer ──▶ Queue ──▶ Consumer ──▶ Delete │ │
│ │ │ │ │ │ │
│ │ │ │ ▼ │ │
│ │ │ │ Visibility Timeout │ │
│ │ │ │ (Message invisible │ │
│ │ │ │ while processing) │ │
│ │ │ │ │ │ │
│ │ │ │ ▼ │ │
│ │ │ │ Not deleted in time? │ │
│ │ │ │ Message reappears │ │
│ │ │ │ │ │ │
│ │ │ ▼ ▼ │ │
│ │ │ Max receives exceeded → Dead Letter Queue │ │
│ │ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────┘
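The lifecycle above is configured on the queue itself through its attributes. A minimal boto3 sketch, assuming hypothetical queue names and a maxReceiveCount of 5 (tune that to your retry budget):

import boto3
import json

sqs = boto3.client('sqs')

# The DLQ must be the same type as its source queue (FIFO here)
dlq = sqs.create_queue(
    QueueName='orders-dlq.fifo',
    Attributes={'FifoQueue': 'true'}
)
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq['QueueUrl'],
    AttributeNames=['QueueArn']
)['Attributes']['QueueArn']

queue = sqs.create_queue(
    QueueName='orders.fifo',
    Attributes={
        'FifoQueue': 'true',
        'ContentBasedDeduplication': 'true',  # dedupe on a hash of the body
        'VisibilityTimeout': '30',
        'RedrivePolicy': json.dumps({
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': '5'  # move to the DLQ after 5 failed receives
        })
    }
)
print(queue['QueueUrl'])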
SQS Implementation
import boto3
import hashlib
import json
from typing import List, Optional

sqs = boto3.client('sqs')

class SQSProcessor:
    def __init__(self, queue_url: str, dlq_url: Optional[str] = None):
        self.queue_url = queue_url
        self.dlq_url = dlq_url

    def send_message(self, body: dict, delay_seconds: int = 0,
                     group_id: Optional[str] = None) -> str:
        """
        Send message to SQS queue.
        For FIFO queues, group_id is required for ordering.
        """
        params = {
            'QueueUrl': self.queue_url,
            'MessageBody': json.dumps(body),
            'DelaySeconds': delay_seconds
        }
        # FIFO queue parameters
        if group_id:
            params['MessageGroupId'] = group_id
            # Deduplication - use content-based or an explicit ID.
            # Use a stable content hash: Python's built-in hash() is
            # randomized per process, so it would break deduplication.
            params['MessageDeduplicationId'] = hashlib.sha256(
                json.dumps(body, sort_keys=True).encode()
            ).hexdigest()
        response = sqs.send_message(**params)
        return response['MessageId']

    def send_batch(self, messages: List[dict]) -> dict:
        """
        Send up to 10 messages in batch for efficiency.
        """
        entries = [
            {
                'Id': str(i),
                'MessageBody': json.dumps(msg['body']),
                'DelaySeconds': msg.get('delay', 0)
            }
            for i, msg in enumerate(messages)
        ]
        response = sqs.send_message_batch(
            QueueUrl=self.queue_url,
            Entries=entries
        )
        return {
            'successful': len(response.get('Successful', [])),
            'failed': response.get('Failed', [])
        }

    def receive_messages(self, max_messages: int = 10,
                         wait_time: int = 20,
                         visibility_timeout: int = 30) -> List[dict]:
        """
        Long polling for efficient message retrieval.
        WaitTimeSeconds enables long polling (reduces API calls).
        """
        response = sqs.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=min(max_messages, 10),  # API max is 10
            WaitTimeSeconds=wait_time,                  # long polling
            VisibilityTimeout=visibility_timeout,
            AttributeNames=['All'],
            MessageAttributeNames=['All']
        )
        return response.get('Messages', [])

    def process_message(self, message: dict) -> bool:
        """Process a single message. Return True if successful."""
        try:
            body = json.loads(message['Body'])
            # Your processing logic here
            print(f"Processing: {body}")
            # Delete message after successful processing
            sqs.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=message['ReceiptHandle']
            )
            return True
        except Exception as e:
            print(f"Error processing message: {e}")
            # Message will return to queue after visibility timeout
            return False

    def extend_visibility(self, receipt_handle: str,
                          additional_seconds: int = 30):
        """
        Extend visibility timeout for long-running processing.
        Call this periodically for long tasks. Note that this sets a new
        timeout counted from now; it does not add to the remaining time.
        """
        sqs.change_message_visibility(
            QueueUrl=self.queue_url,
            ReceiptHandle=receipt_handle,
            VisibilityTimeout=additional_seconds
        )

    def process_loop(self):
        """Continuous processing loop (long polling keeps it cheap)."""
        while True:
            messages = self.receive_messages()
            for message in messages:
                self.process_message(message)

# Lambda handler for SQS trigger
def lambda_handler(event, context):
    """
    Lambda processes SQS messages in batches.
    Failed messages are retried or sent to DLQ. Returning
    batchItemFailures only works when ReportBatchItemFailures is
    enabled on the event source mapping.
    """
    batch_item_failures = []
    for record in event['Records']:
        try:
            body = json.loads(record['body'])
            # Process message (process_order is your business logic)
            process_order(body)
        except Exception as e:
            print(f"Error: {e}")
            # Report partial batch failure
            batch_item_failures.append({
                'itemIdentifier': record['messageId']
            })
    return {
        'batchItemFailures': batch_item_failures
    }
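A short usage sketch for the class above; the queue URL, group ID, and payload are placeholders:

processor = SQSProcessor(
    queue_url='https://sqs.us-east-1.amazonaws.com/123456789012/orders.fifo'
)

# FIFO send: messages sharing a MessageGroupId are delivered in order
processor.send_message({'order_id': 'A-1001', 'status': 'created'},
                       group_id='customer-42')

# Drain whatever is currently visible, deleting each message on success
for message in processor.receive_messages(max_messages=10, wait_time=20):
    processor.process_message(message)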
SNS Deep Dive
Fan-Out Pattern
┌────────────────────────────────────────────────────────────────────────┐
│ SNS Fan-Out Architecture │
├────────────────────────────────────────────────────────────────────────┤
│ │
│ Order Service │
│ │ │
│ │ Publish │
│ ▼ │
│ ┌───────────────┐ │
│ │ SNS Topic │ │
│ │ order-events │ │
│ └───────────────┘ │
│ │ │
│ ┌──────────────────┼──────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ SQS Queue │ │ SQS Queue │ │ Lambda │ │
│ │ Inventory │ │ Shipping │ │ Analytics │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ Update inventory Ship order Stream to │
│ data lake │
│ │
│ SUBSCRIBER TYPES: │
│ • SQS queues (most common) │
│ • Lambda functions │
│ • HTTP/HTTPS endpoints │
│ • Email addresses │
│ • SMS (text messages) │
│ • Mobile push (iOS, Android) │
│ • Kinesis Data Firehose │
│ │
│ FILTERING: │
│ Subscribers can filter by message attributes: │
│ { │
│ "order_type": ["premium"], │
│ "amount": [{"numeric": [">=", 1000]}] │
│ } │
│ │
└────────────────────────────────────────────────────────────────────────┘
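Subscriptions and filter policies can also be created with boto3. A sketch assuming placeholder topic and queue ARNs; note the queue additionally needs an access policy allowing sns.amazonaws.com to SendMessage (shown in the Terraform below):

import boto3
import json

sns = boto3.client('sns')

sns.subscribe(
    TopicArn='arn:aws:sns:us-east-1:123456789012:order-events',
    Protocol='sqs',
    Endpoint='arn:aws:sqs:us-east-1:123456789012:shipping-orders',
    Attributes={
        # Deliver only premium orders of $1000 or more to this queue
        'FilterPolicy': json.dumps({
            'order_type': ['premium'],
            'amount': [{'numeric': ['>=', 1000]}]
        }),
        # Strip the SNS envelope so consumers get the raw message body
        'RawMessageDelivery': 'true'
    }
)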
SNS Implementation
import boto3
import json

sns = boto3.client('sns')

class SNSPublisher:
    def __init__(self, topic_arn: str):
        self.topic_arn = topic_arn

    def publish(self, message: dict,
                subject: str = None,
                attributes: dict = None) -> str:
        """
        Publish message to SNS topic.
        Attributes enable message filtering.
        """
        params = {
            'TopicArn': self.topic_arn,
            'Message': json.dumps(message)
        }
        if subject:
            params['Subject'] = subject
        # Message attributes for filtering. Numeric filter policies only
        # match attributes published with DataType 'Number'.
        if attributes:
            params['MessageAttributes'] = {
                key: {
                    'DataType': 'Number' if isinstance(value, (int, float)) else 'String',
                    'StringValue': str(value)
                }
                for key, value in attributes.items()
            }
        response = sns.publish(**params)
        return response['MessageId']

    def publish_batch(self, messages: list) -> dict:
        """Publish up to 10 messages in batch."""
        entries = [
            {
                'Id': str(i),
                'Message': json.dumps(msg['body']),
                'MessageAttributes': {
                    key: {'DataType': 'String', 'StringValue': str(value)}
                    for key, value in msg.get('attributes', {}).items()
                }
            }
            for i, msg in enumerate(messages)
        ]
        response = sns.publish_batch(
            TopicArn=self.topic_arn,
            PublishBatchRequestEntries=entries
        )
        return response
# Terraform: SNS with SQS subscription and filtering
sns_sqs_fanout = """
resource "aws_sns_topic" "orders" {
  name = "order-events"
}

resource "aws_sqs_queue" "inventory" {
  name = "inventory-updates"
}

resource "aws_sqs_queue" "shipping" {
  name = "shipping-orders"
}

# Subscription with filter
resource "aws_sns_topic_subscription" "inventory_sub" {
  topic_arn = aws_sns_topic.orders.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.inventory.arn

  filter_policy = jsonencode({
    event_type = ["order_created", "order_updated"]
  })
}

# High-value orders only
resource "aws_sns_topic_subscription" "premium_sub" {
  topic_arn = aws_sns_topic.orders.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.shipping.arn

  filter_policy = jsonencode({
    order_value   = [{ numeric = [">=", 1000] }]
    shipping_type = ["express", "priority"]
  })
  filter_policy_scope = "MessageAttributes"
}

# SQS policy to allow SNS to send messages
resource "aws_sqs_queue_policy" "inventory_policy" {
  queue_url = aws_sqs_queue.inventory.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Principal = { Service = "sns.amazonaws.com" }
      Action    = "sqs:SendMessage"
      Resource  = aws_sqs_queue.inventory.arn
      Condition = {
        ArnEquals = {
          "aws:SourceArn" = aws_sns_topic.orders.arn
        }
      }
    }]
  })
}
"""
EventBridge Deep Dive
Event-Driven Architecture
┌────────────────────────────────────────────────────────────────────────┐
│ EventBridge Architecture │
├────────────────────────────────────────────────────────────────────────┤
│ │
│ EVENT SOURCES EVENT TARGETS │
│ ───────────── ───────────── │
│ • AWS Services (100+) • Lambda │
│ • Custom applications • Step Functions │
│ • SaaS integrations • SQS/SNS │
│ • Scheduled (cron) • API Gateway │
│ • ECS tasks │
│ • Kinesis │
│ │
│ ARCHITECTURE: │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌─────────────┐ │ │
│ │ │ EC2 Events │───┐ │ │
│ │ └─────────────┘ │ │ │
│ │ │ │ │
│ │ ┌─────────────┐ │ ┌─────────────┐ │ │
│ │ │ S3 Events │───┼─────▶│ Event Bus │ │ │
│ │ └─────────────┘ │ │ (default) │ │ │
│ │ │ └──────┬──────┘ │ │
│ │ ┌─────────────┐ │ │ │ │
│ │ │Custom Events│───┘ │ │ │
│ │ └─────────────┘ │ │ │
│ │ │ │ │
│ │ ┌─────────────┼─────────────┐ │ │
│ │ ▼ ▼ ▼ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Rule 1 │ │ Rule 2 │ │ Rule 3 │ │ │
│ │ │ Pattern │ │ Pattern │ │ Schedule │ │ │
│ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │
│ │ │ │ │ │ │
│ │ ▼ ▼ ▼ │ │
│ │ Lambda SQS Queue Step Func │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ VS SNS: │
│ • Content-based filtering (not just attributes) │
│ • Schema discovery and validation │
│ • 200+ AWS service integrations built-in │
│ • Scheduled events (cron) │
│ • Archive and replay events │
│ │
└────────────────────────────────────────────────────────────────────────┘
EventBridge Implementation
Copy
import boto3
import json
from datetime import datetime, timezone

events = boto3.client('events')

class EventBridgePublisher:
    def __init__(self, event_bus_name: str = 'default'):
        self.event_bus_name = event_bus_name

    def put_event(self, source: str, detail_type: str,
                  detail: dict) -> str:
        """
        Publish custom event to EventBridge.
        """
        response = events.put_events(
            Entries=[
                {
                    'Source': source,
                    'DetailType': detail_type,
                    'Detail': json.dumps(detail),
                    'EventBusName': self.event_bus_name,
                    'Time': datetime.now(timezone.utc)
                }
            ]
        )
        if response['FailedEntryCount'] > 0:
            raise Exception(f"Failed to put event: {response['Entries']}")
        return response['Entries'][0]['EventId']

    def put_events_batch(self, events_list: list) -> dict:
        """Publish up to 10 events in batch."""
        entries = [
            {
                'Source': event['source'],
                'DetailType': event['detail_type'],
                'Detail': json.dumps(event['detail']),
                'EventBusName': self.event_bus_name,
                'Time': datetime.now(timezone.utc)
            }
            for event in events_list
        ]
        response = events.put_events(Entries=entries)
        return {
            'successful': len(entries) - response['FailedEntryCount'],
            'failed': response['FailedEntryCount']
        }
# Event patterns for rules
event_patterns = {
    # Match specific EC2 state changes
    "ec2_instance_stopped": {
        "source": ["aws.ec2"],
        "detail-type": ["EC2 Instance State-change Notification"],
        "detail": {
            "state": ["stopped", "terminated"]
        }
    },
    # Match custom order events over $1000
    "high_value_orders": {
        "source": ["com.myapp.orders"],
        "detail-type": ["OrderCreated"],
        "detail": {
            "amount": [{"numeric": [">=", 1000]}],
            "status": ["confirmed"]
        }
    },
    # Match S3 object creation in a specific prefix
    "s3_uploads": {
        "source": ["aws.s3"],
        "detail-type": ["Object Created"],
        "detail": {
            "bucket": {"name": ["my-bucket"]},
            "object": {
                "key": [{"prefix": "uploads/"}]
            }
        }
    }
}
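These patterns can be registered directly from code. A sketch reusing the events client and event_patterns dict above; the rule name, target ID, and queue ARN are illustrative:

rule_arn = events.put_rule(
    Name='high-value-orders',
    EventBusName='default',
    EventPattern=json.dumps(event_patterns['high_value_orders']),
    State='ENABLED'
)['RuleArn']

# SQS targets also need a queue policy allowing events.amazonaws.com
events.put_targets(
    Rule='high-value-orders',
    EventBusName='default',
    Targets=[{
        'Id': 'premium-order-queue',
        'Arn': 'arn:aws:sqs:us-east-1:123456789012:premium-orders'
    }]
)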
# Terraform: EventBridge rule with Lambda target
eventbridge_rule = """
resource "aws_cloudwatch_event_rule" "order_processing" {
  name        = "high-value-orders"
  description = "Route high-value orders to premium processing"

  event_pattern = jsonencode({
    source      = ["com.myapp.orders"]
    detail-type = ["OrderCreated"]
    detail = {
      amount = [{ numeric = [">=", 1000] }]
    }
  })
}

resource "aws_cloudwatch_event_target" "lambda" {
  rule      = aws_cloudwatch_event_rule.order_processing.name
  target_id = "ProcessHighValueOrder"
  arn       = aws_lambda_function.premium_processor.arn

  # Transform event before sending to target
  input_transformer {
    input_paths = {
      orderId = "$.detail.orderId"
      amount  = "$.detail.amount"
    }
    # String values must be quoted in the template; numbers must not be
    input_template = <<EOF
{
  "order_id": "<orderId>",
  "amount": <amount>,
  "priority": "high"
}
EOF
  }
}

# Scheduled rule (cron)
resource "aws_cloudwatch_event_rule" "daily_report" {
  name                = "daily-sales-report"
  description         = "Trigger daily sales report generation"
  schedule_expression = "cron(0 8 * * ? *)" # 8 AM UTC daily
}
"""
Step Functions
Workflow Orchestration
┌────────────────────────────────────────────────────────────────────────┐
│ Step Functions Workflow │
├────────────────────────────────────────────────────────────────────────┤
│ │
│ ORDER PROCESSING WORKFLOW: │
│ ────────────────────────── │
│ │
│ ┌─────────────┐ │
│ │ Start │ │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Validate │──── Error ────▶ ┌─────────────┐ │
│ │ Order │ │ Notify │ │
│ └──────┬──────┘ │ Failure │ │
│ │ Success └─────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ Parallel Processing │ │
│ │ ┌───────────┐ ┌───────────┐ │ │
│ │ │ Check │ │ Reserve │ │ │
│ │ │ Inventory │ │ Payment │ │ │
│ │ └───────────┘ └───────────┘ │ │
│ └──────────────────────┬──────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ Choice │
│ │Ship Express │◀──── Premium? ────▶┌─────────────┐ │
│ └──────┬──────┘ │ │Ship Standard│ │
│ │ │ └──────┬──────┘ │
│ └────────────────┼─────────────────┘ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Wait 5s │ │
│ └──────┬──────┘ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Notify │ │
│ │ Customer │ │
│ └──────┬──────┘ │
│ ▼ │
│ ┌─────────────┐ │
│ │ End │ │
│ └─────────────┘ │
│ │
│ WORKFLOW TYPES: │
│ • Standard: Long-running, exactly-once, audit history │
│ • Express: High-volume, at-least-once, short duration (under 5min) │
│ │
└────────────────────────────────────────────────────────────────────────┘
Step Functions ASL Definition
{
  "Comment": "Order Processing Workflow",
  "StartAt": "ValidateOrder",
  "States": {
    "ValidateOrder": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789:function:ValidateOrder",
      "Catch": [{
        "ErrorEquals": ["ValidationError"],
        "ResultPath": "$.error",
        "Next": "NotifyFailure"
      }],
      "Retry": [{
        "ErrorEquals": ["Lambda.ServiceException"],
        "IntervalSeconds": 2,
        "MaxAttempts": 3,
        "BackoffRate": 2.0
      }],
      "Next": "ParallelProcessing"
    },
    "ParallelProcessing": {
      "Type": "Parallel",
      "Comment": "ResultPath keeps the original input intact for the Choice state; branch outputs land at $.parallelResults",
      "ResultPath": "$.parallelResults",
      "Branches": [
        {
          "StartAt": "CheckInventory",
          "States": {
            "CheckInventory": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:123456789:function:CheckInventory",
              "End": true
            }
          }
        },
        {
          "StartAt": "ReservePayment",
          "States": {
            "ReservePayment": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:123456789:function:ReservePayment",
              "End": true
            }
          }
        }
      ],
      "Next": "ChooseShipping"
    },
    "ChooseShipping": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.order.isPremium",
          "BooleanEquals": true,
          "Next": "ShipExpress"
        }
      ],
      "Default": "ShipStandard"
    },
    "ShipExpress": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789:function:ShipExpress",
      "Next": "WaitForProcessing"
    },
    "ShipStandard": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789:function:ShipStandard",
      "Next": "WaitForProcessing"
    },
    "WaitForProcessing": {
      "Type": "Wait",
      "Seconds": 5,
      "Next": "NotifyCustomer"
    },
    "NotifyCustomer": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:us-east-1:123456789:order-notifications",
        "Message.$": "States.Format('Order {} shipped!', $.orderId)"
      },
      "End": true
    },
    "NotifyFailure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:us-east-1:123456789:order-failures",
        "Message.$": "$.error.Cause"
      },
      "End": true
    }
  }
}
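Starting this workflow from code is a single call. A sketch with a placeholder state machine ARN and input; start_sync_execution is the Express-workflow variant:

import boto3
import json

sfn = boto3.client('stepfunctions')

# Standard workflows start asynchronously; poll DescribeExecution
# (or subscribe to EventBridge) for the outcome
execution = sfn.start_execution(
    stateMachineArn='arn:aws:states:us-east-1:123456789:stateMachine:OrderProcessing',
    name='order-A-1001',  # execution names must be unique for 90 days
    input=json.dumps({'orderId': 'A-1001', 'order': {'isPremium': True}})
)
print(execution['executionArn'])

# Express workflows can instead return the result synchronously:
# sfn.start_sync_execution(stateMachineArn=..., input=...)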
API Gateway Integration
Direct Service Integrations
┌────────────────────────────────────────────────────────────────────────┐
│ API Gateway Direct Integrations │
├────────────────────────────────────────────────────────────────────────┤
│ │
│ Skip Lambda for simple operations: │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ API Gateway ──▶ SQS (SendMessage) │ │
│ │ API Gateway ──▶ SNS (Publish) │ │
│ │ API Gateway ──▶ Step Functions (StartExecution) │ │
│ │ API Gateway ──▶ DynamoDB (PutItem, GetItem, Query) │ │
│ │ API Gateway ──▶ EventBridge (PutEvents) │ │
│ │ API Gateway ──▶ Kinesis (PutRecord) │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ BENEFITS: │
│ • No Lambda cold starts │
│ • Lower cost (no Lambda execution) │
│ • Lower latency │
│ • Simpler architecture │
│ │
│ LIMITATIONS: │
│ • Limited transformation logic │
│ • VTL templating required │
│ • Complex error handling │
│ │
└────────────────────────────────────────────────────────────────────────┘
API Gateway to SQS Direct Integration
# OpenAPI with API Gateway extensions
openapi: 3.0.0
info:
  title: Order API
  version: 1.0.0
paths:
  /orders:
    post:
      summary: Submit order to queue
      responses:
        "202":
          description: Order accepted and queued
      x-amazon-apigateway-integration:
        type: aws
        httpMethod: POST
        uri: arn:aws:apigateway:us-east-1:sqs:path/123456789012/orders-queue
        credentials: arn:aws:iam::123456789012:role/APIGatewaySQSRole
        requestParameters:
          integration.request.header.Content-Type: "'application/x-www-form-urlencoded'"
        requestTemplates:
          application/json: |
            Action=SendMessage&MessageBody=$util.urlEncode($input.body)&MessageAttribute.1.Name=source&MessageAttribute.1.Value.StringValue=api&MessageAttribute.1.Value.DataType=String
        responses:
          "200":
            statusCode: "202"
            responseTemplates:
              application/json: |
                {
                  "messageId": "$input.path('$.SendMessageResponse.SendMessageResult.MessageId')",
                  "status": "queued"
                }
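A quick way to verify the integration after deployment, using only the standard library; the invoke URL below is a placeholder for your deployed stage:

import json
import urllib.request

url = 'https://abc123.execute-api.us-east-1.amazonaws.com/prod/orders'
payload = json.dumps({'order_id': 'A-1001', 'amount': 250}).encode()

req = urllib.request.Request(
    url,
    data=payload,
    headers={'Content-Type': 'application/json'},
    method='POST'
)
with urllib.request.urlopen(req) as resp:
    print(resp.status, resp.read().decode())  # expect 202 and a messageId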
🎯 Interview Questions
Q1: SQS vs SNS vs EventBridge - when to use each?
SQS:
- Point-to-point messaging
- Decoupling with buffering
- One consumer per message
- Need message persistence
SNS:
- Fan-out to multiple subscribers
- Push-based delivery
- Simple attribute filtering
- Mobile push, email, SMS
EventBridge:
- Event-driven architectures
- Content-based filtering (JSON path)
- AWS service integration
- Schema registry
- Event archive/replay
Q2: How do you handle poison messages in SQS?
Dead Letter Queue (DLQ) pattern:
- Configure maxReceiveCount in the source queue's redrive policy
- After N failed processing attempts, the message moves to the DLQ
- Set up a CloudWatch alarm on DLQ depth
- Process the DLQ separately (see the redrive sketch below):
  - Manual review
  - Automated retry with fixes
  - Archive for analysis
- DLQ retention longer than source (up to the 14-day maximum)
- Same queue type (Standard/FIFO) as the source
- Monitor ApproximateNumberOfMessagesVisible on the DLQ
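For the automated retry path, SQS has a managed redrive API. A sketch with a placeholder DLQ ARN:

import boto3

sqs = boto3.client('sqs')

sqs.start_message_move_task(
    SourceArn='arn:aws:sqs:us-east-1:123456789012:orders-dlq',
    # DestinationArn defaults to the original source queue when omitted
    MaxNumberOfMessagesPerSecond=50  # throttle the replay
)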
Q3: Explain SQS visibility timeout and how to set it.
Visibility timeout:
- Time a message is hidden after being received
- Prevents other consumers from processing same message
- If not deleted in time, message reappears
- Should be longer than processing time
- Too short = duplicate processing
- Too long = delayed retry on failure
# For long-running tasks: reset the clock before the timeout expires
sqs.change_message_visibility(
    QueueUrl=url,
    ReceiptHandle=handle,
    VisibilityTimeout=new_timeout_seconds  # counted from now, not additive
)
Q4: How do you ensure exactly-once processing with SQS?
For FIFO queues:
- Built-in deduplication (5-minute window)
- Message deduplication ID
- Content-based deduplication
At the application level (needed for Standard queues, good practice everywhere):
- Idempotent consumers
- Store processed message IDs in DynamoDB
- Use conditional writes
# Idempotency check via a DynamoDB conditional write
import boto3

dynamodb = boto3.client('dynamodb')

try:
    dynamodb.put_item(
        TableName='processed-messages',
        Item={'messageId': {'S': message_id}},
        ConditionExpression='attribute_not_exists(messageId)'
    )
    # First time seeing this ID - process the message
except dynamodb.exceptions.ConditionalCheckFailedException:
    # Already processed, skip
    pass
Q5: Design a saga pattern using Step Functions.
Saga for distributed transactions:
Order Saga:
1. Reserve Inventory → Success → Continue
                     → Fail    → End
2. Charge Payment    → Success → Continue
                     → Fail    → Compensate: Release Inventory
3. Ship Order        → Success → End
                     → Fail    → Compensate: Refund Payment
                               → Compensate: Release Inventory
Step Functions features that support sagas:
- Catch blocks for compensation
- Parallel branches for concurrent steps
- Wait states for human approval
- Express workflows for high-volume sagas
🧪 Hands-On Lab: Event-Driven Order System
1. Create SNS Topic: topic for order events with filter policies
2. Create SQS Queues: inventory and shipping queues, each with a DLQ
3. Set Up Fan-Out: subscribe the queues to SNS with filters
4. Create EventBridge Rule: high-value order detection with a Lambda target
5. Build Step Functions Workflow: order processing with parallel steps and error handling
6. Test End-to-End: publish events, verify all systems receive the appropriate messages
Next Module
DevOps & CI/CD
Master CodePipeline, CloudFormation, CDK, and infrastructure automation