Source code for tardis.rest.app.routers.user

from typing import Optional
from ..scopes import User
from .. import security
from fastapi import APIRouter, Depends, Security
from fastapi_jwt_auth import AuthJWT

# Router that collects the user endpoints defined in this module. Routes
# registered on it get the "/user" path prefix and the "user" tag.
router = APIRouter(prefix="/user", tags=["user"])


@router.post(
    "/login",
    description="Sets httponly access token in session cookie. The scopes are optional.",  # noqa B950
)
async def login(
    login_user: security.LoginUser,
    expires_delta: Optional[int] = None,
    Authorize: AuthJWT = Depends(),
):
    """Authenticate a user and store access/refresh tokens in cookies.

    The credentials in ``login_user`` are handed to
    ``security.check_authentication``. The scopes embedded in the tokens are
    either the ones explicitly requested in ``login_user.scopes`` (after they
    passed ``security.check_scope_permissions``) or, when none were
    requested, all scopes of the authenticated user.

    :param login_user: Login payload providing ``user_name``, ``password``
        and optional ``scopes``.
    :param expires_delta: Forwarded as ``expires_time`` when the access token
        is created; ``None`` leaves the choice to the JWT library.
        NOTE(review): an integer is presumably interpreted as seconds by
        fastapi-jwt-auth - confirm.
    :param Authorize: JWT helper injected by FastAPI.
    :return: A dict with a ``msg`` entry and, if the user holds the
        ``User.get`` scope, a ``user`` entry with the user name and the
        scopes put into the token.
    """
    account = security.check_authentication(login_user.user_name, login_user.password)

    # Decide which scopes end up in the token claims.
    requested_scopes = login_user.scopes
    if requested_scopes is not None:
        # SECURITY-CRITICAL: the requested scopes must be validated against
        # the scopes the user actually owns *before* they are used. If this
        # is implemented wrongly, a user could issue a token with unlimited
        # scopes. This path has to be tested thoroughly.
        security.check_scope_permissions(requested_scopes, account.scopes)
        granted_scopes = requested_scopes
    else:
        granted_scopes = account.scopes
    claims = {"scopes": granted_scopes}

    # Create both tokens first, then attach them to the response cookies.
    access = Authorize.create_access_token(
        subject=account.user_name,
        user_claims=claims,
        expires_time=expires_delta,
    )
    refresh_ = Authorize.create_refresh_token(
        subject=account.user_name,
        user_claims=claims,
    )
    Authorize.set_access_cookies(access)
    Authorize.set_refresh_cookies(refresh_)

    payload = {"msg": "Successfully logged in!"}
    # User data is only returned to users that hold the user:get scope.
    if User.get in account.scopes:
        payload["user"] = security.BaseUser(
            user_name=account.user_name,
            scopes=claims["scopes"],
        )
    return payload
@router.post(
    "/logout",
    description="Logout the current user by deleting all access token cookies",
)
async def logout(Authorize: AuthJWT = Depends()):
    """Log the current user out by removing the JWT cookies.

    :param Authorize: JWT helper injected by FastAPI.
    :return: A dict with a confirmation message under ``msg``.
    """
    # A valid access token is demanded before any cookie is touched.
    Authorize.jwt_required()
    Authorize.unset_jwt_cookies()

    confirmation = {"msg": "Successfully logged out!"}
    return confirmation
@router.post(
    "/refresh",
    description="Use refresh token cookie to refresh expiration on cookie",  # noqa B950
)
async def refresh(Authorize: AuthJWT = Depends()):
    """Issue a new access token based on the refresh token.

    The new access token gets the subject of the presented token and the
    scopes returned by ``security.get_token_scopes`` for it, and is stored
    via ``Authorize.set_access_cookies``.

    :param Authorize: JWT helper injected by FastAPI.
    :return: A dict with a confirmation message under ``msg``.
    """
    # Only a valid refresh token is allowed to mint a new access token.
    Authorize.jwt_refresh_token_required()

    subject = Authorize.get_jwt_subject()
    claims = {"scopes": security.get_token_scopes(Authorize)}

    renewed_token = Authorize.create_access_token(subject=subject, user_claims=claims)
    Authorize.set_access_cookies(renewed_token)

    return {"msg": "Token successfully refreshed"}
@router.get(
    "/me",
    response_model=security.BaseUser,
    description="Get the user data how it's stored in the database (no password)",
)
async def get_user_me(
    Authorize: AuthJWT = Security(security.check_authorization, scopes=[User.get]),
):
    """Return the stored data of the user the current token belongs to.

    The ``Authorize`` dependency is resolved through
    ``security.check_authorization`` declared with the ``User.get`` scope.

    :param Authorize: JWT helper provided by the security dependency.
    :return: The result of ``security.get_user`` for the token's subject,
        serialised by FastAPI as ``security.BaseUser``.
    """
    Authorize.jwt_required()
    # The token subject is the user name that is looked up.
    return security.get_user(Authorize.get_jwt_subject())
@router.get(
    "/token_scopes",
    description="get scopes of CURRENT token (not of user)",
)
async def get_token_scopes(Authorize: AuthJWT = Depends()):
    """Return the scopes carried by the token of the current request.

    These are the scopes of the token itself, which may differ from the
    scopes of the user (see the endpoint description).

    :param Authorize: JWT helper injected by FastAPI.
    :return: Whatever ``security.get_token_scopes`` yields for the token.
    """
    Authorize.jwt_required()
    token_scopes = security.get_token_scopes(Authorize)
    return token_scopes