HIPAA Compliance Implementation Guide

This guide provides a complete reference implementation combining all concepts from previous modules into a production-ready healthcare application.
What You’ll Build:
  • HIPAA-compliant FastAPI backend
  • Encrypted database layer
  • Audit logging system
  • E2E encrypted chat with AI
  • Complete access control

Project Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                    HIPAA-COMPLIANT APPLICATION ARCHITECTURE                  │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   CLIENT LAYER                                                              │
│   ────────────                                                              │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │  React/Mobile App                                                    │   │
│   │  • E2E encryption keys                                               │   │
│   │  • Local key storage                                                 │   │
│   │  • TLS 1.3 connections                                               │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                      │                                       │
│                                      ▼                                       │
│   API LAYER                                                                 │
│   ─────────                                                                 │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │  FastAPI Application                                                 │   │
│   │  ├── Authentication (JWT + MFA)                                      │   │
│   │  ├── Authorization (RBAC)                                            │   │
│   │  ├── Audit Middleware                                                │   │
│   │  ├── Encryption Service                                              │   │
│   │  └── AI Router                                                       │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                      │                                       │
│                                      ▼                                       │
│   DATA LAYER                                                                │
│   ──────────                                                                │
│   ┌───────────────┐  ┌───────────────┐  ┌───────────────┐                  │
│   │  PostgreSQL   │  │  Redis        │  │  S3           │                  │
│   │  (Encrypted)  │  │  (Sessions)   │  │  (Documents)  │                  │
│   └───────────────┘  └───────────────┘  └───────────────┘                  │
│                                                                              │
│   INFRASTRUCTURE                                                            │
│   ──────────────                                                            │
│   ┌───────────────┐  ┌───────────────┐  ┌───────────────┐                  │
│   │  HashiCorp    │  │  On-Premise   │  │  OpenTelemetry│                  │
│   │  Vault        │  │  LLM          │  │  (Logging)    │                  │
│   └───────────────┘  └───────────────┘  └───────────────┘                  │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Project Structure

hipaa-healthcare-api/
├── app/
│   ├── __init__.py
│   ├── main.py                 # FastAPI application entry
│   ├── config.py               # Configuration management
│   ├── dependencies.py         # Dependency injection
│   │
│   ├── api/
│   │   ├── __init__.py
│   │   ├── v1/
│   │   │   ├── __init__.py
│   │   │   ├── auth.py         # Authentication endpoints
│   │   │   ├── patients.py     # Patient CRUD
│   │   │   ├── records.py      # Medical records
│   │   │   ├── chat.py         # E2E encrypted chat
│   │   │   └── ai.py           # AI endpoints
│   │   └── router.py
│   │
│   ├── core/
│   │   ├── __init__.py
│   │   ├── security.py         # Auth, JWT, MFA
│   │   ├── encryption.py       # Encryption service
│   │   ├── audit.py            # Audit logging
│   │   └── rbac.py             # Access control
│   │
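│   ├── models/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── patient.py
│   │   ├── record.py
│   │   ├── message.py          # imported by services/chat_service.py
│   │   └── audit.py
│   │
│   ├── schemas/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── patient.py
│   │   ├── record.py
│   │   ├── chat.py             # imported by api/v1/chat.py
│   │   └── ai.py               # imported by api/v1/ai.py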
│   │
│   ├── services/
│   │   ├── __init__.py
│   │   ├── patient_service.py
│   │   ├── record_service.py
│   │   ├── chat_service.py
│   │   └── ai_service.py
│   │
│   └── middleware/
│       ├── __init__.py
│       ├── audit.py
│       ├── security.py
│       └── rate_limit.py
│
├── tests/
├── alembic/                    # Database migrations
├── docker-compose.yml
├── Dockerfile
└── requirements.txt

Core Configuration

Environment Configuration

# app/config.py

from pydantic_settings import BaseSettings
from pydantic import SecretStr
from functools import lru_cache
from typing import Optional

class Settings(BaseSettings):
    """HIPAA-compliant application settings"""
    
    # Application
    app_name: str = "HIPAA Healthcare API"
    environment: str = "production"
    debug: bool = False
    
    # Security
    secret_key: SecretStr
    jwt_algorithm: str = "RS256"
    jwt_private_key_path: str = "/secrets/jwt-private.pem"
    jwt_public_key_path: str = "/secrets/jwt-public.pem"
    access_token_expire_minutes: int = 15
    refresh_token_expire_days: int = 7
    
    # Database
    database_url: SecretStr
    database_encryption_key_id: str
    
    # Encryption
    vault_addr: str
    vault_token: SecretStr
    kms_key_id: str
    
    # Redis (Sessions)
    redis_url: SecretStr
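    session_timeout_seconds: int = 900  # 15 minutes; HIPAA requires automatic logoff but does not fix a duration
    
    # Audit
    audit_log_retention_days: int = 2555  # ~7 years (HIPAA's documentation retention minimum is 6 years)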
    
    # AI
    llm_model_path: str = "/models/medical-llm"
    ai_processing_mode: str = "on_premise"  # on_premise, hybrid
    
    # MFA
    mfa_issuer: str = "HIPAA Healthcare"
    mfa_required: bool = True
    
    # pydantic-settings v2 configuration (the inner `class Config` is deprecated)
    model_config = {"env_file": ".env", "case_sensitive": False}

@lru_cache()
def get_settings() -> Settings:
    return Settings()
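
The `session_timeout_seconds` setting implies an idle timeout enforced against the session store. The sketch below shows one way sliding expiration could work, using an in-memory dictionary as a stand-in for Redis. `SessionStore` and its methods are hypothetical names for illustration, not part of this project; a real deployment would more likely rely on Redis key TTLs.

```python
import time
from typing import Callable, Dict


class SessionStore:
    """In-memory stand-in for the Redis session store (hypothetical helper)."""

    def __init__(
        self,
        timeout_seconds: int = 900,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.timeout = timeout_seconds
        self.clock = clock  # injectable so the timeout can be tested
        self._last_seen: Dict[str, float] = {}

    def start(self, session_id: str) -> None:
        """Record the session as active right now."""
        self._last_seen[session_id] = self.clock()

    def touch(self, session_id: str) -> bool:
        """Return True and slide the expiry if the session is still active."""
        last = self._last_seen.get(session_id)
        if last is None or self.clock() - last > self.timeout:
            # Unknown or idle for too long: drop it and force re-authentication
            self._last_seen.pop(session_id, None)
            return False
        self._last_seen[session_id] = self.clock()
        return True
```

Each authenticated request would call `touch()`; a `False` result means the client has to log in again.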

Main Application

# app/main.py

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from contextlib import asynccontextmanager
import logging

from app.config import get_settings
from app.api.router import api_router
from app.middleware.audit import AuditMiddleware
from app.middleware.security import SecurityHeadersMiddleware
from app.core.encryption import EncryptionService
from app.core.audit import AuditLogger

settings = get_settings()
logger = logging.getLogger(__name__)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifecycle management"""
    # Startup
    logger.info("Starting HIPAA-compliant healthcare API")
    
    # Initialize encryption service
    app.state.encryption = EncryptionService(
        vault_addr=settings.vault_addr,
        vault_token=settings.vault_token.get_secret_value(),
    )
    await app.state.encryption.initialize()
    
    # Initialize audit logger
    app.state.audit = AuditLogger(
        retention_days=settings.audit_log_retention_days
    )
    await app.state.audit.start()
    
    logger.info("Application started successfully")
    
    yield
    
    # Shutdown
    logger.info("Shutting down application")
    await app.state.audit.stop()
    await app.state.encryption.close()

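app = FastAPI(
    title=settings.app_name,
    lifespan=lifespan,
    # Hide the interactive docs and the OpenAPI schema itself outside debug mode
    docs_url="/docs" if settings.debug else None,
    redoc_url="/redoc" if settings.debug else None,
    openapi_url="/openapi.json" if settings.debug else None,
)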

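# Security middleware
# Note: Starlette wraps middleware in reverse order of registration, so the
# last one added below (CORS) is the outermost layer.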
if settings.environment == "production":
    app.add_middleware(HTTPSRedirectMiddleware)

app.add_middleware(SecurityHeadersMiddleware)
app.add_middleware(AuditMiddleware)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.healthcare.example.com"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
)

# API routes
app.include_router(api_router, prefix="/api/v1")

@app.get("/health")
async def health_check():
    """Health check endpoint (no PHI)"""
    return {"status": "healthy"}
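
`SecurityHeadersMiddleware` is imported above but not listed in this section. As a rough idea of what such a middleware might do, here is a pure-ASGI sketch that appends a few common security headers to every HTTP response. The header set and values are assumptions chosen for illustration; the project's actual middleware may differ.

```python
# Assumed header set; tune values to your deployment
SECURITY_HEADERS = [
    (b"strict-transport-security", b"max-age=63072000; includeSubDomains"),
    (b"x-content-type-options", b"nosniff"),
    (b"x-frame-options", b"DENY"),
    (b"cache-control", b"no-store"),  # keep PHI responses out of caches
]


class SecurityHeadersMiddleware:
    """Pure-ASGI middleware that appends security headers to HTTP responses."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            # WebSocket and lifespan traffic passes through untouched
            await self.app(scope, receive, send)
            return

        async def send_with_headers(message):
            if message["type"] == "http.response.start":
                headers = list(message.get("headers", []))
                present = {name.lower() for name, _ in headers}
                # Do not override headers the endpoint set explicitly
                headers += [h for h in SECURITY_HEADERS if h[0] not in present]
                message = {**message, "headers": headers}
            await send(message)

        await self.app(scope, receive, send_with_headers)
```

Because the class only takes the wrapped `app`, it can be registered with `app.add_middleware(SecurityHeadersMiddleware)` as in the listing above.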

Authentication & Authorization

Security Module

# app/core/security.py

from datetime import datetime, timedelta
from typing import Optional, List
from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel
import pyotp
import secrets

from app.config import get_settings
from app.models.user import User

settings = get_settings()
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")

class TokenPayload(BaseModel):
    sub: str
    exp: datetime
    iat: datetime
    jti: str
    role: str
    permissions: List[str]
    mfa_verified: bool = False

class AuthService:
    """Authentication service with MFA support"""
    
    def __init__(self):
        with open(settings.jwt_private_key_path, "r") as f:
            self.private_key = f.read()
        with open(settings.jwt_public_key_path, "r") as f:
            self.public_key = f.read()
    
    def hash_password(self, password: str) -> str:
        """Hash password using Argon2id"""
        return pwd_context.hash(password)
    
    def verify_password(self, plain: str, hashed: str) -> bool:
        """Verify password against hash"""
        return pwd_context.verify(plain, hashed)
    
    def generate_mfa_secret(self) -> str:
        """Generate TOTP secret for MFA"""
        return pyotp.random_base32()
    
    def verify_mfa_token(self, secret: str, token: str) -> bool:
        """Verify TOTP token"""
        totp = pyotp.TOTP(secret)
        return totp.verify(token, valid_window=1)
    
    def get_mfa_provisioning_uri(self, secret: str, email: str) -> str:
        """Get provisioning URI for authenticator apps"""
        totp = pyotp.TOTP(secret)
        return totp.provisioning_uri(
            name=email,
            issuer_name=settings.mfa_issuer
        )
    
    def create_access_token(
        self,
        user: User,
        mfa_verified: bool = False
    ) -> str:
        """Create JWT access token"""
        
        now = datetime.utcnow()
        payload = TokenPayload(
            sub=str(user.id),
            exp=now + timedelta(minutes=settings.access_token_expire_minutes),
            iat=now,
            jti=secrets.token_hex(16),
            role=user.role,
            permissions=user.permissions,
            mfa_verified=mfa_verified,
        )
        
        return jwt.encode(
            payload.model_dump(),
            self.private_key,
            algorithm=settings.jwt_algorithm
        )
    
    def decode_token(self, token: str) -> TokenPayload:
        """Decode and validate JWT token"""
        
        try:
            payload = jwt.decode(
                token,
                self.public_key,
                algorithms=[settings.jwt_algorithm]
            )
            return TokenPayload(**payload)
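        except JWTError as e:
            # Chain the original error so the root cause stays in tracebacks
            raise AuthenticationError(f"Invalid token: {e}") from e
    
    def create_refresh_token(self, user_id: str) -> str:
        """Create an opaque refresh token.

        The token is random and carries no claims; the caller is expected to
        store it (ideally hashed) server-side against user_id.
        """
        return secrets.token_urlsafe(64)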


class AuthenticationError(Exception):
    pass
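
To make the `valid_window=1` argument in `verify_mfa_token` more concrete, the sketch below reimplements TOTP (RFC 6238, HMAC-SHA1) with the standard library only. It is meant as an illustration of what the check does, not a replacement for `pyotp`; the function names `totp` and `verify_totp` are made up for this example.

```python
import base64
import hashlib
import hmac
import struct
import time
from typing import Optional


def totp(secret_b32: str, at: float, step: int = 30, digits: int = 6) -> str:
    """Compute an RFC 6238 TOTP code (HMAC-SHA1) for Unix time `at`."""
    key = base64.b32decode(secret_b32, casefold=True)
    counter = struct.pack(">Q", int(at) // step)
    digest = hmac.new(key, counter, hashlib.sha1).digest()
    offset = digest[-1] & 0x0F  # dynamic truncation (RFC 4226)
    value = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(value % 10 ** digits).zfill(digits)


def verify_totp(
    secret_b32: str,
    token: str,
    at: Optional[float] = None,
    window: int = 1,
    step: int = 30,
) -> bool:
    """Accept the code for the current step or up to `window` steps either side."""
    now = time.time() if at is None else at
    return any(
        # Constant-time comparison avoids leaking how many digits matched
        hmac.compare_digest(totp(secret_b32, now + i * step, step), token)
        for i in range(-window, window + 1)
    )
```

With `window=1`, a code stays acceptable for roughly one extra 30-second step in each direction, which tolerates modest clock drift between the server and the authenticator app.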

RBAC Implementation

# app/core/rbac.py

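from enum import Enum
from typing import Set, Optional
from fastapi import HTTPException, Depends

from app.core.security import get_current_user
# get_rbac_service (used by require_permission below) is not shown in this
# guide; it must be defined in this module or imported here in a way that
# does not create a circular import with app/dependencies.py.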

class Role(str, Enum):
    PATIENT = "patient"
    PROVIDER = "provider"
    NURSE = "nurse"
    ADMIN = "admin"
    AUDITOR = "auditor"

class Permission(str, Enum):
    # Patient permissions
    READ_OWN_RECORDS = "read:own_records"
    UPDATE_OWN_PROFILE = "update:own_profile"
    
    # Provider permissions
    READ_PATIENT_RECORDS = "read:patient_records"
    WRITE_PATIENT_RECORDS = "write:patient_records"
    PRESCRIBE_MEDICATIONS = "prescribe:medications"
    
    # Admin permissions
    MANAGE_USERS = "manage:users"
    VIEW_AUDIT_LOGS = "view:audit_logs"
    MANAGE_ROLES = "manage:roles"
    
    # Break glass
    BREAK_GLASS_ACCESS = "break_glass:access"

# Role to permissions mapping
ROLE_PERMISSIONS: dict[Role, Set[Permission]] = {
    Role.PATIENT: {
        Permission.READ_OWN_RECORDS,
        Permission.UPDATE_OWN_PROFILE,
    },
    Role.NURSE: {
        Permission.READ_PATIENT_RECORDS,
    },
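    Role.PROVIDER: {
        Permission.READ_PATIENT_RECORDS,
        Permission.WRITE_PATIENT_RECORDS,
        Permission.PRESCRIBE_MEDICATIONS,
        # Assumption: providers may break glass. Without a grant to at least
        # one role, break_glass_access() below can never succeed.
        Permission.BREAK_GLASS_ACCESS,
    },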
    Role.ADMIN: {
        Permission.MANAGE_USERS,
        Permission.VIEW_AUDIT_LOGS,
        Permission.MANAGE_ROLES,
    },
    Role.AUDITOR: {
        Permission.VIEW_AUDIT_LOGS,
    },
}

class RBACService:
    """Role-Based Access Control service"""
    
    def __init__(self, audit_logger):
        self.audit = audit_logger
    
    def get_permissions(self, role: Role) -> Set[Permission]:
        """Get permissions for a role"""
        return ROLE_PERMISSIONS.get(role, set())
    
    async def check_permission(
        self,
        user_id: str,
        required_permission: Permission,
        resource_id: Optional[str] = None,
    ) -> bool:
        """Check if user has required permission"""
        
        user = await self._get_user(user_id)
        permissions = self.get_permissions(Role(user.role))
        
        has_permission = required_permission in permissions
        
        # Log access check
        await self.audit.log({
            "event_type": "ACCESS_CHECK",
            "user_id": user_id,
            "permission": required_permission.value,
            "resource_id": resource_id,
            "granted": has_permission,
        })
        
        return has_permission
    
    async def check_patient_access(
        self,
        user_id: str,
        patient_id: str,
    ) -> bool:
        """Check if user can access patient records"""
        
        user = await self._get_user(user_id)
        
        # Patients can only access their own records
        if user.role == Role.PATIENT:
            return str(user.patient_id) == patient_id
        
        # Providers check care team assignment
        if user.role in [Role.PROVIDER, Role.NURSE]:
            return await self._is_on_care_team(user_id, patient_id)
        
        return False
    
    async def break_glass_access(
        self,
        user_id: str,
        patient_id: str,
        reason: str,
    ) -> bool:
        """
        Emergency access override (break glass)
        
        Use only in emergency situations.
        Triggers immediate alerts and audit.
        """
        
        user = await self._get_user(user_id)
        
        if Permission.BREAK_GLASS_ACCESS not in self.get_permissions(Role(user.role)):
            return False
        
        # Log break glass access
        await self.audit.log({
            "event_type": "BREAK_GLASS_ACCESS",
            "user_id": user_id,
            "patient_id": patient_id,
            "reason": reason,
            "sensitivity": "CRITICAL",
        })
        
        # Trigger alerts
        await self._send_break_glass_alert(user_id, patient_id, reason)
        
        return True


# Dependency for permission checking
def require_permission(permission: Permission):
    """FastAPI dependency for permission checking"""
    
    async def permission_checker(
        current_user = Depends(get_current_user),
        rbac: RBACService = Depends(get_rbac_service),
    ):
        if not await rbac.check_permission(
            user_id=str(current_user.id),
            required_permission=permission,
        ):
            raise HTTPException(
                status_code=403,
                detail="Insufficient permissions"
            )
        return current_user
    
    return permission_checker
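
`check_patient_access` relies on `_get_user` and `_is_on_care_team`, which are not listed here. The following self-contained sketch mirrors the same decision logic against an in-memory care-team table, which can be handy for unit-testing the policy in isolation. `UserRecord`, `can_access_patient`, and the `(user_id, patient_id)` set are hypothetical stand-ins, not the project's actual models.

```python
from dataclasses import dataclass
from typing import Optional, Set, Tuple


@dataclass(frozen=True)
class UserRecord:
    """Hypothetical stand-in for the User model (id, role, linked patient)."""
    id: str
    role: str
    patient_id: Optional[str] = None


def can_access_patient(
    user: UserRecord,
    patient_id: str,
    care_team: Set[Tuple[str, str]],
) -> bool:
    """Mirror check_patient_access with an in-memory care-team table."""
    if user.role == "patient":
        # Patients may only read their own chart
        return user.patient_id == patient_id
    if user.role in ("provider", "nurse"):
        # Clinicians need an explicit (user_id, patient_id) assignment
        return (user.id, patient_id) in care_team
    # Admins and auditors get no clinical access by default
    return False
```

Note the default-deny at the end: any role that is not explicitly handled, including `admin`, is refused access to clinical data.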

Patient Records API

Patient Endpoints

# app/api/v1/patients.py

from fastapi import APIRouter, Depends, HTTPException, Query
from typing import List, Optional
from uuid import UUID

from app.core.security import get_current_user
from app.core.rbac import (
    require_permission,
    Permission,
    RBACService,
    # Resolved from app.core.rbac, where require_permission already uses it
    get_rbac_service,
)
from app.core.encryption import EncryptionService
from app.core.audit import AuditLogger
from app.schemas.patient import (
    PatientCreate,
    PatientUpdate,
    PatientResponse,
    MedicalRecordCreate,
    MedicalRecordResponse,
)
from app.services.patient_service import PatientService

router = APIRouter(prefix="/patients", tags=["Patients"])

@router.post("/", response_model=PatientResponse)
async def create_patient(
    patient_data: PatientCreate,
    current_user = Depends(require_permission(Permission.WRITE_PATIENT_RECORDS)),
    patient_service: PatientService = Depends(),
    audit: AuditLogger = Depends(),
):
    """Create a new patient record"""
    
    patient = await patient_service.create_patient(
        data=patient_data,
        created_by=current_user.id,
    )
    
    await audit.log({
        "event_type": "PHI_CREATE",
        "resource_type": "patient",
        "resource_id": str(patient.id),
        "actor_id": str(current_user.id),
    })
    
    return patient


@router.get("/{patient_id}", response_model=PatientResponse)
async def get_patient(
    patient_id: UUID,
    current_user = Depends(get_current_user),
    patient_service: PatientService = Depends(),
    # A bare Depends() with no annotation gives FastAPI nothing to resolve
    rbac: RBACService = Depends(get_rbac_service),
    audit: AuditLogger = Depends(),
):
    """Get patient by ID"""
    
    # Check access
    if not await rbac.check_patient_access(
        user_id=str(current_user.id),
        patient_id=str(patient_id),
    ):
        raise HTTPException(403, "Access denied to patient records")
    
    patient = await patient_service.get_patient(patient_id)
    if not patient:
        raise HTTPException(404, "Patient not found")
    
    await audit.log({
        "event_type": "PHI_VIEW",
        "resource_type": "patient",
        "resource_id": str(patient_id),
        "patient_id": str(patient_id),
        "actor_id": str(current_user.id),
        "fields_accessed": ["demographics", "contact_info"],
    })
    
    return patient


@router.get("/{patient_id}/records", response_model=List[MedicalRecordResponse])
async def get_patient_records(
    patient_id: UUID,
    record_type: Optional[str] = Query(None),
    start_date: Optional[str] = Query(None),
    end_date: Optional[str] = Query(None),
    current_user = Depends(get_current_user),
    patient_service: PatientService = Depends(),
    rbac: RBACService = Depends(get_rbac_service),
    audit: AuditLogger = Depends(),
):
    """Get patient's medical records"""
    
    if not await rbac.check_patient_access(
        user_id=str(current_user.id),
        patient_id=str(patient_id),
    ):
        raise HTTPException(403, "Access denied")
    
    records = await patient_service.get_medical_records(
        patient_id=patient_id,
        record_type=record_type,
        start_date=start_date,
        end_date=end_date,
    )
    
    await audit.log({
        "event_type": "PHI_VIEW",
        "resource_type": "medical_records",
        "patient_id": str(patient_id),
        "actor_id": str(current_user.id),
        "records_accessed": len(records),
        "filters": {
            "record_type": record_type,
            "date_range": f"{start_date} - {end_date}",
        },
    })
    
    return records


@router.post("/{patient_id}/records", response_model=MedicalRecordResponse)
async def create_medical_record(
    patient_id: UUID,
    record_data: MedicalRecordCreate,
    current_user = Depends(require_permission(Permission.WRITE_PATIENT_RECORDS)),
    patient_service: PatientService = Depends(),
    encryption: EncryptionService = Depends(),
    audit: AuditLogger = Depends(),
):
    """Create a new medical record"""
    
    # Encrypt sensitive fields
    encrypted_data = await encryption.encrypt_record(
        record_data.model_dump(),
        context={"patient_id": str(patient_id)},
    )
    
    record = await patient_service.create_medical_record(
        patient_id=patient_id,
        data=encrypted_data,
        created_by=current_user.id,
    )
    
    await audit.log({
        "event_type": "PHI_CREATE",
        "resource_type": "medical_record",
        "resource_id": str(record.id),
        "patient_id": str(patient_id),
        "actor_id": str(current_user.id),
        "record_type": record_data.record_type,
    })
    
    return record
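
The endpoints above all call `audit.log(...)`, but `AuditLogger` itself is not listed in this section. One common way to make such a log tamper-evident is to chain entries by hash, so that editing or deleting an earlier entry invalidates everything after it. The sketch below illustrates that idea with the standard library; `HashChainedAuditLog` is a hypothetical name, it is synchronous and in-memory for brevity, and it should not be read as the project's actual logger.

```python
import hashlib
import json
from datetime import datetime, timezone
from typing import Any, Dict, List

GENESIS_HASH = "0" * 64  # placeholder "previous hash" for the first entry


class HashChainedAuditLog:
    """Append-only log where each entry commits to the one before it."""

    def __init__(self) -> None:
        self.entries: List[Dict[str, Any]] = []

    @staticmethod
    def _digest(body: Dict[str, Any]) -> str:
        # Canonical JSON so the same body always hashes to the same value
        canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
        return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

    def log(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """Append an event, linking it to the previous entry's hash."""
        prev_hash = self.entries[-1]["hash"] if self.entries else GENESIS_HASH
        body = {
            "event": event,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "prev_hash": prev_hash,
        }
        entry = {**body, "hash": self._digest(body)}
        self.entries.append(entry)
        return entry

    def verify(self) -> bool:
        """Recompute every hash and check that the chain links are intact."""
        prev_hash = GENESIS_HASH
        for entry in self.entries:
            body = {k: v for k, v in entry.items() if k != "hash"}
            if entry["prev_hash"] != prev_hash:
                return False
            if self._digest(body) != entry["hash"]:
                return False
            prev_hash = entry["hash"]
        return True
```

A periodic job could call `verify()` and raise an alert on failure. Durable, access-controlled storage for the entries is still required and is outside the scope of this sketch.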

Patient Service

# app/services/patient_service.py

from typing import List, Optional
from uuid import UUID
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from app.models.patient import Patient, MedicalRecord
from app.schemas.patient import PatientCreate, MedicalRecordCreate
from app.core.encryption import EncryptionService

class PatientService:
    """Service for patient operations"""
    
    def __init__(
        self,
        db: AsyncSession,
        encryption: EncryptionService,
    ):
        self.db = db
        self.encryption = encryption
    
    async def create_patient(
        self,
        data: PatientCreate,
        created_by: UUID,
    ) -> Patient:
        """Create a new patient with encrypted PHI"""
        
        # Encrypt PHI fields
        encrypted_ssn = await self.encryption.encrypt_field(
            data.ssn,
            field_name="ssn",
        )
        encrypted_name = await self.encryption.encrypt_field(
            data.full_name,
            field_name="name",
        )
        
        patient = Patient(
            mrn=self._generate_mrn(),
            ssn_encrypted=encrypted_ssn,
            name_encrypted=encrypted_name,
            date_of_birth=data.date_of_birth,
            gender=data.gender,
            contact_info_encrypted=await self.encryption.encrypt_field(
                data.contact_info.model_dump_json(),
                field_name="contact_info",
            ),
            created_by=created_by,
            created_at=datetime.utcnow(),
        )
        
        self.db.add(patient)
        await self.db.commit()
        await self.db.refresh(patient)
        
        return patient
    
    async def get_patient(self, patient_id: UUID) -> Optional[Patient]:
        """Get patient with decrypted fields"""
        
        result = await self.db.execute(
            select(Patient).where(Patient.id == patient_id)
        )
        patient = result.scalar_one_or_none()
        
        if patient:
            # Decrypt fields for response
            patient.ssn = await self.encryption.decrypt_field(
                patient.ssn_encrypted,
                field_name="ssn",
            )
            patient.full_name = await self.encryption.decrypt_field(
                patient.name_encrypted,
                field_name="name",
            )
        
        return patient
    
    async def get_medical_records(
        self,
        patient_id: UUID,
        record_type: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ) -> List[MedicalRecord]:
        """Get patient's medical records with filtering"""
        
        query = select(MedicalRecord).where(
            MedicalRecord.patient_id == patient_id
        )
        
        if record_type:
            query = query.where(MedicalRecord.record_type == record_type)
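        # Parse ISO 8601 strings so the comparison is made on timestamps;
        # drivers such as asyncpg reject str parameters for timestamp columns
        if start_date:
            query = query.where(
                MedicalRecord.created_at >= datetime.fromisoformat(start_date)
            )
        if end_date:
            query = query.where(
                MedicalRecord.created_at <= datetime.fromisoformat(end_date)
            )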
        
        result = await self.db.execute(query)
        records = result.scalars().all()
        
        # Decrypt each record
        for record in records:
            record.content = await self.encryption.decrypt_field(
                record.content_encrypted,
                field_name="content",
            )
        
        return records
    
    def _generate_mrn(self) -> str:
        """Generate unique medical record number"""
        import secrets
        return f"MRN-{secrets.token_hex(8).upper()}"

E2E Encrypted Chat

Chat Endpoints

# app/api/v1/chat.py

from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect
from typing import List
from uuid import UUID

from app.core.security import get_current_user, ws_authenticate
from app.core.audit import AuditLogger
from app.services.chat_service import ChatService
from app.schemas.chat import (
    MessageSend,
    MessageResponse,
    ConversationResponse,
)

router = APIRouter(prefix="/chat", tags=["Chat"])

@router.get("/conversations", response_model=List[ConversationResponse])
async def get_conversations(
    current_user = Depends(get_current_user),
    chat_service: ChatService = Depends(),
):
    """Get user's conversations"""
    return await chat_service.get_conversations(current_user.id)


@router.post("/messages", response_model=MessageResponse)
async def send_message(
    message: MessageSend,
    current_user = Depends(get_current_user),
    chat_service: ChatService = Depends(),
    audit: AuditLogger = Depends(),
):
    """
    Send E2E encrypted message
    
    Note: Server only sees encrypted ciphertext.
    Decryption happens on client devices.
    """
    
    result = await chat_service.send_message(
        sender_id=current_user.id,
        recipient_id=message.recipient_id,
        encrypted_content=message.encrypted_content,
        session_id=message.session_id,
    )
    
    await audit.log({
        "event_type": "MESSAGE_SENT",
        "actor_id": str(current_user.id),
        "recipient_id": str(message.recipient_id),
        "message_size": len(message.encrypted_content),
        # Note: We do NOT log message content
    })
    
    return result


@router.websocket("/ws")
async def chat_websocket(
    websocket: WebSocket,
    chat_service: ChatService = Depends(),
    audit: AuditLogger = Depends(),
):
    """WebSocket for real-time E2E encrypted chat"""
    
    # Authenticate WebSocket connection
    user = await ws_authenticate(websocket)
    if not user:
        await websocket.close(code=4001)
        return
    
    await websocket.accept()
    
    await audit.log({
        "event_type": "CHAT_SESSION_START",
        "actor_id": str(user.id),
    })
    
    try:
        # Register connection
        await chat_service.connect(user.id, websocket)
        
        while True:
            # Receive encrypted message
            data = await websocket.receive_json()
            
            # Forward encrypted message (no decryption on server)
            await chat_service.handle_message(
                sender_id=user.id,
                data=data,
            )
            
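    except WebSocketDisconnect:
        pass  # normal client disconnect
    finally:
        # Always unregister and audit, even if handle_message raised
        await chat_service.disconnect(user.id)
        await audit.log({
            "event_type": "CHAT_SESSION_END",
            "actor_id": str(user.id),
        })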

Chat Service

# app/services/chat_service.py

from typing import Dict, Optional
from uuid import UUID
from fastapi import WebSocket
from datetime import datetime
import json

from sqlalchemy import select  # used by get_messages

from app.core.encryption import EncryptionService
from app.models.message import Message, Conversation

class ChatService:
    """
    E2E Encrypted Chat Service
    
    Key principle: Server NEVER sees plaintext messages.
    All encryption/decryption happens on client devices.
    """
    
    def __init__(self, db, encryption: EncryptionService):
        self.db = db
        self.encryption = encryption
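        # Per-instance registry: connections are only visible across requests
        # if ChatService is provided as one shared instance
        self.connections: Dict[UUID, WebSocket] = {}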
    
    async def connect(self, user_id: UUID, websocket: WebSocket):
        """Register WebSocket connection"""
        self.connections[user_id] = websocket
    
    async def disconnect(self, user_id: UUID):
        """Remove WebSocket connection"""
        self.connections.pop(user_id, None)
    
    async def send_message(
        self,
        sender_id: UUID,
        recipient_id: UUID,
        encrypted_content: dict,
        session_id: str,
    ) -> Message:
        """
        Store and forward encrypted message
        
        The encrypted_content contains:
        - ciphertext: E2E encrypted message
        - header: Signal protocol header (public DH key, etc.)
        - nonce: Encryption nonce
        """
        
        # Store encrypted message
        message = Message(
            conversation_id=await self._get_or_create_conversation(
                sender_id, recipient_id
            ),
            sender_id=sender_id,
            recipient_id=recipient_id,
            encrypted_content=json.dumps(encrypted_content),
            session_id=session_id,
            created_at=datetime.utcnow(),
        )
        
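        self.db.add(message)
        await self.db.commit()
        # Reload server-generated fields (e.g. id) before they are read below
        await self.db.refresh(message)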
        
        # Forward to recipient if online
        if recipient_id in self.connections:
            await self.connections[recipient_id].send_json({
                "type": "message",
                "message_id": str(message.id),
                "sender_id": str(sender_id),
                "encrypted_content": encrypted_content,
                "timestamp": message.created_at.isoformat(),
            })
        
        return message
    
    async def handle_message(self, sender_id: UUID, data: dict):
        """Handle incoming WebSocket message"""
        
        msg_type = data.get("type")
        
        if msg_type == "message":
            await self.send_message(
                sender_id=sender_id,
                recipient_id=UUID(data["recipient_id"]),
                encrypted_content=data["encrypted_content"],
                session_id=data["session_id"],
            )
        
        elif msg_type == "key_exchange":
            # Handle Signal protocol key exchange
            await self._handle_key_exchange(sender_id, data)
        
        elif msg_type == "typing":
            # Forward typing indicator (no PHI)
            await self._forward_typing(sender_id, UUID(data["recipient_id"]))
    
    async def get_messages(
        self,
        user_id: UUID,
        conversation_id: UUID,
        limit: int = 50,
    ) -> list:
        """Get encrypted messages for a conversation"""
        
        # Verify the conversation exists and the user is a participant
        conv = await self.db.get(Conversation, conversation_id)
        if conv is None or user_id not in (conv.participant1_id, conv.participant2_id):
            raise PermissionError("Not a participant in this conversation")
        
        messages = await self.db.execute(
            select(Message)
            .where(Message.conversation_id == conversation_id)
            .order_by(Message.created_at.desc())
            .limit(limit)
        )
        
        # Return encrypted messages (client will decrypt)
        return [
            {
                "id": str(m.id),
                "sender_id": str(m.sender_id),
                "encrypted_content": json.loads(m.encrypted_content),
                "timestamp": m.created_at.isoformat(),
            }
            for m in messages.scalars()
        ]
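
Since the server never decrypts `encrypted_content`, about all it can do is check that the envelope has the expected shape before storing it. The sketch below assumes the `ciphertext` and `nonce` fields described in the `send_message` docstring arrive as base64 strings and that a 64 KiB ciphertext cap is acceptable; both are assumptions for illustration, as is the `validate_envelope` name.

```python
import base64
import binascii
from typing import Any, Dict

REQUIRED_FIELDS = ("ciphertext", "header", "nonce")
MAX_CIPHERTEXT_BYTES = 64 * 1024  # assumed limit, adjust as needed


def validate_envelope(envelope: Dict[str, Any]) -> None:
    """Raise ValueError unless the envelope has the expected opaque fields."""
    missing = [f for f in REQUIRED_FIELDS if f not in envelope]
    if missing:
        raise ValueError(f"missing fields: {', '.join(missing)}")

    for field in ("ciphertext", "nonce"):
        value = envelope[field]
        if not isinstance(value, str):
            raise ValueError(f"{field} must be a base64 string")
        try:
            # validate=True rejects characters outside the base64 alphabet
            raw = base64.b64decode(value, validate=True)
        except (binascii.Error, ValueError) as exc:
            raise ValueError(f"{field} is not valid base64") from exc
        if field == "ciphertext" and len(raw) > MAX_CIPHERTEXT_BYTES:
            raise ValueError("ciphertext too large")
```

A check like this could run at the top of `send_message`, rejecting malformed payloads without the server ever needing access to plaintext.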

AI Integration

AI Endpoints

# app/api/v1/ai.py

from fastapi import APIRouter, Depends, HTTPException
from typing import Optional
from uuid import UUID

from app.core.security import get_current_user
from app.core.rbac import require_permission, Permission
from app.core.audit import AuditLogger
from app.services.ai_service import AIService
from app.schemas.ai import (
    MedicalQuery,
    AIResponse,
    AIContextRequest,
)

router = APIRouter(prefix="/ai", tags=["AI Assistant"])

@router.post("/query", response_model=AIResponse)
async def medical_ai_query(
    query: MedicalQuery,
    current_user = Depends(get_current_user),
    ai_service: AIService = Depends(),
    audit: AuditLogger = Depends(),
):
    """
    Process medical query with AI
    
    Processing happens on-premise to protect PHI.
    """
    
    # Log AI query (not the content)
    await audit.log({
        "event_type": "AI_QUERY",
        "actor_id": str(current_user.id),
        "query_length": len(query.query),
        "has_context": bool(query.patient_context),
        "processing_mode": ai_service.processing_mode,
    })
    
    response = await ai_service.process_query(
        query=query.query,
        patient_context=query.patient_context,
        user_id=current_user.id,
    )
    
    await audit.log({
        "event_type": "AI_RESPONSE",
        "actor_id": str(current_user.id),
        "response_length": len(response.response),
    })
    
    return response


@router.post("/chat/{conversation_id}/ai-assist")
async def ai_assisted_chat(
    conversation_id: UUID,
    context: AIContextRequest,
    current_user = Depends(require_permission(Permission.READ_PATIENT_RECORDS)),
    ai_service: AIService = Depends(),
    audit: AuditLogger = Depends(),
):
    """
    AI assistance within E2E encrypted chat
    
    Flow:
    1. Client decrypts relevant messages
    2. Sends de-identified context to this endpoint
    3. AI generates response
    4. Client encrypts and sends to chat
    """
    
    # Verify context is de-identified
    if not ai_service.verify_deidentified(context.context):
        raise HTTPException(
            400,
            "Context contains identifiable information. Please de-identify."
        )
    
    response = await ai_service.generate_assistance(
        context=context.context,
        query=context.query,
    )
    
    await audit.log({
        "event_type": "AI_CHAT_ASSIST",
        "actor_id": str(current_user.id),
        "conversation_id": str(conversation_id),
    })
    
    return {"ai_response": response}

AI Service

# app/services/ai_service.py

from typing import Optional
import re

from app.config import get_settings
from app.schemas.ai import AIResponse

settings = get_settings()

class AIService:
    """
    HIPAA-compliant AI service
    
    Uses on-premise LLM for PHI processing.
    """
    
    def __init__(self, llm_client, deidentifier):
        self.llm = llm_client
        self.deidentifier = deidentifier
        self.processing_mode = settings.ai_processing_mode
    
    async def process_query(
        self,
        query: str,
        patient_context: Optional[str] = None,
        user_id: Optional[str] = None,  # caller identity; not included in the prompt
    ) -> AIResponse:
        """Process medical query with on-premise LLM"""
        
        # Build prompt
        system_prompt = self._get_medical_system_prompt()
        
        full_prompt = f"""{system_prompt}

Patient Context: {patient_context or 'Not provided'}

Healthcare Provider Query: {query}

Medical AI Response:"""

        # Generate with on-premise LLM
        response_text = await self.llm.generate(
            prompt=full_prompt,
            max_tokens=1000,
            temperature=0.7,
        )
        
        return AIResponse(
            response=response_text,
            disclaimer="This AI response is for informational purposes only. "
                       "Please use clinical judgment and consult appropriate "
                       "medical resources for patient care decisions.",
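            processing_mode=self.processing_mode,
        )
    
    async def generate_assistance(self, context: str, query: str) -> str:
        """Generate chat assistance from de-identified context.
        
        Called by the /chat/{conversation_id}/ai-assist endpoint. This is a
        minimal sketch that reuses the same LLM client as process_query.
        """
        
        prompt = (
            f"{self._get_medical_system_prompt()}\n\n"
            f"De-identified Context: {context}\n\n"
            f"Healthcare Provider Query: {query}\n\n"
            "Medical AI Response:"
        )
        
        return await self.llm.generate(
            prompt=prompt,
            max_tokens=1000,
            temperature=0.7,
        )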
    
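    def verify_deidentified(self, text: str) -> bool:
        """Heuristic check that text contains no obviously identifiable PHI.
        
        Pattern matching only catches structured identifiers (SSN, dates,
        MRN, email, phone). It cannot detect names or addresses.
        """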
        
        # Check for common PHI patterns
        phi_patterns = [
            r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
            r'\b\d{2}/\d{2}/\d{4}\b',  # DOB
            r'\bMRN[:\s]*\d+\b',  # MRN
            r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',  # Email
            r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b',  # Phone
        ]
        
        for pattern in phi_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                return False
        
        return True
    
    def _get_medical_system_prompt(self) -> str:
        return """You are a medical AI assistant operating in a HIPAA-compliant 
healthcare environment. You are assisting licensed healthcare providers.

GUIDELINES:
1. Provide evidence-based medical information
2. Reference clinical guidelines when applicable
3. Highlight important safety considerations
4. Suggest relevant differential diagnoses when appropriate
5. Recommend appropriate follow-up or specialist referrals
6. Always defer final clinical decisions to the healthcare provider

IMPORTANT: This is a clinical decision support tool. All recommendations 
should be verified against current medical literature and clinical judgment.
"""
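
To see what the pattern check in verify_deidentified does and does not catch, the following standalone sketch copies the same regular expressions into a hypothetical helper, find_phi_patterns, and runs them against two sample strings. The helper name, the labels, and the sample inputs are assumptions made for illustration. The second sample contains a name and a street address yet matches nothing, which is why a regex pass alone should not be treated as de-identification.

```python
import re

# Same patterns as AIService.verify_deidentified, labelled for reporting.
PHI_PATTERNS = {
    "ssn": r'\b\d{3}-\d{2}-\d{4}\b',
    "dob": r'\b\d{2}/\d{2}/\d{4}\b',
    "mrn": r'\bMRN[:\s]*\d+\b',
    "email": r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
    "phone": r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b',
}


def find_phi_patterns(text: str) -> list[str]:
    """Return the label of every pattern that matches (hypothetical helper)."""
    return [
        label
        for label, pattern in PHI_PATTERNS.items()
        if re.search(pattern, text, re.IGNORECASE)
    ]


if __name__ == "__main__":
    # Structured identifiers are caught.
    print(find_phi_patterns("Patient MRN: 48213, seen 03/14/2024"))
    # Free-text identifiers (name, street address) are not.
    print(find_phi_patterns("John Smith of 12 Elm Street reports chest pain"))
```

Returning the matched labels instead of a bare boolean also lets the endpoint tell the client which kind of identifier triggered the rejection, without echoing the identifier itself.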

Docker Deployment

Docker Compose

# docker-compose.yml

version: '3.8'

services:
  api:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql+asyncpg://hipaa:${DB_PASSWORD}@db:5432/healthcare
      - REDIS_URL=redis://:${REDIS_PASSWORD}@redis:6379
      - VAULT_ADDR=http://vault:8200
    depends_on:
      - db
      - redis
      - vault
    volumes:
      - ./secrets:/secrets:ro
    networks:
      - hipaa-network
    deploy:
      resources:
        limits:
          memory: 2G

  db:
    image: postgres:15
    environment:
      POSTGRES_USER: hipaa
      POSTGRES_PASSWORD: ${DB_PASSWORD}
      POSTGRES_DB: healthcare
    volumes:
      - postgres-data:/var/lib/postgresql/data
      - ./init-db.sql:/docker-entrypoint-initdb.d/init.sql
    networks:
      - hipaa-network
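    # Enable TLS for client connections (encryption in transit, not TDE).
    # The certificate and key must be mounted at the paths below.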
    command: >
      postgres
      -c ssl=on
      -c ssl_cert_file=/var/lib/postgresql/server.crt
      -c ssl_key_file=/var/lib/postgresql/server.key

  redis:
    image: redis:7-alpine
    command: redis-server --requirepass ${REDIS_PASSWORD} --appendonly yes
    volumes:
      - redis-data:/data
    networks:
      - hipaa-network

  vault:
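    # NOTE: the image's default command runs Vault in dev mode (in-memory
    # storage, already unsealed, no TLS), and VAULT_DEV_ROOT_TOKEN_ID only
    # applies there. Use a real server configuration and a pinned image tag
    # before storing production secrets.
    image: hashicorp/vault:latest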
    environment:
      VAULT_DEV_ROOT_TOKEN_ID: ${VAULT_TOKEN}
    cap_add:
      - IPC_LOCK
    volumes:
      - vault-data:/vault/data
    networks:
      - hipaa-network

  llm:
    image: vllm/vllm-openai:latest
    runtime: nvidia
    # The vLLM OpenAI-compatible server takes the model path as a CLI argument
    command: ["--model", "/models/medical-llm"]
    volumes:
      - ./models:/models:ro
    networks:
      - hipaa-network
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: all
              capabilities: [gpu]

networks:
  hipaa-network:
    driver: bridge

volumes:
  postgres-data:
  redis-data:
  vault-data:

Dockerfile

# Dockerfile

FROM python:3.11-slim AS builder

WORKDIR /app

# Install build dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip wheel --no-cache-dir --no-deps --wheel-dir /app/wheels -r requirements.txt

# Production stage
FROM python:3.11-slim

WORKDIR /app

# Create non-root user
RUN groupadd -r hipaa && useradd -r -g hipaa hipaa

# Install runtime dependencies
RUN apt-get update && apt-get install -y \
    libpq5 \
    && rm -rf /var/lib/apt/lists/*

# Copy wheels and install
COPY --from=builder /app/wheels /wheels
RUN pip install --no-cache-dir /wheels/*

# Copy application
COPY app /app/app

# Set ownership
RUN chown -R hipaa:hipaa /app

USER hipaa

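# Health check (python:3.11-slim does not ship curl, so use the standard library)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)" || exit 1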

EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

Key Takeaways

Defense in Depth

Multiple security layers: TLS, encryption, RBAC, audit

Encrypt at Field Level

Don’t just encrypt the database; encrypt sensitive fields

Audit Everything

Every PHI access must be logged without logging PHI content

AI Stays On-Premise

Keep LLM processing within your infrastructure

Next Steps

Compliance Checklist

Verify your implementation against HIPAA requirements

Course Overview

Return to course overview

Interview Deep-Dive

Strong Answer:
  • Authentication and authorization: Does the endpoint require authentication (JWT validation)? Does it check RBAC permissions before returning data? Is the authorization check server-side (not relying on client-side role checks)? Does it verify the requesting user has a legitimate relationship to the requested patient (not just a valid token)?
  • Data exposure: Does the API response include only the minimum necessary fields? A GET /patients/:id endpoint should return different field sets based on the caller’s role — a billing specialist gets billing fields, a clinician gets clinical fields. Watch for serializers that dump the entire database model into the response.
  • Encryption: Are any PHI fields being returned in plaintext that should be encrypted in transit? Is the endpoint exclusively served over HTTPS (no HTTP fallback)? If the endpoint writes data, is field-level encryption applied before database insertion?
  • Audit logging: Is every access to this endpoint audit-logged? Does the log capture the actor, the action, the resource, and the outcome? Are failed authorization attempts logged separately as security events? If this endpoint modifies data, are the old and new values captured in the audit trail?
  • Input validation: Is the input validated and sanitized to prevent SQL injection and XSS (which could lead to PHI exposure)? Are path parameters and query parameters validated against expected formats?
  • Error handling: Do error responses avoid leaking PHI or system internals? A 404 response for a patient that exists but the user cannot access should be indistinguishable from a 404 for a patient that does not exist (to prevent patient enumeration). Stack traces with PHI in request bodies should never appear in error responses.
  • Rate limiting: Is the endpoint rate-limited to prevent bulk data harvesting? A compromised token should not be able to iterate through all patient IDs.
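The minimum-necessary point in the list above can be sketched as a role-to-fields allowlist applied before serialization. The role names, field names, and the filter_for_role helper are hypothetical, chosen only to illustrate the idea. A real system would derive them from its RBAC configuration.

```python
# Hypothetical allowlists: each role sees only the fields it needs.
ROLE_FIELDS = {
    "billing": {"id", "insurance_id", "billing_address"},
    "clinician": {"id", "diagnoses", "medications", "allergies"},
}


def filter_for_role(record: dict, role: str) -> dict:
    """Return only the fields the role is allowed to see.

    Unknown roles get an empty allowlist, so the default is to expose nothing.
    """
    allowed = ROLE_FIELDS.get(role, set())
    return {key: value for key, value in record.items() if key in allowed}


# Sample record with made-up values.
patient = {
    "id": "p-001",
    "insurance_id": "INS-123",
    "billing_address": "redacted",
    "diagnoses": ["hypertension"],
    "medications": ["lisinopril"],
    "allergies": [],
}

billing_view = filter_for_role(patient, "billing")
clinician_view = filter_for_role(patient, "clinician")
```

An allowlist fails closed: a field added to the database model later stays hidden until someone explicitly grants it to a role, which is the opposite of a serializer that dumps the whole model.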
Follow-up: The developer argues that adding audit logging to every endpoint slows down the API and they want to skip it for “read-only” endpoints. What is your response?
Read-only endpoints are exactly the ones that need audit logging the most. The HIPAA Security Rule’s audit controls standard requires mechanisms that record and examine activity in systems containing PHI, and “who viewed what and when” is the question auditors and investigators ask first. Snooping (unauthorized viewing of records without modification) is one of the most frequent HIPAA violations, and undetected insider access is costly: Montefiore Medical Center’s $4.75M settlement followed an employee’s theft of patient data that went unnoticed. For the performance concern, audit logging should be asynchronous: write the log event to an in-memory buffer, return the API response immediately, and flush the buffer to the database in the background every few seconds. The added latency to the API call is microseconds (the time to append to an in-memory list), not milliseconds. If the developer is seeing performance impact, the logging implementation needs optimization, not removal.
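A minimal sketch of the buffered approach described in this answer, assuming an asyncio application. BufferedAuditLogger, the sink callable, and the event fields are hypothetical names for illustration, and the sink here is an in-memory list standing in for a bulk database insert.

```python
import asyncio
from datetime import datetime, timezone


class BufferedAuditLogger:
    """Append events to memory and flush them to a sink in the background."""

    def __init__(self, sink, flush_interval: float = 2.0):
        self._sink = sink  # async callable that receives a list of events
        self._flush_interval = flush_interval
        self._buffer: list[dict] = []

    def log(self, event: dict) -> None:
        # Hot path: a timestamp and a list append, no I/O.
        self._buffer.append(
            {**event, "timestamp": datetime.now(timezone.utc).isoformat()}
        )

    async def flush(self) -> None:
        if not self._buffer:
            return
        # Swap the buffer first so events logged during the write are kept.
        batch, self._buffer = self._buffer, []
        await self._sink(batch)

    async def run(self) -> None:
        """Background task: flush periodically, and once more when cancelled."""
        try:
            while True:
                await asyncio.sleep(self._flush_interval)
                await self.flush()
        finally:
            await self.flush()


async def demo() -> list[dict]:
    stored: list[dict] = []

    async def fake_db_sink(batch: list[dict]) -> None:
        stored.extend(batch)  # stands in for a bulk INSERT

    audit = BufferedAuditLogger(fake_db_sink, flush_interval=0.01)
    task = asyncio.create_task(audit.run())
    audit.log({"event_type": "PHI_READ", "actor_id": "u-1", "resource": "patient/p-001"})
    await asyncio.sleep(0.05)  # give the background flush time to run
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return stored


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The trade-off is durability: events still in the buffer are lost if the process crashes or the sink fails, so a production version would likely put a durable queue between the API and the database.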
Strong Answer:
  • Authentication flow: Step 1: the user submits username and password over TLS. The password is verified against an Argon2id hash (never a fast hash such as SHA-256; Argon2id is OWASP’s first-choice recommendation for password hashing, preferred over bcrypt because its memory hardness resists both GPU and ASIC attacks). Step 2: if credentials are valid, issue a temporary MFA challenge. The user provides a TOTP code from their authenticator app (or a WebAuthn/FIDO2 hardware key for higher security). Step 3: if MFA succeeds, issue a short-lived JWT access token (15 minutes) and a longer-lived refresh token (7 days, stored server-side in Redis, not in a cookie).
  • Token architecture: The access token is a signed JWT (RS256 with a rotating key pair) containing the user ID, roles, and session ID. It does not contain PHI. The access token is stateless — the API validates it without a database call. The refresh token is an opaque random string stored in Redis with the session metadata. When the access token expires, the client sends the refresh token to get a new access token. This refresh operation validates that the session is still active, the user is not locked, and the refresh token has not been revoked.
  • Session management: Each user has at most N concurrent sessions (configurable, default 3). New logins beyond the limit are either rejected or terminate the oldest session. Session metadata in Redis tracks: user ID, login time, last activity time, IP address, user agent, and MFA verification status. The session timeout is 15 minutes of inactivity (this implements HIPAA’s automatic logoff specification, which does not itself fix a duration). Activity (any API call) resets the inactivity timer by updating last_activity in Redis.
  • Session termination triggers: (1) Explicit logout — delete the session from Redis and add the current access token to a short-lived denylist. (2) Inactivity timeout — a background job sweeps Redis for expired sessions. (3) Password change — terminate all sessions for the user. (4) Account lockout (5 failed login attempts) — terminate all sessions. (5) Administrative revocation — the security team can terminate any user’s sessions immediately.
  • The refresh token rotation pattern: every time a refresh token is used, issue a new one and invalidate the old one. If an attacker steals a refresh token and the legitimate user also uses it, the second use of the old token triggers a security alert and terminates all sessions for that user (detects token theft).
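The rotation pattern in the last bullet can be sketched as follows. RefreshTokenStore is a hypothetical in-memory stand-in for the Redis session store, and its method names are assumptions for illustration only.

```python
import secrets


class RefreshTokenStore:
    """In-memory stand-in for the Redis session store (illustration only)."""

    def __init__(self):
        self._active: dict[str, str] = {}  # token -> user_id
        self._used: dict[str, str] = {}    # rotated-out token -> user_id

    def issue(self, user_id: str) -> str:
        token = secrets.token_urlsafe(32)
        self._active[token] = user_id
        return token

    def revoke_all(self, user_id: str) -> None:
        self._active = {t: u for t, u in self._active.items() if u != user_id}

    def rotate(self, token: str) -> str:
        """Exchange a refresh token for a new one, detecting reuse."""
        if token in self._used:
            # A rotated-out token came back: assume theft and end every session.
            self.revoke_all(self._used[token])
            raise PermissionError("Refresh token reuse detected; sessions revoked")
        user_id = self._active.pop(token, None)
        if user_id is None:
            raise PermissionError("Unknown or revoked refresh token")
        self._used[token] = user_id
        return self.issue(user_id)


if __name__ == "__main__":
    store = RefreshTokenStore()
    first = store.issue("user-1")
    second = store.rotate(first)  # legitimate refresh
    try:
        store.rotate(first)       # replay of the old token
    except PermissionError as exc:
        print(exc)
```

After the replay, the token issued by the legitimate refresh is revoked as well, so both the attacker and the real user must authenticate again. In Redis, the used-token record would carry a TTL equal to the refresh token lifetime so that it does not grow without bound.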
Follow-up: A developer wants to store the refresh token in an HttpOnly cookie for convenience. What are the security considerations?
HttpOnly cookies are actually better than localStorage for refresh tokens from a security perspective: JavaScript cannot read them, so an XSS payload cannot steal the token (although it can still send requests from the victim’s browser). But there are HIPAA-relevant considerations: set the Secure flag (cookie only sent over HTTPS), SameSite=Strict (prevents CSRF in most cases), and add explicit CSRF protection (double-submit cookie or synchronizer token) for state-changing requests. The cookie domain should be scoped as narrowly as possible. Set a reasonable expiration that matches the refresh token lifetime. The concern I would have is cross-origin scenarios: if the patient portal and provider portal are on different subdomains, cookie scoping gets complex. For a single-domain application, HttpOnly cookies for refresh tokens are the right choice. For multi-domain architectures, an in-memory approach with a dedicated token endpoint is cleaner.
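The cookie attributes named in this answer can be produced with Python's standard http.cookies module. This is a sketch only: the cookie name refresh_token, the /auth/refresh path, and the build_refresh_cookie helper are assumptions, not part of the reference application.

```python
from http.cookies import SimpleCookie


def build_refresh_cookie(token: str, max_age_seconds: int = 7 * 24 * 3600) -> str:
    """Return a Set-Cookie header value for the refresh token."""
    cookie = SimpleCookie()
    cookie["refresh_token"] = token
    morsel = cookie["refresh_token"]
    morsel["httponly"] = True          # not readable from JavaScript
    morsel["secure"] = True            # only sent over HTTPS
    morsel["samesite"] = "Strict"      # not sent on cross-site requests
    morsel["path"] = "/auth/refresh"   # hypothetical path; scope as narrowly as possible
    morsel["max-age"] = max_age_seconds  # matches the 7-day refresh token lifetime
    return morsel.OutputString()


header_value = build_refresh_cookie("opaque-token-value")

if __name__ == "__main__":
    print(header_value)
```

Restricting the path means the browser attaches the cookie only to the refresh endpoint, not to every API call, which reduces how often the token crosses the wire.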
Strong Answer:
  • Detection layer: The audit middleware captures every API call with the actor, resource, timestamp, and response size. The alert service runs real-time rules against the audit stream. Rule one: more than 100 patient record accesses per hour from a single user (baseline is 15-20) triggers a high-severity alert. Rule two: sequential patient ID access (iterating through IDs programmatically) triggers a critical alert. Rule three: bulk data export endpoints accessed more than once per day triggers an alert.
  • Investigation layer: the correlation ID on every request links the suspicious API calls to a single session. From the session, I can see: the authenticated user, their IP address, the user agent (is it a browser or a script?), when the session started, and whether MFA was completed. The audit logs give me the exact sequence of API calls — which patients, which fields, what order, how fast. If the user agent shows a Python requests library or curl instead of a browser, that is strong evidence of automated exfiltration.
  • Containment layer: the application supports session-level and user-level lockout without restarting anything. Step 1: revoke the session in Redis (immediate, sub-second). The next API call with the access token checks the session validity and returns 401. Step 2: if the account may be compromised, disable the user account in the database and flush all their sessions from Redis. Step 3: if the source IP is suspicious, add it to the WAF blocklist through the API gateway.
  • Forensic layer: the complete audit trail gives the investigation team everything they need: exactly which patient records were accessed, which fields were viewed, whether any data was exported or printed, the complete request/response timeline, and the source of the session. Combined with network flow logs from the infrastructure layer, I can determine if data was exfiltrated to an external destination.
  • Post-incident: the affected patient list comes directly from the audit logs (SELECT DISTINCT patient_id FROM audit_logs WHERE session_id = X). This list feeds into the breach notification process. The root cause analysis examines how the attacker obtained valid credentials and whether existing controls (rate limiting, anomaly detection thresholds) need adjustment.
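Rules one and two from the detection layer can be sketched as a function over one hour of audit events. The event shape, the run length of 10 for "sequential", and the assumption that patient IDs are integers are all illustrative choices. A system that uses UUIDs would need a different signal for enumeration.

```python
from collections import defaultdict

HOURLY_ACCESS_LIMIT = 100   # threshold from rule one
SEQUENTIAL_RUN_LIMIT = 10   # assumed run length for rule two


def detect_anomalies(events: list[dict]) -> list[tuple[str, str]]:
    """Evaluate one hour of audit events and return (user_id, alert) pairs.

    Each event is assumed to look like {"user_id": str, "patient_id": int},
    listed in the order the accesses happened.
    """
    by_user: dict[str, list[int]] = defaultdict(list)
    for event in events:
        by_user[event["user_id"]].append(event["patient_id"])

    alerts = []
    for user_id, patient_ids in by_user.items():
        if len(patient_ids) > HOURLY_ACCESS_LIMIT:
            alerts.append((user_id, "HIGH: access volume above hourly limit"))

        # Longest run of consecutive IDs in access order.
        longest = run = 1
        for previous, current in zip(patient_ids, patient_ids[1:]):
            run = run + 1 if current == previous + 1 else 1
            longest = max(longest, run)
        if longest >= SEQUENTIAL_RUN_LIMIT:
            alerts.append((user_id, "CRITICAL: sequential patient ID access"))
    return alerts
```

A fixed threshold is the simplest form of the rule. Comparing each user against their own historical baseline (the 15-20 accesses per hour mentioned above) would reduce false positives for roles that legitimately touch many records.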
Follow-up: The investigation reveals the “attacker” is actually a physician using a personal script to export their panel of 500 patients before leaving the practice. Technically they have access, but the method is unauthorized. How do you handle this?
This is an insider misuse scenario, not a credential compromise, but it is still a security incident with potential HIPAA implications. The physician has authorized access to these 500 patients but is using an unauthorized method (personal script instead of the approved application interface) and may be exporting data for unauthorized purposes (taking it to a new practice without following the proper records transfer process). Steps: (1) Terminate the script’s session immediately. (2) Engage HR and the physician’s department chief: this is a policy violation. (3) Determine what data was actually extracted and whether it left the organization’s control. (4) Conduct the four-factor breach assessment: the physician is a healthcare provider (lower risk recipient), the data was for patients they have a treatment relationship with, but the extraction method bypassed audit controls and DLP protections. (5) If data was transferred outside the organization, it may be a reportable breach depending on the assessment. (6) Remediation: implement API-level controls that detect and block automated access patterns (rate limiting, CAPTCHA for bulk operations, bot detection), and add DLP rules that alert on large-volume data access through non-standard channels.
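As one illustration of the rate-limiting control mentioned in the remediation step, here is a sliding-window limiter keyed by user. SlidingWindowLimiter and its parameters are hypothetical, and the per-user state lives in process memory. A multi-instance deployment would keep it in a shared store such as Redis.

```python
import time
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Allow at most max_requests per user within a rolling time window."""

    def __init__(self, max_requests: int, window_seconds: float, clock=time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._clock = clock  # injectable so the limiter can be tested without sleeping
        self._hits: dict[str, deque] = defaultdict(deque)

    def allow(self, user_id: str) -> bool:
        now = self._clock()
        hits = self._hits[user_id]
        # Drop timestamps that have aged out of the window.
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False
        hits.append(now)
        return True


if __name__ == "__main__":
    limiter = SlidingWindowLimiter(max_requests=3, window_seconds=60)
    print([limiter.allow("dr-jones") for _ in range(4)])
```

A rejected request should itself produce an audit event, since repeated rejections for one account are exactly the signal the detection rules look for.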