Python源码示例:email.message_from_string()

示例1
def test_split_long_continuation(self):
        # Input: a Subject header followed by two continuation lines, each
        # introduced by a tab; the first continuation line is very long.
        source = """\
Subject: bug demonstration
\t12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789
\tmore text

test
"""
        # Expected flattened form: each continuation line is introduced by a
        # single space and the long line is left in one piece.
        expected = """\
Subject: bug demonstration
 12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789
 more text

test
"""
        msg = email.message_from_string(source)
        out = StringIO()
        Generator(out).flatten(msg)
        self.ndiffAssertEqual(out.getvalue(), expected)
示例2
def test_boundary_with_leading_space(self):
        # The quoted boundary parameter begins with four spaces; it must be
        # reported verbatim and both delimited parts must be found.
        raw = '''\
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="    XXXX"

--    XXXX
Content-Type: text/plain


--    XXXX
Content-Type: text/plain

--    XXXX--
'''
        msg = email.message_from_string(raw)
        self.assertTrue(msg.is_multipart())
        self.assertEqual(msg.get_boundary(), '    XXXX')
        parts = msg.get_payload()
        self.assertEqual(len(parts), 2)
示例3
def test_message_from_string_with_class(self):
        def slurp(filename):
            # Read the whole fixture, closing the handle even if read() fails.
            handle = openfile(filename)
            try:
                return handle.read()
            finally:
                handle.close()

        first_text = slurp('msg_01.txt')

        # Create a subclass
        class MyMessage(Message):
            pass

        # The object returned by the parser must be an instance of the
        # class passed as the second argument.
        root = email.message_from_string(first_text, MyMessage)
        self.assertIsInstance(root, MyMessage)
        # Try something more complicated: every object yielded by walk()
        # must be a MyMessage as well.
        second_text = slurp('msg_02.txt')
        root = email.message_from_string(second_text, MyMessage)
        for part in root.walk():
            self.assertIsInstance(part, MyMessage)
示例4
def test_whitespace_continuation(self):
        # The line right after the Subject: header holds a single space --
        # whitespace only, but not empty.
        raw = """\
From: aperson@dom.ain
To: bperson@dom.ain
Subject: the next line has a space on it
\x20
Date: Mon, 8 Apr 2002 15:09:19 -0400
Message-ID: spam

Here's the message body
"""
        msg = email.message_from_string(raw)
        # The space-only line is expected to be folded into the Subject value.
        self.assertEqual(msg['subject'], 'the next line has a space on it\n ')
        # A header that follows the space-only line is still a header.
        self.assertEqual(msg['message-id'], 'spam')
        self.assertEqual(msg.get_payload(), "Here's the message body\n")
示例5
def test_whitespace_continuation_last_header(self):
        # Here the Subject header, followed by a line holding only a space,
        # is the last header before the blank separator line.
        raw = """\
From: aperson@dom.ain
To: bperson@dom.ain
Date: Mon, 8 Apr 2002 15:09:19 -0400
Message-ID: spam
Subject: the next line has a space on it
\x20

Here's the message body
"""
        msg = email.message_from_string(raw)
        self.assertEqual(msg['subject'], 'the next line has a space on it\n ')
        self.assertEqual(msg['message-id'], 'spam')
        self.assertEqual(msg.get_payload(), "Here's the message body\n")
示例6
def test_CRLFLF_at_end_of_part(self):
        # issue 5610: feedparser should not eat two chars from body part ending
        # with "\r\n\n".
        pieces = [
            "From: foo@bar.com\n",
            "To: baz\n",
            "Mime-Version: 1.0\n",
            "Content-Type: multipart/mixed; boundary=BOUNDARY\n",
            "\n",
            "--BOUNDARY\n",
            "Content-Type: text/plain\n",
            "\n",
            "body ending with CRLF newline\r\n",
            "\n",
            "--BOUNDARY--\n",
        ]
        msg = email.message_from_string("".join(pieces))
        # The first sub-part's text must still end with the CRLF.
        first_part = msg.get_payload(0)
        body = first_part.get_payload()
        self.assertTrue(body.endswith('\r\n'))
示例7
def test_rfc2231_unencoded_then_encoded_segments(self):
        # The name parameter is split into three segments: name*0 is written
        # without a trailing '*', name*1* and name*2* with one.
        msg = email.message_from_string("""\
Content-Type: application/x-foo;
\tname*0=\"us-ascii'en-us'My\";
\tname*1*=\" Document\";
\tname*2*=\" For You\"

""")
        # get_param() is expected to hand back a 3-tuple here.
        charset, language, value = msg.get_param('name')
        self.assertEqual(charset, 'us-ascii')
        self.assertEqual(language, 'en-us')
        self.assertEqual(value, 'My Document For You')



# Tests to ensure that signed parts of an email are completely preserved, as
# required by RFC1847 section 2.1.  Note that these are incomplete, because the
# email package does not currently always preserve the body.  See issue 1670765. 
示例8
def test_get_param(self):
        # Parameters of the form key=value on a custom header.
        msg = email.message_from_string(
            "X-Header: foo=one; bar=two; baz=three\n")
        self.assertEqual(msg.get_param('bar', header='x-header'), 'two')
        # An unknown parameter yields None, with or without the header given.
        self.assertEqual(msg.get_param('quuz', header='x-header'), None)
        self.assertEqual(msg.get_param('quuz'), None)

        # A bare parameter, a quoted value and an unquoted value.
        msg = email.message_from_string(
            'X-Header: foo; bar="one"; baz=two\n')
        for name, expected in (('foo', ''), ('bar', 'one'), ('baz', 'two')):
            self.assertEqual(msg.get_param(name, header='x-header'), expected)
        # XXX: We are not RFC-2045 compliant!  We cannot parse:
        # msg["Content-Type"] = 'text/plain; weird="hey; dolly? [you] @ <\\"home\\">?"'
        # msg.get_param("weird")
        # yet.
示例9
def test_split_long_continuation(self):
        # Input: a Subject header followed by two continuation lines, each
        # introduced by a tab; the first continuation line is very long.
        source = """\
Subject: bug demonstration
\t12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789
\tmore text

test
"""
        # Expected flattened form: each continuation line is introduced by a
        # single space and the long line is left in one piece.
        expected = """\
Subject: bug demonstration
 12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789
 more text

test
"""
        msg = email.message_from_string(source)
        out = StringIO()
        Generator(out).flatten(msg)
        self.ndiffAssertEqual(out.getvalue(), expected)
示例10
def test_binary_body_with_encode_7or8bit(self):
        # Issue 17171.
        payload = b'\xfa\xfb\xfc\xfd\xfe\xff'
        msg = MIMEApplication(payload, _encoder=encoders.encode_7or8bit)
        # Treated as a string, this will be invalid code points.
        self.assertEqual(msg.get_payload(), payload)
        self.assertEqual(msg.get_payload(decode=True), payload)
        self.assertEqual(msg['Content-Transfer-Encoding'], '8bit')
        # Flatten the message and parse the resulting text back.
        buf = StringIO()
        Generator(buf).flatten(msg)
        msg2 = email.message_from_string(buf.getvalue())
        # NOTE(review): the next assertion re-checks the original ``msg``
        # rather than ``msg2`` -- confirm whether that is intentional.
        self.assertEqual(msg.get_payload(), payload)
        self.assertEqual(msg2.get_payload(decode=True), payload)
        self.assertEqual(msg2['Content-Transfer-Encoding'], '8bit')
示例11
def test_binary_body_with_encode_noop(self):
        # Issue 16564: This does not produce an RFC valid message, since to be
        # valid it should have a CTE of binary.  But the below works, and is
        # documented as working this way.
        payload = b'\xfa\xfb\xfc\xfd\xfe\xff'
        msg = MIMEApplication(payload, _encoder=encoders.encode_noop)
        self.assertEqual(msg.get_payload(), payload)
        self.assertEqual(msg.get_payload(decode=True), payload)
        # Flatten the message and parse the resulting text back.
        buf = StringIO()
        Generator(buf).flatten(msg)
        msg2 = email.message_from_string(buf.getvalue())
        # NOTE(review): the next assertion re-checks the original ``msg``
        # rather than ``msg2`` -- confirm whether that is intentional.
        self.assertEqual(msg.get_payload(), payload)
        self.assertEqual(msg2.get_payload(decode=True), payload)


# Test the basic MIMEText class 
示例12
def test_boundary_with_leading_space(self):
        # The quoted boundary parameter begins with four spaces; it must be
        # reported verbatim and both delimited parts must be found.
        raw = '''\
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="    XXXX"

--    XXXX
Content-Type: text/plain


--    XXXX
Content-Type: text/plain

--    XXXX--
'''
        msg = email.message_from_string(raw)
        self.assertTrue(msg.is_multipart())
        self.assertEqual(msg.get_boundary(), '    XXXX')
        parts = msg.get_payload()
        self.assertEqual(len(parts), 2)
示例13
def test_message_from_string_with_class(self):
        def slurp(filename):
            # Read the whole fixture, closing the handle even if read() fails.
            handle = openfile(filename)
            try:
                return handle.read()
            finally:
                handle.close()

        first_text = slurp('msg_01.txt')

        # Create a subclass
        class MyMessage(Message):
            pass

        # The object returned by the parser must be an instance of the
        # class passed as the second argument.
        root = email.message_from_string(first_text, MyMessage)
        self.assertIsInstance(root, MyMessage)
        # Try something more complicated: every object yielded by walk()
        # must be a MyMessage as well.
        second_text = slurp('msg_02.txt')
        root = email.message_from_string(second_text, MyMessage)
        for part in root.walk():
            self.assertIsInstance(part, MyMessage)
示例14
def test__all__(self):
        # Compare the package's sorted __all__ against the exact list of
        # public names expected from it.
        expected = [
            # Old names
            'Charset', 'Encoders', 'Errors', 'Generator',
            'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
            'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
            'MIMENonMultipart', 'MIMEText', 'Message',
            'Parser', 'Utils', 'base64MIME',
            # new names
            'base64mime', 'charset', 'encoders', 'errors', 'generator',
            'header', 'iterators', 'message', 'message_from_file',
            'message_from_string', 'mime', 'parser',
            'quopriMIME', 'quoprimime', 'utils',
            ]
        pkg = __import__('email')
        # Can't use sorted() here due to Python 2.3 compatibility; sort a
        # copy so the package's own __all__ is left untouched.
        exported = pkg.__all__[:]
        exported.sort()
        self.assertEqual(exported, expected)
示例15
def test_whitespace_continuation(self):
        # The line right after the Subject: header holds a single space --
        # whitespace only, but not empty.
        raw = """\
From: aperson@dom.ain
To: bperson@dom.ain
Subject: the next line has a space on it
\x20
Date: Mon, 8 Apr 2002 15:09:19 -0400
Message-ID: spam

Here's the message body
"""
        msg = email.message_from_string(raw)
        # The space-only line is expected to be folded into the Subject value.
        self.assertEqual(msg['subject'], 'the next line has a space on it\n ')
        # A header that follows the space-only line is still a header.
        self.assertEqual(msg['message-id'], 'spam')
        self.assertEqual(msg.get_payload(), "Here's the message body\n")
示例16
def test_copy():
    """Check that a copied message serializes to the same bytes.

    The fixture starts directly with the first header line and carries no
    indentation.  A leading blank line would make the parser treat the
    header section as empty, so the From/To/Subject lines would end up in
    the body and the test would only exercise a header-less message.
    """
    email_str = (
        "From: abcd@gmail.com\n"
        "To: hey@example.org\n"
        "Subject: subject\n"
        "\n"
        "Body\n"
    )
    msg = email.message_from_string(email_str)
    # Guard against the fixture silently degrading to a header-less message.
    assert msg["Subject"] == "subject"
    msg2 = copy(msg)

    assert msg.as_bytes() == msg2.as_bytes()
示例17
def _http_request_requests(self, url, headers, data, params):
        """Perform the HTTP request with ``requests`` and record the response.

        A POST is sent when ``data`` is not None, otherwise a GET.  Only
        the arguments that are not None (plus ``self.timeout`` when set)
        are forwarded; ``verify`` is always taken from ``self.verify_cert``.

        On success the status code, reason, parsed headers, charset,
        content type, raw content and text are stored on ``self``.

        Raises PanHttpError when ``requests`` raises a RequestException or
        when the response headers cannot be parsed into a message object.
        """
        kwargs = {'verify': self.verify_cert}
        optional = (
            ('url', url),
            ('headers', headers),
            ('data', data),
            ('params', params),
            ('timeout', self.timeout),
        )
        for key, value in optional:
            if value is not None:
                kwargs[key] = value

        try:
            if data is not None:
                r = requests.post(**kwargs)
            else:
                r = requests.get(**kwargs)
        except requests.exceptions.RequestException as e:
            raise PanHttpError('RequestException: ' + str(e))

        self.code = r.status_code
        self.reason = r.reason
        # Re-serialize the response headers as "Name: value" lines so the
        # email parser can turn them into a message object.
        header_lines = ['%s: %s' % (k, v) for k, v in r.headers.items()]
        header_text = '\n'.join(header_lines)
        try:
            self.headers = email.message_from_string(header_text)
        except (TypeError, email.errors.MessageError) as e:
            raise PanHttpError('email.message_from_string() %s' % e)
        self.encoding = self.headers.get_content_charset('utf8')
        self.content_type = self.headers.get_content_type()
        self.content = r.content
        self.text = r.text

    # allow non-2XX error codes
    # see http://bugs.python.org/issue18543 for why we can't just
    # install a new HTTPErrorProcessor() 
示例18
def get_mime_message(
            self,
            msg_id,
            user_id='me'):
        """Get a Message and use it to create a MIME Message.

        :param msg_id: The ID of the Message required.
        :param user_id: User's email address. The special value "me"
            can be used to indicate the authenticated user.
        :return: A MIME Message, consisting of data from Message, or
            ``None`` when the API request fails (the error is logged).
        :rtype: :class:`email.message.Message`
        """
        self.connect()
        ret = None
        try:
            message = self.client.users().messages().get(
                userId=user_id,
                id=msg_id,
                format='raw'
            ).execute()
        except (socks.HTTPError, errors.HttpError) as error:
            logger.error('An error occurred: %s', error)
        else:
            logger.info('Message snippet: %s', message.get('snippet', ''))
            raw = urlsafe_b64decode(message.get('raw', '').encode('ASCII'))
            if isinstance(raw, str):
                # Python 2: the decoded payload is a byte string, which is
                # exactly what message_from_string accepts there.
                ret = message_from_string(raw)
            else:
                # Python 3: urlsafe_b64decode returns bytes, which
                # message_from_string rejects with a TypeError; parse the
                # bytes directly instead.
                from email import message_from_bytes
                ret = message_from_bytes(raw)
        return ret
示例19
def run(self, params={}):
        """Parse a base64-encoded EML file and return the formatted result.

        ``params`` is only read, never mutated, so the shared ``{}``
        default is harmless here; the signature is kept as-is for callers.

        The value under ``Input.EML_FILE`` is base64-decoded once.  The
        bytes are decoded as UTF-8; if that fails they are passed through
        ``UnicodeDammit.detwingle`` and decoded again, dropping whatever
        still cannot be decoded.

        Returns a dict mapping ``Output.RESULT`` to the value produced by
        ``format_output.format_result``.  Errors from the base64 decoding
        itself propagate to the caller.
        """
        # Decode the base64 wrapper a single time; both the strict path and
        # the fallback operate on the same bytes.
        raw = base64.b64decode(params.get(Input.EML_FILE))
        try:
            eml_file = raw.decode('utf-8')
        except UnicodeDecodeError as ex:
            self.logger.debug(ex)
            self.logger.debug("Failed to parse message as UTF-8, attempting to detwingle first before retrying parse")
            eml_file = UnicodeDammit.detwingle(raw).decode('utf-8', errors='ignore')

        msg = email.message_from_string(eml_file)

        result = format_output.format_result(self.logger, msg)
        return {Output.RESULT: result}
示例20
def run(self, params={}):
        """Parse the email text found in ``params`` and format it.

        Reads the value under ``Input.EMAIL_STRING``, parses it with
        ``email.message_from_string`` and returns a dict mapping
        ``Output.RESULT`` to the output of ``format_output.format_result``.
        """
        email_text = params.get(Input.EMAIL_STRING)
        parsed = email.message_from_string(email_text)
        formatted = format_output.format_result(self.logger, parsed)
        return {Output.RESULT: formatted}
示例21
def py2_message_from_string(text):  # nocoverpy3
    """Parse *text* into a message via an in-memory text stream.

    Works around https://bugs.python.org/issue25545 where
    email.message_from_string cannot handle Unicode on Python 2.
    """
    return email.message_from_file(io.StringIO(text))
示例22
def py2_message_from_string(text):  # nocoverpy3
    """Parse *text* into a message via an in-memory text stream.

    Works around https://bugs.python.org/issue25545 where
    email.message_from_string cannot handle Unicode on Python 2.
    """
    return email.message_from_file(io.StringIO(text))
示例23
def __init__(self, message=None):
        """Initialize a Message instance.

        ``message`` may be None (plain initialization), an
        ``email.message.Message`` (deep-copied), a string (parsed), or an
        object with a ``read`` attribute (parsed as a file).  Anything
        else raises TypeError.
        """
        if message is None:
            email.message.Message.__init__(self)
            return

        if isinstance(message, email.message.Message):
            self._become_message(copy.deepcopy(message))
            # When the source is also an instance of this module's Message
            # class, give it the chance to explain itself to the new object.
            if isinstance(message, Message):
                message._explain_to(self)
            return

        if isinstance(message, str):
            parsed = email.message_from_string(message)
        elif hasattr(message, "read"):
            parsed = email.message_from_file(message)
        else:
            raise TypeError('Invalid message type: %s' % type(message))
        self._become_message(parsed)
示例24
def _parse_msg(raw):
    """Parse *raw* and return ``(message, hash, is_multipart)``.

    The hash is whatever ``walk.get_hash`` returns for the raw text.
    """
    parsed = message_from_string(raw)
    # Tuple items are evaluated left to right: hash first, then the
    # multipart check.
    return parsed, walk.get_hash(raw), parsed.is_multipart()
示例25
def test_bad_param(self):
        # Parameters written without '=value' are expected to come back as
        # the empty string.
        header_text = "Content-Type: blarg; baz; boo\n"
        parsed = email.message_from_string(header_text)
        value = parsed.get_param('baz')
        self.assertEqual(value, '')
示例26
def test_missing_filename(self):
        # A message with only a From header reports no filename.
        parsed = email.message_from_string("From: foo\n")
        filename = parsed.get_filename()
        self.assertEqual(filename, None)
示例27
def test_bogus_filename(self):
        # A filename parameter without a value is expected to be reported
        # as the empty string.
        header_text = "Content-Disposition: blarg; filename\n"
        parsed = email.message_from_string(header_text)
        self.assertEqual(parsed.get_filename(), '')
示例28
def test_missing_boundary(self):
        # A message with only a From header reports no boundary.
        parsed = email.message_from_string("From: foo\n")
        boundary = parsed.get_boundary()
        self.assertEqual(boundary, None)
示例29
def test_get_params(self):
        # Every parameter carries an explicit value.
        msg = email.message_from_string(
            'X-Header: foo=one; bar=two; baz=three\n')
        all_valued = [('foo', 'one'), ('bar', 'two'), ('baz', 'three')]
        self.assertEqual(msg.get_params(header='x-header'), all_valued)

        # A bare first parameter is reported with an empty-string value.
        bare_first = [('foo', ''), ('bar', 'one'), ('baz', 'two')]
        msg = email.message_from_string(
            'X-Header: foo; bar=one; baz=two\n')
        self.assertEqual(msg.get_params(header='x-header'), bare_first)
        # Without a header argument the lookup yields None for this message.
        self.assertEqual(msg.get_params(), None)

        # Quoting a value leads to the same reported parameters.
        msg = email.message_from_string(
            'X-Header: foo; bar="one"; baz=two\n')
        self.assertEqual(msg.get_params(header='x-header'), bare_first)
示例30
def test_get_param_with_semis_in_quotes(self):
        # The quoted parameter value itself contains semicolons.
        header_text = 'Content-Type: image/pjpeg; name="Jim&amp;&amp;Jill"\n'
        parsed = email.message_from_string(header_text)
        # By default the surrounding quotes are removed ...
        self.assertEqual(parsed.get_param('name'), 'Jim&amp;&amp;Jill')
        # ... and with unquote=False they are kept.
        raw_value = parsed.get_param('name', unquote=False)
        self.assertEqual(raw_value, '"Jim&amp;&amp;Jill"')