blob: 479caa8b7761a13139cf32cb8bc3e8311369c027 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Kafka.Client.Tests
{
using System;
using System.IO;
using System.Linq;
using System.Text;
using Kafka.Client.Messages;
using Kafka.Client.Utils;
using NUnit.Framework;
/// <summary>
/// Tests for the <see cref="Message"/> class.
/// </summary>
[TestFixture]
public class MessageTests
{
private readonly int ChecksumPartLength = 4;
private readonly int MagicNumberPartOffset = 0;
private readonly int ChecksumPartOffset = 2;
private readonly int DataPartOffset = 6;
/// <summary>
/// Demonstrates a properly parsed message.
/// </summary>
[Test]
public void ParseFromValid()
{
Crc32Hasher crc32 = new Crc32Hasher();
string payload = "kafka";
byte magic = 1;
byte attributes = 0;
byte[] payloadData = Encoding.UTF8.GetBytes(payload);
byte[] payloadSize = BitConverter.GetBytes(payloadData.Length);
byte[] checksum = crc32.ComputeHash(payloadData);
byte[] messageData = new byte[payloadData.Length + 2 + payloadSize.Length + checksum.Length];
Buffer.BlockCopy(payloadSize, 0, messageData, 0, payloadSize.Length);
messageData[4] = magic;
messageData[5] = attributes;
Buffer.BlockCopy(checksum, 0, messageData, payloadSize.Length + 2, checksum.Length);
Buffer.BlockCopy(payloadData, 0, messageData, payloadSize.Length + 2 + checksum.Length, payloadData.Length);
Message message = Message.ParseFrom(messageData);
Assert.IsNotNull(message);
Assert.AreEqual(magic, message.Magic);
Assert.IsTrue(payloadData.SequenceEqual(message.Payload));
Assert.IsTrue(checksum.SequenceEqual(message.Checksum));
}
/// <summary>
/// Ensure that the bytes returned from the message are in valid kafka sequence.
/// </summary>
[Test]
public void GetBytesValidSequence()
{
Message message = new Message(new byte[10], CompressionCodecs.NoCompressionCodec);
MemoryStream ms = new MemoryStream();
message.WriteTo(ms);
// len(payload) + 1 + 4
Assert.AreEqual(16, ms.Length);
// first 4 bytes = the magic number
Assert.AreEqual((byte)1, ms.ToArray()[0]);
// attributes
Assert.AreEqual((byte)0, ms.ToArray()[1]);
// next 4 bytes = the checksum
Assert.IsTrue(message.Checksum.SequenceEqual(ms.ToArray().Skip(2).Take(4).ToArray<byte>()));
// remaining bytes = the payload
Assert.AreEqual(10, ms.ToArray().Skip(6).ToArray<byte>().Length);
}
[Test]
public void WriteToValidSequenceForDefaultConstructor()
{
byte[] messageBytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
Message message = new Message(messageBytes);
MemoryStream ms = new MemoryStream();
message.WriteTo(ms);
Assert.AreEqual(1, ms.ToArray()[MagicNumberPartOffset]); // default magic number should be 1
byte[] checksumPart = new byte[ChecksumPartLength];
Array.Copy(ms.ToArray(), ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);
Assert.AreEqual(Crc32Hasher.Compute(messageBytes), checksumPart);
message.ToString();
byte[] dataPart = new byte[messageBytes.Length];
Array.Copy(ms.ToArray(), DataPartOffset, dataPart, 0, messageBytes.Length);
Assert.AreEqual(messageBytes, dataPart);
}
[Test]
public void WriteToValidSequenceForCustomConstructor()
{
byte[] messageBytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
byte[] customChecksum = new byte[] { 3, 4, 5, 6 };
Message message = new Message(messageBytes, customChecksum);
MemoryStream ms = new MemoryStream();
message.WriteTo(ms);
Assert.AreEqual((byte)1, ms.ToArray()[MagicNumberPartOffset]);
byte[] checksumPart = new byte[ChecksumPartLength];
Array.Copy(ms.ToArray(), ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);
Assert.AreEqual(customChecksum, checksumPart);
byte[] dataPart = new byte[messageBytes.Length];
Array.Copy(ms.ToArray(), DataPartOffset, dataPart, 0, messageBytes.Length);
Assert.AreEqual(messageBytes, dataPart);
}
}
}