Skip to content

FastAPI — Auth & Security

OAuth2 Password Flow

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")


@app.post("/auth/token")
async def login(form: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(form.username, form.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
        )
    token = create_access_token(data={"sub": user.email})
    return {"access_token": token, "token_type": "bearer"}


@app.get("/me")
async def get_me(token: str = Depends(oauth2_scheme)):
    return decode_token(token)

OAuth2PasswordBearer extracts the token from Authorization: Bearer <token> header. Helper functions like authenticate_user() and decode_token() are placeholders in this snippet.


JWT Token Management

from datetime import UTC, datetime, timedelta

from jose import JWTError, jwt
from pydantic import BaseModel

SECRET_KEY = "your-secret-key-from-env"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


class TokenData(BaseModel):
    sub: str
    exp: datetime


def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    to_encode = data.copy()
    expire = datetime.now(UTC) + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
    to_encode["exp"] = expire
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def decode_access_token(token: str) -> TokenData:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return TokenData(**payload)
    except JWTError as err:
        raise HTTPException(status_code=401, detail="Invalid token") from err
JWT Field Purpose
sub Subject — user ID or email
exp Expiration timestamp
iss Issuer (optional, multi-service)
aud Audience (optional, multi-service)

Password Hashing

from passlib.context import CryptContext

# Prefer Argon2id for new systems; keep bcrypt only for legacy compatibility.
pwd_context = CryptContext(schemes=["argon2", "bcrypt"], deprecated="auto")


def hash_password(plain: str) -> str:
    return pwd_context.hash(plain)


def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)

Current User Dependency

from fastapi import Depends, HTTPException, status


async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    token_data = decode_access_token(token)
    result = await db.execute(select(User).where(User.email == token_data.sub))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user

Role-Based Access Control

import enum

from fastapi import Depends, HTTPException, status


class Role(str, enum.Enum):
    ADMIN = "admin"
    EDITOR = "editor"
    VIEWER = "viewer"


def require_role(*allowed: Role):
    async def check(user: User = Depends(get_current_user)):
        if user.role not in allowed:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
        return user
    return check


@app.get("/admin", dependencies=[Depends(require_role(Role.ADMIN))])
async def admin_only():
    return {"section": "admin"}

API Key Authentication

from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key")


async def verify_api_key(api_key: str = Security(api_key_header)):
    if api_key not in VALID_API_KEYS:
        raise HTTPException(status_code=403, detail="Invalid API key")
    return api_key

Refresh Tokens

REFRESH_TOKEN_EXPIRE_DAYS = 7


def create_refresh_token(data: dict) -> str:
    payload = data | {"type": "refresh"}
    return create_access_token(
        data=payload,
        expires_delta=timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS),
    )


@app.post("/auth/refresh")
async def refresh(refresh_token: str):
    token_data = decode_access_token(refresh_token)
    if getattr(token_data, "type", None) != "refresh":
        raise HTTPException(status_code=401, detail="Invalid refresh token")
    return {"access_token": create_access_token(data={"sub": token_data.sub})}

Production note: rotate refresh tokens on every use, store only hashed refresh tokens server-side, and revoke the whole token family on reuse detection.

Token Lifetime Storage
Access 15–30 min Memory / header
Refresh 7–30 days HttpOnly cookie / secure storage

Security Checklist

Practice Why
Use HTTPS everywhere Tokens are plaintext in transit
Short-lived access tokens Limit blast radius of stolen tokens
Validate exp, iss, aud Prevent token reuse across services
Hash passwords with bcrypt Timing-attack resistant, auto-salted
Store secrets in env vars Never hardcode keys in source
Rate-limit auth endpoints Prevent brute-force attacks
Log failed auth attempts Detect attacks early