blob: 015470dc4fad0e68a5a5249e5641145dfeb4eada [file] [log] [blame]
package org.apache.airavata.mft.transport.s3;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.BasicSessionCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.util.IOUtils;
import org.apache.airavata.mft.core.api.ConnectorConfig;
import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
import org.apache.airavata.mft.resource.client.ResourceServiceClient;
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
import org.apache.airavata.mft.resource.stubs.common.GenericResource;
import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
public class S3IncomingConnector implements IncomingChunkedConnector, IncomingStreamingConnector {
private static final Logger logger = LoggerFactory.getLogger(S3IncomingConnector.class);
private GenericResource resource;
private AmazonS3 s3Client;
@Override
public void init(ConnectorConfig cc) throws Exception {
try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
.buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
.setAuthzToken(cc.getAuthToken())
.setResourceId(cc.getResourceId()).build());
}
if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
}
S3Storage s3Storage = resource.getS3Storage();
S3Secret s3Secret;
try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
cc.getSecretServiceHost(), cc.getSecretServicePort())) {
s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder()
.setAuthzToken(cc.getAuthToken())
.setSecretId(cc.getCredentialToken()).build());
AWSCredentials awsCreds;
if (s3Secret.getSessionToken() == null || s3Secret.getSessionToken().equals("")) {
awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
} else {
awsCreds = new BasicSessionCredentials(s3Secret.getAccessKey(),
s3Secret.getSecretKey(),
s3Secret.getSessionToken());
}
s3Client = AmazonS3ClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
s3Storage.getEndpoint(), s3Storage.getRegion()))
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.build();
}
}
@Override
public InputStream fetchInputStream() throws Exception {
S3Object s3object = s3Client.getObject(resource.getS3Storage().getBucketName(), resource.getFile().getResourcePath());
return s3object.getObjectContent();
}
@Override
public InputStream fetchInputStream(String childPath) throws Exception {
S3Object s3object = s3Client.getObject(resource.getS3Storage().getBucketName(), childPath);
return s3object.getObjectContent();
}
@Override
public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception {
GetObjectRequest rangeObjectRequest = new GetObjectRequest(resource.getS3Storage().getBucketName(),
resource.getFile().getResourcePath());
rangeObjectRequest.setRange(startByte, endByte - 1);
ObjectMetadata objectMetadata = s3Client.getObject(rangeObjectRequest, new File(downloadFile));
logger.debug("Downloaded S3 chunk to path {} for resource id {}", downloadFile, resource.getResourceId());
}
@Override
public InputStream downloadChunk(int chunkId, long startByte, long endByte) throws Exception {
GetObjectRequest rangeObjectRequest = new GetObjectRequest(resource.getS3Storage().getBucketName(),
resource.getFile().getResourcePath());
rangeObjectRequest.setRange(startByte, endByte - 1);
logger.debug("Fetching input stream for chunk {} in resource {}", chunkId, resource.getResourceId());
S3Object object = s3Client.getObject(rangeObjectRequest);
return object.getObjectContent();
}
@Override
public void complete() throws Exception {
}
@Override
public void failed() throws Exception {
}
}