blob: 35068ba42ef5e4c4f78a308d502897b73c7c4388 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Kafka.Client.Tests
{
using System;
using System.Collections.Generic;
using System.IO;
using Kafka.Client.Messages;
using Kafka.Client.Utils;
using NUnit.Framework;
/// <summary>
/// Tests for the byte layout and size reported by <see cref="BufferedMessageSet"/>.
/// </summary>
/// <remarks>
/// The layout checked here for each serialized message is, in order:
/// a 4-byte length prefix, a 1-byte magic number, a 1-byte attributes field,
/// a 4-byte checksum and then the payload bytes.
/// </remarks>
[TestFixture]
public class MessageSetTests
{
    private const int MessageLengthPartLength = 4;
    private const int MagicNumberPartLength = 1;
    private const int AttributesPartLength = 1;
    private const int ChecksumPartLength = 4;
    private const int MessageLengthPartOffset = 0;
    private const int MagicNumberPartOffset = 4;
    private const int AttributesPartOffset = 5;
    private const int ChecksumPartOffset = 6;
    private const int DataPartOffset = 10;

    /// <summary>
    /// Writes a set of two identical messages to a stream and verifies that each
    /// message appears back to back with the expected length prefix, magic number,
    /// checksum and payload.
    /// </summary>
    [Test]
    public void BufferedMessageSetWriteToValidSequence()
    {
        byte[] messageBytes = new byte[] { 1, 2, 3, 4, 5 };
        Message msg1 = new Message(messageBytes);
        Message msg2 = new Message(messageBytes);
        MessageSet messageSet = new BufferedMessageSet(new List<Message>() { msg1, msg2 });

        byte[] written;
        using (MemoryStream ms = new MemoryStream())
        {
            messageSet.WriteTo(ms);

            // Snapshot the stream once; every ToArray() call allocates a new copy.
            written = ms.ToArray();
        }

        // The second message starts right after the first one's length prefix,
        // header fields and payload.
        int secondMessageOffset = MessageLengthPartLength + MagicNumberPartLength + AttributesPartLength
            + ChecksumPartLength + messageBytes.Length;

        ////first message
        AssertMessageAt(written, 0, messageBytes);

        ////second message
        AssertMessageAt(written, secondMessageOffset, messageBytes);
    }

    /// <summary>
    /// Verifies that the set size of two identical messages is twice the
    /// serialized size of a single message (length prefix included).
    /// </summary>
    [Test]
    public void SetSizeValid()
    {
        byte[] messageBytes = new byte[] { 1, 2, 3, 4, 5 };
        Message msg1 = new Message(messageBytes);
        Message msg2 = new Message(messageBytes);
        MessageSet messageSet = new BufferedMessageSet(new List<Message>() { msg1, msg2 });
        Assert.AreEqual(
            2 * (MessageLengthPartLength + MagicNumberPartLength + AttributesPartLength + ChecksumPartLength + messageBytes.Length),
            messageSet.SetSize);
    }

    /// <summary>
    /// Asserts that a single serialized message with the given payload is located
    /// at <paramref name="messageOffset"/> inside <paramref name="buffer"/>.
    /// </summary>
    /// <param name="buffer">Bytes produced by writing the message set.</param>
    /// <param name="messageOffset">Index in <paramref name="buffer"/> where the message's length prefix starts.</param>
    /// <param name="payload">Payload bytes the message was created from.</param>
    private static void AssertMessageAt(byte[] buffer, int messageOffset, byte[] payload)
    {
        string where = " (message at offset " + messageOffset + ")";

        // The length prefix is stored most-significant byte first, so flip it on
        // little-endian hosts before handing it to BitConverter.
        byte[] messageLength = CopyPart(buffer, messageOffset + MessageLengthPartOffset, MessageLengthPartLength);
        if (BitConverter.IsLittleEndian)
        {
            Array.Reverse(messageLength);
        }

        // The prefix counts everything after itself: magic + attributes + checksum + payload.
        Assert.AreEqual(
            MagicNumberPartLength + AttributesPartLength + ChecksumPartLength + payload.Length,
            BitConverter.ToInt32(messageLength, 0),
            "Unexpected message length" + where);

        // default magic number should be 1
        Assert.AreEqual(1, buffer[messageOffset + MagicNumberPartOffset], "Unexpected magic number" + where);

        byte[] checksumPart = CopyPart(buffer, messageOffset + ChecksumPartOffset, ChecksumPartLength);
        Assert.AreEqual(Crc32Hasher.Compute(payload), checksumPart, "Unexpected checksum" + where);

        byte[] dataPart = CopyPart(buffer, messageOffset + DataPartOffset, payload.Length);
        Assert.AreEqual(payload, dataPart, "Unexpected payload" + where);
    }

    /// <summary>
    /// Copies <paramref name="length"/> bytes starting at <paramref name="offset"/>
    /// from <paramref name="source"/> into a new array.
    /// </summary>
    /// <param name="source">Array to copy from.</param>
    /// <param name="offset">Index of the first byte to copy.</param>
    /// <param name="length">Number of bytes to copy.</param>
    /// <returns>A new array holding the copied bytes.</returns>
    private static byte[] CopyPart(byte[] source, int offset, int length)
    {
        byte[] part = new byte[length];
        Array.Copy(source, offset, part, 0, length);
        return part;
    }
}
}