Encryption: Data at Rest & In Transit

Encryption is the foundation of PHI protection. This module covers everything from basic cryptographic concepts to production-ready implementations for healthcare applications.
Learning Objectives:
  • Understand symmetric vs asymmetric encryption
  • Implement AES-256-GCM for data at rest
  • Configure TLS 1.3 for data in transit
  • Design key management systems
  • Implement field-level and database encryption

Why Encryption Matters for HIPAA

┌─────────────────────────────────────────────────────────────────────────────┐
│                    HIPAA ENCRYPTION REQUIREMENTS                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  SECURITY RULE §164.312(a)(2)(iv)        SECURITY RULE §164.312(e)(2)(ii)  │
│  ────────────────────────────────        ─────────────────────────────────  │
│  "Implement a mechanism to               "Implement a mechanism to          │
│   encrypt and decrypt ePHI"               encrypt ePHI whenever deemed      │
│                                           appropriate"                       │
│                                                                              │
│  DATA AT REST                            DATA IN TRANSIT                    │
│  ─────────────                           ───────────────                    │
│  • Database encryption                   • TLS 1.2+ required                │
│  • File system encryption                • Certificate management           │
│  • Backup encryption                     • VPN for internal traffic         │
│  • Key management                        • API security                     │
│                                                                              │
│  SAFE HARBOR PROVISION                                                      │
│  ─────────────────────                                                      │
│  If encrypted data is breached, it may NOT be considered a breach if:      │
│  • Encryption meets NIST standards                                          │
│  • Keys were not compromised                                                │
│  • Data was rendered "unusable, unreadable, or indecipherable"             │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Cryptography Fundamentals

Symmetric vs Asymmetric Encryption

"""
Symmetric Encryption:
- Same key for encryption and decryption
- Fast, suitable for bulk data
- Key distribution is the challenge
- Examples: AES, ChaCha20

Asymmetric Encryption:
- Public key encrypts, private key decrypts
- Slower, suitable for small data
- Solves key distribution
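- Examples: RSA-OAEP (encryption), ECDH / X25519 (key agreement)
- Related: ECDSA and Ed25519 are signature schemes; they sign data, they do not encrypt it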
"""

from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes
import os

# Symmetric: AES-256-GCM
def symmetric_encrypt(plaintext: bytes, key: bytes) -> tuple[bytes, bytes]:
    """Encrypt using AES-256-GCM"""
    nonce = os.urandom(12)  # 96 bits for GCM
    aesgcm = AESGCM(key)
    ciphertext = aesgcm.encrypt(nonce, plaintext, None)
    return nonce, ciphertext

def symmetric_decrypt(nonce: bytes, ciphertext: bytes, key: bytes) -> bytes:
    """Decrypt using AES-256-GCM"""
    aesgcm = AESGCM(key)
    return aesgcm.decrypt(nonce, ciphertext, None)


# Asymmetric: RSA-OAEP
def asymmetric_encrypt(plaintext: bytes, public_key: rsa.RSAPublicKey) -> bytes:
    """Encrypt using RSA-OAEP (for small data like symmetric keys)"""
    return public_key.encrypt(
        plaintext,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
    )

def asymmetric_decrypt(ciphertext: bytes, private_key: rsa.RSAPrivateKey) -> bytes:
    """Decrypt using RSA-OAEP"""
    return private_key.decrypt(
        ciphertext,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
    )
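The two families are usually combined rather than chosen between: bulk data is encrypted with a fresh symmetric key, and only that small key is encrypted with the recipient's public key. The following is a minimal sketch of that hybrid pattern using the same `cryptography` primitives as above. The function names `hybrid_encrypt` and `hybrid_decrypt` and the dictionary layout are illustrative assumptions, not part of any library.

```python
import os

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

# RSA-OAEP padding with SHA-256, matching the functions above
OAEP = padding.OAEP(
    mgf=padding.MGF1(algorithm=hashes.SHA256()),
    algorithm=hashes.SHA256(),
    label=None,
)


def hybrid_encrypt(plaintext: bytes, public_key: rsa.RSAPublicKey) -> dict:
    """Encrypt bulk data with a fresh AES key, then wrap that key with RSA-OAEP."""
    aes_key = AESGCM.generate_key(bit_length=256)
    nonce = os.urandom(12)
    ciphertext = AESGCM(aes_key).encrypt(nonce, plaintext, None)
    wrapped_key = public_key.encrypt(aes_key, OAEP)  # only 32 bytes go through RSA
    return {"wrapped_key": wrapped_key, "nonce": nonce, "ciphertext": ciphertext}


def hybrid_decrypt(message: dict, private_key: rsa.RSAPrivateKey) -> bytes:
    """Unwrap the AES key with the private key, then decrypt the bulk data."""
    aes_key = private_key.decrypt(message["wrapped_key"], OAEP)
    return AESGCM(aes_key).decrypt(message["nonce"], message["ciphertext"], None)


# Hypothetical demo: a payload far larger than RSA could encrypt directly
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
payload = b"x" * 10_000
message = hybrid_encrypt(payload, private_key.public_key())
recovered = hybrid_decrypt(message, private_key)
```

The sender needs only the public key, which is how the asymmetric layer addresses the key-distribution problem while AES does the heavy lifting.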

Choosing the Right Algorithm

Use Case            | Algorithm    | Key Size | Notes
--------------------+--------------+----------+------------------------------
Data at rest        | AES-256-GCM  | 256 bits | HIPAA recommended
File encryption     | AES-256-GCM  | 256 bits | With authenticated encryption
Key wrapping        | AES-256-KW   | 256 bits | For protecting DEKs
Key exchange        | ECDH P-256   | 256 bits | For TLS
Digital signatures  | Ed25519      | 256 bits | For audit logs
Password hashing    | Argon2id     | -        | For user passwords
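
As one illustration of the table, here is a hedged sketch of signing audit-log entries with Ed25519 so that later tampering can be detected. It uses the `cryptography` library's Ed25519 classes; the helper names `sign_entry` and `entry_is_authentic` and the sample log entry are assumptions made for this example.

```python
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
    Ed25519PrivateKey,
    Ed25519PublicKey,
)


def sign_entry(entry: bytes, signing_key: Ed25519PrivateKey) -> bytes:
    """Return a 64-byte Ed25519 signature over one audit-log entry."""
    return signing_key.sign(entry)


def entry_is_authentic(
    entry: bytes, signature: bytes, public_key: Ed25519PublicKey
) -> bool:
    """True if the signature matches the entry, False if either was altered."""
    try:
        public_key.verify(signature, entry)
        return True
    except InvalidSignature:
        return False


# Hypothetical demo values
signing_key = Ed25519PrivateKey.generate()
entry = b'{"event": "PHI_ACCESS", "user": "dr_smith", "patient": "P12345"}'
signature = sign_entry(entry, signing_key)
```

Verification needs only the public key, so log reviewers can check integrity without being able to forge entries.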

Data at Rest Encryption

Envelope Encryption Pattern

[Figure: Envelope Encryption Key Hierarchy, showing the master key, DEKs, and PHI data]

The envelope encryption pattern provides multiple layers of protection:
  • Master Key (KEK): Stored in HSM / AWS KMS / HashiCorp Vault
  • Data Encryption Keys (DEKs): Unique per record, stored encrypted
  • PHI Data: Encrypted with the DEK
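
The hierarchy can be sketched locally before any KMS is involved. In the example below an in-memory key stands in for the master key, which is an assumption for illustration only (a real KEK should stay inside an HSM or KMS). The DEK is wrapped with AES key wrap from the `cryptography` library, and the function names are hypothetical.

```python
import os

from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.keywrap import aes_key_unwrap, aes_key_wrap


def envelope_encrypt(plaintext: bytes, kek: bytes) -> dict:
    """Encrypt one record under its own DEK and store the DEK wrapped by the KEK."""
    dek = AESGCM.generate_key(bit_length=256)  # unique per record
    nonce = os.urandom(12)
    return {
        "wrapped_dek": aes_key_wrap(kek, dek),  # DEK is never stored in the clear
        "nonce": nonce,
        "ciphertext": AESGCM(dek).encrypt(nonce, plaintext, None),
    }


def envelope_decrypt(record: dict, kek: bytes) -> bytes:
    """Unwrap the DEK with the KEK, then decrypt the record."""
    dek = aes_key_unwrap(kek, record["wrapped_dek"])
    return AESGCM(dek).decrypt(record["nonce"], record["ciphertext"], None)


# Demo only: a local random KEK standing in for an HSM/KMS-held master key
kek = os.urandom(32)
record_a = envelope_encrypt(b"diagnosis: hypertension", kek)
record_b = envelope_encrypt(b"diagnosis: hypertension", kek)
```

Because every record gets its own DEK, rotating the master key only requires re-wrapping the small DEKs, not re-encrypting the PHI itself.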

Implementation with AWS KMS

import os
import boto3
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import base64
import json
from dataclasses import dataclass
from typing import Optional

@dataclass
class EncryptedData:
    """Container for encrypted data with metadata"""
    ciphertext: bytes
    encrypted_dek: bytes
    nonce: bytes
    key_id: str
    algorithm: str = "AES-256-GCM"
    
    def to_dict(self) -> dict:
        return {
            "ciphertext": base64.b64encode(self.ciphertext).decode(),
            "encrypted_dek": base64.b64encode(self.encrypted_dek).decode(),
            "nonce": base64.b64encode(self.nonce).decode(),
            "key_id": self.key_id,
            "algorithm": self.algorithm,
        }
    
    @classmethod
    def from_dict(cls, data: dict) -> "EncryptedData":
        return cls(
            ciphertext=base64.b64decode(data["ciphertext"]),
            encrypted_dek=base64.b64decode(data["encrypted_dek"]),
            nonce=base64.b64decode(data["nonce"]),
            key_id=data["key_id"],
            algorithm=data.get("algorithm", "AES-256-GCM"),
        )


class EnvelopeEncryption:
    """
    HIPAA-compliant envelope encryption using AWS KMS
    """
    
    def __init__(self, kms_key_id: str, region: str = "us-east-1"):
        self.kms = boto3.client("kms", region_name=region)
        self.kms_key_id = kms_key_id
        
    def encrypt(
        self,
        plaintext: bytes,
        context: Optional[dict] = None
    ) -> EncryptedData:
        """
        Encrypt data using envelope encryption
        
        Args:
            plaintext: Data to encrypt
            context: Encryption context for additional AAD
        """
        # Generate a data encryption key from KMS
        dek_response = self.kms.generate_data_key(
            KeyId=self.kms_key_id,
            KeySpec="AES_256",
            EncryptionContext=context or {}
        )
        
        # Extract plaintext and encrypted DEK
        dek_plaintext = dek_response["Plaintext"]
        dek_encrypted = dek_response["CiphertextBlob"]
        
        # Encrypt the data with the DEK
        nonce = os.urandom(12)
        aesgcm = AESGCM(dek_plaintext)
        
        # Include context as AAD if provided; sort keys so the same context
        # always serializes to the same bytes
        aad = json.dumps(context, sort_keys=True).encode() if context else None
        ciphertext = aesgcm.encrypt(nonce, plaintext, aad)
        
        # Drop our references to the plaintext DEK. Python bytes are immutable,
        # so this cannot wipe the key from memory; it only shortens its lifetime.
        del dek_plaintext, aesgcm
        
        return EncryptedData(
            ciphertext=ciphertext,
            encrypted_dek=dek_encrypted,
            nonce=nonce,
            key_id=self.kms_key_id,
        )
    
    def decrypt(
        self,
        encrypted_data: EncryptedData,
        context: Optional[dict] = None
    ) -> bytes:
        """
        Decrypt data using envelope encryption
        
        The context must equal the one passed to encrypt(); otherwise KMS
        refuses to unwrap the DEK or the AES-GCM tag check fails.
        """
        # Decrypt the DEK using KMS
        dek_response = self.kms.decrypt(
            CiphertextBlob=encrypted_data.encrypted_dek,
            EncryptionContext=context or {}
        )
        
        dek_plaintext = dek_response["Plaintext"]
        
        # Decrypt the data with the DEK (same AAD serialization as encrypt)
        aesgcm = AESGCM(dek_plaintext)
        aad = json.dumps(context, sort_keys=True).encode() if context else None
        plaintext = aesgcm.decrypt(
            encrypted_data.nonce,
            encrypted_data.ciphertext,
            aad
        )
        
        # Drop our references to the plaintext DEK (see note in encrypt)
        del dek_plaintext, aesgcm
        
        return plaintext


# Usage example
encryption = EnvelopeEncryption(kms_key_id="alias/hipaa-phi-key")

# Encrypt patient data
patient_data = json.dumps({
    "patient_id": "P12345",
    "diagnosis": "Type 2 Diabetes",
    "medications": ["Metformin 500mg"]
}).encode()

# Context binds encryption to specific patient
context = {"patient_id": "P12345", "record_type": "medical_history"}

encrypted = encryption.encrypt(patient_data, context)

# Store encrypted.to_dict() in database

# Later, decrypt
decrypted = encryption.decrypt(encrypted, context)
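
Because the context is serialized to bytes before it is used as AAD, the serialization has to be stable: two dictionaries with the same entries in a different order must produce identical bytes, or decryption fails its authentication check. A small sketch of one way to canonicalize the context follows; the helper name `canonical_aad` is an assumption for illustration.

```python
import json


def canonical_aad(context: dict) -> bytes:
    """Serialize a context dict to the same bytes regardless of key order."""
    return json.dumps(context, sort_keys=True, separators=(",", ":")).encode("utf-8")


written_by_encryptor = canonical_aad(
    {"patient_id": "P12345", "record_type": "medical_history"}
)
rebuilt_by_decryptor = canonical_aad(
    {"record_type": "medical_history", "patient_id": "P12345"}
)

# Without sort_keys, insertion order leaks into the bytes
unsorted_a = json.dumps({"b": 1, "a": 2}).encode()
unsorted_b = json.dumps({"a": 2, "b": 1}).encode()
```

Whatever canonical form is chosen, it must be identical on the encrypting and decrypting side and must not change between releases, since old ciphertexts are bound to the old bytes.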

Field-Level Encryption

from typing import Any, Dict, List
from dataclasses import dataclass, field

@dataclass
class FieldEncryptionConfig:
    """Configuration for field-level encryption"""
    field_name: str
    encrypt: bool = True
    searchable: bool = False  # If true, use deterministic encryption
    indexed: bool = False

class FieldLevelEncryption:
    """
    Encrypt specific fields in documents
    
    Useful when you need to:
    - Search on some fields while encrypting others
    - Apply different encryption policies to different data
    - Minimize encryption overhead
    """
    
    def __init__(
        self,
        envelope_encryption: EnvelopeEncryption,
        encrypted_fields: List[FieldEncryptionConfig]
    ):
        self.encryption = envelope_encryption
        self.field_configs = {f.field_name: f for f in encrypted_fields}
        
    def encrypt_document(
        self,
        document: Dict[str, Any],
        context: dict
    ) -> Dict[str, Any]:
        """Encrypt specified fields in a document"""
        
        encrypted_doc = {}
        
        for key, value in document.items():
            if key in self.field_configs and self.field_configs[key].encrypt:
                config = self.field_configs[key]
                
                # Serialize value
                serialized = json.dumps(value).encode()
                
                # Encrypt
                if config.searchable:
                    # Deterministic encryption for searchable fields
                    encrypted = self._deterministic_encrypt(serialized, context)
                else:
                    # Randomized encryption for maximum security
                    encrypted = self.encryption.encrypt(serialized, context)
                
                encrypted_doc[key] = {
                    "__encrypted__": True,
                    "data": encrypted.to_dict(),
                }
            else:
                encrypted_doc[key] = value
                
        return encrypted_doc
    
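    def decrypt_document(
        self,
        document: Dict[str, Any],
        context: dict
    ) -> Dict[str, Any]:
        """Decrypt specified fields in a document"""
        
        decrypted_doc = {}
        
        for key, value in document.items():
            if isinstance(value, dict) and value.get("__encrypted__"):
                encrypted_data = EncryptedData.from_dict(value["data"])
                if encrypted_data.key_id == "deterministic":
                    # Searchable fields store no wrapped DEK; re-derive the key
                    dek = self._get_deterministic_dek(context)
                    decrypted_bytes = AESGCM(dek).decrypt(
                        encrypted_data.nonce, encrypted_data.ciphertext, None
                    )
                else:
                    decrypted_bytes = self.encryption.decrypt(encrypted_data, context)
                decrypted_doc[key] = json.loads(decrypted_bytes)
            else:
                decrypted_doc[key] = value
                
        return decrypted_doc
    
    def _deterministic_encrypt(
        self,
        plaintext: bytes,
        context: dict
    ) -> EncryptedData:
        """
        Deterministic encryption for searchable fields
        
        Warning: Less secure than randomized encryption, because equal
        plaintexts produce equal ciphertexts. Use only when search is required.
        """
        import hashlib
        import hmac
        
        # Use a stable DEK for this context
        dek = self._get_deterministic_dek(context)
        
        # Derive the nonce with a keyed hash (HMAC). An unkeyed SHA-256 of the
        # plaintext would let anyone who sees the stored nonce confirm guesses
        # of low-entropy values such as names or SSNs.
        nonce_input = json.dumps(context, sort_keys=True).encode() + b"\x00" + plaintext
        nonce = hmac.new(dek, nonce_input, hashlib.sha256).digest()[:12]
        
        aesgcm = AESGCM(dek)
        ciphertext = aesgcm.encrypt(nonce, plaintext, None)
        
        return EncryptedData(
            ciphertext=ciphertext,
            encrypted_dek=b"",  # DEK derived, not stored
            nonce=nonce,
            key_id="deterministic",
        )
    
    def _get_deterministic_dek(self, context: dict) -> bytes:
        """
        Return the stable 32-byte key used for searchable fields
        
        Placeholder: it must return the same key on every call for a given
        context (for example, a key unwrapped once via KMS and cached).
        """
        raise NotImplementedError(
            "provide a stable 32-byte key for deterministic encryption"
        )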


# Usage
field_encryption = FieldLevelEncryption(
    envelope_encryption=encryption,
    encrypted_fields=[
        FieldEncryptionConfig("ssn", encrypt=True, searchable=False),
        FieldEncryptionConfig("name", encrypt=True, searchable=True),
        FieldEncryptionConfig("diagnosis", encrypt=True, searchable=False),
        FieldEncryptionConfig("patient_id", encrypt=False),  # Not encrypted
    ]
)

patient_record = {
    "patient_id": "P12345",
    "name": "John Smith",
    "ssn": "123-45-6789",
    "diagnosis": "Hypertension",
    "created_at": "2024-01-15",
}

encrypted_record = field_encryption.encrypt_document(
    patient_record,
    context={"patient_id": "P12345"}
)
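
An alternative to deterministic encryption for searchable fields is a blind index: the field itself stays randomly encrypted, and a keyed hash of the normalized value is stored next to it for equality lookups. This is a different technique from the one in the class above, sketched here with only the standard library. The function name `blind_index`, the normalization rule, and the demo keys are assumptions for illustration.

```python
import hashlib
import hmac


def blind_index(value: str, index_key: bytes) -> str:
    """Keyed hash of a normalized value, usable as an equality-search token."""
    normalized = value.strip().lower().encode("utf-8")
    return hmac.new(index_key, normalized, hashlib.sha256).hexdigest()


# Demo keys only; a real index key would come from the key management system
index_key = bytes.fromhex("00" * 32)
other_key = bytes.fromhex("11" * 32)

stored_token = blind_index("John Smith", index_key)
query_token = blind_index("  john smith ", index_key)
```

A lookup then becomes an ordinary equality query on the token column. Like deterministic encryption, a blind index still reveals which rows share a value, so it suits fields where that leakage is acceptable.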

Database Encryption

PostgreSQL Column-Level Encryption (pgcrypto)

-- PostgreSQL with pgcrypto for column-level encryption

-- Enable pgcrypto extension
CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- Create encrypted column function
CREATE OR REPLACE FUNCTION encrypt_phi(
    data TEXT,
    key_id TEXT
) RETURNS BYTEA AS $$
DECLARE
    encryption_key BYTEA;
BEGIN
    -- Get key from secure key storage (simplified example)
    -- get_encryption_key() is a placeholder you must define yourself;
    -- in production, back it with AWS KMS, HashiCorp Vault, etc.
    encryption_key := get_encryption_key(key_id);
    
    RETURN pgp_sym_encrypt(
        data,
        encode(encryption_key, 'base64'),
        'cipher-algo=aes256, compress-algo=0'
    );
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

CREATE OR REPLACE FUNCTION decrypt_phi(
    encrypted_data BYTEA,
    key_id TEXT
) RETURNS TEXT AS $$
DECLARE
    encryption_key BYTEA;
BEGIN
    encryption_key := get_encryption_key(key_id);
    
    RETURN pgp_sym_decrypt(
        encrypted_data,
        encode(encryption_key, 'base64')
    );
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

-- Create patients table with encrypted columns
CREATE TABLE patients (
    id SERIAL PRIMARY KEY,
    -- Encrypted columns
    ssn_encrypted BYTEA,
    name_encrypted BYTEA,
    diagnosis_encrypted BYTEA,
    -- Metadata (not encrypted)
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    -- Key reference
    encryption_key_id TEXT NOT NULL
);

-- Insert with encryption
INSERT INTO patients (
    ssn_encrypted,
    name_encrypted,
    diagnosis_encrypted,
    encryption_key_id
) VALUES (
    encrypt_phi('123-45-6789', 'key_v1'),
    encrypt_phi('John Smith', 'key_v1'),
    encrypt_phi('Type 2 Diabetes', 'key_v1'),
    'key_v1'
);

-- Select with decryption
SELECT 
    id,
    decrypt_phi(ssn_encrypted, encryption_key_id) as ssn,
    decrypt_phi(name_encrypted, encryption_key_id) as name,
    decrypt_phi(diagnosis_encrypted, encryption_key_id) as diagnosis
FROM patients
WHERE id = 1;

-- Create view for automatic decryption (for authorized users only)
CREATE VIEW patients_decrypted AS
SELECT 
    id,
    decrypt_phi(ssn_encrypted, encryption_key_id) as ssn,
    decrypt_phi(name_encrypted, encryption_key_id) as name,
    decrypt_phi(diagnosis_encrypted, encryption_key_id) as diagnosis,
    created_at,
    updated_at
FROM patients;

-- Grant access only to authorized roles
GRANT SELECT ON patients_decrypted TO hipaa_authorized_users;

MongoDB Client-Side Field-Level Encryption

from datetime import datetime
from pymongo import MongoClient
from pymongo.encryption import ClientEncryption
from bson.codec_options import CodecOptions
from bson import STANDARD
import os

# MongoDB CSFLE configuration
def create_encrypted_client():
    """Create MongoDB client with client-side field-level encryption"""
    
    # KMS provider configuration (AWS KMS example)
    kms_providers = {
        "aws": {
            "accessKeyId": os.environ["AWS_ACCESS_KEY_ID"],
            "secretAccessKey": os.environ["AWS_SECRET_ACCESS_KEY"],
        }
    }
    
    # Key vault configuration
    key_vault_namespace = "encryption.__keyVault"
    
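    # Schema map defining which fields to encrypt.
    # The "/encryptionKeyId" pointer below selects each document's key by the
    # alt name stored in its encryptionKeyId field, so inserted documents must
    # include that field. MongoDB does not allow deterministic encryption with
    # a pointer keyId; give a deterministic field such as ssn a fixed key UUID.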
    schema_map = {
        "healthcare.patients": {
            "bsonType": "object",
            "encryptMetadata": {
                "keyId": "/encryptionKeyId",  # Key per document
            },
            "properties": {
                "ssn": {
                    "encrypt": {
                        "bsonType": "string",
                        "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Deterministic"
                    }
                },
                "name": {
                    "encrypt": {
                        "bsonType": "string",
                        "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
                    }
                },
                "medicalHistory": {
                    "encrypt": {
                        "bsonType": "array",
                        "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
                    }
                },
                "diagnosis": {
                    "encrypt": {
                        "bsonType": "string",
                        "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
                    }
                }
            }
        }
    }
    
    # Auto encryption options (PyMongo expects an AutoEncryptionOpts object,
    # not a plain dict)
    from pymongo.encryption_options import AutoEncryptionOpts
    
    auto_encryption_opts = AutoEncryptionOpts(
        kms_providers,
        key_vault_namespace,
        schema_map=schema_map,
    )
    
    # Create encrypted client
    client = MongoClient(
        os.environ["MONGODB_URI"],
        auto_encryption_opts=auto_encryption_opts
    )
    
    return client


def create_data_encryption_key(client_encryption, key_alt_name: str) -> bytes:
    """Create a data encryption key wrapped by the AWS KMS master key.
    
    The wrapped key is stored in the key vault collection; its id is returned.
    """
    
    master_key = {
        "region": "us-east-1",
        "key": os.environ["AWS_KMS_KEY_ARN"],
    }
    
    data_key_id = client_encryption.create_data_key(
        "aws",
        master_key=master_key,
        key_alt_names=[key_alt_name]
    )
    
    return data_key_id


# Usage
client = create_encrypted_client()
db = client.healthcare

# Insert encrypted document (encryption happens automatically)
patient = {
    "patientId": "P12345",
    "ssn": "123-45-6789",  # Will be encrypted deterministically
    "name": "John Smith",  # Will be encrypted randomly
    "diagnosis": "Hypertension",  # Will be encrypted randomly
    "medicalHistory": [
        {"date": "2024-01-15", "procedure": "Annual checkup"}
    ],  # Will be encrypted randomly
    "createdAt": datetime.utcnow(),
}

db.patients.insert_one(patient)

# Query on deterministically encrypted field
# (Only works for deterministic encryption)
result = db.patients.find_one({"ssn": "123-45-6789"})
# Decryption happens automatically for authorized clients
print(result["name"])  # "John Smith"

Data in Transit Encryption

TLS handshake process for secure data in transit

TLS Encryption Handshake

TLS 1.3 Configuration

# Python/FastAPI with TLS 1.3
import ssl
import uvicorn
from fastapi import FastAPI

app = FastAPI()

def create_ssl_context():
    """Create secure SSL context for TLS 1.3"""
    
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    
    # Minimum TLS 1.2, prefer 1.3
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    context.maximum_version = ssl.TLSVersion.TLSv1_3
    
    # Load certificate and key
    context.load_cert_chain(
        certfile="/path/to/certificate.pem",
        keyfile="/path/to/private-key.pem"
    )
    
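    # Strong cipher suites only. set_ciphers() governs TLS 1.2 and below;
    # the TLS 1.3 suites are enabled by default and are not affected by it.
    context.set_ciphers(
        "ECDHE+AESGCM:"
        "DHE+AESGCM"
    )
    
    # OCSP stapling: the standard-library ssl module has no server-side
    # switch for it; terminate TLS at a reverse proxy if stapling is needed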
    
    return context


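if __name__ == "__main__":
    # uvicorn builds its own SSL context from the arguments below, so
    # create_ssl_context() above is for servers that accept a context object.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=443,
        ssl_keyfile="/path/to/private-key.pem",
        ssl_certfile="/path/to/certificate.pem",
        ssl_version=ssl.PROTOCOL_TLS_SERVER,
        ssl_ciphers="ECDHE+AESGCM:DHE+AESGCM",
    )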
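
Data in transit has a client side as well: services that call other PHI-handling APIs should verify certificates and refuse old protocol versions. A minimal sketch with the standard-library `ssl` module follows; the function name `create_client_context` is an assumption for illustration.

```python
import ssl


def create_client_context() -> ssl.SSLContext:
    """Client-side context: verified certificates, hostname checks, TLS 1.2+."""
    # create_default_context() turns on certificate and hostname verification
    context = ssl.create_default_context()
    # Refuse anything older than TLS 1.2; TLS 1.3 is negotiated when available
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


client_context = create_client_context()
```

Such a context can be handed to standard-library clients such as `urllib.request.urlopen(url, context=client_context)`.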

NGINX TLS Configuration

# /etc/nginx/conf.d/hipaa-ssl.conf

# SSL configuration for HIPAA compliance
server {
    listen 443 ssl http2;
    server_name api.healthcare.example.com;
    
    # SSL Certificate
    ssl_certificate /etc/nginx/ssl/fullchain.pem;
    ssl_certificate_key /etc/nginx/ssl/privkey.pem;
    
    # TLS versions - TLS 1.2 and 1.3 only
    ssl_protocols TLSv1.2 TLSv1.3;
    
    # Strong cipher suites (TLS 1.2). TLS 1.3 suites are enabled by default
    # and are not controlled by ssl_ciphers.
    ssl_ciphers 'ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305';
    ssl_prefer_server_ciphers on;
    
    # SSL session configuration
    ssl_session_timeout 1d;
    ssl_session_cache shared:SSL:50m;
    ssl_session_tickets off;
    
    # OCSP Stapling
    ssl_stapling on;
    ssl_stapling_verify on;
    ssl_trusted_certificate /etc/nginx/ssl/chain.pem;
    resolver 8.8.8.8 8.8.4.4 valid=300s;
    resolver_timeout 5s;
    
    # Security headers
    add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload" always;
    add_header X-Content-Type-Options nosniff always;
    add_header X-Frame-Options DENY always;
    add_header X-XSS-Protection "1; mode=block" always;
    
    # Proxy to application
    location / {
        proxy_pass http://localhost:8000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}

# Redirect HTTP to HTTPS
server {
    listen 80;
    server_name api.healthcare.example.com;
    return 301 https://$server_name$request_uri;
}

Certificate Management with Let’s Encrypt

#!/bin/bash
# certificate-management.sh

# Install certbot
apt-get update
apt-get install -y certbot python3-certbot-nginx

# Obtain certificate
# CERTBOT_EMAIL is a placeholder: set it to the contact address for expiry notices
certbot certonly \
    --nginx \
    --non-interactive \
    --agree-tos \
    --email "$CERTBOT_EMAIL" \
    --domains api.healthcare.example.com \
    --key-type rsa \
    --rsa-key-size 4096

# Set up auto-renewal
echo "0 0 * * * root certbot renew --quiet --post-hook 'systemctl reload nginx'" \
    > /etc/cron.d/certbot-renewal

# Verify configuration
nginx -t && systemctl reload nginx
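
Auto-renewal can fail silently, so it helps to monitor how long the deployed certificate has left. The sketch below converts a certificate's `notAfter` string (the format returned by `ssl.SSLSocket.getpeercert()`) into days remaining. The function name `days_until_expiry` is an assumption, and the sample date is the one used in the Python `ssl` documentation.

```python
import ssl
from datetime import datetime, timezone


def days_until_expiry(not_after: str, now: datetime) -> int:
    """Whole days between `now` and a certificate's notAfter timestamp."""
    expires = datetime.fromtimestamp(
        ssl.cert_time_to_seconds(not_after), tz=timezone.utc
    )
    return (expires - now).days


not_after = "Jan  5 09:34:43 2018 GMT"
one_month_before = datetime(2017, 12, 6, 9, 34, 43, tzinfo=timezone.utc)
one_day_after = datetime(2018, 1, 6, 9, 34, 43, tzinfo=timezone.utc)

remaining = days_until_expiry(not_after, one_month_before)
overdue = days_until_expiry(not_after, one_day_after)
```

A monitoring job could alert when the value drops below a chosen threshold, for example 14 days, which would indicate that renewal has stopped working.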

Key Management

HashiCorp Vault Integration

import hvac
from typing import Optional
import os

class VaultKeyManager:
    """
    Key management using HashiCorp Vault
    
    Features:
    - Automatic key rotation
    - Key versioning
    - Audit logging
    - Access control
    """
    
    def __init__(self):
        self.client = hvac.Client(
            url=os.environ["VAULT_ADDR"],
            token=os.environ["VAULT_TOKEN"],
        )
        
    def create_encryption_key(
        self,
        key_name: str,
        key_type: str = "aes256-gcm96"
    ) -> dict:
        """Create a new encryption key in Vault"""
        
        self.client.secrets.transit.create_key(
            name=key_name,
            key_type=key_type,
            derived=True,  # Required for the encrypt/decrypt context to take effect
            exportable=False,  # Keys never leave Vault
            allow_plaintext_backup=False,
        )
        
        return self.get_key_info(key_name)
    
    def get_key_info(self, key_name: str) -> dict:
        """Get key metadata (not the key itself)"""
        
        key_info = self.client.secrets.transit.read_key(name=key_name)
        return {
            "name": key_name,
            "type": key_info["data"]["type"],
            "latest_version": key_info["data"]["latest_version"],
            "min_decryption_version": key_info["data"]["min_decryption_version"],
            "supports_encryption": key_info["data"]["supports_encryption"],
        }
    
    def encrypt(
        self,
        key_name: str,
        plaintext: bytes,
        context: Optional[bytes] = None
    ) -> str:
        """Encrypt data using Vault Transit"""
        
        import base64
        
        result = self.client.secrets.transit.encrypt_data(
            name=key_name,
            plaintext=base64.b64encode(plaintext).decode(),
            context=base64.b64encode(context).decode() if context else None,
        )
        
        return result["data"]["ciphertext"]
    
    def decrypt(
        self,
        key_name: str,
        ciphertext: str,
        context: Optional[bytes] = None
    ) -> bytes:
        """Decrypt data using Vault Transit"""
        
        import base64
        
        result = self.client.secrets.transit.decrypt_data(
            name=key_name,
            ciphertext=ciphertext,
            context=base64.b64encode(context).decode() if context else None,
        )
        
        return base64.b64decode(result["data"]["plaintext"])
    
    def rotate_key(self, key_name: str) -> dict:
        """Rotate encryption key (old data still decryptable)"""
        
        self.client.secrets.transit.rotate_key(name=key_name)
        return self.get_key_info(key_name)
    
    def rewrap_data(
        self,
        key_name: str,
        ciphertext: str,
        context: Optional[bytes] = None
    ) -> str:
        """
        Re-encrypt data with latest key version
        (without exposing plaintext)
        """
        
        import base64
        
        result = self.client.secrets.transit.rewrap_data(
            name=key_name,
            ciphertext=ciphertext,
            context=base64.b64encode(context).decode() if context else None,
        )
        
        return result["data"]["ciphertext"]


# Usage
vault = VaultKeyManager()

# Create a key for patient data
vault.create_encryption_key("patient-phi-key")

# Encrypt patient data
context = b"patient:P12345"  # Binds ciphertext to context
ciphertext = vault.encrypt(
    "patient-phi-key",
    b'{"diagnosis": "Hypertension"}',
    context=context
)

# Decrypt
plaintext = vault.decrypt("patient-phi-key", ciphertext, context=context)

# Rotate key periodically
vault.rotate_key("patient-phi-key")

# Re-wrap old data with new key version
new_ciphertext = vault.rewrap_data("patient-phi-key", ciphertext, context=context)
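
Vault Transit ciphertexts carry the key version in their prefix (`vault:v<version>:<base64>`), so an application can tell which stored values still need re-wrapping without calling Vault. A small sketch follows; the helper names are assumptions for illustration.

```python
def ciphertext_key_version(ciphertext: str) -> int:
    """Extract the key version from a 'vault:v<N>:<payload>' ciphertext."""
    parts = ciphertext.split(":", 2)
    if len(parts) != 3 or parts[0] != "vault" or not parts[1].startswith("v"):
        raise ValueError("not a Vault Transit ciphertext")
    return int(parts[1][1:])


def needs_rewrap(ciphertext: str, latest_version: int) -> bool:
    """True if the ciphertext was produced by an older key version."""
    return ciphertext_key_version(ciphertext) < latest_version


old_value = "vault:v1:AAAAexamplepayload=="
new_value = "vault:v3:BBBBexamplepayload=="
```

A background job can filter stored rows with `needs_rewrap` and pass only the stale ones to `rewrap_data`.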

Key Rotation Strategy

import asyncio
from datetime import datetime, timezone


class KeyRotationScheduler:
    """
    Automated key rotation for HIPAA compliance
    
    Best practices:
    - Rotate master keys annually
    - Rotate data keys based on usage
    - Never delete old key versions until all data re-wrapped
    """
    
    ROTATION_POLICIES = {
        "master_key": {
            "rotation_days": 365,
            "min_versions_to_keep": 3,
        },
        "data_key": {
            "rotation_days": 90,
            "rotation_on_usage": 1_000_000,  # Or after N encryptions
            "min_versions_to_keep": 5,
        },
        "session_key": {
            "rotation_days": 1,
            "min_versions_to_keep": 2,
        },
    }
    
    def __init__(self, vault, job_queue, audit_logger):
        # vault: a VaultKeyManager (synchronous)
        # job_queue, audit_logger: async helpers supplied by the application
        self.vault = vault
        self.job_queue = job_queue
        self.audit_logger = audit_logger
    
    async def run_rotation_check(self):
        """Check and rotate keys as needed"""
        
        for key_name, policy in self.ROTATION_POLICIES.items():
            # VaultKeyManager is synchronous, so run it off the event loop
            key_info = await asyncio.to_thread(self.vault.get_key_info, key_name)
            
            if self._should_rotate(key_info, policy):
                await self._rotate_and_rewrap(key_name, policy)
                
    def _should_rotate(self, key_info: dict, policy: dict) -> bool:
        """Determine if key should be rotated"""
        
        # Expects a timezone-aware datetime. get_key_info() must be extended
        # to report it; it is not among the fields that method returns.
        last_rotation = key_info.get("last_rotation_time")
        if not last_rotation:
            return False
            
        days_since_rotation = (datetime.now(timezone.utc) - last_rotation).days
        return days_since_rotation >= policy["rotation_days"]
    
    async def _rotate_and_rewrap(self, key_name: str, policy: dict):
        """Rotate key and re-wrap existing data"""
        
        # Rotate the key; rotate_key() returns the refreshed key metadata
        key_info = await asyncio.to_thread(self.vault.rotate_key, key_name)
        
        # Schedule background re-wrapping
        await self.job_queue.enqueue(
            "rewrap_all_data",
            {"key_name": key_name}
        )
        
        # Log rotation event
        await self.audit_logger.log({
            "event_type": "KEY_ROTATION",
            "key_name": key_name,
            "new_version": key_info["latest_version"],
        })
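
The policy table lists a usage threshold (`rotation_on_usage`) alongside the age limit. A standalone sketch of how the two conditions might be combined is shown below. The function name `rotation_due` and the idea that the caller tracks an encryption counter are assumptions, not something the scheduler above provides.

```python
from datetime import datetime, timedelta, timezone


def rotation_due(
    last_rotated: datetime,
    encryptions_since_rotation: int,
    policy: dict,
    now: datetime,
) -> bool:
    """True if the key is older than the policy allows or has been used too often."""
    if now - last_rotated >= timedelta(days=policy["rotation_days"]):
        return True
    usage_limit = policy.get("rotation_on_usage")  # optional in the policy table
    return usage_limit is not None and encryptions_since_rotation >= usage_limit


data_key_policy = {"rotation_days": 90, "rotation_on_usage": 1_000_000}
master_key_policy = {"rotation_days": 365}
now = datetime(2024, 6, 1, tzinfo=timezone.utc)
recent = datetime(2024, 5, 1, tzinfo=timezone.utc)
stale = datetime(2024, 1, 1, tzinfo=timezone.utc)
```

Passing `now` explicitly keeps the check deterministic and easy to unit test.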

Key Takeaways

Encrypt Everything

All PHI must be encrypted at rest and in transit

Use Envelope Encryption

Separate data keys from master keys for security and performance

TLS 1.2+ Required

Never allow older TLS versions or weak ciphers

Manage Keys Properly

Use HSM or Vault; never store keys alongside data

Practice Exercise

1. Set Up Key Management
   Deploy HashiCorp Vault or configure AWS KMS for your application.

2. Implement Envelope Encryption
   Create an encryption service using the envelope encryption pattern.

3. Configure TLS
   Set up TLS 1.3 with proper cipher suites for your API.

4. Encrypt Database
   Implement field-level encryption for PHI in your database.

5. Test Key Rotation
   Verify that key rotation works without data loss.

Next Steps