Java源码示例:io.vertx.proton.ProtonClientOptions
示例1
/**
* Verifies that if a client disconnects from the server, then an attempt to connect again will be successful.
*
* @param ctx The test execution context.
*/
@Test
public void testConnectSucceedsAfterDisconnect(final VertxTestContext ctx) {
honoConnection.connect()
.compose(ok -> {
// GIVEN a client that is connected to a server
final Promise<Void> disconnected = Promise.promise();
// WHEN the client disconnects
honoConnection.disconnect(disconnected);
@SuppressWarnings("unchecked")
final ArgumentCaptor<Handler<AsyncResult<ProtonConnection>>> closeHandler = ArgumentCaptor.forClass(Handler.class);
ctx.verify(() -> verify(con).closeHandler(closeHandler.capture()));
closeHandler.getValue().handle(Future.succeededFuture(con));
return disconnected.future();
})
.compose(d -> {
// AND tries to reconnect again
return honoConnection.connect(new ProtonClientOptions());
})
// THEN the connection succeeds
.onComplete(ctx.completing());
}
示例2
@Override
public void connect(
final ProtonClientOptions options,
final String username,
final String password,
final String containerId,
final Handler<AsyncResult<ProtonConnection>> closeHandler,
final Handler<ProtonConnection> disconnectHandler,
final Handler<AsyncResult<ProtonConnection>> connectionResultHandler) {
this.closeHandler = closeHandler;
this.disconnectHandler = disconnectHandler;
if (expectedFailingConnectionAttempts.getCount() > 0) {
expectedFailingConnectionAttempts.countDown();
connectionResultHandler.handle(Future.failedFuture(causeForFailure));
} else {
expectedSucceedingConnectionAttempts.countDown();
connectionResultHandler.handle(Future.succeededFuture(connectionToCreate));
}
}
示例3
/**
* Connects to the AMQP protocol adapter using a username and password.
*
*
* @param username The username to use for authentication.
* @param password The password to use for authentication.
* @return A succeeded future containing the established connection.
*/
protected Future<ProtonConnection> connectToAdapter(final String username, final String password) {
final Promise<ProtonConnection> result = Promise.promise();
final ProtonClient client = ProtonClient.create(VERTX);
final ProtonClientOptions options = new ProtonClientOptions(defaultOptions);
options.addEnabledSaslMechanism(ProtonSaslPlainImpl.MECH_NAME);
client.connect(
options,
IntegrationTestSupport.AMQP_HOST,
IntegrationTestSupport.AMQPS_PORT,
username,
password,
result);
return result.future().compose(this::handleConnectAttempt);
}
示例4
/**
* Connects to the AMQP protocol adapter using a client certificate.
*
* @param clientCertificate The certificate to use for authentication.
* @return A succeeded future containing the established connection.
*/
protected Future<ProtonConnection> connectToAdapter(final SelfSignedCertificate clientCertificate) {
final Promise<ProtonConnection> result = Promise.promise();
final ProtonClient client = ProtonClient.create(VERTX);
final ProtonClientOptions secureOptions = new ProtonClientOptions(defaultOptions);
secureOptions.setKeyCertOptions(clientCertificate.keyCertOptions());
secureOptions.addEnabledSaslMechanism(ProtonSaslExternalImpl.MECH_NAME);
client.connect(
secureOptions,
IntegrationTestSupport.AMQP_HOST,
IntegrationTestSupport.AMQPS_PORT,
result);
return result.future().compose(this::handleConnectAttempt);
}
示例5
private void addTlsTrustOptions(final ProtonClientOptions clientOptions, final ClientConfigProperties config) {
if (config.isTlsEnabled()) {
clientOptions.setSsl(true);
}
final TrustOptions trustOptions = config.getTrustOptions();
if (trustOptions != null) {
clientOptions.setSsl(true).setTrustOptions(trustOptions);
}
if (clientOptions.isSsl()) {
if (config.isHostnameVerificationRequired()) {
clientOptions.setHostnameVerificationAlgorithm("HTTPS");
} else {
clientOptions.setHostnameVerificationAlgorithm("");
}
}
}
示例6
private void handleTimedOutConnectionAttemptResult(final AsyncResult<ProtonConnection> conAttempt, final ProtonClientOptions clientOptions) {
if (conAttempt.succeeded()) {
logger.debug("ignoring successful connection attempt to AMQP 1.0 container [{}://{}:{}, role: {}]: attempt already timed out",
clientOptions.isSsl() ? "amqps" : "amqp",
config.getHost(),
config.getPort(),
config.getServerRole());
final ProtonConnection downstreamConnection = conAttempt.result();
downstreamConnection.disconnect();
} else {
logger.debug("ignoring failed connection attempt to AMQP 1.0 container [{}://{}:{}, role: {}]: attempt already timed out",
clientOptions.isSsl() ? "amqps" : "amqp",
config.getHost(),
config.getPort(),
config.getServerRole(),
conAttempt.cause());
}
}
示例7
/**
* Verifies that the factory does not enable SASL_PLAIN if the username and password are empty
* strings.
*/
@SuppressWarnings("unchecked")
@Test
public void testConnectDoesNotUseSaslPlainForEmptyUsernameAndPassword() {
// GIVEN a factory configured to connect to a server
final ProtonClientOptions options = new ProtonClientOptions();
final ProtonClient client = mock(ProtonClient.class);
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl(vertx, props);
factory.setProtonClient(client);
// WHEN connecting to the server using empty strings for username and password
factory.connect(options, "", "", null, null, c -> {});
// THEN the factory does not enable the SASL_PLAIN mechanism when establishing
// the connection
final ArgumentCaptor<ProtonClientOptions> optionsCaptor = ArgumentCaptor.forClass(ProtonClientOptions.class);
verify(client).connect(optionsCaptor.capture(), anyString(), anyInt(), eq(""), eq(""), any(Handler.class));
assertFalse(optionsCaptor.getValue().getEnabledSaslMechanisms().contains("PLAIN"));
}
示例8
/**
* Verifies that the factory enables SASL_PLAIN if the username and password are non-empty
* strings.
*/
@SuppressWarnings("unchecked")
@Test
public void testConnectAddsSaslPlainForNonEmptyUsernameAndPassword() {
// GIVEN a factory configured to connect to a server
final ProtonClientOptions options = new ProtonClientOptions();
final ProtonClient client = mock(ProtonClient.class);
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl(vertx, props);
factory.setProtonClient(client);
// WHEN connecting to the server using non-empty strings for username and password
factory.connect(options, "user", "pw", null, null, c -> {});
// THEN the factory uses SASL_PLAIN when establishing the connection
final ArgumentCaptor<ProtonClientOptions> optionsCaptor = ArgumentCaptor.forClass(ProtonClientOptions.class);
verify(client).connect(optionsCaptor.capture(), anyString(), anyInt(), eq("user"), eq("pw"), any(Handler.class));
assertTrue(optionsCaptor.getValue().getEnabledSaslMechanisms().contains("PLAIN"));
}
示例9
/**
* Verifies that the factory uses TLS when connecting to the peer if no trust store
* is configured but TLS has been enabled explicitly.
*/
@SuppressWarnings("unchecked")
@Test
public void testConnectEnablesSslIfExplicitlyConfigured() {
// GIVEN a factory configured to connect to a server using TLS
final ClientConfigProperties config = new ClientConfigProperties();
config.setHost("remote.host");
config.setTlsEnabled(true);
final ProtonClient client = mock(ProtonClient.class);
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl(vertx, config);
factory.setProtonClient(client);
// WHEN connecting to the server
factory.connect(null, null, null, c -> {});
// THEN the factory uses TLS when establishing the connection
final ArgumentCaptor<ProtonClientOptions> optionsCaptor = ArgumentCaptor.forClass(ProtonClientOptions.class);
verify(client).connect(optionsCaptor.capture(), eq("remote.host"), anyInt(), any(), any(), any(Handler.class));
assertTrue(optionsCaptor.getValue().isSsl());
}
示例10
/**
* Verifies that the factory uses TLS when connecting to the peer if a trust store
* is configured but TLS has not been enabled explicitly.
*/
@SuppressWarnings("unchecked")
@Test
public void testConnectEnablesSslIfTrustStoreIsConfigured() {
// GIVEN a factory configured to use a specific trust store
final ClientConfigProperties config = new ClientConfigProperties();
config.setHost("remote.host");
config.setTrustStorePath(PREFIX_KEY_PATH + "trusted-certs.pem");
final ProtonClient client = mock(ProtonClient.class);
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl(vertx, config);
factory.setProtonClient(client);
// WHEN connecting to the server
factory.connect(null, null, null, c -> {});
// THEN the factory uses TLS when establishing the connection
final ArgumentCaptor<ProtonClientOptions> optionsCaptor = ArgumentCaptor.forClass(ProtonClientOptions.class);
verify(client).connect(optionsCaptor.capture(), eq("remote.host"), anyInt(), any(), any(), any(Handler.class));
assertTrue(optionsCaptor.getValue().isSsl());
}
示例11
/**
* Create an options instance for the ProtonClient
*
* @return ProtonClient options instance
*/
private ProtonClientOptions createClientOptions() {
ProtonClientOptions options = new ProtonClientOptions();
options.setConnectTimeout(1000);
options.setReconnectAttempts(-1).setReconnectInterval(1000); // reconnect forever, every 1000 millisecs
if (this.bridgeConfig.getAmqpConfig().getCertDir() != null && this.bridgeConfig.getAmqpConfig().getCertDir().length() > 0) {
String certDir = this.bridgeConfig.getAmqpConfig().getCertDir();
log.info("Enabling SSL configuration for AMQP with TLS certificates from {}", certDir);
options.setSsl(true)
.addEnabledSaslMechanism("EXTERNAL")
.setHostnameVerificationAlgorithm("")
.setPemTrustOptions(new PemTrustOptions()
.addCertPath(new File(certDir, "ca.crt").getAbsolutePath()))
.setPemKeyCertOptions(new PemKeyCertOptions()
.addCertPath(new File(certDir, "tls.crt").getAbsolutePath())
.addKeyPath(new File(certDir, "tls.key").getAbsolutePath()));
}
return options;
}
示例12
public AmqpClient createClient(TerminusFactory terminusFactory, ProtonQoS qos, AddressSpace addressSpace) throws Exception {
assertNotNull(addressSpace, "Address space is null");
Endpoint messagingEndpoint = AddressSpaceUtils.getEndpointByServiceName(addressSpace, "messaging");
if (messagingEndpoint == null) {
String externalEndpointName = AddressSpaceUtils.getExternalEndpointName(addressSpace, "messaging");
messagingEndpoint = Kubernetes.getInstance().getExternalEndpoint(externalEndpointName + "-" + AddressSpaceUtils.getAddressSpaceInfraUuid(addressSpace));
}
Endpoint clientEndpoint;
ProtonClientOptions clientOptions = new ProtonClientOptions();
clientOptions.setSsl(true);
clientOptions.setTrustAll(true);
clientOptions.setHostnameVerificationAlgorithm("");
if (TestUtils.resolvable(messagingEndpoint)) {
clientEndpoint = messagingEndpoint;
} else {
clientEndpoint = new Endpoint("localhost", 443);
clientOptions.setSniServerName(messagingEndpoint.getHost());
}
log.info("External endpoint: " + clientEndpoint + ", internal: " + messagingEndpoint);
probeEndpoint(messagingEndpoint);
return createClient(terminusFactory, clientEndpoint, clientOptions, qos);
}
示例13
void doTestSendReceiveOutsideCluster(String host, int port, String address, boolean tls, boolean verifyHost, String caCert) throws Exception {
ProtonClientOptions protonClientOptions = new ProtonClientOptions();
if (tls) {
protonClientOptions.setSsl(true);
if (!verifyHost) {
protonClientOptions.setHostnameVerificationAlgorithm("");
}
if (caCert != null) {
protonClientOptions.setTrustOptions(new PemTrustOptions()
.addCertValue(Buffer.buffer(caCert)));
}
}
AmqpClient client = resourceManager.getAmqpClientFactory().createClient(new AmqpConnectOptions()
.setSaslMechanism("ANONYMOUS")
.setQos(ProtonQoS.AT_LEAST_ONCE)
.setEndpoint(new Endpoint(host, port))
.setProtonClientOptions(protonClientOptions)
.setTerminusFactory(new QueueTerminusFactory()));
assertEquals(1, client.sendMessages(address, Collections.singletonList("hello")).get(1, TimeUnit.MINUTES));
var result = client.recvMessages(address, 1).get();
assertEquals(1, result.size());
assertEquals("hello", ((AmqpValue) result.get(0).getBody()).getValue());
}
示例14
public ClientFactory(Vertx vertx) {
this.vertx = vertx;
String path = System.getenv("HOME") + System.getenv("AMQ_NAME") + "/etc";
JksOptions clientJksOptions = new JksOptions();
clientJksOptions
.setPath(path + "/enmasse-keystore.jks")
.setPassword("enmasse");
PfxOptions clientPfxOptions = new PfxOptions()
.setPath(path + "/enmasse-truststore.jks")
.setPassword("enmasse");
this.protonClientOptions = new ProtonClientOptions()
.setSsl(true)
.setHostnameVerificationAlgorithm("")
.setKeyStoreOptions(clientJksOptions)
.setPfxTrustOptions(clientPfxOptions);
}
示例15
/**
* {@inheritDoc}
*/
@Override
public final Future<HonoConnection> connect(final ProtonClientOptions options) {
final Promise<HonoConnection> result = Promise.promise();
if (shuttingDown.get()) {
result.fail(new ClientErrorException(HttpURLConnection.HTTP_CONFLICT, "client is already shut down"));
} else {
connect(options, result, null);
}
return result.future();
}
示例16
/**
* Verifies username/password credentials with a remote authentication server using SASL PLAIN.
*
* @param authzid The identity to act as.
* @param authcid The username.
* @param password The password.
* @param authenticationResultHandler The handler to invoke with the authentication result. On successful authentication,
* the result contains a JWT with the authenticated user's claims.
*/
public void verifyPlain(final String authzid, final String authcid, final String password,
final Handler<AsyncResult<HonoUser>> authenticationResultHandler) {
final ProtonClientOptions options = new ProtonClientOptions();
options.setReconnectAttempts(3).setReconnectInterval(50);
options.addEnabledSaslMechanism(AuthenticationConstants.MECHANISM_PLAIN);
final Promise<ProtonConnection> connectAttempt = Promise.promise();
factory.connect(options, authcid, password, null, null, connectAttempt);
connectAttempt.future()
.compose(openCon -> getToken(openCon))
.onComplete(s -> {
if (s.succeeded()) {
authenticationResultHandler.handle(Future.succeededFuture(s.result()));
} else {
authenticationResultHandler
.handle(Future.failedFuture(mapConnectionFailureToServiceInvocationException(s.cause())));
}
Optional.ofNullable(connectAttempt.future().result())
.ifPresent(con -> {
LOG.debug("closing connection to Authentication service");
con.close();
});
});
}
示例17
/**
* Verifies that the client tries to re-establish a lost connection to a server.
*
* @param ctx The vert.x test context.
*/
@Test
public void testDownstreamDisconnectTriggersReconnect(final VertxTestContext ctx) {
// GIVEN an client that is connected to a peer to which the
// connection can be established on the third attempt only
connectionFactory = new DisconnectHandlerProvidingConnectionFactory(con);
props.setReconnectAttempts(1);
final ProtonClientOptions options = new ProtonClientOptions()
.setReconnectAttempts(0);
honoConnection = new HonoConnectionImpl(vertx, connectionFactory, props);
honoConnection.connect(options).onComplete(ctx.succeeding());
ctx.verify(() -> assertThat(connectionFactory.await()).isTrue());
connectionFactory.setExpectedSucceedingConnectionAttempts(1);
// WHEN the downstream connection fails
connectionFactory.getDisconnectHandler().handle(con);
// THEN the adapter reconnects to the downstream container
ctx.verify(() -> assertThat(connectionFactory.await()).isTrue());
connectionFactory.setExpectedSucceedingConnectionAttempts(1);
// and when the downstream connection fails again
connectionFactory.getDisconnectHandler().handle(con);
// THEN the adapter reconnects to the downstream container again
ctx.verify(() -> assertThat(connectionFactory.await()).isTrue());
ctx.completeNow();
}
示例18
/**
* Verifies that the client tries to re-connect to a server instance if the
* connection is closed by the peer.
*
* @param ctx The test context.
*
*/
@Test
public void testOnRemoteCloseTriggersReconnection(final VertxTestContext ctx) {
// GIVEN a client that is connected to a server
final Promise<HonoConnection> connected = Promise.promise();
@SuppressWarnings("unchecked")
final DisconnectListener<HonoConnection> disconnectListener = mock(DisconnectListener.class);
honoConnection.addDisconnectListener(disconnectListener);
honoConnection.connect(new ProtonClientOptions().setReconnectAttempts(1))
.onComplete(connected);
connectionFactory.setExpectedSucceedingConnectionAttempts(1);
connected.future().onComplete(ctx.succeeding(c -> {
// WHEN the peer closes the connection
connectionFactory.getCloseHandler().handle(Future.failedFuture("shutting down for maintenance"));
ctx.verify(() -> {
// THEN the client invokes the registered disconnect handler
verify(disconnectListener).onDisconnect(honoConnection);
// and the original connection has been closed locally
verify(con).close();
verify(con).disconnectHandler(null);
// and the connection is re-established
assertThat(connectionFactory.await()).isTrue();
});
ctx.completeNow();
}));
}
示例19
@Override
public void connect(
final ProtonClientOptions options,
final Handler<AsyncResult<ProtonConnection>> closeHandler,
final Handler<ProtonConnection> disconnectHandler,
final Handler<AsyncResult<ProtonConnection>> connectionResultHandler) {
connect(options, null, null, closeHandler, disconnectHandler, connectionResultHandler);
}
示例20
@Override
public void connect(
final ProtonClientOptions options,
final String username,
final String password,
final Handler<AsyncResult<ProtonConnection>> closeHandler,
final Handler<ProtonConnection> disconnectHandler,
final Handler<AsyncResult<ProtonConnection>> connectionResultHandler) {
connect(options, null, null, null, closeHandler, disconnectHandler, connectionResultHandler);
}
示例21
/**
* Create a HTTP client for accessing the device registry (for registering devices and credentials) and
* an AMQP 1.0 client for consuming messages from the messaging network.
*
* @param ctx The Vert.x test context.
*/
@BeforeAll
public static void setup(final VertxTestContext ctx) {
VERTX = Vertx.vertx();
defaultOptions = new ProtonClientOptions()
.setTrustOptions(new PemTrustOptions().addCertPath(IntegrationTestSupport.TRUST_STORE_PATH))
.setHostnameVerificationAlgorithm("")
.setSsl(true);
helper = new IntegrationTestSupport(VERTX);
helper.init().onComplete(ctx.completing());
}
示例22
@Override
public void connect(
final ProtonClientOptions options,
final Handler<AsyncResult<ProtonConnection>> closeHandler,
final Handler<ProtonConnection> disconnectHandler,
final Handler<AsyncResult<ProtonConnection>> connectionResultHandler) {
connect(options, null, null, closeHandler, disconnectHandler, connectionResultHandler);
}
示例23
@Override
public void connect(
final ProtonClientOptions options,
final String username,
final String password,
final Handler<AsyncResult<ProtonConnection>> closeHandler,
final Handler<ProtonConnection> disconnectHandler,
final Handler<AsyncResult<ProtonConnection>> connectionResultHandler) {
connect(options, username, password, null, closeHandler, disconnectHandler, connectionResultHandler);
}
示例24
private void failConnectionAttempt(final ProtonClientOptions clientOptions,
final Handler<AsyncResult<ProtonConnection>> connectionResultHandler, final Throwable cause) {
logger.debug("can't connect to AMQP 1.0 container [{}://{}:{}, role: {}]: {}",
clientOptions.isSsl() ? "amqps" : "amqp",
config.getHost(),
config.getPort(),
config.getServerRole(),
cause.getMessage());
connectionResultHandler.handle(Future.failedFuture(cause));
}
示例25
private void handleTimedOutOpenHandlerResult(final AsyncResult<ProtonConnection> openConnectionResult,
final ProtonConnection downstreamConnection, final ProtonClientOptions clientOptions) {
if (openConnectionResult.succeeded()) {
logger.debug("ignoring received open frame from container [{}] at [{}://{}:{}, role: {}]: connection attempt already timed out",
downstreamConnection.getRemoteContainer(),
clientOptions.isSsl() ? "amqps" : "amqp",
config.getHost(),
config.getPort(),
config.getServerRole());
// close the connection again
downstreamConnection.closeHandler(null);
downstreamConnection.disconnectHandler(null);
downstreamConnection.close();
} else {
final ErrorCondition error = downstreamConnection.getRemoteCondition();
if (error == null) {
logger.warn("ignoring failure to open connection to container [{}] at [{}://{}:{}, role: {}]: attempt already timed out",
downstreamConnection.getRemoteContainer(),
clientOptions.isSsl() ? "amqps" : "amqp",
config.getHost(),
config.getPort(),
config.getServerRole(),
openConnectionResult.cause());
} else {
logger.warn("ignoring failure to open connection to container [{}] at [{}://{}:{}, role: {}]: attempt already timed out; error: {} -{}",
downstreamConnection.getRemoteContainer(),
clientOptions.isSsl() ? "amqps" : "amqp",
config.getHost(),
config.getPort(),
config.getServerRole(),
error.getCondition(),
error.getDescription());
}
}
}
示例26
private void addOptions(final ProtonClientOptions clientOptions, final String username, final String password) {
addTlsTrustOptions(clientOptions);
if (!Strings.isNullOrEmpty(username) && !Strings.isNullOrEmpty(password)) {
clientOptions.addEnabledSaslMechanism(ProtonSaslPlainImpl.MECH_NAME);
} else {
addTlsKeyCertOptions(clientOptions);
}
}
示例27
private void addTlsKeyCertOptions(final ProtonClientOptions clientOptions) {
if (clientOptions.getKeyCertOptions() == null) {
final KeyCertOptions keyCertOptions = config.getKeyCertOptions();
if (keyCertOptions != null) {
clientOptions.setSsl(true).setKeyCertOptions(keyCertOptions);
clientOptions.addEnabledSaslMechanism(ProtonSaslExternalImpl.MECH_NAME);
}
}
}
示例28
private ProtonClientOptions createClientOptions() {
final ProtonClientOptions options = new ProtonClientOptions();
options.setConnectTimeout(config.getConnectTimeout());
options.setHeartbeat(config.getHeartbeatInterval());
options.setReconnectAttempts(0);
return options;
}
示例29
/**
* Connect to an AMQP server/router
*
* @param startPromise
*/
private void connectAmqpClient(Promise<Void> startPromise) {
this.client = ProtonClient.create(this.vertx);
String host = this.bridgeConfig.getAmqpConfig().getHost();
int port = this.bridgeConfig.getAmqpConfig().getPort();
ProtonClientOptions options = this.createClientOptions();
this.client.connect(options, host, port, ar -> {
if (ar.succeeded()) {
ProtonConnection connection = ar.result();
connection.setContainer(CONTAINER_ID);
this.processConnection(connection);
log.info("AMQP-Kafka Bridge started and connected in client mode to {}:{}", host, port);
log.info("AMQP-Kafka Bridge bootstrap servers {}",
this.bridgeConfig.getKafkaConfig().getConfig()
.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
);
this.isReady = true;
startPromise.complete();
} else {
log.error("Error connecting AMQP-Kafka Bridge as client", ar.cause());
startPromise.fail(ar.cause());
}
});
}
示例30
private boolean canConnectWithAmqpAddress(ResourceManager resourceManager, AddressSpace addressSpace, UserCredentials credentials, AddressType addressType, String address, boolean defaultValue) throws Exception {
Set<AddressType> brokeredAddressTypes = new HashSet<>(Arrays.asList(AddressType.QUEUE, AddressType.TOPIC));
if (AddressSpaceUtils.isBrokered(addressSpace) && !brokeredAddressTypes.contains(addressType)) {
return defaultValue;
}
try (AmqpClient client = resourceManager.getAmqpClientFactory().createAddressClient(addressSpace, addressType)) {
client.getConnectOptions().setCredentials(credentials);
ProtonClientOptions protonClientOptions = client.getConnectOptions().getProtonClientOptions();
protonClientOptions.setLogActivity(true);
client.getConnectOptions().setProtonClientOptions(protonClientOptions);
try {
Future<List<Message>> received = client.recvMessages(address, 1);
Future<Integer> sent = client.sendMessages(address, Collections.singletonList("msg1"));
int numReceived = received.get(1, TimeUnit.MINUTES).size();
int numSent = sent.get(1, TimeUnit.MINUTES);
return (numSent == numReceived);
} catch (ExecutionException | SecurityException | UnauthorizedAccessException ex) {
Throwable cause = ex;
if (ex instanceof ExecutionException) {
cause = ex.getCause();
}
if (cause instanceof AuthenticationException || cause instanceof SaslSystemException || cause instanceof SecurityException || cause instanceof UnauthorizedAccessException || cause instanceof MechanismMismatchException) {
LOGGER.info("canConnectWithAmqpAddress {} ({}): {}", address, addressType, ex.getMessage());
return false;
} else {
LOGGER.warn("canConnectWithAmqpAddress {} ({}) exception", address, addressType, ex);
throw ex;
}
}
}
}