blob: 80743e44ab7d20f31ffa18b74b9e31c6dc9f9b0c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataEvent;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PulsarMetadataEventSynchronizer implements MetadataEventSynchronizer {
private static final Logger log = LoggerFactory.getLogger(PulsarMetadataEventSynchronizer.class);
protected PulsarService pulsar;
protected BrokerService brokerService;
protected String topicName;
protected PulsarClientImpl client;
protected volatile Producer<MetadataEvent> producer;
protected volatile Consumer<MetadataEvent> consumer;
private final CopyOnWriteArrayList<Function<MetadataEvent, CompletableFuture<Void>>>
listeners = new CopyOnWriteArrayList<>();
private volatile boolean started = false;
public static final String SUBSCRIPTION_NAME = "metadata-syncer";
private static final int MAX_PRODUCER_PENDING_SIZE = 1000;
protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0,
TimeUnit.MILLISECONDS);
public PulsarMetadataEventSynchronizer(PulsarService pulsar, String topicName) throws PulsarServerException {
this.pulsar = pulsar;
this.brokerService = pulsar.getBrokerService();
this.topicName = topicName;
if (!StringUtils.isNotBlank(topicName)) {
log.info("Metadata synchronizer is disabled");
return;
}
}
public void start() throws PulsarServerException {
if (StringUtils.isBlank(topicName)) {
log.info("metadata topic doesn't exist.. skipping metadata synchronizer init..");
return;
}
this.client = (PulsarClientImpl) pulsar.getClient();
startProducer();
startConsumer();
log.info("Metadata event synchronizer started on topic {}", topicName);
}
@Override
public CompletableFuture<Void> notify(MetadataEvent event) {
CompletableFuture<Void> future = new CompletableFuture<>();
publishAsync(event, future);
return future;
}
@Override
public void registerSyncListener(Function<MetadataEvent, CompletableFuture<Void>> listener) {
listeners.add(listener);
}
@Override
public String getClusterName() {
return pulsar.getConfig().getClusterName();
}
private void publishAsync(MetadataEvent event, CompletableFuture<Void> future) {
if (!started) {
log.info("Producer is not started on {}, failed to publish {}", topicName, event);
future.completeExceptionally(new IllegalStateException("producer is not started yet"));
}
producer.newMessage().value(event).sendAsync().thenAccept(__ -> {
log.info("successfully published metadata change event {}", event);
future.complete(null);
}).exceptionally(ex -> {
log.warn("failed to publish metadata update {}, will retry in {}", topicName, MESSAGE_RATE_BACKOFF_MS, ex);
pulsar.getBrokerService().executor().schedule(() -> publishAsync(event, future), MESSAGE_RATE_BACKOFF_MS,
TimeUnit.MILLISECONDS);
return null;
});
}
private void startProducer() {
log.info("[{}] Starting producer", topicName);
client.newProducer(Schema.AVRO(MetadataEvent.class)).topic(topicName)
.messageRoutingMode(MessageRoutingMode.SinglePartition).enableBatching(false).enableBatching(false)
.sendTimeout(0, TimeUnit.SECONDS) //
.maxPendingMessages(MAX_PRODUCER_PENDING_SIZE).createAsync().thenAccept(prod -> {
producer = prod;
started = true;
log.info("producer is created successfully {}", topicName);
}).exceptionally(ex -> {
long waitTimeMs = backOff.next();
log.warn("[{}] Failed to create producer ({}), retrying in {} s", topicName, ex.getMessage(),
waitTimeMs / 1000.0);
// BackOff before retrying
brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
return null;
});
}
private void startConsumer() {
if (consumer != null) {
return;
}
ConsumerBuilder<MetadataEvent> consumerBuilder = client.newConsumer(Schema.AVRO(MetadataEvent.class))
.topic(topicName).subscriptionName(SUBSCRIPTION_NAME).ackTimeout(60, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Failover).messageListener((c, msg) -> {
log.info("Processing metadata event for {} with listeners {}", msg.getValue().getPath(),
listeners.size());
try {
if (listeners.size() == 0) {
c.acknowledgeAsync(msg);
return;
}
if (listeners.size() == 1) {
listeners.get(0).apply(msg.getValue()).thenApply(__ -> c.acknowledgeAsync(msg))
.exceptionally(ex -> {
log.warn("Failed to synchronize {} for {}", msg.getMessageId(), topicName,
ex.getCause());
return null;
});
} else {
FutureUtil
.waitForAll(listeners.stream().map(listener -> listener.apply(msg.getValue()))
.collect(Collectors.toList()))
.thenApply(__ -> c.acknowledgeAsync(msg)).exceptionally(ex -> {
log.warn("Failed to synchronize {} for {}", msg.getMessageId(), topicName);
return null;
});
}
} catch (Exception e) {
log.warn("Failed to synchronize {} for {}", msg.getMessageId(), topicName);
}
});
consumerBuilder.subscribeAsync().thenAccept(consumer -> {
log.info("successfully created consumer {}", topicName);
this.consumer = consumer;
}).exceptionally(ex -> {
long waitTimeMs = backOff.next();
log.warn("[{}] Failed to create consumer ({}), retrying in {} s", topicName, ex.getMessage(),
waitTimeMs / 1000.0);
// BackOff before retrying
brokerService.executor().schedule(this::startConsumer, waitTimeMs, TimeUnit.MILLISECONDS);
return null;
});
}
public boolean isStarted() {
return started;
}
@Override
public void close() {
started = false;
if (producer != null) {
producer.closeAsync();
producer = null;
}
if (consumer != null) {
consumer.closeAsync();
consumer = null;
}
}
}