$ kubectl get events
15m Warning FailedCreate replicaset/ml-pipeline-visualizationserver-865c7865bc
Error creating: pods "ml-pipeline-visualizationserver-865c7865bc-" is forbidden: error looking up service account default/default-editor: serviceaccount "default-editor" not found
构建K8s事件处理链路
- Eventrouter,开源的 Kubernetes event 处理器,它可以将所有集群事件整合汇总到某个 Kafka 主题中。
- Strimzi Operator,在 Kubernetes 中轻松管理 Kafka broker。
- 自定义 Go 二进制文件以将事件分发到相应的 Kafka 主题中。
为什么要把事件分发到不同的主题中?比方说,在集群的每个命名空间中存在与特定客户相关的 Kubernetes 资产,那么在使用这些资产之前你当然希望将相关事件隔离开。
创建 Kafka broker 和主题
我选择使用 Strimzi(strimzi.io/) 将 Kafka 部署到 Kubernetes 中。简而言之,它是用于创建和更新 Kafka broker 和主题的。你可以在官方文档中找到如何安装该 Operator 的详细说明:
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
name: kube-events
spec:
entityOperator:
topicOperator: {}
userOperator: {}
kafka:
config:
default.replication.factor: 3
log.message.format.version: "2.6"
offsets.topic.replication.factor: 3
transaction.state.log.min.isr: 2
transaction.state.log.replication.factor: 3
listeners:
- name: plain
port: 9092
tls: false
type: internal
- name: tls
port: 9093
tls: true
type: internal
replicas: 3
storage:
type: jbod
volumes:
- deleteClaim: false
id: 0
size: 10Gi
type: persistent-claim
version: 2.6.0
zookeeper:
replicas: 3
storage:
deleteClaim: false
size: 10Gi
type: persistent-claim
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
name: cluster-events
spec:
config:
retention.ms: 7200000
segment.bytes: 1073741824
partitions: 1
replicas: 1
设置 EventRouter
apiVersion: v1
data:
config.json: |-
{
"sink": "kafka",
"kafkaBrokers": "kube-events-kafka-bootstrap.kube-events.svc.cluster.local:9092",
"kafkaTopic": "cluster-events"
}
kind: ConfigMap
metadata:
name: eventrouter-cm
验证设置是否正常工作
kubectl -n kube-events exec kube-events-kafka-0 -- bin/kafka-console-consumer.sh \
--bootstrap-server kube-events-kafka-bootstrap:9092 \
--topic kube-events \
--from-beginning
{"verb":"ADDED","event":{...}}
{"verb":"ADDED","event":{...}}
...
编写 Golang 消费者
- 消费者部分在 cluster-events 主题上监听传入的集群事件
- 生产者部分写入与事件的命名空间相匹配的 Kafka 主题中
如果为Kafka配置了适当的选项(默认情况),就不需要特地创建新的主题,因为 Kafka 会默认为你创建主题。这是 Kafka 客户端 API 的一个非常酷的功能。
p, err := kafka.NewProducer(cfg.Endpoint)
if err != nil {
sugar.Fatal("cannot create producer")
}
defer p.Close()
c, err := kafka.NewConsumer(cfg.Endpoint, cfg.Topic)
if err != nil {
sugar.Fatal("cannot create consumer")
}
defer c.Close()
run := true
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
sugar.Infof("signal %s received, terminating", sig)
run = false
}()
var wg sync.WaitGroup
go func() {
wg.Add(1)
for run {
data, err := c.Read()
if err != nil {
sugar.Errorf("read event error: %v", err)
time.Sleep(5 * time.Second)
continue
}
if data == nil {
continue
}
msg, err := event.CreateDestinationMessage(data)
if err != nil {
sugar.Errorf("cannot create destination event: %v", err)
}
p.Write(msg.Topic, msg.Message)
}
sugar.Info("worker thread done")
wg.Done()
}()
wg.Wait()
部署消费者
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: events-fanout
name: events-fanout
spec:
replicas: 1
selector:
matchLabels:
app: events-fanout
template:
metadata:
labels:
app: events-fanout
spec:
containers:
- image: emmsys/events-fanout:latest
name: events-fanout
command: [ "./events-fanout"]
args:
- -logLevel=info
env:
- name: ENDPOINT
value: kube-events-kafka-bootstrap:9092
- name: TOPIC
value: cluster-events
检查目标主题是否创建
kubectl -n kube-events get kafkatopics.kafka.strimzi.io -o name
kafkatopic.kafka.strimzi.io/cluster-events
kafkatopic.kafka.strimzi.io/kube-system
kafkatopic.kafka.strimzi.io/default
kafkatopic.kafka.strimzi.io/kafka
kafkatopic.kafka.strimzi.io/kube-events
总结
https://hackernoon.com/monitor-your-kubernetes-cluster-events-with-eventrouter-golang-and-kafka-wh2a35l0