我正在使用 Kafka-go 库在 Kafka 中构建一个请求-响应设置,使用消息 key 作为相关 ID。 我的设置在没有并发的情况下工作正常,但是当消息开始在单独的 goroutine 中发送时,读取器部分会跳过正确的键(因为其他例程可能已经读取了它)。

考虑到连接由不同的 goroutine 共享,我如何才能只读取主题中的特定键?

下面的客户端示例(为简洁起见删除了错误评估):

package main 
 
import ( 
    "bytes" 
    "context" 
    "fmt" 
    "sync" 
    "time" 
 
    "github.com/google/uuid" 
    kafka "github.com/segmentio/kafka-go" 
) 
 
var wg sync.WaitGroup 
 
func requestMessage(connR *kafka.Conn, connW *kafka.Conn, body []byte, index int) { 
    currentUUID := uuid.New() 
    byteUUID := []byte(fmt.Sprintf("%s", currentUUID)) 
    connW.WriteMessages(kafka.Message{ 
        Key:   byteUUID, 
        Value: body, 
    }) 
    fmt.Println("Posted id " + string(byteUUID)) 
    for { 
        m, _ := connR.ReadMessage(10e6) 
        if bytes.Equal(m.Key, byteUUID) { 
            break 
        } 
    } 
 
    wg.Done() 
    fmt.Println("Done " + string(byteUUID)) 
 
} 
 
func main() { 
    iterations := 100 
    interval := 500 * time.Millisecond 
    kafkaURL := "kafka:9092" 
    topic := "benchmarktopic" 
    partition := 0 
    connW, _ := kafka.DialLeader(context.Background(), "tcp", kafkaURL, topic, partition) 
    defer connW.Close() 
    connR, _ := kafka.DialLeader(context.Background(), "tcp", kafkaURL, topic+"response", partition) 
    defer connR.Close() 
    for i := 0; i < iterations; i++ { 
        <-time.After(interval) 
        go requestMessage(connR, connW, []byte("body"), i) 
        wg.Add(1) 
    } 
    wg.Wait() 
} 

请您参考如下方法:

您不能真正只从 Kafka 主题分区中读取特定键。 问题是您的记录将根据 key 的哈希值(默认行为)分派(dispatch)到特定分区。所以你可能在同一个分区中有不同的键。因此,只要您的键多于分区数,您就会找到一个包含不同键的分区。

我想到的唯一一种方法是为您的主题设置 N 个分区,其中 N 是您可以拥有的不同键的数量(如果您使用 uuid 作为键,这是一个很大的数字)并使用静态分配分区映射(键 ->分区)到您的生产者/消费者。

顺便说一句,您已经将第 0 部分分配给了您的制作人,想知道为什么吗?

亚尼克


评论关闭
IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!