| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.indexing.overlord; |
| |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Optional; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Sets; |
| import com.google.common.io.ByteSink; |
| import com.google.common.io.ByteStreams; |
| import com.google.common.io.FileWriteMode; |
| import com.google.common.io.Files; |
| import com.google.common.math.IntMath; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.ListeningExecutorService; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.inject.Inject; |
| import org.apache.druid.guice.annotations.Self; |
| import org.apache.druid.indexer.RunnerTaskState; |
| import org.apache.druid.indexer.TaskLocation; |
| import org.apache.druid.indexer.TaskStatus; |
| import org.apache.druid.indexing.common.TaskStorageDirTracker; |
| import org.apache.druid.indexing.common.config.TaskConfig; |
| import org.apache.druid.indexing.common.task.Task; |
| import org.apache.druid.indexing.common.tasklogs.ConsoleLoggingEnforcementConfigurationFactory; |
| import org.apache.druid.indexing.common.tasklogs.LogUtils; |
| import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; |
| import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig; |
| import org.apache.druid.indexing.worker.config.WorkerConfig; |
| import org.apache.druid.java.util.common.DateTimes; |
| import org.apache.druid.java.util.common.FileUtils; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.java.util.common.concurrent.Execs; |
| import org.apache.druid.java.util.common.io.Closer; |
| import org.apache.druid.java.util.common.lifecycle.LifecycleStart; |
| import org.apache.druid.java.util.common.lifecycle.LifecycleStop; |
| import org.apache.druid.java.util.emitter.EmittingLogger; |
| import org.apache.druid.query.DruidMetrics; |
| import org.apache.druid.server.DruidNode; |
| import org.apache.druid.server.log.StartupLoggingConfig; |
| import org.apache.druid.server.metrics.MonitorsConfig; |
| import org.apache.druid.server.metrics.WorkerTaskCountStatsProvider; |
| import org.apache.druid.tasklogs.TaskLogPusher; |
| import org.apache.druid.tasklogs.TaskLogStreamer; |
| import org.apache.druid.utils.JvmUtils; |
| import org.joda.time.DateTime; |
| import org.joda.time.Interval; |
| |
| import javax.annotation.Nullable; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.math.RoundingMode; |
| import java.nio.file.Paths; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| /** |
| * Runs tasks in separate processes using the "internal peon" verb. |
| */ |
| public class ForkingTaskRunner |
| extends BaseRestorableTaskRunner<ForkingTaskRunner.ForkingTaskRunnerWorkItem> |
| implements TaskLogStreamer, WorkerTaskCountStatsProvider |
| { |
| private static final EmittingLogger LOGGER = new EmittingLogger(ForkingTaskRunner.class); |
| private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property."; |
| |
| /** |
| * Properties to add on Java 11+. When updating this list, update all four: |
| * 1) ForkingTaskRunner#STRONG_ENCAPSULATION_PROPERTIES (here) --> |
| * 2) docs/operations/java.md, "Strong encapsulation" section --> |
| * 3) pom.xml, jdk.strong.encapsulation.argLine --> |
| * 4) examples/bin/run-java script |
| */ |
| private static final List<String> STRONG_ENCAPSULATION_PROPERTIES = ImmutableList.of( |
| "--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED", |
| "--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED", |
| "--add-opens=java.base/java.nio=ALL-UNNAMED", |
| "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", |
| "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED", |
| "--add-opens=java.base/java.io=ALL-UNNAMED", |
| "--add-opens=java.base/java.lang=ALL-UNNAMED", |
| "--add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED" |
| ); |
| |
| private final ForkingTaskRunnerConfig config; |
| private final Properties props; |
| private final TaskLogPusher taskLogPusher; |
| private final DruidNode node; |
| private final ListeningExecutorService exec; |
| private final PortFinder portFinder; |
| private final StartupLoggingConfig startupLoggingConfig; |
| private final WorkerConfig workerConfig; |
| |
| private volatile int numProcessorsPerTask = -1; |
| private volatile boolean stopping = false; |
| |
| private final AtomicLong lastReportedFailedTaskCount = new AtomicLong(); |
| private final AtomicLong failedTaskCount = new AtomicLong(); |
| private final AtomicLong successfulTaskCount = new AtomicLong(); |
| private final AtomicLong lastReportedSuccessfulTaskCount = new AtomicLong(); |
| |
| @Inject |
| public ForkingTaskRunner( |
| ForkingTaskRunnerConfig config, |
| TaskConfig taskConfig, |
| WorkerConfig workerConfig, |
| Properties props, |
| TaskLogPusher taskLogPusher, |
| ObjectMapper jsonMapper, |
| @Self DruidNode node, |
| StartupLoggingConfig startupLoggingConfig, |
| TaskStorageDirTracker dirTracker |
| ) |
| { |
| super(jsonMapper, taskConfig, dirTracker); |
| this.config = config; |
| this.props = props; |
| this.taskLogPusher = taskLogPusher; |
| this.node = node; |
| this.portFinder = new PortFinder(config.getStartPort(), config.getEndPort(), config.getPorts()); |
| this.startupLoggingConfig = startupLoggingConfig; |
| this.workerConfig = workerConfig; |
| this.exec = MoreExecutors.listeningDecorator( |
| Execs.multiThreaded(workerConfig.getCapacity(), "forking-task-runner-%d") |
| ); |
| } |
| |
| @Override |
| public ListenableFuture<TaskStatus> run(final Task task) |
| { |
| synchronized (tasks) { |
| tasks.computeIfAbsent( |
| task.getId(), k -> |
| new ForkingTaskRunnerWorkItem( |
| task, |
| exec.submit( |
| new Callable<TaskStatus>() { |
| @Override |
| public TaskStatus call() |
| { |
| final TaskStorageDirTracker.StorageSlot storageSlot; |
| try { |
| storageSlot = getTracker().pickStorageSlot(task.getId()); |
| } |
| catch (RuntimeException e) { |
| LOG.warn(e, "Failed to get storage slot for task [%s], cannot schedule.", task.getId()); |
| return TaskStatus.failure( |
| task.getId(), |
| StringUtils.format("Failed to get storage slot due to error [%s]", e.getMessage()) |
| ); |
| } |
| |
| final File taskDir = new File(storageSlot.getDirectory(), task.getId()); |
| final String attemptId = String.valueOf(getNextAttemptID(taskDir)); |
| final File attemptDir = Paths.get(taskDir.getAbsolutePath(), "attempt", attemptId).toFile(); |
| |
| final ProcessHolder processHolder; |
| final String childHost = node.getHost(); |
| int childPort = -1; |
| int tlsChildPort = -1; |
| |
| if (node.isEnablePlaintextPort()) { |
| childPort = portFinder.findUnusedPort(); |
| } |
| |
| if (node.isEnableTlsPort()) { |
| tlsChildPort = portFinder.findUnusedPort(); |
| } |
| |
| final TaskLocation taskLocation = TaskLocation.create(childHost, childPort, tlsChildPort); |
| |
| try { |
| final Closer closer = Closer.create(); |
| try { |
| final File taskFile = new File(taskDir, "task.json"); |
| final File statusFile = new File(attemptDir, "status.json"); |
| final File logFile = new File(taskDir, "log"); |
| final File reportsFile = new File(attemptDir, "report.json"); |
| |
| // time to adjust process holders |
| synchronized (tasks) { |
| final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId()); |
| |
| if (taskWorkItem == null) { |
| LOGGER.makeAlert("TaskInfo disappeared!").addData("task", task.getId()).emit(); |
| throw new ISE("TaskInfo disappeared for task[%s]!", task.getId()); |
| } |
| |
| if (taskWorkItem.shutdown) { |
| throw new IllegalStateException("Task has been shut down!"); |
| } |
| |
| if (taskWorkItem.processHolder != null) { |
| LOGGER.makeAlert("TaskInfo already has a processHolder") |
| .addData("task", task.getId()) |
| .emit(); |
| throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId()); |
| } |
| |
| final CommandListBuilder command = new CommandListBuilder(); |
| final String taskClasspath; |
| if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) { |
| taskClasspath = Joiner.on(File.pathSeparator).join( |
| task.getClasspathPrefix(), |
| config.getClasspath() |
| ); |
| } else { |
| taskClasspath = config.getClasspath(); |
| } |
| |
| command.add(config.getJavaCommand()); |
| |
| if (JvmUtils.majorVersion() >= 11) { |
| command.addAll(STRONG_ENCAPSULATION_PROPERTIES); |
| } |
| |
| command.add("-cp"); |
| command.add(taskClasspath); |
| |
| if (numProcessorsPerTask < 1) { |
| // numProcessorsPerTask is set by start() |
| throw new ISE("Not started"); |
| } |
| |
| command.add(StringUtils.format("-XX:ActiveProcessorCount=%d", numProcessorsPerTask)); |
| |
| command.addAll(new QuotableWhiteSpaceSplitter(config.getJavaOpts())); |
| command.addAll(config.getJavaOptsArray()); |
| |
| // Override task specific javaOpts |
| Object taskJavaOpts = task.getContextValue( |
| ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY |
| ); |
| if (taskJavaOpts != null) { |
| command.addAll(new QuotableWhiteSpaceSplitter((String) taskJavaOpts)); |
| } |
| |
| // Override task specific javaOptsArray |
| try { |
| List<String> taskJavaOptsArray = jsonMapper.convertValue( |
| task.getContextValue(ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY), |
| new TypeReference<List<String>>() {} |
| ); |
| if (taskJavaOptsArray != null) { |
| command.addAll(taskJavaOptsArray); |
| } |
| } |
| catch (Exception e) { |
| throw new IllegalArgumentException( |
| ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY |
| + " in context of task: " + task.getId() + " must be an array of strings.", |
| e |
| ); |
| } |
| |
| for (String propName : props.stringPropertyNames()) { |
| for (String allowedPrefix : config.getAllowedPrefixes()) { |
| // See https://github.com/apache/druid/issues/1841 |
| if (propName.startsWith(allowedPrefix) |
| && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName) |
| && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName) |
| ) { |
| command.addSystemProperty(propName, props.getProperty(propName)); |
| } |
| } |
| } |
| |
| // Override child JVM specific properties |
| for (String propName : props.stringPropertyNames()) { |
| if (propName.startsWith(CHILD_PROPERTY_PREFIX)) { |
| command.addSystemProperty( |
| propName.substring(CHILD_PROPERTY_PREFIX.length()), |
| props.getProperty(propName) |
| ); |
| } |
| } |
| |
| // Override task specific properties |
| final Map<String, Object> context = task.getContext(); |
| if (context != null) { |
| for (String propName : context.keySet()) { |
| if (propName.startsWith(CHILD_PROPERTY_PREFIX)) { |
| command.addSystemProperty( |
| propName.substring(CHILD_PROPERTY_PREFIX.length()), |
| task.getContextValue(propName) |
| ); |
| } |
| } |
| } |
| |
| // add the attemptId as a system property |
| command.addSystemProperty("attemptId", "1"); |
| |
| // Add dataSource, taskId and taskType for metrics or logging |
| command.addSystemProperty( |
| MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.DATASOURCE, |
| task.getDataSource() |
| ); |
| command.addSystemProperty( |
| MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.TASK_ID, |
| task.getId() |
| ); |
| command.addSystemProperty( |
| MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.TASK_TYPE, |
| task.getType() |
| ); |
| command.addSystemProperty( |
| MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.GROUP_ID, |
| task.getGroupId() |
| ); |
| |
| |
| command.addSystemProperty("druid.host", childHost); |
| command.addSystemProperty("druid.plaintextPort", childPort); |
| command.addSystemProperty("druid.tlsPort", tlsChildPort); |
| |
| // Let tasks know where they are running on. |
| // This information is used in native parallel indexing with shuffle. |
| command.addSystemProperty("druid.task.executor.service", node.getServiceName()); |
| command.addSystemProperty("druid.task.executor.host", node.getHost()); |
| command.addSystemProperty("druid.task.executor.plaintextPort", node.getPlaintextPort()); |
| command.addSystemProperty("druid.task.executor.enablePlaintextPort", node.isEnablePlaintextPort()); |
| command.addSystemProperty("druid.task.executor.tlsPort", node.getTlsPort()); |
| command.addSystemProperty("druid.task.executor.enableTlsPort", node.isEnableTlsPort()); |
| command.addSystemProperty("log4j2.configurationFactory", ConsoleLoggingEnforcementConfigurationFactory.class.getName()); |
| |
| command.addSystemProperty("druid.indexer.task.baseTaskDir", storageSlot.getDirectory().getAbsolutePath()); |
| command.addSystemProperty("druid.indexer.task.tmpStorageBytesPerTask", storageSlot.getNumBytes()); |
| |
| command.add("org.apache.druid.cli.Main"); |
| command.add("internal"); |
| command.add("peon"); |
| command.add(taskDir.toString()); |
| command.add(attemptId); |
| String nodeType = task.getNodeType(); |
| if (nodeType != null) { |
| command.add("--nodeType"); |
| command.add(nodeType); |
| } |
| |
| // If the task type is queryable, we need to load broadcast segments on the peon, used for |
| // join queries |
| if (task.supportsQueries()) { |
| command.add("--loadBroadcastSegments"); |
| command.add("true"); |
| } |
| |
| if (!taskFile.exists()) { |
| jsonMapper.writeValue(taskFile, task); |
| } |
| |
| LOGGER.info( |
| "Running command[%s]", |
| getMaskedCommand(startupLoggingConfig.getMaskProperties(), command.getCommandList()) |
| ); |
| taskWorkItem.processHolder = runTaskProcess(command.getCommandList(), logFile, taskLocation); |
| |
| processHolder = taskWorkItem.processHolder; |
| processHolder.registerWithCloser(closer); |
| } |
| |
| TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation); |
| TaskRunnerUtils.notifyStatusChanged( |
| listeners, |
| task.getId(), |
| TaskStatus.running(task.getId()) |
| ); |
| |
| LOGGER.info("Logging output of task[%s] to file[%s].", task.getId(), logFile); |
| final int exitCode = waitForTaskProcessToComplete(task, processHolder, logFile, reportsFile); |
| final TaskStatus status; |
| if (exitCode == 0) { |
| LOGGER.info("Process exited successfully for task[%s]", task.getId()); |
| // Process exited successfully |
| status = jsonMapper.readValue(statusFile, TaskStatus.class); |
| } else { |
| LOGGER.error("Process exited with code[%d] for task[%s]", exitCode, task.getId()); |
| // Process exited unsuccessfully |
| status = TaskStatus.failure( |
| task.getId(), |
| StringUtils.format( |
| "Task execution process exited unsuccessfully with code[%s]. " |
| + "See middleManager logs for more details.", |
| exitCode |
| ) |
| ); |
| } |
| if (status.isSuccess()) { |
| successfulTaskCount.incrementAndGet(); |
| } else { |
| failedTaskCount.incrementAndGet(); |
| } |
| TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); |
| return status; |
| } |
| catch (Throwable t) { |
| throw closer.rethrow(t); |
| } |
| finally { |
| closer.close(); |
| } |
| } |
| catch (Throwable t) { |
| LOGGER.info(t, "Exception caught during execution"); |
| throw new RuntimeException(t); |
| } |
| finally { |
| try { |
| synchronized (tasks) { |
| final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId()); |
| if (taskWorkItem != null && taskWorkItem.processHolder != null) { |
| taskWorkItem.processHolder.shutdown(); |
| } |
| if (!stopping) { |
| saveRunningTasks(); |
| } |
| } |
| |
| if (node.isEnablePlaintextPort()) { |
| portFinder.markPortUnused(childPort); |
| } |
| if (node.isEnableTlsPort()) { |
| portFinder.markPortUnused(tlsChildPort); |
| } |
| |
| getTracker().returnStorageSlot(storageSlot); |
| |
| try { |
| if (!stopping && taskDir.exists()) { |
| FileUtils.deleteDirectory(taskDir); |
| LOGGER.info("Removing task directory: %s", taskDir); |
| } |
| } |
| catch (Exception e) { |
| LOGGER.makeAlert(e, "Failed to delete task directory") |
| .addData("taskDir", taskDir.toString()) |
| .addData("task", task.getId()) |
| .emit(); |
| } |
| } |
| catch (Exception e) { |
| LOGGER.error(e, "Suppressing exception caught while cleaning up task"); |
| } |
| } |
| } |
| |
| } |
| ) |
| ) |
| ); |
| saveRunningTasks(); |
| return tasks.get(task.getId()).getResult(); |
| } |
| } |
| |
| @VisibleForTesting |
| ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation) throws IOException |
| { |
| return new ProcessHolder( |
| new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(), |
| logFile, |
| taskLocation |
| ); |
| } |
| |
| @VisibleForTesting |
| int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile) |
| throws IOException, InterruptedException |
| { |
| final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND); |
| |
| // This will block for a while. So we append the thread information with more details |
| final String priorThreadName = Thread.currentThread().getName(); |
| Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId())); |
| |
| try (final OutputStream toLogfile = logSink.openStream()) { |
| ByteStreams.copy(processHolder.process.getInputStream(), toLogfile); |
| return processHolder.process.waitFor(); |
| } |
| finally { |
| Thread.currentThread().setName(priorThreadName); |
| // Upload task logs |
| taskLogPusher.pushTaskLog(task.getId(), logFile); |
| if (reportsFile.exists()) { |
| taskLogPusher.pushTaskReports(task.getId(), reportsFile); |
| } |
| } |
| } |
| |
| @Override |
| @LifecycleStop |
| public void stop() |
| { |
| stopping = true; |
| exec.shutdown(); |
| |
| synchronized (tasks) { |
| for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) { |
| shutdownTaskProcess(taskWorkItem); |
| } |
| } |
| |
| final DateTime start = DateTimes.nowUtc(); |
| final long timeout = new Interval(start, taskConfig.getGracefulShutdownTimeout()).toDurationMillis(); |
| |
| // Things should be terminating now. Wait for it to happen so logs can be uploaded and all that good stuff. |
| LOGGER.info("Waiting up to %,dms for shutdown.", timeout); |
| if (timeout > 0) { |
| try { |
| final boolean terminated = exec.awaitTermination(timeout, TimeUnit.MILLISECONDS); |
| final long elapsed = System.currentTimeMillis() - start.getMillis(); |
| if (terminated) { |
| LOGGER.info("Finished stopping in %,dms.", elapsed); |
| } else { |
| final Set<String> stillRunning; |
| synchronized (tasks) { |
| stillRunning = ImmutableSet.copyOf(tasks.keySet()); |
| } |
| |
| LOGGER.makeAlert("Failed to stop forked tasks") |
| .addData("stillRunning", stillRunning) |
| .addData("elapsed", elapsed) |
| .emit(); |
| |
| LOGGER.warn( |
| "Executor failed to stop after %,dms, not waiting for it! Tasks still running: [%s]", |
| elapsed, |
| Joiner.on("; ").join(stillRunning) |
| ); |
| } |
| } |
| catch (InterruptedException e) { |
| LOGGER.warn(e, "Interrupted while waiting for executor to finish."); |
| Thread.currentThread().interrupt(); |
| } |
| } else { |
| LOGGER.warn("Ran out of time, not waiting for executor to finish!"); |
| } |
| } |
| |
| @Override |
| public void shutdown(final String taskid, String reason) |
| { |
| LOGGER.info("Shutdown [%s] because: [%s]", taskid, reason); |
| final ForkingTaskRunnerWorkItem taskInfo; |
| |
| synchronized (tasks) { |
| taskInfo = tasks.get(taskid); |
| |
| if (taskInfo == null) { |
| LOGGER.info("Ignoring request to cancel unknown task: %s", taskid); |
| return; |
| } |
| |
| taskInfo.shutdown = true; |
| |
| shutdownTaskProcess(taskInfo); |
| } |
| } |
| |
| @Override |
| public Collection<TaskRunnerWorkItem> getRunningTasks() |
| { |
| synchronized (tasks) { |
| final List<TaskRunnerWorkItem> ret = new ArrayList<>(); |
| for (final ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) { |
| if (taskWorkItem.processHolder != null) { |
| ret.add(taskWorkItem); |
| } |
| } |
| return ret; |
| } |
| } |
| |
| @Override |
| public Collection<TaskRunnerWorkItem> getPendingTasks() |
| { |
| synchronized (tasks) { |
| final List<TaskRunnerWorkItem> ret = new ArrayList<>(); |
| for (final ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) { |
| if (taskWorkItem.processHolder == null) { |
| ret.add(taskWorkItem); |
| } |
| } |
| return ret; |
| } |
| } |
| |
| @Nullable |
| @Override |
| public RunnerTaskState getRunnerTaskState(String taskId) |
| { |
| final ForkingTaskRunnerWorkItem workItem = tasks.get(taskId); |
| if (workItem == null) { |
| return null; |
| } else { |
| if (workItem.processHolder == null) { |
| return RunnerTaskState.PENDING; |
| } else if (workItem.processHolder.process.isAlive()) { |
| return RunnerTaskState.RUNNING; |
| } else { |
| return RunnerTaskState.NONE; |
| } |
| } |
| } |
| |
| @Override |
| public Optional<ScalingStats> getScalingStats() |
| { |
| return Optional.absent(); |
| } |
| |
| @Override |
| @LifecycleStart |
| public void start() |
| { |
| setNumProcessorsPerTask(); |
| } |
| |
| @Override |
| public Optional<InputStream> streamTaskLog(final String taskid, final long offset) throws IOException |
| { |
| final ProcessHolder processHolder; |
| |
| synchronized (tasks) { |
| final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(taskid); |
| if (taskWorkItem != null && taskWorkItem.processHolder != null) { |
| processHolder = taskWorkItem.processHolder; |
| } else { |
| return Optional.absent(); |
| } |
| } |
| return Optional.of(LogUtils.streamFile(processHolder.logFile, offset)); |
| } |
| |
| /** |
| * Close task output stream (input stream of process) sending EOF telling process to terminate, destroying the process |
| * if an exception is encountered. |
| */ |
| private void shutdownTaskProcess(ForkingTaskRunnerWorkItem taskInfo) |
| { |
| if (taskInfo.processHolder != null) { |
| // Will trigger normal failure mechanisms due to process exit |
| LOGGER.info("Closing output stream to task[%s].", taskInfo.getTask().getId()); |
| try { |
| taskInfo.processHolder.process.getOutputStream().close(); |
| } |
| catch (Exception e) { |
| LOGGER.warn(e, "Failed to close stdout to task[%s]. Destroying task.", taskInfo.getTask().getId()); |
| taskInfo.processHolder.process.destroy(); |
| } |
| } |
| } |
| |
| public static String getMaskedCommand(List<String> maskedProperties, List<String> command) |
| { |
| final Set<String> maskedPropertiesSet = Sets.newHashSet(maskedProperties); |
| final Iterator<String> maskedIterator = command.stream().map(element -> { |
| String[] splits = element.split("=", 2); |
| if (splits.length == 2) { |
| for (String masked : maskedPropertiesSet) { |
| if (splits[0].contains(masked)) { |
| return StringUtils.format("%s=%s", splits[0], "<masked>"); |
| } |
| } |
| } |
| return element; |
| }).iterator(); |
| return Joiner.on(" ").join(maskedIterator); |
| } |
| |
| @Override |
| public Map<String, Long> getTotalTaskSlotCount() |
| { |
| return ImmutableMap.of(workerConfig.getCategory(), getTotalTaskSlotCountLong()); |
| } |
| |
| public long getTotalTaskSlotCountLong() |
| { |
| return workerConfig.getCapacity(); |
| } |
| |
| @Override |
| public Map<String, Long> getIdleTaskSlotCount() |
| { |
| return ImmutableMap.of(workerConfig.getCategory(), Math.max(getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong(), 0)); |
| } |
| |
| @Override |
| public Map<String, Long> getUsedTaskSlotCount() |
| { |
| return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(portFinder.findUsedPortCount())); |
| } |
| |
| public long getUsedTaskSlotCountLong() |
| { |
| return portFinder.findUsedPortCount(); |
| } |
| |
| @Override |
| public Map<String, Long> getLazyTaskSlotCount() |
| { |
| return ImmutableMap.of(workerConfig.getCategory(), 0L); |
| } |
| |
| @Override |
| public Map<String, Long> getBlacklistedTaskSlotCount() |
| { |
| return ImmutableMap.of(workerConfig.getCategory(), 0L); |
| } |
| |
| @Override |
| public Long getWorkerFailedTaskCount() |
| { |
| long failedTaskCount = this.failedTaskCount.get(); |
| long lastReportedFailedTaskCount = this.lastReportedFailedTaskCount.get(); |
| this.lastReportedFailedTaskCount.set(failedTaskCount); |
| return failedTaskCount - lastReportedFailedTaskCount; |
| } |
| |
| @Override |
| public Long getWorkerIdleTaskSlotCount() |
| { |
| return Math.max(getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong(), 0); |
| } |
| |
| @Override |
| public Long getWorkerUsedTaskSlotCount() |
| { |
| return (long) portFinder.findUsedPortCount(); |
| } |
| |
| @Override |
| public Long getWorkerTotalTaskSlotCount() |
| { |
| return getTotalTaskSlotCountLong(); |
| } |
| |
| @Override |
| public String getWorkerCategory() |
| { |
| return workerConfig.getCategory(); |
| } |
| |
| @Override |
| public String getWorkerVersion() |
| { |
| return workerConfig.getVersion(); |
| } |
| |
| @Override |
| public Long getWorkerSuccessfulTaskCount() |
| { |
| long successfulTaskCount = this.successfulTaskCount.get(); |
| long lastReportedSuccessfulTaskCount = this.lastReportedSuccessfulTaskCount.get(); |
| this.lastReportedSuccessfulTaskCount.set(successfulTaskCount); |
| return successfulTaskCount - lastReportedSuccessfulTaskCount; |
| } |
| |
| @VisibleForTesting |
| void setNumProcessorsPerTask() |
| { |
| // Divide number of available processors by the number of tasks. |
| // This prevents various automatically-sized thread pools from being unreasonably large (we don't want each |
| // task to size its pools as if it is the only thing on the entire machine). |
| |
| final int availableProcessors = JvmUtils.getRuntimeInfo().getAvailableProcessors(); |
| numProcessorsPerTask = Math.max( |
| 1, |
| IntMath.divide(availableProcessors, workerConfig.getCapacity(), RoundingMode.CEILING) |
| ); |
| } |
| |
| protected static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem |
| { |
| private final Task task; |
| |
| private volatile boolean shutdown = false; |
| private volatile ProcessHolder processHolder = null; |
| |
| private ForkingTaskRunnerWorkItem( |
| Task task, |
| ListenableFuture<TaskStatus> statusFuture |
| ) |
| { |
| super(task.getId(), statusFuture); |
| this.task = task; |
| } |
| |
| public Task getTask() |
| { |
| return task; |
| } |
| |
| @Override |
| public TaskLocation getLocation() |
| { |
| if (processHolder == null) { |
| return TaskLocation.unknown(); |
| } else { |
| return processHolder.location; |
| } |
| } |
| |
| @Override |
| public String getTaskType() |
| { |
| return task.getType(); |
| } |
| |
| @Override |
| public String getDataSource() |
| { |
| return task.getDataSource(); |
| } |
| } |
| |
| public static class ProcessHolder |
| { |
| private final Process process; |
| private final File logFile; |
| private final TaskLocation location; |
| |
| public ProcessHolder(Process process, File logFile, TaskLocation location) |
| { |
| this.process = process; |
| this.logFile = logFile; |
| this.location = location; |
| } |
| |
| private void registerWithCloser(Closer closer) |
| { |
| closer.register(process.getInputStream()); |
| closer.register(process.getOutputStream()); |
| } |
| |
| private void shutdown() |
| { |
| process.destroy(); |
| } |
| } |
| |
| @VisibleForTesting |
| static int getNextAttemptID(File taskDir) |
| { |
| File attemptDir = new File(taskDir, "attempt"); |
| try { |
| FileUtils.mkdirp(attemptDir); |
| } |
| catch (IOException e) { |
| throw new ISE("Error creating directory", e); |
| } |
| int maxAttempt = |
| Arrays.stream(attemptDir.listFiles(File::isDirectory)) |
| .mapToInt(x -> Integer.parseInt(x.getName())) |
| .max().orElse(0); |
| // now make the directory |
| File attempt = new File(attemptDir, String.valueOf(maxAttempt + 1)); |
| try { |
| FileUtils.mkdirp(attempt); |
| } |
| catch (IOException e) { |
| throw new ISE("Error creating directory", e); |
| } |
| return maxAttempt + 1; |
| } |
| |
| public static class CommandListBuilder |
| { |
| ArrayList<String> commandList = new ArrayList<>(); |
| |
| public CommandListBuilder add(String arg) |
| { |
| commandList.add(arg); |
| return this; |
| } |
| |
| public CommandListBuilder addSystemProperty(String property, int value) |
| { |
| return addSystemProperty(property, String.valueOf(value)); |
| } |
| |
| public CommandListBuilder addSystemProperty(String property, long value) |
| { |
| return addSystemProperty(property, String.valueOf(value)); |
| } |
| |
| public CommandListBuilder addSystemProperty(String property, boolean value) |
| { |
| return addSystemProperty(property, String.valueOf(value)); |
| } |
| |
| public CommandListBuilder addSystemProperty(String property, String value) |
| { |
| return add(StringUtils.format("-D%s=%s", property, value)); |
| } |
| |
| public CommandListBuilder addAll(Iterable<String> args) |
| { |
| for (String arg : args) { |
| add(arg); |
| } |
| return this; |
| } |
| |
| public ArrayList<String> getCommandList() |
| { |
| return commandList; |
| } |
| |
| } |
| } |
| |