blob: 0579063060da9e82d4f32cd6e84d4f8b7e0c844b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.epam.dlab.module.aws;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.epam.dlab.core.AdapterBase;
import com.epam.dlab.exceptions.AdapterException;
import com.epam.dlab.model.aws.ReportLine;
import com.epam.dlab.module.ModuleName;
import com.fasterxml.jackson.annotation.JsonClassDescription;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.MoreObjects.ToStringHelper;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.validation.constraints.NotNull;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Date;
import java.util.List;
/**
* The adapter for S3 file system of Amazon.
*/
@JsonTypeName(ModuleName.ADAPTER_S3_FILE)
@JsonClassDescription(
"Amazon S3 file system adapter.\n" +
"Read source or write converted data to the file in Amazon S3 bucket.\n" +
" - type: " + ModuleName.ADAPTER_S3_FILE + "\n" +
" [writeHeader: <true | false>] - write header of data to the adapterOut.\n" +
" bucket: <bucketname> - the name of S3 bucket.\n" +
" path: <path> - the path to the report or empty if used the root folder.\n" +
" accountId: <AWS account number> - the account number, see for details\n" +
" \"Detailed billing report with resources and tags\"\n" +
" http://docs.aws.amazon" +
".com/awsaccountbilling/latest/aboutv2/billing-reports.html#detailed-report-with-resources-tags\n" +
" [accessKeyId: <string>] - Amazon access key ID.\n" +
" [secretAccessKey: <string>] - Amazon secret access key."
)
public class AdapterS3File extends AdapterBase {
private static final Logger LOGGER = LoggerFactory.getLogger(AdapterS3File.class);
/**
* Name of key for the last loaded file.
*/
public static final String DATA_KEY_LAST_LOADED_FILE = "AdapterS3File_lastLoadedFile";
/**
* Name of key for the modification date of loaded file.
*/
public static final String DATA_KEY_LAST_MODIFICATION_DATE = "AdapterS3File_lastModifyDate";
private static final String CANNOT_READ_FILE_FORMAT = "Cannot read file %s. %s";
private static final String DELIMITER = "/";
/**
* The name of bucket.
*/
@NotNull
@JsonProperty
private String bucket;
/**
* The path to report.
*/
@JsonProperty
private String path;
/**
* AWS account number.
*/
@NotNull
@JsonProperty
private String accountId;
/**
* Access key ID for Amazon Web Services.
*/
@JsonProperty
private String accessKeyId;
/**
* Secret key for Amazon Web Services.
*/
@JsonProperty
private String secretAccessKey;
@JsonProperty
private boolean awsJobEnabled;
/**
* Return the name of bucket.
*/
public String getBucket() {
return bucket;
}
/**
* Set the name of bucket.
*/
public void setBucket(String bucket) {
this.bucket = bucket;
}
/**
* Return the path to report.
*/
public String getPath() {
return path;
}
/**
* Set the path to report.
*/
public void setPath(String path) {
this.path = path;
}
/**
* Return the AWS account number.
*/
public String getAccountId() {
return accountId;
}
/**
* Set the AWS account number.
*/
public void setAccountId(String accountId) {
this.accountId = accountId;
}
/**
* Return the access key ID for Amazon Web Services.
*/
public String getAccessKeyId() {
return this.accessKeyId;
}
/**
* Set the access key ID for Amazon Web Services.
*/
public void setAccessKeyId(String accessKeyId) {
this.accessKeyId = accessKeyId;
}
/**
* Return the secret key for Amazon Web Services.
*/
public String getSecretAccessKey() {
return this.secretAccessKey;
}
/**
* Set the secret key for Amazon Web Services.
*/
public void setSecretAccessKey(String secretAccessKey) {
this.secretAccessKey = secretAccessKey;
}
/**
* List of report files for loading.
*/
@JsonIgnore
private List<String> filelist = null;
/**
* Index of current report file.
*/
@JsonIgnore
private int currentFileIndex = -1;
/**
* Index of current report file.
*/
@JsonIgnore
private String entryName = null;
/**
* Amazon S3 client.
*/
@JsonIgnore
private AmazonS3 clientS3 = null;
/**
* Amazon S3 client.
*/
@JsonIgnore
private Date lastModificationDate = null;
/**
* File input stream.
*/
@JsonIgnore
private InputStream fileInputStream = null;
/**
* Reader for adapter.
*/
@JsonIgnore
private BufferedReader reader = null;
@Override
public void open() throws AdapterException {
LOGGER.debug("Adapter S3 will be opened for {}", getMode());
if (getMode() == Mode.READ) {
setLastModificationDate();
clientS3 = getAmazonClient();
S3FileList s3files = new S3FileList(awsJobEnabled, bucket, getModuleData());
filelist = s3files.getFiles(clientS3);
currentFileIndex = (filelist.isEmpty() ? -1 : 0);
fileInputStream = null;
reader = null;
entryName = null;
openNextEntry();
LOGGER.debug("Adapter S3 has been opened");
} else if (getMode() == Mode.WRITE) {
throw new AdapterException("Unsupported mode " + Mode.WRITE + ".");
} else {
throw new AdapterException("Mode of adapter unknown or not defined. Set mode to " + Mode.READ + ".");
}
}
@Override
public boolean hasMultyEntry() {
return true;
}
@Override
public boolean openNextEntry() throws AdapterException {
String filename = getCurrentFileName();
if (filename == null) {
if (filelist.isEmpty()) {
final String reportPath = path == null ? bucket : bucket + DELIMITER + path;
LOGGER.debug("New report files in bucket folder {} not found", reportPath);
}
return false;
}
entryName = filename;
LOGGER.debug("Open a next entry in file {}", filename);
reader = new BufferedReader(new InputStreamReader(getFileStream()));
try {
getModuleData().setId(filename);
getModuleData().setModificationDate(lastModificationDate);
getModuleData().set(DATA_KEY_LAST_LOADED_FILE, filename);
getModuleData().set(DATA_KEY_LAST_MODIFICATION_DATE, lastModificationDate);
getModuleData().store();
} catch (Exception e) {
throw new AdapterException(e.getLocalizedMessage(), e);
}
currentFileIndex++;
return false;
}
@Override
public boolean hasEntryData() {
return (reader != null);
}
@Override
public void close() throws AdapterException {
closeFile(getCurrentFileName());
}
@Override
public String getEntryName() {
return entryName;
}
@Override
public String readLine() throws AdapterException {
try {
return reader.readLine();
} catch (IOException e) {
throw new AdapterException(String.format(CANNOT_READ_FILE_FORMAT, getCurrentFileName(), e
.getLocalizedMessage()), e);
}
}
@Override
public void writeHeader(List<String> header) throws AdapterException {
throw new AdapterException("Unimplemented method.");
}
@Override
public Document writeRow(ReportLine row) throws AdapterException {
throw new AdapterException("Unimplemented method.");
}
/**
* Return the current file name.
*/
public String getCurrentFileName() {
return (filelist == null || currentFileIndex < 0 || currentFileIndex >= filelist.size() ? null : filelist.get
(currentFileIndex));
}
/**
* Creates and returns the Amazon client, as well as checks bucket existence.
*
* @throws AdapterException
*/
private AmazonS3 getAmazonClient() throws AdapterException {
AmazonS3 s3 = (accessKeyId == null ?
new AmazonS3Client() :
new AmazonS3Client(new BasicAWSCredentials(accessKeyId, secretAccessKey)));
if (!s3.doesBucketExist(bucket)) {
throw new AdapterException("Bucket \"" + bucket + "\" does not exist.");
}
return s3;
}
/**
* Open the source file and return reader.
*
* @throws AdapterException
*/
private InputStream getFileStream() throws AdapterException {
try {
GetObjectRequest request = new GetObjectRequest(bucket, getCurrentFileName());
S3Object object = clientS3.getObject(request);
lastModificationDate = object.getObjectMetadata().getLastModified();
return object.getObjectContent();
} catch (Exception e) {
throw new AdapterException("Cannot open file " + bucket + DELIMITER + getCurrentFileName() + ". " + e
.getLocalizedMessage(), e);
}
}
/**
* Return the modification date of loaded file.
*
* @throws AdapterException
*/
private void setLastModificationDate() throws AdapterException {
try {
lastModificationDate = getModuleData().getDate(DATA_KEY_LAST_MODIFICATION_DATE);
} catch (Exception e) {
throw new AdapterException("Cannot get the last modification date for report. " + e.getLocalizedMessage(),
e);
}
}
/**
* Close a zip file.
*
* @param filename file name.
* @throws AdapterException
*/
private void closeFile(String filename) throws AdapterException {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException e) {
throw new AdapterException("Cannot close file " + filename + ". " + e.getLocalizedMessage(), e);
}
fileInputStream = null;
}
}
@Override
public ToStringHelper toStringHelper(Object self) {
return super.toStringHelper(self)
.add("bucket", bucket)
.add("path", path)
.add("accountId", accountId)
.add("accessKeyId", "***")
.add("secretAccessKey", "***");
}
}