/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.s3.endpoint;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HEAD;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.s3.HeaderPreprocessor;
import org.apache.hadoop.ozone.s3.SignedChunksInputStream;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
import org.apache.hadoop.ozone.s3.io.S3WrapperInputStream;
import org.apache.hadoop.ozone.s3.util.RFC1123Util;
import org.apache.hadoop.ozone.s3.util.RangeHeader;
import org.apache.hadoop.ozone.s3.util.RangeHeaderParserUtil;
import org.apache.hadoop.ozone.s3.util.S3StorageType;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
import static javax.ws.rs.core.HttpHeaders.CONTENT_LENGTH;
import static javax.ws.rs.core.HttpHeaders.LAST_MODIFIED;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.ENTITY_TOO_SMALL;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_UPLOAD;
import static org.apache.hadoop.ozone.s3.util.S3Consts.ACCEPT_RANGE_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.CONTENT_RANGE_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER_RANGE;
import static org.apache.hadoop.ozone.s3.util.S3Consts.RANGE_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.RANGE_HEADER_SUPPORTED_UNIT;
import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
import org.apache.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Key-level REST endpoints.
*/
@Path("/{bucket}/{path:.+}")
public class ObjectEndpoint extends EndpointBase {
private static final Logger LOG =
LoggerFactory.getLogger(ObjectEndpoint.class);
@Context
private HttpHeaders headers;
private List<String> customizableGetHeaders = new ArrayList<>();
public ObjectEndpoint() {
customizableGetHeaders.add("Content-Type");
customizableGetHeaders.add("Content-Language");
customizableGetHeaders.add("Expires");
customizableGetHeaders.add("Cache-Control");
customizableGetHeaders.add("Content-Disposition");
customizableGetHeaders.add("Content-Encoding");
}
/**
* Rest endpoint to upload an object to a bucket.
* <p>
* See: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html for
* more details.
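* <p>
* Illustrative request; bucket name, key, storage class and size are
* example values:
* <pre>
*   PUT /bucket1/dir/key1 HTTP/1.1
*   Content-Length: 1024
*   x-amz-storage-class: STANDARD
*
*   [1024 bytes of object data]
* </pre>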
*/
@PUT
public Response put(
@PathParam("bucket") String bucketName,
@PathParam("path") String keyPath,
@HeaderParam("Content-Length") long length,
@QueryParam("partNumber") int partNumber,
@QueryParam("uploadId") @DefaultValue("") String uploadID,
InputStream body) throws IOException, OS3Exception {
OzoneOutputStream output = null;
if (uploadID != null && !uploadID.equals("")) {
// If uploadID is specified, it is a request for upload part
return createMultipartKey(bucketName, keyPath, length,
partNumber, uploadID, body);
}
try {
String copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
ReplicationType replicationType;
ReplicationFactor replicationFactor;
boolean storageTypeDefault;
if (storageType == null || storageType.equals("")) {
replicationType = S3StorageType.getDefault().getType();
replicationFactor = S3StorageType.getDefault().getFactor();
storageTypeDefault = true;
} else {
try {
replicationType = S3StorageType.valueOf(storageType).getType();
replicationFactor = S3StorageType.valueOf(storageType).getFactor();
} catch (IllegalArgumentException ex) {
throw S3ErrorTable.newError(S3ErrorTable.INVALID_ARGUMENT,
storageType);
}
storageTypeDefault = false;
}
if (copyHeader != null) {
// Copy object, since the copy source header is present.
CopyObjectResponse copyObjectResponse = copyObject(
copyHeader, bucketName, keyPath, replicationType,
replicationFactor, storageTypeDefault);
return Response.status(Status.OK).entity(copyObjectResponse).header(
"Connection", "close").build();
}
// Normal put object
OzoneBucket bucket = getBucket(bucketName);
output = bucket.createKey(keyPath, length, replicationType,
replicationFactor, new HashMap<>());
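// Payloads signed with AWS Signature V4 streaming ("aws-chunked")
// arrive as signed chunks; each chunk looks like (size and signature
// are example values):
//   400;chunk-signature=<64-hex-chars>\r\n<1024 bytes of data>\r\n
// SignedChunksInputStream strips this framing and yields the raw bytes.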
if ("STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
.equals(headers.getHeaderString("x-amz-content-sha256"))) {
body = new SignedChunksInputStream(body);
}
IOUtils.copy(body, output);
return Response.ok().status(HttpStatus.SC_OK)
.build();
} catch (IOException ex) {
LOG.error("Exception occurred in PutObject", ex);
throw ex;
} finally {
if (output != null) {
output.close();
}
}
}
/**
* Rest endpoint to download an object from a bucket. If the query param
* uploadId is specified, the request instead lists the parts of the
* multipart upload with that uploadId.
* <p>
* See: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
* https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadListParts.html
* for more details.
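* <p>
* Illustrative ranged download; names and offsets are example values:
* <pre>
*   GET /bucket1/key1 HTTP/1.1
*   Range: bytes=0-1023
* </pre>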
*/
@GET
public Response get(
@PathParam("bucket") String bucketName,
@PathParam("path") String keyPath,
@QueryParam("uploadId") String uploadId,
@QueryParam("max-parts") @DefaultValue("1000") int maxParts,
@QueryParam("part-number-marker") String partNumberMarker,
InputStream body) throws IOException, OS3Exception {
try {
if (uploadId != null) {
// When uploadId is present, this is a list-parts request.
int partMarker = 0;
if (partNumberMarker != null) {
partMarker = Integer.parseInt(partNumberMarker);
}
return listParts(bucketName, keyPath, uploadId,
partMarker, maxParts);
}
OzoneBucket bucket = getBucket(bucketName);
OzoneKeyDetails keyDetails = bucket.getKey(keyPath);
long length = keyDetails.getDataSize();
LOG.debug("Data length of the key {} is {}", keyPath, length);
String rangeHeaderVal = headers.getHeaderString(RANGE_HEADER);
RangeHeader rangeHeader = null;
LOG.debug("range Header provided value is {}", rangeHeaderVal);
if (rangeHeaderVal != null) {
rangeHeader = RangeHeaderParserUtil.parseRangeHeader(rangeHeaderVal,
length);
LOG.debug("range Header provided value is {}", rangeHeader);
if (rangeHeader.isInValidRange()) {
throw S3ErrorTable.newError(S3ErrorTable.INVALID_RANGE,
rangeHeaderVal);
}
}
ResponseBuilder responseBuilder;
if (rangeHeaderVal == null || rangeHeader.isReadFull()) {
StreamingOutput output = dest -> {
try (OzoneInputStream key = bucket.readKey(keyPath)) {
IOUtils.copy(key, dest);
}
};
responseBuilder = Response
.ok(output)
.header(CONTENT_LENGTH, keyDetails.getDataSize());
} else {
LOG.debug("range Header provided value is {}", rangeHeader);
OzoneInputStream key = bucket.readKey(keyPath);
long startOffset = rangeHeader.getStartOffset();
long endOffset = rangeHeader.getEndOffset();
long copyLength;
if (startOffset == endOffset) {
// if range header is given as bytes=0-0, then we should return 1
// byte from start offset
copyLength = 1;
} else {
copyLength = rangeHeader.getEndOffset() - rangeHeader
.getStartOffset() + 1;
}
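// For an inclusive range bytes=a-b the copy length is b - a + 1;
// e.g. bytes=2-5 covers offsets 2, 3, 4 and 5, i.e. 4 bytes.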
StreamingOutput output = dest -> {
try (S3WrapperInputStream s3WrapperInputStream =
new S3WrapperInputStream(
key.getInputStream())) {
IOUtils.copyLarge(s3WrapperInputStream, dest, startOffset,
copyLength);
}
};
responseBuilder = Response
.ok(output)
.header(CONTENT_LENGTH, copyLength);
String contentRangeVal = RANGE_HEADER_SUPPORTED_UNIT + " " +
rangeHeader.getStartOffset() + "-" + rangeHeader.getEndOffset() +
"/" + length;
responseBuilder.header(CONTENT_RANGE_HEADER, contentRangeVal);
}
responseBuilder.header(ACCEPT_RANGE_HEADER,
RANGE_HEADER_SUPPORTED_UNIT);
for (String responseHeader : customizableGetHeaders) {
String headerValue = headers.getHeaderString(responseHeader);
if (headerValue != null) {
responseBuilder.header(responseHeader, headerValue);
}
}
addLastModifiedDate(responseBuilder, keyDetails);
return responseBuilder.build();
} catch (OMException ex) {
if (ex.getResult() == ResultCodes.KEY_NOT_FOUND) {
throw S3ErrorTable.newError(S3ErrorTable
.NO_SUCH_KEY, keyPath);
} else {
throw ex;
}
}
}
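/**
* Sets the Last-Modified response header from the key's modification
* time, formatted per RFC 1123, e.g. "Tue, 15 Nov 1994 08:12:31 GMT".
*/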
private void addLastModifiedDate(
ResponseBuilder responseBuilder, OzoneKeyDetails key) {
ZonedDateTime lastModificationTime =
Instant.ofEpochMilli(key.getModificationTime())
.atZone(ZoneId.of("GMT"));
responseBuilder
.header(LAST_MODIFIED,
RFC1123Util.FORMAT.format(lastModificationTime));
}
/**
* Rest endpoint to check existence of an object in a bucket.
* <p>
* See: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectHEAD.html
* for more details.
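* <p>
* Illustrative exchange; names and sizes are example values:
* <pre>
*   HEAD /bucket1/key1 HTTP/1.1
*
*   HTTP/1.1 200 OK
*   Content-Length: 1024
*   Content-Type: binary/octet-stream
* </pre>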
*/
@HEAD
public Response head(
@PathParam("bucket") String bucketName,
@PathParam("path") String keyPath) throws Exception {
OzoneKeyDetails key;
try {
key = getBucket(bucketName).getKey(keyPath);
// TODO: return the specified range bytes of this object.
} catch (OMException ex) {
if (ex.getResult() == ResultCodes.KEY_NOT_FOUND) {
// Just return 404 with no content
return Response.status(Status.NOT_FOUND).build();
} else {
throw ex;
}
}
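// Note: no real ETag is available here; the key's modification time is
// returned as a placeholder ETag value.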
ResponseBuilder response = Response.ok().status(HttpStatus.SC_OK)
.header("ETag", "" + key.getModificationTime())
.header("Content-Length", key.getDataSize())
.header("Content-Type", "binary/octet-stream");
addLastModifiedDate(response, key);
return response
.build();
}
/**
* Abort multipart upload request.
* @param bucket bucket name
* @param key key name of the multipart upload
* @param uploadId upload ID of the multipart upload to abort
* @return Response with status 204 (No Content) on success
* @throws IOException
* @throws OS3Exception
*/
private Response abortMultipartUpload(String bucket, String key, String
uploadId) throws IOException, OS3Exception {
try {
OzoneBucket ozoneBucket = getBucket(bucket);
ozoneBucket.abortMultipartUpload(key, uploadId);
} catch (OMException ex) {
if (ex.getResult() == ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR) {
throw S3ErrorTable.newError(S3ErrorTable.NO_SUCH_UPLOAD, uploadId);
}
throw ex;
}
return Response
.status(Status.NO_CONTENT)
.build();
}
/**
* Delete a specific object from a bucket. If the query param uploadId is
* specified, the request aborts that multipart upload instead.
* <p>
* See: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectDELETE.html
* https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadAbort.html
* for more details.
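* <p>
* Illustrative requests; names and the upload ID are example values:
* <pre>
*   DELETE /bucket1/key1 HTTP/1.1
*   DELETE /bucket1/key1?uploadId=abc123 HTTP/1.1  (aborts the upload)
* </pre>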
*/
@DELETE
@SuppressWarnings("emptyblock")
public Response delete(
@PathParam("bucket") String bucketName,
@PathParam("path") String keyPath,
@QueryParam("uploadId") @DefaultValue("") String uploadId) throws
IOException, OS3Exception {
try {
if (uploadId != null && !uploadId.equals("")) {
return abortMultipartUpload(bucketName, keyPath, uploadId);
}
OzoneBucket bucket = getBucket(bucketName);
bucket.getKey(keyPath);
bucket.deleteKey(keyPath);
} catch (OMException ex) {
if (ex.getResult() == ResultCodes.BUCKET_NOT_FOUND) {
throw S3ErrorTable.newError(S3ErrorTable
.NO_SUCH_BUCKET, bucketName);
} else if (ex.getResult() == ResultCodes.KEY_NOT_FOUND) {
// A missing key is not an error: AWS S3 does not fail deletes of
// nonexistent keys, so fall through and return 204 below.
} else {
throw ex;
}
}
return Response
.status(Status.NO_CONTENT)
.build();
}
/**
* Initialize MultiPartUpload request.
* <p>
* Note: the specific content type is set by the HeaderPreprocessor.
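* <p>
* Illustrative request; bucket and key are example values:
* <pre>
*   POST /bucket1/key1?uploads HTTP/1.1
* </pre>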
*/
@POST
@Produces(MediaType.APPLICATION_XML)
@Consumes(HeaderPreprocessor.MULTIPART_UPLOAD_MARKER)
public Response initializeMultipartUpload(
@PathParam("bucket") String bucket,
@PathParam("path") String key
)
throws IOException, OS3Exception {
try {
OzoneBucket ozoneBucket = getBucket(bucket);
String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
ReplicationType replicationType;
ReplicationFactor replicationFactor;
if (storageType == null || storageType.equals("")) {
replicationType = S3StorageType.getDefault().getType();
replicationFactor = S3StorageType.getDefault().getFactor();
} else {
try {
replicationType = S3StorageType.valueOf(storageType).getType();
replicationFactor = S3StorageType.valueOf(storageType).getFactor();
} catch (IllegalArgumentException ex) {
throw S3ErrorTable.newError(S3ErrorTable.INVALID_ARGUMENT,
storageType);
}
}
OmMultipartInfo multipartInfo = ozoneBucket
.initiateMultipartUpload(key, replicationType, replicationFactor);
MultipartUploadInitiateResponse multipartUploadInitiateResponse = new
MultipartUploadInitiateResponse();
multipartUploadInitiateResponse.setBucket(bucket);
multipartUploadInitiateResponse.setKey(key);
multipartUploadInitiateResponse.setUploadID(multipartInfo.getUploadID());
return Response.status(Status.OK).entity(
multipartUploadInitiateResponse).build();
} catch (IOException ex) {
LOG.error("Error in Initiate Multipart Upload Request for bucket: " +
bucket + ", key: " + key, ex);
throw ex;
}
}
/**
* Complete a multipart upload.
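* <p>
* Illustrative request body; part numbers and ETags are example values:
* <pre>{@code
*   POST /bucket1/key1?uploadId=abc123 HTTP/1.1
*
*   <CompleteMultipartUpload>
*     <Part>
*       <PartNumber>1</PartNumber>
*       <ETag>"etag-of-part-1"</ETag>
*     </Part>
*     <Part>
*       <PartNumber>2</PartNumber>
*       <ETag>"etag-of-part-2"</ETag>
*     </Part>
*   </CompleteMultipartUpload>
* }</pre>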
*/
@POST
@Produces(MediaType.APPLICATION_XML)
public Response completeMultipartUpload(@PathParam("bucket") String bucket,
@PathParam("path") String key,
@QueryParam("uploadId") @DefaultValue("") String uploadID,
CompleteMultipartUploadRequest multipartUploadRequest)
throws IOException, OS3Exception {
OzoneBucket ozoneBucket = getBucket(bucket);
Map<Integer, String> partsMap = new TreeMap<>();
List<CompleteMultipartUploadRequest.Part> partList =
multipartUploadRequest.getPartList();
for (CompleteMultipartUploadRequest.Part part : partList) {
partsMap.put(part.getPartNumber(), part.geteTag());
}
LOG.debug("Parts map {}", partsMap.toString());
OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo;
try {
omMultipartUploadCompleteInfo = ozoneBucket.completeMultipartUpload(
key, uploadID, partsMap);
CompleteMultipartUploadResponse completeMultipartUploadResponse =
new CompleteMultipartUploadResponse();
completeMultipartUploadResponse.setBucket(bucket);
completeMultipartUploadResponse.setKey(key);
completeMultipartUploadResponse.setETag(omMultipartUploadCompleteInfo
.getHash());
// The location is also set to the bucket name.
completeMultipartUploadResponse.setLocation(bucket);
return Response.status(Status.OK).entity(completeMultipartUploadResponse)
.build();
} catch (OMException ex) {
LOG.error("Error in Complete Multipart Upload Request for bucket: " +
bucket + ", key: " + key, ex);
if (ex.getResult() == ResultCodes.MISMATCH_MULTIPART_LIST) {
OS3Exception oex =
S3ErrorTable.newError(S3ErrorTable.INVALID_PART, key);
throw oex;
} else if (ex.getResult() == ResultCodes.MISSING_UPLOAD_PARTS) {
OS3Exception oex =
S3ErrorTable.newError(S3ErrorTable.INVALID_PART_ORDER, key);
throw oex;
} else if (ex.getResult() == ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR) {
OS3Exception os3Exception = S3ErrorTable.newError(NO_SUCH_UPLOAD,
uploadID);
throw os3Exception;
} else if (ex.getResult() == ResultCodes.ENTITY_TOO_SMALL) {
OS3Exception os3Exception = S3ErrorTable.newError(ENTITY_TOO_SMALL,
key);
throw os3Exception;
}
throw ex;
}
}
private Response createMultipartKey(String bucket, String key, long length,
int partNumber, String uploadID,
InputStream body)
throws IOException, OS3Exception {
try {
OzoneBucket ozoneBucket = getBucket(bucket);
OzoneOutputStream ozoneOutputStream = ozoneBucket.createMultipartKey(
key, length, partNumber, uploadID);
String copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
if (copyHeader != null) {
Pair<String, String> result = parseSourceHeader(copyHeader);
String sourceBucket = result.getLeft();
String sourceKey = result.getRight();
try (OzoneInputStream sourceObject =
getBucket(sourceBucket).readKey(sourceKey)) {
String range =
headers.getHeaderString(COPY_SOURCE_HEADER_RANGE);
if (range != null) {
RangeHeader rangeHeader =
RangeHeaderParserUtil.parseRangeHeader(range, 0);
// Range offsets are inclusive, so copy end - start + 1 bytes.
IOUtils.copyLarge(sourceObject, ozoneOutputStream,
rangeHeader.getStartOffset(),
rangeHeader.getEndOffset() - rangeHeader.getStartOffset() + 1);
} else {
IOUtils.copy(sourceObject, ozoneOutputStream);
}
}
} else {
IOUtils.copy(body, ozoneOutputStream);
}
ozoneOutputStream.close();
OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo =
ozoneOutputStream.getCommitUploadPartInfo();
String eTag = omMultipartCommitUploadPartInfo.getPartName();
if (copyHeader != null) {
return Response.ok(new CopyPartResult(eTag)).build();
} else {
return Response.ok().header("ETag",
eTag).build();
}
} catch (OMException ex) {
if (ex.getResult() == ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR) {
throw S3ErrorTable.newError(NO_SUCH_UPLOAD,
uploadID);
}
throw ex;
}
}
/**
* Returns response for the listParts request.
* See: https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadListParts.html
* @param bucket bucket name
* @param key key name of the multipart upload
* @param uploadID upload ID of the multipart upload
* @param partNumberMarker part number after which the listing begins
* @param maxParts maximum number of parts to return
* @return Response with the ListPartsResponse entity
* @throws IOException
* @throws OS3Exception
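* <p>
* Illustrative request; names, upload ID and limits are example values:
* <pre>{@code
*   GET /bucket1/key1?uploadId=abc123&max-parts=100&part-number-marker=2
* }</pre>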
*/
private Response listParts(String bucket, String key, String uploadID,
int partNumberMarker, int maxParts) throws IOException, OS3Exception {
ListPartsResponse listPartsResponse = new ListPartsResponse();
try {
OzoneBucket ozoneBucket = getBucket(bucket);
OzoneMultipartUploadPartListParts ozoneMultipartUploadPartListParts =
ozoneBucket.listParts(key, uploadID, partNumberMarker, maxParts);
listPartsResponse.setBucket(bucket);
listPartsResponse.setKey(key);
listPartsResponse.setUploadID(uploadID);
listPartsResponse.setMaxParts(maxParts);
listPartsResponse.setPartNumberMarker(partNumberMarker);
listPartsResponse.setTruncated(false);
listPartsResponse.setStorageClass(S3StorageType.fromReplicationType(
ozoneMultipartUploadPartListParts.getReplicationType(),
ozoneMultipartUploadPartListParts.getReplicationFactor()).toString());
if (ozoneMultipartUploadPartListParts.isTruncated()) {
listPartsResponse.setTruncated(
ozoneMultipartUploadPartListParts.isTruncated());
listPartsResponse.setNextPartNumberMarker(
ozoneMultipartUploadPartListParts.getNextPartNumberMarker());
}
ozoneMultipartUploadPartListParts.getPartInfoList().forEach(partInfo -> {
ListPartsResponse.Part part = new ListPartsResponse.Part();
part.setPartNumber(partInfo.getPartNumber());
part.setETag(partInfo.getPartName());
part.setSize(partInfo.getSize());
part.setLastModified(Instant.ofEpochMilli(
partInfo.getModificationTime()));
listPartsResponse.addPart(part);
});
} catch (OMException ex) {
if (ex.getResult() == ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR) {
throw S3ErrorTable.newError(NO_SUCH_UPLOAD,
uploadID);
}
throw ex;
}
return Response.status(Status.OK).entity(listPartsResponse).build();
}
@VisibleForTesting
public void setHeaders(HttpHeaders headers) {
this.headers = headers;
}
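/**
* Copy an object from a source bucket/key to a destination bucket/key,
* as described by the COPY_SOURCE_HEADER of the request.
* <p>
* See: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectCOPY.html
* for more details.
*/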
private CopyObjectResponse copyObject(String copyHeader,
String destBucket,
String destkey,
ReplicationType replicationType,
ReplicationFactor replicationFactor,
boolean storageTypeDefault)
throws OS3Exception, IOException {
Pair<String, String> result = parseSourceHeader(copyHeader);
String sourceBucket = result.getLeft();
String sourceKey = result.getRight();
OzoneInputStream sourceInputStream = null;
OzoneOutputStream destOutputStream = null;
boolean closed = false;
try {
// Check whether the source and the destination refer to the same object.
if (sourceBucket.equals(destBucket)) {
if (sourceKey.equals(destkey)) {
// When the source and destination are the same and a storage type is
// provided, no exception should be thrown: the aws cli permits copying
// an object onto itself as long as an option such as the storage class
// is changed.
if (storageTypeDefault) {
OS3Exception ex = S3ErrorTable.newError(S3ErrorTable
.INVALID_REQUEST, copyHeader);
ex.setErrorMessage("This copy request is illegal because it is " +
"trying to copy an object to it self itself without changing " +
"the object's metadata, storage class, website redirect " +
"location or encryption attributes.");
throw ex;
} else {
// TODO: The storage type should actually be changed here; Ozone does
// not support that yet, so a dummy response is returned for now.
CopyObjectResponse copyObjectResponse = new CopyObjectResponse();
copyObjectResponse.setETag(OzoneUtils.getRequestID());
copyObjectResponse.setLastModified(Instant.ofEpochMilli(
Time.now()));
return copyObjectResponse;
}
}
}
OzoneBucket sourceOzoneBucket = getBucket(sourceBucket);
OzoneBucket destOzoneBucket = getBucket(destBucket);
OzoneKeyDetails sourceKeyDetails = sourceOzoneBucket.getKey(sourceKey);
long sourceKeyLen = sourceKeyDetails.getDataSize();
sourceInputStream = sourceOzoneBucket.readKey(sourceKey);
destOutputStream = destOzoneBucket.createKey(destkey, sourceKeyLen,
replicationType, replicationFactor, new HashMap<>());
IOUtils.copy(sourceInputStream, destOutputStream);
// Close the streams explicitly: until close() is called the key is not
// committed in OM, and the getKey call below would fail.
sourceInputStream.close();
destOutputStream.close();
closed = true;
OzoneKeyDetails destKeyDetails = destOzoneBucket.getKey(destkey);
CopyObjectResponse copyObjectResponse = new CopyObjectResponse();
copyObjectResponse.setETag(OzoneUtils.getRequestID());
copyObjectResponse.setLastModified(Instant.ofEpochMilli(destKeyDetails
.getModificationTime()));
return copyObjectResponse;
} catch (OMException ex) {
if (ex.getResult() == ResultCodes.KEY_NOT_FOUND) {
throw S3ErrorTable.newError(S3ErrorTable.NO_SUCH_KEY, sourceKey);
} else if (ex.getResult() == ResultCodes.BUCKET_NOT_FOUND) {
throw S3ErrorTable.newError(S3ErrorTable.NO_SUCH_BUCKET, sourceBucket);
}
throw ex;
} finally {
if (!closed) {
if (sourceInputStream != null) {
sourceInputStream.close();
}
if (destOutputStream != null) {
destOutputStream.close();
}
}
}
}
/**
* Parse the bucket name and key from the copy-source header value.
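* <p>
* Illustrative inputs and results (example values):
* <pre>{@code
*   parseSourceHeader("bucket1/dir/key1") -> ("bucket1", "dir/key1")
*   parseSourceHeader("/bucket1/key1")    -> ("bucket1", "key1")
* }</pre>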
*/
@VisibleForTesting
public static Pair<String, String> parseSourceHeader(String copyHeader)
throws OS3Exception {
String header = copyHeader;
if (header.startsWith("/")) {
header = header.substring(1);
}
int pos = header.indexOf("/");
if (pos == -1) {
OS3Exception ex = S3ErrorTable.newError(S3ErrorTable
.INVALID_ARGUMENT, header);
ex.setErrorMessage("Copy Source must mention the source bucket and " +
"key: sourcebucket/sourcekey");
throw ex;
}
return Pair.of(header.substring(0, pos), header.substring(pos + 1));
}
}