Python源码示例:smtpd.SMTPChannel()

示例1
def __init__(self, localaddr, map, debug=False, esmtp=True):
        """Set up the listening server socket on ``localaddr``.

        ``map`` is handed to ``asyncore.dispatcher.__init__`` as the socket
        map.  ``esmtp`` chooses the channel class stored on the instance:
        ``_SMTPChannel`` when true, ``smtpd.SMTPChannel`` otherwise.
        ``debug`` is stored on the instance unchanged.
        """
        self._localaddr = localaddr
        self._remoteaddr = None
        if esmtp:
            self.__smtpchannel = _SMTPChannel
        else:
            self.__smtpchannel = smtpd.SMTPChannel
        self.__debug = debug
        self.__messages = []
        asyncore.dispatcher.__init__(self, map=map)
        try:
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            # Ask for address re-use so a recently used port can be taken again.
            self.set_reuse_addr()
            self.bind(localaddr)
            self.listen(5)
        except:
            # Deliberately bare: whatever went wrong, close() first so the
            # dispatcher is cleaned out of the asyncore socket map, then let
            # the original exception continue unchanged.
            self.close()
            raise
示例2
def setUp(self):
        """Patch global state and start a DebuggingServer in a helper thread."""
        # Swap in the mock FQDN lookup while remembering the real one.
        self.real_getfqdn, socket.getfqdn = socket.getfqdn, mock_socket.getfqdn

        # DebuggingServer output goes to sys.stdout; divert it to a buffer.
        self.old_stdout = sys.stdout
        self.output = sys.stdout = io.StringIO()

        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()

        # SMTPChannel debug output is captured in a separate stream.
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()

        # Port 0 asks for any unused port; remember which one was assigned.
        server = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
                                       decode_data=True)
        self.serv = server
        self.port = server.socket.getsockname()[1]

        worker = threading.Thread(
            target=debugging_server,
            args=(server, self.serv_evt, self.client_evt))
        self.thread = worker
        worker.start()

        # Wait until serv_evt is signalled (by the server thread, going by the
        # original comment), then reset it.
        self.serv_evt.wait()
        self.serv_evt.clear()
示例3
def test_process_message_with_decode_data_true(self):
        # With decode_data=True the message is expected on stdout as plain
        # text lines between the two banner lines.
        srv = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
                                    decode_data=True)
        sock, peer = srv.accept()
        chan = smtpd.SMTPChannel(srv, sock, peer, decode_data=True)
        with support.captured_stdout() as captured:
            self.send_data(chan, b'From: test\n\nhello\n')
        printed = captured.getvalue()
        expected = textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             From: test
             X-Peer: peer-address

             hello
             ------------ END MESSAGE ------------
             """)
        self.assertEqual(printed, expected)
示例4
def test_process_message_with_decode_data_false(self):
        # With decode_data=False each message line is expected on stdout as a
        # bytes repr, including the non-decodable trailing byte.
        srv = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
                                    decode_data=False)
        sock, peer = srv.accept()
        chan = smtpd.SMTPChannel(srv, sock, peer, decode_data=False)
        with support.captured_stdout() as captured:
            self.send_data(chan, b'From: test\n\nh\xc3\xa9llo\xff\n')
        printed = captured.getvalue()
        expected = textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             b'From: test'
             b'X-Peer: peer-address'
             b''
             b'h\\xc3\\xa9llo\\xff'
             ------------ END MESSAGE ------------
             """)
        self.assertEqual(printed, expected)
示例5
def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self):
        # With enable_SMTPUTF8=True the output is expected to list the mail
        # options first, followed by the message lines as bytes reprs.
        srv = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
                                    enable_SMTPUTF8=True)
        sock, peer = srv.accept()
        chan = smtpd.SMTPChannel(srv, sock, peer, enable_SMTPUTF8=True)
        payload = b'From: test\n\nh\xc3\xa9llo\xff\n'
        with support.captured_stdout() as captured:
            self.send_data(chan, payload, enable_SMTPUTF8=True)
        printed = captured.getvalue()
        expected = textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             mail options: ['BODY=8BITMIME', 'SMTPUTF8']
             b'From: test'
             b'X-Peer: peer-address'
             b''
             b'h\\xc3\\xa9llo\\xff'
             ------------ END MESSAGE ------------
             """)
        self.assertEqual(printed, expected)
示例6
def test_with_decode_data_false(self):
        # decode_data=False: MAIL commands carrying SMTPUTF8 must yield
        # self.error_response, an unknown BODY value a 501, and a lone
        # body=8bitmime a 250.
        srv = DummyServer((support.HOST, 0), ('b', 0), decode_data=False)
        sock, peer = srv.accept()
        chan = smtpd.SMTPChannel(srv, sock, peer, decode_data=False)
        self.write_line(chan, b'EHLO example')

        rejected_commands = (
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8',
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',
        )
        for command in rejected_commands:
            self.write_line(chan, command)
            self.assertEqual(chan.socket.last, self.error_response)

        unknown_body = (
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=UNKNOWN')
        self.write_line(chan, unknown_body)
        self.assertEqual(
            chan.socket.last,
            b'501 Error: BODY can only be one of 7BIT, 8BITMIME\r\n')

        accepted = b'MAIL from: <foo@example.com> size=20 body=8bitmime'
        self.write_line(chan, accepted)
        self.assertEqual(chan.socket.last, b'250 OK\r\n')
示例7
def setUp(self):
        """Patch global state and start a DebuggingServer in a helper thread."""
        # Replace the FQDN lookup with the mock, keeping the real one around.
        self.real_getfqdn, socket.getfqdn = socket.getfqdn, mock_socket.getfqdn

        # Capture what DebuggingServer writes to sys.stdout.
        self.old_stdout = sys.stdout
        self.output = sys.stdout = io.StringIO()

        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()

        # Capture SMTPChannel debug output as well.
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()

        # Passing port 0 picks an unused port; note which one was assigned.
        server = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1))
        self.serv = server
        self.port = server.socket.getsockname()[1]

        worker = threading.Thread(
            target=debugging_server,
            args=(server, self.serv_evt, self.client_evt))
        self.thread = worker
        worker.start()

        # Wait until serv_evt is signalled (by the server thread, going by the
        # original comment), then reset it.
        self.serv_evt.wait()
        self.serv_evt.clear()
示例8
def setUp(self):
        """Install the mocks and launch a decoding DebuggingServer thread."""
        # Remember the genuine FQDN lookup, then substitute the mock.
        original_getfqdn = socket.getfqdn
        self.real_getfqdn = original_getfqdn
        socket.getfqdn = mock_socket.getfqdn

        # sys.stdout is replaced temporarily so DebuggingServer output can be
        # inspected.
        self.old_stdout = sys.stdout
        buffer = io.StringIO()
        self.output = buffer
        sys.stdout = buffer

        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()

        # SMTPChannel debug output is redirected to its own stream.
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()

        # Port 0 means "any unused port"; keep the number that was assigned.
        self.serv = smtpd.DebuggingServer(
            (HOST, 0), ('nowhere', -1), decode_data=True)
        self.port = self.serv.socket.getsockname()[1]

        thread_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(
            target=debugging_server, args=thread_args)
        self.thread.start()

        # Wait until serv_evt is signalled (by the server thread, going by the
        # original comment), then reset it.
        self.serv_evt.wait()
        self.serv_evt.clear()
示例9
def test_process_message_with_decode_data_true(self):
        # A decoded message should be echoed to stdout as ordinary text lines
        # framed by the two banner lines.
        debug_server = smtpd.DebuggingServer(
            (support.HOST, 0), ('b', 0), decode_data=True)
        connection, address = debug_server.accept()
        smtp_channel = smtpd.SMTPChannel(
            debug_server, connection, address, decode_data=True)
        with support.captured_stdout() as out:
            self.send_data(smtp_channel, b'From: test\n\nhello\n')
        actual = out.getvalue()
        wanted = textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             From: test
             X-Peer: peer-address

             hello
             ------------ END MESSAGE ------------
             """)
        self.assertEqual(actual, wanted)
示例10
def test_process_message_with_decode_data_false(self):
        # An undecoded message should be echoed to stdout one bytes repr per
        # line, framed by the two banner lines.
        debug_server = smtpd.DebuggingServer(
            (support.HOST, 0), ('b', 0), decode_data=False)
        connection, address = debug_server.accept()
        smtp_channel = smtpd.SMTPChannel(
            debug_server, connection, address, decode_data=False)
        with support.captured_stdout() as out:
            self.send_data(smtp_channel, b'From: test\n\nh\xc3\xa9llo\xff\n')
        actual = out.getvalue()
        wanted = textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             b'From: test'
             b'X-Peer: peer-address'
             b''
             b'h\\xc3\\xa9llo\\xff'
             ------------ END MESSAGE ------------
             """)
        self.assertEqual(actual, wanted)
示例11
def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self):
        # An SMTPUTF8 message should be echoed with a "mail options" line
        # ahead of the bytes reprs of the message lines.
        debug_server = smtpd.DebuggingServer(
            (support.HOST, 0), ('b', 0), enable_SMTPUTF8=True)
        connection, address = debug_server.accept()
        smtp_channel = smtpd.SMTPChannel(
            debug_server, connection, address, enable_SMTPUTF8=True)
        message = b'From: test\n\nh\xc3\xa9llo\xff\n'
        with support.captured_stdout() as out:
            self.send_data(smtp_channel, message, enable_SMTPUTF8=True)
        actual = out.getvalue()
        wanted = textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             mail options: ['BODY=8BITMIME', 'SMTPUTF8']
             b'From: test'
             b'X-Peer: peer-address'
             b''
             b'h\\xc3\\xa9llo\\xff'
             ------------ END MESSAGE ------------
             """)
        self.assertEqual(actual, wanted)
示例12
def test_with_decode_data_false(self):
        # Checks three outcomes for MAIL parameters when decode_data=False:
        # self.error_response for SMTPUTF8, 501 for an unknown BODY value,
        # and 250 for body=8bitmime on its own.
        dummy = DummyServer((support.HOST, 0), ('b', 0), decode_data=False)
        connection, address = dummy.accept()
        smtp_channel = smtpd.SMTPChannel(
            dummy, connection, address, decode_data=False)
        self.write_line(smtp_channel, b'EHLO example')

        smtputf8_commands = (
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8',
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',
        )
        for mail_command in smtputf8_commands:
            self.write_line(smtp_channel, mail_command)
            self.assertEqual(smtp_channel.socket.last, self.error_response)

        self.write_line(
            smtp_channel,
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=UNKNOWN')
        reply = smtp_channel.socket.last
        self.assertEqual(
            reply, b'501 Error: BODY can only be one of 7BIT, 8BITMIME\r\n')

        self.write_line(
            smtp_channel,
            b'MAIL from: <foo@example.com> size=20 body=8bitmime')
        reply = smtp_channel.socket.last
        self.assertEqual(reply, b'250 OK\r\n')
示例13
def setUp(self):
        """Patch global state and start a DebuggingServer in a helper thread.

        The host and port the server socket reports are stored as
        ``self.host`` and ``self.port``.
        """
        # Swap in the mock FQDN lookup while remembering the real one.
        self.real_getfqdn, socket.getfqdn = socket.getfqdn, mock_socket.getfqdn

        # DebuggingServer output goes to sys.stdout; divert it to a buffer.
        self.old_stdout = sys.stdout
        self.output = sys.stdout = io.StringIO()

        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()

        # SMTPChannel debug output is captured in a separate stream.
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()

        # Port 0 asks for any unused port.
        server = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
                                       decode_data=True)
        self.serv = server
        # Only the first two fields of the socket name are kept: host, port.
        self.host, self.port = server.socket.getsockname()[:2]

        worker = threading.Thread(
            target=debugging_server,
            args=(server, self.serv_evt, self.client_evt))
        self.thread = worker
        worker.start()

        # Wait until serv_evt is signalled (by the server thread, going by the
        # original comment), then reset it.
        self.serv_evt.wait()
        self.serv_evt.clear()
示例14
def test_process_message_with_decode_data_true(self):
        # decode_data=True: stdout should hold the message as text lines
        # between the "MESSAGE FOLLOWS" and "END MESSAGE" banners.
        srv = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
                                    decode_data=True)
        accepted = srv.accept()
        sock, peer = accepted
        chan = smtpd.SMTPChannel(srv, sock, peer, decode_data=True)
        with support.captured_stdout() as stream:
            self.send_data(chan, b'From: test\n\nhello\n')
        output = stream.getvalue()
        reference = textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             From: test
             X-Peer: peer-address

             hello
             ------------ END MESSAGE ------------
             """)
        self.assertEqual(output, reference)
示例15
def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self):
        # enable_SMTPUTF8=True: stdout should hold the mail options line and
        # then the message lines as bytes reprs.
        srv = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
                                    enable_SMTPUTF8=True)
        accepted = srv.accept()
        sock, peer = accepted
        chan = smtpd.SMTPChannel(srv, sock, peer, enable_SMTPUTF8=True)
        raw_message = b'From: test\n\nh\xc3\xa9llo\xff\n'
        with support.captured_stdout() as stream:
            self.send_data(chan, raw_message, enable_SMTPUTF8=True)
        output = stream.getvalue()
        reference = textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             mail options: ['BODY=8BITMIME', 'SMTPUTF8']
             b'From: test'
             b'X-Peer: peer-address'
             b''
             b'h\\xc3\\xa9llo\\xff'
             ------------ END MESSAGE ------------
             """)
        self.assertEqual(output, reference)
示例16
def test_with_decode_data_false(self):
        # Server and channel are built without an explicit decode_data
        # argument here.  MAIL with SMTPUTF8 must yield self.error_response,
        # an unknown BODY value a 501, and a lone body=8bitmime a 250.
        srv = DummyServer((support.HOST, 0), ('b', 0))
        sock, peer = srv.accept()
        chan = smtpd.SMTPChannel(srv, sock, peer)
        self.write_line(chan, b'EHLO example')

        rejected_commands = (
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8',
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',
        )
        for command in rejected_commands:
            self.write_line(chan, command)
            self.assertEqual(chan.socket.last, self.error_response)

        unknown_body = (
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=UNKNOWN')
        self.write_line(chan, unknown_body)
        self.assertEqual(
            chan.socket.last,
            b'501 Error: BODY can only be one of 7BIT, 8BITMIME\r\n')

        accepted = b'MAIL from: <foo@example.com> size=20 body=8bitmime'
        self.write_line(chan, accepted)
        self.assertEqual(chan.socket.last, b'250 OK\r\n')
示例17
def testSecondHELO(self):
        # check that a second HELO returns a message that it's a duplicate
        # (this behavior is specific to smtpd.SMTPChannel)
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        # Make sure the client connection is closed even when helo() raises
        # or the assertion fails before quit() is reached; otherwise the
        # socket would be left open.
        self.addCleanup(smtp.close)
        smtp.helo()
        expected = (503, 'Duplicate HELO/EHLO')
        self.assertEqual(smtp.helo(), expected)
        smtp.quit()
示例18
def __init__(self, extra_features, *args, **kw):
        """Build the extra feature reply text, then initialise the channel.

        Every item of ``extra_features`` is turned into one line prefixed
        with ``250-`` and ending in CRLF; the lines are concatenated into
        ``self._extrafeatures``.  All remaining arguments are forwarded to
        ``smtpd.SMTPChannel.__init__``.
        """
        feature_lines = ("250-{0}\r\n".format(feature)
                         for feature in extra_features)
        self._extrafeatures = ''.join(feature_lines)
        smtpd.SMTPChannel.__init__(self, *args, **kw)
示例19
def testSecondHELO(self):
        # check that a second HELO returns a message that it's a duplicate
        # (this behavior is specific to smtpd.SMTPChannel)
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        # Make sure the client connection is closed even when helo() raises
        # or the assertion fails before quit() is reached; otherwise the
        # socket would be left open.
        self.addCleanup(smtp.close)
        smtp.helo()
        expected = (503, 'Duplicate HELO/EHLO')
        self.assertEqual(smtp.helo(), expected)
        smtp.quit()
示例20
def __init__(self, extra_features, *args, **kw):
        """Store the formatted extra features and initialise the channel.

        ``self._extrafeatures`` becomes the concatenation of one
        ``250-``-prefixed, CRLF-terminated line per item of
        ``extra_features``.  The other arguments go straight to
        ``smtpd.SMTPChannel.__init__``.
        """
        make_line = "250-{0}\r\n".format
        self._extrafeatures = ''.join(make_line(item) for item in extra_features)
        smtpd.SMTPChannel.__init__(self, *args, **kw)
示例21
def testSecondHELO(self):
        # check that a second HELO returns a message that it's a duplicate
        # (this behavior is specific to smtpd.SMTPChannel)
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        # Make sure the client connection is closed even when helo() raises
        # or the assertion fails before quit() is reached; otherwise the
        # socket would be left open.
        self.addCleanup(smtp.close)
        smtp.helo()
        expected = (503, 'Duplicate HELO/EHLO')
        self.assertEqual(smtp.helo(), expected)
        smtp.quit()
示例22
def __init__(self, extra_features, *args, **kw):
        """Prepare ``self._extrafeatures`` and run the base initialiser.

        One ``250-``-prefixed, CRLF-terminated line is produced per item of
        ``extra_features`` and the lines are joined into a single string.
        ``*args`` and ``**kw`` are passed on to
        ``smtpd.SMTPChannel.__init__``.
        """
        pieces = []
        for feature in extra_features:
            pieces.append("250-{0}\r\n".format(feature))
        self._extrafeatures = ''.join(pieces)
        smtpd.SMTPChannel.__init__(self, *args, **kw)
示例23
def __init__(self, server, conn, addr):
        """Initialise the channel and keep a reference to ``server``.

        ``server``, ``conn`` and ``addr`` are passed unchanged to
        ``smtpd.SMTPChannel.__init__``.
        """
        smtpd.SMTPChannel.__init__(self, server, conn, addr)
        # Stored after the base initialiser has run.
        self._server = server 
示例24
def smtp_LHLO(self, arg):
        """Delegate to ``smtpd.SMTPChannel.smtp_HELO`` with ``arg`` unchanged."""
        # NOTE(review): LHLO is presumably the LMTP greeting command, treated
        # here exactly like HELO — confirm against the enclosing class.
        smtpd.SMTPChannel.smtp_HELO(self, arg) 
示例25
def testSecondHELO(self):
        # check that a second HELO returns a message that it's a duplicate
        # (this behavior is specific to smtpd.SMTPChannel)
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        # Make sure the client connection is closed even when helo() raises
        # or the assertion fails before quit() is reached; otherwise the
        # socket would be left open.
        self.addCleanup(smtp.close)
        smtp.helo()
        expected = (503, b'Duplicate HELO/EHLO')
        self.assertEqual(smtp.helo(), expected)
        smtp.quit()
示例26
def test_process_message_unimplemented(self):
        # Finishing a DATA transaction on a plain SMTPServer is expected to
        # raise NotImplementedError.
        srv = smtpd.SMTPServer((support.HOST, 0), ('b', 0), decode_data=True)
        sock, peer = srv.accept()
        chan = smtpd.SMTPChannel(srv, sock, peer, decode_data=True)

        def feed(data):
            # Queue one chunk on the channel's socket and let it be read.
            chan.socket.queue_recv(data)
            chan.handle_read()

        setup_commands = (
            b'HELO example',
            b'MAIL From:eggs@example',
            b'RCPT To:spam@example',
            b'DATA',
        )
        for command in setup_commands:
            feed(command)

        with self.assertRaises(NotImplementedError):
            feed(b'spam\r\n.\r\n')
示例27
def test_params_rejected(self):
        # An RCPT command carrying an extra parameter must leave
        # self.error_response as the last reply on the socket.
        srv = DummyServer((support.HOST, 0), ('b', 0), decode_data=False)
        sock, peer = srv.accept()
        chan = smtpd.SMTPChannel(srv, sock, peer, decode_data=False)
        commands = (
            b'EHLO example',
            b'MAIL from: <foo@example.com> size=20',
            b'RCPT to: <foo@example.com> foo=bar',
        )
        for command in commands:
            self.write_line(chan, command)
        self.assertEqual(chan.socket.last, self.error_response)
示例28
def test_nothing_accepted(self):
        # An RCPT command without parameters must be answered with 250 OK.
        srv = DummyServer((support.HOST, 0), ('b', 0), decode_data=False)
        sock, peer = srv.accept()
        chan = smtpd.SMTPChannel(srv, sock, peer, decode_data=False)
        commands = (
            b'EHLO example',
            b'MAIL from: <foo@example.com> size=20',
            b'RCPT to: <foo@example.com>',
        )
        for command in commands:
            self.write_line(chan, command)
        self.assertEqual(chan.socket.last, b'250 OK\r\n')
示例29
def test_with_decode_data_true(self):
        # decode_data=True: every MAIL command with SMTPUTF8 or a BODY
        # parameter must yield self.error_response; a MAIL command with only
        # size= must be answered with 250 OK.
        srv = DummyServer((support.HOST, 0), ('b', 0), decode_data=True)
        sock, peer = srv.accept()
        chan = smtpd.SMTPChannel(srv, sock, peer, decode_data=True)
        self.write_line(chan, b'EHLO example')

        rejected_commands = (
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8',
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',
            b'MAIL from: <foo@example.com> size=20 BODY=UNKNOWN',
            b'MAIL from: <foo@example.com> size=20 body=8bitmime',
        )
        for command in rejected_commands:
            self.write_line(chan, command)
            self.assertEqual(chan.socket.last, self.error_response)

        plain_mail = b'MAIL from: <foo@example.com> size=20'
        self.write_line(chan, plain_mail)
        self.assertEqual(chan.socket.last, b'250 OK\r\n')
示例30
def setUp(self):
        """Install the mock socket module and create a server plus channel."""
        # Both smtpd and asyncore are pointed at the mock socket module.
        smtpd.socket = mock_socket
        asyncore.socket = mock_socket

        # Remember the current debug stream and replace it with a buffer that
        # is also reachable as self.debug.
        self.old_debugstream = smtpd.DEBUGSTREAM
        debug_stream = io.StringIO()
        self.debug = debug_stream
        smtpd.DEBUGSTREAM = debug_stream

        self.server = DummyServer(
            (support.HOST, 0), ('b', 0), decode_data=True)
        accepted = self.server.accept()
        conn, addr = accepted
        self.channel = smtpd.SMTPChannel(
            self.server, conn, addr, decode_data=True)