| /* |
| * Copyright (c) 2011 NeuStar, Inc. |
| * All rights reserved. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| * NeuStar, the Neustar logo and related names and logos are registered |
| * trademarks, service marks or tradenames of NeuStar, Inc. All other |
| * product names, company names, marks, logos and symbols may be trademarks |
| * of their respective owners. |
| */ |
| |
| package kafka |
| |
| import ( |
| "testing" |
| //"fmt" |
| "bytes" |
| "compress/gzip" |
| ) |
| |
| func TestMessageCreation(t *testing.T) { |
| payload := []byte("testing") |
| msg := NewMessage(payload) |
| if msg.magic != 1 { |
| t.Errorf("magic incorrect") |
| t.Fail() |
| } |
| |
| // generated by kafka-rb: e8 f3 5a 06 |
| expected := []byte{0xe8, 0xf3, 0x5a, 0x06} |
| if !bytes.Equal(expected, msg.checksum[:]) { |
| t.Fail() |
| } |
| } |
| |
| func TestMagic0MessageEncoding(t *testing.T) { |
| // generated by kafka-rb: |
| // test the old message format |
| expected := []byte{0x00, 0x00, 0x00, 0x0c, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} |
| length, msgsDecoded := Decode(expected, DefaultCodecsMap) |
| |
| if length == 0 || msgsDecoded == nil { |
| t.Fail() |
| } |
| msgDecoded := msgsDecoded[0] |
| |
| payload := []byte("testing") |
| if !bytes.Equal(payload, msgDecoded.payload) { |
| t.Fatal("bytes not equal") |
| } |
| chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} |
| if !bytes.Equal(chksum, msgDecoded.checksum[:]) { |
| t.Fatal("checksums do not match") |
| } |
| if msgDecoded.magic != 0 { |
| t.Fatal("magic incorrect") |
| } |
| } |
| |
| func TestMessageEncoding(t *testing.T) { |
| |
| payload := []byte("testing") |
| msg := NewMessage(payload) |
| |
| // generated by kafka-rb: |
| expected := []byte{0x00, 0x00, 0x00, 0x0d, 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} |
| if !bytes.Equal(expected, msg.Encode()) { |
| t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode()) |
| } |
| |
| // verify round trip |
| length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode()) |
| |
| if length == 0 || msgsDecoded == nil { |
| t.Fatal("message is nil") |
| } |
| msgDecoded := msgsDecoded[0] |
| |
| if !bytes.Equal(msgDecoded.payload, payload) { |
| t.Fatal("bytes not equal") |
| } |
| chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} |
| if !bytes.Equal(chksum, msgDecoded.checksum[:]) { |
| t.Fatal("checksums do not match") |
| } |
| if msgDecoded.magic != 1 { |
| t.Fatal("magic incorrect") |
| } |
| } |
| |
| func TestCompressedMessageEncodingCompare(t *testing.T) { |
| payload := []byte("testing") |
| uncompressedMsgBytes := NewMessage(payload).Encode() |
| |
| msgGzipBytes := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]).Encode() |
| msgDefaultBytes := NewCompressedMessage(payload).Encode() |
| if !bytes.Equal(msgDefaultBytes, msgGzipBytes) { |
| t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", msgDefaultBytes, msgGzipBytes) |
| } |
| } |
| |
| func TestCompressedMessageEncoding(t *testing.T) { |
| payload := []byte("testing") |
| uncompressedMsgBytes := NewMessage(payload).Encode() |
| |
| msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]) |
| |
| expectedPayload := []byte{0x1F, 0x8B, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, |
| 0xFF, 0x62, 0x60, 0x60, 0xE0, 0x65, 0x64, 0x78, 0xF1, 0x39, 0x8A, |
| 0xAD, 0x24, 0xB5, 0xB8, 0x24, 0x33, 0x2F, 0x1D, 0x10, 0x00, 0x00, |
| 0xFF, 0xFF, 0x0C, 0x6A, 0x82, 0x91, 0x11, 0x00, 0x00, 0x00} |
| |
| expectedHeader := []byte{0x00, 0x00, 0x00, 0x2F, 0x01, 0x01, 0x07, 0xFD, 0xC3, 0x76} |
| |
| expected := make([]byte, len(expectedHeader)+len(expectedPayload)) |
| n := copy(expected, expectedHeader) |
| copy(expected[n:], expectedPayload) |
| |
| if msg.compression != 1 { |
| t.Fatalf("expected compression: 1 but got: %b", msg.compression) |
| } |
| |
| zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload)) |
| uncompressed := make([]byte, 100) |
| n, _ = zipper.Read(uncompressed) |
| uncompressed = uncompressed[:n] |
| zipper.Close() |
| |
| if !bytes.Equal(uncompressed, uncompressedMsgBytes) { |
| t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", uncompressed, uncompressedMsgBytes) |
| } |
| |
| if !bytes.Equal(expected, msg.Encode()) { |
| t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode()) |
| } |
| |
| // verify round trip |
| length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap) |
| |
| if length == 0 || msgsDecoded == nil { |
| t.Fatal("message is nil") |
| } |
| msgDecoded := msgsDecoded[0] |
| |
| if !bytes.Equal(msgDecoded.payload, payload) { |
| t.Fatal("bytes not equal") |
| } |
| chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} |
| if !bytes.Equal(chksum, msgDecoded.checksum[:]) { |
| t.Fatalf("checksums do not match, expected: % X but was: % X", |
| chksum, msgDecoded.checksum[:]) |
| } |
| if msgDecoded.magic != 1 { |
| t.Fatal("magic incorrect") |
| } |
| } |
| |
| func TestLongCompressedMessageRoundTrip(t *testing.T) { |
| payloadBuf := bytes.NewBuffer([]byte{}) |
| // make the test bigger than buffer allocated in the Decode |
| for i := 0; i < 15; i++ { |
| payloadBuf.Write([]byte("testing123 ")) |
| } |
| |
| uncompressedMsgBytes := NewMessage(payloadBuf.Bytes()).Encode() |
| msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]) |
| |
| zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload)) |
| uncompressed := make([]byte, 200) |
| n, _ := zipper.Read(uncompressed) |
| uncompressed = uncompressed[:n] |
| zipper.Close() |
| |
| if !bytes.Equal(uncompressed, uncompressedMsgBytes) { |
| t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", |
| uncompressed, uncompressedMsgBytes) |
| } |
| |
| // verify round trip |
| length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap) |
| |
| if length == 0 || msgsDecoded == nil { |
| t.Fatal("message is nil") |
| } |
| msgDecoded := msgsDecoded[0] |
| |
| if !bytes.Equal(msgDecoded.payload, payloadBuf.Bytes()) { |
| t.Fatal("bytes not equal") |
| } |
| if msgDecoded.magic != 1 { |
| t.Fatal("magic incorrect") |
| } |
| } |
| |
| func TestMultipleCompressedMessages(t *testing.T) { |
| msgs := []*Message{NewMessage([]byte("testing")), |
| NewMessage([]byte("multiple")), |
| NewMessage([]byte("messages")), |
| } |
| msg := NewCompressedMessages(msgs...) |
| |
| length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode()) |
| if length == 0 || msgsDecoded == nil { |
| t.Fatal("msgsDecoded is nil") |
| } |
| |
| // make sure the decompressed messages match what was put in |
| for index, decodedMsg := range msgsDecoded { |
| if !bytes.Equal(msgs[index].payload, decodedMsg.payload) { |
| t.Fatalf("Payload doesn't match, expected: % X but was: % X\n", |
| msgs[index].payload, decodedMsg.payload) |
| } |
| } |
| } |
| |
| func TestRequestHeaderEncoding(t *testing.T) { |
| broker := newBroker("localhost:9092", "test", 0) |
| request := broker.EncodeRequestHeader(REQUEST_PRODUCE) |
| |
| // generated by kafka-rb: |
| expected := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, |
| 0x00, 0x00, 0x00, 0x00} |
| |
| if !bytes.Equal(expected, request.Bytes()) { |
| t.Errorf("expected length: %d but got: %d", len(expected), len(request.Bytes())) |
| t.Errorf("expected: %X\n but got: %X", expected, request) |
| t.Fail() |
| } |
| } |
| |
| func TestPublishRequestEncoding(t *testing.T) { |
| payload := []byte("testing") |
| msg := NewMessage(payload) |
| |
| pubBroker := NewBrokerPublisher("localhost:9092", "test", 0) |
| request := pubBroker.broker.EncodePublishRequest(msg) |
| |
| // generated by kafka-rb: |
| expected := []byte{0x00, 0x00, 0x00, 0x21, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, |
| 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0x0d, |
| /* magic comp ...... chksum .... .. payload .. */ |
| 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} |
| |
| if !bytes.Equal(expected, request) { |
| t.Errorf("expected length: %d but got: %d", len(expected), len(request)) |
| t.Errorf("expected: % X\n but got: % X", expected, request) |
| t.Fail() |
| } |
| } |
| |
| func TestConsumeRequestEncoding(t *testing.T) { |
| |
| pubBroker := NewBrokerPublisher("localhost:9092", "test", 0) |
| request := pubBroker.broker.EncodeConsumeRequest(0, 1048576) |
| |
| // generated by kafka-rb, encode_request_size + encode_request |
| expected := []byte{0x00, 0x00, 0x00, 0x18, 0x00, 0x01, 0x00, 0x04, 0x74, |
| 0x65, 0x73, 0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, |
| 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00} |
| |
| if !bytes.Equal(expected, request) { |
| t.Errorf("expected length: %d but got: %d", len(expected), len(request)) |
| t.Errorf("expected: % X\n but got: % X", expected, request) |
| t.Fail() |
| } |
| } |