Middleware Configuration
This section provides detailed information about configuring the Cache Middleware, including backend options, decorator parameters, and advanced settings.
Backend Configuration
Cache Backend Interface
All cache backends implement the CacheBackend
interface:
from abc import ABC, abstractmethod
from typing import Optional
class CacheBackend(ABC):
"""Abstract base class for cache backends."""
@abstractmethod
async def get(self, key: str) -> Optional[str]:
"""Get a value from the cache."""
pass
@abstractmethod
async def set(self, key: str, value: str, timeout: int) -> None:
"""Set a value in the cache with expiration."""
pass
@abstractmethod
async def delete(self, key: str) -> None:
"""Delete a key from the cache."""
pass
@abstractmethod
async def close(self) -> None:
"""Close backend connections."""
pass
Redis/ValKey Backend Options
The RedisBackend
supports extensive configuration and works with both Redis and ValKey:
from cache_middleware import RedisBackend
redis_backend = RedisBackend(
url="redis://localhost:6379", # Redis/ValKey connection URL (use 6380 for ValKey)
max_connections=10, # Max connections in pool
retry_on_timeout=True, # Retry on timeout
socket_keepalive=True, # Enable TCP keepalive
socket_keepalive_options={ # TCP keepalive settings
1: 1, # TCP_KEEPIDLE
2: 3, # TCP_KEEPINTVL
3: 5, # TCP_KEEPCNT
},
health_check_interval=30, # Health check interval (seconds)
password=None, # Redis/ValKey password
db=0, # Database number
encoding='utf-8', # String encoding
decode_responses=True, # Auto-decode responses
socket_timeout=5.0, # Socket timeout
socket_connect_timeout=5.0, # Connection timeout
connection_pool=None, # Custom connection pool
ssl=False, # Enable SSL
ssl_keyfile=None, # SSL key file
ssl_certfile=None, # SSL certificate file
ssl_cert_reqs='required', # SSL certificate requirements
ssl_ca_certs=None, # SSL CA certificates
ssl_check_hostname=False, # Verify hostname in SSL
max_connections_per_pool=50, # Max connections per pool
)
URL Format Examples:
# Basic Redis
"redis://localhost:6379"
# Basic ValKey
"redis://localhost:6380"
# Redis/ValKey with password
"redis://:password@localhost:6379"
# Redis/ValKey with username and password
"redis://username:password@localhost:6379"
# Redis/ValKey with specific database
"redis://localhost:6379/1"
# Redis/ValKey with SSL
"rediss://localhost:6380"
# Redis Sentinel (works with ValKey too)
"redis+sentinel://sentinel-host:26379/mymaster"
Production Redis/ValKey Configuration:
redis_backend = RedisBackend(
url="redis://prod-redis-01:6379",
max_connections=50,
retry_on_timeout=True,
socket_keepalive=True,
health_check_interval=30,
socket_timeout=10.0,
socket_connect_timeout=10.0,
# Enable SSL for production
ssl=True,
ssl_cert_reqs='required',
ssl_ca_certs='/etc/ssl/certs/redis-ca.pem'
)
Memory Backend Options
The MemoryBackend
has simpler configuration:
from cache_middleware import MemoryBackend
memory_backend = MemoryBackend(
max_size=1000, # Maximum number of cached items
cleanup_interval=300, # Cleanup expired items every 5 minutes
default_timeout=3600, # Default timeout for items (1 hour)
)
Memory Usage Considerations:
Each cached item stores both key and value in memory
Large response bodies consume significant memory
Consider using Redis for production or large datasets
Monitor memory usage in production environments
Custom Backend Configuration
For implementing custom backends:
from cache_middleware import CacheBackend
class CustomBackend(CacheBackend):
def __init__(self, custom_param: str, timeout: int = 300):
self.custom_param = custom_param
self.default_timeout = timeout
self.storage = {}
async def get(self, key: str) -> Optional[str]:
item = self.storage.get(key)
if item and item['expires'] > time.time():
return item['value']
elif item:
del self.storage[key] # Clean up expired item
return None
async def set(self, key: str, value: str, timeout: int) -> None:
expires = time.time() + timeout
self.storage[key] = {'value': value, 'expires': expires}
async def delete(self, key: str) -> None:
self.storage.pop(key, None)
async def close(self) -> None:
self.storage.clear()
Decorator Configuration
Cache Decorator Options
The @cache
decorator accepts several parameters:
from cache_middleware import cache
@cache(
timeout=300, # Cache timeout in seconds
cache_control=True, # Respect HTTP Cache-Control headers
exclude_headers=None, # Headers to exclude from cache key
include_headers=None, # Headers to include in cache key
)
async def my_endpoint():
return {"data": "cached"}
Timeout Configuration:
# Short-lived cache (1 minute)
@cache(timeout=60)
async def real_time_data():
return {"timestamp": time.time()}
# Medium-lived cache (5 minutes)
@cache(timeout=300)
async def user_profile(user_id: int):
return {"user_id": user_id}
# Long-lived cache (1 hour)
@cache(timeout=3600)
async def application_config():
return {"version": "1.0"}
Cache-Control Header Support:
@cache(timeout=300, cache_control=True)
async def cacheable_endpoint():
"""
Supports standard HTTP Cache-Control directives:
- no-cache: Bypasses cache for this request
- no-store: Prevents caching of this response
- max-age=60: Overrides default timeout
"""
return {"data": "value"}
Header-Based Cache Keys:
# Include specific headers in cache key
@cache(timeout=300, include_headers=['Accept-Language', 'User-Agent'])
async def localized_content():
return {"message": "Hello"}
# Exclude sensitive headers from cache key
@cache(timeout=300, exclude_headers=['Authorization', 'Cookie'])
async def public_data():
return {"public": "data"}
Middleware Registration
Basic Registration
Register the middleware with your FastAPI application:
from fastapi import FastAPI
from cache_middleware import CacheMiddleware, RedisBackend
app = FastAPI()
# Create backend instance
backend = RedisBackend(url="redis://localhost:6379")
# Register middleware
app.add_middleware(CacheMiddleware, backend=backend)
Important Notes:
Backend must be fully initialized before passing to middleware
Middleware should be registered before route definitions
Each application instance requires its own backend instance
Middleware with Dependency Injection
For advanced scenarios, use dependency injection:
from fastapi import FastAPI, Depends
from cache_middleware.helpers import get_cache_backend
app = FastAPI()
async def get_backend():
"""Dependency to provide cache backend"""
return get_cache_backend()
# Use in routes that need direct cache access
@app.get("/cache-stats")
async def cache_stats(backend: CacheBackend = Depends(get_backend)):
# Direct backend access for administrative functions
return {"status": "operational"}
Multiple Middleware Instances
For applications needing different cache strategies:
# Different backends for different purposes
session_backend = RedisBackend(url="redis://localhost:6379/0")
data_backend = RedisBackend(url="redis://localhost:6379/1")
# Register multiple middleware instances (not recommended)
# Instead, use a single middleware with smart routing
# Better approach: Backend router
class BackendRouter(CacheBackend):
def __init__(self, backends: dict):
self.backends = backends
async def get(self, key: str) -> Optional[str]:
backend_name = self._get_backend_for_key(key)
return await self.backends[backend_name].get(key)
def _get_backend_for_key(self, key: str) -> str:
if key.startswith("session:"):
return "session"
return "data"
Environment-Based Configuration
Development Configuration
import os
from cache_middleware.helpers import create_backend_from_env
def get_development_backend():
"""Development-specific backend configuration"""
if os.getenv("USE_REDIS", "false").lower() == "true":
return RedisBackend(
url=os.getenv("REDIS_URL", "redis://localhost:6379"),
max_connections=5 # Lower connection pool for dev
)
else:
return MemoryBackend(max_size=100) # Small cache for dev
Production Configuration
def get_production_backend():
"""Production-specific backend configuration"""
return RedisBackend(
url=os.getenv("REDIS_URL"),
max_connections=int(os.getenv("REDIS_MAX_CONNECTIONS", "50")),
retry_on_timeout=True,
socket_keepalive=True,
health_check_interval=30,
socket_timeout=10.0,
# Production SSL settings
ssl=os.getenv("REDIS_SSL", "false").lower() == "true",
ssl_cert_reqs='required',
ssl_ca_certs=os.getenv("REDIS_SSL_CA_CERTS"),
)
Configuration Factory
Create a configuration factory for different environments:
class CacheConfig:
@staticmethod
def create_backend(environment: str) -> CacheBackend:
config_map = {
"development": CacheConfig._development_config,
"testing": CacheConfig._testing_config,
"staging": CacheConfig._staging_config,
"production": CacheConfig._production_config,
}
config_func = config_map.get(environment)
if not config_func:
raise ValueError(f"Unknown environment: {environment}")
return config_func()
@staticmethod
def _development_config() -> CacheBackend:
return MemoryBackend(max_size=100)
@staticmethod
def _testing_config() -> CacheBackend:
return MemoryBackend(max_size=50)
@staticmethod
def _staging_config() -> CacheBackend:
return RedisBackend(
url=os.getenv("REDIS_URL", "redis://staging-redis:6379"),
max_connections=10
)
@staticmethod
def _production_config() -> CacheBackend:
return RedisBackend(
url=os.getenv("REDIS_URL"),
max_connections=50,
retry_on_timeout=True,
socket_keepalive=True,
ssl=True
)
Cache Key Configuration
Default Key Generation
The middleware generates cache keys using this pattern:
# Key format: cache:{hash}
# Hash includes: method, path, query parameters, request body
def generate_cache_key(request):
method = request.method
path = request.url.path
query_params = sorted(request.url.query.split("&"))
body = await request.body()
key_base = f"{method}:{path}?{'&'.join(query_params)}|{body.decode()}"
cache_key = f"cache:{hashlib.sha256(key_base.encode()).hexdigest()}"
return cache_key
Custom Key Generation
Override key generation for specific needs:
class CustomCacheMiddleware(CacheMiddleware):
def generate_cache_key(self, request):
# Custom key generation logic
user_id = request.headers.get("X-User-ID", "anonymous")
endpoint = request.url.path
return f"user_cache:{user_id}:{endpoint}"
Cache Invalidation
Implement cache invalidation patterns:
@app.post("/invalidate-cache")
async def invalidate_cache(pattern: str, backend: CacheBackend = Depends(get_backend)):
"""Invalidate cache entries matching pattern"""
if hasattr(backend, 'delete_pattern'):
await backend.delete_pattern(pattern)
return {"message": "Cache invalidated"}
# Tag-based invalidation
@cache(timeout=300, tags=["user_data", f"user_{user_id}"])
async def get_user_profile(user_id: int):
return {"user_id": user_id}
Performance Tuning
Connection Pooling
Optimize Redis connection pooling:
# High-traffic configuration
redis_backend = RedisBackend(
url="redis://localhost:6379",
max_connections=100, # Increased pool size
socket_keepalive=True,
socket_keepalive_options={
1: 600, # TCP_KEEPIDLE (10 minutes)
2: 60, # TCP_KEEPINTVL (1 minute)
3: 3, # TCP_KEEPCNT
},
health_check_interval=60,
socket_timeout=30.0,
)
Memory Optimization
For memory-constrained environments:
# Optimize memory backend
memory_backend = MemoryBackend(
max_size=500, # Smaller cache size
cleanup_interval=60, # More frequent cleanup
)
# Use compression for large responses
class CompressedMemoryBackend(MemoryBackend):
async def set(self, key: str, value: str, timeout: int) -> None:
compressed_value = gzip.compress(value.encode())
await super().set(key, compressed_value, timeout)
async def get(self, key: str) -> Optional[str]:
compressed_value = await super().get(key)
if compressed_value:
return gzip.decompress(compressed_value).decode()
return None
Monitoring Configuration
Logging Configuration
Configure detailed cache monitoring:
from cache_middleware.logger_config import configure_logger, logger
# Production logging
configure_logger()
logger.add(
"cache_middleware.log",
rotation="100 MB",
retention="30 days",
level="INFO",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
serialize=True # JSON format for log aggregation
)
Metrics Collection
Integrate with monitoring systems:
class MetricsBackend(CacheBackend):
def __init__(self, backend: CacheBackend, metrics_client):
self.backend = backend
self.metrics = metrics_client
async def get(self, key: str) -> Optional[str]:
start_time = time.time()
result = await self.backend.get(key)
duration = time.time() - start_time
self.metrics.histogram('cache.get.duration', duration)
self.metrics.increment('cache.get.requests')
if result:
self.metrics.increment('cache.get.hits')
else:
self.metrics.increment('cache.get.misses')
return result
Health Checks
Implement backend health monitoring:
@app.get("/health/cache")
async def cache_health(backend: CacheBackend = Depends(get_backend)):
"""Check cache backend health"""
try:
# Test cache operations
test_key = f"health_check_{int(time.time())}"
await backend.set(test_key, "ok", 10)
result = await backend.get(test_key)
await backend.delete(test_key)
if result == "ok":
return {"status": "healthy", "backend": type(backend).__name__}
else:
return {"status": "degraded", "error": "Cache not responding correctly"}
except Exception as e:
return {"status": "unhealthy", "error": str(e)}
Next Steps
Learn how to implement custom backends in Extending Backends
Check the complete API documentation in API Reference
Return to practical examples in User Guide