blob: ad6ac5b5a9ea3b7ed7ec42d17744a8e87c558b58 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.scheduler.utils;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.xml.bind.DatatypeConverter;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.utils.TopologyUtils;
import org.apache.heron.common.basics.FileUtils;
import org.apache.heron.proto.scheduler.Scheduler;
import org.apache.heron.proto.system.Common;
import org.apache.heron.scheduler.ExecutorFlag;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.PackingPlanProtoSerializer;
import org.apache.heron.spi.scheduler.IScheduler;
import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor;
import org.apache.heron.spi.utils.ShellUtils;
/**
 * Static helpers shared by Heron schedulers: building the command lines that launch
 * heron-scheduler and heron-executor, publishing the scheduler location and packing plan to the
 * state manager, encoding/decoding JVM options, and preparing working directories and packages.
 */
public final class SchedulerUtils {
  public static final int PORTS_REQUIRED_FOR_SCHEDULER = 1;
  public static final String SCHEDULER_COMMAND_LINE_PROPERTIES_OVERRIDE_OPTION = "P";

  private static final Logger LOG = Logger.getLogger(SchedulerUtils.class.getName());

  /**
   * Enum that defines the types of ports that a heron executor needs. Each constant carries a
   * display name and whether the port must be supplied.
   */
  public enum ExecutorPort {
    SERVER_PORT("server", true),
    TMANAGER_CONTROLLER_PORT("tmanager-ctl", true),
    TMANAGER_STATS_PORT("tmanager-stats", true),
    SHELL_PORT("shell-port", true),
    METRICS_MANAGER_PORT("metrics-mgr", true),
    SCHEDULER_PORT("scheduler", true),
    METRICS_CACHE_SERVER_PORT("metrics-cache-m", true),
    METRICS_CACHE_STATS_PORT("metrics-cache-s", true),
    CHECKPOINT_MANAGER_PORT("ckptmgr", true),
    JVM_REMOTE_DEBUGGER_PORTS("jvm-remote-debugger", false);

    private final String name;
    private final boolean required;

    ExecutorPort(String name, boolean required) {
      this.name = name;
      this.required = required;
    }

    public String getName() {
      return name;
    }

    public boolean isRequired() {
      return required;
    }

    /**
     * Looks up the port assigned to {@code executorPort} in {@code portMap}.
     *
     * @param executorPort the port type to look up
     * @param portMap the ports assigned to an executor, keyed by port type
     * @return the assigned port, or {@code null} when an optional port has no entry
     * @throws RuntimeException if {@code executorPort} is required but has no entry
     */
    public static String getPort(ExecutorPort executorPort,
                                 Map<ExecutorPort, String> portMap) {
      if (!portMap.containsKey(executorPort) && executorPort.isRequired()) {
        throw new RuntimeException("Required port " + executorPort.getName() + " not provided");
      }
      return portMap.get(executorPort);
    }

    /**
     * Collects every port type flagged as required.
     *
     * @return a new, mutable set holding the required port types
     */
    public static Set<ExecutorPort> getRequiredPorts() {
      Set<ExecutorPort> executorPorts = new HashSet<>();
      for (ExecutorPort executorPort : ExecutorPort.values()) {
        if (executorPort.isRequired()) {
          executorPorts.add(executorPort);
        }
      }
      return executorPorts;
    }
  }

  private SchedulerUtils() {
  }

  /**
   * Utils method to construct the command to start heron-scheduler.
   *
   * @param config The static Config
   * @param runtime The runtime Config
   * @param freePorts list of free ports
   * @return String[] representing the command to start heron-scheduler
   */
  public static String[] schedulerCommand(
      Config config,
      Config runtime,
      List<Integer> freePorts) {
    List<String> commands = new ArrayList<>();

    // The java executable should be "{JAVA_HOME}/bin/java"
    String javaExecutable = String.format("%s/%s", Context.clusterJavaHome(config), "bin/java");
    commands.add(javaExecutable);
    commands.add("-cp");
    commands.add(completeSchedulerClassPath(config));
    commands.add("org.apache.heron.scheduler.SchedulerMain");

    commands.addAll(Arrays.asList(schedulerCommandArgs(config, runtime, freePorts)));
    return commands.toArray(new String[0]);
  }

  /**
   * Builds the classpath of the scheduler process: the scheduler, packing and state manager
   * classpaths joined with ':'. Shared by the scheduler command and the executor arguments so
   * both always agree.
   */
  private static String completeSchedulerClassPath(Config config) {
    return String.format("%s:%s:%s",
        Context.schedulerClassPath(config),
        Context.packingClassPath(config),
        Context.stateManagerClassPath(config));
  }

  /**
   * Util method to get the arguments to the heron scheduler.
   *
   * @param config The static Config
   * @param runtime The runtime Config (currently unused)
   * @param freePorts list of free ports; the first one becomes the scheduler's http port
   * @return String[] representing the arguments to start heron-scheduler
   * @throws RuntimeException if fewer than {@link #PORTS_REQUIRED_FOR_SCHEDULER} ports are given
   *     or any port is -1
   */
  public static String[] schedulerCommandArgs(
      Config config, Config runtime, List<Integer> freePorts) {
    // First let us have some safe checks
    if (freePorts.size() < PORTS_REQUIRED_FOR_SCHEDULER) {
      throw new RuntimeException("Failed to find enough ports for scheduler");
    }
    for (int port : freePorts) {
      // -1 marks a port that could not be allocated
      if (port == -1) {
        throw new RuntimeException("Failed to find available ports for scheduler");
      }
    }

    int httpPort = freePorts.get(0);

    List<String> commands = new ArrayList<>();
    commands.add("--cluster");
    commands.add(Context.cluster(config));
    commands.add("--role");
    commands.add(Context.role(config));
    commands.add("--environment");
    commands.add(Context.environ(config));
    commands.add("--topology_name");
    commands.add(Context.topologyName(config));
    commands.add("--topology_bin");
    commands.add(Context.topologyBinaryFile(config));
    commands.add("--http_port");
    commands.add(Integer.toString(httpPort));

    return commands.toArray(new String[0]);
  }

  /**
   * Utils method to construct the command to start heron-executor.
   *
   * @param config The static config
   * @param runtime The runtime config
   * @param shardId the executor/container index
   * @param ports a map of ports to use where the key indicates the port type and the
   * value is the port
   * @return String[] representing the command to start heron-executor
   */
  public static String[] getExecutorCommand(
      Config config,
      Config runtime,
      int shardId,
      Map<ExecutorPort, String> ports) {
    return getExecutorCommand(config, runtime, Integer.toString(shardId), ports);
  }

  /**
   * Utils method to construct the command to start heron-executor: the executor binary followed
   * by the arguments from {@link #executorCommandArgs}.
   *
   * @param config The static config
   * @param runtime The runtime config
   * @param shardId the executor/container index
   * @param ports a map of ports to use where the key indicates the port type and the
   * value is the port
   * @return String[] representing the command to start heron-executor
   */
  public static String[] getExecutorCommand(
      Config config,
      Config runtime,
      String shardId,
      Map<ExecutorPort, String> ports) {
    List<String> commands = new ArrayList<>();
    commands.add(Context.executorBinary(config));
    commands.addAll(Arrays.asList(executorCommandArgs(config, runtime, ports, shardId)));
    return commands.toArray(new String[0]);
  }

  /**
   * Util method to get the arguments to the heron executor: first the topology-level arguments,
   * then the container-level ones. When {@code containerIndex} is non-null it is passed as the
   * shard flag; when it is null no shard flag is emitted.
   *
   * @param config The static Config
   * @param runtime The runtime Config
   * @param ports a map of ports to use where the key indicates the port type and the
   * value is the port
   * @param containerIndex The index of the current container, or null to omit it
   * @return String[] representing the arguments to start heron-executor
   */
  public static String[] executorCommandArgs(
      Config config, Config runtime, Map<ExecutorPort, String> ports, String containerIndex) {
    List<String> args = new ArrayList<>();
    addExecutorTopologyArgs(args, config, runtime);
    addExecutorContainerArgs(args, ports, containerIndex);
    return args.toArray(new String[0]);
  }

  /**
   * Util method to parse configs and translate them into topology configs to be used by executor.
   * Arguments are appended to {@code args} in a fixed order.
   *
   * @param args The list to accept new topology arguments
   * @param config The static Config
   * @param runtime The runtime Config
   */
  public static void addExecutorTopologyArgs(List<String> args, Config config, Config runtime) {
    TopologyAPI.Topology topology = Runtime.topology(runtime);

    args.add(createCommandArg(ExecutorFlag.TopologyName, topology.getName()));
    args.add(createCommandArg(ExecutorFlag.TopologyId, topology.getId()));
    args.add(createCommandArg(ExecutorFlag.TopologyDefinitionFile,
        FileUtils.getBaseName(Context.topologyDefinitionFile(config))));
    args.add(createCommandArg(ExecutorFlag.StateManagerConnection,
        Context.stateManagerConnectionString(config)));
    args.add(createCommandArg(ExecutorFlag.StateManagerRoot,
        Context.stateManagerRootPath(config)));
    args.add(createCommandArg(ExecutorFlag.StateManagerConfigFile,
        Context.stateManagerFile(config)));
    args.add(createCommandArg(ExecutorFlag.TManagerBinary, Context.tmanagerBinary(config)));
    args.add(createCommandArg(ExecutorFlag.StmgrBinary, Context.stmgrBinary(config)));
    args.add(createCommandArg(ExecutorFlag.MetricsManagerClasspath,
        Context.metricsManagerClassPath(config)));
    // JVM options are base64-encoded so they survive being passed as a single flag value
    args.add(createCommandArg(ExecutorFlag.InstanceJvmOpts,
        SchedulerUtils.encodeJavaOpts(TopologyUtils.getInstanceJvmOptions(topology))));
    args.add(createCommandArg(ExecutorFlag.Classpath,
        TopologyUtils.makeClassPath(topology, Context.topologyBinaryFile(config))));
    args.add(createCommandArg(ExecutorFlag.HeronInternalsConfigFile,
        Context.systemConfigFile(config)));
    args.add(createCommandArg(ExecutorFlag.OverrideConfigFile, Context.overrideFile(config)));
    args.add(createCommandArg(ExecutorFlag.ComponentRamMap, Runtime.componentRamMap(runtime)));
    args.add(createCommandArg(ExecutorFlag.ComponentJvmOpts,
        SchedulerUtils.encodeJavaOpts(TopologyUtils.getComponentJvmOptions(topology))));
    // Locale.ROOT keeps the lower-cased flag value independent of the JVM's default locale
    args.add(createCommandArg(ExecutorFlag.PkgType,
        Context.topologyPackageType(config).name().toLowerCase(Locale.ROOT)));
    args.add(createCommandArg(ExecutorFlag.TopologyBinaryFile,
        Context.topologyBinaryFile(config)));
    args.add(createCommandArg(ExecutorFlag.HeronJavaHome, Context.clusterJavaHome(config)));

    // Verbose GC logging is a bare flag with no value
    if (Context.verboseGC(config)) {
      args.add(ExecutorFlag.EnableVerboseGCLog.getFlag());
    }

    args.add(createCommandArg(ExecutorFlag.HeronShellBinary, Context.shellBinary(config)));
    args.add(createCommandArg(ExecutorFlag.Cluster, Context.cluster(config)));
    args.add(createCommandArg(ExecutorFlag.Role, Context.role(config)));
    args.add(createCommandArg(ExecutorFlag.Environment, Context.environ(config)));
    args.add(createCommandArg(ExecutorFlag.InstanceClasspath, Context.instanceClassPath(config)));
    args.add(createCommandArg(ExecutorFlag.MetricsSinksConfigFile,
        Context.metricsSinksFile(config)));
    args.add(createCommandArg(ExecutorFlag.SchedulerClasspath,
        completeSchedulerClassPath(config)));
    args.add(createCommandArg(ExecutorFlag.PythonInstanceBinary,
        Context.pythonInstanceBinary(config)));
    args.add(createCommandArg(ExecutorFlag.CppInstanceBinary, Context.cppInstanceBinary(config)));
    args.add(createCommandArg(ExecutorFlag.MetricsCacheManagerClasspath,
        Context.metricsCacheManagerClassPath(config)));
    args.add(createCommandArg(ExecutorFlag.MetricsCacheManagerMode,
        modeOrDisabled(Context.metricscacheMgrMode(config))));

    boolean ckptMgrEnabled = TopologyUtils.shouldStartCkptMgr(topology);
    args.add(createCommandArg(ExecutorFlag.IsStateful, Boolean.toString(ckptMgrEnabled)));
    String completeCkptmgrProcessClassPath = String.format("%s:%s:%s",
        Context.ckptmgrClassPath(config),
        Context.statefulStoragesClassPath(config),
        Context.statefulStorageCustomClassPath(config));
    args.add(createCommandArg(ExecutorFlag.CheckpointManagerClasspath,
        completeCkptmgrProcessClassPath));
    args.add(createCommandArg(ExecutorFlag.StatefulConfigFile, Context.statefulConfigFile(config)));
    args.add(createCommandArg(
        ExecutorFlag.CheckpointManagerRam,
        Long.toString(TopologyUtils.getCheckpointManagerRam(topology).asBytes())));

    args.add(createCommandArg(ExecutorFlag.HealthManagerMode,
        modeOrDisabled(Context.healthMgrMode(config))));
    args.add(createCommandArg(ExecutorFlag.HealthManagerClasspath,
        Context.healthMgrClassPath(config)));
  }

  /** Returns {@code mode} unchanged, or {@code "disabled"} when it is null. */
  private static String modeOrDisabled(String mode) {
    return mode == null ? "disabled" : mode;
  }

  /**
   * Util method to parse port map and container id and translate them into arguments to be used
   * by executor. All ports are looked up (and required ones validated) before anything is
   * appended to {@code args}.
   *
   * @param args The list to accept new container arguments
   * @param ports a map of ports to use where the key indicates the port type and the
   * value is the port
   * @param containerIndex The index of the current container, or null to omit the shard flag
   * @throws RuntimeException if a required port is missing from {@code ports}
   */
  public static void addExecutorContainerArgs(
      List<String> args,
      Map<ExecutorPort, String> ports,
      String containerIndex) {
    String serverPort = ExecutorPort.getPort(ExecutorPort.SERVER_PORT, ports);
    String tmanagerControllerPort = ExecutorPort.getPort(
        ExecutorPort.TMANAGER_CONTROLLER_PORT, ports);
    String tmanagerStatsPort = ExecutorPort.getPort(ExecutorPort.TMANAGER_STATS_PORT, ports);
    String shellPort = ExecutorPort.getPort(ExecutorPort.SHELL_PORT, ports);
    String metricsmgrPort = ExecutorPort.getPort(ExecutorPort.METRICS_MANAGER_PORT, ports);
    String schedulerPort = ExecutorPort.getPort(ExecutorPort.SCHEDULER_PORT, ports);
    String metricsCacheServerPort = ExecutorPort.getPort(
        ExecutorPort.METRICS_CACHE_SERVER_PORT, ports);
    String metricsCacheStatsPort = ExecutorPort.getPort(
        ExecutorPort.METRICS_CACHE_STATS_PORT, ports);
    String ckptmgrPort = ExecutorPort.getPort(ExecutorPort.CHECKPOINT_MANAGER_PORT, ports);
    // Optional: null when no remote debugger ports were requested
    String remoteDebuggerPorts = ExecutorPort.getPort(
        ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS, ports);

    if (containerIndex != null) {
      args.add(createCommandArg(ExecutorFlag.Shard, containerIndex));
    }
    args.add(createCommandArg(ExecutorFlag.ServerPort, serverPort));
    args.add(createCommandArg(ExecutorFlag.TManagerControllerPort, tmanagerControllerPort));
    args.add(createCommandArg(ExecutorFlag.TManagerStatsPort, tmanagerStatsPort));
    args.add(createCommandArg(ExecutorFlag.ShellPort, shellPort));
    args.add(createCommandArg(ExecutorFlag.MetricsManagerPort, metricsmgrPort));
    args.add(createCommandArg(ExecutorFlag.SchedulerPort, schedulerPort));
    args.add(createCommandArg(ExecutorFlag.MetricsCacheManagerServerPort, metricsCacheServerPort));
    args.add(createCommandArg(ExecutorFlag.MetricsCacheManagerStatsPort, metricsCacheStatsPort));
    args.add(createCommandArg(ExecutorFlag.CheckpointManagerPort, ckptmgrPort));
    if (remoteDebuggerPorts != null) {
      args.add(createCommandArg(ExecutorFlag.JvmRemoteDebuggerPorts, remoteDebuggerPorts));
    }
  }

  /**
   * Formats a single executor argument as {@code flag=value}.
   *
   * @param flag the executor flag
   * @param value the flag's value
   * @return the flag string and the value joined by '='
   */
  public static String createCommandArg(ExecutorFlag flag, String value) {
    return String.format("%s=%s", flag.getFlag(), value);
  }

  /**
   * Splits a {@code flag=value} argument, as produced by
   * {@link #createCommandArg(ExecutorFlag, String)}, into its flag and value. Only the first '='
   * separates the two, so a value that itself contains '=' is returned intact.
   *
   * @param command the argument to split
   * @return {@code [command]} when it contains no '='; otherwise {@code [flag, value]}, where
   *     {@code value} may be empty
   */
  public static String[] splitCommandArg(String command) {
    return command.split("=", 2);
  }

  /**
   * Set the location of scheduler for other processes to discover, when IScheduler is invoked
   * as a library on the client side. Since no scheduler service is running in that mode, a
   * placeholder endpoint is published.
   *
   * @param runtime the runtime configuration
   * @param scheduler the IScheduler to provide more info
   * @param isService true if the scheduler is a service; false otherwise (currently unused)
   * @return true if the location was stored in the state manager; false otherwise
   */
  public static boolean setLibSchedulerLocation(
      Config runtime,
      IScheduler scheduler,
      boolean isService) {
    // Dummy value since there is no scheduler running as service
    final String endpoint = "scheduler_as_lib_no_endpoint";
    return setSchedulerLocation(runtime, endpoint, scheduler);
  }

  /**
   * Set the location of scheduler for other processes to discover.
   *
   * @param runtime the runtime configuration
   * @param schedulerEndpoint the endpoint on which the scheduler listens for requests
   * @param scheduler the IScheduler to provide more info, such as job links
   * @return true if the location was stored in the state manager; false otherwise
   */
  public static boolean setSchedulerLocation(
      Config runtime,
      String schedulerEndpoint,
      IScheduler scheduler) {
    String topologyName = Runtime.topologyName(runtime);

    // Set scheduler location to host:port by default. Overwrite scheduler location if behind DNS.
    Scheduler.SchedulerLocation.Builder builder = Scheduler.SchedulerLocation.newBuilder()
        .setTopologyName(topologyName)
        .setHttpEndpoint(schedulerEndpoint);

    // Set the job link in SchedulerLocation if any
    List<String> jobLinks = scheduler.getJobLinks();
    // Check whether IScheduler provides valid job link
    if (jobLinks != null) {
      builder.addAllJobPageLink(jobLinks);
    }

    Scheduler.SchedulerLocation location = builder.build();

    LOG.log(Level.INFO, "Setting Scheduler locations: {0}", location);
    SchedulerStateManagerAdaptor statemgr = Runtime.schedulerStateManagerAdaptor(runtime);
    Boolean result = statemgr.setSchedulerLocation(location, topologyName);

    // A null result is treated as failure rather than unboxed
    if (result == null || !result) {
      LOG.severe("Failed to set Scheduler location");
      return false;
    }
    return true;
  }

  /**
   * Construct heron scheduler response based on the given result.
   *
   * @param isOK whether the request was successful
   * @return a response whose status is OK when {@code isOK} is true and NOTOK otherwise
   */
  public static Scheduler.SchedulerResponse constructSchedulerResponse(boolean isOK) {
    Common.StatusCode code = isOK ? Common.StatusCode.OK : Common.StatusCode.NOTOK;
    Common.Status.Builder status = Common.Status.newBuilder().setStatus(code);
    return Scheduler.SchedulerResponse.newBuilder()
        .setStatus(status)
        .build();
  }

  /**
   * Encode the JVM options
   * <br> 1. Convert the UTF-8 bytes into Base64 format
   * <br> 2. Replace every "=" (Base64 padding) with "(61)"
   * <br> 3. Add \" at the start and at the end
   * '=' can be parsed in a wrong way by some schedulers (aurora) hence it needs to be escaped.
   * {@link #decodeJavaOpts(String)} reverses this encoding.
   *
   * @param javaOpts the JVM options to encode
   * @return encoded string
   */
  public static String encodeJavaOpts(String javaOpts) {
    String javaOptsBase64 = DatatypeConverter.printBase64Binary(
        javaOpts.getBytes(StandardCharsets.UTF_8));
    return String.format("\"%s\"", javaOptsBase64.replace("=", "(61)"));
  }

  /**
   * Decode the JVM options
   * <br> 1. Strip surrounding whitespace and \" at the start and at the end
   * <br> 2. Replace "(61)" and "&amp;equals;" with "="
   * <br> 3. Revert from Base64 format and decode the bytes as UTF-8
   * Note that '=' is escaped in two different ways. '(61)' is the new escaping.
   * '&amp;equals;' was the original replacement but it is not friendly to bash and
   * was causing issues. The original escaping is still left there for reference
   * and backward compatibility purposes (to be removed after no topology needs
   * it)
   *
   * @param encodedJavaOpts options produced by {@link #encodeJavaOpts(String)}
   * @return decoded string
   */
  public static String decodeJavaOpts(String encodedJavaOpts) {
    String javaOptsBase64 = encodedJavaOpts
        .trim()
        // Remove the wrapping quotes on both ends, as added by encodeJavaOpts
        .replaceAll("^\"+|\"+$", "")
        .replace("(61)", "=")
        .replace("&equals;", "=");

    return new String(
        DatatypeConverter.parseBase64Binary(javaOptsBase64), StandardCharsets.UTF_8);
  }

  /**
   * Create the directory if it does not exist otherwise clean the directory.
   *
   * @param directory the working directory to setup
   * @return true if successful
   */
  public static boolean createOrCleanDirectory(String directory) {
    // if the directory does not exist, create it.
    if (!FileUtils.isDirectoryExists(directory)) {
      LOG.fine("The directory does not exist; creating it.");
      if (!FileUtils.createDirectory(directory)) {
        LOG.severe("Failed to create directory: " + directory);
        return false;
      }
    }

    // Cleanup the directory
    if (!FileUtils.cleanDir(directory)) {
      LOG.severe("Failed to clean directory: " + directory);
      return false;
    }

    return true;
  }

  /**
   * Fetch a package and extract it into the working directory. Delegates to
   * {@link #curlAndExtractPackage(String, String, String, boolean, boolean)}.
   *
   * @param workingDirectory the directory to extract into
   * @param packageURI the URL of the package
   * @param packageDestination the local path the package is fetched to
   * @param deletePackage whether to delete the fetched package afterwards
   * @param verbose whether to display verbose output
   * @return true if successful
   */
  public static boolean extractPackage(String workingDirectory, String packageURI,
      String packageDestination, boolean deletePackage, boolean verbose) {
    return curlAndExtractPackage(workingDirectory, packageURI, packageDestination,
        deletePackage, verbose);
  }

  /**
   * Curl a package, extract it to working directory.
   *
   * @param workingDirectory the working directory to setup
   * @param packageURI the URL of core release package
   * @param packageDestination the destination of the core release package fetched
   * @param isDeletePackage delete the package curled or not
   * @param isVerbose display verbose output or not
   * @return true if fetching and extraction succeeded; a failed delete only logs a warning
   */
  public static boolean curlAndExtractPackage(
      String workingDirectory,
      String packageURI,
      String packageDestination,
      boolean isDeletePackage,
      boolean isVerbose) {
    // curl the package to the working directory and extract it
    LOG.log(Level.FINE, "Fetching package {0}", packageURI);
    LOG.fine("Fetched package can overwrite old one.");
    if (!ShellUtils.curlPackage(
        packageURI, packageDestination, isVerbose, false)) {
      LOG.severe("Failed to fetch package.");
      return false;
    }

    // untar the heron core release package in the working directory
    LOG.log(Level.FINE, "Extracting the package {0}", packageURI);
    if (!ShellUtils.extractPackage(
        packageDestination, workingDirectory, isVerbose, false)) {
      LOG.severe("Failed to extract package.");
      return false;
    }

    // remove the core release package
    if (isDeletePackage && !FileUtils.deleteFile(packageDestination)) {
      LOG.warning("Failed to delete the package: " + packageDestination);
    }

    return true;
  }

  /**
   * Replaces persisted packing plan in state manager.
   *
   * @param topologyName the topology whose packing plan is replaced
   * @param updatedPackingPlan the new packing plan
   * @param stateManager the state manager to write to
   * @throws RuntimeException if the state manager does not report success
   */
  public static void persistUpdatedPackingPlan(String topologyName,
                                               PackingPlan updatedPackingPlan,
                                               SchedulerStateManagerAdaptor stateManager) {
    LOG.log(Level.INFO, "Updating scheduled-resource in packing plan: {0}", topologyName);
    PackingPlanProtoSerializer serializer = new PackingPlanProtoSerializer();

    // Guard against a null result instead of letting unboxing throw a NullPointerException,
    // matching the check in setSchedulerLocation
    Boolean updated =
        stateManager.updatePackingPlan(serializer.toProto(updatedPackingPlan), topologyName);
    if (updated == null || !updated) {
      throw new RuntimeException(String.format(
          "Failed to update packing plan for topology %s", topologyName));
    }
  }
}