tree: 5d9dcda874bafc7d9aff44320e9b1fe5828af878 [path history] [tgz]
  1. client/
  2. codec/
  3. config/
  4. errs/
  5. example/
  6. flowctrl/
  7. log/
  8. metadata/
  9. multiplexing/
  10. protocol/
  11. remote/
  12. rpc/
  13. selector/
  14. sub/
  15. tdmsg/
  16. transport/
  17. util/
  18. go.mod
  19. go.sum
  20. README.md
inlong-tubemq/tubemq-client-twins/tubemq-client-go/README.md

Go Report Card Language

TubeMQ Go Client Library

Goal

This project is a pure-Go client library for TubeMQ that does not depend on the TubeMQ C++ library. It is not yet ready for production use.

Requirements

  • Go 1.11+

Usage

Import the client and config libraries:

import "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client"
import "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/config"

Import the log library if logging is needed:

import "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log"

Create a Consumer by parsing an address:

cfg, err := config.ParseAddress("127.0.0.1:8099?topic=test_1&group=test_group")
if err != nil {
	fmt.Errorf("Failed to parse address %s", err.Error())
	panic(err)
}
c, err := client.NewConsumer(cfg)
if err != nil {
	fmt.Errorf("new consumer error %s", err.Error())
	panic(err)
}

defer c.Close()

cr, err := c.GetMessage()
// need to confirm by yourself.
_, err = c.Confirm(cr.ConfirmContext, true)

for _, msg := range cr.Messages {
	fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID, string(msg.Data))
}

Create a Consumer by constructing a config:

topicFilters := map[string][]string{"topic1": {"filter1", "filter2"}, "topic2": {"filter3", "filter4"}}
partitionOffset := map[string]int64{"181895251:topic1:1": 0, "181895251:topic2:2": 10}
cfg := config.New(config.WithMasters("127.0.0.1:8099"),
config.WithGroup("group"),
//For topic filters
config.WithTopicFilters(topicFilters),
// For bound consume
config.WithBoundConsume("ss", 1, true, partitionOffset))
c, err := client.NewConsumer(cfg)
if err != nil {
	fmt.Errorf("new consumer error %s", err.Error())
	panic(err)
}

defer c.Close()

cr, err := c.GetMessage()
// need to confirm by yourself.
_, err = c.Confirm(cr.ConfirmContext, true)

for _, msg := range cr.Messages {
	fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID, string(msg.Data))
}

Example

Consumption from multiple goroutines is also supported. For more detailed examples, refer to Go Client Examples.