我正在使用 Kafka-go 库在 Kafka 中构建一个请求-响应设置,使用消息 key 作为相关 ID。 我的设置在没有并发的情况下工作正常,但是当消息开始在单独的 goroutine 中发送时,读取器部分会跳过正确的键(因为其他例程可能已经读取了它)。
考虑到连接由不同的 goroutine 共享,我如何才能只读取主题中的特定键?
下面的客户端示例(为简洁起见删除了错误评估):
package main
import (
"bytes"
"context"
"fmt"
"sync"
"time"
"github.com/google/uuid"
kafka "github.com/segmentio/kafka-go"
)
var wg sync.WaitGroup
func requestMessage(connR *kafka.Conn, connW *kafka.Conn, body []byte, index int) {
currentUUID := uuid.New()
byteUUID := []byte(fmt.Sprintf("%s", currentUUID))
connW.WriteMessages(kafka.Message{
Key: byteUUID,
Value: body,
})
fmt.Println("Posted id " + string(byteUUID))
for {
m, _ := connR.ReadMessage(10e6)
if bytes.Equal(m.Key, byteUUID) {
break
}
}
wg.Done()
fmt.Println("Done " + string(byteUUID))
}
func main() {
iterations := 100
interval := 500 * time.Millisecond
kafkaURL := "kafka:9092"
topic := "benchmarktopic"
partition := 0
connW, _ := kafka.DialLeader(context.Background(), "tcp", kafkaURL, topic, partition)
defer connW.Close()
connR, _ := kafka.DialLeader(context.Background(), "tcp", kafkaURL, topic+"response", partition)
defer connR.Close()
for i := 0; i < iterations; i++ {
<-time.After(interval)
go requestMessage(connR, connW, []byte("body"), i)
wg.Add(1)
}
wg.Wait()
}
请您参考如下方法:
您不能真正只从 Kafka 主题分区中读取特定键。 问题是您的记录将根据 key 的哈希值(默认行为)分派(dispatch)到特定分区。所以你可能在同一个分区中有不同的键。因此,只要您的键多于分区数,您就会找到一个包含不同键的分区。
我想到的唯一一种方法是为您的主题设置 N 个分区,其中 N 是您可以拥有的不同键的数量(如果您使用 uuid 作为键,这是一个很大的数字)并使用静态分配分区映射(键 ->分区)到您的生产者/消费者。
顺便说一句,您已经将第 0 部分分配给了您的制作人,想知道为什么吗?
亚尼克






