前言
kafka基础架构
现在假如有100T大小的消息要发送到kafka中,数据量非常大,一台机器存储不下,面对这种情况,你该如何设计呢?
-
Topic: 可以理解为一个队列,一个kafka集群中可以定义很多的topic,比如上图中的
-
Partition: 为了实现扩展性,提高吞吐量,一个非常大的
topic
可以分布到多个broker
(即服务器)上,一个topic
可以分为多个partition
,每个partition
是一个有序的队列。比如上图中的topicA被分成了3个partition
。 -
Replica: 副本,如果数据只放在一个
broker
中,万一这个broker
宕机了怎么办?为了实现高可用,一个topic
的每个分区都有若干个副本,一个Leader
和若干个Follower
。比如上图中的虚线连接的就是它的副本。 -
Leader: 每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是
Leader
。 -
Follower: 每个分区多个副本中的“从”,实时从
Leader
中同步数据,保持和Leader
数据的同步。Leader
发生故障时,某个Follower
会成为新的Leader
。 -
Producer: 消息生产者,就是向
Kafka broker
发消息的客户端,后面详细讲解。 -
Consumer: 消息消费者,向
Kafka broker
取消息的客户端,多个Consumer
会组成一个消费者组,后面详细讲解。 -
Zookeeper:用来记录kafka中的一些元数据,比如kafka集群中的broker,leader是谁等等,但
Kafka
2.8.0版本以后也支持非zk的方式,大大减少了和zk的交互。
topicA
。
kafka生产者流程
在消息发送的过程中,涉及到了两个线程——main
线程和 Sender
线程。在 main
线程中创建了一个双端队列 RecordAccumulator
。main
线程将消息发送给 RecordAccumulator
,Sender
线程不断从 RecordAccumulator
中拉取消息发送到 Kafka Broker
。
- 在主线程中由
kafkaProducer
创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator
, 也称为消息收集器)中。
-
拦截器: 可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
- 序列化器: 用于在网络传输中将数据序列化为字节流进行传输,保证数据不会丢失。
- 分区器: 用于按照一定的规则将数据分发到不同的kafka broker节点中
-
Sender
线程负责从RecordAccumulator
获取消息并将其发送到Kafka
中。
RecordAccumulator
主要用来缓存消息以便 Sender
线程可以批量发送,进而减少网络传输的资源消耗以提升性能。
RecordAccumulator
缓存的大小可以通过生产者客户端参数 buffer.memory
配置,默认值为 33554432B
,即 32M
。
- 主线程中发送过来的消息都会被迫加到
-
Sender
读取消息时,从双端队列的头部读取。ProducerBatch
是指一个消息批次;与此同时,会将较小的ProducerBatch
凑成一个较大ProducerBatch
,也可以减少网络请求的次数以提升整体的吞吐量。ProducerBatch
大小可以通过batch.size
控制,默认16kb
。 -
Sender
线程会在有数据积累到batch.size
,默认16kb,或者如果数据迟迟未达到batch.size
,Sender
线程等待linger.ms
设置的时间到了之后就会获取数据。linger.ms
单位ms
,默认值是0ms
,表示没有延迟。
RecordAccumulator
的某个双端队列( Deque
)中,RecordAccumulator
内部为每个分区都维护了一个双端队列,即 Deque<ProducerBatch>
, 消息写入缓存时,追加到双端队列的尾部。
-
Sender
从RecordAccumulator
获取缓存的消息之后,会将数据封装成网络请求<Node,Request>
的形式,这样就可以将Request
请求发往各个Node
了。 - 请求在从
sender
线程发往Kafka
之前还会保存到InFlightRequests
中,它的主要作用是缓存了已经发出去但还没有收到服务端响应的请求。InFlightRequests
默认每个分区下最多缓存5个请求,可以通过配置参数为max.in.flight.request.per. connection
修改。 - 请求
Request
通过通道Selector
发送到kafka
节点。 - 发送后,需要等待kafka的应答机制,取决于配置项
acks
.
- 0:生产者发送过来的数据,不需要等待数据落盘就应答。
- 1:生产者发送过来的数据,
Leader
收到数据后应答。 - -1(all):生产者发送过来的数据,Leader和副本节点收齐数据后应答。默认值是-1,-1 和all 是等价的。
-
Request
请求接受到kafka的响应结果,如果成功的话,从InFlightRequests
清除请求,否则的话需要进行重发操作,可以通过配置项retries
决定,当消息发送出现错误的时候,系统会重发消息。retries
表示重试次数。默认是 int 最大值,2147483647
。 - 清理消息累加器
RecordAccumulator
中的数据。
kafka消费者流程
Kafka 中的消费是基于拉取模式的。消息的消费一般有两种模式:推送模式和拉取模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。
- 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置,不会接收任何消息。
- 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
那么问题来了,kafka是如何指定消费者组的每个消费者消费哪个分区?每次消费的数量是多少呢?
一、如何制定消费方案
- 消费者consumerA,consumerB, consumerC向kafka集群中的协调器
coordinator
发送JoinGroup
的请求。coordinator
主要是用来辅助实现消费者组的初始化和分区的分配。
coordinator
老大节点选择 = groupid
的hashcode
值 % 50( __consumer_offsets
内置主题位移的分区数量)例如: groupid
的hashcode值 为1,1% 50 = 1
,那么__consumer_offsets
主题的1号分区,在哪个broker
上,就选择这个节点的coordinator
作为这个消费者组的老大。消费者组下的所有的消费者提交offset
的时候就往这个分区去提交offset
。
- 选出一个
consumer
作为消费中的leader
,比如上图中的ConsumerB
。 - 消费者
leader
制定出消费方案,比如谁来消费哪个分区等 - 把消费方案发给
coordinator
- 最后
coordinator
就把消费方 案下发给各个consumer
, 图中只画了一条线,实际上是有下发各个consumer
。
coordinator保持心跳(默认3s),一旦超时(session.timeout.ms=45s
),该消费者会被移除,并触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms
=5分钟),也会触发再平衡,也就是重新进行上面的流程。
二、消费者消费细节
- 消费者创建一个网络连接客户端
ConsumerNetworkClient
, 发送消费请求,可以进行如下配置:
-
fetch.max.bytes
: 每批次最大抓取大小,默认50M -
fetch.max.wait.ms
:最大超时时间,默认500ms
fetch.min.bytes
: 每批次最小抓取大小,默认1字节
- 发送请求到kafka集群
- 成功的回调,会将数据保存到
completedFetches
队列中 - 消费者从队列中抓取数据,根据配置
max.poll.records
一次拉取数据返回消息的最大条数,默认500条。 - 获取到数据后,需要经过反序列化器、拦截器等。
kafka的存储机制
topic分为多个partition
,每个partition对应于一个log
文件,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,每个partition
分为多个segment
。每个segment
包括:“.index
”文件、“.log
”文件和.timeindex
等文件,Producer
生产的数据会被不断追加到该log文件末端。
topic的名称,而“t1-0/t1-1”则表明这个目录是t1这个topic
的哪个partition
。
sparseindex)的方式构造消息的索引,如下图所示:
offset定位segment
文件
offset的最大offset
对应的索引项
log文件
Record
log文件写入4kb
数据,会往index
文件写入一条索引。通过参数log.index.interval.bytes
控制,默认4kb
。
那kafka中磁盘文件保存多久呢?
-
log.retention.minutes
,分钟。 -
log.retention.ms
,最高优先级毫秒。 -
log.retention.check.interval.ms
,负责设置检查周期,默认 5 分钟。
log.retention.hours
,最低优先级小时,默认 7 天。
总结
其实kafka中的细节十分多,本文也只是对kafka的一些核心机制从理论层面做了一个总结,更多的细节还是需要自行去实践,去学习。
欢迎关注个人公众号【JAVA旭阳】交流学习