Python源码示例:xmltodict.parse()

示例1
def _handle_result(self, res):
    """Parse a WeChat Pay HTTP response and return its ``<xml>`` payload.

    :param res: response object; its ``encoding`` is overwritten with
        ``"utf-8-sig"`` before ``res.text`` is read
    :return: the mapping found under the ``xml`` root element, or the raw
        response text when the body cannot be parsed as XML
    :raises WeChatPayException: when ``return_code`` or ``result_code`` is
        anything other than ``"SUCCESS"``
    """
    res.encoding = "utf-8-sig"
    raw_text = res.text
    logger.debug("Response from WeChat API \n %s", raw_text)

    try:
        payload = xmltodict.parse(raw_text)["xml"]
    except (xmltodict.ParsingInterrupted, ExpatError):
        # The body is not parseable XML: hand the raw text back to the caller.
        logger.debug("WeChat payment result xml parsing error", exc_info=True)
        return raw_text

    return_code = payload["return_code"]
    # Some replies use the short "retcode"/"retmsg" names instead.
    result_code = payload.get("result_code", payload.get("retcode"))
    if return_code == "SUCCESS" and result_code == "SUCCESS":
        return payload

    # At least one of the two status codes reports a failure.
    raise WeChatPayException(
        return_code,
        result_code,
        payload.get("return_msg", payload.get("retmsg")),
        payload.get("err_code"),
        payload.get("err_code_des"),
        client=self,
        request=res.request,
        response=res,
    )
示例2
def parse_message(self, msg, msg_signature, timestamp, nonce):
    """
    Handle a message pushed by the WeChat server.

    :params msg: encrypted content
    :params msg_signature: message signature
    :params timestamp: timestamp
    :params nonce: random nonce
    :return: the message object built from the decrypted XML
    """
    decrypted = self.crypto.decrypt_message(msg, msg_signature, timestamp, nonce)
    payload = xmltodict.parse(to_text(decrypted))["xml"]
    info_type = payload["InfoType"].lower()
    # Unrecognised InfoType values fall back to ComponentUnknownMessage.
    parsed = COMPONENT_MESSAGE_TYPES.get(info_type, ComponentUnknownMessage)(payload)

    if parsed.type == "component_verify_ticket":
        # Store the ticket in the session under the message type.
        self.session.set(parsed.type, parsed.verify_ticket)
    elif parsed.type in ("authorized", "updateauthorized"):
        parsed.query_auth_result = self.query_auth(parsed.authorization_code)
    return parsed
示例3
def get_timestamp_from_date_string(date_string):
    """Parse a date string into a unix epoch timestamp (ms).

    The parsing strategy is driven by ``agent_config_vars``:

    * if ``strip_tz`` is set, ``strip_tz_fmt.split`` is applied to the string
      and the pieces are joined back together first;
    * if ``timestamp_format`` is ``'epoch'`` the string is treated as a unix
      epoch value, otherwise it is passed to ``datetime.strptime``;
    * with no configured format, ``dateutil`` is tried first; if that fails
      the string is treated as an epoch value and ``'epoch'`` is stored as
      the format for later calls.

    :param date_string: textual timestamp to convert
    :return: the value produced by ``get_timestamp_from_datetime``
    """
    if 'strip_tz' in agent_config_vars and agent_config_vars['strip_tz']:
        date_string = ''.join(agent_config_vars['strip_tz_fmt'].split(date_string))

    if 'timestamp_format' in agent_config_vars:
        if agent_config_vars['timestamp_format'] == 'epoch':
            timestamp_datetime = get_datetime_from_unix_epoch(date_string)
        else:
            timestamp_datetime = datetime.strptime(date_string, agent_config_vars['timestamp_format'])
    else:
        # The parser lives in ``dateutil.parser`` (there is no ``dateutil.parse``)
        # and a plain ``import dateutil`` does not load that submodule.
        from dateutil import parser as dateutil_parser
        try:
            timestamp_datetime = dateutil_parser.parse(date_string)
        except Exception:
            # Best-effort fallback: assume epoch and remember that choice.
            timestamp_datetime = get_datetime_from_unix_epoch(date_string)
            agent_config_vars['timestamp_format'] = 'epoch'

    return get_timestamp_from_datetime(timestamp_datetime)
示例4
def _add_batch(self, catalog_entry, job_id, start_date, order_by_clause=True):
    """Add a query batch to bulk job ``job_id`` and return the batch id.

    The query text comes from ``self.sf._build_query_string`` and is POSTed
    with a ``text/csv`` content type; the id is read from the ``batchInfo``
    element of the XML reply.
    """
    batch_url = self.bulk_url.format(self.sf.instance_url, "job/{}/batch".format(job_id))
    query = self.sf._build_query_string(catalog_entry, start_date, order_by_clause=order_by_clause)

    request_headers = self._get_bulk_headers()
    request_headers['Content-Type'] = 'text/csv'

    with metrics.http_request_timer("add_batch") as timer:
        # Tag the timing metric with the stream being queried.
        timer.tags['sobject'] = catalog_entry['stream']
        resp = self.sf._make_request('POST', batch_url, headers=request_headers, body=query)

    return xmltodict.parse(resp.text)['batchInfo']['id']
示例5
def _url_status(url):
    """Send a ``HEAD`` request for ``url`` and return the HTTP status code.

    The request is attempted up to six times; after each failure the error is
    printed and the function sleeps, doubling the delay each time (1, 2, 4,
    ... seconds).

    :param url: URL whose network location and path are requested over HTTP
    :return: the integer response status, or the last exception raised when
        every attempt failed
    """
    parse_obj = urllib.parse.urlparse(url)

    delay = 1
    last_error = None
    for _ in range(6):
        try:
            connection = http.client.HTTPConnection(parse_obj.netloc)
            connection.request('HEAD', parse_obj.path)
            break
        except Exception as e:
            # Keep our own reference: Python 3 unbinds ``e`` as soon as the
            # except block ends, so it cannot be returned from the else branch.
            last_error = e
            print(url, e, 'sleep', delay)
            time.sleep(delay)
            delay *= 2
    else:
        return last_error

    try:
        return connection.getresponse().status
    finally:
        # Close the connection even when reading the response fails.
        connection.close()
示例6
def generate_references(self):
    """Open the book archive and load its package (.opf) document.

    Sets ``self.zip_file``, ``self.file_list`` and ``self.opf_dict``.
    """
    self.zip_file = zipfile.ZipFile(
        self.book_filename, mode='r', allowZip64=True)
    self.file_list = self.zip_file.namelist()

    # The book structure is read from the .opf package file. Its name is
    # not fixed (content.opf, package.opf, or anything else), so the
    # authoritative location is looked up in container.xml first.
    container = self.find_file('container.xml')
    if container:
        rootfiles = xmltodict.parse(self.zip_file.read(container))['container']['rootfiles']
        packagefile = rootfiles['rootfile']['@full-path']
    else:
        # No container.xml: fall back to the commonly used file names.
        for candidate in ('content.opf', 'package.opf', 'volume.opf'):
            packagefile = self.find_file(candidate)
            if packagefile:
                logger.info('Using presumptive package file: ' + self.book_filename)
                break

    self.opf_dict = xmltodict.parse(self.zip_file.read(packagefile))
示例7
def get_chapter_content(self, chapter_file):
    """Return the decoded content of a chapter stored in the book archive.

    Returns ``None`` when the chapter renders to no text at all, and the
    string ``'Possible parse error: <chapter_file>'`` when ``find_file``
    cannot locate the chapter.
    """
    located = self.find_file(chapter_file)
    if not located:
        return 'Possible parse error: ' + chapter_file

    chapter_content = self.zip_file.read(located).decode()

    # Render the markup to plain text to detect blank chapters; those are
    # reported as None so they can be removed from the contents later.
    document = QtGui.QTextDocument(None)
    document.setHtml(chapter_content)
    if document.toPlainText().replace('\n', '') == '':
        return None

    return chapter_content
示例8
def _createWorkitem(self, url_post, workitem_raw):
    """POST a raw workitem document and wrap the created item.

    :param url_post: URL the workitem is posted to
    :param workitem_raw: request body describing the workitem
    :return: a ``Workitem`` built from the ``oslc_cm:ChangeRequest`` element
        of the reply
    """
    # Work on a copy so the shared default headers are left untouched.
    request_headers = copy.deepcopy(self.headers)
    request_headers['Content-Type'] = self.OSLC_CR_XML

    # Note: certificate verification is switched off for this request.
    resp = self.post(url_post, verify=False,
                     headers=request_headers, proxies=self.proxies,
                     data=workitem_raw)

    change_request = xmltodict.parse(resp.content)["oslc_cm:ChangeRequest"]
    workitem_id = change_request["dc:identifier"]
    workitem_url = "/".join((self.url, "oslc/workitems/%s" % workitem_id))

    new_wi = Workitem(workitem_url,
                      self,
                      workitem_id=workitem_id,
                      raw_data=change_request)

    self.log.info("Successfully create <Workitem %s>" % new_wi)
    return new_wi
示例9
def get_qos_class_map(netconf_handler):
    '''
    Read the class-maps present in the running configuration of the switch.

    :param netconf_handler: NETCONF handler for the switch; its ``get_config``
        is queried with the xpath ``/native/policy/class-map``
    :return: list with the ``name`` of every class-map in the reply
    '''

    netconf_reply = netconf_handler.get_config(source='running', filter=('xpath', "/native/policy/class-map"))

    data = xmltodict.parse(netconf_reply.xml)

    class_maps = data["rpc-reply"]["data"]["native"]["policy"]["class-map"]
    # xmltodict yields a bare mapping (not a list) when the element occurs
    # only once; iterating that mapping would walk its keys instead.
    if not isinstance(class_maps, list):
        class_maps = [class_maps]

    return [cmap["name"] for cmap in class_maps]
示例10
def get_qos_policy_map(netconf_handler):
    '''
    Read the policy-maps present in the running configuration of the switch.

    :param netconf_handler: NETCONF handler for the switch; its ``get_config``
        is queried with the xpath ``/native/policy/policy-map``
    :return: list of strings, one per policy-map other than
        ``system-cpp-policy``, of the form
        ``"Policy map: <name>  Contains Class maps: <class>  <class>  "``
    '''

    netconf_reply = netconf_handler.get_config(source='running', filter=('xpath', "/native/policy/policy-map"))

    data = xmltodict.parse(netconf_reply.xml)

    policy_maps = data["rpc-reply"]["data"]["native"]["policy"]["policy-map"]
    # xmltodict yields a bare mapping (not a list) when an element occurs
    # only once, so normalise before iterating.
    if not isinstance(policy_maps, list):
        policy_maps = [policy_maps]

    policy_map_names = []
    for pmap in policy_maps:
        if pmap["name"] == "system-cpp-policy":
            continue

        # A policy-map may carry one class (mapping), several (list) or none.
        classes = pmap.get("class") or []
        if not isinstance(classes, list):
            classes = [classes]

        class_maps_in_policy = "  Contains Class maps: " + "".join(
            cmap["name"] + "  " for cmap in classes
        )
        policy_map_names.append("Policy map: " + pmap["name"] + class_maps_in_policy)

    return policy_map_names
示例11
def get_pytornado_settings_from_CPACS(cpacs_in_path):
    """ Read the PyTornado settings stored in a CPACS file

    Note:
        * The result is None when the CPACS file has no
          'cpacs/toolspecific/pytornado' entry
        * The settings are passed to 'parse_pytornado_settings_dict' before
          being returned

    Args:
        cpacs_in_path (str): Path to CPACS file

    Returns:
        cpacs_settings (dict): PyTornado settings dictionary read from CPACS
    """

    with open(cpacs_in_path, "r") as cpacs_file:
        cpacs_text = cpacs_file.read()
        cpacs_as_dict = xml.parse(cpacs_text)

    # Walk down cpacs -> toolspecific -> pytornado, tolerating missing levels
    toolspecific = cpacs_as_dict.get('cpacs', {}).get('toolspecific', {})
    cpacs_settings = toolspecific.get('pytornado', None)

    parse_pytornado_settings_dict(cpacs_settings)
    return cpacs_settings
示例12
def get_external_ip(soap_url) -> str:
    """Query a UPnP control URL for the external IP address.

    Sends a ``GetExternalIPAddress`` SOAP request for the
    ``WANPPPConnection:1`` service to ``soap_url``.

    :param soap_url: control URL the SOAP request is posted to
    :return: the ``NewExternalIPAddress`` value from the reply, or ``None``
        when anything fails (the exception is logged at debug level)
    """
    soap_body = (
        '<?xml version="1.0"?>\r\n'
        '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle='
        '"http://schemas.xmlsoap.org/soap/encoding/">\r\n'
        '<s:Body>\r\n'
        '<u:GetExternalIPAddress xmlns:u="urn:schemas-upnp-org:service:WANPPPConnection:1">\r\n'
        '</u:GetExternalIPAddress>\r\n'
        '</s:Body>\r\n'
        '</s:Envelope>\r\n'
    )

    try:
        req = Request(soap_url)
        req.add_header('Content-Type', 'text/xml; charset="utf-8"')
        req.add_header('SOAPACTION', '"urn:schemas-upnp-org:service:WANPPPConnection:1#GetExternalIPAddress"')
        req.data = soap_body.encode('utf8')
        # Close the HTTP response deterministically instead of leaking it.
        with urlopen(req) as response:
            raw_reply = response.read().decode()
        result = xmltodict.parse(raw_reply)
        return result['s:Envelope']['s:Body']['u:GetExternalIPAddressResponse']['NewExternalIPAddress']
    except Exception:
        # Deliberate best-effort: any failure is logged and reported as None.
        log.debug("get_external_ip exception", exc_info=True)
        return None
示例13
def __init__(self, file_name):
    """Load benchmark entries from an XML file into ``self.data``.

    One ``Datum`` is appended per sentence of every entry. Each datum carries
    the entry's triplets (``mtriple`` strings split on ``|`` and stripped) and
    an ``info`` dict with the entry index and the ``seen``/``manual`` flags.

    :param file_name: path of the XML file; a base name of
        ``testdata_with_lex.xml`` marks it as the test file
    """
    self.data = []

    # Read through a context manager so the file handle is always closed.
    with open(file_name, encoding="utf-8") as xml_file:
        content = xml_file.read()

    is_test_file = file_name.split("/")[-1] == "testdata_with_lex.xml"

    structure = xmltodict.parse(content)
    for i, entry in enumerate(structure["benchmark"]["entries"]["entry"]):
        triplets = [tuple(map(str.strip, r.split("|"))) for r in
                    self.triplets_from_object(entry["modifiedtripleset"], "mtriple")]

        # Both flags depend only on the entry index, not on the sentence.
        seen = not is_test_file or i <= 970
        manual = is_test_file and i + 1 in FOR_MANUAL_EVAL and i <= 970

        for s in self.extract_sentences(entry["lex"]):
            # A fresh dict per datum so entries never share mutable state.
            info = {"id": i, "seen": seen, "manual": manual}
            self.data.append(Datum(rdfs=triplets, text=s, info=info))
示例14
def _as_list(node):
    """Return ``node`` as a list: ``[]`` for None, itself if a list, else ``[node]``."""
    if node is None:
        return []
    return node if isinstance(node, list) else [node]


def parse_nmap_xml(nmap_output_file, protocol):
    """Collect the IPv4 hosts of an nmap XML report that expose ``protocol``.

    A host is selected when one of its open ports either runs a service whose
    name is listed in ``protocol_dict[protocol]['services']`` or has a port id
    listed in ``protocol_dict[protocol]['ports']``.

    :param nmap_output_file: path of the nmap XML output file
    :param protocol: key into ``protocol_dict``
    :return: list of unique IP address strings, in order of first appearance
    """
    targets = []

    with open(nmap_output_file, 'r') as file_handle:
        scan_output = xmltodict.parse(file_handle.read())

    # xmltodict yields a mapping instead of a list for elements that occur
    # only once (a single host, address or port), and hosts may carry no
    # ports at all; normalise every level before iterating.
    for host in _as_list(scan_output['nmaprun'].get('host')):
        addresses = _as_list(host.get('address'))
        if not addresses or addresses[0]['@addrtype'] != 'ipv4':
            continue

        ip = addresses[0]['@addr']
        for port in _as_list((host.get('ports') or {}).get('port')):
            if port['state']['@state'] != 'open':
                continue

            spec = protocol_dict[protocol]
            service_hit = 'service' in port and port['service']['@name'] in spec['services']
            if (service_hit or port['@portid'] in spec['ports']) and ip not in targets:
                targets.append(ip)

    return targets
示例15
def _fetch_sandbox_api_key(self):
    """Request the sandbox signing key and return ``sandbox_signkey``.

    Posts a signed ``mch_id``/``nonce_str`` document to the
    ``sandboxnew/pay/getsignkey`` endpoint; the result is ``None`` when the
    reply has no ``sandbox_signkey`` element.
    """
    nonce_str = random_string(32)
    sign = calculate_signature(dict(mch_id=self.mch_id, nonce_str=nonce_str), self.api_key)
    payload = dict_to_xml(dict(mch_id=self.mch_id, nonce_str=nonce_str), sign=sign)

    response = self._http.post(
        f"{self.API_BASE_URL}sandboxnew/pay/getsignkey",
        data=payload,
        headers={"Content-Type": "text/xml"},
    )
    reply = xmltodict.parse(response.text)["xml"]
    return reply.get("sandbox_signkey")
示例16
def get_payment_data(cls, xml):
    """
    Parse a WeChat Pay result notification and extract ``appid``, ``mch_id``,
    ``out_trade_no`` and ``transaction_id``.

    For further checks, first build a WeChatPay from ``appid`` and ``mch_id``,
    then call ``wechatpay.parse_payment_result(xml)`` to verify the payment
    result.

    Usage example::

        from wechatpy.pay import WeChatPay
        # assuming the xml pushed by the WeChat server is stored in ``xml``
        data = WeChatPay.get_payment_data(xml)
        {
            "appid": "id of the official account or mini program",
            "mch_id": "merchant id",
            "out_trade_no": "...",
            "transaction_id": "...",
        }

    :param xml: notification body
    :raises ValueError: when the body is not parseable XML or has no
        ``<xml>`` root holding child elements
    :raises KeyError: when one of the four fields is missing
    """
    try:
        data = xmltodict.parse(xml)
    except (xmltodict.ParsingInterrupted, ExpatError):
        raise ValueError("invalid xml")
    if not data or "xml" not in data:
        raise ValueError("invalid xml")

    # The fields live under the <xml> root element, not at the top level of
    # the parsed document.
    data = data["xml"]
    if not isinstance(data, dict):
        # Empty or text-only root: there are no fields to read.
        raise ValueError("invalid xml")

    return {
        "appid": data["appid"],
        "mch_id": data["mch_id"],
        "out_trade_no": data["out_trade_no"],
        "transaction_id": data["transaction_id"],
    }
示例17
def parse_payment_result(self, xml):
    """Parse a WeChat Pay result notification and verify its signature.

    :param xml: notification body
    :return: the mapping under the ``<xml>`` root, with the known amount and
        count fields converted to ``int`` and ``sign`` restored
    :raises InvalidSignatureException: when the body cannot be parsed, has no
        ``<xml>`` root, or its signature does not match
    """
    try:
        parsed = xmltodict.parse(xml)
    except (xmltodict.ParsingInterrupted, ExpatError):
        raise InvalidSignatureException()

    if not parsed or "xml" not in parsed:
        raise InvalidSignatureException()

    data = parsed["xml"]
    # The signature is computed over every field except "sign" itself.
    sign = data.pop("sign", None)
    signing_key = self.sandbox_api_key if self.sandbox else self.api_key
    real_sign = calculate_signature(data, signing_key)
    if sign != real_sign:
        raise InvalidSignatureException()

    integer_fields = (
        "total_fee",
        "settlement_total_fee",
        "cash_fee",
        "coupon_fee",
        "coupon_count",
    )
    for key in integer_fields:
        if key in data:
            data[key] = int(data[key])

    data["sign"] = sign
    return data
示例18
def _decrypt_message(self, msg, signature, timestamp, nonce, crypto_class=None):
    """Check the signature of an encrypted message and decrypt it.

    :param msg: mapping holding an ``Encrypt`` entry, or XML that is parsed
        into one
    :param signature: signature expected for the encrypted payload
    :param timestamp: timestamp used in the signature
    :param nonce: nonce used in the signature
    :param crypto_class: callable given ``self.key``; the returned object's
        ``decrypt`` method does the decryption
    :raises InvalidSignatureException: when the computed signature differs
    """
    if isinstance(msg, dict):
        envelope = msg
    else:
        import xmltodict

        envelope = xmltodict.parse(to_text(msg))["xml"]

    encrypt = envelope["Encrypt"]
    if _get_signature(self.token, timestamp, nonce, encrypt) != signature:
        raise InvalidSignatureException()

    return crypto_class(self.key).decrypt(encrypt, self._id)
示例19
def parse_message(xml):
    """Build a message object from an XML message.

    Returns ``None`` for empty input. Events are looked up in ``EVENT_TYPES``
    by their lower-cased ``Event`` value, everything else in
    ``MESSAGE_TYPES`` by ``MsgType``; unknown types become ``UnknownMessage``.
    """
    if not xml:
        return None

    message = xmltodict.parse(to_text(xml))["xml"]
    message_type = message["MsgType"].lower()

    if message_type != "event":
        return MESSAGE_TYPES.get(message_type, UnknownMessage)(message)

    event_type = message["Event"].lower()
    return EVENT_TYPES.get(event_type, UnknownMessage)(message)
示例20
def deserialize_reply(xml, update_time=False):
    """
    Deserialize a passive reply.

    :param xml: the xml to deserialize
    :param update_time: whether to replace the time in the xml with the
        current time
    :raises ValueError: the reply xml cannot be recognised
    :rtype: wechatpy.replies.BaseReply
    """
    if not xml:
        return EmptyReply()

    try:
        reply_dict = xmltodict.parse(xml)["xml"]
        msg_type = reply_dict["MsgType"]
    except (xmltodict.expat.ExpatError, KeyError, TypeError):
        # TypeError covers an <xml> root with no child elements: xmltodict
        # maps it to None (or to a plain string), which cannot be indexed by
        # "MsgType". Report it as the documented ValueError.
        raise ValueError("bad reply xml")
    if msg_type not in REPLY_TYPES:
        raise ValueError("unknown reply type")

    cls = REPLY_TYPES[msg_type]
    # Only the fields actually present in the xml are passed on.
    kwargs = {
        attr: field.from_xml(reply_dict[field.name])
        for attr, field in cls._fields.items()
        if field.name in reply_dict
    }

    if update_time:
        kwargs["time"] = time.time()

    return cls(**kwargs)
示例21
def parse_message(xml):
    """
    Parse an XML message pushed by the WeChat server.

    :param xml: XML message
    :return: the matching message or event when the type is known,
        otherwise ``UnknownMessage``; ``None`` for empty input
    """
    if not xml:
        return None

    message = xmltodict.parse(to_text(xml))["xml"]
    message_type = message["MsgType"].lower()
    is_device = message_type.startswith("device_")

    if message_type != "event" and not is_device:
        return MESSAGE_TYPES.get(message_type, UnknownMessage)(message)

    event_type = message["Event"].lower() if "Event" in message else None
    if is_device:
        # Device messages without an Event use the message type itself;
        # otherwise the event name gets a "device_" prefix.
        event_type = message_type if event_type is None else f"device_{event_type}"

    event_key = message.get("EventKey")
    if event_type == "subscribe" and event_key:
        if event_key.startswith(("scanbarcode|", "scanimage|")):
            event_type = "subscribe_scan_product"
            message["Event"] = event_type
        elif event_key.startswith("qrscene_"):
            # Subscribed by scanning a QR code carrying a scene id: strip
            # the prefix so EventKey holds only the scene value.
            event_type = "subscribe_scan"
            message["Event"] = event_type
            message["EventKey"] = event_key[len("qrscene_"):]

    return EVENT_TYPES.get(event_type, UnknownMessage)(message)
示例22
def test_decrypt_message(self):
        """Decrypt a fixed encrypted message and check its decoded fields."""
        # Fixed fixture: the encrypted envelope together with the signature,
        # timestamp and nonce passed to decrypt_message below.
        xml = """<xml><ToUserName><![CDATA[wx49f0ab532d5d035a]]></ToUserName>
<Encrypt><![CDATA[RgqEoJj5A4EMYlLvWO1F86ioRjZfaex/gePD0gOXTxpsq5Yj4GNglrBb8I2BAJVODGajiFnXBu7mCPatfjsu6IHCrsTyeDXzF6Bv283dGymzxh6ydJRvZsryDyZbLTE7rhnus50qGPMfp2wASFlzEgMW9z1ef/RD8XzaFYgm7iTdaXpXaG4+BiYyolBug/gYNx410cvkKR2/nPwBiT+P4hIiOAQqGp/TywZBtDh1yCF2KOd0gpiMZ5jSw3e29mTvmUHzkVQiMS6td7vXUaWOMZnYZlF3So2SjHnwh4jYFxdgpkHHqIrH/54SNdshoQgWYEvccTKe7FS709/5t6NMxuGhcUGAPOQipvWTT4dShyqio7mlsl5noTrb++x6En749zCpQVhDpbV6GDnTbcX2e8K9QaNWHp91eBdCRxthuL0=]]></Encrypt>
<AgentID><![CDATA[1]]></AgentID>
</xml>"""

        signature = "74d92dfeb87ba7c714f89d98870ae5eb62dff26d"
        timestamp = "1411525903"
        nonce = "461056294"

        crypto = WeChatCrypto(self.token, self.encoding_aes_key, self.corp_id)
        msg = crypto.decrypt_message(xml, signature, timestamp, nonce)
        # The decrypted result is XML again; check two of its fields.
        msg_dict = xmltodict.parse(msg)["xml"]
        self.assertEqual("test", msg_dict["Content"])
        self.assertEqual("messense", msg_dict["FromUserName"])
示例23
def xml_to_dict(xml, listpaths=None, **kwargs):
    """Parse ``xml`` with xmltodict using project defaults.

    Defaults (overridable through ``kwargs``): no attribute prefix, plain
    ``dict`` containers, and keys containing ``:`` reduced to their second
    ``:``-separated segment.

    :param xml: document passed to ``xmltodict.parse``
    :param listpaths: paths whose value is wrapped in a list when it is not
        one already (a missing path is set to an empty list)
    :return: the parsed content
    """
    def strip_prefix(path, key, value):
        # Keep only the part after the first colon.
        return (key.split(':')[1] if ':' in key else key, value)

    opts = {
        'attr_prefix': '',
        'dict_constructor': dict,
        'postprocessor': strip_prefix,
    }
    opts.update(kwargs)

    content = xmltodict.parse(xml, **opts)
    for listpath in listpaths or []:
        pathdata = rget(content, listpath, [])
        if not isinstance(pathdata, list):
            pathdata = [pathdata]
        rset(content, listpath, pathdata)
    return content
示例24
def parse_xml(self, ret, method=None):
    """Convert an XML result using the requested parser.

    :param ret: XML content to convert
    :param method: ``'EUtilsParser'``, ``'objectify'`` or ``'dict'``;
        defaults to ``self._xmlparser``
    :return: the converted object, or ``None`` for any other method name
    """
    parser_name = self._xmlparser if method is None else method

    if parser_name == 'EUtilsParser':
        return EUtilsParser(self.easyXML(ret))
    if parser_name == 'objectify':  # used in docstrings
        from bioservices.xmltools import XMLObjectify
        return XMLObjectify(ret)
    if parser_name == 'dict':
        import xmltodict
        return xmltodict.parse(ret)
    return None
示例25
def ECitMatch(self, bdata, **kargs):
    r"""Query ``ecitmatch.cgi`` (db=pubmed) with citation strings.

    :param bdata: Citation strings. Each input citation must
        be represented by a citation string in the following format::

            journal_title|year|volume|first_page|author_name|your_key|

        Multiple citation strings may be provided by separating the
        strings with a carriage return character (%0D) or simply \\r or \\n.

        The your_key value is an arbitrary label provided by the user
        that may serve as a local identifier for the citation,
        and it will be included in the output.

        all spaces must be replaced by + symbols and that citation
        strings should end with a final vertical bar |.
    :param kargs: accepted but not used
    :return: the ``content`` attribute of whatever ``http_get`` returned, or
        that object itself when it has no such attribute


    Only xml supported at the time of this implementation.

    ::

        from bioservices import EUtils
        s = EUtils()
        print(s.ECitMatch("proc+natl+acad+sci+u+s+a|1991|88|3248|mann+bj|Art1|%0Dscience|1987|235|182|palmenberg+ac|Art2|"))

    """
    # Fixes https://github.com/cokelaer/bioservices/issues/169
    from urllib.parse import unquote
    params = {'bdata': unquote(bdata), "retmode": "xml"}

    # note here, we use .cgi not .fcgi
    query = "ecitmatch.cgi?db=pubmed&retmode=xml"
    ret = self.http_get(query, None, params=params)

    # Only a missing ``content`` attribute is tolerated; any other error
    # is no longer swallowed silently.
    return getattr(ret, "content", ret)
示例26
def start_data_processing(thread_number):
    """Feed every timestamped record of the configured evtx files to
    ``parse_json_message``.

    :param thread_number: not used by this function
    """
    # the list of file(s) comes from the agent configuration
    for evtx_file in agent_config_vars['files']:
        with evtx.Evtx(evtx_file) as log:
            for record in log.records():
                if not record.timestamp():
                    continue
                record_dict = xmltodict.parse(record.xml())
                # Round-trip through JSON so only plain JSON types are passed on.
                parse_json_message(json.loads(json.dumps(record_dict)))
示例27
def get_class_bbox(self, img_name):
    """Read the annotation of ``img_name`` and return its class and box arrays.

    :param img_name: image whose label file is located with ``_label_path``
    :return: ``(clses, bboxes)``; ``clses`` has the shape of ``self.labels``
        with 1 at the index of every class present in the image, and
        ``bboxes`` has one row of four values per label, holding one randomly
        chosen normalized box of that class (zeros when the class is absent)
    """
    with open(self._label_path(img_name), 'r') as f:
        annotation = xmltodict.parse(f.read())['annotation']

    img_size = annotation['size']
    img_w, img_h = float(img_size['width']), float(img_size['height'])

    objs = annotation['object']

    # xmltodict yields a bare mapping when the image has a single object.
    if not isinstance(objs, list):
        objs = [objs]

    # ``np.float`` was only an alias of the builtin float and no longer
    # exists in current NumPy; np.float64 is the same dtype.
    clses = np.zeros_like(self.labels, dtype=np.float64)
    bboxes = np.zeros(shape=[len(self.labels), 4], dtype=np.float64)
    bbox_cls = defaultdict(list)

    for obj in objs:
        idx = self.label2idx[obj['name']]
        clses[idx] = 1

        bndbox = obj['bndbox']
        bbox = (bndbox['xmin'], bndbox['ymin'], bndbox['xmax'], bndbox['ymax'])
        bbox = self._normalize_bbox(bbox, (img_w, img_h))
        bbox = np.array(bbox, dtype=np.float64)
        bbox_cls[idx].append(bbox)

    # Keep a single, randomly selected box per class.
    for k, v in bbox_cls.items():
        sample_idx = np.random.randint(0, len(v))
        bboxes[k] = v[sample_idx]

    return clses, bboxes
示例28
def get_mcu_definition(self, project_file):
    """ Parse project file to get mcu definition

    :param project_file: project file path, resolved against the current
        working directory
    :return: ``MCU_TEMPLATE`` with ``tool_specific['uvision']`` filled from
        the project's ``TargetCommonOption`` element; the template is
        returned unchanged when that data is missing
    """
    project_file = join(getcwd(), project_file)
    # Close the project file as soon as it has been parsed.
    with open(project_file, "rb") as project_fh:
        uvproj_dic = xmltodict.parse(project_fh, dict_constructor=dict)
    # Generic Target, should get from Target class !
    # NOTE(review): this aliases MCU_TEMPLATE rather than copying it, so the
    # assignment below is visible through the template too - confirm intent.
    mcu = MCU_TEMPLATE

    try:
        common_option = uvproj_dic['Project']['Targets']['Target']['TargetOption']['TargetCommonOption']
        device_id = common_option['DeviceId']
        mcu['tool_specific'] = {
            # legacy device
            'uvision' : {
                'TargetOption' : {
                    'Device' : [common_option['Device']],
                    'DeviceId' : [int(device_id) if device_id else None],
                    'Vendor' : [common_option['Vendor']],
                    'Cpu' : [common_option['Cpu']],
                    'FlashDriverDll' : [common_option['FlashDriverDll']],
                    'SFDFile' : [common_option['SFDFile']],
                    'RegisterFile': [common_option['RegisterFile']],
                }
            }
        }
    except KeyError:
        # validity check for uvision project
        logging.debug("The project_file %s seems to be not valid .uvproj file.", project_file)
        return mcu

    return mcu
示例29
def get_mcu_definition(self, project_file):
    """ Parse project file to get mcu definition

    :param project_file: project file path, resolved against the current
        working directory
    :return: ``MCU_TEMPLATE`` with ``tool_specific['uvision5']`` filled from
        the project's ``TargetCommonOption`` element; the template is
        returned unchanged when that data is missing
    """
    project_file = join(getcwd(), project_file)
    # Close the project file as soon as it has been parsed.
    with open(project_file, "rb") as project_fh:
        uvproj_dic = xmltodict.parse(project_fh, dict_constructor=dict)
    # Generic Target, should get from Target class !
    # NOTE(review): this aliases MCU_TEMPLATE rather than copying it, so the
    # assignment below is visible through the template too - confirm intent.
    mcu = MCU_TEMPLATE

    try:
        common_option = uvproj_dic['Project']['Targets']['Target']['TargetOption']['TargetCommonOption']
        device_id = common_option['DeviceId']
        mcu['tool_specific'] = {
            'uvision5' : {
                'TargetOption' : {
                    'Device' : [common_option['Device']],
                    'DeviceId' : [int(device_id) if device_id else None],
                    'Vendor' : [common_option['Vendor']],
                    'Cpu' : [common_option['Cpu']],
                    'FlashDriverDll' : [common_option['FlashDriverDll']],
                    'SFDFile' : [common_option['SFDFile']],
                    'PackID' : [common_option['PackID']],
                    'RegisterFile': [common_option['RegisterFile']],
                }
            }
        }
    except KeyError:
        # validity check for uvision project
        logging.debug("The project_file %s seems to be not valid .uvproj file.", project_file)
        return mcu

    return mcu
示例30
def _get_batches(self, job_id):
    """Return the ``batchInfo`` entries of bulk job ``job_id``.

    XML attributes are dropped and ``batchInfo`` is forced to a list, so the
    result is a list even when the job has a single batch.
    """
    batch_url = self.bulk_url.format(self.sf.instance_url, "job/{}/batch".format(job_id))
    request_headers = self._get_bulk_headers()

    with metrics.http_request_timer("get_batches"):
        resp = self.sf._make_request('GET', batch_url, headers=request_headers)

    parsed = xmltodict.parse(resp.text, xml_attribs=False, force_list=('batchInfo',))
    return parsed['batchInfoList']['batchInfo']