Capstone Project: SecureHealth Platform

This capstone project challenges you to build a fully HIPAA-compliant healthcare application from the ground up. You’ll implement all the security controls, compliance measures, and best practices covered throughout this course.
Time Commitment: This project is designed for 40-60 hours of work over 4-6 weeks. Each phase builds on the previous one, and together they produce a production-ready healthcare platform.

Project Overview

What You’ll Build

SecureHealth - A patient portal and clinical management system featuring:

Patient Portal

Secure patient access to records, appointments, and messaging

Clinical Interface

Provider-facing system for patient management and documentation

Admin Dashboard

Compliance monitoring, audit logs, and system administration

Integration APIs

HIPAA-compliant APIs for third-party integrations

Technology Stack

# Recommended Technology Stack

stack = {
    "backend": {
        "framework": "FastAPI (Python 3.11+)",
        "orm": "SQLAlchemy 2.0",
        "database": "PostgreSQL 15+",
        "cache": "Redis 7+",
        "queue": "Celery with Redis"
    },
    "frontend": {
        "framework": "React 18+ or Next.js 14+",
        "state": "React Query / Zustand",
        "ui": "Tailwind CSS + shadcn/ui"
    },
    "security": {
        "auth": "Custom JWT + OAuth2",
        "encryption": "Python cryptography library",
        "key_management": "HashiCorp Vault or AWS KMS",
        "secrets": "HashiCorp Vault"
    },
    "infrastructure": {
        "cloud": "AWS (HIPAA-eligible services)",
        "containers": "Docker + ECS Fargate",
        "ci_cd": "GitHub Actions",
        "monitoring": "CloudWatch + custom audit logging"
    }
}

Phase 1: Foundation & Architecture (Week 1)

Objectives

  • Set up secure development environment
  • Design database schema with PHI classification
  • Implement core security infrastructure
  • Create project structure following security best practices

1.1 Project Setup

1. Repository Structure

Create the project with a security-focused structure:
securehealth/
├── backend/
│   ├── app/
│   │   ├── api/
│   │   │   ├── v1/
│   │   │   │   ├── endpoints/
│   │   │   │   └── router.py
│   │   ├── core/
│   │   │   ├── config.py
│   │   │   ├── security.py
│   │   │   └── encryption.py
│   │   ├── models/
│   │   ├── schemas/
│   │   ├── services/
│   │   └── middleware/
│   ├── tests/
│   └── alembic/
├── frontend/
├── infrastructure/
│   ├── terraform/
│   └── docker/
├── docs/
│   ├── security/
│   └── compliance/
└── scripts/
2. Security Configuration

Implement centralized security configuration:
# backend/app/core/config.py
from pydantic_settings import BaseSettings
from typing import List

class SecuritySettings(BaseSettings):
    """Security configuration - all secrets from environment/vault"""
    
    # Database
    DATABASE_URL: str
    DATABASE_ENCRYPTION_KEY_ID: str
    
    # Authentication
    JWT_SECRET_KEY: str  # With RS256 this must hold the PEM-encoded RSA private key
    JWT_ALGORITHM: str = "RS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 15  # Short-lived
    REFRESH_TOKEN_EXPIRE_DAYS: int = 7
    
    # Encryption
    ENCRYPTION_KEY_ID: str  # AWS KMS or Vault key ID
    FIELD_ENCRYPTION_KEY_ID: str
    
    # Session
    SESSION_COOKIE_SECURE: bool = True
    SESSION_COOKIE_HTTPONLY: bool = True
    SESSION_COOKIE_SAMESITE: str = "strict"
    
    # CORS (restrictive)
    CORS_ORIGINS: List[str] = []
    
    # Rate Limiting
    RATE_LIMIT_PER_MINUTE: int = 60
    
    # Audit
    AUDIT_LOG_RETENTION_YEARS: int = 6
    
    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"
        case_sensitive = True
3. Database Schema Design

Design a PHI-aware database schema:
# backend/app/models/patient.py
from sqlalchemy import Column, String, Date, Boolean, DateTime, LargeBinary, ForeignKey, func
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import relationship
from app.db.base import Base
import uuid

class Patient(Base):
    """Patient model with field-level encryption for PHI"""
    __tablename__ = "patients"
    
    # Identifiers
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    mrn = Column(String(20), unique=True, nullable=False, index=True)
    
    # Encrypted PHI fields (stored as binary)
    first_name_encrypted = Column(LargeBinary, nullable=False)
    last_name_encrypted = Column(LargeBinary, nullable=False)
    ssn_encrypted = Column(LargeBinary, nullable=False)
    date_of_birth_encrypted = Column(LargeBinary, nullable=False)
    address_encrypted = Column(LargeBinary, nullable=False)
    phone_encrypted = Column(LargeBinary, nullable=False)
    email_encrypted = Column(LargeBinary, nullable=False)
    
    # Blind indexes for searchable encrypted fields
    ssn_hash = Column(LargeBinary, nullable=False, index=True)
    name_hash = Column(LargeBinary, nullable=False, index=True)
    dob_hash = Column(LargeBinary, nullable=False, index=True)
    
    # Non-PHI metadata
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
    
    # Relationships
    encounters = relationship("Encounter", back_populates="patient")
    clinical_notes = relationship("ClinicalNote", back_populates="patient")
    
    # PHI classification
    __phi_fields__ = [
        "first_name", "last_name", "ssn", "date_of_birth",
        "address", "phone", "email"
    ]
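
The blind-index columns above support exact-match lookups only: the application recomputes the keyed hash of the search term and compares it with the stored value. The following is a minimal standard-library sketch of that idea, not the course's implementation. `INDEX_KEY`, `blind_index`, `rows`, and `find_by_ssn` are hypothetical names; in the project the key would come from the key provider and `rows` would be the `patients` table.

```python
import hashlib
import hmac

# Hypothetical key for illustration only; a real key comes from Vault or KMS.
INDEX_KEY = b"\x01" * 32


def blind_index(value: str, field: str) -> bytes:
    """Keyed, deterministic hash of a normalized value, scoped to one field."""
    normalized = value.strip().lower()
    # Prefixing the field name keeps equal values in different columns
    # from producing the same index.
    message = field.encode("utf-8") + b"\x00" + normalized.encode("utf-8")
    return hmac.new(INDEX_KEY, message, hashlib.sha256).digest()


# Stand-in for the patients table: only the hash is stored, never the SSN.
rows = [
    {"mrn": "MRN001", "ssn_hash": blind_index("123-45-6789", "ssn")},
    {"mrn": "MRN002", "ssn_hash": blind_index("987-65-4321", "ssn")},
]


def find_by_ssn(ssn: str) -> list:
    """Exact-match search: recompute the index and compare in constant time."""
    target = blind_index(ssn, "ssn")
    return [r["mrn"] for r in rows if hmac.compare_digest(r["ssn_hash"], target)]
```

Partial or fuzzy matching is not possible with this scheme, and low-entropy fields such as a date of birth remain guessable by anyone who holds the index key, so that key deserves the same protection as the encryption keys.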

1.2 Encryption Service

Implement the encryption service for PHI:
# backend/app/core/encryption.py
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
import os
import hashlib
import hmac
import base64
from typing import Optional, Tuple
from dataclasses import dataclass

@dataclass
class EncryptedData:
    """Container for encrypted data with metadata"""
    ciphertext: bytes
    nonce: bytes
    key_version: int
    algorithm: str = "AES-256-GCM"

class PHIEncryptionService:
    """
    Field-level encryption service for PHI.
    Uses AES-256-GCM with unique nonce per encryption.
    """
    
    def __init__(self, key_provider):
        """
        Initialize with a key provider (Vault, KMS, etc.)
        
        Args:
            key_provider: Service that provides encryption keys
        """
        self.key_provider = key_provider
        self.current_key_version: int = 1
        self._key_cache: dict = {}
    
    def encrypt(self, plaintext: str, context: Optional[dict] = None) -> Optional[EncryptedData]:
        """
        Encrypt a PHI field.
        
        Args:
            plaintext: The data to encrypt
            context: Additional authenticated data (AAD)
        
        Returns:
            EncryptedData with ciphertext, nonce, and key version
        """
        if not plaintext:
            return None
        
        # Get current encryption key
        key = self._get_key(self.current_key_version)
        
        # Generate unique nonce (96 bits for GCM)
        nonce = os.urandom(12)
        
        # Create cipher
        aesgcm = AESGCM(key)
        
        # Prepare AAD if provided
        aad = self._prepare_aad(context) if context else None
        
        # Encrypt
        ciphertext = aesgcm.encrypt(nonce, plaintext.encode('utf-8'), aad)
        
        return EncryptedData(
            ciphertext=ciphertext,
            nonce=nonce,
            key_version=self.current_key_version
        )
    
    def decrypt(self, encrypted_data: Optional[EncryptedData], context: Optional[dict] = None) -> Optional[str]:
        """
        Decrypt a PHI field.
        
        Args:
            encrypted_data: The encrypted data container
            context: Additional authenticated data (must match encryption)
        
        Returns:
            Decrypted plaintext string
        """
        if not encrypted_data:
            return None
        
        # Get the correct key version
        key = self._get_key(encrypted_data.key_version)
        
        # Create cipher
        aesgcm = AESGCM(key)
        
        # Prepare AAD if provided
        aad = self._prepare_aad(context) if context else None
        
        # Decrypt
        plaintext = aesgcm.decrypt(
            encrypted_data.nonce,
            encrypted_data.ciphertext,
            aad
        )
        
        return plaintext.decode('utf-8')
    
    def create_blind_index(self, value: str, salt: Optional[bytes] = None) -> Optional[bytes]:
        """
        Create a blind index for encrypted field searching.
        Uses HMAC-SHA256 for deterministic but non-reversible indexing.
        
        Args:
            value: The value to index
            salt: Optional salt (should be consistent per field)
        
        Returns:
            32-byte hash suitable for exact-match searching
        """
        if not value:
            return None
        
        # Normalize the value
        normalized = value.lower().strip()
        
        # Get indexing key (different from encryption key)
        index_key = self.key_provider.get_indexing_key()
        
        # Create HMAC
        h = hmac.new(index_key, normalized.encode('utf-8'), hashlib.sha256)
        
        if salt:
            h.update(salt)
        
        return h.digest()
    
    def rotate_key(self, old_version: int, new_version: int):
        """
        Re-encrypt data with new key version.
        This should be called during key rotation.
        """
        # Implementation depends on your key rotation strategy
        pass
    
    def _get_key(self, version: int) -> bytes:
        """Get encryption key for specified version"""
        if version not in self._key_cache:
            self._key_cache[version] = self.key_provider.get_key(version)
        return self._key_cache[version]
    
    def _prepare_aad(self, context: dict) -> bytes:
        """Prepare additional authenticated data from context"""
        import json
        return json.dumps(context, sort_keys=True).encode('utf-8')


# Key Provider Interface
from abc import ABC, abstractmethod

class KeyProvider(ABC):
    """Abstract base for key providers"""
    
    @abstractmethod
    def get_key(self, version: int) -> bytes:
        """Get encryption key by version"""
        pass
    
    @abstractmethod
    def get_indexing_key(self) -> bytes:
        """Get key for blind indexing"""
        pass


class VaultKeyProvider(KeyProvider):
    """HashiCorp Vault key provider"""
    
    def __init__(self, vault_client, key_path: str):
        self.client = vault_client
        self.key_path = key_path
    
    def get_key(self, version: int) -> bytes:
        secret = self.client.secrets.kv.v2.read_secret_version(
            path=f"{self.key_path}/v{version}"
        )
        return base64.b64decode(secret['data']['data']['key'])
    
    def get_indexing_key(self) -> bytes:
        secret = self.client.secrets.kv.v2.read_secret_version(
            path=f"{self.key_path}/indexing"
        )
        return base64.b64decode(secret['data']['data']['key'])


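class AWSKMSKeyProvider(KeyProvider):
    """AWS KMS key provider using envelope encryption.

    generate_data_key returns a new random key on every call, so the wrapped
    copy (CiphertextBlob) has to be persisted and unwrapped with kms.decrypt
    on later reads. Otherwise existing ciphertext becomes undecryptable after
    a restart and blind indexes stop matching.
    """
    
    def __init__(self, kms_client, key_id: str, wrapped_keys: dict):
        self.kms = kms_client
        self.key_id = key_id
        # Assumption: a dict-like, persistent store of name -> CiphertextBlob
        # (for example a small database table); it is not part of boto3.
        self.wrapped_keys = wrapped_keys
        self._data_keys: dict = {}
    
    def get_key(self, version: int) -> bytes:
        """Get the data key for a key version"""
        return self._load(f"v{version}", {'version': str(version)})
    
    def get_indexing_key(self) -> bytes:
        """Get the blind-indexing key (must be stable across calls)"""
        return self._load("indexing", {'purpose': 'blind_indexing'})
    
    def _load(self, name: str, context: dict) -> bytes:
        if name not in self._data_keys:
            blob = self.wrapped_keys.get(name)
            if blob is None:
                # First use: create the data key and keep its wrapped form
                response = self.kms.generate_data_key(
                    KeyId=self.key_id,
                    KeySpec='AES_256',
                    EncryptionContext=context
                )
                self.wrapped_keys[name] = response['CiphertextBlob']
            else:
                # Later uses: unwrap the stored data key
                response = self.kms.decrypt(
                    CiphertextBlob=blob,
                    KeyId=self.key_id,
                    EncryptionContext=context
                )
            self._data_keys[name] = response['Plaintext']
        return self._data_keys[name]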
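
The `Patient` model stores each encrypted field in a single `LargeBinary` column, while `encrypt()` returns an `EncryptedData` object, so something has to serialize one into the other. One possible layout is sketched below. The `pack_encrypted` and `unpack_encrypted` helpers and the byte layout (4-byte key version, 12-byte nonce, then ciphertext) are assumptions made for illustration, not part of the course code.

```python
import struct
from dataclasses import dataclass


@dataclass
class EncryptedData:
    """Mirror of the container defined in encryption.py."""
    ciphertext: bytes
    nonce: bytes
    key_version: int
    algorithm: str = "AES-256-GCM"


NONCE_SIZE = 12                # 96-bit GCM nonce, as generated in encrypt()
_HEADER = struct.Struct(">I")  # 4-byte big-endian key version


def pack_encrypted(data: EncryptedData) -> bytes:
    """Serialize to: key version | nonce | ciphertext (including GCM tag)."""
    if len(data.nonce) != NONCE_SIZE:
        raise ValueError("unexpected nonce length")
    return _HEADER.pack(data.key_version) + data.nonce + data.ciphertext


def unpack_encrypted(blob: bytes) -> EncryptedData:
    """Inverse of pack_encrypted; rejects blobs too short to hold a header."""
    if len(blob) < _HEADER.size + NONCE_SIZE:
        raise ValueError("blob too short")
    (key_version,) = _HEADER.unpack_from(blob, 0)
    nonce = blob[_HEADER.size:_HEADER.size + NONCE_SIZE]
    ciphertext = blob[_HEADER.size + NONCE_SIZE:]
    return EncryptedData(ciphertext=ciphertext, nonce=nonce, key_version=key_version)
```

Keeping the key version next to the ciphertext lets `decrypt()` pick the right key during and after a rotation without a separate column.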

Phase 1 Deliverables

  • Project repository with security-focused structure
  • Database schema with PHI classification
  • Encryption service with key management integration
  • Development environment with secrets management
  • Initial security documentation

Phase 2: Authentication & Access Control (Week 2)

Objectives

  • Implement secure authentication with MFA
  • Build role-based access control (RBAC)
  • Create session management with security controls
  • Implement break-glass procedures

2.1 Authentication System

# backend/app/services/auth_service.py
from datetime import datetime, timedelta
from typing import Optional, Tuple
from jose import jwt, JWTError
from passlib.context import CryptContext
import pyotp
import secrets
from app.core.config import settings
from app.models.user import User
from app.services.audit_service import AuditService

class AuthenticationService:
    """HIPAA-compliant authentication service"""
    
    def __init__(self, db_session, audit_service: AuditService):
        self.db = db_session
        self.audit = audit_service
        self.pwd_context = CryptContext(
            schemes=["argon2"],  # Use Argon2id for password hashing
            deprecated="auto"
        )
        # Account lockout settings
        self.max_failed_attempts = 5
        self.lockout_duration_minutes = 30
    
    async def authenticate(
        self,
        username: str,
        password: str,
        mfa_code: Optional[str],
        ip_address: str,
        user_agent: str
    ) -> Tuple[bool, Optional[dict], str]:
        """
        Authenticate user with password and MFA.
        
        Returns:
            Tuple of (success, tokens, message)
        """
        # Get user
        user = await self._get_user_by_username(username)
        
        if not user:
            # Don't reveal if username exists
            await self._log_failed_auth(username, ip_address, "user_not_found")
            return False, None, "Invalid credentials"
        
        # Check account lockout
        if await self._is_account_locked(user):
            await self._log_failed_auth(username, ip_address, "account_locked")
            return False, None, "Account temporarily locked"
        
        # Verify password
        if not self._verify_password(password, user.password_hash):
            await self._increment_failed_attempts(user)
            await self._log_failed_auth(username, ip_address, "invalid_password")
            return False, None, "Invalid credentials"
        
        # MFA required for all users
        if not mfa_code:
            return False, None, "MFA code required"
        
        if not self._verify_mfa(user, mfa_code):
            await self._increment_failed_attempts(user)
            await self._log_failed_auth(username, ip_address, "invalid_mfa")
            return False, None, "Invalid MFA code"
        
        # Authentication successful
        await self._reset_failed_attempts(user)
        
        # Generate tokens
        tokens = await self._generate_tokens(user)
        
        # Create session
        session = await self._create_session(user, ip_address, user_agent)
        
        # Audit log
        await self.audit.log_authentication(
            user_id=user.id,
            action="login_success",
            ip_address=ip_address,
            user_agent=user_agent,
            session_id=session.id
        )
        
        return True, tokens, "Authentication successful"
    
    def _verify_password(self, plain: str, hashed: str) -> bool:
        """Verify password using Argon2"""
        return self.pwd_context.verify(plain, hashed)
    
    def _hash_password(self, password: str) -> str:
        """Hash password using Argon2"""
        return self.pwd_context.hash(password)
    
    def _verify_mfa(self, user: User, code: str) -> bool:
        """Verify TOTP MFA code"""
        totp = pyotp.TOTP(user.mfa_secret)
        # Allow 1 window tolerance for clock skew
        return totp.verify(code, valid_window=1)
    
    async def _generate_tokens(self, user: User) -> dict:
        """Generate access and refresh tokens"""
        now = datetime.utcnow()
        
        # Access token (short-lived)
        access_payload = {
            "sub": str(user.id),
            "type": "access",
            "roles": [role.name for role in user.roles],
            "permissions": self._get_user_permissions(user),
            "iat": now,
            "exp": now + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
        }
        access_token = jwt.encode(
            access_payload,
            settings.JWT_SECRET_KEY,
            algorithm=settings.JWT_ALGORITHM
        )
        
        # Refresh token (longer-lived, stored in DB)
        refresh_token = secrets.token_urlsafe(32)
        await self._store_refresh_token(user, refresh_token)
        
        return {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "token_type": "bearer",
            "expires_in": settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
        }
    
    async def _is_account_locked(self, user: User) -> bool:
        """Check if account is locked due to failed attempts"""
        if user.failed_login_attempts >= self.max_failed_attempts:
            if user.last_failed_login:
                lockout_until = user.last_failed_login + timedelta(
                    minutes=self.lockout_duration_minutes
                )
                if datetime.utcnow() < lockout_until:
                    return True
        return False

2.2 Role-Based Access Control

# backend/app/services/rbac_service.py
from typing import List, Set, Optional
from dataclasses import dataclass
from enum import Enum

class Permission(Enum):
    """Fine-grained permissions for healthcare operations"""
    
    # Patient operations
    PATIENT_VIEW = "patient:view"
    PATIENT_CREATE = "patient:create"
    PATIENT_UPDATE = "patient:update"
    PATIENT_DELETE = "patient:delete"
    PATIENT_VIEW_SENSITIVE = "patient:view_sensitive"  # SSN, etc.
    
    # Clinical operations
    ENCOUNTER_VIEW = "encounter:view"
    ENCOUNTER_CREATE = "encounter:create"
    ENCOUNTER_UPDATE = "encounter:update"
    CLINICAL_NOTE_VIEW = "clinical_note:view"
    CLINICAL_NOTE_CREATE = "clinical_note:create"
    CLINICAL_NOTE_SIGN = "clinical_note:sign"
    
    # Medication operations
    MEDICATION_VIEW = "medication:view"
    MEDICATION_PRESCRIBE = "medication:prescribe"
    CONTROLLED_SUBSTANCE_PRESCRIBE = "controlled_substance:prescribe"
    
    # Administrative
    USER_MANAGE = "user:manage"
    ROLE_MANAGE = "role:manage"
    AUDIT_VIEW = "audit:view"
    SYSTEM_CONFIG = "system:config"
    
    # Break-glass
    BREAK_GLASS_REQUEST = "break_glass:request"
    BREAK_GLASS_APPROVE = "break_glass:approve"

@dataclass
class Role:
    """Role definition with permissions"""
    name: str
    description: str
    permissions: Set[Permission]
    is_clinical: bool = False
    requires_mfa: bool = True

# Pre-defined roles
ROLES = {
    "physician": Role(
        name="physician",
        description="Licensed physician with full clinical access",
        permissions={
            Permission.PATIENT_VIEW,
            Permission.PATIENT_CREATE,
            Permission.PATIENT_UPDATE,
            Permission.PATIENT_VIEW_SENSITIVE,
            Permission.ENCOUNTER_VIEW,
            Permission.ENCOUNTER_CREATE,
            Permission.ENCOUNTER_UPDATE,
            Permission.CLINICAL_NOTE_VIEW,
            Permission.CLINICAL_NOTE_CREATE,
            Permission.CLINICAL_NOTE_SIGN,
            Permission.MEDICATION_VIEW,
            Permission.MEDICATION_PRESCRIBE,
            Permission.CONTROLLED_SUBSTANCE_PRESCRIBE,
            Permission.BREAK_GLASS_REQUEST,
        },
        is_clinical=True
    ),
    "nurse": Role(
        name="nurse",
        description="Registered nurse with clinical access",
        permissions={
            Permission.PATIENT_VIEW,
            Permission.PATIENT_UPDATE,
            Permission.ENCOUNTER_VIEW,
            Permission.ENCOUNTER_CREATE,
            Permission.CLINICAL_NOTE_VIEW,
            Permission.CLINICAL_NOTE_CREATE,
            Permission.MEDICATION_VIEW,
            Permission.BREAK_GLASS_REQUEST,
        },
        is_clinical=True
    ),
    "medical_assistant": Role(
        name="medical_assistant",
        description="Medical assistant with limited clinical access",
        permissions={
            Permission.PATIENT_VIEW,
            Permission.ENCOUNTER_VIEW,
            Permission.ENCOUNTER_CREATE,
            Permission.CLINICAL_NOTE_VIEW,
        },
        is_clinical=True
    ),
    "billing": Role(
        name="billing",
        description="Billing staff with limited PHI access",
        permissions={
            Permission.PATIENT_VIEW,
            Permission.ENCOUNTER_VIEW,
        },
        is_clinical=False
    ),
    "admin": Role(
        name="admin",
        description="System administrator",
        permissions={
            Permission.USER_MANAGE,
            Permission.ROLE_MANAGE,
            Permission.AUDIT_VIEW,
            Permission.SYSTEM_CONFIG,
            Permission.BREAK_GLASS_APPROVE,
        },
        is_clinical=False
    )
}

class RBACService:
    """Role-based access control service"""
    
    def __init__(self, db_session):
        self.db = db_session
    
    def check_permission(
        self,
        user_id: str,
        permission: Permission,
        resource_id: Optional[str] = None
    ) -> bool:
        """
        Check if user has permission, optionally for specific resource.
        
        Implements minimum necessary principle by checking:
        1. User has the permission
        2. User has legitimate access to the resource
        """
        user = self._get_user(user_id)
        
        # Check role permissions
        user_permissions = self._get_user_permissions(user)
        if permission not in user_permissions:
            return False
        
        # If resource-specific, check resource access
        if resource_id:
            return self._check_resource_access(user, permission, resource_id)
        
        return True
    
    def _check_resource_access(
        self,
        user,
        permission: Permission,
        resource_id: str
    ) -> bool:
        """
        Check minimum necessary access to specific resource.
        
        For patients: Check treatment relationship
        For other resources: Check ownership or assignment
        """
        # Implement based on resource type
        if permission.value.startswith("patient:"):
            return self._check_patient_access(user, resource_id)
        
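        # Ownership/assignment checks for other resource types are not shown
        # here; this placeholder allows access once the role check has passed.
        return True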
    
    def _check_patient_access(self, user, patient_id: str) -> bool:
        """
        Verify user has legitimate access to patient.
        
        Access granted if:
        - User is assigned to patient
        - User is on care team
        - User has active break-glass
        """
        # Check direct assignment
        if self._is_assigned_to_patient(user.id, patient_id):
            return True
        
        # Check care team
        if self._is_on_care_team(user.id, patient_id):
            return True
        
        # Check break-glass
        if self._has_active_break_glass(user.id, patient_id):
            return True
        
        return False
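
`_get_user_permissions` is referenced above but not shown. A minimal sketch, assuming a user may hold several roles and that the effective permissions are the union of those roles' permissions. The trimmed `Permission` enum, the `ROLE_PERMISSIONS` mapping, and the `User` dataclass are simplified stand-ins for illustration.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Set


class Permission(Enum):
    # Trimmed copy of the Permission enum above, for illustration.
    PATIENT_VIEW = "patient:view"
    PATIENT_UPDATE = "patient:update"
    AUDIT_VIEW = "audit:view"


ROLE_PERMISSIONS: Dict[str, Set[Permission]] = {
    "nurse": {Permission.PATIENT_VIEW, Permission.PATIENT_UPDATE},
    "billing": {Permission.PATIENT_VIEW},
    "admin": {Permission.AUDIT_VIEW},
}


@dataclass
class User:
    id: str
    role_names: List[str] = field(default_factory=list)


def get_user_permissions(user: User) -> Set[Permission]:
    """Union of the permissions of every role; unknown roles grant nothing."""
    permissions: Set[Permission] = set()
    for name in user.role_names:
        permissions |= ROLE_PERMISSIONS.get(name, set())
    return permissions
```

Treating unknown role names as empty keeps the check deny-by-default: a typo in a role assignment removes access instead of granting it.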

Phase 2 Deliverables

  • Authentication service with MFA
  • RBAC implementation with minimum necessary
  • Session management with security controls
  • Break-glass procedure implementation
  • Password policy enforcement
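
The break-glass deliverable can be approached as a time-limited, justified grant that the RBAC check consults. The sketch below is one way to model it and is not the course's reference design; `BreakGlassGrant`, `request_break_glass`, `has_active_break_glass`, and the 60-minute default are all assumptions.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, Optional


@dataclass(frozen=True)
class BreakGlassGrant:
    user_id: str
    patient_id: str
    justification: str
    granted_at: datetime
    expires_at: datetime


def request_break_glass(user_id: str, patient_id: str, justification: str,
                        now: Optional[datetime] = None,
                        duration_minutes: int = 60) -> BreakGlassGrant:
    """Create an emergency-access grant; a written justification is mandatory."""
    if not justification.strip():
        raise ValueError("a justification is required for emergency access")
    now = now or datetime.now(timezone.utc)
    return BreakGlassGrant(
        user_id=user_id,
        patient_id=patient_id,
        justification=justification.strip(),
        granted_at=now,
        expires_at=now + timedelta(minutes=duration_minutes),
    )


def has_active_break_glass(grants: Iterable[BreakGlassGrant], user_id: str,
                           patient_id: str,
                           now: Optional[datetime] = None) -> bool:
    """True if the user holds an unexpired grant for this specific patient."""
    now = now or datetime.now(timezone.utc)
    return any(
        g.user_id == user_id
        and g.patient_id == patient_id
        and g.granted_at <= now < g.expires_at
        for g in grants
    )
```

Each request, approval, and expiry would also be written to the audit log using the `BREAK_GLASS_*` event types defined in Phase 3.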

Phase 3: Audit Logging & Monitoring (Week 3)

Objectives

  • Implement comprehensive audit logging
  • Create PHI access tracking
  • Build anomaly detection system
  • Design audit reporting for compliance

3.1 Audit Logging Service

# backend/app/services/audit_service.py
from datetime import datetime
from typing import Optional, Dict, Any, List
from enum import Enum
from dataclasses import dataclass, asdict
import json
import hashlib

class AuditEventType(Enum):
    """Categorized audit event types"""
    
    # Authentication
    LOGIN_SUCCESS = "auth.login.success"
    LOGIN_FAILURE = "auth.login.failure"
    LOGOUT = "auth.logout"
    MFA_SETUP = "auth.mfa.setup"
    PASSWORD_CHANGE = "auth.password.change"
    SESSION_TIMEOUT = "auth.session.timeout"
    
    # PHI Access
    PHI_VIEW = "phi.view"
    PHI_CREATE = "phi.create"
    PHI_UPDATE = "phi.update"
    PHI_DELETE = "phi.delete"
    PHI_EXPORT = "phi.export"
    PHI_PRINT = "phi.print"
    
    # Break-Glass
    BREAK_GLASS_REQUEST = "access.break_glass.request"
    BREAK_GLASS_APPROVE = "access.break_glass.approve"
    BREAK_GLASS_DENY = "access.break_glass.deny"
    BREAK_GLASS_EXPIRE = "access.break_glass.expire"
    
    # System
    CONFIG_CHANGE = "system.config.change"
    USER_CREATE = "system.user.create"
    USER_UPDATE = "system.user.update"
    ROLE_CHANGE = "system.role.change"
    
    # Security
    SECURITY_ALERT = "security.alert"
    ANOMALY_DETECTED = "security.anomaly"
    ACCESS_DENIED = "security.access.denied"

@dataclass
class AuditEvent:
    """Audit event record (treat as append-only once persisted)"""
    
    event_id: str
    event_type: AuditEventType
    timestamp: datetime
    user_id: Optional[str]
    username: Optional[str]
    ip_address: str
    user_agent: str
    session_id: Optional[str]
    
    # Resource information
    resource_type: Optional[str] = None
    resource_id: Optional[str] = None
    
    # Action details
    action: str = ""
    details: Dict[str, Any] = None
    
    # For PHI access
    patient_id: Optional[str] = None
    phi_fields_accessed: List[str] = None
    
    # Integrity
    checksum: str = None
    
    def __post_init__(self):
        # Generate checksum for integrity verification
        if not self.checksum:
            self.checksum = self._generate_checksum()
    
    def _generate_checksum(self) -> str:
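        """Generate SHA-256 checksum over every recorded field"""
        data = {
            "event_id": self.event_id,
            "event_type": self.event_type.value,
            "timestamp": self.timestamp.isoformat(),
            "user_id": self.user_id,
            "username": self.username,
            "ip_address": self.ip_address,
            "user_agent": self.user_agent,
            "session_id": self.session_id,
            "resource_type": self.resource_type,
            "resource_id": self.resource_id,
            "action": self.action,
            "details": self.details,
            "patient_id": self.patient_id,
            "phi_fields_accessed": self.phi_fields_accessed,
        }
        return hashlib.sha256(
            json.dumps(data, sort_keys=True, default=str).encode()
        ).hexdigest()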

class AuditService:
    """Comprehensive HIPAA audit logging service"""
    
    def __init__(self, db_session, event_queue=None):
        self.db = db_session
        self.queue = event_queue  # For async processing
    
    async def log_phi_access(
        self,
        user_id: str,
        patient_id: str,
        access_type: str,
        fields_accessed: List[str],
        request_context: dict
    ):
        """Log PHI access with full context"""
        
        event_type = {
            "view": AuditEventType.PHI_VIEW,
            "create": AuditEventType.PHI_CREATE,
            "update": AuditEventType.PHI_UPDATE,
            "delete": AuditEventType.PHI_DELETE,
            "export": AuditEventType.PHI_EXPORT,
            "print": AuditEventType.PHI_PRINT
        }.get(access_type, AuditEventType.PHI_VIEW)
        
        event = AuditEvent(
            event_id=self._generate_event_id(),
            event_type=event_type,
            timestamp=datetime.utcnow(),
            user_id=user_id,
            username=request_context.get("username"),
            ip_address=request_context.get("ip_address"),
            user_agent=request_context.get("user_agent"),
            session_id=request_context.get("session_id"),
            resource_type="patient",
            resource_id=patient_id,
            patient_id=patient_id,
            phi_fields_accessed=fields_accessed,
            action=f"Accessed patient {access_type}",
            details={
                "fields": fields_accessed,
                "purpose": request_context.get("purpose"),
                "break_glass": request_context.get("break_glass", False)
            }
        )
        
        await self._persist_event(event)
        
        # Check for anomalies
        await self._check_anomalies(event)
    
    async def log_authentication(
        self,
        user_id: Optional[str],
        action: str,
        ip_address: str,
        user_agent: str,
        session_id: Optional[str] = None,
        failure_reason: Optional[str] = None
    ):
        """Log authentication events"""
        
        event_type = AuditEventType.LOGIN_SUCCESS if action == "login_success" \
            else AuditEventType.LOGIN_FAILURE
        
        event = AuditEvent(
            event_id=self._generate_event_id(),
            event_type=event_type,
            timestamp=datetime.utcnow(),
            user_id=user_id,
            username=None,  # Required by AuditEvent; not known at this call site
            ip_address=ip_address,
            user_agent=user_agent,
            session_id=session_id,
            action=action,
            details={"failure_reason": failure_reason} if failure_reason else None
        )
        
        await self._persist_event(event)
    
    async def generate_accounting_of_disclosures(
        self,
        patient_id: str,
        start_date: datetime,
        end_date: datetime
    ) -> List[dict]:
        """Generate HIPAA-required Accounting of Disclosures report"""
        
        events = await self._query_events(
            patient_id=patient_id,
            event_types=[AuditEventType.PHI_VIEW, AuditEventType.PHI_EXPORT],
            start_date=start_date,
            end_date=end_date
        )
        
        # Format for patient disclosure
        disclosures = []
        for event in events:
            disclosures.append({
                "date": event.timestamp.isoformat(),
                "disclosed_to": event.username,
                "purpose": (event.details or {}).get("purpose", "Treatment"),
                "description": event.action
            })
        
        return disclosures
    
    async def _check_anomalies(self, event: AuditEvent):
        """Check for anomalous patterns that might indicate breach"""
        
        # Check high-volume access
        if event.event_type == AuditEventType.PHI_VIEW:
            recent_count = await self._count_recent_access(
                user_id=event.user_id,
                minutes=60
            )
            if recent_count > 100:  # Threshold
                await self._trigger_anomaly_alert(
                    event,
                    "High volume PHI access detected"
                )
        
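        # Check after-hours access. Timestamps are recorded in UTC, so convert
        # to the facility's local time zone before comparing in production.
        hour = event.timestamp.hour
        if hour < 6 or hour > 22:  # Before 06:00 or from 23:00 onward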
            await self._trigger_anomaly_alert(
                event,
                "After-hours PHI access"
            )
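
`_count_recent_access` is not shown above. In production it would most likely be a database or Redis query, but the sliding-window idea can be sketched in memory. `RecentAccessCounter` is a hypothetical helper, and it assumes events for a given user arrive in non-decreasing timestamp order.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta, timezone


class RecentAccessCounter:
    """Counts events per user inside a sliding time window."""

    def __init__(self, window_minutes: int = 60):
        self.window = timedelta(minutes=window_minutes)
        self._events = defaultdict(deque)

    def record(self, user_id: str, timestamp: datetime) -> int:
        """Record one access and return how many fall inside the window."""
        events = self._events[user_id]
        events.append(timestamp)
        cutoff = timestamp - self.window
        # Drop everything that is a full window old or older.
        while events and events[0] <= cutoff:
            events.popleft()
        return len(events)
```

The returned count can be compared directly with the threshold used in `_check_anomalies`.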

Phase 3 Deliverables

  • Comprehensive audit logging for all PHI access
  • Authentication event logging
  • Anomaly detection rules
  • Accounting of Disclosures report
  • Audit log integrity verification
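
For the integrity-verification deliverable, note that an unkeyed per-event SHA-256 checksum detects accidental corruption, but anyone able to edit a row can recompute it. One option is a keyed hash chain, where each entry's MAC covers the previous entry's MAC, so edits and deletions both break verification. The sketch below uses only the standard library; `AUDIT_HMAC_KEY`, `append_event`, and `verify_chain` are hypothetical names, and the key would live in Vault or KMS instead of in code.

```python
import hashlib
import hmac
import json
from typing import Dict, List

# Hypothetical key for illustration only.
AUDIT_HMAC_KEY = b"\x02" * 32
GENESIS = "0" * 64  # Placeholder "previous MAC" for the first entry


def chain_entry(event: Dict, previous_mac: str) -> Dict:
    """Build a log entry whose MAC covers the event and the previous MAC."""
    payload = json.dumps(event, sort_keys=True).encode("utf-8")
    mac = hmac.new(
        AUDIT_HMAC_KEY, previous_mac.encode("ascii") + payload, hashlib.sha256
    ).hexdigest()
    return {"event": event, "prev": previous_mac, "mac": mac}


def append_event(log: List[Dict], event: Dict) -> None:
    """Append an event, linking it to the last entry in the log."""
    previous = log[-1]["mac"] if log else GENESIS
    log.append(chain_entry(event, previous))


def verify_chain(log: List[Dict]) -> bool:
    """Recompute every MAC in order; any edit or removal breaks the chain."""
    previous = GENESIS
    for entry in log:
        expected = chain_entry(entry["event"], previous)["mac"]
        if entry["prev"] != previous or not hmac.compare_digest(entry["mac"], expected):
            return False
        previous = entry["mac"]
    return True
```

Truncating the end of the log is not detected by the chain alone; periodically anchoring the latest MAC in a separate system is one way to cover that case.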

Phase 4: API Security & Integration (Week 4)

Objectives

  • Build secure REST API with HIPAA controls
  • Implement API rate limiting and throttling
  • Create secure third-party integration patterns
  • Build data validation and sanitization
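
As a starting point for the rate-limiting objective, the sketch below shows a per-client token bucket sized to the `RATE_LIMIT_PER_MINUTE = 60` default from the Phase 1 settings. It is an in-memory illustration only: `TokenBucketLimiter` is a hypothetical class, and a multi-instance deployment would need shared state (for example in Redis).

```python
import time
from typing import Callable, Dict, Tuple


class TokenBucketLimiter:
    """Allows short bursts up to the per-minute limit, then refills steadily."""

    def __init__(self, rate_per_minute: int = 60,
                 clock: Callable[[], float] = time.monotonic):
        self.capacity = float(rate_per_minute)
        self.refill_per_second = rate_per_minute / 60.0
        self.clock = clock  # Injectable so tests can control time
        # client_id -> (tokens remaining, time of last update)
        self._buckets: Dict[str, Tuple[float, float]] = {}

    def allow(self, client_id: str) -> bool:
        """Consume one token for the client if available."""
        now = self.clock()
        tokens, last = self._buckets.get(client_id, (self.capacity, now))
        # Refill in proportion to elapsed time, capped at the bucket size.
        tokens = min(self.capacity, tokens + (now - last) * self.refill_per_second)
        if tokens >= 1.0:
            self._buckets[client_id] = (tokens - 1.0, now)
            return True
        self._buckets[client_id] = (tokens, now)
        return False
```

In a FastAPI application, `allow()` would typically be called from middleware or a dependency, keyed by user id or API key, and a `False` result would map to an HTTP 429 response.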

4.1 Secure API Endpoints

# backend/app/api/v1/endpoints/patients.py
from fastapi import APIRouter, Depends, HTTPException, Request
from typing import List
from app.api.deps import get_current_user, require_permissions
from app.models.user import User
from app.services.patient_service import PatientService
from app.services.audit_service import AuditService
from app.core.rbac import Permission
from app.schemas.patient import PatientCreate, PatientFullResponse, PatientResponse

router = APIRouter()

@router.get("/{patient_id}", response_model=PatientResponse)
async def get_patient(
    patient_id: str,
    request: Request,
    current_user: User = Depends(get_current_user),
    _: bool = Depends(require_permissions([Permission.PATIENT_VIEW])),
    patient_service: PatientService = Depends(),
    audit_service: AuditService = Depends()
):
    """
    Get patient by ID.
    
    Requires: PATIENT_VIEW permission + treatment relationship
    """
    # Check treatment relationship (minimum necessary)
    if not await patient_service.verify_access(current_user.id, patient_id):
        # Log access denial
        await audit_service.log_access_denied(
            user_id=current_user.id,
            resource_type="patient",
            resource_id=patient_id,
            reason="No treatment relationship",
            request_context=_get_request_context(request)
        )
        raise HTTPException(
            status_code=403,
            detail="Access denied - no treatment relationship"
        )
    
    # Get patient (decrypted)
    patient = await patient_service.get_patient(patient_id)
    
    if not patient:
        raise HTTPException(status_code=404, detail="Patient not found")
    
    # Log PHI access
    await audit_service.log_phi_access(
        user_id=current_user.id,
        patient_id=patient_id,
        access_type="view",
        fields_accessed=["demographics", "contact"],
        request_context=_get_request_context(request)
    )
    
    return patient

@router.get("/{patient_id}/full", response_model=PatientFullResponse)
async def get_patient_full(
    patient_id: str,
    request: Request,
    current_user: User = Depends(get_current_user),
    _: bool = Depends(require_permissions([
        Permission.PATIENT_VIEW,
        Permission.PATIENT_VIEW_SENSITIVE
    ])),
    patient_service: PatientService = Depends(),
    audit_service: AuditService = Depends()
):
    """
    Get patient with sensitive fields (SSN, etc.).
    
    Requires: PATIENT_VIEW + PATIENT_VIEW_SENSITIVE permissions
    """
    # Verify access
    if not await patient_service.verify_access(current_user.id, patient_id):
        await audit_service.log_access_denied(
            user_id=current_user.id,
            resource_type="patient",
            resource_id=patient_id,
            reason="No treatment relationship",
            request_context=_get_request_context(request)
        )
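        raise HTTPException(
            status_code=403,
            detail="Access denied - no treatment relationship"
        )
    
    patient = await patient_service.get_patient_full(patient_id)
    
    if not patient:
        raise HTTPException(status_code=404, detail="Patient not found")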
    
    # Log with sensitive field access
    await audit_service.log_phi_access(
        user_id=current_user.id,
        patient_id=patient_id,
        access_type="view",
        fields_accessed=["demographics", "contact", "ssn", "insurance"],
        request_context=_get_request_context(request)
    )
    
    return patient

def _get_request_context(request: Request) -> dict:
    """Extract audit context from request"""
    return {
        # request.client can be None (for example under some test clients)
        "ip_address": request.client.host if request.client else None,
        "user_agent": request.headers.get("user-agent"),
        # session_id and request_id are expected to be set by middleware
        "session_id": getattr(request.state, "session_id", None),
        "request_id": getattr(request.state, "request_id", None),
        "endpoint": request.url.path,
        "method": request.method
    }

Phase 4 Deliverables

  • Secure API endpoints with authentication
  • Permission-based access control on all endpoints
  • Request/response validation
  • Rate limiting implementation
  • API documentation with security notes
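
The endpoints above do not show rate limiting, so here is a minimal token-bucket sketch of how per-client throttling could work. `TokenBucket` and `RateLimiter` are hypothetical names and the limits are placeholders. State is held in process memory, so a multi-instance deployment would need a shared store such as the Redis instance in the recommended stack.

```python
import time
from typing import Optional


class TokenBucket:
    """Bucket that refills at a fixed rate up to a maximum capacity."""

    def __init__(self, capacity: int, refill_per_second: float,
                 now: Optional[float] = None):
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self.tokens = float(capacity)
        self.updated_at = time.monotonic() if now is None else now

    def allow(self, now: Optional[float] = None) -> bool:
        """Consume one token if available; return False when throttled."""
        now = time.monotonic() if now is None else now
        elapsed = max(0.0, now - self.updated_at)
        self.tokens = min(self.capacity,
                          self.tokens + elapsed * self.refill_per_second)
        self.updated_at = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


class RateLimiter:
    """Keeps one bucket per client (for example per user ID or API key)."""

    def __init__(self, capacity: int, refill_per_second: float):
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self._buckets: dict[str, TokenBucket] = {}

    def allow(self, client_id: str, now: Optional[float] = None) -> bool:
        bucket = self._buckets.get(client_id)
        if bucket is None:
            bucket = TokenBucket(self.capacity, self.refill_per_second, now)
            self._buckets[client_id] = bucket
        return bucket.allow(now)


limiter = RateLimiter(capacity=2, refill_per_second=1.0)
results = [limiter.allow("user-1", now=0.0) for _ in range(3)]
print(results)  # the third request in the same instant is throttled
```

In an API, a `False` result would typically map to an HTTP 429 response, and the denial is worth recording in the audit log alongside other access events.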

Phase 5: Infrastructure & Deployment (Week 5)

Objectives

  • Deploy to HIPAA-eligible cloud infrastructure
  • Implement infrastructure as code
  • Configure security monitoring
  • Set up backup and disaster recovery

5.1 Terraform Infrastructure

# infrastructure/terraform/main.tf

# HIPAA-compliant VPC
module "vpc" {
  source = "./modules/vpc"
  
  name = "securehealth-${var.environment}"
  cidr = "10.0.0.0/16"
  
  # Private subnets for PHI systems
  private_subnets = ["10.0.1.0/24", "10.0.2.0/24"]
  
  # Public subnets for load balancer only
  public_subnets = ["10.0.101.0/24", "10.0.102.0/24"]
  
  enable_flow_logs = true
  flow_logs_retention = 365  # 1 year minimum
  
  tags = {
    Environment = var.environment
    Compliance  = "HIPAA"
    DataClass   = "PHI"
  }
}

# RDS PostgreSQL with encryption
resource "aws_db_instance" "main" {
  identifier = "securehealth-${var.environment}"
  
  engine         = "postgres"
  engine_version = "15"
  instance_class = "db.r6g.large"
  
  # Storage encryption
  storage_encrypted = true
  kms_key_id       = aws_kms_key.database.arn
  
  # Network isolation
  db_subnet_group_name   = aws_db_subnet_group.main.name
  vpc_security_group_ids = [aws_security_group.database.id]
  publicly_accessible    = false
  
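  # Backup configuration: 35 days is the RDS maximum for automated backups.
  # Copy or export snapshots elsewhere if your retention policy needs longer.
  # (HIPAA's 6-year rule, §164.316(b)(2)(i), covers required documentation.)
  backup_retention_period = 35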
  backup_window          = "03:00-04:00"
  
  # Enhanced monitoring
  monitoring_interval = 60
  monitoring_role_arn = aws_iam_role.rds_monitoring.arn
  
  # Audit logging
  enabled_cloudwatch_logs_exports = [
    "postgresql",
    "upgrade"
  ]
  
  # Protection
  deletion_protection = true
  
  tags = {
    Environment = var.environment
    Compliance  = "HIPAA"
    DataClass   = "PHI"
  }
}

# KMS key for database encryption
resource "aws_kms_key" "database" {
  description             = "SecureHealth database encryption key"
  deletion_window_in_days = 30
  enable_key_rotation     = true
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "Enable IAM User Permissions"
        Effect = "Allow"
        Principal = {
          AWS = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:root"
        }
        Action   = "kms:*"
        Resource = "*"
      },
      {
        Sid    = "Allow RDS to use the key"
        Effect = "Allow"
        Principal = {
          Service = "rds.amazonaws.com"
        }
        Action = [
          "kms:Encrypt",
          "kms:Decrypt",
          "kms:GenerateDataKey*"
        ]
        Resource = "*"
      }
    ]
  })
  
  tags = {
    Environment = var.environment
    Compliance  = "HIPAA"
    Purpose     = "Database encryption"
  }
}

# CloudTrail for audit logging
resource "aws_cloudtrail" "hipaa_trail" {
  name                          = "securehealth-hipaa-trail"
  s3_bucket_name               = aws_s3_bucket.cloudtrail.id
  include_global_service_events = true
  is_multi_region_trail        = true
  enable_log_file_validation   = true
  kms_key_id                   = aws_kms_key.cloudtrail.arn
  
  event_selector {
    read_write_type           = "All"
    include_management_events = true
    
    data_resource {
      type   = "AWS::S3::Object"
      values = ["arn:aws:s3:::"]
    }
  }
  
  tags = {
    Environment = var.environment
    Compliance  = "HIPAA"
  }
}

Phase 5 Deliverables

  • HIPAA-compliant AWS infrastructure
  • Terraform modules for all components
  • Security groups with least privilege
  • CloudTrail and CloudWatch configuration
  • Backup and DR procedures

Phase 6: Testing & Compliance Validation (Week 6)

Objectives

  • Security testing (penetration testing prep)
  • Compliance checklist validation
  • Documentation completion
  • Final review and sign-off

6.1 Security Testing Checklist

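# tests/security/test_security_controls.py
import pytest
from httpx import AsyncClient

from app.services.audit_service import AuditService

# Assumes pytest-asyncio; not needed if asyncio_mode = "auto" is configured
pytestmark = pytest.mark.asyncio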

class TestAuthenticationSecurity:
    """Security tests for authentication"""
    
    async def test_password_requirements(self, client: AsyncClient):
        """Test password complexity is enforced"""
        weak_passwords = [
            "short",  # Too short
            "alllowercase123",  # No uppercase
            "ALLUPPERCASE123",  # No lowercase
            "NoNumbers!!!",  # No numbers
            "NoSpecial123Char",  # No special characters
        ]
        
        for password in weak_passwords:
            response = await client.post("/api/v1/auth/register", json={
                "username": "testuser",
                "password": password,
                "email": "testuser@example.com"
            })
            assert response.status_code == 400
            assert "password" in response.json()["detail"].lower()
    
    async def test_account_lockout(self, client: AsyncClient):
        """Test account lockout after failed attempts"""
        # Attempt login 5 times with wrong password
        for i in range(5):
            await client.post("/api/v1/auth/login", json={
                "username": "testuser",
                "password": "wrongpassword"
            })
        
        # 6th attempt should indicate lockout
        response = await client.post("/api/v1/auth/login", json={
            "username": "testuser",
            "password": "wrongpassword"
        })
        assert "locked" in response.json()["detail"].lower()
    
    async def test_mfa_required(self, client: AsyncClient, valid_credentials: dict):
        """Test MFA is required for login"""
        response = await client.post("/api/v1/auth/login", json={
            "username": valid_credentials["username"],
            "password": valid_credentials["password"]
            # No MFA code
        })
        assert response.status_code == 400
        assert "mfa" in response.json()["detail"].lower()

class TestAccessControl:
    """Security tests for access control"""
    
    async def test_patient_access_denied_without_relationship(
        self,
        client: AsyncClient,
        nurse_token: str,
        unassigned_patient_id: str
    ):
        """Test access denied to patients without treatment relationship"""
        response = await client.get(
            f"/api/v1/patients/{unassigned_patient_id}",
            headers={"Authorization": f"Bearer {nurse_token}"}
        )
        assert response.status_code == 403
    
    async def test_break_glass_audit_logged(
        self,
        client: AsyncClient,
        nurse_token: str,
        audit_service: AuditService
    ):
        """Test break-glass access is properly logged"""
        # Request break-glass
        response = await client.post(
            "/api/v1/access/break-glass",
            json={
                "patient_id": "some-patient-id",
                "reason": "Emergency"
            },
            headers={"Authorization": f"Bearer {nurse_token}"}
        )
        assert response.status_code == 200
        
        # Verify audit log
        events = await audit_service.get_recent_events(
            event_type="BREAK_GLASS_REQUEST"
        )
        assert len(events) > 0

class TestPHIEncryption:
    """Security tests for PHI encryption"""
    
    async def test_phi_encrypted_at_rest(self, db_session):
        """Verify PHI is encrypted in database"""
        # SQLAlchemy 2.0 requires raw SQL strings to be wrapped in text()
        from sqlalchemy import text

        # Query raw database
        result = await db_session.execute(
            text("SELECT first_name_encrypted FROM patients LIMIT 1")
        )
        row = result.fetchone()
        
        # Should be binary (encrypted), not plaintext
        assert isinstance(row[0], bytes)
        assert b"John" not in row[0]  # Name shouldn't be visible
    
    async def test_sensitive_fields_not_in_logs(self, caplog):
        """Verify sensitive data not logged"""
        # Perform operation that might log
        # ...
        
        # Check logs don't contain sensitive data
        sensitive_patterns = ["ssn", "123-45-6789", "password"]
        for pattern in sensitive_patterns:
            assert pattern not in caplog.text.lower()

class TestAuditLogging:
    """Security tests for audit logging"""
    
    async def test_all_phi_access_logged(
        self,
        client: AsyncClient,
        physician_token: str,
        patient_id: str,
        audit_service: AuditService
    ):
        """Test all PHI access is logged"""
        # Access patient
        await client.get(
            f"/api/v1/patients/{patient_id}",
            headers={"Authorization": f"Bearer {physician_token}"}
        )
        
        # Verify audit log exists
        events = await audit_service.get_patient_access_logs(patient_id)
        assert len(events) > 0
        
        latest = events[0]
        assert latest.patient_id == patient_id
        assert latest.event_type == "PHI_VIEW"
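
The log-leak test above only checks for sensitive values after the fact. A complementary control is to scrub them before they are written. The sketch below shows one possible approach using a standard `logging.Filter`. The `RedactSSNFilter` name and the single SSN pattern are assumptions for illustration, and a real deployment would need patterns for every identifier it handles.

```python
import logging
import re

# Matches values shaped like a US Social Security number, e.g. 123-45-6789
SSN_PATTERN = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")


class RedactSSNFilter(logging.Filter):
    """Hypothetical filter that masks SSN-shaped values in log messages."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Format the message first so values passed as args are also scrubbed
        message = record.getMessage()
        record.msg = SSN_PATTERN.sub("[REDACTED]", message)
        record.args = ()
        return True  # keep the record, now with the value masked


handler = logging.StreamHandler()
handler.addFilter(RedactSSNFilter())

logger = logging.getLogger("securehealth.example")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False

logger.info("Verified identity for SSN %s", "123-45-6789")
```

Attaching the filter to the handler rather than to a single logger means it applies to every record that handler emits.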

6.2 Compliance Checklist

Access Control (§164.312(a))
  • Unique user identification
  • Emergency access procedure (break-glass)
  • Automatic logoff
  • Encryption and decryption
Audit Controls (§164.312(b))
  • Audit log mechanism implemented
  • Logs retained for 6 years
  • Log integrity verification
  • Regular log review process
Integrity (§164.312(c))
  • Authentication of ePHI
  • Change detection mechanism
Transmission Security (§164.312(e))
  • Integrity controls
  • Encryption in transit (TLS 1.2+)
Administrative Safeguards
  • Risk analysis documented
  • Risk management plan
  • Sanction policy
  • Information system activity review
Technical Policies
  • Access authorization policy
  • Workstation use policy
  • Device and media controls
  • Audit controls policy
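
To make the "TLS 1.2+" item concrete, here is a small sketch of enforcing a minimum protocol version for outbound connections with Python's standard `ssl` module. The function name `build_client_tls_context` is hypothetical. Server-side TLS for this stack would usually be terminated at the load balancer, which has its own policy settings.

```python
import ssl


def build_client_tls_context() -> ssl.SSLContext:
    """Create a client context that refuses anything older than TLS 1.2."""
    # The default context already verifies certificates and hostnames
    context = ssl.create_default_context()
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


context = build_client_tls_context()
print(context.minimum_version)
```

The resulting context can be passed to HTTP clients that accept an `ssl.SSLContext`, so integrations with third-party systems fail closed when a peer only offers older protocol versions.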

Phase 6 Deliverables

  • Complete security test suite
  • Penetration testing report (if applicable)
  • Compliance checklist completed
  • Risk assessment documentation
  • Policies and procedures documentation
  • User training materials

Grading Rubric

Component | Weight | Criteria
Architecture & Design | 15% | Clean architecture, security-first design, proper PHI handling
Encryption Implementation | 20% | Proper encryption at rest and in transit, key management
Access Control | 20% | RBAC, minimum necessary, break-glass procedures
Audit Logging | 15% | Comprehensive logging, PHI access tracking, reporting
API Security | 10% | Secure endpoints, validation, rate limiting
Infrastructure | 10% | HIPAA-eligible services, proper configuration
Testing & Documentation | 10% | Security tests, compliance documentation

Submission Requirements

  1. Code Repository
    • Complete source code
    • README with setup instructions
    • Environment configuration (without secrets)
  2. Documentation
    • Architecture diagram
    • Security controls documentation
    • Risk assessment summary
    • Policies and procedures
  3. Demo
    • Working deployment (local or cloud)
    • Walkthrough of security features
    • Audit log demonstration
  4. Testing
    • Security test results
    • Compliance checklist (completed)
    • Any penetration testing results

Congratulations!

Upon completing this capstone, you will have:

Built Real Security

A production-ready HIPAA-compliant healthcare application

Practical Experience

Hands-on implementation of encryption, access control, and auditing

Compliance Knowledge

Deep understanding of HIPAA technical requirements

Portfolio Project

A substantial project demonstrating healthcare security expertise
Good luck with your capstone project!