blob: 9fdd44715f669dd26188be36082f79ad632f576b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import com.google.common.collect.ComparisonChain;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Key based batch message container.
*
* incoming single messages:
* (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
*
* batched into multiple batch messages:
* [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
*/
class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
private Map<String, KeyedBatch> batches = new HashMap<>();
@Override
public boolean add(MessageImpl<?> msg, SendCallback callback) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] add message to batch, num messages in batch so far is {}", topicName,
producer.getProducerName(), numMessagesInBatch);
}
numMessagesInBatch++;
currentBatchSizeBytes += msg.getDataBuffer().readableBytes();
String key = getKey(msg);
KeyedBatch part = batches.get(key);
if (part == null) {
part = new KeyedBatch();
part.addMsg(msg, callback);
part.compressionType = compressionType;
part.compressor = compressor;
part.maxBatchSize = maxBatchSize;
part.topicName = topicName;
part.producerName = producer.getProducerName();
batches.putIfAbsent(key, part);
if (msg.getMessageBuilder().hasTxnidMostBits() && currentTxnidMostBits == -1) {
currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits();
}
if (msg.getMessageBuilder().hasTxnidLeastBits() && currentTxnidLeastBits == -1) {
currentTxnidLeastBits = msg.getMessageBuilder().getTxnidLeastBits();
}
} else {
part.addMsg(msg, callback);
}
return isBatchFull();
}
@Override
public void clear() {
numMessagesInBatch = 0;
currentBatchSizeBytes = 0;
batches = new HashMap<>();
currentTxnidMostBits = -1L;
currentTxnidLeastBits = -1L;
}
@Override
public boolean isEmpty() {
return batches.isEmpty();
}
@Override
public void discard(Exception ex) {
try {
// Need to protect ourselves from any exception being thrown in the future handler from the application
batches.forEach((k, v) -> v.firstCallback.sendComplete(ex));
} catch (Throwable t) {
log.warn("[{}] [{}] Got exception while completing the callback", topicName, producer.getProducerName(), t);
}
batches.forEach((k, v) -> ReferenceCountUtil.safeRelease(v.batchedMessageMetadataAndPayload));
clear();
}
@Override
public boolean isMultiBatches() {
return true;
}
private ProducerImpl.OpSendMsg createOpSendMsg(KeyedBatch keyedBatch) throws IOException {
ByteBuf encryptedPayload = producer.encryptMessage(keyedBatch.messageMetadata,
keyedBatch.getCompressedBatchMetadataAndPayload());
if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
keyedBatch.discard(new PulsarClientException.InvalidMessageException(
"Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
return null;
}
final int numMessagesInBatch = keyedBatch.messages.size();
long currentBatchSizeBytes = 0;
for (MessageImpl<?> message : keyedBatch.messages) {
currentBatchSizeBytes += message.getDataBuffer().readableBytes();
}
keyedBatch.messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
if (currentTxnidMostBits != -1) {
keyedBatch.messageMetadata.setTxnidMostBits(currentTxnidMostBits);
}
if (currentTxnidLeastBits != -1) {
keyedBatch.messageMetadata.setTxnidLeastBits(currentTxnidLeastBits);
}
ByteBufPair cmd = producer.sendMessage(producer.producerId, keyedBatch.sequenceId, numMessagesInBatch,
keyedBatch.messageMetadata, encryptedPayload);
ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create(
keyedBatch.messages, cmd, keyedBatch.sequenceId, keyedBatch.firstCallback);
op.setNumMessagesInBatch(numMessagesInBatch);
op.setBatchSizeByte(currentBatchSizeBytes);
return op;
}
@Override
public List<ProducerImpl.OpSendMsg> createOpSendMsgs() throws IOException {
List<ProducerImpl.OpSendMsg> result = new ArrayList<>();
List<KeyedBatch> list = new ArrayList<>(batches.values());
list.sort(((o1, o2) -> ComparisonChain.start()
.compare(o1.sequenceId, o2.sequenceId)
.result()));
for (KeyedBatch keyedBatch : list) {
ProducerImpl.OpSendMsg op = createOpSendMsg(keyedBatch);
if (op != null) {
result.add(op);
}
}
return result;
}
@Override
public boolean hasSameSchema(MessageImpl<?> msg) {
String key = getKey(msg);
KeyedBatch part = batches.get(key);
if (part == null || part.messages.isEmpty()) {
return true;
}
if (!part.messageMetadata.hasSchemaVersion()) {
return msg.getSchemaVersion() == null;
}
return Arrays.equals(msg.getSchemaVersion(),
part.messageMetadata.getSchemaVersion());
}
private String getKey(MessageImpl<?> msg) {
if (msg.hasOrderingKey()) {
return Base64.getEncoder().encodeToString(msg.getOrderingKey());
}
return msg.getKey();
}
private static class KeyedBatch {
private final MessageMetadata messageMetadata = new MessageMetadata();
// sequence id for this batch which will be persisted as a single entry by broker
private long sequenceId = -1;
private ByteBuf batchedMessageMetadataAndPayload;
private List<MessageImpl<?>> messages = new ArrayList<>();
private SendCallback previousCallback = null;
private CompressionType compressionType;
private CompressionCodec compressor;
private int maxBatchSize;
private String topicName;
private String producerName;
// keep track of callbacks for individual messages being published in a batch
private SendCallback firstCallback;
private ByteBuf getCompressedBatchMetadataAndPayload() {
for (MessageImpl<?> msg : messages) {
batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(
msg.getMessageBuilder(), msg.getDataBuffer(), batchedMessageMetadataAndPayload);
}
int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
batchedMessageMetadataAndPayload.release();
if (compressionType != CompressionType.NONE) {
messageMetadata.setCompression(compressionType);
messageMetadata.setUncompressedSize(uncompressedSize);
}
// Update the current max batch size using the uncompressed size, which is what we need in any case to
// accumulate the batch content
maxBatchSize = Math.max(maxBatchSize, uncompressedSize);
return compressedPayload;
}
private void addMsg(MessageImpl<?> msg, SendCallback callback) {
if (messages.size() == 0) {
sequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
.buffer(Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
firstCallback = callback;
}
if (previousCallback != null) {
previousCallback.addCallback(msg, callback);
}
previousCallback = callback;
messages.add(msg);
}
public void discard(Exception ex) {
try {
// Need to protect ourselves from any exception being thrown in the future handler from the application
if (firstCallback != null) {
firstCallback.sendComplete(ex);
}
} catch (Throwable t) {
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName,
sequenceId, t);
}
clear();
}
public void clear() {
messages = new ArrayList<>();
firstCallback = null;
previousCallback = null;
messageMetadata.clear();
sequenceId = -1;
batchedMessageMetadataAndPayload = null;
}
}
private static final Logger log = LoggerFactory.getLogger(BatchMessageKeyBasedContainer.class);
}