Go语言实现Kafka消息队列的完整示例

下面是一个使用Go语言操作Kafka消息队列的完整实现,包含生产者、消费者和管理工具的实现。

图片[1]_Go语言实现Kafka消息队列的完整示例_知途无界

一、环境准备

首先安装必要的Kafka Go客户端库:

go get github.com/segmentio/kafka-go

二、生产者实现

1. 基础生产者

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/segmentio/kafka-go"
)

func produce(brokers []string, topic string) {
	// 配置生产者
	writer := &kafka.Writer{
		Addr:         kafka.TCP(brokers...),
		Topic:        topic,
		Balancer:     &kafka.LeastBytes{}, // 分区选择策略
		BatchSize:    100,                 // 批量大小
		BatchTimeout: 10 * time.Millisecond, // 批量超时
		RequiredAcks: kafka.RequireAll,    // 消息确认级别
	}

	defer writer.Close()

	for i := 0; i < 10; i++ {
		msg := kafka.Message{
			Key:   []byte(fmt.Sprintf("Key-%d", i)),
			Value: []byte(fmt.Sprintf("Message-%d", i)),
		}

		err := writer.WriteMessages(context.Background(), msg)
		if err != nil {
			fmt.Printf("Failed to write message: %v\n", err)
			continue
		}

		fmt.Printf("Produced message: %s\n", msg.Value)
		time.Sleep(1 * time.Second) // 模拟延迟
	}
}

func main() {
	brokers := []string{"localhost:9092"}
	topic := "test-topic"
	produce(brokers, topic)
}

2. 高级生产者(带回调)

func produceWithCallback(brokers []string, topic string) {
	writer := &kafka.Writer{
		Addr:    kafka.TCP(brokers...),
		Topic:   topic,
		Async:   true, // 异步模式
		Completion: func(messages []kafka.Message, err error) {
			if err != nil {
				fmt.Printf("Failed to deliver messages: %v\n", err)
				return
			}
			for _, msg := range messages {
				fmt.Printf("Successfully delivered message: %s\n", msg.Value)
			}
		},
	}

	defer writer.Close()

	for i := 0; i < 10; i++ {
		msg := kafka.Message{
			Key:   []byte(fmt.Sprintf("Key-%d", i)),
			Value: []byte(fmt.Sprintf("Async-Message-%d", i)),
		}

		err := writer.WriteMessages(context.Background(), msg)
		if err != nil {
			fmt.Printf("Failed to queue message: %v\n", err)
		}
	}

	// 等待所有消息发送完成
	time.Sleep(5 * time.Second)
}

三、消费者实现

1. 基础消费者

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/segmentio/kafka-go"
)

func consume(brokers []string, topic string, groupID string) {
	// 配置消费者
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   brokers,
		Topic:     topic,
		GroupID:   groupID,               // 消费者组ID
		Partition: 0,                     // 指定分区
		MinBytes:  10e3,                  // 10KB
		MaxBytes:  10e6,                 // 10MB
		MaxWait:   10 * time.Second,     // 最长等待时间
	})

	defer reader.Close()

	for {
		msg, err := reader.ReadMessage(context.Background())
		if err != nil {
			fmt.Printf("Failed to read message: %v\n", err)
			break
		}

		fmt.Printf("Consumed message at offset %d: key=%s, value=%s\n", 
			msg.Offset, string(msg.Key), string(msg.Value))
	}
}

func main() {
	brokers := []string{"localhost:9092"}
	topic := "test-topic"
	groupID := "test-group"
	consume(brokers, topic, groupID)
}

2. 消费者组实现

func consumeGroup(brokers []string, topic string, groupID string) {
	// 配置消费者组
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        brokers,
		Topic:         topic,
		GroupID:       groupID,
		StartOffset:   kafka.LastOffset, // 从最新位置开始消费
		CommitInterval: time.Second,     // 提交偏移量的频率
	})

	defer reader.Close()

	for {
		msg, err := reader.ReadMessage(context.Background())
		if err != nil {
			fmt.Printf("Failed to read message: %v\n", err)
			break
		}

		fmt.Printf("Consumed message from partition %d: %s\n", 
			msg.Partition, string(msg.Value))
		
		// 模拟处理耗时
		time.Sleep(500 * time.Millisecond)
	}
}

四、管理工具实现

1. 创建主题

func createTopic(brokers []string, topic string, partitions int) {
	conn, err := kafka.Dial("tcp", brokers[0])
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()

	topicConfigs := []kafka.TopicConfig{
		{
			Topic:             topic,
			NumPartitions:     partitions,
			ReplicationFactor: 1,
		},
	}

	err = conn.CreateTopics(topicConfigs...)
	if err != nil {
		panic(err.Error())
	}

	fmt.Printf("Topic %s created successfully\n", topic)
}

2. 列出所有主题

func listTopics(brokers []string) {
	conn, err := kafka.Dial("tcp", brokers[0])
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()

	partitions, err := conn.ReadPartitions()
	if err != nil {
		panic(err.Error())
	}

	topicMap := make(map[string]struct{})
	for _, p := range partitions {
		topicMap[p.Topic] = struct{}{}
	}

	fmt.Println("Available topics:")
	for topic := range topicMap {
		fmt.Println("-", topic)
	}
}

3. 查看消费者组偏移量

func getConsumerOffsets(brokers []string, groupID string) {
	conn, err := kafka.Dial("tcp", brokers[0])
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()

	partitions, err := conn.ReadPartitions()
	if err != nil {
		panic(err.Error())
	}

	offsets, err := conn.ConsumerOffsets(groupID, partitions)
	if err != nil {
		panic(err.Error())
	}

	for _, p := range partitions {
		fmt.Printf("Topic: %s, Partition: %d, Offset: %d\n",
			p.Topic, p.Partition, offsets[p])
	}
}

五、完整示例:生产消费集成

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	brokers := []string{"localhost:9092"}
	topic := "test-topic"
	groupID := "test-group"

	// 创建主题(如果不存在)
	createTopic(brokers, topic, 3)

	var wg sync.WaitGroup
	wg.Add(2)

	// 启动生产者
	go func() {
		defer wg.Done()
		produce(brokers, topic)
	}()

	// 启动消费者
	go func() {
		defer wg.Done()
		consumeGroup(brokers, topic, groupID)
	}()

	// 等待中断信号
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
	<-sigchan

	fmt.Println("\nShutting down...")
	wg.Wait()
}

六、性能优化建议

  1. 生产者优化​:
    • 调整BatchSizeBatchTimeout平衡延迟和吞吐量
    • 使用异步模式(Async: true)提高吞吐量
    • 根据需求选择适当的RequiredAcks级别
  2. 消费者优化​:
    • 调整MinBytesMaxWait减少请求次数
    • 合理设置CommitInterval避免频繁提交偏移量
    • 使用多个消费者实例并行处理不同分区
  3. 错误处理​:
    • 实现重试机制处理临时性错误
    • 监控消费者滞后情况(Consumer Lag)
    • 处理消息重复和丢失问题

七、常见问题解决

  1. 连接问题​: // 设置连接超时 dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: true, } writer := &kafka.Writer{ Addr: kafka.TCP(brokers...), Topic: topic, Dialer: dialer, }
  2. 消息顺序保证​:
    • 为需要有序的消息使用相同的消息键(Key)
    • 确保这些消息被发送到同一分区
  3. 消费者再平衡处理​: reader := kafka.NewReader(kafka.ReaderConfig{ // ...其他配置 RebalanceTimeout: 60 * time.Second, MaxAttempts: 5, RetentionTime: 24 * time.Hour, })
  4. SSL/TLS配置​: dialer := &kafka.Dialer{ Timeout: 10 * time.Second, TLS: &tls.Config{ InsecureSkipVerify: true, // 仅测试环境使用 }, }

通过这个完整的Kafka Go实现示例,您可以快速构建高效可靠的消息队列系统。根据实际业务需求调整配置参数,并注意处理各种边界情况和错误场景。

© 版权声明
THE END
喜欢就点个赞,支持一下吧!
点赞79 分享
评论 抢沙发
头像
欢迎您留下评论!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容