Audit Logging for HIPAA Compliance
Audit logging is a core requirement of the HIPAA Security Rule. Every access, modification, and disclosure of PHI must be logged, retained, and protected from tampering.
Learning Objectives:
- Understand HIPAA audit requirements
- Design comprehensive audit log schemas
- Implement tamper-evident logging
- Build real-time alerting systems
- Meet retention and review requirements
Why Audit Logging Matters
HIPAA Audit Logging Architecture
Audit Log Schema Design
Core Audit Event Schema
Copy
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Dict, Any, List
from enum import Enum
import uuid
import hashlib
import json
class EventType(Enum):
    """Closed set of auditable event kinds.

    Each value is a dotted ``category.action`` string. ``AuditEvent.to_dict``
    writes the value (not the member name) into the stored record, so
    changing a value changes what ends up in the log.
    """

    # Authentication events
    LOGIN_SUCCESS = "auth.login.success"
    LOGIN_FAILURE = "auth.login.failure"
    LOGOUT = "auth.logout"
    MFA_SUCCESS = "auth.mfa.success"
    MFA_FAILURE = "auth.mfa.failure"
    PASSWORD_CHANGE = "auth.password.change"

    # PHI access events
    PHI_VIEW = "phi.view"
    PHI_CREATE = "phi.create"
    PHI_UPDATE = "phi.update"
    PHI_DELETE = "phi.delete"
    PHI_EXPORT = "phi.export"
    PHI_PRINT = "phi.print"
    PHI_COPY = "phi.copy"

    # Disclosure events
    DISCLOSURE_TREATMENT = "disclosure.treatment"
    DISCLOSURE_PAYMENT = "disclosure.payment"
    DISCLOSURE_OPERATIONS = "disclosure.operations"
    DISCLOSURE_AUTHORIZED = "disclosure.authorized"
    DISCLOSURE_REQUIRED = "disclosure.required"

    # System events
    CONFIG_CHANGE = "system.config.change"
    USER_CREATE = "system.user.create"
    USER_MODIFY = "system.user.modify"
    USER_DELETE = "system.user.delete"
    ROLE_CHANGE = "system.role.change"

    # Security events
    ACCESS_DENIED = "security.access.denied"
    INTRUSION_DETECTED = "security.intrusion"
    ENCRYPTION_KEY_ROTATE = "security.key.rotate"
    BREAK_GLASS = "security.break_glass"
class SensitivityLevel(Enum):
    """Sensitivity classification attached to an audit event.

    The lowercase string value is what ``AuditEvent.to_dict`` stores.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
@dataclass
class AuditEvent:
    """HIPAA-compliant audit event record.

    A flat record of who did what to which resource. ``to_dict`` groups the
    fields into nested sections for storage, and ``compute_hash`` derives the
    digest that is used for hash chaining and signing.
    """

    # Unique identifier
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Timestamp (UTC, ISO 8601 with a trailing "Z")
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat() + "Z")

    # Event classification
    event_type: EventType = EventType.PHI_VIEW
    sensitivity: SensitivityLevel = SensitivityLevel.MEDIUM

    # Actor (who performed the action)
    actor_id: str = ""
    actor_type: str = "user"  # user, system, api_client
    actor_name: str = ""
    actor_role: str = ""
    actor_ip: str = ""
    actor_user_agent: str = ""
    actor_session_id: str = ""

    # Target (what was acted upon)
    resource_type: str = ""  # patient_record, prescription, lab_result
    resource_id: str = ""
    patient_id: Optional[str] = None  # If applicable

    # Action details
    action: str = ""  # read, write, delete, etc.
    outcome: str = "success"  # success, failure, error
    reason: Optional[str] = None  # For failures or break_glass

    # Request context
    request_id: str = ""
    correlation_id: str = ""

    # Data details (what changed)
    fields_accessed: List[str] = field(default_factory=list)
    old_values: Optional[Dict[str, Any]] = None  # For updates
    new_values: Optional[Dict[str, Any]] = None  # For updates

    # Integrity
    previous_hash: str = ""  # Hash of previous log entry
    signature: str = ""  # Digital signature

    def to_dict(self) -> dict:
        """Return the event as a nested dictionary for storage.

        Enum fields are replaced by their string values; everything else is
        copied as-is, so ``old_values`` / ``new_values`` must already be
        JSON-serializable if the result is passed to ``json.dumps``.
        """
        return {
            "event_id": self.event_id,
            "timestamp": self.timestamp,
            "event_type": self.event_type.value,
            "sensitivity": self.sensitivity.value,
            "actor": {
                "id": self.actor_id,
                "type": self.actor_type,
                "name": self.actor_name,
                "role": self.actor_role,
                "ip": self.actor_ip,
                "user_agent": self.actor_user_agent,
                "session_id": self.actor_session_id,
            },
            "target": {
                "resource_type": self.resource_type,
                "resource_id": self.resource_id,
                "patient_id": self.patient_id,
            },
            "action": {
                "type": self.action,
                "outcome": self.outcome,
                "reason": self.reason,
            },
            "context": {
                "request_id": self.request_id,
                "correlation_id": self.correlation_id,
            },
            "data": {
                "fields_accessed": self.fields_accessed,
                "old_values": self.old_values,
                "new_values": self.new_values,
            },
            "integrity": {
                "previous_hash": self.previous_hash,
                "signature": self.signature,
            },
        }

    def compute_hash(self) -> str:
        """Return the SHA-256 hex digest of the event's canonical JSON form.

        The digest covers every field except ``signature`` (the signature is
        computed over this digest, so it cannot be an input to it).

        ``previous_hash`` IS covered. If the link to the previous entry were
        left out of the digest, it would be neither hashed nor signed, and an
        entry could be removed from the chain by rewriting the next entry's
        ``previous_hash`` without invalidating anything.

        NOTE: digests of events with a non-empty ``previous_hash`` differ from
        those produced when the whole integrity section was excluded; logs
        written under that scheme need to be re-verified with it.
        """
        payload = self.to_dict()
        # Keep the chain link, drop only the signature.
        payload["integrity"] = {"previous_hash": self.previous_hash}
        canonical = json.dumps(payload, sort_keys=True)
        return hashlib.sha256(canonical.encode()).hexdigest()
Implementing the Audit Logger
Core Audit Service
Copy
import asyncio
from datetime import datetime, timedelta
from typing import Optional, List
import aiohttp
import json
import hmac
import hashlib
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.backends import default_backend
class AuditLogger:
    """
    HIPAA-compliant audit logging service.

    Features:
    - Hash chain: each event records the hash of the event logged before it
    - Digital signatures over each event hash
    - Async batch processing (buffer + periodic flush)
    - Pluggable storage backend (needs ``get_last_hash`` and ``store_batch``)
    - Optional real-time alerting
    """

    def __init__(
        self,
        storage_backend,
        signing_key: rsa.RSAPrivateKey,
        alert_service=None,
        batch_size: int = 100,
        flush_interval: float = 5.0,
    ):
        """
        Args:
            storage_backend: object providing ``get_last_hash()`` and
                ``store_batch(events)`` coroutines.
            signing_key: RSA private key used to sign event hashes.
            alert_service: optional alerting object; see ``_check_alerts``.
            batch_size: buffer length that triggers an immediate flush.
            flush_interval: seconds between background flushes.
        """
        self.storage = storage_backend
        self.signing_key = signing_key
        self.alert_service = alert_service
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._buffer: List[AuditEvent] = []
        self._last_hash: str = ""
        self._lock = asyncio.Lock()
        self._flush_task: Optional[asyncio.Task] = None

    async def start(self):
        """Load the chain head from storage and start the background flush task."""
        self._last_hash = await self.storage.get_last_hash()
        self._flush_task = asyncio.create_task(self._periodic_flush())

    async def stop(self):
        """Stop the background task, then flush whatever is still buffered."""
        task, self._flush_task = self._flush_task, None
        if task:
            task.cancel()
            # Wait for the task to actually finish so it cannot be in the
            # middle of a flush while the final flush below runs.
            try:
                await task
            except asyncio.CancelledError:
                pass
        # _flush mutates the buffer, so it must run under the same lock as log().
        async with self._lock:
            await self._flush()

    async def log(self, event: AuditEvent) -> str:
        """Chain, sign and buffer ``event``; return its ``event_id``.

        Raises whatever the storage backend raises if this call triggers a
        flush that fails (the event stays buffered in that case), and whatever
        the alert service raises.
        """
        async with self._lock:
            # Chain to previous hash
            event.previous_hash = self._last_hash
            event_hash = event.compute_hash()
            # Sign before advancing the chain head: if signing fails, the
            # event is not buffered and the head must not point at it.
            event.signature = self._sign_event(event_hash)
            self._last_hash = event_hash
            self._buffer.append(event)
            # Flush if buffer is full
            if len(self._buffer) >= self.batch_size:
                await self._flush()
        # Alerting may do network I/O; run it after releasing the lock so a
        # slow alert handler does not stall every other log() call.
        if self.alert_service is not None:
            await self._check_alerts(event)
        return event.event_id

    def _sign_event(self, event_hash: str) -> str:
        """Sign the hex digest with RSA-PSS/SHA-256 and return the signature as hex."""
        signature = self.signing_key.sign(
            event_hash.encode(),
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH,
            ),
            hashes.SHA256(),
        )
        return signature.hex()

    async def _flush(self):
        """Write the buffered events to storage. Caller must hold ``self._lock``.

        If the write fails (or is cancelled) the events are put back at the
        front of the buffer so that they are retried instead of lost.
        """
        if not self._buffer:
            return
        pending = self._buffer
        self._buffer = []
        try:
            await self.storage.store_batch(pending)
        except BaseException:
            self._buffer = pending + self._buffer
            raise

    async def _periodic_flush(self):
        """Flush the buffer every ``flush_interval`` seconds until cancelled."""
        import logging

        logger = logging.getLogger(__name__)
        while True:
            await asyncio.sleep(self.flush_interval)
            try:
                async with self._lock:
                    await self._flush()
            except Exception:
                # Keep the loop alive: the events were re-buffered by _flush
                # and will be retried on the next tick.
                logger.exception("Periodic audit flush failed; events remain buffered")

    async def _check_alerts(self, event: AuditEvent):
        """Hand ``event`` to the alert service.

        If the service exposes ``check_event`` (rule-based, like
        ``AlertService``), every event is passed to it and the service's own
        rules decide. Otherwise the built-in conditions below are applied and
        matching events are sent through ``send_alert``.
        """
        check_event = getattr(self.alert_service, "check_event", None)
        if check_event is not None:
            await check_event(event)
            return

        always_alert = (
            EventType.LOGIN_FAILURE,  # failed logins
            EventType.BREAK_GLASS,  # break glass access
            EventType.PHI_EXPORT,  # data export
            EventType.ACCESS_DENIED,  # access denied
        )
        if event.event_type in always_alert or event.sensitivity == SensitivityLevel.CRITICAL:
            await self.alert_service.send_alert(event)
class AuditStoragePostgres:
    """PostgreSQL storage backend for audit logs.

    Assumes an asyncpg-style pool (``acquire()``, ``conn.transaction()``,
    ``$n`` placeholders) — TODO confirm against the pool actually passed in.
    """

    def __init__(self, connection_pool):
        self.pool = connection_pool

    @staticmethod
    def _parse_timestamp(value):
        """Turn an event timestamp into a ``datetime`` for a TIMESTAMPTZ parameter.

        ``AuditEvent.timestamp`` is an ISO 8601 string ending in "Z";
        ``datetime.fromisoformat`` only accepts that suffix from Python 3.11,
        so it is rewritten to an explicit UTC offset first.
        """
        if isinstance(value, datetime):
            return value
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)

    async def store_batch(self, events: List[AuditEvent]):
        """Insert all ``events`` in one transaction (all or nothing)."""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                for event in events:
                    # Build the nested form once per event.
                    record = event.to_dict()
                    # Persist the event's own hash next to previous_hash so
                    # that get_last_hash can return the real chain head.
                    integrity = dict(record["integrity"], event_hash=event.compute_hash())
                    await conn.execute(
                        """
                        INSERT INTO audit_logs (
                            event_id, timestamp, event_type, sensitivity,
                            actor_data, target_data, action_data,
                            context_data, change_data, integrity_data
                        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                        """,
                        event.event_id,
                        self._parse_timestamp(event.timestamp),
                        record["event_type"],
                        record["sensitivity"],
                        json.dumps(record["actor"]),
                        json.dumps(record["target"]),
                        json.dumps(record["action"]),
                        json.dumps(record["context"]),
                        json.dumps(record["data"]),
                        json.dumps(integrity),
                    )

    async def get_last_hash(self) -> str:
        """Return the hash of the most recently inserted event, or "" if none.

        This is the value the next event must carry as ``previous_hash``, so
        it has to be the last event's OWN hash (``event_hash``), not the
        ``previous_hash`` stored on that row. Rows are ordered by the serial
        ``id`` because timestamps can tie or arrive out of order. Rows written
        without ``event_hash`` yield "".
        """
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT integrity_data->>'event_hash' AS hash
                FROM audit_logs
                ORDER BY id DESC
                LIMIT 1
                """
            )
        if row is None:
            return ""
        return row["hash"] or ""
Database Schema
Copy
-- PostgreSQL schema for HIPAA-compliant audit logs

-- Main audit log table (append-only)
CREATE TABLE audit_logs (
    id BIGSERIAL,
    event_id UUID NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    event_type VARCHAR(100) NOT NULL,
    sensitivity VARCHAR(20) NOT NULL,
    -- JSONB for flexible querying
    actor_data JSONB NOT NULL,
    target_data JSONB NOT NULL,
    action_data JSONB NOT NULL,
    context_data JSONB NOT NULL,
    change_data JSONB,
    integrity_data JSONB NOT NULL,
    -- Partitioning key
    created_date DATE NOT NULL DEFAULT CURRENT_DATE,
    -- On a partitioned table every PRIMARY KEY / UNIQUE constraint must
    -- include all partition key columns; without created_date in these
    -- constraints the CREATE TABLE statement is rejected.
    -- Consequence: event_id uniqueness is enforced per created_date only.
    PRIMARY KEY (id, created_date),
    UNIQUE (event_id, created_date)
) PARTITION BY RANGE (created_date);

-- Create partitions for each month (6+ year retention)
CREATE TABLE audit_logs_2024_01 PARTITION OF audit_logs
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
-- ... create partitions for future months

-- Indexes for common queries
CREATE INDEX idx_audit_timestamp ON audit_logs (timestamp DESC);
CREATE INDEX idx_audit_event_type ON audit_logs (event_type);
CREATE INDEX idx_audit_actor_id ON audit_logs ((actor_data->>'id'));
CREATE INDEX idx_audit_patient_id ON audit_logs ((target_data->>'patient_id'));
CREATE INDEX idx_audit_resource ON audit_logs (
    (target_data->>'resource_type'),
    (target_data->>'resource_id')
);

-- Prevent modifications (append-only)
CREATE OR REPLACE FUNCTION prevent_audit_modification()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'UPDATE' THEN
        RAISE EXCEPTION 'Audit logs cannot be modified';
    ELSIF TG_OP = 'DELETE' THEN
        RAISE EXCEPTION 'Audit logs cannot be deleted';
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- NOTE(review): BEFORE ... FOR EACH ROW triggers on a partitioned table need
-- a recent PostgreSQL release (13+, to be confirmed for your deployment).
-- Row-level triggers do not fire for TRUNCATE or for dropping a partition,
-- so those privileges must be withheld separately.
CREATE TRIGGER audit_immutable
    BEFORE UPDATE OR DELETE ON audit_logs
    FOR EACH ROW
    EXECUTE FUNCTION prevent_audit_modification();

-- Grant read-only access to auditors
CREATE ROLE auditor;
GRANT SELECT ON audit_logs TO auditor;

-- Separate table for integrity verification
CREATE TABLE audit_integrity_checkpoints (
    id SERIAL PRIMARY KEY,
    checkpoint_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    last_event_id UUID NOT NULL,
    computed_hash VARCHAR(64) NOT NULL,
    verified_by VARCHAR(100) NOT NULL,
    verification_result BOOLEAN NOT NULL
);
FastAPI Integration
Audit Middleware
Copy
from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from contextvars import ContextVar
import uuid
# Context variable for request tracking.
# AuditMiddleware.dispatch stores a fresh per-request dict here; code that
# runs later in the same request reads it with request_context.get().
# NOTE(review): the default is a single dict object shared by every context
# in which the variable was never set, so writing into the value returned by
# .get() outside a request handled by the middleware mutates that shared
# dict. Callers appear to rely on .get() returning a dict, so the default is
# left as-is here.
request_context: ContextVar[dict] = ContextVar('request_context', default={})
class AuditMiddleware(BaseHTTPMiddleware):
    """Middleware that captures per-request context for audit logging.

    It does not write audit events itself; it publishes request metadata
    through ``request_context`` for code further down the stack.
    """

    def __init__(self, app, audit_logger: AuditLogger):
        super().__init__(app)
        self.audit_logger = audit_logger

    async def dispatch(self, request: Request, call_next):
        """Populate ``request_context`` for this request and tag the response.

        The response gets an ``X-Request-ID`` header carrying the generated
        request id.
        """
        request_id = str(uuid.uuid4())
        # Fall back to the request id when the header is absent OR empty.
        correlation_id = request.headers.get("X-Correlation-ID") or request_id

        # Extract actor information
        actor_context = {
            "request_id": request_id,
            "correlation_id": correlation_id,
            "ip": request.client.host if request.client else "unknown",
            "user_agent": request.headers.get("User-Agent", ""),
            "method": request.method,
            "path": request.url.path,
        }

        # Set context for downstream use and restore the previous value once
        # the request has been handled, so nothing outlives the request.
        token = request_context.set(actor_context)
        try:
            response = await call_next(request)
        finally:
            request_context.reset(token)

        response.headers["X-Request-ID"] = request_id
        return response
class PHIAccessLogger:
    """Factory for decorators that audit PHI access in async endpoints."""

    # Action name -> event type. Actions not listed here are recorded as
    # PHI_UPDATE (the historical behavior for every non-"read" action).
    _EVENT_TYPE_BY_ACTION = {
        "read": EventType.PHI_VIEW,
        "create": EventType.PHI_CREATE,
        "update": EventType.PHI_UPDATE,
        "delete": EventType.PHI_DELETE,
        "export": EventType.PHI_EXPORT,
        "print": EventType.PHI_PRINT,
        "copy": EventType.PHI_COPY,
    }

    def __init__(self, audit_logger: AuditLogger):
        self.audit_logger = audit_logger

    def log_access(
        self,
        resource_type: str,
        action: str = "read",
        sensitivity: SensitivityLevel = SensitivityLevel.MEDIUM,
    ):
        """Return a decorator that logs one audit event per call of the endpoint.

        The wrapped coroutine is expected to receive ``current_user`` and
        ``patient_id`` or ``record_id`` as keyword arguments. An event is
        logged whether the endpoint returns or raises; exceptions are
        re-raised unchanged.

        If the endpoint stores ``old_values`` / ``new_values`` in the dict
        held by ``request_context``, they are moved onto the event. They must
        be JSON-serializable, because the event is hashed via ``json.dumps``.
        """
        from functools import wraps

        event_type = self._EVENT_TYPE_BY_ACTION.get(action, EventType.PHI_UPDATE)

        def decorator(func):
            # wraps() preserves the endpoint's signature; FastAPI inspects it
            # to resolve path parameters and dependencies.
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # Get current user from dependency injection
                current_user = kwargs.get("current_user")
                ctx = request_context.get()

                # Get resource ID from path parameters
                patient_id = kwargs.get("patient_id")
                resource_id = patient_id if patient_id is not None else kwargs.get("record_id")

                event = AuditEvent(
                    event_type=event_type,
                    sensitivity=sensitivity,
                    actor_id=str(current_user.id) if current_user else "anonymous",
                    actor_type="user",
                    actor_name=current_user.email if current_user else "",
                    actor_role=current_user.role if current_user else "",
                    actor_ip=ctx.get("ip", ""),
                    actor_user_agent=ctx.get("user_agent", ""),
                    resource_type=resource_type,
                    resource_id=str(resource_id) if resource_id is not None else "",
                    # Leave as None when the endpoint has no patient_id.
                    patient_id=str(patient_id) if patient_id is not None else None,
                    action=action,
                    request_id=ctx.get("request_id", ""),
                    correlation_id=ctx.get("correlation_id", ""),
                )

                try:
                    result = await func(*args, **kwargs)
                except BaseException as exc:
                    # BaseException so that a cancelled request is not
                    # recorded with the default outcome "success".
                    event.outcome = "error"
                    event.reason = str(exc)
                    raise
                else:
                    event.outcome = "success"
                    # Log accessed fields if result is a model; skip private
                    # attributes such as ORM bookkeeping state.
                    if hasattr(result, "__dict__"):
                        event.fields_accessed = [
                            name for name in vars(result) if not name.startswith("_")
                        ]
                    return result
                finally:
                    # pop() so the values cannot leak into a later event that
                    # happens to share the same context dict.
                    event.old_values = ctx.pop("old_values", None)
                    event.new_values = ctx.pop("new_values", None)
                    await self.audit_logger.log(event)

            return wrapper

        return decorator
# Usage in FastAPI endpoints
from fastapi import Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
# Application object the endpoints below are registered on.
app = FastAPI()
# Placeholder: the literal ``...`` stands in for the real constructor
# arguments (storage backend, signing key, optional alert service, ...).
audit_logger = AuditLogger(...) # Initialize with config
# Decorator factory shared by all PHI endpoints.
phi_logger = PHIAccessLogger(audit_logger)
@app.get("/patients/{patient_id}")
@phi_logger.log_access(resource_type="patient_record", action="read")
async def get_patient(
    patient_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Get patient record with automatic audit logging.

    Raises:
        HTTPException: 404 when no patient has this id. The audit decorator
            still logs the attempt before the error propagates.
    """
    patient = await db.get(Patient, patient_id)
    if not patient:
        raise HTTPException(404, "Patient not found")
    return patient
@app.put("/patients/{patient_id}")
@phi_logger.log_access(
    resource_type="patient_record",
    action="update",
    sensitivity=SensitivityLevel.HIGH,
)
async def update_patient(
    patient_id: int,
    update_data: PatientUpdate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Update patient record with change tracking.

    Only fields explicitly present in the request body are applied. The
    before/after snapshots are stored in the request context dict under
    ``old_values`` / ``new_values``.

    Raises:
        HTTPException: 404 when no patient has this id.
    """
    patient = await db.get(Patient, patient_id)
    if not patient:
        raise HTTPException(404, "Patient not found")

    # Store old values for audit
    old_values = patient.to_dict()

    # Apply updates. The loop names avoid ``field``, which would shadow
    # dataclasses.field where both live in one module.
    for attr_name, attr_value in update_data.dict(exclude_unset=True).items():
        setattr(patient, attr_name, attr_value)
    await db.commit()

    # Stash the change set in the request context for the audit layer.
    ctx = request_context.get()
    ctx["old_values"] = old_values
    ctx["new_values"] = patient.to_dict()
    return patient
Log Integrity Verification
Verification Service
Copy
class AuditIntegrityVerifier:
    """
    Verify integrity of the audit log chain.

    Detects:
    - Modified entries (signature no longer matches the recomputed hash)
    - Chain breaks (an entry's previous_hash differs from the hash of the
      entry before it), as caused by deleted, inserted or reordered entries
    """

    def __init__(self, storage, public_key, alert_service=None):
        """
        Args:
            storage: object providing ``get_events_in_range`` and
                ``store_checkpoint`` coroutines.
            public_key: RSA public key matching the logger's signing key.
            alert_service: optional object with a ``critical_alert``
                coroutine, used by ``run_daily_verification``.
        """
        self.storage = storage
        self.public_key = public_key
        self.alert_service = alert_service

    async def verify_chain(
        self,
        start_time: datetime,
        end_time: datetime
    ) -> dict:
        """Verify every event between ``start_time`` and ``end_time``.

        Returns a dict with ``verified``, ``events_checked``, ``errors``,
        ``warnings``, plus ``last_event_id`` / ``last_hash`` describing the
        final event checked (``None`` / ``""`` when the range is empty).

        Assumes ``get_events_in_range`` yields events in chain order —
        TODO confirm against the storage implementation.
        """
        results = {
            "verified": True,
            "events_checked": 0,
            "errors": [],
            "warnings": [],
            "last_event_id": None,
            "last_hash": "",
        }
        events = await self.storage.get_events_in_range(start_time, end_time)

        previous_hash = None  # None = no event seen yet in this range
        for event in events:
            results["events_checked"] += 1

            if previous_hash is None:
                # The predecessor of the first event lies outside the range,
                # so its link cannot be checked here. Comparing against ""
                # would flag every range that does not start at the very
                # first log entry.
                if event.previous_hash:
                    results["warnings"].append({
                        "event_id": event.event_id,
                        "warning": "First event in range links to an earlier "
                                   "event; that link was not verified",
                    })
            elif event.previous_hash != previous_hash:
                results["verified"] = False
                results["errors"].append({
                    "event_id": event.event_id,
                    "error": "Chain break detected",
                    "expected_hash": previous_hash,
                    "found_hash": event.previous_hash,
                })

            # A PSS signature cannot be turned back into the signed hash, so
            # content integrity is checked by verifying the signature against
            # the freshly recomputed hash.
            computed_hash = event.compute_hash()
            if not self._verify_signature(event, computed_hash):
                results["verified"] = False
                results["errors"].append({
                    "event_id": event.event_id,
                    "error": "Invalid signature",
                })

            previous_hash = computed_hash
            results["last_event_id"] = event.event_id
            results["last_hash"] = computed_hash

        return results

    def _verify_signature(self, event: AuditEvent, event_hash: Optional[str] = None) -> bool:
        """Return True if ``event.signature`` is valid for the event's hash.

        ``event_hash`` may be passed to avoid recomputing it. A signature that
        is not valid hex, or does not verify, yields False.
        """
        from cryptography.exceptions import InvalidSignature

        if event_hash is None:
            event_hash = event.compute_hash()
        try:
            signature = bytes.fromhex(event.signature)
            self.public_key.verify(
                signature,
                event_hash.encode(),
                padding.PSS(
                    mgf=padding.MGF1(hashes.SHA256()),
                    salt_length=padding.PSS.MAX_LENGTH,
                ),
                hashes.SHA256(),
            )
        except (InvalidSignature, ValueError, TypeError):
            return False
        return True

    async def run_daily_verification(self):
        """Verify the last 24 hours, store a checkpoint, alert on failure.

        Returns the ``verify_chain`` result so callers can act on it even
        when no alert service is configured.
        """
        now = datetime.utcnow()
        results = await self.verify_chain(now - timedelta(days=1), now)

        # Store verification result. last_event_id / computed_hash are
        # required columns of audit_integrity_checkpoints.
        # NOTE(review): both are empty when the window contained no events;
        # the storage layer has to decide how to record that.
        await self.storage.store_checkpoint({
            "checkpoint_time": now,
            "last_event_id": results["last_event_id"],
            "computed_hash": results["last_hash"],
            "verified_by": "automated_daily_check",
            "verification_result": results["verified"],
            "details": results,
        })

        # Alert on failures
        if not results["verified"] and self.alert_service is not None:
            await self.alert_service.critical_alert(
                "Audit Log Integrity Failure",
                results
            )
        return results
Real-time Alerting
Alert Configuration
Copy
from dataclasses import dataclass
from typing import List, Callable
import asyncio
@dataclass
class AlertRule:
    """Defines when to trigger an alert."""

    # Identifier of the rule; also part of the per-actor counter key.
    name: str
    # Event types this rule reacts to.
    event_types: List[EventType]
    # Number of matching events needed to fire.
    threshold: int = 1
    # Length of the counting window, in seconds.
    window_seconds: int = 60
    # Free-form severity label passed on to alert handlers.
    severity: str = "warning"
class AlertService:
    """Real-time security alerting driven by ``AlertRule`` thresholds."""

    def __init__(self):
        self.rules: List[AlertRule] = []
        # "<rule name>:<actor id>" -> {"count": int, "first_seen": datetime}
        self.event_counts: dict = {}
        self.handlers: List[Callable] = []

    def add_rule(self, rule: AlertRule):
        """Register a rule; rules are evaluated in insertion order."""
        self.rules.append(rule)

    def add_handler(self, handler: Callable):
        """Add alert handler (email, Slack, PagerDuty, etc.).

        A handler is an async callable taking the alert dict.
        """
        self.handlers.append(handler)

    async def check_event(self, event: AuditEvent):
        """Count ``event`` against every matching rule and fire those that trip.

        Counting is per rule and per actor, in a fixed window that starts at
        the first matching event and lasts ``rule.window_seconds``. Events
        arriving after the window has elapsed start a new window, so a rule
        such as "5 failures in 300 s" does not fire on 5 failures spread over
        days.
        """
        for rule in self.rules:
            if event.event_type not in rule.event_types:
                continue

            key = f"{rule.name}:{event.actor_id}"
            now = datetime.utcnow()
            bucket = self.event_counts.get(key)
            if (
                bucket is None
                or (now - bucket["first_seen"]).total_seconds() > rule.window_seconds
            ):
                bucket = {"count": 0, "first_seen": now}
                self.event_counts[key] = bucket
            bucket["count"] += 1

            if bucket["count"] >= rule.threshold:
                await self._trigger_alert(rule, event)
                # Start from scratch after firing; dropping the entry also
                # keeps the counter table from growing with stale keys.
                self.event_counts.pop(key, None)

    async def send_alert(self, event: AuditEvent):
        """Entry point used by ``AuditLogger``; evaluates the rules for ``event``."""
        await self.check_event(event)

    async def _trigger_alert(self, rule: AlertRule, event: AuditEvent):
        """Send an alert to all handlers.

        Every handler is attempted even if an earlier one fails; the first
        failure is re-raised afterwards so it is not lost.
        """
        alert = {
            "rule_name": rule.name,
            "severity": rule.severity,
            "event": event.to_dict(),
            "timestamp": datetime.utcnow().isoformat(),
        }
        first_error = None
        for handler in self.handlers:
            try:
                await handler(alert)
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error
# Configure standard HIPAA alerts.
# One module-level service instance; each rule below is registered on it.
alert_service = AlertService()

# Failed login attempts (possible brute force): 5 failures, 300 s window.
alert_service.add_rule(AlertRule(
    name="brute_force_detection",
    event_types=[EventType.LOGIN_FAILURE],
    threshold=5,
    window_seconds=300,
    severity="high",
))

# Break glass access: fires on every occurrence (threshold 1).
alert_service.add_rule(AlertRule(
    name="break_glass_access",
    event_types=[EventType.BREAK_GLASS],
    threshold=1,
    severity="critical",
))

# Large data export: fires on every PHI_EXPORT event (threshold 1).
# NOTE(review): nothing here measures export size; "large" is not enforced.
alert_service.add_rule(AlertRule(
    name="bulk_data_export",
    event_types=[EventType.PHI_EXPORT],
    threshold=1,
    severity="high",
))

# After hours access
# NOTE(review): despite the name, the rule carries no time-of-day condition;
# as configured it is a volume rule (10 view/update events, 3600 s window).
alert_service.add_rule(AlertRule(
    name="after_hours_access",
    event_types=[EventType.PHI_VIEW, EventType.PHI_UPDATE],
    threshold=10,
    window_seconds=3600,
    severity="warning",
))
# Alert handlers
async def slack_handler(alert: dict):
    """Send alert to Slack via the incoming-webhook URL.

    Raises:
        aiohttp.ClientResponseError: if Slack answers with an error status,
            so that a failed delivery is not mistaken for a sent alert.
    """
    payload = {
        "text": f"🚨 HIPAA Alert: {alert['rule_name']}",
        "attachments": [{
            "color": "danger" if alert["severity"] == "critical" else "warning",
            "fields": [
                {"title": "Severity", "value": alert["severity"]},
                {"title": "Actor", "value": alert["event"]["actor"]["id"]},
                {"title": "Action", "value": alert["event"]["event_type"]},
            ],
        }],
    }
    # Bound the request so a hanging webhook cannot stall alert delivery.
    timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        # Using the response as a context manager releases the connection.
        async with session.post(SLACK_WEBHOOK_URL, json=payload) as response:
            response.raise_for_status()
async def pagerduty_handler(alert: dict):
    """Create PagerDuty incident for critical alerts; ignore other severities.

    Raises:
        aiohttp.ClientResponseError: if PagerDuty answers with an error
            status, so that a failed page is not silently dropped.
    """
    if alert["severity"] != "critical":
        return

    payload = {
        "routing_key": PAGERDUTY_ROUTING_KEY,
        "event_action": "trigger",
        "payload": {
            "summary": f"HIPAA Security Alert: {alert['rule_name']}",
            "severity": "critical",
            "source": "hipaa-audit-system",
        },
    }
    # Bound the request so a hanging endpoint cannot stall alert delivery.
    timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        # Using the response as a context manager releases the connection.
        async with session.post(
            "https://events.pagerduty.com/v2/enqueue",
            json=payload,
        ) as response:
            response.raise_for_status()
# Register delivery channels. Handlers are invoked in registration order for
# every triggered alert; pagerduty_handler itself ignores non-critical ones.
alert_service.add_handler(slack_handler)
alert_service.add_handler(pagerduty_handler)
Accounting of Disclosures
HIPAA requires covered entities to provide patients, on request, with an accounting of disclosures of their PHI:
Copy
class DisclosureAccountingService:
    """
    Generate accounting of disclosures for patient rights requests.

    HIPAA requires tracking disclosures for:
    - 6 years prior to the request
    - Excludes TPO (Treatment, Payment, Operations)
    """

    def __init__(self, audit_storage):
        self.storage = audit_storage

    async def generate_accounting(
        self,
        patient_id: str,
        start_date: datetime,
        end_date: datetime
    ) -> List[dict]:
        """Generate accounting of disclosures for a patient.

        Queries the storage for ``DISCLOSURE_AUTHORIZED`` and
        ``DISCLOSURE_REQUIRED`` events of ``patient_id`` between the two
        dates (inclusive) and returns one dict per disclosure with the keys
        ``date``, ``recipient``, ``purpose`` and ``description``. Fields
        missing from a stored record come back as ``None`` / defaults rather
        than raising.
        """
        # Query disclosures (excluding TPO)
        disclosures = await self.storage.query_events(
            filters={
                "target.patient_id": patient_id,
                "event_type": {
                    "$in": [
                        EventType.DISCLOSURE_AUTHORIZED.value,
                        EventType.DISCLOSURE_REQUIRED.value,
                    ]
                },
                "timestamp": {
                    "$gte": start_date.isoformat(),
                    "$lte": end_date.isoformat(),
                },
            }
        )

        # Format for patient
        report = []
        for entry in disclosures:
            action = entry.get("action") or {}
            report.append({
                "date": entry.get("timestamp"),
                "recipient": self._get_recipient_name(entry),
                "purpose": action.get("reason"),
                "description": self._get_disclosure_description(entry),
            })
        return report

    def _get_recipient_name(self, disclosure: dict) -> str:
        """Get human-readable recipient name, or "Unknown" if none is recorded.

        NOTE(review): reads a top-level ``disclosure_recipient`` key, which
        ``AuditEvent.to_dict`` does not produce — map to actual entity names
        from your system.
        """
        return disclosure.get("disclosure_recipient") or "Unknown"

    def _get_disclosure_description(self, disclosure: dict) -> str:
        """Get human-readable description of what was disclosed.

        Tolerates a missing or null ``data`` section and a missing or null
        ``fields_accessed`` list.
        """
        data = disclosure.get("data") or {}
        fields = data.get("fields_accessed") or []
        return f"Disclosed: {', '.join(str(name) for name in fields)}"
Retention and Archival
Copy
class AuditRetentionManager:
    """
    Manage audit log retention per HIPAA requirements.

    HIPAA requires 6-year retention minimum
    Many organizations retain for 7+ years
    """

    RETENTION_YEARS = 7

    def __init__(self, hot_storage, cold_storage):
        self.hot_storage = hot_storage  # PostgreSQL
        self.cold_storage = cold_storage  # S3 Glacier

    async def archive_old_logs(self, hot_retention_days: int = 365):
        """Move partitions older than ``hot_retention_days`` to cold storage.

        For each partition: export, encrypt, upload, record the archive
        reference, and only then drop the hot partition. Any failure aborts
        before the drop, so the hot copy is kept.

        Raises:
            RuntimeError: if the upload did not return an archive id.
        """
        cutoff = datetime.utcnow() - timedelta(days=hot_retention_days)

        # Get old partitions
        old_partitions = await self.hot_storage.get_partitions_before(cutoff)
        for partition in old_partitions:
            # Export to Parquet format
            data = await self.hot_storage.export_partition(partition)
            # Compress and encrypt
            encrypted_data = self._encrypt_archive(data)
            # Upload to Glacier
            archive_id = await self.cold_storage.upload(
                encrypted_data,
                metadata={
                    "partition": partition,
                    "export_date": datetime.utcnow().isoformat(),
                    # NOTE(review): this is len() of the export; it equals the
                    # record count only if export_partition returns a sequence
                    # of records rather than serialized bytes — confirm.
                    "record_count": len(data),
                },
            )
            # Never drop the only copy without a reference to its archive.
            if not archive_id:
                raise RuntimeError(
                    f"Upload of partition {partition!r} returned no archive id; "
                    "hot partition kept"
                )
            # Store archive reference
            await self.hot_storage.store_archive_reference(
                partition=partition,
                archive_id=archive_id,
                archive_location=f"s3://audit-archive/{archive_id}",
            )
            # Drop hot partition (keep metadata)
            await self.hot_storage.drop_partition(partition)

    async def retrieve_archived_logs(
        self,
        start_date: datetime,
        end_date: datetime
    ) -> List[AuditEvent]:
        """Retrieve logs from cold storage.

        All retrievals are started first and then awaited in order. Returns
        the full contents of every archive overlapping the range.
        NOTE(review): events are not filtered to the exact date range.
        """
        # Find relevant archives
        archives = await self.hot_storage.find_archives(start_date, end_date)

        # Initiate retrieval
        retrieval_jobs = []
        for archive in archives:
            job = await self.cold_storage.initiate_retrieval(
                archive["archive_id"],
                tier="Expedited"  # 1-5 minutes, costs more
            )
            retrieval_jobs.append(job)

        # Wait for retrieval (or poll asynchronously)
        events = []
        for job in retrieval_jobs:
            data = await self.cold_storage.wait_for_retrieval(job)
            events.extend(self._decrypt_archive(data))
        return events

    def _encrypt_archive(self, data):
        """Compress and encrypt an exported partition before upload.

        Deployment-specific (key management is not defined here); override
        in a subclass.
        """
        raise NotImplementedError(
            "AuditRetentionManager._encrypt_archive must be provided by a subclass"
        )

    def _decrypt_archive(self, data):
        """Decrypt a retrieved archive and return its events.

        Inverse of ``_encrypt_archive``; override in a subclass.
        """
        raise NotImplementedError(
            "AuditRetentionManager._decrypt_archive must be provided by a subclass"
        )
Key Takeaways
Log Everything
Every PHI access must be logged with who, what, when, where, and why
Tamper-Evident
Use hash chains and digital signatures to detect tampering
Real-time Alerts
Detect security incidents as they happen, not during audits
Retain 6+ Years
Keep logs accessible for the full HIPAA retention period
Practice Exercise
1
Design Schema
Create an audit log schema for your application’s specific PHI access patterns.
2
Implement Logging
Build the core audit logger with hash chaining.
3
Add Middleware
Integrate audit logging into your API middleware.
4
Configure Alerts
Set up real-time alerts for security-relevant events.
5
Verify Integrity
Implement and test the integrity verification system.