Using Golang (Sarama) to Connect to Kafka
Recently I have been using Go to build an HTTP server that feeds data into Kafka, with a parser that stores the results into a database coming next. While everything is still fresh in my mind, I want to write it down and share it.
Below is how to build an HTTP server in Golang and push the incoming data into Kafka.
First, a quick note: I am using the Sarama package, so install Sarama before you start.
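If you have not installed it yet, the package can be fetched with go get, using the same import path as in the code below:

go get github.com/Shopify/sarama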
// Import the required packages.
package main

import (
	"flag"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/Shopify/sarama"
)
// Connection settings (your_port_number: the port you want to listen on; your_kafka_IP:kafka_port: the Kafka broker address and port to connect to).
var (
	addr    = flag.String("addr", ":your_port_number", "The address to bind to")
	brokers = flag.String("brokers", "your_kafka_IP:kafka_port", "The Kafka brokers to connect to, as a comma separated list")
	verbose = flag.Bool("verbose", false, "Turn on Sarama logging")
)
func main() {
	flag.Parse()

	if *verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	if *brokers == "" {
		flag.PrintDefaults()
		os.Exit(1)
	}

	brokerList := strings.Split(*brokers, ",")
	log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", "))

	server := &Server{
		AccessLogProducer: newAccessLogProducer(brokerList),
	}
	defer func() {
		if err := server.Close(); err != nil {
			log.Println("Failed to close server", err)
		}
	}()

	log.Fatal(server.Run(*addr))
}
type Server struct {
	AccessLogProducer sarama.AsyncProducer
}

func (s *Server) Close() error {
	if err := s.AccessLogProducer.Close(); err != nil {
		log.Println("Failed to shut down access log producer cleanly", err)
	}
	return nil
}

func (s *Server) Handler() http.Handler {
	return s.withAccessLog()
}

func (s *Server) Run(addr string) error {
	httpServer := &http.Server{
		Addr:    addr,
		Handler: s.Handler(),
	}

	log.Printf("Listening for requests on %s...\n", addr)
	return httpServer.ListenAndServe()
}
// Parse the incoming request and send the data into Kafka.
func (s *Server) withAccessLog() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		switch r.URL.Path {
		// URL path (your_path: fill in your own path; this switch lets you handle data arriving on different paths separately).
		case "/your_path":
			// Read the request body.
			bodyData, err := ioutil.ReadAll(r.Body)
			if err != nil {
				http.Error(w, "failed to read request body", http.StatusBadRequest)
				return
			}
			// Send the data into Kafka (your_kafka_topic: fill in your topic).
			s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
				Topic: "your_kafka_topic",
				Value: sarama.StringEncoder(bodyData),
			}
		// Requests whose path is not listed above fall through to this default case.
		default:
			bodyData, err := ioutil.ReadAll(r.Body)
			if err != nil {
				http.Error(w, "failed to read request body", http.StatusBadRequest)
				return
			}
			s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
				Topic: "your_kafka_topic",
				Value: sarama.StringEncoder(bodyData),
			}
		}
	})
}
// Initialise the Kafka producer and its configuration.
func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the partition leader to ack
	config.Producer.Compression = sarama.CompressionSnappy   // Compress messages with Snappy
	config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms

	producer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	// The AsyncProducer reports failures on an error channel; drain it in the
	// background so failed writes are logged instead of blocking the producer.
	go func() {
		for err := range producer.Errors() {
			log.Println("Failed to write access log entry:", err)
		}
	}()

	return producer
}
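To check that the messages really land in Kafka, one quick way is to consume the topic from a separate small Go program. The following is only a minimal sketch for verification (not the parser-to-database part mentioned above), assuming a single-partition topic named your_kafka_topic and a broker at your_kafka_IP:kafka_port; adjust both to match your setup.

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Connect to the same broker the HTTP server produces to.
	consumer, err := sarama.NewConsumer([]string{"your_kafka_IP:kafka_port"}, sarama.NewConfig())
	if err != nil {
		log.Fatalln("Failed to start Sarama consumer:", err)
	}
	defer consumer.Close()

	// Read partition 0 of the topic, starting from the newest offset.
	pc, err := consumer.ConsumePartition("your_kafka_topic", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatalln("Failed to consume partition:", err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		log.Printf("offset %d: %s", msg.Offset, string(msg.Value))
	}
}

With the consumer running, POST some test data to the HTTP server (for example with curl) and you should see it show up in the consumer's log.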