blob: 269ffdf97625f98c16684973dadc487c4ee5fb3f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.twill.yarn;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Service;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.twill.api.ClassAcceptor;
import org.apache.twill.api.Configs;
import org.apache.twill.api.LocalFile;
import org.apache.twill.api.ResourceSpecification;
import org.apache.twill.api.RunId;
import org.apache.twill.api.SecureStore;
import org.apache.twill.api.SecureStoreUpdater;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillPreparer;
import org.apache.twill.api.TwillRunnable;
import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.api.logging.LogHandler;
import org.apache.twill.api.security.SecureStoreRenewer;
import org.apache.twill.api.security.SecureStoreWriter;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.filesystem.FileContextLocationFactory;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.internal.Constants;
import org.apache.twill.internal.ProcessController;
import org.apache.twill.internal.RunIds;
import org.apache.twill.internal.SingleRunnableApplication;
import org.apache.twill.internal.appmaster.ApplicationMasterLiveNodeData;
import org.apache.twill.internal.io.BasicLocationCache;
import org.apache.twill.internal.io.LocationCache;
import org.apache.twill.internal.io.NoCachingLocationCache;
import org.apache.twill.internal.utils.Dependencies;
import org.apache.twill.internal.yarn.VersionDetectYarnAppClientFactory;
import org.apache.twill.internal.yarn.YarnAppClient;
import org.apache.twill.internal.yarn.YarnApplicationReport;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.RetryStrategies;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.twill.zookeeper.ZKClientServices;
import org.apache.twill.zookeeper.ZKClients;
import org.apache.twill.zookeeper.ZKOperations;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * An implementation of {@link org.apache.twill.api.TwillRunnerService} that runs application on a YARN cluster.
 */
public final class YarnTwillRunnerService implements TwillRunnerService {

  private static final Logger LOG = LoggerFactory.getLogger(YarnTwillRunnerService.class);

  // ZooKeeper session timeout passed to the client builder (presumably milliseconds — confirm with builder docs).
  private static final int ZK_TIMEOUT = 10000;

  // Parses a ZK child node name into a RunId.
  private static final Function<String, RunId> STRING_TO_RUN_ID = new Function<String, RunId>() {
    @Override
    public RunId apply(String input) {
      return RunIds.fromString(input);
    }
  };

  // Widens YarnTwillController to the public TwillController interface for API returns.
  private static final Function<YarnTwillController, TwillController> CAST_CONTROLLER =
    new Function<YarnTwillController, TwillController>() {
      @Override
      public TwillController apply(YarnTwillController controller) {
        return controller;
      }
    };

  private final YarnConfiguration yarnConfig;
  private final ZKClientService zkClientService;
  private final LocationFactory locationFactory;
  // Controllers for known application runs, keyed by (application name, run id).
  // Guarded by synchronizing on this instance.
  private final Table<String, RunId, YarnTwillController> controllers;
  // Classpath URLs that contain Twill classes; populated once in startUp().
  private final Set<URL> twillClassPaths;
  // A Guava service to help the state transition.
  private final Service serviceDelegate;
  // Optional cache for generated application jars; null when caching is disabled.
  private LocationCache locationCache;
  private LocationCacheCleaner locationCacheCleaner;
  // Scheduler for secure store renewals; created lazily, may be replaced or nulled. Guarded by "this".
  private ScheduledExecutorService secureStoreScheduler;
  private Iterable<LiveInfo> liveInfos;
  private Cancellable watchCancellable;
  // Extra JVM options for applications started after the value is set; volatile for cross-thread visibility.
  private volatile String jvmOptions = null;
/**
 * Creates an instance with a {@link FileContextLocationFactory} created based on the given configuration with the
 * user home directory as the location factory namespace.
 *
 * @param config Configuration of the yarn cluster
 * @param zkConnect ZooKeeper connection string
 */
public YarnTwillRunnerService(YarnConfiguration config, String zkConnect) {
  this(config, zkConnect, createDefaultLocationFactory(config));
}
/**
 * Creates an instance.
 *
 * @param config Configuration of the yarn cluster
 * @param zkConnect ZooKeeper connection string
 * @param locationFactory Factory to create {@link Location} instances that are readable and writable by this service
 */
public YarnTwillRunnerService(YarnConfiguration config, String zkConnect, LocationFactory locationFactory) {
  this.yarnConfig = config;
  this.locationFactory = locationFactory;
  this.zkClientService = getZKClientService(zkConnect);
  this.controllers = HashBasedTable.create();
  this.twillClassPaths = new HashSet<>();
  // Delegate lifecycle management to a Guava AbstractIdleService; the actual work is
  // performed by this class's private startUp()/shutDown() methods.
  this.serviceDelegate = new AbstractIdleService() {
    @Override
    protected void startUp() throws Exception {
      YarnTwillRunnerService.this.startUp();
    }

    @Override
    protected void shutDown() throws Exception {
      YarnTwillRunnerService.this.shutDown();
    }
  };
}
@Override
public void start() {
  // Blocks until startup completes; see startUp() for the actual startup logic.
  serviceDelegate.startAndWait();
}
@Override
public void stop() {
  // Blocks until shutdown completes; see shutDown() for the actual shutdown logic.
  serviceDelegate.stopAndWait();
}
/**
 * This methods sets the extra JVM options that will be passed to the java command line for every application
 * started through this {@link YarnTwillRunnerService} instance. It only affects applications that are started
 * after options is set.
 *
 * This is intended for advanced usage. All options will be passed unchanged to the java command line. Invalid
 * options could cause application not able to start.
 *
 * @param options extra JVM options; must not be {@code null}
 * @throws IllegalArgumentException if {@code options} is {@code null}
 */
public void setJVMOptions(String options) {
  Preconditions.checkArgument(options != null, "JVM options cannot be null.");
  this.jvmOptions = options;
}
/**
 * Returns any extra JVM options that have been set, or {@code null} if none were set.
 * @see #setJVMOptions(String)
 */
public String getJVMOptions() {
  return jvmOptions;
}
/**
 * Schedules a periodic {@link SecureStoreUpdater} for all live applications.
 *
 * @param updater the updater producing the new {@link SecureStore} for each application run
 * @param initialDelay delay before the first update
 * @param delay delay between updates
 * @param unit time unit for {@code initialDelay} and {@code delay}
 * @return a {@link Cancellable} that stops future updates when cancelled
 */
@Override
public Cancellable scheduleSecureStoreUpdate(final SecureStoreUpdater updater,
                                             long initialDelay, long delay, TimeUnit unit) {
  final ScheduledExecutorService scheduler;
  synchronized (this) {
    if (secureStoreScheduler == null) {
      secureStoreScheduler = Executors.newSingleThreadScheduledExecutor(
        Threads.createDaemonThreadFactory("secure-store-renewer"));
    }
    // Capture the scheduler while holding the lock: the field may be replaced or nulled
    // concurrently by setSecureStoreRenewer() / its Cancellable, which would otherwise
    // cause an NPE when scheduling below.
    scheduler = secureStoreScheduler;
  }

  final ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(new Runnable() {
    @Override
    public void run() {
      // Collects all live applications.
      Table<String, RunId, YarnTwillController> liveApps;
      // Must synchronize on the enclosing service instance: inside this anonymous Runnable,
      // "this" is the Runnable itself, which is not the lock guarding the controllers table.
      synchronized (YarnTwillRunnerService.this) {
        liveApps = HashBasedTable.create(controllers);
      }

      // Update the secure store with merging = true
      renewSecureStore(liveApps, new SecureStoreRenewer() {
        @Override
        public void renew(String application, RunId runId, SecureStoreWriter secureStoreWriter) throws IOException {
          secureStoreWriter.write(updater.update(application, runId));
        }
      }, true);
    }
  }, initialDelay, delay, unit);

  return new Cancellable() {
    @Override
    public void cancel() {
      future.cancel(false);
    }
  };
}
/**
 * Sets a {@link SecureStoreRenewer} to be invoked periodically for all live applications,
 * replacing any previously set renewer or scheduled updater.
 *
 * @param renewer the renewer to use
 * @param initialDelay delay before the first renewal
 * @param delay delay between renewals
 * @param retryDelay delay before retrying runs whose renewal failed
 * @param unit time unit for all the delays
 * @return a {@link Cancellable} that stops the renewal if this renewer is still the active one
 */
@Override
public Cancellable setSecureStoreRenewer(SecureStoreRenewer renewer, long initialDelay,
                                         long delay, long retryDelay, TimeUnit unit) {
  final ScheduledExecutorService currentScheduler;
  synchronized (this) {
    if (secureStoreScheduler != null) {
      // Shutdown and block until the schedule is stopped
      stopScheduler(secureStoreScheduler);
    }
    secureStoreScheduler = Executors.newSingleThreadScheduledExecutor(
      Threads.createDaemonThreadFactory("secure-store-renewer"));
    // Capture the new scheduler and schedule while still holding the lock, so a concurrent
    // cancel() (which may null out the field) cannot race with this scheduling and NPE.
    currentScheduler = secureStoreScheduler;
    currentScheduler.scheduleWithFixedDelay(
      createSecureStoreUpdateRunnable(currentScheduler, renewer,
                                      ImmutableMultimap.<String, RunId>of(), retryDelay, unit),
      initialDelay, delay, unit);
  }

  return new Cancellable() {
    @Override
    public void cancel() {
      synchronized (YarnTwillRunnerService.this) {
        // Only cancel if the active scheduler is the same as the scheduler bound to this cancellable
        if (currentScheduler == secureStoreScheduler) {
          secureStoreScheduler.shutdown();
          secureStoreScheduler = null;
        }
      }
    }
  };
}
@Override
public TwillPreparer prepare(TwillRunnable runnable) {
  // Prepare the runnable with the default (BASIC) resource specification.
  return prepare(runnable, ResourceSpecification.BASIC);
}
@Override
public TwillPreparer prepare(TwillRunnable runnable, ResourceSpecification resourceSpecification) {
  // Wrap the single runnable as a one-runnable application.
  return prepare(new SingleRunnableApplication(runnable, resourceSpecification));
}
/**
 * Prepares a YARN launch of the given application.
 *
 * @param application the application to launch
 * @return a {@link TwillPreparer} for configuring and starting the launch
 * @throws IllegalStateException if this service has not been started
 */
@Override
public TwillPreparer prepare(TwillApplication application) {
  // Fixed grammar of the user-facing error message ("Service not start" -> "Service not started").
  Preconditions.checkState(serviceDelegate.isRunning(), "Service not started. Please call start() first.");
  final TwillSpecification twillSpec = application.configure();
  final String appName = twillSpec.getName();
  RunId runId = RunIds.generate();
  // Each run gets its own application directory: /<appName>/<runId>.
  Location appLocation = locationFactory.create(String.format("/%s/%s", twillSpec.getName(), runId.getId()));
  // Fall back to a non-caching cache when the location cache was not enabled in startUp().
  LocationCache locationCache = this.locationCache;
  if (locationCache == null) {
    locationCache = new NoCachingLocationCache(appLocation);
  }
  Configuration config = new Configuration(yarnConfig);
  return new YarnTwillPreparer(config, twillSpec, runId, zkClientService.getConnectString(),
                               appLocation, twillClassPaths, jvmOptions,
                               locationCache, new YarnTwillControllerFactory() {
    @Override
    public YarnTwillController create(RunId runId, boolean logCollectionEnabled, Iterable<LogHandler> logHandlers,
                                      Callable<ProcessController<YarnApplicationReport>> startUp,
                                      long startTimeout, TimeUnit startTimeoutUnit) {
      // Controllers of an application live under the "/<appName>" ZK namespace.
      ZKClient zkClient = ZKClients.namespace(zkClientService, "/" + appName);
      YarnTwillController controller = listenController(new YarnTwillController(appName, runId, zkClient,
                                                                                logCollectionEnabled,
                                                                                logHandlers, startUp,
                                                                                startTimeout, startTimeoutUnit));
      synchronized (YarnTwillRunnerService.this) {
        Preconditions.checkArgument(!controllers.contains(appName, runId),
                                    "Application %s with runId %s is already running.", appName, runId);
        controllers.put(appName, runId, controller);
      }
      return controller;
    }
  });
}
@Override
public synchronized TwillController lookup(String applicationName, final RunId runId) {
  // Returns null if no controller is registered for this (application, run) pair.
  return controllers.get(applicationName, runId);
}
/**
 * Returns a lazy view of all controllers for the given application name. A fresh snapshot of
 * the controllers table is taken each time {@code iterator()} is invoked.
 */
@Override
public Iterable<TwillController> lookup(final String applicationName) {
  return new Iterable<TwillController>() {
    @Override
    public Iterator<TwillController> iterator() {
      // Snapshot the row under the lock; the transform itself is lazy and safe outside it.
      ImmutableList<YarnTwillController> snapshot;
      synchronized (YarnTwillRunnerService.this) {
        snapshot = ImmutableList.copyOf(controllers.row(applicationName).values());
      }
      return Iterators.transform(snapshot.iterator(), CAST_CONTROLLER);
    }
  };
}
@Override
public Iterable<LiveInfo> lookupLive() {
  // Lazy view created in startUp(); each iteration snapshots the current controllers table.
  return liveInfos;
}
/**
 * Actual startup logic, invoked through the service delegate: starts the ZK client, records the
 * classpaths containing Twill classes, begins watching live applications, schedules secure store
 * renewal when Hadoop security is enabled, and optionally sets up the location cache.
 */
private void startUp() throws Exception {
  zkClientService.startAndWait();

  // Find all the classpaths for Twill classes. It is used for class filtering when building application jar
  // in the YarnTwillPreparer
  Dependencies.findClassDependencies(getClass().getClassLoader(), new ClassAcceptor() {
    @Override
    public boolean accept(String className, URL classUrl, URL classPathUrl) {
      if (!className.startsWith("org.apache.twill.")) {
        return false;
      }
      twillClassPaths.add(classPathUrl);
      return true;
    }
  }, getClass().getName());

  // Create the root node, so that the namespace root would get created if it is missing
  // If the exception is caused by node exists, then it's ok. Otherwise propagate the exception.
  ZKOperations.ignoreError(zkClientService.create("/", null, CreateMode.PERSISTENT),
                           KeeperException.NodeExistsException.class, null).get();

  watchCancellable = watchLiveApps();
  liveInfos = createLiveInfos();

  boolean enableSecureStoreUpdate = yarnConfig.getBoolean(Configs.Keys.SECURE_STORE_UPDATE_LOCATION_ENABLED, true);

  // Schedule an updater for updating HDFS delegation tokens
  if (UserGroupInformation.isSecurityEnabled() && enableSecureStoreUpdate) {
    long renewalInterval = yarnConfig.getLong(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
                                              DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
    // Schedule it five minutes before it expires.
    long delay = renewalInterval - TimeUnit.MINUTES.toMillis(5);

    // Just to safeguard. In practice, the value shouldn't be that small, otherwise nothing could work.
    if (delay <= 0) {
      delay = (renewalInterval <= 2) ? 1 : renewalInterval / 2;
    }

    setSecureStoreRenewer(new LocationSecureStoreRenewer(yarnConfig, locationFactory),
                          delay, delay, 10000L, TimeUnit.MILLISECONDS);
  }

  // Optionally create a LocationCache
  String cacheDir = yarnConfig.get(Configs.Keys.LOCATION_CACHE_DIR);
  if (cacheDir != null) {
    // The session id keeps this runner's cache directory distinct from other runner instances.
    String sessionId = Long.toString(System.currentTimeMillis());
    try {
      Location cacheBase = locationFactory.create(cacheDir);
      cacheBase.mkdirs();
      cacheBase.setPermissions("775");
      // Use a unique cache directory for each instance of this class
      Location cacheLocation = cacheBase.append(sessionId);
      cacheLocation.mkdirs();
      cacheLocation.setPermissions("775");
      locationCache = new BasicLocationCache(cacheLocation);
      locationCacheCleaner = startLocationCacheCleaner(cacheBase, sessionId);
    } catch (IOException e) {
      // The cache is an optimization; degrade to no caching rather than failing startup.
      LOG.warn("Failed to create location cache directory. Location cache cannot be enabled.", e);
    }
  }
}
/**
 * Forces a cleanup of location cache based on the given time.
 */
@VisibleForTesting
void forceLocationCacheCleanup(long currentTime) {
  // NOTE(review): assumes the location cache was enabled in startUp() (locationCacheCleaner != null);
  // calling this without the cache configured would throw NPE — confirm test usage.
  locationCacheCleaner.forceCleanup(currentTime);
}
/**
 * Creates and starts a {@link LocationCacheCleaner} for the given cache base directory. The cleaner's
 * predicate selects cleanup candidates: locations neither referenced by any live application nor
 * part of the current session's launcher/twill jars.
 *
 * @param cacheBase base directory of the location cache
 * @param sessionId unique id of this runner instance's cache session
 * @return the started cleaner
 */
private LocationCacheCleaner startLocationCacheCleaner(final Location cacheBase, final String sessionId) {
  LocationCacheCleaner cleaner = new LocationCacheCleaner(
    yarnConfig, cacheBase, sessionId, new Predicate<Location>() {
      @Override
      public boolean apply(Location location) {
        // Collects all the locations that is being used by any live applications
        Set<Location> activeLocations = new HashSet<>();
        synchronized (YarnTwillRunnerService.this) {
          for (YarnTwillController controller : controllers.values()) {
            ApplicationMasterLiveNodeData amLiveNodeData = controller.getApplicationMasterLiveNodeData();
            if (amLiveNodeData != null) {
              for (LocalFile localFile : amLiveNodeData.getLocalFiles()) {
                activeLocations.add(locationFactory.create(localFile.getURI()));
              }
            }
          }
        }
        try {
          // Always keep the launcher.jar and twill.jar from the current session as they should never change,
          // hence never expires
          activeLocations.add(cacheBase.append(sessionId).append(Constants.Files.LAUNCHER_JAR));
          activeLocations.add(cacheBase.append(sessionId).append(Constants.Files.TWILL_JAR));
        } catch (IOException e) {
          // This should not happen
          LOG.warn("Failed to construct cache location", e);
        }
        // True means the location is not in use and may be cleaned up.
        return !activeLocations.contains(location);
      }
    });
  cleaner.startAndWait();
  return cleaner;
}
/**
 * Actual shutdown logic, invoked through the service delegate. Stops local services only;
 * remote application containers are intentionally left running.
 */
private void shutDown() throws Exception {
  // Shutdown shouldn't stop any controllers, as stopping this client service should let the remote containers
  // running. However, this assumes that this TwillRunnerService is a long running service and you only stop it
  // when the JVM process is about to exit. Hence it is important that threads created in the controllers are
  // daemon threads.
  synchronized (this) {
    if (locationCacheCleaner != null) {
      locationCacheCleaner.stopAndWait();
    }
    if (secureStoreScheduler != null) {
      // Abandon any pending renewals; running applications keep their current credentials.
      secureStoreScheduler.shutdownNow();
    }
  }
  // Stop watching ZK for live applications, then disconnect from ZK.
  watchCancellable.cancel();
  zkClientService.stopAndWait();
}
/**
 * Watches ZooKeeper for applications and their live run instances. For every application name
 * found under the root node, a child watch is installed on {@code /<app>/instances}; each
 * instance node discovered there results in a controller being created via
 * {@link #updateController(String, RunId, AtomicBoolean)}.
 *
 * @return a {@link Cancellable} that stops all the watching when cancelled
 */
private Cancellable watchLiveApps() {
  final Map<String, Cancellable> watched = Maps.newConcurrentMap();
  final AtomicBoolean cancelled = new AtomicBoolean(false);
  // Watch child changes in the root, which gives all application names.
  final Cancellable cancellable = ZKOperations.watchChildren(zkClientService, "/",
                                                             new ZKOperations.ChildrenCallback() {
    @Override
    public void updated(NodeChildren nodeChildren) {
      if (cancelled.get()) {
        return;
      }
      Set<String> apps = ImmutableSet.copyOf(nodeChildren.getChildren());

      // For each for the application name, watch for ephemeral nodes under /instances.
      for (final String appName : apps) {
        if (watched.containsKey(appName)) {
          continue;
        }
        final String instancePath = String.format("/%s/instances", appName);
        watched.put(appName,
                    ZKOperations.watchChildren(zkClientService, instancePath, new ZKOperations.ChildrenCallback() {
          @Override
          public void updated(NodeChildren nodeChildren) {
            if (cancelled.get()) {
              return;
            }
            if (nodeChildren.getChildren().isEmpty()) {
              // No more child, means no live instances
              Cancellable removed = watched.remove(appName);
              if (removed != null) {
                removed.cancel();
              }
              return;
            }
            synchronized (YarnTwillRunnerService.this) {
              // For each of the children, which the node name is the runId,
              // fetch the application Id and construct TwillController.
              for (final RunId runId : Iterables.transform(nodeChildren.getChildren(), STRING_TO_RUN_ID)) {
                if (controllers.contains(appName, runId)) {
                  continue;
                }
                updateController(appName, runId, cancelled);
              }
            }
          }
        }));
      }

      // Remove app watches for apps that are gone. Removal of controller from controllers table is done
      // in the state listener attached to the twill controller.
      for (String removeApp : Sets.difference(watched.keySet(), apps)) {
        watched.remove(removeApp).cancel();
      }
    }
  });

  return new Cancellable() {
    @Override
    public void cancel() {
      cancelled.set(true);
      cancellable.cancel();
      for (Cancellable c : watched.values()) {
        c.cancel();
      }
    }
  };
}
/**
 * Attaches a termination hook to the given controller so that it is removed from the
 * controllers table once it terminates.
 *
 * @param controller the controller to register the hook on
 * @return the same controller, for call chaining
 */
private YarnTwillController listenController(final YarnTwillController controller) {
  controller.onTerminated(new Runnable() {
    @Override
    public void run() {
      synchronized (YarnTwillRunnerService.this) {
        // Remove this exact controller instance (identity comparison) from the table.
        Iterator<YarnTwillController> iterator = controllers.values().iterator();
        while (iterator.hasNext()) {
          if (iterator.next() == controller) {
            iterator.remove();
          }
        }
      }
    }
  }, Threads.SAME_THREAD_EXECUTOR);
  return controller;
}
/**
 * Builds the ZooKeeper client service: a base client with the configured session timeout,
 * wrapped with retry-on-failure and re-watch-on-expire behaviors.
 *
 * @param zkConnect ZooKeeper connection string
 * @return the decorated {@link ZKClientService}
 */
private ZKClientService getZKClientService(String zkConnect) {
  ZKClientService clientService = ZKClientService.Builder.of(zkConnect)
    .setSessionTimeout(ZK_TIMEOUT)
    .build();
  // Retry failed operations with exponential backoff (100ms up to 2000ms).
  ZKClient retryingClient = ZKClients.retryOnFailure(
    clientService, RetryStrategies.exponentialDelay(100, 2000, TimeUnit.MILLISECONDS));
  // Re-install watches automatically after a session expiration.
  return ZKClientServices.delegate(ZKClients.reWatchOnExpire(retryingClient));
}
/**
 * Creates a lazy {@link Iterable} of {@link LiveInfo}; each call to {@code iterator()} takes
 * a fresh snapshot of the controllers table, grouped by application name.
 */
private Iterable<LiveInfo> createLiveInfos() {
  return new Iterable<LiveInfo>() {
    @Override
    public Iterator<LiveInfo> iterator() {
      Map<String, Map<RunId, YarnTwillController>> controllerMap;
      // Snapshot under the lock so iteration sees a consistent view of the table.
      synchronized (YarnTwillRunnerService.this) {
        controllerMap = ImmutableTable.copyOf(controllers).rowMap();
      }
      return Iterators.transform(controllerMap.entrySet().iterator(),
                                 new Function<Map.Entry<String, Map<RunId, YarnTwillController>>, LiveInfo>() {
        @Override
        public LiveInfo apply(final Map.Entry<String, Map<RunId, YarnTwillController>> entry) {
          return new LiveInfo() {
            @Override
            public String getApplicationName() {
              return entry.getKey();
            }

            @Override
            public Iterable<TwillController> getControllers() {
              return Iterables.transform(entry.getValue().values(), CAST_CONTROLLER);
            }
          };
        }
      });
    }
  };
}
/**
 * Creates and registers a controller for a live application run discovered in ZooKeeper,
 * based on the application master live node data stored in the run's instance node.
 *
 * @param appName name of the application
 * @param runId run id of the live run
 * @param cancelled flips to {@code true} when live-app watching is cancelled; checked before acting
 */
private void updateController(final String appName, final RunId runId, final AtomicBoolean cancelled) {
  String instancePath = String.format("/%s/instances/%s", appName, runId.getId());

  // Fetch the content node.
  Futures.addCallback(zkClientService.getData(instancePath), new FutureCallback<NodeData>() {
    @Override
    public void onSuccess(NodeData result) {
      if (cancelled.get()) {
        return;
      }
      // Null result means the node content could not be decoded; nothing to attach to.
      ApplicationMasterLiveNodeData amLiveNodeData = ApplicationMasterLiveNodeDecoder.decode(result);
      if (amLiveNodeData == null) {
        return;
      }
      synchronized (YarnTwillRunnerService.this) {
        // Only create and start a controller if this run is not already registered.
        if (!controllers.contains(appName, runId)) {
          ZKClient zkClient = ZKClients.namespace(zkClientService, "/" + appName);
          YarnAppClient yarnAppClient = new VersionDetectYarnAppClientFactory().create(new Configuration(yarnConfig));
          YarnTwillController controller = listenController(
            new YarnTwillController(appName, runId, zkClient, amLiveNodeData, yarnAppClient));
          controllers.put(appName, runId, controller);
          controller.start();
        }
      }
    }

    @Override
    public void onFailure(Throwable t) {
      LOG.warn("Failed in fetching application instance node.", t);
    }
  }, Threads.SAME_THREAD_EXECUTOR);
}
/**
 * Stops the given scheduler and blocks until its termination, even across interrupts.
 * If the calling thread is interrupted while waiting, the interrupt status is restored
 * before returning.
 *
 * @param scheduler the scheduler to stop
 */
private void stopScheduler(final ScheduledExecutorService scheduler) {
  scheduler.shutdown();
  boolean wasInterrupted = false;
  // Keep waiting until termination completes; remember interrupts instead of propagating them.
  while (true) {
    try {
      scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
      break;
    } catch (InterruptedException e) {
      wasInterrupted = true;
    }
  }
  if (wasInterrupted) {
    // Restore the interrupt status swallowed above.
    Thread.currentThread().interrupt();
  }
}
/**
 * Creates a {@link Runnable} for renewing {@link SecureStore} for running applications.
 *
 * @param scheduler the schedule to schedule next renewal execution
 * @param renewer the {@link SecureStoreRenewer} to use for renewal
 * @param retryRuns if non-empty, only the given set of application name and run id that need to have
 *                  secure store renewed; if empty, renew all running applications
 * @param retryDelay the delay before retrying applications that are failed to have secure store renewed
 * @param timeUnit the unit for the {@code retryDelay}.
 * @return a {@link Runnable}
 */
private Runnable createSecureStoreUpdateRunnable(final ScheduledExecutorService scheduler,
                                                 final SecureStoreRenewer renewer,
                                                 final Multimap<String, RunId> retryRuns,
                                                 final long retryDelay, final TimeUnit timeUnit) {
  return new Runnable() {
    @Override
    public void run() {
      // Collects the set of running application runs
      Table<String, RunId, YarnTwillController> liveApps;
      synchronized (YarnTwillRunnerService.this) {
        if (retryRuns.isEmpty()) {
          liveApps = HashBasedTable.create(controllers);
        } else {
          // If this is a renew retry, only renew the one in the retryRuns set
          liveApps = HashBasedTable.create();
          for (Table.Cell<String, RunId, YarnTwillController> cell : controllers.cellSet()) {
            if (retryRuns.containsEntry(cell.getRowKey(), cell.getColumnKey())) {
              liveApps.put(cell.getRowKey(), cell.getColumnKey(), cell.getValue());
            }
          }
        }
      }

      Multimap<String, RunId> failureRenews = renewSecureStore(liveApps, renewer, false);

      if (!failureRenews.isEmpty()) {
        // If there are failure during the renewal, schedule a retry with a new Runnable.
        LOG.info("Schedule to retry on secure store renewal for applications {} in {} {}",
                 failureRenews.keySet(), retryDelay, timeUnit.name().toLowerCase());
        try {
          scheduler.schedule(
            createSecureStoreUpdateRunnable(scheduler, renewer, failureRenews, retryDelay, timeUnit),
            retryDelay, timeUnit);
        } catch (RejectedExecutionException e) {
          // If the renewal is stopped, the scheduler will be stopped,
          // hence this exception will be thrown and can be safely ignore.
        }
      }
    }
  };
}
/**
 * Renews the {@link SecureStore} for all the running applications.
 *
 * @param liveApps set of running applications that need to have secure store renewal
 * @param renewer the {@link SecureStoreRenewer} for renewal
 * @param mergeCredentials {@code true} to merge with existing credentials
 * @return a {@link Multimap} containing the application runs that were failed to have secure store renewed
 */
private Multimap<String, RunId> renewSecureStore(Table<String, RunId, YarnTwillController> liveApps,
                                                 SecureStoreRenewer renewer, boolean mergeCredentials) {
  Multimap<String, RunId> failureRenews = HashMultimap.create();

  // Renew the secure store for each running application
  for (Table.Cell<String, RunId, YarnTwillController> liveApp : liveApps.cellSet()) {
    String application = liveApp.getRowKey();
    RunId runId = liveApp.getColumnKey();
    YarnTwillController controller = liveApp.getValue();

    try {
      renewer.renew(application, runId, new YarnSecureStoreWriter(application, runId, controller, mergeCredentials));
    } catch (Exception e) {
      // A failure for one run must not abort renewal of the others; record it for retry.
      LOG.warn("Failed to renew secure store for {}:{}", application, runId, e);
      failureRenews.put(application, runId);
    }
  }

  return failureRenews;
}
/**
 * Creates the default {@link LocationFactory}: a {@link FileContextLocationFactory} rooted at
 * the current user's home directory as resolved by Hadoop's {@link FileContext}.
 *
 * @param configuration the Hadoop configuration to use
 * @return the default location factory
 */
private static LocationFactory createDefaultLocationFactory(Configuration configuration) {
  try {
    // Namespace the factory at the user home directory path.
    return new FileContextLocationFactory(
      configuration, FileContext.getFileContext(configuration).getHomeDirectory().toUri().getPath());
  } catch (IOException e) {
    throw Throwables.propagate(e);
  }
}
/**
 * A {@link SecureStoreWriter} for updating secure store for YARN application via a shared location with the
 * running application.
 */
private final class YarnSecureStoreWriter implements SecureStoreWriter {

  private final String application;
  private final RunId runId;
  private final YarnTwillController controller;
  // When true, existing credentials at the shared location are read and merged before writing.
  private final boolean mergeCredentials;

  private YarnSecureStoreWriter(String application, RunId runId,
                                YarnTwillController controller, boolean mergeCredentials) {
    this.application = application;
    this.runId = runId;
    this.controller = controller;
    this.mergeCredentials = mergeCredentials;
  }

  /**
   * Writes the given secure store to the run's shared credentials location and notifies the
   * application that the credentials have been updated. Only stores backed by Hadoop
   * {@link Credentials} are supported; other stores are logged and ignored.
   */
  @Override
  public void write(SecureStore secureStore) throws IOException {
    Object store = secureStore.getStore();
    if (!(store instanceof Credentials)) {
      LOG.warn("Only Hadoop Credentials is supported. Ignore update for {}:{} with secure store {}",
               application, runId, secureStore);
      return;
    }

    // Shared location: /<application>/<runId>/<credentials file name>.
    Location credentialsLocation = locationFactory.create(String.format("/%s/%s/%s", application, runId.getId(),
                                                                        Constants.Files.CREDENTIALS));
    LOG.debug("Writing new secure store for {}:{} to {}", application, runId, credentialsLocation);

    Credentials credentials = new Credentials();
    if (mergeCredentials) {
      // Try to read the old credentials.
      try (DataInputStream is = new DataInputStream(new BufferedInputStream(credentialsLocation.getInputStream()))) {
        credentials.readTokenStorageStream(is);
      } catch (FileNotFoundException e) {
        // This is safe to ignore as the file may not be there
      } catch (Exception e) {
        // Just log and proceed.
        LOG.warn("Failed to read existing credentials from {} for merging due to {}.",
                 credentialsLocation, e.toString());
      }
    }

    // Overwrite with credentials from the secure store
    credentials.addAll((Credentials) store);

    // Write to a temp file first, then rename, so readers never see a partially written file.
    Location tmpLocation = credentialsLocation.getTempFile(Constants.Files.CREDENTIALS);

    // Save the credentials store with user-only permission.
    try (DataOutputStream os = new DataOutputStream(new BufferedOutputStream(tmpLocation.getOutputStream("600")))) {
      credentials.writeTokenStorageToStream(os);
    }

    // Rename the tmp file into the credentials location
    tmpLocation.renameTo(credentialsLocation);

    // Notify the application that the credentials has been updated
    controller.secureStoreUpdated();

    LOG.debug("Secure store for {} {} saved to {}.", application, runId, credentialsLocation);
  }
}
}