feat(protocol): unify name of request header (#284)
- unify request header format: XXXRequestHeader
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 55fe08f..f6a0bcd 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -828,7 +828,7 @@
queue.BrokerName, brokerResult.BrokerVersion, data.ExpType)
}
- pullRequest := &internal.PullMessageRequest{
+ pullRequest := &internal.PullMessageRequestHeader{
ConsumerGroup: dc.consumerGroup,
Topic: queue.Topic,
QueueId: int32(queue.QueueId),
@@ -898,7 +898,7 @@
}
if brokerAddr != "" {
- req := &internal.GetConsumerList{
+ req := &internal.GetConsumerListRequestHeader{
ConsumerGroup: dc.consumerGroup,
}
cmd := remote.NewRemotingCommand(internal.ReqGetConsumerListByGroup, req, nil)
@@ -937,7 +937,7 @@
return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
}
- request := &internal.GetMaxOffsetRequest{
+ request := &internal.GetMaxOffsetRequestHeader{
Topic: mq.Topic,
QueueId: mq.QueueId,
}
@@ -966,7 +966,7 @@
return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
}
- request := &internal.SearchOffsetRequest{
+ request := &internal.SearchOffsetRequestHeader{
Topic: mq.Topic,
QueueId: mq.QueueId,
Timestamp: timestamp,
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 2db940c..e86ec3b 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -330,7 +330,7 @@
if broker == "" {
return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName)
}
- queryOffsetRequest := &internal.QueryConsumerOffsetRequest{
+ queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{
ConsumerGroup: group,
Topic: mq.Topic,
QueueId: mq.QueueId,
@@ -363,7 +363,7 @@
return fmt.Errorf("broker: %s address not found", mq.BrokerName)
}
- updateOffsetRequest := &internal.UpdateConsumerOffsetRequest{
+ updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{
ConsumerGroup: group,
Topic: mq.Topic,
QueueId: mq.QueueId,
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index d75f7da..600f4ed 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -500,7 +500,7 @@
sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression != "", classFilter)
- pullRequest := &internal.PullMessageRequest{
+ pullRequest := &internal.PullMessageRequestHeader{
ConsumerGroup: pc.consumerGroup,
Topic: request.mq.Topic,
QueueId: int32(request.mq.QueueId),
@@ -616,7 +616,7 @@
}
func (pc *pushConsumer) buildSendBackRequest(msg *primitive.MessageExt, delayLevel int) *remote.RemotingCommand {
- req := &internal.ConsumerSendMsgBackRequest{
+ req := &internal.ConsumerSendMsgBackRequestHeader{
Group: pc.consumerGroup,
OriginTopic: msg.Topic,
Offset: msg.CommitLogOffset,
diff --git a/internal/client.go b/internal/client.go
index e9cc59a..ee77b97 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -143,8 +143,8 @@
RegisterConsumer(group string, consumer InnerConsumer) error
UnregisterConsumer(group string)
- PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error)
- PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *primitive.PullResult)) error
+ PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error)
+ PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(result *primitive.PullResult)) error
RebalanceImmediately()
UpdatePublishInfo(topic string, data *TopicRouteData)
}
@@ -465,7 +465,7 @@
}
// PullMessage with sync
-func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error) {
+func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error) {
cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd, 10*time.Second)
if err != nil {
@@ -520,7 +520,7 @@
}
// PullMessageAsync pull message async
-func (c *rmqClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *primitive.PullResult)) error {
+func (c *rmqClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(result *primitive.PullResult)) error {
return nil
}
diff --git a/internal/mock_client.go b/internal/mock_client.go
index a216e93..730c073 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -363,7 +363,7 @@
}
// PullMessage mocks base method
-func (m *MockRMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error) {
+func (m *MockRMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error) {
ret := m.ctrl.Call(m, "PullMessage", ctx, brokerAddrs, request)
ret0, _ := ret[0].(*primitive.PullResult)
ret1, _ := ret[1].(error)
@@ -376,7 +376,7 @@
}
// PullMessageAsync mocks base method
-func (m *MockRMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(*primitive.PullResult)) error {
+func (m *MockRMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(*primitive.PullResult)) error {
ret := m.ctrl.Call(m, "PullMessageAsync", ctx, brokerAddrs, request, f)
ret0, _ := ret[0].(error)
return ret0
diff --git a/internal/request.go b/internal/request.go
index 3dce875..50af13a 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -45,7 +45,7 @@
ReqConsumeMessageDirectly = int16(309)
)
-type SendMessageRequest struct {
+type SendMessageRequestHeader struct {
ProducerGroup string `json:"producerGroup"`
Topic string `json:"topic"`
QueueId int `json:"queueId"`
@@ -59,7 +59,7 @@
Batch bool
}
-func (request *SendMessageRequest) Encode() map[string]string {
+func (request *SendMessageRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["producerGroup"] = request.ProducerGroup
maps["topic"] = request.Topic
@@ -78,7 +78,7 @@
return maps
}
-func (request *SendMessageRequest) Decode(properties map[string]string) error {
+func (request *SendMessageRequestHeader) Decode(properties map[string]string) error {
return nil
}
@@ -144,7 +144,7 @@
}
}
-type ConsumerSendMsgBackRequest struct {
+type ConsumerSendMsgBackRequestHeader struct {
Group string `json:"group"`
Offset int64 `json:"offset"`
DelayLevel int `json:"delayLevel"`
@@ -154,7 +154,7 @@
MaxReconsumeTimes int32 `json:"maxReconsumeTimes"`
}
-func (request *ConsumerSendMsgBackRequest) Encode() map[string]string {
+func (request *ConsumerSendMsgBackRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["group"] = request.Group
maps["offset"] = strconv.FormatInt(request.Offset, 10)
@@ -167,7 +167,7 @@
return maps
}
-type PullMessageRequest struct {
+type PullMessageRequestHeader struct {
ConsumerGroup string `json:"consumerGroup"`
Topic string `json:"topic"`
QueueId int32 `json:"queueId"`
@@ -181,7 +181,7 @@
ExpressionType string `json:"expressionType"`
}
-func (request *PullMessageRequest) Encode() map[string]string {
+func (request *PullMessageRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["consumerGroup"] = request.ConsumerGroup
maps["topic"] = request.Topic
@@ -197,35 +197,35 @@
return maps
}
-type GetConsumerList struct {
+type GetConsumerListRequestHeader struct {
ConsumerGroup string `json:"consumerGroup"`
}
-func (request *GetConsumerList) Encode() map[string]string {
+func (request *GetConsumerListRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["consumerGroup"] = request.ConsumerGroup
return maps
}
-type GetMaxOffsetRequest struct {
+type GetMaxOffsetRequestHeader struct {
Topic string `json:"topic"`
QueueId int `json:"queueId"`
}
-func (request *GetMaxOffsetRequest) Encode() map[string]string {
+func (request *GetMaxOffsetRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["topic"] = request.Topic
maps["queueId"] = strconv.Itoa(request.QueueId)
return maps
}
-type QueryConsumerOffsetRequest struct {
+type QueryConsumerOffsetRequestHeader struct {
ConsumerGroup string `json:"consumerGroup"`
Topic string `json:"topic"`
QueueId int `json:"queueId"`
}
-func (request *QueryConsumerOffsetRequest) Encode() map[string]string {
+func (request *QueryConsumerOffsetRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["consumerGroup"] = request.ConsumerGroup
maps["topic"] = request.Topic
@@ -233,13 +233,13 @@
return maps
}
-type SearchOffsetRequest struct {
+type SearchOffsetRequestHeader struct {
Topic string `json:"topic"`
QueueId int `json:"queueId"`
Timestamp int64 `json:"timestamp"`
}
-func (request *SearchOffsetRequest) Encode() map[string]string {
+func (request *SearchOffsetRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["Topic"] = request.Topic
maps["QueueId"] = strconv.Itoa(request.QueueId)
@@ -247,14 +247,14 @@
return maps
}
-type UpdateConsumerOffsetRequest struct {
+type UpdateConsumerOffsetRequestHeader struct {
ConsumerGroup string `json:"consumerGroup"`
Topic string `json:"topic"`
QueueId int `json:"queueId"`
CommitOffset int64 `json:"commitOffset"`
}
-func (request *UpdateConsumerOffsetRequest) Encode() map[string]string {
+func (request *UpdateConsumerOffsetRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["consumerGroup"] = request.ConsumerGroup
maps["topic"] = request.Topic
@@ -263,16 +263,16 @@
return maps
}
-type GetRouteInfoRequest struct {
+type GetRouteInfoRequestHeader struct {
Topic string `json:"topic"`
}
-func (request *GetRouteInfoRequest) Encode() map[string]string {
+func (request *GetRouteInfoRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["topic"] = request.Topic
return maps
}
-func (request *GetRouteInfoRequest) Decode(properties map[string]string) error {
+func (request *GetRouteInfoRequestHeader) Decode(properties map[string]string) error {
return nil
}
diff --git a/internal/route.go b/internal/route.go
index 39551be..716a2f7 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -310,7 +310,7 @@
}
func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, error) {
- request := &GetRouteInfoRequest{
+ request := &GetRouteInfoRequestHeader{
Topic: topic,
}
diff --git a/internal/trace.go b/internal/trace.go
index 1392e77..00074ee 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -440,7 +440,7 @@
func (td *traceDispatcher) buildSendRequest(mq *primitive.MessageQueue,
msg *primitive.Message) *remote.RemotingCommand {
- req := &SendMessageRequest{
+ req := &SendMessageRequestHeader{
ProducerGroup: TraceGroupName,
Topic: mq.Topic,
QueueId: mq.QueueId,
diff --git a/producer/producer.go b/producer/producer.go
index f0a995b..78cd8d9 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -275,7 +275,7 @@
}
}
- req := &internal.SendMessageRequest{
+ req := &internal.SendMessageRequestHeader{
ProducerGroup: p.group,
Topic: mq.Topic,
QueueId: mq.QueueId,