| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.client.impl; |
| |
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
| import static org.apache.pulsar.checksum.utils.Crc32cChecksum.computeChecksum; |
| import static org.apache.pulsar.checksum.utils.Crc32cChecksum.resumeChecksum; |
| import static org.apache.pulsar.common.api.Commands.hasChecksum; |
| import static org.apache.pulsar.common.api.Commands.readChecksum; |
| |
| import java.io.IOException; |
| import java.util.List; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLongFieldUpdater; |
| |
| import org.apache.pulsar.client.api.CompressionType; |
| import org.apache.pulsar.client.api.Message; |
| import org.apache.pulsar.client.api.MessageId; |
| import org.apache.pulsar.client.api.Producer; |
| import org.apache.pulsar.client.api.ProducerConfiguration; |
| import org.apache.pulsar.client.api.ProducerCryptoFailureAction; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.client.api.PulsarClientException.CryptoException; |
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.Commands.ChecksumType;
import org.apache.pulsar.common.api.DoubleByteBuf;
import org.apache.pulsar.common.api.PulsarDecoder;
| import org.apache.pulsar.common.api.proto.PulsarApi; |
| import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; |
| import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; |
| import org.apache.pulsar.common.compression.CompressionCodec; |
| import org.apache.pulsar.common.compression.CompressionCodecProvider; |
| import org.apache.pulsar.common.util.DateFormatter; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.collect.Queues; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.util.Recycler; |
| import io.netty.util.Recycler.Handle; |
| import io.netty.util.ReferenceCountUtil; |
| import io.netty.util.Timeout; |
| import io.netty.util.TimerTask; |
| |
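/**
 * Default {@link Producer} implementation: handles sends, batching, compression, encryption, checksum recovery
 * and reconnection for a single topic (or a single partition of a partitioned topic).
 *
 * <p>A minimal usage sketch through the public API (the service URL and topic name are illustrative):
 *
 * <pre>{@code
 * PulsarClient client = PulsarClient.create("pulsar://localhost:6650");
 * Producer producer = client.createProducer("persistent://sample/standalone/ns1/my-topic");
 * MessageId msgId = producer.send("hello".getBytes());
 * producer.close();
 * client.close();
 * }</pre>
 */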
| public class ProducerImpl extends ProducerBase implements TimerTask { |
| |
| // Producer id, used to identify a producer within a single connection |
| private final long producerId; |
| |
| // Variable is used through the atomic updater |
| @SuppressWarnings("unused") |
| private volatile long msgIdGenerator; |
| |
| private final BlockingQueue<OpSendMsg> pendingMessages; |
| private final BlockingQueue<OpSendMsg> pendingCallbacks; |
| private final Semaphore semaphore; |
| private volatile Timeout sendTimeout = null; |
| private long createProducerTimeout; |
| private final int maxNumMessagesInBatch; |
| private final BatchMessageContainer batchMessageContainer; |
| |
| // Globally unique producer name |
| private String producerName; |
| |
| private String connectionId; |
| private String connectedSince; |
| private final int partitionIndex; |
| |
| private final ProducerStats stats; |
| |
| private final CompressionCodec compressor; |
| |
| private volatile long lastSequenceIdPublished; |
| private MessageCrypto msgCrypto = null; |
| |
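    // The updater mutates msgIdGenerator without allocating an AtomicLong per producer (a note on the presumed
    // design intent; the field itself is only written through this updater)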
| private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater |
| .newUpdater(ProducerImpl.class, "msgIdGenerator"); |
| |
| private ScheduledExecutorService keyGenExecutor = null; |
| |
| public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration conf, |
| CompletableFuture<Producer> producerCreatedFuture, int partitionIndex) { |
| super(client, topic, conf, producerCreatedFuture); |
| this.producerId = client.newProducerId(); |
| this.producerName = conf.getProducerName(); |
| this.partitionIndex = partitionIndex; |
| this.pendingMessages = Queues.newArrayBlockingQueue(conf.getMaxPendingMessages()); |
| this.pendingCallbacks = Queues.newArrayBlockingQueue(conf.getMaxPendingMessages()); |
| this.semaphore = new Semaphore(conf.getMaxPendingMessages(), true); |
| this.compressor = CompressionCodecProvider |
| .getCompressionCodec(convertCompressionType(conf.getCompressionType())); |
| |
| if (conf.getInitialSequenceId().isPresent()) { |
| long initialSequenceId = conf.getInitialSequenceId().get(); |
| this.lastSequenceIdPublished = initialSequenceId; |
| this.msgIdGenerator = initialSequenceId + 1; |
| } else { |
| this.lastSequenceIdPublished = -1; |
| this.msgIdGenerator = 0; |
| } |
| |
| if (conf.isEncryptionEnabled()) { |
| String logCtx = "[" + topic + "] [" + producerName + "] [" + producerId + "]"; |
            this.msgCrypto = new MessageCrypto(logCtx, true);

            // Regenerate the data key cipher at a fixed interval
| keyGenExecutor = Executors.newSingleThreadScheduledExecutor(); |
| keyGenExecutor.scheduleWithFixedDelay(() -> { |
| try { |
| msgCrypto.addPublicKeyCipher(conf.getEncryptionKeys(), conf.getCryptoKeyReader()); |
| } catch (CryptoException e) { |
| if (!producerCreatedFuture.isDone()) { |
| log.warn("[{}] [{}] [{}] Failed to add public key cipher.", topic, producerName, producerId); |
| producerCreatedFuture.completeExceptionally(e); |
| } |
| } |
| }, 0L, 4L, TimeUnit.HOURS); |
| |
| } |
| |
| if (conf.getSendTimeoutMs() > 0) { |
| sendTimeout = client.timer().newTimeout(this, conf.getSendTimeoutMs(), TimeUnit.MILLISECONDS); |
| } |
| |
| this.createProducerTimeout = System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs(); |
| if (conf.getBatchingEnabled()) { |
| this.maxNumMessagesInBatch = conf.getBatchingMaxMessages(); |
| this.batchMessageContainer = new BatchMessageContainer(maxNumMessagesInBatch, |
| convertCompressionType(conf.getCompressionType()), topic, producerName); |
| } else { |
| this.maxNumMessagesInBatch = 1; |
| this.batchMessageContainer = null; |
| } |
| if (client.getConfiguration().getStatsIntervalSeconds() > 0) { |
| stats = new ProducerStats(client, conf, this); |
| } else { |
| stats = ProducerStats.PRODUCER_STATS_DISABLED; |
| } |
| grabCnx(); |
| } |
| |
| private boolean isBatchMessagingEnabled() { |
| return conf.getBatchingEnabled(); |
| } |
| |
| @Override |
| public long getLastSequenceId() { |
| return lastSequenceIdPublished; |
| } |
| |
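    /**
     * Publishes a message asynchronously; the returned future completes once the broker acknowledges the
     * message. A minimal usage sketch (the payload is illustrative):
     *
     * <pre>{@code
     * Message msg = MessageBuilder.create().setContent("hello".getBytes()).build();
     * producer.sendAsync(msg).thenAccept(msgId -> System.out.println("Published as " + msgId));
     * }</pre>
     *
     * The anonymous {@link SendCallback} below chains per-message callbacks, so that messages sharing one
     * batched send operation each complete their own future.
     */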
| @Override |
| public CompletableFuture<MessageId> sendAsync(Message message) { |
| CompletableFuture<MessageId> future = new CompletableFuture<>(); |
| |
| sendAsync(message, new SendCallback() { |
| SendCallback nextCallback = null; |
| long createdAt = System.nanoTime(); |
| |
| @Override |
| public CompletableFuture<MessageId> getFuture() { |
| return future; |
| } |
| |
| @Override |
| public SendCallback getNextSendCallback() { |
| return nextCallback; |
| } |
| |
| @Override |
| public void sendComplete(Exception e) { |
| if (e != null) { |
| stats.incrementSendFailed(); |
| future.completeExceptionally(e); |
| } else { |
| future.complete(message.getMessageId()); |
| stats.incrementNumAcksReceived(System.nanoTime() - createdAt); |
| } |
| while (nextCallback != null) { |
| SendCallback sendCallback = nextCallback; |
| if (e != null) { |
| stats.incrementSendFailed(); |
| sendCallback.getFuture().completeExceptionally(e); |
| } else { |
| sendCallback.getFuture().complete(message.getMessageId()); |
| stats.incrementNumAcksReceived(System.nanoTime() - createdAt); |
| } |
| nextCallback = nextCallback.getNextSendCallback(); |
| sendCallback = null; |
| } |
| } |
| |
| @Override |
| public void addCallback(SendCallback scb) { |
| nextCallback = scb; |
| } |
| }); |
| return future; |
| } |
| |
| public void sendAsync(Message message, SendCallback callback) { |
| checkArgument(message instanceof MessageImpl); |
| |
| if (!isValidProducerState(callback)) { |
| return; |
| } |
| |
| if (!canEnqueueRequest(callback)) { |
| return; |
| } |
| |
| MessageImpl msg = (MessageImpl) message; |
| MessageMetadata.Builder msgMetadata = msg.getMessageBuilder(); |
| ByteBuf payload = msg.getDataBuffer(); |
| |
        // If compression is enabled, compress the payload here; otherwise the same buffer is used as-is
| int uncompressedSize = payload.readableBytes(); |
| ByteBuf compressedPayload = payload; |
| // batch will be compressed when closed |
| if (!isBatchMessagingEnabled()) { |
| compressedPayload = compressor.encode(payload); |
| payload.release(); |
| } |
| int compressedSize = compressedPayload.readableBytes(); |
| |
        // Validate the message size. For batching we validate the uncompressed payload size, since a message
        // cannot be discarded later on, while the batch is being built.
| if (compressedSize > PulsarDecoder.MaxMessageSize) { |
| compressedPayload.release(); |
| String compressedStr = (!isBatchMessagingEnabled() && conf.getCompressionType() != CompressionType.NONE) |
| ? "Compressed" : ""; |
| callback.sendComplete(new PulsarClientException.InvalidMessageException( |
| format("%s Message payload size %d cannot exceed %d bytes", compressedStr, compressedSize, |
| PulsarDecoder.MaxMessageSize))); |
| return; |
| } |
| |
| if (!msg.isReplicated() && msgMetadata.hasProducerName()) { |
| callback.sendComplete(new PulsarClientException.InvalidMessageException("Cannot re-use the same message")); |
| compressedPayload.release(); |
| return; |
| } |
| |
| try { |
| synchronized (this) { |
| long sequenceId; |
| if (!msgMetadata.hasSequenceId()) { |
| sequenceId = msgIdGeneratorUpdater.getAndIncrement(this); |
| msgMetadata.setSequenceId(sequenceId); |
| } else { |
| sequenceId = msgMetadata.getSequenceId(); |
| } |
| if (!msgMetadata.hasPublishTime()) { |
| msgMetadata.setPublishTime(System.currentTimeMillis()); |
| |
| checkArgument(!msgMetadata.hasProducerName()); |
| |
| msgMetadata.setProducerName(producerName); |
| |
| if (conf.getCompressionType() != CompressionType.NONE) { |
| msgMetadata.setCompression(convertCompressionType(conf.getCompressionType())); |
| msgMetadata.setUncompressedSize(uncompressedSize); |
| } |
| } |
| |
| if (isBatchMessagingEnabled()) { |
                    // handle boundary cases where the message being added would exceed
                    // the batch size and/or the max message size
| if (batchMessageContainer.hasSpaceInBatch(msg)) { |
| batchMessageContainer.add(msg, callback); |
| payload.release(); |
| if (batchMessageContainer.numMessagesInBatch == maxNumMessagesInBatch |
| || batchMessageContainer.currentBatchSizeBytes >= BatchMessageContainer.MAX_MESSAGE_BATCH_SIZE_BYTES) { |
| batchMessageAndSend(); |
| } |
| } else { |
| doBatchSendAndAdd(msg, callback, payload); |
| } |
| } else { |
| ByteBuf encryptedPayload = encryptMessage(msgMetadata, compressedPayload); |
| ByteBuf cmd = sendMessage(producerId, sequenceId, 1, msgMetadata.build(), encryptedPayload); |
| msgMetadata.recycle(); |
| |
| final OpSendMsg op = OpSendMsg.create(msg, cmd, sequenceId, callback); |
| op.setNumMessagesInBatch(1); |
| op.setBatchSizeByte(encryptedPayload.readableBytes()); |
| pendingMessages.put(op); |
| |
                    // Read the connection reference before checking whether it is still connected, so that we
                    // avoid reading a null value
| ClientCnx cnx = cnx(); |
| if (isConnected()) { |
                        // If we do have a connection, the message is sent immediately; otherwise
                        // we'll try again once a new connection is established
| cmd.retain(); |
| cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op)); |
| stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte); |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] [{}] Connection is not ready -- sequenceId {}", topic, producerName, |
| sequenceId); |
| } |
| } |
| } |
| } |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| semaphore.release(); |
| callback.sendComplete(new PulsarClientException(ie)); |
| } catch (PulsarClientException e) { |
| semaphore.release(); |
| callback.sendComplete(e); |
| } catch (Throwable t) { |
| semaphore.release(); |
| callback.sendComplete(new PulsarClientException(t)); |
| } |
| } |
| |
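    /**
     * Encrypts the (already compressed) payload when encryption is enabled. On an encryption failure the
     * behavior follows the configured {@link ProducerCryptoFailureAction}: with SEND the unencrypted payload is
     * published anyway; otherwise the exception is propagated and the send fails.
     */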
| private ByteBuf encryptMessage(MessageMetadata.Builder msgMetadata, ByteBuf compressedPayload) throws PulsarClientException { |
| |
| ByteBuf encryptedPayload = compressedPayload; |
| if (!conf.isEncryptionEnabled() || msgCrypto == null) { |
| return encryptedPayload; |
| } |
| try { |
| encryptedPayload = msgCrypto.encrypt(conf.getEncryptionKeys(), conf.getCryptoKeyReader(), msgMetadata, compressedPayload); |
| } catch (PulsarClientException e) { |
            // Unless the config explicitly allows publishing unencrypted messages upon an encryption failure,
            // fail the request
| if (conf.getCryptoFailureAction() == ProducerCryptoFailureAction.SEND) { |
| log.warn("[{}] [{}] Failed to encrypt message {}. Proceeding with publishing unencrypted message", |
| topic, producerName, e.getMessage()); |
| return compressedPayload; |
| } |
| throw e; |
| } |
| return encryptedPayload; |
| } |
| |
| private ByteBuf sendMessage(long producerId, long sequenceId, int numMessages, MessageMetadata msgMetadata, |
| ByteBuf compressedPayload) throws IOException { |
| ChecksumType checksumType; |
| |
| if (getClientCnx() == null |
| || getClientCnx().getRemoteEndpointProtocolVersion() >= brokerChecksumSupportedVersion()) { |
| checksumType = ChecksumType.Crc32c; |
| } else { |
| checksumType = ChecksumType.None; |
| } |
| return Commands.newSend(producerId, sequenceId, numMessages, checksumType, msgMetadata, compressedPayload); |
| } |
| |
| private void doBatchSendAndAdd(MessageImpl msg, SendCallback callback, ByteBuf payload) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] [{}] Closing out batch to accomodate large message with size {}", topic, producerName, |
| msg.getDataBuffer().readableBytes()); |
| } |
| batchMessageAndSend(); |
| batchMessageContainer.add(msg, callback); |
| payload.release(); |
| } |
| |
| private boolean isValidProducerState(SendCallback callback) { |
| switch (getState()) { |
| case Ready: |
            // Fall through
        case Connecting:
            // We are OK to queue the messages on the client; they will be sent to the broker
            // once the connection is established
            return true;
| case Closing: |
| case Closed: |
| callback.sendComplete(new PulsarClientException.AlreadyClosedException("Producer already closed")); |
| return false; |
| case Terminated: |
| callback.sendComplete(new PulsarClientException.TopicTerminatedException("Topic was terminated")); |
| return false; |
| case Failed: |
| case Uninitialized: |
| default: |
| callback.sendComplete(new PulsarClientException.NotConnectedException()); |
| return false; |
| } |
| } |
| |
| private boolean canEnqueueRequest(SendCallback callback) { |
| try { |
| if (conf.getBlockIfQueueFull()) { |
| semaphore.acquire(); |
| } else { |
| if (!semaphore.tryAcquire()) { |
| callback.sendComplete( |
| new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full")); |
| return false; |
| } |
| } |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| callback.sendComplete(new PulsarClientException(e)); |
| return false; |
| } |
| |
| return true; |
| } |
| |
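    /**
     * Recyclable task that performs the actual socket write from the connection's event loop thread, keeping
     * all channel writes on the same thread.
     */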
| private static final class WriteInEventLoopCallback implements Runnable { |
| private ProducerImpl producer; |
| private ClientCnx cnx; |
| private OpSendMsg op; |
| |
| static WriteInEventLoopCallback create(ProducerImpl producer, ClientCnx cnx, OpSendMsg op) { |
| WriteInEventLoopCallback c = RECYCLER.get(); |
| c.producer = producer; |
| c.cnx = cnx; |
| c.op = op; |
| return c; |
| } |
| |
| @Override |
| public void run() { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] [{}] Sending message cnx {}, sequenceId {}", producer.topic, producer.producerName, cnx, |
| op.sequenceId); |
| } |
| |
| try { |
| cnx.ctx().writeAndFlush(op.cmd, cnx.ctx().voidPromise()); |
| } finally { |
| recycle(); |
| } |
| } |
| |
| private void recycle() { |
| producer = null; |
| cnx = null; |
| op = null; |
| recyclerHandle.recycle(this); |
| } |
| |
| private final Handle<WriteInEventLoopCallback> recyclerHandle; |
| |
| private WriteInEventLoopCallback(Handle<WriteInEventLoopCallback> recyclerHandle) { |
| this.recyclerHandle = recyclerHandle; |
| } |
| |
| private static final Recycler<WriteInEventLoopCallback> RECYCLER = new Recycler<WriteInEventLoopCallback>() { |
| @Override |
| protected WriteInEventLoopCallback newObject(Handle<WriteInEventLoopCallback> handle) { |
| return new WriteInEventLoopCallback(handle); |
| } |
| }; |
| } |
| |
| @Override |
| public CompletableFuture<Void> closeAsync() { |
| final State currentState = getAndUpdateState(state -> { |
| if (state == State.Closed) { |
| return state; |
| } |
| return State.Closing; |
| }); |
| |
| if (currentState == State.Closed || currentState == State.Closing) { |
| return CompletableFuture.completedFuture(null); |
| } |
| |
| Timeout timeout = sendTimeout; |
| if (timeout != null) { |
| timeout.cancel(); |
| sendTimeout = null; |
| } |
| |
| if (keyGenExecutor != null && !keyGenExecutor.isTerminated()) { |
| keyGenExecutor.shutdown(); |
| } |
| |
| stats.cancelStatsTimeout(); |
| |
| ClientCnx cnx = cnx(); |
| if (cnx == null || currentState != State.Ready) { |
| log.info("[{}] [{}] Closed Producer (not connected)", topic, producerName); |
| synchronized (this) { |
| setState(State.Closed); |
| client.cleanupProducer(this); |
| PulsarClientException ex = new PulsarClientException.AlreadyClosedException( |
| "Producer was already closed"); |
| pendingMessages.forEach(msg -> { |
| msg.callback.sendComplete(ex); |
| msg.cmd.release(); |
| msg.recycle(); |
| }); |
| pendingMessages.clear(); |
| } |
| |
| return CompletableFuture.completedFuture(null); |
| } |
| |
| long requestId = client.newRequestId(); |
| ByteBuf cmd = Commands.newCloseProducer(producerId, requestId); |
| |
| CompletableFuture<Void> closeFuture = new CompletableFuture<>(); |
| cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> { |
| cnx.removeProducer(producerId); |
| if (exception == null || !cnx.ctx().channel().isActive()) { |
                // Either we've received the success response for the close producer command from the broker, or the
                // connection broke in the meantime. In either case, the producer is gone.
| synchronized (ProducerImpl.this) { |
| log.info("[{}] [{}] Closed Producer", topic, producerName); |
| setState(State.Closed); |
| pendingMessages.forEach(msg -> { |
| msg.cmd.release(); |
| msg.recycle(); |
| }); |
| pendingMessages.clear(); |
| } |
| |
| closeFuture.complete(null); |
| client.cleanupProducer(this); |
| } else { |
| closeFuture.completeExceptionally(exception); |
| } |
| |
| return null; |
| }); |
| |
| return closeFuture; |
| } |
| |
| @Override |
| public boolean isConnected() { |
| return getClientCnx() != null && (getState() == State.Ready); |
| } |
| |
| public boolean isWritable() { |
| ClientCnx cnx = getClientCnx(); |
| return cnx != null && cnx.channel().isWritable(); |
| } |
| |
| public void terminated(ClientCnx cnx) { |
| State previousState = getAndUpdateState(state -> (state == State.Closed ? State.Closed : State.Terminated)); |
| if (previousState != State.Terminated && previousState != State.Closed) { |
| log.info("[{}] [{}] The topic has been terminated", topic, producerName); |
| setClientCnx(null); |
| |
| failPendingMessages(cnx, |
| new PulsarClientException.TopicTerminatedException("The topic has been terminated")); |
| } |
| } |
| |
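    /**
     * Handles a send receipt from the broker. Acks are expected in order: a sequence id higher than the head of
     * {@code pendingMessages} indicates a missed ack, so the connection is closed to force a resend; a lower
     * sequence id refers to a message that has already timed out and is ignored.
     */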
| void ackReceived(ClientCnx cnx, long sequenceId, long ledgerId, long entryId) { |
| OpSendMsg op = null; |
| boolean callback = false; |
| synchronized (this) { |
| op = pendingMessages.peek(); |
| if (op == null) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] [{}] Got ack for timed out msg {}", topic, producerName, sequenceId); |
| } |
| return; |
| } |
| |
| long expectedSequenceId = op.sequenceId; |
| if (sequenceId > expectedSequenceId) { |
| log.warn("[{}] [{}] Got ack for msg. expecting: {} - got: {} - queue-size: {}", topic, producerName, |
| expectedSequenceId, sequenceId, pendingMessages.size()); |
| // Force connection closing so that messages can be retransmitted in a new connection |
| cnx.channel().close(); |
| } else if (sequenceId < expectedSequenceId) { |
| // Ignoring the ack since it's referring to a message that has already timed out. |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] [{}] Got ack for timed out msg {} last-seq: {}", topic, producerName, sequenceId, |
| expectedSequenceId); |
| } |
| } else { |
| // Message was persisted correctly |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] [{}] Received ack for msg {} ", topic, producerName, sequenceId); |
| } |
| pendingMessages.remove(); |
| semaphore.release(op.numMessagesInBatch); |
| callback = true; |
| pendingCallbacks.add(op); |
| } |
| } |
| if (callback) { |
| op = pendingCallbacks.poll(); |
| if (op != null) { |
| lastSequenceIdPublished = op.sequenceId + op.numMessagesInBatch - 1; |
| op.setMessageId(ledgerId, entryId, partitionIndex); |
| try { |
| // Need to protect ourselves from any exception being thrown in the future handler from the |
| // application |
| op.callback.sendComplete(null); |
| } catch (Throwable t) { |
| log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName, |
| sequenceId, t); |
| } |
| ReferenceCountUtil.safeRelease(op.cmd); |
| op.recycle(); |
| } |
| } |
| } |
| |
| /** |
| * Checks message checksum to retry if message was corrupted while sending to broker. Recomputes checksum of the |
| * message header-payload again. |
| * <ul> |
| * <li><b>if matches with existing checksum</b>: it means message was corrupt while sending to broker. So, resend |
| * message</li> |
| * <li><b>if doesn't match with existing checksum</b>: it means message is already corrupt and can't retry again. |
| * So, fail send-message by failing callback</li> |
| * </ul> |
| * |
| * @param cnx |
| * @param sequenceId |
| */ |
| protected synchronized void recoverChecksumError(ClientCnx cnx, long sequenceId) { |
| OpSendMsg op = pendingMessages.peek(); |
| if (op == null) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] [{}] Got send failure for timed out msg {}", topic, producerName, sequenceId); |
| } |
| } else { |
| long expectedSequenceId = op.sequenceId; |
| if (sequenceId == expectedSequenceId) { |
| boolean corrupted = !verifyLocalBufferIsNotCorrupted(op); |
| if (corrupted) { |
| // remove message from pendingMessages queue and fail callback |
| pendingMessages.remove(); |
| semaphore.release(op.numMessagesInBatch); |
| try { |
| op.callback.sendComplete( |
                            new PulsarClientException.ChecksumException("Checksum failed on corrupt message"));
| } catch (Throwable t) { |
| log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, |
| producerName, sequenceId, t); |
| } |
| ReferenceCountUtil.safeRelease(op.cmd); |
| op.recycle(); |
| return; |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] [{}] Message is not corrupted, retry send-message with sequenceId {}", topic, |
| producerName, sequenceId); |
| } |
| } |
| |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] [{}] Corrupt message is already timed out {}", topic, producerName, sequenceId); |
| } |
| } |
| } |
        // The message is not corrupted: let the producer resend the pending messages, including the one that
        // failed the checksum
| resendMessages(cnx); |
| } |
| |
| /** |
| * Computes checksum again and verifies it against existing checksum. If checksum doesn't match it means that |
| * message is corrupt. |
| * |
| * @param op |
| * @return returns true only if message is not modified and computed-checksum is same as previous checksum else |
| * return false that means that message is corrupted. Returns true if checksum is not present. |
| */ |
| protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg op) { |
| DoubleByteBuf msg = getDoubleByteBuf(op.cmd); |
| |
| if (msg != null) { |
| ByteBuf headerFrame = msg.getFirst(); |
| msg.markReaderIndex(); |
| headerFrame.markReaderIndex(); |
| try { |
| // skip bytes up to checksum index |
| headerFrame.skipBytes(4); // skip [total-size] |
| int cmdSize = (int) headerFrame.readUnsignedInt(); |
| headerFrame.skipBytes(cmdSize); |
| // verify if checksum present |
| if (hasChecksum(headerFrame)) { |
| int checksum = readChecksum(headerFrame).intValue(); |
                // the reader index is already at the metadata-payload section; recompute its checksum
| int metadataChecksum = computeChecksum(headerFrame); |
| long computedChecksum = resumeChecksum(metadataChecksum, msg.getSecond()); |
| return checksum == computedChecksum; |
| } else { |
| log.warn("[{}] [{}] checksum is not present into message with id {}", topic, producerName, |
| op.sequenceId); |
| } |
| } finally { |
| headerFrame.resetReaderIndex(); |
| msg.resetReaderIndex(); |
| } |
| return true; |
| } else { |
| log.warn("[{}] Failed while casting {} into DoubleByteBuf", producerName, op.cmd.getClass().getName()); |
| return false; |
| } |
| } |
| |
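    /**
     * A pending send operation: holds the message (or the batched messages), the serialized command buffer, the
     * sequence id and the callback to complete once the broker acks. Instances are pooled through a Netty
     * {@link Recycler} to avoid per-send allocations.
     */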
| protected static final class OpSendMsg { |
| MessageImpl msg; |
| List<MessageImpl> msgs; |
| ByteBuf cmd; |
| SendCallback callback; |
| long sequenceId; |
| long createdAt; |
| long batchSizeByte = 0; |
| int numMessagesInBatch = 1; |
| |
| static OpSendMsg create(MessageImpl msg, ByteBuf cmd, long sequenceId, SendCallback callback) { |
| OpSendMsg op = RECYCLER.get(); |
| op.msg = msg; |
| op.cmd = cmd; |
| op.callback = callback; |
| op.sequenceId = sequenceId; |
| op.createdAt = System.currentTimeMillis(); |
| return op; |
| } |
| |
| static OpSendMsg create(List<MessageImpl> msgs, ByteBuf cmd, long sequenceId, SendCallback callback) { |
| OpSendMsg op = RECYCLER.get(); |
| op.msgs = msgs; |
| op.cmd = cmd; |
| op.callback = callback; |
| op.sequenceId = sequenceId; |
| op.createdAt = System.currentTimeMillis(); |
| return op; |
| } |
| |
| void recycle() { |
| msg = null; |
| msgs = null; |
| cmd = null; |
| callback = null; |
| sequenceId = -1; |
| createdAt = -1; |
| recyclerHandle.recycle(this); |
| } |
| |
| void setNumMessagesInBatch(int numMessagesInBatch) { |
| this.numMessagesInBatch = numMessagesInBatch; |
| } |
| |
| void setBatchSizeByte(long batchSizeByte) { |
| this.batchSizeByte = batchSizeByte; |
| } |
| |
| void setMessageId(long ledgerId, long entryId, int partitionIndex) { |
| if (msg != null) { |
| msg.setMessageId(new MessageIdImpl(ledgerId, entryId, partitionIndex)); |
| } else { |
| for (int batchIndex = 0; batchIndex < msgs.size(); batchIndex++) { |
| msgs.get(batchIndex) |
| .setMessageId(new BatchMessageIdImpl(ledgerId, entryId, partitionIndex, batchIndex)); |
| } |
| } |
| } |
| |
| private OpSendMsg(Handle<OpSendMsg> recyclerHandle) { |
| this.recyclerHandle = recyclerHandle; |
| } |
| |
| private final Handle<OpSendMsg> recyclerHandle; |
| private static final Recycler<OpSendMsg> RECYCLER = new Recycler<OpSendMsg>() { |
| @Override |
| protected OpSendMsg newObject(Handle<OpSendMsg> handle) { |
| return new OpSendMsg(handle); |
| } |
| }; |
| } |
| |
| @Override |
| void connectionOpened(final ClientCnx cnx) { |
| // we set the cnx reference before registering the producer on the cnx, so if the cnx breaks before creating the |
| // producer, it will try to grab a new cnx |
| setClientCnx(cnx); |
| cnx.registerProducer(producerId, this); |
| |
| log.info("[{}] [{}] Creating producer on cnx {}", topic, producerName, cnx.ctx().channel()); |
| |
| long requestId = client.newRequestId(); |
| |
| cnx.sendRequestWithId(Commands.newProducer(topic, producerId, requestId, producerName), requestId) |
| .thenAccept(pair -> { |
| String producerName = pair.getLeft(); |
| long lastSequenceId = pair.getRight(); |
| |
| // We are now reconnected to broker and clear to send messages. Re-send all pending messages and |
| // set the cnx pointer so that new messages will be sent immediately |
| synchronized (ProducerImpl.this) { |
| if (getState() == State.Closing || getState() == State.Closed) { |
| // Producer was closed while reconnecting, close the connection to make sure the broker |
| // drops the producer on its side |
| cnx.removeProducer(producerId); |
| cnx.channel().close(); |
| return; |
| } |
| resetBackoff(); |
| |
| log.info("[{}] [{}] Created producer on cnx {}", topic, producerName, cnx.ctx().channel()); |
| connectionId = cnx.ctx().channel().toString(); |
| connectedSince = DateFormatter.now(); |
| |
| if (this.producerName == null) { |
| this.producerName = producerName; |
| } |
| |
| if (this.lastSequenceIdPublished == -1 && !conf.getInitialSequenceId().isPresent()) { |
| this.lastSequenceIdPublished = lastSequenceId; |
| this.msgIdGenerator = lastSequenceId + 1; |
| } |
| |
| if (!producerCreatedFuture.isDone() && isBatchMessagingEnabled()) { |
| // schedule the first batch message task |
| client.timer().newTimeout(batchMessageAndSendTask, conf.getBatchingMaxPublishDelayMs(), |
| TimeUnit.MILLISECONDS); |
| } |
| resendMessages(cnx); |
| } |
| }).exceptionally((e) -> { |
| Throwable cause = e.getCause(); |
| cnx.removeProducer(producerId); |
| if (getState() == State.Closing || getState() == State.Closed) { |
| // Producer was closed while reconnecting, close the connection to make sure the broker |
| // drops the producer on its side |
| cnx.channel().close(); |
| return null; |
| } |
| log.error("[{}] [{}] Failed to create producer: {}", topic, producerName, cause.getMessage()); |
| |
| if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededException) { |
| synchronized (this) { |
| log.warn("[{}] [{}] Topic backlog quota exceeded. Throwing Exception on producer.", topic, |
| producerName); |
| |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] [{}] Pending messages: {}", topic, producerName, |
| pendingMessages.size()); |
| } |
| |
| PulsarClientException bqe = new PulsarClientException.ProducerBlockedQuotaExceededException( |
| "Could not send pending messages as backlog exceeded"); |
| failPendingMessages(cnx(), bqe); |
| } |
| } else if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededError) { |
| log.warn("[{}] [{}] Producer is blocked on creation because backlog exceeded on topic.", |
| producerName, topic); |
| } |
| |
| if (cause instanceof PulsarClientException.TopicTerminatedException) { |
| setState(State.Terminated); |
| failPendingMessages(cnx(), (PulsarClientException) cause); |
| producerCreatedFuture.completeExceptionally(cause); |
| client.cleanupProducer(this); |
| } else if (producerCreatedFuture.isDone() || // |
| (cause instanceof PulsarClientException && isRetriableError((PulsarClientException) cause) |
| && System.currentTimeMillis() < createProducerTimeout)) { |
| // Either we had already created the producer once (producerCreatedFuture.isDone()) or we are |
| // still within the initial timeout budget and we are dealing with a retriable error |
| reconnectLater(cause); |
| } else { |
| setState(State.Failed); |
| producerCreatedFuture.completeExceptionally(cause); |
| client.cleanupProducer(this); |
| } |
| |
| return null; |
| }); |
| } |
| |
| @Override |
| void connectionFailed(PulsarClientException exception) { |
| if (System.currentTimeMillis() > createProducerTimeout |
| && producerCreatedFuture.completeExceptionally(exception)) { |
| log.info("[{}] Producer creation failed for producer {}", topic, producerId); |
| setState(State.Failed); |
| client.cleanupProducer(this); |
| } |
| } |
| |
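    /**
     * Re-sends every message still in {@code pendingMessages} after a reconnection, stripping checksums first
     * if the newly connected broker does not support them.
     */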
| private void resendMessages(ClientCnx cnx) { |
| cnx.ctx().channel().eventLoop().execute(() -> { |
| synchronized (this) { |
| if (getState() == State.Closing || getState() == State.Closed) { |
| // Producer was closed while reconnecting, close the connection to make sure the broker |
| // drops the producer on its side |
| cnx.channel().close(); |
| return; |
| } |
| int messagesToResend = pendingMessages.size(); |
| if (messagesToResend == 0) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] [{}] No pending messages to resend {}", topic, producerName, messagesToResend); |
| } |
| if (changeToReadyState()) { |
| producerCreatedFuture.complete(ProducerImpl.this); |
| return; |
| } else { |
| // Producer was closed while reconnecting, close the connection to make sure the broker |
| // drops the producer on its side |
| cnx.channel().close(); |
| return; |
| } |
| |
| } |
| |
| log.info("[{}] [{}] Re-Sending {} messages to server", topic, producerName, messagesToResend); |
| |
| final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion(); |
| for (OpSendMsg op : pendingMessages) { |
| |
| if (stripChecksum) { |
| stripChecksum(op); |
| } |
| op.cmd.retain(); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] [{}] Re-Sending message in cnx {}, sequenceId {}", topic, producerName, |
| cnx.channel(), op.sequenceId); |
| } |
| cnx.ctx().write(op.cmd, cnx.ctx().voidPromise()); |
| stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte); |
| } |
| |
| cnx.ctx().flush(); |
| if (!changeToReadyState()) { |
| // Producer was closed while reconnecting, close the connection to make sure the broker |
| // drops the producer on its side |
| cnx.channel().close(); |
| return; |
| } |
| } |
| }); |
| } |
| |
| /** |
| * Strips checksum from {@link OpSendMsg} command if present else ignore it. |
| * |
| * @param op |
| */ |
| private void stripChecksum(OpSendMsg op) { |
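        // Assumed layout of the send command frame, reconstructed from the offset arithmetic below (the field
        // widths are an assumption based on this method's math, not an authoritative protocol reference):
        //
        //   [total-size(4)] [cmd-size(4)] [cmd(cmdSize)] [magic-number(2)] [checksum(4)] [metadata] [payload]
        //
        // Stripping the checksum removes the 6-byte [magic-number][checksum] section and rewrites [total-size].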
| op.cmd.markReaderIndex(); |
| int totalMsgBufSize = op.cmd.readableBytes(); |
| DoubleByteBuf msg = getDoubleByteBuf(op.cmd); |
| if (msg != null) { |
| ByteBuf headerFrame = msg.getFirst(); |
| msg.markReaderIndex(); |
| headerFrame.markReaderIndex(); |
| try { |
| headerFrame.skipBytes(4); // skip [total-size] |
| int cmdSize = (int) headerFrame.readUnsignedInt(); |
| |
| // verify if checksum present |
| headerFrame.skipBytes(cmdSize); |
| |
| if (!hasChecksum(headerFrame)) { |
| headerFrame.resetReaderIndex(); |
| return; |
| } |
| |
                int headerSize = 4 + 4 + cmdSize; // [total-size(4)] [cmd-size(4)] [cmd(cmdSize)]
                int checksumSize = 4 + 2; // [checksum(4)] [magic-number(2)]
                int checksumMark = (headerSize + checksumSize); // offset of the first byte after the checksum
                int metaPayloadSize = (totalMsgBufSize - checksumMark); // metadataPayload = totalSize - checksumMark
| int newTotalFrameSizeLength = 4 + cmdSize + metaPayloadSize; // new total-size without checksum |
| headerFrame.resetReaderIndex(); |
| int headerFrameSize = headerFrame.readableBytes(); |
| |
| headerFrame.setInt(0, newTotalFrameSizeLength); // rewrite new [total-size] |
                ByteBuf metadata = headerFrame.slice(checksumMark, headerFrameSize - checksumMark); // slice only the metadata
| headerFrame.writerIndex(headerSize); // set headerFrame write-index to overwrite metadata over checksum |
| metadata.readBytes(headerFrame, metadata.readableBytes()); |
| headerFrame.capacity(headerFrameSize - checksumSize); // reduce capacity by removed checksum bytes |
| headerFrame.resetReaderIndex(); |
| |
| } finally { |
| op.cmd.resetReaderIndex(); |
| } |
| } else { |
| log.warn("[{}] Failed while casting {} into DoubleByteBuf", producerName, op.cmd.getClass().getName()); |
| } |
| } |
| |
| public int brokerChecksumSupportedVersion() { |
| return ProtocolVersion.v6.getNumber(); |
| } |
| |
| @Override |
| String getHandlerName() { |
| return producerName; |
| } |
| |
| /** |
| * Process sendTimeout events |
| */ |
| @Override |
| public void run(Timeout timeout) throws Exception { |
| if (timeout.isCancelled()) { |
| return; |
| } |
| |
| long timeToWaitMs; |
| |
| synchronized (this) { |
| OpSendMsg firstMsg = pendingMessages.peek(); |
| if (firstMsg == null) { |
| // If there are no pending messages, reset the timeout to the configured value. |
| timeToWaitMs = conf.getSendTimeoutMs(); |
| } else { |
| // If there is at least one message, calculate the diff between the message timeout and the current |
| // time. |
| long diff = (firstMsg.createdAt + conf.getSendTimeoutMs()) - System.currentTimeMillis(); |
| if (diff <= 0) { |
                    // The diff is less than or equal to zero, meaning that the message has timed out.
                    // Fail the callback of every pending message, then clear the pending queue.
| log.info("[{}] [{}] Message send timed out. Failing {} messages", topic, producerName, |
| pendingMessages.size()); |
| |
| PulsarClientException te = new PulsarClientException.TimeoutException( |
| "Could not send message to broker within given timeout"); |
| failPendingMessages(cnx(), te); |
| stats.incrementSendFailed(pendingMessages.size()); |
| // Since the pending queue is cleared now, set timer to expire after configured value. |
| timeToWaitMs = conf.getSendTimeoutMs(); |
| } else { |
| // The diff is greater than zero, set the timeout to the diff value |
| timeToWaitMs = diff; |
| } |
| } |
| } |
| |
| sendTimeout = client.timer().newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS); |
| } |
| |
| /** |
| * This fails and clears the pending messages with the given exception. This method should be called from within the |
| * ProducerImpl object mutex. |
| */ |
| private void failPendingMessages(ClientCnx cnx, PulsarClientException ex) { |
| if (cnx == null) { |
| final AtomicInteger releaseCount = new AtomicInteger(); |
| pendingMessages.forEach(op -> { |
| releaseCount.addAndGet(op.numMessagesInBatch); |
| try { |
| // Need to protect ourselves from any exception being thrown in the future handler from the |
| // application |
| op.callback.sendComplete(ex); |
| } catch (Throwable t) { |
| log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName, |
| op.sequenceId, t); |
| } |
| ReferenceCountUtil.safeRelease(op.cmd); |
| op.recycle(); |
| }); |
| semaphore.release(releaseCount.get()); |
| pendingMessages.clear(); |
| pendingCallbacks.clear(); |
| if (isBatchMessagingEnabled()) { |
| failPendingBatchMessages(ex); |
| } |
| } else { |
| // If we have a connection, we schedule the callback and recycle on the event loop thread to avoid any |
| // race condition since we also write the message on the socket from this thread |
| cnx.ctx().channel().eventLoop().execute(() -> { |
| synchronized (ProducerImpl.this) { |
| failPendingMessages(null, ex); |
| } |
| }); |
| } |
| } |
| |
| /** |
| * fail any pending batch messages that were enqueued, however batch was not closed out |
| * |
| */ |
| private void failPendingBatchMessages(PulsarClientException ex) { |
| if (batchMessageContainer.isEmpty()) { |
| return; |
| } |
| int numMessagesInBatch = batchMessageContainer.numMessagesInBatch; |
| semaphore.release(numMessagesInBatch); |
| try { |
| // Need to protect ourselves from any exception being thrown in the future handler from the application |
| batchMessageContainer.firstCallback.sendComplete(ex); |
| } catch (Throwable t) { |
| log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName, |
| batchMessageContainer.sequenceId, t); |
| } |
| ReferenceCountUtil.safeRelease(batchMessageContainer.getBatchedSingleMessageMetadataAndPayload()); |
| batchMessageContainer.clear(); |
| } |
| |
| TimerTask batchMessageAndSendTask = new TimerTask() { |
| |
| @Override |
| public void run(Timeout timeout) throws Exception { |
| if (timeout.isCancelled()) { |
| return; |
| } |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] [{}] Batching the messages from the batch container from timer thread", topic, |
| producerName); |
| } |
| // semaphore acquired when message was enqueued to container |
| synchronized (ProducerImpl.this) { |
| batchMessageAndSend(); |
| } |
| // schedule the next batch message task |
| client.timer().newTimeout(this, conf.getBatchingMaxPublishDelayMs(), TimeUnit.MILLISECONDS); |
| } |
| }; |
| |
    /**
     * Closes out the current batch: compresses and optionally encrypts the accumulated payload, builds a single
     * send command and enqueues it as one {@link OpSendMsg}. Semaphore permits must already have been acquired,
     * one per message, before the messages were added to the container.
     */
| private void batchMessageAndSend() { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] [{}] Batching the messages from the batch container with {} messages", topic, producerName, |
| batchMessageContainer.numMessagesInBatch); |
| } |
| OpSendMsg op = null; |
| int numMessagesInBatch = 0; |
| try { |
| if (!batchMessageContainer.isEmpty()) { |
| numMessagesInBatch = batchMessageContainer.numMessagesInBatch; |
| ByteBuf compressedPayload = batchMessageContainer.getCompressedBatchMetadataAndPayload(); |
| long sequenceId = batchMessageContainer.sequenceId; |
| ByteBuf encryptedPayload = encryptMessage(batchMessageContainer.messageMetadata, compressedPayload); |
| ByteBuf cmd = sendMessage(producerId, sequenceId, batchMessageContainer.numMessagesInBatch, |
| batchMessageContainer.setBatchAndBuild(), encryptedPayload); |
| |
| op = OpSendMsg.create(batchMessageContainer.messages, cmd, sequenceId, |
| batchMessageContainer.firstCallback); |
| |
| op.setNumMessagesInBatch(batchMessageContainer.numMessagesInBatch); |
| op.setBatchSizeByte(batchMessageContainer.currentBatchSizeBytes); |
| |
| batchMessageContainer.clear(); |
| |
| pendingMessages.put(op); |
| |
| if (isConnected()) { |
| // If we do have a connection, the message is sent immediately, otherwise we'll try again once a new |
| // connection is established |
| cmd.retain(); |
| cnx().ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx(), op)); |
| stats.updateNumMsgsSent(numMessagesInBatch, op.batchSizeByte); |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] [{}] Connection is not ready -- sequenceId {}", topic, producerName, |
| sequenceId); |
| } |
| } |
| } |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| semaphore.release(numMessagesInBatch); |
| if (op != null) { |
| op.callback.sendComplete(new PulsarClientException(ie)); |
| } |
        } catch (PulsarClientException e) {
| semaphore.release(numMessagesInBatch); |
| if (op != null) { |
| op.callback.sendComplete(e); |
| } |
| } catch (Throwable t) { |
| semaphore.release(numMessagesInBatch); |
| log.warn("[{}] [{}] error while closing out batch -- {}", topic, producerName, t); |
| if (op != null) { |
| op.callback.sendComplete(new PulsarClientException(t)); |
| } |
| } |
| } |
| |
| /** |
| * Casts input cmd to {@link DoubleByteBuf} |
| * |
| * Incase if leak-detection level is enabled: pulsar instruments {@link DoubleByteBuf} into LeakAwareByteBuf (type |
| * of {@link io.netty.buffer.WrappedByteBuf}) So, this method casts input cmd to {@link DoubleByteBuf} else |
| * retrieves it from LeakAwareByteBuf. |
| * |
| * @param cmd |
| * @return DoubleByteBuf or null in case failed to cast input {@link ByteBuf} |
| */ |
| private DoubleByteBuf getDoubleByteBuf(ByteBuf cmd) { |
| DoubleByteBuf msg = null; |
| if (cmd instanceof DoubleByteBuf) { |
| msg = (DoubleByteBuf) cmd; |
| } else { |
| try { |
| msg = (DoubleByteBuf) cmd.unwrap(); |
| } catch (Exception e) { |
| log.error("[{}] Failed while casting {} into DoubleByteBuf", producerName, cmd.getClass().getName(), e); |
| } |
| } |
| return msg; |
| } |
| |
| public long getDelayInMillis() { |
| OpSendMsg firstMsg = pendingMessages.peek(); |
| if (firstMsg != null) { |
| return System.currentTimeMillis() - firstMsg.createdAt; |
| } |
| return 0L; |
| } |
| |
| public String getConnectionId() { |
| return cnx() != null ? connectionId : null; |
| } |
| |
| public String getConnectedSince() { |
| return cnx() != null ? connectedSince : null; |
| } |
| |
| public int getPendingQueueSize() { |
| return pendingMessages.size(); |
| } |
| |
| private PulsarApi.CompressionType convertCompressionType(CompressionType compressionType) { |
| switch (compressionType) { |
| case NONE: |
| return PulsarApi.CompressionType.NONE; |
| case LZ4: |
| return PulsarApi.CompressionType.LZ4; |
| case ZLIB: |
| return PulsarApi.CompressionType.ZLIB; |
| |
| default: |
| throw new RuntimeException("Invalid compression type"); |
| } |
| } |
| |
| @Override |
| public ProducerStats getStats() { |
| if (stats instanceof ProducerStatsDisabled) { |
| return null; |
| } |
| return stats; |
| } |
| |
| public String getProducerName() { |
| return producerName; |
| } |
| |
| private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class); |
| } |