Middleware Configuration

This section provides detailed information about configuring the Cache Middleware, including backend options, decorator parameters, and advanced settings.

Backend Configuration

Cache Backend Interface

All cache backends implement the CacheBackend interface:

from abc import ABC, abstractmethod
from typing import Optional

class CacheBackend(ABC):
    """Abstract base class for cache backends."""

    @abstractmethod
    async def get(self, key: str) -> Optional[str]:
        """Get a value from the cache."""
        pass

    @abstractmethod
    async def set(self, key: str, value: str, timeout: int) -> None:
        """Set a value in the cache with expiration."""
        pass

    @abstractmethod
    async def delete(self, key: str) -> None:
        """Delete a key from the cache."""
        pass

    @abstractmethod
    async def close(self) -> None:
        """Close backend connections."""
        pass

Redis/ValKey Backend Options

The RedisBackend supports extensive configuration and works with both Redis and ValKey:

from cache_middleware import RedisBackend

redis_backend = RedisBackend(
    url="redis://localhost:6379",           # Redis/ValKey connection URL (use 6380 for ValKey)
    max_connections=10,                     # Max connections in pool
    retry_on_timeout=True,                  # Retry on timeout
    socket_keepalive=True,                  # Enable TCP keepalive
    socket_keepalive_options={              # TCP keepalive settings
        1: 1,  # TCP_KEEPIDLE
        2: 3,  # TCP_KEEPINTVL
        3: 5,  # TCP_KEEPCNT
    },
    health_check_interval=30,               # Health check interval (seconds)
    password=None,                          # Redis/ValKey password
    db=0,                                   # Database number
    encoding='utf-8',                       # String encoding
    decode_responses=True,                  # Auto-decode responses
    socket_timeout=5.0,                     # Socket timeout
    socket_connect_timeout=5.0,             # Connection timeout
    connection_pool=None,                   # Custom connection pool
    ssl=False,                              # Enable SSL
    ssl_keyfile=None,                       # SSL key file
    ssl_certfile=None,                      # SSL certificate file
    ssl_cert_reqs='required',               # SSL certificate requirements
    ssl_ca_certs=None,                      # SSL CA certificates
    ssl_check_hostname=False,               # Verify hostname in SSL
    max_connections_per_pool=50,            # Max connections per pool
)

URL Format Examples:

# Basic Redis
"redis://localhost:6379"

# Basic ValKey
"redis://localhost:6380"

# Redis/ValKey with password
"redis://:password@localhost:6379"

# Redis/ValKey with username and password
"redis://username:password@localhost:6379"

# Redis/ValKey with specific database
"redis://localhost:6379/1"

# Redis/ValKey with SSL
"rediss://localhost:6380"

# Redis Sentinel (works with ValKey too)
"redis+sentinel://sentinel-host:26379/mymaster"

Production Redis/ValKey Configuration:

redis_backend = RedisBackend(
    url="redis://prod-redis-01:6379",
    max_connections=50,
    retry_on_timeout=True,
    socket_keepalive=True,
    health_check_interval=30,
    socket_timeout=10.0,
    socket_connect_timeout=10.0,
    # Enable SSL for production
    ssl=True,
    ssl_cert_reqs='required',
    ssl_ca_certs='/etc/ssl/certs/redis-ca.pem'
)

Memory Backend Options

The MemoryBackend has simpler configuration:

from cache_middleware import MemoryBackend

memory_backend = MemoryBackend(
    max_size=1000,          # Maximum number of cached items
    cleanup_interval=300,    # Cleanup expired items every 5 minutes
    default_timeout=3600,    # Default timeout for items (1 hour)
)

Memory Usage Considerations:

  • Each cached item stores both key and value in memory

  • Large response bodies consume significant memory

  • Consider using Redis for production or large datasets

  • Monitor memory usage in production environments

Custom Backend Configuration

For implementing custom backends:

from cache_middleware import CacheBackend

class CustomBackend(CacheBackend):
    def __init__(self, custom_param: str, timeout: int = 300):
        self.custom_param = custom_param
        self.default_timeout = timeout
        self.storage = {}

    async def get(self, key: str) -> Optional[str]:
        item = self.storage.get(key)
        if item and item['expires'] > time.time():
            return item['value']
        elif item:
            del self.storage[key]  # Clean up expired item
        return None

    async def set(self, key: str, value: str, timeout: int) -> None:
        expires = time.time() + timeout
        self.storage[key] = {'value': value, 'expires': expires}

    async def delete(self, key: str) -> None:
        self.storage.pop(key, None)

    async def close(self) -> None:
        self.storage.clear()

Decorator Configuration

Cache Decorator Options

The @cache decorator accepts several parameters:

from cache_middleware import cache

@cache(
    timeout=300,            # Cache timeout in seconds
    cache_control=True,     # Respect HTTP Cache-Control headers
    exclude_headers=None,   # Headers to exclude from cache key
    include_headers=None,   # Headers to include in cache key
)
async def my_endpoint():
    return {"data": "cached"}

Timeout Configuration:

# Short-lived cache (1 minute)
@cache(timeout=60)
async def real_time_data():
    return {"timestamp": time.time()}

# Medium-lived cache (5 minutes)
@cache(timeout=300)
async def user_profile(user_id: int):
    return {"user_id": user_id}

# Long-lived cache (1 hour)
@cache(timeout=3600)
async def application_config():
    return {"version": "1.0"}

Cache-Control Header Support:

@cache(timeout=300, cache_control=True)
async def cacheable_endpoint():
    """
    Supports standard HTTP Cache-Control directives:
    - no-cache: Bypasses cache for this request
    - no-store: Prevents caching of this response
    - max-age=60: Overrides default timeout
    """
    return {"data": "value"}

Header-Based Cache Keys:

# Include specific headers in cache key
@cache(timeout=300, include_headers=['Accept-Language', 'User-Agent'])
async def localized_content():
    return {"message": "Hello"}

# Exclude sensitive headers from cache key
@cache(timeout=300, exclude_headers=['Authorization', 'Cookie'])
async def public_data():
    return {"public": "data"}

Middleware Registration

Basic Registration

Register the middleware with your FastAPI application:

from fastapi import FastAPI
from cache_middleware import CacheMiddleware, RedisBackend

app = FastAPI()

# Create backend instance
backend = RedisBackend(url="redis://localhost:6379")

# Register middleware
app.add_middleware(CacheMiddleware, backend=backend)

Important Notes:

  • Backend must be fully initialized before passing to middleware

  • Middleware should be registered before route definitions

  • Each application instance requires its own backend instance

Middleware with Dependency Injection

For advanced scenarios, use dependency injection:

from fastapi import FastAPI, Depends
from cache_middleware.helpers import get_cache_backend

app = FastAPI()

async def get_backend():
    """Dependency to provide cache backend"""
    return get_cache_backend()

# Use in routes that need direct cache access
@app.get("/cache-stats")
async def cache_stats(backend: CacheBackend = Depends(get_backend)):
    # Direct backend access for administrative functions
    return {"status": "operational"}

Multiple Middleware Instances

For applications needing different cache strategies:

# Different backends for different purposes
session_backend = RedisBackend(url="redis://localhost:6379/0")
data_backend = RedisBackend(url="redis://localhost:6379/1")

# Register multiple middleware instances (not recommended)
# Instead, use a single middleware with smart routing

# Better approach: Backend router
class BackendRouter(CacheBackend):
    def __init__(self, backends: dict):
        self.backends = backends

    async def get(self, key: str) -> Optional[str]:
        backend_name = self._get_backend_for_key(key)
        return await self.backends[backend_name].get(key)

    def _get_backend_for_key(self, key: str) -> str:
        if key.startswith("session:"):
            return "session"
        return "data"

Environment-Based Configuration

Development Configuration

import os
from cache_middleware.helpers import create_backend_from_env

def get_development_backend():
    """Development-specific backend configuration"""
    if os.getenv("USE_REDIS", "false").lower() == "true":
        return RedisBackend(
            url=os.getenv("REDIS_URL", "redis://localhost:6379"),
            max_connections=5  # Lower connection pool for dev
        )
    else:
        return MemoryBackend(max_size=100)  # Small cache for dev

Production Configuration

def get_production_backend():
    """Production-specific backend configuration"""
    return RedisBackend(
        url=os.getenv("REDIS_URL"),
        max_connections=int(os.getenv("REDIS_MAX_CONNECTIONS", "50")),
        retry_on_timeout=True,
        socket_keepalive=True,
        health_check_interval=30,
        socket_timeout=10.0,
        # Production SSL settings
        ssl=os.getenv("REDIS_SSL", "false").lower() == "true",
        ssl_cert_reqs='required',
        ssl_ca_certs=os.getenv("REDIS_SSL_CA_CERTS"),
    )

Configuration Factory

Create a configuration factory for different environments:

class CacheConfig:
    @staticmethod
    def create_backend(environment: str) -> CacheBackend:
        config_map = {
            "development": CacheConfig._development_config,
            "testing": CacheConfig._testing_config,
            "staging": CacheConfig._staging_config,
            "production": CacheConfig._production_config,
        }

        config_func = config_map.get(environment)
        if not config_func:
            raise ValueError(f"Unknown environment: {environment}")

        return config_func()

    @staticmethod
    def _development_config() -> CacheBackend:
        return MemoryBackend(max_size=100)

    @staticmethod
    def _testing_config() -> CacheBackend:
        return MemoryBackend(max_size=50)

    @staticmethod
    def _staging_config() -> CacheBackend:
        return RedisBackend(
            url=os.getenv("REDIS_URL", "redis://staging-redis:6379"),
            max_connections=10
        )

    @staticmethod
    def _production_config() -> CacheBackend:
        return RedisBackend(
            url=os.getenv("REDIS_URL"),
            max_connections=50,
            retry_on_timeout=True,
            socket_keepalive=True,
            ssl=True
        )

Cache Key Configuration

Default Key Generation

The middleware generates cache keys using this pattern:

# Key format: cache:{hash}
# Hash includes: method, path, query parameters, request body

def generate_cache_key(request):
    method = request.method
    path = request.url.path
    query_params = sorted(request.url.query.split("&"))
    body = await request.body()

    key_base = f"{method}:{path}?{'&'.join(query_params)}|{body.decode()}"
    cache_key = f"cache:{hashlib.sha256(key_base.encode()).hexdigest()}"
    return cache_key

Custom Key Generation

Override key generation for specific needs:

class CustomCacheMiddleware(CacheMiddleware):
    def generate_cache_key(self, request):
        # Custom key generation logic
        user_id = request.headers.get("X-User-ID", "anonymous")
        endpoint = request.url.path
        return f"user_cache:{user_id}:{endpoint}"

Cache Invalidation

Implement cache invalidation patterns:

@app.post("/invalidate-cache")
async def invalidate_cache(pattern: str, backend: CacheBackend = Depends(get_backend)):
    """Invalidate cache entries matching pattern"""
    if hasattr(backend, 'delete_pattern'):
        await backend.delete_pattern(pattern)
    return {"message": "Cache invalidated"}

# Tag-based invalidation
@cache(timeout=300, tags=["user_data", f"user_{user_id}"])
async def get_user_profile(user_id: int):
    return {"user_id": user_id}

Performance Tuning

Connection Pooling

Optimize Redis connection pooling:

# High-traffic configuration
redis_backend = RedisBackend(
    url="redis://localhost:6379",
    max_connections=100,  # Increased pool size
    socket_keepalive=True,
    socket_keepalive_options={
        1: 600,  # TCP_KEEPIDLE (10 minutes)
        2: 60,   # TCP_KEEPINTVL (1 minute)
        3: 3,    # TCP_KEEPCNT
    },
    health_check_interval=60,
    socket_timeout=30.0,
)

Memory Optimization

For memory-constrained environments:

# Optimize memory backend
memory_backend = MemoryBackend(
    max_size=500,           # Smaller cache size
    cleanup_interval=60,    # More frequent cleanup
)

# Use compression for large responses
class CompressedMemoryBackend(MemoryBackend):
    async def set(self, key: str, value: str, timeout: int) -> None:
        compressed_value = gzip.compress(value.encode())
        await super().set(key, compressed_value, timeout)

    async def get(self, key: str) -> Optional[str]:
        compressed_value = await super().get(key)
        if compressed_value:
            return gzip.decompress(compressed_value).decode()
        return None

Monitoring Configuration

Logging Configuration

Configure detailed cache monitoring:

from cache_middleware.logger_config import configure_logger, logger

# Production logging
configure_logger()
logger.add(
    "cache_middleware.log",
    rotation="100 MB",
    retention="30 days",
    level="INFO",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
    serialize=True  # JSON format for log aggregation
)

Metrics Collection

Integrate with monitoring systems:

class MetricsBackend(CacheBackend):
    def __init__(self, backend: CacheBackend, metrics_client):
        self.backend = backend
        self.metrics = metrics_client

    async def get(self, key: str) -> Optional[str]:
        start_time = time.time()
        result = await self.backend.get(key)
        duration = time.time() - start_time

        self.metrics.histogram('cache.get.duration', duration)
        self.metrics.increment('cache.get.requests')

        if result:
            self.metrics.increment('cache.get.hits')
        else:
            self.metrics.increment('cache.get.misses')

        return result

Health Checks

Implement backend health monitoring:

@app.get("/health/cache")
async def cache_health(backend: CacheBackend = Depends(get_backend)):
    """Check cache backend health"""
    try:
        # Test cache operations
        test_key = f"health_check_{int(time.time())}"
        await backend.set(test_key, "ok", 10)
        result = await backend.get(test_key)
        await backend.delete(test_key)

        if result == "ok":
            return {"status": "healthy", "backend": type(backend).__name__}
        else:
            return {"status": "degraded", "error": "Cache not responding correctly"}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}

Next Steps