提问者:小点点

使用 Kafka 高级使用者 0.8.x 防止消息丢失


典型的 kafka 使用者如下所示:

Kafka经纪人---

根据 Kafka 高级消费者的文档:

“auto.commit.interval.ms”设置是将消耗的偏移量的更新写入 ZooKeeper 的频率

如果发生以下两件事,似乎可能会丢失消息:

  1. 偏移量是在从 kafka 代理检索某些消息后立即提交的。
  2. 下游消费者(比如 Elastic-Search)无法处理最近一批消息,或者消费者进程本身被杀死。

如果偏移量不是根据时间间隔自动提交,而是由 API 提交,则可能是最理想的。这将确保 kafka 消费者只有在收到下游消费者确认他们已成功使用消息后才能发出偏移量提交的信号。可能会有一些消息重放(如果 kafka-consumer 在提交偏移量之前死亡),但至少不会丢失消息。

请让我知道高级消费者中是否存在这样的 API。

注意:我知道 Kafka 0.8.x 版本中的低级消费者 API,但我不想自己管理所有内容,因为我只需要高级消费者中的一个简单的 API。

裁判:

  1. AutoCommitTask.run(), 查找 commitOffsetsAsync
  2. SubscriptionState.allConsumed()

共1个答案

匿名用户

高级消费者 API 中有一个 commitOffsets() API 可用于解决此问题。

同时将选项“auto.commit.enable”设置为“false”,以便Kafka消费者在任何时候都不会自动提交偏移量。