blob: 6a3313147573b4c5510236ea58f3284504ea2320 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.twill.yarn;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Callables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.twill.api.ResourceSpecification;
import org.apache.twill.api.RunId;
import org.apache.twill.api.SecureStore;
import org.apache.twill.api.SecureStoreUpdater;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillPreparer;
import org.apache.twill.api.TwillRunnable;
import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.api.logging.LogHandler;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.ServiceListenerAdapter;
import org.apache.twill.common.Threads;
import org.apache.twill.filesystem.HDFSLocationFactory;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.internal.Constants;
import org.apache.twill.internal.ProcessController;
import org.apache.twill.internal.RunIds;
import org.apache.twill.internal.SingleRunnableApplication;
import org.apache.twill.internal.appmaster.ApplicationMasterLiveNodeData;
import org.apache.twill.internal.yarn.VersionDetectYarnAppClientFactory;
import org.apache.twill.internal.yarn.YarnAppClient;
import org.apache.twill.internal.yarn.YarnApplicationReport;
import org.apache.twill.internal.yarn.YarnUtils;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.RetryStrategies;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.twill.zookeeper.ZKClientServices;
import org.apache.twill.zookeeper.ZKClients;
import org.apache.twill.zookeeper.ZKOperations;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* An implementation of {@link org.apache.twill.api.TwillRunnerService} that runs application on a YARN cluster.
*/
public final class YarnTwillRunnerService extends AbstractIdleService implements TwillRunnerService {

  private static final Logger LOG = LoggerFactory.getLogger(YarnTwillRunnerService.class);

  // ZooKeeper session timeout in milliseconds.
  private static final int ZK_TIMEOUT = 10000;

  // Converts a ZK child node name (the run id string) into a RunId.
  private static final Function<String, RunId> STRING_TO_RUN_ID = new Function<String, RunId>() {
    @Override
    public RunId apply(String input) {
      return RunIds.fromString(input);
    }
  };

  // Widens a YarnTwillController to the public TwillController interface for return values.
  private static final Function<YarnTwillController, TwillController> CAST_CONTROLLER =
    new Function<YarnTwillController, TwillController>() {
      @Override
      public TwillController apply(YarnTwillController controller) {
        return controller;
      }
    };

  private final YarnConfiguration yarnConfig;
  private final YarnAppClient yarnAppClient;
  private final ZKClientService zkClientService;
  private final LocationFactory locationFactory;
  // Live controllers keyed by (application name, run id). All access is guarded by
  // synchronizing on this instance (some sites use "this", others the equivalent
  // "YarnTwillRunnerService.this" from inner classes - same monitor).
  private final Table<String, RunId, YarnTwillController> controllers;
  // Lazily created single-thread scheduler for secure store updates; guarded by "this".
  private ScheduledExecutorService secureStoreScheduler;

  // Lazy view over "controllers"; assigned once in startUp().
  private Iterable<LiveInfo> liveInfos;
  // Cancels the ZK watches installed by watchLiveApps(); assigned in startUp().
  private Cancellable watchCancellable;
  // Extra JVM options applied to applications started after the value is set.
  private volatile String jvmOptions = null;

  /**
   * Creates an instance with a {@link HDFSLocationFactory} created base on the given configuration with the
   * user home directory as the location factory namespace.
   *
   * @param config Configuration of the yarn cluster
   * @param zkConnect ZooKeeper connection string
   */
  public YarnTwillRunnerService(YarnConfiguration config, String zkConnect) {
    this(config, zkConnect, createDefaultLocationFactory(config));
  }

  /**
   * Creates an instance.
   *
   * @param config Configuration of the yarn cluster
   * @param zkConnect ZooKeeper connection string
   * @param locationFactory Factory to create {@link Location} instances that are readable and writable by this service
   */
  public YarnTwillRunnerService(YarnConfiguration config, String zkConnect, LocationFactory locationFactory) {
    this.yarnConfig = config;
    this.yarnAppClient = new VersionDetectYarnAppClientFactory().create(config);
    this.locationFactory = locationFactory;
    this.zkClientService = getZKClientService(zkConnect);
    this.controllers = HashBasedTable.create();
  }

  /**
   * This methods sets the extra JVM options that will be passed to the java command line for every application
   * started through this {@link YarnTwillRunnerService} instance. It only affects applications that are started
   * after options is set.
   *
   * This is intended for advance usage. All options will be passed unchanged to the java command line. Invalid
   * options could cause application not able to start.
   *
   * @param options extra JVM options.
   */
  public void setJVMOptions(String options) {
    Preconditions.checkArgument(options != null, "JVM options cannot be null.");
    this.jvmOptions = options;
  }

  /**
   * Returns any extra JVM options that have been set.
   * @see #setJVMOptions(String)
   */
  public String getJVMOptions() {
    return jvmOptions;
  }

  /**
   * Schedules periodic invocation of the given {@link SecureStoreUpdater} for every live application.
   * If Hadoop security is not enabled, this is a no-op and a no-op {@link Cancellable} is returned.
   *
   * @param updater produces the new {@link SecureStore} for each (application, runId) pair
   * @param initialDelay delay before the first update
   * @param delay delay between the end of one update run and the start of the next
   * @param unit time unit for both delays
   * @return a {@link Cancellable} that stops future updates (an in-progress run is not interrupted)
   */
  @Override
  public Cancellable scheduleSecureStoreUpdate(final SecureStoreUpdater updater,
                                               long initialDelay, long delay, TimeUnit unit) {
    if (!UserGroupInformation.isSecurityEnabled()) {
      return new Cancellable() {
        @Override
        public void cancel() {
          // No-op
        }
      };
    }

    // Lazily create the scheduler; the synchronized block ensures a single instance and makes
    // the subsequent (unsynchronized) read below safe for the thread that just left the block.
    synchronized (this) {
      if (secureStoreScheduler == null) {
        secureStoreScheduler = Executors.newSingleThreadScheduledExecutor(
          Threads.createDaemonThreadFactory("secure-store-updater"));
      }
    }

    final ScheduledFuture<?> future = secureStoreScheduler.scheduleWithFixedDelay(new Runnable() {
      @Override
      public void run() {
        // Collects all <application, runId> pairs first
        // (snapshot under the lock so updater.update() runs without holding the monitor).
        Multimap<String, RunId> liveApps = HashMultimap.create();
        synchronized (YarnTwillRunnerService.this) {
          for (Table.Cell<String, RunId, YarnTwillController> cell : controllers.cellSet()) {
            liveApps.put(cell.getRowKey(), cell.getColumnKey());
          }
        }

        // Collect all secure stores that needs to be updated.
        Table<String, RunId, SecureStore> secureStores = HashBasedTable.create();
        for (Map.Entry<String, RunId> entry : liveApps.entries()) {
          try {
            secureStores.put(entry.getKey(), entry.getValue(), updater.update(entry.getKey(), entry.getValue()));
          } catch (Throwable t) {
            // One failing updater call must not abort updates for the other applications.
            LOG.warn("Exception thrown by SecureStoreUpdater {}", updater, t);
          }
        }

        // Update secure stores.
        updateSecureStores(secureStores);
      }
    }, initialDelay, delay, unit);

    return new Cancellable() {
      @Override
      public void cancel() {
        // Do not interrupt a run already in progress.
        future.cancel(false);
      }
    };
  }

  @Override
  public TwillPreparer prepare(TwillRunnable runnable) {
    return prepare(runnable, ResourceSpecification.BASIC);
  }

  @Override
  public TwillPreparer prepare(TwillRunnable runnable, ResourceSpecification resourceSpecification) {
    // A single runnable is wrapped as a one-runnable application.
    return prepare(new SingleRunnableApplication(runnable, resourceSpecification));
  }

  /**
   * Creates a {@link TwillPreparer} for launching the given application on YARN.
   * The returned preparer registers the resulting controller in {@link #controllers} when the
   * application is actually started.
   *
   * @throws IllegalStateException if this service has not been started
   * @throws IllegalArgumentException if the same (application, runId) pair is already running
   */
  @Override
  public TwillPreparer prepare(TwillApplication application) {
    Preconditions.checkState(isRunning(), "Service not start. Please call start() first.");
    final TwillSpecification twillSpec = application.configure();
    final String appName = twillSpec.getName();

    return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService, locationFactory, jvmOptions,
                                 new YarnTwillControllerFactory() {
      @Override
      public YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
                                        Callable<ProcessController<YarnApplicationReport>> startUp) {
        // Each application gets its own ZK namespace under "/<appName>".
        ZKClient zkClient = ZKClients.namespace(zkClientService, "/" + appName);
        YarnTwillController controller = listenController(new YarnTwillController(appName, runId, zkClient,
                                                                                 logHandlers, startUp));
        synchronized (YarnTwillRunnerService.this) {
          Preconditions.checkArgument(!controllers.contains(appName, runId),
                                      "Application %s with runId %s is already running.", appName, runId);
          controllers.put(appName, runId, controller);
        }
        return controller;
      }
    });
  }

  /**
   * Looks up the controller of a single run. Returns {@code null} when not found.
   * Note: "synchronized (this)" here is the same monitor as "YarnTwillRunnerService.this"
   * used elsewhere to guard {@link #controllers}.
   */
  @Override
  public synchronized TwillController lookup(String applicationName, final RunId runId) {
    return controllers.get(applicationName, runId);
  }

  /**
   * Returns a live view of all controllers for the given application. Each call to
   * {@code iterator()} snapshots the current controllers under the lock, so iteration
   * itself does not hold the monitor.
   */
  @Override
  public Iterable<TwillController> lookup(final String applicationName) {
    return new Iterable<TwillController>() {
      @Override
      public Iterator<TwillController> iterator() {
        synchronized (YarnTwillRunnerService.this) {
          return Iterators.transform(ImmutableList.copyOf(controllers.row(applicationName).values()).iterator(),
                                     CAST_CONTROLLER);
        }
      }
    };
  }

  @Override
  public Iterable<LiveInfo> lookupLive() {
    // Assigned in startUp(); see createLiveInfos().
    return liveInfos;
  }

  @Override
  protected void startUp() throws Exception {
    yarnAppClient.startAndWait();
    zkClientService.startAndWait();

    // Create the root node, so that the namespace root would get created if it is missing
    // If the exception is caused by node exists, then it's ok. Otherwise propagate the exception.
    ZKOperations.ignoreError(zkClientService.create("/", null, CreateMode.PERSISTENT),
                             KeeperException.NodeExistsException.class, null).get();

    watchCancellable = watchLiveApps();
    liveInfos = createLiveInfos();

    // Schedule an updater for updating HDFS delegation tokens
    if (UserGroupInformation.isSecurityEnabled()) {
      long renewalInterval = yarnConfig.getLong(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
                                                DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
      // Schedule it five minutes before it expires.
      long delay = renewalInterval - TimeUnit.MINUTES.toMillis(5);

      // Just to safeguard. In practice, the value shouldn't be that small, otherwise nothing could work.
      if (delay <= 0) {
        delay = (renewalInterval <= 2) ? 1 : renewalInterval / 2;
      }

      scheduleSecureStoreUpdate(new LocationSecureStoreUpdater(yarnConfig, locationFactory),
                                delay, delay, TimeUnit.MILLISECONDS);
    }
  }

  @Override
  protected void shutDown() throws Exception {
    // Shutdown shouldn't stop any controllers, as stopping this client service should let the remote containers
    // running. However, this assumes that this TwillRunnerService is a long running service and you only stop it
    // when the JVM process is about to exit. Hence it is important that threads created in the controllers are
    // daemon threads.
    synchronized (this) {
      if (secureStoreScheduler != null) {
        secureStoreScheduler.shutdownNow();
      }
    }
    watchCancellable.cancel();
    zkClientService.stopAndWait();
    yarnAppClient.stopAndWait();
  }

  /**
   * Installs ZK watches that discover applications started by other processes: one watch on the
   * root for application names, plus one watch per application on its "/instances" node whose
   * ephemeral children are live runs. Newly seen runs are materialized into controllers via
   * {@link #updateController(String, RunId, AtomicBoolean)}.
   *
   * @return a {@link Cancellable} that removes all installed watches
   */
  private Cancellable watchLiveApps() {
    final Map<String, Cancellable> watched = Maps.newConcurrentMap();

    // Shared flag so callbacks already in flight become no-ops after cancel().
    final AtomicBoolean cancelled = new AtomicBoolean(false);
    // Watch child changes in the root, which gives all application names.
    final Cancellable cancellable = ZKOperations.watchChildren(zkClientService, "/",
                                                               new ZKOperations.ChildrenCallback() {
      @Override
      public void updated(NodeChildren nodeChildren) {
        if (cancelled.get()) {
          return;
        }

        Set<String> apps = ImmutableSet.copyOf(nodeChildren.getChildren());

        // For each for the application name, watch for ephemeral nodes under /instances.
        for (final String appName : apps) {
          if (watched.containsKey(appName)) {
            continue;
          }

          final String instancePath = String.format("/%s/instances", appName);
          watched.put(appName,
                      ZKOperations.watchChildren(zkClientService, instancePath, new ZKOperations.ChildrenCallback() {
            @Override
            public void updated(NodeChildren nodeChildren) {
              if (cancelled.get()) {
                return;
              }

              if (nodeChildren.getChildren().isEmpty()) {     // No more child, means no live instances
                // Stop watching this application until it shows up again under the root watch.
                Cancellable removed = watched.remove(appName);
                if (removed != null) {
                  removed.cancel();
                }
                return;
              }
              synchronized (YarnTwillRunnerService.this) {
                // For each of the children, which the node name is the runId,
                // fetch the application Id and construct TwillController.
                for (final RunId runId : Iterables.transform(nodeChildren.getChildren(), STRING_TO_RUN_ID)) {
                  if (controllers.contains(appName, runId)) {
                    continue;
                  }
                  updateController(appName, runId, cancelled);
                }
              }
            }
          }));
        }

        // Remove app watches for apps that are gone. Removal of controller from controllers table is done
        // in the state listener attached to the twill controller.
        for (String removeApp : Sets.difference(watched.keySet(), apps)) {
          watched.remove(removeApp).cancel();
        }
      }
    });

    return new Cancellable() {
      @Override
      public void cancel() {
        cancelled.set(true);
        cancellable.cancel();
        for (Cancellable c : watched.values()) {
          c.cancel();
        }
      }
    };
  }

  /**
   * Attaches a listener that removes the given controller from {@link #controllers} once it
   * terminates or fails (identity comparison, since the same app/runId cell could have been
   * replaced).
   *
   * @return the same controller, for call chaining
   */
  private YarnTwillController listenController(final YarnTwillController controller) {
    controller.addListener(new ServiceListenerAdapter() {
      @Override
      public void terminated(State from) {
        removeController();
      }

      @Override
      public void failed(State from, Throwable failure) {
        removeController();
      }

      private void removeController() {
        synchronized (YarnTwillRunnerService.this) {
          Iterables.removeIf(controllers.values(),
                             new Predicate<TwillController>() {
            @Override
            public boolean apply(TwillController input) {
              return input == controller;
            }
          });
        }
      }
    }, Threads.SAME_THREAD_EXECUTOR);
    return controller;
  }

  /**
   * Builds the ZK client service: re-watches on session expiry and retries failures with
   * exponential delay (100ms up to 2s).
   */
  private ZKClientService getZKClientService(String zkConnect) {
    return ZKClientServices.delegate(
      ZKClients.reWatchOnExpire(
        ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnect)
                                   .setSessionTimeout(ZK_TIMEOUT)
                                   .build(), RetryStrategies.exponentialDelay(100, 2000, TimeUnit.MILLISECONDS))));
  }

  /**
   * Returns a lazy {@link LiveInfo} view over {@link #controllers}. Each {@code iterator()} call
   * copies the table under the lock, so the returned entries are a consistent snapshot.
   */
  private Iterable<LiveInfo> createLiveInfos() {
    return new Iterable<LiveInfo>() {
      @Override
      public Iterator<LiveInfo> iterator() {
        Map<String, Map<RunId, YarnTwillController>> controllerMap;
        synchronized (YarnTwillRunnerService.this) {
          controllerMap = ImmutableTable.copyOf(controllers).rowMap();
        }
        return Iterators.transform(controllerMap.entrySet().iterator(),
                                   new Function<Map.Entry<String, Map<RunId, YarnTwillController>>, LiveInfo>() {
          @Override
          public LiveInfo apply(final Map.Entry<String, Map<RunId, YarnTwillController>> entry) {
            return new LiveInfo() {
              @Override
              public String getApplicationName() {
                return entry.getKey();
              }

              @Override
              public Iterable<TwillController> getControllers() {
                return Iterables.transform(entry.getValue().values(), CAST_CONTROLLER);
              }
            };
          }
        });
      }
    };
  }

  /**
   * Asynchronously fetches the live node of the given run and, if the run is still unknown,
   * creates, registers and starts a controller attached to the existing YARN application.
   * The {@code cancelled} flag short-circuits the callback if the watches have been cancelled.
   */
  private void updateController(final String appName, final RunId runId, final AtomicBoolean cancelled) {
    String instancePath = String.format("/%s/instances/%s", appName, runId.getId());

    // Fetch the content node.
    Futures.addCallback(zkClientService.getData(instancePath), new FutureCallback<NodeData>() {
      @Override
      public void onSuccess(NodeData result) {
        if (cancelled.get()) {
          return;
        }
        ApplicationId appId = getApplicationId(result);
        if (appId == null) {
          return;
        }

        synchronized (YarnTwillRunnerService.this) {
          // Re-check under the lock: another callback may have registered this run already.
          if (!controllers.contains(appName, runId)) {
            ZKClient zkClient = ZKClients.namespace(zkClientService, "/" + appName);
            YarnTwillController controller = listenController(
              new YarnTwillController(appName, runId, zkClient,
                                      Callables.returning(yarnAppClient.createProcessController(appId))));
            controllers.put(appName, runId, controller);
            controller.start();
          }
        }
      }

      @Override
      public void onFailure(Throwable t) {
        LOG.warn("Failed in fetching application instance node.", t);
      }
    }, Threads.SAME_THREAD_EXECUTOR);
  }

  /**
   * Decodes application ID stored inside the node data.
   * The node content is expected to be a JSON object whose "data" property holds a serialized
   * {@link ApplicationMasterLiveNodeData}.
   *
   * @param nodeData The node data to decode from. If it is {@code null}, this method would return {@code null}.
   * @return The ApplicationId or {@code null} if failed to decode.
   */
  private ApplicationId getApplicationId(NodeData nodeData) {
    byte[] data = nodeData == null ? null : nodeData.getData();
    if (data == null) {
      return null;
    }

    Gson gson = new Gson();
    JsonElement json = gson.fromJson(new String(data, Charsets.UTF_8), JsonElement.class);
    if (!json.isJsonObject()) {
      LOG.warn("Unable to decode live data node.");
      return null;
    }

    JsonObject jsonObj = json.getAsJsonObject();
    json = jsonObj.get("data");
    if (!json.isJsonObject()) {
      LOG.warn("Property data not found in live data node.");
      return null;
    }

    try {
      ApplicationMasterLiveNodeData amLiveNode = gson.fromJson(json, ApplicationMasterLiveNodeData.class);
      return YarnUtils.createApplicationId(amLiveNode.getAppIdClusterTime(), amLiveNode.getAppId());
    } catch (Exception e) {
      LOG.warn("Failed to decode application live node data.", e);
      return null;
    }
  }

  /**
   * Persists each given secure store (only Hadoop {@link Credentials} are supported) and
   * notifies the corresponding controller, if the run is still live. Failures for one cell
   * are logged and do not stop the others.
   */
  private void updateSecureStores(Table<String, RunId, SecureStore> secureStores) {
    for (Table.Cell<String, RunId, SecureStore> cell : secureStores.cellSet()) {
      Object store = cell.getValue().getStore();
      if (!(store instanceof Credentials)) {
        LOG.warn("Only Hadoop Credentials is supported. Ignore update for {}.", cell);
        continue;
      }

      Credentials credentials = (Credentials) store;
      if (credentials.getAllTokens().isEmpty()) {
        // Nothing to update.
        continue;
      }

      try {
        updateCredentials(cell.getRowKey(), cell.getColumnKey(), credentials);

        synchronized (YarnTwillRunnerService.this) {
          // Notify the application for secure store updates if it is still running.
          YarnTwillController controller = controllers.get(cell.getRowKey(), cell.getColumnKey());
          if (controller != null) {
            controller.secureStoreUpdated();
          }
        }
      } catch (Throwable t) {
        LOG.warn("Failed to update secure store for {}.", cell, t);
      }
    }
  }

  /**
   * Merges the given credential updates into the run's credentials file. The merged store is
   * written to a temp file with user-only ("600") permission and then renamed over the real
   * location so readers never see a partial write.
   *
   * @throws IOException if reading or writing the credentials location fails
   */
  private void updateCredentials(String application, RunId runId, Credentials updates) throws IOException {
    Location credentialsLocation = locationFactory.create(String.format("/%s/%s/%s", application, runId.getId(),
                                                                        Constants.Files.CREDENTIALS));
    // Try to read the old credentials.
    Credentials credentials = new Credentials();
    if (credentialsLocation.exists()) {
      DataInputStream is = new DataInputStream(new BufferedInputStream(credentialsLocation.getInputStream()));
      try {
        credentials.readTokenStorageStream(is);
      } finally {
        is.close();
      }
    }

    // Overwrite with the updates.
    credentials.addAll(updates);

    // Overwrite the credentials.
    Location tmpLocation = credentialsLocation.getTempFile(Constants.Files.CREDENTIALS);

    // Save the credentials store with user-only permission.
    DataOutputStream os = new DataOutputStream(new BufferedOutputStream(tmpLocation.getOutputStream("600")));
    try {
      credentials.writeTokenStorageToStream(os);
    } finally {
      os.close();
    }

    // Rename the tmp file into the credentials location
    tmpLocation.renameTo(credentialsLocation);

    LOG.debug("Secure store for {} {} saved to {}.", application, runId, credentialsLocation.toURI());
  }

  /**
   * Creates the default {@link HDFSLocationFactory} rooted at the user's HDFS home directory.
   * Wraps the checked {@link IOException} as unchecked since this is called from a constructor.
   */
  private static LocationFactory createDefaultLocationFactory(Configuration configuration) {
    try {
      FileSystem fs = FileSystem.get(configuration);
      return new HDFSLocationFactory(fs, fs.getHomeDirectory().toUri().getPath());
    } catch (IOException e) {
      throw Throwables.propagate(e);
    }
  }
}