Audit Logging for HIPAA Compliance

Audit logging is a core requirement of the HIPAA Security Rule. Every access, modification, and disclosure of PHI must be logged, retained, and protected from tampering.
Learning Objectives:
  • Understand HIPAA audit requirements
  • Design comprehensive audit log schemas
  • Implement tamper-proof logging
  • Build real-time alerting systems
  • Meet retention and review requirements

Why Audit Logging Matters

[Figure: HIPAA Audit Logging Architecture — comprehensive audit logging architecture for HIPAA compliance]

Audit logging is essential for HIPAA compliance: the Security Rule's audit controls standard (45 CFR §164.312(b)) requires mechanisms that record and examine activity in systems containing ePHI, the Privacy Rule obligates you to produce an accounting of disclosures on request, and a complete, tamper-evident trail is your primary evidence when investigating a suspected breach or demonstrating compliance to auditors.

Audit Log Schema Design

Core Audit Event Schema

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Dict, Any, List
from enum import Enum
import uuid
import hashlib
import json

class EventType(Enum):
    # Authentication events
    LOGIN_SUCCESS = "auth.login.success"
    LOGIN_FAILURE = "auth.login.failure"
    LOGOUT = "auth.logout"
    MFA_SUCCESS = "auth.mfa.success"
    MFA_FAILURE = "auth.mfa.failure"
    PASSWORD_CHANGE = "auth.password.change"
    
    # PHI Access events
    PHI_VIEW = "phi.view"
    PHI_CREATE = "phi.create"
    PHI_UPDATE = "phi.update"
    PHI_DELETE = "phi.delete"
    PHI_EXPORT = "phi.export"
    PHI_PRINT = "phi.print"
    PHI_COPY = "phi.copy"
    
    # Disclosure events
    DISCLOSURE_TREATMENT = "disclosure.treatment"
    DISCLOSURE_PAYMENT = "disclosure.payment"
    DISCLOSURE_OPERATIONS = "disclosure.operations"
    DISCLOSURE_AUTHORIZED = "disclosure.authorized"
    DISCLOSURE_REQUIRED = "disclosure.required"
    
    # System events
    CONFIG_CHANGE = "system.config.change"
    USER_CREATE = "system.user.create"
    USER_MODIFY = "system.user.modify"
    USER_DELETE = "system.user.delete"
    ROLE_CHANGE = "system.role.change"
    
    # Security events
    ACCESS_DENIED = "security.access.denied"
    INTRUSION_DETECTED = "security.intrusion"
    ENCRYPTION_KEY_ROTATE = "security.key.rotate"
    BREAK_GLASS = "security.break_glass"

class SensitivityLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class AuditEvent:
    """HIPAA-compliant audit event record"""
    
    # Unique identifier
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    
    # Timestamp (UTC, ISO 8601)
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat() + "Z")
    
    # Event classification
    event_type: EventType = EventType.PHI_VIEW
    sensitivity: SensitivityLevel = SensitivityLevel.MEDIUM
    
    # Actor (who performed the action)
    actor_id: str = ""
    actor_type: str = "user"  # user, system, api_client
    actor_name: str = ""
    actor_role: str = ""
    actor_ip: str = ""
    actor_user_agent: str = ""
    actor_session_id: str = ""
    
    # Target (what was acted upon)
    resource_type: str = ""  # patient_record, prescription, lab_result
    resource_id: str = ""
    patient_id: Optional[str] = None  # If applicable
    
    # Action details
    action: str = ""  # read, write, delete, etc.
    outcome: str = "success"  # success, failure, error
    reason: Optional[str] = None  # For failures or break_glass
    
    # Request context
    request_id: str = ""
    correlation_id: str = ""
    
    # Data details (what changed)
    fields_accessed: List[str] = field(default_factory=list)
    old_values: Optional[Dict[str, Any]] = None  # For updates
    new_values: Optional[Dict[str, Any]] = None  # For updates
    
    # Integrity
    previous_hash: str = ""  # Hash of the previous log entry (chain link)
    event_hash: str = ""  # SHA-256 hash of this entry (computed at log time)
    signature: str = ""  # Digital signature over event_hash
    
    def to_dict(self) -> dict:
        """Convert to dictionary for storage"""
        return {
            "event_id": self.event_id,
            "timestamp": self.timestamp,
            "event_type": self.event_type.value,
            "sensitivity": self.sensitivity.value,
            "actor": {
                "id": self.actor_id,
                "type": self.actor_type,
                "name": self.actor_name,
                "role": self.actor_role,
                "ip": self.actor_ip,
                "user_agent": self.actor_user_agent,
                "session_id": self.actor_session_id,
            },
            "target": {
                "resource_type": self.resource_type,
                "resource_id": self.resource_id,
                "patient_id": self.patient_id,
            },
            "action": {
                "type": self.action,
                "outcome": self.outcome,
                "reason": self.reason,
            },
            "context": {
                "request_id": self.request_id,
                "correlation_id": self.correlation_id,
            },
            "data": {
                "fields_accessed": self.fields_accessed,
                "old_values": self.old_values,
                "new_values": self.new_values,
            },
            "integrity": {
                "previous_hash": self.previous_hash,
                "signature": self.signature,
            },
        }
    
    def compute_hash(self) -> str:
        """Compute hash for integrity verification.

        previous_hash is deliberately included so each entry's hash covers
        its chain link; only the hash itself and its signature are excluded.
        """
        data = self.to_dict()
        data["integrity"].pop("event_hash", None)
        data["integrity"].pop("signature", None)
        canonical = json.dumps(data, sort_keys=True)
        return hashlib.sha256(canonical.encode()).hexdigest()
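
A quick usage sketch, assuming the definitions above are in scope; the IDs here are placeholders:

# Build a PHI-view event and compute its integrity hash
event = AuditEvent(
    event_type=EventType.PHI_VIEW,
    actor_id="u-123",
    actor_role="physician",
    resource_type="patient_record",
    resource_id="rec-789",
    patient_id="p-456",
    action="read",
    fields_accessed=["name", "dob", "diagnosis"],
)
print(event.compute_hash())  # 64-character SHA-256 hex digest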

Implementing the Audit Logger

Core Audit Service

import asyncio
from datetime import datetime, timedelta
from typing import Optional, List
import aiohttp
import json
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa

class AuditLogger:
    """
    HIPAA-compliant audit logging service
    
    Features:
    - Tamper-proof chain of hashes
    - Digital signatures
    - Async batch processing
    - Multiple storage backends
    - Real-time alerting
    """
    
    def __init__(
        self,
        storage_backend,
        signing_key: rsa.RSAPrivateKey,
        alert_service=None,
        batch_size: int = 100,
        flush_interval: float = 5.0,
    ):
        self.storage = storage_backend
        self.signing_key = signing_key
        self.alert_service = alert_service
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        
        self._buffer: List[AuditEvent] = []
        self._last_hash: str = ""
        self._lock = asyncio.Lock()
        self._flush_task: Optional[asyncio.Task] = None
        
    async def start(self):
        """Start the background flush task"""
        self._last_hash = await self.storage.get_last_hash()
        self._flush_task = asyncio.create_task(self._periodic_flush())
        
    async def stop(self):
        """Stop the flush task and flush remaining events"""
        if self._flush_task:
            self._flush_task.cancel()
            try:
                await self._flush_task
            except asyncio.CancelledError:
                pass
        async with self._lock:
            await self._flush()
        
    async def log(self, event: AuditEvent) -> str:
        """Log an audit event"""
        async with self._lock:
            # Chain to previous hash
            event.previous_hash = self._last_hash
            
            # Compute and record this event's hash (covers previous_hash)
            event.event_hash = event.compute_hash()
            self._last_hash = event.event_hash
            
            # Sign the event hash
            event.signature = self._sign_event(event.event_hash)
            
            # Add to buffer
            self._buffer.append(event)
            
            # Check for alerts
            if self.alert_service:
                await self._check_alerts(event)
            
            # Flush if buffer is full
            if len(self._buffer) >= self.batch_size:
                await self._flush()
                
        return event.event_id
    
    def _sign_event(self, event_hash: str) -> str:
        """Digitally sign the event hash"""
        signature = self.signing_key.sign(
            event_hash.encode(),
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
        return signature.hex()
    
    async def _flush(self):
        """Flush buffer to storage"""
        if not self._buffer:
            return
            
        events = self._buffer.copy()
        self._buffer.clear()
        
        await self.storage.store_batch(events)
        
    async def _periodic_flush(self):
        """Periodically flush buffer"""
        while True:
            await asyncio.sleep(self.flush_interval)
            async with self._lock:
                await self._flush()
                
    async def _check_alerts(self, event: AuditEvent):
        """Forward security-relevant events to the alert service"""
        alert_conditions = [
            # Failed logins (possible brute force)
            event.event_type == EventType.LOGIN_FAILURE,
            # Break glass access
            event.event_type == EventType.BREAK_GLASS,
            # Data export
            event.event_type == EventType.PHI_EXPORT,
            # Access denied
            event.event_type == EventType.ACCESS_DENIED,
            # High sensitivity events
            event.sensitivity == SensitivityLevel.CRITICAL,
        ]
        
        if any(alert_conditions):
            await self.alert_service.check_event(event)


class AuditStoragePostgres:
    """PostgreSQL storage backend for audit logs"""
    
    def __init__(self, connection_pool):
        self.pool = connection_pool
        
    async def store_batch(self, events: List[AuditEvent]):
        """Store batch of events atomically"""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                for event in events:
                    data = event.to_dict()
                    await conn.execute("""
                        INSERT INTO audit_logs (
                            event_id, timestamp, event_type, sensitivity,
                            actor_data, target_data, action_data,
                            context_data, change_data, integrity_data
                        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                    """,
                        event.event_id,
                        # asyncpg expects a datetime for TIMESTAMPTZ columns
                        datetime.fromisoformat(event.timestamp.replace("Z", "+00:00")),
                        event.event_type.value,
                        event.sensitivity.value,
                        json.dumps(data["actor"]),
                        json.dumps(data["target"]),
                        json.dumps(data["action"]),
                        json.dumps(data["context"]),
                        json.dumps(data["data"]),
                        json.dumps(data["integrity"]),
                    )
                    
    async def get_last_hash(self) -> str:
        """Get the hash of the most recent event (the chain head)"""
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow("""
                SELECT integrity_data->>'event_hash' AS hash
                FROM audit_logs
                ORDER BY id DESC
                LIMIT 1
            """)
            return row["hash"] if row else ""
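
A minimal wiring sketch for application startup. The DSN and the inline key generation are placeholders: production signing keys should come from a KMS or HSM, not be generated at boot.

import asyncpg
from cryptography.hazmat.primitives.asymmetric import rsa

async def create_audit_logger() -> AuditLogger:
    # asyncpg connection pool for the audit database (hypothetical DSN)
    pool = await asyncpg.create_pool(dsn="postgresql://audit@localhost/audit")
    
    # Illustration only -- load the real key from secure storage
    signing_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    
    logger = AuditLogger(
        storage_backend=AuditStoragePostgres(pool),
        signing_key=signing_key,
    )
    await logger.start()
    return logger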

Database Schema

-- PostgreSQL schema for HIPAA-compliant audit logs

-- Main audit log table (append-only)
CREATE TABLE audit_logs (
    id BIGSERIAL PRIMARY KEY,
    event_id UUID UNIQUE NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    event_type VARCHAR(100) NOT NULL,
    sensitivity VARCHAR(20) NOT NULL,
    
    -- JSONB for flexible querying
    actor_data JSONB NOT NULL,
    target_data JSONB NOT NULL,
    action_data JSONB NOT NULL,
    context_data JSONB NOT NULL,
    change_data JSONB,
    integrity_data JSONB NOT NULL,
    
    -- Partitioning key
    created_date DATE NOT NULL DEFAULT CURRENT_DATE
) PARTITION BY RANGE (created_date);

-- Create partitions for each month (6+ year retention)
CREATE TABLE audit_logs_2024_01 PARTITION OF audit_logs
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
-- ... create partitions for future months
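
-- Sketch: a helper that creates next month's partition ahead of time.
-- Assumes it is run monthly via pg_cron or an external scheduler.
CREATE OR REPLACE FUNCTION create_next_audit_partition()
RETURNS void AS $$
DECLARE
    start_date DATE := date_trunc('month', CURRENT_DATE + INTERVAL '1 month');
    end_date DATE := start_date + INTERVAL '1 month';
    partition_name TEXT := 'audit_logs_' || to_char(start_date, 'YYYY_MM');
BEGIN
    EXECUTE format(
        'CREATE TABLE IF NOT EXISTS %I PARTITION OF audit_logs
         FOR VALUES FROM (%L) TO (%L)',
        partition_name, start_date, end_date
    );
END;
$$ LANGUAGE plpgsql;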

-- Indexes for common queries
CREATE INDEX idx_audit_timestamp ON audit_logs (timestamp DESC);
CREATE INDEX idx_audit_event_type ON audit_logs (event_type);
CREATE INDEX idx_audit_actor_id ON audit_logs ((actor_data->>'id'));
CREATE INDEX idx_audit_patient_id ON audit_logs ((target_data->>'patient_id'));
CREATE INDEX idx_audit_resource ON audit_logs (
    (target_data->>'resource_type'),
    (target_data->>'resource_id')
);

-- Prevent modifications (append-only)
CREATE OR REPLACE FUNCTION prevent_audit_modification()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'UPDATE' THEN
        RAISE EXCEPTION 'Audit logs cannot be modified';
    ELSIF TG_OP = 'DELETE' THEN
        RAISE EXCEPTION 'Audit logs cannot be deleted';
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER audit_immutable
    BEFORE UPDATE OR DELETE ON audit_logs
    FOR EACH ROW
    EXECUTE FUNCTION prevent_audit_modification();

-- Grant read-only access to auditors
CREATE ROLE auditor;
GRANT SELECT ON audit_logs TO auditor;

-- Separate table for integrity verification
CREATE TABLE audit_integrity_checkpoints (
    id SERIAL PRIMARY KEY,
    checkpoint_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    last_event_id UUID NOT NULL,
    computed_hash VARCHAR(64) NOT NULL,
    verified_by VARCHAR(100) NOT NULL,
    verification_result BOOLEAN NOT NULL
);

FastAPI Integration

Audit Middleware

from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from contextvars import ContextVar
import functools
import uuid

# Context variable for request tracking
request_context: ContextVar[dict] = ContextVar('request_context', default={})

class AuditMiddleware(BaseHTTPMiddleware):
    """Middleware to capture request context for audit logging"""
    
    def __init__(self, app, audit_logger: AuditLogger):
        super().__init__(app)
        self.audit_logger = audit_logger
        
    async def dispatch(self, request: Request, call_next):
        # Generate request ID
        request_id = str(uuid.uuid4())
        correlation_id = request.headers.get("X-Correlation-ID", request_id)
        
        # Extract actor information
        actor_context = {
            "request_id": request_id,
            "correlation_id": correlation_id,
            "ip": request.client.host if request.client else "unknown",
            "user_agent": request.headers.get("User-Agent", ""),
            "method": request.method,
            "path": request.url.path,
        }
        
        # Set context for downstream use
        request_context.set(actor_context)
        
        # Add request ID to response headers
        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        
        return response
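
Registering the middleware is one line at application setup, with audit_logger being the instance created at startup:

app.add_middleware(AuditMiddleware, audit_logger=audit_logger)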


class PHIAccessLogger:
    """Decorator for logging PHI access in endpoints"""
    
    def __init__(self, audit_logger: AuditLogger):
        self.audit_logger = audit_logger
        
    def log_access(
        self,
        resource_type: str,
        action: str = "read",
        sensitivity: SensitivityLevel = SensitivityLevel.MEDIUM,
    ):
        def decorator(func):
            @functools.wraps(func)  # preserve the signature so FastAPI's DI still works
            async def wrapper(*args, **kwargs):
                # Get current user from dependency injection
                current_user = kwargs.get("current_user")
                ctx = request_context.get()
                
                # Get resource ID from path parameters
                resource_id = kwargs.get("patient_id") or kwargs.get("record_id")
                
                # Create audit event
                event = AuditEvent(
                    event_type=EventType.PHI_VIEW if action == "read" else EventType.PHI_UPDATE,
                    sensitivity=sensitivity,
                    actor_id=str(current_user.id) if current_user else "anonymous",
                    actor_type="user",
                    actor_name=current_user.email if current_user else "",
                    actor_role=current_user.role if current_user else "",
                    actor_ip=ctx.get("ip", ""),
                    actor_user_agent=ctx.get("user_agent", ""),
                    resource_type=resource_type,
                    resource_id=str(resource_id) if resource_id else "",
                    patient_id=str(kwargs.get("patient_id", "")),
                    action=action,
                    request_id=ctx.get("request_id", ""),
                    correlation_id=ctx.get("correlation_id", ""),
                )
                
                try:
                    result = await func(*args, **kwargs)
                    event.outcome = "success"
                    
                    # Log accessed fields if result is a model
                    if hasattr(result, "__dict__"):
                        event.fields_accessed = list(result.__dict__.keys())
                    
                    # Pick up change data stashed in the request context
                    event.old_values = ctx.get("old_values")
                    event.new_values = ctx.get("new_values")
                    
                except Exception as e:
                    event.outcome = "error"
                    event.reason = str(e)
                    raise
                finally:
                    await self.audit_logger.log(event)
                    
                return result
            return wrapper
        return decorator


# Usage in FastAPI endpoints
from fastapi import Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI()
audit_logger = AuditLogger(...)  # Initialize with config
phi_logger = PHIAccessLogger(audit_logger)

@app.get("/patients/{patient_id}")
@phi_logger.log_access(resource_type="patient_record", action="read")
async def get_patient(
    patient_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Get patient record with automatic audit logging"""
    patient = await db.get(Patient, patient_id)
    if not patient:
        raise HTTPException(404, "Patient not found")
    return patient


@app.put("/patients/{patient_id}")
@phi_logger.log_access(
    resource_type="patient_record",
    action="update",
    sensitivity=SensitivityLevel.HIGH
)
async def update_patient(
    patient_id: int,
    update_data: PatientUpdate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Update patient record with change tracking"""
    patient = await db.get(Patient, patient_id)
    if not patient:
        raise HTTPException(404, "Patient not found")
    
    # Store old values for audit
    old_values = patient.to_dict()
    
    # Apply updates
    for field, value in update_data.dict(exclude_unset=True).items():
        setattr(patient, field, value)
    
    await db.commit()
    
    # Stash old/new values in the request context so the audit
    # decorator can attach them to the event
    ctx = request_context.get()
    ctx["old_values"] = old_values
    ctx["new_values"] = patient.to_dict()
    
    return patient

Log Integrity Verification

Verification Service

class AuditIntegrityVerifier:
    """
    Verify integrity of audit log chain
    
    Detects:
    - Modified entries
    - Deleted entries
    - Inserted entries
    - Chain breaks
    """
    
    def __init__(self, storage, public_key, alert_service=None):
        self.storage = storage
        self.public_key = public_key
        self.alert_service = alert_service
        
    async def verify_chain(
        self,
        start_time: datetime,
        end_time: datetime
    ) -> dict:
        """Verify integrity of audit log chain"""
        results = {
            "verified": True,
            "events_checked": 0,
            "errors": [],
            "warnings": [],
        }
        
        events = await self.storage.get_events_in_range(start_time, end_time)
        
        previous_hash = ""
        for i, event in enumerate(events):
            results["events_checked"] += 1
            
            # Verify chain linkage
            if event.previous_hash != previous_hash:
                results["verified"] = False
                results["errors"].append({
                    "event_id": event.event_id,
                    "error": "Chain break detected",
                    "expected_hash": previous_hash,
                    "found_hash": event.previous_hash,
                })
                
            # Verify the stored hash matches the recomputed one
            computed_hash = event.compute_hash()
            if computed_hash != event.event_hash:
                results["verified"] = False
                results["errors"].append({
                    "event_id": event.event_id,
                    "error": "Hash mismatch - possible tampering",
                })
                
            # Verify signature
            if not self._verify_signature(event):
                results["verified"] = False
                results["errors"].append({
                    "event_id": event.event_id,
                    "error": "Invalid signature",
                })
                
            previous_hash = computed_hash
            
        return results
    
    def _verify_signature(self, event: AuditEvent) -> bool:
        """Verify event signature using public key"""
        try:
            event_hash = event.compute_hash()
            signature = bytes.fromhex(event.signature)
            
            self.public_key.verify(
                signature,
                event_hash.encode(),
                padding.PSS(
                    mgf=padding.MGF1(hashes.SHA256()),
                    salt_length=padding.PSS.MAX_LENGTH
                ),
                hashes.SHA256()
            )
            return True
        except Exception:
            return False
            
    async def run_daily_verification(self):
        """Scheduled daily verification"""
        yesterday = datetime.utcnow() - timedelta(days=1)
        today = datetime.utcnow()
        
        results = await self.verify_chain(yesterday, today)
        
        # Store verification result
        await self.storage.store_checkpoint({
            "checkpoint_time": datetime.utcnow(),
            "verified_by": "automated_daily_check",
            "verification_result": results["verified"],
            "details": results,
        })
        
        # Alert on failures
        if not results["verified"] and self.alert_service:
            await self.alert_service.critical_alert(
                "Audit Log Integrity Failure",
                results
            )
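
One way to schedule the daily check is a long-lived background task; in production a cron job or task queue (Celery, APScheduler) is the more usual choice:

async def verification_loop(verifier: AuditIntegrityVerifier):
    """Run the integrity check once per day"""
    while True:
        await verifier.run_daily_verification()
        await asyncio.sleep(24 * 60 * 60)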

Real-time Alerting

Alert Configuration

from dataclasses import dataclass
from datetime import datetime
from typing import List, Callable
import asyncio

@dataclass
class AlertRule:
    """Defines when to trigger an alert"""
    name: str
    event_types: List[EventType]
    threshold: int = 1
    window_seconds: int = 60
    severity: str = "warning"
    
class AlertService:
    """Real-time security alerting"""
    
    def __init__(self):
        self.rules: List[AlertRule] = []
        self.event_counts: dict = {}
        self.handlers: List[Callable] = []
        
    def add_rule(self, rule: AlertRule):
        self.rules.append(rule)
        
    def add_handler(self, handler: Callable):
        """Add alert handler (email, Slack, PagerDuty, etc.)"""
        self.handlers.append(handler)
        
    async def check_event(self, event: AuditEvent):
        """Check if event triggers any rules"""
        for rule in self.rules:
            if event.event_type in rule.event_types:
                key = f"{rule.name}:{event.actor_id}"
                now = datetime.utcnow()
                
                # Start (or restart) the counting window
                entry = self.event_counts.get(key)
                if entry is None or (now - entry["first_seen"]).total_seconds() > rule.window_seconds:
                    entry = {"count": 0, "first_seen": now}
                    self.event_counts[key] = entry
                    
                entry["count"] += 1
                
                # Check threshold within the window
                if entry["count"] >= rule.threshold:
                    await self._trigger_alert(rule, event)
                    del self.event_counts[key]
                    
    async def _trigger_alert(self, rule: AlertRule, event: AuditEvent):
        """Send alert to all handlers"""
        alert = {
            "rule_name": rule.name,
            "severity": rule.severity,
            "event": event.to_dict(),
            "timestamp": datetime.utcnow().isoformat(),
        }
        
        for handler in self.handlers:
            await handler(alert)


# Configure standard HIPAA alerts
alert_service = AlertService()

# Failed login attempts (possible brute force)
alert_service.add_rule(AlertRule(
    name="brute_force_detection",
    event_types=[EventType.LOGIN_FAILURE],
    threshold=5,
    window_seconds=300,
    severity="high",
))

# Break glass access
alert_service.add_rule(AlertRule(
    name="break_glass_access",
    event_types=[EventType.BREAK_GLASS],
    threshold=1,
    severity="critical",
))

# Large data export
alert_service.add_rule(AlertRule(
    name="bulk_data_export",
    event_types=[EventType.PHI_EXPORT],
    threshold=1,
    severity="high",
))

# High-volume PHI access (pair with a time-of-day check in a
# handler for true after-hours detection; the rule only counts events)
alert_service.add_rule(AlertRule(
    name="after_hours_access",
    event_types=[EventType.PHI_VIEW, EventType.PHI_UPDATE],
    threshold=10,
    window_seconds=3600,
    severity="warning",
))


# Alert handlers
async def slack_handler(alert: dict):
    """Send alert to Slack (SLACK_WEBHOOK_URL comes from your configuration)"""
    async with aiohttp.ClientSession() as session:
        await session.post(
            SLACK_WEBHOOK_URL,
            json={
                "text": f"🚨 HIPAA Alert: {alert['rule_name']}",
                "attachments": [{
                    "color": "danger" if alert["severity"] == "critical" else "warning",
                    "fields": [
                        {"title": "Severity", "value": alert["severity"]},
                        {"title": "Actor", "value": alert["event"]["actor"]["id"]},
                        {"title": "Action", "value": alert["event"]["event_type"]},
                    ]
                }]
            }
        )

async def pagerduty_handler(alert: dict):
    """Create PagerDuty incident for critical alerts"""
    if alert["severity"] == "critical":
        async with aiohttp.ClientSession() as session:
            await session.post(
                "https://events.pagerduty.com/v2/enqueue",
                json={
                    "routing_key": PAGERDUTY_ROUTING_KEY,
                    "event_action": "trigger",
                    "payload": {
                        "summary": f"HIPAA Security Alert: {alert['rule_name']}",
                        "severity": "critical",
                        "source": "hipaa-audit-system",
                    }
                }
            )

alert_service.add_handler(slack_handler)
alert_service.add_handler(pagerduty_handler)

Accounting of Disclosures

HIPAA requires providing patients, on request, with an accounting of disclosures of their PHI:

class DisclosureAccountingService:
    """
    Generate accounting of disclosures for patient rights requests
    
    HIPAA requires tracking disclosures for:
    - 6 years prior to the request
    - Excludes TPO (Treatment, Payment, Operations)
    """
    
    def __init__(self, audit_storage):
        self.storage = audit_storage
        
    async def generate_accounting(
        self,
        patient_id: str,
        start_date: datetime,
        end_date: datetime
    ) -> List[dict]:
        """Generate accounting of disclosures for a patient"""
        
        # Query disclosures (excluding TPO)
        disclosures = await self.storage.query_events(
            filters={
                "target.patient_id": patient_id,
                "event_type": {
                    "$in": [
                        EventType.DISCLOSURE_AUTHORIZED.value,
                        EventType.DISCLOSURE_REQUIRED.value,
                    ]
                },
                "timestamp": {
                    "$gte": start_date.isoformat(),
                    "$lte": end_date.isoformat(),
                }
            }
        )
        
        # Format for patient
        return [
            {
                "date": d["timestamp"],
                "recipient": self._get_recipient_name(d),
                "purpose": d["action"]["reason"],
                "description": self._get_disclosure_description(d),
            }
            for d in disclosures
        ]
    
    def _get_recipient_name(self, disclosure: dict) -> str:
        """Get human-readable recipient name"""
        # Map to actual entity names from your system
        return disclosure.get("disclosure_recipient", "Unknown")
        
    def _get_disclosure_description(self, disclosure: dict) -> str:
        """Get human-readable description of what was disclosed"""
        fields = disclosure.get("data", {}).get("fields_accessed", [])
        return f"Disclosed: {', '.join(fields)}"

Retention and Archival

class AuditRetentionManager:
    """
    Manage audit log retention per HIPAA requirements
    
    HIPAA requires 6-year retention minimum
    Many organizations retain for 7+ years
    """
    
    RETENTION_YEARS = 7
    
    def __init__(self, hot_storage, cold_storage):
        self.hot_storage = hot_storage  # PostgreSQL
        self.cold_storage = cold_storage  # S3 Glacier
        
    async def archive_old_logs(self):
        """Move old logs to cold storage"""
        # Keep roughly one year in hot storage; older partitions move to cold storage
        cutoff = datetime.utcnow() - timedelta(days=365)
        
        # Get old partitions
        old_partitions = await self.hot_storage.get_partitions_before(cutoff)
        
        for partition in old_partitions:
            # Export to Parquet format
            data = await self.hot_storage.export_partition(partition)
            
            # Compress and encrypt
            encrypted_data = self._encrypt_archive(data)
            
            # Upload to Glacier
            archive_id = await self.cold_storage.upload(
                encrypted_data,
                metadata={
                    "partition": partition,
                    "export_date": datetime.utcnow().isoformat(),
                    "record_count": len(data),
                }
            )
            
            # Store archive reference
            await self.hot_storage.store_archive_reference(
                partition=partition,
                archive_id=archive_id,
                archive_location=f"s3://audit-archive/{archive_id}",
            )
            
            # Drop hot partition (keep metadata)
            await self.hot_storage.drop_partition(partition)
            
    async def retrieve_archived_logs(
        self,
        start_date: datetime,
        end_date: datetime
    ) -> List[AuditEvent]:
        """Retrieve logs from cold storage (may take hours)"""
        
        # Find relevant archives
        archives = await self.hot_storage.find_archives(start_date, end_date)
        
        # Initiate retrieval
        retrieval_jobs = []
        for archive in archives:
            job = await self.cold_storage.initiate_retrieval(
                archive["archive_id"],
                tier="Expedited"  # 1-5 minutes, costs more
            )
            retrieval_jobs.append(job)
            
        # Wait for retrieval (or poll asynchronously)
        events = []
        for job in retrieval_jobs:
            data = await self.cold_storage.wait_for_retrieval(job)
            decrypted = self._decrypt_archive(data)
            events.extend(decrypted)
            
        return events
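
The _encrypt_archive/_decrypt_archive helpers referenced above are deployment-specific; a minimal sketch using Fernet authenticated encryption, assuming self._archive_key is a Fernet key loaded from your KMS:

from cryptography.fernet import Fernet

def _encrypt_archive(self, records: list) -> bytes:
    # Serialize, then encrypt-and-authenticate in one step
    payload = json.dumps(records).encode()
    return Fernet(self._archive_key).encrypt(payload)

def _decrypt_archive(self, blob: bytes) -> list:
    payload = Fernet(self._archive_key).decrypt(blob)
    return json.loads(payload)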

Key Takeaways

Log Everything: every PHI access must be logged with who, what, when, where, and why.

Tamper-Proof: use hash chains and digital signatures to detect tampering.

Real-time Alerts: detect security incidents as they happen, not during audits.

Retain 6+ Years: keep logs accessible for the full HIPAA retention period.

Practice Exercise

1. Design Schema: create an audit log schema for your application's specific PHI access patterns.
2. Implement Logging: build the core audit logger with hash chaining.
3. Add Middleware: integrate audit logging into your API middleware.
4. Configure Alerts: set up real-time alerts for security-relevant events.
5. Verify Integrity: implement and test the integrity verification system.
