blob: 5f30fa15bdaa06b79fe8b877b1a66c962cac6cd2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.util.collection;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* Tests RocksDB manager {@link RocksDBDAO}.
*/
public class TestRocksDBManager {
private RocksDBDAO dbManager;
@Before
public void setUpClass() {
dbManager = new RocksDBDAO("/dummy/path/" + UUID.randomUUID().toString(),
FileSystemViewStorageConfig.newBuilder().build().newBuilder().build().getRocksdbBasePath());
}
@After
public void tearDownClass() {
if (dbManager != null) {
dbManager.close();
dbManager = null;
}
}
@Test
public void testRocksDBManager() {
String prefix1 = "prefix1_";
String prefix2 = "prefix2_";
String prefix3 = "prefix3_";
String prefix4 = "prefix4_";
List<String> prefixes = Arrays.asList(prefix1, prefix2, prefix3, prefix4);
String family1 = "family1";
String family2 = "family2";
List<String> colFamilies = Arrays.asList(family1, family2);
final List<Payload<String>> payloads = new ArrayList<>();
IntStream.range(0, 100).forEach(index -> {
String prefix = prefixes.get(index % 4);
String key = prefix + UUID.randomUUID().toString();
String family = colFamilies.get(index % 2);
String val = "VALUE_" + UUID.randomUUID().toString();
payloads.add(new Payload(prefix, key, val, family));
});
colFamilies.forEach(family -> dbManager.dropColumnFamily(family));
colFamilies.forEach(family -> dbManager.addColumnFamily(family));
colFamilies.forEach(family -> dbManager.dropColumnFamily(family));
colFamilies.forEach(family -> dbManager.addColumnFamily(family));
Map<String, Map<String, Integer>> countsMap = new HashMap<>();
payloads.forEach(payload -> {
dbManager.put(payload.getFamily(), payload.getKey(), payload);
if (!countsMap.containsKey(payload.family)) {
countsMap.put(payload.family, new HashMap<>());
}
Map<String, Integer> c = countsMap.get(payload.family);
if (!c.containsKey(payload.prefix)) {
c.put(payload.prefix, 0);
}
int currCount = c.get(payload.prefix);
c.put(payload.prefix, currCount + 1);
});
colFamilies.forEach(family -> {
prefixes.forEach(prefix -> {
List<Pair<String, Payload>> gotPayloads =
dbManager.<Payload>prefixSearch(family, prefix).collect(Collectors.toList());
Integer expCount = countsMap.get(family).get(prefix);
Assert.assertEquals("Size check for prefix (" + prefix + ") and family (" + family + ")",
expCount == null ? 0L : expCount.longValue(), gotPayloads.size());
gotPayloads.forEach(p -> {
Assert.assertEquals(p.getRight().getFamily(), family);
Assert.assertTrue(p.getRight().getKey().toString().startsWith(prefix));
});
});
});
payloads.stream().filter(p -> !p.getPrefix().equalsIgnoreCase(prefix1)).forEach(payload -> {
Payload p = dbManager.get(payload.getFamily(), payload.getKey());
Assert.assertEquals("Retrieved correct payload for key :" + payload.getKey(), payload, p);
dbManager.delete(payload.getFamily(), payload.getKey());
Payload p2 = dbManager.get(payload.getFamily(), payload.getKey());
Assert.assertNull("Retrieved correct payload for key :" + payload.getKey(), p2);
});
colFamilies.forEach(family -> {
dbManager.prefixDelete(family, prefix1);
int got = dbManager.prefixSearch(family, prefix1).collect(Collectors.toList()).size();
Assert.assertEquals("Expected prefix delete to leave at least one item for family: " + family, countsMap.get(family).get(prefix1) == null ? 0 : 1, got);
});
payloads.stream().filter(p -> !p.getPrefix().equalsIgnoreCase(prefix1)).forEach(payload -> {
Payload p2 = dbManager.get(payload.getFamily(), payload.getKey());
Assert.assertNull("Retrieved correct payload for key :" + payload.getKey(), p2);
});
// Now do a prefix search
colFamilies.forEach(family -> {
prefixes.stream().filter(p -> !p.equalsIgnoreCase(prefix1)).forEach(prefix -> {
List<Pair<String, Payload>> gotPayloads =
dbManager.<Payload>prefixSearch(family, prefix).collect(Collectors.toList());
Assert.assertEquals("Size check for prefix (" + prefix + ") and family (" + family + ")", 0,
gotPayloads.size());
});
});
String rocksDBBasePath = dbManager.getRocksDBBasePath();
dbManager.close();
Assert.assertFalse(new File(rocksDBBasePath).exists());
}
@Test
public void testWithSerializableKey() {
String prefix1 = "prefix1_";
String prefix2 = "prefix2_";
String prefix3 = "prefix3_";
String prefix4 = "prefix4_";
List<String> prefixes = Arrays.asList(prefix1, prefix2, prefix3, prefix4);
String family1 = "family1";
String family2 = "family2";
List<String> colFamilies = Arrays.asList(family1, family2);
final List<Payload<PayloadKey>> payloads = new ArrayList<>();
IntStream.range(0, 100).forEach(index -> {
String prefix = prefixes.get(index % 4);
String key = prefix + UUID.randomUUID().toString();
String family = colFamilies.get(index % 2);
String val = "VALUE_" + UUID.randomUUID().toString();
payloads.add(new Payload(prefix, new PayloadKey((key)), val, family));
});
colFamilies.forEach(family -> dbManager.dropColumnFamily(family));
colFamilies.forEach(family -> dbManager.addColumnFamily(family));
Map<String, Map<String, Integer>> countsMap = new HashMap<>();
dbManager.writeBatch(batch -> {
payloads.forEach(payload -> {
dbManager.putInBatch(batch, payload.getFamily(), payload.getKey(), payload);
if (!countsMap.containsKey(payload.family)) {
countsMap.put(payload.family, new HashMap<>());
}
Map<String, Integer> c = countsMap.get(payload.family);
if (!c.containsKey(payload.prefix)) {
c.put(payload.prefix, 0);
}
int currCount = c.get(payload.prefix);
c.put(payload.prefix, currCount + 1);
});
});
Iterator<List<Payload<PayloadKey>>> payloadSplits = payloads.stream()
.collect(Collectors.partitioningBy(s -> payloads.indexOf(s) > payloads.size() / 2)).values()
.iterator();
payloads.forEach(payload -> {
Payload p = dbManager.get(payload.getFamily(), payload.getKey());
Assert.assertEquals("Retrieved correct payload for key :" + payload.getKey(), payload, p);
});
payloadSplits.next().forEach(payload -> {
dbManager.delete(payload.getFamily(), payload.getKey());
Payload want = dbManager.get(payload.getFamily(), payload.getKey());
Assert.assertNull("Verify deleted during single delete for key :" + payload.getKey(), want);
});
dbManager.writeBatch(batch -> {
payloadSplits.next().forEach(payload -> {
dbManager.deleteInBatch(batch, payload.getFamily(), payload.getKey());
Payload want = dbManager.get(payload.getFamily(), payload.getKey());
Assert.assertEquals("Verify not deleted during batch delete in progress for key :" + payload.getKey(), payload, want);
});
});
payloads.forEach(payload -> {
Payload want = dbManager.get(payload.getFamily(), payload.getKey());
Assert.assertNull("Verify delete for key :" + payload.getKey(), want);
});
// Now do a prefix search
colFamilies.forEach(family -> {
prefixes.forEach(prefix -> {
List<Pair<String, Payload>> gotPayloads =
dbManager.<Payload>prefixSearch(family, prefix).collect(Collectors.toList());
Assert.assertEquals("Size check for prefix (" + prefix + ") and family (" + family + ")", 0,
gotPayloads.size());
});
});
String rocksDBBasePath = dbManager.getRocksDBBasePath();
dbManager.close();
Assert.assertFalse(new File(rocksDBBasePath).exists());
}
public static class PayloadKey implements Serializable {
private String key;
public PayloadKey(String key) {
this.key = key;
}
@Override
public String toString() {
return key;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PayloadKey that = (PayloadKey) o;
return Objects.equals(key, that.key);
}
@Override
public int hashCode() {
return Objects.hash(key);
}
}
/**
* A payload definition for {@link TestRocksDBManager}.
*/
public static class Payload<T> implements Serializable {
private final String prefix;
private final T key;
private final String val;
private final String family;
public Payload(String prefix, T key, String val, String family) {
this.prefix = prefix;
this.key = key;
this.val = val;
this.family = family;
}
public String getPrefix() {
return prefix;
}
public T getKey() {
return key;
}
public String getVal() {
return val;
}
public String getFamily() {
return family;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Payload payload = (Payload) o;
return Objects.equals(prefix, payload.prefix) && Objects.equals(key, payload.key)
&& Objects.equals(val, payload.val) && Objects.equals(family, payload.family);
}
@Override
public int hashCode() {
return Objects.hash(prefix, key, val, family);
}
}
}