第一批:- 我正在尝试从 100 个平面文件中提取数据并将其加载到一个数组中,然后将它们作为字节数组一个一个地插入到 kafka 生产者中。
第二批:- 我从 kafka 消费者消费,然后将它们插入 NoSQL 数据库。
我在 Kafka 的 shopify sarama golang 包的配置文件中使用了 Offsetnewset。
我可以接收消息并将消息插入到 kafka,但在消费时我只收到第一条消息。因为我在 sarama 配置中提供了最新的 Offset。 我怎样才能得到这里的所有数据。
请您参考如下方法:
如果没有任何代码或关于如何配置 kafka 的更深入的解释(即:主题、分区等),很难说出一些事情,所以我想到的快速检查很少:
假设您在开始制作之前开始使用 OffsetNewest 集进行消费,可能发生的一件事是您没有从该主题的所有分区中消费,关于 sarama 文档,您必须通过创建 PartitionConsumers 显式地使用每个分区。来自 https://godoc.org/github.com/Shopify/sarama#Consumer 中的示例:
partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest) if err != nil { panic(err) } ... consumed := 0 ConsumerLoop: for { select { case msg := <-partitionConsumer.Messages(): log.Printf("Consumed message offset %d\n", msg.Offset) consumed++ case <-signals: break ConsumerLoop } }事实上,您在生成所有事件后开始消费,因此,读取所有事件的指针不是 OffsetNewest,而是 OffsetOldest。
<
很抱歉无法为您提供更有用的答案,但也许如果您粘贴一些代码或提供更多详细信息,我们可以提供更多帮助。






