Secure API with OAuth2, JWT, and AWS Cognito - Part 2

Table Of Content
By CamelEdge
Updated on Fri Jan 10 2025
Introduction
In modern web applications, managing user authentication and authorization is a critical aspect of security and user experience. This implementation demonstrates a robust approach to user authentication by leveraging AWS Cognito for user management and FastAPI for creating APIs. We define models using Pydantic for data validation, ensuring clean and consistent data handling. AWS Cognito provides a scalable and secure solution for handling user signup, verification, signin, token management, and logout.
In part 1, we manually created the JWT token and verified it. In this part, we will use AWS cognito to generate the JWT token which we will verify in the FastAPI application. We will also implement the user signup, verify, signin, refresh token, and logout functionality using AWS Cognito.
Description
This solution comprises multiple components:
1. Pydantic Models
Pydantic is a Python library used to define and validate data models with type hints. The following models are used to represent user authentication data:
UserSignup
: Represents the data required for user registration.UserVerify
: Handles account verification using a confirmation code.UserSignin
: Encapsulates the user credentials for login.RefreshToken
&AccessToken
: Define the structure for tokens returned by AWS Cognito.
2. AWS Cognito Integration
The AWS_Cognito
class encapsulates operations with AWS Cognito, using the boto3
SDK to perform tasks like:
- User signup
- Account verification
- User signin
- Token refresh
- Logout
3. Authentication Service
The AuthService
class acts as an intermediary between the API and AWS Cognito. It processes exceptions raised by Cognito and converts them into meaningful HTTP responses for the client, ensuring a seamless user experience.
4. Access Token Verification
Access tokens, returned upon successful signin, are used for securing APIs. The tokens are verified using the JSON Web Key Set (JWKS) fetched from AWS Cognito. This ensures that only valid and untampered tokens are accepted.
5. Routes
The FastAPI router defines endpoints for user authentication workflows:
- Signup
- Verify account
- Signin
- Token refresh
- Logout
Each route interacts with the AuthService
to process user requests and return appropriate responses.
This setup provides a secure and scalable foundation for user authentication and authorization, leveraging best practices for token-based authentication and JWT verification.
Implementation
The implementation has following steps:
1. Define Pydantic Models
Let define the models for the user signup, verify and signin. We will use Pydantic to define the models. Pydantic is a data validation library that allows you to define data models using Python type hints. FastAPI uses Pydantic models to automatically validate incoming requests and serialize outgoing responses.
class UserSignup(BaseModel):
username: str = Field(max_length=50)
email: EmailStr
password: Annotated[str, MinLen(8)]
mode: str
class UserVerify(BaseModel):
email: EmailStr
confirmation_code: Annotated[str, MaxLen(6)]
class UserSignin(BaseModel):
email: EmailStr
password: Annotated[str, MinLen(8)]
RefreshToken and AccessToken models are used to define the response structure for the token endpoints. These models are used to return the refresh token and access token to the client.
class RefreshToken(BaseModel):
refresh_token: str
class AccessToken(BaseModel):
access_token: str
2. Define AWS Cognito Class
Lets now define the AWS Cognito class which will handle the user signup, verify, signin, refresh token and logout. We will use the boto3 library to interact with the AWS Cognito service. Boto3 is the AWS SDK for Python, which allows you to interact with AWS services using Python code.
class AWS_Cognito:
def __init__(self):
self.client = boto3.client("cognito-idp", region_name=AWS_REGION_NAME)
def user_signup(self, user: UserSignup):
response = self.client.sign_up(
ClientId=AWS_COGNITO_APP_CLIENT_ID,
Username=user.email,
Password=user.password,
UserAttributes=[
{
'Name': 'username',
'Value': user.username,
},
{
'Name': 'custom:mode',
'Value': user.mode
}
],
)
return response
def verify_account(self, data: UserVerify):
response = self.client.confirm_sign_up(
ClientId=AWS_COGNITO_APP_CLIENT_ID,
Username=data.email,
ConfirmationCode=data.confirmation_code,
)
return response
def user_signin(self, data: UserSignin):
response = self.client.initiate_auth(
ClientId=AWS_COGNITO_APP_CLIENT_ID,
AuthFlow='USER_PASSWORD_AUTH',
AuthParameters={
'USERNAME': data.email,
'PASSWORD': data.password
}
)
return response
def new_access_token(self, refresh_token: str):
response = self.client.initiate_auth(
ClientId=AWS_COGNITO_APP_CLIENT_ID,
AuthFlow='REFRESH_TOKEN_AUTH',
AuthParameters={
'REFRESH_TOKEN': refresh_token,
}
)
return response
def logout(self, access_token: str):
response = self.client.global_sign_out(
AccessToken = access_token
)
return response
3. Define AuthService Class
The AuthService class will handle the user signup, verify, signin, refresh token and logout. The AuthService class will call the AWS_Cognito class to interact with the AWS Cognito service. The AuthService class will handle the exceptions raised by the AWS Cognito service and return the appropriate HTTP response to the client.
class AuthService:
@staticmethod
def user_signup(user: UserSignup, cognito: AWS_Cognito):
try:
response = cognito.user_signup(user)
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'UsernameExistsException':
raise HTTPException(
status_code=409, detail="An account with the given email already exists")
else:
raise HTTPException(status_code=500, detail="Internal Server")
else:
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
content = {
"message": "User created successfully",
"sub": response["UserSub"]
}
return JSONResponse(content=content, status_code=201)
@staticmethod
def verify_account(data: UserVerify, cognito: AWS_Cognito):
try:
response = cognito.verify_account(data)
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'CodeMismatchException':
raise HTTPException(
status_code=400, detail="The provided code does not match the expected value.")
elif e.response['Error']['Code'] == 'ExpiredCodeException':
raise HTTPException(
status_code=400, detail="The provided code has expired.")
elif e.response['Error']['Code'] == 'UserNotFoundException':
raise HTTPException(
status_code=404, detail="User not found")
elif e.response['Error']['Code'] == 'NotAuthorizedException':
raise HTTPException(
status_code=200, detail="User already verified.")
else:
raise HTTPException(status_code=500, detail="Internal Server")
else:
return JSONResponse(content={"message": "Account verification successful"}, status_code=200)
@staticmethod
def user_signin(data: UserSignin, cognito: AWS_Cognito):
try:
response = cognito.user_signin(data)
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'UserNotFoundException':
raise HTTPException(
status_code=404, detail="User deos not exist")
elif e.response['Error']['Code'] == 'UserNotConfirmedException':
raise HTTPException(
status_code=403, detail="Please verify your account")
elif e.response['Error']['Code'] == 'NotAuthorizedException':
raise HTTPException(
status_code=401, detail="Incorrect username or password")
else:
raise HTTPException(status_code=500, detail="Internal Server")
else:
content = {
"message": 'User signed in successfully',
"AccessToken": response['AuthenticationResult']['AccessToken'],
"RefreshToken": response['AuthenticationResult']['RefreshToken']
}
return JSONResponse(content=content, status_code=200)
@staticmethod
def new_access_token(refresh_token: str, cognito: AWS_Cognito):
try:
response = cognito.new_access_token(refresh_token)
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'InvalidParameterException':
raise HTTPException(
status_code=400, detail="Refresh token provided has wrong format")
elif e.response['Error']['Code'] == 'NotAuthorizedException':
raise HTTPException(
status_code=401, detail="Invalid refresh token provided")
elif e.response['Error']['Code'] == 'LimitExceededException':
raise HTTPException(
status_code=429, detail="Attempt limit exceeded, please try again later")
else:
raise HTTPException(status_code=500, detail="Internal Server")
else:
content = {
"message": 'Refresh token generated successfully',
"AccessToken": response['AuthenticationResult']['AccessToken'],
"ExpiresIn": response['AuthenticationResult']['ExpiresIn'],
}
return JSONResponse(content=content, status_code=200)
@staticmethod
def logout(access_token: str, cognito: AWS_Cognito):
try:
response = cognito.logout(access_token)
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'InvalidParameterException':
raise HTTPException(
status_code=400, detail="Access token provided has wrong format")
elif e.response['Error']['Code'] == 'NotAuthorizedException':
raise HTTPException(
status_code=401, detail="Invalid access token provided")
elif e.response['Error']['Code'] == 'TooManyRequestsException':
raise HTTPException(
status_code=429, detail="Too many requests")
else:
raise HTTPException(status_code=500, detail="Internal Server")
else:
return
4. Access Token Verification
When the user signins the AWS Cognito service will return the access token and refresh token. The access token is used to authenticate the user and access protected resources, while the refresh token is used to obtain a new access token when the current one expires. The refresh token is a long-lived token that can be used to generate new access tokens without requiring the user to sign in again. The refresh token is typically stored securely on the client side and sent to the server to obtain a new access token.
In order to authenticate the user, the server needs to verify the access token. The access token is a JSON Web Token (JWT) that contains information about the user and the permissions granted to the user. The server can decode and verify the access token using the public key provided by the AWS Cognito service. The public key is used to verify the signature of the access token and ensure that it has not been tampered with. The server can then extract the user information and permissions from the access token and use it to authorize the user to access protected resources.
fetch_jwks
function fetches the JSON Web Key Set (JWKS) from the AWS Cognito service. The JWKS contains the public keys used to verify the access tokens issued by the AWS Cognito service. The public keys are used to verify the signature of the access token and ensure that it has not been tampered with. The server can cache the JWKS to reduce the number of requests to the AWS Cognito service and improve performance.
@lru_cache
def fetch_jwks() -> JWKS:
response = requests.get(JWKS_URL)
if response.status_code != 200:
raise Exception("Unable to fetch JWKS")
return response.json()
The decode_token
function decodes and verifies the access token using the public key provided by the AWS Cognito service. The function first fetches the JWKS from the AWS Cognito service and extracts the public key corresponding to the key ID (kid) in the token header. The function then decodes the access token using the public key and verifies the signature of the token. If the access token is valid, the function returns the decoded token, which contains the user information and permissions. If the access token is invalid or expired, the function raises an HTTP exception with the appropriate error message.
@staticmethod
def decode_token(token: str):
"""Decode and verify the Cognito JWT access token."""
# Get the JWKS
jwks = fetch_jwks()
# Get the Key ID (kid) from the token header
unverified_header = jwt.get_unverified_header(token)
public_key = None
for key in jwks["keys"]:
if key["kid"] == unverified_header["kid"]:
public_key = key
if not public_key:
raise HTTPException(status_code=401, detail="Invalid Token")
# Decode the JWT
try:
public_key = RSAAlgorithm.from_jwk(json.dumps(public_key))
decoded_token = jwt.decode(
token,
public_key,
algorithms=["RS256"]
)
return decoded_token
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token has expired.")
except jwt.JWTClaimsError:
raise HTTPException(status_code=401, detail="Invalid token claims.")
except Exception as e:
raise HTTPException(status_code=401, detail=f"Token verification failed: {str(e)}")
5. Define Routes
The routes for the user signup, verify, signin, refresh token and logout are defined in the auth_router. The routes call the corresponding methods in the AuthService class to handle the user authentication and authorization. The routes return the appropriate HTTP response to the client based on the outcome of the authentication and authorization process.
auth_router = APIRouter(prefix='/api/v1/auth')
def get_aws_cognito() -> AWS_Cognito:
return AWS_Cognito()
# USER SIGNUP
@auth_router.post('/signup', status_code=status.HTTP_201_CREATED, tags=['Auth'])
async def signup_user(user: UserSignup, cognito: AWS_Cognito = Depends(get_aws_cognito)):
return AuthService.user_signup(user, cognito)
# VERIFY ACCOUNT
@auth_router.post('/verify_account', status_code=status.HTTP_200_OK, tags=["Auth"])
async def verify_account(
data: UserVerify,
cognito: AWS_Cognito = Depends(get_aws_cognito),
):
return AuthService.verify_account(data, cognito)
# USER SIGNIN
@auth_router.post('/signin', status_code=status.HTTP_200_OK, tags=["Auth"])
async def signin(data: UserSignin, cognito: AWS_Cognito = Depends(get_aws_cognito)):
return AuthService.user_signin(data, cognito)
# GENERATE NEW ACCESS TOKEN
@auth_router.post('/new_token', status_code=status.HTTP_200_OK, tags=["Auth"])
async def new_access_token(refresh_token: RefreshToken, cognito: AWS_Cognito = Depends(get_aws_cognito)):
return AuthService.new_access_token(refresh_token.refresh_token, cognito)
# LOGOUT
@auth_router.post('/logout', status_code=status.HTTP_204_NO_CONTENT, tags=["Auth"])
async def logout(access_token: AccessToken, cognito: AWS_Cognito = Depends(get_aws_cognito)):
return AuthService.logout(access_token.access_token, cognito)
The Depends
function is used to inject the AWS_Cognito
instance into the route handlers. The get_aws_cognito
function is used to create a new instance of the AWS_Cognito
class for each request.
6. Main FastAPI Application
The main FastAPI application is defined in the app.py file. The application includes the auth_router, which contains the routes for the user authentication and authorization. The application also includes an index route for the health check. The application is wrapped with the Mangum handler to enable deployment on AWS Lambda.
OAuth2PasswordBearer is used to define the OAuth2 authentication scheme for the API routes. The demopage
route is protected with the oauth2_scheme
dependency, which requires a valid access token to access the route. The AuthService.decode_token
method is used to decode and verify the access token before allowing access to the route.
from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordBearer
from mangum import Mangum # Import Mangum for AWS Lambda support
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI(
title="Auth Service",
description="IAPI authentication service",
version="1.0.0",
)
app.include_router(auth_router)
# Index health check
@app.get('/')
def index():
return {"message": "Authentication service"}
@app.get("/demopage")
async def demopage(token: str = Depends(oauth2_scheme)):
AuthService.decode_token(token)
return {"message": "Authenicated"}
handler = Mangum(app)
Before running the application AWS_REGION_NAME
, AWS_COGNITO_APP_CLIENT_ID
, and AWS_COGNITO_USER_POOL_ID
environment variables need to be set.
Conclusion
This implementation demonstrates a robust approach to user authentication and authorization using AWS Cognito and FastAPI. The solution leverages Pydantic models for data validation, AWS Cognito for user management, and FastAPI for API development. The AuthService class acts as an intermediary between the API and AWS Cognito, handling user requests and responses. The access token verification process ensures that only valid and untampered tokens are accepted, providing a secure foundation for user authentication and authorization.
Part 1 of this series demonstrated how to manually create and verify JWT tokens.
** Reference ** Link