Java源码示例:io.vertx.proton.ProtonServer

示例1
/**
 * Checks that the adapter starts the Proton server handed to it by the client
 * rather than creating and starting a server of its own.
 *
 * @param ctx The test context used to track the asynchronous outcome.
 */
@SuppressWarnings("unchecked")
@Test
public void testStartUsesClientProvidedAmqpServer(final VertxTestContext ctx) {
    // GIVEN an adapter that has been handed a (mocked) AMQP server
    final ProtonServer providedServer = getAmqpServer();
    final VertxBasedAmqpProtocolAdapter adapterUnderTest = getAdapter(providedServer);

    // WHEN the adapter is started
    final Promise<Void> startup = Promise.promise();
    adapterUnderTest.start(startup);

    // THEN a connect handler has been registered on the provided server
    // and the provided server has been asked to listen
    startup.future().onComplete(ctx.succeeding(started -> {
        ctx.verify(() -> {
            verify(providedServer).connectHandler(any(Handler.class));
            verify(providedServer).listen(any(Handler.class));
        });
        ctx.completeNow();
    }));
}
 
示例2
/**
 * Checks that the adapter offers the ANONYMOUS-RELAY capability on a
 * device's connection once the device's open frame has been handled.
 */
@Test
public void testAdapterSupportsAnonymousRelay() {

    // GIVEN an AMQP adapter backed by a configured (mocked) server
    final ProtonServer amqpServer = getAmqpServer();
    final VertxBasedAmqpProtocolAdapter adapterUnderTest = getAdapter(amqpServer);

    // and a connection whose attachments carry an authenticated device
    final Device device = new Device(TEST_TENANT_ID, TEST_DEVICE);
    final Record attachments = new RecordImpl();
    attachments.set(AmqpAdapterConstants.KEY_CLIENT_DEVICE, Device.class, device);
    final ProtonConnection con = mock(ProtonConnection.class);
    when(con.attachments()).thenReturn(attachments);
    when(con.getRemoteContainer()).thenReturn("deviceContainer");

    // WHEN the device connects ...
    adapterUnderTest.onConnectRequest(con);

    // ... and the open handler registered by the adapter is invoked
    @SuppressWarnings("unchecked")
    final ArgumentCaptor<Handler<AsyncResult<ProtonConnection>>> openHandlerCaptor = ArgumentCaptor.forClass(Handler.class);
    verify(con).openHandler(openHandlerCaptor.capture());
    final Handler<AsyncResult<ProtonConnection>> registeredOpenHandler = openHandlerCaptor.getValue();
    registeredOpenHandler.handle(Future.succeededFuture(con));

    // THEN the capabilities offered by the adapter include ANONYMOUS-RELAY
    verify(con).setOfferedCapabilities(argThat(offered -> Arrays.stream(offered)
            .anyMatch(Constants.CAP_ANONYMOUS_RELAY::equals)));
}
 
示例3
/**
 * Checks that the AMQP adapter closes a receiver link whose target
 * contains an address.
 */
@Test
public void testAdapterAcceptsAnonymousRelayReceiverOnly() {
    // GIVEN an AMQP adapter backed by a configured (mocked) server
    final ProtonServer amqpServer = getAmqpServer();
    final VertxBasedAmqpProtocolAdapter adapterUnderTest = getAdapter(amqpServer);

    // and a receiver link whose target carries a telemetry address
    final ResourceIdentifier telemetryAddress = ResourceIdentifier.from(
            TelemetryConstants.TELEMETRY_ENDPOINT,
            TEST_TENANT_ID,
            TEST_DEVICE);
    final ProtonReceiver addressedLink = getReceiver(ProtonQoS.AT_LEAST_ONCE, getTarget(telemetryAddress));

    // WHEN the adapter is asked to handle that link
    adapterUnderTest.handleRemoteReceiverOpen(getConnection(null), addressedLink);

    // THEN the link gets closed
    verify(addressedLink).close();
}
 
示例4
/**
 * Builds a protocol adapter that uses the given Proton server for insecure
 * connections and the collaborators held in this test class' fields.
 *
 * @param server The AMQP Proton server to hand to the adapter.
 * @return The initialized adapter.
 */
private VertxBasedAmqpProtocolAdapter getAdapter(final ProtonServer server) {

    final VertxBasedAmqpProtocolAdapter protocolAdapter = new VertxBasedAmqpProtocolAdapter();

    // configuration and the server under test
    protocolAdapter.setConfig(config);
    protocolAdapter.setInsecureAmqpServer(server);

    // client factories
    protocolAdapter.setTenantClientFactory(tenantClientFactory);
    protocolAdapter.setDownstreamSenderFactory(downstreamSenderFactory);
    protocolAdapter.setRegistrationClientFactory(registrationClientFactory);
    protocolAdapter.setCredentialsClientFactory(credentialsClientFactory);
    protocolAdapter.setCommandConsumerFactory(commandConsumerFactory);
    protocolAdapter.setDeviceConnectionClientFactory(deviceConnectionClientFactory);

    // remaining collaborators
    protocolAdapter.setCommandTargetMapper(commandTargetMapper);
    protocolAdapter.setMetrics(metrics);
    protocolAdapter.setResourceLimitChecks(resourceLimitChecks);
    protocolAdapter.setConnectionLimitManager(connectionLimitManager);

    protocolAdapter.init(vertx, context);
    return protocolAdapter;
}
 
示例5
/**
 * Creates the AMQP server, registers the connection handler and starts listening.
 *
 * @param startPromise The promise that is completed once the server is listening,
 *                     or failed with the cause if listening could not be started.
 */
private void bindAmqpServer(Promise<Void> startPromise) {

    final ProtonServerOptions serverOptions = this.createServerOptions();
    final ProtonServer protonServer = ProtonServer.create(this.vertx, serverOptions)
            .connectHandler(this::processConnection);

    this.server = protonServer.listen(listenResult -> {

        if (!listenResult.succeeded()) {
            log.error("Error starting AMQP-Kafka Bridge", listenResult.cause());
            startPromise.fail(listenResult.cause());
            return;
        }

        log.info("AMQP-Kafka Bridge started and listening on port {}", listenResult.result().actualPort());
        log.info("AMQP-Kafka Bridge bootstrap servers {}",
                this.bridgeConfig.getKafkaConfig().getConfig()
                        .get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
        );

        // flag readiness before signalling the caller
        this.isReady = true;

        startPromise.complete();
    });
}
 
示例6
/**
 * Starts an AMQP server listening on port 5672 and then blocks the main
 * thread on {@code System.in.read()}.
 * <p>
 * Every incoming connection is passed to {@code helloProcessConnection}.
 * The outcome of the listen attempt is printed to the console.
 *
 * @param args The command line arguments (not used).
 */
public static void main(String[] args) {

    // Create the Vert.x instance
    Vertx vertx = Vertx.vertx();

    // Create the Vert.x AMQP server instance and start listening
    ProtonServer.create(vertx)
        .connectHandler(connection -> helloProcessConnection(vertx, connection))
        .listen(5672, res -> {
            if (res.succeeded()) {
                System.out.println("Listening on: " + res.result().actualPort());
            } else {
                res.cause().printStackTrace();
            }
        });

    // Just stop main() from exiting
    try {
        System.in.read();
    } catch (Exception ignored) {
        // Deliberately ignored: the read only serves to park the main thread.
    }
}
 
示例7
/**
 * Checks that a client presenting good credentials can connect to a server that
 * uses a custom SASL authenticator, and that the value recorded in the connection
 * attachments under {@code AUTH_KEY} is visible to the server's connect handler.
 *
 * @param context The vertx-unit test context.
 */
@Test(timeout = 20000)
public void testCustomAuthenticatorSuceedsAuthentication(TestContext context) {
  Async clientConnected = context.async();
  Async serverSawAuthDetail = context.async();

  ProtonServer protonServer = ProtonServer.create(vertx);
  protonServer.saslAuthenticatorFactory(new TestPlainAuthenticatorFactory());
  protonServer.connectHandler(serverConnection -> {
    // The expected auth detail (just a String here) must be present in the attachments.
    String recorded = serverConnection.attachments().get(AUTH_KEY, String.class);
    context.assertEquals(AUTH_VALUE, recorded);
    serverSawAuthDetail.complete();
  });
  protonServer.listen(listenResult -> {
    ProtonClient.create(vertx).connect("localhost", listenResult.result().actualPort(), GOOD_USER, PASSWD,
        connectResult -> {
          context.assertTrue(connectResult.succeeded());
          connectResult.result().disconnect();
          clientConnected.complete();
        });
  });

  serverSawAuthDetail.awaitSuccess();
  clientConnected.awaitSuccess();
}
 
示例8
/**
 * Returns the given server, or a newly created one if none was given, after
 * configuring its SASL authenticator factory according to the adapter configuration.
 *
 * @param server The server to use or {@code null} if a new one should be created.
 * @param options The options to create a new server with.
 * @return The configured server.
 */
private ProtonServer createServer(final ProtonServer server, final ProtonServerOptions options) {
    final ProtonServer result;
    if (server == null) {
        result = ProtonServer.create(this.vertx, options);
    } else {
        result = server;
    }
    // a null factory makes proton use its default authenticator -> SASL ANONYMOUS
    result.saslAuthenticatorFactory(getConfig().isAuthenticationRequired() ? authenticatorFactory : null);
    return result;
}
 
示例9
/**
 * Sets the AMQP server to use for handling insecure AMQP connections.
 *
 * @param server The insecure server instance.
 * @throws NullPointerException If the server is {@code null}.
 * @throws IllegalArgumentException If the server reports an actual port greater than zero.
 */
protected void setInsecureAmqpServer(final ProtonServer server) {
    Objects.requireNonNull(server);
    // a positive actual port is taken as a sign that the server is already running
    if (server.actualPort() > 0) {
        throw new IllegalArgumentException("AMQP Server should not be running");
    }
    this.insecureServer = server;
}
 
示例10
/**
 * Creates a ProtonServer mock whose {@code listen} method immediately reports
 * success to the handler it is given.
 * <p>
 * The mock reports port 0 on the first call to {@code actualPort()} and
 * {@code Constants.PORT_AMQP} on subsequent calls.
 *
 * @return The mocked server.
 */
@SuppressWarnings("unchecked")
private ProtonServer getAmqpServer() {

    final ProtonServer mockedServer = mock(ProtonServer.class);

    when(mockedServer.actualPort()).thenReturn(0, Constants.PORT_AMQP);
    when(mockedServer.connectHandler(any(Handler.class))).thenReturn(mockedServer);
    when(mockedServer.listen(any(Handler.class))).then(call -> {
        // report a successful start to whoever asked the server to listen
        final Handler<AsyncResult<ProtonServer>> listenHandler = call.getArgument(0);
        listenHandler.handle(Future.succeededFuture(mockedServer));
        return mockedServer;
    });

    return mockedServer;
}
 
示例11
/**
 * Creates the Proton server, registers the connect handler and starts
 * listening on port 0.
 *
 * @param startPromise Completed once the server is listening, failed with the cause otherwise.
 */
@Override
public void start(Promise<Void> startPromise) {
    server = ProtonServer.create(vertx);
    server.connectHandler(this::connectHandler);
    server.listen(0, listenResult -> {
        if (!listenResult.succeeded()) {
            log.error("[{}]: Error starting server", containerId, listenResult.cause());
            startPromise.fail(listenResult.cause());
            return;
        }
        log.info("[{}]: Starting server on port {}", containerId, server.actualPort());
        startPromise.complete();
    });
}
 
示例12
/**
 * Adapts a handler expecting a {@code ProtonServer} result to one that accepts
 * a {@code NetServer} result.
 * <p>
 * A successful result is forwarded as a success carrying this server instance,
 * any other result is forwarded as a failure carrying the original cause.
 *
 * @param handler The handler to forward the outcome to.
 * @return The adapting handler.
 */
private Handler<AsyncResult<NetServer>> convertHandler(final Handler<AsyncResult<ProtonServer>> handler) {
  return netServerResult -> {
    if (!netServerResult.succeeded()) {
      handler.handle(Future.failedFuture(netServerResult.cause()));
      return;
    }
    handler.handle(Future.succeededFuture(ProtonServerImpl.this));
  };
}
 
示例13
/**
 * Sets the factory used to create SASL authenticators.
 *
 * @param authenticatorFactory The factory to use, or {@code null} to restore the default factory.
 * @return This server.
 */
@Override
public ProtonServer saslAuthenticatorFactory(ProtonSaslAuthenticatorFactory authenticatorFactory) {
  // null means: restore the default
  this.authenticatorFactory = (authenticatorFactory != null)
      ? authenticatorFactory
      : new DefaultAuthenticatorFactory();
  return this;
}
 
示例14
/**
 * Creates a server using the given connection handler, asks it to listen on
 * port 0 and waits for the listen attempt to finish.
 *
 * @param serverConnHandler The handler for incoming connections.
 * @return The server.
 * @throws InterruptedException declared by the wait on the listen outcome.
 * @throws ExecutionException declared by the wait on the listen outcome.
 */
private ProtonServer createServer(Handler<ProtonConnection> serverConnHandler) throws InterruptedException,
                                                                               ExecutionException {
  ProtonServer protonServer = ProtonServer.create(vertx);
  protonServer.connectHandler(serverConnHandler);

  FutureHandler<ProtonServer, AsyncResult<ProtonServer>> listenOutcome = FutureHandler.asyncResult();
  protonServer.listen(0, listenOutcome);

  // wait here until the listen attempt has finished
  listenOutcome.get();
  return protonServer;
}
 
示例15
/**
 * Creates a server using the given connection handler, asks it to listen on
 * port 0 and waits for the listen attempt to finish.
 *
 * @param serverConnHandler The handler for incoming connections.
 * @return The server.
 * @throws InterruptedException declared by the wait on the listen outcome.
 * @throws ExecutionException declared by the wait on the listen outcome.
 */
private ProtonServer createServer(Handler<ProtonConnection> serverConnHandler) throws InterruptedException,
                                                                               ExecutionException {
  ProtonServer protonServer = ProtonServer.create(vertx);
  protonServer.connectHandler(serverConnHandler);

  FutureHandler<ProtonServer, AsyncResult<ProtonServer>> listenOutcome = FutureHandler.asyncResult();
  protonServer.listen(0, listenOutcome);

  // wait here until the listen attempt has finished
  listenOutcome.get();
  return protonServer;
}
 
示例16
/**
 * Checks that {@code init} is invoked on an authenticator created by a custom
 * factory when a client connects to the server.
 *
 * @param context The vertx-unit test context.
 */
@Test(timeout = 20000)
public void testCustomAuthenticatorHasInitCalled(TestContext context) {
  Async initInvoked = context.async();

  // A factory whose authenticators signal the call to init() and never succeed.
  ProtonSaslAuthenticatorFactory signallingFactory = new ProtonSaslAuthenticatorFactory() {
    @Override
    public ProtonSaslAuthenticator create() {
      return new ProtonSaslAuthenticator() {
        @Override
        public void init(NetSocket socket, ProtonConnection protonConnection, Transport transport) {
          initInvoked.complete();
        }

        @Override
        public void process(Handler<Boolean> completionHandler) {
          completionHandler.handle(false);
        }

        @Override
        public boolean succeeded() {
          return false;
        }
      };
    }
  };

  ProtonServer.create(vertx)
      .saslAuthenticatorFactory(signallingFactory)
      .connectHandler(serverConnection -> {
      })
      .listen(listenResult -> {
        ProtonClient.create(vertx).connect("localhost", listenResult.result().actualPort(),
            connectResult -> {
            });
      });
}
 
示例17
/**
 * Checks that a client presenting bad credentials fails to connect to a server
 * that uses a custom SASL authenticator, and that the server's connect handler
 * is not invoked for that connection.
 *
 * @param context The vertx-unit test context.
 */
@Test(timeout = 20000)
public void testCustomAuthenticatorFailsAuthentication(TestContext context) {
  Async connectAttemptDone = context.async();

  ProtonServer protonServer = ProtonServer.create(vertx);
  protonServer.saslAuthenticatorFactory(new TestPlainAuthenticatorFactory());
  protonServer.connectHandler(serverConnection -> {
    context.fail("Handler should not be called for connection that failed authentication");
  });
  protonServer.listen(listenResult -> {
    ProtonClient.create(vertx).connect("localhost", listenResult.result().actualPort(), BAD_USER, PASSWD,
        connectResult -> {
          context.assertFalse(connectResult.succeeded());
          connectAttemptDone.complete();
        });
  });

  connectAttemptDone.awaitSuccess();
}
 
示例18
/**
 * Checks that the authenticator factory is asked for a new authenticator for
 * each connection: its create count is 1 after the first client connection and
 * 2 after the second.
 *
 * @param context The vertx-unit test context.
 */
@Test(timeout = 20000)
public void testAuthenticatorCreatedPerConnection(TestContext context) {
  Async firstConnectDone = context.async();
  Async secondConnectDone = context.async();
  AtomicInteger listenPort = new AtomicInteger(-1);

  final TestPlainAuthenticatorFactory countingFactory = new TestPlainAuthenticatorFactory();

  ProtonServer protonServer = ProtonServer.create(vertx);
  protonServer.saslAuthenticatorFactory(countingFactory);
  protonServer.connectHandler(serverConnection -> {
    // The expected auth detail (just a String here) must be present in the attachments.
    String recorded = serverConnection.attachments().get(AUTH_KEY, String.class);
    context.assertEquals(AUTH_VALUE, recorded);
  });
  protonServer.listen(listenResult -> {
    // remember the port so that the second connection can be made from the test thread
    listenPort.set(listenResult.result().actualPort());
    ProtonClient.create(vertx).connect("localhost", listenPort.intValue(), GOOD_USER, PASSWD,
        firstResult -> {
          context.assertTrue(firstResult.succeeded());
          firstResult.result().disconnect();
          firstConnectDone.complete();
        });
  });

  firstConnectDone.awaitSuccess();
  context.assertEquals(1, countingFactory.getCreateCount(), "unexpected authenticator count");

  ProtonClient.create(vertx).connect("localhost", listenPort.intValue(), GOOD_USER, PASSWD, secondResult -> {
    context.assertTrue(secondResult.succeeded());
    secondResult.result().disconnect();
    secondConnectDone.complete();
  });

  secondConnectDone.awaitSuccess();
  context.assertEquals(2, countingFactory.getCreateCount(), "unexpected authenticator count");
}
 
示例19
/**
 * Connects a client to a server that uses a delayed authenticator and checks
 * that the connect attempt does not complete before the delay has elapsed,
 * that its outcome matches {@code passAuthentication}, that the server's connect
 * handler was invoked only if authentication passed, and that exactly one
 * authenticator was created.
 *
 * @param context The vertx-unit test context.
 * @param passAuthentication Whether the authenticator should let the client pass.
 */
private void doTestAsyncServerAuthenticatorTestImpl(TestContext context, boolean passAuthentication) {
  Async connectAsync = context.async();
  AtomicBoolean serverHandlerInvoked = new AtomicBoolean();

  final long authDelay = 750;
  TestAsyncAuthenticator delayedAuthenticator = new TestAsyncAuthenticator(authDelay, passAuthentication);
  TestAsyncAuthenticatorFactory authenticatorFactory = new TestAsyncAuthenticatorFactory(delayedAuthenticator);

  ProtonServer.create(vertx)
      .saslAuthenticatorFactory(authenticatorFactory)
      .connectHandler(serverConnection -> serverHandlerInvoked.set(true))
      .listen(listenResult -> {
        final long connectStarted = System.currentTimeMillis();
        ProtonClient.create(vertx).connect("localhost", listenResult.result().actualPort(), GOOD_USER, PASSWD, connectResult -> {
          // The connect attempt must have taken at least as long as the auth delay.
          long elapsed = System.currentTimeMillis() - connectStarted;
          context.assertTrue(elapsed >= authDelay, "Connect completed before expected time delay elapsed! " + elapsed);

          if (!passAuthentication) {
            context.assertFalse(connectResult.succeeded(), "Expected connect to fail");
          } else {
            context.assertTrue(connectResult.succeeded(), "Expected connect to succeed");
            connectResult.result().disconnect();
          }

          connectAsync.complete();
        });
      });

  connectAsync.awaitSuccess();

  if (!passAuthentication) {
    context.assertFalse(serverHandlerInvoked.get(), "Server handler should not have been called");
  } else {
    context.assertTrue(serverHandlerInvoked.get(), "Server handler should have been called");
  }

  context.assertEquals(1, authenticatorFactory.getCreateCount(), "unexpected authenticator creation count");
}
 
示例20
/**
 * Checks that the adapter closes a device's command consumer and sends an
 * empty notification with a TTD of 0 downstream once the connection to the
 * device is lost.
 *
 * @param ctx The vert.x test context.
 * @param connectionLossTrigger The code that simulates the loss of the device's connection.
 * @throws InterruptedException if the test execution gets interrupted.
 */
@SuppressWarnings("unchecked")
private void testAdapterClosesCommandConsumer(
        final VertxTestContext ctx,
        final Handler<ProtonConnection> connectionLossTrigger) throws InterruptedException {

    // GIVEN an AMQP adapter with a downstream event sender
    final Promise<ProtonDelivery> sendOutcome = Promise.promise();
    sendOutcome.complete();
    final DownstreamSender eventSender = givenAnEventSender(sendOutcome);
    final ProtonServer amqpServer = getAmqpServer();
    final VertxBasedAmqpProtocolAdapter adapterUnderTest = getAdapter(amqpServer);

    // that has been started
    final Promise<Void> startup = Promise.promise();
    startup.future().onComplete(ctx.completing());
    adapterUnderTest.start(startup);
    assertThat(ctx.awaitCompletion(2, TimeUnit.SECONDS)).isTrue();

    // and to which an authenticated device is connected
    final Device device = new Device(TEST_TENANT_ID, TEST_DEVICE);
    final Record attachments = new RecordImpl();
    attachments.set(AmqpAdapterConstants.KEY_CLIENT_DEVICE, Device.class, device);
    final ProtonConnection con = mock(ProtonConnection.class);
    when(con.attachments()).thenReturn(attachments);
    final ArgumentCaptor<Handler<ProtonConnection>> connectHandlerCaptor = ArgumentCaptor.forClass(Handler.class);
    verify(amqpServer).connectHandler(connectHandlerCaptor.capture());
    connectHandlerCaptor.getValue().handle(con);

    // which has opened a link for receiving commands
    final ProtocolAdapterCommandConsumer consumer = mock(ProtocolAdapterCommandConsumer.class);
    when(consumer.close(any())).thenReturn(Future.succeededFuture());
    when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), any(), any()))
        .thenReturn(Future.succeededFuture(consumer));
    final String commandAddress = getCommandEndpoint();
    final ProtonSender commandLink = getSender(commandAddress);

    adapterUnderTest.handleRemoteSenderOpenForCommands(con, commandLink);

    // WHEN the connection to the device is lost
    connectionLossTrigger.handle(con);

    // THEN the adapter closes the command consumer
    verify(consumer).close(any());
    // and the last of the two events sent downstream is an empty notification with TTD = 0
    final ArgumentCaptor<Message> sentMessages = ArgumentCaptor.forClass(Message.class);
    verify(eventSender, times(2)).sendAndWaitForOutcome(sentMessages.capture(), any());
    final Message lastMessage = sentMessages.getValue();
    assertThat(lastMessage.getContentType()).isEqualTo(EventConstants.CONTENT_TYPE_EMPTY_NOTIFICATION);
    assertThat(MessageHelper.getTimeUntilDisconnect(lastMessage)).isEqualTo(0);
}
 
示例21
/**
 * Creates a Proton server with the given options that uses this object's
 * SASL authenticator factory.
 *
 * @param options The options to create the server with.
 * @return The server.
 */
private ProtonServer createProtonServer(final ProtonServerOptions options) {
    final ProtonServer protonServer = ProtonServer.create(vertx, options);
    return protonServer.saslAuthenticatorFactory(saslAuthenticatorFactory);
}
 
示例22
/**
 * Starts a mock AMQP server on port 12347 before each test.
 * <p>
 * The server opens every connection, session and link requested by a peer.
 * For each message received it sends a response carrying the request's address,
 * correlation id and reply-to with a body of {@code true}. The response is sent
 * on the first sender link that has been opened.
 *
 * @throws InterruptedException if the thread is interrupted while waiting for the listen attempt to finish.
 * @throws IllegalStateException if the server could not start listening.
 */
@BeforeEach
public void setup() throws InterruptedException {
    vertx = Vertx.vertx();
    server = ProtonServer.create(vertx);
    CountDownLatch latch = new CountDownLatch(1);
    // Written by the listen callback before countDown() and read after await() only.
    Throwable[] listenFailure = new Throwable[1];
    CompletableFuture<ProtonSender> futureSender = new CompletableFuture<>();
    server.connectHandler(conn -> {
        conn.closeHandler(c -> {
            conn.close();
            conn.disconnect();
        });
        conn.disconnectHandler(c -> {
            conn.disconnect();
        }).open();

        conn.sessionOpenHandler(ProtonSession::open);

        conn.receiverOpenHandler(receiver -> {
            System.out.println("Receiver open");
            receiver.setTarget(receiver.getRemoteTarget());
            receiver.handler((delivery, message) -> {
                Message response = Message.Factory.create();
                response.setAddress(message.getAddress());
                response.setBody(new AmqpValue(true));
                response.setCorrelationId(message.getCorrelationId());
                response.setReplyTo(message.getReplyTo());
                // Do not wait for the sender with a blocking get(): this callback is
                // presumably run on the event loop, which would then be unable to
                // process the sender link's open frame. Send once the sender is available.
                futureSender
                        .thenAccept(sender -> sender.send(response))
                        .exceptionally(e -> {
                            e.printStackTrace();
                            return null;
                        });
            });
            receiver.open();
        });

        conn.senderOpenHandler(sender -> {
            sender.setSource(sender.getRemoteSource());
            sender.open();
            // only the first sender opened is used for sending responses
            futureSender.complete(sender);
        });
    }).listen(12347, res -> {
        if (res.failed()) {
            listenFailure[0] = res.cause();
        }
        latch.countDown();
    });
    latch.await();
    // Fail fast instead of letting the tests run against a server that is not listening.
    if (listenFailure[0] != null) {
        throw new IllegalStateException("Mock AMQP server failed to listen on port 12347", listenFailure[0]);
    }
}
 
示例23
/**
 * Starts a mock AMQP server on an ephemeral port before each test and records
 * the port in {@code actualPort}.
 * <p>
 * The server opens every connection, session and link requested by a peer.
 * Messages received are added to {@code inbox}. For each sender link opened,
 * a timer firing every 100 ms sends the next message from {@code outbox},
 * if there is one.
 *
 * @throws Exception if waiting for the listen attempt is interrupted or the
 *                   server could not start listening.
 */
@BeforeEach
public void setup() throws Exception {
    vertx = Vertx.vertx();
    server = ProtonServer.create(vertx);
    inbox = new LinkedBlockingDeque<>();
    outbox = new LinkedBlockingDeque<>();
    CountDownLatch latch = new CountDownLatch(1);
    // Written by the listen callback before countDown() and read after await() only.
    Throwable[] listenFailure = new Throwable[1];
    server.connectHandler(conn -> {
        conn.closeHandler(c -> {
            conn.close();
            conn.disconnect();
        });
        conn.disconnectHandler(c -> {
            conn.disconnect();
        }).open();

        conn.sessionOpenHandler(ProtonSession::open);

        conn.receiverOpenHandler(receiver -> {
            log.debug("Receiver open");
            receiver.setTarget(receiver.getRemoteTarget());
            receiver.handler((delivery, message) -> {
                inbox.add(message);
            });
            receiver.open();
        });

        conn.senderOpenHandler(sender -> {
            vertx.setPeriodic(100, id -> {
                // Non-blocking poll: an empty outbox simply means "try again on the next tick".
                Message m = outbox.poll();
                if (m != null) {
                    sender.send(m);
                }
            });
            sender.open();
        });
    }).listen(0, res -> {
        if (res.failed()) {
            listenFailure[0] = res.cause();
        }
        latch.countDown();
    });
    latch.await();
    // Fail fast instead of reading the port of a server that is not listening.
    if (listenFailure[0] != null) {
        throw new IllegalStateException("Mock AMQP server failed to start listening", listenFailure[0]);
    }
    actualPort = server.actualPort();
    log.debug("Using port {}", actualPort);
}
 
示例24
/**
 * Asks the wrapped server to listen on the given port and host.
 *
 * @param port The port passed on to the wrapped server.
 * @param host The host passed on to the wrapped server.
 * @param handler The handler to notify of the outcome, with this server as the result on success.
 * @return This server.
 */
@Override
public ProtonServerImpl listen(int port, String host, Handler<AsyncResult<ProtonServer>> handler) {
  server.listen(
      port,
      host,
      convertHandler(handler));
  return this;
}
 
示例25
/**
 * Asks the wrapped server to listen, without giving a port or host.
 *
 * @param handler The handler to notify of the outcome, with this server as the result on success.
 * @return This server.
 */
@Override
public ProtonServerImpl listen(Handler<AsyncResult<ProtonServer>> handler) {
  server.listen(
      convertHandler(handler));
  return this;
}
 
示例26
/**
 * Asks the wrapped server to listen using the given integer argument
 * (presumably the port, as in the three-argument overload).
 *
 * @param i The value passed on to the wrapped server.
 * @param handler The handler to notify of the outcome, with this server as the result on success.
 * @return This server.
 */
@Override
public ProtonServerImpl listen(int i, Handler<AsyncResult<ProtonServer>> handler) {
  server.listen(
      i,
      convertHandler(handler));
  return this;
}
 
示例27
/**
 * Checks that a tracker-producing subscriber opens a link to the server,
 * delivers the single published message, and that the server then sees the
 * link being closed by the client.
 *
 * @param context The vertx-unit test context.
 * @throws Exception if the server cannot be created.
 */
@Test(timeout = 20000)
public void testCreateUseAndCancelSubscriber(TestContext context) throws Exception {
  // the shared server is not used by this test
  server.close();

  final Async linkOpenedOnServer = context.async();
  final Async messageReceivedOnServer = context.async();
  final Async linkClosedOnServer = context.async();

  ProtonServer protonServer = null;
  try {
    // ===== Server Handling =====

    protonServer = createServer(conn -> {
      conn.openHandler(opened -> conn.open());

      conn.sessionOpenHandler(session -> {
        session.open();
      });

      conn.receiverOpenHandler(receiver -> {
        LOG.trace("Server receiver opened");

        receiver.handler((delivery, msg) -> {
          // The message that was sent has arrived
          if (LOG.isTraceEnabled()) {
            LOG.trace("Server got msg: " + getMessageBody(context, msg));
          }
          validateMessage(context, 1, "1", msg);
          messageReceivedOnServer.complete();
        });

        receiver.closeHandler(remoteClose -> {
          receiver.close();
          linkClosedOnServer.complete();
        });

        // Naively mirror the remote target as the local terminus
        receiver.setTarget(receiver.getRemoteTarget().copy());

        receiver.open();

        linkOpenedOnServer.complete();
      });
    });

    // ===== Client Handling =====

    final int port = protonServer.actualPort();
    ProtonClient.create(vertx).connect("localhost", port, connectResult -> {
      context.assertTrue(connectResult.succeeded());

      ProtonConnection connection = connectResult.result();
      connection.open();

      ProtonSubscriber<Tracker> subscriber = ProtonStreams.createTrackerProducer(connection,"myAddress");

      Tracker tracker = Tracker.create(message("1"), settled -> {
        context.assertTrue(settled.isAccepted(), "msg should be accepted");
        context.assertTrue(settled.isRemotelySettled(), "msg should be remotely settled");
      });

      // A publisher that emits the single tracker
      Publisher<Tracker> producer = Flowable.just(tracker);
      producer.subscribe(subscriber);
    });

    linkOpenedOnServer.awaitSuccess();
    messageReceivedOnServer.awaitSuccess();
    linkClosedOnServer.awaitSuccess();
  } finally {
    if (protonServer != null) {
      protonServer.close();
    }
  }
}
 
示例28
/**
 * Checks that the subscription to the publisher feeding a tracker-producing
 * subscriber is cancelled when the server closes the link right after opening it.
 *
 * @param context The vertx-unit test context.
 * @throws Exception if the server cannot be created.
 */
@Test(timeout = 20000)
public void testSubCancelledOnLinkClose(TestContext context) throws Exception {
  // the shared server is not used by this test
  server.close();

  final Async linkOpenedOnServer = context.async();
  final Async subscriptionCancelled = context.async();

  ProtonServer protonServer = null;
  try {
    // ===== Server Handling =====

    protonServer = createServer(conn -> {
      conn.openHandler(opened -> conn.open());

      conn.sessionOpenHandler(session -> {
        session.open();
      });

      conn.receiverOpenHandler(receiver -> {
        LOG.trace("Server receiver opened");
        // Naively mirror the remote target as the local terminus
        receiver.setTarget(receiver.getRemoteTarget().copy());

        receiver.open();

        linkOpenedOnServer.complete();

        // close the link again straight away
        receiver.close();
      });
    });

    // ===== Client Handling =====

    final int port = protonServer.actualPort();
    ProtonClient.create(vertx).connect("localhost", port, connectResult -> {
      context.assertTrue(connectResult.succeeded());

      ProtonConnection connection = connectResult.result();
      connection.open();

      ProtonSubscriber<Tracker> subscriber = ProtonStreams.createTrackerProducer(connection,"myAddress");

      // A publisher that never emits but signals when its subscription is cancelled.
      Publisher<Tracker> producer = Flowable.<Tracker>never()
          .doOnCancel(() -> {
            LOG.trace("Cancelled!");
            subscriptionCancelled.complete();
          });
      producer.subscribe(subscriber);
    });

    linkOpenedOnServer.awaitSuccess();
    subscriptionCancelled.awaitSuccess();
  } finally {
    if (protonServer != null) {
      protonServer.close();
    }
  }
}
 
示例29
/**
 * Checks that the subscription to the publisher feeding a tracker-producing
 * subscriber is not cancelled while the link is open, and is cancelled after
 * the server has been closed.
 *
 * @param context The vertx-unit test context.
 * @throws Exception if the server cannot be created.
 */
@Test(timeout = 20000)
public void testSubCancelledOnConnectionEnd(TestContext context) throws Exception {
  // the shared server is not used by this test
  server.close();

  final Async linkOpenedOnServer = context.async();
  final Async subscriptionCancelled = context.async();

  ProtonServer protonServer = null;
  try {
    // ===== Server Handling =====

    protonServer = createServer(conn -> {
      conn.openHandler(opened -> conn.open());

      conn.sessionOpenHandler(session -> {
        session.open();
      });

      conn.receiverOpenHandler(receiver -> {
        LOG.trace("Server receiver opened");
        // Naively mirror the remote target as the local terminus
        receiver.setTarget(receiver.getRemoteTarget().copy());

        receiver.open();

        linkOpenedOnServer.complete();
      });
    });

    // ===== Client Handling =====

    final int port = protonServer.actualPort();
    ProtonClient.create(vertx).connect("localhost", port, connectResult -> {
      context.assertTrue(connectResult.succeeded());

      ProtonConnection connection = connectResult.result();
      connection.open();

      ProtonSubscriber<Tracker> subscriber = ProtonStreams.createTrackerProducer(connection,"myAddress");

      // A publisher that never emits but signals when its subscription is cancelled.
      Publisher<Tracker> producer = Flowable.<Tracker>never()
          .doOnCancel(() -> {
            LOG.trace("Cancelled!");
            subscriptionCancelled.complete();
          });
      producer.subscribe(subscriber);
    });

    linkOpenedOnServer.awaitSuccess();
    context.assertFalse(subscriptionCancelled.isCompleted());

    // Close the server now; clearing the reference keeps the finally block from closing it again.
    protonServer.close();
    protonServer = null;
    subscriptionCancelled.awaitSuccess();
  } finally {
    if (protonServer != null) {
      protonServer.close();
    }
  }
}
 
示例30
/**
 * Checks that a link name configured via the subscriber options is the name of
 * the link the server sees, and that the single published message is delivered
 * over that link before the server sees it being closed.
 *
 * @param context The vertx-unit test context.
 * @throws Exception if the server cannot be created.
 */
@Test(timeout = 20000)
public void testConfigureProducerLinkName(TestContext context) throws Exception {
  // the shared server is not used by this test
  server.close();

  final Async linkOpenedOnServer = context.async();
  final Async messageReceivedOnServer = context.async();
  final Async linkClosedOnServer = context.async();

  final String linkName = "testConfigureProducerLinkName";

  ProtonServer protonServer = null;
  try {
    // ===== Server Handling =====

    protonServer = createServer(conn -> {
      conn.openHandler(opened -> conn.open());

      conn.sessionOpenHandler(session -> {
        session.open();
      });

      conn.receiverOpenHandler(receiver -> {
        LOG.trace("Server receiver opened");

        // The link must carry the configured name
        context.assertEquals(linkName, receiver.getName(), "unexpected link name");

        receiver.handler((delivery, msg) -> {
          // The message that was sent has arrived
          if (LOG.isTraceEnabled()) {
            LOG.trace("Server got msg: " + getMessageBody(context, msg));
          }
          validateMessage(context, 1, "1", msg);
          messageReceivedOnServer.complete();
        });

        receiver.closeHandler(remoteClose -> {
          receiver.close();
          linkClosedOnServer.complete();
        });

        // Naively mirror the remote target as the local terminus
        receiver.setTarget(receiver.getRemoteTarget().copy());

        receiver.open();

        linkOpenedOnServer.complete();
      });
    });

    // ===== Client Handling =====

    final int port = protonServer.actualPort();
    ProtonClient.create(vertx).connect("localhost", port, connectResult -> {
      context.assertTrue(connectResult.succeeded());

      ProtonConnection connection = connectResult.result();
      connection.open();

      // Create the subscriber with the link name under test
      ProtonSubscriberOptions subscriberOptions = new ProtonSubscriberOptions().setLinkName(linkName);

      ProtonSubscriber<Tracker> subscriber = ProtonStreams.createTrackerProducer(connection,"myAddress", subscriberOptions);

      // A publisher that emits the single tracker
      Tracker tracker = Tracker.create(message("1"));
      Publisher<Tracker> producer = Flowable.just(tracker);
      producer.subscribe(subscriber);
    });

    linkOpenedOnServer.awaitSuccess();
    messageReceivedOnServer.awaitSuccess();
    linkClosedOnServer.awaitSuccess();
  } finally {
    if (protonServer != null) {
      protonServer.close();
    }
  }
}