Merge remote-tracking branch 'origin/master' into HDDS-10656-atomic-key-overwrite
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 79ba9bb..273065b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -342,6 +342,7 @@ private OzoneConsts() {
   public static final String BUCKET_LAYOUT = "bucketLayout";
   public static final String TENANT = "tenant";
   public static final String USER_PREFIX = "userPrefix";
+  public static final String REWRITE_GENERATION = "rewriteGeneration";
 
   // For multi-tenancy
   public static final String TENANT_ID_USERNAME_DELIMITER = "$";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java
index c55945d..c53aca2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java
@@ -42,6 +42,8 @@ public enum OzoneManagerVersion implements ComponentVersion {
 
   OBJECT_TAG(5, "OzoneManager version that supports object tags"),
 
+  ATOMIC_REWRITE_KEY(6, "OzoneManager version that supports rewriting key as atomic operation"),
+
   FUTURE_VERSION(-1, "Used internally in the client when the server side is "
       + " newer and an unknown server version has arrived to the client.");
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index 575701b..216b51b 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -491,6 +491,28 @@ public OzoneOutputStream createKey(String key, long size,
   }
 
   /**
+   * This API allows an existing key to be updated atomically. The key read before invoking this API
+   * should remain unchanged for this key to be written. This is controlled by the existingKeyGeneration
+   * parameter, taken from the generation of the existing key. If the key is replaced or updated, its
+   * generation changes. If the generation has changed since the existing key was read, either the initial
+   * key create will fail, or the key will fail to commit after the data has been written, as the checks
+   * are carried out both at key open and commit time.
+   *
+   * @param keyName Existing key to rewrite. This must exist in the bucket.
+   * @param size The size of the new key
+   * @param existingKeyGeneration The generation of the existing key which is checked for changes at key create
+   *                              and commit time.
+   * @param replicationConfig The replication configuration for the key to be rewritten.
+   * @param metadata custom key value metadata
+   * @return OzoneOutputStream to which the data has to be written.
+   * @throws IOException
+   */
+  public OzoneOutputStream rewriteKey(String keyName, long size, long existingKeyGeneration,
+      ReplicationConfig replicationConfig, Map<String, String> metadata) throws IOException {
+    return proxy.rewriteKey(volumeName, name, keyName, size, existingKeyGeneration, replicationConfig, metadata);
+  }
+
+  /**
    * Creates a new key in the bucket, with default replication type RATIS and
    * with replication factor THREE.
    *
@@ -1862,8 +1884,7 @@ private void addKeyPrefixInfoToResultList(String keyPrefix,
             keyInfo.getDataSize(), keyInfo.getCreationTime(),
             keyInfo.getModificationTime(),
             keyInfo.getReplicationConfig(),
-            keyInfo.isFile(),
-            keyInfo.getOwnerName());
+            keyInfo.isFile(), keyInfo.getOwnerName());
         keysResultList.add(ozoneKey);
       }
     }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java
index fdd89fe..3b8d7ff 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java
@@ -72,10 +72,6 @@ public class OzoneKey {
    */
   private final boolean isFile;
 
-  /**
-   * Constructs OzoneKey from OmKeyInfo.
-   *
-   */
   @SuppressWarnings("parameternumber")
   public OzoneKey(String volumeName, String bucketName,
       String keyName, long size, long creationTime,
@@ -219,6 +215,10 @@ public boolean isFile() {
     return isFile;
   }
 
+  /**
+   * Constructs OzoneKey from OmKeyInfo.
+   *
+   */
   public static OzoneKey fromKeyInfo(OmKeyInfo keyInfo) {
     return new OzoneKey(keyInfo.getVolumeName(), keyInfo.getBucketName(),
         keyInfo.getKeyName(), keyInfo.getDataSize(), keyInfo.getCreationTime(),
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java
index 168e15d..fe54779 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java
@@ -43,6 +43,12 @@ public class OzoneKeyDetails extends OzoneKey {
   private final CheckedSupplier<OzoneInputStream, IOException> contentSupplier;
 
   /**
+   * The generation of an existing key. This can be used with atomic commits, to
+   * ensure the key has not changed since the key details were read.
+   */
+  private final Long generation;
+
+  /**
    * Constructs OzoneKeyDetails from OmKeyInfo.
    */
   @SuppressWarnings("parameternumber")
@@ -53,12 +59,30 @@ public OzoneKeyDetails(String volumeName, String bucketName, String keyName,
       Map<String, String> metadata,
       FileEncryptionInfo feInfo,
       CheckedSupplier<OzoneInputStream, IOException> contentSupplier,
-      boolean isFile, String owner, Map<String, String> tags) {
+      boolean isFile, String owner, Map<String, String> tags, Long generation) {
     super(volumeName, bucketName, keyName, size, creationTime,
         modificationTime, replicationConfig, metadata, isFile, owner, tags);
     this.ozoneKeyLocations = ozoneKeyLocations;
     this.feInfo = feInfo;
     this.contentSupplier = contentSupplier;
+    this.generation = generation;
+  }
+
+  /**
+   * Constructs OzoneKeyDetails from OmKeyInfo.
+   */
+  @SuppressWarnings("parameternumber")
+  public OzoneKeyDetails(String volumeName, String bucketName, String keyName,
+                         long size, long creationTime, long modificationTime,
+                         List<OzoneKeyLocation> ozoneKeyLocations,
+                         ReplicationConfig replicationConfig,
+                         Map<String, String> metadata,
+                         FileEncryptionInfo feInfo,
+                         CheckedSupplier<OzoneInputStream, IOException> contentSupplier,
+                         boolean isFile, String owner, Map<String, String> tags) {
+    this(volumeName, bucketName, keyName, size, creationTime,
+        modificationTime, ozoneKeyLocations, replicationConfig, metadata, feInfo, contentSupplier,
+        isFile, owner, tags, null);
   }
 
   /**
@@ -72,6 +96,10 @@ public FileEncryptionInfo getFileEncryptionInfo() {
     return feInfo;
   }
 
+  public Long getGeneration() {
+    return generation;
+  }
+
   /**
    * Get OzoneInputStream to read the content of the key.
    * @return OzoneInputStream
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 7434434..55985a1 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -356,6 +356,29 @@ OzoneOutputStream createKey(String volumeName, String bucketName,
       throws IOException;
 
   /**
+   * This API allows an existing key to be updated atomically. The key read before invoking this API
+   * should remain unchanged for this key to be written. This is controlled by the existingKeyGeneration
+   * parameter, taken from the generation of the existing key. If the key is replaced or updated, its
+   * generation changes. If the generation has changed since the existing key was read, either the initial
+   * key create will fail, or the key will fail to commit after the data has been written, as the checks
+   * are carried out both at key open and commit time.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param keyName Existing key to rewrite. This must exist in the bucket.
+   * @param size The size of the new key
+   * @param existingKeyGeneration The generation of the existing key which is checked for changes at key create
+   *                              and commit time.
+   * @param replicationConfig The replication configuration for the key to be rewritten.
+   * @param metadata custom key value metadata
+   * @return {@link OzoneOutputStream}
+   * @throws IOException
+   */
+  OzoneOutputStream rewriteKey(String volumeName, String bucketName, String keyName,
+      long size, long existingKeyGeneration, ReplicationConfig replicationConfig,
+       Map<String, String> metadata) throws IOException;
+
+  /**
    * Writes a key in an existing bucket.
    * @param volumeName Name of the Volume
    * @param bucketName Name of the Bucket
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 248f21c..ac0cf1d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -1396,22 +1396,7 @@ public OzoneOutputStream createKey(
       String volumeName, String bucketName, String keyName, long size,
       ReplicationConfig replicationConfig,
       Map<String, String> metadata, Map<String, String> tags) throws IOException {
-    verifyVolumeName(volumeName);
-    verifyBucketName(bucketName);
-    if (checkKeyNameEnabled) {
-      HddsClientUtils.verifyKeyName(keyName);
-    }
-    HddsClientUtils.checkNotNull(keyName);
-    if (omVersion
-        .compareTo(OzoneManagerVersion.ERASURE_CODED_STORAGE_SUPPORT) < 0) {
-      if (replicationConfig != null &&
-          replicationConfig.getReplicationType()
-              == HddsProtos.ReplicationType.EC) {
-        throw new IOException("Can not set the replication of the key to"
-            + " Erasure Coded replication, as OzoneManager does not support"
-            + " Erasure Coded replication.");
-      }
-    }
+    createKeyPreChecks(volumeName, bucketName, keyName, replicationConfig);
 
     if (omVersion.compareTo(OzoneManagerVersion.OBJECT_TAG) < 0) {
       if (tags != null && !tags.isEmpty()) {
@@ -1419,9 +1404,6 @@ public OzoneOutputStream createKey(
       }
     }
 
-    if (replicationConfig != null) {
-      replicationConfigValidator.validate(replicationConfig);
-    }
     String ownerName = getRealUserInfo().getShortUserName();
 
     OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
@@ -1448,6 +1430,61 @@ public OzoneOutputStream createKey(
   }
 
   @Override
+  public OzoneOutputStream rewriteKey(String volumeName, String bucketName, String keyName,
+      long size, long existingKeyGeneration, ReplicationConfig replicationConfig,
+      Map<String, String> metadata) throws IOException {
+    if (omVersion.compareTo(OzoneManagerVersion.ATOMIC_REWRITE_KEY) < 0) {
+      throw new IOException("OzoneManager does not support atomic key rewrite.");
+    }
+
+    createKeyPreChecks(volumeName, bucketName, keyName, replicationConfig);
+
+    OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setDataSize(size)
+        .setReplicationConfig(replicationConfig)
+        .addAllMetadataGdpr(metadata)
+        .setLatestVersionLocation(getLatestVersionLocation)
+        .setExpectedDataGeneration(existingKeyGeneration);
+
+    OpenKeySession openKey = ozoneManagerClient.openKey(builder.build());
+    // For bucket with layout OBJECT_STORE, when create an empty file (size=0),
+    // OM will set DataSize to OzoneConfigKeys#OZONE_SCM_BLOCK_SIZE,
+    // which will cause S3G's atomic write length check to fail,
+    // so reset size to 0 here.
+    if (isS3GRequest.get() && size == 0) {
+      openKey.getKeyInfo().setDataSize(0);
+    }
+    return createOutputStream(openKey);
+  }
+
+  private void createKeyPreChecks(String volumeName, String bucketName, String keyName,
+      ReplicationConfig replicationConfig) throws IOException {
+    verifyVolumeName(volumeName);
+    verifyBucketName(bucketName);
+    if (checkKeyNameEnabled) {
+      HddsClientUtils.verifyKeyName(keyName);
+    }
+    HddsClientUtils.checkNotNull(keyName);
+    if (omVersion
+        .compareTo(OzoneManagerVersion.ERASURE_CODED_STORAGE_SUPPORT) < 0) {
+      if (replicationConfig != null &&
+          replicationConfig.getReplicationType()
+              == HddsProtos.ReplicationType.EC) {
+        throw new IOException("Can not set the replication of the key to"
+            + " Erasure Coded replication, as OzoneManager does not support"
+            + " Erasure Coded replication.");
+      }
+    }
+
+    if (replicationConfig != null) {
+      replicationConfigValidator.validate(replicationConfig);
+    }
+  }
+
+  @Override
   public OzoneDataStreamOutput createStreamKey(
       String volumeName, String bucketName, String keyName, long size,
       ReplicationConfig replicationConfig,
@@ -1767,8 +1804,10 @@ private OzoneKeyDetails getOzoneKeyDetails(OmKeyInfo keyInfo) {
         keyInfo.getModificationTime(), ozoneKeyLocations,
         keyInfo.getReplicationConfig(), keyInfo.getMetadata(),
         keyInfo.getFileEncryptionInfo(),
-        () -> getInputStreamWithRetryFunction(keyInfo), keyInfo.isFile(), 
-        keyInfo.getOwnerName(), keyInfo.getTags());
+        () -> getInputStreamWithRetryFunction(keyInfo), keyInfo.isFile(),
+        keyInfo.getOwnerName(), keyInfo.getTags(),
+        keyInfo.getGeneration()
+    );
   }
 
   @Override
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
index 19d5ab4..ba28b45 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
@@ -54,6 +54,13 @@ public final class OmKeyArgs implements Auditable {
   private final boolean headOp;
   private final boolean forceUpdateContainerCacheFromSCM;
   private final Map<String, String> tags;
+  // expectedDataGeneration, when used in key creation indicates that a
+  // key with the same keyName should exist with the given generation.
+  // For a key commit to succeed, the original key should still be present with the
+  // generation unchanged.
+  // This allows a key to be created and committed atomically if the original has not
+  // been modified.
+  private Long expectedDataGeneration = null;
 
   private OmKeyArgs(Builder b) {
     this.volumeName = b.volumeName;
@@ -74,6 +81,7 @@ private OmKeyArgs(Builder b) {
     this.forceUpdateContainerCacheFromSCM = b.forceUpdateContainerCacheFromSCM;
     this.ownerName = b.ownerName;
     this.tags = b.tags;
+    this.expectedDataGeneration = b.expectedDataGeneration;
   }
 
   public boolean getIsMultipartKey() {
@@ -156,6 +164,10 @@ public Map<String, String> getTags() {
     return tags;
   }
 
+  public Long getExpectedDataGeneration() {
+    return expectedDataGeneration;
+  }
+
   @Override
   public Map<String, String> toAuditMap() {
     Map<String, String> auditMap = new LinkedHashMap<>();
@@ -179,7 +191,7 @@ public void addLocationInfo(OmKeyLocationInfo locationInfo) {
   }
 
   public OmKeyArgs.Builder toBuilder() {
-    return new OmKeyArgs.Builder()
+    OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
         .setKeyName(keyName)
@@ -197,11 +209,16 @@ public OmKeyArgs.Builder toBuilder() {
         .setAcls(acls)
         .setForceUpdateContainerCacheFromSCM(forceUpdateContainerCacheFromSCM)
         .addAllTags(tags);
+
+    if (expectedDataGeneration != null) {
+      builder.setExpectedDataGeneration(expectedDataGeneration);
+    }
+    return builder;
   }
 
   @Nonnull
   public KeyArgs toProtobuf() {
-    return KeyArgs.newBuilder()
+    KeyArgs.Builder builder = KeyArgs.newBuilder()
         .setVolumeName(getVolumeName())
         .setBucketName(getBucketName())
         .setKeyName(getKeyName())
@@ -210,8 +227,11 @@ public KeyArgs toProtobuf() {
         .setLatestVersionLocation(getLatestVersionLocation())
         .setHeadOp(isHeadOp())
         .setForceUpdateContainerCacheFromSCM(
-            isForceUpdateContainerCacheFromSCM())
-        .build();
+            isForceUpdateContainerCacheFromSCM());
+    if (expectedDataGeneration != null) {
+      builder.setExpectedDataGeneration(expectedDataGeneration);
+    }
+    return builder.build();
   }
 
   /**
@@ -236,6 +256,7 @@ public static class Builder {
     private boolean headOp;
     private boolean forceUpdateContainerCacheFromSCM;
     private final Map<String, String> tags = new HashMap<>();
+    private Long expectedDataGeneration = null;
 
     public Builder setVolumeName(String volume) {
       this.volumeName = volume;
@@ -345,6 +366,11 @@ public Builder setForceUpdateContainerCacheFromSCM(boolean value) {
       return this;
     }
 
+    public Builder setExpectedDataGeneration(long generation) {
+      this.expectedDataGeneration = generation;
+      return this;
+    }
+
     public OmKeyArgs build() {
       return new OmKeyArgs(this);
     }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index c8e7f8f..f52a142 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -107,6 +107,14 @@ public static Codec<OmKeyInfo> getCodec(boolean ignorePipeline) {
    */
   private Map<String, String> tags;
 
+  // expectedDataGeneration, when used in key creation indicates that a
+  // key with the same keyName should exist with the given generation.
+  // For a key commit to succeed, the original key should still be present with the
+  // generation unchanged.
+  // This allows a key to be created and committed atomically if the original has not
+  // been modified.
+  private Long expectedDataGeneration = null;
+
   private OmKeyInfo(Builder b) {
     super(b);
     this.volumeName = b.volumeName;
@@ -124,6 +132,7 @@ private OmKeyInfo(Builder b) {
     this.isFile = b.isFile;
     this.ownerName = b.ownerName;
     this.tags = b.tags;
+    this.expectedDataGeneration = b.expectedDataGeneration;
   }
 
   public String getVolumeName() {
@@ -166,10 +175,26 @@ public String getFileName() {
     return fileName;
   }
 
+  public void setExpectedDataGeneration(Long generation) {
+    this.expectedDataGeneration = generation;
+  }
+
+  public Long getExpectedDataGeneration() {
+    return expectedDataGeneration;
+  }
+
   public String getOwnerName() {
     return ownerName;
   }
 
+  /**
+   * Returns the generation of the object. Note this is currently the same as updateID for a key.
+   * @return long
+   */
+  public long getGeneration() {
+    return getUpdateID();
+  }
+
   public synchronized OmKeyLocationInfoGroup getLatestVersionLocations() {
     return keyLocationVersions.size() == 0 ? null :
         keyLocationVersions.get(keyLocationVersions.size() - 1);
@@ -452,6 +477,7 @@ public static class Builder extends WithParentObjectId.Builder {
 
     private boolean isFile;
     private final Map<String, String> tags = new HashMap<>();
+    private Long expectedDataGeneration = null;
 
     public Builder() {
     }
@@ -590,6 +616,11 @@ public Builder addAllTags(Map<String, String> keyTags) {
       return this;
     }
 
+    public Builder setExpectedDataGeneration(Long existingGeneration) {
+      this.expectedDataGeneration = existingGeneration;
+      return this;
+    }
+
     public OmKeyInfo build() {
       return new OmKeyInfo(this);
     }
@@ -695,6 +726,9 @@ private KeyInfo getProtobuf(boolean ignorePipeline, String fullKeyName,
       kb.setFileEncryptionInfo(OMPBHelper.convert(encInfo));
     }
     kb.setIsFile(isFile);
+    if (expectedDataGeneration != null) {
+      kb.setExpectedDataGeneration(expectedDataGeneration);
+    }
     if (ownerName != null) {
       kb.setOwnerName(ownerName);
     }
@@ -745,6 +779,9 @@ public static OmKeyInfo getFromProtobuf(KeyInfo keyInfo) throws IOException {
     if (keyInfo.hasIsFile()) {
       builder.setFile(keyInfo.getIsFile());
     }
+    if (keyInfo.hasExpectedDataGeneration()) {
+      builder.setExpectedDataGeneration(keyInfo.getExpectedDataGeneration());
+    }
 
     if (keyInfo.hasOwnerName()) {
       builder.setOwnerName(keyInfo.getOwnerName());
@@ -863,6 +900,9 @@ public OmKeyInfo copyObject() {
     if (fileChecksum != null) {
       builder.setFileChecksum(fileChecksum);
     }
+    if (expectedDataGeneration != null) {
+      builder.setExpectedDataGeneration(expectedDataGeneration);
+    }
 
     return builder.build();
   }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 3e21494..e6a8556 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -689,8 +689,13 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
     KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
         .setVolumeName(args.getVolumeName())
         .setBucketName(args.getBucketName())
-        .setKeyName(args.getKeyName())
-        .setOwnerName(args.getOwner());
+        .setKeyName(args.getKeyName());
+
+    // When rewriting a key, the owner does not need to be passed, as it is inherited
+    // from the existing key. Hence it can be null.
+    if (args.getOwner() != null) {
+      keyArgs.setOwnerName(args.getOwner());
+    }
 
     if (args.getAcls() != null) {
       keyArgs.addAllAcls(args.getAcls().stream().distinct().map(a ->
@@ -733,6 +738,10 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
 
     keyArgs.setSortDatanodes(args.getSortDatanodes());
 
+    if (args.getExpectedDataGeneration() != null) {
+      keyArgs.setExpectedDataGeneration(args.getExpectedDataGeneration());
+    }
+
     req.setKeyArgs(keyArgs.build());
 
     OMRequest omRequest = createOMRequest(Type.CreateKey)
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
index 4aead0c..03ab063 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
@@ -67,6 +67,7 @@ public void protobufConversion() throws IOException {
     assertFalse(key.isHsync());
     key.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID, "clientid");
     assertTrue(key.isHsync());
+    assertEquals(5678L, key.getExpectedDataGeneration());
   }
 
   @Test
@@ -123,6 +124,7 @@ private OmKeyInfo createOmKeyInfo(ReplicationConfig replicationConfig) {
         .setReplicationConfig(replicationConfig)
         .addMetadata("key1", "value1")
         .addMetadata("key2", "value2")
+        .setExpectedDataGeneration(5678L)
         .build();
   }
 
diff --git a/hadoop-ozone/dist/src/main/compose/common/ec-test.sh b/hadoop-ozone/dist/src/main/compose/common/ec-test.sh
index 65a6595..04df2b2 100755
--- a/hadoop-ozone/dist/src/main/compose/common/ec-test.sh
+++ b/hadoop-ozone/dist/src/main/compose/common/ec-test.sh
@@ -20,6 +20,8 @@
 ## Exclude virtual-host tests. This is tested separately as it requires additional config.
 execute_robot_test scm -v BUCKET:erasure --exclude virtual-host s3
 
+execute_robot_test scm ec/rewrite.robot
+
 prefix=${RANDOM}
 execute_robot_test scm -v PREFIX:${prefix} ec/basic.robot
 docker-compose up -d --no-recreate --scale datanode=4
diff --git a/hadoop-ozone/dist/src/main/smoketest/ec/rewrite.robot b/hadoop-ozone/dist/src/main/smoketest/ec/rewrite.robot
new file mode 100644
index 0000000..5b5df20
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/smoketest/ec/rewrite.robot
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+*** Settings ***
+Documentation       Test atomic rewrite of a key to EC replication
+Library             OperatingSystem
+Resource            ../commonlib.robot
+Resource            ../lib/os.robot
+Resource            ../ozone-lib/shell.robot
+Resource            ../s3/commonawslib.robot
+Resource            lib.resource
+Suite Setup         Run Keyword if    '${SECURITY_ENABLED}' == 'true'    Kinit test user     testuser     testuser.keytab
+
+
+*** Variables ***
+${ENDPOINT_URL}       http://s3g:9878
+
+
+*** Test Cases ***
+
+Rewrite Multipart Key
+    [setup]    Setup v4 headers
+    ${bucket} =    Create bucket with layout    /s3v    OBJECT_STORE
+    ${key} =    Set Variable    multipart.key
+    ${file} =    Create Random File MB    12
+    Execute AWSS3Cli    cp ${file} s3://${bucket}/${key}
+    Key Should Match Local File    /s3v/${bucket}/${key}    ${file}
+    Verify Key Replica Replication Config    /s3v/${bucket}/${key}    RATIS    THREE
+
+    Execute    ozone sh key rewrite -t EC -r rs-3-2-1024k /s3v/${bucket}/${key}
+
+    Key Should Match Local File    /s3v/${bucket}/${key}    ${file}
+    Verify Key EC Replication Config    /s3v/${bucket}/${key}    RS    3    2    1048576
diff --git a/hadoop-ozone/dist/src/main/smoketest/lib/os.robot b/hadoop-ozone/dist/src/main/smoketest/lib/os.robot
index cc20d6e..98f836f 100644
--- a/hadoop-ozone/dist/src/main/smoketest/lib/os.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/lib/os.robot
@@ -40,13 +40,28 @@
     ${checksumafter} =          Execute                    md5sum ${file2} | awk '{print $1}'
                                 Should Be Equal            ${checksumbefore}            ${checksumafter}
 
+Create Random File MB
+    [arguments]    ${size_in_megabytes}    ${path}=${EMPTY}
+    ${path} =      Create Random File      ${size_in_megabytes}    1048576    ${path}
+    [return]       ${path}
+
+Create Random File KB
+    [arguments]    ${size_in_kilobytes}    ${path}=${EMPTY}
+    ${path} =      Create Random File      ${size_in_kilobytes}    1024    ${path}
+    [return]       ${path}
+
 Create Random File
-    ${postfix} =             Generate Random String  5  [NUMBERS]
+    [arguments]    ${block_count}    ${block_size}    ${path}=${EMPTY}
+    ${path} =      Run Keyword If   '${path}' == '${EMPTY}'    Get Random Filename
+    ...            ELSE             Set Variable    ${path}
+    Execute        dd if=/dev/urandom of=${path} bs=${block_size} count=${block_count} status=none
+    [return]       ${path}
+
+Get Random Filename
+    ${postfix} =             Generate Random String  10  [LOWER]
     ${tmpfile} =             Set Variable   /tmp/tempfile-${postfix}
     File Should Not Exist    ${tmpfile}
-    ${content} =             Set Variable   "Random string"
-    Create File              ${tmpfile}    ${content}
-    [Return]                 ${tmpfile}
+    [return]                 ${tmpfile}
 
 List All Processes
     ${output} =    Execute    ps aux
diff --git a/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell.robot b/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell.robot
index b255d51..ffefda8 100644
--- a/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell.robot
@@ -56,6 +56,13 @@
     ${dir} =    Execute    ozone envvars | grep 'HDDS_LIB_JARS_DIR' | cut -f2 -d= | sed -e "s/'//g" -e 's/"//g'
     Set Environment Variable    HDDS_LIB_JARS_DIR    ${dir}
 
+Create bucket with layout
+    [Arguments]          ${volume}    ${layout}
+    ${postfix} =         Generate Random String    10    [LOWER]
+    ${bucket} =          Set Variable    bucket-${postfix}
+    ${result} =          Execute         ozone sh bucket create --layout ${layout} ${volume}/${bucket}
+    [Return]             ${bucket}
+
 Create Key
     [arguments]    ${key}    ${file}    ${args}=${EMPTY}
     ${output} =    Execute          ozone sh key put ${args} ${key} ${file}
diff --git a/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell_tests.robot b/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell_tests.robot
index 56fbcf8..22805ef 100644
--- a/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell_tests.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell_tests.robot
@@ -48,7 +48,7 @@
     Key Should Match Local File     o3://${OM_SERVICE_ID}/vol1/bucket/passwd    /etc/passwd
 
 Compare Key With Local File with Different File
-    ${random_file} =            Create Random File
+    ${random_file} =            Create Random File KB    42
     ${matches} =                Compare Key With Local File     o3://${OM_SERVICE_ID}/vol1/bucket/passwd    ${random_file}
     Should Be Equal             ${matches}     ${FALSE}
     [Teardown]                  Remove File    ${random_file}
diff --git a/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot b/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot
index 2c4195f..08fc692 100644
--- a/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot
@@ -22,6 +22,7 @@
 Resource            commonawslib.robot
 Test Timeout        5 minutes
 Suite Setup         Setup Multipart Tests
+Suite Teardown      Teardown Multipart Tests
 Test Setup          Generate random prefix
 
 *** Keywords ***
@@ -29,18 +30,19 @@
     Setup s3 tests
 
     # 5MB + a bit
-    Create Random File KB    /tmp/part1    5121
+    Create Random File KB    5121    /tmp/part1
 
     # 1MB - a bit
-    Create Random File KB    /tmp/part2    1023
+    Create Random File KB    1023    /tmp/part2
 
-Create Random file
-    [arguments]             ${size_in_megabytes}
-    Execute                 dd if=/dev/urandom of=/tmp/part1 bs=1048576 count=${size_in_megabytes} status=none
+    Create Random File MB    10      /tmp/10mb
+    Create Random File MB    22      /tmp/22mb
+    Create Random File KB    10      /tmp/10kb
 
-Create Random File KB
-    [arguments]             ${file}    ${size_in_kilobytes}
-    Execute                 dd if=/dev/urandom of=${file} bs=1024 count=${size_in_kilobytes} status=none
+
+Teardown Multipart Tests
+    Remove Files    /tmp/part1 /tmp/part2 /tmp/10mb /tmp/22mb /tmp/10kb
+
 
 Wait Til Date Past
     [arguments]         ${date}
@@ -77,11 +79,9 @@
 # upload we get error entity too small. So, considering further complete
 # multipart upload, uploading each part as 5MB file, exception is for last part
 
-    Run Keyword         Create Random file      5
     ${result} =         Execute AWSS3APICli     upload-part --bucket ${BUCKET} --key ${PREFIX}/multipartKey --part-number 1 --body /tmp/part1 --upload-id ${nextUploadID}
                         Should contain          ${result}    ETag
 # override part
-    Run Keyword         Create Random file      5
     ${result} =         Execute AWSS3APICli     upload-part --bucket ${BUCKET} --key ${PREFIX}/multipartKey --part-number 1 --body /tmp/part1 --upload-id ${nextUploadID}
                         Should contain          ${result}    ETag
 
@@ -94,7 +94,6 @@
                         Should contain          ${result}    UploadId
 
 #upload parts
-    Run Keyword         Create Random file            5
     ${result} =         Execute AWSS3APICli           upload-part --bucket ${BUCKET} --key ${PREFIX}/multipartKey1 --part-number 1 --body /tmp/part1 --upload-id ${uploadID}
     ${eTag1} =          Execute and checkrc           echo '${result}' | jq -r '.ETag'   0
                         Should contain                ${result}    ETag
@@ -171,13 +170,11 @@
                         Should contain          ${result}    UploadId
 
 #upload parts
-                        Execute                 echo "Part1" > /tmp/part1
-    ${result} =         Execute AWSS3APICli     upload-part --bucket ${BUCKET} --key ${PREFIX}/multipartKey2 --part-number 1 --body /tmp/part1 --upload-id ${uploadID}
+    ${result} =         Execute AWSS3APICli     upload-part --bucket ${BUCKET} --key ${PREFIX}/multipartKey2 --part-number 1 --body /tmp/10kb --upload-id ${uploadID}
     ${eTag1} =          Execute and checkrc     echo '${result}' | jq -r '.ETag'   0
                         Should contain          ${result}    ETag
 
-                        Execute                 echo "Part2" > /tmp/part2
-    ${result} =         Execute AWSS3APICli     upload-part --bucket ${BUCKET} --key ${PREFIX}/multipartKey2 --part-number 2 --body /tmp/part2 --upload-id ${uploadID}
+    ${result} =         Execute AWSS3APICli     upload-part --bucket ${BUCKET} --key ${PREFIX}/multipartKey2 --part-number 2 --body /tmp/10kb --upload-id ${uploadID}
     ${eTag2} =          Execute and checkrc     echo '${result}' | jq -r '.ETag'   0
                         Should contain          ${result}    ETag
 
@@ -199,7 +196,6 @@
     ${result} =         Execute AWSS3APICli and checkrc  complete-multipart-upload --upload-id ${uploadID} --bucket ${BUCKET} --key ${PREFIX}/multipartKey3 --multipart-upload 'Parts=[{ETag=etag1,PartNumber=2},{ETag=etag2,PartNumber=1}]'    255
                         Should contain          ${result}    InvalidPart
 #upload parts
-                        Run Keyword             Create Random file      5
     ${result} =         Execute AWSS3APICli     upload-part --bucket ${BUCKET} --key ${PREFIX}/multipartKey3 --part-number 1 --body /tmp/part1 --upload-id ${uploadID}
     ${eTag1} =          Execute and checkrc     echo '${result}' | jq -r '.ETag'   0
                         Should contain          ${result}    ETag
@@ -264,7 +260,6 @@
                         Should contain          ${result}    UploadId
 
 #upload parts
-    Run Keyword         Create Random file      5
     ${result} =         Execute AWSS3APICli     upload-part --bucket ${BUCKET} --key ${PREFIX}/multipartKey5 --part-number 1 --body /tmp/part1 --upload-id ${uploadID}
     ${eTag1} =          Execute and checkrc     echo '${result}' | jq -r '.ETag'   0
                         Should contain          ${result}    ETag
@@ -298,14 +293,12 @@
     ${result} =         Execute AWSS3APICli and checkrc    abort-multipart-upload --bucket ${BUCKET} --key ${PREFIX}/multipartKey5 --upload-id ${uploadID}    0
 
 Test Multipart Upload with the simplified aws s3 cp API
-                        Create Random file      22
-                        Execute AWSS3Cli        cp /tmp/part1 s3://${BUCKET}/mpyawscli
-                        Execute AWSS3Cli        cp s3://${BUCKET}/mpyawscli /tmp/part1.result
+                        Execute AWSS3Cli        cp /tmp/22mb s3://${BUCKET}/mpyawscli
+                        Execute AWSS3Cli        cp s3://${BUCKET}/mpyawscli /tmp/22mb.result
                         Execute AWSS3Cli        rm s3://${BUCKET}/mpyawscli
-                        Compare files           /tmp/part1        /tmp/part1.result
+                        Compare files           /tmp/22mb        /tmp/22mb.result
 
 Test Multipart Upload Put With Copy
-    Run Keyword         Create Random file      5
     ${result} =         Execute AWSS3APICli     put-object --bucket ${BUCKET} --key ${PREFIX}/copytest/source --body /tmp/part1
 
 
@@ -327,8 +320,7 @@
                         Compare files           /tmp/part1        /tmp/part-result
 
 Test Multipart Upload Put With Copy and range
-    Run Keyword         Create Random file      10
-    ${result} =         Execute AWSS3APICli     put-object --bucket ${BUCKET} --key ${PREFIX}/copyrange/source --body /tmp/part1
+    ${result} =         Execute AWSS3APICli     put-object --bucket ${BUCKET} --key ${PREFIX}/copyrange/source --body /tmp/10mb
 
 
     ${result} =         Execute AWSS3APICli     create-multipart-upload --bucket ${BUCKET} --key ${PREFIX}/copyrange/destination
@@ -351,15 +343,14 @@
                         Execute AWSS3APICli     complete-multipart-upload --upload-id ${uploadID} --bucket ${BUCKET} --key ${PREFIX}/copyrange/destination --multipart-upload 'Parts=[{ETag=${eTag1},PartNumber=1},{ETag=${eTag2},PartNumber=2}]'
                         Execute AWSS3APICli     get-object --bucket ${BUCKET} --key ${PREFIX}/copyrange/destination /tmp/part-result
 
-                        Compare files           /tmp/part1        /tmp/part-result
+                        Compare files           /tmp/10mb        /tmp/part-result
 
 Test Multipart Upload Put With Copy and range with IfModifiedSince
-    Run Keyword         Create Random file      10
     ${curDate} =        Get Current Date
     ${beforeCreate} =   Subtract Time From Date     ${curDate}  1 day
     ${tomorrow} =       Add Time To Date            ${curDate}  1 day
 
-    ${result} =         Execute AWSS3APICli     put-object --bucket ${BUCKET} --key ${PREFIX}/copyrange/source --body /tmp/part1
+    ${result} =         Execute AWSS3APICli     put-object --bucket ${BUCKET} --key ${PREFIX}/copyrange/source --body /tmp/10mb
 
     ${result} =         Execute AWSS3APICli     create-multipart-upload --bucket ${BUCKET} --key ${PREFIX}/copyrange/destination
 
@@ -404,7 +395,7 @@
                         Execute AWSS3APICli     complete-multipart-upload --upload-id ${uploadID} --bucket ${BUCKET} --key ${PREFIX}/copyrange/destination --multipart-upload 'Parts=[{ETag=${eTag1},PartNumber=1},{ETag=${eTag2},PartNumber=2}]'
                         Execute AWSS3APICli     get-object --bucket ${BUCKET} --key ${PREFIX}/copyrange/destination /tmp/part-result
 
-                        Compare files           /tmp/part1        /tmp/part-result
+                        Compare files           /tmp/10mb        /tmp/part-result
 
 Test Multipart Upload list
     ${result} =         Execute AWSS3APICli     create-multipart-upload --bucket ${BUCKET} --key ${PREFIX}/listtest/key1
diff --git a/hadoop-ozone/dist/src/main/smoketest/s3/commonawslib.robot b/hadoop-ozone/dist/src/main/smoketest/s3/commonawslib.robot
index b205370..45dee92 100644
--- a/hadoop-ozone/dist/src/main/smoketest/s3/commonawslib.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/s3/commonawslib.robot
@@ -129,13 +129,6 @@
                          Should contain              ${result}         Location
                          Should contain              ${result}         ${bucket}
 
-Create bucket with layout
-    [Arguments]          ${layout}
-    ${postfix} =         Generate Ozone String
-    ${bucket} =          Set Variable    bucket-${postfix}
-    ${result} =          Execute         ozone sh bucket create --layout ${layout} s3v/${bucket}
-    [Return]             ${bucket}
-
 Setup s3 tests
     Return From Keyword if    ${OZONE_S3_TESTS_SET_UP}
     Run Keyword        Generate random prefix
@@ -156,7 +149,7 @@
 
 Create generated bucket
     [Arguments]          ${layout}=OBJECT_STORE
-    ${BUCKET} =          Create bucket with layout    ${layout}
+    ${BUCKET} =          Create bucket with layout    s3v    ${layout}
     Set Global Variable   ${BUCKET}
 
 Create encrypted bucket
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
index f3c9227..007591d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
@@ -49,6 +49,7 @@
 
 import javax.xml.bind.DatatypeConverter;
 import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
@@ -138,12 +139,15 @@
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.OzoneTestBase;
 import org.apache.ozone.test.tag.Flaky;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.RandomUtils;
+
+import static java.util.Collections.singletonMap;
 import static org.apache.hadoop.hdds.StringUtils.string2Bytes;
 import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
 import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
@@ -190,6 +194,7 @@
 import org.junit.jupiter.api.TestMethodOrder;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
 /**
@@ -197,7 +202,7 @@
  * Client.
  */
 @TestMethodOrder(MethodOrderer.MethodName.class)
-abstract class OzoneRpcClientTests {
+abstract class OzoneRpcClientTests extends OzoneTestBase {
 
   private static MiniOzoneCluster cluster = null;
   private static OzoneClient ozClient = null;
@@ -1094,6 +1099,223 @@ public void testPutKey() throws IOException {
     }
   }
 
+  @ParameterizedTest
+  @EnumSource
+  void rewriteKey(BucketLayout layout) throws IOException {
+    OzoneBucket bucket = createBucket(layout);
+    OzoneKeyDetails keyDetails = createTestKey(bucket);
+    OmKeyArgs keyArgs = toOmKeyArgs(keyDetails);
+    OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs);
+
+    final byte[] newContent = "rewrite value".getBytes(UTF_8);
+    rewriteKey(bucket, keyDetails, newContent);
+
+    OzoneKeyDetails actualKeyDetails = assertKeyContent(bucket, keyDetails.getName(), newContent);
+    assertThat(actualKeyDetails.getGeneration()).isGreaterThan(keyDetails.getGeneration());
+    assertMetadataUnchanged(keyDetails, actualKeyDetails);
+    assertMetadataAfterRewrite(keyInfo, ozoneManager.lookupKey(keyArgs));
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void overwriteAfterRewrite(BucketLayout layout) throws IOException {
+    OzoneBucket bucket = createBucket(layout);
+    OzoneKeyDetails keyDetails = createTestKey(bucket);
+    rewriteKey(bucket, keyDetails, "rewrite".getBytes(UTF_8));
+
+    final byte[] overwriteContent = "overwrite".getBytes(UTF_8);
+    OzoneKeyDetails overwriteDetails = createTestKey(bucket, keyDetails.getName(), overwriteContent);
+
+    OzoneKeyDetails actualKeyDetails = assertKeyContent(bucket, keyDetails.getName(), overwriteContent);
+    assertEquals(overwriteDetails.getGeneration(), actualKeyDetails.getGeneration());
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void rewriteAfterRename(BucketLayout layout) throws IOException {
+    OzoneBucket bucket = createBucket(layout);
+    OzoneKeyDetails keyDetails = createTestKey(bucket);
+    String newKeyName = "rewriteAfterRename-" + layout;
+
+    bucket.renameKey(keyDetails.getName(), newKeyName);
+    OzoneKeyDetails renamedKeyDetails = bucket.getKey(newKeyName);
+    OmKeyArgs keyArgs = toOmKeyArgs(renamedKeyDetails);
+    OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs);
+
+    final byte[] rewriteContent = "rewrite".getBytes(UTF_8);
+    rewriteKey(bucket, renamedKeyDetails, rewriteContent);
+
+    OzoneKeyDetails actualKeyDetails = assertKeyContent(bucket, newKeyName, rewriteContent);
+    assertMetadataUnchanged(keyDetails, actualKeyDetails);
+    assertMetadataAfterRewrite(keyInfo, ozoneManager.lookupKey(keyArgs));
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void renameAfterRewrite(BucketLayout layout) throws IOException {
+    OzoneBucket bucket = createBucket(layout);
+    OzoneKeyDetails keyDetails = createTestKey(bucket);
+    final byte[] rewriteContent = "rewrite".getBytes(UTF_8);
+    rewriteKey(bucket, keyDetails, rewriteContent);
+    OmKeyInfo keyInfo = ozoneManager.lookupKey(toOmKeyArgs(keyDetails));
+
+    String newKeyName = "renameAfterRewrite-" + layout;
+    bucket.renameKey(keyDetails.getName(), newKeyName);
+
+    OzoneKeyDetails actualKeyDetails = assertKeyContent(bucket, newKeyName, rewriteContent);
+    assertMetadataUnchanged(keyDetails, actualKeyDetails);
+    assertMetadataAfterRewrite(keyInfo, ozoneManager.lookupKey(toOmKeyArgs(actualKeyDetails)));
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void rewriteFailsDueToOutdatedGeneration(BucketLayout layout) throws IOException {
+    OzoneBucket bucket = createBucket(layout);
+    OzoneKeyDetails keyDetails = createTestKey(bucket);
+    OmKeyArgs keyArgs = toOmKeyArgs(keyDetails);
+
+    // overwrite to get new generation
+    final byte[] overwriteContent = "overwrite".getBytes(UTF_8);
+    OzoneKeyDetails overwriteDetails = createTestKey(bucket, keyDetails.getName(), overwriteContent);
+    OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs);
+
+    // try to rewrite previous generation
+    OMException e = assertThrows(OMException.class, () -> rewriteKey(bucket, keyDetails, "rewrite".getBytes(UTF_8)));
+    assertEquals(KEY_NOT_FOUND, e.getResult());
+    assertThat(e).hasMessageContaining("Generation mismatch");
+
+    OzoneKeyDetails actualKeyDetails = assertKeyContent(bucket, keyDetails.getName(), overwriteContent);
+    assertEquals(overwriteDetails.getGeneration(), actualKeyDetails.getGeneration());
+    assertMetadataUnchanged(overwriteDetails, actualKeyDetails);
+    assertUnchanged(keyInfo, ozoneManager.lookupKey(keyArgs));
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void rewriteFailsDueToOutdatedGenerationAtCommit(BucketLayout layout) throws IOException {
+    OzoneBucket bucket = createBucket(layout);
+    OzoneKeyDetails keyDetails = createTestKey(bucket);
+    final byte[] overwriteContent = "overwrite".getBytes(UTF_8);
+    OmKeyArgs keyArgs = toOmKeyArgs(keyDetails);
+    OmKeyInfo keyInfo;
+
+    OzoneOutputStream out = null;
+    final OzoneKeyDetails overwriteDetails;
+    try {
+      out = bucket.rewriteKey(keyDetails.getName(), keyDetails.getDataSize(),
+          keyDetails.getGeneration(), RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+          keyDetails.getMetadata());
+      out.write("rewrite".getBytes(UTF_8));
+
+      overwriteDetails = createTestKey(bucket, keyDetails.getName(), overwriteContent);
+      keyInfo = ozoneManager.lookupKey(keyArgs);
+
+      OMException e = assertThrows(OMException.class, out::close);
+      assertEquals(KEY_NOT_FOUND, e.getResult());
+      assertThat(e).hasMessageContaining("does not match the expected generation to rewrite");
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+
+    OzoneKeyDetails actualKeyDetails = assertKeyContent(bucket, keyDetails.getName(), overwriteContent);
+    assertEquals(overwriteDetails.getGeneration(), actualKeyDetails.getGeneration());
+    assertUnchanged(keyInfo, ozoneManager.lookupKey(keyArgs));
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void cannotRewriteDeletedKey(BucketLayout layout) throws IOException {
+    OzoneBucket bucket = createBucket(layout);
+    OzoneKeyDetails keyDetails = createTestKey(bucket);
+    bucket.deleteKey(keyDetails.getName());
+
+    OMException e = assertThrows(OMException.class, () -> rewriteKey(bucket, keyDetails, "rewrite".getBytes(UTF_8)));
+    assertEquals(KEY_NOT_FOUND, e.getResult());
+    assertThat(e).hasMessageContaining("not found");
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void cannotRewriteRenamedKey(BucketLayout layout) throws IOException {
+    OzoneBucket bucket = createBucket(layout);
+    OzoneKeyDetails keyDetails = createTestKey(bucket);
+    bucket.renameKey(keyDetails.getName(), "newKeyName-" + layout.name());
+
+    OMException e = assertThrows(OMException.class, () -> rewriteKey(bucket, keyDetails, "rewrite".getBytes(UTF_8)));
+    assertEquals(KEY_NOT_FOUND, e.getResult());
+    assertThat(e).hasMessageContaining("not found");
+  }
+
+  private static void rewriteKey(
+      OzoneBucket bucket, OzoneKeyDetails keyDetails, byte[] newContent
+  ) throws IOException {
+    try (OzoneOutputStream out = bucket.rewriteKey(keyDetails.getName(), keyDetails.getDataSize(),
+        keyDetails.getGeneration(), RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+        keyDetails.getMetadata())) {
+      out.write(newContent);
+    }
+  }
+
+  private static OzoneKeyDetails assertKeyContent(
+      OzoneBucket bucket, String keyName, byte[] expectedContent
+  ) throws IOException {
+    OzoneKeyDetails updatedKeyDetails = bucket.getKey(keyName);
+
+    try (OzoneInputStream is = bucket.readKey(keyName)) {
+      byte[] fileContent = new byte[expectedContent.length];
+      IOUtils.readFully(is, fileContent);
+      assertArrayEquals(expectedContent, fileContent);
+    }
+
+    return updatedKeyDetails;
+  }
+
+  private OzoneBucket createBucket(BucketLayout layout) throws IOException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+
+    BucketArgs args = BucketArgs.newBuilder()
+        .setBucketLayout(layout)
+        .build();
+
+    volume.createBucket(bucketName, args);
+    return volume.getBucket(bucketName);
+  }
+
+  private static OmKeyArgs toOmKeyArgs(OzoneKeyDetails keyDetails) {
+    return new OmKeyArgs.Builder()
+        .setVolumeName(keyDetails.getVolumeName())
+        .setBucketName(keyDetails.getBucketName())
+        .setKeyName(keyDetails.getName())
+        .build();
+  }
+
+  private static void assertUnchanged(OmKeyInfo original, OmKeyInfo current) {
+    assertEquals(original.getAcls(), current.getAcls());
+    assertEquals(original.getMetadata(), current.getMetadata());
+    assertEquals(original.getCreationTime(), current.getCreationTime());
+    assertEquals(original.getUpdateID(), current.getUpdateID());
+    assertEquals(original.getModificationTime(), current.getModificationTime());
+  }
+
+  private static void assertMetadataAfterRewrite(OmKeyInfo original, OmKeyInfo updated) {
+    assertEquals(original.getAcls(), updated.getAcls());
+    assertEquals(original.getMetadata(), updated.getMetadata());
+    assertEquals(original.getCreationTime(), updated.getCreationTime());
+    assertThat(updated.getUpdateID()).isGreaterThan(original.getUpdateID());
+    assertThat(updated.getModificationTime()).isGreaterThanOrEqualTo(original.getModificationTime());
+  }
+
+  private static void assertMetadataUnchanged(OzoneKeyDetails original, OzoneKeyDetails rewritten) {
+    assertEquals(original.getOwner(), rewritten.getOwner());
+    assertEquals(original.getMetadata(), rewritten.getMetadata());
+  }
+
   @Test
   public void testCheckUsedBytesQuota() throws IOException {
     String volumeName = UUID.randomUUID().toString();
@@ -3956,15 +4178,28 @@ private void completeMultipartUpload(OzoneBucket bucket, String keyName,
     assertNotNull(omMultipartUploadCompleteInfo.getHash());
   }
 
-  private void createTestKey(OzoneBucket bucket, String keyName,
-                             String keyValue) throws IOException {
-    OzoneOutputStream out = bucket.createKey(keyName,
-        keyValue.getBytes(UTF_8).length, RATIS,
-        ONE, new HashMap<>());
-    out.write(keyValue.getBytes(UTF_8));
-    out.close();
-    OzoneKey key = bucket.getKey(keyName);
+  private OzoneKeyDetails createTestKey(OzoneBucket bucket) throws IOException {
+    return createTestKey(bucket, getTestName(), UUID.randomUUID().toString());
+  }
+
+  private OzoneKeyDetails createTestKey(
+      OzoneBucket bucket, String keyName, String keyValue
+  ) throws IOException {
+    return createTestKey(bucket, keyName, keyValue.getBytes(UTF_8));
+  }
+
+  private OzoneKeyDetails createTestKey(
+      OzoneBucket bucket, String keyName, byte[] bytes
+  ) throws IOException {
+    RatisReplicationConfig replication = RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE);
+    Map<String, String> metadata = singletonMap("key", RandomStringUtils.randomAscii(10));
+    try (OzoneOutputStream out = bucket.createKey(keyName, bytes.length, replication, metadata)) {
+      out.write(bytes);
+    }
+    OzoneKeyDetails key = bucket.getKey(keyName);
+    assertNotNull(key);
     assertEquals(keyName, key.getName());
+    return key;
   }
 
   private void assertKeyRenamedEx(OzoneBucket bucket, String keyName) {
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 5f5a50d..8640343 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1029,6 +1029,14 @@
 
     // S3 object tags support
     repeated hadoop.hdds.KeyValue tags = 22;
+
+    // expectedDataGeneration, when used in key creation indicates that a
+    // key with the same keyName should exist with the given generation.
+    // For a key commit to succeed, the original key should still be present with the
+    // generation unchanged.
+    // This allows a key to be created and committed atomically if the original has not
+    // been modified.
+    optional uint64 expectedDataGeneration = 23;
 }
 
 message KeyLocation {
@@ -1113,6 +1121,13 @@
     optional bool isFile = 19;
     optional string ownerName = 20;
     repeated hadoop.hdds.KeyValue tags = 21;
+    // expectedDataGeneration, when used in key creation indicates that a
+    // key with the same keyName should exist with the given generation.
+    // For a key commit to succeed, the original key should still be present with the
+    // generation unchanged.
+    // This allows a key to be created and committed atomically if the original has not
+    // been modified.
+    optional uint64 expectedDataGeneration = 22;
 }
 
 message BasicKeyInfo {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/RequestAuditor.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/RequestAuditor.java
index 78e67bb..bf12a1d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/RequestAuditor.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/RequestAuditor.java
@@ -82,6 +82,10 @@ default Map<String, String> buildKeyArgsAuditMap(KeyArgs keyArgs) {
         auditMap.put(OzoneConsts.REPLICATION_CONFIG,
             ECReplicationConfig.toString(keyArgs.getEcReplicationConfig()));
       }
+      if (keyArgs.hasExpectedDataGeneration()) {
+        auditMap.put(OzoneConsts.REWRITE_GENERATION,
+            String.valueOf(keyArgs.getExpectedDataGeneration()));
+      }
       for (HddsProtos.KeyValue item : keyArgs.getMetadataList()) {
         if (ETAG.equals(item.getKey())) {
           auditMap.put(ETAG, item.getValue());
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index 5c3593b..7d31422 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -240,6 +240,12 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
         throw new OMException("Failed to " + action + " key, as " + dbOpenKey +
             " entry is not found in the OpenKey table", KEY_NOT_FOUND);
       }
+
+      validateAtomicRewrite(keyToDelete, omKeyInfo, auditMap);
+      // Optimistic locking validation has passed. Now set the rewrite fields to null so they are
+      // not persisted in the key table.
+      omKeyInfo.setExpectedDataGeneration(null);
+
       omKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf(
           commitKeyArgs.getMetadataList()));
       if (isHSync) {
@@ -497,4 +503,22 @@ public static OMRequest disallowHsync(
     }
     return req;
   }
+
+  protected void validateAtomicRewrite(OmKeyInfo existing, OmKeyInfo toCommit, Map<String, String> auditMap)
+      throws OMException {
+    if (toCommit.getExpectedDataGeneration() != null) {
+      // These values are not passed in the request keyArgs, so add them into the auditMap if they are present
+      // in the open key entry.
+      auditMap.put(OzoneConsts.REWRITE_GENERATION, String.valueOf(toCommit.getExpectedDataGeneration()));
+      if (existing == null) {
+        throw new OMException("Atomic rewrite is not allowed for a new key", KEY_NOT_FOUND);
+      }
+      if (!toCommit.getExpectedDataGeneration().equals(existing.getUpdateID())) {
+        throw new OMException("Cannot commit as current generation (" + existing.getUpdateID() +
+            ") does not match the expected generation to rewrite (" + toCommit.getExpectedDataGeneration() + ")",
+            KEY_NOT_FOUND);
+      }
+    }
+  }
+
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
index 704e9e9..d2cd3fd 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
@@ -148,7 +148,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
           action = "hsync";
         }
         throw new OMException("Failed to " + action + " key, as " +
-                dbOpenFileKey + "entry is not found in the OpenKey table",
+                dbOpenFileKey + " entry is not found in the OpenKey table",
                 KEY_NOT_FOUND);
       }
       omKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf(
@@ -176,6 +176,12 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
       Map<String, RepeatedOmKeyInfo> oldKeyVersionsToDeleteMap = null;
       OmKeyInfo keyToDelete =
           omMetadataManager.getKeyTable(getBucketLayout()).get(dbFileKey);
+
+      validateAtomicRewrite(keyToDelete, omKeyInfo, auditMap);
+      // Optimistic locking validation has passed. Now set the rewrite fields to null so they are
+      // not persisted in the key table.
+      omKeyInfo.setExpectedDataGeneration(null);
+
       if (null != keyToDelete) {
         final String clientIdString
             = String.valueOf(commitKeyRequest.getClientID());
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
index e9a9f00..36f2a8c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
@@ -231,6 +231,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
           keyName);
       OmKeyInfo dbKeyInfo = omMetadataManager.getKeyTable(getBucketLayout())
           .getIfExist(dbKeyName);
+      validateAtomicRewrite(dbKeyInfo, keyArgs);
 
       OmBucketInfo bucketInfo =
           getBucketInfo(omMetadataManager, volumeName, bucketName);
@@ -440,4 +441,17 @@ public static OMRequest blockCreateKeyWithBucketLayoutFromOldClient(
     }
     return req;
   }
+
+  protected void validateAtomicRewrite(OmKeyInfo dbKeyInfo, KeyArgs keyArgs)
+      throws OMException {
+    if (keyArgs.hasExpectedDataGeneration()) {
+      // If a key does not exist, or if it exists but the updateID does not match, then fail this request.
+      if (dbKeyInfo == null) {
+        throw new OMException("Key not found during expected rewrite", OMException.ResultCodes.KEY_NOT_FOUND);
+      }
+      if (dbKeyInfo.getUpdateID() != keyArgs.getExpectedDataGeneration()) {
+        throw new OMException("Generation mismatch during expected rewrite", OMException.ResultCodes.KEY_NOT_FOUND);
+      }
+    }
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
index 6fe8c12..d8d9b19 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
@@ -120,6 +120,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
         dbFileInfo = OMFileRequest.getOmKeyInfoFromFileTable(false,
                 omMetadataManager, dbFileKey, keyName);
       }
+      validateAtomicRewrite(dbFileInfo, keyArgs);
 
       // Check if a file or directory exists with same key name.
       if (pathInfoFSO.getDirectoryResult() == DIRECTORY_EXISTS) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 50bb105..a90d7a2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -775,7 +775,6 @@ protected OmKeyInfo prepareFileInfo(
       dbKeyInfo.setReplicationConfig(replicationConfig);
 
       // Construct a new metadata map from KeyArgs.
-      // Clear the old one when the key is overwritten.
       dbKeyInfo.getMetadata().clear();
       dbKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf(
           keyArgs.getMetadataList()));
@@ -786,6 +785,10 @@ protected OmKeyInfo prepareFileInfo(
       dbKeyInfo.getTags().putAll(KeyValueUtil.getFromProtobuf(
           keyArgs.getTagsList()));
 
+      if (keyArgs.hasExpectedDataGeneration()) {
+        dbKeyInfo.setExpectedDataGeneration(keyArgs.getExpectedDataGeneration());
+      }
+
       dbKeyInfo.setFileEncryptionInfo(encInfo);
       return dbKeyInfo;
     }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
index 9719865..0503578 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
@@ -21,6 +21,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -31,11 +32,13 @@
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
@@ -58,9 +61,12 @@
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
 
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.KEY_NOT_FOUND;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -116,7 +122,7 @@ public void testValidateAndUpdateCacheWithUnknownBlockId() throws Exception {
     OMClientResponse omClientResponse =
         omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
 
-    assertEquals(OzoneManagerProtocolProtos.Status.OK,
+    assertEquals(OK,
         omClientResponse.getOMResponse().getStatus());
 
     // Entry should be deleted from openKey Table.
@@ -183,7 +189,7 @@ public void testValidateAndUpdateCache() throws Exception {
     OMClientResponse omClientResponse =
         omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
 
-    assertEquals(OzoneManagerProtocolProtos.Status.OK,
+    assertEquals(OK,
         omClientResponse.getOMResponse().getStatus());
 
     // Entry should be deleted from openKey Table.
@@ -219,6 +225,68 @@ public void testValidateAndUpdateCache() throws Exception {
   }
 
   @Test
+  public void testAtomicRewrite() throws Exception {
+    Table<String, OmKeyInfo> openKeyTable = omMetadataManager.getOpenKeyTable(getBucketLayout());
+    Table<String, OmKeyInfo> closedKeyTable = omMetadataManager.getKeyTable(getBucketLayout());
+
+    OMRequest modifiedOmRequest = doPreExecute(createCommitKeyRequest());
+    OMKeyCommitRequest omKeyCommitRequest = getOmKeyCommitRequest(modifiedOmRequest);
+    KeyArgs keyArgs = modifiedOmRequest.getCommitKeyRequest().getKeyArgs();
+
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager, omKeyCommitRequest.getBucketLayout());
+
+    // Append new blocks
+    List<OmKeyLocationInfo> allocatedLocationList =
+        keyArgs.getKeyLocationsList().stream()
+            .map(OmKeyLocationInfo::getFromProtobuf)
+            .collect(Collectors.toList());
+
+    OmKeyInfo.Builder omKeyInfoBuilder = OMRequestTestUtils.createOmKeyInfo(
+        volumeName, bucketName, keyName, replicationConfig, new OmKeyLocationInfoGroup(version, new ArrayList<>()));
+    omKeyInfoBuilder.setExpectedDataGeneration(1L);
+    OmKeyInfo omKeyInfo = omKeyInfoBuilder.build();
+    omKeyInfo.appendNewBlocks(allocatedLocationList, false);
+    List<OzoneAcl> acls = Collections.singletonList(OzoneAcl.parseAcl("user:foo:rw"));
+    omKeyInfo.addAcl(acls.get(0));
+
+    String openKey = addKeyToOpenKeyTable(allocatedLocationList, omKeyInfo);
+    OmKeyInfo openKeyInfo = openKeyTable.get(openKey);
+    assertNotNull(openKeyInfo);
+    assertEquals(acls, openKeyInfo.getAcls());
+    // At this stage, we have an openKey, with rewrite generation of 1.
+    // However there is no closed key entry, so the commit should fail.
+    OMClientResponse omClientResponse =
+        omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
+    assertEquals(KEY_NOT_FOUND, omClientResponse.getOMResponse().getStatus());
+
+    // Now add the key to the key table, and try again, but with different generation
+    omKeyInfoBuilder.setExpectedDataGeneration(null);
+    omKeyInfoBuilder.setUpdateID(0L);
+    OmKeyInfo invalidKeyInfo = omKeyInfoBuilder.build();
+    closedKeyTable.put(getOzonePathKey(), invalidKeyInfo);
+    // This should fail as the updateID is zero and the open key has rewrite generation of 1.
+    omClientResponse = omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
+    assertEquals(KEY_NOT_FOUND, omClientResponse.getOMResponse().getStatus());
+
+    omKeyInfoBuilder.setUpdateID(1L);
+    OmKeyInfo closedKeyInfo = omKeyInfoBuilder.build();
+
+    closedKeyTable.delete(getOzonePathKey());
+    closedKeyTable.put(getOzonePathKey(), closedKeyInfo);
+
+    // Now the key should commit as the updateID and rewrite generation match.
+    omClientResponse = omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
+    assertEquals(OK, omClientResponse.getOMResponse().getStatus());
+
+    OmKeyInfo committedKey = closedKeyTable.get(getOzonePathKey());
+    assertNull(committedKey.getExpectedDataGeneration());
+    // Generation should be changed
+    assertNotEquals(closedKeyInfo.getGeneration(), committedKey.getGeneration());
+    assertEquals(acls, committedKey.getAcls());
+  }
+
+  @Test
   public void testValidateAndUpdateCacheWithUncommittedBlocks()
       throws Exception {
 
@@ -260,7 +328,7 @@ public void testValidateAndUpdateCacheWithUncommittedBlocks()
     OMClientResponse omClientResponse =
         omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
 
-    assertEquals(OzoneManagerProtocolProtos.Status.OK,
+    assertEquals(OK,
         omClientResponse.getOMResponse().getStatus());
 
     Map<String, RepeatedOmKeyInfo> toDeleteKeyList
@@ -385,7 +453,7 @@ private Map<String, RepeatedOmKeyInfo> doKeyCommit(boolean isHSync,
     
     OMClientResponse omClientResponse =
         omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
-    assertEquals(OzoneManagerProtocolProtos.Status.OK,
+    assertEquals(OK,
         omClientResponse.getOMResponse().getStatus());
 
     // Key should be present in both OpenKeyTable and KeyTable with HSync commit
@@ -550,7 +618,7 @@ public void testValidateAndUpdateCacheWithKeyNotFound() throws Exception {
     OMClientResponse omClientResponse =
         omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
 
-    assertEquals(OzoneManagerProtocolProtos.Status.KEY_NOT_FOUND,
+    assertEquals(KEY_NOT_FOUND,
         omClientResponse.getOMResponse().getStatus());
 
     omKeyInfo =
@@ -596,7 +664,7 @@ public void testValidateAndUpdateCacheOnOverwrite() throws Exception {
     OMClientResponse omClientResponse =
         omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 102L);
 
-    assertEquals(OzoneManagerProtocolProtos.Status.OK, omClientResponse.getOMResponse().getStatus());
+    assertEquals(OK, omClientResponse.getOMResponse().getStatus());
 
     // New entry should be created in key Table.
     omKeyInfo = omMetadataManager.getKeyTable(omKeyCommitRequest.getBucketLayout()).get(ozoneKey);
@@ -753,6 +821,14 @@ protected String addKeyToOpenKeyTable(List<OmKeyLocationInfo> locationList)
   }
 
   @Nonnull
+  protected String addKeyToOpenKeyTable(List<OmKeyLocationInfo> locationList, OmKeyInfo keyInfo) throws Exception {
+    OMRequestTestUtils.addKeyToTable(true, false, keyInfo, clientID, 0, omMetadataManager);
+
+    return omMetadataManager.getOpenKey(volumeName, bucketName,
+        keyName, clientID);
+  }
+
+  @Nonnull
   protected OMKeyCommitRequest getOmKeyCommitRequest(OMRequest omRequest) {
     return new OMKeyCommitRequest(omRequest, BucketLayout.DEFAULT);
   }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequestWithFSO.java
index 48cc527..c3b913b 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequestWithFSO.java
@@ -69,30 +69,36 @@ protected String getOzonePathKey() throws IOException {
   }
 
   @Override
-  protected String addKeyToOpenKeyTable(List<OmKeyLocationInfo> locationList)
+  protected String addKeyToOpenKeyTable(List<OmKeyLocationInfo> locationList, OmKeyInfo keyInfo)
       throws Exception {
     // need to initialize parentID
     if (getParentDir() == null) {
       parentID = getBucketID();
     } else {
       parentID = OMRequestTestUtils.addParentsToDirTable(volumeName,
-              bucketName, getParentDir(), omMetadataManager);
+          bucketName, getParentDir(), omMetadataManager);
     }
+    keyInfo.setParentObjectID(parentID);
+    keyInfo.appendNewBlocks(locationList, false);
+
+    String fileName = OzoneFSUtils.getFileName(keyName);
+    return OMRequestTestUtils.addFileToKeyTable(true, false,
+        fileName, keyInfo, clientID, txnLogId, omMetadataManager);
+
+  }
+
+  @Override
+  protected String addKeyToOpenKeyTable(List<OmKeyLocationInfo> locationList)
+      throws Exception {
     long objectId = 100;
 
     OmKeyInfo omKeyInfoFSO =
         OMRequestTestUtils.createOmKeyInfo(volumeName, bucketName, keyName,
                 RatisReplicationConfig.getInstance(ONE), new OmKeyLocationInfoGroup(version, new ArrayList<>(), false))
             .setObjectID(objectId)
-            .setParentObjectID(parentID)
             .setUpdateID(100L)
             .build();
-    omKeyInfoFSO.appendNewBlocks(locationList, false);
-
-    String fileName = OzoneFSUtils.getFileName(keyName);
-    return OMRequestTestUtils.addFileToKeyTable(true, false,
-            fileName, omKeyInfoFSO, clientID, txnLogId, omMetadataManager);
-
+    return addKeyToOpenKeyTable(locationList, omKeyInfoFSO);
   }
 
   @Nonnull
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
index 166edb5..4bfdd33 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -63,6 +62,8 @@
     .OMRequest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
@@ -71,6 +72,7 @@
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS;
 import static org.apache.hadoop.ozone.om.request.OMRequestTestUtils.addVolumeAndBucketToDB;
 import static org.apache.hadoop.ozone.om.request.OMRequestTestUtils.createOmKeyInfo;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.KEY_NOT_FOUND;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.NOT_A_FILE;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -121,9 +123,9 @@ private void preExecuteTest(boolean isMultipartKey, int partNumber,
     long scmBlockSize = ozoneManager.getScmBlockSize();
     for (int i = 0; i <= repConfig.getRequiredNodes(); i++) {
       doPreExecute(createKeyRequest(isMultipartKey, partNumber,
-          scmBlockSize * i, repConfig));
+          scmBlockSize * i, repConfig, null));
       doPreExecute(createKeyRequest(isMultipartKey, partNumber,
-          scmBlockSize * i + 1, repConfig));
+          scmBlockSize * i + 1, repConfig, null));
     }
   }
 
@@ -148,7 +150,7 @@ public void testValidateAndUpdateCache(
     tags.put("tag-key2", "tag-value2");
 
     OMRequest modifiedOmRequest =
-        doPreExecute(createKeyRequest(false, 0, Collections.emptyMap(), tags));
+        doPreExecute(createKeyRequest(false, 0, emptyMap(), tags));
 
     OMKeyCreateRequest omKeyCreateRequest =
         getOMKeyCreateRequest(modifiedOmRequest);
@@ -197,7 +199,7 @@ public void testValidateAndUpdateCache(
 
     // Override same key again
     modifiedOmRequest =
-        doPreExecute(createKeyRequest(false, 0, Collections.emptyMap(), tags));
+        doPreExecute(createKeyRequest(false, 0, emptyMap(), tags));
 
     id = modifiedOmRequest.getCreateKeyRequest().getClientID();
     openKey = getOpenKey(id);
@@ -534,7 +536,7 @@ public void testCreationWithoutMetadataFollowedByOverwriteWithMetadata(
 
     // Create the key request without any initial metadata
     OMRequest createRequestWithoutMetadata = createKeyRequest(false, 0, keyName,
-        null); // Passing 'null' for metadata
+        null, emptyMap(), emptyList()); // Passing 'null' for metadata
     OMKeyCreateRequest createOmKeyCreateRequest =
         new OMKeyCreateRequest(createRequestWithoutMetadata, getBucketLayout());
 
@@ -557,7 +559,7 @@ public void testCreationWithoutMetadataFollowedByOverwriteWithMetadata(
 
     // Overwrite the previously created key with new metadata
     OMRequest overwriteRequestWithMetadata =
-        createKeyRequest(false, 0, keyName, overwriteMetadata);
+        createKeyRequest(false, 0, keyName, overwriteMetadata, emptyMap(), emptyList());
     OMKeyCreateRequest overwriteOmKeyCreateRequest =
         new OMKeyCreateRequest(overwriteRequestWithMetadata, getBucketLayout());
 
@@ -657,23 +659,23 @@ private OMRequest doPreExecute(OMRequest originalOMRequest) throws Exception {
 
   @SuppressWarnings("parameterNumber")
   protected OMRequest createKeyRequest(boolean isMultipartKey, int partNumber) {
-    return createKeyRequest(isMultipartKey, partNumber, Collections.emptyMap(), Collections.emptyMap());
+    return createKeyRequest(isMultipartKey, partNumber, emptyMap(), emptyMap());
   }
 
   protected OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
                                        Map<String, String> metadata, Map<String, String> tags) {
-    return createKeyRequest(isMultipartKey, partNumber, keyName, metadata, tags);
+    return createKeyRequest(isMultipartKey, partNumber, keyName, metadata, tags, emptyList());
   }
 
   private OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
                                      String keyName) {
-    return createKeyRequest(isMultipartKey, partNumber, keyName, null);
+    return createKeyRequest(isMultipartKey, partNumber, keyName, emptyMap());
   }
 
   protected OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
                                        String keyName,
                                        Map<String, String> metadata) {
-    return createKeyRequest(isMultipartKey, partNumber, keyName, metadata, null);
+    return createKeyRequest(isMultipartKey, partNumber, keyName, metadata, emptyMap(), emptyList());
   }
 
   /**
@@ -693,7 +695,8 @@ protected OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
   protected OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
                                        String keyName,
                                        Map<String, String> metadata,
-                                       Map<String, String> tags) {
+                                       Map<String, String> tags,
+                                       List<OzoneAcl> acls) {
     KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
@@ -704,6 +707,9 @@ protected OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
         .setType(replicationConfig.getReplicationType())
         .setLatestVersionLocation(true);
 
+    for (OzoneAcl acl : acls) {
+      keyArgs.addAcls(OzoneAcl.toProtobuf(acl));
+    }
     // Configure for multipart upload, if applicable
     if (isMultipartKey) {
       keyArgs.setDataSize(dataSize).setMultipartNumber(partNumber);
@@ -733,7 +739,14 @@ protected OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
 
   private OMRequest createKeyRequest(
       boolean isMultipartKey, int partNumber, long keyLength,
-      ReplicationConfig repConfig) {
+      ReplicationConfig repConfig, Long expectedDataGeneration) {
+    return createKeyRequest(isMultipartKey, partNumber, keyLength, repConfig,
+        expectedDataGeneration, null);
+  }
+
+  private OMRequest createKeyRequest(
+      boolean isMultipartKey, int partNumber, long keyLength,
+      ReplicationConfig repConfig, Long expectedDataGeneration, Map<String, String> metaData) {
 
     KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
         .setVolumeName(volumeName).setBucketName(bucketName)
@@ -752,8 +765,17 @@ private OMRequest createKeyRequest(
     if (isMultipartKey) {
       keyArgs.setMultipartNumber(partNumber);
     }
+    if (expectedDataGeneration != null) {
+      keyArgs.setExpectedDataGeneration(expectedDataGeneration);
+    }
+    if (metaData != null) {
+      metaData.forEach((key, value) -> keyArgs.addMetadata(KeyValue.newBuilder()
+          .setKey(key)
+          .setValue(value)
+          .build()));
+    }
 
-    OzoneManagerProtocolProtos.CreateKeyRequest createKeyRequest =
+    CreateKeyRequest createKeyRequest =
         CreateKeyRequest.newBuilder().setKeyArgs(keyArgs).build();
 
     return OMRequest.newBuilder()
@@ -935,6 +957,72 @@ public void testKeyCreateInheritParentDefaultAcls(
 
   }
 
+  @ParameterizedTest
+  @MethodSource("data")
+  public void testAtomicRewrite(
+      boolean setKeyPathLock, boolean setFileSystemPaths) throws Exception {
+    when(ozoneManager.getOzoneLockProvider()).thenReturn(
+        new OzoneLockProvider(setKeyPathLock, setFileSystemPaths));
+
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, omMetadataManager,
+        OmBucketInfo.newBuilder().setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setBucketLayout(getBucketLayout()));
+
+    // First, create a key with the rewrite ID - this should fail as no key exists
+    OMRequest omRequest = createKeyRequest(false, 0, 100,
+        RatisReplicationConfig.getInstance(THREE), 1L);
+    omRequest = doPreExecute(omRequest);
+    OMKeyCreateRequest omKeyCreateRequest = getOMKeyCreateRequest(omRequest);
+    OMClientResponse response = omKeyCreateRequest.validateAndUpdateCache(ozoneManager, 105L);
+    assertEquals(KEY_NOT_FOUND, response.getOMResponse().getStatus());
+
+    // Now pre-create the key in the system so we can rewrite it.
+    Map<String, String> metadata = Collections.singletonMap("metakey", "metavalue");
+    Map<String, String> reWriteMetadata = Collections.singletonMap("metakey", "rewriteMetavalue");
+
+    List<OzoneAcl> acls = Collections.singletonList(OzoneAcl.parseAcl("user:foo:rw"));
+    OmKeyInfo createdKeyInfo = createAndCheck(keyName, metadata, acls);
+    // Commit openKey entry.
+    omMetadataManager.getKeyTable(getBucketLayout()).put(getOzoneKey(), createdKeyInfo);
+
+    // Retrieve the committed key info
+    OmKeyInfo existingKeyInfo = omMetadataManager.getKeyTable(getBucketLayout()).get(getOzoneKey());
+    List<OzoneAcl> existingAcls = existingKeyInfo.getAcls();
+    assertEquals(acls, existingAcls);
+
+    // Create a request with a generation which doesn't match the current key
+    omRequest = createKeyRequest(false, 0, 100,
+        RatisReplicationConfig.getInstance(THREE), existingKeyInfo.getGeneration() + 1, reWriteMetadata);
+    omRequest = doPreExecute(omRequest);
+    omKeyCreateRequest = getOMKeyCreateRequest(omRequest);
+    response = omKeyCreateRequest.validateAndUpdateCache(ozoneManager, 105L);
+    // Still fails, as the matching key is not present.
+    assertEquals(KEY_NOT_FOUND, response.getOMResponse().getStatus());
+
+    // Now create the key with the correct rewrite generation
+    omRequest = createKeyRequest(false, 0, 100,
+        RatisReplicationConfig.getInstance(THREE), existingKeyInfo.getGeneration(), reWriteMetadata);
+    omRequest = doPreExecute(omRequest);
+    omKeyCreateRequest = getOMKeyCreateRequest(omRequest);
+    response = omKeyCreateRequest.validateAndUpdateCache(ozoneManager, 105L);
+    assertEquals(OK, response.getOMResponse().getStatus());
+
+    OmKeyInfo openKeyInfo = omMetadataManager.getOpenKeyTable(getBucketLayout())
+        .get(getOpenKey(omRequest.getCreateKeyRequest().getClientID()));
+
+    assertEquals(existingKeyInfo.getGeneration(), openKeyInfo.getExpectedDataGeneration());
+    // Creation time should remain the same on rewrite.
+    assertEquals(existingKeyInfo.getCreationTime(), openKeyInfo.getCreationTime());
+    // Update ID should change
+    assertNotEquals(existingKeyInfo.getGeneration(), openKeyInfo.getGeneration());
+    assertEquals(metadata, existingKeyInfo.getMetadata());
+    // The metadata should not be copied from the existing key. It should be passed in the request.
+    assertEquals(reWriteMetadata, openKeyInfo.getMetadata());
+    // Ensure the ACLS are copied over from the existing key.
+    assertEquals(existingAcls, openKeyInfo.getAcls());
+  }
+
   /**
    * Leaf file has ACCESS scope acls which inherited
    * from parent DEFAULT acls.
@@ -991,9 +1079,13 @@ private void checkNotAFile(String keyName) throws Exception {
     assertEquals(NOT_A_FILE, omClientResponse.getOMResponse().getStatus());
   }
 
-
   private void createAndCheck(String keyName) throws Exception {
-    OMRequest omRequest = createKeyRequest(false, 0, keyName);
+    createAndCheck(keyName, emptyMap(), emptyList());
+  }
+
+  private OmKeyInfo createAndCheck(String keyName, Map<String, String> metadata, List<OzoneAcl> acls)
+      throws Exception {
+    OMRequest omRequest = createKeyRequest(false, 0, keyName, metadata, emptyMap(), acls);
 
     OMKeyCreateRequest omKeyCreateRequest = getOMKeyCreateRequest(omRequest);
 
@@ -1006,16 +1098,13 @@ private void createAndCheck(String keyName) throws Exception {
 
     assertEquals(OK, omClientResponse.getOMResponse().getStatus());
 
-    checkCreatedPaths(omKeyCreateRequest, omRequest, keyName);
+    return checkCreatedPaths(omKeyCreateRequest, omRequest, keyName);
   }
 
-  protected void checkCreatedPaths(
+  protected OmKeyInfo checkCreatedPaths(
       OMKeyCreateRequest omKeyCreateRequest, OMRequest omRequest,
       String keyName) throws Exception {
     keyName = omKeyCreateRequest.validateAndNormalizeKey(true, keyName);
-    // Check intermediate directories created or not.
-    Path keyPath = Paths.get(keyName);
-    checkIntermediatePaths(keyPath);
 
     // Check open key entry
     String openKey = omMetadataManager.getOpenKey(volumeName, bucketName,
@@ -1024,6 +1113,7 @@ protected void checkCreatedPaths(
         omMetadataManager.getOpenKeyTable(omKeyCreateRequest.getBucketLayout())
             .get(openKey);
     assertNotNull(omKeyInfo);
+    return omKeyInfo;
   }
 
   protected long checkIntermediatePaths(Path keyPath) throws Exception {
@@ -1048,7 +1138,7 @@ protected String getOzoneKey() throws IOException {
   }
 
   protected OMKeyCreateRequest getOMKeyCreateRequest(OMRequest omRequest) {
-    return new OMKeyCreateRequest(omRequest, BucketLayout.DEFAULT);
+    return new OMKeyCreateRequest(omRequest, getBucketLayout());
   }
 
   protected OMKeyCreateRequest getOMKeyCreateRequest(
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java
index 2a25a9b..a5181b2 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java
@@ -117,7 +117,7 @@ protected void addToKeyTable(String keyName) throws Exception {
   }
 
   @Override
-  protected void checkCreatedPaths(OMKeyCreateRequest omKeyCreateRequest,
+  protected OmKeyInfo checkCreatedPaths(OMKeyCreateRequest omKeyCreateRequest,
       OMRequest omRequest, String keyName) throws Exception {
     keyName = omKeyCreateRequest.validateAndNormalizeKey(true, keyName,
         BucketLayout.FILE_SYSTEM_OPTIMIZED);
@@ -139,6 +139,7 @@ protected void checkCreatedPaths(OMKeyCreateRequest omKeyCreateRequest,
         omMetadataManager.getOpenKeyTable(omKeyCreateRequest.getBucketLayout())
             .get(openKey);
     assertNotNull(omKeyInfo);
+    return omKeyInfo;
   }
 
   @Override
@@ -152,6 +153,11 @@ protected long checkIntermediatePaths(Path keyPath) throws Exception {
     long lastKnownParentId = omBucketInfo.getObjectID();
     final long volumeId = omMetadataManager.getVolumeId(volumeName);
 
+    if (keyPath == null) {
+      // The file is at the root of the bucket, so it has no parent folder. The parent is
+      // the bucket itself.
+      return lastKnownParentId;
+    }
     Iterator<Path> elements = keyPath.iterator();
     StringBuilder fullKeyPath = new StringBuilder(bucketKey);
     while (elements.hasNext()) {
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
index b75167c..bea831a 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
@@ -241,6 +241,14 @@ public OzoneOutputStream createKey(String volumeName, String bucketName,
   }
 
   @Override
+  public OzoneOutputStream rewriteKey(String volumeName, String bucketName, String keyName,
+      long size, long existingKeyGeneration, ReplicationConfig replicationConfig,
+      Map<String, String> metadata) throws IOException {
+    return getBucket(volumeName, bucketName)
+        .rewriteKey(keyName, size, existingKeyGeneration, replicationConfig, metadata);
+  }
+
+  @Override
   public OzoneInputStream getKey(String volumeName, String bucketName,
                                  String keyName) throws IOException {
     return getBucket(volumeName, bucketName).readKey(keyName);
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index 3320801..06b6a8e 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@ -161,6 +161,38 @@ public void close() throws IOException {
   }
 
   @Override
+  public OzoneOutputStream rewriteKey(String keyName, long size, long existingKeyGeneration,
+      ReplicationConfig rConfig, Map<String, String> metadata) throws IOException {
+    final ReplicationConfig repConfig;
+    if (rConfig == null) {
+      repConfig = getReplicationConfig();
+    } else {
+      repConfig = rConfig;
+    }
+    ReplicationConfig finalReplicationCon = repConfig;
+    ByteArrayOutputStream byteArrayOutputStream =
+        new KeyMetadataAwareOutputStream(metadata) {
+          @Override
+          public void close() throws IOException {
+            keyContents.put(keyName, toByteArray());
+            keyDetails.put(keyName, new OzoneKeyDetails(
+                getVolumeName(),
+                getName(),
+                keyName,
+                size,
+                System.currentTimeMillis(),
+                System.currentTimeMillis(),
+                new ArrayList<>(), finalReplicationCon, metadata, null,
+                () -> readKey(keyName), true, null, null
+            ));
+            super.close();
+          }
+        };
+
+    return new OzoneOutputStream(byteArrayOutputStream, null);
+  }
+
+  @Override
   public OzoneDataStreamOutput createStreamKey(String key, long size,
                                                ReplicationConfig rConfig,
                                                Map<String, String> keyMetadata,
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/KeyCommands.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/KeyCommands.java
index 726ab8b..bbef584 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/KeyCommands.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/KeyCommands.java
@@ -44,6 +44,7 @@
         CatKeyHandler.class,
         PutKeyHandler.class,
         RenameKeyHandler.class,
+        RewriteKeyHandler.class,
         CopyKeyHandler.class,
         DeleteKeyHandler.class,
         AddAclKeyHandler.class,
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
index 68beb69..833f4f7 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
@@ -19,13 +19,13 @@
 package org.apache.hadoop.ozone.shell.keys;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -40,6 +40,7 @@
 import org.apache.hadoop.ozone.client.OzoneClientException;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.shell.OzoneAddress;
 
 import org.apache.commons.codec.digest.DigestUtils;
@@ -68,6 +69,10 @@ public class PutKeyHandler extends KeyHandler {
   @Mixin
   private ShellReplicationOptions replication;
 
+  @Option(names = "--expectedGeneration",
+      description = "Store key only if it already exists and its generation matches the value provided")
+  private long expectedGeneration;
+
   @Override
   protected void execute(OzoneClient client, OzoneAddress address)
       throws IOException, OzoneClientException {
@@ -79,7 +84,7 @@ protected void execute(OzoneClient client, OzoneAddress address)
     File dataFile = new File(fileName);
 
     if (isVerbose()) {
-      try (InputStream stream = new FileInputStream(dataFile)) {
+      try (InputStream stream = Files.newInputStream(dataFile.toPath())) {
         String hash = DigestUtils.sha256Hex(stream);
         out().printf("File sha256 checksum : %s%n", hash);
       }
@@ -109,7 +114,7 @@ protected void execute(OzoneClient client, OzoneAddress address)
     }
   }
 
-  void async(
+  private void async(
       File dataFile, OzoneBucket bucket,
       String keyName, Map<String, String> keyMetadata,
       ReplicationConfig replicationConfig, int chunkSize)
@@ -117,14 +122,26 @@ void async(
    if (isVerbose()) {
      out().println("API: async");
    }
-    try (InputStream input = new FileInputStream(dataFile);
-         OutputStream output = bucket.createKey(keyName, dataFile.length(),
-             replicationConfig, keyMetadata)) {
+    try (InputStream input = Files.newInputStream(dataFile.toPath());
+         OutputStream output = createOrReplaceKey(bucket, keyName, dataFile.length(), keyMetadata, replicationConfig)) {
      IOUtils.copyBytes(input, output, chunkSize);
    }
  }

-  void stream(
+  /**
+   * Opens an output stream for writing {@code keyName} in {@code bucket}.
+   * When {@code --expectedGeneration} is set (&gt; 0) the key is atomically
+   * rewritten only if it exists with a matching generation; otherwise a
+   * plain create (unconditional overwrite) is performed.
+   */
+  private OzoneOutputStream createOrReplaceKey(OzoneBucket bucket, String keyName,
+      long size, Map<String, String> keyMetadata, ReplicationConfig replicationConfig
+  ) throws IOException {
+    return expectedGeneration > 0
+        ? bucket.rewriteKey(keyName, size, expectedGeneration, replicationConfig, keyMetadata)
+        : bucket.createKey(keyName, size, replicationConfig, keyMetadata);
+  }
+
+  private void stream(
      File dataFile, OzoneBucket bucket,
      String keyName, Map<String, String> keyMetadata,
      ReplicationConfig replicationConfig, int chunkSize)
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/RewriteKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/RewriteKeyHandler.java
new file mode 100644
index 0000000..c6f97c6
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/RewriteKeyHandler.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.shell.keys;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientException;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.shell.OzoneAddress;
+import org.apache.hadoop.ozone.shell.ShellReplicationOptions;
+import picocli.CommandLine;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static org.apache.hadoop.ozone.OzoneConsts.MB;
+
+/**
+ * Rewrites an existing key with a different replication config using the
+ * atomic rewrite API: the new data is committed only if the key's
+ * generation still matches the one read here, so a concurrent overwrite
+ * makes the rewrite fail instead of being silently lost.
+ */
+@CommandLine.Command(name = "rewrite",
+    description = "Rewrites the key with different replication")
+public class RewriteKeyHandler extends KeyHandler {
+
+  @CommandLine.Mixin
+  private ShellReplicationOptions replication;
+
+  @Override
+  protected void execute(OzoneClient client, OzoneAddress address) throws IOException, OzoneClientException {
+    String volumeName = address.getVolumeName();
+    String bucketName = address.getBucketName();
+    String keyName = address.getKeyName();
+
+    OzoneVolume vol = client.getObjectStore().getVolume(volumeName);
+    OzoneBucket bucket = vol.getBucket(bucketName);
+    OzoneKeyDetails key = bucket.getKey(keyName);
+
+    ReplicationConfig newReplication = replication.fromParamsOrConfig(getConf());
+    if (newReplication == null) {
+      // No replication requested: toggle between RATIS/THREE and EC rs-3-2
+      // as a convenient default for exercising the rewrite operation.
+      newReplication = key.getReplicationConfig().getReplicationType() == HddsProtos.ReplicationType.RATIS
+          ? new ECReplicationConfig(3, 2)
+          : RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
+    } else if (newReplication.equals(key.getReplicationConfig())) {
+      err().println("Replication unchanged: " + key.getReplicationConfig());
+      return;
+    }
+
+    // Guard against a zero buffer size for empty keys: copyBytes with a
+    // zero-length buffer cannot make progress.
+    int chunkSize = (int) Math.max(1, Math.min(MB, key.getDataSize()));
+    try (
+        InputStream input = bucket.readKey(keyName);
+        OutputStream output = bucket.rewriteKey(keyName, key.getDataSize(), key.getGeneration(),
+            newReplication, key.getMetadata())) {
+      IOUtils.copyBytes(input, output, chunkSize);
+    }
+  }
+}