HDDS-13224. Support CodecBuffer for KeyPrefixContainerCodec (#9061)
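
Adds CodecBuffer-based encode/decode to KeyPrefixContainerCodec so Recon can
(de)serialize KeyPrefixContainer keys without intermediate byte[] copies. The
persisted layout is keyPrefix, optionally followed by "_" + keyVersion and
"_" + containerId (each 8 bytes, big-endian); a value of -1 marks an unset
version/containerId, e.g. for prefix seeks.

A minimal round-trip sketch using the API touched by this patch (the roundTrip
helper and the example key values are illustrative, not part of the change):

    static KeyPrefixContainer roundTrip(KeyPrefixContainer key) throws CodecException {
      // Encode into a heap-backed CodecBuffer and decode it back.
      Codec<KeyPrefixContainer> codec = KeyPrefixContainerCodec.get();
      try (CodecBuffer buffer = codec.toCodecBuffer(key, CodecBuffer.Allocator.getHeap())) {
        return codec.fromCodecBuffer(buffer);
      }
    }

    KeyPrefixContainer decoded = roundTrip(KeyPrefixContainer.get("vol/bucket/key", 1L, 42L));
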
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/KeyPrefixContainerCodec.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/KeyPrefixContainerCodec.java
index 52a6998..2e42057 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/KeyPrefixContainerCodec.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/KeyPrefixContainerCodec.java
@@ -21,9 +21,12 @@
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
+import jakarta.annotation.Nonnull;
import java.nio.ByteBuffer;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecException;
import org.apache.hadoop.ozone.recon.api.types.KeyPrefixContainer;
/**
@@ -36,6 +39,9 @@ public final class KeyPrefixContainerCodec
new KeyPrefixContainerCodec();
private static final String KEY_DELIMITER = "_";
+ private static final byte[] KEY_DELIMITER_BYTES = KEY_DELIMITER.getBytes(UTF_8);
+ private static final ByteBuffer KEY_DELIMITER_BUFFER = ByteBuffer.wrap(KEY_DELIMITER_BYTES).asReadOnlyBuffer();
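+  // Size of one delimited long field: the "_" delimiter followed by Long.BYTES of payload.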
+ public static final int LONG_SERIALIZED_SIZE = KEY_DELIMITER_BYTES.length + Long.BYTES;
public static Codec<KeyPrefixContainer> get() {
return INSTANCE;
@@ -51,25 +57,105 @@ public Class<KeyPrefixContainer> getTypeClass() {
}
@Override
+ public boolean supportCodecBuffer() {
+ return true;
+ }
+
+ @Override
+ public CodecBuffer toCodecBuffer(@Nonnull KeyPrefixContainer object, CodecBuffer.Allocator allocator) {
+ Preconditions.checkNotNull(object, "Null object can't be converted to CodecBuffer.");
+
+ final byte[] keyPrefixBytes = object.getKeyPrefix().getBytes(UTF_8);
+ int totalSize = keyPrefixBytes.length;
+
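+    // A keyVersion/containerId of -1 means the field is unset (prefix-seek case);
+    // size and write the delimited fields only when they are defined.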
+ if (object.getKeyVersion() != -1) {
+ totalSize += LONG_SERIALIZED_SIZE;
+
+ if (object.getContainerId() != -1) {
+ totalSize += LONG_SERIALIZED_SIZE;
+ }
+ }
+
+ final CodecBuffer buffer = allocator.apply(totalSize);
+ buffer.put(ByteBuffer.wrap(keyPrefixBytes));
+
+ if (object.getKeyVersion() != -1) {
+ buffer.put(KEY_DELIMITER_BUFFER.duplicate());
+ buffer.putLong(object.getKeyVersion());
+
+ if (object.getContainerId() != -1) {
+ buffer.put(KEY_DELIMITER_BUFFER.duplicate());
+ buffer.putLong(object.getContainerId());
+ }
+ }
+
+ return buffer;
+ }
+
+ @Override
+ public KeyPrefixContainer fromCodecBuffer(@Nonnull CodecBuffer buffer) throws CodecException {
+ final ByteBuffer byteBuffer = buffer.asReadOnlyByteBuffer();
+ final int totalLength = byteBuffer.remaining();
+
+ if (totalLength == 0) {
+      throw new CodecException("Cannot decode an empty buffer into a KeyPrefixContainer.");
+ }
+
+ final byte[] data = new byte[totalLength];
+ byteBuffer.get(data);
+
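+    // Parse from the end: each trailing long is preceded by a "_" delimiter, so
+    // up to two delimiters split the data into keyPrefix, version and containerId.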
+ int lastDelimiter = findLastDelimiter(data);
+ if (lastDelimiter == -1) {
+ return KeyPrefixContainer.get(new String(data, UTF_8));
+ }
+
+ int secondLastDelimiter = findLastDelimiter(data, lastDelimiter - 1);
+ if (secondLastDelimiter == -1) {
+ String keyPrefix = new String(data, 0, lastDelimiter, UTF_8);
+ long version = Longs.fromByteArray(ArrayUtils.subarray(data,
+ lastDelimiter + 1, lastDelimiter + 1 + Long.BYTES));
+ return KeyPrefixContainer.get(keyPrefix, version);
+ }
+
+ String keyPrefix = new String(data, 0, secondLastDelimiter, UTF_8);
+ long version = Longs.fromByteArray(ArrayUtils.subarray(data,
+ secondLastDelimiter + 1, secondLastDelimiter + 1 + Long.BYTES));
+ long containerId = Longs.fromByteArray(ArrayUtils.subarray(data,
+ lastDelimiter + 1, lastDelimiter + 1 + Long.BYTES));
+
+ return KeyPrefixContainer.get(keyPrefix, version, containerId);
+ }
+
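+  // Scans backwards for the "_" delimiter. The scan starts Long.BYTES before
+  // endPos because a delimiter is always followed by exactly Long.BYTES of payload.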
+ private int findLastDelimiter(byte[] data) {
+ return findLastDelimiter(data, data.length - 1);
+ }
+
+ private int findLastDelimiter(byte[] data, int endPos) {
+ for (int i = endPos - Long.BYTES; i >= 0; i--) {
+      if (data[i] == KEY_DELIMITER_BYTES[0]) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ @Override
public byte[] toPersistedFormat(KeyPrefixContainer keyPrefixContainer) {
Preconditions.checkNotNull(keyPrefixContainer,
"Null object can't be converted to byte array.");
byte[] keyPrefixBytes = keyPrefixContainer.getKeyPrefix().getBytes(UTF_8);
- //Prefix seek can be done only with keyPrefix. In that case, we can
+ // Prefix seek can be done only with keyPrefix. In that case, we can
// expect the version and the containerId to be undefined.
if (keyPrefixContainer.getKeyVersion() != -1) {
- keyPrefixBytes = ArrayUtils.addAll(keyPrefixBytes, KEY_DELIMITER
- .getBytes(UTF_8));
+ keyPrefixBytes = ArrayUtils.addAll(keyPrefixBytes, KEY_DELIMITER_BYTES);
keyPrefixBytes = ArrayUtils.addAll(keyPrefixBytes, Longs.toByteArray(
keyPrefixContainer.getKeyVersion()));
- }
-
- if (keyPrefixContainer.getContainerId() != -1) {
- keyPrefixBytes = ArrayUtils.addAll(keyPrefixBytes, KEY_DELIMITER
- .getBytes(UTF_8));
- keyPrefixBytes = ArrayUtils.addAll(keyPrefixBytes, Longs.toByteArray(
- keyPrefixContainer.getContainerId()));
+ if (keyPrefixContainer.getContainerId() != -1) {
+ keyPrefixBytes = ArrayUtils.addAll(keyPrefixBytes, KEY_DELIMITER_BYTES);
+ keyPrefixBytes = ArrayUtils.addAll(keyPrefixBytes, Longs.toByteArray(
+ keyPrefixContainer.getContainerId()));
+ }
}
return keyPrefixBytes;
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestKeyPrefixContainerCodec.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestKeyPrefixContainerCodec.java
new file mode 100644
index 0000000..c1bda21
--- /dev/null
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestKeyPrefixContainerCodec.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.spi.impl;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.ozone.recon.api.types.KeyPrefixContainer;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Class to test {@link KeyPrefixContainerCodec}.
+ */
+public class TestKeyPrefixContainerCodec {
+
+ private final Codec<KeyPrefixContainer> codec = KeyPrefixContainerCodec.get();
+
+ @Test
+ public void testKeyPrefixWithDelimiter() throws Exception {
+ runTest("testKey", 123L, 456L);
+ runTest("test_key_with_underscores", 789L, 101112L);
+ runTest("test___________________________________Key", 1L, 2L);
+ runTest("", 0L, 0L);
+ }
+
+ @Test
+ public void testCodecBufferSupport() {
+ assertTrue(codec.supportCodecBuffer());
+ }
+
+ @Test
+ public void testTypeClass() {
+ assertEquals(KeyPrefixContainer.class, codec.getTypeClass());
+ }
+
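+  // Encodes the same key as full (key, version, containerId), key+version and
+  // key-only variants, round-trips the full encoding, and checks that the shorter
+  // encodings are byte-level prefixes of it.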
+ void runTest(String keyPrefix, long version, long containerId) throws Exception {
+ final KeyPrefixContainer original = KeyPrefixContainer.get(keyPrefix, version, containerId);
+ final KeyPrefixContainer keyAndVersion = KeyPrefixContainer.get(keyPrefix, version);
+ final KeyPrefixContainer keyOnly = KeyPrefixContainer.get(keyPrefix);
+
+ final CodecBuffer.Allocator allocator = CodecBuffer.Allocator.getHeap();
+ try (CodecBuffer originalBuffer = codec.toCodecBuffer(original, allocator);
+ CodecBuffer keyOnlyBuffer = codec.toCodecBuffer(keyOnly, allocator);
+ CodecBuffer keyAndVersionBuffer = codec.toCodecBuffer(keyAndVersion, allocator)) {
+ assertEquals(original, codec.fromCodecBuffer(originalBuffer));
+ assertTrue(originalBuffer.startsWith(keyAndVersionBuffer));
+ assertTrue(originalBuffer.startsWith(keyOnlyBuffer));
+
+ final byte[] originalBytes = assertCodecBuffer(original, originalBuffer);
+ assertEquals(original, codec.fromPersistedFormat(originalBytes));
+
+ final byte[] keyAndVersionBytes = assertCodecBuffer(keyAndVersion, keyAndVersionBuffer);
+ assertPrefix(originalBytes.length - KeyPrefixContainerCodec.LONG_SERIALIZED_SIZE,
+ originalBytes, keyAndVersionBytes);
+
+ final byte[] keyOnlyBytes = assertCodecBuffer(keyOnly, keyOnlyBuffer);
+ assertPrefix(originalBytes.length - 2 * KeyPrefixContainerCodec.LONG_SERIALIZED_SIZE,
+ originalBytes, keyOnlyBytes);
+ }
+ }
+
+ static void assertPrefix(int expectedLength, byte[] array, byte[] prefix) {
+ assertEquals(expectedLength, prefix.length);
+ for (int i = 0; i < prefix.length; i++) {
+ assertEquals(array[i], prefix[i]);
+ }
+ }
+
+ byte[] assertCodecBuffer(KeyPrefixContainer original, CodecBuffer buffer) throws Exception {
+ final byte[] bytes = codec.toPersistedFormat(original);
+ assertArrayEquals(bytes, buffer.getArray());
+ return bytes;
+ }
+}