/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage;
import static java.lang.Thread.currentThread;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.security.InvalidKeyException;
import java.time.Instant;
import java.util.Collection;
import java.util.Date;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.RequestOptions;
import com.microsoft.azure.storage.ResultContinuation;
import com.microsoft.azure.storage.ResultSegment;
import com.microsoft.azure.storage.RetryPolicy;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.BlockEntry;
import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.CopyStatus;
import com.microsoft.azure.storage.blob.ListBlobItem;
import com.microsoft.azure.storage.blob.SharedAccessBlobHeaders;
import com.microsoft.azure.storage.blob.SharedAccessBlobPermissions;
import com.microsoft.azure.storage.blob.SharedAccessBlobPolicy;
import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.core.data.DataIdentifier;
import org.apache.jackrabbit.core.data.DataRecord;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.oak.commons.PropertiesUtil;
import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadException;
import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadToken;
import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordDownloadOptions;
import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUpload;
import org.apache.jackrabbit.oak.spi.blob.AbstractDataRecord;
import org.apache.jackrabbit.oak.spi.blob.AbstractSharedBackend;
import org.apache.jackrabbit.util.Base64;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
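/**
* Backend for a shared Oak data store that keeps binaries as block blobs in an
* Azure Blob Storage container. Besides the read/write/delete operations
* required by {@link AbstractSharedBackend}, it can issue time-limited shared
* access signature (SAS) URIs so clients can download and upload binaries
* directly against Azure without streaming them through Oak.
*/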
public class AzureBlobStoreBackend extends AbstractSharedBackend {
private static final Logger LOG = LoggerFactory.getLogger(AzureBlobStoreBackend.class);
private static final Logger LOG_STREAMS_DOWNLOAD = LoggerFactory.getLogger("oak.datastore.download.streams");
private static final Logger LOG_STREAMS_UPLOAD = LoggerFactory.getLogger("oak.datastore.upload.streams");
private static final String META_DIR_NAME = "META";
private static final String META_KEY_PREFIX = META_DIR_NAME + "/";
private static final String REF_KEY = "reference.key";
private static final long BUFFERED_STREAM_THRESHOLD = 1024 * 1024;
static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 1024 * 1024 * 10; // 10MB
static final long MAX_MULTIPART_UPLOAD_PART_SIZE = 1024 * 1024 * 100; // 100MB
static final long MAX_SINGLE_PUT_UPLOAD_SIZE = 1024 * 1024 * 256; // 256MB, Azure limit
static final long MAX_BINARY_UPLOAD_SIZE = (long) Math.floor(1024L * 1024L * 1024L * 1024L * 4.75); // 4.75TB, Azure limit
private static final int MAX_ALLOWABLE_UPLOAD_URIS = 50000; // Azure limit
private static final int MAX_UNIQUE_RECORD_TRIES = 10;
private Properties properties;
private String containerName;
private String connectionString;
private int concurrentRequestCount = 1;
private RetryPolicy retryPolicy;
private Integer requestTimeout;
private int httpDownloadURIExpirySeconds = 0; // disabled by default
private int httpUploadURIExpirySeconds = 0; // disabled by default
private boolean createBlobContainer = true;
private boolean presignedDownloadURIVerifyExists = true;
private Cache<DataIdentifier, URI> httpDownloadURICache;
private byte[] secret;
public void setProperties(final Properties properties) {
this.properties = properties;
}
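/**
* Returns a reference to the configured blob container, with the configured
* retry policy and request timeout applied to the client's default request
* options.
*/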
protected CloudBlobContainer getAzureContainer() throws DataStoreException {
CloudBlobContainer container = Utils.getBlobContainer(connectionString, containerName);
RequestOptions requestOptions = container.getServiceClient().getDefaultRequestOptions();
if (retryPolicy != null) {
requestOptions.setRetryPolicyFactory(retryPolicy);
}
if (requestTimeout != null) {
requestOptions.setTimeoutIntervalInMs(requestTimeout);
}
return container;
}
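/**
* Initializes the backend from the configured properties (or from
* {@link Utils#DEFAULT_CONFIG_FILE} if none were set): resolves the connection
* string and container name, optionally creates the container, and reads the
* expiry and cache settings for presigned download and upload URIs.
*/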
@Override
public void init() throws DataStoreException {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
long start = System.currentTimeMillis();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
LOG.debug("Started backend initialization");
if (null == properties) {
try {
properties = Utils.readConfig(Utils.DEFAULT_CONFIG_FILE);
}
catch (IOException e) {
throw new DataStoreException("Unable to initialize Azure Data Store from " + Utils.DEFAULT_CONFIG_FILE, e);
}
}
try {
Utils.setProxyIfNeeded(properties);
containerName = (String) properties.get(AzureConstants.AZURE_BLOB_CONTAINER_NAME);
createBlobContainer = PropertiesUtil.toBoolean(properties.getProperty(AzureConstants.AZURE_CREATE_CONTAINER), true);
connectionString = Utils.getConnectionStringFromProperties(properties);
concurrentRequestCount = PropertiesUtil.toInteger(properties.get(AzureConstants.AZURE_BLOB_CONCURRENT_REQUESTS_PER_OPERATION), 1);
LOG.info("Using concurrentRequestsPerOperation={}", concurrentRequestCount);
retryPolicy = Utils.getRetryPolicy((String)properties.get(AzureConstants.AZURE_BLOB_MAX_REQUEST_RETRY));
if (properties.getProperty(AzureConstants.AZURE_BLOB_REQUEST_TIMEOUT) != null) {
requestTimeout = PropertiesUtil.toInteger(properties.getProperty(AzureConstants.AZURE_BLOB_REQUEST_TIMEOUT), RetryPolicy.DEFAULT_CLIENT_RETRY_COUNT);
}
presignedDownloadURIVerifyExists =
PropertiesUtil.toBoolean(properties.get(AzureConstants.PRESIGNED_HTTP_DOWNLOAD_URI_VERIFY_EXISTS), true);
CloudBlobContainer azureContainer = getAzureContainer();
if (createBlobContainer && azureContainer.createIfNotExists()) {
LOG.info("New container created. containerName={}", containerName);
} else {
LOG.info("Reusing existing container. containerName={}", containerName);
}
LOG.debug("Backend initialized. duration={}",
+(System.currentTimeMillis() - start));
// settings pertaining to DataRecordAccessProvider functionality
String putExpiry = properties.getProperty(AzureConstants.PRESIGNED_HTTP_UPLOAD_URI_EXPIRY_SECONDS);
if (null != putExpiry) {
this.setHttpUploadURIExpirySeconds(Integer.parseInt(putExpiry));
}
String getExpiry = properties.getProperty(AzureConstants.PRESIGNED_HTTP_DOWNLOAD_URI_EXPIRY_SECONDS);
if (null != getExpiry) {
this.setHttpDownloadURIExpirySeconds(Integer.parseInt(getExpiry));
String cacheMaxSize = properties.getProperty(AzureConstants.PRESIGNED_HTTP_DOWNLOAD_URI_CACHE_MAX_SIZE);
if (null != cacheMaxSize) {
this.setHttpDownloadURICacheSize(Integer.parseInt(cacheMaxSize));
}
else {
this.setHttpDownloadURICacheSize(0); // default
}
}
}
catch (StorageException e) {
throw new DataStoreException(e);
}
}
finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
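/**
* Opens an input stream to the blob for {@code identifier}, failing with a
* {@link DataStoreException} if the blob does not exist.
*/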
@Override
public InputStream read(DataIdentifier identifier) throws DataStoreException {
if (null == identifier) throw new NullPointerException("identifier");
String key = getKeyName(identifier);
long start = System.currentTimeMillis();
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(
getClass().getClassLoader());
CloudBlockBlob blob = getAzureContainer().getBlockBlobReference(key);
if (!blob.exists()) {
throw new DataStoreException(String.format("Trying to read missing blob. identifier=%s", key));
}
InputStream is = blob.openInputStream();
LOG.debug("Got input stream for blob. identifier={} duration={}", key, (System.currentTimeMillis() - start));
if (LOG_STREAMS_DOWNLOAD.isDebugEnabled()) {
// Log message, with exception so we can get a trace to see where the call came from
LOG_STREAMS_DOWNLOAD.debug("Binary downloaded from Azure Blob Storage - identifier={}", key, new Exception());
}
return is;
}
catch (StorageException e) {
LOG.info("Error reading blob. identifier={}", key, e);
throw new DataStoreException(String.format("Cannot read blob. identifier=%s", key), e);
}
catch (URISyntaxException e) {
LOG.debug("Error reading blob. identifier={}", key, e);
throw new DataStoreException(String.format("Cannot read blob. identifier=%s", key), e);
} finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
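/**
* Uploads {@code file} to the blob keyed by {@code identifier}. If the blob
* already exists with the same length, its lastModified time is refreshed by
* copying the blob onto itself; an existing blob with a different length is
* reported as a collision.
*/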
@Override
public void write(DataIdentifier identifier, File file) throws DataStoreException {
if (null == identifier) {
throw new NullPointerException("identifier");
}
if (null == file) {
throw new NullPointerException("file");
}
String key = getKeyName(identifier);
long start = System.currentTimeMillis();
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
long len = file.length();
LOG.debug("Blob write started. identifier={} length={}", key, len);
CloudBlockBlob blob = getAzureContainer().getBlockBlobReference(key);
if (!blob.exists()) {
BlobRequestOptions options = new BlobRequestOptions();
options.setConcurrentRequestCount(concurrentRequestCount);
boolean useBufferedStream = len < BUFFERED_STREAM_THRESHOLD;
final InputStream in = useBufferedStream ? new BufferedInputStream(new FileInputStream(file)) : new FileInputStream(file);
try {
blob.upload(in, len, null, options, null);
LOG.debug("Blob created. identifier={} length={} duration={} buffered={}", key, len, (System.currentTimeMillis() - start), useBufferedStream);
if (LOG_STREAMS_UPLOAD.isDebugEnabled()) {
// Log message, with exception so we can get a trace to see where the call came from
LOG_STREAMS_UPLOAD.debug("Binary uploaded to Azure Blob Storage - identifier={}", key, new Exception());
}
} finally {
in.close();
}
return;
}
blob.downloadAttributes();
if (blob.getProperties().getLength() != len) {
throw new DataStoreException("Length Collision. identifier=" + key +
" new length=" + len +
" old length=" + blob.getProperties().getLength());
}
LOG.trace("Blob already exists. identifier={} lastModified={}", key, blob.getProperties().getLastModified().getTime());
blob.startCopy(blob);
//TODO: better way of updating lastModified (use custom metadata?)
if (!waitForCopy(blob)) {
throw new DataStoreException(
String.format("Cannot update lastModified for blob. identifier=%s status=%s",
key, blob.getCopyState().getStatusDescription()));
}
LOG.debug("Blob updated. identifier={} lastModified={} duration={}", key,
blob.getProperties().getLastModified().getTime(), (System.currentTimeMillis() - start));
}
catch (StorageException e) {
LOG.info("Error writing blob. identifier={}", key, e);
throw new DataStoreException(String.format("Cannot write blob. identifier=%s", key), e);
}
catch (URISyntaxException | IOException e) {
LOG.debug("Error writing blob. identifier={}", key, e);
throw new DataStoreException(String.format("Cannot write blob. identifier=%s", key), e);
} catch (InterruptedException e) {
LOG.debug("Error writing blob. identifier={}", key, e);
throw new DataStoreException(String.format("Cannot copy blob. identifier=%s", key), e);
} finally {
if (null != contextClassLoader) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
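/**
* Polls the copy state of {@code blob} every 500ms until it leaves the
* PENDING state, returning true if the copy completed successfully.
*/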
private static boolean waitForCopy(CloudBlob blob) throws StorageException, InterruptedException {
boolean continueLoop = true;
CopyStatus status = CopyStatus.PENDING;
while (continueLoop) {
blob.downloadAttributes();
status = blob.getCopyState().getStatus();
continueLoop = status == CopyStatus.PENDING;
// Sleep if retry is needed
if (continueLoop) {
Thread.sleep(500);
}
}
return status == CopyStatus.SUCCESS;
}
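/**
* Returns the shared reference key, reading it from the {@code reference.key}
* metadata record if present; otherwise a new key is generated, stored there,
* and read back.
*/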
@Override
public byte[] getOrCreateReferenceKey() throws DataStoreException {
try {
if (secret != null && secret.length != 0) {
return secret;
} else {
byte[] key;
// Try reading from the metadata folder if it exists
key = readMetadataBytes(REF_KEY);
if (key == null) {
key = super.getOrCreateReferenceKey();
addMetadataRecord(new ByteArrayInputStream(key), REF_KEY);
key = readMetadataBytes(REF_KEY);
}
secret = key;
return secret;
}
} catch (IOException e) {
throw new DataStoreException("Unable to get or create key " + e);
}
}
private byte[] readMetadataBytes(String name) throws IOException, DataStoreException {
DataRecord rec = getMetadataRecord(name);
byte[] key = null;
if (rec != null) {
InputStream stream = null;
try {
stream = rec.getStream();
return IOUtils.toByteArray(stream);
} finally {
IOUtils.closeQuietly(stream);
}
}
return key;
}
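/**
* Downloads the blob's attributes and returns a {@link DataRecord} exposing
* its length and last-modified time.
*/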
@Override
public DataRecord getRecord(DataIdentifier identifier) throws DataStoreException {
if (null == identifier) {
throw new NullPointerException("identifier");
}
String key = getKeyName(identifier);
long start = System.currentTimeMillis();
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
CloudBlockBlob blob = getAzureContainer().getBlockBlobReference(key);
blob.downloadAttributes();
AzureBlobStoreDataRecord record = new AzureBlobStoreDataRecord(
this,
connectionString,
containerName,
new DataIdentifier(getIdentifierName(blob.getName())),
blob.getProperties().getLastModified().getTime(),
blob.getProperties().getLength());
LOG.debug("Data record read for blob. identifier={} duration={} record={}",
key, (System.currentTimeMillis() - start), record);
return record;
}
catch (StorageException e) {
LOG.info("Error getting data record for blob. identifier={}", key, e);
throw new DataStoreException(String.format("Cannot retrieve blob. identifier=%s", key), e);
}
catch (URISyntaxException e) {
LOG.debug("Error getting data record for blob. identifier={}", key, e);
throw new DataStoreException(String.format("Cannot retrieve blob. identifier=%s", key), e);
} finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
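/**
* Returns a lazy iterator over the identifiers of all blobs in the container,
* fetched in segments by {@link RecordsIterator}.
*/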
@Override
public Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException {
return new RecordsIterator<DataIdentifier>(
new Function<AzureBlobInfo, DataIdentifier>() {
@Override
public DataIdentifier apply(AzureBlobInfo input) {
return new DataIdentifier(getIdentifierName(input.getName()));
}
}
);
}
@Override
public Iterator<DataRecord> getAllRecords() throws DataStoreException {
final AbstractSharedBackend backend = this;
return new RecordsIterator<DataRecord>(
new Function<AzureBlobInfo, DataRecord>() {
@Override
public DataRecord apply(AzureBlobInfo input) {
return new AzureBlobStoreDataRecord(
backend,
connectionString,
containerName,
new DataIdentifier(getIdentifierName(input.getName())),
input.getLastModified(),
input.getLength());
}
}
);
}
@Override
public boolean exists(DataIdentifier identifier) throws DataStoreException {
long start = System.currentTimeMillis();
String key = getKeyName(identifier);
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
boolean exists = getAzureContainer().getBlockBlobReference(key).exists();
LOG.debug("Blob exists={} identifier={} duration={}", exists, key, (System.currentTimeMillis() - start));
return exists;
}
catch (Exception e) {
throw new DataStoreException(e);
}
finally {
if (null != contextClassLoader) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
@Override
public void close() throws DataStoreException {
LOG.info("AzureBlobBackend closed.");
}
@Override
public void deleteRecord(DataIdentifier identifier) throws DataStoreException {
if (null == identifier) throw new NullPointerException("identifier");
String key = getKeyName(identifier);
long start = System.currentTimeMillis();
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
boolean result = getAzureContainer().getBlockBlobReference(key).deleteIfExists();
LOG.debug("Blob {}. identifier={} duration={}",
result ? "deleted" : "delete requested, but it does not exist (perhaps already deleted)",
key, (System.currentTimeMillis() - start));
}
catch (StorageException e) {
LOG.info("Error deleting blob. identifier={}", key, e);
throw new DataStoreException(e);
}
catch (URISyntaxException e) {
throw new DataStoreException(e);
} finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
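/**
* Stores a metadata record under the {@code META/} folder of the container.
*/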
@Override
public void addMetadataRecord(InputStream input, String name) throws DataStoreException {
if (null == input) {
throw new NullPointerException("input");
}
if (Strings.isNullOrEmpty(name)) {
throw new IllegalArgumentException("name");
}
long start = System.currentTimeMillis();
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
addMetadataRecordImpl(input, name, -1L);
LOG.debug("Metadata record added. metadataName={} duration={}", name, (System.currentTimeMillis() - start));
}
finally {
if (null != contextClassLoader) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
@Override
public void addMetadataRecord(File input, String name) throws DataStoreException {
if (null == input) {
throw new NullPointerException("input");
}
if (Strings.isNullOrEmpty(name)) {
throw new IllegalArgumentException("name");
}
long start = System.currentTimeMillis();
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
addMetadataRecordImpl(new FileInputStream(input), name, input.length());
LOG.debug("Metadata record added. metadataName={} duration={}", name, (System.currentTimeMillis() - start));
}
catch (FileNotFoundException e) {
throw new DataStoreException(e);
}
finally {
if (null != contextClassLoader) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
private void addMetadataRecordImpl(final InputStream input, String name, long recordLength) throws DataStoreException {
try {
CloudBlobDirectory metaDir = getAzureContainer().getDirectoryReference(META_DIR_NAME);
CloudBlockBlob blob = metaDir.getBlockBlobReference(name);
blob.upload(input, recordLength);
}
catch (StorageException e) {
LOG.info("Error adding metadata record. metadataName={} length={}", name, recordLength, e);
throw new DataStoreException(e);
}
catch (URISyntaxException | IOException e) {
throw new DataStoreException(e);
}
}
@Override
public DataRecord getMetadataRecord(String name) {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
long start = System.currentTimeMillis();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
CloudBlobDirectory metaDir = getAzureContainer().getDirectoryReference(META_DIR_NAME);
CloudBlockBlob blob = metaDir.getBlockBlobReference(name);
if (!blob.exists()) {
LOG.warn("Trying to read missing metadata. metadataName={}", name);
return null;
}
blob.downloadAttributes();
long lastModified = blob.getProperties().getLastModified().getTime();
long length = blob.getProperties().getLength();
AzureBlobStoreDataRecord record = new AzureBlobStoreDataRecord(this,
connectionString,
containerName, new DataIdentifier(name),
lastModified,
length,
true);
LOG.debug("Metadata record read. metadataName={} duration={} record={}", name, (System.currentTimeMillis() - start), record);
return record;
} catch (StorageException e) {
LOG.info("Error reading metadata record. metadataName={}", name, e);
throw new RuntimeException(e);
} catch (Exception e) {
LOG.debug("Error reading metadata record. metadataName={}", name, e);
throw new RuntimeException(e);
} finally {
if (null != contextClassLoader) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
@Override
public List<DataRecord> getAllMetadataRecords(String prefix) {
if (null == prefix) {
throw new NullPointerException("prefix");
}
long start = System.currentTimeMillis();
final List<DataRecord> records = Lists.newArrayList();
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
CloudBlobDirectory metaDir = getAzureContainer().getDirectoryReference(META_DIR_NAME);
for (ListBlobItem item : metaDir.listBlobs(prefix)) {
if (item instanceof CloudBlob) {
CloudBlob blob = (CloudBlob) item;
records.add(new AzureBlobStoreDataRecord(
this,
connectionString,
containerName,
new DataIdentifier(stripMetaKeyPrefix(blob.getName())),
blob.getProperties().getLastModified().getTime(),
blob.getProperties().getLength(),
true));
}
}
LOG.debug("Metadata records read. recordsRead={} metadataFolder={} duration={}", records.size(), prefix, (System.currentTimeMillis() - start));
}
catch (StorageException e) {
LOG.info("Error reading all metadata records. metadataFolder={}", prefix, e);
}
catch (DataStoreException | URISyntaxException e) {
LOG.debug("Error reading all metadata records. metadataFolder={}", prefix, e);
}
finally {
if (null != contextClassLoader) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
return records;
}
@Override
public boolean deleteMetadataRecord(String name) {
long start = System.currentTimeMillis();
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
CloudBlockBlob blob = getAzureContainer().getBlockBlobReference(addMetaKeyPrefix(name));
boolean result = blob.deleteIfExists();
LOG.debug("Metadata record {}. metadataName={} duration={}",
result ? "deleted" : "delete requested, but it does not exist (perhaps already deleted)",
name, (System.currentTimeMillis() - start));
return result;
}
catch (StorageException e) {
LOG.info("Error deleting metadata record. metadataName={}", name, e);
}
catch (DataStoreException | URISyntaxException e) {
LOG.debug("Error deleting metadata record. metadataName={}", name, e);
}
finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
return false;
}
@Override
public void deleteAllMetadataRecords(String prefix) {
if (null == prefix) {
throw new NullPointerException("prefix");
}
long start = System.currentTimeMillis();
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
CloudBlobDirectory metaDir = getAzureContainer().getDirectoryReference(META_DIR_NAME);
int total = 0;
for (ListBlobItem item : metaDir.listBlobs(prefix)) {
if (item instanceof CloudBlob) {
if (((CloudBlob)item).deleteIfExists()) {
total++;
}
}
}
LOG.debug("Metadata records deleted. recordsDeleted={} metadataFolder={} duration={}",
total, prefix, (System.currentTimeMillis() - start));
}
catch (StorageException e) {
LOG.info("Error deleting all metadata records. metadataFolder={}", prefix, e);
}
catch (DataStoreException | URISyntaxException e) {
LOG.debug("Error deleting all metadata records. metadataFolder={}", prefix, e);
}
finally {
if (null != contextClassLoader) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
@Override
public boolean metadataRecordExists(String name) {
long start = System.currentTimeMillis();
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
CloudBlockBlob blob = getAzureContainer().getBlockBlobReference(addMetaKeyPrefix(name));
boolean exists = blob.exists();
LOG.debug("Metadata record {} exists {}. duration={}", name, exists, (System.currentTimeMillis() - start));
return exists;
}
catch (DataStoreException | StorageException | URISyntaxException e) {
LOG.debug("Error checking existence of metadata record = {}", name, e);
}
finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
return false;
}
/**
* Returns the blob key for the given data identifier: the identifier string
* with a dash inserted after the first four characters, e.g.
* "0123456789abcdef" becomes "0123-456789abcdef". Objects are stored under
* this key in the Azure data store.
*/
private static String getKeyName(DataIdentifier identifier) {
String key = identifier.toString();
return key.substring(0, 4) + Utils.DASH + key.substring(4);
}
/**
* Returns the data identifier for the given blob key by removing the dash
* inserted by {@link #getKeyName(DataIdentifier)}. Metadata keys are
* returned unchanged; keys without a dash yield {@code null}.
*/
private static String getIdentifierName(String key) {
if (!key.contains(Utils.DASH)) {
return null;
} else if (key.contains(META_KEY_PREFIX)) {
return key;
}
return key.substring(0, 4) + key.substring(5);
}
private static String addMetaKeyPrefix(final String key) {
return META_KEY_PREFIX + key;
}
private static String stripMetaKeyPrefix(String name) {
if (name.startsWith(META_KEY_PREFIX)) {
return name.substring(META_KEY_PREFIX.length());
}
return name;
}
void setHttpDownloadURIExpirySeconds(int seconds) {
httpDownloadURIExpirySeconds = seconds;
}
void setHttpDownloadURICacheSize(int maxSize) {
// max size 0 or smaller is used to turn off the cache
if (maxSize > 0) {
LOG.info("presigned GET URI cache enabled, maxSize = {} items, expiry = {} seconds", maxSize, httpDownloadURIExpirySeconds / 2);
httpDownloadURICache = CacheBuilder.newBuilder()
.maximumSize(maxSize)
.expireAfterWrite(httpDownloadURIExpirySeconds / 2, TimeUnit.SECONDS)
.build();
} else {
LOG.info("presigned GET URI cache disabled");
httpDownloadURICache = null;
}
}
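/**
* Returns a presigned GET URI for {@code identifier}, or null if presigned
* downloads are disabled, the blob does not exist (when existence
* verification is enabled), or the URI cannot be generated. The URI carries a
* Cache-Control header, plus Content-Type and Content-Disposition headers
* when supplied in {@code downloadOptions}, and is cached if the download URI
* cache is enabled.
*/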
URI createHttpDownloadURI(@NotNull DataIdentifier identifier,
@NotNull DataRecordDownloadOptions downloadOptions) {
URI uri = null;
// Despite the @NotNull annotations, unit tests run from Maven can still pass null, so check explicitly
if (null == identifier) throw new NullPointerException("identifier");
if (null == downloadOptions) throw new NullPointerException("downloadOptions");
if (httpDownloadURIExpirySeconds > 0) {
if (null != httpDownloadURICache) {
uri = httpDownloadURICache.getIfPresent(identifier);
}
if (null == uri) {
if (presignedDownloadURIVerifyExists) {
// Check if this identifier exists. If not, we want to return null
// even if the identifier is in the download URI cache.
try {
if (!exists(identifier)) {
LOG.warn("Cannot create download URI for nonexistent blob {}; returning null", getKeyName(identifier));
return null;
}
} catch (DataStoreException e) {
LOG.warn("Cannot create download URI for blob {} (caught DataStoreException); returning null", getKeyName(identifier), e);
return null;
}
}
String key = getKeyName(identifier);
SharedAccessBlobHeaders headers = new SharedAccessBlobHeaders();
headers.setCacheControl(String.format("private, max-age=%d, immutable", httpDownloadURIExpirySeconds));
String contentType = downloadOptions.getContentTypeHeader();
if (! Strings.isNullOrEmpty(contentType)) {
headers.setContentType(contentType);
}
String contentDisposition =
downloadOptions.getContentDispositionHeader();
if (! Strings.isNullOrEmpty(contentDisposition)) {
headers.setContentDisposition(contentDisposition);
}
uri = createPresignedURI(key,
EnumSet.of(SharedAccessBlobPermissions.READ),
httpDownloadURIExpirySeconds,
headers);
if (uri != null && httpDownloadURICache != null) {
httpDownloadURICache.put(identifier, uri);
}
}
}
return uri;
}
void setHttpUploadURIExpirySeconds(int seconds) { httpUploadURIExpirySeconds = seconds; }
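/**
* Generates an identifier from a random UUID plus the current epoch
* millisecond timestamp, making a collision with an existing record
* practically impossible.
*/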
private DataIdentifier generateSafeRandomIdentifier() {
return new DataIdentifier(
String.format("%s-%d",
UUID.randomUUID().toString(),
Instant.now().toEpochMilli()
)
);
}
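/**
* Creates a direct-upload request for a binary of at most
* {@code maxUploadSizeInBytes}: a set of presigned block-upload URIs plus an
* encoded upload token to be passed to {@link #completeHttpUpload(String)}.
* The part count is chosen so each part falls between
* {@link #MIN_MULTIPART_UPLOAD_PART_SIZE} and
* {@link #MAX_MULTIPART_UPLOAD_PART_SIZE}, capped at Azure's 50,000-block
* limit; passing {@code maxNumberOfURIs == -1} lets the backend pick the
* count.
*/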
DataRecordUpload initiateHttpUpload(long maxUploadSizeInBytes, int maxNumberOfURIs) {
List<URI> uploadPartURIs = Lists.newArrayList();
long minPartSize = MIN_MULTIPART_UPLOAD_PART_SIZE;
long maxPartSize = MAX_MULTIPART_UPLOAD_PART_SIZE;
if (0L >= maxUploadSizeInBytes) {
throw new IllegalArgumentException("maxUploadSizeInBytes must be > 0");
}
else if (0 == maxNumberOfURIs || -1 > maxNumberOfURIs) {
throw new IllegalArgumentException("maxNumberOfURIs must either be > 0 or -1");
}
else if (maxUploadSizeInBytes > MAX_SINGLE_PUT_UPLOAD_SIZE &&
maxNumberOfURIs == 1) {
throw new IllegalArgumentException(
String.format("Cannot do single-put upload with file size %d - exceeds max single-put upload size of %d",
maxUploadSizeInBytes,
MAX_SINGLE_PUT_UPLOAD_SIZE)
);
}
else if (maxUploadSizeInBytes > MAX_BINARY_UPLOAD_SIZE) {
throw new IllegalArgumentException(
String.format("Cannot do upload with file size %d - exceeds max upload size of %d",
maxUploadSizeInBytes,
MAX_BINARY_UPLOAD_SIZE)
);
}
DataIdentifier newIdentifier = generateSafeRandomIdentifier();
String blobId = getKeyName(newIdentifier);
String uploadId = null;
if (httpUploadURIExpirySeconds > 0) {
// Always do multi-part uploads for Azure, even for small binaries.
//
// This is because Azure requires a unique header, "x-ms-blob-type=BlockBlob", to be
// set but only for single-put uploads, not multi-part.
// This would require clients to know not only the type of service provider being used
// but also the type of upload (single-put vs multi-part), which breaks abstraction.
// Instead we can insist that clients always do multi-part uploads to Azure, even
// if the multi-part upload consists of only one upload part. This doesn't require
// additional work on the part of the client since the "complete" request must always
// be sent regardless, but it helps us avoid the client having to know what type
// of provider is being used, or us having to instruct the client to use specific
// types of headers, etc.
// Azure doesn't use upload IDs the way AWS does, so generate a placeholder
// one for compatibility - its presence in the upload token is how we later
// distinguish a multi-part upload from a single-put upload.
uploadId = Base64.encode(UUID.randomUUID().toString());
long numParts = 0L;
if (maxNumberOfURIs > 0) {
long requestedPartSize = (long) Math.ceil(((double) maxUploadSizeInBytes) / ((double) maxNumberOfURIs));
if (requestedPartSize <= maxPartSize) {
numParts = Math.min(
maxNumberOfURIs,
Math.min(
(long) Math.ceil(((double) maxUploadSizeInBytes) / ((double) minPartSize)),
MAX_ALLOWABLE_UPLOAD_URIS
)
);
} else {
throw new IllegalArgumentException(
String.format("Cannot do multi-part upload with requested part size %d", requestedPartSize)
);
}
}
else {
long maximalNumParts = (long) Math.ceil(((double) maxUploadSizeInBytes) / ((double) MIN_MULTIPART_UPLOAD_PART_SIZE));
numParts = Math.min(maximalNumParts, MAX_ALLOWABLE_UPLOAD_URIS);
}
String key = getKeyName(newIdentifier);
EnumSet<SharedAccessBlobPermissions> perms = EnumSet.of(SharedAccessBlobPermissions.WRITE);
Map<String, String> presignedURIRequestParams = Maps.newHashMap();
presignedURIRequestParams.put("comp", "block");
for (long blockId = 1; blockId <= numParts; ++blockId) {
presignedURIRequestParams.put("blockId",
Base64.encode(String.format("%06d", blockId)));
uploadPartURIs.add(createPresignedURI(key, perms, httpUploadURIExpirySeconds, presignedURIRequestParams));
}
}
try {
byte[] secret = getOrCreateReferenceKey();
String uploadToken = new DataRecordUploadToken(blobId, uploadId).getEncodedToken(secret);
return new DataRecordUpload() {
@Override
@NotNull
public String getUploadToken() { return uploadToken; }
@Override
public long getMinPartSize() { return minPartSize; }
@Override
public long getMaxPartSize() { return maxPartSize; }
@Override
@NotNull
public Collection<URI> getUploadURIs() { return uploadPartURIs; }
};
}
catch (DataStoreException e) {
LOG.warn("Unable to obtain data store key", e);
}
return null;
}
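/**
* Completes a direct upload identified by {@code uploadTokenStr}. For a
* multi-part upload (the token carries an upload ID) the uncommitted block
* list is committed; for a single-put upload, or a token that was already
* completed, the existing record is returned as-is.
*/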
DataRecord completeHttpUpload(@NotNull String uploadTokenStr)
throws DataRecordUploadException, DataStoreException {
if (Strings.isNullOrEmpty(uploadTokenStr)) {
throw new IllegalArgumentException("uploadToken required");
}
DataRecordUploadToken uploadToken = DataRecordUploadToken.fromEncodedToken(uploadTokenStr, getOrCreateReferenceKey());
String key = uploadToken.getBlobId();
DataIdentifier blobId = new DataIdentifier(getIdentifierName(key));
DataRecord record = null;
try {
record = getRecord(blobId);
// If this succeeds this means either it was a "single put" upload
// (we don't need to do anything in this case - blob is already uploaded)
// or it was completed before with the same token.
}
catch (DataStoreException e) {
// record doesn't exist - so this means we are safe to do the complete request
try {
if (uploadToken.getUploadId().isPresent()) {
CloudBlockBlob blob = getAzureContainer().getBlockBlobReference(key);
// An existing upload ID means this is a multi-part upload
List<BlockEntry> blocks = blob.downloadBlockList(
BlockListingFilter.UNCOMMITTED,
AccessCondition.generateEmptyCondition(),
null,
null);
blob.commitBlockList(blocks);
long size = 0L;
for (BlockEntry block : blocks) {
size += block.getSize();
}
record = new AzureBlobStoreDataRecord(
this,
connectionString,
containerName,
blobId,
blob.getProperties().getLastModified().getTime(),
size);
}
else {
// Something is wrong - upload ID missing from upload token
// but record doesn't exist already, so this is invalid
throw new DataRecordUploadException(
String.format("Unable to finalize direct write of binary %s - upload ID missing from upload token",
blobId)
);
}
} catch (URISyntaxException | StorageException e2) {
throw new DataRecordUploadException(
String.format("Unable to finalize direct write of binary %s", blobId),
e2
);
}
}
return record;
}
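/**
* Generates a shared access signature URI for {@code key} with the given
* permissions and expiry, appending any additional query parameters
* URL-encoded. Returns null if the account name is not configured or signing
* fails.
*/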
private URI createPresignedURI(String key,
EnumSet<SharedAccessBlobPermissions> permissions,
int expirySeconds,
SharedAccessBlobHeaders optionalHeaders) {
return createPresignedURI(key, permissions, expirySeconds, Maps.newHashMap(), optionalHeaders);
}
private URI createPresignedURI(String key,
EnumSet<SharedAccessBlobPermissions> permissions,
int expirySeconds,
Map<String, String> additionalQueryParams) {
return createPresignedURI(key, permissions, expirySeconds, additionalQueryParams, null);
}
private URI createPresignedURI(String key,
EnumSet<SharedAccessBlobPermissions> permissions,
int expirySeconds,
Map<String, String> additionalQueryParams,
SharedAccessBlobHeaders optionalHeaders) {
SharedAccessBlobPolicy policy = new SharedAccessBlobPolicy();
Date expiry = Date.from(Instant.now().plusSeconds(expirySeconds));
policy.setSharedAccessExpiryTime(expiry);
policy.setPermissions(permissions);
String accountName = properties.getProperty(AzureConstants.AZURE_STORAGE_ACCOUNT_NAME, "");
if (Strings.isNullOrEmpty(accountName)) {
LOG.warn("Can't generate presigned URI - Azure account name not found in properties");
return null;
}
URI presignedURI = null;
try {
CloudBlockBlob blob = getAzureContainer().getBlockBlobReference(key);
String sharedAccessSignature =
null == optionalHeaders ?
blob.generateSharedAccessSignature(policy,
null) :
blob.generateSharedAccessSignature(policy,
optionalHeaders,
null);
// Shared access signature is returned encoded already.
String uriString = String.format("https://%s.blob.core.windows.net/%s/%s?%s",
accountName,
containerName,
key,
sharedAccessSignature);
if (! additionalQueryParams.isEmpty()) {
StringBuilder builder = new StringBuilder();
for (Map.Entry<String, String> e : additionalQueryParams.entrySet()) {
builder.append("&");
builder.append(URLEncoder.encode(e.getKey(), Charsets.UTF_8.name()));
builder.append("=");
builder.append(URLEncoder.encode(e.getValue(), Charsets.UTF_8.name()));
}
uriString += builder.toString();
}
presignedURI = new URI(uriString);
}
catch (DataStoreException e) {
LOG.error("No connection to Azure Blob Storage", e);
}
catch (URISyntaxException | InvalidKeyException | UnsupportedEncodingException e) {
LOG.error("Can't generate a presigned URI for key {}", key, e);
}
catch (StorageException e) {
LOG.error("Azure request to create presigned Azure Blob Storage {} URI failed. " +
"Key: {}, Error: {}, HTTP Code: {}, Azure Error Code: {}",
permissions.contains(SharedAccessBlobPermissions.READ) ? "GET" :
(permissions.contains(SharedAccessBlobPermissions.WRITE) ? "PUT" : ""),
key,
e.getMessage(),
e.getHttpStatusCode(),
e.getErrorCode());
}
return presignedURI;
}
private static class AzureBlobInfo {
private final String name;
private final long lastModified;
private final long length;
public AzureBlobInfo(String name, long lastModified, long length) {
this.name = name;
this.lastModified = lastModified;
this.length = length;
}
public String getName() {
return name;
}
public long getLastModified() {
return lastModified;
}
public long getLength() {
return length;
}
public static AzureBlobInfo fromCloudBlob(CloudBlob cloudBlob) {
return new AzureBlobInfo(cloudBlob.getName(),
cloudBlob.getProperties().getLastModified().getTime(),
cloudBlob.getProperties().getLength());
}
}
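/**
* Lazily iterates over the blobs of the container, fetching them in segments
* with a continuation token and mapping each blob through the supplied
* transformer.
*/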
private class RecordsIterator<T> extends AbstractIterator<T> {
// Seems to be thread-safe (in 5.0.0)
ResultContinuation resultContinuation;
boolean firstCall = true;
final Function<AzureBlobInfo, T> transformer;
final Queue<AzureBlobInfo> items = Lists.newLinkedList();
public RecordsIterator(Function<AzureBlobInfo, T> transformer) {
this.transformer = transformer;
}
@Override
protected T computeNext() {
if (items.isEmpty()) {
loadItems();
}
if (!items.isEmpty()) {
return transformer.apply(items.remove());
}
return endOfData();
}
private boolean loadItems() {
long start = System.currentTimeMillis();
ClassLoader contextClassLoader = currentThread().getContextClassLoader();
try {
currentThread().setContextClassLoader(getClass().getClassLoader());
CloudBlobContainer container = Utils.getBlobContainer(connectionString, containerName);
if (!firstCall && (resultContinuation == null || !resultContinuation.hasContinuation())) {
LOG.trace("No more records in container. containerName={}", container);
return false;
}
firstCall = false;
ResultSegment<ListBlobItem> results = container.listBlobsSegmented(null, false, EnumSet.noneOf(BlobListingDetails.class), null, resultContinuation, null, null);
resultContinuation = results.getContinuationToken();
for (ListBlobItem item : results.getResults()) {
if (item instanceof CloudBlob) {
items.add(AzureBlobInfo.fromCloudBlob((CloudBlob)item));
}
}
LOG.debug("Container records batch read. batchSize={} containerName={} duration={}",
results.getLength(), containerName, (System.currentTimeMillis() - start));
return results.getLength() > 0;
}
catch (StorageException e) {
LOG.info("Error listing blobs. containerName={}", containerName, e);
}
catch (DataStoreException e) {
LOG.debug("Cannot list blobs. containerName={}", containerName, e);
} finally {
if (contextClassLoader != null) {
currentThread().setContextClassLoader(contextClassLoader);
}
}
return false;
}
}
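/**
* A {@link DataRecord} backed by an Azure block blob. Length and
* last-modified time are captured at construction; the stream is opened
* lazily against the container.
*/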
static class AzureBlobStoreDataRecord extends AbstractDataRecord {
final String connectionString;
final String containerName;
final long lastModified;
final long length;
final boolean isMeta;
public AzureBlobStoreDataRecord(AbstractSharedBackend backend, String connectionString, String containerName,
DataIdentifier key, long lastModified, long length) {
this(backend, connectionString, containerName, key, lastModified, length, false);
}
public AzureBlobStoreDataRecord(AbstractSharedBackend backend, String connectionString, String containerName,
DataIdentifier key, long lastModified, long length, boolean isMeta) {
super(backend, key);
this.connectionString = connectionString;
this.containerName = containerName;
this.lastModified = lastModified;
this.length = length;
this.isMeta = isMeta;
}
@Override
public long getLength() throws DataStoreException {
return length;
}
@Override
public InputStream getStream() throws DataStoreException {
String id = getKeyName(getIdentifier());
CloudBlobContainer container = Utils.getBlobContainer(connectionString, containerName);
if (isMeta) {
id = addMetaKeyPrefix(getIdentifier().toString());
}
else {
// Don't worry about stream logging for metadata records
if (LOG_STREAMS_DOWNLOAD.isDebugEnabled()) {
// Log message, with exception so we can get a trace to see where the call came from
LOG_STREAMS_DOWNLOAD.debug("Binary downloaded from Azure Blob Storage - identifier={} ", id, new Exception());
}
}
try {
return container.getBlockBlobReference(id).openInputStream();
} catch (StorageException | URISyntaxException e) {
throw new DataStoreException(e);
}
}
@Override
public long getLastModified() {
return lastModified;
}
@Override
public String toString() {
return "AzureBlobStoreDataRecord{" +
"identifier=" + getIdentifier() +
", length=" + length +
", lastModified=" + lastModified +
", containerName='" + containerName + '\'' +
'}';
}
}
}