提问者:小点点

Kafka - 使用高级使用者的延迟队列实现


想要使用高级使用者 API 实现延迟使用者

主要思想:

>

  • 按键生成消息(每个消息都包含创建时间戳) 这可确保每个分区按生成时间对消息进行排序。
  • auto.commit.enable=false(在每个消息处理后显式提交)
  • 使用消息
  • 检查
  • 消息时间戳并检查是否经过了足够的时间
  • 进程消息(此操作永远不会失败)
  • 提交 1 个偏移量

    while (it.hasNext()) {
      val msg = it.next().message()
      //checks timestamp in msg to see delay period exceeded
      while (!delayedPeriodPassed(msg)) { 
         waitSomeTime() //Thread.sleep or something....
      }
      //certain that the msg was delayed and can now be handled
      Try { process(msg) } //the msg process will never fail the consumer
      consumer.commitOffsets //commit each msg
    }
    

    关于此实现的一些问题:

    1. 提交每个偏移量可能会减慢 ZK 的速度
    2. consumer.commitOffsets可以抛出异常吗? 如果是,我将使用相同的消息两次(可以用幂等消息解决)
    3. 问题等待很长时间而不提交偏移量,例如延迟时间为 24 小时,将从迭代器获取下一个,Hibernate 24 小时,处理并提交(ZK 会话超时?
    4. ZK 会话如何在不提交新偏移的情况下保持活动状态?(设置一个蜂巢 zookeeper.session.timeout.ms 可以在死亡消费者中解析而不识别它)
    5. 我缺少任何其他问题吗?

    谢谢!


  • 共3个答案

    匿名用户

    一种方法是使用不同的主题,在其中推送所有要延迟的消息。如果所有延迟的消息都应该在相同的时间延迟后处理,这将相当简单:

    while(it.hasNext()) {
        val message = it.next().message()
        
        if(shouldBeDelayed(message)) {
            val delay = 24 hours
            val delayTo = getCurrentTime() + delay
            putMessageOnDelayedQueue(message, delay, delayTo)
        }
        else {
           process(message)
        }
    
        consumer.commitOffset()
    }
    

    现在,所有常规消息都将尽快处理,而需要延迟的消息将被放在另一个主题上。

    好消息是我们知道延迟主题开头的消息是应该首先处理的消息,因为它的 delayTo 值将是最小的。因此,我们可以设置另一个使用者来读取 head 消息,检查时间戳是否在过去,如果是,则处理消息并提交偏移量。如果不是,它不会提交偏移量,而只是Hibernate直到那个时间:

    while(it.hasNext()) {
        val delayedMessage = it.peek().message()
        if(delayedMessage.delayTo < getCurrentTime()) {
            val readMessage = it.next().message
            process(readMessage.originalMessage)
            consumer.commitOffset()
        } else {
            delayProcessingUntil(delayedMessage.delayTo)
        }
    }
    

    如果延迟时间不同,您可以将主题分区为延迟(例如 24 小时、12 小时、6 小时)。如果延迟时间比这更动态,它会变得更复杂一些。您可以通过引入两个延迟主题来解决它。读取延迟主题 A 之外的所有消息,并处理 delayTo 值为过去的所有消息。在其他人中,您只需找到延迟最近的一个,然后将它们放在主题 B 上。睡眠,直到应该处理最近的一个并反向执行所有操作,即处理来自m 主题 B,并将不应该处理的一次放回主题 A

    回答您的具体问题(有些已在您问题的评论中解决)

    1. 提交每个偏移量可能会减慢 ZK 的速度

    您可以考虑切换到在 Kafka 中存储偏移量(从 0.8.2 开始提供的功能,请查看消费者配置中的 offsets.storage 属性)

    我相信它可以,例如,如果它无法与偏移存储通信。正如您所说,使用幂等消息可以解决此问题。

    对于上述解决方案,这不会成为问题,除非消息本身的处理时间超过会话超时。

    同样,使用上述内容,您不需要设置长时间的会话超时。

    总有;)

    匿名用户

    使用Tibco EMS或其他JMS队列。它们具有内置的重试延迟。Kafka可能不是你正在做的事情的正确设计选择

    匿名用户

    在您的情况下,我会建议另一条路线。

    解决使用者主线程中的等待时间是没有意义的。这将是队列使用方式的反模式。从概念上讲,您需要尽可能快地处理消息,并将队列保持在较低的加载因子。

    相反,我会使用一个调度程序,它将为您需要延迟的每条消息调度作业。这样,您可以处理队列并创建将在预定义时间点触发的异步作业。

    使用这种技术的缺点是,它对内存中保存计划作业的 JVM 的状态是明智的。如果该 JVM 失败,您将丢失计划的作业,并且您不知道任务是否已执行。

    有一些调度程序实现,但可以配置为在集群环境中运行,从而保护您免受 JVM 崩溃的影响。

    看看这个 java 调度框架: http://www.quartz-scheduler.org/