Java源码示例:io.vertx.proton.ProtonClientOptions

示例1
/**
 * Verifies that a client which has disconnected from the server is able to
 * establish a new connection afterwards.
 *
 * @param ctx The test execution context.
 */
@Test
public void testConnectSucceedsAfterDisconnect(final VertxTestContext ctx) {

    honoConnection.connect()
        .compose(connected -> {
            // GIVEN a client that is connected to a server
            // WHEN the client disconnects
            final Promise<Void> disconnectAttempt = Promise.promise();
            honoConnection.disconnect(disconnectAttempt);
            // invoke the close handler that has been registered on the connection
            // with a succeeded result
            @SuppressWarnings("unchecked")
            final ArgumentCaptor<Handler<AsyncResult<ProtonConnection>>> closeHandlerCaptor = ArgumentCaptor.forClass(Handler.class);
            ctx.verify(() -> verify(con).closeHandler(closeHandlerCaptor.capture()));
            final Handler<AsyncResult<ProtonConnection>> registeredCloseHandler = closeHandlerCaptor.getValue();
            registeredCloseHandler.handle(Future.succeededFuture(con));
            return disconnectAttempt.future();
        })
        // AND tries to reconnect again
        .compose(disconnected -> honoConnection.connect(new ProtonClientOptions()))
        // THEN the connection succeeds
        .onComplete(ctx.completing());
}
 
示例2
/**
 * Records the given close and disconnect handlers and completes the connection
 * attempt immediately.
 * <p>
 * While the latch of expected failing attempts has a positive count, the attempt
 * is failed with the configured cause. Otherwise the attempt succeeds with the
 * configured connection. The corresponding latch is counted down in either case.
 */
@Override
public void connect(
        final ProtonClientOptions options,
        final String username,
        final String password,
        final String containerId,
        final Handler<AsyncResult<ProtonConnection>> closeHandler,
        final Handler<ProtonConnection> disconnectHandler,
        final Handler<AsyncResult<ProtonConnection>> connectionResultHandler) {

    this.closeHandler = closeHandler;
    this.disconnectHandler = disconnectHandler;

    final boolean failureExpected = expectedFailingConnectionAttempts.getCount() > 0;
    if (!failureExpected) {
        expectedSucceedingConnectionAttempts.countDown();
        connectionResultHandler.handle(Future.succeededFuture(connectionToCreate));
        return;
    }
    expectedFailingConnectionAttempts.countDown();
    connectionResultHandler.handle(Future.failedFuture(causeForFailure));
}
 
示例3
/**
 * Connects to the AMQP protocol adapter using a username and password.
 * <p>
 * The connection options are copied from the default options and have the
 * SASL PLAIN mechanism enabled in addition.
 *
 * @param username The username to use for authentication.
 * @param password The password to use for authentication.
 * @return A future indicating the outcome of the connection attempt after it
 *         has been processed by {@code handleConnectAttempt}.
 */
protected Future<ProtonConnection> connectToAdapter(final String username, final String password) {

    final ProtonClientOptions plainOptions = new ProtonClientOptions(defaultOptions);
    plainOptions.addEnabledSaslMechanism(ProtonSaslPlainImpl.MECH_NAME);

    final Promise<ProtonConnection> connectAttempt = Promise.promise();
    ProtonClient.create(VERTX).connect(
            plainOptions,
            IntegrationTestSupport.AMQP_HOST,
            IntegrationTestSupport.AMQPS_PORT,
            username,
            password,
            connectAttempt);
    return connectAttempt.future().compose(this::handleConnectAttempt);
}
 
示例4
/**
 * Connects to the AMQP protocol adapter using a client certificate.
 * <p>
 * The connection options are copied from the default options, carry the
 * certificate's key/cert options and have the SASL EXTERNAL mechanism enabled.
 *
 * @param clientCertificate The certificate to use for authentication.
 * @return A future indicating the outcome of the connection attempt after it
 *         has been processed by {@code handleConnectAttempt}.
 */
protected Future<ProtonConnection> connectToAdapter(final SelfSignedCertificate clientCertificate) {

    final ProtonClientOptions certOptions = new ProtonClientOptions(defaultOptions);
    certOptions.setKeyCertOptions(clientCertificate.keyCertOptions());
    certOptions.addEnabledSaslMechanism(ProtonSaslExternalImpl.MECH_NAME);

    final Promise<ProtonConnection> connectAttempt = Promise.promise();
    ProtonClient.create(VERTX).connect(
            certOptions,
            IntegrationTestSupport.AMQP_HOST,
            IntegrationTestSupport.AMQPS_PORT,
            connectAttempt);
    return connectAttempt.future().compose(this::handleConnectAttempt);
}
 
示例5
/**
 * Applies the TLS related settings of the given configuration to the client options.
 * <p>
 * TLS is switched on if it is enabled explicitly in the configuration or if the
 * configuration provides trust options. Once TLS is on, the hostname verification
 * algorithm is set to {@code "HTTPS"} if the configuration requires hostname
 * verification and to the empty string otherwise.
 *
 * @param clientOptions The options to modify.
 * @param config The configuration to read the TLS settings from.
 */
private void addTlsTrustOptions(final ProtonClientOptions clientOptions, final ClientConfigProperties config) {

    if (config.isTlsEnabled()) {
        clientOptions.setSsl(true);
    }

    final TrustOptions configuredTrustOptions = config.getTrustOptions();
    if (configuredTrustOptions != null) {
        clientOptions.setSsl(true);
        clientOptions.setTrustOptions(configuredTrustOptions);
    }

    if (!clientOptions.isSsl()) {
        // nothing to verify on a plain connection
        return;
    }
    final String verificationAlgorithm = config.isHostnameVerificationRequired() ? "HTTPS" : "";
    clientOptions.setHostnameVerificationAlgorithm(verificationAlgorithm);
}
 
示例6
/**
 * Handles the outcome of a connection attempt that is reported after the
 * attempt has already been considered timed out.
 * <p>
 * The outcome is logged at debug level only. A connection that has been
 * established nevertheless is disconnected again.
 *
 * @param conAttempt The outcome of the connection attempt.
 * @param clientOptions The options that have been used for the attempt.
 */
private void handleTimedOutConnectionAttemptResult(final AsyncResult<ProtonConnection> conAttempt, final ProtonClientOptions clientOptions) {
    final String scheme = clientOptions.isSsl() ? "amqps" : "amqp";
    if (!conAttempt.succeeded()) {
        logger.debug("ignoring failed connection attempt to AMQP 1.0 container [{}://{}:{}, role: {}]: attempt already timed out",
                scheme,
                config.getHost(),
                config.getPort(),
                config.getServerRole(),
                conAttempt.cause());
        return;
    }
    logger.debug("ignoring successful connection attempt to AMQP 1.0 container [{}://{}:{}, role: {}]: attempt already timed out",
            scheme,
            config.getHost(),
            config.getPort(),
            config.getServerRole());
    // nobody is interested in the connection anymore
    conAttempt.result().disconnect();
}
 
示例7
/**
 * Verifies that the factory does not enable the SASL PLAIN mechanism if both
 * the username and the password are empty strings.
 */
@SuppressWarnings("unchecked")
@Test
public void testConnectDoesNotUseSaslPlainForEmptyUsernameAndPassword() {

    // GIVEN a factory configured to connect to a server
    final ProtonClient protonClient = mock(ProtonClient.class);
    final ConnectionFactoryImpl connectionFactory = new ConnectionFactoryImpl(vertx, props);
    connectionFactory.setProtonClient(protonClient);
    final ProtonClientOptions clientOptions = new ProtonClientOptions();

    // WHEN connecting to the server using empty strings for username and password
    connectionFactory.connect(clientOptions, "", "", null, null, c -> {});

    // THEN the options passed to the client do not contain the PLAIN mechanism
    final ArgumentCaptor<ProtonClientOptions> usedOptions = ArgumentCaptor.forClass(ProtonClientOptions.class);
    verify(protonClient).connect(usedOptions.capture(), anyString(), anyInt(), eq(""), eq(""), any(Handler.class));
    final boolean plainEnabled = usedOptions.getValue().getEnabledSaslMechanisms().contains("PLAIN");
    assertFalse(plainEnabled);
}
 
示例8
/**
 * Verifies that the factory enables the SASL PLAIN mechanism if both the
 * username and the password are non-empty strings.
 */
@SuppressWarnings("unchecked")
@Test
public void testConnectAddsSaslPlainForNonEmptyUsernameAndPassword() {

    // GIVEN a factory configured to connect to a server
    final ProtonClient protonClient = mock(ProtonClient.class);
    final ConnectionFactoryImpl connectionFactory = new ConnectionFactoryImpl(vertx, props);
    connectionFactory.setProtonClient(protonClient);
    final ProtonClientOptions clientOptions = new ProtonClientOptions();

    // WHEN connecting to the server using non-empty strings for username and password
    connectionFactory.connect(clientOptions, "user", "pw", null, null, c -> {});

    // THEN the options passed to the client contain the PLAIN mechanism
    final ArgumentCaptor<ProtonClientOptions> usedOptions = ArgumentCaptor.forClass(ProtonClientOptions.class);
    verify(protonClient).connect(usedOptions.capture(), anyString(), anyInt(), eq("user"), eq("pw"), any(Handler.class));
    final boolean plainEnabled = usedOptions.getValue().getEnabledSaslMechanisms().contains("PLAIN");
    assertTrue(plainEnabled);
}
 
示例9
/**
 * Verifies that the factory connects to the peer using TLS if TLS has been
 * enabled explicitly, even though no trust store has been configured.
 */
@SuppressWarnings("unchecked")
@Test
public void testConnectEnablesSslIfExplicitlyConfigured() {

    // GIVEN a factory configured to connect to a server using TLS
    final ClientConfigProperties tlsConfig = new ClientConfigProperties();
    tlsConfig.setHost("remote.host");
    tlsConfig.setTlsEnabled(true);
    final ProtonClient protonClient = mock(ProtonClient.class);
    final ConnectionFactoryImpl connectionFactory = new ConnectionFactoryImpl(vertx, tlsConfig);
    connectionFactory.setProtonClient(protonClient);

    // WHEN connecting to the server
    connectionFactory.connect(null, null, null, c -> {});

    // THEN the options passed to the client have TLS switched on
    final ArgumentCaptor<ProtonClientOptions> usedOptions = ArgumentCaptor.forClass(ProtonClientOptions.class);
    verify(protonClient).connect(usedOptions.capture(), eq("remote.host"), anyInt(), any(), any(), any(Handler.class));
    final boolean tlsEnabled = usedOptions.getValue().isSsl();
    assertTrue(tlsEnabled);
}
 
示例10
/**
 * Verifies that the factory connects to the peer using TLS if a trust store
 * has been configured, even though TLS has not been enabled explicitly.
 */
@SuppressWarnings("unchecked")
@Test
public void testConnectEnablesSslIfTrustStoreIsConfigured() {

    // GIVEN a factory configured to use a specific trust store
    final ClientConfigProperties trustStoreConfig = new ClientConfigProperties();
    trustStoreConfig.setHost("remote.host");
    trustStoreConfig.setTrustStorePath(PREFIX_KEY_PATH + "trusted-certs.pem");
    final ProtonClient protonClient = mock(ProtonClient.class);
    final ConnectionFactoryImpl connectionFactory = new ConnectionFactoryImpl(vertx, trustStoreConfig);
    connectionFactory.setProtonClient(protonClient);

    // WHEN connecting to the server
    connectionFactory.connect(null, null, null, c -> {});

    // THEN the options passed to the client have TLS switched on
    final ArgumentCaptor<ProtonClientOptions> usedOptions = ArgumentCaptor.forClass(ProtonClientOptions.class);
    verify(protonClient).connect(usedOptions.capture(), eq("remote.host"), anyInt(), any(), any(), any(Handler.class));
    final boolean tlsEnabled = usedOptions.getValue().isSsl();
    assertTrue(tlsEnabled);
}
 
示例11
/**
 * Creates the options to use with the ProtonClient.
 * <p>
 * If the AMQP configuration contains a non-empty certificate directory, TLS is
 * enabled using the {@code ca.crt}, {@code tls.crt} and {@code tls.key} files
 * from that directory, the SASL EXTERNAL mechanism is enabled and the hostname
 * verification algorithm is set to the empty string.
 *
 * @return ProtonClient options instance
 */
private ProtonClientOptions createClientOptions() {

    ProtonClientOptions clientOptions = new ProtonClientOptions();
    clientOptions.setConnectTimeout(1000);
    // reconnect forever, every 1000 millisecs
    clientOptions.setReconnectAttempts(-1);
    clientOptions.setReconnectInterval(1000);

    String certDir = this.bridgeConfig.getAmqpConfig().getCertDir();
    if (certDir == null || certDir.length() == 0) {
        // no certificates configured, stick to a plain connection
        return clientOptions;
    }

    log.info("Enabling SSL configuration for AMQP with TLS certificates from {}", certDir);

    PemTrustOptions trustedCa = new PemTrustOptions()
            .addCertPath(new File(certDir, "ca.crt").getAbsolutePath());
    PemKeyCertOptions clientKeyCert = new PemKeyCertOptions()
            .addCertPath(new File(certDir, "tls.crt").getAbsolutePath())
            .addKeyPath(new File(certDir, "tls.key").getAbsolutePath());

    clientOptions.setSsl(true)
            .addEnabledSaslMechanism("EXTERNAL")
            .setHostnameVerificationAlgorithm("")
            .setPemTrustOptions(trustedCa)
            .setPemKeyCertOptions(clientKeyCert);

    return clientOptions;
}
 
示例12
/**
 * Creates a client for the messaging endpoint of the given address space.
 * <p>
 * The endpoint is looked up by its service name and, if that yields nothing,
 * via the external endpoint reported by Kubernetes. The client uses TLS, trusts
 * all certificates and has the hostname verification algorithm set to the empty
 * string. If the messaging endpoint is not resolvable, the client connects to
 * {@code localhost:443} instead and passes the messaging endpoint's host as
 * SNI server name.
 *
 * @param terminusFactory The terminus factory to pass on to the created client.
 * @param qos The quality of service to pass on to the created client.
 * @param addressSpace The address space to connect to, must not be {@code null}.
 * @return The created client.
 * @throws Exception if the endpoint lookup, the probing or the client creation fails.
 */
public AmqpClient createClient(TerminusFactory terminusFactory, ProtonQoS qos, AddressSpace addressSpace) throws Exception {
    assertNotNull(addressSpace, "Address space is null");

    Endpoint messagingEndpoint = AddressSpaceUtils.getEndpointByServiceName(addressSpace, "messaging");
    if (messagingEndpoint == null) {
        String externalEndpointName = AddressSpaceUtils.getExternalEndpointName(addressSpace, "messaging");
        messagingEndpoint = Kubernetes.getInstance().getExternalEndpoint(externalEndpointName + "-" + AddressSpaceUtils.getAddressSpaceInfraUuid(addressSpace));
    }

    ProtonClientOptions clientOptions = new ProtonClientOptions()
            .setSsl(true)
            .setHostnameVerificationAlgorithm("");
    clientOptions.setTrustAll(true);

    boolean endpointResolvable = TestUtils.resolvable(messagingEndpoint);
    Endpoint clientEndpoint = endpointResolvable ? messagingEndpoint : new Endpoint("localhost", 443);
    if (!endpointResolvable) {
        clientOptions.setSniServerName(messagingEndpoint.getHost());
    }

    log.info("External endpoint: " + clientEndpoint + ", internal: " + messagingEndpoint);
    probeEndpoint(messagingEndpoint);

    return createClient(terminusFactory, clientEndpoint, clientOptions, qos);
}
 
示例13
/**
 * Sends a single message to the given address and verifies that it can be
 * received again, using an anonymous client connecting to the given host and port.
 * <p>
 * Both the send and the receive operation are awaited for at most one minute so
 * that a message that never arrives fails the test instead of blocking it forever.
 *
 * @param host The host to connect to.
 * @param port The port to connect to.
 * @param address The address to send the message to and receive it from.
 * @param tls {@code true} if the connection should use TLS.
 * @param verifyHost {@code false} to set the hostname verification algorithm to
 *                   the empty string (only relevant if TLS is used).
 * @param caCert The PEM encoded certificate to trust or {@code null} if no explicit
 *               trust options should be set (only relevant if TLS is used).
 * @throws Exception if sending or receiving fails or does not complete in time.
 */
void doTestSendReceiveOutsideCluster(String host, int port, String address, boolean tls, boolean verifyHost, String caCert) throws Exception {
    ProtonClientOptions protonClientOptions = new ProtonClientOptions();
    if (tls) {
        protonClientOptions.setSsl(true);
        if (!verifyHost) {
            protonClientOptions.setHostnameVerificationAlgorithm("");
        }
        if (caCert != null) {
            protonClientOptions.setTrustOptions(new PemTrustOptions()
                    .addCertValue(Buffer.buffer(caCert)));
        }
    }
    AmqpClient client = resourceManager.getAmqpClientFactory().createClient(new AmqpConnectOptions()
            .setSaslMechanism("ANONYMOUS")
            .setQos(ProtonQoS.AT_LEAST_ONCE)
            .setEndpoint(new Endpoint(host, port))
            .setProtonClientOptions(protonClientOptions)
            .setTerminusFactory(new QueueTerminusFactory()));

    assertEquals(1, client.sendMessages(address, Collections.singletonList("hello")).get(1, TimeUnit.MINUTES));
    // bound the wait for the message, consistent with the send above
    var result = client.recvMessages(address, 1).get(1, TimeUnit.MINUTES);
    assertEquals(1, result.size());
    assertEquals("hello", ((AmqpValue) result.get(0).getBody()).getValue());
}
 
示例14
/**
 * Creates a factory whose client options use TLS with a key store and a trust
 * store located in the {@code etc} folder derived from the {@code HOME} and
 * {@code AMQ_NAME} environment variables.
 *
 * @param vertx The Vert.x instance to keep for later use.
 */
public ClientFactory(Vertx vertx) {

    this.vertx = vertx;

    // NOTE(review): HOME and AMQ_NAME are concatenated without a path separator
    // and without a null check - confirm that this is intended
    String configDir = System.getenv("HOME") + System.getenv("AMQ_NAME") + "/etc";

    JksOptions keyStore = new JksOptions()
        .setPath(configDir + "/enmasse-keystore.jks")
        .setPassword("enmasse");

    // NOTE(review): the trust store file has a .jks extension but is loaded
    // using PFX options - confirm the actual format of the file
    PfxOptions trustStore = new PfxOptions();
    trustStore.setPath(configDir + "/enmasse-truststore.jks");
    trustStore.setPassword("enmasse");

    ProtonClientOptions tlsOptions = new ProtonClientOptions();
    tlsOptions.setSsl(true);
    tlsOptions.setHostnameVerificationAlgorithm("");
    tlsOptions.setKeyStoreOptions(keyStore);
    tlsOptions.setPfxTrustOptions(trustStore);
    this.protonClientOptions = tlsOptions;
}
 
示例15
/**
 * {@inheritDoc}
 * <p>
 * The returned future is failed with a {@code ClientErrorException} having
 * status code 409 if this client is already being shut down.
 */
@Override
public final Future<HonoConnection> connect(final ProtonClientOptions options) {
    final Promise<HonoConnection> connectAttempt = Promise.promise();
    if (!shuttingDown.get()) {
        connect(options, connectAttempt, null);
    } else {
        connectAttempt.fail(new ClientErrorException(HttpURLConnection.HTTP_CONFLICT, "client is already shut down"));
    }
    return connectAttempt.future();
}
 
示例16
/**
 * Verifies username/password credentials with a remote authentication server using SASL PLAIN.
 * <p>
 * A connection is opened using the given credentials, a token is retrieved via
 * {@code getToken} and the connection is closed again once the outcome has been
 * reported to the handler.
 *
 * @param authzid The identity to act as (not evaluated by this method's code).
 * @param authcid The username.
 * @param password The password.
 * @param authenticationResultHandler The handler to invoke with the authentication result. On successful authentication,
 *                                    the result contains a JWT with the authenticated user's claims.
 */
public void verifyPlain(final String authzid, final String authcid, final String password,
        final Handler<AsyncResult<HonoUser>> authenticationResultHandler) {

    final ProtonClientOptions clientOptions = new ProtonClientOptions();
    clientOptions.setReconnectAttempts(3);
    clientOptions.setReconnectInterval(50);
    clientOptions.addEnabledSaslMechanism(AuthenticationConstants.MECHANISM_PLAIN);

    final Promise<ProtonConnection> connectAttempt = Promise.promise();
    factory.connect(clientOptions, authcid, password, null, null, connectAttempt);

    connectAttempt.future()
        .compose(openCon -> getToken(openCon))
        .onComplete(tokenAttempt -> {
            if (!tokenAttempt.succeeded()) {
                authenticationResultHandler.handle(
                        Future.failedFuture(mapConnectionFailureToServiceInvocationException(tokenAttempt.cause())));
            } else {
                authenticationResultHandler.handle(Future.succeededFuture(tokenAttempt.result()));
            }
            // the connection is not needed anymore, regardless of the outcome
            final ProtonConnection openedConnection = connectAttempt.future().result();
            if (openedConnection != null) {
                LOG.debug("closing connection to Authentication service");
                openedConnection.close();
            }
        });
}
 
示例17
/**
 * Verifies that the client tries to re-establish a lost connection to a server.
 * <p>
 * The test invokes the disconnect handler registered with the connection factory
 * twice and waits for the factory to report a new connection attempt each time.
 *
 * @param ctx The vert.x test context.
 */
@Test
public void testDownstreamDisconnectTriggersReconnect(final VertxTestContext ctx) {

    // GIVEN a client that is connected to a peer
    // NOTE(review): this comment used to say that the connection can be established
    // on the third attempt only; nothing in this method configures failing
    // attempts - confirm against DisconnectHandlerProvidingConnectionFactory
    connectionFactory = new DisconnectHandlerProvidingConnectionFactory(con);
    props.setReconnectAttempts(1);
    final ProtonClientOptions options = new ProtonClientOptions()
            .setReconnectAttempts(0);
    honoConnection = new HonoConnectionImpl(vertx, connectionFactory, props);
    honoConnection.connect(options).onComplete(ctx.succeeding());
    // wait for the initial connection attempt before re-arming the factory
    ctx.verify(() -> assertThat(connectionFactory.await()).isTrue());
    connectionFactory.setExpectedSucceedingConnectionAttempts(1);

    // WHEN the downstream connection fails
    connectionFactory.getDisconnectHandler().handle(con);

    // THEN the adapter reconnects to the downstream container
    ctx.verify(() -> assertThat(connectionFactory.await()).isTrue());
    // re-arm the factory for the next expected connection attempt
    connectionFactory.setExpectedSucceedingConnectionAttempts(1);

    // and when the downstream connection fails again
    connectionFactory.getDisconnectHandler().handle(con);

    // THEN the adapter reconnects to the downstream container again
    ctx.verify(() -> assertThat(connectionFactory.await()).isTrue());
    ctx.completeNow();
}
 
示例18
/**
 * Verifies that the client notifies its disconnect listeners, closes the
 * original connection and tries to re-connect to the server if the
 * connection is closed by the peer.
 *
 * @param ctx The test context.
 */
@Test
public void testOnRemoteCloseTriggersReconnection(final VertxTestContext ctx) {

    // GIVEN a client that is connected to a server
    final Promise<HonoConnection> connectionEstablished = Promise.promise();
    @SuppressWarnings("unchecked")
    final DisconnectListener<HonoConnection> listener = mock(DisconnectListener.class);
    honoConnection.addDisconnectListener(listener);
    final ProtonClientOptions clientOptions = new ProtonClientOptions().setReconnectAttempts(1);
    honoConnection.connect(clientOptions).onComplete(connectionEstablished);
    connectionFactory.setExpectedSucceedingConnectionAttempts(1);

    connectionEstablished.future().onComplete(ctx.succeeding(c -> {
        // WHEN the peer closes the connection
        final Handler<AsyncResult<ProtonConnection>> registeredCloseHandler = connectionFactory.getCloseHandler();
        registeredCloseHandler.handle(Future.failedFuture("shutting down for maintenance"));

        ctx.verify(() -> {
            // THEN the client invokes the registered disconnect handler
            verify(listener).onDisconnect(honoConnection);
            // and the original connection has been closed locally
            verify(con).close();
            verify(con).disconnectHandler(null);
            // and the connection is re-established
            assertThat(connectionFactory.await()).isTrue();
        });
        ctx.completeNow();
    }));
}
 
示例19
/**
 * Connects without credentials.
 * <p>
 * Delegates to the overload accepting a username and password, passing
 * {@code null} for both.
 *
 * @param options The client options to pass on.
 * @param closeHandler The close handler to pass on.
 * @param disconnectHandler The disconnect handler to pass on.
 * @param connectionResultHandler The handler for the outcome of the connection attempt to pass on.
 */
@Override
public void connect(
        final ProtonClientOptions options,
        final Handler<AsyncResult<ProtonConnection>> closeHandler,
        final Handler<ProtonConnection> disconnectHandler,
        final Handler<AsyncResult<ProtonConnection>> connectionResultHandler) {
    connect(options, null, null, closeHandler, disconnectHandler, connectionResultHandler);
}
 
示例20
/**
 * Connects using the given credentials.
 * <p>
 * Delegates to the overload that additionally accepts a container ID, passing
 * the given username and password along and {@code null} for the container ID.
 *
 * @param options The client options to pass on.
 * @param username The username to pass on.
 * @param password The password to pass on.
 * @param closeHandler The close handler to pass on.
 * @param disconnectHandler The disconnect handler to pass on.
 * @param connectionResultHandler The handler for the outcome of the connection attempt to pass on.
 */
@Override
public void connect(
        final ProtonClientOptions options,
        final String username,
        final String password,
        final Handler<AsyncResult<ProtonConnection>> closeHandler,
        final Handler<ProtonConnection> disconnectHandler,
        final Handler<AsyncResult<ProtonConnection>> connectionResultHandler) {
    // forward the credentials instead of silently dropping them
    connect(options, username, password, null, closeHandler, disconnectHandler, connectionResultHandler);
}
 
示例21
/**
 * Creates the Vert.x instance, the default client options and the integration
 * test helper shared by all test cases.
 * <p>
 * The default options use TLS with the integration tests' trust store and have
 * the hostname verification algorithm set to the empty string. The test context
 * is completed with the outcome of the helper's initialization.
 *
 * @param ctx The Vert.x test context.
 */
@BeforeAll
public static void setup(final VertxTestContext ctx) {

    VERTX = Vertx.vertx();

    final PemTrustOptions trustOptions = new PemTrustOptions()
            .addCertPath(IntegrationTestSupport.TRUST_STORE_PATH);
    defaultOptions = new ProtonClientOptions()
            .setSsl(true)
            .setHostnameVerificationAlgorithm("")
            .setTrustOptions(trustOptions);

    helper = new IntegrationTestSupport(VERTX);
    helper.init().onComplete(ctx.completing());
}
 
示例22
/**
 * Connects without credentials.
 * <p>
 * Delegates to the overload accepting a username and password, passing
 * {@code null} for both.
 *
 * @param options The client options to pass on.
 * @param closeHandler The close handler to pass on.
 * @param disconnectHandler The disconnect handler to pass on.
 * @param connectionResultHandler The handler for the outcome of the connection attempt to pass on.
 */
@Override
public void connect(
        final ProtonClientOptions options,
        final Handler<AsyncResult<ProtonConnection>> closeHandler,
        final Handler<ProtonConnection> disconnectHandler,
        final Handler<AsyncResult<ProtonConnection>> connectionResultHandler) {
    connect(options, null, null, closeHandler, disconnectHandler, connectionResultHandler);
}
 
示例23
/**
 * Connects using the given credentials.
 * <p>
 * Delegates to the overload that additionally accepts a container ID, passing
 * {@code null} for the container ID.
 *
 * @param options The client options to pass on.
 * @param username The username to pass on.
 * @param password The password to pass on.
 * @param closeHandler The close handler to pass on.
 * @param disconnectHandler The disconnect handler to pass on.
 * @param connectionResultHandler The handler for the outcome of the connection attempt to pass on.
 */
@Override
public void connect(
        final ProtonClientOptions options,
        final String username,
        final String password,
        final Handler<AsyncResult<ProtonConnection>> closeHandler,
        final Handler<ProtonConnection> disconnectHandler,
        final Handler<AsyncResult<ProtonConnection>> connectionResultHandler) {
    connect(options, username, password, null, closeHandler, disconnectHandler, connectionResultHandler);
}
 
示例24
/**
 * Logs a failed connection attempt at debug level and reports the failure
 * to the given handler.
 *
 * @param clientOptions The options that have been used for the attempt.
 * @param connectionResultHandler The handler to notify about the failure.
 * @param cause The reason why the attempt has failed.
 */
private void failConnectionAttempt(final ProtonClientOptions clientOptions,
                                   final Handler<AsyncResult<ProtonConnection>> connectionResultHandler, final Throwable cause) {
    final String scheme = clientOptions.isSsl() ? "amqps" : "amqp";
    logger.debug("can't connect to AMQP 1.0 container [{}://{}:{}, role: {}]: {}",
            scheme,
            config.getHost(),
            config.getPort(),
            config.getServerRole(),
            cause.getMessage());
    final AsyncResult<ProtonConnection> failure = Future.failedFuture(cause);
    connectionResultHandler.handle(failure);
}
 
示例25
/**
 * Handles the outcome of opening a connection that is reported after the
 * connection attempt has already been considered timed out.
 * <p>
 * A connection that has been opened successfully is stripped of its close and
 * disconnect handlers and closed again. A failure is logged at warn level only,
 * including the peer's error condition if there is one.
 *
 * @param openConnectionResult The outcome of opening the connection.
 * @param downstreamConnection The connection that the outcome refers to.
 * @param clientOptions The options that have been used for the attempt.
 */
private void handleTimedOutOpenHandlerResult(final AsyncResult<ProtonConnection> openConnectionResult,
        final ProtonConnection downstreamConnection, final ProtonClientOptions clientOptions) {

    final String scheme = clientOptions.isSsl() ? "amqps" : "amqp";

    if (openConnectionResult.succeeded()) {
        logger.debug("ignoring received open frame from container [{}] at [{}://{}:{}, role: {}]: connection attempt already timed out",
                downstreamConnection.getRemoteContainer(),
                scheme,
                config.getHost(),
                config.getPort(),
                config.getServerRole());
        // close the connection again
        downstreamConnection.closeHandler(null);
        downstreamConnection.disconnectHandler(null);
        downstreamConnection.close();
        return;
    }

    final ErrorCondition remoteError = downstreamConnection.getRemoteCondition();
    if (remoteError != null) {
        logger.warn("ignoring failure to open connection to container [{}] at [{}://{}:{}, role: {}]: attempt already timed out; error: {} -{}",
                downstreamConnection.getRemoteContainer(),
                scheme,
                config.getHost(),
                config.getPort(),
                config.getServerRole(),
                remoteError.getCondition(),
                remoteError.getDescription());
    } else {
        logger.warn("ignoring failure to open connection to container [{}] at [{}://{}:{}, role: {}]: attempt already timed out",
                downstreamConnection.getRemoteContainer(),
                scheme,
                config.getHost(),
                config.getPort(),
                config.getServerRole(),
                openConnectionResult.cause());
    }
}
 
示例26
/**
 * Adds the TLS and authentication related settings to the given client options.
 * <p>
 * The SASL PLAIN mechanism is enabled if both a username and a password are
 * given. Otherwise the key/cert options are added via
 * {@code addTlsKeyCertOptions}.
 *
 * @param clientOptions The options to modify.
 * @param username The username to authenticate with, may be {@code null} or empty.
 * @param password The password to authenticate with, may be {@code null} or empty.
 */
private void addOptions(final ProtonClientOptions clientOptions, final String username, final String password) {

    addTlsTrustOptions(clientOptions);

    final boolean credentialsIncomplete = Strings.isNullOrEmpty(username) || Strings.isNullOrEmpty(password);
    if (credentialsIncomplete) {
        addTlsKeyCertOptions(clientOptions);
    } else {
        clientOptions.addEnabledSaslMechanism(ProtonSaslPlainImpl.MECH_NAME);
    }
}
 
示例27
/**
 * Adds the configured key/cert options to the given client options, unless
 * the client options already contain key/cert options.
 * <p>
 * If the configuration provides key/cert options, TLS is switched on and the
 * SASL EXTERNAL mechanism is enabled as well.
 *
 * @param clientOptions The options to modify.
 */
private void addTlsKeyCertOptions(final ProtonClientOptions clientOptions) {

    if (clientOptions.getKeyCertOptions() != null) {
        // do not override options that have been set explicitly
        return;
    }
    final KeyCertOptions configuredKeyCert = config.getKeyCertOptions();
    if (configuredKeyCert == null) {
        return;
    }
    clientOptions.setSsl(true);
    clientOptions.setKeyCertOptions(configuredKeyCert);
    clientOptions.addEnabledSaslMechanism(ProtonSaslExternalImpl.MECH_NAME);
}
 
示例28
/**
 * Creates client options using the configured connect timeout and heartbeat
 * interval and with the number of reconnect attempts set to 0.
 *
 * @return The newly created options.
 */
private ProtonClientOptions createClientOptions() {
    return new ProtonClientOptions()
            .setConnectTimeout(config.getConnectTimeout())
            .setHeartbeat(config.getHeartbeatInterval())
            .setReconnectAttempts(0);
}
 
示例29
/**
 * Connects to an AMQP server/router using the configured host and port.
 *
 * @param startPromise The promise to complete once the connection has been
 *                     established and processed, or to fail with the cause
 *                     if the connection attempt fails.
 */
private void connectAmqpClient(Promise<Void> startPromise) {

    this.client = ProtonClient.create(this.vertx);

    String amqpHost = this.bridgeConfig.getAmqpConfig().getHost();
    int amqpPort = this.bridgeConfig.getAmqpConfig().getPort();
    ProtonClientOptions clientOptions = this.createClientOptions();

    this.client.connect(clientOptions, amqpHost, amqpPort, attempt -> {

        if (!attempt.succeeded()) {
            log.error("Error connecting AMQP-Kafka Bridge as client", attempt.cause());
            startPromise.fail(attempt.cause());
            return;
        }

        ProtonConnection connection = attempt.result();
        connection.setContainer(CONTAINER_ID);
        this.processConnection(connection);

        log.info("AMQP-Kafka Bridge started and connected in client mode to {}:{}", amqpHost, amqpPort);
        log.info("AMQP-Kafka Bridge bootstrap servers {}",
                this.bridgeConfig.getKafkaConfig().getConfig()
                        .get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
        );

        this.isReady = true;
        startPromise.complete();
    });
}
 
示例30
/**
 * Checks whether a client using the given credentials can send a message to
 * and receive it from the given address.
 * <p>
 * For a brokered address space, only queue and topic addresses are probed;
 * for any other address type the given default value is returned.
 * Authentication and authorization related failures are reported as
 * {@code false}, any other failure is re-thrown.
 *
 * @param resourceManager The manager providing the AMQP client factory.
 * @param addressSpace The address space to connect to.
 * @param credentials The credentials to connect with.
 * @param addressType The type of the address.
 * @param address The address to send to and receive from.
 * @param defaultValue The value to return if the address type is not probed.
 * @return {@code true} if as many messages have been received as have been sent.
 * @throws Exception if the probe fails for a reason that is not related to
 *                   authentication or authorization.
 */
private boolean canConnectWithAmqpAddress(ResourceManager resourceManager, AddressSpace addressSpace, UserCredentials credentials, AddressType addressType, String address, boolean defaultValue) throws Exception {
    List<AddressType> brokeredAddressTypes = Arrays.asList(AddressType.QUEUE, AddressType.TOPIC);
    if (AddressSpaceUtils.isBrokered(addressSpace) && !brokeredAddressTypes.contains(addressType)) {
        return defaultValue;
    }
    try (AmqpClient client = resourceManager.getAmqpClientFactory().createAddressClient(addressSpace, addressType)) {
        client.getConnectOptions().setCredentials(credentials);
        ProtonClientOptions protonClientOptions = client.getConnectOptions().getProtonClientOptions();
        protonClientOptions.setLogActivity(true);
        client.getConnectOptions().setProtonClientOptions(protonClientOptions);

        try {
            // start receiving before sending
            Future<List<Message>> received = client.recvMessages(address, 1);
            Future<Integer> sent = client.sendMessages(address, Collections.singletonList("msg1"));

            int numReceived = received.get(1, TimeUnit.MINUTES).size();
            int numSent = sent.get(1, TimeUnit.MINUTES);
            return numSent == numReceived;
        } catch (ExecutionException | SecurityException | UnauthorizedAccessException ex) {
            // an ExecutionException only wraps the actual reason for the failure
            Throwable cause = (ex instanceof ExecutionException) ? ex.getCause() : ex;

            boolean accessDenied = cause instanceof AuthenticationException
                    || cause instanceof SaslSystemException
                    || cause instanceof SecurityException
                    || cause instanceof UnauthorizedAccessException
                    || cause instanceof MechanismMismatchException;
            if (!accessDenied) {
                LOGGER.warn("canConnectWithAmqpAddress {} ({}) exception", address, addressType, ex);
                throw ex;
            }
            LOGGER.info("canConnectWithAmqpAddress {} ({}): {}", address, addressType, ex.getMessage());
            return false;
        }
    }
}