Track 5: Database Security for Healthcare
Databases are where PHI lives. This module covers comprehensive database security strategies from transparent data encryption to field-level encryption, secure query patterns, and HIPAA-compliant backup strategies.Why This Matters: Database breaches account for some of the largest HIPAA violations. A single misconfigured database can expose millions of patient records.
What You’ll Master
Transparent Data Encryption
Encrypt entire databases without application changes
Field-Level Encryption
Protect specific PHI fields with application-level encryption
Secure Query Patterns
Query encrypted data without exposing plaintext
Backup Security
HIPAA-compliant backup and disaster recovery
Part 1: Database Security Architecture
Defense in Depth for Databases
Copy
┌─────────────────────────────────────────────────────────────────┐
│ DEFENSE IN DEPTH │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Layer 1: Network Security │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ • VPC/Private Subnets • Security Groups ││
│ │ • No public access • Network ACLs ││
│ │ • TLS 1.2+ only • VPN/Direct Connect ││
│ └─────────────────────────────────────────────────────────────┘│
│ │
│ Layer 2: Authentication & Authorization │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ • Strong password policies • Certificate authentication ││
│ │ • Role-based access • Least privilege principle ││
│ │ • No shared accounts • Regular access reviews ││
│ └─────────────────────────────────────────────────────────────┘│
│ │
│ Layer 3: Encryption at Rest │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ • Transparent Data Encryption (TDE) ││
│ │ • Encrypted tablespaces ││
│ │ • Encrypted backups ││
│ └─────────────────────────────────────────────────────────────┘│
│ │
│ Layer 4: Field-Level Encryption │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ • Application-level encryption for sensitive fields ││
│ │ • Column encryption for specific PHI ││
│ │ • Client-side field encryption (CSFLE) ││
│ └─────────────────────────────────────────────────────────────┘│
│ │
│ Layer 5: Audit & Monitoring │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ • Query logging • Access monitoring ││
│ │ • Anomaly detection • Privileged activity tracking ││
│ │ • Change data capture • Real-time alerts ││
│ └─────────────────────────────────────────────────────────────┘│
│ │
└─────────────────────────────────────────────────────────────────┘
HIPAA Requirements for Databases
| Requirement | Implementation | Verification Method |
|---|---|---|
| Access Control (§164.312(a)) | Role-based access, authentication | Access reviews, audit logs |
| Audit Controls (§164.312(b)) | Database activity monitoring | Log review, SIEM integration |
| Integrity (§164.312(c)) | Checksums, change tracking | Integrity verification scripts |
| Transmission Security (§164.312(e)) | TLS, encrypted connections | Network scans, config review |
| Encryption (§164.312(a)(2)(iv)) | TDE, field encryption | Encryption verification |
Part 2: PostgreSQL Security for Healthcare
Transparent Data Encryption with PostgreSQL
Copy
import os
import subprocess
from dataclasses import dataclass
from typing import Optional, List
from enum import Enum
class EncryptionMethod(Enum):
"""PostgreSQL encryption methods"""
PGCRYPTO = "pgcrypto"
TDE_ENTERPRISE = "tde_enterprise" # EDB Postgres
AWS_RDS_ENCRYPTION = "aws_rds"
@dataclass
class PostgresSecurityConfig:
"""PostgreSQL security configuration for HIPAA"""
# Connection settings
host: str
port: int = 5432
database: str = "healthcare_db"
ssl_mode: str = "verify-full" # require, verify-ca, verify-full
ssl_cert_path: Optional[str] = None
ssl_key_path: Optional[str] = None
ssl_root_cert: Optional[str] = None
# Authentication
auth_method: str = "scram-sha-256" # md5 is deprecated
password_encryption: str = "scram-sha-256"
# Encryption
encryption_method: EncryptionMethod = EncryptionMethod.PGCRYPTO
# Audit settings
pgaudit_enabled: bool = True
log_statement: str = "all" # none, ddl, mod, all
log_connections: bool = True
log_disconnections: bool = True
def generate_postgresql_conf(self) -> str:
"""Generate security-focused postgresql.conf settings"""
return f"""
# HIPAA-Compliant PostgreSQL Configuration
# SSL Configuration
ssl = on
ssl_cert_file = '{self.ssl_cert_path}'
ssl_key_file = '{self.ssl_key_path}'
ssl_ca_file = '{self.ssl_root_cert}'
ssl_min_protocol_version = 'TLSv1.2'
ssl_ciphers = 'HIGH:!aNULL:!MD5'
# Authentication
password_encryption = '{self.password_encryption}'
# Connection Security
log_connections = {'on' if self.log_connections else 'off'}
log_disconnections = {'on' if self.log_disconnections else 'off'}
# Audit Logging
log_statement = '{self.log_statement}'
log_line_prefix = '%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h '
log_checkpoints = on
log_lock_waits = on
# pgAudit Configuration (if enabled)
{"shared_preload_libraries = 'pgaudit'" if self.pgaudit_enabled else ""}
{"pgaudit.log = 'read, write, ddl'" if self.pgaudit_enabled else ""}
{"pgaudit.log_catalog = off" if self.pgaudit_enabled else ""}
{"pgaudit.log_parameter = on" if self.pgaudit_enabled else ""}
"""
def generate_pg_hba_conf(self) -> str:
"""Generate secure pg_hba.conf"""
return f"""
# HIPAA-Compliant pg_hba.conf
# TYPE DATABASE USER ADDRESS METHOD
# Reject all by default
local all all reject
host all all 0.0.0.0/0 reject
host all all ::/0 reject
# Allow specific secure connections
# Application connections (require SSL + SCRAM)
hostssl {self.database} app_user 10.0.0.0/8 {self.auth_method}
hostssl {self.database} app_user 172.16.0.0/12 {self.auth_method}
# Admin connections (require SSL + certificate)
hostssl all admin_user 10.0.1.0/24 cert
# Replication (require SSL + certificate)
hostssl replication repl_user 10.0.2.0/24 cert
# Local socket for maintenance (peer auth only)
local all postgres peer
"""
class PostgresPHIEncryption:
"""Field-level encryption for PHI using pgcrypto"""
def __init__(self, connection):
self.conn = connection
self._ensure_pgcrypto()
def _ensure_pgcrypto(self):
"""Enable pgcrypto extension"""
with self.conn.cursor() as cur:
cur.execute("CREATE EXTENSION IF NOT EXISTS pgcrypto;")
self.conn.commit()
def create_encrypted_table(self) -> str:
"""Create a table with encrypted PHI columns"""
return """
-- Table for storing encrypted patient data
CREATE TABLE patients (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
mrn VARCHAR(20) NOT NULL UNIQUE, -- Medical Record Number (not PHI by itself)
-- Encrypted PHI fields (stored as bytea)
first_name_encrypted BYTEA NOT NULL,
last_name_encrypted BYTEA NOT NULL,
ssn_encrypted BYTEA NOT NULL,
date_of_birth_encrypted BYTEA NOT NULL,
address_encrypted BYTEA NOT NULL,
phone_encrypted BYTEA NOT NULL,
email_encrypted BYTEA NOT NULL,
medical_conditions_encrypted BYTEA NOT NULL,
-- Metadata (not PHI)
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
created_by UUID NOT NULL,
-- Search indexes (using blind indexes)
ssn_hash BYTEA NOT NULL, -- For exact match search
name_hash BYTEA NOT NULL -- For exact match search
);
-- Index on blind indexes
CREATE INDEX idx_patients_ssn_hash ON patients(ssn_hash);
CREATE INDEX idx_patients_name_hash ON patients(name_hash);
-- Audit trigger
CREATE TRIGGER audit_patients
AFTER INSERT OR UPDATE OR DELETE ON patients
FOR EACH ROW EXECUTE FUNCTION audit_trigger();
"""
def encrypt_insert_function(self) -> str:
"""SQL function to insert with encryption"""
return """
-- Function to insert patient with encryption
CREATE OR REPLACE FUNCTION insert_patient(
p_mrn VARCHAR(20),
p_first_name TEXT,
p_last_name TEXT,
p_ssn TEXT,
p_dob DATE,
p_address TEXT,
p_phone TEXT,
p_email TEXT,
p_conditions TEXT,
p_encryption_key TEXT,
p_created_by UUID
) RETURNS UUID AS $$
DECLARE
v_patient_id UUID;
BEGIN
INSERT INTO patients (
mrn,
first_name_encrypted,
last_name_encrypted,
ssn_encrypted,
date_of_birth_encrypted,
address_encrypted,
phone_encrypted,
email_encrypted,
medical_conditions_encrypted,
ssn_hash,
name_hash,
created_by
) VALUES (
p_mrn,
pgp_sym_encrypt(p_first_name, p_encryption_key),
pgp_sym_encrypt(p_last_name, p_encryption_key),
pgp_sym_encrypt(p_ssn, p_encryption_key),
pgp_sym_encrypt(p_dob::TEXT, p_encryption_key),
pgp_sym_encrypt(p_address, p_encryption_key),
pgp_sym_encrypt(p_phone, p_encryption_key),
pgp_sym_encrypt(p_email, p_encryption_key),
pgp_sym_encrypt(p_conditions, p_encryption_key),
digest(p_ssn, 'sha256'),
digest(LOWER(p_first_name || p_last_name), 'sha256'),
p_created_by
)
RETURNING id INTO v_patient_id;
RETURN v_patient_id;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
"""
def decrypt_select_function(self) -> str:
"""SQL function to select with decryption"""
return """
-- Function to get patient with decryption
CREATE OR REPLACE FUNCTION get_patient(
p_patient_id UUID,
p_encryption_key TEXT
) RETURNS TABLE (
id UUID,
mrn VARCHAR(20),
first_name TEXT,
last_name TEXT,
ssn TEXT,
date_of_birth DATE,
address TEXT,
phone TEXT,
email TEXT,
medical_conditions TEXT
) AS $$
BEGIN
RETURN QUERY
SELECT
patients.id,
patients.mrn,
pgp_sym_decrypt(first_name_encrypted, p_encryption_key)::TEXT,
pgp_sym_decrypt(last_name_encrypted, p_encryption_key)::TEXT,
pgp_sym_decrypt(ssn_encrypted, p_encryption_key)::TEXT,
pgp_sym_decrypt(date_of_birth_encrypted, p_encryption_key)::DATE,
pgp_sym_decrypt(address_encrypted, p_encryption_key)::TEXT,
pgp_sym_decrypt(phone_encrypted, p_encryption_key)::TEXT,
pgp_sym_decrypt(email_encrypted, p_encryption_key)::TEXT,
pgp_sym_decrypt(medical_conditions_encrypted, p_encryption_key)::TEXT
FROM patients
WHERE patients.id = p_patient_id;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
-- Grant execute to application role only
REVOKE ALL ON FUNCTION get_patient FROM PUBLIC;
GRANT EXECUTE ON FUNCTION get_patient TO app_role;
"""
def search_by_ssn_function(self) -> str:
"""Search encrypted data using blind index"""
return """
-- Search patient by SSN using blind index
CREATE OR REPLACE FUNCTION search_patient_by_ssn(
p_ssn TEXT,
p_encryption_key TEXT
) RETURNS TABLE (
id UUID,
mrn VARCHAR(20),
first_name TEXT,
last_name TEXT
) AS $$
BEGIN
-- First, find by blind index
-- Then decrypt only matching records
RETURN QUERY
SELECT
patients.id,
patients.mrn,
pgp_sym_decrypt(first_name_encrypted, p_encryption_key)::TEXT,
pgp_sym_decrypt(last_name_encrypted, p_encryption_key)::TEXT
FROM patients
WHERE ssn_hash = digest(p_ssn, 'sha256');
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
"""
PostgreSQL Row-Level Security
Copy
class PostgresRowLevelSecurity:
"""Implement HIPAA access controls with Row-Level Security"""
def create_rls_policies(self) -> str:
"""Create RLS policies for patient data"""
return """
-- Enable Row-Level Security on patient tables
ALTER TABLE patients ENABLE ROW LEVEL SECURITY;
ALTER TABLE encounters ENABLE ROW LEVEL SECURITY;
ALTER TABLE clinical_notes ENABLE ROW LEVEL SECURITY;
-- Create security context table
CREATE TABLE user_access_context (
user_id UUID PRIMARY KEY,
department_id UUID,
role_name VARCHAR(50),
assigned_patients UUID[],
is_treating_provider BOOLEAN DEFAULT FALSE,
has_break_glass BOOLEAN DEFAULT FALSE,
break_glass_expiry TIMESTAMP WITH TIME ZONE
);
-- Policy: Users can only see their assigned patients
CREATE POLICY patient_access_policy ON patients
FOR ALL
TO app_role
USING (
-- Check if user has direct assignment
id = ANY(
SELECT unnest(assigned_patients)
FROM user_access_context
WHERE user_id = current_setting('app.current_user_id')::UUID
)
OR
-- Check if user has break-glass access
(
SELECT has_break_glass AND break_glass_expiry > CURRENT_TIMESTAMP
FROM user_access_context
WHERE user_id = current_setting('app.current_user_id')::UUID
)
OR
-- Admins can see all (for emergencies)
(
SELECT role_name = 'admin'
FROM user_access_context
WHERE user_id = current_setting('app.current_user_id')::UUID
)
);
-- Policy: Clinical notes access based on role
CREATE POLICY clinical_notes_access ON clinical_notes
FOR SELECT
TO app_role
USING (
-- Author can always see their notes
author_id = current_setting('app.current_user_id')::UUID
OR
-- Treating providers can see notes for their patients
patient_id IN (
SELECT unnest(assigned_patients)
FROM user_access_context
WHERE user_id = current_setting('app.current_user_id')::UUID
AND is_treating_provider = TRUE
)
OR
-- Quality/Compliance can see de-identified notes
(
SELECT role_name IN ('quality', 'compliance')
FROM user_access_context
WHERE user_id = current_setting('app.current_user_id')::UUID
)
);
-- Function to set user context (call at session start)
CREATE OR REPLACE FUNCTION set_user_context(p_user_id UUID)
RETURNS VOID AS $$
BEGIN
-- Set session variable for RLS policies
PERFORM set_config('app.current_user_id', p_user_id::TEXT, FALSE);
-- Log the context change
INSERT INTO access_log (user_id, action, timestamp)
VALUES (p_user_id, 'SESSION_START', CURRENT_TIMESTAMP);
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
-- Function to enable break-glass access
CREATE OR REPLACE FUNCTION enable_break_glass(
p_user_id UUID,
p_patient_id UUID,
p_reason TEXT,
p_duration_minutes INT DEFAULT 60
) RETURNS BOOLEAN AS $$
DECLARE
v_expiry TIMESTAMP WITH TIME ZONE;
BEGIN
v_expiry := CURRENT_TIMESTAMP + (p_duration_minutes || ' minutes')::INTERVAL;
-- Update user context with break-glass
UPDATE user_access_context
SET
has_break_glass = TRUE,
break_glass_expiry = v_expiry,
assigned_patients = array_append(assigned_patients, p_patient_id)
WHERE user_id = p_user_id;
-- Log break-glass activation (CRITICAL audit event)
INSERT INTO break_glass_log (
user_id,
patient_id,
reason,
activated_at,
expires_at,
ip_address
) VALUES (
p_user_id,
p_patient_id,
p_reason,
CURRENT_TIMESTAMP,
v_expiry,
inet_client_addr()
);
-- Send immediate notification to security team
PERFORM pg_notify('break_glass_alert', json_build_object(
'user_id', p_user_id,
'patient_id', p_patient_id,
'reason', p_reason,
'timestamp', CURRENT_TIMESTAMP
)::TEXT);
RETURN TRUE;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
"""
PostgreSQL Audit Configuration
Copy
class PostgresAuditSetup:
"""Configure comprehensive audit logging for HIPAA"""
def create_audit_infrastructure(self) -> str:
"""Create audit tables and triggers"""
return """
-- Audit log table (high-performance, partitioned by month)
CREATE TABLE audit_log (
id BIGSERIAL,
event_time TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
user_id UUID,
user_name TEXT,
client_addr INET,
action TEXT NOT NULL,
table_name TEXT,
record_id UUID,
old_values JSONB,
new_values JSONB,
query_text TEXT,
application_name TEXT
) PARTITION BY RANGE (event_time);
-- Create partitions for next 12 months
DO $$
DECLARE
start_date DATE := DATE_TRUNC('month', CURRENT_DATE);
partition_date DATE;
partition_name TEXT;
BEGIN
FOR i IN 0..11 LOOP
partition_date := start_date + (i || ' months')::INTERVAL;
partition_name := 'audit_log_' || TO_CHAR(partition_date, 'YYYY_MM');
EXECUTE format(
'CREATE TABLE IF NOT EXISTS %I PARTITION OF audit_log
FOR VALUES FROM (%L) TO (%L)',
partition_name,
partition_date,
partition_date + INTERVAL '1 month'
);
END LOOP;
END $$;
-- Index for common queries
CREATE INDEX idx_audit_user ON audit_log(user_id, event_time);
CREATE INDEX idx_audit_table ON audit_log(table_name, event_time);
CREATE INDEX idx_audit_record ON audit_log(record_id, event_time);
-- Generic audit trigger function
CREATE OR REPLACE FUNCTION audit_trigger()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO audit_log (
user_id,
user_name,
client_addr,
action,
table_name,
record_id,
old_values,
new_values,
query_text,
application_name
) VALUES (
NULLIF(current_setting('app.current_user_id', TRUE), '')::UUID,
current_user,
inet_client_addr(),
TG_OP,
TG_TABLE_NAME,
COALESCE(NEW.id, OLD.id),
CASE WHEN TG_OP IN ('UPDATE', 'DELETE') THEN to_jsonb(OLD) END,
CASE WHEN TG_OP IN ('INSERT', 'UPDATE') THEN to_jsonb(NEW) END,
current_query(),
current_setting('application_name')
);
RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;
-- PHI access log (specifically for patient record access)
CREATE TABLE phi_access_log (
id BIGSERIAL PRIMARY KEY,
access_time TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
user_id UUID NOT NULL,
patient_id UUID NOT NULL,
access_type VARCHAR(20) NOT NULL, -- 'view', 'edit', 'print', 'export'
data_elements TEXT[], -- Which fields were accessed
purpose TEXT,
break_glass_used BOOLEAN DEFAULT FALSE,
session_id TEXT,
client_ip INET,
client_user_agent TEXT
);
-- Function to log PHI access (call from application)
CREATE OR REPLACE FUNCTION log_phi_access(
p_user_id UUID,
p_patient_id UUID,
p_access_type VARCHAR(20),
p_data_elements TEXT[],
p_purpose TEXT DEFAULT NULL
) RETURNS VOID AS $$
BEGIN
INSERT INTO phi_access_log (
user_id,
patient_id,
access_type,
data_elements,
purpose,
break_glass_used,
session_id,
client_ip
) VALUES (
p_user_id,
p_patient_id,
p_access_type,
p_data_elements,
p_purpose,
(
SELECT has_break_glass AND break_glass_expiry > CURRENT_TIMESTAMP
FROM user_access_context
WHERE user_id = p_user_id
),
current_setting('app.session_id', TRUE),
inet_client_addr()
);
END;
$$ LANGUAGE plpgsql;
"""
def generate_audit_report(self) -> str:
"""Generate HIPAA-required audit reports"""
return """
-- Accounting of Disclosures Report (required by HIPAA)
CREATE OR REPLACE FUNCTION generate_disclosure_report(
p_patient_id UUID,
p_start_date DATE DEFAULT CURRENT_DATE - INTERVAL '6 years',
p_end_date DATE DEFAULT CURRENT_DATE
) RETURNS TABLE (
disclosure_date TIMESTAMP WITH TIME ZONE,
disclosed_to TEXT,
purpose TEXT,
data_disclosed TEXT
) AS $$
BEGIN
RETURN QUERY
SELECT
pal.access_time,
u.full_name,
COALESCE(pal.purpose, 'Treatment'),
array_to_string(pal.data_elements, ', ')
FROM phi_access_log pal
JOIN users u ON u.id = pal.user_id
WHERE pal.patient_id = p_patient_id
AND pal.access_time BETWEEN p_start_date AND p_end_date
ORDER BY pal.access_time DESC;
END;
$$ LANGUAGE plpgsql;
-- User Activity Report (for access reviews)
CREATE OR REPLACE FUNCTION user_activity_report(
p_user_id UUID,
p_days INT DEFAULT 30
) RETURNS TABLE (
activity_date DATE,
patients_accessed INT,
records_viewed INT,
records_modified INT,
exports_performed INT,
break_glass_count INT
) AS $$
BEGIN
RETURN QUERY
SELECT
DATE(access_time) AS activity_date,
COUNT(DISTINCT patient_id)::INT AS patients_accessed,
COUNT(*) FILTER (WHERE access_type = 'view')::INT AS records_viewed,
COUNT(*) FILTER (WHERE access_type = 'edit')::INT AS records_modified,
COUNT(*) FILTER (WHERE access_type = 'export')::INT AS exports_performed,
COUNT(*) FILTER (WHERE break_glass_used = TRUE)::INT AS break_glass_count
FROM phi_access_log
WHERE user_id = p_user_id
AND access_time > CURRENT_DATE - (p_days || ' days')::INTERVAL
GROUP BY DATE(access_time)
ORDER BY activity_date DESC;
END;
$$ LANGUAGE plpgsql;
-- Anomaly Detection Query
CREATE OR REPLACE FUNCTION detect_access_anomalies(
p_days INT DEFAULT 7
) RETURNS TABLE (
user_id UUID,
user_name TEXT,
anomaly_type TEXT,
details TEXT,
severity TEXT
) AS $$
BEGIN
-- Detect after-hours access
RETURN QUERY
SELECT
pal.user_id,
u.full_name,
'After Hours Access'::TEXT,
COUNT(*)::TEXT || ' records accessed between 10pm-6am',
'MEDIUM'::TEXT
FROM phi_access_log pal
JOIN users u ON u.id = pal.user_id
WHERE pal.access_time > CURRENT_DATE - (p_days || ' days')::INTERVAL
AND EXTRACT(HOUR FROM pal.access_time) NOT BETWEEN 6 AND 22
GROUP BY pal.user_id, u.full_name
HAVING COUNT(*) > 10;
-- Detect high-volume access
RETURN QUERY
SELECT
pal.user_id,
u.full_name,
'High Volume Access'::TEXT,
COUNT(DISTINCT patient_id)::TEXT || ' unique patients in one day',
'HIGH'::TEXT
FROM phi_access_log pal
JOIN users u ON u.id = pal.user_id
WHERE pal.access_time > CURRENT_DATE - (p_days || ' days')::INTERVAL
GROUP BY pal.user_id, u.full_name, DATE(pal.access_time)
HAVING COUNT(DISTINCT patient_id) > 100;
-- Detect excessive break-glass usage
RETURN QUERY
SELECT
pal.user_id,
u.full_name,
'Excessive Break-Glass'::TEXT,
COUNT(*)::TEXT || ' break-glass accesses',
'CRITICAL'::TEXT
FROM phi_access_log pal
JOIN users u ON u.id = pal.user_id
WHERE pal.access_time > CURRENT_DATE - (p_days || ' days')::INTERVAL
AND pal.break_glass_used = TRUE
GROUP BY pal.user_id, u.full_name
HAVING COUNT(*) > 5;
END;
$$ LANGUAGE plpgsql;
"""
Part 3: MongoDB Security for Healthcare
MongoDB Client-Side Field Level Encryption (CSFLE)
Copy
from pymongo import MongoClient
from pymongo.encryption import ClientEncryption
from pymongo.encryption_options import AutoEncryptionOpts
from bson.codec_options import CodecOptions
from bson.binary import STANDARD, UUID
import os
from dataclasses import dataclass
from typing import Dict, Any, Optional
import json
@dataclass
class MongoDBCSFLEConfig:
"""MongoDB CSFLE configuration for healthcare"""
connection_string: str
key_vault_namespace: str = "encryption.__keyVault"
key_vault_database: str = "encryption"
key_vault_collection: str = "__keyVault"
# KMS provider (AWS KMS, Azure Key Vault, GCP KMS, or local for dev)
kms_provider: str = "aws" # aws, azure, gcp, local
# AWS KMS settings
aws_key_arn: str = ""
aws_region: str = "us-east-1"
def get_kms_providers(self) -> Dict[str, Any]:
"""Get KMS provider configuration"""
if self.kms_provider == "aws":
return {
"aws": {
"accessKeyId": os.environ.get("AWS_ACCESS_KEY_ID"),
"secretAccessKey": os.environ.get("AWS_SECRET_ACCESS_KEY")
}
}
elif self.kms_provider == "local":
# LOCAL KEY ONLY FOR DEVELOPMENT
# Generate with: secrets.token_bytes(96)
local_master_key = os.environ.get("LOCAL_MASTER_KEY", "").encode()
if len(local_master_key) != 96:
raise ValueError("Local master key must be 96 bytes")
return {"local": {"key": local_master_key}}
else:
raise ValueError(f"Unsupported KMS provider: {self.kms_provider}")
class HealthcareMongoDBClient:
"""MongoDB client with HIPAA-compliant encryption"""
def __init__(self, config: MongoDBCSFLEConfig):
self.config = config
self._setup_encryption()
def _setup_encryption(self):
"""Setup client-side field level encryption"""
kms_providers = self.config.get_kms_providers()
# Create key vault client
key_vault_client = MongoClient(self.config.connection_string)
# Create encryption helper
self.client_encryption = ClientEncryption(
kms_providers=kms_providers,
key_vault_namespace=self.config.key_vault_namespace,
key_vault_client=key_vault_client,
codec_options=CodecOptions(uuid_representation=STANDARD)
)
# Create data encryption key if not exists
self.data_key_id = self._get_or_create_data_key()
# Define schema for automatic encryption
self.schema_map = self._create_schema_map()
# Create auto-encrypting client
auto_encryption_opts = AutoEncryptionOpts(
kms_providers=kms_providers,
key_vault_namespace=self.config.key_vault_namespace,
schema_map=self.schema_map,
crypt_shared_lib_path="/path/to/crypt_shared" # MongoDB crypt shared library
)
self.encrypted_client = MongoClient(
self.config.connection_string,
auto_encryption_opts=auto_encryption_opts
)
def _get_or_create_data_key(self) -> bytes:
"""Get existing or create new data encryption key"""
key_vault_db = self.client_encryption._key_vault_coll.database
key_vault_coll = key_vault_db[self.config.key_vault_collection]
# Look for existing key
existing_key = key_vault_coll.find_one({"keyAltNames": "healthcare_phi_key"})
if existing_key:
return existing_key["_id"]
# Create new key
if self.config.kms_provider == "aws":
master_key = {
"region": self.config.aws_region,
"key": self.config.aws_key_arn
}
else:
master_key = {}
data_key_id = self.client_encryption.create_data_key(
self.config.kms_provider,
master_key=master_key,
key_alt_names=["healthcare_phi_key"]
)
return data_key_id
def _create_schema_map(self) -> Dict[str, Any]:
"""Create JSON schema for automatic field encryption"""
return {
"healthcare.patients": {
"bsonType": "object",
"encryptMetadata": {
"keyId": [self.data_key_id]
},
"properties": {
# PHI fields with deterministic encryption (searchable)
"ssn": {
"encrypt": {
"bsonType": "string",
"algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Deterministic"
}
},
"mrn": {
"encrypt": {
"bsonType": "string",
"algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Deterministic"
}
},
# PHI fields with random encryption (not searchable, more secure)
"firstName": {
"encrypt": {
"bsonType": "string",
"algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
}
},
"lastName": {
"encrypt": {
"bsonType": "string",
"algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
}
},
"dateOfBirth": {
"encrypt": {
"bsonType": "date",
"algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
}
},
"address": {
"encrypt": {
"bsonType": "object",
"algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
}
},
"phoneNumbers": {
"encrypt": {
"bsonType": "array",
"algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
}
},
"email": {
"encrypt": {
"bsonType": "string",
"algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
}
},
"medicalHistory": {
"encrypt": {
"bsonType": "object",
"algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
}
}
}
}
}
def insert_patient(self, patient_data: Dict[str, Any]) -> str:
"""Insert patient with automatic field encryption"""
db = self.encrypted_client["healthcare"]
collection = db["patients"]
# Automatic encryption happens transparently
result = collection.insert_one(patient_data)
return str(result.inserted_id)
def find_patient_by_ssn(self, ssn: str) -> Optional[Dict[str, Any]]:
"""Find patient by SSN (deterministic encryption allows equality search)"""
db = self.encrypted_client["healthcare"]
collection = db["patients"]
# SSN is automatically encrypted before query
# and result is automatically decrypted
return collection.find_one({"ssn": ssn})
def update_patient(self, patient_id: str, updates: Dict[str, Any]) -> bool:
"""Update patient with encrypted fields"""
from bson.objectid import ObjectId
db = self.encrypted_client["healthcare"]
collection = db["patients"]
result = collection.update_one(
{"_id": ObjectId(patient_id)},
{"$set": updates}
)
return result.modified_count > 0
# Example usage
def demonstrate_mongodb_csfle():
"""Show CSFLE in action"""
config = MongoDBCSFLEConfig(
connection_string="mongodb://localhost:27017",
kms_provider="aws",
aws_key_arn="arn:aws:kms:us-east-1:123456789:key/your-key-id",
aws_region="us-east-1"
)
client = HealthcareMongoDBClient(config)
# Insert patient - PHI is automatically encrypted
patient = {
"mrn": "MRN12345",
"ssn": "123-45-6789",
"firstName": "John",
"lastName": "Doe",
"dateOfBirth": "1985-03-15",
"address": {
"street": "123 Main St",
"city": "Boston",
"state": "MA",
"zip": "02101"
},
"phoneNumbers": ["617-555-0123", "617-555-0124"],
"email": "[email protected]",
"medicalHistory": {
"allergies": ["penicillin"],
"conditions": ["hypertension", "diabetes type 2"]
}
}
patient_id = client.insert_patient(patient)
print(f"Inserted patient: {patient_id}")
# Find by SSN - works because SSN uses deterministic encryption
found = client.find_patient_by_ssn("123-45-6789")
print(f"Found patient: {found['firstName']} {found['lastName']}")
MongoDB Role-Based Access Control
Copy
class MongoDBRBACSetup:
"""Setup HIPAA-compliant RBAC in MongoDB"""
def __init__(self, admin_client: MongoClient):
self.client = admin_client
def create_hipaa_roles(self):
"""Create healthcare-specific roles"""
admin_db = self.client.admin
healthcare_db = self.client.healthcare
# Role: Clinical Staff (Read patients they're assigned to)
healthcare_db.command("createRole", "clinicalStaff",
privileges=[
{
"resource": {"db": "healthcare", "collection": "patients"},
"actions": ["find"]
},
{
"resource": {"db": "healthcare", "collection": "encounters"},
"actions": ["find", "insert", "update"]
},
{
"resource": {"db": "healthcare", "collection": "clinicalNotes"},
"actions": ["find", "insert"]
}
],
roles=[]
)
# Role: Physician (Extended clinical access)
healthcare_db.command("createRole", "physician",
privileges=[
{
"resource": {"db": "healthcare", "collection": "patients"},
"actions": ["find", "update"]
},
{
"resource": {"db": "healthcare", "collection": "encounters"},
"actions": ["find", "insert", "update"]
},
{
"resource": {"db": "healthcare", "collection": "clinicalNotes"},
"actions": ["find", "insert", "update"]
},
{
"resource": {"db": "healthcare", "collection": "orders"},
"actions": ["find", "insert", "update"]
},
{
"resource": {"db": "healthcare", "collection": "prescriptions"},
"actions": ["find", "insert"]
}
],
roles=[]
)
# Role: Billing Staff (Limited PHI access)
healthcare_db.command("createRole", "billingStaff",
privileges=[
{
"resource": {"db": "healthcare", "collection": "patients"},
"actions": ["find"] # Read-only, limited to billing-relevant fields via app
},
{
"resource": {"db": "healthcare", "collection": "encounters"},
"actions": ["find"]
},
{
"resource": {"db": "healthcare", "collection": "claims"},
"actions": ["find", "insert", "update"]
},
{
"resource": {"db": "healthcare", "collection": "payments"},
"actions": ["find", "insert"]
}
],
roles=[]
)
# Role: Auditor (Read-only access to audit logs)
healthcare_db.command("createRole", "auditor",
privileges=[
{
"resource": {"db": "healthcare", "collection": "auditLog"},
"actions": ["find"]
},
{
"resource": {"db": "healthcare", "collection": "accessLog"},
"actions": ["find"]
}
],
roles=[]
)
# Role: Security Admin (Manage users and roles)
admin_db.command("createRole", "securityAdmin",
privileges=[
{
"resource": {"db": "healthcare", "collection": ""},
"actions": ["createUser", "dropUser", "grantRole", "revokeRole", "viewUser"]
}
],
roles=["userAdmin"]
)
def create_users(self):
"""Create users with appropriate roles"""
healthcare_db = self.client.healthcare
# Create application service account
healthcare_db.command("createUser", "healthcare_app",
pwd="strong_password_here", # Use secrets manager in production
roles=[
{"role": "clinicalStaff", "db": "healthcare"},
{"role": "readWrite", "db": "healthcare"}
],
mechanisms=["SCRAM-SHA-256"]
)
# Create read-only analytics account
healthcare_db.command("createUser", "analytics_readonly",
pwd="another_strong_password",
roles=[
{"role": "read", "db": "healthcare"}
],
mechanisms=["SCRAM-SHA-256"]
)
def enable_audit_logging(self) -> Dict[str, Any]:
"""MongoDB audit log configuration (Enterprise feature)"""
return {
"auditLog": {
"destination": "file",
"format": "JSON",
"path": "/var/log/mongodb/audit.json"
},
"setParameter": {
"auditAuthorizationSuccess": True
}
}
Part 4: Backup Security
HIPAA-Compliant Backup Strategy
Copy
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime, timedelta
from enum import Enum
import hashlib
import subprocess
import os
class BackupType(Enum):
FULL = "full"
INCREMENTAL = "incremental"
DIFFERENTIAL = "differential"
class StorageTier(Enum):
HOT = "hot" # Immediate access
WARM = "warm" # Minutes to access
COLD = "cold" # Hours to access
ARCHIVE = "archive" # Days to access
@dataclass
class BackupPolicy:
"""HIPAA-compliant backup policy"""
name: str
database_name: str
# Retention settings (HIPAA requires 6 years minimum)
retention_days_hot: int = 30
retention_days_warm: int = 90
retention_days_cold: int = 365
retention_days_archive: int = 2190 # 6 years
# Backup schedule
full_backup_frequency: str = "weekly" # daily, weekly
incremental_frequency: str = "hourly"
# Security settings
encryption_algorithm: str = "AES-256-GCM"
encryption_key_id: str = "" # KMS key ID
# Verification settings
verify_after_backup: bool = True
test_restore_frequency: str = "monthly"
# Locations
primary_location: str = "" # Primary backup location
secondary_location: str = "" # Geographic redundancy
def validate(self) -> List[str]:
"""Validate backup policy meets HIPAA requirements"""
issues = []
# Check retention
total_retention = (
self.retention_days_hot +
self.retention_days_warm +
self.retention_days_cold +
self.retention_days_archive
)
if total_retention < 2190: # 6 years
issues.append(
f"Total retention ({total_retention} days) is less than "
f"HIPAA minimum (2190 days / 6 years)"
)
# Check encryption
if self.encryption_algorithm not in ["AES-256-GCM", "AES-256-CBC"]:
issues.append("Encryption algorithm must be AES-256")
if not self.encryption_key_id:
issues.append("Encryption key ID must be specified")
# Check geographic redundancy
if not self.secondary_location:
issues.append("Secondary backup location required for disaster recovery")
if self.primary_location == self.secondary_location:
issues.append("Secondary location must be different from primary")
return issues
@dataclass
class BackupRecord:
"""Record of a completed backup"""
backup_id: str
policy_name: str
backup_type: BackupType
start_time: datetime
end_time: datetime
size_bytes: int
storage_location: str
storage_tier: StorageTier
encryption_key_id: str
checksum_sha256: str
verified: bool = False
verification_time: Optional[datetime] = None
class PostgresBackupManager:
"""Manage HIPAA-compliant PostgreSQL backups"""
def __init__(self, policy: BackupPolicy):
self.policy = policy
self.validate_policy()
def validate_policy(self):
"""Validate backup policy"""
issues = self.policy.validate()
if issues:
raise ValueError(f"Invalid backup policy: {issues}")
def perform_full_backup(self) -> BackupRecord:
"""Perform encrypted full backup"""
import uuid
backup_id = str(uuid.uuid4())
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
backup_file = f"/backup/{self.policy.database_name}_full_{timestamp}.sql.gz.enc"
start_time = datetime.utcnow()
# Backup command with encryption
# Using pg_dump -> gzip -> openssl encryption
backup_cmd = f"""
pg_dump -h localhost -U backup_user -d {self.policy.database_name} \
--format=custom \
--no-password \
| gzip \
| openssl enc -{self.policy.encryption_algorithm.lower().replace('-', '')} \
-e \
-pbkdf2 \
-pass file:/etc/backup/key.txt \
-out {backup_file}
"""
# Execute backup
result = subprocess.run(
backup_cmd,
shell=True,
capture_output=True,
text=True
)
if result.returncode != 0:
raise Exception(f"Backup failed: {result.stderr}")
end_time = datetime.utcnow()
# Calculate checksum
with open(backup_file, 'rb') as f:
checksum = hashlib.sha256(f.read()).hexdigest()
# Get file size
size_bytes = os.path.getsize(backup_file)
# Upload to primary location
self._upload_to_storage(backup_file, self.policy.primary_location)
# Upload to secondary location
self._upload_to_storage(backup_file, self.policy.secondary_location)
# Create backup record
record = BackupRecord(
backup_id=backup_id,
policy_name=self.policy.name,
backup_type=BackupType.FULL,
start_time=start_time,
end_time=end_time,
size_bytes=size_bytes,
storage_location=self.policy.primary_location,
storage_tier=StorageTier.HOT,
encryption_key_id=self.policy.encryption_key_id,
checksum_sha256=checksum
)
# Verify if required
if self.policy.verify_after_backup:
record = self._verify_backup(record)
return record
def _upload_to_storage(self, local_path: str, remote_location: str):
"""Upload backup to storage (S3, Azure Blob, etc.)"""
if remote_location.startswith("s3://"):
# AWS S3 upload with server-side encryption
cmd = f"""
aws s3 cp {local_path} {remote_location}/ \
--sse aws:kms \
--sse-kms-key-id {self.policy.encryption_key_id} \
--storage-class STANDARD_IA
"""
subprocess.run(cmd, shell=True, check=True)
else:
raise ValueError(f"Unsupported storage location: {remote_location}")
def _verify_backup(self, record: BackupRecord) -> BackupRecord:
"""Verify backup integrity and restorability"""
# Download backup
local_verify_path = f"/tmp/verify_{record.backup_id}.sql.gz"
# Decrypt and decompress
verify_cmd = f"""
aws s3 cp {self.policy.primary_location}/{record.backup_id} - \
| openssl enc -{self.policy.encryption_algorithm.lower().replace('-', '')} \
-d \
-pbkdf2 \
-pass file:/etc/backup/key.txt \
| gunzip \
| pg_restore --list > /dev/null
"""
result = subprocess.run(
verify_cmd,
shell=True,
capture_output=True,
text=True
)
record.verified = result.returncode == 0
record.verification_time = datetime.utcnow()
return record
def perform_test_restore(self) -> Dict[str, any]:
"""Perform test restore to verify backups (required regularly)"""
# This should restore to a test environment
test_results = {
"test_date": datetime.utcnow(),
"backup_used": None,
"restore_successful": False,
"data_integrity_verified": False,
"time_to_restore_seconds": 0,
"issues_found": []
}
# Get latest backup
# Restore to test database
# Verify data integrity
# Record results
return test_results
def apply_retention_policy(self):
"""Move backups between storage tiers based on age"""
now = datetime.utcnow()
# This would query your backup catalog and move files
# between storage tiers based on age
transitions = {
StorageTier.HOT: StorageTier.WARM,
StorageTier.WARM: StorageTier.COLD,
StorageTier.COLD: StorageTier.ARCHIVE
}
age_thresholds = {
StorageTier.HOT: timedelta(days=self.policy.retention_days_hot),
StorageTier.WARM: timedelta(days=self.policy.retention_days_warm),
StorageTier.COLD: timedelta(days=self.policy.retention_days_cold)
}
# Logic to transition backups between tiers
pass
def generate_backup_report(self) -> str:
"""Generate backup status report for compliance"""
return """
BACKUP COMPLIANCE REPORT
========================
Policy: {policy_name}
Report Date: {date}
BACKUP STATUS
-------------
Last Full Backup: [DATE] (SUCCESS/FAILED)
Last Incremental: [DATE] (SUCCESS/FAILED)
Backups in Last 24h: X
Backups Verified: X/Y
STORAGE LOCATIONS
-----------------
Primary: {primary_location}
Secondary: {secondary_location}
Encryption: {encryption_algorithm}
RETENTION COMPLIANCE
--------------------
Hot Storage: X backups (Y days)
Warm Storage: X backups (Y days)
Cold Storage: X backups (Y days)
Archive: X backups (Y days)
Total Retention: Z days (COMPLIANT/NON-COMPLIANT)
TEST RESTORES
-------------
Last Test Restore: [DATE]
Result: SUCCESS/FAILED
Time to Restore: X minutes
Next Scheduled Test: [DATE]
ISSUES & ALERTS
---------------
[List any issues]
"""
Secure Backup Verification
1
Automated Integrity Checks
Verify backup checksums match after each backup completes
2
Monthly Test Restores
Restore to isolated environment and verify:
- Data completeness
- Data integrity
- Application functionality
- Time to restore meets RTO
3
Annual Full DR Test
Complete disaster recovery drill:
- Restore to alternate site
- Verify all systems operational
- Test with actual users
- Document lessons learned
Part 5: Database Security Checklist
Pre-Deployment Checklist
Complete Security Checklist
Complete Security Checklist
Network Security
- Database in private subnet (no public IP)
- Security group restricts to application servers only
- TLS 1.2+ required for all connections
- VPN or Direct Connect for admin access
- Strong password policy enforced
- No default/shared accounts
- Service accounts have minimal privileges
- Certificate authentication for admin accounts
- MFA for privileged database access
- Role-based access control implemented
- Least privilege enforced
- No direct table access (views/functions only)
- Row-level security for multi-tenant
- Encryption at rest enabled (TDE)
- Encryption in transit (TLS)
- Field-level encryption for sensitive PHI
- Encryption keys in KMS (not database)
- Key rotation configured
- All logins logged
- All queries logged (or sampled)
- PHI access specifically logged
- Privileged actions logged
- Logs shipped to SIEM
- Logs retained 6+ years
- Backups encrypted
- Backup keys different from data keys
- Geographic redundancy
- Retention meets HIPAA (6 years)
- Regular restore testing
- Backup access restricted
- Failed login alerts
- Privilege escalation alerts
- After-hours access alerts
- High-volume query alerts
- Schema change alerts
- Patch management process defined
- Security patches within 30 days
- Critical patches within 72 hours
- Patching tested in staging first
Practical Exercises
Exercise 1: Implement Field-Level Encryption
Exercise Instructions
Exercise Instructions
Objective: Implement field-level encryption for a patient table in PostgreSQL.Requirements:Deliverables:
- Create a patients table with encrypted PHI fields
- Implement blind indexes for searchable fields (SSN, MRN)
- Create functions for secure insert, update, and query
- Test search functionality on encrypted data
Copy
# Start with this schema and add encryption
class Patient:
id: str
mrn: str # Searchable
ssn: str # Searchable
first_name: str
last_name: str
date_of_birth: date
address: dict
phone: str
email: str
# Your implementation should:
# 1. Encrypt all PHI fields before storage
# 2. Create blind indexes for mrn and ssn
# 3. Allow searching by mrn or ssn
# 4. Decrypt on read
- SQL for table creation
- Insert function with encryption
- Search function using blind indexes
- Update function
- Test queries demonstrating functionality
Exercise 2: Configure Row-Level Security
Exercise Instructions
Exercise Instructions
Objective: Implement RLS policies for a multi-provider clinic.Scenario:
- Multiple providers (physicians, nurses)
- Patients assigned to specific providers
- Providers should only see their patients
- Break-glass for emergencies
- Create user context table
- Implement RLS policies on patient table
- Create break-glass function with audit
- Test with multiple users
- Provider A can see their patients
- Provider A cannot see Provider B’s patients
- Break-glass allows temporary access
- All access is logged
Exercise 3: Build Backup Automation
Exercise Instructions
Exercise Instructions
Objective: Create automated, encrypted backup system.Requirements:
- Full backup weekly, incremental daily
- Encrypt with AES-256
- Upload to S3 with server-side encryption
- Verify backup integrity
- Rotate based on retention policy
- Backup script
- Verification script
- Retention management script
- Cron schedule
- Monitoring/alerting
Key Takeaways
Defense in Depth
Multiple security layers: network, authentication, encryption, audit
Encrypt Everything
TDE for storage, TLS for transit, field-level for PHI
Least Privilege
RBAC + RLS ensures minimum necessary access
Audit Everything
Log all access, especially to PHI