Java源码示例:com.alibaba.rocketmq.common.UtilAll
示例1
/**
 * Tries to swap the body of {@code msg} for a compressed copy.
 *
 * <p>Compression is attempted only when the body is non-null and its length is at least
 * {@code defaultMQProducer.getCompressMsgBodyOverHowmuch()}.
 *
 * @param msg the message whose body may be replaced in place
 * @return {@code true} if the body was replaced with the result of {@code UtilAll.compress};
 *         {@code false} if compression was skipped, returned {@code null}, or threw
 *         an {@link IOException} (which is logged, not rethrown)
 */
private boolean tryToCompressMessage(final Message msg) {
    final byte[] original = msg.getBody();
    if (original == null
        || original.length < this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
        return false;
    }

    try {
        final byte[] compressed = UtilAll.compress(original, zipCompressLevel);
        if (compressed != null) {
            msg.setBody(compressed);
            return true;
        }
    } catch (IOException e) {
        // Best effort: the message keeps its uncompressed body.
        log.error("tryToCompressMessage exception", e);
        log.warn(msg.toString());
    }
    return false;
}
示例2
/**
 * Builds the response for a "get consumer running info" request.
 *
 * <p>Looks up the running info for the consumer group named in the request header. When none is
 * found the response carries {@code SYSTEM_ERROR} and a remark; otherwise it carries
 * {@code SUCCESS} and the encoded running info, optionally enriched with the stack traces of all
 * live threads when the header asks for it.
 *
 * @param ctx     channel context (not used by this method)
 * @param request the incoming command whose custom header is decoded
 * @return the populated response command
 * @throws RemotingCommandException declared by the header decoding call
 */
private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetConsumerRunningInfoRequestHeader requestHeader =
        (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);

    final ConsumerRunningInfo runningInfo = this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());
    if (runningInfo == null) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", requestHeader.getConsumerGroup()));
        return response;
    }

    if (requestHeader.isJstackEnable()) {
        // One snapshot feeds both the structured map and the rendered text.
        final Map<Thread, StackTraceElement[]> stackTraces = Thread.getAllStackTraces();
        runningInfo.setStackTraceElementMap(stackTraces);
        runningInfo.setJstack(UtilAll.jstack(stackTraces));
    }

    response.setCode(ResponseCode.SUCCESS);
    response.setBody(runningInfo.encode());
    return response;
}
示例3
/**
 * Opens the checkpoint file at {@code scpPath} and maps its first
 * {@code MapedFile.OS_PAGE_SIZE} bytes in read-write mode.
 *
 * <p>If the file already existed before it was opened, three timestamps are loaded from the
 * mapping at byte offsets 0, 8 and 16 and logged.
 *
 * @param scpPath path of the checkpoint file
 * @throws IOException if the file cannot be opened or mapped
 */
public StoreCheckpoint(final String scpPath) throws IOException {
    final File checkpointFile = new File(scpPath);
    MapedFile.ensureDirOK(checkpointFile.getParent());

    // Sampled before the file is opened in "rw" mode, which creates it when absent.
    final boolean existedBefore = checkpointFile.exists();

    this.randomAccessFile = new RandomAccessFile(checkpointFile, "rw");
    this.fileChannel = this.randomAccessFile.getChannel();
    this.mappedByteBuffer = fileChannel.map(MapMode.READ_WRITE, 0, MapedFile.OS_PAGE_SIZE);

    if (!existedBefore) {
        log.info("store checkpoint file not exists, " + scpPath);
    } else {
        log.info("store checkpoint file exists, " + scpPath);

        this.physicMsgTimestamp = this.mappedByteBuffer.getLong(0);
        this.logicsMsgTimestamp = this.mappedByteBuffer.getLong(8);
        this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);

        log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", "
            + UtilAll.timeMillisToHumanString(this.physicMsgTimestamp));
        log.info("store checkpoint file logicsMsgTimestamp " + this.logicsMsgTimestamp + ", "
            + UtilAll.timeMillisToHumanString(this.logicsMsgTimestamp));
        log.info("store checkpoint file indexMsgTimestamp " + this.indexMsgTimestamp + ", "
            + UtilAll.timeMillisToHumanString(this.indexMsgTimestamp));
    }
}
示例4
/**
 * Validates a topic name and throws on the first rule it violates.
 *
 * <p>Rules are checked in this order: the topic must not be blank, must match {@code PATTERN},
 * must not be longer than {@code CHARACTER_MAX_LENGTH}, and must not equal
 * {@code MixAll.DEFAULT_TOPIC}.
 *
 * @param topic the topic name to validate
 * @throws MQClientException if any of the rules above is violated
 */
public static void checkTopic(String topic) throws MQClientException {
    if (UtilAll.isBlank(topic)) {
        throw new MQClientException("the specified topic is blank", null);
    }

    final boolean legalCharacters = regularExpressionMatcher(topic, PATTERN);
    if (!legalCharacters) {
        final String illegalCharsMessage = String.format(
            "the specified topic[%s] contains illegal characters, allowing only %s", topic,
            VALID_PATTERN_STR);
        throw new MQClientException(illegalCharsMessage, null);
    }

    final int topicLength = topic.length();
    if (topicLength > CHARACTER_MAX_LENGTH) {
        throw new MQClientException("the specified topic is longer than topic max length 255.", null);
    }

    // Reject the system-reserved default topic name.
    if (topic.equals(MixAll.DEFAULT_TOPIC)) {
        final String reservedMessage = String.format("the topic[%s] is conflict with default topic.", topic);
        throw new MQClientException(reservedMessage, null);
    }
}
示例5
/**
 * Re-publishes {@code msg} to the retry topic of this consumer's group.
 *
 * <p>A new message is built from the body, flag and properties of {@code msg}, tagged with the
 * original message id and the original topic, given a delay level of
 * {@code 3 + msg.getReconsumeTimes()}, and sent through the client factory's default producer.
 *
 * @param msg the message to send back
 * @return {@code true} if the send call completed without throwing; {@code false} if any
 *         exception was raised (it is logged, not rethrown)
 */
public boolean sendMessageBack(final MessageExt msg) {
    try {
        final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
        final Message retryMsg = new Message(retryTopic, msg.getBody());

        // Keep the first-ever message id across repeated retries.
        final String originMsgId = MessageAccessor.getOriginMessageId(msg);
        final String effectiveOriginId = UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId;
        MessageAccessor.setOriginMessageId(retryMsg, effectiveOriginId);

        retryMsg.setFlag(msg.getFlag());
        MessageAccessor.setProperties(retryMsg, msg.getProperties());
        MessageAccessor.putProperty(retryMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
        MessageAccessor.setReconsumeTime(retryMsg, String.valueOf(msg.getReconsumeTimes()));
        MessageAccessor.setMaxReconsumeTimes(retryMsg, String.valueOf(this.defaultMQPushConsumer.getMaxReconsumeTimes()));
        retryMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

        this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(retryMsg);
        return true;
    } catch (Exception e) {
        log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
        return false;
    }
}
示例6
/**
 * Creates the checkpoint backed by the file at {@code scpPath}.
 *
 * <p>The parent directory is ensured via {@code MapedFile.ensureDirOK}, the file is opened
 * read-write, and its first {@code MapedFile.OS_PAGE_SIZE} bytes are memory-mapped. When the file
 * was already present, the three stored timestamps (offsets 0, 8, 16) are read and logged.
 *
 * @param scpPath path of the checkpoint file
 * @throws IOException if opening or mapping the file fails
 */
public StoreCheckpoint(final String scpPath) throws IOException {
    final File scpFile = new File(scpPath);
    MapedFile.ensureDirOK(scpFile.getParent());

    // Must be captured before the "rw" open below, which creates a missing file.
    final boolean alreadyPresent = scpFile.exists();

    this.randomAccessFile = new RandomAccessFile(scpFile, "rw");
    this.fileChannel = this.randomAccessFile.getChannel();
    this.mappedByteBuffer = fileChannel.map(MapMode.READ_WRITE, 0, MapedFile.OS_PAGE_SIZE);

    if (!alreadyPresent) {
        log.info("store checkpoint file not exists, " + scpPath);
    } else {
        log.info("store checkpoint file exists, " + scpPath);

        // Layout: three consecutive 8-byte longs.
        this.physicMsgTimestamp = this.mappedByteBuffer.getLong(0);
        this.logicsMsgTimestamp = this.mappedByteBuffer.getLong(8);
        this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);

        log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", "
            + UtilAll.timeMillisToHumanString(this.physicMsgTimestamp));
        log.info("store checkpoint file logicsMsgTimestamp " + this.logicsMsgTimestamp + ", "
            + UtilAll.timeMillisToHumanString(this.logicsMsgTimestamp));
        log.info("store checkpoint file indexMsgTimestamp " + this.indexMsgTimestamp + ", "
            + UtilAll.timeMillisToHumanString(this.indexMsgTimestamp));
    }
}
示例7
/**
 * Opens (creating if needed) the checkpoint file at {@code scpPath} and memory-maps its first
 * {@code MapedFile.OS_PAGE_SIZE} bytes read-write.
 *
 * <p>For a pre-existing file, the physic, logics and index message timestamps are restored from
 * offsets 0, 8 and 16 of the mapping and written to the log.
 *
 * @param scpPath path of the checkpoint file
 * @throws IOException if the file cannot be opened or mapped
 */
public StoreCheckpoint(final String scpPath) throws IOException {
    final File backingFile = new File(scpPath);
    MapedFile.ensureDirOK(backingFile.getParent());

    // Existence has to be checked before RandomAccessFile("rw") creates the file.
    final boolean hadFile = backingFile.exists();

    this.randomAccessFile = new RandomAccessFile(backingFile, "rw");
    this.fileChannel = this.randomAccessFile.getChannel();
    this.mappedByteBuffer = fileChannel.map(MapMode.READ_WRITE, 0, MapedFile.OS_PAGE_SIZE);

    if (!hadFile) {
        log.info("store checkpoint file not exists, " + scpPath);
    } else {
        log.info("store checkpoint file exists, " + scpPath);

        this.physicMsgTimestamp = this.mappedByteBuffer.getLong(0);
        this.logicsMsgTimestamp = this.mappedByteBuffer.getLong(8);
        this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);

        log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", "
            + UtilAll.timeMillisToHumanString(this.physicMsgTimestamp));
        log.info("store checkpoint file logicsMsgTimestamp " + this.logicsMsgTimestamp + ", "
            + UtilAll.timeMillisToHumanString(this.logicsMsgTimestamp));
        log.info("store checkpoint file indexMsgTimestamp " + this.indexMsgTimestamp + ", "
            + UtilAll.timeMillisToHumanString(this.indexMsgTimestamp));
    }
}
示例8
/**
 * Shuts this mapped file down and, once cleanup is over, closes its channel and deletes the file.
 *
 * @param intervalForcibly passed straight to {@code shutdown}
 * @return {@code false} if cleanup is not over yet (nothing is closed or deleted);
 *         {@code true} otherwise, even when closing or deleting failed, in which case the
 *         failure is only logged
 */
public boolean destroy(final long intervalForcibly) {
    this.shutdown(intervalForcibly);

    if (!this.isCleanupOver()) {
        log.warn("destroy maped file[REF:" + this.getRefCount() + "] " + this.fileName
            + " Failed. cleanupOver: " + this.cleanupOver);
        return false;
    }

    try {
        this.fileChannel.close();
        log.info("close file channel " + this.fileName + " OK");

        final long deleteStart = System.currentTimeMillis();
        final boolean deleted = this.file.delete();
        final String outcome = deleted ? " OK, " : " Failed, ";
        log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
            + outcome + "W:" + this.getWrotePostion() + " M:"
            + this.getCommittedPosition() + ", "
            + UtilAll.computeEclipseTimeMilliseconds(deleteStart));
    } catch (Exception e) {
        log.warn("close file channel " + this.fileName + " Failed. ", e);
    }
    return true;
}
示例9
/**
 * Fetches the name server address list over HTTP.
 *
 * <p>The request goes to {@code wsAddr}, with {@code "-" + unitName + "?nofix=1"} appended when
 * {@code unitName} is not blank. A 200 response with non-null content is returned after being
 * passed through {@code clearNewLine}.
 *
 * @param verbose      whether to log the I/O exception and the final "connect failed" hint
 * @param timeoutMills timeout handed to {@code HttpTinyClient.httpGet}
 * @return the cleaned response content, or {@code null} on a non-200 status, null content, or
 *         an {@link IOException}
 */
public final String fetchNSAddr(boolean verbose, long timeoutMills) {
    String url = this.wsAddr;
    try {
        if (!UtilAll.isBlank(this.unitName)) {
            url = url + "-" + this.unitName + "?nofix=1";
        }

        final HttpResult result = HttpTinyClient.httpGet(url, null, null, "UTF-8", timeoutMills);
        if (200 != result.code) {
            log.error("fetch nameserver address failed. statusCode={}", result.code);
        } else if (result.content == null) {
            log.error("fetch nameserver address is null");
        } else {
            return clearNewLine(result.content);
        }
    } catch (IOException e) {
        if (verbose) {
            log.error("fetch name server address exception", e);
        }
    }

    // Every failure path ends up here.
    if (verbose) {
        final String errorMsg =
            "connect to " + url + " failed, maybe the domain name " + MixAll.WS_DOMAIN_NAME + " not bind in /etc/hosts"
                + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL);
        log.warn(errorMsg);
    }
    return null;
}
示例10
/**
 * Validates a group name and throws on the first rule it violates.
 *
 * <p>Rules are checked in this order: the group must not be blank, must match {@code PATTERN},
 * and must not be longer than {@code CHARACTER_MAX_LENGTH}.
 *
 * @param group the group name to validate
 * @throws MQClientException if any of the rules above is violated
 */
public static void checkGroup(String group) throws MQClientException {
    if (UtilAll.isBlank(group)) {
        throw new MQClientException("the specified group is blank", null);
    }

    final boolean legalCharacters = regularExpressionMatcher(group, PATTERN);
    if (!legalCharacters) {
        final String illegalCharsMessage = String.format(
            "the specified group[%s] contains illegal characters, allowing only %s", group,
            VALID_PATTERN_STR);
        throw new MQClientException(illegalCharsMessage, null);
    }

    final int groupLength = group.length();
    if (groupLength > CHARACTER_MAX_LENGTH) {
        throw new MQClientException("the specified group is longer than group max length 255.", null);
    }
}
示例11
/**
 * Sends a message that failed consumption back to the broker for retry.
 *
 * <p>First asks the broker directly via {@code consumerSendMessageBack}. If that throws, the
 * message is rebuilt under the group's retry topic and sent with the default producer instead.
 *
 * @param msg        the message to send back
 * @param delayLevel delay level passed to {@code consumerSendMessageBack}
 * @param brokerName broker to look up in the publish table; when {@code null}, the address is
 *                   derived from the message's store host
 * @throws RemotingException    declared; may come from the fallback send
 * @throws MQBrokerException    declared; may come from the fallback send
 * @throws InterruptedException declared; may come from the fallback send
 * @throws MQClientException    declared; may come from the fallback send
 */
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    try {
        final String brokerAddr;
        if (null != brokerName) {
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
        } else {
            brokerAddr = RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
        }

        // Read together with SendMessageProcessor.consumerSendMsgBack.
        // Original author's note: consumption failed, so the message is pushed back to the broker.
        // The request code sent here is CONSUMER_SEND_MSG_BACK; on receipt the peer creates the
        // retry queue RETRY_GROUP_TOPIC_PREFIX + consumer.
        this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
            this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000);
    } catch (Exception e) {
        // Pushing the failed message back to the retry queue failed, so it has to be re-sent
        // to the retry queue as an ordinary message.
        log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);

        // Original author's note: the request code of this send defaults to SEND_MESSAGE, so the
        // retry queue name must be carried as the topic. To the broker this looks like a message
        // sent to RETRY_GROUP_TOPIC_PREFIX + consumer.
        final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
        final Message retryMsg = new Message(retryTopic, msg.getBody());

        final String originMsgId = MessageAccessor.getOriginMessageId(msg);
        final String effectiveOriginId = UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId;
        MessageAccessor.setOriginMessageId(retryMsg, effectiveOriginId);

        retryMsg.setFlag(msg.getFlag());
        MessageAccessor.setProperties(retryMsg, msg.getProperties());
        MessageAccessor.putProperty(retryMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());

        final int nextReconsumeTimes = msg.getReconsumeTimes() + 1;
        MessageAccessor.setReconsumeTime(retryMsg, String.valueOf(nextReconsumeTimes));
        retryMsg.setDelayTimeLevel(3 + nextReconsumeTimes);

        this.mQClientFactory.getDefaultMQProducer().send(retryMsg);
    }
}
示例12
/**
 * Schedules {@code printAtMinutes()} at a fixed rate of five minutes.
 *
 * <p>The first run is delayed by the absolute difference between
 * {@code UtilAll.computNextMinutesTimeMillis()} and the current time. The original note says
 * this is meant to start on a whole minute; confirm against that helper.
 */
public void init() {
    final Runnable printTask = new Runnable() {
        @Override
        public void run() {
            try {
                printAtMinutes();
            } catch (Throwable ignored) {
                // Deliberately swallowed so a failing run never escapes the task.
                // NOTE(review): presumably to keep the periodic schedule alive; consider logging.
            }
        }
    };

    this.scheduledExecutorService.scheduleAtFixedRate(printTask,
        Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()),
        1000 * 60 * 5, TimeUnit.MILLISECONDS);
}
示例13
/**
 * Registers the periodic {@code printAtMinutes()} task.
 *
 * <p>The task repeats every {@code 1000 * 60 * 5} milliseconds. Its initial delay is
 * {@code |UtilAll.computNextMinutesTimeMillis() - now|}, which the original note describes as
 * aligning the first run to a whole minute (not verifiable from this method alone).
 */
public void init() {
    final Runnable minuteReport = new Runnable() {
        @Override
        public void run() {
            try {
                printAtMinutes();
            } catch (Throwable ignored) {
                // Intentionally ignored: nothing thrown by the report may leave the task.
                // NOTE(review): presumably protects the recurring schedule; no logging here.
            }
        }
    };

    this.scheduledExecutorService.scheduleAtFixedRate(minuteReport,
        Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()),
        1000 * 60 * 5, TimeUnit.MILLISECONDS);
}
示例14
/**
 * Builds the id that identifies this client.
 *
 * <p>The id is {@code clientIP + "@" + instanceName}; when {@code unitName} is not blank,
 * {@code "@" + unitName} is appended. The original note describes the instance name as the
 * pid by default; confirm against the instance-name setter.
 *
 * @return the assembled client id
 */
public String buildMQClientId() {
    final StringBuilder clientId = new StringBuilder();
    clientId.append(this.getClientIP()).append("@").append(this.getInstanceName());

    if (!UtilAll.isBlank(this.unitName)) {
        clientId.append("@").append(this.unitName);
    }
    return clientId.toString();
}
示例15
/**
 * Refreshes every registered filter class from the remote source.
 *
 * <p>For each entry in {@code filterClassTable} (key split on {@code "@"} into topic and group),
 * the class source is fetched, its CRC32 is compared with the stored one, and on a mismatch the
 * source is compiled, instantiated and installed as the entry's message filter together with
 * the new CRC. A failure on one entry is logged and does not stop the remaining entries.
 */
private void fetchClassFromRemoteHost() {
    Iterator<Entry<String, FilterClassInfo>> it = this.filterClassTable.entrySet().iterator();
    while (it.hasNext()) {
        try {
            Entry<String, FilterClassInfo> next = it.next();
            FilterClassInfo filterClassInfo = next.getValue();
            String[] topicAndGroup = next.getKey().split("@");
            String responseStr = this.filterClassFetchMethod.fetch(topicAndGroup[0], topicAndGroup[1],
                filterClassInfo.getClassName());

            // Encode once; the same bytes feed both the CRC and the source decoding below.
            byte[] filterSourceBinary = responseStr.getBytes("UTF-8");
            int classCRC = UtilAll.crc32(filterSourceBinary);

            // Recompile only when the fetched source differs from what is loaded.
            if (classCRC != filterClassInfo.getClassCRC()) {
                String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
                Class<?> newClass =
                    DynaCode.compileAndLoadClass(filterClassInfo.getClassName(), javaSource);
                Object newInstance = newClass.newInstance();
                filterClassInfo.setMessageFilter((MessageFilter) newInstance);
                filterClassInfo.setClassCRC(classCRC);

                log.info("fetch Remote class File OK, {} {}", next.getKey(),
                    filterClassInfo.getClassName());
            }
        } catch (Exception e) {
            // Also covers a malformed key or a null response; the loop moves on to the next entry.
            log.error("fetchClassFromRemoteHost Exception", e);
        }
    }
}
示例16
/**
 * Converts a textual timestamp to epoch milliseconds.
 *
 * <p>The value is first read as a decimal {@code long}. If that fails, it is parsed as a date
 * using the {@code UtilAll.yyyy_MM_dd_HH_mm_ss_SSS} format and that date's time is returned.
 *
 * @param value either a decimal number of milliseconds or a date string in the format above
 * @return the timestamp in milliseconds
 */
public static long timestampFormat(final String value) {
    try {
        // parseLong yields the primitive directly; Long.valueOf would box and then unbox.
        return Long.parseLong(value);
    } catch (NumberFormatException e) {
        // Not a plain number: fall back to the date format.
        // NOTE(review): if UtilAll.parseDate can return null for unparsable input, this line
        // throws NullPointerException — confirm against that helper.
        return UtilAll.parseDate(value, UtilAll.yyyy_MM_dd_HH_mm_ss_SSS).getTime();
    }
}
示例17
/**
 * Wires up the filter server controller.
 *
 * <p>Logs the configuration, creates the remoting server and its fixed-size executor, registers
 * the processors, schedules {@code registerFilterServerToBroker()} every 10 seconds after a
 * 3-second delay, and configures the pull consumer: both suspend timeouts are reduced by
 * 1000, the name server address is copied from the config, and the instance name is set to
 * the string form of {@code UtilAll.getPid()}.
 *
 * @return always {@code true}
 */
public boolean initialize() {
    MixAll.printObjectProperties(log, this.filtersrvConfig);

    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig);

    final int workerThreads = nettyServerConfig.getServerWorkerThreads();
    this.remotingExecutor =
        Executors.newFixedThreadPool(workerThreads, new ThreadFactoryImpl("RemotingExecutorThread_"));

    this.registerProcessor();

    final Runnable registerTask = new Runnable() {
        @Override
        public void run() {
            FiltersrvController.this.registerFilterServerToBroker();
        }
    };
    this.scheduledExecutorService.scheduleAtFixedRate(registerTask, 3, 10, TimeUnit.SECONDS);

    // Shave 1000 off each suspend timeout of the pull consumer.
    this.defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(
        this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis() - 1000);
    this.defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(
        this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() - 1000);

    this.defaultMQPullConsumer.setNamesrvAddr(this.filtersrvConfig.getNamesrvAddr());
    this.defaultMQPullConsumer.setInstanceName(String.valueOf(UtilAll.getPid()));

    return true;
}
示例18
/**
 * Re-fetches the source of every registered filter class and reloads the ones that changed.
 *
 * <p>Each key of {@code filterClassTable} is split on {@code "@"} into topic and group. The
 * fetched source's CRC32 is compared with the stored CRC; on a difference the source is
 * compiled, a new instance is created and installed as the message filter, and the stored CRC
 * is updated. Any exception for one entry is logged and the iteration continues.
 */
private void fetchClassFromRemoteHost() {
    Iterator<Entry<String, FilterClassInfo>> it = this.filterClassTable.entrySet().iterator();
    while (it.hasNext()) {
        try {
            Entry<String, FilterClassInfo> next = it.next();
            FilterClassInfo filterClassInfo = next.getValue();
            String[] topicAndGroup = next.getKey().split("@");
            String responseStr =
                this.filterClassFetchMethod.fetch(topicAndGroup[0], topicAndGroup[1],
                    filterClassInfo.getClassName());

            // Single UTF-8 encoding, reused for the checksum and for decoding the source.
            byte[] filterSourceBinary = responseStr.getBytes("UTF-8");
            int classCRC = UtilAll.crc32(filterSourceBinary);

            // Unchanged CRC means the loaded class is already current.
            if (classCRC != filterClassInfo.getClassCRC()) {
                String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
                Class<?> newClass =
                    DynaCode.compileAndLoadClass(filterClassInfo.getClassName(), javaSource);
                Object newInstance = newClass.newInstance();
                filterClassInfo.setMessageFilter((MessageFilter) newInstance);
                filterClassInfo.setClassCRC(classCRC);

                log.info("fetch Remote class File OK, {} {}", next.getKey(),
                    filterClassInfo.getClassName());
            }
        } catch (Exception e) {
            // Per-entry failure (bad key, null response, compile error) is logged and skipped.
            log.error("fetchClassFromRemoteHost Exception", e);
        }
    }
}
示例19
/**
* Creates a message id by writing the store address and the offset into {@code input} and
* converting the buffer's backing array to a string with {@code UtilAll.bytes2string}.
*
* <p>{@code input} is used as scratch space and is modified: it is flipped, its limit is set to
* {@code MessageDecoder.MSG_ID_LENGTH}, and {@code addr} followed by {@code offset} are written
* from position 0. Note that {@code input.array()} returns the whole backing array regardless of
* position and limit, so the result covers the full capacity of {@code input}.
*
* @param input scratch buffer that receives the id bytes; must be array-backed
* @param addr buffer holding the store host address; its remaining bytes are copied
* @param offset offset value written as an 8-byte long after the address
* @return the string form of {@code input}'s backing array
*/
public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
input.flip();
input.limit(MessageDecoder.MSG_ID_LENGTH);
// Message store host address: IP + port (8 bytes per the original note — TODO confirm)
input.put(addr);
// Physical offset of the message, 8 bytes
input.putLong(offset);
return UtilAll.bytes2string(input.array());
}
示例20
/**
 * Collects runtime statistics of the store.
 *
 * <p>Starts from the map returned by {@code storeStatsService.getRuntimeInfo()} and adds the
 * disk usage ratio of the commit log path and of the consume queue path, the schedule
 * service's running stats (when that service is present), and the minimum and maximum physical
 * offsets.
 *
 * @return the statistics map, which is the same instance returned by the stats service
 */
@Override
public HashMap<String, String> getRuntimeInfo() {
    final HashMap<String, String> stats = this.storeStatsService.getRuntimeInfo();

    // Disk usage of the partition holding the commit log files.
    final String commitLogPath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
    final double commitLogRatio = UtilAll.getDiskPartitionSpaceUsedPercent(commitLogPath);
    stats.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(commitLogRatio));

    // Disk usage of the partition holding the consume queue files.
    final String consumeQueuePath = StorePathConfigHelper
        .getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
    final double consumeQueueRatio = UtilAll.getDiskPartitionSpaceUsedPercent(consumeQueuePath);
    stats.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(consumeQueueRatio));

    // Progress of delayed (scheduled) messages.
    if (this.scheduleMessageService != null) {
        this.scheduleMessageService.buildRunningStats(stats);
    }

    stats.put(RunningStats.commitLogMinOffset.name(),
        String.valueOf(DefaultMessageStore.this.getMinPhyOffset()));
    stats.put(RunningStats.commitLogMaxOffset.name(),
        String.valueOf(DefaultMessageStore.this.getMaxPhyOffset()));

    return stats;
}
示例21
/**
 * Copies consume offsets from a source consumer group to a destination group.
 *
 * <p>When the request names a topic, only that topic is processed; otherwise every topic the
 * source group has offsets for is processed. A topic is skipped when it has no topic config,
 * or when the request is not offline, the source group has subscriptions, and none of them is
 * for this topic.
 *
 * @param ctx     channel context (not used by this method)
 * @param request the incoming command whose custom header is decoded
 * @return a response with code {@code SUCCESS} and no remark
 * @throws RemotingCommandException declared by the header decoding call
 */
private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    CloneGroupOffsetRequestHeader requestHeader =
        (CloneGroupOffsetRequestHeader) request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class);

    Set<String> topics;
    if (UtilAll.isBlank(requestHeader.getTopic())) {
        topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getSrcGroup());
    } else {
        topics = new HashSet<String>();
        topics.add(requestHeader.getTopic());
    }

    for (String topic : topics) {
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
        if (null == topicConfig) {
            log.warn("[cloneGroupOffset], topic config not exist, {}", topic);
            continue;
        }

        if (!requestHeader.isOffline()) {
            SubscriptionData findSubscriptionData =
                this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getSrcGroup(), topic);
            if (this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getSrcGroup()) > 0
                && findSubscriptionData == null) {
                log.warn("[cloneGroupOffset], the consumer group[{}], topic[{}] not exist", requestHeader.getSrcGroup(), topic);
                continue;
            }
        }

        // Clone the topic of this iteration. Passing requestHeader.getTopic() here would hand
        // cloneOffset a blank topic on every pass whenever the topic list came from the source group.
        this.brokerController.getConsumerOffsetManager().cloneOffset(requestHeader.getSrcGroup(), requestHeader.getDestGroup(),
            topic);
    }

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
示例22
/**
 * Formats the disk usage ratios of the commit log, consume queue and index store paths.
 *
 * @return a string of the form {@code "CL: x CQ: y INDEX: z"}, each value formatted with
 *         {@code %5.2f}
 */
private String diskUtil() {
    final double commitLogRatio = UtilAll.getDiskPartitionSpaceUsedPercent(
        this.brokerController.getMessageStoreConfig().getStorePathCommitLog());

    final double consumeQueueRatio = UtilAll.getDiskPartitionSpaceUsedPercent(
        StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir()));

    final double indexStoreRatio = UtilAll.getDiskPartitionSpaceUsedPercent(
        StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir()));

    return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", commitLogRatio, consumeQueueRatio, indexStoreRatio);
}
示例23
/**
 * Releases this mapped file: shuts it down, then closes the channel and deletes the file once
 * cleanup has finished.
 *
 * @param intervalForcibly forwarded unchanged to {@code shutdown}
 * @return {@code true} when cleanup was over, regardless of whether close/delete succeeded
 *         (failures are logged); {@code false} when cleanup is still pending
 */
public boolean destroy(final long intervalForcibly) {
    this.shutdown(intervalForcibly);

    final boolean cleanedUp = this.isCleanupOver();
    if (!cleanedUp) {
        log.warn("destroy maped file[REF:" + this.getRefCount() + "] " + this.fileName
            + " Failed. cleanupOver: " + this.cleanupOver);
        return false;
    }

    try {
        this.fileChannel.close();
        log.info("close file channel " + this.fileName + " OK");

        final long startedAt = System.currentTimeMillis();
        final boolean removed = this.file.delete();
        final String status = removed ? " OK, " : " Failed, ";
        log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
            + status + "W:" + this.getWrotePostion() + " M:"
            + this.getCommittedPosition() + ", "
            + UtilAll.computeEclipseTimeMilliseconds(startedAt));
    } catch (Exception e) {
        log.warn("close file channel " + this.fileName + " Failed. ", e);
    }
    return true;
}
示例24
/**
 * Decides whether recovery can start from the given mapped file.
 *
 * <p>The file is rejected when the magic code at {@code MessageMagicCodePostion} does not match
 * or the store timestamp at {@code MessageStoreTimestampPostion} is zero. Otherwise it matches
 * when that store timestamp is not later than the checkpoint's minimum timestamp. The
 * index-aware minimum is used when both message indexing and safe indexing are enabled.
 *
 * @param mapedFile the candidate file
 * @return {@code true} if recovery can start from this file
 */
private boolean isMapedFileMatchedRecover(final MapedFile mapedFile) {
    final ByteBuffer firstRecord = mapedFile.sliceByteBuffer();

    final int magicCode = firstRecord.getInt(MessageDecoder.MessageMagicCodePostion);
    if (magicCode != MessageMagicCode) {
        return false;
    }

    final long storeTimestamp = firstRecord.getLong(MessageDecoder.MessageStoreTimestampPostion);
    if (0 == storeTimestamp) {
        return false;
    }

    final boolean indexSafeMode = this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
        && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe();
    final long checkpointTimestamp = indexSafeMode
        ? this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()
        : this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp();

    if (storeTimestamp <= checkpointTimestamp) {
        log.info("find check timestamp, {} {}",
            storeTimestamp,
            UtilAll.timeMillisToHumanString(storeTimestamp));
        return true;
    }
    return false;
}
示例25
public SendResult(SendStatus sendStatus, String msgId, MessageQueue messageQueue, long queueOffset,
String projectGroupPrefix) {
this.sendStatus = sendStatus;
this.msgId = msgId;
this.messageQueue = messageQueue;
this.queueOffset = queueOffset;
if (!UtilAll.isBlank(projectGroupPrefix)) {
this.messageQueue
.setTopic(VirtualEnvUtil.clearProjectGroup(this.messageQueue.getTopic(), projectGroupPrefix));
}
}
示例26
/**
 * Builds a message id from a socket address and a transaction id hash.
 *
 * <p>The raw IP bytes, the port (as an int) and the hash (as a long) are written in that order
 * into a buffer of {@code MessageDecoder.MSG_ID_LENGTH} bytes, whose backing array is then
 * converted with {@code UtilAll.bytes2string}.
 *
 * @param socketAddress         address to encode; must be an {@link InetSocketAddress}
 * @param transactionIdhashCode hash value appended after the address
 * @return the string form of the id bytes
 */
public static String createMessageId(SocketAddress socketAddress, long transactionIdhashCode) {
    final ByteBuffer idBuffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
    final InetSocketAddress inetAddress = (InetSocketAddress) socketAddress;

    idBuffer.put(inetAddress.getAddress().getAddress())
        .putInt(inetAddress.getPort())
        .putLong(transactionIdhashCode);
    idBuffer.flip();

    // array() exposes the whole backing array, independent of position and limit.
    return UtilAll.bytes2string(idBuffer.array());
}
示例27
/**
 * Starts the recurring {@code printAtMinutes()} job.
 *
 * <p>Period: {@code 1000 * 60 * 5} milliseconds. Initial delay: the absolute difference between
 * {@code UtilAll.computNextMinutesTimeMillis()} and {@code System.currentTimeMillis()}.
 */
public void init() {
    final Runnable periodicPrint = new Runnable() {
        @Override
        public void run() {
            try {
                printAtMinutes();
            } catch (Throwable ignored) {
                // Swallowed on purpose: a failed print must not propagate out of the task.
                // NOTE(review): presumably so later runs still happen; nothing is logged.
            }
        }
    };

    this.scheduledExecutorService.scheduleAtFixedRate(periodicPrint,
        Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()),
        1000 * 60 * 5, TimeUnit.MILLISECONDS);
}
示例28
/**
 * Schedules {@code printAtMinutes()} to run repeatedly at a fixed rate.
 *
 * <p>The first execution waits {@code |UtilAll.computNextMinutesTimeMillis() - now|}
 * milliseconds; later executions follow every five minutes ({@code 1000 * 60 * 5} ms).
 */
public void init() {
    final Runnable statsPrinter = new Runnable() {
        @Override
        public void run() {
            try {
                printAtMinutes();
            } catch (Throwable ignored) {
                // Ignored deliberately so that no throwable escapes this task.
                // NOTE(review): presumably guards the repeating schedule; consider logging.
            }
        }
    };

    this.scheduledExecutorService.scheduleAtFixedRate(statsPrinter,
        Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()),
        1000 * 60 * 5, TimeUnit.MILLISECONDS);
}
示例29
/**
 * Fetches the connection information of a producer group from a broker.
 *
 * <p>When {@code projectGroupPrefix} is not blank, the group name is first rewritten with
 * {@code VirtualEnvUtil.buildWithProjectGroup}. The request is sent synchronously.
 *
 * @param addr          broker address to query
 * @param producerGroup name of the producer group
 * @param timeoutMillis timeout for the synchronous invocation
 * @return the decoded {@code ProducerConnection} when the broker answers with {@code SUCCESS}
 * @throws RemotingConnectException     declared; presumably raised by the synchronous invoke
 * @throws RemotingSendRequestException declared; presumably raised by the synchronous invoke
 * @throws RemotingTimeoutException     declared; presumably raised by the synchronous invoke
 * @throws InterruptedException         declared; presumably raised by the synchronous invoke
 * @throws MQBrokerException            if the response code is anything other than
 *                                      {@code SUCCESS}; carries the response code and remark
 */
public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup,
    final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException,
    RemotingTimeoutException, InterruptedException, MQBrokerException {
    final String effectiveGroup;
    if (UtilAll.isBlank(projectGroupPrefix)) {
        effectiveGroup = producerGroup;
    } else {
        effectiveGroup = VirtualEnvUtil.buildWithProjectGroup(producerGroup, projectGroupPrefix);
    }

    final GetProducerConnectionListRequestHeader requestHeader = new GetProducerConnectionListRequestHeader();
    requestHeader.setProducerGroup(effectiveGroup);

    final RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.GET_PRODUCER_CONNECTION_LIST, requestHeader);
    final RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);

    if (response.getCode() == ResponseCode.SUCCESS) {
        return ProducerConnection.decode(response.getBody(), ProducerConnection.class);
    }
    throw new MQBrokerException(response.getCode(), response.getRemark());
}
示例30
/**
 * Appends the project-group marker to {@code origin} unless it is already there.
 *
 * <p>The marker is {@code String.format(VIRTUAL_APPGROUP_PREFIX, projectGroup)}. Despite the
 * constant's name, it is matched and appended at the end of {@code origin}.
 *
 * @param origin       the name to decorate
 * @param projectGroup the project group; when blank, {@code origin} is returned unchanged
 * @return {@code origin} itself when the project group is blank or the marker is already
 *         present at the end; otherwise {@code origin} followed by the marker
 */
public static String buildWithProjectGroup(String origin, String projectGroup) {
    if (UtilAll.isBlank(projectGroup)) {
        return origin;
    }

    final String marker = String.format(VIRTUAL_APPGROUP_PREFIX, projectGroup);
    return origin.endsWith(marker) ? origin : origin + marker;
}