/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.hdfs.blobstore;
import static org.apache.storm.blobstore.BlobStoreAclHandler.ADMIN;
import static org.apache.storm.blobstore.BlobStoreAclHandler.READ;
import static org.apache.storm.blobstore.BlobStoreAclHandler.WRITE;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.security.auth.Subject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.storm.Config;
import org.apache.storm.blobstore.AtomicOutputStream;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.BlobStoreAclHandler;
import org.apache.storm.blobstore.BlobStoreFile;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyAlreadyExistsException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.ReadableBlobMeta;
import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.WrappedKeyAlreadyExistsException;
import org.apache.storm.utils.WrappedKeyNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Provides an HDFS file-system-backed blob store implementation.
 * Note that this provides an API for using HDFS as the backing store for the blobstore;
 * it is not a service/daemon.
 *
 * <p>We currently have NIMBUS_ADMINS and SUPERVISOR_ADMINS configuration. NIMBUS_ADMINS are given READ, WRITE and ADMIN
 * access, whereas SUPERVISOR_ADMINS are given READ access in order to read and download the blobs from Nimbus.
 *
 * <p>The ACLs for the blob store are validated against whether the subject is a NIMBUS_ADMIN, SUPERVISOR_ADMIN or a USER
 * with READ, WRITE or ADMIN privileges for the respective operations on the blob.
 *
 * <p>For the HDFS blob store:
 * 1. The USER interacts with Nimbus to upload and access blobs through the NimbusBlobStore client API. Here, unlike the
 * local blob store, which stores the blobs locally, Nimbus talks to HDFS to upload the blobs.
 * 2. The USER sets the ACLs, and blob access is validated against these ACLs.
 * 3. The SUPERVISOR interacts with Nimbus through HdfsClientBlobStore to download the blobs. Here, unlike with the local
 * blob store, the supervisor interacts with HDFS directly to download the blobs. The call to HdfsBlobStore is made with a "null"
 * subject; the blob store then resolves the Hadoop user and validates the supervisor's permissions.
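 *
 * <p>A minimal usage sketch, assuming insecure HDFS; the namenode URI and blob key below are
 * placeholders, and imports and exception handling are elided:
 * <pre>{@code
 * Map<String, Object> conf = Utils.readStormConfig();
 * conf.put(Config.BLOBSTORE_DIR, "hdfs://namenode:8020/storm/blobstore");
 * BlobStore store = new HdfsBlobStore();
 * store.prepare(conf, null, null, null);
 * SettableBlobMeta meta = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
 * try (AtomicOutputStream out = store.createBlob("example-key", meta, null)) {
 *     out.write("example payload".getBytes(StandardCharsets.UTF_8));
 * }
 * }</pre>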
*/
public class HdfsBlobStore extends BlobStore {
public static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStore.class);
private static final String DATA_PREFIX = "data_";
private static final String META_PREFIX = "meta_";
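    // Cache of keytab logins (principal + keytab -> Subject) so repeated prepare() calls can reuse an existing login.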
private static final HashMap<String, Subject> alreadyLoggedInUsers = new HashMap<>();
private BlobStoreAclHandler aclHandler;
private HdfsBlobStoreImpl hbs;
private Subject localSubject;
private Map<String, Object> conf;
    /**
     * Get the subject from Hadoop so we can use it to validate the ACLs. There is no direct
     * interface on UserGroupInformation to get the subject, so do a doAs and grab it from the
     * access-control context. We could probably run everything inside the doAs, but for now just
     * grab the subject.
     */
private Subject getHadoopUser() {
Subject subj;
try {
subj = UserGroupInformation.getCurrentUser().doAs(
new PrivilegedAction<Subject>() {
@Override
public Subject run() {
return Subject.getSubject(AccessController.getContext());
}
});
} catch (IOException e) {
throw new RuntimeException("Error creating subject and logging user in!", e);
}
return subj;
}
    /**
     * If {@code who} is null then we want to use the user Hadoop says we are. Required so that
     * the supervisor can call these routines, as it is not logged in as anyone.
     */
private Subject checkAndGetSubject(Subject who) {
if (who == null) {
return localSubject;
}
return who;
}
@Override
public void prepare(Map<String, Object> conf, String overrideBase, NimbusInfo nimbusInfo, ILeaderElector leaderElector) {
this.conf = conf;
prepareInternal(conf, overrideBase, null);
}
    /**
     * Allows a Hadoop Configuration to be passed in for testing. If it is null then the Hadoop
     * configs must be on your classpath.
     */
protected void prepareInternal(Map<String, Object> conf, String overrideBase, Configuration hadoopConf) {
this.conf = conf;
if (overrideBase == null) {
overrideBase = (String)conf.get(Config.BLOBSTORE_DIR);
}
if (overrideBase == null) {
throw new RuntimeException("You must specify a blobstore directory for HDFS to use!");
}
LOG.debug("directory is: {}", overrideBase);
try {
            // If an HDFS principal and keytab have been supplied, log in with them; otherwise
            // assume we are already logged in or running against insecure HDFS.
String principal = Config.getBlobstoreHDFSPrincipal(conf);
String keyTab = (String) conf.get(Config.BLOBSTORE_HDFS_KEYTAB);
if (principal != null && keyTab != null) {
String combinedKey = principal + " from " + keyTab;
synchronized (alreadyLoggedInUsers) {
localSubject = alreadyLoggedInUsers.get(combinedKey);
if (localSubject == null) {
UserGroupInformation.loginUserFromKeytab(principal, keyTab);
localSubject = getHadoopUser();
alreadyLoggedInUsers.put(combinedKey, localSubject);
}
}
            } else {
                if (principal == null && keyTab != null) {
                    throw new RuntimeException("You must specify an HDFS principal to go with the keytab!");
                } else if (principal != null && keyTab == null) {
                    throw new RuntimeException("You must specify an HDFS keytab to go with the principal!");
                }
                localSubject = getHadoopUser();
            }
} catch (IOException e) {
throw new RuntimeException("Error logging in from keytab: " + e.getMessage(), e);
}
aclHandler = new BlobStoreAclHandler(conf);
Path baseDir = new Path(overrideBase, BASE_BLOBS_DIR_NAME);
try {
if (hadoopConf != null) {
hbs = new HdfsBlobStoreImpl(baseDir, conf, hadoopConf);
} else {
hbs = new HdfsBlobStoreImpl(baseDir, conf);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who)
throws AuthorizationException, KeyAlreadyExistsException {
if (meta.get_replication_factor() <= 0) {
meta.set_replication_factor((int)conf.get(Config.STORM_BLOBSTORE_REPLICATION_FACTOR));
}
who = checkAndGetSubject(who);
validateKey(key);
aclHandler.normalizeSettableBlobMeta(key, meta, who, READ | WRITE | ADMIN);
BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
aclHandler.hasPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key);
if (hbs.exists(DATA_PREFIX + key)) {
throw new WrappedKeyAlreadyExistsException(key);
}
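        // Write and commit the metadata file first; only then hand the caller a stream for the data
        // file. If the metadata write fails part way, the finally block cancels the partial output.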
BlobStoreFileOutputStream outputStream = null;
try {
BlobStoreFile metaFile = hbs.write(META_PREFIX + key, true);
metaFile.setMetadata(meta);
outputStream = new BlobStoreFileOutputStream(metaFile);
outputStream.write(Utils.thriftSerialize(meta));
outputStream.close();
outputStream = null;
BlobStoreFile dataFile = hbs.write(DATA_PREFIX + key, true);
dataFile.setMetadata(meta);
return new BlobStoreFileOutputStream(dataFile);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
if (outputStream != null) {
try {
outputStream.cancel();
} catch (IOException e) {
//Ignored
}
}
}
}
@Override
public AtomicOutputStream updateBlob(String key, Subject who)
throws AuthorizationException, KeyNotFoundException {
who = checkAndGetSubject(who);
        validateKey(key);
        SettableBlobMeta meta = getStoredBlobMeta(key);
aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
try {
BlobStoreFile dataFile = hbs.write(DATA_PREFIX + key, false);
dataFile.setMetadata(meta);
return new BlobStoreFileOutputStream(dataFile);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private SettableBlobMeta getStoredBlobMeta(String key) throws KeyNotFoundException {
InputStream in = null;
try {
BlobStoreFile pf = hbs.read(META_PREFIX + key);
try {
in = pf.getInputStream();
} catch (FileNotFoundException fnf) {
throw new WrappedKeyNotFoundException(key);
}
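            // Copy the whole metadata file into memory and deserialize the Thrift-encoded SettableBlobMeta.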
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] buffer = new byte[2048];
int len;
while ((len = in.read(buffer)) > 0) {
out.write(buffer, 0, len);
}
in.close();
in = null;
return Utils.thriftDeserialize(SettableBlobMeta.class, out.toByteArray());
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
if (in != null) {
try {
in.close();
} catch (IOException e) {
//Ignored
}
}
}
}
@Override
public ReadableBlobMeta getBlobMeta(String key, Subject who)
throws AuthorizationException, KeyNotFoundException {
who = checkAndGetSubject(who);
validateKey(key);
SettableBlobMeta meta = getStoredBlobMeta(key);
aclHandler.validateUserCanReadMeta(meta.get_acl(), who, key);
ReadableBlobMeta rbm = new ReadableBlobMeta();
rbm.set_settable(meta);
try {
BlobStoreFile pf = hbs.read(DATA_PREFIX + key);
rbm.set_version(pf.getModTime());
} catch (IOException e) {
throw new RuntimeException(e);
}
return rbm;
}
/**
 * Sets the leader elector (only used by LocalFsBlobStore to help sync blobs between Nimbi).
*
* @param leaderElector the leader elector
*/
@Override
public void setLeaderElector(ILeaderElector leaderElector) {
// NO-OP
}
@Override
public void setBlobMeta(String key, SettableBlobMeta meta, Subject who)
throws AuthorizationException, KeyNotFoundException {
if (meta.get_replication_factor() <= 0) {
meta.set_replication_factor((int)conf.get(Config.STORM_BLOBSTORE_REPLICATION_FACTOR));
}
who = checkAndGetSubject(who);
validateKey(key);
aclHandler.normalizeSettableBlobMeta(key, meta, who, ADMIN);
BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
SettableBlobMeta orig = getStoredBlobMeta(key);
aclHandler.hasPermissions(orig.get_acl(), ADMIN, who, key);
writeMetadata(key, meta);
}
@Override
public void deleteBlob(String key, Subject who)
throws AuthorizationException, KeyNotFoundException {
who = checkAndGetSubject(who);
validateKey(key);
SettableBlobMeta meta = getStoredBlobMeta(key);
aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
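        // A blob is stored as two files; remove both the data file and its companion metadata file.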
try {
hbs.deleteKey(DATA_PREFIX + key);
hbs.deleteKey(META_PREFIX + key);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public InputStreamWithMeta getBlob(String key, Subject who)
throws AuthorizationException, KeyNotFoundException {
who = checkAndGetSubject(who);
validateKey(key);
SettableBlobMeta meta = getStoredBlobMeta(key);
aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
try {
return new BlobStoreFileInputStream(hbs.read(DATA_PREFIX + key));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
    /**
     * Checks if a blob exists.
     *
     * @param key blobstore key
     * @param who subject
     * @return true if the blob exists and is readable by the subject, false if the key is not found
     * @throws AuthorizationException if authorization fails
     */
public boolean blobExists(String key, Subject who) throws AuthorizationException {
try {
who = checkAndGetSubject(who);
validateKey(key);
SettableBlobMeta meta = getStoredBlobMeta(key);
aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
} catch (KeyNotFoundException e) {
return false;
}
return true;
}
@Override
public Iterator<String> listKeys() {
try {
return new KeyTranslationIterator(hbs.listKeys(), DATA_PREFIX);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void shutdown() {
//Empty
}
@Override
public int getBlobReplication(String key, Subject who) throws AuthorizationException, KeyNotFoundException {
who = checkAndGetSubject(who);
validateKey(key);
SettableBlobMeta meta = getStoredBlobMeta(key);
aclHandler.hasAnyPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key);
try {
return hbs.getBlobReplication(DATA_PREFIX + key);
} catch (IOException exp) {
throw new RuntimeException(exp);
}
}
@Override
public int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException {
who = checkAndGetSubject(who);
validateKey(key);
SettableBlobMeta meta = getStoredBlobMeta(key);
meta.set_replication_factor(replication);
aclHandler.hasAnyPermissions(meta.get_acl(), WRITE | ADMIN, who, key);
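        // Persist the new replication factor in the blob's metadata, then apply it to the HDFS data file.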
try {
writeMetadata(key, meta);
return hbs.updateBlobReplication(DATA_PREFIX + key, replication);
} catch (IOException exp) {
throw new RuntimeException(exp);
}
}
public void writeMetadata(String key, SettableBlobMeta meta)
throws AuthorizationException, KeyNotFoundException {
BlobStoreFileOutputStream outputStream = null;
try {
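            // Overwrite the existing metadata file in place (contrast with createBlob, which passes
            // true to request creation of a new file).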
BlobStoreFile hdfsFile = hbs.write(META_PREFIX + key, false);
hdfsFile.setMetadata(meta);
outputStream = new BlobStoreFileOutputStream(hdfsFile);
outputStream.write(Utils.thriftSerialize(meta));
outputStream.close();
outputStream = null;
} catch (IOException exp) {
throw new RuntimeException(exp);
} finally {
if (outputStream != null) {
try {
outputStream.cancel();
} catch (IOException e) {
//Ignored
}
}
}
}
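    /**
     * Runs a full cleanup pass by delegating to {@link HdfsBlobStoreImpl#fullCleanup}.
     *
     * @param age cutoff age, passed through to the underlying implementation
     * @throws IOException if the underlying file system operation fails
     */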
public void fullCleanup(long age) throws IOException {
hbs.fullCleanup(age);
}
}