blob: b43335a57301eb634250b2c1483d0baa7535075f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package internal
import (
"testing"
"github.com/stretchr/testify/assert"
)
// TestConvertStringMap checks that a string map keeps its entries after a
// round trip through ConvertFromStringMap and ConvertToStringMap.
func TestConvertStringMap(t *testing.T) {
	original := map[string]string{
		"a": "1",
		"b": "2",
	}

	// Forward conversion must produce one element per map entry.
	converted := ConvertFromStringMap(original)
	assert.Equal(t, 2, len(converted))

	// Converting back must yield the same keys and values.
	roundTripped := ConvertToStringMap(converted)
	assert.Equal(t, 2, len(roundTripped))
	assert.Equal(t, "1", roundTripped["a"])
	assert.Equal(t, "2", roundTripped["b"])
}
// TestReadMessageMetadata reads the message metadata from each raw fixture:
// the old-style (non-batched) message must expose the properties a=1 and b=2,
// and the batched fixtures must report batch sizes of 1 and 10 respectively.
func TestReadMessageMetadata(t *testing.T) {
	// read old style message (not batched)
	reader := NewMessageReaderFromArray(rawCompatSingleMessage)
	meta, err := reader.ReadMessageMetadata()
	if err != nil {
		t.Fatal(err)
	}

	props := meta.GetProperties()
	// Expected value goes first so a failure is reported correctly; stop the
	// test on a mismatch so the index expressions below cannot panic.
	if !assert.Equal(t, 2, len(props)) {
		t.FailNow()
	}
	assert.Equal(t, "a", props[0].GetKey())
	assert.Equal(t, "1", props[0].GetValue())
	assert.Equal(t, "b", props[1].GetKey())
	assert.Equal(t, "2", props[1].GetValue())

	// read message with batch of 1
	reader = NewMessageReaderFromArray(rawBatchMessage1)
	meta, err = reader.ReadMessageMetadata()
	if err != nil {
		t.Fatal(err)
	}
	assert.Equal(t, 1, int(meta.GetNumMessagesInBatch()))

	// read message with batch of 10
	reader = NewMessageReaderFromArray(rawBatchMessage10)
	meta, err = reader.ReadMessageMetadata()
	if err != nil {
		t.Fatal(err)
	}
	assert.Equal(t, 10, int(meta.GetNumMessagesInBatch()))
}
// TestReadMessageOldFormat reads the non-batched fixture and checks that it
// yields exactly one payload ("hello") with no per-message metadata, and that
// the next read reports ErrEOM.
func TestReadMessageOldFormat(t *testing.T) {
	reader := NewMessageReaderFromArray(rawCompatSingleMessage)
	// The metadata must be consumed before the payload can be read; its
	// content is not needed here.
	if _, err := reader.ReadMessageMetadata(); err != nil {
		t.Fatal(err)
	}

	ssm, payload, err := reader.ReadMessage()
	if err != nil {
		t.Fatal(err)
	}
	// old message format does not have a single message metadata
	assert.Nil(t, ssm)
	assert.Equal(t, "hello", string(payload))

	// A second read must signal the end of the message.
	_, _, err = reader.ReadMessage()
	assert.Equal(t, ErrEOM, err)
}
// TestReadMessagesBatchSize1 reads the batch-of-1 fixture and checks that the
// single entry carries per-message metadata and the payload "hello", and that
// reading past the batch reports ErrEOM.
func TestReadMessagesBatchSize1(t *testing.T) {
	reader := NewMessageReaderFromArray(rawBatchMessage1)
	meta, err := reader.ReadMessageMetadata()
	if err != nil {
		t.Fatal(err)
	}

	batchSize := int(meta.GetNumMessagesInBatch())
	assert.Equal(t, 1, batchSize)

	for i := 0; i < batchSize; i++ {
		// Use a distinct name so the outer err is not shadowed.
		ssm, payload, readErr := reader.ReadMessage()
		if readErr != nil {
			t.Fatalf("reading message %d of %d: %v", i, batchSize, readErr)
		}
		assert.NotNil(t, ssm, "message %d", i)
		assert.Equal(t, "hello", string(payload), "message %d", i)
	}

	// Once the batch is drained the reader must signal the end of the message.
	_, _, err = reader.ReadMessage()
	assert.Equal(t, ErrEOM, err)
}
// TestReadMessagesBatchSize10 reads the batch-of-10 fixture and checks that
// every entry carries per-message metadata and the payload "hello", and that
// reading past the batch reports ErrEOM.
func TestReadMessagesBatchSize10(t *testing.T) {
	reader := NewMessageReaderFromArray(rawBatchMessage10)
	meta, err := reader.ReadMessageMetadata()
	if err != nil {
		t.Fatal(err)
	}

	batchSize := int(meta.GetNumMessagesInBatch())
	assert.Equal(t, 10, batchSize)

	for i := 0; i < batchSize; i++ {
		// Use a distinct name so the outer err is not shadowed.
		ssm, payload, readErr := reader.ReadMessage()
		if readErr != nil {
			t.Fatalf("reading message %d of %d: %v", i, batchSize, readErr)
		}
		// Include the index so a failure identifies the offending entry.
		assert.NotNil(t, ssm, "message %d", i)
		assert.Equal(t, "hello", string(payload), "message %d", i)
	}

	// Once the batch is drained the reader must signal the end of the message.
	_, _, err = reader.ReadMessage()
	assert.Equal(t, ErrEOM, err)
}
// rawCompatSingleMessage is a raw single message in the old (non-batched)
// format, used as a fixture by the reader tests in this file.
// metadata properties:<key:"a" value:"1" > properties:<key:"b" value:"2" >
// payload = "hello"
//
// The final five bytes (0x68 0x65 0x6c 0x6c 0x6f) are the ASCII payload
// "hello"; the bytes 0x73 ... 0x30 near the start spell "standalone-74-0".
// NOTE(review): the leading bytes are presumably the framing header (magic
// number, checksum, metadata size) — confirm against the protocol spec.
var rawCompatSingleMessage = []byte{
0x0e, 0x01, 0x08, 0x36, 0xb4, 0x66, 0x00, 0x00,
0x00, 0x31, 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e,
0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x37,
0x34, 0x2d, 0x30, 0x10, 0x00, 0x18, 0xac, 0xef,
0xe8, 0xa0, 0xe2, 0x2d, 0x22, 0x06, 0x0a, 0x01,
0x61, 0x12, 0x01, 0x31, 0x22, 0x06, 0x0a, 0x01,
0x62, 0x12, 0x01, 0x32, 0x48, 0x05, 0x60, 0x05,
0x82, 0x01, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f,
}
// rawBatchMessage1 is a raw message with a batch of 1, used as a fixture by
// the reader tests in this file.
// single message metadata properties:<key:"a" value:"1" > properties:<key:"b" value:"2" >
// payload = "hello"
//
// The final five bytes (0x68 0x65 0x6c 0x6c 0x6f) are the ASCII payload
// "hello"; the bytes 0x73 ... 0x31 near the start spell "standalone-74-1".
// NOTE(review): the 0x58, 0x01 pair presumably encodes the batch size of 1
// that the tests read back — confirm against the protocol spec.
var rawBatchMessage1 = []byte{
0x0e, 0x01, 0x1f, 0x80, 0x09, 0x68, 0x00, 0x00,
0x00, 0x1f, 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e,
0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x37,
0x34, 0x2d, 0x31, 0x10, 0x00, 0x18, 0xdb, 0x80,
0xf4, 0xa0, 0xe2, 0x2d, 0x58, 0x01, 0x82, 0x01,
0x00, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a,
0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a,
0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28,
0x05, 0x40, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f,
}
// rawBatchMessage10 is a raw message with a batch of 10, used as a fixture by
// the reader tests in this file.
// single message metadata properties:<key:"a" value:"1" > properties:<key:"b" value:"2" >
// payload = "hello"
//
// After the leading header the same entry layout repeats ten times: the bytes
// 0x00 0x00 0x00 0x16, a 22-byte run carrying the a=1 / b=2 properties, and
// the ASCII payload "hello" (0x68 0x65 0x6c 0x6c 0x6f). Within each entry the
// byte following 0x40 counts up from 0x00 to 0x09.
// NOTE(review): the 0x58, 0x0a pair presumably encodes the batch size of 10
// that the tests read back — confirm against the protocol spec.
var rawBatchMessage10 = []byte{
0x0e, 0x01, 0x7b, 0x28, 0x8c, 0x08,
0x00, 0x00, 0x00, 0x1f, 0x0a, 0x0f, 0x73, 0x74,
0x61, 0x6e, 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65,
0x2d, 0x37, 0x34, 0x2d, 0x32, 0x10, 0x00, 0x18,
0xd0, 0xc2, 0xfa, 0xa0, 0xe2, 0x2d, 0x58, 0x0a,
0x82, 0x01, 0x00, 0x00, 0x00, 0x00, 0x16, 0x0a,
0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a,
0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18,
0x05, 0x28, 0x05, 0x40, 0x00, 0x68, 0x65, 0x6c,
0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06,
0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06,
0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05,
0x28, 0x05, 0x40, 0x01, 0x68, 0x65, 0x6c, 0x6c,
0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a,
0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a,
0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28,
0x05, 0x40, 0x02, 0x68, 0x65, 0x6c, 0x6c, 0x6f,
0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01,
0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01,
0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28, 0x05,
0x40, 0x03, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00,
0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61,
0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62,
0x12, 0x01, 0x32, 0x18, 0x05, 0x28, 0x05, 0x40,
0x04, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00,
0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12,
0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12,
0x01, 0x32, 0x18, 0x05, 0x28, 0x05, 0x40, 0x05,
0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00,
0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01,
0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01,
0x32, 0x18, 0x05, 0x28, 0x05, 0x40, 0x06, 0x68,
0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16,
0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31,
0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32,
0x18, 0x05, 0x28, 0x05, 0x40, 0x07, 0x68, 0x65,
0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a,
0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a,
0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18,
0x05, 0x28, 0x05, 0x40, 0x08, 0x68, 0x65, 0x6c,
0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06,
0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06,
0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05,
0x28, 0x05, 0x40, 0x09, 0x68, 0x65, 0x6c, 0x6c,
0x6f,
}