Python源码示例:aiohttp.DummyCookieJar()

示例1
async def test_aiohttp_extra_args(event_loop, aiohttp_server):
    """Check that extra arguments reach aiohttp through the transport.

    A local test server answers every ``POST /`` with
    ``query1_server_answer`` as JSON. The transport is created with
    ``client_session_args`` (forwarded to ``aiohttp.ClientSession``) and the
    query is executed with ``extra_args`` (forwarded to the aiohttp post
    call). The test passes when the first continent of the result has the
    code ``"AF"``.

    The function has to be a coroutine because it awaits the server fixture
    and the query execution, and uses ``async with``.
    NOTE(review): running it requires an async-aware pytest plugin/marker;
    which one the project uses is not visible here - confirm.
    """

    async def handler(request):
        # Always reply with the canned JSON answer, whatever the request is.
        return web.Response(text=query1_server_answer, content_type="application/json")

    app = web.Application()
    app.router.add_route("POST", "/", handler)
    server = await aiohttp_server(app)

    url = server.make_url("/")

    # passing extra arguments to aiohttp.ClientSession
    jar = DummyCookieJar()
    sample_transport = AIOHTTPTransport(
        url=url, timeout=10, client_session_args={"version": "1.1", "cookie_jar": jar}
    )

    async with Client(transport=sample_transport) as session:

        query = gql(query1_str)

        # Passing extra arguments to the post method of aiohttp
        result = await session.execute(query, extra_args={"allow_redirects": False})

        continents = result["continents"]

        africa = continents[0]

        assert africa["code"] == "AF"
示例2
def start(self):
        """Open the client session used by the outbound transport.

        The session is created with a ``DummyCookieJar`` and stored on
        ``self.client_session``.

        Returns:
            This same object.
        """
        cookie_jar = DummyCookieJar()
        self.client_session = ClientSession(cookie_jar=cookie_jar)
        return self
示例3
def start(self):
        """Create the connector and client session for the transport.

        A ``TCPConnector`` (``limit=200``, ``limit_per_host=50``) is stored on
        ``self.connector``. The ``ClientSession`` built on top of it uses a
        ``DummyCookieJar`` and is stored on ``self.client_session``. When
        ``self.collector`` is truthy, a ``StatsTracer`` labelled
        ``"outbound-http:"`` is passed to the session as its only trace
        config.

        Returns:
            This same object.
        """
        self.connector = TCPConnector(limit=200, limit_per_host=50)

        # Tracing is optional: only configured when a collector is present.
        tracers = (
            [StatsTracer(self.collector, "outbound-http:")]
            if self.collector
            else None
        )

        session_args = {
            "cookie_jar": DummyCookieJar(),
            "connector": self.connector,
        }
        if tracers is not None:
            session_args["trace_configs"] = tracers

        self.client_session = ClientSession(**session_args)
        return self
示例4
def async_init(self):
        """
        Initialize the aiohttp client session stored on ``self.session``.

        The session is built from a ``DummyCookieJar``, a ``TCPConnector``
        created with ``limit=0``, and a ``ClientTimeout`` whose total is read
        from the ``scanner.request_timeout`` configuration entry.
        """
        session_kwargs = {
            "cookie_jar": aiohttp.DummyCookieJar(),
            "connector": aiohttp.TCPConnector(limit=0),
        }
        # Total request timeout comes from the project configuration.
        request_timeout = Config().get_config("scanner.request_timeout")
        session_kwargs["timeout"] = aiohttp.ClientTimeout(total=request_timeout)
        self.session = aiohttp.ClientSession(**session_kwargs)