Python源码示例:twisted.internet.reactor.connectUNIXDatagram()
示例1
def buildProtocol(self, address):
ws = ProducerConsumerWsProtocol()
ws.factory = self
control = HostapdControlProtocol(emit_json=False)
reactor.connectUNIXDatagram(self.control_interface, control,
bindAddress=control.bindAddress())
ws.consumer = control
control.consumer = ws
ws.registerProducer(control, True)
control.registerProducer(ws, True)
return ws
示例2
def execute(address, command):
deferred = Deferred()
consumer = SingleItemConsumer(deferred)
protocol = HostapdControlProtocol(consumer, command=command)
consumer.registerProducer(protocol, True)
reactor.connectUNIXDatagram(address, protocol, bindAddress=protocol.bindAddress())
return deferred
示例3
def test_exchange(self):
"""
Test that a datagram can be sent to and received by a server and vice
versa.
"""
clientaddr = self.mktemp()
serveraddr = self.mktemp()
sp = ServerProto()
cp = ClientProto()
s = reactor.listenUNIXDatagram(serveraddr, sp)
self.addCleanup(s.stopListening)
c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress=clientaddr)
self.addCleanup(c.stopListening)
d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])
def write(ignored):
cp.transport.write(b"hi")
return defer.gatherResults([sp.deferredGotWhat,
cp.deferredGotBack])
def _cbTestExchange(ignored):
self.assertEqual(b"hi", sp.gotwhat)
self.assertEqual(clientaddr, sp.gotfrom)
self.assertEqual(b"hi back", cp.gotback)
d.addCallback(write)
d.addCallback(_cbTestExchange)
return d
示例4
def test_exchange(self):
"""
Test that a datagram can be sent to and received by a server and vice
versa.
"""
clientaddr = self.mktemp()
serveraddr = self.mktemp()
sp = ServerProto()
cp = ClientProto()
s = reactor.listenUNIXDatagram(serveraddr, sp)
self.addCleanup(s.stopListening)
c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress=clientaddr)
self.addCleanup(c.stopListening)
d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])
def write(ignored):
cp.transport.write(b"hi")
return defer.gatherResults([sp.deferredGotWhat,
cp.deferredGotBack])
def _cbTestExchange(ignored):
self.assertEqual(b"hi", sp.gotwhat)
self.assertEqual(clientaddr, sp.gotfrom)
self.assertEqual(b"hi back", cp.gotback)
d.addCallback(write)
d.addCallback(_cbTestExchange)
return d
示例5
def openClientMode(self, iface=''):
try:
self._lport = reactor.connectUNIXDatagram(iface, self)
except Exception:
raise error.CarrierError(sys.exc_info()[1])
return self
示例6
def test_exchange(self):
"""
Test that a datagram can be sent to and received by a server and vice
versa.
"""
clientaddr = self.mktemp()
serveraddr = self.mktemp()
sp = ServerProto()
cp = ClientProto()
s = reactor.listenUNIXDatagram(serveraddr, sp)
self.addCleanup(s.stopListening)
c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress=clientaddr)
self.addCleanup(c.stopListening)
d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])
def write(ignored):
cp.transport.write("hi")
return defer.gatherResults([sp.deferredGotWhat,
cp.deferredGotBack])
def _cbTestExchange(ignored):
self.failUnlessEqual("hi", sp.gotwhat)
self.failUnlessEqual(clientaddr, sp.gotfrom)
self.failUnlessEqual("hi back", cp.gotback)
d.addCallback(write)
d.addCallback(_cbTestExchange)
return d
示例7
def testExchange(self):
clientaddr = self.mktemp()
serveraddr = self.mktemp()
sp = ServerProto()
cp = ClientProto()
s = reactor.listenUNIXDatagram(serveraddr, sp)
c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress = clientaddr)
d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])
def write(ignored):
cp.transport.write("hi")
return defer.gatherResults([sp.deferredGotWhat,
cp.deferredGotBack])
def cleanup(ignored):
d1 = defer.maybeDeferred(s.stopListening)
d1.addCallback(lambda x : os.unlink(clientaddr))
d2 = defer.maybeDeferred(c.stopListening)
d2.addCallback(lambda x : os.unlink(serveraddr))
return defer.gatherResults([d1, d2])
def _cbTestExchange(ignored):
self.failUnlessEqual("hi", sp.gotwhat)
self.failUnlessEqual(clientaddr, sp.gotfrom)
self.failUnlessEqual("hi back", cp.gotback)
d.addCallback(write)
d.addCallback(cleanup)
d.addCallback(_cbTestExchange)
return d