Merge remote-tracking branch 'origin/master' into HDDS-10656-atomic-key-overwrite
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 79ba9bb..273065b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -342,6 +342,7 @@ private OzoneConsts() {
public static final String BUCKET_LAYOUT = "bucketLayout";
public static final String TENANT = "tenant";
public static final String USER_PREFIX = "userPrefix";
+ public static final String REWRITE_GENERATION = "rewriteGeneration";
// For multi-tenancy
public static final String TENANT_ID_USERNAME_DELIMITER = "$";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java
index c55945d..c53aca2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java
@@ -42,6 +42,8 @@ public enum OzoneManagerVersion implements ComponentVersion {
OBJECT_TAG(5, "OzoneManager version that supports object tags"),
+ ATOMIC_REWRITE_KEY(6, "OzoneManager version that supports rewriting key as atomic operation"),
+
FUTURE_VERSION(-1, "Used internally in the client when the server side is "
+ " newer and an unknown server version has arrived to the client.");
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index 575701b..216b51b 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -491,6 +491,28 @@ public OzoneOutputStream createKey(String key, long size,
}
/**
+ * Atomically rewrites an existing key. The key read before invoking this API
+ * must remain unchanged for the new data to be written. This is controlled by the
+ * existingKeyGeneration param. If the key is replaced or updated, its generation changes. If the
+ * generation has changed since the existing key was read, either the initial key create will fail,
+ * or the key will fail to commit after the data has been written, as the checks are carried out
+ * both at key open and commit time.
+ *
+ * @param keyName Existing key to rewrite. This must exist in the bucket.
+ * @param size The size of the new key
+ * @param existingKeyGeneration The generation of the existing key which is checked for changes at key create
+ * and commit time.
+ * @param replicationConfig The replication configuration for the key to be rewritten.
+ * @param metadata custom key value metadata
+ * @return OzoneOutputStream to which the data has to be written.
+ * @throws IOException
+ */
+ public OzoneOutputStream rewriteKey(String keyName, long size, long existingKeyGeneration,
+ ReplicationConfig replicationConfig, Map<String, String> metadata) throws IOException {
+ return proxy.rewriteKey(volumeName, name, keyName, size, existingKeyGeneration, replicationConfig, metadata);
+ }
+
+ /**
* Creates a new key in the bucket, with default replication type RATIS and
* with replication factor THREE.
*
@@ -1862,8 +1884,7 @@ private void addKeyPrefixInfoToResultList(String keyPrefix,
keyInfo.getDataSize(), keyInfo.getCreationTime(),
keyInfo.getModificationTime(),
keyInfo.getReplicationConfig(),
- keyInfo.isFile(),
- keyInfo.getOwnerName());
+ keyInfo.isFile(), keyInfo.getOwnerName());
keysResultList.add(ozoneKey);
}
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java
index fdd89fe..3b8d7ff 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java
@@ -72,10 +72,6 @@ public class OzoneKey {
*/
private final boolean isFile;
- /**
- * Constructs OzoneKey from OmKeyInfo.
- *
- */
@SuppressWarnings("parameternumber")
public OzoneKey(String volumeName, String bucketName,
String keyName, long size, long creationTime,
@@ -219,6 +215,10 @@ public boolean isFile() {
return isFile;
}
+ /**
+ * Constructs OzoneKey from OmKeyInfo.
+ *
+ */
public static OzoneKey fromKeyInfo(OmKeyInfo keyInfo) {
return new OzoneKey(keyInfo.getVolumeName(), keyInfo.getBucketName(),
keyInfo.getKeyName(), keyInfo.getDataSize(), keyInfo.getCreationTime(),
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java
index 168e15d..fe54779 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java
@@ -43,6 +43,12 @@ public class OzoneKeyDetails extends OzoneKey {
private final CheckedSupplier<OzoneInputStream, IOException> contentSupplier;
/**
+ * The generation of an existing key. This can be used with atomic commits, to
+ * ensure the key has not changed since the key details were read.
+ */
+ private final Long generation;
+
+ /**
* Constructs OzoneKeyDetails from OmKeyInfo.
*/
@SuppressWarnings("parameternumber")
@@ -53,12 +59,30 @@ public OzoneKeyDetails(String volumeName, String bucketName, String keyName,
Map<String, String> metadata,
FileEncryptionInfo feInfo,
CheckedSupplier<OzoneInputStream, IOException> contentSupplier,
- boolean isFile, String owner, Map<String, String> tags) {
+ boolean isFile, String owner, Map<String, String> tags, Long generation) {
super(volumeName, bucketName, keyName, size, creationTime,
modificationTime, replicationConfig, metadata, isFile, owner, tags);
this.ozoneKeyLocations = ozoneKeyLocations;
this.feInfo = feInfo;
this.contentSupplier = contentSupplier;
+ this.generation = generation;
+ }
+
+ /**
+ * Constructs OzoneKeyDetails without a generation; the generation is set to null.
+ */
+ @SuppressWarnings("parameternumber")
+ public OzoneKeyDetails(String volumeName, String bucketName, String keyName,
+ long size, long creationTime, long modificationTime,
+ List<OzoneKeyLocation> ozoneKeyLocations,
+ ReplicationConfig replicationConfig,
+ Map<String, String> metadata,
+ FileEncryptionInfo feInfo,
+ CheckedSupplier<OzoneInputStream, IOException> contentSupplier,
+ boolean isFile, String owner, Map<String, String> tags) {
+ this(volumeName, bucketName, keyName, size, creationTime,
+ modificationTime, ozoneKeyLocations, replicationConfig, metadata, feInfo, contentSupplier,
+ isFile, owner, tags, null);
}
/**
@@ -72,6 +96,10 @@ public FileEncryptionInfo getFileEncryptionInfo() {
return feInfo;
}
+ public Long getGeneration() {
+ return generation;
+ }
+
/**
* Get OzoneInputStream to read the content of the key.
* @return OzoneInputStream
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 7434434..55985a1 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -356,6 +356,29 @@ OzoneOutputStream createKey(String volumeName, String bucketName,
throws IOException;
/**
+ * Atomically rewrites an existing key. The key read before invoking this API
+ * must remain unchanged for the new data to be written. This is controlled by the
+ * existingKeyGeneration param. If the key is replaced or updated, its generation changes. If the
+ * generation has changed since the existing key was read, either the initial key create will fail,
+ * or the key will fail to commit after the data has been written, as the checks are carried out
+ * both at key open and commit time.
+ *
+ * @param volumeName Name of the Volume
+ * @param bucketName Name of the Bucket
+ * @param keyName Existing key to rewrite. This must exist in the bucket.
+ * @param size The size of the new key
+ * @param existingKeyGeneration The generation of the existing key which is checked for changes at key create
+ * and commit time.
+ * @param replicationConfig The replication configuration for the key to be rewritten.
+ * @param metadata custom key value metadata
+ * @return {@link OzoneOutputStream}
+ * @throws IOException
+ */
+ OzoneOutputStream rewriteKey(String volumeName, String bucketName, String keyName,
+ long size, long existingKeyGeneration, ReplicationConfig replicationConfig,
+ Map<String, String> metadata) throws IOException;
+
+ /**
* Writes a key in an existing bucket.
* @param volumeName Name of the Volume
* @param bucketName Name of the Bucket
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 248f21c..ac0cf1d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -1396,22 +1396,7 @@ public OzoneOutputStream createKey(
String volumeName, String bucketName, String keyName, long size,
ReplicationConfig replicationConfig,
Map<String, String> metadata, Map<String, String> tags) throws IOException {
- verifyVolumeName(volumeName);
- verifyBucketName(bucketName);
- if (checkKeyNameEnabled) {
- HddsClientUtils.verifyKeyName(keyName);
- }
- HddsClientUtils.checkNotNull(keyName);
- if (omVersion
- .compareTo(OzoneManagerVersion.ERASURE_CODED_STORAGE_SUPPORT) < 0) {
- if (replicationConfig != null &&
- replicationConfig.getReplicationType()
- == HddsProtos.ReplicationType.EC) {
- throw new IOException("Can not set the replication of the key to"
- + " Erasure Coded replication, as OzoneManager does not support"
- + " Erasure Coded replication.");
- }
- }
+ createKeyPreChecks(volumeName, bucketName, keyName, replicationConfig);
if (omVersion.compareTo(OzoneManagerVersion.OBJECT_TAG) < 0) {
if (tags != null && !tags.isEmpty()) {
@@ -1419,9 +1404,6 @@ public OzoneOutputStream createKey(
}
}
- if (replicationConfig != null) {
- replicationConfigValidator.validate(replicationConfig);
- }
String ownerName = getRealUserInfo().getShortUserName();
OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
@@ -1448,6 +1430,61 @@ public OzoneOutputStream createKey(
}
@Override
+ public OzoneOutputStream rewriteKey(String volumeName, String bucketName, String keyName,
+ long size, long existingKeyGeneration, ReplicationConfig replicationConfig,
+ Map<String, String> metadata) throws IOException {
+ if (omVersion.compareTo(OzoneManagerVersion.ATOMIC_REWRITE_KEY) < 0) {
+ throw new IOException("OzoneManager does not support atomic key rewrite.");
+ }
+
+ createKeyPreChecks(volumeName, bucketName, keyName, replicationConfig);
+
+ OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setKeyName(keyName)
+ .setDataSize(size)
+ .setReplicationConfig(replicationConfig)
+ .addAllMetadataGdpr(metadata)
+ .setLatestVersionLocation(getLatestVersionLocation)
+ .setExpectedDataGeneration(existingKeyGeneration);
+
+ OpenKeySession openKey = ozoneManagerClient.openKey(builder.build());
+ // For a bucket with layout OBJECT_STORE, when creating an empty file (size=0),
+ // OM will set DataSize to OzoneConfigKeys#OZONE_SCM_BLOCK_SIZE,
+ // which will cause S3G's atomic write length check to fail,
+ // so reset size to 0 here.
+ if (isS3GRequest.get() && size == 0) {
+ openKey.getKeyInfo().setDataSize(0);
+ }
+ return createOutputStream(openKey);
+ }
+
+ private void createKeyPreChecks(String volumeName, String bucketName, String keyName,
+ ReplicationConfig replicationConfig) throws IOException {
+ verifyVolumeName(volumeName);
+ verifyBucketName(bucketName);
+ if (checkKeyNameEnabled) {
+ HddsClientUtils.verifyKeyName(keyName);
+ }
+ HddsClientUtils.checkNotNull(keyName);
+ if (omVersion
+ .compareTo(OzoneManagerVersion.ERASURE_CODED_STORAGE_SUPPORT) < 0) {
+ if (replicationConfig != null &&
+ replicationConfig.getReplicationType()
+ == HddsProtos.ReplicationType.EC) {
+ throw new IOException("Can not set the replication of the key to"
+ + " Erasure Coded replication, as OzoneManager does not support"
+ + " Erasure Coded replication.");
+ }
+ }
+
+ if (replicationConfig != null) {
+ replicationConfigValidator.validate(replicationConfig);
+ }
+ }
+
+ @Override
public OzoneDataStreamOutput createStreamKey(
String volumeName, String bucketName, String keyName, long size,
ReplicationConfig replicationConfig,
@@ -1767,8 +1804,10 @@ private OzoneKeyDetails getOzoneKeyDetails(OmKeyInfo keyInfo) {
keyInfo.getModificationTime(), ozoneKeyLocations,
keyInfo.getReplicationConfig(), keyInfo.getMetadata(),
keyInfo.getFileEncryptionInfo(),
- () -> getInputStreamWithRetryFunction(keyInfo), keyInfo.isFile(),
- keyInfo.getOwnerName(), keyInfo.getTags());
+ () -> getInputStreamWithRetryFunction(keyInfo), keyInfo.isFile(),
+ keyInfo.getOwnerName(), keyInfo.getTags(),
+ keyInfo.getGeneration()
+ );
}
@Override
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
index 19d5ab4..ba28b45 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
@@ -54,6 +54,13 @@ public final class OmKeyArgs implements Auditable {
private final boolean headOp;
private final boolean forceUpdateContainerCacheFromSCM;
private final Map<String, String> tags;
+ // expectedDataGeneration, when used in key creation indicates that a
+ // key with the same keyName should exist with the given generation.
+ // For a key commit to succeed, the original key should still be present with the
+ // generation unchanged.
+ // This allows a key to be created and committed atomically if the original has not
+ // been modified.
+ private Long expectedDataGeneration = null;
private OmKeyArgs(Builder b) {
this.volumeName = b.volumeName;
@@ -74,6 +81,7 @@ private OmKeyArgs(Builder b) {
this.forceUpdateContainerCacheFromSCM = b.forceUpdateContainerCacheFromSCM;
this.ownerName = b.ownerName;
this.tags = b.tags;
+ this.expectedDataGeneration = b.expectedDataGeneration;
}
public boolean getIsMultipartKey() {
@@ -156,6 +164,10 @@ public Map<String, String> getTags() {
return tags;
}
+ public Long getExpectedDataGeneration() {
+ return expectedDataGeneration;
+ }
+
@Override
public Map<String, String> toAuditMap() {
Map<String, String> auditMap = new LinkedHashMap<>();
@@ -179,7 +191,7 @@ public void addLocationInfo(OmKeyLocationInfo locationInfo) {
}
public OmKeyArgs.Builder toBuilder() {
- return new OmKeyArgs.Builder()
+ OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
@@ -197,11 +209,16 @@ public OmKeyArgs.Builder toBuilder() {
.setAcls(acls)
.setForceUpdateContainerCacheFromSCM(forceUpdateContainerCacheFromSCM)
.addAllTags(tags);
+
+ if (expectedDataGeneration != null) {
+ builder.setExpectedDataGeneration(expectedDataGeneration);
+ }
+ return builder;
}
@Nonnull
public KeyArgs toProtobuf() {
- return KeyArgs.newBuilder()
+ KeyArgs.Builder builder = KeyArgs.newBuilder()
.setVolumeName(getVolumeName())
.setBucketName(getBucketName())
.setKeyName(getKeyName())
@@ -210,8 +227,11 @@ public KeyArgs toProtobuf() {
.setLatestVersionLocation(getLatestVersionLocation())
.setHeadOp(isHeadOp())
.setForceUpdateContainerCacheFromSCM(
- isForceUpdateContainerCacheFromSCM())
- .build();
+ isForceUpdateContainerCacheFromSCM());
+ if (expectedDataGeneration != null) {
+ builder.setExpectedDataGeneration(expectedDataGeneration);
+ }
+ return builder.build();
}
/**
@@ -236,6 +256,7 @@ public static class Builder {
private boolean headOp;
private boolean forceUpdateContainerCacheFromSCM;
private final Map<String, String> tags = new HashMap<>();
+ private Long expectedDataGeneration = null;
public Builder setVolumeName(String volume) {
this.volumeName = volume;
@@ -345,6 +366,11 @@ public Builder setForceUpdateContainerCacheFromSCM(boolean value) {
return this;
}
+ public Builder setExpectedDataGeneration(long generation) {
+ this.expectedDataGeneration = generation;
+ return this;
+ }
+
public OmKeyArgs build() {
return new OmKeyArgs(this);
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index c8e7f8f..f52a142 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -107,6 +107,14 @@ public static Codec<OmKeyInfo> getCodec(boolean ignorePipeline) {
*/
private Map<String, String> tags;
+ // expectedDataGeneration, when used in key creation indicates that a
+ // key with the same keyName should exist with the given generation.
+ // For a key commit to succeed, the original key should still be present with the
+ // generation unchanged.
+ // This allows a key to be created and committed atomically if the original has not
+ // been modified.
+ private Long expectedDataGeneration = null;
+
private OmKeyInfo(Builder b) {
super(b);
this.volumeName = b.volumeName;
@@ -124,6 +132,7 @@ private OmKeyInfo(Builder b) {
this.isFile = b.isFile;
this.ownerName = b.ownerName;
this.tags = b.tags;
+ this.expectedDataGeneration = b.expectedDataGeneration;
}
public String getVolumeName() {
@@ -166,10 +175,26 @@ public String getFileName() {
return fileName;
}
+ public void setExpectedDataGeneration(Long generation) {
+ this.expectedDataGeneration = generation;
+ }
+
+ public Long getExpectedDataGeneration() {
+ return expectedDataGeneration;
+ }
+
public String getOwnerName() {
return ownerName;
}
+ /**
+ * Returns the generation of the object. Note this is currently the same as updateID for a key.
+ * @return long
+ */
+ public long getGeneration() {
+ return getUpdateID();
+ }
+
public synchronized OmKeyLocationInfoGroup getLatestVersionLocations() {
return keyLocationVersions.size() == 0 ? null :
keyLocationVersions.get(keyLocationVersions.size() - 1);
@@ -452,6 +477,7 @@ public static class Builder extends WithParentObjectId.Builder {
private boolean isFile;
private final Map<String, String> tags = new HashMap<>();
+ private Long expectedDataGeneration = null;
public Builder() {
}
@@ -590,6 +616,11 @@ public Builder addAllTags(Map<String, String> keyTags) {
return this;
}
+ public Builder setExpectedDataGeneration(Long existingGeneration) {
+ this.expectedDataGeneration = existingGeneration;
+ return this;
+ }
+
public OmKeyInfo build() {
return new OmKeyInfo(this);
}
@@ -695,6 +726,9 @@ private KeyInfo getProtobuf(boolean ignorePipeline, String fullKeyName,
kb.setFileEncryptionInfo(OMPBHelper.convert(encInfo));
}
kb.setIsFile(isFile);
+ if (expectedDataGeneration != null) {
+ kb.setExpectedDataGeneration(expectedDataGeneration);
+ }
if (ownerName != null) {
kb.setOwnerName(ownerName);
}
@@ -745,6 +779,9 @@ public static OmKeyInfo getFromProtobuf(KeyInfo keyInfo) throws IOException {
if (keyInfo.hasIsFile()) {
builder.setFile(keyInfo.getIsFile());
}
+ if (keyInfo.hasExpectedDataGeneration()) {
+ builder.setExpectedDataGeneration(keyInfo.getExpectedDataGeneration());
+ }
if (keyInfo.hasOwnerName()) {
builder.setOwnerName(keyInfo.getOwnerName());
@@ -863,6 +900,9 @@ public OmKeyInfo copyObject() {
if (fileChecksum != null) {
builder.setFileChecksum(fileChecksum);
}
+ if (expectedDataGeneration != null) {
+ builder.setExpectedDataGeneration(expectedDataGeneration);
+ }
return builder.build();
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 3e21494..e6a8556 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -689,8 +689,13 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
- .setKeyName(args.getKeyName())
- .setOwnerName(args.getOwner());
+ .setKeyName(args.getKeyName());
+
+ // When rewriting a key, the owner does not need to be passed, as it is inherited
+ // from the existing key. Hence it can be null.
+ if (args.getOwner() != null) {
+ keyArgs.setOwnerName(args.getOwner());
+ }
if (args.getAcls() != null) {
keyArgs.addAllAcls(args.getAcls().stream().distinct().map(a ->
@@ -733,6 +738,10 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
keyArgs.setSortDatanodes(args.getSortDatanodes());
+ if (args.getExpectedDataGeneration() != null) {
+ keyArgs.setExpectedDataGeneration(args.getExpectedDataGeneration());
+ }
+
req.setKeyArgs(keyArgs.build());
OMRequest omRequest = createOMRequest(Type.CreateKey)
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
index 4aead0c..03ab063 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
@@ -67,6 +67,7 @@ public void protobufConversion() throws IOException {
assertFalse(key.isHsync());
key.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID, "clientid");
assertTrue(key.isHsync());
+ assertEquals(5678L, key.getExpectedDataGeneration());
}
@Test
@@ -123,6 +124,7 @@ private OmKeyInfo createOmKeyInfo(ReplicationConfig replicationConfig) {
.setReplicationConfig(replicationConfig)
.addMetadata("key1", "value1")
.addMetadata("key2", "value2")
+ .setExpectedDataGeneration(5678L)
.build();
}
diff --git a/hadoop-ozone/dist/src/main/compose/common/ec-test.sh b/hadoop-ozone/dist/src/main/compose/common/ec-test.sh
index 65a6595..04df2b2 100755
--- a/hadoop-ozone/dist/src/main/compose/common/ec-test.sh
+++ b/hadoop-ozone/dist/src/main/compose/common/ec-test.sh
@@ -20,6 +20,8 @@
## Exclude virtual-host tests. This is tested separately as it requires additional config.
execute_robot_test scm -v BUCKET:erasure --exclude virtual-host s3
+execute_robot_test scm ec/rewrite.robot
+
prefix=${RANDOM}
execute_robot_test scm -v PREFIX:${prefix} ec/basic.robot
docker-compose up -d --no-recreate --scale datanode=4
diff --git a/hadoop-ozone/dist/src/main/smoketest/ec/rewrite.robot b/hadoop-ozone/dist/src/main/smoketest/ec/rewrite.robot
new file mode 100644
index 0000000..5b5df20
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/smoketest/ec/rewrite.robot
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+*** Settings ***
+Documentation Test EC shell commands
+Library OperatingSystem
+Resource ../commonlib.robot
+Resource ../lib/os.robot
+Resource ../ozone-lib/shell.robot
+Resource ../s3/commonawslib.robot
+Resource lib.resource
+Suite Setup Run Keyword if '${SECURITY_ENABLED}' == 'true' Kinit test user testuser testuser.keytab
+
+
+*** Variables ***
+${ENDPOINT_URL} http://s3g:9878
+
+
+*** Test Cases ***
+
+Rewrite Multipart Key
+ [setup] Setup v4 headers
+ ${bucket} = Create bucket with layout /s3v OBJECT_STORE
+ ${key} = Set Variable multipart.key
+ ${file} = Create Random File MB 12
+ Execute AWSS3Cli cp ${file} s3://${bucket}/${key}
+ Key Should Match Local File /s3v/${bucket}/${key} ${file}
+ Verify Key Replica Replication Config /s3v/${bucket}/${key} RATIS THREE
+
+ Execute ozone sh key rewrite -t EC -r rs-3-2-1024k /s3v/${bucket}/${key}
+
+ Key Should Match Local File /s3v/${bucket}/${key} ${file}
+ Verify Key EC Replication Config /s3v/${bucket}/${key} RS 3 2 1048576
diff --git a/hadoop-ozone/dist/src/main/smoketest/lib/os.robot b/hadoop-ozone/dist/src/main/smoketest/lib/os.robot
index cc20d6e..98f836f 100644
--- a/hadoop-ozone/dist/src/main/smoketest/lib/os.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/lib/os.robot
@@ -40,13 +40,28 @@
${checksumafter} = Execute md5sum ${file2} | awk '{print $1}'
Should Be Equal ${checksumbefore} ${checksumafter}
+Create Random File MB
+ [arguments] ${size_in_megabytes} ${path}=${EMPTY}
+ ${path} = Create Random File ${size_in_megabytes} 1048576 ${path}
+ [return] ${path}
+
+Create Random File KB
+ [arguments] ${size_in_kilobytes} ${path}=${EMPTY}
+ ${path} = Create Random File ${size_in_kilobytes} 1024 ${path}
+ [return] ${path}
+
Create Random File
- ${postfix} = Generate Random String 5 [NUMBERS]
+ [arguments] ${block_count} ${block_size} ${path}=${EMPTY}
+ ${path} = Run Keyword If '${path}' == '${EMPTY}' Get Random Filename
+ ... ELSE Set Variable ${path}
+ Execute dd if=/dev/urandom of=${path} bs=${block_size} count=${block_count} status=none
+ [return] ${path}
+
+Get Random Filename
+ ${postfix} = Generate Random String 10 [LOWER]
${tmpfile} = Set Variable /tmp/tempfile-${postfix}
File Should Not Exist ${tmpfile}
- ${content} = Set Variable "Random string"
- Create File ${tmpfile} ${content}
- [Return] ${tmpfile}
+ [return] ${tmpfile}
List All Processes
${output} = Execute ps aux
diff --git a/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell.robot b/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell.robot
index b255d51..ffefda8 100644
--- a/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell.robot
@@ -56,6 +56,13 @@
${dir} = Execute ozone envvars | grep 'HDDS_LIB_JARS_DIR' | cut -f2 -d= | sed -e "s/'//g" -e 's/"//g'
Set Environment Variable HDDS_LIB_JARS_DIR ${dir}
+Create bucket with layout
+ [Arguments] ${volume} ${layout}
+ ${postfix} = Generate Random String 10 [LOWER]
+ ${bucket} = Set Variable bucket-${postfix}
+ ${result} = Execute ozone sh bucket create --layout ${layout} ${volume}/${bucket}
+ [Return] ${bucket}
+
Create Key
[arguments] ${key} ${file} ${args}=${EMPTY}
${output} = Execute ozone sh key put ${args} ${key} ${file}
diff --git a/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell_tests.robot b/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell_tests.robot
index 56fbcf8..22805ef 100644
--- a/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell_tests.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell_tests.robot
@@ -48,7 +48,7 @@
Key Should Match Local File o3://${OM_SERVICE_ID}/vol1/bucket/passwd /etc/passwd
Compare Key With Local File with Different File
- ${random_file} = Create Random File
+ ${random_file} = Create Random File KB 42
${matches} = Compare Key With Local File o3://${OM_SERVICE_ID}/vol1/bucket/passwd ${random_file}
Should Be Equal ${matches} ${FALSE}
[Teardown] Remove File ${random_file}
diff --git a/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot b/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot
index 2c4195f..08fc692 100644
--- a/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot
@@ -22,6 +22,7 @@
Resource commonawslib.robot
Test Timeout 5 minutes
Suite Setup Setup Multipart Tests
+Suite Teardown Teardown Multipart Tests
Test Setup Generate random prefix
*** Keywords ***
@@ -29,18 +30,19 @@
Setup s3 tests
# 5MB + a bit
- Create Random File KB /tmp/part1 5121
+ Create Random File KB 5121 /tmp/part1
# 1MB - a bit
- Create Random File KB /tmp/part2 1023
+ Create Random File KB 1023 /tmp/part2
-Create Random file
- [arguments] ${size_in_megabytes}
- Execute dd if=/dev/urandom of=/tmp/part1 bs=1048576 count=${size_in_megabytes} status=none
+ Create Random File MB 10 /tmp/10mb
+ Create Random File MB 22 /tmp/22mb
+ Create Random File KB 10 /tmp/10kb
-Create Random File KB
- [arguments] ${file} ${size_in_kilobytes}
- Execute dd if=/dev/urandom of=${file} bs=1024 count=${size_in_kilobytes} status=none
+
+Teardown Multipart Tests
+ Remove Files /tmp/part1 /tmp/part2 /tmp/10mb /tmp/22mb /tmp/10kb
+
Wait Til Date Past
[arguments] ${date}
@@ -77,11 +79,9 @@
# upload we get error entity too small. So, considering further complete
# multipart upload, uploading each part as 5MB file, exception is for last part
- Run Keyword Create Random file 5
${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key ${PREFIX}/multipartKey --part-number 1 --body /tmp/part1 --upload-id ${nextUploadID}
Should contain ${result} ETag
# override part
- Run Keyword Create Random file 5
${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key ${PREFIX}/multipartKey --part-number 1 --body /tmp/part1 --upload-id ${nextUploadID}
Should contain ${result} ETag
@@ -94,7 +94,6 @@
Should contain ${result} UploadId
#upload parts
- Run Keyword Create Random file 5
${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key ${PREFIX}/multipartKey1 --part-number 1 --body /tmp/part1 --upload-id ${uploadID}
${eTag1} = Execute and checkrc echo '${result}' | jq -r '.ETag' 0
Should contain ${result} ETag
@@ -171,13 +170,11 @@
Should contain ${result} UploadId
#upload parts
- Execute echo "Part1" > /tmp/part1
- ${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key ${PREFIX}/multipartKey2 --part-number 1 --body /tmp/part1 --upload-id ${uploadID}
+ ${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key ${PREFIX}/multipartKey2 --part-number 1 --body /tmp/10kb --upload-id ${uploadID}
${eTag1} = Execute and checkrc echo '${result}' | jq -r '.ETag' 0
Should contain ${result} ETag
- Execute echo "Part2" > /tmp/part2
- ${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key ${PREFIX}/multipartKey2 --part-number 2 --body /tmp/part2 --upload-id ${uploadID}
+ ${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key ${PREFIX}/multipartKey2 --part-number 2 --body /tmp/10kb --upload-id ${uploadID}
${eTag2} = Execute and checkrc echo '${result}' | jq -r '.ETag' 0
Should contain ${result} ETag
@@ -199,7 +196,6 @@
${result} = Execute AWSS3APICli and checkrc complete-multipart-upload --upload-id ${uploadID} --bucket ${BUCKET} --key ${PREFIX}/multipartKey3 --multipart-upload 'Parts=[{ETag=etag1,PartNumber=2},{ETag=etag2,PartNumber=1}]' 255
Should contain ${result} InvalidPart
#upload parts
- Run Keyword Create Random file 5
${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key ${PREFIX}/multipartKey3 --part-number 1 --body /tmp/part1 --upload-id ${uploadID}
${eTag1} = Execute and checkrc echo '${result}' | jq -r '.ETag' 0
Should contain ${result} ETag
@@ -264,7 +260,6 @@
Should contain ${result} UploadId
#upload parts
- Run Keyword Create Random file 5
${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key ${PREFIX}/multipartKey5 --part-number 1 --body /tmp/part1 --upload-id ${uploadID}
${eTag1} = Execute and checkrc echo '${result}' | jq -r '.ETag' 0
Should contain ${result} ETag
@@ -298,14 +293,12 @@
${result} = Execute AWSS3APICli and checkrc abort-multipart-upload --bucket ${BUCKET} --key ${PREFIX}/multipartKey5 --upload-id ${uploadID} 0
Test Multipart Upload with the simplified aws s3 cp API
- Create Random file 22
- Execute AWSS3Cli cp /tmp/part1 s3://${BUCKET}/mpyawscli
- Execute AWSS3Cli cp s3://${BUCKET}/mpyawscli /tmp/part1.result
+ Execute AWSS3Cli cp /tmp/22mb s3://${BUCKET}/mpyawscli
+ Execute AWSS3Cli cp s3://${BUCKET}/mpyawscli /tmp/22mb.result
Execute AWSS3Cli rm s3://${BUCKET}/mpyawscli
- Compare files /tmp/part1 /tmp/part1.result
+ Compare files /tmp/22mb /tmp/22mb.result
Test Multipart Upload Put With Copy
- Run Keyword Create Random file 5
${result} = Execute AWSS3APICli put-object --bucket ${BUCKET} --key ${PREFIX}/copytest/source --body /tmp/part1
@@ -327,8 +320,7 @@
Compare files /tmp/part1 /tmp/part-result
Test Multipart Upload Put With Copy and range
- Run Keyword Create Random file 10
- ${result} = Execute AWSS3APICli put-object --bucket ${BUCKET} --key ${PREFIX}/copyrange/source --body /tmp/part1
+ ${result} = Execute AWSS3APICli put-object --bucket ${BUCKET} --key ${PREFIX}/copyrange/source --body /tmp/10mb
${result} = Execute AWSS3APICli create-multipart-upload --bucket ${BUCKET} --key ${PREFIX}/copyrange/destination
@@ -351,15 +343,14 @@
Execute AWSS3APICli complete-multipart-upload --upload-id ${uploadID} --bucket ${BUCKET} --key ${PREFIX}/copyrange/destination --multipart-upload 'Parts=[{ETag=${eTag1},PartNumber=1},{ETag=${eTag2},PartNumber=2}]'
Execute AWSS3APICli get-object --bucket ${BUCKET} --key ${PREFIX}/copyrange/destination /tmp/part-result
- Compare files /tmp/part1 /tmp/part-result
+ Compare files /tmp/10mb /tmp/part-result
Test Multipart Upload Put With Copy and range with IfModifiedSince
- Run Keyword Create Random file 10
${curDate} = Get Current Date
${beforeCreate} = Subtract Time From Date ${curDate} 1 day
${tomorrow} = Add Time To Date ${curDate} 1 day
- ${result} = Execute AWSS3APICli put-object --bucket ${BUCKET} --key ${PREFIX}/copyrange/source --body /tmp/part1
+ ${result} = Execute AWSS3APICli put-object --bucket ${BUCKET} --key ${PREFIX}/copyrange/source --body /tmp/10mb
${result} = Execute AWSS3APICli create-multipart-upload --bucket ${BUCKET} --key ${PREFIX}/copyrange/destination
@@ -404,7 +395,7 @@
Execute AWSS3APICli complete-multipart-upload --upload-id ${uploadID} --bucket ${BUCKET} --key ${PREFIX}/copyrange/destination --multipart-upload 'Parts=[{ETag=${eTag1},PartNumber=1},{ETag=${eTag2},PartNumber=2}]'
Execute AWSS3APICli get-object --bucket ${BUCKET} --key ${PREFIX}/copyrange/destination /tmp/part-result
- Compare files /tmp/part1 /tmp/part-result
+ Compare files /tmp/10mb /tmp/part-result
Test Multipart Upload list
${result} = Execute AWSS3APICli create-multipart-upload --bucket ${BUCKET} --key ${PREFIX}/listtest/key1
diff --git a/hadoop-ozone/dist/src/main/smoketest/s3/commonawslib.robot b/hadoop-ozone/dist/src/main/smoketest/s3/commonawslib.robot
index b205370..45dee92 100644
--- a/hadoop-ozone/dist/src/main/smoketest/s3/commonawslib.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/s3/commonawslib.robot
@@ -129,13 +129,6 @@
Should contain ${result} Location
Should contain ${result} ${bucket}
-Create bucket with layout
- [Arguments] ${layout}
- ${postfix} = Generate Ozone String
- ${bucket} = Set Variable bucket-${postfix}
- ${result} = Execute ozone sh bucket create --layout ${layout} s3v/${bucket}
- [Return] ${bucket}
-
Setup s3 tests
Return From Keyword if ${OZONE_S3_TESTS_SET_UP}
Run Keyword Generate random prefix
@@ -156,7 +149,7 @@
Create generated bucket
[Arguments] ${layout}=OBJECT_STORE
- ${BUCKET} = Create bucket with layout ${layout}
+ ${BUCKET} = Create bucket with layout s3v ${layout}
Set Global Variable ${BUCKET}
Create encrypted bucket
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
index f3c9227..007591d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
@@ -49,6 +49,7 @@
import javax.xml.bind.DatatypeConverter;
import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
@@ -138,12 +139,15 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.OzoneTestBase;
import org.apache.ozone.test.tag.Flaky;
import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
+
+import static java.util.Collections.singletonMap;
import static org.apache.hadoop.hdds.StringUtils.string2Bytes;
import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
@@ -190,6 +194,7 @@
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
/**
@@ -197,7 +202,7 @@
* Client.
*/
@TestMethodOrder(MethodOrderer.MethodName.class)
-abstract class OzoneRpcClientTests {
+abstract class OzoneRpcClientTests extends OzoneTestBase {
private static MiniOzoneCluster cluster = null;
private static OzoneClient ozClient = null;
@@ -1094,6 +1099,223 @@ public void testPutKey() throws IOException {
}
}
+  /**
+   * Rewrites an existing key at its current generation and verifies that the content is
+   * replaced, the generation increases, and the remaining metadata is carried over.
+   */
+  @ParameterizedTest
+  @EnumSource
+  void rewriteKey(BucketLayout layout) throws IOException {
+    final OzoneBucket bucket = createBucket(layout);
+    final OzoneKeyDetails original = createTestKey(bucket);
+    final OmKeyArgs lookupArgs = toOmKeyArgs(original);
+    final OmKeyInfo infoBeforeRewrite = ozoneManager.lookupKey(lookupArgs);
+
+    final byte[] newContent = "rewrite value".getBytes(UTF_8);
+    rewriteKey(bucket, original, newContent);
+
+    final OzoneKeyDetails rewritten = assertKeyContent(bucket, original.getName(), newContent);
+    assertThat(rewritten.getGeneration()).isGreaterThan(original.getGeneration());
+    assertMetadataUnchanged(original, rewritten);
+    assertMetadataAfterRewrite(infoBeforeRewrite, ozoneManager.lookupKey(lookupArgs));
+  }
+
+  /**
+   * Performs a plain overwrite after an atomic rewrite and verifies that the overwrite's
+   * content and generation are what is read back afterwards.
+   */
+  @ParameterizedTest
+  @EnumSource
+  void overwriteAfterRewrite(BucketLayout layout) throws IOException {
+    final OzoneBucket bucket = createBucket(layout);
+    final OzoneKeyDetails original = createTestKey(bucket);
+    final String keyName = original.getName();
+    rewriteKey(bucket, original, "rewrite".getBytes(UTF_8));
+
+    final byte[] overwriteContent = "overwrite".getBytes(UTF_8);
+    final OzoneKeyDetails overwritten = createTestKey(bucket, keyName, overwriteContent);
+
+    final OzoneKeyDetails readBack = assertKeyContent(bucket, keyName, overwriteContent);
+    assertEquals(overwritten.getGeneration(), readBack.getGeneration());
+  }
+
+  /**
+   * Renames a key, then rewrites it under its new name using the details fetched after the
+   * rename, and verifies the new content and the metadata of the rewritten key.
+   */
+  @ParameterizedTest
+  @EnumSource
+  void rewriteAfterRename(BucketLayout layout) throws IOException {
+    final OzoneBucket bucket = createBucket(layout);
+    final OzoneKeyDetails beforeRename = createTestKey(bucket);
+
+    final String renamedTo = "rewriteAfterRename-" + layout;
+    bucket.renameKey(beforeRename.getName(), renamedTo);
+    final OzoneKeyDetails renamed = bucket.getKey(renamedTo);
+    final OmKeyArgs lookupArgs = toOmKeyArgs(renamed);
+    final OmKeyInfo infoBeforeRewrite = ozoneManager.lookupKey(lookupArgs);
+
+    final byte[] rewriteContent = "rewrite".getBytes(UTF_8);
+    rewriteKey(bucket, renamed, rewriteContent);
+
+    final OzoneKeyDetails readBack = assertKeyContent(bucket, renamedTo, rewriteContent);
+    // Owner and metadata are compared against the key as it was before the rename.
+    assertMetadataUnchanged(beforeRename, readBack);
+    assertMetadataAfterRewrite(infoBeforeRewrite, ozoneManager.lookupKey(lookupArgs));
+  }
+
+  /**
+   * Rewrites a key and then renames it, verifying that the rewritten content and metadata
+   * are found under the new name.
+   */
+  @ParameterizedTest
+  @EnumSource
+  void renameAfterRewrite(BucketLayout layout) throws IOException {
+    final OzoneBucket bucket = createBucket(layout);
+    final OzoneKeyDetails original = createTestKey(bucket);
+
+    final byte[] rewriteContent = "rewrite".getBytes(UTF_8);
+    rewriteKey(bucket, original, rewriteContent);
+    // Snapshot taken after the rewrite but before the rename.
+    final OmKeyInfo infoBeforeRename = ozoneManager.lookupKey(toOmKeyArgs(original));
+
+    final String renamedTo = "renameAfterRewrite-" + layout;
+    bucket.renameKey(original.getName(), renamedTo);
+
+    final OzoneKeyDetails readBack = assertKeyContent(bucket, renamedTo, rewriteContent);
+    assertMetadataUnchanged(original, readBack);
+    final OmKeyInfo infoAfterRename = ozoneManager.lookupKey(toOmKeyArgs(readBack));
+    assertMetadataAfterRewrite(infoBeforeRename, infoAfterRename);
+  }
+
+  /**
+   * Overwrites a key so that its generation changes, then attempts a rewrite with the stale
+   * generation and verifies that it is rejected and the overwritten key is left untouched.
+   */
+  @ParameterizedTest
+  @EnumSource
+  void rewriteFailsDueToOutdatedGeneration(BucketLayout layout) throws IOException {
+    final OzoneBucket bucket = createBucket(layout);
+    final OzoneKeyDetails stale = createTestKey(bucket);
+    final String keyName = stale.getName();
+    final OmKeyArgs lookupArgs = toOmKeyArgs(stale);
+
+    // A regular overwrite gives the key a new generation.
+    final byte[] overwriteContent = "overwrite".getBytes(UTF_8);
+    final OzoneKeyDetails overwritten = createTestKey(bucket, keyName, overwriteContent);
+    final OmKeyInfo infoAfterOverwrite = ozoneManager.lookupKey(lookupArgs);
+
+    // The rewrite below still carries the generation from before the overwrite.
+    final OMException failure = assertThrows(OMException.class,
+        () -> rewriteKey(bucket, stale, "rewrite".getBytes(UTF_8)));
+    assertEquals(KEY_NOT_FOUND, failure.getResult());
+    assertThat(failure).hasMessageContaining("Generation mismatch");
+
+    final OzoneKeyDetails readBack = assertKeyContent(bucket, keyName, overwriteContent);
+    assertEquals(overwritten.getGeneration(), readBack.getGeneration());
+    assertMetadataUnchanged(overwritten, readBack);
+    assertUnchanged(infoAfterOverwrite, ozoneManager.lookupKey(lookupArgs));
+  }
+
+  /**
+   * Opens a key for rewrite, overwrites the same key while the rewrite stream is still open,
+   * and verifies that committing the rewrite (closing the stream) is rejected and the
+   * overwritten key is left untouched.
+   */
+  @ParameterizedTest
+  @EnumSource
+  void rewriteFailsDueToOutdatedGenerationAtCommit(BucketLayout layout) throws IOException {
+    OzoneBucket bucket = createBucket(layout);
+    OzoneKeyDetails keyDetails = createTestKey(bucket);
+    final byte[] overwriteContent = "overwrite".getBytes(UTF_8);
+    OmKeyArgs keyArgs = toOmKeyArgs(keyDetails);
+    final OmKeyInfo keyInfo;
+    final OzoneKeyDetails overwriteDetails;
+
+    // try-with-resources instead of a manual null check and finally: if an assertion in the
+    // body fails, an exception from the cleanup close() is attached as suppressed rather than
+    // replacing the real failure.
+    try (OzoneOutputStream out = bucket.rewriteKey(keyDetails.getName(), keyDetails.getDataSize(),
+        keyDetails.getGeneration(), RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+        keyDetails.getMetadata())) {
+      out.write("rewrite".getBytes(UTF_8));
+
+      // Overwrite while the rewrite is in flight so the generation changes before commit.
+      overwriteDetails = createTestKey(bucket, keyDetails.getName(), overwriteContent);
+      keyInfo = ozoneManager.lookupKey(keyArgs);
+
+      // This explicit close is the commit under test; the stream is closed once more when
+      // the try block exits.
+      OMException e = assertThrows(OMException.class, out::close);
+      assertEquals(KEY_NOT_FOUND, e.getResult());
+      assertThat(e).hasMessageContaining("does not match the expected generation to rewrite");
+    }
+
+    OzoneKeyDetails actualKeyDetails = assertKeyContent(bucket, keyDetails.getName(), overwriteContent);
+    assertEquals(overwriteDetails.getGeneration(), actualKeyDetails.getGeneration());
+    assertUnchanged(keyInfo, ozoneManager.lookupKey(keyArgs));
+  }
+
+  /**
+   * Verifies that a rewrite is rejected with KEY_NOT_FOUND once the key has been deleted.
+   */
+  @ParameterizedTest
+  @EnumSource
+  void cannotRewriteDeletedKey(BucketLayout layout) throws IOException {
+    final OzoneBucket bucket = createBucket(layout);
+    final OzoneKeyDetails deleted = createTestKey(bucket);
+    bucket.deleteKey(deleted.getName());
+
+    final OMException failure = assertThrows(OMException.class,
+        () -> rewriteKey(bucket, deleted, "rewrite".getBytes(UTF_8)));
+    assertEquals(KEY_NOT_FOUND, failure.getResult());
+    assertThat(failure).hasMessageContaining("not found");
+  }
+
+  /**
+   * Verifies that a rewrite addressed to a key's old name is rejected with KEY_NOT_FOUND
+   * after the key has been renamed.
+   */
+  @ParameterizedTest
+  @EnumSource
+  void cannotRewriteRenamedKey(BucketLayout layout) throws IOException {
+    final OzoneBucket bucket = createBucket(layout);
+    final OzoneKeyDetails beforeRename = createTestKey(bucket);
+    final String renamedTo = "newKeyName-" + layout.name();
+    bucket.renameKey(beforeRename.getName(), renamedTo);
+
+    final OMException failure = assertThrows(OMException.class,
+        () -> rewriteKey(bucket, beforeRename, "rewrite".getBytes(UTF_8)));
+    assertEquals(KEY_NOT_FOUND, failure.getResult());
+    assertThat(failure).hasMessageContaining("not found");
+  }
+
+  /**
+   * Rewrites the given key with {@code newContent}, passing the name, data size, generation
+   * and metadata taken from {@code keyDetails} and RATIS/ONE replication.
+   */
+  private static void rewriteKey(
+      OzoneBucket bucket, OzoneKeyDetails keyDetails, byte[] newContent
+  ) throws IOException {
+    final RatisReplicationConfig replication = RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE);
+    try (OzoneOutputStream rewriteStream = bucket.rewriteKey(
+        keyDetails.getName(),
+        keyDetails.getDataSize(),
+        keyDetails.getGeneration(),
+        replication,
+        keyDetails.getMetadata())) {
+      rewriteStream.write(newContent);
+    }
+  }
+
+  /**
+   * Asserts that the key's data is exactly {@code expectedContent}.
+   *
+   * @param bucket bucket holding the key
+   * @param keyName name of the key to read
+   * @param expectedContent the bytes the key is expected to contain
+   * @return the key details, fetched before the content is read
+   * @throws IOException if the key cannot be looked up or read
+   */
+  private static OzoneKeyDetails assertKeyContent(
+      OzoneBucket bucket, String keyName, byte[] expectedContent
+  ) throws IOException {
+    OzoneKeyDetails updatedKeyDetails = bucket.getKey(keyName);
+
+    try (OzoneInputStream is = bucket.readKey(keyName)) {
+      byte[] fileContent = new byte[expectedContent.length];
+      IOUtils.readFully(is, fileContent);
+      assertArrayEquals(expectedContent, fileContent);
+      // readFully only proves the key starts with the expected bytes; the stream must also
+      // be exhausted, otherwise a key with trailing extra data would pass.
+      assertEquals(-1, is.read());
+    }
+
+    return updatedKeyDetails;
+  }
+
+  /**
+   * Creates a volume and a bucket with random UUID names, the bucket using the given layout.
+   *
+   * @return the newly created bucket
+   */
+  private OzoneBucket createBucket(BucketLayout layout) throws IOException {
+    final String volumeName = UUID.randomUUID().toString();
+    store.createVolume(volumeName);
+    final OzoneVolume volume = store.getVolume(volumeName);
+
+    final String bucketName = UUID.randomUUID().toString();
+    volume.createBucket(bucketName, BucketArgs.newBuilder().setBucketLayout(layout).build());
+    return volume.getBucket(bucketName);
+  }
+
+  /**
+   * Builds {@link OmKeyArgs} from the volume, bucket and key name of {@code keyDetails}.
+   */
+  private static OmKeyArgs toOmKeyArgs(OzoneKeyDetails keyDetails) {
+    final String volume = keyDetails.getVolumeName();
+    final String bucket = keyDetails.getBucketName();
+    final String key = keyDetails.getName();
+    return new OmKeyArgs.Builder().setVolumeName(volume).setBucketName(bucket).setKeyName(key).build();
+  }
+
+  /**
+   * Asserts that ACLs, metadata, creation time, updateID and modification time are equal in
+   * both snapshots of a key.
+   */
+  private static void assertUnchanged(OmKeyInfo original, OmKeyInfo current) {
+    // User-visible attributes.
+    assertEquals(original.getAcls(), current.getAcls());
+    assertEquals(original.getMetadata(), current.getMetadata());
+
+    // Bookkeeping that would move if the key had been touched.
+    assertEquals(original.getCreationTime(), current.getCreationTime());
+    assertEquals(original.getUpdateID(), current.getUpdateID());
+    assertEquals(original.getModificationTime(), current.getModificationTime());
+  }
+
+  /**
+   * Asserts that ACLs, metadata and creation time are equal in both snapshots, that the
+   * updateID has increased, and that the modification time has not gone backwards.
+   */
+  private static void assertMetadataAfterRewrite(OmKeyInfo original, OmKeyInfo updated) {
+    // Carried over as-is.
+    assertEquals(original.getAcls(), updated.getAcls());
+    assertEquals(original.getMetadata(), updated.getMetadata());
+    assertEquals(original.getCreationTime(), updated.getCreationTime());
+
+    // Expected to move forward.
+    assertThat(updated.getUpdateID())
+        .isGreaterThan(original.getUpdateID());
+    assertThat(updated.getModificationTime())
+        .isGreaterThanOrEqualTo(original.getModificationTime());
+  }
+
+  /**
+   * Asserts that owner and metadata are equal in both key details.
+   */
+  private static void assertMetadataUnchanged(
+      OzoneKeyDetails original, OzoneKeyDetails rewritten
+  ) {
+    assertEquals(original.getOwner(), rewritten.getOwner());
+    assertEquals(original.getMetadata(), rewritten.getMetadata());
+  }
+
@Test
public void testCheckUsedBytesQuota() throws IOException {
String volumeName = UUID.randomUUID().toString();
@@ -3956,15 +4178,28 @@ private void completeMultipartUpload(OzoneBucket bucket, String keyName,
assertNotNull(omMultipartUploadCompleteInfo.getHash());
}
- private void createTestKey(OzoneBucket bucket, String keyName,
- String keyValue) throws IOException {
- OzoneOutputStream out = bucket.createKey(keyName,
- keyValue.getBytes(UTF_8).length, RATIS,
- ONE, new HashMap<>());
- out.write(keyValue.getBytes(UTF_8));
- out.close();
- OzoneKey key = bucket.getKey(keyName);
+ private OzoneKeyDetails createTestKey(OzoneBucket bucket) throws IOException {
+ return createTestKey(bucket, getTestName(), UUID.randomUUID().toString());
+ }
+
+ private OzoneKeyDetails createTestKey(
+ OzoneBucket bucket, String keyName, String keyValue
+ ) throws IOException {
+ return createTestKey(bucket, keyName, keyValue.getBytes(UTF_8));
+ }
+
+ private OzoneKeyDetails createTestKey(
+ OzoneBucket bucket, String keyName, byte[] bytes
+ ) throws IOException {
+ RatisReplicationConfig replication = RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE);
+ Map<String, String> metadata = singletonMap("key", RandomStringUtils.randomAscii(10));
+ try (OzoneOutputStream out = bucket.createKey(keyName, bytes.length, replication, metadata)) {
+ out.write(bytes);
+ }
+ OzoneKeyDetails key = bucket.getKey(keyName);
+ assertNotNull(key);
assertEquals(keyName, key.getName());
+ return key;
}
private void assertKeyRenamedEx(OzoneBucket bucket, String keyName) {
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 5f5a50d..8640343 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1029,6 +1029,14 @@
// S3 object tags support
repeated hadoop.hdds.KeyValue tags = 22;
+
+ // expectedDataGeneration, when used in key creation indicates that a
+ // key with the same keyName should exist with the given generation.
+ // For a key commit to succeed, the original key should still be present with the
+ // generation unchanged.
+ // This allows a key to be created and committed atomically if the original has not
+ // been modified.
+ optional uint64 expectedDataGeneration = 23;
}
message KeyLocation {
@@ -1113,6 +1121,13 @@
optional bool isFile = 19;
optional string ownerName = 20;
repeated hadoop.hdds.KeyValue tags = 21;
+ // expectedDataGeneration, when used in key creation indicates that a
+ // key with the same keyName should exist with the given generation.
+ // For a key commit to succeed, the original key should still be present with the
+ // generation unchanged.
+ // This allows a key to be created and committed atomically if the original has not
+ // been modified.
+ optional uint64 expectedDataGeneration = 22;
}
message BasicKeyInfo {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/RequestAuditor.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/RequestAuditor.java
index 78e67bb..bf12a1d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/RequestAuditor.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/RequestAuditor.java
@@ -82,6 +82,10 @@ default Map<String, String> buildKeyArgsAuditMap(KeyArgs keyArgs) {
auditMap.put(OzoneConsts.REPLICATION_CONFIG,
ECReplicationConfig.toString(keyArgs.getEcReplicationConfig()));
}
+ if (keyArgs.hasExpectedDataGeneration()) {
+ auditMap.put(OzoneConsts.REWRITE_GENERATION,
+ String.valueOf(keyArgs.getExpectedDataGeneration()));
+ }
for (HddsProtos.KeyValue item : keyArgs.getMetadataList()) {
if (ETAG.equals(item.getKey())) {
auditMap.put(ETAG, item.getValue());
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index 5c3593b..7d31422 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -240,6 +240,12 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
throw new OMException("Failed to " + action + " key, as " + dbOpenKey +
" entry is not found in the OpenKey table", KEY_NOT_FOUND);
}
+
+ validateAtomicRewrite(keyToDelete, omKeyInfo, auditMap);
+ // Optimistic locking validation has passed. Now set the rewrite fields to null so they are
+ // not persisted in the key table.
+ omKeyInfo.setExpectedDataGeneration(null);
+
omKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf(
commitKeyArgs.getMetadataList()));
if (isHSync) {
@@ -497,4 +503,22 @@ public static OMRequest disallowHsync(
}
return req;
}
+
+  /**
+   * Checks the precondition of an atomic rewrite at commit time. Does nothing when the key
+   * being committed carries no expected data generation.
+   *
+   * @param existing the key currently stored under the same name, or null if there is none
+   * @param toCommit the open key about to be committed
+   * @param auditMap receives the expected generation when one is set
+   * @throws OMException with KEY_NOT_FOUND if there is no existing key, or if its updateID
+   *     differs from the expected generation
+   */
+  protected void validateAtomicRewrite(OmKeyInfo existing, OmKeyInfo toCommit, Map<String, String> auditMap)
+      throws OMException {
+    final Long expectedGeneration = toCommit.getExpectedDataGeneration();
+    if (expectedGeneration == null) {
+      return;
+    }
+
+    // The expected generation comes from the open key entry rather than the request keyArgs,
+    // so it is recorded in the audit map here.
+    auditMap.put(OzoneConsts.REWRITE_GENERATION, String.valueOf(expectedGeneration));
+
+    if (existing == null) {
+      throw new OMException("Atomic rewrite is not allowed for a new key", KEY_NOT_FOUND);
+    }
+    if (!expectedGeneration.equals(existing.getUpdateID())) {
+      throw new OMException("Cannot commit as current generation (" + existing.getUpdateID() +
+          ") does not match the expected generation to rewrite (" + expectedGeneration + ")",
+          KEY_NOT_FOUND);
+    }
+  }
+
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
index 704e9e9..d2cd3fd 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
@@ -148,7 +148,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
action = "hsync";
}
throw new OMException("Failed to " + action + " key, as " +
- dbOpenFileKey + "entry is not found in the OpenKey table",
+ dbOpenFileKey + " entry is not found in the OpenKey table",
KEY_NOT_FOUND);
}
omKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf(
@@ -176,6 +176,12 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
Map<String, RepeatedOmKeyInfo> oldKeyVersionsToDeleteMap = null;
OmKeyInfo keyToDelete =
omMetadataManager.getKeyTable(getBucketLayout()).get(dbFileKey);
+
+ validateAtomicRewrite(keyToDelete, omKeyInfo, auditMap);
+ // Optimistic locking validation has passed. Now set the rewrite fields to null so they are
+ // not persisted in the key table.
+ omKeyInfo.setExpectedDataGeneration(null);
+
if (null != keyToDelete) {
final String clientIdString
= String.valueOf(commitKeyRequest.getClientID());
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
index e9a9f00..36f2a8c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
@@ -231,6 +231,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
keyName);
OmKeyInfo dbKeyInfo = omMetadataManager.getKeyTable(getBucketLayout())
.getIfExist(dbKeyName);
+ validateAtomicRewrite(dbKeyInfo, keyArgs);
OmBucketInfo bucketInfo =
getBucketInfo(omMetadataManager, volumeName, bucketName);
@@ -440,4 +441,17 @@ public static OMRequest blockCreateKeyWithBucketLayoutFromOldClient(
}
return req;
}
+
+  /**
+   * Checks the precondition of an atomic rewrite when the key is created. Does nothing when
+   * the request carries no expected data generation.
+   *
+   * @param dbKeyInfo the key currently stored under the requested name, or null if none
+   * @param keyArgs the key arguments of the create request
+   * @throws OMException with KEY_NOT_FOUND if the key does not exist, or if its updateID
+   *     does not match the expected generation
+   */
+  protected void validateAtomicRewrite(OmKeyInfo dbKeyInfo, KeyArgs keyArgs)
+      throws OMException {
+    if (!keyArgs.hasExpectedDataGeneration()) {
+      return;
+    }
+
+    // A rewrite needs an existing key whose updateID equals the expected generation.
+    if (dbKeyInfo == null) {
+      throw new OMException("Key not found during expected rewrite", OMException.ResultCodes.KEY_NOT_FOUND);
+    }
+    final long currentGeneration = dbKeyInfo.getUpdateID();
+    final long expectedGeneration = keyArgs.getExpectedDataGeneration();
+    if (currentGeneration != expectedGeneration) {
+      throw new OMException("Generation mismatch during expected rewrite", OMException.ResultCodes.KEY_NOT_FOUND);
+    }
+  }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
index 6fe8c12..d8d9b19 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
@@ -120,6 +120,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
dbFileInfo = OMFileRequest.getOmKeyInfoFromFileTable(false,
omMetadataManager, dbFileKey, keyName);
}
+ validateAtomicRewrite(dbFileInfo, keyArgs);
// Check if a file or directory exists with same key name.
if (pathInfoFSO.getDirectoryResult() == DIRECTORY_EXISTS) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 50bb105..a90d7a2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -775,7 +775,6 @@ protected OmKeyInfo prepareFileInfo(
dbKeyInfo.setReplicationConfig(replicationConfig);
// Construct a new metadata map from KeyArgs.
- // Clear the old one when the key is overwritten.
dbKeyInfo.getMetadata().clear();
dbKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf(
keyArgs.getMetadataList()));
@@ -786,6 +785,10 @@ protected OmKeyInfo prepareFileInfo(
dbKeyInfo.getTags().putAll(KeyValueUtil.getFromProtobuf(
keyArgs.getTagsList()));
+ if (keyArgs.hasExpectedDataGeneration()) {
+ dbKeyInfo.setExpectedDataGeneration(keyArgs.getExpectedDataGeneration());
+ }
+
dbKeyInfo.setFileEncryptionInfo(encInfo);
return dbKeyInfo;
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
index 9719865..0503578 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -31,11 +32,13 @@
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
@@ -58,9 +61,12 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.KEY_NOT_FOUND;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -116,7 +122,7 @@ public void testValidateAndUpdateCacheWithUnknownBlockId() throws Exception {
OMClientResponse omClientResponse =
omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
- assertEquals(OzoneManagerProtocolProtos.Status.OK,
+ assertEquals(OK,
omClientResponse.getOMResponse().getStatus());
// Entry should be deleted from openKey Table.
@@ -183,7 +189,7 @@ public void testValidateAndUpdateCache() throws Exception {
OMClientResponse omClientResponse =
omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
- assertEquals(OzoneManagerProtocolProtos.Status.OK,
+ assertEquals(OK,
omClientResponse.getOMResponse().getStatus());
// Entry should be deleted from openKey Table.
@@ -219,6 +225,68 @@ public void testValidateAndUpdateCache() throws Exception {
}
@Test
+ public void testAtomicRewrite() throws Exception {
+ Table<String, OmKeyInfo> openKeyTable = omMetadataManager.getOpenKeyTable(getBucketLayout());
+ Table<String, OmKeyInfo> closedKeyTable = omMetadataManager.getKeyTable(getBucketLayout());
+
+ OMRequest modifiedOmRequest = doPreExecute(createCommitKeyRequest());
+ OMKeyCommitRequest omKeyCommitRequest = getOmKeyCommitRequest(modifiedOmRequest);
+ KeyArgs keyArgs = modifiedOmRequest.getCommitKeyRequest().getKeyArgs();
+
+ OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+ omMetadataManager, omKeyCommitRequest.getBucketLayout());
+
+ // Convert the block locations of the commit request; they are appended to the open key below.
+ List<OmKeyLocationInfo> allocatedLocationList =
+ keyArgs.getKeyLocationsList().stream()
+ .map(OmKeyLocationInfo::getFromProtobuf)
+ .collect(Collectors.toList());
+
+ OmKeyInfo.Builder omKeyInfoBuilder = OMRequestTestUtils.createOmKeyInfo(
+ volumeName, bucketName, keyName, replicationConfig, new OmKeyLocationInfoGroup(version, new ArrayList<>()));
+ omKeyInfoBuilder.setExpectedDataGeneration(1L);
+ OmKeyInfo omKeyInfo = omKeyInfoBuilder.build();
+ omKeyInfo.appendNewBlocks(allocatedLocationList, false);
+ List<OzoneAcl> acls = Collections.singletonList(OzoneAcl.parseAcl("user:foo:rw"));
+ omKeyInfo.addAcl(acls.get(0));
+
+ String openKey = addKeyToOpenKeyTable(allocatedLocationList, omKeyInfo);
+ OmKeyInfo openKeyInfo = openKeyTable.get(openKey);
+ assertNotNull(openKeyInfo);
+ assertEquals(acls, openKeyInfo.getAcls());
+ // At this stage we have an open key with an expected (rewrite) generation of 1.
+ // However, there is no closed key entry, so the commit should fail.
+ OMClientResponse omClientResponse =
+ omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
+ assertEquals(KEY_NOT_FOUND, omClientResponse.getOMResponse().getStatus());
+
+ // Now add the key to the key table and try again, but with a different generation (updateID 0).
+ omKeyInfoBuilder.setExpectedDataGeneration(null);
+ omKeyInfoBuilder.setUpdateID(0L);
+ OmKeyInfo invalidKeyInfo = omKeyInfoBuilder.build();
+ closedKeyTable.put(getOzonePathKey(), invalidKeyInfo);
+ // This should fail as the updateID is zero and the open key has a rewrite generation of 1.
+ omClientResponse = omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
+ assertEquals(KEY_NOT_FOUND, omClientResponse.getOMResponse().getStatus());
+
+ omKeyInfoBuilder.setUpdateID(1L);
+ OmKeyInfo closedKeyInfo = omKeyInfoBuilder.build();
+
+ closedKeyTable.delete(getOzonePathKey());
+ closedKeyTable.put(getOzonePathKey(), closedKeyInfo);
+
+ // Now the key should commit, as the updateID and the rewrite generation match.
+ omClientResponse = omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
+ assertEquals(OK, omClientResponse.getOMResponse().getStatus());
+
+ OmKeyInfo committedKey = closedKeyTable.get(getOzonePathKey());
+ assertNull(committedKey.getExpectedDataGeneration());
+ // The committed key must have a different generation than the closed key it replaced.
+ assertNotEquals(closedKeyInfo.getGeneration(), committedKey.getGeneration());
+ assertEquals(acls, committedKey.getAcls());
+ }
+
+ @Test
public void testValidateAndUpdateCacheWithUncommittedBlocks()
throws Exception {
@@ -260,7 +328,7 @@ public void testValidateAndUpdateCacheWithUncommittedBlocks()
OMClientResponse omClientResponse =
omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
- assertEquals(OzoneManagerProtocolProtos.Status.OK,
+ assertEquals(OK,
omClientResponse.getOMResponse().getStatus());
Map<String, RepeatedOmKeyInfo> toDeleteKeyList
@@ -385,7 +453,7 @@ private Map<String, RepeatedOmKeyInfo> doKeyCommit(boolean isHSync,
OMClientResponse omClientResponse =
omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
- assertEquals(OzoneManagerProtocolProtos.Status.OK,
+ assertEquals(OK,
omClientResponse.getOMResponse().getStatus());
// Key should be present in both OpenKeyTable and KeyTable with HSync commit
@@ -550,7 +618,7 @@ public void testValidateAndUpdateCacheWithKeyNotFound() throws Exception {
OMClientResponse omClientResponse =
omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
- assertEquals(OzoneManagerProtocolProtos.Status.KEY_NOT_FOUND,
+ assertEquals(KEY_NOT_FOUND,
omClientResponse.getOMResponse().getStatus());
omKeyInfo =
@@ -596,7 +664,7 @@ public void testValidateAndUpdateCacheOnOverwrite() throws Exception {
OMClientResponse omClientResponse =
omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 102L);
- assertEquals(OzoneManagerProtocolProtos.Status.OK, omClientResponse.getOMResponse().getStatus());
+ assertEquals(OK, omClientResponse.getOMResponse().getStatus());
// New entry should be created in key Table.
omKeyInfo = omMetadataManager.getKeyTable(omKeyCommitRequest.getBucketLayout()).get(ozoneKey);
@@ -753,6 +821,14 @@ protected String addKeyToOpenKeyTable(List<OmKeyLocationInfo> locationList)
}
@Nonnull
+ protected String addKeyToOpenKeyTable(List<OmKeyLocationInfo> locationList, OmKeyInfo keyInfo) throws Exception {
+ OMRequestTestUtils.addKeyToTable(true, false, keyInfo, clientID, 0, omMetadataManager);
+ // locationList is not read by this overload; the stored entry carries whatever blocks keyInfo already has.
+ return omMetadataManager.getOpenKey(volumeName, bucketName,
+ keyName, clientID);
+ }
+
+ @Nonnull
protected OMKeyCommitRequest getOmKeyCommitRequest(OMRequest omRequest) {
return new OMKeyCommitRequest(omRequest, BucketLayout.DEFAULT);
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequestWithFSO.java
index 48cc527..c3b913b 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequestWithFSO.java
@@ -69,30 +69,36 @@ protected String getOzonePathKey() throws IOException {
}
@Override
- protected String addKeyToOpenKeyTable(List<OmKeyLocationInfo> locationList)
+ protected String addKeyToOpenKeyTable(List<OmKeyLocationInfo> locationList, OmKeyInfo keyInfo)
throws Exception {
// need to initialize parentID
if (getParentDir() == null) {
parentID = getBucketID();
} else {
parentID = OMRequestTestUtils.addParentsToDirTable(volumeName,
- bucketName, getParentDir(), omMetadataManager);
+ bucketName, getParentDir(), omMetadataManager);
}
+ keyInfo.setParentObjectID(parentID);
+ keyInfo.appendNewBlocks(locationList, false);
+
+ String fileName = OzoneFSUtils.getFileName(keyName);
+ return OMRequestTestUtils.addFileToKeyTable(true, false,
+ fileName, keyInfo, clientID, txnLogId, omMetadataManager);
+
+ }
+
+ @Override
+ protected String addKeyToOpenKeyTable(List<OmKeyLocationInfo> locationList)
+ throws Exception {
long objectId = 100;
OmKeyInfo omKeyInfoFSO =
OMRequestTestUtils.createOmKeyInfo(volumeName, bucketName, keyName,
RatisReplicationConfig.getInstance(ONE), new OmKeyLocationInfoGroup(version, new ArrayList<>(), false))
.setObjectID(objectId)
- .setParentObjectID(parentID)
.setUpdateID(100L)
.build();
- omKeyInfoFSO.appendNewBlocks(locationList, false);
-
- String fileName = OzoneFSUtils.getFileName(keyName);
- return OMRequestTestUtils.addFileToKeyTable(true, false,
- fileName, omKeyInfoFSO, clientID, txnLogId, omMetadataManager);
-
+ return addKeyToOpenKeyTable(locationList, omKeyInfoFSO);
}
@Nonnull
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
index 166edb5..4bfdd33 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -63,6 +62,8 @@
.OMRequest;
import org.junit.jupiter.params.provider.ValueSource;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
@@ -71,6 +72,7 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS;
import static org.apache.hadoop.ozone.om.request.OMRequestTestUtils.addVolumeAndBucketToDB;
import static org.apache.hadoop.ozone.om.request.OMRequestTestUtils.createOmKeyInfo;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.KEY_NOT_FOUND;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.NOT_A_FILE;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
import static org.assertj.core.api.Assertions.assertThat;
@@ -121,9 +123,9 @@ private void preExecuteTest(boolean isMultipartKey, int partNumber,
long scmBlockSize = ozoneManager.getScmBlockSize();
for (int i = 0; i <= repConfig.getRequiredNodes(); i++) {
doPreExecute(createKeyRequest(isMultipartKey, partNumber,
- scmBlockSize * i, repConfig));
+ scmBlockSize * i, repConfig, null));
doPreExecute(createKeyRequest(isMultipartKey, partNumber,
- scmBlockSize * i + 1, repConfig));
+ scmBlockSize * i + 1, repConfig, null));
}
}
@@ -148,7 +150,7 @@ public void testValidateAndUpdateCache(
tags.put("tag-key2", "tag-value2");
OMRequest modifiedOmRequest =
- doPreExecute(createKeyRequest(false, 0, Collections.emptyMap(), tags));
+ doPreExecute(createKeyRequest(false, 0, emptyMap(), tags));
OMKeyCreateRequest omKeyCreateRequest =
getOMKeyCreateRequest(modifiedOmRequest);
@@ -197,7 +199,7 @@ public void testValidateAndUpdateCache(
// Override same key again
modifiedOmRequest =
- doPreExecute(createKeyRequest(false, 0, Collections.emptyMap(), tags));
+ doPreExecute(createKeyRequest(false, 0, emptyMap(), tags));
id = modifiedOmRequest.getCreateKeyRequest().getClientID();
openKey = getOpenKey(id);
@@ -534,7 +536,7 @@ public void testCreationWithoutMetadataFollowedByOverwriteWithMetadata(
// Create the key request without any initial metadata
OMRequest createRequestWithoutMetadata = createKeyRequest(false, 0, keyName,
- null); // Passing 'null' for metadata
+ null, emptyMap(), emptyList()); // Passing 'null' for metadata
OMKeyCreateRequest createOmKeyCreateRequest =
new OMKeyCreateRequest(createRequestWithoutMetadata, getBucketLayout());
@@ -557,7 +559,7 @@ public void testCreationWithoutMetadataFollowedByOverwriteWithMetadata(
// Overwrite the previously created key with new metadata
OMRequest overwriteRequestWithMetadata =
- createKeyRequest(false, 0, keyName, overwriteMetadata);
+ createKeyRequest(false, 0, keyName, overwriteMetadata, emptyMap(), emptyList());
OMKeyCreateRequest overwriteOmKeyCreateRequest =
new OMKeyCreateRequest(overwriteRequestWithMetadata, getBucketLayout());
@@ -657,23 +659,23 @@ private OMRequest doPreExecute(OMRequest originalOMRequest) throws Exception {
@SuppressWarnings("parameterNumber")
protected OMRequest createKeyRequest(boolean isMultipartKey, int partNumber) {
- return createKeyRequest(isMultipartKey, partNumber, Collections.emptyMap(), Collections.emptyMap());
+ return createKeyRequest(isMultipartKey, partNumber, emptyMap(), emptyMap());
}
protected OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
Map<String, String> metadata, Map<String, String> tags) {
- return createKeyRequest(isMultipartKey, partNumber, keyName, metadata, tags);
+ return createKeyRequest(isMultipartKey, partNumber, keyName, metadata, tags, emptyList());
}
private OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
String keyName) {
- return createKeyRequest(isMultipartKey, partNumber, keyName, null);
+ return createKeyRequest(isMultipartKey, partNumber, keyName, emptyMap());
}
protected OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
String keyName,
Map<String, String> metadata) {
- return createKeyRequest(isMultipartKey, partNumber, keyName, metadata, null);
+ return createKeyRequest(isMultipartKey, partNumber, keyName, metadata, emptyMap(), emptyList());
}
/**
@@ -693,7 +695,8 @@ protected OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
protected OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
String keyName,
Map<String, String> metadata,
- Map<String, String> tags) {
+ Map<String, String> tags,
+ List<OzoneAcl> acls) {
KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
@@ -704,6 +707,9 @@ protected OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
.setType(replicationConfig.getReplicationType())
.setLatestVersionLocation(true);
+ for (OzoneAcl acl : acls) {
+ keyArgs.addAcls(OzoneAcl.toProtobuf(acl));
+ }
// Configure for multipart upload, if applicable
if (isMultipartKey) {
keyArgs.setDataSize(dataSize).setMultipartNumber(partNumber);
@@ -733,7 +739,14 @@ protected OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
private OMRequest createKeyRequest(
boolean isMultipartKey, int partNumber, long keyLength,
- ReplicationConfig repConfig) {
+ ReplicationConfig repConfig, Long expectedDataGeneration) {
+ return createKeyRequest(isMultipartKey, partNumber, keyLength, repConfig,
+ expectedDataGeneration, null);
+ }
+
+ private OMRequest createKeyRequest(
+ boolean isMultipartKey, int partNumber, long keyLength,
+ ReplicationConfig repConfig, Long expectedDataGeneration, Map<String, String> metaData) {
KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
.setVolumeName(volumeName).setBucketName(bucketName)
@@ -752,8 +765,17 @@ private OMRequest createKeyRequest(
if (isMultipartKey) {
keyArgs.setMultipartNumber(partNumber);
}
+ if (expectedDataGeneration != null) {
+ keyArgs.setExpectedDataGeneration(expectedDataGeneration);
+ }
+ if (metaData != null) {
+ metaData.forEach((key, value) -> keyArgs.addMetadata(KeyValue.newBuilder()
+ .setKey(key)
+ .setValue(value)
+ .build()));
+ }
- OzoneManagerProtocolProtos.CreateKeyRequest createKeyRequest =
+ CreateKeyRequest createKeyRequest =
CreateKeyRequest.newBuilder().setKeyArgs(keyArgs).build();
return OMRequest.newBuilder()
@@ -935,6 +957,72 @@ public void testKeyCreateInheritParentDefaultAcls(
}
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testAtomicRewrite(
+ boolean setKeyPathLock, boolean setFileSystemPaths) throws Exception {
+ when(ozoneManager.getOzoneLockProvider()).thenReturn(
+ new OzoneLockProvider(setKeyPathLock, setFileSystemPaths));
+
+ OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, omMetadataManager,
+ OmBucketInfo.newBuilder().setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setBucketLayout(getBucketLayout()));
+
+ // First, send a create request carrying an expected generation - this should fail as no key exists yet
+ OMRequest omRequest = createKeyRequest(false, 0, 100,
+ RatisReplicationConfig.getInstance(THREE), 1L);
+ omRequest = doPreExecute(omRequest);
+ OMKeyCreateRequest omKeyCreateRequest = getOMKeyCreateRequest(omRequest);
+ OMClientResponse response = omKeyCreateRequest.validateAndUpdateCache(ozoneManager, 105L);
+ assertEquals(KEY_NOT_FOUND, response.getOMResponse().getStatus());
+
+ // Now pre-create the key in the system so we can rewrite it.
+ Map<String, String> metadata = Collections.singletonMap("metakey", "metavalue");
+ Map<String, String> reWriteMetadata = Collections.singletonMap("metakey", "rewriteMetavalue");
+
+ List<OzoneAcl> acls = Collections.singletonList(OzoneAcl.parseAcl("user:foo:rw"));
+ OmKeyInfo createdKeyInfo = createAndCheck(keyName, metadata, acls);
+ // Simulate the commit by putting the open key entry straight into the key table.
+ omMetadataManager.getKeyTable(getBucketLayout()).put(getOzoneKey(), createdKeyInfo);
+
+ // Retrieve the committed key info
+ OmKeyInfo existingKeyInfo = omMetadataManager.getKeyTable(getBucketLayout()).get(getOzoneKey());
+ List<OzoneAcl> existingAcls = existingKeyInfo.getAcls();
+ assertEquals(acls, existingAcls);
+
+ // Create a request with a generation which doesn't match the current key
+ omRequest = createKeyRequest(false, 0, 100,
+ RatisReplicationConfig.getInstance(THREE), existingKeyInfo.getGeneration() + 1, reWriteMetadata);
+ omRequest = doPreExecute(omRequest);
+ omKeyCreateRequest = getOMKeyCreateRequest(omRequest);
+ response = omKeyCreateRequest.validateAndUpdateCache(ozoneManager, 105L);
+ // Still fails: the key exists now, but its generation does not match the expected one.
+ assertEquals(KEY_NOT_FOUND, response.getOMResponse().getStatus());
+
+ // Now create the key with the correct rewrite generation
+ omRequest = createKeyRequest(false, 0, 100,
+ RatisReplicationConfig.getInstance(THREE), existingKeyInfo.getGeneration(), reWriteMetadata);
+ omRequest = doPreExecute(omRequest);
+ omKeyCreateRequest = getOMKeyCreateRequest(omRequest);
+ response = omKeyCreateRequest.validateAndUpdateCache(ozoneManager, 105L);
+ assertEquals(OK, response.getOMResponse().getStatus());
+
+ OmKeyInfo openKeyInfo = omMetadataManager.getOpenKeyTable(getBucketLayout())
+ .get(getOpenKey(omRequest.getCreateKeyRequest().getClientID()));
+
+ assertEquals(existingKeyInfo.getGeneration(), openKeyInfo.getExpectedDataGeneration());
+ // Creation time should remain the same on rewrite.
+ assertEquals(existingKeyInfo.getCreationTime(), openKeyInfo.getCreationTime());
+ // The open key must get its own generation, different from the existing key's.
+ assertNotEquals(existingKeyInfo.getGeneration(), openKeyInfo.getGeneration());
+ assertEquals(metadata, existingKeyInfo.getMetadata());
+ // The metadata should not be copied from the existing key. It should be passed in the request.
+ assertEquals(reWriteMetadata, openKeyInfo.getMetadata());
+ // Ensure the ACLs are copied over from the existing key.
+ assertEquals(existingAcls, openKeyInfo.getAcls());
+ }
+
/**
* Leaf file has ACCESS scope acls which inherited
* from parent DEFAULT acls.
@@ -991,9 +1079,13 @@ private void checkNotAFile(String keyName) throws Exception {
assertEquals(NOT_A_FILE, omClientResponse.getOMResponse().getStatus());
}
-
private void createAndCheck(String keyName) throws Exception {
- OMRequest omRequest = createKeyRequest(false, 0, keyName);
+ createAndCheck(keyName, emptyMap(), emptyList());
+ }
+
+ private OmKeyInfo createAndCheck(String keyName, Map<String, String> metadata, List<OzoneAcl> acls)
+ throws Exception {
+ OMRequest omRequest = createKeyRequest(false, 0, keyName, metadata, emptyMap(), acls);
OMKeyCreateRequest omKeyCreateRequest = getOMKeyCreateRequest(omRequest);
@@ -1006,16 +1098,13 @@ private void createAndCheck(String keyName) throws Exception {
assertEquals(OK, omClientResponse.getOMResponse().getStatus());
- checkCreatedPaths(omKeyCreateRequest, omRequest, keyName);
+ return checkCreatedPaths(omKeyCreateRequest, omRequest, keyName);
}
- protected void checkCreatedPaths(
+ protected OmKeyInfo checkCreatedPaths(
OMKeyCreateRequest omKeyCreateRequest, OMRequest omRequest,
String keyName) throws Exception {
keyName = omKeyCreateRequest.validateAndNormalizeKey(true, keyName);
- // Check intermediate directories created or not.
- Path keyPath = Paths.get(keyName);
- checkIntermediatePaths(keyPath);
// Check open key entry
String openKey = omMetadataManager.getOpenKey(volumeName, bucketName,
@@ -1024,6 +1113,7 @@ protected void checkCreatedPaths(
omMetadataManager.getOpenKeyTable(omKeyCreateRequest.getBucketLayout())
.get(openKey);
assertNotNull(omKeyInfo);
+ return omKeyInfo;
}
protected long checkIntermediatePaths(Path keyPath) throws Exception {
@@ -1048,7 +1138,7 @@ protected String getOzoneKey() throws IOException {
}
protected OMKeyCreateRequest getOMKeyCreateRequest(OMRequest omRequest) {
- return new OMKeyCreateRequest(omRequest, BucketLayout.DEFAULT);
+ return new OMKeyCreateRequest(omRequest, getBucketLayout());
}
protected OMKeyCreateRequest getOMKeyCreateRequest(
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java
index 2a25a9b..a5181b2 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java
@@ -117,7 +117,7 @@ protected void addToKeyTable(String keyName) throws Exception {
}
@Override
- protected void checkCreatedPaths(OMKeyCreateRequest omKeyCreateRequest,
+ protected OmKeyInfo checkCreatedPaths(OMKeyCreateRequest omKeyCreateRequest,
OMRequest omRequest, String keyName) throws Exception {
keyName = omKeyCreateRequest.validateAndNormalizeKey(true, keyName,
BucketLayout.FILE_SYSTEM_OPTIMIZED);
@@ -139,6 +139,7 @@ protected void checkCreatedPaths(OMKeyCreateRequest omKeyCreateRequest,
omMetadataManager.getOpenKeyTable(omKeyCreateRequest.getBucketLayout())
.get(openKey);
assertNotNull(omKeyInfo);
+ return omKeyInfo;
}
@Override
@@ -152,6 +153,11 @@ protected long checkIntermediatePaths(Path keyPath) throws Exception {
long lastKnownParentId = omBucketInfo.getObjectID();
final long volumeId = omMetadataManager.getVolumeId(volumeName);
+ if (keyPath == null) {
+ // The file is at the root of the bucket, so it has no parent folder. The parent is
+ // the bucket itself.
+ return lastKnownParentId;
+ }
Iterator<Path> elements = keyPath.iterator();
StringBuilder fullKeyPath = new StringBuilder(bucketKey);
while (elements.hasNext()) {
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
index b75167c..bea831a 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
@@ -241,6 +241,14 @@ public OzoneOutputStream createKey(String volumeName, String bucketName,
}
@Override
+ public OzoneOutputStream rewriteKey(String volumeName, String bucketName, String keyName,
+ long size, long existingKeyGeneration, ReplicationConfig replicationConfig,
+ Map<String, String> metadata) throws IOException {
+ return getBucket(volumeName, bucketName)
+ .rewriteKey(keyName, size, existingKeyGeneration, replicationConfig, metadata);
+ }
+
+ @Override
public OzoneInputStream getKey(String volumeName, String bucketName,
String keyName) throws IOException {
return getBucket(volumeName, bucketName).readKey(keyName);
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index 3320801..06b6a8e 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@ -161,6 +161,38 @@ public void close() throws IOException {
}
@Override
+ public OzoneOutputStream rewriteKey(String keyName, long size, long existingKeyGeneration,
+ ReplicationConfig rConfig, Map<String, String> metadata) throws IOException {
+ // Stub behaviour: existingKeyGeneration is never consulted here, so the rewrite
+ // always goes through regardless of what is currently stored under keyName.
+ // Fall back to the bucket's own replication config when the caller supplies none.
+ final ReplicationConfig effectiveConfig = rConfig == null ? getReplicationConfig() : rConfig;
+
+ // The key only shows up in keyContents/keyDetails once the returned stream is closed.
+ ByteArrayOutputStream buffer = new KeyMetadataAwareOutputStream(metadata) {
+ @Override
+ public void close() throws IOException {
+ // Publish the buffered bytes first, then the matching key details.
+ keyContents.put(keyName, toByteArray());
+
+ // Both timestamps are sampled at close time.
+ OzoneKeyDetails details = new OzoneKeyDetails(
+ getVolumeName(),
+ getName(),
+ keyName, size,
+ System.currentTimeMillis(), System.currentTimeMillis(),
+ new ArrayList<>(), effectiveConfig, metadata, null,
+ () -> readKey(keyName), true, null, null);
+ keyDetails.put(keyName, details);
+
+ super.close();
+ }
+ };
+
+ return new OzoneOutputStream(buffer, null);
+ }
+
+ @Override
public OzoneDataStreamOutput createStreamKey(String key, long size,
ReplicationConfig rConfig,
Map<String, String> keyMetadata,
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/KeyCommands.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/KeyCommands.java
index 726ab8b..bbef584 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/KeyCommands.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/KeyCommands.java
@@ -44,6 +44,7 @@
CatKeyHandler.class,
PutKeyHandler.class,
RenameKeyHandler.class,
+ RewriteKeyHandler.class,
CopyKeyHandler.class,
DeleteKeyHandler.class,
AddAclKeyHandler.class,
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
index 68beb69..833f4f7 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
@@ -19,13 +19,13 @@
package org.apache.hadoop.ozone.shell.keys;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;
@@ -40,6 +40,7 @@
import org.apache.hadoop.ozone.client.OzoneClientException;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.shell.OzoneAddress;
import org.apache.commons.codec.digest.DigestUtils;
@@ -68,6 +69,10 @@ public class PutKeyHandler extends KeyHandler {
@Mixin
private ShellReplicationOptions replication;
+ @Option(names = "--expectedGeneration",
+ description = "Store key only if it already exists and its generation matches the value provided")
+ private long expectedGeneration;
+
@Override
protected void execute(OzoneClient client, OzoneAddress address)
throws IOException, OzoneClientException {
@@ -79,7 +84,7 @@ protected void execute(OzoneClient client, OzoneAddress address)
File dataFile = new File(fileName);
if (isVerbose()) {
- try (InputStream stream = new FileInputStream(dataFile)) {
+ try (InputStream stream = Files.newInputStream(dataFile.toPath())) {
String hash = DigestUtils.sha256Hex(stream);
out().printf("File sha256 checksum : %s%n", hash);
}
@@ -109,7 +114,7 @@ protected void execute(OzoneClient client, OzoneAddress address)
}
}
- void async(
+ private void async(
File dataFile, OzoneBucket bucket,
String keyName, Map<String, String> keyMetadata,
ReplicationConfig replicationConfig, int chunkSize)
@@ -117,14 +122,21 @@ void async(
if (isVerbose()) {
out().println("API: async");
}
- try (InputStream input = new FileInputStream(dataFile);
- OutputStream output = bucket.createKey(keyName, dataFile.length(),
- replicationConfig, keyMetadata)) {
+ try (InputStream input = Files.newInputStream(dataFile.toPath());
+ OutputStream output = createOrReplaceKey(bucket, keyName, dataFile.length(), keyMetadata, replicationConfig)) {
IOUtils.copyBytes(input, output, chunkSize);
}
}
- void stream(
+ private OzoneOutputStream createOrReplaceKey(OzoneBucket bucket, String keyName,
+ long size, Map<String, String> keyMetadata, ReplicationConfig replicationConfig) throws IOException {
+ // Without a positive --expectedGeneration this is a plain create; otherwise a conditional rewrite.
+ return expectedGeneration <= 0
+ ? bucket.createKey(keyName, size, replicationConfig, keyMetadata)
+ : bucket.rewriteKey(keyName, size, expectedGeneration, replicationConfig, keyMetadata);
+ }
+
+ private void stream(
File dataFile, OzoneBucket bucket,
String keyName, Map<String, String> keyMetadata,
ReplicationConfig replicationConfig, int chunkSize)
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/RewriteKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/RewriteKeyHandler.java
new file mode 100644
index 0000000..c6f97c6
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/RewriteKeyHandler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.shell.keys;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientException;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.shell.OzoneAddress;
+import org.apache.hadoop.ozone.shell.ShellReplicationOptions;
+import picocli.CommandLine;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static org.apache.hadoop.ozone.OzoneConsts.MB;
+
+/**
+ * Rewrite a key with different replication; the key's current generation is passed to rewriteKey.
+ */
+@CommandLine.Command(name = "rewrite",
+ description = "Rewrites the key with different replication")
+public class RewriteKeyHandler extends KeyHandler {
+
+ @CommandLine.Mixin
+ private ShellReplicationOptions replication;
+
+ @Override
+ protected void execute(OzoneClient client, OzoneAddress address) throws IOException, OzoneClientException {
+ String volumeName = address.getVolumeName();
+ String bucketName = address.getBucketName();
+ String keyName = address.getKeyName();
+ OzoneVolume vol = client.getObjectStore().getVolume(volumeName);
+ OzoneBucket bucket = vol.getBucket(bucketName);
+ OzoneKeyDetails key = bucket.getKey(keyName);
+ // Without an explicit replication, flip the type: RATIS keys go to EC(3, 2), all others to RATIS/THREE.
+ ReplicationConfig newReplication = replication.fromParamsOrConfig(getConf());
+ if (newReplication == null) {
+ newReplication = key.getReplicationConfig().getReplicationType() == HddsProtos.ReplicationType.RATIS
+ ? new ECReplicationConfig(3, 2)
+ : RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
+ } else if (newReplication.equals(key.getReplicationConfig())) {
+ System.err.println("Replication unchanged: " + key.getReplicationConfig());
+ return;
+ }
+ // Never use a zero-length buffer (empty key): copyBytes only stops once read() returns a
+ // negative value, and a read into an empty array returns 0, so the copy would never end.
+ int bufferSize = (int) Math.max(1L, Math.min(MB, key.getDataSize()));
+ try (InputStream input = bucket.readKey(keyName);
+ OutputStream output = bucket.rewriteKey(keyName, key.getDataSize(), key.getGeneration(),
+ newReplication, key.getMetadata())) {
+ IOUtils.copyBytes(input, output, bufferSize);
+ }
+ }
+}