import hashlib
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional
class AuditAction(str, Enum):
    """Closed set of event kinds that an audit entry can be tagged with.

    The ``str`` mix-in makes every member an actual string, so a member
    compares equal to its value (``AuditAction.QUERY == "query"``).
    """

    QUERY = "query"
    COMPLETION = "completion"
    MODERATION = "moderation"
    CONTENT_FILTER = "content_filter"
    DATA_ACCESS = "data_access"
    CONFIG_CHANGE = "config_change"
@dataclass
class AuditEntry:
    """A single audit record.

    The first five fields are required; the remaining three default to
    ``None`` when the caller has nothing to supply for them.
    """

    # --- required ---
    timestamp: str
    action: AuditAction
    user_id: str
    resource: str
    details: dict

    # --- optional ---
    ip_address: Optional[str] = None
    user_agent: Optional[str] = None
    # Hash of the content, kept so the content can be verified later.
    content_hash: Optional[str] = None
class AuditLogger:
    """Audit logging for compliance and security.

    Every ``log_*`` method builds one ``AuditEntry`` stamped with the
    current UTC time and passes it to the ``write`` method of the storage
    backend given at construction time.
    """

    def __init__(self, storage_backend):
        """Remember the backend; its ``write(entry)`` is called once per event."""
        self.storage = storage_backend

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string."""
        # datetime.utcnow() is deprecated since Python 3.12. Read an aware
        # UTC clock instead and drop the tzinfo so the stored text keeps the
        # exact format (no "+00:00" suffix) that utcnow().isoformat() had.
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def _hash_content(self, content: str) -> str:
        """Create SHA-256 hash of content (hex digest of its encoded bytes)."""
        return hashlib.sha256(content.encode()).hexdigest()

    def _record(self, action, user_id: str, resource: str, details: dict,
                **optional_fields) -> None:
        """Build an ``AuditEntry`` for ``action`` and write it to storage.

        ``optional_fields`` are forwarded unchanged to ``AuditEntry``
        (``ip_address``, ``user_agent``, ``content_hash``).
        """
        entry = AuditEntry(
            timestamp=self._utc_timestamp(),
            action=action,
            user_id=user_id,
            resource=resource,
            details=details,
            **optional_fields,
        )
        self.storage.write(entry)

    def log_query(
        self,
        user_id: str,
        query: str,
        model: str,
        response_preview: Optional[str] = None,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None
    ) -> None:
        """Log user query.

        The query is recorded by its length and SHA-256 hash; ``model`` is
        stored as the entry's resource. At most the first 100 characters of
        ``response_preview`` are kept, and an empty or missing preview is
        stored as ``None``.
        """
        preview = response_preview[:100] if response_preview else None
        self._record(
            AuditAction.QUERY,
            user_id,
            model,
            {
                "query_length": len(query),
                "response_preview": preview,
            },
            ip_address=ip_address,
            user_agent=user_agent,
            content_hash=self._hash_content(query),
        )

    def log_content_filter(
        self,
        user_id: str,
        content: str,
        filter_result: dict,
        action_taken: str
    ) -> None:
        """Log content filter activation.

        The content is recorded by its length and SHA-256 hash;
        ``filter_result`` and ``action_taken`` are stored as given.
        """
        self._record(
            AuditAction.CONTENT_FILTER,
            user_id,
            "content_filter",
            {
                "content_length": len(content),
                "filter_result": filter_result,
                "action_taken": action_taken,
            },
            content_hash=self._hash_content(content),
        )

    def log_data_access(
        self,
        user_id: str,
        data_source: str,
        query: str,
        records_accessed: int
    ) -> None:
        """Log data access for RAG.

        ``data_source`` is stored as the entry's resource.
        """
        # NOTE(review): unlike log_query / log_content_filter, the raw query
        # text is stored in details and no content_hash is set. Behaviour is
        # kept as-is; confirm this difference is intentional.
        self._record(
            AuditAction.DATA_ACCESS,
            user_id,
            data_source,
            {
                "query": query,
                "records_accessed": records_accessed,
            },
        )
# Storage backends
class FileAuditStorage:
    """Storage backend that appends entries to a file as JSON lines."""

    def __init__(self, path: str):
        """Remember the target file path; the file is opened on each write."""
        self.path = path

    def write(self, entry: AuditEntry) -> None:
        """Append ``entry`` to the file as a single JSON object on its own line.

        The entry is converted with ``dataclasses.asdict`` and serialised
        with ``json.dumps``; anything ``json`` cannot encode raises
        ``TypeError``.
        """
        # Serialise before opening so a failure here never touches the file.
        line = json.dumps(asdict(entry))
        # Pin the encoding rather than depending on the platform default.
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(line + "\n")
class DatabaseAuditStorage:
    """Storage backend that inserts entries into an ``audit_logs`` table."""

    def __init__(self, db_connection):
        """Keep the connection object; only its ``execute`` method is used."""
        self.db = db_connection

    def write(self, entry: AuditEntry):
        """Insert ``entry`` as one row of ``audit_logs``.

        Values are bound positionally, so the table's column order has to
        match the order of the tuple built below. ``details`` is stored as
        JSON text and ``action`` as the enum member's value. No commit is
        issued here.
        """
        statement = "INSERT INTO audit_logs VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
        row = (
            entry.timestamp,
            entry.action.value,
            entry.user_id,
            entry.resource,
            json.dumps(entry.details),
            entry.ip_address,
            entry.user_agent,
            entry.content_hash,
        )
        self.db.execute(statement, row)