C# Kafka重置到最新的偏移量,即从指定的Partition订阅消息使用Assign方法

科技资讯 投稿 5300 0 评论

C# Kafka重置到最新的偏移量,即从指定的Partition订阅消息使用Assign方法

场景再现:比如昨天消费者晚上断掉了,今天上午我们会发现kafka消费的数据不是最新的,而是昨天晚上的数据,由于数据量比较多,也不会及时的消费到今天上午的数据,这个时候就需要我们对偏移量进行重置为最新的,以获取最新的数据。

关于AutoOffsetReset这个枚举的配置项如下:

  • latest (default which means consumers will read messages from the tail of the partition
    最新(默认,这意味着使用者将从分区的尾部读取消息,只消费最新的信息,即自从消费者上线后才开始推送来的消息。那么会导致忽略掉之前没有处理的消息。
  • earliest which means reading from the oldest offset in the partition
    这意味着从分区中最早的偏移量读取;自动从消费者上次开始消费的位置开始,进行消费。
  • none throw exception to the consumer if no previous offset is found for the consumer's group
    如果没有为使用者的组找到以前的偏移量,则不会向使用者抛出异常。

使用Assign订阅指定的分区,注意最后还需要使用Subscribe方法订阅

consumer.Assign(new TopicPartitionOffset(new TopicPartition(topic, new Partition(1,Offset.End;//从指定的Partition订阅消息使用Assign方法
consumer.Subscribe(topic;//订阅消息使用Subscribe方法

从指定的分区获取数据,并且指定了对应的偏移量

Offset 可以被设置为 Beginning、End、Stored 和 Unset。这些值的含义如下:

  1. End:从 Kafka 分区的最新消息开始消费。如果消费者在启动后到达了 Kafka 分区的末尾,它将停止消费,并等待新消息的到来。

  2. Unset:在消费者启动时,Offset 没有被设置。在这种情况下,消费者将根据 auto.offset.reset 配置项的值来决定从哪里开始消费。如果 auto.offset.reset 的值为 latest,则从最新的消息开始消费;如果 auto.offset.reset 的值为 earliest,则从最早的消息开始消费。

因此,存储的 Offset 必须要有效才能够被正确地使用

编程笔记 » C# Kafka重置到最新的偏移量,即从指定的Partition订阅消息使用Assign方法

赞同 (28) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽