| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.storm.blobstore; |
| |
| import org.apache.storm.Config; |
| import org.apache.storm.generated.SettableBlobMeta; |
| import org.apache.storm.generated.AuthorizationException; |
| import org.apache.storm.generated.KeyAlreadyExistsException; |
| import org.apache.storm.generated.KeyNotFoundException; |
| import org.apache.storm.generated.ReadableBlobMeta; |
| |
| import org.apache.storm.nimbus.NimbusInfo; |
| import org.apache.storm.utils.ConfigUtils; |
| import org.apache.storm.utils.Utils; |
| import org.apache.curator.framework.CuratorFramework; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.security.auth.Subject; |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.FileNotFoundException; |
| import java.io.InputStream; |
| |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set;; |
| |
| import static org.apache.storm.blobstore.BlobStoreAclHandler.ADMIN; |
| import static org.apache.storm.blobstore.BlobStoreAclHandler.READ; |
| import static org.apache.storm.blobstore.BlobStoreAclHandler.WRITE; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| /** |
| * Provides a local file system backed blob store implementation for Nimbus. |
| * |
| * For a local blob store the user and the supervisor use NimbusBlobStore Client API in order to talk to nimbus through thrift. |
| * The authentication and authorization here is based on the subject. |
| * We currently have NIMBUS_ADMINS and SUPERVISOR_ADMINS configuration. NIMBUS_ADMINS are given READ, WRITE and ADMIN |
| * access whereas the SUPERVISOR_ADMINS are given READ access in order to read and download the blobs form the nimbus. |
| * |
| * The ACLs for the blob store are validated against whether the subject is a NIMBUS_ADMIN, SUPERVISOR_ADMIN or USER |
| * who has read, write or admin privileges in order to perform respective operations on the blob. |
| * |
| * For local blob store |
| * 1. The USER interacts with nimbus to upload and access blobs through NimbusBlobStore Client API. |
| * 2. The USER sets the ACLs, and the blob access is validated against these ACLs. |
| * 3. The SUPERVISOR interacts with nimbus through the NimbusBlobStore Client API to download the blobs. |
| * The supervisors principal should match the set of users configured into SUPERVISOR_ADMINS. |
| * Here, the PrincipalToLocalPlugin takes care of mapping the principal to user name before the ACL validation. |
| */ |
| public class LocalFsBlobStore extends BlobStore { |
| public static final Logger LOG = LoggerFactory.getLogger(LocalFsBlobStore.class); |
| private static final String DATA_PREFIX = "data_"; |
| private static final String META_PREFIX = "meta_"; |
| protected BlobStoreAclHandler _aclHandler; |
| private final String BLOBSTORE_SUBTREE = "/blobstore/"; |
| private NimbusInfo nimbusInfo; |
| private FileBlobStoreImpl fbs; |
| private final int allPermissions = READ | WRITE | ADMIN; |
| private Map conf; |
| private CuratorFramework zkClient; |
| |
| @Override |
| public void prepare(Map conf, String overrideBase, NimbusInfo nimbusInfo) { |
| this.conf = conf; |
| this.nimbusInfo = nimbusInfo; |
| zkClient = BlobStoreUtils.createZKClient(conf); |
| if (overrideBase == null) { |
| overrideBase = ConfigUtils.absoluteStormBlobStoreDir(conf); |
| } |
| File baseDir = new File(overrideBase, BASE_BLOBS_DIR_NAME); |
| try { |
| fbs = new FileBlobStoreImpl(baseDir, conf); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| _aclHandler = new BlobStoreAclHandler(conf); |
| } |
| |
| @Override |
| public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException { |
| LOG.debug("Creating Blob for key {}", key); |
| validateKey(key); |
| _aclHandler.normalizeSettableBlobMeta(key, meta, who, allPermissions); |
| BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl()); |
| _aclHandler.hasPermissions(meta.get_acl(), allPermissions, who, key); |
| if (fbs.exists(DATA_PREFIX+key)) { |
| throw new KeyAlreadyExistsException(key); |
| } |
| BlobStoreFileOutputStream mOut = null; |
| try { |
| mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX+key, true)); |
| mOut.write(Utils.thriftSerialize(meta)); |
| mOut.close(); |
| mOut = null; |
| return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX+key, true)); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } finally { |
| if (mOut != null) { |
| try { |
| mOut.cancel(); |
| } catch (IOException e) { |
| //Ignored |
| } |
| } |
| } |
| } |
| |
| @Override |
| public AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException { |
| validateKey(key); |
| checkPermission(key, who, WRITE); |
| try { |
| return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX+key, false)); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| private SettableBlobMeta getStoredBlobMeta(String key) throws KeyNotFoundException { |
| InputStream in = null; |
| try { |
| LocalFsBlobStoreFile pf = fbs.read(META_PREFIX+key); |
| try { |
| in = pf.getInputStream(); |
| } catch (FileNotFoundException fnf) { |
| throw new KeyNotFoundException(key); |
| } |
| ByteArrayOutputStream out = new ByteArrayOutputStream(); |
| byte [] buffer = new byte[2048]; |
| int len; |
| while ((len = in.read(buffer)) > 0) { |
| out.write(buffer, 0, len); |
| } |
| in.close(); |
| in = null; |
| return Utils.thriftDeserialize(SettableBlobMeta.class, out.toByteArray()); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } finally { |
| if (in != null) { |
| try { |
| in.close(); |
| } catch (IOException e) { |
| //Ignored |
| } |
| } |
| } |
| } |
| |
| @Override |
| public ReadableBlobMeta getBlobMeta(String key, Subject who) throws AuthorizationException, KeyNotFoundException { |
| validateKey(key); |
| if(!checkForBlobOrDownload(key)) { |
| checkForBlobUpdate(key); |
| } |
| SettableBlobMeta meta = getStoredBlobMeta(key); |
| _aclHandler.validateUserCanReadMeta(meta.get_acl(), who, key); |
| ReadableBlobMeta rbm = new ReadableBlobMeta(); |
| rbm.set_settable(meta); |
| try { |
| LocalFsBlobStoreFile pf = fbs.read(DATA_PREFIX+key); |
| rbm.set_version(pf.getModTime()); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| return rbm; |
| } |
| |
| @Override |
| public void setBlobMeta(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyNotFoundException { |
| validateKey(key); |
| checkForBlobOrDownload(key); |
| _aclHandler.normalizeSettableBlobMeta(key, meta, who, ADMIN); |
| BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl()); |
| SettableBlobMeta orig = getStoredBlobMeta(key); |
| _aclHandler.hasPermissions(orig.get_acl(), ADMIN, who, key); |
| BlobStoreFileOutputStream mOut = null; |
| try { |
| mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX+key, false)); |
| mOut.write(Utils.thriftSerialize(meta)); |
| mOut.close(); |
| mOut = null; |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } finally { |
| if (mOut != null) { |
| try { |
| mOut.cancel(); |
| } catch (IOException e) { |
| //Ignored |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void deleteBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException { |
| validateKey(key); |
| |
| if (!_aclHandler.checkForValidUsers(who, WRITE)) { |
| // need to get ACL from meta |
| LOG.debug("Retrieving meta to get ACL info... key: {} subject: {}", key, who); |
| |
| try { |
| checkPermission(key, who, WRITE); |
| } catch (KeyNotFoundException e) { |
| LOG.error("Error while retrieving meta from ZK or local... key: {} subject: {}", key, who); |
| throw e; |
| } |
| } else { |
| // able to delete the blob without checking meta's ACL |
| // skip checking everything and continue deleting local files |
| LOG.debug("Given subject is eligible to delete key without checking ACL, skipping... key: {} subject: {}", |
| key, who); |
| } |
| |
| try { |
| deleteKeyIgnoringFileNotFound(DATA_PREFIX + key); |
| deleteKeyIgnoringFileNotFound(META_PREFIX + key); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| private void checkPermission(String key, Subject who, int mask) throws KeyNotFoundException, AuthorizationException { |
| checkForBlobOrDownload(key); |
| SettableBlobMeta meta = getStoredBlobMeta(key); |
| _aclHandler.hasPermissions(meta.get_acl(), mask, who, key); |
| } |
| |
| private void deleteKeyIgnoringFileNotFound(String key) throws IOException { |
| try { |
| fbs.deleteKey(key); |
| } catch (IOException e) { |
| if (e instanceof FileNotFoundException) { |
| LOG.debug("Ignoring FileNotFoundException since we're about to delete such key... key: {}", key); |
| } else { |
| throw e; |
| } |
| } |
| } |
| |
| @Override |
| public InputStreamWithMeta getBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException { |
| validateKey(key); |
| if(!checkForBlobOrDownload(key)) { |
| checkForBlobUpdate(key); |
| } |
| SettableBlobMeta meta = getStoredBlobMeta(key); |
| _aclHandler.hasPermissions(meta.get_acl(), READ, who, key); |
| try { |
| return new BlobStoreFileInputStream(fbs.read(DATA_PREFIX+key)); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public Iterator<String> listKeys() { |
| try { |
| return new KeyTranslationIterator(fbs.listKeys(), DATA_PREFIX); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public void shutdown() { |
| if (zkClient != null) { |
| zkClient.close(); |
| } |
| } |
| |
| @Override |
| public int getBlobReplication(String key, Subject who) throws Exception { |
| int replicationCount = 0; |
| validateKey(key); |
| SettableBlobMeta meta = getStoredBlobMeta(key); |
| _aclHandler.hasPermissions(meta.get_acl(), READ, who, key); |
| if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) == null) { |
| return 0; |
| } |
| replicationCount = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + key).size(); |
| return replicationCount; |
| } |
| |
| @Override |
| public int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException { |
| throw new UnsupportedOperationException("For local file system blob store the update blobs function does not work. " + |
| "Please use HDFS blob store to make this feature available."); |
| } |
| |
| //This additional check and download is for nimbus high availability in case you have more than one nimbus |
| public synchronized boolean checkForBlobOrDownload(String key) throws KeyNotFoundException { |
| boolean checkBlobDownload = false; |
| try { |
| List<String> keyList = BlobStoreUtils.getKeyListFromBlobStore(this); |
| if (!keyList.contains(key)) { |
| if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) != null) { |
| Set<NimbusInfo> nimbusSet = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key); |
| if (BlobStoreUtils.downloadMissingBlob(conf, this, key, nimbusSet)) { |
| LOG.debug("Updating blobs state"); |
| BlobStoreUtils.createStateInZookeeper(conf, key, nimbusInfo); |
| checkBlobDownload = true; |
| } |
| } |
| } |
| } catch (KeyNotFoundException e) { |
| throw e; |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| return checkBlobDownload; |
| } |
| |
| public synchronized void checkForBlobUpdate(String key) { |
| BlobStoreUtils.updateKeyForBlobStore(conf, this, zkClient, key, nimbusInfo); |
| } |
| |
| public void fullCleanup(long age) throws IOException { |
| fbs.fullCleanup(age); |
| } |
| |
| @VisibleForTesting |
| File getKeyDataDir(String key) { |
| return fbs.getKeyDir(DATA_PREFIX + key); |
| } |
| } |