HDDS-7051. Fix offset Condition in ECKeyOutputStream (#3623)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index a3c197f..bcc6761 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -157,7 +157,7 @@
     }
     try {
       int writtenLen = 0;
-      while (off + writtenLen < len) {
+      while (writtenLen < len) {
         writtenLen += handleWrite(b, off + writtenLen, len - writtenLen);
       }
     } catch (Exception e) {
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 5003f8e..208ad82 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -262,33 +262,52 @@
   }
 
   @Test
+  public void testChunksInSingleWriteOpWithOffset() throws IOException {
+    testMultipleChunksInSingleWriteOp(100, 12, 11);
+  }
+
+  @Test
   public void test12ChunksInSingleWriteOp() throws IOException {
     testMultipleChunksInSingleWriteOp(12);
   }
 
-  public void testMultipleChunksInSingleWriteOp(int numChunks)
+  private void testMultipleChunksInSingleWriteOp(int numChunks)
+          throws IOException {
+    testMultipleChunksInSingleWriteOp(0, numChunks, numChunks);
+  }
+  private void testMultipleChunksInSingleWriteOp(int offset, int bufferChunks,
+                                                 int numChunks)
       throws IOException {
-    byte[] inputData = new byte[numChunks * chunkSize];
+    byte[] inputData = new byte[offset + bufferChunks * chunkSize];
     for (int i = 0; i < numChunks; i++) {
-      int start = (i * chunkSize);
+      int start = offset + (i * chunkSize);
       Arrays.fill(inputData, start, start + chunkSize - 1,
           String.valueOf(i % 9).getBytes(UTF_8)[0]);
     }
-    final OzoneBucket bucket = writeIntoECKey(inputData, keyName,
-        new DefaultReplicationConfig(ReplicationType.EC,
-            new ECReplicationConfig(dataBlocks, parityBlocks,
-                ECReplicationConfig.EcCodec.RS, chunkSize)));
+    final OzoneBucket bucket = writeIntoECKey(offset, numChunks * chunkSize,
+            inputData, keyName, new DefaultReplicationConfig(ReplicationType.EC,
+                    new ECReplicationConfig(dataBlocks, parityBlocks,
+                            ECReplicationConfig.EcCodec.RS, chunkSize)));
     OzoneKey key = bucket.getKey(keyName);
-    validateContent(inputData, bucket, key);
+    validateContent(offset, numChunks * chunkSize, inputData, bucket, key);
   }
 
-  private void validateContent(byte[] inputData, OzoneBucket bucket,
+  private void validateContent(byte[] inputData,
+                               OzoneBucket bucket,
+                               OzoneKey key) throws IOException {
+    validateContent(0, inputData.length, inputData, bucket, key);
+  }
+
+  private void validateContent(int offset, int length, byte[] inputData,
+                               OzoneBucket bucket,
       OzoneKey key) throws IOException {
     Assert.assertEquals(keyName, key.getName());
     try (OzoneInputStream is = bucket.readKey(keyName)) {
-      byte[] fileContent = new byte[inputData.length];
-      Assert.assertEquals(inputData.length, is.read(fileContent));
-      Assert.assertEquals(new String(inputData, UTF_8),
+      byte[] fileContent = new byte[length];
+      Assert.assertEquals(length, is.read(fileContent));
+      Assert.assertEquals(new String(Arrays.copyOfRange(inputData, offset,
+                      offset + length),
+                      UTF_8),
           new String(fileContent, UTF_8));
     }
   }
@@ -1139,11 +1158,29 @@
 
   private OzoneBucket writeIntoECKey(byte[] data, String key,
       DefaultReplicationConfig defaultReplicationConfig) throws IOException {
-    return writeIntoECKey(new byte[][] {data}, key, defaultReplicationConfig);
+    return writeIntoECKey(0, data.length, data, key, defaultReplicationConfig);
+  }
+  private OzoneBucket writeIntoECKey(int offset, int length, byte[] data,
+      String key, DefaultReplicationConfig defaultReplicationConfig)
+      throws IOException {
+    return writeIntoECKey(new int[]{offset}, new int[]{length},
+            new byte[][] {data}, key, defaultReplicationConfig);
   }
 
   private OzoneBucket writeIntoECKey(byte[][] chunks, String key,
       DefaultReplicationConfig defaultReplicationConfig) throws IOException {
+    int[] offsets = new int[chunks.length];
+    Arrays.fill(offsets, 0);
+    int[] lengths = Arrays.stream(chunks)
+            .mapToInt(chunk -> chunk.length).toArray();
+    return writeIntoECKey(offsets, lengths, chunks,
+            key, defaultReplicationConfig);
+  }
+
+  private OzoneBucket writeIntoECKey(int[] offsets, int[] lengths,
+                                     byte[][] chunks,
+                                     String key,
+      DefaultReplicationConfig defaultReplicationConfig) throws IOException {
     store.createVolume(volumeName);
     OzoneVolume volume = store.getVolume(volumeName);
     if (defaultReplicationConfig != null) {
@@ -1160,7 +1197,7 @@
         new ECReplicationConfig(dataBlocks, parityBlocks,
             ECReplicationConfig.EcCodec.RS, chunkSize), new HashMap<>())) {
       for (int i = 0; i < chunks.length; i++) {
-        out.write(chunks[i]);
+        out.write(chunks[i], offsets[i], lengths[i]);
       }
     }
     return bucket;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index 4f0a091..e53f943 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -263,6 +263,11 @@
   }
 
   @Test
+  public void testChunksInSingleWriteOpWithOffset() throws IOException {
+    testMultipleChunksInSingleWriteOp(11, 25, 19);
+  }
+
+  @Test
   public void test15ChunksInSingleWriteOp() throws IOException {
     testMultipleChunksInSingleWriteOp(15);
   }
@@ -277,18 +282,28 @@
     testMultipleChunksInSingleWriteOp(21);
   }
 
-  public void testMultipleChunksInSingleWriteOp(int numChunks)
-      throws IOException {
-    byte[] inputData = getInputBytes(numChunks);
+  private void testMultipleChunksInSingleWriteOp(int offset,
+                                                int bufferChunks, int numChunks)
+          throws IOException {
+    byte[] inputData = getInputBytes(offset, bufferChunks, numChunks);
     final OzoneBucket bucket = getOzoneBucket();
-    String keyName = "testMultipleChunksInSingleWriteOp" + numChunks;
+    String keyName =
+            String.format("testMultipleChunksInSingleWriteOpOffset" +
+                    "%dBufferChunks%dNumChunks%d", offset, bufferChunks,
+                    numChunks);
     try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
         new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
             chunkSize), new HashMap<>())) {
-      out.write(inputData);
+      out.write(inputData, offset, numChunks * chunkSize);
     }
 
-    validateContent(inputData, bucket, bucket.getKey(keyName));
+    validateContent(offset, numChunks * chunkSize, inputData, bucket,
+            bucket.getKey(keyName));
+  }
+
+  private void testMultipleChunksInSingleWriteOp(int numChunks)
+      throws IOException {
+    testMultipleChunksInSingleWriteOp(0, numChunks, numChunks);
   }
 
   @Test
@@ -332,11 +347,18 @@
   }
 
   private void validateContent(byte[] inputData, OzoneBucket bucket,
+                               OzoneKey key) throws IOException {
+    validateContent(0, inputData.length, inputData, bucket, key);
+  }
+
+  private void validateContent(int offset, int length, byte[] inputData,
+                               OzoneBucket bucket,
       OzoneKey key) throws IOException {
     try (OzoneInputStream is = bucket.readKey(key.getName())) {
-      byte[] fileContent = new byte[inputData.length];
-      Assert.assertEquals(inputData.length, is.read(fileContent));
-      Assert.assertEquals(new String(inputData, UTF_8),
+      byte[] fileContent = new byte[length];
+      Assert.assertEquals(length, is.read(fileContent));
+      Assert.assertEquals(new String(Arrays.copyOfRange(inputData, offset,
+                      offset + length), UTF_8),
           new String(fileContent, UTF_8));
     }
   }
@@ -410,9 +432,13 @@
   }
 
   private byte[] getInputBytes(int numChunks) {
-    byte[] inputData = new byte[numChunks * chunkSize];
+    return getInputBytes(0, numChunks, numChunks);
+  }
+
+  private byte[] getInputBytes(int offset, int bufferChunks, int numChunks) {
+    byte[] inputData = new byte[offset + bufferChunks * chunkSize];
     for (int i = 0; i < numChunks; i++) {
-      int start = (i * chunkSize);
+      int start = offset + (i * chunkSize);
       Arrays.fill(inputData, start, start + chunkSize - 1,
           String.valueOf(i % 9).getBytes(UTF_8)[0]);
     }