
Encryption: Data at Rest & In Transit

Encryption is the foundation of PHI protection. This module covers everything from basic cryptographic concepts to production-ready implementations for healthcare applications.
Learning Objectives:
  • Understand symmetric vs asymmetric encryption
  • Implement AES-256-GCM for data at rest
  • Configure TLS 1.3 for data in transit
  • Design key management systems
  • Implement field-level and database encryption

Why Encryption Matters for HIPAA

┌─────────────────────────────────────────────────────────────────────────────┐
│                    HIPAA ENCRYPTION REQUIREMENTS                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  SECURITY RULE §164.312(a)(2)(iv)        SECURITY RULE §164.312(e)(2)(ii)  │
│  ────────────────────────────────        ─────────────────────────────────  │
│  "Implement a mechanism to               "Implement a mechanism to          │
│   encrypt and decrypt ePHI"               encrypt ePHI whenever deemed      │
│                                           appropriate"                       │
│                                                                              │
│  DATA AT REST                            DATA IN TRANSIT                    │
│  ─────────────                           ───────────────                    │
│  • Database encryption                   • TLS 1.2+ required                │
│  • File system encryption                • Certificate management           │
│  • Backup encryption                     • VPN for internal traffic         │
│  • Key management                        • API security                     │
│                                                                              │
│  SAFE HARBOR PROVISION                                                      │
│  ─────────────────────                                                      │
│  If encrypted data is breached, it may NOT be considered a breach if:      │
│  • Encryption meets NIST standards                                          │
│  • Keys were not compromised                                                │
│  • Data was rendered "unusable, unreadable, or indecipherable"             │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Cryptography Fundamentals

Symmetric vs Asymmetric Encryption

"""
Symmetric Encryption:
- Same key for encryption and decryption
- Fast, suitable for bulk data
- Key distribution is the challenge
- Examples: AES, ChaCha20

Asymmetric Encryption:
- Public key encrypts, private key decrypts
- Slower, suitable for small data
- Solves key distribution
- Examples: RSA-OAEP, ECIES (ECDSA and Ed25519 are signature schemes, not encryption)
"""

from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes
import os

# Symmetric: AES-256-GCM
def symmetric_encrypt(plaintext: bytes, key: bytes) -> tuple[bytes, bytes]:
    """Encrypt using AES-256-GCM"""
    nonce = os.urandom(12)  # 96 bits for GCM
    aesgcm = AESGCM(key)
    ciphertext = aesgcm.encrypt(nonce, plaintext, None)
    return nonce, ciphertext

def symmetric_decrypt(nonce: bytes, ciphertext: bytes, key: bytes) -> bytes:
    """Decrypt using AES-256-GCM"""
    aesgcm = AESGCM(key)
    return aesgcm.decrypt(nonce, ciphertext, None)


# Asymmetric: RSA-OAEP
def asymmetric_encrypt(plaintext: bytes, public_key: rsa.RSAPublicKey) -> bytes:
    """Encrypt using RSA-OAEP (for small data like symmetric keys)"""
    return public_key.encrypt(
        plaintext,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
    )

def asymmetric_decrypt(ciphertext: bytes, private_key: rsa.RSAPrivateKey) -> bytes:
    """Decrypt using RSA-OAEP"""
    return private_key.decrypt(
        ciphertext,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
    )
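
The random 96-bit nonce in symmetric_encrypt is only safe while the number of encryptions under a single key stays small enough that two nonces are unlikely to collide. The sketch below estimates that probability with the birthday-bound approximation n(n-1)/2 / 2^96. The function name is illustrative, and the 2^32 figure reflects the per-key invocation limit that NIST SP 800-38D gives for randomly generated nonces.

```python
from fractions import Fraction

NONCE_BITS = 96  # size of the random GCM nonce used above


def nonce_collision_probability(num_encryptions: int) -> Fraction:
    """Birthday-bound estimate of a repeated random 96-bit nonce under one key."""
    pairs = num_encryptions * (num_encryptions - 1) // 2
    return Fraction(pairs, 2 ** NONCE_BITS)


# Estimate for 2^32 encryptions under a single key
estimate = nonce_collision_probability(2 ** 32)
print(float(estimate))
```

Keeping the estimate small is one reason to give each record its own data key or to rotate data keys after a fixed number of encryptions.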

Choosing the Right Algorithm

Use Case             Algorithm      Key Size   Notes
Data at rest         AES-256-GCM    256 bits   NIST-approved; commonly used for PHI
File encryption      AES-256-GCM    256 bits   With authenticated encryption
Key wrapping         AES-256-KW     256 bits   For protecting DEKs
Key exchange         ECDH P-256     256 bits   For TLS
Digital signatures   Ed25519        256 bits   For audit logs
Password hashing     Argon2id       -          For user passwords
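
The password-hashing row can be illustrated with a short sketch. Argon2id is not in the Python standard library, so this example swaps in scrypt (another memory-hard function available through hashlib) purely as a stand-in. The function names and cost parameters are assumptions for illustration, not tuned recommendations.

```python
import hashlib
import hmac
import os
from typing import Optional


def hash_password(password: str, salt: Optional[bytes] = None) -> tuple[bytes, bytes]:
    """Return (salt, digest) using scrypt as a stand-in for Argon2id."""
    salt = salt or os.urandom(16)
    digest = hashlib.scrypt(
        password.encode("utf-8"), salt=salt, n=2 ** 14, r=8, p=1, dklen=32
    )
    return salt, digest


def verify_password(password: str, salt: bytes, expected: bytes) -> bool:
    """Recompute the digest and compare in constant time."""
    _, digest = hash_password(password, salt)
    return hmac.compare_digest(digest, expected)


salt, stored = hash_password("correct horse battery staple")
```

Only the salt and digest are stored; the comparison uses hmac.compare_digest to avoid timing side channels.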

Data at Rest Encryption

Envelope Encryption Pattern

[Figure: Envelope encryption pattern showing the Master Key, DEKs, and PHI data hierarchy]
The envelope encryption pattern provides multiple layers of protection:
  • Master Key (KEK): Stored in HSM / AWS KMS / HashiCorp Vault
  • Data Encryption Keys (DEKs): Unique per record, stored encrypted
  • PHI Data: Encrypted with the DEK

Implementation with AWS KMS

import base64
import json
import os
from dataclasses import dataclass
from typing import Optional

import boto3
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

@dataclass
class EncryptedData:
    """Container for encrypted data with metadata"""
    ciphertext: bytes
    encrypted_dek: bytes
    nonce: bytes
    key_id: str
    algorithm: str = "AES-256-GCM"
    
    def to_dict(self) -> dict:
        return {
            "ciphertext": base64.b64encode(self.ciphertext).decode(),
            "encrypted_dek": base64.b64encode(self.encrypted_dek).decode(),
            "nonce": base64.b64encode(self.nonce).decode(),
            "key_id": self.key_id,
            "algorithm": self.algorithm,
        }
    
    @classmethod
    def from_dict(cls, data: dict) -> "EncryptedData":
        return cls(
            ciphertext=base64.b64decode(data["ciphertext"]),
            encrypted_dek=base64.b64decode(data["encrypted_dek"]),
            nonce=base64.b64decode(data["nonce"]),
            key_id=data["key_id"],
            algorithm=data.get("algorithm", "AES-256-GCM"),
        )


class EnvelopeEncryption:
    """
    HIPAA-compliant envelope encryption using AWS KMS
    """
    
    def __init__(self, kms_key_id: str, region: str = "us-east-1"):
        self.kms = boto3.client("kms", region_name=region)
        self.kms_key_id = kms_key_id
        
    def encrypt(
        self,
        plaintext: bytes,
        context: Optional[dict] = None
    ) -> EncryptedData:
        """
        Encrypt data using envelope encryption
        
        Args:
            plaintext: Data to encrypt
            context: Encryption context for additional AAD
        """
        # Generate a data encryption key from KMS
        dek_response = self.kms.generate_data_key(
            KeyId=self.kms_key_id,
            KeySpec="AES_256",
            EncryptionContext=context or {}
        )
        
        # Extract plaintext and encrypted DEK
        dek_plaintext = dek_response["Plaintext"]
        dek_encrypted = dek_response["CiphertextBlob"]
        
        # Encrypt the data with the DEK
        nonce = os.urandom(12)
        aesgcm = AESGCM(dek_plaintext)
        
        # Include context as AAD if provided; sort keys so encrypt and
        # decrypt always serialize the same context to the same bytes
        aad = json.dumps(context, sort_keys=True).encode() if context else None
        ciphertext = aesgcm.encrypt(nonce, plaintext, aad)
        
        # Drop the references to the plaintext DEK. Python bytes are immutable,
        # so the key cannot be reliably wiped from memory in place
        del dek_plaintext, aesgcm
        
        return EncryptedData(
            ciphertext=ciphertext,
            encrypted_dek=dek_encrypted,
            nonce=nonce,
            key_id=self.kms_key_id,
        )
    
    def decrypt(
        self,
        encrypted_data: EncryptedData,
        context: Optional[dict] = None
    ) -> bytes:
        """
        Decrypt data using envelope encryption
        """
        # Decrypt the DEK using KMS
        dek_response = self.kms.decrypt(
            CiphertextBlob=encrypted_data.encrypted_dek,
            EncryptionContext=context or {}
        )
        
        dek_plaintext = dek_response["Plaintext"]
        
        # Decrypt the data with the DEK
        aesgcm = AESGCM(dek_plaintext)
        aad = json.dumps(context, sort_keys=True).encode() if context else None
        plaintext = aesgcm.decrypt(
            encrypted_data.nonce,
            encrypted_data.ciphertext,
            aad
        )
        
        # Drop the references to the plaintext DEK (best effort, as in encrypt)
        del dek_plaintext, aesgcm
        
        return plaintext


# Usage example
encryption = EnvelopeEncryption(kms_key_id="alias/hipaa-phi-key")

# Encrypt patient data
patient_data = json.dumps({
    "patient_id": "P12345",
    "diagnosis": "Type 2 Diabetes",
    "medications": ["Metformin 500mg"]
}).encode()

# Context binds encryption to specific patient
context = {"patient_id": "P12345", "record_type": "medical_history"}

encrypted = encryption.encrypt(patient_data, context)

# Store encrypted_data.to_dict() in database

# Later, decrypt
decrypted = encryption.decrypt(encrypted, context)
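
AES-GCM authentication fails if the AAD supplied at decryption differs by even one byte from the AAD used at encryption. Because json.dumps preserves dictionary insertion order, two contexts with the same pairs in a different order serialize differently. A minimal sketch of a canonical serializer follows; the helper name canonical_aad is hypothetical and not part of the class above.

```python
import json


def canonical_aad(context: dict) -> bytes:
    """Serialize an encryption context to stable bytes for use as AAD."""
    return json.dumps(
        context, sort_keys=True, separators=(",", ":")
    ).encode("utf-8")


first = canonical_aad({"patient_id": "P12345", "record_type": "medical_history"})
second = canonical_aad({"record_type": "medical_history", "patient_id": "P12345"})
print(first == second)  # True
```

Sorting keys and fixing the separators means services that build the context in a different order still produce identical AAD bytes.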

Field-Level Encryption

from typing import Any, Dict, List
from dataclasses import dataclass, field

@dataclass
class FieldEncryptionConfig:
    """Configuration for field-level encryption"""
    field_name: str
    encrypt: bool = True
    searchable: bool = False  # If true, use deterministic encryption
    indexed: bool = False

class FieldLevelEncryption:
    """
    Encrypt specific fields in documents
    
    Useful when you need to:
    - Search on some fields while encrypting others
    - Apply different encryption policies to different data
    - Minimize encryption overhead
    """
    
    def __init__(
        self,
        envelope_encryption: EnvelopeEncryption,
        encrypted_fields: List[FieldEncryptionConfig]
    ):
        self.encryption = envelope_encryption
        self.field_configs = {f.field_name: f for f in encrypted_fields}
        
    def encrypt_document(
        self,
        document: Dict[str, Any],
        context: dict
    ) -> Dict[str, Any]:
        """Encrypt specified fields in a document"""
        
        encrypted_doc = {}
        
        for key, value in document.items():
            if key in self.field_configs and self.field_configs[key].encrypt:
                config = self.field_configs[key]
                
                # Serialize value
                serialized = json.dumps(value).encode()
                
                # Encrypt
                if config.searchable:
                    # Deterministic encryption for searchable fields
                    encrypted = self._deterministic_encrypt(serialized, context)
                else:
                    # Randomized encryption for maximum security
                    encrypted = self.encryption.encrypt(serialized, context)
                
                encrypted_doc[key] = {
                    "__encrypted__": True,
                    "data": encrypted.to_dict(),
                }
            else:
                encrypted_doc[key] = value
                
        return encrypted_doc
    
    def decrypt_document(
        self,
        document: Dict[str, Any],
        context: dict
    ) -> Dict[str, Any]:
        """Decrypt specified fields in a document"""
        
        decrypted_doc = {}
        
        for key, value in document.items():
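            if isinstance(value, dict) and value.get("__encrypted__"):
                encrypted_data = EncryptedData.from_dict(value["data"])
                if encrypted_data.key_id == "deterministic":
                    # Deterministic fields carry no KMS-wrapped DEK, so
                    # re-derive the key instead of calling KMS
                    dek = self._get_deterministic_dek(context)
                    decrypted_bytes = AESGCM(dek).decrypt(
                        encrypted_data.nonce, encrypted_data.ciphertext, None
                    )
                else:
                    decrypted_bytes = self.encryption.decrypt(encrypted_data, context)
                decrypted_doc[key] = json.loads(decrypted_bytes)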
            else:
                decrypted_doc[key] = value
                
        return decrypted_doc
    
    def _deterministic_encrypt(
        self,
        plaintext: bytes,
        context: dict
    ) -> EncryptedData:
        """
        Deterministic encryption for searchable fields
        
        Warning: Less secure than randomized encryption.
        Use only when search is required.
        """
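        # Derive a deterministic nonce from the plaintext and context, so a
        # nonce repeats only when the plaintext and context are identical
        import hashlib
        nonce_input = plaintext + json.dumps(context, sort_keys=True).encode()
        nonce = hashlib.sha256(nonce_input).digest()[:12]
        
        # Use a stable DEK for this context. _get_deterministic_dek is not
        # defined in this example; it must return the same 32-byte key for
        # the same context (for instance, derived from a KMS-protected secret)
        dek = self._get_deterministic_dek(context)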
        
        aesgcm = AESGCM(dek)
        ciphertext = aesgcm.encrypt(nonce, plaintext, None)
        
        return EncryptedData(
            ciphertext=ciphertext,
            encrypted_dek=b"",  # DEK derived, not stored
            nonce=nonce,
            key_id="deterministic",
        )


# Usage
field_encryption = FieldLevelEncryption(
    envelope_encryption=encryption,
    encrypted_fields=[
        FieldEncryptionConfig("ssn", encrypt=True, searchable=False),
        FieldEncryptionConfig("name", encrypt=True, searchable=True),
        FieldEncryptionConfig("diagnosis", encrypt=True, searchable=False),
        FieldEncryptionConfig("patient_id", encrypt=False),  # Not encrypted
    ]
)

patient_record = {
    "patient_id": "P12345",
    "name": "John Smith",
    "ssn": "123-45-6789",
    "diagnosis": "Hypertension",
    "created_at": "2024-01-15",
}

encrypted_record = field_encryption.encrypt_document(
    patient_record,
    context={"patient_id": "P12345"}
)
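
Deterministic encryption is one way to make a field searchable. A commonly used alternative, sketched here as a swapped-in technique rather than the approach of the class above, is a blind index: the field keeps its randomized encryption, and a keyed HMAC of the normalized value is stored next to it for equality lookups. The function name, the normalization rules, and the all-zero demo key are assumptions for illustration.

```python
import hashlib
import hmac
import unicodedata


def blind_index(value: str, index_key: bytes, field_name: str) -> str:
    """Keyed, one-way lookup token for equality search on an encrypted field."""
    normalized = unicodedata.normalize("NFKC", value).strip().lower()
    # Prefix the field name so equal values in different fields do not match
    message = field_name.encode("utf-8") + b"\x00" + normalized.encode("utf-8")
    return hmac.new(index_key, message, hashlib.sha256).hexdigest()


demo_key = bytes(32)  # placeholder only; a real key would come from the KMS
token = blind_index("John Smith", demo_key, "name")
```

A query then compares the token computed from the search term with the stored token, so the database never sees the plaintext name. Like deterministic encryption, a blind index still reveals which records share the same value.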

Database Encryption

PostgreSQL Column-Level Encryption with pgcrypto

-- PostgreSQL with pgcrypto for column-level encryption

-- Enable pgcrypto extension
CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- Create encrypted column function
CREATE OR REPLACE FUNCTION encrypt_phi(
    data TEXT,
    key_id TEXT
) RETURNS BYTEA AS $$
DECLARE
    encryption_key BYTEA;
BEGIN
    -- Get key from secure key storage (simplified example)
    -- In production, use AWS KMS, HashiCorp Vault, etc.
    encryption_key := get_encryption_key(key_id);
    
    RETURN pgp_sym_encrypt(
        data,
        encode(encryption_key, 'base64'),
        'cipher-algo=aes256, compress-algo=0'
    );
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

CREATE OR REPLACE FUNCTION decrypt_phi(
    encrypted_data BYTEA,
    key_id TEXT
) RETURNS TEXT AS $$
DECLARE
    encryption_key BYTEA;
BEGIN
    encryption_key := get_encryption_key(key_id);
    
    RETURN pgp_sym_decrypt(
        encrypted_data,
        encode(encryption_key, 'base64')
    );
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

-- Create patients table with encrypted columns
CREATE TABLE patients (
    id SERIAL PRIMARY KEY,
    -- Encrypted columns
    ssn_encrypted BYTEA,
    name_encrypted BYTEA,
    diagnosis_encrypted BYTEA,
    -- Metadata (not encrypted)
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    -- Key reference
    encryption_key_id TEXT NOT NULL
);

-- Insert with encryption
INSERT INTO patients (
    ssn_encrypted,
    name_encrypted,
    diagnosis_encrypted,
    encryption_key_id
) VALUES (
    encrypt_phi('123-45-6789', 'key_v1'),
    encrypt_phi('John Smith', 'key_v1'),
    encrypt_phi('Type 2 Diabetes', 'key_v1'),
    'key_v1'
);

-- Select with decryption
SELECT 
    id,
    decrypt_phi(ssn_encrypted, encryption_key_id) as ssn,
    decrypt_phi(name_encrypted, encryption_key_id) as name,
    decrypt_phi(diagnosis_encrypted, encryption_key_id) as diagnosis
FROM patients
WHERE id = 1;

-- Create view for automatic decryption (for authorized users only)
CREATE VIEW patients_decrypted AS
SELECT 
    id,
    decrypt_phi(ssn_encrypted, encryption_key_id) as ssn,
    decrypt_phi(name_encrypted, encryption_key_id) as name,
    decrypt_phi(diagnosis_encrypted, encryption_key_id) as diagnosis,
    created_at,
    updated_at
FROM patients;

-- Grant access only to authorized roles
GRANT SELECT ON patients_decrypted TO hipaa_authorized_users;

MongoDB Client-Side Field-Level Encryption

from datetime import datetime
import os

from pymongo import MongoClient
from pymongo.encryption import ClientEncryption
from bson.codec_options import CodecOptions
from bson.binary import STANDARD

# MongoDB CSFLE configuration
def create_encrypted_client():
    """Create MongoDB client with client-side field-level encryption"""
    
    # KMS provider configuration (AWS KMS example)
    kms_providers = {
        "aws": {
            "accessKeyId": os.environ["AWS_ACCESS_KEY_ID"],
            "secretAccessKey": os.environ["AWS_SECRET_ACCESS_KEY"],
        }
    }
    
    # Key vault configuration
    key_vault_namespace = "encryption.__keyVault"
    
    # Schema map defining which fields to encrypt
    schema_map = {
        "healthcare.patients": {
            "bsonType": "object",
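            "encryptMetadata": {
                # JSON pointer: the key is resolved per document from the
                # encryptionKeyId field. MongoDB permits pointer keys only
                # with the Random algorithm, so a deterministic field such as
                # ssn needs an explicit key UUID in its own "encrypt" block.
                "keyId": "/encryptionKeyId",
            },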
            "properties": {
                "ssn": {
                    "encrypt": {
                        "bsonType": "string",
                        "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Deterministic"
                    }
                },
                "name": {
                    "encrypt": {
                        "bsonType": "string",
                        "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
                    }
                },
                "medicalHistory": {
                    "encrypt": {
                        "bsonType": "array",
                        "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
                    }
                },
                "diagnosis": {
                    "encrypt": {
                        "bsonType": "string",
                        "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
                    }
                }
            }
        }
    }
    
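    # Auto encryption options (PyMongo expects an AutoEncryptionOpts
    # instance here, not a plain dict)
    from pymongo.encryption_options import AutoEncryptionOpts
    auto_encryption_opts = AutoEncryptionOpts(
        kms_providers,
        key_vault_namespace,
        schema_map=schema_map,
    )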
    
    # Create encrypted client
    client = MongoClient(
        os.environ["MONGODB_URI"],
        auto_encryption_opts=auto_encryption_opts
    )
    
    return client


def create_data_encryption_key(client_encryption, key_alt_name: str) -> bytes:
    """Create a data encryption key in the key vault, wrapped by the AWS KMS master key"""
    
    master_key = {
        "region": "us-east-1",
        "key": os.environ["AWS_KMS_KEY_ARN"],
    }
    
    data_key_id = client_encryption.create_data_key(
        "aws",
        master_key=master_key,
        key_alt_names=[key_alt_name]
    )
    
    return data_key_id


# Usage
client = create_encrypted_client()
db = client.healthcare

# Insert encrypted document (encryption happens automatically)
patient = {
    "patientId": "P12345",
    "ssn": "123-45-6789",  # Will be encrypted deterministically
    "name": "John Smith",  # Will be encrypted randomly
    "diagnosis": "Hypertension",  # Will be encrypted randomly
    "medicalHistory": [
        {"date": "2024-01-15", "procedure": "Annual checkup"}
    ],  # Will be encrypted randomly
    "createdAt": datetime.utcnow(),
}

db.patients.insert_one(patient)

# Query on deterministically encrypted field
# (Only works for deterministic encryption)
result = db.patients.find_one({"ssn": "123-45-6789"})
# Decryption happens automatically for authorized clients
print(result["name"])  # "John Smith"

Data in Transit Encryption

[Figure: TLS handshake process for secure data in transit]

TLS 1.3 Configuration

# Python/FastAPI with TLS 1.3
import ssl
import uvicorn
from fastapi import FastAPI

app = FastAPI()

def create_ssl_context():
    """Create a secure SSL context that accepts TLS 1.2 and TLS 1.3 only"""
    
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    
    # Minimum TLS 1.2, prefer 1.3
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    context.maximum_version = ssl.TLSVersion.TLSv1_3
    
    # Load certificate and key
    context.load_cert_chain(
        certfile="/path/to/certificate.pem",
        keyfile="/path/to/private-key.pem"
    )
    
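    # Strong cipher suites only. set_ciphers() controls TLS 1.2 suites;
    # TLS 1.3 suites are enabled by default and cannot be changed here
    context.set_ciphers(
        "ECDHE+AESGCM:"
        "ECDHE+CHACHA20:"
        "DHE+AESGCM"
    )
    
    # OCSP stapling is not available for servers in Python's ssl module;
    # enable it on the reverse proxy instead (see the NGINX configuration)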
    
    return context


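if __name__ == "__main__":
    # uvicorn.run() builds its own SSLContext from the file paths below and
    # does not accept the context returned by create_ssl_context(), so the
    # cipher policy is passed separately. Use create_ssl_context() with
    # servers that take an SSLContext, or terminate TLS at a reverse proxy.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=443,
        ssl_keyfile="/path/to/private-key.pem",
        ssl_certfile="/path/to/certificate.pem",
        ssl_version=ssl.PROTOCOL_TLS_SERVER,
        ssl_ciphers="ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM",
    )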
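
The same policy applies to outbound connections, for example when the API calls another service that handles PHI. The following is a minimal sketch of a client-side context using only the standard library; the function name is illustrative.

```python
import ssl


def create_client_context() -> ssl.SSLContext:
    """Client context that verifies certificates and refuses TLS below 1.2."""
    # create_default_context() enables certificate and hostname verification
    context = ssl.create_default_context()
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


client_context = create_client_context()
```

The resulting context can be passed to clients that accept an SSLContext, such as urllib.request.urlopen(url, context=client_context).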

NGINX TLS Configuration

# /etc/nginx/conf.d/hipaa-ssl.conf

# SSL configuration for HIPAA compliance
server {
    listen 443 ssl http2;
    server_name api.healthcare.example.com;
    
    # SSL Certificate
    ssl_certificate /etc/nginx/ssl/fullchain.pem;
    ssl_certificate_key /etc/nginx/ssl/privkey.pem;
    
    # TLS versions - TLS 1.2 and 1.3 only
    ssl_protocols TLSv1.2 TLSv1.3;
    
    # Strong cipher suites. ssl_ciphers applies to TLS 1.2; the TLS 1.3
    # suites are enabled by OpenSSL by default
    ssl_ciphers 'ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305';
    ssl_prefer_server_ciphers on;
    
    # SSL session configuration
    ssl_session_timeout 1d;
    ssl_session_cache shared:SSL:50m;
    ssl_session_tickets off;
    
    # OCSP Stapling
    ssl_stapling on;
    ssl_stapling_verify on;
    ssl_trusted_certificate /etc/nginx/ssl/chain.pem;
    resolver 8.8.8.8 8.8.4.4 valid=300s;
    resolver_timeout 5s;
    
    # Security headers
    add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload" always;
    add_header X-Content-Type-Options nosniff always;
    add_header X-Frame-Options DENY always;
    add_header X-XSS-Protection "1; mode=block" always;
    
    # Proxy to application
    location / {
        proxy_pass http://localhost:8000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}

# Redirect HTTP to HTTPS
server {
    listen 80;
    server_name api.healthcare.example.com;
    return 301 https://$server_name$request_uri;
}

Certificate Management with Let’s Encrypt

#!/bin/bash
# certificate-management.sh

# Install certbot
apt-get update
apt-get install -y certbot python3-certbot-nginx

# Obtain certificate
certbot certonly \
    --nginx \
    --non-interactive \
    --agree-tos \
    --email security@healthcare.example.com \
    --domains api.healthcare.example.com \
    --key-type rsa \
    --rsa-key-size 4096

# Set up auto-renewal
echo "0 0 * * * root certbot renew --quiet --post-hook 'systemctl reload nginx'" \
    > /etc/cron.d/certbot-renewal

# Verify configuration
nginx -t && systemctl reload nginx
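
Automatic renewal can fail silently, so it helps to monitor certificate expiry independently. The sketch below computes the days remaining from a certificate's notAfter string in the format Python's ssl module reports. The function names and the 30-day threshold are assumptions for illustration.

```python
import ssl
from datetime import datetime, timezone


def days_until_expiry(not_after: str, now: datetime) -> int:
    """Whole days between `now` and a certificate's notAfter timestamp."""
    expires = datetime.fromtimestamp(
        ssl.cert_time_to_seconds(not_after), tz=timezone.utc
    )
    return (expires - now).days


def needs_renewal(not_after: str, now: datetime, threshold_days: int = 30) -> bool:
    """True when the certificate expires within the threshold."""
    return days_until_expiry(not_after, now) < threshold_days


reference = datetime(2025, 1, 1, tzinfo=timezone.utc)
remaining = days_until_expiry("Jan 31 00:00:00 2025 GMT", reference)
```

In practice the notAfter value would come from the dictionary returned by getpeercert() on a connected ssl.SSLSocket, and a failed check would raise an alert.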

Key Management

HashiCorp Vault Integration

import hvac
from typing import Optional
import os

class VaultKeyManager:
    """
    Key management using HashiCorp Vault
    
    Features:
    - Automatic key rotation
    - Key versioning
    - Audit logging
    - Access control
    """
    
    def __init__(self):
        self.client = hvac.Client(
            url=os.environ["VAULT_ADDR"],
            token=os.environ["VAULT_TOKEN"],
        )
        
    def create_encryption_key(
        self,
        key_name: str,
        key_type: str = "aes256-gcm96"
    ) -> dict:
        """Create a new encryption key in Vault"""
        
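        self.client.secrets.transit.create_key(
            name=key_name,
            key_type=key_type,
            # Key derivation is required for the context argument to take
            # effect; every encrypt and decrypt call must then pass a context
            derived=True,
            exportable=False,  # Keys never leave Vault
            allow_plaintext_backup=False,
        )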
        
        return self.get_key_info(key_name)
    
    def get_key_info(self, key_name: str) -> dict:
        """Get key metadata (not the key itself)"""
        
        key_info = self.client.secrets.transit.read_key(name=key_name)
        return {
            "name": key_name,
            "type": key_info["data"]["type"],
            "latest_version": key_info["data"]["latest_version"],
            "min_decryption_version": key_info["data"]["min_decryption_version"],
            "supports_encryption": key_info["data"]["supports_encryption"],
        }
    
    def encrypt(
        self,
        key_name: str,
        plaintext: bytes,
        context: Optional[bytes] = None
    ) -> str:
        """Encrypt data using Vault Transit"""
        
        import base64
        
        result = self.client.secrets.transit.encrypt_data(
            name=key_name,
            plaintext=base64.b64encode(plaintext).decode(),
            context=base64.b64encode(context).decode() if context else None,
        )
        
        return result["data"]["ciphertext"]
    
    def decrypt(
        self,
        key_name: str,
        ciphertext: str,
        context: Optional[bytes] = None
    ) -> bytes:
        """Decrypt data using Vault Transit"""
        
        import base64
        
        result = self.client.secrets.transit.decrypt_data(
            name=key_name,
            ciphertext=ciphertext,
            context=base64.b64encode(context).decode() if context else None,
        )
        
        return base64.b64decode(result["data"]["plaintext"])
    
    def rotate_key(self, key_name: str) -> dict:
        """Rotate encryption key (old data still decryptable)"""
        
        self.client.secrets.transit.rotate_key(name=key_name)
        return self.get_key_info(key_name)
    
    def rewrap_data(
        self,
        key_name: str,
        ciphertext: str,
        context: Optional[bytes] = None
    ) -> str:
        """
        Re-encrypt data with latest key version
        (without exposing plaintext)
        """
        
        import base64
        
        result = self.client.secrets.transit.rewrap_data(
            name=key_name,
            ciphertext=ciphertext,
            context=base64.b64encode(context).decode() if context else None,
        )
        
        return result["data"]["ciphertext"]


# Usage
vault = VaultKeyManager()

# Create a key for patient data
vault.create_encryption_key("patient-phi-key")

# Encrypt patient data
context = b"patient:P12345"  # Binds ciphertext to context
ciphertext = vault.encrypt(
    "patient-phi-key",
    b'{"diagnosis": "Hypertension"}',
    context=context
)

# Decrypt
plaintext = vault.decrypt("patient-phi-key", ciphertext, context=context)

# Rotate key periodically
vault.rotate_key("patient-phi-key")

# Re-wrap old data with new key version
new_ciphertext = vault.rewrap_data("patient-phi-key", ciphertext, context=context)
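
Vault Transit ciphertexts carry the key version in their prefix, in the form vault:v<N>:<base64 payload>. A background job can use this to find records that still need re-wrapping after a rotation. The helper names below are hypothetical, and the sample ciphertexts are made up.

```python
def ciphertext_key_version(ciphertext: str) -> int:
    """Extract the key version from a Vault Transit ciphertext string."""
    prefix, version, _payload = ciphertext.split(":", 2)
    if prefix != "vault" or not version.startswith("v"):
        raise ValueError("not a Vault Transit ciphertext")
    return int(version[1:])


def needs_rewrap(ciphertext: str, latest_version: int) -> bool:
    """True when the ciphertext was produced by an older key version."""
    return ciphertext_key_version(ciphertext) < latest_version


print(needs_rewrap("vault:v1:AAAA", latest_version=2))  # True
```

The latest_version value is the one returned by get_key_info, so a scan can skip records that are already current.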

Key Rotation Strategy

from datetime import datetime, timezone


class KeyRotationScheduler:
    """
    Automated key rotation for HIPAA compliance
    
    Best practices:
    - Rotate master keys annually
    - Rotate data keys based on usage
    - Never delete old key versions until all data re-wrapped
    """
    
    ROTATION_POLICIES = {
        "master_key": {
            "rotation_days": 365,
            "min_versions_to_keep": 3,
        },
        "data_key": {
            "rotation_days": 90,
            "rotation_on_usage": 1_000_000,  # Or after N encryptions
            "min_versions_to_keep": 5,
        },
        "session_key": {
            "rotation_days": 1,
            "min_versions_to_keep": 2,
        },
    }
    
    def __init__(self, vault, job_queue, audit_logger):
        # Collaborators are assumed to expose async methods: an async wrapper
        # around VaultKeyManager, a background job queue, and an audit logger
        self.vault = vault
        self.job_queue = job_queue
        self.audit_logger = audit_logger
    
    async def run_rotation_check(self):
        """Check and rotate keys as needed"""
        
        for key_name, policy in self.ROTATION_POLICIES.items():
            key_info = await self.vault.get_key_info(key_name)
            
            if self._should_rotate(key_info, policy):
                await self._rotate_and_rewrap(key_name, policy)
                
    def _should_rotate(self, key_info: dict, policy: dict) -> bool:
        """Determine if key should be rotated"""
        
        # Assumes key_info carries a timezone-aware last_rotation_time;
        # VaultKeyManager.get_key_info() above does not return one yet
        last_rotation = key_info.get("last_rotation_time")
        if not last_rotation:
            return False
            
        days_since_rotation = (datetime.now(timezone.utc) - last_rotation).days
        return days_since_rotation >= policy["rotation_days"]
    
    async def _rotate_and_rewrap(self, key_name: str, policy: dict):
        """Rotate key and re-wrap existing data"""
        
        # Rotate the key; rotate_key returns the refreshed key metadata
        key_info = await self.vault.rotate_key(key_name)
        
        # Schedule background re-wrapping
        await self.job_queue.enqueue(
            "rewrap_all_data",
            {"key_name": key_name}
        )
        
        # Log rotation event
        await self.audit_logger.log({
            "event_type": "KEY_ROTATION",
            "key_name": key_name,
            "new_version": key_info["latest_version"],
        })
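
The data_key policy also defines a usage limit (rotation_on_usage), which an age-only check does not cover. A minimal sketch of a decision function that considers both conditions follows; the function name and the encryptions_since_rotation counter are assumptions, and the counter would have to be tracked by the application.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def rotation_due(
    last_rotation: datetime,
    encryptions_since_rotation: int,
    policy: dict,
    now: Optional[datetime] = None,
) -> bool:
    """True when a key is older than the policy allows or has been used too often."""
    now = now or datetime.now(timezone.utc)
    too_old = now - last_rotation >= timedelta(days=policy["rotation_days"])
    usage_limit = policy.get("rotation_on_usage")
    too_used = usage_limit is not None and encryptions_since_rotation >= usage_limit
    return too_old or too_used


data_key_policy = {"rotation_days": 90, "rotation_on_usage": 1_000_000}
rotated_at = datetime(2025, 1, 1, tzinfo=timezone.utc)
checked_at = datetime(2025, 2, 1, tzinfo=timezone.utc)
print(rotation_due(rotated_at, 1_500_000, data_key_policy, now=checked_at))  # True
```

Keeping the decision in a pure function makes the policy easy to unit test without a running Vault instance.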

Key Takeaways

  • Encrypt Everything: All PHI must be encrypted at rest and in transit.
  • Use Envelope Encryption: Separate data keys from master keys for security and performance.
  • TLS 1.2+ Required: Never allow older TLS versions or weak ciphers.
  • Manage Keys Properly: Use an HSM or Vault; never store keys alongside data.

Practice Exercise

  1. Set Up Key Management: Deploy HashiCorp Vault or configure AWS KMS for your application.
  2. Implement Envelope Encryption: Create an encryption service using the envelope encryption pattern.
  3. Configure TLS: Set up TLS 1.3 with proper cipher suites for your API.
  4. Encrypt Database: Implement field-level encryption for PHI in your database.
  5. Test Key Rotation: Verify that key rotation works without data loss.
