HIPAA Compliance Implementation Guide

This guide provides a complete reference implementation combining all concepts from previous modules into a production-ready healthcare application.
What You’ll Build:
  • HIPAA-compliant FastAPI backend
  • Encrypted database layer
  • Audit logging system
  • E2E encrypted chat with AI
  • Complete access control

Project Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                    HIPAA-COMPLIANT APPLICATION ARCHITECTURE                  │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   CLIENT LAYER                                                              │
│   ────────────                                                              │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │  React/Mobile App                                                    │   │
│   │  • E2E encryption keys                                               │   │
│   │  • Local key storage                                                 │   │
│   │  • TLS 1.3 connections                                               │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                      │                                       │
│                                      ▼                                       │
│   API LAYER                                                                 │
│   ─────────                                                                 │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │  FastAPI Application                                                 │   │
│   │  ├── Authentication (JWT + MFA)                                      │   │
│   │  ├── Authorization (RBAC)                                            │   │
│   │  ├── Audit Middleware                                                │   │
│   │  ├── Encryption Service                                              │   │
│   │  └── AI Router                                                       │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                      │                                       │
│                                      ▼                                       │
│   DATA LAYER                                                                │
│   ──────────                                                                │
│   ┌───────────────┐  ┌───────────────┐  ┌───────────────┐                  │
│   │  PostgreSQL   │  │  Redis        │  │  S3           │                  │
│   │  (Encrypted)  │  │  (Sessions)   │  │  (Documents)  │                  │
│   └───────────────┘  └───────────────┘  └───────────────┘                  │
│                                                                              │
│   INFRASTRUCTURE                                                            │
│   ──────────────                                                            │
│   ┌───────────────┐  ┌───────────────┐  ┌───────────────┐                  │
│   │  HashiCorp    │  │  On-Premise   │  │  OpenTelemetry│                  │
│   │  Vault        │  │  LLM          │  │  (Logging)    │                  │
│   └───────────────┘  └───────────────┘  └───────────────┘                  │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Project Structure

hipaa-healthcare-api/
├── app/
│   ├── __init__.py
│   ├── main.py                 # FastAPI application entry
│   ├── config.py               # Configuration management
│   ├── dependencies.py         # Dependency injection
│   │
│   ├── api/
│   │   ├── __init__.py
│   │   ├── v1/
│   │   │   ├── __init__.py
│   │   │   ├── auth.py         # Authentication endpoints
│   │   │   ├── patients.py     # Patient CRUD
│   │   │   ├── records.py      # Medical records
│   │   │   ├── chat.py         # E2E encrypted chat
│   │   │   └── ai.py           # AI endpoints
│   │   └── router.py
│   │
│   ├── core/
│   │   ├── __init__.py
│   │   ├── security.py         # Auth, JWT, MFA
│   │   ├── encryption.py       # Encryption service
│   │   ├── audit.py            # Audit logging
│   │   └── rbac.py             # Access control
│   │
│   ├── models/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── patient.py
│   │   ├── record.py
│   │   └── audit.py
│   │
│   ├── schemas/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── patient.py
│   │   └── record.py
│   │
│   ├── services/
│   │   ├── __init__.py
│   │   ├── patient_service.py
│   │   ├── record_service.py
│   │   ├── chat_service.py
│   │   └── ai_service.py
│   │
│   └── middleware/
│       ├── __init__.py
│       ├── audit.py
│       ├── security.py
│       └── rate_limit.py
│
├── tests/
├── alembic/                    # Database migrations
├── docker-compose.yml
├── Dockerfile
└── requirements.txt

Core Configuration

Environment Configuration

# app/config.py

from pydantic_settings import BaseSettings
from pydantic import SecretStr
from functools import lru_cache
from typing import Optional

class Settings(BaseSettings):
    """HIPAA-compliant application settings"""
    
    # Application
    app_name: str = "HIPAA Healthcare API"
    environment: str = "production"
    debug: bool = False
    
    # Security
    secret_key: SecretStr
    jwt_algorithm: str = "RS256"
    jwt_private_key_path: str = "/secrets/jwt-private.pem"
    jwt_public_key_path: str = "/secrets/jwt-public.pem"
    access_token_expire_minutes: int = 15
    refresh_token_expire_days: int = 7
    
    # Database
    database_url: SecretStr
    database_encryption_key_id: str
    
    # Encryption
    vault_addr: str
    vault_token: SecretStr
    kms_key_id: str
    
    # Redis (Sessions)
    redis_url: SecretStr
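    session_timeout_seconds: int = 900  # 15 minutes; HIPAA requires automatic logoff but sets no fixed interval
    
    # Audit
    audit_log_retention_days: int = 2555  # 7 x 365 days; HIPAA's documentation retention minimum is 6 years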
    
    # AI
    llm_model_path: str = "/models/medical-llm"
    ai_processing_mode: str = "on_premise"  # on_premise, hybrid
    
    # MFA
    mfa_issuer: str = "HIPAA Healthcare"
    mfa_required: bool = True
    
    class Config:
        env_file = ".env"
        case_sensitive = False

@lru_cache()
def get_settings() -> Settings:
    return Settings()
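
The two time-based settings above are easier to reason about with concrete arithmetic. The following is a minimal sketch, using only the standard library, of how `session_timeout_seconds` and `audit_log_retention_days` might be applied. The helper names `is_session_expired` and `audit_purge_cutoff` are hypothetical and do not appear elsewhere in this guide.

```python
from datetime import datetime, timedelta, timezone

SESSION_TIMEOUT_SECONDS = 900      # mirrors Settings.session_timeout_seconds
AUDIT_LOG_RETENTION_DAYS = 2555    # mirrors Settings.audit_log_retention_days


def is_session_expired(last_activity: datetime, now: datetime,
                       timeout_seconds: int = SESSION_TIMEOUT_SECONDS) -> bool:
    """Return True once the idle time reaches the timeout."""
    return (now - last_activity) >= timedelta(seconds=timeout_seconds)


def audit_purge_cutoff(now: datetime,
                       retention_days: int = AUDIT_LOG_RETENTION_DAYS) -> datetime:
    """Audit entries older than the returned timestamp fall outside retention."""
    return now - timedelta(days=retention_days)


now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
print(is_session_expired(now - timedelta(minutes=14), now))  # False
print(is_session_expired(now - timedelta(minutes=15), now))  # True
print(audit_purge_cutoff(now).date())                        # 2018-01-03
```

Note that 2555 days falls two days short of seven calendar years here because the span contains two leap days, which matters if a retention policy is written in years rather than days.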

Main Application

# app/main.py

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from contextlib import asynccontextmanager
import logging

from app.config import get_settings
from app.api.router import api_router
from app.middleware.audit import AuditMiddleware
from app.middleware.security import SecurityHeadersMiddleware
from app.core.encryption import EncryptionService
from app.core.audit import AuditLogger

settings = get_settings()
logger = logging.getLogger(__name__)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifecycle management"""
    # Startup
    logger.info("Starting HIPAA-compliant healthcare API")
    
    # Initialize encryption service
    app.state.encryption = EncryptionService(
        vault_addr=settings.vault_addr,
        vault_token=settings.vault_token.get_secret_value(),
    )
    await app.state.encryption.initialize()
    
    # Initialize audit logger
    app.state.audit = AuditLogger(
        retention_days=settings.audit_log_retention_days
    )
    await app.state.audit.start()
    
    logger.info("Application started successfully")
    
    yield
    
    # Shutdown
    logger.info("Shutting down application")
    await app.state.audit.stop()
    await app.state.encryption.close()

app = FastAPI(
    title=settings.app_name,
    lifespan=lifespan,
    docs_url="/docs" if settings.debug else None,
    redoc_url="/redoc" if settings.debug else None,
)

# Security middleware (Starlette runs the most recently added middleware outermost)
if settings.environment == "production":
    app.add_middleware(HTTPSRedirectMiddleware)

app.add_middleware(SecurityHeadersMiddleware)
app.add_middleware(AuditMiddleware)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.healthcare.example.com"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
)

# API routes
app.include_router(api_router, prefix="/api/v1")

@app.get("/health")
async def health_check():
    """Health check endpoint (no PHI)"""
    return {"status": "healthy"}
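
`main.py` imports `SecurityHeadersMiddleware` from `app/middleware/security.py`, but that module is not shown in this section. As a rough sketch only, a middleware of that name could be written as plain ASGI with no framework imports. The header set below is an assumption, not the guide's actual list, and should be adapted to your own policy.

```python
import asyncio

# Assumed header set for illustration; tune to your own security policy.
SECURITY_HEADERS = [
    (b"strict-transport-security", b"max-age=63072000; includeSubDomains"),
    (b"x-content-type-options", b"nosniff"),
    (b"x-frame-options", b"DENY"),
    (b"cache-control", b"no-store"),
]


class SecurityHeadersMiddleware:
    """Pure ASGI middleware that appends security headers to HTTP responses."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            # WebSocket and lifespan traffic passes through untouched.
            await self.app(scope, receive, send)
            return

        async def send_with_headers(message):
            if message["type"] == "http.response.start":
                headers = list(message.get("headers", []))
                present = {name.lower() for name, _ in headers}
                # Do not override headers the application already set.
                headers += [h for h in SECURITY_HEADERS if h[0] not in present]
                message = {**message, "headers": headers}
            await send(message)

        await self.app(scope, receive, send_with_headers)


async def demo_app(scope, receive, send):
    """Tiny ASGI app used only to exercise the middleware."""
    await send({"type": "http.response.start", "status": 200,
                "headers": [(b"content-type", b"application/json")]})
    await send({"type": "http.response.body", "body": b"{}"})


async def run_demo():
    sent = []

    async def send(message):
        sent.append(message)

    async def receive():
        return {"type": "http.request"}

    await SecurityHeadersMiddleware(demo_app)({"type": "http"}, receive, send)
    return sent


messages = asyncio.run(run_demo())
print(sorted(name.decode() for name, _ in messages[0]["headers"]))
```

Because the class takes the wrapped app as its only constructor argument, it fits the `app.add_middleware(SecurityHeadersMiddleware)` call used above.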

Authentication & Authorization

Security Module

# app/core/security.py

from datetime import datetime, timedelta
from typing import Optional, List
from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel
import pyotp
import secrets

from app.config import get_settings
from app.models.user import User

settings = get_settings()
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")

class TokenPayload(BaseModel):
    sub: str
    exp: datetime
    iat: datetime
    jti: str
    role: str
    permissions: List[str]
    mfa_verified: bool = False

class AuthService:
    """Authentication service with MFA support"""
    
    def __init__(self):
        with open(settings.jwt_private_key_path, "r") as f:
            self.private_key = f.read()
        with open(settings.jwt_public_key_path, "r") as f:
            self.public_key = f.read()
    
    def hash_password(self, password: str) -> str:
        """Hash password using Argon2id"""
        return pwd_context.hash(password)
    
    def verify_password(self, plain: str, hashed: str) -> bool:
        """Verify password against hash"""
        return pwd_context.verify(plain, hashed)
    
    def generate_mfa_secret(self) -> str:
        """Generate TOTP secret for MFA"""
        return pyotp.random_base32()
    
    def verify_mfa_token(self, secret: str, token: str) -> bool:
        """Verify TOTP token"""
        totp = pyotp.TOTP(secret)
        return totp.verify(token, valid_window=1)
    
    def get_mfa_provisioning_uri(self, secret: str, email: str) -> str:
        """Get provisioning URI for authenticator apps"""
        totp = pyotp.TOTP(secret)
        return totp.provisioning_uri(
            name=email,
            issuer_name=settings.mfa_issuer
        )
    
    def create_access_token(
        self,
        user: User,
        mfa_verified: bool = False
    ) -> str:
        """Create JWT access token"""
        
        now = datetime.utcnow()
        payload = TokenPayload(
            sub=str(user.id),
            exp=now + timedelta(minutes=settings.access_token_expire_minutes),
            iat=now,
            jti=secrets.token_hex(16),
            role=user.role,
            permissions=user.permissions,
            mfa_verified=mfa_verified,
        )
        
        return jwt.encode(
            payload.model_dump(),
            self.private_key,
            algorithm=settings.jwt_algorithm
        )
    
    def decode_token(self, token: str) -> TokenPayload:
        """Decode and validate JWT token"""
        
        try:
            payload = jwt.decode(
                token,
                self.public_key,
                algorithms=[settings.jwt_algorithm]
            )
            return TokenPayload(**payload)
        except JWTError as e:
            raise AuthenticationError(f"Invalid token: {e}") from e
    
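    def create_refresh_token(self, user_id: str) -> str:
        """Create an opaque refresh token.

        The token is random and carries no claims, so the caller is expected
        to persist it against user_id for later validation and revocation.
        """
        return secrets.token_urlsafe(64)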


class AuthenticationError(Exception):
    pass
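
To make the `valid_window=1` argument in `verify_mfa_token` concrete, here is a standard-library sketch of TOTP verification following RFC 6238 (SHA-1, 30-second steps). It is an illustration of the technique, not pyotp's source; `hotp` and `verify_totp` are hypothetical names. A window of 1 accepts the code for the current time step plus one step on either side, which tolerates modest clock drift.

```python
import base64
import hashlib
import hmac
import struct
import time
from typing import Optional


def hotp(key: bytes, counter: int, digits: int = 6) -> str:
    """HMAC-based one-time password (RFC 4226) using SHA-1."""
    digest = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = digest[-1] & 0x0F  # dynamic truncation offset
    code = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 10 ** digits).zfill(digits)


def verify_totp(secret_b32: str, token: str, at_time: Optional[float] = None,
                valid_window: int = 1, step: int = 30) -> bool:
    """Accept tokens for the current step and valid_window steps either side."""
    key = base64.b32decode(secret_b32, casefold=True)
    now = time.time() if at_time is None else at_time
    counter = int(now) // step
    for drift in range(-valid_window, valid_window + 1):
        candidate = counter + drift
        if candidate < 0:
            continue
        # Constant-time comparison avoids leaking how many digits matched.
        if hmac.compare_digest(hotp(key, candidate).encode(), token.encode()):
            return True
    return False


# RFC 6238 test key; at T=59s the 8-digit SHA-1 code is 94287082.
key = b"12345678901234567890"
secret = base64.b32encode(key).decode()
print(hotp(key, 59 // 30, digits=8))            # 94287082

token = hotp(key, 1)                            # code for the step covering t=30..59
print(verify_totp(secret, token, at_time=89))   # True: one step later, inside window
print(verify_totp(secret, token, at_time=120))  # False: three steps later
```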

RBAC Implementation

# app/core/rbac.py

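from enum import Enum
from typing import Set, Optional
from fastapi import HTTPException, Depends

from app.core.security import get_current_user

# NOTE: require_permission() below also references get_rbac_service, which is
# not defined in this guide. Define it in this module or import it here.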

class Role(str, Enum):
    PATIENT = "patient"
    PROVIDER = "provider"
    NURSE = "nurse"
    ADMIN = "admin"
    AUDITOR = "auditor"

class Permission(str, Enum):
    # Patient permissions
    READ_OWN_RECORDS = "read:own_records"
    UPDATE_OWN_PROFILE = "update:own_profile"
    
    # Provider permissions
    READ_PATIENT_RECORDS = "read:patient_records"
    WRITE_PATIENT_RECORDS = "write:patient_records"
    PRESCRIBE_MEDICATIONS = "prescribe:medications"
    
    # Admin permissions
    MANAGE_USERS = "manage:users"
    VIEW_AUDIT_LOGS = "view:audit_logs"
    MANAGE_ROLES = "manage:roles"
    
    # Break glass
    BREAK_GLASS_ACCESS = "break_glass:access"

# Role to permissions mapping
ROLE_PERMISSIONS: dict[Role, Set[Permission]] = {
    Role.PATIENT: {
        Permission.READ_OWN_RECORDS,
        Permission.UPDATE_OWN_PROFILE,
    },
    Role.NURSE: {
        Permission.READ_PATIENT_RECORDS,
    },
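    Role.PROVIDER: {
        Permission.READ_PATIENT_RECORDS,
        Permission.WRITE_PATIENT_RECORDS,
        Permission.PRESCRIBE_MEDICATIONS,
        # Assumption: providers may invoke emergency access. Without at least
        # one role holding this permission, break_glass_access() always
        # returns False. Adjust to your organization's policy.
        Permission.BREAK_GLASS_ACCESS,
    },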
    Role.ADMIN: {
        Permission.MANAGE_USERS,
        Permission.VIEW_AUDIT_LOGS,
        Permission.MANAGE_ROLES,
    },
    Role.AUDITOR: {
        Permission.VIEW_AUDIT_LOGS,
    },
}

class RBACService:
    """Role-Based Access Control service"""
    
    def __init__(self, audit_logger):
        self.audit = audit_logger
    
    def get_permissions(self, role: Role) -> Set[Permission]:
        """Get permissions for a role"""
        return ROLE_PERMISSIONS.get(role, set())
    
    async def check_permission(
        self,
        user_id: str,
        required_permission: Permission,
        resource_id: Optional[str] = None,
    ) -> bool:
        """Check if user has required permission"""
        
        user = await self._get_user(user_id)
        permissions = self.get_permissions(Role(user.role))
        
        has_permission = required_permission in permissions
        
        # Log access check
        await self.audit.log({
            "event_type": "ACCESS_CHECK",
            "user_id": user_id,
            "permission": required_permission.value,
            "resource_id": resource_id,
            "granted": has_permission,
        })
        
        return has_permission
    
    async def check_patient_access(
        self,
        user_id: str,
        patient_id: str,
    ) -> bool:
        """Check if user can access patient records"""
        
        user = await self._get_user(user_id)
        
        # Patients can only access their own records
        if user.role == Role.PATIENT:
            return str(user.patient_id) == patient_id
        
        # Providers check care team assignment
        if user.role in [Role.PROVIDER, Role.NURSE]:
            return await self._is_on_care_team(user_id, patient_id)
        
        return False
    
    async def break_glass_access(
        self,
        user_id: str,
        patient_id: str,
        reason: str,
    ) -> bool:
        """
        Emergency access override (break glass)
        
        Use only in emergency situations.
        Triggers immediate alerts and audit.
        """
        
        user = await self._get_user(user_id)
        
        if Permission.BREAK_GLASS_ACCESS not in self.get_permissions(Role(user.role)):
            return False
        
        # Log break glass access
        await self.audit.log({
            "event_type": "BREAK_GLASS_ACCESS",
            "user_id": user_id,
            "patient_id": patient_id,
            "reason": reason,
            "sensitivity": "CRITICAL",
        })
        
        # Trigger alerts
        await self._send_break_glass_alert(user_id, patient_id, reason)
        
        return True


# Dependency for permission checking
def require_permission(permission: Permission):
    """FastAPI dependency for permission checking"""
    
    async def permission_checker(
        current_user = Depends(get_current_user),
        rbac: RBACService = Depends(get_rbac_service),
    ):
        if not await rbac.check_permission(
            user_id=str(current_user.id),
            required_permission=permission,
        ):
            raise HTTPException(
                status_code=403,
                detail="Insufficient permissions"
            )
        return current_user
    
    return permission_checker
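
One detail worth checking in the service above is the `Role(user.role)` conversion: constructing an enum from an unrecognized string raises `ValueError`, which would surface as a server error rather than a denial. The sketch below uses a trimmed copy of the enums and a hypothetical helper, `permissions_for`, to show a deny-by-default lookup for role strings read from storage.

```python
from enum import Enum
from typing import Set


class Role(str, Enum):
    PATIENT = "patient"
    NURSE = "nurse"


class Permission(str, Enum):
    READ_OWN_RECORDS = "read:own_records"
    READ_PATIENT_RECORDS = "read:patient_records"


ROLE_PERMISSIONS = {
    Role.PATIENT: {Permission.READ_OWN_RECORDS},
    Role.NURSE: {Permission.READ_PATIENT_RECORDS},
}


def permissions_for(role_value: str) -> Set[Permission]:
    """Resolve a stored role string to permissions, denying by default."""
    try:
        role = Role(role_value)
    except ValueError:
        return set()  # unknown role string: grant nothing
    return ROLE_PERMISSIONS.get(role, set())


print(Permission.READ_PATIENT_RECORDS in permissions_for("nurse"))    # True
print(Permission.READ_PATIENT_RECORDS in permissions_for("patient"))  # False
print(permissions_for("contractor"))                                  # set()
```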

Patient Records API

Patient Endpoints

# app/api/v1/patients.py

from fastapi import APIRouter, Depends, HTTPException, Query
from typing import List, Optional
from uuid import UUID

from app.core.security import get_current_user
from app.core.rbac import (
    Permission,
    RBACService,
    get_rbac_service,  # referenced inside rbac.py; assumed importable from there
    require_permission,
)
from app.core.encryption import EncryptionService
from app.core.audit import AuditLogger
from app.schemas.patient import (
    PatientCreate,
    PatientUpdate,
    PatientResponse,
    MedicalRecordCreate,
    MedicalRecordResponse,
)
from app.services.patient_service import PatientService

router = APIRouter(prefix="/patients", tags=["Patients"])

@router.post("/", response_model=PatientResponse)
async def create_patient(
    patient_data: PatientCreate,
    current_user = Depends(require_permission(Permission.WRITE_PATIENT_RECORDS)),
    patient_service: PatientService = Depends(),
    audit: AuditLogger = Depends(),
):
    """Create a new patient record"""
    
    patient = await patient_service.create_patient(
        data=patient_data,
        created_by=current_user.id,
    )
    
    await audit.log({
        "event_type": "PHI_CREATE",
        "resource_type": "patient",
        "resource_id": str(patient.id),
        "actor_id": str(current_user.id),
    })
    
    return patient


@router.get("/{patient_id}", response_model=PatientResponse)
async def get_patient(
    patient_id: UUID,
    current_user = Depends(get_current_user),
    patient_service: PatientService = Depends(),
    # A bare Depends() with no type annotation cannot be resolved by FastAPI,
    # so the RBAC service is injected through its provider function.
    rbac: RBACService = Depends(get_rbac_service),
    audit: AuditLogger = Depends(),
):
    """Get patient by ID"""
    
    # Check access
    if not await rbac.check_patient_access(
        user_id=str(current_user.id),
        patient_id=str(patient_id),
    ):
        raise HTTPException(403, "Access denied to patient records")
    
    patient = await patient_service.get_patient(patient_id)
    if not patient:
        raise HTTPException(404, "Patient not found")
    
    await audit.log({
        "event_type": "PHI_VIEW",
        "resource_type": "patient",
        "resource_id": str(patient_id),
        "patient_id": str(patient_id),
        "actor_id": str(current_user.id),
        "fields_accessed": ["demographics", "contact_info"],
    })
    
    return patient


@router.get("/{patient_id}/records", response_model=List[MedicalRecordResponse])
async def get_patient_records(
    patient_id: UUID,
    record_type: Optional[str] = Query(None),
    start_date: Optional[str] = Query(None),
    end_date: Optional[str] = Query(None),
    current_user = Depends(get_current_user),
    patient_service: PatientService = Depends(),
    rbac: RBACService = Depends(get_rbac_service),
    audit: AuditLogger = Depends(),
):
    """Get patient's medical records"""
    
    if not await rbac.check_patient_access(
        user_id=str(current_user.id),
        patient_id=str(patient_id),
    ):
        raise HTTPException(403, "Access denied")
    
    records = await patient_service.get_medical_records(
        patient_id=patient_id,
        record_type=record_type,
        start_date=start_date,
        end_date=end_date,
    )
    
    await audit.log({
        "event_type": "PHI_VIEW",
        "resource_type": "medical_records",
        "patient_id": str(patient_id),
        "actor_id": str(current_user.id),
        "records_accessed": len(records),
        "filters": {
            "record_type": record_type,
            "date_range": f"{start_date} - {end_date}",
        },
    })
    
    return records


@router.post("/{patient_id}/records", response_model=MedicalRecordResponse)
async def create_medical_record(
    patient_id: UUID,
    record_data: MedicalRecordCreate,
    current_user = Depends(require_permission(Permission.WRITE_PATIENT_RECORDS)),
    patient_service: PatientService = Depends(),
    encryption: EncryptionService = Depends(),
    audit: AuditLogger = Depends(),
):
    """Create a new medical record"""
    
    # Encrypt sensitive fields
    encrypted_data = await encryption.encrypt_record(
        record_data.model_dump(),
        context={"patient_id": str(patient_id)},
    )
    
    record = await patient_service.create_medical_record(
        patient_id=patient_id,
        data=encrypted_data,
        created_by=current_user.id,
    )
    
    await audit.log({
        "event_type": "PHI_CREATE",
        "resource_type": "medical_record",
        "resource_id": str(record.id),
        "patient_id": str(patient_id),
        "actor_id": str(current_user.id),
        "record_type": record_data.record_type,
    })
    
    return record
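
The endpoints above call `audit.log(...)` with a plain dict, and `main.py` calls `start()` and `stop()` on the same object, but `AuditLogger` itself is not shown in this section. For unit tests, a stand-in exposing those three calls can be enough. The class below is a hypothetical test double under that assumption, not the guide's production logger; the `event_id` and `timestamp` fields are added for illustration.

```python
import asyncio
from datetime import datetime, timezone
from typing import Any, Dict, List
from uuid import uuid4


class InMemoryAuditLogger:
    """Test double exposing the start/stop/log calls used in this guide."""

    def __init__(self) -> None:
        self.events: List[Dict[str, Any]] = []
        self.running = False

    async def start(self) -> None:
        self.running = True

    async def stop(self) -> None:
        self.running = False

    async def log(self, event: Dict[str, Any]) -> None:
        if not self.running:
            raise RuntimeError("audit logger is not running")
        # Stamp each event so entries can be ordered and referenced later.
        self.events.append({
            "event_id": str(uuid4()),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            **event,
        })


async def demo() -> List[Dict[str, Any]]:
    audit = InMemoryAuditLogger()
    await audit.start()
    await audit.log({
        "event_type": "PHI_VIEW",
        "resource_type": "patient",
        "actor_id": "user-123",
    })
    await audit.stop()
    return audit.events


events = asyncio.run(demo())
print(events[0]["event_type"])  # PHI_VIEW
```

In a test, an instance can be supplied in place of the real logger so assertions can inspect `events` after calling an endpoint.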

Patient Service

# app/services/patient_service.py

from typing import List, Optional
from uuid import UUID
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from app.models.patient import Patient, MedicalRecord
from app.schemas.patient import PatientCreate, MedicalRecordCreate
from app.core.encryption import EncryptionService

class PatientService:
    """Service for patient operations"""
    
    def __init__(
        self,
        db: AsyncSession,
        encryption: EncryptionService,
    ):
        self.db = db
        self.encryption = encryption
    
    async def create_patient(
        self,
        data: PatientCreate,
        created_by: UUID,
    ) -> Patient:
        """Create a new patient with encrypted PHI"""
        
        # Encrypt PHI fields
        encrypted_ssn = await self.encryption.encrypt_field(
            data.ssn,
            field_name="ssn",
        )
        encrypted_name = await self.encryption.encrypt_field(
            data.full_name,
            field_name="name",
        )
        
        patient = Patient(
            mrn=self._generate_mrn(),
            ssn_encrypted=encrypted_ssn,
            name_encrypted=encrypted_name,
            date_of_birth=data.date_of_birth,
            gender=data.gender,
            contact_info_encrypted=await self.encryption.encrypt_field(
                data.contact_info.model_dump_json(),
                field_name="contact_info",
            ),
            created_by=created_by,
            created_at=datetime.utcnow(),
        )
        
        self.db.add(patient)
        await self.db.commit()
        await self.db.refresh(patient)
        
        return patient
    
    async def get_patient(self, patient_id: UUID) -> Optional[Patient]:
        """Get patient with decrypted fields"""
        
        result = await self.db.execute(
            select(Patient).where(Patient.id == patient_id)
        )
        patient = result.scalar_one_or_none()
        
        if patient:
            # Decrypt fields for response
            patient.ssn = await self.encryption.decrypt_field(
                patient.ssn_encrypted,
                field_name="ssn",
            )
            patient.full_name = await self.encryption.decrypt_field(
                patient.name_encrypted,
                field_name="name",
            )
        
        return patient
    
    async def get_medical_records(
        self,
        patient_id: UUID,
        record_type: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ) -> List[MedicalRecord]:
        """Get patient's medical records with filtering"""
        
        query = select(MedicalRecord).where(
            MedicalRecord.patient_id == patient_id
        )
        
        if record_type:
            query = query.where(MedicalRecord.record_type == record_type)
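        # Parse ISO 8601 strings so the comparison uses datetimes, not text
        # (datetime.fromisoformat raises ValueError on malformed input)
        if start_date:
            query = query.where(
                MedicalRecord.created_at >= datetime.fromisoformat(start_date)
            )
        if end_date:
            query = query.where(
                MedicalRecord.created_at <= datetime.fromisoformat(end_date)
            )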
        
        result = await self.db.execute(query)
        records = result.scalars().all()
        
        # Decrypt each record
        for record in records:
            record.content = await self.encryption.decrypt_field(
                record.content_encrypted,
                field_name="content",
            )
        
        return records
    
    def _generate_mrn(self) -> str:
        """Generate unique medical record number"""
        import secrets
        return f"MRN-{secrets.token_hex(8).upper()}"
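
`_generate_mrn` produces 64 random bits, which makes collisions very unlikely but does not by itself guarantee uniqueness. A minimal sketch of one way to close that gap is to check a candidate against existing values and retry. The `exists` callback and the `generate_unique_mrn` name are hypothetical, and in a real deployment a unique constraint on the `mrn` column would presumably remain the authoritative safeguard.

```python
import secrets
from typing import Callable


def generate_unique_mrn(exists: Callable[[str], bool], max_attempts: int = 5) -> str:
    """Generate an MRN, retrying if the candidate is already taken."""
    for _ in range(max_attempts):
        candidate = f"MRN-{secrets.token_hex(8).upper()}"
        if not exists(candidate):
            return candidate
    raise RuntimeError("could not generate a unique MRN")


# Usage with an in-memory set standing in for a database lookup.
issued = set()
mrn = generate_unique_mrn(issued.__contains__)
issued.add(mrn)
print(len(mrn))  # 20
```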

E2E Encrypted Chat

Chat Endpoints

# app/api/v1/chat.py

from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect
from typing import List
from uuid import UUID

from app.core.security import get_current_user, ws_authenticate
from app.core.audit import AuditLogger
from app.services.chat_service import ChatService
from app.schemas.chat import (
    MessageSend,
    MessageResponse,
    ConversationResponse,
)

router = APIRouter(prefix="/chat", tags=["Chat"])

@router.get("/conversations", response_model=List[ConversationResponse])
async def get_conversations(
    current_user = Depends(get_current_user),
    chat_service: ChatService = Depends(),
):
    """Get user's conversations"""
    return await chat_service.get_conversations(current_user.id)


@router.post("/messages", response_model=MessageResponse)
async def send_message(
    message: MessageSend,
    current_user = Depends(get_current_user),
    chat_service: ChatService = Depends(),
    audit: AuditLogger = Depends(),
):
    """
    Send E2E encrypted message
    
    Note: Server only sees encrypted ciphertext.
    Decryption happens on client devices.
    """
    
    result = await chat_service.send_message(
        sender_id=current_user.id,
        recipient_id=message.recipient_id,
        encrypted_content=message.encrypted_content,
        session_id=message.session_id,
    )
    
    await audit.log({
        "event_type": "MESSAGE_SENT",
        "actor_id": str(current_user.id),
        "recipient_id": str(message.recipient_id),
        "message_size": len(message.encrypted_content),
        # Note: We do NOT log message content
    })
    
    return result


@router.websocket("/ws")
async def chat_websocket(
    websocket: WebSocket,
    chat_service: ChatService = Depends(),
    audit: AuditLogger = Depends(),
):
    """WebSocket for real-time E2E encrypted chat"""
    
    # Authenticate WebSocket connection
    user = await ws_authenticate(websocket)
    if not user:
        await websocket.close(code=4001)
        return
    
    await websocket.accept()
    
    await audit.log({
        "event_type": "CHAT_SESSION_START",
        "actor_id": str(user.id),
    })
    
    try:
        # Register connection
        await chat_service.connect(user.id, websocket)
        
        while True:
            # Receive encrypted message
            data = await websocket.receive_json()
            
            # Forward encrypted message (no decryption on server)
            await chat_service.handle_message(
                sender_id=user.id,
                data=data,
            )
            
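    except WebSocketDisconnect:
        pass  # Normal client disconnect
    finally:
        # Always unregister and audit, even if message handling failed
        await chat_service.disconnect(user.id)
        await audit.log({
            "event_type": "CHAT_SESSION_END",
            "actor_id": str(user.id),
        })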

Chat Service

# app/services/chat_service.py

from typing import Dict, Optional
from uuid import UUID
from fastapi import WebSocket
from datetime import datetime
import json

from sqlalchemy import select

from app.core.encryption import EncryptionService
from app.models.message import Message, Conversation

class ChatService:
    """
    E2E Encrypted Chat Service
    
    Key principle: Server NEVER sees plaintext messages.
    All encryption/decryption happens on client devices.
    """
    
    def __init__(self, db, encryption: EncryptionService):
        self.db = db
        self.encryption = encryption
        self.connections: Dict[UUID, WebSocket] = {}
    
    async def connect(self, user_id: UUID, websocket: WebSocket):
        """Register WebSocket connection"""
        self.connections[user_id] = websocket
    
    async def disconnect(self, user_id: UUID):
        """Remove WebSocket connection"""
        self.connections.pop(user_id, None)
    
    async def send_message(
        self,
        sender_id: UUID,
        recipient_id: UUID,
        encrypted_content: dict,
        session_id: str,
    ) -> Message:
        """
        Store and forward encrypted message
        
        The encrypted_content contains:
        - ciphertext: E2E encrypted message
        - header: Signal protocol header (public DH key, etc.)
        - nonce: Encryption nonce
        """
        
        # Store encrypted message
        message = Message(
            conversation_id=await self._get_or_create_conversation(
                sender_id, recipient_id
            ),
            sender_id=sender_id,
            recipient_id=recipient_id,
            encrypted_content=json.dumps(encrypted_content),
            session_id=session_id,
            created_at=datetime.utcnow(),
        )
        
        self.db.add(message)
        await self.db.commit()
        
        # Forward to recipient if online
        if recipient_id in self.connections:
            await self.connections[recipient_id].send_json({
                "type": "message",
                "message_id": str(message.id),
                "sender_id": str(sender_id),
                "encrypted_content": encrypted_content,
                "timestamp": message.created_at.isoformat(),
            })
        
        return message
    
    async def handle_message(self, sender_id: UUID, data: dict):
        """Handle incoming WebSocket message"""
        
        msg_type = data.get("type")
        
        if msg_type == "message":
            await self.send_message(
                sender_id=sender_id,
                recipient_id=UUID(data["recipient_id"]),
                encrypted_content=data["encrypted_content"],
                session_id=data["session_id"],
            )
        
        elif msg_type == "key_exchange":
            # Handle Signal protocol key exchange
            await self._handle_key_exchange(sender_id, data)
        
        elif msg_type == "typing":
            # Forward typing indicator (no PHI)
            await self._forward_typing(sender_id, UUID(data["recipient_id"]))
    
    async def get_messages(
        self,
        user_id: UUID,
        conversation_id: UUID,
        limit: int = 50,
    ) -> list:
        """Get encrypted messages for a conversation"""
        
        # Verify user is part of conversation
        conv = await self.db.get(Conversation, conversation_id)
        if conv is None or user_id not in (conv.participant1_id, conv.participant2_id):
            raise PermissionError("Not a participant in this conversation")
        
        messages = await self.db.execute(
            select(Message)
            .where(Message.conversation_id == conversation_id)
            .order_by(Message.created_at.desc())
            .limit(limit)
        )
        
        # Return encrypted messages (client will decrypt)
        return [
            {
                "id": str(m.id),
                "sender_id": str(m.sender_id),
                "encrypted_content": json.loads(m.encrypted_content),
                "timestamp": m.created_at.isoformat(),
            }
            for m in messages.scalars()
        ]
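
`ChatService` keeps its `connections` dict on the instance. If the service is constructed per request, as `Depends()` on a class does by default in FastAPI, a message posted over HTTP would not see sockets registered by a WebSocket handler. One possible approach, sketched here with a hypothetical `ConnectionRegistry` shared at module level, is to keep the socket map outside the per-request service. The fake socket exists only to make the example runnable, and the registry forwards the payload unchanged, so the server still handles only ciphertext.

```python
import asyncio
from typing import Any, Dict
from uuid import UUID, uuid4


class ConnectionRegistry:
    """Process-wide map of online users to their WebSocket-like objects."""

    def __init__(self) -> None:
        self._connections: Dict[UUID, Any] = {}

    def connect(self, user_id: UUID, websocket: Any) -> None:
        self._connections[user_id] = websocket

    def disconnect(self, user_id: UUID) -> None:
        self._connections.pop(user_id, None)

    async def forward(self, recipient_id: UUID, payload: dict) -> bool:
        """Deliver payload if the recipient is online; report whether it was sent."""
        websocket = self._connections.get(recipient_id)
        if websocket is None:
            return False  # offline: the stored message is fetched later
        try:
            await websocket.send_json(payload)
        except Exception:
            # Sketch only: real code would catch the specific disconnect error.
            self.disconnect(recipient_id)
            return False
        return True


registry = ConnectionRegistry()  # shared by every request in this process


class FakeSocket:
    """Stand-in for a WebSocket, recording what would have been sent."""

    def __init__(self) -> None:
        self.sent = []

    async def send_json(self, payload: dict) -> None:
        self.sent.append(payload)


async def demo() -> tuple:
    alice, bob = uuid4(), uuid4()
    socket = FakeSocket()
    registry.connect(bob, socket)
    payload = {"type": "message", "encrypted_content": {"ciphertext": "..."}}
    delivered = await registry.forward(bob, payload)
    missed = await registry.forward(alice, payload)
    return delivered, missed, len(socket.sent)


print(asyncio.run(demo()))  # (True, False, 1)
```

A registry like this only covers a single process. With several workers, some form of cross-process fan-out would be needed, and the Redis instance in the architecture diagram is one candidate for that.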

AI Integration

AI Endpoints

# app/api/v1/ai.py

from fastapi import APIRouter, Depends, HTTPException
from typing import Optional
from uuid import UUID

from app.core.security import get_current_user
from app.core.rbac import require_permission, Permission
from app.core.audit import AuditLogger
from app.services.ai_service import AIService
from app.schemas.ai import (
    MedicalQuery,
    AIResponse,
    AIContextRequest,
)

router = APIRouter(prefix="/ai", tags=["AI Assistant"])

@router.post("/query", response_model=AIResponse)
async def medical_ai_query(
    query: MedicalQuery,
    current_user = Depends(get_current_user),
    ai_service: AIService = Depends(),
    audit: AuditLogger = Depends(),
):
    """
    Process medical query with AI
    
    Processing happens on-premise to protect PHI.
    """
    
    # Log AI query (not the content)
    await audit.log({
        "event_type": "AI_QUERY",
        "actor_id": str(current_user.id),
        "query_length": len(query.query),
        "has_context": bool(query.patient_context),
        "processing_mode": ai_service.processing_mode,
    })
    
    response = await ai_service.process_query(
        query=query.query,
        patient_context=query.patient_context,
        user_id=str(current_user.id),
    )
    
    await audit.log({
        "event_type": "AI_RESPONSE",
        "actor_id": str(current_user.id),
        "response_length": len(response.response),
    })
    
    return response


@router.post("/chat/{conversation_id}/ai-assist")
async def ai_assisted_chat(
    conversation_id: UUID,
    context: AIContextRequest,
    current_user = Depends(require_permission(Permission.READ_PATIENT_RECORDS)),
    ai_service: AIService = Depends(),
    audit: AuditLogger = Depends(),
):
    """
    AI assistance within E2E encrypted chat
    
    Flow:
    1. Client decrypts relevant messages
    2. Sends de-identified context to this endpoint
    3. AI generates response
    4. Client encrypts and sends to chat
    """
    
    # Verify context is de-identified
    if not ai_service.verify_deidentified(context.context):
        raise HTTPException(
            400,
            "Context contains identifiable information. Please de-identify."
        )
    
    response = await ai_service.generate_assistance(
        context=context.context,
        query=context.query,
    )
    
    await audit.log({
        "event_type": "AI_CHAT_ASSIST",
        "actor_id": str(current_user.id),
        "conversation_id": str(conversation_id),
    })
    
    return {"ai_response": response}
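
The handler above calls `ai_service.generate_assistance`, which the AI Service listing in this guide does not define. The following is a minimal sketch of what such a method might look like, written as a standalone function so it can run on its own. The prompt wording, the `max_tokens` and `temperature` values, and the `EchoLLM` stand-in are all assumptions for illustration; only the `generate(prompt=..., max_tokens=..., temperature=...)` call shape is taken from `process_query`.

```python
import asyncio


class EchoLLM:
    """Hypothetical stand-in for the on-premise client; mimics only generate()."""

    def __init__(self) -> None:
        self.last_prompt = ""

    async def generate(self, prompt: str, max_tokens: int, temperature: float) -> str:
        # Record the prompt so the caller can inspect what would be sent
        self.last_prompt = prompt
        return f"[{len(prompt)} prompt chars received]"


async def generate_assistance(llm, context: str, query: str) -> str:
    """Build a prompt from de-identified chat context and ask the LLM for a reply."""
    prompt = (
        "You are assisting a licensed healthcare provider.\n\n"
        f"De-identified conversation context:\n{context}\n\n"
        f"Provider question: {query}\n\n"
        "Suggested reply:"
    )
    # Illustrative generation settings, not values from the guide
    return await llm.generate(prompt=prompt, max_tokens=500, temperature=0.3)


llm = EchoLLM()
reply = asyncio.run(
    generate_assistance(llm, "Adult with persistent cough", "What should I ask next?")
)
print(reply)
```

Inside `AIService` this would presumably become a method that uses `self.llm` in place of the `llm` argument.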

AI Service

# app/services/ai_service.py

from typing import Optional
import re

from app.config import get_settings
from app.schemas.ai import AIResponse

settings = get_settings()

class AIService:
    """
    HIPAA-compliant AI service
    
    Uses on-premise LLM for PHI processing.
    """
    
    def __init__(self, llm_client, deidentifier):
        self.llm = llm_client
        self.deidentifier = deidentifier
        self.processing_mode = settings.ai_processing_mode
    
    async def process_query(
        self,
        query: str,
        patient_context: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> AIResponse:
        """Process medical query with on-premise LLM"""
        
        # Build prompt
        system_prompt = self._get_medical_system_prompt()
        
        full_prompt = f"""{system_prompt}

Patient Context: {patient_context or 'Not provided'}

Healthcare Provider Query: {query}

Medical AI Response:"""

        # Generate with on-premise LLM
        response_text = await self.llm.generate(
            prompt=full_prompt,
            max_tokens=1000,
            temperature=0.7,
        )
        
        return AIResponse(
            response=response_text,
            disclaimer="This AI response is for informational purposes only. "
                       "Please use clinical judgment and consult appropriate "
                       "medical resources for patient care decisions.",
            processing_mode=self.processing_mode,
        )
    
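    def verify_deidentified(self, text: str) -> bool:
        """Heuristic screen for obvious identifiers.

        A True result does not prove the text is de-identified. The patterns
        cover only a few of the 18 HIPAA Safe Harbor identifier categories;
        names, street addresses, and dates in other formats pass unnoticed.
        """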
        
        # Check for common PHI patterns
        phi_patterns = [
            r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
            r'\b\d{2}/\d{2}/\d{4}\b',  # DOB
            r'\bMRN[:\s]*\d+\b',  # MRN
            r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',  # Email
            r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b',  # Phone
        ]
        
        for pattern in phi_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                return False
        
        return True
    
    def _get_medical_system_prompt(self) -> str:
        return """You are a medical AI assistant operating in a HIPAA-compliant 
healthcare environment. You are assisting licensed healthcare providers.

GUIDELINES:
1. Provide evidence-based medical information
2. Reference clinical guidelines when applicable
3. Highlight important safety considerations
4. Suggest relevant differential diagnoses when appropriate
5. Recommend appropriate follow-up or specialist referrals
6. Always defer final clinical decisions to the healthcare provider

IMPORTANT: This is a clinical decision support tool. All recommendations 
should be verified against current medical literature and clinical judgment.
"""
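
To make the limits of the regex screen in `verify_deidentified` concrete, the sketch below reuses the same five patterns in a standalone function and runs them over a few sample strings. The function name `find_phi_patterns`, the pattern labels, and the sample texts are illustrative and not part of the guide's code.

```python
import re

# Same regular expressions as verify_deidentified, labelled for reporting
PHI_PATTERNS = {
    "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
    "date": r"\b\d{2}/\d{2}/\d{4}\b",
    "mrn": r"\bMRN[:\s]*\d+\b",
    "email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
    "phone": r"\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b",
}


def find_phi_patterns(text: str) -> list[str]:
    """Return the labels of every pattern that matches the text."""
    return [
        label
        for label, pattern in PHI_PATTERNS.items()
        if re.search(pattern, text, re.IGNORECASE)
    ]


samples = [
    "Patient SSN 123-45-6789, call 555-123-4567",
    "MRN: 448812 seen on 03/14/2024",
    "John Smith of 42 Elm Street, born March 3rd 1961",
    "65-year-old male with type 2 diabetes",
]

for sample in samples:
    print(f"{find_phi_patterns(sample)!r:20} <- {sample}")
```

The third sample contains a name, a street address, and a birth date, yet no pattern fires. A production deployment would likely pair this screen with a dedicated de-identification step rather than rely on it alone.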

Docker Deployment

Docker Compose

# docker-compose.yml

version: '3.8'

services:
  api:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql+asyncpg://hipaa:${DB_PASSWORD}@db:5432/healthcare
      - REDIS_URL=redis://:${REDIS_PASSWORD}@redis:6379
      - VAULT_ADDR=http://vault:8200
    depends_on:
      - db
      - redis
      - vault
    volumes:
      - ./secrets:/secrets:ro
    networks:
      - hipaa-network
    deploy:
      resources:
        limits:
          memory: 2G

  db:
    image: postgres:15
    environment:
      POSTGRES_USER: hipaa
      POSTGRES_PASSWORD: ${DB_PASSWORD}
      POSTGRES_DB: healthcare
    volumes:
      - postgres-data:/var/lib/postgresql/data
      - ./init-db.sql:/docker-entrypoint-initdb.d/init.sql
    networks:
      - hipaa-network
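    # Enable TLS for client connections (transport encryption, not TDE).
    # The certificate and key files must also be mounted into the container.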
    command: >
      postgres
      -c ssl=on
      -c ssl_cert_file=/var/lib/postgresql/server.crt
      -c ssl_key_file=/var/lib/postgresql/server.key

  redis:
    image: redis:7-alpine
    command: redis-server --requirepass ${REDIS_PASSWORD} --appendonly yes
    volumes:
      - redis-data:/data
    networks:
      - hipaa-network

  vault:
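    # VAULT_DEV_ROOT_TOKEN_ID only applies to Vault's dev server, which keeps
    # secrets in memory and starts unsealed. Use it for local testing only;
    # run a configured, sealed server over TLS in production.
    image: hashicorp/vault:latest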
    environment:
      VAULT_DEV_ROOT_TOKEN_ID: ${VAULT_TOKEN}
    cap_add:
      - IPC_LOCK
    volumes:
      - vault-data:/vault/data
    networks:
      - hipaa-network

  llm:
    image: vllm/vllm-openai:latest
    runtime: nvidia
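    # vLLM's OpenAI-compatible server takes the model path as a CLI argument;
    # it does not read a MODEL_NAME variable
    command: ["--model", "/models/medical-llm"]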
    volumes:
      - ./models:/models:ro
    networks:
      - hipaa-network
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: all
              capabilities: [gpu]

networks:
  hipaa-network:
    driver: bridge

volumes:
  postgres-data:
  redis-data:
  vault-data:
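
The Compose file above interpolates `${DB_PASSWORD}`, `${REDIS_PASSWORD}`, and `${VAULT_TOKEN}`. Compose substitutes an empty string (with a warning) for any variable that is unset, so a small preflight check can catch a missing secret before the stack starts. The sketch below is one possible check; the function name and the sample values are hypothetical.

```python
from typing import Mapping

# Variables referenced by ${...} in docker-compose.yml
REQUIRED_SECRETS = ("DB_PASSWORD", "REDIS_PASSWORD", "VAULT_TOKEN")


def missing_secrets(env: Mapping[str, str]) -> list[str]:
    """Return the required variables that are absent or blank in env."""
    return [name for name in REQUIRED_SECRETS if not env.get(name, "").strip()]


# Sample environment: one secret set, one blank, one absent
example_env = {"DB_PASSWORD": "example-only", "REDIS_PASSWORD": ""}
print(missing_secrets(example_env))
```

In practice the function would be called with `os.environ` before running `docker compose up`. Note that Compose also reads a `.env` file next to the Compose file, which this check does not inspect.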

Dockerfile

# Dockerfile

FROM python:3.11-slim AS builder

WORKDIR /app

# Install build dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip wheel --no-cache-dir --no-deps --wheel-dir /app/wheels -r requirements.txt

# Production stage
FROM python:3.11-slim

WORKDIR /app

# Create non-root user
RUN groupadd -r hipaa && useradd -r -g hipaa hipaa

# Install runtime dependencies
RUN apt-get update && apt-get install -y \
    libpq5 \
    && rm -rf /var/lib/apt/lists/*

# Copy wheels and install
COPY --from=builder /app/wheels /wheels
RUN pip install --no-cache-dir /wheels/*

# Copy application
COPY app /app/app

# Set ownership
RUN chown -R hipaa:hipaa /app

USER hipaa

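# Health check (python:3.11-slim does not ship curl, so use the standard library)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)" || exit 1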

EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

Key Takeaways

Defense in Depth

Multiple security layers: TLS, encryption, RBAC, audit

Encrypt at Field Level

Don’t just encrypt the database; encrypt sensitive fields

Audit Everything

Every PHI access must be logged without logging PHI content

AI Stays On-Premise

Keep LLM processing within your infrastructure
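
The "Audit Everything" takeaway follows the pattern used in the AI endpoints, where the audit record carries lengths and flags rather than the text itself. The following is a minimal sketch of that idea as a helper function; `build_ai_audit_event` and its field names are assumptions for illustration, not part of the guide's `AuditLogger`.

```python
from datetime import datetime, timezone
from typing import Any, Optional


def build_ai_audit_event(
    event_type: str,
    actor_id: str,
    text: str,
    patient_context: Optional[str] = None,
) -> dict[str, Any]:
    """Describe an AI interaction with metadata only; the text is never stored."""
    return {
        "event_type": event_type,
        "actor_id": actor_id,
        "text_length": len(text),
        "has_context": bool(patient_context),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }


query_text = "Dosage guidance for a 70-year-old with CKD?"
event = build_ai_audit_event("AI_QUERY", "user-123", query_text, "stage 3 CKD")
print(event)
```

The resulting dictionary records who did what and when, but neither the query nor the patient context appears anywhere in it.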

Next Steps