
需要借助的库
github.com/Shopify/sarama // kafka主要的库
github.com/bsm/sarama-cluster // kafka消费组
生产者
package producer
import (
	"fmt"
	"github.com/HappyTeemo7569/teemoKit/tlog"
	"github.com/Shopify/sarama"
	"kafkaDemo/define"
// ProducerId is the ID handed to the next Producer set up by InitProducer.
// It starts at 1 and is incremented after every successful initialisation.
var (
	ProducerId = 1
)
// Producer bundles a sarama synchronous producer with the topic it publishes
// to and the counters used to label the test messages it sends.
type Producer struct {
	Producer   sarama.SyncProducer
	Topic      string // topic
	ProducerID int    // producer ID
	// MessageId is the sequence number embedded in the next message; it is
	// set to 1 by InitProducer and incremented after each successful send.
	MessageId  int
}
func (p *Producer InitProducer( {
	config := sarama.NewConfig(
	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
	config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回
	// 连接kafka
	client, err := sarama.NewSyncProducer([]string{define.SERVER_LIST}, config
	if err != nil {
		tlog.Error("producer closed, err:", err
		return
	}
	p.Producer = client
	p.Topic = define.TOPIC
	p.ProducerID = ProducerId
	p.MessageId = 1
	ProducerId++
}
func (p *Producer SendMessage( {
	// 构造一个消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = p.Topic
	txt := fmt.Sprintf("ProducerID:%d  this is a test log %d",
		p.ProducerID, p.MessageId
	msg.Value = sarama.StringEncoder(txt
	// 发送消息
	pid, offset, err := p.Producer.SendMessage(msg
	//_, _, err := client.SendMessage(msg
	if err != nil {
		fmt.Println("send msg failed, err:", err
		return
	}
	tlog.Info(fmt.Sprintf("ProducerID:%d pid:%v offset:%v msg:%s",
		p.ProducerID, pid, offset, txt
	p.MessageId++
}
func (p *Producer Close( {
	p.Producer.Close(
}
消费者
package consumer
import (
	"github.com/HappyTeemo7569/teemoKit/tlog"
	"github.com/Shopify/sarama"
	"kafkaDemo/define"
// Consumer bundles a sarama consumer with the topic it reads from and a
// numeric ID that is included in the log line of every received message.
type Consumer struct {
	Consumer   sarama.Consumer
	Topic      string
	ConsumerId int // consumer ID
}
func (c *Consumer InitConsumer( error {
	consumer, err := sarama.NewConsumer([]string{define.SERVER_LIST}, nil
	if err != nil {
		return err
	}
	c.Consumer = consumer
	c.Topic = define.TOPIC
	c.ConsumerId = ConsumerId
	ConsumerId++
	return nil
}
//指定partition
//offset 可以指定,传-1为获取最新offest
func (c *Consumer GetMessage(partitionId int32, offset int64 {
	if offset == -1 {
		offset = sarama.OffsetNewest
	}
	pc, err := c.Consumer.ConsumePartition(c.Topic, partitionId, offset
	if err != nil {
		tlog.Error("failed to start consumer for partition %d,err:%v", partitionId, err
		//That topic/partition is already being consumed
		return
	}
	// 异步从每个分区消费信息
	go func(sarama.PartitionConsumer {
		for msg := range pc.Messages( {
			tlog.Info("ConsumerId:%d Partition:%d Offset:%d Key:%v Value:%v", c.ConsumerId, msg.Partition, msg.Offset, msg.Key, string(msg.Value
		}
	}(pc
}
//遍历所有分区
func (c *Consumer GetMessageToAll(offset int64 {
	partitionList, err := c.Consumer.Partitions(c.Topic // 根据topic取到所有的分区
	if err != nil {
		tlog.Error("fail to get list of partition:err%v", err
		return
	}
	tlog.Info("所有partition:", partitionList
	for partition := range partitionList { // 遍历所有的分区
		c.GetMessage(int32(partition, offset
	}
}
主函数
func main( {
	tlog.Info("开始"
	go producer.Put(
	go consumer.Get(
	for {
		time.Sleep(time.Hour * 60
	}
}
func Put( {
	producer := new(Producer
	producer.InitProducer(
	go func( {
		for {
			producer.SendMessage(
			time.Sleep(1 * time.Second
		}
	}(
}
func Get( {
	offest := int64(0
	consumer := new(Consumer
	err := consumer.InitConsumer(
	if err != nil {
		tlog.Error("fail to init consumer, err:%v", err
		return
	}
	consumer.GetMessageToAll(offest
}
具体源码可以查看:
kafka_demo
生产环境中的优化
- 可以将已消费到的位置(offset)存储到redis,重启后从该位置继续消费
- 需要顺序的消费的放到一个partition,或者利用哈希算法投递
- 传入一个通道,将业务逻辑和底层逻辑解耦。
package kafka
import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"github.com/Shopify/sarama"
	"io/ioutil"
	"log"
	"sync"
	"vliao.com/stellar/internal/core"
// KafkaConsumer describes one topic subscription: the broker addresses to
// connect to, the topic name, and the channel on which the payloads of
// received messages are delivered.
type KafkaConsumer struct {
	// Node holds the broker addresses passed to sarama.NewConsumer.
	Node         []string
	// Consumer is not assigned by NewKafkaConsumer or Consume in this file.
	Consumer     sarama.Consumer
	Topic        string
	// MessageQueue receives the Value of every consumed message.
	MessageQueue chan []byte
}
func NewKafkaConsumer(topic string KafkaConsumer {
	return KafkaConsumer{
		Node:  core.GetKafkaConn(.Conn,
		Topic: core.GetServerMode( + "_" + topic,
	}
}
// Consume 获取所有分区
func (c *KafkaConsumer Consume( {
	config := sarama.NewConfig(
	config.Net.SASL.Enable = true
	config.Net.SASL.User = core.GetKafkaConn(.SASLUser
	config.Net.SASL.Password = core.GetKafkaConn(.SASLPassword
	config.Net.SASL.Handshake = true
	certBytes, err := ioutil.ReadFile(GetFullPath("only-4096-ca-cert"
	if err != nil {
		fmt.Println("kafka client read cert file failed ", err.Error(
		return
	}
	clientCertPool := x509.NewCertPool(
	ok := clientCertPool.AppendCertsFromPEM(certBytes
	if !ok {
		fmt.Println("kafka client failed to parse root certificate"
		return
	}
	config.Net.TLS.Config = &tls.Config{
		RootCAs:            clientCertPool,
		InsecureSkipVerify: true,
	}
	config.Net.TLS.Enable = true
	consumer, err := sarama.NewConsumer(c.Node, config
	if err != nil {
		log.Fatal("NewConsumer err: ", err
	}
	defer consumer.Close(
	// 先查询该 topic 有多少分区
	partitions, err := consumer.Partitions(c.Topic
	if err != nil {
		log.Fatal("Partitions err: ", err
	}
	var wg sync.WaitGroup
	wg.Add(len(partitions
	// 然后每个分区开一个 goroutine 来消费
	for _, partitionId := range partitions {
		//不开异步会导致一个消费完才会消费另外一个
		go c.consumeByPartition(consumer, c.Topic, partitionId, &wg
	}
	wg.Wait(
}
// 暂时只是业务一对一,也就是一个生产者产生的消息不会触发多个业务的变动
// 但是可以开多个消费者增加处理能力
// getOffsetCacheKey returns the cache key under which the consumed offset of
// the given topic/partition pair is stored, in the form
// "kafka_offset_<topic>_<partitionId>".
func getOffsetCacheKey(topic string, partitionId int32) string {
	return fmt.Sprintf("kafka_offset_%s_%d", topic, partitionId)
}
func setConsumeOffset(topic string, partitionId int32, offset int64 {
	core.RedisBy(core.RedisTypeServer.SetInt64(getOffsetCacheKey(topic, partitionId, offset
}
func getConsumeOffset(topic string, partitionId int32 (offset int64 {
	key := getOffsetCacheKey(topic, partitionId
	if core.RedisBy(core.RedisTypeServer.Exists(key {
		return core.RedisBy(core.RedisTypeServer.GetInt64(key + 1
	}
	//默认从最新开始
	setConsumeOffset(topic, partitionId, sarama.OffsetNewest
	return sarama.OffsetNewest
}
func (c *KafkaConsumer consumeByPartition(consumer sarama.Consumer, topic string, partitionId int32, wg *sync.WaitGroup {
	defer wg.Done(
	offset := getConsumeOffset(topic, partitionId
	partitionConsumer, err := consumer.ConsumePartition(topic, partitionId, offset
	if err != nil {
		log.Fatal("ConsumePartition err: ", err
	}
	defer partitionConsumer.Close(
	for message := range partitionConsumer.Messages( {
		log.Printf("[Consumer] topic: %s ; partitionid: %d; offset:%d, value: %s\n", topic, message.Partition, message.Offset, string(message.Value
		setConsumeOffset(topic, partitionId, message.Offset
		c.MessageQueue <- message.Value
	}
}
package kafka
import (
	"log"
	"testing"
	"vliao.com/stellar/internal/core"
func Test_Get(t *testing.T {
	core.TestMain(
	topic := "test_log"
	var kafkaConsumer = NewKafkaConsumer(topic
	kafkaConsumer.MessageQueue = make(chan []byte, 1000
	go kafkaConsumer.Consume(
	for {
		msg := <-kafkaConsumer.MessageQueue
		deal(msg
	}
}
// deal handles one consumed message by writing its payload to the standard
// logger.
//
// The payload is logged verbatim with log.Print: it is data, not a format
// string, so any '%' it contains must not be interpreted as a formatting verb.
func deal(msg []byte) {
	log.Print(string(msg))
}
