blob: 944b4e8e34491f900e2e41e290254e4e2f3f65b9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Request;
import org.apache.pulsar.functions.utils.FunctionCommon;
/**
* FunctionMetaDataManager maintains a global state of all function metadata.
* It is the system of record for the worker for function metadata.
* FunctionMetaDataManager operates in either the leader mode or worker mode.
* By default, when you initialize and start manager, it starts in the worker mode.
* In the worker mode, the FunctionMetaDataTailer tails the function metadata topic
* and updates the in-memory metadata cache.
* When the worker becomes a leader, it calls the acquireLeadaership thru which
* the FunctionMetaData Manager switches to a leader mode. In the leader mode
* the manager first captures an exclusive producer on the the metadata topic.
* Then it drains the MetaDataTailer to ensure that it has caught up to the last record.
* After this point, the worker can update the in-memory state of function metadata
* by calling processUpdate/processDeregister methods.
* If a worker loses its leadership, it calls giveupLeaderShip at which time the
* manager closes its exclusive producer and starts its tailer again.
*/
@Slf4j
public class FunctionMetaDataManager implements AutoCloseable {
// Represents the global state
// tenant -> namespace -> (function name, FunctionRuntimeInfo)
final Map<String, Map<String, Map<String, FunctionMetaData>>> functionMetaDataMap = new ConcurrentHashMap<>();
private final SchedulerManager schedulerManager;
private final WorkerConfig workerConfig;
private final PulsarClient pulsarClient;
private final ErrorNotifier errorNotifier;
private FunctionMetaDataTopicTailer functionMetaDataTopicTailer;
// The producer of the metadata topic when we are the leader.
// Note that this variable serves a double duty. A non-null value
// implies we are the leader, while a null value means we are not the leader
private Producer exclusiveLeaderProducer;
@Getter
private volatile MessageId lastMessageSeen = MessageId.earliest;
private static final String versionTag = "version";
@Getter
private CompletableFuture<Void> isInitialized = new CompletableFuture<>();
public FunctionMetaDataManager(WorkerConfig workerConfig,
SchedulerManager schedulerManager,
PulsarClient pulsarClient,
ErrorNotifier errorNotifier) throws PulsarClientException {
this.workerConfig = workerConfig;
this.pulsarClient = pulsarClient;
this.schedulerManager = schedulerManager;
this.errorNotifier = errorNotifier;
exclusiveLeaderProducer = null;
}
/**
* Public methods. Please use these methods if references FunctionMetaManager from an external class
*/
/**
* Initializes the FunctionMetaDataManager.
* We create a new reader
*/
public synchronized void initialize() {
try (Reader reader = FunctionMetaDataTopicTailer.createReader(
workerConfig, pulsarClient.newReader(), MessageId.earliest)) {
// read all existing messages
while (reader.hasMessageAvailable()) {
processMetaDataTopicMessage(reader.readNext());
}
this.isInitialized.complete(null);
} catch (Exception e) {
log.error("Failed to initialize meta data store", e);
throw new RuntimeException("Failed to initialize Metadata Manager", e);
}
log.info("FunctionMetaData Manager initialization complete");
}
// Starts the tailer if we are in non-leader mode
public synchronized void start() {
if (exclusiveLeaderProducer == null) {
try {
// This means that we are in non-leader mode. start function metadata tailer
initializeTailer();
} catch (PulsarClientException e) {
throw new RuntimeException("Could not start MetaData topic tailer", e);
}
}
}
@Override
public void close() throws Exception {
if (this.functionMetaDataTopicTailer != null) {
this.functionMetaDataTopicTailer.close();
}
if (this.exclusiveLeaderProducer != null) {
this.exclusiveLeaderProducer.close();
}
}
/**
* Get the function metadata for a function.
* @param tenant the tenant the function belongs to
* @param namespace the namespace the function belongs to
* @param functionName the function name
* @return FunctionMetaData that contains the function metadata
*/
public synchronized FunctionMetaData getFunctionMetaData(String tenant, String namespace, String functionName) {
return this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
}
/**
* Get a list of all the meta for every function.
* @return list of function metadata
*/
public synchronized List<FunctionMetaData> getAllFunctionMetaData() {
List<FunctionMetaData> ret = new LinkedList<>();
for (Map<String, Map<String, FunctionMetaData>> i : this.functionMetaDataMap.values()) {
for (Map<String, FunctionMetaData> j : i.values()) {
ret.addAll(j.values());
}
}
return ret;
}
/**
* List all the functions in a namespace.
* @param tenant the tenant the namespace belongs to
* @param namespace the namespace
* @return a list of function names
*/
public synchronized Collection<FunctionMetaData> listFunctions(String tenant, String namespace) {
List<FunctionMetaData> ret = new LinkedList<>();
if (!this.functionMetaDataMap.containsKey(tenant)) {
return ret;
}
if (!this.functionMetaDataMap.get(tenant).containsKey(namespace)) {
return ret;
}
for (FunctionMetaData functionMetaData : this.functionMetaDataMap.get(tenant).get(namespace).values()) {
ret.add(functionMetaData);
}
return ret;
}
/**
* Check if the function exists.
* @param tenant tenant that the function belongs to
* @param namespace namespace that the function belongs to
* @param functionName name of function
* @return true if function exists and false if it does not
*/
public synchronized boolean containsFunction(String tenant, String namespace, String functionName) {
return containsFunctionMetaData(tenant, namespace, functionName);
}
/**
* Called by the worker when we are in the leader mode. In this state, we update our in-memory
* data structures and then write to the metadata topic.
* @param functionMetaData The function metadata in question
* @param delete Is this a delete operation
* @throws IllegalStateException if we are not the leader
* @throws IllegalArgumentException if the request is out of date.
*/
public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
throws IllegalStateException, IllegalArgumentException {
boolean needsScheduling;
if (exclusiveLeaderProducer == null) {
throw new IllegalStateException("Not the leader");
}
// Check first to avoid local cache update failure
checkRequestOutDated(functionMetaData, delete);
byte[] toWrite;
if (workerConfig.getUseCompactedMetadataTopic()) {
if (delete) {
toWrite = "".getBytes();
} else {
toWrite = functionMetaData.toByteArray();
}
} else {
Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
.setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE
: Request.ServiceRequest.ServiceRequestType.UPDATE)
.setFunctionMetaData(functionMetaData)
.setWorkerId(workerConfig.getWorkerId())
.setRequestId(UUID.randomUUID().toString())
.build();
toWrite = serviceRequest.toByteArray();
}
try {
TypedMessageBuilder builder = exclusiveLeaderProducer.newMessage()
.value(toWrite)
.property(versionTag, Long.toString(functionMetaData.getVersion()));
if (workerConfig.getUseCompactedMetadataTopic()) {
builder = builder.key(FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails()));
}
lastMessageSeen = builder.send();
if (delete) {
needsScheduling = processDeregister(functionMetaData);
} else {
needsScheduling = processUpdate(functionMetaData);
}
} catch (Exception e) {
log.error("Could not write into Function Metadata topic", e);
throw new IllegalStateException("Internal Error updating function at the leader", e);
}
if (needsScheduling) {
this.schedulerManager.schedule();
}
}
private void checkRequestOutDated(FunctionMetaData functionMetaData, boolean delete) {
Function.FunctionDetails details = functionMetaData.getFunctionDetails();
if (isRequestOutdated(details.getTenant(), details.getNamespace(),
details.getName(), functionMetaData.getVersion())) {
if (log.isDebugEnabled()) {
log.debug("{}/{}/{} Ignoring outdated request version: {}", details.getTenant(), details.getNamespace(),
details.getName(), functionMetaData.getVersion());
}
if (delete) {
throw new IllegalArgumentException(
"Delete request ignored because it is out of date. Please try again.");
}
throw new IllegalArgumentException("Update request ignored because it is out of date. Please try again.");
}
}
/**
* Acquires a exclusive producer. This method cannot return null. It can only return a valid exclusive producer
* or throw NotLeaderAnymore exception.
* @param isLeader if the worker is still the leader
* @return A valid exclusive producer
* @throws WorkerUtils.NotLeaderAnymore if the worker is no longer the leader.
*/
public Producer<byte[]> acquireExclusiveWrite(Supplier<Boolean> isLeader) throws WorkerUtils.NotLeaderAnymore {
// creates exclusive producer for metadata topic
return WorkerUtils.createExclusiveProducerWithRetry(
pulsarClient,
workerConfig.getFunctionMetadataTopic(),
workerConfig.getWorkerId() + "-leader",
isLeader, 1000);
}
/**
* Called by the leader service when this worker becomes the leader.
* We first get exclusive producer on the metadata topic. Next we drain the tailer
* to ensure that we have caught up to metadata topic. After which we close the tailer.
* Note that this method cannot be syncrhonized because the tailer might still be processing messages
*/
public void acquireLeadership(Producer<byte[]> exclusiveProducer) {
log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
if (exclusiveLeaderProducer != null) {
log.error("FunctionMetaData Manager entered invalid state");
errorNotifier.triggerError(new IllegalStateException());
}
this.exclusiveLeaderProducer = exclusiveProducer;
FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
this.functionMetaDataTopicTailer = null;
// Now that we have created the exclusive producer, wait for reader to get over
if (tailer != null) {
try {
tailer.stopWhenNoMoreMessages().get();
} catch (Exception e) {
log.error("Error while waiting for metadata tailer thread to finish", e);
errorNotifier.triggerError(e);
}
tailer.close();
}
log.info("FunctionMetaDataManager done becoming leader");
}
/**
* called by the leader service when we lose leadership. We close the exclusive producer
* and start the tailer.
*/
public synchronized void giveupLeadership() {
log.info("FunctionMetaDataManager giving up leadership by closing exclusive producer");
try {
exclusiveLeaderProducer.close();
exclusiveLeaderProducer = null;
initializeTailer();
} catch (PulsarClientException e) {
log.error("Error closing exclusive producer", e);
errorNotifier.triggerError(e);
}
}
/**
* This is called by the MetaData tailer. It updates the in-memory cache.
* It eats up any exception thrown by processUpdate/processDeregister since
* that's just part of the state machine
* @param message The message read from metadata topic that needs to be processed
*/
public void processMetaDataTopicMessage(Message<byte[]> message) throws IOException {
try {
if (workerConfig.getUseCompactedMetadataTopic()) {
processCompactedMetaDataTopicMessage(message);
} else {
processUncompactedMetaDataTopicMessage(message);
}
} catch (IllegalArgumentException e) {
// Its ok. Nothing much we can do about it
}
lastMessageSeen = message.getMessageId();
}
private void processUncompactedMetaDataTopicMessage(Message<byte[]> message) throws IOException {
Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
if (log.isDebugEnabled()) {
log.debug("Received Service Request: {}", serviceRequest);
}
switch (serviceRequest.getServiceRequestType()) {
case UPDATE:
this.processUpdate(serviceRequest.getFunctionMetaData());
break;
case DELETE:
this.processDeregister(serviceRequest.getFunctionMetaData());
break;
default:
log.warn("Received request with unrecognized type: {}", serviceRequest);
}
}
private void processCompactedMetaDataTopicMessage(Message<byte[]> message) throws IOException {
long version = Long.parseLong(message.getProperty(versionTag));
String tenant = FunctionCommon.extractTenantFromFullyQualifiedName(message.getKey());
String namespace = FunctionCommon.extractNamespaceFromFullyQualifiedName(message.getKey());
String functionName = FunctionCommon.extractNameFromFullyQualifiedName(message.getKey());
if (message.getData() == null || message.getData().length == 0) {
// this is a delete message
this.processDeregister(tenant, namespace, functionName, version);
} else {
FunctionMetaData functionMetaData = FunctionMetaData.parseFrom(message.getData());
this.processUpdate(functionMetaData);
}
}
/**
* Private methods for internal use. Should not be used outside of this class
*/
private boolean containsFunctionMetaData(FunctionMetaData functionMetaData) {
return containsFunctionMetaData(functionMetaData.getFunctionDetails());
}
private boolean containsFunctionMetaData(Function.FunctionDetails functionDetails) {
return containsFunctionMetaData(
functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName());
}
private boolean containsFunctionMetaData(String tenant, String namespace, String functionName) {
if (this.functionMetaDataMap.containsKey(tenant)) {
if (this.functionMetaDataMap.get(tenant).containsKey(namespace)) {
if (this.functionMetaDataMap.get(tenant).get(namespace).containsKey(functionName)) {
return true;
}
}
}
return false;
}
synchronized boolean processDeregister(FunctionMetaData deregisterRequestFs) throws IllegalArgumentException {
String functionName = deregisterRequestFs.getFunctionDetails().getName();
String tenant = deregisterRequestFs.getFunctionDetails().getTenant();
String namespace = deregisterRequestFs.getFunctionDetails().getNamespace();
return processDeregister(tenant, namespace, functionName, deregisterRequestFs.getVersion());
}
synchronized boolean processDeregister(String tenant, String namespace,
String functionName, long version) throws IllegalArgumentException {
boolean needsScheduling = false;
if (log.isDebugEnabled()) {
log.debug("Process deregister request: {}/{}/{}/{}", tenant, namespace, functionName, version);
}
// Check if we still have this function. Maybe already deleted by someone else
if (this.containsFunctionMetaData(tenant, namespace, functionName)) {
// check if request is outdated
if (!isRequestOutdated(tenant, namespace, functionName, version)) {
this.functionMetaDataMap.get(tenant).get(namespace).remove(functionName);
needsScheduling = true;
} else {
if (log.isDebugEnabled()) {
log.debug("{}/{}/{} Ignoring outdated request version: {}", tenant, namespace, functionName,
version);
}
throw new IllegalArgumentException("Delete request ignored because "
+ "it is out of date. Please try again.");
}
}
return needsScheduling;
}
synchronized boolean processUpdate(FunctionMetaData updateRequestFs) throws IllegalArgumentException {
log.debug("Process update request: {}", updateRequestFs);
boolean needsScheduling = false;
// Worker doesn't know about the function so far
if (!this.containsFunctionMetaData(updateRequestFs)) {
// Since this is the first time worker has seen function, just put it into internal function metadata store
setFunctionMetaData(updateRequestFs);
needsScheduling = true;
} else {
// The request is an update to an existing function since this worker already has a record of this function
// in its function metadata store
// Check if request is outdated
if (!isRequestOutdated(updateRequestFs)) {
// update the function metadata
setFunctionMetaData(updateRequestFs);
needsScheduling = true;
} else {
throw new IllegalArgumentException("Update request "
+ "ignored because it is out of date. Please try again.");
}
}
return needsScheduling;
}
private boolean isRequestOutdated(FunctionMetaData requestFunctionMetaData) {
Function.FunctionDetails functionDetails = requestFunctionMetaData.getFunctionDetails();
return isRequestOutdated(functionDetails.getTenant(), functionDetails.getNamespace(),
functionDetails.getName(), requestFunctionMetaData.getVersion());
}
private boolean isRequestOutdated(String tenant, String namespace, String functionName, long version) {
// avoid NPE
if (!containsFunctionMetaData(tenant, namespace, functionName)) {
return false;
}
FunctionMetaData currentFunctionMetaData = this.functionMetaDataMap.get(tenant)
.get(namespace).get(functionName);
return currentFunctionMetaData.getVersion() >= version;
}
void setFunctionMetaData(FunctionMetaData functionMetaData) {
Function.FunctionDetails functionDetails = functionMetaData.getFunctionDetails();
if (!this.functionMetaDataMap.containsKey(functionDetails.getTenant())) {
this.functionMetaDataMap.put(functionDetails.getTenant(), new ConcurrentHashMap<>());
}
if (!this.functionMetaDataMap.get(functionDetails.getTenant()).containsKey(functionDetails.getNamespace())) {
this.functionMetaDataMap.get(functionDetails.getTenant())
.put(functionDetails.getNamespace(), new ConcurrentHashMap<>());
}
this.functionMetaDataMap.get(functionDetails.getTenant())
.get(functionDetails.getNamespace()).put(functionDetails.getName(), functionMetaData);
}
private void initializeTailer() throws PulsarClientException {
this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this,
pulsarClient.newReader(), this.workerConfig, lastMessageSeen, this.errorNotifier);
this.functionMetaDataTopicTailer.start();
log.info("MetaData Manager Tailer started");
}
}