Python源码示例:responses.PATCH

示例1
def test_pr_synchronize_callback(self, config):
        """Test that a snooze label is removed from PRs when a new commit is
        pushed.

        Registers a mocked PATCH and a mocked GET issue endpoint, feeds the
        ``PULL_REQUEST`` fixture to ``snooze.github_callback`` and expects a
        ``True`` result with exactly two recorded HTTP calls.
        """
        patch_url = "https://api.github.com/repos/octocat/Hello-World/issues/1347"
        get_url = "https://api.github.com/repos/baxterthehacker/public-repo/issues/1"
        responses.add(responses.PATCH, patch_url)
        responses.add(
            responses.GET, get_url, body=github_responses.SNOOZED_ISSUE_GET)

        event_payload = json.loads(github_responses.PULL_REQUEST)
        credentials = (config["github_username"], config["github_token"])
        outcome = snooze.github_callback(
            "pull_request",
            event_payload,
            credentials,
            config["snooze_label"],
            config["ignore_members_of"])

        assert outcome is True
        assert len(responses.calls) == 2
示例2
def test_perform_checksum(self):
        """Check that ``perform`` sends the expected ``upload-checksum`` header.

        The expected value is ``"sha1 "`` followed by the base64-encoded SHA-1
        digest of the UTF-8 encoded ``LICENSE`` text. A mocked PATCH callback
        records the header actually sent so the two can be compared.
        """
        self.uploader.upload_checksum = True
        tus_request = request.TusRequest(self.uploader)

        with open('LICENSE', 'r') as stream, responses.RequestsMock() as resps:
            file_bytes = stream.read().encode('utf-8')
            digest = hashlib.sha1(file_bytes).digest()
            expected_checksum = "sha1 " + \
                base64.standard_b64encode(digest).decode("ascii")

            # Stays empty unless the mocked endpoint is actually hit.
            sent_checksum = ''

            def capture_checksum(req):
                """Record the checksum header and answer with an empty 204."""
                nonlocal sent_checksum
                sent_checksum = req.headers['upload-checksum']
                return (204, {}, None)

            resps.add_callback(
                responses.PATCH, self.url, callback=capture_checksum)
            tus_request.perform()
            self.assertEqual(sent_checksum, expected_checksum)
示例3
def test_cli_dataset_update_dataset():
    """Check that ``datasets update-dataset`` prints the mocked API response.

    A PATCH to the dataset endpoint is mocked with a JSON body; the CLI is
    invoked with a name and description and its stripped output must equal
    that body.
    """
    # Named ``dataset_id`` rather than ``id`` to avoid shadowing the builtin.
    dataset_id = "cii9dtexw0039uelz7nzk1lq3"
    name = "the-name"
    description = "the-description"
    # ``name`` and ``description`` are quoted so the mocked body is valid
    # JSON, matching the ``application/json`` content type declared below.
    created = """
    {{"owner":"{username}",
      "id":"{id}",
      "name":"{name}",
      "description":"{description}",
      "created":"2015-12-16T22:20:38.847Z",
      "modified":"2015-12-16T22:20:38.847Z"}}
    """.format(
        username=username, id=dataset_id, name=name, description=description)

    # Collapse the template's layout whitespace into a single compact line.
    created = "".join(created.split())

    responses.add(
        responses.PATCH,
        'https://api.mapbox.com/datasets/v1/{0}/{1}?access_token={2}'.format(
            username, dataset_id, access_token),
        match_querystring=True,
        body=created, status=200,
        content_type='application/json')

    runner = CliRunner()
    result = runner.invoke(
        main_group,
        ['--access-token', access_token,
         'datasets',
         'update-dataset', dataset_id,
         '--name', name,
         '-d', description])

    assert result.exit_code == 0
    assert result.output.strip() == created.strip()
示例4
def test_issue_comment_callback(self, config):
        """Test that a snooze label is removed from issues when a new comment
        is received.

        Also covers the ``ignore_members_of`` option: the callback returns
        ``False`` when the membership lookup answers 204 (is a member) and
        ``True`` when it answers 404 (is not a member).
        """
        def fire_callback(*extra_args, **extra_kwargs):
            """Invoke the callback for the issue-comment fixture.

            The payload is parsed afresh on every call.
            """
            return snooze.github_callback(
                "issue_comment",
                json.loads(github_responses.SNOOZED_ISSUE_COMMENT),
                (config["github_username"], config["github_token"]),
                config["snooze_label"],
                *extra_args,
                **extra_kwargs)

        responses.add(
            responses.PATCH,
            "https://api.github.com/repos/baxterthehacker/public-repo/issues/2")
        outcome = fire_callback(config["ignore_members_of"])
        assert outcome is True
        assert len(responses.calls) == 1

        org_url = "https://api.github.com/orgs/fellowship/members/baxterthehacker"
        responses.add(responses.GET, org_url, status=204)  # is a member
        outcome = fire_callback(ignore_members_of="fellowship")
        assert outcome is False

        orc_url = "https://api.github.com/orgs/orcs/members/baxterthehacker"
        responses.add(responses.GET, orc_url, status=404)  # is not a member
        outcome = fire_callback(ignore_members_of="orcs")
        assert outcome is True
示例5
def test_pr_commit_comment_callback(self, config):
        """Test the callback for a ``pull_request_review_comment`` event.

        With the configured ``ignore_members_of`` the callback must return
        ``True`` after exactly two HTTP calls. It must return ``False`` when
        the membership lookup answers 204 (is a member) and ``True`` when it
        answers 404 (is not a member).
        """
        def fire_callback(*extra_args, **extra_kwargs):
            """Invoke the callback for the review-comment fixture.

            The payload is parsed afresh on every call.
            """
            return snooze.github_callback(
                "pull_request_review_comment",
                json.loads(github_responses.PULL_REQUEST_REVIEW_COMMENT),
                (config["github_username"], config["github_token"]),
                config["snooze_label"],
                *extra_args,
                **extra_kwargs)

        responses.add(
            responses.PATCH,
            "https://api.github.com/repos/octocat/Hello-World/issues/1347")
        responses.add(
            responses.GET,
            "https://api.github.com/repos/baxterthehacker/public-repo/issues/1",
            body=github_responses.SNOOZED_ISSUE_GET)
        outcome = fire_callback(config["ignore_members_of"])
        assert outcome is True
        assert len(responses.calls) == 2

        org_url = "https://api.github.com/orgs/fellowship/members/baxterthehacker"
        responses.add(responses.GET, org_url, status=204)  # is a member
        outcome = fire_callback(ignore_members_of="fellowship")
        assert outcome is False

        orc_url = "https://api.github.com/orgs/orcs/members/baxterthehacker"
        responses.add(responses.GET, orc_url, status=404)  # is not a member
        outcome = fire_callback(ignore_members_of="orcs")
        assert outcome is True
示例6
def patch(self, url, filename, status=200):
        """Register a mock response for a PATCH request to ``url``.

        The response body is whatever ``self._get_body(filename)`` returns;
        it is served with ``status`` and the ``application/hal+json`` content
        type.
        """
        payload = self._get_body(filename)
        self.add(
            responses.PATCH,
            url,
            body=payload,
            status=status,
            content_type='application/hal+json',
        )
示例7
def mock_edit_release(self, body=None, draft=True, prerelease=False):
        """Register a mocked PATCH response for ``<repo_url>/releases/1``.

        The JSON payload is built by ``self._get_expected_release``; when
        ``body`` is ``None`` the placeholder text "Test release body" is used.
        """
        release_body = "Test release body" if body is None else body
        release_url = "{}/releases/1".format(self.repo_url)
        release_json = self._get_expected_release(
            "1", body=release_body, draft=draft, prerelease=prerelease
        )
        responses.add(
            method=responses.PATCH,
            url=release_url,
            json=release_json,
            status=http.client.OK,
        )
示例8
def test_activate_some_flow_processes(self):
        """Run ``ActivateFlow`` when the query returns only one of two flows.

        Two developer names are requested but the mocked tooling query
        returns a single record, so exactly two HTTP calls are expected.
        """
        developer_names = [
            "Auto_Populate_Date_And_Name_On_Program_Engagement",
            "ape",
        ]
        cc_task = create_task(ActivateFlow, {"developer_names": developer_names})

        record_id = "3001F0000009GFwQAM"
        activate_url = "{}/services/data/v43.0/tooling/sobjects/FlowDefinition/{}".format(
            cc_task.org_config.instance_url, record_id
        )

        # Only the first developer name has a matching record.
        query_result = {
            "records": [
                {
                    "Id": record_id,
                    "DeveloperName": "Auto_Populate_Date_And_Name_On_Program_Engagement",
                    "LatestVersion": {"VersionNumber": 1},
                }
            ]
        }
        responses.add(
            method="GET",
            url="https://test.salesforce.com/services/data/v43.0/tooling/query/?q=SELECT+Id%2C+ActiveVersion.VersionNumber%2C+LatestVersion.VersionNumber%2C+DeveloperName+FROM+FlowDefinition+WHERE+DeveloperName+IN+%28%27Auto_Populate_Date_And_Name_On_Program_Engagement%27%2C%27ape%27%29",
            body=json.dumps(query_result),
            status=200,
        )

        activation_payload = {"Metadata": {"activeVersionNumber": 1}}
        responses.add(
            method=responses.PATCH,
            url=activate_url,
            status=204,
            json=activation_payload,
        )

        cc_task()
        self.assertEqual(2, len(responses.calls))
示例9
def test_dataset_update():
    """Updating dataset name and description works."""

    def echo_update(request):
        """Answer 200 with a dataset whose name and description mirror the request."""
        sent = json.loads(request.body.decode())
        reply = {
            'owner': username,
            'id': 'foo',
            'name': sent['name'],
            'description': sent['description'],
            'created': '2015-09-19',
            'modified': '2015-09-19'}
        return (200, {}, json.dumps(reply))

    endpoint = 'https://api.mapbox.com/datasets/v1/{0}/{1}?access_token={2}'.format(
        username, 'foo', access_token)
    responses.add_callback(
        responses.PATCH,
        endpoint,
        match_querystring=True,
        callback=echo_update)

    service = Datasets(access_token=access_token)
    response = service.update_dataset(
        'foo', name='things', description='a collection of things')

    assert response.status_code == 200
    updated = response.json()
    assert updated['name'] == 'things'
    assert updated['description'] == 'a collection of things'
示例10
def test_update_exam_attempt_status(self, provider_method_name, corresponding_status):
        """Check that a provider method returns the status from the mocked PATCH.

        The method named ``provider_method_name`` is looked up on
        ``self.provider`` and called with the exam's external id and the
        attempt id; its result must equal ``corresponding_status``.
        """
        attempt_id = 2
        attempt_url = self.provider.exam_attempt_url.format(
            exam_id=self.backend_exam['external_id'], attempt_id=attempt_id)
        responses.add(
            responses.PATCH,
            url=attempt_url,
            json={'id': 2, 'status': corresponding_status}
        )

        provider_method = getattr(self.provider, provider_method_name)
        status = provider_method(self.backend_exam['external_id'], attempt_id)
        self.assertEqual(status, corresponding_status)
示例11
def test_malformed_json_in_response(self):
        """Check that a malformed JSON response body yields ``None``, not an exception."""
        attempt_id = 2
        attempt_url = self.provider.exam_attempt_url.format(
            exam_id=self.backend_exam['external_id'], attempt_id=attempt_id)
        # '"]' is deliberately not valid JSON.
        responses.add(responses.PATCH, url=attempt_url, body='"]')

        status = self.provider.mark_erroneous_exam_attempt(
            self.backend_exam['external_id'], attempt_id)
        # the important thing is that it didn't raise an exception for bad json
        self.assertEqual(status, None)
示例12
def test_perform(self):
        """Check that ``perform`` exposes the ``upload-offset`` response header.

        The mocked PATCH endpoint answers 204 with an ``upload-offset`` header
        equal to the size of the ``LICENSE`` file; after ``perform`` the same
        value must be readable from ``self.request.response_headers``.
        """
        with open('LICENSE', 'rb') as stream, responses.RequestsMock() as resps:
            # ``tell()`` on a freshly opened file is always 0, so the original
            # never measured the file; read the bytes to get the real size.
            size = len(stream.read())
            resps.add(responses.PATCH, self.url,
                      adding_headers={'upload-offset': str(size)},
                      status=204)

            self.request.perform()
            self.assertEqual(str(size), self.request.response_headers['upload-offset'])
示例13
def test_update_entity(service):
    """Check updating of entity properties"""

    # pylint: disable=redefined-outer-name

    entity_url = "{0}/TemperatureMeasurements(Sensor='sensor1',Date=datetime'2017-12-24T18:00:00')".format(service.url)
    mocked_entity = {'d': {
        'Sensor': 'Sensor-address',
        'Date': "/Date(1714138400000)/",
        'Value': '34.0d'
    }}
    responses.add(responses.PATCH, entity_url, json=mocked_entity, status=204)

    measurements = service.entity_sets.TemperatureMeasurements
    request = measurements.update_entity(
        Sensor='sensor1',
        Date=datetime.datetime(2017, 12, 24, 18, 0))

    assert isinstance(request, pyodata.v2.service.EntityModifyRequest)

    request.set(Value=34.0)
    # Tests if update entity correctly calls 'to_json' method
    request.set(Date=datetime.datetime(2017, 12, 24, 19, 0))

    stored_values = request._values
    assert stored_values['Value'] == '3.400000E+01'
    assert stored_values['Date'] == '/Date(1514142000000)/'

    # If a preformatted datetime is passed (e.g. the datetime instance was
    # already replaced with a string that is compliant with the OData
    # specification), 'to_json' does not update the given value (for backward
    # compatibility reasons)
    request.set(Date='/Date(1714138400000)/')
    assert request._values['Date'] == '/Date(1714138400000)/'

    request.execute()
示例14
def test_update_entity_with_patch_method_specified(service):
    """Make sure update_entity reports PATCH when the PATCH method is requested explicitly"""

    # pylint: disable=redefined-outer-name

    measurement_type = service.schema.entity_type('TemperatureMeasurement')
    measured_at = datetime.datetime(2017, 12, 24, 18, 0)
    key = EntityKey(measurement_type, Sensor='sensor1', Date=measured_at)

    measurements = service.entity_sets.TemperatureMeasurements
    query = measurements.update_entity(key, method="PATCH")
    assert query.get_method() == "PATCH"
示例15
def test_update_entity_with_no_method_specified(service):
    """Make sure update_entity reports PATCH when no method is specified"""

    # pylint: disable=redefined-outer-name

    measurement_type = service.schema.entity_type('TemperatureMeasurement')
    measured_at = datetime.datetime(2017, 12, 24, 18, 0)
    key = EntityKey(measurement_type, Sensor='sensor1', Date=measured_at)

    measurements = service.entity_sets.TemperatureMeasurements
    query = measurements.update_entity(key)
    assert query.get_method() == "PATCH"
示例16
def test_update_entity_with_wrong_method_specified(service):
    """Make sure update_entity raises ValueError when an unsupported method is specified"""

    # pylint: disable=redefined-outer-name

    measurement_type = service.schema.entity_type('TemperatureMeasurement')
    measured_at = datetime.datetime(2017, 12, 24, 18, 0)
    key = EntityKey(measurement_type, Sensor='sensor1', Date=measured_at)

    measurements = service.entity_sets.TemperatureMeasurements
    with pytest.raises(ValueError) as caught_ex:
        measurements.update_entity(key, method='DELETE')

    error_text = str(caught_ex.value)
    assert error_text.startswith('The value "DELETE" is not on the list of allowed Entity Update HTTP Methods: PATCH, PUT, MERGE')
示例17
def test_aggregate_child_change_notes(self, generator, mock_util, gh_api):
        """Check the parent PR body built from the change notes of child PRs.

        Four child pull requests are mocked: two with notes, one with an empty
        body, and one whose head ref is ``master`` (simulating a merge back
        into the parent). The PATCH sent for the parent PR is intercepted and
        its body is checked section by section.
        """
        def request_intercept(request):
            """Assert that the body is correct"""
            body = json.loads(request.body)["body"]
            assert (
                "# Critical Changes\r\n\r\n* Everything works now. [[PR2](https://github.com/TestOwner/TestRepo/pulls/2)]"
                in body
            )
            assert (
                "# Changes\r\n\r\n* Now, more code! [[PR1](https://github.com/TestOwner/TestRepo/pulls/1)]"
                in body
            )
            # The original asserted the bare string literal, which is always
            # truthy; ``in body`` makes this check meaningful like the others.
            assert (
                "# Notes From Child PRs\r\n\r\n* Dev note 1 [[PR1](https://github.com/TestOwner/TestRepo/pulls/1)]\r\n\r\n* Dev note 2 [[PR2](https://github.com/TestOwner/TestRepo/pulls/2)]"
                in body
            )
            assert (
                "# Pull requests with no release notes\r\n\n* Pull Request #4 [[PR4](https://github.com/TestOwner/TestRepo/pulls/4)]"
                in body
            )
            assert "Should not be in body" not in body
            return (200, {}, json.dumps(self._get_expected_pull_request(3, 3, body)))

        self.init_github()
        dev_note1 = "* Dev note 1"
        dev_note2 = "* Dev note 2"
        change = "# Changes\r\n\r\n* Now, more code!"
        critical_change = "# Critical Changes\r\n\r\n* Everything works now."

        pr1_json = self._get_expected_pull_request(1, 1, dev_note1 + "\r\n" + change)
        pr2_json = self._get_expected_pull_request(
            2, 2, dev_note2 + "\r\n" + critical_change
        )
        # Child PR with an empty body (built with the numbers 4, 4).
        pr3_json = self._get_expected_pull_request(4, 4, None)
        pr3_json["body"] = ""

        pr4_json = self._get_expected_pull_request(5, 5, "Should not be in body")
        # simulate merge from master back into parent
        pr4_json["head"]["ref"] = "master"

        mock_util.mock_pulls(pulls=[pr1_json, pr2_json, pr3_json, pr4_json])

        pr_json = self._get_expected_pull_request(3, 3, "Body of Parent PR")
        parent_pr = ShortPullRequest(pr_json, gh_api)
        parent_pr.head.label = "repo:some-other-branch"

        responses.add_callback(
            responses.PATCH,
            "https://github.com/TestOwner/TestRepo/pulls/3",  # TODO: again, no '.api' needed in this endpoint?
            callback=request_intercept,
            content_type="application/json",
        )
        generator.aggregate_child_change_notes(parent_pr)
示例18
def test_activate_all_flow_processes(self):
        """Run ``ActivateFlow`` when the query returns both requested flows.

        The mocked tooling query returns two records and each record gets its
        own mocked PATCH endpoint, so exactly three HTTP calls are expected.
        """
        developer_names = [
            "Auto_Populate_Date_And_Name_On_Program_Engagement",
            "ape",
        ]
        cc_task = create_task(ActivateFlow, {"developer_names": developer_names})

        record_id = "3001F0000009GFwQAM"
        record_id2 = "3001F0000009GFwQAW"
        activate_urls = [
            "{}/services/data/v43.0/tooling/sobjects/FlowDefinition/{}".format(
                cc_task.org_config.instance_url, flow_id
            )
            for flow_id in (record_id, record_id2)
        ]

        query_result = {
            "records": [
                {
                    "Id": record_id,
                    "DeveloperName": "Auto_Populate_Date_And_Name_On_Program_Engagement",
                    "LatestVersion": {"VersionNumber": 1},
                },
                {
                    "Id": record_id2,
                    "DeveloperName": "ape",
                    "LatestVersion": {"VersionNumber": 1},
                },
            ]
        }
        responses.add(
            method="GET",
            url="https://test.salesforce.com/services/data/v43.0/tooling/query/?q=SELECT+Id%2C+ActiveVersion.VersionNumber%2C+LatestVersion.VersionNumber%2C+DeveloperName+FROM+FlowDefinition+WHERE+DeveloperName+IN+%28%27Auto_Populate_Date_And_Name_On_Program_Engagement%27%2C%27ape%27%29",
            body=json.dumps(query_result),
            status=200,
        )

        # One PATCH endpoint per record, registered in record order.
        data = {"Metadata": {"activeVersionNumber": 1}}
        for activate_url in activate_urls:
            responses.add(
                method=responses.PATCH, url=activate_url, status=204, json=data
            )

        cc_task()
        self.assertEqual(3, len(responses.calls))