Below is a complete example of working with a Kafka message queue in Go, covering producer, consumer, and administration-tool implementations.
![Complete Go Kafka message queue example](https://zhituwujie.com/wp-content/uploads/2025/07/d2b5ca33bd20250715094943-1024x562.png)
## 1. Environment Setup

First, install the Kafka Go client library:

```bash
go get github.com/segmentio/kafka-go
```
## 2. Producer Implementation

### 1. Basic Producer
```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/segmentio/kafka-go"
)

func produce(brokers []string, topic string) {
	// Configure the producer.
	writer := &kafka.Writer{
		Addr:         kafka.TCP(brokers...),
		Topic:        topic,
		Balancer:     &kafka.LeastBytes{},   // partition selection strategy
		BatchSize:    100,                   // batch size
		BatchTimeout: 10 * time.Millisecond, // batch timeout
		RequiredAcks: kafka.RequireAll,      // acknowledgement level
	}
	defer writer.Close()

	for i := 0; i < 10; i++ {
		msg := kafka.Message{
			Key:   []byte(fmt.Sprintf("Key-%d", i)),
			Value: []byte(fmt.Sprintf("Message-%d", i)),
		}
		err := writer.WriteMessages(context.Background(), msg)
		if err != nil {
			fmt.Printf("Failed to write message: %v\n", err)
			continue
		}
		fmt.Printf("Produced message: %s\n", msg.Value)
		time.Sleep(1 * time.Second) // simulate a delay between messages
	}
}

func main() {
	brokers := []string{"localhost:9092"}
	topic := "test-topic"
	produce(brokers, topic)
}
```
### 2. Advanced Producer (with Completion Callback)
```go
func produceWithCallback(brokers []string, topic string) {
	writer := &kafka.Writer{
		Addr:  kafka.TCP(brokers...),
		Topic: topic,
		Async: true, // asynchronous mode: WriteMessages returns immediately
		Completion: func(messages []kafka.Message, err error) {
			if err != nil {
				fmt.Printf("Failed to deliver messages: %v\n", err)
				return
			}
			for _, msg := range messages {
				fmt.Printf("Successfully delivered message: %s\n", msg.Value)
			}
		},
	}
	// Close also flushes any messages still queued in async mode.
	defer writer.Close()

	for i := 0; i < 10; i++ {
		msg := kafka.Message{
			Key:   []byte(fmt.Sprintf("Key-%d", i)),
			Value: []byte(fmt.Sprintf("Async-Message-%d", i)),
		}
		err := writer.WriteMessages(context.Background(), msg)
		if err != nil {
			fmt.Printf("Failed to queue message: %v\n", err)
		}
	}
	// Crude wait for the completion callbacks to fire; see the WaitGroup
	// variant below for a deterministic alternative.
	time.Sleep(5 * time.Second)
}
```
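The fixed five-second sleep above is only demo-level waiting. A sketch of a deterministic alternative (the function name is illustrative, and a `sync` import is assumed): count in-flight messages with a `sync.WaitGroup` and release them from the `Completion` callback.

```go
func produceAsyncAndWait(brokers []string, topic string, n int) {
	var wg sync.WaitGroup
	writer := &kafka.Writer{
		Addr:  kafka.TCP(brokers...),
		Topic: topic,
		Async: true,
		Completion: func(messages []kafka.Message, err error) {
			// Each message in the batch is done, delivered or failed.
			wg.Add(-len(messages))
			if err != nil {
				fmt.Printf("Delivery failed: %v\n", err)
			}
		},
	}
	defer writer.Close()

	for i := 0; i < n; i++ {
		wg.Add(1)
		msg := kafka.Message{Value: []byte(fmt.Sprintf("Async-%d", i))}
		if err := writer.WriteMessages(context.Background(), msg); err != nil {
			wg.Done() // never queued, so it will not reach the callback
			fmt.Printf("Failed to queue message: %v\n", err)
		}
	}
	wg.Wait() // every queued message has been acknowledged or has failed
}
```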
## 3. Consumer Implementation

### 1. Basic Consumer
```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/segmentio/kafka-go"
)

func consume(brokers []string, topic string, groupID string) {
	// Configure the consumer. Note that GroupID and Partition are mutually
	// exclusive in kafka-go: with a GroupID set, partitions are assigned by
	// the group coordinator, so no Partition field is given here.
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  brokers,
		Topic:    topic,
		GroupID:  groupID,          // consumer group ID
		MinBytes: 10e3,             // 10KB
		MaxBytes: 10e6,             // 10MB
		MaxWait:  10 * time.Second, // maximum wait before a fetch returns
	})
	defer reader.Close()

	for {
		msg, err := reader.ReadMessage(context.Background())
		if err != nil {
			fmt.Printf("Failed to read message: %v\n", err)
			break
		}
		fmt.Printf("Consumed message at offset %d: key=%s, value=%s\n",
			msg.Offset, string(msg.Key), string(msg.Value))
	}
}

func main() {
	brokers := []string{"localhost:9092"}
	topic := "test-topic"
	groupID := "test-group"
	consume(brokers, topic, groupID)
}
```
### 2. Consumer Group Implementation
```go
func consumeGroup(ctx context.Context, brokers []string, topic string, groupID string) {
	// Configure the consumer group.
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        brokers,
		Topic:          topic,
		GroupID:        groupID,
		StartOffset:    kafka.LastOffset, // start from the newest offset (only if the group has no committed offset)
		CommitInterval: time.Second,      // how often offsets are committed
	})
	defer reader.Close()

	for {
		// ReadMessage returns an error once ctx is cancelled, ending the loop;
		// this is what lets the integrated example below shut down cleanly.
		msg, err := reader.ReadMessage(ctx)
		if err != nil {
			fmt.Printf("Stopped reading: %v\n", err)
			break
		}
		fmt.Printf("Consumed message from partition %d: %s\n",
			msg.Partition, string(msg.Value))
		// Simulate processing time.
		time.Sleep(500 * time.Millisecond)
	}
}
```
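`CommitInterval` commits offsets on a timer, which can acknowledge messages that have not finished processing. For at-least-once semantics, kafka-go also supports explicit commits via `FetchMessage` and `CommitMessages`. A minimal sketch (the function name is illustrative):

```go
func consumeWithManualCommit(ctx context.Context, brokers []string, topic, groupID string) {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: brokers,
		Topic:   topic,
		GroupID: groupID,
		// Leaving CommitInterval at zero makes CommitMessages synchronous.
	})
	defer reader.Close()

	for {
		msg, err := reader.FetchMessage(ctx) // does not auto-commit
		if err != nil {
			return
		}
		fmt.Printf("Processing: %s\n", string(msg.Value))
		// Commit only after the message has been fully processed.
		if err := reader.CommitMessages(ctx, msg); err != nil {
			fmt.Printf("Failed to commit offset: %v\n", err)
		}
	}
}
```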
## 4. Administration Utilities

### 1. Creating a Topic
```go
// Requires the net and strconv imports in addition to kafka-go.
func createTopic(brokers []string, topic string, partitions int) {
	conn, err := kafka.Dial("tcp", brokers[0])
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()

	// CreateTopics must be sent to the cluster controller, which is not
	// necessarily the broker we first dialed.
	controller, err := conn.Controller()
	if err != nil {
		panic(err.Error())
	}
	controllerConn, err := kafka.Dial("tcp",
		net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		panic(err.Error())
	}
	defer controllerConn.Close()

	topicConfigs := []kafka.TopicConfig{
		{
			Topic:             topic,
			NumPartitions:     partitions,
			ReplicationFactor: 1,
		},
	}
	if err := controllerConn.CreateTopics(topicConfigs...); err != nil {
		panic(err.Error())
	}
	fmt.Printf("Topic %s created successfully\n", topic)
}
```
### 2. Listing All Topics
```go
func listTopics(brokers []string) {
	conn, err := kafka.Dial("tcp", brokers[0])
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()

	// ReadPartitions with no arguments returns every partition in the cluster.
	partitions, err := conn.ReadPartitions()
	if err != nil {
		panic(err.Error())
	}

	// Deduplicate topic names across partitions.
	topicMap := make(map[string]struct{})
	for _, p := range partitions {
		topicMap[p.Topic] = struct{}{}
	}

	fmt.Println("Available topics:")
	for topic := range topicMap {
		fmt.Println("-", topic)
	}
}
```
### 3. Inspecting Consumer Group Offsets
```go
// kafka.Conn does not expose a consumer-offset API; the kafka.Client type
// (kafka-go v0.4+) does, via OffsetFetch. The caller supplies the topic and
// the partition numbers to inspect.
func getConsumerOffsets(brokers []string, groupID, topic string, partitions []int) {
	client := &kafka.Client{Addr: kafka.TCP(brokers...)}
	resp, err := client.OffsetFetch(context.Background(), &kafka.OffsetFetchRequest{
		GroupID: groupID,
		Topics:  map[string][]int{topic: partitions},
	})
	if err != nil {
		panic(err.Error())
	}
	for topic, parts := range resp.Topics {
		for _, p := range parts {
			fmt.Printf("Topic: %s, Partition: %d, Offset: %d\n",
				topic, p.Partition, p.CommittedOffset)
		}
	}
}
```
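A committed offset alone does not tell you how far behind a group is. As a sketch, lag for one partition can be computed by comparing the committed offset with the partition's last offset; `kafka.DialLeader` and `ReadLastOffset` are existing kafka-go APIs, while the helper name and wiring here are illustrative:

```go
func partitionLag(broker, topic string, partition int, committed int64) (int64, error) {
	// DialLeader connects directly to the partition leader, which is where
	// ReadLastOffset is valid.
	conn, err := kafka.DialLeader(context.Background(), "tcp", broker, topic, partition)
	if err != nil {
		return 0, err
	}
	defer conn.Close()

	last, err := conn.ReadLastOffset() // offset of the next message to be produced
	if err != nil {
		return 0, err
	}
	return last - committed, nil
}
```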
## 5. Complete Example: Producer and Consumer Together

```go
// This file assumes the createTopic, produce, and consumeGroup functions from
// the previous sections are defined in the same package; they use the time
// and kafka-go imports below.
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	brokers := []string{"localhost:9092"}
	topic := "test-topic"
	groupID := "test-group"

	// Create the topic if it does not already exist.
	createTopic(brokers, topic, 3)

	// The context lets us stop the consumer loop on shutdown.
	ctx, cancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	wg.Add(2)

	// Start the producer.
	go func() {
		defer wg.Done()
		produce(brokers, topic)
	}()

	// Start the consumer.
	go func() {
		defer wg.Done()
		consumeGroup(ctx, brokers, topic, groupID)
	}()

	// Wait for an interrupt signal, then cancel the consumer's context;
	// without the cancel, consumeGroup would loop forever and wg.Wait()
	// would never return.
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
	<-sigchan
	fmt.Println("\nShutting down...")
	cancel()
	wg.Wait()
}
```
## 6. Performance Tuning Tips

- Producer tuning:
  - Adjust `BatchSize` and `BatchTimeout` to balance latency against throughput
  - Use asynchronous mode (`Async: true`) for higher throughput
  - Choose the `RequiredAcks` level that matches your durability requirements
- Consumer tuning:
  - Adjust `MinBytes` and `MaxWait` to reduce the number of fetch requests
  - Set a sensible `CommitInterval` to avoid committing offsets too frequently
  - Run multiple consumer instances in the same group to process partitions in parallel
- Error handling:
  - Implement a retry mechanism for transient errors (see the sketch after this list)
  - Monitor consumer lag
  - Plan for duplicated and lost messages
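As a concrete starting point for the retry bullet above, here is a minimal sketch of a produce-side retry wrapper. The function name, retry count, and linear backoff are illustrative choices, not part of kafka-go:

```go
func writeWithRetry(ctx context.Context, w *kafka.Writer, maxRetries int, msgs ...kafka.Message) error {
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err = w.WriteMessages(ctx, msgs...); err == nil {
			return nil
		}
		// Errors such as kafka.LeaderNotAvailable are typically transient;
		// back off briefly and try again.
		time.Sleep(time.Duration(attempt+1) * 250 * time.Millisecond)
	}
	return fmt.Errorf("write failed after %d attempts: %w", maxRetries+1, err)
}
```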
## 7. Troubleshooting Common Issues

Connection problems: set a dial timeout. Note that the struct-based `kafka.Writer` has no `Dialer` field; it takes a `kafka.Transport`, while a `Dialer` is passed to a `Reader` through `ReaderConfig.Dialer`:

```go
// Dial timeout for a reader:
dialer := &kafka.Dialer{
	Timeout:   10 * time.Second,
	DualStack: true,
}
reader := kafka.NewReader(kafka.ReaderConfig{
	Brokers: brokers,
	Topic:   topic,
	Dialer:  dialer,
})

// Dial timeout for a writer:
writer := &kafka.Writer{
	Addr:  kafka.TCP(brokers...),
	Topic: topic,
	Transport: &kafka.Transport{
		DialTimeout: 10 * time.Second,
	},
}
```

Guaranteeing message order:

- Use the same message key for messages that must stay in order
- Make sure those messages land on the same partition; a key-hashing balancer does this (see the sketch after this list)

Handling consumer rebalances:

```go
reader := kafka.NewReader(kafka.ReaderConfig{
	// ...other settings
	RebalanceTimeout: 60 * time.Second,
	MaxAttempts:      5,
	RetentionTime:    24 * time.Hour,
})
```

SSL/TLS configuration (requires the `crypto/tls` import; for a `Writer`, set the same `tls.Config` on `kafka.Transport.TLS`):

```go
dialer := &kafka.Dialer{
	Timeout: 10 * time.Second,
	TLS: &tls.Config{
		InsecureSkipVerify: true, // for test environments only
	},
}
```
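For the ordering point above, a minimal sketch: the `kafka.Hash` balancer routes equal keys to the same partition, so messages sharing a key are consumed in production order (the function, topic, and key names here are illustrative):

```go
func produceInOrder(brokers []string, topic string) error {
	writer := &kafka.Writer{
		Addr:     kafka.TCP(brokers...),
		Topic:    topic,
		Balancer: &kafka.Hash{}, // same key -> same partition
	}
	defer writer.Close()

	msgs := []kafka.Message{
		{Key: []byte("order-42"), Value: []byte("created")},
		{Key: []byte("order-42"), Value: []byte("paid")},    // stays after "created"
		{Key: []byte("order-42"), Value: []byte("shipped")}, // stays after "paid"
	}
	return writer.WriteMessages(context.Background(), msgs...)
}
```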
With this complete Go Kafka example you can quickly build an efficient and reliable message-queue pipeline. Adjust the configuration to your actual workload, and take care to handle edge cases and failure scenarios.