我想在java程序中从高级消费者中获取kafka消息偏移量。由于我使用的是自定义提交偏移量属性,因此我想测试我的自定义提交偏移量是否正常工作。谁能帮我如何抵消???我遇到了几个Kafka工具(如getoffsetshell),但它对我的测试没有帮助。
从消费者迭代器
获取消息时,您还可以通过执行以下操作来获取偏移量:
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(getConsumerConfig());
KafkaStream<byte[], byte[]> stream = getKafkaStream(consumerConnector);
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
while(iterator.hasNext()) {
MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
String message = new String(messageAndMetadata.message());
long offset = messageAndMetadata.offset();
}