Services Module
The services module provides cross-cutting functionality for authentication and authorization including M2M token detection, rate limiting, audit logging, and metrics collection.
M2M Token Detection
The M2M (Machine-to-Machine) Token Detector helps identify service account tokens versus human user tokens.
Overview
M2M authentication is commonly used for:
Service-to-service communication
API integrations
Background jobs and automated processes
Microservice authentication
The detector analyzes JWT claims to determine if a token represents a service account or a human user.
M2MTokenDetector Class
- class auth_middleware.services.M2MTokenDetector
Static utility class for detecting M2M tokens.
- static is_m2m_token(claims: dict) bool
Determines if JWT token represents M2M authentication.
Detection Criteria:
Missing
cognito:usernameclaimtoken_useis “access” (not “id”)Has
client_idclaimMissing typical user claims (email, name, etc.)
- Parameters:
claims – JWT token claims dictionary
- Returns:
True if token is M2M, False otherwise
Example:
from auth_middleware.services import M2MTokenDetector claims = { "token_use": "access", "client_id": "service-account-123", "scope": "api/read api/write" } is_m2m = M2MTokenDetector.is_m2m_token(claims) # Returns: True
- static get_client_id(claims: dict) str | None
Extracts client ID from token claims.
- Parameters:
claims – JWT token claims dictionary
- Returns:
Client ID or None if not found
Example:
client_id = M2MTokenDetector.get_client_id(claims) # Returns: "service-account-123"
- static get_token_metadata(claims: dict) dict
Extracts comprehensive metadata from token.
- Returns:
Dictionary with token_use, client_id, scopes, and user context
Example:
metadata = M2MTokenDetector.get_token_metadata(claims) # Returns: { # "token_use": "access", # "client_id": "service-account-123", # "scopes": ["api/read", "api/write"], # "has_user_context": False # }
Usage with User Model
The User model includes M2M detection fields:
from fastapi import FastAPI, Depends, Request, HTTPException
from auth_middleware.functions import require_user
app = FastAPI()
@app.get("/data", dependencies=[Depends(require_user())])
async def get_data(request: Request):
user = request.state.current_user
if user.is_m2m:
# Service account access
return {
"message": "Service account authenticated",
"client_id": user.client_id,
"service_id": user.id
}
else:
# Human user access
return {
"message": f"Hello {user.email}",
"user_id": user.id
}
@app.get("/human-only", dependencies=[Depends(require_user())])
async def human_only(request: Request):
user = request.state.current_user
if user.is_m2m:
raise HTTPException(
status_code=403,
detail="This endpoint requires human user authentication"
)
return {"message": "Human user access granted"}
Rate Limiting
Built-in rate limiting helps protect endpoints from abuse and excessive usage.
RateLimiter Class
- class auth_middleware.services.RateLimiter
Sliding window rate limiter with async support.
- __init__(max_requests: int, window_seconds: int)
Initialize rate limiter.
- Parameters:
max_requests – Maximum requests allowed in window
window_seconds – Time window in seconds
Example:
from auth_middleware.services import RateLimiter # 100 requests per minute limiter = RateLimiter(max_requests=100, window_seconds=60)
- async is_allowed(identifier: str) -> bool
Check if request is allowed for identifier.
- Parameters:
identifier – Unique identifier (user ID, IP, etc.)
- Returns:
True if request is allowed
Example:
allowed = await limiter.is_allowed("user-123") if not allowed: # Rate limit exceeded raise HTTPException(status_code=429)
- async get_remaining(identifier: str) -> int
Get remaining requests allowed in current window.
- Parameters:
identifier – Unique identifier
- Returns:
Number of remaining requests
- async reset(identifier: str)
Reset rate limit for specific identifier.
- Parameters:
identifier – Unique identifier to reset
- async clear_all()
Clear all rate limit data.
rate_limit Decorator
- auth_middleware.services.rate_limit(max_requests: int, window_seconds: int, identifier: Callable = None)
Decorator for endpoint rate limiting.
- Parameters:
max_requests – Maximum requests allowed
window_seconds – Time window in seconds
identifier – Optional function to extract identifier from request
Example:
from auth_middleware.services import rate_limit @app.get("/api/data") @rate_limit(max_requests=10, window_seconds=60) async def get_data(): return {"data": "value"} # Custom identifier @app.get("/api/user-data") @rate_limit( max_requests=50, window_seconds=3600, identifier=lambda req: req.state.current_user.id ) async def get_user_data(request: Request): return {"data": "user specific"}
Manual Rate Limiting
For fine-grained control:
from auth_middleware.services import RateLimiter
from fastapi import Response
api_limiter = RateLimiter(max_requests=100, window_seconds=60)
@app.get("/api/custom")
async def custom_endpoint(request: Request, response: Response):
client_id = request.client.host
if not await api_limiter.is_allowed(client_id):
remaining = await api_limiter.get_remaining(client_id)
response.headers["X-RateLimit-Remaining"] = str(remaining)
raise HTTPException(status_code=429, detail="Rate limit exceeded")
# Add rate limit headers
remaining = await api_limiter.get_remaining(client_id)
response.headers["X-RateLimit-Limit"] = "100"
response.headers["X-RateLimit-Remaining"] = str(remaining)
return {"data": "success"}
Audit Logging
Comprehensive audit logging for security and compliance requirements.
AuditEvent Class
AuditLogger Class
- class auth_middleware.services.AuditLogger
Service for logging audit events.
- __init__(log_to_console: bool = True, log_callback: Callable = None)
Initialize audit logger.
- Parameters:
log_to_console – Whether to log to console
log_callback – Optional callback function for custom logging
Example:
from auth_middleware.services import AuditLogger def send_to_cloudwatch(event): # Send to external logging service pass audit = AuditLogger( log_to_console=True, log_callback=send_to_cloudwatch )
- log(event: AuditEvent)
Log an audit event.
AuditMiddleware Class
- class auth_middleware.services.AuditMiddleware
Middleware for automatic request auditing.
from fastapi import FastAPI from auth_middleware.services import AuditMiddleware, AuditLogger app = FastAPI() # Add audit middleware app.add_middleware( AuditMiddleware, enabled=True, audit_logger=AuditLogger(), exclude_paths=["/health", "/metrics"] )
Manual Audit Logging
For specific operations:
from auth_middleware.services import AuditLogger, AuditEvent
audit_logger = AuditLogger()
@app.post("/sensitive-operation")
async def sensitive_op(request: Request, data: dict):
user = request.state.current_user
# Log operation start
event = AuditEvent(
event_type="sensitive_operation",
user_id=user.id,
is_m2m=user.is_m2m,
path="/sensitive-operation",
operation="data_modification",
data_type=data.get("type")
)
audit_logger.log(event)
# Perform operation
result = process_data(data)
# Log success
audit_logger.log_auth_success(
user_id=user.id,
operation="data_modification",
result="success"
)
return result
Metrics Collection
Monitor authentication performance and track success rates.
MetricsCollector Class
- class auth_middleware.services.MetricsCollector
Collector for authentication metrics.
- __init__()
Initialize metrics collector.
Example:
from auth_middleware.services import MetricsCollector metrics = MetricsCollector()
- async record_validation_success(duration_ms: float)
Record successful token validation.
- Parameters:
duration_ms – Validation duration in milliseconds
- async record_validation_failure(error_type: str, duration_ms: float)
Record failed validation.
- Parameters:
error_type – Type/category of error
duration_ms – Validation duration in milliseconds
- async get_metrics() -> dict
Get current metrics snapshot.
Returns:
{ "uptime_seconds": 3600.5, "tokens_validated": 10000, "tokens_failed": 150, "total_tokens": 10150, "success_rate": 98.52, "validation_time_avg_ms": 25.3, "validation_time_p95_ms": 45.7, "validation_time_p99_ms": 89.2, "errors_by_type": { "expired_token": 100, "invalid_signature": 50 } }
- async reset()
Reset all metrics to initial state.
Usage Example
from fastapi import FastAPI, Request
from auth_middleware.services import MetricsCollector
import time
app = FastAPI()
metrics = MetricsCollector()
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
start = time.time()
try:
response = await call_next(request)
duration_ms = (time.time() - start) * 1000
if hasattr(request.state, "current_user"):
await metrics.record_validation_success(duration_ms)
elif response.status_code == 401:
await metrics.record_validation_failure("unauthorized", duration_ms)
return response
except Exception as e:
duration_ms = (time.time() - start) * 1000
await metrics.record_validation_failure(type(e).__name__, duration_ms)
raise
@app.get("/metrics")
async def get_metrics():
return await metrics.get_metrics()
@app.get("/metrics/prometheus")
async def prometheus_metrics():
snapshot = await metrics.get_metrics()
return f"""
# HELP auth_tokens_validated_total Total validated tokens
# TYPE auth_tokens_validated_total counter
auth_tokens_validated_total {snapshot['tokens_validated']}
# HELP auth_success_rate Authentication success rate
# TYPE auth_success_rate gauge
auth_success_rate {snapshot['success_rate']}
# HELP auth_validation_duration_avg_ms Average validation time
# TYPE auth_validation_duration_avg_ms gauge
auth_validation_duration_avg_ms {snapshot['validation_time_avg_ms']}
"""
Best Practices
M2M Token Detection
Validate M2M tokens differently - Service accounts may not have email/name
Use client_id for identification - Don’t rely on user_id for M2M
Separate permissions - M2M tokens may need different scopes
Block M2M from user endpoints - Verify
is_m2mflag for user-only operations
Rate Limiting
Choose appropriate limits - Balance security and user experience
Use per-user limits - More accurate than IP-based
Return rate limit headers - Help clients manage their requests
Monitor limit hits - Adjust limits based on usage patterns
Audit Logging
Log security-relevant events - Authentication, authorization, sensitive operations
Include context - User ID, IP, timestamp, action details
Protect audit logs - Ensure logs cannot be tampered with
Regular review - Monitor logs for suspicious activity
Compliance - Ensure logging meets regulatory requirements
Metrics Collection
Track key metrics - Success rate, latency, error types
Set up alerts - Monitor for anomalies
Export to monitoring systems - Prometheus, CloudWatch, Datadog
Analyze trends - Identify performance issues early
Capacity planning - Use metrics to predict growth
See Also
jwt_auth_middleware - Main middleware documentation
Authentication Functions - Authorization dependency functions
The User Property - User model lazy loading
JWKS Cache Strategies - JWKS caching strategies
Example Code
Complete examples can be found in the repository:
examples/m2m_detection_example.py- M2M token detectionexamples/rate_limiting_example.py- Rate limiting patternsexamples/audit_example.py- Audit logging integrationexamples/metrics_example.py- Metrics collection