blob: a4e954e8b65705b40868cd1363da79e8909fb0e5 [file] [view]
## How to use
### go mod
```
require (
github.com/apache/rocketmq-client-go/v2 v2.1.0-rc3
)
```
### Set Logger
Go Client define the `Logger` interface for log output, user can specify implementation of private.
in default, client use `logrus`.
```
rlog.SetLogger(Logger)
```
### Send message
#### Interface
```
Producer interface {
Start() error
Shutdown() error
SendSync(context.Context, *primitive.Message) (*internal.SendResult, error)
SendOneWay(context.Context, *primitive.Message) error
}
```
#### Examples
- create a new `Producer` instance
```
p, err := rocketmq.NewProducer(
producer.WithNameServer(endPoint),
//producer.WithNsResolver(primitive.NewPassthroughResolver(endPoint)),
producer.WithRetry(2),
producer.WithGroupName("GID_xxxxxx"),
)
```
- start the producer
```go
err := p.Start()
```
- send message with sync
```
result, err := p.SendSync(context.Background(), &primitive.Message{
Topic: "test",
Body: []byte("Hello RocketMQ Go Client!"),
})
// do something with result
```
- or send message with oneway
```
err := p.SendOneWay(context.Background(), &primitive.Message{
Topic: "test",
Body: []byte("Hello RocketMQ Go Client!"),
})
```
Full examples: [producer](../examples/producer)
### Consume Message
now only support `PushConsumer`
#### Interface
```
PushConsumer interface {
// Start the PullConsumer for consuming message
Start() error
// Shutdown the PullConsumer, all offset of MessageQueue will be sync to broker before process exit
Shutdown() error
// Subscribe a topic for consuming
Subscribe(topic string, selector consumer.MessageSelector,
f func(context.Context, ...*primitive.MessageExt) (consumer.ConsumeResult, error)) error
}
```
#### Usage
- Create a `PushConsumer` instance
```
c, err := rocketmq.NewPushConsumer(
consumer.WithNameServer(endPoint),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithGroupName("GID_XXXXXX"),
)
```
- Subscribe a topic(only support one topic now), and define your consuming function
```
err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
msgs []*primitive.MessageExt) (consumer.ConsumeResult, error) {
fmt.Printf("subscribe callback: %v \n", msgs)
return consumer.ConsumeSuccess, nil
})
```
- start the consumer(**NOTE: MUST after subscribe**)
```
err = c.Start()
```
Full examples: [consumer](../examples/consumer)
### Admin: Topic Operation
#### Examples
- create topic
```
testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})))
err = testAdmin.CreateTopic(
context.Background(),
admin.WithTopicCreate("newTopic"),
admin.WithBrokerAddrCreate("127.0.0.1:10911"),
)
```
- delete topic
`ClusterName` not supported yet
```
err = testAdmin.DeleteTopic(
context.Background(),
admin.WithTopicDelete("newTopic"),
//admin.WithBrokerAddrDelete("127.0.0.1:10911"), //optional
//admin.WithNameSrvAddr(nameSrvAddr), //optional
)
```