blob: ae1bda513b9043ac117ffb2ee02b8b4c0207c75d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.impl.schema.AbstractSchema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
public class MessageImpl<T> implements Message<T> {
// Identifier of this message; assigned by the constructors/init paths or later through setMessageId.
protected MessageId messageId;
// Metadata object owned by this instance; it is cleared and re-populated when the instance is (re)initialized.
private final MessageMetadata msgMetadata;
// Connection handed in for incoming messages; the out-going factory and the deserialize factories set it to null.
private ClientCnx cnx;
// Message body. Depending on the creation path it is a wrapped, copied or retained buffer.
private ByteBuf payload;
private Schema<T> schema;
// Set by the out-going factory and by setSchemaInfoForReplicator; getSchemaHash() falls back to an empty hash.
private SchemaHash schemaHash;
// Only written/read when the metadata carries a replicated-from cluster.
private SchemaInfo schemaInfoForReplicator;
private SchemaState schemaState = SchemaState.None;
private Optional<EncryptionContext> encryptionCtx = Optional.empty();
private String topic; // set to null by the deserialize factories
// Unmodifiable property map; built eagerly by init(...) or lazily by getProperties().
private transient Map<String, String> properties;
private int redeliveryCount;
// Recorded from the remaining bytes of the payload in the out-going factory.
private int uncompressedSize;
private BrokerEntryMetadata brokerEntryMetadata;
// When true the payload is a retained buffer and release() frees it and recycles this instance.
private boolean poolMessage;
@Getter
private long consumerEpoch;
/**
 * Factory for out-going messages, backed by an instance taken from the recycler.
 *
 * <p>The given metadata is copied into the instance's own metadata object and the
 * {@link ByteBuffer} is wrapped as the payload.
 *
 * @param msgMetadata metadata to copy into the message
 * @param payload message body; its remaining byte count is stored as the uncompressed size
 * @param schema schema attached to the message and used to compute its schema hash
 * @param topic topic name stored on the message
 * @param <T> value type of the message
 * @return the initialized message
 */
public static <T> MessageImpl<T> create(MessageMetadata msgMetadata, ByteBuffer payload, Schema<T> schema,
        String topic) {
    @SuppressWarnings("unchecked")
    final MessageImpl<T> outgoing = (MessageImpl<T>) RECYCLER.get();

    // Replace whatever metadata the recycled instance still holds.
    outgoing.msgMetadata.clear();
    outgoing.msgMetadata.copyFrom(msgMetadata);

    // An out-going message has no id and no connection yet.
    outgoing.messageId = null;
    outgoing.topic = topic;
    outgoing.cnx = null;

    outgoing.payload = Unpooled.wrappedBuffer(payload);
    outgoing.properties = null;

    outgoing.schema = schema;
    outgoing.schemaHash = SchemaHash.of(schema);

    outgoing.uncompressedSize = payload.remaining();
    return outgoing;
}
/**
 * Constructor for an incoming message without an encryption context.
 *
 * <p>Delegates to the overload taking an encryption context, passing {@link Optional#empty()}.
 */
MessageImpl(
        String topic,
        MessageIdImpl messageId,
        MessageMetadata msgMetadata,
        ByteBuf payload,
        ClientCnx cnx,
        Schema<T> schema) {
    this(topic, messageId, msgMetadata, payload, Optional.empty(), cnx, schema);
}
/**
 * Constructor for an incoming, non-pooled message.
 *
 * <p>Delegates with a redelivery count of 0, pooling disabled and the default consumer epoch.
 */
MessageImpl(
        String topic,
        MessageIdImpl messageId,
        MessageMetadata msgMetadata,
        ByteBuf payload,
        Optional<EncryptionContext> encryptionCtx,
        ClientCnx cnx,
        Schema<T> schema) {
    this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema,
            0, false, DEFAULT_CONSUMER_EPOCH);
}
/**
 * Full constructor for an incoming (non-batch) message.
 *
 * <p>Allocates a fresh metadata object for this instance and then runs the shared
 * {@code init} routine, which copies the supplied metadata into it.
 */
MessageImpl(
        String topic,
        MessageIdImpl messageId,
        MessageMetadata msgMetadata,
        ByteBuf payload,
        Optional<EncryptionContext> encryptionCtx,
        ClientCnx cnx,
        Schema<T> schema,
        int redeliveryCount,
        boolean pooledMessage,
        long consumerEpoch) {
    // The final field must exist before init(...) copies into it.
    this.msgMetadata = new MessageMetadata();
    init(this, topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema,
            redeliveryCount, pooledMessage, consumerEpoch);
}
/**
 * Factory for an incoming (non-batch) message.
 *
 * @param pooledMessage when {@code true} the instance is taken from the recycler; otherwise a
 *                      new instance is constructed
 * @return the initialized message
 */
public static <T> MessageImpl<T> create(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata,
        ByteBuf payload, Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema,
        int redeliveryCount, boolean pooledMessage, long consumerEpoch) {
    if (!pooledMessage) {
        // Non-pooled messages are plain heap objects.
        return new MessageImpl<>(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema,
                redeliveryCount, pooledMessage, consumerEpoch);
    }

    @SuppressWarnings("unchecked")
    final MessageImpl<T> recycled = (MessageImpl<T>) RECYCLER.get();
    init(recycled, topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, redeliveryCount,
            pooledMessage, consumerEpoch);
    return recycled;
}
/**
 * Constructor for a message that is part of a batch.
 *
 * <p>Delegates with a redelivery count of 0 and {@code false} for the keep-in-direct-memory flag.
 */
MessageImpl(
        String topic,
        BatchMessageIdImpl batchMessageIdImpl,
        MessageMetadata msgMetadata,
        SingleMessageMetadata singleMessageMetadata,
        ByteBuf payload,
        Optional<EncryptionContext> encryptionCtx,
        ClientCnx cnx,
        Schema<T> schema,
        long consumerEpoch) {
    this(topic, batchMessageIdImpl, msgMetadata, singleMessageMetadata, payload, encryptionCtx, cnx, schema,
            0, false, consumerEpoch);
}
/**
 * Full constructor for a message that is part of a batch.
 *
 * <p>Allocates a fresh metadata object for this instance and then runs the shared
 * {@code init} routine with both the batch-level and the per-message metadata.
 *
 * @param keepMessageInDirectMemory forwarded to {@code init} as its pool-message flag
 */
MessageImpl(
        String topic,
        BatchMessageIdImpl batchMessageIdImpl,
        MessageMetadata batchMetadata,
        SingleMessageMetadata singleMessageMetadata,
        ByteBuf payload,
        Optional<EncryptionContext> encryptionCtx,
        ClientCnx cnx,
        Schema<T> schema,
        int redeliveryCount,
        boolean keepMessageInDirectMemory,
        long consumerEpoch) {
    // The final field must exist before init(...) copies into it.
    this.msgMetadata = new MessageMetadata();
    init(this, topic, batchMessageIdImpl, batchMetadata, singleMessageMetadata, payload, encryptionCtx,
            cnx, schema, redeliveryCount, keepMessageInDirectMemory, consumerEpoch);
}
/**
 * Factory for a message that is part of a batch.
 *
 * @param pooledMessage when {@code true} the instance is taken from the recycler; otherwise a
 *                      new instance is constructed
 * @return the initialized message
 */
public static <T> MessageImpl<T> create(String topic, BatchMessageIdImpl batchMessageIdImpl,
        MessageMetadata batchMetadata, SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
        Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount,
        boolean pooledMessage, long consumerEpoch) {
    if (!pooledMessage) {
        // Non-pooled messages are plain heap objects.
        return new MessageImpl<>(topic, batchMessageIdImpl, batchMetadata, singleMessageMetadata, payload,
                encryptionCtx, cnx, schema, redeliveryCount, pooledMessage, consumerEpoch);
    }

    @SuppressWarnings("unchecked")
    final MessageImpl<T> recycled = (MessageImpl<T>) RECYCLER.get();
    init(recycled, topic, batchMessageIdImpl, batchMetadata, singleMessageMetadata, payload, encryptionCtx, cnx,
            schema, redeliveryCount, pooledMessage, consumerEpoch);
    return recycled;
}
/**
 * Initializes {@code msg} as a non-batch message.
 *
 * <p>Runs the shared initializer without a batch id or per-message metadata, then stores
 * {@code messageId} on the message.
 */
static <T> void init(MessageImpl<T> msg, String topic, MessageIdImpl messageId, MessageMetadata msgMetadata,
        ByteBuf payload, Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema,
        int redeliveryCount, boolean poolMessage, long consumerEpoch) {
    final BatchMessageIdImpl noBatchId = null;
    final SingleMessageMetadata noSingleMetadata = null;
    init(msg, topic, noBatchId, msgMetadata, noSingleMetadata, payload,
            encryptionCtx, cnx, schema, redeliveryCount, poolMessage, consumerEpoch);
    // The shared initializer stored the (null) batch id; overwrite it with the real id.
    msg.messageId = messageId;
}
/**
 * Shared initializer used by the constructors and the pooled factories.
 *
 * <p>Copies {@code msgMetadata} into the message's own metadata object, stores the simple
 * fields, takes ownership of the payload (retain when pooled, copy otherwise) and builds the
 * property map. When {@code singleMessageMetadata} is present, its properties and key/ordering
 * key/event time/sequence id/null markers are applied on top of the copied metadata.
 */
private static <T> void init(MessageImpl<T> msg, String topic, BatchMessageIdImpl batchMessageIdImpl,
        MessageMetadata msgMetadata, SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
        Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount,
        boolean poolMessage, long consumerEpoch) {
    final MessageMetadata target = msg.msgMetadata;
    target.clear();
    target.copyFrom(msgMetadata);

    msg.messageId = batchMessageIdImpl;
    msg.topic = topic;
    msg.cnx = cnx;
    msg.redeliveryCount = redeliveryCount;
    msg.encryptionCtx = encryptionCtx;
    msg.schema = schema;
    msg.consumerEpoch = consumerEpoch;
    msg.poolMessage = poolMessage;

    // If it's not pool message then need to make a copy since the passed payload is
    // using a ref-count buffer that we don't know when could release, since the
    // Message is passed to the user. Also, the passed ByteBuf is coming from network
    // and is backed by a direct buffer which we could not expose as a byte[]
    if (poolMessage) {
        msg.payload = payload.retain();
    } else {
        msg.payload = Unpooled.copiedBuffer(payload);
    }

    if (singleMessageMetadata == null) {
        // No per-message metadata: properties come from the top-level metadata.
        if (msgMetadata.getPropertiesCount() > 0) {
            final Map<String, String> collected = msgMetadata.getPropertiesList().stream().collect(
                    Collectors.toMap(KeyValue::getKey, KeyValue::getValue, (oldValue, newValue) -> newValue));
            msg.properties = Collections.unmodifiableMap(collected);
        } else {
            msg.properties = Collections.emptyMap();
        }
        return;
    }

    // Properties of the individual message inside the batch.
    if (singleMessageMetadata.getPropertiesCount() > 0) {
        final Map<String, String> sorted = new TreeMap<>();
        singleMessageMetadata.getPropertiesList()
                .forEach(entry -> sorted.put(entry.getKey(), entry.getValue()));
        msg.properties = Collections.unmodifiableMap(sorted);
    } else {
        msg.properties = Collections.emptyMap();
    }

    // Partition key: the per-message value wins; otherwise drop any batch-level key.
    if (singleMessageMetadata.hasPartitionKey()) {
        target.setPartitionKeyB64Encoded(singleMessageMetadata.isPartitionKeyB64Encoded());
        target.setPartitionKey(singleMessageMetadata.getPartitionKey());
    } else if (target.hasPartitionKey()) {
        target.clearPartitionKey();
        target.clearPartitionKeyB64Encoded();
    }

    // Ordering key: same precedence as the partition key.
    if (singleMessageMetadata.hasOrderingKey()) {
        target.setOrderingKey(singleMessageMetadata.getOrderingKey());
    } else if (target.hasOrderingKey()) {
        target.clearOrderingKey();
    }

    // The remaining fields are only overridden when the per-message metadata carries them.
    if (singleMessageMetadata.hasEventTime()) {
        target.setEventTime(singleMessageMetadata.getEventTime());
    }
    if (singleMessageMetadata.hasSequenceId()) {
        target.setSequenceId(singleMessageMetadata.getSequenceId());
    }
    if (singleMessageMetadata.hasNullValue()) {
        target.setNullValue(singleMessageMetadata.isNullValue());
    }
    if (singleMessageMetadata.hasNullPartitionKey()) {
        target.setNullPartitionKey(singleMessageMetadata.isNullPartitionKey());
    }
}
/**
 * Creates a message from a textual message id and a {@code byte[]} payload.
 *
 * <p>The array is wrapped as a buffer and handed to the {@code ByteBuf}-based overload.
 */
public MessageImpl(
        String topic,
        String msgId,
        Map<String, String> properties,
        byte[] payload,
        Schema<T> schema,
        MessageMetadata msgMetadata) {
    this(topic, msgId, properties, Unpooled.wrappedBuffer(payload), schema, msgMetadata);
}
/**
 * Creates a message from a textual message id.
 *
 * <p>{@code msgId} is split on {@code ':'}; the first two components are parsed as the ledger
 * id and entry id. When there are exactly three components, the third is parsed as an int and
 * a {@code BatchMessageIdImpl} is created; otherwise a plain {@code MessageIdImpl} is used.
 *
 * @param properties properties exposed through an unmodifiable view
 * @param msgMetadata metadata copied into a new metadata object owned by this message
 */
public MessageImpl(String topic, String msgId, Map<String, String> properties,
        ByteBuf payload, Schema<T> schema, MessageMetadata msgMetadata) {
    final String[] idParts = msgId.split(":");
    final long ledgerId = Long.parseLong(idParts[0]);
    final long entryId = Long.parseLong(idParts[1]);
    this.messageId = idParts.length == 3
            ? new BatchMessageIdImpl(ledgerId, entryId, -1, Integer.parseInt(idParts[2]))
            : new MessageIdImpl(ledgerId, entryId, -1);

    this.topic = topic;
    this.cnx = null;
    this.payload = payload;
    this.properties = Collections.unmodifiableMap(properties);
    this.schema = schema;
    this.redeliveryCount = 0;
    this.msgMetadata = new MessageMetadata().copyFrom(msgMetadata);
}
/**
 * Builds a recycled message from a buffer holding message metadata followed by the payload.
 *
 * <p>The metadata is parsed into the message's own metadata object and the same buffer is
 * then stored as the payload. Message id, topic, connection and broker entry metadata are
 * set to {@code null}; the properties are set to an empty map.
 *
 * @param headersAndPayload buffer to parse
 * @return the initialized message
 * @throws IOException declared by this method; presumably raised by the metadata parsing step
 */
public static MessageImpl<byte[]> deserialize(ByteBuf headersAndPayload) throws IOException {
    @SuppressWarnings("unchecked")
    final MessageImpl<byte[]> parsed = (MessageImpl<byte[]>) RECYCLER.get();

    Commands.parseMessageMetadata(headersAndPayload, parsed.msgMetadata);
    parsed.payload = headersAndPayload;

    parsed.messageId = null;
    parsed.topic = null;
    parsed.cnx = null;
    parsed.properties = Collections.emptyMap();
    parsed.brokerEntryMetadata = null;
    return parsed;
}
/**
 * Tells whether an entry has outlived the given TTL.
 *
 * @param messageTTLInSeconds time-to-live in seconds; {@code 0} means the entry never expires
 * @param entryTimestamp entry timestamp in milliseconds
 * @return {@code true} when the TTL is non-zero and the current time is past
 *         {@code entryTimestamp} plus the TTL
 */
public static boolean isEntryExpired(int messageTTLInSeconds, long entryTimestamp) {
    if (messageTTLInSeconds == 0) {
        return false;
    }
    final long now = System.currentTimeMillis();
    final long expiresAt = entryTimestamp + TimeUnit.SECONDS.toMillis(messageTTLInSeconds);
    return now > expiresAt;
}
/**
 * Compares an entry timestamp against a reference timestamp.
 *
 * @param entryTimestamp timestamp of the entry
 * @param timestamp reference timestamp
 * @return {@code true} when {@code entryTimestamp} is strictly smaller than {@code timestamp}
 */
public static boolean isEntryPublishedEarlierThan(long entryTimestamp, long timestamp) {
    return timestamp > entryTimestamp;
}
/**
 * Builds a recycled message from a buffer that may start with broker entry metadata.
 *
 * <p>The broker entry metadata is parsed first (if it exists), then the message metadata;
 * the same buffer is then stored as the payload. Message id, topic and connection are set to
 * {@code null} and the consumer epoch is set to the default.
 *
 * @param headersAndPayloadWithBrokerEntryMetadata buffer to parse
 * @return the initialized message
 * @throws IOException declared by this method; presumably raised by the parsing steps
 */
public static MessageImpl<byte[]> deserializeSkipBrokerEntryMetaData(
        ByteBuf headersAndPayloadWithBrokerEntryMetadata) throws IOException {
    @SuppressWarnings("unchecked")
    final MessageImpl<byte[]> parsed = (MessageImpl<byte[]>) RECYCLER.get();

    // Order matters: the broker entry metadata precedes the message metadata in the buffer.
    final BrokerEntryMetadata entryMetadata =
            Commands.parseBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata);
    Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata, parsed.msgMetadata);

    parsed.payload = headersAndPayloadWithBrokerEntryMetadata;
    parsed.messageId = null;
    parsed.topic = null;
    parsed.cnx = null;
    parsed.brokerEntryMetadata = entryMetadata;
    parsed.consumerEpoch = DEFAULT_CONSUMER_EPOCH;
    return parsed;
}
/**
 * Records on the message metadata the cluster this message was replicated from.
 *
 * @param cluster name of the originating cluster
 */
public void setReplicatedFrom(String cluster) {
    this.msgMetadata.setReplicatedFrom(cluster);
}
/**
 * @return {@code true} when the metadata carries a replicated-from cluster
 */
@Override
public boolean isReplicated() {
    return this.msgMetadata.hasReplicatedFrom();
}
/**
 * @return the cluster this message was replicated from, or {@code null} when it is not a
 *         replicated message
 */
@Override
public String getReplicatedFrom() {
    return isReplicated() ? msgMetadata.getReplicatedFrom() : null;
}
/**
 * @return the publish time stored in the message metadata
 */
@Override
public long getPublishTime() {
    return this.msgMetadata.getPublishTime();
}
/**
 * @return the event time from the metadata, or {@code 0} when none is set
 */
@Override
public long getEventTime() {
    return msgMetadata.hasEventTime() ? msgMetadata.getEventTime() : 0;
}
/**
 * @return the deliver-at time from the metadata, or {@code 0} when none is set
 */
public long getDeliverAtTime() {
    return msgMetadata.hasDeliverAtTime() ? msgMetadata.getDeliverAtTime() : 0;
}
/**
 * Tells whether this message has outlived the given TTL.
 *
 * <p>The broker timestamp is used as the reference when broker entry metadata with a broker
 * timestamp is present; otherwise the publish time is used.
 *
 * @param messageTTLInSeconds time-to-live in seconds; {@code 0} means the message never expires
 * @return {@code true} when the TTL is non-zero and the current time is past the reference
 *         time plus the TTL
 */
public boolean isExpired(int messageTTLInSeconds) {
    if (messageTTLInSeconds == 0) {
        return false;
    }
    final boolean hasBrokerTime = brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp();
    final long referenceTime = hasBrokerTime ? brokerEntryMetadata.getBrokerTimestamp() : getPublishTime();
    return System.currentTimeMillis() > referenceTime + TimeUnit.SECONDS.toMillis(messageTTLInSeconds);
}
/**
 * Returns the readable bytes of the payload.
 *
 * <p>The call never moves the payload's reader index, so it can be invoked repeatedly and
 * always yields the same content. The backing array is returned without copying only when
 * the buffer is heap-based, exposes an array, and its readable region covers that array
 * exactly; in every other case the readable bytes are copied into a new array.
 *
 * @return the message body, or {@code null} when the metadata marks the value as null
 */
@Override
public byte[] getData() {
    if (msgMetadata.isNullValue()) {
        return null;
    }
    final int readable = payload.readableBytes();
    // Zero-copy is only safe when array()[0..length) is exactly the readable region;
    // hasArray() also guards buffers (e.g. composite ones) that cannot expose an array.
    if (!payload.isDirect()
            && payload.hasArray()
            && payload.arrayOffset() == 0
            && payload.readerIndex() == 0
            && payload.array().length == readable) {
        return payload.array();
    }
    // Absolute read: unlike readBytes(...) this leaves the reader index untouched.
    final byte[] data = new byte[readable];
    payload.getBytes(payload.readerIndex(), data);
    return data;
}
/**
 * @return the number of readable payload bytes, or {@code 0} when the metadata marks the
 *         value as null
 */
@Override
public int size() {
    return msgMetadata.isNullValue() ? 0 : payload.readableBytes();
}
/**
 * @return the schema object stored on this message, without any loading or version lookup
 */
public Schema<T> getSchemaInternal() {
    return schema;
}
/**
 * Returns the schema to read this message with.
 *
 * <p>When the message carries a schema version and the schema is an {@code AutoConsumeSchema}
 * or an {@code AbstractSchema}, the schema pinned at that version is returned; otherwise the
 * stored schema is returned as is.
 *
 * @return the reader schema, or an empty optional when the message has no schema
 */
@Override
public Optional<Schema<?>> getReaderSchema() {
    ensureSchemaIsLoaded();
    if (schema == null) {
        return Optional.empty();
    }

    final byte[] version = getSchemaVersion();
    Schema<?> readerSchema = schema;
    if (version != null) {
        if (schema instanceof AutoConsumeSchema) {
            readerSchema = ((AutoConsumeSchema) schema).atSchemaVersion(version);
        } else if (schema instanceof AbstractSchema) {
            readerSchema = ((AbstractSchema<?>) schema).atSchemaVersion(version);
        }
    }
    return Optional.of(readerSchema);
}
/**
 * Returns the schema version carried by the message metadata.
 *
 * <p>For messages produced by older version producers without schema, the schema version is
 * an empty byte array rather than null; that case is normalized to {@code null} here.
 *
 * @return the schema version, or {@code null} when it is absent or empty
 */
@Override
public byte[] getSchemaVersion() {
    if (!msgMetadata.hasSchemaVersion()) {
        return null;
    }
    final byte[] version = msgMetadata.getSchemaVersion();
    return version.length > 0 ? version : null;
}
/**
 * Asks schemas that resolve lazily ({@code AutoConsumeSchema} and {@code KeyValueSchemaImpl})
 * to fetch the schema for this message's schema version if needed. Other schemas are left
 * untouched.
 */
private void ensureSchemaIsLoaded() {
    if (schema instanceof AutoConsumeSchema) {
        final BytesSchemaVersion version = BytesSchemaVersion.of(getSchemaVersion());
        ((AutoConsumeSchema) schema).fetchSchemaIfNeeded(version);
        return;
    }
    if (schema instanceof KeyValueSchemaImpl) {
        final BytesSchemaVersion version = BytesSchemaVersion.of(getSchemaVersion());
        ((KeyValueSchemaImpl) schema).fetchSchemaIfNeeded(getTopicName(), version);
    }
}
/**
 * Returns the schema info for this message.
 *
 * <p>For an {@code AutoConsumeSchema} the info is looked up by the message's schema version;
 * for any other schema its own info is returned.
 *
 * @return the schema info, or {@code null} when the message has no schema
 */
public SchemaInfo getSchemaInfo() {
    if (schema == null) {
        return null;
    }
    ensureSchemaIsLoaded();
    return schema instanceof AutoConsumeSchema
            ? ((AutoConsumeSchema) schema).getSchemaInfo(getSchemaVersion())
            : schema.getSchemaInfo();
}
/**
 * @return the stored schema hash, or {@code SchemaHash.empty()} when none has been set
 */
public SchemaHash getSchemaHash() {
    if (schemaHash != null) {
        return schemaHash;
    }
    return SchemaHash.empty();
}
/**
 * Stores the schema info used by the replicator and recomputes the schema hash from it.
 *
 * @param schemaInfo schema info to store
 * @throws IllegalArgumentException when the message metadata has no replicated-from cluster
 */
public void setSchemaInfoForReplicator(SchemaInfo schemaInfo) {
    if (!msgMetadata.hasReplicatedFrom()) {
        throw new IllegalArgumentException("Only allowed to set schemaInfoForReplicator for a replicated message.");
    }
    this.schemaInfoForReplicator = schemaInfo;
    this.schemaHash = SchemaHash.of(schemaInfo);
}
/**
 * @return the schema info stored for the replicator, or {@code null} when the message
 *         metadata has no replicated-from cluster
 */
public SchemaInfo getSchemaInfoForReplicator() {
    if (msgMetadata.hasReplicatedFrom()) {
        return this.schemaInfoForReplicator;
    }
    return null;
}
/**
 * Decodes the payload into a value using the message schema.
 *
 * <p>Key/value schemas are routed to the key/value decoding helpers. For every other schema
 * the payload is decoded directly, passing the schema version only when the schema supports
 * schema versioning.
 *
 * @return the decoded value, or {@code null} for a non key/value message whose metadata marks
 *         the value as null
 */
@Override
public T getValue() {
    final SchemaInfo info = getSchemaInfo();
    final boolean isKeyValue = info != null && SchemaType.KEY_VALUE == info.getType();
    if (isKeyValue) {
        return schema.supportSchemaVersioning() ? getKeyValueBySchemaVersion() : getKeyValue();
    }
    if (msgMetadata.isNullValue()) {
        return null;
    }
    // check if the schema passed in from client supports schema versioning or not
    // this is an optimization to only get schema version when necessary
    final byte[] version = schema.supportSchemaVersioning() ? getSchemaVersion() : null;
    return decode(version);
}
/**
 * @return the key/value schema: the internal schema at this message's schema version for an
 *         {@code AutoConsumeSchema}, otherwise the stored schema itself
 */
private KeyValueSchemaImpl getKeyValueSchema() {
    return schema instanceof AutoConsumeSchema
            ? (KeyValueSchemaImpl) ((AutoConsumeSchema) schema).getInternalSchema(getSchemaVersion())
            : (KeyValueSchemaImpl) schema;
}
/**
 * Decodes the payload and translates low-level decoding failures into a schema error.
 *
 * @param schemaVersion schema version to decode with, or {@code null} to decode without one
 * @return the decoded value
 * @throws SchemaSerializationException when decoding fails with an
 *         {@link ArrayIndexOutOfBoundsException}; the original exception is attached as the cause
 */
private T decode(byte[] schemaVersion) {
    try {
        return decodeBySchema(schemaVersion);
    } catch (ArrayIndexOutOfBoundsException e) {
        // It usually means the message was produced without schema check while the message is not compatible with
        // the current schema. Therefore, convert it to SchemaSerializationException with a better description.
        final int payloadSize = payload.readableBytes();
        final SchemaSerializationException failure = new SchemaSerializationException(
                "payload (" + payloadSize + " bytes) cannot be decoded with schema "
                        + new String(schema.getSchemaInfo().getSchema(), StandardCharsets.UTF_8));
        // Attach the cause afterwards so the original stack trace is not lost.
        failure.initCause(e);
        throw failure;
    }
}
/**
 * Decodes the payload with the message schema.
 *
 * <p>For pooled messages the schema is first asked to decode straight from the payload's NIO
 * buffer; when that yields {@code null} (or the message is not pooled) the byte-array based
 * decode is used instead.
 *
 * @param schemaVersion schema version to decode with, or {@code null} to decode without one
 * @return the decoded value
 */
private T decodeBySchema(byte[] schemaVersion) {
    if (poolMessage) {
        final T fromBuffer = schema.decode(payload.nioBuffer(), schemaVersion);
        if (fromBuffer != null) {
            return fromBuffer;
        }
    }
    return schemaVersion == null
            ? schema.decode(getData())
            : schema.decode(getData(), schemaVersion);
}
/**
 * Decodes a key/value message using the message's schema version.
 *
 * <p>With {@code SEPARATED} encoding the key bytes and the payload are decoded together by the
 * key/value schema, and the result is wrapped when the message schema is an
 * {@code AutoConsumeSchema}. With any other encoding the payload is decoded as a whole.
 */
private T getKeyValueBySchemaVersion() {
    final KeyValueSchemaImpl kvSchema = getKeyValueSchema();
    final byte[] version = getSchemaVersion();
    if (kvSchema.getKeyValueEncodingType() != KeyValueEncodingType.SEPARATED) {
        return decode(version);
    }

    final org.apache.pulsar.common.schema.KeyValue decoded =
            (org.apache.pulsar.common.schema.KeyValue) kvSchema.decode(getKeyBytes(), getData(), version);
    if (!(schema instanceof AutoConsumeSchema)) {
        return (T) decoded;
    }
    return (T) AutoConsumeSchema.wrapPrimitiveObject(decoded,
            ((AutoConsumeSchema) schema).getSchemaInfo(version).getType(), version);
}
/**
 * Decodes a key/value message without passing a schema version to the decoder.
 *
 * <p>With {@code SEPARATED} encoding the key bytes and the payload are decoded together by the
 * key/value schema, and the result is wrapped when the message schema is an
 * {@code AutoConsumeSchema}. With any other encoding the payload is decoded as a whole.
 */
private T getKeyValue() {
    final KeyValueSchemaImpl kvSchema = getKeyValueSchema();
    if (kvSchema.getKeyValueEncodingType() != KeyValueEncodingType.SEPARATED) {
        return decode(null);
    }

    final org.apache.pulsar.common.schema.KeyValue decoded =
            (org.apache.pulsar.common.schema.KeyValue) kvSchema.decode(getKeyBytes(), getData(), null);
    if (!(schema instanceof AutoConsumeSchema)) {
        return (T) decoded;
    }
    // The schema type is still looked up by the message's schema version.
    return (T) AutoConsumeSchema.wrapPrimitiveObject(decoded,
            ((AutoConsumeSchema) schema).getSchemaInfo(getSchemaVersion()).getType(), null);
}
/**
 * @return the sequence id from the metadata, or {@code -1} when none is set
 */
@Override
public long getSequenceId() {
    return msgMetadata.hasSequenceId() ? msgMetadata.getSequenceId() : -1;
}
/**
 * @return the producer name from the metadata, or {@code null} when none is set
 */
@Override
public String getProducerName() {
    return msgMetadata.hasProducerName() ? msgMetadata.getProducerName() : null;
}
/**
 * @return the payload buffer itself (no copy is made)
 */
public ByteBuf getDataBuffer() {
    return this.payload;
}
/**
 * @return the id stored on this message; may be {@code null} if none has been assigned
 */
@Override
public MessageId getMessageId() {
    return this.messageId;
}
/**
 * Returns the message properties, building them from the metadata on first use.
 *
 * <p>When the same key appears more than once in the metadata, the later value wins.
 *
 * @return an unmodifiable property map (empty when the metadata has no properties)
 */
@Override
public synchronized Map<String, String> getProperties() {
    if (this.properties != null) {
        return this.properties;
    }

    if (msgMetadata.getPropertiesCount() > 0) {
        final Map<String, String> collected = msgMetadata.getPropertiesList().stream()
                .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue,
                        (oldValue, newValue) -> newValue));
        this.properties = Collections.unmodifiableMap(collected);
    } else {
        this.properties = Collections.emptyMap();
    }
    return this.properties;
}
/**
 * @param name property key to look for
 * @return {@code true} when the property map contains {@code name}
 */
@Override
public boolean hasProperty(String name) {
    final Map<String, String> props = getProperties();
    return props.containsKey(name);
}
/**
 * @param name property key to look up
 * @return the value mapped to {@code name}, or {@code null} when there is none
 */
@Override
public String getProperty(String name) {
    final Map<String, String> props = this.getProperties();
    return props.get(name);
}
/**
 * @return the mutable metadata object owned by this message
 */
public MessageMetadata getMessageBuilder() {
    return this.msgMetadata;
}
/**
 * @return {@code true} when the metadata carries a partition key
 */
@Override
public boolean hasKey() {
    return this.msgMetadata.hasPartitionKey();
}
/**
 * @return the topic name stored on this message; may be {@code null}
 */
@Override
public String getTopicName() {
    return this.topic;
}
/**
 * @return the partition key from the metadata, or {@code null} when none is set
 */
@Override
public String getKey() {
    return msgMetadata.hasPartitionKey() ? msgMetadata.getPartitionKey() : null;
}
/**
 * @return the metadata flag telling whether the partition key is Base64 encoded
 */
@Override
public boolean hasBase64EncodedKey() {
    return this.msgMetadata.isPartitionKeyB64Encoded();
}
/**
 * Returns the partition key as bytes.
 *
 * @return {@code null} when there is no partition key or it is flagged as null; otherwise the
 *         Base64-decoded key when it is flagged as Base64 encoded, or its UTF-8 bytes
 */
@Override
public byte[] getKeyBytes() {
    if (!msgMetadata.hasPartitionKey() || msgMetadata.isNullPartitionKey()) {
        return null;
    }
    final String key = getKey();
    return hasBase64EncodedKey() ? Base64.getDecoder().decode(key) : key.getBytes(UTF_8);
}
/**
 * @return {@code true} when the metadata carries an ordering key
 */
@Override
public boolean hasOrderingKey() {
    return this.msgMetadata.hasOrderingKey();
}
/**
 * @return the ordering key from the metadata, or {@code null} when none is set
 */
@Override
public byte[] getOrderingKey() {
    return msgMetadata.hasOrderingKey() ? msgMetadata.getOrderingKey() : null;
}
/**
 * @return the broker entry metadata stored on this message; may be {@code null}
 */
public BrokerEntryMetadata getBrokerEntryMetadata() {
    return this.brokerEntryMetadata;
}
/**
 * Replaces the broker entry metadata stored on this message.
 *
 * @param brokerEntryMetadata the new broker entry metadata (stored by reference)
 */
public void setBrokerEntryMetadata(BrokerEntryMetadata brokerEntryMetadata) {
    // Stored by reference, not copied.
    this.brokerEntryMetadata = brokerEntryMetadata;
}
/**
 * @return the client connection stored on this message; may be {@code null}
 */
public ClientCnx getCnx() {
    return this.cnx;
}
/**
 * Resets every per-message field and hands the instance back to the recycler.
 *
 * <p>All state is cleared so that nothing from the previous use (including the schema hash
 * and the replicator schema info) can leak into the next message built on this instance.
 * The encryption context is reset to {@link Optional#empty()} rather than {@code null} so
 * that {@link #getEncryptionCtx()} never returns a null optional. Instances that were not
 * obtained from the recycler are only reset.
 */
public void recycle() {
    if (msgMetadata != null) {
        msgMetadata.clear();
    }
    if (brokerEntryMetadata != null) {
        brokerEntryMetadata.clear();
    }
    cnx = null;
    messageId = null;
    topic = null;
    payload = null;
    encryptionCtx = Optional.empty();
    redeliveryCount = 0;
    uncompressedSize = 0;
    properties = null;
    schema = null;
    // Not every initialization path assigns these two, so they must not survive a reuse.
    schemaHash = null;
    schemaInfoForReplicator = null;
    schemaState = SchemaState.None;
    poolMessage = false;
    consumerEpoch = DEFAULT_CONSUMER_EPOCH;
    // Must be last: once handed back, the instance may be picked up by another caller.
    if (recyclerHandle != null) {
        recyclerHandle.recycle(this);
    }
}
/**
 * Releases the payload and recycles this instance when it is a pooled message; does nothing
 * otherwise.
 */
@Override
public void release() {
    if (!poolMessage) {
        return;
    }
    ReferenceCountUtil.safeRelease(payload);
    recycle();
}
/**
 * @return {@code true} when broker entry metadata is present and carries a broker timestamp
 */
@Override
public boolean hasBrokerPublishTime() {
    if (brokerEntryMetadata == null) {
        return false;
    }
    return brokerEntryMetadata.hasBrokerTimestamp();
}
/**
 * @return the broker timestamp, or an empty optional when there is no broker entry metadata
 *         or it has no broker timestamp
 */
@Override
public Optional<Long> getBrokerPublishTime() {
    if (brokerEntryMetadata == null || !brokerEntryMetadata.hasBrokerTimestamp()) {
        return Optional.empty();
    }
    return Optional.of(brokerEntryMetadata.getBrokerTimestamp());
}
/**
 * @return {@code true} when broker entry metadata is present and carries an index
 */
@Override
public boolean hasIndex() {
    if (brokerEntryMetadata == null) {
        return false;
    }
    return brokerEntryMetadata.hasIndex();
}
/**
 * Returns the index of this message derived from the broker entry metadata.
 *
 * <p>For a message with a batch message id whose metadata has a batch message count, the
 * result is {@code entryIndex - batchSize + batchIndex + 1}; otherwise the entry index itself.
 *
 * @return the index, or an empty optional when there is no broker entry metadata or it has
 *         no index
 */
@Override
public Optional<Long> getIndex() {
    if (brokerEntryMetadata == null || !brokerEntryMetadata.hasIndex()) {
        return Optional.empty();
    }
    if (msgMetadata.hasNumMessagesInBatch() && messageId instanceof BatchMessageIdImpl) {
        final BatchMessageIdImpl batchId = (BatchMessageIdImpl) messageId;
        final int batchSize = batchId.getBatchSize();
        final int batchIndex = batchId.getBatchIndex();
        return Optional.of(brokerEntryMetadata.getIndex() - batchSize + batchIndex + 1);
    }
    return Optional.of(brokerEntryMetadata.getIndex());
}
/**
 * Constructor used by the recycler to create pooled instances.
 *
 * <p>Allocates the metadata and broker entry metadata objects up front and starts with a
 * redelivery count of 0 and the default consumer epoch.
 *
 * @param recyclerHandle handle used to return this instance to the recycler
 */
private MessageImpl(Handle<MessageImpl<?>> recyclerHandle) {
    this.msgMetadata = new MessageMetadata();
    this.brokerEntryMetadata = new BrokerEntryMetadata();
    this.recyclerHandle = recyclerHandle;
    this.consumerEpoch = DEFAULT_CONSUMER_EPOCH;
    this.redeliveryCount = 0;
}
// Handle for returning this instance to RECYCLER; only assigned by the private recycler constructor,
// so it stays null for instances created through the other constructors.
private Handle<MessageImpl<?>> recyclerHandle;
/** Pool of reusable message instances; new ones are created through the private handle constructor. */
private static final Recycler<MessageImpl<?>> RECYCLER = new Recycler<MessageImpl<?>>() {
    @Override
    protected MessageImpl<?> newObject(Handle<MessageImpl<?>> pooledHandle) {
        final MessageImpl<?> fresh = new MessageImpl<>(pooledHandle);
        return fresh;
    }
};
/**
 * @return {@code true} when the metadata lists at least one replicate-to entry
 */
public boolean hasReplicateTo() {
    final int targets = msgMetadata.getReplicateTosCount();
    return targets > 0;
}
/**
 * @return the replicate-to list exactly as exposed by the message metadata
 */
public List<String> getReplicateTo() {
    return this.msgMetadata.getReplicateTosList();
}
/**
 * @return {@code true} when the metadata carries a replicated-from cluster
 */
public boolean hasReplicateFrom() {
    return this.msgMetadata.hasReplicatedFrom();
}
/**
 * Replaces the id stored on this message.
 *
 * @param messageId the new message id
 */
void setMessageId(MessageId messageId) {
    // Plain overwrite; no validation is performed here.
    this.messageId = messageId;
}
/**
 * @return the encryption context stored on this message
 */
@Override
public Optional<EncryptionContext> getEncryptionCtx() {
    return this.encryptionCtx;
}
/**
 * @return the redelivery count stored on this message
 */
@Override
public int getRedeliveryCount() {
    return this.redeliveryCount;
}
/**
 * @return the uncompressed size recorded on this message
 */
int getUncompressedSize() {
    return this.uncompressedSize;
}
/**
 * @return the current schema state of this message
 */
SchemaState getSchemaState() {
    return this.schemaState;
}
/**
 * Updates the schema state of this message.
 *
 * @param schemaState the new schema state
 */
void setSchemaState(SchemaState schemaState) {
    // Plain overwrite; transitions are not checked here.
    this.schemaState = schemaState;
}
/**
 * Exposes the payload buffer; used only by unit tests to validate the payload's state and
 * reference count.
 *
 * @return the payload buffer itself
 */
@VisibleForTesting
ByteBuf getPayload() {
    return this.payload;
}
/**
 * Schema state tracked per message; a message starts in {@link #None} and is reset to it on
 * recycle. The other states are assigned through {@code setSchemaState}.
 */
enum SchemaState {
    None,
    Ready,
    Broken
}
}