blob: d9004c0c13a09aa5011562f94b65364d1847719e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.request.s3.multipart;
import com.google.common.base.Optional;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.audit.OMAction;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMReplayException;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.om.response.s3.multipart
.S3MultipartUploadCommitPartResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartCommitUploadPartRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartCommitUploadPartResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
/**
* Handle Multipart upload commit upload part file.
*/
/**
 * Handles MultipartCommitUploadPart request: commits one previously uploaded
 * part of an S3 multipart upload by moving its key info from the open key
 * table into the multipart info table.
 *
 * <p>On success the part is recorded (replacing any prior part with the same
 * part number, which is scheduled for deletion by the response), the open key
 * entry is removed, and the generated part name is returned to the client.
 */
public class S3MultipartUploadCommitPartRequest extends OMKeyRequest {

  private static final Logger LOG =
      LoggerFactory.getLogger(S3MultipartUploadCommitPartRequest.class);

  public S3MultipartUploadCommitPartRequest(OMRequest omRequest) {
    super(omRequest);
  }

  /**
   * Stamps the server-side modification time into the key args so every
   * replica applies the same timestamp when the transaction is applied.
   */
  @Override
  public OMRequest preExecute(OzoneManager ozoneManager) {
    MultipartCommitUploadPartRequest multipartCommitUploadPartRequest =
        getOmRequest().getCommitMultiPartUploadRequest();

    return getOmRequest().toBuilder().setCommitMultiPartUploadRequest(
        multipartCommitUploadPartRequest.toBuilder()
            .setKeyArgs(multipartCommitUploadPartRequest.getKeyArgs()
                .toBuilder().setModificationTime(Time.now())))
        .setUserInfo(getUserInfo()).build();
  }

  /**
   * Validates the request against OM metadata under the bucket write lock and
   * updates the table caches: adds the committed part to the multipart info
   * table and removes the corresponding open key entry.
   *
   * @param trxnLogIndex Ratis transaction log index used as the updateID and
   *     cache epoch for this change.
   * @return the client response (success, replay, or error) that the double
   *     buffer will later flush to DB.
   */
  @Override
  @SuppressWarnings("methodlength")
  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
      long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
    MultipartCommitUploadPartRequest multipartCommitUploadPartRequest =
        getOmRequest().getCommitMultiPartUploadRequest();

    KeyArgs keyArgs = multipartCommitUploadPartRequest.getKeyArgs();

    String volumeName = keyArgs.getVolumeName();
    String bucketName = keyArgs.getBucketName();
    String keyName = keyArgs.getKeyName();

    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
    ozoneManager.getMetrics().incNumCommitMultipartUploadParts();

    boolean acquiredLock = false;
    IOException exception = null;
    String partName = null;
    OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
        getOmRequest());
    OMClientResponse omClientResponse = null;
    OzoneManagerProtocolProtos.PartKeyInfo oldPartKeyInfo = null;
    String openKey = null;
    OmKeyInfo omKeyInfo = null;
    String multipartKey = null;
    OmMultipartKeyInfo multipartKeyInfo = null;
    Result result = null;

    try {
      // TODO to support S3 ACL later.
      acquiredLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
          volumeName, bucketName);

      validateBucketAndVolume(omMetadataManager, volumeName, bucketName);

      String uploadID = keyArgs.getMultipartUploadID();
      multipartKey = omMetadataManager.getMultipartKey(volumeName,
          bucketName, keyName, uploadID);

      multipartKeyInfo = omMetadataManager.getMultipartInfoTable()
          .get(multipartKey);

      long clientID = multipartCommitUploadPartRequest.getClientID();

      openKey = omMetadataManager.getOpenKey(volumeName, bucketName, keyName,
          clientID);

      omKeyInfo = omMetadataManager.getOpenKeyTable().get(openKey);

      if (omKeyInfo == null) {
        // Check the KeyTable if this transaction is a replay of ratis logs.
        String ozoneKey = omMetadataManager.getOzoneKey(volumeName,
            bucketName, keyName);
        OmKeyInfo dbKeyInfo = omMetadataManager.getKeyTable().get(ozoneKey);
        if (dbKeyInfo != null) {
          if (isReplay(ozoneManager, dbKeyInfo, trxnLogIndex)) {
            throw new OMReplayException();
          }
        }
        throw new OMException("Failed to commit Multipart Upload key, as " +
            openKey + " entry is not found in the openKey table",
            KEY_NOT_FOUND);
      }

      // set the data size and location info list
      omKeyInfo.setDataSize(keyArgs.getDataSize());
      omKeyInfo.updateLocationInfoList(keyArgs.getKeyLocationsList().stream()
          .map(OmKeyLocationInfo::getFromProtobuf)
          .collect(Collectors.toList()));
      // Set Modification time
      omKeyInfo.setModificationTime(keyArgs.getModificationTime());
      // Set the UpdateID to current transactionLogIndex
      omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());

      // The part name is the full ozone key suffixed with the client ID,
      // which makes it unique per upload attempt.
      partName = omMetadataManager.getOzoneKey(volumeName, bucketName,
          keyName) + clientID;

      if (multipartKeyInfo == null) {
        // This can occur when user started uploading part by the time commit
        // of that part happens, in between the user might have requested
        // abort multipart upload. If we just throw exception, then the data
        // will not be garbage collected, so move this part to delete table
        // and throw error
        // Move this part to delete table.
        throw new OMException("No such Multipart upload is with specified " +
            "uploadId " + uploadID,
            OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
      }

      int partNumber = keyArgs.getMultipartNumber();
      oldPartKeyInfo = multipartKeyInfo.getPartKeyInfo(partNumber);

      // Build this multipart upload part info.
      OzoneManagerProtocolProtos.PartKeyInfo.Builder partKeyInfo =
          OzoneManagerProtocolProtos.PartKeyInfo.newBuilder();
      partKeyInfo.setPartName(partName);
      partKeyInfo.setPartNumber(partNumber);
      partKeyInfo.setPartKeyInfo(omKeyInfo.getProtobuf());

      // Add this part information in to multipartKeyInfo.
      multipartKeyInfo.addPartKeyInfo(partNumber, partKeyInfo.build());

      // Set the UpdateID to current transactionLogIndex
      multipartKeyInfo.setUpdateID(trxnLogIndex,
          ozoneManager.isRatisEnabled());

      // OldPartKeyInfo will be deleted. Its updateID will be set in
      // S3MultipartUploadCommitPartResponse before being added to
      // DeletedKeyTable.

      // Add to cache.

      // Delete from open key table and add it to multipart info table.
      // No need to add cache entries to delete table, as no
      // read/write requests that info for validation.
      omMetadataManager.getMultipartInfoTable().addCacheEntry(
          new CacheKey<>(multipartKey),
          new CacheValue<>(Optional.of(multipartKeyInfo),
              trxnLogIndex));

      omMetadataManager.getOpenKeyTable().addCacheEntry(
          new CacheKey<>(openKey),
          new CacheValue<>(Optional.absent(), trxnLogIndex));

      omResponse.setCommitMultiPartUploadResponse(
          MultipartCommitUploadPartResponse.newBuilder()
              .setPartName(partName));
      omClientResponse = new S3MultipartUploadCommitPartResponse(
          omResponse.build(), multipartKey, openKey,
          multipartKeyInfo, oldPartKeyInfo, ozoneManager.isRatisEnabled());

      result = Result.SUCCESS;
    } catch (IOException ex) {
      if (ex instanceof OMReplayException) {
        result = Result.REPLAY;
        omClientResponse = new S3MultipartUploadCommitPartResponse(
            createReplayOMResponse(omResponse));
      } else {
        result = Result.FAILURE;
        exception = ex;
        omClientResponse = new S3MultipartUploadCommitPartResponse(
            createErrorOMResponse(omResponse, exception), openKey, omKeyInfo,
            ozoneManager.isRatisEnabled());
      }
    } finally {
      // Always hand the response to the double buffer (even on failure) and
      // release the bucket lock before audit logging.
      addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
          omDoubleBufferHelper);
      if (acquiredLock) {
        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
            bucketName);
      }
    }

    // audit log
    auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
        OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY,
        buildAuditMap(keyArgs, partName), exception,
        getOmRequest().getUserInfo()));

    switch (result) {
    case REPLAY:
      LOG.debug("Replayed Transaction {} ignored. Request: {}",
          trxnLogIndex, multipartCommitUploadPartRequest);
      break;
    case SUCCESS:
      LOG.debug("MultipartUpload Commit is successful for Key:{} in " +
          "Volume/Bucket {}/{}", keyName, volumeName, bucketName);
      break;
    case FAILURE:
      ozoneManager.getMetrics().incNumCommitMultipartUploadPartFails();
      LOG.error("MultipartUpload Commit is failed for Key:{} in " +
          "Volume/Bucket {}/{}", keyName, volumeName, bucketName, exception);
      break;
    default:
      LOG.error("Unrecognized Result for S3MultipartUploadCommitPartRequest: " +
          "{}", multipartCommitUploadPartRequest);
    }

    return omClientResponse;
  }

  /**
   * Builds the audit map for this request: the standard key-args entries plus
   * the multipart part number and the generated part name.
   */
  private Map<String, String> buildAuditMap(KeyArgs keyArgs, String partName) {
    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
    // Add MPU related information.
    auditMap.put(OzoneConsts.MULTIPART_UPLOAD_PART_NUMBER,
        String.valueOf(keyArgs.getMultipartNumber()));
    auditMap.put(OzoneConsts.MULTIPART_UPLOAD_PART_NAME, partName);
    return auditMap;
  }
}