Python源码示例:aiohttp.FormData()
示例1
def send_files(self, channel_id, *, files, content=None, tts=False, embed=None, nonce=None, allowed_mentions=None):
    """Build a multipart message with attachments and pass it to ``self.request``.

    The ``payload_json`` part always carries ``tts``; ``content``, ``embed``,
    ``nonce`` and ``allowed_mentions`` are added only when they are truthy.
    Exactly one attachment is sent under the field name ``file``; any other
    number of attachments is sent as ``file0``, ``file1``, ...

    Every element of ``files`` must expose ``fp`` and ``filename``.

    Returns whatever ``self.request`` returns; nothing is awaited here.
    """
    route = Route('POST', '/channels/{channel_id}/messages', channel_id=channel_id)

    payload = {'tts': tts}
    optional_fields = (
        ('content', content),
        ('embed', embed),
        ('nonce', nonce),
        ('allowed_mentions', allowed_mentions),
    )
    for key, value in optional_fields:
        if value:
            payload[key] = value

    form = aiohttp.FormData()
    form.add_field('payload_json', utils.to_json(payload))

    # A lone attachment uses the bare field name; otherwise the index is appended.
    lone_attachment = len(files) == 1
    for position, attachment in enumerate(files):
        field_name = 'file' if lone_attachment else 'file%s' % position
        form.add_field(field_name, attachment.fp, filename=attachment.filename,
                       content_type='application/octet-stream')

    return self.request(route, data=form, files=files)
示例2
def add_formfields(self):
    """Populate ``self.data`` with a fresh ``aiohttp.FormData``.

    Plain fields come from ``self.data_old``:
      * a dict contributes every ``key -> value`` pair;
      * a tuple contributes ``(entry[0], entry[1])`` for each entry that is
        itself a tuple; other entries are skipped.

    File fields come from ``self.files``:
      * a ``str`` path or a tuple of paths: each file is attached under its
        base name;
      * a dict: each file is attached under the dict key.

    Any other type for either attribute contributes nothing. The opened file
    objects are handed to the form and are not closed by this method.
    """
    form = aiohttp.FormData()
    self.data = form

    legacy = self.data_old
    if isinstance(legacy, dict):
        plain_fields = legacy.items()
    elif isinstance(legacy, tuple):
        # Lazy on purpose: fields are added one at a time, as in a plain loop.
        plain_fields = ((entry[0], entry[1]) for entry in legacy if isinstance(entry, tuple))
    else:
        plain_fields = ()
    for name, value in plain_fields:
        form.add_field(name, value)

    def attach(field_name, path, content_type):
        # Open the file in binary mode and register it under ``field_name``.
        form.add_field(field_name, open(path, 'rb'), content_type=content_type)

    uploads = self.files
    if isinstance(uploads, dict):
        for title, path in uploads.items():
            attach(title, path, self.get_content_type(path))
    elif isinstance(uploads, (str, tuple)):
        paths = (uploads,) if isinstance(uploads, str) else uploads
        for path in paths:
            mime = self.get_content_type(path)
            attach(os.path.basename(path), path, mime)
示例3
async def get_meme(
    self, meme: int, boxes: List[Dict[str, str]], username: str, password: str
) -> str:
    """Caption a meme template and return the URL of the generated image.

    Posts ``template_id``, ``username``, ``password`` and every key of every
    box (as ``boxes[<box index>][<key>]``) to ``CAPTION_URL``.

    Declared ``async`` because the body awaits the HTTP response.

    Raises:
        ImgFlipAPIError: when the response's ``success`` entry is falsy
            (message taken from ``error_message``), or when any other
            exception occurs while building or sending the request.
    """
    log.debug(boxes)
    try:
        form_data = aiohttp.FormData()
        form_data.add_field("template_id", meme)
        form_data.add_field("username", username)
        form_data.add_field("password", password)
        # All keys of one box share that box's index.
        for i, box in enumerate(boxes):
            for k, v in box.items():
                form_data.add_field(f"boxes[{i}][{k}]", v)

        async with aiohttp.ClientSession() as session:
            async with session.post(CAPTION_URL, data=form_data) as r:
                result = await r.json()
        if not result["success"]:
            raise ImgFlipAPIError(result["error_message"])
    except ImgFlipAPIError:
        # Already the right type: log it, but do not wrap it a second time.
        log.error("Error grabbing meme", exc_info=True)
        raise
    except Exception as e:
        log.error("Error grabbing meme", exc_info=True)
        raise ImgFlipAPIError(e) from e
    return result["data"]["url"]
示例4
def _compose_data(req, **user_kw):
    """Turn a request 4-tuple into an ``aiohttp.FormData``.

    ``req`` must unpack into ``(token, method, params, files)``; only
    ``params`` and ``files`` are used here.

    * Every ``params`` value is added as ``str(value)``.
    * Every ``files`` value is either a ``(filename, fileobj)`` tuple or a
      bare file object. For a bare object the filename is
      ``_guess_filename(obj)``, falling back to the field key.

    Raises:
        ValueError: if a tuple in ``files`` does not have exactly 2 elements.
    """
    _token, _method, params, files = req

    form = aiohttp.FormData()

    for name, value in (params or {}).items():
        form.add_field(name, str(value))

    for name, entry in (files or {}).items():
        if not isinstance(entry, tuple):
            filename = _guess_filename(entry) or name
            fileobj = entry
        elif len(entry) != 2:
            raise ValueError('Tuple must have exactly 2 elements: filename, fileobj')
        else:
            filename, fileobj = entry

        form.add_field(name, fileobj, filename=filename)

    return form
示例5
def upload():
    """Post the test APK plus a ``msg`` field to ``/upload/app`` and return the response.

    The response is passed through ``response_normal_check`` before being
    returned. The package file is opened in a ``with`` block so the handle is
    closed once the (synchronous) request has completed, even if the request
    or the check raises.
    """
    with open('tests/QXmokuai_3.apk', 'rb') as package:
        data = FormData()
        data.add_field('package', package, filename='QXmokuai_3.apk')
        data.add_field('msg', 'test upload')
        _, response = app.test_client.post('/upload/app', data=data)
    response_normal_check(response)
    return response
示例6
async def test_allows_post_with_url_encoding(client):
    """POST ``query={test}`` as a URL-encoded form to ``/graphql`` and check the reply.

    Declared ``async`` because the body awaits the client call and the JSON
    decoding. The status code is asserted first so that a failing request is
    reported by its status rather than by a body mismatch.
    """
    data = FormData()
    data.add_field("query", "{test}")
    response = await client.post(
        "/graphql",
        # The FormData object is called here; the result is what gets sent.
        data=data(),
        headers={"content-type": "application/x-www-form-urlencoded"},
    )

    assert response.status == 200
    assert await response.json() == {"data": {"test": "Hello World"}}
示例7
async def request(session, oauth_token, skill_id=None, method=None, json=None,
                  file=None, request_method='POST', custom_url=None, **kwargs):
    """
    Make a request to API

    :param session: HTTP Client session
    :type session: :obj:`aiohttp.ClientSession`
    :param oauth_token: oauth_token, sent as the ``Authorization`` header
    :type oauth_token: :obj:`str`
    :param skill_id: skill_id. Optional. Not used if custom_url is provided
    :type skill_id: :obj:`str`
    :param method: API method. Optional. Not used if custom_url is provided
    :type method: :obj:`str`
    :param json: request payload
    :type json: :obj:`dict`
    :param file: file; when truthy it is sent as the ``file`` form field
    :type file: :obj:`io.BytesIO`
    :param request_method: API request method
    :type request_method: :obj:`str`
    :param custom_url: Yandex has very developer UNfriendly API, so some
        endpoints cannot be reached by the standard URL template.
    :type custom_url: :obj:`str`
    :param kwargs: passed through unchanged to ``session.request``
    :return: result of ``_check_result`` for the response
    :rtype: :obj:`dict`
    :raises exceptions.NetworkError: if aiohttp raises a ``ClientError``
    """
    log.debug("Making a `%s` request to %r with json `%r` or file `%r`",
              request_method, method, json, file)
    if custom_url is None:
        url = Methods.api_url(skill_id, method)
    else:
        url = custom_url

    headers = {'Authorization': oauth_token}
    data = None
    if file:
        data = aiohttp.FormData()
        data.add_field('file', file)
    try:
        async with session.request(request_method, url, json=json, data=data, headers=headers, **kwargs) as response:
            return await _check_result(response)
    except aiohttp.ClientError as e:
        # Chain the original error so the traceback keeps the root cause.
        raise exceptions.NetworkError(f"aiohttp client throws an error: {e.__class__.__name__}: {e}") from e
示例8
async def test_predict(self, api: ApiConfig, client: aiohttp.ClientSession, image_file_obj: io.BytesIO) -> None:
    """Upload an image to ``<model_base_url>/predict`` and check the response shape.

    Declared ``async`` because the body uses ``async with`` and ``await``.
    Asserts an HTTP OK status and that the JSON body contains the keys
    ``mean_score``, ``std_score``, ``scores`` and ``total_time``.
    """
    predict_url = api.model_base_url + "/predict"
    data = FormData()
    data.add_field("file", image_file_obj, filename="test_image.jpg", content_type="image/img")

    async with client.post(predict_url, data=data) as response:
        assert response.status == HTTPOk.status_code
        res_data = await response.json()
        for expected_key in ("mean_score", "std_score", "scores", "total_time"):
            assert expected_key in res_data
示例9
async def put(session, fid: str, content: bytes):
    """PUT ``content`` as the ``file`` form field to ``<seaweedfs_url>/<fid>``.

    Declared ``async`` because the body uses ``async with`` and ``await``.

    :param session: object whose ``put`` returns an async context manager
        (presumably an ``aiohttp.ClientSession`` - confirm with the caller)
    :param fid: identifier appended to ``seaweedfs_url``
    :param content: payload, sent with content type ``application/gzip``
    :return: ``(status, body)`` where ``body`` is the raw response bytes
    """
    url = f'{seaweedfs_url}/{fid}'
    data = aiohttp.FormData()
    data.add_field(
        'file',
        content,
        content_type='application/gzip'
    )
    async with session.put(url, data=data) as response:
        result = await response.read()
        return response.status, result
示例10
async def upload_audio_message(api, multipart_data, peer_id):
    """Upload audio file `multipart_data` and return Attachment for sending to user with id `peer_id`(possibly)

    Steps: ask ``docs.getMessagesUploadServer`` for an upload URL, POST the
    data there as ``message.mp3``, then pass the returned ``file`` value to
    ``docs.save``.

    Returns ``None`` when any step gives an empty result, including an upload
    response without a ``file`` entry (the same check ``upload_doc`` makes,
    instead of failing with ``KeyError``).
    """
    sender = api.get_default_sender("docs.getMessagesUploadServer")
    client = api.get_current_sender("docs.getMessagesUploadServer", sender=sender)

    data = aiohttp.FormData()
    data.add_field('file', multipart_data, filename="message.mp3", content_type='multipart/form-data')

    values = {'type': "audio_message", 'peer_id': peer_id}
    if client.group_id:
        values['group_id'] = client.group_id

    response = await api(sender=sender).docs.getMessagesUploadServer(**values)
    if not response or not response.get('upload_url'):
        return None

    upload_url = response['upload_url']

    async with aiohttp.ClientSession() as sess:
        async with sess.post(upload_url, data=data) as resp:
            result = json.loads(await resp.text())

    if not result or not result.get('file'):
        return None

    data = dict(file=result['file'])
    result = await api(sender=sender).docs.save(**data)
    if not result:
        return None

    return Attachment.from_upload_result(result[0], "doc")
示例11
async def upload_doc(api, multipart_data, filename="image.png", additional_params=None):
    """Upload file `multipart_data` and return Attachment for sending to user

    Steps: ask ``docs.getWallUploadServer`` for an upload URL (passing
    ``additional_params`` plus ``group_id`` when the current sender has one),
    POST the data there under ``filename``, then pass the returned ``file``
    value to ``docs.save``.

    Returns ``None`` when any step gives an empty result. An upload response
    without a ``file`` entry is printed before returning ``None``.
    """
    if additional_params is None:
        additional_params = {}

    sender = api.get_default_sender("docs.getWallUploadServer")
    client = api.get_current_sender("docs.getWallUploadServer", sender=sender)

    data = aiohttp.FormData()
    data.add_field('file', multipart_data, filename=filename,
                   content_type='multipart/form-data')

    values = {}
    values.update(additional_params)
    if client.group_id:
        values['group_id'] = client.group_id

    response = await api(sender=sender).docs.getWallUploadServer(**values)
    if not response or not response.get('upload_url'):
        return None

    upload_url = response['upload_url']

    async with aiohttp.ClientSession() as sess:
        async with sess.post(upload_url, data=data) as resp:
            result = json.loads(await resp.text())

    if not result or not result.get("file"):
        print(result)
        return None

    data = dict(file=result['file'])
    result = await api(sender=sender).docs.save(**data)
    if not result:
        return None

    return Attachment.from_upload_result(result[0], "doc")
示例12
async def upload_photo(api, multipart_data, peer_id=None):
    """Upload photo file `multipart_data` and return Attachment for sending to
    user with id `peer_id`(optional but recommended)

    Steps: ask ``photos.getMessagesUploadServer`` for an upload URL, POST the
    data there as ``picture.png``, then pass the returned ``photo``, ``hash``
    and ``server`` values to ``photos.saveMessagesPhoto``.

    Returns ``None`` when any step gives an empty result, or when the upload
    response lacks one of the three required keys (instead of failing with
    ``KeyError``).
    """
    sender = api.get_default_sender('photos.getMessagesUploadServer')

    data = aiohttp.FormData()
    data.add_field('photo', multipart_data, filename='picture.png',
                   content_type='multipart/form-data')

    kwargs = {"peer_id": peer_id} if peer_id else {}

    response = await api(sender=sender).photos.getMessagesUploadServer(**kwargs)
    if not response or not response.get('upload_url'):
        return None

    upload_url = response['upload_url']

    async with aiohttp.ClientSession() as sess:
        async with sess.post(upload_url, data=data) as resp:
            result = json.loads(await resp.text())

    required = ('photo', 'hash', 'server')
    if not result or any(key not in result for key in required):
        return None

    result = await api(sender=sender).photos.saveMessagesPhoto(
        **{key: result[key] for key in required})
    if not result:
        return None

    return Attachment.from_upload_result(result[0])
示例13
async def do_retro(self, text, bcg):
    """Render up to three lines of text through the photofunia retro-wave effect.

    ``text`` is split into lines as follows:
      * if it contains ``|``, split on ``|``;
      * else if it is 15 characters or longer, cut into 15-character chunks;
      * else if it is a single word, split into characters (a 4-character
        word becomes 3 items, the last two characters merged);
      * otherwise split on whitespace.
    Only the first three lines are sent, as ``text1`` .. ``text3``.

    Returns ``None`` on timeout, the result of ``self.bytes_download`` for
    the first regex match in the response page, or ``False`` if nothing
    matched.

    Declared ``async`` because the body awaits. The session is entered with
    ``async with`` and the 5-second limit uses ``aiohttp.ClientTimeout``,
    since a synchronous ``with ClientSession()`` and ``aiohttp.Timeout`` are
    not available in current aiohttp releases.
    """
    if '|' in text:
        text = text.split('|')
    elif len(text) >= 15:
        text = [text[i:i + 15] for i in range(0, len(text), 15)]
    else:
        split = text.split()
        if len(split) == 1:
            text = list(text)
            if len(text) == 4:
                text[2] = text[2] + text[-1]
                del text[3]
        else:
            text = split

    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:43.0) Gecko/20100101 Firefox/43.0'}
    payload = aiohttp.FormData()
    payload.add_field('current-category', 'all_effects')
    payload.add_field('bcg', bcg)
    payload.add_field('txt', '4')
    for count, s in enumerate(text[:3], start=1):
        # NOTE(review): "\'" is the same string as "'", so this replace is a
        # no-op; kept as-is because the intended escaping is not clear.
        payload.add_field('text' + str(count), s.replace("'", "\'"))

    try:
        timeout = aiohttp.ClientTimeout(total=5)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post('https://photofunia.com/effects/retro-wave?server=3', data=payload, headers=headers) as r:
                txt = await r.text()
    except asyncio.TimeoutError:
        return

    match = self.retro_regex.findall(txt)
    if match:
        download_url = match[0][0]
        b = await self.bytes_download(download_url)
        return b
    return False
示例14
async def upload_log(self, file, user):
    """Upload ``file`` to ``UPLOAD_URL`` and return the decoded JSON response.

    The query string always carries ``json=1``; ``userToken`` is added when
    ``self.get_dpsreport_usertoken(user)`` returns a truthy token. The file
    content is read fully into memory and sent under the field ``file`` with
    ``file.filename`` as its filename.

    Declared ``async`` because the body awaits.

    Raises:
        APIError: if the response's ``error`` entry is truthy.
    """
    params = {"json": 1}
    token = await self.get_dpsreport_usertoken(user)
    if token:
        params["userToken"] = token

    data = aiohttp.FormData()
    data.add_field("file", await file.read(), filename=file.filename)

    async with self.session.post(
            UPLOAD_URL, data=data, params=params) as r:
        resp = await r.json()

    error = resp["error"]
    if error:
        raise APIError(error)
    return resp
示例15
def make_mulitpart_form(self, fields, aio=False):
    """Build a multipart form usable with either requests or aiohttp.

    With ``aio`` false the form is a ``MultipartEncoder`` (the fields are
    also printed); with ``aio`` true it is an ``aiohttp.MultipartWriter``.
    The return value is ``{"data": <form>, "headers": <dict>}`` and is meant
    to be passed to execute_rest_request or execute_rest_request_async,
    depending on ``aio``.

    In the aiohttp branch each ``fields`` value is handled by its length:
      * length 1: the value itself is appended as a part named by its key;
      * length 3: unpacked as ``(filename, file_object, content_type)`` and
        appended as a file part;
      * any other length: skipped.
    """
    if not aio:
        print(fields)
        encoder = MultipartEncoder(
            fields=fields
        )
        return {"data": encoder, "headers": {'Content-Type': encoder.content_type}}

    # MultipartWriter is used here to mirror what MultipartEncoder does for
    # requests. The original author's note says aiohttp.FormData "does appear
    # to work" because of the way the Symphony API demands a boundary in the
    # header - NOTE(review): this probably meant "does not"; confirm.
    # aiohttp.MultipartWriter.append_form reportedly does not work either: it
    # encodes as application/x-www-form-urlencoded, which Symphony does not
    # appear to accept for attachments.
    with aiohttp.MultipartWriter("form-data") as writer:
        for name, value in fields.items():
            size = len(value)
            if size == 1:
                part = writer.append(value)
                part.set_content_disposition("form-data", name=name)
            elif size == 3:
                filename, file_object, content_type = value
                part = writer.append(file_object, {'Content-Type': content_type})
                part.set_content_disposition('form-data', name=name, filename=filename)

    # The boundary is sent explicitly in the Content-Type header.
    boundary_header = 'multipart/form-data; boundary=' + writer.boundary
    return {"data": writer, "headers": {'Content-Type': boundary_header}}
示例16
async def scan_file_async(self, file, wait_for_completion=False):
    """Like :func:`scan_file` but returns a coroutine.

    Fetches an upload URL from ``/files/upload_url``, posts ``file`` there as
    a multipart form and converts the response into an analysis object. When
    ``wait_for_completion`` is true, the result of
    ``_wait_for_analysis_completion`` is returned instead.

    Declared ``async`` because the body awaits.
    """
    # The snippet below could be replaced with this simpler code:
    #
    # form_data = aiohttp.FormData()
    # form_data.add_field('file', file)
    #
    # However, aiohttp.FormData assumes that the server supports RFC 5987 and
    # sends a Content-Disposition like:
    #
    # 'form-data; name="file"; filename="foobar"; filename*=UTF-8''foobar
    #
    # AppEngine's upload handler doesn't like the filename*=UTF-8''foobar field
    # and fails with this Content-Disposition header.
    part = aiohttp.get_payload(file)
    filename = file.name if hasattr(file, 'name') else 'unknown'
    disposition = 'form-data; name="file"; filename="{}"'.format(filename)
    part.headers['Content-Disposition'] = disposition
    form_data = aiohttp.MultipartWriter('form-data')
    form_data.append_payload(part)

    upload_url = await self.get_data_async('/files/upload_url')
    response = ClientResponse(
        await self._get_session().post(upload_url, data=form_data))

    analysis = await self._response_to_object(response)
    if wait_for_completion:
        analysis = await self._wait_for_analysis_completion(analysis)
    return analysis
示例17
async def scan_url_async(self, url, wait_for_completion=False):
    """Like :func:`scan_url` but returns a coroutine.

    Posts ``url`` as the ``url`` form field to ``/urls`` and converts the
    response into an analysis object. When ``wait_for_completion`` is true,
    the result of ``_wait_for_analysis_completion`` is returned instead.

    Declared ``async`` because the body awaits.
    """
    form_data = aiohttp.FormData()
    form_data.add_field('url', url)

    response = ClientResponse(
        await self._get_session().post(self._full_url('/urls'), data=form_data))

    analysis = await self._response_to_object(response)
    if wait_for_completion:
        analysis = await self._wait_for_analysis_completion(analysis)
    return analysis
示例18
async def search_nomae_async(self, def_lst: list, region: int) -> List[Solution]:
    """Query nomae.net's arena database for teams against the given defence.

    ``def_lst`` is iterated as pairs; the second element of each pair is sent
    as a ``def[]`` form field. The request is attempted up to three times: a
    response body that is not valid JSON triggers another attempt.

    Declared ``async`` because the body uses ``async with`` and ``await``.

    Raises:
        RuntimeError: if ``region`` is 2 or 3, if aiohttp raises a
            ``ClientError`` (no retry in that case), or if every attempt
            returned a non-JSON body.
    """
    if region == 2 or region == 3:
        raise RuntimeError('当前搜索模式下无法执行此类查询')

    headers = {
        'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                       'AppleWebKit/537.36 (KHTML, like Gecko) '
                       'Chrome/78.0.3904.87 Safari/537.36'),
        'X-From': 'https://nomae.net/arenadb/',
        'Authority': 'nomae.net',
    }
    req = aiohttp.FormData()
    req.add_field('type', 'search')
    req.add_field('userid', 0)
    req.add_field('public', 1)
    for _, jpname in def_lst:
        req.add_field('def[]', jpname)
    req.add_field('page', 0)
    req.add_field('sort', 0)

    max_attempts = 3
    for _attempt in range(max_attempts):
        try:
            async with aiohttp.request(
                    'POST',
                    'https://nomae.net/princess_connect/public/_arenadb/receive.php',
                    headers=headers,
                    data=req) as resp:
                restxt = await resp.text()
        except aiohttp.ClientError as e:
            raise RuntimeError('错误'+str(e)) from e
        try:
            receive = json.loads(restxt)
        except json.JSONDecodeError:
            # Body was not JSON: try again.
            continue
        return list(map(self._parse_nomae_team, receive))

    raise RuntimeError('服务器错误,请稍后再试')
示例19
async def request(self, verb, url, payload=None, multipart=None, *, files=None, reason=None):
    """Send an HTTP request with up to five attempts and return the decoded body.

    * ``payload`` (when truthy) is sent as JSON.
    * ``multipart`` (when truthy) is sent as form data and takes precedence
      over ``payload`` as the request body. Keys starting with ``file`` are
      expected to map to ``(filename, fileobj, content_type)``.
    * ``reason`` (when truthy) is sent URI-quoted in ``X-Audit-Log-Reason``.
    * every object in ``files`` has ``reset(seek=<attempt index>)`` called
      before each attempt.

    The body is returned as parsed JSON when the response's ``Content-Type``
    is ``application/json``; otherwise as text, with an empty body becoming
    ``None``.

    Retries happen on 429 (after sleeping for ``retry_after`` milliseconds
    taken from the body) and on 500/502 (after ``1 + tries * 2`` seconds).

    Declared ``async`` because the body uses ``async with`` and ``await``.

    Raises:
        Forbidden: on status 403.
        NotFound: on status 404.
        HTTPException: on any other non-2xx status, or when the attempts are
            exhausted.
    """
    headers = {}
    data = None
    files = files or []
    if payload:
        headers['Content-Type'] = 'application/json'
        data = utils.to_json(payload)

    if reason:
        headers['X-Audit-Log-Reason'] = _uriquote(reason, safe='/ ')

    for tries in range(5):
        for file in files:
            file.reset(seek=tries)

        if multipart:
            # Build a fresh form on every attempt: a FormData object that has
            # already been sent is not safe to send again on a retry.
            data = aiohttp.FormData()
            for key, value in multipart.items():
                if key.startswith('file'):
                    data.add_field(key, value[1], filename=value[0], content_type=value[2])
                else:
                    data.add_field(key, value)

        async with self.session.request(verb, url, headers=headers, data=data) as r:
            # Coerce empty strings to return None for hygiene purposes
            response = (await r.text(encoding='utf-8')) or None
            if r.headers['Content-Type'] == 'application/json':
                response = json.loads(response)

            # check if we have rate limit header information
            remaining = r.headers.get('X-Ratelimit-Remaining')
            if remaining == '0' and r.status != 429:
                delta = utils._parse_ratelimit_header(r)
                await asyncio.sleep(delta)

            if 300 > r.status >= 200:
                return response

            # we are being rate limited
            if r.status == 429:
                retry_after = response['retry_after'] / 1000.0
                await asyncio.sleep(retry_after)
                continue

            if r.status in (500, 502):
                await asyncio.sleep(1 + tries * 2)
                continue

            if r.status == 403:
                raise Forbidden(r, response)
            elif r.status == 404:
                raise NotFound(r, response)
            else:
                raise HTTPException(r, response)

    # no more retries
    raise HTTPException(r, response)
示例20
async def call(
    self,
    method: str,
    data: Dict[str, Any] = None,
    *,
    token: str = None,
    json_mode: bool = False,
) -> APIResponse:
    """Call API methods.

    Posts to ``https://slack.com/api/<method>``.

    * ``json_mode`` true: ``data`` is sent as a JSON body and the token goes
      into an ``Authorization: Bearer`` header.
    * ``json_mode`` false: ``data`` is sent as a URL-encoded form with the
      token added as the ``token`` field.

    ``token`` falls back to ``self.config.TOKEN`` when falsy.

    The response body is decoded as JSON; if that raises ``ContentTypeError``
    the raw text is used instead.

    Declared ``async`` because the body uses ``async with`` and ``await``.

    Raises:
        APICallError: if the connection fails with ``ClientConnectorError``.
    """
    async with aiohttp.ClientSession() as session:
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
        }
        payload: Union[str, aiohttp.FormData]
        if json_mode:
            payload = json.dumps(data)
            headers['Content-Type'] = 'application/json'
            headers['Authorization'] = 'Bearer {}'.format(
                token or self.config.TOKEN
            )
        else:
            payload = aiohttp.FormData(data or {})
            payload.add_field('token', token or self.config.TOKEN)

        try:
            async with session.post(
                'https://slack.com/api/{}'.format(method),
                data=payload,
                headers=headers,
            ) as response:
                try:
                    result = await response.json(loads=json.loads)
                except ContentTypeError:
                    result = await response.text()
                return APIResponse(
                    body=result,
                    status=response.status,
                    headers=response.headers,
                )
        except ClientConnectorError as e:
            # Chain the original error so the traceback keeps the root cause.
            raise APICallError(
                'fail to call {} with {}'.format(method, data)
            ) from e
示例21
def request_webhook(self, partialurl, content=None, username=None,
                    avatar_url=None, tts=False, file=None, embeds=None,
                    filename=None):
    """Requests an webhook with the data provided to this function.

    Every argument except ``filename`` is stored on ``self``. When
    ``partialurl`` is ``None`` nothing else happens. Otherwise the non-``None``
    ``content``, ``username``, ``avatar_url`` and ``embeds`` values (and
    ``tts`` when truthy) are written into ``self.payload``, and one POST is
    issued through ``self.http.request``:

    * with a file: a multipart form (``payload_json`` + ``file``), the file
      named ``filename`` (default ``image.jpg``);
    * without a file: ``self.payload`` as JSON.

    This is a generator-based coroutine (it uses ``yield from``); it is
    presumably decorated as a coroutine outside this view.
    """
    # Clear the multipart flag if it is still set.
    if self.create_form_data:
        self.create_form_data = False

    self.partialurl = partialurl
    self.content = content
    self.username = username
    self.avatar_url = avatar_url
    self.tts = tts
    self.file = file
    self.embeds = embeds
    if filename is None:
        filename = 'image.jpg'

    if self.partialurl is None:
        return

    for key in ('content', 'username', 'avatar_url'):
        value = getattr(self, key)
        if value is not None:
            self.payload[key] = value
    if self.tts:
        self.payload['tts'] = self.tts
    if self.file is not None:
        self.create_form_data = True
    if self.embeds is not None:
        self.payload['embeds'] = self.embeds

    if not self.create_form_data:
        yield from self.http.request(
            WebHookRoute('POST', self.partialurl),
            json=self.payload)
        return

    self.form = aiohttp.FormData()
    self.form.add_field('payload_json', discord.utils.to_json(self.payload))
    self.form.add_field('file', self.file, filename=filename, content_type='multipart/form-data')
    yield from self.http.request(
        WebHookRoute('POST', self.partialurl),
        data=self.form)
示例22
def __request(self, method, api, **kwargs):
    """Send one API request and return the parsed ``Response``.

    ``method`` is looked up in ``self.methods``. ``params`` (default ``{}``)
    gets ``self.token`` added when a token is set, and is URL-encoded. For
    POST requests with a ``files`` mapping, each file is re-opened by its
    ``name`` attribute and sent as multipart form data, together with the
    non-``None`` entries of ``data`` converted to ``str``.

    Compared with the straightforward version, this one:
      * copies ``params`` before adding the token, so the caller's mapping is
        not modified;
      * closes every file handle it opened once the request has finished,
        whether it succeeded or failed.

    This is a generator-based coroutine (it uses ``yield from``); it is
    presumably decorated as a coroutine outside this view.

    Raises:
        Error: if the parsed response is not successful.
        Whatever ``raise_for_status`` raises for HTTP error statuses.
    """
    method = self.methods[method]

    kwargs.setdefault('timeout', None)

    params = kwargs.get('params', {})
    if self.token:
        params = dict(params)
        params['token'] = self.token
    kwargs['params'] = urlencode(params, doseq=True)

    opened_files = []
    _response = None

    try:
        if method == 'POST':
            files = kwargs.pop('files', None)
            if files is not None:
                data = kwargs.pop('data', {})
                _data = aiohttp.FormData()

                for k, v in files.items():
                    handle = open(v.name, 'rb')
                    opened_files.append(handle)
                    _data.add_field(k, handle)

                for k, v in data.items():
                    if v is not None:
                        _data.add_field(k, str(v))

                kwargs['data'] = _data

        _url = slacker.API_BASE_URL.format(api=api)

        _request = self.session.request(method, _url, **kwargs)

        with async_timeout.timeout(self.timeout, loop=self.loop):
            _response = yield from _request

        _response.raise_for_status()

        text = yield from _response.text()
    finally:
        try:
            if _response is not None:
                yield from _response.release()
        finally:
            # Closing an already-closed file is harmless.
            for handle in opened_files:
                handle.close()

    response = Response(text)

    if not response.successful:
        raise Error(response.error)

    return response
示例23
async def do_retro(self, text, bcg):
    """Render up to three lines of text through the photofunia retro-wave effect.

    ``text`` is split into lines as follows:
      * if it contains ``|``, split on ``|``;
      * else if it is 15 characters or longer, cut into 15-character chunks;
      * else if it is a single word, split into characters (a 4-character
        word becomes 3 items, the last two characters merged);
      * otherwise split on whitespace.
    Only the first three lines are sent, as ``text1`` .. ``text3``, with
    single quotes replaced by double quotes.

    Returns ``None`` if the request raises any exception, the bytes from
    ``self.bytes_download`` for the first regex match in the response page,
    or ``False`` if nothing matched.

    Declared ``async`` because the body uses ``async with`` and ``await``.
    """
    if "|" in text:
        text = text.split("|")
    elif len(text) >= 15:
        text = [text[i : i + 15] for i in range(0, len(text), 15)]
    else:
        split = text.split()
        if len(split) == 1:
            text = list(text)
            if len(text) == 4:
                text[2] = text[2] + text[-1]
                del text[3]
        else:
            text = split

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:43.0) Gecko/20100101 Firefox/43.0"
    }
    payload = aiohttp.FormData()
    payload.add_field("current-category", "all_effects")
    payload.add_field("bcg", bcg)
    payload.add_field("txt", "4")
    for count, s in enumerate(text[:3], start=1):
        payload.add_field("text" + str(count), s.replace("'", '"'))

    try:
        async with aiohttp.ClientSession() as session:
            # NOTE(review): the query parameter is "guild=3"; this may be the
            # result of a server->guild rename and might need to be
            # "server=3". Left unchanged pending confirmation.
            async with session.post(
                "https://photofunia.com/effects/retro-wave?guild=3",
                data=payload,
                headers=headers,
            ) as r:
                txt = await r.text()
    except Exception:
        # Deliberate best-effort: any failure of the request yields None.
        return

    match = self.retro_regex.findall(txt)
    if match:
        download_url = match[0][0]
        b, _mime = await self.bytes_download(download_url)
        return b
    return False