提问者:小点点

从 kafka 高级使用者获取偏移量


我想在java程序中从高级消费者中获取kafka消息偏移量。由于我使用的是自定义提交偏移量属性,因此我想测试我的自定义提交偏移量是否正常工作。谁能帮我如何抵消???我遇到了几个Kafka工具(如getoffsetshell),但它对我的测试没有帮助。


共1个答案

匿名用户

消费者迭代器获取消息时,您还可以通过执行以下操作来获取偏移量:

    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(getConsumerConfig());
    KafkaStream<byte[], byte[]> stream = getKafkaStream(consumerConnector);
    ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
    while(iterator.hasNext()) {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
        String message = new String(messageAndMetadata.message());
        long offset = messageAndMetadata.offset();
    }