/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.request.s3.multipart;

import java.io.IOException;
import java.nio.file.InvalidPathException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

import jakarta.annotation.Nullable;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.audit.OMAction;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OzoneConfigUtil;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.request.file.OMDirectoryCreateRequestWithFSO;
import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
import org.apache.hadoop.ozone.om.request.validation.RequestFeatureValidator;
import org.apache.hadoop.ozone.om.request.validation.RequestProcessingPhase;
import org.apache.hadoop.ozone.om.request.validation.ValidationCondition;
import org.apache.hadoop.ozone.om.request.validation.ValidationContext;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.om.response.s3.multipart.S3MultipartUploadCompleteResponse;
import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadCompleteRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadCompleteResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
import org.apache.hadoop.util.Time;
import org.apache.ratis.server.protocol.TermIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;

/**
* Handles the S3 multipart upload complete request.
*/
public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
private static final Logger LOG =
LoggerFactory.getLogger(S3MultipartUploadCompleteRequest.class);
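
/**
* Validates a client-supplied part against the part recorded in OM by
* comparing eTags; a match on the stored part name is also accepted as a
* fallback for parts committed without a persisted eTag.
*/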
private BiFunction<OzoneManagerProtocolProtos.Part, PartKeyInfo, MultipartCommitRequestPart> eTagBasedValidator =
(part, partKeyInfo) -> {
String eTag = part.getETag();
AtomicReference<String> dbPartETag = new AtomicReference<>();
String dbPartName = null;
if (partKeyInfo != null) {
partKeyInfo.getPartKeyInfo().getMetadataList()
.stream()
.filter(keyValue -> keyValue.getKey().equals(OzoneConsts.ETAG))
.findFirst().ifPresent(kv -> dbPartETag.set(kv.getValue()));
dbPartName = partKeyInfo.getPartName();
}
return new MultipartCommitRequestPart(eTag, partKeyInfo == null ? null :
dbPartETag.get(), StringUtils.equals(eTag, dbPartETag.get()) || StringUtils.equals(eTag, dbPartName));
};
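
/**
* Fallback validator for requests whose parts do not all carry eTags:
* compares the client-supplied part name with the part name recorded
* in OM.
*/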
private BiFunction<OzoneManagerProtocolProtos.Part, PartKeyInfo, MultipartCommitRequestPart> partNameBasedValidator =
(part, partKeyInfo) -> {
String partName = part.getPartName();
String dbPartName = null;
if (partKeyInfo != null) {
dbPartName = partKeyInfo.getPartName();
}
return new MultipartCommitRequestPart(partName, partKeyInfo == null ? null :
dbPartName, StringUtils.equals(partName, dbPartName));
};
public S3MultipartUploadCompleteRequest(OMRequest omRequest,
BucketLayout bucketLayout) {
super(omRequest, bucketLayout);
}
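
/**
* Pre-execute: normalizes and validates the key path, stamps the
* modification time, and resolves the bucket while checking WRITE ACLs,
* before the request is submitted for execution.
*/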
@Override
public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
MultipartUploadCompleteRequest multipartUploadCompleteRequest =
super.preExecute(ozoneManager).getCompleteMultiPartUploadRequest();
KeyArgs keyArgs = multipartUploadCompleteRequest.getKeyArgs();
String keyPath = keyArgs.getKeyName();
keyPath = validateAndNormalizeKey(ozoneManager.getEnableFileSystemPaths(),
keyPath, getBucketLayout());
KeyArgs newKeyArgs = keyArgs.toBuilder().setModificationTime(Time.now())
.setKeyName(keyPath).build();
KeyArgs resolvedArgs = resolveBucketAndCheckKeyAcls(newKeyArgs,
ozoneManager, ACLType.WRITE);
return getOmRequest().toBuilder().setCompleteMultiPartUploadRequest(
multipartUploadCompleteRequest.toBuilder().setKeyArgs(
resolvedArgs)).setUserInfo(getUserInfo()).build();
}
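
/**
* Applies the complete-MPU request to OM metadata under the bucket lock:
* verifies the upload and its parts, assembles the final key from the
* committed part locations, reconciles bucket usage and namespace quota,
* and stages the key table, open-key table, and multipart-info table
* cache updates.
*/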
@Override
@SuppressWarnings("methodlength")
public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIndex termIndex) {
final long trxnLogIndex = termIndex.getIndex();
MultipartUploadCompleteRequest multipartUploadCompleteRequest =
getOmRequest().getCompleteMultiPartUploadRequest();
KeyArgs keyArgs = multipartUploadCompleteRequest.getKeyArgs();
List<OzoneManagerProtocolProtos.Part> partsList =
multipartUploadCompleteRequest.getPartsListList();
Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
String volumeName = keyArgs.getVolumeName();
String bucketName = keyArgs.getBucketName();
final String requestedVolume = volumeName;
final String requestedBucket = bucketName;
String keyName = keyArgs.getKeyName();
String uploadID = keyArgs.getMultipartUploadID();
String multipartKey = null;
ozoneManager.getMetrics().incNumCompleteMultipartUploads();
OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
boolean acquiredLock = false;
OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
getOmRequest());
OMClientResponse omClientResponse = null;
Exception exception = null;
Result result = null;
try {
multipartKey = omMetadataManager.getMultipartKey(volumeName,
bucketName, keyName, uploadID);
mergeOmLockDetails(omMetadataManager.getLock()
.acquireWriteLock(BUCKET_LOCK, volumeName, bucketName));
acquiredLock = getOmLockDetails().isLockAcquired();
validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
OmBucketInfo omBucketInfo = getBucketInfo(omMetadataManager,
volumeName, bucketName);
List<OmDirectoryInfo> missingParentInfos;
OMFileRequest.OMPathInfoWithFSO pathInfoFSO = OMFileRequest
.verifyDirectoryKeysInPath(omMetadataManager, volumeName, bucketName,
keyName, Paths.get(keyName));
missingParentInfos = OMDirectoryCreateRequestWithFSO
.getAllMissingParentDirInfo(ozoneManager, keyArgs, omBucketInfo,
pathInfoFSO, trxnLogIndex);
if (missingParentInfos != null) {
final long volumeId = omMetadataManager.getVolumeId(volumeName);
final long bucketId = omMetadataManager.getBucketId(volumeName,
bucketName);
// add all missing parents to directory table
addMissingParentsToCache(omBucketInfo, missingParentInfos,
omMetadataManager, volumeId, bucketId, trxnLogIndex);
String multipartOpenKey = omMetadataManager
.getMultipartKey(volumeId, bucketId,
pathInfoFSO.getLastKnownParentId(),
pathInfoFSO.getLeafNodeName(),
keyArgs.getMultipartUploadID());
if (getOmKeyInfoFromOpenKeyTable(multipartOpenKey,
keyName, omMetadataManager) == null) {
final ReplicationConfig replicationConfig = OzoneConfigUtil
.resolveReplicationConfigPreference(keyArgs.getType(),
keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
omBucketInfo != null ?
omBucketInfo.getDefaultReplicationConfig() :
null, ozoneManager);
OmMultipartKeyInfo multipartKeyInfoFromArgs =
new OmMultipartKeyInfo.Builder()
.setUploadID(keyArgs.getMultipartUploadID())
.setCreationTime(keyArgs.getModificationTime())
.setReplicationConfig(replicationConfig)
.setObjectID(pathInfoFSO.getLeafNodeObjectId())
.setUpdateID(trxnLogIndex)
.setParentID(pathInfoFSO.getLastKnownParentId())
.build();
OmKeyInfo keyInfoFromArgs = new OmKeyInfo.Builder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.setCreationTime(keyArgs.getModificationTime())
.setModificationTime(keyArgs.getModificationTime())
.setReplicationConfig(replicationConfig)
.setOmKeyLocationInfos(Collections.singletonList(
new OmKeyLocationInfoGroup(0, new ArrayList<>(), true)))
.setAcls(getAclsForKey(keyArgs, omBucketInfo, pathInfoFSO,
ozoneManager.getPrefixManager()))
.setObjectID(pathInfoFSO.getLeafNodeObjectId())
.setUpdateID(trxnLogIndex)
.setFileEncryptionInfo(keyArgs.hasFileEncryptionInfo() ?
OMPBHelper.convert(keyArgs.getFileEncryptionInfo()) : null)
.setParentObjectID(pathInfoFSO.getLastKnownParentId())
.build();
// Add missing multi part info to open key table
addMultiPartToCache(omMetadataManager, multipartOpenKey,
pathInfoFSO, keyInfoFromArgs, trxnLogIndex);
}
}
String dbMultipartOpenKey =
getDBMultipartOpenKey(volumeName, bucketName, keyName, uploadID,
omMetadataManager);
OmMultipartKeyInfo multipartKeyInfo = omMetadataManager
.getMultipartInfoTable().get(multipartKey);
String ozoneKey = omMetadataManager.getOzoneKey(
volumeName, bucketName, keyName);
String dbOzoneKey =
getDBOzoneKey(omMetadataManager, volumeName, bucketName, keyName);
// Check whether a directory with the same name already exists (for the
// LEGACY FS layout); if it does, throw an error.
checkDirectoryAlreadyExists(ozoneManager, omBucketInfo, keyName,
omMetadataManager);
if (multipartKeyInfo == null) {
throw new OMException(
failureMessage(requestedVolume, requestedBucket, keyName),
OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
}
if (!partsList.isEmpty()) {
final OmMultipartKeyInfo.PartKeyInfoMap partKeyInfoMap
= multipartKeyInfo.getPartKeyInfoMap();
if (partKeyInfoMap.size() == 0) {
LOG.error("Complete MultipartUpload failed for key {} , MPU Key has" +
" no parts in OM, parts given to upload are {}", ozoneKey,
partsList);
throw new OMException(
failureMessage(requestedVolume, requestedBucket, keyName),
OMException.ResultCodes.INVALID_PART);
}
// First Check for Invalid Part Order.
List<Integer> partNumbers = new ArrayList<>();
int partsListSize = getPartsListSize(requestedVolume,
requestedBucket, keyName, ozoneKey, partNumbers, partsList);
List<OmKeyLocationInfo> partLocationInfos = new ArrayList<>();
long dataSize = getMultipartDataSize(requestedVolume, requestedBucket,
keyName, ozoneKey, partKeyInfoMap, partsListSize,
partLocationInfos, partsList, ozoneManager);
// All parts have the same replication information, so take it from the
// last part.
OmKeyInfo omKeyInfo =
getOmKeyInfo(ozoneManager, trxnLogIndex, keyArgs, volumeName,
bucketName, keyName, dbMultipartOpenKey, omMetadataManager,
dbOzoneKey, partKeyInfoMap, partLocationInfos, dataSize);
long usedBytesDiff = 0;
// Find all unused parts.
List<OmKeyInfo> allKeyInfoToRemove = new ArrayList<>();
for (PartKeyInfo partKeyInfo : partKeyInfoMap) {
if (!partNumbers.contains(partKeyInfo.getPartNumber())) {
OmKeyInfo delPartKeyInfo =
OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
allKeyInfoToRemove.add(delPartKeyInfo);
usedBytesDiff -= delPartKeyInfo.getReplicatedSize();
}
}
// If bucket versioning is turned on during the update, between key
// creation and key commit, old versions are simply overwritten rather
// than kept. Bucket versioning takes effect from the first key created
// after the knob is turned on.
OmKeyInfo keyToDelete =
omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey);
boolean isNamespaceUpdate = false;
if (keyToDelete != null && !omBucketInfo.getIsVersionEnabled()) {
RepeatedOmKeyInfo oldKeyVersionsToDelete = getOldVersionsToCleanUp(
keyToDelete, trxnLogIndex, ozoneManager.isRatisEnabled());
allKeyInfoToRemove.addAll(oldKeyVersionsToDelete.getOmKeyInfoList());
usedBytesDiff -= keyToDelete.getReplicatedSize();
} else {
checkBucketQuotaInNamespace(omBucketInfo, 1L);
omBucketInfo.incrUsedNamespace(1L);
isNamespaceUpdate = true;
}
String dbBucketKey = omMetadataManager.getBucketKey(
omBucketInfo.getVolumeName(), omBucketInfo.getBucketName());
if (usedBytesDiff != 0) {
omBucketInfo.incrUsedBytes(usedBytesDiff);
} else if (!isNamespaceUpdate) {
// If neither the bucket size nor the namespace count has changed,
// avoid updating the bucket object.
omBucketInfo = null;
}
updateCache(omMetadataManager, dbBucketKey, omBucketInfo, dbOzoneKey,
dbMultipartOpenKey, multipartKey, omKeyInfo, trxnLogIndex);
omResponse.setCompleteMultiPartUploadResponse(
MultipartUploadCompleteResponse.newBuilder()
.setVolume(requestedVolume)
.setBucket(requestedBucket)
.setKey(keyName)
.setHash(omKeyInfo.getMetadata().get(OzoneConsts.ETAG)));
long volumeId = omMetadataManager.getVolumeId(volumeName);
long bucketId = omMetadataManager.getBucketId(volumeName, bucketName);
omClientResponse =
getOmClientResponse(multipartKey, omResponse, dbMultipartOpenKey,
omKeyInfo, allKeyInfoToRemove, omBucketInfo,
volumeId, bucketId, missingParentInfos, multipartKeyInfo);
result = Result.SUCCESS;
} else {
throw new OMException(
failureMessage(requestedVolume, requestedBucket, keyName) +
" because of empty part list",
OMException.ResultCodes.INVALID_REQUEST);
}
} catch (IOException | InvalidPathException ex) {
result = Result.FAILURE;
exception = ex;
omClientResponse = getOmClientResponse(omResponse, exception);
} finally {
if (acquiredLock) {
mergeOmLockDetails(omMetadataManager.getLock()
.releaseWriteLock(BUCKET_LOCK, volumeName, bucketName));
}
if (omClientResponse != null) {
omClientResponse.setOmLockDetails(getOmLockDetails());
}
}
logResult(ozoneManager, multipartUploadCompleteRequest, partsList,
auditMap, volumeName, bucketName, keyName, exception, result);
return omClientResponse;
}
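
/**
* Builds the error response for a failed complete-MPU request.
*/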
protected S3MultipartUploadCompleteResponse getOmClientResponse(
OMResponse.Builder omResponse, Exception exception) {
return new S3MultipartUploadCompleteResponse(
createErrorOMResponse(omResponse, exception), getBucketLayout());
}
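
/**
* Builds the success response, carrying the completed key, the unused
* parts and old key versions to delete, and the updated bucket info.
*/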
@SuppressWarnings("parameternumber")
protected OMClientResponse getOmClientResponse(String multipartKey,
OMResponse.Builder omResponse, String dbMultipartOpenKey,
OmKeyInfo omKeyInfo, List<OmKeyInfo> allKeyInfoToRemove,
OmBucketInfo omBucketInfo,
long volumeId, long bucketId, List<OmDirectoryInfo> missingParentInfos,
OmMultipartKeyInfo multipartKeyInfo) {
return new S3MultipartUploadCompleteResponse(omResponse.build(),
multipartKey, dbMultipartOpenKey, omKeyInfo, allKeyInfoToRemove,
getBucketLayout(), omBucketInfo);
}
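
/**
* Throws NOT_A_FILE if a directory already exists at the key path, for
* bucket layouts that normalize paths.
*/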
protected void checkDirectoryAlreadyExists(OzoneManager ozoneManager,
OmBucketInfo omBucketInfo, String keyName,
OMMetadataManager omMetadataManager) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("BucketName: {}, BucketLayout: {}",
omBucketInfo.getBucketName(), omBucketInfo.getBucketLayout());
}
if (omBucketInfo.getBucketLayout()
.shouldNormalizePaths(ozoneManager.getEnableFileSystemPaths())) {
if (checkDirectoryAlreadyExists(omBucketInfo.getVolumeName(),
omBucketInfo.getBucketName(), keyName, omMetadataManager)) {
throw new OMException("Can not Complete MPU for file: " + keyName +
" as there is already directory in the given path",
NOT_A_FILE);
}
}
}
@SuppressWarnings("checkstyle:ParameterNumber")
protected void logResult(OzoneManager ozoneManager,
MultipartUploadCompleteRequest multipartUploadCompleteRequest,
List<OzoneManagerProtocolProtos.Part> partsList,
Map<String, String> auditMap, String volumeName,
String bucketName, String keyName, Exception exception,
Result result) {
auditMap.put(OzoneConsts.MULTIPART_LIST, partsList.toString()
.replaceAll("\\n", " "));
// audit log
auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
OMAction.COMPLETE_MULTIPART_UPLOAD, auditMap, exception,
getOmRequest().getUserInfo()));
switch (result) {
case SUCCESS:
LOG.debug("MultipartUpload Complete request is successful for Key: {} " +
"in Volume/Bucket {}/{}", keyName, volumeName, bucketName);
break;
case FAILURE:
ozoneManager.getMetrics().incNumCompleteMultipartUploadFails();
LOG.error("MultipartUpload Complete request failed for Key: {} " +
"in Volume/Bucket {}/{}", keyName, volumeName, bucketName,
exception);
break;
default:
LOG.error("Unrecognized Result for S3MultipartUploadCommitRequest: {}",
multipartUploadCompleteRequest);
}
}
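
/**
* Builds the final OmKeyInfo for the completed upload. For a new key this
* creates the first version from the part locations; for an existing key
* it replaces the locations, size, and ETag in place, since versioning is
* not yet supported.
*/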
@SuppressWarnings("checkstyle:ParameterNumber")
protected OmKeyInfo getOmKeyInfo(OzoneManager ozoneManager, long trxnLogIndex,
KeyArgs keyArgs, String volumeName, String bucketName, String keyName,
String multipartOpenKey, OMMetadataManager omMetadataManager,
String ozoneKey, OmMultipartKeyInfo.PartKeyInfoMap partKeyInfoMap,
List<OmKeyLocationInfo> partLocationInfos, long dataSize)
throws IOException {
OzoneManagerProtocolProtos.KeyInfo partKeyInfo =
partKeyInfoMap.lastEntry().getPartKeyInfo();
OmKeyInfo omKeyInfo = getOmKeyInfoFromKeyTable(ozoneKey, keyName,
omMetadataManager);
if (omKeyInfo == null) {
// This is a newly added key; it does not have any versions.
OmKeyLocationInfoGroup keyLocationInfoGroup = new
OmKeyLocationInfoGroup(0, partLocationInfos, true);
// Get the objectID of the key from OpenKeyTable
OmKeyInfo dbOpenKeyInfo = getOmKeyInfoFromOpenKeyTable(multipartOpenKey,
keyName, omMetadataManager);
// A newly created key; this is the first version.
OmKeyInfo.Builder builder =
new OmKeyInfo.Builder().setVolumeName(volumeName)
.setBucketName(bucketName).setKeyName(dbOpenKeyInfo.getKeyName())
.setReplicationConfig(ReplicationConfig.fromProto(
partKeyInfo.getType(), partKeyInfo.getFactor(),
partKeyInfo.getEcReplicationConfig()))
.setCreationTime(keyArgs.getModificationTime())
.setModificationTime(keyArgs.getModificationTime())
.setDataSize(dataSize)
.setFileEncryptionInfo(dbOpenKeyInfo.getFileEncryptionInfo())
.setOmKeyLocationInfos(
Collections.singletonList(keyLocationInfoGroup))
.setAcls(dbOpenKeyInfo.getAcls())
.addMetadata(OzoneConsts.ETAG,
multipartUploadedKeyHash(partKeyInfoMap));
// Check if the DB entry has an ObjectID. This check is required because
// an upgrade may have happened between the multipart key uploads and
// the complete call.
if (dbOpenKeyInfo.getObjectID() != 0) {
builder.setObjectID(dbOpenKeyInfo.getObjectID());
}
updatePrefixFSOInfo(dbOpenKeyInfo, builder);
omKeyInfo = builder.build();
} else {
OmKeyInfo dbOpenKeyInfo = getOmKeyInfoFromOpenKeyTable(multipartOpenKey,
keyName, omMetadataManager);
// A version already exists, so we should add this as a new version.
// But as versioning is not yet supported, we follow the commit-key
// approach instead. When versioning support arrives, we can uncomment
// the code below: keyInfo.addNewVersion(locations);
// As versioning is not supported right now, we can set encryption info
// at the KeyInfo level; once we start supporting versioning, encryption
// info needs to be set at the KeyLocation level, as each version will
// have its own file encryption info.
if (dbOpenKeyInfo.getFileEncryptionInfo() != null) {
omKeyInfo.setFileEncryptionInfo(dbOpenKeyInfo.getFileEncryptionInfo());
}
omKeyInfo.updateLocationInfoList(partLocationInfos, true, true);
omKeyInfo.setModificationTime(keyArgs.getModificationTime());
omKeyInfo.setDataSize(dataSize);
omKeyInfo.setReplicationConfig(dbOpenKeyInfo.getReplicationConfig());
omKeyInfo.getMetadata().put(OzoneConsts.ETAG,
multipartUploadedKeyHash(partKeyInfoMap));
}
omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
return omKeyInfo;
}
protected void updatePrefixFSOInfo(OmKeyInfo dbOpenKeyInfo,
OmKeyInfo.Builder builder) {
// FSO is disabled. Do nothing.
}
protected String getDBOzoneKey(OMMetadataManager omMetadataManager,
String volumeName, String bucketName, String keyName) throws IOException {
return omMetadataManager.getOzoneKey(volumeName, bucketName, keyName);
}
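
// NOTE: getDBMultipartOpenKey is invoked from validateAndUpdateCache above,
// but its body is missing from this snippet. The sketch below is
// reconstructed from the call site, assuming it simply delegates to
// OMMetadataManager#getMultipartKey for flat (non-FSO) layouts; FSO
// subclasses would override it to build the FSO-style open key.
protected String getDBMultipartOpenKey(String volumeName, String bucketName,
String keyName, String uploadID, OMMetadataManager omMetadataManager)
throws IOException {
// Assumed: the same flat-layout DB key as the multipartKey computed earlier.
return omMetadataManager.getMultipartKey(volumeName, bucketName, keyName,
uploadID);
}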
protected void addMissingParentsToCache(OmBucketInfo omBucketInfo,
List<OmDirectoryInfo> missingParentInfos,
OMMetadataManager omMetadataManager,
long volumeId, long bucketId, long transactionLogIndex
) throws IOException {
// FSO is disabled. Do nothing.
}
protected void addMultiPartToCache(
OMMetadataManager omMetadataManager, String multipartOpenKey,
OMFileRequest.OMPathInfoWithFSO pathInfoFSO, OmKeyInfo omKeyInfo,
long transactionLogIndex
) throws IOException {
// FSO is disabled. Do nothing.
}
protected OmKeyInfo getOmKeyInfoFromKeyTable(String dbOzoneKey,
String keyName, OMMetadataManager omMetadataManager) throws IOException {
return omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey);
}
protected OmKeyInfo getOmKeyInfoFromOpenKeyTable(String dbMultipartKey,
String keyName, OMMetadataManager omMetadataManager) throws IOException {
return omMetadataManager.getOpenKeyTable(getBucketLayout())
.get(dbMultipartKey);
}
protected void addKeyTableCacheEntry(OMMetadataManager omMetadataManager,
String dbOzoneKey, OmKeyInfo omKeyInfo, long transactionLogIndex)
throws IOException {
// Add the key entry to the key table (the file table for FSO buckets).
omMetadataManager.getKeyTable(getBucketLayout())
.addCacheEntry(new CacheKey<>(dbOzoneKey),
CacheValue.get(transactionLogIndex, omKeyInfo));
}
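
/**
* Verifies that the client-supplied parts are in strictly increasing
* part-number order, collecting the part numbers along the way, and
* returns the number of parts; throws INVALID_PART_ORDER otherwise.
*/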
private int getPartsListSize(String requestedVolume,
String requestedBucket, String keyName, String ozoneKey,
List<Integer> partNumbers,
List<OzoneManagerProtocolProtos.Part> partsList) throws OMException {
int prevPartNumber = partsList.get(0).getPartNumber();
int partsListSize = partsList.size();
partNumbers.add(prevPartNumber);
for (int i = 1; i < partsListSize; i++) {
int currentPartNumber = partsList.get(i).getPartNumber();
if (prevPartNumber >= currentPartNumber) {
LOG.error("PartNumber at index {} is {}, and its previous " +
"partNumber at index {} is {} for ozonekey is " +
"{}", i, currentPartNumber, i - 1, prevPartNumber,
ozoneKey);
throw new OMException(
failureMessage(requestedVolume, requestedBucket, keyName) +
" because parts are in Invalid order.",
OMException.ResultCodes.INVALID_PART_ORDER);
}
prevPartNumber = currentPartNumber;
partNumbers.add(prevPartNumber);
}
return partsListSize;
}
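
/**
* Validates each client part against the part recorded in OM (by eTag
* when every part carries one, otherwise by part name), enforces the
* minimum part size for all but the last part, collects the part block
* locations, and returns the total data size of the assembled key.
*/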
@SuppressWarnings("checkstyle:ParameterNumber")
private long getMultipartDataSize(String requestedVolume,
String requestedBucket, String keyName, String ozoneKey,
OmMultipartKeyInfo.PartKeyInfoMap partKeyInfoMap,
int partsListSize, List<OmKeyLocationInfo> partLocationInfos,
List<OzoneManagerProtocolProtos.Part> partsList,
OzoneManager ozoneManager) throws OMException {
long dataSize = 0;
int currentPartCount = 0;
boolean eTagBasedValidationAvailable = partsList.stream().allMatch(OzoneManagerProtocolProtos.Part::hasETag);
// Now do the actual validation, checking for any invalid part along the way.
for (OzoneManagerProtocolProtos.Part part : partsList) {
currentPartCount++;
int partNumber = part.getPartNumber();
PartKeyInfo partKeyInfo = partKeyInfoMap.get(partNumber);
MultipartCommitRequestPart requestPart = eTagBasedValidationAvailable ?
eTagBasedValidator.apply(part, partKeyInfo) : partNameBasedValidator.apply(part, partKeyInfo);
if (!requestPart.isValid()) {
throw new OMException(
failureMessage(requestedVolume, requestedBucket, keyName) +
". Provided Part info is { " + requestPart.getRequestPartId() + ", " + partNumber +
"}, whereas OM has eTag " + requestPart.getOmPartId(),
OMException.ResultCodes.INVALID_PART);
}
OmKeyInfo currentPartKeyInfo = null;
try {
currentPartKeyInfo =
OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
} catch (IOException ioe) {
throw new OMException(ioe, OMException.ResultCodes.INTERNAL_ERROR);
}
// All parts except the last must meet the minimum part size.
if (currentPartCount != partsListSize) {
if (currentPartKeyInfo.getDataSize() <
ozoneManager.getMinMultipartUploadPartSize()) {
LOG.error("MultipartUpload: {} Part number: {} size {} is less" +
" than minimum part size {}", ozoneKey,
partKeyInfo.getPartNumber(), currentPartKeyInfo.getDataSize(),
ozoneManager.getMinMultipartUploadPartSize());
throw new OMException(
failureMessage(requestedVolume, requestedBucket, keyName) +
". Entity too small.",
OMException.ResultCodes.ENTITY_TOO_SMALL);
}
}
// All part keys have only one version.
OmKeyLocationInfoGroup currentKeyInfoGroup = currentPartKeyInfo
.getKeyLocationVersions().get(0);
// Set partNumber in each block.
currentKeyInfoGroup.getLocationList().forEach(
omKeyLocationInfo -> omKeyLocationInfo.setPartNumber(partNumber));
partLocationInfos.addAll(currentKeyInfoGroup.getLocationList());
dataSize += currentPartKeyInfo.getDataSize();
}
return dataSize;
}
private static String failureMessage(String volume, String bucket,
String keyName) {
return "Complete Multipart Upload Failed: volume: " +
volume + " bucket: " + bucket + " key: " + keyName;
}
@SuppressWarnings("parameternumber")
private void updateCache(OMMetadataManager omMetadataManager,
String dbBucketKey, @Nullable OmBucketInfo omBucketInfo,
String dbOzoneKey, String dbMultipartOpenKey, String dbMultipartKey,
OmKeyInfo omKeyInfo, long transactionLogIndex) throws IOException {
// Update cache.
// 1. Add key entry to key table.
// 2. Delete multipartKey entry from openKeyTable and multipartInfo table.
// 3. If the bucket size has changed (omBucketInfo is not null),
// update bucket cache
addKeyTableCacheEntry(omMetadataManager, dbOzoneKey, omKeyInfo,
transactionLogIndex);
omMetadataManager.getOpenKeyTable(getBucketLayout())
.addCacheEntry(
new CacheKey<>(dbMultipartOpenKey),
CacheValue.get(transactionLogIndex));
omMetadataManager.getMultipartInfoTable().addCacheEntry(
new CacheKey<>(dbMultipartKey),
CacheValue.get(transactionLogIndex));
// Here, omBucketInfo can be null if neither its size nor its namespace
// count has changed, in which case there is no need to update it. We
// never delete the bucket info here; we just avoid an unnecessary update.
if (omBucketInfo != null) {
omMetadataManager.getBucketTable().addCacheEntry(
new CacheKey<>(dbBucketKey),
CacheValue.get(transactionLogIndex, omBucketInfo));
}
}
@RequestFeatureValidator(
conditions = ValidationCondition.CLUSTER_NEEDS_FINALIZATION,
processingPhase = RequestProcessingPhase.PRE_PROCESS,
requestType = Type.CompleteMultiPartUpload
)
public static OMRequest
disallowCompleteMultiPartUploadWithECReplicationConfig(
OMRequest req, ValidationContext ctx) throws OMException {
if (!ctx.versionManager().isAllowed(
OMLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT)) {
if (req.getCompleteMultiPartUploadRequest().getKeyArgs()
.hasEcReplicationConfig()) {
throw new OMException("Cluster does not have the Erasure Coded"
+ " Storage support feature finalized yet, but the request contains"
+ " an Erasure Coded replication type. Rejecting the request,"
+ " please finalize the cluster upgrade and then try again.",
OMException.ResultCodes.NOT_SUPPORTED_OPERATION_PRIOR_FINALIZATION);
}
}
return req;
}
/**
* Validates S3 MPU complete requests.
* We do not want to allow older clients to complete MPU keys in buckets
* which use non-LEGACY layouts.
*
* @param req - the request to validate
* @param ctx - the validation context
* @return the validated request
* @throws OMException if the request is invalid
*/
@RequestFeatureValidator(
conditions = ValidationCondition.OLDER_CLIENT_REQUESTS,
processingPhase = RequestProcessingPhase.PRE_PROCESS,
requestType = Type.CompleteMultiPartUpload
)
public static OMRequest blockMPUCompleteWithBucketLayoutFromOldClient(
OMRequest req, ValidationContext ctx) throws IOException {
if (req.getCompleteMultiPartUploadRequest().hasKeyArgs()) {
KeyArgs keyArgs = req.getCompleteMultiPartUploadRequest().getKeyArgs();
if (keyArgs.hasVolumeName() && keyArgs.hasBucketName()) {
BucketLayout bucketLayout = ctx.getBucketLayout(
keyArgs.getVolumeName(), keyArgs.getBucketName());
bucketLayout.validateSupportedOperation();
}
}
return req;
}
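
/**
* Computes the S3-style multipart ETag: the MD5 hex digest of the
* concatenated per-part ETags (falling back to the part name when a part
* has no ETag), suffixed with "-" and the part count.
*/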
private String multipartUploadedKeyHash(
OmMultipartKeyInfo.PartKeyInfoMap partsList) {
StringBuilder keysConcatenated = new StringBuilder();
for (PartKeyInfo partKeyInfo : partsList) {
String partPropertyToComputeHash = KeyValueUtil.getFromProtobuf(partKeyInfo.getPartKeyInfo().getMetadataList())
.get(OzoneConsts.ETAG);
if (partPropertyToComputeHash == null) {
partPropertyToComputeHash = partKeyInfo.getPartName();
}
keysConcatenated.append(partPropertyToComputeHash);
}
return DigestUtils.md5Hex(keysConcatenated.toString()) + "-"
+ partsList.size();
}
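
/**
* Pairs the client-supplied part identifier with the identifier recorded
* in OM, along with whether the two match.
*/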
private static class MultipartCommitRequestPart {
private String requestPartId;
private String omPartId;
private boolean isValid;
MultipartCommitRequestPart(String requestPartId, String omPartId, boolean isValid) {
this.requestPartId = requestPartId;
this.omPartId = omPartId;
this.isValid = isValid;
}
public String getRequestPartId() {
return requestPartId;
}
public String getOmPartId() {
return omPartId;
}
public boolean isValid() {
return isValid;
}
}
}