(TWILL-189) Allows secure store update with different UGI
- Deprecated the old TwillRunner.scheduleSecureStoreUpdate
- Added new TwillRunner.setSecureStoreRenewer method
- Takes SecureStoreRenewer that writes SecureStore via
SecureStoreWriter
- The renewer implementation can use appropriate UGI.doAs to make
call to SecureStoreWriter
This closes #48 on Github.
Signed-off-by: Terence Yim <chtyim@apache.org>
diff --git a/twill-api/src/main/java/org/apache/twill/api/SecureStoreUpdater.java b/twill-api/src/main/java/org/apache/twill/api/SecureStoreUpdater.java
index 5912247..f468745 100644
--- a/twill-api/src/main/java/org/apache/twill/api/SecureStoreUpdater.java
+++ b/twill-api/src/main/java/org/apache/twill/api/SecureStoreUpdater.java
@@ -17,9 +17,14 @@
*/
package org.apache.twill.api;
+import org.apache.twill.api.security.SecureStoreRenewer;
+
/**
* Represents class capable of creating update of {@link SecureStore} for live applications.
+ *
+ * @deprecated This class doesn't handle user correctly, use {@link SecureStoreRenewer} instead
*/
+@Deprecated
public interface SecureStoreUpdater {
/**
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillRunner.java b/twill-api/src/main/java/org/apache/twill/api/TwillRunner.java
index 845ced8..bdb812c 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillRunner.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillRunner.java
@@ -17,6 +17,8 @@
*/
package org.apache.twill.api;
+import org.apache.twill.api.security.SecureStoreRenewer;
+import org.apache.twill.api.security.SecureStoreWriter;
import org.apache.twill.common.Cancellable;
import java.util.concurrent.TimeUnit;
@@ -105,7 +107,25 @@
* @param delay Delay between completion of one update call to the next one.
* @param unit time unit for the initialDelay and delay.
* @return A {@link Cancellable} for cancelling the scheduled update.
+ *
+ * @deprecated Use {@link #setSecureStoreRenewer(SecureStoreRenewer, long, long, long, TimeUnit)} instead.
*/
+ @Deprecated
Cancellable scheduleSecureStoreUpdate(final SecureStoreUpdater updater,
long initialDelay, long delay, TimeUnit unit);
+
+ /**
+ * Sets and schedules a periodic renewal of {@link SecureStore} using a given {@link SecureStoreRenewer}.
+ * There is always only one active {@link SecureStoreRenewer}. Setting a new renewer will replace the old one
+ * and setting up a new schedule.
+ *
+ * @param renewer a {@link SecureStoreRenewer} for renewing {@link SecureStore} for all applications.
+ * @param initialDelay delay before the first call to renew method.
+ * @param delay the delay between successful completion of one renew call to the next one.
+ * @param retryDelay the delay before the retrying the renewal if the call
+ * to {@link SecureStoreRenewer#renew(String, RunId, SecureStoreWriter)} raised exception.
+ * @param unit time unit for the initialDelay and period.
+ */
+ Cancellable setSecureStoreRenewer(SecureStoreRenewer renewer,
+ long initialDelay, long delay, long retryDelay, TimeUnit unit);
}
diff --git a/twill-api/src/main/java/org/apache/twill/api/security/SecureStoreRenewer.java b/twill-api/src/main/java/org/apache/twill/api/security/SecureStoreRenewer.java
new file mode 100644
index 0000000..0cb9740
--- /dev/null
+++ b/twill-api/src/main/java/org/apache/twill/api/security/SecureStoreRenewer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.api.security;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.SecureStore;
+import org.apache.twill.filesystem.Location;
+
+import java.io.IOException;
+
+/**
+ * This class is responsible for renewing the secure store used by application.
+ */
+public abstract class SecureStoreRenewer {
+
+ /**
+ * Renew the secure store for an application run. It must uses the {@link SecureStoreWriter} to update the
+ * {@link SecureStore}.
+ *
+ * @param application The name of the application.
+ * @param runId The runId of the live application.
+ * @param secureStoreWriter a {@link SecureStoreWriter} for writing out the new {@link SecureStore}.
+ * @throws IOException if renewal failed
+ */
+ public abstract void renew(String application, RunId runId, SecureStoreWriter secureStoreWriter) throws IOException;
+}
diff --git a/twill-api/src/main/java/org/apache/twill/api/security/SecureStoreWriter.java b/twill-api/src/main/java/org/apache/twill/api/security/SecureStoreWriter.java
new file mode 100644
index 0000000..fd8d04f
--- /dev/null
+++ b/twill-api/src/main/java/org/apache/twill/api/security/SecureStoreWriter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.api.security;
+
+import org.apache.twill.api.SecureStore;
+
+import java.io.IOException;
+
+/**
+ * A writer provided to {@link SecureStoreRenewer} for writing out {@link SecureStore} during renewal process.
+ */
+public interface SecureStoreWriter {
+
+ /**
+ * Writes the given {@link SecureStore}.
+ *
+ * @param secureStore the {@link SecureStore} to persist
+ * @throws IOException if failed to write out the {@link SecureStore}
+ */
+ void write(SecureStore secureStore) throws IOException;
+}
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreUpdater.java b/twill-yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreRenewer.java
similarity index 70%
rename from twill-yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreUpdater.java
rename to twill-yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreRenewer.java
index 02fd356..5593777 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreUpdater.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreRenewer.java
@@ -17,12 +17,11 @@
*/
package org.apache.twill.yarn;
-import com.google.common.base.Throwables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.twill.api.RunId;
-import org.apache.twill.api.SecureStore;
-import org.apache.twill.api.SecureStoreUpdater;
+import org.apache.twill.api.security.SecureStoreRenewer;
+import org.apache.twill.api.security.SecureStoreWriter;
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.internal.yarn.YarnUtils;
@@ -31,24 +30,20 @@
/**
* Package private class for updating location related secure store.
*/
-final class LocationSecureStoreUpdater implements SecureStoreUpdater {
+final class LocationSecureStoreRenewer extends SecureStoreRenewer {
private final Configuration configuration;
private final LocationFactory locationFactory;
- LocationSecureStoreUpdater(Configuration configuration, LocationFactory locationFactory) {
+ LocationSecureStoreRenewer(Configuration configuration, LocationFactory locationFactory) {
this.configuration = configuration;
this.locationFactory = locationFactory;
}
@Override
- public SecureStore update(String application, RunId runId) {
- try {
- Credentials credentials = new Credentials();
- YarnUtils.addDelegationTokens(configuration, locationFactory, credentials);
- return YarnSecureStore.create(credentials);
- } catch (IOException e) {
- throw Throwables.propagate(e);
- }
+ public void renew(String application, RunId runId, SecureStoreWriter secureStoreWriter) throws IOException {
+ Credentials credentials = new Credentials();
+ YarnUtils.addDelegationTokens(configuration, locationFactory, credentials);
+ secureStoreWriter.write(YarnSecureStore.create(credentials));
}
}
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index a31265e..269ffdf 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -25,6 +25,7 @@
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Iterables;
@@ -57,6 +58,8 @@
import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.api.security.SecureStoreRenewer;
+import org.apache.twill.api.security.SecureStoreWriter;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.filesystem.FileContextLocationFactory;
@@ -91,6 +94,7 @@
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.util.HashSet;
@@ -99,6 +103,7 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -214,45 +219,29 @@
@Override
public Cancellable scheduleSecureStoreUpdate(final SecureStoreUpdater updater,
long initialDelay, long delay, TimeUnit unit) {
- if (!UserGroupInformation.isSecurityEnabled()) {
- return new Cancellable() {
- @Override
- public void cancel() {
- // No-op
- }
- };
- }
-
synchronized (this) {
if (secureStoreScheduler == null) {
secureStoreScheduler = Executors.newSingleThreadScheduledExecutor(
- Threads.createDaemonThreadFactory("secure-store-updater"));
+ Threads.createDaemonThreadFactory("secure-store-renewer"));
}
}
final ScheduledFuture<?> future = secureStoreScheduler.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
- // Collects all <application, runId> pairs first
- Multimap<String, RunId> liveApps = HashMultimap.create();
- synchronized (YarnTwillRunnerService.this) {
- for (Table.Cell<String, RunId, YarnTwillController> cell : controllers.cellSet()) {
- liveApps.put(cell.getRowKey(), cell.getColumnKey());
- }
+ // Collects all live applications
+ Table<String, RunId, YarnTwillController> liveApps;
+ synchronized (this) {
+ liveApps = HashBasedTable.create(controllers);
}
- // Collect all secure stores that needs to be updated.
- Table<String, RunId, SecureStore> secureStores = HashBasedTable.create();
- for (Map.Entry<String, RunId> entry : liveApps.entries()) {
- try {
- secureStores.put(entry.getKey(), entry.getValue(), updater.update(entry.getKey(), entry.getValue()));
- } catch (Throwable t) {
- LOG.warn("Exception thrown by SecureStoreUpdater {}", updater, t);
+ // Update the secure store with merging = true
+ renewSecureStore(liveApps, new SecureStoreRenewer() {
+ @Override
+ public void renew(String application, RunId runId, SecureStoreWriter secureStoreWriter) throws IOException {
+ secureStoreWriter.write(updater.update(application, runId));
}
- }
-
- // Update secure stores.
- updateSecureStores(secureStores);
+ }, true);
}
}, initialDelay, delay, unit);
@@ -265,6 +254,37 @@
}
@Override
+ public Cancellable setSecureStoreRenewer(SecureStoreRenewer renewer, long initialDelay,
+ long delay, long retryDelay, TimeUnit unit) {
+ synchronized (this) {
+ if (secureStoreScheduler != null) {
+ // Shutdown and block until the schedule is stopped
+ stopScheduler(secureStoreScheduler);
+ }
+ secureStoreScheduler = Executors.newSingleThreadScheduledExecutor(
+ Threads.createDaemonThreadFactory("secure-store-renewer"));
+ }
+
+ final ScheduledExecutorService currentScheduler = secureStoreScheduler;
+ secureStoreScheduler.scheduleWithFixedDelay(
+ createSecureStoreUpdateRunnable(currentScheduler, renewer,
+ ImmutableMultimap.<String, RunId>of(), retryDelay, unit),
+ initialDelay, delay, unit);
+ return new Cancellable() {
+ @Override
+ public void cancel() {
+ synchronized (YarnTwillRunnerService.this) {
+ // Only cancel if the active scheduler is the same as the schedule bind to this cancellable
+ if (currentScheduler == secureStoreScheduler) {
+ secureStoreScheduler.shutdown();
+ secureStoreScheduler = null;
+ }
+ }
+ }
+ };
+ }
+
+ @Override
public TwillPreparer prepare(TwillRunnable runnable) {
return prepare(runnable, ResourceSpecification.BASIC);
}
@@ -367,8 +387,9 @@
if (delay <= 0) {
delay = (renewalInterval <= 2) ? 1 : renewalInterval / 2;
}
- scheduleSecureStoreUpdate(new LocationSecureStoreUpdater(yarnConfig, locationFactory),
- delay, delay, TimeUnit.MILLISECONDS);
+
+ setSecureStoreRenewer(new LocationSecureStoreRenewer(yarnConfig, locationFactory),
+ delay, delay, 10000L, TimeUnit.MILLISECONDS);
}
// Optionally create a LocationCache
@@ -613,62 +634,109 @@
}, Threads.SAME_THREAD_EXECUTOR);
}
-
- private void updateSecureStores(Table<String, RunId, SecureStore> secureStores) {
- for (Table.Cell<String, RunId, SecureStore> cell : secureStores.cellSet()) {
- Object store = cell.getValue().getStore();
- if (!(store instanceof Credentials)) {
- LOG.warn("Only Hadoop Credentials is supported. Ignore update for {}.", cell);
- continue;
- }
-
- Credentials credentials = (Credentials) store;
- if (credentials.getAllTokens().isEmpty()) {
- // Nothing to update.
- continue;
- }
-
- try {
- updateCredentials(cell.getRowKey(), cell.getColumnKey(), credentials);
- synchronized (YarnTwillRunnerService.this) {
- // Notify the application for secure store updates if it is still running.
- YarnTwillController controller = controllers.get(cell.getRowKey(), cell.getColumnKey());
- if (controller != null) {
- controller.secureStoreUpdated();
- }
+ /**
+ * Stops the given scheduler and block until is it stopped.
+ */
+ private void stopScheduler(final ScheduledExecutorService scheduler) {
+ scheduler.shutdown();
+ boolean interrupted = false;
+ try {
+ while (true) {
+ try {
+ scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+ return;
+ } catch (InterruptedException e) {
+ interrupted = true;
}
- } catch (Throwable t) {
- LOG.warn("Failed to update secure store for {}.", cell, t);
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
}
}
}
- private void updateCredentials(String application, RunId runId, Credentials updates) throws IOException {
- Location credentialsLocation = locationFactory.create(String.format("/%s/%s/%s", application, runId.getId(),
- Constants.Files.CREDENTIALS));
- // Try to read the old credentials.
- Credentials credentials = new Credentials();
- if (credentialsLocation.exists()) {
- try (DataInputStream is = new DataInputStream(new BufferedInputStream(credentialsLocation.getInputStream()))) {
- credentials.readTokenStorageStream(is);
+ /**
+ * Creates a {@link Runnable} for renewing {@link SecureStore} for running applications.
+ *
+ * @param scheduler the schedule to schedule next renewal execution
+ * @param renewer the {@link SecureStoreRenewer} to use for renewal
+ * @param retryRuns if non-empty, only the given set of application name and run id that need to have
+ * secure store renewed; if empty, renew all running applications
+ * @param retryDelay the delay before retrying applications that are failed to have secure store renewed
+ * @param timeUnit the unit for the {@code delay} and {@code failureDelay}.
+ * @return a {@link Runnable}
+ */
+ private Runnable createSecureStoreUpdateRunnable(final ScheduledExecutorService scheduler,
+ final SecureStoreRenewer renewer,
+ final Multimap<String, RunId> retryRuns,
+ final long retryDelay, final TimeUnit timeUnit) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ // Collects the set of running application runs
+ Table<String, RunId, YarnTwillController> liveApps;
+
+ synchronized (YarnTwillRunnerService.this) {
+ if (retryRuns.isEmpty()) {
+ liveApps = HashBasedTable.create(controllers);
+ } else {
+ // If this is a renew retry, only renew the one in the retryRuns set
+ liveApps = HashBasedTable.create();
+ for (Table.Cell<String, RunId, YarnTwillController> cell : controllers.cellSet()) {
+ if (retryRuns.containsEntry(cell.getRowKey(), cell.getColumnKey())) {
+ liveApps.put(cell.getRowKey(), cell.getColumnKey(), cell.getValue());
+ }
+ }
+ }
+ }
+
+ Multimap<String, RunId> failureRenews = renewSecureStore(liveApps, renewer, false);
+
+ if (!failureRenews.isEmpty()) {
+ // If there are failure during the renewal, schedule a retry with a new Runnable.
+ LOG.info("Schedule to retry on secure store renewal for applications {} in {} {}",
+ failureRenews.keySet(), retryDelay, timeUnit.name().toLowerCase());
+ try {
+ scheduler.schedule(
+ createSecureStoreUpdateRunnable(scheduler, renewer, failureRenews, retryDelay, timeUnit),
+ retryDelay, timeUnit);
+ } catch (RejectedExecutionException e) {
+ // If the renewal is stopped, the scheduler will be stopped,
+ // hence this exception will be thrown and can be safely ignore.
+ }
+ }
+ }
+ };
+ }
+
+ /**
+ * Renews the {@link SecureStore} for all the running applications.
+ *
+ * @param liveApps set of running applications that need to have secure store renewal
+ * @param renewer the {@link SecureStoreRenewer} for renewal
+ * @param mergeCredentials {@code true} to merge with existing credentials
+ * @return a {@link Multimap} containing the application runs that were failed to have secure store renewed
+ */
+ private Multimap<String, RunId> renewSecureStore(Table<String, RunId, YarnTwillController> liveApps,
+ SecureStoreRenewer renewer, boolean mergeCredentials) {
+ Multimap<String, RunId> failureRenews = HashMultimap.create();
+
+ // Renew the secure store for each running application
+ for (Table.Cell<String, RunId, YarnTwillController> liveApp : liveApps.cellSet()) {
+ String application = liveApp.getRowKey();
+ RunId runId = liveApp.getColumnKey();
+ YarnTwillController controller = liveApp.getValue();
+
+ try {
+ renewer.renew(application, runId, new YarnSecureStoreWriter(application, runId, controller, mergeCredentials));
+ } catch (Exception e) {
+ LOG.warn("Failed to renew secure store for {}:{}", application, runId, e);
+ failureRenews.put(application, runId);
}
}
- // Overwrite with the updates.
- credentials.addAll(updates);
-
- // Overwrite the credentials.
- Location tmpLocation = credentialsLocation.getTempFile(Constants.Files.CREDENTIALS);
-
- // Save the credentials store with user-only permission.
- try (DataOutputStream os = new DataOutputStream(new BufferedOutputStream(tmpLocation.getOutputStream("600")))) {
- credentials.writeTokenStorageToStream(os);
- }
-
- // Rename the tmp file into the credentials location
- tmpLocation.renameTo(credentialsLocation);
-
- LOG.debug("Secure store for {} {} saved to {}.", application, runId, credentialsLocation);
+ return failureRenews;
}
private static LocationFactory createDefaultLocationFactory(Configuration configuration) {
@@ -680,4 +748,70 @@
throw Throwables.propagate(e);
}
}
+
+ /**
+ * A {@link SecureStoreWriter} for updating secure store for YARN application via a shared location with the
+ * running application.
+ */
+ private final class YarnSecureStoreWriter implements SecureStoreWriter {
+
+ private final String application;
+ private final RunId runId;
+ private final YarnTwillController controller;
+ private final boolean mergeCredentials;
+
+ private YarnSecureStoreWriter(String application, RunId runId,
+ YarnTwillController controller, boolean mergeCredentials) {
+ this.application = application;
+ this.runId = runId;
+ this.controller = controller;
+ this.mergeCredentials = mergeCredentials;
+ }
+
+ @Override
+ public void write(SecureStore secureStore) throws IOException {
+ Object store = secureStore.getStore();
+ if (!(store instanceof Credentials)) {
+ LOG.warn("Only Hadoop Credentials is supported. Ignore update for {}:{} with secure store {}",
+ application, runId, secureStore);
+ return;
+ }
+
+ Location credentialsLocation = locationFactory.create(String.format("/%s/%s/%s", application, runId.getId(),
+ Constants.Files.CREDENTIALS));
+
+ LOG.debug("Writing new secure store for {}:{} to {}", application, runId, credentialsLocation);
+
+ Credentials credentials = new Credentials();
+ if (mergeCredentials) {
+ // Try to read the old credentials.
+ try (DataInputStream is = new DataInputStream(new BufferedInputStream(credentialsLocation.getInputStream()))) {
+ credentials.readTokenStorageStream(is);
+ } catch (FileNotFoundException e) {
+ // This is safe to ignore as the file may not be there
+ } catch (Exception e) {
+ // Just log and proceed.
+ LOG.warn("Failed to read existing credentials from {} for merging due to {}.",
+ credentialsLocation, e.toString());
+ }
+ }
+
+ // Overwrite with credentials from the secure store
+ credentials.addAll((Credentials) store);
+ Location tmpLocation = credentialsLocation.getTempFile(Constants.Files.CREDENTIALS);
+
+ // Save the credentials store with user-only permission.
+ try (DataOutputStream os = new DataOutputStream(new BufferedOutputStream(tmpLocation.getOutputStream("600")))) {
+ credentials.writeTokenStorageToStream(os);
+ }
+
+ // Rename the tmp file into the credentials location
+ tmpLocation.renameTo(credentialsLocation);
+
+ // Notify the application that the credentials has been updated
+ controller.secureStoreUpdated();
+
+ LOG.debug("Secure store for {} {} saved to {}.", application, runId, credentialsLocation);
+ }
+ }
}