blob: 0a57d936549cfba379921d46f4a7d0c468df8b20 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.BooleanSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils;
import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.testng.Assert;
import static org.testng.AssertJUnit.fail;
import org.testng.annotations.Test;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
/**
* Unit test of {@link MessageImpl}.
*/
public class MessageImplTest {
@Test
public void testGetSequenceIdNotAssociated() {
ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
MessageImpl<?> msg = MessageImpl.create(new MessageMetadata(), payload, Schema.BYTES);
assertEquals(-1, msg.getSequenceId());
}
@Test
public void testSetDuplicatePropertiesKey() {
MessageMetadata builder = new MessageMetadata();
builder.addProperty().setKey("key1").setValue("value1");
builder.addProperty().setKey("key1").setValue("value2");
builder.addProperty().setKey("key3").setValue("value3");
ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
MessageImpl<?> msg = MessageImpl.create(builder, payload, Schema.BYTES);
assertEquals("value2", msg.getProperty("key1"));
assertEquals("value3", msg.getProperty("key3"));
}
@Test
public void testGetSequenceIdAssociated() {
MessageMetadata builder = new MessageMetadata()
.setSequenceId(1234);
ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
MessageImpl<?> msg = MessageImpl.create(builder, payload, Schema.BYTES);
assertEquals(1234, msg.getSequenceId());
}
@Test
public void testGetProducerNameNotAssigned() {
MessageMetadata builder = new MessageMetadata();
ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
MessageImpl<?> msg = MessageImpl.create(builder, payload, Schema.BYTES);
assertNull(msg.getProducerName());
}
@Test
public void testGetProducerNameAssigned() {
MessageMetadata builder = new MessageMetadata()
.setProducerName("test-producer");
ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
MessageImpl<?> msg = MessageImpl.create(builder, payload, Schema.BYTES);
assertEquals("test-producer", msg.getProducerName());
}
@Test
public void testDefaultGetProducerDataAssigned() {
AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
bar.setField1(true);
// // Check kv.encoding.type default, not set value
byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
MessageMetadata builder = new MessageMetadata()
.setProducerName("default");
MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
assertFalse(builder.hasPartitionKey());
}
@Test
public void testInlineGetProducerDataAssigned() {
AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(
fooSchema, barSchema, KeyValueEncodingType.INLINE);
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
bar.setField1(true);
// Check kv.encoding.type INLINE
byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
MessageMetadata builder = new MessageMetadata()
.setProducerName("inline");
MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
assertFalse(builder.hasPartitionKey());
}
@Test
public void testSeparatedGetProducerDataAssigned() {
AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(
fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
bar.setField1(true);
// Check kv.encoding.type SPRAERATE
byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
MessageMetadata builder = new MessageMetadata()
.setProducerName("separated");
builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
builder.setPartitionKeyB64Encoded(true);
MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
assertTrue(builder.hasPartitionKey());
}
@Test
public void testDefaultAVROVersionGetProducerDataAssigned() {
AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
bar.setField1(true);
byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
MessageMetadata builder = new MessageMetadata()
.setProducerName("default");
builder.setSchemaVersion(new byte[10]);
MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
assertFalse(builder.hasPartitionKey());
Assert.assertEquals(
KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
KeyValueEncodingType.INLINE);
}
@Test
public void testSeparatedAVROVersionGetProducerDataAssigned() {
AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(
fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
bar.setField1(true);
byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
MessageMetadata builder = new MessageMetadata()
.setProducerName("separated");
builder.setSchemaVersion(new byte[10]);
builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
builder.setPartitionKeyB64Encoded(true);
MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
assertTrue(builder.hasPartitionKey());
Assert.assertEquals(
KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
KeyValueEncodingType.SEPARATED);
}
@Test
public void testDefaultJSONVersionGetProducerDataAssigned() {
JSONSchema<SchemaTestUtils.Foo> fooSchema = JSONSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(
SchemaTestUtils.Foo.class).build());
JSONSchema<SchemaTestUtils.Bar> barSchema = JSONSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(
SchemaTestUtils.Bar.class).build());
MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
bar.setField1(true);
byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
MessageMetadata builder = new MessageMetadata()
.setProducerName("default");
builder.setSchemaVersion(new byte[10]);
MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
assertFalse(builder.hasPartitionKey());
Assert.assertEquals(
KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
KeyValueEncodingType.INLINE);
}
@Test
public void testSeparatedJSONVersionGetProducerDataAssigned() {
JSONSchema<SchemaTestUtils.Foo> fooSchema = JSONSchema.of(
SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
JSONSchema<SchemaTestUtils.Bar> barSchema = JSONSchema.of(
SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(
fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
bar.setField1(true);
byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
MessageMetadata builder = new MessageMetadata()
.setProducerName("separated");
builder.setSchemaVersion(new byte[10]);
builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
builder.setPartitionKeyB64Encoded(true);
MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
assertTrue(builder.hasPartitionKey());
Assert.assertEquals(
KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
KeyValueEncodingType.SEPARATED);
}
@Test
public void testDefaultAVROJSONVersionGetProducerDataAssigned() {
AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(
SchemaTestUtils.Foo.class).build());
JSONSchema<SchemaTestUtils.Bar> barSchema = JSONSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(
SchemaTestUtils.Bar.class).build());
MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
bar.setField1(true);
byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
MessageMetadata builder = new MessageMetadata()
.setProducerName("default");
builder.setSchemaVersion(new byte[10]);
MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
assertFalse(builder.hasPartitionKey());
Assert.assertEquals(
KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
KeyValueEncodingType.INLINE);
}
@Test
public void testSeparatedAVROJSONVersionGetProducerDataAssigned() {
AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
JSONSchema<SchemaTestUtils.Bar> barSchema = JSONSchema.of(
SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(
fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
bar.setField1(true);
byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
MessageMetadata builder = new MessageMetadata()
.setProducerName("separated");
builder.setSchemaVersion(new byte[10]);
builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
builder.setPartitionKeyB64Encoded(true);
MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
assertTrue(builder.hasPartitionKey());
Assert.assertEquals(
KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
KeyValueEncodingType.SEPARATED);
}
@Test
public void testTypedSchemaGetNullValue() {
byte[] encodeBytes = new byte[0];
MessageMetadata builder = new MessageMetadata()
.setProducerName("valueNotSet");
builder.setSchemaVersion(new byte[0]);
builder.setPartitionKey(Base64.getEncoder().encodeToString(encodeBytes));
builder.setPartitionKeyB64Encoded(true);
builder.setNullValue(true);
MessageImpl<Boolean> msg = MessageImpl.create(builder, ByteBuffer.wrap(encodeBytes), BooleanSchema.of());
assertNull(msg.getValue());
}
@Test(timeOut = 30000)
public void testMessageBrokerAndEntryMetadataTimestampMissed() {
int MOCK_BATCH_SIZE = 10;
String data = "test-message";
ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
try {
// test BrokerTimestamp not set.
MessageMetadata messageMetadata = new MessageMetadata()
.setPublishTime(1)
.setProducerName("test")
.setSequenceId(1);
byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
BrokerEntryMetadata brokerMetadata = new BrokerEntryMetadata()
.setIndex(MOCK_BATCH_SIZE - 1);
int brokerMetaSize = brokerMetadata.getSerializedSize();
ByteBuf brokerMeta = PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6);
brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
brokerMeta.writeInt(brokerMetaSize);
brokerMetadata.writeTo(brokerMeta);
CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
MessageImpl messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf);
MessageImpl message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf);
message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata());
assertTrue(message.isExpired(100));
// test BrokerTimestamp set.
byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
messageMetadata = new MessageMetadata()
.setPublishTime(System.currentTimeMillis())
.setProducerName("test")
.setSequenceId(1);
byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
brokerMetadata = new BrokerEntryMetadata()
.setBrokerTimestamp(System.currentTimeMillis())
.setIndex(MOCK_BATCH_SIZE - 1);
brokerMetaSize = brokerMetadata.getSerializedSize();
brokerMeta = PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6);
brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
brokerMeta.writeInt(brokerMetaSize);
brokerMetadata.writeTo(brokerMeta);
compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf);
message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf);
message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata());
assertFalse(message.isExpired(24 * 3600));
} catch (IOException e) {
fail();
}
}
}