/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.table.functional;

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;

import org.apache.hadoop.fs.FileStatus;

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
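
/**
 * Functional tests for inline clustering on Spark merge-on-read tables.
 */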
@Tag("functional")
class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTestHarness {
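// Each argument triple is (doUpdates, populateMetaFields, preserveCommitMetadata); all eight combinations are exercised.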
private static Stream<Arguments> testClustering() {
return Stream.of(
Arguments.of(true, true, true),
Arguments.of(true, true, false),
Arguments.of(true, false, true),
Arguments.of(true, false, false),
Arguments.of(false, true, true),
Arguments.of(false, true, false),
Arguments.of(false, false, true),
Arguments.of(false, false, false)
);
}

@ParameterizedTest
@MethodSource
void testClustering(boolean doUpdates, boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
// Set a low compaction small-file size so inserts create new file groups instead of filling existing files.
HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder()
.forTable("test-trip-table")
.withPath(basePath())
.withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withDeleteParallelism(2)
.withAutoCommit(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.compactionSmallFileSize(10L)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder()
.hfileMaxFileSize(1024 * 1024 * 1024)
.parquetMaxFileSize(1024 * 1024 * 1024).build())
.withEmbeddedTimelineServerEnabled(true)
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
.withEnableBackupForRemoteFileSystemView(false).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0)
.withInlineClustering(true)
.withInlineClusteringNumCommits(1)
.withPreserveHoodieCommitMetadata(preserveCommitMetadata).build())
.withRollbackUsingMarkers(false);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg = cfgBuilder.build();
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, cfg.getProps());
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
/*
* Write 1 (only inserts)
*/
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 400);
Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
/*
* Write 2 (more inserts to create new files)
*/
// The small file size limit is already set low, forcing these inserts into new files.
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
dataFiles = insertRecordsToMORTable(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
if (doUpdates) {
/*
* Write 3 (updates)
*/
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 100);
updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime, false);
}
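// Sync the file system view so the base file listing below reflects every commit so far.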
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
hoodieTable.getHoodieView().sync();
FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
// expect 2 base files for each partition
assertEquals(dataGen.getPartitionPaths().length * 2, allFiles.length);
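// Schedule a clustering plan covering the file groups written above.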
String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
// verify all files are included in clustering plan.
assertEquals(allFiles.length, hoodieTable.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getLeft).count());
// Do the clustering and validate
doClusteringAndValidate(client, clusteringCommitTime, metaClient, cfg, dataGen);
}
}
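
// Clusters file slices that contain only log files and no base files.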
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testClusteringWithNoBaseFiles(boolean doUpdates) throws Exception {
// Set a low compaction small-file size so inserts create new file groups instead of filling existing files.
HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder()
.forTable("test-trip-table")
.withPath(basePath())
.withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withDeleteParallelism(2)
.withAutoCommit(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.compactionSmallFileSize(10L)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder()
.hfileMaxFileSize(1024 * 1024 * 1024)
.parquetMaxFileSize(1024 * 1024 * 1024).build())
.withEmbeddedTimelineServerEnabled(true)
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
.withEnableBackupForRemoteFileSystemView(false).build())
// Set the index type to INMEMORY so that log files can be indexed, making it safe to send
// inserts straight to the log and produce file slices with only log files and no base files.
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0)
.withInlineClustering(true)
.withInlineClusteringNumCommits(1).build())
.withRollbackUsingMarkers(false);
HoodieWriteConfig cfg = cfgBuilder.build();
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, cfg.getProps());
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
// Write two batches of inserts.
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 400);
Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
assertFalse(dataFiles.findAny().isPresent(), "should not have any base files");
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
dataFiles = insertRecordsToMORTable(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
assertFalse(dataFiles.findAny().isPresent(), "should not have any base files");
// run updates
if (doUpdates) {
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 100);
updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime, false);
}
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
hoodieTable.getHoodieView().sync();
FileStatus[] allBaseFiles = listAllBaseFilesInPath(hoodieTable);
// expect no base files in any partition
assertEquals(0, allBaseFiles.length);
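// Schedule a clustering plan; the log-only file slices should be picked up.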
String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
// verify log files are included in clustering plan for each partition.
assertEquals(dataGen.getPartitionPaths().length, hoodieTable.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getLeft).count());
// do the clustering and validate
doClusteringAndValidate(client, clusteringCommitTime, metaClient, cfg, dataGen);
}
}
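
/**
 * Executes a previously scheduled clustering plan and validates the resulting table state.
 */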
private void doClusteringAndValidate(SparkRDDWriteClient client,
String clusteringCommitTime,
HoodieTableMetaClient metaClient,
HoodieWriteConfig cfg,
HoodieTestDataGenerator dataGen) {
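// Execute the scheduled clustering plan; the second argument asks the client to complete (commit) it.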
client.cluster(clusteringCommitTime, true);
metaClient = HoodieTableMetaClient.reload(metaClient);
final HoodieTable clusteredTable = HoodieSparkTable.create(cfg, context(), metaClient);
clusteredTable.getHoodieView().sync();
Stream<HoodieBaseFile> dataFilesToRead = Arrays.stream(dataGen.getPartitionPaths())
.flatMap(p -> clusteredTable.getBaseFileOnlyView().getLatestBaseFiles(p));
assertEquals(dataGen.getPartitionPaths().length, dataFilesToRead.count());
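// The clustering instant should now be the latest completed action, recorded as a replace commit.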
HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants();
assertEquals(1, timeline.findInstantsAfter("003", Integer.MAX_VALUE).countInstants(),
"Expecting a single commit.");
assertEquals(clusteringCommitTime, timeline.lastInstant().get().getTimestamp());
assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, timeline.lastInstant().get().getAction());
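// When meta fields are populated, records can be counted incrementally since instant "000"; otherwise fall back to a full count.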
if (cfg.populateMetaFields()) {
assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.of("000")),
"Must contain 400 records");
} else {
assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.empty()));
}
}
}