Java源码示例:io.openmessaging.Message
示例1
/**
 * Producer worker loop: keeps sending messages until {@code Constants.PRO_MAX}
 * messages have been sent or any exception is raised.
 *
 * <p>When {@code sendNum % 10 == 0} the message goes to one of ten {@code QUEUE_n}
 * destinations, otherwise to one of ten {@code TOPIC_n} destinations. The body is
 * the bytes of {@code label + "_" + offset}, where the offset is read from
 * {@code offsets} for that destination and incremented after the message is built.
 */
@Override
public void run() {
    while (true) {
        try {
            // Queue and topic traffic is split 1:9; each kind has only 10 destinations.
            final boolean toQueue = sendNum % 10 == 0;
            final String queueOrTopic = (toQueue ? "QUEUE_" : "TOPIC_") + random.nextInt(10);
            final String payload = label + "_" + offsets.get(queueOrTopic);

            // Topic destinations are built with the topic factory method; previously
            // every message, including TOPIC_n ones, was created as a queue message.
            final Message message;
            if (toQueue) {
                message = producer.createBytesMessageToQueue(queueOrTopic, payload.getBytes());
            } else {
                message = producer.createBytesMessageToTopic(queueOrTopic, payload.getBytes());
            }

            logger.debug("queueOrTopic:{} offset:{}", queueOrTopic, payload);
            // One more message produced for this destination: advance its offset.
            offsets.put(queueOrTopic, offsets.get(queueOrTopic) + 1);
            producer.send(message);
            sendNum++;
            if (sendNum >= Constants.PRO_MAX) {
                break;
            }
        } catch (Exception e) {
            // Any failure ends this worker; it is logged, not rethrown.
            logger.error("Error occurred in the sending process", e);
            break;
        }
    }
}
示例2
/**
 * Attaches an OMS listener to "HELLO_QUEUE", hands one RocketMQ message directly to the
 * push consumer's concurrent listener, and checks the id and body seen by the OMS listener.
 */
@Test
public void testConsumeMessage() {
    final byte[] expectedBody = {'a', 'b'};

    final MessageExt rmqMessage = new MessageExt();
    rmqMessage.setMsgId("NewMsgId");
    rmqMessage.setBody(expectedBody);
    rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
    rmqMessage.setTopic("HELLO_QUEUE");

    final MessageListener verifyingListener = new MessageListener() {
        @Override
        public void onReceived(Message message, Context context) {
            final String receivedId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
            assertThat(receivedId).isEqualTo("NewMsgId");
            final byte[] receivedBody = ((BytesMessage) message).getBody(byte[].class);
            assertThat(receivedBody).isEqualTo(expectedBody);
            context.ack();
        }
    };
    consumer.attachQueue("HELLO_QUEUE", verifyingListener);

    // Drive the underlying RocketMQ listener by hand instead of going through a broker.
    final MessageListenerConcurrently rmqListener =
        (MessageListenerConcurrently) rocketmqPushConsumer.getMessageListener();
    rmqListener.consumeMessage(Collections.singletonList(rmqMessage), null);
}
示例3
/**
 * Sends {@code message} synchronously through the wrapped RocketMQ producer.
 *
 * <p>On a {@code SEND_OK} status the RocketMQ message id is written into the OMS
 * message headers and the converted result is returned. Any other status raises an
 * {@code OMSRuntimeException}; since that throw sits inside the {@code try}, it is
 * also handled by the generic {@code catch} below, like every other exception.
 *
 * @param message OMS message; cast to {@code BytesMessage} after {@code checkMessageType}
 * @param timeout timeout forwarded to the RocketMQ producer's {@code send}
 * @return the RocketMQ result converted by {@code OMSUtil.sendResultConvert}
 */
private SendResult send(final Message message, long timeout) {
    checkMessageType(message);
    final org.apache.rocketmq.common.message.Message converted = msgConvert((BytesMessage) message);
    try {
        final org.apache.rocketmq.client.producer.SendResult brokerResult =
            this.rocketmqProducer.send(converted, timeout);
        if (brokerResult.getSendStatus().equals(SendStatus.SEND_OK)) {
            message.headers().put(MessageHeader.MESSAGE_ID, brokerResult.getMsgId());
            return OMSUtil.sendResultConvert(brokerResult);
        }
        log.error(String.format("Send message to RocketMQ failed, %s", message));
        throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.");
    } catch (Exception e) {
        log.error(String.format("Send message to RocketMQ failed, %s", message), e);
        throw checkProducerException(converted.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
    }
}
示例4
/**
 * Pull-consumer demo: obtains a messaging access point, creates a pull consumer on
 * "OMS_HELLO_TOPIC", starts both, then polls forever and acks every message received.
 * A JVM shutdown hook shuts down the consumer and then the access point.
 */
public static void main(String[] args) {
    final MessagingAccessPoint accessPoint = MessagingAccessPointFactory
        .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

    final PullConsumer pullConsumer = accessPoint.createPullConsumer("OMS_HELLO_TOPIC",
        OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

    accessPoint.startup();
    System.out.printf("MessagingAccessPoint startup OK%n");

    final Runnable shutdownTask = new Runnable() {
        @Override
        public void run() {
            pullConsumer.shutdown();
            accessPoint.shutdown();
        }
    };
    Runtime.getRuntime().addShutdownHook(new Thread(shutdownTask));

    pullConsumer.startup();
    System.out.printf("Consumer startup OK%n");

    for (;;) {
        final Message polled = pullConsumer.poll();
        if (polled == null) {
            // Nothing returned by this poll; try again.
            continue;
        }
        final String msgId = polled.headers().getString(MessageHeader.MESSAGE_ID);
        System.out.printf("Received one message: %s%n", msgId);
        pullConsumer.ack(msgId);
    }
}
示例5
/**
 * Sends {@code message} in one-way mode through the RocketMQ producer.
 * Any exception raised by the underlying {@code sendOneway} call is deliberately discarded.
 *
 * @param message OMS message; cast to {@code BytesMessage} after {@code checkMessageType}
 */
@Override
public void sendOneway(final Message message) {
    checkMessageType(message);
    final org.apache.rocketmq.common.message.Message converted =
        msgConvert((BytesMessage) message);
    try {
        this.rocketmqProducer.sendOneway(converted);
    } catch (Exception ignored) {
        // Ignore the oneway exception.
    }
}
示例6
/**
 * One-way send: validates the message type, converts it to a RocketMQ message and
 * passes it to the producer's {@code sendOneway}. Exceptions from that call are swallowed.
 *
 * @param message OMS message; cast to {@code BytesMessage} for conversion
 */
@Override
public void sendOneway(final Message message) {
    checkMessageType(message);
    final BytesMessage bytesMessage = (BytesMessage) message;
    final org.apache.rocketmq.common.message.Message outbound = msgConvert(bytesMessage);
    try {
        this.rocketmqProducer.sendOneway(outbound);
    } catch (Exception ignored) {
        // Ignore the oneway exception.
    }
}
示例7
/**
 * Stores a {@code long} property under {@code key}, creating the property
 * container on first use.
 *
 * @param key   property name
 * @param value property value
 * @return this message
 */
@Override
public Message putProperties(String key, long value) {
    if (null == properties) {
        properties = new DefaultKeyValue();
    }
    properties.put(key, value);
    return this;
}
示例8
/**
 * Stubs the local message cache to return one RocketMQ message and checks that
 * {@code consumer.poll()} exposes the same message id and body.
 */
@Test
public void testPoll() {
    final byte[] expectedBody = {'a', 'b'};

    final MessageExt cached = new MessageExt();
    cached.setMsgId("NewMsgId");
    cached.setBody(expectedBody);
    cached.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
    cached.setTopic(queueName);
    when(localMessageCache.poll()).thenReturn(cached);

    final Message polled = consumer.poll();

    final String polledId = polled.headers().getString(MessageHeader.MESSAGE_ID);
    assertThat(polledId).isEqualTo("NewMsgId");
    final BytesMessage polledBytes = (BytesMessage) polled;
    assertThat(polledBytes.getBody()).isEqualTo(expectedBody);
}
示例9
/**
 * Fire-and-forget variant of send: after the type check and conversion the message is
 * given to the RocketMQ producer's {@code sendOneway}; failures there are not reported.
 *
 * @param message OMS message; cast to {@code BytesMessage} for conversion
 */
@Override
public void sendOneway(final Message message) {
    checkMessageType(message);
    final org.apache.rocketmq.common.message.Message rocketMessage =
        msgConvert((BytesMessage) message);
    try {
        this.rocketmqProducer.sendOneway(rocketMessage);
    } catch (Exception ignored) {
        // Ignore the oneway exception.
    }
}
示例10
/**
 * Flushes buffered message lists through {@code sendList}.
 *
 * <p>First retries every entry parked in {@code readyForRead}; then, if {@code off}
 * equals {@code writeThreshold}, flushes the current {@code list} or parks it in
 * {@code readyForRead} when the per-destination flag cannot be acquired.
 *
 * <p>NOTE(review): {@code off}, {@code list} and {@code queueOrTopic} are not declared
 * in this method as shown, and the {@code message} parameter is not used in the visible
 * body — presumably they are fields or lines missing from this excerpt; confirm.
 */
@Override public void send(Message message) {
// Handle the lists left over from earlier calls that could not be sent.
if( readyForRead.size() > 0){
Iterator<Entry<String, List<DefaultBytesMessageList>>> it = readyForRead.entrySet().iterator();
Entry<String, List<DefaultBytesMessageList>> e = null;
String key = null;
while(it.hasNext()){
e = it.next();
key = e.getKey();
// Only the caller that flips the flag to true gets the mapBuffer and starts writing;
// otherwise the entry stays parked until the next send.
if(isReading.get(key).compareAndSet(false, true)){
mapBuffer = allMapBuffer.get(key);
sendList(e.getValue(), storeOff.get(key));
// NOTE(review): the flag is reset only on the normal path; if sendList throws it
// stays true — consider try/finally.
isReading.get(key).set(false);
// Flushed: drop the parked entry and reset the stored offset for this key.
it.remove();
storeOff.put(key, 0);
}
}
}
/**
* When the stored offset reaches writeThreshold, write this list. off is the next
* array index to be written.
* If the offset is greater than writeThreshold, the list must already be in readyForRead.
* If the offset is smaller than writeThreshold, nothing needs to be done.
*/
if(off == writeThreshold){
// Only the caller that flips the flag to true gets the mapBuffer and starts writing;
// otherwise wait for the next send.
if(isReading.get(queueOrTopic).compareAndSet(false, true)){
mapBuffer = allMapBuffer.get(queueOrTopic);
sendList(list , off);
isReading.get(queueOrTopic).set(false);
storeOff.put(queueOrTopic, 0);
}else{
// A flush is needed but the buffer could not be acquired; offset + 1, try to park
// the list in readyForRead.
// NOTE(review): no offset increment is visible in this branch — confirm where it happens.
readyForRead.put(queueOrTopic, list);
}
}
}
示例11
/**
 * Stores an {@code int} property under {@code key}; the property container is
 * created the first time a property is added.
 *
 * @param key   property name
 * @param value property value
 * @return this message
 */
@Override
public Message putProperties(String key, int value) {
    if (null == properties) {
        properties = new DefaultKeyValue();
    }
    properties.put(key, value);
    return this;
}
示例12
/**
 * Checks that {@code receive} returns {@code null} both without properties and with an
 * explicit {@code TIMEOUT} of 100 when nothing has been stubbed into the consumer.
 */
@Test
public void testPoll_WithTimeout() {
    // There is a default timeout value, @see ClientConfig#omsOperationTimeout.
    final Message withDefaultTimeout = consumer.receive();
    assertThat(withDefaultTimeout).isNull();

    final Message withExplicitTimeout =
        consumer.receive(OMS.newKeyValue().put(Message.BuiltinKeys.TIMEOUT, 100));
    assertThat(withExplicitTimeout).isNull();
}
示例13
/**
 * Polls with the timeout given by {@code Message.BuiltinKeys.TIMEOUT} in
 * {@code properties} when that key is present, otherwise with the client
 * configuration's operation timeout.
 *
 * @param properties per-call options; only the {@code TIMEOUT} key is read here
 * @return whatever {@code poll(int)} returns for the chosen timeout
 */
MessageExt poll(final KeyValue properties) {
    final int configuredTimeout = clientConfig.getOperationTimeout();
    final int effectiveTimeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
        ? properties.getInt(Message.BuiltinKeys.TIMEOUT)
        : configuredTimeout;
    return poll(effectiveTimeout);
}
示例14
/**
 * Entry point of the pull-consumer sample. Creates the access point and a pull consumer
 * for "OMS_HELLO_TOPIC", registers a shutdown hook that stops both, starts them, and
 * then loops forever printing and acking each polled message.
 */
public static void main(String[] args) {
    final MessagingAccessPoint accessPoint = MessagingAccessPointFactory
        .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
    final PullConsumer pullConsumer = accessPoint.createPullConsumer("OMS_HELLO_TOPIC",
        OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

    accessPoint.startup();
    System.out.printf("MessagingAccessPoint startup OK%n");

    // Consumer first, then the access point, when the JVM exits.
    final Thread shutdownHook = new Thread(new Runnable() {
        @Override
        public void run() {
            pullConsumer.shutdown();
            accessPoint.shutdown();
        }
    });
    Runtime.getRuntime().addShutdownHook(shutdownHook);

    pullConsumer.startup();
    System.out.printf("Consumer startup OK%n");

    while (true) {
        final Message received = pullConsumer.poll();
        if (received == null) {
            continue;
        }
        final String receivedId = received.headers().getString(MessageHeader.MESSAGE_ID);
        System.out.printf("Received one message: %s%n", receivedId);
        pullConsumer.ack(receivedId);
    }
}
示例15
/**
 * Appends a {@code double} property to the property builder as
 * {@code key}, {@code sem}, {@code value}, {@code sem} and increments {@code proSize}.
 * The builder is created from {@code sem} on first use.
 *
 * @param key   property name
 * @param value property value
 * @return this message
 */
@Override
public Message putProperties(String key, double value) {
    if (null == prosBiulder) {
        prosBiulder = new StringBuilder(sem);
    }
    prosBiulder.append(key).append(sem).append(value).append(sem);
    proSize++;
    return this;
}
示例16
/**
 * Stubs the local message cache with one RocketMQ message and verifies that
 * {@code consumer.receive()} returns a message carrying the same id and body.
 */
@Test
public void testPoll() {
    final byte[] expectedBody = {'a', 'b'};

    final MessageExt cached = new MessageExt();
    cached.setMsgId("NewMsgId");
    cached.setBody(expectedBody);
    cached.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
    cached.setTopic(queueName);
    when(localMessageCache.poll()).thenReturn(cached);

    final Message received = consumer.receive();

    final String receivedId = received.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
    assertThat(receivedId).isEqualTo("NewMsgId");
    final byte[] receivedBody = ((BytesMessage) received).getBody(byte[].class);
    assertThat(receivedBody).isEqualTo(expectedBody);
}
示例17
/**
 * Flushes buffered message lists through {@code sendList}.
 *
 * <p>Entries parked in {@code readyForRead} are retried first; afterwards, when
 * {@code off} equals {@code writeThreshold}, the current {@code list} is either flushed
 * or parked in {@code readyForRead} if the per-destination flag cannot be acquired.
 *
 * <p>NOTE(review): {@code off}, {@code list} and {@code queueOrTopic} are not declared
 * in this method as shown, and the {@code message} parameter is not used in the visible
 * body — presumably they are fields or lines missing from this excerpt; confirm.
 */
@Override public void send(Message message) {
// Handle the lists left over from earlier calls that could not be sent.
if( readyForRead.size() > 0){
Iterator<Entry<String, List<DefaultBytesMessage>>> it = readyForRead.entrySet().iterator();
Entry<String, List<DefaultBytesMessage>> e = null;
String key = null;
while(it.hasNext()){
e = it.next();
key = e.getKey();
// Only the caller that flips the flag to true gets the mapBuffer and starts writing;
// otherwise the entry stays parked until the next send.
if(isReading.get(key).compareAndSet(false, true)){
mapBuffer = allMapBuffer.get(key);
sendList(e.getValue(), storeOff.get(key));
// NOTE(review): the flag is reset only on the normal path; if sendList throws it
// stays true — consider try/finally.
isReading.get(key).set(false);
// Flushed: drop the parked entry and reset the stored offset for this key.
it.remove();
storeOff.put(key, 0);
}
}
}
/**
* When the stored offset reaches writeThreshold, write this list. off is the next
* array index to be written.
* If the offset is greater than writeThreshold, the list must already be in readyForRead.
* If the offset is smaller than writeThreshold, nothing needs to be done.
*/
if(off == writeThreshold){
// Only the caller that flips the flag to true gets the mapBuffer and starts writing;
// otherwise wait for the next send.
if(isReading.get(queueOrTopic).compareAndSet(false, true)){
mapBuffer = allMapBuffer.get(queueOrTopic);
sendList(list , off);
isReading.get(queueOrTopic).set(false);
storeOff.put(queueOrTopic, 0);
}else{
// A flush is needed but the buffer could not be acquired; offset + 1, try to park
// the list in readyForRead.
// NOTE(review): no offset increment is visible in this branch — confirm where it happens.
readyForRead.put(queueOrTopic, list);
}
}
}
示例18
/**
 * Push-consumer demo: obtains a messaging access point, creates a push consumer,
 * attaches a listener to "OMS_HELLO_TOPIC" that prints and acks each message, and
 * starts the consumer. A JVM shutdown hook shuts down the consumer and then the access point.
 */
public static void main(String[] args) {
    final MessagingAccessPoint accessPoint = OMS
        .getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
    final PushConsumer pushConsumer = accessPoint
        .createPushConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));

    accessPoint.startup();
    System.out.printf("MessagingAccessPoint startup OK%n");

    final Runnable shutdownTask = new Runnable() {
        @Override
        public void run() {
            pushConsumer.shutdown();
            accessPoint.shutdown();
        }
    };
    Runtime.getRuntime().addShutdownHook(new Thread(shutdownTask));

    final MessageListener printingListener = new MessageListener() {
        @Override
        public void onReceived(Message message, Context context) {
            System.out.printf("Received one message: %s%n", message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID));
            context.ack();
        }
    };
    pushConsumer.attachQueue("OMS_HELLO_TOPIC", printingListener);

    pushConsumer.startup();
    System.out.printf("Consumer startup OK%n");
}
示例19
/**
 * Returns the next buffered message, or {@code null} when no further list is available.
 *
 * <p>On the first call (while {@code isReaded} is false) the file is loaded through
 * {@code readFile()} and the elapsed time is printed. Afterwards the method walks
 * {@code bucketList}, skipping buckets whose list in {@code data} is {@code null},
 * and hands out the messages of the current list one by one.
 */
public Message poll() {
    // Every thread is expected to read the file first (or wait until reading has finished).
    if (!isReaded) {
        final long startedAt = System.currentTimeMillis();
        readFile();
        isReaded = true;
        // Clearing files tells the next test case that it is a new run and that all
        // attributes must be initialised again.
        files = null;
        final long elapsedMs = System.currentTimeMillis() - startedAt;
        System.out.println(Thread.currentThread().getName() + " 读文件耗时(ms):" + elapsedMs);
    }

    // A queue/topic may have no message at all; roll forward to the next bucket.
    while (currentList == null && bucketOffset < bucketList.size() - 1) {
        bucketOffset++;
        currentList = data.get(bucketList.get(bucketOffset));
    }
    if (currentList == null) {
        return null;
    }

    final BytesMessage next = currentList.get(messageOffset);
    final boolean wasLastInList = messageOffset >= currentList.size() - 1;
    if (wasLastInList) {
        // Current list exhausted: drop it so the next call advances to another bucket.
        currentList = null;
        messageOffset = 0;
    } else {
        messageOffset++;
    }
    return next;
}
示例20
/**
 * Returns the next message from {@code naiveDataReader}, or {@code null} when the
 * reader reports that it has no more.
 */
@Override
public Message poll() {
    if (!naiveDataReader.hasNext()) {
        return null;
    }
    return naiveDataReader.next();
}
示例21
/**
 * Stores a {@code double} property under {@code key}, creating the property
 * container if it does not exist yet.
 *
 * @param key   property name
 * @param value property value
 * @return this message
 */
@Override
public Message putProperties(String key, double value) {
    if (null == properties) {
        properties = new DefaultKeyValue();
    }
    properties.put(key, value);
    return this;
}
示例22
/**
 * Appends an {@code int} header to the header builder as {@code seq.get(key)},
 * {@code value}, {@code sem}, and increments {@code headerSize}.
 *
 * @param key   header name; looked up in {@code seq} before being appended
 * @param value header value
 * @return this message
 */
@Override
public Message putHeaders(String key, int value) {
    headersBiulder.append(seq.get(key)).append(value).append(sem);
    headerSize++;
    return this;
}
示例23
/**
 * Resolves the poll timeout and delegates to {@code poll(int)}.
 * The client configuration's operation timeout is used unless {@code properties}
 * contains {@code Message.BuiltinKeys.TIMEOUT}, in which case that value wins.
 *
 * @param properties per-call options; only the {@code TIMEOUT} key is read here
 * @return whatever {@code poll(int)} returns for the resolved timeout
 */
MessageExt poll(final KeyValue properties) {
    final int fallbackTimeout = clientConfig.getOperationTimeout();
    final boolean hasOverride = properties.containsKey(Message.BuiltinKeys.TIMEOUT);
    final int timeout = hasOverride
        ? properties.getInt(Message.BuiltinKeys.TIMEOUT)
        : fallbackTimeout;
    return poll(timeout);
}
示例24
/**
 * Producer worker loop: sends messages until {@code Constants.PRO_MAX} have been sent
 * (then shuts the producer down) or until any exception is raised.
 *
 * <p>Message body: the bytes of "producer thread name_offset". For each combination of
 * label (thread name) and queue/topic the offset keeps accumulating (0, 1, 2, 3, ...),
 * i.e. each thread writes a continuous, increasing offset per queue/topic.
 */
@Override
public void run() {
    while (true) {
        try {
            final boolean toQueue = sendNum % 10 == 0;
            final String queueOrTopic = (toQueue ? "QUEUE_" : "TOPIC_") + random.nextInt(10);
            final String payload = label + "_" + offsets.get(queueOrTopic);

            final Message message;
            if (toQueue) {
                message = producer.createBytesMessageToQueue(queueOrTopic, payload.getBytes());
            } else {
                message = producer.createBytesMessageToTopic(queueOrTopic, payload.getBytes());
            }

            logger.debug("queueOrTopic:{} offset:{}", queueOrTopic, payload);
            // Each produced message advances the offset of its bucket by one.
            offsets.put(queueOrTopic, offsets.get(queueOrTopic) + 1);
            producer.send(message);
            sendNum++;
            if (sendNum >= Constants.PRO_MAX) {
                // Remember to shut down when testing locally, otherwise the files held
                // cannot be deleted. In the real test the process is killed, so the
                // problem does not occur there.
                producer.shutdown();
                break;
            }
        } catch (Exception e) {
            logger.error("Error occurred in the sending process", e);
            break;
        }
    }
}
示例25
/**
 * Stores a {@code double} property by converting it with {@code String.valueOf}
 * and delegating to the {@code String}-valued {@code putProperties} overload.
 *
 * @param key   property name
 * @param value property value
 * @return this message
 */
@Override
public Message putProperties(String key, double value) {
    final String rendered = String.valueOf(value);
    putProperties(key, rendered);
    return this;
}
示例26
/**
 * Makes the mocked local cache return one RocketMQ message and asserts that the
 * message returned by {@code consumer.receive()} has the expected id and body.
 */
@Test
public void testPoll() {
    final byte[] payload = {'a', 'b'};

    final MessageExt stubbed = new MessageExt();
    stubbed.setMsgId("NewMsgId");
    stubbed.setBody(payload);
    stubbed.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
    stubbed.setTopic(queueName);
    when(localMessageCache.poll()).thenReturn(stubbed);

    final Message result = consumer.receive();

    final String resultId = result.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
    assertThat(resultId).isEqualTo("NewMsgId");
    final byte[] resultBody = ((BytesMessage) result).getBody(byte[].class);
    assertThat(resultBody).isEqualTo(payload);
}
示例27
/**
 * Starts the producer, attaches a listener to {@code queueName} and starts the consumer.
 *
 * <p>The listener acks and skips messages whose born timestamp is more than 10000 ms
 * before the current time. For other messages it decodes the body into a key/value map,
 * reports every entry through {@code dataSynchronizerCallback.onCompletion} and then
 * acks. If processing throws, the error is logged and the message is not acked.
 */
@Override
public void start() {
    producer.startup();
    consumer.attachQueue(queueName, (message, context) -> {
        try {
            // Need openMessaging to support start consume message from tail.
            final long bornAt =
                Long.parseLong(message.sysHeaders().getString(Message.BuiltinKeys.BORN_TIMESTAMP));
            if (bornAt + 10000 < System.currentTimeMillis()) {
                context.ack();
                return;
            }

            final String messageId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
            log.info("Received one message: {}", messageId + "\n");

            final byte[] payload = message.getBody(byte[].class);
            final Map<K, V> decoded = decodeKeyValue(payload);
            for (Map.Entry<K, V> entry : decoded.entrySet()) {
                dataSynchronizerCallback.onCompletion(null, entry.getKey(), entry.getValue());
            }
            context.ack();
        } catch (Exception e) {
            log.error("BrokerBasedLog process message failed.", e);
        }
    });
    consumer.startup();
}
示例28
/**
 * Stores a {@code long} header under {@code key} in {@code headers}.
 *
 * @param key   header name
 * @param value header value
 * @return this message
 */
@Override
public Message putHeaders(String key, long value) {
    headers.put(key, value);

    return this;
}
示例29
/**
 * Asynchronous send is not implemented by this producer.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public Promise<Void> sendAsync(Message message, KeyValue properties) {
    throw new UnsupportedOperationException("Unsupported");
}
示例30
/**
 * Sends {@code message} by delegating to {@code send(Message)}.
 *
 * @param message    message to send
 * @param properties not used by this implementation
 */
@Override
public void send(final Message message, final KeyValue properties) {
    this.send(message);
}