This project is a pure-Go client library for TubeMQ that does not depend on the TubeMQ C++ library. Production is not supported yet.
Import the client and config libraries:
import "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client"
import "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/config"
Import the log library if logging is needed:
import "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log"
Create a Consumer by parsing address:
// Build the consumer config from a single address string.
cfg, err := config.ParseAddress("127.0.0.1:8099?topic=test_1&group=test_group")
if err != nil {
	// fmt.Errorf only builds an error value; pass it to panic so the
	// message is actually reported instead of being silently dropped.
	panic(fmt.Errorf("Failed to parse address %s", err.Error()))
}

c, err := client.NewConsumer(cfg)
if err != nil {
	panic(fmt.Errorf("new consumer error %s", err.Error()))
}
defer c.Close()

cr, err := c.GetMessage()
if err != nil {
	// cr must not be used when GetMessage fails.
	panic(err)
}

// need to confirm by yourself.
if _, err = c.Confirm(cr.ConfirmContext, true); err != nil {
	panic(err)
}

for _, msg := range cr.Messages {
	fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID, string(msg.Data))
}
Create a Consumer by constructing a config:
// Per-topic filter lists passed to config.WithTopicFilters.
topicFilters := map[string][]string{
	"topic1": {"filter1", "filter2"},
	"topic2": {"filter3", "filter4"},
}
// Offsets keyed by partition, passed to config.WithBoundConsume.
partitionOffset := map[string]int64{
	"181895251:topic1:1": 0,
	"181895251:topic2:2": 10,
}

cfg := config.New(
	config.WithMasters("127.0.0.1:8099"),
	config.WithGroup("group"),
	// For topic filters
	config.WithTopicFilters(topicFilters),
	// For bound consume
	config.WithBoundConsume("ss", 1, true, partitionOffset),
)

c, err := client.NewConsumer(cfg)
if err != nil {
	// fmt.Errorf only builds an error value; pass it to panic so the
	// message is actually reported instead of being silently dropped.
	panic(fmt.Errorf("new consumer error %s", err.Error()))
}
defer c.Close()

cr, err := c.GetMessage()
if err != nil {
	// cr must not be used when GetMessage fails.
	panic(err)
}

// need to confirm by yourself.
if _, err = c.Confirm(cr.ConfirmContext, true); err != nil {
	panic(err)
}

for _, msg := range cr.Messages {
	fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID, string(msg.Data))
}
Consumption from multiple goroutines is also supported. For more specific examples, refer to the Go Client Examples.