Chapter 6: Advanced DynamoDB Features

Introduction

Beyond basic CRUD operations, DynamoDB provides powerful advanced features that enable sophisticated application architectures. This chapter explores DynamoDB Streams, Transactions, Global Tables, TTL, and other advanced capabilities that differentiate DynamoDB from traditional databases.

DynamoDB Streams

Overview

DynamoDB Streams captures a time-ordered sequence of item-level modifications (create, update, delete) in a table and stores the records for up to 24 hours.
[Figure: DynamoDB Streams architecture. Write operations (PutItem, UpdateItem, DeleteItem) against a table are captured as time-ordered stream records: INSERT, MODIFY, and REMOVE events carrying the configured image types (e.g., NEW_IMAGE, NEW_AND_OLD_IMAGES, OLD_IMAGE). Records are retained for 24 hours and ordered per partition key. Typical consumers: Lambda functions (real-time processing), Kinesis Data Streams (analytics pipelines), custom applications using the DynamoDB Streams API, and search-index synchronization (e.g., Elasticsearch). Common use cases: replication, aggregation, notifications, search indexing, audit logs. Each record appears exactly once in the stream, but consumers should still process idempotently.]
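
Most applications consume a stream through Lambda, but the "custom application" path in the figure reads it directly with the low-level DynamoDB Streams API. A simplified, single-pass polling sketch (shard lineage tracking and iterator renewal omitted):

// Poll a stream once with the low-level DynamoDBStreams client
const AWS = require('aws-sdk');
const streams = new AWS.DynamoDBStreams();

const pollStreamOnce = async (streamArn) => {
  // 1. Discover the stream's shards
  const { StreamDescription } = await streams.describeStream({
    StreamArn: streamArn
  }).promise();

  for (const shard of StreamDescription.Shards) {
    // 2. Position an iterator at the oldest retained record in the shard
    const { ShardIterator } = await streams.getShardIterator({
      StreamArn: streamArn,
      ShardId: shard.ShardId,
      ShardIteratorType: 'TRIM_HORIZON'
    }).promise();

    // 3. Fetch one page of change records
    const { Records } = await streams.getRecords({ ShardIterator }).promise();
    for (const record of Records) {
      console.log(record.eventName, record.dynamodb.Keys);
    }
  }
};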

Stream View Types

// Configure stream when creating table
// (Examples in this chapter use the AWS SDK for JavaScript v2; `dynamodb` refers
// to the low-level AWS.DynamoDB client for table-level calls and to an
// AWS.DynamoDB.DocumentClient for item-level calls.)
const AWS = require('aws-sdk');
const dynamodb = new AWS.DynamoDB();

const createTableWithStream = async () => {
  await dynamodb.createTable({
    TableName: 'Users',
    KeySchema: [
      { AttributeName: 'userId', KeyType: 'HASH' }
    ],
    AttributeDefinitions: [
      { AttributeName: 'userId', AttributeType: 'S' }
    ],
    BillingMode: 'PAY_PER_REQUEST',
    StreamSpecification: {
      StreamEnabled: true,
      StreamViewType: 'NEW_AND_OLD_IMAGES'  // Options below
    }
  }).promise();
};

// Stream View Types:
// 1. KEYS_ONLY - Only partition/sort keys
// 2. NEW_IMAGE - Entire item after modification
// 3. OLD_IMAGE - Entire item before modification
// 4. NEW_AND_OLD_IMAGES - Both old and new images

// Example stream records:

// KEYS_ONLY
{
  eventID: "1",
  eventName: "INSERT",
  dynamodb: {
    Keys: {
      userId: { S: "123" }
    },
    StreamViewType: "KEYS_ONLY"
  }
}

// NEW_IMAGE
{
  eventID: "2",
  eventName: "MODIFY",
  dynamodb: {
    Keys: { userId: { S: "123" } },
    NewImage: {
      userId: { S: "123" },
      name: { S: "Alice Updated" },
      email: { S: "[email protected]" }
    },
    StreamViewType: "NEW_IMAGE"
  }
}

// NEW_AND_OLD_IMAGES
{
  eventID: "3",
  eventName: "MODIFY",
  dynamodb: {
    Keys: { userId: { S: "123" } },
    OldImage: {
      userId: { S: "123" },
      name: { S: "Alice" },
      email: { S: "[email protected]" }
    },
    NewImage: {
      userId: { S: "123" },
      name: { S: "Alice" },
      email: { S: "[email protected]" }
    },
    StreamViewType: "NEW_AND_OLD_IMAGES"
  }
}

Lambda Stream Processing

// Lambda function triggered by DynamoDB Stream
const AWS = require('aws-sdk');

exports.handler = async (event) => {
  for (const record of event.Records) {
    console.log('Event ID:', record.eventID);
    console.log('Event Name:', record.eventName); // INSERT, MODIFY, REMOVE

    const newImage = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage || {});
    const oldImage = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.OldImage || {});

    switch (record.eventName) {
      case 'INSERT':
        await handleInsert(newImage);
        break;
      case 'MODIFY':
        await handleModify(oldImage, newImage);
        break;
      case 'REMOVE':
        await handleRemove(oldImage);
        break;
    }
  }
};

const ses = new AWS.SES();
const cloudwatch = new AWS.CloudWatch();

const handleInsert = async (newImage) => {
  // Send welcome email (SES requires a verified sender; address is hypothetical)
  await ses.sendEmail({
    Source: '[email protected]',
    Destination: { ToAddresses: [newImage.email] },
    Message: {
      Subject: { Data: 'Welcome!' },
      Body: { Text: { Data: `Welcome ${newImage.name}!` } }
    }
  }).promise();

  // Update analytics
  await cloudwatch.putMetricData({
    Namespace: 'MyApp',
    MetricData: [{
      MetricName: 'NewUsers',
      Value: 1,
      Unit: 'Count'
    }]
  }).promise();
};

const handleModify = async (oldImage, newImage) => {
  // Detect email change
  if (oldImage.email !== newImage.email) {
    await sendEmailVerification(newImage.email);
  }

  // Audit log
  await auditLog.write({
    action: 'USER_UPDATED',
    userId: newImage.userId,
    changes: getChanges(oldImage, newImage),
    timestamp: new Date().toISOString()
  });
};

const handleRemove = async (oldImage) => {
  // Cleanup related data
  await cleanupUserData(oldImage.userId);

  // Send notification
  await sns.publish({
    TopicArn: 'arn:aws:sns:us-east-1:123:user-deletions',
    Message: JSON.stringify({ userId: oldImage.userId })
  }).promise();
};

function getChanges(oldImage, newImage) {
  const changes = {};
  for (const key in newImage) {
    if (oldImage[key] !== newImage[key]) {
      changes[key] = { old: oldImage[key], new: newImage[key] };
    }
  }
  return changes;
}
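
The handler above is wired to the stream through a Lambda event source mapping, which polls the stream and invokes the function with batches of records. A hedged sketch of the wiring (the function name, queue ARN, and tuning values are illustrative):

// Wire the Lambda trigger to a table's stream
const AWS = require('aws-sdk');
const lambda = new AWS.Lambda();

const wireStreamTrigger = async (streamArn) => {
  await lambda.createEventSourceMapping({
    EventSourceArn: streamArn,               // the table's LatestStreamArn
    FunctionName: 'user-stream-processor',   // hypothetical function name
    StartingPosition: 'LATEST',              // or TRIM_HORIZON to replay history
    BatchSize: 100,                          // records per invocation
    MaximumRetryAttempts: 3,                 // then route the failed batch below
    BisectBatchOnFunctionError: true,        // split batches to isolate poison records
    DestinationConfig: {
      OnFailure: { Destination: 'arn:aws:sqs:us-east-1:123:stream-dlq' }  // hypothetical DLQ
    }
  }).promise();
};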

Stream Use Cases

1. Cross-Region Replication
// Replicate changes to secondary region
exports.handler = async (event) => {
  const secondaryDynamoDB = new AWS.DynamoDB.DocumentClient({
    region: 'eu-west-1'
  });

  for (const record of event.Records) {
    const newImage = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage || {});

    if (record.eventName === 'INSERT' || record.eventName === 'MODIFY') {
      await secondaryDynamoDB.put({
        TableName: 'Users-Replica',
        Item: newImage
      }).promise();
    } else if (record.eventName === 'REMOVE') {
      const keys = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.Keys);
      await secondaryDynamoDB.delete({
        TableName: 'Users-Replica',
        Key: keys
      }).promise();
    }
  }
};
2. Materialized Views / Aggregations
// Maintain order count per user
exports.handler = async (event) => {
  for (const record of event.Records) {
    if (record.eventName === 'INSERT') {
      const order = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);

      await dynamodb.update({
        TableName: 'UserStats',
        Key: { userId: order.userId },
        UpdateExpression: 'ADD orderCount :inc, totalSpent :amount',
        ExpressionAttributeValues: {
          ':inc': 1,
          ':amount': order.total
        }
      }).promise();
    }
  }
};
3. Search Index Synchronization
// Sync to Elasticsearch
const { Client } = require('@elastic/elasticsearch');
const esClient = new Client({ node: 'https://search-domain.es.amazonaws.com' });

exports.handler = async (event) => {
  const operations = [];

  for (const record of event.Records) {
    const keys = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.Keys);

    if (record.eventName === 'REMOVE') {
      operations.push({ delete: { _index: 'users', _id: keys.userId } });
    } else {
      const document = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);
      operations.push(
        { index: { _index: 'users', _id: keys.userId } },
        document
      );
    }
  }

  if (operations.length > 0) {
    await esClient.bulk({ body: operations });
  }
};

DynamoDB Transactions

ACID Transactions

DynamoDB supports atomic, consistent, isolated, and durable transactions across multiple items and tables.

Deep Dive: Transactional Isolation and RDBMS Comparison

While DynamoDB provides ACID transactions, the underlying implementation and isolation guarantees differ from traditional SQL databases like PostgreSQL or MySQL.

1. Isolation Levels

In a traditional RDBMS, you can choose between Read Uncommitted, Read Committed, Repeatable Read, and Serializable.
  • DynamoDB Guarantee: DynamoDB provides Serializability for all items within a single transaction.
  • The Catch: This serializability is achieved via Optimistic Concurrency Control (OCC) at the item level. If two transactions attempt to modify the same item simultaneously, one will succeed and the other will be rejected with a TransactionCanceledException (a retry sketch follows).
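
Because conflicts surface as rejections rather than lock waits, callers should be prepared to retry. A minimal sketch, assuming the SDK v2 convention of surfacing cancellation reason codes (such as TransactionConflict) in the error message:

// Retry OCC conflicts with jittered backoff; a canceled transaction is safe
// to retry because DynamoDB has already rolled back all of its writes
const withTransactRetry = async (params, maxAttempts = 3) => {
  for (let attempt = 1; ; attempt++) {
    try {
      return await dynamodb.transactWrite(params).promise();
    } catch (error) {
      // Only conflicts are worth retrying; condition failures are deterministic
      const conflict = error.code === 'TransactionCanceledException' &&
                       /TransactionConflict/.test(error.message);
      if (!conflict || attempt === maxAttempts) throw error;
      await new Promise(resolve =>
        setTimeout(resolve, Math.random() * 100 * 2 ** attempt));  // jittered backoff
    }
  }
};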

2. The 2-Phase Commit (2PC) under the Hood

DynamoDB uses a specialized 2-Phase Commit protocol managed by a “Transaction Coordinator.”
  • Phase 1 (Prepare): The coordinator validates conditions for all items in the transaction and “locks” them.
  • Phase 2 (Commit): If all conditions pass, the coordinator writes the changes. If any fail, it rolls back all changes.
  • Latency Cost: Because it involves multiple round-trips between the coordinator and storage nodes, transactions have higher latency than standard PutItem or UpdateItem operations.

3. Key Differences from RDBMS

Feature comparison: DynamoDB Transactions vs. traditional RDBMS (SQL)
  • Concurrency: Optimistic (fail fast on conflict) vs. pessimistic (wait for locks)
  • Deadlocks: Impossible (timestamps/OCC) vs. possible (requires deadlock detection)
  • Rollback: Automatic on condition failure vs. manual or automatic
  • Scope: Up to 100 items vs. entire database/tables
  • Throughput: Limited per partition vs. scalable with hardware/sharding
// TransactWriteItems - Multiple writes atomically
const transferFunds = async (fromAccount, toAccount, amount) => {
  try {
    await dynamodb.transactWrite({
      TransactItems: [
        {
          Update: {
            TableName: 'Accounts',
            Key: { accountId: fromAccount },
            UpdateExpression: 'SET balance = balance - :amount',
            ConditionExpression: 'balance >= :amount',
            ExpressionAttributeValues: {
              ':amount': amount
            }
          }
        },
        {
          Update: {
            TableName: 'Accounts',
            Key: { accountId: toAccount },
            UpdateExpression: 'SET balance = balance + :amount',
            ExpressionAttributeValues: {
              ':amount': amount
            }
          }
        },
        {
          Put: {
            TableName: 'Transactions',
            Item: {
              transactionId: generateId(),
              from: fromAccount,
              to: toAccount,
              amount: amount,
              timestamp: new Date().toISOString(),
              status: 'COMPLETED'
            }
          }
        }
      ]
    }).promise();

    return { success: true, message: 'Transfer completed' };
  } catch (error) {
    if (error.code === 'TransactionCanceledException') {
      return { success: false, message: 'Insufficient funds or conflict' };
    }
    throw error;
  }
};

// TransactGetItems - Consistent snapshot read
const getAccountSnapshot = async (accountIds) => {
  const result = await dynamodb.transactGet({
    TransactItems: accountIds.map(id => ({
      Get: {
        TableName: 'Accounts',
        Key: { accountId: id }
      }
    }))
  }).promise();

  return result.Responses.map(r => r.Item);
};

Transaction Constraints

// Transaction limits and constraints:
// - Max 100 items per transaction (TransactWriteItems)
// - Max 100 items per transaction (TransactGetItems)
// - Max 4 MB total transaction size
// - All items must be in same AWS account/region
// - Consumes 2x capacity units

// Example: Handling large transactions
const largeTransaction = async (items) => {
  const batchSize = 100;
  const batches = [];

  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }

  // Process in separate transactions (atomic within each batch, not across batches)
  for (const batch of batches) {
    await dynamodb.transactWrite({
      TransactItems: batch.map(item => ({
        Put: {
          TableName: 'Items',
          Item: item
        }
      }))
    }).promise();
  }
};

// Idempotent transactions using client tokens
const idempotentTransaction = async (operation) => {
  const clientToken = generateIdempotentToken(operation);

  try {
    await dynamodb.transactWrite({
      ClientRequestToken: clientToken,  // Idempotency token (valid for 10 minutes)
      TransactItems: [/* ... */]
    }).promise();
  } catch (error) {
    if (error.code === 'IdempotentParameterMismatchException') {
      // Same token, different parameters
      throw new Error('Conflicting operation with same token');
    }
    throw error;
  }
};

Transaction Use Cases

Order Processing
const processOrder = async (order) => {
  const transactionItems = [];

  // Reserve inventory for each item
  for (const item of order.items) {
    transactionItems.push({
      Update: {
        TableName: 'Inventory',
        Key: { productId: item.productId },
        UpdateExpression: 'SET available = available - :qty',
        ConditionExpression: 'available >= :qty',
        ExpressionAttributeValues: {
          ':qty': item.quantity
        }
      }
    });
  }

  // Create order record
  transactionItems.push({
    Put: {
      TableName: 'Orders',
      Item: {
        orderId: order.id,
        userId: order.userId,
        items: order.items,
        total: order.total,
        status: 'CONFIRMED',
        createdAt: new Date().toISOString()
      },
      ConditionExpression: 'attribute_not_exists(orderId)'
    }
  });

  // Charge customer
  transactionItems.push({
    Update: {
      TableName: 'Users',
      Key: { userId: order.userId },
      UpdateExpression: 'SET accountBalance = accountBalance - :total',
      ConditionExpression: 'accountBalance >= :total',
      ExpressionAttributeValues: {
        ':total': order.total
      }
    }
  });

  // Execute atomically
  await dynamodb.transactWrite({
    TransactItems: transactionItems
  }).promise();
};

Global Tables

Multi-Region Replication

Global Tables provide managed multi-region, multi-active replication.
// Create global table (Version 2019.11.21)
const createGlobalTable = async () => {
  // 1. Create table in first region (us-east-1)
  await dynamodbUSEast.createTable({
    TableName: 'GlobalUsers',
    KeySchema: [
      { AttributeName: 'userId', KeyType: 'HASH' }
    ],
    AttributeDefinitions: [
      { AttributeName: 'userId', AttributeType: 'S' }
    ],
    BillingMode: 'PAY_PER_REQUEST',
    StreamSpecification: {
      StreamEnabled: true,
      StreamViewType: 'NEW_AND_OLD_IMAGES'
    }
  }).promise();

  // 2. Add replica regions (wait for the table to become ACTIVE first)
  await dynamodbUSEast.updateTable({
    TableName: 'GlobalUsers',
    ReplicaUpdates: [
      {
        Create: {
          RegionName: 'eu-west-1'
        }
      },
      {
        Create: {
          RegionName: 'ap-southeast-1'
        }
      }
    ]
  }).promise();

  console.log('Global table created in 3 regions');
};

// Write to any region - automatically replicated
const writeToGlobalTable = async () => {
  // Write in US
  await dynamodbUSEast.put({
    TableName: 'GlobalUsers',
    Item: {
      userId: '123',
      name: 'Alice',
      region: 'us-east-1',
      timestamp: Date.now()
    }
  }).promise();

  // Propagates to EU and Asia (typically < 1 second)
  // Can immediately read from any region
};
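
Multi-active replication also enables simple read routing: try the closest region first and fall back on failure. A hedged sketch (the region list and fallback order are illustrative):

// Read from the preferred region, falling back to the other replicas
const AWS = require('aws-sdk');
const REGIONS = ['us-east-1', 'eu-west-1', 'ap-southeast-1'];
const replicaClients = REGIONS.map(region =>
  new AWS.DynamoDB.DocumentClient({ region }));

const readWithFailover = async (userId) => {
  let lastError;
  for (const client of replicaClients) {
    try {
      const result = await client.get({
        TableName: 'GlobalUsers',
        Key: { userId }
      }).promise();
      return result.Item;  // may be slightly stale: cross-region replication is async
    } catch (error) {
      lastError = error;   // e.g., regional outage; try the next replica
    }
  }
  throw lastError;
};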

Conflict Resolution

Deep Dive: Global Tables and Distributed Consistency

The “multi-active” nature of Global Tables introduces classic distributed systems challenges. While DynamoDB handles the replication, developers must understand the theoretical underpinnings.

1. Last Writer Wins (LWW) vs. CRDTs

In the original 2007 Dynamo paper, conflict resolution was handled via Vector Clocks, allowing for complex merging or client-side reconciliation.
  • Modern DynamoDB Approach: For simplicity and performance, Global Tables use Last Writer Wins (LWW) based on a system-level wall clock (NTP-synchronized).
  • The Trade-off: LWW is simpler but can lead to data loss in high-concurrency “write-write” conflict scenarios where the “losing” write is completely overwritten.
  • CRDTs (Conflict-free Replicated Data Types): While DynamoDB doesn’t natively expose CRDTs, you can implement them at the application level (e.g., G-Counters or PN-Counters) using the ADD operation in UpdateItem, which is commutative.
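
For example, a G-Counter can be approximated with one item per (counter, region) pair, so that each region only ever writes its own item and replication never produces write-write conflicts. A minimal sketch, assuming a hypothetical Counters table keyed by counterId (partition) and region (sort):

// Per-region counter shards: each region is the sole writer of its shard item
const AWS = require('aws-sdk');
const docClient = new AWS.DynamoDB.DocumentClient();
const LOCAL_REGION = process.env.AWS_REGION;

const incrementCounter = async (counterId, delta = 1) => {
  await docClient.update({
    TableName: 'Counters',                      // hypothetical table
    Key: { counterId, region: LOCAL_REGION },   // this region's shard
    UpdateExpression: 'ADD #c :delta',          // ADD is commutative within the shard
    ExpressionAttributeNames: { '#c': 'count' },
    ExpressionAttributeValues: { ':delta': delta }
  }).promise();
};

// Merged read: sum every region's shard
const readCounter = async (counterId) => {
  const result = await docClient.query({
    TableName: 'Counters',
    KeyConditionExpression: 'counterId = :id',
    ExpressionAttributeValues: { ':id': counterId }
  }).promise();
  return result.Items.reduce((sum, item) => sum + (item.count || 0), 0);
};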

2. Consistency Model: Eventual but Fast

Global Tables are Eventually Consistent across regions.
  • Replication Latency: Typically under 1 second globally.
  • Theoretical Limit: Because it’s an asynchronous replication model, Global Tables fall into the AP (Availability / Partition Tolerance) category of the CAP theorem. They prioritize being able to write to any region over immediate global consistency.

3. Preventing Replication Loops

DynamoDB uses internal metadata to track the origin region of a write. This ensures that a write replicated from Region A to Region B does not get “re-replicated” back to Region A, preventing infinite loops.
// Global Tables use last-writer-wins based on timestamps
// Example conflict scenario:

// Time T0: User writes in us-east-1
await dynamodbUSEast.update({
  TableName: 'GlobalUsers',
  Key: { userId: '123' },
  UpdateExpression: 'SET #name = :name',
  ExpressionAttributeNames: { '#name': 'name' },
  ExpressionAttributeValues: { ':name': 'Alice' }
}).promise();

// Time T0+50ms: User writes in eu-west-1
await dynamodbEU.update({
  TableName: 'GlobalUsers',
  Key: { userId: '123' },
  UpdateExpression: 'SET #name = :name',
  ExpressionAttributeNames: { '#name': 'name' },
  ExpressionAttributeValues: { ':name': 'Bob' }
}).promise();

// Result: Last write wins (Bob at T0+50ms)

// Detect conflicts using version numbers (optimistic locking). Note that the
// condition is evaluated only in the region that receives the write, so this
// detects conflicts within a region but does not serialize writes globally:
const updateWithVersioning = async (region, userId, newName, currentVersion) => {
  const client = getRegionClient(region);

  try {
    await client.update({
      TableName: 'GlobalUsers',
      Key: { userId: userId },
      UpdateExpression: 'SET #name = :name, version = :newVersion',
      ConditionExpression: 'version = :currentVersion',
      ExpressionAttributeNames: { '#name': 'name' },
      ExpressionAttributeValues: {
        ':name': newName,
        ':newVersion': currentVersion + 1,
        ':currentVersion': currentVersion
      }
    }).promise();

    return { success: true };
  } catch (error) {
    if (error.code === 'ConditionalCheckFailedException') {
      return { success: false, reason: 'Version conflict' };
    }
    throw error;
  }
};

Time To Live (TTL)

Automatic Item Expiration

// Enable TTL on a table
const enableTTL = async () => {
  await dynamodb.updateTimeToLive({
    TableName: 'Sessions',
    TimeToLiveSpecification: {
      Enabled: true,
      AttributeName: 'ttl'  // Unix timestamp in seconds
    }
  }).promise();
};

// Write items with TTL
const createSession = async (sessionId, userId) => {
  const ttl = Math.floor(Date.now() / 1000) + (30 * 60); // 30 minutes

  await dynamodb.put({
    TableName: 'Sessions',
    Item: {
      sessionId: sessionId,
      userId: userId,
      createdAt: new Date().toISOString(),
      ttl: ttl  // Expires in 30 minutes
    }
  }).promise();
};

// Use cases for TTL:
// 1. Session management
const createSessionWithExpiry = async (sessionId, userId, durationMinutes = 30) => {
  await dynamodb.put({
    TableName: 'Sessions',
    Item: {
      sessionId: sessionId,
      userId: userId,
      ttl: Math.floor(Date.now() / 1000) + (durationMinutes * 60)
    }
  }).promise();
};

// 2. Temporary data / caching
const cacheData = async (key, value, ttlSeconds = 3600) => {
  await dynamodb.put({
    TableName: 'Cache',
    Item: {
      cacheKey: key,
      value: value,
      ttl: Math.floor(Date.now() / 1000) + ttlSeconds
    }
  }).promise();
};

// 3. Event logs with retention
const logEvent = async (event, retentionDays = 30) => {
  await dynamodb.put({
    TableName: 'EventLogs',
    Item: {
      eventId: generateId(),
      eventType: event.type,
      data: event.data,
      timestamp: new Date().toISOString(),
      ttl: Math.floor(Date.now() / 1000) + (retentionDays * 24 * 60 * 60)
    }
  }).promise();
};

// 4. Time-limited access tokens
const createAccessToken = async (tokenId, userId, expiresIn = 3600) => {
  await dynamodb.put({
    TableName: 'AccessTokens',
    Item: {
      tokenId: tokenId,
      userId: userId,
      scope: ['read', 'write'],
      ttl: Math.floor(Date.now() / 1000) + expiresIn
    }
  }).promise();
};

// TTL Streams integration
// Capture TTL deletions in DynamoDB Streams
exports.handler = async (event) => {
  for (const record of event.Records) {
    if (record.userIdentity?.type === 'Service' &&
        record.userIdentity?.principalId === 'dynamodb.amazonaws.com') {
      // TTL deletion detected
      const expiredItem = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.OldImage);
      console.log('Item expired via TTL:', expiredItem);

      // Perform cleanup actions
      await cleanupExpiredItem(expiredItem);
    }
  }
};

Point-in-Time Recovery (PITR)

// Enable PITR
const enablePITR = async (tableName) => {
  await dynamodb.updateContinuousBackups({
    TableName: tableName,
    PointInTimeRecoverySpecification: {
      PointInTimeRecoveryEnabled: true
    }
  }).promise();

  console.log('PITR enabled - can restore to any point in last 35 days');
};

// Restore table to specific point in time
const restoreTable = async (sourceTable, targetTable, restoreDateTime) => {
  await dynamodb.restoreTableToPointInTime({
    SourceTableName: sourceTable,
    TargetTableName: targetTable,
    RestoreDateTime: restoreDateTime,  // Date object
    UseLatestRestorableTime: false
  }).promise();

  console.log(`Restored ${sourceTable} to ${targetTable} at ${restoreDateTime}`);
};

// Disaster recovery example
const performDisasterRecovery = async () => {
  const yesterday = new Date();
  yesterday.setDate(yesterday.getDate() - 1);

  await restoreTable(
    'ProductionTable',
    'ProductionTable-Recovered',
    yesterday
  );
};
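
Before restoring, the available window can be checked with describeContinuousBackups. A short sketch:

// Inspect the restorable window for a table
const getRestoreWindow = async (tableName) => {
  const result = await dynamodb.describeContinuousBackups({
    TableName: tableName
  }).promise();

  const pitr = result.ContinuousBackupsDescription.PointInTimeRecoveryDescription;
  return {
    status: pitr.PointInTimeRecoveryStatus,     // 'ENABLED' | 'DISABLED'
    earliest: pitr.EarliestRestorableDateTime,  // up to 35 days in the past
    latest: pitr.LatestRestorableDateTime       // typically a few minutes ago
  };
};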

PartiQL Support

SQL-Compatible Query Language

// PartiQL queries on DynamoDB
// (executeStatement / batchExecuteStatement are available on the low-level
// AWS.DynamoDB client in SDK v2)

// SELECT
const selectQuery = async () => {
  const result = await dynamodb.executeStatement({
    Statement: `SELECT * FROM Users WHERE userId = ?`,
    Parameters: [{ S: '12345' }]
  }).promise();

  return result.Items;
};

// INSERT
const insertQuery = async () => {
  await dynamodb.executeStatement({
    Statement: `INSERT INTO Users VALUE {'userId': ?, 'name': ?, 'email': ?}`,
    Parameters: [
      { S: '12345' },
      { S: 'Alice' },
      { S: '[email protected]' }
    ]
  }).promise();
};

// UPDATE
const updateQuery = async () => {
  await dynamodb.executeStatement({
    Statement: `UPDATE Users SET email = ? WHERE userId = ?`,
    Parameters: [
      { S: '[email protected]' },
      { S: '12345' }
    ]
  }).promise();
};

// DELETE
const deleteQuery = async () => {
  await dynamodb.executeStatement({
    Statement: `DELETE FROM Users WHERE userId = ?`,
    Parameters: [{ S: '12345' }]
  }).promise();
};

// Batch operations with PartiQL
const batchExecute = async () => {
  await dynamodb.batchExecuteStatement({
    Statements: [
      {
        Statement: `INSERT INTO Users VALUE {'userId': ?, 'name': ?}`,
        Parameters: [{ S: '123' }, { S: 'Alice' }]
      },
      {
        Statement: `INSERT INTO Users VALUE {'userId': ?, 'name': ?}`,
        Parameters: [{ S: '456' }, { S: 'Bob' }]
      }
    ]
  }).promise();
};
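
SELECT statements paginate like Query/Scan: results beyond one page are fetched with NextToken. A minimal sketch:

// Paginate a PartiQL SELECT with NextToken
const selectAllUsers = async () => {
  const items = [];
  let nextToken;

  do {
    const result = await dynamodb.executeStatement({
      Statement: `SELECT * FROM Users`,   // full scan; returned in pages
      NextToken: nextToken
    }).promise();
    items.push(...result.Items);
    nextToken = result.NextToken;
  } while (nextToken);

  return items;
};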

Contributor Insights

Identifying Hot Keys

// Enable Contributor Insights
const enableContributorInsights = async (tableName) => {
  await dynamodb.updateContributorInsights({
    TableName: tableName,
    ContributorInsightsAction: 'ENABLE'
  }).promise();

  console.log('Contributor Insights enabled');
};

// View most accessed partition keys
const getTopAccessedKeys = async (tableName) => {
  const cloudwatch = new AWS.CloudWatch();

  const insights = await cloudwatch.getInsightRuleReport({
    // The rule is auto-created when insights are enabled; verify the exact
    // generated name in CloudWatch before relying on this pattern
    RuleName: `DynamoDBContributorInsights-${tableName}`,
    StartTime: new Date(Date.now() - 3600000),  // Last hour
    EndTime: new Date(),
    Period: 300
  }).promise();

  return insights.Contributors;  // Top contributors (partition key values) by traffic
};

Interview Questions and Answers

Question 1: How do DynamoDB Streams differ from Kinesis Data Streams?

Answer: DynamoDB Streams:
  • Captures item-level changes in DynamoDB tables
  • Retention: 24 hours (fixed)
  • Ordering: Per partition key only
  • Shards: Managed automatically
  • Cost: GetRecords calls from Lambda triggers are free; direct consumers pay per read request
  • Use case: React to DynamoDB changes
Kinesis Data Streams:
  • General-purpose streaming service
  • Retention: 24 hours to 365 days (configurable)
  • Ordering: Per partition key (shard key)
  • Shards: Manual management required
  • Cost: Per shard-hour + PUT payload units
  • Use case: Real-time data ingestion, custom streaming
Integration:
// Can connect DynamoDB Streams to Kinesis for longer retention
exports.handler = async (event) => {
  const kinesis = new AWS.Kinesis();

  for (const record of event.Records) {
    await kinesis.putRecord({
      StreamName: 'dynamodb-changes',
      Data: JSON.stringify(record),
      PartitionKey: record.dynamodb.Keys.userId.S
    }).promise();
  }
};
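
Alternatively, DynamoDB offers a native Kinesis Data Streams destination that ships item-level changes without a relay function. A sketch, assuming the Kinesis stream already exists (the ARN is a placeholder):

// Enable the managed DynamoDB-to-Kinesis integration (low-level client call)
await dynamodb.enableKinesisStreamingDestination({
  TableName: 'Users',
  StreamArn: 'arn:aws:kinesis:us-east-1:123:stream/dynamodb-changes'
}).promise();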

Question 2: When should you use transactions vs batch operations?

Answer: Use Transactions When:
  • Need ACID guarantees (all-or-nothing)
  • Cross-item consistency required
  • Conditional writes across multiple items
  • Financial operations, inventory management
Use Batch Operations When:
  • Independent operations (no dependencies)
  • Can tolerate partial failures
  • Higher throughput needed
  • Cost optimization (batch is cheaper)
Comparison:
// Transaction (ACID, all-or-nothing)
await dynamodb.transactWrite({
  TransactItems: [
    { Update: { /* debit account A */ } },
    { Update: { /* credit account B */ } }
  ]
}).promise();
// Both succeed or both fail
// Costs: 2x capacity (2 WCUs per 1 KB item)

// Batch (independent, can partially fail)
await dynamodb.batchWrite({
  RequestItems: {
    'Logs': [
      { PutRequest: { Item: log1 } },
      { PutRequest: { Item: log2 } }
    ]
  }
}).promise();
// Some can succeed, some can fail
// Costs: standard capacity (1 WCU per 1 KB item)
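
One batch-specific caveat: batchWrite reports throttled writes in UnprocessedItems instead of throwing, so callers must retry them. A minimal sketch:

// Retry UnprocessedItems until the batch fully lands
const batchWriteWithRetry = async (requestItems) => {
  let pending = requestItems;

  while (Object.keys(pending).length > 0) {
    const result = await dynamodb.batchWrite({ RequestItems: pending }).promise();
    pending = result.UnprocessedItems || {};
    if (Object.keys(pending).length > 0) {
      await new Promise(resolve => setTimeout(resolve, 200));  // back off before retrying
    }
  }
};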

Question 3: Explain conflict resolution in Global Tables.

Answer: Global Tables use last-writer-wins conflict resolution based on timestamps. How it works:
  1. Each write includes an internal timestamp
  2. During concurrent writes to same item in different regions
  3. The write with the latest timestamp wins
  4. Losing write is discarded
Example:
// Region us-east-1 at 10:00:00.000
update({ userId: '123', name: 'Alice' })

// Region eu-west-1 at 10:00:00.100
update({ userId: '123', name: 'Bob' })

// Final state (all regions): name = 'Bob'
// The 10:00:00.100 write wins
Handling conflicts:
// Use version numbers for application-level conflict detection
await dynamodb.update({
  Key: { userId: '123' },
  UpdateExpression: 'SET #name = :name, version = :newVer',
  ConditionExpression: 'version = :currentVer',
  ExpressionAttributeNames: { '#name': 'name' },
  ExpressionAttributeValues: {
    ':name': 'Alice',
    ':newVer': 2,
    ':currentVer': 1
  }
}).promise();
// Fails if version changed (conflict detected)

Question 4: How do you implement audit logging with DynamoDB Streams?

Answer:
// Lambda function processes stream records
const AWS = require('aws-sdk');
const dynamodb = new AWS.DynamoDB.DocumentClient();
const s3 = new AWS.S3();

exports.handler = async (event) => {
  const auditRecords = [];

  for (const record of event.Records) {
    const keys = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.Keys);
    const oldImage = record.dynamodb.OldImage ?
      AWS.DynamoDB.Converter.unmarshall(record.dynamodb.OldImage) : null;
    const newImage = record.dynamodb.NewImage ?
      AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage) : null;

    const auditRecord = {
      auditId: generateId(),
      tableName: record.eventSourceARN.split('/')[1],
      eventType: record.eventName,  // INSERT, MODIFY, REMOVE
      itemKey: keys,
      oldValue: oldImage,
      newValue: newImage,
      changes: getFieldChanges(oldImage, newImage),
      timestamp: new Date(record.dynamodb.ApproximateCreationDateTime * 1000).toISOString(),
      userIdentity: record.userIdentity
    };

    auditRecords.push(auditRecord);
  }

  // Write to audit table (batchWrite accepts at most 25 items per call, so
  // chunk auditRecords if the stream batch size exceeds that)
  await dynamodb.batchWrite({
    RequestItems: {
      'AuditLog': auditRecords.map(record => ({
        PutRequest: { Item: record }
      }))
    }
  }).promise();

  // Also send to S3 for long-term storage
  await s3.putObject({
    Bucket: 'audit-logs',
    Key: `${new Date().toISOString().split('T')[0]}/${Date.now()}.json`,
    Body: JSON.stringify(auditRecords)
  }).promise();
};

function getFieldChanges(oldImage, newImage) {
  if (!oldImage || !newImage) return null;

  const changes = {};
  const allKeys = new Set([...Object.keys(oldImage), ...Object.keys(newImage)]);

  for (const key of allKeys) {
    if (oldImage[key] !== newImage[key]) {
      changes[key] = {
        from: oldImage[key],
        to: newImage[key]
      };
    }
  }

  return Object.keys(changes).length > 0 ? changes : null;
}

Question 5: What are the limitations of DynamoDB transactions?

Answer: Hard Limits:
  • Max 100 items per TransactWriteItems
  • Max 100 items per TransactGetItems
  • Max 4 MB total transaction size
  • All items must be in same AWS account/region
  • Cannot mix transactions across tables with different billing modes (before 2020)
Cost Impact:
  • Consumes 2x capacity units
  • TransactWriteItems: 2 WCUs per KB per item
  • TransactGetItems: 2 RCUs per 4KB per item
Performance:
  • Higher latency than non-transactional operations
  • Limited throughput per partition
Workarounds:
// For > 100 items: Split into multiple transactions
const largeTransactionalWrite = async (items) => {
  const batches = chunk(items, 100);

  for (const batch of batches) {
    await dynamodb.transactWrite({
      TransactItems: batch.map(item => ({
        Put: { TableName: 'Items', Item: item }
      }))
    }).promise();
  }
  // Note: No longer atomic across batches!
};

// For cross-region: Use application-level coordination
// For large data: Use conditional updates instead of transactions

Question 6: How does TTL work and what are its limitations?

Answer: How TTL Works:
  1. Enable TTL on a table attribute
  2. Set attribute to Unix timestamp (seconds since epoch)
  3. DynamoDB automatically deletes items within 48 hours after expiration
  4. Deletions are eventually consistent
  5. No additional cost for TTL deletions
Limitations:
  • Deletion within 48 hours (not immediate)
  • Cannot guarantee exact deletion time
  • TTL attribute must be Number type (Unix timestamp in seconds)
  • Deleted items still consume storage until actually deleted
  • GSIs are updated after base table deletion
Example:
// Enable TTL
await dynamodb.updateTimeToLive({
  TableName: 'Sessions',
  TimeToLiveSpecification: {
    Enabled: true,
    AttributeName: 'expiresAt'
  }
}).promise();

// Create item that expires
await dynamodb.put({
  TableName: 'Sessions',
  Item: {
    sessionId: '123',
    userId: 'user-456',
    expiresAt: Math.floor(Date.now() / 1000) + 3600  // 1 hour
  }
}).promise();

// Item deleted within 48 hours after expiresAt timestamp
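
Because deletion lags expiry, reads should treat an expired-but-present item as already gone. A short sketch:

// Filter out expired items at read time rather than trusting TTL timing
const getLiveSession = async (sessionId) => {
  const now = Math.floor(Date.now() / 1000);
  const result = await dynamodb.get({
    TableName: 'Sessions',
    Key: { sessionId }
  }).promise();

  // An item past its expiresAt is logically deleted, even if TTL hasn't run yet
  if (!result.Item || result.Item.expiresAt <= now) return null;
  return result.Item;
};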
Detecting TTL deletions:
// Use DynamoDB Streams to detect TTL deletions
exports.handler = async (event) => {
  for (const record of event.Records) {
    if (record.eventName === 'REMOVE' &&
        record.userIdentity?.principalId === 'dynamodb.amazonaws.com') {
      // This is a TTL deletion
      const expiredItem = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.OldImage);
      console.log('TTL deleted:', expiredItem);

      // Perform cleanup
      await cleanupExpiredSession(expiredItem.sessionId);
    }
  }
};

Question 7: Design a real-time leaderboard using DynamoDB Streams.

Answer: Architecture:
  1. DynamoDB stores user scores
  2. Stream captures score updates
  3. Lambda updates Redis sorted set
  4. Application reads from Redis for rankings
Implementation:
// 1. DynamoDB table for scores
await dynamodb.createTable({
  TableName: 'GameScores',
  KeySchema: [
    { AttributeName: 'userId', KeyType: 'HASH' }
  ],
  AttributeDefinitions: [
    { AttributeName: 'userId', AttributeType: 'S' }
  ],
  BillingMode: 'PAY_PER_REQUEST',
  StreamSpecification: {
    StreamEnabled: true,
    StreamViewType: 'NEW_IMAGE'
  }
}).promise();

// 2. Update score
await dynamodb.update({
  TableName: 'GameScores',
  Key: { userId: 'user123' },
  UpdateExpression: 'ADD score :points',  // ADD creates the attribute if absent
  ExpressionAttributeValues: { ':points': 100 }
}).promise();

// 3. Stream processor updates Redis
exports.handler = async (event) => {
  const Redis = require('ioredis');
  const redis = new Redis();

  for (const record of event.Records) {
    if (record.eventName === 'INSERT' || record.eventName === 'MODIFY') {
      const data = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);

      // Update Redis sorted set (O(log N))
      await redis.zadd('leaderboard', data.score, data.userId);

      // Also update user's rank cache
      const rank = await redis.zrevrank('leaderboard', data.userId);
      await redis.setex(`rank:${data.userId}`, 60, rank + 1);
    } else if (record.eventName === 'REMOVE') {
      const data = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.Keys);
      await redis.zrem('leaderboard', data.userId);
    }
  }
};

// 4. Application reads from Redis
const Redis = require('ioredis');
const redis = new Redis();

class LeaderboardService {
  async getTopPlayers(count = 100) {
    return await redis.zrevrange('leaderboard', 0, count - 1, 'WITHSCORES');
  }

  async getUserRank(userId) {
    const cached = await redis.get(`rank:${userId}`);
    if (cached) return parseInt(cached);

    const rank = await redis.zrevrank('leaderboard', userId);
    return rank !== null ? rank + 1 : null;
  }

  async getPlayersNearUser(userId, range = 5) {
    const rank = await redis.zrevrank('leaderboard', userId);
    if (rank === null) return [];

    const start = Math.max(0, rank - range);
    const end = rank + range;

    return await redis.zrevrange('leaderboard', start, end, 'WITHSCORES');
  }
}
Benefits:
  • Sub-millisecond leaderboard queries
  • Millions of score updates/sec
  • Real-time ranking updates
  • Scales to very large player counts

Summary

Advanced Features Overview:
  1. DynamoDB Streams:
    • Captures item-level changes
    • Powers real-time processing
    • 24-hour retention
    • Use for: replication, aggregations, notifications
  2. Transactions:
    • ACID guarantees
    • Up to 100 items per transaction (reads and writes)
    • 2x capacity cost
    • Use for: financial operations, multi-item consistency
  3. Global Tables:
    • Multi-region, multi-active replication
    • Sub-second replication latency
    • Last-writer-wins conflict resolution
    • Use for: global applications, disaster recovery
  4. TTL:
    • Automatic item expiration
    • No additional cost
    • Deletion within 48 hours
    • Use for: sessions, temporary data, caching
  5. PITR:
    • 35-day recovery window
    • Point-in-time restore
    • Continuous backups
    • Use for: disaster recovery, compliance
These advanced features enable sophisticated architectures beyond basic CRUD operations, making DynamoDB suitable for complex, mission-critical applications.