Java源码示例:io.vertx.proton.ProtonServer
示例1
/**
 * Checks that the adapter starts the Proton server instance handed to it by the client
 * rather than creating and starting a server of its own.
 *
 * @param ctx The test context to use for running asynchronous tests.
 */
@SuppressWarnings("unchecked")
@Test
public void testStartUsesClientProvidedAmqpServer(final VertxTestContext ctx) {

    // GIVEN an adapter that has been given an AMQP server by the client
    final ProtonServer providedServer = getAmqpServer();
    final VertxBasedAmqpProtocolAdapter adapter = getAdapter(providedServer);

    // WHEN the adapter is started
    final Promise<Void> startup = Promise.promise();
    adapter.start(startup);

    startup.future().onComplete(ctx.succeeding(ok -> {
        // THEN a connect handler has been registered on the provided server
        // and the provided server has been asked to listen
        ctx.verify(() -> {
            verify(providedServer).connectHandler(any(Handler.class));
            verify(providedServer).listen(any(Handler.class));
        });
        ctx.completeNow();
    }));
}
示例2
/**
 * Checks that the capabilities offered by the adapter when a device connects
 * include ANONYMOUS-RELAY.
 */
@Test
public void testAdapterSupportsAnonymousRelay() {

    // GIVEN an AMQP adapter with a configured server
    final VertxBasedAmqpProtocolAdapter adapter = getAdapter(getAmqpServer());

    // WHEN a device connects
    final Record attachments = new RecordImpl();
    attachments.set(AmqpAdapterConstants.KEY_CLIENT_DEVICE, Device.class, new Device(TEST_TENANT_ID, TEST_DEVICE));

    final ProtonConnection deviceConnection = mock(ProtonConnection.class);
    when(deviceConnection.attachments()).thenReturn(attachments);
    when(deviceConnection.getRemoteContainer()).thenReturn("deviceContainer");
    adapter.onConnectRequest(deviceConnection);

    // simulate the device's open frame by invoking the open handler registered by the adapter
    @SuppressWarnings("unchecked")
    final ArgumentCaptor<Handler<AsyncResult<ProtonConnection>>> openHandlerCaptor = ArgumentCaptor.forClass(Handler.class);
    verify(deviceConnection).openHandler(openHandlerCaptor.capture());
    openHandlerCaptor.getValue().handle(Future.succeededFuture(deviceConnection));

    // THEN the capabilities offered on the connection contain ANONYMOUS-RELAY
    verify(deviceConnection).setOfferedCapabilities(
            argThat(caps -> Arrays.stream(caps).anyMatch(Constants.CAP_ANONYMOUS_RELAY::equals)));
}
示例3
/**
 * Checks that the AMQP adapter closes a receiver link whose target carries an address,
 * i.e. that only anonymous relay receivers are accepted.
 */
@Test
public void testAdapterAcceptsAnonymousRelayReceiverOnly() {

    // GIVEN an AMQP adapter with a configured server
    final VertxBasedAmqpProtocolAdapter adapter = getAdapter(getAmqpServer());

    // WHEN the adapter receives a link whose target contains an address
    final ResourceIdentifier address = ResourceIdentifier.from(
            TelemetryConstants.TELEMETRY_ENDPOINT, TEST_TENANT_ID, TEST_DEVICE);
    final ProtonReceiver receiverLink = getReceiver(ProtonQoS.AT_LEAST_ONCE, getTarget(address));
    adapter.handleRemoteReceiverOpen(getConnection(null), receiverLink);

    // THEN the adapter closes the link
    verify(receiverLink).close();
}
示例4
/**
 * Builds a protocol adapter around the given AMQP Proton server, wires in the
 * collaborators held by this test class and initializes the adapter.
 *
 * @param server The AMQP Proton server to set as the adapter's insecure server.
 * @return The initialized AMQP adapter instance.
 */
private VertxBasedAmqpProtocolAdapter getAdapter(final ProtonServer server) {

    final VertxBasedAmqpProtocolAdapter amqpAdapter = new VertxBasedAmqpProtocolAdapter();

    // configuration and server
    amqpAdapter.setConfig(config);
    amqpAdapter.setInsecureAmqpServer(server);

    // client factories
    amqpAdapter.setTenantClientFactory(tenantClientFactory);
    amqpAdapter.setDownstreamSenderFactory(downstreamSenderFactory);
    amqpAdapter.setRegistrationClientFactory(registrationClientFactory);
    amqpAdapter.setCredentialsClientFactory(credentialsClientFactory);
    amqpAdapter.setCommandConsumerFactory(commandConsumerFactory);
    amqpAdapter.setDeviceConnectionClientFactory(deviceConnectionClientFactory);

    // remaining collaborators
    amqpAdapter.setCommandTargetMapper(commandTargetMapper);
    amqpAdapter.setMetrics(metrics);
    amqpAdapter.setResourceLimitChecks(resourceLimitChecks);
    amqpAdapter.setConnectionLimitManager(connectionLimitManager);

    amqpAdapter.init(vertx, context);
    return amqpAdapter;
}
示例5
/**
 * Creates the AMQP server, registers the connection handler and starts listening.
 *
 * @param startPromise completed once the server is listening; failed with the
 *                     listen error otherwise.
 */
private void bindAmqpServer(Promise<Void> startPromise) {

    final ProtonServerOptions serverOptions = this.createServerOptions();

    this.server = ProtonServer.create(this.vertx, serverOptions)
            .connectHandler(this::processConnection)
            .listen(listenResult -> {
                if (!listenResult.succeeded()) {
                    log.error("Error starting AMQP-Kafka Bridge", listenResult.cause());
                    startPromise.fail(listenResult.cause());
                    return;
                }

                log.info("AMQP-Kafka Bridge started and listening on port {}", listenResult.result().actualPort());
                log.info("AMQP-Kafka Bridge bootstrap servers {}",
                        this.bridgeConfig.getKafkaConfig().getConfig()
                                .get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
                );

                // flag the bridge as ready before signalling successful start-up
                this.isReady = true;
                startPromise.complete();
            });
}
示例6
/**
 * Starts a Proton server on port 5672 that hands every incoming connection to
 * {@code helloProcessConnection}, then blocks on standard input.
 *
 * @param args Command line arguments (not used).
 */
public static void main(String[] args) {
    // Create the Vert.x instance
    Vertx vertx = Vertx.vertx();

    // Create the AMQP server, register the connection handler and start listening
    ProtonServer.create(vertx)
            .connectHandler(connection -> helloProcessConnection(vertx, connection))
            .listen(5672, listenResult -> {
                if (!listenResult.succeeded()) {
                    listenResult.cause().printStackTrace();
                    return;
                }
                System.out.println("Listening on: " + listenResult.result().actualPort());
            });

    // Just stop main() from exiting
    try {
        System.in.read();
    } catch (Exception ignored) {
        // deliberately ignored: the read only serves to keep main() from returning
    }
}
示例7
/**
 * Verifies that a client presenting good credentials can connect to a server using
 * the custom PLAIN authenticator factory, and that the server's connect handler
 * finds the expected auth detail in the connection attachments.
 *
 * @param context The vertx-unit test context.
 */
@Test(timeout = 20000)
public void testCustomAuthenticatorSuceedsAuthentication(TestContext context) {
    Async connectedAsync = context.async();
    Async authenticatedAsync = context.async();

    ProtonServer.create(vertx).saslAuthenticatorFactory(new TestPlainAuthenticatorFactory()).connectHandler(protonConnection -> {
        // Verify the expected auth detail was recorded in the connection attachments, just using a String here.
        String authValue = protonConnection.attachments().get(AUTH_KEY, String.class);
        context.assertEquals(AUTH_VALUE, authValue);
        authenticatedAsync.complete();
    }).listen(server -> {
        // Fail the test right away if the server could not be bound, instead of
        // dereferencing a null result inside this handler.
        context.assertTrue(server.succeeded(), "Expected server to start listening");

        ProtonClient.create(vertx).connect("localhost", server.result().actualPort(), GOOD_USER, PASSWD,
            protonConnectionAsyncResult -> {
                context.assertTrue(protonConnectionAsyncResult.succeeded());
                protonConnectionAsyncResult.result().disconnect();
                connectedAsync.complete();
            });
    });

    authenticatedAsync.awaitSuccess();
    connectedAsync.awaitSuccess();
}
示例8
/**
 * Returns the given server, or a newly created one if none was given, configured
 * with the SASL authenticator factory that matches the adapter configuration.
 *
 * @param server The server to use, or {@code null} to create a new one.
 * @param options The options to create a new server with (used only if {@code server} is {@code null}).
 * @return The configured server.
 */
private ProtonServer createServer(final ProtonServer server, final ProtonServerOptions options) {
    final ProtonServer result;
    if (server == null) {
        result = ProtonServer.create(this.vertx, options);
    } else {
        result = server;
    }

    // passing null makes proton use its default authenticator -> SASL ANONYMOUS
    result.saslAuthenticatorFactory(getConfig().isAuthenticationRequired() ? authenticatorFactory : null);
    return result;
}
示例9
/**
 * Sets the AMQP server for handling insecure AMQP connections.
 * <p>
 * The server is only accepted if its actual port is not greater than zero.
 *
 * @param server The insecure server instance.
 * @throws NullPointerException If the server is {@code null}.
 * @throws IllegalArgumentException If the server reports an actual port greater than zero.
 */
protected void setInsecureAmqpServer(final ProtonServer server) {
    if (Objects.requireNonNull(server).actualPort() > 0) {
        throw new IllegalArgumentException("AMQP Server should not be running");
    }
    this.insecureServer = server;
}
示例10
/**
 * Creates a ProtonServer mock whose {@code listen} call immediately reports success
 * to the handler passed in.
 * <p>
 * The mock's {@code actualPort()} returns 0 on the first invocation and
 * {@code Constants.PORT_AMQP} on subsequent invocations.
 *
 * @return The configured server mock.
 */
@SuppressWarnings("unchecked")
private ProtonServer getAmqpServer() {

    final ProtonServer mockedServer = mock(ProtonServer.class);

    when(mockedServer.actualPort()).thenReturn(0, Constants.PORT_AMQP);
    when(mockedServer.connectHandler(any(Handler.class))).thenReturn(mockedServer);
    when(mockedServer.listen(any(Handler.class))).then(invocation -> {
        // report a successful start to whoever asked the server to listen
        final Handler<AsyncResult<ProtonServer>> listenHandler = invocation.getArgument(0);
        listenHandler.handle(Future.succeededFuture(mockedServer));
        return mockedServer;
    });

    return mockedServer;
}
示例11
/**
 * Creates a Proton server, registers the connect handler and starts listening on
 * port 0.
 *
 * @param startPromise completed when the server is listening; failed with the
 *                     listen error otherwise.
 */
@Override
public void start(Promise<Void> startPromise) {
    server = ProtonServer.create(vertx);
    server.connectHandler(this::connectHandler);

    server.listen(0, listenResult -> {
        if (!listenResult.succeeded()) {
            log.error("[{}]: Error starting server", containerId, listenResult.cause());
            startPromise.fail(listenResult.cause());
            return;
        }
        log.info("[{}]: Starting server on port {}", containerId, server.actualPort());
        startPromise.complete();
    });
}
示例12
/**
 * Adapts a handler expecting a {@code ProtonServer} result into one that accepts a
 * {@code NetServer} result.
 * <p>
 * A successful result is forwarded as a success carrying this server instance;
 * a failure is forwarded with the original cause.
 *
 * @param handler The handler to notify.
 * @return The adapting handler.
 */
private Handler<AsyncResult<NetServer>> convertHandler(final Handler<AsyncResult<ProtonServer>> handler) {
    return netServerResult -> {
        if (!netServerResult.succeeded()) {
            handler.handle(Future.failedFuture(netServerResult.cause()));
            return;
        }
        handler.handle(Future.succeededFuture(ProtonServerImpl.this));
    };
}
示例13
/**
 * Sets the factory used for SASL authenticators.
 *
 * @param authenticatorFactory The factory to use, or {@code null} to restore a
 *                             {@code DefaultAuthenticatorFactory}.
 * @return This server, for fluent use.
 */
@Override
public ProtonServer saslAuthenticatorFactory(ProtonSaslAuthenticatorFactory authenticatorFactory) {
    // null means: restore the default
    this.authenticatorFactory = (authenticatorFactory != null)
            ? authenticatorFactory
            : new DefaultAuthenticatorFactory();
    return this;
}
示例14
/**
 * Creates a Proton server with the given connect handler, asks it to listen on
 * port 0 and waits on the listen result before returning.
 *
 * @param serverConnHandler The handler for incoming connections.
 * @return The created server.
 * @throws InterruptedException declared by the wait on the listen result.
 * @throws ExecutionException declared by the wait on the listen result.
 */
private ProtonServer createServer(Handler<ProtonConnection> serverConnHandler) throws InterruptedException,
                                                                               ExecutionException {
    final FutureHandler<ProtonServer, AsyncResult<ProtonServer>> listenOutcome = FutureHandler.asyncResult();

    final ProtonServer protonServer = ProtonServer.create(vertx);
    protonServer.connectHandler(serverConnHandler);
    protonServer.listen(0, listenOutcome);

    // wait for the listen attempt to finish
    listenOutcome.get();
    return protonServer;
}
示例15
/**
 * Starts a new Proton server on port 0 using the given connection handler and
 * returns it once the listen result has been obtained.
 *
 * @param serverConnHandler The handler invoked for incoming connections.
 * @return The server that was created.
 * @throws InterruptedException declared by the wait on the listen result.
 * @throws ExecutionException declared by the wait on the listen result.
 */
private ProtonServer createServer(Handler<ProtonConnection> serverConnHandler) throws InterruptedException,
                                                                               ExecutionException {
    final ProtonServer created = ProtonServer.create(vertx);
    created.connectHandler(serverConnHandler);

    final FutureHandler<ProtonServer, AsyncResult<ProtonServer>> listenResult = FutureHandler.asyncResult();
    created.listen(0, listenResult);
    listenResult.get();

    return created;
}
示例16
/**
 * Verifies that {@code init} is invoked on an authenticator created by a custom
 * authenticator factory when a client connects.
 *
 * @param context The vertx-unit test context.
 */
@Test(timeout = 20000)
public void testCustomAuthenticatorHasInitCalled(TestContext context) {
    Async initCalledAsync = context.async();

    ProtonServer.create(vertx).saslAuthenticatorFactory(new ProtonSaslAuthenticatorFactory() {
        @Override
        public ProtonSaslAuthenticator create() {
            return new ProtonSaslAuthenticator() {
                @Override
                public void init(NetSocket socket, ProtonConnection protonConnection, Transport transport) {
                    // the call this test is waiting for
                    initCalledAsync.complete();
                }

                @Override
                public void process(Handler<Boolean> completionHandler) {
                    completionHandler.handle(false);
                }

                @Override
                public boolean succeeded() {
                    return false;
                }
            };
        }
    }).connectHandler(protonConnection -> {
    }).listen(server -> {
        // Fail the test right away if the server could not be bound, instead of
        // dereferencing a null result inside this handler.
        context.assertTrue(server.succeeded(), "Expected server to start listening");

        ProtonClient.create(vertx).connect("localhost", server.result().actualPort(),
            protonConnectionAsyncResult -> {
            });
    });
}
示例17
/**
 * Verifies that a client presenting bad credentials fails to connect to a server
 * using the custom PLAIN authenticator factory, and that the server's connect
 * handler is not invoked for that connection.
 *
 * @param context The vertx-unit test context.
 */
@Test(timeout = 20000)
public void testCustomAuthenticatorFailsAuthentication(TestContext context) {
    Async connectedAsync = context.async();

    ProtonServer.create(vertx).saslAuthenticatorFactory(new TestPlainAuthenticatorFactory()).connectHandler(protonConnection -> {
        context.fail("Handler should not be called for connection that failed authentication");
    }).listen(server -> {
        // Fail the test right away if the server could not be bound, instead of
        // dereferencing a null result inside this handler.
        context.assertTrue(server.succeeded(), "Expected server to start listening");

        ProtonClient.create(vertx).connect("localhost", server.result().actualPort(), BAD_USER, PASSWD,
            protonConnectionAsyncResult -> {
                context.assertFalse(protonConnectionAsyncResult.succeeded());
                connectedAsync.complete();
            });
    });

    connectedAsync.awaitSuccess();
}
示例18
/**
 * Verifies that the authenticator factory's create count is 1 after a first
 * client connection and 2 after a second one.
 *
 * @param context The vertx-unit test context.
 */
@Test(timeout = 20000)
public void testAuthenticatorCreatedPerConnection(TestContext context) {
    Async connectedAsync = context.async();
    Async connectedAsync2 = context.async();
    // holds the server port so that the second connect attempt below can reuse it
    AtomicInteger port = new AtomicInteger(-1);

    final TestPlainAuthenticatorFactory authenticatorFactory = new TestPlainAuthenticatorFactory();

    ProtonServer.create(vertx).saslAuthenticatorFactory(authenticatorFactory).connectHandler(protonConnection -> {
        // Verify the expected auth detail was recorded in the connection attachments, just using a String here.
        String authValue = protonConnection.attachments().get(AUTH_KEY, String.class);
        context.assertEquals(AUTH_VALUE, authValue);
    }).listen(server -> {
        // Fail the test right away if the server could not be bound, instead of
        // dereferencing a null result inside this handler.
        context.assertTrue(server.succeeded(), "Expected server to start listening");

        port.set(server.result().actualPort());
        ProtonClient.create(vertx).connect("localhost", port.intValue(), GOOD_USER, PASSWD,
            protonConnectionAsyncResult -> {
                context.assertTrue(protonConnectionAsyncResult.succeeded());
                protonConnectionAsyncResult.result().disconnect();
                connectedAsync.complete();
            });
    });

    // first connection
    connectedAsync.awaitSuccess();
    context.assertEquals(1, authenticatorFactory.getCreateCount(), "unexpected authenticator count");

    // second connection
    ProtonClient.create(vertx).connect("localhost", port.intValue(), GOOD_USER, PASSWD, protonConnectionAsyncResult -> {
        context.assertTrue(protonConnectionAsyncResult.succeeded());
        protonConnectionAsyncResult.result().disconnect();
        connectedAsync2.complete();
    });

    connectedAsync2.awaitSuccess();
    context.assertEquals(2, authenticatorFactory.getCreateCount(), "unexpected authenticator count");
}
示例19
/**
 * Connects a client to a server using a {@code TestAsyncAuthenticator} constructed
 * with a 750 ms delay and checks that the connect attempt does not complete before
 * that delay has elapsed, that it succeeds or fails as requested, and that exactly
 * one authenticator was created.
 *
 * @param context The vertx-unit test context.
 * @param passAuthentication Value handed to the test authenticator; also determines
 *                           whether the connect attempt and the server connect
 *                           handler invocation are expected.
 */
private void doTestAsyncServerAuthenticatorTestImpl(TestContext context, boolean passAuthentication) {
    Async connectAsync = context.async();
    AtomicBoolean connectedServer = new AtomicBoolean();

    final long delay = 750;
    TestAsyncAuthenticator testAsyncAuthenticator = new TestAsyncAuthenticator(delay, passAuthentication);
    TestAsyncAuthenticatorFactory authenticatorFactory = new TestAsyncAuthenticatorFactory(testAsyncAuthenticator);

    ProtonServer.create(vertx).saslAuthenticatorFactory(authenticatorFactory).connectHandler(protonConnection -> {
        connectedServer.set(true);
    }).listen(server -> {
        // Fail the test right away if the server could not be bound, instead of
        // dereferencing a null result inside this handler.
        context.assertTrue(server.succeeded(), "Expected server to start listening");

        final long startTime = System.currentTimeMillis();
        ProtonClient.create(vertx).connect("localhost", server.result().actualPort(), GOOD_USER, PASSWD, conResult -> {
            // Verify the process took expected time from auth delay.
            long actual = System.currentTimeMillis() - startTime;
            context.assertTrue(actual >= delay, "Connect completed before expected time delay elapsed! " + actual);

            if (passAuthentication) {
                context.assertTrue(conResult.succeeded(), "Expected connect to succeed");
                conResult.result().disconnect();
            } else {
                context.assertFalse(conResult.succeeded(), "Expected connect to fail");
            }

            connectAsync.complete();
        });
    });

    connectAsync.awaitSuccess();

    if (passAuthentication) {
        context.assertTrue(connectedServer.get(), "Server handler should have been called");
    } else {
        context.assertFalse(connectedServer.get(), "Server handler should not have been called");
    }

    context.assertEquals(1, authenticatorFactory.getCreateCount(), "unexpected authenticator creation count");
}
示例20
/**
 * Verifies that the adapter closes a corresponding command consumer if
 * the connection to a device fails unexpectedly.
 * <p>
 * This is a parameterized helper: the way in which the connection is lost is
 * supplied by the caller.
 *
 * @param ctx The vert.x test context.
 * @param connectionLossTrigger Invoked with the mocked device connection at the point
 *                              where the connection is supposed to be lost.
 * @throws InterruptedException if the test execution gets interrupted.
 */
@SuppressWarnings("unchecked")
private void testAdapterClosesCommandConsumer(
final VertxTestContext ctx,
final Handler<ProtonConnection> connectionLossTrigger) throws InterruptedException {
// GIVEN an AMQP adapter
final Promise<ProtonDelivery> outcome = Promise.promise();
// the outcome handed to the event sender set-up is already completed
outcome.complete();
final DownstreamSender downstreamEventSender = givenAnEventSender(outcome);
final ProtonServer server = getAmqpServer();
final VertxBasedAmqpProtocolAdapter adapter = getAdapter(server);
final Promise<Void> startupTracker = Promise.promise();
startupTracker.future().onComplete(ctx.completing());
adapter.start(startupTracker);
// wait (at most 2 seconds) for the test context to be completed by the start-up tracker
assertThat(ctx.awaitCompletion(2, TimeUnit.SECONDS)).isTrue();
// to which a device is connected
final Device authenticatedDevice = new Device(TEST_TENANT_ID, TEST_DEVICE);
final Record record = new RecordImpl();
record.set(AmqpAdapterConstants.KEY_CLIENT_DEVICE, Device.class, authenticatedDevice);
final ProtonConnection deviceConnection = mock(ProtonConnection.class);
when(deviceConnection.attachments()).thenReturn(record);
// capture the connect handler that was registered on the server mock and
// invoke it with the mocked device connection
final ArgumentCaptor<Handler<ProtonConnection>> connectHandler = ArgumentCaptor.forClass(Handler.class);
verify(server).connectHandler(connectHandler.capture());
connectHandler.getValue().handle(deviceConnection);
// that wants to receive commands
final ProtocolAdapterCommandConsumer commandConsumer = mock(ProtocolAdapterCommandConsumer.class);
when(commandConsumer.close(any())).thenReturn(Future.succeededFuture());
when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), any(), any()))
.thenReturn(Future.succeededFuture(commandConsumer));
final String sourceAddress = getCommandEndpoint();
final ProtonSender sender = getSender(sourceAddress);
adapter.handleRemoteSenderOpenForCommands(deviceConnection, sender);
// WHEN the connection to the device is lost
connectionLossTrigger.handle(deviceConnection);
// THEN the adapter closes the command consumer
verify(commandConsumer).close(any());
// and sends an empty event with TTD = 0 downstream
final ArgumentCaptor<Message> messageCaptor = ArgumentCaptor.forClass(Message.class);
// two sends are expected in total; only the most recently captured message is
// inspected below (ArgumentCaptor#getValue() returns the last captured value)
verify(downstreamEventSender, times(2)).sendAndWaitForOutcome(messageCaptor.capture(), any());
assertThat(messageCaptor.getValue().getContentType()).isEqualTo(EventConstants.CONTENT_TYPE_EMPTY_NOTIFICATION);
assertThat(MessageHelper.getTimeUntilDisconnect(messageCaptor.getValue())).isEqualTo(0);
}
示例21
/**
 * Creates a Proton server with the given options and applies this object's
 * SASL authenticator factory to it.
 *
 * @param options The options to create the server with.
 * @return The server as returned by the authenticator factory setter.
 */
private ProtonServer createProtonServer(final ProtonServerOptions options) {
    final ProtonServer protonServer = ProtonServer.create(vertx, options);
    return protonServer.saslAuthenticatorFactory(saslAuthenticatorFactory);
}
示例22
/**
 * Starts a Proton server on port 12347 that answers every message received on a
 * receiver link with a response sent over the sender link opened on the server.
 * <p>
 * The response copies address, correlation id and reply-to from the request and
 * carries {@code AmqpValue(true)} as its body.
 *
 * @throws InterruptedException if interrupted while waiting for the listen callback.
 */
@BeforeEach
public void setup() throws InterruptedException {
vertx = Vertx.vertx();
server = ProtonServer.create(vertx);
// released by the listen callback below
CountDownLatch latch = new CountDownLatch(1);
// completed with the sender once a sender link has been opened (see senderOpenHandler below)
CompletableFuture<ProtonSender> futureSender = new CompletableFuture<>();
server.connectHandler(conn -> {
// mirror a remote close with a local close and disconnect
conn.closeHandler(c -> {
conn.close();
conn.disconnect();
});
// register the disconnect handler and open the connection in one chain
conn.disconnectHandler(c -> {
conn.disconnect();
}).open();
conn.sessionOpenHandler(ProtonSession::open);
conn.receiverOpenHandler(receiver -> {
System.out.println("Receiver open");
receiver.setTarget(receiver.getRemoteTarget());
receiver.handler((delivery, message) -> {
// build the response from the request's addressing/correlation properties
Message response = Message.Factory.create();
response.setAddress(message.getAddress());
response.setBody(new AmqpValue(true));
response.setCorrelationId(message.getCorrelationId());
response.setReplyTo(message.getReplyTo());
try {
// NOTE(review): get() blocks until futureSender has been completed; if a message
// can arrive before the sender link is opened this would block the thread running
// this handler - confirm that ordering cannot occur.
futureSender.get().send(response);
} catch (Exception e) {
e.printStackTrace();
}
});
receiver.open();
});
conn.senderOpenHandler(sender -> {
sender.setSource(sender.getRemoteSource());
sender.open();
// make the sender available to the message handler above
futureSender.complete(sender);
});
}).listen(12347, res -> {
// NOTE(review): the listen result is not inspected, so the latch is released
// whether or not binding port 12347 succeeded.
latch.countDown();
});
// wait (without timeout) for the listen callback
latch.await();
}
示例23
/**
 * Starts a Proton server on port 0 and records the port it reports.
 * <p>
 * Messages received on any receiver link are added to {@code inbox}. For every
 * sender link that is opened, a periodic timer (100 ms) polls {@code outbox} and
 * sends whatever message it finds over that link.
 *
 * @throws Exception if waiting for the listen callback fails.
 */
@BeforeEach
public void setup() throws Exception {
    vertx = Vertx.vertx();
    server = ProtonServer.create(vertx);
    inbox = new LinkedBlockingDeque<>();
    outbox = new LinkedBlockingDeque<>();

    final CountDownLatch listening = new CountDownLatch(1);

    server.connectHandler(connection -> {
        // mirror a remote close with a local close and disconnect
        connection.closeHandler(remoteClose -> {
            connection.close();
            connection.disconnect();
        });
        connection.disconnectHandler(disconnected -> connection.disconnect()).open();
        connection.sessionOpenHandler(session -> session.open());

        // incoming messages end up in the inbox
        connection.receiverOpenHandler(receiverLink -> {
            log.debug("Receiver open");
            receiverLink.setTarget(receiverLink.getRemoteTarget());
            receiverLink.handler((delivery, message) -> inbox.add(message));
            receiverLink.open();
        });

        // outgoing messages are taken from the outbox by a periodic timer
        connection.senderOpenHandler(senderLink -> {
            vertx.setPeriodic(100, timerId -> {
                try {
                    final Message next = outbox.poll(0, TimeUnit.SECONDS);
                    if (next != null) {
                        senderLink.send(next);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    // Try again later
                }
            });
            senderLink.open();
        });
    }).listen(0, listenResult -> {
        listening.countDown();
    });

    listening.await();
    actualPort = server.actualPort();
    log.debug("Using port {}", actualPort);
}
示例24
/**
 * Asks the wrapped server to listen on the given port and host.
 *
 * @param port The port to listen on.
 * @param host The host to listen on.
 * @param handler Notified with the outcome, converted via {@code convertHandler}.
 * @return This server, for fluent use.
 */
@Override
public ProtonServerImpl listen(int port, String host, Handler<AsyncResult<ProtonServer>> handler) {
    this.server.listen(port, host, convertHandler(handler));
    return this;
}
示例25
/**
 * Asks the wrapped server to listen without specifying a port or host.
 *
 * @param handler Notified with the outcome, converted via {@code convertHandler}.
 * @return This server, for fluent use.
 */
@Override
public ProtonServerImpl listen(Handler<AsyncResult<ProtonServer>> handler) {
    this.server.listen(convertHandler(handler));
    return this;
}
示例26
/**
 * Asks the wrapped server to listen, passing the given integer through as the
 * first argument of the wrapped server's {@code listen} call.
 *
 * @param i The value passed through to the wrapped server (presumably the port - confirm against the interface).
 * @param handler Notified with the outcome, converted via {@code convertHandler}.
 * @return This server, for fluent use.
 */
@Override
public ProtonServerImpl listen(int i, Handler<AsyncResult<ProtonServer>> handler) {
    this.server.listen(i, convertHandler(handler));
    return this;
}
示例27
/**
 * Publishes a single tracker through a subscriber created for "myAddress" and waits
 * until the server side has opened the receiver link, received the message and had
 * its link close handler invoked.
 *
 * @param context The vertx-unit test context.
 * @throws Exception if creating the server fails.
 */
@Test(timeout = 20000)
public void testCreateUseAndCancelSubscriber(TestContext context) throws Exception {
    // this test runs against a server of its own
    server.close();

    final Async linkOpened = context.async();
    final Async messageReceived = context.async();
    final Async linkClosed = context.async();

    ProtonServer ownServer = null;
    try {
        // ===== Server Handling =====
        ownServer = createServer(serverConn -> {
            serverConn.openHandler(remoteOpen -> serverConn.open());
            serverConn.sessionOpenHandler(session -> {
                session.open();
            });
            serverConn.receiverOpenHandler(receiver -> {
                LOG.trace("Server receiver opened");

                receiver.handler((delivery, msg) -> {
                    // We got the message that was sent, complete the test
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Server got msg: " + getMessageBody(context, msg));
                    }
                    validateMessage(context, 1, "1", msg);
                    messageReceived.complete();
                });

                receiver.closeHandler(remoteClose -> {
                    receiver.close();
                    linkClosed.complete();
                });

                // Set the local terminus details [naively]
                receiver.setTarget(receiver.getRemoteTarget().copy());
                receiver.open();
                linkOpened.complete();
            });
        });

        // ===== Client Handling =====
        final ProtonClient protonClient = ProtonClient.create(vertx);
        protonClient.connect("localhost", ownServer.actualPort(), connectResult -> {
            context.assertTrue(connectResult.succeeded());

            final ProtonConnection clientConn = connectResult.result();
            clientConn.open();

            final ProtonSubscriber<Tracker> trackerSubscriber = ProtonStreams.createTrackerProducer(clientConn,"myAddress");

            final Tracker single = Tracker.create(message("1"), t -> {
                context.assertTrue(t.isAccepted(), "msg should be accepted");
                context.assertTrue(t.isRemotelySettled(), "msg should be remotely settled");
            });

            final Publisher<Tracker> source = Flowable.just(single);
            source.subscribe(trackerSubscriber);
        });

        linkOpened.awaitSuccess();
        messageReceived.awaitSuccess();
        linkClosed.awaitSuccess();
    } finally {
        if (ownServer != null) {
            ownServer.close();
        }
    }
}
示例28
/**
 * Checks that the subscription of a publisher feeding a tracker subscriber is
 * cancelled when the server closes the receiver link right after opening it.
 *
 * @param context The vertx-unit test context.
 * @throws Exception if creating the server fails.
 */
@Test(timeout = 20000)
public void testSubCancelledOnLinkClose(TestContext context) throws Exception {
    // this test runs against a server of its own
    server.close();

    final Async linkOpened = context.async();
    final Async subscriptionCancelled = context.async();

    ProtonServer ownServer = null;
    try {
        // ===== Server Handling =====
        ownServer = createServer(serverConn -> {
            serverConn.openHandler(remoteOpen -> serverConn.open());
            serverConn.sessionOpenHandler(session -> {
                session.open();
            });
            serverConn.receiverOpenHandler(receiver -> {
                LOG.trace("Server receiver opened");

                // Set the local terminus details [naively]
                receiver.setTarget(receiver.getRemoteTarget().copy());
                receiver.open();
                linkOpened.complete();

                // close the link again straight away
                receiver.close();
            });
        });

        // ===== Client Handling =====
        final ProtonClient protonClient = ProtonClient.create(vertx);
        protonClient.connect("localhost", ownServer.actualPort(), connectResult -> {
            context.assertTrue(connectResult.succeeded());

            final ProtonConnection clientConn = connectResult.result();
            clientConn.open();

            final ProtonSubscriber<Tracker> trackerSubscriber = ProtonStreams.createTrackerProducer(clientConn,"myAddress");

            // Create a Publisher that doesn't produce elements, but indicates when its subscription is cancelled.
            final Publisher<Tracker> silentSource = Flowable.<Tracker>never()
                    .doOnCancel(() -> {
                        LOG.trace("Cancelled!");
                        subscriptionCancelled.complete();
                    });
            silentSource.subscribe(trackerSubscriber);
        });

        linkOpened.awaitSuccess();
        subscriptionCancelled.awaitSuccess();
    } finally {
        if (ownServer != null) {
            ownServer.close();
        }
    }
}
示例29
/**
 * Checks that the subscription of a publisher feeding a tracker subscriber is not
 * cancelled while the link is open, and is cancelled once the server is closed.
 *
 * @param context The vertx-unit test context.
 * @throws Exception if creating the server fails.
 */
@Test(timeout = 20000)
public void testSubCancelledOnConnectionEnd(TestContext context) throws Exception {
    // this test runs against a server of its own
    server.close();

    final Async linkOpened = context.async();
    final Async cancelled = context.async();

    ProtonServer ownServer = null;
    try {
        // ===== Server Handling =====
        ownServer = createServer(serverConn -> {
            serverConn.openHandler(remoteOpen -> serverConn.open());
            serverConn.sessionOpenHandler(session -> {
                session.open();
            });
            serverConn.receiverOpenHandler(receiver -> {
                LOG.trace("Server receiver opened");

                // Set the local terminus details [naively]
                receiver.setTarget(receiver.getRemoteTarget().copy());
                receiver.open();
                linkOpened.complete();
            });
        });

        // ===== Client Handling =====
        final ProtonClient protonClient = ProtonClient.create(vertx);
        protonClient.connect("localhost", ownServer.actualPort(), connectResult -> {
            context.assertTrue(connectResult.succeeded());

            final ProtonConnection clientConn = connectResult.result();
            clientConn.open();

            final ProtonSubscriber<Tracker> trackerSubscriber = ProtonStreams.createTrackerProducer(clientConn,"myAddress");

            // Create a Publisher that doesn't produce elements, but indicates when its subscription is cancelled.
            final Publisher<Tracker> silentSource = Flowable.<Tracker>never()
                    .doOnCancel(() -> {
                        LOG.trace("Cancelled!");
                        cancelled.complete();
                    });
            silentSource.subscribe(trackerSubscriber);
        });

        linkOpened.awaitSuccess();
        context.assertFalse(cancelled.isCompleted());

        // close the server here; clearing the reference keeps the finally block from closing it again
        ownServer.close();
        ownServer = null;

        cancelled.awaitSuccess();
    } finally {
        if (ownServer != null) {
            ownServer.close();
        }
    }
}
示例30
/**
 * Checks that a link name set through {@code ProtonSubscriberOptions} is the name
 * the server sees on the receiver link it opens, and that a message published
 * through the subscriber reaches the server.
 *
 * @param context The vertx-unit test context.
 * @throws Exception if creating the server fails.
 */
@Test(timeout = 20000)
public void testConfigureProducerLinkName(TestContext context) throws Exception {
    // this test runs against a server of its own
    server.close();

    final Async linkOpened = context.async();
    final Async messageReceived = context.async();
    final Async linkClosed = context.async();

    final String linkName = "testConfigureProducerLinkName";

    ProtonServer ownServer = null;
    try {
        // ===== Server Handling =====
        ownServer = createServer(serverConn -> {
            serverConn.openHandler(remoteOpen -> serverConn.open());
            serverConn.sessionOpenHandler(session -> {
                session.open();
            });
            serverConn.receiverOpenHandler(receiver -> {
                LOG.trace("Server receiver opened");

                // Verify the link details used were as expected
                context.assertEquals(linkName, receiver.getName(), "unexpected link name");

                receiver.handler((delivery, msg) -> {
                    // We got the message that was sent, complete the test
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Server got msg: " + getMessageBody(context, msg));
                    }
                    validateMessage(context, 1, "1", msg);
                    messageReceived.complete();
                });

                receiver.closeHandler(remoteClose -> {
                    receiver.close();
                    linkClosed.complete();
                });

                // Set the local terminus details [naively]
                receiver.setTarget(receiver.getRemoteTarget().copy());
                receiver.open();
                linkOpened.complete();
            });
        });

        // ===== Client Handling =====
        final ProtonClient protonClient = ProtonClient.create(vertx);
        protonClient.connect("localhost", ownServer.actualPort(), connectResult -> {
            context.assertTrue(connectResult.succeeded());

            final ProtonConnection clientConn = connectResult.result();
            clientConn.open();

            // Create subscriber with given link name
            final ProtonSubscriberOptions subscriberOptions = new ProtonSubscriberOptions().setLinkName(linkName);
            final ProtonSubscriber<Tracker> trackerSubscriber = ProtonStreams.createTrackerProducer(clientConn,"myAddress", subscriberOptions);

            final Tracker single = Tracker.create(message("1"));
            final Publisher<Tracker> source = Flowable.just(single);
            source.subscribe(trackerSubscriber);
        });

        linkOpened.awaitSuccess();
        messageReceived.awaitSuccess();
        linkClosed.awaitSuccess();
    } finally {
        if (ownServer != null) {
            ownServer.close();
        }
    }
}