| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version |
| * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions |
| * and limitations under the License. |
| */ |
| |
| package org.apache.storm.blobstore; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import org.apache.storm.Config; |
| import org.apache.storm.generated.AuthorizationException; |
| import org.apache.storm.generated.BeginDownloadResult; |
| import org.apache.storm.generated.KeyAlreadyExistsException; |
| import org.apache.storm.generated.KeyNotFoundException; |
| import org.apache.storm.generated.ListBlobsResult; |
| import org.apache.storm.generated.ReadableBlobMeta; |
| import org.apache.storm.generated.SettableBlobMeta; |
| import org.apache.storm.thrift.TException; |
| import org.apache.storm.utils.NimbusClient; |
| import org.apache.storm.utils.ObjectReader; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * NimbusBlobStore is a USER facing client API to perform basic operations such as create, update, delete and read for local and hdfs blob |
| * store. |
| * |
| * <p>For local blob store it is also the client facing API for supervisor in order to download blobs from nimbus. |
| */ |
| public class NimbusBlobStore extends ClientBlobStore implements AutoCloseable { |
| private static final Logger LOG = LoggerFactory.getLogger(NimbusBlobStore.class); |
| private NimbusClient client; |
| private int bufferSize = 4096; |
| |
| @Override |
| public void prepare(Map<String, Object> conf) { |
| this.client = NimbusClient.getConfiguredClient(conf); |
| if (conf != null) { |
| this.bufferSize = ObjectReader.getInt(conf.get(Config.STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES), bufferSize); |
| } |
| } |
| |
| @Override |
| protected AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta meta) |
| throws AuthorizationException, KeyAlreadyExistsException { |
| try { |
| synchronized (client) { |
| return new NimbusUploadAtomicOutputStream(client.getClient().beginCreateBlob(key, meta), this.bufferSize, key); |
| } |
| } catch (AuthorizationException | KeyAlreadyExistsException exp) { |
| throw exp; |
| } catch (TException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public AtomicOutputStream updateBlob(String key) |
| throws AuthorizationException, KeyNotFoundException { |
| try { |
| synchronized (client) { |
| return new NimbusUploadAtomicOutputStream(client.getClient().beginUpdateBlob(key), this.bufferSize, key); |
| } |
| } catch (AuthorizationException | KeyNotFoundException exp) { |
| throw exp; |
| } catch (TException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public ReadableBlobMeta getBlobMeta(String key) throws AuthorizationException, KeyNotFoundException { |
| try { |
| synchronized (client) { |
| return client.getClient().getBlobMeta(key); |
| } |
| } catch (AuthorizationException | KeyNotFoundException exp) { |
| throw exp; |
| } catch (TException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public boolean isRemoteBlobExists(String blobKey) throws AuthorizationException { |
| try { |
| return client.getClient().isRemoteBlobExists(blobKey); |
| } catch (AuthorizationException aze) { |
| throw aze; |
| } catch (TException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| protected void setBlobMetaToExtend(String key, SettableBlobMeta meta) |
| throws AuthorizationException, KeyNotFoundException { |
| try { |
| synchronized (client) { |
| client.getClient().setBlobMeta(key, meta); |
| } |
| } catch (AuthorizationException | KeyNotFoundException exp) { |
| throw exp; |
| } catch (TException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException { |
| try { |
| synchronized (client) { |
| client.getClient().deleteBlob(key); |
| } |
| } catch (AuthorizationException | KeyNotFoundException exp) { |
| throw exp; |
| } catch (TException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public void createStateInZookeeper(String key) { |
| try { |
| synchronized (client) { |
| client.getClient().createStateInZookeeper(key); |
| } |
| } catch (TException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public InputStreamWithMeta getBlob(String key) throws AuthorizationException, KeyNotFoundException { |
| try { |
| synchronized (client) { |
| return new NimbusDownloadInputStream(client.getClient().beginBlobDownload(key)); |
| } |
| } catch (AuthorizationException | KeyNotFoundException exp) { |
| throw exp; |
| } catch (TException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public Iterator<String> listKeys() { |
| try { |
| synchronized (client) { |
| return new NimbusKeyIterator(client.getClient().listBlobs("")); |
| } |
| } catch (TException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException { |
| try { |
| return client.getClient().getBlobReplication(key); |
| } catch (AuthorizationException | KeyNotFoundException exp) { |
| throw exp; |
| } catch (TException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException { |
| try { |
| return client.getClient().updateBlobReplication(key, replication); |
| } catch (AuthorizationException | KeyNotFoundException exp) { |
| throw exp; |
| } catch (TException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public boolean setClient(Map<String, Object> conf, NimbusClient client) { |
| if (this.client != null) { |
| this.client.close(); |
| } |
| this.client = client; |
| if (conf != null) { |
| this.bufferSize = ObjectReader.getInt(conf.get(Config.STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES), bufferSize); |
| } |
| return true; |
| } |
| |
| @Override |
| @SuppressWarnings("checkstyle:NoFinalizer") |
| protected void finalize() { |
| shutdown(); |
| } |
| |
| @Override |
| public void shutdown() { |
| if (client != null) { |
| client.close(); |
| client = null; |
| } |
| } |
| |
| @Override |
| public void close() { |
| shutdown(); |
| } |
| |
| public class NimbusKeyIterator implements Iterator<String> { |
| private ListBlobsResult listBlobs = null; |
| private int offset = 0; |
| private boolean eof = false; |
| |
| public NimbusKeyIterator(ListBlobsResult listBlobs) { |
| this.listBlobs = listBlobs; |
| this.eof = (listBlobs.get_keys_size() == 0); |
| } |
| |
| private boolean isCacheEmpty() { |
| return listBlobs.get_keys_size() <= offset; |
| } |
| |
| private void readMore() throws TException { |
| if (!eof) { |
| offset = 0; |
| synchronized (client) { |
| listBlobs = client.getClient().listBlobs(listBlobs.get_session()); |
| } |
| if (listBlobs.get_keys_size() == 0) { |
| eof = true; |
| } |
| } |
| } |
| |
| @Override |
| public synchronized boolean hasNext() { |
| try { |
| if (isCacheEmpty()) { |
| readMore(); |
| } |
| } catch (TException e) { |
| throw new RuntimeException(e); |
| } |
| return !eof; |
| } |
| |
| @Override |
| public synchronized String next() { |
| if (!hasNext()) { |
| throw new NoSuchElementException(); |
| } |
| String ret = listBlobs.get_keys().get(offset); |
| offset++; |
| return ret; |
| } |
| |
| @Override |
| public void remove() { |
| throw new UnsupportedOperationException("Delete Not Supported"); |
| } |
| } |
| |
| public class NimbusDownloadInputStream extends InputStreamWithMeta { |
| private BeginDownloadResult beginBlobDownload; |
| private byte[] buffer = null; |
| private int offset = 0; |
| private int end = 0; |
| private boolean eof = false; |
| |
| public NimbusDownloadInputStream(BeginDownloadResult beginBlobDownload) { |
| this.beginBlobDownload = beginBlobDownload; |
| } |
| |
| @Override |
| public long getVersion() throws IOException { |
| return beginBlobDownload.get_version(); |
| } |
| |
| @Override |
| public synchronized int read() throws IOException { |
| try { |
| if (isEmpty()) { |
| readMore(); |
| if (eof) { |
| return -1; |
| } |
| } |
| int length = Math.min(1, available()); |
| if (length == 0) { |
| return -1; |
| } |
| int ret = buffer[offset]; |
| offset += length; |
| return ret; |
| } catch (TException exp) { |
| throw new IOException(exp); |
| } |
| } |
| |
| @Override |
| public synchronized int read(byte[] b, int off, int len) throws IOException { |
| try { |
| if (isEmpty()) { |
| readMore(); |
| if (eof) { |
| return -1; |
| } |
| } |
| int length = Math.min(len, available()); |
| System.arraycopy(buffer, offset, b, off, length); |
| offset += length; |
| return length; |
| } catch (TException exp) { |
| throw new IOException(exp); |
| } |
| } |
| |
| @Override |
| public synchronized int read(byte[] b) throws IOException { |
| return read(b, 0, b.length); |
| } |
| |
| private boolean isEmpty() { |
| return buffer == null || offset >= end; |
| } |
| |
| private void readMore() throws TException { |
| if (!eof) { |
| ByteBuffer buff; |
| synchronized (client) { |
| buff = client.getClient().downloadBlobChunk(beginBlobDownload.get_session()); |
| } |
| buffer = buff.array(); |
| offset = buff.arrayOffset() + buff.position(); |
| int length = buff.remaining(); |
| end = offset + length; |
| if (length == 0) { |
| eof = true; |
| } |
| } |
| } |
| |
| @Override |
| public synchronized int available() { |
| return buffer == null ? 0 : (end - offset); |
| } |
| |
| @Override |
| public long getFileLength() { |
| return beginBlobDownload.get_data_size(); |
| } |
| } |
| |
| public class NimbusUploadAtomicOutputStream extends AtomicOutputStream { |
| private String session; |
| private int maxChunkSize = 4096; |
| private String key; |
| |
| public NimbusUploadAtomicOutputStream(String session, int bufferSize, String key) { |
| this.session = session; |
| this.maxChunkSize = bufferSize; |
| this.key = key; |
| } |
| |
| @Override |
| public void cancel() throws IOException { |
| try { |
| synchronized (client) { |
| client.getClient().cancelBlobUpload(session); |
| } |
| } catch (TException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public void write(int b) throws IOException { |
| try { |
| synchronized (client) { |
| client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(new byte[]{ (byte) b })); |
| } |
| } catch (TException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public void write(byte[] b) throws IOException { |
| write(b, 0, b.length); |
| } |
| |
| @Override |
| public void write(byte[] b, int offset, int len) throws IOException { |
| try { |
| int end = offset + len; |
| for (int realOffset = offset; realOffset < end; realOffset += maxChunkSize) { |
| int realLen = Math.min(end - realOffset, maxChunkSize); |
| LOG.debug("Writing {} bytes of {} remaining", realLen, (end - realOffset)); |
| synchronized (client) { |
| client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(b, realOffset, realLen)); |
| } |
| } |
| } catch (TException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| try { |
| synchronized (client) { |
| client.getClient().finishBlobUpload(session); |
| client.getClient().createStateInZookeeper(key); |
| } |
| } catch (TException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| } |