blob: 8ed0f97d98179067f8a0feb314d93a93476d7bea [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kernel
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"github.com/apache/rocketmq-client-go/rlog"
"github.com/apache/rocketmq-client-go/utils"
)
// SendStatus of message
type SendStatus int
const (
SendOK SendStatus = iota
SendFlushDiskTimeout
SendFlushSlaveTimeout
SendSlaveNotAvailable
FlagCompressed = 0x1
MsgIdLength = 8 + 8
)
// SendResult RocketMQ send result
type SendResult struct {
Status SendStatus
MsgID string
MessageQueue *MessageQueue
QueueOffset int64
TransactionID string
OffsetMsgID string
RegionID string
TraceOn bool
}
// SendResult send message result to string(detail result)
func (result *SendResult) String() string {
return fmt.Sprintf("SendResult [sendStatus=%d, msgIds=%s, offsetMsgId=%s, queueOffset=%d, messageQueue=%s]",
result.Status, result.MsgID, result.OffsetMsgID, result.QueueOffset, result.MessageQueue.String())
}
// PullStatus pull status
type PullStatus int
// predefined pull status
const (
PullFound PullStatus = iota
PullNoNewMsg
PullNoMsgMatched
PullOffsetIllegal
PullBrokerTimeout
)
// PullResult the pull result
type PullResult struct {
NextBeginOffset int64
MinOffset int64
MaxOffset int64
Status PullStatus
SuggestWhichBrokerId int64
messageExts []*MessageExt
}
func (result *PullResult) GetMessageExts() []*MessageExt {
return result.messageExts
}
func (result *PullResult) SetMessageExts(msgExts []*MessageExt) {
result.messageExts = msgExts
}
func (result *PullResult) GetMessages() []*Message {
if result.messageExts == nil || len(result.messageExts) == 0 {
return make([]*Message, 0)
}
return toMessages(result.messageExts)
}
func (result *PullResult) String() string {
return ""
}
func decodeMessage(data []byte) []*MessageExt {
msgs := make([]*MessageExt, 0)
buf := bytes.NewBuffer(data)
count := 0
for count < len(data) {
msg := &MessageExt{}
// 1. total size
binary.Read(buf, binary.BigEndian, &msg.StoreSize)
count += 4
// 2. magic code
buf.Next(4)
count += 4
// 3. body CRC32
binary.Read(buf, binary.BigEndian, &msg.BodyCRC)
count += 4
// 4. queueID
binary.Read(buf, binary.BigEndian, &msg.QueueId)
count += 4
// 5. Flag
binary.Read(buf, binary.BigEndian, &msg.Flag)
count += 4
// 6. QueueOffset
binary.Read(buf, binary.BigEndian, &msg.QueueOffset)
count += 8
// 7. physical offset
binary.Read(buf, binary.BigEndian, &msg.CommitLogOffset)
count += 8
// 8. SysFlag
binary.Read(buf, binary.BigEndian, &msg.SysFlag)
count += 4
// 9. BornTimestamp
binary.Read(buf, binary.BigEndian, &msg.BornTimestamp)
count += 8
// 10. born host
hostBytes := buf.Next(4)
var port int32
binary.Read(buf, binary.BigEndian, &port)
msg.BornHost = fmt.Sprintf("%s:%d", utils.GetAddressByBytes(hostBytes), port)
count += 8
// 11. store timestamp
binary.Read(buf, binary.BigEndian, &msg.StoreTimestamp)
count += 8
// 12. store host
hostBytes = buf.Next(4)
binary.Read(buf, binary.BigEndian, &port)
msg.StoreHost = fmt.Sprintf("%s:%d", utils.GetAddressByBytes(hostBytes), port)
count += 8
// 13. reconsume times
binary.Read(buf, binary.BigEndian, &msg.ReconsumeTimes)
count += 4
// 14. prepared transaction offset
binary.Read(buf, binary.BigEndian, &msg.PreparedTransactionOffset)
count += 8
// 15. body
var length int32
binary.Read(buf, binary.BigEndian, &length)
msg.Body = buf.Next(int(length))
if (msg.SysFlag & FlagCompressed) == FlagCompressed {
msg.Body = utils.UnCompress(msg.Body)
}
count += 4 + int(length)
// 16. topic
_byte, _ := buf.ReadByte()
msg.Topic = string(buf.Next(int(_byte)))
count += 1 + int(_byte)
var propertiesLength int16
binary.Read(buf, binary.BigEndian, &propertiesLength)
if propertiesLength > 0 {
msg.Properties = parseProperties(buf.Next(int(propertiesLength)))
}
count += 2 + int(propertiesLength)
msg.MsgId = createMessageId(hostBytes, msg.CommitLogOffset)
//count += 16
msgs = append(msgs, msg)
}
return msgs
}
func createMessageId(addr []byte, offset int64) string {
return "msgID" // TODO
}
func parseProperties(data []byte) map[string]string {
return make(map[string]string, 0)
}
func toMessages(messageExts []*MessageExt) []*Message {
msgs := make([]*Message, 0)
return msgs
}
// MessageQueue message queue
type MessageQueue struct {
Topic string `json:"topic"`
BrokerName string `json:"brokerName"`
QueueId int `json:"queueId"`
}
func (mq *MessageQueue) String() string {
return fmt.Sprintf("MessageQueue [topic=%s, brokerName=%s, queueId=%d]", mq.Topic, mq.BrokerName, mq.QueueId)
}
func (mq *MessageQueue) HashCode() int {
result := 1
result = 31*result + utils.HashString(mq.BrokerName)
result = 31*result + mq.QueueId
result = 31*result + utils.HashString(mq.Topic)
return result
}
func (mq *MessageQueue) Equals(queue *MessageQueue) bool {
// TODO
return true
}
type FindBrokerResult struct {
BrokerAddr string
Slave bool
BrokerVersion int32
}
type (
// groupName of consumer
producerData string
consumeType string
ServiceState int
)
const (
StateCreateJust ServiceState = iota
StateStartFailed
StateRunning
StateShutdown
)
type SubscriptionData struct {
ClassFilterMode bool
Topic string
SubString string
Tags map[string]bool
Codes map[int32]bool
SubVersion int64
ExpType string
}
type consumerData struct {
GroupName string `json:"groupName"`
CType consumeType `json:"consumeType"`
MessageModel string `json:"messageModel"`
Where string `json:"consumeFromWhere"`
SubscriptionDatas []*SubscriptionData `json:"subscriptionDataSet"`
UnitMode bool `json:"unitMode"`
}
type heartbeatData struct {
ClientId string `json:"clientID"`
ProducerDatas []producerData `json:"producerDataSet"`
ConsumerDatas []consumerData `json:"consumerDataSet"`
}
func (data *heartbeatData) encode() []byte {
d, err := json.Marshal(data)
if err != nil {
rlog.Errorf("marshal heartbeatData error: %s", err.Error())
return nil
}
rlog.Info(string(d))
return d
}