blob: 36aa925ef329aa5a28339bf1d15105d846765649 [file] [log] [blame]
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.CharMatcher;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closer;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.guice.annotations.Self;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.server.DruidNode;
import io.druid.tasklogs.TaskLogPusher;
import io.druid.tasklogs.TaskLogStreamer;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
/**
* Runs tasks in separate processes using the "internal peon" verb.
*/
public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
{
private static final EmittingLogger log = new EmittingLogger(ForkingTaskRunner.class);
private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property.";
private static final Splitter whiteSpaceSplitter = Splitter.on(CharMatcher.WHITESPACE).omitEmptyStrings();
private final ForkingTaskRunnerConfig config;
private final TaskConfig taskConfig;
private final Properties props;
private final TaskLogPusher taskLogPusher;
private final DruidNode node;
private final ListeningExecutorService exec;
private final ObjectMapper jsonMapper;
private final PortFinder portFinder;
private final Map<String, ForkingTaskRunnerWorkItem> tasks = Maps.newHashMap();
@Inject
public ForkingTaskRunner(
ForkingTaskRunnerConfig config,
TaskConfig taskConfig,
WorkerConfig workerConfig,
Properties props,
TaskLogPusher taskLogPusher,
ObjectMapper jsonMapper,
@Self DruidNode node
)
{
this.config = config;
this.taskConfig = taskConfig;
this.props = props;
this.taskLogPusher = taskLogPusher;
this.jsonMapper = jsonMapper;
this.node = node;
this.portFinder = new PortFinder(config.getStartPort());
this.exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(workerConfig.getCapacity()));
}
@Override
public ListenableFuture<TaskStatus> run(final Task task)
{
synchronized (tasks) {
if (!tasks.containsKey(task.getId())) {
tasks.put(
task.getId(),
new ForkingTaskRunnerWorkItem(
task.getId(),
exec.submit(
new Callable<TaskStatus>()
{
@Override
public TaskStatus call()
{
final String attemptUUID = UUID.randomUUID().toString();
final File taskDir = new File(taskConfig.getBaseTaskDir(), task.getId());
final File attemptDir = new File(taskDir, attemptUUID);
final ProcessHolder processHolder;
final int childPort = portFinder.findUnusedPort();
try {
final Closer closer = Closer.create();
try {
if (!attemptDir.mkdirs()) {
throw new IOException(String.format("Could not create directories: %s", attemptDir));
}
final File taskFile = new File(attemptDir, "task.json");
final File statusFile = new File(attemptDir, "status.json");
final File logFile = new File(attemptDir, "log");
// time to adjust process holders
synchronized (tasks) {
final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId());
if (taskWorkItem.shutdown) {
throw new IllegalStateException("Task has been shut down!");
}
if (taskWorkItem == null) {
log.makeAlert("WTF?! TaskInfo disappeared!").addData("task", task.getId()).emit();
throw new ISE("TaskInfo disappeared for task[%s]!", task.getId());
}
if (taskWorkItem.processHolder != null) {
log.makeAlert("WTF?! TaskInfo already has a processHolder")
.addData("task", task.getId())
.emit();
throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId());
}
final List<String> command = Lists.newArrayList();
final String childHost = String.format("%s:%d", node.getHostNoPort(), childPort);
final String taskClasspath;
if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
taskClasspath = Joiner.on(File.pathSeparator).join(
task.getClasspathPrefix(),
config.getClasspath()
);
} else {
taskClasspath = config.getClasspath();
}
command.add(config.getJavaCommand());
command.add("-cp");
command.add(taskClasspath);
Iterables.addAll(command, whiteSpaceSplitter.split(config.getJavaOpts()));
for (String propName : props.stringPropertyNames()) {
for (String allowedPrefix : config.getAllowedPrefixes()) {
if (propName.startsWith(allowedPrefix)) {
command.add(
String.format(
"-D%s=%s",
propName,
props.getProperty(propName)
)
);
}
}
}
// Override child JVM specific properties
for (String propName : props.stringPropertyNames()) {
if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
command.add(
String.format(
"-D%s=%s",
propName.substring(CHILD_PROPERTY_PREFIX.length()),
props.getProperty(propName)
)
);
}
}
command.add(String.format("-Ddruid.host=%s", childHost));
command.add(String.format("-Ddruid.port=%d", childPort));
command.add("io.druid.cli.Main");
command.add("internal");
command.add("peon");
command.add(taskFile.toString());
command.add(statusFile.toString());
String nodeType = task.getNodeType();
if (nodeType != null) {
command.add("--nodeType");
command.add(nodeType);
}
jsonMapper.writeValue(taskFile, task);
log.info("Running command: %s", Joiner.on(" ").join(command));
taskWorkItem.processHolder = new ProcessHolder(
new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
logFile,
childPort
);
processHolder = taskWorkItem.processHolder;
processHolder.registerWithCloser(closer);
}
log.info("Logging task %s output to: %s", task.getId(), logFile);
boolean runFailed = true;
try (final OutputStream toLogfile = Files.newOutputStreamSupplier(logFile).getOutput()) {
ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
final int statusCode = processHolder.process.waitFor();
log.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
if (statusCode == 0) {
runFailed = false;
}
}
finally {
// Upload task logs
taskLogPusher.pushTaskLog(task.getId(), logFile);
}
if (!runFailed) {
// Process exited successfully
return jsonMapper.readValue(statusFile, TaskStatus.class);
} else {
// Process exited unsuccessfully
return TaskStatus.failure(task.getId());
}
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
}
}
catch (Throwable t) {
log.info(t, "Exception caught during execution");
throw Throwables.propagate(t);
}
finally {
try {
synchronized (tasks) {
final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId());
if (taskWorkItem != null && taskWorkItem.processHolder != null) {
taskWorkItem.processHolder.process.destroy();
}
}
portFinder.markPortUnused(childPort);
log.info("Removing temporary directory: %s", attemptDir);
FileUtils.deleteDirectory(attemptDir);
}
catch (Exception e) {
log.error(e, "Suppressing exception caught while cleaning up task");
}
}
}
}
)
)
);
}
return tasks.get(task.getId()).getResult();
}
}
@LifecycleStop
public void stop()
{
synchronized (tasks) {
exec.shutdown();
for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
if (taskWorkItem.processHolder != null) {
log.info("Destroying process: %s", taskWorkItem.processHolder.process);
taskWorkItem.processHolder.process.destroy();
}
}
}
}
@Override
public void shutdown(final String taskid)
{
final ForkingTaskRunnerWorkItem taskInfo;
synchronized (tasks) {
taskInfo = tasks.get(taskid);
if (taskInfo == null) {
log.info("Ignoring request to cancel unknown task: %s", taskid);
return;
}
taskInfo.shutdown = true;
}
if (taskInfo.processHolder != null) {
// Will trigger normal failure mechanisms due to process exit
log.info("Killing process for task: %s", taskid);
taskInfo.processHolder.process.destroy();
}
}
@Override
public Collection<TaskRunnerWorkItem> getRunningTasks()
{
synchronized (tasks) {
final List<TaskRunnerWorkItem> ret = Lists.newArrayList();
for (final ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
if (taskWorkItem.processHolder != null) {
ret.add(taskWorkItem);
}
}
return ret;
}
}
@Override
public Collection<TaskRunnerWorkItem> getPendingTasks()
{
synchronized (tasks) {
final List<TaskRunnerWorkItem> ret = Lists.newArrayList();
for (final ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
if (taskWorkItem.processHolder == null) {
ret.add(taskWorkItem);
}
}
return ret;
}
}
@Override
public Collection<TaskRunnerWorkItem> getKnownTasks()
{
synchronized (tasks) {
return Lists.<TaskRunnerWorkItem>newArrayList(tasks.values());
}
}
@Override
public Collection<ZkWorker> getWorkers()
{
return ImmutableList.of();
}
@Override
public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskid, final long offset)
{
final ProcessHolder processHolder;
synchronized (tasks) {
final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(taskid);
if (taskWorkItem != null && taskWorkItem.processHolder != null) {
processHolder = taskWorkItem.processHolder;
} else {
return Optional.absent();
}
}
return Optional.<InputSupplier<InputStream>>of(
new InputSupplier<InputStream>()
{
@Override
public InputStream getInput() throws IOException
{
final RandomAccessFile raf = new RandomAccessFile(processHolder.logFile, "r");
final long rafLength = raf.length();
if (offset > 0) {
raf.seek(offset);
} else if (offset < 0 && offset < rafLength) {
raf.seek(Math.max(0, rafLength + offset));
}
return Channels.newInputStream(raf.getChannel());
}
}
);
}
private int findUnusedPort()
{
synchronized (tasks) {
int port = config.getStartPort();
int maxPortSoFar = -1;
for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
if (taskWorkItem.processHolder != null) {
if (taskWorkItem.processHolder.port > maxPortSoFar) {
maxPortSoFar = taskWorkItem.processHolder.port;
}
if (taskWorkItem.processHolder.port == port) {
port = maxPortSoFar + 1;
}
}
}
return port;
}
}
private static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private volatile boolean shutdown = false;
private volatile ProcessHolder processHolder = null;
private ForkingTaskRunnerWorkItem(
String taskId,
ListenableFuture<TaskStatus> statusFuture
)
{
super(taskId, statusFuture);
}
}
private static class ProcessHolder
{
private final Process process;
private final File logFile;
private final int port;
private ProcessHolder(Process process, File logFile, int port)
{
this.process = process;
this.logFile = logFile;
this.port = port;
}
private void registerWithCloser(Closer closer)
{
closer.register(process.getInputStream());
closer.register(process.getOutputStream());
}
}
}