
Track 5: Database Security for Healthcare

Databases are where PHI lives. This module covers database security for healthcare systems: transparent data encryption, field-level encryption, secure query patterns, and HIPAA-compliant backups.
Why This Matters: Database breaches account for some of the largest HIPAA violations. A single misconfigured database can expose millions of patient records.

What You’ll Master

Transparent Data Encryption

Encrypt entire databases without application changes

Field-Level Encryption

Protect specific PHI fields with application-level encryption

Secure Query Patterns

Query encrypted data without exposing plaintext

Backup Security

HIPAA-compliant backup and disaster recovery

Part 1: Database Security Architecture

Defense in Depth for Databases

┌─────────────────────────────────────────────────────────────────┐
│                    DEFENSE IN DEPTH                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Layer 1: Network Security                                       │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │ • VPC/Private Subnets    • Security Groups                  ││
│  │ • No public access       • Network ACLs                     ││
│  │ • TLS 1.2+ only          • VPN/Direct Connect               ││
│  └─────────────────────────────────────────────────────────────┘│
│                                                                  │
│  Layer 2: Authentication & Authorization                         │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │ • Strong password policies  • Certificate authentication   ││
│  │ • Role-based access         • Least privilege principle    ││
│  │ • No shared accounts        • Regular access reviews       ││
│  └─────────────────────────────────────────────────────────────┘│
│                                                                  │
│  Layer 3: Encryption at Rest                                     │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │ • Transparent Data Encryption (TDE)                         ││
│  │ • Encrypted tablespaces                                     ││
│  │ • Encrypted backups                                         ││
│  └─────────────────────────────────────────────────────────────┘│
│                                                                  │
│  Layer 4: Field-Level Encryption                                 │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │ • Application-level encryption for sensitive fields         ││
│  │ • Column encryption for specific PHI                        ││
│  │ • Client-side field encryption (CSFLE)                      ││
│  └─────────────────────────────────────────────────────────────┘│
│                                                                  │
│  Layer 5: Audit & Monitoring                                     │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │ • Query logging          • Access monitoring                ││
│  │ • Anomaly detection      • Privileged activity tracking     ││
│  │ • Change data capture    • Real-time alerts                 ││
│  └─────────────────────────────────────────────────────────────┘│
│                                                                  │
└─────────────────────────────────────────────────────────────────┘
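As a small illustration of the Layer 1 "TLS 1.2+ only" item, the sketch below builds a libpq-style connection string that refuses unverified TLS. The function name `build_secure_dsn` and the example paths are hypothetical. `sslmode`, `sslrootcert`, and `ssl_min_protocol_version` are standard libpq connection parameters (the last one needs libpq 13 or newer). It assumes none of the values contain spaces, since those would need quoting.

```python
# Modes that both encrypt the connection and verify the server certificate.
ALLOWED_SSL_MODES = {"verify-ca", "verify-full"}


def build_secure_dsn(host: str, dbname: str, user: str, root_cert: str,
                     sslmode: str = "verify-full", port: int = 5432) -> str:
    """Return a libpq keyword/value DSN that enforces verified TLS 1.2+."""
    if sslmode not in ALLOWED_SSL_MODES:
        # "require" encrypts traffic but does not verify the server certificate.
        raise ValueError(f"sslmode must be one of {sorted(ALLOWED_SSL_MODES)}")
    parts = {
        "host": host,
        "port": str(port),
        "dbname": dbname,
        "user": user,
        "sslmode": sslmode,
        "sslrootcert": root_cert,
        "ssl_min_protocol_version": "TLSv1.2",
    }
    # Assumes values contain no spaces; libpq requires quoting otherwise.
    return " ".join(f"{key}={value}" for key, value in parts.items())
```

The password is deliberately left out of the string; libpq can read it from a password file or the environment instead.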

HIPAA Requirements for Databases

| Requirement | Implementation | Verification Method |
|---|---|---|
| Access Control (§164.312(a)) | Role-based access, authentication | Access reviews, audit logs |
| Audit Controls (§164.312(b)) | Database activity monitoring | Log review, SIEM integration |
| Integrity (§164.312(c)) | Checksums, change tracking | Integrity verification scripts |
| Transmission Security (§164.312(e)) | TLS, encrypted connections | Network scans, config review |
| Encryption (§164.312(a)(2)(iv)) | TDE, field encryption | Encryption verification |
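The "config review" verification method can be partly automated. The following is a minimal sketch that checks a dictionary of PostgreSQL settings (for example, rows read from `pg_settings`) against expected values. The setting names are real PostgreSQL parameters, but the function name `review_config` and the mapping of each setting to a safeguard are assumptions made for illustration, not an official checklist.

```python
from typing import Dict, List, Set, Tuple

# setting name -> (acceptable values, safeguard it supports)
# The safeguard mapping is this sketch's assumption.
REQUIRED_SETTINGS: Dict[str, Tuple[Set[str], str]] = {
    "ssl": ({"on"}, "Transmission Security"),
    "ssl_min_protocol_version": ({"TLSv1.2", "TLSv1.3"}, "Transmission Security"),
    "password_encryption": ({"scram-sha-256"}, "Access Control"),
    "log_connections": ({"on"}, "Audit Controls"),
    "log_disconnections": ({"on"}, "Audit Controls"),
}


def review_config(settings: Dict[str, str]) -> List[str]:
    """Return one finding per setting that deviates from the expected values."""
    findings = []
    for name, (allowed, safeguard) in REQUIRED_SETTINGS.items():
        actual = settings.get(name)
        if actual not in allowed:
            findings.append(
                f"{safeguard}: {name} is {actual!r}, expected one of {sorted(allowed)}"
            )
    # pgaudit only works when it is preloaded at server start.
    if "pgaudit" not in settings.get("shared_preload_libraries", ""):
        findings.append("Audit Controls: pgaudit is not preloaded")
    return findings
```

An empty list means every checked setting matched; it does not by itself demonstrate compliance.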

Part 2: PostgreSQL Security for Healthcare

Transparent Data Encryption with PostgreSQL

import os
import subprocess
from dataclasses import dataclass
from typing import Optional, List
from enum import Enum

class EncryptionMethod(Enum):
    """PostgreSQL encryption methods"""
    PGCRYPTO = "pgcrypto"
    TDE_ENTERPRISE = "tde_enterprise"  # EDB Postgres
    AWS_RDS_ENCRYPTION = "aws_rds"

@dataclass
class PostgresSecurityConfig:
    """PostgreSQL security configuration for HIPAA"""
    
    # Connection settings
    host: str
    port: int = 5432
    database: str = "healthcare_db"
    ssl_mode: str = "verify-full"  # require, verify-ca, verify-full
    ssl_cert_path: Optional[str] = None
    ssl_key_path: Optional[str] = None
    ssl_root_cert: Optional[str] = None
    
    # Authentication
    auth_method: str = "scram-sha-256"  # md5 is deprecated
    password_encryption: str = "scram-sha-256"
    
    # Encryption
    encryption_method: EncryptionMethod = EncryptionMethod.PGCRYPTO
    
    # Audit settings
    pgaudit_enabled: bool = True
    log_statement: str = "all"  # none, ddl, mod, all ("all" also logs statement values, including PHI and keys passed to pgcrypto)
    log_connections: bool = True
    log_disconnections: bool = True
    
    def generate_postgresql_conf(self) -> str:
        """Generate security-focused postgresql.conf settings"""
        return f"""
# HIPAA-Compliant PostgreSQL Configuration

# SSL Configuration
ssl = on
ssl_cert_file = '{self.ssl_cert_path}'
ssl_key_file = '{self.ssl_key_path}'
ssl_ca_file = '{self.ssl_root_cert}'
ssl_min_protocol_version = 'TLSv1.2'
ssl_ciphers = 'HIGH:!aNULL:!MD5'

# Authentication
password_encryption = '{self.password_encryption}'

# Connection Security
log_connections = {'on' if self.log_connections else 'off'}
log_disconnections = {'on' if self.log_disconnections else 'off'}

# Audit Logging
log_statement = '{self.log_statement}'
log_line_prefix = '%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h '
log_checkpoints = on
log_lock_waits = on

# pgAudit Configuration (if enabled)
{"shared_preload_libraries = 'pgaudit'" if self.pgaudit_enabled else ""}
{"pgaudit.log = 'read, write, ddl'" if self.pgaudit_enabled else ""}
{"pgaudit.log_catalog = off" if self.pgaudit_enabled else ""}
{"pgaudit.log_parameter = on" if self.pgaudit_enabled else ""}
"""
    
    def generate_pg_hba_conf(self) -> str:
        """Generate secure pg_hba.conf"""
        return f"""
# HIPAA-Compliant pg_hba.conf

# TYPE  DATABASE        USER            ADDRESS                 METHOD

# pg_hba.conf is evaluated top to bottom and the first matching line wins,
# so the allow rules must come before the catch-all reject rules.

# Application connections (require SSL + SCRAM)
hostssl {self.database}  app_user        10.0.0.0/8              {self.auth_method}
hostssl {self.database}  app_user        172.16.0.0/12           {self.auth_method}

# Admin connections (require SSL + certificate)
hostssl all             admin_user       10.0.1.0/24             cert

# Replication (require SSL + certificate)
hostssl replication     repl_user        10.0.2.0/24             cert

# Local socket for maintenance (peer auth only)
local   all             postgres                                 peer

# Reject everything else
local   all             all                                     reject
host    all             all             0.0.0.0/0               reject
host    all             all             ::/0                    reject
"""

class PostgresPHIEncryption:
    """Field-level encryption for PHI using pgcrypto"""
    
    def __init__(self, connection):
        self.conn = connection
        self._ensure_pgcrypto()
    
    def _ensure_pgcrypto(self):
        """Enable pgcrypto extension"""
        with self.conn.cursor() as cur:
            cur.execute("CREATE EXTENSION IF NOT EXISTS pgcrypto;")
            self.conn.commit()
    
    def create_encrypted_table(self) -> str:
        """Create a table with encrypted PHI columns"""
        return """
-- Table for storing encrypted patient data
CREATE TABLE patients (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    mrn VARCHAR(20) NOT NULL UNIQUE,  -- Medical Record Number (a HIPAA identifier; kept in plaintext here as the lookup key)
    
    -- Encrypted PHI fields (stored as bytea)
    first_name_encrypted BYTEA NOT NULL,
    last_name_encrypted BYTEA NOT NULL,
    ssn_encrypted BYTEA NOT NULL,
    date_of_birth_encrypted BYTEA NOT NULL,
    address_encrypted BYTEA NOT NULL,
    phone_encrypted BYTEA NOT NULL,
    email_encrypted BYTEA NOT NULL,
    medical_conditions_encrypted BYTEA NOT NULL,
    
    -- Metadata (not PHI)
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    created_by UUID NOT NULL,
    
    -- Search indexes (using blind indexes)
    ssn_hash BYTEA NOT NULL,  -- For exact match search
    name_hash BYTEA NOT NULL  -- For exact match search
);

-- Index on blind indexes
CREATE INDEX idx_patients_ssn_hash ON patients(ssn_hash);
CREATE INDEX idx_patients_name_hash ON patients(name_hash);

-- Audit trigger
CREATE TRIGGER audit_patients
    AFTER INSERT OR UPDATE OR DELETE ON patients
    FOR EACH ROW EXECUTE FUNCTION audit_trigger();
"""
    
    def encrypt_insert_function(self) -> str:
        """SQL function to insert with encryption"""
        return """
-- Function to insert patient with encryption
CREATE OR REPLACE FUNCTION insert_patient(
    p_mrn VARCHAR(20),
    p_first_name TEXT,
    p_last_name TEXT,
    p_ssn TEXT,
    p_dob DATE,
    p_address TEXT,
    p_phone TEXT,
    p_email TEXT,
    p_conditions TEXT,
    p_encryption_key TEXT,
    p_created_by UUID
) RETURNS UUID AS $$
DECLARE
    v_patient_id UUID;
BEGIN
    INSERT INTO patients (
        mrn,
        first_name_encrypted,
        last_name_encrypted,
        ssn_encrypted,
        date_of_birth_encrypted,
        address_encrypted,
        phone_encrypted,
        email_encrypted,
        medical_conditions_encrypted,
        ssn_hash,
        name_hash,
        created_by
    ) VALUES (
        p_mrn,
        pgp_sym_encrypt(p_first_name, p_encryption_key),
        pgp_sym_encrypt(p_last_name, p_encryption_key),
        pgp_sym_encrypt(p_ssn, p_encryption_key),
        pgp_sym_encrypt(p_dob::TEXT, p_encryption_key),
        pgp_sym_encrypt(p_address, p_encryption_key),
        pgp_sym_encrypt(p_phone, p_encryption_key),
        pgp_sym_encrypt(p_email, p_encryption_key),
        pgp_sym_encrypt(p_conditions, p_encryption_key),
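        -- NOTE: an unkeyed SHA-256 of a 9-digit SSN can be reversed by enumeration;
        -- a keyed hash (e.g. pgcrypto's hmac() with a separate index key) is safer
        digest(p_ssn, 'sha256'),
        digest(LOWER(p_first_name || p_last_name), 'sha256'),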
        p_created_by
    )
    RETURNING id INTO v_patient_id;
    
    RETURN v_patient_id;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER
   SET search_path = public, pg_temp;  -- pin search_path (assumes objects live in schema public)
"""
    
    def decrypt_select_function(self) -> str:
        """SQL function to select with decryption"""
        return """
-- Function to get patient with decryption
CREATE OR REPLACE FUNCTION get_patient(
    p_patient_id UUID,
    p_encryption_key TEXT
) RETURNS TABLE (
    id UUID,
    mrn VARCHAR(20),
    first_name TEXT,
    last_name TEXT,
    ssn TEXT,
    date_of_birth DATE,
    address TEXT,
    phone TEXT,
    email TEXT,
    medical_conditions TEXT
) AS $$
BEGIN
    RETURN QUERY
    SELECT 
        patients.id,
        patients.mrn,
        pgp_sym_decrypt(first_name_encrypted, p_encryption_key)::TEXT,
        pgp_sym_decrypt(last_name_encrypted, p_encryption_key)::TEXT,
        pgp_sym_decrypt(ssn_encrypted, p_encryption_key)::TEXT,
        pgp_sym_decrypt(date_of_birth_encrypted, p_encryption_key)::DATE,
        pgp_sym_decrypt(address_encrypted, p_encryption_key)::TEXT,
        pgp_sym_decrypt(phone_encrypted, p_encryption_key)::TEXT,
        pgp_sym_decrypt(email_encrypted, p_encryption_key)::TEXT,
        pgp_sym_decrypt(medical_conditions_encrypted, p_encryption_key)::TEXT
    FROM patients
    WHERE patients.id = p_patient_id;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER
   SET search_path = public, pg_temp;  -- pin search_path (assumes objects live in schema public)

-- Grant execute to application role only
REVOKE ALL ON FUNCTION get_patient FROM PUBLIC;
GRANT EXECUTE ON FUNCTION get_patient TO app_role;
"""
    
    def search_by_ssn_function(self) -> str:
        """Search encrypted data using blind index"""
        return """
-- Search patient by SSN using blind index
CREATE OR REPLACE FUNCTION search_patient_by_ssn(
    p_ssn TEXT,
    p_encryption_key TEXT
) RETURNS TABLE (
    id UUID,
    mrn VARCHAR(20),
    first_name TEXT,
    last_name TEXT
) AS $$
BEGIN
    -- First, find by blind index
    -- Then decrypt only matching records
    RETURN QUERY
    SELECT 
        patients.id,
        patients.mrn,
        pgp_sym_decrypt(first_name_encrypted, p_encryption_key)::TEXT,
        pgp_sym_decrypt(last_name_encrypted, p_encryption_key)::TEXT
    FROM patients
    WHERE ssn_hash = digest(p_ssn, 'sha256');
END;
$$ LANGUAGE plpgsql SECURITY DEFINER
   SET search_path = public, pg_temp;  -- pin search_path (assumes objects live in schema public)
"""

PostgreSQL Row-Level Security

class PostgresRowLevelSecurity:
    """Implement HIPAA access controls with Row-Level Security"""
    
    def create_rls_policies(self) -> str:
        """Create RLS policies for patient data"""
        return """
-- Enable Row-Level Security on patient tables
ALTER TABLE patients ENABLE ROW LEVEL SECURITY;
ALTER TABLE encounters ENABLE ROW LEVEL SECURITY;
ALTER TABLE clinical_notes ENABLE ROW LEVEL SECURITY;

-- Create security context table
CREATE TABLE user_access_context (
    user_id UUID PRIMARY KEY,
    department_id UUID,
    role_name VARCHAR(50),
    assigned_patients UUID[],
    is_treating_provider BOOLEAN DEFAULT FALSE,
    has_break_glass BOOLEAN DEFAULT FALSE,
    break_glass_expiry TIMESTAMP WITH TIME ZONE
);

-- Policy: Users can only see their assigned patients
CREATE POLICY patient_access_policy ON patients
    FOR ALL
    TO app_role
    USING (
        -- Check if user has direct assignment
        id = ANY(
            SELECT unnest(assigned_patients) 
            FROM user_access_context 
            WHERE user_id = current_setting('app.current_user_id')::UUID
        )
        OR
        -- Check if user has break-glass access
        (
            SELECT has_break_glass AND break_glass_expiry > CURRENT_TIMESTAMP
            FROM user_access_context
            WHERE user_id = current_setting('app.current_user_id')::UUID
        )
        OR
        -- Admins can see all (for emergencies)
        (
            SELECT role_name = 'admin'
            FROM user_access_context
            WHERE user_id = current_setting('app.current_user_id')::UUID
        )
    );

-- Policy: Clinical notes access based on role
CREATE POLICY clinical_notes_access ON clinical_notes
    FOR SELECT
    TO app_role
    USING (
        -- Author can always see their notes
        author_id = current_setting('app.current_user_id')::UUID
        OR
        -- Treating providers can see notes for their patients
        patient_id IN (
            SELECT unnest(assigned_patients)
            FROM user_access_context
            WHERE user_id = current_setting('app.current_user_id')::UUID
            AND is_treating_provider = TRUE
        )
        OR
        -- Quality/Compliance roles can read all notes; this policy does not
        -- de-identify anything, so de-identification must be applied separately
        (
            SELECT role_name IN ('quality', 'compliance')
            FROM user_access_context
            WHERE user_id = current_setting('app.current_user_id')::UUID
        )
    );

-- Function to set user context (call at session start)
CREATE OR REPLACE FUNCTION set_user_context(p_user_id UUID)
RETURNS VOID AS $$
BEGIN
    -- Set session variable for RLS policies
    PERFORM set_config('app.current_user_id', p_user_id::TEXT, FALSE);
    
    -- Log the context change
    INSERT INTO access_log (user_id, action, timestamp)
    VALUES (p_user_id, 'SESSION_START', CURRENT_TIMESTAMP);
END;
$$ LANGUAGE plpgsql SECURITY DEFINER
   SET search_path = public, pg_temp;  -- pin search_path (assumes objects live in schema public)

-- Function to enable break-glass access
CREATE OR REPLACE FUNCTION enable_break_glass(
    p_user_id UUID,
    p_patient_id UUID,
    p_reason TEXT,
    p_duration_minutes INT DEFAULT 60
) RETURNS BOOLEAN AS $$
DECLARE
    v_expiry TIMESTAMP WITH TIME ZONE;
BEGIN
    v_expiry := CURRENT_TIMESTAMP + (p_duration_minutes || ' minutes')::INTERVAL;
    
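    -- Update user context with break-glass
    -- NOTE: the appended patient stays in assigned_patients after the expiry
    -- time unless a separate cleanup step removes it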
    UPDATE user_access_context
    SET 
        has_break_glass = TRUE,
        break_glass_expiry = v_expiry,
        assigned_patients = array_append(assigned_patients, p_patient_id)
    WHERE user_id = p_user_id;
    
    -- Log break-glass activation (CRITICAL audit event)
    INSERT INTO break_glass_log (
        user_id,
        patient_id,
        reason,
        activated_at,
        expires_at,
        ip_address
    ) VALUES (
        p_user_id,
        p_patient_id,
        p_reason,
        CURRENT_TIMESTAMP,
        v_expiry,
        inet_client_addr()
    );
    
    -- Send immediate notification to security team
    PERFORM pg_notify('break_glass_alert', json_build_object(
        'user_id', p_user_id,
        'patient_id', p_patient_id,
        'reason', p_reason,
        'timestamp', CURRENT_TIMESTAMP
    )::TEXT);
    
    RETURN TRUE;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER
   SET search_path = public, pg_temp;  -- pin search_path (assumes objects live in schema public)
"""

PostgreSQL Audit Configuration

class PostgresAuditSetup:
    """Configure comprehensive audit logging for HIPAA"""
    
    def create_audit_infrastructure(self) -> str:
        """Create audit tables and triggers"""
        return """
-- Audit log table (high-performance, partitioned by month)
CREATE TABLE audit_log (
    id BIGSERIAL,
    event_time TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    user_id UUID,
    user_name TEXT,
    client_addr INET,
    action TEXT NOT NULL,
    table_name TEXT,
    record_id UUID,
    old_values JSONB,
    new_values JSONB,
    query_text TEXT,
    application_name TEXT
) PARTITION BY RANGE (event_time);

-- Create partitions for next 12 months
DO $$
DECLARE
    start_date DATE := DATE_TRUNC('month', CURRENT_DATE);
    partition_date DATE;
    partition_name TEXT;
BEGIN
    FOR i IN 0..11 LOOP
        partition_date := start_date + (i || ' months')::INTERVAL;
        partition_name := 'audit_log_' || TO_CHAR(partition_date, 'YYYY_MM');
        
        EXECUTE format(
            'CREATE TABLE IF NOT EXISTS %I PARTITION OF audit_log
             FOR VALUES FROM (%L) TO (%L)',
            partition_name,
            partition_date,
            partition_date + INTERVAL '1 month'
        );
    END LOOP;
END $$;

-- Index for common queries
CREATE INDEX idx_audit_user ON audit_log(user_id, event_time);
CREATE INDEX idx_audit_table ON audit_log(table_name, event_time);
CREATE INDEX idx_audit_record ON audit_log(record_id, event_time);

-- Generic audit trigger function
CREATE OR REPLACE FUNCTION audit_trigger()
RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO audit_log (
        user_id,
        user_name,
        client_addr,
        action,
        table_name,
        record_id,
        old_values,
        new_values,
        query_text,
        application_name
    ) VALUES (
        NULLIF(current_setting('app.current_user_id', TRUE), '')::UUID,
        current_user,
        inet_client_addr(),
        TG_OP,
        TG_TABLE_NAME,
        COALESCE(NEW.id, OLD.id),
        CASE WHEN TG_OP IN ('UPDATE', 'DELETE') THEN to_jsonb(OLD) END,
        CASE WHEN TG_OP IN ('INSERT', 'UPDATE') THEN to_jsonb(NEW) END,
        current_query(),
        current_setting('application_name')
    );
    
    RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;

-- PHI access log (specifically for patient record access)
CREATE TABLE phi_access_log (
    id BIGSERIAL PRIMARY KEY,
    access_time TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    user_id UUID NOT NULL,
    patient_id UUID NOT NULL,
    access_type VARCHAR(20) NOT NULL, -- 'view', 'edit', 'print', 'export'
    data_elements TEXT[],  -- Which fields were accessed
    purpose TEXT,
    break_glass_used BOOLEAN DEFAULT FALSE,
    session_id TEXT,
    client_ip INET,
    client_user_agent TEXT
);

-- Function to log PHI access (call from application)
CREATE OR REPLACE FUNCTION log_phi_access(
    p_user_id UUID,
    p_patient_id UUID,
    p_access_type VARCHAR(20),
    p_data_elements TEXT[],
    p_purpose TEXT DEFAULT NULL
) RETURNS VOID AS $$
BEGIN
    INSERT INTO phi_access_log (
        user_id,
        patient_id,
        access_type,
        data_elements,
        purpose,
        break_glass_used,
        session_id,
        client_ip
    ) VALUES (
        p_user_id,
        p_patient_id,
        p_access_type,
        p_data_elements,
        p_purpose,
        (
            SELECT has_break_glass AND break_glass_expiry > CURRENT_TIMESTAMP
            FROM user_access_context
            WHERE user_id = p_user_id
        ),
        current_setting('app.session_id', TRUE),
        inet_client_addr()
    );
END;
$$ LANGUAGE plpgsql;
"""
    
    def generate_audit_report(self) -> str:
        """Generate HIPAA-required audit reports"""
        return """
-- Accounting of Disclosures Report (required by HIPAA)
CREATE OR REPLACE FUNCTION generate_disclosure_report(
    p_patient_id UUID,
    p_start_date DATE DEFAULT CURRENT_DATE - INTERVAL '6 years',
    p_end_date DATE DEFAULT CURRENT_DATE
) RETURNS TABLE (
    disclosure_date TIMESTAMP WITH TIME ZONE,
    disclosed_to TEXT,
    purpose TEXT,
    data_disclosed TEXT
) AS $$
BEGIN
    RETURN QUERY
    SELECT 
        pal.access_time,
        u.full_name,
        COALESCE(pal.purpose, 'Treatment'),
        array_to_string(pal.data_elements, ', ')
    FROM phi_access_log pal
    JOIN users u ON u.id = pal.user_id
    WHERE pal.patient_id = p_patient_id
    AND pal.access_time >= p_start_date
    AND pal.access_time < p_end_date + 1  -- include the whole end date, not just its midnight
    ORDER BY pal.access_time DESC;
END;
$$ LANGUAGE plpgsql;

-- User Activity Report (for access reviews)
CREATE OR REPLACE FUNCTION user_activity_report(
    p_user_id UUID,
    p_days INT DEFAULT 30
) RETURNS TABLE (
    activity_date DATE,
    patients_accessed INT,
    records_viewed INT,
    records_modified INT,
    exports_performed INT,
    break_glass_count INT
) AS $$
BEGIN
    RETURN QUERY
    SELECT 
        DATE(access_time) AS activity_date,
        COUNT(DISTINCT patient_id)::INT AS patients_accessed,
        COUNT(*) FILTER (WHERE access_type = 'view')::INT AS records_viewed,
        COUNT(*) FILTER (WHERE access_type = 'edit')::INT AS records_modified,
        COUNT(*) FILTER (WHERE access_type = 'export')::INT AS exports_performed,
        COUNT(*) FILTER (WHERE break_glass_used = TRUE)::INT AS break_glass_count
    FROM phi_access_log
    WHERE user_id = p_user_id
    AND access_time > CURRENT_DATE - (p_days || ' days')::INTERVAL
    GROUP BY DATE(access_time)
    ORDER BY activity_date DESC;
END;
$$ LANGUAGE plpgsql;

-- Anomaly Detection Query
CREATE OR REPLACE FUNCTION detect_access_anomalies(
    p_days INT DEFAULT 7
) RETURNS TABLE (
    user_id UUID,
    user_name TEXT,
    anomaly_type TEXT,
    details TEXT,
    severity TEXT
) AS $$
BEGIN
    -- Detect after-hours access
    RETURN QUERY
    SELECT 
        pal.user_id,
        u.full_name,
        'After Hours Access'::TEXT,
        COUNT(*)::TEXT || ' records accessed between 10pm-6am',
        'MEDIUM'::TEXT
    FROM phi_access_log pal
    JOIN users u ON u.id = pal.user_id
    WHERE pal.access_time > CURRENT_DATE - (p_days || ' days')::INTERVAL
    AND EXTRACT(HOUR FROM pal.access_time) NOT BETWEEN 6 AND 21  -- flags 22:00-05:59 in the session time zone
    GROUP BY pal.user_id, u.full_name
    HAVING COUNT(*) > 10;
    
    -- Detect high-volume access
    RETURN QUERY
    SELECT 
        pal.user_id,
        u.full_name,
        'High Volume Access'::TEXT,
        COUNT(DISTINCT patient_id)::TEXT || ' unique patients in one day',
        'HIGH'::TEXT
    FROM phi_access_log pal
    JOIN users u ON u.id = pal.user_id
    WHERE pal.access_time > CURRENT_DATE - (p_days || ' days')::INTERVAL
    GROUP BY pal.user_id, u.full_name, DATE(pal.access_time)
    HAVING COUNT(DISTINCT patient_id) > 100;
    
    -- Detect excessive break-glass usage
    RETURN QUERY
    SELECT 
        pal.user_id,
        u.full_name,
        'Excessive Break-Glass'::TEXT,
        COUNT(*)::TEXT || ' break-glass accesses',
        'CRITICAL'::TEXT
    FROM phi_access_log pal
    JOIN users u ON u.id = pal.user_id
    WHERE pal.access_time > CURRENT_DATE - (p_days || ' days')::INTERVAL
    AND pal.break_glass_used = TRUE
    GROUP BY pal.user_id, u.full_name
    HAVING COUNT(*) > 5;
END;
$$ LANGUAGE plpgsql;
"""

Part 3: MongoDB Security for Healthcare

MongoDB Client-Side Field Level Encryption (CSFLE)

from pymongo import MongoClient
from pymongo.encryption import ClientEncryption
from pymongo.encryption_options import AutoEncryptionOpts
from bson.codec_options import CodecOptions
from bson.binary import STANDARD, UUID
import os
from dataclasses import dataclass
from typing import Dict, Any, Optional
import json

@dataclass
class MongoDBCSFLEConfig:
    """MongoDB CSFLE configuration for healthcare"""
    
    connection_string: str
    key_vault_namespace: str = "encryption.__keyVault"
    key_vault_database: str = "encryption"
    key_vault_collection: str = "__keyVault"
    
    # KMS provider (AWS KMS, Azure Key Vault, GCP KMS, or local for dev)
    kms_provider: str = "aws"  # aws, azure, gcp, local
    
    # AWS KMS settings
    aws_key_arn: str = ""
    aws_region: str = "us-east-1"
    
    def get_kms_providers(self) -> Dict[str, Any]:
        """Get KMS provider configuration"""
        if self.kms_provider == "aws":
            return {
                "aws": {
                    "accessKeyId": os.environ.get("AWS_ACCESS_KEY_ID"),
                    "secretAccessKey": os.environ.get("AWS_SECRET_ACCESS_KEY")
                }
            }
        elif self.kms_provider == "local":
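            # LOCAL KEY ONLY FOR DEVELOPMENT
            # Generate with: base64.b64encode(secrets.token_bytes(96)).decode()
            # Environment variables hold text, so the key is stored base64-encoded
            import base64
            local_master_key = base64.b64decode(os.environ.get("LOCAL_MASTER_KEY", ""))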
            if len(local_master_key) != 96:
                raise ValueError("Local master key must be 96 bytes")
            return {"local": {"key": local_master_key}}
        else:
            raise ValueError(f"Unsupported KMS provider: {self.kms_provider}")

class HealthcareMongoDBClient:
    """MongoDB client with HIPAA-compliant encryption"""
    
    def __init__(self, config: MongoDBCSFLEConfig):
        self.config = config
        self._setup_encryption()
    
    def _setup_encryption(self):
        """Setup client-side field level encryption"""
        kms_providers = self.config.get_kms_providers()
        
        # Create key vault client
        key_vault_client = MongoClient(self.config.connection_string)
        
        # Create encryption helper
        self.client_encryption = ClientEncryption(
            kms_providers=kms_providers,
            key_vault_namespace=self.config.key_vault_namespace,
            key_vault_client=key_vault_client,
            codec_options=CodecOptions(uuid_representation=STANDARD)
        )
        
        # Create data encryption key if not exists
        self.data_key_id = self._get_or_create_data_key()
        
        # Define schema for automatic encryption
        self.schema_map = self._create_schema_map()
        
        # Create auto-encrypting client
        # (automatic CSFLE requires MongoDB Enterprise or Atlas)
        auto_encryption_opts = AutoEncryptionOpts(
            kms_providers=kms_providers,
            key_vault_namespace=self.config.key_vault_namespace,
            schema_map=self.schema_map,
            crypt_shared_lib_path="/path/to/crypt_shared"  # MongoDB crypt shared library
        )
        
        self.encrypted_client = MongoClient(
            self.config.connection_string,
            auto_encryption_opts=auto_encryption_opts
        )
    
    def _get_or_create_data_key(self) -> bytes:
        """Get existing or create new data encryption key"""
        # Look for an existing key through the public API instead of private
        # attributes (ClientEncryption.get_key_by_alt_name, PyMongo 4.2+)
        existing_key = self.client_encryption.get_key_by_alt_name("healthcare_phi_key")
        
        if existing_key:
            return existing_key["_id"]
        
        # Create new key
        if self.config.kms_provider == "aws":
            master_key = {
                "region": self.config.aws_region,
                "key": self.config.aws_key_arn
            }
        else:
            master_key = {}
        
        data_key_id = self.client_encryption.create_data_key(
            self.config.kms_provider,
            master_key=master_key,
            key_alt_names=["healthcare_phi_key"]
        )
        
        return data_key_id
    
    def _create_schema_map(self) -> Dict[str, Any]:
        """Create JSON schema for automatic field encryption"""
        return {
            "healthcare.patients": {
                "bsonType": "object",
                "encryptMetadata": {
                    "keyId": [self.data_key_id]
                },
                "properties": {
                    # PHI fields with deterministic encryption (searchable)
                    "ssn": {
                        "encrypt": {
                            "bsonType": "string",
                            "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Deterministic"
                        }
                    },
                    "mrn": {
                        "encrypt": {
                            "bsonType": "string",
                            "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Deterministic"
                        }
                    },
                    # PHI fields with random encryption (not searchable, more secure)
                    "firstName": {
                        "encrypt": {
                            "bsonType": "string",
                            "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
                        }
                    },
                    "lastName": {
                        "encrypt": {
                            "bsonType": "string",
                            "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
                        }
                    },
                    "dateOfBirth": {
                        "encrypt": {
                            "bsonType": "date",
                            "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
                        }
                    },
                    "address": {
                        "encrypt": {
                            "bsonType": "object",
                            "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
                        }
                    },
                    "phoneNumbers": {
                        "encrypt": {
                            "bsonType": "array",
                            "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
                        }
                    },
                    "email": {
                        "encrypt": {
                            "bsonType": "string",
                            "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
                        }
                    },
                    "medicalHistory": {
                        "encrypt": {
                            "bsonType": "object",
                            "algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
                        }
                    }
                }
            }
        }
    
    def insert_patient(self, patient_data: Dict[str, Any]) -> str:
        """Insert patient with automatic field encryption"""
        db = self.encrypted_client["healthcare"]
        collection = db["patients"]
        
        # Automatic encryption happens transparently
        result = collection.insert_one(patient_data)
        
        return str(result.inserted_id)
    
    def find_patient_by_ssn(self, ssn: str) -> Optional[Dict[str, Any]]:
        """Find patient by SSN (deterministic encryption allows equality search)"""
        db = self.encrypted_client["healthcare"]
        collection = db["patients"]
        
        # SSN is automatically encrypted before query
        # and result is automatically decrypted
        return collection.find_one({"ssn": ssn})
    
    def update_patient(self, patient_id: str, updates: Dict[str, Any]) -> bool:
        """Update patient with encrypted fields"""
        from bson.objectid import ObjectId
        
        db = self.encrypted_client["healthcare"]
        collection = db["patients"]
        
        result = collection.update_one(
            {"_id": ObjectId(patient_id)},
            {"$set": updates}
        )
        
        return result.modified_count > 0

# Example usage
def demonstrate_mongodb_csfle():
    """Show CSFLE in action"""
    
    config = MongoDBCSFLEConfig(
        connection_string="mongodb://localhost:27017",
        kms_provider="aws",
        aws_key_arn="arn:aws:kms:us-east-1:123456789:key/your-key-id",
        aws_region="us-east-1"
    )
    
    client = HealthcareMongoDBClient(config)
    
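    # dateOfBirth is declared as bsonType "date" in the schema map,
    # so it must be passed as a datetime rather than a string
    from datetime import datetime
    
    # Insert patient - PHI is automatically encrypted
    patient = {
        "mrn": "MRN12345",
        "ssn": "123-45-6789",
        "firstName": "John",
        "lastName": "Doe",
        "dateOfBirth": datetime(1985, 3, 15),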
        "address": {
            "street": "123 Main St",
            "city": "Boston",
            "state": "MA",
            "zip": "02101"
        },
        "phoneNumbers": ["617-555-0123", "617-555-0124"],
        "email": "john.doe@example.com",
        "medicalHistory": {
            "allergies": ["penicillin"],
            "conditions": ["hypertension", "diabetes type 2"]
        }
    }
    
    patient_id = client.insert_patient(patient)
    print(f"Inserted patient: {patient_id}")
    
    # Find by SSN - works because SSN uses deterministic encryption
    found = client.find_patient_by_ssn("123-45-6789")
    print(f"Found patient: {found['firstName']} {found['lastName']}")
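
Equality lookups such as find_patient_by_ssn only work on encrypted fields that use the deterministic algorithm; fields encrypted with the -Random variant cannot be matched in a query. The following is a minimal sketch of a pre-flight check that lists which encrypted fields in a schema fragment still support equality search. The helper name `deterministic_fields` and the sample schema are illustrative assumptions, not part of the MongoDB driver.

```python
from typing import Any, Dict, List

DETERMINISTIC = "AEAD_AES_256_CBC_HMAC_SHA_512-Deterministic"
RANDOM = "AEAD_AES_256_CBC_HMAC_SHA_512-Random"


def deterministic_fields(schema: Dict[str, Any]) -> List[str]:
    """Return encrypted top-level fields that support equality queries."""
    fields = []
    for name, spec in schema.get("properties", {}).items():
        # Unencrypted fields have no "encrypt" block and are skipped here
        algorithm = spec.get("encrypt", {}).get("algorithm")
        if algorithm == DETERMINISTIC:
            fields.append(name)
    return sorted(fields)


# Hypothetical schema fragment in the same shape as the schema map above
sample_schema = {
    "bsonType": "object",
    "properties": {
        "ssn": {"encrypt": {"bsonType": "string", "algorithm": DETERMINISTIC}},
        "email": {"encrypt": {"bsonType": "string", "algorithm": RANDOM}},
        "mrn": {"bsonType": "string"},
    },
}

print(deterministic_fields(sample_schema))
```

Running a check like this in a test suite can catch the case where a field is switched to random encryption while application code still queries on it.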

MongoDB Role-Based Access Control

class MongoDBRBACSetup:
    """Setup HIPAA-compliant RBAC in MongoDB"""
    
    def __init__(self, admin_client: MongoClient):
        self.client = admin_client
    
    def create_hipaa_roles(self):
        """Create healthcare-specific roles"""
        
        admin_db = self.client.admin
        healthcare_db = self.client.healthcare
        
        # Role: Clinical Staff (read access to patients; MongoDB roles are
        # collection-level, so per-assignment filtering must be enforced in the application)
        healthcare_db.command("createRole", "clinicalStaff", 
            privileges=[
                {
                    "resource": {"db": "healthcare", "collection": "patients"},
                    "actions": ["find"]
                },
                {
                    "resource": {"db": "healthcare", "collection": "encounters"},
                    "actions": ["find", "insert", "update"]
                },
                {
                    "resource": {"db": "healthcare", "collection": "clinicalNotes"},
                    "actions": ["find", "insert"]
                }
            ],
            roles=[]
        )
        
        # Role: Physician (Extended clinical access)
        healthcare_db.command("createRole", "physician",
            privileges=[
                {
                    "resource": {"db": "healthcare", "collection": "patients"},
                    "actions": ["find", "update"]
                },
                {
                    "resource": {"db": "healthcare", "collection": "encounters"},
                    "actions": ["find", "insert", "update"]
                },
                {
                    "resource": {"db": "healthcare", "collection": "clinicalNotes"},
                    "actions": ["find", "insert", "update"]
                },
                {
                    "resource": {"db": "healthcare", "collection": "orders"},
                    "actions": ["find", "insert", "update"]
                },
                {
                    "resource": {"db": "healthcare", "collection": "prescriptions"},
                    "actions": ["find", "insert"]
                }
            ],
            roles=[]
        )
        
        # Role: Billing Staff (Limited PHI access)
        healthcare_db.command("createRole", "billingStaff",
            privileges=[
                {
                    "resource": {"db": "healthcare", "collection": "patients"},
                    "actions": ["find"]  # Read-only, limited to billing-relevant fields via app
                },
                {
                    "resource": {"db": "healthcare", "collection": "encounters"},
                    "actions": ["find"]
                },
                {
                    "resource": {"db": "healthcare", "collection": "claims"},
                    "actions": ["find", "insert", "update"]
                },
                {
                    "resource": {"db": "healthcare", "collection": "payments"},
                    "actions": ["find", "insert"]
                }
            ],
            roles=[]
        )
        
        # Role: Auditor (Read-only access to audit logs)
        healthcare_db.command("createRole", "auditor",
            privileges=[
                {
                    "resource": {"db": "healthcare", "collection": "auditLog"},
                    "actions": ["find"]
                },
                {
                    "resource": {"db": "healthcare", "collection": "accessLog"},
                    "actions": ["find"]
                }
            ],
            roles=[]
        )
        
        # Role: Security Admin (Manage users and roles)
        admin_db.command("createRole", "securityAdmin",
            privileges=[
                {
                    "resource": {"db": "healthcare", "collection": ""},
                    "actions": ["createUser", "dropUser", "grantRole", "revokeRole", "viewUser"]
                }
            ],
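            # Scope userAdmin to the healthcare database; a bare "userAdmin" here
            # resolves to the admin database and can indirectly grant superuser access
            roles=[{"role": "userAdmin", "db": "healthcare"}]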
        )
    
    def create_users(self):
        """Create users with appropriate roles"""
        
        healthcare_db = self.client.healthcare
        
        # Create application service account
        healthcare_db.command("createUser", "healthcare_app",
            pwd="strong_password_here",  # Use secrets manager in production
            roles=[
                # Grant only the custom role; also granting the built-in readWrite
                # role would bypass its least-privilege restrictions
                {"role": "clinicalStaff", "db": "healthcare"}
            ],
            mechanisms=["SCRAM-SHA-256"]
        )
        
        # Create read-only analytics account
        healthcare_db.command("createUser", "analytics_readonly",
            pwd="another_strong_password",
            roles=[
                {"role": "read", "db": "healthcare"}
            ],
            mechanisms=["SCRAM-SHA-256"]
        )
    
    def enable_audit_logging(self) -> Dict[str, Any]:
        """MongoDB audit log configuration (Enterprise feature)"""
        return {
            "auditLog": {
                "destination": "file",
                "format": "JSON",
                "path": "/var/log/mongodb/audit.json"
            },
            "setParameter": {
                "auditAuthorizationSuccess": True
            }
        }
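
The createUser calls above hard-code placeholder passwords and note that a secrets manager should be used in production. As a minimal sketch of one way to avoid literals in code, the helper below reads a password from an environment variable and fails loudly if it is missing or too short. It assumes the deployment platform or a secrets manager integration populates the variable; the name `load_db_password`, the variable name in the usage note, and the 16-character minimum are all hypothetical choices.

```python
import os


def load_db_password(env_var: str, min_length: int = 16) -> str:
    """Fetch a database password from the environment.

    Raises instead of falling back to a default so that a missing
    secret is caught at startup rather than at first login.
    """
    password = os.environ.get(env_var)
    if not password:
        raise RuntimeError(f"Environment variable {env_var} is not set")
    if len(password) < min_length:
        raise ValueError(
            f"Password in {env_var} is shorter than {min_length} characters"
        )
    return password
```

With this in place, the service account could be created with `pwd=load_db_password("HEALTHCARE_APP_DB_PASSWORD")` instead of a string literal.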

Part 4: Backup Security

HIPAA-Compliant Backup Strategy

from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime, timedelta
from enum import Enum
import hashlib
import subprocess
import os

class BackupType(Enum):
    FULL = "full"
    INCREMENTAL = "incremental"
    DIFFERENTIAL = "differential"

class StorageTier(Enum):
    HOT = "hot"  # Immediate access
    WARM = "warm"  # Minutes to access
    COLD = "cold"  # Hours to access
    ARCHIVE = "archive"  # Days to access

@dataclass
class BackupPolicy:
    """HIPAA-compliant backup policy"""
    
    name: str
    database_name: str
    
    # Retention settings (6 years matches HIPAA's documentation retention
    # period; state medical-record laws may require longer)
    retention_days_hot: int = 30
    retention_days_warm: int = 90
    retention_days_cold: int = 365
    retention_days_archive: int = 2190  # 6 years
    
    # Backup schedule
    full_backup_frequency: str = "weekly"  # daily, weekly
    incremental_frequency: str = "hourly"
    
    # Security settings
    encryption_algorithm: str = "AES-256-CBC"  # `openssl enc` (used below) does not support GCM
    encryption_key_id: str = ""  # KMS key ID
    
    # Verification settings
    verify_after_backup: bool = True
    test_restore_frequency: str = "monthly"
    
    # Locations
    primary_location: str = ""  # Primary backup location
    secondary_location: str = ""  # Geographic redundancy
    
    def validate(self) -> List[str]:
        """Validate backup policy meets HIPAA requirements"""
        issues = []
        
        # Check retention. The per-tier values are age thresholds (see
        # apply_retention_policy), so the archive threshold is the total
        # retention period; summing the tiers would overstate it.
        total_retention = self.retention_days_archive
        if total_retention < 2190:  # 6 years
            issues.append(
                f"Total retention ({total_retention} days) is less than "
                f"HIPAA minimum (2190 days / 6 years)"
            )
        
        # Check encryption
        if self.encryption_algorithm not in ["AES-256-GCM", "AES-256-CBC"]:
            issues.append("Encryption algorithm must be AES-256")
        
        if not self.encryption_key_id:
            issues.append("Encryption key ID must be specified")
        
        # Check geographic redundancy
        if not self.secondary_location:
            issues.append("Secondary backup location required for disaster recovery")
        
        if self.primary_location == self.secondary_location:
            issues.append("Secondary location must be different from primary")
        
        return issues

@dataclass
class BackupRecord:
    """Record of a completed backup"""
    
    backup_id: str
    policy_name: str
    backup_type: BackupType
    start_time: datetime
    end_time: datetime
    size_bytes: int
    storage_location: str
    storage_tier: StorageTier
    encryption_key_id: str
    checksum_sha256: str
    verified: bool = False
    verification_time: Optional[datetime] = None

class PostgresBackupManager:
    """Manage HIPAA-compliant PostgreSQL backups"""
    
    def __init__(self, policy: BackupPolicy):
        self.policy = policy
        self.validate_policy()
    
    def validate_policy(self):
        """Validate backup policy"""
        issues = self.policy.validate()
        if issues:
            raise ValueError(f"Invalid backup policy: {issues}")
    
    def _openssl_cipher(self) -> str:
        """Map the policy algorithm to an `openssl enc` cipher name"""
        # `openssl enc` does not support AEAD modes such as GCM, so this
        # pipeline can only honor AES-256-CBC
        if self.policy.encryption_algorithm != "AES-256-CBC":
            raise ValueError(
                f"{self.policy.encryption_algorithm} is not supported by "
                "`openssl enc`; use AES-256-CBC for this pipeline"
            )
        return "aes-256-cbc"
    
    def perform_full_backup(self) -> BackupRecord:
        """Perform encrypted full backup"""
        import shlex
        import uuid
        
        backup_id = str(uuid.uuid4())
        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        backup_file = f"/backup/{self.policy.database_name}_full_{timestamp}.sql.gz.enc"
        
        start_time = datetime.utcnow()
        
        # Backup command with encryption: pg_dump -> gzip -> openssl.
        # pipefail makes a pg_dump or gzip failure fail the whole pipeline;
        # shlex.quote guards interpolated values against shell injection.
        backup_cmd = (
            "set -o pipefail; "
            f"pg_dump -h localhost -U backup_user "
            f"-d {shlex.quote(self.policy.database_name)} "
            "--format=custom --no-password "
            "| gzip "
            f"| openssl enc -{self._openssl_cipher()} -e -pbkdf2 "
            f"-pass file:/etc/backup/key.txt -out {shlex.quote(backup_file)}"
        )
        
        # Execute backup (bash is required for `set -o pipefail`)
        result = subprocess.run(
            backup_cmd,
            shell=True,
            executable="/bin/bash",
            capture_output=True,
            text=True
        )
        
        if result.returncode != 0:
            raise RuntimeError(f"Backup failed: {result.stderr}")
        
        end_time = datetime.utcnow()
        
        # Calculate checksum in chunks so large backups are not loaded into memory
        sha256 = hashlib.sha256()
        with open(backup_file, 'rb') as f:
            for chunk in iter(lambda: f.read(1024 * 1024), b""):
                sha256.update(chunk)
        checksum = sha256.hexdigest()
        
        # Get file size
        size_bytes = os.path.getsize(backup_file)
        
        # Upload to primary and secondary locations
        self._upload_to_storage(backup_file, self.policy.primary_location)
        self._upload_to_storage(backup_file, self.policy.secondary_location)
        
        # Create backup record. storage_location holds the full object URI
        # so the uploaded file can be located again during verification.
        object_uri = (
            f"{self.policy.primary_location.rstrip('/')}/"
            f"{os.path.basename(backup_file)}"
        )
        record = BackupRecord(
            backup_id=backup_id,
            policy_name=self.policy.name,
            backup_type=BackupType.FULL,
            start_time=start_time,
            end_time=end_time,
            size_bytes=size_bytes,
            storage_location=object_uri,
            storage_tier=StorageTier.HOT,
            encryption_key_id=self.policy.encryption_key_id,
            checksum_sha256=checksum
        )
        
        # Verify if required
        if self.policy.verify_after_backup:
            record = self._verify_backup(record)
        
        return record
    
    def _upload_to_storage(self, local_path: str, remote_location: str):
        """Upload backup to storage (S3, Azure Blob, etc.)"""
        if remote_location.startswith("s3://"):
            # AWS S3 upload with server-side encryption; passing an argument
            # list avoids shell quoting problems
            subprocess.run(
                [
                    "aws", "s3", "cp", local_path,
                    f"{remote_location.rstrip('/')}/",
                    "--sse", "aws:kms",
                    "--sse-kms-key-id", self.policy.encryption_key_id,
                    "--storage-class", "STANDARD_IA",
                ],
                check=True
            )
        else:
            raise ValueError(f"Unsupported storage location: {remote_location}")
    
    def _verify_backup(self, record: BackupRecord) -> BackupRecord:
        """Verify backup integrity and restorability"""
        import shlex
        
        # Stream the uploaded object, then decrypt, decompress, and list the
        # archive contents; any failing stage fails the whole check
        verify_cmd = (
            "set -o pipefail; "
            f"aws s3 cp {shlex.quote(record.storage_location)} - "
            f"| openssl enc -{self._openssl_cipher()} -d -pbkdf2 "
            "-pass file:/etc/backup/key.txt "
            "| gunzip "
            "| pg_restore --list > /dev/null"
        )
        
        result = subprocess.run(
            verify_cmd,
            shell=True,
            executable="/bin/bash",
            capture_output=True,
            text=True
        )
        
        record.verified = result.returncode == 0
        record.verification_time = datetime.utcnow()
        
        return record
    
    def perform_test_restore(self) -> Dict[str, object]:
        """Perform test restore to verify backups (required regularly)"""
        # This should restore to a test environment
        
        test_results = {
            "test_date": datetime.utcnow(),
            "backup_used": None,
            "restore_successful": False,
            "data_integrity_verified": False,
            "time_to_restore_seconds": 0,
            "issues_found": []
        }
        
        # Get latest backup
        # Restore to test database
        # Verify data integrity
        # Record results
        
        return test_results
    
    def apply_retention_policy(self):
        """Move backups between storage tiers based on age"""
        now = datetime.utcnow()
        
        # This would query your backup catalog and move files
        # between storage tiers based on age
        
        transitions = {
            StorageTier.HOT: StorageTier.WARM,
            StorageTier.WARM: StorageTier.COLD,
            StorageTier.COLD: StorageTier.ARCHIVE
        }
        
        age_thresholds = {
            StorageTier.HOT: timedelta(days=self.policy.retention_days_hot),
            StorageTier.WARM: timedelta(days=self.policy.retention_days_warm),
            StorageTier.COLD: timedelta(days=self.policy.retention_days_cold)
        }
        
        # Logic to transition backups between tiers
        pass
    
    def generate_backup_report(self) -> str:
        """Generate backup status report for compliance"""
        # Policy fields are interpolated; bracketed and X/Y/Z values remain
        # placeholders to be filled from the backup catalog
        report_date = datetime.utcnow().strftime("%Y-%m-%d")
        return f"""
BACKUP COMPLIANCE REPORT
========================

Policy: {self.policy.name}
Report Date: {report_date}

BACKUP STATUS
-------------
Last Full Backup: [DATE] (SUCCESS/FAILED)
Last Incremental: [DATE] (SUCCESS/FAILED)
Backups in Last 24h: X
Backups Verified: X/Y

STORAGE LOCATIONS
-----------------
Primary: {self.policy.primary_location}
Secondary: {self.policy.secondary_location}
Encryption: {self.policy.encryption_algorithm}

RETENTION COMPLIANCE
--------------------
Hot Storage: X backups (Y days)
Warm Storage: X backups (Y days)
Cold Storage: X backups (Y days)
Archive: X backups (Y days)
Total Retention: Z days (COMPLIANT/NON-COMPLIANT)

TEST RESTORES
-------------
Last Test Restore: [DATE]
Result: SUCCESS/FAILED
Time to Restore: X minutes
Next Scheduled Test: [DATE]

ISSUES & ALERTS
---------------
[List any issues]
"""
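
The apply_retention_policy method above leaves the tier transition logic as a stub. One possible reading of it can be sketched as a pure function that maps a backup's age to the tier it belongs in. This assumes the retention values are cumulative age thresholds and that a backup older than the archive threshold is eligible for deletion; both are assumptions, and `tier_for_age` is a hypothetical helper. StorageTier is redeclared only so the sketch runs on its own.

```python
from enum import Enum
from typing import Optional


class StorageTier(Enum):
    HOT = "hot"
    WARM = "warm"
    COLD = "cold"
    ARCHIVE = "archive"


def tier_for_age(
    age_days: int,
    hot: int = 30,
    warm: int = 90,
    cold: int = 365,
    archive: int = 2190,
) -> Optional[StorageTier]:
    """Return the tier for a backup of the given age, or None once expired."""
    if age_days < 0:
        raise ValueError("age_days must be non-negative")
    # Thresholds are checked from youngest to oldest tier
    for limit, tier in (
        (hot, StorageTier.HOT),
        (warm, StorageTier.WARM),
        (cold, StorageTier.COLD),
        (archive, StorageTier.ARCHIVE),
    ):
        if age_days <= limit:
            return tier
    return None  # past the archive threshold: candidate for deletion


for age in (10, 45, 200, 1000, 3000):
    print(age, tier_for_age(age))
```

A retention job could compare this result with each catalog entry's current tier and move only the backups whose tier has changed.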

Secure Backup Verification

1. Automated Integrity Checks

Verify backup checksums match after each backup completes

2. Monthly Test Restores

Restore to isolated environment and verify:
  • Data completeness
  • Data integrity
  • Application functionality
  • Time to restore meets RTO

3. Annual Full DR Test

Complete disaster recovery drill:
  • Restore to alternate site
  • Verify all systems operational
  • Test with actual users
  • Document lessons learned
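
The automated integrity check in step 1 can be sketched as follows: recompute the SHA-256 of the stored backup file and compare it with the checksum recorded at backup time. The function names are hypothetical, and the sketch assumes the expected value is a hex digest such as the checksum_sha256 field of a backup record.

```python
import hashlib
import hmac


def sha256_of_file(path: str, chunk_size: int = 1024 * 1024) -> str:
    """Hash a file in fixed-size chunks so large backups fit in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def checksum_matches(path: str, expected_hex: str) -> bool:
    """Return True if the file's SHA-256 equals the recorded checksum."""
    actual = sha256_of_file(path)
    # compare_digest performs a constant-time comparison
    return hmac.compare_digest(actual, expected_hex.strip().lower())
```

A checksum match only shows the file is unchanged since it was hashed; it does not show the backup can be restored, which is why the test restores in steps 2 and 3 are still needed.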

Part 5: Database Security Checklist

Pre-Deployment Checklist

Network Security
  • Database in private subnet (no public IP)
  • Security group restricts to application servers only
  • TLS 1.2+ required for all connections
  • VPN or Direct Connect for admin access
Authentication
  • Strong password policy enforced
  • No default/shared accounts
  • Service accounts have minimal privileges
  • Certificate authentication for admin accounts
  • MFA for privileged database access
Authorization
  • Role-based access control implemented
  • Least privilege enforced
  • No direct table access (views/functions only)
  • Row-level security for multi-tenant
Encryption
  • Encryption at rest enabled (TDE)
  • Encryption in transit (TLS)
  • Field-level encryption for sensitive PHI
  • Encryption keys in KMS (not database)
  • Key rotation configured
Audit Logging
  • All logins logged
  • All queries logged (or sampled)
  • PHI access specifically logged
  • Privileged actions logged
  • Logs shipped to SIEM
  • Logs retained 6+ years
Backup Security
  • Backups encrypted
  • Backup keys different from data keys
  • Geographic redundancy
  • Retention meets HIPAA (6 years)
  • Regular restore testing
  • Backup access restricted
Monitoring
  • Failed login alerts
  • Privilege escalation alerts
  • After-hours access alerts
  • High-volume query alerts
  • Schema change alerts
Patching
  • Patch management process defined
  • Security patches within 30 days
  • Critical patches within 72 hours
  • Patching tested in staging first
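
As one illustration of the "TLS 1.2+ required for all connections" item, the sketch below builds a client-side TLS context with Python's standard ssl module that refuses anything older than TLS 1.2 and keeps certificate and hostname verification on. The function name and the optional CA bundle parameter are assumptions; how the context (or an equivalent setting) is passed to a database driver depends on the driver.

```python
import ssl
from typing import Optional


def make_tls_context(ca_file: Optional[str] = None) -> ssl.SSLContext:
    """Create a client TLS context that requires TLS 1.2 or newer."""
    # create_default_context enables certificate and hostname verification
    ctx = ssl.create_default_context(cafile=ca_file)
    # Reject TLS 1.0/1.1 (and older) explicitly
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    return ctx


ctx = make_tls_context()
print(ctx.minimum_version, ctx.verify_mode, ctx.check_hostname)
```

The server must enforce the same minimum independently, since a client-side setting alone does not stop other clients from negotiating weaker protocols.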

Practical Exercises

Exercise 1: Implement Field-Level Encryption

Objective: Implement field-level encryption for a patient table in PostgreSQL.
Requirements:
  1. Create a patients table with encrypted PHI fields
  2. Implement blind indexes for searchable fields (SSN, MRN)
  3. Create functions for secure insert, update, and query
  4. Test search functionality on encrypted data
Starter Code:
# Start with this schema and add encryption
from dataclasses import dataclass
from datetime import date

@dataclass
class Patient:
    id: str
    mrn: str  # Searchable
    ssn: str  # Searchable
    first_name: str
    last_name: str
    date_of_birth: date
    address: dict
    phone: str
    email: str

# Your implementation should:
# 1. Encrypt all PHI fields before storage
# 2. Create blind indexes for mrn and ssn
# 3. Allow searching by mrn or ssn
# 4. Decrypt on read
Deliverables:
  1. SQL for table creation
  2. Insert function with encryption
  3. Search function using blind indexes
  4. Update function
  5. Test queries demonstrating functionality
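
As a starting hint for requirement 2, a blind index can be sketched as a keyed hash (HMAC) of the normalized plaintext, stored in its own column next to the ciphertext so that equality lookups never touch the encrypted value. This is a minimal sketch, not a complete solution: the function names are hypothetical, the key shown is a placeholder that would come from a KMS in practice, and it should be a different key from the one used to encrypt the field.

```python
import hashlib
import hmac


def normalize_ssn(ssn: str) -> str:
    """Keep digits only so '123-45-6789' and '123456789' index identically."""
    return "".join(ch for ch in ssn if ch.isdigit())


def blind_index(value: str, key: bytes) -> str:
    """Return a hex HMAC-SHA256 of the value for equality search."""
    return hmac.new(key, value.encode("utf-8"), hashlib.sha256).hexdigest()


# Placeholder key for illustration only; load a real key from a KMS
index_key = bytes(range(32))

stored = blind_index(normalize_ssn("123-45-6789"), index_key)
lookup = blind_index(normalize_ssn("123456789"), index_key)
print(stored == lookup)
```

The search function then computes the index of the search term and queries the index column (for example `WHERE ssn_index = %s`, with a hypothetical column name), decrypting only the rows that match.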

Exercise 2: Configure Row-Level Security

Objective: Implement RLS policies for a multi-provider clinic.
Scenario:
  • Multiple providers (physicians, nurses)
  • Patients assigned to specific providers
  • Providers should only see their patients
  • Break-glass for emergencies
Requirements:
  1. Create user context table
  2. Implement RLS policies on patient table
  3. Create break-glass function with audit
  4. Test with multiple users
Test Cases:
  • Provider A can see their patients
  • Provider A cannot see Provider B’s patients
  • Break-glass allows temporary access
  • All access is logged

Exercise 3: Build Backup Automation

Objective: Create an automated, encrypted backup system.
Requirements:
  1. Full backup weekly, incremental daily
  2. Encrypt with AES-256
  3. Upload to S3 with server-side encryption
  4. Verify backup integrity
  5. Rotate based on retention policy
Deliverables:
  1. Backup script
  2. Verification script
  3. Retention management script
  4. Cron schedule
  5. Monitoring/alerting

Key Takeaways

Defense in Depth

Multiple security layers: network, authentication, encryption, audit

Encrypt Everything

TDE for storage, TLS for transit, field-level for PHI

Least Privilege

RBAC + RLS ensures minimum necessary access

Audit Everything

Log all access, especially to PHI

Next Steps