Java源码示例:org.apache.rocketmq.broker.BrokerController
示例1
/**
 * Creates a broker on 127.0.0.1 that registers with the given name server, starts it,
 * and records it in {@code BROKER_CONTROLLERS}.
 *
 * <p>The store lives under a directory obtained from {@code createBaseDir()}. The HA port is
 * {@code 8000 + random.nextInt(1000)} and the remoting listen port is
 * {@code 10000 + random.nextInt(1000)}; both are picked at random, so a clash with a port
 * that is already in use is possible.
 *
 * <p>If initialization or startup fails for any reason, the failure is logged with its cause
 * and the JVM exits with status 1.
 *
 * @param nsAddr name server address passed to the broker configuration
 * @return the started broker controller
 */
public static BrokerController createAndStartBroker(String nsAddr) {
    String baseDir = createBaseDir();
    BrokerConfig brokerConfig = new BrokerConfig();
    NettyServerConfig nettyServerConfig = new NettyServerConfig();
    NettyClientConfig nettyClientConfig = new NettyClientConfig();
    MessageStoreConfig storeConfig = new MessageStoreConfig();
    brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
    brokerConfig.setBrokerIP1("127.0.0.1");
    brokerConfig.setNamesrvAddr(nsAddr);
    storeConfig.setStorePathRootDir(baseDir);
    storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
    storeConfig.setHaListenPort(8000 + random.nextInt(1000));
    nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
    BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
    try {
        Assert.assertTrue(brokerController.initialize());
        logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
        brokerController.start();
    } catch (Throwable t) {
        // A failed assertTrue raises AssertionError, which is an Error and would slip past
        // catch (Exception); catch Throwable so every startup failure is logged before exiting.
        logger.error("Broker start failed, will exit", t);
        System.exit(1);
    }
    BROKER_CONTROLLERS.add(brokerController);
    return brokerController;
}
示例2
/**
 * Builds, initializes and starts a broker bound to 127.0.0.1 against the given name server,
 * then adds it to {@code BROKER_CONTROLLERS}.
 *
 * <p>Store files are placed under the directory returned by {@code createBaseDir()}. The HA
 * port ({@code 8000 + random.nextInt(1000)}) and the remoting listen port
 * ({@code 10000 + random.nextInt(1000)}) are chosen at random and are not checked for
 * availability.
 *
 * <p>Any failure during initialization or startup is logged together with its cause and
 * terminates the JVM with exit status 1.
 *
 * @param nsAddr name server address passed to the broker configuration
 * @return the started broker controller
 */
public static BrokerController createAndStartBroker(String nsAddr) {
    String baseDir = createBaseDir();
    BrokerConfig brokerConfig = new BrokerConfig();
    NettyServerConfig nettyServerConfig = new NettyServerConfig();
    NettyClientConfig nettyClientConfig = new NettyClientConfig();
    MessageStoreConfig storeConfig = new MessageStoreConfig();
    brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
    brokerConfig.setBrokerIP1("127.0.0.1");
    brokerConfig.setNamesrvAddr(nsAddr);
    storeConfig.setStorePathRootDir(baseDir);
    storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
    storeConfig.setHaListenPort(8000 + random.nextInt(1000));
    nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
    BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
    try {
        Assert.assertTrue(brokerController.initialize());
        logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
        brokerController.start();
    } catch (Throwable t) {
        // assertTrue fails with AssertionError (an Error, not an Exception), so a narrower
        // catch would let a failed initialize() escape without logging or exiting.
        logger.error("Broker start failed, will exit", t);
        System.exit(1);
    }
    BROKER_CONTROLLERS.add(brokerController);
    return brokerController;
}
示例3
/**
 * Starts a RocketMQ broker and stores it in the {@code brokerController} field.
 *
 * <p>The remoting version system property is set to the current MQ version, the broker is
 * configured with the {@code nameServer} address and the master broker id, and it listens on
 * port 10911. If initialization reports failure, the controller is shut down and an exception
 * is thrown instead of starting it.
 *
 * @throws Exception if the broker fails to initialize, or if starting it fails
 */
private static void startBroker() throws Exception {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    BrokerConfig brokerConfig = new BrokerConfig();
    brokerConfig.setNamesrvAddr(nameServer);
    brokerConfig.setBrokerId(MixAll.MASTER_ID);
    NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(10911);
    NettyClientConfig nettyClientConfig = new NettyClientConfig();
    MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
    brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig);
    boolean initResult = brokerController.initialize();
    if (!initResult) {
        brokerController.shutdown();
        // Give the caller something to diagnose instead of a message-less exception.
        throw new Exception("Broker initialization failed; listenPort="
            + nettyServerConfig.getListenPort() + ", namesrvAddr=" + nameServer);
    }
    brokerController.start();
}
示例4
/**
 * Prepares broker and store configuration for a broker on 127.0.0.1 that registers with the
 * given name server, then delegates to
 * {@code createAndStartBroker(MessageStoreConfig, BrokerConfig)} to start it.
 *
 * <p>Property filtering is enabled, and the store is rooted in a directory obtained from
 * {@code createBaseDir()} with the commit log size and index limits taken from
 * {@code COMMIT_LOG_SIZE} and {@code INDEX_NUM}.
 *
 * @param nsAddr name server address passed to the broker configuration
 * @return the controller returned by the delegate
 */
public static BrokerController createAndStartBroker(String nsAddr) {
    final String rootDir = createBaseDir();

    // Broker identity and name server registration.
    final BrokerConfig brokerCfg = new BrokerConfig();
    brokerCfg.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
    brokerCfg.setBrokerIP1("127.0.0.1");
    brokerCfg.setNamesrvAddr(nsAddr);
    brokerCfg.setEnablePropertyFilter(true);

    // Store layout and sizing.
    final MessageStoreConfig storeCfg = new MessageStoreConfig();
    storeCfg.setStorePathRootDir(rootDir);
    storeCfg.setStorePathCommitLog(rootDir + SEP + "commitlog");
    storeCfg.setMappedFileSizeCommitLog(COMMIT_LOG_SIZE);
    storeCfg.setMaxIndexNum(INDEX_NUM);
    storeCfg.setMaxHashSlotNum(INDEX_NUM * 4);

    return createAndStartBroker(storeCfg, brokerCfg);
}
示例5
/**
 * Starts a broker from the supplied store and broker configuration and records it in
 * {@code BROKER_CONTROLLERS}.
 *
 * <p>Two ports are taken from {@code nextPort()}: the first becomes the remoting listen port,
 * the second the HA listen port (written into {@code storeConfig}). Any failure during
 * initialization or startup is logged and terminates the JVM with exit status 1.
 *
 * @param storeConfig  message store configuration; its HA listen port is overwritten
 * @param brokerConfig broker configuration
 * @return the started broker controller
 */
public static BrokerController createAndStartBroker(MessageStoreConfig storeConfig, BrokerConfig brokerConfig) {
    // Port order matters: listen port first, HA port second.
    final NettyServerConfig serverCfg = new NettyServerConfig();
    serverCfg.setListenPort(nextPort());
    storeConfig.setHaListenPort(nextPort());

    final BrokerController controller =
        new BrokerController(brokerConfig, serverCfg, new NettyClientConfig(), storeConfig);
    try {
        final boolean initialized = controller.initialize();
        Assert.assertTrue(initialized);
        logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), controller.getBrokerAddr());
        controller.start();
    } catch (Throwable failure) {
        logger.error("Broker start failed, will exit", failure);
        System.exit(1);
    }

    BROKER_CONTROLLERS.add(controller);
    return controller;
}
示例6
/**
 * Creates a broker on 127.0.0.1 with property filtering enabled, registers it with the given
 * name server, starts it, and records it in {@code BROKER_CONTROLLERS}.
 *
 * <p>The store lives under a directory obtained from {@code createBaseDir()}; commit log size
 * and index limits come from {@code COMMIT_LOG_SIZE} and {@code INDEX_NUM}. The HA port
 * ({@code 8000 + random.nextInt(1000)}) and the remoting listen port
 * ({@code 10000 + random.nextInt(1000)}) are picked at random and are not checked for
 * availability.
 *
 * <p>If initialization or startup fails for any reason, the failure is logged with its cause
 * and the JVM exits with status 1.
 *
 * @param nsAddr name server address passed to the broker configuration
 * @return the started broker controller
 */
public static BrokerController createAndStartBroker(String nsAddr) {
    String baseDir = createBaseDir();
    BrokerConfig brokerConfig = new BrokerConfig();
    NettyServerConfig nettyServerConfig = new NettyServerConfig();
    NettyClientConfig nettyClientConfig = new NettyClientConfig();
    MessageStoreConfig storeConfig = new MessageStoreConfig();
    brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
    brokerConfig.setBrokerIP1("127.0.0.1");
    brokerConfig.setNamesrvAddr(nsAddr);
    brokerConfig.setEnablePropertyFilter(true);
    storeConfig.setStorePathRootDir(baseDir);
    storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
    storeConfig.setHaListenPort(8000 + random.nextInt(1000));
    storeConfig.setMapedFileSizeCommitLog(COMMIT_LOG_SIZE);
    storeConfig.setMaxIndexNum(INDEX_NUM);
    storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
    nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
    BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
    try {
        Assert.assertTrue(brokerController.initialize());
        logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
        brokerController.start();
    } catch (Throwable t) {
        // A failed assertTrue raises AssertionError, which is an Error and would slip past
        // catch (Exception); catch Throwable so every startup failure is logged before exiting.
        logger.error("Broker start failed, will exit", t);
        System.exit(1);
    }
    BROKER_CONTROLLERS.add(brokerController);
    return brokerController;
}
示例7
/**
 * Prepares the fixture before each test: builds the client channel info, wires the mocked
 * message store and {@code broker2Client} into the broker controller, stubs the store clock
 * and the handler context's channel, and creates the processor under test.
 *
 * @throws IllegalAccessException if the {@code broker2Client} field cannot be written
 * @throws NoSuchFieldException   if {@code BrokerController} declares no {@code broker2Client} field
 */
@Before
public void init() throws IllegalAccessException, NoSuchFieldException {
    clientInfo = new ClientChannelInfo(channel, "127.0.0.1", LanguageCode.JAVA, 0);

    brokerController.setMessageStore(messageStore);

    // Inject broker2Client by writing the declared field reflectively.
    Field broker2ClientField = BrokerController.class.getDeclaredField("broker2Client");
    broker2ClientField.setAccessible(true);
    broker2ClientField.set(brokerController, broker2Client);

    when(messageStore.now()).thenReturn(System.currentTimeMillis());

    // The handler context hands out a channel whose remote address is a fixed socket address.
    Channel stubbedChannel = mock(Channel.class);
    when(stubbedChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
    when(handlerContext.channel()).thenReturn(stubbedChannel);

    replyMessageProcessor = new ReplyMessageProcessor(brokerController);
}
示例8
/**
 * Creates the manager for the given broker controller.
 *
 * <p>A bloom filter is built from the broker config's maximum error rate and expected number
 * of filter-using consumers, and the filter's {@code m} value is written into the message
 * store config as the consume queue ext bit map length.
 *
 * @param brokerController the broker controller this manager reads configuration from
 */
public ConsumerFilterManager(BrokerController brokerController) {
    this.brokerController = brokerController;

    // Build the bloom filter from the broker-level settings.
    this.bloomFilter = BloomFilter.createByFn(
        this.brokerController.getBrokerConfig().getMaxErrorRateOfBloomFilter(),
        this.brokerController.getBrokerConfig().getExpectConsumerNumUseFilter());

    // Then set the bit map length of the store config.
    this.brokerController.getMessageStoreConfig().setBitMapLengthConsumeQueueExt(bloomFilter.getM());
}
示例9
/**
 * Builds, initializes and starts a broker bound to 127.0.0.1 with property filtering enabled,
 * registered against the given name server, then adds it to {@code BROKER_CONTROLLERS}.
 *
 * <p>Store files are placed under the directory returned by {@code createBaseDir()}; commit
 * log size and index limits come from {@code COMMIT_LOG_SIZE} and {@code INDEX_NUM}. The HA
 * port ({@code 8000 + random.nextInt(1000)}) and the remoting listen port
 * ({@code 10000 + random.nextInt(1000)}) are chosen at random and are not checked for
 * availability.
 *
 * <p>Any failure during initialization or startup is logged together with its cause and
 * terminates the JVM with exit status 1.
 *
 * @param nsAddr name server address passed to the broker configuration
 * @return the started broker controller
 */
public static BrokerController createAndStartBroker(String nsAddr) {
    String baseDir = createBaseDir();
    BrokerConfig brokerConfig = new BrokerConfig();
    NettyServerConfig nettyServerConfig = new NettyServerConfig();
    NettyClientConfig nettyClientConfig = new NettyClientConfig();
    MessageStoreConfig storeConfig = new MessageStoreConfig();
    brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
    brokerConfig.setBrokerIP1("127.0.0.1");
    brokerConfig.setNamesrvAddr(nsAddr);
    brokerConfig.setEnablePropertyFilter(true);
    storeConfig.setStorePathRootDir(baseDir);
    storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
    storeConfig.setHaListenPort(8000 + random.nextInt(1000));
    storeConfig.setMapedFileSizeCommitLog(COMMIT_LOG_SIZE);
    storeConfig.setMaxIndexNum(INDEX_NUM);
    storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
    nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
    BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
    try {
        Assert.assertTrue(brokerController.initialize());
        logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
        brokerController.start();
    } catch (Throwable t) {
        // assertTrue fails with AssertionError (an Error, not an Exception), so a narrower
        // catch would let a failed initialize() escape without logging or exiting.
        logger.error("Broker start failed, will exit", t);
        System.exit(1);
    }
    BROKER_CONTROLLERS.add(brokerController);
    return brokerController;
}
示例10
/**
 * Starts a broker named {@code broker-a} on port 10911 against the name server at
 * {@code 127.0.0.1:9876} and keeps it running for up to one day.
 *
 * <p>The broker is only started when initialization succeeds, and it is shut down again when
 * the wait ends or is interrupted.
 *
 * @throws Exception if the broker fails to initialize or start, or the wait is interrupted
 */
@Test
public void testBrokerRestart() throws Exception {
    // Set the remoting version.
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

    // Remoting server settings.
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(10911);

    // Broker settings.
    final BrokerConfig brokerConfig = new BrokerConfig();
    brokerConfig.setBrokerName("broker-a");
    brokerConfig.setNamesrvAddr("127.0.0.1:9876");

    // Message store settings.
    final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
    messageStoreConfig.setDeleteWhen("04");
    messageStoreConfig.setFileReservedTime(48);
    messageStoreConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
    messageStoreConfig.setDuplicationEnable(false);
    // BrokerPathConfigHelper.setBrokerConfigPath("/Users/yunai/百度云同步盘/开发/Javascript/Story/incubator-rocketmq/conf/broker.conf");

    // Start the broker.
    BrokerController brokerController = new BrokerController(
        brokerConfig,
        nettyServerConfig,
        new NettyClientConfig(),
        messageStoreConfig);
    if (!brokerController.initialize()) {
        // Do not start a broker whose initialization reported failure.
        brokerController.shutdown();
        throw new IllegalStateException("Broker initialization failed; listenPort="
            + nettyServerConfig.getListenPort());
    }
    try {
        brokerController.start();
        // brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod()
        Thread.sleep(DateUtils.MILLIS_PER_DAY);
    } finally {
        // Release the broker's resources even when start() fails or the sleep is interrupted.
        brokerController.shutdown();
    }
}
示例11
/**
 * Creates a broker on 127.0.0.1 with property filtering enabled, registers it with the given
 * name server, starts it, and records it in {@code BROKER_CONTROLLERS}.
 *
 * <p>The store lives under a directory obtained from {@code createBaseDir()}; commit log size
 * and index limits come from {@code COMMIT_LOG_SIZE} and {@code INDEX_NUM}. The HA port
 * ({@code 8000 + random.nextInt(1000)}) and the remoting listen port
 * ({@code 10000 + random.nextInt(1000)}) are picked at random and are not checked for
 * availability.
 *
 * <p>If initialization or startup fails for any reason, the failure is logged with its cause
 * and the JVM exits with status 1.
 *
 * @param nsAddr name server address passed to the broker configuration
 * @return the started broker controller
 */
public static BrokerController createAndStartBroker(String nsAddr) {
    String baseDir = createBaseDir();
    BrokerConfig brokerConfig = new BrokerConfig();
    NettyServerConfig nettyServerConfig = new NettyServerConfig();
    NettyClientConfig nettyClientConfig = new NettyClientConfig();
    MessageStoreConfig storeConfig = new MessageStoreConfig();
    brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
    brokerConfig.setBrokerIP1("127.0.0.1");
    brokerConfig.setNamesrvAddr(nsAddr);
    brokerConfig.setEnablePropertyFilter(true);
    storeConfig.setStorePathRootDir(baseDir);
    storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
    storeConfig.setHaListenPort(8000 + random.nextInt(1000));
    storeConfig.setMapedFileSizeCommitLog(COMMIT_LOG_SIZE);
    storeConfig.setMaxIndexNum(INDEX_NUM);
    storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
    nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
    BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
    try {
        Assert.assertTrue(brokerController.initialize());
        logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
        brokerController.start();
    } catch (Throwable t) {
        // A failed assertTrue raises AssertionError, which is an Error and would slip past
        // catch (Exception); catch Throwable so every startup failure is logged before exiting.
        logger.error("Broker start failed, will exit", t);
        System.exit(1);
    }
    BROKER_CONTROLLERS.add(brokerController);
    return brokerController;
}
示例12
/**
 * Builds, initializes and starts a broker bound to 127.0.0.1 with property filtering enabled,
 * registered against the given name server, then adds it to {@code BROKER_CONTROLLERS}.
 *
 * <p>Store files are placed under the directory returned by {@code createBaseDir()}; commit
 * log size and index limits come from {@code COMMIT_LOG_SIZE} and {@code INDEX_NUM}. The HA
 * port ({@code 8000 + random.nextInt(1000)}) and the remoting listen port
 * ({@code 10000 + random.nextInt(1000)}) are chosen at random and are not checked for
 * availability.
 *
 * <p>Any failure during initialization or startup is logged together with its cause and
 * terminates the JVM with exit status 1.
 *
 * @param nsAddr name server address passed to the broker configuration
 * @return the started broker controller
 */
public static BrokerController createAndStartBroker(String nsAddr) {
    String baseDir = createBaseDir();
    BrokerConfig brokerConfig = new BrokerConfig();
    NettyServerConfig nettyServerConfig = new NettyServerConfig();
    NettyClientConfig nettyClientConfig = new NettyClientConfig();
    MessageStoreConfig storeConfig = new MessageStoreConfig();
    brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
    brokerConfig.setBrokerIP1("127.0.0.1");
    brokerConfig.setNamesrvAddr(nsAddr);
    brokerConfig.setEnablePropertyFilter(true);
    storeConfig.setStorePathRootDir(baseDir);
    storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
    storeConfig.setHaListenPort(8000 + random.nextInt(1000));
    storeConfig.setMapedFileSizeCommitLog(COMMIT_LOG_SIZE);
    storeConfig.setMaxIndexNum(INDEX_NUM);
    storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
    nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
    BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
    try {
        Assert.assertTrue(brokerController.initialize());
        logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
        brokerController.start();
    } catch (Throwable t) {
        // assertTrue fails with AssertionError (an Error, not an Exception), so a narrower
        // catch would let a failed initialize() escape without logging or exiting.
        logger.error("Broker start failed, will exit", t);
        System.exit(1);
    }
    BROKER_CONTROLLERS.add(brokerController);
    return brokerController;
}
示例13
/**
 * Builds a consumer filter manager on top of the given broker controller.
 *
 * <p>The bloom filter is created from two broker config values (maximum error rate and
 * expected number of consumers using the filter); its {@code m} value is then stored in the
 * message store config as the consume queue ext bit map length.
 *
 * @param brokerController the broker controller this manager reads configuration from
 */
public ConsumerFilterManager(BrokerController brokerController) {
    this.brokerController = brokerController;

    // Bloom filter parameters come from the broker config.
    this.bloomFilter = BloomFilter.createByFn(
        this.brokerController.getBrokerConfig().getMaxErrorRateOfBloomFilter(),
        this.brokerController.getBrokerConfig().getExpectConsumerNumUseFilter());

    // Then set the bit map length of the store config.
    this.brokerController.getMessageStoreConfig().setBitMapLengthConsumeQueueExt(bloomFilter.getM());
}
示例14
/**
 * Shuts down every non-null name server controller, then every non-null broker controller,
 * and finally deletes each file in {@code TMPE_FILES}.
 */
@Override
public void run() {
    for (NamesrvController namesrv : NAMESRV_CONTROLLERS) {
        if (namesrv == null) {
            continue;
        }
        namesrv.shutdown();
    }

    for (BrokerController broker : BROKER_CONTROLLERS) {
        if (broker == null) {
            continue;
        }
        broker.shutdown();
    }

    for (File tmpFile : TMPE_FILES) {
        deleteFile(tmpFile);
    }
}
示例15
/**
 * Creates the consumer filter manager and sizes its bloom filter from broker configuration.
 *
 * <p>After the filter is created from the configured maximum error rate and expected consumer
 * count, its {@code m} value is written to the message store config as the consume queue ext
 * bit map length.
 *
 * @param brokerController the broker controller this manager reads configuration from
 */
public ConsumerFilterManager(BrokerController brokerController) {
    this.brokerController = brokerController;

    // Create the filter from the broker config's error rate and expected consumer count.
    this.bloomFilter = BloomFilter.createByFn(
        this.brokerController.getBrokerConfig().getMaxErrorRateOfBloomFilter(),
        this.brokerController.getBrokerConfig().getExpectConsumerNumUseFilter());

    // Then set the bit map length of the store config.
    this.brokerController.getMessageStoreConfig().setBitMapLengthConsumeQueueExt(bloomFilter.getM());
}
示例16
/**
 * Creates a subscription group manager for the given broker controller and runs
 * {@code init()}.
 *
 * @param brokerController the broker controller stored in this manager
 */
public SubscriptionGroupManager(BrokerController brokerController) {
this.brokerController = brokerController;
// NOTE(review): init() is invoked from the constructor; confirm it is not overridden by a subclass.
this.init();
}
示例17
/**
 * Creates a consumer offset manager that keeps a reference to the given broker controller.
 *
 * @param brokerController the broker controller stored in this manager
 */
public ConsumerOffsetManager(BrokerController brokerController) {
this.brokerController = brokerController;
}
示例18
/**
 * Creates a slave synchronizer that keeps a reference to the given broker controller.
 *
 * @param brokerController the broker controller stored in this instance
 */
public SlaveSynchronize(BrokerController brokerController) {
this.brokerController = brokerController;
}
示例19
public AbstractSendMessageProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
this.storeHost =
new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController
.getNettyServerConfig().getListenPort());
}
示例20
/**
 * Creates a pull request hold service that keeps a reference to the given broker controller.
 *
 * @param brokerController the broker controller stored in this service
 */
public PullRequestHoldService(final BrokerController brokerController) {
this.brokerController = brokerController;
}
示例21
/**
 * Creates a forward request processor that keeps a reference to the given broker controller.
 *
 * @param brokerController the broker controller stored in this processor
 */
public ForwardRequestProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
}
示例22
/**
 * Creates a consumer manage processor that keeps a reference to the given broker controller.
 *
 * @param brokerController the broker controller stored in this processor
 */
public ConsumerManageProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
}
示例23
/**
 * Creates a client manage processor that keeps a reference to the given broker controller.
 *
 * @param brokerController the broker controller stored in this processor
 */
public ClientManageProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
}
示例24
public AbstractSendMessageProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
this.storeHost =
new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController
.getNettyServerConfig().getListenPort());
}
示例25
/**
 * Creates the service and stores the given broker controller in its {@code brokerController}
 * field.
 *
 * @param brokerController the broker controller stored in this service
 */
public PullRequestHoldService(final BrokerController brokerController) {
this.brokerController = brokerController;
}
示例26
/**
 * Constructs a pull request hold service bound to the given broker controller.
 *
 * @param brokerController the broker controller stored in this service
 */
public PullRequestHoldService(final BrokerController brokerController) {
this.brokerController = brokerController;
}
示例27
/**
 * Constructs a forward request processor bound to the given broker controller.
 *
 * @param brokerController the broker controller stored in this processor
 */
public ForwardRequestProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
}
示例28
/**
 * Creates a consumer-ids change listener that keeps a reference to the given broker
 * controller.
 *
 * @param brokerController the broker controller stored in this listener
 */
public DefaultConsumerIdsChangeListener(BrokerController brokerController) {
this.brokerController = brokerController;
}
示例29
/**
 * Creates a {@code Broker2Client} that keeps a reference to the given broker controller.
 *
 * @param brokerController the broker controller stored in this instance
 */
public Broker2Client(BrokerController brokerController) {
this.brokerController = brokerController;
}
示例30
/**
 * Creates a pull message processor that keeps a reference to the given broker controller.
 *
 * @param brokerController the broker controller stored in this processor
 */
public PullMessageProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
}