Capstone Project: SecureHealth Platform
This capstone project challenges you to build a fully HIPAA-compliant healthcare application from the ground up. You’ll implement all the security controls, compliance measures, and best practices covered throughout this course.Time Commitment: This project is designed for 40-60 hours of work over 4-6 weeks. Each phase builds on the previous, creating a production-ready healthcare platform.
Project Overview
What You’ll Build
SecureHealth - A patient portal and clinical management system featuring:Patient Portal
Secure patient access to records, appointments, and messaging
Clinical Interface
Provider-facing system for patient management and documentation
Admin Dashboard
Compliance monitoring, audit logs, and system administration
Integration APIs
HIPAA-compliant APIs for third-party integrations
Technology Stack
Copy
# Recommended Technology Stack
stack = {
"backend": {
"framework": "FastAPI (Python 3.11+)",
"orm": "SQLAlchemy 2.0",
"database": "PostgreSQL 15+",
"cache": "Redis 7+",
"queue": "Celery with Redis"
},
"frontend": {
"framework": "React 18+ or Next.js 14+",
"state": "React Query / Zustand",
"ui": "Tailwind CSS + shadcn/ui"
},
"security": {
"auth": "Custom JWT + OAuth2",
"encryption": "Python cryptography library",
"key_management": "HashiCorp Vault or AWS KMS",
"secrets": "HashiCorp Vault"
},
"infrastructure": {
"cloud": "AWS (HIPAA-eligible services)",
"containers": "Docker + ECS Fargate",
"ci_cd": "GitHub Actions",
"monitoring": "CloudWatch + custom audit logging"
}
}
Phase 1: Foundation & Architecture (Week 1)
Objectives
- Set up secure development environment
- Design database schema with PHI classification
- Implement core security infrastructure
- Create project structure following security best practices
1.1 Project Setup
1
Repository Structure
Create project with security-focused structure:
Copy
securehealth/
├── backend/
│ ├── app/
│ │ ├── api/
│ │ │ ├── v1/
│ │ │ │ ├── endpoints/
│ │ │ │ └── router.py
│ │ ├── core/
│ │ │ ├── config.py
│ │ │ ├── security.py
│ │ │ └── encryption.py
│ │ ├── models/
│ │ ├── schemas/
│ │ ├── services/
│ │ └── middleware/
│ ├── tests/
│ └── alembic/
├── frontend/
├── infrastructure/
│ ├── terraform/
│ └── docker/
├── docs/
│ ├── security/
│ └── compliance/
└── scripts/
2
Security Configuration
Implement centralized security configuration:
Copy
# backend/app/core/config.py
from pydantic_settings import BaseSettings
from typing import List, Optional
import secrets
class SecuritySettings(BaseSettings):
"""Security configuration - all secrets from environment/vault"""
# Database
DATABASE_URL: str
DATABASE_ENCRYPTION_KEY_ID: str
# Authentication
JWT_SECRET_KEY: str
JWT_ALGORITHM: str = "RS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 15 # Short-lived
REFRESH_TOKEN_EXPIRE_DAYS: int = 7
# Encryption
ENCRYPTION_KEY_ID: str # AWS KMS or Vault key ID
FIELD_ENCRYPTION_KEY_ID: str
# Session
SESSION_COOKIE_SECURE: bool = True
SESSION_COOKIE_HTTPONLY: bool = True
SESSION_COOKIE_SAMESITE: str = "strict"
# CORS (restrictive)
CORS_ORIGINS: List[str] = []
# Rate Limiting
RATE_LIMIT_PER_MINUTE: int = 60
# Audit
AUDIT_LOG_RETENTION_YEARS: int = 6
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
case_sensitive = True
3
Database Schema Design
Design PHI-aware database schema:
Copy
# backend/app/models/patient.py
from sqlalchemy import Column, String, Date, Boolean, LargeBinary, ForeignKey
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import relationship
from app.db.base import Base
import uuid
class Patient(Base):
"""Patient model with field-level encryption for PHI"""
__tablename__ = "patients"
# Identifiers
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
mrn = Column(String(20), unique=True, nullable=False, index=True)
# Encrypted PHI fields (stored as binary)
first_name_encrypted = Column(LargeBinary, nullable=False)
last_name_encrypted = Column(LargeBinary, nullable=False)
ssn_encrypted = Column(LargeBinary, nullable=False)
date_of_birth_encrypted = Column(LargeBinary, nullable=False)
address_encrypted = Column(LargeBinary, nullable=False)
phone_encrypted = Column(LargeBinary, nullable=False)
email_encrypted = Column(LargeBinary, nullable=False)
# Blind indexes for searchable encrypted fields
ssn_hash = Column(LargeBinary, nullable=False, index=True)
name_hash = Column(LargeBinary, nullable=False, index=True)
dob_hash = Column(LargeBinary, nullable=False, index=True)
# Non-PHI metadata
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relationships
encounters = relationship("Encounter", back_populates="patient")
clinical_notes = relationship("ClinicalNote", back_populates="patient")
# PHI classification
__phi_fields__ = [
"first_name", "last_name", "ssn", "date_of_birth",
"address", "phone", "email"
]
1.2 Encryption Service
Implement the encryption service for PHI:Copy
# backend/app/core/encryption.py
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
import os
import hashlib
import base64
from typing import Optional, Tuple
from dataclasses import dataclass
@dataclass
class EncryptedData:
"""Container for encrypted data with metadata"""
ciphertext: bytes
nonce: bytes
key_version: int
algorithm: str = "AES-256-GCM"
class PHIEncryptionService:
"""
Field-level encryption service for PHI.
Uses AES-256-GCM with unique nonce per encryption.
"""
def __init__(self, key_provider):
"""
Initialize with a key provider (Vault, KMS, etc.)
Args:
key_provider: Service that provides encryption keys
"""
self.key_provider = key_provider
self.current_key_version: int = 1
self._key_cache: dict = {}
def encrypt(self, plaintext: str, context: dict = None) -> EncryptedData:
"""
Encrypt a PHI field.
Args:
plaintext: The data to encrypt
context: Additional authenticated data (AAD)
Returns:
EncryptedData with ciphertext, nonce, and key version
"""
if not plaintext:
return None
# Get current encryption key
key = self._get_key(self.current_key_version)
# Generate unique nonce (96 bits for GCM)
nonce = os.urandom(12)
# Create cipher
aesgcm = AESGCM(key)
# Prepare AAD if provided
aad = self._prepare_aad(context) if context else None
# Encrypt
ciphertext = aesgcm.encrypt(nonce, plaintext.encode('utf-8'), aad)
return EncryptedData(
ciphertext=ciphertext,
nonce=nonce,
key_version=self.current_key_version
)
def decrypt(self, encrypted_data: EncryptedData, context: dict = None) -> str:
"""
Decrypt a PHI field.
Args:
encrypted_data: The encrypted data container
context: Additional authenticated data (must match encryption)
Returns:
Decrypted plaintext string
"""
if not encrypted_data:
return None
# Get the correct key version
key = self._get_key(encrypted_data.key_version)
# Create cipher
aesgcm = AESGCM(key)
# Prepare AAD if provided
aad = self._prepare_aad(context) if context else None
# Decrypt
plaintext = aesgcm.decrypt(
encrypted_data.nonce,
encrypted_data.ciphertext,
aad
)
return plaintext.decode('utf-8')
def create_blind_index(self, value: str, salt: bytes = None) -> bytes:
"""
Create a blind index for encrypted field searching.
Uses HMAC-SHA256 for deterministic but non-reversible indexing.
Args:
value: The value to index
salt: Optional salt (should be consistent per field)
Returns:
32-byte hash suitable for exact-match searching
"""
if not value:
return None
# Normalize the value
normalized = value.lower().strip()
# Get indexing key (different from encryption key)
index_key = self.key_provider.get_indexing_key()
# Create HMAC
h = hmac.new(index_key, normalized.encode('utf-8'), hashlib.sha256)
if salt:
h.update(salt)
return h.digest()
def rotate_key(self, old_version: int, new_version: int):
"""
Re-encrypt data with new key version.
This should be called during key rotation.
"""
# Implementation depends on your key rotation strategy
pass
def _get_key(self, version: int) -> bytes:
"""Get encryption key for specified version"""
if version not in self._key_cache:
self._key_cache[version] = self.key_provider.get_key(version)
return self._key_cache[version]
def _prepare_aad(self, context: dict) -> bytes:
"""Prepare additional authenticated data from context"""
import json
return json.dumps(context, sort_keys=True).encode('utf-8')
# Key Provider Interface
from abc import ABC, abstractmethod
class KeyProvider(ABC):
"""Abstract base for key providers"""
@abstractmethod
def get_key(self, version: int) -> bytes:
"""Get encryption key by version"""
pass
@abstractmethod
def get_indexing_key(self) -> bytes:
"""Get key for blind indexing"""
pass
class VaultKeyProvider(KeyProvider):
"""HashiCorp Vault key provider"""
def __init__(self, vault_client, key_path: str):
self.client = vault_client
self.key_path = key_path
def get_key(self, version: int) -> bytes:
secret = self.client.secrets.kv.v2.read_secret_version(
path=f"{self.key_path}/v{version}"
)
return base64.b64decode(secret['data']['data']['key'])
def get_indexing_key(self) -> bytes:
secret = self.client.secrets.kv.v2.read_secret_version(
path=f"{self.key_path}/indexing"
)
return base64.b64decode(secret['data']['data']['key'])
class AWSKMSKeyProvider(KeyProvider):
"""AWS KMS key provider using envelope encryption"""
def __init__(self, kms_client, key_id: str):
self.kms = kms_client
self.key_id = key_id
self._data_keys: dict = {}
def get_key(self, version: int) -> bytes:
"""Get data key from KMS"""
if version not in self._data_keys:
# Generate data key
response = self.kms.generate_data_key(
KeyId=self.key_id,
KeySpec='AES_256',
EncryptionContext={'version': str(version)}
)
self._data_keys[version] = response['Plaintext']
return self._data_keys[version]
def get_indexing_key(self) -> bytes:
"""Get indexing key from KMS"""
response = self.kms.generate_data_key(
KeyId=self.key_id,
KeySpec='AES_256',
EncryptionContext={'purpose': 'blind_indexing'}
)
return response['Plaintext']
Phase 1 Deliverables
- Project repository with security-focused structure
- Database schema with PHI classification
- Encryption service with key management integration
- Development environment with secrets management
- Initial security documentation
Phase 2: Authentication & Access Control (Week 2)
Objectives
- Implement secure authentication with MFA
- Build role-based access control (RBAC)
- Create session management with security controls
- Implement break-glass procedures
2.1 Authentication System
Copy
# backend/app/services/auth_service.py
from datetime import datetime, timedelta
from typing import Optional, Tuple
from jose import jwt, JWTError
from passlib.context import CryptContext
import pyotp
import secrets
from app.core.config import settings
from app.models.user import User
from app.services.audit_service import AuditService
class AuthenticationService:
"""HIPAA-compliant authentication service"""
def __init__(self, db_session, audit_service: AuditService):
self.db = db_session
self.audit = audit_service
self.pwd_context = CryptContext(
schemes=["argon2"], # Use Argon2id for password hashing
deprecated="auto"
)
# Account lockout settings
self.max_failed_attempts = 5
self.lockout_duration_minutes = 30
async def authenticate(
self,
username: str,
password: str,
mfa_code: Optional[str],
ip_address: str,
user_agent: str
) -> Tuple[bool, Optional[dict], str]:
"""
Authenticate user with password and MFA.
Returns:
Tuple of (success, tokens, message)
"""
# Get user
user = await self._get_user_by_username(username)
if not user:
# Don't reveal if username exists
await self._log_failed_auth(username, ip_address, "user_not_found")
return False, None, "Invalid credentials"
# Check account lockout
if await self._is_account_locked(user):
await self._log_failed_auth(username, ip_address, "account_locked")
return False, None, "Account temporarily locked"
# Verify password
if not self._verify_password(password, user.password_hash):
await self._increment_failed_attempts(user)
await self._log_failed_auth(username, ip_address, "invalid_password")
return False, None, "Invalid credentials"
# MFA required for all users
if not mfa_code:
return False, None, "MFA code required"
if not self._verify_mfa(user, mfa_code):
await self._increment_failed_attempts(user)
await self._log_failed_auth(username, ip_address, "invalid_mfa")
return False, None, "Invalid MFA code"
# Authentication successful
await self._reset_failed_attempts(user)
# Generate tokens
tokens = await self._generate_tokens(user)
# Create session
session = await self._create_session(user, ip_address, user_agent)
# Audit log
await self.audit.log_authentication(
user_id=user.id,
action="login_success",
ip_address=ip_address,
user_agent=user_agent,
session_id=session.id
)
return True, tokens, "Authentication successful"
def _verify_password(self, plain: str, hashed: str) -> bool:
"""Verify password using Argon2"""
return self.pwd_context.verify(plain, hashed)
def _hash_password(self, password: str) -> str:
"""Hash password using Argon2"""
return self.pwd_context.hash(password)
def _verify_mfa(self, user: User, code: str) -> bool:
"""Verify TOTP MFA code"""
totp = pyotp.TOTP(user.mfa_secret)
# Allow 1 window tolerance for clock skew
return totp.verify(code, valid_window=1)
async def _generate_tokens(self, user: User) -> dict:
"""Generate access and refresh tokens"""
now = datetime.utcnow()
# Access token (short-lived)
access_payload = {
"sub": str(user.id),
"type": "access",
"roles": [role.name for role in user.roles],
"permissions": self._get_user_permissions(user),
"iat": now,
"exp": now + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
}
access_token = jwt.encode(
access_payload,
settings.JWT_SECRET_KEY,
algorithm=settings.JWT_ALGORITHM
)
# Refresh token (longer-lived, stored in DB)
refresh_token = secrets.token_urlsafe(32)
await self._store_refresh_token(user, refresh_token)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer",
"expires_in": settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
}
async def _is_account_locked(self, user: User) -> bool:
"""Check if account is locked due to failed attempts"""
if user.failed_login_attempts >= self.max_failed_attempts:
if user.last_failed_login:
lockout_until = user.last_failed_login + timedelta(
minutes=self.lockout_duration_minutes
)
if datetime.utcnow() < lockout_until:
return True
return False
2.2 Role-Based Access Control
Copy
# backend/app/services/rbac_service.py
from typing import List, Set, Optional
from dataclasses import dataclass
from enum import Enum
class Permission(Enum):
"""Fine-grained permissions for healthcare operations"""
# Patient operations
PATIENT_VIEW = "patient:view"
PATIENT_CREATE = "patient:create"
PATIENT_UPDATE = "patient:update"
PATIENT_DELETE = "patient:delete"
PATIENT_VIEW_SENSITIVE = "patient:view_sensitive" # SSN, etc.
# Clinical operations
ENCOUNTER_VIEW = "encounter:view"
ENCOUNTER_CREATE = "encounter:create"
ENCOUNTER_UPDATE = "encounter:update"
CLINICAL_NOTE_VIEW = "clinical_note:view"
CLINICAL_NOTE_CREATE = "clinical_note:create"
CLINICAL_NOTE_SIGN = "clinical_note:sign"
# Medication operations
MEDICATION_VIEW = "medication:view"
MEDICATION_PRESCRIBE = "medication:prescribe"
CONTROLLED_SUBSTANCE_PRESCRIBE = "controlled_substance:prescribe"
# Administrative
USER_MANAGE = "user:manage"
ROLE_MANAGE = "role:manage"
AUDIT_VIEW = "audit:view"
SYSTEM_CONFIG = "system:config"
# Break-glass
BREAK_GLASS_REQUEST = "break_glass:request"
BREAK_GLASS_APPROVE = "break_glass:approve"
@dataclass
class Role:
"""Role definition with permissions"""
name: str
description: str
permissions: Set[Permission]
is_clinical: bool = False
requires_mfa: bool = True
# Pre-defined roles
ROLES = {
"physician": Role(
name="physician",
description="Licensed physician with full clinical access",
permissions={
Permission.PATIENT_VIEW,
Permission.PATIENT_CREATE,
Permission.PATIENT_UPDATE,
Permission.PATIENT_VIEW_SENSITIVE,
Permission.ENCOUNTER_VIEW,
Permission.ENCOUNTER_CREATE,
Permission.ENCOUNTER_UPDATE,
Permission.CLINICAL_NOTE_VIEW,
Permission.CLINICAL_NOTE_CREATE,
Permission.CLINICAL_NOTE_SIGN,
Permission.MEDICATION_VIEW,
Permission.MEDICATION_PRESCRIBE,
Permission.CONTROLLED_SUBSTANCE_PRESCRIBE,
Permission.BREAK_GLASS_REQUEST,
},
is_clinical=True
),
"nurse": Role(
name="nurse",
description="Registered nurse with clinical access",
permissions={
Permission.PATIENT_VIEW,
Permission.PATIENT_UPDATE,
Permission.ENCOUNTER_VIEW,
Permission.ENCOUNTER_CREATE,
Permission.CLINICAL_NOTE_VIEW,
Permission.CLINICAL_NOTE_CREATE,
Permission.MEDICATION_VIEW,
Permission.BREAK_GLASS_REQUEST,
},
is_clinical=True
),
"medical_assistant": Role(
name="medical_assistant",
description="Medical assistant with limited clinical access",
permissions={
Permission.PATIENT_VIEW,
Permission.ENCOUNTER_VIEW,
Permission.ENCOUNTER_CREATE,
Permission.CLINICAL_NOTE_VIEW,
},
is_clinical=True
),
"billing": Role(
name="billing",
description="Billing staff with limited PHI access",
permissions={
Permission.PATIENT_VIEW,
Permission.ENCOUNTER_VIEW,
},
is_clinical=False
),
"admin": Role(
name="admin",
description="System administrator",
permissions={
Permission.USER_MANAGE,
Permission.ROLE_MANAGE,
Permission.AUDIT_VIEW,
Permission.SYSTEM_CONFIG,
Permission.BREAK_GLASS_APPROVE,
},
is_clinical=False
)
}
class RBACService:
"""Role-based access control service"""
def __init__(self, db_session):
self.db = db_session
def check_permission(
self,
user_id: str,
permission: Permission,
resource_id: Optional[str] = None
) -> bool:
"""
Check if user has permission, optionally for specific resource.
Implements minimum necessary principle by checking:
1. User has the permission
2. User has legitimate access to the resource
"""
user = self._get_user(user_id)
# Check role permissions
user_permissions = self._get_user_permissions(user)
if permission not in user_permissions:
return False
# If resource-specific, check resource access
if resource_id:
return self._check_resource_access(user, permission, resource_id)
return True
def _check_resource_access(
self,
user,
permission: Permission,
resource_id: str
) -> bool:
"""
Check minimum necessary access to specific resource.
For patients: Check treatment relationship
For other resources: Check ownership or assignment
"""
# Implement based on resource type
if permission.value.startswith("patient:"):
return self._check_patient_access(user, resource_id)
return True
def _check_patient_access(self, user, patient_id: str) -> bool:
"""
Verify user has legitimate access to patient.
Access granted if:
- User is assigned to patient
- User is on care team
- User has active break-glass
"""
# Check direct assignment
if self._is_assigned_to_patient(user.id, patient_id):
return True
# Check care team
if self._is_on_care_team(user.id, patient_id):
return True
# Check break-glass
if self._has_active_break_glass(user.id, patient_id):
return True
return False
Phase 2 Deliverables
- Authentication service with MFA
- RBAC implementation with minimum necessary
- Session management with security controls
- Break-glass procedure implementation
- Password policy enforcement
Phase 3: Audit Logging & Monitoring (Week 3)
Objectives
- Implement comprehensive audit logging
- Create PHI access tracking
- Build anomaly detection system
- Design audit reporting for compliance
3.1 Audit Logging Service
Copy
# backend/app/services/audit_service.py
from datetime import datetime
from typing import Optional, Dict, Any, List
from enum import Enum
from dataclasses import dataclass, asdict
import json
import hashlib
class AuditEventType(Enum):
"""Categorized audit event types"""
# Authentication
LOGIN_SUCCESS = "auth.login.success"
LOGIN_FAILURE = "auth.login.failure"
LOGOUT = "auth.logout"
MFA_SETUP = "auth.mfa.setup"
PASSWORD_CHANGE = "auth.password.change"
SESSION_TIMEOUT = "auth.session.timeout"
# PHI Access
PHI_VIEW = "phi.view"
PHI_CREATE = "phi.create"
PHI_UPDATE = "phi.update"
PHI_DELETE = "phi.delete"
PHI_EXPORT = "phi.export"
PHI_PRINT = "phi.print"
# Break-Glass
BREAK_GLASS_REQUEST = "access.break_glass.request"
BREAK_GLASS_APPROVE = "access.break_glass.approve"
BREAK_GLASS_DENY = "access.break_glass.deny"
BREAK_GLASS_EXPIRE = "access.break_glass.expire"
# System
CONFIG_CHANGE = "system.config.change"
USER_CREATE = "system.user.create"
USER_UPDATE = "system.user.update"
ROLE_CHANGE = "system.role.change"
# Security
SECURITY_ALERT = "security.alert"
ANOMALY_DETECTED = "security.anomaly"
ACCESS_DENIED = "security.access.denied"
@dataclass
class AuditEvent:
"""Immutable audit event record"""
event_id: str
event_type: AuditEventType
timestamp: datetime
user_id: Optional[str]
username: Optional[str]
ip_address: str
user_agent: str
session_id: Optional[str]
# Resource information
resource_type: Optional[str] = None
resource_id: Optional[str] = None
# Action details
action: str = ""
details: Dict[str, Any] = None
# For PHI access
patient_id: Optional[str] = None
phi_fields_accessed: List[str] = None
# Integrity
checksum: str = None
def __post_init__(self):
# Generate checksum for integrity verification
if not self.checksum:
self.checksum = self._generate_checksum()
def _generate_checksum(self) -> str:
"""Generate SHA-256 checksum of event data"""
data = {
"event_id": self.event_id,
"event_type": self.event_type.value,
"timestamp": self.timestamp.isoformat(),
"user_id": self.user_id,
"ip_address": self.ip_address,
"resource_id": self.resource_id,
"action": self.action
}
return hashlib.sha256(
json.dumps(data, sort_keys=True).encode()
).hexdigest()
class AuditService:
"""Comprehensive HIPAA audit logging service"""
def __init__(self, db_session, event_queue=None):
self.db = db_session
self.queue = event_queue # For async processing
async def log_phi_access(
self,
user_id: str,
patient_id: str,
access_type: str,
fields_accessed: List[str],
request_context: dict
):
"""Log PHI access with full context"""
event_type = {
"view": AuditEventType.PHI_VIEW,
"create": AuditEventType.PHI_CREATE,
"update": AuditEventType.PHI_UPDATE,
"delete": AuditEventType.PHI_DELETE,
"export": AuditEventType.PHI_EXPORT,
"print": AuditEventType.PHI_PRINT
}.get(access_type, AuditEventType.PHI_VIEW)
event = AuditEvent(
event_id=self._generate_event_id(),
event_type=event_type,
timestamp=datetime.utcnow(),
user_id=user_id,
username=request_context.get("username"),
ip_address=request_context.get("ip_address"),
user_agent=request_context.get("user_agent"),
session_id=request_context.get("session_id"),
resource_type="patient",
resource_id=patient_id,
patient_id=patient_id,
phi_fields_accessed=fields_accessed,
action=f"Accessed patient {access_type}",
details={
"fields": fields_accessed,
"purpose": request_context.get("purpose"),
"break_glass": request_context.get("break_glass", False)
}
)
await self._persist_event(event)
# Check for anomalies
await self._check_anomalies(event)
async def log_authentication(
self,
user_id: Optional[str],
action: str,
ip_address: str,
user_agent: str,
session_id: Optional[str] = None,
failure_reason: Optional[str] = None
):
"""Log authentication events"""
event_type = AuditEventType.LOGIN_SUCCESS if action == "login_success" \
else AuditEventType.LOGIN_FAILURE
event = AuditEvent(
event_id=self._generate_event_id(),
event_type=event_type,
timestamp=datetime.utcnow(),
user_id=user_id,
ip_address=ip_address,
user_agent=user_agent,
session_id=session_id,
action=action,
details={"failure_reason": failure_reason} if failure_reason else None
)
await self._persist_event(event)
async def generate_accounting_of_disclosures(
self,
patient_id: str,
start_date: datetime,
end_date: datetime
) -> List[dict]:
"""Generate HIPAA-required Accounting of Disclosures report"""
events = await self._query_events(
patient_id=patient_id,
event_types=[AuditEventType.PHI_VIEW, AuditEventType.PHI_EXPORT],
start_date=start_date,
end_date=end_date
)
# Format for patient disclosure
disclosures = []
for event in events:
disclosures.append({
"date": event.timestamp.isoformat(),
"disclosed_to": event.username,
"purpose": event.details.get("purpose", "Treatment"),
"description": event.action
})
return disclosures
async def _check_anomalies(self, event: AuditEvent):
"""Check for anomalous patterns that might indicate breach"""
# Check high-volume access
if event.event_type == AuditEventType.PHI_VIEW:
recent_count = await self._count_recent_access(
user_id=event.user_id,
minutes=60
)
if recent_count > 100: # Threshold
await self._trigger_anomaly_alert(
event,
"High volume PHI access detected"
)
# Check after-hours access
hour = event.timestamp.hour
if hour < 6 or hour > 22: # Outside normal hours
await self._trigger_anomaly_alert(
event,
"After-hours PHI access"
)
Phase 3 Deliverables
- Comprehensive audit logging for all PHI access
- Authentication event logging
- Anomaly detection rules
- Accounting of Disclosures report
- Audit log integrity verification
Phase 4: API Security & Integration (Week 4)
Objectives
- Build secure REST API with HIPAA controls
- Implement API rate limiting and throttling
- Create secure third-party integration patterns
- Build data validation and sanitization
4.1 Secure API Endpoints
Copy
# backend/app/api/v1/endpoints/patients.py
from fastapi import APIRouter, Depends, HTTPException, Request
from typing import List
from app.api.deps import get_current_user, require_permissions
from app.models.user import User
from app.services.patient_service import PatientService
from app.services.audit_service import AuditService
from app.core.rbac import Permission
from app.schemas.patient import PatientCreate, PatientResponse
router = APIRouter()
@router.get("/{patient_id}", response_model=PatientResponse)
async def get_patient(
patient_id: str,
request: Request,
current_user: User = Depends(get_current_user),
_: bool = Depends(require_permissions([Permission.PATIENT_VIEW])),
patient_service: PatientService = Depends(),
audit_service: AuditService = Depends()
):
"""
Get patient by ID.
Requires: PATIENT_VIEW permission + treatment relationship
"""
# Check treatment relationship (minimum necessary)
if not await patient_service.verify_access(current_user.id, patient_id):
# Log access denial
await audit_service.log_access_denied(
user_id=current_user.id,
resource_type="patient",
resource_id=patient_id,
reason="No treatment relationship",
request_context=_get_request_context(request)
)
raise HTTPException(
status_code=403,
detail="Access denied - no treatment relationship"
)
# Get patient (decrypted)
patient = await patient_service.get_patient(patient_id)
if not patient:
raise HTTPException(status_code=404, detail="Patient not found")
# Log PHI access
await audit_service.log_phi_access(
user_id=current_user.id,
patient_id=patient_id,
access_type="view",
fields_accessed=["demographics", "contact"],
request_context=_get_request_context(request)
)
return patient
@router.get("/{patient_id}/full", response_model=PatientFullResponse)
async def get_patient_full(
patient_id: str,
request: Request,
current_user: User = Depends(get_current_user),
_: bool = Depends(require_permissions([
Permission.PATIENT_VIEW,
Permission.PATIENT_VIEW_SENSITIVE
])),
patient_service: PatientService = Depends(),
audit_service: AuditService = Depends()
):
"""
Get patient with sensitive fields (SSN, etc.).
Requires: PATIENT_VIEW + PATIENT_VIEW_SENSITIVE permissions
"""
# Verify access
if not await patient_service.verify_access(current_user.id, patient_id):
await audit_service.log_access_denied(
user_id=current_user.id,
resource_type="patient",
resource_id=patient_id,
reason="No treatment relationship",
request_context=_get_request_context(request)
)
raise HTTPException(status_code=403)
patient = await patient_service.get_patient_full(patient_id)
# Log with sensitive field access
await audit_service.log_phi_access(
user_id=current_user.id,
patient_id=patient_id,
access_type="view",
fields_accessed=["demographics", "contact", "ssn", "insurance"],
request_context=_get_request_context(request)
)
return patient
def _get_request_context(request: Request) -> dict:
"""Extract audit context from request"""
return {
"ip_address": request.client.host,
"user_agent": request.headers.get("user-agent"),
"session_id": request.state.session_id if hasattr(request.state, "session_id") else None,
"request_id": request.state.request_id,
"endpoint": request.url.path,
"method": request.method
}
Phase 4 Deliverables
- Secure API endpoints with authentication
- Permission-based access control on all endpoints
- Request/response validation
- Rate limiting implementation
- API documentation with security notes
Phase 5: Infrastructure & Deployment (Week 5)
Objectives
- Deploy to HIPAA-eligible cloud infrastructure
- Implement infrastructure as code
- Configure security monitoring
- Set up backup and disaster recovery
5.1 Terraform Infrastructure
Copy
# infrastructure/terraform/main.tf
# HIPAA-compliant VPC
module "vpc" {
source = "./modules/vpc"
name = "securehealth-${var.environment}"
cidr = "10.0.0.0/16"
# Private subnets for PHI systems
private_subnets = ["10.0.1.0/24", "10.0.2.0/24"]
# Public subnets for load balancer only
public_subnets = ["10.0.101.0/24", "10.0.102.0/24"]
enable_flow_logs = true
flow_logs_retention = 365 # 1 year minimum
tags = {
Environment = var.environment
Compliance = "HIPAA"
DataClass = "PHI"
}
}
# RDS PostgreSQL with encryption
resource "aws_db_instance" "main" {
identifier = "securehealth-${var.environment}"
engine = "postgres"
engine_version = "15"
instance_class = "db.r6g.large"
# Storage encryption
storage_encrypted = true
kms_key_id = aws_kms_key.database.arn
# Network isolation
db_subnet_group_name = aws_db_subnet_group.main.name
vpc_security_group_ids = [aws_security_group.database.id]
publicly_accessible = false
# Backup configuration (HIPAA requires 6 years)
backup_retention_period = 35 # Max for RDS, archive older
backup_window = "03:00-04:00"
# Enhanced monitoring
monitoring_interval = 60
monitoring_role_arn = aws_iam_role.rds_monitoring.arn
# Audit logging
enabled_cloudwatch_logs_exports = [
"postgresql",
"upgrade"
]
# Protection
deletion_protection = true
tags = {
Environment = var.environment
Compliance = "HIPAA"
DataClass = "PHI"
}
}
# KMS key for database encryption
resource "aws_kms_key" "database" {
description = "SecureHealth database encryption key"
deletion_window_in_days = 30
enable_key_rotation = true
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "Enable IAM User Permissions"
Effect = "Allow"
Principal = {
AWS = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:root"
}
Action = "kms:*"
Resource = "*"
},
{
Sid = "Allow RDS to use the key"
Effect = "Allow"
Principal = {
Service = "rds.amazonaws.com"
}
Action = [
"kms:Encrypt",
"kms:Decrypt",
"kms:GenerateDataKey*"
]
Resource = "*"
}
]
})
tags = {
Environment = var.environment
Compliance = "HIPAA"
Purpose = "Database encryption"
}
}
# CloudTrail for audit logging
resource "aws_cloudtrail" "hipaa_trail" {
name = "securehealth-hipaa-trail"
s3_bucket_name = aws_s3_bucket.cloudtrail.id
include_global_service_events = true
is_multi_region_trail = true
enable_log_file_validation = true
kms_key_id = aws_kms_key.cloudtrail.arn
event_selector {
read_write_type = "All"
include_management_events = true
data_resource {
type = "AWS::S3::Object"
values = ["arn:aws:s3:::"]
}
}
tags = {
Environment = var.environment
Compliance = "HIPAA"
}
}
Phase 5 Deliverables
- HIPAA-compliant AWS infrastructure
- Terraform modules for all components
- Security groups with least privilege
- CloudTrail and CloudWatch configuration
- Backup and DR procedures
Phase 6: Testing & Compliance Validation (Week 6)
Objectives
- Security testing (penetration testing prep)
- Compliance checklist validation
- Documentation completion
- Final review and sign-off
6.1 Security Testing Checklist
Copy
# tests/security/test_security_controls.py
import pytest
from httpx import AsyncClient
class TestAuthenticationSecurity:
"""Security tests for authentication"""
async def test_password_requirements(self, client: AsyncClient):
"""Test password complexity is enforced"""
weak_passwords = [
"short", # Too short
"alllowercase123", # No uppercase
"ALLUPPERCASE123", # No lowercase
"NoNumbers!!!", # No numbers
"NoSpecial123Char", # No special characters
]
for password in weak_passwords:
response = await client.post("/api/v1/auth/register", json={
"username": "testuser",
"password": password,
"email": "[email protected]"
})
assert response.status_code == 400
assert "password" in response.json()["detail"].lower()
async def test_account_lockout(self, client: AsyncClient):
"""Test account lockout after failed attempts"""
# Attempt login 5 times with wrong password
for i in range(5):
await client.post("/api/v1/auth/login", json={
"username": "testuser",
"password": "wrongpassword"
})
# 6th attempt should indicate lockout
response = await client.post("/api/v1/auth/login", json={
"username": "testuser",
"password": "wrongpassword"
})
assert "locked" in response.json()["detail"].lower()
async def test_mfa_required(self, client: AsyncClient, valid_credentials: dict):
"""Test MFA is required for login"""
response = await client.post("/api/v1/auth/login", json={
"username": valid_credentials["username"],
"password": valid_credentials["password"]
# No MFA code
})
assert response.status_code == 400
assert "mfa" in response.json()["detail"].lower()
class TestAccessControl:
"""Security tests for access control"""
async def test_patient_access_denied_without_relationship(
self,
client: AsyncClient,
nurse_token: str,
unassigned_patient_id: str
):
"""Test access denied to patients without treatment relationship"""
response = await client.get(
f"/api/v1/patients/{unassigned_patient_id}",
headers={"Authorization": f"Bearer {nurse_token}"}
)
assert response.status_code == 403
async def test_break_glass_audit_logged(
self,
client: AsyncClient,
nurse_token: str,
audit_service: AuditService
):
"""Test break-glass access is properly logged"""
# Request break-glass
response = await client.post(
f"/api/v1/access/break-glass",
json={
"patient_id": "some-patient-id",
"reason": "Emergency"
},
headers={"Authorization": f"Bearer {nurse_token}"}
)
assert response.status_code == 200
# Verify audit log
events = await audit_service.get_recent_events(
event_type="BREAK_GLASS_REQUEST"
)
assert len(events) > 0
class TestPHIEncryption:
"""Security tests for PHI encryption"""
async def test_phi_encrypted_at_rest(self, db_session):
"""Verify PHI is encrypted in database"""
# Query raw database
result = await db_session.execute(
"SELECT first_name_encrypted FROM patients LIMIT 1"
)
row = result.fetchone()
# Should be binary (encrypted), not plaintext
assert isinstance(row[0], bytes)
assert b"John" not in row[0] # Name shouldn't be visible
async def test_sensitive_fields_not_in_logs(self, caplog):
"""Verify sensitive data not logged"""
# Perform operation that might log
# ...
# Check logs don't contain sensitive data
sensitive_patterns = ["ssn", "123-45-6789", "password"]
for pattern in sensitive_patterns:
assert pattern not in caplog.text.lower()
class TestAuditLogging:
"""Security tests for audit logging"""
async def test_all_phi_access_logged(
self,
client: AsyncClient,
physician_token: str,
patient_id: str,
audit_service: AuditService
):
"""Test all PHI access is logged"""
# Access patient
await client.get(
f"/api/v1/patients/{patient_id}",
headers={"Authorization": f"Bearer {physician_token}"}
)
# Verify audit log exists
events = await audit_service.get_patient_access_logs(patient_id)
assert len(events) > 0
latest = events[0]
assert latest.patient_id == patient_id
assert latest.event_type == "PHI_VIEW"
6.2 Compliance Checklist
Final Compliance Checklist
Final Compliance Checklist
Access Control (§164.312(a))
- Unique user identification
- Emergency access procedure (break-glass)
- Automatic logoff
- Encryption and decryption
- Audit log mechanism implemented
- Logs retained for 6 years
- Log integrity verification
- Regular log review process
- Authentication of ePHI
- Change detection mechanism
- Integrity controls
- Encryption in transit (TLS 1.2+)
- Risk analysis documented
- Risk management plan
- Sanction policy
- Information system activity review
- Access authorization policy
- Workstation use policy
- Device and media controls
- Audit controls policy
Phase 6 Deliverables
- Complete security test suite
- Penetration testing report (if applicable)
- Compliance checklist completed
- Risk assessment documentation
- Policies and procedures documentation
- User training materials
Grading Rubric
| Component | Weight | Criteria |
|---|---|---|
| Architecture & Design | 15% | Clean architecture, security-first design, proper PHI handling |
| Encryption Implementation | 20% | Proper encryption at rest and in transit, key management |
| Access Control | 20% | RBAC, minimum necessary, break-glass procedures |
| Audit Logging | 15% | Comprehensive logging, PHI access tracking, reporting |
| API Security | 10% | Secure endpoints, validation, rate limiting |
| Infrastructure | 10% | HIPAA-eligible services, proper configuration |
| Testing & Documentation | 10% | Security tests, compliance documentation |
Submission Requirements
-
Code Repository
- Complete source code
- README with setup instructions
- Environment configuration (without secrets)
-
Documentation
- Architecture diagram
- Security controls documentation
- Risk assessment summary
- Policies and procedures
-
Demo
- Working deployment (local or cloud)
- Walkthrough of security features
- Audit log demonstration
-
Testing
- Security test results
- Compliance checklist (completed)
- Any penetration testing results
Congratulations!
Upon completing this capstone, you will have:Built Real Security
A production-ready HIPAA-compliant healthcare application
Practical Experience
Hands-on implementation of encryption, access control, and auditing
Compliance Knowledge
Deep understanding of HIPAA technical requirements
Portfolio Project
A substantial project demonstrating healthcare security expertise