| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hudi.index.bloom; |
| |
| import org.apache.hudi.common.HoodieClientTestHarness; |
| import org.apache.hudi.common.HoodieClientTestUtils; |
| import org.apache.hudi.common.TestRawTripPayload; |
| import org.apache.hudi.common.bloom.filter.BloomFilter; |
| import org.apache.hudi.common.bloom.filter.BloomFilterFactory; |
| import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode; |
| import org.apache.hudi.common.model.HoodieKey; |
| import org.apache.hudi.common.model.HoodieRecord; |
| import org.apache.hudi.common.table.HoodieTableMetaClient; |
| import org.apache.hudi.common.util.FSUtils; |
| import org.apache.hudi.common.util.FileIOUtils; |
| import org.apache.hudi.common.util.HoodieAvroUtils; |
| import org.apache.hudi.common.util.Option; |
| import org.apache.hudi.common.util.collection.Pair; |
| import org.apache.hudi.config.HoodieIndexConfig; |
| import org.apache.hudi.config.HoodieWriteConfig; |
| import org.apache.hudi.io.HoodieKeyLookupHandle; |
| import org.apache.hudi.table.HoodieTable; |
| |
| import org.apache.avro.Schema; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.spark.api.java.JavaPairRDD; |
| import org.apache.spark.api.java.JavaRDD; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.stream.Collectors; |
| |
| import scala.Tuple2; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
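| /** |
| * Tests {@link HoodieBloomIndex} across combinations of range pruning, tree-based filtering |
| * and bucketized checking. |
| */ |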
| @RunWith(Parameterized.class) |
| public class TestHoodieBloomIndex extends HoodieClientTestHarness { |
| |
| private String schemaStr; |
| private Schema schema; |
| |
| private boolean rangePruning; |
| private boolean treeFiltering; |
| private boolean bucketizedChecking; |
| |
| @Parameterized.Parameters(name = "{index}: Test with rangePruning={0}, treeFiltering={1}, bucketizedChecking={2}") |
| public static Collection<Object[]> data() { |
| Object[][] data = |
| new Object[][] {{true, true, true}, {false, true, true}, {true, true, false}, {true, false, true}}; |
| return Arrays.asList(data); |
| } |
| |
| public TestHoodieBloomIndex(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) { |
| this.rangePruning = rangePruning; |
| this.treeFiltering = treeFiltering; |
| this.bucketizedChecking = bucketizedChecking; |
| } |
| |
| @Before |
| public void setUp() throws Exception { |
| initSparkContexts("TestHoodieBloomIndex"); |
| initPath(); |
| initFileSystem(); |
| // Read the schema used to create the test records |
| schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt")); |
| schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); |
| initMetaClient(); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| cleanupResources(); |
| } |
| |
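| /** |
| * Builds a write config wiring in the parameterized bloom index options |
| * (range pruning, tree-based filtering, bucketized checking). |
| */ |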
| private HoodieWriteConfig makeConfig() { |
| return HoodieWriteConfig.newBuilder().withPath(basePath) |
| .withIndexConfig(HoodieIndexConfig.newBuilder().bloomIndexPruneByRanges(rangePruning) |
| .bloomIndexTreebasedFilter(treeFiltering).bloomIndexBucketizedChecking(bucketizedChecking) |
| .bloomIndexKeysPerBucket(2).build()) |
| .build(); |
| } |
| |
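| /** |
| * Verifies that loadInvolvedFiles only returns files belonging to committed instants and, |
| * when range pruning is enabled, attaches min/max record keys only for files that contain records. |
| */ |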
| @Test |
| public void testLoadInvolvedFiles() throws IOException { |
| HoodieWriteConfig config = makeConfig(); |
| HoodieBloomIndex index = new HoodieBloomIndex(config); |
| |
| // Create some partitions, and put some files |
| // "2016/01/21": 0 files |
| // "2016/04/01": 1 file (2_0_20160401010101.parquet) |
| // "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet, |
| // 4_0_20150312101010.parquet) |
| new File(basePath + "/2016/01/21").mkdirs(); |
| new File(basePath + "/2016/04/01").mkdirs(); |
| new File(basePath + "/2015/03/12").mkdirs(); |
| |
| TestRawTripPayload rowChange1 = |
| new TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); |
| HoodieRecord record1 = |
| new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1); |
| TestRawTripPayload rowChange2 = |
| new TestRawTripPayload("{\"_row_key\":\"001\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); |
| HoodieRecord record2 = |
| new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2); |
| TestRawTripPayload rowChange3 = |
| new TestRawTripPayload("{\"_row_key\":\"002\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); |
| HoodieRecord record3 = |
| new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3); |
| TestRawTripPayload rowChange4 = |
| new TestRawTripPayload("{\"_row_key\":\"003\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); |
| HoodieRecord record4 = |
| new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4); |
| |
| HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", "2_0_20160401010101.parquet", new ArrayList<>(), |
| schema, null, false); |
| HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "1_0_20150312101010.parquet", new ArrayList<>(), |
| schema, null, false); |
| HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "3_0_20150312101010.parquet", Collections.singletonList(record1), |
| schema, null, false); |
| HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "4_0_20150312101010.parquet", |
| Arrays.asList(record2, record3, record4), schema, null, false); |
| |
| List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01", "2015/03/12"); |
| metaClient = HoodieTableMetaClient.reload(metaClient); |
| HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); |
| List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, jsc, table); |
| // Still 0, as there is no valid commit yet |
| assertEquals(0, filesList.size()); |
| |
| // Add some commits |
| new File(basePath + "/.hoodie").mkdirs(); |
| new File(basePath + "/.hoodie/20160401010101.commit").createNewFile(); |
| new File(basePath + "/.hoodie/20150312101010.commit").createNewFile(); |
| |
| table = HoodieTable.getHoodieTable(metaClient, config, jsc); |
| filesList = index.loadInvolvedFiles(partitions, jsc, table); |
| assertEquals(4, filesList.size()); |
| |
| if (rangePruning) { |
| // these files will not have the key ranges |
| assertNull(filesList.get(0)._2().getMaxRecordKey()); |
| assertNull(filesList.get(0)._2().getMinRecordKey()); |
| assertFalse(filesList.get(1)._2().hasKeyRanges()); |
| assertNotNull(filesList.get(2)._2().getMaxRecordKey()); |
| assertNotNull(filesList.get(2)._2().getMinRecordKey()); |
| assertTrue(filesList.get(3)._2().hasKeyRanges()); |
| |
| // No longer sorted, but should contain the same files. |
| |
| List<Tuple2<String, BloomIndexFileInfo>> expected = |
| Arrays.asList(new Tuple2<>("2016/04/01", new BloomIndexFileInfo("2")), |
| new Tuple2<>("2015/03/12", new BloomIndexFileInfo("1")), |
| new Tuple2<>("2015/03/12", new BloomIndexFileInfo("3", "000", "000")), |
| new Tuple2<>("2015/03/12", new BloomIndexFileInfo("4", "001", "003"))); |
| assertEquals(expected, filesList); |
| } |
| } |
| |
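| /** |
| * Verifies that explodeRecordRDDWithFileComparisons pairs each record key only with files whose |
| * key range can contain it; files without key ranges are compared against every key. |
| */ |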
| @Test |
| public void testRangePruning() { |
| HoodieWriteConfig config = makeConfig(); |
| HoodieBloomIndex index = new HoodieBloomIndex(config); |
| |
| final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo = new HashMap<>(); |
| partitionToFileIndexInfo.put("2017/10/22", |
| Arrays.asList(new BloomIndexFileInfo("f1"), new BloomIndexFileInfo("f2", "000", "000"), |
| new BloomIndexFileInfo("f3", "001", "003"), new BloomIndexFileInfo("f4", "002", "007"), |
| new BloomIndexFileInfo("f5", "009", "010"))); |
| |
| JavaPairRDD<String, String> partitionRecordKeyPairRDD = |
| jsc.parallelize(Arrays.asList(new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22", "002"), |
| new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/22", "004"))).mapToPair(t -> t); |
| |
| List<Tuple2<String, HoodieKey>> comparisonKeyList = |
| index.explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect(); |
| |
| assertEquals(10, comparisonKeyList.size()); |
| Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream() |
| .collect(Collectors.groupingBy(t -> t._2.getRecordKey(), Collectors.mapping(t -> t._1, Collectors.toList()))); |
| |
| assertEquals(4, recordKeyToFileComps.size()); |
| assertEquals(new HashSet<>(Arrays.asList("f1", "f3", "f4")), new HashSet<>(recordKeyToFileComps.get("002"))); |
| assertEquals(new HashSet<>(Arrays.asList("f1", "f3", "f4")), new HashSet<>(recordKeyToFileComps.get("003"))); |
| assertEquals(new HashSet<>(Arrays.asList("f1", "f4")), new HashSet<>(recordKeyToFileComps.get("004"))); |
| assertEquals(new HashSet<>(Arrays.asList("f1", "f4")), new HashSet<>(recordKeyToFileComps.get("005"))); |
| } |
| |
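| /** |
| * Verifies that candidate keys suggested by the bloom filter are validated against the actual |
| * parquet file contents, so false positives are filtered out. |
| */ |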
| @Test |
| public void testCheckUUIDsAgainstOneFile() throws IOException, InterruptedException { |
| |
| // Create some records to use |
| String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," |
| + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; |
| String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\"," |
| + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; |
| String recordStr3 = "{\"_row_key\":\"3eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," |
| + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; |
| String recordStr4 = "{\"_row_key\":\"4eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," |
| + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":32}"; |
| TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1); |
| HoodieRecord record1 = |
| new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1); |
| TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2); |
| HoodieRecord record2 = |
| new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2); |
| TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3); |
| HoodieRecord record3 = |
| new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3); |
| TestRawTripPayload rowChange4 = new TestRawTripPayload(recordStr4); |
| HoodieRecord record4 = |
| new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4); |
| |
| // We write record1, record2 to a parquet file, but the bloom filter contains (record1, |
| // record2, record3). |
| BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name()); |
| filter.add(record3.getRecordKey()); |
| String filename = HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1, record2), |
| schema, filter, true); |
| |
| // The bloom filter contains 3 records |
| assertTrue(filter.mightContain(record1.getRecordKey())); |
| assertTrue(filter.mightContain(record2.getRecordKey())); |
| assertTrue(filter.mightContain(record3.getRecordKey())); |
| assertFalse(filter.mightContain(record4.getRecordKey())); |
| |
| // Compare with file |
| List<String> uuids = |
| Arrays.asList(record1.getRecordKey(), record2.getRecordKey(), record3.getRecordKey(), record4.getRecordKey()); |
| |
| List<String> results = HoodieKeyLookupHandle.checkCandidatesAgainstFile(jsc.hadoopConfiguration(), uuids, |
| new Path(basePath + "/2016/01/31/" + filename)); |
| assertEquals(2, results.size()); |
| assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0") |
| || results.get(1).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")); |
| assertTrue(results.get(0).equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0") |
| || results.get(1).equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")); |
| // TODO(vc): Need more coverage on actual filenames |
| // assertTrue(results.get(0)._2().equals(filename)); |
| // assertTrue(results.get(1)._2().equals(filename)); |
| } |
| |
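| /** |
| * Tagging an empty RDD should be a no-op and must not fail with |
| * "Positive number of slices required". |
| */ |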
| @Test |
| public void testTagLocationWithEmptyRDD() { |
| // An empty RDD of records to be tagged |
| JavaRDD<HoodieRecord> recordRDD = jsc.emptyRDD(); |
| // Also create the metadata and config |
| HoodieWriteConfig config = makeConfig(); |
| metaClient = HoodieTableMetaClient.reload(metaClient); |
| HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); |
| |
| // Let's tag |
| HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config); |
| |
| try { |
| bloomIndex.tagLocation(recordRDD, jsc, table); |
| } catch (IllegalArgumentException e) { |
| fail("EmptyRDD should not result in IllegalArgumentException: Positive number of slices required"); |
| } |
| } |
| |
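| /** |
| * Verifies that tagLocation leaves records untagged before any data is written and, after |
| * writing, tags each record with the file id holding its key in its own partition. |
| */ |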
| @Test |
| public void testTagLocation() throws Exception { |
| // We have some records to be tagged (two different partitions) |
| String rowKey1 = UUID.randomUUID().toString(); |
| String rowKey2 = UUID.randomUUID().toString(); |
| String rowKey3 = UUID.randomUUID().toString(); |
| String recordStr1 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; |
| String recordStr2 = "{\"_row_key\":\"" + rowKey2 + "\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; |
| String recordStr3 = "{\"_row_key\":\"" + rowKey3 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; |
| // place same row key under a different partition. |
| String recordStr4 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}"; |
| TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1); |
| HoodieRecord record1 = |
| new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1); |
| TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2); |
| HoodieRecord record2 = |
| new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2); |
| TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3); |
| HoodieRecord record3 = |
| new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3); |
| TestRawTripPayload rowChange4 = new TestRawTripPayload(recordStr4); |
| HoodieRecord record4 = |
| new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4); |
| JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4)); |
| |
| // Also create the metadata and config |
| HoodieWriteConfig config = makeConfig(); |
| metaClient = HoodieTableMetaClient.reload(metaClient); |
| HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); |
| |
| // Let's tag |
| HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config); |
| JavaRDD<HoodieRecord> taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table); |
| |
| // Should not find any files |
| for (HoodieRecord record : taggedRecordRDD.collect()) { |
| assertFalse(record.isCurrentLocationKnown()); |
| } |
| |
| // We create three parquet files, each holding one record (across two different partitions) |
| String filename1 = |
| HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), schema, null, true); |
| String filename2 = |
| HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record2), schema, null, true); |
| String filename3 = |
| HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Collections.singletonList(record4), schema, null, true); |
| |
| // We do the tag again |
| metaClient = HoodieTableMetaClient.reload(metaClient); |
| table = HoodieTable.getHoodieTable(metaClient, config, jsc); |
| |
| taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table); |
| |
| // Check results |
| for (HoodieRecord record : taggedRecordRDD.collect()) { |
| if (record.getRecordKey().equals(rowKey1)) { |
| if (record.getPartitionPath().equals("2015/01/31")) { |
| assertEquals(FSUtils.getFileId(filename3), record.getCurrentLocation().getFileId()); |
| } else { |
| assertEquals(FSUtils.getFileId(filename1), record.getCurrentLocation().getFileId()); |
| } |
| } else if (record.getRecordKey().equals(rowKey2)) { |
| assertEquals(FSUtils.getFileId(filename2), record.getCurrentLocation().getFileId()); |
| } else if (record.getRecordKey().equals(rowKey3)) { |
| assertFalse(record.isCurrentLocationKnown()); |
| } |
| } |
| } |
| |
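| /** |
| * Verifies that fetchRecordLocation returns an empty Option for keys with no data and a |
| * location whose right element is the file id for keys that exist, including the same key |
| * present in two partitions. |
| */ |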
| @Test |
| public void testCheckExists() throws Exception { |
| // We have some records to be tagged (two different partitions) |
| |
| String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," |
| + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; |
| String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\"," |
| + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; |
| String recordStr3 = "{\"_row_key\":\"3eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," |
| + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; |
| // record key same as recordStr2 |
| String recordStr4 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\"," |
| + "\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}"; |
| TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1); |
| HoodieKey key1 = new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()); |
| HoodieRecord record1 = new HoodieRecord(key1, rowChange1); |
| TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2); |
| HoodieKey key2 = new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()); |
| HoodieRecord record2 = new HoodieRecord(key2, rowChange2); |
| TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3); |
| HoodieKey key3 = new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()); |
| TestRawTripPayload rowChange4 = new TestRawTripPayload(recordStr4); |
| HoodieKey key4 = new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()); |
| HoodieRecord record4 = new HoodieRecord(key4, rowChange4); |
| JavaRDD<HoodieKey> keysRDD = jsc.parallelize(Arrays.asList(key1, key2, key3, key4)); |
| |
| // Also create the metadata and config |
| HoodieWriteConfig config = makeConfig(); |
| metaClient = HoodieTableMetaClient.reload(metaClient); |
| HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); |
| |
| // Let's tag |
| HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config); |
| JavaPairRDD<HoodieKey, Option<Pair<String, String>>> taggedRecordRDD = |
| bloomIndex.fetchRecordLocation(keysRDD, jsc, table); |
| |
| // Should not find any files |
| for (Tuple2<HoodieKey, Option<Pair<String, String>>> record : taggedRecordRDD.collect()) { |
| assertFalse(record._2.isPresent()); |
| } |
| |
| // We create three parquet files, each holding one record (across two different partitions) |
| String filename1 = |
| HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), schema, null, true); |
| String filename2 = |
| HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record2), schema, null, true); |
| String filename3 = |
| HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Collections.singletonList(record4), schema, null, true); |
| |
| // We do the tag again |
| metaClient = HoodieTableMetaClient.reload(metaClient); |
| table = HoodieTable.getHoodieTable(metaClient, config, jsc); |
| taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, table); |
| |
| // Check results |
| for (Tuple2<HoodieKey, Option<Pair<String, String>>> record : taggedRecordRDD.collect()) { |
| if (record._1.getRecordKey().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")) { |
| assertTrue(record._2.isPresent()); |
| assertEquals(FSUtils.getFileId(filename1), record._2.get().getRight()); |
| } else if (record._1.getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) { |
| assertTrue(record._2.isPresent()); |
| if (record._1.getPartitionPath().equals("2015/01/31")) { |
| assertEquals(FSUtils.getFileId(filename3), record._2.get().getRight()); |
| } else { |
| assertEquals(FSUtils.getFileId(filename2), record._2.get().getRight()); |
| } |
| } else if (record._1.getRecordKey().equals("3eb5b87c-1fej-4edd-87b4-6ec96dc405a0")) { |
| assertFalse(record._2.isPresent()); |
| } |
| } |
| } |
| |
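| /** |
| * Verifies that a bloom filter false positive (a key present in the filter but not in the |
| * parquet file) does not cause the record to be tagged with a location. |
| */ |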
| @Test |
| public void testBloomFilterFalseError() throws IOException, InterruptedException { |
| // We have two hoodie records |
| String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," |
| + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; |
| String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\"," |
| + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; |
| |
| // We write record1 to a parquet file, using a bloom filter having both records |
| TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1); |
| HoodieRecord record1 = |
| new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1); |
| TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2); |
| HoodieRecord record2 = |
| new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2); |
| |
| BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, |
| BloomFilterTypeCode.SIMPLE.name()); |
| filter.add(record2.getRecordKey()); |
| String filename = |
| HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), schema, filter, true); |
| assertTrue(filter.mightContain(record1.getRecordKey())); |
| assertTrue(filter.mightContain(record2.getRecordKey())); |
| |
| // We do the tag |
| JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2)); |
| HoodieWriteConfig config = makeConfig(); |
| metaClient = HoodieTableMetaClient.reload(metaClient); |
| HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); |
| |
| HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config); |
| JavaRDD<HoodieRecord> taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table); |
| |
| // Check results |
| for (HoodieRecord record : taggedRecordRDD.collect()) { |
| if (record.getRecordKey().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")) { |
| assertEquals(FSUtils.getFileId(filename), record.getCurrentLocation().getFileId()); |
| } else if (record.getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) { |
| assertFalse(record.isCurrentLocationKnown()); |
| } |
| } |
| } |
| |
| } |