Source code for auth_middleware.functions

from fastapi import HTTPException, Request

from auth_middleware.group_checker import GroupChecker
from auth_middleware.permissions_checker import PermissionsChecker
from auth_middleware.settings import settings


from typing import Any, Callable
from fastapi import Depends, Request

from auth_middleware.group_checker import GroupChecker
from auth_middleware.permissions_checker import PermissionsChecker
from auth_middleware.types.user import User


[docs] def require_permissions(allowed_permissions: list[str]) -> Callable[..., Any]: """Check if the user has the required permissions Args: allowed_permissions (List[str]): a list of required permissions """ async def _permissions_checker(request: Request) -> None: """Calls the GroupChecker class to check if the user has the required permissions Args: request (Request): FastAPI request object Returns: GroupChecker: group checker object """ checker = PermissionsChecker(allowed_permissions) await checker(request) return _permissions_checker
[docs] async def has_permissions(request: Request, allowed_permissions: list[str]) -> bool: """Check if the user has the required permissions asynchronously Args: request (Request): FastAPI request object allowed_permissions (List[str]): a list of required permissions Returns: bool: True if the user has the required permissions, False otherwise """ checker = PermissionsChecker(allowed_permissions) try: await checker(request) return True except HTTPException: return False
[docs] def require_groups(allowed_groups: list[str]) -> Callable[..., Any]: """Check if the user has the required groups Args: allowed_groups (List[str]): a list of required groups """ async def _group_checker(request: Request) -> None: """Calls the GroupChecker class to check if the user has the required groups Args: request (Request): FastAPI request object Returns: GroupChecker: group checker object """ checker = GroupChecker(allowed_groups) await checker(request) return _group_checker
[docs] async def has_groups(request: Request, allowed_groups: list[str]) -> bool: """Check if the user has the required groups asynchronously Args: request (Request): FastAPI request object allowed_groups (List[str]): a list of required groups Returns: bool: True if the user has the required groups, False otherwise """ checker = GroupChecker(allowed_groups=allowed_groups) try: await checker(request) return True except HTTPException: return False
[docs] def require_user() -> Callable[..., Any]: """Check if the user is authenticated""" def _user_checker(request: Request) -> User: if settings.AUTH_MIDDLEWARE_DISABLED: # Return a dummy user or raise an exception based on your needs raise HTTPException(status_code=401, detail="Authentication required") if not hasattr(request.state, "current_user") or not request.state.current_user: raise HTTPException(status_code=401, detail="Authentication required") return request.state.current_user # type: ignore[no-any-return] return _user_checker
[docs] def get_current_user() -> Callable[..., Any]: """Returns the current user object if it exists""" def _get_user(request: Request) -> User | None: return ( request.state.current_user if hasattr(request, "state") and hasattr(request.state, "current_user") else None ) return _get_user