Python源码示例:fastapi.Body()
示例1
def update_user_me(
    *,
    password: str = Body(None),
    full_name: str = Body(None),
    email: EmailStr = Body(None),
    current_user: UserInDB = Depends(get_current_active_user),
):
    """
    Update the currently authenticated user.

    The update payload is seeded from the current user's own data; only the
    body fields that were actually supplied (not ``None``) override it.
    """
    user_in = UserUpdate(**current_user.dict())
    supplied = (
        ("password", password),
        ("full_name", full_name),
        ("email", email),
    )
    for field_name, new_value in supplied:
        # ``None`` means the field was not sent: keep the stored value.
        if new_value is not None:
            setattr(user_in, field_name, new_value)
    bucket = get_default_bucket()
    return crud.user.update(bucket, username=current_user.username, user_in=user_in)
示例2
def reset_password(token: str = Body(...), new_password: str = Body(...)):
    """
    Reset a user's password using a password-reset token.

    The token is resolved to a username with ``verify_password_reset_token``;
    the matching user must exist and be active before the update is issued.

    Raises:
        HTTPException: 400 when the token does not resolve to a username or
            the user is inactive; 404 when no user is found for the username.
    """
    username = verify_password_reset_token(token)
    if not username:
        raise HTTPException(status_code=400, detail="Invalid token")
    bucket = get_default_bucket()
    user = crud.user.get(bucket, username=username)
    if not user:
        raise HTTPException(
            status_code=404,
            detail="The user with this username does not exist in the system.",
        )
    if not crud.user.is_active(user):
        raise HTTPException(status_code=400, detail="Inactive user")
    # NOTE(review): the payload is built with ``name=``, while the lookups use
    # ``username=`` -- confirm this matches the UserUpdate schema.
    user_in = UserUpdate(name=username, password=new_password)
    # The updated record is not part of the response, so it is not bound.
    crud.user.update(bucket, username=username, user_in=user_in)
    return {"msg": "Password updated successfully"}
示例3
def read_items(
    item_id: UUID,
    start_datetime: Optional[datetime] = Body(None),
    end_datetime: Optional[datetime] = Body(None),
    repeat_at: Optional[time] = Body(None),
    process_after: Optional[timedelta] = Body(None),
):
    # Echoes the received values back together with two derived ones:
    #   start_process = start_datetime + process_after
    #   duration      = end_datetime - start_process
    # Every body field is optional and defaults to None, so a derived value is
    # only computed when all of its operands are present; otherwise it is
    # reported as None instead of failing on arithmetic with None.
    start_process = None
    if start_datetime is not None and process_after is not None:
        start_process = start_datetime + process_after

    duration = None
    if end_datetime is not None and start_process is not None:
        duration = end_datetime - start_process

    return {
        "item_id": item_id,
        "start_datetime": start_datetime,
        "end_datetime": end_datetime,
        "repeat_at": repeat_at,
        "process_after": process_after,
        "start_process": start_process,
        "duration": duration,
    }
示例4
async def get_autocomplete(
    query: QueryParams = Depends(QueryParams), extra: ExtraParams = Body(ExtraParams())
):
    # Runs the autocomplete request and the optional intention detection
    # concurrently, then merges the intentions into the autocomplete response.
    # Must be a coroutine function: the body awaits asyncio.gather().

    async def get_intentions():
        # Intention detection is skipped (None) unless it was requested and
        # the query language is one of the allowed ones.
        if not query.nlu or query.lang not in nlu_allowed_languages:
            return None
        focus = None
        # A focus point is only built when both coordinates are truthy.
        if query.lon and query.lat:
            focus = Point(query.lon, query.lat)
        return await nlu_client.get_intentions(text=query.q, lang=query.lang, focus=focus)

    autocomplete_response, intentions = await asyncio.gather(
        bragi_client.autocomplete(query, extra), get_intentions()
    )
    if intentions is not None:
        autocomplete_response["intentions"] = intentions
    return autocomplete_response
示例5
async def create_new_article(
    article_create: ArticleInCreate = Body(..., embed=True, alias="article"),
    user: User = Depends(get_current_user_authorizer()),
    articles_repo: ArticlesRepository = Depends(get_repository(ArticlesRepository)),
) -> ArticleInResponse:
    # Creates an article authored by `user`. The slug is derived from the
    # title, and creation is rejected with a 400 when check_article_exists()
    # reports that slug as taken.
    # Must be a coroutine function: the repository calls below are awaited.
    slug = get_slug_for_article(article_create.title)
    if await check_article_exists(articles_repo, slug):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=strings.ARTICLE_ALREADY_EXISTS,
        )
    article = await articles_repo.create_article(
        slug=slug,
        title=article_create.title,
        description=article_create.description,
        body=article_create.body,
        author=user,
        tags=article_create.tags,
    )
    return ArticleInResponse(article=ArticleForResponse.from_orm(article))
示例6
def tokenize(self, text_input: str = Body(None, embed=True), return_ids: bool = Body(False, embed=True)):
    """
    Tokenize the provided input and optionally return the corresponding token ids:
    - **text_input**: String to tokenize
    - **return_ids**: Boolean flag indicating whether the tokens should also be converted to their integer mapping.
    """
    try:
        tokens_txt = self._pipeline.tokenizer.tokenize(text_input)
        if not return_ids:
            return ServeTokenizeResult(tokens=tokens_txt)
        # Ids were requested: map every token to its integer id as well.
        tokens_ids = self._pipeline.tokenizer.convert_tokens_to_ids(tokens_txt)
        return ServeTokenizeResult(tokens=tokens_txt, tokens_ids=tokens_ids)
    except Exception as exc:
        # Any failure is reported to the client as an HTTP 500.
        raise HTTPException(status_code=500, detail={"model": "", "error": str(exc)})
示例7
def detokenize(
    self,
    tokens_ids: List[int] = Body(None, embed=True),
    skip_special_tokens: bool = Body(False, embed=True),
    cleanup_tokenization_spaces: bool = Body(True, embed=True),
):
    """
    Detokenize the provided token ids into readable text:
    - **tokens_ids**: List of token ids
    - **skip_special_tokens**: Flag indicating not to try to decode special tokens
    - **cleanup_tokenization_spaces**: Flag indicating to remove all leading/trailing spaces and intermediate ones.
    """
    try:
        # The flags are forwarded positionally, in this order, to decode().
        text = self._pipeline.tokenizer.decode(
            tokens_ids,
            skip_special_tokens,
            cleanup_tokenization_spaces,
        )
        return ServeDeTokenizeResult(model="", text=text)
    except Exception as exc:
        # Any failure is reported to the client as an HTTP 500.
        raise HTTPException(status_code=500, detail={"model": "", "error": str(exc)})
示例8
def forward(self, inputs=Body(None, embed=True)):
    """
    Run the provided inputs through the pipeline.

    **inputs**:
    **attention_mask**:
    **tokens_type_ids**:
    """
    # `inputs` defaults to None, so a missing body is treated like an empty
    # one instead of failing on len(None).
    if inputs is None or len(inputs) == 0:
        return ServeForwardResult(output=[], attention=[])
    try:
        # Forward through the model
        output = self._pipeline(inputs)
        return ServeForwardResult(output=output)
    except Exception as e:
        # Any failure is reported to the client as an HTTP 500.
        raise HTTPException(500, {"error": str(e)})
示例9
def update_user_me(
    *,
    db: Session = Depends(deps.get_db),
    password: str = Body(None),
    full_name: str = Body(None),
    email: EmailStr = Body(None),
    current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
    """
    Update the currently authenticated user.

    The update schema is seeded from the JSON-encoded current user; only the
    body fields that were actually supplied (not ``None``) override it.
    """
    user_in = schemas.UserUpdate(**jsonable_encoder(current_user))
    supplied = (
        ("password", password),
        ("full_name", full_name),
        ("email", email),
    )
    for field_name, new_value in supplied:
        # ``None`` means the field was not sent: keep the stored value.
        if new_value is not None:
            setattr(user_in, field_name, new_value)
    return crud.user.update(db, db_obj=current_user, obj_in=user_in)
示例10
def create_user_open(
    *,
    db: Session = Depends(deps.get_db),
    password: str = Body(...),
    email: EmailStr = Body(...),
    full_name: str = Body(None),
) -> Any:
    """
    Register a new user without requiring authentication.

    Raises:
        HTTPException: 403 when ``settings.USERS_OPEN_REGISTRATION`` is
            falsy; 400 when a user with the given email already exists.
    """
    if not settings.USERS_OPEN_REGISTRATION:
        raise HTTPException(
            status_code=403,
            detail="Open user registration is forbidden on this server",
        )
    existing_user = crud.user.get_by_email(db, email=email)
    if existing_user:
        raise HTTPException(
            status_code=400,
            detail="The user with this username already exists in the system",
        )
    new_user_data = schemas.UserCreate(
        password=password,
        email=email,
        full_name=full_name,
    )
    return crud.user.create(db, obj_in=new_user_data)
示例11
def reset_password(
    token: str = Body(...),
    new_password: str = Body(...),
    db: Session = Depends(deps.get_db),
) -> Any:
    """
    Reset a user's password from a password-reset token.

    The token is resolved to an email with ``verify_password_reset_token``;
    the matching user must exist and be active. The new password is hashed
    with ``get_password_hash`` before being stored and committed.

    Raises:
        HTTPException: 400 for an invalid token or an inactive user; 404
            when no user matches the email.
    """
    email = verify_password_reset_token(token)
    if not email:
        raise HTTPException(status_code=400, detail="Invalid token")

    account = crud.user.get_by_email(db, email=email)
    if not account:
        raise HTTPException(
            status_code=404,
            detail="The user with this username does not exist in the system.",
        )
    if not crud.user.is_active(account):
        raise HTTPException(status_code=400, detail="Inactive user")

    # Only the hash is written to the user record.
    account.hashed_password = get_password_hash(new_password)
    db.add(account)
    db.commit()
    return {"msg": "Password updated successfully"}
示例12
def config_edit(data: RequestConfigEdit = Body(...)) -> StatusSchema:
    """
    Set the given plugin configuration key to the specified value.
    This endpoint is not stable and may be subject to change in the future.
    """
    plugin, key, value = data.plugin, data.key, data.value
    db.set_plugin_configuration_key(plugin, key, value)
    result = StatusSchema(status="ok")
    return result
示例13
def create_user_open(
    *,
    username: str = Body(...),
    password: str = Body(...),
    email: EmailStr = Body(None),
    full_name: str = Body(None),
):
    """
    Create new user without the need to be logged in.

    Raises:
        HTTPException: 403 when ``config.USERS_OPEN_REGISTRATION`` is falsy;
            400 when a user with the given username already exists.
    """
    if not config.USERS_OPEN_REGISTRATION:
        raise HTTPException(
            status_code=403,
            detail="Open user registration is forbidden on this server",
        )
    bucket = get_default_bucket()
    user = crud.user.get(bucket, username=username)
    if user:
        raise HTTPException(
            status_code=400,
            detail="The user with this username already exists in the system",
        )
    user_in = UserCreate(
        username=username, password=password, email=email, full_name=full_name
    )
    user = crud.user.upsert(bucket, user_in=user_in, persist_to=1)
    # The notification is only sent when emails are enabled and an address
    # was provided.
    # NOTE(review): the submitted password is passed to the email helper --
    # confirm that sending it by mail is intended.
    if config.EMAILS_ENABLED and user_in.email:
        send_new_account_email(
            email_to=user_in.email, username=user_in.username, password=user_in.password
        )
    return user
示例14
def compute(a: int = Body(...), b: str = Body(...)):
    # Echo both required body fields back under their own names.
    return dict(a=a, b=b)
示例15
def compute(a: int = Body(...), b: str = Body(...)):
    # Echo both required body fields back under their own names.
    echoed = {"a": a, "b": b}
    return echoed
示例16
def create_product(data: Product = Body(..., media_type=media_type, embed=True)):
    # Intentionally empty: only the signature (embedded body with a custom
    # media type) matters here; the body is excluded from coverage.
    return None  # pragma: no cover
示例17
def create_shop(
    data: Shop = Body(..., media_type=media_type),
    included: typing.List[Product] = Body([], media_type=media_type),
):
    # Intentionally empty: only the signature (two body parameters sharing a
    # custom media type) matters here; the body is excluded from coverage.
    return None  # pragma: no cover
示例18
def update_item(
    item_id: int,
    item: Item = Body(
        ...,
        example={
            "name": "Foo",
            "description": "A very nice Item",
            "price": 35.4,
            "tax": 3.2,
        },
    ),
):
    # Echo the item id together with the item parsed from the body; the
    # `example` above is passed to Body() alongside the required marker.
    return dict(item_id=item_id, item=item)
示例19
def update_item(item_id: int, item: Item = Body(..., embed=True)):
    # Echo the item id together with the embedded body item.
    return dict(item_id=item_id, item=item)
示例20
def update_item(item_id: int, item: Item = Body(..., embed=True)):
    # Echo the item id together with the embedded body item.
    response = {}
    response["item_id"] = item_id
    response["item"] = item
    return response
示例21
def update_item(
    item_id: int, item: Item, user: User, importance: int = Body(...)
):
    # Echo every received value back; `importance` is declared with Body()
    # explicitly, unlike the other parameters.
    return dict(item_id=item_id, item=item, user=user, importance=importance)
示例22
def sum_numbers(numbers: List[int] = Body(...)):
    # Add up the integers received as the request body.
    total = sum(numbers)
    return total
示例23
async def update_current_user(
    user_update: UserInUpdate = Body(..., embed=True, alias="user"),
    current_user: User = Depends(get_current_user_authorizer()),
    users_repo: UsersRepository = Depends(get_repository(UsersRepository)),
) -> UserInResponse:
    # Updates the authenticated user and returns the updated profile with a
    # freshly created access token.
    # Must be a coroutine function: the repository calls below are awaited.

    # Uniqueness is only checked for values that actually change.
    if user_update.username and user_update.username != current_user.username:
        if await check_username_is_taken(users_repo, user_update.username):
            raise HTTPException(
                status_code=HTTP_400_BAD_REQUEST, detail=strings.USERNAME_TAKEN,
            )
    if user_update.email and user_update.email != current_user.email:
        if await check_email_is_taken(users_repo, user_update.email):
            raise HTTPException(
                status_code=HTTP_400_BAD_REQUEST, detail=strings.EMAIL_TAKEN,
            )
    user = await users_repo.update_user(user=current_user, **user_update.dict())
    token = jwt.create_access_token_for_user(user, str(config.SECRET_KEY))
    return UserInResponse(
        user=UserWithToken(
            username=user.username,
            email=user.email,
            bio=user.bio,
            image=user.image,
            token=token,
        ),
    )
示例24
async def create_comment_for_article(
    comment_create: CommentInCreate = Body(..., embed=True, alias="comment"),
    article: Article = Depends(get_article_by_slug_from_path),
    user: User = Depends(get_current_user_authorizer()),
    comments_repo: CommentsRepository = Depends(get_repository(CommentsRepository)),
) -> CommentInResponse:
    # Stores a comment by `user` on `article` and returns it wrapped in the
    # response model.
    # Must be a coroutine function: the repository call below is awaited.
    comment = await comments_repo.create_comment_for_article(
        body=comment_create.body, article=article, user=user,
    )
    return CommentInResponse(comment=comment)
示例25
async def update_article_by_slug(
    article_update: ArticleInUpdate = Body(..., embed=True, alias="article"),
    current_article: Article = Depends(get_article_by_slug_from_path),
    articles_repo: ArticlesRepository = Depends(get_repository(ArticlesRepository)),
) -> ArticleInResponse:
    # Applies the submitted changes to the article resolved by the dependency.
    # Must be a coroutine function: the repository call below is awaited.

    # A new slug is only derived when a (truthy) title was submitted;
    # otherwise None is passed to the repository.
    slug = get_slug_for_article(article_update.title) if article_update.title else None
    article = await articles_repo.update_article(
        article=current_article, slug=slug, **article_update.dict(),
    )
    return ArticleInResponse(article=ArticleForResponse.from_orm(article))
示例26
async def login(
    user_login: UserInLogin = Body(..., embed=True, alias="user"),
    users_repo: UsersRepository = Depends(get_repository(UsersRepository)),
) -> UserInResponse:
    # Authenticates by email and password and returns the user profile with
    # a freshly created access token.
    # Must be a coroutine function: the repository call below is awaited.

    # The same 400 error is raised for an unknown email and for a wrong
    # password.
    wrong_login_error = HTTPException(
        status_code=HTTP_400_BAD_REQUEST, detail=strings.INCORRECT_LOGIN_INPUT,
    )
    try:
        user = await users_repo.get_user_by_email(email=user_login.email)
    except EntityDoesNotExist as existence_error:
        raise wrong_login_error from existence_error
    if not user.check_password(user_login.password):
        raise wrong_login_error
    token = jwt.create_access_token_for_user(user, str(config.SECRET_KEY))
    return UserInResponse(
        user=UserWithToken(
            username=user.username,
            email=user.email,
            bio=user.bio,
            image=user.image,
            token=token,
        ),
    )
示例27
def pydantic_exception(item: str = Body(...)):
    # Echo the required body string back under the "item" key.
    return dict(item=item)
示例28
def pydantic_exception_duplicate(
    kind: Optional[Kind] = Body(default=Kind.a), temp: int = Body(default=1)
):
    # Only `kind` is echoed back; `temp` is accepted but not used in the body.
    return dict(kind=kind)
示例29
示例30
def query(
    data: QueryRequestSchema = Body(...),
) -> Union[QueryResponseSchema, List[ParseResponseSchema]]:
    """
    Starts a new search. Response will contain a new job ID that can be used
    to check the job status and download matched files.

    When the request method is ``RequestQueryMethod.parse`` no job is
    created; the parsed form of every rule is returned instead.

    Raises:
        HTTPException: 400 when the Yara source cannot be parsed or contains
            no rule; 409 when an active agent lacks a required plugin.
    """
    try:
        rules = parse_yara(data.raw_yara)
    except Exception as e:
        # Chain the cause so the parser traceback is kept for debugging.
        raise HTTPException(
            status_code=400, detail=f"Yara rule parsing failed: {e}"
        ) from e
    if not rules:
        raise HTTPException(status_code=400, detail="No rule was specified.")
    if data.method == RequestQueryMethod.parse:
        return [
            ParseResponseSchema(
                rule_name=rule.name,
                rule_author=rule.author,
                is_global=rule.is_global,
                is_private=rule.is_private,
                parsed=rule.parse().query,
            )
            for rule in rules
        ]
    active_agents = db.get_active_agents()
    # Every active agent must provide all required plugins.
    for agent, agent_spec in active_agents.items():
        missing = set(data.required_plugins).difference(
            agent_spec.active_plugins
        )
        if missing:
            # Sorted so the error message is deterministic.
            raise HTTPException(
                status_code=409,
                detail=f"Agent {agent} doesn't support "
                f"required plugins: {', '.join(sorted(missing))}",
            )
    if not data.taints:
        data.taints = []
    # The job takes its name and author from the last rule in the source.
    job = db.create_search_task(
        rules[-1].name,
        rules[-1].author,
        data.raw_yara,
        data.priority,
        data.files_limit or 0,
        data.reference or "",
        data.taints,
        list(active_agents.keys()),
    )
    return QueryResponseSchema(query_hash=job.hash)