require ( github.com/apache/rocketmq-client-go/v2 v2.1.0-rc3 )
The Go client defines the Logger interface for log output; users can supply their own implementation. By default, the client uses logrus.
rlog.SetLogger(Logger)
Producer interface { Start() error Shutdown() error SendSync(context.Context, *primitive.Message) (*internal.SendResult, error) SendOneWay(context.Context, *primitive.Message) error }
Create a new Producer instance:
p, err := rocketmq.NewProducer(
	producer.WithNameServer(endPoint),
	//producer.WithNsResolver(primitive.NewPassthroughResolver(endPoint)),
	producer.WithRetry(2),
	producer.WithGroupName("GID_xxxxxx"),
)
err := p.Start()
result, err := p.SendSync(context.Background(), &primitive.Message{ Topic: "test", Body: []byte("Hello RocketMQ Go Client!"), }) // do something with result
err := p.SendOneWay(context.Background(), &primitive.Message{ Topic: "test", Body: []byte("Hello RocketMQ Go Client!"), })
Full examples: producer
Currently, only PushConsumer is supported.
PushConsumer interface {
	// Start the PushConsumer for consuming messages
	Start() error

	// Shutdown the PushConsumer; all MessageQueue offsets will be synced to the broker before the process exits
	Shutdown() error

	// Subscribe to a topic for consuming
	Subscribe(topic string, selector consumer.MessageSelector, f func(context.Context, ...*primitive.MessageExt) (consumer.ConsumeResult, error)) error
}
Create a new PushConsumer instance:
c, err := rocketmq.NewPushConsumer(
	consumer.WithNameServer(endPoint),
	consumer.WithConsumerModel(consumer.Clustering),
	consumer.WithGroupName("GID_XXXXXX"),
)
err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
	fmt.Printf("subscribe callback: %v \n", msgs)
	return consumer.ConsumeSuccess, nil
})
err = c.Start()
Full examples: consumer
testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})))
err = testAdmin.CreateTopic(
	context.Background(),
	admin.WithTopicCreate("newTopic"),
	admin.WithBrokerAddrCreate("127.0.0.1:10911"),
)
ClusterName is not supported yet.
err = testAdmin.DeleteTopic(
	context.Background(),
	admin.WithTopicDelete("newTopic"),
	//admin.WithBrokerAddrDelete("127.0.0.1:10911"), //optional
	//admin.WithNameSrvAddr(nameSrvAddr), //optional
)