blob: 8e6725e93af91bf5bb799357f7130cc05920dec2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsImpl;
import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType;
import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionInstanceId;
/**
* This class managers all aspects of functions assignments and running of function assignments for this worker.
*/
@Slf4j
public class FunctionRuntimeManager implements AutoCloseable {
// all assignments
// WorkerId -> Function Fully Qualified InstanceId -> List<Assignments>
Map<String, Map<String, Assignment>> workerIdToAssignments = new ConcurrentHashMap<>();
// All the runtime info related to functions executed by this worker
// Fully Qualified InstanceId - > FunctionRuntimeInfo
class FunctionRuntimeInfos {
private Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new ConcurrentHashMap<>();
public FunctionRuntimeInfo get(String fullyQualifiedInstanceId) {
return functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
}
public void put(String fullyQualifiedInstanceId, FunctionRuntimeInfo functionRuntimeInfo) {
if (!isInitializePhase) {
functionRuntimeInfoMap.put(fullyQualifiedInstanceId, functionRuntimeInfo);
}
}
public void remove (String fullyQualifiedInstanceId) {
if (!isInitializePhase) {
functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
}
}
public Map<String, FunctionRuntimeInfo> getAll() {
return functionRuntimeInfoMap;
}
public int size() {
return functionRuntimeInfoMap.size();
}
}
final FunctionRuntimeInfos functionRuntimeInfos = new FunctionRuntimeInfos();
@Getter
final WorkerConfig workerConfig;
@Setter
@Getter
private FunctionActioner functionActioner;
@Getter
private RuntimeFactory runtimeFactory;
private MembershipManager membershipManager;
private final PulsarAdmin functionAdmin;
@Getter
private PulsarWorkerService workerService;
boolean isInitializePhase = false;
@Getter
private final CompletableFuture<Void> isInitialized = new CompletableFuture<>();
private final FunctionMetaDataManager functionMetaDataManager;
private final WorkerStatsManager workerStatsManager;
private final ErrorNotifier errorNotifier;
public FunctionRuntimeManager(WorkerConfig workerConfig, PulsarWorkerService workerService, Namespace dlogNamespace,
MembershipManager membershipManager, ConnectorsManager connectorsManager,
FunctionsManager functionsManager,
FunctionMetaDataManager functionMetaDataManager,
WorkerStatsManager workerStatsManager, ErrorNotifier errorNotifier) throws Exception {
this.workerConfig = workerConfig;
this.workerService = workerService;
this.functionAdmin = workerService.getFunctionAdmin();
SecretsProviderConfigurator secretsProviderConfigurator;
if (!StringUtils.isEmpty(workerConfig.getSecretsProviderConfiguratorClassName())) {
secretsProviderConfigurator = (SecretsProviderConfigurator) Reflections.createInstance(workerConfig
.getSecretsProviderConfiguratorClassName(), ClassLoader.getSystemClassLoader());
} else {
secretsProviderConfigurator = new DefaultSecretsProviderConfigurator();
}
log.info("Initializing secrets provider configurator {} with configs: {}",
secretsProviderConfigurator.getClass().getName(), workerConfig.getSecretsProviderConfiguratorConfig());
secretsProviderConfigurator.init(workerConfig.getSecretsProviderConfiguratorConfig());
Optional<FunctionAuthProvider> functionAuthProvider = Optional.empty();
AuthenticationConfig authConfig = null;
if (workerConfig.isAuthenticationEnabled()) {
authConfig = AuthenticationConfig.builder()
.clientAuthenticationPlugin(workerConfig.getBrokerClientAuthenticationPlugin())
.clientAuthenticationParameters(workerConfig.getBrokerClientAuthenticationParameters())
.tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath())
.useTls(workerConfig.isUseTls())
.tlsAllowInsecureConnection(workerConfig.isTlsAllowInsecureConnection())
.tlsHostnameVerificationEnable(workerConfig.isTlsEnableHostnameVerification())
.build();
//initialize function authentication provider
if (!StringUtils.isEmpty(workerConfig.getFunctionAuthProviderClassName())) {
functionAuthProvider = Optional.of(FunctionAuthProvider
.getAuthProvider(workerConfig.getFunctionAuthProviderClassName()));
}
}
// initialize the runtime customizer
Optional<RuntimeCustomizer> runtimeCustomizer = Optional.empty();
if (!StringUtils.isEmpty(workerConfig.getRuntimeCustomizerClassName())) {
runtimeCustomizer = Optional.of(RuntimeCustomizer
.getRuntimeCustomizer(workerConfig.getRuntimeCustomizerClassName()));
runtimeCustomizer.get().initialize(Optional.ofNullable(
workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap()));
}
// initialize function runtime factory
if (!StringUtils.isEmpty(workerConfig.getFunctionRuntimeFactoryClassName())) {
this.runtimeFactory = RuntimeFactory.getFuntionRuntimeFactory(
workerConfig.getFunctionRuntimeFactoryClassName());
} else {
if (workerConfig.getThreadContainerFactory() != null) {
this.runtimeFactory = new ThreadRuntimeFactory();
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getMapper().getObjectMapper().convertValue(
workerConfig.getThreadContainerFactory(), Map.class));
} else if (workerConfig.getProcessContainerFactory() != null) {
this.runtimeFactory = new ProcessRuntimeFactory();
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getMapper().getObjectMapper().convertValue(
workerConfig.getProcessContainerFactory(), Map.class));
} else if (workerConfig.getKubernetesContainerFactory() != null) {
this.runtimeFactory = new KubernetesRuntimeFactory();
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getMapper().getObjectMapper().convertValue(
workerConfig.getKubernetesContainerFactory(), Map.class));
} else {
throw new RuntimeException("A Function Runtime Factory needs to be set");
}
}
// initialize runtime
this.runtimeFactory.initialize(workerConfig, authConfig,
secretsProviderConfigurator, connectorsManager, functionsManager,
functionAuthProvider, runtimeCustomizer);
this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory,
dlogNamespace, connectorsManager, functionsManager, workerService.getBrokerAdmin());
this.membershipManager = membershipManager;
this.functionMetaDataManager = functionMetaDataManager;
this.workerStatsManager = workerStatsManager;
this.errorNotifier = errorNotifier;
}
/**
* Initializes the FunctionRuntimeManager. Does the following:
* 1. Consume all existing assignments to establish existing/latest set of assignments
* 2. After current assignments are read, assignments belonging to this worker will be processed
*
* @return the message id of the message processed during init phase
*/
public MessageId initialize() {
try (Reader<byte[]> reader = WorkerUtils.createReader(
workerService.getClient().newReader(),
workerConfig.getWorkerId() + "-function-assignment-initialize",
workerConfig.getFunctionAssignmentTopic(),
MessageId.earliest)) {
// start init phase
this.isInitializePhase = true;
// keep track of the last message read
MessageId lastMessageRead = MessageId.earliest;
// read all existing messages
while (reader.hasMessageAvailable()) {
Message<byte[]> message = reader.readNext();
lastMessageRead = message.getMessageId();
processAssignmentMessage(message);
}
// init phase is done
this.isInitializePhase = false;
// realize existing assignments
Map<String, Assignment> assignmentMap = workerIdToAssignments.get(this.workerConfig.getWorkerId());
if (assignmentMap != null) {
for (Assignment assignment : assignmentMap.values()) {
if (needsStart(assignment)) {
startFunctionInstance(assignment);
}
}
}
// complete future to indicate initialization is complete
isInitialized.complete(null);
return lastMessageRead;
} catch (Exception e) {
log.error("Failed to initialize function runtime manager: {}", e.getMessage(), e);
throw new RuntimeException(e);
}
}
/**
* Get current assignments.
*
* @return a map of current assignments in the following format
* {workerId : {FullyQualifiedInstanceId : Assignment}}
*/
public synchronized Map<String, Map<String, Assignment>> getCurrentAssignments() {
Map<String, Map<String, Assignment>> copy = new HashMap<>();
for (Map.Entry<String, Map<String, Assignment>> entry : this.workerIdToAssignments.entrySet()) {
Map<String, Assignment> tmp = new HashMap<>();
tmp.putAll(entry.getValue());
copy.put(entry.getKey(), tmp);
}
return copy;
}
/**
* Find a assignment of a function.
* @param tenant the tenant the function belongs to
* @param namespace the namespace the function belongs to
* @param functionName the function name
* @return the assignment of the function
*/
public synchronized Assignment findFunctionAssignment(String tenant, String namespace,
String functionName, int instanceId) {
return this.findAssignment(tenant, namespace, functionName, instanceId);
}
/**
* Find all instance assignments of function.
* @param tenant
* @param namespace
* @param functionName
* @return
*/
public synchronized Collection<Assignment> findFunctionAssignments(String tenant,
String namespace, String functionName) {
return findFunctionAssignments(tenant, namespace, functionName, this.workerIdToAssignments);
}
public static Collection<Assignment> findFunctionAssignments(String tenant,
String namespace, String functionName,
Map<String, Map<String, Assignment>>
workerIdToAssignments) {
Collection<Assignment> assignments = new LinkedList<>();
for (Map<String, Assignment> entryMap : workerIdToAssignments.values()) {
assignments.addAll(entryMap.values().stream()
.filter(
assignment ->
(tenant.equals(assignment.getInstance()
.getFunctionMetaData().getFunctionDetails()
.getTenant())
&& namespace.equals((assignment.getInstance()
.getFunctionMetaData().getFunctionDetails()
.getNamespace()))
&& functionName.equals(assignment.getInstance()
.getFunctionMetaData().getFunctionDetails()
.getName())))
.collect(Collectors.toList()));
}
return assignments;
}
/**
* Removes a collection of assignments.
*
* @param assignments assignments to remove
*/
public synchronized void removeAssignments(Collection<Assignment> assignments) {
for (Assignment assignment : assignments) {
this.deleteAssignment(assignment);
}
}
public synchronized void restartFunctionInstance(String tenant, String namespace,
String functionName, int instanceId, URI uri) throws Exception {
if (runtimeFactory.externallyManaged()) {
throw new WebApplicationException(Response.serverError().status(Status.NOT_IMPLEMENTED)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData("Externally managed schedulers can't do per instance stop")).build());
}
Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
final String fullFunctionName = String.format("%s/%s/%s/%s", tenant, namespace, functionName, instanceId);
if (assignment == null) {
throw new WebApplicationException(Response.serverError().status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(fullFunctionName + " doesn't exist")).build());
}
final String assignedWorkerId = assignment.getWorkerId();
final String workerId = this.workerConfig.getWorkerId();
if (assignedWorkerId.equals(workerId)) {
stopFunction(FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()), true);
return;
} else {
// query other worker
List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
WorkerInfo workerInfo = null;
for (WorkerInfo entry : workerInfoList) {
if (assignment.getWorkerId().equals(entry.getWorkerId())) {
workerInfo = entry;
}
}
if (workerInfo == null) {
throw new WebApplicationException(Response.serverError().status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build());
}
if (uri == null) {
throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
} else {
URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname())
.port(workerInfo.getPort()).build();
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}
}
}
public synchronized void restartFunctionInstances(String tenant, String namespace, String functionName)
throws Exception {
final String fullFunctionName = String.format("%s/%s/%s", tenant, namespace, functionName);
Collection<Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName);
if (assignments.isEmpty()) {
throw new WebApplicationException(Response.serverError().status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build());
}
if (runtimeFactory.externallyManaged()) {
Assignment assignment = assignments.iterator().next();
final String assignedWorkerId = assignment.getWorkerId();
final String workerId = this.workerConfig.getWorkerId();
String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
if (assignedWorkerId.equals(workerId)) {
stopFunction(fullyQualifiedInstanceId, true);
} else {
List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
WorkerInfo workerInfo = null;
for (WorkerInfo entry : workerInfoList) {
if (assignment.getWorkerId().equals(entry.getWorkerId())) {
workerInfo = entry;
}
}
if (workerInfo == null) {
if (log.isDebugEnabled()) {
log.debug("[{}] has not been assigned yet", fullyQualifiedInstanceId);
}
throw new WebApplicationException(Response.serverError().status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build());
}
restartFunctionUsingPulsarAdmin(assignment, tenant, namespace, functionName, true);
}
} else {
for (Assignment assignment : assignments) {
final String assignedWorkerId = assignment.getWorkerId();
final String workerId = this.workerConfig.getWorkerId();
String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
if (assignedWorkerId.equals(workerId)) {
stopFunction(fullyQualifiedInstanceId, true);
} else {
List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
WorkerInfo workerInfo = null;
for (WorkerInfo entry : workerInfoList) {
if (assignment.getWorkerId().equals(entry.getWorkerId())) {
workerInfo = entry;
}
}
if (workerInfo == null) {
log.warn("[{}] has not been assigned yet, assignment: [{}], workerList: [{}]",
fullyQualifiedInstanceId, assignment, workerInfoList);
continue;
}
restartFunctionUsingPulsarAdmin(assignment, tenant, namespace, functionName, false);
}
}
}
return;
}
/**
* Restart the entire function or restart a single instance of the function.
*/
@VisibleForTesting
void restartFunctionUsingPulsarAdmin(Assignment assignment, String tenant, String namespace,
String functionName, boolean restartEntireFunction)
throws PulsarAdminException {
ComponentType componentType = assignment.getInstance().getFunctionMetaData()
.getFunctionDetails().getComponentType();
if (restartEntireFunction) {
if (ComponentType.SOURCE == componentType) {
this.functionAdmin.sources().restartSource(tenant, namespace, functionName);
} else if (ComponentType.SINK == componentType) {
this.functionAdmin.sinks().restartSink(tenant, namespace, functionName);
} else {
this.functionAdmin.functions().restartFunction(tenant, namespace, functionName);
}
} else {
// only restart single instance
if (ComponentType.SOURCE == componentType) {
this.functionAdmin.sources().restartSource(tenant, namespace, functionName,
assignment.getInstance().getInstanceId());
} else if (ComponentType.SINK == componentType) {
this.functionAdmin.sinks().restartSink(tenant, namespace, functionName,
assignment.getInstance().getInstanceId());
} else {
this.functionAdmin.functions().restartFunction(tenant, namespace, functionName,
assignment.getInstance().getInstanceId());
}
}
}
/**
* It stops all functions instances owned by current worker.
* @throws Exception
*/
public void stopAllOwnedFunctions() {
if (runtimeFactory.externallyManaged()) {
log.warn("Will not stop any functions since they are externally managed");
return;
}
final String workerId = this.workerConfig.getWorkerId();
Map<String, Assignment> assignments = workerIdToAssignments.get(workerId);
if (assignments != null) {
// Take a copy of the map since the stopFunction will modify the same map
// and invalidate the iterator
Map<String, Assignment> copiedAssignments = new TreeMap<>(assignments);
copiedAssignments.values().forEach(assignment -> {
String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
try {
stopFunction(fullyQualifiedInstanceId, false);
} catch (Exception e) {
log.warn("Failed to stop function {} - {}", fullyQualifiedInstanceId, e.getMessage());
}
});
}
}
@VisibleForTesting
void stopFunction(String fullyQualifiedInstanceId, boolean restart) throws Exception {
log.info("[{}] {}..", restart ? "restarting" : "stopping", fullyQualifiedInstanceId);
FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(fullyQualifiedInstanceId);
if (functionRuntimeInfo != null) {
this.conditionallyStopFunction(functionRuntimeInfo);
try {
if (restart) {
this.conditionallyStartFunction(functionRuntimeInfo);
}
} catch (Exception ex) {
log.info("{} Error re-starting function", fullyQualifiedInstanceId, ex);
functionRuntimeInfo.setStartupException(ex);
throw ex;
}
}
}
/**
* Get stats of a function instance. If this worker is not running the function instance.
* @param tenant the tenant the function belongs to
* @param namespace the namespace the function belongs to
* @param functionName the function name
* @param instanceId the function instance id
* @return jsonObject containing stats for instance
*/
public FunctionInstanceStatsDataImpl getFunctionInstanceStats(String tenant, String namespace,
String functionName, int instanceId, URI uri) {
Assignment assignment;
if (runtimeFactory.externallyManaged()) {
assignment = this.findAssignment(tenant, namespace, functionName, -1);
} else {
assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
}
if (assignment == null) {
return new FunctionInstanceStatsDataImpl();
}
final String assignedWorkerId = assignment.getWorkerId();
final String workerId = this.workerConfig.getWorkerId();
// If I am running worker
if (assignedWorkerId.equals(workerId)) {
FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(
FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()));
RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
if (runtimeSpawner != null) {
return (FunctionInstanceStatsDataImpl)
WorkerUtils.getFunctionInstanceStats(
FunctionCommon.getFullyQualifiedInstanceId(
assignment.getInstance()), functionRuntimeInfo, instanceId).getMetrics();
}
return new FunctionInstanceStatsDataImpl();
} else {
// query other worker
List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
WorkerInfo workerInfo = null;
for (WorkerInfo entry : workerInfoList) {
if (assignment.getWorkerId().equals(entry.getWorkerId())) {
workerInfo = entry;
}
}
if (workerInfo == null) {
return new FunctionInstanceStatsDataImpl();
}
if (uri == null) {
throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
} else {
URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname())
.port(workerInfo.getPort()).build();
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}
}
}
/**
* Get stats of all function instances.
*
* @param tenant the tenant the function belongs to
* @param namespace the namespace the function belongs to
* @param functionName the function name
* @return a list of function statuses
* @throws PulsarAdminException
*/
public FunctionStatsImpl getFunctionStats(String tenant, String namespace,
String functionName, URI uri) throws PulsarAdminException {
Collection<Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName);
FunctionStatsImpl functionStats = new FunctionStatsImpl();
if (assignments.isEmpty()) {
return functionStats;
}
if (runtimeFactory.externallyManaged()) {
Assignment assignment = assignments.iterator().next();
boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
if (isOwner) {
int parallelism = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getParallelism();
for (int i = 0; i < parallelism; ++i) {
FunctionInstanceStatsDataImpl functionInstanceStatsData =
getFunctionInstanceStats(tenant, namespace, functionName, i, null);
FunctionInstanceStatsImpl functionInstanceStats = new FunctionInstanceStatsImpl();
functionInstanceStats.setInstanceId(i);
functionInstanceStats.setMetrics(functionInstanceStatsData);
functionStats.addInstance(functionInstanceStats);
}
} else {
// find the hostname/port of the worker who is the owner
List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
WorkerInfo workerInfo = null;
for (WorkerInfo entry : workerInfoList) {
if (assignment.getWorkerId().equals(entry.getWorkerId())) {
workerInfo = entry;
}
}
if (workerInfo == null) {
return functionStats;
}
if (uri == null) {
throw new WebApplicationException(Response.serverError()
.status(Status.INTERNAL_SERVER_ERROR).build());
} else {
URI redirect = UriBuilder.fromUri(uri)
.host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}
}
} else {
for (Assignment assignment : assignments) {
boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
FunctionInstanceStatsDataImpl functionInstanceStatsData;
if (isOwner) {
functionInstanceStatsData = getFunctionInstanceStats(tenant, namespace, functionName,
assignment.getInstance().getInstanceId(), null);
} else {
functionInstanceStatsData =
(FunctionInstanceStatsDataImpl) this.functionAdmin.functions().getFunctionStats(
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
assignment.getInstance().getInstanceId());
}
FunctionInstanceStatsImpl functionInstanceStats = new FunctionInstanceStatsImpl();
functionInstanceStats.setInstanceId(assignment.getInstance().getInstanceId());
functionInstanceStats.setMetrics(functionInstanceStatsData);
functionStats.addInstance(functionInstanceStats);
}
}
return functionStats.calculateOverall();
}
public synchronized void processAssignmentMessage(Message<byte[]> msg) {
if (msg.getData() == null || (msg.getData().length == 0)) {
log.info("Received assignment delete: {}", msg.getKey());
deleteAssignment(msg.getKey());
} else {
Assignment assignment;
try {
assignment = Assignment.parseFrom(msg.getData());
} catch (IOException e) {
log.error("[{}] Received bad assignment update at message {}", msg.getMessageId(), e);
throw new RuntimeException(e);
}
log.info("Received assignment update: {}", assignment);
processAssignment(assignment);
}
}
/**
* Process an assignment update from the assignment topic.
* @param newAssignment the assignment
*/
public synchronized void processAssignment(Assignment newAssignment) {
boolean exists = false;
for (Map<String, Assignment> entry : this.workerIdToAssignments.values()) {
if (entry.containsKey(FunctionCommon.getFullyQualifiedInstanceId(newAssignment.getInstance()))) {
exists = true;
}
}
if (exists) {
updateAssignment(newAssignment);
} else {
addAssignment(newAssignment);
}
}
private void updateAssignment(Assignment assignment) {
String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
Assignment existingAssignment = this.findAssignment(assignment);
// potential updates need to happen
if (!existingAssignment.equals(assignment)) {
FunctionRuntimeInfo functionRuntimeInfo = get_FunctionRuntimeInfo(fullyQualifiedInstanceId);
// for externally managed functions we don't really care about which worker
// the function instance is assigned to
// and we don't really need to stop and start the function instance
// because the worker its assigned to changed
// we just need to update the locally cached assignment info.
// We only need to stop and start when there are
// changes to the function meta data of the instance
if (runtimeFactory.externallyManaged()) {
// change in metadata thus need to potentially restart
if (!assignment.getInstance().equals(existingAssignment.getInstance())) {
//stop function
if (functionRuntimeInfo != null) {
this.conditionallyStopFunction(functionRuntimeInfo);
}
// still assigned to me, need to restart
if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
if (needsStart(assignment)) {
//start again
FunctionRuntimeInfo newFunctionRuntimeInfo = new FunctionRuntimeInfo();
newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
this.conditionallyStartFunction(newFunctionRuntimeInfo);
this.functionRuntimeInfos.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
}
} else {
this.functionRuntimeInfos.remove(fullyQualifiedInstanceId);
}
} else {
// if assignment got transferred to me just set function runtime
if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
FunctionRuntimeInfo newFunctionRuntimeInfo = new FunctionRuntimeInfo();
newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
RuntimeSpawner runtimeSpawner = functionActioner.getRuntimeSpawner(
assignment.getInstance(),
assignment.getInstance().getFunctionMetaData().getPackageLocation().getPackagePath(),
assignment
.getInstance()
.getFunctionMetaData()
.getTransformFunctionPackageLocation()
.getPackagePath());
// re-initialize if necessary
runtimeSpawner.getRuntime().reinitialize();
newFunctionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
this.functionRuntimeInfos.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
} else {
this.functionRuntimeInfos.remove(fullyQualifiedInstanceId);
}
}
} else {
//stop function
if (functionRuntimeInfo != null) {
this.conditionallyStopFunction(functionRuntimeInfo);
}
// still assigned to me, need to restart
if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
if (needsStart(assignment)) {
//start again
FunctionRuntimeInfo newFunctionRuntimeInfo = new FunctionRuntimeInfo();
newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
this.conditionallyStartFunction(newFunctionRuntimeInfo);
this.functionRuntimeInfos.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
}
} else {
this.functionRuntimeInfos.remove(fullyQualifiedInstanceId);
}
}
// find existing assignment
Assignment existingAssignments = this.findAssignment(assignment);
if (existingAssignments != null) {
// delete old assignment that could have old data
this.deleteAssignment(existingAssignments);
}
// set to newest assignment
this.setAssignment(assignment);
}
}
public synchronized void deleteAssignment(String fullyQualifiedInstanceId) {
FunctionRuntimeInfo functionRuntimeInfo = get_FunctionRuntimeInfo(fullyQualifiedInstanceId);
if (functionRuntimeInfo != null) {
Function.FunctionDetails functionDetails = functionRuntimeInfo
.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
// check if this is part of a function delete operation or update operation
// TODO could be a race condition here if functionMetaDataTailer
// somehow does not receive the functionMeta prior
// to the functionAssignmentsTailer gets the assignment for the function.
if (this.functionMetaDataManager.containsFunction(functionDetails.getTenant(),
functionDetails.getNamespace(), functionDetails.getName())) {
// function still exists thus probably an update or stop operation
this.conditionallyStopFunction(functionRuntimeInfo);
} else {
// function doesn't exist anymore thus we should terminate
FunctionInstanceId functionInstanceId =
new FunctionInstanceId(fullyQualifiedInstanceId);
String name = functionInstanceId.getName();
String namespace = functionInstanceId.getNamespace();
String tenant = functionInstanceId.getTenant();
// only run the termination logic if
// this is the last/only instance from a function left on the worker
Collection<Assignment> assignments = findFunctionAssignments(tenant, namespace, name, this
.workerIdToAssignments);
if (assignments.size() > 1) {
this.conditionallyStopFunction(functionRuntimeInfo);
} else {
this.conditionallyTerminateFunction(functionRuntimeInfo);
}
}
this.functionRuntimeInfos.remove(fullyQualifiedInstanceId);
}
String workerId = null;
for (Entry<String, Map<String, Assignment>> workerAssignments : workerIdToAssignments.entrySet()) {
if (workerAssignments.getValue().remove(fullyQualifiedInstanceId) != null) {
workerId = workerAssignments.getKey();
break;
}
}
Map<String, Assignment> worker;
if (workerId != null && ((worker = workerIdToAssignments.get(workerId)) != null && worker.isEmpty())) {
this.workerIdToAssignments.remove(workerId);
}
}
void deleteAssignment(Assignment assignment) {
String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
Map<String, Assignment> assignmentMap = this.workerIdToAssignments.get(assignment.getWorkerId());
if (assignmentMap != null) {
assignmentMap.remove(fullyQualifiedInstanceId);
if (assignmentMap.isEmpty()) {
this.workerIdToAssignments.remove(assignment.getWorkerId());
}
}
}
private void addAssignment(Assignment assignment) {
//add new function
this.setAssignment(assignment);
//Assigned to me
if (assignment.getWorkerId().equals(workerConfig.getWorkerId()) && needsStart(assignment)) {
startFunctionInstance(assignment);
}
}
private void startFunctionInstance(Assignment assignment) {
String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
FunctionRuntimeInfo functionRuntimeInfo = get_FunctionRuntimeInfo(fullyQualifiedInstanceId);
if (functionRuntimeInfo == null) {
functionRuntimeInfo = new FunctionRuntimeInfo()
.setFunctionInstance(assignment.getInstance());
this.functionRuntimeInfos.put(fullyQualifiedInstanceId, functionRuntimeInfo);
} else {
//Somehow this function is already started
log.warn("Function {} already running. Going to restart function.",
functionRuntimeInfo);
this.conditionallyStopFunction(functionRuntimeInfo);
}
this.conditionallyStartFunction(functionRuntimeInfo);
}
public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
return this.functionRuntimeInfos.getAll();
}
/**
* Private methods for internal use. Should not be used outside of this class
*/
private Assignment findAssignment(String tenant, String namespace, String functionName, int instanceId) {
String fullyQualifiedInstanceId =
FunctionCommon.getFullyQualifiedInstanceId(tenant, namespace, functionName, instanceId);
for (Map.Entry<String, Map<String, Assignment>> entry : this.workerIdToAssignments.entrySet()) {
Map<String, Assignment> assignmentMap = entry.getValue();
Assignment existingAssignment = assignmentMap.get(fullyQualifiedInstanceId);
if (existingAssignment != null) {
return existingAssignment;
}
}
return null;
}
private Assignment findAssignment(Assignment assignment) {
return findAssignment(
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
assignment.getInstance().getInstanceId()
);
}
@VisibleForTesting
void setAssignment(Assignment assignment) {
if (!this.workerIdToAssignments.containsKey(assignment.getWorkerId())) {
this.workerIdToAssignments.put(assignment.getWorkerId(), new HashMap<>());
}
this.workerIdToAssignments.get(assignment.getWorkerId()).put(
FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()),
assignment);
}
@Override
public void close() throws Exception {
stopAllOwnedFunctions();
if (runtimeFactory != null) {
runtimeFactory.close();
}
}
public synchronized FunctionRuntimeInfo getFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
return get_FunctionRuntimeInfo(fullyQualifiedInstanceId);
}
private FunctionRuntimeInfo get_FunctionRuntimeInfo(String fullyQualifiedInstanceId) {
FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfos.get(fullyQualifiedInstanceId);
// sanity check to make sure assignments and runtimeinfo is in sync
if (functionRuntimeInfo == null) {
if (this.workerIdToAssignments.containsValue(workerConfig.getWorkerId())
&& this.workerIdToAssignments.get(workerConfig.getWorkerId())
.containsValue(fullyQualifiedInstanceId)) {
log.error("Assignments and RuntimeInfos are inconsistent."
+ "FunctionRuntimeInfo missing for " + fullyQualifiedInstanceId);
}
}
return functionRuntimeInfo;
}
private boolean needsStart(Assignment assignment) {
boolean toStart = false;
Function.FunctionMetaData functionMetaData = assignment.getInstance().getFunctionMetaData();
if (functionMetaData.getInstanceStatesMap() == null || functionMetaData.getInstanceStatesMap().isEmpty()) {
toStart = true;
} else {
if (assignment.getInstance().getInstanceId() < 0) {
// for externally managed functions, insert the start only if there is atleast one
// instance that needs to be started
for (Function.FunctionState state : functionMetaData.getInstanceStatesMap().values()) {
if (state == Function.FunctionState.RUNNING) {
toStart = true;
break;
}
}
} else {
if (functionMetaData.getInstanceStatesOrDefault(assignment.getInstance().getInstanceId(),
Function.FunctionState.RUNNING) == Function.FunctionState.RUNNING) {
toStart = true;
}
}
}
return toStart;
}
private void conditionallyStartFunction(FunctionRuntimeInfo functionRuntimeInfo) {
if (!this.isInitializePhase) {
workerStatsManager.startInstanceProcessTimeStart();
this.functionActioner.startFunction(functionRuntimeInfo);
workerStatsManager.startInstanceProcessTimeEnd();
}
}
private void conditionallyStopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
if (!this.isInitializePhase) {
workerStatsManager.stopInstanceProcessTimeStart();
this.functionActioner.stopFunction(functionRuntimeInfo);
workerStatsManager.stopInstanceProcessTimeEnd();
}
}
private void conditionallyTerminateFunction(FunctionRuntimeInfo functionRuntimeInfo) {
if (!this.isInitializePhase) {
workerStatsManager.startInstanceProcessTimeStart();
this.functionActioner.terminateFunction(functionRuntimeInfo);
workerStatsManager.startInstanceProcessTimeEnd();
}
}
/** Methods for metrics. **/
public int getMyInstances() {
Map<String, Assignment> myAssignments = workerIdToAssignments.get(workerConfig.getWorkerId());
return myAssignments == null ? 0 : myAssignments.size();
}
}