| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.service.persistent; |
| |
| import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; |
| import com.google.common.annotations.VisibleForTesting; |
| import io.netty.buffer.ByteBuf; |
| import io.netty.util.Recycler; |
| import io.netty.util.Recycler.Handle; |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionException; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; |
| import org.apache.bookkeeper.mledger.AsyncCallbacks; |
| import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; |
| import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; |
| import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; |
| import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; |
| import org.apache.bookkeeper.mledger.Entry; |
| import org.apache.bookkeeper.mledger.ManagedCursor; |
| import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries; |
| import org.apache.bookkeeper.mledger.ManagedLedgerException; |
| import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException; |
| import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; |
| import org.apache.bookkeeper.mledger.Position; |
| import org.apache.bookkeeper.mledger.impl.PositionImpl; |
| import org.apache.pulsar.broker.PulsarServerException; |
| import org.apache.pulsar.broker.service.AbstractReplicator; |
| import org.apache.pulsar.broker.service.BrokerService; |
| import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; |
| import org.apache.pulsar.broker.service.Replicator; |
| import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; |
| import org.apache.pulsar.client.api.MessageId; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.client.impl.Backoff; |
| import org.apache.pulsar.client.impl.MessageImpl; |
| import org.apache.pulsar.client.impl.ProducerImpl; |
| import org.apache.pulsar.client.impl.PulsarClientImpl; |
| import org.apache.pulsar.client.impl.SendCallback; |
| import org.apache.pulsar.common.api.proto.MarkerType; |
| import org.apache.pulsar.common.policies.data.Policies; |
| import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; |
| import org.apache.pulsar.common.schema.SchemaInfo; |
| import org.apache.pulsar.common.stats.Rate; |
| import org.apache.pulsar.common.util.Codec; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class PersistentReplicator extends AbstractReplicator |
| implements Replicator, ReadEntriesCallback, DeleteCallback { |
| |
| private final PersistentTopic topic; |
| protected final ManagedCursor cursor; |
| |
| private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty(); |
| |
| private int readBatchSize; |
| private final int readMaxSizeBytes; |
| |
| private final int producerQueueThreshold; |
| |
| private static final AtomicIntegerFieldUpdater<PersistentReplicator> PENDING_MESSAGES_UPDATER = |
| AtomicIntegerFieldUpdater |
| .newUpdater(PersistentReplicator.class, "pendingMessages"); |
| private volatile int pendingMessages = 0; |
| |
| private static final int FALSE = 0; |
| private static final int TRUE = 1; |
| |
| private static final AtomicIntegerFieldUpdater<PersistentReplicator> HAVE_PENDING_READ_UPDATER = |
| AtomicIntegerFieldUpdater |
| .newUpdater(PersistentReplicator.class, "havePendingRead"); |
| private volatile int havePendingRead = FALSE; |
| |
| private final Rate msgOut = new Rate(); |
| private final Rate msgExpired = new Rate(); |
| |
| private int messageTTLInSeconds = 0; |
| |
| private final Backoff readFailureBackoff = new Backoff(1, TimeUnit.SECONDS, |
| 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS); |
| |
| private final PersistentMessageExpiryMonitor expiryMonitor; |
| // for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold |
| private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000; |
| |
| private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl(); |
| |
| public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster, |
| BrokerService brokerService, PulsarClientImpl replicationClient) |
| throws PulsarServerException { |
| super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService, |
| replicationClient); |
| this.topic = topic; |
| this.cursor = cursor; |
| this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, |
| Codec.decode(cursor.getName()), cursor, null); |
| HAVE_PENDING_READ_UPDATER.set(this, FALSE); |
| PENDING_MESSAGES_UPDATER.set(this, 0); |
| |
| readBatchSize = Math.min( |
| producerQueueSize, |
| topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize()); |
| readMaxSizeBytes = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadSizeBytes(); |
| producerQueueThreshold = (int) (producerQueueSize * 0.9); |
| |
| this.initializeDispatchRateLimiterIfNeeded(Optional.empty()); |
| |
| startProducer(); |
| } |
| |
| @Override |
| protected void readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer) { |
| // Rewind the cursor to be sure to read again all non-acked messages sent while restarting |
| cursor.rewind(); |
| |
| cursor.cancelPendingReadRequest(); |
| HAVE_PENDING_READ_UPDATER.set(this, FALSE); |
| this.producer = (ProducerImpl) producer; |
| |
| if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) { |
| log.info("[{}][{} -> {}] Created replicator producer", topicName, localCluster, remoteCluster); |
| backOff.reset(); |
| // activate cursor: so, entries can be cached |
| this.cursor.setActive(); |
| // read entries |
| readMoreEntries(); |
| } else { |
| log.info( |
| "[{}][{} -> {}] Replicator was stopped while creating the producer." |
| + " Closing it. Replicator state: {}", |
| topicName, localCluster, remoteCluster, STATE_UPDATER.get(this)); |
| STATE_UPDATER.set(this, State.Stopping); |
| closeProducerAsync(); |
| } |
| |
| } |
| |
| @Override |
| protected Position getReplicatorReadPosition() { |
| return cursor.getMarkDeletedPosition(); |
| } |
| |
| @Override |
| protected long getNumberOfEntriesInBacklog() { |
| return cursor.getNumberOfEntriesInBacklog(false); |
| } |
| |
| @Override |
| protected void disableReplicatorRead() { |
| if (this.cursor != null) { |
| // deactivate cursor after successfully close the producer |
| this.cursor.setInactive(); |
| } |
| } |
| |
| /** |
| * Calculate available permits for read entries. |
| * |
| * @return |
| * 0: Producer queue is full, no permits. |
| * -1: Rate Limiter reaches limit. |
| * >0: available permits for read entries. |
| */ |
| private int getAvailablePermits() { |
| int availablePermits = producerQueueSize - PENDING_MESSAGES_UPDATER.get(this); |
| |
| // return 0, if Producer queue is full, it will pause read entries. |
| if (availablePermits <= 0) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] Producer queue is full, availablePermits: {}, pause reading", |
| topicName, localCluster, remoteCluster, availablePermits); |
| } |
| return 0; |
| } |
| |
| // handle rate limit |
| if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) { |
| DispatchRateLimiter rateLimiter = dispatchRateLimiter.get(); |
| // no permits from rate limit |
| if (!rateLimiter.hasMessageDispatchPermit()) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] message-read exceeded topic replicator message-rate {}/{}," |
| + " schedule after a {}", |
| topicName, localCluster, remoteCluster, |
| rateLimiter.getDispatchRateOnMsg(), |
| rateLimiter.getDispatchRateOnByte(), |
| MESSAGE_RATE_BACKOFF_MS); |
| } |
| return -1; |
| } |
| |
| // if dispatch-rate is in msg then read only msg according to available permit |
| long availablePermitsOnMsg = rateLimiter.getAvailableDispatchRateLimitOnMsg(); |
| if (availablePermitsOnMsg > 0) { |
| availablePermits = Math.min(availablePermits, (int) availablePermitsOnMsg); |
| } |
| } |
| |
| return availablePermits; |
| } |
| |
| protected void readMoreEntries() { |
| int availablePermits = getAvailablePermits(); |
| |
| if (availablePermits > 0) { |
| int messagesToRead = Math.min(availablePermits, readBatchSize); |
| if (!isWritable()) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] Throttling replication traffic because producer is not writable", |
| topicName, localCluster, remoteCluster); |
| } |
| // Minimize the read size if the producer is disconnected or the window is already full |
| messagesToRead = 1; |
| } |
| |
| // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException |
| messagesToRead = Math.max(messagesToRead, 1); |
| |
| // Schedule read |
| if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] Schedule read of {} messages", topicName, localCluster, remoteCluster, |
| messagesToRead); |
| } |
| cursor.asyncReadEntriesOrWait(messagesToRead, readMaxSizeBytes, this, |
| null, PositionImpl.LATEST); |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] Not scheduling read due to pending read. Messages To Read {}", topicName, |
| localCluster, remoteCluster, messagesToRead); |
| } |
| } |
| } else if (availablePermits == -1) { |
| // no permits from rate limit |
| topic.getBrokerService().executor().schedule( |
| () -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS); |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] No Permits for reading. availablePermits: {}", |
| topicName, localCluster, remoteCluster, availablePermits); |
| } |
| } |
| } |
| |
| @Override |
| public void readEntriesComplete(List<Entry> entries, Object ctx) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] Read entries complete of {} messages", topicName, localCluster, remoteCluster, |
| entries.size()); |
| } |
| |
| int maxReadBatchSize = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize(); |
| if (readBatchSize < maxReadBatchSize) { |
| int newReadBatchSize = Math.min(readBatchSize * 2, maxReadBatchSize); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] Increasing read batch size from {} to {}", topicName, localCluster, |
| remoteCluster, readBatchSize, newReadBatchSize); |
| } |
| |
| readBatchSize = newReadBatchSize; |
| } |
| |
| readFailureBackoff.reduceToHalf(); |
| |
| boolean atLeastOneMessageSentForReplication = false; |
| boolean isEnableReplicatedSubscriptions = |
| brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions(); |
| |
| try { |
| // This flag is set to true when we skip atleast one local message, |
| // in order to skip remaining local messages. |
| boolean isLocalMessageSkippedOnce = false; |
| for (int i = 0; i < entries.size(); i++) { |
| Entry entry = entries.get(i); |
| int length = entry.getLength(); |
| ByteBuf headersAndPayload = entry.getDataBuffer(); |
| MessageImpl msg; |
| try { |
| msg = MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload); |
| } catch (Throwable t) { |
| log.error("[{}][{} -> {}] Failed to deserialize message at {} (buffer size: {}): {}", topicName, |
| localCluster, remoteCluster, entry.getPosition(), length, t.getMessage(), t); |
| cursor.asyncDelete(entry.getPosition(), this, entry.getPosition()); |
| entry.release(); |
| continue; |
| } |
| |
| if (isEnableReplicatedSubscriptions) { |
| checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload); |
| } |
| |
| if (msg.isReplicated()) { |
| // Discard messages that were already replicated into this region |
| cursor.asyncDelete(entry.getPosition(), this, entry.getPosition()); |
| entry.release(); |
| msg.recycle(); |
| continue; |
| } |
| |
| if (msg.hasReplicateTo() && !msg.getReplicateTo().contains(remoteCluster)) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] Skipping message at position {}, replicateTo {}", topicName, |
| localCluster, remoteCluster, entry.getPosition(), msg.getReplicateTo()); |
| } |
| cursor.asyncDelete(entry.getPosition(), this, entry.getPosition()); |
| entry.release(); |
| msg.recycle(); |
| continue; |
| } |
| |
| if (msg.isExpired(messageTTLInSeconds)) { |
| msgExpired.recordEvent(0 /* no value stat */); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] Discarding expired message at position {}, replicateTo {}", topicName, |
| localCluster, remoteCluster, entry.getPosition(), msg.getReplicateTo()); |
| } |
| cursor.asyncDelete(entry.getPosition(), this, entry.getPosition()); |
| entry.release(); |
| msg.recycle(); |
| continue; |
| } |
| |
| if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) { |
| // The producer is not ready yet after having stopped/restarted. Drop the message because it will |
| // recovered when the producer is ready |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] Dropping read message at {} because producer is not ready", topicName, |
| localCluster, remoteCluster, entry.getPosition()); |
| } |
| isLocalMessageSkippedOnce = true; |
| entry.release(); |
| msg.recycle(); |
| continue; |
| } |
| |
| dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(1, entry.getLength())); |
| |
| // Increment pending messages for messages produced locally |
| PENDING_MESSAGES_UPDATER.incrementAndGet(this); |
| |
| msgOut.recordEvent(headersAndPayload.readableBytes()); |
| |
| msg.setReplicatedFrom(localCluster); |
| |
| headersAndPayload.retain(); |
| |
| getSchemaInfo(msg).thenAccept(schemaInfo -> { |
| msg.setSchemaInfoForReplicator(schemaInfo); |
| producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg)); |
| }).exceptionally(ex -> { |
| log.error("[{}][{} -> {}] Failed to get schema from local cluster", topicName, |
| localCluster, remoteCluster, ex); |
| return null; |
| }); |
| |
| atLeastOneMessageSentForReplication = true; |
| } |
| } catch (Exception e) { |
| log.error("[{}][{} -> {}] Unexpected exception: {}", topicName, localCluster, remoteCluster, e.getMessage(), |
| e); |
| } |
| |
| HAVE_PENDING_READ_UPDATER.set(this, FALSE); |
| |
| if (atLeastOneMessageSentForReplication && !isWritable()) { |
| // Don't read any more entries until the current pending entries are persisted |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] Pausing replication traffic. at-least-one: {} is-writable: {}", topicName, |
| localCluster, remoteCluster, atLeastOneMessageSentForReplication, isWritable()); |
| } |
| } else { |
| readMoreEntries(); |
| } |
| } |
| |
| private CompletableFuture<SchemaInfo> getSchemaInfo(MessageImpl msg) throws ExecutionException { |
| if (msg.getSchemaVersion() == null || msg.getSchemaVersion().length == 0) { |
| return CompletableFuture.completedFuture(null); |
| } |
| return client.getSchemaProviderLoadingCache().get(topicName) |
| .getSchemaByVersion(msg.getSchemaVersion()); |
| } |
| |
| public void updateCursorState() { |
| if (this.cursor != null) { |
| if (producer != null && producer.isConnected()) { |
| this.cursor.setActive(); |
| } else { |
| this.cursor.setInactive(); |
| } |
| } |
| } |
| |
| private static final class ProducerSendCallback implements SendCallback { |
| private PersistentReplicator replicator; |
| private Entry entry; |
| private MessageImpl msg; |
| |
| @Override |
| public void sendComplete(Exception exception) { |
| if (exception != null && !(exception instanceof PulsarClientException.InvalidMessageException)) { |
| log.error("[{}][{} -> {}] Error producing on remote broker", replicator.topicName, |
| replicator.localCluster, replicator.remoteCluster, exception); |
| // cursor should be rewinded since it was incremented when readMoreEntries |
| replicator.cursor.rewind(); |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] Message persisted on remote broker", replicator.topicName, |
| replicator.localCluster, replicator.remoteCluster); |
| } |
| replicator.cursor.asyncDelete(entry.getPosition(), replicator, entry.getPosition()); |
| } |
| entry.release(); |
| |
| int pending = PENDING_MESSAGES_UPDATER.decrementAndGet(replicator); |
| |
| // In general, we schedule a new batch read operation when the occupied queue size gets smaller than half |
| // the max size, unless another read operation is already in progress. |
| // If the producer is not currently writable (disconnected or TCP window full), we want to defer the reads |
| // until we have emptied the whole queue, and at that point we will read a batch of 1 single message if the |
| // producer is still not "writable". |
| if (pending < replicator.producerQueueThreshold // |
| && HAVE_PENDING_READ_UPDATER.get(replicator) == FALSE // |
| ) { |
| if (pending == 0 || replicator.producer.isWritable()) { |
| replicator.readMoreEntries(); |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] Not resuming reads. pending: {} is-writable: {}", |
| replicator.topicName, replicator.localCluster, replicator.remoteCluster, pending, |
| replicator.producer.isWritable()); |
| } |
| } |
| } |
| |
| recycle(); |
| } |
| |
| private final Handle<ProducerSendCallback> recyclerHandle; |
| |
| private ProducerSendCallback(Handle<ProducerSendCallback> recyclerHandle) { |
| this.recyclerHandle = recyclerHandle; |
| } |
| |
| static ProducerSendCallback create(PersistentReplicator replicator, Entry entry, MessageImpl msg) { |
| ProducerSendCallback sendCallback = RECYCLER.get(); |
| sendCallback.replicator = replicator; |
| sendCallback.entry = entry; |
| sendCallback.msg = msg; |
| return sendCallback; |
| } |
| |
| private void recycle() { |
| replicator = null; |
| entry = null; //already released and recycled on sendComplete |
| if (msg != null) { |
| msg.recycle(); |
| msg = null; |
| } |
| recyclerHandle.recycle(this); |
| } |
| |
| private static final Recycler<ProducerSendCallback> RECYCLER = new Recycler<ProducerSendCallback>() { |
| @Override |
| protected ProducerSendCallback newObject(Handle<ProducerSendCallback> handle) { |
| return new ProducerSendCallback(handle); |
| } |
| }; |
| |
| @Override |
| public void addCallback(MessageImpl<?> msg, SendCallback scb) { |
| // noop |
| } |
| |
| @Override |
| public SendCallback getNextSendCallback() { |
| return null; |
| } |
| |
| @Override |
| public MessageImpl<?> getNextMessage() { |
| return null; |
| } |
| |
| @Override |
| public CompletableFuture<MessageId> getFuture() { |
| return null; |
| } |
| } |
| |
| @Override |
| public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { |
| if (STATE_UPDATER.get(this) != State.Started) { |
| log.info("[{}][{} -> {}] Replicator was stopped while reading entries." |
| + " Stop reading. Replicator state: {}", |
| topic, localCluster, remoteCluster, STATE_UPDATER.get(this)); |
| return; |
| } |
| |
| // Reduce read batch size to avoid flooding bookies with retries |
| readBatchSize = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMinReadBatchSize(); |
| |
| long waitTimeMillis = readFailureBackoff.next(); |
| |
| if (exception instanceof CursorAlreadyClosedException) { |
| log.error("[{}][{} -> {}] Error reading entries because replicator is" |
| + " already deleted and cursor is already closed {}, ({})", |
| topic, localCluster, |
| remoteCluster, ctx, exception.getMessage(), exception); |
| // replicator is already deleted and cursor is already closed so, producer should also be stopped |
| closeProducerAsync(); |
| return; |
| } else if (!(exception instanceof TooManyRequestsException)) { |
| log.error("[{}][{} -> {}] Error reading entries at {}. Retrying to read in {}s. ({})", |
| topic, localCluster, |
| remoteCluster, ctx, waitTimeMillis / 1000.0, exception.getMessage(), exception); |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] Throttled by bookies while reading at {}. Retrying to read in {}s. ({})", |
| topicName, localCluster, remoteCluster, ctx, waitTimeMillis / 1000.0, exception.getMessage(), |
| exception); |
| } |
| } |
| |
| HAVE_PENDING_READ_UPDATER.set(this, FALSE); |
| brokerService.executor().schedule(this::readMoreEntries, waitTimeMillis, TimeUnit.MILLISECONDS); |
| } |
| |
| public CompletableFuture<Void> clearBacklog() { |
| CompletableFuture<Void> future = new CompletableFuture<>(); |
| |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] Backlog size before clearing: {}", topicName, localCluster, remoteCluster, |
| cursor.getNumberOfEntriesInBacklog(false)); |
| } |
| |
| cursor.asyncClearBacklog(new ClearBacklogCallback() { |
| @Override |
| public void clearBacklogComplete(Object ctx) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] Backlog size after clearing: {}", topicName, localCluster, remoteCluster, |
| cursor.getNumberOfEntriesInBacklog(false)); |
| } |
| future.complete(null); |
| } |
| |
| @Override |
| public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) { |
| log.error("[{}][{} -> {}] Failed to clear backlog", topicName, localCluster, remoteCluster, exception); |
| future.completeExceptionally(exception); |
| } |
| }, null); |
| |
| return future; |
| } |
| |
| public CompletableFuture<Void> skipMessages(int numMessagesToSkip) { |
| CompletableFuture<Void> future = new CompletableFuture<>(); |
| |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] Skipping {} messages, current backlog {}", topicName, localCluster, remoteCluster, |
| numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false)); |
| } |
| cursor.asyncSkipEntries(numMessagesToSkip, IndividualDeletedEntries.Exclude, |
| new AsyncCallbacks.SkipEntriesCallback() { |
| @Override |
| public void skipEntriesComplete(Object ctx) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] Skipped {} messages, new backlog {}", topicName, localCluster, |
| remoteCluster, numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false)); |
| } |
| future.complete(null); |
| } |
| |
| @Override |
| public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) { |
| log.error("[{}][{} -> {}] Failed to skip {} messages", topicName, localCluster, remoteCluster, |
| numMessagesToSkip, exception); |
| future.completeExceptionally(exception); |
| } |
| }, null); |
| |
| return future; |
| } |
| |
| public CompletableFuture<Entry> peekNthMessage(int messagePosition) { |
| CompletableFuture<Entry> future = new CompletableFuture<>(); |
| |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] Getting message at position {}", topicName, localCluster, remoteCluster, |
| messagePosition); |
| } |
| |
| cursor.asyncGetNthEntry(messagePosition, IndividualDeletedEntries.Exclude, new ReadEntryCallback() { |
| |
| @Override |
| public void readEntryFailed(ManagedLedgerException exception, Object ctx) { |
| future.completeExceptionally(exception); |
| } |
| |
| @Override |
| public void readEntryComplete(Entry entry, Object ctx) { |
| future.complete(entry); |
| } |
| }, null); |
| |
| return future; |
| } |
| |
| @Override |
| public void deleteComplete(Object ctx) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}][{} -> {}] Deleted message at {}", topicName, localCluster, remoteCluster, ctx); |
| } |
| } |
| |
| @Override |
| public void deleteFailed(ManagedLedgerException exception, Object ctx) { |
| log.error("[{}][{} -> {}] Failed to delete message at {}: {}", topicName, localCluster, remoteCluster, ctx, |
| exception.getMessage(), exception); |
| if (ctx instanceof PositionImpl) { |
| PositionImpl deletedEntry = (PositionImpl) ctx; |
| if (deletedEntry.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) { |
| brokerService.getPulsar().getExecutor().schedule( |
| () -> cursor.asyncDelete(deletedEntry, (PersistentReplicator) this, deletedEntry), 10, |
| TimeUnit.SECONDS); |
| } |
| } |
| } |
| |
| public void updateRates() { |
| msgOut.calculateRate(); |
| msgExpired.calculateRate(); |
| stats.msgRateOut = msgOut.getRate(); |
| stats.msgThroughputOut = msgOut.getValueRate(); |
| stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate(); |
| } |
| |
| public ReplicatorStatsImpl getStats() { |
| stats.replicationBacklog = cursor != null ? cursor.getNumberOfEntriesInBacklog(false) : 0; |
| stats.connected = producer != null && producer.isConnected(); |
| stats.replicationDelayInSeconds = getReplicationDelayInSeconds(); |
| |
| ProducerImpl producer = this.producer; |
| if (producer != null) { |
| stats.outboundConnection = producer.getConnectionId(); |
| stats.outboundConnectedSince = producer.getConnectedSince(); |
| } else { |
| stats.outboundConnection = null; |
| stats.outboundConnectedSince = null; |
| } |
| |
| return stats; |
| } |
| |
| public void updateMessageTTL(int messageTTLInSeconds) { |
| this.messageTTLInSeconds = messageTTLInSeconds; |
| } |
| |
| private long getReplicationDelayInSeconds() { |
| if (producer != null) { |
| return TimeUnit.MILLISECONDS.toSeconds(producer.getDelayInMillis()); |
| } |
| return 0L; |
| } |
| |
| public boolean expireMessages(int messageTTLInSeconds) { |
| if ((cursor.getNumberOfEntriesInBacklog(false) == 0) |
| || (cursor.getNumberOfEntriesInBacklog(false) < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK |
| && !topic.isOldestMessageExpired(cursor, messageTTLInSeconds))) { |
| // don't do anything for almost caught-up connected subscriptions |
| return false; |
| } |
| |
| return expiryMonitor.expireMessages(messageTTLInSeconds); |
| } |
| |
| public boolean expireMessages(Position position) { |
| return expiryMonitor.expireMessages(position); |
| } |
| |
| @Override |
| public Optional<DispatchRateLimiter> getRateLimiter() { |
| return dispatchRateLimiter; |
| } |
| |
| @Override |
| public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) { |
| if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter |
| .isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), Type.REPLICATOR)) { |
| this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.REPLICATOR)); |
| } |
| } |
| |
| private void checkReplicatedSubscriptionMarker(Position position, MessageImpl<?> msg, ByteBuf payload) { |
| if (!msg.getMessageBuilder().hasMarkerType()) { |
| // No marker is defined |
| return; |
| } |
| |
| int markerType = msg.getMessageBuilder().getMarkerType(); |
| |
| if (!(msg.getMessageBuilder().hasReplicatedFrom() |
| && remoteCluster.equals(msg.getMessageBuilder().getReplicatedFrom()))) { |
| // Only consider markers that are coming from the same cluster that this |
| // replicator instance is assigned to. |
| // All the replicators will see all the markers, but we need to only process |
| // it once. |
| return; |
| } |
| |
| switch (markerType) { |
| case MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE: |
| case MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE_VALUE: |
| case MarkerType.REPLICATED_SUBSCRIPTION_UPDATE_VALUE: |
| topic.receivedReplicatedSubscriptionMarker(position, markerType, payload); |
| break; |
| |
| default: |
| // Do nothing |
| } |
| } |
| |
| @Override |
| public CompletableFuture<Void> disconnect() { |
| return disconnect(false); |
| } |
| |
| @Override |
| public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog) { |
| final CompletableFuture<Void> future = new CompletableFuture<>(); |
| |
| super.disconnect(failIfHasBacklog).thenRun(() -> { |
| dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); |
| future.complete(null); |
| }).exceptionally(ex -> { |
| Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex); |
| if (!(t instanceof TopicBusyException)) { |
| log.error("[{}][{} -> {}] Failed to close dispatch rate limiter: {}", topicName, localCluster, |
| remoteCluster, ex.getMessage()); |
| } |
| future.completeExceptionally(t); |
| return null; |
| }); |
| |
| return future; |
| } |
| |
| @Override |
| public boolean isConnected() { |
| ProducerImpl<?> producer = this.producer; |
| return producer != null && producer.isConnected(); |
| } |
| |
| private static final Logger log = LoggerFactory.getLogger(PersistentReplicator.class); |
| |
| @VisibleForTesting |
| public ManagedCursor getCursor() { |
| return cursor; |
| } |
| } |