Java源码示例:org.apache.rocketmq.client.impl.MQClientManager
示例1
@Override
public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.defaultMQAdminExt.changeInstanceNameToPID();
this.mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);
boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup()
+ "] has created already, specifed another name please."
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}
mqClientInstance.start();
log.info("the adminExt [{}] start OK", this.defaultMQAdminExt.getAdminExtGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The AdminExt service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}
}
示例2
@Test
public void testNewGroup() throws Exception {
group = "cg_test_new_rmq_group";
topic = "test-2";
TopicStatsTable ts = mqAdmin.examineTopicStats(topic);
for (Map.Entry<MessageQueue, TopicOffset> messageQueueTopicOffsetEntry : ts.getOffsetTable().entrySet()) {
System.out.println(messageQueueTopicOffsetEntry);
}
TopicRouteData routeInfo = mqAdmin.examineTopicRouteInfo(topic);
System.out.println();
MQClientInstance instance = MQClientManager.getInstance().getAndCreateMQClientInstance(mqAdmin);
for (BrokerData brokerData : routeInfo.getBrokerDatas()) {
ConsumeStats r = instance.getMQClientAPIImpl().getConsumeStats(brokerData.selectBrokerAddr(), group, topic, 3000);
System.out.println(brokerData + "consumeStats:" + r.getOffsetTable());
}
// List<RollbackStats> r = mqAdmin.resetOffsetByTimestampOld(group, topic, -1, true);
// for (RollbackStats rollbackStats : r) {
// System.out.println(rollbackStats);
// }
// ConsumeStats cs = mqAdmin.examineConsumeStats(group, topic);
// for (Map.Entry<MessageQueue, OffsetWrapper> messageQueueOffsetWrapperEntry : cs.getOffsetTable().entrySet()) {
// System.out.println(messageQueueOffsetWrapperEntry);
// }
}
示例3
@Override
public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST://服务只启动,不创建
this.serviceState = ServiceState.START_FAILED;
this.defaultMQAdminExt.changeInstanceNameToPID();
// 创建mqclient对象 =》
this.mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);
// 注册管理服务处理器=》
boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup()
+ "] has created already, specifed another name please."
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}
// 启动mqclient =》
mqClientInstance.start();
log.info("the adminExt [{}] start OK", this.defaultMQAdminExt.getAdminExtGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The AdminExt service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}
}
示例4
@SuppressWarnings("unchecked")
private DeFiBusClientManager() {
try {
//factoryTable, shared by all producer and consumer
//same clientId will return the same MQClientInstance
//In order to set our own deFiclient instance to all producer and consumer, need to get the pointer of this table
factoryTable = (ConcurrentHashMap<String/* clientId */, DeFiBusClientInstance>) ReflectUtil.getSimpleProperty(MQClientManager.class,
MQClientManager.getInstance(), "factoryTable");
} catch (Exception e) {
LOGGER.warn("failed to initialize factory in mqclient manager.", e);
}
}
示例5
@Override
public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.defaultMQAdminExt.changeInstanceNameToPID();
this.mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);
boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup()
+ "] has created already, specifed another name please."
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}
mqClientInstance.start();
log.info("the adminExt [{}] start OK", this.defaultMQAdminExt.getAdminExtGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The AdminExt service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}
}
示例6
@Override
public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.defaultMQAdminExt.changeInstanceNameToPID();
this.mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);
boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup()
+ "] has created already, specifed another name please."//
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}
mqClientInstance.start();
log.info("the adminExt [{}] start OK", this.defaultMQAdminExt.getAdminExtGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The AdminExt service state not OK, maybe started once, "//
+ this.serviceState//
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}
}
示例7
@Override
public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.defaultMQAdminExt.changeInstanceNameToPID();
this.mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);
boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup()
+ "] has created already, specifed another name please."
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}
mqClientInstance.start();
log.info("the adminExt [{}] start OK", this.defaultMQAdminExt.getAdminExtGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The AdminExt service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}
}
示例8
@Test
public void testNewGroup() throws Exception {
group = "cg_test_new_rmq_group";
topic = "test-2";
TopicStatsTable ts = mqAdmin.examineTopicStats(topic);
for (Map.Entry<MessageQueue, TopicOffset> messageQueueTopicOffsetEntry : ts.getOffsetTable().entrySet()) {
System.out.println(messageQueueTopicOffsetEntry);
}
TopicRouteData routeInfo = mqAdmin.examineTopicRouteInfo(topic);
System.out.println();
MQClientInstance instance = MQClientManager.getInstance().getAndCreateMQClientInstance(mqAdmin);
for (BrokerData brokerData : routeInfo.getBrokerDatas()) {
ConsumeStats r = instance.getMQClientAPIImpl().getConsumeStats(brokerData.selectBrokerAddr(), group, topic, 3000);
System.out.println(brokerData + "consumeStats:" + r.getOffsetTable());
}
// List<RollbackStats> r = mqAdmin.resetOffsetByTimestampOld(group, topic, -1, true);
// for (RollbackStats rollbackStats : r) {
// System.out.println(rollbackStats);
// }
// ConsumeStats cs = mqAdmin.examineConsumeStats(group, topic);
// for (Map.Entry<MessageQueue, OffsetWrapper> messageQueueOffsetWrapperEntry : cs.getOffsetTable().entrySet()) {
// System.out.println(messageQueueOffsetWrapperEntry);
// }
}
示例9
public void shutdown() {
// Consumer
if (!this.consumerTable.isEmpty())
return;
// AdminExt
if (!this.adminExtTable.isEmpty())
return;
// Producer
if (this.producerTable.size() > 1)
return;
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
break;
case RUNNING:
this.defaultMQProducer.getDefaultMQProducerImpl().shutdown(false);
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
this.pullMessageService.shutdown(true);
this.scheduledExecutorService.shutdown();
this.mQClientAPIImpl.shutdown();
this.rebalanceService.shutdown();
if (this.datagramSocket != null) {
this.datagramSocket.close();
this.datagramSocket = null;
}
MQClientManager.getInstance().removeClientFactory(this.clientId);
log.info("the client factory [{}] shutdown OK", this.clientId);
break;
case SHUTDOWN_ALREADY:
break;
default:
break;
}
}
}
示例10
@Override
public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.defaultMQAdminExt.changeInstanceNameToPID();
this.mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);
boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup()
+ "] has created already, specifed another name please."//
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}
mqClientInstance.start();
log.info("the adminExt [{}] start OK", this.defaultMQAdminExt.getAdminExtGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The AdminExt service state not OK, maybe started once, "//
+ this.serviceState//
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}
}
示例11
@Override
public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.defaultMQAdminExt.changeInstanceNameToPID();
this.mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);
boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup()
+ "] has created already, specifed another name please."//
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}
mqClientInstance.start();
log.info("the adminExt [{}] start OK", this.defaultMQAdminExt.getAdminExtGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The AdminExt service state not OK, maybe started once, "//
+ this.serviceState//
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}
}
示例12
private void initMQClientFactory() throws MQClientException {
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The consumer group[" + this.defaultLitePullConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
}
示例13
public void shutdown() {
// Consumer
if (!this.consumerTable.isEmpty())
return;
// AdminExt
if (!this.adminExtTable.isEmpty())
return;
// Producer
if (this.producerTable.size() > 1)
return;
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
break;
case RUNNING:
this.defaultMQProducer.getDefaultMQProducerImpl().shutdown(false);
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
this.pullMessageService.shutdown(true);
this.scheduledExecutorService.shutdown();
this.mQClientAPIImpl.shutdown();
this.rebalanceService.shutdown();
MQClientManager.getInstance().removeClientFactory(this.clientId);
log.info("the client factory [{}] shutdown OK", this.clientId);
break;
case SHUTDOWN_ALREADY:
break;
default:
break;
}
}
}
示例14
@Override
public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.defaultMQAdminExt.changeInstanceNameToPID();
this.mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);
boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup()
+ "] has created already, specifed another name please."
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}
mqClientInstance.start();
log.info("the adminExt [{}] start OK", this.defaultMQAdminExt.getAdminExtGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The AdminExt service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}
}
示例15
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
示例16
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
//cproxy custom cid,do not use the default policy.
this.defaultMQPushConsumer.setClientId(this.defaultMQPushConsumer.getInstanceName());
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
if (!MixAll.MQTT_MODE) {
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
}
示例17
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPullConsumer.changeInstanceNameToPID();
}
//cproxy custom cid,do not use the default policy.
this.defaultMQPullConsumer.setClientId(this.defaultMQPullConsumer.getInstanceName());
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPullConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
} else {
switch (this.defaultMQPullConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
}
示例18
public void shutdown() {
// Consumer
if (!this.consumerTable.isEmpty())
return;
// AdminExt
if (!this.adminExtTable.isEmpty())
return;
// Producer
if (this.producerTable.size() > 1)
return;
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
break;
case RUNNING:
this.defaultMQProducer.getDefaultMQProducerImpl().shutdown(false);
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
if (!shareThread) {
this.pullMessageService.shutdown(true);
}
if (shareThread) {
if (scheduleTaskFutures != null) {
for (Future future : scheduleTaskFutures) {
future.cancel(true);
}
}
} else {
this.scheduledExecutorService.shutdown();
}
this.mQClientAPIImpl.shutdown();
if (shareThread) {
sharedRebalanceService.remove(this);
} else {
this.rebalanceService.shutdown();
}
if (this.datagramSocket != null) {
this.datagramSocket.close();
this.datagramSocket = null;
}
MQClientManager.getInstance().removeClientFactory(this.clientId);
log.info("the client factory [{}] shutdown OK", this.clientId);
break;
case SHUTDOWN_ALREADY:
break;
default:
break;
}
}
}
示例19
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 检查配置=》
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 创建mqclient对象=》
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 注册生产者=》
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) { //
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 存储topic发布信息
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
// 启动mqclient=》
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// 同步发送心跳检测请求向所有的broker=》
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
示例20
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
// 检查配置=》
this.checkConfig();
// copy订阅配置=》
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// 创建消费者
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
// 注册过滤消息的钩子方法
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// 广播 本地存储
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
// 集群 远程存储
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// 加载offset=》
this.offsetStore.load();
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
// 消息消息服务启动=》
this.consumeMessageService.start();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
示例21
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 检查配置=》
this.checkConfig();
// copy topic订阅配置=》
this.copySubscription();
if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPullConsumer.changeInstanceNameToPID();
}
// 创建mqclient对象=》
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
// 注册过滤消息钩子方法
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPullConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
} else {
switch (this.defaultMQPullConsumer.getMessageModel()) {
// 广播 本地存储
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
break;
// 集群 远程存储
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);
}
// offset加载=》
this.offsetStore.load();
// 注册消费者=》
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 服务启动=》
mQClientFactory.start();
log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
}
示例22
public void shutdown() {
// Consumer
if (!this.consumerTable.isEmpty())
return;
// AdminExt
if (!this.adminExtTable.isEmpty())
return;
// Producer
if (this.producerTable.size() > 1)
return;
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
break;
case RUNNING:
// 生产者关闭=》
this.defaultMQProducer.getDefaultMQProducerImpl().shutdown(false);
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
// 拉取消息服务关闭=》
this.pullMessageService.shutdown(true);
this.scheduledExecutorService.shutdown();
// mqclient服务关闭
this.mQClientAPIImpl.shutdown();
this.rebalanceService.shutdown();
if (this.datagramSocket != null) {
this.datagramSocket.close();
this.datagramSocket = null;
}
// 删除缓存的mqclient=》
MQClientManager.getInstance().removeClientFactory(this.clientId);
log.info("the client factory [{}] shutdown OK", this.clientId);
break;
case SHUTDOWN_ALREADY:
break;
default:
break;
}
}
}
示例23
/**
* 启动生产者
* @param startFactory 是否启动mQClientFactory
* @throws MQClientException ;
*/
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
//更新实例Name变成pid
this.defaultMQProducer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
//在mQClientFactory中注册producer
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//将创建topic的key放到topicPublishInfoTable,其实就是AUTO_CREATE_TOPIC_KEY_TOPIC
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
//启动
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
//设置为running状态
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
示例24
/**
* 启动DefaultMQPushConsumer
* @throws MQClientException ;
*/
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
//如果是集群消费,changeInstanceNameToPid
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
//获取mqClientFactory
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
//设置rebalanceImpl相关的参数
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
//PullAPIWrapper
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
//offsetStore
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
//广播消息,LocalFileOffsetStore
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
//集群消费
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
//如果是顺序消费,就标记顺序消费为true
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
//顺序消费
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
//并发消费
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
//消费Message的Service启动
this.consumeMessageService.start();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
//更新topicSubscribeInfo,当消息有变化
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
//在服务端检查本consumer的消息过滤相关配置是否正确
this.mQClientFactory.checkClientInBroker();
//心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
//立即执行rebalance
this.mQClientFactory.rebalanceImmediately();
}
示例25
/**
* 启动PullCOnsumer
* @throws MQClientException ;
*/
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
//如果是集群消费模式,就将changeInstanceNameToPID
if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPullConsumer.changeInstanceNameToPID();
}
//构造mqclientInstance
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);
//构造rebalanceImpl的相关东西。因为构造函数里是以null的形式构造的
this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
//构造pullAPIWrapper
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
//pullAPIWapper注册FilterMessageHook
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPullConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
} else {
switch (this.defaultMQPullConsumer.getMessageModel()) {
//如果是广播消费,就是LocalFileOffsetStore
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
break;
//如果是集群消息,偏移量是在broker端维持的
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);
}
//调用offsetstore.load
this.offsetStore.load();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
}
示例26
public void shutdown() {
// Consumer
if (!this.consumerTable.isEmpty()) {
return;
}
// AdminExt
if (!this.adminExtTable.isEmpty()) {
return;
}
// Producer
if (this.producerTable.size() > 1) {
return;
}
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
break;
case RUNNING:
this.defaultMQProducer.getDefaultMQProducerImpl().shutdown(false);
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
this.pullMessageService.shutdown(true);
this.scheduledExecutorService.shutdown();
this.mQClientAPIImpl.shutdown();
this.rebalanceService.shutdown();
if (this.datagramSocket != null) {
this.datagramSocket.close();
this.datagramSocket = null;
}
MQClientManager.getInstance().removeClientFactory(this.clientId);
log.info("the client factory [{}] shutdown OK", this.clientId);
break;
case SHUTDOWN_ALREADY:
break;
default:
break;
}
}
}
示例27
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
// 标记初始化失败,这个技巧不错。
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 获取MQClient 对象
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 注册Producer
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//把 MixAll.DEFAULT_TOPIC 放入其中
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// 启动MQClient对象
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
// 标记初始化成功
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "//
+ this.serviceState//
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
示例28
public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
// 检查配置
this.checkConfig();
// Rebalance负载均衡 复制订阅数据
this.copySubscription();
// 设置instanceName,为一个字符串化的数字,比如10072
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// 获取MQClient对象,clientId为[email protected],比如[email protected]
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 设置负载均衡器
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
//默认这是消费模式为集群模式,每条消息被同一组的消费者中的一个消费
//还可以设置为广播模式,每条消息被同一个组的所有消费者都消费一次
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
//默认是AllocateMessageQueueAveragely,均分策略
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
// 拉取API封装
this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
//生成消费进度处理器,集群模式下消费进度保存在Broker上,因为同一组内的消费者要共享进度;广播模式下进度保存在消费者端
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
}
this.offsetStore.load(); //若是广播模式,加载本地的消费进度文件
// 根据监听是顺序模式还是并发模式来生成相应的ConsumerService
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly)this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently)this.getMessageListenerInner());
}
this.consumeMessageService.start();
// 设置MQClient对象
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
// 设置服务状态
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
+ this.serviceState//
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// 从Namesrv获取TopicRouteData,更新TopicPublishInfo和MessageQueue (在Consumer start时马上调用,之后每隔一段时间调用一次)
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
// 向TopicRouteData里的所有Broker发送心跳,注册Consumer/Producer信息到Broker上 (在Consumer start时马上调用,之后每隔一段时间调用一次)
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 唤醒MessageQueue均衡服务,负载均衡后马上开启第一次拉取消息
this.mQClientFactory.rebalanceImmediately();
}
示例29
public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPullConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(//
mQClientFactory, //
this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPullConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
} else {
switch (this.defaultMQPullConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
break;
default:
break;
}
}
this.offsetStore.load();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PullConsumer service state not OK, maybe started once, "//
+ this.serviceState//
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
}
示例30
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}