好湿?好紧?好多水好爽自慰,久久久噜久噜久久综合,成人做爰A片免费看黄冈,机机对机机30分钟无遮挡

主頁 > 知識庫 > golang如何使用sarama訪問kafka

golang如何使用sarama訪問kafka

熱門標簽:西部云谷一期地圖標注 浙江高速公路地圖標注 南通如皋申請開通400電話 中國地圖標注省會高清 學海導航地圖標注 地圖標注的汽車標 江西轉化率高的羿智云外呼系統 高德地圖標注口訣 廣州呼叫中心外呼系統

下面一個客戶端代碼例子訪問kafka服務器,來發送和接受消息。

使用方式

1、命令行參數

$ ./kafkaclient -h
Usage of ./client:
 -ca string
  CA Certificate (default "ca.pem")
 -cert string
  Client Certificate (default "cert.pem")
 -command string
  consumer|producer (default "consumer")
 -host string
  Common separated kafka hosts (default "localhost:9093")
 -key string
  Client Key (default "key.pem")
 -partition int
  Kafka topic partition
 -tls
  TLS enable
 -topic string
  Kafka topic (default "test--topic")

2、作為producer啟動

$ ./kafkaclient -command producer \

 -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command producer \

 -tls -cert client.pem -key client.key -ca ca.pem \

 -host kafka1:9093,kafka2:9093

producer發送消息給kafka:

> aaa
2018/12/15 07:11:21 Produced message: [aaa]
> bbb
2018/12/15 07:11:30 Produced message: [bbb]
> quit

3、作為consumer啟動

$ ./kafkaclient -command consumer \

 -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command consumer \

 -tls -cert client.pem -key client.key -ca ca.pem \

 -host kafka1:9093,kafka2:9093

consumer從kafka接受消息:

2018/12/15 07:11:21 Consumed message: [aaa], offset: [4]
2018/12/15 07:11:30 Consumed message: [bbb], offset: [5]

完整源代碼如下

這個代碼使用到了Shopify/sarama庫,請自行下載使用。

$ cat kafkaclient.go
package main

import (
 "flag"
 "fmt"
 "log"
 "os"
 "io/ioutil"
 "bufio"
 "strings"

 "crypto/tls"
 "crypto/x509"

 "github.com/Shopify/sarama"
)

var (
 command  string
 tlsEnable bool
 hosts  string
 topic  string
 partition int
 clientcert string
 clientkey string
 cacert  string
)

func main() {
 flag.StringVar(command, "command",  "consumer",   "consumer|producer")
 flag.BoolVar(tlsEnable, "tls",   false,    "TLS enable")
 flag.StringVar(hosts,  "host",   "localhost:9093", "Common separated kafka hosts")
 flag.StringVar(topic,  "topic",  "test--topic",  "Kafka topic")
 flag.IntVar(partition,  "partition", 0,     "Kafka topic partition")
 flag.StringVar(clientcert, "cert",   "cert.pem",   "Client Certificate")
 flag.StringVar(clientkey, "key",   "key.pem",   "Client Key")
 flag.StringVar(cacert,  "ca",   "ca.pem",   "CA Certificate")
 flag.Parse()

 config := sarama.NewConfig()
 if tlsEnable {
  //sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
  tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
  if err != nil {
   log.Fatal(err)
  }

  config.Net.TLS.Enable = true
  config.Net.TLS.Config = tlsConfig
 }
 client, err := sarama.NewClient(strings.Split(hosts, ","), config)
 if err != nil {
  log.Fatalf("unable to create kafka client: %q", err)
 }

 if command == "consumer" {
  consumer, err := sarama.NewConsumerFromClient(client)
  if err != nil {
   log.Fatal(err)
  }
  defer consumer.Close()
  loopConsumer(consumer, topic, partition)
 } else {
  producer, err := sarama.NewAsyncProducerFromClient(client)
  if err != nil {
   log.Fatal(err)
  }
  defer producer.Close()
  loopProducer(producer, topic, partition)
 }
}

func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) {
 // load client cert
 clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile)
 if err != nil {
  return nil, err
 }

 // load ca cert pool
 cacert, err := ioutil.ReadFile(cacertfile)
 if err != nil {
  return nil, err
 }
 cacertpool := x509.NewCertPool()
 cacertpool.AppendCertsFromPEM(cacert)

 // generate tlcconfig
 tlsConfig := tls.Config{}
 tlsConfig.RootCAs = cacertpool
 tlsConfig.Certificates = []tls.Certificate{clientcert}
 tlsConfig.BuildNameToCertificate()
 // tlsConfig.InsecureSkipVerify = true // This can be used on test server if domain does not match cert:
 return tlsConfig, err
}

func loopProducer(producer sarama.AsyncProducer, topic string, partition int) {
 scanner := bufio.NewScanner(os.Stdin)
 fmt.Print("> ")
 for scanner.Scan() {
  text := scanner.Text()
  if text == "" {
  } else if text == "exit" || text == "quit" {
   break
  } else {
   producer.Input() - sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
   log.Printf("Produced message: [%s]\n",text)
  }
  fmt.Print("> ")
 }
}

func loopConsumer(consumer sarama.Consumer, topic string, partition int) {
 partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
 if err != nil {
  log.Println(err)
  return
 }
 defer partitionConsumer.Close()

 for {
  msg := -partitionConsumer.Messages()
  log.Printf("Consumed message: [%s], offset: [%d]\n", msg.Value, msg.Offset)
 }
}

編譯:

$ go build kafkaclient.go

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

您可能感興趣的文章:
  • 在Golang中使用http.FileServer返回靜態文件的操作
  • 解決golang http.FileServer 遇到的坑
  • 解決golang處理http response碰到的問題和需要注意的點
  • golang bad file descriptor問題的解決方法
  • golang復用http.request.body的方法示例
  • golang連接kafka消費進ES操作

標簽:吐魯番 許昌 保定 東營 德宏 曲靖 貴州 常州

巨人網絡通訊聲明:本文標題《golang如何使用sarama訪問kafka》,本文關鍵詞  golang,如何,使用,sarama,訪問,;如發現本文內容存在版權問題,煩請提供相關信息告之我們,我們將及時溝通與處理。本站內容系統采集于網絡,涉及言論、版權與本站無關。
  • 相關文章
  • 下面列出與本文章《golang如何使用sarama訪問kafka》相關的同類信息!
  • 本頁收集關于golang如何使用sarama訪問kafka的相關信息資訊供網民參考!
  • 推薦文章
    主站蜘蛛池模板: 污污在线观看网站| 你的欲梦喷水视频| 超薄透明乳罩情趣丁字内裤| A级毛片高清免费观看| 国产精品国产精品国产| 绫清竹的堕落h嗯啊好深啊| 黄色软件下载链接| 成人一级视频| 国产精品久久久久无码AV铜川| 免费jzzjzz在线播放视频| 成人国产在线视频| 午夜福利小视频在线观看| 美女mm视频| 淫欲的代价完整版| 国产理论最新国产精品视频| 女人zooxx禽交vid| 99久久就热视颖精品98| 国产?麻豆?女教师张津瑜| 国产天堂在线一区二区三区| 欧美一级纶理片免费| 激情爱爱动态图a| 工本口子库入口免费密码在哪| 再深点灬舒服灬受不了了APP冫| jk制服女装| 教坊司玉势深处调教| 美女的私密秘?撒尿无痕| 小雪爽?好大?快?深点| 成年人啪啪| 合作市| 97国产精品久久碰碰蜜臀小说| 姐妹的诱惑在线观看| 日韩精品高分影片| 亚洲日韩精品秘?在线观看| 成人网站?视频免费涩涩屋漫画| 三级黄色毛片网站| 国产免费黄| 美国一级大黄一片高清| 国产看真人毛片爱做A片| 美女走光| 亚洲狠狠干| 日本又爽又激情A片|