场景再现:比如昨天消费者晚上断掉了,今天上午我们会发现kafka消费的数据不是最新的,而是昨天晚上的数据,由于数据量比较多,也不会及时的消费到今天上午的数据,这个时候就需要我们对偏移量进行重置为最新的,以获取最新的数据。
关于AutoOffsetReset这个枚举的配置项如下:
-
latest
(default which means consumers will read messages from the tail of the partition
最新(默认,这意味着使用者将从分区的尾部读取消息,只消费最新的信息,即自从消费者上线后才开始推送来的消息。那么会导致忽略掉之前没有处理的消息。 -
earliest
which means reading from the oldest offset in the partition
这意味着从分区中最早的偏移量读取;自动从消费者上次开始消费的位置开始,进行消费。 -
none
throw exception to the consumer if no previous offset is found for the consumer's group
如果没有为使用者的组找到以前的偏移量,则不会向使用者抛出异常。
使用Assign订阅指定的分区,注意最后还需要使用Subscribe方法订阅
consumer.Assign(new TopicPartitionOffset(new TopicPartition(topic, new Partition(1,Offset.End;//从指定的Partition订阅消息使用Assign方法
consumer.Subscribe(topic;//订阅消息使用Subscribe方法
从指定的分区获取数据,并且指定了对应的偏移量
Offset 可以被设置为 Beginning、End、Stored 和 Unset。这些值的含义如下:
-
End:从 Kafka 分区的最新消息开始消费。如果消费者在启动后到达了 Kafka 分区的末尾,它将停止消费,并等待新消息的到来。
-
Unset:在消费者启动时,Offset 没有被设置。在这种情况下,消费者将根据 auto.offset.reset 配置项的值来决定从哪里开始消费。如果 auto.offset.reset 的值为 latest,则从最新的消息开始消费;如果 auto.offset.reset 的值为 earliest,则从最早的消息开始消费。
因此,存储的 Offset 必须要有效才能够被正确地使用