Java源码示例:com.alibaba.rocketmq.common.message.MessageExt
示例1
/**
 * Starts a push consumer on {@code TopicTest} (subscription expression {@code "*"}) whose
 * listener prints every delivered batch and acknowledges it.
 */
public static void main(String[] args) throws InterruptedException, MQClientException {
    // Listener that only logs the batch together with the consuming thread's name.
    final MessageListenerConcurrently printingListener = new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            final String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " Receive New Messages: " + msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    };

    final DefaultMQPushConsumer pushConsumer =
            new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    // Chooses whether the consumer's very first start begins at the head or the tail of the
    // queue. On any later start, consumption resumes from the last consumed position.
    pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    pushConsumer.subscribe("TopicTest", "*");
    pushConsumer.registerMessageListener(printingListener);
    pushConsumer.start();

    System.out.println("Consumer Started.");
}
示例2
/**
 * Decodes the message carried in the request body and hands it to the client factory for
 * direct consumption.
 *
 * @param ctx     channel context of the request (not used here)
 * @param request command whose custom header names the consumer group and broker, and whose
 *                body holds the encoded message
 * @return a response with code {@code SUCCESS} and the encoded result as body, or
 *         {@code SYSTEM_ERROR} with a remark when the factory returned no result
 * @throws RemotingCommandException if the custom header cannot be decoded
 */
private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
    final ConsumeMessageDirectlyResultRequestHeader header =
            (ConsumeMessageDirectlyResultRequestHeader) request
                    .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
    final MessageExt decoded = MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));

    final ConsumeMessageDirectlyResult outcome = this.mqClientFactory.consumeMessageDirectly(
            decoded, header.getConsumerGroup(), header.getBrokerName());

    if (outcome == null) {
        // No result was produced for the requested group: report it to the caller.
        reply.setCode(ResponseCode.SYSTEM_ERROR);
        reply.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", header.getConsumerGroup()));
        return reply;
    }

    reply.setCode(ResponseCode.SUCCESS);
    reply.setBody(outcome.encode());
    return reply;
}
示例3
/**
 * Decodes the message in the request body, looks up the producer registered for the message's
 * producer group and asks it to check the transaction state.
 *
 * @param ctx     channel context; its channel supplies the remote address passed to the producer
 * @param request command carrying the {@code CheckTransactionStateRequestHeader} and the message
 * @return always {@code null}
 * @throws RemotingCommandException if the custom header cannot be decoded
 */
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final CheckTransactionStateRequestHeader header =
            (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
    final MessageExt decoded = MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));
    if (decoded == null) {
        log.warn("checkTransactionState, decode message failed");
        return null;
    }

    final String producerGroup = decoded.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    if (producerGroup == null) {
        log.warn("checkTransactionState, pick producer group failed");
        return null;
    }

    final MQProducerInner target = this.mqClientFactory.selectProducer(producerGroup);
    if (target == null) {
        log.debug("checkTransactionState, pick producer by group[{}] failed", producerGroup);
        return null;
    }

    target.checkTransactionState(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), decoded, header);
    return null;
}
示例4
/**
 * Demo transaction checker: prints the message and derives the outcome from a running counter.
 *
 * @param msg the message whose local transaction is being checked
 * @return {@code ROLLBACK_MESSAGE} when the counter is a multiple of 5, {@code COMMIT_MESSAGE}
 *         when it is a multiple of 4, otherwise {@code UNKNOW}
 * @throws RuntimeException when the counter is a multiple of 6 (checked before the other cases)
 */
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
    System.out.println("server checking TrMsg " + msg.toString());

    final int sequence = transactionIndex.getAndIncrement();
    if (sequence % 6 == 0) {
        throw new RuntimeException("Could not find db");
    }
    if (sequence % 5 == 0) {
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    if (sequence % 4 == 0) {
        return LocalTransactionState.COMMIT_MESSAGE;
    }
    return LocalTransactionState.UNKNOW;
}
示例5
/**
 * Starts a push consumer on {@code TopicFilter7}. The subscription passes the filter class name
 * together with the filter source text read from a local file.
 */
public static void main(String[] args) throws InterruptedException, MQClientException {
    final DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");

    // Source text of the filter class, handed to subscribe() alongside its fully-qualified name.
    final String filterSource = MixAll.file2String("/home/admin/MessageFilterImpl.java");
    pushConsumer.subscribe(
            "TopicFilter7",
            "com.alibaba.rocketmq.example.filter.MessageFilterImpl",
            filterSource);

    final MessageListenerConcurrently printingListener = new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            final String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " Receive New Messages: " + msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    };
    pushConsumer.registerMessageListener(printingListener);

    pushConsumer.start();
    System.out.println("Consumer Started.");
}
示例6
/**
 * Accepts a message when its {@code SequenceId} user property is an integer that is divisible
 * by 3 and greater than 10.
 *
 * @param msg     the message to test
 * @param context filter context (not used here)
 * @return {@code true} when the property matches the rule above; {@code false} when the property
 *         is missing, is not a valid integer, or does not satisfy the rule
 */
@Override
public boolean match(MessageExt msg, FilterContext context) {
    String property = msg.getUserProperty("SequenceId");
    if (property == null) {
        return false;
    }

    final int id;
    try {
        id = Integer.parseInt(property);
    } catch (NumberFormatException e) {
        // The property is set by the sender; a malformed value means "no match" rather than
        // letting the exception escape from the filter.
        return false;
    }
    return (id % 3) == 0 && (id > 10);
}
示例7
/**
 * Decodes the message in the request body, looks up the producer registered for the message's
 * producer group and asks it to check the transaction state.
 *
 * @param ctx     channel context; its channel supplies the remote address passed to the producer
 * @param request command carrying the {@code CheckTransactionStateRequestHeader} and the message
 * @return always {@code null}
 * @throws RemotingCommandException if the custom header cannot be decoded
 */
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    final CheckTransactionStateRequestHeader header = (CheckTransactionStateRequestHeader) request
            .decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
    final MessageExt decoded = MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));
    if (decoded == null) {
        log.warn("checkTransactionState, decode message failed");
        return null;
    }

    final String producerGroup = decoded.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    if (producerGroup == null) {
        log.warn("checkTransactionState, pick producer group failed");
        return null;
    }

    final MQProducerInner target = this.mqClientFactory.selectProducer(producerGroup);
    if (target == null) {
        log.debug("checkTransactionState, pick producer by group[{}] failed", producerGroup);
        return null;
    }

    target.checkTransactionState(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), decoded, header);
    return null;
}
示例8
/**
 * Asks the push consumer registered under {@code consumerGroup} to consume {@code msg} directly.
 *
 * @param msg           the message to consume
 * @param consumerGroup key used to look up the consumer in {@code consumerTable}
 * @param brokerName    broker name forwarded to the consume service
 * @return the consume result, or {@code null} when no consumer is registered for the group or
 *         the registered consumer is not a {@code DefaultMQPushConsumerImpl}
 */
public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg,
        final String consumerGroup,
        final String brokerName) {
    MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
    // instanceof is false for null, so this covers the "not registered" case and also avoids a
    // ClassCastException when the registered consumer is some other MQConsumerInner.
    if (!(mqConsumerInner instanceof DefaultMQPushConsumerImpl)) {
        return null;
    }
    DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;
    return consumer.getConsumeMessageService().consumeMessageDirectly(msg, brokerName);
}
示例9
/**
 * Counts the check request and answers according to the {@code ischeckffalse} switch.
 *
 * @param msg the message being checked (not inspected here)
 * @return {@code ROLLBACK_MESSAGE} when {@code ischeckffalse} is set, otherwise
 *         {@code COMMIT_MESSAGE}
 */
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
    statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet();
    return ischeckffalse
            ? LocalTransactionState.ROLLBACK_MESSAGE
            : LocalTransactionState.COMMIT_MESSAGE;
}
示例10
/**
 * Creates a pull result holding the given values as-is.
 *
 * @param pullStatus      status stored in this result
 * @param nextBeginOffset next-begin offset stored in this result
 * @param minOffset       minimum offset stored in this result
 * @param maxOffset       maximum offset stored in this result
 * @param msgFoundList    message list stored by reference (no copy is made)
 */
public PullResult(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
        List<MessageExt> msgFoundList) {
    // The implicit super() call is equivalent to the explicit one.
    this.msgFoundList = msgFoundList;
    this.maxOffset = maxOffset;
    this.minOffset = minOffset;
    this.nextBeginOffset = nextBeginOffset;
    this.pullStatus = pullStatus;
}
示例11
/**
 * Builds a {@code MessageView} from a {@code MessageExt}.
 *
 * <p>Bean properties are transferred with {@code BeanUtils.copyProperties(messageExt, view)};
 * when the message has a body, it is additionally exposed as a UTF-8 string.
 *
 * @param messageExt the message to convert
 * @return the populated view
 */
public static MessageView fromMessageExt(MessageExt messageExt) {
    final MessageView view = new MessageView();
    BeanUtils.copyProperties(messageExt, view);

    final byte[] rawBody = messageExt.getBody();
    if (rawBody != null) {
        view.setMessageBody(new String(rawBody, Charsets.UTF_8));
    }
    return view;
}
示例12
/**
 * Submits a new {@code ConsumeRequest} for the given queue pair to the consume executor, but
 * only when {@code dispathToConsume} is {@code true}.
 *
 * @param msgs             messages that were pulled (not used by this implementation)
 * @param processQueue     process queue passed to the consume request
 * @param messageQueue     message queue passed to the consume request
 * @param dispathToConsume whether a consume request should be submitted at all
 */
@Override
public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
    if (!dispathToConsume) {
        return;
    }
    this.consumeExecutor.submit(new ConsumeRequest(processQueue, messageQueue));
}
示例13
/**
 * Stores the given messages in the cache entry for {@code mq}, keyed by queue offset. A cache
 * entry is created and registered first when none exists for the queue.
 *
 * @param mq   the queue the messages belong to
 * @param msgs the messages to cache
 */
public void putMessages(final MessageQueue mq, final List<MessageExt> msgs) {
    CachedQueue target = this.mqCachedTable.get(mq);
    if (target == null) {
        target = new CachedQueue();
        this.mqCachedTable.put(mq, target);
    }

    for (MessageExt each : msgs) {
        final long offsetKey = each.getQueueOffset();
        target.getMsgCachedTable().put(offsetKey, each);
    }
}
示例14
/**
 * Registers an orderly listener on the underlying consumer. Each delivered batch is forwarded
 * to {@code handleRocketMqMessage} together with {@code messageListner}, and {@code SUCCESS}
 * is always reported.
 *
 * @param messageListner the application listener that receives the forwarded messages
 */
@Override
public void registerOrderlyListener(final MQMessageListener messageListner) {
    final MessageListenerOrderly forwarding = new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            handleRocketMqMessage(msgs, messageListner);
            return ConsumeOrderlyStatus.SUCCESS;
        }
    };
    consumer.registerMessageListener(forwarding);
}
示例15
/**
 * Stub consumer callback: ignores all of its arguments and returns a fixed string.
 *
 * @param strBody message body as text (ignored)
 * @param msg     the received message (ignored)
 * @param context consume context (ignored)
 * @return the fixed reply string
 */
@Override
public String MessageConsumer(String strBody, MessageExt msg,
        ConsumeConcurrentlyContext context) {
    final String fixedReply = "我了个去";
    return fixedReply;
}
示例16
/**
 * Starts the admin client, queries messages of {@code topic} by {@code key} and prints one
 * table row (message id, queue id, queue offset) per hit.
 *
 * @param admin admin client; it is started here
 * @param topic topic to search
 * @param key   message key to search for
 */
void queryByKey(final DefaultMQAdminExt admin, final String topic, final String key)
        throws MQClientException, InterruptedException {
    final String headerFormat = "%-50s %4s %40s%n";
    final String rowFormat = "%-50s %4d %40d%n";

    admin.start();
    final QueryResult found = admin.queryMessage(topic, key, 64, 0, Long.MAX_VALUE);

    System.out.printf(headerFormat, "#Message ID", "#QID", "#Offset");
    for (MessageExt hit : found.getMessageList()) {
        System.out.printf(rowFormat, hit.getMsgId(), hit.getQueueId(), hit.getQueueOffset());
    }
}
示例17
/**
 * Starts an orderly push consumer on {@code TopicTest} (tags {@code TagA || TagC || TagD})
 * whose listener cycles through the different {@code ConsumeOrderlyStatus} values based on an
 * invocation counter.
 */
public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TopicTest", "TagA || TagC || TagD");
    consumer.registerMessageListener(new MessageListenerOrderly() {
        private final AtomicLong consumeTimes = new AtomicLong(0);

        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            context.setAutoCommit(false);
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

            // Use the value returned by the increment for every check below. Re-reading the
            // counter with get() could observe increments made by other invocations in between.
            final long times = this.consumeTimes.incrementAndGet();
            if ((times % 2) == 0) {
                return ConsumeOrderlyStatus.SUCCESS;
            } else if ((times % 3) == 0) {
                return ConsumeOrderlyStatus.ROLLBACK;
            } else if ((times % 4) == 0) {
                // NOTE(review): every multiple of 4 is even, so the "% 2" check above always
                // wins and this branch is never taken. Kept as-is; confirm the intended order.
                return ConsumeOrderlyStatus.COMMIT;
            } else if ((times % 5) == 0) {
                context.setSuspendCurrentQueueTimeMillis(3000);
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();
    System.out.println("Consumer Started.");
}
示例18
/**
 * Builds a {@code MessageExtBrokerInner} from {@code msgExt}: body, flag, properties, sys flag,
 * born/store host, born timestamp, reconsume times, topic and queue id are carried over, the
 * delay-time-level property is cleared and wait-store-OK is switched off.
 *
 * <p>The statement order is kept exactly as written because later steps read state set by
 * earlier ones.
 *
 * @param msgExt the message to copy from
 * @return the newly built inner message
 */
private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
    final MessageExtBrokerInner inner = new MessageExtBrokerInner();
    inner.setBody(msgExt.getBody());
    inner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(inner, msgExt.getProperties());

    // NOTE(review): the sys flag is read from the new instance before it is copied from msgExt
    // further down — confirm that this is intended.
    final boolean multiTags =
            (inner.getSysFlag() & MessageSysFlag.MultiTagsFlag) == MessageSysFlag.MultiTagsFlag;
    final TopicFilterType filterType;
    if (multiTags) {
        filterType = TopicFilterType.MULTI_TAG;
    } else {
        filterType = TopicFilterType.SINGLE_TAG;
    }
    inner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(filterType, inner.getTags()));

    // The properties string is produced before the delay-level property is cleared below.
    inner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    inner.setSysFlag(msgExt.getSysFlag());
    inner.setBornTimestamp(msgExt.getBornTimestamp());
    inner.setBornHost(msgExt.getBornHost());
    inner.setStoreHost(msgExt.getStoreHost());
    inner.setReconsumeTimes(msgExt.getReconsumeTimes());
    inner.setWaitStoreMsgOK(false);
    MessageAccessor.clearProperty(inner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

    inner.setTopic(msgExt.getTopic());
    inner.setQueueId(msgExt.getQueueId());
    return inner;
}
示例19
/**
 * Starts a push consumer in broadcasting mode on {@code TopicTest} (tags
 * {@code TagA || TagC || TagD}) whose listener prints every delivered batch and acknowledges it.
 */
public static void main(String[] args) throws InterruptedException, MQClientException {
    final MessageListenerConcurrently printingListener = new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            final String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " Receive New Messages: " + msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    };

    final DefaultMQPushConsumer broadcastConsumer =
            new DefaultMQPushConsumer("please_rename_unique_group_name_1");
    // Chooses whether the consumer's very first start begins at the head or the tail of the
    // queue. On any later start, consumption resumes from the last consumed position.
    broadcastConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    broadcastConsumer.setMessageModel(MessageModel.BROADCASTING);
    broadcastConsumer.subscribe("TopicTest", "TagA || TagC || TagD");
    broadcastConsumer.registerMessageListener(printingListener);
    broadcastConsumer.start();

    System.out.println("Broadcast Consumer Started.");
}
示例20
/**
 * Logs the current thread id together with the message body and acknowledges the message.
 *
 * @param strBody message body as text
 * @param msg     the received message (not inspected here)
 * @param context consume context (not used here)
 * @return always {@code CONSUME_SUCCESS}
 */
@Override
public ConsumeConcurrentlyStatus consumeMessage(String strBody, MessageExt msg,
        ConsumeConcurrentlyContext context) {
    final long threadId = Thread.currentThread().getId();
    LOGGER.info("\n 当前线程是" + threadId + " \n 数据是" + strBody);
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
示例21
/**
 * Reads {@code size} bytes at {@code commitLogOffset} from the commit log and decodes them as
 * a message.
 *
 * @param commitLogOffset offset passed to {@code commitLog.getMessage}
 * @param size            size passed to {@code commitLog.getMessage}
 * @return the decoded message, or {@code null} when the commit log returned no buffer
 */
public MessageExt lookMessageByOffset(long commitLogOffset, int size) {
    final SelectMapedBufferResult selected = this.commitLog.getMessage(commitLogOffset, size);
    if (selected == null) {
        return null;
    }

    try {
        return MessageDecoder.decode(selected.getByteBuffer(), true, false);
    } finally {
        // Always give the selected buffer back, even when decoding throws.
        selected.release();
    }
}
示例22
/**
 * Looks up the message by id and asks the broker at the message's store host to have the given
 * client consume it directly. The call uses three times the configured {@code timeoutMillis}.
 *
 * @param consumerGroup consumer group forwarded to the broker
 * @param clientId      client id forwarded to the broker
 * @param msgId         id of the message to look up and consume
 * @return the result returned by the client API
 */
@Override
public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String msgId)
        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
    final MessageExt located = this.viewMessage(msgId);
    return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(
            RemotingUtil.socketAddress2String(located.getStoreHost()),
            consumerGroup,
            clientId,
            msgId,
            timeoutMillis * 3);
}
示例23
/**
 * Tells whether {@code msg} has already been consumed by consumer group {@code group}.
 *
 * <p>The group's consume stats are scanned for the queue with the message's topic and queue id
 * whose broker master address equals the message's store host. The message counts as consumed
 * when that queue's consumer offset is greater than the message's queue offset.
 *
 * @param msg   the message to check
 * @param group the consumer group to check against
 * @return {@code true} if a matching queue has advanced past the message, otherwise {@code false}
 */
public boolean consumed(final MessageExt msg, final String group) throws RemotingException, MQClientException, InterruptedException,
        MQBrokerException {
    ConsumeStats cstats = this.examineConsumeStats(group);
    ClusterInfo ci = this.examineBrokerClusterInfo();

    for (Entry<MessageQueue, OffsetWrapper> next : cstats.getOffsetTable().entrySet()) {
        MessageQueue mq = next.getKey();
        if (!mq.getTopic().equals(msg.getTopic()) || mq.getQueueId() != msg.getQueueId()) {
            continue;
        }

        BrokerData brokerData = ci.getBrokerAddrTable().get(mq.getBrokerName());
        if (brokerData == null) {
            continue;
        }

        // The lookup yields null when no master address is registered for the broker;
        // skip that broker instead of failing with a NullPointerException.
        String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
        if (addr != null && addr.equals(RemotingUtil.socketAddress2String(msg.getStoreHost()))) {
            // Compare offsets to decide whether the message has been consumed.
            if (next.getValue().getConsumerOffset() > msg.getQueueOffset()) {
                return true;
            }
        }
    }

    // Not consumed yet.
    return false;
}
示例24
/**
 * Looks up a message by id.
 *
 * <p>The id is first decoded with {@code MessageDecoder.decodeMessageId} and looked up through
 * {@code viewMessage(msgId)}. If either step fails, the lookup falls back to
 * {@code queryMessageByUniqKey(topic, msgId)}.
 *
 * @param topic topic used for the fallback lookup
 * @param msgId the message id
 * @return the message found by either lookup
 * @throws InterruptedException if the first lookup was interrupted, or as thrown by the
 *                              fallback lookup
 */
@Override
public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    try {
        MessageDecoder.decodeMessageId(msgId);
        return this.viewMessage(msgId);
    } catch (Exception e) {
        // An interrupt must not be swallowed: propagate it instead of starting another lookup.
        if (e instanceof InterruptedException) {
            throw (InterruptedException) e;
        }
        // Any other failure is deliberately ignored; the fallback lookup below is tried next.
    }
    return this.defaultMQPushConsumerImpl.queryMessageByUniqKey(topic, msgId);
}
示例25
/**
 * Sends a message that the consumer could not process back to the server.
 *
 * <p>When a project group prefix is configured, both the consumer group and the topic of
 * {@code msg} are rewritten with that prefix; note that this modifies {@code msg} itself.
 *
 * @param addr          address the request is sent to
 * @param msg           the message to send back
 * @param consumerGroup consumer group placed in the request header
 * @param delayLevel    delay level placed in the request header
 * @param timeoutMillis timeout of the synchronous invocation
 * @throws RemotingException    as thrown by the remoting client
 * @throws MQBrokerException    if the response code is not {@code SUCCESS}
 * @throws InterruptedException as thrown by the remoting client
 */
public void consumerSendMessageBack(final String addr, final MessageExt msg, final String consumerGroup,
        final int delayLevel, final long timeoutMillis)
        throws RemotingException, MQBrokerException, InterruptedException {
    String effectiveGroup = consumerGroup;
    if (!UtilAll.isBlank(projectGroupPrefix)) {
        effectiveGroup = VirtualEnvUtil.buildWithProjectGroup(consumerGroup, projectGroupPrefix);
        msg.setTopic(VirtualEnvUtil.buildWithProjectGroup(msg.getTopic(), projectGroupPrefix));
    }

    final ConsumerSendMsgBackRequestHeader header = new ConsumerSendMsgBackRequestHeader();
    final RemotingCommand outgoing =
            RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, header);
    header.setGroup(effectiveGroup);
    header.setOriginTopic(msg.getTopic());
    header.setOffset(msg.getCommitLogOffset());
    header.setDelayLevel(delayLevel);
    header.setOriginMsgId(msg.getMsgId());

    final RemotingCommand reply = this.remotingClient.invokeSync(addr, outgoing, timeoutMillis);
    assert reply != null;
    if (reply.getCode() == ResponseCode.SUCCESS) {
        return;
    }
    // Every other response code is reported to the caller.
    throw new MQBrokerException(reply.getCode(), reply.getRemark());
}
示例26
/**
 * Starts the admin client, queries messages of {@code topic} by {@code key} and prints one
 * table row (message id, queue id, queue offset) per hit.
 *
 * @param admin admin client; it is started here
 * @param topic topic to search
 * @param key   message key to search for
 */
void queryByKey(final DefaultMQAdminExt admin, final String topic, final String key)
        throws MQClientException, InterruptedException {
    admin.start();
    QueryResult queryResult = admin.queryMessage(topic, key, 64, 0, Long.MAX_VALUE);
    // %n emits the platform line separator; a literal \n in a format string does not.
    System.out.printf("%-50s %4s %40s%n",
            "#Message ID",
            "#QID",
            "#Offset");
    for (MessageExt msg : queryResult.getMessageList()) {
        System.out.printf("%-50s %4d %40d%n", msg.getMsgId(), msg.getQueueId(), msg.getQueueOffset());
    }
}
示例27
/**
 * Builds a {@code MessageExtBrokerInner} from {@code msgExt}: body, flag, properties, sys flag,
 * born/store host, born timestamp, reconsume times, topic and queue id are carried over, the
 * delay-time-level property is cleared and wait-store-OK is switched off.
 *
 * <p>The statement order is kept exactly as written because later steps read state set by
 * earlier ones.
 *
 * @param msgExt the message to copy from
 * @return the newly built inner message
 */
private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
    final MessageExtBrokerInner inner = new MessageExtBrokerInner();
    inner.setBody(msgExt.getBody());
    inner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(inner, msgExt.getProperties());

    // NOTE(review): the sys flag is read from the new instance before it is copied from msgExt
    // further down — confirm that this is intended.
    final boolean multiTags =
            (inner.getSysFlag() & MessageSysFlag.MultiTagsFlag) == MessageSysFlag.MultiTagsFlag;
    final TopicFilterType filterType;
    if (multiTags) {
        filterType = TopicFilterType.MULTI_TAG;
    } else {
        filterType = TopicFilterType.SINGLE_TAG;
    }
    inner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(filterType, inner.getTags()));

    // The properties string is produced before the delay-level property is cleared below.
    inner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    inner.setSysFlag(msgExt.getSysFlag());
    inner.setBornTimestamp(msgExt.getBornTimestamp());
    inner.setBornHost(msgExt.getBornHost());
    inner.setStoreHost(msgExt.getStoreHost());
    inner.setReconsumeTimes(msgExt.getReconsumeTimes());
    inner.setWaitStoreMsgOK(false);
    MessageAccessor.clearProperty(inner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

    inner.setTopic(msgExt.getTopic());
    inner.setQueueId(msgExt.getQueueId());
    return inner;
}
示例28
/**
 * Builds a {@code MessageExtBrokerInner} from {@code msgExt}. Body, flag, properties, sys flag,
 * born/store host, born timestamp and reconsume times are carried over; the delay-time-level
 * property is cleared; topic and queue id are taken from the {@code PROPERTY_REAL_TOPIC} and
 * {@code PROPERTY_REAL_QUEUE_ID} properties.
 *
 * <p>The statement order is kept exactly as written because later steps read state set by
 * earlier ones.
 *
 * @param msgExt the message to copy from
 * @return the newly built inner message
 * @throws NumberFormatException if the real-queue-id property is not a parsable integer
 */
private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
    final MessageExtBrokerInner inner = new MessageExtBrokerInner();
    inner.setBody(msgExt.getBody());
    inner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(inner, msgExt.getProperties());

    // NOTE(review): the sys flag is read from the new instance before it is copied from msgExt
    // further down — confirm that this is intended.
    final TopicFilterType filterType = MessageExt.parseTopicFilterType(inner.getSysFlag());
    inner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(filterType, inner.getTags()));

    // The properties string is produced before the delay-level property is cleared below.
    inner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    inner.setSysFlag(msgExt.getSysFlag());
    inner.setBornTimestamp(msgExt.getBornTimestamp());
    inner.setBornHost(msgExt.getBornHost());
    inner.setStoreHost(msgExt.getStoreHost());
    inner.setReconsumeTimes(msgExt.getReconsumeTimes());
    inner.setWaitStoreMsgOK(false);
    MessageAccessor.clearProperty(inner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

    // Route the message to the topic and queue recorded in its properties.
    inner.setTopic(inner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
    final String realQueueId = inner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
    inner.setQueueId(Integer.parseInt(realQueueId));
    return inner;
}
示例29
/**
 * Computes how far the message lags behind the newest offset: the value of the
 * {@code PROPERTY_MAX_OFFSET} property minus the message's own queue offset.
 *
 * @param msg the message being consumed
 * @return the difference, or {@code 0} when it cannot be computed (any exception is absorbed)
 */
private long getMessageDiff(MessageExt msg) {
    try {
        // Current maximum offset, carried as a message property.
        final String maxOffsetText = msg.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
        // Number of messages still pending at the time this message is consumed.
        return Long.parseLong(maxOffsetText) - msg.getQueueOffset();
    } catch (Exception e) {
        return 0;
    }
}
示例30
/**
 * Starts the admin client, queries messages of {@code topic} by {@code key} and prints one
 * table row (message id, queue id, queue offset) per hit.
 *
 * @param admin admin client; it is started here
 * @param topic topic to search
 * @param key   message key to search for
 */
void queryByKey(final DefaultMQAdminExt admin, final String topic, final String key)
        throws MQClientException, InterruptedException {
    admin.start();
    QueryResult queryResult = admin.queryMessage(topic, key, 64, 0, Long.MAX_VALUE);
    // %n emits the platform line separator; a literal \n in a format string does not.
    System.out.printf("%-50s %4s %40s%n",
            "#Message ID",
            "#QID",
            "#Offset");
    for (MessageExt msg : queryResult.getMessageList()) {
        System.out.printf("%-50s %4d %40d%n", msg.getMsgId(), msg.getQueueId(), msg.getQueueOffset());
    }
}