blob: 6171eb8e7924dadc3479accf6eaaf7b36cc22685 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.balancer;
import java.io.Closeable;
import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The class provides utilities for key and token management.
*/
@InterfaceAudience.Private
public class KeyManager implements Closeable, DataEncryptionKeyFactory {
private static final Logger LOG = LoggerFactory.getLogger(KeyManager.class);
private final NamenodeProtocol namenode;
private final boolean isBlockTokenEnabled;
private final boolean encryptDataTransfer;
private boolean shouldRun;
private final BlockTokenSecretManager blockTokenSecretManager;
private final BlockKeyUpdater blockKeyUpdater;
private DataEncryptionKey encryptionKey;
/**
* Timer object for querying the current time. Separated out for
* unit testing.
*/
private Timer timer;
public KeyManager(String blockpoolID, NamenodeProtocol namenode,
boolean encryptDataTransfer, Configuration conf) throws IOException {
this.namenode = namenode;
this.encryptDataTransfer = encryptDataTransfer;
this.timer = new Timer();
final ExportedBlockKeys keys = namenode.getBlockKeys();
this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
if (isBlockTokenEnabled) {
long updateInterval = keys.getKeyUpdateInterval();
long tokenLifetime = keys.getTokenLifetime();
LOG.info("Block token params received from NN: update interval="
+ StringUtils.formatTime(updateInterval)
+ ", token lifetime=" + StringUtils.formatTime(tokenLifetime));
String encryptionAlgorithm = conf.get(
DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
this.blockTokenSecretManager = new BlockTokenSecretManager(
updateInterval, tokenLifetime, blockpoolID, encryptionAlgorithm);
this.blockTokenSecretManager.addKeys(keys);
// sync block keys with NN more frequently than NN updates its block keys
this.blockKeyUpdater = new BlockKeyUpdater(updateInterval / 4);
this.shouldRun = true;
} else {
this.blockTokenSecretManager = null;
this.blockKeyUpdater = null;
}
}
public void startBlockKeyUpdater() {
if (blockKeyUpdater != null) {
blockKeyUpdater.daemon.start();
}
}
/** Get an access token for a block. */
public Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb
) throws IOException {
if (!isBlockTokenEnabled) {
return BlockTokenSecretManager.DUMMY_TOKEN;
} else {
if (!shouldRun) {
throw new IOException(
"Cannot get access token since BlockKeyUpdater is not running");
}
return blockTokenSecretManager.generateToken(null, eb,
EnumSet.of(BlockTokenIdentifier.AccessMode.REPLACE, BlockTokenIdentifier.AccessMode.COPY));
}
}
@Override
public DataEncryptionKey newDataEncryptionKey() {
if (encryptDataTransfer) {
synchronized (this) {
if (encryptionKey == null ||
encryptionKey.expiryDate < timer.now()) {
// Encryption Key (EK) is generated from Block Key (BK).
// Check if EK is expired, and generate a new one using the current BK
// if so, otherwise continue to use the previously generated EK.
//
// It's important to make sure that when EK is not expired, the BK
// used to generate the EK is not expired and removed, because
// the same BK will be used to re-generate the EK
// by BlockTokenSecretManager.
//
// The current implementation ensures that when an EK is not expired
// (within tokenLifetime), the BK that's used to generate it
// still has at least "keyUpdateInterval" of life time before
// the BK gets expired and removed.
// See BlockTokenSecretManager for details.
LOG.debug("Generating new data encryption key because current key "
+ (encryptionKey == null ?
"is null." : "expired on " + encryptionKey.expiryDate));
encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
}
return encryptionKey;
}
} else {
return null;
}
}
@Override
public void close() {
shouldRun = false;
try {
if (blockKeyUpdater != null) {
blockKeyUpdater.daemon.interrupt();
}
} catch(Exception e) {
LOG.warn("Exception shutting down access key updater thread", e);
}
}
/**
* Periodically updates access keys.
*/
class BlockKeyUpdater implements Runnable, Closeable {
private final Daemon daemon = new Daemon(this);
private final long sleepInterval;
BlockKeyUpdater(final long sleepInterval) {
this.sleepInterval = sleepInterval;
LOG.info("Update block keys every " + StringUtils.formatTime(sleepInterval));
}
@Override
public void run() {
try {
while (shouldRun) {
try {
blockTokenSecretManager.addKeys(namenode.getBlockKeys());
} catch (IOException e) {
LOG.error("Failed to set keys", e);
}
Thread.sleep(sleepInterval);
}
} catch (InterruptedException e) {
LOG.debug("InterruptedException in block key updater thread", e);
} catch (Throwable e) {
LOG.error("Exception in block key updater thread", e);
shouldRun = false;
}
}
@Override
public void close() throws IOException {
try {
daemon.interrupt();
} catch(Exception e) {
LOG.warn("Exception shutting down key updater thread", e);
}
}
}
}