blob: c9219c5b40de0f5ca56cf65491b68700de633e2d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.blobstore;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.regex.Pattern;
import javax.security.auth.Subject;
import org.apache.storm.daemon.Shutdownable;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyAlreadyExistsException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.ReadableBlobMeta;
import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Provides a way to store blobs that can be downloaded. Blobs must be able to be uploaded and listed from Nimbus, and
* downloaded from the Supervisors. It is a key value based store. Key being a string and value being the blob data.
*
* <p>ACL checking must take place against the provided subject. If the blob store does not support Security it must
* validate that all ACLs set are always WORLD, everything.
*
* <p>The users can upload their blobs through the blob store command line. The command line also allows us to update
* and delete blobs.
*
* <p>Modifying the replication factor only works for HdfsBlobStore as for the LocalFsBlobStore the replication is
* dependent on the number of Nimbodes available.
*/
public abstract class BlobStore implements Shutdownable, AutoCloseable {
    protected static final String BASE_BLOBS_DIR_NAME = "blobs";
    private static final Logger LOG = LoggerFactory.getLogger(BlobStore.class);
    // Chunk size used when streaming blob data in and out of the store.
    private static final int STREAM_BUFFER_SIZE = 2048;
    // Maps a blob key to its topology id, or null when the key is not topology related.
    private static final KeyFilter<String> TO_TOPO_ID = ConfigUtils::getIdFromBlobKey;

    /**
     * Validates key checking for potentially harmful patterns.
     *
     * @param key Key for the blob
     * @throws IllegalArgumentException if the key is not a valid blob key
     */
    public static void validateKey(String key) throws IllegalArgumentException {
        if (!Utils.isValidKey(key)) {
            throw new IllegalArgumentException(key + " does not appear to be a valid blob key");
        }
    }

    /**
     * Allows us to initialize the blob store.
     *
     * @param conf The storm configuration
     * @param baseDir The directory path to store the blobs
     * @param nimbusInfo Contains the nimbus host, port and leadership information
     * @param leaderElector Leader elector used to coordinate blob syncing between Nimbi
     */
    public abstract void prepare(Map<String, Object> conf, String baseDir, NimbusInfo nimbusInfo, ILeaderElector leaderElector);

    /**
     * Start the syncing blobs between the local running instance of the BlobStore and others.
     * A no-op for the HdfsBlobStore where HDFS itself does the syncing
     * but for the LocalFsBlobStore ZK state updates are run periodically here.
     */
    public void startSyncBlobs() throws KeyNotFoundException, AuthorizationException {
        // NO-OP by default
    }

    /**
     * Creates the blob.
     *
     * @param key Key for the blob
     * @param meta Metadata which contains the acls information
     * @param who Is the subject creating the blob
     * @return AtomicOutputStream returns a stream into which the data can be written
     */
    public abstract AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException,
            KeyAlreadyExistsException;

    /**
     * Wrapper called to create the blob which contains the byte data.
     *
     * @param key Key for the blob
     * @param data Byte data that needs to be uploaded
     * @param meta Metadata which contains the acls information
     * @param who Is the subject creating the blob
     */
    public void createBlob(String key, byte[] data, SettableBlobMeta meta, Subject who) throws AuthorizationException,
            KeyAlreadyExistsException, IOException {
        AtomicOutputStream out = null;
        try {
            out = createBlob(key, meta, who);
            out.write(data);
            // A successful close() commits the blob; null out the reference so
            // the finally block does not cancel a committed write.
            out.close();
            out = null;
        } finally {
            if (out != null) {
                out.cancel();
            }
        }
    }

    /**
     * Wrapper called to create the blob which contains the byte data.
     *
     * @param key Key for the blob
     * @param in InputStream from which the data is read to be written as a part of the blob; always closed before returning
     * @param meta Metadata which contains the acls information
     * @param who Is the subject creating the blob
     */
    public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subject who) throws AuthorizationException,
            KeyAlreadyExistsException, IOException {
        AtomicOutputStream out = null;
        try {
            out = createBlob(key, meta, who);
            byte[] buffer = new byte[STREAM_BUFFER_SIZE];
            int len;
            while ((len = in.read(buffer)) > 0) {
                out.write(buffer, 0, len);
            }
            // close() commits the blob; clear the reference so it is not cancelled.
            out.close();
            out = null;
        } finally {
            // Clean up each resource independently so a failure to cancel the
            // output can never leak the input stream, and neither cleanup
            // failure masks an exception already propagating from the copy.
            if (out != null) {
                try {
                    out.cancel();
                } catch (IOException ignored) {
                    // Best-effort cleanup only.
                }
            }
            try {
                in.close();
            } catch (IOException ignored) {
                // Best-effort cleanup only.
            }
        }
    }

    /**
     * Updates the blob data.
     *
     * @param key Key for the blob
     * @param who Is the subject having the write privilege for the blob
     * @return AtomicOutputStream returns a stream into which the data can be written
     */
    public abstract AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;

    /**
     * Wrapper called to update the blob with the given byte data.
     *
     * @param key Key for the blob
     * @param data Byte data that needs to be uploaded
     * @param who Is the subject updating the blob
     */
    public void updateBlob(String key, byte[] data, Subject who) throws AuthorizationException, IOException, KeyNotFoundException {
        AtomicOutputStream out = null;
        try {
            out = updateBlob(key, who);
            out.write(data);
            // close() commits the update; clear the reference so it is not cancelled.
            out.close();
            out = null;
        } finally {
            if (out != null) {
                out.cancel();
            }
        }
    }

    /**
     * Gets the current version of metadata for a blob to be viewed by the user or downloaded by the supervisor.
     *
     * @param key Key for the blob
     * @param who Is the subject having the read privilege for the blob
     * @return ReadableBlobMeta containing the metadata (including acls) for the blob
     */
    public abstract ReadableBlobMeta getBlobMeta(String key, Subject who) throws AuthorizationException, KeyNotFoundException;

    /**
     * Sets leader elector (only used by LocalFsBlobStore to help sync blobs between Nimbi).
     */
    public abstract void setLeaderElector(ILeaderElector leaderElector);

    /**
     * Sets the metadata with renewed acls for the blob.
     *
     * @param key Key for the blob
     * @param meta Metadata which contains the updated acls information
     * @param who Is the subject having the write privilege for the blob
     */
    public abstract void setBlobMeta(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyNotFoundException;

    /**
     * Deletes the blob data and metadata.
     *
     * @param key Key for the blob
     * @param who Is the subject having write privilege for the blob
     */
    public abstract void deleteBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;

    /**
     * Gets the InputStream to read the blob details.
     *
     * @param key Key for the blob
     * @param who Is the subject having the read privilege for the blob
     * @return InputStreamWithMeta has the additional file length and version information
     */
    public abstract InputStreamWithMeta getBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;

    /**
     * Returns an iterator with all the list of keys currently available on the blob store.
     *
     * @return {@code Iterator<String>}
     */
    public abstract Iterator<String> listKeys();

    /**
     * Gets the replication factor of the blob.
     *
     * @param key Key for the blob
     * @param who Is the subject having the read privilege for the blob
     * @return the replication factor for the blob
     */
    public abstract int getBlobReplication(String key, Subject who) throws Exception;

    /**
     * Modifies the replication factor of the blob.
     *
     * @param key Key for the blob
     * @param replication The replication factor the blob has to be set
     * @param who Is the subject having the update privilege for the blob
     * @return the updated replication factor for the blob
     */
    public abstract int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException,
            IOException;

    @Override
    public void close() {
        shutdown();
    }

    /**
     * Filters keys based on the KeyFilter passed as the argument.
     *
     * @param filter KeyFilter mapping a key to a result, or to null to drop the key
     * @param <R> Type of the filtered result
     * @return Set of filtered keys
     */
    public <R> Set<R> filterAndListKeys(KeyFilter<R> filter) {
        Set<R> ret = new HashSet<>();
        Iterator<String> keys = listKeys();
        while (keys.hasNext()) {
            String key = keys.next();
            R filtered = filter.filter(key);
            if (filtered != null) {
                ret.add(filtered);
            }
        }
        return ret;
    }

    /**
     * Reads the blob from the blob store and writes it into the output stream.
     * The blob's input stream is closed; the output stream is flushed but left open for the caller.
     *
     * @param key Key for the blob
     * @param out Output stream
     * @param who Is the subject having read privilege for the blob
     */
    public void readBlobTo(String key, OutputStream out, Subject who) throws IOException, KeyNotFoundException, AuthorizationException {
        InputStreamWithMeta in = getBlob(key, who);
        if (in == null) {
            throw new IOException("Could not find " + key);
        }
        byte[] buffer = new byte[STREAM_BUFFER_SIZE];
        int len;
        try {
            while ((len = in.read(buffer)) > 0) {
                out.write(buffer, 0, len);
            }
        } finally {
            // Flush even if closing the blob stream fails; a close failure
            // must not leave buffered data stranded in the output stream.
            try {
                in.close();
            } finally {
                out.flush();
            }
        }
    }

    /**
     * Wrapper around readBlobTo which returns the blob contents as a byte array.
     *
     * @param key Key for the blob
     * @param who Is the subject having the read privilege for the blob
     * @return the full contents of the blob
     */
    public byte[] readBlob(String key, Subject who) throws IOException, KeyNotFoundException, AuthorizationException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        readBlobTo(key, out, who);
        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    }

    /**
     * Get IDs stored in blob store.
     *
     * @return a set of all of the topology ids with special data stored in the blob store.
     */
    public Set<String> storedTopoIds() {
        return filterAndListKeys(TO_TOPO_ID);
    }

    /**
     * Blob store implements its own version of iterator to list the blobs.
     * Wraps a key iterator, yielding only the keys that start with the given
     * prefix, with the prefix stripped off.
     */
    public static class KeyTranslationIterator implements Iterator<String> {
        private Iterator<String> it = null;
        // Next value to return, pre-computed; null means exhausted.
        private String next = null;
        private String prefix = null;

        // NOTE(review): the throws IOException clause is never exercised here,
        // but is kept so existing callers that catch it still compile.
        public KeyTranslationIterator(Iterator<String> it, String prefix) throws IOException {
            this.it = it;
            this.prefix = prefix;
            primeNext();
        }

        /** Advances the underlying iterator to the next key matching the prefix. */
        private void primeNext() {
            next = null;
            while (it.hasNext()) {
                String tmp = it.next();
                if (tmp.startsWith(prefix)) {
                    next = tmp.substring(prefix.length());
                    return;
                }
            }
        }

        @Override
        public boolean hasNext() {
            return next != null;
        }

        @Override
        public String next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            String current = next;
            primeNext();
            return current;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("Delete Not Supported");
        }
    }

    /**
     * Output stream implementation used for writing the metadata and data information.
     * close() commits the underlying part; cancel() discards it.
     */
    // NOTE(review): this class never uses the enclosing BlobStore instance and
    // could be static, but is left as an inner class for binary compatibility.
    protected class BlobStoreFileOutputStream extends AtomicOutputStream {
        private BlobStoreFile part;
        private OutputStream out;

        public BlobStoreFileOutputStream(BlobStoreFile part) throws IOException {
            this.part = part;
            this.out = part.getOutputStream();
        }

        @Override
        public void close() throws IOException {
            try {
                // close means commit
                out.close();
                part.commit();
            } catch (IOException | RuntimeException e) {
                cancel();
                throw e;
            }
        }

        @Override
        public void cancel() throws IOException {
            try {
                out.close();
            } finally {
                part.cancel();
            }
        }

        @Override
        public void write(int b) throws IOException {
            out.write(b);
        }

        @Override
        public void write(byte[] b) throws IOException {
            out.write(b);
        }

        @Override
        public void write(byte[] b, int offset, int len) throws IOException {
            out.write(b, offset, len);
        }
    }

    /**
     * Input stream implementation used for reading both the metadata containing the acl information and the blob data.
     */
    // NOTE(review): this class never uses the enclosing BlobStore instance and
    // could be static, but is left as an inner class for binary compatibility.
    protected class BlobStoreFileInputStream extends InputStreamWithMeta {
        private BlobStoreFile part;
        private InputStream in;

        public BlobStoreFileInputStream(BlobStoreFile part) throws IOException {
            this.part = part;
            this.in = part.getInputStream();
        }

        @Override
        public long getVersion() throws IOException {
            // The part's modification time doubles as its version.
            return part.getModTime();
        }

        @Override
        public int read() throws IOException {
            return in.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return in.read(b, off, len);
        }

        @Override
        public int read(byte[] b) throws IOException {
            return in.read(b);
        }

        @Override
        public int available() throws IOException {
            return in.available();
        }

        @Override
        public long getFileLength() throws IOException {
            return part.getFileLength();
        }

        @Override
        public void close() throws IOException {
            in.close();
        }
    }
}