How to use

go mod

require (
    github.com/apache/rocketmq-client-go/v2 v2.1.0-rc3
)

Set Logger

Go Client define the Logger interface for log output, user can specify implementation of private. in default, client use logrus.

rlog.SetLogger(Logger)

Send message

Interface

Producer interface {
	Start() error
	Shutdown() error
	SendSync(context.Context, *primitive.Message) (*internal.SendResult, error)
	SendOneWay(context.Context, *primitive.Message) error
}

Examples

  • create a new Producer instance
p, err := rocketmq.NewProducer(
		producer.WithNameServer(endPoint),
		//producer.WithNsResolver(primitive.NewPassthroughResolver(endPoint)),
		producer.WithRetry(2),
		producer.WithGroupName("GID_xxxxxx"),
	)
  • start the producer
err := p.Start()
  • send message with sync
result, err := p.SendSync(context.Background(), &primitive.Message{
    Topic: "test",
    Body:  []byte("Hello RocketMQ Go Client!"),
})

// do something with result
  • or send message with oneway
err := p.SendOneWay(context.Background(), &primitive.Message{
    Topic: "test",
    Body:  []byte("Hello RocketMQ Go Client!"),
})

Full examples: producer

Consume Message

now only support PushConsumer

Interface

PushConsumer interface {
	// Start the PullConsumer for consuming message
	Start() error

	// Shutdown the PullConsumer, all offset of MessageQueue will be sync to broker before process exit
	Shutdown() error
	// Subscribe a topic for consuming
	Subscribe(topic string, selector consumer.MessageSelector,
		f func(context.Context, ...*primitive.MessageExt) (consumer.ConsumeResult, error)) error
}

Usage

  • Create a PushConsumer instance
c, err := rocketmq.NewPushConsumer(
		consumer.WithNameServer(endPoint),
        consumer.WithConsumerModel(consumer.Clustering),
		consumer.WithGroupName("GID_XXXXXX"),
	)
  • Subscribe a topic(only support one topic now), and define your consuming function
err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
    msgs []*primitive.MessageExt) (consumer.ConsumeResult, error) {
    fmt.Printf("subscribe callback: %v \n", msgs)
    return consumer.ConsumeSuccess, nil
})
  • start the consumer(NOTE: MUST after subscribe)
err = c.Start()

Full examples: consumer

Admin: Topic Operation

Examples

  • create topic
testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})))
err = testAdmin.CreateTopic(
	context.Background(),
	admin.WithTopicCreate("newTopic"),
	admin.WithBrokerAddrCreate("127.0.0.1:10911"),
)
  • delete topic ClusterName not supported yet
err = testAdmin.DeleteTopic(
	context.Background(),
	admin.WithTopicDelete("newTopic"),
	//admin.WithBrokerAddrDelete("127.0.0.1:10911"),	//optional
	//admin.WithNameSrvAddr(nameSrvAddr),				//optional
)