blob: 47d6583338b3a7880fab61bc3e921e3baabd8a2c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Shell;
@Private
@Unstable
/**
* A history server state storage implementation that supports any persistent
* storage that adheres to the FileSystem interface.
*/
public class HistoryServerFileSystemStateStoreService
extends HistoryServerStateStoreService {
public static final Log LOG =
LogFactory.getLog(HistoryServerFileSystemStateStoreService.class);
private static final String ROOT_STATE_DIR_NAME = "HistoryServerState";
private static final String TOKEN_STATE_DIR_NAME = "tokens";
private static final String TOKEN_KEYS_DIR_NAME = "keys";
private static final String TOKEN_BUCKET_DIR_PREFIX = "tb_";
private static final String TOKEN_BUCKET_NAME_FORMAT =
TOKEN_BUCKET_DIR_PREFIX + "%03d";
private static final String TOKEN_MASTER_KEY_FILE_PREFIX = "key_";
private static final String TOKEN_FILE_PREFIX = "token_";
private static final String TMP_FILE_PREFIX = "tmp-";
private static final String UPDATE_TMP_FILE_PREFIX = "update-";
private static final FsPermission DIR_PERMISSIONS =
new FsPermission((short)0700);
private static final FsPermission FILE_PERMISSIONS = Shell.WINDOWS
? new FsPermission((short) 0700) : new FsPermission((short) 0400);
private static final int NUM_TOKEN_BUCKETS = 1000;
private FileSystem fs;
private Path rootStatePath;
private Path tokenStatePath;
private Path tokenKeysStatePath;
@Override
protected void initStorage(Configuration conf)
throws IOException {
final String storeUri = conf.get(JHAdminConfig.MR_HS_FS_STATE_STORE_URI);
if (storeUri == null) {
throw new IOException("No store location URI configured in " +
JHAdminConfig.MR_HS_FS_STATE_STORE_URI);
}
LOG.info("Using " + storeUri + " for history server state storage");
rootStatePath = new Path(storeUri, ROOT_STATE_DIR_NAME);
}
@Override
protected void startStorage() throws IOException {
fs = createFileSystem();
createDir(rootStatePath);
tokenStatePath = new Path(rootStatePath, TOKEN_STATE_DIR_NAME);
createDir(tokenStatePath);
tokenKeysStatePath = new Path(tokenStatePath, TOKEN_KEYS_DIR_NAME);
createDir(tokenKeysStatePath);
for (int i=0; i < NUM_TOKEN_BUCKETS; ++i) {
createDir(getTokenBucketPath(i));
}
}
FileSystem createFileSystem() throws IOException {
return rootStatePath.getFileSystem(getConfig());
}
@Override
protected void closeStorage() throws IOException {
// don't close the filesystem as it's part of the filesystem cache
// and other clients may still be using it
}
@Override
public HistoryServerState loadState() throws IOException {
LOG.info("Loading history server state from " + rootStatePath);
HistoryServerState state = new HistoryServerState();
loadTokenState(state);
return state;
}
@Override
public void storeToken(MRDelegationTokenIdentifier tokenId,
Long renewDate) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Storing token " + tokenId.getSequenceNumber());
}
Path tokenPath = getTokenPath(tokenId);
if (fs.exists(tokenPath)) {
throw new IOException(tokenPath + " already exists");
}
createNewFile(tokenPath, buildTokenData(tokenId, renewDate));
}
@Override
public void updateToken(MRDelegationTokenIdentifier tokenId,
Long renewDate) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Updating token " + tokenId.getSequenceNumber());
}
// Files cannot be atomically replaced, therefore we write a temporary
// update file, remove the original token file, then rename the update
// file to the token file. During recovery either the token file will be
// used or if that is missing and an update file is present then the
// update file is used.
Path tokenPath = getTokenPath(tokenId);
Path tmp = new Path(tokenPath.getParent(),
UPDATE_TMP_FILE_PREFIX + tokenPath.getName());
writeFile(tmp, buildTokenData(tokenId, renewDate));
try {
deleteFile(tokenPath);
} catch (IOException e) {
fs.delete(tmp, false);
throw e;
}
if (!fs.rename(tmp, tokenPath)) {
throw new IOException("Could not rename " + tmp + " to " + tokenPath);
}
}
@Override
public void removeToken(MRDelegationTokenIdentifier tokenId)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing token " + tokenId.getSequenceNumber());
}
deleteFile(getTokenPath(tokenId));
}
@Override
public void storeTokenMasterKey(DelegationKey key) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Storing master key " + key.getKeyId());
}
Path keyPath = new Path(tokenKeysStatePath,
TOKEN_MASTER_KEY_FILE_PREFIX + key.getKeyId());
if (fs.exists(keyPath)) {
throw new FileAlreadyExistsException(keyPath + " already exists");
}
ByteArrayOutputStream memStream = new ByteArrayOutputStream();
DataOutputStream dataStream = new DataOutputStream(memStream);
try {
key.write(dataStream);
dataStream.close();
dataStream = null;
} finally {
IOUtils.cleanup(LOG, dataStream);
}
createNewFile(keyPath, memStream.toByteArray());
}
@Override
public void removeTokenMasterKey(DelegationKey key)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing master key " + key.getKeyId());
}
Path keyPath = new Path(tokenKeysStatePath,
TOKEN_MASTER_KEY_FILE_PREFIX + key.getKeyId());
deleteFile(keyPath);
}
private static int getBucketId(MRDelegationTokenIdentifier tokenId) {
return tokenId.getSequenceNumber() % NUM_TOKEN_BUCKETS;
}
private Path getTokenBucketPath(int bucketId) {
return new Path(tokenStatePath,
String.format(TOKEN_BUCKET_NAME_FORMAT, bucketId));
}
private Path getTokenPath(MRDelegationTokenIdentifier tokenId) {
Path bucketPath = getTokenBucketPath(getBucketId(tokenId));
return new Path(bucketPath,
TOKEN_FILE_PREFIX + tokenId.getSequenceNumber());
}
private void createDir(Path dir) throws IOException {
try {
FileStatus status = fs.getFileStatus(dir);
if (!status.isDirectory()) {
throw new FileAlreadyExistsException("Unexpected file in store: "
+ dir);
}
if (!status.getPermission().equals(DIR_PERMISSIONS)) {
fs.setPermission(dir, DIR_PERMISSIONS);
}
} catch (FileNotFoundException e) {
fs.mkdirs(dir, DIR_PERMISSIONS);
}
}
private void createNewFile(Path file, byte[] data)
throws IOException {
Path tmp = new Path(file.getParent(), TMP_FILE_PREFIX + file.getName());
writeFile(tmp, data);
try {
if (!fs.rename(tmp, file)) {
throw new IOException("Could not rename " + tmp + " to " + file);
}
} catch (IOException e) {
fs.delete(tmp, false);
throw e;
}
}
private void writeFile(Path file, byte[] data) throws IOException {
final int WRITE_BUFFER_SIZE = 4096;
FSDataOutputStream out = fs.create(file, FILE_PERMISSIONS, true,
WRITE_BUFFER_SIZE, fs.getDefaultReplication(file),
fs.getDefaultBlockSize(file), null);
try {
try {
out.write(data);
out.close();
out = null;
} finally {
IOUtils.cleanup(LOG, out);
}
} catch (IOException e) {
fs.delete(file, false);
throw e;
}
}
private byte[] readFile(Path file, long numBytes) throws IOException {
byte[] data = new byte[(int)numBytes];
FSDataInputStream in = fs.open(file);
try {
in.readFully(data);
} finally {
IOUtils.cleanup(LOG, in);
}
return data;
}
private void deleteFile(Path file) throws IOException {
boolean deleted;
try {
deleted = fs.delete(file, false);
} catch (FileNotFoundException e) {
deleted = true;
}
if (!deleted) {
throw new IOException("Unable to delete " + file);
}
}
private byte[] buildTokenData(MRDelegationTokenIdentifier tokenId,
Long renewDate) throws IOException {
ByteArrayOutputStream memStream = new ByteArrayOutputStream();
DataOutputStream dataStream = new DataOutputStream(memStream);
try {
tokenId.write(dataStream);
dataStream.writeLong(renewDate);
dataStream.close();
dataStream = null;
} finally {
IOUtils.cleanup(LOG, dataStream);
}
return memStream.toByteArray();
}
private void loadTokenMasterKey(HistoryServerState state, Path keyFile,
long numKeyFileBytes) throws IOException {
DelegationKey key = new DelegationKey();
byte[] keyData = readFile(keyFile, numKeyFileBytes);
DataInputStream in =
new DataInputStream(new ByteArrayInputStream(keyData));
try {
key.readFields(in);
} finally {
IOUtils.cleanup(LOG, in);
}
state.tokenMasterKeyState.add(key);
}
private void loadTokenFromBucket(int bucketId,
HistoryServerState state, Path tokenFile, long numTokenFileBytes)
throws IOException {
MRDelegationTokenIdentifier token =
loadToken(state, tokenFile, numTokenFileBytes);
int tokenBucketId = getBucketId(token);
if (tokenBucketId != bucketId) {
throw new IOException("Token " + tokenFile
+ " should be in bucket " + tokenBucketId + ", found in bucket "
+ bucketId);
}
}
private MRDelegationTokenIdentifier loadToken(HistoryServerState state,
Path tokenFile, long numTokenFileBytes) throws IOException {
MRDelegationTokenIdentifier tokenId = new MRDelegationTokenIdentifier();
long renewDate;
byte[] tokenData = readFile(tokenFile, numTokenFileBytes);
DataInputStream in =
new DataInputStream(new ByteArrayInputStream(tokenData));
try {
tokenId.readFields(in);
renewDate = in.readLong();
} finally {
IOUtils.cleanup(LOG, in);
}
state.tokenState.put(tokenId, renewDate);
return tokenId;
}
private int loadTokensFromBucket(HistoryServerState state, Path bucket)
throws IOException {
String numStr =
bucket.getName().substring(TOKEN_BUCKET_DIR_PREFIX.length());
final int bucketId = Integer.parseInt(numStr);
int numTokens = 0;
FileStatus[] tokenStats = fs.listStatus(bucket);
Set<String> loadedTokens = new HashSet<String>(tokenStats.length);
for (FileStatus stat : tokenStats) {
String name = stat.getPath().getName();
if (name.startsWith(TOKEN_FILE_PREFIX)) {
loadTokenFromBucket(bucketId, state, stat.getPath(), stat.getLen());
loadedTokens.add(name);
++numTokens;
} else if (name.startsWith(UPDATE_TMP_FILE_PREFIX)) {
String tokenName = name.substring(UPDATE_TMP_FILE_PREFIX.length());
if (loadedTokens.contains(tokenName)) {
// already have the token, update may be partial so ignore it
fs.delete(stat.getPath(), false);
} else {
// token is missing, so try to parse the update temp file
loadTokenFromBucket(bucketId, state, stat.getPath(), stat.getLen());
fs.rename(stat.getPath(),
new Path(stat.getPath().getParent(), tokenName));
loadedTokens.add(tokenName);
++numTokens;
}
} else if (name.startsWith(TMP_FILE_PREFIX)) {
// cleanup incomplete temp files
fs.delete(stat.getPath(), false);
} else {
LOG.warn("Skipping unexpected file in history server token bucket: "
+ stat.getPath());
}
}
return numTokens;
}
private int loadKeys(HistoryServerState state) throws IOException {
FileStatus[] stats = fs.listStatus(tokenKeysStatePath);
int numKeys = 0;
for (FileStatus stat : stats) {
String name = stat.getPath().getName();
if (name.startsWith(TOKEN_MASTER_KEY_FILE_PREFIX)) {
loadTokenMasterKey(state, stat.getPath(), stat.getLen());
++numKeys;
} else {
LOG.warn("Skipping unexpected file in history server token state: "
+ stat.getPath());
}
}
return numKeys;
}
private int loadTokens(HistoryServerState state) throws IOException {
FileStatus[] stats = fs.listStatus(tokenStatePath);
int numTokens = 0;
for (FileStatus stat : stats) {
String name = stat.getPath().getName();
if (name.startsWith(TOKEN_BUCKET_DIR_PREFIX)) {
numTokens += loadTokensFromBucket(state, stat.getPath());
} else if (name.equals(TOKEN_KEYS_DIR_NAME)) {
// key loading is done elsewhere
continue;
} else {
LOG.warn("Skipping unexpected file in history server token state: "
+ stat.getPath());
}
}
return numTokens;
}
private void loadTokenState(HistoryServerState state) throws IOException {
int numKeys = loadKeys(state);
int numTokens = loadTokens(state);
LOG.info("Loaded " + numKeys + " master keys and " + numTokens
+ " tokens from " + tokenStatePath);
}
}