blob: 89fd40a0d470b9047ec41aa29fcfffa8545e1999 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
import org.apache.hadoop.hdfs.server.namenode.ReencryptionHandler.ReencryptionBatch;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY;
/**
 * Class for finalizing re-encrypt EDEK operations, by updating file xattrs with
 * edeks returned from re-encryption.
 * <p>
 * The tasks are submitted by ReencryptionHandler.
 * <p>
 * It is assumed only 1 Updater will be running, since updating file xattrs
 * requires the namespace write lock, and the performance gain from
 * multi-threading is limited.
 */
@InterfaceAudience.Private
public final class ReencryptionUpdater implements Runnable {

  public static final Logger LOG =
      LoggerFactory.getLogger(ReencryptionUpdater.class);

  private volatile boolean shouldPauseForTesting = false;
  private volatile int pauseAfterNthCheckpoint = 0;
  private volatile long pauseZoneId = 0;

  /**
   * Maximum fraction of wall-clock time the updater may spend holding the
   * write locks; values >= 1.0 disable throttling.
   */
  private double throttleLimitRatio;
  /** Measures total elapsed time since the last throttle() call. */
  private final StopWatch throttleTimerAll = new StopWatch();
  /** Measures time spent holding the FSN write lock since last throttle(). */
  private final StopWatch throttleTimerLocked = new StopWatch();

  /** Back-off (ms) before retrying a task after a retriable failure. */
  private volatile long faultRetryInterval = 60000;
  private volatile boolean isRunning = false;

  /**
   * Class to track re-encryption submissions of a single zone. It contains
   * all the submitted futures, and statistics about how far the futures are
   * processed.
   */
  static final class ZoneSubmissionTracker {
    private boolean submissionDone;
    private LinkedList<Future> tasks;
    private int numCheckpointed;
    private int numFutureDone;

    ZoneSubmissionTracker() {
      submissionDone = false;
      tasks = new LinkedList<>();
      numCheckpointed = 0;
      numFutureDone = 0;
    }

    /** Clears all submitted tasks and statistics. */
    void reset() {
      submissionDone = false;
      tasks.clear();
      numCheckpointed = 0;
      numFutureDone = 0;
    }

    /** @return the live (mutable) list of submitted futures, in order. */
    LinkedList<Future> getTasks() {
      return tasks;
    }

    /** Cancels every submitted future, interrupting running ones. */
    void cancelAllTasks() {
      if (!tasks.isEmpty()) {
        LOG.info("Cancelling {} re-encryption tasks", tasks.size());
        for (Future f : tasks) {
          f.cancel(true);
        }
      }
    }

    void addTask(final Future task) {
      tasks.add(task);
    }

    /**
     * @return true once submission is marked done and every submitted task
     *         has been checkpointed (removed from the list).
     */
    private boolean isCompleted() {
      return submissionDone && tasks.isEmpty();
    }

    void setSubmissionDone() {
      submissionDone = true;
    }
  }

  /**
   * Class representing the task for one batch of a re-encryption command. It
   * also contains statistics about how far this single batch has been executed.
   */
  static final class ReencryptionTask {
    private final long zoneId;
    private boolean processed = false;
    private int numFilesUpdated = 0;
    private int numFailures = 0;
    private String lastFile = null;
    private final ReencryptionBatch batch;

    ReencryptionTask(final long id, final int failures,
        final ReencryptionBatch theBatch) {
      zoneId = id;
      numFailures = failures;
      batch = theBatch;
    }
  }

  /**
   * Class that encapsulates re-encryption details of a file. It contains the
   * file inode, stores the initial edek of the file, and the new edek
   * after re-encryption.
   * <p>
   * Assumptions are the object initialization happens when dir lock is held,
   * and inode is valid and is encrypted during initialization.
   * <p>
   * Namespace changes may happen during re-encryption, and if inode is changed
   * the re-encryption is skipped.
   */
  static final class FileEdekInfo {
    private final long inodeId;
    private final EncryptedKeyVersion existingEdek;
    private EncryptedKeyVersion edek = null;

    FileEdekInfo(FSDirectory dir, INodeFile inode) throws IOException {
      assert dir.hasReadLock();
      Preconditions.checkNotNull(inode, "INodeFile is null");
      inodeId = inode.getId();
      final FileEncryptionInfo fei = FSDirEncryptionZoneOp
          .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode));
      Preconditions.checkNotNull(fei,
          "FileEncryptionInfo is null for " + inodeId);
      existingEdek = EncryptedKeyVersion
          .createForDecryption(fei.getKeyName(), fei.getEzKeyVersionName(),
              fei.getIV(), fei.getEncryptedDataEncryptionKey());
    }

    long getInodeId() {
      return inodeId;
    }

    EncryptedKeyVersion getExistingEdek() {
      return existingEdek;
    }

    void setEdek(final EncryptedKeyVersion ekv) {
      assert ekv != null;
      edek = ekv;
    }
  }

  @VisibleForTesting
  synchronized void pauseForTesting() {
    shouldPauseForTesting = true;
    LOG.info("Pausing re-encrypt updater for testing.");
    notify();
  }

  @VisibleForTesting
  synchronized void resumeForTesting() {
    shouldPauseForTesting = false;
    LOG.info("Resuming re-encrypt updater for testing.");
    notify();
  }

  /**
   * Arranges for the updater to pause once the given zone's tracker has seen
   * {@code count} completed futures.
   */
  @VisibleForTesting
  void pauseForTestingAfterNthCheckpoint(final long zoneId, final int count) {
    assert pauseAfterNthCheckpoint == 0;
    pauseAfterNthCheckpoint = count;
    pauseZoneId = zoneId;
  }

  @VisibleForTesting
  boolean isRunning() {
    return isRunning;
  }

  private final FSDirectory dir;
  private final CompletionService<ReencryptionTask> batchService;
  private final ReencryptionHandler handler;

  ReencryptionUpdater(final FSDirectory fsd,
      final CompletionService<ReencryptionTask> service,
      final ReencryptionHandler rh, final Configuration conf) {
    dir = fsd;
    batchService = service;
    handler = rh;
    this.throttleLimitRatio =
        conf.getDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY,
            DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_DEFAULT);
    Preconditions.checkArgument(throttleLimitRatio > 0.0,
        DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY
            + " is not positive.");
  }

  /**
   * Called by the submission thread to indicate all tasks have been submitted.
   * If this is called but no tasks have been submitted, the re-encryption is
   * considered complete.
   *
   * @param zoneId Id of the zone inode.
   * @throws IOException if completing a zone with no submitted tasks fails.
   * @throws InterruptedException if interrupted while completing the zone.
   */
  void markZoneSubmissionDone(final long zoneId)
      throws IOException, InterruptedException {
    final ZoneSubmissionTracker tracker = handler.getTracker(zoneId);
    if (tracker != null && !tracker.getTasks().isEmpty()) {
      tracker.setSubmissionDone();
    } else {
      // Caller thinks submission is done, but no tasks submitted - meaning
      // no files in the EZ need to be re-encrypted. Complete directly.
      handler.addDummyTracker(zoneId, tracker);
    }
  }

  /**
   * Main loop: repeatedly takes completed tasks and processes them. Exits on
   * interruption or on any unexpected (non-IO, non-cancellation) throwable.
   */
  @Override
  public void run() {
    isRunning = true;
    throttleTimerAll.start();
    while (true) {
      try {
        // Assuming single-threaded updater.
        takeAndProcessTasks();
      } catch (InterruptedException ie) {
        LOG.warn("Re-encryption updater thread interrupted. Exiting.");
        Thread.currentThread().interrupt();
        isRunning = false;
        return;
      } catch (IOException | CancellationException e) {
        LOG.warn("Re-encryption updater thread exception.", e);
      } catch (Throwable t) {
        LOG.error("Re-encryption updater thread exiting.", t);
        isRunning = false;
        return;
      }
    }
  }

  /**
   * Process a completed ReencryptionTask. Each inode id is resolved to an INode
   * object, skip if the inode is deleted.
   * <p>
   * Only file xattr is updated by this method. Re-encryption progress is not
   * updated. Entries that are skipped are removed from the batch so they are
   * not persisted later.
   *
   * @param zoneNodePath full path of the EZ inode.
   * @param task the completed task.
   * @throws IOException if updating a file's encryption info fails.
   * @throws InterruptedException if interrupted.
   */
  private void processTaskEntries(final String zoneNodePath,
      final ReencryptionTask task) throws IOException, InterruptedException {
    assert dir.hasWriteLock();
    if (!task.batch.isEmpty() && task.numFailures == 0) {
      LOG.debug(
          "Updating file xattrs for re-encrypting zone {}," + " starting at {}",
          zoneNodePath, task.batch.getFirstFilePath());
      final int batchSize = task.batch.size();
      for (Iterator<FileEdekInfo> it = task.batch.getBatch().iterator();
           it.hasNext();) {
        FileEdekInfo entry = it.next();
        // resolve the inode again, and skip if it doesn't exist
        LOG.trace("Updating {} for re-encryption.", entry.getInodeId());
        final INode inode = dir.getInode(entry.getInodeId());
        if (inode == null) {
          LOG.debug("INode {} doesn't exist, skipping re-encrypt.",
              entry.getInodeId());
          // also remove from batch so later it's not saved.
          it.remove();
          continue;
        }

        // Cautiously check file encryption info, and only update if we're sure
        // it's still using the same edek.
        Preconditions.checkNotNull(entry.edek);
        final FileEncryptionInfo fei = FSDirEncryptionZoneOp
            .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode));
        if (fei == null) {
          // No encryption info for this inode anymore. Skip it rather than
          // letting an NPE escape and terminate the updater thread.
          LOG.debug("Inode {} has no file encryption info, skipping"
              + " re-encryption.", entry.getInodeId());
          it.remove();
          continue;
        }
        if (!fei.getKeyName().equals(entry.edek.getEncryptionKeyName())) {
          LOG.debug("Inode {} EZ key changed, skipping re-encryption.",
              entry.getInodeId());
          it.remove();
          continue;
        }
        if (fei.getEzKeyVersionName()
            .equals(entry.edek.getEncryptionKeyVersionName())) {
          LOG.debug(
              "Inode {} EZ key version unchanged, skipping re-encryption.",
              entry.getInodeId());
          it.remove();
          continue;
        }
        if (!Arrays.equals(fei.getEncryptedDataEncryptionKey(),
            entry.existingEdek.getEncryptedKeyVersion().getMaterial())) {
          LOG.debug("Inode {} existing edek changed, skipping re-encryption",
              entry.getInodeId());
          it.remove();
          continue;
        }
        FileEncryptionInfo newFei = new FileEncryptionInfo(fei.getCipherSuite(),
            fei.getCryptoProtocolVersion(),
            entry.edek.getEncryptedKeyVersion().getMaterial(),
            entry.edek.getEncryptedKeyIv(), fei.getKeyName(),
            entry.edek.getEncryptionKeyVersionName());
        final INodesInPath iip = INodesInPath.fromINode(inode);
        FSDirEncryptionZoneOp
            .setFileEncryptionInfo(dir, iip, newFei, XAttrSetFlag.REPLACE);
        task.lastFile = iip.getPath();
        ++task.numFilesUpdated;
      }

      LOG.info("Updated xattrs on {}({}) files in zone {} for re-encryption,"
              + " starting:{}.", task.numFilesUpdated, batchSize,
          zoneNodePath, task.batch.getFirstFilePath());
    }
    task.processed = true;
  }

  /**
   * Iterate tasks for the given zone, and update progress accordingly. The
   * checkpoint indicates all files before it are done re-encryption, so it will
   * be updated to the position where all tasks before are completed.
   *
   * @param zoneNode the EZ inode.
   * @param tracker the zone submission tracker.
   * @return the list containing the last checkpointed xattr. Empty if
   *         no checkpoint happened. If the tracker is completed, the result of
   *         {@code handler.completeReencryption(zoneNode)} is returned instead.
   * @throws ExecutionException if retrieving a completed future fails.
   * @throws IOException if completing the re-encryption fails.
   * @throws InterruptedException if interrupted.
   */
  private List<XAttr> processCheckpoints(final INode zoneNode,
      final ZoneSubmissionTracker tracker)
      throws ExecutionException, IOException, InterruptedException {
    assert dir.hasWriteLock();
    final long zoneId = zoneNode.getId();
    final String zonePath = zoneNode.getFullPathName();
    final ZoneReencryptionStatus status =
        handler.getReencryptionStatus().getZoneStatus(zoneId);
    assert status != null;
    // always start from the beginning, because the checkpoint means all files
    // before it are re-encrypted.
    final LinkedList<Future> tasks = tracker.getTasks();
    final List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
    ListIterator<Future> iter = tasks.listIterator();
    synchronized (handler) {
      while (iter.hasNext()) {
        Future<ReencryptionTask> curr = iter.next();
        if (curr.isCancelled()) {
          break;
        }
        if (!curr.isDone() || !curr.get().processed) {
          // still has earlier tasks not completed, skip here.
          break;
        }
        ReencryptionTask task = curr.get();
        LOG.debug("Updating re-encryption checkpoint with completed task."
            + " last: {} size:{}.", task.lastFile, task.batch.size());
        assert zoneId == task.zoneId;
        try {
          final XAttr xattr = FSDirEncryptionZoneOp
              .updateReencryptionProgress(dir, zoneNode, status, task.lastFile,
                  task.numFilesUpdated, task.numFailures);
          xAttrs.clear();
          xAttrs.add(xattr);
        } catch (IOException ie) {
          LOG.warn("Failed to update re-encrypted progress to xattr" +
              " for zone {}", zonePath, ie);
          ++task.numFailures;
        }
        ++tracker.numCheckpointed;
        iter.remove();
      }
    }
    if (tracker.isCompleted()) {
      LOG.debug("Removed re-encryption tracker for zone {} because it completed"
          + " with {} tasks.", zonePath, tracker.numCheckpointed);
      return handler.completeReencryption(zoneNode);
    }
    return xAttrs;
  }

  /**
   * Takes the next completed task from the completion service and processes
   * it under the FSNamesystem write lock, retrying on retriable failures.
   */
  private void takeAndProcessTasks() throws Exception {
    final Future<ReencryptionTask> completed = batchService.take();
    throttle();
    checkPauseForTesting();
    if (completed.isCancelled()) {
      // Ignore canceled zones. The cancellation is edit-logged by the handler.
      LOG.debug("Skipped a canceled re-encryption task");
      return;
    }
    final ReencryptionTask task = completed.get();

    boolean shouldRetry;
    do {
      dir.getFSNamesystem().writeLock();
      try {
        throttleTimerLocked.start();
        processTask(task);
        shouldRetry = false;
      } catch (RetriableException | SafeModeException re) {
        // Keep retrying until succeed.
        LOG.info("Exception when processing re-encryption task for zone {}, "
            + "retrying...", task.zoneId, re);
        shouldRetry = true;
      } catch (IOException ioe) {
        LOG.warn("Failure processing re-encryption task for zone {}",
            task.zoneId, ioe);
        ++task.numFailures;
        task.processed = true;
        shouldRetry = false;
      } finally {
        dir.getFSNamesystem().writeUnlock("reencryptUpdater");
        throttleTimerLocked.stop();
      }
      // logSync regardless, to prevent edit log buffer overflow triggering
      // logSync inside FSN writelock.
      dir.getEditLog().logSync();
      if (shouldRetry) {
        // Back off only after the FSN write lock is released. Sleeping while
        // holding it would block all other namespace operations (including
        // leaving safe mode) and count as locked time for throttling.
        Thread.sleep(faultRetryInterval);
      }
    } while (shouldRetry);
  }

  /**
   * Applies one completed task: updates file xattrs, advances the zone
   * checkpoint, then persists the batch and logs the checkpoint xattr.
   * Called from takeAndProcessTasks while the FSNamesystem write lock is held.
   */
  private void processTask(ReencryptionTask task)
      throws InterruptedException, ExecutionException, IOException {
    final List<XAttr> xAttrs;
    final String zonePath;
    dir.writeLock();
    try {
      handler.getTraverser().checkINodeReady(task.zoneId);
      final INode zoneNode = dir.getInode(task.zoneId);
      if (zoneNode == null) {
        // ez removed.
        return;
      }
      zonePath = zoneNode.getFullPathName();
      LOG.info("Processing returned re-encryption task for zone {}({}), "
              + "batch size {}, start:{}", zonePath, task.zoneId,
          task.batch.size(), task.batch.getFirstFilePath());
      final ZoneSubmissionTracker tracker =
          handler.getTracker(zoneNode.getId());
      if (tracker == null) {
        // re-encryption canceled.
        LOG.info("Re-encryption was canceled.");
        return;
      }
      tracker.numFutureDone++;
      EncryptionFaultInjector.getInstance().reencryptUpdaterProcessOneTask();
      processTaskEntries(zonePath, task);
      EncryptionFaultInjector.getInstance().reencryptUpdaterProcessCheckpoint();
      xAttrs = processCheckpoints(zoneNode, tracker);
    } finally {
      dir.writeUnlock();
    }
    FSDirEncryptionZoneOp.saveFileXAttrsForBatch(dir, task.batch.getBatch());
    if (!xAttrs.isEmpty()) {
      dir.getEditLog().logSetXAttrs(zonePath, xAttrs, false);
    }
  }

  /**
   * Test hook: blocks the updater while a test has requested a pause, or once
   * the configured zone reaches the configured number of completed futures.
   */
  private synchronized void checkPauseForTesting() throws InterruptedException {
    assert !dir.hasWriteLock();
    assert !dir.getFSNamesystem().hasWriteLock();
    if (pauseAfterNthCheckpoint != 0) {
      ZoneSubmissionTracker tracker =
          handler.unprotectedGetTracker(pauseZoneId);
      if (tracker != null) {
        if (tracker.numFutureDone == pauseAfterNthCheckpoint) {
          shouldPauseForTesting = true;
          pauseAfterNthCheckpoint = 0;
        }
      }
    }
    while (shouldPauseForTesting) {
      LOG.info("Sleeping in the re-encryption updater for unit test.");
      wait();
      LOG.info("Continuing re-encryption updater after pausing.");
    }
  }

  /**
   * Throttles the ReencryptionUpdater to avoid contending for the FSN/FSD
   * write locks. Compares the time spent holding the lock against the total
   * elapsed time since the last call. If the locked share exceeds
   * throttleLimitRatio, it sleeps long enough to bring the share back to the
   * ratio. Both timers are then reset.
   */
  private void throttle() throws InterruptedException {
    if (throttleLimitRatio >= 1.0) {
      return;
    }

    // Sample the overall timer once so that expect and sleepMs are computed
    // against the same instant.
    final long all = throttleTimerAll.now(TimeUnit.MILLISECONDS);
    final long expect = (long) (all * throttleLimitRatio);
    final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Re-encryption updater throttling expect: {}, actual: {},"
          + " throttleTimerAll:{}", expect, actual, all);
    }
    if (expect - actual < 0) {
      // in case throttleLimitRatio is very small, expect will be 0.
      // so sleepMs should not be calculated from expect, to really meet the
      // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
      // should be 1000 - throttleTimerAll.now()
      // Clamp at 0: rounding of the double division can otherwise yield a
      // negative value, and Thread.sleep rejects negative arguments with an
      // IllegalArgumentException that would terminate the updater thread.
      final long sleepMs =
          Math.max(0L, (long) (actual / throttleLimitRatio) - all);
      LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
      Thread.sleep(sleepMs);
    }
    throttleTimerAll.reset().start();
    throttleTimerLocked.reset();
  }
}