blob: 3cbc48e7b80c79221d77545e9ca213c7c9fb9a18 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieClientTestHarness;
import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.HoodieMergeOnReadTestUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRollingStat;
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.SyncableFileSystemView;
import org.apache.hudi.common.table.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.TableFileSystemView.SliceView;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestMergeOnReadTable extends HoodieClientTestHarness {
/**
 * Per-test fixture setup (JUnit {@code @Before}): brings up the test DFS and the Spark contexts,
 * creates the base path directory on the DFS and initializes a table of type
 * {@link HoodieTableType#MERGE_ON_READ} at {@code basePath}.
 *
 * @throws IOException if any of the setup steps fails with an I/O error
 */
@Before
public void init() throws IOException {
initDFS();
initSparkContexts("TestHoodieMergeOnReadTable");
// Add the test DFS configuration as a resource of the Spark context's Hadoop configuration.
jsc.hadoopConfiguration().addResource(dfs.getConf());
initPath();
dfs.mkdirs(new Path(basePath));
// Tests that need a different table type (e.g. COPY_ON_WRITE) call HoodieTestUtils.init again themselves.
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ);
initTestDataGenerator();
}
/**
 * Per-test teardown (JUnit {@code @After}): delegates to {@code cleanupResources()}.
 *
 * @throws IOException if releasing the resources fails
 */
@After
public void clean() throws IOException {
cleanupResources();
}
/**
 * Upserts 200 inserts at instant "001" and 100 updates at instant "004", then schedules and runs a
 * compaction. After each step the delta-commit and commit timelines are checked, and at the end the
 * test expects exactly one completed commit and 200 readable records.
 */
@Test
public void testSimpleInsertAndUpdate() throws Exception {
  HoodieWriteConfig cfg = getConfig(true);
  try (HoodieWriteClient client = getHoodieWriteClient(cfg)) {

    /**
     * Write 1 (only inserts)
     */
    String newCommitTime = "001";
    client.startCommitWithTime(newCommitTime);

    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);

    List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
    assertNoWriteErrors(statuses);

    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);

    Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
    assertTrue(deltaCommit.isPresent());
    assertEquals("Delta commit should be 001", "001", deltaCommit.get().getTimestamp());

    Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
    assertFalse(commit.isPresent());

    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
    // A view restricted to the commit timeline must not expose any base file yet ...
    BaseFileOnlyView roView =
        new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
    Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
    assertFalse(dataFilesToRead.findAny().isPresent());

    // ... whereas the view over the table's completed commits timeline must.
    tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
    dataFilesToRead = tableView.getLatestBaseFiles();
    assertTrue("should list the parquet files we wrote in the delta commit",
        dataFilesToRead.findAny().isPresent());

    /**
     * Write 2 (updates)
     */
    newCommitTime = "004";
    client.startCommitWithTime(newCommitTime);

    records = dataGen.generateUpdates(newCommitTime, 100);

    statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
    // Verify there are no errors
    assertNoWriteErrors(statuses);
    metaClient = HoodieTableMetaClient.reload(metaClient);
    deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
    assertTrue(deltaCommit.isPresent());
    assertEquals("Latest Delta commit should be 004", "004", deltaCommit.get().getTimestamp());

    commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
    assertFalse(commit.isPresent());

    String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString();
    client.compact(compactionCommitTime);

    allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
    hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
    tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
    dataFilesToRead = tableView.getLatestBaseFiles();
    assertTrue(dataFilesToRead.findAny().isPresent());

    // verify that there is a commit
    metaClient = HoodieTableMetaClient.reload(metaClient);
    HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants();
    assertEquals("Expecting a single commit.", 1,
        timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
    String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
    assertTrue(HoodieTimeline.compareTimestamps("000", latestCompactionCommitTime, HoodieTimeline.LESSER));

    assertEquals("Must contain 200 records", 200,
        HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count());
  }
}
/**
 * Verifies that the record-level metadata reported by the individual write statuses is aggregated
 * properly at the end of a write.
 */
@Test
public void testMetadataAggregateFromWriteStatus() throws Exception {
  final String inputCountKey = "InputRecordCount_1506582000";
  HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();

  try (HoodieWriteClient client = getHoodieWriteClient(cfg)) {
    String instantTime = "001";
    List<HoodieRecord> inserts = dataGen.generateInserts(instantTime, 200);
    JavaRDD<HoodieRecord> insertsRdd = jsc.parallelize(inserts, 1);

    client.startCommitWithTime(instantTime);
    List<WriteStatus> writeStatuses = client.upsert(insertsRdd, instantTime).collect();
    assertNoWriteErrors(writeStatuses);

    Map<String, String> mergedMetadata = MetadataMergeWriteStatus.mergeMetadataForWriteStatuses(writeStatuses);
    assertTrue(mergedMetadata.containsKey(inputCountKey));

    // Every record carries the value 2 under this key, so the merged value must be twice the record count.
    String expectedTotal = String.valueOf(2 * inserts.size());
    assertEquals(expectedTotal, mergedMetadata.get(inputCountKey));
  }
}
/**
 * Inserts 20 records, updates all of them and finally deletes all of them, each step being its own
 * delta commit. Afterwards no record may be readable any more.
 */
@Test
public void testSimpleInsertUpdateAndDelete() throws Exception {
  HoodieWriteConfig cfg = getConfig(true);
  try (HoodieWriteClient client = getHoodieWriteClient(cfg)) {

    // ---- Write 1: inserts only (written as parquet file) ----
    String instantTime = "001";
    client.startCommitWithTime(instantTime);

    List<HoodieRecord> currentRecords = dataGen.generateInserts(instantTime, 20);
    List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(currentRecords, 1), instantTime).collect();
    assertNoWriteErrors(writeStatuses);

    HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);

    Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
    assertTrue(deltaCommit.isPresent());
    assertEquals("Delta commit should be 001", "001", deltaCommit.get().getTimestamp());

    Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
    assertFalse(commit.isPresent());

    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());

    // Nothing is visible through the commit timeline ...
    tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
    assertFalse(tableView.getLatestBaseFiles().findAny().isPresent());

    // ... but the table's completed commits timeline exposes the freshly written base files.
    tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
    assertTrue("should list the parquet files we wrote in the delta commit",
        tableView.getLatestBaseFiles().findAny().isPresent());

    // ---- Write 2: updates only (written to .log file) ----
    instantTime = "002";
    client.startCommitWithTime(instantTime);

    currentRecords = dataGen.generateUpdates(instantTime, currentRecords);
    writeStatuses = client.upsert(jsc.parallelize(currentRecords, 1), instantTime).collect();
    assertNoWriteErrors(writeStatuses);

    // ---- Write 3: deletes only (written to .log file) ----
    instantTime = "004";
    client.startCommitWithTime(instantTime);

    List<HoodieRecord> deletes = dataGen.generateDeletesFromExistingRecords(currentRecords);
    writeStatuses = client.upsert(jsc.parallelize(deletes, 1), instantTime).collect();
    // Verify there are no errors
    assertNoWriteErrors(writeStatuses);

    metaClient = HoodieTableMetaClient.reload(metaClient);
    deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
    assertTrue(deltaCommit.isPresent());
    assertEquals("Latest Delta commit should be 004", "004", deltaCommit.get().getTimestamp());

    commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
    assertFalse(commit.isPresent());

    allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
    tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
    assertTrue(tableView.getLatestBaseFiles().findAny().isPresent());

    List<String> baseFilePaths =
        tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
    List<GenericRecord> remainingRecords =
        HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(baseFilePaths, basePath);
    // Wrote 20 records and deleted 20 records, so remaining 20-20 = 0
    assertEquals("Must contain 0 records", 0, remainingRecords.size());
  }
}
/**
 * Writes two commits while the table type is COPY_ON_WRITE, re-initializes the table as
 * MERGE_ON_READ and rolls the second commit back. No latest base file may carry the rolled-back
 * commit time afterwards.
 */
@Test
public void testCOWToMORConvertedTableRollback() throws Exception {
  // Start out with a COW table
  HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.COPY_ON_WRITE);

  HoodieWriteConfig cfg = getConfig(true);
  try (HoodieWriteClient client = getHoodieWriteClient(cfg)) {

    // ---- Write 1: inserts only ----
    final String firstCommitTime = "001";
    client.startCommitWithTime(firstCommitTime);

    List<HoodieRecord> records = dataGen.generateInserts(firstCommitTime, 200);
    List<WriteStatus> statuses = client.upsert(jsc.parallelize(records, 1), firstCommitTime).collect();
    // verify there are no errors
    assertNoWriteErrors(statuses);

    HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
    assertTrue(commit.isPresent());
    assertEquals("commit should be 001", "001", commit.get().getTimestamp());

    // ---- Write 2: updates ----
    final String rolledBackCommitTime = "002";
    client.startCommitWithTime(rolledBackCommitTime);

    records = dataGen.generateUpdates(rolledBackCommitTime, records);
    statuses = client.upsert(jsc.parallelize(records, 1), rolledBackCommitTime).collect();
    // Verify there are no errors
    assertNoWriteErrors(statuses);

    // Switch the table type to MOR, then roll back a commit that was written while it was COW
    HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ);
    client.rollback(rolledBackCommitTime);

    metaClient = HoodieTableMetaClient.reload(metaClient);
    HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
    tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);

    boolean rolledBackFileVisible = tableView.getLatestBaseFiles()
        .anyMatch(file -> rolledBackCommitTime.equals(file.getCommitTime()));
    assertFalse(rolledBackFileVisible);
  }
}
/**
 * Exercises rollback in three situations on a merge-on-read table:
 * <ol>
 *   <li>a delta commit that was written but never committed ("failed" delta commit),</li>
 *   <li>a delta commit that was committed successfully,</li>
 *   <li>a commit produced by compaction.</li>
 * </ol>
 * After each of the first two rollbacks no data file may carry the rolled-back instant time in its
 * name and the 200 records of the first write must still be readable. After the third, no latest
 * base file may carry the compaction commit time.
 */
@Test
public void testRollbackWithDeltaAndCompactionCommit() throws Exception {

  HoodieWriteConfig cfg = getConfig(false);
  try (HoodieWriteClient client = getHoodieWriteClient(cfg)) {

    // Test delta commit rollback
    /**
     * Write 1 (only inserts)
     */
    String newCommitTime = "001";
    client.startCommitWithTime(newCommitTime);

    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);

    JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
    client.commit(newCommitTime, writeStatusJavaRDD);
    List<WriteStatus> statuses = writeStatusJavaRDD.collect();
    assertNoWriteErrors(statuses);

    HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);

    Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
    assertTrue(deltaCommit.isPresent());
    assertEquals("Delta commit should be 001", "001", deltaCommit.get().getTimestamp());

    Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
    assertFalse(commit.isPresent());

    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
    tableView =
        getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
    Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
    assertFalse(dataFilesToRead.findAny().isPresent());

    tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
    dataFilesToRead = tableView.getLatestBaseFiles();
    assertTrue("should list the parquet files we wrote in the delta commit",
        dataFilesToRead.findAny().isPresent());

    /**
     * Write 2 (inserts + updates - testing failed delta commit)
     */
    final String commitTime1 = "002";
    // WriteClient with custom config (disable small file handling)
    try (HoodieWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff())) {
      secondClient.startCommitWithTime(commitTime1);

      List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
      copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
      copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));

      List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
      List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
      assertEquals(200, recordsRead.size());

      // The write is executed but never committed, which is what makes this delta commit "failed".
      statuses = secondClient.upsert(jsc.parallelize(copyOfRecords, 1), commitTime1).collect();
      // Verify there are no errors
      assertNoWriteErrors(statuses);

      // Test failed delta commit rollback
      secondClient.rollback(commitTime1);
      allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
      // After rollback, there should be no parquet file with the failed commit time
      assertEquals(0L, Arrays.stream(allFiles)
          .filter(file -> file.getPath().getName().contains(commitTime1)).count());
      dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
      recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
      assertEquals(200, recordsRead.size());
    }

    /**
     * Write 3 (inserts + updates - testing successful delta commit)
     */
    // Same instant time as the delta commit that was rolled back above.
    final String commitTime2 = "002";
    try (HoodieWriteClient thirdClient = getHoodieWriteClient(cfg)) {
      thirdClient.startCommitWithTime(commitTime2);

      List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
      copyOfRecords = dataGen.generateUpdates(commitTime2, copyOfRecords);
      copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200));

      List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
      List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
      assertEquals(200, recordsRead.size());

      writeRecords = jsc.parallelize(copyOfRecords, 1);
      writeStatusJavaRDD = thirdClient.upsert(writeRecords, commitTime2);
      thirdClient.commit(commitTime2, writeStatusJavaRDD);
      statuses = writeStatusJavaRDD.collect();
      // Verify there are no errors
      assertNoWriteErrors(statuses);

      // Test successful delta commit rollback
      thirdClient.rollback(commitTime2);
      allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
      // After rollback, there should be no parquet file with the rolled-back commit time
      assertEquals(0L, Arrays.stream(allFiles)
          .filter(file -> file.getPath().getName().contains(commitTime2)).count());

      metaClient = HoodieTableMetaClient.reload(metaClient);
      hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
      dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
      recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
      // check that the number of records read is still correct after rollback operation
      assertEquals(200, recordsRead.size());

      // Test compaction commit rollback
      /**
       * Write 4 (updates)
       */
      newCommitTime = "003";
      thirdClient.startCommitWithTime(newCommitTime);

      writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime);
      thirdClient.commit(newCommitTime, writeStatusJavaRDD);
      statuses = writeStatusJavaRDD.collect();
      // Verify there are no errors
      assertNoWriteErrors(statuses);

      metaClient = HoodieTableMetaClient.reload(metaClient);

      String compactionInstantTime = thirdClient.scheduleCompaction(Option.empty()).get().toString();
      JavaRDD<WriteStatus> ws = thirdClient.compact(compactionInstantTime);
      thirdClient.commitCompaction(compactionInstantTime, ws, Option.empty());

      allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
      metaClient = HoodieTableMetaClient.reload(metaClient);
      tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);

      final String compactedCommitTime =
          metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();

      assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));

      thirdClient.rollback(compactedCommitTime);

      allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
      metaClient = HoodieTableMetaClient.reload(metaClient);
      tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);

      assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
    }
  }
}
/**
 * Builds up a timeline consisting of several delta commits, one compaction that is only scheduled
 * ("004") and one compaction that is scheduled, executed and committed ("006"), then restores the
 * table to instant "000" and expects that neither base files nor file groups are left.
 */
@Test
public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {

  HoodieWriteConfig cfg = getConfig(false);
  try (final HoodieWriteClient client = getHoodieWriteClient(cfg)) {
    /**
     * Write 1 (only inserts)
     */
    String newCommitTime = "001";
    client.startCommitWithTime(newCommitTime);

    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);

    JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
    client.commit(newCommitTime, writeStatusJavaRDD);
    List<WriteStatus> statuses = writeStatusJavaRDD.collect();
    assertNoWriteErrors(statuses);

    HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);

    Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
    assertTrue(deltaCommit.isPresent());
    assertEquals("Delta commit should be 001", "001", deltaCommit.get().getTimestamp());

    Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
    assertFalse(commit.isPresent());

    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
    tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
    Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
    assertFalse(dataFilesToRead.findAny().isPresent());

    tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
    dataFilesToRead = tableView.getLatestBaseFiles();
    assertTrue("Should list the parquet files we wrote in the delta commit",
        dataFilesToRead.findAny().isPresent());

    /**
     * Write 2 (inserts + updates)
     */
    newCommitTime = "002";
    List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
    // WriteClient with custom config (disable small file handling). It is closed as soon as this
    // write is done, the same way the second client is handled in
    // testRollbackWithDeltaAndCompactionCommit.
    try (HoodieWriteClient nClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff())) {
      nClient.startCommitWithTime(newCommitTime);

      copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
      copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));

      List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
      List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
      assertEquals(200, recordsRead.size());

      statuses = nClient.upsert(jsc.parallelize(copyOfRecords, 1), newCommitTime).collect();
      // Verify there are no errors
      assertNoWriteErrors(statuses);
      // NOTE(review): writeStatusJavaRDD still refers to the result of the "001" upsert here, not to
      // the upsert made just above - confirm whether committing the stale RDD is intended.
      nClient.commit(newCommitTime, writeStatusJavaRDD);
    }
    copyOfRecords.clear();

    /**
     * Write 3 (inserts + updates)
     */
    newCommitTime = "003";
    client.startCommitWithTime(newCommitTime);

    List<HoodieRecord> newInserts = dataGen.generateInserts(newCommitTime, 100);
    records = dataGen.generateUpdates(newCommitTime, records);
    records.addAll(newInserts);
    writeRecords = jsc.parallelize(records, 1);

    writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
    client.commit(newCommitTime, writeStatusJavaRDD);
    statuses = writeStatusJavaRDD.collect();
    // Verify there are no errors
    assertNoWriteErrors(statuses);

    metaClient = HoodieTableMetaClient.reload(metaClient);

    // Schedule a compaction (it is scheduled only; this test never executes instant 004)
    String compactionInstantTime = "004";
    client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());

    /**
     * Write 4 (updates)
     */
    newCommitTime = "005";
    client.startCommitWithTime(newCommitTime);

    records = dataGen.generateUpdates(newCommitTime, records);
    writeRecords = jsc.parallelize(records, 1);

    writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
    client.commit(newCommitTime, writeStatusJavaRDD);
    statuses = writeStatusJavaRDD.collect();
    // Verify there are no errors
    assertNoWriteErrors(statuses);

    metaClient = HoodieTableMetaClient.reload(metaClient);

    // Compaction commit: schedule, execute and commit a compaction at instant 006
    compactionInstantTime = "006";
    client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
    JavaRDD<WriteStatus> ws = client.compact(compactionInstantTime);
    client.commitCompaction(compactionInstantTime, ws, Option.empty());

    allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
    metaClient = HoodieTableMetaClient.reload(metaClient);
    tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);

    final String compactedCommitTime =
        metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();

    assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));

    /**
     * Write 5 (updates)
     */
    newCommitTime = "007";
    client.startCommitWithTime(newCommitTime);
    copyOfRecords = new ArrayList<>(records);
    copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
    copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));

    statuses = client.upsert(jsc.parallelize(copyOfRecords, 1), newCommitTime).collect();
    // Verify there are no errors
    assertNoWriteErrors(statuses);
    // NOTE(review): writeStatusJavaRDD still refers to the result of the "005" upsert here, not to
    // the upsert made just above - confirm whether committing the stale RDD is intended.
    client.commit(newCommitTime, writeStatusJavaRDD);
    copyOfRecords.clear();

    // Restore to instant "000", which is earlier than every instant written by this test
    client.restoreToInstant("000");

    metaClient = HoodieTableMetaClient.reload(metaClient);
    allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
    tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
    dataFilesToRead = tableView.getLatestBaseFiles();
    assertFalse(dataFilesToRead.findAny().isPresent());
    SliceView rtView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
    List<HoodieFileGroup> fileGroups =
        ((HoodieTableFileSystemView) rtView).getAllFileGroups().collect(Collectors.toList());
    assertTrue(fileGroups.isEmpty());

    // make sure there are no log files remaining
    assertEquals(0L, ((HoodieTableFileSystemView) rtView).getAllFileGroups()
        .filter(fileGroup -> fileGroup.getAllRawFileSlices().noneMatch(f -> f.getLogFiles().count() == 0))
        .count());
  }
}
/**
 * Builds the write config for the tests that, going by this method's name, need small file handling
 * switched off. Both the compaction small-file size and the storage file size limit are set to 1024.
 *
 * @return a write config for table "test-trip-table" at {@code basePath} with auto commit disabled
 */
protected HoodieWriteConfig getHoodieWriteConfigWithSmallFileHandlingOff() {
  HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
      .compactionSmallFileSize(1024)
      .withInlineCompaction(false)
      .withMaxNumDeltaCommitsBeforeCompaction(1)
      .build();

  FileSystemViewStorageConfig viewStorageConfig = new FileSystemViewStorageConfig.Builder()
      .withEnableBackupForRemoteFileSystemView(false)
      .build();

  HoodieStorageConfig storageConfig = HoodieStorageConfig.newBuilder()
      .limitFileSize(1024)
      .build();

  return HoodieWriteConfig.newBuilder()
      .withPath(basePath)
      .withSchema(TRIP_EXAMPLE_SCHEMA)
      .withParallelism(2, 2)
      .withAutoCommit(false)
      .withAssumeDatePartitioning(true)
      .withCompactionConfig(compactionConfig)
      .withEmbeddedTimelineServerEnabled(true)
      .withFileSystemViewConfig(viewStorageConfig)
      .withStorageConfig(storageConfig)
      .forTable("test-trip-table")
      .build();
}
/**
 * Writes 20 inserts and then a batch of 20 updates plus 20 new inserts. Expects that at least one
 * base file which already existed after the first write has grown in size after the second write,
 * and that 40 records are readable in total.
 */
@Test
public void testUpsertPartitioner() throws Exception {
  HoodieWriteConfig cfg = getConfig(true);
  try (HoodieWriteClient client = getHoodieWriteClient(cfg)) {

    /**
     * Write 1 (only inserts, written as parquet file)
     */
    String newCommitTime = "001";
    client.startCommitWithTime(newCommitTime);

    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);

    List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
    assertNoWriteErrors(statuses);

    HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);

    Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
    assertTrue(deltaCommit.isPresent());
    assertEquals("Delta commit should be 001", "001", deltaCommit.get().getTimestamp());

    Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
    assertFalse(commit.isPresent());

    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
    BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient,
        metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
    Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
    // Remember the size of every base file after the first write; compared against further down.
    Map<String, Long> parquetFileIdToSize =
        dataFilesToRead.collect(Collectors.toMap(HoodieBaseFile::getFileId, HoodieBaseFile::getFileSize));

    roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
    dataFilesToRead = roView.getLatestBaseFiles();
    List<HoodieBaseFile> dataFilesList = dataFilesToRead.collect(Collectors.toList());
    assertTrue("Should list the parquet files we wrote in the delta commit",
        !dataFilesList.isEmpty());

    /**
     * Write 2 (only updates + inserts, written to .log file + correction of existing parquet file size)
     */
    newCommitTime = "002";
    client.startCommitWithTime(newCommitTime);

    List<HoodieRecord> newRecords = dataGen.generateUpdates(newCommitTime, records);
    newRecords.addAll(dataGen.generateInserts(newCommitTime, 20));

    statuses = client.upsert(jsc.parallelize(newRecords), newCommitTime).collect();
    // Verify there are no errors
    assertNoWriteErrors(statuses);

    metaClient = HoodieTableMetaClient.reload(metaClient);
    deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
    assertTrue(deltaCommit.isPresent());
    assertEquals("Latest Delta commit should be 002", "002", deltaCommit.get().getTimestamp());

    commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
    assertFalse(commit.isPresent());

    allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
    roView = getHoodieTableFileSystemView(metaClient,
        hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles);
    dataFilesToRead = roView.getLatestBaseFiles();
    List<HoodieBaseFile> newDataFilesList = dataFilesToRead.collect(Collectors.toList());
    Map<String, Long> parquetFileIdToNewSize =
        newDataFilesList.stream().collect(Collectors.toMap(HoodieBaseFile::getFileId, HoodieBaseFile::getFileSize));

    // A file id that only exists after the second write has no previous size. Skip it instead of
    // unboxing null, so the check only compares files present in both snapshots.
    assertTrue(parquetFileIdToNewSize.entrySet().stream().anyMatch(entry -> {
      Long previousSize = parquetFileIdToSize.get(entry.getKey());
      return previousSize != null && previousSize < entry.getValue();
    }));

    List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
    List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
    // Wrote 20 records in 2 batches
    assertEquals("Must contain 40 records", 40, recordsRead.size());
  }
}
@Test
public void testLogFileCountsAfterCompaction() throws Exception {
  // Scenario: insert 100 records, write updates for all of them directly into log files,
  // then compact and check that no log files remain visible in the latest file slices.
  HoodieWriteConfig config = getConfig(true);
  try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
    // First instant: the initial inserts
    final String insertInstantTime = "100";
    writeClient.startCommitWithTime(insertInstantTime);
    List<HoodieRecord> insertedRecords = dataGen.generateInserts(insertInstantTime, 100);
    writeClient.insert(jsc.parallelize(insertedRecords, 1), insertInstantTime).collect();

    // Second instant: updates for every inserted record
    HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), basePath);
    final String updateInstantTime = "101";
    writeClient.startCommitWithTime(updateInstantTime);
    List<HoodieRecord> generatedUpdates = dataGen.generateUpdates(updateInstantTime, insertedRecords);
    JavaRDD<HoodieRecord> generatedUpdatesRDD = jsc.parallelize(generatedUpdates, 1);
    HoodieReadClient readClient = new HoodieReadClient(jsc, config);
    List<HoodieRecord> taggedUpdates = readClient.tagLocation(generatedUpdatesRDD).collect();

    // Write the tagged updates to the corresponding avro log files
    HoodieTestUtils.writeRecordsToLogFiles(metaClient.getFs(), metaClient.getBasePath(),
        HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, taggedUpdates);

    // Every data file is expected to have exactly one log file at this point
    metaClient = HoodieTableMetaClient.reload(metaClient);
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
    // writeRecordsToLogFiles adds no commit files, so the file-system view state has to be reset explicitly
    ((SyncableFileSystemView) (table.getSliceView())).reset();
    for (String partitionPath : dataGen.getPartitionPaths()) {
      List<FileSlice> slicesBeforeCompaction =
          table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
      slicesBeforeCompaction.forEach(fileSlice ->
          assertEquals("There should be 1 log file written for every data file", 1, fileSlice.getLogFiles().count()));
    }

    // Mark the second delta instant as completed
    metaClient.getActiveTimeline().createNewInstant(
        new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, updateInstantTime));
    metaClient.getActiveTimeline().saveAsComplete(
        new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, updateInstantTime), Option.empty());

    // Schedule and run a compaction
    String compactionInstantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
    JavaRDD<WriteStatus> compactionStatuses = writeClient.compact(compactionInstantTime);

    // Re-read the table state and verify the compacted file slices carry no log files
    metaClient = HoodieTableMetaClient.reload(metaClient);
    table = HoodieTable.getHoodieTable(metaClient, config, jsc);
    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
    String latestInstantTime = timeline.lastInstant().get().getTimestamp();
    assertTrue("Compaction commit should be > than last insert",
        HoodieTimeline.compareTimestamps(latestInstantTime, updateInstantTime, HoodieTimeline.GREATER));
    for (String partitionPath : dataGen.getPartitionPaths()) {
      List<FileSlice> slicesAfterCompaction =
          table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
      slicesAfterCompaction.forEach(fileSlice ->
          assertEquals("After compaction there should be no log files visible on a full view", 0,
              fileSlice.getLogFiles().count()));
      // Each partition must be covered by at least one compaction write status
      List<WriteStatus> collectedStatuses = compactionStatuses.collect();
      assertTrue(collectedStatuses.stream()
          .anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
    }
  }
}
@Test
public void testSimpleInsertsGeneratedIntoLogFiles() throws Exception {
  // insert 100 records
  // Setting IndexType to be InMemory to simulate Global Index nature
  HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
  try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
    String newCommitTime = "100";
    writeClient.startCommitWithTime(newCommitTime);
    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
    JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
    JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime);
    writeClient.commit(newCommitTime, statuses);

    HoodieTable table =
        HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc);
    SliceView tableRTFileSystemView = table.getSliceView();

    // Count the file slices that carry log files; none of the slices may have a base file yet
    long numLogFiles = 0;
    for (String partitionPath : dataGen.getPartitionPaths()) {
      assertEquals(0, tableRTFileSystemView.getLatestFileSlices(partitionPath)
          .filter(fileSlice -> fileSlice.getBaseFile().isPresent()).count());
      Assert.assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath)
          .anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
      numLogFiles += tableRTFileSystemView.getLatestFileSlices(partitionPath)
          .filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count();
    }
    Assert.assertTrue(numLogFiles > 0);

    // Do a compaction
    String commitTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
    statuses = writeClient.compact(commitTime);
    // Count only the statuses whose path is a parquet file. This must be filter(), not map():
    // map() keeps one element per status regardless of the predicate, which made this check vacuous.
    assertEquals(numLogFiles,
        statuses.filter(status -> status.getStat().getPath().contains("parquet")).count());
    // Exactly one write status is expected per file slice that had log files
    Assert.assertEquals(numLogFiles, statuses.count());
    writeClient.commitCompaction(commitTime, statuses, Option.empty());
  }
}
@Test
public void testInsertsGeneratedIntoLogFilesRollback() throws Exception {
  // insert 100 records
  // Setting IndexType to be InMemory to simulate Global Index nature
  HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
  try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
    String newCommitTime = "100";
    writeClient.startCommitWithTime(newCommitTime);
    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
    JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
    JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime);
    // trigger an action
    List<WriteStatus> writeStatuses = statuses.collect();

    // Ensure that inserts are written to only log files
    Assert.assertEquals(0,
        writeStatuses.stream().filter(writeStatus -> !writeStatus.getStat().getPath().contains("log")).count());
    Assert.assertTrue(
        writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPath().contains("log")));

    // rollback a failed commit
    boolean rollback = writeClient.rollback(newCommitTime);
    Assert.assertTrue(rollback);

    newCommitTime = "101";
    writeClient.startCommitWithTime(newCommitTime);
    // insert 100 records
    records = dataGen.generateInserts(newCommitTime, 100);
    recordsRDD = jsc.parallelize(records, 1);
    statuses = writeClient.insert(recordsRDD, newCommitTime);
    writeClient.commit(newCommitTime, statuses);

    // Sleep for small interval (at least 1 second) to force a new rollback start time.
    Thread.sleep(1000);

    // We will test HUDI-204 here. We will simulate rollback happening twice by copying the commit file to local fs
    // and calling rollback twice
    final String lastCommitTime = newCommitTime;
    HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), basePath);
    HoodieInstant last = metaClient.getCommitsTimeline().getInstants()
        .filter(instant -> instant.getTimestamp().equals(lastCommitTime)).findFirst().get();
    String fileName = last.getFileName();

    // Save the .commit file to local directory.
    // Rollback will be called twice to test the case where rollback failed first time and retried.
    // We got the "BaseCommitTime cannot be null" exception before the fix
    TemporaryFolder folder = new TemporaryFolder();
    folder.create();
    try {
      File file = folder.newFile();
      metaClient.getFs().copyToLocalFile(new Path(metaClient.getMetaPath(), fileName),
          new Path(file.getAbsolutePath()));
      writeClient.rollback(newCommitTime);

      // After the rollback, no file slice may expose a base file or a log file
      metaClient = HoodieTableMetaClient.reload(metaClient);
      HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
      SliceView tableRTFileSystemView = table.getSliceView();
      long numLogFiles = 0;
      for (String partitionPath : dataGen.getPartitionPaths()) {
        Assert.assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath)
            .noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
        Assert.assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath)
            .noneMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
        numLogFiles += tableRTFileSystemView.getLatestFileSlices(partitionPath)
            .filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count();
      }
      assertEquals(0, numLogFiles);

      // Restore the saved commit file so the same instant can be rolled back a second time
      metaClient.getFs().copyFromLocalFile(new Path(file.getAbsolutePath()),
          new Path(metaClient.getMetaPath(), fileName));
      Thread.sleep(1000);
      // Rollback again to pretend the first rollback failed partially. This should not error out
      writeClient.rollback(newCommitTime);
    } finally {
      // Remove the scratch directory even when an assertion or a rollback above fails
      folder.delete();
    }
  }
}
@Test
public void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction() throws Exception {
  // insert 100 records
  // Setting IndexType to be InMemory to simulate Global Index nature
  HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
  try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
    String newCommitTime = "100";
    writeClient.startCommitWithTime(newCommitTime);
    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
    JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
    JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime);
    writeClient.commit(newCommitTime, statuses);
    // trigger an action
    statuses.collect();

    HoodieTable table =
        HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc);
    SliceView tableRTFileSystemView = table.getSliceView();

    // Count the file slices that carry log files; none of the slices may have a base file yet
    long numLogFiles = 0;
    for (String partitionPath : dataGen.getPartitionPaths()) {
      Assert.assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath)
          .noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
      Assert.assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath)
          .anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
      numLogFiles += tableRTFileSystemView.getLatestFileSlices(partitionPath)
          .filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count();
    }
    Assert.assertTrue(numLogFiles > 0);

    // Do a compaction
    newCommitTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
    statuses = writeClient.compact(newCommitTime);
    // Ensure all log files have been compacted into parquet files.
    // This must be filter(), not map(): map() keeps one element per status regardless of the
    // predicate, so the previous form never verified the paths.
    assertEquals(numLogFiles,
        statuses.filter(status -> status.getStat().getPath().contains("parquet")).count());
    Assert.assertEquals(numLogFiles, statuses.count());
    writeClient.commitCompaction(newCommitTime, statuses, Option.empty());

    // Trigger a rollback of compaction
    writeClient.rollback(newCommitTime);
    table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc);
    tableRTFileSystemView = table.getSliceView();
    ((SyncableFileSystemView) tableRTFileSystemView).reset();
    Option<HoodieInstant> lastInstant = ((SyncableFileSystemView) tableRTFileSystemView).getLastInstant();
    System.out.println("Last Instant =" + lastInstant);

    // With the compaction rolled back, slices must again have log files and no base files
    for (String partitionPath : dataGen.getPartitionPaths()) {
      Assert.assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath)
          .noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
      Assert.assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath)
          .anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
    }
  }
}
/**
 * Test to ensure rolling stats are correctly written to metadata file.
 *
 * <p>Writes a commit without rolling stats, then an insert batch, an update batch and finally rolls the update
 * batch back, checking the insert/upsert totals recorded in the latest delta commit after each step.
 */
@Test
public void testRollingStatsInMetadata() throws Exception {
  HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
  try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
    HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), basePath);
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);

    // Create a commit without rolling stats in metadata to test backwards compatibility
    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
    String commitActionType = table.getMetaClient().getCommitActionType();
    HoodieInstant instant = new HoodieInstant(State.REQUESTED, commitActionType, "000");
    activeTimeline.createNewInstant(instant);
    activeTimeline.transitionRequestedToInflight(instant, Option.empty());
    instant = new HoodieInstant(State.INFLIGHT, commitActionType, "000");
    activeTimeline.saveAsComplete(instant, Option.empty());

    // Commit 001: 200 inserts
    String commitTime = "001";
    client.startCommitWithTime(commitTime);
    List<HoodieRecord> records = dataGen.generateInserts(commitTime, 200);
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
    JavaRDD<WriteStatus> statuses = client.insert(writeRecords, commitTime);
    assertTrue("Commit should succeed", client.commit(commitTime, statuses));

    // Read from commit file
    HoodieRollingStatMetadata rollingStatMetadata = readLatestDeltaCommitRollingStats(metaClient, cfg);
    Assert.assertEquals(200, sumRollingInserts(rollingStatMetadata));

    // Commit 002: updates for all 200 records
    commitTime = "002";
    client.startCommitWithTime(commitTime);
    records = dataGen.generateUpdates(commitTime, records);
    writeRecords = jsc.parallelize(records, 1);
    statuses = client.upsert(writeRecords, commitTime);
    assertTrue("Commit should succeed", client.commit(commitTime, statuses));

    // Read from commit file
    rollingStatMetadata = readLatestDeltaCommitRollingStats(metaClient, cfg);
    Assert.assertEquals(200, sumRollingInserts(rollingStatMetadata));
    Assert.assertEquals(200, sumRollingUpserts(rollingStatMetadata));

    // Rolling back commit 002 must leave only the stats of commit 001 visible
    client.rollback(commitTime);

    // Read from commit file
    rollingStatMetadata = readLatestDeltaCommitRollingStats(metaClient, cfg);
    Assert.assertEquals(200, sumRollingInserts(rollingStatMetadata));
    Assert.assertEquals(0, sumRollingUpserts(rollingStatMetadata));
  }
}

/**
 * Loads the rolling stats stored under {@link HoodieRollingStatMetadata#ROLLING_STAT_METADATA_KEY} in the extra
 * metadata of the last instant on the delta commit timeline.
 *
 * @param metaClient meta client used to build a fresh {@link HoodieTable}
 * @param cfg write config used to build a fresh {@link HoodieTable}
 * @return the deserialized rolling stats of the latest delta commit
 */
private HoodieRollingStatMetadata readLatestDeltaCommitRollingStats(HoodieTableMetaClient metaClient,
    HoodieWriteConfig cfg) throws Exception {
  HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
  HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
      table.getActiveTimeline()
          .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
      HoodieCommitMetadata.class);
  return HoodieCommitMetadata.fromBytes(
      metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
      HoodieRollingStatMetadata.class);
}

/**
 * Sums the insert counts of every rolling stat entry across all partitions.
 */
private static long sumRollingInserts(HoodieRollingStatMetadata rollingStatMetadata) {
  long total = 0;
  for (Map<String, HoodieRollingStat> partitionStats : rollingStatMetadata.getPartitionToRollingStats().values()) {
    for (HoodieRollingStat stat : partitionStats.values()) {
      total += stat.getInserts();
    }
  }
  return total;
}

/**
 * Sums the upsert counts of every rolling stat entry across all partitions.
 */
private static long sumRollingUpserts(HoodieRollingStatMetadata rollingStatMetadata) {
  long total = 0;
  for (Map<String, HoodieRollingStat> partitionStats : rollingStatMetadata.getPartitionToRollingStats().values()) {
    for (HoodieRollingStat stat : partitionStats.values()) {
      total += stat.getUpserts();
    }
  }
  return total;
}
/**
 * Test to ensure rolling stats are correctly written to the metadata file, identifies small files and corrects them.
 *
 * <p>Steps: insert 200 records, upsert 200 updates plus 200 inserts, compact, then upsert updates plus 200 more
 * inserts. After each delta commit the rolling stats are read back and the totals and file ids are checked.
 */
@Test
public void testRollingStatsWithSmallFileHandling() throws Exception {
  HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
  try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
    Map<String, Long> fileIdToInsertsMap = new HashMap<>();
    Map<String, Long> fileIdToUpsertsMap = new HashMap<>();

    String commitTime = "000";
    client.startCommitWithTime(commitTime);
    List<HoodieRecord> records = dataGen.generateInserts(commitTime, 200);
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
    JavaRDD<WriteStatus> statuses = client.insert(writeRecords, commitTime);
    assertTrue("Commit should succeed", client.commit(commitTime, statuses));

    // Read from commit file
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
    HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
        table.getActiveTimeline()
            .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
        HoodieCommitMetadata.class);
    HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata.fromBytes(
        metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
        HoodieRollingStatMetadata.class);
    // Totals are accumulated as long because the per-file stats are stored as Long values below
    long inserts = 0;
    for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
        .entrySet()) {
      for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
        inserts += stat.getValue().getInserts();
        // Remember the file ids created by the first commit; later commits must reuse them
        fileIdToInsertsMap.put(stat.getKey(), stat.getValue().getInserts());
        fileIdToUpsertsMap.put(stat.getKey(), stat.getValue().getUpserts());
      }
    }
    Assert.assertEquals(200, inserts);

    commitTime = "001";
    client.startCommitWithTime(commitTime);
    // generate updates + inserts. inserts should be handled into small files
    records = dataGen.generateUpdates(commitTime, records);
    records.addAll(dataGen.generateInserts(commitTime, 200));
    writeRecords = jsc.parallelize(records, 1);
    statuses = client.upsert(writeRecords, commitTime);
    assertTrue("Commit should succeed", client.commit(commitTime, statuses));

    // Read from commit file
    table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
    metadata = HoodieCommitMetadata.fromBytes(
        table.getActiveTimeline()
            .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
        HoodieCommitMetadata.class);
    rollingStatMetadata = HoodieCommitMetadata.fromBytes(
        metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
        HoodieRollingStatMetadata.class);
    inserts = 0;
    long upserts = 0;
    for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
        .entrySet()) {
      for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
        // No new file id should be created, all the data should be written to small files already there
        assertTrue(fileIdToInsertsMap.containsKey(stat.getKey()));
        assertTrue(fileIdToUpsertsMap.containsKey(stat.getKey()));
        inserts += stat.getValue().getInserts();
        upserts += stat.getValue().getUpserts();
      }
    }
    Assert.assertEquals(400, inserts);
    Assert.assertEquals(200, upserts);

    // Test small file handling after compaction
    commitTime = "002";
    client.scheduleCompactionAtInstant(commitTime, Option.of(metadata.getExtraMetadata()));
    statuses = client.compact(commitTime);
    client.commitCompaction(commitTime, statuses, Option.empty());

    // Read from commit file
    table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
    metadata = HoodieCommitMetadata.fromBytes(
        table.getActiveTimeline()
            .getInstantDetails(table.getActiveTimeline().getCommitsTimeline().lastInstant().get()).get(),
        HoodieCommitMetadata.class);
    HoodieRollingStatMetadata rollingStatMetadata1 = HoodieCommitMetadata.fromBytes(
        metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
        HoodieRollingStatMetadata.class);
    // Ensure that the rolling stats from the extra metadata of delta commits is copied over to the compaction commit
    for (Map.Entry<String, Map<String, HoodieRollingStat>> entry : rollingStatMetadata.getPartitionToRollingStats()
        .entrySet()) {
      Assert.assertTrue(rollingStatMetadata1.getPartitionToRollingStats().containsKey(entry.getKey()));
      // The delta commit stats are the reference (expected); the compaction commit stats are the value under test
      Assert.assertEquals(entry.getValue().size(),
          rollingStatMetadata1.getPartitionToRollingStats().get(entry.getKey()).size());
    }

    // Write inserts + updates
    commitTime = "003";
    client.startCommitWithTime(commitTime);
    // generate updates + inserts. inserts should be handled into small files
    records = dataGen.generateUpdates(commitTime, records);
    records.addAll(dataGen.generateInserts(commitTime, 200));
    writeRecords = jsc.parallelize(records, 1);
    statuses = client.upsert(writeRecords, commitTime);
    assertTrue("Commit should succeed", client.commit(commitTime, statuses));

    // Read from commit file
    table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
    metadata = HoodieCommitMetadata.fromBytes(
        table.getActiveTimeline()
            .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
        HoodieCommitMetadata.class);
    rollingStatMetadata = HoodieCommitMetadata.fromBytes(
        metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
        HoodieRollingStatMetadata.class);
    inserts = 0;
    upserts = 0;
    for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
        .entrySet()) {
      for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
        // No new file id should be created, all the data should be written to small files already there
        assertTrue(fileIdToInsertsMap.containsKey(stat.getKey()));
        inserts += stat.getValue().getInserts();
        upserts += stat.getValue().getUpserts();
      }
    }
    Assert.assertEquals(600, inserts);
    Assert.assertEquals(600, upserts);
  }
}
/**
 * Builds a write config from the default builder of this test, with auto-commit set as requested.
 */
private HoodieWriteConfig getConfig(Boolean autoCommit) {
  HoodieWriteConfig.Builder builder = getConfigBuilder(autoCommit);
  return builder.build();
}
/**
 * Returns the write config builder used by these tests, with the BLOOM index type.
 */
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
  final HoodieIndex.IndexType defaultIndexType = IndexType.BLOOM;
  return getConfigBuilder(autoCommit, defaultIndexType);
}
/**
 * Returns the write config builder used by these tests for the given auto-commit flag and index type.
 */
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, HoodieIndex.IndexType indexType) {
  // Compaction: inline compaction off, small file size limit of 1024 * 1024 * 1024
  HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
      .compactionSmallFileSize(1024 * 1024 * 1024)
      .withInlineCompaction(false)
      .withMaxNumDeltaCommitsBeforeCompaction(1)
      .build();
  // Storage: file size limit of 1024 * 1024 * 1024
  HoodieStorageConfig storageConfig = HoodieStorageConfig.newBuilder()
      .limitFileSize(1024 * 1024 * 1024)
      .build();
  // File-system view: backup for the remote file-system view disabled
  FileSystemViewStorageConfig viewStorageConfig = new FileSystemViewStorageConfig.Builder()
      .withEnableBackupForRemoteFileSystemView(false)
      .build();
  HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
      .withIndexType(indexType)
      .build();

  return HoodieWriteConfig.newBuilder()
      .withPath(basePath)
      .withSchema(TRIP_EXAMPLE_SCHEMA)
      .withParallelism(2, 2)
      .withAutoCommit(autoCommit)
      .withAssumeDatePartitioning(true)
      .withCompactionConfig(compactionConfig)
      .withStorageConfig(storageConfig)
      .withEmbeddedTimelineServerEnabled(true)
      .forTable("test-trip-table")
      .withFileSystemViewConfig(viewStorageConfig)
      .withIndexConfig(indexConfig);
}
/**
 * Fails the test if any of the given write statuses reports errors.
 */
private void assertNoWriteErrors(List<WriteStatus> statuses) {
  // Check each status in order; the failure message names the file id of the offending status
  statuses.forEach(
      status -> assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors()));
}
}