HDDS-13224. Support CodecBuffer for KeyPrefixContainerCodec (#9061)

diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/KeyPrefixContainerCodec.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/KeyPrefixContainerCodec.java
index 52a6998..2e42057 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/KeyPrefixContainerCodec.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/KeyPrefixContainerCodec.java
@@ -21,9 +21,12 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
+import jakarta.annotation.Nonnull;
 import java.nio.ByteBuffer;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecException;
 import org.apache.hadoop.ozone.recon.api.types.KeyPrefixContainer;
 
 /**
@@ -36,6 +39,9 @@ public final class KeyPrefixContainerCodec
       new KeyPrefixContainerCodec();
 
   private static final String KEY_DELIMITER = "_";
+  private static final byte[] KEY_DELIMITER_BYTES = KEY_DELIMITER.getBytes(UTF_8);
+  private static final ByteBuffer KEY_DELIMITER_BUFFER = ByteBuffer.wrap(KEY_DELIMITER_BYTES).asReadOnlyBuffer();
+  public static final int LONG_SERIALIZED_SIZE = KEY_DELIMITER_BYTES.length + Long.BYTES;
 
   public static Codec<KeyPrefixContainer> get() {
     return INSTANCE;
@@ -51,25 +57,105 @@ public Class<KeyPrefixContainer> getTypeClass() {
   }
 
   @Override
+  public boolean supportCodecBuffer() {
+    return true;
+  }
+
+  @Override
+  public CodecBuffer toCodecBuffer(@Nonnull KeyPrefixContainer object, CodecBuffer.Allocator allocator) {
+    Preconditions.checkNotNull(object, "Null object can't be converted to CodecBuffer.");
+
+    final byte[] keyPrefixBytes = object.getKeyPrefix().getBytes(UTF_8);
+    int totalSize = keyPrefixBytes.length;
+
+    if (object.getKeyVersion() != -1) {
+      totalSize += LONG_SERIALIZED_SIZE;
+
+      if (object.getContainerId() != -1) {
+        totalSize += LONG_SERIALIZED_SIZE;
+      }
+    }
+
+    final CodecBuffer buffer = allocator.apply(totalSize);
+    buffer.put(ByteBuffer.wrap(keyPrefixBytes));
+
+    if (object.getKeyVersion() != -1) {
+      buffer.put(KEY_DELIMITER_BUFFER.duplicate());
+      buffer.putLong(object.getKeyVersion());
+
+      if (object.getContainerId() != -1) {
+        buffer.put(KEY_DELIMITER_BUFFER.duplicate());
+        buffer.putLong(object.getContainerId());
+      }
+    }
+
+    return buffer;
+  }
+
+  @Override
+  public KeyPrefixContainer fromCodecBuffer(@Nonnull CodecBuffer buffer) throws CodecException {
+    final ByteBuffer byteBuffer = buffer.asReadOnlyByteBuffer();
+    final int totalLength = byteBuffer.remaining();
+
+    if (totalLength == 0) {
+      throw new CodecException("Empty buffer");
+    }
+
+    final byte[] data = new byte[totalLength];
+    byteBuffer.get(data);
+
+    int lastDelimiter = findLastDelimiter(data);
+    if (lastDelimiter == -1) {
+      return KeyPrefixContainer.get(new String(data, UTF_8));
+    }
+
+    int secondLastDelimiter = findLastDelimiter(data, lastDelimiter - 1);
+    if (secondLastDelimiter == -1) {
+      String keyPrefix = new String(data, 0, lastDelimiter, UTF_8);
+      long version = Longs.fromByteArray(ArrayUtils.subarray(data,
+          lastDelimiter + 1, lastDelimiter + 1 + Long.BYTES));
+      return KeyPrefixContainer.get(keyPrefix, version);
+    }
+
+    String keyPrefix = new String(data, 0, secondLastDelimiter, UTF_8);
+    long version = Longs.fromByteArray(ArrayUtils.subarray(data,
+        secondLastDelimiter + 1, secondLastDelimiter + 1 + Long.BYTES));
+    long containerId = Longs.fromByteArray(ArrayUtils.subarray(data,
+        lastDelimiter + 1, lastDelimiter + 1 + Long.BYTES));
+
+    return KeyPrefixContainer.get(keyPrefix, version, containerId);
+  }
+
+  private int findLastDelimiter(byte[] data) {
+    return findLastDelimiter(data, data.length - 1);
+  }
+
+  private int findLastDelimiter(byte[] data, int endPos) {
+    for (int i = endPos - Long.BYTES; i >= 0; i--) {
+      if (data[i] == '_') {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  @Override
   public byte[] toPersistedFormat(KeyPrefixContainer keyPrefixContainer) {
     Preconditions.checkNotNull(keyPrefixContainer,
             "Null object can't be converted to byte array.");
     byte[] keyPrefixBytes = keyPrefixContainer.getKeyPrefix().getBytes(UTF_8);
 
-    //Prefix seek can be done only with keyPrefix. In that case, we can
+    // Prefix seek can be done only with keyPrefix. In that case, we can
     // expect the version and the containerId to be undefined.
     if (keyPrefixContainer.getKeyVersion() != -1) {
-      keyPrefixBytes = ArrayUtils.addAll(keyPrefixBytes, KEY_DELIMITER
-          .getBytes(UTF_8));
+      keyPrefixBytes = ArrayUtils.addAll(keyPrefixBytes, KEY_DELIMITER_BYTES);
       keyPrefixBytes = ArrayUtils.addAll(keyPrefixBytes, Longs.toByteArray(
           keyPrefixContainer.getKeyVersion()));
-    }
-
-    if (keyPrefixContainer.getContainerId() != -1) {
-      keyPrefixBytes = ArrayUtils.addAll(keyPrefixBytes, KEY_DELIMITER
-          .getBytes(UTF_8));
-      keyPrefixBytes = ArrayUtils.addAll(keyPrefixBytes, Longs.toByteArray(
-          keyPrefixContainer.getContainerId()));
+      if (keyPrefixContainer.getContainerId() != -1) {
+        keyPrefixBytes = ArrayUtils.addAll(keyPrefixBytes, KEY_DELIMITER_BYTES);
+        keyPrefixBytes = ArrayUtils.addAll(keyPrefixBytes, Longs.toByteArray(
+            keyPrefixContainer.getContainerId()));
+      }
     }
 
     return keyPrefixBytes;
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestKeyPrefixContainerCodec.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestKeyPrefixContainerCodec.java
new file mode 100644
index 0000000..c1bda21
--- /dev/null
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestKeyPrefixContainerCodec.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.spi.impl;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.ozone.recon.api.types.KeyPrefixContainer;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Class to test {@link KeyPrefixContainerCodec}.
+ */
+public class TestKeyPrefixContainerCodec {
+
+  private final Codec<KeyPrefixContainer> codec = KeyPrefixContainerCodec.get();
+
+  @Test
+  public void testKeyPrefixWithDelimiter() throws Exception {
+    runTest("testKey", 123L, 456L);
+    runTest("test_key_with_underscores", 789L, 101112L);
+    runTest("test___________________________________Key", 1L, 2L);
+    runTest("", 0L, 0L);
+  }
+
+  @Test
+  public void testCodecBufferSupport() {
+    assertTrue(codec.supportCodecBuffer());
+  }
+
+  @Test
+  public void testTypeClass() {
+    assertEquals(KeyPrefixContainer.class, codec.getTypeClass());
+  }
+
+  void runTest(String keyPrefix, long version, long containerId) throws Exception {
+    final KeyPrefixContainer original = KeyPrefixContainer.get(keyPrefix, version, containerId);
+    final KeyPrefixContainer keyAndVersion = KeyPrefixContainer.get(keyPrefix, version);
+    final KeyPrefixContainer keyOnly = KeyPrefixContainer.get(keyPrefix);
+
+    final CodecBuffer.Allocator allocator = CodecBuffer.Allocator.getHeap();
+    try (CodecBuffer originalBuffer = codec.toCodecBuffer(original, allocator);
+         CodecBuffer keyOnlyBuffer = codec.toCodecBuffer(keyOnly, allocator);
+         CodecBuffer keyAndVersionBuffer = codec.toCodecBuffer(keyAndVersion, allocator)) {
+      assertEquals(original, codec.fromCodecBuffer(originalBuffer));
+      assertTrue(originalBuffer.startsWith(keyAndVersionBuffer));
+      assertTrue(originalBuffer.startsWith(keyOnlyBuffer));
+
+      final byte[] originalBytes = assertCodecBuffer(original, originalBuffer);
+      assertEquals(original, codec.fromPersistedFormat(originalBytes));
+
+      final byte[] keyAndVersionBytes = assertCodecBuffer(keyAndVersion, keyAndVersionBuffer);
+      assertPrefix(originalBytes.length - KeyPrefixContainerCodec.LONG_SERIALIZED_SIZE,
+          originalBytes, keyAndVersionBytes);
+
+      final byte[] keyOnlyBytes = assertCodecBuffer(keyOnly, keyOnlyBuffer);
+      assertPrefix(originalBytes.length - 2 * KeyPrefixContainerCodec.LONG_SERIALIZED_SIZE,
+          originalBytes, keyOnlyBytes);
+    }
+  }
+
+  static void assertPrefix(int expectedLength, byte[] array, byte[] prefix) {
+    assertEquals(expectedLength, prefix.length);
+    for (int i = 0; i < prefix.length; i++) {
+      assertEquals(array[i], prefix[i]);
+    }
+  }
+
+  byte[] assertCodecBuffer(KeyPrefixContainer original, CodecBuffer buffer) throws Exception {
+    final byte[] bytes = codec.toPersistedFormat(original);
+    assertArrayEquals(bytes, buffer.getArray());
+    return bytes;
+  }
+}