blob: 9ba51af6488b6ebba66a3863818c03f4232acc23 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.offload.jcloud.provider;
import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.GCS_ACCOUNT_KEY_FILE_FIELD;
import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ID_FIELD;
import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ROLE_FIELD;
import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ROLE_SESSION_NAME_FIELD;
import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_SECRET_FIELD;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSSessionCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.google.common.base.Strings;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.Properties;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.BlobStoreBuilder;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.ConfigValidation;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.CredentialBuilder;
import org.apache.commons.lang3.StringUtils;
import org.jclouds.ContextBuilder;
import org.jclouds.aws.domain.SessionCredentials;
import org.jclouds.aws.s3.AWSS3ProviderMetadata;
import org.jclouds.azureblob.AzureBlobProviderMetadata;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.TransientApiMetadata;
import org.jclouds.domain.Credentials;
import org.jclouds.domain.Location;
import org.jclouds.domain.LocationBuilder;
import org.jclouds.domain.LocationScope;
import org.jclouds.googlecloud.GoogleCredentialsFromJson;
import org.jclouds.googlecloudstorage.GoogleCloudStorageProviderMetadata;
import org.jclouds.providers.AnonymousProviderMetadata;
import org.jclouds.providers.ProviderMetadata;
import org.jclouds.s3.S3ApiMetadata;
import org.jclouds.s3.reference.S3Constants;
/**
* Enumeration of the supported JCloud Blob Store Providers.
* <p>
* Each Enumeration is responsible for implementation of its own validation,
* service authentication, and factory method for creating and instance of the
* JClod BlobStore type.
*
* Additional enumerations can be added in the future support other JCloud Providers,
* currently JClouds supports the following:
*
* - providers=[aws-s3, azureblob, b2, google-cloud-storage, rackspace-cloudfiles-us, rackspace-cloudfiles-uk]
* - apis=[s3, sts, transient, atmos, openstack-swift, openstack-keystone, openstack-keystone-3,
* rackspace-cloudfiles, rackspace-cloudidentity, filesystem]
*
* Note: The driver name associated with each Enum MUST match one of the above vaules, as it is used to instantiate the
* org.jclouds.ContextBuilder used to create the BlobStore.
*</p>
*/
@Slf4j
public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, BlobStoreBuilder, CredentialBuilder {
AWS_S3("aws-s3", new AWSS3ProviderMetadata()) {
@Override
public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
VALIDATION.validate(config);
}
@Override
public BlobStore getBlobStore(TieredStorageConfiguration config) {
return BLOB_STORE_BUILDER.getBlobStore(config);
}
@Override
public void buildCredentials(TieredStorageConfiguration config) {
AWS_CREDENTIAL_BUILDER.buildCredentials(config);
}
},
GOOGLE_CLOUD_STORAGE("google-cloud-storage", new GoogleCloudStorageProviderMetadata()) {
@Override
public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
VALIDATION.validate(config);
}
@Override
public BlobStore getBlobStore(TieredStorageConfiguration config) {
return BLOB_STORE_BUILDER.getBlobStore(config);
}
@Override
public void buildCredentials(TieredStorageConfiguration config) {
if (config.getCredentials() == null) {
try {
String gcsKeyContent = Files.asCharSource(
new File(config.getConfigProperty(GCS_ACCOUNT_KEY_FILE_FIELD)),
Charset.defaultCharset()).read();
config.setProviderCredentials(() -> new GoogleCredentialsFromJson(gcsKeyContent).get());
} catch (IOException ioe) {
log.error("Cannot read GCS service account credentials file: {}",
config.getConfigProperty("gcsManagedLedgerOffloadServiceAccountKeyFile"));
throw new IllegalArgumentException(ioe);
}
}
}
},
AZURE_BLOB("azureblob", new AzureBlobProviderMetadata()) {
@Override
public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
VALIDATION.validate(config);
}
@Override
public BlobStore getBlobStore(TieredStorageConfiguration config) {
ContextBuilder contextBuilder = ContextBuilder.newBuilder(config.getProviderMetadata());
contextBuilder.overrides(config.getOverrides());
if (config.getProviderCredentials() != null) {
Credentials credentials = config.getProviderCredentials().get();
return contextBuilder
.credentials(credentials.identity, credentials.credential)
.buildView(BlobStoreContext.class)
.getBlobStore();
} else {
log.warn("The credentials is null. driver: {}, bucket: {}", config.getDriver(), config.getBucket());
return contextBuilder
.buildView(BlobStoreContext.class)
.getBlobStore();
}
}
@Override
public void buildCredentials(TieredStorageConfiguration config) {
String accountName = System.getenv("AZURE_STORAGE_ACCOUNT");
if (StringUtils.isEmpty(accountName)) {
throw new IllegalArgumentException("Couldn't get the azure storage account.");
}
String accountKey = System.getenv("AZURE_STORAGE_ACCESS_KEY");
if (StringUtils.isEmpty(accountKey)) {
throw new IllegalArgumentException("Couldn't get the azure storage access key.");
}
config.setProviderCredentials(() -> new Credentials(accountName, accountKey));
}
},
/**
* Aliyun OSS is compatible with the S3 API.
* https://www.alibabacloud.com/help/doc-detail/64919.htm
*/
ALIYUN_OSS("aliyun-oss", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) {
@Override
public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
ALIYUN_OSS_VALIDATION.validate(config);
}
@Override
public BlobStore getBlobStore(TieredStorageConfiguration config) {
return ALIYUN_OSS_BLOB_STORE_BUILDER.getBlobStore(config);
}
@Override
public void buildCredentials(TieredStorageConfiguration config) {
ALIYUN_OSS_CREDENTIAL_BUILDER.buildCredentials(config);
}
},
TRANSIENT("transient", new AnonymousProviderMetadata(new TransientApiMetadata(), "")) {
@Override
public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
if (Strings.isNullOrEmpty(config.getBucket())) {
throw new IllegalArgumentException(
"Bucket cannot be empty for Local offload");
}
}
@Override
public BlobStore getBlobStore(TieredStorageConfiguration config) {
ContextBuilder builder = ContextBuilder.newBuilder("transient");
BlobStoreContext ctx = builder
.buildView(BlobStoreContext.class);
BlobStore bs = ctx.getBlobStore();
if (!bs.containerExists(config.getBucket())) {
Location loc = new LocationBuilder()
.scope(LocationScope.HOST)
.id(UUID.randomUUID() + "")
.description("Transient " + config.getBucket())
.build();
bs.createContainerInLocation(loc, config.getBucket());
}
System.out.println("Returning " + bs);
return bs;
}
@Override
public void buildCredentials(TieredStorageConfiguration config) {
// No-op
}
};
public static JCloudBlobStoreProvider getProvider(String driver) {
if (StringUtils.isEmpty(driver)) {
return null;
}
for (JCloudBlobStoreProvider provider : JCloudBlobStoreProvider.values()) {
if (provider.driver.equalsIgnoreCase(driver)) {
return provider;
}
}
return null;
}
public static boolean driverSupported(String driverName) {
for (JCloudBlobStoreProvider provider: JCloudBlobStoreProvider.values()) {
if (provider.getDriver().equalsIgnoreCase(driverName)) {
return true;
}
}
return false;
}
private String driver;
private ProviderMetadata providerMetadata;
JCloudBlobStoreProvider(String s, ProviderMetadata providerMetadata) {
this.driver = s;
this.providerMetadata = providerMetadata;
}
public String getDriver() {
return driver;
}
public ProviderMetadata getProviderMetadata() {
return providerMetadata;
}
// Constants for reuse across AWS, GCS, and Azure, etc.
static final ConfigValidation VALIDATION = (TieredStorageConfiguration config) -> {
if (Strings.isNullOrEmpty(config.getRegion()) && Strings.isNullOrEmpty(config.getServiceEndpoint())) {
throw new IllegalArgumentException(
"Either Region or ServiceEndpoint must specified for " + config.getDriver() + " offload");
}
if (Strings.isNullOrEmpty(config.getBucket())) {
throw new IllegalArgumentException(
"Bucket cannot be empty for " + config.getDriver() + " offload");
}
if (config.getMaxBlockSizeInBytes() < (5 * 1024 * 1024)) {
throw new IllegalArgumentException(
"ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB for "
+ config.getDriver() + " offload");
}
};
static final BlobStoreBuilder BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> {
ContextBuilder contextBuilder = ContextBuilder.newBuilder(config.getProviderMetadata());
contextBuilder.overrides(config.getOverrides());
if (StringUtils.isNotEmpty(config.getServiceEndpoint())) {
contextBuilder.endpoint(config.getServiceEndpoint());
}
if (config.getProviderCredentials() != null) {
return contextBuilder
.credentialsSupplier(config.getCredentials()::get)
.buildView(BlobStoreContext.class)
.getBlobStore();
} else {
log.warn("The credentials is null. driver: {}, bucket: {}", config.getDriver(), config.getBucket());
return contextBuilder
.buildView(BlobStoreContext.class)
.getBlobStore();
}
};
static final CredentialBuilder AWS_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
if (config.getCredentials() == null) {
final AWSCredentialsProvider authChain;
try {
if (!Strings.isNullOrEmpty(config.getConfigProperty(S3_ID_FIELD))
&& !Strings.isNullOrEmpty(config.getConfigProperty(S3_SECRET_FIELD))) {
AWSCredentials awsCredentials = new AWSCredentials() {
@Override
public String getAWSAccessKeyId() {
return config.getConfigProperty(S3_ID_FIELD);
}
@Override
public String getAWSSecretKey() {
return config.getConfigProperty(S3_SECRET_FIELD);
}
};
authChain = new AWSStaticCredentialsProvider(
new BasicAWSCredentials(
config.getConfigProperty(S3_ID_FIELD),
config.getConfigProperty(S3_SECRET_FIELD)));
} else if (Strings.isNullOrEmpty(config.getConfigProperty(S3_ROLE_FIELD))) {
authChain = DefaultAWSCredentialsProviderChain.getInstance();
} else {
authChain =
new STSAssumeRoleSessionCredentialsProvider.Builder(
config.getConfigProperty(S3_ROLE_FIELD),
config.getConfigProperty(S3_ROLE_SESSION_NAME_FIELD)
).build();
}
// Important! Delay the building of actual credentials
// until later to support tokens that may be refreshed
// such as all session tokens
config.setProviderCredentials(() -> {
AWSCredentials newCreds = authChain.getCredentials();
Credentials jcloudCred = null;
if (newCreds instanceof AWSSessionCredentials) {
// if we have session credentials, we need to send the session token
// this allows us to support EC2 metadata credentials
jcloudCred = SessionCredentials.builder()
.accessKeyId(newCreds.getAWSAccessKeyId())
.secretAccessKey(newCreds.getAWSSecretKey())
.sessionToken(((AWSSessionCredentials) newCreds).getSessionToken())
.build();
} else {
// in the event we hit this branch, we likely don't have expiring
// credentials, however, this still allows for the user to update
// profiles creds or some other mechanism
jcloudCred = new Credentials(
newCreds.getAWSAccessKeyId(), newCreds.getAWSSecretKey());
}
return jcloudCred;
});
} catch (Exception e) {
// allowed, some mock s3 service do not need credential
log.warn("Exception when get credentials for s3 ", e);
}
}
};
static final BlobStoreBuilder ALIYUN_OSS_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> {
ContextBuilder contextBuilder = ContextBuilder.newBuilder(config.getProviderMetadata());
Properties overrides = config.getOverrides();
// For security reasons, OSS supports only virtual hosted style access.
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true");
contextBuilder.overrides(overrides);
contextBuilder.endpoint(config.getServiceEndpoint());
if (config.getProviderCredentials() != null) {
return contextBuilder
.credentialsSupplier(config.getCredentials()::get)
.buildView(BlobStoreContext.class)
.getBlobStore();
} else {
log.warn("The credentials is null. driver: {}, bucket: {}", config.getDriver(), config.getBucket());
return contextBuilder
.buildView(BlobStoreContext.class)
.getBlobStore();
}
};
static final ConfigValidation ALIYUN_OSS_VALIDATION = (TieredStorageConfiguration config) -> {
if (Strings.isNullOrEmpty(config.getServiceEndpoint())) {
throw new IllegalArgumentException(
"ServiceEndpoint must specified for " + config.getDriver() + " offload");
}
if (Strings.isNullOrEmpty(config.getBucket())) {
throw new IllegalArgumentException(
"Bucket cannot be empty for " + config.getDriver() + " offload");
}
if (config.getMaxBlockSizeInBytes() < (5 * 1024 * 1024)) {
throw new IllegalArgumentException(
"ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB for "
+ config.getDriver() + " offload");
}
};
static final CredentialBuilder ALIYUN_OSS_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
String accountName = System.getenv("ALIYUN_OSS_ACCESS_KEY_ID");
if (StringUtils.isEmpty(accountName)) {
throw new IllegalArgumentException("Couldn't get the aliyun oss access key id.");
}
String accountKey = System.getenv("ALIYUN_OSS_ACCESS_KEY_SECRET");
if (StringUtils.isEmpty(accountKey)) {
throw new IllegalArgumentException("Couldn't get the aliyun oss access key secret.");
}
Credentials credentials = new Credentials(
accountName, accountKey);
config.setProviderCredentials(() -> credentials);
};
}