| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.pulsar.functions.runtime; |
| |
| import com.beust.jcommander.JCommander; |
| import com.beust.jcommander.Parameter; |
| import com.beust.jcommander.converters.StringConverter; |
| import com.google.gson.Gson; |
| import com.google.gson.reflect.TypeToken; |
| import com.google.protobuf.Empty; |
| import com.google.protobuf.util.JsonFormat; |
| import io.grpc.Server; |
| import io.grpc.ServerBuilder; |
| import io.grpc.stub.StreamObserver; |
| import io.prometheus.client.CollectorRegistry; |
| import io.prometheus.client.exporter.HTTPServer; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.pulsar.common.nar.NarClassLoader; |
| import org.apache.pulsar.common.functions.AuthenticationConfig; |
| import org.apache.pulsar.functions.instance.InstanceCache; |
| import org.apache.pulsar.functions.instance.InstanceConfig; |
| import org.apache.pulsar.functions.proto.Function; |
| import org.apache.pulsar.functions.proto.InstanceCommunication; |
| import org.apache.pulsar.functions.proto.InstanceControlGrpc; |
| import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory; |
| import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; |
| import org.apache.pulsar.functions.secretsprovider.SecretsProvider; |
| import org.apache.pulsar.common.util.Reflections; |
| |
| import java.lang.reflect.Type; |
| import java.net.InetSocketAddress; |
| import java.util.Map; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| |
| |
| @Slf4j |
| public class JavaInstanceStarter implements AutoCloseable { |
| @Parameter(names = "--function_details", description = "Function details json\n", required = true) |
| public String functionDetailsJsonString; |
| @Parameter( |
| names = "--jar", |
| description = "Path to Jar\n", |
| listConverter = StringConverter.class) |
| public String jarFile; |
| |
| @Parameter(names = "--instance_id", description = "Instance Id\n", required = true) |
| public int instanceId; |
| |
| @Parameter(names = "--function_id", description = "Function Id\n", required = true) |
| public String functionId; |
| |
| @Parameter(names = "--function_version", description = "Function Version\n", required = true) |
| public String functionVersion; |
| |
| @Parameter(names = "--pulsar_serviceurl", description = "Pulsar Service Url\n", required = true) |
| public String pulsarServiceUrl; |
| |
| @Parameter(names = "--client_auth_plugin", description = "Client auth plugin name\n") |
| public String clientAuthenticationPlugin; |
| |
| @Parameter(names = "--client_auth_params", description = "Client auth param\n") |
| public String clientAuthenticationParameters; |
| |
| @Parameter(names = "--use_tls", description = "Use tls connection\n") |
| public String useTls = Boolean.FALSE.toString(); |
| |
| @Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n") |
| public String tlsAllowInsecureConnection = Boolean.TRUE.toString(); |
| |
| @Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification") |
| public String tlsHostNameVerificationEnabled = Boolean.FALSE.toString(); |
| |
| @Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path") |
| public String tlsTrustCertFilePath; |
| |
| @Parameter(names = "--state_storage_serviceurl", description = "State Storage Service Url\n", required= false) |
| public String stateStorageServiceUrl; |
| |
| @Parameter(names = "--port", description = "Port to listen on\n", required = true) |
| public int port; |
| |
| @Parameter(names = "--metrics_port", description = "Port metrics will be exposed on\n", required = true) |
| public int metrics_port; |
| |
| @Parameter(names = "--max_buffered_tuples", description = "Maximum number of tuples to buffer\n", required = true) |
| public int maxBufferedTuples; |
| |
| @Parameter(names = "--expected_healthcheck_interval", description = "Expected interval in seconds between healtchecks", required = true) |
| public int expectedHealthCheckInterval; |
| |
| @Parameter(names = "--secrets_provider", description = "The classname of the secrets provider", required = false) |
| public String secretsProviderClassName; |
| |
| @Parameter(names = "--secrets_provider_config", description = "The config that needs to be passed to secrets provider", required = false) |
| public String secretsProviderConfig; |
| |
| @Parameter(names = "--cluster_name", description = "The name of the cluster this instance is running on", required = true) |
| public String clusterName; |
| |
| @Parameter(names = "--nar_extraction_directory", description = "The directory where extraction of nar packages happen", required = false) |
| public String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR; |
| |
| @Parameter(names = "--pending_async_requests", description = "Max pending async requests per instance", required = false) |
| public int maxPendingAsyncRequests = 1000; |
| |
| @Parameter(names = "--web_serviceurl", description = "Pulsar Web Service Url", required = false) |
| public String webServiceUrl = null; |
| |
| @Parameter(names = "--expose_pulsaradmin", description = "Whether the pulsar admin client exposed to function context, default is disabled.", required = false) |
| public Boolean exposePulsarAdminClientEnabled = false; |
| |
| private Server server; |
| private RuntimeSpawner runtimeSpawner; |
| private ThreadRuntimeFactory containerFactory; |
| private Long lastHealthCheckTs = null; |
| private HTTPServer metricsServer; |
| private ScheduledFuture healthCheckTimer; |
| |
| public JavaInstanceStarter() { } |
| |
| public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassLoader rootClassLoader) throws Exception { |
| Thread.currentThread().setContextClassLoader(functionInstanceClassLoader); |
| |
| JCommander jcommander = new JCommander(this); |
| // parse args by JCommander |
| jcommander.parse(args); |
| |
| InstanceConfig instanceConfig = new InstanceConfig(); |
| instanceConfig.setFunctionId(functionId); |
| instanceConfig.setFunctionVersion(functionVersion); |
| instanceConfig.setInstanceId(instanceId); |
| instanceConfig.setMaxBufferedTuples(maxBufferedTuples); |
| instanceConfig.setClusterName(clusterName); |
| instanceConfig.setMaxPendingAsyncRequests(maxPendingAsyncRequests); |
| instanceConfig.setExposePulsarAdminClientEnabled(exposePulsarAdminClientEnabled); |
| Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder(); |
| if (functionDetailsJsonString.charAt(0) == '\'') { |
| functionDetailsJsonString = functionDetailsJsonString.substring(1); |
| } |
| if (functionDetailsJsonString.charAt(functionDetailsJsonString.length() - 1) == '\'') { |
| functionDetailsJsonString = functionDetailsJsonString.substring(0, functionDetailsJsonString.length() - 1); |
| } |
| JsonFormat.parser().merge(functionDetailsJsonString, functionDetailsBuilder); |
| Function.FunctionDetails functionDetails = functionDetailsBuilder.build(); |
| instanceConfig.setFunctionDetails(functionDetails); |
| instanceConfig.setPort(port); |
| instanceConfig.setMetricsPort(metrics_port); |
| |
| Map<String, String> secretsProviderConfigMap = null; |
| if (!StringUtils.isEmpty(secretsProviderConfig)) { |
| if (secretsProviderConfig.charAt(0) == '\'') { |
| secretsProviderConfig = secretsProviderConfig.substring(1); |
| } |
| if (secretsProviderConfig.charAt(secretsProviderConfig.length() - 1) == '\'') { |
| secretsProviderConfig = secretsProviderConfig.substring(0, secretsProviderConfig.length() - 1); |
| } |
| Type type = new TypeToken<Map<String, String>>() {}.getType(); |
| secretsProviderConfigMap = new Gson().fromJson(secretsProviderConfig, type); |
| } |
| |
| if (StringUtils.isEmpty(secretsProviderClassName)) { |
| secretsProviderClassName = ClearTextSecretsProvider.class.getName(); |
| } |
| |
| SecretsProvider secretsProvider; |
| try { |
| secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, functionInstanceClassLoader); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| secretsProvider.init(secretsProviderConfigMap); |
| |
| // Collector Registry for prometheus metrics |
| CollectorRegistry collectorRegistry = new CollectorRegistry(); |
| RuntimeUtils.registerDefaultCollectors(collectorRegistry); |
| |
| containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", pulsarServiceUrl, |
| stateStorageServiceUrl, |
| AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthenticationPlugin) |
| .clientAuthenticationParameters(clientAuthenticationParameters).useTls(isTrue(useTls)) |
| .tlsAllowInsecureConnection(isTrue(tlsAllowInsecureConnection)) |
| .tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled)) |
| .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), |
| secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader, |
| exposePulsarAdminClientEnabled, webServiceUrl); |
| runtimeSpawner = new RuntimeSpawner( |
| instanceConfig, |
| jarFile, |
| null, // we really dont use this in thread container |
| containerFactory, |
| expectedHealthCheckInterval * 1000); |
| |
| server = ServerBuilder.forPort(port) |
| .addService(new InstanceControlImpl(runtimeSpawner)) |
| .build() |
| .start(); |
| log.info("JavaInstance Server started, listening on " + port); |
| java.lang.Runtime.getRuntime().addShutdownHook(new Thread() { |
| @Override |
| public void run() { |
| // Use stderr here since the logger may have been reset by its JVM shutdown hook. |
| try { |
| close(); |
| } catch (Exception ex) { |
| System.err.println(ex); |
| } |
| } |
| }); |
| |
| log.info("Starting runtimeSpawner"); |
| runtimeSpawner.start(); |
| |
| // starting metrics server |
| log.info("Starting metrics server on port {}", metrics_port); |
| metricsServer = new HTTPServer(new InetSocketAddress(metrics_port), collectorRegistry, true); |
| |
| if (expectedHealthCheckInterval > 0) { |
| healthCheckTimer = InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(() -> { |
| try { |
| if (System.currentTimeMillis() - lastHealthCheckTs > 3 * expectedHealthCheckInterval * 1000) { |
| log.info("Haven't received health check from spawner in a while. Stopping instance..."); |
| close(); |
| } |
| } catch (Exception e) { |
| log.error("Error occurred when checking for latest health check", e); |
| } |
| }, expectedHealthCheckInterval * 1000, expectedHealthCheckInterval * 1000, TimeUnit.MILLISECONDS); |
| } |
| |
| runtimeSpawner.join(); |
| log.info("RuntimeSpawner quit, shutting down JavaInstance"); |
| close(); |
| } |
| |
| private static boolean isTrue(String param) { |
| return Boolean.TRUE.toString().equals(param); |
| } |
| |
| @Override |
| public void close() { |
| try { |
| // Use stderr here since the logger may have been reset by its JVM shutdown hook. |
| if (server != null) { |
| server.shutdown(); |
| } |
| if (runtimeSpawner != null) { |
| runtimeSpawner.close(); |
| } |
| if (healthCheckTimer != null) { |
| healthCheckTimer.cancel(false); |
| } |
| if (containerFactory != null) { |
| containerFactory.close(); |
| } |
| if (metricsServer != null) { |
| metricsServer.stop(); |
| } |
| |
| InstanceCache.shutdown(); |
| } catch (Exception ex) { |
| System.err.println(ex); |
| } |
| } |
| |
| |
| class InstanceControlImpl extends InstanceControlGrpc.InstanceControlImplBase { |
| private RuntimeSpawner runtimeSpawner; |
| |
| public InstanceControlImpl(RuntimeSpawner runtimeSpawner) { |
| this.runtimeSpawner = runtimeSpawner; |
| lastHealthCheckTs = System.currentTimeMillis(); |
| } |
| |
| @Override |
| public void getFunctionStatus(Empty request, StreamObserver<InstanceCommunication.FunctionStatus> responseObserver) { |
| try { |
| InstanceCommunication.FunctionStatus response = runtimeSpawner.getFunctionStatus(runtimeSpawner.getInstanceConfig().getInstanceId()).get(); |
| responseObserver.onNext(response); |
| responseObserver.onCompleted(); |
| } catch (Exception e) { |
| log.error("Exception in JavaInstance doing getFunctionStatus", e); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public void getAndResetMetrics(com.google.protobuf.Empty request, |
| io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData> responseObserver) { |
| Runtime runtime = runtimeSpawner.getRuntime(); |
| if (runtime != null) { |
| try { |
| InstanceCommunication.MetricsData metrics = runtime.getAndResetMetrics().get(); |
| responseObserver.onNext(metrics); |
| responseObserver.onCompleted(); |
| } catch (InterruptedException | ExecutionException e) { |
| log.error("Exception in JavaInstance doing getAndResetMetrics", e); |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| @Override |
| public void getMetrics(com.google.protobuf.Empty request, |
| io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData> responseObserver) { |
| Runtime runtime = runtimeSpawner.getRuntime(); |
| if (runtime != null) { |
| try { |
| InstanceCommunication.MetricsData metrics = runtime.getMetrics(instanceId).get(); |
| responseObserver.onNext(metrics); |
| responseObserver.onCompleted(); |
| } catch (InterruptedException | ExecutionException e) { |
| log.error("Exception in JavaInstance doing getAndResetMetrics", e); |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| public void resetMetrics(com.google.protobuf.Empty request, |
| io.grpc.stub.StreamObserver<com.google.protobuf.Empty> responseObserver) { |
| Runtime runtime = runtimeSpawner.getRuntime(); |
| if (runtime != null) { |
| try { |
| runtime.resetMetrics().get(); |
| responseObserver.onNext(com.google.protobuf.Empty.getDefaultInstance()); |
| responseObserver.onCompleted(); |
| } catch (InterruptedException | ExecutionException e) { |
| log.error("Exception in JavaInstance doing resetMetrics", e); |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| @Override |
| public void healthCheck(com.google.protobuf.Empty request, |
| io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.HealthCheckResult> responseObserver) { |
| log.debug("Received health check request..."); |
| InstanceCommunication.HealthCheckResult healthCheckResult |
| = InstanceCommunication.HealthCheckResult.newBuilder().setSuccess(true).build(); |
| responseObserver.onNext(healthCheckResult); |
| responseObserver.onCompleted(); |
| |
| lastHealthCheckTs = System.currentTimeMillis(); |
| } |
| } |
| } |