Services Module

The services module provides cross-cutting functionality for authentication and authorization: M2M token detection, rate limiting, audit logging, and metrics collection.

M2M Token Detection

The M2M (Machine-to-Machine) Token Detector distinguishes service account tokens from human user tokens.

Overview

M2M authentication is commonly used for:

  • Service-to-service communication

  • API integrations

  • Background jobs and automated processes

  • Microservice authentication

The detector analyzes JWT claims to determine whether a token represents a service account or a human user.

M2MTokenDetector Class

class auth_middleware.services.M2MTokenDetector

Static utility class for detecting M2M tokens.

static is_m2m_token(claims: dict) -> bool

Determines whether a JWT represents M2M authentication.

Detection Criteria:

  • Missing cognito:username claim

  • token_use is “access” (not “id”)

  • Has client_id claim

  • Missing typical user claims (email, name, etc.)

Parameters:

claims – JWT token claims dictionary

Returns:

True if token is M2M, False otherwise

Example:

from auth_middleware.services import M2MTokenDetector

claims = {
    "token_use": "access",
    "client_id": "service-account-123",
    "scope": "api/read api/write"
}

is_m2m = M2MTokenDetector.is_m2m_token(claims)
# Returns: True

static get_client_id(claims: dict) -> str | None

Extracts client ID from token claims.

Parameters:

claims – JWT token claims dictionary

Returns:

Client ID or None if not found

Example:

client_id = M2MTokenDetector.get_client_id(claims)
# Returns: "service-account-123"

static get_token_metadata(claims: dict) -> dict

Extracts comprehensive metadata from token.

Returns:

Dictionary with token_use, client_id, scopes, and user context

Example:

metadata = M2MTokenDetector.get_token_metadata(claims)
# Returns: {
#     "token_use": "access",
#     "client_id": "service-account-123",
#     "scopes": ["api/read", "api/write"],
#     "has_user_context": False
# }

static requires_user_context(claims: dict) -> bool

Checks if token has user context (for user operations).

Parameters:

claims – JWT token claims dictionary

Returns:

True if user context is present
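To make the detection criteria concrete, here is a minimal sketch of how such a check could be written. It is not the library's implementation: the helper names looks_like_m2m and parse_scopes and the USER_CLAIMS tuple are hypothetical, and the claim names simply follow the examples above.

```python
# Hypothetical sketch of the documented criteria; not the library's actual code.

# Assumed set of "typical user claims"; the real list may differ.
USER_CLAIMS = ("cognito:username", "email", "name")


def looks_like_m2m(claims: dict) -> bool:
    """Apply the documented criteria to a decoded claims dictionary."""
    if claims.get("token_use") != "access":
        return False  # only access tokens qualify; "id" tokens do not
    if not claims.get("client_id"):
        return False  # an M2M token must carry a client_id claim
    # Any user-identifying claim means a person is behind the token.
    return not any(name in claims for name in USER_CLAIMS)


def parse_scopes(claims: dict) -> list[str]:
    """Split the space-delimited scope claim into a list."""
    return claims.get("scope", "").split()


service_claims = {
    "token_use": "access",
    "client_id": "service-account-123",
    "scope": "api/read api/write",
}
user_claims = {**service_claims, "cognito:username": "alice"}

print(looks_like_m2m(service_claims))  # True
print(looks_like_m2m(user_claims))     # False
print(parse_scopes(service_claims))    # ['api/read', 'api/write']
```

Checking token_use and client_id first keeps the function cheap for the common case of ordinary user tokens.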

Usage with User Model

The User model includes M2M detection fields:

from fastapi import FastAPI, Depends, Request, HTTPException
from auth_middleware.functions import require_user

app = FastAPI()

@app.get("/data", dependencies=[Depends(require_user())])
async def get_data(request: Request):
    user = request.state.current_user

    if user.is_m2m:
        # Service account access
        return {
            "message": "Service account authenticated",
            "client_id": user.client_id,
            "service_id": user.id
        }
    else:
        # Human user access
        return {
            "message": f"Hello {user.email}",
            "user_id": user.id
        }

@app.get("/human-only", dependencies=[Depends(require_user())])
async def human_only(request: Request):
    user = request.state.current_user

    if user.is_m2m:
        raise HTTPException(
            status_code=403,
            detail="This endpoint requires human user authentication"
        )

    return {"message": "Human user access granted"}

Rate Limiting

Built-in rate limiting helps protect endpoints from abuse and excessive usage.

RateLimiter Class

class auth_middleware.services.RateLimiter

Sliding window rate limiter with async support.

__init__(max_requests: int, window_seconds: int)

Initialize rate limiter.

Parameters:
  • max_requests – Maximum requests allowed in window

  • window_seconds – Time window in seconds

Example:

from auth_middleware.services import RateLimiter

# 100 requests per minute
limiter = RateLimiter(max_requests=100, window_seconds=60)

async is_allowed(identifier: str) -> bool

Check if request is allowed for identifier.

Parameters:

identifier – Unique identifier (user ID, IP, etc.)

Returns:

True if request is allowed

Example:

allowed = await limiter.is_allowed("user-123")
if not allowed:
    # Rate limit exceeded
    raise HTTPException(status_code=429)

async get_remaining(identifier: str) -> int

Get remaining requests allowed in current window.

Parameters:

identifier – Unique identifier

Returns:

Number of remaining requests

async reset(identifier: str)

Reset rate limit for specific identifier.

Parameters:

identifier – Unique identifier to reset

async clear_all()

Clear all rate limit data.
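The class is described as a sliding window limiter. As an illustration of that technique, and not of the library's internals, a minimal in-memory version might look like the following. SlidingWindowSketch and its clock parameter are hypothetical names introduced here; the injectable clock only exists to make the behavior easy to demonstrate.

```python
import asyncio
import time
from collections import defaultdict, deque


class SlidingWindowSketch:
    """Illustrative sliding-window-log limiter; not the library's RateLimiter."""

    def __init__(self, max_requests: int, window_seconds: float, clock=time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._clock = clock
        self._hits = defaultdict(deque)  # identifier -> timestamps of accepted requests

    def _prune(self, identifier: str, now: float) -> deque:
        """Drop timestamps that have slid out of the window."""
        hits = self._hits[identifier]
        cutoff = now - self.window_seconds
        while hits and hits[0] <= cutoff:
            hits.popleft()
        return hits

    async def is_allowed(self, identifier: str) -> bool:
        # No await occurs between the check and the append, so concurrent
        # tasks on one event loop cannot interleave here.
        now = self._clock()
        hits = self._prune(identifier, now)
        if len(hits) >= self.max_requests:
            return False
        hits.append(now)
        return True

    async def get_remaining(self, identifier: str) -> int:
        hits = self._prune(identifier, self._clock())
        return max(0, self.max_requests - len(hits))


async def demo() -> list:
    now = [0.0]  # fake clock so the example is deterministic
    limiter = SlidingWindowSketch(2, 10, clock=lambda: now[0])
    results = [await limiter.is_allowed("user-123") for _ in range(3)]
    now[0] = 10.0  # the first two requests have now left the window
    results.append(await limiter.is_allowed("user-123"))
    return results


print(asyncio.run(demo()))  # [True, True, False, True]
```

State kept this way lives in a single process, so a deployment with several workers would need a shared store to enforce one global limit.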

rate_limit Decorator

auth_middleware.services.rate_limit(max_requests: int, window_seconds: int, identifier: Callable = None)

Decorator for endpoint rate limiting.

Parameters:
  • max_requests – Maximum requests allowed

  • window_seconds – Time window in seconds

  • identifier – Optional function to extract identifier from request

Example:

from fastapi import FastAPI, Request
from auth_middleware.services import rate_limit

app = FastAPI()

@app.get("/api/data")
@rate_limit(max_requests=10, window_seconds=60)
async def get_data():
    return {"data": "value"}

# Custom identifier
@app.get("/api/user-data")
@rate_limit(
    max_requests=50,
    window_seconds=3600,
    identifier=lambda req: req.state.current_user.id
)
async def get_user_data(request: Request):
    return {"data": "user specific"}

Manual Rate Limiting

For fine-grained control:

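from fastapi import FastAPI, HTTPException, Request, Response
from auth_middleware.services import RateLimiter

app = FastAPI()
api_limiter = RateLimiter(max_requests=100, window_seconds=60)

@app.get("/api/custom")
async def custom_endpoint(request: Request, response: Response):
    # request.client can be None, so fall back to a fixed key
    client_id = request.client.host if request.client else "unknown"

    if not await api_limiter.is_allowed(client_id):
        remaining = await api_limiter.get_remaining(client_id)
        # Headers set on the injected Response are discarded when an
        # exception is raised, so pass them to HTTPException instead
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded",
            headers={"X-RateLimit-Remaining": str(remaining)},
        )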

    # Add rate limit headers
    remaining = await api_limiter.get_remaining(client_id)
    response.headers["X-RateLimit-Limit"] = "100"
    response.headers["X-RateLimit-Remaining"] = str(remaining)

    return {"data": "success"}

Audit Logging

Comprehensive audit logging for security and compliance requirements.

AuditEvent Class

class auth_middleware.services.AuditEvent

Represents a security audit event.

timestamp: datetime

Event timestamp (UTC)

event_type: str

Event type (auth_success, auth_failure, access_denied, etc.)

user_id: str | None

User or service account ID

client_id: str | None

Client ID for M2M tokens

is_m2m: bool

Whether this is M2M authentication

path: str | None

Request path

method: str | None

HTTP method

status_code: int | None

Response status code

ip_address: str | None

Client IP address

user_agent: str | None

Client user agent

metadata: dict

Additional event-specific data

to_dict() -> dict

Convert event to dictionary for logging.

to_json() -> str

Convert event to JSON string.
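For readers who want a mental model of the event object, the attributes above map naturally onto a dataclass. The sketch below is an assumption about the shape, not the library's definition: the class name AuditEventSketch, the default values, and the ISO 8601 timestamp encoding are all choices made for this illustration.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Optional


@dataclass
class AuditEventSketch:
    """Illustrative stand-in that mirrors the documented attributes."""

    event_type: str
    user_id: Optional[str] = None
    client_id: Optional[str] = None
    is_m2m: bool = False
    path: Optional[str] = None
    method: Optional[str] = None
    status_code: Optional[int] = None
    ip_address: Optional[str] = None
    user_agent: Optional[str] = None
    metadata: dict = field(default_factory=dict)
    # Timezone-aware UTC timestamp, captured when the event is created.
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def to_dict(self) -> dict:
        data = asdict(self)
        # datetime is not JSON serializable, so store it as an ISO 8601 string.
        data["timestamp"] = self.timestamp.isoformat()
        return data

    def to_json(self) -> str:
        return json.dumps(self.to_dict())


event = AuditEventSketch(
    event_type="access_denied",
    user_id="user-123",
    path="/admin",
    metadata={"reason": "missing scope"},
)
print(event.to_json())
```

Keeping event-specific details inside metadata leaves the top-level fields stable, which makes the JSON easier to index in a log store.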

AuditLogger Class

class auth_middleware.services.AuditLogger

Service for logging audit events.

__init__(log_to_console: bool = True, log_callback: Callable = None)

Initialize audit logger.

Parameters:
  • log_to_console – Whether to log to console

  • log_callback – Optional callback function for custom logging

Example:

from auth_middleware.services import AuditLogger

def send_to_cloudwatch(event):
    # Send to external logging service
    pass

audit = AuditLogger(
    log_to_console=True,
    log_callback=send_to_cloudwatch
)

log(event: AuditEvent)

Log an audit event.

log_auth_success(user_id: str, is_m2m: bool = False, **kwargs)

Log successful authentication.

log_auth_failure(reason: str, **kwargs)

Log failed authentication.

log_access_denied(user_id: str, path: str, reason: str, **kwargs)

Log access denial.
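As one possible log_callback, the sketch below appends each event to a JSON Lines file. It assumes the callback is invoked with the event object and that the object exposes to_json(), as AuditEvent is documented to do; make_jsonl_callback is a hypothetical helper, not part of the library.

```python
from pathlib import Path


def make_jsonl_callback(path):
    """Return a callback that appends one JSON document per line."""
    log_path = Path(path)

    def callback(event) -> None:
        # Assumption: the logger passes an object with a to_json() method.
        with log_path.open("a", encoding="utf-8") as handle:
            handle.write(event.to_json() + "\n")

    return callback


# Hypothetical wiring, shown as a comment because it needs the library:
# audit = AuditLogger(log_to_console=False,
#                     log_callback=make_jsonl_callback("audit.jsonl"))
```

Opening the file in append mode for every event is simple but slow under load; a production callback would more likely hand events to a queue or a logging handler.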

AuditMiddleware Class

class auth_middleware.services.AuditMiddleware

Middleware for automatic request auditing.

from fastapi import FastAPI
from auth_middleware.services import AuditMiddleware, AuditLogger

app = FastAPI()

# Add audit middleware
app.add_middleware(
    AuditMiddleware,
    enabled=True,
    audit_logger=AuditLogger(),
    exclude_paths=["/health", "/metrics"]
)

Manual Audit Logging

For specific operations:

from auth_middleware.services import AuditLogger, AuditEvent

audit_logger = AuditLogger()

@app.post("/sensitive-operation")
async def sensitive_op(request: Request, data: dict):
    user = request.state.current_user

    # Log operation start
    event = AuditEvent(
        event_type="sensitive_operation",
        user_id=user.id,
        is_m2m=user.is_m2m,
        path="/sensitive-operation",
        # Event-specific details belong in the documented metadata field
        metadata={
            "operation": "data_modification",
            "data_type": data.get("type"),
        },
    )
    audit_logger.log(event)

    # Perform operation
    result = process_data(data)

    # Log success
    audit_logger.log_auth_success(
        user_id=user.id,
        operation="data_modification",
        result="success"
    )

    return result

Metrics Collection

Monitor authentication performance and track success rates.

MetricsCollector Class

class auth_middleware.services.MetricsCollector

Collector for authentication metrics.

__init__()

Initialize metrics collector.

Example:

from auth_middleware.services import MetricsCollector

metrics = MetricsCollector()

async record_validation_success(duration_ms: float)

Record successful token validation.

Parameters:

duration_ms – Validation duration in milliseconds

async record_validation_failure(error_type: str, duration_ms: float)

Record failed validation.

Parameters:
  • error_type – Type/category of error

  • duration_ms – Validation duration in milliseconds

async get_metrics() -> dict

Get current metrics snapshot.

Returns:

{
    "uptime_seconds": 3600.5,
    "tokens_validated": 10000,
    "tokens_failed": 150,
    "total_tokens": 10150,
    "success_rate": 98.52,
    "validation_time_avg_ms": 25.3,
    "validation_time_p95_ms": 45.7,
    "validation_time_p99_ms": 89.2,
    "errors_by_type": {
        "expired_token": 100,
        "invalid_signature": 50
    }
}

async reset()

Reset all metrics to initial state.
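The derived fields in the snapshot can be reproduced from raw counts and durations. The sketch below shows one way to do so; the nearest-rank percentile method and the two-decimal rounding are assumptions, since the source does not say how the collector computes them, and both function names are hypothetical.

```python
import math


def success_rate(validated: int, failed: int) -> float:
    """Percentage of successful validations, rounded to two decimals."""
    total = validated + failed
    return round(100.0 * validated / total, 2) if total else 0.0


def percentile(samples: list, pct: int) -> float:
    """Nearest-rank percentile: the smallest sample with at least pct% at or below it."""
    if not samples:
        return 0.0
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


durations_ms = [float(n) for n in range(1, 101)]  # 1.0 .. 100.0

print(success_rate(10000, 150))      # 98.52
print(percentile(durations_ms, 95))  # 95.0
print(percentile(durations_ms, 99))  # 99.0
```

With the counts from the sample snapshot (10000 validated, 150 failed), the function yields the same 98.52 shown there.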

Usage Example

from fastapi import FastAPI, Request
from auth_middleware.services import MetricsCollector
import time

app = FastAPI()
metrics = MetricsCollector()

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    # perf_counter is monotonic, so it is safer than time.time() for durations
    start = time.perf_counter()

    try:
        response = await call_next(request)
        duration_ms = (time.perf_counter() - start) * 1000

        if hasattr(request.state, "current_user"):
            await metrics.record_validation_success(duration_ms)
        elif response.status_code == 401:
            await metrics.record_validation_failure("unauthorized", duration_ms)

        return response
    except Exception as e:
        duration_ms = (time.perf_counter() - start) * 1000
        await metrics.record_validation_failure(type(e).__name__, duration_ms)
        raise

@app.get("/metrics")
async def get_metrics():
    return await metrics.get_metrics()

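from fastapi.responses import PlainTextResponse

# Without response_class, FastAPI would JSON-encode the returned string
@app.get("/metrics/prometheus", response_class=PlainTextResponse)
async def prometheus_metrics():
    snapshot = await metrics.get_metrics()

    # Build the text line by line to avoid the stray indentation that an
    # indented triple-quoted string would carry into the output
    lines = [
        "# HELP auth_tokens_validated_total Total validated tokens",
        "# TYPE auth_tokens_validated_total counter",
        f"auth_tokens_validated_total {snapshot['tokens_validated']}",
        "# HELP auth_success_rate Authentication success rate",
        "# TYPE auth_success_rate gauge",
        f"auth_success_rate {snapshot['success_rate']}",
        "# HELP auth_validation_duration_avg_ms Average validation time",
        "# TYPE auth_validation_duration_avg_ms gauge",
        f"auth_validation_duration_avg_ms {snapshot['validation_time_avg_ms']}",
    ]
    return "\n".join(lines) + "\n"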

Best Practices

M2M Token Detection

  1. Validate M2M tokens differently - Service accounts may not have email/name

  2. Use client_id for identification - Don’t rely on user_id for M2M

  3. Separate permissions - M2M tokens may need different scopes

  4. Block M2M from user endpoints - Verify is_m2m flag for user-only operations

Rate Limiting

  1. Choose appropriate limits - Balance security and user experience

  2. Use per-user limits - More accurate than IP-based

  3. Return rate limit headers - Help clients manage their requests

  4. Monitor limit hits - Adjust limits based on usage patterns
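Per-user limits depend on picking a stable identifier for each caller. The helper below is a hypothetical sketch of one selection order: service accounts are keyed by client_id, human users by user ID, and unauthenticated requests fall back to the IP address. The function name and key prefixes are illustrative only.

```python
from typing import Optional


def rate_limit_key(
    user_id: Optional[str] = None,
    client_id: Optional[str] = None,
    ip_address: Optional[str] = None,
    is_m2m: bool = False,
) -> str:
    """Choose the most specific identifier available for rate limiting."""
    if is_m2m and client_id:
        return f"client:{client_id}"  # service accounts: key by client_id
    if user_id:
        return f"user:{user_id}"      # human users: key by user ID
    if ip_address:
        return f"ip:{ip_address}"     # unauthenticated: fall back to IP
    return "anonymous"


print(rate_limit_key(user_id="user-123", ip_address="203.0.113.7"))  # user:user-123
print(rate_limit_key(client_id="service-account-123", is_m2m=True))  # client:service-account-123
print(rate_limit_key(ip_address="203.0.113.7"))                      # ip:203.0.113.7
```

The prefixes keep the three namespaces apart, so a user ID can never collide with an IP address or a client ID in the limiter's storage.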

Audit Logging

  1. Log security-relevant events - Authentication, authorization, sensitive operations

  2. Include context - User ID, IP, timestamp, action details

  3. Protect audit logs - Ensure logs cannot be tampered with

  4. Regular review - Monitor logs for suspicious activity

  5. Compliance - Ensure logging meets regulatory requirements

Metrics Collection

  1. Track key metrics - Success rate, latency, error types

  2. Set up alerts - Monitor for anomalies

  3. Export to monitoring systems - Prometheus, CloudWatch, Datadog

  4. Analyze trends - Identify performance issues early

  5. Capacity planning - Use metrics to predict growth

See Also

Example Code

Complete examples can be found in the repository:

  • examples/m2m_detection_example.py - M2M token detection

  • examples/rate_limiting_example.py - Rate limiting patterns

  • examples/audit_example.py - Audit logging integration

  • examples/metrics_example.py - Metrics collection