| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.hadoop.ozone.om; |
| |
| import java.io.IOException; |
| import java.nio.file.Path; |
| import java.nio.file.Paths; |
| import java.security.GeneralSecurityException; |
| import java.security.PrivilegedExceptionAction; |
| import java.time.Instant; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.TreeMap; |
| import java.util.UUID; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Collectors; |
| |
| import org.apache.hadoop.conf.StorageUnit; |
| import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; |
| import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; |
| import org.apache.hadoop.fs.FileEncryptionInfo; |
| import org.apache.hadoop.hdds.client.BlockID; |
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; |
| import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; |
| import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; |
| import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; |
| import org.apache.hadoop.hdds.scm.exceptions.SCMException; |
| import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; |
| import org.apache.hadoop.hdds.utils.BackgroundService; |
| import org.apache.hadoop.hdds.utils.UniqueId; |
| import org.apache.hadoop.hdds.utils.db.BatchOperation; |
| import org.apache.hadoop.hdds.utils.db.CodecRegistry; |
| import org.apache.hadoop.hdds.utils.db.DBStore; |
| import org.apache.hadoop.hdds.utils.db.RDBStore; |
| import org.apache.hadoop.hdds.utils.db.Table; |
| import org.apache.hadoop.hdds.utils.db.TableIterator; |
| import org.apache.hadoop.ipc.Server; |
| import org.apache.hadoop.ozone.OmUtils; |
| import org.apache.hadoop.ozone.OzoneAcl; |
| import org.apache.hadoop.ozone.OzoneConsts; |
| import org.apache.hadoop.ozone.common.BlockGroup; |
| import org.apache.hadoop.ozone.om.exceptions.OMException; |
| import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; |
| import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo; |
| import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; |
| import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; |
| import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; |
| import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; |
| import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; |
| import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; |
| import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; |
| import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; |
| import org.apache.hadoop.ozone.om.helpers.OmMultipartUpload; |
| import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; |
| import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteList; |
| import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList; |
| import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts; |
| import org.apache.hadoop.ozone.om.helpers.OmPartInfo; |
| import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo; |
| import org.apache.hadoop.ozone.om.helpers.OpenKeySession; |
| import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil; |
| import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils; |
| import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus; |
| import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; |
| import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo; |
| import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager; |
| import org.apache.hadoop.ozone.security.acl.OzoneObj; |
| import org.apache.hadoop.ozone.security.acl.RequestContext; |
| import org.apache.hadoop.security.SecurityUtil; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.util.Time; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Strings; |
| import org.apache.commons.codec.digest.DigestUtils; |
| import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH; |
| import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED; |
| import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; |
| import static org.apache.hadoop.ozone.OzoneConsts.OM_MULTIPART_MIN_SIZE; |
| import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER; |
| import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND; |
| import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DIRECTORY_NOT_FOUND; |
| import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND; |
| import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INTERNAL_ERROR; |
| import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_KMS_PROVIDER; |
| import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND; |
| import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND; |
| import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK; |
| import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.KEY; |
| import static org.apache.hadoop.util.Time.monotonicNow; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Implementation of keyManager. |
| */ |
| public class KeyManagerImpl implements KeyManager { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(KeyManagerImpl.class); |
| |
| /** |
| * A SCM block client, used to talk to SCM to allocate block during putKey. |
| */ |
| private final OzoneManager ozoneManager; |
| private final ScmClient scmClient; |
| private final OMMetadataManager metadataManager; |
| private final long scmBlockSize; |
| private final boolean useRatis; |
| |
| private final int preallocateBlocksMax; |
| private final String omId; |
| private final OzoneBlockTokenSecretManager secretManager; |
| private final boolean grpcBlockTokenEnabled; |
| |
| private BackgroundService keyDeletingService; |
| |
| private final KeyProviderCryptoExtension kmsProvider; |
| private final PrefixManager prefixManager; |
| |
| |
| @VisibleForTesting |
| public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient, |
| OMMetadataManager metadataManager, OzoneConfiguration conf, String omId, |
| OzoneBlockTokenSecretManager secretManager) { |
| this(null, new ScmClient(scmBlockClient, null), metadataManager, |
| conf, omId, secretManager, null, null); |
| } |
| |
| public KeyManagerImpl(OzoneManager om, ScmClient scmClient, |
| OzoneConfiguration conf, String omId) { |
| this (om, scmClient, om.getMetadataManager(), conf, omId, |
| om.getBlockTokenMgr(), om.getKmsProvider(), om.getPrefixManager()); |
| } |
| |
| @SuppressWarnings("parameternumber") |
| private KeyManagerImpl(OzoneManager om, ScmClient scmClient, |
| OMMetadataManager metadataManager, OzoneConfiguration conf, String omId, |
| OzoneBlockTokenSecretManager secretManager, |
| KeyProviderCryptoExtension kmsProvider, PrefixManager prefixManager) { |
| this.scmBlockSize = (long) conf |
| .getStorageSize(OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT, |
| StorageUnit.BYTES); |
| this.useRatis = conf.getBoolean(DFS_CONTAINER_RATIS_ENABLED_KEY, |
| DFS_CONTAINER_RATIS_ENABLED_DEFAULT); |
| this.preallocateBlocksMax = conf.getInt( |
| OZONE_KEY_PREALLOCATION_BLOCKS_MAX, |
| OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT); |
| this.grpcBlockTokenEnabled = conf.getBoolean( |
| HDDS_BLOCK_TOKEN_ENABLED, |
| HDDS_BLOCK_TOKEN_ENABLED_DEFAULT); |
| |
| this.ozoneManager = om; |
| this.omId = omId; |
| this.scmClient = scmClient; |
| this.metadataManager = metadataManager; |
| this.prefixManager = prefixManager; |
| this.secretManager = secretManager; |
| this.kmsProvider = kmsProvider; |
| |
| } |
| |
| @Override |
| public void start(OzoneConfiguration configuration) { |
| if (keyDeletingService == null) { |
| long blockDeleteInterval = configuration.getTimeDuration( |
| OZONE_BLOCK_DELETING_SERVICE_INTERVAL, |
| OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT, |
| TimeUnit.MILLISECONDS); |
| long serviceTimeout = configuration.getTimeDuration( |
| OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, |
| OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, |
| TimeUnit.MILLISECONDS); |
| keyDeletingService = new KeyDeletingService(ozoneManager, |
| scmClient.getBlockClient(), this, blockDeleteInterval, |
| serviceTimeout, configuration); |
| keyDeletingService.start(); |
| } |
| } |
| |
| KeyProviderCryptoExtension getKMSProvider() { |
| return kmsProvider; |
| } |
| |
| @Override |
| public void stop() throws IOException { |
| if (keyDeletingService != null) { |
| keyDeletingService.shutdown(); |
| keyDeletingService = null; |
| } |
| } |
| |
| private OmBucketInfo getBucketInfo(String volumeName, String bucketName) |
| throws IOException { |
| String bucketKey = metadataManager.getBucketKey(volumeName, bucketName); |
| return metadataManager.getBucketTable().get(bucketKey); |
| } |
| |
| private void validateBucket(String volumeName, String bucketName) |
| throws IOException { |
| String bucketKey = metadataManager.getBucketKey(volumeName, bucketName); |
| // Check if bucket exists |
| if (metadataManager.getBucketTable().get(bucketKey) == null) { |
| String volumeKey = metadataManager.getVolumeKey(volumeName); |
| // If the volume also does not exist, we should throw volume not found |
| // exception |
| if (metadataManager.getVolumeTable().get(volumeKey) == null) { |
| LOG.error("volume not found: {}", volumeName); |
| throw new OMException("Volume not found", |
| VOLUME_NOT_FOUND); |
| } |
| |
| // if the volume exists but bucket does not exist, throw bucket not found |
| // exception |
| LOG.error("bucket not found: {}/{} ", volumeName, bucketName); |
| throw new OMException("Bucket not found", |
| BUCKET_NOT_FOUND); |
| } |
| } |
| |
| /** |
| * Check S3 bucket exists or not. |
| * @param volumeName |
| * @param bucketName |
| * @throws IOException |
| */ |
| private OmBucketInfo validateS3Bucket(String volumeName, String bucketName) |
| throws IOException { |
| |
| String bucketKey = metadataManager.getBucketKey(volumeName, bucketName); |
| OmBucketInfo omBucketInfo = metadataManager.getBucketTable(). |
| get(bucketKey); |
| //Check if bucket already exists |
| if (omBucketInfo == null) { |
| LOG.error("bucket not found: {}/{} ", volumeName, bucketName); |
| throw new OMException("Bucket not found", |
| BUCKET_NOT_FOUND); |
| } |
| return omBucketInfo; |
| } |
| |
| @Override |
| public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID, |
| ExcludeList excludeList) throws IOException { |
| Preconditions.checkNotNull(args); |
| |
| |
| String volumeName = args.getVolumeName(); |
| String bucketName = args.getBucketName(); |
| String keyName = args.getKeyName(); |
| validateBucket(volumeName, bucketName); |
| String openKey = metadataManager.getOpenKey( |
| volumeName, bucketName, keyName, clientID); |
| |
| OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get(openKey); |
| if (keyInfo == null) { |
| LOG.error("Allocate block for a key not in open status in meta store" + |
| " /{}/{}/{} with ID {}", volumeName, bucketName, keyName, clientID); |
| throw new OMException("Open Key not found", |
| KEY_NOT_FOUND); |
| } |
| |
| // current version not committed, so new blocks coming now are added to |
| // the same version |
| List<OmKeyLocationInfo> locationInfos = |
| allocateBlock(keyInfo, excludeList, scmBlockSize); |
| |
| keyInfo.appendNewBlocks(locationInfos, true); |
| keyInfo.updateModifcationTime(); |
| metadataManager.getOpenKeyTable().put(openKey, keyInfo); |
| |
| return locationInfos.get(0); |
| |
| } |
| |
| /** |
| * This methods avoids multiple rpc calls to SCM by allocating multiple blocks |
| * in one rpc call. |
| * @param keyInfo - key info for key to be allocated. |
| * @param requestedSize requested length for allocation. |
| * @param excludeList exclude list while allocating blocks. |
| * @param requestedSize requested size to be allocated. |
| * @return |
| * @throws IOException |
| */ |
| private List<OmKeyLocationInfo> allocateBlock(OmKeyInfo keyInfo, |
| ExcludeList excludeList, long requestedSize) throws IOException { |
| int numBlocks = Math.min((int) ((requestedSize - 1) / scmBlockSize + 1), |
| preallocateBlocksMax); |
| List<OmKeyLocationInfo> locationInfos = new ArrayList<>(numBlocks); |
| String remoteUser = getRemoteUser().getShortUserName(); |
| List<AllocatedBlock> allocatedBlocks; |
| try { |
| allocatedBlocks = scmClient.getBlockClient() |
| .allocateBlock(scmBlockSize, numBlocks, keyInfo.getType(), |
| keyInfo.getFactor(), omId, excludeList); |
| } catch (SCMException ex) { |
| if (ex.getResult() |
| .equals(SCMException.ResultCodes.SAFE_MODE_EXCEPTION)) { |
| throw new OMException(ex.getMessage(), ResultCodes.SCM_IN_SAFE_MODE); |
| } |
| throw ex; |
| } |
| for (AllocatedBlock allocatedBlock : allocatedBlocks) { |
| OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder() |
| .setBlockID(new BlockID(allocatedBlock.getBlockID())) |
| .setLength(scmBlockSize) |
| .setOffset(0) |
| .setPipeline(allocatedBlock.getPipeline()); |
| if (grpcBlockTokenEnabled) { |
| builder.setToken(secretManager |
| .generateToken(remoteUser, allocatedBlock.getBlockID().toString(), |
| getAclForUser(remoteUser), scmBlockSize)); |
| } |
| locationInfos.add(builder.build()); |
| } |
| return locationInfos; |
| } |
| |
| /* Optimize ugi lookup for RPC operations to avoid a trip through |
| * UGI.getCurrentUser which is synch'ed. |
| */ |
| public static UserGroupInformation getRemoteUser() throws IOException { |
| UserGroupInformation ugi = Server.getRemoteUser(); |
| return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser(); |
| } |
| |
| /** |
| * Return acl for user. |
| * @param user |
| * |
| * */ |
| private EnumSet<AccessModeProto> getAclForUser(String user) { |
| // TODO: Return correct acl for user. |
| return EnumSet.allOf(AccessModeProto.class); |
| } |
| |
| private EncryptedKeyVersion generateEDEK( |
| final String ezKeyName) throws IOException { |
| if (ezKeyName == null) { |
| return null; |
| } |
| long generateEDEKStartTime = monotonicNow(); |
| EncryptedKeyVersion edek = SecurityUtil.doAsLoginUser( |
| new PrivilegedExceptionAction<EncryptedKeyVersion>() { |
| @Override |
| public EncryptedKeyVersion run() throws IOException { |
| try { |
| return getKMSProvider().generateEncryptedKey(ezKeyName); |
| } catch (GeneralSecurityException e) { |
| throw new IOException(e); |
| } |
| } |
| }); |
| long generateEDEKTime = monotonicNow() - generateEDEKStartTime; |
| LOG.debug("generateEDEK takes {} ms", generateEDEKTime); |
| Preconditions.checkNotNull(edek); |
| return edek; |
| } |
| |
| @Override |
| public OpenKeySession openKey(OmKeyArgs args) throws IOException { |
| Preconditions.checkNotNull(args); |
| Preconditions.checkNotNull(args.getAcls(), "Default acls " + |
| "should be set."); |
| |
| String volumeName = args.getVolumeName(); |
| String bucketName = args.getBucketName(); |
| String keyName = args.getKeyName(); |
| validateBucket(volumeName, bucketName); |
| |
| long currentTime = UniqueId.next(); |
| OmKeyInfo keyInfo; |
| long openVersion; |
| // NOTE size of a key is not a hard limit on anything, it is a value that |
| // client should expect, in terms of current size of key. If client sets |
| // a value, then this value is used, otherwise, we allocate a single |
| // block which is the current size, if read by the client. |
| final long size = args.getDataSize() > 0 ? |
| args.getDataSize() : scmBlockSize; |
| final List<OmKeyLocationInfo> locations = new ArrayList<>(); |
| |
| ReplicationFactor factor = args.getFactor(); |
| if (factor == null) { |
| factor = useRatis ? ReplicationFactor.THREE : ReplicationFactor.ONE; |
| } |
| |
| ReplicationType type = args.getType(); |
| if (type == null) { |
| type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE; |
| } |
| |
| String dbKeyName = metadataManager.getOzoneKey( |
| args.getVolumeName(), args.getBucketName(), args.getKeyName()); |
| |
| FileEncryptionInfo encInfo; |
| metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName); |
| OmBucketInfo bucketInfo; |
| try { |
| bucketInfo = getBucketInfo(volumeName, bucketName); |
| encInfo = getFileEncryptionInfo(bucketInfo); |
| keyInfo = prepareKeyInfo(args, dbKeyName, size, locations, encInfo); |
| } catch (OMException e) { |
| throw e; |
| } catch (IOException ex) { |
| LOG.error("Key open failed for volume:{} bucket:{} key:{}", |
| volumeName, bucketName, keyName, ex); |
| throw new OMException(ex.getMessage(), ResultCodes.KEY_ALLOCATION_ERROR); |
| } finally { |
| metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| } |
| if (keyInfo == null) { |
| // the key does not exist, create a new object, the new blocks are the |
| // version 0 |
| keyInfo = createKeyInfo(args, locations, factor, type, size, |
| encInfo, bucketInfo); |
| } |
| openVersion = keyInfo.getLatestVersionLocations().getVersion(); |
| LOG.debug("Key {} allocated in volume {} bucket {}", |
| keyName, volumeName, bucketName); |
| allocateBlockInKey(keyInfo, size, currentTime); |
| return new OpenKeySession(currentTime, keyInfo, openVersion); |
| } |
| |
| private void allocateBlockInKey(OmKeyInfo keyInfo, long size, long sessionId) |
| throws IOException { |
| String openKey = metadataManager |
| .getOpenKey(keyInfo.getVolumeName(), keyInfo.getBucketName(), |
| keyInfo.getKeyName(), sessionId); |
| // requested size is not required but more like a optimization: |
| // SCM looks at the requested, if it 0, no block will be allocated at |
| // the point, if client needs more blocks, client can always call |
| // allocateBlock. But if requested size is not 0, OM will preallocate |
| // some blocks and piggyback to client, to save RPC calls. |
| if (size > 0) { |
| List<OmKeyLocationInfo> locationInfos = |
| allocateBlock(keyInfo, new ExcludeList(), size); |
| keyInfo.appendNewBlocks(locationInfos, true); |
| } |
| |
| metadataManager.getOpenKeyTable().put(openKey, keyInfo); |
| |
| } |
| |
| private OmKeyInfo prepareKeyInfo( |
| OmKeyArgs keyArgs, String dbKeyName, long size, |
| List<OmKeyLocationInfo> locations, FileEncryptionInfo encInfo) |
| throws IOException { |
| OmKeyInfo keyInfo = null; |
| if (keyArgs.getIsMultipartKey()) { |
| keyInfo = prepareMultipartKeyInfo(keyArgs, size, locations, encInfo); |
| } else if (metadataManager.getKeyTable().isExist(dbKeyName)) { |
| keyInfo = metadataManager.getKeyTable().get(dbKeyName); |
| // the key already exist, the new blocks will be added as new version |
| // when locations.size = 0, the new version will have identical blocks |
| // as its previous version |
| keyInfo.addNewVersion(locations, true); |
| keyInfo.setDataSize(size + keyInfo.getDataSize()); |
| } |
| if(keyInfo != null) { |
| keyInfo.setMetadata(keyArgs.getMetadata()); |
| } |
| return keyInfo; |
| } |
| |
| private OmKeyInfo prepareMultipartKeyInfo(OmKeyArgs args, long size, |
| List<OmKeyLocationInfo> locations, FileEncryptionInfo encInfo) |
| throws IOException { |
| ReplicationFactor factor; |
| ReplicationType type; |
| |
| Preconditions.checkArgument(args.getMultipartUploadPartNumber() > 0, |
| "PartNumber Should be greater than zero"); |
| // When key is multipart upload part key, we should take replication |
| // type and replication factor from original key which has done |
| // initiate multipart upload. If we have not found any such, we throw |
| // error no such multipart upload. |
| String uploadID = args.getMultipartUploadID(); |
| Preconditions.checkNotNull(uploadID); |
| String multipartKey = metadataManager |
| .getMultipartKey(args.getVolumeName(), args.getBucketName(), |
| args.getKeyName(), uploadID); |
| OmKeyInfo partKeyInfo = metadataManager.getOpenKeyTable().get( |
| multipartKey); |
| if (partKeyInfo == null) { |
| throw new OMException("No such Multipart upload is with specified " + |
| "uploadId " + uploadID, |
| ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR); |
| } else { |
| factor = partKeyInfo.getFactor(); |
| type = partKeyInfo.getType(); |
| } |
| // For this upload part we don't need to check in KeyTable. As this |
| // is not an actual key, it is a part of the key. |
| return createKeyInfo(args, locations, factor, type, size, encInfo, |
| getBucketInfo(args.getVolumeName(), args.getBucketName())); |
| } |
| |
| /** |
| * Create OmKeyInfo object. |
| * @param keyArgs |
| * @param locations |
| * @param factor |
| * @param type |
| * @param size |
| * @param encInfo |
| * @param omBucketInfo |
| * @return |
| */ |
| private OmKeyInfo createKeyInfo(OmKeyArgs keyArgs, |
| List<OmKeyLocationInfo> locations, |
| ReplicationFactor factor, |
| ReplicationType type, long size, |
| FileEncryptionInfo encInfo, |
| OmBucketInfo omBucketInfo) { |
| OmKeyInfo.Builder builder = new OmKeyInfo.Builder() |
| .setVolumeName(keyArgs.getVolumeName()) |
| .setBucketName(keyArgs.getBucketName()) |
| .setKeyName(keyArgs.getKeyName()) |
| .setOmKeyLocationInfos(Collections.singletonList( |
| new OmKeyLocationInfoGroup(0, locations))) |
| .setCreationTime(Time.now()) |
| .setModificationTime(Time.now()) |
| .setDataSize(size) |
| .setReplicationType(type) |
| .setReplicationFactor(factor) |
| .setFileEncryptionInfo(encInfo) |
| .addAllMetadata(keyArgs.getMetadata()); |
| builder.setAcls(getAclsForKey(keyArgs, omBucketInfo)); |
| |
| if(Boolean.valueOf(omBucketInfo.getMetadata().get(OzoneConsts.GDPR_FLAG))) { |
| builder.addMetadata(OzoneConsts.GDPR_FLAG, Boolean.TRUE.toString()); |
| } |
| return builder.build(); |
| } |
| |
| @Override |
| public void commitKey(OmKeyArgs args, long clientID) throws IOException { |
| Preconditions.checkNotNull(args); |
| String volumeName = args.getVolumeName(); |
| String bucketName = args.getBucketName(); |
| String keyName = args.getKeyName(); |
| List<OmKeyLocationInfo> locationInfoList = args.getLocationInfoList(); |
| String objectKey = metadataManager |
| .getOzoneKey(volumeName, bucketName, keyName); |
| String openKey = metadataManager |
| .getOpenKey(volumeName, bucketName, keyName, clientID); |
| Preconditions.checkNotNull(locationInfoList); |
| try { |
| metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| validateBucket(volumeName, bucketName); |
| OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get(openKey); |
| if (keyInfo == null) { |
| throw new OMException("Failed to commit key, as " + openKey + "entry " + |
| "is not found in the openKey table", KEY_NOT_FOUND); |
| } |
| keyInfo.setDataSize(args.getDataSize()); |
| keyInfo.setModificationTime(Time.now()); |
| |
| //update the block length for each block |
| keyInfo.updateLocationInfoList(locationInfoList); |
| metadataManager.getStore().move( |
| openKey, |
| objectKey, |
| keyInfo, |
| metadataManager.getOpenKeyTable(), |
| metadataManager.getKeyTable()); |
| } catch (OMException e) { |
| throw e; |
| } catch (IOException ex) { |
| LOG.error("Key commit failed for volume:{} bucket:{} key:{}", |
| volumeName, bucketName, keyName, ex); |
| throw new OMException(ex.getMessage(), |
| ResultCodes.KEY_ALLOCATION_ERROR); |
| } finally { |
| metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| } |
| } |
| |
| @Override |
| public OmKeyInfo lookupKey(OmKeyArgs args, String clientAddress) |
| throws IOException { |
| Preconditions.checkNotNull(args); |
| String volumeName = args.getVolumeName(); |
| String bucketName = args.getBucketName(); |
| String keyName = args.getKeyName(); |
| metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| try { |
| String keyBytes = metadataManager.getOzoneKey( |
| volumeName, bucketName, keyName); |
| OmKeyInfo value = metadataManager.getKeyTable().get(keyBytes); |
| if (value == null) { |
| LOG.debug("volume:{} bucket:{} Key:{} not found", |
| volumeName, bucketName, keyName); |
| throw new OMException("Key not found", |
| KEY_NOT_FOUND); |
| } |
| if (grpcBlockTokenEnabled) { |
| String remoteUser = getRemoteUser().getShortUserName(); |
| for (OmKeyLocationInfoGroup key : value.getKeyLocationVersions()) { |
| key.getLocationList().forEach(k -> { |
| k.setToken(secretManager.generateToken(remoteUser, |
| k.getBlockID().getContainerBlockID().toString(), |
| getAclForUser(remoteUser), |
| k.getLength())); |
| }); |
| } |
| } |
| // Refresh container pipeline info from SCM |
| // based on OmKeyArgs.refreshPipeline flag |
| // 1. Client send initial read request OmKeyArgs.refreshPipeline = false |
| // and uses the pipeline cached in OM to access datanode |
| // 2. If succeeded, done. |
| // 3. If failed due to pipeline does not exist or invalid pipeline state |
| // exception, client should retry lookupKey with |
| // OmKeyArgs.refreshPipeline = true |
| if (args.getRefreshPipeline()) { |
| for (OmKeyLocationInfoGroup key : value.getKeyLocationVersions()) { |
| key.getLocationList().forEach(k -> { |
| // TODO: fix Some tests that may not initialize container client |
| // The production should always have containerClient initialized. |
| if (scmClient.getContainerClient() != null) { |
| try { |
| ContainerWithPipeline cp = scmClient.getContainerClient() |
| .getContainerWithPipeline(k.getContainerID()); |
| if (!cp.getPipeline().equals(k.getPipeline())) { |
| k.setPipeline(cp.getPipeline()); |
| } |
| } catch (IOException e) { |
| LOG.error("Unable to update pipeline for container:{}", |
| k.getContainerID()); |
| } |
| } |
| }); |
| } |
| } |
| if (args.getSortDatanodes()) { |
| sortDatanodeInPipeline(value, clientAddress); |
| } |
| return value; |
| } catch (IOException ex) { |
| LOG.debug("Get key failed for volume:{} bucket:{} key:{}", |
| volumeName, bucketName, keyName, ex); |
| throw new OMException(ex.getMessage(), |
| KEY_NOT_FOUND); |
| } finally { |
| metadataManager.getLock().releaseReadLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| } |
| } |
| |
| @Override |
| public void renameKey(OmKeyArgs args, String toKeyName) throws IOException { |
| Preconditions.checkNotNull(args); |
| Preconditions.checkNotNull(toKeyName); |
| String volumeName = args.getVolumeName(); |
| String bucketName = args.getBucketName(); |
| String fromKeyName = args.getKeyName(); |
| if (toKeyName.length() == 0 || fromKeyName.length() == 0) { |
| LOG.error("Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}", |
| volumeName, bucketName, fromKeyName, toKeyName); |
| throw new OMException("Key name is empty", |
| ResultCodes.INVALID_KEY_NAME); |
| } |
| |
| metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName); |
| try { |
| // fromKeyName should exist |
| String fromKey = metadataManager.getOzoneKey( |
| volumeName, bucketName, fromKeyName); |
| OmKeyInfo fromKeyValue = metadataManager.getKeyTable().get(fromKey); |
| if (fromKeyValue == null) { |
| // TODO: Add support for renaming open key |
| LOG.error( |
| "Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}. " |
| + "Key: {} not found.", volumeName, bucketName, fromKeyName, |
| toKeyName, fromKeyName); |
| throw new OMException("Key not found", |
| KEY_NOT_FOUND); |
| } |
| |
| // A rename is a no-op if the target and source name is same. |
| // TODO: Discuss if we need to throw?. |
| if (fromKeyName.equals(toKeyName)) { |
| return; |
| } |
| |
| // toKeyName should not exist |
| String toKey = |
| metadataManager.getOzoneKey(volumeName, bucketName, toKeyName); |
| OmKeyInfo toKeyValue = metadataManager.getKeyTable().get(toKey); |
| if (toKeyValue != null) { |
| LOG.error( |
| "Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}. " |
| + "Key: {} already exists.", volumeName, bucketName, |
| fromKeyName, toKeyName, toKeyName); |
| throw new OMException("Key already exists", |
| OMException.ResultCodes.KEY_ALREADY_EXISTS); |
| } |
| |
| fromKeyValue.setKeyName(toKeyName); |
| fromKeyValue.updateModifcationTime(); |
| DBStore store = metadataManager.getStore(); |
| try (BatchOperation batch = store.initBatchOperation()) { |
| metadataManager.getKeyTable().deleteWithBatch(batch, fromKey); |
| metadataManager.getKeyTable().putWithBatch(batch, toKey, |
| fromKeyValue); |
| store.commitBatchOperation(batch); |
| } |
| } catch (IOException ex) { |
| if (ex instanceof OMException) { |
| throw ex; |
| } |
| LOG.error("Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}", |
| volumeName, bucketName, fromKeyName, toKeyName, ex); |
| throw new OMException(ex.getMessage(), |
| ResultCodes.KEY_RENAME_ERROR); |
| } finally { |
| metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| } |
| } |
| |
| @Override |
| public void deleteKey(OmKeyArgs args) throws IOException { |
| Preconditions.checkNotNull(args); |
| String volumeName = args.getVolumeName(); |
| String bucketName = args.getBucketName(); |
| String keyName = args.getKeyName(); |
| metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName); |
| try { |
| String objectKey = metadataManager.getOzoneKey( |
| volumeName, bucketName, keyName); |
| OmKeyInfo keyInfo = metadataManager.getKeyTable().get(objectKey); |
| if (keyInfo == null) { |
| throw new OMException("Key not found", |
| KEY_NOT_FOUND); |
| } else { |
| // directly delete key with no blocks from db. This key need not be |
| // moved to deleted table. |
| if (isKeyEmpty(keyInfo)) { |
| metadataManager.getKeyTable().delete(objectKey); |
| LOG.debug("Key {} deleted from OM DB", keyName); |
| return; |
| } |
| } |
| RepeatedOmKeyInfo repeatedOmKeyInfo = |
| metadataManager.getDeletedTable().get(objectKey); |
| repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(keyInfo, |
| repeatedOmKeyInfo); |
| metadataManager.getKeyTable().delete(objectKey); |
| metadataManager.getDeletedTable().put(objectKey, repeatedOmKeyInfo); |
| } catch (OMException ex) { |
| throw ex; |
| } catch (IOException ex) { |
| LOG.error(String.format("Delete key failed for volume:%s " |
| + "bucket:%s key:%s", volumeName, bucketName, keyName), ex); |
| throw new OMException(ex.getMessage(), ex, |
| ResultCodes.KEY_DELETION_ERROR); |
| } finally { |
| metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| } |
| } |
| |
| private boolean isKeyEmpty(OmKeyInfo keyInfo) { |
| for (OmKeyLocationInfoGroup keyLocationList : keyInfo |
| .getKeyLocationVersions()) { |
| if (keyLocationList.getLocationList().size() != 0) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| @Override |
| public List<OmKeyInfo> listKeys(String volumeName, String bucketName, |
| String startKey, String keyPrefix, |
| int maxKeys) throws IOException { |
| Preconditions.checkNotNull(volumeName); |
| Preconditions.checkNotNull(bucketName); |
| |
| // We don't take a lock in this path, since we walk the |
| // underlying table using an iterator. That automatically creates a |
| // snapshot of the data, so we don't need these locks at a higher level |
| // when we iterate. |
| return metadataManager.listKeys(volumeName, bucketName, |
| startKey, keyPrefix, maxKeys); |
| } |
| |
| @Override |
| public List<BlockGroup> getPendingDeletionKeys(final int count) |
| throws IOException { |
| return metadataManager.getPendingDeletionKeys(count); |
| } |
| |
| @Override |
| public List<BlockGroup> getExpiredOpenKeys() throws IOException { |
| return metadataManager.getExpiredOpenKeys(); |
| |
| } |
| |
| @Override |
| public void deleteExpiredOpenKey(String objectKeyName) throws IOException { |
| Preconditions.checkNotNull(objectKeyName); |
| // TODO: Fix this in later patches. |
| } |
| |
| @Override |
| public OMMetadataManager getMetadataManager() { |
| return metadataManager; |
| } |
| |
| @Override |
| public BackgroundService getDeletingService() { |
| return keyDeletingService; |
| } |
| |
| @Override |
| public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws |
| IOException { |
| Preconditions.checkNotNull(omKeyArgs); |
| String uploadID = UUID.randomUUID().toString() + "-" + UniqueId.next(); |
| return createMultipartInfo(omKeyArgs, uploadID); |
| } |
| |
| private OmMultipartInfo createMultipartInfo(OmKeyArgs keyArgs, |
| String multipartUploadID) throws IOException { |
| String volumeName = keyArgs.getVolumeName(); |
| String bucketName = keyArgs.getBucketName(); |
| String keyName = keyArgs.getKeyName(); |
| |
| metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName); |
| OmBucketInfo bucketInfo = validateS3Bucket(volumeName, bucketName); |
| try { |
| |
| // We are adding uploadId to key, because if multiple users try to |
| // perform multipart upload on the same key, each will try to upload, who |
| // ever finally commit the key, we see that key in ozone. Suppose if we |
| // don't add id, and use the same key /volume/bucket/key, when multiple |
| // users try to upload the key, we update the parts of the key's from |
| // multiple users to same key, and the key output can be a mix of the |
| // parts from multiple users. |
| |
| // So on same key if multiple time multipart upload is initiated we |
| // store multiple entries in the openKey Table. |
| // Checked AWS S3, when we try to run multipart upload, each time a |
| // new uploadId is returned. |
| |
| String multipartKey = metadataManager.getMultipartKey(volumeName, |
| bucketName, keyName, multipartUploadID); |
| |
| // Not checking if there is an already key for this in the keyTable, as |
| // during final complete multipart upload we take care of this. |
| |
| |
| Map<Integer, PartKeyInfo> partKeyInfoMap = new HashMap<>(); |
| OmMultipartKeyInfo multipartKeyInfo = new OmMultipartKeyInfo( |
| multipartUploadID, partKeyInfoMap); |
| List<OmKeyLocationInfo> locations = new ArrayList<>(); |
| OmKeyInfo omKeyInfo = new OmKeyInfo.Builder() |
| .setVolumeName(keyArgs.getVolumeName()) |
| .setBucketName(keyArgs.getBucketName()) |
| .setKeyName(keyArgs.getKeyName()) |
| .setCreationTime(Time.now()) |
| .setModificationTime(Time.now()) |
| .setReplicationType(keyArgs.getType()) |
| .setReplicationFactor(keyArgs.getFactor()) |
| .setOmKeyLocationInfos(Collections.singletonList( |
| new OmKeyLocationInfoGroup(0, locations))) |
| .setAcls(getAclsForKey(keyArgs, bucketInfo)) |
| .build(); |
| DBStore store = metadataManager.getStore(); |
| try (BatchOperation batch = store.initBatchOperation()) { |
| // Create an entry in open key table and multipart info table for |
| // this key. |
| metadataManager.getMultipartInfoTable().putWithBatch(batch, |
| multipartKey, multipartKeyInfo); |
| metadataManager.getOpenKeyTable().putWithBatch(batch, |
| multipartKey, omKeyInfo); |
| store.commitBatchOperation(batch); |
| return new OmMultipartInfo(volumeName, bucketName, keyName, |
| multipartUploadID); |
| } |
| } catch (IOException ex) { |
| LOG.error("Initiate Multipart upload Failed for volume:{} bucket:{} " + |
| "key:{}", volumeName, bucketName, keyName, ex); |
| throw new OMException(ex.getMessage(), |
| ResultCodes.INITIATE_MULTIPART_UPLOAD_ERROR); |
| } finally { |
| metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| } |
| } |
| |
| private List<OzoneAcl> getAclsForKey(OmKeyArgs keyArgs, |
| OmBucketInfo bucketInfo) { |
| List<OzoneAcl> acls = new ArrayList<>(); |
| |
| if(keyArgs.getAcls() != null) { |
| acls.addAll(keyArgs.getAcls()); |
| } |
| |
| // Inherit DEFAULT acls from prefix. |
| if(prefixManager != null) { |
| List<OmPrefixInfo> prefixList = prefixManager.getLongestPrefixPath( |
| OZONE_URI_DELIMITER + |
| keyArgs.getVolumeName() + OZONE_URI_DELIMITER + |
| keyArgs.getBucketName() + OZONE_URI_DELIMITER + |
| keyArgs.getKeyName()); |
| |
| if(prefixList.size() > 0) { |
| // Add all acls from direct parent to key. |
| OmPrefixInfo prefixInfo = prefixList.get(prefixList.size() - 1); |
| if(prefixInfo != null) { |
| if (OzoneAclUtil.inheritDefaultAcls(acls, prefixInfo.getAcls())) { |
| return acls; |
| } |
| } |
| } |
| } |
| |
| // Inherit DEFAULT acls from bucket only if DEFAULT acls for |
| // prefix are not set. |
| if (bucketInfo != null) { |
| if (OzoneAclUtil.inheritDefaultAcls(acls, bucketInfo.getAcls())) { |
| return acls; |
| } |
| } |
| |
| // TODO: do we need to further fallback to volume default ACL |
| return acls; |
| } |
| |
| @Override |
| public OmMultipartCommitUploadPartInfo commitMultipartUploadPart( |
| OmKeyArgs omKeyArgs, long clientID) throws IOException { |
| Preconditions.checkNotNull(omKeyArgs); |
| String volumeName = omKeyArgs.getVolumeName(); |
| String bucketName = omKeyArgs.getBucketName(); |
| String keyName = omKeyArgs.getKeyName(); |
| String uploadID = omKeyArgs.getMultipartUploadID(); |
| int partNumber = omKeyArgs.getMultipartUploadPartNumber(); |
| |
| metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName); |
| validateS3Bucket(volumeName, bucketName); |
| String partName; |
| try { |
| String multipartKey = metadataManager.getMultipartKey(volumeName, |
| bucketName, keyName, uploadID); |
| OmMultipartKeyInfo multipartKeyInfo = metadataManager |
| .getMultipartInfoTable().get(multipartKey); |
| |
| String openKey = metadataManager.getOpenKey( |
| volumeName, bucketName, keyName, clientID); |
| OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get( |
| openKey); |
| |
| // set the data size and location info list |
| keyInfo.setDataSize(omKeyArgs.getDataSize()); |
| keyInfo.updateLocationInfoList(omKeyArgs.getLocationInfoList()); |
| |
| partName = metadataManager.getOzoneKey(volumeName, bucketName, keyName) |
| + clientID; |
| if (multipartKeyInfo == null) { |
| // This can occur when user started uploading part by the time commit |
| // of that part happens, in between the user might have requested |
| // abort multipart upload. If we just throw exception, then the data |
| // will not be garbage collected, so move this part to delete table |
| // and throw error |
| // Move this part to delete table. |
| RepeatedOmKeyInfo repeatedOmKeyInfo = |
| metadataManager.getDeletedTable().get(partName); |
| repeatedOmKeyInfo = OmUtils.prepareKeyForDelete( |
| keyInfo, repeatedOmKeyInfo); |
| metadataManager.getDeletedTable().put(partName, repeatedOmKeyInfo); |
| throw new OMException("No such Multipart upload is with specified " + |
| "uploadId " + uploadID, ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR); |
| } else { |
| PartKeyInfo oldPartKeyInfo = |
| multipartKeyInfo.getPartKeyInfo(partNumber); |
| PartKeyInfo.Builder partKeyInfo = PartKeyInfo.newBuilder(); |
| partKeyInfo.setPartName(partName); |
| partKeyInfo.setPartNumber(partNumber); |
| partKeyInfo.setPartKeyInfo(keyInfo.getProtobuf()); |
| multipartKeyInfo.addPartKeyInfo(partNumber, partKeyInfo.build()); |
| if (oldPartKeyInfo == null) { |
| // This is the first time part is being added. |
| DBStore store = metadataManager.getStore(); |
| try (BatchOperation batch = store.initBatchOperation()) { |
| metadataManager.getOpenKeyTable().deleteWithBatch(batch, openKey); |
| metadataManager.getMultipartInfoTable().putWithBatch(batch, |
| multipartKey, multipartKeyInfo); |
| store.commitBatchOperation(batch); |
| } |
| } else { |
| // If we have this part already, that means we are overriding it. |
| // We need to 3 steps. |
| // Add the old entry to delete table. |
| // Remove the new entry from openKey table. |
| // Add the new entry in to the list of part keys. |
| DBStore store = metadataManager.getStore(); |
| try (BatchOperation batch = store.initBatchOperation()) { |
| OmKeyInfo partKey = OmKeyInfo.getFromProtobuf( |
| oldPartKeyInfo.getPartKeyInfo()); |
| |
| RepeatedOmKeyInfo repeatedOmKeyInfo = |
| metadataManager.getDeletedTable() |
| .get(oldPartKeyInfo.getPartName()); |
| |
| repeatedOmKeyInfo = OmUtils.prepareKeyForDelete( |
| partKey, repeatedOmKeyInfo); |
| |
| metadataManager.getDeletedTable().put(partName, repeatedOmKeyInfo); |
| metadataManager.getDeletedTable().putWithBatch(batch, |
| oldPartKeyInfo.getPartName(), |
| repeatedOmKeyInfo); |
| metadataManager.getOpenKeyTable().deleteWithBatch(batch, openKey); |
| metadataManager.getMultipartInfoTable().putWithBatch(batch, |
| multipartKey, multipartKeyInfo); |
| store.commitBatchOperation(batch); |
| } |
| } |
| } |
| } catch (IOException ex) { |
| LOG.error("Upload part Failed: volume:{} bucket:{} " + |
| "key:{} PartNumber: {}", volumeName, bucketName, keyName, |
| partNumber, ex); |
| throw new OMException(ex.getMessage(), |
| ResultCodes.MULTIPART_UPLOAD_PARTFILE_ERROR); |
| } finally { |
| metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| } |
| |
| return new OmMultipartCommitUploadPartInfo(partName); |
| |
| } |
| |
| @Override |
| @SuppressWarnings("methodlength") |
| public OmMultipartUploadCompleteInfo completeMultipartUpload( |
| OmKeyArgs omKeyArgs, OmMultipartUploadCompleteList multipartUploadList) |
| throws IOException { |
| Preconditions.checkNotNull(omKeyArgs); |
| Preconditions.checkNotNull(multipartUploadList); |
| String volumeName = omKeyArgs.getVolumeName(); |
| String bucketName = omKeyArgs.getBucketName(); |
| String keyName = omKeyArgs.getKeyName(); |
| String uploadID = omKeyArgs.getMultipartUploadID(); |
| metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName); |
| validateS3Bucket(volumeName, bucketName); |
| try { |
| String multipartKey = metadataManager.getMultipartKey(volumeName, |
| bucketName, keyName, uploadID); |
| String ozoneKey = metadataManager.getOzoneKey(volumeName, bucketName, |
| keyName); |
| OmKeyInfo keyInfo = metadataManager.getKeyTable().get(ozoneKey); |
| |
| OmMultipartKeyInfo multipartKeyInfo = metadataManager |
| .getMultipartInfoTable().get(multipartKey); |
| if (multipartKeyInfo == null) { |
| throw new OMException("Complete Multipart Upload Failed: volume: " + |
| volumeName + "bucket: " + bucketName + "key: " + keyName, |
| ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR); |
| } |
| TreeMap<Integer, PartKeyInfo> partKeyInfoMap = multipartKeyInfo |
| .getPartKeyInfoMap(); |
| |
| TreeMap<Integer, String> multipartMap = multipartUploadList |
| .getMultipartMap(); |
| |
| // Last key in the map should be having key value as size, as map's |
| // are sorted. Last entry in both maps should have partNumber as size |
| // of the map. As we have part entries 1, 2, 3, 4 and then we get |
| // complete multipart upload request so the map last entry should have 4, |
| // if it is having value greater or less than map size, then there is |
| // some thing wrong throw error. |
| |
| Map.Entry<Integer, String> multipartMapLastEntry = multipartMap |
| .lastEntry(); |
| Map.Entry<Integer, PartKeyInfo> partKeyInfoLastEntry = partKeyInfoMap |
| .lastEntry(); |
| if (partKeyInfoMap.size() != multipartMap.size()) { |
| throw new OMException("Complete Multipart Upload Failed: volume: " + |
| volumeName + "bucket: " + bucketName + "key: " + keyName, |
| ResultCodes.MISMATCH_MULTIPART_LIST); |
| } |
| |
| // Last entry part Number should be the size of the map, otherwise this |
| // means we have missing some parts but we got a complete request. |
| if (multipartMapLastEntry.getKey() != partKeyInfoMap.size() || |
| partKeyInfoLastEntry.getKey() != partKeyInfoMap.size()) { |
| throw new OMException("Complete Multipart Upload Failed: volume: " + |
| volumeName + "bucket: " + bucketName + "key: " + keyName, |
| ResultCodes.MISSING_UPLOAD_PARTS); |
| } |
| ReplicationType type = partKeyInfoLastEntry.getValue().getPartKeyInfo() |
| .getType(); |
| ReplicationFactor factor = partKeyInfoLastEntry.getValue() |
| .getPartKeyInfo().getFactor(); |
| List<OmKeyLocationInfo> locations = new ArrayList<>(); |
| long size = 0; |
| int partsCount =1; |
| int partsMapSize = partKeyInfoMap.size(); |
| for(Map.Entry<Integer, PartKeyInfo> partKeyInfoEntry : partKeyInfoMap |
| .entrySet()) { |
| int partNumber = partKeyInfoEntry.getKey(); |
| PartKeyInfo partKeyInfo = partKeyInfoEntry.getValue(); |
| // Check we have all parts to complete multipart upload and also |
| // check partNames provided match with actual part names |
| String providedPartName = multipartMap.get(partNumber); |
| String actualPartName = partKeyInfo.getPartName(); |
| if (partNumber == partsCount) { |
| if (!actualPartName.equals(providedPartName)) { |
| throw new OMException("Complete Multipart Upload Failed: volume: " + |
| volumeName + "bucket: " + bucketName + "key: " + keyName, |
| ResultCodes.MISMATCH_MULTIPART_LIST); |
| } |
| OmKeyInfo currentPartKeyInfo = OmKeyInfo |
| .getFromProtobuf(partKeyInfo.getPartKeyInfo()); |
| // Check if any part size is less than 5mb, last part can be less |
| // than 5 mb. |
| if (partsCount != partsMapSize && |
| currentPartKeyInfo.getDataSize() < OM_MULTIPART_MIN_SIZE) { |
| LOG.error("MultipartUpload: " + ozoneKey + "Part number: " + |
| partKeyInfo.getPartNumber() + "size " + currentPartKeyInfo |
| .getDataSize() + " is less than minimum part size " + |
| OzoneConsts.OM_MULTIPART_MIN_SIZE); |
| throw new OMException("Complete Multipart Upload Failed: Entity " + |
| "too small: volume: " + volumeName + "bucket: " + bucketName |
| + "key: " + keyName, ResultCodes.ENTITY_TOO_SMALL); |
| } |
| // As all part keys will have only one version. |
| OmKeyLocationInfoGroup currentKeyInfoGroup = currentPartKeyInfo |
| .getKeyLocationVersions().get(0); |
| locations.addAll(currentKeyInfoGroup.getLocationList()); |
| size += currentPartKeyInfo.getDataSize(); |
| } else { |
| throw new OMException("Complete Multipart Upload Failed: volume: " + |
| volumeName + "bucket: " + bucketName + "key: " + keyName, |
| ResultCodes.MISSING_UPLOAD_PARTS); |
| } |
| partsCount++; |
| } |
| if (keyInfo == null) { |
| // This is a newly added key, it does not have any versions. |
| OmKeyLocationInfoGroup keyLocationInfoGroup = new |
| OmKeyLocationInfoGroup(0, locations); |
| // A newly created key, this is the first version. |
| keyInfo = new OmKeyInfo.Builder() |
| .setVolumeName(omKeyArgs.getVolumeName()) |
| .setBucketName(omKeyArgs.getBucketName()) |
| .setKeyName(omKeyArgs.getKeyName()) |
| .setReplicationFactor(factor) |
| .setReplicationType(type) |
| .setCreationTime(Time.now()) |
| .setModificationTime(Time.now()) |
| .setDataSize(size) |
| .setOmKeyLocationInfos( |
| Collections.singletonList(keyLocationInfoGroup)) |
| .setAcls(omKeyArgs.getAcls()).build(); |
| } else { |
| // Already a version exists, so we should add it as a new version. |
| // But now as versioning is not supported, just following the commit |
| // key approach. When versioning support comes, then we can uncomment |
| // below code keyInfo.addNewVersion(locations); |
| keyInfo.updateLocationInfoList(locations); |
| } |
| DBStore store = metadataManager.getStore(); |
| try (BatchOperation batch = store.initBatchOperation()) { |
| //Remove entry in multipart table and add a entry in to key table |
| metadataManager.getMultipartInfoTable().deleteWithBatch(batch, |
| multipartKey); |
| metadataManager.getKeyTable().putWithBatch(batch, |
| ozoneKey, keyInfo); |
| metadataManager.getOpenKeyTable().deleteWithBatch(batch, multipartKey); |
| store.commitBatchOperation(batch); |
| } |
| return new OmMultipartUploadCompleteInfo(omKeyArgs.getVolumeName(), |
| omKeyArgs.getBucketName(), omKeyArgs.getKeyName(), DigestUtils |
| .sha256Hex(keyName)); |
| } catch (OMException ex) { |
| throw ex; |
| } catch (IOException ex) { |
| LOG.error("Complete Multipart Upload Failed: volume: " + volumeName + |
| "bucket: " + bucketName + "key: " + keyName, ex); |
| throw new OMException(ex.getMessage(), ResultCodes |
| .COMPLETE_MULTIPART_UPLOAD_ERROR); |
| } finally { |
| metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| } |
| } |
| |
| @Override |
| public void abortMultipartUpload(OmKeyArgs omKeyArgs) throws IOException { |
| |
| Preconditions.checkNotNull(omKeyArgs); |
| String volumeName = omKeyArgs.getVolumeName(); |
| String bucketName = omKeyArgs.getBucketName(); |
| String keyName = omKeyArgs.getKeyName(); |
| String uploadID = omKeyArgs.getMultipartUploadID(); |
| Preconditions.checkNotNull(uploadID, "uploadID cannot be null"); |
| validateS3Bucket(volumeName, bucketName); |
| metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName); |
| OmBucketInfo bucketInfo; |
| try { |
| String multipartKey = metadataManager.getMultipartKey(volumeName, |
| bucketName, keyName, uploadID); |
| OmMultipartKeyInfo multipartKeyInfo = metadataManager |
| .getMultipartInfoTable().get(multipartKey); |
| OmKeyInfo openKeyInfo = metadataManager.getOpenKeyTable().get( |
| multipartKey); |
| |
| // If there is no entry in openKeyTable, then there is no multipart |
| // upload initiated for this key. |
| if (openKeyInfo == null) { |
| LOG.error("Abort Multipart Upload Failed: volume: " + volumeName + |
| "bucket: " + bucketName + "key: " + keyName + "with error no " + |
| "such uploadID:" + uploadID); |
| throw new OMException("Abort Multipart Upload Failed: volume: " + |
| volumeName + "bucket: " + bucketName + "key: " + keyName, |
| ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR); |
| } else { |
| // Move all the parts to delete table |
| TreeMap<Integer, PartKeyInfo> partKeyInfoMap = multipartKeyInfo |
| .getPartKeyInfoMap(); |
| DBStore store = metadataManager.getStore(); |
| try (BatchOperation batch = store.initBatchOperation()) { |
| for (Map.Entry<Integer, PartKeyInfo> partKeyInfoEntry : partKeyInfoMap |
| .entrySet()) { |
| PartKeyInfo partKeyInfo = partKeyInfoEntry.getValue(); |
| OmKeyInfo currentKeyPartInfo = OmKeyInfo.getFromProtobuf( |
| partKeyInfo.getPartKeyInfo()); |
| |
| RepeatedOmKeyInfo repeatedOmKeyInfo = |
| metadataManager.getDeletedTable() |
| .get(partKeyInfo.getPartName()); |
| |
| repeatedOmKeyInfo = OmUtils.prepareKeyForDelete( |
| currentKeyPartInfo, repeatedOmKeyInfo); |
| |
| metadataManager.getDeletedTable().putWithBatch(batch, |
| partKeyInfo.getPartName(), repeatedOmKeyInfo); |
| } |
| // Finally delete the entry from the multipart info table and open |
| // key table |
| metadataManager.getMultipartInfoTable().deleteWithBatch(batch, |
| multipartKey); |
| metadataManager.getOpenKeyTable().deleteWithBatch(batch, |
| multipartKey); |
| store.commitBatchOperation(batch); |
| } |
| } |
| } catch (OMException ex) { |
| throw ex; |
| } catch (IOException ex) { |
| LOG.error("Abort Multipart Upload Failed: volume: " + volumeName + |
| "bucket: " + bucketName + "key: " + keyName, ex); |
| throw new OMException(ex.getMessage(), ResultCodes |
| .ABORT_MULTIPART_UPLOAD_FAILED); |
| } finally { |
| metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| } |
| |
| } |
| |
| @Override |
| public OmMultipartUploadList listMultipartUploads(String volumeName, |
| String bucketName, String prefix) throws OMException { |
| Preconditions.checkNotNull(volumeName); |
| Preconditions.checkNotNull(bucketName); |
| |
| metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| try { |
| |
| List<String> multipartUploadKeys = |
| metadataManager |
| .getMultipartUploadKeys(volumeName, bucketName, prefix); |
| |
| List<OmMultipartUpload> collect = multipartUploadKeys.stream() |
| .map(OmMultipartUpload::from) |
| .map(upload -> { |
| String dbKey = metadataManager |
| .getOzoneKey(upload.getVolumeName(), |
| upload.getBucketName(), |
| upload.getKeyName()); |
| try { |
| Table<String, OmKeyInfo> openKeyTable = |
| metadataManager.getOpenKeyTable(); |
| |
| OmKeyInfo omKeyInfo = |
| openKeyTable.get(upload.getDbKey()); |
| |
| upload.setCreationTime( |
| Instant.ofEpochMilli(omKeyInfo.getCreationTime())); |
| |
| upload.setReplicationType(omKeyInfo.getType()); |
| upload.setReplicationFactor(omKeyInfo.getFactor()); |
| } catch (IOException e) { |
| LOG.warn( |
| "Open key entry for multipart upload record can be read {}", |
| dbKey); |
| } |
| return upload; |
| }) |
| .collect(Collectors.toList()); |
| |
| return new OmMultipartUploadList(collect); |
| |
| } catch (IOException ex) { |
| LOG.error("List Multipart Uploads Failed: volume: " + volumeName + |
| "bucket: " + bucketName + "prefix: " + prefix, ex); |
| throw new OMException(ex.getMessage(), ResultCodes |
| .LIST_MULTIPART_UPLOAD_PARTS_FAILED); |
| } finally { |
| metadataManager.getLock().releaseReadLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| } |
| } |
| |
| @Override |
| public OmMultipartUploadListParts listParts(String volumeName, |
| String bucketName, String keyName, String uploadID, |
| int partNumberMarker, int maxParts) throws IOException { |
| Preconditions.checkNotNull(volumeName); |
| Preconditions.checkNotNull(bucketName); |
| Preconditions.checkNotNull(keyName); |
| Preconditions.checkNotNull(uploadID); |
| boolean isTruncated = false; |
| int nextPartNumberMarker = 0; |
| |
| metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| try { |
| String multipartKey = metadataManager.getMultipartKey(volumeName, |
| bucketName, keyName, uploadID); |
| |
| OmMultipartKeyInfo multipartKeyInfo = |
| metadataManager.getMultipartInfoTable().get(multipartKey); |
| |
| if (multipartKeyInfo == null) { |
| throw new OMException("No Such Multipart upload exists for this key.", |
| ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR); |
| } else { |
| TreeMap<Integer, PartKeyInfo> partKeyInfoMap = |
| multipartKeyInfo.getPartKeyInfoMap(); |
| Iterator<Map.Entry<Integer, PartKeyInfo>> partKeyInfoMapIterator = |
| partKeyInfoMap.entrySet().iterator(); |
| |
| HddsProtos.ReplicationType replicationType = null; |
| HddsProtos.ReplicationFactor replicationFactor = null; |
| |
| int count = 0; |
| List<OmPartInfo> omPartInfoList = new ArrayList<>(); |
| |
| while (count < maxParts && partKeyInfoMapIterator.hasNext()) { |
| Map.Entry<Integer, PartKeyInfo> partKeyInfoEntry = |
| partKeyInfoMapIterator.next(); |
| nextPartNumberMarker = partKeyInfoEntry.getKey(); |
| // As we should return only parts with part number greater |
| // than part number marker |
| if (partKeyInfoEntry.getKey() > partNumberMarker) { |
| PartKeyInfo partKeyInfo = partKeyInfoEntry.getValue(); |
| OmPartInfo omPartInfo = new OmPartInfo(partKeyInfo.getPartNumber(), |
| partKeyInfo.getPartName(), |
| partKeyInfo.getPartKeyInfo().getModificationTime(), |
| partKeyInfo.getPartKeyInfo().getDataSize()); |
| omPartInfoList.add(omPartInfo); |
| |
| //if there are parts, use replication type from one of the parts |
| replicationType = partKeyInfo.getPartKeyInfo().getType(); |
| replicationFactor = partKeyInfo.getPartKeyInfo().getFactor(); |
| count++; |
| } |
| } |
| |
| if (replicationType == null) { |
| //if there are no parts, use the replicationType from the open key. |
| |
| OmKeyInfo omKeyInfo = |
| metadataManager.getOpenKeyTable().get(multipartKey); |
| |
| if (omKeyInfo == null) { |
| throw new IllegalStateException( |
| "Open key is missing for multipart upload " + multipartKey); |
| } |
| |
| replicationType = omKeyInfo.getType(); |
| replicationFactor = omKeyInfo.getFactor(); |
| } |
| Preconditions.checkNotNull(replicationType, |
| "Replication type can't be identified"); |
| Preconditions.checkNotNull(replicationFactor, |
| "Replication factor can't be identified"); |
| |
| if (partKeyInfoMapIterator.hasNext()) { |
| Map.Entry<Integer, PartKeyInfo> partKeyInfoEntry = |
| partKeyInfoMapIterator.next(); |
| isTruncated = true; |
| } else { |
| isTruncated = false; |
| nextPartNumberMarker = 0; |
| } |
| OmMultipartUploadListParts omMultipartUploadListParts = |
| new OmMultipartUploadListParts(replicationType, replicationFactor, |
| nextPartNumberMarker, isTruncated); |
| omMultipartUploadListParts.addPartList(omPartInfoList); |
| return omMultipartUploadListParts; |
| } |
| } catch (OMException ex) { |
| throw ex; |
| } catch (IOException ex){ |
| LOG.error( |
| "List Multipart Upload Parts Failed: volume: {}, bucket: {}, ,key: " |
| + "{} ", |
| volumeName, bucketName, keyName, ex); |
| throw new OMException(ex.getMessage(), ResultCodes |
| .LIST_MULTIPART_UPLOAD_PARTS_FAILED); |
| } finally { |
| metadataManager.getLock().releaseReadLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| } |
| } |
| |
| /** |
| * Add acl for Ozone object. Return true if acl is added successfully else |
| * false. |
| * |
| * @param obj Ozone object for which acl should be added. |
| * @param acl ozone acl top be added. |
| * @throws IOException if there is error. |
| */ |
| @Override |
| public boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException { |
| validateOzoneObj(obj); |
| String volume = obj.getVolumeName(); |
| String bucket = obj.getBucketName(); |
| String keyName = obj.getKeyName(); |
| boolean changed = false; |
| |
| |
| metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket); |
| try { |
| validateBucket(volume, bucket); |
| String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName); |
| OmKeyInfo keyInfo = metadataManager.getKeyTable().get(objectKey); |
| if (keyInfo == null) { |
| throw new OMException("Key not found. Key:" + objectKey, KEY_NOT_FOUND); |
| } |
| |
| if (keyInfo.getAcls() == null) { |
| keyInfo.setAcls(new ArrayList<>()); |
| } |
| changed = keyInfo.addAcl(acl); |
| if (changed) { |
| metadataManager.getKeyTable().put(objectKey, keyInfo); |
| } |
| } catch (IOException ex) { |
| if (!(ex instanceof OMException)) { |
| LOG.error("Add acl operation failed for key:{}/{}/{}", volume, |
| bucket, keyName, ex); |
| } |
| throw ex; |
| } finally { |
| metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket); |
| } |
| return changed; |
| } |
| |
| /** |
| * Remove acl for Ozone object. Return true if acl is removed successfully |
| * else false. |
| * |
| * @param obj Ozone object. |
| * @param acl Ozone acl to be removed. |
| * @throws IOException if there is error. |
| */ |
| @Override |
| public boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException { |
| validateOzoneObj(obj); |
| String volume = obj.getVolumeName(); |
| String bucket = obj.getBucketName(); |
| String keyName = obj.getKeyName(); |
| boolean changed = false; |
| |
| metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket); |
| try { |
| validateBucket(volume, bucket); |
| String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName); |
| OmKeyInfo keyInfo = metadataManager.getKeyTable().get(objectKey); |
| if (keyInfo == null) { |
| throw new OMException("Key not found. Key:" + objectKey, KEY_NOT_FOUND); |
| } |
| |
| changed = keyInfo.removeAcl(acl); |
| if (changed) { |
| metadataManager.getKeyTable().put(objectKey, keyInfo); |
| } |
| } catch (IOException ex) { |
| if (!(ex instanceof OMException)) { |
| LOG.error("Remove acl operation failed for key:{}/{}/{}", volume, |
| bucket, keyName, ex); |
| } |
| throw ex; |
| } finally { |
| metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket); |
| } |
| return changed; |
| } |
| |
| /** |
| * Acls to be set for given Ozone object. This operations reset ACL for given |
| * object to list of ACLs provided in argument. |
| * |
| * @param obj Ozone object. |
| * @param acls List of acls. |
| * @throws IOException if there is error. |
| */ |
| @Override |
| public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException { |
| validateOzoneObj(obj); |
| String volume = obj.getVolumeName(); |
| String bucket = obj.getBucketName(); |
| String keyName = obj.getKeyName(); |
| boolean changed = false; |
| |
| metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket); |
| try { |
| validateBucket(volume, bucket); |
| String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName); |
| OmKeyInfo keyInfo = metadataManager.getKeyTable().get(objectKey); |
| if (keyInfo == null) { |
| throw new OMException("Key not found. Key:" + objectKey, KEY_NOT_FOUND); |
| } |
| |
| changed = keyInfo.setAcls(acls); |
| |
| if (changed) { |
| metadataManager.getKeyTable().put(objectKey, keyInfo); |
| } |
| } catch (IOException ex) { |
| if (!(ex instanceof OMException)) { |
| LOG.error("Set acl operation failed for key:{}/{}/{}", volume, |
| bucket, keyName, ex); |
| } |
| throw ex; |
| } finally { |
| metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket); |
| } |
| return changed; |
| } |
| |
| /** |
| * Returns list of ACLs for given Ozone object. |
| * |
| * @param obj Ozone object. |
| * @throws IOException if there is error. |
| */ |
| @Override |
| public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException { |
| validateOzoneObj(obj); |
| String volume = obj.getVolumeName(); |
| String bucket = obj.getBucketName(); |
| String keyName = obj.getKeyName(); |
| |
| metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volume, bucket); |
| try { |
| validateBucket(volume, bucket); |
| String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName); |
| OmKeyInfo keyInfo = metadataManager.getKeyTable().get(objectKey); |
| if (keyInfo == null) { |
| throw new OMException("Key not found. Key:" + objectKey, KEY_NOT_FOUND); |
| } |
| |
| return keyInfo.getAcls(); |
| } catch (IOException ex) { |
| if (!(ex instanceof OMException)) { |
| LOG.error("Get acl operation failed for key:{}/{}/{}", volume, |
| bucket, keyName, ex); |
| } |
| throw ex; |
| } finally { |
| metadataManager.getLock().releaseReadLock(BUCKET_LOCK, volume, bucket); |
| } |
| } |
| |
| /** |
| * Check access for given ozoneObject. |
| * |
| * @param ozObject object for which access needs to be checked. |
| * @param context Context object encapsulating all user related information. |
| * @return true if user has access else false. |
| */ |
| @Override |
| public boolean checkAccess(OzoneObj ozObject, RequestContext context) |
| throws OMException { |
| Objects.requireNonNull(ozObject); |
| Objects.requireNonNull(context); |
| Objects.requireNonNull(context.getClientUgi()); |
| |
| String volume = ozObject.getVolumeName(); |
| String bucket = ozObject.getBucketName(); |
| String keyName = ozObject.getKeyName(); |
| String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName); |
| OmKeyArgs args = new OmKeyArgs.Builder() |
| .setVolumeName(volume) |
| .setBucketName(bucket) |
| .setKeyName(keyName) |
| .build(); |
| |
| metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volume, bucket); |
| try { |
| validateBucket(volume, bucket); |
| OmKeyInfo keyInfo = null; |
| try { |
| OzoneFileStatus fileStatus = getFileStatus(args); |
| keyInfo = fileStatus.getKeyInfo(); |
| if (keyInfo == null) { |
| // the key does not exist, but it is a parent "dir" of some key |
| // let access be determined based on volume/bucket/prefix ACL |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("key:{} is non-existent parent, permit access to user:{}", |
| keyName, context.getClientUgi()); |
| } |
| return true; |
| } |
| } catch (OMException e) { |
| if (e.getResult() == FILE_NOT_FOUND) { |
| keyInfo = metadataManager.getOpenKeyTable().get(objectKey); |
| } |
| } |
| |
| if (keyInfo == null) { |
| throw new OMException("Key not found, checkAccess failed. Key:" + |
| objectKey, KEY_NOT_FOUND); |
| } |
| |
| boolean hasAccess = OzoneAclUtil.checkAclRight( |
| keyInfo.getAcls(), context); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("user:{} has access rights for key:{} :{} ", |
| context.getClientUgi(), ozObject.getKeyName(), hasAccess); |
| } |
| return hasAccess; |
| } catch (IOException ex) { |
| if(ex instanceof OMException) { |
| throw (OMException) ex; |
| } |
| LOG.error("CheckAccess operation failed for key:{}/{}/{}", volume, |
| bucket, keyName, ex); |
| throw new OMException("Check access operation failed for " + |
| "key:" + keyName, ex, INTERNAL_ERROR); |
| } finally { |
| metadataManager.getLock().releaseReadLock(BUCKET_LOCK, volume, bucket); |
| } |
| } |
| |
| /** |
| * Helper method to validate ozone object. |
| * @param obj |
| * */ |
| private void validateOzoneObj(OzoneObj obj) throws OMException { |
| Objects.requireNonNull(obj); |
| |
| if (!obj.getResourceType().equals(KEY)) { |
| throw new IllegalArgumentException("Unexpected argument passed to " + |
| "KeyManager. OzoneObj type:" + obj.getResourceType()); |
| } |
| String volume = obj.getVolumeName(); |
| String bucket = obj.getBucketName(); |
| String keyName = obj.getKeyName(); |
| |
| if (Strings.isNullOrEmpty(volume)) { |
| throw new OMException("Volume name is required.", VOLUME_NOT_FOUND); |
| } |
| if (Strings.isNullOrEmpty(bucket)) { |
| throw new OMException("Bucket name is required.", BUCKET_NOT_FOUND); |
| } |
| if (Strings.isNullOrEmpty(keyName)) { |
| throw new OMException("Key name is required.", KEY_NOT_FOUND); |
| } |
| } |
| |
| /** |
| * OzoneFS api to get file status for an entry. |
| * |
| * @param args Key args |
| * @throws OMException if file does not exist |
| * if bucket does not exist |
| * @throws IOException if there is error in the db |
| * invalid arguments |
| */ |
| public OzoneFileStatus getFileStatus(OmKeyArgs args) throws IOException { |
| Preconditions.checkNotNull(args, "Key args can not be null"); |
| String volumeName = args.getVolumeName(); |
| String bucketName = args.getBucketName(); |
| String keyName = args.getKeyName(); |
| |
| metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| try { |
| // Check if this is the root of the filesystem. |
| if (keyName.length() == 0) { |
| validateBucket(volumeName, bucketName); |
| return new OzoneFileStatus(OZONE_URI_DELIMITER); |
| } |
| |
| // Check if the key is a file. |
| String fileKeyBytes = metadataManager.getOzoneKey( |
| volumeName, bucketName, keyName); |
| OmKeyInfo fileKeyInfo = metadataManager.getKeyTable().get(fileKeyBytes); |
| if (fileKeyInfo != null) { |
| // this is a file |
| return new OzoneFileStatus(fileKeyInfo, scmBlockSize, false); |
| } |
| |
| String dirKey = OzoneFSUtils.addTrailingSlashIfNeeded(keyName); |
| String dirKeyBytes = metadataManager.getOzoneKey( |
| volumeName, bucketName, dirKey); |
| OmKeyInfo dirKeyInfo = metadataManager.getKeyTable().get(dirKeyBytes); |
| if (dirKeyInfo != null) { |
| return new OzoneFileStatus(dirKeyInfo, scmBlockSize, true); |
| } |
| |
| List<OmKeyInfo> keys = metadataManager.listKeys(volumeName, bucketName, |
| null, dirKey, 1); |
| if (keys.iterator().hasNext()) { |
| return new OzoneFileStatus(keyName); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Unable to get file status for the key: volume: {}, bucket:" + |
| " {}, key: {}, with error: No such file exists.", volumeName, |
| bucketName, keyName); |
| } |
| throw new OMException("Unable to get file status: volume: " + |
| volumeName + " bucket: " + bucketName + " key: " + keyName, |
| FILE_NOT_FOUND); |
| } finally { |
| metadataManager.getLock().releaseReadLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| } |
| } |
| |
| /** |
| * Ozone FS api to create a directory. Parent directories if do not exist |
| * are created for the input directory. |
| * |
| * @param args Key args |
| * @throws OMException if any entry in the path exists as a file |
| * if bucket does not exist |
| * @throws IOException if there is error in the db |
| * invalid arguments |
| */ |
| public void createDirectory(OmKeyArgs args) throws IOException { |
| Preconditions.checkNotNull(args, "Key args can not be null"); |
| String volumeName = args.getVolumeName(); |
| String bucketName = args.getBucketName(); |
| String keyName = args.getKeyName(); |
| |
| metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName); |
| try { |
| |
| // Check if this is the root of the filesystem. |
| if (keyName.length() == 0) { |
| return; |
| } |
| |
| Path keyPath = Paths.get(keyName); |
| OzoneFileStatus status = |
| verifyNoFilesInPath(volumeName, bucketName, keyPath, false); |
| if (status != null && OzoneFSUtils.pathToKey(status.getPath()) |
| .equals(keyName)) { |
| // if directory already exists |
| return; |
| } |
| OmKeyInfo dirDbKeyInfo = |
| createDirectoryKey(volumeName, bucketName, keyName, args.getAcls()); |
| String dirDbKey = metadataManager |
| .getOzoneKey(volumeName, bucketName, dirDbKeyInfo.getKeyName()); |
| metadataManager.getKeyTable().put(dirDbKey, dirDbKeyInfo); |
| } finally { |
| metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| } |
| } |
| |
| private OmKeyInfo createDirectoryKey(String volumeName, String bucketName, |
| String keyName, List<OzoneAcl> acls) throws IOException { |
| // verify bucket exists |
| OmBucketInfo bucketInfo = getBucketInfo(volumeName, bucketName); |
| |
| String dir = OzoneFSUtils.addTrailingSlashIfNeeded(keyName); |
| FileEncryptionInfo encInfo = getFileEncryptionInfo(bucketInfo); |
| return new OmKeyInfo.Builder() |
| .setVolumeName(volumeName) |
| .setBucketName(bucketName) |
| .setKeyName(dir) |
| .setOmKeyLocationInfos(Collections.singletonList( |
| new OmKeyLocationInfoGroup(0, new ArrayList<>()))) |
| .setCreationTime(Time.now()) |
| .setModificationTime(Time.now()) |
| .setDataSize(0) |
| .setReplicationType(ReplicationType.RATIS) |
| .setReplicationFactor(ReplicationFactor.ONE) |
| .setFileEncryptionInfo(encInfo) |
| .setAcls(acls) |
| .build(); |
| } |
| |
| /** |
| * OzoneFS api to creates an output stream for a file. |
| * |
| * @param args Key args |
| * @param isOverWrite if true existing file at the location will be |
| * overwritten |
| * @param isRecursive if true file would be created even if parent |
| * directories do not exist |
| * @throws OMException if given key is a directory |
| * if file exists and isOverwrite flag is false |
| * if an ancestor exists as a file |
| * if bucket does not exist |
| * @throws IOException if there is error in the db |
| * invalid arguments |
| */ |
| @Override |
| public OpenKeySession createFile(OmKeyArgs args, boolean isOverWrite, |
| boolean isRecursive) throws IOException { |
| Preconditions.checkNotNull(args, "Key args can not be null"); |
| String volumeName = args.getVolumeName(); |
| String bucketName = args.getBucketName(); |
| String keyName = args.getKeyName(); |
| OpenKeySession keySession; |
| |
| metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName); |
| try { |
| OzoneFileStatus fileStatus; |
| try { |
| fileStatus = getFileStatus(args); |
| if (fileStatus.isDirectory()) { |
| throw new OMException("Can not write to directory: " + keyName, |
| ResultCodes.NOT_A_FILE); |
| } else if (fileStatus.isFile()) { |
| if (!isOverWrite) { |
| throw new OMException("File " + keyName + " already exists", |
| ResultCodes.FILE_ALREADY_EXISTS); |
| } |
| } |
| } catch (OMException ex) { |
| if (ex.getResult() != FILE_NOT_FOUND) { |
| throw ex; |
| } |
| } |
| |
| verifyNoFilesInPath(volumeName, bucketName, |
| Paths.get(keyName).getParent(), !isRecursive); |
| // TODO: Optimize call to openKey as keyInfo is already available in the |
| // filestatus. We can avoid some operations in openKey call. |
| keySession = openKey(args); |
| } finally { |
| metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| } |
| |
| return keySession; |
| } |
| |
| /** |
| * OzoneFS api to lookup for a file. |
| * |
| * @param args Key args |
| * @throws OMException if given key is not found or it is not a file |
| * if bucket does not exist |
| * @throws IOException if there is error in the db |
| * invalid arguments |
| */ |
| @Override |
| public OmKeyInfo lookupFile(OmKeyArgs args, String clientAddress) |
| throws IOException { |
| Preconditions.checkNotNull(args, "Key args can not be null"); |
| String volumeName = args.getVolumeName(); |
| String bucketName = args.getBucketName(); |
| String keyName = args.getKeyName(); |
| |
| metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| try { |
| OzoneFileStatus fileStatus = getFileStatus(args); |
| if (fileStatus.isFile()) { |
| if (args.getSortDatanodes()) { |
| sortDatanodeInPipeline(fileStatus.getKeyInfo(), clientAddress); |
| } |
| return fileStatus.getKeyInfo(); |
| } |
| //if key is not of type file or if key is not found we throw an exception |
| } finally { |
| metadataManager.getLock().releaseReadLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| } |
| |
| throw new OMException("Can not write to directory: " + keyName, |
| ResultCodes.NOT_A_FILE); |
| } |
| |
| /** |
| * List the status for a file or a directory and its contents. |
| * |
| * @param args Key args |
| * @param recursive For a directory if true all the descendants of a |
| * particular directory are listed |
| * @param startKey Key from which listing needs to start. If startKey exists |
| * its status is included in the final list. |
| * @param numEntries Number of entries to list from the start key |
| * @return list of file status |
| */ |
| public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive, |
| String startKey, long numEntries) throws IOException { |
| Preconditions.checkNotNull(args, "Key args can not be null"); |
| String volumeName = args.getVolumeName(); |
| String bucketName = args.getBucketName(); |
| String keyName = args.getKeyName(); |
| |
| List<OzoneFileStatus> fileStatusList = new ArrayList<>(); |
| metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| try { |
| if (Strings.isNullOrEmpty(startKey)) { |
| OzoneFileStatus fileStatus = getFileStatus(args); |
| if (fileStatus.isFile()) { |
| return Collections.singletonList(fileStatus); |
| } |
| startKey = OzoneFSUtils.addTrailingSlashIfNeeded(keyName); |
| } |
| |
| String seekKeyInDb = |
| metadataManager.getOzoneKey(volumeName, bucketName, startKey); |
| String keyInDb = OzoneFSUtils.addTrailingSlashIfNeeded( |
| metadataManager.getOzoneKey(volumeName, bucketName, keyName)); |
| TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> |
| iterator = metadataManager.getKeyTable().iterator(); |
| iterator.seek(seekKeyInDb); |
| |
| if (!iterator.hasNext()) { |
| return Collections.emptyList(); |
| } |
| |
| if (iterator.key().equals(keyInDb)) { |
| // skip the key which needs to be listed |
| iterator.next(); |
| } |
| |
| while (iterator.hasNext() && numEntries - fileStatusList.size() > 0) { |
| String entryInDb = iterator.key(); |
| OmKeyInfo value = iterator.value().getValue(); |
| if (entryInDb.startsWith(keyInDb)) { |
| String entryKeyName = value.getKeyName(); |
| if (recursive) { |
| // for recursive list all the entries |
| fileStatusList.add(new OzoneFileStatus(value, scmBlockSize, |
| !OzoneFSUtils.isFile(entryKeyName))); |
| iterator.next(); |
| } else { |
| // get the child of the directory to list from the entry. For |
| // example if directory to list is /a and entry is /a/b/c where |
| // c is a file. The immediate child is b which is a directory. c |
| // should not be listed as child of a. |
| String immediateChild = OzoneFSUtils |
| .getImmediateChild(entryKeyName, keyName); |
| boolean isFile = OzoneFSUtils.isFile(immediateChild); |
| if (isFile) { |
| fileStatusList |
| .add(new OzoneFileStatus(value, scmBlockSize, !isFile)); |
| iterator.next(); |
| } else { |
| // if entry is a directory |
| fileStatusList.add(new OzoneFileStatus(immediateChild)); |
| // skip the other descendants of this child directory. |
| iterator.seek( |
| getNextGreaterString(volumeName, bucketName, immediateChild)); |
| } |
| } |
| } else { |
| break; |
| } |
| } |
| } finally { |
| metadataManager.getLock().releaseReadLock(BUCKET_LOCK, volumeName, |
| bucketName); |
| } |
| return fileStatusList; |
| } |
| |
| private String getNextGreaterString(String volumeName, String bucketName, |
| String keyPrefix) throws IOException { |
| // Increment the last character of the string and return the new ozone key. |
| Preconditions.checkArgument(!Strings.isNullOrEmpty(keyPrefix), |
| "Key prefix is null or empty"); |
| CodecRegistry codecRegistry = |
| ((RDBStore) metadataManager.getStore()).getCodecRegistry(); |
| byte[] keyPrefixInBytes = codecRegistry.asRawData(keyPrefix); |
| keyPrefixInBytes[keyPrefixInBytes.length - 1]++; |
| String nextPrefix = codecRegistry.asObject(keyPrefixInBytes, String.class); |
| return metadataManager.getOzoneKey(volumeName, bucketName, nextPrefix); |
| } |
| |
| /** |
| * Verify that none of the parent path exists as file in the filesystem. |
| * |
| * @param volumeName Volume name |
| * @param bucketName Bucket name |
| * @param path Directory path. This is the absolute path of the |
| * directory for the ozone filesystem. |
| * @param directoryMustExist throws exception if true and given path does not |
| * exist as directory |
| * @return OzoneFileStatus of the first directory found in path in reverse |
| * order |
| * @throws OMException if ancestor exists as file in the filesystem |
| * if directoryMustExist flag is true and parent does |
| * not exist |
| * if bucket does not exist |
| * @throws IOException if there is error in the db |
| * invalid arguments |
| */ |
| private OzoneFileStatus verifyNoFilesInPath(String volumeName, |
| String bucketName, Path path, boolean directoryMustExist) |
| throws IOException { |
| OmKeyArgs.Builder argsBuilder = new OmKeyArgs.Builder() |
| .setVolumeName(volumeName) |
| .setBucketName(bucketName); |
| while (path != null) { |
| String keyName = path.toString(); |
| try { |
| OzoneFileStatus fileStatus = |
| getFileStatus(argsBuilder.setKeyName(keyName).build()); |
| if (fileStatus.isFile()) { |
| LOG.error("Unable to create directory (File already exists): volume: " |
| + volumeName + "bucket: " + bucketName + "key: " + keyName); |
| throw new OMException( |
| "Unable to create directory at : volume: " + volumeName |
| + "bucket: " + bucketName + "key: " + keyName, |
| ResultCodes.FILE_ALREADY_EXISTS); |
| } else if (fileStatus.isDirectory()) { |
| return fileStatus; |
| } |
| } catch (OMException ex) { |
| if (ex.getResult() != FILE_NOT_FOUND) { |
| throw ex; |
| } else if (ex.getResult() == FILE_NOT_FOUND) { |
| if (directoryMustExist) { |
| throw new OMException("Parent directory does not exist", |
| ex.getCause(), DIRECTORY_NOT_FOUND); |
| } |
| } |
| } |
| path = path.getParent(); |
| } |
| return null; |
| } |
| |
| private FileEncryptionInfo getFileEncryptionInfo(OmBucketInfo bucketInfo) |
| throws IOException { |
| FileEncryptionInfo encInfo = null; |
| BucketEncryptionKeyInfo ezInfo = bucketInfo.getEncryptionKeyInfo(); |
| if (ezInfo != null) { |
| if (getKMSProvider() == null) { |
| throw new OMException("Invalid KMS provider, check configuration " + |
| HADOOP_SECURITY_KEY_PROVIDER_PATH, |
| INVALID_KMS_PROVIDER); |
| } |
| |
| final String ezKeyName = ezInfo.getKeyName(); |
| EncryptedKeyVersion edek = generateEDEK(ezKeyName); |
| encInfo = new FileEncryptionInfo(ezInfo.getSuite(), ezInfo.getVersion(), |
| edek.getEncryptedKeyVersion().getMaterial(), |
| edek.getEncryptedKeyIv(), |
| ezKeyName, edek.getEncryptionKeyVersionName()); |
| } |
| return encInfo; |
| } |
| |
| private void sortDatanodeInPipeline(OmKeyInfo keyInfo, String clientMachine) { |
| if (keyInfo != null && clientMachine != null && !clientMachine.isEmpty()) { |
| for (OmKeyLocationInfoGroup key : keyInfo.getKeyLocationVersions()) { |
| key.getLocationList().forEach(k -> { |
| List<DatanodeDetails> nodes = k.getPipeline().getNodes(); |
| if (nodes == null || nodes.size() == 0) { |
| LOG.warn("Datanodes for pipeline {} is empty", |
| k.getPipeline().getId().toString()); |
| return; |
| } |
| List<String> nodeList = new ArrayList<>(); |
| nodes.stream().forEach(node -> |
| nodeList.add(node.getUuidString())); |
| try { |
| List<DatanodeDetails> sortedNodes = scmClient.getBlockClient() |
| .sortDatanodes(nodeList, clientMachine); |
| k.getPipeline().setNodesInOrder(sortedNodes); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Sort datanodes {} for client {}, return {}", nodes, |
| clientMachine, sortedNodes); |
| } |
| } catch (IOException e) { |
| LOG.warn("Unable to sort datanodes based on distance to " + |
| "client, volume=" + keyInfo.getVolumeName() + |
| ", bucket=" + keyInfo.getBucketName() + |
| ", key=" + keyInfo.getKeyName() + |
| ", client=" + clientMachine + |
| ", datanodes=" + nodes.toString() + |
| ", exception=" + e.getMessage()); |
| } |
| }); |
| } |
| } |
| } |
| } |