/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.cli.integ;

import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.commands.TableCommand;
import org.apache.hudi.cli.testutils.HoodieCLIIntegrationTestBase;
import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.testutils.HoodieClientTestBase;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.shell.Shell;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Integration test class for {@link org.apache.hudi.cli.commands.ClusteringCommand}.
 * <p/>
 * Commands that use SparkLauncher need to load the jars under lib, which are only
 * generated during mvn package, so this is an integration test rather than a unit test.
 */
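// run the Spring shell non-interactively so commands can be evaluated programmatically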
@SpringBootTest(properties = {"spring.shell.interactive.enabled=false", "spring.shell.command.script.enabled=false"})
public class ITTestClusteringCommand extends HoodieCLIIntegrationTestBase {

@Autowired
private Shell shell;

@BeforeEach
public void init() throws IOException {
tableName = "test_table_" + ITTestClusteringCommand.class.getName();
basePath = Paths.get(basePath, tableName).toString();
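// point the CLI at the Hadoop configuration of the test Spark context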
HoodieCLI.conf = jsc.hadoopConfiguration();
// Create table and connect
new TableCommand().createTable(
basePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
initMetaClient();
}

/**
* Test case for command 'clustering schedule'.
*/
@Test
public void testScheduleClustering() throws IOException {
// generate commits
generateCommits();
Object result = scheduleClustering();
assertAll("Command run failed",
() -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
() -> assertTrue(
result.toString().startsWith("Succeeded to schedule clustering for")));
// there should be exactly 1 requested clustering (replacecommit) instant
HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
assertEquals(1, timeline.filterPendingReplaceTimeline().countInstants());
}

/**
* Test case for command 'clustering run'.
*/
@Test
public void testClustering() throws IOException {
// generate commits
generateCommits();
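// schedule a clustering first so there is a pending plan to execute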
Object result1 = scheduleClustering();
assertTrue(ShellEvaluationResultUtil.isSuccess(result1));
// get the pending clustering instant from the active timeline
HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
Option<String> instantOpt =
timeline.filterPendingReplaceTimeline().firstInstant().map(HoodieInstant::getTimestamp);
assertTrue(instantOpt.isPresent(), "Must have a pending clustering instant.");
final String instant = instantOpt.get();
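// execute the pending clustering plan at that instant through the CLI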
Object result2 = shell.evaluate(() ->
String.format("clustering run --parallelism %s --clusteringInstant %s --sparkMaster %s",
2, instant, "local"));
assertAll("Command run failed",
() -> assertTrue(ShellEvaluationResultUtil.isSuccess(result2)),
() -> assertTrue(
result2.toString().startsWith("Succeeded to run clustering for ")));
// assert clustering completed
assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
.filterCompletedInstants().getInstantsAsStream()
.map(HoodieInstant::getTimestamp).collect(Collectors.toList()).contains(instant),
"Pending clustering must be completed");
assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
.getCompletedReplaceTimeline().getInstantsAsStream()
.map(HoodieInstant::getTimestamp).collect(Collectors.toList()).contains(instant),
"Completed replace timeline must contain the clustering instant");
}

/**
* Test case for command 'clustering scheduleAndExecute'.
*/
@Test
public void testClusteringScheduleAndExecute() throws IOException {
// generate commits
generateCommits();
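// schedule and execute clustering in a single command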
Object result = shell.evaluate(() ->
String.format("clustering scheduleAndExecute --parallelism %s --sparkMaster %s", 2, "local"));
assertAll("Command run failed",
() -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
() -> assertTrue(
result.toString().startsWith("Succeeded to run clustering for scheduleAndExecute")));
// assert clustering completed
assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
.getCompletedReplaceTimeline().getInstantsAsStream()
.map(HoodieInstant::getTimestamp).count() > 0,
"There must be at least one completed clustering");
}

private Object scheduleClustering() {
// schedule clustering, creating a requested replacecommit on the timeline
return shell.evaluate(() ->
String.format("clustering schedule --hoodieConfigs hoodie.clustering.inline.max.commits=1 --sparkMaster %s", "local"));
}
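
/**
 * Write two batches of inserts so there are commits for clustering to act on.
 */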
private void generateCommits() throws IOException {
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
// create a write client to write some records
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withDeleteParallelism(2).forTable(tableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
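// try-with-resources ensures the Spark write client is closed after committing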
try (SparkRDDWriteClient<HoodieAvroPayload> client = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg)) {
insert(jsc, client, dataGen, "001");
insert(jsc, client, dataGen, "002");
}
}
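
/**
 * Insert a batch of 10 generated records with the given commit time and return them.
 */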
private List<HoodieRecord> insert(JavaSparkContext jsc, SparkRDDWriteClient<HoodieAvroPayload> client,
HoodieTestDataGenerator dataGen, String newCommitTime) throws IOException {
// inserts
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
operateFunc(SparkRDDWriteClient::insert, client, writeRecords, newCommitTime);
return records;
}
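
/**
 * Applies the given write function; the {@link HoodieClientTestBase.Function3} indirection
 * lets different write operations (e.g. insert, upsert) be plugged in.
 */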
private JavaRDD<WriteStatus> operateFunc(
HoodieClientTestBase.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
SparkRDDWriteClient<HoodieAvroPayload> client, JavaRDD<HoodieRecord> writeRecords, String commitTime)
throws IOException {
return writeFn.apply(client, writeRecords, commitTime);
}
}