Python源码示例:responses.POST
示例1
def test_client_get_nodes_for_filter_post():
node_list = ['node1', 'node2']
host = 'foo.bar.baz'
responses.add(
responses.POST,
"http://%s/api/v1.0/nodefilter" % (host),
json=node_list,
status=200)
dd_ses = dc_session.DrydockSession(host)
dd_client = dc_client.DrydockClient(dd_ses)
design_ref = {'ref': 'hello'}
validation_resp = dd_client.get_nodes_for_filter(design_ref)
assert 'node1' in validation_resp
assert 'node2' in validation_resp
示例2
def test_client_validate_design_post():
validation = {'status': 'success'}
host = 'foo.bar.baz'
responses.add(
responses.POST,
"http://%s/api/v1.0/validatedesign" % (host),
json=validation,
status=200)
dd_ses = dc_session.DrydockSession(host)
dd_client = dc_client.DrydockClient(dd_ses)
validation_resp = dd_client.validate_design('href-placeholder')
assert validation_resp['status'] == validation['status']
示例3
def test_post(patch1, patch2):
"""
Test post functionality
"""
responses.add(
responses.POST,
'http://promhost:80/api/v1.0/node-label/n1',
body='{"key1":"label1"}',
status=200)
prom_session = PromenadeSession()
result = prom_session.post(
'v1.0/node-label/n1', body='{"key1":"label1"}', timeout=(60, 60))
assert PROM_HOST == prom_session.host
assert result.status_code == 200
示例4
def test_invalid_confidentials_should_raise():
responses.add(
responses.POST,
"https://api.twitter.com/oauth2/token",
json={
"errors": [
{
"code": 99,
"message": "Unable to verify your credentials",
"label": "authenticity_token_error",
}
]
},
status=403,
)
with pytest.raises(e.BearerTokenNotFetchedError):
_ = twt_img.Downloader("my api key", "my api secret")
示例5
def test_cli_mapmatching():
responses.add(
responses.POST,
'https://api.mapbox.com/matching/v4/mapbox.cycling.json?gps_precision=4&access_token=bogus',
match_querystring=True,
body="", status=200,
content_type='application/json')
runner = CliRunner()
result = runner.invoke(
main_group,
['--access-token', 'bogus',
"mapmatching",
"--gps-precision", "4",
"--profile", "mapbox.cycling",
"tests/line_feature.geojson"])
assert result.exit_code == 0
示例6
def mock_response(command, **kvargs):
resp = RAIBIRD_COMMANDS["ControllerResponses"][command]
data = command + ("00" * (resp["length"] - 1))
for k in resp:
if k in ["type", "length"]:
continue
param_template = "%%0%dX" % (resp[k]["length"])
start_ = resp[k]["position"]
end_ = start_ + resp[k]["length"]
data = "%s%s%s" % (
data[:start_],
(param_template % kvargs[k]),
data[end_:],
)
responses.add(
responses.POST,
"http://%s/stick" % MOCKED_RAINBIRD_URL,
body=encrypt(
(u'{"jsonrpc": "2.0", "result": {"data":"%s"}, "id": 1} ' % data),
MOCKED_PASSWORD,
),
content_type="application/octet-stream",
)
示例7
def test_simple_geolocate(self):
responses.add(
responses.POST,
"https://www.googleapis.com/geolocation/v1/geolocate",
body='{"location": {"lat": 51.0,"lng": -0.1},"accuracy": 1200.4}',
status=200,
content_type="application/json",
)
results = self.client.geolocate()
self.assertEqual(1, len(responses.calls))
self.assertURLEqual(
"https://www.googleapis.com/geolocation/v1/geolocate?" "key=%s" % self.key,
responses.calls[0].request.url,
)
示例8
def test_request_sources_error(status_code, error, error_body, caplog):
responses.add(
responses.POST,
'{}/api/v1/requests'.format(CACHITO_URL),
content_type='application/json',
body=error_body,
status=status_code,
)
with pytest.raises(error):
CachitoAPI(CACHITO_URL).request_sources(CACHITO_REQUEST_REPO, CACHITO_REQUEST_REF)
try:
response_data = json.loads(error_body)
except ValueError: # json.JSONDecodeError in py3
assert 'Cachito response' not in caplog.text
else:
response_json = 'Cachito response:\n{}'.format(json.dumps(response_data, indent=4))
# Since Python 3.7 logger adds additional whitespaces by default -> checking without them
assert re.sub(r'\s+', " ", response_json) in re.sub(r'\s+', " ", caplog.text)
示例9
def test_create_identity(self):
"""
The create_identity method should make the correct request to create an identity with the correct details in the
identity store.
"""
identity_store = IdentityStore("http://identitystore.org/", "auth-token", "msisdn")
url = "http://identitystore.org/api/v1/identities/"
responses.add_callback(
responses.POST,
url,
callback=self.create_identity_callback,
match_querystring=True,
content_type="application/json",
)
identity = identity_store.create_identity(["tel:+1234"], name="Test identity", language="eng")
self.assertEqual(identity["details"], {"addresses": {"msisdn": {"+1234": {}}}, "default_addr_type": "msisdn"})
示例10
def test_okta_auth_value_error(
self,
mock_print_tty,
mock_makedirs
):
responses.add(
responses.POST,
'https://organization.okta.com/api/v1/authn',
body="NOT JSON",
status=500
)
with self.assertRaises(SystemExit):
Okta(
user_name="user_name",
user_pass="user_pass",
organization="organization.okta.com"
)
print_tty_calls = [
call("Error: Status Code: 500"),
call("Error: Invalid JSON")
]
mock_print_tty.assert_has_calls(print_tty_calls)
示例11
def test_okta_connection_timeout(
self,
mock_print_tty,
mock_makedirs,
mock_open,
mock_chmod
):
responses.add(
responses.POST,
'https://organization.okta.com/api/v1/authn',
body=ConnectTimeout()
)
with self.assertRaises(SystemExit):
Okta(
user_name="user_name",
user_pass="user_pass",
organization="organization.okta.com"
)
print_tty_calls = [
call("Error: Timed Out")
]
mock_print_tty.assert_has_calls(print_tty_calls)
示例12
def test_should_post_changelog(self, mock_token):
def request_callback(request):
payload = json.loads(request.body)
self.assertEqual(payload["tag_name"], "v1.0.0")
self.assertEqual(payload["body"], "text")
self.assertEqual(payload["draft"], False)
self.assertEqual(payload["prerelease"], False)
self.assertEqual("token super-token", request.headers["Authorization"])
return 201, {}, json.dumps({})
responses.add_callback(
responses.POST,
self.url,
callback=request_callback,
content_type="application/json",
)
status = Github.post_release_changelog("relekang", "rmoq", "1.0.0", "text")
self.assertTrue(status)
示例13
def test_should_return_false_status_if_it_failed(self):
responses.add(
responses.POST,
self.url,
status=400,
body="{}",
content_type="application/json",
)
responses.add(
responses.GET,
self.get_url,
status=200,
body='{"id": 1}',
content_type="application/json",
)
responses.add(
responses.POST,
self.edit_url,
status=400,
body="{}",
content_type="application/json",
)
self.assertFalse(
Github.post_release_changelog("relekang", "rmoq", "1.0.0", "text")
)
示例14
def test_add_txt_record(dnspod):
responses.add(
responses.POST, "https://dnsapi.cn/Record.Create",
json={"status": {"code": "1"}}
)
dnspod.add_txt_record(RECORD_NAME, RECORD_VALUE)
expected = get_params({
"record_type": "TXT",
"record_line": "默认",
"ttl": str(TTL),
"domain": DOMAIN,
"sub_domain": SUB_DOMAIN,
"value": RECORD_VALUE
})
assert len(responses.calls) == 1
assert parse_data(responses.calls[0].request.body) == expected
示例15
def test_remove_txt_record(dnspod):
responses.add(
responses.POST, "https://dnsapi.cn/Record.List",
json={
"status": {"code": "1"},
"records": [
{"id": "err-0", "type": "A", "value": RECORD_VALUE},
{"id": "err-1", "type": "TXT", "value": "some-value"},
{"id": RECORD_ID, "type": "TXT", "value": RECORD_VALUE}
]
}
)
responses.add(
responses.POST, "https://dnsapi.cn/Record.Remove",
json={"status": {"code": "1"}}
)
dnspod.remove_txt_record(RECORD_NAME, RECORD_VALUE)
assert len(responses.calls) == 2
expected_list = get_params({"domain": DOMAIN, "sub_domain": SUB_DOMAIN})
assert parse_data(responses.calls[0].request.body) == expected_list
expected_remove = get_params({"domain": DOMAIN, "record_id": RECORD_ID})
assert parse_data(responses.calls[1].request.body), expected_remove
示例16
def test_move_txt_record_error_during_delete(dnspod):
responses.add(
responses.POST, "https://dnsapi.cn/Record.List",
json={
"status": {"code": "1"},
"records": [
{"id": RECORD_ID, "type": "TXT", "value": RECORD_VALUE}
]
}
)
responses.add(
responses.POST, "https://dnsapi.cn/Record.Remove", json=ERROR
)
dnspod.remove_txt_record(RECORD_NAME, RECORD_VALUE)
assert len(responses.calls) == 2
expected_list = get_params({"domain": DOMAIN, "sub_domain": SUB_DOMAIN})
assert parse_data(responses.calls[0].request.body) == expected_list
expected_remove = get_params({"domain": DOMAIN, "record_id": RECORD_ID})
assert parse_data(responses.calls[1].request.body) == expected_remove
示例17
def test_send_post_request_includes_given_files(self):
self.setup_responses(
method=responses.POST,
url='https://retdec.com/service/api'
)
conn = APIConnection('https://retdec.com/service/api', 'KEY')
files = {'input': ('test.c', io.StringIO('main()'))}
conn.send_post_request(files=files)
body = str(responses.calls[0].request.body)
self.assertIn(
'Content-Disposition: form-data; name="input"; filename="test.c"',
body
)
self.assertIn('main()', body)
示例18
def test_create_video_refresh_error(self) -> None:
responses.add(
responses.POST,
"https://zoom.us/oauth/token",
json={"access_token": "token", "expires_in": -60},
)
response = self.client_get(
"/calls/zoom/complete",
{"code": "code", "state": '{"realm":"zulip","sid":""}'},
)
self.assertEqual(response.status_code, 200)
responses.replace(responses.POST, "https://zoom.us/oauth/token", status=400)
response = self.client_post("/json/calls/zoom/create")
self.assert_json_error(response, "Invalid Zoom access token")
示例19
def test_deauthorize_zoom_user(self) -> None:
responses.add(responses.POST, "https://api.zoom.us/oauth/data/compliance")
response = self.client_post(
"/calls/zoom/deauthorize",
"""\
{
"event": "app_deauthorized",
"payload": {
"user_data_retention": "false",
"account_id": "EabCDEFghiLHMA",
"user_id": "z9jkdsfsdfjhdkfjQ",
"signature": "827edc3452044f0bc86bdd5684afb7d1e6becfa1a767f24df1b287853cf73000",
"deauthorization_time": "2019-06-17T13:52:28.632Z",
"client_id": "ADZ9k9bTWmGUoUbECUKU_a"
}
}
""",
content_type="application/json",
)
self.assert_json_success(response)
示例20
def test_get_work(self):
responses.add(
responses.POST,
self.anilist.BASE_URL,
body=self.read_fixture('anilist/hibike_euphonium.json'),
status=200,
content_type='application/json'
)
hibike_by_id = self.anilist.get_work(search_id=20912)
hibike_by_title = self.anilist.get_work(search_title="Hibike")
hibike_by_id_and_title = self.anilist.get_work(search_id=20912, search_title="Hibike")
hibike = hibike_by_id_and_title
self.assertEqual(hibike, hibike_by_id)
self.assertEqual(hibike, hibike_by_title)
示例21
def test_add_alert_policy(self):
"""
Check that the add_alert_policy function adds an alert
"""
url = '{}.json'.format(newrelic.ALERTS_POLICIES_API_URL)
policy_name = 'Test'
policy_id = 1
response_json = {
'policy': {
'id': policy_id
}
}
responses.add(responses.POST, url, json=response_json, status=201)
self.assertEqual(newrelic.add_alert_policy(policy_name), policy_id)
self.assertEqual(len(responses.calls), 1)
request_json = json.loads(responses.calls[0].request.body.decode())
request_headers = responses.calls[0].request.headers
self.assertEqual(request_headers['x-api-key'], 'admin-api-key')
self.assertEqual(request_json, {
'policy': {
'incident_preference': 'PER_POLICY',
'name': policy_name
}
})
示例22
def test(self):
responses.add(
responses.POST,
"http://foo",
status=200,
body='{"jsonrpc": "2.0", "result": 5, "id": 1}',
)
HTTPClient("http://foo").send_message(
str(Request("foo")), response_expected=True
)
示例23
def test_non_2xx_response_error(self):
responses.add(responses.POST, "http://foo", status=400)
with pytest.raises(ReceivedNon2xxResponseError):
HTTPClient("http://foo").request("foo")
示例24
def downloader():
responses.add(
responses.POST,
"https://api.twitter.com/oauth2/token",
json={"access_token": "loremipsum"},
status=200,
)
downloader = twt_img.Downloader("key", "secret")
return downloader
示例25
def test_create_policy(nomad_setup):
responses.add(
responses.POST,
"http://{ip}:{port}/v1/sentinel/policy/my-policy".format(ip=common.IP, port=common.NOMAD_PORT),
status=200
)
policy_example = '{"Name": "my-policy", "Description": "This is a great policy", "Scope": "submit-job", "EnforcementLevel": "advisory", "Policy": "main = rule { true }"}'
json_policy = json.loads(policy_example)
nomad_setup.sentinel.create_policy(id="my-policy", policy=json_policy)
示例26
def test_update_policy(nomad_setup):
responses.add(
responses.POST,
"http://{ip}:{port}/v1/sentinel/policy/my-policy".format(ip=common.IP, port=common.NOMAD_PORT),
status=200
)
policy_example = '{"Name": "my-policy", "Description": "Update", "Scope": "submit-job", "EnforcementLevel": "advisory", "Policy": "main = rule { true }"}'
json_policy = json.loads(policy_example)
nomad_setup.sentinel.update_policy(id="my-policy", policy=json_policy)
示例27
def test_create_namespace(nomad_setup):
responses.add(
responses.POST,
"http://{ip}:{port}/v1/namespace".format(ip=common.IP, port=common.NOMAD_PORT),
status=200
)
namespace_api = '{"Name":"api","Description":"api server namespace"}'
namespace = json.loads(namespace_api)
nomad_setup.namespace.create_namespace(namespace)
示例28
def test_update_namespace(nomad_setup):
responses.add(
responses.POST,
"http://{ip}:{port}/v1/namespace/api".format(ip=common.IP, port=common.NOMAD_PORT),
status=200
)
namespace_api = '{"Name":"api","Description":"updated namespace"}'
namespace = json.loads(namespace_api)
nomad_setup.namespace.update_namespace("api", namespace)
示例29
def test_cli_dataset_create_noargs():
created = """
{{"owner":"{username}",
"id":"cii9dtexw0039uelz7nzk1lq3",
"name":null,
"description":null,
"created":"2015-12-16T22:20:38.847Z",
"modified":"2015-12-16T22:20:38.847Z"}}
""".format(username=username)
created = "".join(created.split())
responses.add(
responses.POST,
'https://api.mapbox.com/datasets/v1/{0}?access_token={1}'.format(username, access_token),
match_querystring=True,
body=created, status=200,
content_type='application/json')
runner = CliRunner()
result = runner.invoke(
main_group,
['--access-token', access_token,
'datasets',
'create'])
assert result.exit_code == 0
assert result.output.strip() == created.strip()
示例30
def test_cli_dataset_create_withargs():
name = "the-name"
description = "the-description"
created = """
{{"owner":"{username}",
"id":"cii9dtexw0039uelz7nzk1lq3",
"name":{name},
"description":{description},
"created":"2015-12-16T22:20:38.847Z",
"modified":"2015-12-16T22:20:38.847Z"}}
""".format(username=username, name=name, description=description)
created = "".join(created.split())
responses.add(
responses.POST,
'https://api.mapbox.com/datasets/v1/{0}?access_token={1}'.format(username, access_token),
match_querystring=True,
body=created, status=200,
content_type='application/json')
runner = CliRunner()
result = runner.invoke(
main_group,
['--access-token', access_token,
'datasets',
'create',
'--name', name,
'-d', description])
assert result.exit_code == 0
assert result.output.strip() == created.strip()