Python源码示例:responses.HEAD
示例1
def test_cached_path_extract_remote_tar(self):
    """Fetch a mocked remote ``.tar.gz`` through ``cached_path`` with extraction enabled.

    The archive bytes come from ``self.tar_file``; the result path must end
    with ``-extracted`` and pass ``self.check_extracted``.
    """
    url = "http://fake.datastore.com/utf-8.tar.gz"
    # Read the fixture inside a context manager so the file handle is closed
    # deterministically rather than left to the garbage collector.
    with open(self.tar_file, "rb") as archive:
        byt = archive.read()

    # GET serves the archive body with an explicit Content-Length.
    responses.add(
        responses.GET,
        url,
        body=byt,
        status=200,
        content_type="application/tar+gzip",
        stream=True,
        headers={"Content-Length": str(len(byt))},
    )
    # HEAD only advertises an ETag for the resource.
    responses.add(
        responses.HEAD, url, status=200, headers={"ETag": "fake-etag"},
    )

    extracted = cached_path(url, cache_dir=self.TEST_DIR, extract_archive=True)
    assert extracted.endswith("-extracted")
    self.check_extracted(extracted)
示例2
def test_cached_path_extract_remote_zip(self):
    """Fetch a mocked remote ``.zip`` through ``cached_path`` with extraction enabled.

    The archive bytes come from ``self.zip_file``; the result path must end
    with ``-extracted`` and pass ``self.check_extracted``.
    """
    url = "http://fake.datastore.com/utf-8.zip"
    # Read the fixture inside a context manager so the file handle is closed
    # deterministically rather than left to the garbage collector.
    with open(self.zip_file, "rb") as archive:
        byt = archive.read()

    # GET serves the archive body with an explicit Content-Length.
    responses.add(
        responses.GET,
        url,
        body=byt,
        status=200,
        content_type="application/zip",
        stream=True,
        headers={"Content-Length": str(len(byt))},
    )
    # HEAD only advertises an ETag for the resource.
    responses.add(
        responses.HEAD, url, status=200, headers={"ETag": "fake-etag"},
    )

    extracted = cached_path(url, cache_dir=self.TEST_DIR, extract_archive=True)
    assert extracted.endswith("-extracted")
    self.check_extracted(extracted)
示例3
def mock_component_taskcluster_artifact() -> None:
    """Mock the HEAD and GET responses for ``components.json``, then download the mapping.

    Both mocks target the same artifact URL: HEAD answers 200 with an ETag,
    GET answers 200 with an empty JSON object. Afterwards
    ``repository.download_component_mapping()`` is invoked.
    """
    artifact_url = "https://firefox-ci-tc.services.mozilla.com/api/index/v1/task/gecko.v2.mozilla-central.latest.source.source-bugzilla-info/artifacts/public/components.json"

    responses.add(responses.HEAD, artifact_url, status=200, headers={"ETag": "100"})
    responses.add(responses.GET, artifact_url, status=200, json={})

    repository.download_component_mapping()
示例4
def test_download_missing(tmp_path, mock_zst):
    """``db.download`` reports failure when the archive and version file both answer 404.

    Also checks that nothing is written to the DB path and that
    ``db.last_modified`` raises with the "Last-Modified is not available" message.
    """
    archive_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_commits.latest/artifacts/public/prova.json.zst"
    version_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_commits.latest/artifacts/public/prova.json.version"

    target = tmp_path / "prova.json"
    db.register(target, archive_url, 1)

    head_headers = {"ETag": "123", "Accept-Encoding": "zstd"}
    responses.add(responses.HEAD, archive_url, status=404, headers=head_headers)
    responses.add(
        responses.GET,
        archive_url,
        status=404,
        body=requests.exceptions.HTTPError("HTTP error"),
    )
    responses.add(responses.GET, version_url, status=404)

    downloaded = db.download(target)
    assert not downloaded
    assert not os.path.exists(target)

    with pytest.raises(Exception, match="Last-Modified is not available"):
        db.last_modified(target)
示例5
def test_trainer():
    """Run the trainer with the ``defect`` argument against mocked bugs-DB endpoints."""
    # Pretend the DB was already downloaded and no new DB is available.
    bugs_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_bugs.latest/artifacts/public/bugs.json"

    # The version endpoint echoes the version registered locally for the bugs DB.
    registered_version = str(db.DATABASES[bugzilla.BUGS_DB]["version"])
    responses.add(
        responses.GET, f"{bugs_url}.version", status=200, body=registered_version
    )
    responses.add(
        responses.HEAD, f"{bugs_url}.zst", status=200, headers={"ETag": "etag"}
    )

    runner = trainer.Trainer()
    runner.go(trainer.parse_args(["defect"]))
示例6
def test_download_check_missing():
    """``utils.download_check_etag`` raises the HTTP error for a 404 URL and leaves no file behind."""
    missing_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug/prova.txt"

    head_headers = {"ETag": "123", "Last-Modified": "2019-04-16"}
    responses.add(responses.HEAD, missing_url, status=404, headers=head_headers)
    responses.add(
        responses.GET,
        missing_url,
        status=404,
        body=requests.exceptions.HTTPError("HTTP error"),
    )

    with pytest.raises(requests.exceptions.HTTPError, match="HTTP error"):
        utils.download_check_etag(missing_url)

    assert not os.path.exists("prova.txt")
示例7
def test_lazy_git():
    """Check that ``LazyGit`` exposes a path and a 40-character commit id, before and after a reset."""
    repo = LazyGit(git_url=DOCKERFILE_GIT)
    with repo:
        assert repo.git_path is not None
        assert repo.commit_id is not None
        assert len(repo.commit_id) == 40  # current git hashes are this long

        commit_before_reset = repo.commit_id
        repo.reset('HEAD~2')  # Go back two commits

        # The commit id must still be set, but must now differ.
        assert repo.commit_id is not None
        assert repo.commit_id != commit_before_reset
        assert len(repo.commit_id) == 40  # current git hashes are this long
示例8
def __init__(self, registry):
    """Store the registry hostname, start with no repos, and register the mocked /v2/ routes.

    Routes are registered through ``self._add_pattern`` for manifests
    (GET/HEAD/PUT), blobs (GET/HEAD) and the blob-mount upload URL (POST).
    """
    self.hostname = registry_hostname(registry)
    self.repos = {}

    manifest_route = r'/v2/(.*)/manifests/([^/]+)'
    blob_route = r'/v2/(.*)/blobs/([^/]+)'
    mount_route = r'/v2/(.*)/blobs/uploads/\?mount=([^&]+)&from=(.+)'

    # (HTTP method, URL pattern, handler) in registration order.
    routes = (
        (responses.GET, manifest_route, self._get_manifest),
        (responses.HEAD, manifest_route, self._get_manifest),
        (responses.PUT, manifest_route, self._put_manifest),
        (responses.GET, blob_route, self._get_blob),
        (responses.HEAD, blob_route, self._get_blob),
        (responses.POST, mount_route, self._mount_blob),
    )
    for http_method, route, handler in routes:
        self._add_pattern(http_method, route, handler)
示例9
def _add_pattern(self, method, pattern, callback):
    """Register ``callback`` for ``method`` requests to ``https://<hostname><pattern>``.

    :param method: HTTP method constant from ``responses`` (e.g. ``responses.GET``).
    :param pattern: regex fragment for the path (and query string); its
        capture groups are passed to ``callback`` as positional arguments.
    :param callback: callable invoked as ``callback(request, *groups)`` and
        returning a ``(status, headers, body)`` tuple.
    """
    # Escape the hostname so characters such as "." are matched literally
    # instead of being interpreted as regex metacharacters.
    pat = re.compile(r'^https://' + re.escape(self.hostname) + pattern + '$')

    def do_it(req):
        status, headers, body = callback(req, *(pat.match(req.url).groups()))
        if method == responses.HEAD:
            # HEAD reuses the GET handlers; return their status and headers
            # but replace the body with an empty string.
            return status, headers, ''
        return status, headers, body

    responses.add_callback(method, pat, do_it, match_querystring=True)
示例10
def _add_pattern(self, method, pattern, callback):
    """Register ``callback`` for ``method`` requests to ``https://<hostname><pattern>``.

    :param method: HTTP method constant from ``responses`` (e.g. ``responses.GET``).
    :param pattern: regex fragment for the path (and query string); its
        capture groups are passed to ``callback`` as positional arguments.
    :param callback: callable invoked as ``callback(request, *groups)`` and
        returning a ``(status, headers, body)`` tuple.
    """
    # Escape the hostname so characters such as "." are matched literally
    # instead of being interpreted as regex metacharacters.
    pat = re.compile(r'^https://' + re.escape(self.hostname) + pattern + '$')

    def do_it(req):
        status, headers, body = callback(req, *(pat.match(req.url).groups()))
        if method == responses.HEAD:
            # HEAD reuses the GET handlers; return their status and headers
            # but replace the body with an empty string.
            return status, headers, ''
        return status, headers, body

    responses.add_callback(method, pat, do_it, match_querystring=True)
示例11
def set_up_glove(url: str, byt: bytes, change_etag_every: int = 1000):
    """Mock GET and HEAD for ``url``: GET serves ``byt``, HEAD serves a rotating ETag.

    The ETag starts at ``"0"`` and is incremented after every
    ``change_etag_every`` HEAD requests.
    """
    # Mock response for the datastore url that returns glove vectors
    get_headers = {"Content-Length": str(len(byt))}
    responses.add(
        responses.GET,
        url,
        body=byt,
        status=200,
        content_type="application/gzip",
        stream=True,
        headers=get_headers,
    )

    remaining = change_etag_every
    current_etag = "0"

    def head_callback(_):
        """
        Answer a HEAD request with the current ETag, then advance the countdown.

        Being a callback lets successive HEAD requests receive different
        responses, which simulates a new version of the file appearing.
        """
        nonlocal remaining, current_etag
        # Capture the ETag before the countdown may bump it.
        reply_headers = {"ETag": current_etag}
        remaining -= 1
        if remaining <= 0:
            remaining = change_etag_every
            current_etag = str(int(current_etag) + 1)
        return (200, reply_headers, "")

    responses.add_callback(responses.HEAD, url, callback=head_callback)
示例12
def test_is_not_active_404(pydrill_instance):
    """
    ``is_active()`` must be ``False`` when the mocked HEAD request answers 404.

    :type pydrill_instance: pydrill.client.PyDrill
    """
    responses.add(
        method=responses.HEAD,
        url='http://localhost:8047/',
        content_type='application/json',
        status=404,
    )
    active = pydrill_instance.is_active()
    assert active is False
示例13
def test_is_not_active_500(pydrill_instance, pydrill_url):
    """
    ``is_active()`` must be ``False`` when the mocked HEAD request answers 500.

    :type pydrill_instance: pydrill.client.PyDrill
    """
    responses.add(
        method=responses.HEAD,
        url=pydrill_url,
        content_type='application/json',
        status=500,
    )
    active = pydrill_instance.is_active()
    assert active is False
示例14
def test_is_not_active_201(pydrill_instance, pydrill_url):
    """
    ``is_active()`` must be ``False`` when the mocked HEAD request answers 201.

    :type pydrill_instance: pydrill.client.PyDrill
    """
    responses.add(
        method=responses.HEAD,
        url=pydrill_url,
        content_type='application/json',
        status=201,
    )
    active = pydrill_instance.is_active()
    assert active is False
示例15
def test_is_not_active_timeout(pydrill_instance):
    """
    A HEAD request with a tiny ``request_timeout`` must raise ``TransportError``.

    The error's ``status_code``, ``error`` and ``info`` must mirror its ``args``.

    :type pydrill_instance: pydrill.client.PyDrill
    """
    caught = None
    try:
        pydrill_instance.perform_request('HEAD', '/', params={'request_timeout': 0.00001})
    except TransportError as exc:
        caught = exc

    # The request completing without a TransportError is a failure.
    assert caught is not None
    assert caught.status_code == caught.args[0]
    assert caught.error == caught.args[1]
    assert caught.info == caught.args[2]
    assert str(caught)
    # TODO: create more tests checking other params.
示例16
def set_up_glove(url: str, byt: bytes, change_etag_every: int = 1000):
    """Mock GET and HEAD for ``url``: GET serves ``byt``, HEAD serves a rotating ETag.

    The ETag starts at ``"0"`` and is incremented after every
    ``change_etag_every`` HEAD requests.

    The body uses ``nonlocal`` and therefore only parses on Python 3, where
    the ``unicode`` builtin does not exist; ``str`` is used instead.
    """
    # Mock response for the datastore url that returns glove vectors
    responses.add(
        responses.GET,
        url,
        body=byt,
        status=200,
        content_type="application/gzip",
        stream=True,
        headers={"Content-Length": str(len(byt))},
    )

    etags_left = change_etag_every
    etag = "0"

    def head_callback(_):
        """
        Writing this as a callback allows different responses to different HEAD requests.
        In our case, we're going to change the ETag header every `change_etag_every`
        requests, which will allow us to simulate having a new version of the file.
        """
        nonlocal etags_left, etag
        # Capture the ETag before the countdown may bump it.
        headers = {"ETag": etag}
        # countdown and change ETag
        etags_left -= 1
        if etags_left <= 0:
            etags_left = change_etag_every
            etag = str(int(etag) + 1)
        return (200, headers, "")

    responses.add_callback(
        responses.HEAD,
        url,
        callback=head_callback,
    )
示例17
def mock_dataset_head():
    """Generator that keeps a mocked HEAD response for ``tester/dataset-1`` active while suspended.

    The mock answers 200 with ``x-access-level: a``; it lives inside a
    ``responses.RequestsMock`` context that is exited when the generator resumes.
    """
    dataset_url = 'https://api.gigantum.com/object-v1/tester/dataset-1'
    with responses.RequestsMock() as mocked:
        mocked.add(
            responses.HEAD,
            dataset_url,
            headers={'x-access-level': 'a'},
            status=200,
        )
        yield
示例18
def mock_dataset_head():
    """Generator that keeps a mocked HEAD response for ``tester/dataset-1`` active while suspended.

    The mock answers 200 with ``x-access-level: a``; it lives inside a
    ``responses.RequestsMock`` context that is exited when the generator resumes.
    """
    dataset_url = 'https://api.gigantum.com/object-v1/tester/dataset-1'
    with responses.RequestsMock() as mocked:
        mocked.add(
            responses.HEAD,
            dataset_url,
            headers={'x-access-level': 'a'},
            status=200,
        )
        yield
示例19
def test_prepare_push_errors(self, mock_dataset_with_cache_dir):
    """``prepare_push`` raises ValueError until configured, then IOError for the mocked HEAD replies."""
    backend = get_storage_backend("gigantum_object_v1")
    dataset = mock_dataset_with_cache_dir[0]

    # Two queued HEAD replies: read-only access first, then no access header.
    for reply_headers in ({'x-access-level': 'r'}, {}):
        responses.add(
            responses.HEAD,
            f"{backend._object_service_endpoint(dataset)}/{dataset.namespace}/{dataset.name}",
            headers=reply_headers,
            status=200,
        )

    # A ValueError is expected before each of the three settings is supplied.
    settings = (
        ('username', "test-user"),
        ('gigantum_bearer_token', "asdf"),
        ('gigantum_id_token', "1234"),
    )
    for key, value in settings:
        with pytest.raises(ValueError):
            backend.prepare_push(dataset, [])
        backend.configuration[key] = value

    with pytest.raises(IOError):
        # need more than read access to push
        backend.prepare_push(dataset, [])

    with pytest.raises(IOError):
        # should not return a header
        backend.prepare_push(dataset, [])
示例20
def test_prepare_push(self, mock_dataset_with_cache_dir):
    """``prepare_push`` completes without raising when HEAD reports access level ``a``."""
    backend = get_storage_backend("gigantum_object_v1")
    dataset = mock_dataset_with_cache_dir[0]

    head_url = f"{backend._object_service_endpoint(dataset)}/{dataset.namespace}/{dataset.name}"
    responses.add(
        responses.HEAD,
        head_url,
        headers={'x-access-level': 'a'},
        status=200,
    )

    backend.set_default_configuration("test-user", "abcd", '1234')
    backend.prepare_push(dataset, [])
示例21
def test_prepare_pull(self, mock_dataset_with_cache_dir):
    """``prepare_pull`` raises ValueError until configured, IOError on the 404 reply, then succeeds."""
    backend = get_storage_backend("gigantum_object_v1")
    dataset = mock_dataset_with_cache_dir[0]

    # Two queued HEAD replies: a 404 without headers, then read access.
    queued_replies = (({}, 404), ({'x-access-level': 'r'}, 200))
    for reply_headers, reply_status in queued_replies:
        responses.add(
            responses.HEAD,
            f"{backend._object_service_endpoint(dataset)}/{dataset.namespace}/{dataset.name}",
            headers=reply_headers,
            status=reply_status,
        )

    # A ValueError is expected before each of the three settings is supplied.
    settings = (
        ('username', "test-user"),
        ('gigantum_bearer_token', "asdf"),
        ('gigantum_id_token', "1234"),
    )
    for key, value in settings:
        with pytest.raises(ValueError):
            backend.prepare_pull(dataset, [])
        backend.configuration[key] = value

    with pytest.raises(IOError):
        # missing header
        backend.prepare_pull(dataset, [])

    # The final call is expected to complete without raising.
    backend.prepare_pull(dataset, [])
示例22
def test_download(tmp_path, mock_zst):
    """``db.download`` succeeds twice and leaves the DB plus its ``.zst.etag`` file, but no ``.zst``."""
    archive_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_commits.latest/artifacts/public/prova.json.zst"
    version_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_commits.latest/artifacts/public/prova.json.version"

    target = tmp_path / "prova.json"
    db.register(target, archive_url, 1)

    responses.add(responses.GET, version_url, status=200, body="1")

    head_headers = {
        "ETag": "123",
        "Accept-Encoding": "zstd",
        "Last-Modified": "2019-04-16",
    }
    responses.add(responses.HEAD, archive_url, status=200, headers=head_headers)

    # Build the archive on disk, then serve its bytes from the mocked GET.
    staged_archive = tmp_path / "prova_tmp.zst"
    mock_zst(staged_archive)
    with open(staged_archive, "rb") as stream:
        payload = stream.read()
    responses.add(responses.GET, archive_url, status=200, body=payload)

    # Downloading must succeed on the first call and on a repeat call.
    assert db.download(target)
    assert db.download(target)

    assert db.last_modified(target) == datetime(2019, 4, 16)
    assert os.path.exists(target)

    zst_file = target.with_suffix(target.suffix + ".zst")
    etag_file = target.with_suffix(target.suffix + ".zst.etag")
    assert not os.path.exists(zst_file)
    assert os.path.exists(etag_file)
示例23
def test_download_old_schema(tmp_path, mock_zst):
    """``db.download`` fails and writes nothing when the DB is registered with version 2 but the remote reports 1."""
    archive_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_commits.latest/artifacts/public/prova.json.zst"
    version_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_commits.latest/artifacts/public/prova.json.version"

    target = tmp_path / "prova.json"
    db.register(target, archive_url, 2)

    responses.add(responses.GET, version_url, status=200, body="1")

    head_headers = {
        "ETag": "123",
        "Accept-Encoding": "zstd",
        "Last-Modified": "2019-04-16",
    }
    responses.add(responses.HEAD, archive_url, status=200, headers=head_headers)

    # Build the archive on disk, then serve its bytes from the mocked GET.
    staged_archive = tmp_path / "prova_tmp.zst"
    mock_zst(staged_archive)
    with open(staged_archive, "rb") as stream:
        payload = stream.read()
    responses.add(responses.GET, archive_url, status=200, body=payload)

    assert not db.download(target)
    assert db.last_modified(target) == datetime(2019, 4, 16)

    # Neither the DB nor its companion files may exist afterwards.
    assert not os.path.exists(target)
    zst_file = target.with_suffix(target.suffix + ".zst")
    etag_file = target.with_suffix(target.suffix + ".zst.etag")
    assert not os.path.exists(zst_file)
    assert not os.path.exists(etag_file)
示例24
def test_download_support_file_missing(tmp_path, caplog):
    """``db.download_support_file`` returns a falsy value and logs the missing-version message."""
    archive_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_commits.latest/artifacts/public/commits.json.zst"
    version_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_commits.latest/artifacts/public/prova.json.version"

    support_name = "support_mock.zst"
    support_url = urljoin(archive_url, support_name)

    target = tmp_path / "prova.json"
    db.register(target, archive_url, 1, support_files=[support_name])

    responses.add(responses.GET, version_url, status=404)

    head_headers = {"ETag": "123", "Accept-Encoding": "zstd"}
    responses.add(responses.HEAD, support_url, status=404, headers=head_headers)
    responses.add(
        responses.GET,
        support_url,
        status=404,
        body=requests.exceptions.HTTPError("HTTP error"),
    )

    assert not db.download_support_file(target, support_name)

    expected_log = f"Version file is not yet available to download for {target}"
    assert expected_log in caplog.text
示例25
def test_download_check_etag():
    """``utils.download_check_etag`` writes the body to the default path and to an explicit path."""
    source_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug/prova.txt"

    head_headers = {"ETag": "123", "Last-Modified": "2019-04-16"}
    responses.add(responses.HEAD, source_url, status=200, headers=head_headers)
    responses.add(responses.GET, source_url, status=200, body="prova")

    # Without an explicit path the file lands at "prova.txt".
    assert utils.download_check_etag(source_url)
    assert os.path.exists("prova.txt")
    with open("prova.txt", "r") as downloaded:
        assert downloaded.read() == "prova"

    # With an explicit path only that path is written.
    assert utils.download_check_etag(source_url, "data/prova2.txt")
    assert os.path.exists("data/prova2.txt")
    assert not os.path.exists("prova2.txt")
    with open("data/prova2.txt", "r") as downloaded:
        assert downloaded.read() == "prova"
示例26
def test_download_check_etag_changed():
    """A second ``download_check_etag`` call picks up the new body after the ETag changes."""
    source_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug/prova.txt"

    # Queue two HEAD/GET rounds: ETag "123" with "prova", then "456" with "prova2".
    rounds = (("123", "prova"), ("456", "prova2"))
    for etag, body in rounds:
        responses.add(
            responses.HEAD,
            source_url,
            status=200,
            headers={"ETag": etag, "Last-Modified": "2019-04-16"},
        )
        responses.add(responses.GET, source_url, status=200, body=body)

    # Each download must leave the body of the matching round in "prova.txt".
    for _, expected_body in rounds:
        assert utils.download_check_etag(source_url)
        assert os.path.exists("prova.txt")
        with open("prova.txt", "r") as downloaded:
            assert downloaded.read() == expected_body
示例27
def test_get_last_modified():
    """``utils.get_last_modified`` returns the HEAD ``Last-Modified`` value as a ``datetime``."""
    source_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug/prova.txt"
    head_headers = {"Last-Modified": "2019-04-16"}
    responses.add(responses.HEAD, source_url, status=200, headers=head_headers)

    last_modified = utils.get_last_modified(source_url)
    assert last_modified == datetime(2019, 4, 16, 0, 0)
示例28
def test_get_last_modified_not_present():
    """``utils.get_last_modified`` returns ``None`` when HEAD carries no ``Last-Modified`` header."""
    source_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug/prova.txt"
    head_headers = {"ETag": "123"}
    responses.add(responses.HEAD, source_url, status=200, headers=head_headers)

    last_modified = utils.get_last_modified(source_url)
    assert last_modified is None
示例29
def test_get_last_modified_missing():
    """``utils.get_last_modified`` returns ``None`` when the HEAD request answers 404."""
    source_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug/prova.txt"
    no_headers = {}
    responses.add(responses.HEAD, source_url, status=404, headers=no_headers)

    last_modified = utils.get_last_modified(source_url)
    assert last_modified is None
示例30
def test_remove_from_search_after_sync(self):
    """When an image is removed from the source, it should be removed from the search engine"""
    self._index_img(self.removed)

    # The image is searchable before the sync.
    s = self.s.query(Q("match", title="removed"))
    r = s.execute()
    # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(1, r.hits.total)

    # The source answers 404 for the image while it is being synced.
    with responses.RequestsMock() as rsps:
        rsps.add(responses.HEAD, FOREIGN_URL + TEST_IMAGE_REMOVED, status=404)
        self.removed.sync()

    signals._update_search_index(self.removed)
    self.es.indices.refresh()

    # After the index update the image must no longer be found.
    s = self.s.query(Q("match", title="removed"))
    r = s.execute()
    self.assertEqual(0, r.hits.total)