Java源码示例:com.rabbitmq.client.ConnectionFactory
示例1
/**
 * Builds a {@link ConnectionFactory} from the given RabbitMQ stage configuration.
 *
 * <p>The URI, credentials, extra RabbitMQ properties and SSL context are passed to the
 * {@code RabbitCxnFactoryBuilder} constructor; every value under {@code conf.advanced}
 * is applied through the builder's setters before {@code build()} is called.
 *
 * @param conf configuration bean supplying all connection settings
 * @return the factory produced by the builder
 * @throws StageException declared by the original signature
 */
public static ConnectionFactory createConnectionFactory(BaseRabbitConfigBean conf) throws StageException {
    return new RabbitCxnFactoryBuilder(
            conf.uri,
            conf.credentialsConfig.username.get(),
            conf.credentialsConfig.password.get(),
            conf.rabbitmqProperties,
            conf.tlsConfig.getSslContext())
        // Advanced tuning options, applied in the same order as before.
        .setConnectionTimeout(conf.advanced.connectionTimeout)
        .setAutomaticRecoveryEnabled(conf.advanced.automaticRecoveryEnabled)
        .setNetworkRecoveryInterval(conf.advanced.networkRecoveryInterval)
        .setHeartbeatInterval(conf.advanced.heartbeatInterval)
        .setHandshakeTimeout(conf.advanced.handshakeTimeout)
        .setShutdownTimeout(conf.advanced.shutdownTimeout)
        .setFrameMax(conf.advanced.frameMax)
        .setChannelMax(conf.advanced.channelMax)
        .build();
}
示例2
/**
 * Publishes 5000 numbered messages to the durable queue {@code my-queue},
 * pausing five seconds after each one.
 *
 * <p>The channel is closed after the last message and the connection is closed in a
 * {@code finally} block, so an error while declaring or publishing no longer leaves
 * the connection open.
 *
 * @param args unused
 */
public static void main(String[] args) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException, IOException, InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    // NOTE(review): the host part of this URI looks like an e-mail-obfuscation
    // placeholder left by the page this snippet was copied from; confirm the real address.
    factory.setUri("amqp://guest:[email protected]");
    factory.setConnectionTimeout(300000);
    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();
        channel.queueDeclare("my-queue", true, false, false, null);
        for (int count = 0; count < 5000; count++) {
            String message = "Message number " + count;
            channel.basicPublish("", "my-queue", null, message.getBytes());
            System.out.println("Published message: " + message);
            Thread.sleep(5000);
        }
        channel.close();
    } finally {
        // Closing the connection also closes any channel still open on it.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
示例3
/**
 * Creates the queue after validating every argument.
 *
 * @param factory      connection factory; must not be {@code null}
 * @param addresses    broker addresses; must not be {@code null} or empty
 * @param useExchange  stored as-is in the {@code useExchange} field
 * @param settings     AMQP settings; must not be {@code null}
 * @param batchSize    must be greater than 0
 * @param pollTimeInMS must be greater than 0; passed to {@code setPollTimeInMS}
 * @throws IllegalArgumentException if any of the constraints above is violated
 */
AMQPObservableQueue(final ConnectionFactory factory, final Address[] addresses, final boolean useExchange,
        final AMQPSettings settings, final int batchSize, final int pollTimeInMS) {
    // Reference arguments first, in declaration order.
    if (null == factory) {
        throw new IllegalArgumentException("Connection factory is undefined");
    }
    if (null == addresses || 0 == addresses.length) {
        throw new IllegalArgumentException("Addresses are undefined");
    }
    if (null == settings) {
        throw new IllegalArgumentException("Settings are undefined");
    }
    // Numeric arguments must be strictly positive.
    if (batchSize < 1) {
        throw new IllegalArgumentException("Batch size must be greater than 0");
    }
    if (pollTimeInMS < 1) {
        throw new IllegalArgumentException("Poll time must be greater than 0 ms");
    }

    this.batchSize = batchSize;
    this.settings = settings;
    this.useExchange = useExchange;
    this.addresses = addresses;
    this.factory = factory;
    // Kept last, as in the original: the setter runs after all fields are assigned.
    this.setPollTimeInMS(pollTimeInMS);
}
示例4
/**
 * Verifies that {@code RMQSource.open} fails with a descriptive
 * {@link RuntimeException} when the connection hands back a {@code null} channel.
 *
 * <p>The exception is captured and checked after the {@code try} block, so the test
 * now fails when {@code open} throws nothing instead of passing silently.
 */
@Test
public void throwExceptionIfConnectionFactoryReturnNull() throws Exception {
    RMQConnectionConfig connectionConfig = Mockito.mock(RMQConnectionConfig.class);
    ConnectionFactory connectionFactory = Mockito.mock(ConnectionFactory.class);
    Connection connection = Mockito.mock(Connection.class);
    Mockito.when(connectionConfig.getConnectionFactory()).thenReturn(connectionFactory);
    Mockito.when(connectionFactory.newConnection()).thenReturn(connection);
    // The scenario under test: no channel can be created.
    Mockito.when(connection.createChannel()).thenReturn(null);

    RMQSource<String> rmqSource = new RMQSource<>(
        connectionConfig, "queueDummy", true, new StringDeserializationScheme());

    RuntimeException thrown = null;
    try {
        rmqSource.open(new Configuration());
    } catch (RuntimeException ex) {
        thrown = ex;
    }

    if (thrown == null) {
        throw new AssertionError("Expected open() to throw because no channel is available");
    }
    assertEquals("None of RabbitMQ channels are available", thrown.getMessage());
}
示例5
/**
 * Publishes {@code message} as a persistent text message to {@code TASK_QUEUE_NAME}
 * on the default exchange, declaring the durable queue first.
 *
 * <p>The broker host is read from {@code ConfigurationLoader}. The connection is
 * closed in a {@code finally} block so a failure while declaring or publishing does
 * not leak it.
 *
 * @param message text to send
 * @throws java.io.IOException if connecting, declaring, publishing or closing fails
 */
public static void sendMessage(String message)
        throws java.io.IOException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(ConfigurationLoader.getInstance().getRabbitmqNodename());
    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        channel.basicPublish("", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
    } finally {
        // Closing the connection also closes any channel still open on it.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
示例6
/**
 * Publishes five copies of a fixed message to routing key {@code test001} on the
 * default exchange.
 *
 * <p>The connection is closed in a {@code finally} block so it is released even when
 * creating the channel or publishing fails.
 *
 * @param args unused
 */
public static void main(String[] args) throws IOException, TimeoutException {
    // 1. Create and configure a ConnectionFactory.
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(Constant.ip);
    connectionFactory.setPort(Constant.port);
    connectionFactory.setVirtualHost("/");
    // 2. Create a connection through the factory.
    Connection connection = connectionFactory.newConnection();
    try {
        // 3. Create a channel on the connection.
        Channel channel = connection.createChannel();
        // 4. Send the data through the channel.
        String msg = "Hello RabbitMQ!";
        for (int i = 0; i < 5; i++) {
            // Arguments: 1 exchange, 2 routing key.
            channel.basicPublish("", "test001", null, msg.getBytes());
        }
        // 5. Close the channel; the connection is closed in the finally block.
        channel.close();
    } finally {
        // Closing the connection also closes any channel still open on it.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
示例7
/**
 * Publishes five copies of a fixed message to exchange
 * {@code test_consumer_exchange} with routing key {@code consumer.save}.
 *
 * <p>The original never closed the channel or connection. Both are now closed once
 * publishing is done, and the connection is also closed when publishing fails.
 *
 * @param args unused
 */
public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(Constant.ip);
    connectionFactory.setPort(Constant.port);
    connectionFactory.setVirtualHost("/");
    Connection connection = connectionFactory.newConnection();
    try {
        Channel channel = connection.createChannel();
        String exchange = "test_consumer_exchange";
        String routingKey = "consumer.save";
        String msg = "Hello RabbitMQ Consumer Message";
        for (int i = 0; i < 5; i++) {
            // The third argument is the publish call's boolean flag, kept as true.
            channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        }
        channel.close();
    } finally {
        // Closing the connection also closes any channel still open on it.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
示例8
/**
 * Opens the RabbitMQ connection and channel, sets up the queue and, when a return
 * listener is configured, registers it on the channel.
 *
 * @param config not read by this method
 * @throws RuntimeException if no channel could be created, or wrapping any
 *         {@link IOException} raised while connecting or setting up
 */
@Override
public void open(Configuration config) throws Exception {
    final ConnectionFactory connectionFactory = rmqConnectionConfig.getConnectionFactory();
    try {
        connection = connectionFactory.newConnection();
        channel = connection.createChannel();
        if (null == channel) {
            throw new RuntimeException("None of RabbitMQ channels are available");
        }

        setupQueue();

        if (null != returnListener) {
            channel.addReturnListener(returnListener);
        }
    } catch (IOException ioe) {
        throw new RuntimeException("Error while creating the channel", ioe);
    }
}
示例9
/**
 * Checks that servicing a mail whose {@code MAIL_ATTRIBUTE} lookup returns
 * {@code ATTRIBUTE_CONTENT} publishes {@code ATTACHMENT_CONTENT} to the configured
 * exchange and routing key with default {@code BasicProperties}.
 */
@Test
public void serviceShouldPublishAttributeContentWhenAttributeInMailAndIsAMap() throws Exception {
    mailet.init(mailetConfig);

    // Mocked AMQP stack: factory -> connection -> channel.
    Channel publishChannel = mock(Channel.class);
    Connection amqpConnection = mock(Connection.class);
    ConnectionFactory factoryMock = mock(ConnectionFactory.class);
    when(amqpConnection.createChannel()).thenReturn(publishChannel);
    when(factoryMock.newConnection()).thenReturn(amqpConnection);
    mailet.setConnectionFactory(factoryMock);

    Mail mailMock = mock(Mail.class);
    when(mailMock.getAttribute(MAIL_ATTRIBUTE)).thenReturn(ATTRIBUTE_CONTENT);

    mailet.service(mailMock);

    ArgumentCaptor<BasicProperties> propertiesCaptor = ArgumentCaptor.forClass(BasicProperties.class);
    verify(publishChannel).basicPublish(
        eq(EXCHANGE_NAME), eq(ROUTING_KEY), propertiesCaptor.capture(), eq(ATTACHMENT_CONTENT));
    BasicProperties defaultProperties = new AMQP.BasicProperties();
    assertThat(propertiesCaptor.getValue()).isEqualToComparingFieldByField(defaultProperties);
}
示例10
/**
 * Checks that servicing a mail whose attribute value is a list holding
 * {@code ATTACHMENT_CONTENT} publishes that content to the configured exchange and
 * routing key with default {@code BasicProperties}.
 */
@Test
public void serviceShouldPublishAttributeContentWhenAttributeInMailAndIsAList() throws Exception {
    mailet.init(mailetConfig);

    // Mocked AMQP stack: factory -> connection -> channel.
    Channel publishChannel = mock(Channel.class);
    Connection amqpConnection = mock(Connection.class);
    ConnectionFactory factoryMock = mock(ConnectionFactory.class);
    when(amqpConnection.createChannel()).thenReturn(publishChannel);
    when(factoryMock.newConnection()).thenReturn(amqpConnection);
    mailet.setConnectionFactory(factoryMock);

    Mail mailMock = mock(Mail.class);
    when(mailMock.getAttribute(MAIL_ATTRIBUTE)).thenReturn(
        Optional.of(new Attribute(MAIL_ATTRIBUTE, AttributeValue.of(ImmutableList.of(AttributeValue.ofSerializable(ATTACHMENT_CONTENT))))));

    mailet.service(mailMock);

    ArgumentCaptor<BasicProperties> propertiesCaptor = ArgumentCaptor.forClass(BasicProperties.class);
    verify(publishChannel).basicPublish(
        eq(EXCHANGE_NAME), eq(ROUTING_KEY), propertiesCaptor.capture(), eq(ATTACHMENT_CONTENT));
    BasicProperties defaultProperties = new AMQP.BasicProperties();
    assertThat(propertiesCaptor.getValue()).isEqualToComparingFieldByField(defaultProperties);
}
示例11
/**
 * Creates a new connection from the configured credentials, host and port, registers
 * the shutdown and blocked listeners on it, and stores it in {@code s_connection}.
 *
 * <p>The virtual host falls back to {@code "/"} when none is configured, and SSL is
 * enabled only when {@code useSsl} equals {@code "true"} ignoring case.
 *
 * @return the connection now held in {@code s_connection}
 */
private synchronized Connection createConnection() throws KeyManagementException, NoSuchAlgorithmException, IOException, TimeoutException {
    final ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);
    connectionFactory.setHost(amqpHost);
    connectionFactory.setPort(port);

    final boolean hasVirtualHost = virtualHost != null && !virtualHost.isEmpty();
    connectionFactory.setVirtualHost(hasVirtualHost ? virtualHost : "/");

    // Constant-first comparison is false for null and for the empty string.
    if ("true".equalsIgnoreCase(useSsl)) {
        connectionFactory.useSslProtocol(secureProtocol);
    }

    final Connection freshConnection = connectionFactory.newConnection();
    freshConnection.addShutdownListener(disconnectHandler);
    freshConnection.addBlockedListener(blockedConnectionHandler);
    s_connection = freshConnection;
    return s_connection;
}
示例12
/**
 * Builds a {@code ConnectionConfig} from a Storm configuration map.
 *
 * <p>Resolution order:
 * <ol>
 *   <li>if {@code rabbitmq.uri} is present, only the URI is used;</li>
 *   <li>otherwise, if {@code rabbitmq.ha.hosts} yields a non-null value, the parsed
 *       hosts are passed along with the individual settings;</li>
 *   <li>otherwise only the individual settings are used.</li>
 * </ol>
 * Individual settings fall back to the {@link ConnectionFactory} defaults, and
 * {@code rabbitmq.ssl} falls back to {@code false}.
 *
 * @param stormConfig Storm configuration map
 * @return the connection configuration
 */
public static ConnectionConfig getFromStormConfig(Map<String, Object> stormConfig) {
    // Case 1: a full URI takes precedence over everything else.
    if (stormConfig.containsKey("rabbitmq.uri")) {
        return new ConnectionConfig(getFromMap("rabbitmq.uri", stormConfig));
    }

    String highAvailabilityHostsString = getFromMap("rabbitmq.ha.hosts", stormConfig);

    // Case 2: no high-availability host list, use the single-host settings only.
    if (highAvailabilityHostsString == null) {
        return new ConnectionConfig(
            getFromMap("rabbitmq.host", stormConfig, ConnectionFactory.DEFAULT_HOST),
            getFromMapAsInt("rabbitmq.port", stormConfig, ConnectionFactory.DEFAULT_AMQP_PORT),
            getFromMap("rabbitmq.username", stormConfig, ConnectionFactory.DEFAULT_USER),
            getFromMap("rabbitmq.password", stormConfig, ConnectionFactory.DEFAULT_PASS),
            getFromMap("rabbitmq.virtualhost", stormConfig, ConnectionFactory.DEFAULT_VHOST),
            getFromMapAsInt("rabbitmq.heartbeat", stormConfig, ConnectionFactory.DEFAULT_HEARTBEAT),
            getFromMapAsBoolean("rabbitmq.ssl", stormConfig, false));
    }

    // Case 3: high-availability hosts plus the single-host settings.
    final ConfigAvailableHosts haHosts = ConfigAvailableHosts.fromString(highAvailabilityHostsString);
    return new ConnectionConfig(
        haHosts,
        getFromMap("rabbitmq.host", stormConfig, ConnectionFactory.DEFAULT_HOST),
        getFromMapAsInt("rabbitmq.port", stormConfig, ConnectionFactory.DEFAULT_AMQP_PORT),
        getFromMap("rabbitmq.username", stormConfig, ConnectionFactory.DEFAULT_USER),
        getFromMap("rabbitmq.password", stormConfig, ConnectionFactory.DEFAULT_PASS),
        getFromMap("rabbitmq.virtualhost", stormConfig, ConnectionFactory.DEFAULT_VHOST),
        getFromMapAsInt("rabbitmq.heartbeat", stormConfig, ConnectionFactory.DEFAULT_HEARTBEAT),
        getFromMapAsBoolean("rabbitmq.ssl", stormConfig, false));
}
示例13
/**
 * Checks that {@code RabbitMqUtils.configureFactory} still applies the user name and
 * password when the server IP entry of the configuration is {@code null}.
 */
@Test
public void testConfigureFactory_nullIp() throws Exception {
    final String expectedUsername = "toto";
    final String expectedPassword = "123456789";

    Map<String,String> settings = new HashMap<>();
    settings.put( RABBITMQ_SERVER_IP, null );
    settings.put( RABBITMQ_SERVER_USERNAME, expectedUsername );
    settings.put( RABBITMQ_SERVER_PASSWORD, expectedPassword );

    ConnectionFactory configuredFactory = new ConnectionFactory();
    RabbitMqUtils.configureFactory( configuredFactory, settings );

    Assert.assertEquals( expectedUsername, configuredFactory.getUsername());
    Assert.assertEquals( expectedPassword, configuredFactory.getPassword());
}
示例14
/**
 * Publishes one persistent text message, built from the command-line arguments by
 * {@code getMessage}, to the durable queue {@code TASK_QUEUE_NAME}.
 *
 * <p>The unused raw-typed {@code Map} local from the original was removed, and the
 * connection is now closed in a {@code finally} block so a failure does not leak it.
 *
 * @param argv forwarded to {@code getMessage}
 */
public static void main(String[] argv) throws Exception {
    // Create the connection factory.
    ConnectionFactory factory = new ConnectionFactory();
    //factory.setHost("localhost");
    // NOTE(review): the host part of this URI looks like an e-mail-obfuscation
    // placeholder left by the page this snippet was copied from; confirm the real address.
    factory.setUri("amqp://guest:[email protected]:5672");
    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        String message = getMessage(argv);
        channel.basicPublish("", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
    } finally {
        // Closing the connection also closes any channel still open on it.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
示例15
/**
 * Declares {@code QUEUE_NAME} and registers a consumer that prints every received
 * message body, decoded as UTF-8, to standard output.
 *
 * <p>The channel and connection are left open; this method does not close them.
 *
 * @param argv unused
 */
public static void main(String[] argv) throws Exception {
    // Create the connection factory and point it at the local RabbitMQ broker.
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("127.0.0.1");

    // Open a new connection and a channel on it.
    Connection brokerConnection = connectionFactory.newConnection();
    Channel channel = brokerConnection.createChannel();

    // Declare the queue to consume from. Queue declaration in RabbitMQ is idempotent:
    // the queue is created if it does not exist, and an existing queue is unaffected.
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println("C [*] Waiting for messages. To exit press CTRL+C");

    // DefaultConsumer implements Consumer. It is bound to the channel passed in, and
    // handleDelivery is the callback invoked for each message delivered to it.
    Consumer printingConsumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String text = new String(body, "UTF-8");
            System.out.println("C [x] Received '" + text + "'");
        }
    };

    // The second argument (true) turns on automatic acknowledgement of deliveries.
    channel.basicConsume(QUEUE_NAME, true, printingConsumer);
}
示例16
/**
 * Publishes one persistent text message ({@code "Hello world"} followed by a random
 * number) to the queue {@code queue_name}.
 *
 * <p>The connection is closed in a {@code finally} block so it is released even when
 * declaring the queue or publishing fails.
 *
 * @param args unused
 */
public static void main(String[] args)
        throws java.io.IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException, URISyntaxException{
    ConnectionFactory factory=new ConnectionFactory(); // create the connection factory
    // factory.setHost("localhost");
    // factory.setVirtualHost("my_mq");
    // factory.setUsername("zhxia");
    // factory.setPassword("123456");
    // NOTE(review): the host part of this URI looks like an e-mail-obfuscation
    // placeholder left by the page this snippet was copied from; confirm the real address.
    factory.setUri("amqp://guest:[email protected]:5672"); // set the broker URI
    Connection connection=factory.newConnection(); // open the connection
    try {
        Channel channel=connection.createChannel(); // create the channel
        // Declare the message queue; durability comes from the `durable` flag.
        channel.queueDeclare(queue_name, durable, false, false, null);
        String message="Hello world"+Math.random();
        // With a durable queue the message itself must also be marked persistent,
        // hence MessageProperties.PERSISTENT_TEXT_PLAIN.
        channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
        System.out.println("Send message:"+message);
        channel.close();
    } finally {
        // Closing the connection also closes any channel still open on it.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
示例17
/**
 * Applies this configuration to {@code connectionFactory} and opens a connection to
 * the configured broker hosts.
 *
 * <p>Credentials, heartbeat and connect timeout are always set. An SSL context is
 * installed only when {@code secure} is true, and the virtual host only when it is
 * non-null.
 *
 * @param connectionFactory factory to configure and connect with
 * @return the newly opened connection
 * @throws IllegalStateException if the SSL context cannot be created
 * @throws IllegalArgumentException if no broker host is configured
 */
@Override
public Connection createConnection(ConnectionFactory connectionFactory)
        throws IOException, TimeoutException {
    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);
    connectionFactory.setRequestedHeartbeat(requestedConnectionHeartbeatTimeout);
    connectionFactory.setConnectionTimeout(connectTimeout);

    if (secure) {
        final SSLContext sslContext;
        try {
            sslContext = sslContextFactory.createSSLContext();
        } catch (NoSuchAlgorithmException algorithmMissing) {
            throw new IllegalStateException("error during connect, fatal system configuration", algorithmMissing);
        }
        // The same context is returned for every name the factory asks for.
        connectionFactory.setSslContextFactory(name -> sslContext);
    }

    if (null != virtualHost) {
        connectionFactory.setVirtualHost(virtualHost);
    }

    // Checked last, as in the original, after the factory has been configured.
    if (brokerHosts.isEmpty()) {
        throw new IllegalArgumentException("No broker host defined");
    }

    return connectionFactory.newConnection(new ArrayList<>(brokerHosts));
}
示例18
/**
 * Creates a producer client whose connection factory targets the given broker.
 *
 * @param host     broker host
 * @param port     broker port
 * @param username user name for the connection
 * @param password password for the connection
 */
public RabbitMQProducerClient(String host, int port, String username, String password) {
    // Configure a local instance completely, then publish it to the field.
    ConnectionFactory configured = new ConnectionFactory();
    configured.setHost(host);
    configured.setPort(port);
    configured.setUsername(username);
    configured.setPassword(password);
    factory = configured;
}
示例19
/**
 * Creates a {@link ConnectionFactory} pointing at the given container, using the
 * container's IP address and the mapped port of {@code RabbitMQContainer.PORTS[0]}.
 *
 * @param container container to connect to
 * @return a factory with host and port set
 */
static ConnectionFactory createConnectionFactory(RabbitMQContainer container) {
    final ConnectionFactory factory = new ConnectionFactory();

    // Host first, then port: the same order as the original.
    factory.setHost(container.getContainerIpAddress());
    factory.setPort(container.getMappedPort(RabbitMQContainer.PORTS[0]));

    return factory;
}
示例20
/**
 * Opens the RabbitMQ connection and channel, sets up the queue and starts consuming
 * from {@code queueName}.
 *
 * <p>When the runtime context is a {@code StreamingRuntimeContext} with checkpointing
 * enabled, auto-acknowledgement is turned off and the channel is put into transaction
 * mode; otherwise auto-acknowledgement is turned on.
 *
 * @param config forwarded to {@code super.open}
 * @throws RuntimeException if no channel is available, or wrapping any
 *         {@link IOException} raised while connecting
 */
@Override
public void open(Configuration config) throws Exception {
    super.open(config);
    final ConnectionFactory connectionFactory = setupConnectionFactory();
    try {
        connection = connectionFactory.newConnection();
        channel = connection.createChannel();
        if (null == channel) {
            throw new RuntimeException("None of RabbitMQ channels are available");
        }
        setupQueue();
        consumer = new QueueingConsumer(channel);

        final RuntimeContext context = getRuntimeContext();
        final boolean checkpointing = context instanceof StreamingRuntimeContext
            && ((StreamingRuntimeContext) context).isCheckpointingEnabled();

        autoAck = !checkpointing;
        if (checkpointing) {
            // enables transaction mode
            channel.txSelect();
        }

        LOG.debug("Starting RabbitMQ source with autoAck status: " + autoAck);
        channel.basicConsume(queueName, autoAck, consumer);
    } catch (IOException ioe) {
        throw new RuntimeException("Cannot create RMQ connection with " + queueName + " at "
            + rmqConnectionConfig.getHost(), ioe);
    }
    running = true;
}
示例21
/**
 * Publishes the same message to exchange {@code topic_exchange_name} under three
 * different routing keys.
 *
 * <p>The connection is closed in a {@code finally} block so it is released even when
 * creating the channel or publishing fails.
 *
 * @param args unused
 */
public static void main(String[] args) throws Exception {
    // Create the connection factory.
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("127.0.0.1");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    // Create a connection through the factory.
    Connection connection = connectionFactory.newConnection();
    try {
        // Create a channel on the connection.
        Channel channel = connection.createChannel();
        // Send the data through the channel.
        // exchange: when not given, the AMQP default exchange is used.
        String topicExchangeName = "topic_exchange_name";
        String[] topicRoutingKeys = {
            "topic_routingKey.test1",
            "topic_routingKey.test2",
            "topic_routingKey.test3.test03"
        };
        String topicMsg = "topic hello world";
        for (String topicRoutingKey : topicRoutingKeys) {
            channel.basicPublish(topicExchangeName, topicRoutingKey, null, topicMsg.getBytes());
        }
        // Close the channel; the connection is closed in the finally block.
        channel.close();
    } finally {
        // Closing the connection also closes any channel still open on it.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
示例22
/**
 * Opens a new connection using this object's {@code uri} and {@code heartbeat}.
 *
 * @return the newly opened connection
 */
private Connection createConnection() throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException, URISyntaxException {
    final ConnectionFactory factory = new ConnectionFactory();

    factory.setUri(this.uri);
    factory.setRequestedHeartbeat(this.heartbeat);

    final Connection opened = factory.newConnection();
    return opened;
}
示例23
/**
 * Checks that a connection timeout set on the {@code RMQConnectionConfig} builder is
 * carried over to the {@link ConnectionFactory} the config produces.
 */
@Test
public void shouldSetProvidedValueIfConnectionTimeoutNotGiven() throws NoSuchAlgorithmException,
        KeyManagementException, URISyntaxException {
    final int expectedTimeout = 5000;

    RMQConnectionConfig config = new RMQConnectionConfig.Builder()
        .setHost("localhost")
        .setPort(5000)
        .setUserName("guest")
        .setPassword("guest")
        .setVirtualHost("/")
        .setConnectionTimeout(expectedTimeout)
        .build();

    ConnectionFactory producedFactory = config.getConnectionFactory();

    assertEquals(expectedTimeout, producedFactory.getConnectionTimeout());
}
示例24
/**
 * Prepares the fixtures for each test: a spied serialization schema and a mocked
 * chain of connection config, connection factory, connection and channel.
 */
@Before
public void before() throws Exception {
    serializationSchema = spy(new DummySerializationSchema());

    // Create every mock first.
    channel = mock(Channel.class);
    connection = mock(Connection.class);
    connectionFactory = mock(ConnectionFactory.class);
    rmqConnectionConfig = mock(RMQConnectionConfig.class);

    // Then wire them together, from the channel outwards.
    when(connection.createChannel()).thenReturn(channel);
    when(connectionFactory.newConnection()).thenReturn(connection);
    when(rmqConnectionConfig.getConnectionFactory()).thenReturn(connectionFactory);
}
示例25
/**
 * Opens an AMQP connection to the given broker on the {@code carbon} virtual host.
 *
 * @param userName   user name for the connection
 * @param password   password for the connection
 * @param brokerHost broker host name
 * @param port       broker port as a decimal string
 * @return the newly opened connection
 * @throws NumberFormatException if {@code port} is not a valid integer
 */
public static Connection getAmqpConnection(String userName, String password, String brokerHost, String port)
        throws IOException, TimeoutException {
    final ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(userName);
    factory.setPassword(password);
    factory.setVirtualHost("carbon");
    factory.setHost(brokerHost);
    // The port is parsed here, after the other settings, as in the original.
    factory.setPort(Integer.parseInt(port));
    return factory.newConnection();
}
示例26
/**
 * Connects to the broker at {@code 127.0.0.1:32768} and returns a new channel.
 *
 * <p>The connection itself is not returned and is not closed by this method.
 *
 * @return a channel created on a fresh connection
 */
public static Channel connect() throws Exception {
    final ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("127.0.0.1");
    connectionFactory.setPort(32768);

    final Connection brokerConnection = connectionFactory.newConnection();
    final Channel opened = brokerConnection.createChannel();
    return opened;
}
示例27
/**
 * Runs the push test with automatic recovery enabled on the connection factory.
 */
@Test
public void testPush_autorecovery() throws Exception {
    final ConnectionFactory factory = getConnectionFactory();
    factory.setAutomaticRecoveryEnabled(true);

    testRunner.runPushTest();
}
示例28
/**
 * Consumes messages from the durable queue {@code test001} forever, printing each
 * body and the values of the {@code my1} and {@code my2} headers to standard error.
 *
 * <p>Two defects are fixed. A message without headers no longer causes a
 * {@link NullPointerException}; both values are printed as {@code null} instead. The
 * second label now says {@code my2}, matching the header it prints.
 *
 * @param args unused
 */
public static void main(String[] args) throws Exception {
    // 1. Create and configure a ConnectionFactory.
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(Constant.ip);
    connectionFactory.setPort(Constant.port);
    connectionFactory.setVirtualHost("/");
    // 2. Create a connection through the factory.
    Connection connection = connectionFactory.newConnection();
    // 3. Create a channel on the connection.
    Channel channel = connection.createChannel();
    // 4. Declare (create) the queue.
    String queueName = "test001";
    channel.queueDeclare(queueName, true, false, false, null);
    // 5. Create the consumer.
    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    // 6. Attach the consumer to the channel with automatic acknowledgement.
    channel.basicConsume(queueName, true, queueingConsumer);
    while (true) {
        // 7. Take the next delivered message.
        Delivery delivery = queueingConsumer.nextDelivery();
        String msg = new String(delivery.getBody());
        System.err.println("消费端: " + msg);
        // Headers are null when the publisher sent the message without any.
        Map<String, Object> headers = delivery.getProperties().getHeaders();
        Object my1 = headers == null ? null : headers.get("my1");
        Object my2 = headers == null ? null : headers.get("my2");
        System.err.println("headers get my1 value: " + my1 + "\tmy2 value:" + my2);
        //Envelope envelope = delivery.getEnvelope();
    }
}
示例29
/**
 * Runs the push test with automatic recovery disabled on the connection factory.
 */
@Test
public void testPush() throws Exception {
    final ConnectionFactory factory = getConnectionFactory();
    factory.setAutomaticRecoveryEnabled(false);

    testRunner.runPushTest();
}
示例30
/**
 * Initialises the producer: loads the RabbitMQ settings from {@code properties},
 * configures a {@link ConnectionFactory} from them, and opens the connection and
 * channel.
 *
 * <p>The host setting may be {@code host} or {@code host:port}. Aliyun credentials
 * are used when access key, secret key and uid are all set; otherwise the plain
 * user name and password are used.
 *
 * @param properties source of the producer settings
 * @throws CanalException wrapping any {@link IOException} or {@link TimeoutException}
 *         raised while connecting
 */
@Override
public void init(Properties properties) {
    RabbitMQProducerConfig producerConfig = new RabbitMQProducerConfig();
    this.mqProperties = producerConfig;
    super.init(properties);
    loadRabbitMQProperties(properties);

    ConnectionFactory factory = new ConnectionFactory();

    // Host, optionally followed by ":port".
    String hostSpec = producerConfig.getHost();
    if (!hostSpec.contains(":")) {
        factory.setHost(hostSpec);
    } else {
        String[] hostAndPort = hostSpec.split(":");
        factory.setHost(hostAndPort[0]);
        factory.setPort(Integer.parseInt(hostAndPort[1]));
    }

    // Credentials: Aliyun provider when fully configured, plain login otherwise.
    boolean useAliyunCredentials = mqProperties.getAliyunAccessKey().length() > 0
        && mqProperties.getAliyunSecretKey().length() > 0
        && mqProperties.getAliyunUid() > 0;
    if (useAliyunCredentials) {
        factory.setCredentialsProvider(new AliyunCredentialsProvider(mqProperties.getAliyunAccessKey(),
            mqProperties.getAliyunSecretKey(),
            mqProperties.getAliyunUid()));
    } else {
        factory.setUsername(producerConfig.getUsername());
        factory.setPassword(producerConfig.getPassword());
    }

    factory.setVirtualHost(producerConfig.getVirtualHost());

    try {
        connect = factory.newConnection();
        channel = connect.createChannel();
        // channel.exchangeDeclare(mqProperties.getExchange(), "topic");
    } catch (IOException | TimeoutException ex) {
        throw new CanalException("Start RabbitMQ producer error", ex);
    }
}