Access Control Systems for Healthcare
Access control is the gatekeeper of PHI. HIPAA requires that only authorized individuals access PHI, and only for legitimate purposes. This module covers everything from basic RBAC to sophisticated attribute-based policies and emergency access procedures.
Hands-On Labs: 4 practical implementations
Prerequisites: HIPAA Fundamentals, Risk Assessment
Learning Objectives:
- Implement Role-Based Access Control (RBAC) for healthcare
- Design Attribute-Based Access Control (ABAC) policies
- Build break-glass emergency access procedures
- Integrate Multi-Factor Authentication (MFA)
- Implement automatic session management
- Create audit-ready access control documentation
HIPAA Access Control Requirements
┌─────────────────────────────────────────────────────────────────────────────┐
│ HIPAA ACCESS CONTROL REQUIREMENTS │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ §164.312(a)(1) - ACCESS CONTROL (Required) │
│ ────────────────────────────────────────── │
│ "Implement technical policies and procedures for electronic information │
│ systems that maintain ePHI to allow access only to those persons or │
│ software programs that have been granted access rights." │
│ │
│ Implementation Specifications: │
│ │
│ ┌─────────────────────────┬──────────┬─────────────────────────────────┐ │
│ │ Specification │ Status │ Description │ │
│ ├─────────────────────────┼──────────┼─────────────────────────────────┤ │
│ │ Unique User ID │ Required │ Each user has unique identifier │ │
│ │ Emergency Access │ Required │ Procedures for emergencies │ │
│ │ Automatic Logoff │ Addressable│ Session timeout │ │
│ │ Encryption/Decryption │ Addressable│ Mechanism for encryption │ │
│ └─────────────────────────┴──────────┴─────────────────────────────────┘ │
│ │
│ §164.312(d) - PERSON OR ENTITY AUTHENTICATION (Required) │
│ ──────────────────────────────────────────────────────── │
│ "Implement procedures to verify that a person or entity seeking access │
│ to ePHI is the one claimed." │
│ │
│ §164.308(a)(4) - INFORMATION ACCESS MANAGEMENT (Required) │
│ ────────────────────────────────────────────────────────── │
│ "Implement policies and procedures for authorizing access to ePHI" │
│ │
│ Implementation Specifications: │
│ • Isolating health care clearinghouse functions (Required) │
│ • Access authorization (Addressable) │
│ • Access establishment and modification (Addressable) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
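The Required/Addressable split above can be tracked as data. The following is a minimal sketch, not an official compliance tool: the `Safeguard` class, the `open_gaps` helper, and the sample rationale text are all hypothetical names introduced for illustration. It assumes the usual reading of "addressable": the specification is either implemented or the organization documents why not and what it does instead.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Safeguard:
    """One implementation specification (hypothetical structure)."""
    citation: str
    name: str
    required: bool                   # False means "addressable"
    implemented: bool = False
    rationale: Optional[str] = None  # Documented reason/alternative if not implemented


def open_gaps(safeguards: List[Safeguard]) -> List[str]:
    """Return citations that still need work.

    A required specification must be implemented. An addressable one must be
    either implemented or backed by a documented rationale.
    """
    gaps = []
    for s in safeguards:
        if s.implemented:
            continue
        if s.required or not s.rationale:
            gaps.append(s.citation)
    return gaps


checklist = [
    Safeguard("164.312(a)(2)(i)", "Unique user identification", required=True, implemented=True),
    Safeguard("164.312(a)(2)(ii)", "Emergency access procedure", required=True),
    Safeguard("164.312(a)(2)(iii)", "Automatic logoff", required=False,
              rationale="Example only: workstation locks on badge tap-out"),
    Safeguard("164.312(a)(2)(iv)", "Encryption and decryption", required=False),
]
print(open_gaps(checklist))
```

Here the emergency access procedure is flagged because it is required and not implemented, and encryption is flagged because it is neither implemented nor justified.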
The Principle of Least Privilege
Before diving into implementation, understand the core principle:
┌─────────────────────────────────────────────────────────────────────────────┐
│ PRINCIPLE OF LEAST PRIVILEGE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ "Every user should have only the minimum access necessary to │
│ perform their job functions—nothing more, nothing less." │
│ │
│ Healthcare Examples: │
│ ─────────────────── │
│ │
│ ┌─────────────┐ ┌─────────────────────────────────────────────────┐ │
│ │ Nurse │───▶│ • View patients on assigned unit │ │
│ │ │ │ • Document vital signs, assessments │ │
│ │ │ │ • View (not edit) physician orders │ │
│ │ │ │ ✗ Cannot access billing information │ │
│ │ │ │ ✗ Cannot access other units' patients │ │
│ └─────────────┘ └─────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────────────────────────────────────────┐ │
│ │ Physician │───▶│ • Full access to own patients │ │
│ │ │ │ • Write orders, prescriptions │ │
│ │ │ │ • Consult on other physicians' patients │ │
│ │ │ │ ✗ Cannot modify another physician's notes │ │
│ │ │ │ ✗ Cannot access administrative functions │ │
│ └─────────────┘ └─────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────────────────────────────────────────┐ │
│ │ Billing │───▶│ • Access billing/insurance information │ │
│ │ Staff │ │ • View diagnosis codes for billing │ │
│ │ │ │ ✗ Cannot view clinical notes │ │
│ │ │ │ ✗ Cannot view lab results │ │
│ └─────────────┘ └─────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
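In code, least privilege usually means deny by default: an action is allowed only if it was explicitly granted. The sketch below mirrors the three examples in the diagram. The `ROLE_GRANTS` table and the action strings are hypothetical and chosen only for illustration.

```python
from typing import Dict, FrozenSet

# Hypothetical grants mirroring the diagram; anything not listed is denied.
ROLE_GRANTS: Dict[str, FrozenSet[str]] = {
    "nurse": frozenset({"patient:view:assigned_unit", "vitals:document", "orders:view"}),
    "physician": frozenset({"patient:full:own", "orders:write", "rx:write", "consult:other"}),
    "billing": frozenset({"billing:access", "diagnosis_codes:view"}),
}


def is_allowed(role: str, action: str) -> bool:
    """Deny by default: unknown roles and unlisted actions are both refused."""
    return action in ROLE_GRANTS.get(role, frozenset())


print(is_allowed("nurse", "orders:view"))      # viewing orders was granted
print(is_allowed("nurse", "orders:write"))     # editing orders was never granted
print(is_allowed("billing", "clinical_notes:view"))
```

Because there is no "deny" list to maintain, forgetting to configure a new action fails closed rather than open.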
Role-Based Access Control (RBAC)
RBAC Architecture
from dataclasses import dataclass, field
from typing import List, Set, Optional, Dict
from enum import Enum
from datetime import datetime, timedelta
import uuid
class Permission(Enum):
"""Granular permissions for healthcare systems"""
# Patient Records
PATIENT_VIEW = "patient:view"
PATIENT_CREATE = "patient:create"
PATIENT_UPDATE = "patient:update"
PATIENT_DELETE = "patient:delete"
# Clinical Notes
NOTES_VIEW = "notes:view"
NOTES_CREATE = "notes:create"
NOTES_UPDATE_OWN = "notes:update:own"
NOTES_UPDATE_ANY = "notes:update:any"
NOTES_DELETE = "notes:delete"
# Orders
ORDERS_VIEW = "orders:view"
ORDERS_CREATE = "orders:create"
ORDERS_SIGN = "orders:sign"
ORDERS_CANCEL = "orders:cancel"
# Prescriptions
RX_VIEW = "rx:view"
RX_PRESCRIBE = "rx:prescribe"
RX_DISPENSE = "rx:dispense"
# Lab Results
LABS_VIEW = "labs:view"
LABS_ORDER = "labs:order"
LABS_RESULT = "labs:result"
# Billing
BILLING_VIEW = "billing:view"
BILLING_CREATE = "billing:create"
BILLING_MODIFY = "billing:modify"
# Administration
ADMIN_USERS = "admin:users"
ADMIN_ROLES = "admin:roles"
ADMIN_AUDIT = "admin:audit"
ADMIN_SYSTEM = "admin:system"
# PHI Export
PHI_EXPORT = "phi:export"
PHI_BULK_ACCESS = "phi:bulk_access"
# Emergency
BREAK_GLASS = "emergency:break_glass"
@dataclass
class Role:
"""Healthcare role with permissions"""
role_id: str = field(default_factory=lambda: str(uuid.uuid4()))
name: str = ""
description: str = ""
permissions: Set[Permission] = field(default_factory=set)
# Role hierarchy
parent_role: Optional[str] = None # Inherits from parent
# Constraints
max_patients: Optional[int] = None # Max concurrent patients
allowed_departments: List[str] = field(default_factory=list)
allowed_facilities: List[str] = field(default_factory=list)
# Temporal
time_restrictions: Optional[Dict] = None # e.g., {"start": "08:00", "end": "18:00"}
created_at: datetime = field(default_factory=datetime.utcnow)
updated_at: datetime = field(default_factory=datetime.utcnow)
def has_permission(self, permission: Permission) -> bool:
return permission in self.permissions
# Define standard healthcare roles
HEALTHCARE_ROLES = {
"physician": Role(
name="Physician",
description="Licensed physician with full clinical access",
permissions={
Permission.PATIENT_VIEW,
Permission.PATIENT_CREATE,
Permission.PATIENT_UPDATE,
Permission.NOTES_VIEW,
Permission.NOTES_CREATE,
Permission.NOTES_UPDATE_OWN,
Permission.ORDERS_VIEW,
Permission.ORDERS_CREATE,
Permission.ORDERS_SIGN,
Permission.RX_VIEW,
Permission.RX_PRESCRIBE,
Permission.LABS_VIEW,
Permission.LABS_ORDER,
Permission.BREAK_GLASS,
},
),
"nurse": Role(
name="Registered Nurse",
description="RN with documentation and viewing access",
permissions={
Permission.PATIENT_VIEW,
Permission.PATIENT_UPDATE,
Permission.NOTES_VIEW,
Permission.NOTES_CREATE,
Permission.NOTES_UPDATE_OWN,
Permission.ORDERS_VIEW,
Permission.RX_VIEW,
Permission.LABS_VIEW,
Permission.BREAK_GLASS,
},
),
"medical_assistant": Role(
name="Medical Assistant",
description="MA with limited clinical access",
permissions={
Permission.PATIENT_VIEW,
Permission.NOTES_VIEW,
Permission.NOTES_CREATE,
Permission.ORDERS_VIEW,
Permission.LABS_VIEW,
},
),
"billing_specialist": Role(
name="Billing Specialist",
description="Access to billing and minimal clinical info",
permissions={
Permission.PATIENT_VIEW, # Demographics only
Permission.BILLING_VIEW,
Permission.BILLING_CREATE,
Permission.BILLING_MODIFY,
},
),
"admin": Role(
name="System Administrator",
description="System administration without clinical access",
permissions={
Permission.ADMIN_USERS,
Permission.ADMIN_ROLES,
Permission.ADMIN_AUDIT,
Permission.ADMIN_SYSTEM,
},
),
"compliance_officer": Role(
name="Compliance Officer",
description="Audit and compliance oversight",
permissions={
Permission.ADMIN_AUDIT,
Permission.PHI_EXPORT, # For audit purposes
},
),
}
@dataclass
class User:
"""User with role assignments"""
user_id: str = field(default_factory=lambda: str(uuid.uuid4()))
username: str = ""
email: str = ""
# Authentication
password_hash: str = ""
mfa_enabled: bool = False
mfa_secret: Optional[str] = None
# Role assignments
roles: List[str] = field(default_factory=list)
# Department/facility constraints
department: str = ""
facility: str = ""
# Status
is_active: bool = True
last_login: Optional[datetime] = None
failed_login_attempts: int = 0
locked_until: Optional[datetime] = None
# Audit
created_at: datetime = field(default_factory=datetime.utcnow)
created_by: str = ""
def get_all_permissions(self, roles_db: Dict[str, Role]) -> Set[Permission]:
"""Get all permissions from all assigned roles"""
permissions = set()
for role_name in self.roles:
if role_name in roles_db:
role = roles_db[role_name]
permissions.update(role.permissions)
# Handle role hierarchy
if role.parent_role and role.parent_role in roles_db:
parent = roles_db[role.parent_role]
permissions.update(parent.permissions)
return permissions
def has_permission(
self,
permission: Permission,
roles_db: Dict[str, Role]
) -> bool:
"""Check if user has specific permission"""
return permission in self.get_all_permissions(roles_db)
class RBACService:
"""RBAC enforcement service"""
def __init__(self):
self.roles: Dict[str, Role] = HEALTHCARE_ROLES.copy()
self.users: Dict[str, User] = {}
def check_access(
self,
user_id: str,
permission: Permission,
resource: Optional[Dict] = None,
) -> tuple[bool, str]:
"""
Check if user has access to perform action
Returns:
Tuple of (allowed: bool, reason: str)
"""
user = self.users.get(user_id)
if not user:
return False, "User not found"
if not user.is_active:
return False, "User account is inactive"
if user.locked_until and user.locked_until > datetime.utcnow():
return False, "User account is locked"
# Check permission
if not user.has_permission(permission, self.roles):
return False, f"User lacks permission: {permission.value}"
# Check department/facility constraints if resource provided
if resource:
resource_dept = resource.get("department")
resource_facility = resource.get("facility")
# Get user's role constraints
for role_name in user.roles:
role = self.roles.get(role_name)
if role:
if role.allowed_departments:
if resource_dept not in role.allowed_departments:
return False, f"Access denied: wrong department"
if role.allowed_facilities:
if resource_facility not in role.allowed_facilities:
return False, f"Access denied: wrong facility"
return True, "Access granted"
def assign_role(
self,
user_id: str,
role_name: str,
assigned_by: str,
) -> bool:
"""Assign role to user with audit"""
user = self.users.get(user_id)
if not user:
return False
if role_name not in self.roles:
return False
if role_name not in user.roles:
user.roles.append(role_name)
# Log role assignment for audit
self._log_role_change(user_id, role_name, "assigned", assigned_by)
return True
def revoke_role(
self,
user_id: str,
role_name: str,
revoked_by: str,
) -> bool:
"""Revoke role from user with audit"""
user = self.users.get(user_id)
if not user:
return False
if role_name in user.roles:
user.roles.remove(role_name)
self._log_role_change(user_id, role_name, "revoked", revoked_by)
return True
def _log_role_change(
self,
user_id: str,
role_name: str,
action: str,
changed_by: str,
):
"""Log role changes for HIPAA audit"""
# In production, this would go to audit log system
print(f"AUDIT: Role {role_name} {action} for user {user_id} by {changed_by}")
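`User.get_all_permissions` above follows a single `parent_role` link. If roles can be nested more deeply, permissions have to be collected along the whole parent chain. The following is a minimal, self-contained sketch of that idea; the `ROLES` table, its tuple layout, and the `effective_permissions` function are hypothetical stand-ins rather than part of the classes above.

```python
from typing import Dict, Optional, Set, Tuple

# Hypothetical role table: name -> (parent name or None, direct permissions)
RoleTable = Dict[str, Tuple[Optional[str], Set[str]]]

ROLES: RoleTable = {
    "clinical_staff": (None, {"patient:view"}),
    "nurse": ("clinical_staff", {"notes:create"}),
    "charge_nurse": ("nurse", {"orders:view"}),
}


def effective_permissions(role_name: Optional[str], roles: RoleTable) -> Set[str]:
    """Union of a role's own permissions and those of every ancestor."""
    permissions: Set[str] = set()
    seen: Set[str] = set()
    current = role_name
    # Stop at the root, at an unknown role, or if a cycle is detected.
    while current is not None and current in roles and current not in seen:
        seen.add(current)
        parent, direct = roles[current]
        permissions |= direct
        current = parent
    return permissions


print(sorted(effective_permissions("charge_nurse", ROLES)))
```

The `seen` set guards against a misconfigured hierarchy in which two roles name each other as parent, which would otherwise loop forever.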
Attribute-Based Access Control (ABAC)
ABAC provides more granular control by evaluating attributes of the user, resource, and context:
from dataclasses import dataclass
from typing import Dict, Any, List, Callable, Optional
from enum import Enum
from datetime import datetime, time
import json
class PolicyEffect(Enum):
ALLOW = "allow"
DENY = "deny"
@dataclass
class PolicyCondition:
    """Single condition in a policy"""
    attribute: str  # e.g., "user.department", "resource.sensitivity"
    operator: str   # e.g., "equals", "in", "greater_than"
    value: Any      # Literal, or a "user."/"resource."/"context." attribute path

    def evaluate(self, context: Dict[str, Any]) -> bool:
        """Evaluate this condition against context"""
        attr_value = self._get_attribute(context, self.attribute)
        # A string such as "resource.department" names another attribute;
        # resolve it so the comparison is not made against the literal text.
        expected = self.value
        if isinstance(expected, str) and expected.split(".")[0] in ("user", "resource", "context"):
            expected = self._get_attribute(context, expected)
        try:
            if self.operator == "equals":
                return attr_value == expected
            elif self.operator == "not_equals":
                return attr_value != expected
            elif self.operator in ("in", "not_in"):
                # Multi-valued attributes (e.g. user.roles) match on any overlap
                if isinstance(attr_value, (list, set, tuple)):
                    found = any(item in expected for item in attr_value)
                else:
                    found = attr_value in expected
                return found if self.operator == "in" else not found
            elif self.operator == "contains":
                return expected in attr_value
            elif self.operator == "greater_than":
                return attr_value > expected
            elif self.operator == "less_than":
                return attr_value < expected
            elif self.operator == "is_true":
                return bool(attr_value)
            elif self.operator == "is_false":
                return not bool(attr_value)
        except TypeError:
            # Missing (None) or incomparable attributes never satisfy a condition
            return False
        return False

    def _get_attribute(self, context: Dict, path: str) -> Any:
        """Navigate nested dict using dot notation"""
        value = context
        for part in path.split("."):
            if isinstance(value, dict):
                value = value.get(part)
            else:
                return None
        return value
@dataclass
class AccessPolicy:
"""ABAC policy definition"""
policy_id: str
name: str
description: str
effect: PolicyEffect
# What this policy applies to
resource_types: List[str] # e.g., ["patient_record", "lab_result"]
actions: List[str] # e.g., ["read", "write", "delete"]
# Conditions that must be met
conditions: List[PolicyCondition]
# Priority (lower = higher priority)
priority: int = 100
def evaluate(self, context: Dict[str, Any]) -> tuple[bool, PolicyEffect]:
"""
Evaluate policy against context
Returns:
Tuple of (matches: bool, effect: PolicyEffect)
"""
# Check if policy applies to this resource/action
resource_type = context.get("resource", {}).get("type")
action = context.get("action")
if resource_type not in self.resource_types:
return False, None
if action not in self.actions:
return False, None
# Evaluate all conditions (AND logic)
for condition in self.conditions:
if not condition.evaluate(context):
return False, None
return True, self.effect
class ABACEngine:
"""Attribute-Based Access Control Engine"""
def __init__(self):
self.policies: List[AccessPolicy] = []
self._setup_healthcare_policies()
def _setup_healthcare_policies(self):
"""Define healthcare-specific ABAC policies"""
# Policy 1: Treatment team can access their patients
self.policies.append(AccessPolicy(
policy_id="P-001",
name="Treatment Team Access",
description="Allow access to patients on user's treatment team",
effect=PolicyEffect.ALLOW,
resource_types=["patient_record", "clinical_note", "lab_result"],
actions=["read", "write"],
conditions=[
PolicyCondition(
attribute="user.user_id",
operator="in",
value="resource.treatment_team",
),
],
priority=10,
))
# Policy 2: Same department access
self.policies.append(AccessPolicy(
policy_id="P-002",
name="Department Access",
description="Allow access to patients in same department",
effect=PolicyEffect.ALLOW,
resource_types=["patient_record", "clinical_note"],
actions=["read"],
conditions=[
PolicyCondition(
attribute="user.department",
operator="equals",
value="resource.department",
),
PolicyCondition(
attribute="user.roles",
operator="contains",
value="clinical_staff",
),
],
priority=20,
))
# Policy 3: Deny access to VIP patients without explicit permission
self.policies.append(AccessPolicy(
policy_id="P-003",
name="VIP Patient Protection",
description="Require explicit VIP access for VIP patients",
effect=PolicyEffect.DENY,
resource_types=["patient_record", "clinical_note", "lab_result"],
actions=["read", "write", "delete"],
conditions=[
PolicyCondition(
attribute="resource.is_vip",
operator="is_true",
value=None,
),
PolicyCondition(
attribute="user.has_vip_access",
operator="is_false",
value=None,
),
],
priority=5, # High priority - evaluate first
))
# Policy 4: Time-based access restriction
self.policies.append(AccessPolicy(
policy_id="P-004",
name="Business Hours Only",
description="Deny access outside business hours for non-clinical staff",
effect=PolicyEffect.DENY,
resource_types=["patient_record"],
actions=["read", "write"],
conditions=[
PolicyCondition(
attribute="context.is_business_hours",
operator="is_false",
value=None,
),
PolicyCondition(
attribute="user.roles",
operator="not_in",
value=["physician", "nurse", "on_call"],
),
],
priority=15,
))
# Policy 5: Minimum necessary for billing
self.policies.append(AccessPolicy(
policy_id="P-005",
name="Billing Minimum Necessary",
description="Limit billing staff to billing-relevant fields only",
effect=PolicyEffect.DENY,
resource_types=["clinical_note", "mental_health_note", "substance_abuse"],
actions=["read"],
conditions=[
PolicyCondition(
attribute="user.primary_role",
operator="equals",
value="billing_specialist",
),
],
priority=10,
))
# Policy 6: Break-glass override
self.policies.append(AccessPolicy(
policy_id="P-006",
name="Break Glass Override",
description="Allow emergency access with break-glass",
effect=PolicyEffect.ALLOW,
resource_types=["patient_record", "clinical_note", "lab_result", "prescription"],
actions=["read"],
conditions=[
PolicyCondition(
attribute="context.break_glass_active",
operator="is_true",
value=None,
),
PolicyCondition(
attribute="user.can_break_glass",
operator="is_true",
value=None,
),
],
priority=1, # Evaluated first, but a matching DENY policy (e.g. P-003) still wins
))
    def check_access(
        self,
        user: Dict[str, Any],
        resource: Dict[str, Any],
        action: str,
        context: Optional[Dict[str, Any]] = None,
    ) -> tuple[bool, str, List[str]]:
        """
        Check if access should be granted

        Returns:
            Tuple of (allowed: bool, reason: str, matched_policies: List[str])
        """
        full_context = {
            "user": user,
            "resource": resource,
            "action": action,
            "context": dict(context or {}),  # Copy so the caller's dict is not mutated
        }
        # Add computed context
        full_context["context"]["is_business_hours"] = self._is_business_hours()
        full_context["context"]["current_time"] = datetime.utcnow().isoformat()

        # Evaluate in priority order; any matching DENY ends evaluation immediately
        sorted_policies = sorted(self.policies, key=lambda p: p.priority)
        matched_policies = []
        allow_matched = False
        for policy in sorted_policies:
            matches, effect = policy.evaluate(full_context)
            if not matches:
                continue
            matched_policies.append(policy.policy_id)
            if effect == PolicyEffect.DENY:
                return False, f"Denied by policy: {policy.name}", matched_policies
            allow_matched = True  # Keep checking for explicit denies

        if allow_matched:
            return True, "Access granted", matched_policies
        # Default deny: no ALLOW policy matched
        return False, "No matching allow policy", matched_policies

    def _is_business_hours(self) -> bool:
        """Check if current time is within business hours"""
        # Uses UTC; convert to the facility's local time zone in production
        now = datetime.utcnow().time()
        start = time(8, 0)  # 8 AM
        end = time(18, 0)   # 6 PM
        return start <= now <= end
# Example usage
abac = ABACEngine()
user = {
"user_id": "dr-smith-123",
"department": "cardiology",
"roles": ["physician", "clinical_staff"],
"primary_role": "physician",
"has_vip_access": False,
"can_break_glass": True,
}
resource = {
"type": "patient_record",
"patient_id": "patient-456",
"department": "cardiology",
"treatment_team": ["dr-smith-123", "nurse-jones-789"],
"is_vip": False,
}
allowed, reason, policies = abac.check_access(
user=user,
resource=resource,
action="read",
)
print(f"Access: {allowed}, Reason: {reason}")
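The engine above combines policies with a deny-overrides strategy: one matching DENY refuses the request regardless of how many ALLOW policies match, and a request that matches nothing is refused by default. The sketch below isolates that combining rule. It is not the `ABACEngine` itself; the `Rule` tuple, `RULES` list, and `decide` function are hypothetical simplifications that use plain predicates instead of `PolicyCondition` objects.

```python
from typing import Any, Callable, Dict, List, Tuple

# (rule id, "allow" | "deny", predicate over the request context)
Rule = Tuple[str, str, Callable[[Dict[str, Any]], bool]]

RULES: List[Rule] = [
    ("treatment-team", "allow",
     lambda c: c["user"]["id"] in c["resource"]["treatment_team"]),
    ("vip-protection", "deny",
     lambda c: c["resource"]["is_vip"] and not c["user"]["has_vip_access"]),
]


def decide(rules: List[Rule], ctx: Dict[str, Any]) -> Tuple[bool, List[str]]:
    """Deny-overrides: any matching deny wins; otherwise at least one allow is needed."""
    matched = [(rule_id, effect) for rule_id, effect, applies in rules if applies(ctx)]
    ids = [rule_id for rule_id, _ in matched]
    effects = {effect for _, effect in matched}
    if "deny" in effects:
        return False, ids
    return ("allow" in effects), ids


request = {
    "user": {"id": "dr-smith-123", "has_vip_access": False},
    "resource": {"treatment_team": ["dr-smith-123"], "is_vip": True},
}
print(decide(RULES, request))
```

In this request the physician is on the treatment team, yet access is refused because the VIP rule also matches. Whether an emergency override should be exempt from such denies is a policy decision that has to be made explicitly.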
Break-Glass Emergency Access
Break-glass (also called “break-the-glass”) provides emergency access when normal access controls would prevent critical patient care:
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, List, Dict
from enum import Enum
import uuid
import hashlib
class BreakGlassReason(Enum):
"""Pre-defined reasons for break-glass access"""
MEDICAL_EMERGENCY = "medical_emergency"
PATIENT_UNRESPONSIVE = "patient_unresponsive"
TRAUMA = "trauma"
CODE_BLUE = "code_blue"
TREATMENT_CONTINUITY = "treatment_continuity"
DISASTER_RESPONSE = "disaster_response"
OTHER = "other"
@dataclass
class BreakGlassSession:
"""Active break-glass session"""
session_id: str = field(default_factory=lambda: str(uuid.uuid4()))
user_id: str = ""
patient_id: str = ""
# Reason
reason: BreakGlassReason = BreakGlassReason.MEDICAL_EMERGENCY
reason_detail: str = "" # Free text for OTHER
# Timing
started_at: datetime = field(default_factory=datetime.utcnow)
expires_at: datetime = field(default_factory=lambda: datetime.utcnow() + timedelta(hours=4))
ended_at: Optional[datetime] = None
# Context
location: str = ""
ip_address: str = ""
device_id: str = ""
# Review status
reviewed: bool = False
reviewed_by: Optional[str] = None
reviewed_at: Optional[datetime] = None
review_outcome: Optional[str] = None # "appropriate", "inappropriate", "under_investigation"
# Audit
resources_accessed: List[str] = field(default_factory=list)
@property
def is_active(self) -> bool:
if self.ended_at:
return False
return datetime.utcnow() < self.expires_at
class BreakGlassService:
"""
Break-glass emergency access management
HIPAA §164.312(a)(2)(ii) requires emergency access procedures.
"""
def __init__(self):
self.active_sessions: Dict[str, BreakGlassSession] = {}
self.session_history: List[BreakGlassSession] = []
# Configuration
self.max_duration = timedelta(hours=4)
self.extension_duration = timedelta(hours=2)
self.require_review_within = timedelta(hours=24)
# Allowed roles for break-glass
self.allowed_roles = {"physician", "nurse", "paramedic", "emergency_staff"}
def initiate_break_glass(
self,
user_id: str,
user_roles: List[str],
patient_id: str,
reason: BreakGlassReason,
reason_detail: str = "",
location: str = "",
ip_address: str = "",
) -> tuple[bool, str, Optional[BreakGlassSession]]:
"""
Initiate break-glass access
Returns:
Tuple of (success: bool, message: str, session: Optional[BreakGlassSession])
"""
# Verify user is allowed to break glass
if not any(role in self.allowed_roles for role in user_roles):
return False, "User role not authorized for break-glass access", None
# Check for existing session
existing = self._get_active_session(user_id, patient_id)
if existing:
return False, "Break-glass session already active", existing
# Create session
session = BreakGlassSession(
user_id=user_id,
patient_id=patient_id,
reason=reason,
reason_detail=reason_detail,
location=location,
ip_address=ip_address,
expires_at=datetime.utcnow() + self.max_duration,
)
self.active_sessions[session.session_id] = session
# Send alerts
self._send_break_glass_alert(session)
# Log for audit
self._log_break_glass_event(session, "initiated")
return True, "Break-glass access granted", session
def end_break_glass(
self,
session_id: str,
user_id: str,
) -> tuple[bool, str]:
"""End a break-glass session"""
session = self.active_sessions.get(session_id)
if not session:
return False, "Session not found"
if session.user_id != user_id:
return False, "Only session owner can end session"
session.ended_at = datetime.utcnow()
# Move to history
del self.active_sessions[session_id]
self.session_history.append(session)
# Log
self._log_break_glass_event(session, "ended")
# Queue for review
self._queue_for_review(session)
return True, "Break-glass session ended"
def check_break_glass_access(
self,
user_id: str,
patient_id: str,
) -> Optional[BreakGlassSession]:
"""Check if user has active break-glass access to patient"""
return self._get_active_session(user_id, patient_id)
def log_resource_access(
self,
session_id: str,
resource_type: str,
resource_id: str,
):
"""Log resource accessed during break-glass"""
session = self.active_sessions.get(session_id)
if session:
session.resources_accessed.append(f"{resource_type}:{resource_id}")
def review_session(
self,
session_id: str,
reviewer_id: str,
outcome: str,
notes: str = "",
) -> tuple[bool, str]:
"""Review a break-glass session"""
# Find session in history
session = next(
(s for s in self.session_history if s.session_id == session_id),
None
)
if not session:
return False, "Session not found"
if session.reviewed:
return False, "Session already reviewed"
session.reviewed = True
session.reviewed_by = reviewer_id
session.reviewed_at = datetime.utcnow()
session.review_outcome = outcome
# Log review
self._log_break_glass_event(session, f"reviewed:{outcome}")
# If inappropriate, escalate
if outcome == "inappropriate":
self._escalate_inappropriate_access(session, notes)
return True, "Review recorded"
def get_pending_reviews(self) -> List[BreakGlassSession]:
"""Get sessions pending review"""
return [
s for s in self.session_history
if not s.reviewed
]
def _get_active_session(
self,
user_id: str,
patient_id: str,
) -> Optional[BreakGlassSession]:
"""Find active session for user/patient combo"""
for session in self.active_sessions.values():
if (session.user_id == user_id and
session.patient_id == patient_id and
session.is_active):
return session
return None
def _send_break_glass_alert(self, session: BreakGlassSession):
"""Send real-time alert for break-glass access"""
alert = {
"type": "break_glass_access",
"severity": "high",
"session_id": session.session_id,
"user_id": session.user_id,
"patient_id": session.patient_id,
"reason": session.reason.value,
"timestamp": session.started_at.isoformat(),
"location": session.location,
}
# In production: Send to security team, compliance officer
print(f"ALERT: Break-glass access initiated: {alert}")
def _log_break_glass_event(
self,
session: BreakGlassSession,
event: str,
):
"""Log break-glass event for audit trail"""
# In production: Send to immutable audit log
log_entry = {
"event_type": f"break_glass.{event}",
"session_id": session.session_id,
"user_id": session.user_id,
"patient_id": session.patient_id,
"reason": session.reason.value,
"timestamp": datetime.utcnow().isoformat(),
}
print(f"AUDIT: {log_entry}")
def _queue_for_review(self, session: BreakGlassSession):
"""Queue session for compliance review"""
# In production: Create ticket in compliance system
print(f"REVIEW QUEUE: Session {session.session_id} queued for review")
def _escalate_inappropriate_access(
self,
session: BreakGlassSession,
notes: str,
):
"""Escalate inappropriate access for investigation"""
escalation = {
"type": "security_incident",
"category": "inappropriate_phi_access",
"session_id": session.session_id,
"user_id": session.user_id,
"patient_id": session.patient_id,
"notes": notes,
}
# In production: Create incident ticket, notify security team
print(f"ESCALATION: {escalation}")
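`BreakGlassService` defines `require_review_within` (24 hours) but the listing does not show it being enforced. One way to use it is a periodic check for ended sessions whose review is overdue. The following is a sketch under that assumption; `EndedSession` is a hypothetical, trimmed-down stand-in for `BreakGlassSession`, and `overdue_reviews` is not part of the service above.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List


@dataclass
class EndedSession:
    """Hypothetical subset of BreakGlassSession fields needed for the check."""
    session_id: str
    ended_at: datetime
    reviewed: bool = False


def overdue_reviews(
    sessions: List[EndedSession],
    now: datetime,
    review_window: timedelta = timedelta(hours=24),
) -> List[str]:
    """Return IDs of unreviewed sessions that ended more than review_window ago."""
    return [
        s.session_id
        for s in sessions
        if not s.reviewed and (now - s.ended_at) > review_window
    ]


now = datetime(2024, 1, 3, 12, 0, tzinfo=timezone.utc)
history = [
    EndedSession("bg-1", ended_at=now - timedelta(hours=30)),                 # overdue
    EndedSession("bg-2", ended_at=now - timedelta(hours=30), reviewed=True),  # already reviewed
    EndedSession("bg-3", ended_at=now - timedelta(hours=2)),                  # still within window
]
print(overdue_reviews(history, now))
```

Passing `now` in as a parameter keeps the function deterministic and easy to test. Sessions that expire without `end_break_glass` being called would also need to be moved into the history for a check like this to see them.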
Multi-Factor Authentication (MFA)
import pyotp
import secrets
import hashlib
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional, List, Dict
from enum import Enum
class MFAType(Enum):
TOTP = "totp" # Time-based one-time password
SMS = "sms" # SMS code
EMAIL = "email" # Email code
PUSH = "push" # Push notification
HARDWARE = "hardware" # Hardware token (FIDO2/WebAuthn)
@dataclass
class MFAEnrollment:
"""User MFA enrollment"""
user_id: str
mfa_type: MFAType
secret: str # Encrypted in production
backup_codes: List[str] # Hashed backup codes
enrolled_at: datetime
last_used: Optional[datetime] = None
class MFAService:
"""
Multi-Factor Authentication for HIPAA compliance
HIPAA doesn't explicitly require MFA, but it's considered
a best practice for the "Person or Entity Authentication"
requirement (§164.312(d)).
"""
def __init__(self):
self.enrollments: Dict[str, MFAEnrollment] = {}
self.pending_challenges: Dict[str, dict] = {}
def enroll_totp(self, user_id: str, user_email: str) -> dict:
"""
Enroll user in TOTP-based MFA
Returns provisioning data for authenticator app
"""
# Generate secret
secret = pyotp.random_base32()
# Generate backup codes
backup_codes = [secrets.token_hex(4) for _ in range(10)]
backup_codes_hashed = [
hashlib.sha256(code.encode()).hexdigest()
for code in backup_codes
]
# Create enrollment
enrollment = MFAEnrollment(
user_id=user_id,
mfa_type=MFAType.TOTP,
secret=secret,
backup_codes=backup_codes_hashed,
enrolled_at=datetime.utcnow(),
)
# Store (encrypt secret in production!)
self.enrollments[user_id] = enrollment
# Generate provisioning URI
totp = pyotp.TOTP(secret)
provisioning_uri = totp.provisioning_uri(
name=user_email,
issuer_name="HIPAA Healthcare",
)
return {
"secret": secret,
"provisioning_uri": provisioning_uri,
"backup_codes": backup_codes, # Show once, never again
}
def verify_totp(self, user_id: str, code: str) -> bool:
"""Verify TOTP code"""
enrollment = self.enrollments.get(user_id)
if not enrollment or enrollment.mfa_type != MFAType.TOTP:
return False
totp = pyotp.TOTP(enrollment.secret)
# Allow 1 time step tolerance for clock drift
if totp.verify(code, valid_window=1):
enrollment.last_used = datetime.utcnow()
return True
# Check backup codes
code_hash = hashlib.sha256(code.encode()).hexdigest()
if code_hash in enrollment.backup_codes:
enrollment.backup_codes.remove(code_hash)
enrollment.last_used = datetime.utcnow()
return True
return False
def send_sms_challenge(self, user_id: str, phone_number: str) -> str:
"""Send SMS challenge code"""
code = secrets.token_hex(3) # 6-char code
self.pending_challenges[user_id] = {
"code_hash": hashlib.sha256(code.encode()).hexdigest(),
"expires_at": datetime.utcnow() + timedelta(minutes=5),
"attempts": 0,
}
# In production: Send via SMS provider
# sms_client.send(phone_number, f"Your verification code: {code}")
return code # Return for testing; don't return in production
    def verify_sms_challenge(self, user_id: str, code: str) -> bool:
        """Verify SMS challenge code"""
        challenge = self.pending_challenges.get(user_id)
        if not challenge:
            return False
        if datetime.utcnow() > challenge["expires_at"]:
            del self.pending_challenges[user_id]
            return False
        # Count the attempt before checking so that failures cannot be retried freely
        challenge["attempts"] += 1
        if challenge["attempts"] > 3:
            del self.pending_challenges[user_id]
            return False
        code_hash = hashlib.sha256(code.encode()).hexdigest()
        # Constant-time comparison avoids leaking how many characters matched
        if secrets.compare_digest(code_hash, challenge["code_hash"]):
            del self.pending_challenges[user_id]
            return True
        return False
def require_mfa_for_action(
self,
user_id: str,
action: str,
) -> bool:
"""Determine if action requires MFA re-verification"""
# High-risk actions requiring MFA
high_risk_actions = {
"phi_export",
"bulk_access",
"password_change",
"role_assignment",
"break_glass",
"patient_delete",
}
return action in high_risk_actions
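To make the TOTP step less of a black box, the sketch below computes a code with only the standard library, following RFC 6238 (HMAC-SHA1, 30-second steps). It is for illustration only: `totp` and `verify` are hypothetical helper names, the secret is passed as raw bytes rather than the base32 string `pyotp` uses, and a maintained library such as the `pyotp` calls above remains the sensible choice in practice. The secret and timestamp in the usage lines are the published RFC 6238 test values.

```python
import hashlib
import hmac
import struct


def totp(secret: bytes, unix_time: int, step: int = 30, digits: int = 6) -> str:
    """Compute an RFC 6238 TOTP code (HMAC-SHA1) for the given Unix time."""
    counter = unix_time // step
    mac = hmac.new(secret, struct.pack(">Q", counter), hashlib.sha1).digest()
    # Dynamic truncation (RFC 4226): low nibble of the last byte picks the offset
    offset = mac[-1] & 0x0F
    number = struct.unpack(">I", mac[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(number % 10 ** digits).zfill(digits)


def verify(secret: bytes, submitted: str, unix_time: int,
           window: int = 1, step: int = 30) -> bool:
    """Accept codes from `window` steps before or after the current one."""
    return any(
        hmac.compare_digest(totp(secret, unix_time + drift * step, step), submitted)
        for drift in range(-window, window + 1)
    )


rfc_secret = b"12345678901234567890"           # RFC 6238 Appendix B test secret
print(totp(rfc_secret, 59, digits=8))          # RFC test vector: 94287082
print(verify(rfc_secret, totp(rfc_secret, 1000), 1029))
```

The `window` parameter plays the same role as `valid_window=1` in `verify_totp`: it tolerates one time step of clock drift in either direction, at the cost of each code staying valid slightly longer.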
Session Management
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Dict, List
import secrets
import hashlib
import json
@dataclass
class Session:
"""User session with HIPAA-compliant timeouts"""
session_id: str = field(default_factory=lambda: secrets.token_urlsafe(32))
user_id: str = ""
# Timing
created_at: datetime = field(default_factory=datetime.utcnow)
last_activity: datetime = field(default_factory=datetime.utcnow)
expires_at: datetime = field(default_factory=lambda: datetime.utcnow() + timedelta(hours=8))
# Security
ip_address: str = ""
user_agent: str = ""
device_fingerprint: str = ""
# State
mfa_verified: bool = False
mfa_verified_at: Optional[datetime] = None
# Elevation
elevated: bool = False
elevated_at: Optional[datetime] = None
elevated_reason: str = ""
# Tracking
resources_accessed: int = 0
@property
def is_valid(self) -> bool:
return datetime.utcnow() < self.expires_at
@property
def is_idle_timeout(self) -> bool:
# HIPAA specifies no duration; 15 minutes is a commonly used idle limit
idle_limit = timedelta(minutes=15)
return (datetime.utcnow() - self.last_activity) > idle_limit
@property
def needs_mfa_refresh(self) -> bool:
# Require MFA re-verification every 4 hours
if not self.mfa_verified_at:
return True
mfa_validity = timedelta(hours=4)
return (datetime.utcnow() - self.mfa_verified_at) > mfa_validity
class SessionManager:
"""
HIPAA-compliant session management
Implements:
- Automatic logoff (§164.312(a)(2)(iii))
- Session timeout
- Concurrent session limits
- Session binding
"""
def __init__(self):
self.sessions: Dict[str, Session] = {}
# Configuration
self.idle_timeout = timedelta(minutes=15) # Common choice; HIPAA specifies no duration
self.absolute_timeout = timedelta(hours=8)
self.max_concurrent_sessions = 3
self.require_session_binding = True
def create_session(
self,
user_id: str,
ip_address: str,
user_agent: str,
mfa_verified: bool = False,
) -> Session:
"""Create new session after authentication"""
# Check concurrent session limit
user_sessions = self.get_user_sessions(user_id)
if len(user_sessions) >= self.max_concurrent_sessions:
# Terminate oldest session
oldest = min(user_sessions, key=lambda s: s.created_at)
self.terminate_session(oldest.session_id, "max_sessions_exceeded")
session = Session(
user_id=user_id,
ip_address=ip_address,
user_agent=user_agent,
device_fingerprint=self._compute_fingerprint(ip_address, user_agent),
mfa_verified=mfa_verified,
mfa_verified_at=datetime.utcnow() if mfa_verified else None,
expires_at=datetime.utcnow() + self.absolute_timeout,
)
self.sessions[session.session_id] = session
# Log session creation
self._log_session_event(session, "created")
return session
    def validate_session(
        self,
        session_id: str,
        ip_address: str,
        user_agent: str,
    ) -> tuple[bool, str, Optional[Session]]:
        """
        Validate session for request

        Returns:
            Tuple of (valid: bool, message: str, session: Optional[Session])
        """
        session = self.sessions.get(session_id)
        if not session:
            return False, "Session not found", None

        # Absolute timeout
        if not session.is_valid:
            self.terminate_session(session_id, "expired")
            return False, "Session expired", None

        # Idle timeout: use the manager's configured value so that changing
        # self.idle_timeout actually takes effect
        if (datetime.utcnow() - session.last_activity) > self.idle_timeout:
            self.terminate_session(session_id, "idle_timeout")
            return False, "Session timed out due to inactivity", None

        # Session binding validation
        if self.require_session_binding:
            current_fingerprint = self._compute_fingerprint(ip_address, user_agent)
            if current_fingerprint != session.device_fingerprint:
                self.terminate_session(session_id, "session_binding_violation")
                self._log_security_event(session, "session_hijack_attempt")
                return False, "Session binding violation", None

        # Update activity
        session.last_activity = datetime.utcnow()
        session.resources_accessed += 1
        return True, "Valid", session
    def terminate_session(
        self,
        session_id: str,
        reason: str,
    ) -> bool:
        """Terminate session"""
        session = self.sessions.get(session_id)
        if not session:
            return False
        self._log_session_event(session, f"terminated:{reason}")
        del self.sessions[session_id]
        return True

    def terminate_all_user_sessions(
        self,
        user_id: str,
        reason: str,
    ):
        """Terminate all sessions for a user (password change, security event)"""
        user_sessions = self.get_user_sessions(user_id)
        for session in user_sessions:
            self.terminate_session(session.session_id, reason)

    def get_user_sessions(self, user_id: str) -> List[Session]:
        """Get all active sessions for user"""
        return [s for s in self.sessions.values() if s.user_id == user_id]

    def elevate_session(
        self,
        session_id: str,
        reason: str,
    ) -> bool:
        """Elevate session privileges (after MFA re-verification)"""
        session = self.sessions.get(session_id)
        if not session:
            return False
        session.elevated = True
        session.elevated_at = datetime.utcnow()
        session.elevated_reason = reason
        self._log_session_event(session, f"elevated:{reason}")
        return True

    def _compute_fingerprint(self, ip_address: str, user_agent: str) -> str:
        """Compute device fingerprint for session binding"""
        data = f"{ip_address}:{user_agent}"
        return hashlib.sha256(data.encode()).hexdigest()[:16]

    def _log_session_event(self, session: Session, event: str):
        """Log session event for audit"""
        log_entry = {
            "event_type": f"session.{event}",
            "session_id": session.session_id[:8] + "...",  # Partial for privacy
            "user_id": session.user_id,
            "ip_address": session.ip_address,
            "timestamp": datetime.utcnow().isoformat(),
        }
        print(f"AUDIT: {log_entry}")

    def _log_security_event(self, session: Session, event: str):
        """Log security event for SIEM"""
        security_event = {
            "event_type": f"security.{event}",
            "severity": "high",
            "session_id": session.session_id,
            "user_id": session.user_id,
            "ip_address": session.ip_address,
            "timestamp": datetime.utcnow().isoformat(),
        }
        print(f"SECURITY ALERT: {security_event}")
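The two logging helpers above write to `print`, which is fine for a demo but leaves nothing durable for an auditor. A minimal sketch of the same audit entry emitted as one JSON line through the standard `logging` module might look like this. The logger name `hipaa.audit` and the function names are assumptions for illustration; where the handler ships the lines (file, syslog, SIEM) is left to your deployment.

```python
import json
import logging
from datetime import datetime, timezone

# Hypothetical logger name; attach handlers that write to durable storage
audit_logger = logging.getLogger("hipaa.audit")


def build_audit_entry(
    event_type: str,
    session_id: str,
    user_id: str,
    ip_address: str,
) -> str:
    """Serialize one session audit event as a single JSON line."""
    entry = {
        "event_type": f"session.{event_type}",
        "session_id": session_id[:8] + "...",  # Partial ID, as in the code above
        "user_id": user_id,
        "ip_address": ip_address,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    # sort_keys gives a stable field order, which makes log lines easy to diff
    return json.dumps(entry, sort_keys=True)


def log_audit_event(
    event_type: str,
    session_id: str,
    user_id: str,
    ip_address: str,
) -> None:
    """Emit the audit entry at INFO level on the audit logger."""
    audit_logger.info(build_audit_entry(event_type, session_id, user_id, ip_address))
```

JSON lines are straightforward to parse downstream, and a dedicated logger lets audit records be routed separately from application logs.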
Hands-On Lab: Implement Access Control
1. Define Your Role Hierarchy
Create at least 5 roles for your healthcare application:
- Define permissions for each role
- Implement role inheritance where appropriate
- Document role assignment procedures
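As a starting point for this step, role inheritance can be modeled as a parent map that is walked when resolving permissions. The sketch below is one possible shape, not the RBAC service from earlier in this module: the role names, permission strings, and the `effective_permissions` helper are all hypothetical.

```python
from typing import Dict, FrozenSet, List, Optional, Set

# Hypothetical direct permissions per role
ROLE_PERMISSIONS: Dict[str, Set[str]] = {
    "staff": {"schedule:read"},
    "nurse": {"patient:read", "vitals:write"},
    "physician": {"orders:write", "notes:write"},
    "billing": {"claims:read", "claims:write"},
    "admin": {"user:manage"},
}

# Hypothetical inheritance: each role lists the roles it extends
ROLE_PARENTS: Dict[str, List[str]] = {
    "nurse": ["staff"],
    "physician": ["nurse"],
    "billing": ["staff"],
    "admin": ["staff"],
}


def effective_permissions(
    role: str,
    _seen: Optional[Set[str]] = None,
) -> FrozenSet[str]:
    """Resolve a role's own permissions plus everything it inherits."""
    seen = _seen if _seen is not None else set()
    if role in seen:
        return frozenset()  # Guard against accidental cycles in ROLE_PARENTS
    seen.add(role)

    perms = set(ROLE_PERMISSIONS.get(role, set()))
    for parent in ROLE_PARENTS.get(role, []):
        perms |= effective_permissions(parent, seen)
    return frozenset(perms)
```

Note that `billing` and `admin` inherit only from `staff`, so neither gains `patient:read`; keeping clinical permissions out of non-clinical branches is what least privilege looks like in a hierarchy.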
2. Implement ABAC Policies
Create policies for:
- Treatment team access
- Department-based access
- VIP patient protection
- Time-based restrictions
- Break-glass override
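To make the last two items concrete, here is a minimal sketch of a time-based restriction that yields to a break-glass override. It is a standalone function, not a policy registered with the ABAC engine from earlier in this module: `time_based_policy`, the shift window, and the `now` context key are assumptions, while `break_glass_active` matches the context key used in the FastAPI integration below.

```python
from datetime import datetime, time
from typing import Any, Dict, Tuple

# Assumed shift window; derive the real one from your staffing policy
SHIFT_START = time(7, 0)
SHIFT_END = time(19, 0)


def time_based_policy(
    user: Dict[str, Any],
    resource: Dict[str, Any],
    action: str,
    context: Dict[str, Any],
) -> Tuple[bool, str]:
    """Allow access only during the shift window unless break-glass is active."""
    # An active break-glass session overrides the time restriction
    if context.get("break_glass_active"):
        return True, "break-glass override"

    now: datetime = context["now"]  # Caller supplies the evaluation time
    if SHIFT_START <= now.time() < SHIFT_END:
        return True, "within permitted hours"
    return False, "outside permitted hours"
```

Taking the evaluation time from `context` rather than the system clock keeps the policy a pure function, which makes each decision reproducible during audit review.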
3. Build Break-Glass System
Implement:
- Break-glass initiation with reason
- Automatic expiration
- Real-time alerting
- Post-access review workflow
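The first two requirements, a mandatory reason and automatic expiration, can be sketched in a few lines. This is an illustration under stated assumptions rather than the break-glass service from earlier in this module: `Grant`, `BreakGlassGrants`, and the one-hour default duration are hypothetical, and alerting and review are omitted.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class Grant:
    """One emergency access grant for a single user/patient pair."""
    user_id: str
    patient_id: str
    reason: str
    expires_at: datetime


class BreakGlassGrants:
    def __init__(self, duration: timedelta = timedelta(hours=1)):
        self.duration = duration  # Assumed default; set per your policy
        self._grants: Dict[Tuple[str, str], Grant] = {}

    def initiate(self, user_id: str, patient_id: str, reason: str, now: datetime) -> Grant:
        """Create a grant; a non-empty reason is mandatory."""
        if not reason.strip():
            raise ValueError("Break-glass requires a documented reason")
        grant = Grant(user_id, patient_id, reason.strip(), now + self.duration)
        self._grants[(user_id, patient_id)] = grant
        return grant

    def active_grant(self, user_id: str, patient_id: str, now: datetime) -> Optional[Grant]:
        """Return the grant if still valid; expired grants are removed on lookup."""
        key = (user_id, patient_id)
        grant = self._grants.get(key)
        if grant is None:
            return None
        if now >= grant.expires_at:
            del self._grants[key]  # Automatic expiration
            return None
        return grant
```

Checking expiry on every lookup means no background job is needed for correctness, though a periodic sweep is still useful for closing out grants in the audit trail.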
4. Session Management
Implement:
- 15-minute idle timeout
- Session binding
- MFA re-verification for sensitive actions
- Concurrent session limits
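Timeout logic is much easier to verify when the clock can be replaced in tests. The sketch below shows one way to do that for the idle and absolute timeouts; `TimeoutPolicy`, `utc_now`, and the status strings are hypothetical names. It uses timezone-aware timestamps via `datetime.now(timezone.utc)`, since `datetime.utcnow()` is deprecated as of Python 3.12. Do not mix aware and naive datetimes in the same comparison.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable


def utc_now() -> datetime:
    """Timezone-aware current time in UTC."""
    return datetime.now(timezone.utc)


@dataclass
class TimeoutPolicy:
    idle_timeout: timedelta = timedelta(minutes=15)
    absolute_timeout: timedelta = timedelta(hours=8)
    clock: Callable[[], datetime] = utc_now  # Swap in a fake clock for tests

    def check(self, created_at: datetime, last_activity: datetime) -> str:
        """Classify a session as 'expired', 'idle_timeout', or 'active'."""
        now = self.clock()
        # The absolute limit applies no matter how recent the activity was
        if now - created_at >= self.absolute_timeout:
            return "expired"
        if now - last_activity > self.idle_timeout:
            return "idle_timeout"
        return "active"
```

A test can then pin the clock, for example `TimeoutPolicy(clock=lambda: start + timedelta(minutes=20))`, and assert the outcome without sleeping.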
Access Control Integration with FastAPI
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.security import HTTPBearer
from functools import wraps
from typing import List, Optional

app = FastAPI()
security = HTTPBearer()

# Initialize services
rbac_service = RBACService()
abac_engine = ABACEngine()
session_manager = SessionManager()
break_glass_service = BreakGlassService()


async def get_current_session(request: Request) -> Session:
    """Extract and validate session from request"""
    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid authorization header")

    session_id = auth_header[7:]  # Strip the "Bearer " prefix
    # request.client can be None (e.g. some test clients or proxies)
    ip_address = request.client.host if request.client else ""
    user_agent = request.headers.get("User-Agent", "")

    valid, message, session = session_manager.validate_session(
        session_id, ip_address, user_agent
    )
    if not valid:
        raise HTTPException(status_code=401, detail=message)
    return session
def require_permission(permission: Permission):
    """Decorator to require specific permission"""
    def decorator(func):
        # functools.wraps exposes the endpoint's own signature to FastAPI,
        # so the decorated endpoint must itself declare
        # `session: Session = Depends(get_current_session)`
        @wraps(func)
        async def wrapper(*args, session: Session = Depends(get_current_session), **kwargs):
            allowed, reason = rbac_service.check_access(
                session.user_id, permission
            )
            if not allowed:
                raise HTTPException(status_code=403, detail=reason)
            return await func(*args, session=session, **kwargs)
        return wrapper
    return decorator
def require_patient_access(action: str):
    """Decorator to check patient-level access via ABAC"""
    def decorator(func):
        @wraps(func)
        async def wrapper(
            patient_id: str,
            *args,
            session: Session = Depends(get_current_session),
            request: Request = None,
            **kwargs
        ):
            # Get user info
            user = rbac_service.users.get(session.user_id)
            if not user:
                raise HTTPException(status_code=401, detail="User not found")

            # Get patient/resource info (from database in production)
            resource = await get_patient_resource(patient_id)

            # Check break-glass first
            break_glass = break_glass_service.check_break_glass_access(
                session.user_id, patient_id
            )
            context = {
                "break_glass_active": break_glass is not None,
            }

            # Check ABAC
            allowed, reason, policies = abac_engine.check_access(
                user=user.__dict__,
                resource=resource,
                action=action,
                context=context,
            )
            if not allowed:
                raise HTTPException(status_code=403, detail=reason)

            # Log access if break-glass
            if break_glass:
                break_glass_service.log_resource_access(
                    break_glass.session_id,
                    "patient_record",
                    patient_id,
                )
            return await func(patient_id, *args, session=session, **kwargs)
        return wrapper
    return decorator
# Example endpoints
@app.get("/api/patients/{patient_id}")
@require_patient_access("read")
async def get_patient(patient_id: str, session: Session = Depends(get_current_session)):
    """Get patient record with access control"""
    # Implementation
    return {"patient_id": patient_id, "data": "..."}


@app.post("/api/break-glass")
async def initiate_break_glass(
    patient_id: str,
    reason: BreakGlassReason,
    reason_detail: str = "",
    session: Session = Depends(get_current_session),
    request: Request = None,
):
    """Initiate break-glass emergency access"""
    user = rbac_service.users.get(session.user_id)
    if not user:
        # Without this check, user.roles below would raise AttributeError
        raise HTTPException(status_code=401, detail="User not found")

    success, message, bg_session = break_glass_service.initiate_break_glass(
        user_id=session.user_id,
        user_roles=user.roles,
        patient_id=patient_id,
        reason=reason,
        reason_detail=reason_detail,
        location=request.headers.get("X-Location", ""),
        ip_address=request.client.host if request.client else "",
    )
    if not success:
        raise HTTPException(status_code=403, detail=message)
    return {"session_id": bg_session.session_id, "expires_at": bg_session.expires_at}
Key Takeaways
Least Privilege
Every user gets the minimum access their role requires. No exceptions.
Defense in Depth
RBAC + ABAC + session controls = comprehensive access control.
Break-Glass is Required
HIPAA requires emergency access procedures. Build them thoughtfully.
Audit Everything
Log every access decision for HIPAA compliance and forensics.