// blob: 6dd296d16b53bdaca963b89e25a7c4158de22a94 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.StringInterner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Base class for replicators that publish data of a local topic to a topic on a remote cluster.
 *
 * <p>This class owns the life cycle of the remote producer: it creates the producer through
 * {@link #producerBuilder} (retrying with back-off on failure), hands it to {@link #readEntries} and closes
 * it again on {@link #disconnect()}. What is read and how it is published is left to subclasses.
 *
 * <p>The life cycle is tracked in a {@link State} that is only changed through {@link #STATE_UPDATER}.
 * {@link #startProducer()}, {@link #closeProducerAsync()} and {@link #disconnect(boolean)} are
 * {@code synchronized} on the replicator instance; the asynchronous callbacks they register are not.
 */
public abstract class AbstractReplicator {

    private static final Logger log = LoggerFactory.getLogger(AbstractReplicator.class);

    /** Delimiter placed between the local and the remote part of a name when the two differ. */
    public static final String REPL_PRODUCER_NAME_DELIMITER = "-->";

    protected final BrokerService brokerService;
    protected final String localTopicName;
    protected final String localCluster;
    protected final String remoteTopicName;
    protected final String remoteCluster;
    /** Client used to build the producer that publishes to the remote topic. */
    protected final PulsarClientImpl replicationClient;
    /** Client obtained from the local broker's {@code PulsarService}. */
    protected final PulsarClientImpl client;
    /** Human readable id ({@code "<topic part> | <cluster part>"}) used as prefix of every log line. */
    protected String replicatorId;
    protected final Topic localTopic;
    /**
     * Remote producer. This class only clears the field after a successful close;
     * NOTE(review): presumably subclasses assign it from {@link #readEntries} - confirm.
     */
    protected volatile ProducerImpl producer;
    /** Value of the broker's {@code replicationProducerQueueSize} setting; used as max pending messages. */
    protected final int producerQueueSize;
    protected final ProducerBuilder<byte[]> producerBuilder;
    /** Back-off (100 ms initial, 1 min max) shared by the producer creation and producer close retries. */
    protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0,
            TimeUnit.MILLISECONDS);
    protected final String replicatorPrefix;

    protected static final AtomicReferenceFieldUpdater<AbstractReplicator, State> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, State.class, "state");
    private volatile State state = State.Stopped;

    /**
     * Life cycle of the replicator.
     *
     * <p>Transitions performed by this class: {@code Stopped -> Starting} in {@link #startProducer()},
     * {@code Starting -> Stopped} when producer creation fails, {@code Starting/Started -> Stopping} in
     * {@link #disconnect(boolean)} and {@code * -> Stopped} once the producer is closed.
     * NOTE(review): nothing in this class moves the state to {@code Started}; presumably subclasses do.
     */
    protected enum State {
        Stopped, Starting, Started, Stopping
    }

    /**
     * Creates a stopped replicator and prepares (but does not create) the remote producer.
     *
     * <p>Note that {@link #getProducerName()} is invoked from this constructor, i.e. before the subclass
     * constructor body has run.
     *
     * @param localCluster      name of the local cluster
     * @param localTopic        local topic this replicator belongs to; its name becomes {@link #localTopicName}
     * @param remoteCluster     name of the remote cluster
     * @param remoteTopicName   topic the remote producer publishes to
     * @param replicatorPrefix  prefix stored in {@link #replicatorPrefix}
     * @param brokerService     owning broker service
     * @param replicationClient client used to build the remote producer
     * @throws PulsarServerException declared by the constructor signature; NOTE(review): presumably raised
     *                               when the broker's own client cannot be obtained - confirm
     */
    public AbstractReplicator(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName,
                              String replicatorPrefix, BrokerService brokerService,
                              PulsarClientImpl replicationClient)
            throws PulsarServerException {
        this.brokerService = brokerService;
        this.localTopic = localTopic;
        this.localTopicName = localTopic.getName();
        this.replicatorPrefix = replicatorPrefix;
        this.localCluster = StringInterner.intern(localCluster);
        this.remoteTopicName = remoteTopicName;
        this.remoteCluster = StringInterner.intern(remoteCluster);
        this.replicationClient = replicationClient;
        this.client = (PulsarClientImpl) brokerService.pulsar().getClient();
        this.producer = null;
        this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();

        // Only mention both sides when they actually differ, to keep log lines short.
        String topicPart = StringUtils.equals(localTopicName, remoteTopicName)
                ? localTopicName
                : localTopicName + REPL_PRODUCER_NAME_DELIMITER + remoteTopicName;
        String clusterPart = StringUtils.equals(localCluster, remoteCluster)
                ? localCluster
                : localCluster + REPL_PRODUCER_NAME_DELIMITER + remoteCluster;
        this.replicatorId = String.format("%s | %s", topicPart, clusterPart);

        this.producerBuilder = replicationClient.newProducer(Schema.AUTO_PRODUCE_BYTES()) //
                .topic(remoteTopicName)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .enableBatching(false)
                .sendTimeout(0, TimeUnit.SECONDS) //
                .maxPendingMessages(producerQueueSize) //
                .producerName(getProducerName());
        STATE_UPDATER.set(this, State.Stopped);
    }

    /**
     * @return the name given to the remote producer; called from the constructor
     */
    protected abstract String getProducerName();

    /**
     * Invoked by {@link #startProducer()} once the remote producer has been created successfully.
     *
     * @param producer the freshly created remote producer
     */
    protected abstract void readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer);

    /**
     * @return the current read position; this class only uses it for logging on disconnect
     */
    protected abstract Position getReplicatorReadPosition();

    /**
     * @return the number of entries in the backlog; a value greater than zero makes
     *         {@link #disconnect(boolean) disconnect(true)} fail
     */
    protected abstract long getNumberOfEntriesInBacklog();

    /**
     * Invoked after the remote producer has been closed so that the subclass stops reading.
     */
    protected abstract void disableReplicatorRead();

    /**
     * @return the (interned) name of the remote cluster
     */
    public String getRemoteCluster() {
        return remoteCluster;
    }

    /**
     * Starts the remote producer if the replicator is currently stopped.
     *
     * <ul>
     * <li>{@code Stopping}: a close is in flight, so the start is re-attempted after a back-off, provided the
     * local topic is still active.</li>
     * <li>{@code Starting} / {@code Started}: nothing to do.</li>
     * <li>{@code Stopped}: moves to {@code Starting} and creates the producer asynchronously. On success
     * {@link #readEntries} is called; on failure the state goes back to {@code Stopped} and the start is
     * retried after a back-off.</li>
     * </ul>
     */
    // This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer
    // the end result can be disconnect.
    public synchronized void startProducer() {
        if (STATE_UPDATER.get(this) == State.Stopping) {
            long waitTimeMs = backOff.next();
            if (log.isDebugEnabled()) {
                log.debug(
                        "[{}] waiting for producer to close before attempting to reconnect, retrying in {} s",
                        replicatorId, waitTimeMs / 1000.0);
            }
            // BackOff before retrying
            brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, waitTimeMs,
                    TimeUnit.MILLISECONDS);
            return;
        }
        // Snapshot taken before the CAS: if the CAS fails this is the state that made it fail.
        State state = STATE_UPDATER.get(this);
        if (!STATE_UPDATER.compareAndSet(this, State.Stopped, State.Starting)) {
            if (state == State.Started) {
                // Already running
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Replicator was already running", replicatorId);
                }
            } else {
                log.info("[{}] Replicator already being started. Replicator state: {}", replicatorId, state);
            }
            return;
        }

        log.info("[{}] Starting replicator", replicatorId);
        producerBuilder.createAsync().thenAccept(producer -> {
            readEntries(producer);
        }).exceptionally(ex -> {
            if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
                long waitTimeMs = backOff.next();
                log.warn("[{}] Failed to create remote producer ({}), retrying in {} s",
                        replicatorId, ex.getMessage(), waitTimeMs / 1000.0);
                // BackOff before retrying
                brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, waitTimeMs,
                        TimeUnit.MILLISECONDS);
            } else {
                // Someone else (e.g. a disconnect) changed the state in the meantime: do not retry.
                log.warn("[{}] Failed to create remote producer. Replicator state: {}", replicatorId,
                        STATE_UPDATER.get(this), ex);
            }
            return null;
        });
    }

    /**
     * Calls {@link #startProducer()} again, but only if {@link #isLocalTopicActive()} reports that the local
     * topic is still active. A failure of that check is logged and ends the retry chain.
     */
    protected void checkTopicActiveAndRetryStartProducer() {
        isLocalTopicActive().thenAccept(isTopicActive -> {
            if (isTopicActive) {
                startProducer();
            }
        }).exceptionally(ex -> {
            log.warn("[{}] Stop retry to create producer due to topic load fail. Replicator state: {}", replicatorId,
                    STATE_UPDATER.get(this), ex);
            return null;
        });
    }

    /**
     * Checks whether the broker still serves {@link #localTopic} under {@link #localTopicName}.
     *
     * @return a future completing with {@code true} only if the broker's topic map holds an entry for the local
     *         topic name that resolves to the very same {@link Topic} instance (identity comparison); it
     *         completes with {@code false} if there is no entry or the entry resolves to an empty optional
     */
    protected CompletableFuture<Boolean> isLocalTopicActive() {
        CompletableFuture<Optional<Topic>> topicFuture = brokerService.getTopics().get(localTopicName);
        if (topicFuture == null) {
            return CompletableFuture.completedFuture(false);
        }
        return topicFuture.thenApplyAsync(
                optional -> optional.isPresent() && optional.get() == localTopic,
                brokerService.executor());
    }

    /**
     * Closes the remote producer, if any, and moves the replicator to {@code Stopped}.
     *
     * <p>If closing fails, another attempt is scheduled after a back-off.
     *
     * @return a future that completes once the producer is closed <em>and</em> the replicator has been marked
     *         {@code Stopped}, {@link #producer} has been cleared and {@link #disableReplicatorRead()} has run,
     *         so that stages chained by the caller observe the stopped replicator. It completes exceptionally
     *         if this close attempt fails (the scheduled retry is not reflected in it).
     */
    protected synchronized CompletableFuture<Void> closeProducerAsync() {
        if (producer == null) {
            STATE_UPDATER.set(this, State.Stopped);
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> stopped = producer.closeAsync().thenRun(() -> {
            STATE_UPDATER.set(this, State.Stopped);
            this.producer = null;
            // deactivate further read
            disableReplicatorRead();
        });
        // The retry is attached as a side branch so that the failure still reaches the caller of this method.
        stopped.exceptionally(ex -> {
            long waitTimeMs = backOff.next();
            log.warn(
                    "[{}] Exception: '{}' occurred while trying to close the producer."
                            + " retrying again in {} s",
                    replicatorId, ex.getMessage(), waitTimeMs / 1000.0);
            // BackOff before retrying
            brokerService.executor().schedule(this::closeProducerAsync, waitTimeMs, TimeUnit.MILLISECONDS);
            return null;
        });
        return stopped;
    }

    /**
     * Disconnects the replicator regardless of its backlog; same as {@code disconnect(false)}.
     *
     * @return see {@link #disconnect(boolean)}
     */
    public CompletableFuture<Void> disconnect() {
        return disconnect(false);
    }

    /**
     * Stops the replicator by closing its remote producer.
     *
     * @param failIfHasBacklog if {@code true} and the backlog is not empty, nothing is closed and the returned
     *                         future fails with a {@link TopicBusyException}
     * @return an already completed future if a disconnect is in progress ({@code Stopping}); otherwise the
     *         future returned by {@link #closeProducerAsync()}
     */
    public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog) {
        if (failIfHasBacklog && getNumberOfEntriesInBacklog() > 0) {
            CompletableFuture<Void> disconnectFuture = new CompletableFuture<>();
            disconnectFuture.completeExceptionally(new TopicBusyException("Cannot close a replicator with backlog"));
            if (log.isDebugEnabled()) {
                log.debug("[{}] Replicator disconnect failed since topic has backlog", replicatorId);
            }
            return disconnectFuture;
        }

        if (STATE_UPDATER.get(this) == State.Stopping) {
            // Do nothing since the all "STATE_UPDATER.set(this, Stopping)" instructions are followed by
            // closeProducerAsync()
            // which will at some point change the state to stopped
            return CompletableFuture.completedFuture(null);
        }

        if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopping)
                || STATE_UPDATER.compareAndSet(this, State.Started, State.Stopping)) {
            log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId,
                    getReplicatorReadPosition(), getNumberOfEntriesInBacklog());
        }

        return closeProducerAsync();
    }

    /**
     * Hook for removing the replicator; does nothing in this base class.
     *
     * @return an already completed future
     */
    public CompletableFuture<Void> remove() {
        // No-op
        return CompletableFuture.completedFuture(null);
    }

    /**
     * @return {@code true} if a producer is set and it reports itself writable
     */
    protected boolean isWritable() {
        // Read the volatile field once so the null check and the call see the same instance.
        ProducerImpl producer = this.producer;
        return producer != null && producer.isWritable();
    }

    /**
     * Extracts the remote cluster name from a replicator cursor name, i.e. the text after the last {@code '.'}.
     *
     * @param remoteCursor dot separated cursor name, must not be {@code null}
     * @return the last dot separated segment (trailing empty segments are ignored), the input itself if it
     *         contains no dot, or an empty string if the input consists of dots only
     */
    public static String getRemoteCluster(String remoteCursor) {
        String[] segments = remoteCursor.split("\\.");
        // String.split drops trailing empty strings, so an input made of dots only yields an empty array.
        return segments.length == 0 ? "" : segments[segments.length - 1];
    }

    /**
     * Builds the replicator name for a cluster.
     *
     * @param replicatorPrefix prefix of the name
     * @param cluster          cluster name
     * @return the interned string {@code replicatorPrefix + "." + cluster}
     */
    public static String getReplicatorName(String replicatorPrefix, String cluster) {
        return StringInterner.intern(replicatorPrefix + "." + cluster);
    }

    /**
     * Replication can't be started on root-partitioned-topic to avoid producer startup conflict.
     *
     * <pre>
     * eg:
     * if topic : persistent://prop/cluster/ns/my-topic is a partitioned topic with 2 partitions then
     * broker explicitly creates replicator producer for: "my-topic-partition-1" and "my-topic-partition-2".
     *
     * However, if broker tries to start producer with root topic "my-topic" then client-lib internally
     * creates individual producers for "my-topic-partition-1" and "my-topic-partition-2" which creates
     * conflict with existing
     * replicator producers.
     * </pre>
     *
     * Therefore, replicator can't be started on root-partition topic which can internally create multiple partitioned
     * producers.
     *
     * @param topic         name of the topic to validate
     * @param brokerService broker service used to look up the partitioned topic metadata
     * @return a future that completes normally if {@code topic} is not a partitioned topic and fails with a
     *         {@link NamingException} if it is
     */
    public static CompletableFuture<Void> validatePartitionedTopicAsync(String topic, BrokerService brokerService) {
        TopicName topicName = TopicName.get(topic);
        return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
                .partitionedTopicExistsAsync(topicName).thenCompose(isPartitionedTopic -> {
                    if (isPartitionedTopic) {
                        String s = topicName
                                + " is a partitioned-topic and replication can't be started for partitioned-producer ";
                        log.error(s);
                        return FutureUtil.failedFuture(new NamingException(s));
                    }
                    return CompletableFuture.completedFuture(null);
                });
    }

    /**
     * @return the current life cycle state of this replicator
     */
    public State getState() {
        return state;
    }
}