Python源码示例:xmltodict.parse()
示例1
def _handle_result(self, res):
res.encoding = "utf-8-sig"
xml = res.text
logger.debug("Response from WeChat API \n %s", xml)
try:
data = xmltodict.parse(xml)["xml"]
except (xmltodict.ParsingInterrupted, ExpatError):
# 解析 XML 失败
logger.debug("WeChat payment result xml parsing error", exc_info=True)
return xml
return_code = data["return_code"]
return_msg = data.get("return_msg", data.get("retmsg"))
result_code = data.get("result_code", data.get("retcode"))
errcode = data.get("err_code")
errmsg = data.get("err_code_des")
if return_code != "SUCCESS" or result_code != "SUCCESS":
# 返回状态码不为成功
raise WeChatPayException(
return_code, result_code, return_msg, errcode, errmsg, client=self, request=res.request, response=res,
)
return data
示例2
def parse_message(self, msg, msg_signature, timestamp, nonce):
"""
处理 wechat server 推送消息
:params msg: 加密内容
:params msg_signature: 消息签名
:params timestamp: 时间戳
:params nonce: 随机数
"""
content = self.crypto.decrypt_message(msg, msg_signature, timestamp, nonce)
message = xmltodict.parse(to_text(content))["xml"]
message_type = message["InfoType"].lower()
message_class = COMPONENT_MESSAGE_TYPES.get(message_type, ComponentUnknownMessage)
msg = message_class(message)
if msg.type == "component_verify_ticket":
self.session.set(msg.type, msg.verify_ticket)
elif msg.type in ("authorized", "updateauthorized"):
msg.query_auth_result = self.query_auth(msg.authorization_code)
return msg
示例3
def get_timestamp_from_date_string(date_string):
""" parse a date string into unix epoch (ms) """
if 'strip_tz' in agent_config_vars and agent_config_vars['strip_tz']:
date_string = ''.join(agent_config_vars['strip_tz_fmt'].split(date_string))
if 'timestamp_format' in agent_config_vars:
if agent_config_vars['timestamp_format'] == 'epoch':
timestamp_datetime = get_datetime_from_unix_epoch(date_string)
else:
timestamp_datetime = datetime.strptime(date_string, agent_config_vars['timestamp_format'])
else:
try:
timestamp_datetime = dateutil.parse.parse(date_string)
except:
timestamp_datetime = get_datetime_from_unix_epoch(date_string)
agent_config_vars['timestamp_format'] = 'epoch'
return get_timestamp_from_datetime(timestamp_datetime)
示例4
def _add_batch(self, catalog_entry, job_id, start_date, order_by_clause=True):
endpoint = "job/{}/batch".format(job_id)
url = self.bulk_url.format(self.sf.instance_url, endpoint)
body = self.sf._build_query_string(catalog_entry, start_date, order_by_clause=order_by_clause)
headers = self._get_bulk_headers()
headers['Content-Type'] = 'text/csv'
with metrics.http_request_timer("add_batch") as timer:
timer.tags['sobject'] = catalog_entry['stream']
resp = self.sf._make_request('POST', url, headers=headers, body=body)
batch = xmltodict.parse(resp.text)
return batch['batchInfo']['id']
示例5
def _url_status(url):
parse_obj = urllib.parse.urlparse(url)
timer = 1
for i in range(6):
try:
connection = http.client.HTTPConnection(parse_obj.netloc)
connection.request('HEAD', parse_obj.path)
break
except Exception as e:
print(url, e, 'sleep', timer)
time.sleep(timer)
timer *= 2
else:
return e
response = connection.getresponse()
connection.close()
return response.status
示例6
def generate_references(self):
self.zip_file = zipfile.ZipFile(
self.book_filename, mode='r', allowZip64=True)
self.file_list = self.zip_file.namelist()
# Book structure relies on parsing the .opf file
# in the book. Now that might be the usual content.opf
# or package.opf or it might be named after your favorite
# eldritch abomination. The point is we have to check
# the container.xml
container = self.find_file('container.xml')
if container:
container_xml = self.zip_file.read(container)
container_dict = xmltodict.parse(container_xml)
packagefile = container_dict['container']['rootfiles']['rootfile']['@full-path']
else:
presumptive_names = ('content.opf', 'package.opf', 'volume.opf')
for i in presumptive_names:
packagefile = self.find_file(i)
if packagefile:
logger.info('Using presumptive package file: ' + self.book_filename)
break
packagefile_data = self.zip_file.read(packagefile)
self.opf_dict = xmltodict.parse(packagefile_data)
示例7
def get_chapter_content(self, chapter_file):
this_file = self.find_file(chapter_file)
if this_file:
chapter_content = self.zip_file.read(this_file).decode()
# Generate a None return for a blank chapter
# These will be removed from the contents later
contentDocument = QtGui.QTextDocument(None)
contentDocument.setHtml(chapter_content)
contentText = contentDocument.toPlainText().replace('\n', '')
if contentText == '':
chapter_content = None
return chapter_content
else:
return 'Possible parse error: ' + chapter_file
示例8
def _createWorkitem(self, url_post, workitem_raw):
headers = copy.deepcopy(self.headers)
headers['Content-Type'] = self.OSLC_CR_XML
resp = self.post(url_post, verify=False,
headers=headers, proxies=self.proxies,
data=workitem_raw)
raw_data = xmltodict.parse(resp.content)
workitem_raw = raw_data["oslc_cm:ChangeRequest"]
workitem_id = workitem_raw["dc:identifier"]
workitem_url = "/".join([self.url,
"oslc/workitems/%s" % workitem_id])
new_wi = Workitem(workitem_url,
self,
workitem_id=workitem_id,
raw_data=raw_data["oslc_cm:ChangeRequest"])
self.log.info("Successfully create <Workitem %s>" % new_wi)
return new_wi
示例9
def get_qos_class_map(netconf_handler):
'''
This procedure takes in the netconf handler for the switch and read what is the QoS Softmax Multiplier
Procedure returns True if configuration successful, else returns False
'''
netconf_reply = netconf_handler.get_config( source='running', filter=('xpath', "/native/policy/class-map"))
data = xmltodict.parse(netconf_reply.xml)
#print (data)
class_map_names = []
for cmap in (data["rpc-reply"]["data"]["native"]["policy"]["class-map"]):
class_map_names.append(cmap["name"])
return class_map_names
示例10
def get_qos_policy_map(netconf_handler):
'''
This procedure takes in the netconf handler for the switch and read what is the QoS Softmax Multiplier
Procedure returns True if configuration successful, else returns False
'''
netconf_reply = netconf_handler.get_config( source='running', filter=('xpath', "/native/policy/policy-map"))
data = xmltodict.parse(netconf_reply.xml)
policy_map_names = []
for pmap in (data["rpc-reply"]["data"]["native"]["policy"]["policy-map"]):
if (pmap["name"]) != "system-cpp-policy":
class_maps_in_policy = " Contains Class maps: "
for cmap in (pmap["class"]):
class_maps_in_policy = class_maps_in_policy + cmap["name"] + " "
policy_map_names.append("Policy map: " + pmap["name"] + class_maps_in_policy)
return policy_map_names
示例11
def get_pytornado_settings_from_CPACS(cpacs_in_path):
""" Try to read PyTornado settings from CPACS
Note:
* Returns None if PyTornado settings not found in CPACS
Args:
cpacs_in_path (str): Path to CPACS file
Returns:
cpacs_settings (dict): PyTornado settings dictionary read from CPACS
"""
with open(cpacs_in_path, "r") as fp:
cpacs_as_dict = xml.parse(fp.read())
cpacs_settings = cpacs_as_dict.get('cpacs', {}).get('toolspecific', {}).get('pytornado', None)
parse_pytornado_settings_dict(cpacs_settings)
return cpacs_settings
示例12
def get_external_ip(soap_url) -> str:
"""get external ip address"""
s_o_a_p = '<?xml version="1.0"?>\r\n'
s_o_a_p += '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle=' \
'"http://schemas.xmlsoap.org/soap/encoding/">\r\n'
s_o_a_p += '<s:Body>\r\n'
s_o_a_p += '<u:GetExternalIPAddress xmlns:u="urn:schemas-upnp-org:service:WANPPPConnection:1">\r\n'
s_o_a_p += '</u:GetExternalIPAddress>\r\n'
s_o_a_p += '</s:Body>\r\n'
s_o_a_p += '</s:Envelope>\r\n'
try:
req = Request(soap_url)
req.add_header('Content-Type', 'text/xml; charset="utf-8"')
req.add_header('SOAPACTION', '"urn:schemas-upnp-org:service:WANPPPConnection:1#GetExternalIPAddress"')
req.data = s_o_a_p.encode('utf8')
result = xmltodict.parse(urlopen(req).read().decode())
return result['s:Envelope']['s:Body']['u:GetExternalIPAddressResponse']['NewExternalIPAddress']
except Exception:
log.debug("get_external_ip exception", exc_info=True)
示例13
def __init__(self, file_name):
self.data = []
content = open(file_name, encoding="utf-8").read()
is_test_file = file_name.split("/")[-1] == "testdata_with_lex.xml"
structure = xmltodict.parse(content)
for i, entry in enumerate(structure["benchmark"]["entries"]["entry"]):
triplets = [tuple(map(str.strip, r.split("|"))) for r in
self.triplets_from_object(entry["modifiedtripleset"], "mtriple")]
sentences = list(self.extract_sentences(entry["lex"]))
for s in sentences:
info = {
"id": i,
"seen": not is_test_file or i <= 970,
"manual": is_test_file and i + 1 in FOR_MANUAL_EVAL and i <= 970
}
self.data.append(Datum(rdfs=triplets, text=s, info=info))
示例14
def parse_nmap_xml(nmap_output_file, protocol):
targets = []
with open(nmap_output_file, 'r') as file_handle:
scan_output = xmltodict.parse(file_handle.read())
for host in scan_output['nmaprun']['host']:
if host['address'][0]['@addrtype'] != 'ipv4':
continue
ip = host['address'][0]['@addr']
for port in host['ports']['port']:
if port['state']['@state'] == 'open':
if 'service' in port and (port['service']['@name'] in protocol_dict[protocol]['services']):
if ip not in targets:
targets.append(ip)
elif port['@portid'] in protocol_dict[protocol]['ports']:
if ip not in targets:
targets.append(ip)
return targets
示例15
def _fetch_sandbox_api_key(self):
nonce_str = random_string(32)
sign = calculate_signature({"mch_id": self.mch_id, "nonce_str": nonce_str}, self.api_key)
payload = dict_to_xml({"mch_id": self.mch_id, "nonce_str": nonce_str,}, sign=sign)
headers = {"Content-Type": "text/xml"}
api_url = f"{self.API_BASE_URL}sandboxnew/pay/getsignkey"
response = self._http.post(api_url, data=payload, headers=headers)
return xmltodict.parse(response.text)["xml"].get("sandbox_signkey")
示例16
def get_payment_data(cls, xml):
"""
解析微信支付结果通知,获得appid, mch_id, out_trade_no, transaction_id
如果你需要进一步判断,请先用appid, mch_id来生成WeChatPay,
然后用`wechatpay.parse_payment_result(xml)`来校验支付结果
使用示例::
from wechatpy.pay import WeChatPay
# 假设你已经获取了微信服务器推送的请求中的xml数据并存入xml变量
data = WeChatPay.get_payment_appid(xml)
{
"appid": "公众号或者小程序的id",
"mch_id": "商户id",
}
"""
try:
data = xmltodict.parse(xml)
except (xmltodict.ParsingInterrupted, ExpatError):
raise ValueError("invalid xml")
if not data or "xml" not in data:
raise ValueError("invalid xml")
return {
"appid": data["appid"],
"mch_id": data["mch_id"],
"out_trade_no": data["out_trade_no"],
"transaction_id": data["transaction_id"],
}
示例17
def parse_payment_result(self, xml):
"""解析微信支付结果通知"""
try:
data = xmltodict.parse(xml)
except (xmltodict.ParsingInterrupted, ExpatError):
raise InvalidSignatureException()
if not data or "xml" not in data:
raise InvalidSignatureException()
data = data["xml"]
sign = data.pop("sign", None)
real_sign = calculate_signature(data, self.api_key if not self.sandbox else self.sandbox_api_key)
if sign != real_sign:
raise InvalidSignatureException()
for key in (
"total_fee",
"settlement_total_fee",
"cash_fee",
"coupon_fee",
"coupon_count",
):
if key in data:
data[key] = int(data[key])
data["sign"] = sign
return data
示例18
def _decrypt_message(self, msg, signature, timestamp, nonce, crypto_class=None):
if not isinstance(msg, dict):
import xmltodict
msg = xmltodict.parse(to_text(msg))["xml"]
encrypt = msg["Encrypt"]
_signature = _get_signature(self.token, timestamp, nonce, encrypt)
if _signature != signature:
raise InvalidSignatureException()
pc = crypto_class(self.key)
return pc.decrypt(encrypt, self._id)
示例19
def parse_message(xml):
if not xml:
return
message = xmltodict.parse(to_text(xml))["xml"]
message_type = message["MsgType"].lower()
if message_type == "event":
event_type = message["Event"].lower()
message_class = EVENT_TYPES.get(event_type, UnknownMessage)
else:
message_class = MESSAGE_TYPES.get(message_type, UnknownMessage)
return message_class(message)
示例20
def deserialize_reply(xml, update_time=False):
"""
反序列化被动回复
:param xml: 待反序列化的xml
:param update_time: 是否用当前时间替换xml中的时间
:raises ValueError: 不能辨识的reply xml
:rtype: wechatpy.replies.BaseReply
"""
if not xml:
return EmptyReply()
try:
reply_dict = xmltodict.parse(xml)["xml"]
msg_type = reply_dict["MsgType"]
except (xmltodict.expat.ExpatError, KeyError):
raise ValueError("bad reply xml")
if msg_type not in REPLY_TYPES:
raise ValueError("unknown reply type")
cls = REPLY_TYPES[msg_type]
kwargs = dict()
for attr, field in cls._fields.items():
if field.name in reply_dict:
str_value = reply_dict[field.name]
kwargs[attr] = field.from_xml(str_value)
if update_time:
kwargs["time"] = time.time()
return cls(**kwargs)
示例21
def parse_message(xml):
"""
解析微信服务器推送的 XML 消息
:param xml: XML 消息
:return: 解析成功返回对应的消息或事件,否则返回 ``UnknownMessage``
"""
if not xml:
return
message = xmltodict.parse(to_text(xml))["xml"]
message_type = message["MsgType"].lower()
event_type = None
if message_type == "event" or message_type.startswith("device_"):
if "Event" in message:
event_type = message["Event"].lower()
# special event type for device_event
if event_type is None and message_type.startswith("device_"):
event_type = message_type
elif message_type.startswith("device_"):
event_type = f"device_{event_type}"
if event_type == "subscribe" and message.get("EventKey"):
event_key = message["EventKey"]
if event_key.startswith(("scanbarcode|", "scanimage|")):
event_type = "subscribe_scan_product"
message["Event"] = event_type
elif event_key.startswith("qrscene_"):
# Scan to subscribe with scene id event
event_type = "subscribe_scan"
message["Event"] = event_type
message["EventKey"] = event_key[len("qrscene_") :]
message_class = EVENT_TYPES.get(event_type, UnknownMessage)
else:
message_class = MESSAGE_TYPES.get(message_type, UnknownMessage)
return message_class(message)
示例22
def test_decrypt_message(self):
xml = """<xml><ToUserName><![CDATA[wx49f0ab532d5d035a]]></ToUserName>
<Encrypt><![CDATA[RgqEoJj5A4EMYlLvWO1F86ioRjZfaex/gePD0gOXTxpsq5Yj4GNglrBb8I2BAJVODGajiFnXBu7mCPatfjsu6IHCrsTyeDXzF6Bv283dGymzxh6ydJRvZsryDyZbLTE7rhnus50qGPMfp2wASFlzEgMW9z1ef/RD8XzaFYgm7iTdaXpXaG4+BiYyolBug/gYNx410cvkKR2/nPwBiT+P4hIiOAQqGp/TywZBtDh1yCF2KOd0gpiMZ5jSw3e29mTvmUHzkVQiMS6td7vXUaWOMZnYZlF3So2SjHnwh4jYFxdgpkHHqIrH/54SNdshoQgWYEvccTKe7FS709/5t6NMxuGhcUGAPOQipvWTT4dShyqio7mlsl5noTrb++x6En749zCpQVhDpbV6GDnTbcX2e8K9QaNWHp91eBdCRxthuL0=]]></Encrypt>
<AgentID><![CDATA[1]]></AgentID>
</xml>"""
signature = "74d92dfeb87ba7c714f89d98870ae5eb62dff26d"
timestamp = "1411525903"
nonce = "461056294"
crypto = WeChatCrypto(self.token, self.encoding_aes_key, self.corp_id)
msg = crypto.decrypt_message(xml, signature, timestamp, nonce)
msg_dict = xmltodict.parse(msg)["xml"]
self.assertEqual("test", msg_dict["Content"])
self.assertEqual("messense", msg_dict["FromUserName"])
示例23
def xml_to_dict(xml, listpaths=None, **kwargs):
listpaths = listpaths or []
opts = dict(
attr_prefix = '',
dict_constructor = dict,
postprocessor = lambda p,k,d: (k.split(':')[1] if ':' in k else k,d)
)
opts.update(kwargs)
content = xmltodict.parse(xml, **opts)
for listpath in listpaths:
pathdata = rget(content, listpath, [])
pathdata = pathdata if isinstance(pathdata, list) else [pathdata]
rset(content, listpath, pathdata)
return content
示例24
def parse_xml(self, ret, method=None):
if method is None:
method = self._xmlparser
if method == 'EUtilsParser':
ret = self.easyXML(ret)
return EUtilsParser(ret)
elif method == 'objectify': # used in docstrings
from bioservices.xmltools import XMLObjectify
return XMLObjectify(ret)
elif method == 'dict':
import xmltodict
return xmltodict.parse(ret)
示例25
def ECitMatch(self, bdata, **kargs):
r"""
:param bdata: Citation strings. Each input citation must
be represented by a citation string in the following format::
journal_title|year|volume|first_page|author_name|your_key|
Multiple citation strings may be provided by separating the
strings with a carriage return character (%0D) or simply \\r or \\n.
The your_key value is an arbitrary label provided by the user
that may serve as a local identifier for the citation,
and it will be included in the output.
all spaces must be replaced by + symbols and that citation
strings should end with a final vertical bar |.
Only xml supported at the time of this implementation.
::
from bioservices import EUtils
s = EUtils()
print(s.ECitMatch("proc+natl+acad+sci+u+s+a|1991|88|3248|mann+bj|Art1|%0Dscience|1987|235|182|palmenberg+ac|Art2|"))
"""
# Fixes https://github.com/cokelaer/bioservices/issues/169
from urllib.parse import unquote
params = {'bdata': unquote(bdata), "retmode": "xml"}
# note here, we use .cgi not .fcgi
query = "ecitmatch.cgi?db=pubmed&retmode=xml"
ret = self.http_get(query, None, params=params)
try: ret = ret.content
except: pass
return ret
示例26
def start_data_processing(thread_number):
# get list of file(s)
files = agent_config_vars['files']
for evtx_file in files:
with evtx.Evtx(evtx_file) as log:
for record in log.records():
if record.timestamp():
parse_json_message(json.loads(json.dumps(xmltodict.parse(record.xml()))))
示例27
def get_class_bbox(self, img_name):
with open(self._label_path(img_name), 'r') as f:
xml = xmltodict.parse(f.read())
img_size = xml['annotation']['size']
img_w, img_h = float(img_size['width']), float(img_size['height'])
objs = xml['annotation']['object']
if type(objs) is not list:
objs = [objs]
clses = np.zeros_like(self.labels, dtype=np.float)
bboxes = np.zeros(shape=[len(self.labels), 4], dtype=np.float)
bbox_cls = defaultdict(list)
for obj in objs:
idx = self.label2idx[obj['name']]
clses[idx] = 1
bndbox = obj['bndbox']
bbox = (bndbox['xmin'], bndbox['ymin'], bndbox['xmax'], bndbox['ymax'])
bbox = self._normalize_bbox(bbox, (img_w, img_h))
bbox = np.array(bbox, dtype=np.float)
bbox_cls[idx].append(bbox)
for k, v in bbox_cls.items():
sample_idx = np.random.randint(0, len(v))
bboxes[k] = v[sample_idx]
return clses, bboxes
示例28
def get_mcu_definition(self, project_file):
""" Parse project file to get mcu definition """
project_file = join(getcwd(), project_file)
uvproj_dic = xmltodict.parse(open(project_file, "rb"), dict_constructor=dict)
# Generic Target, should get from Target class !
mcu = MCU_TEMPLATE
try:
mcu['tool_specific'] = {
# legacy device
'uvision' : {
'TargetOption' : {
'Device' : [uvproj_dic['Project']['Targets']['Target']['TargetOption']['TargetCommonOption']['Device']],
'DeviceId' : [None if not uvproj_dic['Project']['Targets']['Target']['TargetOption']['TargetCommonOption']['DeviceId'] else
int(uvproj_dic['Project']['Targets']['Target']['TargetOption']['TargetCommonOption']['DeviceId'])],
'Vendor' : [uvproj_dic['Project']['Targets']['Target']['TargetOption']['TargetCommonOption']['Vendor']],
'Cpu' : [uvproj_dic['Project']['Targets']['Target']['TargetOption']['TargetCommonOption']['Cpu']],
'FlashDriverDll' : [uvproj_dic['Project']['Targets']['Target']['TargetOption']['TargetCommonOption']['FlashDriverDll']],
'SFDFile' : [uvproj_dic['Project']['Targets']['Target']['TargetOption']['TargetCommonOption']['SFDFile']],
'RegisterFile': [uvproj_dic['Project']['Targets']['Target']['TargetOption']['TargetCommonOption']['RegisterFile']],
}
}
}
except KeyError:
# validity check for uvision project
logging.debug("The project_file %s seems to be not valid .uvproj file.")
return mcu
return mcu
示例29
def get_mcu_definition(self, project_file):
""" Parse project file to get mcu definition """
project_file = join(getcwd(), project_file)
uvproj_dic = xmltodict.parse(open(project_file, "rb"), dict_constructor=dict)
# Generic Target, should get from Target class !
mcu = MCU_TEMPLATE
try:
mcu['tool_specific'] = {
'uvision5' : {
'TargetOption' : {
'Device' : [uvproj_dic['Project']['Targets']['Target']['TargetOption']['TargetCommonOption']['Device']],
'DeviceId' : [None if not uvproj_dic['Project']['Targets']['Target']['TargetOption']['TargetCommonOption']['DeviceId'] else
int(uvproj_dic['Project']['Targets']['Target']['TargetOption']['TargetCommonOption']['DeviceId'])],
'Vendor' : [uvproj_dic['Project']['Targets']['Target']['TargetOption']['TargetCommonOption']['Vendor']],
'Cpu' : [uvproj_dic['Project']['Targets']['Target']['TargetOption']['TargetCommonOption']['Cpu']],
'FlashDriverDll' : [uvproj_dic['Project']['Targets']['Target']['TargetOption']['TargetCommonOption']['FlashDriverDll']],
'SFDFile' : [uvproj_dic['Project']['Targets']['Target']['TargetOption']['TargetCommonOption']['SFDFile']],
'PackID' : [uvproj_dic['Project']['Targets']['Target']['TargetOption']['TargetCommonOption']['PackID']],
'RegisterFile': [uvproj_dic['Project']['Targets']['Target']['TargetOption']['TargetCommonOption']['RegisterFile']],
}
}
}
except KeyError:
# validity check for uvision project
logging.debug("The project_file %s seems to be not valid .uvproj file.")
return mcu
return mcu
示例30
def _get_batches(self, job_id):
endpoint = "job/{}/batch".format(job_id)
url = self.bulk_url.format(self.sf.instance_url, endpoint)
headers = self._get_bulk_headers()
with metrics.http_request_timer("get_batches"):
resp = self.sf._make_request('GET', url, headers=headers)
batches = xmltodict.parse(resp.text,
xml_attribs=False,
force_list=('batchInfo',))['batchInfoList']['batchInfo']
return batches