blob: 05944bce22299219c32ba5f35e1e9d1635188f79 [file] [log] [blame]
/*
* Copyright (c) 2011 NeuStar, Inc.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* NeuStar, the Neustar logo and related names and logos are registered
* trademarks, service marks or tradenames of NeuStar, Inc. All other
* product names, company names, marks, logos and symbols may be trademarks
* of their respective owners.
*/
package kafka
import (
"testing"
//"fmt"
"bytes"
"compress/gzip"
)
// TestMessageCreation checks that NewMessage produces a message whose magic
// value is 1 and whose checksum for the payload "testing" matches the
// reference bytes below.
func TestMessageCreation(t *testing.T) {
	payload := []byte("testing")
	msg := NewMessage(payload)
	// Errorf already marks the test as failed; no extra t.Fail() is needed.
	if msg.magic != 1 {
		t.Errorf("magic incorrect")
	}

	// generated by kafka-rb: e8 f3 5a 06
	expected := []byte{0xe8, 0xf3, 0x5a, 0x06}
	if !bytes.Equal(expected, msg.checksum[:]) {
		// Report both values so a mismatch can be diagnosed from the log.
		t.Errorf("checksums do not match, expected: % X but was: % X",
			expected, msg.checksum[:])
	}
}
// TestMagic0MessageEncoding decodes a reference message in the old (magic 0)
// format and checks the decoded payload, checksum and magic value.
func TestMagic0MessageEncoding(t *testing.T) {
	// generated by kafka-rb:
	// test the old message format
	expected := []byte{0x00, 0x00, 0x00, 0x0c, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67}
	length, msgsDecoded := Decode(expected, DefaultCodecsMap)
	// Abort immediately on a failed decode: continuing would index a nil
	// slice below and panic instead of reporting a test failure.
	if length == 0 || msgsDecoded == nil {
		t.Fatal("message is nil")
	}
	msgDecoded := msgsDecoded[0]

	payload := []byte("testing")
	if !bytes.Equal(payload, msgDecoded.payload) {
		t.Fatal("bytes not equal")
	}
	chksum := []byte{0xE8, 0xF3, 0x5A, 0x06}
	if !bytes.Equal(chksum, msgDecoded.checksum[:]) {
		t.Fatal("checksums do not match")
	}
	if msgDecoded.magic != 0 {
		t.Fatal("magic incorrect")
	}
}
// TestMessageEncoding encodes a message built from the payload "testing",
// compares it with the reference encoding, then decodes it again and checks
// that payload, checksum and magic survive the round trip.
func TestMessageEncoding(t *testing.T) {
	want := []byte("testing")
	msg := NewMessage(want)

	// generated by kafka-rb:
	wire := []byte{0x00, 0x00, 0x00, 0x0d, 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67}
	if !bytes.Equal(wire, msg.Encode()) {
		t.Fatalf("expected: % X\n but got: % X", wire, msg.Encode())
	}

	// verify round trip
	consumed, decoded := DecodeWithDefaultCodecs(msg.Encode())
	if consumed == 0 || decoded == nil {
		t.Fatal("message is nil")
	}

	// The cases are evaluated top to bottom and the first mismatch aborts
	// the test.
	first := decoded[0]
	switch {
	case !bytes.Equal(first.payload, want):
		t.Fatal("bytes not equal")
	case !bytes.Equal([]byte{0xE8, 0xF3, 0x5A, 0x06}, first.checksum[:]):
		t.Fatal("checksums do not match")
	case first.magic != 1:
		t.Fatal("magic incorrect")
	}
}
// TestCompressedMessageEncodingCompare checks that NewCompressedMessage
// yields the same encoded bytes as explicitly wrapping an encoded message
// with the codec registered under GZIP_COMPRESSION_ID.
func TestCompressedMessageEncodingCompare(t *testing.T) {
	payload := []byte("testing")
	uncompressedMsgBytes := NewMessage(payload).Encode()

	msgGzipBytes := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]).Encode()
	msgDefaultBytes := NewCompressedMessage(payload).Encode()
	if !bytes.Equal(msgDefaultBytes, msgGzipBytes) {
		// Label each dump with the encoding it actually shows.
		t.Fatalf("default compressed: % X \ngzip codec: % X bytes not equal", msgDefaultBytes, msgGzipBytes)
	}
}
// TestCompressedMessageEncoding wraps an encoded message with the codec
// registered under GZIP_COMPRESSION_ID and checks three things: the gzip
// payload inflates back to the inner encoded message, the outer encoding
// matches the reference bytes, and decoding yields the original payload,
// checksum and magic.
func TestCompressedMessageEncoding(t *testing.T) {
	payload := []byte("testing")
	uncompressedMsgBytes := NewMessage(payload).Encode()
	msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID])

	expectedPayload := []byte{0x1F, 0x8B, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04,
		0xFF, 0x62, 0x60, 0x60, 0xE0, 0x65, 0x64, 0x78, 0xF1, 0x39, 0x8A,
		0xAD, 0x24, 0xB5, 0xB8, 0x24, 0x33, 0x2F, 0x1D, 0x10, 0x00, 0x00,
		0xFF, 0xFF, 0x0C, 0x6A, 0x82, 0x91, 0x11, 0x00, 0x00, 0x00}
	expectedHeader := []byte{0x00, 0x00, 0x00, 0x2F, 0x01, 0x01, 0x07, 0xFD, 0xC3, 0x76}
	// expected is the header immediately followed by the gzip payload.
	expected := make([]byte, 0, len(expectedHeader)+len(expectedPayload))
	expected = append(expected, expectedHeader...)
	expected = append(expected, expectedPayload...)

	if msg.compression != 1 {
		t.Fatalf("expected compression: 1 but got: %d", msg.compression)
	}

	// Inflate the whole payload. A single Read call is not guaranteed to
	// return the full stream, so drain the reader until EOF and surface
	// every error instead of discarding it.
	zipper, err := gzip.NewReader(bytes.NewBuffer(msg.payload))
	if err != nil {
		t.Fatalf("opening gzip reader on message payload: %v", err)
	}
	var inflated bytes.Buffer
	if _, err := inflated.ReadFrom(zipper); err != nil {
		t.Fatalf("decompressing message payload: %v", err)
	}
	if err := zipper.Close(); err != nil {
		t.Fatalf("closing gzip reader: %v", err)
	}
	uncompressed := inflated.Bytes()

	if !bytes.Equal(uncompressed, uncompressedMsgBytes) {
		t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", uncompressed, uncompressedMsgBytes)
	}

	encoded := msg.Encode()
	if !bytes.Equal(expected, encoded) {
		t.Fatalf("expected: % X\n but got: % X", expected, encoded)
	}

	// verify round trip
	length, msgsDecoded := Decode(encoded, DefaultCodecsMap)
	if length == 0 || msgsDecoded == nil {
		t.Fatal("message is nil")
	}
	msgDecoded := msgsDecoded[0]
	if !bytes.Equal(msgDecoded.payload, payload) {
		t.Fatal("bytes not equal")
	}
	chksum := []byte{0xE8, 0xF3, 0x5A, 0x06}
	if !bytes.Equal(chksum, msgDecoded.checksum[:]) {
		t.Fatalf("checksums do not match, expected: % X but was: % X",
			chksum, msgDecoded.checksum[:])
	}
	if msgDecoded.magic != 1 {
		t.Fatal("magic incorrect")
	}
}
// TestLongCompressedMessageRoundTrip builds a payload of fifteen repetitions
// of "testing123 ", wraps its encoding with the codec registered under
// GZIP_COMPRESSION_ID, and checks both that the gzip payload inflates to the
// inner encoded message and that decoding returns the original payload.
func TestLongCompressedMessageRoundTrip(t *testing.T) {
	// make the test bigger than buffer allocated in the Decode
	payload := bytes.Repeat([]byte("testing123 "), 15)

	uncompressedMsgBytes := NewMessage(payload).Encode()
	msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID])

	// Drain the gzip stream until EOF rather than relying on one Read into
	// a fixed-size buffer, and report any decompression error.
	zipper, err := gzip.NewReader(bytes.NewBuffer(msg.payload))
	if err != nil {
		t.Fatalf("opening gzip reader on message payload: %v", err)
	}
	var inflated bytes.Buffer
	if _, err := inflated.ReadFrom(zipper); err != nil {
		t.Fatalf("decompressing message payload: %v", err)
	}
	if err := zipper.Close(); err != nil {
		t.Fatalf("closing gzip reader: %v", err)
	}
	uncompressed := inflated.Bytes()

	if !bytes.Equal(uncompressed, uncompressedMsgBytes) {
		t.Fatalf("uncompressed: % X \npayload: % X bytes not equal",
			uncompressed, uncompressedMsgBytes)
	}

	// verify round trip
	length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap)
	if length == 0 || msgsDecoded == nil {
		t.Fatal("message is nil")
	}
	msgDecoded := msgsDecoded[0]
	if !bytes.Equal(msgDecoded.payload, payload) {
		t.Fatal("bytes not equal")
	}
	if msgDecoded.magic != 1 {
		t.Fatal("magic incorrect")
	}
}
// TestMultipleCompressedMessages packs three messages into one compressed
// message, decodes it, and checks that the same number of messages comes
// back with matching payloads in the same order.
func TestMultipleCompressedMessages(t *testing.T) {
	msgs := []*Message{NewMessage([]byte("testing")),
		NewMessage([]byte("multiple")),
		NewMessage([]byte("messages")),
	}
	msg := NewCompressedMessages(msgs...)

	length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode())
	if length == 0 || msgsDecoded == nil {
		t.Fatal("msgsDecoded is nil")
	}

	// Without this check, too few decoded messages would pass silently and
	// too many would index past the end of msgs in the loop below.
	if len(msgsDecoded) != len(msgs) {
		t.Fatalf("expected %d decoded messages but got: %d", len(msgs), len(msgsDecoded))
	}

	// make sure the decompressed messages match what was put in
	for index, decodedMsg := range msgsDecoded {
		if !bytes.Equal(msgs[index].payload, decodedMsg.payload) {
			t.Fatalf("Payload doesn't match, expected: % X but was: % X\n",
				msgs[index].payload, decodedMsg.payload)
		}
	}
}
// TestRequestHeaderEncoding checks the bytes produced by EncodeRequestHeader
// for a REQUEST_PRODUCE request on topic "test", partition 0, against the
// reference encoding.
func TestRequestHeaderEncoding(t *testing.T) {
	broker := newBroker("localhost:9092", "test", 0)
	request := broker.EncodeRequestHeader(REQUEST_PRODUCE)

	// generated by kafka-rb:
	expected := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74,
		0x00, 0x00, 0x00, 0x00}

	// Report exactly the bytes that were compared, not the request object.
	// Errorf already marks the test as failed.
	if got := request.Bytes(); !bytes.Equal(expected, got) {
		t.Errorf("expected length: %d but got: %d", len(expected), len(got))
		t.Errorf("expected: %X\n but got: %X", expected, got)
	}
}
// TestPublishRequestEncoding compares the publish request encoded for a
// single "testing" message on topic "test", partition 0, with the reference
// bytes.
func TestPublishRequestEncoding(t *testing.T) {
	msg := NewMessage([]byte("testing"))
	publisher := NewBrokerPublisher("localhost:9092", "test", 0)
	got := publisher.broker.EncodePublishRequest(msg)

	// generated by kafka-rb:
	want := []byte{0x00, 0x00, 0x00, 0x21, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74,
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0x0d,
		/* magic comp ...... chksum .... .. payload .. */
		0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67}

	// Nothing to report when the encodings agree.
	if bytes.Equal(want, got) {
		return
	}
	t.Errorf("expected length: %d but got: %d", len(want), len(got))
	t.Errorf("expected: % X\n but got: % X", want, got)
	t.Fail()
}
// TestConsumeRequestEncoding compares the consume request encoded with the
// arguments (0, 1048576) for topic "test", partition 0, with the reference
// bytes.
func TestConsumeRequestEncoding(t *testing.T) {
	publisher := NewBrokerPublisher("localhost:9092", "test", 0)
	got := publisher.broker.EncodeConsumeRequest(0, 1048576)

	// generated by kafka-rb, encode_request_size + encode_request
	want := []byte{0x00, 0x00, 0x00, 0x18, 0x00, 0x01, 0x00, 0x04, 0x74,
		0x65, 0x73, 0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00}

	// Nothing to report when the encodings agree.
	if bytes.Equal(want, got) {
		return
	}
	t.Errorf("expected length: %d but got: %d", len(want), len(got))
	t.Errorf("expected: % X\n but got: % X", want, got)
	t.Fail()
}