You can use OAuth2 scopes directly with FastAPI; they are integrated to work seamlessly.
This lets you build a fine-grained permission system that follows the OAuth2 standard into your OpenAPI application (and its API docs).
OAuth2 with scopes is the mechanism used by many large authentication providers, such as Facebook, Google, GitHub, Microsoft, Twitter, etc. They use it to grant specific permissions to users and applications.
Every time you "log in with" Facebook, Google, GitHub, Microsoft, or Twitter, that application is using OAuth2 with scopes.
In this section you will see how to manage authentication and authorization with that same OAuth2 with scopes in your FastAPI application.
Warning
This is a more or less advanced section. If you are just starting out, you can skip it.
You don't necessarily need OAuth2 scopes, and you can handle authentication and authorization however you want.
But OAuth2 with scopes can be conveniently integrated into your API (with OpenAPI) and its API docs.
Even so, you still enforce those scopes, or any other security/authorization requirement, in your code however you see fit.
In many cases, OAuth2 with scopes can be overkill.
But if you know you need it, or you are curious, keep reading.
```python
from datetime import datetime, timedelta, timezone
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (JWTError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: Annotated[User, Depends(get_current_user)]):
    return {"status": "ok"}
```
The code above uses Python 3.10 syntax. On Python 3.9, import List and Union from typing and write Union[str, None] instead of str | None, Union[bool, None] instead of bool | None, Union[timedelta, None] instead of timedelta | None, and List[str] instead of list[str]. On Python 3.8, additionally import Annotated from typing_extensions instead of typing.
Tip
Prefer the Annotated version if possible.
```python
from datetime import datetime, timedelta, timezone

from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (JWTError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"]),
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: User = Security(get_current_active_user, scopes=["items"]),
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
    return {"status": "ok"}
```
For this version without Annotated, Python 3.9 needs Union from typing, with Union[str, None], Union[bool, None], and Union[timedelta, None] in place of the | syntax. Python 3.8 also needs List from typing, with List[str] in place of list[str].
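The scope check that get_current_user performs in the example above can be isolated from FastAPI to make its logic easier to follow. The following is a minimal sketch using only the standard library; the names ScopeCheckResult and check_scopes are hypothetical and not part of FastAPI. It assumes, as in the example, that the required scopes are joined with spaces for the WWW-Authenticate header and that every required scope must be present in the token.

```python
from dataclasses import dataclass


@dataclass
class ScopeCheckResult:
    """Outcome of comparing required scopes with the scopes carried by a token."""

    authenticate_value: str  # value to send in the WWW-Authenticate header
    missing: list[str]  # required scopes that the token does not carry


def check_scopes(required: list[str], token_scopes: list[str]) -> ScopeCheckResult:
    """Hypothetical helper mirroring the scope logic of get_current_user."""
    if required:
        # Advertise the required scopes as a space-separated string.
        scope_str = " ".join(required)
        authenticate_value = f'Bearer scope="{scope_str}"'
    else:
        authenticate_value = "Bearer"
    # A request is only allowed when no required scope is missing.
    missing = [scope for scope in required if scope not in token_scopes]
    return ScopeCheckResult(authenticate_value=authenticate_value, missing=missing)


if __name__ == "__main__":
    # A token that only carries "me" is not enough for an endpoint
    # that requires both "me" and "items".
    result = check_scopes(required=["me", "items"], token_scopes=["me"])
    print(result.authenticate_value)
    print(result.missing)
```

In the example application, a non-empty list of missing scopes corresponds to the 401 response with the detail "Not enough permissions".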
Now let's review those changes step by step.
The first change is that we now declare the OAuth2 security scheme with two available scopes: me and items.
The scopes parameter receives a dict with each scope as a key and its description as the value:
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjoseimportJWTError,jwtfrompasslib.contextimportCryptContextfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read 
items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:Annotated[str,Depends(oauth2_scheme)]):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username:str=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_scopes=payload.get("scopes",[])token_data=TokenData(scopes=token_scopes,username=username)except(JWTError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Security(get_current_user,scopes=["me"])],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive 
user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scopes":form_data.scopes},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Security(get_current_active_user,scopes=["items"])],):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:Annotated[User,Depends(get_current_user)]):return{"status":"ok"}
from datetime import datetime, timedelta, timezone
from typing import Annotated, List, Union

from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Union[str, None] = None
    scopes: List[str] = []


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (JWTError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: Annotated[User, Depends(get_current_user)]):
    return {"status": "ok"}
Tip
Prefer to use the Annotated version if possible.
from datetime import datetime, timedelta, timezone

from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (JWTError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"]),
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: User = Security(get_current_active_user, scopes=["items"]),
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
    return {"status": "ok"}
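The scope check that get_current_user performs in the listing can be looked at in isolation. The following is a minimal, framework-free sketch of that logic; the helper names build_authenticate_value and missing_scopes are hypothetical and not part of FastAPI, and the plain lists stand in for security_scopes.scopes and the scopes stored in the token.

```python
from typing import Iterable, List


def build_authenticate_value(required_scopes: List[str]) -> str:
    """Build the WWW-Authenticate header value the way the listing does.

    Hypothetical helper: the space-joined string plays the role of
    security_scopes.scope_str.
    """
    if required_scopes:
        scope_str = " ".join(required_scopes)
        return f'Bearer scope="{scope_str}"'
    return "Bearer"


def missing_scopes(
    required_scopes: Iterable[str], token_scopes: Iterable[str]
) -> List[str]:
    """Return every required scope that the token does not carry."""
    granted = set(token_scopes)
    return [scope for scope in required_scopes if scope not in granted]


# A token that only carries "me" cannot access an operation that needs "items".
print(build_authenticate_value(["me", "items"]))
print(missing_scopes(["me", "items"], ["me"]))
```

In the listing, the first missing scope already triggers the 401 response with "Not enough permissions"; collecting all missing scopes, as done here, is only a convenience for illustration.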
Because we are now declaring these scopes, they show up in the API docs when you log in/authorize.
And you can select which scopes you want to grant access to: me and items.
This is the same mechanism used when you grant permissions while logging in with Facebook, Google, GitHub, etc.
Now, modify the token path operation to return the requested scopes.
We are still using the same OAuth2PasswordRequestForm. It includes a property scopes with a list of str, containing each scope it received in the request.
And we return the scopes as part of the JWT token.
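As a rough sketch of what happens to the scopes on their way into the token: in OAuth2 the form sends them as one space-separated string in the scope field, the form class exposes them as a list, and that list ends up as a claim next to sub and exp. The helper names parse_scope_field and build_token_claims below are hypothetical; signing the claims (done with jwt.encode in the listing) is left out so the example needs only the standard library.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List


def parse_scope_field(scope: str) -> List[str]:
    """Split the space-separated OAuth2 scope string into a list of scopes."""
    return scope.split()


def build_token_claims(
    username: str, scopes: List[str], expires_in: timedelta
) -> Dict[str, Any]:
    """Assemble the claims that would be signed into the JWT."""
    expire = datetime.now(timezone.utc) + expires_in
    return {"sub": username, "scopes": list(scopes), "exp": expire}


# The client requested both scopes declared by the application.
requested = parse_scope_field("me items")
claims = build_token_claims("johndoe", requested, timedelta(minutes=30))
print(claims["sub"], claims["scopes"])
```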
Danger
For simplicity, here we are adding the received scopes directly to the token.
But in your application, for security, you should make sure you only add the scopes that the user is actually allowed to have, or the ones you have predefined.
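One possible way to follow this advice is to intersect the requested scopes with the scopes a user may hold before creating the token. The sketch below assumes a per-user permission table; USER_ALLOWED_SCOPES and grant_scopes are hypothetical names, and a real application would load the permissions from its user store.

```python
from typing import Dict, Iterable, List, Set

# Hypothetical permission table: which scopes each user may be granted.
USER_ALLOWED_SCOPES: Dict[str, Set[str]] = {
    "johndoe": {"me", "items"},
    "alice": {"me"},
}


def grant_scopes(username: str, requested: Iterable[str]) -> List[str]:
    """Keep only the requested scopes that the user is allowed to have."""
    allowed = USER_ALLOWED_SCOPES.get(username, set())
    return [scope for scope in requested if scope in allowed]


# alice asks for more than she may have; only "me" is granted.
print(grant_scopes("alice", ["me", "items"]))
```

The filtered list would then be passed to create_access_token in place of form_data.scopes. Rejecting the request outright when a scope is not allowed is an equally valid design choice.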
Tip
Prefer to use the Annotated version if possible.
from datetime import datetime, timedelta, timezone

from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (JWTError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"]),
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: User = Security(get_current_active_user, scopes=["items"]),
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
    return {"status": "ok"}
Tip
Prefer to use the Annotated version if possible.
from datetime import datetime, timedelta, timezone
from typing import Union

from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Union[str, None] = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (JWTError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"]),
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: User = Security(get_current_active_user, scopes=["items"]),
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
    return {"status": "ok"}
Tip
Prefer to use the Annotated version if possible.
from datetime import datetime, timedelta, timezone
from typing import List, Union

from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Union[str, None] = None
    scopes: List[str] = []


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (JWTError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"]),
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: User = Security(get_current_active_user, scopes=["items"]),
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
    return {"status": "ok"}
Declare scopes in path operations and dependencies
Now we declare that the path operation for /users/me/items/ requires the scope items.
For this, we import and use Security from fastapi.
You can use Security to declare dependencies (just like Depends), but Security also receives a parameter scopes with a list of scopes (strings).
In this case, we pass a dependency function get_current_active_user to Security (the same way we would with Depends).
But we also pass a list of scopes, in this case with just one scope: items (there could be more).
The dependency function get_current_active_user can also declare sub-dependencies, not only with Depends but also with Security. Here it declares its own sub-dependency function (get_current_user) together with further scope requirements.
In this case, it requires the scope me (it could require more than one scope).
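To make the layering concrete, here is a minimal standard-library sketch of how the scopes declared along a dependency chain add up. It is a simplified model, not FastAPI's actual implementation: the Dep class and the effective_scopes function are hypothetical names introduced only for this illustration, and the order of the collected scopes is an assumption of the sketch.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Dep:
    """Hypothetical stand-in for a function that declares one sub-dependency."""

    name: str
    # Scopes passed to Security(...) when declaring `sub` (empty for Depends).
    scopes: List[str] = field(default_factory=list)
    sub: Optional["Dep"] = None


def effective_scopes(dep: Optional[Dep]) -> List[str]:
    """Collect every scope declared along the chain, without duplicates."""
    collected: List[str] = []
    while dep is not None:
        for scope in dep.scopes:
            if scope not in collected:
                collected.append(scope)
        dep = dep.sub
    return collected


# get_current_user declares no scopes of its own.
get_current_user = Dep("get_current_user")
# get_current_active_user uses Security(get_current_user, scopes=["me"]).
get_current_active_user = Dep(
    "get_current_active_user", scopes=["me"], sub=get_current_user
)
# read_own_items uses Security(get_current_active_user, scopes=["items"]).
read_own_items = Dep("read_own_items", scopes=["items"], sub=get_current_active_user)
# read_users_me uses a plain Depends(get_current_active_user).
read_users_me = Dep("read_users_me", sub=get_current_active_user)

print(effective_scopes(read_own_items))  # scopes needed for /users/me/items/
print(effective_scopes(read_users_me))   # scopes needed for /users/me/
```

In this model, /users/me/items/ ends up requiring both items and me, while /users/me/ requires only me, because the requirement declared inside get_current_active_user applies to every path operation that depends on it.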
Note
You don't necessarily need to add different scopes in different places.
We are doing it here to demonstrate how FastAPI handles scopes declared at different levels.
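As a rough illustration of what declaring scopes at different levels means for a client, the sketch below checks a token's scopes against the scopes each path ends up requiring. It mirrors the loop and the WWW-Authenticate value used in get_current_user, but the names missing_scopes, www_authenticate, REQUIRED and the sample token are assumptions made for this example only; the required scopes per path are derived by hand from the Security declarations in the listing.

```python
from typing import Dict, List


def missing_scopes(required: List[str], granted: List[str]) -> List[str]:
    """Return the required scopes that the token does not carry."""
    return [scope for scope in required if scope not in granted]


def www_authenticate(required: List[str]) -> str:
    """Build the header value: scopes joined by spaces, or plain Bearer."""
    if required:
        return 'Bearer scope="{}"'.format(" ".join(required))
    return "Bearer"


# Scopes each path operation effectively requires (derived by hand).
REQUIRED: Dict[str, List[str]] = {
    "/users/me/": ["me"],
    "/users/me/items/": ["items", "me"],
    "/status/": [],
}

# Hypothetical token that was granted only the "me" scope.
token_scopes = ["me"]

for path, required in REQUIRED.items():
    missing = missing_scopes(required, token_scopes)
    verdict = "allowed" if not missing else f"rejected, missing {missing}"
    print(path, "->", verdict, "|", www_authenticate(required))
```

With this token, only /users/me/items/ is rejected, because the items scope declared on the path operation is missing even though the me scope declared in the dependency is present.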
from datetime import datetime, timedelta, timezone
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (JWTError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: Annotated[User, Depends(get_current_user)]):
    return {"status": "ok"}
from datetime import datetime, timedelta, timezone
from typing import Annotated, List, Union

from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Union[str, None] = None
    scopes: List[str] = []


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (JWTError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: Annotated[User, Depends(get_current_user)]):
    return {"status": "ok"}
from datetime import datetime, timedelta, timezone
from typing import List, Union

from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError
from typing_extensions import Annotated

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Union[str, None] = None
    scopes: List[str] = []


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (JWTError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: Annotated[User, Depends(get_current_user)]):
    return {"status": "ok"}
Tip
Prefer to use the Annotated version if possible.
from datetime import datetime, timedelta, timezone

from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (JWTError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"]),
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: User = Security(get_current_active_user, scopes=["items"]),
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
    return {"status": "ok"}
Technical Details
Security is actually a subclass of Depends, and it has just one extra parameter that we will see later.
By using Security instead of Depends, however, FastAPI knows that it can declare security scopes, use them internally, and document the API with OpenAPI.
Note that when you import Query, Path, Depends, Security and others from fastapi, they are actually functions that return instances of special classes.
Now update the dependency get_current_user.
This is the one used by the dependencies above.
Here we use the same OAuth2 scheme we created before and declare it as a dependency: oauth2_scheme.
Because this dependency function has no scope requirements of its own, we can use Depends with oauth2_scheme. We don't need Security when there are no security scopes to specify.
We also declare a special parameter of type SecurityScopes, imported from fastapi.security.
This SecurityScopes class is similar to Request (Request was used to get the request object directly).
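SecurityScopes, like Request, is a parameter that the framework fills in because of its declared type rather than from request data. The toy model below is only a sketch of that idea in plain Python, not FastAPI's actual implementation; ScopesInfo and call_with_scopes are hypothetical names introduced for illustration.

```python
import inspect
from dataclasses import dataclass, field


@dataclass
class ScopesInfo:
    """Hypothetical stand-in for SecurityScopes (not the FastAPI class)."""

    scopes: list[str] = field(default_factory=list)

    @property
    def scope_str(self) -> str:
        # Space-separated form of the required scopes.
        return " ".join(self.scopes)


def call_with_scopes(func, required_scopes, **kwargs):
    """Call func, filling any parameter annotated as ScopesInfo automatically."""
    for name, param in inspect.signature(func).parameters.items():
        if param.annotation is ScopesInfo:
            kwargs[name] = ScopesInfo(list(required_scopes))
    return func(**kwargs)


def get_current_user(security_scopes: ScopesInfo, token: str) -> str:
    # The function never builds ScopesInfo itself; it only declares the type.
    return f"{token} needs [{security_scopes.scope_str}]"


print(call_with_scopes(get_current_user, ["items", "me"], token="abc"))
```

The caller passes only the token. The scopes object is supplied by the surrounding machinery, which is the role FastAPI plays for a parameter of type SecurityScopes.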
from datetime import datetime, timedelta, timezone
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (JWTError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(
    current_user: Annotated[User, Depends(get_current_user)],
):
    return {"status": "ok"}
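The scope handling inside get_current_user in the listing above can be isolated into two small pure functions. This is only a sketch for experimenting without a running application; build_authenticate_value and first_missing_scope are hypothetical helper names that mirror the logic of the listing and are not FastAPI APIs.

```python
from typing import Optional


def build_authenticate_value(required_scopes: list[str]) -> str:
    """Build the WWW-Authenticate header value the same way the listing does."""
    if required_scopes:
        scope_str = " ".join(required_scopes)
        return f'Bearer scope="{scope_str}"'
    return "Bearer"


def first_missing_scope(
    required_scopes: list[str], token_scopes: list[str]
) -> Optional[str]:
    """Return the first required scope the token lacks, or None if all are present."""
    for scope in required_scopes:
        if scope not in token_scopes:
            return scope
    return None


required = ["items", "me"]
print(build_authenticate_value(required))
print(first_missing_scope(required, ["me"]))
print(first_missing_scope(required, ["me", "items"]))
```

In the listing, a missing scope leads to an HTTPException with status 401 whose WWW-Authenticate header carries the value built here.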
Tipp
Bevorzugen Sie die Annotated-Version, falls möglich.
fromdatetimeimportdatetime,timedelta,timezonefromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjoseimportJWTError,jwtfrompasslib.contextimportCryptContextfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read 
items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:str=Depends(oauth2_scheme)):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username:str=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_scopes=payload.get("scopes",[])token_data=TokenData(scopes=token_scopes,username=username)except(JWTError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:User=Security(get_current_user,scopes=["me"]),):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive 
user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scopes":form_data.scopes},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Security(get_current_active_user,scopes=["items"]),):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:User=Depends(get_current_user)):return{"status":"ok"}
Tipp
Bevorzugen Sie die Annotated-Version, falls möglich.
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportUnionfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjoseimportJWTError,jwtfrompasslib.contextimportCryptContextfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:Union[str,None]=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read 
items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:Union[timedelta,None]=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:str=Depends(oauth2_scheme)):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username:str=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_scopes=payload.get("scopes",[])token_data=TokenData(scopes=token_scopes,username=username)except(JWTError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:User=Security(get_current_user,scopes=["me"]),):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive 
user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scopes":form_data.scopes},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Security(get_current_active_user,scopes=["items"]),):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:User=Depends(get_current_user)):return{"status":"ok"}
The security_scopes parameter is of type SecurityScopes.
It has a scopes attribute: a list containing all the scopes required by the dependency itself and by every dependency that uses it as a sub-dependency, that is, all of its "dependants". This may sound confusing; it is explained again later.
The security_scopes object (of class SecurityScopes) also provides a scope_str attribute: a single string containing those scopes separated by spaces (we are going to use it).
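To make the relationship between the two attributes concrete, here is a minimal stand-in that uses only the standard library. ScopesSketch is a hypothetical name chosen for illustration. It is not FastAPI's SecurityScopes class; it only mirrors the two attributes described above, assuming that scope_str is the scopes list joined with single spaces.

```python
from dataclasses import dataclass, field


@dataclass
class ScopesSketch:
    """Hypothetical stand-in that mirrors the two attributes described above."""

    # All scopes required along the dependency chain.
    scopes: list = field(default_factory=list)

    @property
    def scope_str(self) -> str:
        # The same scopes as one space-separated string.
        return " ".join(self.scopes)


required = ScopesSketch(scopes=["me", "items"])
print(required.scopes)
print(required.scope_str)
```

With no required scopes, the list is empty and the string is empty as well, which is why a simple truthiness check on the list is enough to tell the two cases apart.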
We create an HTTPException that we can reuse (raise) later at several points.
In this exception, we include the required scopes (if any) as a space-separated string, using scope_str. We put that string in the WWW-Authenticate header (this is part of the specification).
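The header value can be sketched in isolation. The helper name build_authenticate_value is an assumption made for this illustration; it follows the same two branches the example application uses inside get_current_user, but takes a plain list instead of a SecurityScopes object so that it runs without FastAPI.

```python
def build_authenticate_value(required_scopes: list) -> str:
    """Return the WWW-Authenticate value for the given required scopes."""
    if required_scopes:
        # Scopes are sent as one space-separated, quoted string.
        scope_str = " ".join(required_scopes)
        return f'Bearer scope="{scope_str}"'
    # Without required scopes, only the scheme is announced.
    return "Bearer"


headers = {"WWW-Authenticate": build_authenticate_value(["me", "items"])}
print(headers)
```

Building the value once and reusing it keeps the header identical for every error raised from the same dependency.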

    from datetime import datetime, timedelta, timezone
    from typing import Annotated

    from fastapi import Depends, FastAPI, HTTPException, Security, status
    from fastapi.security import (
        OAuth2PasswordBearer,
        OAuth2PasswordRequestForm,
        SecurityScopes,
    )
    from jose import JWTError, jwt
    from passlib.context import CryptContext
    from pydantic import BaseModel, ValidationError

    # to get a string like this run:
    # openssl rand -hex 32
    SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 30


    fake_users_db = {
        "johndoe": {
            "username": "johndoe",
            "full_name": "John Doe",
            "email": "johndoe@example.com",
            "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
            "disabled": False,
        },
        "alice": {
            "username": "alice",
            "full_name": "Alice Chains",
            "email": "alicechains@example.com",
            "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
            "disabled": True,
        },
    }


    class Token(BaseModel):
        access_token: str
        token_type: str


    class TokenData(BaseModel):
        username: str | None = None
        scopes: list[str] = []


    class User(BaseModel):
        username: str
        email: str | None = None
        full_name: str | None = None
        disabled: bool | None = None


    class UserInDB(User):
        hashed_password: str


    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

    oauth2_scheme = OAuth2PasswordBearer(
        tokenUrl="token",
        scopes={"me": "Read information about the current user.", "items": "Read items."},
    )

    app = FastAPI()


    def verify_password(plain_password, hashed_password):
        return pwd_context.verify(plain_password, hashed_password)


    def get_password_hash(password):
        return pwd_context.hash(password)


    def get_user(db, username: str):
        if username in db:
            user_dict = db[username]
            return UserInDB(**user_dict)


    def authenticate_user(fake_db, username: str, password: str):
        user = get_user(fake_db, username)
        if not user:
            return False
        if not verify_password(password, user.hashed_password):
            return False
        return user


    def create_access_token(data: dict, expires_delta: timedelta | None = None):
        to_encode = data.copy()
        if expires_delta:
            expire = datetime.now(timezone.utc) + expires_delta
        else:
            expire = datetime.now(timezone.utc) + timedelta(minutes=15)
        to_encode.update({"exp": expire})
        encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
        return encoded_jwt


    async def get_current_user(
        security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
    ):
        if security_scopes.scopes:
            authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
        else:
            authenticate_value = "Bearer"
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": authenticate_value},
        )
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            username: str = payload.get("sub")
            if username is None:
                raise credentials_exception
            token_scopes = payload.get("scopes", [])
            token_data = TokenData(scopes=token_scopes, username=username)
        except (JWTError, ValidationError):
            raise credentials_exception
        user = get_user(fake_users_db, username=token_data.username)
        if user is None:
            raise credentials_exception
        for scope in security_scopes.scopes:
            if scope not in token_data.scopes:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Not enough permissions",
                    headers={"WWW-Authenticate": authenticate_value},
                )
        return user


    async def get_current_active_user(
        current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
    ):
        if current_user.disabled:
            raise HTTPException(status_code=400, detail="Inactive user")
        return current_user


    @app.post("/token")
    async def login_for_access_token(
        form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    ) -> Token:
        user = authenticate_user(fake_users_db, form_data.username, form_data.password)
        if not user:
            raise HTTPException(status_code=400, detail="Incorrect username or password")
        access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        access_token = create_access_token(
            data={"sub": user.username, "scopes": form_data.scopes},
            expires_delta=access_token_expires,
        )
        return Token(access_token=access_token, token_type="bearer")


    @app.get("/users/me/", response_model=User)
    async def read_users_me(
        current_user: Annotated[User, Depends(get_current_active_user)],
    ):
        return current_user


    @app.get("/users/me/items/")
    async def read_own_items(
        current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
    ):
        return [{"item_id": "Foo", "owner": current_user.username}]


    @app.get("/status/")
    async def read_system_status(current_user: Annotated[User, Depends(get_current_user)]):
        return {"status": "ok"}

We verify that we get a username, and extract the scopes.
We then validate that data with the Pydantic model, catching the ValidationError exception. If reading the JWT token or validating the data with Pydantic fails, we raise the HTTPException we created before.
For that, we update the Pydantic model TokenData with a new attribute, scopes.
By validating the data with Pydantic, we can make sure that we have, for example, exactly a list of str with the scopes and a str with the username, rather than, for example, a dict or something else that could break the application at some later point and therefore be a security risk.
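One concrete way an unexpected shape can turn into a security problem is Python's membership operator, which behaves differently for lists and strings. The snippet below is a standalone illustration; the scope names and the malformed value are made up for the example and do not come from the application above.

```python
required_scope = "item"

# A well-formed token payload carries the scopes as a list of strings.
scopes_as_list = ["items", "me"]
# A malformed payload might carry a single string instead.
scopes_as_str = "items me"

# List membership compares whole scope names.
list_check = required_scope in scopes_as_list
# String membership is a substring search, so "item" matches inside "items".
str_check = required_scope in scopes_as_str

print(list_check)  # False
print(str_check)   # True
```

If the scopes were ever accepted as a plain string, a check written for lists would silently grant a scope that was never issued. Enforcing the list-of-str shape up front rules this out.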
We also verify that a user with that username exists; if not, we raise the same exception we created before.
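The checks described in this section can be sketched end to end with only the standard library. This is an illustrative outline under stated assumptions, not the code of the example application: the JWT is assumed to be already decoded into a dict, CredentialsError is a hypothetical stand-in for the reusable HTTPException, FAKE_USERS and resolve_user are names invented for the sketch, and the manual isinstance checks stand in for the validation that Pydantic performs on TokenData.

```python
class CredentialsError(Exception):
    """Hypothetical stand-in for the reusable HTTPException."""


# Hypothetical in-memory user store, used only by this sketch.
FAKE_USERS = {"johndoe": {"username": "johndoe"}}


def resolve_user(payload: dict, users: dict) -> dict:
    """Return the user named in an already-decoded token payload."""
    # One exception object, created once and raised from every failing check.
    credentials_error = CredentialsError("Could not validate credentials")

    # 1. The payload must name a user.
    username = payload.get("sub")
    if not isinstance(username, str):
        raise credentials_error

    # 2. The scopes must be exactly a list of strings.
    scopes = payload.get("scopes", [])
    if not isinstance(scopes, list) or not all(isinstance(s, str) for s in scopes):
        raise credentials_error

    # 3. The named user must exist.
    user = users.get(username)
    if user is None:
        raise credentials_error
    return user


print(resolve_user({"sub": "johndoe", "scopes": ["me"]}, FAKE_USERS))
```

Raising the same exception for every failure keeps the response uniform, so a client cannot tell from the error which of the checks rejected the token.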
from datetime import datetime, timedelta, timezone
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (JWTError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: Annotated[User, Depends(get_current_user)]):
    return {"status": "ok"}
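To see the validation step in isolation, here is a minimal sketch that feeds a well-formed and a malformed token payload into a TokenData model. It assumes Pydantic is installed. The helper name parse_token_payload and the sample payloads are hypothetical and used only for illustration; in the application itself, the failure case leads to the HTTPException described above rather than to None.

```python
from typing import List, Optional

from pydantic import BaseModel, ValidationError


class TokenData(BaseModel):
    username: Optional[str] = None
    scopes: List[str] = []


def parse_token_payload(payload: dict) -> Optional[TokenData]:
    """Hypothetical helper: return validated token data, or None if malformed."""
    try:
        return TokenData(
            username=payload.get("sub"),
            scopes=payload.get("scopes", []),
        )
    except ValidationError:
        # The application would raise its credentials exception here.
        return None


# A list of strings passes validation.
good = parse_token_payload({"sub": "johndoe", "scopes": ["me", "items"]})

# A dict in place of the list is rejected instead of slipping through.
bad = parse_token_payload({"sub": "johndoe", "scopes": {"me": True}})
```

Because the shape of the data is checked once at this boundary, later code can rely on scopes being a list of str without re-checking it.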
We now verify that the received token contains all the scopes required by this dependency and by everything that depends on it (including path operations). Otherwise, we raise an HTTPException.
For this we use security_scopes.scopes, which contains a list of all these scopes as str.
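The check itself is a plain membership test, which the following framework-free sketch illustrates. The function name missing_scopes and the sample lists are hypothetical: required stands in for security_scopes.scopes and granted for the scopes read from the token.

```python
from typing import List


def missing_scopes(required: List[str], granted: List[str]) -> List[str]:
    """Hypothetical helper: return the required scopes the token lacks."""
    granted_set = set(granted)
    return [scope for scope in required if scope not in granted_set]


# Token that was issued with only the "me" scope.
token_scopes = ["me"]

# An endpoint that needs both scopes is refused ...
lacking = missing_scopes(["me", "items"], token_scopes)

# ... while an endpoint that needs only "me" is allowed.
nothing_lacking = missing_scopes(["me"], token_scopes)
```

In the application, a non-empty result corresponds to the case where the HTTPException is raised, and an empty result means the request may proceed.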
Let's review this dependency tree and the scopes again.
Because the get_current_active_user dependency depends on get_current_user, the scope "me" declared at get_current_active_user is included in the list of required scopes in security_scopes.scopes that is passed to get_current_user.
The path operation itself also declares a scope, "items", so it is also included in the security_scopes.scopes list passed to get_current_user.
This is what the hierarchy of dependencies and scopes looks like:
The path operation read_own_items has:
  Required scopes ["items"] with the dependency:
  get_current_active_user:
    The dependency function get_current_active_user has:
      Required scopes ["me"] with the dependency:
      get_current_user:
        The dependency function get_current_user has:
          No required scopes of its own.
          A dependency that uses oauth2_scheme.
          A security_scopes parameter of type SecurityScopes:
            This security_scopes parameter has a scopes attribute with a list containing all the scopes declared above, that is:
              security_scopes.scopes contains ["me", "items"] for the path operation read_own_items.
              security_scopes.scopes contains ["me"] for the path operation read_users_me, because that scope is declared in the dependency get_current_active_user.
              security_scopes.scopes contains [] (nothing) for the path operation read_system_status, because it declares no Security with scopes, and its dependency get_current_user declares no scopes either.
Tip
The important and "magic" part here is that get_current_user gets a different list of scopes to check for each path operation.
It all depends on the scopes declared in each path operation and in each dependency in the dependency tree for that specific path operation.
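The way scopes accumulate along the tree can be illustrated with a small standalone model. This is only a sketch of the behavior described above, not FastAPI's internal implementation; the `USES` table and the `scopes_seen_by` helper are hypothetical names introduced for illustration, and sets are used so the model makes no claim about ordering.

```python
# Simplified model: each entry maps a "user" of a dependency to
# (the dependency it uses, the scopes declared on that use).
# The entries mirror the example application above.
USES = {
    "read_own_items": ("get_current_active_user", ["items"]),
    "read_users_me": ("get_current_active_user", []),
    "read_system_status": ("get_current_user", []),
    "get_current_active_user": ("get_current_user", ["me"]),
}


def scopes_seen_by(path_operation, target="get_current_user"):
    """Collect every scope declared on the way from a path operation to `target`."""
    collected = set()
    current = path_operation
    while current != target:
        dependency, scopes = USES[current]
        collected.update(scopes)  # scopes declared where `dependency` is used
        current = dependency      # walk one level down the tree
    return collected


for name in ("read_own_items", "read_users_me", "read_system_status"):
    print(name, sorted(scopes_seen_by(name)))
```

In this model each path operation reaches get_current_user through a different chain, which is why the same function ends up checking a different set of scopes per path operation.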
You can use SecurityScopes at any point and in multiple places; it does not have to be in the "root" dependency.
It always contains the security scopes declared in the current Security dependencies and in all dependants for that specific path operation and that specific dependency tree.
Because SecurityScopes contains all the scopes declared by the dependants, you can use it to verify in one central dependency function that a token has the required scopes, and then declare different scope requirements in different path operations.
They are checked independently for each path operation.
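A minimal sketch of such a central check, written in plain Python so it runs without a server. The `REQUIRED_SCOPES` table and the helpers `missing_scopes` and `can_access` are hypothetical names; the comparison itself mirrors the loop over security_scopes.scopes in get_current_user.

```python
from typing import Dict, Iterable, List

# Scopes each path operation of the example application ends up requiring.
REQUIRED_SCOPES: Dict[str, List[str]] = {
    "/users/me/": ["me"],
    "/users/me/items/": ["me", "items"],
    "/status/": [],
}


def missing_scopes(required: Iterable[str], granted: Iterable[str]) -> List[str]:
    """Return the required scopes that the token does not carry."""
    granted_set = set(granted)
    return [scope for scope in required if scope not in granted_set]


def can_access(path: str, token_scopes: Iterable[str]) -> bool:
    """One central check, evaluated separately for each path operation."""
    return not missing_scopes(REQUIRED_SCOPES[path], token_scopes)


token_scopes = ["me"]  # a token that was granted only the "me" scope
for path in REQUIRED_SCOPES:
    print(path, can_access(path, token_scopes))
```

The check is written once, while the requirements live next to each path operation, so adding a new scope requirement does not touch the checking logic.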
If you open the API docs, you can authenticate and specify which scopes you want to authorize.
If you don't select any scope, you are "authenticated", but when you try to access /users/me/ or /users/me/items/ you get an error saying that you don't have enough permissions. You can still access /status/.
And if you select the scope me but not the scope items, you can access /users/me/ but not /users/me/items/.
That is what would happen to a third-party application that tried to access one of these path operations with a token provided by a user, depending on how many permissions the user granted that application.
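Selecting scopes in the docs UI corresponds to a token request in which the OAuth2 password flow sends the scopes as a single space-separated `scope` form field. The sketch below only builds and re-parses such a form body with the standard library; it does not contact a server, the helper `build_token_request_body` is a hypothetical name, and the password value is a placeholder.

```python
from urllib.parse import parse_qs, urlencode


def build_token_request_body(username, password, scopes):
    """Build the form-encoded body of an OAuth2 password-flow token request."""
    return urlencode(
        {
            "grant_type": "password",
            "username": username,
            "password": password,
            # OAuth2 transmits scopes as one space-separated string.
            "scope": " ".join(scopes),
        }
    )


body = build_token_request_body("johndoe", "<password>", ["me", "items"])
print(body)

# The receiving side splits the string again to get the list of scopes.
requested = parse_qs(body).get("scope", [""])[0].split()
print(requested)
```

In the example application, this list is what form_data.scopes holds and what gets stored in the token under the "scopes" key.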
In this example we use the OAuth2 "password" flow.
This is appropriate when we log in to our own application, probably with our own frontend, because we control that frontend and can trust it to receive the username and password.
But if you are building an OAuth2 application that others would connect to (i.e., if you are building an authentication provider equivalent to Facebook, Google, GitHub, etc.), you should use one of the other flows.
The most common is the "implicit" flow.
The most secure is the "code" flow, but it is more complex to implement because it requires more steps. Because it is more complex, many providers end up suggesting the "implicit" flow.
Note
It is common for each authentication provider to name its flows differently, to make them part of its brand.
But in the end, they all implement the same OAuth2 standard.
FastAPI includes utilities for all these OAuth2 authentication flows in fastapi.security.oauth2.
In the same way that you can define a list of Depends in the decorator's dependencies parameter (as explained in Dependencies in path operation decorators), you can also use Security with scopes there.