blob: ce1fe3443f5f39e4542bf3ec7c5f6856875a8729 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.nonpersistent;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl;
import org.apache.pulsar.common.stats.Rate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NonPersistentReplicator extends AbstractReplicator implements Replicator {
private final Rate msgOut = new Rate();
private final Rate msgDrop = new Rate();
private final NonPersistentReplicatorStatsImpl stats = new NonPersistentReplicatorStatsImpl();
public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster,
BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException {
super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService,
replicationClient);
producerBuilder.blockIfQueueFull(false);
startProducer();
}
@Override
protected void readEntries(Producer<byte[]> producer) {
this.producer = (ProducerImpl) producer;
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) {
log.info("[{}][{} -> {}] Created replicator producer", topicName, localCluster, remoteCluster);
backOff.reset();
} else {
log.info(
"[{}][{} -> {}] Replicator was stopped while creating the producer."
+ " Closing it. Replicator state: {}",
topicName, localCluster, remoteCluster, STATE_UPDATER.get(this));
STATE_UPDATER.set(this, State.Stopping);
closeProducerAsync();
return;
}
}
public void sendMessage(Entry entry) {
if ((STATE_UPDATER.get(this) == State.Started) && isWritable()) {
int length = entry.getLength();
ByteBuf headersAndPayload = entry.getDataBuffer();
MessageImpl msg;
try {
msg = MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload);
} catch (Throwable t) {
log.error("[{}][{} -> {}] Failed to deserialize message at {} (buffer size: {}): {}", topicName,
localCluster, remoteCluster, entry.getPosition(), length, t.getMessage(), t);
entry.release();
return;
}
if (msg.isReplicated()) {
// Discard messages that were already replicated into this region
entry.release();
msg.recycle();
return;
}
if (msg.hasReplicateTo() && !msg.getReplicateTo().contains(remoteCluster)) {
if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] Skipping message at {} / msg-id: {}: replicateTo {}", topicName,
localCluster, remoteCluster, entry.getPosition(), msg.getMessageId(), msg.getReplicateTo());
}
entry.release();
msg.recycle();
return;
}
msgOut.recordEvent(headersAndPayload.readableBytes());
msg.setReplicatedFrom(localCluster);
headersAndPayload.retain();
producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] dropping message because replicator producer is not started/writable",
topicName, localCluster, remoteCluster);
}
msgDrop.recordEvent();
entry.release();
}
}
@Override
public void updateRates() {
msgOut.calculateRate();
msgDrop.calculateRate();
stats.msgRateOut = msgOut.getRate();
stats.msgThroughputOut = msgOut.getValueRate();
stats.msgDropRate = msgDrop.getRate();
}
@Override
public NonPersistentReplicatorStatsImpl getStats() {
stats.connected = producer != null && producer.isConnected();
stats.replicationDelayInSeconds = getReplicationDelayInSeconds();
ProducerImpl producer = this.producer;
if (producer != null) {
stats.outboundConnection = producer.getConnectionId();
stats.outboundConnectedSince = producer.getConnectedSince();
} else {
stats.outboundConnection = null;
stats.outboundConnectedSince = null;
}
return stats;
}
private long getReplicationDelayInSeconds() {
if (producer != null) {
return TimeUnit.MILLISECONDS.toSeconds(producer.getDelayInMillis());
}
return 0L;
}
private static final class ProducerSendCallback implements SendCallback {
private NonPersistentReplicator replicator;
private Entry entry;
private MessageImpl msg;
@Override
public void sendComplete(Exception exception) {
if (exception != null) {
log.error("[{}][{} -> {}] Error producing on remote broker", replicator.topicName,
replicator.localCluster, replicator.remoteCluster, exception);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] Message persisted on remote broker", replicator.topicName,
replicator.localCluster, replicator.remoteCluster);
}
}
entry.release();
recycle();
}
private final Handle<ProducerSendCallback> recyclerHandle;
private ProducerSendCallback(Handle<ProducerSendCallback> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
static ProducerSendCallback create(NonPersistentReplicator replicator, Entry entry, MessageImpl msg) {
ProducerSendCallback sendCallback = RECYCLER.get();
sendCallback.replicator = replicator;
sendCallback.entry = entry;
sendCallback.msg = msg;
return sendCallback;
}
private void recycle() {
replicator = null;
entry = null; // already released and recycled on sendComplete
if (msg != null) {
msg.recycle();
msg = null;
}
recyclerHandle.recycle(this);
}
private static final Recycler<ProducerSendCallback> RECYCLER = new Recycler<ProducerSendCallback>() {
@Override
protected ProducerSendCallback newObject(Handle<ProducerSendCallback> handle) {
return new ProducerSendCallback(handle);
}
};
@Override
public void addCallback(MessageImpl<?> msg, SendCallback scb) {
// noop
}
@Override
public SendCallback getNextSendCallback() {
return null;
}
@Override
public MessageImpl<?> getNextMessage() {
return null;
}
@Override
public CompletableFuture<MessageId> getFuture() {
return null;
}
}
private static final Logger log = LoggerFactory.getLogger(PersistentReplicator.class);
@Override
protected Position getReplicatorReadPosition() {
// No-op
return null;
}
@Override
protected long getNumberOfEntriesInBacklog() {
// No-op
return 0;
}
@Override
protected void disableReplicatorRead() {
// No-op
}
@Override
public boolean isConnected() {
ProducerImpl<?> producer = this.producer;
return producer != null && producer.isConnected();
}
}