blob: a891de660b1bffe85307a06771e2ba1c19181273 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.twill.internal;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import org.apache.twill.api.LocalFile;
import org.apache.twill.api.RunId;
import org.apache.twill.api.RuntimeSpecification;
import org.apache.twill.filesystem.Location;
import org.apache.twill.internal.state.Message;
import org.apache.twill.launcher.FindFreePort;
import org.apache.twill.launcher.TwillLauncher;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKOperations;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
/**
* This class helps launching a container.
*/
public final class TwillContainerLauncher {
private static final Logger LOG = LoggerFactory.getLogger(TwillContainerLauncher.class);
private static final double HEAP_MIN_RATIO = 0.7d;
private final RuntimeSpecification runtimeSpec;
private final ProcessLauncher.PrepareLaunchContext launchContext;
private final ZKClient zkClient;
private final int instanceCount;
private final JvmOptions jvmOpts;
private final int reservedMemory;
private final Location secureStoreLocation;
public TwillContainerLauncher(RuntimeSpecification runtimeSpec, ProcessLauncher.PrepareLaunchContext launchContext,
ZKClient zkClient, int instanceCount, JvmOptions jvmOpts, int reservedMemory,
Location secureStoreLocation) {
this.runtimeSpec = runtimeSpec;
this.launchContext = launchContext;
this.zkClient = zkClient;
this.instanceCount = instanceCount;
this.jvmOpts = jvmOpts;
this.reservedMemory = reservedMemory;
this.secureStoreLocation = secureStoreLocation;
}
public TwillContainerController start(RunId runId, int instanceId, Class<?> mainClass, String classPath) {
ProcessLauncher.PrepareLaunchContext.AfterResources afterResources = null;
ProcessLauncher.PrepareLaunchContext.ResourcesAdder resourcesAdder = null;
// Clean up zookeeper path in case this is a retry and there are old messages and state there.
Futures.getUnchecked(ZKOperations.ignoreError(
ZKOperations.recursiveDelete(zkClient, "/" + runId), KeeperException.NoNodeException.class, null));
// Adds all file to be localized to container
if (!runtimeSpec.getLocalFiles().isEmpty()) {
resourcesAdder = launchContext.withResources();
for (LocalFile localFile : runtimeSpec.getLocalFiles()) {
afterResources = resourcesAdder.add(localFile);
}
}
// Optionally localize secure store.
try {
if (secureStoreLocation != null && secureStoreLocation.exists()) {
if (resourcesAdder == null) {
resourcesAdder = launchContext.withResources();
}
afterResources = resourcesAdder.add(new DefaultLocalFile(Constants.Files.CREDENTIALS,
secureStoreLocation.toURI(),
secureStoreLocation.lastModified(),
secureStoreLocation.length(), false, null));
}
} catch (IOException e) {
LOG.warn("Failed to launch container with secure store {}.", secureStoreLocation.toURI());
}
if (afterResources == null) {
afterResources = launchContext.noResources();
}
int memory = runtimeSpec.getResourceSpecification().getMemorySize();
if (((double) (memory - reservedMemory) / memory) >= HEAP_MIN_RATIO) {
// Reduce -Xmx by the reserved memory size.
memory = runtimeSpec.getResourceSpecification().getMemorySize() - reservedMemory;
} else {
// If it is a small VM, just discount it by the min ratio.
memory = (int) Math.ceil(memory * HEAP_MIN_RATIO);
}
// Currently no reporting is supported for runnable containers
ProcessLauncher.PrepareLaunchContext.MoreEnvironment afterEnvironment = afterResources
.withEnvironment()
.add(EnvKeys.TWILL_RUN_ID, runId.getId())
.add(EnvKeys.TWILL_RUNNABLE_NAME, runtimeSpec.getName())
.add(EnvKeys.TWILL_INSTANCE_ID, Integer.toString(instanceId))
.add(EnvKeys.TWILL_INSTANCE_COUNT, Integer.toString(instanceCount));
// assemble the command based on jvm options
ImmutableList.Builder<String> commandBuilder = ImmutableList.builder();
String firstCommand;
if (jvmOpts.getDebugOptions().doDebug(runtimeSpec.getName())) {
// for debugging we run a quick Java program to find a free port, then pass that port as the debug port and also
// as a System property to the runnable (Java has no general way to determine the port from within the JVM).
// PORT=$(java FindFreePort) && java -agentlib:jdwp=...,address=\$PORT -Dtwill.debug.port=\$PORT... TwillLauncher
// The $ must be escaped, otherwise it gets expanded (to "") before the command is submitted.
String suspend = jvmOpts.getDebugOptions().doSuspend() ? "y" : "n";
firstCommand = "TWILL_DEBUG_PORT=$($JAVA_HOME/bin/java";
commandBuilder.add("-cp", Constants.Files.LAUNCHER_JAR,
FindFreePort.class.getName() + ")",
"&&", // this will stop if FindFreePort fails
"$JAVA_HOME/bin/java",
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=" + suspend + "," +
"address=\\$TWILL_DEBUG_PORT",
"-Dtwill.debug.port=\\$TWILL_DEBUG_PORT"
);
} else {
firstCommand = "$JAVA_HOME/bin/java";
}
commandBuilder.add("-Djava.io.tmpdir=tmp",
"-Dyarn.container=$" + EnvKeys.YARN_CONTAINER_ID,
"-Dtwill.runnable=$" + EnvKeys.TWILL_APP_NAME + ".$" + EnvKeys.TWILL_RUNNABLE_NAME,
"-cp", Constants.Files.LAUNCHER_JAR + ":" + classPath,
"-Xmx" + memory + "m");
if (jvmOpts.getExtraOptions() != null) {
commandBuilder.add(jvmOpts.getExtraOptions());
}
commandBuilder.add(TwillLauncher.class.getName(),
Constants.Files.CONTAINER_JAR,
mainClass.getName(),
Boolean.TRUE.toString());
List<String> command = commandBuilder.build();
ProcessController<Void> processController = afterEnvironment
.withCommands().add(firstCommand, command.toArray(new String[command.size()]))
.redirectOutput(Constants.STDOUT).redirectError(Constants.STDERR)
.launch();
TwillContainerControllerImpl controller = new TwillContainerControllerImpl(zkClient, runId, processController);
controller.start();
return controller;
}
private static final class TwillContainerControllerImpl extends AbstractZKServiceController
implements TwillContainerController {
private final ProcessController<Void> processController;
private volatile ContainerLiveNodeData liveData;
protected TwillContainerControllerImpl(ZKClient zkClient, RunId runId,
ProcessController<Void> processController) {
super(runId, zkClient);
this.processController = processController;
}
@Override
protected void doStartUp() {
// No-op
}
@Override
protected void doShutDown() {
// No-op
}
@Override
protected void instanceNodeUpdated(NodeData nodeData) {
if (nodeData == null || nodeData.getData() == null) {
LOG.warn("Instance node was updated but data is null.");
return;
}
try {
Gson gson = new Gson();
JsonElement json = gson.fromJson(new String(nodeData.getData(), Charsets.UTF_8), JsonElement.class);
if (json.isJsonObject()) {
JsonElement data = json.getAsJsonObject().get("data");
if (data != null) {
this.liveData = gson.fromJson(data, ContainerLiveNodeData.class);
LOG.info("Container LiveNodeData updated: " + new String(nodeData.getData(), Charsets.UTF_8));
}
}
} catch (Throwable t) {
LOG.warn("Error deserializing updated instance node data", t);
}
}
@Override
protected void instanceNodeFailed(Throwable cause) {
// No-op
}
@Override
public ListenableFuture<Message> sendMessage(Message message) {
return sendMessage(message, message);
}
@Override
public synchronized void completed(int exitStatus) {
forceShutDown();
}
@Override
public ContainerLiveNodeData getLiveNodeData() {
return liveData;
}
@Override
public void kill() {
processController.cancel();
}
}
}