Python源码示例:responses.HEAD

示例1
def test_cached_path_extract_remote_tar(self):
        """Check that ``cached_path`` extracts a remote ``.tar.gz`` archive.

        GET and HEAD responses for a fake URL are registered with the
        ``responses`` mock; the GET body is the content of ``self.tar_file``.
        The path returned by ``cached_path`` must end in ``-extracted`` and
        pass ``self.check_extracted``.
        """
        url = "http://fake.datastore.com/utf-8.tar.gz"
        # Read the fixture through a context manager so the file handle is
        # closed deterministically instead of being left to the garbage collector.
        with open(self.tar_file, "rb") as archive:
            byt = archive.read()

        responses.add(
            responses.GET,
            url,
            body=byt,
            status=200,
            content_type="application/tar+gzip",
            stream=True,
            headers={"Content-Length": str(len(byt))},
        )
        responses.add(
            responses.HEAD, url, status=200, headers={"ETag": "fake-etag"},
        )

        extracted = cached_path(url, cache_dir=self.TEST_DIR, extract_archive=True)
        assert extracted.endswith("-extracted")
        self.check_extracted(extracted)
示例2
def test_cached_path_extract_remote_zip(self):
        """Check that ``cached_path`` extracts a remote ``.zip`` archive.

        GET and HEAD responses for a fake URL are registered with the
        ``responses`` mock; the GET body is the content of ``self.zip_file``.
        The path returned by ``cached_path`` must end in ``-extracted`` and
        pass ``self.check_extracted``.
        """
        url = "http://fake.datastore.com/utf-8.zip"
        # Read the fixture through a context manager so the file handle is
        # closed deterministically instead of being left to the garbage collector.
        with open(self.zip_file, "rb") as archive:
            byt = archive.read()

        responses.add(
            responses.GET,
            url,
            body=byt,
            status=200,
            content_type="application/zip",
            stream=True,
            headers={"Content-Length": str(len(byt))},
        )
        responses.add(
            responses.HEAD, url, status=200, headers={"ETag": "fake-etag"},
        )

        extracted = cached_path(url, cache_dir=self.TEST_DIR, extract_archive=True)
        assert extracted.endswith("-extracted")
        self.check_extracted(extracted)
示例3
def mock_component_taskcluster_artifact() -> None:
    """Mock the ``components.json`` artifact and download the component mapping.

    Registers a HEAD response (status 200, ``ETag: 100``) and a GET response
    (status 200, empty JSON object) for the artifact URL, then calls
    ``repository.download_component_mapping()``.
    """
    # Both mocked verbs target the same artifact URL.
    components_url = "https://firefox-ci-tc.services.mozilla.com/api/index/v1/task/gecko.v2.mozilla-central.latest.source.source-bugzilla-info/artifacts/public/components.json"

    responses.add(responses.HEAD, components_url, status=200, headers={"ETag": "100"})
    responses.add(responses.GET, components_url, status=200, json={})

    repository.download_component_mapping()
示例4
def test_download_missing(tmp_path, mock_zst):
    """Check ``db.download`` when the artifact and its version file are missing.

    Every mocked request answers 404. The download must report failure, no
    local DB file may be created, and ``db.last_modified`` must raise with
    the message "Last-Modified is not available".
    """
    artifact_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_commits.latest/artifacts/public/prova.json.zst"
    version_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_commits.latest/artifacts/public/prova.json.version"

    local_db = tmp_path / "prova.json"
    db.register(local_db, artifact_url, 1)

    # The exception instance is used as the GET body for the artifact.
    http_failure = requests.exceptions.HTTPError("HTTP error")
    missing_headers = {"ETag": "123", "Accept-Encoding": "zstd"}

    responses.add(responses.HEAD, artifact_url, status=404, headers=missing_headers)
    responses.add(responses.GET, artifact_url, status=404, body=http_failure)
    responses.add(responses.GET, version_url, status=404)

    downloaded = db.download(local_db)
    assert not downloaded
    assert not os.path.exists(local_db)

    with pytest.raises(Exception, match="Last-Modified is not available"):
        db.last_modified(local_db)
示例5
def test_trainer():
    """Run the trainer for the "defect" model against a mocked bugs DB.

    The mocked ``.version`` file returns the version registered in
    ``db.DATABASES`` for the bugs DB, and the mocked HEAD on the ``.zst``
    artifact returns a fixed ETag.
    """
    # Pretend the DB was already downloaded and no new DB is available.
    bugs_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_bugs.latest/artifacts/public/bugs.json"

    registered_version = db.DATABASES[bugzilla.BUGS_DB]["version"]
    responses.add(
        responses.GET, f"{bugs_url}.version", status=200, body=str(registered_version)
    )
    responses.add(
        responses.HEAD, f"{bugs_url}.zst", status=200, headers={"ETag": "etag"}
    )

    runner = trainer.Trainer()
    cli_args = trainer.parse_args(["defect"])
    runner.go(cli_args)
示例6
def test_download_check_missing():
    """Check that ``utils.download_check_etag`` raises for a missing file.

    HEAD and GET are both mocked with status 404 (the GET body is an
    ``HTTPError`` instance). The call must raise ``HTTPError`` and must not
    leave ``prova.txt`` behind.
    """
    remote = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug/prova.txt"

    head_headers = {"ETag": "123", "Last-Modified": "2019-04-16"}
    responses.add(responses.HEAD, remote, status=404, headers=head_headers)

    http_failure = requests.exceptions.HTTPError("HTTP error")
    responses.add(responses.GET, remote, status=404, body=http_failure)

    with pytest.raises(requests.exceptions.HTTPError, match="HTTP error"):
        utils.download_check_etag(remote)

    assert not os.path.exists("prova.txt")
示例7
def test_lazy_git():
    """Check ``LazyGit`` exposes a path and commit id, and that ``reset`` moves it.

    Inside the context manager the commit id must be a 40-character value,
    and resetting to ``HEAD~2`` must yield a different 40-character commit id.
    """
    expected_hash_length = 40  # current git hashes are this long

    repo = LazyGit(git_url=DOCKERFILE_GIT)
    with repo:
        assert repo.git_path is not None
        assert repo.commit_id is not None
        assert len(repo.commit_id) == expected_hash_length

        commit_before_reset = repo.commit_id
        repo.reset('HEAD~2')  # Go back two commits
        assert repo.commit_id is not None
        assert repo.commit_id != commit_before_reset
        assert len(repo.commit_id) == expected_hash_length
示例8
def __init__(self, registry):
        """Set up the mock registry and register its URL handlers.

        Stores the hostname derived from ``registry``, starts with an empty
        ``repos`` dict, and registers one callback per (HTTP method, URL
        pattern) pair through ``self._add_pattern``.
        """
        self.hostname = registry_hostname(registry)
        self.repos = {}

        manifest_path = r'/v2/(.*)/manifests/([^/]+)'
        blob_path = r'/v2/(.*)/blobs/([^/]+)'
        mount_path = r'/v2/(.*)/blobs/uploads/\?mount=([^&]+)&from=(.+)'

        # Registration order is kept identical to the original sequence of calls.
        routes = (
            (responses.GET, manifest_path, self._get_manifest),
            (responses.HEAD, manifest_path, self._get_manifest),
            (responses.PUT, manifest_path, self._put_manifest),
            (responses.GET, blob_path, self._get_blob),
            (responses.HEAD, blob_path, self._get_blob),
            (responses.POST, mount_path, self._mount_blob),
        )
        for http_method, path_pattern, handler in routes:
            self._add_pattern(http_method, path_pattern, handler)
示例9
def _add_pattern(self, method, pattern, callback):
        """Register ``callback`` for ``method`` requests to ``https://<hostname><pattern>``.

        :param method: HTTP method constant from ``responses`` (e.g. ``responses.GET``).
        :param pattern: regular-expression fragment for the path (and query
            string); its capture groups are passed to ``callback``.
        :param callback: called as ``callback(request, *groups)``; must return
            a ``(status, headers, body)`` tuple.
        """
        # Escape the hostname so its dots (and any other regex metacharacters)
        # only match themselves; ``pattern`` is already a regex fragment.
        pat = re.compile(r'^https://' + re.escape(self.hostname) + pattern + '$')

        def do_it(req):
            status, headers, body = callback(req, *(pat.match(req.url).groups()))
            if method == responses.HEAD:
                # HEAD reuses the GET handler but replies with an empty body.
                return status, headers, ''
            return status, headers, body

        responses.add_callback(method, pat, do_it, match_querystring=True)
示例10
def _add_pattern(self, method, pattern, callback):
        """Register ``callback`` for ``method`` requests to ``https://<hostname><pattern>``.

        :param method: HTTP method constant from ``responses`` (e.g. ``responses.GET``).
        :param pattern: regular-expression fragment for the path (and query
            string); its capture groups are passed to ``callback``.
        :param callback: called as ``callback(request, *groups)``; must return
            a ``(status, headers, body)`` tuple.
        """
        # Escape the hostname so its dots (and any other regex metacharacters)
        # only match themselves; ``pattern`` is already a regex fragment.
        pat = re.compile(r'^https://' + re.escape(self.hostname) + pattern + '$')

        def do_it(req):
            status, headers, body = callback(req, *(pat.match(req.url).groups()))
            if method == responses.HEAD:
                # HEAD reuses the GET handler but replies with an empty body.
                return status, headers, ''
            return status, headers, body

        responses.add_callback(method, pat, do_it, match_querystring=True)
示例11
def set_up_glove(url: str, byt: bytes, change_etag_every: int = 1000):
    """Mock GET and HEAD responses for ``url``.

    GET returns ``byt`` as a gzip payload. HEAD returns an ``ETag`` header
    whose value starts at "0" and is incremented after every
    ``change_etag_every`` HEAD requests.
    """
    # Mock response for the datastore url that returns glove vectors
    get_headers = {"Content-Length": str(len(byt))}
    responses.add(
        responses.GET,
        url,
        body=byt,
        status=200,
        content_type="application/gzip",
        stream=True,
        headers=get_headers,
    )

    # Mutable state shared between successive HEAD callbacks.
    state = {"remaining": change_etag_every, "etag": "0"}

    def head_callback(_request):
        """
        Writing this as a callback allows different responses to different HEAD requests.
        In our case, we're going to change the ETag header every `change_etag_every`
        requests, which will allow us to simulate having a new version of the file.
        """
        # The current request still sees the ETag from before any bump below.
        reply_headers = {"ETag": state["etag"]}
        # countdown and change ETag
        state["remaining"] -= 1
        if state["remaining"] <= 0:
            state["remaining"] = change_etag_every
            state["etag"] = str(int(state["etag"]) + 1)
        return (200, reply_headers, "")

    responses.add_callback(responses.HEAD, url, callback=head_callback)
示例12
def test_is_not_active_404(pydrill_instance):
    """
    With the HEAD request to ``http://localhost:8047/`` mocked as a 404,
    ``is_active()`` must return ``False``.

    :type pydrill_instance: pydrill.client.PyDrill
    """
    responses.add(
        method=responses.HEAD,
        url='http://localhost:8047/',
        content_type='application/json',
        status=404,
    )
    active = pydrill_instance.is_active()
    assert active is False
示例13
def test_is_not_active_500(pydrill_instance, pydrill_url):
    """
    With the HEAD request to ``pydrill_url`` mocked as a 500,
    ``is_active()`` must return ``False``.

    :type pydrill_instance: pydrill.client.PyDrill
    """
    responses.add(
        method=responses.HEAD,
        url=pydrill_url,
        content_type='application/json',
        status=500,
    )
    active = pydrill_instance.is_active()
    assert active is False
示例14
def test_is_not_active_201(pydrill_instance, pydrill_url):
    """
    With the HEAD request to ``pydrill_url`` mocked as a 201,
    ``is_active()`` must return ``False``.

    :type pydrill_instance: pydrill.client.PyDrill
    """
    responses.add(
        method=responses.HEAD,
        url=pydrill_url,
        content_type='application/json',
        status=201,
    )
    active = pydrill_instance.is_active()
    assert active is False
示例15
def test_is_not_active_timeout(pydrill_instance):
    """
    A HEAD request issued with a very small ``request_timeout`` must raise a
    ``TransportError`` whose ``status_code``, ``error`` and ``info`` attributes
    equal the first three entries of its ``args``.

    :type pydrill_instance: pydrill.client.PyDrill
    """
    tiny_timeout = {'request_timeout': 0.00001}
    try:
        pydrill_instance.perform_request('HEAD', '/', params=tiny_timeout)
    except TransportError as err:
        raised_args = err.args
        assert err.status_code == raised_args[0]
        assert err.error == raised_args[1]
        assert err.info == raised_args[2]
        assert str(err)
    else:
        # Reaching this branch means no TransportError was raised.
        assert False

# TODO: create more tests checking other params. 
示例16
def set_up_glove(url, byt, change_etag_every=1000):
    u"""Mock GET and HEAD responses for ``url``.

    GET returns ``byt`` as a gzip payload. HEAD returns an ``ETag`` header
    whose value starts at "0" and is incremented after every
    ``change_etag_every`` HEAD requests.

    The callback uses ``nonlocal``, which only parses on Python 3, so the
    Python-2-only ``unicode`` builtin is replaced by ``str`` here.
    """
    # Mock response for the datastore url that returns glove vectors
    responses.add(
            responses.GET,
            url,
            body=byt,
            status=200,
            content_type=u'application/gzip',
            stream=True,
            headers={u'Content-Length': str(len(byt))}
    )

    etags_left = change_etag_every
    etag = u"0"

    def head_callback(_):
        u"""
        Writing this as a callback allows different responses to different HEAD requests.
        In our case, we're going to change the ETag header every `change_etag_every`
        requests, which will allow us to simulate having a new version of the file.
        """
        nonlocal etags_left, etag
        # The current request still sees the ETag from before any bump below.
        headers = {u"ETag": etag}
        # countdown and change ETag
        etags_left -= 1
        if etags_left <= 0:
            etags_left = change_etag_every
            etag = str(int(etag) + 1)
        return (200, headers, u"")

    responses.add_callback(
            responses.HEAD,
            url,
            callback=head_callback
    )
示例17
def mock_dataset_head():
    """Generator (presumably used as a pytest fixture) that mocks the dataset HEAD request.

    While the generator is suspended at ``yield``, a ``responses.RequestsMock``
    is active with one HEAD response registered for the ``tester/dataset-1``
    object-service URL: status 200 with header ``x-access-level: a``.
    """
    dataset_url = 'https://api.gigantum.com/object-v1/tester/dataset-1'
    access_headers = {'x-access-level': 'a'}
    with responses.RequestsMock() as mocked:
        mocked.add(responses.HEAD, dataset_url, headers=access_headers, status=200)
        yield
示例18
def mock_dataset_head():
    """Generator (presumably used as a pytest fixture) that mocks the dataset HEAD request.

    While the generator is suspended at ``yield``, a ``responses.RequestsMock``
    is active with one HEAD response registered for the ``tester/dataset-1``
    object-service URL: status 200 with header ``x-access-level: a``.
    """
    dataset_url = 'https://api.gigantum.com/object-v1/tester/dataset-1'
    access_headers = {'x-access-level': 'a'}
    with responses.RequestsMock() as mocked:
        mocked.add(responses.HEAD, dataset_url, headers=access_headers, status=200)
        yield
示例19
def test_prepare_push_errors(self, mock_dataset_with_cache_dir):
        """Check the errors raised by ``prepare_push`` on the gigantum backend.

        ``ValueError`` is expected on each call made before ``username``,
        ``gigantum_bearer_token`` and ``gigantum_id_token`` are all configured.
        After that, two calls are expected to raise ``IOError``. Two HEAD
        responses are mocked: one with ``x-access-level: r`` and one with no
        headers.
        """
        backend = get_storage_backend("gigantum_object_v1")
        dataset = mock_dataset_with_cache_dir[0]

        # Two mocked HEAD responses for the same dataset URL, registered in order.
        for head_headers in ({'x-access-level': 'r'}, {}):
            responses.add(responses.HEAD, f"{backend._object_service_endpoint(dataset)}/{dataset.namespace}/{dataset.name}",
                          headers=head_headers, status=200)

        with pytest.raises(ValueError):
            backend.prepare_push(dataset, [])

        # Add the credentials one at a time; each call is still expected to fail.
        partial_credentials = (
            ('username', "test-user"),
            ('gigantum_bearer_token', "asdf"),
        )
        for option, value in partial_credentials:
            backend.configuration[option] = value
            with pytest.raises(ValueError):
                backend.prepare_push(dataset, [])

        backend.configuration['gigantum_id_token'] = "1234"
        with pytest.raises(IOError):
            # need more than read access to push
            backend.prepare_push(dataset, [])

        with pytest.raises(IOError):
            # should not return a header
            backend.prepare_push(dataset, [])
示例20
def test_prepare_push(self, mock_dataset_with_cache_dir):
        """Check ``prepare_push`` completes without raising.

        The HEAD request is mocked with ``x-access-level: a`` and the backend
        gets a full default configuration before the call.
        """
        backend = get_storage_backend("gigantum_object_v1")
        dataset = mock_dataset_with_cache_dir[0]

        head_url = f"{backend._object_service_endpoint(dataset)}/{dataset.namespace}/{dataset.name}"
        responses.add(responses.HEAD, head_url, headers={'x-access-level': 'a'}, status=200)

        backend.set_default_configuration("test-user", "abcd", '1234')
        backend.prepare_push(dataset, [])
示例21
def test_prepare_pull(self, mock_dataset_with_cache_dir):
        """Check the errors raised by ``prepare_pull`` on the gigantum backend.

        ``ValueError`` is expected on each call made before ``username``,
        ``gigantum_bearer_token`` and ``gigantum_id_token`` are all configured.
        After that, the first call is expected to raise ``IOError`` and the
        second to succeed. Two HEAD responses are mocked: a 404 with no headers
        and a 200 with ``x-access-level: r``.
        """
        backend = get_storage_backend("gigantum_object_v1")
        dataset = mock_dataset_with_cache_dir[0]

        # (headers, status) of the mocked HEAD responses, registered in order.
        mocked_heads = (
            ({}, 404),
            ({'x-access-level': 'r'}, 200),
        )
        for head_headers, head_status in mocked_heads:
            responses.add(responses.HEAD, f"{backend._object_service_endpoint(dataset)}/{dataset.namespace}/{dataset.name}",
                          headers=head_headers, status=head_status)

        with pytest.raises(ValueError):
            backend.prepare_pull(dataset, [])

        # Add the credentials one at a time; each call is still expected to fail.
        partial_credentials = (
            ('username', "test-user"),
            ('gigantum_bearer_token', "asdf"),
        )
        for option, value in partial_credentials:
            backend.configuration[option] = value
            with pytest.raises(ValueError):
                backend.prepare_pull(dataset, [])

        backend.configuration['gigantum_id_token'] = "1234"

        with pytest.raises(IOError):
            # missing header
            backend.prepare_pull(dataset, [])

        backend.prepare_pull(dataset, [])
示例22
def test_download(tmp_path, mock_zst):
    """Check ``db.download`` against a mocked artifact with matching version.

    The DB is registered with version 1 and the mocked version file returns
    "1". Two consecutive downloads must succeed, ``db.last_modified`` must
    return the mocked date, and the DB file plus its ``.zst.etag`` companion
    must exist afterwards while the ``.zst`` file must not.
    """
    artifact_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_commits.latest/artifacts/public/prova.json.zst"
    version_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_commits.latest/artifacts/public/prova.json.version"

    local_db = tmp_path / "prova.json"
    db.register(local_db, artifact_url, 1)

    responses.add(responses.GET, version_url, status=200, body="1")

    head_headers = {
        "ETag": "123",
        "Accept-Encoding": "zstd",
        "Last-Modified": "2019-04-16",
    }
    responses.add(responses.HEAD, artifact_url, status=200, headers=head_headers)

    # Have the fixture write a file at this path; its bytes are served as the GET body.
    compressed_fixture = tmp_path / "prova_tmp.zst"
    mock_zst(compressed_fixture)
    payload = compressed_fixture.read_bytes()
    responses.add(responses.GET, artifact_url, status=200, body=payload)

    # Two consecutive downloads must both report success.
    for _attempt in range(2):
        assert db.download(local_db)

    assert db.last_modified(local_db) == datetime(2019, 4, 16)

    zst_file = local_db.with_suffix(local_db.suffix + ".zst")
    etag_file = local_db.with_suffix(local_db.suffix + ".zst.etag")
    assert os.path.exists(local_db)
    assert not os.path.exists(zst_file)
    assert os.path.exists(etag_file)
示例23
def test_download_old_schema(tmp_path, mock_zst):
    """Check ``db.download`` when the remote version differs from the registered one.

    The DB is registered with version 2 while the mocked version file returns
    "1". The download must report failure and leave no DB, ``.zst`` or
    ``.zst.etag`` file behind; ``db.last_modified`` must still return the
    mocked date.
    """
    artifact_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_commits.latest/artifacts/public/prova.json.zst"
    version_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_commits.latest/artifacts/public/prova.json.version"

    local_db = tmp_path / "prova.json"
    db.register(local_db, artifact_url, 2)

    responses.add(responses.GET, version_url, status=200, body="1")

    head_headers = {
        "ETag": "123",
        "Accept-Encoding": "zstd",
        "Last-Modified": "2019-04-16",
    }
    responses.add(responses.HEAD, artifact_url, status=200, headers=head_headers)

    # Have the fixture write a file at this path; its bytes are served as the GET body.
    compressed_fixture = tmp_path / "prova_tmp.zst"
    mock_zst(compressed_fixture)
    payload = compressed_fixture.read_bytes()
    responses.add(responses.GET, artifact_url, status=200, body=payload)

    downloaded = db.download(local_db)
    assert not downloaded

    assert db.last_modified(local_db) == datetime(2019, 4, 16)

    zst_file = local_db.with_suffix(local_db.suffix + ".zst")
    etag_file = local_db.with_suffix(local_db.suffix + ".zst.etag")
    assert not os.path.exists(local_db)
    assert not os.path.exists(zst_file)
    assert not os.path.exists(etag_file)
示例24
def test_download_support_file_missing(tmp_path, caplog):
    """Check ``db.download_support_file`` when the version file is missing.

    The version file and the support file are all mocked with status 404.
    The call must report failure and the captured log must contain the
    "Version file is not yet available" message for the DB path.
    """
    artifact_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_commits.latest/artifacts/public/commits.json.zst"
    version_url = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_commits.latest/artifacts/public/prova.json.version"
    support_filename = "support_mock.zst"
    # The support file URL is resolved against the artifact URL.
    support_url = urljoin(artifact_url, support_filename)

    local_db = tmp_path / "prova.json"
    db.register(local_db, artifact_url, 1, support_files=[support_filename])

    responses.add(responses.GET, version_url, status=404)

    missing_headers = {"ETag": "123", "Accept-Encoding": "zstd"}
    responses.add(responses.HEAD, support_url, status=404, headers=missing_headers)

    http_failure = requests.exceptions.HTTPError("HTTP error")
    responses.add(responses.GET, support_url, status=404, body=http_failure)

    downloaded = db.download_support_file(local_db, support_filename)
    assert not downloaded

    expected_message = f"Version file is not yet available to download for {local_db}"
    assert expected_message in caplog.text
示例25
def test_download_check_etag():
    """Check ``utils.download_check_etag`` with and without an explicit path.

    With only the URL, the content must end up in ``prova.txt``. With the
    explicit path ``data/prova2.txt``, the content must be written there and
    no ``prova2.txt`` may appear in the current directory.
    """
    remote = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug/prova.txt"

    head_headers = {"ETag": "123", "Last-Modified": "2019-04-16"}
    responses.add(responses.HEAD, remote, status=200, headers=head_headers)
    responses.add(responses.GET, remote, status=200, body="prova")

    # First call: no destination path is passed.
    default_target = "prova.txt"
    assert utils.download_check_etag(remote)
    assert os.path.exists(default_target)
    with open(default_target, "r") as handle:
        assert handle.read() == "prova"

    # Second call: an explicit destination path is passed.
    explicit_target = "data/prova2.txt"
    assert utils.download_check_etag(remote, explicit_target)
    assert os.path.exists(explicit_target)
    assert not os.path.exists("prova2.txt")
    with open(explicit_target, "r") as handle:
        assert handle.read() == "prova"
示例26
def test_download_check_etag_changed():
    """Check ``utils.download_check_etag`` picks up new content when the ETag changes.

    Two HEAD/GET pairs are mocked for the same URL: ETag "123" with body
    "prova", then ETag "456" with body "prova2". After the first download
    ``prova.txt`` must contain "prova"; after the second, "prova2".
    """
    remote = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug/prova.txt"

    # (ETag, body) of each successive mocked version of the file.
    versions = (("123", "prova"), ("456", "prova2"))
    for etag, payload in versions:
        responses.add(
            responses.HEAD,
            remote,
            status=200,
            headers={"ETag": etag, "Last-Modified": "2019-04-16"},
        )
        responses.add(responses.GET, remote, status=200, body=payload)

    # Each download must succeed and leave the matching content on disk.
    for _etag, expected_content in versions:
        assert utils.download_check_etag(remote)
        assert os.path.exists("prova.txt")
        with open("prova.txt", "r") as handle:
            assert handle.read() == expected_content
示例27
def test_get_last_modified():
    """Check ``utils.get_last_modified`` returns the mocked ``Last-Modified`` date as a datetime."""
    remote = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug/prova.txt"

    responses.add(
        responses.HEAD,
        remote,
        status=200,
        headers={"Last-Modified": "2019-04-16"},
    )

    last_modified = utils.get_last_modified(remote)
    assert last_modified == datetime(2019, 4, 16, 0, 0)
示例28
def test_get_last_modified_not_present():
    """Check ``utils.get_last_modified`` returns ``None`` when the mocked HEAD response has no ``Last-Modified`` header."""
    remote = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug/prova.txt"

    responses.add(
        responses.HEAD,
        remote,
        status=200,
        headers={"ETag": "123"},
    )

    last_modified = utils.get_last_modified(remote)
    assert last_modified is None
示例29
def test_get_last_modified_missing():
    """Check ``utils.get_last_modified`` returns ``None`` when the mocked HEAD request answers 404."""
    remote = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug/prova.txt"

    responses.add(
        responses.HEAD,
        remote,
        status=404,
        headers={},
    )

    last_modified = utils.get_last_modified(remote)
    assert last_modified is None
示例30
def test_remove_from_search_after_sync(self):
        """When an image is removed from the source, it should be removed from the search engine"""
        self._index_img(self.removed)
        s = self.s.query(Q("match", title="removed"))
        r = s.execute()
        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(1, r.hits.total)
        with responses.RequestsMock() as rsps:
            # The source answers 404 for the image while sync() runs.
            rsps.add(responses.HEAD, FOREIGN_URL + TEST_IMAGE_REMOVED, status=404)
            self.removed.sync()
        signals._update_search_index(self.removed)
        # Refresh the indices before querying again.
        self.es.indices.refresh()
        s = self.s.query(Q("match", title="removed"))
        r = s.execute()
        self.assertEqual(0, r.hits.total)