想要使用高级使用者 API 实现延迟使用者
主要思想:
>
提交 1 个偏移量
while (it.hasNext()) {
val msg = it.next().message()
//checks timestamp in msg to see delay period exceeded
while (!delayedPeriodPassed(msg)) {
waitSomeTime() //Thread.sleep or something....
}
//certain that the msg was delayed and can now be handled
Try { process(msg) } //the msg process will never fail the consumer
consumer.commitOffsets //commit each msg
}
关于此实现的一些问题:
谢谢!
一种方法是使用不同的主题,在其中推送所有要延迟的消息。如果所有延迟的消息都应该在相同的时间延迟后处理,这将相当简单:
while(it.hasNext()) {
val message = it.next().message()
if(shouldBeDelayed(message)) {
val delay = 24 hours
val delayTo = getCurrentTime() + delay
putMessageOnDelayedQueue(message, delay, delayTo)
}
else {
process(message)
}
consumer.commitOffset()
}
现在,所有常规消息都将尽快处理,而需要延迟的消息将被放在另一个主题上。
好消息是我们知道延迟主题开头的消息是应该首先处理的消息,因为它的 delayTo 值将是最小的。因此,我们可以设置另一个使用者来读取 head 消息,检查时间戳是否在过去,如果是,则处理消息并提交偏移量。如果不是,它不会提交偏移量,而只是Hibernate直到那个时间:
while(it.hasNext()) {
val delayedMessage = it.peek().message()
if(delayedMessage.delayTo < getCurrentTime()) {
val readMessage = it.next().message
process(readMessage.originalMessage)
consumer.commitOffset()
} else {
delayProcessingUntil(delayedMessage.delayTo)
}
}
如果延迟时间不同,您可以将主题分区为延迟(例如 24 小时、12 小时、6 小时)。如果延迟时间比这更动态,它会变得更复杂一些。您可以通过引入两个延迟主题来解决它。读取延迟主题 A
之外的所有消息,并处理 delayTo
值为过去的所有消息。在其他人中,您只需找到延迟
最近的一个,然后将它们放在主题 B
上。睡眠,直到应该处理最近的一个并反向执行所有操作,即处理来自m 主题 B
,并将不应该处理的一次放回主题 A
。
回答您的具体问题(有些已在您问题的评论中解决)
您可以考虑切换到在 Kafka 中存储偏移量(从 0.8.2 开始提供的功能,请查看消费者配置中的 offsets.storage
属性)
我相信它可以,例如,如果它无法与偏移存储通信。正如您所说,使用幂等消息可以解决此问题。
对于上述解决方案,这不会成为问题,除非消息本身的处理时间超过会话超时。
同样,使用上述内容,您不需要设置长时间的会话超时。
总有;)
使用Tibco EMS或其他JMS队列。它们具有内置的重试延迟。Kafka可能不是你正在做的事情的正确设计选择
在您的情况下,我会建议另一条路线。
解决使用者主线程中的等待时间是没有意义的。这将是队列使用方式的反模式。从概念上讲,您需要尽可能快地处理消息,并将队列保持在较低的加载因子。
相反,我会使用一个调度程序,它将为您需要延迟的每条消息调度作业。这样,您可以处理队列并创建将在预定义时间点触发的异步作业。
使用这种技术的缺点是,它对内存中保存计划作业的 JVM 的状态是明智的。如果该 JVM 失败,您将丢失计划的作业,并且您不知道任务是否已执行。
有一些调度程序实现,但可以配置为在集群环境中运行,从而保护您免受 JVM 崩溃的影响。
看看这个 java 调度框架: http://www.quartz-scheduler.org/