IT虾米网

Golang Kafka 不消耗所有消息 offsetnewest

third_qq_acbf90bbd2dede1d 2023年11月22日 编程语言 252 0

第一批:- 我正在尝试从 100 个平面文件中提取数据并将其加载到一个数组中,然后将它们作为字节数组一个一个地插入到 kafka 生产者中。

第二批:- 我从 kafka 消费者消费,然后将它们插入 NoSQL 数据库。

我在 Kafka 的 shopify sarama golang 包的配置文件中使用了 Offsetnewset。

我可以接收消息并将消息插入到 kafka,但在消费时我只收到第一条消息。因为我在 sarama 配置中提供了最新的 Offset。 我怎样才能得到这里的所有数据。

请您参考如下方法:

如果没有任何代码或关于如何配置 kafka 的更深入的解释(即:主题、分区等),很难说出一些事情,所以我想到的快速检查很少:

  1. 假设您在开始制作之前开始使用 OffsetNewest 集进行消费,可能发生的一件事是您没有从该主题的所有分区中消费,关于 sarama 文档,您必须通过创建 PartitionConsumers 显式地使用每个分区。来自 https://godoc.org/github.com/Shopify/sarama#Consumer 中的示例:

    partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest) 
    if err != nil { 
        panic(err) 
    } 
     
    ... 
     
    consumed := 0 
    ConsumerLoop: 
    for { 
        select { 
        case msg := <-partitionConsumer.Messages(): 
            log.Printf("Consumed message offset %d\n", msg.Offset) 
            consumed++ 
        case <-signals: 
            break ConsumerLoop 
        } 
    } 
    
  2. 事实上,您在生成所有事件后开始消费,因此,读取所有事件的指针不是 OffsetNewest,而是 OffsetOldest。

    <

很抱歉无法为您提供更有用的答案,但也许如果您粘贴一些代码或提供更多详细信息,我们可以提供更多帮助。


评论关闭
IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!