使用Golang(Sarama)連接Kafka

近期在使用Go建立Http Server到連接Kafka,接著還要做Parser存進資料庫,趁著技藝還是熱騰騰時來記錄一下,跟大家分享

下面是使用Golang 建立Http Server 並將資料傳入Kafka的方法

首先說明一下,我是用的是Sarama的Package,所以大家可以先安裝Sarama

 

// 這邊先import需要的package

package main

import (
    "github.com/Shopify/sarama"
    "flag"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "strings"
    "time"
    "fmt"
    "io"
    "bufio"
    "strconv"
)
// 相關連線設定(your_port_number:設定想監聽的port、your_kafka_IP:kafka_port設定連接的Kafka和port)
var (
    addr      = flag.String("addr", ":your_port_number", "The address to bind to")
    brokers   = flag.String("brokers", "your_kafka_IP:kafka_port", "The Kafka brokers to connect to, as a comma separated list")
    verbose   = flag.Bool("verbose", false, "Turn on Sarama logging")
)

func main() {
    flag.Parse()

    if *verbose {
        sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
    }

    if *brokers == "" {
        flag.PrintDefaults()
        os.Exit(1)
    }

    brokerList := strings.Split(*brokers, ",")
    log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", "))

    server := &Server{
        AccessLogProducer: newAccessLogProducer(brokerList),
    }
    defer func() {
        if err := server.Close(); err != nil {
            log.Println("Failed to close server", err)
        }
    }()

    log.Fatal(server.Run(*addr))
}

type Server struct {
    AccessLogProducer sarama.AsyncProducer
}

func (s *Server) Close() error {
    if err := s.AccessLogProducer.Close(); err != nil {
        log.Println("Failed to shut down access log producer cleanly", err)
    }
    return nil
}

func (s *Server) Handler() http.Handler {
    return s.withAccessLog()
}

func (s *Server) Run(addr string) error {
    httpServer := &http.Server{
        Addr:    addr,
        Handler: s.Handler(),
    }

    log.Printf("Listening for requests on %s...\n", addr)
    return httpServer.ListenAndServe()
}
// 執行資料解析與送入Kafka
func (s *Server) withAccessLog() http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        switch r.URL.Path {
            // 網址路徑(your_path:填入您的網址,可用此方法分別處理來自不同路徑的資料)
            case "/your_path":
                // 解析收到的資料
                bodyData, _ := ioutil.ReadAll(r.Body)
                // 送入Kafka(yout_kafka_topic:填入您的Topic即可)
                s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
                    Topic: "yout_kafka_topic",
                    Value: sarama.StringEncoder(bodyData),
                }

             // 如果未指定路徑,資料將會進入預設的方法
            default:

                bodyData, _ := ioutil.ReadAll(r.Body)
                s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
                    Topic: "yout_kafka_topic",
                    Value: sarama.StringEncoder(bodyData),
                }
        }
    })
}

// 初始化Kafka Producer的相關設定
func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {

    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForLocal       
    config.Producer.Compression = sarama.CompressionSnappy  
    config.Producer.Flush.Frequency = 500 * time.Millisecond 

    producer, err := sarama.NewAsyncProducer(brokerList, config)
    if err != nil {
        log.Fatalln("Failed to start Sarama producer:", err)
    }

    go func() {
        for err := range producer.Errors() {
            log.Println("Failed to write access log entry:", err)
        }
    }()

    return producer
}

 

arrow
arrow
    全站熱搜

    newaurora 發表在 痞客邦 留言(0) 人氣()