Secure API with OAuth2, JWT, and AWS Cognito - Part 2


    By CamelEdge

    Updated on Fri Jan 10 2025


    In modern web applications, managing user authentication and authorization is a critical aspect of security and user experience. This implementation demonstrates a robust approach to user authentication by leveraging AWS Cognito for user management and FastAPI for creating APIs. We define models using Pydantic for data validation, ensuring clean and consistent data handling. AWS Cognito provides a scalable and secure solution for handling user signup, verification, signin, token management, and logout.

    In Part 1, we created and verified the JWT manually. In this part, AWS Cognito issues the JWT, and the FastAPI application verifies it. We also implement user signup, account verification, signin, token refresh, and logout using AWS Cognito.

    Description

    This solution comprises multiple components:

    1. Pydantic Models

    Pydantic is a Python library used to define and validate data models with type hints. The following models are used to represent user authentication data:

    • UserSignup: Represents the data required for user registration.
    • UserVerify: Handles account verification using a confirmation code.
    • UserSignin: Encapsulates the user credentials for login.
    • RefreshToken & AccessToken: Define the structure for tokens returned by AWS Cognito.

    2. AWS Cognito Integration

    The AWS_Cognito class encapsulates operations with AWS Cognito, using the boto3 SDK to perform tasks like:

    • User signup
    • Account verification
    • User signin
    • Token refresh
    • Logout

    3. Authentication Service

    The AuthService class acts as an intermediary between the API and AWS Cognito. It processes exceptions raised by Cognito and converts them into meaningful HTTP responses for the client, ensuring a seamless user experience.

    4. Access Token Verification

    Access tokens, returned upon successful signin, are used for securing APIs. The tokens are verified using the JSON Web Key Set (JWKS) fetched from AWS Cognito. This ensures that only valid and untampered tokens are accepted.

    5. Routes

    The FastAPI router defines endpoints for user authentication workflows:

    • Signup
    • Verify account
    • Signin
    • Token refresh
    • Logout

    Each route interacts with the AuthService to process user requests and return appropriate responses.

    This setup provides a secure and scalable foundation for user authentication and authorization, leveraging best practices for token-based authentication and JWT verification.

    Implementation

    The implementation has the following steps:

    1. Define Pydantic Models

    Let's define the models for user signup, account verification, and signin. Pydantic is a data validation library that lets you define data models using Python type hints. FastAPI uses Pydantic models to automatically validate incoming requests and serialize outgoing responses.

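    from typing import Annotated
    
    from annotated_types import MaxLen, MinLen
    from pydantic import BaseModel, EmailStr, Field  # EmailStr needs the email-validator package
    
    class UserSignup(BaseModel):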
        username: str = Field(max_length=50)
        email: EmailStr
        password: Annotated[str, MinLen(8)]
        mode: str
    
    class UserVerify(BaseModel):
        email: EmailStr
        confirmation_code: Annotated[str, MaxLen(6)]
    
    class UserSignin(BaseModel):
        email: EmailStr
        password: Annotated[str, MinLen(8)]
    

    The RefreshToken and AccessToken models define the request bodies for the token refresh and logout endpoints: the client sends its refresh token to obtain a new access token, and its access token to sign out.

    class RefreshToken(BaseModel):
        refresh_token: str
    
    class AccessToken(BaseModel):
        access_token: str
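
    As a rough illustration of the validation these models perform, the sketch below uses a simplified stand-in, SigninSketch (a hypothetical name, with a plain str email so that it runs without the email-validator package), and a hypothetical helper, invalid_fields. It assumes Pydantic v2 and is not part of the original service.

```python
from typing import Annotated

from annotated_types import MinLen
from pydantic import BaseModel, ValidationError


class SigninSketch(BaseModel):
    """Simplified stand-in for UserSignin (plain str instead of EmailStr)."""
    email: str
    password: Annotated[str, MinLen(8)]


def invalid_fields(payload: dict) -> list[str]:
    """Return the names of the fields that fail validation (empty if valid)."""
    try:
        SigninSketch.model_validate(payload)
    except ValidationError as exc:
        # Each error carries a "loc" tuple whose first item is the field name.
        return [str(error["loc"][0]) for error in exc.errors()]
    return []
```

    FastAPI runs the same kind of check on every request body and answers with a 422 response when validation fails, so the route handlers only ever see valid data.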
    

    2. Define AWS Cognito Class

    Let's now define the AWS_Cognito class, which handles user signup, account verification, signin, token refresh, and logout. It uses boto3, the AWS SDK for Python, to call the AWS Cognito service.

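    import boto3
    
    # AWS_REGION_NAME and AWS_COGNITO_APP_CLIENT_ID come from the application's
    # configuration (environment variables).
    class AWS_Cognito:
      def __init__(self):
          self.client = boto3.client("cognito-idp", region_name=AWS_REGION_NAME)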
    
      def user_signup(self, user: UserSignup):
    
        response = self.client.sign_up(
            ClientId=AWS_COGNITO_APP_CLIENT_ID,
            Username=user.email,
            Password=user.password,
            UserAttributes=[
                {
                    'Name': 'username',
                    'Value': user.username,
                },
                {
                    'Name': 'custom:mode',
                    'Value': user.mode
                }
            ],
        )
    
        return response
    
      def verify_account(self, data: UserVerify):
        response = self.client.confirm_sign_up(
            ClientId=AWS_COGNITO_APP_CLIENT_ID,
            Username=data.email,
            ConfirmationCode=data.confirmation_code,
        )
    
        return response
    
      def user_signin(self, data: UserSignin):
        response = self.client.initiate_auth(
            ClientId=AWS_COGNITO_APP_CLIENT_ID,
            AuthFlow='USER_PASSWORD_AUTH',
            AuthParameters={
                'USERNAME': data.email,
                'PASSWORD': data.password
            }
        )
    
        return response
      
      def new_access_token(self, refresh_token: str):
        response = self.client.initiate_auth(
            ClientId=AWS_COGNITO_APP_CLIENT_ID,
            AuthFlow='REFRESH_TOKEN_AUTH',
            AuthParameters={
                'REFRESH_TOKEN': refresh_token,
            }
        )
    
        return response
    
      def logout(self, access_token: str):
        response = self.client.global_sign_out(
            AccessToken = access_token
        )
    
        return response        
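
    Both signin and token refresh go through initiate_auth, and the dictionary it returns is not always the same shape. The sketch below, built around a hypothetical helper named extract_tokens, shows one defensive way to read it. It assumes that Cognito answers with a ChallengeName instead of tokens when another step (for example MFA) is required, and that a refresh response may omit RefreshToken.

```python
def extract_tokens(response: dict) -> dict:
    """Pull the token fields out of an initiate_auth-style response dict."""
    if "ChallengeName" in response:
        # Cognito wants another step before it issues tokens.
        raise ValueError(f"Additional challenge required: {response['ChallengeName']}")
    result = response.get("AuthenticationResult")
    if result is None:
        raise ValueError("Response contains no AuthenticationResult")
    return {
        "access_token": result["AccessToken"],
        "expires_in": result.get("ExpiresIn"),
        # Read defensively: refresh responses may not carry a new refresh token.
        "refresh_token": result.get("RefreshToken"),
    }
```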
    

    3. Define AuthService Class

    The AuthService class sits between the routes and the AWS_Cognito class. Each method calls the corresponding AWS_Cognito method, catches the exceptions raised by the Cognito service, and returns the appropriate HTTP response to the client.

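    import botocore.exceptions
    from fastapi import HTTPException
    from fastapi.responses import JSONResponse
    
    class AuthService: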
      @staticmethod
      def user_signup(user: UserSignup, cognito: AWS_Cognito):
        try:
            response = cognito.user_signup(user)
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == 'UsernameExistsException':
                raise HTTPException(
                    status_code=409, detail="An account with the given email already exists")
            else:
                raise HTTPException(status_code=500, detail="Internal Server Error")
        else:
            if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                content = {
                    "message": "User created successfully",
                    "sub": response["UserSub"]
                }
                return JSONResponse(content=content, status_code=201)
    
      @staticmethod
      def verify_account(data: UserVerify, cognito: AWS_Cognito):
        try:
            response = cognito.verify_account(data)
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == 'CodeMismatchException':
                raise HTTPException(
                    status_code=400, detail="The provided code does not match the expected value.")
            elif e.response['Error']['Code'] == 'ExpiredCodeException':
                raise HTTPException(
                    status_code=400, detail="The provided code has expired.")
            elif e.response['Error']['Code'] == 'UserNotFoundException':
                raise HTTPException(
                    status_code=404, detail="User not found")
            elif e.response['Error']['Code'] == 'NotAuthorizedException':
                raise HTTPException(
                    status_code=200, detail="User already verified.")
            else:
                raise HTTPException(status_code=500, detail="Internal Server Error")
        else:
            return JSONResponse(content={"message": "Account verification successful"}, status_code=200)
    
        
      @staticmethod
      def user_signin(data: UserSignin, cognito: AWS_Cognito):
        try:
            response = cognito.user_signin(data)
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == 'UserNotFoundException':
                raise HTTPException(
                    status_code=404, detail="User does not exist")
            elif e.response['Error']['Code'] == 'UserNotConfirmedException':
                raise HTTPException(
                    status_code=403, detail="Please verify your account")
            elif e.response['Error']['Code'] == 'NotAuthorizedException':
                raise HTTPException(
                    status_code=401, detail="Incorrect username or password")
            else:
                raise HTTPException(status_code=500, detail="Internal Server Error")
        else:
            content = {
                "message": 'User signed in successfully',
                "AccessToken": response['AuthenticationResult']['AccessToken'],
                "RefreshToken": response['AuthenticationResult']['RefreshToken']
            }
            return JSONResponse(content=content, status_code=200)
          
      @staticmethod
      def new_access_token(refresh_token: str, cognito: AWS_Cognito):
        try:
            response = cognito.new_access_token(refresh_token)
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == 'InvalidParameterException':
                raise HTTPException(
                    status_code=400, detail="Refresh token provided has wrong format")
            elif e.response['Error']['Code'] == 'NotAuthorizedException':
                raise HTTPException(
                    status_code=401, detail="Invalid refresh token provided")
            elif e.response['Error']['Code'] == 'LimitExceededException':
                raise HTTPException(
                    status_code=429, detail="Attempt limit exceeded, please try again later")
            else:
                raise HTTPException(status_code=500, detail="Internal Server Error")
        else:
            content = {
                "message": 'Access token refreshed successfully',
                "AccessToken": response['AuthenticationResult']['AccessToken'],
                "ExpiresIn": response['AuthenticationResult']['ExpiresIn'],
            }
            return JSONResponse(content=content, status_code=200)
    
    
      @staticmethod
      def logout(access_token: str, cognito: AWS_Cognito):
        try:
            response = cognito.logout(access_token)
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == 'InvalidParameterException':
                raise HTTPException(
                    status_code=400, detail="Access token provided has wrong format")
            elif e.response['Error']['Code'] == 'NotAuthorizedException':
                raise HTTPException(
                    status_code=401, detail="Invalid access token provided")
            elif e.response['Error']['Code'] == 'TooManyRequestsException':
                raise HTTPException(
                    status_code=429, detail="Too many requests")
            else:
                raise HTTPException(status_code=500, detail="Internal Server Error")
        else:
            return
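
    Every method above follows the same pattern: read the error code from the ClientError and turn it into an HTTP status and message. One possible way to express that pattern is a lookup table. The sketch below is only an illustration; SIGNIN_ERRORS and translate_error are hypothetical names, and the function takes the dictionary found on the exception's response attribute so that it runs without botocore.

```python
# Hypothetical lookup table: Cognito error code -> (HTTP status, client message).
SIGNIN_ERRORS = {
    "UserNotFoundException": (404, "User does not exist"),
    "UserNotConfirmedException": (403, "Please verify your account"),
    "NotAuthorizedException": (401, "Incorrect username or password"),
}


def translate_error(error_response: dict, table: dict) -> tuple[int, str]:
    """Map the `response` dict of a botocore ClientError to (status, detail)."""
    code = error_response.get("Error", {}).get("Code", "")
    # Unknown codes fall back to a generic 500, as in the methods above.
    return table.get(code, (500, "Internal Server Error"))
```

    Inside an except block, the result could then be passed straight to HTTPException(status_code=..., detail=...).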
    

    4. Access Token Verification

    When the user signs in, AWS Cognito returns an access token and a refresh token. The access token authenticates the user and grants access to protected resources. The refresh token is long-lived: the client stores it securely and sends it to the server to obtain a new access token when the current one expires, without requiring the user to sign in again.

    In order to authenticate the user, the server needs to verify the access token. The access token is a JSON Web Token (JWT) that contains information about the user and the permissions granted to the user. The server can decode and verify the access token using the public key provided by the AWS Cognito service. The public key is used to verify the signature of the access token and ensure that it has not been tampered with. The server can then extract the user information and permissions from the access token and use it to authorize the user to access protected resources.
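
    To make the structure of a JWT concrete, here is a minimal sketch that splits a token into its three dot-separated parts and decodes the header and payload using only the standard library. The helper name peek_jwt is hypothetical. It does not verify the signature, so its output must never be trusted for authorization; it is useful only for inspection and for reading the kid before verification.

```python
import base64
import json


def _b64url_decode(segment: str) -> bytes:
    # JWT segments drop base64 padding, so restore it before decoding.
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def peek_jwt(token: str) -> tuple[dict, dict]:
    """Return (header, payload) WITHOUT verifying the signature."""
    header_b64, payload_b64, _signature = token.split(".")
    header = json.loads(_b64url_decode(header_b64))
    payload = json.loads(_b64url_decode(payload_b64))
    return header, payload
```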

    The fetch_jwks function fetches the JSON Web Key Set (JWKS) from AWS Cognito. The JWKS contains the public keys used to verify the signatures of the access tokens that Cognito issues. The result is cached with lru_cache to reduce the number of requests to Cognito and improve performance.

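    from functools import lru_cache
    import requests
    
    # Standard JWKS endpoint of a Cognito user pool
    JWKS_URL = f"https://cognito-idp.{AWS_REGION_NAME}.amazonaws.com/{AWS_COGNITO_USER_POOL_ID}/.well-known/jwks.json"
    
    @lru_cache
    def fetch_jwks() -> dict:
        response = requests.get(JWKS_URL, timeout=10)
        if response.status_code != 200:
            raise Exception("Unable to fetch JWKS")
        return response.json()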
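
    Note that lru_cache keeps the JWKS for the lifetime of the process, so rotated signing keys would go unnoticed until a restart. If that matters, one option is a small time-based cache. The sketch below is an assumption-laden illustration: TTLCache is a hypothetical class, and the fetch function and clock are injected so that it can be exercised without any network call.

```python
import time
from typing import Callable, Optional


class TTLCache:
    """Cache one value and re-fetch it once it is older than ttl_seconds."""

    def __init__(self, fetch: Callable[[], dict], ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._value: Optional[dict] = None
        self._fetched_at = 0.0

    def get(self) -> dict:
        now = self._clock()
        # Fetch on first use, or when the cached value has expired.
        if self._value is None or now - self._fetched_at >= self._ttl:
            self._value = self._fetch()
            self._fetched_at = now
        return self._value
```

    In the service, fetch would be a function that downloads the JWKS, and decode_token would call get() on a single shared instance instead of fetch_jwks().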
    

    The decode_token function decodes and verifies the access token using the public key provided by the AWS Cognito service. The function first fetches the JWKS from the AWS Cognito service and extracts the public key corresponding to the key ID (kid) in the token header. The function then decodes the access token using the public key and verifies the signature of the token. If the access token is valid, the function returns the decoded token, which contains the user information and permissions. If the access token is invalid or expired, the function raises an HTTP exception with the appropriate error message.

    # Assumes PyJWT: import json, jwt; from jwt.algorithms import RSAAlgorithm
    @staticmethod
    def decode_token(token: str):
        """Decode and verify the Cognito JWT access token."""
        # Get the JWKS
        jwks = fetch_jwks()
    
        # Get the Key ID (kid) from the token header
        unverified_header = jwt.get_unverified_header(token)
    
        # Find the public key whose kid matches the token header
        public_key = None
        for key in jwks["keys"]:
            if key["kid"] == unverified_header.get("kid"):
                public_key = key
                break
    
        if not public_key:
            raise HTTPException(status_code=401, detail="Invalid Token")
    
        # Decode the JWT and verify its signature and expiry
        try:
            public_key = RSAAlgorithm.from_jwk(json.dumps(public_key))
            decoded_token = jwt.decode(
                token,
                public_key,
                algorithms=["RS256"]
            )
            return decoded_token
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail="Token has expired.")
        except jwt.InvalidTokenError:
            raise HTTPException(status_code=401, detail="Invalid token.")
        except Exception as e:
            raise HTTPException(status_code=401, detail=f"Token verification failed: {str(e)}")
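
    A valid signature and expiry show that Cognito issued the token, but not that it was issued by your user pool, for your app client, as an access token. A possible follow-up check on the decoded claims is sketched below. The function name check_access_token_claims is hypothetical, and the sketch assumes the iss, token_use, and client_id claims that Cognito places in access tokens.

```python
def check_access_token_claims(claims: dict, region: str,
                              user_pool_id: str, client_id: str) -> None:
    """Raise ValueError if the decoded claims do not match the expected pool and client."""
    expected_issuer = f"https://cognito-idp.{region}.amazonaws.com/{user_pool_id}"
    if claims.get("iss") != expected_issuer:
        raise ValueError("Unexpected issuer")
    # Cognito marks ID tokens with token_use "id"; only access tokens are accepted here.
    if claims.get("token_use") != "access":
        raise ValueError("Not an access token")
    if claims.get("client_id") != client_id:
        raise ValueError("Token was issued to a different app client")
```

    It could be called on the dictionary returned by decode_token, with a ValueError translated into a 401 response.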
    

    5. Define Routes

    The routes for the user signup, verify, signin, refresh token and logout are defined in the auth_router. The routes call the corresponding methods in the AuthService class to handle the user authentication and authorization. The routes return the appropriate HTTP response to the client based on the outcome of the authentication and authorization process.

    
    from fastapi import APIRouter, Depends, status
    
    auth_router = APIRouter(prefix='/api/v1/auth')
    
    def get_aws_cognito() -> AWS_Cognito:
        return AWS_Cognito()
    
    # USER SIGNUP
    @auth_router.post('/signup', status_code=status.HTTP_201_CREATED, tags=['Auth'])
    async def signup_user(user: UserSignup, cognito: AWS_Cognito = Depends(get_aws_cognito)):
        return AuthService.user_signup(user, cognito)
    
    # VERIFY ACCOUNT
    @auth_router.post('/verify_account', status_code=status.HTTP_200_OK, tags=["Auth"])
    async def verify_account(
        data: UserVerify,
        cognito: AWS_Cognito = Depends(get_aws_cognito),
    ):
        return AuthService.verify_account(data, cognito)
    
    # USER SIGNIN
    @auth_router.post('/signin', status_code=status.HTTP_200_OK, tags=["Auth"])
    async def signin(data: UserSignin, cognito: AWS_Cognito = Depends(get_aws_cognito)):
        return AuthService.user_signin(data, cognito)
    
    # GENERATE NEW ACCESS TOKEN
    @auth_router.post('/new_token', status_code=status.HTTP_200_OK, tags=["Auth"])
    async def new_access_token(refresh_token: RefreshToken, cognito: AWS_Cognito = Depends(get_aws_cognito)):
        return AuthService.new_access_token(refresh_token.refresh_token, cognito)
    
    # LOGOUT
    @auth_router.post('/logout', status_code=status.HTTP_204_NO_CONTENT, tags=["Auth"])
    async def logout(access_token: AccessToken, cognito: AWS_Cognito = Depends(get_aws_cognito)):
        return AuthService.logout(access_token.access_token, cognito)
    

    The Depends function is used to inject the AWS_Cognito instance into the route handlers. The get_aws_cognito function is used to create a new instance of the AWS_Cognito class for each request.
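
    Creating a new client for every request is simple but repeats the setup work each time. If that becomes a concern, one possible alternative is to cache the dependency so that every request shares a single instance. The sketch below uses a hypothetical stand-in class, CognitoStandIn, so that it runs without boto3; in the service, the function would return AWS_Cognito() instead.

```python
from functools import lru_cache


class CognitoStandIn:
    """Hypothetical placeholder for AWS_Cognito so the sketch runs without boto3."""


@lru_cache(maxsize=1)
def get_shared_cognito() -> CognitoStandIn:
    # The body runs once; later calls return the cached instance.
    return CognitoStandIn()
```

    A function like this can be passed to Depends in the same way as get_aws_cognito.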

    6. Main FastAPI Application

    The main FastAPI application is defined in the app.py file. The application includes the auth_router, which contains the routes for the user authentication and authorization. The application also includes an index route for the health check. The application is wrapped with the Mangum handler to enable deployment on AWS Lambda.

    OAuth2PasswordBearer is used to define the OAuth2 authentication scheme for the API routes. The demopage route is protected with the oauth2_scheme dependency, which requires a valid access token to access the route. The AuthService.decode_token method is used to decode and verify the access token before allowing access to the route.

    from fastapi import FastAPI, Depends
    from fastapi.security import OAuth2PasswordBearer
    from mangum import Mangum # Import Mangum for AWS Lambda support
    
    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
    
    app = FastAPI(
        title="Auth Service",
        description="API authentication service",
        version="1.0.0",
    )
    
    app.include_router(auth_router)
    
    # Index health check
    @app.get('/')
    def index():
        return {"message": "Authentication service"}
    
    @app.get("/demopage")
    async def demopage(token: str = Depends(oauth2_scheme)):
        AuthService.decode_token(token)
        return {"message": "Authenticated"}
    
    handler = Mangum(app)
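
    OAuth2PasswordBearer reads the token from the Authorization header, using the Bearer scheme. A client therefore calls the protected route roughly as sketched below. The helper name build_protected_request and the base URL are assumptions for illustration; the request is only constructed here, not sent.

```python
import urllib.request


def build_protected_request(base_url: str, access_token: str) -> urllib.request.Request:
    """Build a GET request for /demopage that carries the access token."""
    return urllib.request.Request(
        f"{base_url}/demopage",
        headers={"Authorization": f"Bearer {access_token}"},
    )
```

    Passing the returned object to urllib.request.urlopen would send it; without a valid token the route responds with 401.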
    

    Before running the application, set the AWS_REGION_NAME, AWS_COGNITO_APP_CLIENT_ID, and AWS_COGNITO_USER_POOL_ID environment variables.
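
    One way to read these settings and fail early when one is missing is sketched below. The helper load_settings and the REQUIRED_VARS tuple are hypothetical names, not part of the original code; the environment is passed in as a mapping so that the function is easy to test.

```python
import os
from typing import Mapping

REQUIRED_VARS = (
    "AWS_REGION_NAME",
    "AWS_COGNITO_APP_CLIENT_ID",
    "AWS_COGNITO_USER_POOL_ID",
)


def load_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Return the required settings, or raise if any is missing or empty."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```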

    Conclusion

    This implementation demonstrates a robust approach to user authentication and authorization using AWS Cognito and FastAPI. The solution leverages Pydantic models for data validation, AWS Cognito for user management, and FastAPI for API development. The AuthService class acts as an intermediary between the API and AWS Cognito, handling user requests and responses. The access token verification process ensures that only valid and untampered tokens are accepted, providing a secure foundation for user authentication and authorization.

    Part 1 of this series demonstrated how to manually create and verify JWT tokens.

