Access Control Systems for Healthcare

Access control is the gatekeeper of PHI. HIPAA requires that only authorized individuals access PHI, and only for legitimate purposes. This module covers everything from basic RBAC to sophisticated attribute-based policies and emergency access procedures.
Learning Objectives:
  • Implement Role-Based Access Control (RBAC) for healthcare
  • Design Attribute-Based Access Control (ABAC) policies
  • Build break-glass emergency access procedures
  • Integrate Multi-Factor Authentication (MFA)
  • Implement automatic session management
  • Create audit-ready access control documentation
Estimated Time: 8-10 hours
Hands-On Labs: 4 practical implementations
Prerequisites: HIPAA Fundamentals, Risk Assessment

HIPAA Access Control Requirements

┌─────────────────────────────────────────────────────────────────────────────┐
│                    HIPAA ACCESS CONTROL REQUIREMENTS                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  §164.312(a)(1) - ACCESS CONTROL (Required)                                │
│  ──────────────────────────────────────────                                 │
│  "Implement technical policies and procedures for electronic information   │
│   systems that maintain ePHI to allow access only to those persons or      │
│   software programs that have been granted access rights."                 │
│                                                                              │
│  Implementation Specifications:                                             │
│                                                                              │
│  ┌─────────────────────────┬─────────────┬──────────────────────────────┐  │
│  │ Specification           │ Status      │ Description                  │  │
│  ├─────────────────────────┼─────────────┼──────────────────────────────┤  │
│  │ Unique User ID          │ Required    │ Unique identifier per user   │  │
│  │ Emergency Access        │ Required    │ Procedures for emergencies   │  │
│  │ Automatic Logoff        │ Addressable │ Session timeout              │  │
│  │ Encryption/Decryption   │ Addressable │ Mechanism for encryption     │  │
│  └─────────────────────────┴─────────────┴──────────────────────────────┘  │
│                                                                              │
│  §164.312(d) - PERSON OR ENTITY AUTHENTICATION (Required)                  │
│  ────────────────────────────────────────────────────────                   │
│  "Implement procedures to verify that a person or entity seeking access    │
│   to ePHI is the one claimed."                                             │
│                                                                              │
│  §164.308(a)(4) - INFORMATION ACCESS MANAGEMENT (Required)                 │
│  ──────────────────────────────────────────────────────────                 │
│  "Implement policies and procedures for authorizing access to ePHI"        │
│                                                                              │
│  Implementation Specifications:                                             │
│  • Access authorization                                                     │
│  • Access establishment and modification                                    │
│  • Isolating health care clearinghouse functions                            │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
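
The Automatic Logoff specification is commonly met with an idle timeout. The sketch below shows one way to check it. The 15-minute limit and the `Session`, `is_expired`, and `touch` names are illustrative assumptions; HIPAA does not prescribe a specific timeout value.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

# Assumed policy value: HIPAA does not mandate a specific timeout.
IDLE_TIMEOUT = timedelta(minutes=15)


@dataclass
class Session:
    """Hypothetical session record tied to one unique user ID."""
    user_id: str
    last_activity: datetime

    def is_expired(self, now: datetime) -> bool:
        # Expired once the idle period reaches the configured limit.
        return now - self.last_activity >= IDLE_TIMEOUT

    def touch(self, now: datetime) -> None:
        # Call on every authenticated request to reset the idle clock.
        self.last_activity = now


start = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
session = Session(user_id="nurse-jones-789", last_activity=start)

print(session.is_expired(start + timedelta(minutes=5)))   # False
print(session.is_expired(start + timedelta(minutes=20)))  # True
```

Passing `now` in explicitly, rather than reading the clock inside the method, keeps the check easy to test.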

The Principle of Least Privilege

Before diving into implementation, understand the core principle:
┌─────────────────────────────────────────────────────────────────────────────┐
│                    PRINCIPLE OF LEAST PRIVILEGE                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  "Every user should have only the minimum access necessary to               │
│   perform their job functions—nothing more, nothing less."                 │
│                                                                              │
│  Healthcare Examples:                                                       │
│  ───────────────────                                                        │
│                                                                              │
│  ┌─────────────┐    ┌─────────────────────────────────────────────────┐    │
│  │   Nurse     │───▶│ • View patients on assigned unit                │    │
│  │             │    │ • Document vital signs, assessments              │    │
│  │             │    │ • View (not edit) physician orders               │    │
│  │             │    │ ✗ Cannot access billing information              │    │
│  │             │    │ ✗ Cannot access other units' patients            │    │
│  └─────────────┘    └─────────────────────────────────────────────────┘    │
│                                                                              │
│  ┌─────────────┐    ┌─────────────────────────────────────────────────┐    │
│  │  Physician  │───▶│ • Full access to own patients                   │    │
│  │             │    │ • Write orders, prescriptions                    │    │
│  │             │    │ • Consult on other physicians' patients         │    │
│  │             │    │ ✗ Cannot modify another physician's notes       │    │
│  │             │    │ ✗ Cannot access administrative functions         │    │
│  └─────────────┘    └─────────────────────────────────────────────────┘    │
│                                                                              │
│  ┌─────────────┐    ┌─────────────────────────────────────────────────┐    │
│  │   Billing   │───▶│ • Access billing/insurance information          │    │
│  │   Staff     │    │ • View diagnosis codes for billing              │    │
│  │             │    │ ✗ Cannot view clinical notes                    │    │
│  │             │    │ ✗ Cannot view lab results                       │    │
│  └─────────────┘    └─────────────────────────────────────────────────┘    │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
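
The billing example in the diagram can be read as a field-level allow-list: a role sees only the fields it needs, and everything else is withheld by default. A minimal sketch, assuming hypothetical field names and a hypothetical `minimum_necessary` helper:

```python
from typing import Any, Dict, FrozenSet

# Hypothetical field allow-lists per role; anything unlisted is withheld.
VISIBLE_FIELDS: Dict[str, FrozenSet[str]] = {
    "billing": frozenset({"patient_id", "insurance", "diagnosis_codes"}),
    "nurse": frozenset({"patient_id", "vitals", "orders"}),
}


def minimum_necessary(role: str, record: Dict[str, Any]) -> Dict[str, Any]:
    """Return only the fields the role may see (default deny)."""
    allowed = VISIBLE_FIELDS.get(role, frozenset())
    return {key: value for key, value in record.items() if key in allowed}


record = {
    "patient_id": "patient-456",
    "insurance": "ACME Health",
    "diagnosis_codes": ["I10"],
    "clinical_notes": "Follow-up in two weeks",
    "vitals": {"bp": "120/80"},
}

billing_view = minimum_necessary("billing", record)
print(sorted(billing_view))
```

An unknown role receives an empty dict, which matches the "nothing more" half of the principle.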

Role-Based Access Control (RBAC)

RBAC Architecture

from dataclasses import dataclass, field
from typing import List, Set, Optional, Dict
from enum import Enum
from datetime import datetime, timedelta
import uuid

class Permission(Enum):
    """Granular permissions for healthcare systems"""
    
    # Patient Records
    PATIENT_VIEW = "patient:view"
    PATIENT_CREATE = "patient:create"
    PATIENT_UPDATE = "patient:update"
    PATIENT_DELETE = "patient:delete"
    
    # Clinical Notes
    NOTES_VIEW = "notes:view"
    NOTES_CREATE = "notes:create"
    NOTES_UPDATE_OWN = "notes:update:own"
    NOTES_UPDATE_ANY = "notes:update:any"
    NOTES_DELETE = "notes:delete"
    
    # Orders
    ORDERS_VIEW = "orders:view"
    ORDERS_CREATE = "orders:create"
    ORDERS_SIGN = "orders:sign"
    ORDERS_CANCEL = "orders:cancel"
    
    # Prescriptions
    RX_VIEW = "rx:view"
    RX_PRESCRIBE = "rx:prescribe"
    RX_DISPENSE = "rx:dispense"
    
    # Lab Results
    LABS_VIEW = "labs:view"
    LABS_ORDER = "labs:order"
    LABS_RESULT = "labs:result"
    
    # Billing
    BILLING_VIEW = "billing:view"
    BILLING_CREATE = "billing:create"
    BILLING_MODIFY = "billing:modify"
    
    # Administration
    ADMIN_USERS = "admin:users"
    ADMIN_ROLES = "admin:roles"
    ADMIN_AUDIT = "admin:audit"
    ADMIN_SYSTEM = "admin:system"
    
    # PHI Export
    PHI_EXPORT = "phi:export"
    PHI_BULK_ACCESS = "phi:bulk_access"
    
    # Emergency
    BREAK_GLASS = "emergency:break_glass"


@dataclass
class Role:
    """Healthcare role with permissions"""
    
    role_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    description: str = ""
    permissions: Set[Permission] = field(default_factory=set)
    
    # Role hierarchy
    parent_role: Optional[str] = None  # Inherits from parent
    
    # Constraints
    max_patients: Optional[int] = None  # Max concurrent patients
    allowed_departments: List[str] = field(default_factory=list)
    allowed_facilities: List[str] = field(default_factory=list)
    
    # Temporal
    time_restrictions: Optional[Dict] = None  # e.g., {"start": "08:00", "end": "18:00"}
    
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)
    
    def has_permission(self, permission: Permission) -> bool:
        return permission in self.permissions


# Define standard healthcare roles
HEALTHCARE_ROLES = {
    "physician": Role(
        name="Physician",
        description="Licensed physician with full clinical access",
        permissions={
            Permission.PATIENT_VIEW,
            Permission.PATIENT_CREATE,
            Permission.PATIENT_UPDATE,
            Permission.NOTES_VIEW,
            Permission.NOTES_CREATE,
            Permission.NOTES_UPDATE_OWN,
            Permission.ORDERS_VIEW,
            Permission.ORDERS_CREATE,
            Permission.ORDERS_SIGN,
            Permission.RX_VIEW,
            Permission.RX_PRESCRIBE,
            Permission.LABS_VIEW,
            Permission.LABS_ORDER,
            Permission.BREAK_GLASS,
        },
    ),
    
    "nurse": Role(
        name="Registered Nurse",
        description="RN with documentation and viewing access",
        permissions={
            Permission.PATIENT_VIEW,
            Permission.PATIENT_UPDATE,
            Permission.NOTES_VIEW,
            Permission.NOTES_CREATE,
            Permission.NOTES_UPDATE_OWN,
            Permission.ORDERS_VIEW,
            Permission.RX_VIEW,
            Permission.LABS_VIEW,
            Permission.BREAK_GLASS,
        },
    ),
    
    "medical_assistant": Role(
        name="Medical Assistant",
        description="MA with limited clinical access",
        permissions={
            Permission.PATIENT_VIEW,
            Permission.NOTES_VIEW,
            Permission.NOTES_CREATE,
            Permission.ORDERS_VIEW,
            Permission.LABS_VIEW,
        },
    ),
    
    "billing_specialist": Role(
        name="Billing Specialist",
        description="Access to billing and minimal clinical info",
        permissions={
            Permission.PATIENT_VIEW,  # Demographics only
            Permission.BILLING_VIEW,
            Permission.BILLING_CREATE,
            Permission.BILLING_MODIFY,
        },
    ),
    
    "admin": Role(
        name="System Administrator",
        description="System administration without clinical access",
        permissions={
            Permission.ADMIN_USERS,
            Permission.ADMIN_ROLES,
            Permission.ADMIN_AUDIT,
            Permission.ADMIN_SYSTEM,
        },
    ),
    
    "compliance_officer": Role(
        name="Compliance Officer",
        description="Audit and compliance oversight",
        permissions={
            Permission.ADMIN_AUDIT,
            Permission.PHI_EXPORT,  # For audit purposes
        },
    ),
}


@dataclass
class User:
    """User with role assignments"""
    
    user_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    username: str = ""
    email: str = ""
    
    # Authentication
    password_hash: str = ""
    mfa_enabled: bool = False
    mfa_secret: Optional[str] = None
    
    # Role assignments
    roles: List[str] = field(default_factory=list)
    
    # Department/facility constraints
    department: str = ""
    facility: str = ""
    
    # Status
    is_active: bool = True
    last_login: Optional[datetime] = None
    failed_login_attempts: int = 0
    locked_until: Optional[datetime] = None
    
    # Audit
    created_at: datetime = field(default_factory=datetime.utcnow)
    created_by: str = ""
    
    def get_all_permissions(self, roles_db: Dict[str, Role]) -> Set[Permission]:
        """Get all permissions from all assigned roles"""
        permissions = set()
        for role_name in self.roles:
            if role_name in roles_db:
                role = roles_db[role_name]
                permissions.update(role.permissions)
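                # Handle role hierarchy: walk up the parent chain,
                # guarding against cycles
                seen = {role_name}
                parent_name = role.parent_role
                while parent_name and parent_name in roles_db and parent_name not in seen:
                    seen.add(parent_name)
                    parent = roles_db[parent_name]
                    permissions.update(parent.permissions)
                    parent_name = parent.parent_role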
        return permissions
    
    def has_permission(
        self, 
        permission: Permission, 
        roles_db: Dict[str, Role]
    ) -> bool:
        """Check if user has specific permission"""
        return permission in self.get_all_permissions(roles_db)


class RBACService:
    """RBAC enforcement service"""
    
    def __init__(self):
        self.roles: Dict[str, Role] = HEALTHCARE_ROLES.copy()
        self.users: Dict[str, User] = {}
        
    def check_access(
        self,
        user_id: str,
        permission: Permission,
        resource: Optional[Dict] = None,
    ) -> tuple[bool, str]:
        """
        Check if user has access to perform action
        
        Returns:
            Tuple of (allowed: bool, reason: str)
        """
        user = self.users.get(user_id)
        if not user:
            return False, "User not found"
        
        if not user.is_active:
            return False, "User account is inactive"
        
        if user.locked_until and user.locked_until > datetime.utcnow():
            return False, "User account is locked"
        
        # Check permission
        if not user.has_permission(permission, self.roles):
            return False, f"User lacks permission: {permission.value}"
        
        # Check department/facility constraints if resource provided
        if resource:
            resource_dept = resource.get("department")
            resource_facility = resource.get("facility")
            
            # Get user's role constraints
            for role_name in user.roles:
                role = self.roles.get(role_name)
                if role:
                    if role.allowed_departments:
                        if resource_dept not in role.allowed_departments:
                            return False, "Access denied: wrong department"
                    if role.allowed_facilities:
                        if resource_facility not in role.allowed_facilities:
                            return False, "Access denied: wrong facility"
        
        return True, "Access granted"
    
    def assign_role(
        self,
        user_id: str,
        role_name: str,
        assigned_by: str,
    ) -> bool:
        """Assign role to user with audit"""
        user = self.users.get(user_id)
        if not user:
            return False
        
        if role_name not in self.roles:
            return False
        
        if role_name not in user.roles:
            user.roles.append(role_name)
            # Log role assignment for audit
            self._log_role_change(user_id, role_name, "assigned", assigned_by)
        
        return True
    
    def revoke_role(
        self,
        user_id: str,
        role_name: str,
        revoked_by: str,
    ) -> bool:
        """Revoke role from user with audit"""
        user = self.users.get(user_id)
        if not user:
            return False
        
        if role_name in user.roles:
            user.roles.remove(role_name)
            self._log_role_change(user_id, role_name, "revoked", revoked_by)
        
        return True
    
    def _log_role_change(
        self,
        user_id: str,
        role_name: str,
        action: str,
        changed_by: str,
    ):
        """Log role changes for HIPAA audit"""
        # In production, this would go to audit log system
        print(f"AUDIT: Role {role_name} {action} for user {user_id} by {changed_by}")
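
`_log_role_change` prints to stdout as a placeholder. The sketch below shows what a structured audit record might look like using the standard `logging` and `json` modules. The logger name `hipaa.audit`, the field names, and the `audit_role_change` helper are assumptions for illustration, not part of the service above.

```python
import json
import logging
from datetime import datetime, timezone

# Assumed logger name; a real deployment would route it to protected storage.
audit_logger = logging.getLogger("hipaa.audit")


def audit_role_change(user_id: str, role_name: str, action: str, changed_by: str) -> str:
    """Build and emit one JSON audit record; returns the serialized record."""
    record = {
        "event": "role_change",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "user_id": user_id,
        "role": role_name,
        "action": action,  # "assigned" or "revoked"
        "changed_by": changed_by,
    }
    line = json.dumps(record, sort_keys=True)
    audit_logger.info(line)
    return line


logging.basicConfig(level=logging.INFO)
entry = audit_role_change("user-123", "nurse", "assigned", "admin-001")
```

One JSON object per line keeps the records machine-searchable, which helps when an auditor asks who changed a role and when.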

Attribute-Based Access Control (ABAC)

ABAC provides more granular control by evaluating attributes of the user, resource, and context:
from dataclasses import dataclass
from typing import Dict, Any, List, Callable, Optional
from enum import Enum
from datetime import datetime, time
import json

class PolicyEffect(Enum):
    ALLOW = "allow"
    DENY = "deny"

@dataclass
class PolicyCondition:
    """Single condition in a policy"""
    
    attribute: str  # e.g., "user.department", "resource.sensitivity"
    operator: str   # e.g., "equals", "in", "greater_than"
    value: Any
    
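    def evaluate(self, context: Dict[str, Any]) -> bool:
        """Evaluate this condition against context"""
        
        # Navigate to attribute value
        attr_value = self._get_attribute(context, self.attribute)
        
        # A string value such as "resource.department" is a reference to
        # another attribute, so resolve it against the same context
        value = self.value
        if isinstance(value, str) and value.startswith(("user.", "resource.", "context.")):
            value = self._get_attribute(context, value)
        
        if self.operator == "equals":
            return attr_value == value
        elif self.operator == "not_equals":
            return attr_value != value
        elif self.operator in ("in", "not_in"):
            # For multi-valued attributes (e.g., user.roles), test whether
            # any element appears in the value collection
            candidates = attr_value if isinstance(attr_value, (list, set, tuple)) else [attr_value]
            found = any(item in (value or []) for item in candidates)
            return found if self.operator == "in" else not found
        elif self.operator == "contains":
            return attr_value is not None and value in attr_value
        elif self.operator == "greater_than":
            return attr_value is not None and attr_value > value
        elif self.operator == "less_than":
            return attr_value is not None and attr_value < value
        elif self.operator == "is_true":
            return bool(attr_value)
        elif self.operator == "is_false":
            return not bool(attr_value)
        
        return False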
    
    def _get_attribute(self, context: Dict, path: str) -> Any:
        """Navigate nested dict using dot notation"""
        parts = path.split(".")
        value = context
        for part in parts:
            if isinstance(value, dict):
                value = value.get(part)
            else:
                return None
        return value


@dataclass
class AccessPolicy:
    """ABAC policy definition"""
    
    policy_id: str
    name: str
    description: str
    effect: PolicyEffect
    
    # What this policy applies to
    resource_types: List[str]  # e.g., ["patient_record", "lab_result"]
    actions: List[str]         # e.g., ["read", "write", "delete"]
    
    # Conditions that must be met
    conditions: List[PolicyCondition]
    
    # Priority (lower = higher priority)
    priority: int = 100
    
    def evaluate(self, context: Dict[str, Any]) -> tuple[bool, PolicyEffect]:
        """
        Evaluate policy against context
        
        Returns:
            Tuple of (matches: bool, effect: PolicyEffect)
        """
        # Check if policy applies to this resource/action
        resource_type = context.get("resource", {}).get("type")
        action = context.get("action")
        
        if resource_type not in self.resource_types:
            return False, None
        if action not in self.actions:
            return False, None
        
        # Evaluate all conditions (AND logic)
        for condition in self.conditions:
            if not condition.evaluate(context):
                return False, None
        
        return True, self.effect


class ABACEngine:
    """Attribute-Based Access Control Engine"""
    
    def __init__(self):
        self.policies: List[AccessPolicy] = []
        self._setup_healthcare_policies()
    
    def _setup_healthcare_policies(self):
        """Define healthcare-specific ABAC policies"""
        
        # Policy 1: Treatment team can access their patients
        self.policies.append(AccessPolicy(
            policy_id="P-001",
            name="Treatment Team Access",
            description="Allow access to patients on user's treatment team",
            effect=PolicyEffect.ALLOW,
            resource_types=["patient_record", "clinical_note", "lab_result"],
            actions=["read", "write"],
            conditions=[
                PolicyCondition(
                    attribute="user.user_id",
                    operator="in",
                    value="resource.treatment_team",
                ),
            ],
            priority=10,
        ))
        
        # Policy 2: Same department access
        self.policies.append(AccessPolicy(
            policy_id="P-002",
            name="Department Access",
            description="Allow access to patients in same department",
            effect=PolicyEffect.ALLOW,
            resource_types=["patient_record", "clinical_note"],
            actions=["read"],
            conditions=[
                PolicyCondition(
                    attribute="user.department",
                    operator="equals",
                    value="resource.department",
                ),
                PolicyCondition(
                    attribute="user.roles",
                    operator="contains",
                    value="clinical_staff",
                ),
            ],
            priority=20,
        ))
        
        # Policy 3: Deny access to VIP patients without explicit permission
        self.policies.append(AccessPolicy(
            policy_id="P-003",
            name="VIP Patient Protection",
            description="Require explicit VIP access for VIP patients",
            effect=PolicyEffect.DENY,
            resource_types=["patient_record", "clinical_note", "lab_result"],
            actions=["read", "write", "delete"],
            conditions=[
                PolicyCondition(
                    attribute="resource.is_vip",
                    operator="is_true",
                    value=None,
                ),
                PolicyCondition(
                    attribute="user.has_vip_access",
                    operator="is_false",
                    value=None,
                ),
            ],
            priority=5,  # High priority - evaluate first
        ))
        
        # Policy 4: Time-based access restriction
        self.policies.append(AccessPolicy(
            policy_id="P-004",
            name="Business Hours Only",
            description="Deny access outside business hours for non-clinical staff",
            effect=PolicyEffect.DENY,
            resource_types=["patient_record"],
            actions=["read", "write"],
            conditions=[
                PolicyCondition(
                    attribute="context.is_business_hours",
                    operator="is_false",
                    value=None,
                ),
                PolicyCondition(
                    attribute="user.roles",
                    operator="not_in",
                    value=["physician", "nurse", "on_call"],
                ),
            ],
            priority=15,
        ))
        
        # Policy 5: Minimum necessary for billing
        self.policies.append(AccessPolicy(
            policy_id="P-005",
            name="Billing Minimum Necessary",
            description="Limit billing staff to billing-relevant fields only",
            effect=PolicyEffect.DENY,
            resource_types=["clinical_note", "mental_health_note", "substance_abuse"],
            actions=["read"],
            conditions=[
                PolicyCondition(
                    attribute="user.primary_role",
                    operator="equals",
                    value="billing_specialist",
                ),
            ],
            priority=10,
        ))
        
        # Policy 6: Break-glass override
        self.policies.append(AccessPolicy(
            policy_id="P-006",
            name="Break Glass Override",
            description="Allow emergency access with break-glass",
            effect=PolicyEffect.ALLOW,
            resource_types=["patient_record", "clinical_note", "lab_result", "prescription"],
            actions=["read"],
            conditions=[
                PolicyCondition(
                    attribute="context.break_glass_active",
                    operator="is_true",
                    value=None,
                ),
                PolicyCondition(
                    attribute="user.can_break_glass",
                    operator="is_true",
                    value=None,
                ),
            ],
            priority=1,  # Evaluated first, but any matching DENY policy still wins
        ))
    
    def check_access(
        self,
        user: Dict[str, Any],
        resource: Dict[str, Any],
        action: str,
        context: Optional[Dict[str, Any]] = None,
    ) -> tuple[bool, str, List[str]]:
        """
        Check if access should be granted
        
        Returns:
            Tuple of (allowed: bool, reason: str, matched_policies: List[str])
        """
        full_context = {
            "user": user,
            "resource": resource,
            "action": action,
            "context": dict(context or {}),  # Copy so the caller's dict is not mutated
        }
        
        # Add computed context
        full_context["context"]["is_business_hours"] = self._is_business_hours()
        full_context["context"]["current_time"] = datetime.utcnow().isoformat()
        
        # Sort policies by priority
        sorted_policies = sorted(self.policies, key=lambda p: p.priority)
        
        matched_policies = []
        allow_matched = False
        
        for policy in sorted_policies:
            matches, effect = policy.evaluate(full_context)
            
            if matches:
                matched_policies.append(policy.policy_id)
                
                if effect == PolicyEffect.DENY:
                    # Deny-overrides: any matching DENY ends evaluation
                    return False, f"Denied by policy: {policy.name}", matched_policies
                elif effect == PolicyEffect.ALLOW:
                    # Remember the allow, but keep checking for explicit denies
                    allow_matched = True
        
        # If any ALLOW matched and no DENY, grant access
        if allow_matched:
            return True, "Access granted", matched_policies
        
        # Default deny
        return False, "No matching allow policy", []
    
    def _is_business_hours(self) -> bool:
        """Check if current UTC time is within 08:00-18:00 (a real system would use the facility's local time zone)"""
        now = datetime.utcnow().time()
        start = time(8, 0)  # 8 AM
        end = time(18, 0)   # 6 PM
        return start <= now <= end


# Example usage
abac = ABACEngine()

user = {
    "user_id": "dr-smith-123",
    "department": "cardiology",
    "roles": ["physician", "clinical_staff"],
    "primary_role": "physician",
    "has_vip_access": False,
    "can_break_glass": True,
}

resource = {
    "type": "patient_record",
    "patient_id": "patient-456",
    "department": "cardiology",
    "treatment_team": ["dr-smith-123", "nurse-jones-789"],
    "is_vip": False,
}

allowed, reason, policies = abac.check_access(
    user=user,
    resource=resource,
    action="read",
)

print(f"Access: {allowed}, Reason: {reason}")
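
The engine above combines policy matches with a deny-overrides rule: any matching DENY refuses access, otherwise at least one ALLOW is required, and no match at all means denial. The standalone sketch below isolates that rule. `Effect` and `combine` are illustrative names, not part of the engine.

```python
from enum import Enum
from typing import Iterable


class Effect(Enum):
    ALLOW = "allow"
    DENY = "deny"


def combine(effects: Iterable[Effect]) -> bool:
    """Deny-overrides: one DENY refuses; otherwise need at least one ALLOW."""
    seen_allow = False
    for effect in effects:
        if effect is Effect.DENY:
            return False
        if effect is Effect.ALLOW:
            seen_allow = True
    return seen_allow


print(combine([Effect.ALLOW]))               # True
print(combine([Effect.ALLOW, Effect.DENY]))  # False
print(combine([]))                           # False
```

Under this rule an ALLOW cannot outvote a matching DENY regardless of priority, so a break-glass ALLOW does not bypass the VIP protection policy. If emergency access should bypass specific denials, that exception has to be written into the deny policy's conditions or into the combining rule itself.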

Break-Glass Emergency Access

Break-glass (also called “break-the-glass”) provides emergency access when normal access controls would prevent critical patient care:
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from enum import Enum
import uuid
import hashlib

class BreakGlassReason(Enum):
    """Pre-defined reasons for break-glass access"""
    MEDICAL_EMERGENCY = "medical_emergency"
    PATIENT_UNRESPONSIVE = "patient_unresponsive"
    TRAUMA = "trauma"
    CODE_BLUE = "code_blue"
    TREATMENT_CONTINUITY = "treatment_continuity"
    DISASTER_RESPONSE = "disaster_response"
    OTHER = "other"

@dataclass
class BreakGlassSession:
    """Active break-glass session"""
    
    session_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    user_id: str = ""
    patient_id: str = ""
    
    # Reason
    reason: BreakGlassReason = BreakGlassReason.MEDICAL_EMERGENCY
    reason_detail: str = ""  # Free text for OTHER
    
    # Timing
    started_at: datetime = field(default_factory=datetime.utcnow)
    expires_at: datetime = field(default_factory=lambda: datetime.utcnow() + timedelta(hours=4))
    ended_at: Optional[datetime] = None
    
    # Context
    location: str = ""
    ip_address: str = ""
    device_id: str = ""
    
    # Review status
    reviewed: bool = False
    reviewed_by: Optional[str] = None
    reviewed_at: Optional[datetime] = None
    review_outcome: Optional[str] = None  # "appropriate", "inappropriate", "under_investigation"
    
    # Audit
    resources_accessed: List[str] = field(default_factory=list)
    
    @property
    def is_active(self) -> bool:
        if self.ended_at:
            return False
        return datetime.utcnow() < self.expires_at


class BreakGlassService:
    """
    Break-glass emergency access management
    
    HIPAA §164.312(a)(2)(ii) requires emergency access procedures.
    """
    
    def __init__(self):
        self.active_sessions: Dict[str, BreakGlassSession] = {}
        self.session_history: List[BreakGlassSession] = []
        
        # Configuration
        self.max_duration = timedelta(hours=4)
        self.extension_duration = timedelta(hours=2)
        self.require_review_within = timedelta(hours=24)
        
        # Allowed roles for break-glass
        self.allowed_roles = {"physician", "nurse", "paramedic", "emergency_staff"}
    
    def initiate_break_glass(
        self,
        user_id: str,
        user_roles: List[str],
        patient_id: str,
        reason: BreakGlassReason,
        reason_detail: str = "",
        location: str = "",
        ip_address: str = "",
    ) -> tuple[bool, str, Optional[BreakGlassSession]]:
        """
        Initiate break-glass access
        
        Returns:
            Tuple of (success: bool, message: str, session: Optional[BreakGlassSession])
        """
        # Verify user is allowed to break glass
        if not any(role in self.allowed_roles for role in user_roles):
            return False, "User role not authorized for break-glass access", None
        
        # Check for existing session
        existing = self._get_active_session(user_id, patient_id)
        if existing:
            return False, "Break-glass session already active", existing
        
        # Create session
        session = BreakGlassSession(
            user_id=user_id,
            patient_id=patient_id,
            reason=reason,
            reason_detail=reason_detail,
            location=location,
            ip_address=ip_address,
            expires_at=datetime.utcnow() + self.max_duration,
        )
        
        self.active_sessions[session.session_id] = session
        
        # Send alerts
        self._send_break_glass_alert(session)
        
        # Log for audit
        self._log_break_glass_event(session, "initiated")
        
        return True, "Break-glass access granted", session
    
    def end_break_glass(
        self,
        session_id: str,
        user_id: str,
    ) -> tuple[bool, str]:
        """End a break-glass session"""
        
        session = self.active_sessions.get(session_id)
        if not session:
            return False, "Session not found"
        
        if session.user_id != user_id:
            return False, "Only session owner can end session"
        
        session.ended_at = datetime.utcnow()
        
        # Move to history
        del self.active_sessions[session_id]
        self.session_history.append(session)
        
        # Log
        self._log_break_glass_event(session, "ended")
        
        # Queue for review
        self._queue_for_review(session)
        
        return True, "Break-glass session ended"
    
    def check_break_glass_access(
        self,
        user_id: str,
        patient_id: str,
    ) -> Optional[BreakGlassSession]:
        """Check if user has active break-glass access to patient"""
        return self._get_active_session(user_id, patient_id)
    
    def log_resource_access(
        self,
        session_id: str,
        resource_type: str,
        resource_id: str,
    ):
        """Log resource accessed during break-glass"""
        session = self.active_sessions.get(session_id)
        if session:
            session.resources_accessed.append(f"{resource_type}:{resource_id}")
    
    def review_session(
        self,
        session_id: str,
        reviewer_id: str,
        outcome: str,
        notes: str = "",
    ) -> tuple[bool, str]:
        """Review a break-glass session"""
        
        # Find session in history
        session = next(
            (s for s in self.session_history if s.session_id == session_id),
            None
        )
        
        if not session:
            return False, "Session not found"
        
        if session.reviewed:
            return False, "Session already reviewed"
        
        session.reviewed = True
        session.reviewed_by = reviewer_id
        session.reviewed_at = datetime.utcnow()
        session.review_outcome = outcome
        
        # Log review
        self._log_break_glass_event(session, f"reviewed:{outcome}")
        
        # If inappropriate, escalate
        if outcome == "inappropriate":
            self._escalate_inappropriate_access(session, notes)
        
        return True, "Review recorded"
    
    def get_pending_reviews(self) -> List[BreakGlassSession]:
        """Get sessions pending review"""
        return [
            s for s in self.session_history
            if not s.reviewed
        ]
    
    def _get_active_session(
        self,
        user_id: str,
        patient_id: str,
    ) -> Optional[BreakGlassSession]:
        """Find active session for user/patient combo"""
        for session in self.active_sessions.values():
            if (session.user_id == user_id and 
                session.patient_id == patient_id and 
                session.is_active):
                return session
        return None
    
    def _send_break_glass_alert(self, session: BreakGlassSession):
        """Send real-time alert for break-glass access"""
        alert = {
            "type": "break_glass_access",
            "severity": "high",
            "session_id": session.session_id,
            "user_id": session.user_id,
            "patient_id": session.patient_id,
            "reason": session.reason.value,
            "timestamp": session.started_at.isoformat(),
            "location": session.location,
        }
        # In production: Send to security team, compliance officer
        print(f"ALERT: Break-glass access initiated: {alert}")
    
    def _log_break_glass_event(
        self,
        session: BreakGlassSession,
        event: str,
    ):
        """Log break-glass event for audit trail"""
        # In production: Send to immutable audit log
        log_entry = {
            "event_type": f"break_glass.{event}",
            "session_id": session.session_id,
            "user_id": session.user_id,
            "patient_id": session.patient_id,
            "reason": session.reason.value,
            "timestamp": datetime.utcnow().isoformat(),
        }
        print(f"AUDIT: {log_entry}")
    
    def _queue_for_review(self, session: BreakGlassSession):
        """Queue session for compliance review"""
        # In production: Create ticket in compliance system
        print(f"REVIEW QUEUE: Session {session.session_id} queued for review")
    
    def _escalate_inappropriate_access(
        self,
        session: BreakGlassSession,
        notes: str,
    ):
        """Escalate inappropriate access for investigation"""
        escalation = {
            "type": "security_incident",
            "category": "inappropriate_phi_access",
            "session_id": session.session_id,
            "user_id": session.user_id,
            "patient_id": session.patient_id,
            "notes": notes,
        }
        # In production: Create incident ticket, notify security team
        print(f"ESCALATION: {escalation}")
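
The `_log_break_glass_event` comment above refers to an immutable audit log. One common way to approximate tamper evidence is to chain entries by hash, so that each entry commits to the one before it. The following is a minimal sketch under that assumption; `HashChainedAuditLog` and its methods are hypothetical names, not part of the service above, and a production system would also need write-once storage.

```python
import hashlib
import json
from typing import Dict, List


class HashChainedAuditLog:
    """Append-only log where each entry commits to the previous entry's hash."""

    GENESIS = "0" * 64  # placeholder "previous hash" for the first entry

    def __init__(self) -> None:
        self.entries: List[Dict] = []

    @staticmethod
    def _digest(prev_hash: str, event: Dict) -> str:
        # Canonical JSON (sorted keys) so the same event always hashes identically
        payload = json.dumps(event, sort_keys=True, separators=(",", ":"))
        return hashlib.sha256((prev_hash + payload).encode()).hexdigest()

    def append(self, event: Dict) -> str:
        prev_hash = self.entries[-1]["hash"] if self.entries else self.GENESIS
        entry_hash = self._digest(prev_hash, event)
        self.entries.append(
            {"event": dict(event), "prev_hash": prev_hash, "hash": entry_hash}
        )
        return entry_hash

    def verify(self) -> bool:
        """Recompute the chain; an edited, reordered, or mid-chain deleted entry breaks it."""
        prev_hash = self.GENESIS
        for entry in self.entries:
            if entry["prev_hash"] != prev_hash:
                return False
            if self._digest(prev_hash, entry["event"]) != entry["hash"]:
                return False
            prev_hash = entry["hash"]
        return True
```

Truncating the tail of the log is not detected by `verify()` on its own; periodically storing the latest hash in a separate system is one way to cover that case.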

Multi-Factor Authentication (MFA)

import pyotp
import secrets
import hashlib
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from enum import Enum

class MFAType(Enum):
    TOTP = "totp"           # Time-based one-time password
    SMS = "sms"             # SMS code
    EMAIL = "email"         # Email code
    PUSH = "push"           # Push notification
    HARDWARE = "hardware"   # Hardware token (FIDO2/WebAuthn)

@dataclass
class MFAEnrollment:
    """User MFA enrollment"""
    user_id: str
    mfa_type: MFAType
    secret: str  # Encrypted in production
    backup_codes: List[str]  # Hashed backup codes
    enrolled_at: datetime
    last_used: Optional[datetime] = None
    
class MFAService:
    """
    Multi-Factor Authentication for HIPAA compliance
    
    HIPAA doesn't explicitly require MFA, but it's considered
    a best practice for the "Person or Entity Authentication"
    requirement (§164.312(d)).
    """
    
    def __init__(self):
        self.enrollments: Dict[str, MFAEnrollment] = {}
        self.pending_challenges: Dict[str, dict] = {}
    
    def enroll_totp(self, user_id: str, user_email: str) -> dict:
        """
        Enroll user in TOTP-based MFA
        
        Returns provisioning data for authenticator app
        """
        # Generate secret
        secret = pyotp.random_base32()
        
        # Generate backup codes
        backup_codes = [secrets.token_hex(4) for _ in range(10)]
        backup_codes_hashed = [
            hashlib.sha256(code.encode()).hexdigest()
            for code in backup_codes
        ]
        
        # Create enrollment
        enrollment = MFAEnrollment(
            user_id=user_id,
            mfa_type=MFAType.TOTP,
            secret=secret,
            backup_codes=backup_codes_hashed,
            enrolled_at=datetime.utcnow(),
        )
        
        # Store (encrypt secret in production!)
        self.enrollments[user_id] = enrollment
        
        # Generate provisioning URI
        totp = pyotp.TOTP(secret)
        provisioning_uri = totp.provisioning_uri(
            name=user_email,
            issuer_name="HIPAA Healthcare",
        )
        
        return {
            "secret": secret,
            "provisioning_uri": provisioning_uri,
            "backup_codes": backup_codes,  # Show once, never again
        }
    
    def verify_totp(self, user_id: str, code: str) -> bool:
        """Verify TOTP code"""
        enrollment = self.enrollments.get(user_id)
        if not enrollment or enrollment.mfa_type != MFAType.TOTP:
            return False
        
        totp = pyotp.TOTP(enrollment.secret)
        
        # Allow 1 time step tolerance for clock drift
        if totp.verify(code, valid_window=1):
            enrollment.last_used = datetime.utcnow()
            return True
        
        # Check backup codes
        code_hash = hashlib.sha256(code.encode()).hexdigest()
        if code_hash in enrollment.backup_codes:
            enrollment.backup_codes.remove(code_hash)
            enrollment.last_used = datetime.utcnow()
            return True
        
        return False
    
    def send_sms_challenge(self, user_id: str, phone_number: str) -> str:
        """Send SMS challenge code"""
        code = secrets.token_hex(3)  # 6-char code
        
        self.pending_challenges[user_id] = {
            "code_hash": hashlib.sha256(code.encode()).hexdigest(),
            "expires_at": datetime.utcnow() + timedelta(minutes=5),
            "attempts": 0,
        }
        
        # In production: Send via SMS provider
        # sms_client.send(phone_number, f"Your verification code: {code}")
        
        return code  # Return for testing; don't return in production
    
    def verify_sms_challenge(self, user_id: str, code: str) -> bool:
        """Verify SMS challenge code"""
        challenge = self.pending_challenges.get(user_id)
        if not challenge:
            return False
        
        if datetime.utcnow() > challenge["expires_at"]:
            del self.pending_challenges[user_id]
            return False
        
        challenge["attempts"] += 1
        if challenge["attempts"] > 3:
            del self.pending_challenges[user_id]
            return False
        
        code_hash = hashlib.sha256(code.encode()).hexdigest()
        # Constant-time comparison avoids leaking match position via timing
        if secrets.compare_digest(code_hash, challenge["code_hash"]):
            del self.pending_challenges[user_id]
            return True
        
        return False
    
    def require_mfa_for_action(
        self,
        user_id: str,
        action: str,
    ) -> bool:
        """Determine if action requires MFA re-verification"""
        
        # High-risk actions requiring MFA
        high_risk_actions = {
            "phi_export",
            "bulk_access",
            "password_change",
            "role_assignment",
            "break_glass",
            "patient_delete",
        }
        
        return action in high_risk_actions
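
`verify_totp` above delegates the code computation to `pyotp`. To make the mechanism concrete, here is a standard-library sketch of HOTP (RFC 4226) and TOTP (RFC 6238). The function names are illustrative and are not `pyotp`'s API. It takes the raw key bytes, so a base32 secret like the one `pyotp.random_base32()` returns would first be decoded with `base64.b32decode`.

```python
import hashlib
import hmac
import struct
import time
from typing import Optional


def hotp(key: bytes, counter: int, digits: int = 6) -> str:
    """HOTP: HMAC-SHA1 over an 8-byte big-endian counter, then dynamic truncation."""
    mac = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = mac[-1] & 0x0F  # low 4 bits of the last byte select a 4-byte window
    code = struct.unpack(">I", mac[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 10 ** digits).zfill(digits)


def totp(key: bytes, at: Optional[float] = None, step: int = 30) -> str:
    """TOTP: HOTP with the counter set to the number of elapsed time steps."""
    now = time.time() if at is None else at
    return hotp(key, int(now // step))


def verify_totp_code(
    key: bytes,
    code: str,
    at: Optional[float] = None,
    window: int = 1,
    step: int = 30,
) -> bool:
    """Accept codes from up to `window` steps on either side of the current step."""
    now = time.time() if at is None else at
    counter = int(now // step)
    for drift in range(-window, window + 1):
        if counter + drift < 0:
            continue  # no time step exists before the epoch
        expected = hotp(key, counter + drift)
        if hmac.compare_digest(expected.encode(), code.encode()):
            return True
    return False
```

The `window` parameter plays the same role as `valid_window=1` in the service above: it trades a slightly longer code lifetime for tolerance of clock drift.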

Session Management

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Dict, List
import secrets
import hashlib
import json

@dataclass
class Session:
    """User session with HIPAA-compliant timeouts"""
    
    session_id: str = field(default_factory=lambda: secrets.token_urlsafe(32))
    user_id: str = ""
    
    # Timing
    created_at: datetime = field(default_factory=datetime.utcnow)
    last_activity: datetime = field(default_factory=datetime.utcnow)
    expires_at: datetime = field(default_factory=lambda: datetime.utcnow() + timedelta(hours=8))
    
    # Security
    ip_address: str = ""
    user_agent: str = ""
    device_fingerprint: str = ""
    
    # State
    mfa_verified: bool = False
    mfa_verified_at: Optional[datetime] = None
    
    # Elevation
    elevated: bool = False
    elevated_at: Optional[datetime] = None
    elevated_reason: str = ""
    
    # Tracking
    resources_accessed: int = 0
    
    @property
    def is_valid(self) -> bool:
        return datetime.utcnow() < self.expires_at
    
    @property
    def is_idle_timeout(self) -> bool:
        # HIPAA sets no fixed duration for automatic logoff; 15 minutes is a common policy choice
        idle_limit = timedelta(minutes=15)
        return (datetime.utcnow() - self.last_activity) > idle_limit
    
    @property
    def needs_mfa_refresh(self) -> bool:
        # Require MFA re-verification every 4 hours
        if not self.mfa_verified_at:
            return True
        mfa_validity = timedelta(hours=4)
        return (datetime.utcnow() - self.mfa_verified_at) > mfa_validity


class SessionManager:
    """
    HIPAA-compliant session management
    
    Implements:
    - Automatic logoff (§164.312(a)(2)(iii))
    - Session timeout
    - Concurrent session limits
    - Session binding
    """
    
    def __init__(self):
        self.sessions: Dict[str, Session] = {}
        
        # Configuration
        self.idle_timeout = timedelta(minutes=15)  # Common policy choice; HIPAA sets no fixed value
        self.absolute_timeout = timedelta(hours=8)
        self.max_concurrent_sessions = 3
        self.require_session_binding = True
    
    def create_session(
        self,
        user_id: str,
        ip_address: str,
        user_agent: str,
        mfa_verified: bool = False,
    ) -> Session:
        """Create new session after authentication"""
        
        # Check concurrent session limit
        user_sessions = self.get_user_sessions(user_id)
        if len(user_sessions) >= self.max_concurrent_sessions:
            # Terminate oldest session
            oldest = min(user_sessions, key=lambda s: s.created_at)
            self.terminate_session(oldest.session_id, "max_sessions_exceeded")
        
        session = Session(
            user_id=user_id,
            ip_address=ip_address,
            user_agent=user_agent,
            device_fingerprint=self._compute_fingerprint(ip_address, user_agent),
            mfa_verified=mfa_verified,
            mfa_verified_at=datetime.utcnow() if mfa_verified else None,
            expires_at=datetime.utcnow() + self.absolute_timeout,
        )
        
        self.sessions[session.session_id] = session
        
        # Log session creation
        self._log_session_event(session, "created")
        
        return session
    
    def validate_session(
        self,
        session_id: str,
        ip_address: str,
        user_agent: str,
    ) -> tuple[bool, str, Optional[Session]]:
        """
        Validate session for request
        
        Returns:
            Tuple of (valid: bool, message: str, session: Optional[Session])
        """
        session = self.sessions.get(session_id)
        
        if not session:
            return False, "Session not found", None
        
        if not session.is_valid:
            self.terminate_session(session_id, "expired")
            return False, "Session expired", None
        
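        # Use the configured idle timeout rather than the value hard-coded on Session
        if datetime.utcnow() - session.last_activity > self.idle_timeout: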
            self.terminate_session(session_id, "idle_timeout")
            return False, "Session timed out due to inactivity", None
        
        # Session binding validation
        if self.require_session_binding:
            current_fingerprint = self._compute_fingerprint(ip_address, user_agent)
            if current_fingerprint != session.device_fingerprint:
                self.terminate_session(session_id, "session_binding_violation")
                self._log_security_event(session, "session_hijack_attempt")
                return False, "Session binding violation", None
        
        # Update activity
        session.last_activity = datetime.utcnow()
        session.resources_accessed += 1
        
        return True, "Valid", session
    
    def terminate_session(
        self,
        session_id: str,
        reason: str,
    ) -> bool:
        """Terminate session"""
        session = self.sessions.get(session_id)
        if not session:
            return False
        
        self._log_session_event(session, f"terminated:{reason}")
        del self.sessions[session_id]
        return True
    
    def terminate_all_user_sessions(
        self,
        user_id: str,
        reason: str,
    ):
        """Terminate all sessions for a user (password change, security event)"""
        user_sessions = self.get_user_sessions(user_id)
        for session in user_sessions:
            self.terminate_session(session.session_id, reason)
    
    def get_user_sessions(self, user_id: str) -> List[Session]:
        """Get all active sessions for user"""
        return [s for s in self.sessions.values() if s.user_id == user_id]
    
    def elevate_session(
        self,
        session_id: str,
        reason: str,
    ) -> bool:
        """Elevate session privileges (after MFA re-verification)"""
        session = self.sessions.get(session_id)
        if not session:
            return False
        
        session.elevated = True
        session.elevated_at = datetime.utcnow()
        session.elevated_reason = reason
        
        self._log_session_event(session, f"elevated:{reason}")
        return True
    
    def _compute_fingerprint(self, ip_address: str, user_agent: str) -> str:
        """Compute device fingerprint for session binding"""
        data = f"{ip_address}:{user_agent}"
        return hashlib.sha256(data.encode()).hexdigest()[:16]
    
    def _log_session_event(self, session: Session, event: str):
        """Log session event for audit"""
        log_entry = {
            "event_type": f"session.{event}",
            "session_id": session.session_id[:8] + "...",  # Partial for privacy
            "user_id": session.user_id,
            "ip_address": session.ip_address,
            "timestamp": datetime.utcnow().isoformat(),
        }
        print(f"AUDIT: {log_entry}")
    
    def _log_security_event(self, session: Session, event: str):
        """Log security event for SIEM"""
        security_event = {
            "event_type": f"security.{event}",
            "severity": "high",
            "session_id": session.session_id,
            "user_id": session.user_id,
            "ip_address": session.ip_address,
            "timestamp": datetime.utcnow().isoformat(),
        }
        print(f"SECURITY ALERT: {security_event}")
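
The listing above uses `datetime.utcnow()`, which returns naive timestamps and is deprecated as of Python 3.12. The sketch below shows the same idle and absolute timeout checks with timezone-aware timestamps and an injectable clock, which makes the rules easy to unit test. The function name and the two constants are assumptions for illustration, not part of `SessionManager`.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

IDLE_TIMEOUT = timedelta(minutes=15)   # assumed policy value
ABSOLUTE_TIMEOUT = timedelta(hours=8)  # assumed policy value


def session_state(
    created_at: datetime,
    last_activity: datetime,
    now: Optional[datetime] = None,
) -> str:
    """Classify a session as 'expired', 'idle_timeout', or 'active'."""
    if now is None:
        now = datetime.now(timezone.utc)
    # Absolute lifetime is checked first, matching validate_session above
    if now - created_at >= ABSOLUTE_TIMEOUT:
        return "expired"
    if now - last_activity > IDLE_TIMEOUT:
        return "idle_timeout"
    return "active"
```

Passing `now` explicitly lets a test pin the clock instead of sleeping or patching `datetime`.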

Hands-On Lab: Implement Access Control

1. Define Your Role Hierarchy

Create at least 5 roles for your healthcare application:
  • Define permissions for each role
  • Implement role inheritance where appropriate
  • Document role assignment procedures
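
As a starting point for this step, role inheritance can be sketched as a walk over parent roles that unions their permissions. The five roles and the permission strings below are hypothetical examples, not the module's `Permission` enum or `RBACService`.

```python
from typing import Dict, FrozenSet, Set, Tuple

# Permissions granted directly by each role (illustrative values)
ROLE_PERMISSIONS: Dict[str, FrozenSet[str]] = {
    "staff": frozenset({"schedule:read"}),
    "nurse": frozenset({"phi:read", "vitals:write"}),
    "physician": frozenset({"phi:write", "orders:write"}),
    "billing": frozenset({"billing:read", "billing:write"}),
    "auditor": frozenset({"audit:read"}),
}

# Each role inherits everything its parents can do
ROLE_PARENTS: Dict[str, Tuple[str, ...]] = {
    "nurse": ("staff",),
    "physician": ("nurse",),
    "billing": ("staff",),
}


def effective_permissions(role: str) -> Set[str]:
    """Union of a role's own permissions and those of all its ancestors."""
    perms: Set[str] = set()
    seen: Set[str] = set()
    stack = [role]
    while stack:
        current = stack.pop()
        if current in seen:
            continue  # guards against accidental cycles in ROLE_PARENTS
        seen.add(current)
        perms |= ROLE_PERMISSIONS.get(current, frozenset())
        stack.extend(ROLE_PARENTS.get(current, ()))
    return perms
```

Keeping the auditor role outside the hierarchy is deliberate in this sketch: it can read audit data without inheriting any PHI permissions.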

2. Implement ABAC Policies

Create policies for:
  • Treatment team access
  • Department-based access
  • VIP patient protection
  • Time-based restrictions
  • Break-glass override
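
The five policy types above can be prototyped with a small deny-overrides evaluator before wiring them into a full engine. This is a simplified sketch, not the module's `ABACEngine`; the attribute names (`treatment_team`, `vip`, `hour`, and so on) and the 07:00 to 19:00 business hours are assumptions.

```python
from typing import Any, Callable, Dict, List, Tuple

Attrs = Dict[str, Any]
Condition = Callable[[Attrs, Attrs, Attrs], bool]


def on_treatment_team(user: Attrs, resource: Attrs, ctx: Attrs) -> bool:
    return user["id"] in resource.get("treatment_team", [])


def same_department(user: Attrs, resource: Attrs, ctx: Attrs) -> bool:
    return user.get("department") == resource.get("department")


def break_glass_active(user: Attrs, resource: Attrs, ctx: Attrs) -> bool:
    return bool(ctx.get("break_glass_active", False))


def vip_without_relationship(user: Attrs, resource: Attrs, ctx: Attrs) -> bool:
    # VIP records are limited to the treatment team unless break-glass is active
    return (
        bool(resource.get("vip"))
        and not on_treatment_team(user, resource, ctx)
        and not break_glass_active(user, resource, ctx)
    )


def billing_after_hours(user: Attrs, resource: Attrs, ctx: Attrs) -> bool:
    # Assumed rule: billing staff may only access records 07:00-19:00
    return user.get("role") == "billing" and not (7 <= ctx["hour"] < 19)


# (policy name, effect, condition)
POLICIES: List[Tuple[str, str, Condition]] = [
    ("vip_protection", "deny", vip_without_relationship),
    ("after_hours_restriction", "deny", billing_after_hours),
    ("treatment_team", "permit", on_treatment_team),
    ("same_department", "permit", same_department),
    ("break_glass_override", "permit", break_glass_active),
]


def evaluate(user: Attrs, resource: Attrs, ctx: Attrs) -> Tuple[bool, str]:
    """Deny-overrides: any matching deny wins; otherwise the first matching permit."""
    first_permit = ""
    for name, effect, condition in POLICIES:
        if not condition(user, resource, ctx):
            continue
        if effect == "deny":
            return False, name
        if not first_permit:
            first_permit = name
    if first_permit:
        return True, first_permit
    return False, "default_deny"  # nothing matched: fail closed
```

Deny-overrides with a default deny means a missing or misconfigured permit policy results in refused access rather than accidental exposure.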

3. Build Break-Glass System

Implement:
  • Break-glass initiation with reason
  • Automatic expiration
  • Real-time alerting
  • Post-access review workflow
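
For automatic expiration, one option is a periodic sweep that moves expired sessions out of the active set and into history, so they still reach the review workflow even if the user never ends them explicitly. The sketch below assumes a simplified `EmergencySession` stand-in for `BreakGlassSession`; the function name and fields are illustrative.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional


@dataclass
class EmergencySession:
    """Hypothetical, reduced stand-in for BreakGlassSession."""
    session_id: str
    expires_at: datetime
    ended_at: Optional[datetime] = None


def expire_sessions(
    active: Dict[str, EmergencySession],
    history: List[EmergencySession],
    now: Optional[datetime] = None,
) -> List[str]:
    """Move expired sessions from `active` to `history`; return their IDs."""
    if now is None:
        now = datetime.now(timezone.utc)
    expired_ids = [sid for sid, s in active.items() if now >= s.expires_at]
    for sid in expired_ids:
        session = active.pop(sid)
        # Record the expiry time, not the sweep time, as the end of access
        session.ended_at = session.expires_at
        history.append(session)
    return expired_ids
```

In a real service the sweep would run on a scheduler and each expired session would also be logged and queued for review, as `end_break_glass` does for sessions ended manually.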

4. Session Management

Implement:
  • 15-minute idle timeout
  • Session binding
  • MFA re-verification for sensitive actions
  • Concurrent session limits

Access Control Integration with FastAPI

from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.security import HTTPBearer
from functools import wraps
from typing import List, Optional

app = FastAPI()
security = HTTPBearer()

# Initialize services
rbac_service = RBACService()
abac_engine = ABACEngine()
session_manager = SessionManager()
break_glass_service = BreakGlassService()


async def get_current_session(request: Request) -> Session:
    """Extract and validate session from request"""
    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid authorization header")
    
    session_id = auth_header[7:]
    # request.client can be None (e.g. some test clients or proxies)
    ip_address = request.client.host if request.client else ""
    user_agent = request.headers.get("User-Agent", "")
    
    valid, message, session = session_manager.validate_session(
        session_id, ip_address, user_agent
    )
    
    if not valid:
        raise HTTPException(status_code=401, detail=message)
    
    return session


def require_permission(permission: Permission):
    """Decorator to require specific permission"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, session: Session = Depends(get_current_session), **kwargs):
            allowed, reason = rbac_service.check_access(
                session.user_id, permission
            )
            if not allowed:
                raise HTTPException(status_code=403, detail=reason)
            return await func(*args, session=session, **kwargs)
        return wrapper
    return decorator


def require_patient_access(action: str):
    """Decorator to check patient-level access via ABAC"""
    def decorator(func):
        @wraps(func)
        async def wrapper(
            patient_id: str,
            *args,
            session: Session = Depends(get_current_session),
            request: Request = None,
            **kwargs
        ):
            # Get user info
            user = rbac_service.users.get(session.user_id)
            if not user:
                raise HTTPException(status_code=401, detail="User not found")
            
            # Get patient/resource info. get_patient_resource is assumed to be an
            # application-defined async lookup (a database query in production).
            resource = await get_patient_resource(patient_id)
            
            # Check break-glass first
            break_glass = break_glass_service.check_break_glass_access(
                session.user_id, patient_id
            )
            
            context = {
                "break_glass_active": break_glass is not None,
            }
            
            # Check ABAC
            allowed, reason, policies = abac_engine.check_access(
                user=user.__dict__,
                resource=resource,
                action=action,
                context=context,
            )
            
            if not allowed:
                raise HTTPException(status_code=403, detail=reason)
            
            # Log access if break-glass
            if break_glass:
                break_glass_service.log_resource_access(
                    break_glass.session_id,
                    "patient_record",
                    patient_id,
                )
            
            return await func(patient_id, *args, session=session, **kwargs)
        return wrapper
    return decorator


# Example endpoints
@app.get("/api/patients/{patient_id}")
@require_patient_access("read")
async def get_patient(patient_id: str, session: Session = Depends(get_current_session)):
    """Get patient record with access control"""
    # Implementation
    return {"patient_id": patient_id, "data": "..."}


@app.post("/api/break-glass")
async def initiate_break_glass(
    request: Request,
    patient_id: str,
    reason: BreakGlassReason,
    reason_detail: str = "",
    session: Session = Depends(get_current_session),
):
    """Initiate break-glass emergency access"""
    user = rbac_service.users.get(session.user_id)
    if not user:
        # Avoid an AttributeError on user.roles below
        raise HTTPException(status_code=401, detail="User not found")
    
    success, message, bg_session = break_glass_service.initiate_break_glass(
        user_id=session.user_id,
        user_roles=user.roles,
        patient_id=patient_id,
        reason=reason,
        reason_detail=reason_detail,
        location=request.headers.get("X-Location", ""),
        ip_address=request.client.host,
    )
    
    if not success:
        raise HTTPException(status_code=403, detail=message)
    
    return {"session_id": bg_session.session_id, "expires_at": bg_session.expires_at}

Key Takeaways

Least Privilege

Every user gets only the minimum access their role requires. Emergencies go through audited break-glass, not standing privileges.

Defense in Depth

RBAC + ABAC + session controls = comprehensive access control.

Break-Glass is Required

HIPAA requires emergency access procedures. Build them thoughtfully.

Audit Everything

Log every access decision for HIPAA compliance and forensics.

Next Steps