Python源码示例:responses.add_callback()

示例1
def test_retry(self):
        class request_callback:
            def __init__(self):
                self.first_req = True

            def __call__(self, req):
                if self.first_req:
                    self.first_req = False
                    return (500, {}, "Internal Server Error.")
                return (200, {}, '{"speedLimits":[]}')

        responses.add_callback(
            responses.GET,
            "https://roads.googleapis.com/v1/speedLimits",
            content_type="application/json",
            callback=request_callback(),
        )

        self.client.speed_limits([])

        self.assertEqual(2, len(responses.calls))
        self.assertEqual(responses.calls[0].request.url, responses.calls[1].request.url) 
示例2
def test_retry(self):
        class request_callback:
            def __init__(self):
                self.first_req = True

            def __call__(self, req):
                if self.first_req:
                    self.first_req = False
                    return (200, {}, '{"status":"OVER_QUERY_LIMIT"}')
                return (200, {}, '{"status":"OK","results":[]}')

        responses.add_callback(
            responses.GET,
            "https://maps.googleapis.com/maps/api/geocode/json",
            content_type="application/json",
            callback=request_callback(),
        )

        client = googlemaps.Client(key="AIzaasdf")
        client.geocode("Sesame St.")

        self.assertEqual(2, len(responses.calls))
        self.assertEqual(responses.calls[0].request.url, responses.calls[1].request.url) 
示例3
def test_retry_intermittent(self):
        class request_callback:
            def __init__(self):
                self.first_req = True

            def __call__(self, req):
                if self.first_req:
                    self.first_req = False
                    return (500, {}, "Internal Server Error.")
                return (200, {}, '{"status":"OK","results":[]}')

        responses.add_callback(
            responses.GET,
            "https://maps.googleapis.com/maps/api/geocode/json",
            content_type="application/json",
            callback=request_callback(),
        )

        client = googlemaps.Client(key="AIzaasdf")
        client.geocode("Sesame St.")

        self.assertEqual(2, len(responses.calls)) 
示例4
def test_token_cached_per_repo(self):
        responses.add(responses.GET, BEARER_REALM_URL + '?scope=repository:fedora:pull',
                      json={'token': BEARER_TOKEN}, match_querystring=True)
        responses.add(responses.GET, BEARER_REALM_URL + '?scope=repository:centos:pull',
                      json={'token': BEARER_TOKEN}, match_querystring=True)

        fedora_url = 'https://registry.example.com/v2/fedora/tags/list'
        responses.add_callback(responses.GET, fedora_url, callback=bearer_unauthorized_callback)
        responses.add(responses.GET, fedora_url, status=200, json='fedora-success')
        responses.add(responses.GET, fedora_url, status=200, json='fedora-success-also')

        centos_url = 'https://registry.example.com/v2/centos/tags/list'
        responses.add_callback(responses.GET, centos_url, callback=bearer_unauthorized_callback)
        responses.add(responses.GET, centos_url, status=200, json='centos-success')
        responses.add(responses.GET, centos_url, status=200, json='centos-success-also')

        auth = HTTPBearerAuth()

        assert requests.get(fedora_url, auth=auth).json() == 'fedora-success'
        assert requests.get(fedora_url, auth=auth).json() == 'fedora-success-also'

        assert requests.get(centos_url, auth=auth).json() == 'centos-success'
        assert requests.get(centos_url, auth=auth).json() == 'centos-success-also'

        assert len(responses.calls) == 8 
示例5
def test_non_401_error_propagated(self):

        def bearer_teapot_callback(request):
            headers = {'www-authenticate': 'Bearer realm={}'.format(BEARER_REALM_URL)}
            return (418, headers, json.dumps("I'm a teapot!"))

        url = 'https://registry.example.com/v2/fedora/tags/list'
        responses.add_callback(responses.GET, url, callback=bearer_teapot_callback)
        responses.add(responses.GET, url, status=200, json='success')  # Not actually called

        auth = HTTPBearerAuth()

        response = requests.get(url, auth=auth)
        assert response.json() == "I'm a teapot!"
        assert response.status_code == 418
        assert len(responses.calls) == 1 
示例6
def test_not_bearer_auth(self):
        url = 'https://registry.example.com/v2/fedora/tags/list'

        def unsupported_callback(request):
            headers = {'www-authenticate': 'Spam realm={}'.format(BEARER_REALM_URL)}
            return (401, headers, json.dumps('unauthorized'))

        responses.add_callback(responses.GET, url, callback=unsupported_callback)
        responses.add(responses.GET, url, status=200, json='success')  # Not actually called

        auth = HTTPBearerAuth()

        response = requests.get(url, auth=auth)
        assert response.json() == 'unauthorized'
        assert response.status_code == 401
        assert len(responses.calls) == 1 
示例7
def test_get_identities(self):
        """
        The get_identities function should call the correct URL, and return the relevant identities.
        """
        identity_store = IdentityStore("http://identitystore.org/", "auth-token", "msisdn")

        responses.add_callback(
            responses.GET,
            "http://identitystore.org/api/v1/identities/?details__name=test",
            match_querystring=True,
            callback=self.get_identities_callback,
            content_type="application/json",
        )

        [identity] = identity_store.get_identities(details__name="test")
        self.assertEqual(identity.name, "test") 
示例8
def test_get_identities_for_address(self):
        """
        The get_identities_for_address to create the correct request to list all of the identities for a specified
        address and address type.
        """
        identity_store = IdentityStore("http://identitystore.org/", "auth-token", "msisdn")

        query = "?details__addresses__msisdn=%2B1234"
        url = "http://identitystore.org/api/v1/identities/search/"
        responses.add_callback(
            responses.GET,
            url + query,
            callback=self.single_identity_callback,
            match_querystring=True,
            content_type="application/json",
        )

        [identity] = identity_store.get_identities_for_address("+1234")
        self.assertEqual(identity["details"]["addresses"]["msisdn"], {"+1234": {}}) 
示例9
def test_create_identity(self):
        """
        The create_identity method should make the correct request to create an identity with the correct details in the
        identity store.
        """
        identity_store = IdentityStore("http://identitystore.org/", "auth-token", "msisdn")

        url = "http://identitystore.org/api/v1/identities/"
        responses.add_callback(
            responses.POST,
            url,
            callback=self.create_identity_callback,
            match_querystring=True,
            content_type="application/json",
        )

        identity = identity_store.create_identity(["tel:+1234"], name="Test identity", language="eng")
        self.assertEqual(identity["details"], {"addresses": {"msisdn": {"+1234": {}}}, "default_addr_type": "msisdn"}) 
示例10
def test_should_post_changelog(self, mock_token):
        def request_callback(request):
            payload = json.loads(request.body)
            self.assertEqual(payload["tag_name"], "v1.0.0")
            self.assertEqual(payload["body"], "text")
            self.assertEqual(payload["draft"], False)
            self.assertEqual(payload["prerelease"], False)
            self.assertEqual("token super-token", request.headers["Authorization"])

            return 201, {}, json.dumps({})

        responses.add_callback(
            responses.POST,
            self.url,
            callback=request_callback,
            content_type="application/json",
        )
        status = Github.post_release_changelog("relekang", "rmoq", "1.0.0", "text")
        self.assertTrue(status) 
示例11
def testRetryDelay():
    global call
    call = 0
    def request_callback(request):
        global call
        if call == 0 :
            call = 1
            return (200, headers,  '{"meta":{"code":999,"message":"PROBLEM","retryable":true, "details":[]},"data":{}}')
        elif call == 1 :
            call = 2
            "second attempt"
            return (200, headers,  '{"meta":{"code":999,"message":"PROBLEM","retryable":true,"details":[]},"data":{}}')
        elif call == 2 :
            call = 3
            return (200, headers,  '{"meta":{"code":200,"message":"OK","details":[]},"data":{}}')
    responses.add_callback(responses.GET, 'https://REGION-api.postmen.com/v3/labels', callback=request_callback)
    api = Postmen('KEY', 'REGION')
    api.get('labels')
    responses.reset()

# TEST 9 
示例12
def testRetryMaxAttempt(monkeypatch):
    monkeypatch.setattr(time, 'sleep', lambda s: None)
    global call
    call = 0
    def request_callback(request):
        global call
        if call < 4 :
            call += 1
            return (200, headers,  '{"meta":{"code":999,"message":"PROBLEM","retryable":true, "details":[]},"data":{}}')
        else :
            return (200, headers,  '{"meta":{"code":200,"message":"OK","details":[]},"data":{}}')
    responses.add_callback(responses.GET, 'https://REGION-api.postmen.com/v3/labels', callback=request_callback)
    api = Postmen('KEY', 'REGION')
    before = time.time()
    api.get('labels')
    after = time.time()
    responses.reset()
    monkeypatch.setattr(time, 'sleep', lambda s: None)

# TEST 10 
示例13
def testRetryMaxAttemptExceeded(monkeypatch):
    monkeypatch.setattr(time, 'sleep', lambda s: None)
    global call
    call = 0
    def request_callback(request):
        global call
        if call < 5 :
            call += 1
            return (200, headers,  '{"meta":{"code":999,"message":"PROBLEM","retryable":true, "details":[]},"data":{}}')
        else :
            pytest.fail("Maximum 5 attempts of retry, test #10 failed")
    responses.add_callback(responses.GET, 'https://REGION-api.postmen.com/v3/labels', callback=request_callback)
    api = Postmen('KEY', 'REGION')
    with pytest.raises(PostmenException) as e:
        api.get('labels')
    responses.reset()
    monkeypatch.setattr(time, 'sleep', lambda s: None)
    assert "PROBLEM" in str(e.value.message())
    assert 999 == e.value.code()

# TEST 11 
示例14
def testArguments11(monkeypatch):
    monkeypatch.setattr(time, 'sleep', lambda s: None)
    global call
    call = 0
    def request_callback(request):
        global call
        if call < 1 :
            call += 1
            return (200, headers,  '{"meta":{"code":999,"message":"PROBLEM","retryable":true, "details":[]},"data":{}}')
        else :
            pytest.fail("Shall not retry if retry = False, test #11 failed")
    responses.add_callback(responses.GET, 'https://REGION-api.postmen.com/v3/labels', callback=request_callback)
    api = Postmen('KEY', 'REGION', retry=False)
    with pytest.raises(PostmenException) as e:
        api.get('labels')
    responses.reset()
    monkeypatch.setattr(time, 'sleep', lambda s: None)
    assert "PROBLEM" in str(e.value.message())
    assert 999 == e.value.code()
    assert e.value.retryable()

# TEST 12 
示例15
def testArgument12(monkeypatch):
    monkeypatch.setattr(time, 'sleep', lambda s: None)
    global call
    call = 0
    def request_callback(request):
        global call
        if call == 0 :
            call = 1
            return (200, headers,  '{"meta":{"code":999,"message":"PROBLEM","retryable":false, "details":[]},"data":{}}')
        elif call == 1 :
            pytest.fail("Shall not retry if non retryable, test #12 failed")
    responses.add_callback(responses.GET, 'https://REGION-api.postmen.com/v3/labels', callback=request_callback)
    api = Postmen('KEY', 'REGION')
    with pytest.raises(PostmenException) as e:
        api.get('labels')
    responses.reset()
    # print(e)
    assert "PROBLEM" in str(e.value.message())
    assert not e.value.retryable()
    monkeypatch.setattr(time, 'sleep', lambda s: None)

# TEST 13 
示例16
def test_ingest_with_existing_banner_image(self):
        TieredCache.dangerous_clear_all_tiers()
        programs = self.mock_api()

        for program_data in programs:
            banner_image_url = program_data.get('banner_image_urls', {}).get('w1440h480')
            if banner_image_url:
                responses.add_callback(
                    responses.GET,
                    banner_image_url,
                    callback=mock_jpeg_callback(),
                    content_type=JPEG
                )

        self.loader.ingest()
        # Verify the API was called with the correct authorization header
        self.assert_api_called(6)

        for program in programs:
            self.assert_program_loaded(program)
            self.assert_program_banner_image_loaded(program) 
示例17
def test_token_expired_automatically_refresh_authentication(self):
        self.first_call = True

        def request_callback(request):
            if self.first_call:
                self.first_call = False
                return (401, {'content_type': 'application/json'}, json.dumps({"error": "Token expired"}))
            else:
                self.first_call = None
                return (201, {'content_type': 'application/json'}, '')

        responses.add_callback(
            responses.POST, self.wrapper.test().data,
            callback=request_callback,
            content_type='application/json',
        )

        response = self.wrapper.test().post()

        # refresh_authentication method should be able to update api_params
        self.assertEqual(response._api_params['token'], 'new_token') 
示例18
def test_raises_error_if_refresh_authentication_method_returns_falsy_value(self):
        client = FailTokenRefreshClient(token='token', refresh_token_by_default=True)

        self.first_call = True

        def request_callback(request):
            if self.first_call:
                self.first_call = False
                return (401, {}, '')
            else:
                self.first_call = None
                return (201, {}, '')

        responses.add_callback(
            responses.POST, client.test().data,
            callback=request_callback,
            content_type='application/json',
        )

        with self.assertRaises(ClientError):
            client.test().post() 
示例19
def test_stores_refresh_authentication_method_response_for_further_access(self):
        self.first_call = True

        def request_callback(request):
            if self.first_call:
                self.first_call = False
                return (401, {}, '')
            else:
                self.first_call = None
                return (201, {}, '')

        responses.add_callback(
            responses.POST, self.wrapper.test().data,
            callback=request_callback,
            content_type='application/json',
        )

        response = self.wrapper.test().post()

        self.assertEqual(response().refresh_data, 'new_token') 
示例20
def test_workflow_csrf_token():
    """ Test a simple http get and the extraction of a css selector """

    body_contact = b'<form><input type="hidden" name="csrf" id="csrf" value="token_1337"><input type="text" name="name"></form>'
    body_success = b'<div id=error>You have an error in your SQL syntax</div>'

    def request_callback_csrf(request):
        """ Checks for a valid CSRF token (token_1337) and returns a fake 'sql injection success' message """
        assert 'csrf=token_1337' in request.body

        return (200, {}, body_success)

    responses.add(responses.GET, 'http://test.com/contact',
                  body=body_contact, status=200,
                  content_type='text/html')

    responses.add_callback(
        responses.POST, 'http://test.com/contact',
        callback=request_callback_csrf,
        content_type='text/html',
    )

    blocks = explo.core.load_blocks(BLOCKS_CSRF)

    assert explo.core.process_blocks(blocks) 
示例21
def test_registry_session(tmpdir, registry, insecure, method, responses_method, config_content):
    temp_dir = mkdtemp(dir=str(tmpdir))
    with open(os.path.join(temp_dir, '.dockercfg'), 'w+') as dockerconfig:
        dockerconfig.write(json.dumps({
            registry_hostname(registry): config_content
        }))
    session = RegistrySession(registry, insecure=insecure, dockercfg_path=temp_dir)
    assert session.insecure == insecure
    assert session.dockercfg_path == temp_dir

    path = '/v2/test/image/manifests/latest'
    if registry.startswith('http'):
        url = registry + path
    elif insecure:
        https_url = 'https://' + registry + path
        responses.add(responses_method, https_url, body=requests.ConnectionError())
        url = 'http://' + registry + path
    else:
        url = 'https://' + registry + path

    def request_callback(request, all_headers=True):
        assert request.headers.get('Authorization') is not None
        return (200, {}, 'A-OK')

    responses.add_callback(responses_method, url, request_callback)

    res = method(session, path)
    assert res.text == 'A-OK' 
示例22
def test_create_compose(odcs_client, source, source_type, packages, expected_packages, sigkeys,
                        modular_koji_tags, expected_koji_tags,
                        arches, flags, multilib_arches, multilib_method, expected_method):

    def handle_composes_post(request):
        assert_request_token(request, odcs_client.session)

        if isinstance(request.body, six.text_type):
            body = request.body
        else:
            body = request.body.decode()
        body_json = json.loads(body)

        assert body_json['source']['type'] == source_type
        assert body_json['source']['source'] == source
        assert body_json['source'].get('packages') == expected_packages
        assert body_json['source'].get('sigkeys') == sigkeys
        assert body_json['source'].get('modular_koji_tags') == expected_koji_tags
        assert body_json.get('flags') == flags
        assert body_json.get('arches') == arches
        assert body_json.get('multilib_arches') == multilib_arches
        if expected_method is not None:
            assert sorted(body_json.get('multilib_method')) == sorted(expected_method)
        else:
            assert 'multilib_method' not in body_json
        return (200, {}, compose_json(0, 'wait', source_type=source_type, source=source))

    responses.add_callback(responses.POST, '{}composes/'.format(ODCS_URL),
                           content_type='application/json',
                           callback=handle_composes_post)

    odcs_client.start_compose(source_type=source_type, source=source, packages=packages,
                              sigkeys=sigkeys, arches=arches, flags=flags,
                              multilib_arches=multilib_arches, multilib_method=multilib_method,
                              modular_koji_tags=modular_koji_tags) 
示例23
def test_wait_for_compose(odcs_client, final_state_id, final_state_name, expect_exc, state_reason):
    state = {'count': 1}

    def handle_composes_get(request):
        assert_request_token(request, odcs_client.session)

        if state['count'] == 1:
            response_json = compose_json(1, 'generating')
        else:
            response_json = compose_json(final_state_id, final_state_name,
                                         state_reason=state_reason)
        state['count'] += 1

        return (200, {}, response_json)

    responses.add_callback(responses.GET, '{}composes/{}'.format(ODCS_URL, COMPOSE_ID),
                           content_type='application/json',
                           callback=handle_composes_get)

    (flexmock(time)
        .should_receive('sleep')
        .and_return(None))

    if expect_exc:
        with pytest.raises(RuntimeError) as exc_info:
            odcs_client.wait_for_compose(COMPOSE_ID)
        assert expect_exc in str(exc_info.value)
    else:
        odcs_client.wait_for_compose(COMPOSE_ID) 
示例24
def test_renew_compose(odcs_client):
    new_compose_id = COMPOSE_ID + 1

    def handle_composes_patch(request):
        assert_request_token(request, odcs_client.session)
        return (200, {}, compose_json(0, 'generating', compose_id=new_compose_id))

    responses.add_callback(responses.PATCH, '{}composes/{}'.format(ODCS_URL, COMPOSE_ID),
                           content_type='application/json',
                           callback=handle_composes_patch)

    odcs_client.renew_compose(COMPOSE_ID)
    odcs_client.renew_compose(COMPOSE_ID, ['SIGKEY1', 'SIGKEY2']) 
示例25
def test_request_sources(additional_params, caplog):
    response_data = {'id': CACHITO_REQUEST_ID}

    def handle_request_sources(http_request):
        body_json = json.loads(http_request.body)

        assert body_json['repo'] == CACHITO_REQUEST_REPO
        assert body_json['ref'] == CACHITO_REQUEST_REF

        for key, value in additional_params.items():
            if value is not None:
                assert body_json[key] == value
            else:
                assert key not in body_json

        return (201, {}, json.dumps(response_data))

    responses.add_callback(
        responses.POST,
        '{}/api/v1/requests'.format(CACHITO_URL),
        content_type='application/json',
        callback=handle_request_sources)

    api = CachitoAPI(CACHITO_URL)
    response = api.request_sources(CACHITO_REQUEST_REPO, CACHITO_REQUEST_REF, **additional_params)
    assert response['id'] == CACHITO_REQUEST_ID

    response_json = 'Cachito response:\n{}'.format(json.dumps(response_data, indent=4))
    # Since Python 3.7 logger adds additional whitespaces by default -> checking without them
    assert re.sub(r'\s+', " ", response_json) in re.sub(r'\s+', " ", caplog.text) 
示例26
def test_wait_for_unsuccessful_request(error_state, error_reason, caplog):
    states = ['in_progress', 'in_progress', error_state]
    expected_total_responses_calls = len(states)

    def handle_wait_for_request(http_request):
        state = states.pop(0)
        return (200, {}, json.dumps({'state_reason': error_reason,
                                     'repo': CACHITO_REQUEST_REPO,
                                     'state': state,
                                     'ref': CACHITO_REQUEST_REF,
                                     'id': CACHITO_REQUEST_ID
                                     }))

    responses.add_callback(
        responses.GET,
        '{}/api/v1/requests/{}'.format(CACHITO_URL, CACHITO_REQUEST_ID),
        content_type='application/json',
        callback=handle_wait_for_request)

    burst_params = {'burst_retry': 0.001, 'burst_length': 0.5}
    with pytest.raises(CachitoAPIUnsuccessfulRequest):
        CachitoAPI(CACHITO_URL).wait_for_request(CACHITO_REQUEST_ID, **burst_params)
    assert len(responses.calls) == expected_total_responses_calls

    failed_response_json = json.dumps(
        {'state_reason': error_reason,
         'repo': CACHITO_REQUEST_REPO,
         'state': error_state,
         'ref': CACHITO_REQUEST_REF,
         'id': CACHITO_REQUEST_ID
         },
        indent=4
    )
    expect_in_logs = dedent(
        """\
        Request {} is in "{}" state: {}
        Details: {}
        """
    ).format(CACHITO_REQUEST_ID, error_state, error_reason, failed_response_json)
    # Since Python 3.7 logger adds additional whitespaces by default -> checking without them
    assert re.sub(r'\s+', " ", expect_in_logs) in re.sub(r'\s+', " ", caplog.text) 
示例27
def test_check_CachitoAPIUnsuccessfulRequest_text(error_state, error_reason, caplog):
    states = ['in_progress', 'in_progress', error_state]
    expected_total_responses_calls = len(states)

    cachito_request_url = '{}/api/v1/requests/{}'.format(CACHITO_URL, CACHITO_REQUEST_ID)

    def handle_wait_for_request(http_request):
        state = states.pop(0)
        return (200, {}, json.dumps({'state_reason': error_reason,
                                     'repo': CACHITO_REQUEST_REPO,
                                     'state': state,
                                     'ref': CACHITO_REQUEST_REF,
                                     'id': CACHITO_REQUEST_ID
                                     }))

    responses.add_callback(
        responses.GET,
        '{}/api/v1/requests/{}'.format(CACHITO_URL, CACHITO_REQUEST_ID),
        content_type='application/json',
        callback=handle_wait_for_request)

    burst_params = {'burst_retry': 0.001, 'burst_length': 0.5}
    expected_exc_text = dedent('''\
                               Cachito request is in "{}" state, reason: {}
                               Request {} ({}) tried to get repo '{}' at reference '{}'.
                               '''.format(error_state, error_reason, CACHITO_REQUEST_ID,
                                          cachito_request_url, CACHITO_REQUEST_REPO,
                                          CACHITO_REQUEST_REF))
    with pytest.raises(CachitoAPIUnsuccessfulRequest) as excinfo:
        CachitoAPI(CACHITO_URL).wait_for_request(CACHITO_REQUEST_ID, **burst_params)
    assert len(responses.calls) == expected_total_responses_calls
    assert expected_exc_text in str(excinfo.value) 
示例28
def test_token_negotiation(self, repo_doesnt_exist_401, auth_b64, username, password,
                               basic_auth):

        def bearer_realm_callback(request):
            # Verify if username and password were provided, token is negotiated
            # with realm via basic auth.
            if basic_auth:
                creds = auth_b64 or b64encode(username, password)
                assert request.headers['authorization'] == 'Basic {}'.format(creds)
            else:
                assert 'authorization' not in request.headers

            return (200, {}, json.dumps({'token': BEARER_TOKEN}))

        responses.add_callback(responses.GET, BEARER_REALM_URL + '?scope=repository:fedora:pull',
                               callback=bearer_realm_callback, match_querystring=True)

        url = 'https://registry.example.com/v2/fedora/tags/list'

        responses.add_callback(responses.GET, url, callback=bearer_unauthorized_callback)
        if repo_doesnt_exist_401:
            responses.add_callback(responses.GET, url, callback=bearer_unauthorized_callback)
        else:
            responses.add_callback(responses.GET, url, callback=bearer_success_callback)

        auth = HTTPBearerAuth(username=username, password=password, auth_b64=auth_b64)
        response = requests.get(url, auth=auth)

        if repo_doesnt_exist_401:
            assert response.json() == 'unauthorized'
            assert response.status_code == requests.codes.not_found
        else:
            assert response.json() == 'success'
        assert len(responses.calls) == 3 
示例29
def test_repo_extracted_from_url(self, partial_url, repo):
        responses.add(responses.GET, '{}?scope=repository:{}:pull'.format(BEARER_REALM_URL, repo),
                      json={'token': BEARER_TOKEN}, match_querystring=True)

        repo_url = 'https://registry.example.com/v2/{}/{}'.format(repo, partial_url)
        responses.add_callback(responses.GET, repo_url, callback=bearer_unauthorized_callback)
        responses.add(responses.GET, repo_url, status=200, json='success')

        auth = HTTPBearerAuth()

        assert requests.get(repo_url, auth=auth).json() == 'success' 
示例30
def test_v2(self, auth_b64, username, password, auth_type):
        bearer_token = 'the-bearer-token'
        realm_url = 'https://registry.example.com/v2/auth'

        responses.add(responses.GET, '{}?scope=repository:fedora:pull'.format(realm_url),
                      json={'token': bearer_token}, match_querystring=True)

        def auth_callback(request):
            if auth_type == 'basic':
                creds = auth_b64 or b64encode(username, password)
                assert request.headers['authorization'] == 'Basic {}'.format(creds)
                return (200, {}, json.dumps('success'))

            if auth_type == 'bearer':
                header_value = 'Bearer {}'.format(bearer_token)
                if header_value != request.headers.get('authorization'):
                    auth_headers = {'www-authenticate': 'Bearer realm={}'.format(realm_url)}
                    return (401, auth_headers, json.dumps('unauthorized'))
                else:
                    return (200, {}, json.dumps('success'))

            assert 'authorization' not in request.headers
            return (200, {}, json.dumps('success'))

        repo_url = 'https://registry.example.com/v2/fedora/tags/list'
        responses.add_callback(responses.GET, repo_url, callback=auth_callback)

        auth = HTTPRegistryAuth(username=username, password=password, auth_b64=auth_b64)
        assert requests.get(repo_url, auth=auth).json() == 'success'
        if (username and password) or auth_b64:
            assert len(auth.v2_auths) == 2
            assert isinstance(auth.v2_auths[0], HTTPBearerAuth)
            assert isinstance(auth.v2_auths[1], (HTTPBasicAuth, HTTPBasicAuthWithB64))
        else:
            assert len(auth.v2_auths) == 1
            assert isinstance(auth.v2_auths[0], HTTPBearerAuth)