blob: ee6af3110306349bdece36a1bb06b546b7ca4732 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.storage.s3;
import java.io.IOException;
import java.net.URI;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.*;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.storage.FileTablespace;
import net.minidev.json.JSONObject;
import static org.apache.tajo.storage.s3.TajoS3Constants.*;
/**
 * A {@link FileTablespace} backed by Amazon S3.
 *
 * <p>During {@link #init(TajoConf)} an {@link AmazonS3} client is built from the tablespace
 * configuration. When the AWS SDK classes are available, {@link #calculateSize(Path)} lists the
 * objects under a directory through that client; otherwise it falls back to the content summary
 * reported by the underlying file system.
 */
public class S3TableSpace extends FileTablespace {
  private static final Log LOG = LogFactory.getLog(S3TableSpace.class);

  /** S3 client created in {@link #init(TajoConf)} or injected via {@link #setAmazonS3Client(AmazonS3)}. */
  private AmazonS3 s3;
  /** True once the S3 client has been set up successfully; false when the AWS classes are missing. */
  private boolean s3Enabled;
  /** Page size requested from S3 for each object listing call. */
  private int maxKeys;

  public S3TableSpace(String spaceName, URI uri, JSONObject config) {
    super(spaceName, uri, config);
  }

  /**
   * Initializes the tablespace and builds the S3 client from the configuration.
   *
   * <p>Credentials given in the user-info part of the tablespace URI ({@code access:secret@})
   * override the access/secret keys read from the configuration. If the AWS SDK classes cannot be
   * loaded, the direct S3 code path is disabled instead of failing.
   *
   * @param tajoConf the Tajo configuration passed on to the parent tablespace
   * @throws IOException if the parent initialization fails
   * @throws TajoInternalError if any other exception occurs while setting up the S3 client
   */
  @Override
  public void init(TajoConf tajoConf) throws IOException {
    super.init(tajoConf);

    try {
      // Try to get our credentials or just connect anonymously
      String accessKey = conf.get(ACCESS_KEY, null);
      String secretKey = conf.get(SECRET_KEY, null);

      // Credentials embedded in the URI take precedence over the configured ones.
      String userInfo = uri.getUserInfo();
      if (userInfo != null) {
        int index = userInfo.indexOf(':');
        if (index != -1) {
          accessKey = userInfo.substring(0, index);
          secretKey = userInfo.substring(index + 1);
        } else {
          // Only an access key was supplied; the secret key keeps its configured value.
          accessKey = userInfo;
        }
      }

      // Providers are consulted in this order: explicit keys, instance profile, anonymous.
      AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain(
        new BasicAWSCredentialsProvider(accessKey, secretKey),
        new InstanceProfileCredentialsProvider(),
        new AnonymousAWSCredentialsProvider()
      );

      ClientConfiguration awsConf = new ClientConfiguration();
      awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS, DEFAULT_MAXIMUM_CONNECTIONS));
      boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS);
      awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
      awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES, DEFAULT_MAX_ERROR_RETRIES));
      awsConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT, DEFAULT_ESTABLISH_TIMEOUT));
      awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT));

      String proxyHost = conf.getTrimmed(PROXY_HOST,"");
      int proxyPort = conf.getInt(PROXY_PORT, -1);
      if (!proxyHost.isEmpty()) {
        awsConf.setProxyHost(proxyHost);
        if (proxyPort >= 0) {
          awsConf.setProxyPort(proxyPort);
        } else {
          // No explicit port: fall back to the default port of the chosen protocol.
          if (secureConnections) {
            LOG.warn("Proxy host set without port. Using HTTPS default 443");
            awsConf.setProxyPort(443);
          } else {
            LOG.warn("Proxy host set without port. Using HTTP default 80");
            awsConf.setProxyPort(80);
          }
        }
        String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
        String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
        // A username without a password (or vice versa) is reported but does not abort the setup.
        if ((proxyUsername == null) != (proxyPassword == null)) {
          String msg = "Proxy error: " + PROXY_USERNAME + " or " + PROXY_PASSWORD + " set without the other.";
          LOG.error(msg);
        }
        awsConf.setProxyUsername(proxyUsername);
        awsConf.setProxyPassword(proxyPassword);
        awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
        awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
        if (LOG.isDebugEnabled()) {
          // Never write the proxy password to the log; only report whether one is configured.
          String maskedPassword = awsConf.getProxyPassword() == null ? "<not set>" : "<redacted>";
          LOG.debug(String.format("Using proxy server %s:%d as user %s with password %s on domain %s as workstation " +
              "%s", awsConf.getProxyHost(), awsConf.getProxyPort(), awsConf.getProxyUsername(),
            maskedPassword, awsConf.getProxyDomain(), awsConf.getProxyWorkstation()));
        }
      } else if (proxyPort >= 0) {
        String msg = "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
        LOG.error(msg);
      }

      s3 = new AmazonS3Client(credentials, awsConf);
      String endPoint = conf.getTrimmed(ENDPOINT,"");
      if (!endPoint.isEmpty()) {
        try {
          s3.setEndpoint(endPoint);
        } catch (IllegalArgumentException e) {
          // A rejected endpoint is logged only; the client keeps whatever endpoint it already had.
          String msg = "Incorrect endpoint: " + e.getMessage();
          LOG.error(msg);
        }
      }

      maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS);
      s3Enabled = true;
    } catch (NoClassDefFoundError e) {
      // If the version of hadoop is less than 2.6.0, hadoop doesn't include aws dependencies because it doesn't provide
      // S3AFileSystem. In this case, tajo never uses aws s3 api directly.
      LOG.warn(e);
      s3Enabled = false;
    } catch (Exception e) {
      throw new TajoInternalError(e);
    }
  }

  /**
   * Calculates the total size, in bytes, of the data stored under the given path.
   *
   * <p>When the S3 client is enabled, a plain file reports its own length and a directory is
   * summed by listing every object whose key starts with the directory prefix, one page of
   * {@code maxKeys} entries at a time. When the S3 client is disabled, the size comes from the
   * content summary of the underlying file system.
   *
   * @param path file or directory to measure
   * @return calculated size in bytes
   * @throws IOException if the underlying file system fails to resolve the path
   */
  @Override
  public long calculateSize(Path path) throws IOException {
    if (!s3Enabled) {
      return fs.getContentSummary(path).getLength();
    }

    String key = pathToKey(path);
    final FileStatus fileStatus = fs.getFileStatus(path);
    if (!fileStatus.isDirectory()) {
      return fileStatus.getLen();
    }

    // A non-root directory is listed by its key followed by a slash; the root uses the empty prefix.
    if (!key.isEmpty()) {
      key = key + "/";
    }

    ListObjectsRequest request = new ListObjectsRequest();
    request.setBucketName(uri.getHost());
    request.setPrefix(key);
    request.setMaxKeys(maxKeys);

    if (LOG.isDebugEnabled()) {
      LOG.debug("listStatus: doing listObjects for directory " + key);
    }

    long totalBucketSize = 0L;
    ObjectListing objects = s3.listObjects(request);
    boolean morePages;
    do {
      for (S3ObjectSummary summary : objects.getObjectSummaries()) {
        Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, fs.getWorkingDirectory());

        // Skip over keys that are ourselves and old S3N _$folder$ files
        if (keyPath.equals(path) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Ignoring: " + keyPath);
          }
          continue;
        }

        // Zero-length keys ending in a slash are directory markers and carry no data.
        if (!objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
          totalBucketSize += summary.getSize();
        }
      }

      morePages = objects.isTruncated();
      if (morePages) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("listStatus: list truncated - getting next batch");
        }
        objects = s3.listNextBatchOfObjects(objects);
      }
    } while (morePages);

    return totalBucketSize;
  }

  /**
   * Tells whether an object is a directory marker: a non-empty key that ends with a slash and has
   * a size of zero.
   */
  private boolean objectRepresentsDirectory(final String name, final long size) {
    return !name.isEmpty() && name.charAt(name.length() - 1) == '/' && size == 0L;
  }

  /** Turns an S3 key into an absolute path by prefixing it with a slash. */
  private Path keyToPath(String key) {
    return new Path("/" + key);
  }

  /**
   * Turns a path (relative or otherwise) into an S3 key.
   *
   * <p>Relative paths are resolved against the working directory of the file system. A path that
   * has a scheme but an empty path component maps to the empty key; otherwise the key is the
   * path component without its leading slash.
   */
  private String pathToKey(Path path) {
    if (!path.isAbsolute()) {
      path = new Path(fs.getWorkingDirectory(), path);
    }

    if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
      return "";
    }

    return path.toUri().getPath().substring(1);
  }

  /**
   * Replaces the S3 client used by this tablespace. Intended for tests; it does not change
   * whether the direct S3 code path is enabled.
   */
  @VisibleForTesting
  public void setAmazonS3Client(AmazonS3 s3) {
    this.s3 = s3;
  }
}