我需要使用golang来访问kafka,所以我在docker中安装了kafka和zookepper。

1.这里是kafka安装脚本:

# pull images 
docker pull wurstmeister/zookeeper  
docker pull wurstmeister/kafka 
 
# run kafka & zookepper 
docker run -d --name zookeeper -p 2181 -t wurstmeister/zookeeper   
docker run --name kafka -e HOST_IP=localhost -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_BROKER_ID=1 -e ZK=zk -p 9092:9092 --link zookeeper:zk -t wurstmeister/kafka   
 
# enter container 
docker exec -it ${CONTAINER ID} /bin/bash   
cd opt/kafka_2.11-0.10.1.1/  
 
# make a tpoic 
bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic mykafka  
 
# start a producer in terminal-1 
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka  
 
# start another terminal-2 and start a consumer 
bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic mykafka --from-beginning  

当我在生产者中输入一些消息时,消费者会立即得到它。 所以我认为 kafka 工作正常

2.现在我需要用golang创建一个consumer来访问kafka。

这是我的 golang 演示代码:

import "github.com/bsm/sarama-cluster" 
func Consumer(){ 
    // init (custom) config, enable errors and notifications 
    config := cluster.NewConfig() 
    config.Consumer.Return.Errors = true 
    config.Group.Return.Notifications = true 
 
    // init consumer 
    brokers := []string{"192.168.9.100:9092"} 
    topics := []string{"mykafka"} 
    consumer, err := cluster.NewConsumer(brokers, "my-group-id", topics, config) 
    if err != nil { 
        panic(err) 
    } 
    defer consumer.Close() 
 
    // trap SIGINT to trigger a shutdown. 
    signals := make(chan os.Signal, 1) 
    signal.Notify(signals, os.Interrupt) 
 
    // consume messages, watch errors and notifications 
    for { 
        select { 
        case msg, more := <-consumer.Messages(): 
            if more { 
                fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value) 
                consumer.MarkOffset(msg, "")    // mark message as processed 
            } 
        case err, more := <-consumer.Errors(): 
            if more { 
                log.Printf("Error: %s\n", err.Error()) 
            } 
        case ntf, more := <-consumer.Notifications(): 
            if more { 
                log.Printf("Rebalanced: %+v\n", ntf) 
            } 
        case <-signals: 
            return 
    } 
} 

实际上这个演示代码是从github repo的演示中复制的:sarama-cluster

运行代码时出现错误:

kafka: client has run out of available brokers to talk to (Is your cluster reachable?) 

我在启动kafka时确实使用了端口映射,但在golang中无法访问它

有没有办法用curl访问kafka? 我试过:

curl http://192.168.99.10:9092 

而kafka报错:

[2017-08-02 06:39:15,232] WARN Unexpected error from /192.168.99.1; closing connection (org.apache.kafka.common.network.Selector) 
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600) 
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:95) 
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75) 
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203) 
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167) 
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:379) 
    at org.apache.kafka.common.network.Selector.poll(Selector.java:326) 
    at kafka.network.Processor.poll(SocketServer.scala:499) 
    at kafka.network.Processor.run(SocketServer.scala:435) 
    at java.lang.Thread.run(Thread.java:748) 

顺便说一句:

我用的是windows 7

dcoker机器的ip :192.168.99.100

这让我发疯

有什么建议或解决方案吗?欣赏!!!

请您参考如下方法:

如果你想创建一个消费者来听取来自 Kafka 的主题,让我们尝试一下。 我使用了教程中的 confluent-kafka-go:https://github.com/confluentinc/confluent-kafka-go

这是 main.go 文件中的代码:

import ( 
    "fmt" 
    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" 
) 
 
func main() { 
 
    c, err := kafka.NewConsumer(&kafka.ConfigMap{ 
        "bootstrap.servers": "localhost", 
        "group.id":          "myGroup", 
        "auto.offset.reset": "earliest", 
    }) 
 
    if err != nil { 
        panic(err) 
    } 
 
    c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil) 
 
    for { 
        msg, err := c.ReadMessage(-1) 
        if err == nil { 
            fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) 
        } else { 
            // The client will automatically try to recover from all errors. 
            fmt.Printf("Consumer error: %v (%v)\n", err, msg) 
        } 
    } 
 
    c.Close() 
} 

如果你使用 docker 构建:按照这个注释添加合适的包

对于基于 Debian 和 Ubuntu 的发行版,从标准存储库或使用 Confluent 的 Deb 存储库安装 librdkafka-dev。

对于基于 Redhat 的发行版,使用 Confluent 的 YUM 存储库安装 librdkafka-devel。

对于 MacOS X,从 Homebrew 安装 librdkafka。如果您还没有 brew install pkg-config,您可能还需要它。 brew 安装 librdkafka pkg-config。

对于 Alpine:apk 添加 librdkafka-dev pkgconf Windows 不支持 confluent-kafka-go。

With Alpine, please remember that install the community version, because it cannot install librdkafka with max version 1.1.0 (not use Alpine community version)

祝你好运!


评论关闭
IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!