Senior Level: Security is non-negotiable at senior levels. You’ll be expected to design systems that are secure by default and can explain authentication, authorization, and common attack mitigations.
Authentication vs Authorization
Copy
┌─────────────────────────────────────────────────────────────────┐
│ AuthN vs AuthZ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ AUTHENTICATION (AuthN) AUTHORIZATION (AuthZ) │
│ ────────────────────── ───────────────────── │
│ "WHO are you?" "WHAT can you do?" │
│ │
│ Verifies identity Controls access │
│ Happens first Happens after AuthN │
│ │
│ Methods: Methods: │
│ • Password • RBAC (Role-Based) │
│ • OAuth/OIDC • ABAC (Attribute-Based) │
│ • API Keys • ACL (Access Control List) │
│ • mTLS • Policy (e.g., OPA) │
│ │
│ Example Flow: │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ User: "I'm [email protected], password: ****" │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ AuthN: Verify credentials → Identity confirmed │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ User: "I want to DELETE /orders/123" │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ AuthZ: Does alice have 'orders:delete' permission? │ │
│ │ Alice's role = 'viewer' → Denied │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Token-Based Authentication
JWT (JSON Web Token)
Copy
┌─────────────────────────────────────────────────────────────────┐
│ JWT Structure │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Header.Payload.Signature │
│ ────────────────────────── │
│ │
│ HEADER (Algorithm + Type): │
│ { │
│ "alg": "RS256", │
│ "typ": "JWT" │
│ } │
│ Base64: eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9 │
│ │
│ PAYLOAD (Claims): │
│ { │
│ "sub": "usr_123", // Subject (user ID) │
│ "iat": 1699900000, // Issued at │
│ "exp": 1699903600, // Expires (1 hour) │
│ "iss": "auth.example.com", // Issuer │
│ "aud": "api.example.com", // Audience │
│ "roles": ["user", "admin"] // Custom claims │
│ } │
│ Base64: eyJzdWIiOiJ1c3JfMTIzIiwiaWF0IjoxNjk5OTAwMDAw... │
│ │
│ SIGNATURE: │
│ RS256( │
│ base64(header) + "." + base64(payload), │
│ privateKey │
│ ) │
│ Base64: SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c │
│ │
│ FULL TOKEN: │
│ eyJhbGci...eyJzdWIi...SflKxw... │
│ │
└─────────────────────────────────────────────────────────────────┘
Access Token vs Refresh Token
Copy
┌─────────────────────────────────────────────────────────────────┐
│ Token Types │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ACCESS TOKEN REFRESH TOKEN │
│ ──────────── ───────────── │
│ Short-lived (15 min - 1 hour) Long-lived (days - weeks) │
│ Sent with every request Stored securely │
│ Stateless validation Stateful (can revoke) │
│ Contains permissions Just an opaque ID │
│ │
│ TOKEN FLOW: │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 1. Login with credentials │ │
│ │ Client → Auth Server │ │
│ │ ← Access Token (15min) + Refresh Token (7 days) │ │
│ │ │ │
│ │ 2. API calls │ │
│ │ Client → API Server │ │
│ │ Header: Authorization: Bearer <access_token> │ │
│ │ │ │
│ │ 3. Access token expires │ │
│ │ API returns 401 Unauthorized │ │
│ │ │ │
│ │ 4. Refresh token │ │
│ │ Client → Auth Server (with refresh token) │ │
│ │ ← New Access Token + New Refresh Token │ │
│ │ │ │
│ │ 5. Logout / Revoke │ │
│ │ Delete refresh token from server's allowlist │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ SECURITY: │
│ • Access token: Can't revoke (short expiry is the mitigation) │
│ • Refresh token: Can revoke (check against DB/Redis) │
│ • Refresh token rotation: Issue new refresh on each use │
│ │
└─────────────────────────────────────────────────────────────────┘
Implementation
Copy
import jwt
from datetime import datetime, timedelta
from typing import Optional
class TokenService:
def __init__(self, private_key: str, public_key: str):
self.private_key = private_key
self.public_key = public_key
self.access_token_ttl = timedelta(minutes=15)
self.refresh_token_ttl = timedelta(days=7)
def create_tokens(self, user_id: str, roles: list) -> dict:
"""Create access and refresh tokens"""
now = datetime.utcnow()
# Access token with claims
access_payload = {
"sub": user_id,
"roles": roles,
"type": "access",
"iat": now,
"exp": now + self.access_token_ttl,
"iss": "auth.example.com"
}
access_token = jwt.encode(
access_payload,
self.private_key,
algorithm="RS256"
)
# Refresh token (minimal claims)
refresh_payload = {
"sub": user_id,
"type": "refresh",
"jti": str(uuid.uuid4()), # Unique ID for revocation
"iat": now,
"exp": now + self.refresh_token_ttl
}
refresh_token = jwt.encode(
refresh_payload,
self.private_key,
algorithm="RS256"
)
# Store refresh token JTI in Redis for revocation
self.redis.setex(
f"refresh:{refresh_payload['jti']}",
self.refresh_token_ttl,
user_id
)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"expires_in": self.access_token_ttl.seconds
}
def verify_access_token(self, token: str) -> dict:
"""Verify and decode access token"""
try:
payload = jwt.decode(
token,
self.public_key,
algorithms=["RS256"],
options={"require": ["sub", "exp", "roles"]}
)
if payload.get("type") != "access":
raise jwt.InvalidTokenError("Invalid token type")
return payload
except jwt.ExpiredSignatureError:
raise AuthError("Token expired")
except jwt.InvalidTokenError as e:
raise AuthError(f"Invalid token: {e}")
def refresh_tokens(self, refresh_token: str) -> dict:
"""Exchange refresh token for new tokens"""
try:
payload = jwt.decode(
refresh_token,
self.public_key,
algorithms=["RS256"]
)
# Check if revoked
jti = payload.get("jti")
if not self.redis.exists(f"refresh:{jti}"):
raise AuthError("Token revoked")
# Revoke old refresh token
self.redis.delete(f"refresh:{jti}")
# Get user and issue new tokens
user = self.get_user(payload["sub"])
return self.create_tokens(user.id, user.roles)
except jwt.InvalidTokenError:
raise AuthError("Invalid refresh token")
OAuth 2.0 / OpenID Connect
Copy
┌─────────────────────────────────────────────────────────────────┐
│ OAuth 2.0 Authorization Code Flow │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────────┐ ┌──────────────┐ │
│ │ User │ │ Client │ │ Auth Server │ │
│ │ Browser │ │ (App) │ │ (Google/etc) │ │
│ └────┬────┘ └──────┬──────┘ └──────┬───────┘ │
│ │ │ │ │
│ │ 1. Click "Login │ │ │
│ │ with Google" │ │ │
│ │────────────────────>│ │ │
│ │ │ │ │
│ │ 2. Redirect to auth │ │ │
│ │<────────────────────│ │ │
│ │ │ │ │
│ │ 3. Login & Consent │ │ │
│ │─────────────────────────────────────────────> │
│ │ │ │ │
│ │ 4. Redirect with authorization code │ │
│ │<───────────────────────────────────────────── │
│ │ ?code=abc123&state=xyz │ │
│ │ │ │ │
│ │────────────────────>│ │ │
│ │ │ │ │
│ │ │ 5. Exchange code │ │
│ │ │ for tokens │ │
│ │ │──────────────────────>│ │
│ │ │ │ │
│ │ │ 6. Access + ID Token │ │
│ │ │<──────────────────────│ │
│ │ │ │ │
│ │ 7. Logged in! │ │ │
│ │<────────────────────│ │ │
│ │ │ │ │
└─────────────────────────────────────────────────────────────────┘
Key Points:
• Authorization code is short-lived, exchanged server-to-server
• Access token never exposed to browser (secure)
• ID Token contains user info (OpenID Connect extension)
• State parameter prevents CSRF attacks
Role-Based Access Control (RBAC)
- Python
- JavaScript
Copy
from enum import Enum
from functools import wraps
class Permission(Enum):
# Resource: Action
ORDERS_READ = "orders:read"
ORDERS_CREATE = "orders:create"
ORDERS_UPDATE = "orders:update"
ORDERS_DELETE = "orders:delete"
USERS_READ = "users:read"
USERS_CREATE = "users:create"
USERS_UPDATE = "users:update"
USERS_DELETE = "users:delete"
ADMIN_PANEL = "admin:access"
# Role definitions
ROLES = {
"viewer": {
Permission.ORDERS_READ,
Permission.USERS_READ
},
"editor": {
Permission.ORDERS_READ,
Permission.ORDERS_CREATE,
Permission.ORDERS_UPDATE,
Permission.USERS_READ
},
"admin": {
Permission.ORDERS_READ,
Permission.ORDERS_CREATE,
Permission.ORDERS_UPDATE,
Permission.ORDERS_DELETE,
Permission.USERS_READ,
Permission.USERS_CREATE,
Permission.USERS_UPDATE,
Permission.USERS_DELETE,
Permission.ADMIN_PANEL
}
}
def has_permission(user, permission: Permission) -> bool:
"""Check if user has specific permission"""
user_permissions = set()
for role in user.roles:
user_permissions.update(ROLES.get(role, set()))
return permission in user_permissions
def require_permission(permission: Permission):
"""Decorator for authorization"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
request = kwargs.get('request') or args[0]
user = request.state.user
if not has_permission(user, permission):
raise HTTPException(
status_code=403,
detail=f"Missing permission: {permission.value}"
)
return await func(*args, **kwargs)
return wrapper
return decorator
# Usage
@app.delete("/orders/{order_id}")
@require_permission(Permission.ORDERS_DELETE)
async def delete_order(request: Request, order_id: str):
# Only users with 'orders:delete' permission can access
await order_service.delete(order_id)
return {"status": "deleted"}
Copy
const crypto = require('crypto');
const jwt = require('jsonwebtoken');
// ============== Permission System ==============
const Permission = {
ORDERS_READ: 'orders:read',
ORDERS_CREATE: 'orders:create',
ORDERS_UPDATE: 'orders:update',
ORDERS_DELETE: 'orders:delete',
USERS_READ: 'users:read',
USERS_CREATE: 'users:create',
USERS_UPDATE: 'users:update',
USERS_DELETE: 'users:delete',
ADMIN_PANEL: 'admin:access'
};
const ROLES = {
viewer: new Set([
Permission.ORDERS_READ,
Permission.USERS_READ
]),
editor: new Set([
Permission.ORDERS_READ,
Permission.ORDERS_CREATE,
Permission.ORDERS_UPDATE,
Permission.USERS_READ
]),
admin: new Set([
Permission.ORDERS_READ,
Permission.ORDERS_CREATE,
Permission.ORDERS_UPDATE,
Permission.ORDERS_DELETE,
Permission.USERS_READ,
Permission.USERS_CREATE,
Permission.USERS_UPDATE,
Permission.USERS_DELETE,
Permission.ADMIN_PANEL
])
};
function hasPermission(user, permission) {
const userPermissions = new Set();
for (const role of user.roles || []) {
const rolePermissions = ROLES[role] || new Set();
for (const perm of rolePermissions) {
userPermissions.add(perm);
}
}
return userPermissions.has(permission);
}
function requirePermission(permission) {
return (req, res, next) => {
const user = req.user;
if (!user) {
return res.status(401).json({ error: 'Unauthorized' });
}
if (!hasPermission(user, permission)) {
return res.status(403).json({
error: `Missing permission: ${permission}`
});
}
next();
};
}
// ============== Token Service ==============
class TokenService {
constructor(privateKey, publicKey) {
this.privateKey = privateKey;
this.publicKey = publicKey;
this.accessTokenTtl = 15 * 60; // 15 minutes
this.refreshTokenTtl = 7 * 24 * 60 * 60; // 7 days
}
createTokens(userId, roles) {
const now = Math.floor(Date.now() / 1000);
// Access token
const accessPayload = {
sub: userId,
roles,
type: 'access',
iat: now,
exp: now + this.accessTokenTtl,
iss: 'auth.example.com'
};
const accessToken = jwt.sign(accessPayload, this.privateKey, {
algorithm: 'RS256'
});
// Refresh token
const jti = crypto.randomUUID();
const refreshPayload = {
sub: userId,
type: 'refresh',
jti,
iat: now,
exp: now + this.refreshTokenTtl
};
const refreshToken = jwt.sign(refreshPayload, this.privateKey, {
algorithm: 'RS256'
});
return {
accessToken,
refreshToken,
expiresIn: this.accessTokenTtl,
jti
};
}
verifyAccessToken(token) {
try {
const payload = jwt.verify(token, this.publicKey, {
algorithms: ['RS256']
});
if (payload.type !== 'access') {
throw new Error('Invalid token type');
}
return payload;
} catch (error) {
throw new AuthError(error.message);
}
}
verifyRefreshToken(token) {
try {
return jwt.verify(token, this.publicKey, {
algorithms: ['RS256']
});
} catch (error) {
throw new AuthError('Invalid refresh token');
}
}
}
// ============== Password Hashing ==============
const bcrypt = require('bcrypt');
class PasswordService {
constructor(saltRounds = 12) {
this.saltRounds = saltRounds;
}
async hash(password) {
return bcrypt.hash(password, this.saltRounds);
}
async verify(password, hash) {
return bcrypt.compare(password, hash);
}
validateStrength(password) {
const checks = {
length: password.length >= 8,
uppercase: /[A-Z]/.test(password),
lowercase: /[a-z]/.test(password),
number: /[0-9]/.test(password),
special: /[!@#$%^&*(),.?":{}|<>]/.test(password)
};
const passed = Object.values(checks).filter(Boolean).length;
return {
valid: passed >= 4,
score: passed,
checks
};
}
}
// ============== Express Middleware ==============
function authMiddleware(tokenService, redis) {
return async (req, res, next) => {
const authHeader = req.headers.authorization;
if (!authHeader || !authHeader.startsWith('Bearer ')) {
return res.status(401).json({ error: 'Missing token' });
}
const token = authHeader.substring(7);
try {
// Check if token is blacklisted
const isBlacklisted = await redis.exists(`blacklist:${token}`);
if (isBlacklisted) {
return res.status(401).json({ error: 'Token revoked' });
}
const payload = tokenService.verifyAccessToken(token);
req.user = {
id: payload.sub,
roles: payload.roles
};
next();
} catch (error) {
return res.status(401).json({ error: error.message });
}
};
}
// Usage
const app = require('express')();
const tokenService = new TokenService(privateKey, publicKey);
app.use('/api', authMiddleware(tokenService, redis));
app.delete('/api/orders/:id',
requirePermission(Permission.ORDERS_DELETE),
async (req, res) => {
await orderService.delete(req.params.id);
res.json({ status: 'deleted' });
}
);
module.exports = {
Permission,
ROLES,
hasPermission,
requirePermission,
TokenService,
PasswordService,
authMiddleware
};
API Security
API Key vs OAuth
Copy
┌─────────────────────────────────────────────────────────────────┐
│ API Key vs OAuth │
├─────────────────────────────────────────────────────────────────┤
│ │
│ API KEY OAUTH TOKEN │
│ ─────── ─────────── │
│ Simple, long-lived Complex, short-lived │
│ Server-to-server User delegation │
│ All-or-nothing access Scoped permissions │
│ Easy to leak Rotates automatically │
│ │
│ USE API KEY FOR: USE OAUTH FOR: │
│ • Server-to-server APIs • User-facing apps │
│ • Public APIs with rate limits • Third-party integrations │
│ • Simple integrations • Fine-grained permissions │
│ │
│ BEST PRACTICES: │
│ • Prefix keys: sk_live_xxx (Stripe style) │
│ • Hash before storing │
│ • Rate limit per key │
│ • Log usage for audit │
│ │
└─────────────────────────────────────────────────────────────────┘
Rate Limiting
Copy
import time
from typing import Tuple
class RateLimiter:
"""
Sliding window rate limiter
"""
def __init__(self, redis_client, requests_per_minute: int = 60):
self.redis = redis_client
self.limit = requests_per_minute
self.window = 60 # seconds
def is_allowed(self, key: str) -> Tuple[bool, dict]:
"""
Check if request is allowed
Returns (allowed, rate_limit_info)
"""
now = time.time()
window_start = now - self.window
pipe = self.redis.pipeline()
# Remove old entries
pipe.zremrangebyscore(key, 0, window_start)
# Count current requests
pipe.zcard(key)
# Add current request
pipe.zadd(key, {str(now): now})
# Set expiry
pipe.expire(key, self.window)
results = pipe.execute()
request_count = results[1]
allowed = request_count < self.limit
return allowed, {
"X-RateLimit-Limit": str(self.limit),
"X-RateLimit-Remaining": str(max(0, self.limit - request_count - 1)),
"X-RateLimit-Reset": str(int(now + self.window))
}
# Middleware
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
# Use API key or IP as identifier
identifier = request.headers.get("X-API-Key") or request.client.host
key = f"rate_limit:{identifier}"
allowed, headers = rate_limiter.is_allowed(key)
if not allowed:
return JSONResponse(
status_code=429,
content={"error": "Rate limit exceeded"},
headers=headers
)
response = await call_next(request)
# Add rate limit headers
for header, value in headers.items():
response.headers[header] = value
return response
Common Security Vulnerabilities
Copy
┌─────────────────────────────────────────────────────────────────┐
│ OWASP Top 10 Mitigations │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. INJECTION (SQL, NoSQL, Command) │
│ + Use parameterized queries / ORMs │
│ + Validate and sanitize input │
│ - Never concatenate user input into queries │
│ │
│ 2. BROKEN AUTHENTICATION │
│ + Use strong password hashing (bcrypt, Argon2) │
│ + Implement MFA │
│ + Rate limit login attempts │
│ + Secure session management │
│ │
│ 3. SENSITIVE DATA EXPOSURE │
│ + Encrypt data at rest (AES-256) │
│ + Encrypt data in transit (TLS 1.3) │
│ + Don't log sensitive data │
│ + Hash + salt passwords │
│ │
│ 4. XML EXTERNAL ENTITIES (XXE) │
│ + Disable external entity processing │
│ + Use JSON instead of XML │
│ │
│ 5. BROKEN ACCESS CONTROL │
│ + Deny by default │
│ + Server-side authorization │
│ + Check ownership (can user X access resource Y?) │
│ │
│ 6. SECURITY MISCONFIGURATION │
│ + Remove default credentials │
│ + Disable unnecessary features │
│ + Keep software updated │
│ + Secure headers (CSP, HSTS, etc.) │
│ │
│ 7. CROSS-SITE SCRIPTING (XSS) │
│ + Escape output (HTML encode) │
│ + Content Security Policy header │
│ + HttpOnly cookies │
│ │
│ 8. INSECURE DESERIALIZATION │
│ + Don't deserialize untrusted data │
│ + Use signed serialized objects │
│ │
│ 9. USING COMPONENTS WITH KNOWN VULNERABILITIES │
│ + Keep dependencies updated │
│ + Scan with tools (Snyk, Dependabot) │
│ │
│ 10. INSUFFICIENT LOGGING & MONITORING │
│ + Log security events │
│ + Alert on suspicious activity │
│ + Audit trail for sensitive operations │
│ │
└─────────────────────────────────────────────────────────────────┘
Secure Communication
mTLS (Mutual TLS)
Copy
┌─────────────────────────────────────────────────────────────────┐
│ mTLS for Service-to-Service │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Regular TLS: Server proves identity to client │
│ ┌─────────┐ ┌─────────┐ │
│ │ Client │ ─── HTTPS ────────│ Server │ │
│ │ │ Server cert ✓ │ │ │
│ └─────────┘ └─────────┘ │
│ │
│ mTLS: Both prove identity │
│ ┌─────────┐ ┌─────────┐ │
│ │ Service │ ←── Server cert ──│ Service │ │
│ │ A │ ─── Client cert ──│ B │ │
│ └─────────┘ Both verified └─────────┘ │
│ │
│ USE CASE: Microservices in zero-trust network │
│ • Each service has its own certificate │
│ • Only services with valid certs can communicate │
│ • Handled by service mesh (Istio, Linkerd) │
│ │
│ CERTIFICATE MANAGEMENT: │
│ • Short-lived certs (hours, not years) │
│ • Automatic rotation │
│ • Service mesh handles transparently │
│ │
└─────────────────────────────────────────────────────────────────┘
Secrets Management
Copy
# DON'T: Hardcode secrets
API_KEY = "sk_live_abc123" # Never do this
# DON'T: Environment variables for production
import os
API_KEY = os.environ["API_KEY"] # OK for dev, not production
# DO: Use secrets manager
from aws_secrets_manager import get_secret
class SecretsManager:
"""
Centralized secrets management with caching
"""
def __init__(self):
self.cache = {}
self.cache_ttl = 300 # 5 minutes
def get_secret(self, secret_name: str) -> str:
"""
Retrieve secret from AWS Secrets Manager
with local caching
"""
cached = self.cache.get(secret_name)
if cached and cached['expires'] > time.time():
return cached['value']
# Fetch from secrets manager
client = boto3.client('secretsmanager')
response = client.get_secret_value(SecretId=secret_name)
value = response['SecretString']
# Cache locally
self.cache[secret_name] = {
'value': value,
'expires': time.time() + self.cache_ttl
}
return value
# Best practices:
# 1. Never log secrets
# 2. Rotate regularly
# 3. Least privilege access
# 4. Audit access to secrets
# 5. Use managed solutions (AWS Secrets Manager, Vault, etc.)
Senior Interview Questions
How would you design authentication for a multi-tenant SaaS?
How would you design authentication for a multi-tenant SaaS?
Key considerations:Implementation:
- Isolation: Tenant data must be completely isolated
- SSO: Enterprise customers want their own IdP
- Roles: Per-tenant roles (admin of tenant A ≠ admin of tenant B)
Copy
JWT Payload:
{
"sub": "user_123",
"tenant_id": "tenant_abc",
"roles": ["admin"], // Roles within this tenant
"org_ids": ["org_1", "org_2"] // Sub-organizations
}
- Every DB query includes tenant_id
- Middleware validates tenant_id matches token
- Support SAML/OIDC for enterprise SSO
- Separate encryption keys per tenant (optional)
How do you handle secret rotation without downtime?
How do you handle secret rotation without downtime?
Strategy: Dual-read, Single-write
- Add new secret (old still works)
- Deploy services to accept both old and new
- Switch write to new secret
- Verify all traffic uses new secret
- Revoke old secret
Copy
def validate_api_key(key: str) -> bool:
# Check against both old and new
current_key = secrets.get("api_key_current")
previous_key = secrets.get("api_key_previous")
return key == current_key or key == previous_key
How would you secure inter-service communication?
How would you secure inter-service communication?
Layers of security:
- Network: VPC/private network (services not internet-accessible)
- Transport: mTLS (mutual authentication)
- Application: JWT tokens with service identity
- Authorization: Service-level permissions
- Automatic mTLS between all services
- Policy-based access control
- No code changes needed
- Traffic encryption by default
Design a rate limiter that's fair and abuse-resistant
Design a rate limiter that's fair and abuse-resistant
Multi-layer approach:
- Per-IP: Prevent anonymous abuse
- Per-User: Fair usage for authenticated users
- Per-API-Key: For API consumers
- Global: Protect overall system
- Token bucket: Allows bursts, smooths traffic
- Sliding window: More accurate than fixed windows
- Weighted limits: Premium tiers get more
- Priority queues: Critical requests bypass limits
- Fingerprinting (detect rotating IPs)
- Behavioral analysis (bot detection)
- CAPTCHA after threshold
- Progressive delays (slow down, don’t block)