HIPAA Compliance Implementation Guide
This guide provides a complete reference implementation combining all concepts from previous modules into a production-ready healthcare application. What You’ll Build:
- HIPAA-compliant FastAPI backend
- Encrypted database layer
- Audit logging system
- E2E encrypted chat with AI
- Complete access control
Project Architecture
Copy
┌─────────────────────────────────────────────────────────────────────────────┐
│ HIPAA-COMPLIANT APPLICATION ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ CLIENT LAYER │
│ ──────────── │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ React/Mobile App │ │
│ │ • E2E encryption keys │ │
│ │ • Local key storage │ │
│ │ • TLS 1.3 connections │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ API LAYER │
│ ───────── │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ FastAPI Application │ │
│ │ ├── Authentication (JWT + MFA) │ │
│ │ ├── Authorization (RBAC) │ │
│ │ ├── Audit Middleware │ │
│ │ ├── Encryption Service │ │
│ │ └── AI Router │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ DATA LAYER │
│ ────────── │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ PostgreSQL │ │ Redis │ │ S3 │ │
│ │ (Encrypted) │ │ (Sessions) │ │ (Documents) │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ │
│ │
│ INFRASTRUCTURE │
│ ────────────── │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ HashiCorp │ │ On-Premise │ │ OpenTelemetry│ │
│ │ Vault │ │ LLM │ │ (Logging) │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Project Structure
Copy
hipaa-healthcare-api/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI application entry
│ ├── config.py # Configuration management
│ ├── dependencies.py # Dependency injection
│ │
│ ├── api/
│ │ ├── __init__.py
│ │ ├── v1/
│ │ │ ├── __init__.py
│ │ │ ├── auth.py # Authentication endpoints
│ │ │ ├── patients.py # Patient CRUD
│ │ │ ├── records.py # Medical records
│ │ │ ├── chat.py # E2E encrypted chat
│ │ │ └── ai.py # AI endpoints
│ │ └── router.py
│ │
│ ├── core/
│ │ ├── __init__.py
│ │ ├── security.py # Auth, JWT, MFA
│ │ ├── encryption.py # Encryption service
│ │ ├── audit.py # Audit logging
│ │ └── rbac.py # Access control
│ │
│ ├── models/
│ │ ├── __init__.py
│ │ ├── user.py
│ │ ├── patient.py
│ │ ├── record.py
│ │ └── audit.py
│ │
│ ├── schemas/
│ │ ├── __init__.py
│ │ ├── user.py
│ │ ├── patient.py
│ │ └── record.py
│ │
│ ├── services/
│ │ ├── __init__.py
│ │ ├── patient_service.py
│ │ ├── record_service.py
│ │ ├── chat_service.py
│ │ └── ai_service.py
│ │
│ └── middleware/
│ ├── __init__.py
│ ├── audit.py
│ ├── security.py
│ └── rate_limit.py
│
├── tests/
├── alembic/ # Database migrations
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
Core Configuration
Environment Configuration
Copy
# app/config.py
from pydantic_settings import BaseSettings
from pydantic import SecretStr
from functools import lru_cache
from typing import Optional
class Settings(BaseSettings):
    """Typed, environment-driven configuration for the healthcare API.

    Fields declared without a default have to be supplied from the
    environment or from the ``.env`` file named in ``Config``.
    """

    # --- Application ---
    app_name: str = "HIPAA Healthcare API"
    environment: str = "production"
    debug: bool = False

    # --- Security / JWT ---
    secret_key: SecretStr
    jwt_algorithm: str = "RS256"
    jwt_private_key_path: str = "/secrets/jwt-private.pem"
    jwt_public_key_path: str = "/secrets/jwt-public.pem"
    access_token_expire_minutes: int = 15
    refresh_token_expire_days: int = 7

    # --- Database ---
    database_url: SecretStr
    database_encryption_key_id: str

    # --- Encryption (Vault / KMS) ---
    vault_addr: str
    vault_token: SecretStr
    kms_key_id: str

    # --- Redis-backed sessions ---
    redis_url: SecretStr
    session_timeout_seconds: int = 900  # 15 minutes (HIPAA)

    # --- Audit ---
    audit_log_retention_days: int = 2555  # 7 years

    # --- AI ---
    llm_model_path: str = "/models/medical-llm"
    ai_processing_mode: str = "on_premise"  # on_premise, hybrid

    # --- MFA ---
    mfa_issuer: str = "HIPAA Healthcare"
    mfa_required: bool = True

    class Config:
        # Values are also read from a local .env file; variable names are
        # matched without regard to case.
        env_file = ".env"
        case_sensitive = False
@lru_cache()
def get_settings() -> Settings:
    """Return the application settings, constructing them on first use only."""
    loaded = Settings()
    return loaded
Main Application
Copy
# app/main.py
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from contextlib import asynccontextmanager
import logging
from app.config import get_settings
from app.api.router import api_router
from app.middleware.audit import AuditMiddleware
from app.middleware.security import SecurityHeadersMiddleware
from app.core.encryption import EncryptionService
from app.core.audit import AuditLogger
# Module-level singletons: the application settings and this module's logger.
settings = get_settings()
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage startup and shutdown of the shared application services.

    Startup creates and initialises the encryption service, then the audit
    logger, and stores both on ``app.state``. Shutdown stops the audit logger
    and then closes the encryption service.

    The nested ``try``/``finally`` blocks guarantee that whatever was started
    is also released: a failure while starting or stopping the audit logger
    no longer leaves the encryption service open.
    """
    # Startup
    logger.info("Starting HIPAA-compliant healthcare API")

    # Initialize encryption service
    app.state.encryption = EncryptionService(
        vault_addr=settings.vault_addr,
        vault_token=settings.vault_token.get_secret_value(),
    )
    await app.state.encryption.initialize()

    try:
        # Initialize audit logger
        app.state.audit = AuditLogger(
            retention_days=settings.audit_log_retention_days
        )
        await app.state.audit.start()

        try:
            logger.info("Application started successfully")
            yield
        finally:
            # Shutdown: the audit logger goes first, same order as before.
            logger.info("Shutting down application")
            await app.state.audit.stop()
    finally:
        await app.state.encryption.close()
# Interactive API documentation is exposed only when debug mode is on.
app = FastAPI(
    title=settings.app_name,
    lifespan=lifespan,
    docs_url="/docs" if settings.debug else None,
    redoc_url="/redoc" if settings.debug else None,
)

# --- Middleware stack ---
# Plain-HTTP requests are redirected to HTTPS in production only.
if settings.environment == "production":
    app.add_middleware(HTTPSRedirectMiddleware)

app.add_middleware(SecurityHeadersMiddleware)
app.add_middleware(AuditMiddleware)

# Browser access is limited to the single first-party web origin.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.healthcare.example.com"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
)

# --- Routes ---
app.include_router(api_router, prefix="/api/v1")
@app.get("/health")
async def health_check():
    """Liveness probe: returns a fixed status payload that carries no PHI."""
    payload = {"status": "healthy"}
    return payload
Authentication & Authorization
Security Module
Copy
# app/core/security.py
from datetime import datetime, timedelta
from typing import Optional, List
from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel
import pyotp
import secrets
from app.config import get_settings
from app.models.user import User
# Application settings shared by everything in this module.
settings = get_settings()
# Password hashing context; argon2 is the only configured scheme.
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
class TokenPayload(BaseModel):
    """Claims carried by an access token."""

    sub: str  # subject; filled with the user id as a string
    exp: datetime  # expiry time
    iat: datetime  # issued-at time
    jti: str  # per-token random identifier
    role: str
    permissions: List[str]
    mfa_verified: bool = False
class AuthService:
    """Authentication service: password hashing, TOTP-based MFA and JWT handling."""

    def __init__(self):
        """Load the JWT signing and verification keys from the configured paths."""
        # An explicit encoding keeps the PEM text independent of the
        # platform's default locale.
        with open(settings.jwt_private_key_path, "r", encoding="utf-8") as f:
            self.private_key = f.read()
        with open(settings.jwt_public_key_path, "r", encoding="utf-8") as f:
            self.public_key = f.read()

    def hash_password(self, password: str) -> str:
        """Hash ``password`` with the module's password context (argon2)."""
        return pwd_context.hash(password)

    def verify_password(self, plain: str, hashed: str) -> bool:
        """Return whether ``plain`` matches the stored ``hashed`` value."""
        return pwd_context.verify(plain, hashed)

    def generate_mfa_secret(self) -> str:
        """Generate a new base32 TOTP secret for MFA enrolment."""
        return pyotp.random_base32()

    def verify_mfa_token(self, secret: str, token: str) -> bool:
        """Verify a TOTP code, tolerating one time-step of clock drift."""
        totp = pyotp.TOTP(secret)
        return totp.verify(token, valid_window=1)

    def get_mfa_provisioning_uri(self, secret: str, email: str) -> str:
        """Return the provisioning URI an authenticator app uses to enrol."""
        totp = pyotp.TOTP(secret)
        return totp.provisioning_uri(
            name=email,
            issuer_name=settings.mfa_issuer,
        )

    def create_access_token(
        self,
        user: User,
        mfa_verified: bool = False,
    ) -> str:
        """Create a signed JWT access token for ``user``.

        The token expires ``settings.access_token_expire_minutes`` after
        issuance and records whether MFA was completed for this session.
        """
        # Local import: the module-level import only brings in datetime/timedelta.
        from datetime import timezone

        # Timezone-aware "now": datetime.utcnow() is deprecated and yields a
        # naive value that is easy to misread as local time.
        now = datetime.now(timezone.utc)
        payload = TokenPayload(
            sub=str(user.id),
            exp=now + timedelta(minutes=settings.access_token_expire_minutes),
            iat=now,
            jti=secrets.token_hex(16),
            role=user.role,
            permissions=user.permissions,
            mfa_verified=mfa_verified,
        )
        return jwt.encode(
            payload.model_dump(),
            self.private_key,
            algorithm=settings.jwt_algorithm,
        )

    def decode_token(self, token: str) -> TokenPayload:
        """Decode and validate a JWT.

        Raises:
            AuthenticationError: if the signature or standard claims are
                invalid, or if the decoded claims do not form a valid
                ``TokenPayload``.
        """
        # Local import: only BaseModel is imported from pydantic at module level.
        from pydantic import ValidationError

        try:
            claims = jwt.decode(
                token,
                self.public_key,
                algorithms=[settings.jwt_algorithm],
            )
            return TokenPayload(**claims)
        except JWTError as e:
            raise AuthenticationError(f"Invalid token: {e}") from e
        except ValidationError as e:
            # A correctly signed token with missing or ill-typed claims is
            # still unusable. Claim values are kept out of the message.
            raise AuthenticationError("Invalid token: malformed claims") from e

    def create_refresh_token(self, user_id: str) -> str:
        """Return a random, URL-safe refresh token.

        ``user_id`` is accepted but not encoded into the token; the caller
        must store the association between token and user.
        """
        return secrets.token_urlsafe(64)
class AuthenticationError(Exception):
    """Raised when a presented token cannot be accepted."""
RBAC Implementation
Copy
# app/core/rbac.py
from enum import Enum
from typing import Set, Optional
from functools import wraps
from fastapi import HTTPException, Depends
class Role(str, Enum):
    """Roles a user account can hold; values are the stored role strings."""

    PATIENT = "patient"
    PROVIDER = "provider"
    NURSE = "nurse"
    ADMIN = "admin"
    AUDITOR = "auditor"
class Permission(str, Enum):
    """Fine-grained permissions, written as ``action:resource`` strings."""

    # Patient self-service
    READ_OWN_RECORDS = "read:own_records"
    UPDATE_OWN_PROFILE = "update:own_profile"

    # Clinical staff
    READ_PATIENT_RECORDS = "read:patient_records"
    WRITE_PATIENT_RECORDS = "write:patient_records"
    PRESCRIBE_MEDICATIONS = "prescribe:medications"

    # Administration
    MANAGE_USERS = "manage:users"
    VIEW_AUDIT_LOGS = "view:audit_logs"
    MANAGE_ROLES = "manage:roles"

    # Emergency override
    BREAK_GLASS_ACCESS = "break_glass:access"
# Role to permissions mapping.
# A role that is absent from this mapping has no permissions at all.
ROLE_PERMISSIONS: dict[Role, Set[Permission]] = {
    Role.PATIENT: {
        Permission.READ_OWN_RECORDS,
        Permission.UPDATE_OWN_PROFILE,
    },
    Role.NURSE: {
        Permission.READ_PATIENT_RECORDS,
    },
    Role.PROVIDER: {
        Permission.READ_PATIENT_RECORDS,
        Permission.WRITE_PATIENT_RECORDS,
        Permission.PRESCRIBE_MEDICATIONS,
        # Without at least one role holding this permission the emergency
        # override could never be granted to anyone.
        # NOTE(review): confirm which roles your policy allows to break glass.
        Permission.BREAK_GLASS_ACCESS,
    },
    Role.ADMIN: {
        Permission.MANAGE_USERS,
        Permission.VIEW_AUDIT_LOGS,
        Permission.MANAGE_ROLES,
    },
    Role.AUDITOR: {
        Permission.VIEW_AUDIT_LOGS,
    },
}
class RBACService:
    """Role-Based Access Control service.

    Every access decision made here, granted or denied, is written to the
    injected audit logger.

    NOTE(review): ``_get_user``, ``_is_on_care_team`` and
    ``_send_break_glass_alert`` are called below but are not defined in this
    section; they are expected to be provided elsewhere.
    """

    def __init__(self, audit_logger):
        self.audit = audit_logger

    def get_permissions(self, role: Role) -> Set[Permission]:
        """Return the permissions for ``role`` (empty set if unmapped)."""
        return ROLE_PERMISSIONS.get(role, set())

    def _permissions_for(self, user) -> Set[Permission]:
        """Resolve a user's permission set, failing closed on an unknown role."""
        try:
            return self.get_permissions(Role(user.role))
        except ValueError:
            # A role string outside the Role enum gets no permissions
            # rather than crashing the access check.
            return set()

    async def check_permission(
        self,
        user_id: str,
        required_permission: Permission,
        resource_id: Optional[str] = None,
    ) -> bool:
        """Return whether the user's role grants ``required_permission``.

        The outcome is audit-logged as an ``ACCESS_CHECK`` event.
        """
        user = await self._get_user(user_id)
        has_permission = required_permission in self._permissions_for(user)

        # Log access check
        await self.audit.log({
            "event_type": "ACCESS_CHECK",
            "user_id": user_id,
            "permission": required_permission.value,
            "resource_id": resource_id,
            "granted": has_permission,
        })
        return has_permission

    async def check_patient_access(
        self,
        user_id: str,
        patient_id: str,
    ) -> bool:
        """Return whether the user may access the given patient's records.

        Patients may access only their own records; providers and nurses
        must be on the patient's care team; every other role is denied.
        The decision is audit-logged so that denied attempts leave a trace.
        """
        user = await self._get_user(user_id)

        if user.role == Role.PATIENT:
            granted = str(user.patient_id) == patient_id
        elif user.role in (Role.PROVIDER, Role.NURSE):
            granted = await self._is_on_care_team(user_id, patient_id)
        else:
            granted = False

        await self.audit.log({
            "event_type": "PATIENT_ACCESS_CHECK",
            "user_id": user_id,
            "patient_id": patient_id,
            "granted": granted,
        })
        return granted

    async def break_glass_access(
        self,
        user_id: str,
        patient_id: str,
        reason: str,
    ) -> bool:
        """Emergency access override (break glass).

        Use only in emergency situations. A granted override is audit-logged
        as CRITICAL and triggers an alert; a refused attempt is audit-logged
        as well.
        """
        user = await self._get_user(user_id)

        if Permission.BREAK_GLASS_ACCESS not in self._permissions_for(user):
            # A refused override attempt is itself security-relevant.
            await self.audit.log({
                "event_type": "BREAK_GLASS_DENIED",
                "user_id": user_id,
                "patient_id": patient_id,
                "reason": reason,
                "sensitivity": "CRITICAL",
            })
            return False

        # Log break glass access
        await self.audit.log({
            "event_type": "BREAK_GLASS_ACCESS",
            "user_id": user_id,
            "patient_id": patient_id,
            "reason": reason,
            "sensitivity": "CRITICAL",
        })

        # Trigger alerts
        await self._send_break_glass_alert(user_id, patient_id, reason)
        return True
# Dependency factory for permission checking
def require_permission(permission: Permission):
    """Build a FastAPI dependency that admits only holders of ``permission``.

    The returned dependency resolves the current user, asks the RBAC service
    whether that user holds ``permission``, and either returns the user or
    raises HTTP 403.
    """

    async def permission_checker(
        current_user = Depends(get_current_user),
        rbac: RBACService = Depends(get_rbac_service),
    ):
        allowed = await rbac.check_permission(
            user_id=str(current_user.id),
            required_permission=permission,
        )
        if allowed:
            return current_user
        raise HTTPException(
            status_code=403,
            detail="Insufficient permissions"
        )

    return permission_checker
Patient Records API
Patient Endpoints
Copy
# app/api/v1/patients.py
from typing import List, Optional
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, Query

from app.core.audit import AuditLogger
from app.core.encryption import EncryptionService
from app.core.rbac import require_permission, Permission
from app.core.rbac import RBACService
from app.core.security import get_current_user
from app.schemas.patient import (
    PatientCreate,
    PatientUpdate,
    PatientResponse,
    MedicalRecordCreate,
    MedicalRecordResponse,
)
from app.services.patient_service import PatientService
# Router grouping the patient endpoints under the /patients prefix.
router = APIRouter(prefix="/patients", tags=["Patients"])
@router.post("/", response_model=PatientResponse)
async def create_patient(
    patient_data: PatientCreate,
    current_user = Depends(require_permission(Permission.WRITE_PATIENT_RECORDS)),
    patient_service: PatientService = Depends(),
    audit: AuditLogger = Depends(),
):
    """Register a new patient.

    The caller must hold the ``write:patient_records`` permission. The
    creation is audit-logged with identifiers only.
    """
    actor_id = current_user.id
    patient = await patient_service.create_patient(
        data=patient_data,
        created_by=actor_id,
    )

    audit_event = {
        "event_type": "PHI_CREATE",
        "resource_type": "patient",
        "resource_id": str(patient.id),
        "actor_id": str(actor_id),
    }
    await audit.log(audit_event)
    return patient
@router.get("/{patient_id}", response_model=PatientResponse)
async def get_patient(
    patient_id: UUID,
    current_user = Depends(get_current_user),
    patient_service: PatientService = Depends(),
    # The annotation is required: a bare ``Depends()`` gives FastAPI no
    # callable or type to resolve, so the route could not be registered.
    rbac: RBACService = Depends(),
    audit: AuditLogger = Depends(),
):
    """Get a patient by ID.

    Raises:
        HTTPException 403: the caller may not access this patient.
        HTTPException 404: no patient with this ID exists.
    """
    # Check access before touching the record
    if not await rbac.check_patient_access(
        user_id=str(current_user.id),
        patient_id=str(patient_id),
    ):
        raise HTTPException(403, "Access denied to patient records")

    patient = await patient_service.get_patient(patient_id)
    if not patient:
        raise HTTPException(404, "Patient not found")

    await audit.log({
        "event_type": "PHI_VIEW",
        "resource_type": "patient",
        "resource_id": str(patient_id),
        "patient_id": str(patient_id),
        "actor_id": str(current_user.id),
        "fields_accessed": ["demographics", "contact_info"],
    })
    return patient
@router.get("/{patient_id}/records", response_model=List[MedicalRecordResponse])
async def get_patient_records(
    patient_id: UUID,
    record_type: Optional[str] = Query(None),
    start_date: Optional[str] = Query(None),
    end_date: Optional[str] = Query(None),
    current_user = Depends(get_current_user),
    patient_service: PatientService = Depends(),
    # The annotation is required: a bare ``Depends()`` gives FastAPI no
    # callable or type to resolve, so the route could not be registered.
    rbac: RBACService = Depends(),
    audit: AuditLogger = Depends(),
):
    """Get a patient's medical records, optionally filtered.

    ``record_type``, ``start_date`` and ``end_date`` are optional filters
    forwarded to the patient service. The audit entry records how many
    records were returned and which filters were used, never the content.

    Raises:
        HTTPException 403: the caller may not access this patient.
    """
    if not await rbac.check_patient_access(
        user_id=str(current_user.id),
        patient_id=str(patient_id),
    ):
        raise HTTPException(403, "Access denied")

    records = await patient_service.get_medical_records(
        patient_id=patient_id,
        record_type=record_type,
        start_date=start_date,
        end_date=end_date,
    )

    await audit.log({
        "event_type": "PHI_VIEW",
        "resource_type": "medical_records",
        "patient_id": str(patient_id),
        "actor_id": str(current_user.id),
        "records_accessed": len(records),
        "filters": {
            "record_type": record_type,
            "date_range": f"{start_date} - {end_date}",
        },
    })
    return records
@router.post("/{patient_id}/records", response_model=MedicalRecordResponse)
async def create_medical_record(
    patient_id: UUID,
    record_data: MedicalRecordCreate,
    current_user = Depends(require_permission(Permission.WRITE_PATIENT_RECORDS)),
    patient_service: PatientService = Depends(),
    encryption: EncryptionService = Depends(),
    audit: AuditLogger = Depends(),
    rbac: RBACService = Depends(),
):
    """Create a new medical record for a patient.

    The caller needs the ``write:patient_records`` permission and must also
    pass the per-patient access check used by the read endpoints. Holding
    the permission alone does not allow writing to an arbitrary patient's
    chart.

    Raises:
        HTTPException 403: the caller may not access this patient.
    """
    # Object-level authorisation, matching the read endpoints
    if not await rbac.check_patient_access(
        user_id=str(current_user.id),
        patient_id=str(patient_id),
    ):
        raise HTTPException(403, "Access denied to patient records")

    # Encrypt sensitive fields; the encryption context carries the patient id
    encrypted_data = await encryption.encrypt_record(
        record_data.model_dump(),
        context={"patient_id": str(patient_id)},
    )

    record = await patient_service.create_medical_record(
        patient_id=patient_id,
        data=encrypted_data,
        created_by=current_user.id,
    )

    await audit.log({
        "event_type": "PHI_CREATE",
        "resource_type": "medical_record",
        "resource_id": str(record.id),
        "patient_id": str(patient_id),
        "actor_id": str(current_user.id),
        "record_type": record_data.record_type,
    })
    return record
Patient Service
Copy
# app/services/patient_service.py
from typing import List, Optional
from uuid import UUID
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models.patient import Patient, MedicalRecord
from app.schemas.patient import PatientCreate, MedicalRecordCreate
from app.core.encryption import EncryptionService
class PatientService:
    """Service for patient operations.

    PHI fields are encrypted through the injected encryption service before
    they are stored, and decrypted when they are read back.
    """

    def __init__(
        self,
        db: AsyncSession,
        encryption: EncryptionService,
    ):
        self.db = db
        self.encryption = encryption

    @staticmethod
    def _parse_date(value):
        """Coerce a date filter to ``datetime``.

        ``None`` and ``""`` mean "no filter" and yield ``None``. A
        ``datetime`` is returned unchanged. Any other value is parsed as an
        ISO-8601 string.

        Raises:
            ValueError: if a string is not valid ISO-8601.
        """
        if value is None or value == "":
            return None
        if isinstance(value, datetime):
            return value
        return datetime.fromisoformat(value)

    async def create_patient(
        self,
        data: PatientCreate,
        created_by: UUID,
    ) -> Patient:
        """Create a new patient with encrypted PHI and return the stored row."""
        # Encrypt PHI fields
        encrypted_ssn = await self.encryption.encrypt_field(
            data.ssn,
            field_name="ssn",
        )
        encrypted_name = await self.encryption.encrypt_field(
            data.full_name,
            field_name="name",
        )
        encrypted_contact = await self.encryption.encrypt_field(
            data.contact_info.model_dump_json(),
            field_name="contact_info",
        )

        patient = Patient(
            mrn=self._generate_mrn(),
            ssn_encrypted=encrypted_ssn,
            name_encrypted=encrypted_name,
            date_of_birth=data.date_of_birth,
            gender=data.gender,
            contact_info_encrypted=encrypted_contact,
            created_by=created_by,
            # NOTE(review): naive UTC is kept on purpose; whether an aware
            # timestamp is accepted depends on the column type.
            created_at=datetime.utcnow(),
        )
        self.db.add(patient)
        await self.db.commit()
        await self.db.refresh(patient)
        return patient

    async def get_patient(self, patient_id: UUID) -> Optional[Patient]:
        """Return the patient with decrypted SSN and name, or ``None``."""
        result = await self.db.execute(
            select(Patient).where(Patient.id == patient_id)
        )
        patient = result.scalar_one_or_none()

        if patient:
            # Decrypt fields for response
            patient.ssn = await self.encryption.decrypt_field(
                patient.ssn_encrypted,
                field_name="ssn",
            )
            patient.full_name = await self.encryption.decrypt_field(
                patient.name_encrypted,
                field_name="name",
            )
        return patient

    async def get_medical_records(
        self,
        patient_id: UUID,
        record_type: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ) -> List[MedicalRecord]:
        """Return a patient's medical records with their content decrypted.

        ``start_date`` and ``end_date`` are inclusive bounds on
        ``created_at``, given as ISO-8601 strings. They are parsed to
        ``datetime`` before being bound to the query, because the asyncpg
        driver rejects plain strings for timestamp parameters.

        Raises:
            ValueError: if a date filter is not valid ISO-8601.
        """
        # Parse first so a malformed filter fails before any query is built.
        created_from = self._parse_date(start_date)
        created_until = self._parse_date(end_date)

        query = select(MedicalRecord).where(
            MedicalRecord.patient_id == patient_id
        )
        if record_type:
            query = query.where(MedicalRecord.record_type == record_type)
        if created_from is not None:
            query = query.where(MedicalRecord.created_at >= created_from)
        if created_until is not None:
            query = query.where(MedicalRecord.created_at <= created_until)

        result = await self.db.execute(query)
        records = result.scalars().all()

        # Decrypt each record
        for record in records:
            record.content = await self.encryption.decrypt_field(
                record.content_encrypted,
                field_name="content",
            )
        return records

    def _generate_mrn(self) -> str:
        """Generate a medical record number: ``MRN-`` plus 16 uppercase hex digits."""
        import secrets
        return f"MRN-{secrets.token_hex(8).upper()}"
E2E Encrypted Chat
Chat Endpoints
Copy
# app/api/v1/chat.py
from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect
from typing import List
from uuid import UUID
from app.core.security import get_current_user, ws_authenticate
from app.core.audit import AuditLogger
from app.services.chat_service import ChatService
from app.schemas.chat import (
MessageSend,
MessageResponse,
ConversationResponse,
)
# Router grouping the chat endpoints under the /chat prefix.
router = APIRouter(prefix="/chat", tags=["Chat"])
@router.get("/conversations", response_model=List[ConversationResponse])
async def get_conversations(
    current_user = Depends(get_current_user),
    chat_service: ChatService = Depends(),
):
    """List the conversations of the authenticated user."""
    conversations = await chat_service.get_conversations(current_user.id)
    return conversations
@router.post("/messages", response_model=MessageResponse)
async def send_message(
    message: MessageSend,
    current_user = Depends(get_current_user),
    chat_service: ChatService = Depends(),
    audit: AuditLogger = Depends(),
):
    """
    Send an E2E encrypted message.

    The server handles ciphertext only; decryption happens on the client
    devices. The audit entry records who sent to whom and the payload size,
    never the message content.
    """
    sender_id = current_user.id
    stored = await chat_service.send_message(
        sender_id=sender_id,
        recipient_id=message.recipient_id,
        encrypted_content=message.encrypted_content,
        session_id=message.session_id,
    )

    # Metadata only: message content is deliberately not logged
    audit_event = {
        "event_type": "MESSAGE_SENT",
        "actor_id": str(sender_id),
        "recipient_id": str(message.recipient_id),
        "message_size": len(message.encrypted_content),
    }
    await audit.log(audit_event)
    return stored
@router.websocket("/ws")
async def chat_websocket(
    websocket: WebSocket,
    chat_service: ChatService = Depends(),
    audit: AuditLogger = Depends(),
):
    """WebSocket for real-time E2E encrypted chat.

    The connection is authenticated before it is accepted. Once accepted,
    incoming JSON frames are handed to the chat service without being
    decrypted. The connection is unregistered and a ``CHAT_SESSION_END``
    event is logged however the session ends: on a normal disconnect and on
    any other error, which still propagates after cleanup.
    """
    # Authenticate WebSocket connection
    user = await ws_authenticate(websocket)
    if not user:
        await websocket.close(code=4001)
        return

    await websocket.accept()
    await audit.log({
        "event_type": "CHAT_SESSION_START",
        "actor_id": str(user.id),
    })

    try:
        # Register connection
        await chat_service.connect(user.id, websocket)

        while True:
            # Receive encrypted message
            data = await websocket.receive_json()

            # Forward encrypted message (no decryption on server)
            await chat_service.handle_message(
                sender_id=user.id,
                data=data,
            )
    except WebSocketDisconnect:
        # Normal end of session; cleanup happens below.
        pass
    finally:
        # Runs for every exit path, so a malformed frame or a service error
        # cannot leave a stale connection or a session without an end event.
        await chat_service.disconnect(user.id)
        await audit.log({
            "event_type": "CHAT_SESSION_END",
            "actor_id": str(user.id),
        })
Chat Service
Copy
# app/services/chat_service.py
from typing import Dict, Optional
from uuid import UUID
from fastapi import WebSocket
from datetime import datetime
import json
from app.core.encryption import EncryptionService
from app.models.message import Message, Conversation
class ChatService:
    """
    E2E Encrypted Chat Service

    Key principle: Server NEVER sees plaintext messages.
    All encryption/decryption happens on client devices.

    Live WebSocket connections are tracked in an in-memory dict on this
    instance.
    NOTE(review): forwarding therefore only reaches sockets registered on
    the same instance - confirm the service is provided as a shared instance.
    """

    def __init__(self, db, encryption: EncryptionService):
        self.db = db
        self.encryption = encryption
        self.connections: Dict[UUID, WebSocket] = {}

    async def connect(self, user_id: UUID, websocket: WebSocket):
        """Register the user's WebSocket, replacing any previous one."""
        self.connections[user_id] = websocket

    async def disconnect(self, user_id: UUID):
        """Remove the user's WebSocket; a no-op if none is registered."""
        self.connections.pop(user_id, None)

    async def send_message(
        self,
        sender_id: UUID,
        recipient_id: UUID,
        encrypted_content: dict,
        session_id: str,
    ) -> Message:
        """
        Store an encrypted message and forward it if the recipient is online.

        The encrypted_content contains:
        - ciphertext: E2E encrypted message
        - header: Signal protocol header (public DH key, etc.)
        - nonce: Encryption nonce

        Returns the stored ``Message``.
        """
        # Store encrypted message
        message = Message(
            conversation_id=await self._get_or_create_conversation(
                sender_id, recipient_id
            ),
            sender_id=sender_id,
            recipient_id=recipient_id,
            encrypted_content=json.dumps(encrypted_content),
            session_id=session_id,
            created_at=datetime.utcnow(),
        )
        self.db.add(message)
        await self.db.commit()

        # Forward to recipient if online
        if recipient_id in self.connections:
            await self.connections[recipient_id].send_json({
                "type": "message",
                "message_id": str(message.id),
                "sender_id": str(sender_id),
                "encrypted_content": encrypted_content,
                "timestamp": message.created_at.isoformat(),
            })
        return message

    async def handle_message(self, sender_id: UUID, data: dict):
        """Dispatch an incoming WebSocket frame by its ``type`` field.

        Frames of an unrecognised type are ignored.
        """
        msg_type = data.get("type")

        if msg_type == "message":
            await self.send_message(
                sender_id=sender_id,
                recipient_id=UUID(data["recipient_id"]),
                encrypted_content=data["encrypted_content"],
                session_id=data["session_id"],
            )
        elif msg_type == "key_exchange":
            # Handle Signal protocol key exchange
            await self._handle_key_exchange(sender_id, data)
        elif msg_type == "typing":
            # Forward typing indicator (no PHI)
            await self._forward_typing(sender_id, UUID(data["recipient_id"]))

    async def get_messages(
        self,
        user_id: UUID,
        conversation_id: UUID,
        limit: int = 50,
    ) -> list:
        """Return up to ``limit`` encrypted messages, newest first.

        Raises:
            PermissionError: if the conversation does not exist or the user
                is not one of its two participants.
        """
        # Local import: this module's import block never brought ``select``
        # into scope, so the query below used to fail with NameError.
        from sqlalchemy import select

        # Verify user is part of conversation. A missing conversation is
        # reported exactly like a foreign one, so the error does not reveal
        # which conversation ids exist.
        conv = await self.db.get(Conversation, conversation_id)
        if conv is None or user_id not in (
            conv.participant1_id,
            conv.participant2_id,
        ):
            raise PermissionError("Not a participant in this conversation")

        messages = await self.db.execute(
            select(Message)
            .where(Message.conversation_id == conversation_id)
            .order_by(Message.created_at.desc())
            .limit(limit)
        )

        # Return encrypted messages (client will decrypt)
        return [
            {
                "id": str(m.id),
                "sender_id": str(m.sender_id),
                "encrypted_content": json.loads(m.encrypted_content),
                "timestamp": m.created_at.isoformat(),
            }
            for m in messages.scalars()
        ]
AI Integration
AI Endpoints
Copy
# app/api/v1/ai.py
from fastapi import APIRouter, Depends, HTTPException
from typing import Optional
from uuid import UUID
from app.core.security import get_current_user
from app.core.rbac import require_permission, Permission
from app.core.audit import AuditLogger
from app.services.ai_service import AIService
from app.schemas.ai import (
MedicalQuery,
AIResponse,
AIContextRequest,
)
# Router grouping the AI assistant endpoints under the /ai prefix.
router = APIRouter(prefix="/ai", tags=["AI Assistant"])
@router.post("/query", response_model=AIResponse)
async def medical_ai_query(
    query: MedicalQuery,
    current_user = Depends(get_current_user),
    ai_service: AIService = Depends(),
    audit: AuditLogger = Depends(),
):
    """
    Answer a medical query with the AI service.

    Processing happens on-premise to protect PHI. Two audit events bracket
    the call; both record sizes and flags only, never the query or the
    response text.
    """
    actor_id = str(current_user.id)

    # Audit the request (metadata only, not the content)
    request_event = {
        "event_type": "AI_QUERY",
        "actor_id": actor_id,
        "query_length": len(query.query),
        "has_context": bool(query.patient_context),
        "processing_mode": ai_service.processing_mode,
    }
    await audit.log(request_event)

    response = await ai_service.process_query(
        query=query.query,
        patient_context=query.patient_context,
        user_id=current_user.id,
    )

    # Audit the response (length only)
    response_event = {
        "event_type": "AI_RESPONSE",
        "actor_id": actor_id,
        "response_length": len(response.response),
    }
    await audit.log(response_event)
    return response
@router.post("/chat/{conversation_id}/ai-assist")
async def ai_assisted_chat(
    conversation_id: UUID,
    context: AIContextRequest,
    current_user = Depends(require_permission(Permission.READ_PATIENT_RECORDS)),
    ai_service: AIService = Depends(),
    audit: AuditLogger = Depends(),
):
    """
    AI assistance within E2E encrypted chat.

    Flow:
    1. Client decrypts relevant messages
    2. Sends de-identified context to this endpoint
    3. AI generates response
    4. Client encrypts and sends to chat

    A request whose context still looks identifiable is rejected with
    HTTP 400 before anything reaches the model.
    """
    # Guard: refuse context that fails the de-identification check
    context_is_clean = ai_service.verify_deidentified(context.context)
    if not context_is_clean:
        raise HTTPException(
            400,
            "Context contains identifiable information. Please de-identify."
        )

    suggestion = await ai_service.generate_assistance(
        context=context.context,
        query=context.query,
    )

    audit_event = {
        "event_type": "AI_CHAT_ASSIST",
        "actor_id": str(current_user.id),
        "conversation_id": str(conversation_id),
    }
    await audit.log(audit_event)
    return {"ai_response": suggestion}
AI Service
Copy
# app/services/ai_service.py
from typing import Optional
import re
from app.config import get_settings
from app.schemas.ai import AIResponse
# Application settings; AIService reads ai_processing_mode from here.
settings = get_settings()
class AIService:
    """
    HIPAA-compliant AI service

    Uses on-premise LLM for PHI processing.
    """

    # Patterns for common direct identifiers, compiled once. The check is
    # deliberately conservative: a false positive only asks the client to
    # de-identify again, while a false negative would leak PHI.
    _PHI_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
            r'\b\d{1,2}/\d{1,2}/\d{2,4}\b',  # Date, e.g. 3/7/1980 or 03/07/1980
            r'\b\d{4}-\d{2}-\d{2}\b',  # ISO date, e.g. 1980-03-07
            # MRN: plain digits, or the "MRN-<hex>" form this project generates
            r'\bMRN[-:#\s]*(?:[0-9A-F]{6,}|\d+)\b',
            r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',  # Email
            r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b',  # Phone
        )
    )

    def __init__(self, llm_client, deidentifier):
        self.llm = llm_client
        self.deidentifier = deidentifier
        self.processing_mode = settings.ai_processing_mode

    async def process_query(
        self,
        query: str,
        patient_context: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> AIResponse:
        """Answer a medical query with the on-premise LLM.

        The prompt is the system prompt, the optional patient context and
        the query. The result carries the generated text, a fixed clinical
        disclaimer and the processing mode. ``user_id`` is accepted but is
        not used here.
        """
        # Build prompt
        system_prompt = self._get_medical_system_prompt()
        full_prompt = f"""{system_prompt}
Patient Context: {patient_context or 'Not provided'}
Healthcare Provider Query: {query}
Medical AI Response:"""

        # Generate with on-premise LLM
        response_text = await self.llm.generate(
            prompt=full_prompt,
            max_tokens=1000,
            temperature=0.7,
        )

        return AIResponse(
            response=response_text,
            disclaimer="This AI response is for informational purposes only. "
            "Please use clinical judgment and consult appropriate "
            "medical resources for patient care decisions.",
            processing_mode=self.processing_mode,
        )

    def verify_deidentified(self, text: str) -> bool:
        """Return ``True`` if ``text`` matches none of the PHI patterns.

        Checks for SSNs, slash-separated and ISO dates, medical record
        numbers (numeric or ``MRN-<hex>``), e-mail addresses and phone
        numbers. Empty or missing text has nothing to leak and passes.
        This is a pattern screen only; it cannot detect names or free-text
        identifiers.
        """
        if not text:
            return True
        return not any(pattern.search(text) for pattern in self._PHI_PATTERNS)

    def _get_medical_system_prompt(self) -> str:
        """Return the fixed system prompt that frames every query."""
        return """You are a medical AI assistant operating in a HIPAA-compliant
healthcare environment. You are assisting licensed healthcare providers.
GUIDELINES:
1. Provide evidence-based medical information
2. Reference clinical guidelines when applicable
3. Highlight important safety considerations
4. Suggest relevant differential diagnoses when appropriate
5. Recommend appropriate follow-up or specialist referrals
6. Always defer final clinical decisions to the healthcare provider
IMPORTANT: This is a clinical decision support tool. All recommendations
should be verified against current medical literature and clinical judgment.
"""
Docker Deployment
Docker Compose
Copy
# docker-compose.yml
version: '3.8'

services:
  api:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql+asyncpg://hipaa:${DB_PASSWORD}@db:5432/healthcare
      - REDIS_URL=redis://:${REDIS_PASSWORD}@redis:6379
      - VAULT_ADDR=http://vault:8200
      # The settings class declares these without defaults, so the API fails
      # validation at startup unless they are provided.
      - VAULT_TOKEN=${VAULT_TOKEN}
      - SECRET_KEY=${SECRET_KEY}
      - KMS_KEY_ID=${KMS_KEY_ID}
      - DATABASE_ENCRYPTION_KEY_ID=${DATABASE_ENCRYPTION_KEY_ID}
    depends_on:
      - db
      - redis
      - vault
    volumes:
      # JWT key pair, mounted read-only
      - ./secrets:/secrets:ro
    networks:
      - hipaa-network
    deploy:
      resources:
        limits:
          memory: 2G

  db:
    image: postgres:15
    environment:
      POSTGRES_USER: hipaa
      POSTGRES_PASSWORD: ${DB_PASSWORD}
      POSTGRES_DB: healthcare
    volumes:
      - postgres-data:/var/lib/postgresql/data
      - ./init-db.sql:/docker-entrypoint-initdb.d/init.sql
    networks:
      - hipaa-network
    # Enable TLS for client connections (encryption in transit). This is NOT
    # transparent data encryption: at-rest protection has to come from an
    # encrypted volume plus the application's field-level encryption.
    # NOTE: no mount in this file supplies server.crt / server.key; they must
    # be provided at the paths below, with the key readable only by postgres.
    command: >
      postgres
      -c ssl=on
      -c ssl_cert_file=/var/lib/postgresql/server.crt
      -c ssl_key_file=/var/lib/postgresql/server.key

  redis:
    image: redis:7-alpine
    command: redis-server --requirepass ${REDIS_PASSWORD} --appendonly yes
    volumes:
      - redis-data:/data
    networks:
      - hipaa-network

  vault:
    # WARNING: VAULT_DEV_ROOT_TOKEN_ID only applies to Vault's dev server,
    # which keeps secrets in memory, starts unsealed and serves plain HTTP.
    # Use it for local development only. Before handling real PHI, run Vault
    # with a server configuration, persistent storage and TLS, and pin the
    # image to a specific version instead of "latest".
    image: hashicorp/vault:latest
    environment:
      VAULT_DEV_ROOT_TOKEN_ID: ${VAULT_TOKEN}
    cap_add:
      - IPC_LOCK
    volumes:
      - vault-data:/vault/data
    networks:
      - hipaa-network

  llm:
    image: vllm/vllm-openai:latest
    runtime: nvidia
    environment:
      MODEL_NAME: /models/medical-llm
    volumes:
      - ./models:/models:ro
    networks:
      - hipaa-network
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: all
              capabilities: [gpu]

networks:
  hipaa-network:
    driver: bridge

volumes:
  postgres-data:
  redis-data:
  vault-data:
Dockerfile
Copy
# Dockerfile
FROM python:3.11-slim AS builder

WORKDIR /app

# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# Build wheels for the Python dependencies
COPY requirements.txt .
RUN pip wheel --no-cache-dir --no-deps --wheel-dir /app/wheels -r requirements.txt

# Production stage
FROM python:3.11-slim

WORKDIR /app

# Create non-root user
RUN groupadd -r hipaa && useradd -r -g hipaa hipaa

# Install runtime dependencies.
# curl is needed by the HEALTHCHECK below: the slim base image does not ship
# it, so without this the check fails every time and the container is
# reported unhealthy.
RUN apt-get update && apt-get install -y --no-install-recommends \
    libpq5 \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy wheels and install
COPY --from=builder /app/wheels /wheels
RUN pip install --no-cache /wheels/*

# Copy application
COPY app /app/app

# Set ownership and drop privileges
RUN chown -R hipaa:hipaa /app
USER hipaa

# Health check against the PHI-free /health endpoint
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
Key Takeaways
Defense in Depth
Multiple security layers: TLS, encryption, RBAC, audit
Encrypt at Field Level
Don’t just encrypt the database; encrypt sensitive fields
Audit Everything
Every PHI access must be logged without logging PHI content
AI Stays On-Premise
Keep LLM processing within your infrastructure