blob: 0b98ba6d8b088391afb21a6481c0dd60ad030cb2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.twill.internal;
import com.google.common.base.Charsets;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import org.apache.twill.api.RunId;
import org.apache.twill.api.RuntimeSpecification;
import org.apache.twill.filesystem.Location;
import org.apache.twill.internal.state.Message;
import org.apache.twill.internal.utils.Resources;
import org.apache.twill.launcher.FindFreePort;
import org.apache.twill.launcher.TwillLauncher;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKOperations;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
/**
* This class helps launching a container.
*/
public final class TwillContainerLauncher {
private static final Logger LOG = LoggerFactory.getLogger(TwillContainerLauncher.class);
private final RuntimeSpecification runtimeSpec;
private final ContainerInfo containerInfo;
private final ProcessLauncher.PrepareLaunchContext launchContext;
private final ZKClient zkClient;
private final int instanceCount;
private final JvmOptions jvmOpts;
private final int reservedMemory;
private final double minHeapRatio;
private final Location secureStoreLocation;
private int maxHeapSizeMB;
public TwillContainerLauncher(RuntimeSpecification runtimeSpec, ContainerInfo containerInfo,
ProcessLauncher.PrepareLaunchContext launchContext,
ZKClient zkClient, int instanceCount, JvmOptions jvmOpts,
int reservedMemory, double minHeapRatio,
Location secureStoreLocation) {
this.runtimeSpec = runtimeSpec;
this.containerInfo = containerInfo;
this.launchContext = launchContext;
this.zkClient = zkClient;
this.instanceCount = instanceCount;
this.jvmOpts = jvmOpts;
this.reservedMemory = reservedMemory;
this.minHeapRatio = minHeapRatio;
this.secureStoreLocation = secureStoreLocation;
}
/**
* Start execution run for a class in a container. Will return instance of {@link TwillContainerController}
* that allows sending messages directly to the container.
*
* @param runId Use to represent unique id of the container run.
* @param instanceId The Twill instance Id.
* @param mainClass The main class to run in the container.
* @param classPath The class path to load classes for the container.
* @param logLevelLocation The log level file location for the container to localize.
* @return instance of {@link TwillContainerController} to control the container run.
*/
public TwillContainerController start(RunId runId, int instanceId, Class<?> mainClass, String classPath,
@Nullable Location logLevelLocation) {
// Clean up zookeeper path in case this is a retry and there are old messages and state there.
Futures.getUnchecked(ZKOperations.ignoreError(
ZKOperations.recursiveDelete(zkClient, "/" + runId), KeeperException.NoNodeException.class, null));
// Adds all file to be localized to container
launchContext.addResources(runtimeSpec.getLocalFiles());
// Optionally localize secure store and log level file.
try {
if (secureStoreLocation != null && secureStoreLocation.exists()) {
launchContext.addResources(new DefaultLocalFile(Constants.Files.CREDENTIALS,
secureStoreLocation.toURI(),
secureStoreLocation.lastModified(),
secureStoreLocation.length(), false, null));
}
if (logLevelLocation != null && logLevelLocation.exists()) {
launchContext.addResources(new DefaultLocalFile(Constants.Files.LOG_LEVELS,
logLevelLocation.toURI(),
logLevelLocation.lastModified(),
logLevelLocation.length(), false, null));
}
} catch (IOException e) {
LOG.warn("Failed to launch container with secure store {}.", secureStoreLocation);
}
// Currently no reporting is supported for runnable containers
launchContext
.addEnvironment(EnvKeys.TWILL_RUN_ID, runId.getId())
.addEnvironment(EnvKeys.TWILL_RUNNABLE_NAME, runtimeSpec.getName())
.addEnvironment(EnvKeys.TWILL_INSTANCE_ID, Integer.toString(instanceId))
.addEnvironment(EnvKeys.TWILL_INSTANCE_COUNT, Integer.toString(instanceCount));
// assemble the command based on jvm options
ImmutableList.Builder<String> commandBuilder = ImmutableList.builder();
String firstCommand;
if (jvmOpts.getDebugOptions().doDebug(runtimeSpec.getName())) {
// for debugging we run a quick Java program to find a free port, then pass that port as the debug port and also
// as a System property to the runnable (Java has no general way to determine the port from within the JVM).
// PORT=$(java FindFreePort) && java -agentlib:jdwp=...,address=\$PORT -Dtwill.debug.port=\$PORT... TwillLauncher
// The $ must be escaped, otherwise it gets expanded (to "") before the command is submitted.
String suspend = jvmOpts.getDebugOptions().doSuspend() ? "y" : "n";
firstCommand = "TWILL_DEBUG_PORT=$($JAVA_HOME/bin/java";
commandBuilder.add("-cp", Constants.Files.LAUNCHER_JAR,
FindFreePort.class.getName() + ")",
"&&", // this will stop if FindFreePort fails
"$JAVA_HOME/bin/java",
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=" + suspend + "," +
"address=\\$TWILL_DEBUG_PORT",
"-Dtwill.debug.port=\\$TWILL_DEBUG_PORT"
);
} else {
firstCommand = "$JAVA_HOME/bin/java";
}
maxHeapSizeMB = Resources.computeMaxHeapSize(containerInfo.getMemoryMB(), reservedMemory, minHeapRatio);
commandBuilder.add("-Djava.io.tmpdir=tmp",
"-Dyarn.container=$" + EnvKeys.YARN_CONTAINER_ID,
"-Dtwill.runnable=$" + Constants.TWILL_APP_NAME + ".$" + EnvKeys.TWILL_RUNNABLE_NAME,
"-cp", Constants.Files.LAUNCHER_JAR + ":" + classPath,
"-Xmx" + maxHeapSizeMB + "m");
String extraOptions = jvmOpts.getRunnableExtraOptions(runtimeSpec.getName());
if (!extraOptions.isEmpty()) {
commandBuilder.add(extraOptions);
}
commandBuilder.add(TwillLauncher.class.getName(),
mainClass.getName(),
Boolean.TRUE.toString());
List<String> command = commandBuilder.build();
ProcessController<Void> processController = launchContext
.addCommand(firstCommand, command.toArray(new String[command.size()]))
.launch();
TwillContainerControllerImpl controller =
new TwillContainerControllerImpl(zkClient, runId, runtimeSpec.getName(), instanceId, processController);
controller.start();
return controller;
}
/**
* Returns the maximum heap memory size in MB of the Java process launched in the container.
* This method can only be called after the {@link #start(RunId, int, Class, String, Location)} method.
*
* @throws IllegalStateException if the {@link #start(RunId, int, Class, String, Location)} was not called yet.
*/
public int getMaxHeapMemoryMB() {
if (maxHeapSizeMB <= 0) {
throw new IllegalStateException("Unknown maximum heap memory size. Please make sure the container is started");
}
return maxHeapSizeMB;
}
private static final class TwillContainerControllerImpl extends AbstractZKServiceController
implements TwillContainerController {
private final String runnable;
private final int instanceId;
private final ProcessController<Void> processController;
// This latch can be used to wait for container shutdown
private final CountDownLatch shutdownLatch;
private volatile ContainerLiveNodeData liveData;
protected TwillContainerControllerImpl(ZKClient zkClient, RunId runId, String runnable, int instanceId,
ProcessController<Void> processController) {
super(runId, zkClient);
this.runnable = runnable;
this.instanceId = instanceId;
this.processController = processController;
this.shutdownLatch = new CountDownLatch(1);
}
@Override
protected void doStartUp() {
// No-op
}
@Override
protected void doShutDown() {
// Wait for sometime for the container to stop
// TODO: Use configurable value for stop time (TWILL-192)
int maxWaitSecs = Constants.APPLICATION_MAX_STOP_SECONDS - 30;
maxWaitSecs = maxWaitSecs < 15 ? 15 : maxWaitSecs;
try {
if (Uninterruptibles.awaitUninterruptibly(shutdownLatch, maxWaitSecs, TimeUnit.SECONDS)) {
return;
}
} catch (Exception e) {
LOG.error("Got exception while trying to stop runnable {}, instance {}", runnable, instanceId, e);
}
// Container has not shutdown even after maxWaitSecs after sending stop message,
// we'll need to kill the container
LOG.warn("Killing runnable {}, instance {} after waiting {} secs", runnable, instanceId, maxWaitSecs);
killAndWait(maxWaitSecs);
}
@Override
protected void instanceNodeUpdated(NodeData nodeData) {
if (nodeData == null || nodeData.getData() == null) {
LOG.warn("Instance node was updated but data is null.");
return;
}
try {
Gson gson = new Gson();
JsonElement json = gson.fromJson(new String(nodeData.getData(), Charsets.UTF_8), JsonElement.class);
if (json.isJsonObject()) {
JsonElement data = json.getAsJsonObject().get("data");
if (data != null) {
this.liveData = gson.fromJson(data, ContainerLiveNodeData.class);
LOG.info("Container LiveNodeData updated: " + new String(nodeData.getData(), Charsets.UTF_8));
}
}
} catch (Throwable t) {
LOG.warn("Error deserializing updated instance node data", t);
}
}
@Override
protected void instanceNodeFailed(Throwable cause) {
// No-op
}
@Override
public ListenableFuture<Message> sendMessage(Message message) {
return sendMessage(message, message);
}
@Override
public void completed(int exitStatus) {
// count down the shutdownLatch to inform any waiting threads that this container is complete
if (exitStatus == 0) {
setTerminationStatus(TerminationStatus.SUCCEEDED);
} else if (exitStatus == 143) {
setTerminationStatus(TerminationStatus.KILLED);
} else {
setTerminationStatus(TerminationStatus.FAILED);
}
shutdownLatch.countDown();
synchronized (this) {
forceShutDown();
}
}
@Override
public ContainerLiveNodeData getLiveNodeData() {
return liveData;
}
@Override
public void kill() {
processController.cancel();
}
@Override
public int getInstanceId() {
return instanceId;
}
private void killAndWait(int maxWaitSecs) {
Stopwatch watch = new Stopwatch();
watch.start();
while (watch.elapsedTime(TimeUnit.SECONDS) < maxWaitSecs) {
// Kill the application
try {
kill();
} catch (Exception e) {
LOG.error("Exception while killing runnable {}, instance {}", runnable, instanceId, e);
}
// Wait on the shutdownLatch,
// if the runnable has stopped then the latch will be count down by completed() method
if (Uninterruptibles.awaitUninterruptibly(shutdownLatch, 10, TimeUnit.SECONDS)) {
// Runnable has stopped now
return;
}
}
// Timeout reached, runnable has not stopped
LOG.error("Failed to kill runnable {}, instance {} after {} seconds", runnable, instanceId,
watch.elapsedTime(TimeUnit.SECONDS));
// TODO: should we throw exception here?
}
}
}