Encryption: Data at Rest & In Transit
Encryption is the foundation of PHI protection. This module covers everything from basic cryptographic concepts to production-ready implementations for healthcare applications.
Learning Objectives:
- Understand symmetric vs asymmetric encryption
- Implement AES-256-GCM for data at rest
- Configure TLS 1.3 for data in transit
- Design key management systems
- Implement field-level and database encryption
Why Encryption Matters for HIPAA
┌─────────────────────────────────────────────────────────────────────────────┐
│ HIPAA ENCRYPTION REQUIREMENTS │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ SECURITY RULE §164.312(a)(2)(iv) SECURITY RULE §164.312(e)(2)(ii) │
│ ──────────────────────────────── ───────────────────────────────── │
│ "Implement a mechanism to "Implement a mechanism to │
│ encrypt and decrypt ePHI" encrypt ePHI whenever deemed │
│ appropriate" │
│ │
│ DATA AT REST DATA IN TRANSIT │
│ ───────────── ─────────────── │
│ • Database encryption • TLS 1.2+ required │
│ • File system encryption • Certificate management │
│ • Backup encryption • VPN for internal traffic │
│ • Key management • API security │
│ │
│ SAFE HARBOR PROVISION │
│ ───────────────────── │
│ If encrypted data is breached, it may NOT be considered a breach if: │
│ • Encryption meets NIST standards │
│ • Keys were not compromised │
│ • Data was rendered "unusable, unreadable, or indecipherable" │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Cryptography Fundamentals
Symmetric vs Asymmetric Encryption
"""
Symmetric Encryption:
- Same key for encryption and decryption
- Fast, suitable for bulk data
- Key distribution is the challenge
- Examples: AES, ChaCha20
Asymmetric Encryption:
- Public key encrypts, private key decrypts
- Slower, suitable for small data
- Solves key distribution
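- Examples: RSA (encryption), ECDH/X25519 (key agreement)
- Note: ECDSA and Ed25519 are signature schemes; they sign and verify but do not encrypt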
"""
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes
import os
# Symmetric: AES-256-GCM
def symmetric_encrypt(plaintext: bytes, key: bytes) -> tuple[bytes, bytes]:
    """Encrypt using AES-256-GCM"""
    nonce = os.urandom(12)  # 96 bits for GCM; must never repeat for the same key
    aesgcm = AESGCM(key)
    ciphertext = aesgcm.encrypt(nonce, plaintext, None)
    return nonce, ciphertext


def symmetric_decrypt(nonce: bytes, ciphertext: bytes, key: bytes) -> bytes:
    """Decrypt using AES-256-GCM"""
    aesgcm = AESGCM(key)
    return aesgcm.decrypt(nonce, ciphertext, None)


# Asymmetric: RSA-OAEP
def asymmetric_encrypt(plaintext: bytes, public_key: rsa.RSAPublicKey) -> bytes:
    """Encrypt using RSA-OAEP (for small data like symmetric keys)"""
    return public_key.encrypt(
        plaintext,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
    )


def asymmetric_decrypt(ciphertext: bytes, private_key: rsa.RSAPrivateKey) -> bytes:
    """Decrypt using RSA-OAEP"""
    return private_key.decrypt(
        ciphertext,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
    )
Choosing the Right Algorithm
| Use Case | Algorithm | Key Size | Notes |
|---|---|---|---|
| Data at rest | AES-256-GCM | 256 bits | NIST-approved; HIPAA itself does not mandate a specific algorithm |
| File encryption | AES-256-GCM | 256 bits | With authenticated encryption |
| Key wrapping | AES-256-KW | 256 bits | For protecting DEKs |
| Key exchange | ECDH P-256 | 256 bits | For TLS |
| Digital signatures | Ed25519 | 256 bits | For audit logs |
| Password hashing | Argon2id | - | For user passwords |
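The table lists Argon2id for password hashing, which requires a third-party package. As a rough sketch of the same pattern (random salt plus a memory-hard KDF) using only the standard library, the example below substitutes `hashlib.scrypt`. This is a swapped-in technique, not Argon2id, and the cost parameters and the `hash_password` / `verify_password` names are illustrative assumptions.

```python
import hashlib
import hmac
import os

# Illustrative scrypt cost parameters (assumptions; tune for your hardware).
SCRYPT_N = 2**14  # CPU/memory cost
SCRYPT_R = 8      # block size
SCRYPT_P = 1      # parallelism


def hash_password(password: str) -> tuple[bytes, bytes]:
    """Return (salt, derived_key) for storage; the password itself is never stored."""
    salt = os.urandom(16)
    key = hashlib.scrypt(
        password.encode(), salt=salt, n=SCRYPT_N, r=SCRYPT_R, p=SCRYPT_P, dklen=32
    )
    return salt, key


def verify_password(password: str, salt: bytes, expected: bytes) -> bool:
    """Re-derive the key from the candidate password and compare in constant time."""
    key = hashlib.scrypt(
        password.encode(), salt=salt, n=SCRYPT_N, r=SCRYPT_R, p=SCRYPT_P, dklen=32
    )
    return hmac.compare_digest(key, expected)
```

Store the salt and derived key together; the salt is not secret.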
Data at Rest Encryption
Envelope Encryption Pattern
Envelope Encryption Key Hierarchy
- Master Key (KEK): Stored in HSM / AWS KMS / HashiCorp Vault
- Data Encryption Keys (DEKs): Unique per record, stored encrypted
- PHI Data: Encrypted with the DEK
Implementation with AWS KMS
import base64
import json
import os
from dataclasses import dataclass
from typing import Optional

import boto3
from cryptography.hazmat.primitives.ciphers.aead import AESGCM


@dataclass
class EncryptedData:
    """Container for encrypted data with metadata"""
    ciphertext: bytes
    encrypted_dek: bytes
    nonce: bytes
    key_id: str
    algorithm: str = "AES-256-GCM"

    def to_dict(self) -> dict:
        return {
            "ciphertext": base64.b64encode(self.ciphertext).decode(),
            "encrypted_dek": base64.b64encode(self.encrypted_dek).decode(),
            "nonce": base64.b64encode(self.nonce).decode(),
            "key_id": self.key_id,
            "algorithm": self.algorithm,
        }

    @classmethod
    def from_dict(cls, data: dict) -> "EncryptedData":
        return cls(
            ciphertext=base64.b64decode(data["ciphertext"]),
            encrypted_dek=base64.b64decode(data["encrypted_dek"]),
            nonce=base64.b64decode(data["nonce"]),
            key_id=data["key_id"],
            algorithm=data.get("algorithm", "AES-256-GCM"),
        )
class EnvelopeEncryption:
    """
    HIPAA-compliant envelope encryption using AWS KMS
    """

    def __init__(self, kms_key_id: str, region: str = "us-east-1"):
        self.kms = boto3.client("kms", region_name=region)
        self.kms_key_id = kms_key_id

    def encrypt(
        self,
        plaintext: bytes,
        context: Optional[dict] = None
    ) -> EncryptedData:
        """
        Encrypt data using envelope encryption

        Args:
            plaintext: Data to encrypt
            context: Encryption context; KMS binds it to the DEK and it is
                also passed to AES-GCM as AAD
        """
        # Generate a data encryption key from KMS
        dek_response = self.kms.generate_data_key(
            KeyId=self.kms_key_id,
            KeySpec="AES_256",
            EncryptionContext=context or {}
        )
        # Extract plaintext and encrypted DEK
        dek_plaintext = dek_response["Plaintext"]
        dek_encrypted = dek_response["CiphertextBlob"]
        # Encrypt the data with the DEK
        nonce = os.urandom(12)
        aesgcm = AESGCM(dek_plaintext)
        # Include context as AAD if provided (sorted keys keep the bytes stable)
        aad = json.dumps(context, sort_keys=True).encode() if context else None
        ciphertext = aesgcm.encrypt(nonce, plaintext, aad)
        # Drop references to the plaintext DEK. Python bytes are immutable,
        # so this does not securely wipe the key from memory.
        del dek_plaintext, aesgcm
        return EncryptedData(
            ciphertext=ciphertext,
            encrypted_dek=dek_encrypted,
            nonce=nonce,
            key_id=self.kms_key_id,
        )

    def decrypt(
        self,
        encrypted_data: EncryptedData,
        context: Optional[dict] = None
    ) -> bytes:
        """
        Decrypt data using envelope encryption
        """
        # Decrypt the DEK using KMS; the context must match the one used to encrypt
        dek_response = self.kms.decrypt(
            CiphertextBlob=encrypted_data.encrypted_dek,
            KeyId=encrypted_data.key_id,
            EncryptionContext=context or {}
        )
        dek_plaintext = dek_response["Plaintext"]
        # Decrypt the data with the DEK
        aesgcm = AESGCM(dek_plaintext)
        aad = json.dumps(context, sort_keys=True).encode() if context else None
        plaintext = aesgcm.decrypt(
            encrypted_data.nonce,
            encrypted_data.ciphertext,
            aad
        )
        # Drop references to the plaintext DEK (not a secure memory wipe)
        del dek_plaintext, aesgcm
        return plaintext
# Usage example
encryption = EnvelopeEncryption(kms_key_id="alias/hipaa-phi-key")

# Encrypt patient data
patient_data = json.dumps({
    "patient_id": "P12345",
    "diagnosis": "Type 2 Diabetes",
    "medications": ["Metformin 500mg"]
}).encode()

# Context binds encryption to specific patient
context = {"patient_id": "P12345", "record_type": "medical_history"}
encrypted = encryption.encrypt(patient_data, context)
# Store encrypted.to_dict() in database

# Later, decrypt with the same context
decrypted = encryption.decrypt(encrypted, context)
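Because the encryption context is passed to AES-GCM as additional authenticated data, the bytes produced at encryption time and at decryption time must match exactly, or decryption fails. The sketch below shows one way to serialize a context so that key order and whitespace do not matter. The helper name `canonical_aad` is hypothetical and not part of the module's classes.

```python
import json
from typing import Optional


def canonical_aad(context: Optional[dict]) -> Optional[bytes]:
    """Serialize an encryption context to stable bytes for use as AAD."""
    if not context:
        return None
    # sort_keys and fixed separators remove ordering and whitespace differences
    return json.dumps(context, sort_keys=True, separators=(",", ":")).encode()


a = canonical_aad({"patient_id": "P12345", "record_type": "medical_history"})
b = canonical_aad({"record_type": "medical_history", "patient_id": "P12345"})
print(a == b)  # True
```

Whatever serialization is chosen, the same function should be used on both the encrypt and decrypt paths.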
Field-Level Encryption
import json
from dataclasses import dataclass
from typing import Any, Dict, List

from cryptography.hazmat.primitives.ciphers.aead import AESGCM


@dataclass
class FieldEncryptionConfig:
    """Configuration for field-level encryption"""
    field_name: str
    encrypt: bool = True
    searchable: bool = False  # If true, use deterministic encryption
    indexed: bool = False
class FieldLevelEncryption:
    """
    Encrypt specific fields in documents

    Useful when you need to:
    - Search on some fields while encrypting others
    - Apply different encryption policies to different data
    - Minimize encryption overhead
    """

    def __init__(
        self,
        envelope_encryption: EnvelopeEncryption,
        encrypted_fields: List[FieldEncryptionConfig]
    ):
        self.encryption = envelope_encryption
        self.field_configs = {f.field_name: f for f in encrypted_fields}

    def encrypt_document(
        self,
        document: Dict[str, Any],
        context: dict
    ) -> Dict[str, Any]:
        """Encrypt specified fields in a document"""
        encrypted_doc = {}
        for key, value in document.items():
            if key in self.field_configs and self.field_configs[key].encrypt:
                config = self.field_configs[key]
                # Serialize value
                serialized = json.dumps(value).encode()
                # Encrypt
                if config.searchable:
                    # Deterministic encryption for searchable fields
                    encrypted = self._deterministic_encrypt(serialized, context)
                else:
                    # Randomized encryption for maximum security
                    encrypted = self.encryption.encrypt(serialized, context)
                encrypted_doc[key] = {
                    "__encrypted__": True,
                    "data": encrypted.to_dict(),
                }
            else:
                encrypted_doc[key] = value
        return encrypted_doc
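    def decrypt_document(
        self,
        document: Dict[str, Any],
        context: dict
    ) -> Dict[str, Any]:
        """Decrypt specified fields in a document"""
        decrypted_doc = {}
        for key, value in document.items():
            if isinstance(value, dict) and value.get("__encrypted__"):
                encrypted_data = EncryptedData.from_dict(value["data"])
                if encrypted_data.key_id == "deterministic":
                    # Deterministic fields carry no wrapped DEK; re-derive it
                    dek = self._get_deterministic_dek(context)
                    decrypted_bytes = AESGCM(dek).decrypt(
                        encrypted_data.nonce, encrypted_data.ciphertext, None
                    )
                else:
                    decrypted_bytes = self.encryption.decrypt(encrypted_data, context)
                decrypted_doc[key] = json.loads(decrypted_bytes)
            else:
                decrypted_doc[key] = value
        return decrypted_doc

    def _deterministic_encrypt(
        self,
        plaintext: bytes,
        context: dict
    ) -> EncryptedData:
        """
        Deterministic encryption for searchable fields

        Warning: Less secure than randomized encryption, because equal
        plaintexts produce equal ciphertexts. Use only when search is required.
        """
        import hashlib
        import hmac
        # Use a stable DEK for this context
        dek = self._get_deterministic_dek(context)
        # Derive the nonce with a keyed HMAC over plaintext and context.
        # The nonce is stored in the clear, so an unkeyed hash would let an
        # attacker confirm guesses of low-entropy values such as names.
        nonce_input = plaintext + json.dumps(context, sort_keys=True).encode()
        nonce = hmac.new(dek, nonce_input, hashlib.sha256).digest()[:12]
        aesgcm = AESGCM(dek)
        ciphertext = aesgcm.encrypt(nonce, plaintext, None)
        return EncryptedData(
            ciphertext=ciphertext,
            encrypted_dek=b"",  # DEK derived, not stored
            nonce=nonce,
            key_id="deterministic",
        )

    def _get_deterministic_dek(self, context: dict) -> bytes:
        """
        Return a stable 32-byte DEK for this context.

        Placeholder: supply an implementation backed by your KMS
        (for example, a wrapped key stored per tenant).
        """
        raise NotImplementedError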
# Usage
field_encryption = FieldLevelEncryption(
    envelope_encryption=encryption,
    encrypted_fields=[
        FieldEncryptionConfig("ssn", encrypt=True, searchable=False),
        FieldEncryptionConfig("name", encrypt=True, searchable=True),
        FieldEncryptionConfig("diagnosis", encrypt=True, searchable=False),
        FieldEncryptionConfig("patient_id", encrypt=False),  # Not encrypted
    ]
)

patient_record = {
    "patient_id": "P12345",
    "name": "John Smith",
    "ssn": "123-45-6789",
    "diagnosis": "Hypertension",
    "created_at": "2024-01-15",
}

encrypted_record = field_encryption.encrypt_document(
    patient_record,
    context={"patient_id": "P12345"}
)
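Deterministic encryption is one way to support equality search. A commonly used alternative, sketched here as an assumption rather than as part of this module, keeps the field under randomized encryption and stores a keyed HMAC "blind index" next to it for lookups. The `INDEX_KEY` value and the `blind_index` helper are hypothetical; in practice the key would be loaded from a KMS or secret store and kept separate from the data encryption keys.

```python
import hashlib
import hmac
import os

# Hypothetical index key; load it from a KMS or secret store in practice.
INDEX_KEY = os.urandom(32)


def blind_index(value: str, field: str, key: bytes = INDEX_KEY) -> str:
    """Keyed, deterministic token for equality search on an encrypted field."""
    normalized = value.strip().lower()
    # Including the field name keeps equal values in different fields distinct
    message = f"{field}:{normalized}".encode()
    return hmac.new(key, message, hashlib.sha256).hexdigest()


token = blind_index("John Smith", "name")
# Store `token` in an indexed column and query with blind_index(search_term, "name")
```

Like deterministic encryption, a blind index reveals which rows share a value, so it suits fields where that leakage is acceptable.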
Database Encryption
PostgreSQL Column-Level Encryption with pgcrypto
-- PostgreSQL with pgcrypto for column-level encryption
-- Enable pgcrypto extension
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- Create encrypted column function
CREATE OR REPLACE FUNCTION encrypt_phi(
data TEXT,
key_id TEXT
) RETURNS BYTEA AS $$
DECLARE
encryption_key BYTEA;
BEGIN
-- Get key from secure key storage (simplified example)
-- In production, use AWS KMS, HashiCorp Vault, etc.
encryption_key := get_encryption_key(key_id);
RETURN pgp_sym_encrypt(
data,
encode(encryption_key, 'base64'),
'cipher-algo=aes256, compress-algo=0'
);
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
CREATE OR REPLACE FUNCTION decrypt_phi(
encrypted_data BYTEA,
key_id TEXT
) RETURNS TEXT AS $$
DECLARE
encryption_key BYTEA;
BEGIN
encryption_key := get_encryption_key(key_id);
RETURN pgp_sym_decrypt(
encrypted_data,
encode(encryption_key, 'base64')
);
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
-- Create patients table with encrypted columns
CREATE TABLE patients (
id SERIAL PRIMARY KEY,
-- Encrypted columns
ssn_encrypted BYTEA,
name_encrypted BYTEA,
diagnosis_encrypted BYTEA,
-- Metadata (not encrypted)
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
-- Key reference
encryption_key_id TEXT NOT NULL
);
-- Insert with encryption
INSERT INTO patients (
ssn_encrypted,
name_encrypted,
diagnosis_encrypted,
encryption_key_id
) VALUES (
encrypt_phi('123-45-6789', 'key_v1'),
encrypt_phi('John Smith', 'key_v1'),
encrypt_phi('Type 2 Diabetes', 'key_v1'),
'key_v1'
);
-- Select with decryption
SELECT
id,
decrypt_phi(ssn_encrypted, encryption_key_id) as ssn,
decrypt_phi(name_encrypted, encryption_key_id) as name,
decrypt_phi(diagnosis_encrypted, encryption_key_id) as diagnosis
FROM patients
WHERE id = 1;
-- Create view for automatic decryption (for authorized users only)
CREATE VIEW patients_decrypted AS
SELECT
id,
decrypt_phi(ssn_encrypted, encryption_key_id) as ssn,
decrypt_phi(name_encrypted, encryption_key_id) as name,
decrypt_phi(diagnosis_encrypted, encryption_key_id) as diagnosis,
created_at,
updated_at
FROM patients;
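-- Grant access only to authorized roles.
-- Functions are executable by PUBLIC by default, so revoke that first.
REVOKE EXECUTE ON FUNCTION decrypt_phi(BYTEA, TEXT) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION decrypt_phi(BYTEA, TEXT) TO hipaa_authorized_users;
GRANT SELECT ON patients_decrypted TO hipaa_authorized_users;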
MongoDB Client-Side Field-Level Encryption
import os
from datetime import datetime, timezone

from bson.binary import STANDARD, Binary
from bson.codec_options import CodecOptions
from pymongo import MongoClient
from pymongo.encryption import ClientEncryption
from pymongo.encryption_options import AutoEncryptionOpts

# KMS provider configuration (AWS KMS example)
KMS_PROVIDERS = {
    "aws": {
        "accessKeyId": os.environ["AWS_ACCESS_KEY_ID"],
        "secretAccessKey": os.environ["AWS_SECRET_ACCESS_KEY"],
    }
}
# Key vault configuration
KEY_VAULT_NAMESPACE = "encryption.__keyVault"


# MongoDB CSFLE configuration
def create_encrypted_client(data_key_id: Binary) -> MongoClient:
    """Create MongoDB client with client-side field-level encryption"""
    # Schema map defining which fields to encrypt
    schema_map = {
        "healthcare.patients": {
            "bsonType": "object",
            "encryptMetadata": {
                # Explicit key ID: deterministic encryption cannot be
                # combined with a JSON-pointer (per-document) keyId
                "keyId": [data_key_id],
            },
            "properties": {
                "ssn": {
                    "encrypt": {
                        "bsonType": "string",
                        "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Deterministic"
                    }
                },
                "name": {
                    "encrypt": {
                        "bsonType": "string",
                        "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
                    }
                },
                "medicalHistory": {
                    "encrypt": {
                        "bsonType": "array",
                        "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
                    }
                },
                "diagnosis": {
                    "encrypt": {
                        "bsonType": "string",
                        "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
                    }
                }
            }
        }
    }
    # Auto encryption options (PyMongo expects an AutoEncryptionOpts object)
    auto_encryption_opts = AutoEncryptionOpts(
        KMS_PROVIDERS,
        KEY_VAULT_NAMESPACE,
        schema_map=schema_map,
    )
    # Create encrypted client
    client = MongoClient(
        os.environ["MONGODB_URI"],
        auto_encryption_opts=auto_encryption_opts
    )
    return client


def create_data_encryption_key(
    client_encryption: ClientEncryption,
    key_alt_name: str
) -> Binary:
    """Create a data encryption key in the key vault, wrapped by an AWS KMS key"""
    master_key = {
        "region": "us-east-1",
        "key": os.environ["AWS_KMS_KEY_ARN"],
    }
    data_key_id = client_encryption.create_data_key(
        "aws",
        master_key=master_key,
        key_alt_names=[key_alt_name]
    )
    return data_key_id


# Usage
# Create the data key once and reuse its ID afterwards.
# The alt name "patient-phi-key" is only an example.
key_vault_client = MongoClient(os.environ["MONGODB_URI"])
client_encryption = ClientEncryption(
    KMS_PROVIDERS,
    KEY_VAULT_NAMESPACE,
    key_vault_client,
    CodecOptions(uuid_representation=STANDARD),
)
data_key_id = create_data_encryption_key(client_encryption, "patient-phi-key")

client = create_encrypted_client(data_key_id)
db = client.healthcare

# Insert encrypted document (encryption happens automatically)
patient = {
    "patientId": "P12345",
    "ssn": "123-45-6789",  # Will be encrypted deterministically
    "name": "John Smith",  # Will be encrypted randomly
    "diagnosis": "Hypertension",  # Will be encrypted randomly
    "medicalHistory": [
        {"date": "2024-01-15", "procedure": "Annual checkup"}
    ],  # Will be encrypted randomly
    "createdAt": datetime.now(timezone.utc),
}
db.patients.insert_one(patient)

# Query on deterministically encrypted field
# (Only works for deterministic encryption)
result = db.patients.find_one({"ssn": "123-45-6789"})

# Decryption happens automatically for authorized clients
print(result["name"])  # "John Smith"
Data in Transit Encryption
TLS Encryption Handshake
TLS 1.3 Configuration
# Python/FastAPI with TLS 1.3
import ssl

import uvicorn
from fastapi import FastAPI

app = FastAPI()

# TLS 1.2 cipher suites (OpenSSL names). TLS 1.3 suites are enabled by
# default and cannot be configured through set_ciphers().
TLS12_CIPHERS = "ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM"


def create_ssl_context() -> ssl.SSLContext:
    """Create secure SSL context for TLS 1.3"""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Minimum TLS 1.2, prefer 1.3
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    context.maximum_version = ssl.TLSVersion.TLSv1_3
    # Load certificate and key
    context.load_cert_chain(
        certfile="/path/to/certificate.pem",
        keyfile="/path/to/private-key.pem"
    )
    # Strong cipher suites only (applies to TLS 1.2)
    context.set_ciphers(TLS12_CIPHERS)
    # The ssl module has no server-side OCSP stapling option;
    # configure stapling at the reverse proxy instead.
    return context


if __name__ == "__main__":
    # uvicorn builds its own SSLContext from the arguments below, so
    # create_ssl_context() is for servers that accept a context directly.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=443,
        ssl_keyfile="/path/to/private-key.pem",
        ssl_certfile="/path/to/certificate.pem",
        ssl_version=ssl.PROTOCOL_TLS_SERVER,
        ssl_ciphers=TLS12_CIPHERS,
    )
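The server settings above have a client-side counterpart. As a minimal sketch using only the standard `ssl` module, an outbound client (for example, a service calling another internal API) could enforce the same TLS 1.2 floor while keeping certificate and hostname verification on. The function name is illustrative.

```python
import ssl


def create_client_ssl_context() -> ssl.SSLContext:
    """Client context that verifies certificates and refuses TLS below 1.2."""
    # create_default_context() enables CERT_REQUIRED and hostname checking
    context = ssl.create_default_context()
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


ctx = create_client_ssl_context()
```

The resulting context can be passed to standard-library clients such as `urllib.request.urlopen(url, context=ctx)`.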
NGINX TLS Configuration
# /etc/nginx/conf.d/hipaa-ssl.conf
# SSL configuration for HIPAA compliance
server {
listen 443 ssl http2;
server_name api.healthcare.example.com;
# SSL Certificate
ssl_certificate /etc/nginx/ssl/fullchain.pem;
ssl_certificate_key /etc/nginx/ssl/privkey.pem;
# TLS versions - TLS 1.2 and 1.3 only
ssl_protocols TLSv1.2 TLSv1.3;
# Strong cipher suites for TLS 1.2 (ssl_ciphers does not control TLS 1.3 suites, which are enabled by default)
ssl_ciphers 'ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305';
ssl_prefer_server_ciphers on;
# SSL session configuration
ssl_session_timeout 1d;
ssl_session_cache shared:SSL:50m;
ssl_session_tickets off;
# OCSP Stapling
ssl_stapling on;
ssl_stapling_verify on;
ssl_trusted_certificate /etc/nginx/ssl/chain.pem;
resolver 8.8.8.8 8.8.4.4 valid=300s;
resolver_timeout 5s;
# Security headers
add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload" always;
add_header X-Content-Type-Options nosniff always;
add_header X-Frame-Options DENY always;
add_header X-XSS-Protection "1; mode=block" always;
# Proxy to application
location / {
proxy_pass http://localhost:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
# Redirect HTTP to HTTPS
server {
listen 80;
server_name api.healthcare.example.com;
return 301 https://$server_name$request_uri;
}
Certificate Management with Let’s Encrypt
#!/bin/bash
# certificate-management.sh
# Install certbot
apt-get update
apt-get install -y certbot python3-certbot-nginx
# Obtain certificate (ADMIN_EMAIL must be set to the contact address for expiry notices)
certbot certonly \
  --nginx \
  --non-interactive \
  --agree-tos \
  --email "$ADMIN_EMAIL" \
  --domains api.healthcare.example.com \
  --rsa-key-size 4096
# Set up auto-renewal
echo "0 0 * * * root certbot renew --quiet --post-hook 'systemctl reload nginx'" \
> /etc/cron.d/certbot-renewal
# Verify configuration
nginx -t && systemctl reload nginx
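Auto-renewal can fail without anyone noticing, so it can help to monitor certificate expiry independently of certbot. The sketch below uses only the standard library; `days_until_expiry` is a hypothetical helper that takes the `notAfter` string in the format returned by `ssl.SSLSocket.getpeercert()`.

```python
import ssl
import time
from typing import Optional


def days_until_expiry(not_after: str, now: Optional[float] = None) -> float:
    """Days remaining before a certificate's notAfter time.

    not_after uses the getpeercert() format, e.g. "Jan  5 09:34:43 2018 GMT".
    """
    expires_at = ssl.cert_time_to_seconds(not_after)
    current = time.time() if now is None else now
    return (expires_at - current) / 86400


# Example with a fixed "now" exactly 30 days before expiry
expiry = "Jan  5 09:34:43 2018 GMT"
now = ssl.cert_time_to_seconds(expiry) - 30 * 86400
print(days_until_expiry(expiry, now))  # 30.0
```

A monitoring job could alert when the value drops below a threshold that leaves time for manual renewal.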
Key Management
HashiCorp Vault Integration
import base64
import os
from typing import Optional

import hvac


class VaultKeyManager:
    """
    Key management using HashiCorp Vault

    Features:
    - Automatic key rotation
    - Key versioning
    - Audit logging
    - Access control
    """

    def __init__(self):
        self.client = hvac.Client(
            url=os.environ["VAULT_ADDR"],
            token=os.environ["VAULT_TOKEN"],
        )

    def create_encryption_key(
        self,
        key_name: str,
        key_type: str = "aes256-gcm96",
        derived: bool = True,
    ) -> dict:
        """
        Create a new encryption key in Vault

        With derived=True, Transit derives a per-context key, so the same
        context must be supplied on every encrypt, decrypt, and rewrap call.
        With derived=False, omit the context.
        """
        self.client.secrets.transit.create_key(
            name=key_name,
            key_type=key_type,
            derived=derived,
            exportable=False,  # Keys never leave Vault
            allow_plaintext_backup=False,
        )
        return self.get_key_info(key_name)

    def get_key_info(self, key_name: str) -> dict:
        """Get key metadata (not the key itself)"""
        key_info = self.client.secrets.transit.read_key(name=key_name)
        return {
            "name": key_name,
            "type": key_info["data"]["type"],
            "latest_version": key_info["data"]["latest_version"],
            "min_decryption_version": key_info["data"]["min_decryption_version"],
            "supports_encryption": key_info["data"]["supports_encryption"],
        }

    def encrypt(
        self,
        key_name: str,
        plaintext: bytes,
        context: Optional[bytes] = None
    ) -> str:
        """Encrypt data using Vault Transit"""
        result = self.client.secrets.transit.encrypt_data(
            name=key_name,
            plaintext=base64.b64encode(plaintext).decode(),
            context=base64.b64encode(context).decode() if context else None,
        )
        return result["data"]["ciphertext"]

    def decrypt(
        self,
        key_name: str,
        ciphertext: str,
        context: Optional[bytes] = None
    ) -> bytes:
        """Decrypt data using Vault Transit"""
        result = self.client.secrets.transit.decrypt_data(
            name=key_name,
            ciphertext=ciphertext,
            context=base64.b64encode(context).decode() if context else None,
        )
        return base64.b64decode(result["data"]["plaintext"])

    def rotate_key(self, key_name: str) -> dict:
        """Rotate encryption key (old data still decryptable)"""
        self.client.secrets.transit.rotate_key(name=key_name)
        return self.get_key_info(key_name)

    def rewrap_data(
        self,
        key_name: str,
        ciphertext: str,
        context: Optional[bytes] = None
    ) -> str:
        """
        Re-encrypt data with latest key version
        (without exposing plaintext)
        """
        result = self.client.secrets.transit.rewrap_data(
            name=key_name,
            ciphertext=ciphertext,
            context=base64.b64encode(context).decode() if context else None,
        )
        return result["data"]["ciphertext"]
# Usage
vault = VaultKeyManager()
# Create a key for patient data
vault.create_encryption_key("patient-phi-key")
# Encrypt patient data
context = b"patient:P12345" # Binds ciphertext to context
ciphertext = vault.encrypt(
    "patient-phi-key",
    b'{"diagnosis": "Hypertension"}',
    context=context
)
# Decrypt
plaintext = vault.decrypt("patient-phi-key", ciphertext, context=context)
# Rotate key periodically
vault.rotate_key("patient-phi-key")
# Re-wrap old data with new key version
new_ciphertext = vault.rewrap_data("patient-phi-key", ciphertext, context=context)
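Vault Transit ciphertexts carry their key version in a `vault:v<N>:` prefix, which makes it possible to decide which stored values still need re-wrapping after a rotation without calling Vault for each one. A minimal sketch, with hypothetical helper names:

```python
def ciphertext_key_version(ciphertext: str) -> int:
    """Extract the key version from a Transit ciphertext such as 'vault:v2:...'."""
    prefix, version, _payload = ciphertext.split(":", 2)
    if prefix != "vault" or not version.startswith("v"):
        raise ValueError("not a Vault Transit ciphertext")
    return int(version[1:])


def needs_rewrap(ciphertext: str, latest_version: int) -> bool:
    """True if the ciphertext was produced by an older key version."""
    return ciphertext_key_version(ciphertext) < latest_version
```

A background job could compare each stored ciphertext against the `latest_version` reported by `get_key_info` and call `rewrap_data` only where `needs_rewrap` returns true.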
Key Rotation Strategy
from datetime import datetime, timezone


class KeyRotationScheduler:
    """
    Automated key rotation for HIPAA compliance

    Best practices:
    - Rotate master keys annually
    - Rotate data keys based on usage
    - Never delete old key versions until all data re-wrapped
    """

    ROTATION_POLICIES = {
        "master_key": {
            "rotation_days": 365,
            "min_versions_to_keep": 3,
        },
        "data_key": {
            "rotation_days": 90,
            "rotation_on_usage": 1_000_000,  # Or after N encryptions
            "min_versions_to_keep": 5,
        },
        "session_key": {
            "rotation_days": 1,
            "min_versions_to_keep": 2,
        },
    }

    def __init__(self, vault, job_queue, audit_logger):
        # Injected collaborators, assumed to expose async methods:
        # vault.get_key_info / vault.rotate_key, job_queue.enqueue,
        # audit_logger.log. An async wrapper around VaultKeyManager would fit.
        self.vault = vault
        self.job_queue = job_queue
        self.audit_logger = audit_logger

    async def run_rotation_check(self):
        """Check and rotate keys as needed"""
        for key_name, policy in self.ROTATION_POLICIES.items():
            key_info = await self.vault.get_key_info(key_name)
            if self._should_rotate(key_info, policy):
                await self._rotate_and_rewrap(key_name, policy)

    def _should_rotate(self, key_info: dict, policy: dict) -> bool:
        """Determine if key should be rotated"""
        # Assumes key_info carries a timezone-aware "last_rotation_time"
        last_rotation = key_info.get("last_rotation_time")
        if not last_rotation:
            return False
        days_since_rotation = (datetime.now(timezone.utc) - last_rotation).days
        return days_since_rotation >= policy["rotation_days"]

    async def _rotate_and_rewrap(self, key_name: str, policy: dict):
        """Rotate key and re-wrap existing data"""
        # Rotate the key; the returned metadata reflects the new version
        key_info = await self.vault.rotate_key(key_name)
        # Schedule background re-wrapping
        await self.job_queue.enqueue(
            "rewrap_all_data",
            {"key_name": key_name}
        )
        # Log rotation event
        await self.audit_logger.log({
            "event_type": "KEY_ROTATION",
            "key_name": key_name,
            "new_version": key_info["latest_version"],
        })
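The `data_key` policy above includes a `rotation_on_usage` threshold alongside `rotation_days`. One way to evaluate both limits is sketched below as a standalone function. The `encryptions_since_rotation` counter is an assumption about what your key metadata or metrics track; it is not something the scheduler above records.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def should_rotate(
    last_rotation: datetime,
    encryptions_since_rotation: int,
    policy: dict,
    now: Optional[datetime] = None,
) -> bool:
    """Rotate when the key is too old or has been used too many times."""
    now = now or datetime.now(timezone.utc)
    too_old = (now - last_rotation) >= timedelta(days=policy["rotation_days"])
    usage_limit = policy.get("rotation_on_usage")
    overused = usage_limit is not None and encryptions_since_rotation >= usage_limit
    return too_old or overused


policy = {"rotation_days": 90, "rotation_on_usage": 1_000_000}
now = datetime(2024, 6, 1, tzinfo=timezone.utc)
recent = datetime(2024, 5, 1, tzinfo=timezone.utc)
old = datetime(2024, 1, 1, tzinfo=timezone.utc)

print(should_rotate(recent, 10, policy, now))         # False
print(should_rotate(old, 10, policy, now))            # True
print(should_rotate(recent, 2_000_000, policy, now))  # True
```

Passing `now` explicitly keeps the function deterministic in tests.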
Key Takeaways
- Encrypt Everything: All PHI must be encrypted at rest and in transit.
- Use Envelope Encryption: Separate data keys from master keys for security and performance.
- TLS 1.2+ Required: Never allow older TLS versions or weak ciphers.
- Manage Keys Properly: Use an HSM or Vault; never store keys alongside data.
Practice Exercise
1. Set Up Key Management: Deploy HashiCorp Vault or configure AWS KMS for your application.
2. Implement Envelope Encryption: Create an encryption service using the envelope encryption pattern.
3. Configure TLS: Set up TLS 1.3 with proper cipher suites for your API.
4. Encrypt Database: Implement field-level encryption for PHI in your database.
5. Test Key Rotation: Verify that key rotation works without data loss.