Python源码示例:fastapi.Request()
示例1
def get_directions_with_coordinates(
    # URL values
    f_lon: float,
    f_lat: float,
    t_lon: float,
    t_lat: float,
    # Query parameters
    type: str,
    language: str = "en",
    # Request
    request: Request = Depends(directions_request),
):
    """Return directions between two coordinate pairs.

    Each endpoint is wrapped in a ``Latlon`` (latitude first, longitude
    second) and the lookup is delegated to
    ``directions_client.get_directions``, forwarding the query parameters
    of the incoming request.

    Raises:
        HTTPException: status 400 when ``type`` is empty.
    """
    origin = Latlon(f_lat, f_lon)
    destination = Latlon(t_lat, t_lon)
    if type:
        return directions_client.get_directions(
            origin,
            destination,
            type,
            language,
            params=request.query_params,
        )
    raise HTTPException(status_code=400, detail='"type" query param is required')
示例2
def get_route_handler(self):
    """Return the parent route handler wrapped with metric bookkeeping.

    The wrapper increments the in-progress gauge before delegating, records
    the duration and the response status code on success, and always
    decrements the in-progress gauge afterwards.
    """
    route_name = self.name
    wrapped = super().get_route_handler()

    async def custom_handler(request: Request) -> Response:
        labels = {"method": request["method"], "handler": route_name}
        REQUESTS_INPROGRESS.labels(**labels).inc()
        try:
            started = time.monotonic()
            response = await wrapped(request)
            finished = time.monotonic()
        except Exception as exc:
            # Re-raised with exception chaining explicitly suppressed.
            raise exc from None
        else:
            REQUEST_DURATION.labels(**labels).observe(finished - started)
            REQUEST_COUNT.labels(code=response.status_code, **labels).inc()
        finally:
            REQUESTS_INPROGRESS.labels(**labels).dec()
        return response

    return custom_handler
示例3
def get_entry_info(request: Request, entry: str):
    """Build the entry-info response for the endpoint named ``entry``.

    Raises:
        StarletteHTTPException: status 404 when ``entry`` is not a key of
            ``ENTRY_INFO_SCHEMAS``.
    """
    from optimade.models import EntryInfoResource

    known_endpoints = ENTRY_INFO_SCHEMAS.keys()
    if entry not in known_endpoints:
        endpoint_list = ", ".join(known_endpoints)
        raise StarletteHTTPException(
            status_code=404,
            detail=f"Entry info not found for {entry}, valid entry info endpoints are: {endpoint_list}",
        )

    schema = ENTRY_INFO_SCHEMAS[entry]()
    properties = retrieve_queryable_properties(
        schema, {"id", "type", "attributes"}
    )
    # Only the "json" format is advertised; it exposes every property key.
    output_fields_by_format = {"json": list(properties.keys())}

    resource = EntryInfoResource(
        formats=list(output_fields_by_format.keys()),
        description=schema.get("description", "Entry Resources"),
        properties=properties,
        output_fields_by_format=output_fields_by_format,
    )
    meta = meta_values(str(request.url), 1, 1, more_data_available=False)
    return EntryInfoResponse(meta=meta, data=resource)
示例4
async def test_existing_user(
    self, test_app_client: httpx.AsyncClient, after_forgot_password, user
):
    """POST /forgot-password for a known e-mail triggers the callback.

    Checks that the route answers 202 and that ``after_forgot_password``
    was called with the user, a reset token decodable with ``SECRET`` for
    the "fastapi-users:reset" audience, and the ``Request`` object.

    Declared ``async`` because the body awaits the HTTP client; the async
    test marker/runner configuration is not visible in this snippet.
    """
    json = {"email": "king.arthur@camelot.bt"}
    response = await test_app_client.post("/forgot-password", json=json)
    assert response.status_code == status.HTTP_202_ACCEPTED
    assert after_forgot_password.called is True

    # Positional call arguments: (user, token, request).
    callback_args = after_forgot_password.call_args[0]

    actual_user = callback_args[0]
    assert actual_user.id == user.id

    actual_token = callback_args[1]
    decoded_token = jwt.decode(
        actual_token,
        SECRET,
        audience="fastapi-users:reset",
        algorithms=[JWT_ALGORITHM],
    )
    assert decoded_token["user_id"] == str(user.id)

    request = callback_args[2]
    assert isinstance(request, Request)
示例5
async def test_empty_body(
    self, test_app_client: httpx.AsyncClient, user: UserDB, after_update
):
    """PATCH /me with an empty JSON body leaves the user unchanged.

    Checks that the route answers 200 with the current e-mail and that
    ``after_update`` was called with the user, an empty dict of updated
    fields, and the ``Request`` object.

    Declared ``async`` because the body awaits the HTTP client; the async
    test marker/runner configuration is not visible in this snippet.
    """
    response = await test_app_client.patch(
        "/me", json={}, headers={"Authorization": f"Bearer {user.id}"}
    )
    assert response.status_code == status.HTTP_200_OK

    data = cast(Dict[str, Any], response.json())
    assert data["email"] == user.email

    assert after_update.called is True
    # Positional call arguments: (user, updated_fields, request).
    callback_args = after_update.call_args[0]

    actual_user = callback_args[0]
    assert actual_user.id == user.id
    updated_fields = callback_args[1]
    assert updated_fields == {}
    request = callback_args[2]
    assert isinstance(request, Request)
示例6
async def test_valid_body(
    self, test_app_client: httpx.AsyncClient, user: UserDB, after_update
):
    """PATCH /me with a new e-mail updates the user and reports the change.

    Checks that the route answers 200 with the new e-mail and that
    ``after_update`` was called with the user, the dict of updated fields,
    and the ``Request`` object.

    Declared ``async`` because the body awaits the HTTP client; the async
    test marker/runner configuration is not visible in this snippet.
    """
    json = {"email": "king.arthur@tintagel.bt"}
    response = await test_app_client.patch(
        "/me", json=json, headers={"Authorization": f"Bearer {user.id}"}
    )
    assert response.status_code == status.HTTP_200_OK

    data = cast(Dict[str, Any], response.json())
    assert data["email"] == "king.arthur@tintagel.bt"

    assert after_update.called is True
    # Positional call arguments: (user, updated_fields, request).
    callback_args = after_update.call_args[0]

    actual_user = callback_args[0]
    assert actual_user.id == user.id
    updated_fields = callback_args[1]
    assert updated_fields == {"email": "king.arthur@tintagel.bt"}
    request = callback_args[2]
    assert isinstance(request, Request)
示例7
async def test_valid_body_is_active(
    self, test_app_client: httpx.AsyncClient, user: UserDB, after_update
):
    """PATCH /me with ``is_active`` set to False must not deactivate the user.

    The assertions expect the response to still report ``is_active`` as
    True and ``after_update`` to receive an empty dict of updated fields,
    i.e. the submitted ``is_active`` value is expected to be ignored.

    Declared ``async`` because the body awaits the HTTP client; the async
    test marker/runner configuration is not visible in this snippet.
    """
    json = {"is_active": False}
    response = await test_app_client.patch(
        "/me", json=json, headers={"Authorization": f"Bearer {user.id}"}
    )
    assert response.status_code == status.HTTP_200_OK

    data = cast(Dict[str, Any], response.json())
    assert data["is_active"] is True

    assert after_update.called is True
    # Positional call arguments: (user, updated_fields, request).
    callback_args = after_update.call_args[0]

    actual_user = callback_args[0]
    assert actual_user.id == user.id
    updated_fields = callback_args[1]
    assert updated_fields == {}
    request = callback_args[2]
    assert isinstance(request, Request)
示例8
def get_register_router(
    self, after_register: Optional[Callable[[models.UD, Request], None]] = None,
) -> APIRouter:
    """
    Build the router exposing the register route.

    Delegates to the module-level ``get_register_router`` factory, passing
    this instance's database adapter and user models.

    :param after_register: Optional callback forwarded to the factory,
    meant to run after a successful registration.
    """
    # Inside a method body this name resolves to the module-level factory,
    # not to this method.
    router = get_register_router(
        self.db,
        self._user_model,
        self._user_create_model,
        self._user_db_model,
        after_register,
    )
    return router
示例9
def get_reset_password_router(
    self,
    reset_password_token_secret: str,
    reset_password_token_lifetime_seconds: int = 3600,
    after_forgot_password: Optional[
        Callable[[models.UD, str, Request], None]
    ] = None,
) -> APIRouter:
    """
    Build the router implementing the reset password process.

    Delegates to the module-level ``get_reset_password_router`` factory,
    passing this instance's database adapter.

    :param reset_password_token_secret: Secret used to encode the reset
    password token.
    :param reset_password_token_lifetime_seconds: Lifetime of the reset
    password token.
    :param after_forgot_password: Optional callback forwarded to the
    factory, meant to run after a successful forgot password request.
    """
    # Inside a method body this name resolves to the module-level factory,
    # not to this method.
    router = get_reset_password_router(
        self.db,
        reset_password_token_secret,
        reset_password_token_lifetime_seconds,
        after_forgot_password,
    )
    return router
示例10
def get_users_router(
    self,
    after_update: Optional[
        Callable[[models.UD, Dict[str, Any], Request], None]
    ] = None,
) -> APIRouter:
    """
    Build the router exposing the user management routes.

    Delegates to the module-level ``get_users_router`` factory, passing
    this instance's database adapter, user models and authenticator.

    :param after_update: Optional callback forwarded to the factory,
    meant to run after a successful user update.
    """
    # Inside a method body this name resolves to the module-level factory,
    # not to this method.
    router = get_users_router(
        self.db,
        self._user_model,
        self._user_update_model,
        self._user_db_model,
        self.authenticator,
        after_update,
    )
    return router
示例11
def group(request: Request, response: Response, group_name: Optional[str] = None) -> Response:
    """Render one package group, or the overview of all groups.

    With ``group_name`` set, "group.html" is rendered with every package
    whose ``groups`` contain that name. Without it, "groups.html" is
    rendered with a mapping of group name to number of member packages.
    Packages are visited per source in sorted key order.
    """
    if group_name is None:
        member_counts: Dict[str, int] = {}
        for source in state.sources:
            for _, pkg in sorted(source.packages.items()):
                for name in pkg.groups:
                    member_counts[name] = member_counts.get(name, 0) + 1
        return templates.TemplateResponse('groups.html', {
            "request": request,
            "groups": member_counts,
        }, headers=dict(response.headers))

    members = [
        pkg
        for source in state.sources
        for _, pkg in sorted(source.packages.items())
        if group_name in pkg.groups
    ]
    return templates.TemplateResponse("group.html", {
        "request": request,
        "name": group_name,
        "packages": members,
    }, headers=dict(response.headers))
示例12
def package(request: Request, response: Response, package_name: Optional[str] = None, repo: Optional[str] = None, variant: Optional[str] = None) -> Response:
    """Render the packages matching the optional name/repo/variant filters.

    A package matches ``package_name`` when it has that name or lists it in
    ``provides``. Empty ``repo``/``variant`` values disable the respective
    filter. "package.html" is rendered when a name was given, otherwise
    "packages.html"; both receive a list of ``(source, package)`` tuples.
    """
    matches = []
    for source in state.sources:
        for _, pkg in sorted(source.packages.items()):
            if (
                package_name is not None
                and pkg.name != package_name
                and package_name not in pkg.provides
            ):
                continue
            if repo and pkg.repo != repo:
                continue
            if variant and pkg.repo_variant != variant:
                continue
            matches.append((source, pkg))

    template_name = "package.html" if package_name is not None else "packages.html"
    return templates.TemplateResponse(template_name, {
        "request": request,
        "packages": matches,
    }, headers=dict(response.headers))
示例13
def queue(request: Request, response: Response) -> Response:
"""Render "queue.html" with the pending package updates.

Collects ``(srcinfo, source, package)`` tuples for packages that have an
entry in ``state.sourceinfos`` whose ``build_version`` is reported newer
than the package version by ``version_is_newer_than``, then sorts them in
descending order of ``(date, pkgbase, pkgname)`` of the source info.

NOTE(review): the indentation of this snippet was lost, so the exact
nesting of the ``continue``/``break`` statements below cannot be read
from the text - confirm against the upstream file before restructuring.
"""
# Create entries for all packages where the version doesn't match
updates = []
for s in state.sources:
for k, p in sorted(s.packages.items()):
if p.name in state.sourceinfos:
srcinfo = state.sourceinfos[p.name]
# Skip when the *source* name is classified as VCS by package_name_is_vcs.
if package_name_is_vcs(s.name):
continue
if version_is_newer_than(srcinfo.build_version, p.version):
updates.append((srcinfo, s, p))
# Presumably stops after the first matching package of this source - TODO confirm nesting.
break
# Newest first; ties are ordered by pkgbase, then pkgname (also descending).
updates.sort(
key=lambda i: (i[0].date, i[0].pkgbase, i[0].pkgname),
reverse=True)
# Headers already present on the injected response are copied onto the template response.
return templates.TemplateResponse("queue.html", {
"request": request,
"updates": updates,
}, headers=dict(response.headers))
示例14
def new(request: Request, response: Response) -> Response:
    """Render "new.html" with source infos that have no matching source yet.

    Starts from every entry of ``state.sourceinfos`` whose ``pkgbase`` is
    not classified as VCS by ``package_name_is_vcs``, drops the ones whose
    ``pkgbase`` equals the name of an existing source, and lists the rest
    in descending order of ``(date, pkgbase, pkgname)``.
    """
    # Create dummy entries for all GIT only packages
    candidates = {
        srcinfo.pkgbase: srcinfo
        for srcinfo in state.sourceinfos.values()
        if not package_name_is_vcs(srcinfo.pkgbase)
    }
    for source in state.sources:
        candidates.pop(source.name, None)

    newest_first = sorted(
        candidates.values(),
        key=lambda info: (info.date, info.pkgbase, info.pkgname),
        reverse=True,
    )
    context = {"request": request, "new": newest_first}
    return templates.TemplateResponse(
        "new.html", context, headers=dict(response.headers)
    )
示例15
async def github_payload(request: Request) -> Response:
    """Handle a GitHub webhook delivery.

    The signature is checked with the secret from ``GITHUB_WEBHOOK_SECRET``.
    A "ping" event is answered with "pong"; a "push" event triggers an
    AppVeyor build configured through ``APPVEYOR_ACCOUNT``,
    ``APPVEYOR_PROJECT`` and ``APPVEYOR_TOKEN``.

    Declared ``async`` because the body awaits the signature check and the
    build trigger; ``await`` inside a plain ``def`` is a syntax error.

    Raises:
        HTTPException: 500 when the webhook secret or the AppVeyor settings
            are missing; 400 for an invalid signature or an unsupported
            event type.
    """
    secret = os.environ.get("GITHUB_WEBHOOK_SECRET")
    if not secret:
        raise HTTPException(500, 'webhook secret config incomplete')

    if not await check_github_signature(request, secret):
        raise HTTPException(400, 'Invalid signature')

    event = request.headers.get('X-GitHub-Event', '')
    if event == 'ping':
        return JSONResponse({'msg': 'pong'})
    if event == 'push':
        account = os.environ.get("APPVEYOR_ACCOUNT")
        project = os.environ.get("APPVEYOR_PROJECT")
        token = os.environ.get("APPVEYOR_TOKEN")
        if not account or not project or not token:
            raise HTTPException(500, 'appveyor config incomplete')
        build_url = await trigger_appveyor_build(account, project, token)
        return JSONResponse({'msg': 'triggered a build: %s' % build_url})
    raise HTTPException(400, 'Unsupported event type: ' + event)
示例16
def read_main(request: Request):
    """Return a greeting together with the ``root_path`` of the request scope."""
    root_path = request.scope.get("root_path")
    return {"message": "Hello World", "root_path": root_path}
示例17
def check_gzip_request(request: Request):
    """Report the class name of the request object that reached the handler."""
    request_class = type(request).__name__
    return {"request_class": request_class}
示例18
def read_items(request: Request):
    """Return a fixed JSON payload; the request itself is not inspected."""
    payload = {"hello": "world"}
    return JSONResponse(payload)
示例19
async def db_session_middleware(request: Request, call_next):
    """Attach a database session to the request for the duration of the call.

    A new ``SessionLocal`` is stored on ``request.state.db`` before the
    request is passed on, and it is closed afterwards whether or not the
    downstream handler raised.

    Declared ``async`` because the body awaits ``call_next``; ``await``
    inside a plain ``def`` is a syntax error.
    """
    # Fallback response, replaced as soon as call_next returns normally.
    response = Response("Internal server error", status_code=500)
    # Create the session outside the try block: if creation fails there is
    # nothing to close, and the original error must not be masked by a
    # failing attribute lookup in the cleanup step.
    request.state.db = SessionLocal()
    try:
        response = await call_next(request)
    finally:
        request.state.db.close()
    return response
# Dependency
示例20
def get_db(request: Request):
    """Return the object stored as ``db`` on the request state."""
    request_state = request.state
    return request_state.db
示例21
def unicorn_exception_handler(request: Request, exc: UnicornException):
    """Turn a ``UnicornException`` into a 418 JSON response naming the culprit."""
    message = f"Oops! {exc.name} did something. There goes a rainbow..."
    return JSONResponse(status_code=418, content={"message": message})
示例22
async def add_process_time_header(request: Request, call_next):
    """Add an ``X-Process-Time`` header with the handling time in seconds.

    Declared ``async`` because the body awaits ``call_next``; ``await``
    inside a plain ``def`` is a syntax error.

    Args:
        request: The incoming request, passed on unchanged.
        call_next: Awaitable callable that produces the response.

    Returns:
        The downstream response with the extra header set.
    """
    # perf_counter is monotonic, so the measured duration cannot go negative
    # when the wall clock is adjusted while the request is being handled.
    start_time = time.perf_counter()
    response = await call_next(request)
    process_time = time.perf_counter() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    return response
示例23
def read_item(request: Request, id: str):
    """Render "item.html" with the request and the item id as context."""
    context = {"request": request, "id": id}
    return templates.TemplateResponse("item.html", context)
示例24
def read_root(item_id: str, request: Request):
    """Return the client host of the request together with the item id.

    ``request.client`` can be ``None`` when the server provides no client
    address; in that case ``client_host`` is reported as ``None`` instead
    of failing with an ``AttributeError``.
    """
    client = request.client
    client_host = client.host if client is not None else None
    return {"client_host": client_host, "item_id": item_id}
示例25
def get_route_handler(self) -> Callable:
    """Return the parent route handler wrapped to enrich validation errors.

    When the wrapped handler raises ``RequestValidationError``, the wrapper
    raises an ``HTTPException`` with status 422 whose detail carries the
    validation errors and the decoded request body.
    """
    delegate = super().get_route_handler()

    async def custom_route_handler(request: Request) -> Response:
        try:
            response = await delegate(request)
        except RequestValidationError as validation_error:
            raw_body = await request.body()
            raise HTTPException(
                status_code=422,
                detail={
                    "errors": validation_error.errors(),
                    "body": raw_body.decode(),
                },
            )
        return response

    return custom_route_handler
示例26
def get_route_handler(self) -> Callable:
    """Return the parent route handler wrapped with response timing.

    The wrapper measures how long the wrapped handler takes, stores the
    value in the ``X-Response-Time`` header, and prints the duration, the
    response and its headers.
    """
    delegate = super().get_route_handler()

    async def custom_route_handler(request: Request) -> Response:
        started_at = time.time()
        response: Response = await delegate(request)
        duration = time.time() - started_at
        response.headers["X-Response-Time"] = str(duration)
        # Debug output, emitted after the header has been set.
        print(f"route duration: {duration}")
        print(f"route response: {response}")
        print(f"route response headers: {response.headers}")
        return response

    return custom_route_handler
示例27
def read_main(request: Request):
    """Return a greeting plus the ``root_path`` entry of the request scope."""
    return dict(
        message="Hello World",
        root_path=request.scope.get("root_path"),
    )
示例28
def read_main(request: Request):
    """Return a greeting and the ``root_path`` found in the request scope."""
    body = {"message": "Hello World"}
    body["root_path"] = request.scope.get("root_path")
    return body
示例29
async def db_session_middleware(request: Request, call_next):
    """Add a permissive ``Access-Control-Allow-Origin`` header to every response.

    Declared ``async`` because the body awaits ``call_next``; ``await``
    inside a plain ``def`` is a syntax error.

    Args:
        request: The incoming request, passed on unchanged.
        call_next: Awaitable callable that produces the response.

    Returns:
        The downstream response with the header set to "*".
    """
    response = await call_next(request)
    # TODO: only set it when there is an Origin header!
    response.headers["Access-Control-Allow-Origin"] = "*"
    return response
示例30
def get_place(
    id: str,
    request: Request,
    background_tasks: BackgroundTasks,
    lang: str = None,
    type=None,
    verbosity=DEFAULT_VERBOSITY,
):
    """Main handler that returns the requested place.

    ``verbosity`` and ``lang`` are validated first. An invalid place id is
    reported as a 404; a place id that redirects elsewhere is answered with
    a 303 JSON response whose ``location`` header points at the target id,
    keeping the query string and any ``x-forwarded-prefix`` path prefix.
    """
    verbosity = validate_verbosity(verbosity)
    lang = validate_lang(lang)

    try:
        place = place_from_id(id, type)
    except InvalidPlaceId as invalid:
        raise HTTPException(status_code=404, detail=invalid.message)
    except RedirectToPlaceId as redirect:
        target_id = redirect.target_id
        path_prefix = request.headers.get("x-forwarded-prefix", "").rstrip("/")
        path = request.app.url_path_for("get_place", id=target_id)
        location = URL(path=f"{path_prefix}{path}", query=request.url.query)
        return JSONResponse(
            status_code=303,
            headers={"location": str(location)},
            content={"id": target_id},
        )

    log_place_request(place, request.headers)

    covid_block_on = settings["BLOCK_COVID_ENABLED"]
    if covid_block_on and settings["COVID19_USE_REDIS_DATASET"]:
        background_tasks.add_task(covid19_osm_task)

    return place.load_place(lang, verbosity)