HDDS-6889. EC: put key command with EC replication can use ReplicationConfig validator (#3565)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfigValidator.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfigValidator.java
index e95f4c7..7090c76 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfigValidator.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfigValidator.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.hdds.conf.ConfigTag;
 import org.apache.hadoop.hdds.conf.ConfigType;
 import org.apache.hadoop.hdds.conf.PostConstruct;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 
 import java.util.regex.Pattern;
 
@@ -53,7 +54,8 @@
       if (!validationRegexp.matcher(
           replicationConfig.configFormat()).matches()) {
         String replication = replicationConfig.getReplication();
-        if (replicationConfig instanceof ECReplicationConfig) {
+        if (HddsProtos.ReplicationType.EC ==
+                replicationConfig.getReplicationType()) {
           ECReplicationConfig ecConfig =
               (ECReplicationConfig) replicationConfig;
           replication =  ecConfig.getCodec() + "-" + ecConfig.getData() +
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index a5d1ded..6271d30 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -53,6 +53,7 @@
 import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfigValidator;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -1155,6 +1156,12 @@
             + " Erasure Coded replication.");
       }
     }
+
+    if (replicationConfig != null) {
+      ReplicationConfigValidator validator =
+              this.conf.getObject(ReplicationConfigValidator.class);
+      validator.validate(replicationConfig);
+    }
     String requestId = UUID.randomUUID().toString();
 
     OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index b8fc543..280eb35 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -149,10 +149,12 @@
 import static org.junit.Assert.fail;
 import static org.slf4j.event.Level.DEBUG;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestMethodOrder;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
 /**
@@ -196,7 +198,7 @@
     //  for testZReadKeyWithUnhealthyContainerReplica.
     conf.set("ozone.scm.stale.node.interval", "10s");
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(5)
+        .setNumDatanodes(14)
         .setTotalPipelineNumLimit(10)
         .setScmId(scmId)
         .setClusterId(clusterId)
@@ -863,6 +865,42 @@
     }
   }
 
+  @ParameterizedTest
+  @CsvSource({"rs-3-3-1024k,false", "xor-3-5-2048k,false",
+              "rs-3-2-1024k,true", "rs-6-3-1024k,true", "rs-10-4-1024k,true"})
+  public void testPutKeyWithReplicationConfig(String replicationValue,
+                                              boolean isValidReplicationConfig)
+          throws IOException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String keyName = UUID.randomUUID().toString();
+    String value = UUID.randomUUID().toString();
+    ReplicationConfig replicationConfig =
+            new ECReplicationConfig(replicationValue);
+    if (isValidReplicationConfig) {
+      OzoneOutputStream out = bucket.createKey(keyName,
+              value.getBytes(UTF_8).length, replicationConfig, new HashMap<>());
+      out.write(value.getBytes(UTF_8));
+      out.close();
+      OzoneKey key = bucket.getKey(keyName);
+      Assert.assertEquals(keyName, key.getName());
+      OzoneInputStream is = bucket.readKey(keyName);
+      byte[] fileContent = new byte[value.getBytes(UTF_8).length];
+      is.read(fileContent);
+      Assert.assertEquals(value, new String(fileContent, UTF_8));
+    } else {
+      Assertions.assertThrows(IllegalArgumentException.class,
+              () -> bucket.createKey(keyName, "dummy".getBytes(UTF_8).length,
+                      replicationConfig, new HashMap<>()));
+    }
+  }
+
   @Test
   public void testPutKey() throws IOException {
     String volumeName = UUID.randomUUID().toString();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
index 68c4e71..1e61d5e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
@@ -113,7 +113,7 @@
     CertificateClientTestImpl certificateClientTest =
         new CertificateClientTestImpl(conf);
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(10)
+        .setNumDatanodes(14)
         .setScmId(SCM_ID)
         .setClusterId(CLUSTER_ID)
         .setCertificateClient(certificateClientTest)