blob: e01c2c3eec55f7c1d040696e10bd80737c482744 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.rest;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.*;
import org.apache.hadoop.hdds.client.OzoneQuota;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.client.rest.headers.Header;
import org.apache.hadoop.ozone.client.rest.response.BucketInfo;
import org.apache.hadoop.ozone.client.rest.response.KeyInfoDetails;
import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
import org.apache.hadoop.ozone.client.rpc.OzoneKMSUtil;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.ozone.security.acl.OzoneObj;
import org.apache.hadoop.ozone.web.response.ListBuckets;
import org.apache.hadoop.ozone.web.response.ListKeys;
import org.apache.hadoop.ozone.web.response.ListVolumes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Time;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.net.HttpURLConnection.HTTP_CREATED;
import static java.net.HttpURLConnection.HTTP_OK;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_SERVICE_LIST_HTTP_ENDPOINT;
/**
* Ozone Client REST protocol implementation. It uses REST protocol to
* connect to Ozone Handler that executes client calls.
*/
public class RestClient implements ClientProtocol {
private static final String PATH_SEPARATOR = "/";
private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);
private final Configuration conf;
private final URI ozoneRestUri;
private final CloseableHttpClient httpClient;
private final UserGroupInformation ugi;
// private final OzoneAcl.OzoneACLRights userRights;
/**
* Creates RestClient instance with the given configuration.
* @param conf Configuration
* @throws IOException
*/
public RestClient(Configuration conf)
throws IOException {
try {
Preconditions.checkNotNull(conf);
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
long socketTimeout = conf.getTimeDuration(
OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT,
OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
long connectionTimeout = conf.getTimeDuration(
OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT,
OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
int maxConnection = conf.getInt(
OzoneConfigKeys.OZONE_REST_CLIENT_HTTP_CONNECTION_MAX,
OzoneConfigKeys.OZONE_REST_CLIENT_HTTP_CONNECTION_DEFAULT);
int maxConnectionPerRoute = conf.getInt(
OzoneConfigKeys.OZONE_REST_CLIENT_HTTP_CONNECTION_PER_ROUTE_MAX,
OzoneConfigKeys
.OZONE_REST_CLIENT_HTTP_CONNECTION_PER_ROUTE_MAX_DEFAULT
);
/*
To make RestClient Thread safe, creating the HttpClient with
ThreadSafeClientConnManager.
*/
PoolingHttpClientConnectionManager connManager =
new PoolingHttpClientConnectionManager();
connManager.setMaxTotal(maxConnection);
connManager.setDefaultMaxPerRoute(maxConnectionPerRoute);
this.httpClient = HttpClients.custom()
.setConnectionManager(connManager)
.setDefaultRequestConfig(
RequestConfig.custom()
.setSocketTimeout(Math.toIntExact(socketTimeout))
.setConnectTimeout(Math.toIntExact(connectionTimeout))
.build())
.build();
// this.userRights = conf.getEnum(OMConfigKeys.OZONE_OM_USER_RIGHTS,
// OMConfigKeys.OZONE_OM_USER_RIGHTS_DEFAULT);
// TODO: Add new configuration parameter to configure RestServerSelector.
RestServerSelector defaultSelector = new DefaultRestServerSelector();
InetSocketAddress restServer = getOzoneRestServerAddress(defaultSelector);
URIBuilder uriBuilder = new URIBuilder()
.setScheme("http")
.setHost(restServer.getHostName())
.setPort(restServer.getPort());
this.ozoneRestUri = uriBuilder.build();
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
private InetSocketAddress getOzoneRestServerAddress(
RestServerSelector selector) throws IOException {
String httpAddress = conf.get(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY);
if (httpAddress == null) {
throw new IllegalArgumentException(
OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY + " must be defined. See" +
" https://wiki.apache.org/hadoop/Ozone#Configuration for" +
" details on configuring Ozone.");
}
HttpGet httpGet = new HttpGet("http://" + httpAddress +
OZONE_OM_SERVICE_LIST_HTTP_ENDPOINT);
HttpEntity entity = executeHttpRequest(httpGet);
try {
String serviceListJson = EntityUtils.toString(entity);
ObjectMapper objectMapper = new ObjectMapper();
TypeReference<List<ServiceInfo>> serviceInfoReference =
new TypeReference<List<ServiceInfo>>() {
};
List<ServiceInfo> services = objectMapper.readValue(
serviceListJson, serviceInfoReference);
List<ServiceInfo> dataNodeInfos = services.stream().filter(
a -> a.getNodeType().equals(HddsProtos.NodeType.DATANODE))
.collect(Collectors.toList());
ServiceInfo restServer = selector.getRestServer(dataNodeInfos);
return NetUtils.createSocketAddr(
NetUtils.normalizeHostName(restServer.getHostname()) + ":"
+ restServer.getPort(ServicePort.Type.HTTP));
} finally {
EntityUtils.consume(entity);
}
}
@Override
public void createVolume(String volumeName) throws IOException {
createVolume(volumeName, VolumeArgs.newBuilder().build());
}
@Override
public void createVolume(String volumeName, VolumeArgs volArgs)
throws IOException {
try {
HddsClientUtils.verifyResourceName(volumeName);
Preconditions.checkNotNull(volArgs);
URIBuilder builder = new URIBuilder(ozoneRestUri);
String owner = volArgs.getOwner() == null ?
ugi.getUserName() : volArgs.getOwner();
//TODO: support for ACLs has to be done in OzoneHandler (rest server)
/**
List<OzoneAcl> listOfAcls = new ArrayList<>();
//User ACL
listOfAcls.add(new OzoneAcl(OzoneAcl.OzoneACLType.USER,
owner, userRights));
//ACLs from VolumeArgs
if(volArgs.getAcls() != null) {
listOfAcls.addAll(volArgs.getAcls());
}
*/
builder.setPath(PATH_SEPARATOR + volumeName);
String quota = volArgs.getQuota();
if(quota != null) {
builder.setParameter(Header.OZONE_QUOTA_QUERY_TAG, quota);
}
HttpPost httpPost = new HttpPost(builder.build());
addOzoneHeaders(httpPost);
//use admin from VolumeArgs, if it's present
if(volArgs.getAdmin() != null) {
httpPost.removeHeaders(HttpHeaders.AUTHORIZATION);
httpPost.addHeader(HttpHeaders.AUTHORIZATION,
Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
volArgs.getAdmin());
}
httpPost.addHeader(Header.OZONE_USER, owner);
LOG.info("Creating Volume: {}, with {} as owner and quota set to {}.",
volumeName, owner, quota == null ? "default" : quota);
EntityUtils.consume(executeHttpRequest(httpPost));
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
@Override
public void setVolumeOwner(String volumeName, String owner)
throws IOException {
try {
HddsClientUtils.verifyResourceName(volumeName);
Preconditions.checkNotNull(owner);
URIBuilder builder = new URIBuilder(ozoneRestUri);
builder.setPath(PATH_SEPARATOR + volumeName);
HttpPut httpPut = new HttpPut(builder.build());
addOzoneHeaders(httpPut);
httpPut.addHeader(Header.OZONE_USER, owner);
EntityUtils.consume(executeHttpRequest(httpPut));
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
@Override
public void setVolumeQuota(String volumeName, OzoneQuota quota)
throws IOException {
try {
HddsClientUtils.verifyResourceName(volumeName);
Preconditions.checkNotNull(quota);
String quotaString = quota.toString();
URIBuilder builder = new URIBuilder(ozoneRestUri);
builder.setPath(PATH_SEPARATOR + volumeName);
builder.setParameter(Header.OZONE_QUOTA_QUERY_TAG, quotaString);
HttpPut httpPut = new HttpPut(builder.build());
addOzoneHeaders(httpPut);
EntityUtils.consume(executeHttpRequest(httpPut));
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
@Override
public OzoneVolume getVolumeDetails(String volumeName)
throws IOException {
try {
HddsClientUtils.verifyResourceName(volumeName);
URIBuilder builder = new URIBuilder(ozoneRestUri);
builder.setPath(PATH_SEPARATOR + volumeName);
builder.setParameter(Header.OZONE_INFO_QUERY_TAG,
Header.OZONE_INFO_QUERY_VOLUME);
HttpGet httpGet = new HttpGet(builder.build());
addOzoneHeaders(httpGet);
HttpEntity response = executeHttpRequest(httpGet);
VolumeInfo volInfo =
VolumeInfo.parse(EntityUtils.toString(response));
//TODO: OzoneHandler in datanode has to be modified to send ACLs
OzoneVolume volume = new OzoneVolume(conf,
this,
volInfo.getVolumeName(),
volInfo.getCreatedBy(),
volInfo.getOwner().getName(),
volInfo.getQuota().sizeInBytes(),
HddsClientUtils.formatDateTime(volInfo.getCreatedOn()),
null);
EntityUtils.consume(response);
return volume;
} catch (URISyntaxException | ParseException e) {
throw new IOException(e);
}
}
@Override
public boolean checkVolumeAccess(String volumeName, OzoneAcl acl)
throws IOException {
throw new UnsupportedOperationException("Not yet implemented.");
}
@Override
public void deleteVolume(String volumeName) throws IOException {
try {
HddsClientUtils.verifyResourceName(volumeName);
URIBuilder builder = new URIBuilder(ozoneRestUri);
builder.setPath(PATH_SEPARATOR + volumeName);
HttpDelete httpDelete = new HttpDelete(builder.build());
addOzoneHeaders(httpDelete);
EntityUtils.consume(executeHttpRequest(httpDelete));
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
@Override
public List<OzoneVolume> listVolumes(String volumePrefix, String prevKey,
int maxListResult)
throws IOException {
return listVolumes(null, volumePrefix, prevKey, maxListResult);
}
@Override
public List<OzoneVolume> listVolumes(String user, String volumePrefix,
String prevKey, int maxListResult)
throws IOException {
try {
URIBuilder builder = new URIBuilder(ozoneRestUri);
builder.setPath(PATH_SEPARATOR);
builder.addParameter(Header.OZONE_INFO_QUERY_TAG,
Header.OZONE_LIST_QUERY_SERVICE);
builder.addParameter(Header.OZONE_LIST_QUERY_MAXKEYS,
String.valueOf(maxListResult));
addQueryParamter(Header.OZONE_LIST_QUERY_PREFIX, volumePrefix, builder);
addQueryParamter(Header.OZONE_LIST_QUERY_PREVKEY, prevKey, builder);
HttpGet httpGet = new HttpGet(builder.build());
if (!Strings.isNullOrEmpty(user)) {
httpGet.addHeader(Header.OZONE_USER, user);
}
addOzoneHeaders(httpGet);
HttpEntity response = executeHttpRequest(httpGet);
ListVolumes volumeList =
ListVolumes.parse(EntityUtils.toString(response));
EntityUtils.consume(response);
return volumeList.getVolumes().stream().map(volInfo -> {
long creationTime = 0;
try {
creationTime = HddsClientUtils.formatDateTime(volInfo.getCreatedOn());
} catch (ParseException e) {
LOG.warn("Parse exception in getting creation time for volume", e);
}
return new OzoneVolume(conf, this, volInfo.getVolumeName(),
volInfo.getCreatedBy(), volInfo.getOwner().getName(),
volInfo.getQuota().sizeInBytes(), creationTime, null);
}).collect(Collectors.toList());
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
@Override
public void createBucket(String volumeName, String bucketName)
throws IOException {
createBucket(volumeName, bucketName, BucketArgs.newBuilder().build());
}
@Override
public void createBucket(
String volumeName, String bucketName, BucketArgs bucketArgs)
throws IOException {
try {
HddsClientUtils.verifyResourceName(volumeName, bucketName);
Preconditions.checkNotNull(bucketArgs);
URIBuilder builder = new URIBuilder(ozoneRestUri);
OzoneConsts.Versioning versioning = OzoneConsts.Versioning.DISABLED;
if(bucketArgs.getVersioning() != null &&
bucketArgs.getVersioning()) {
versioning = OzoneConsts.Versioning.ENABLED;
}
StorageType storageType = bucketArgs.getStorageType() == null ?
StorageType.DEFAULT : bucketArgs.getStorageType();
builder.setPath(PATH_SEPARATOR + volumeName +
PATH_SEPARATOR + bucketName);
HttpPost httpPost = new HttpPost(builder.build());
addOzoneHeaders(httpPost);
//ACLs from BucketArgs
if(bucketArgs.getAcls() != null) {
for (OzoneAcl acl : bucketArgs.getAcls()) {
httpPost.addHeader(
Header.OZONE_ACLS, Header.OZONE_ACL_ADD + " " + acl.toString());
}
}
httpPost.addHeader(Header.OZONE_STORAGE_TYPE, storageType.toString());
httpPost.addHeader(Header.OZONE_BUCKET_VERSIONING,
versioning.toString());
LOG.info("Creating Bucket: {}/{}, with Versioning {} and Storage Type" +
" set to {}", volumeName, bucketName, versioning,
storageType);
EntityUtils.consume(executeHttpRequest(httpPost));
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
@Override
public void setBucketVersioning(
String volumeName, String bucketName, Boolean versioning)
throws IOException {
try {
HddsClientUtils.verifyResourceName(volumeName, bucketName);
Preconditions.checkNotNull(versioning);
URIBuilder builder = new URIBuilder(ozoneRestUri);
builder.setPath(PATH_SEPARATOR + volumeName +
PATH_SEPARATOR + bucketName);
HttpPut httpPut = new HttpPut(builder.build());
addOzoneHeaders(httpPut);
httpPut.addHeader(Header.OZONE_BUCKET_VERSIONING,
getBucketVersioning(versioning).toString());
EntityUtils.consume(executeHttpRequest(httpPut));
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
@Override
public void setBucketStorageType(
String volumeName, String bucketName, StorageType storageType)
throws IOException {
try {
HddsClientUtils.verifyResourceName(volumeName, bucketName);
Preconditions.checkNotNull(storageType);
URIBuilder builder = new URIBuilder(ozoneRestUri);
builder.setPath(PATH_SEPARATOR + volumeName +
PATH_SEPARATOR + bucketName);
HttpPut httpPut = new HttpPut(builder.build());
addOzoneHeaders(httpPut);
httpPut.addHeader(Header.OZONE_STORAGE_TYPE, storageType.toString());
EntityUtils.consume(executeHttpRequest(httpPut));
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
@Override
public void deleteBucket(String volumeName, String bucketName)
throws IOException {
try {
HddsClientUtils.verifyResourceName(volumeName, bucketName);
URIBuilder builder = new URIBuilder(ozoneRestUri);
builder.setPath(PATH_SEPARATOR + volumeName +
PATH_SEPARATOR + bucketName);
HttpDelete httpDelete = new HttpDelete(builder.build());
addOzoneHeaders(httpDelete);
EntityUtils.consume(executeHttpRequest(httpDelete));
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
@Override
public void checkBucketAccess(String volumeName, String bucketName)
throws IOException {
throw new UnsupportedOperationException("Not yet implemented.");
}
@Override
public OzoneBucket getBucketDetails(String volumeName, String bucketName)
throws IOException {
try {
HddsClientUtils.verifyResourceName(volumeName, bucketName);
URIBuilder builder = new URIBuilder(ozoneRestUri);
builder.setPath(PATH_SEPARATOR + volumeName +
PATH_SEPARATOR + bucketName);
builder.setParameter(Header.OZONE_INFO_QUERY_TAG,
Header.OZONE_INFO_QUERY_BUCKET);
HttpGet httpGet = new HttpGet(builder.build());
addOzoneHeaders(httpGet);
HttpEntity response = executeHttpRequest(httpGet);
BucketInfo bucketInfo =
BucketInfo.parse(EntityUtils.toString(response));
OzoneBucket bucket = new OzoneBucket(conf,
this,
bucketInfo.getVolumeName(),
bucketInfo.getBucketName(),
bucketInfo.getStorageType(),
getBucketVersioningFlag(bucketInfo.getVersioning()),
HddsClientUtils.formatDateTime(bucketInfo.getCreatedOn()),
new HashMap<>());
EntityUtils.consume(response);
return bucket;
} catch (URISyntaxException | ParseException e) {
throw new IOException(e);
}
}
@Override
public List<OzoneBucket> listBuckets(String volumeName, String bucketPrefix,
String prevBucket, int maxListResult)
throws IOException {
try {
HddsClientUtils.verifyResourceName(volumeName);
URIBuilder builder = new URIBuilder(ozoneRestUri);
builder.setPath(PATH_SEPARATOR + volumeName);
builder.addParameter(Header.OZONE_INFO_QUERY_TAG,
Header.OZONE_INFO_QUERY_BUCKET);
builder.addParameter(Header.OZONE_LIST_QUERY_MAXKEYS,
String.valueOf(maxListResult));
addQueryParamter(Header.OZONE_LIST_QUERY_PREFIX, bucketPrefix, builder);
addQueryParamter(Header.OZONE_LIST_QUERY_PREVKEY, prevBucket, builder);
HttpGet httpGet = new HttpGet(builder.build());
addOzoneHeaders(httpGet);
HttpEntity response = executeHttpRequest(httpGet);
ListBuckets bucketList =
ListBuckets.parse(EntityUtils.toString(response));
EntityUtils.consume(response);
return bucketList.getBuckets().stream().map(bucketInfo -> {
long creationTime = 0;
try {
creationTime =
HddsClientUtils.formatDateTime(bucketInfo.getCreatedOn());
} catch (ParseException e) {
LOG.warn("Parse exception in getting creation time for volume", e);
}
return new OzoneBucket(conf, this, volumeName,
bucketInfo.getBucketName(), bucketInfo.getStorageType(),
getBucketVersioningFlag(bucketInfo.getVersioning()), creationTime,
new HashMap<>(), bucketInfo.getEncryptionKeyName());
}).collect(Collectors.toList());
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
/**
* Writes a key in an existing bucket.
*
* @param volumeName Name of the Volume
* @param bucketName Name of the Bucket
* @param keyName Name of the Key
* @param size Size of the data
* @param type
* @param factor @return {@link OzoneOutputStream}
*/
@Override
public OzoneOutputStream createKey(
String volumeName, String bucketName, String keyName, long size,
ReplicationType type, ReplicationFactor factor,
Map<String, String> metadata)
throws IOException {
// TODO: Once ReplicationType and ReplicationFactor are supported in
// OzoneHandler (in Datanode), set them in header.
try {
HddsClientUtils.verifyResourceName(volumeName, bucketName);
HddsClientUtils.checkNotNull(keyName, type, factor);
URIBuilder builder = new URIBuilder(ozoneRestUri);
builder.setPath(PATH_SEPARATOR + volumeName +
PATH_SEPARATOR + bucketName +
PATH_SEPARATOR + keyName);
HttpPut putRequest = new HttpPut(builder.build());
addOzoneHeaders(putRequest);
PipedInputStream in = new PipedInputStream();
OutputStream out = new PipedOutputStream(in);
putRequest.setEntity(new InputStreamEntity(in, size));
FutureTask<HttpEntity> futureTask =
new FutureTask<>(() -> executeHttpRequest(putRequest));
new Thread(futureTask).start();
OzoneOutputStream outputStream = new OzoneOutputStream(
new OutputStream() {
@Override
public void write(int b) throws IOException {
out.write(b);
}
@Override
public void close() throws IOException {
try {
out.close();
EntityUtils.consume(futureTask.get());
} catch (ExecutionException | InterruptedException e) {
throw new IOException(e);
}
}
});
return outputStream;
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
/**
* Get a valid Delegation Token. Not supported for RestClient.
*
* @param renewer the designated renewer for the token
* @return Token<OzoneDelegationTokenSelector>
* @throws IOException
*/
@Override
public Token<OzoneTokenIdentifier> getDelegationToken(Text renewer)
throws IOException {
throw new IOException("Method not supported");
}
/**
* Renew an existing delegation token. Not supported for RestClient.
*
* @param token delegation token obtained earlier
* @return the new expiration time
* @throws IOException
*/
@Override
public long renewDelegationToken(Token<OzoneTokenIdentifier> token)
throws IOException {
throw new IOException("Method not supported");
}
/**
* Cancel an existing delegation token. Not supported for RestClient.
*
* @param token delegation token
* @throws IOException
*/
@Override
public void cancelDelegationToken(Token<OzoneTokenIdentifier> token)
throws IOException {
throw new IOException("Method not supported");
}
@Override
public S3SecretValue getS3Secret(String kerberosID) throws IOException {
throw new UnsupportedOperationException("Ozone REST protocol does not " +
"support this operation.");
}
@Override
public OMFailoverProxyProvider getOMProxyProvider() {
return null;
}
@Override
public KeyProvider getKeyProvider() throws IOException {
// TODO: fix me to support kms instances for difference OMs
return OzoneKMSUtil.getKeyProvider(conf, getKeyProviderUri());
}
@Override
public URI getKeyProviderUri() throws IOException {
return OzoneKMSUtil.getKeyProviderUri(ugi, null, null, conf);
}
@Override
public OzoneInputStream getKey(
String volumeName, String bucketName, String keyName)
throws IOException {
try {
HddsClientUtils.verifyResourceName(volumeName, bucketName);
Preconditions.checkNotNull(keyName);
URIBuilder builder = new URIBuilder(ozoneRestUri);
builder.setPath(PATH_SEPARATOR + volumeName +
PATH_SEPARATOR + bucketName +
PATH_SEPARATOR + keyName);
HttpGet getRequest = new HttpGet(builder.build());
addOzoneHeaders(getRequest);
HttpEntity entity = executeHttpRequest(getRequest);
PipedInputStream in = new PipedInputStream();
OutputStream out = new PipedOutputStream(in);
FutureTask<Void> futureTask =
new FutureTask<>(() -> {
entity.writeTo(out);
out.close();
return null;
});
new Thread(futureTask).start();
OzoneInputStream inputStream = new OzoneInputStream(
new InputStream() {
@Override
public int read() throws IOException {
return in.read();
}
@Override
public void close() throws IOException {
in.close();
EntityUtils.consume(entity);
}
});
return inputStream;
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
@Override
public void deleteKey(String volumeName, String bucketName, String keyName)
throws IOException {
try {
HddsClientUtils.verifyResourceName(volumeName, bucketName);
Preconditions.checkNotNull(keyName);
URIBuilder builder = new URIBuilder(ozoneRestUri);
builder.setPath(PATH_SEPARATOR + volumeName +
PATH_SEPARATOR + bucketName + PATH_SEPARATOR + keyName);
HttpDelete httpDelete = new HttpDelete(builder.build());
addOzoneHeaders(httpDelete);
EntityUtils.consume(executeHttpRequest(httpDelete));
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
@Override
public void renameKey(String volumeName, String bucketName,
String fromKeyName, String toKeyName) throws IOException {
try {
HddsClientUtils.verifyResourceName(volumeName, bucketName);
HddsClientUtils.checkNotNull(fromKeyName, toKeyName);
URIBuilder builder = new URIBuilder(ozoneRestUri);
builder.setPath(PATH_SEPARATOR + volumeName + PATH_SEPARATOR + bucketName
+ PATH_SEPARATOR + fromKeyName);
builder.addParameter(Header.OZONE_RENAME_TO_KEY_PARAM_NAME, toKeyName);
HttpPost httpPost = new HttpPost(builder.build());
addOzoneHeaders(httpPost);
EntityUtils.consume(executeHttpRequest(httpPost));
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
@Override
public List<OzoneKey> listKeys(String volumeName, String bucketName,
String keyPrefix, String prevKey,
int maxListResult)
throws IOException {
try {
HddsClientUtils.verifyResourceName(volumeName);
URIBuilder builder = new URIBuilder(ozoneRestUri);
builder
.setPath(PATH_SEPARATOR + volumeName + PATH_SEPARATOR + bucketName);
builder.addParameter(Header.OZONE_INFO_QUERY_TAG,
Header.OZONE_INFO_QUERY_KEY);
builder.addParameter(Header.OZONE_LIST_QUERY_MAXKEYS,
String.valueOf(maxListResult));
addQueryParamter(Header.OZONE_LIST_QUERY_PREFIX, keyPrefix, builder);
addQueryParamter(Header.OZONE_LIST_QUERY_PREVKEY, prevKey, builder);
HttpGet httpGet = new HttpGet(builder.build());
addOzoneHeaders(httpGet);
HttpEntity response = executeHttpRequest(httpGet);
ListKeys keyList = ListKeys.parse(EntityUtils.toString(response));
EntityUtils.consume(response);
return keyList.getKeyList().stream().map(keyInfo -> {
long creationTime = 0, modificationTime = 0;
try {
creationTime = HddsClientUtils.formatDateTime(keyInfo.getCreatedOn());
modificationTime =
HddsClientUtils.formatDateTime(keyInfo.getModifiedOn());
} catch (ParseException e) {
LOG.warn("Parse exception in getting creation time for volume", e);
}
return new OzoneKey(volumeName, bucketName, keyInfo.getKeyName(),
keyInfo.getSize(), creationTime, modificationTime,
ReplicationType.valueOf(keyInfo.getType().toString()));
}).collect(Collectors.toList());
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
@Override
public OzoneKeyDetails getKeyDetails(
String volumeName, String bucketName, String keyName)
throws IOException {
try {
HddsClientUtils.verifyResourceName(volumeName, bucketName);
Preconditions.checkNotNull(keyName);
URIBuilder builder = new URIBuilder(ozoneRestUri);
builder.setPath(PATH_SEPARATOR + volumeName +
PATH_SEPARATOR + bucketName + PATH_SEPARATOR + keyName);
builder.setParameter(Header.OZONE_INFO_QUERY_TAG,
Header.OZONE_INFO_QUERY_KEY_DETAIL);
HttpGet httpGet = new HttpGet(builder.build());
addOzoneHeaders(httpGet);
HttpEntity response = executeHttpRequest(httpGet);
KeyInfoDetails keyInfo =
KeyInfoDetails.parse(EntityUtils.toString(response));
List<OzoneKeyLocation> ozoneKeyLocations = new ArrayList<>();
keyInfo.getKeyLocations().forEach((a) -> ozoneKeyLocations.add(
new OzoneKeyLocation(a.getContainerID(), a.getLocalID(),
a.getLength(), a.getOffset())));
OzoneKeyDetails key = new OzoneKeyDetails(volumeName,
bucketName,
keyInfo.getKeyName(),
keyInfo.getSize(),
HddsClientUtils.formatDateTime(keyInfo.getCreatedOn()),
HddsClientUtils.formatDateTime(keyInfo.getModifiedOn()),
ozoneKeyLocations, ReplicationType.valueOf(
keyInfo.getType().toString()),
new HashMap<>(), keyInfo.getFileEncryptionInfo());
EntityUtils.consume(response);
return key;
} catch (URISyntaxException | ParseException e) {
throw new IOException(e);
}
}
@Override
public void createS3Bucket(String userName, String s3BucketName)
throws IOException {
throw new UnsupportedOperationException("Ozone REST protocol does not " +
"support this operation.");
}
@Override
public void deleteS3Bucket(String s3BucketName)
throws IOException {
throw new UnsupportedOperationException("Ozone REST protocol does not " +
"support this operation.");
}
@Override
public String getOzoneBucketMapping(String s3BucketName) throws IOException {
throw new UnsupportedOperationException("Ozone REST protocol does not " +
"support this operation.");
}
@Override
public String getOzoneVolumeName(String s3BucketName) throws IOException {
throw new UnsupportedOperationException("Ozone REST protocol does not " +
"support this operation.");
}
@Override
public String getOzoneBucketName(String s3BucketName) throws IOException {
throw new UnsupportedOperationException("Ozone REST protocol does not " +
"support this operation.");
}
@Override
public List<OzoneBucket> listS3Buckets(String userName, String bucketPrefix,
String prevBucket, int maxListResult)
throws IOException {
throw new UnsupportedOperationException("Ozone REST protocol does not " +
"support this operation.");
}
/**
* Adds Ozone headers to http request.
*
* @param httpRequest Http Request
*/
private void addOzoneHeaders(HttpUriRequest httpRequest) {
httpRequest.addHeader(HttpHeaders.AUTHORIZATION,
Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
ugi.getUserName());
httpRequest.addHeader(HttpHeaders.DATE,
HddsClientUtils.formatDateTime(Time.monotonicNow()));
httpRequest.addHeader(Header.OZONE_VERSION_HEADER,
Header.OZONE_V1_VERSION_HEADER);
}
/**
* Sends the http request to server and returns the response HttpEntity.
* It's responsibility of the caller to consume and close response HttpEntity
* by calling {@code EntityUtils.consume}
*
* @param httpUriRequest http request
* @throws IOException
*/
private HttpEntity executeHttpRequest(HttpUriRequest httpUriRequest)
throws IOException {
HttpResponse response = httpClient.execute(httpUriRequest);
int errorCode = response.getStatusLine().getStatusCode();
HttpEntity entity = response.getEntity();
if ((errorCode == HTTP_OK) || (errorCode == HTTP_CREATED)) {
return entity;
}
if (entity != null) {
throw new IOException(
OzoneException.parse(EntityUtils.toString(entity)));
} else {
throw new IOException("Unexpected null in http payload," +
" while processing request");
}
}
/**
* Converts OzoneConts.Versioning to boolean.
*
* @param version
* @return corresponding boolean value
*/
private Boolean getBucketVersioningFlag(
OzoneConsts.Versioning version) {
if(version != null) {
switch(version) {
case ENABLED:
return true;
case NOT_DEFINED:
case DISABLED:
default:
return false;
}
}
return false;
}
/**
* Converts Bucket versioning flag into OzoneConts.Versioning.
*
* @param flag versioning flag
* @return corresponding OzoneConts.Versionin
*/
private OzoneConsts.Versioning getBucketVersioning(Boolean flag) {
if(flag != null) {
if(flag) {
return OzoneConsts.Versioning.ENABLED;
} else {
return OzoneConsts.Versioning.DISABLED;
}
}
return OzoneConsts.Versioning.NOT_DEFINED;
}
@Override
public void close() throws IOException {
httpClient.close();
}
private void addQueryParamter(String param, String value,
URIBuilder builder) {
if (!Strings.isNullOrEmpty(value)) {
builder.addParameter(param, value);
}
}
@Override
public OmMultipartInfo initiateMultipartUpload(String volumeName,
String bucketName,
String keyName,
ReplicationType type,
ReplicationFactor factor)
throws IOException {
throw new UnsupportedOperationException("Ozone REST protocol does not " +
"support this operation.");
}
@Override
public OzoneOutputStream createMultipartKey(String volumeName,
String bucketName,
String keyName,
long size,
int partNumber,
String uploadID)
throws IOException {
throw new UnsupportedOperationException("Ozone REST protocol does not " +
"support this operation.");
}
@Override
public OmMultipartUploadCompleteInfo completeMultipartUpload(
String volumeName, String bucketName, String keyName, String uploadID,
Map<Integer, String> partsMap) throws IOException {
throw new UnsupportedOperationException("Ozone REST protocol does not " +
"support this operation.");
}
@Override
public void abortMultipartUpload(String volumeName,
String bucketName, String keyName, String uploadID) throws IOException {
throw new UnsupportedOperationException("Ozone REST protocol does not " +
"support this operation.");
}
@Override
public OzoneMultipartUploadPartListParts listParts(String volumeName,
String bucketName, String keyName, String uploadID, int partNumberMarker,
int maxParts) throws IOException {
throw new UnsupportedOperationException("Ozone REST protocol does not " +
"support this operation.");
}
/**
* Get CanonicalServiceName for ozone delegation token.
* @return Canonical Service Name of ozone delegation token.
*/
public String getCanonicalServiceName(){
throw new UnsupportedOperationException("Ozone REST protocol does not " +
"support this operation.");
}
@Override
public OzoneFileStatus getOzoneFileStatus(String volumeName,
String bucketName, String keyName) throws IOException {
throw new UnsupportedOperationException("Ozone REST protocol does not " +
"support this operation.");
}
@Override
public void createDirectory(String volumeName, String bucketName,
String keyName) {
throw new UnsupportedOperationException(
"Ozone REST protocol does not " + "support this operation.");
}
@Override
public OzoneInputStream readFile(String volumeName, String bucketName,
String keyName) {
throw new UnsupportedOperationException(
"Ozone REST protocol does not " + "support this operation.");
}
@Override
public OzoneOutputStream createFile(String volumeName, String bucketName,
String keyName, long size, ReplicationType type, ReplicationFactor factor,
boolean overWrite, boolean recursive) {
throw new UnsupportedOperationException(
"Ozone REST protocol does not " + "support this operation.");
}
@Override
public List<OzoneFileStatus> listStatus(String volumeName, String bucketName,
String keyName, boolean recursive, String startKey, long numEntries)
throws IOException {
throw new UnsupportedOperationException(
"Ozone REST protocol does not " + "support this operation.");
}
/**
* Add acl for Ozone object. Return true if acl is added successfully else
* false.
*
* @param obj Ozone object for which acl should be added.
* @param acl ozone acl top be added.
* @throws IOException if there is error.
*/
@Override
public boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
throw new UnsupportedOperationException("Ozone REST protocol does not" +
" support this operation.");
}
/**
* Remove acl for Ozone object. Return true if acl is removed successfully
* else false.
*
* @param obj Ozone object.
* @param acl Ozone acl to be removed.
* @throws IOException if there is error.
*/
@Override
public boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
throw new UnsupportedOperationException("Ozone REST protocol does not" +
" support this operation.");
}
/**
* Acls to be set for given Ozone object. This operations reset ACL for given
* object to list of ACLs provided in argument.
*
* @param obj Ozone object.
* @param acls List of acls.
* @throws IOException if there is error.
*/
@Override
public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException {
throw new UnsupportedOperationException("Ozone REST protocol does not" +
" support this operation.");
}
/**
* Returns list of ACLs for given Ozone object.
*
* @param obj Ozone object.
* @throws IOException if there is error.
*/
@Override
public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
throw new UnsupportedOperationException("Ozone REST protocol does not" +
" support this operation.");
}
}