Java源码示例:io.openmessaging.Message

示例1
/**
 * Producer loop: keeps sending messages until {@code Constants.PRO_MAX}
 * messages have been sent or a send attempt throws.
 *
 * <p>When {@code sendNum % 10 == 0} the message goes to a randomly chosen
 * {@code QUEUE_n}; otherwise it goes to a randomly chosen {@code TOPIC_n}
 * ({@code n} in {@code [0, 10)}). The body is the bytes of
 * {@code label + "_" + offset}, where the offset is tracked per destination in
 * {@code offsets} and incremented once the message has been created.
 */
@Override
public void run() {
    while (true) {
        try {
            // Queue : topic ratio is 1 : 9, with ten destinations of each kind.
            final boolean toQueue = sendNum % 10 == 0;
            final String queueOrTopic = (toQueue ? "QUEUE_" : "TOPIC_") + random.nextInt(10);
            final byte[] body = (label + "_" + offsets.get(queueOrTopic)).getBytes();

            // Use the factory method that matches the destination kind, so that
            // topic messages are not created as queue messages.
            final Message message;
            if (toQueue) {
                message = producer.createBytesMessageToQueue(queueOrTopic, body);
            } else {
                message = producer.createBytesMessageToTopic(queueOrTopic, body);
            }

            logger.debug("queueOrTopic:{} offset:{}", queueOrTopic, label + "_" + offsets.get(queueOrTopic));
            // One more message produced for this destination: advance its offset.
            offsets.put(queueOrTopic, offsets.get(queueOrTopic) + 1);
            producer.send(message);
            sendNum++;
            if (sendNum >= Constants.PRO_MAX) {
                break;
            }
        } catch (Exception e) {
            // Any failure ends this producer's loop.
            logger.error("Error occurred in the sending process", e);
            break;
        }
    }
}
 
示例2
/**
 * Attaches an OMS listener to {@code HELLO_QUEUE}, then hands a RocketMQ
 * message directly to the listener obtained from {@code rocketmqPushConsumer}.
 * The OMS listener asserts the message id and body and acknowledges the message.
 */
@Test
public void testConsumeMessage() {
    final byte[] testBody = new byte[] {'a', 'b'};

    final MessageExt incoming = new MessageExt();
    incoming.setMsgId("NewMsgId");
    incoming.setBody(testBody);
    incoming.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
    incoming.setTopic("HELLO_QUEUE");

    final MessageListener verifyingListener = new MessageListener() {
        @Override
        public void onReceived(Message message, Context context) {
            assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId");
            assertThat(((BytesMessage) message).getBody(byte[].class)).isEqualTo(testBody);
            context.ack();
        }
    };
    consumer.attachQueue("HELLO_QUEUE", verifyingListener);

    // Drive the RocketMQ-side listener by hand with the prepared message.
    final MessageListenerConcurrently rmqListener =
        (MessageListenerConcurrently) rocketmqPushConsumer.getMessageListener();
    rmqListener.consumeMessage(Collections.singletonList(incoming), null);
}
 
示例3
/**
 * Converts {@code message} to a RocketMQ message and sends it synchronously.
 *
 * <p>On a {@code SEND_OK} status the broker-assigned message id is written
 * into the message headers and the converted result is returned. Any other
 * status is logged and raised as an {@link OMSRuntimeException}. Every
 * exception caught by the {@code catch} block is logged and rethrown as
 * whatever {@code checkProducerException} produces.
 *
 * @param message the message to send; validated by {@code checkMessageType}
 * @param timeout timeout passed to the RocketMQ producer
 * @return the converted send result
 */
private SendResult send(final Message message, long timeout) {
    checkMessageType(message);
    final org.apache.rocketmq.common.message.Message converted = msgConvert((BytesMessage) message);
    try {
        final org.apache.rocketmq.client.producer.SendResult brokerResult =
            this.rocketmqProducer.send(converted, timeout);
        final boolean accepted = brokerResult.getSendStatus().equals(SendStatus.SEND_OK);
        if (accepted) {
            message.headers().put(MessageHeader.MESSAGE_ID, brokerResult.getMsgId());
            return OMSUtil.sendResultConvert(brokerResult);
        }
        log.error(String.format("Send message to RocketMQ failed, %s", message));
        throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.");
    } catch (Exception e) {
        log.error(String.format("Send message to RocketMQ failed, %s", message), e);
        final String messageId = message.headers().getString(MessageHeader.MESSAGE_ID);
        throw checkProducerException(converted.getTopic(), messageId, e);
    }
}
 
示例4
/**
 * Pull-consumer demo: starts a messaging access point and a pull consumer on
 * {@code OMS_HELLO_TOPIC}, registers a shutdown hook that stops both, and then
 * polls forever, printing and acknowledging the id of every message received.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final MessagingAccessPoint accessPoint = MessagingAccessPointFactory
        .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

    final PullConsumer pullConsumer = accessPoint.createPullConsumer("OMS_HELLO_TOPIC",
        OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

    accessPoint.startup();
    System.out.printf("MessagingAccessPoint startup OK%n");

    // Stop the consumer first, then the access point, when the JVM exits.
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
            pullConsumer.shutdown();
            accessPoint.shutdown();
        }
    }));

    pullConsumer.startup();
    System.out.printf("Consumer startup OK%n");

    for (;;) {
        final Message received = pullConsumer.poll();
        if (received == null) {
            continue;
        }
        final String msgId = received.headers().getString(MessageHeader.MESSAGE_ID);
        System.out.printf("Received one message: %s%n", msgId);
        pullConsumer.ack(msgId);
    }
}
 
示例5
/**
 * Sends {@code message} in oneway mode: the message is validated, converted
 * to a RocketMQ message and handed to the producer. Any exception raised by
 * the producer is discarded.
 *
 * @param message the message to send
 */
@Override
public void sendOneway(final Message message) {
    checkMessageType(message);
    final org.apache.rocketmq.common.message.Message converted = msgConvert((BytesMessage) message);
    try {
        this.rocketmqProducer.sendOneway(converted);
    } catch (Exception ignored) {
        // Ignore the oneway exception.
    }
}
 
示例6
/**
 * Oneway send. Validates and converts {@code message}, then passes it to the
 * RocketMQ producer; an exception thrown by the producer is swallowed on purpose.
 *
 * @param message the message to send
 */
@Override
public void sendOneway(final Message message) {
    checkMessageType(message);
    final BytesMessage bytesMessage = (BytesMessage) message;
    final org.apache.rocketmq.common.message.Message outgoing = msgConvert(bytesMessage);
    try {
        this.rocketmqProducer.sendOneway(outgoing);
    } catch (Exception ignored) {
        // Ignore the oneway exception.
    }
}
 
示例7
/**
 * Stores a {@code long} property on this message, creating the property
 * container on first use.
 *
 * @param key   property name
 * @param value property value
 * @return this message
 */
@Override
public Message putProperties(String key, long value) {
    if (properties == null) {
        // Created lazily on the first property.
        properties = new DefaultKeyValue();
    }
    properties.put(key, value);
    return this;
}
 
示例8
/**
 * Stubs {@code localMessageCache.poll()} to return a prepared RocketMQ message
 * and checks that {@code consumer.poll()} exposes its id and body.
 */
@Test
public void testPoll() {
    final byte[] expectedBody = new byte[] {'a', 'b'};

    final MessageExt cachedMsg = new MessageExt();
    cachedMsg.setMsgId("NewMsgId");
    cachedMsg.setBody(expectedBody);
    cachedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
    cachedMsg.setTopic(queueName);
    when(localMessageCache.poll()).thenReturn(cachedMsg);

    final Message polled = consumer.poll();

    final String polledId = polled.headers().getString(MessageHeader.MESSAGE_ID);
    assertThat(polledId).isEqualTo("NewMsgId");
    assertThat(((BytesMessage) polled).getBody()).isEqualTo(expectedBody);
}
 
示例9
/**
 * Sends {@code message} without waiting for or reporting a result: after the
 * type check and conversion the message is passed to the RocketMQ producer,
 * and any exception it throws is dropped.
 *
 * @param message the message to send
 */
@Override
public void sendOneway(final Message message) {
    checkMessageType(message);
    final org.apache.rocketmq.common.message.Message rocketMessage =
        msgConvert((BytesMessage) message);
    try {
        this.rocketmqProducer.sendOneway(rocketMessage);
    } catch (Exception ignored) {
        // Ignore the oneway exception.
    }
}
 
示例10
/**
 * Flushes buffered message lists in two steps: first retries every list
 * parked in {@code readyForRead}, then flushes the current list once
 * {@code off} equals {@code writeThreshold}.
 *
 * <p>A flush only happens when the caller wins the
 * {@code compareAndSet(false, true)} on the destination's {@code isReading}
 * flag; the flag is set back to {@code false} after {@code sendList} returns.
 *
 * <p>NOTE(review): {@code off}, {@code list} and {@code queueOrTopic} are not
 * declared anywhere in the visible code, and {@code message} is never read
 * here. Presumably lines were dropped from this excerpt, or these are fields.
 * Confirm against the full source.
 * <p>NOTE(review): the flag is not reset in a {@code finally}, so it looks like
 * an exception from {@code sendList} would leave it stuck at {@code true}. Verify.
 */
@Override public void send(Message message) {
 	// Handle lists left over from earlier sends that could not be flushed then.
 	if( readyForRead.size() > 0){
 		Iterator<Entry<String, List<DefaultBytesMessageList>>> it = readyForRead.entrySet().iterator();
 		Entry<String, List<DefaultBytesMessageList>> e = null;
 		String key = null;
 		while(it.hasNext()){
 			e = it.next();
 			key =  e.getKey();
 			// Only the caller that flips the flag to true gets the mapBuffer and starts writing; otherwise wait for the next send.
 			if(isReading.get(key).compareAndSet(false, true)){
 				mapBuffer = allMapBuffer.get(key);
 				sendList(e.getValue(), storeOff.get(key));
 				isReading.get(key).set(false);
 				it.remove();
 				storeOff.put(key, 0);
 				
 			}
 		}
 	}
 	/**
 	 * When the stored offset reaches writeThreshold, write this list. off is the next array index to be written.
 	 * If the offset is greater than writeThreshold, the list must already be in readyForRead.
 	 * If the offset is less than writeThreshold, nothing needs to be done.
 	 */
 	if(off == writeThreshold){
 		// Only the caller that flips the flag to true gets the mapBuffer and starts writing; otherwise wait for the next send.
if(isReading.get(queueOrTopic).compareAndSet(false, true)){
	mapBuffer = allMapBuffer.get(queueOrTopic);
	sendList(list , off);
	isReading.get(queueOrTopic).set(false);
	storeOff.put(queueOrTopic, 0);
	
}else{
	// A flush is needed but the buffer could not be acquired; offset+1, try to park the list in readyForRead.
	readyForRead.put(queueOrTopic, list);
}
 	}
 }
 
示例11
/**
 * Stores an {@code int} property on this message. The backing
 * {@code DefaultKeyValue} is allocated the first time a property is added.
 *
 * @param key   property name
 * @param value property value
 * @return this message
 */
@Override
public Message putProperties(String key, int value) {
    final boolean firstProperty = properties == null;
    if (firstProperty) {
        properties = new DefaultKeyValue();
    }
    properties.put(key, value);
    return this;
}
 
示例12
/**
 * Checks that {@code consumer.receive()} returns {@code null} both without
 * arguments and with an explicit {@code TIMEOUT} of 100.
 */
@Test
public void testPoll_WithTimeout() {
    // Without arguments a default timeout applies, @see ClientConfig#omsOperationTimeout.
    final Message withDefaultTimeout = consumer.receive();
    assertThat(withDefaultTimeout).isNull();

    final Message withExplicitTimeout =
        consumer.receive(OMS.newKeyValue().put(Message.BuiltinKeys.TIMEOUT, 100));
    assertThat(withExplicitTimeout).isNull();
}
 
示例13
/**
 * Polls with the timeout given under {@code Message.BuiltinKeys.TIMEOUT} in
 * {@code properties}, falling back to {@code clientConfig.getOperationTimeout()}
 * when that key is absent.
 *
 * @param properties poll options; must not be {@code null}
 * @return the result of {@code poll(timeout)}
 */
MessageExt poll(final KeyValue properties) {
    final int defaultTimeout = clientConfig.getOperationTimeout();
    final int effectiveTimeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
        ? properties.getInt(Message.BuiltinKeys.TIMEOUT)
        : defaultTimeout;
    return poll(effectiveTimeout);
}
 
示例14
/**
 * Entry point of the pull-consumer example. Creates the access point and a
 * pull consumer for {@code OMS_HELLO_TOPIC}, starts both, and loops forever
 * polling messages; each received message has its id printed and acknowledged.
 * A JVM shutdown hook shuts down the consumer and then the access point.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final MessagingAccessPoint endpoint = MessagingAccessPointFactory
        .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

    final PullConsumer helloConsumer = endpoint.createPullConsumer("OMS_HELLO_TOPIC",
        OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

    endpoint.startup();
    System.out.printf("MessagingAccessPoint startup OK%n");

    final Runnable releaseResources = new Runnable() {
        @Override
        public void run() {
            helloConsumer.shutdown();
            endpoint.shutdown();
        }
    };
    Runtime.getRuntime().addShutdownHook(new Thread(releaseResources));

    helloConsumer.startup();
    System.out.printf("Consumer startup OK%n");

    while (true) {
        final Message next = helloConsumer.poll();
        if (next == null) {
            // Nothing available this round; poll again.
            continue;
        }
        final String msgId = next.headers().getString(MessageHeader.MESSAGE_ID);
        System.out.printf("Received one message: %s%n", msgId);
        helloConsumer.ack(msgId);
    }
}
 
示例15
/**
 * Appends a {@code double} property to the property buffer as
 * {@code key}, {@code sem}, {@code value}, {@code sem}, and bumps the
 * property counter. The buffer is created via {@code new StringBuilder(sem)}
 * on first use.
 *
 * @param key   property name
 * @param value property value
 * @return this message
 */
@Override
public Message putProperties(String key, double value) {
    if (prosBiulder == null) {
        prosBiulder = new StringBuilder(sem);
    }
    prosBiulder.append(key).append(sem).append(value).append(sem);
    proSize++;
    return this;
}
 
示例16
/**
 * Stubs {@code localMessageCache.poll()} with a prepared RocketMQ message and
 * checks that {@code consumer.receive()} returns a message carrying the same
 * id and body.
 */
@Test
public void testPoll() {
    final byte[] expectedBody = new byte[] {'a', 'b'};

    final MessageExt cachedMsg = new MessageExt();
    cachedMsg.setMsgId("NewMsgId");
    cachedMsg.setBody(expectedBody);
    cachedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
    cachedMsg.setTopic(queueName);
    when(localMessageCache.poll()).thenReturn(cachedMsg);

    final Message received = consumer.receive();

    final String receivedId = received.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
    assertThat(receivedId).isEqualTo("NewMsgId");
    final byte[] receivedBody = ((BytesMessage) received).getBody(byte[].class);
    assertThat(receivedBody).isEqualTo(expectedBody);
}
 
示例17
/**
 * Flushes buffered message lists: first every list still parked in
 * {@code readyForRead}, then the current list when {@code off} equals
 * {@code writeThreshold}.
 *
 * <p>Each flush is guarded by a {@code compareAndSet(false, true)} on the
 * destination's {@code isReading} flag, which is set back to {@code false}
 * once {@code sendList} has returned. If the guard for the current list cannot
 * be taken, the list is put into {@code readyForRead} instead.
 *
 * <p>NOTE(review): {@code off}, {@code list} and {@code queueOrTopic} have no
 * visible declaration, and the {@code message} parameter is unused in what is
 * shown. Presumably part of the method is missing from this excerpt, or these
 * are fields. Confirm against the full source.
 * <p>NOTE(review): with no {@code finally} around {@code sendList}, an exception
 * there would apparently leave the {@code isReading} flag set. Verify.
 */
@Override public void send(Message message) {
 	// Handle lists left over from earlier sends that could not be flushed then.
 	if( readyForRead.size() > 0){
 		Iterator<Entry<String, List<DefaultBytesMessage>>> it = readyForRead.entrySet().iterator();
 		Entry<String, List<DefaultBytesMessage>> e = null;
 		String key = null;
 		while(it.hasNext()){
 			e = it.next();
 			key =  e.getKey();
 			// Only the caller that flips the flag to true gets the mapBuffer and starts writing; otherwise wait for the next send.
 			if(isReading.get(key).compareAndSet(false, true)){
 				mapBuffer = allMapBuffer.get(key);
 				sendList(e.getValue(), storeOff.get(key));
 				isReading.get(key).set(false);
 				it.remove();
 				storeOff.put(key, 0);
 				
 			}
 		}
 	}
 	/**
 	 * When the stored offset reaches writeThreshold, write this list. off is the next array index to be written.
 	 * If the offset is greater than writeThreshold, the list must already be in readyForRead.
 	 * If the offset is less than writeThreshold, nothing needs to be done.
 	 */
 	if(off == writeThreshold){
 		// Only the caller that flips the flag to true gets the mapBuffer and starts writing; otherwise wait for the next send.
if(isReading.get(queueOrTopic).compareAndSet(false, true)){
	mapBuffer = allMapBuffer.get(queueOrTopic);
	sendList(list , off);
	isReading.get(queueOrTopic).set(false);
	storeOff.put(queueOrTopic, 0);
	
}else{
	// A flush is needed but the buffer could not be acquired; offset+1, try to park the list in readyForRead.
	readyForRead.put(queueOrTopic, list);
}
 	}
 }
 
示例18
/**
 * Push-consumer demo: creates an access point and a push consumer, registers
 * a shutdown hook that stops both, attaches a listener to
 * {@code OMS_HELLO_TOPIC} that prints and acknowledges each message, and
 * finally starts the consumer.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final MessagingAccessPoint accessPoint = OMS
        .getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");

    final PushConsumer pushConsumer = accessPoint
        .createPushConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));

    accessPoint.startup();
    System.out.printf("MessagingAccessPoint startup OK%n");

    // Stop the consumer first, then the access point, when the JVM exits.
    final Thread shutdownHook = new Thread(new Runnable() {
        @Override
        public void run() {
            pushConsumer.shutdown();
            accessPoint.shutdown();
        }
    });
    Runtime.getRuntime().addShutdownHook(shutdownHook);

    final MessageListener printingListener = new MessageListener() {
        @Override
        public void onReceived(Message message, Context context) {
            final String messageId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
            System.out.printf("Received one message: %s%n", messageId);
            context.ack();
        }
    };
    pushConsumer.attachQueue("OMS_HELLO_TOPIC", printingListener);

    pushConsumer.startup();
    System.out.printf("Consumer startup OK%n");
}
 
示例19
/**
 * Returns the next buffered message, or {@code null} when no bucket has a
 * list left to read.
 *
 * <p>The first call (while {@code isReaded} is false) loads the data via
 * {@code readFile()}, sets {@code files} to {@code null} and prints the
 * elapsed time. Later calls walk {@code bucketList} in order, returning the
 * messages of each bucket's list one by one.
 */
public Message poll() {
    // Each thread has to read the files first (or block until reading is done).
    if (!isReaded) {
        final long startedAt = System.currentTimeMillis();

        readFile();
        isReaded = true;
        // Reset files to null so the next test case sees a fresh run and
        // re-initialises its state.
        files = null;

        final long elapsedMillis = System.currentTimeMillis() - startedAt;
        System.out.println(Thread.currentThread().getName()+" 读文件耗时(ms):"+elapsedMillis);
    }

    // A queue/topic may have no list at all; advance to the next bucket.
    while (currentList == null && bucketOffset < bucketList.size() - 1) {
        currentList = data.get(bucketList.get(++bucketOffset));
    }
    if (currentList == null) {
        return null;
    }

    final BytesMessage next = currentList.get(messageOffset);
    final boolean lastInBucket = messageOffset >= currentList.size() - 1;
    if (lastInBucket) {
        // Bucket exhausted: the loop above moves on to the next one on the following call.
        currentList = null;
        messageOffset = 0;
    } else {
        messageOffset++;
    }
    return next;
}
 
示例20
/**
 * Returns the next message from {@code naiveDataReader}, or {@code null} when
 * the reader reports that nothing is left.
 */
@Override
public Message poll() {
    return naiveDataReader.hasNext() ? naiveDataReader.next() : null;
}
 
示例21
/**
 * Stores a {@code double} property on this message; the property container
 * is instantiated on demand.
 *
 * @param key   property name
 * @param value property value
 * @return this message
 */
@Override
public Message putProperties(String key, double value) {
    if (null == properties) {
        properties = new DefaultKeyValue();
    }
    properties.put(key, value);
    return this;
}
 
示例22
/**
 * Appends an {@code int} header to the header buffer: the value looked up in
 * {@code seq} for {@code key}, then {@code value}, then {@code sem}. The
 * header counter is incremented afterwards.
 *
 * @param key   header name, looked up in {@code seq}
 * @param value header value
 * @return this message
 */
@Override
public Message putHeaders(String key, int value) {
    headersBiulder.append(seq.get(key)).append(value).append(sem);
    headerSize++;
    return this;
}
 
示例23
/**
 * Resolves the poll timeout and delegates to {@code poll(int)}. The timeout is
 * {@code clientConfig.getOperationTimeout()} unless {@code properties}
 * contains {@code Message.BuiltinKeys.TIMEOUT}, in which case that value wins.
 *
 * @param properties poll options; must not be {@code null}
 * @return the result of {@code poll(timeout)}
 */
MessageExt poll(final KeyValue properties) {
    int timeout = clientConfig.getOperationTimeout();
    final boolean overridden = properties.containsKey(Message.BuiltinKeys.TIMEOUT);
    if (overridden) {
        timeout = properties.getInt(Message.BuiltinKeys.TIMEOUT);
    }
    return poll(timeout);
}
 
示例24
/**
 * Producer loop. Sends one message per iteration until
 * {@code Constants.PRO_MAX} messages have been sent (then shuts the producer
 * down) or until an exception is thrown (then logs it and stops).
 *
 * <p>The message body is the bytes of "producer label_offset". For each
 * combination of label and queue/topic the offset grows by one per message:
 * 0, 1, 2, 3, ...
 */
@Override
        public void run() {
            for (;;) {
                try {
                    final boolean toQueue = sendNum % 10 == 0;
                    final String queueOrTopic = (toQueue ? "QUEUE_" : "TOPIC_") + random.nextInt(10);
                    final byte[] body = (label + "_" + offsets.get(queueOrTopic)).getBytes();
                    final Message message = toQueue
                        ? producer.createBytesMessageToQueue(queueOrTopic, body)
                        : producer.createBytesMessageToTopic(queueOrTopic, body);

                    logger.debug("queueOrTopic:{} offset:{}", queueOrTopic, label + "_" + offsets.get(queueOrTopic));
                    // Every produced message advances the offset of its bucket by one.
                    offsets.put(queueOrTopic, offsets.get(queueOrTopic) + 1);
                    producer.send(message);
                    sendNum++;
                    if (sendNum < Constants.PRO_MAX) {
                        continue;
                    }
                    // Shut down when testing locally, otherwise the files held by the
                    // producer cannot be deleted. The real run kills the process instead.
                    producer.shutdown();
                    break;
                } catch (Exception e) {
                    logger.error("Error occurred in the sending process", e);
                    break;
                }
            }
        }
 
示例25
/**
 * Stores a {@code double} property by converting it to its string form and
 * delegating to the {@code String} overload of {@code putProperties}.
 *
 * @param key   property name
 * @param value property value
 * @return this message
 */
@Override
public Message putProperties(String key, double value) {
    final String asText = Double.toString(value);
    putProperties(key, asText);
    return this;
}
 
示例26
/**
 * With {@code localMessageCache.poll()} stubbed to return a prepared RocketMQ
 * message, {@code consumer.receive()} must return a message with the same
 * message id and byte body.
 */
@Test
public void testPoll() {
    final byte[] payload = new byte[] {'a', 'b'};

    final MessageExt stubbed = new MessageExt();
    stubbed.setMsgId("NewMsgId");
    stubbed.setBody(payload);
    stubbed.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
    stubbed.setTopic(queueName);
    when(localMessageCache.poll()).thenReturn(stubbed);

    final BytesMessage result = (BytesMessage) consumer.receive();

    assertThat(result.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId");
    assertThat(result.getBody(byte[].class)).isEqualTo(payload);
}
 
示例27
/**
 * Starts the producer, attaches a message handler to {@code queueName} and
 * starts the consumer.
 *
 * <p>The handler acknowledges and skips messages whose born timestamp is more
 * than 10000 ms in the past. Otherwise it decodes the body into key/value
 * pairs, reports each pair through {@code dataSynchronizerCallback}, and
 * acknowledges the message. If processing throws, the error is logged and the
 * message is not acknowledged.
 */
@Override
public void start() {
    producer.startup();

    consumer.attachQueue(queueName, (message, context) -> {
        try {
            // Need openMessaging to support start consume message from tail.
            final long bornAt =
                Long.parseLong(message.sysHeaders().getString(Message.BuiltinKeys.BORN_TIMESTAMP));
            final boolean stale = bornAt + 10000 < System.currentTimeMillis();
            if (stale) {
                context.ack();
                return;
            }

            log.info("Received one message: {}", message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID) + "\n");
            final byte[] payload = message.getBody(byte[].class);
            final Map<K, V> decoded = decodeKeyValue(payload);
            for (Map.Entry<K, V> pair : decoded.entrySet()) {
                dataSynchronizerCallback.onCompletion(null, pair.getKey(), pair.getValue());
            }
            context.ack();
        } catch (Exception e) {
            log.error("BrokerBasedLog process message failed.", e);
        }
    });

    consumer.startup();
}
 
示例28
/**
 * Stores a {@code long} header on this message.
 *
 * @param key   header name
 * @param value header value
 * @return this message
 */
@Override
public Message putHeaders(String key, long value) {
    headers.put(key, value);
    return this;
}
 
示例29
/**
 * Asynchronous sending is not implemented.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public Promise<Void> sendAsync(Message message, KeyValue properties) {
    throw new UnsupportedOperationException("Unsupported");
}
 
示例30
/**
 * Sends {@code message} by delegating to {@code send(Message)}.
 *
 * @param message    the message to send
 * @param properties not used by this implementation
 */
@Override
public void send(final Message message, final KeyValue properties) {
    // The extra properties are not consulted; this is a plain send.
    send(message);
}