| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.functions.worker; |
| |
| import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; |
| import static org.apache.pulsar.common.policies.data.Policies.getBundles; |
| |
| import com.fasterxml.jackson.core.JsonProcessingException; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.collect.Sets; |
| import io.netty.util.concurrent.DefaultThreadFactory; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import javax.ws.rs.core.Response; |
| import lombok.Getter; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.bookkeeper.clients.StorageClientBuilder; |
| import org.apache.bookkeeper.clients.admin.StorageAdminClient; |
| import org.apache.bookkeeper.clients.config.StorageClientSettings; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.distributedlog.DistributedLogConfiguration; |
| import org.apache.distributedlog.api.namespace.Namespace; |
| import org.apache.distributedlog.api.namespace.NamespaceBuilder; |
| import org.apache.pulsar.broker.ServiceConfiguration; |
| import org.apache.pulsar.broker.authentication.AuthenticationService; |
| import org.apache.pulsar.broker.authorization.AuthorizationService; |
| import org.apache.pulsar.broker.cache.ConfigurationCacheService; |
| import org.apache.pulsar.broker.resources.PulsarResources; |
| import org.apache.pulsar.client.admin.PulsarAdmin; |
| import org.apache.pulsar.client.admin.PulsarAdminException; |
| import org.apache.pulsar.client.api.MessageId; |
| import org.apache.pulsar.client.api.PulsarClient; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.common.conf.InternalConfigurationData; |
| import org.apache.pulsar.common.naming.NamedEntity; |
| import org.apache.pulsar.common.policies.data.ClusterData; |
| import org.apache.pulsar.common.policies.data.Policies; |
| import org.apache.pulsar.common.policies.data.RetentionPolicies; |
| import org.apache.pulsar.common.policies.data.TenantInfo; |
| import org.apache.pulsar.common.policies.path.PolicyPath; |
| import org.apache.pulsar.common.util.SimpleTextOutputStream; |
| import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; |
| import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2; |
| import org.apache.pulsar.functions.worker.rest.api.SinksImpl; |
| import org.apache.pulsar.functions.worker.rest.api.SourcesImpl; |
| import org.apache.pulsar.functions.worker.rest.api.WorkerImpl; |
| import org.apache.pulsar.functions.worker.service.api.Functions; |
| import org.apache.pulsar.functions.worker.service.api.FunctionsV2; |
| import org.apache.pulsar.functions.worker.service.api.Sinks; |
| import org.apache.pulsar.functions.worker.service.api.Sources; |
| import org.apache.pulsar.functions.worker.service.api.Workers; |
| import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A service component contains everything to run a worker except rest server. |
| */ |
| @Slf4j |
| @Getter |
| public class PulsarWorkerService implements WorkerService { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(PulsarWorkerService.class); |
| |
| public interface PulsarClientCreator { |
| |
| PulsarAdmin newPulsarAdmin(String pulsarServiceUrl, WorkerConfig workerConfig); |
| |
| PulsarClient newPulsarClient(String pulsarServiceUrl, WorkerConfig workerConfig); |
| |
| } |
| |
| private WorkerConfig workerConfig; |
| |
| private PulsarClient client; |
| private FunctionRuntimeManager functionRuntimeManager; |
| private FunctionMetaDataManager functionMetaDataManager; |
| private ClusterServiceCoordinator clusterServiceCoordinator; |
| // dlog namespace for storing function jars in bookkeeper |
| private Namespace dlogNamespace; |
| // storage client for accessing state storage for functions |
| private StorageAdminClient stateStoreAdminClient; |
| private MembershipManager membershipManager; |
| private SchedulerManager schedulerManager; |
| private volatile boolean isInitialized = false; |
| private ScheduledExecutorService statsUpdater; |
| private AuthenticationService authenticationService; |
| private AuthorizationService authorizationService; |
| private ConnectorsManager connectorsManager; |
| private FunctionsManager functionsManager; |
| private PulsarAdmin brokerAdmin; |
| private PulsarAdmin functionAdmin; |
| private MetricsGenerator metricsGenerator; |
| @VisibleForTesting |
| private URI dlogUri; |
| private LeaderService leaderService; |
| private FunctionAssignmentTailer functionAssignmentTailer; |
| private WorkerStatsManager workerStatsManager; |
| private Functions<PulsarWorkerService> functions; |
| private FunctionsV2<PulsarWorkerService> functionsV2; |
| private Sinks<PulsarWorkerService> sinks; |
| private Sources<PulsarWorkerService> sources; |
| private Workers<PulsarWorkerService> workers; |
| |
| private final PulsarClientCreator clientCreator; |
| |
| public PulsarWorkerService() { |
| this.clientCreator = new PulsarClientCreator() { |
| @Override |
| public PulsarAdmin newPulsarAdmin(String pulsarServiceUrl, WorkerConfig workerConfig) { |
| // using isBrokerClientAuthenticationEnabled instead of isAuthenticationEnabled in function-worker |
| if (workerConfig.isBrokerClientAuthenticationEnabled()) { |
| return WorkerUtils.getPulsarAdminClient( |
| pulsarServiceUrl, |
| workerConfig.getBrokerClientAuthenticationPlugin(), |
| workerConfig.getBrokerClientAuthenticationParameters(), |
| workerConfig.getBrokerClientTrustCertsFilePath(), |
| workerConfig.isTlsAllowInsecureConnection(), |
| workerConfig.isTlsEnableHostnameVerification()); |
| } else { |
| return WorkerUtils.getPulsarAdminClient(pulsarServiceUrl); |
| } |
| } |
| |
| @Override |
| public PulsarClient newPulsarClient(String pulsarServiceUrl, WorkerConfig workerConfig) { |
| // using isBrokerClientAuthenticationEnabled instead of isAuthenticationEnabled in function-worker |
| if (workerConfig.isBrokerClientAuthenticationEnabled()) { |
| return WorkerUtils.getPulsarClient( |
| pulsarServiceUrl, |
| workerConfig.getBrokerClientAuthenticationPlugin(), |
| workerConfig.getBrokerClientAuthenticationParameters(), |
| workerConfig.isUseTls(), |
| workerConfig.getBrokerClientTrustCertsFilePath(), |
| workerConfig.isTlsAllowInsecureConnection(), |
| workerConfig.isTlsEnableHostnameVerification()); |
| } else { |
| return WorkerUtils.getPulsarClient(pulsarServiceUrl); |
| } |
| } |
| }; |
| } |
| |
| public PulsarWorkerService(PulsarClientCreator clientCreator) { |
| this.clientCreator = clientCreator; |
| } |
| |
| @Override |
| public void generateFunctionsStats(SimpleTextOutputStream out) { |
| FunctionsStatsGenerator.generate( |
| this, out |
| ); |
| } |
| |
| @VisibleForTesting |
| public void init(WorkerConfig workerConfig, |
| URI dlogUri, |
| boolean runAsStandalone) { |
| this.statsUpdater = Executors |
| .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-stats-updater")); |
| this.metricsGenerator = new MetricsGenerator(this.statsUpdater, workerConfig); |
| this.workerConfig = workerConfig; |
| this.dlogUri = dlogUri; |
| this.workerStatsManager = new WorkerStatsManager(workerConfig, runAsStandalone); |
| this.functions = new FunctionsImpl(() -> PulsarWorkerService.this); |
| this.functionsV2 = new FunctionsImplV2(() -> PulsarWorkerService.this); |
| this.sinks = new SinksImpl(() -> PulsarWorkerService.this); |
| this.sources = new SourcesImpl(() -> PulsarWorkerService.this); |
| this.workers = new WorkerImpl(() -> PulsarWorkerService.this); |
| } |
| |
| @Override |
| public void initAsStandalone(WorkerConfig workerConfig) throws Exception { |
| URI dlogUri = initializeStandaloneWorkerService(clientCreator, workerConfig); |
| init(workerConfig, dlogUri, true); |
| } |
| |
| private static URI initializeStandaloneWorkerService(PulsarClientCreator clientCreator, |
| WorkerConfig workerConfig) throws Exception { |
| // initializing pulsar functions namespace |
| PulsarAdmin admin = clientCreator.newPulsarAdmin(workerConfig.getPulsarWebServiceUrl(), workerConfig); |
| InternalConfigurationData internalConf; |
| // make sure pulsar broker is up |
| log.info("Checking if pulsar service at {} is up...", workerConfig.getPulsarWebServiceUrl()); |
| int maxRetries = workerConfig.getInitialBrokerReconnectMaxRetries(); |
| int retries = 0; |
| while (true) { |
| try { |
| admin.clusters().getClusters(); |
| break; |
| } catch (PulsarAdminException e) { |
| log.warn("Failed to retrieve clusters from pulsar service", e); |
| log.warn("Retry to connect to Pulsar service at {}", workerConfig.getPulsarWebServiceUrl()); |
| if (retries >= maxRetries) { |
| log.error("Failed to connect to Pulsar service at {} after {} attempts", |
| workerConfig.getPulsarFunctionsNamespace(), maxRetries); |
| throw e; |
| } |
| retries ++; |
| Thread.sleep(1000); |
| } |
| } |
| |
| // getting namespace policy |
| log.info("Initializing Pulsar Functions namespace..."); |
| try { |
| try { |
| admin.namespaces().getPolicies(workerConfig.getPulsarFunctionsNamespace()); |
| } catch (PulsarAdminException e) { |
| if (e.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) { |
| // if not found than create |
| try { |
| Policies policies = new Policies(); |
| policies.retention_policies = new RetentionPolicies(-1, -1); |
| policies.replication_clusters = new HashSet<>(); |
| policies.replication_clusters.add(workerConfig.getPulsarFunctionsCluster()); |
| admin.namespaces().createNamespace(workerConfig.getPulsarFunctionsNamespace(), |
| policies); |
| } catch (PulsarAdminException e1) { |
| // prevent race condition with other workers starting up |
| if (e1.getStatusCode() != Response.Status.CONFLICT.getStatusCode()) { |
| log.error("Failed to create namespace {} for pulsar functions", workerConfig |
| .getPulsarFunctionsNamespace(), e1); |
| throw e1; |
| } |
| } |
| } else { |
| log.error("Failed to get retention policy for pulsar function namespace {}", |
| workerConfig.getPulsarFunctionsNamespace(), e); |
| throw e; |
| } |
| } |
| try { |
| internalConf = admin.brokers().getInternalConfigurationData(); |
| } catch (PulsarAdminException e) { |
| log.error("Failed to retrieve broker internal configuration", e); |
| throw e; |
| } |
| } finally { |
| admin.close(); |
| } |
| |
| // initialize the dlog namespace |
| URI dlogURI; |
| try { |
| if (workerConfig.isInitializedDlogMetadata()) { |
| dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers()); |
| } else { |
| dlogURI = WorkerUtils.initializeDlogNamespace(internalConf); |
| } |
| } catch (IOException ioe) { |
| log.error("Failed to initialize dlog namespace with zookeeper {} at metadata service uri {} for storing " + |
| "function packages", internalConf.getZookeeperServers(), |
| internalConf.getBookkeeperMetadataServiceUri(), ioe); |
| throw ioe; |
| } |
| |
| return dlogURI; |
| } |
| |
| @Override |
| public void initInBroker(ServiceConfiguration brokerConfig, |
| WorkerConfig workerConfig, |
| PulsarResources pulsarResources, |
| ConfigurationCacheService configurationCacheService, |
| InternalConfigurationData internalConf) throws Exception { |
| |
| String namespace = workerConfig.getPulsarFunctionsNamespace(); |
| String[] a = workerConfig.getPulsarFunctionsNamespace().split("/"); |
| String property = a[0]; |
| String cluster = workerConfig.getPulsarFunctionsCluster(); |
| |
| int[] ar = null; |
| /* |
| multiple brokers may be trying to create the property, cluster, and namespace |
| for function worker service this in parallel. The function worker service uses the namespace |
| to create topics for internal function |
| */ |
| |
| // create tenant for function worker service |
| try { |
| NamedEntity.checkName(property); |
| pulsarResources.getTenantResources().create(PolicyPath.path(POLICIES, property), |
| new TenantInfo(Sets.newHashSet(workerConfig.getSuperUserRoles()), Sets.newHashSet(cluster))); |
| LOG.info("Created property {} for function worker", property); |
| } catch (AlreadyExistsException e) { |
| LOG.debug("Failed to create already existing property {} for function worker service", cluster, e); |
| } catch (IllegalArgumentException e) { |
| LOG.error("Failed to create property with invalid name {} for function worker service", cluster, e); |
| throw e; |
| } catch (Exception e) { |
| LOG.error("Failed to create property {} for function worker", cluster, e); |
| throw e; |
| } |
| |
| // create cluster for function worker service |
| try { |
| NamedEntity.checkName(cluster); |
| ClusterData clusterData = new ClusterData( |
| workerConfig.getPulsarWebServiceUrl(), |
| null /* serviceUrlTls */, |
| workerConfig.getPulsarServiceUrl(), |
| null /* brokerServiceUrlTls */); |
| pulsarResources.getClusterResources().create( |
| PolicyPath.path("clusters", cluster), |
| clusterData); |
| LOG.info("Created cluster {} for function worker", cluster); |
| } catch (AlreadyExistsException e) { |
| LOG.debug("Failed to create already existing cluster {} for function worker service", cluster, e); |
| } catch (IllegalArgumentException e) { |
| LOG.error("Failed to create cluster with invalid name {} for function worker service", cluster, e); |
| throw e; |
| } catch (Exception e) { |
| LOG.error("Failed to create cluster {} for function worker service", cluster, e); |
| throw e; |
| } |
| |
| // create namespace for function worker service |
| try { |
| Policies policies = new Policies(); |
| policies.retention_policies = new RetentionPolicies(-1, -1); |
| policies.replication_clusters = Collections.singleton(workerConfig.getPulsarFunctionsCluster()); |
| int defaultNumberOfBundles = brokerConfig.getDefaultNumberOfNamespaceBundles(); |
| policies.bundles = getBundles(defaultNumberOfBundles); |
| |
| configurationCacheService.policiesCache().invalidate(PolicyPath.path(POLICIES, namespace)); |
| pulsarResources.getNamespaceResources().create(PolicyPath.path(POLICIES, namespace), policies); |
| LOG.info("Created namespace {} for function worker service", namespace); |
| } catch (AlreadyExistsException e) { |
| LOG.debug("Failed to create already existing namespace {} for function worker service", namespace); |
| } catch (Exception e) { |
| LOG.error("Failed to create namespace {}", namespace, e); |
| throw e; |
| } |
| |
| URI dlogURI; |
| try { |
| // initializing dlog namespace for function worker |
| if (workerConfig.isInitializedDlogMetadata()){ |
| dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers()); |
| } else { |
| dlogURI = WorkerUtils.initializeDlogNamespace(internalConf); |
| } |
| } catch (IOException ioe) { |
| LOG.error("Failed to initialize dlog namespace with zookeeper {} at at metadata service uri {} for " + |
| "storing function packages", |
| internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe); |
| throw ioe; |
| } |
| |
| init(workerConfig, dlogURI, false); |
| |
| LOG.info("Function worker service setup completed"); |
| } |
| |
| private void tryCreateNonPartitionedTopic(final String topic) throws PulsarAdminException { |
| try { |
| getBrokerAdmin().topics().createNonPartitionedTopic(topic); |
| } catch (PulsarAdminException e) { |
| if (e instanceof PulsarAdminException.ConflictException) { |
| log.warn("Failed to create topic '{}': {}", topic, e.getMessage()); |
| } else { |
| throw e; |
| } |
| } |
| } |
| |
| @Override |
| public void start(AuthenticationService authenticationService, |
| AuthorizationService authorizationService, |
| ErrorNotifier errorNotifier) throws Exception { |
| |
| workerStatsManager.startupTimeStart(); |
| log.info("/** Starting worker id={} **/", workerConfig.getWorkerId()); |
| |
| try { |
| log.info("Worker Configs: {}", new ObjectMapper().writerWithDefaultPrettyPrinter() |
| .writeValueAsString(workerConfig)); |
| } catch (JsonProcessingException e) { |
| log.warn("Failed to print worker configs with error {}", e.getMessage(), e); |
| } |
| |
| try { |
| DistributedLogConfiguration dlogConf = WorkerUtils.getDlogConf(workerConfig); |
| try { |
| this.dlogNamespace = NamespaceBuilder.newBuilder() |
| .conf(dlogConf) |
| .clientId("function-worker-" + workerConfig.getWorkerId()) |
| .uri(dlogUri) |
| .build(); |
| } catch (Exception e) { |
| log.error("Failed to initialize dlog namespace {} for storing function packages", |
| dlogUri, e); |
| throw new RuntimeException(e); |
| } |
| |
| // create the state storage client for accessing function state |
| if (workerConfig.getStateStorageServiceUrl() != null) { |
| StorageClientSettings clientSettings = StorageClientSettings.newBuilder() |
| .serviceUri(workerConfig.getStateStorageServiceUrl()) |
| .build(); |
| this.stateStoreAdminClient = StorageClientBuilder.newBuilder() |
| .withSettings(clientSettings) |
| .buildAdmin(); |
| } |
| |
| final String functionWebServiceUrl = StringUtils.isNotBlank(workerConfig.getFunctionWebServiceUrl()) |
| ? workerConfig.getFunctionWebServiceUrl() |
| : (workerConfig.getTlsEnabled() |
| ? workerConfig.getWorkerWebAddressTls() : workerConfig.getWorkerWebAddress()); |
| |
| this.brokerAdmin = clientCreator.newPulsarAdmin(workerConfig.getPulsarWebServiceUrl(), workerConfig); |
| this.functionAdmin = clientCreator.newPulsarAdmin(functionWebServiceUrl, workerConfig); |
| this.client = clientCreator.newPulsarClient(workerConfig.getPulsarServiceUrl(), workerConfig); |
| |
| tryCreateNonPartitionedTopic(workerConfig.getFunctionAssignmentTopic()); |
| tryCreateNonPartitionedTopic(workerConfig.getClusterCoordinationTopic()); |
| tryCreateNonPartitionedTopic(workerConfig.getFunctionMetadataTopic()); |
| //create scheduler manager |
| this.schedulerManager = new SchedulerManager(workerConfig, client, getBrokerAdmin(), workerStatsManager, errorNotifier); |
| |
| //create function meta data manager |
| this.functionMetaDataManager = new FunctionMetaDataManager( |
| this.workerConfig, this.schedulerManager, this.client, errorNotifier); |
| |
| this.connectorsManager = new ConnectorsManager(workerConfig); |
| this.functionsManager = new FunctionsManager(workerConfig); |
| |
| //create membership manager |
| String coordinationTopic = workerConfig.getClusterCoordinationTopic(); |
| if (!getBrokerAdmin().topics().getSubscriptions(coordinationTopic).contains(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION)) { |
| getBrokerAdmin().topics() |
| .createSubscription(coordinationTopic, MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION, |
| MessageId.earliest); |
| } |
| this.membershipManager = new MembershipManager(this, client, getBrokerAdmin()); |
| |
| // create function runtime manager |
| this.functionRuntimeManager = new FunctionRuntimeManager( |
| workerConfig, |
| this, |
| dlogNamespace, |
| membershipManager, |
| connectorsManager, |
| functionsManager, |
| functionMetaDataManager, |
| workerStatsManager, |
| errorNotifier); |
| |
| |
| // initialize function assignment tailer that reads from the assignment topic |
| this.functionAssignmentTailer = new FunctionAssignmentTailer( |
| functionRuntimeManager, |
| client.newReader(), |
| workerConfig, |
| errorNotifier); |
| |
| // Start worker early in the worker service init process so that functions don't get re-assigned because |
| // initialize operations of FunctionRuntimeManager and FunctionMetadataManger might take a while |
| this.leaderService = new LeaderService(this, |
| client, |
| functionAssignmentTailer, |
| schedulerManager, |
| functionRuntimeManager, |
| functionMetaDataManager, |
| membershipManager, |
| errorNotifier); |
| |
| log.info("/** Start Leader Service **/"); |
| leaderService.start(); |
| |
| // initialize function metadata manager |
| log.info("/** Initializing Metdata Manager **/"); |
| functionMetaDataManager.initialize(); |
| |
| // initialize function runtime manager |
| log.info("/** Initializing Runtime Manager **/"); |
| |
| MessageId lastAssignmentMessageId = functionRuntimeManager.initialize(); |
| |
| // Setting references to managers in scheduler |
| schedulerManager.setFunctionMetaDataManager(functionMetaDataManager); |
| schedulerManager.setFunctionRuntimeManager(functionRuntimeManager); |
| schedulerManager.setMembershipManager(membershipManager); |
| schedulerManager.setLeaderService(leaderService); |
| |
| this.authenticationService = authenticationService; |
| |
| this.authorizationService = authorizationService; |
| |
| // Start function assignment tailer |
| log.info("/** Starting Function Assignment Tailer **/"); |
| functionAssignmentTailer.startFromMessage(lastAssignmentMessageId); |
| |
| // start function metadata manager |
| log.info("/** Starting Metdata Manager **/"); |
| functionMetaDataManager.start(); |
| |
| // Starting cluster services |
| this.clusterServiceCoordinator = new ClusterServiceCoordinator( |
| workerConfig.getWorkerId(), |
| leaderService); |
| |
| clusterServiceCoordinator.addTask("membership-monitor", |
| workerConfig.getFailureCheckFreqMs(), |
| () -> { |
| // computing a new schedule and checking for failures cannot happen concurrently |
| // both paths of code modify internally cached assignments map in function runtime manager |
| try { |
| schedulerManager.getSchedulerLock().lock(); |
| membershipManager.checkFailures( |
| functionMetaDataManager, functionRuntimeManager, schedulerManager); |
| } finally { |
| schedulerManager.getSchedulerLock().unlock(); |
| } |
| }); |
| |
| if (workerConfig.getRebalanceCheckFreqSec() > 0) { |
| clusterServiceCoordinator.addTask("rebalance-periodic-check", |
| workerConfig.getRebalanceCheckFreqSec() * 1000, |
| () -> { |
| try { |
| schedulerManager.rebalanceIfNotInprogress().get(); |
| } catch (SchedulerManager.RebalanceInProgressException e) { |
| log.info("Scheduled for rebalance but rebalance is already in progress. Ignoring."); |
| } catch (Exception e) { |
| log.warn("Encountered error when running scheduled rebalance", e); |
| } |
| }); |
| } |
| |
| log.info("/** Starting Cluster Service Coordinator **/"); |
| clusterServiceCoordinator.start(); |
| |
| // indicate function worker service is done initializing |
| this.isInitialized = true; |
| |
| log.info("/** Started worker id={} **/", workerConfig.getWorkerId()); |
| |
| workerStatsManager.setFunctionRuntimeManager(functionRuntimeManager); |
| workerStatsManager.setFunctionMetaDataManager(functionMetaDataManager); |
| workerStatsManager.setLeaderService(leaderService); |
| workerStatsManager.startupTimeEnd(); |
| } catch (Throwable t) { |
| log.error("Error Starting up in worker", t); |
| throw new RuntimeException(t); |
| } |
| } |
| |
| @Override |
| public void stop() { |
| if (null != functionMetaDataManager) { |
| try { |
| functionMetaDataManager.close(); |
| } catch (Exception e) { |
| log.warn("Failed to close function metadata manager", e); |
| } |
| } |
| |
| if (null != functionAssignmentTailer) { |
| try { |
| functionAssignmentTailer.close(); |
| } catch (Exception e) { |
| log.warn("Failed to close function assignment tailer", e); |
| } |
| } |
| |
| if (null != functionRuntimeManager) { |
| try { |
| functionRuntimeManager.close(); |
| } catch (Exception e) { |
| log.warn("Failed to close function runtime manager", e); |
| } |
| } |
| |
| if (null != clusterServiceCoordinator) { |
| clusterServiceCoordinator.close(); |
| } |
| |
| if (null != membershipManager) { |
| membershipManager.close(); |
| } |
| |
| if (null != schedulerManager) { |
| schedulerManager.close(); |
| } |
| |
| if (null != leaderService) { |
| try { |
| leaderService.close(); |
| } catch (PulsarClientException e) { |
| log.warn("Failed to close leader service", e); |
| } |
| } |
| |
| if (null != client) { |
| try { |
| client.close(); |
| } catch (PulsarClientException e) { |
| log.warn("Failed to close pulsar client", e); |
| } |
| } |
| |
| if (null != getBrokerAdmin()) { |
| getBrokerAdmin().close(); |
| } |
| |
| if (null != functionAdmin) { |
| functionAdmin.close(); |
| } |
| |
| if (null != stateStoreAdminClient) { |
| stateStoreAdminClient.close(); |
| } |
| |
| if (null != dlogNamespace) { |
| dlogNamespace.close(); |
| } |
| |
| if (statsUpdater != null) { |
| statsUpdater.shutdownNow(); |
| } |
| } |
| |
| } |