Python源码示例:email.message_from_string()
示例1
def test_split_long_continuation(self):
eq = self.ndiffAssertEqual
msg = email.message_from_string("""\
Subject: bug demonstration
\t12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789
\tmore text
test
""")
sfp = StringIO()
g = Generator(sfp)
g.flatten(msg)
eq(sfp.getvalue(), """\
Subject: bug demonstration
12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789
more text
test
""")
示例2
def test_boundary_with_leading_space(self):
eq = self.assertEqual
msg = email.message_from_string('''\
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary=" XXXX"
-- XXXX
Content-Type: text/plain
-- XXXX
Content-Type: text/plain
-- XXXX--
''')
self.assertTrue(msg.is_multipart())
eq(msg.get_boundary(), ' XXXX')
eq(len(msg.get_payload()), 2)
示例3
def test_message_from_string_with_class(self):
    """Check that message_from_string() builds messages of a given class.

    A trivial Message subclass is passed as the second argument; the
    top-level object for msg_01.txt, and every object yielded by walk()
    for msg_02.txt, must be an instance of that subclass.
    """
    def read_fixture(name):
        # Read the whole fixture and always close the handle afterwards.
        handle = openfile(name)
        try:
            return handle.read()
        finally:
            handle.close()

    class MyMessage(Message):
        pass

    simple = email.message_from_string(read_fixture('msg_01.txt'), MyMessage)
    self.assertIsInstance(simple, MyMessage)

    # Same check applied to every part reachable from the second fixture.
    compound = email.message_from_string(
        read_fixture('msg_02.txt'), MyMessage)
    for part in compound.walk():
        self.assertIsInstance(part, MyMessage)
示例4
def test_whitespace_continuation(self):
eq = self.assertEqual
# This message contains a line after the Subject: header that has only
# whitespace, but it is not empty!
msg = email.message_from_string("""\
From: aperson@dom.ain
To: bperson@dom.ain
Subject: the next line has a space on it
\x20
Date: Mon, 8 Apr 2002 15:09:19 -0400
Message-ID: spam
Here's the message body
""")
eq(msg['subject'], 'the next line has a space on it\n ')
eq(msg['message-id'], 'spam')
eq(msg.get_payload(), "Here's the message body\n")
示例5
def test_whitespace_continuation_last_header(self):
eq = self.assertEqual
# Like the previous test, but the subject line is the last
# header.
msg = email.message_from_string("""\
From: aperson@dom.ain
To: bperson@dom.ain
Date: Mon, 8 Apr 2002 15:09:19 -0400
Message-ID: spam
Subject: the next line has a space on it
\x20
Here's the message body
""")
eq(msg['subject'], 'the next line has a space on it\n ')
eq(msg['message-id'], 'spam')
eq(msg.get_payload(), "Here's the message body\n")
示例6
def test_CRLFLF_at_end_of_part(self):
# issue 5610: feedparser should not eat two chars from body part ending
# with "\r\n\n".
m = (
"From: foo@bar.com\n"
"To: baz\n"
"Mime-Version: 1.0\n"
"Content-Type: multipart/mixed; boundary=BOUNDARY\n"
"\n"
"--BOUNDARY\n"
"Content-Type: text/plain\n"
"\n"
"body ending with CRLF newline\r\n"
"\n"
"--BOUNDARY--\n"
)
msg = email.message_from_string(m)
self.assertTrue(msg.get_payload(0).get_payload().endswith('\r\n'))
示例7
def test_rfc2231_unencoded_then_encoded_segments(self):
eq = self.assertEqual
m = """\
Content-Type: application/x-foo;
\tname*0=\"us-ascii'en-us'My\";
\tname*1*=\" Document\";
\tname*2*=\" For You\"
"""
msg = email.message_from_string(m)
charset, language, s = msg.get_param('name')
eq(charset, 'us-ascii')
eq(language, 'en-us')
eq(s, 'My Document For You')
# Tests to ensure that signed parts of an email are completely preserved, as
# required by RFC1847 section 2.1. Note that these are incomplete, because the
# email package does not currently always preserve the body. See issue 1670765.
示例8
def test_get_param(self):
eq = self.assertEqual
msg = email.message_from_string(
"X-Header: foo=one; bar=two; baz=three\n")
eq(msg.get_param('bar', header='x-header'), 'two')
eq(msg.get_param('quuz', header='x-header'), None)
eq(msg.get_param('quuz'), None)
msg = email.message_from_string(
'X-Header: foo; bar="one"; baz=two\n')
eq(msg.get_param('foo', header='x-header'), '')
eq(msg.get_param('bar', header='x-header'), 'one')
eq(msg.get_param('baz', header='x-header'), 'two')
# XXX: We are not RFC-2045 compliant! We cannot parse:
# msg["Content-Type"] = 'text/plain; weird="hey; dolly? [you] @ <\\"home\\">?"'
# msg.get_param("weird")
# yet.
示例9
def test_split_long_continuation(self):
eq = self.ndiffAssertEqual
msg = email.message_from_string("""\
Subject: bug demonstration
\t12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789
\tmore text
test
""")
sfp = StringIO()
g = Generator(sfp)
g.flatten(msg)
eq(sfp.getvalue(), """\
Subject: bug demonstration
12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789
more text
test
""")
示例10
def test_binary_body_with_encode_7or8bit(self):
    """Check a binary MIMEApplication body encoded with encode_7or8bit.

    Issue 17171.  The payload is checked on the freshly built message and
    again after flattening it to text and parsing that text back.
    """
    payload = b'\xfa\xfb\xfc\xfd\xfe\xff'
    msg = MIMEApplication(payload, _encoder=encoders.encode_7or8bit)
    # Treated as a string, this will be invalid code points.
    self.assertEqual(msg.get_payload(), payload)
    self.assertEqual(msg.get_payload(decode=True), payload)
    self.assertEqual(msg['Content-Transfer-Encoding'], '8bit')

    buffer = StringIO()
    Generator(buffer).flatten(msg)
    reparsed = email.message_from_string(buffer.getvalue())
    # NOTE(review): this re-checks the original message, not the re-parsed
    # one; kept exactly as in the source.
    self.assertEqual(msg.get_payload(), payload)
    self.assertEqual(reparsed.get_payload(decode=True), payload)
    self.assertEqual(reparsed['Content-Transfer-Encoding'], '8bit')
示例11
def test_binary_body_with_encode_noop(self):
    """Check a binary MIMEApplication body passed through encode_noop.

    Issue 16564: this does not produce an RFC valid message, since to be
    valid it should have a CTE of binary.  But the below works, and is
    documented as working this way.
    """
    payload = b'\xfa\xfb\xfc\xfd\xfe\xff'
    msg = MIMEApplication(payload, _encoder=encoders.encode_noop)
    self.assertEqual(msg.get_payload(), payload)
    self.assertEqual(msg.get_payload(decode=True), payload)

    buffer = StringIO()
    Generator(buffer).flatten(msg)
    reparsed = email.message_from_string(buffer.getvalue())
    # NOTE(review): this re-checks the original message, not the re-parsed
    # one; kept exactly as in the source.
    self.assertEqual(msg.get_payload(), payload)
    self.assertEqual(reparsed.get_payload(decode=True), payload)
# Test the basic MIMEText class
示例12
def test_boundary_with_leading_space(self):
eq = self.assertEqual
msg = email.message_from_string('''\
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary=" XXXX"
-- XXXX
Content-Type: text/plain
-- XXXX
Content-Type: text/plain
-- XXXX--
''')
self.assertTrue(msg.is_multipart())
eq(msg.get_boundary(), ' XXXX')
eq(len(msg.get_payload()), 2)
示例13
def test_message_from_string_with_class(self):
    """Check that message_from_string() builds messages of a given class.

    A trivial Message subclass is passed as the second argument; the
    top-level object for msg_01.txt, and every object yielded by walk()
    for msg_02.txt, must be an instance of that subclass.
    """
    def read_fixture(name):
        # Read the whole fixture and always close the handle afterwards.
        handle = openfile(name)
        try:
            return handle.read()
        finally:
            handle.close()

    class MyMessage(Message):
        pass

    simple = email.message_from_string(read_fixture('msg_01.txt'), MyMessage)
    self.assertIsInstance(simple, MyMessage)

    # Same check applied to every part reachable from the second fixture.
    compound = email.message_from_string(
        read_fixture('msg_02.txt'), MyMessage)
    for part in compound.walk():
        self.assertIsInstance(part, MyMessage)
示例14
def test__all__(self):
    """Check the exact set of names exported by the email package.

    The package's __all__ is compared, in sorted order, against the
    expected list of old-style and new-style names.
    """
    package = __import__('email')
    exported = sorted(package.__all__)
    expected = [
        # Old names
        'Charset', 'Encoders', 'Errors', 'Generator',
        'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
        'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
        'MIMENonMultipart', 'MIMEText', 'Message',
        'Parser', 'Utils', 'base64MIME',
        # new names
        'base64mime', 'charset', 'encoders', 'errors', 'generator',
        'header', 'iterators', 'message', 'message_from_file',
        'message_from_string', 'mime', 'parser',
        'quopriMIME', 'quoprimime', 'utils',
    ]
    self.assertEqual(exported, expected)
示例15
def test_whitespace_continuation(self):
eq = self.assertEqual
# This message contains a line after the Subject: header that has only
# whitespace, but it is not empty!
msg = email.message_from_string("""\
From: aperson@dom.ain
To: bperson@dom.ain
Subject: the next line has a space on it
\x20
Date: Mon, 8 Apr 2002 15:09:19 -0400
Message-ID: spam
Here's the message body
""")
eq(msg['subject'], 'the next line has a space on it\n ')
eq(msg['message-id'], 'spam')
eq(msg.get_payload(), "Here's the message body\n")
示例16
def test_copy():
    """Check that a shallow copy of a parsed message serializes identically."""
    source_text = (
        "\n"
        "From: abcd@gmail.com\n"
        "To: hey@example.org\n"
        "Subject: subject\n"
        "Body\n"
    )
    original = email.message_from_string(source_text)
    duplicate = copy(original)
    assert original.as_bytes() == duplicate.as_bytes()
示例17
def _http_request_requests(self, url, headers, data, params):
    """Send an HTTP request with ``requests`` and record the response.

    A GET is issued when ``data`` is None, otherwise a POST.  Arguments
    that are None (and a None ``self.timeout``) are not passed on.  The
    status code, reason, headers (parsed into an email message object),
    encoding, content type, raw content and text are stored on ``self``.

    Raises PanHttpError if the request fails or the response headers
    cannot be parsed.
    """
    kwargs = {'verify': self.verify_cert}
    optional = (
        ('url', url),
        ('headers', headers),
        ('data', data),
        ('params', params),
        ('timeout', self.timeout),
    )
    kwargs.update((name, value) for name, value in optional
                  if value is not None)

    try:
        send = requests.get if data is None else requests.post
        r = send(**kwargs)
    except requests.exceptions.RequestException as e:
        raise PanHttpError('RequestException: ' + str(e))

    self.code = r.status_code
    self.reason = r.reason

    # Render the response headers as "Name: value" lines so that the email
    # parser can turn them into a message object.
    header_lines = ['%s: %s' % (k, v) for k, v in r.headers.items()]
    try:
        self.headers = email.message_from_string('\n'.join(header_lines))
    except (TypeError, email.errors.MessageError) as e:
        raise PanHttpError('email.message_from_string() %s' % e)

    self.encoding = self.headers.get_content_charset('utf8')
    self.content_type = self.headers.get_content_type()
    self.content = r.content
    self.text = r.text
# allow non-2XX error codes
# see http://bugs.python.org/issue18543 for why we can't just
# install a new HTTPErrorProcessor()
示例18
def get_mime_message(
        self,
        msg_id,
        user_id='me'):
    """Get a Message and use it to create a MIME Message.

    The message is requested in ``raw`` format and its URL-safe base64
    payload is decoded and parsed.  API errors are logged, not raised.

    :param msg_id: The ID of the Message required.
    :param user_id: User's email address. The special value "me"
        can be used to indicate the authenticated user.
    :return: A MIME Message, consisting of data from Message, or None
        if the API call failed.
    :rtype: :class:`email.message.Message`
    """
    self.connect()
    ret = None
    try:
        message = self.client.users().messages().get(
            userId=user_id,
            id=msg_id,
            format='raw'
        ).execute()
    except (socks.HTTPError, errors.HttpError) as error:
        logger.error('An error occurred: %s', error)
    else:
        logger.info('Message snippet: %s', message.get('snippet', ''))
        raw = urlsafe_b64decode(message.get('raw', '').encode('ASCII'))
        if isinstance(raw, str):
            # Python 2: the decoded payload is already a native str.
            ret = message_from_string(raw)
        else:
            # Python 3: the decoded payload is bytes, which
            # message_from_string() rejects, so use the bytes parser.
            from email import message_from_bytes
            ret = message_from_bytes(raw)
    return ret
示例19
def run(self, params={}):
    """Decode a base64-encoded EML file, parse it and format the result.

    The decoded bytes are first read as UTF-8.  If anything in that step
    raises, the failure is logged at debug level and the bytes are passed
    through ``UnicodeDammit.detwingle`` and decoded again as UTF-8 with
    undecodable bytes ignored.
    """
    try:
        decoded = base64.b64decode(params.get(Input.EML_FILE))
        eml_file = decoded.decode('utf-8')
    except Exception as ex:
        self.logger.debug(ex)
        self.logger.debug("Failed to parse message as UTF-8, attempting to detwingle first before retrying parse")
        fallback = base64.b64decode(params.get(Input.EML_FILE))
        eml_file = UnicodeDammit.detwingle(fallback).decode(
            'utf-8', errors='ignore')

    parsed = email.message_from_string(eml_file)
    return {Output.RESULT: format_output.format_result(self.logger, parsed)}
示例20
def run(self, params={}):
    """Parse the e-mail text from ``params`` and return the formatted result."""
    email_text = params.get(Input.EMAIL_STRING)
    parsed = email.message_from_string(email_text)
    formatted = format_output.format_result(self.logger, parsed)
    return {Output.RESULT: formatted}
示例21
def py2_message_from_string(text):  # nocoverpy3
    """Parse ``text`` into a message by way of an in-memory text stream.

    Work around https://bugs.python.org/issue25545 where
    email.message_from_string cannot handle Unicode on Python 2.
    """
    return email.message_from_file(io.StringIO(text))
示例22
def py2_message_from_string(text):  # nocoverpy3
    """Parse ``text`` into a message by way of an in-memory text stream.

    Work around https://bugs.python.org/issue25545 where
    email.message_from_string cannot handle Unicode on Python 2.
    """
    stream = io.StringIO(text)
    parsed = email.message_from_file(stream)
    return parsed
示例23
def __init__(self, message=None):
    """Initialize a Message instance.

    ``message`` may be an email.message.Message (deep-copied), a string
    (parsed), an object with a ``read`` attribute (parsed as a file) or
    None (empty message).  Anything else raises TypeError.
    """
    if isinstance(message, email.message.Message):
        self._become_message(copy.deepcopy(message))
        # Only instances of ``Message`` (presumably the enclosing class)
        # are asked to explain themselves to the new instance.
        if isinstance(message, Message):
            message._explain_to(self)
        return
    if isinstance(message, str):
        self._become_message(email.message_from_string(message))
        return
    if hasattr(message, "read"):
        self._become_message(email.message_from_file(message))
        return
    if message is not None:
        raise TypeError('Invalid message type: %s' % type(message))
    email.message.Message.__init__(self)
示例24
def _parse_msg(raw):
    """Parse ``raw`` and return ``(message, hash, is_multipart)``.

    The hash is whatever ``walk.get_hash`` returns for the raw text.
    """
    parsed = message_from_string(raw)
    content_hash = walk.get_hash(raw)
    return parsed, content_hash, parsed.is_multipart()
示例25
def test_bad_param(self):
msg = email.message_from_string("Content-Type: blarg; baz; boo\n")
self.assertEqual(msg.get_param('baz'), '')
示例26
def test_missing_filename(self):
msg = email.message_from_string("From: foo\n")
self.assertEqual(msg.get_filename(), None)
示例27
def test_bogus_filename(self):
msg = email.message_from_string(
"Content-Disposition: blarg; filename\n")
self.assertEqual(msg.get_filename(), '')
示例28
def test_missing_boundary(self):
msg = email.message_from_string("From: foo\n")
self.assertEqual(msg.get_boundary(), None)
示例29
def test_get_params(self):
eq = self.assertEqual
msg = email.message_from_string(
'X-Header: foo=one; bar=two; baz=three\n')
eq(msg.get_params(header='x-header'),
[('foo', 'one'), ('bar', 'two'), ('baz', 'three')])
msg = email.message_from_string(
'X-Header: foo; bar=one; baz=two\n')
eq(msg.get_params(header='x-header'),
[('foo', ''), ('bar', 'one'), ('baz', 'two')])
eq(msg.get_params(), None)
msg = email.message_from_string(
'X-Header: foo; bar="one"; baz=two\n')
eq(msg.get_params(header='x-header'),
[('foo', ''), ('bar', 'one'), ('baz', 'two')])
示例30
def test_get_param_with_semis_in_quotes(self):
msg = email.message_from_string(
'Content-Type: image/pjpeg; name="Jim&&Jill"\n')
self.assertEqual(msg.get_param('name'), 'Jim&&Jill')
self.assertEqual(msg.get_param('name', unquote=False),
'"Jim&&Jill"')