Python源码示例:responses.add_callback()
示例1
def test_retry(self):
    """Serve a 500 and then a 200 for speedLimits; expect two calls to one URL."""
    # The first request consumes the queued failure; every later request
    # falls through to the success payload.
    queued_failures = [(500, {}, "Internal Server Error.")]

    def reply(req):
        if queued_failures:
            return queued_failures.pop()
        return (200, {}, '{"speedLimits":[]}')

    responses.add_callback(
        responses.GET,
        "https://roads.googleapis.com/v1/speedLimits",
        callback=reply,
        content_type="application/json",
    )

    self.client.speed_limits([])

    recorded = responses.calls
    self.assertEqual(2, len(recorded))
    self.assertEqual(recorded[0].request.url, recorded[1].request.url)
示例2
def test_retry(self):
    """Serve OVER_QUERY_LIMIT and then OK for geocode; expect two calls to one URL."""
    # Only the very first request sees the over-quota body.
    queued_bodies = ['{"status":"OVER_QUERY_LIMIT"}']

    def reply(req):
        if queued_bodies:
            return (200, {}, queued_bodies.pop())
        return (200, {}, '{"status":"OK","results":[]}')

    responses.add_callback(
        responses.GET,
        "https://maps.googleapis.com/maps/api/geocode/json",
        callback=reply,
        content_type="application/json",
    )

    maps_client = googlemaps.Client(key="AIzaasdf")
    maps_client.geocode("Sesame St.")

    recorded = responses.calls
    self.assertEqual(2, len(recorded))
    self.assertEqual(recorded[0].request.url, recorded[1].request.url)
示例3
def test_retry_intermittent(self):
    """Serve a 500 and then an OK geocode reply; expect exactly two calls."""
    # The first request consumes the queued failure; later ones succeed.
    queued_failures = [(500, {}, "Internal Server Error.")]

    def reply(req):
        if queued_failures:
            return queued_failures.pop()
        return (200, {}, '{"status":"OK","results":[]}')

    responses.add_callback(
        responses.GET,
        "https://maps.googleapis.com/maps/api/geocode/json",
        callback=reply,
        content_type="application/json",
    )

    maps_client = googlemaps.Client(key="AIzaasdf")
    maps_client.geocode("Sesame St.")

    self.assertEqual(2, len(responses.calls))
示例4
def test_token_cached_per_repo(self):
    """Request two repositories twice each and expect eight recorded calls in total."""
    repos = ('fedora', 'centos')

    # Token endpoints, one per repository scope.
    for repo in repos:
        responses.add(responses.GET,
                      BEARER_REALM_URL + '?scope=repository:{}:pull'.format(repo),
                      json={'token': BEARER_TOKEN}, match_querystring=True)

    # For each repository: one unauthorized reply followed by two successes,
    # registered in that order on the same URL.
    tag_urls = {}
    for repo in repos:
        tags_url = 'https://registry.example.com/v2/{}/tags/list'.format(repo)
        tag_urls[repo] = tags_url
        responses.add_callback(responses.GET, tags_url,
                               callback=bearer_unauthorized_callback)
        responses.add(responses.GET, tags_url, status=200,
                      json='{}-success'.format(repo))
        responses.add(responses.GET, tags_url, status=200,
                      json='{}-success-also'.format(repo))

    auth = HTTPBearerAuth()
    for repo in repos:
        assert requests.get(tag_urls[repo], auth=auth).json() == '{}-success'.format(repo)
        assert requests.get(tag_urls[repo], auth=auth).json() == '{}-success-also'.format(repo)

    assert len(responses.calls) == 8
示例5
def test_non_401_error_propagated(self):
    """A 418 reply with a Bearer challenge is returned to the caller after one request."""
    tags_url = 'https://registry.example.com/v2/fedora/tags/list'

    def bearer_teapot_callback(request):
        challenge = 'Bearer realm={}'.format(BEARER_REALM_URL)
        return (418, {'www-authenticate': challenge}, json.dumps("I'm a teapot!"))

    responses.add_callback(responses.GET, tags_url, callback=bearer_teapot_callback)
    # Registered as a follow-up reply; the call-count assertion below shows it is never served.
    responses.add(responses.GET, tags_url, status=200, json='success')

    reply = requests.get(tags_url, auth=HTTPBearerAuth())

    assert reply.json() == "I'm a teapot!"
    assert reply.status_code == 418
    assert len(responses.calls) == 1
示例6
def test_not_bearer_auth(self):
    """A 401 whose challenge scheme is 'Spam' is returned to the caller after one request."""
    tags_url = 'https://registry.example.com/v2/fedora/tags/list'

    def unsupported_callback(request):
        challenge = 'Spam realm={}'.format(BEARER_REALM_URL)
        return (401, {'www-authenticate': challenge}, json.dumps('unauthorized'))

    responses.add_callback(responses.GET, tags_url, callback=unsupported_callback)
    # Registered as a follow-up reply; the call-count assertion below shows it is never served.
    responses.add(responses.GET, tags_url, status=200, json='success')

    reply = requests.get(tags_url, auth=HTTPBearerAuth())

    assert reply.json() == 'unauthorized'
    assert reply.status_code == 401
    assert len(responses.calls) == 1
示例7
def test_get_identities(self):
    """
    get_identities should request the filtered identities URL and yield exactly one identity
    whose name is "test".
    """
    store = IdentityStore("http://identitystore.org/", "auth-token", "msisdn")
    filtered_url = "http://identitystore.org/api/v1/identities/?details__name=test"

    responses.add_callback(
        responses.GET,
        filtered_url,
        callback=self.get_identities_callback,
        match_querystring=True,
        content_type="application/json",
    )

    # Tuple unpacking doubles as a check that exactly one identity comes back.
    (found,) = store.get_identities(details__name="test")
    self.assertEqual(found.name, "test")
示例8
def test_get_identities_for_address(self):
    """
    get_identities_for_address should hit the search endpoint with the URL-encoded msisdn
    filter and yield exactly one identity carrying that address.
    """
    store = IdentityStore("http://identitystore.org/", "auth-token", "msisdn")
    search_url = (
        "http://identitystore.org/api/v1/identities/search/"
        "?details__addresses__msisdn=%2B1234"
    )

    responses.add_callback(
        responses.GET,
        search_url,
        match_querystring=True,
        callback=self.single_identity_callback,
        content_type="application/json",
    )

    # Tuple unpacking doubles as a check that exactly one identity comes back.
    (found,) = store.get_identities_for_address("+1234")
    msisdn_addresses = found["details"]["addresses"]["msisdn"]
    self.assertEqual(msisdn_addresses, {"+1234": {}})
示例9
def test_create_identity(self):
    """
    create_identity should POST to the identities endpoint and return an identity whose
    details hold the msisdn address and default address type.
    """
    store = IdentityStore("http://identitystore.org/", "auth-token", "msisdn")

    responses.add_callback(
        responses.POST,
        "http://identitystore.org/api/v1/identities/",
        match_querystring=True,
        callback=self.create_identity_callback,
        content_type="application/json",
    )

    created = store.create_identity(["tel:+1234"], name="Test identity", language="eng")

    expected_details = {
        "addresses": {"msisdn": {"+1234": {}}},
        "default_addr_type": "msisdn",
    }
    self.assertEqual(created["details"], expected_details)
示例10
def test_should_post_changelog(self, mock_token):
    """Posting a changelog sends the expected release payload and token header."""
    # (key, expected value) pairs, checked in this order against the request body.
    expected_fields = (
        ("tag_name", "v1.0.0"),
        ("body", "text"),
        ("draft", False),
        ("prerelease", False),
    )

    def request_callback(request):
        sent = json.loads(request.body)
        for key, expected in expected_fields:
            self.assertEqual(sent[key], expected)
        self.assertEqual("token super-token", request.headers["Authorization"])
        return 201, {}, json.dumps({})

    responses.add_callback(
        responses.POST,
        self.url,
        content_type="application/json",
        callback=request_callback,
    )

    posted = Github.post_release_changelog("relekang", "rmoq", "1.0.0", "text")
    self.assertTrue(posted)
示例11
def testRetryDelay():
    """Serve two retryable error bodies followed by an OK body for ``api.get('labels')``.

    The mocked endpoint answers based on the module-level ``call`` counter:
    calls 0 and 1 get a ``code: 999`` / ``retryable: true`` body, call 2 gets
    a ``code: 200`` body.  The test passes if ``api.get`` returns without
    raising.  ``headers`` is read from the enclosing module scope.

    NOTE(review): unlike the neighbouring retry tests, ``time.sleep`` is not
    patched here - presumably so the real delay is exercised; confirm.
    """
    global call
    call = 0

    def request_callback(request):
        # Returns None (implicitly) for any request after the third one.
        global call
        if call == 0:
            call = 1
            return (200, headers, '{"meta":{"code":999,"message":"PROBLEM","retryable":true, "details":[]},"data":{}}')
        elif call == 1:
            call = 2
            # second attempt
            return (200, headers, '{"meta":{"code":999,"message":"PROBLEM","retryable":true,"details":[]},"data":{}}')
        elif call == 2:
            call = 3
            return (200, headers, '{"meta":{"code":200,"message":"OK","details":[]},"data":{}}')

    responses.add_callback(responses.GET, 'https://REGION-api.postmen.com/v3/labels', callback=request_callback)
    api = Postmen('KEY', 'REGION')
    api.get('labels')
    responses.reset()
# TEST 9
示例12
def testRetryMaxAttempt(monkeypatch):
    """Serve four retryable error bodies and then an OK body for ``api.get('labels')``.

    The mocked endpoint counts requests in the module-level ``call`` counter
    and answers the first four with a ``code: 999`` / ``retryable: true``
    body; every later request gets a ``code: 200`` body.  The test passes if
    ``api.get`` returns without raising.  ``headers`` is read from the
    enclosing module scope.

    Args:
        monkeypatch: pytest fixture used to replace ``time.sleep``.
    """
    # Make time.sleep a no-op so any waiting done through it returns at once.
    # The patch stays active for the whole test; monkeypatch undoes it afterwards.
    monkeypatch.setattr(time, 'sleep', lambda s: None)
    global call
    call = 0

    def request_callback(request):
        global call
        if call < 4:
            call += 1
            return (200, headers, '{"meta":{"code":999,"message":"PROBLEM","retryable":true, "details":[]},"data":{}}')
        else:
            return (200, headers, '{"meta":{"code":200,"message":"OK","details":[]},"data":{}}')

    responses.add_callback(responses.GET, 'https://REGION-api.postmen.com/v3/labels', callback=request_callback)
    api = Postmen('KEY', 'REGION')
    api.get('labels')
    responses.reset()
# TEST 10
示例13
def testRetryMaxAttemptExceeded(monkeypatch):
    """Expect ``PostmenException`` when every one of five requests gets a retryable error.

    The mocked endpoint answers the first five requests with a ``code: 999``
    / ``retryable: true`` body and fails the test outright if a sixth request
    arrives.  The raised exception must carry the "PROBLEM" message and code
    999.  ``headers`` is read from the enclosing module scope.

    Args:
        monkeypatch: pytest fixture used to replace ``time.sleep``.
    """
    # Make time.sleep a no-op so any waiting done through it returns at once.
    # The patch stays active for the whole test; monkeypatch undoes it afterwards.
    monkeypatch.setattr(time, 'sleep', lambda s: None)
    global call
    call = 0

    def request_callback(request):
        global call
        if call < 5:
            call += 1
            return (200, headers, '{"meta":{"code":999,"message":"PROBLEM","retryable":true, "details":[]},"data":{}}')
        else:
            pytest.fail("Maximum 5 attempts of retry, test #10 failed")

    responses.add_callback(responses.GET, 'https://REGION-api.postmen.com/v3/labels', callback=request_callback)
    api = Postmen('KEY', 'REGION')
    with pytest.raises(PostmenException) as e:
        api.get('labels')
    responses.reset()
    assert "PROBLEM" in str(e.value.message())
    assert 999 == e.value.code()
# TEST 11
示例14
def testArguments11(monkeypatch):
    """Expect no second request when the client is built with ``retry=False``.

    The mocked endpoint answers the first request with a ``code: 999`` /
    ``retryable: true`` body and fails the test outright if a second request
    arrives.  The raised ``PostmenException`` must carry the "PROBLEM"
    message, code 999, and report itself as retryable.  ``headers`` is read
    from the enclosing module scope.

    Args:
        monkeypatch: pytest fixture used to replace ``time.sleep``.
    """
    # Make time.sleep a no-op so any waiting done through it returns at once.
    # The patch stays active for the whole test; monkeypatch undoes it afterwards.
    monkeypatch.setattr(time, 'sleep', lambda s: None)
    global call
    call = 0

    def request_callback(request):
        global call
        if call < 1:
            call += 1
            return (200, headers, '{"meta":{"code":999,"message":"PROBLEM","retryable":true, "details":[]},"data":{}}')
        else:
            pytest.fail("Shall not retry if retry = False, test #11 failed")

    responses.add_callback(responses.GET, 'https://REGION-api.postmen.com/v3/labels', callback=request_callback)
    api = Postmen('KEY', 'REGION', retry=False)
    with pytest.raises(PostmenException) as e:
        api.get('labels')
    responses.reset()
    assert "PROBLEM" in str(e.value.message())
    assert 999 == e.value.code()
    assert e.value.retryable()
# TEST 12
示例15
def testArgument12(monkeypatch):
    """Expect no second request when the error body says ``retryable: false``.

    The mocked endpoint answers the first request with a ``code: 999`` /
    ``retryable: false`` body and fails the test outright if a second request
    arrives.  The raised ``PostmenException`` must carry the "PROBLEM"
    message and report itself as not retryable.  ``headers`` is read from the
    enclosing module scope.

    Args:
        monkeypatch: pytest fixture used to replace ``time.sleep``.
    """
    # Make time.sleep a no-op so any waiting done through it returns at once.
    # The patch stays active for the whole test; monkeypatch undoes it afterwards.
    monkeypatch.setattr(time, 'sleep', lambda s: None)
    global call
    call = 0

    def request_callback(request):
        # Returns None (implicitly) for any request after the second one.
        global call
        if call == 0:
            call = 1
            return (200, headers, '{"meta":{"code":999,"message":"PROBLEM","retryable":false, "details":[]},"data":{}}')
        elif call == 1:
            pytest.fail("Shall not retry if non retryable, test #12 failed")

    responses.add_callback(responses.GET, 'https://REGION-api.postmen.com/v3/labels', callback=request_callback)
    api = Postmen('KEY', 'REGION')
    with pytest.raises(PostmenException) as e:
        api.get('labels')
    responses.reset()
    assert "PROBLEM" in str(e.value.message())
    assert not e.value.retryable()
# TEST 13
示例16
def test_ingest_with_existing_banner_image(self):
    """Ingest programs whose banner image URLs are mocked, then verify each program and its banner."""
    TieredCache.dangerous_clear_all_tiers()
    programs = self.mock_api()

    # Register a JPEG reply for every program that advertises a w1440h480 banner.
    for entry in programs:
        banner_urls = entry.get('banner_image_urls', {})
        image_url = banner_urls.get('w1440h480')
        if not image_url:
            continue
        responses.add_callback(
            responses.GET,
            image_url,
            content_type=JPEG,
            callback=mock_jpeg_callback(),
        )

    self.loader.ingest()

    # Verify the API was called with the correct authorization header
    self.assert_api_called(6)

    for loaded in programs:
        self.assert_program_loaded(loaded)
        self.assert_program_banner_image_loaded(loaded)
示例17
def test_token_expired_automatically_refresh_authentication(self):
    """After a 401 "Token expired" reply, the response's api params hold 'new_token'."""
    self.first_call = True

    def request_callback(request):
        # Later requests: mark the flag as spent and answer 201.
        if not self.first_call:
            self.first_call = None
            return (201, {'content_type': 'application/json'}, '')
        # First request: report an expired token.
        self.first_call = False
        expired = json.dumps({"error": "Token expired"})
        return (401, {'content_type': 'application/json'}, expired)

    endpoint = self.wrapper.test().data
    responses.add_callback(
        responses.POST,
        endpoint,
        content_type='application/json',
        callback=request_callback,
    )

    result = self.wrapper.test().post()

    # refresh_authentication method should be able to update api_params
    self.assertEqual(result._api_params['token'], 'new_token')
示例18
def test_raises_error_if_refresh_authentication_method_returns_falsy_value(self):
    """A POST through FailTokenRefreshClient that first receives a 401 raises ClientError."""
    failing_client = FailTokenRefreshClient(token='token', refresh_token_by_default=True)
    self.first_call = True

    def request_callback(request):
        # Later requests: mark the flag as spent and answer 201.
        if not self.first_call:
            self.first_call = None
            return (201, {}, '')
        # First request: answer 401.
        self.first_call = False
        return (401, {}, '')

    endpoint = failing_client.test().data
    responses.add_callback(
        responses.POST,
        endpoint,
        content_type='application/json',
        callback=request_callback,
    )

    with self.assertRaises(ClientError):
        failing_client.test().post()
示例19
def test_stores_refresh_authentication_method_response_for_further_access(self):
    """After a 401 followed by a 201, the response exposes refresh_data == 'new_token'."""
    self.first_call = True

    def request_callback(request):
        # Later requests: mark the flag as spent and answer 201.
        if not self.first_call:
            self.first_call = None
            return (201, {}, '')
        # First request: answer 401.
        self.first_call = False
        return (401, {}, '')

    endpoint = self.wrapper.test().data
    responses.add_callback(
        responses.POST,
        endpoint,
        content_type='application/json',
        callback=request_callback,
    )

    result = self.wrapper.test().post()
    self.assertEqual(result().refresh_data, 'new_token')
示例20
def test_workflow_csrf_token():
    """Run the BLOCKS_CSRF workflow against a mocked contact form that carries a CSRF token."""
    contact_url = 'http://test.com/contact'
    form_page = b'<form><input type="hidden" name="csrf" id="csrf" value="token_1337"><input type="text" name="name"></form>'
    error_page = b'<div id=error>You have an error in your SQL syntax</div>'

    def request_callback_csrf(request):
        """Require csrf=token_1337 in the POST body, then reply with the SQL error page."""
        assert 'csrf=token_1337' in request.body
        return (200, {}, error_page)

    # GET serves the form containing the hidden token field.
    responses.add(responses.GET, contact_url,
                  status=200, body=form_page,
                  content_type='text/html')
    # POST is answered by the callback, which checks the submitted token.
    responses.add_callback(
        responses.POST, contact_url,
        content_type='text/html',
        callback=request_callback_csrf,
    )

    loaded_blocks = explo.core.load_blocks(BLOCKS_CSRF)
    assert explo.core.process_blocks(loaded_blocks)
示例21
def test_registry_session(tmpdir, registry, insecure, method, responses_method, config_content):
    """Build a RegistrySession from a temporary .dockercfg and check requests carry Authorization."""
    config_dir = mkdtemp(dir=str(tmpdir))
    dockercfg = {registry_hostname(registry): config_content}
    with open(os.path.join(config_dir, '.dockercfg'), 'w+') as cfg_file:
        json.dump(dockercfg, cfg_file)

    session = RegistrySession(registry, insecure=insecure, dockercfg_path=config_dir)
    assert session.insecure == insecure
    assert session.dockercfg_path == config_dir

    path = '/v2/test/image/manifests/latest'
    if registry.startswith('http'):
        # Registry already names its scheme.
        url = registry + path
    elif not insecure:
        url = 'https://' + registry + path
    else:
        # Make the https attempt fail with a connection error and serve the
        # real reply over plain http instead.
        responses.add(responses_method, 'https://' + registry + path,
                      body=requests.ConnectionError())
        url = 'http://' + registry + path

    def request_callback(request, all_headers=True):
        assert request.headers.get('Authorization') is not None
        return (200, {}, 'A-OK')

    responses.add_callback(responses_method, url, callback=request_callback)

    reply = method(session, path)
    assert reply.text == 'A-OK'
示例22
def test_create_compose(odcs_client, source, source_type, packages, expected_packages, sigkeys,
                        modular_koji_tags, expected_koji_tags,
                        arches, flags, multilib_arches, multilib_method, expected_method):
    """start_compose should POST a body whose fields match the expected parametrized values."""

    def handle_composes_post(request):
        assert_request_token(request, odcs_client.session)

        # The body may arrive as text or as bytes.
        raw_body = request.body
        text_body = raw_body if isinstance(raw_body, six.text_type) else raw_body.decode()
        posted = json.loads(text_body)

        assert posted['source']['type'] == source_type
        assert posted['source']['source'] == source
        assert posted['source'].get('packages') == expected_packages
        assert posted['source'].get('sigkeys') == sigkeys
        assert posted['source'].get('modular_koji_tags') == expected_koji_tags
        assert posted.get('flags') == flags
        assert posted.get('arches') == arches
        assert posted.get('multilib_arches') == multilib_arches

        if expected_method is None:
            assert 'multilib_method' not in posted
        else:
            # Compared order-insensitively.
            assert sorted(posted.get('multilib_method')) == sorted(expected_method)

        return (200, {}, compose_json(0, 'wait', source_type=source_type, source=source))

    composes_url = '{}composes/'.format(ODCS_URL)
    responses.add_callback(responses.POST, composes_url,
                           callback=handle_composes_post,
                           content_type='application/json')

    odcs_client.start_compose(source_type=source_type, source=source, packages=packages,
                              sigkeys=sigkeys, arches=arches, flags=flags,
                              multilib_arches=multilib_arches, multilib_method=multilib_method,
                              modular_koji_tags=modular_koji_tags)
示例23
def test_wait_for_compose(odcs_client, final_state_id, final_state_name, expect_exc, state_reason):
    """wait_for_compose sees 'generating' once, then the final state; it may raise RuntimeError."""
    # Mutable request counter shared with the callback.
    state = {'count': 1}

    def handle_composes_get(request):
        assert_request_token(request, odcs_client.session)

        if state['count'] != 1:
            payload = compose_json(final_state_id, final_state_name,
                                   state_reason=state_reason)
        else:
            payload = compose_json(1, 'generating')
        state['count'] += 1

        return (200, {}, payload)

    compose_url = '{}composes/{}'.format(ODCS_URL, COMPOSE_ID)
    responses.add_callback(responses.GET, compose_url,
                           callback=handle_composes_get,
                           content_type='application/json')

    # Replace time.sleep with a stub that returns None.
    flexmock(time).should_receive('sleep').and_return(None)

    if not expect_exc:
        odcs_client.wait_for_compose(COMPOSE_ID)
        return

    with pytest.raises(RuntimeError) as exc_info:
        odcs_client.wait_for_compose(COMPOSE_ID)
    assert expect_exc in str(exc_info.value)
示例24
def test_renew_compose(odcs_client):
    """renew_compose issues token-bearing PATCH requests, with and without sigkeys."""
    renewed_id = COMPOSE_ID + 1

    def handle_composes_patch(request):
        assert_request_token(request, odcs_client.session)
        renewed = compose_json(0, 'generating', compose_id=renewed_id)
        return (200, {}, renewed)

    patch_url = '{}composes/{}'.format(ODCS_URL, COMPOSE_ID)
    responses.add_callback(responses.PATCH, patch_url,
                           callback=handle_composes_patch,
                           content_type='application/json')

    odcs_client.renew_compose(COMPOSE_ID)
    odcs_client.renew_compose(COMPOSE_ID, ['SIGKEY1', 'SIGKEY2'])
示例25
def test_request_sources(additional_params, caplog):
    """request_sources posts repo, ref and the non-None extra params, and logs the response."""
    response_data = {'id': CACHITO_REQUEST_ID}

    def handle_request_sources(http_request):
        posted = json.loads(http_request.body)

        assert posted['repo'] == CACHITO_REQUEST_REPO
        assert posted['ref'] == CACHITO_REQUEST_REF
        # Params given as None must be left out of the body altogether.
        for name, expected in additional_params.items():
            if expected is None:
                assert name not in posted
            else:
                assert posted[name] == expected

        return (201, {}, json.dumps(response_data))

    responses.add_callback(
        responses.POST,
        '{}/api/v1/requests'.format(CACHITO_URL),
        callback=handle_request_sources,
        content_type='application/json')

    cachito = CachitoAPI(CACHITO_URL)
    result = cachito.request_sources(CACHITO_REQUEST_REPO, CACHITO_REQUEST_REF,
                                     **additional_params)
    assert result['id'] == CACHITO_REQUEST_ID

    def squash(text):
        # Collapse every whitespace run to a single space.
        return re.sub(r'\s+', " ", text)

    expected_log = 'Cachito response:\n{}'.format(json.dumps(response_data, indent=4))
    # Since Python 3.7 logger adds additional whitespaces by default -> checking without them
    assert squash(expected_log) in squash(caplog.text)
示例26
def test_wait_for_unsuccessful_request(error_state, error_reason, caplog):
    """wait_for_request polls until the error state arrives, raises, and logs the details."""
    states = ['in_progress', 'in_progress', error_state]
    expected_total_responses_calls = len(states)

    def request_payload(state):
        # Key order matters: it fixes the layout of the dumped JSON.
        return {'state_reason': error_reason,
                'repo': CACHITO_REQUEST_REPO,
                'state': state,
                'ref': CACHITO_REQUEST_REF,
                'id': CACHITO_REQUEST_ID}

    def handle_wait_for_request(http_request):
        # Each poll consumes the next queued state.
        return (200, {}, json.dumps(request_payload(states.pop(0))))

    responses.add_callback(
        responses.GET,
        '{}/api/v1/requests/{}'.format(CACHITO_URL, CACHITO_REQUEST_ID),
        callback=handle_wait_for_request,
        content_type='application/json')

    burst_params = {'burst_retry': 0.001, 'burst_length': 0.5}
    with pytest.raises(CachitoAPIUnsuccessfulRequest):
        CachitoAPI(CACHITO_URL).wait_for_request(CACHITO_REQUEST_ID, **burst_params)
    assert len(responses.calls) == expected_total_responses_calls

    failed_response_json = json.dumps(request_payload(error_state), indent=4)
    log_template = dedent(
        'Request {} is in "{}" state: {}\n'
        'Details: {}\n'
    )
    expect_in_logs = log_template.format(
        CACHITO_REQUEST_ID, error_state, error_reason, failed_response_json)

    def squash(text):
        # Collapse every whitespace run to a single space.
        return re.sub(r'\s+', " ", text)

    # Since Python 3.7 logger adds additional whitespaces by default -> checking without them
    assert squash(expect_in_logs) in squash(caplog.text)
示例27
def test_check_CachitoAPIUnsuccessfulRequest_text(error_state, error_reason, caplog):
    """The exception raised by wait_for_request contains the expected state/reason text."""
    states = ['in_progress', 'in_progress', error_state]
    expected_total_responses_calls = len(states)
    cachito_request_url = '{}/api/v1/requests/{}'.format(CACHITO_URL, CACHITO_REQUEST_ID)

    def handle_wait_for_request(http_request):
        # Each poll consumes the next queued state.
        payload = {'state_reason': error_reason,
                   'repo': CACHITO_REQUEST_REPO,
                   'state': states.pop(0),
                   'ref': CACHITO_REQUEST_REF,
                   'id': CACHITO_REQUEST_ID}
        return (200, {}, json.dumps(payload))

    responses.add_callback(
        responses.GET,
        cachito_request_url,
        callback=handle_wait_for_request,
        content_type='application/json')

    burst_params = {'burst_retry': 0.001, 'burst_length': 0.5}

    # The template is filled in first and dedented afterwards.
    filled_text = (
        'Cachito request is in "{}" state, reason: {}\n'
        "Request {} ({}) tried to get repo '{}' at reference '{}'.\n"
    ).format(error_state, error_reason, CACHITO_REQUEST_ID,
             cachito_request_url, CACHITO_REQUEST_REPO,
             CACHITO_REQUEST_REF)
    expected_exc_text = dedent(filled_text)

    with pytest.raises(CachitoAPIUnsuccessfulRequest) as excinfo:
        CachitoAPI(CACHITO_URL).wait_for_request(CACHITO_REQUEST_ID, **burst_params)

    assert len(responses.calls) == expected_total_responses_calls
    assert expected_exc_text in str(excinfo.value)
示例28
def test_token_negotiation(self, repo_doesnt_exist_401, auth_b64, username, password,
                           basic_auth):
    """A 401 challenge triggers a realm token request and one retry: three calls in total."""

    def bearer_realm_callback(request):
        # Verify if username and password were provided, token is negotiated
        # with realm via basic auth.
        if not basic_auth:
            assert 'authorization' not in request.headers
        else:
            creds = auth_b64 or b64encode(username, password)
            assert request.headers['authorization'] == 'Basic {}'.format(creds)
        return (200, {}, json.dumps({'token': BEARER_TOKEN}))

    responses.add_callback(responses.GET, BEARER_REALM_URL + '?scope=repository:fedora:pull',
                           callback=bearer_realm_callback, match_querystring=True)

    tags_url = 'https://registry.example.com/v2/fedora/tags/list'
    # First reply on the tags URL is always the unauthorized challenge; the
    # second one depends on whether the repo is meant to look missing.
    responses.add_callback(responses.GET, tags_url, callback=bearer_unauthorized_callback)
    second_reply = (bearer_unauthorized_callback if repo_doesnt_exist_401
                    else bearer_success_callback)
    responses.add_callback(responses.GET, tags_url, callback=second_reply)

    auth = HTTPBearerAuth(username=username, password=password, auth_b64=auth_b64)
    reply = requests.get(tags_url, auth=auth)

    if not repo_doesnt_exist_401:
        assert reply.json() == 'success'
    else:
        assert reply.json() == 'unauthorized'
        assert reply.status_code == requests.codes.not_found

    assert len(responses.calls) == 3
示例29
def test_repo_extracted_from_url(self, partial_url, repo):
    """A request under /v2/<repo>/ succeeds when the token endpoint is scoped to that repo."""
    token_url = '{}?scope=repository:{}:pull'.format(BEARER_REALM_URL, repo)
    responses.add(responses.GET, token_url,
                  json={'token': BEARER_TOKEN}, match_querystring=True)

    repo_url = 'https://registry.example.com/v2/{}/{}'.format(repo, partial_url)
    # Unauthorized challenge first, then the successful reply.
    responses.add_callback(responses.GET, repo_url, callback=bearer_unauthorized_callback)
    responses.add(responses.GET, repo_url, status=200, json='success')

    reply = requests.get(repo_url, auth=HTTPBearerAuth())
    assert reply.json() == 'success'
示例30
def test_v2(self, auth_b64, username, password, auth_type):
    """HTTPRegistryAuth succeeds against a basic, bearer or open endpoint and builds the expected v2 auths."""
    bearer_token = 'the-bearer-token'
    realm_url = 'https://registry.example.com/v2/auth'
    responses.add(responses.GET, '{}?scope=repository:fedora:pull'.format(realm_url),
                  json={'token': bearer_token}, match_querystring=True)

    def auth_callback(request):
        if auth_type == 'bearer':
            # Challenge until the request carries the expected bearer token.
            wanted = 'Bearer {}'.format(bearer_token)
            if request.headers.get('authorization') == wanted:
                return (200, {}, json.dumps('success'))
            challenge = {'www-authenticate': 'Bearer realm={}'.format(realm_url)}
            return (401, challenge, json.dumps('unauthorized'))

        if auth_type == 'basic':
            creds = auth_b64 or b64encode(username, password)
            assert request.headers['authorization'] == 'Basic {}'.format(creds)
            return (200, {}, json.dumps('success'))

        # Any other auth type: the request must be anonymous.
        assert 'authorization' not in request.headers
        return (200, {}, json.dumps('success'))

    repo_url = 'https://registry.example.com/v2/fedora/tags/list'
    responses.add_callback(responses.GET, repo_url, callback=auth_callback)

    auth = HTTPRegistryAuth(username=username, password=password, auth_b64=auth_b64)
    assert requests.get(repo_url, auth=auth).json() == 'success'

    has_credentials = (username and password) or auth_b64
    if not has_credentials:
        assert len(auth.v2_auths) == 1
        assert isinstance(auth.v2_auths[0], HTTPBearerAuth)
    else:
        assert len(auth.v2_auths) == 2
        assert isinstance(auth.v2_auths[0], HTTPBearerAuth)
        assert isinstance(auth.v2_auths[1], (HTTPBasicAuth, HTTPBasicAuthWithB64))