| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package com.uber.hoodie; |
| |
| import static com.uber.hoodie.common.table.HoodieTableMetaClient.MARKER_EXTN; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| import com.uber.hoodie.common.HoodieClientTestUtils; |
| import com.uber.hoodie.common.HoodieTestDataGenerator; |
| import com.uber.hoodie.common.model.HoodieCommitMetadata; |
| import com.uber.hoodie.common.model.HoodieDataFile; |
| import com.uber.hoodie.common.model.HoodieKey; |
| import com.uber.hoodie.common.model.HoodieRecord; |
| import com.uber.hoodie.common.model.HoodieRollingStat; |
| import com.uber.hoodie.common.model.HoodieRollingStatMetadata; |
| import com.uber.hoodie.common.model.HoodieTestUtils; |
| import com.uber.hoodie.common.table.HoodieTableMetaClient; |
| import com.uber.hoodie.common.table.HoodieTimeline; |
| import com.uber.hoodie.common.table.TableFileSystemView; |
| import com.uber.hoodie.common.table.timeline.HoodieInstant; |
| import com.uber.hoodie.common.util.FSUtils; |
| import com.uber.hoodie.common.util.ParquetUtils; |
| import com.uber.hoodie.common.util.collection.Pair; |
| import com.uber.hoodie.config.HoodieCompactionConfig; |
| import com.uber.hoodie.config.HoodieStorageConfig; |
| import com.uber.hoodie.config.HoodieWriteConfig; |
| import com.uber.hoodie.exception.HoodieCommitException; |
| import com.uber.hoodie.exception.HoodieIOException; |
| import com.uber.hoodie.index.HoodieIndex; |
| import com.uber.hoodie.table.HoodieTable; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.stream.Collectors; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.commons.io.IOUtils; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.spark.api.java.JavaRDD; |
| import org.junit.Assert; |
| import org.junit.Test; |
| import scala.Option; |
| |
| @SuppressWarnings("unchecked") |
| public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { |
| |
| @Override |
| public void tearDown() throws IOException { |
| super.tearDown(); |
| } |
| |
| /** |
| * Test Auto Commit behavior for HoodieWriteClient insert API |
| */ |
| @Test |
| public void testAutoCommitOnInsert() throws Exception { |
| testAutoCommit(HoodieWriteClient::insert, false); |
| } |
| |
| /** |
| * Test Auto Commit behavior for HoodieWriteClient insertPrepped API |
| */ |
| @Test |
| public void testAutoCommitOnInsertPrepped() throws Exception { |
| testAutoCommit(HoodieWriteClient::insertPreppedRecords, true); |
| } |
| |
| /** |
| * Test Auto Commit behavior for HoodieWriteClient upsert API |
| */ |
| @Test |
| public void testAutoCommitOnUpsert() throws Exception { |
| testAutoCommit(HoodieWriteClient::upsert, false); |
| } |
| |
| /** |
| * Test Auto Commit behavior for HoodieWriteClient upsertPrepped API |
| */ |
| @Test |
| public void testAutoCommitOnUpsertPrepped() throws Exception { |
| testAutoCommit(HoodieWriteClient::upsertPreppedRecords, true); |
| } |
| |
| /** |
| * Test Auto Commit behavior for HoodieWriteClient bulk-insert API |
| */ |
| @Test |
| public void testAutoCommitOnBulkInsert() throws Exception { |
| testAutoCommit(HoodieWriteClient::bulkInsert, false); |
| } |
| |
| /** |
| * Test Auto Commit behavior for HoodieWriteClient bulk-insert prepped API |
| */ |
| @Test |
| public void testAutoCommitOnBulkInsertPrepped() throws Exception { |
| testAutoCommit((writeClient, recordRDD, commitTime) |
| -> writeClient.bulkInsertPreppedRecords(recordRDD, commitTime, Option.empty()), true); |
| } |
| |
| /** |
| * Test auto-commit behavior by applying the given write function |
| * |
| * @param writeFn One of the HoodieWriteClient write APIs |
| * @param isPrepped Whether the records passed to writeFn are already prepped |
| * @throws Exception in case of failure |
| */ |
| private void testAutoCommit( |
| Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, |
| boolean isPrepped) throws Exception { |
| // Set autoCommit false |
| HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); |
| HoodieWriteClient client = getHoodieWriteClient(cfg); |
| |
| String prevCommitTime = "000"; |
| String newCommitTime = "001"; |
| int numRecords = 200; |
| JavaRDD<WriteStatus> result = |
| insertFirstBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, writeFn, isPrepped, false, numRecords); |
| |
| assertFalse("If Autocommit is false, then commit should not be made automatically", |
| HoodieTestUtils.doesCommitExist(basePath, newCommitTime)); |
| assertTrue("Commit should succeed", client.commit(newCommitTime, result)); |
| assertTrue("After explicit commit, commit file should be created", |
| HoodieTestUtils.doesCommitExist(basePath, newCommitTime)); |
| } |
| |
| /** |
| * Test De-duplication behavior for HoodieWriteClient insert API |
| */ |
| @Test |
| public void testDeduplicationOnInsert() throws Exception { |
| testDeduplication(HoodieWriteClient::insert); |
| } |
| |
| /** |
| * Test De-duplication behavior for HoodieWriteClient bulk-insert API |
| */ |
| @Test |
| public void testDeduplicationOnBulkInsert() throws Exception { |
| testDeduplication(HoodieWriteClient::bulkInsert); |
| } |
| |
| /** |
| * Test De-duplication behavior for HoodieWriteClient upsert API |
| */ |
| @Test |
| public void testDeduplicationOnUpsert() throws Exception { |
| testDeduplication(HoodieWriteClient::upsert); |
| } |
| |
| /** |
| * Test deduplication logic for the given write function |
| * |
| * @param writeFn One of the HoodieWriteClient non-prepped write APIs |
| * @throws Exception in case of failure |
| */ |
| private void testDeduplication( |
| Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn) throws Exception { |
| String newCommitTime = "001"; |
| |
| String recordKey = UUID.randomUUID().toString(); |
| HoodieKey keyOne = new HoodieKey(recordKey, "2018-01-01"); |
| HoodieRecord recordOne = new HoodieRecord(keyOne, |
| HoodieTestDataGenerator.generateRandomValue(keyOne, newCommitTime)); |
| |
| HoodieKey keyTwo = new HoodieKey(recordKey, "2018-02-01"); |
| HoodieRecord recordTwo = new HoodieRecord(keyTwo, |
| HoodieTestDataGenerator.generateRandomValue(keyTwo, newCommitTime)); |
| |
| // Same key and partition as keyTwo |
| HoodieRecord recordThree = new HoodieRecord(keyTwo, |
| HoodieTestDataGenerator.generateRandomValue(keyTwo, newCommitTime)); |
| |
| JavaRDD<HoodieRecord> records = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1); |
| |
| // dedup should be done based on recordKey only |
| HoodieWriteClient clientWithDummyGlobalIndex = getWriteClientWithDummyIndex(true); |
| List<HoodieRecord> dedupedRecs = clientWithDummyGlobalIndex.deduplicateRecords(records, 1).collect(); |
| assertEquals(1, dedupedRecs.size()); |
| assertNodupesWithinPartition(dedupedRecs); |
| |
| // dedup should be done based on both recordKey and partitionPath |
| HoodieWriteClient clientWithDummyNonGlobalIndex = getWriteClientWithDummyIndex(false); |
| dedupedRecs = |
| clientWithDummyNonGlobalIndex.deduplicateRecords(records, 1).collect(); |
| assertEquals(2, dedupedRecs.size()); |
| assertNodupesWithinPartition(dedupedRecs); |
| |
| // Perform write-action and check |
| HoodieWriteClient client = new HoodieWriteClient(jsc, |
| getConfigBuilder().combineInput(true, true).build()); |
| client.startCommitWithTime(newCommitTime); |
| List<WriteStatus> statuses = writeFn.apply(client, records, newCommitTime).collect(); |
| assertNoWriteErrors(statuses); |
| assertEquals(2, statuses.size()); |
| assertNodupesWithinPartition( |
| statuses.stream().map(WriteStatus::getWrittenRecords) |
| .flatMap(Collection::stream).collect(Collectors.toList())); |
| } |
| |
| /** |
| * Build a test HoodieWriteClient with a dummy index whose isGlobal flag can be controlled |
| * |
| * @param isGlobal Flag to control HoodieIndex.isGlobal |
| * @return Hoodie Write Client |
| * @throws Exception in case of error |
| */ |
| private HoodieWriteClient getWriteClientWithDummyIndex(final boolean isGlobal) throws Exception { |
| HoodieIndex index = mock(HoodieIndex.class); |
| when(index.isGlobal()).thenReturn(isGlobal); |
| return new HoodieWriteClient(jsc, getConfigBuilder().build(), false, index); |
| } |
| |
| /** |
| * Test Upsert API |
| */ |
| @Test |
| public void testUpserts() throws Exception { |
| testUpsertsInternal(getConfig(), |
| HoodieWriteClient::upsert, false); |
| } |
| |
| /** |
| * Test UpsertPrepped API |
| */ |
| @Test |
| public void testUpsertsPrepped() throws Exception { |
| testUpsertsInternal(getConfig(), |
| HoodieWriteClient::upsertPreppedRecords, true); |
| } |
| |
| /** |
| * Test one of HoodieWriteClient upsert(Prepped) APIs |
| * |
| * @param hoodieWriteConfig Write Config |
| * @param writeFn One of the HoodieWriteClient upsert/upsertPrepped APIs |
| * @param isPrepped Whether the records passed to writeFn are already prepped |
| * @throws Exception in case of error |
| */ |
| private void testUpsertsInternal(HoodieWriteConfig hoodieWriteConfig, |
| Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, |
| boolean isPrepped) throws Exception { |
| HoodieWriteClient client = new HoodieWriteClient(jsc, hoodieWriteConfig); |
| |
| // Write 1 (only inserts) |
| String newCommitTime = "001"; |
| String initCommitTime = "000"; |
| int numRecords = 200; |
| insertFirstBatch(hoodieWriteConfig, |
| client, newCommitTime, initCommitTime, numRecords, HoodieWriteClient::insert, isPrepped, true, numRecords); |
| |
| // Write 2 (updates) |
| String prevCommitTime = newCommitTime; |
| newCommitTime = "004"; |
| numRecords = 100; |
| String commitTimeBetweenPrevAndNew = "002"; |
| updateBatch(hoodieWriteConfig, client, newCommitTime, prevCommitTime, |
| Optional.of(Arrays.asList(commitTimeBetweenPrevAndNew)), |
| initCommitTime, numRecords, writeFn, isPrepped, true, numRecords, 200, 2); |
| } |
| |
| /** |
| * Test deletion of records |
| */ |
| @Test |
| public void testDeletes() throws Exception { |
| HoodieWriteClient client = new HoodieWriteClient(jsc, getConfig()); |
| |
| /* |
| * Write 1 (inserts and deletes) |
| * Write 200 actual insert records; the 100 delete records are ignored |
| */ |
| String initCommitTime = "000"; |
| String newCommitTime = "001"; |
| |
| final List<HoodieRecord> recordsInFirstBatch = new ArrayList<>(); |
| Function2<List<HoodieRecord>, String, Integer> recordGenFunction = |
| (String commitTime, Integer numRecordsInThisCommit) -> { |
| List<HoodieRecord> fewRecordsForInsert = dataGen.generateInserts(commitTime, 200); |
| List<HoodieRecord> fewRecordsForDelete = dataGen.generateDeletes(commitTime, 100); |
| |
| recordsInFirstBatch.addAll(fewRecordsForInsert); |
| recordsInFirstBatch.addAll(fewRecordsForDelete); |
| return recordsInFirstBatch; |
| }; |
| writeBatch(client, newCommitTime, initCommitTime, Optional.empty(), initCommitTime, |
| // unused, as the generator function hard-codes the number of inserts/updates/deletes |
| -1, |
| recordGenFunction, HoodieWriteClient::upsert, true, |
| 200, 200, 1); |
| |
| /* |
| * Write 2 (updates + deletes) |
| */ |
| String prevCommitTime = newCommitTime; |
| newCommitTime = "004"; |
| final List<HoodieRecord> recordsInSecondBatch = new ArrayList<>(); |
| |
| recordGenFunction = |
| (String commitTime, Integer numRecordsInThisCommit) -> { |
| List<HoodieRecord> fewRecordsForDelete = recordsInFirstBatch.subList(0, 50); |
| List<HoodieRecord> fewRecordsForUpdate = recordsInFirstBatch.subList(50, 100); |
| recordsInSecondBatch.addAll(dataGen.generateDeletesFromExistingRecords(fewRecordsForDelete)); |
| recordsInSecondBatch.addAll(fewRecordsForUpdate); |
| return recordsInSecondBatch; |
| }; |
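| // Second batch: delete 50 of the first-batch records and update another 50, leaving 150 records in total |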
| writeBatch(client, newCommitTime, prevCommitTime, Optional.empty(), initCommitTime, |
| 100, recordGenFunction, HoodieWriteClient::upsert, true, |
| 50, 150, 2); |
| } |
| |
| /** |
| * Test the scenario of a new file group getting added during upsert() once existing small files fill up |
| */ |
| @Test |
| public void testSmallInsertHandlingForUpserts() throws Exception { |
| final String testPartitionPath = "2016/09/26"; |
| final int insertSplitLimit = 100; |
| // setup the small file handling params |
| HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold up to 200 records max |
| dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath}); |
| |
| HoodieWriteClient client = new HoodieWriteClient(jsc, config); |
| |
| // Inserts => will write file1 |
| String commitTime1 = "001"; |
| client.startCommitWithTime(commitTime1); |
| List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, insertSplitLimit); // this writes ~500kb |
| Set<String> keys1 = HoodieClientTestUtils.getRecordKeys(inserts1); |
| |
| JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1); |
| List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect(); |
| |
| assertNoWriteErrors(statuses); |
| |
| assertEquals("Just 1 file needs to be added.", 1, statuses.size()); |
| String file1 = statuses.get(0).getFileId(); |
| assertEquals("file should contain 100 records", 100, ParquetUtils.readRowKeysFromParquet( |
| jsc.hadoopConfiguration(), new Path(basePath, statuses.get(0).getStat().getPath())).size()); |
| |
| // Update + Inserts such that they just expand file1 |
| String commitTime2 = "002"; |
| client.startCommitWithTime(commitTime2); |
| List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 40); |
| Set<String> keys2 = HoodieClientTestUtils.getRecordKeys(inserts2); |
| List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>(); |
| insertsAndUpdates2.addAll(inserts2); |
| insertsAndUpdates2.addAll(dataGen.generateUpdates(commitTime2, inserts1)); |
| |
| JavaRDD<HoodieRecord> insertAndUpdatesRDD2 = jsc.parallelize(insertsAndUpdates2, 1); |
| statuses = client.upsert(insertAndUpdatesRDD2, commitTime2).collect(); |
| assertNoWriteErrors(statuses); |
| |
| assertEquals("Just 1 file needs to be updated.", 1, statuses.size()); |
| assertEquals("Existing file should be expanded", file1, statuses.get(0).getFileId()); |
| assertEquals("Existing file should be expanded", commitTime1, statuses.get(0).getStat().getPrevCommit()); |
| Path newFile = new Path(basePath, statuses.get(0).getStat().getPath()); |
| assertEquals("file should contain 140 records", 140, |
| ParquetUtils.readRowKeysFromParquet(jsc.hadoopConfiguration(), newFile).size()); |
| |
| List<GenericRecord> records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), newFile); |
| for (GenericRecord record : records) { |
| String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); |
| assertEquals("only expect commit2", commitTime2, record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()); |
| assertTrue("key expected to be part of commit2", keys2.contains(recordKey) || keys1.contains(recordKey)); |
| } |
| |
| // Updates + inserts such that file1 is updated and expanded, and a new file2 is created. |
| String commitTime3 = "003"; |
| client.startCommitWithTime(commitTime3); |
| List<HoodieRecord> insertsAndUpdates3 = dataGen.generateInserts(commitTime3, 200); |
| Set<String> keys3 = HoodieClientTestUtils.getRecordKeys(insertsAndUpdates3); |
| List<HoodieRecord> updates3 = dataGen.generateUpdates(commitTime3, inserts2); |
| insertsAndUpdates3.addAll(updates3); |
| |
| JavaRDD<HoodieRecord> insertAndUpdatesRDD3 = jsc.parallelize(insertsAndUpdates3, 1); |
| statuses = client.upsert(insertAndUpdatesRDD3, commitTime3).collect(); |
| assertNoWriteErrors(statuses); |
| |
| assertEquals("2 files need to be committed.", 2, statuses.size()); |
| HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); |
| |
| HoodieTable table = getHoodieTable(metadata, config); |
| TableFileSystemView.ReadOptimizedView fileSystemView = table.getROFileSystemView(); |
| List<HoodieDataFile> files = fileSystemView.getLatestDataFilesBeforeOrOn(testPartitionPath, commitTime3) |
| .collect(Collectors.toList()); |
| int numTotalInsertsInCommit3 = 0; |
| int numTotalUpdatesInCommit3 = 0; |
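| // Validate both files: file1 must be expanded with commit3 updates/inserts, the new file must hold only commit3 inserts |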
| for (HoodieDataFile file : files) { |
| if (file.getFileName().contains(file1)) { |
| assertEquals("Existing file should be expanded", commitTime3, file.getCommitTime()); |
| records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), new Path(file.getPath())); |
| for (GenericRecord record : records) { |
| String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); |
| String recordCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); |
| if (recordCommitTime.equals(commitTime3)) { |
| if (keys2.contains(recordKey)) { |
| keys2.remove(recordKey); |
| numTotalUpdatesInCommit3++; |
| } else { |
| numTotalInsertsInCommit3++; |
| } |
| } |
| } |
| assertEquals("All keys added in commit 2 must be updated in commit3 correctly", 0, keys2.size()); |
| } else { |
| assertEquals("New file must be written for commit 3", commitTime3, file.getCommitTime()); |
| records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), new Path(file.getPath())); |
| for (GenericRecord record : records) { |
| String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); |
| assertEquals("only expect commit3", commitTime3, |
| record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()); |
| assertTrue("key expected to be part of commit3", keys3.contains(recordKey)); |
| } |
| numTotalInsertsInCommit3 += records.size(); |
| } |
| } |
| assertEquals("Total updates in commit3 must add up", inserts2.size(), numTotalUpdatesInCommit3); |
| assertEquals("Total inserts in commit3 must add up", keys3.size(), numTotalInsertsInCommit3); |
| } |
| |
| /** |
| * Test the scenario of a new file group getting added during insert() once existing small files fill up |
| */ |
| @Test |
| public void testSmallInsertHandlingForInserts() throws Exception { |
| |
| final String testPartitionPath = "2016/09/26"; |
| final int insertSplitLimit = 100; |
| // setup the small file handling params |
| HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold up to 200 records max |
| dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath}); |
| HoodieWriteClient client = new HoodieWriteClient(jsc, config); |
| |
| // Inserts => will write file1 |
| String commitTime1 = "001"; |
| client.startCommitWithTime(commitTime1); |
| List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, insertSplitLimit); // this writes ~500kb |
| Set<String> keys1 = HoodieClientTestUtils.getRecordKeys(inserts1); |
| JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1); |
| List<WriteStatus> statuses = client.insert(insertRecordsRDD1, commitTime1).collect(); |
| |
| assertNoWriteErrors(statuses); |
| assertPartitionMetadata(new String[]{testPartitionPath}, fs); |
| |
| assertEquals("Just 1 file needs to be added.", 1, statuses.size()); |
| String file1 = statuses.get(0).getFileId(); |
| assertEquals("file should contain 100 records", 100, ParquetUtils.readRowKeysFromParquet( |
| jsc.hadoopConfiguration(), new Path(basePath, statuses.get(0).getStat().getPath())).size()); |
| |
| // Second set of inserts should just expand file1 |
| String commitTime2 = "002"; |
| client.startCommitWithTime(commitTime2); |
| List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 40); |
| Set<String> keys2 = HoodieClientTestUtils.getRecordKeys(inserts2); |
| JavaRDD<HoodieRecord> insertRecordsRDD2 = jsc.parallelize(inserts2, 1); |
| statuses = client.insert(insertRecordsRDD2, commitTime2).collect(); |
| assertNoWriteErrors(statuses); |
| |
| assertEquals("Just 1 file needs to be updated.", 1, statuses.size()); |
| assertEquals("Existing file should be expanded", file1, statuses.get(0).getFileId()); |
| assertEquals("Existing file should be expanded", commitTime1, statuses.get(0).getStat().getPrevCommit()); |
| Path newFile = new Path(basePath, statuses.get(0).getStat().getPath()); |
| assertEquals("file should contain 140 records", 140, |
| ParquetUtils.readRowKeysFromParquet(jsc.hadoopConfiguration(), newFile).size()); |
| |
| List<GenericRecord> records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), newFile); |
| for (GenericRecord record : records) { |
| String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); |
| String recCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); |
| assertTrue("Record expected to be part of commit 1 or commit2", |
| commitTime1.equals(recCommitTime) || commitTime2.equals(recCommitTime)); |
| assertTrue("key expected to be part of commit 1 or commit2", |
| keys2.contains(recordKey) || keys1.contains(recordKey)); |
| } |
| |
| // Lots of inserts such that file1 is updated and expanded, and a new file2 is created. |
| String commitTime3 = "003"; |
| client.startCommitWithTime(commitTime3); |
| List<HoodieRecord> insert3 = dataGen.generateInserts(commitTime3, 200); |
| JavaRDD<HoodieRecord> insertRecordsRDD3 = jsc.parallelize(insert3, 1); |
| statuses = client.insert(insertRecordsRDD3, commitTime3).collect(); |
| assertNoWriteErrors(statuses); |
| assertEquals("2 files need to be committed.", 2, statuses.size()); |
| |
| HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); |
| HoodieTable table = getHoodieTable(metaClient, config); |
| List<HoodieDataFile> files = table.getROFileSystemView() |
| .getLatestDataFilesBeforeOrOn(testPartitionPath, commitTime3) |
| .collect(Collectors.toList()); |
| assertEquals("Total of 2 valid data files", 2, files.size()); |
| |
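| // All latest data files must belong to commit3 and together contain every record inserted across the three commits |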
| int totalInserts = 0; |
| for (HoodieDataFile file : files) { |
| assertEquals("All files must be at commit 3", commitTime3, file.getCommitTime()); |
| records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), new Path(file.getPath())); |
| totalInserts += records.size(); |
| } |
| assertEquals("Total number of records must add up", |
| inserts1.size() + inserts2.size() + insert3.size(), totalInserts); |
| } |
| |
| /** |
| * Test to ensure commit metadata points to valid files, with paths recorded relative to the base path |
| */ |
| @Test |
| public void testCommitWritesRelativePaths() throws Exception { |
| |
| HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); |
| HoodieWriteClient client = getHoodieWriteClient(cfg); |
| HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); |
| HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); |
| |
| String commitTime = "000"; |
| client.startCommitWithTime(commitTime); |
| |
| List<HoodieRecord> records = dataGen.generateInserts(commitTime, 200); |
| JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1); |
| |
| JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, commitTime); |
| |
| assertTrue("Commit should succeed", client.commit(commitTime, result)); |
| assertTrue("After explicit commit, commit file should be created", |
| HoodieTestUtils.doesCommitExist(basePath, commitTime)); |
| |
| // Get parquet file paths from commit metadata |
| String actionType = metaClient.getCommitActionType(); |
| HoodieInstant commitInstant = new HoodieInstant(false, actionType, commitTime); |
| HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants(); |
| HoodieCommitMetadata commitMetadata = HoodieCommitMetadata |
| .fromBytes(commitTimeline.getInstantDetails(commitInstant).get(), HoodieCommitMetadata.class); |
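| // Commit metadata stores paths relative to the base path; resolve them to full paths for comparison |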
| String basePath = table.getMetaClient().getBasePath(); |
| Collection<String> commitPathNames = commitMetadata.getFileIdAndFullPaths(basePath).values(); |
| |
| // Read from commit file |
| String filename = HoodieTestUtils.getCommitFilePath(basePath, commitTime); |
| FileInputStream inputStream = new FileInputStream(filename); |
| String everything = IOUtils.toString(inputStream); |
| HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(everything, |
| HoodieCommitMetadata.class); |
| HashMap<String, String> paths = metadata.getFileIdAndFullPaths(basePath); |
| inputStream.close(); |
| |
| // Compare values in both to make sure they are equal. |
| for (String pathName : paths.values()) { |
| assertTrue(commitPathNames.contains(pathName)); |
| } |
| } |
| |
| /** |
| * Test to ensure rolling stats are written to commit metadata and that insert/upsert counts add up across commits |
| */ |
| @Test |
| public void testRollingStatsInMetadata() throws Exception { |
| |
| HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); |
| HoodieWriteClient client = getHoodieWriteClient(cfg); |
| HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); |
| HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); |
| |
| String commitTime = "000"; |
| client.startCommitWithTime(commitTime); |
| |
| List<HoodieRecord> records = dataGen.generateInserts(commitTime, 200); |
| JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1); |
| |
| JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, commitTime); |
| |
| assertTrue("Commit should succeed", client.commit(commitTime, result)); |
| assertTrue("After explicit commit, commit file should be created", |
| HoodieTestUtils.doesCommitExist(basePath, commitTime)); |
| |
| // Read from commit file |
| String filename = HoodieTestUtils.getCommitFilePath(basePath, commitTime); |
| FileInputStream inputStream = new FileInputStream(filename); |
| String everything = IOUtils.toString(inputStream); |
| HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(everything, |
| HoodieCommitMetadata.class); |
| HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata.fromJsonString(metadata.getExtraMetadata() |
| .get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY), HoodieRollingStatMetadata.class); |
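| // Sum the insert counts across all partitions and file groups in the rolling stats |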
| int inserts = 0; |
| for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats() |
| .entrySet()) { |
| for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) { |
| inserts += stat.getValue().getInserts(); |
| } |
| } |
| Assert.assertEquals(200, inserts); |
| |
| // Write 2 (updates): update all 200 records written in the first commit |
| commitTime = "001"; |
| client.startCommitWithTime(commitTime); |
| |
| records = dataGen.generateUpdates(commitTime, records); |
| writeRecords = jsc.parallelize(records, 1); |
| result = client.upsert(writeRecords, commitTime); |
| |
| assertTrue("Commit should succeed", client.commit(commitTime, result)); |
| assertTrue("After explicit commit, commit file should be created", |
| HoodieTestUtils.doesCommitExist(basePath, commitTime)); |
| |
| // Read from commit file |
| filename = HoodieTestUtils.getCommitFilePath(basePath, commitTime); |
| inputStream = new FileInputStream(filename); |
| everything = IOUtils.toString(inputStream); |
| metadata = HoodieCommitMetadata.fromJsonString(everything, HoodieCommitMetadata.class); |
| rollingStatMetadata = HoodieCommitMetadata.fromJsonString(metadata.getExtraMetadata() |
| .get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY), HoodieRollingStatMetadata.class); |
| inserts = 0; |
| int upserts = 0; |
| for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats() |
| .entrySet()) { |
| for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) { |
| inserts += stat.getValue().getInserts(); |
| upserts += stat.getValue().getUpserts(); |
| } |
| } |
| Assert.assertEquals(200, inserts); |
| Assert.assertEquals(200, upserts); |
| } |
| |
| /** |
| * Tests behavior of committing only when consistency is verified |
| */ |
| @Test |
| public void testConsistencyCheckDuringFinalize() throws Exception { |
| HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), |
| basePath); |
| String commitTime = "000"; |
| HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); |
| HoodieWriteClient client = getHoodieWriteClient(cfg); |
| Pair<Path, JavaRDD<WriteStatus>> result = testConsistencyCheck(metaClient, commitTime); |
| |
| // Delete orphan marker and commit should succeed |
| metaClient.getFs().delete(result.getKey(), false); |
| assertTrue("Commit should succeed", client.commit(commitTime, result.getRight())); |
| assertTrue("After explicit commit, commit file should be created", |
| HoodieTestUtils.doesCommitExist(basePath, commitTime)); |
| // Marker directory must be removed |
| assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(commitTime)))); |
| } |
| |
| @Test |
| public void testRollbackAfterConsistencyCheckFailure() throws Exception { |
| String commitTime = "000"; |
| HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); |
| HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); |
| HoodieWriteClient client = getHoodieWriteClient(cfg); |
| testConsistencyCheck(metaClient, commitTime); |
| |
| // Rollback of this commit should succeed |
| client.rollback(commitTime); |
| assertFalse("After explicit rollback, commit file should not be present", |
| HoodieTestUtils.doesCommitExist(basePath, commitTime)); |
| // Marker directory must be removed after rollback |
| assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(commitTime)))); |
| } |
| |
| private Pair<Path, JavaRDD<WriteStatus>> testConsistencyCheck(HoodieTableMetaClient metaClient, String commitTime) |
| throws Exception { |
| HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withMaxConsistencyCheckIntervalMs(1) |
| .withInitialConsistencyCheckIntervalMs(1).build(); |
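| // Use tiny consistency-check intervals so the check gives up quickly in this test |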
| HoodieWriteClient client = getHoodieWriteClient(cfg); |
| |
| client.startCommitWithTime(commitTime); |
| JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(dataGen.generateInserts(commitTime, 200), 1); |
| JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, commitTime); |
| result.collect(); |
| |
| // Create a dummy marker file to simulate the case where a marker file was created without a data file. |
| // This should fail the commit |
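| // Pick a partition directory under the marker folder that already contains marker files for this commit |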
| String partitionPath = Arrays.stream(fs.globStatus(new Path(String.format("%s/*/*/*/*", |
| metaClient.getMarkerFolderPath(commitTime))), |
| path -> path.toString().endsWith(MARKER_EXTN))).limit(1) |
| .map(status -> status.getPath().getParent().toString()).collect(Collectors.toList()).get(0); |
| Path markerFilePath = new Path(String.format("%s/%s", partitionPath, |
| FSUtils.makeMarkerFile(commitTime, "1-0-1", UUID.randomUUID().toString()))); |
| metaClient.getFs().create(markerFilePath); |
| logger.info("Created a dummy marker path=" + markerFilePath); |
| |
| try { |
| client.commit(commitTime, result); |
| fail("Commit should fail due to consistency check"); |
| } catch (HoodieCommitException cme) { |
| assertTrue(cme.getCause() instanceof HoodieIOException); |
| } |
| return Pair.of(markerFilePath, result); |
| } |
| |
| /** |
| * Build Hoodie Write Config for small data file sizes |
| */ |
| private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize) { |
| HoodieWriteConfig.Builder builder = getConfigBuilder(); |
| return builder.withCompactionConfig( |
| HoodieCompactionConfig.newBuilder().compactionSmallFileSize(HoodieTestDataGenerator.SIZE_PER_RECORD * 15) |
| .insertSplitSize(insertSplitSize).build()) // tolerate up to 15 records |
| .withStorageConfig( |
| HoodieStorageConfig.newBuilder().limitFileSize(HoodieTestDataGenerator.SIZE_PER_RECORD * 20).build()) |
| .build(); |
| } |
| } |