/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.cli.commands;

import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTimelineArchiveLog;

import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.shell.core.CommandResult;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Test class for {@link org.apache.hudi.cli.commands.CommitsCommand}.
*/
public class TestCommitsCommand extends AbstractShellIntegrationTest {

private String tableName;
private String tablePath;

@BeforeEach
public void init() throws IOException {
tableName = "test_table";
tablePath = basePath + File.separator + tableName;
HoodieCLI.conf = jsc.hadoopConfiguration();
// Create table and connect
new TableCommand().createTable(
tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
}

private LinkedHashMap<String, Integer[]> generateData() throws Exception {
// generate data and metadata
LinkedHashMap<String, Integer[]> data = new LinkedHashMap<>();
data.put("102", new Integer[] {15, 10});
data.put("101", new Integer[] {20, 10});
data.put("100", new Integer[] {15, 15});
for (Map.Entry<String, Integer[]> entry : data.entrySet()) {
String key = entry.getKey();
Integer[] value = entry.getValue();
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, key, jsc.hadoopConfiguration(),
Option.of(value[0]), Option.of(value[1]));
}
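// reload the meta client so the new commits appear on the active timeline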
metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
assertEquals(3, metaClient.reloadActiveTimeline().getCommitsTimeline().countInstants(),
"There should have 3 commits");
return data;
}

private String generateExpectData(int records, Map<String, Integer[]> data) throws IOException {
FileSystem fs = FileSystem.get(jsc.hadoopConfiguration());
List<String> partitionPaths =
FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath);
int partitions = partitionPaths.size();
// the default previous commit is non-null, so files added is always 0 and files updated always equals the partition count
int fileAdded = 0;
int fileUpdated = partitions;
int errors = 0;
// generate the expected result
List<Comparable[]> rows = new ArrayList<>();
data.forEach((key, value) -> {
for (int i = 0; i < records; i++) {
// there is more than one partition, so per-commit totals must be multiplied by the partition count
rows.add(new Comparable[]{key, partitions * HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES,
fileAdded, fileUpdated, partitions, partitions * value[0], partitions * value[1], errors});
}
});
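// the CLI prints total bytes written in human-readable form, so convert the expected value the same way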
final Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN,
entry -> NumericUtils.humanReadableByteCount(Double.valueOf(entry.toString())));
final TableHeader header = new TableHeader()
.addTableHeaderField(HoodieTableHeaderFields.HEADER_COMMIT_TIME)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_PARTITIONS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_UPDATE_RECORDS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS);
return HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false,
-1, false, rows);
}

/**
* Test case for the 'commits show' command.
*/
@Test
public void testShowCommits() throws Exception {
Map<String, Integer[]> data = generateData();
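// run the command under test; both outputs are normalized before comparison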
CommandResult cr = getShell().executeCommand("commits show");
assertTrue(cr.isSuccess());
String expected = generateExpectData(1, data);
expected = removeNonWordAndStripSpace(expected);
String got = removeNonWordAndStripSpace(cr.getResult().toString());
assertEquals(expected, got);
}

/**
* Test case for the 'commits showarchived' command.
*/
@Test
public void testShowArchivedCommits() throws Exception {
// Generate archive: configure archival to keep at least 2 and at most 3 instants on the active timeline
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.forTable("test-trip-table").build();
// generate data and metadata
Map<String, Integer[]> data = new LinkedHashMap<>();
data.put("104", new Integer[] {20, 10});
data.put("103", new Integer[] {15, 15});
data.put("102", new Integer[] {25, 45});
data.put("101", new Integer[] {35, 15});
for (Map.Entry<String, Integer[]> entry : data.entrySet()) {
String key = entry.getKey();
Integer[] value = entry.getValue();
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, key, jsc.hadoopConfiguration(),
Option.of(value[0]), Option.of(value[1]));
}
// trigger archival: four commits exceed the configured maximum of 3, so the timeline is trimmed to the 2 newest instants
metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, jsc.hadoopConfiguration());
archiveLog.archiveIfRequired(jsc);
CommandResult cr = getShell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "100", "104"));
assertTrue(cr.isSuccess());
// instants 101 and 102 were archived; generate the expected data
assertEquals(2, metaClient.reloadActiveTimeline().getCommitsTimeline().countInstants(),
"There should 2 instants not be archived!");
// only the archived instants (101 and 102) appear in the output, so remove 103 and 104 from the expected data
data.remove("103");
data.remove("104");
String expected = generateExpectData(3, data);
expected = removeNonWordAndStripSpace(expected);
String got = removeNonWordAndStripSpace(cr.getResult().toString());
assertEquals(expected, got);
}

/**
* Test case for the 'commit showpartitions' command.
*/
@Test
public void testShowCommitPartitions() throws Exception {
Map<String, Integer[]> data = generateData();
String commitInstant = "101";
CommandResult cr = getShell().executeCommand(String.format("commit showpartitions --commit %s", commitInstant));
assertTrue(cr.isSuccess());
Integer[] value = data.get(commitInstant);
List<Comparable[]> rows = new ArrayList<>();
// the previous commit is non-null, so each partition shows 0 files added and 1 file updated
Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).forEach(partition ->
rows.add(new Comparable[] {partition, 0, 1, 0, value[1], HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES, 0})
);
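// mirror the CLI's human-readable rendering of total bytes written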
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN,
entry -> NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString()))));
TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_INSERTED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS);
String expected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, rows);
expected = removeNonWordAndStripSpace(expected);
String got = removeNonWordAndStripSpace(cr.getResult().toString());
assertEquals(expected, got);
}

/**
* Test case for the 'commit showfiles' command.
*/
@Test
public void testShowCommitFiles() throws Exception {
Map<String, Integer[]> data = generateData();
String commitInstant = "101";
CommandResult cr = getShell().executeCommand(String.format("commit showfiles --commit %s", commitInstant));
assertTrue(cr.isSuccess());
Integer[] value = data.get(commitInstant);
List<Comparable[]> rows = new ArrayList<>();
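// expect one row per default partition for this commit's file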
Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).forEach(partition ->
rows.add(new Comparable[] {partition, HoodieTestCommitMetadataGenerator.DEFAULT_FILEID,
HoodieTestCommitMetadataGenerator.DEFAULT_PRE_COMMIT,
value[1], value[0], HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES,
// 0 errors by default, and the test file is empty so its size is 0
0, 0}));
TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_SIZE);
String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
expected = removeNonWordAndStripSpace(expected);
String got = removeNonWordAndStripSpace(cr.getResult().toString());
assertEquals(expected, got);
}

/**
* Test case for the 'commits compare' command.
*/
@Test
public void testCompareCommits() throws Exception {
Map<String, Integer[]> data = generateData();
String tableName2 = "test_table2";
String tablePath2 = basePath + File.separator + tableName2;
HoodieTestUtils.init(jsc.hadoopConfiguration(), tablePath2, getTableType());
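// leave out instant 102 so that test_table2 lags test_table by one commit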
data.remove("102");
for (Map.Entry<String, Integer[]> entry : data.entrySet()) {
String key = entry.getKey();
Integer[] value = entry.getValue();
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath2, key, jsc.hadoopConfiguration(),
Option.of(value[0]), Option.of(value[1]));
}
CommandResult cr = getShell().executeCommand(String.format("commits compare --path %s", tablePath2));
assertTrue(cr.isSuccess());
// the latest instant of test_table2 is 101
List<String> commitsToCatchup = metaClient.getActiveTimeline().findInstantsAfter("101", Integer.MAX_VALUE)
.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
String expected = String.format("Source %s is ahead by %d commits. Commits to catch up - %s",
tableName, commitsToCatchup.size(), commitsToCatchup);
assertEquals(expected, cr.getResult().toString());
}

/**
* Test case for the 'commits sync' command.
*/
@Test
public void testSyncCommits() throws Exception {
Map<String, Integer[]> data = generateData();
String tableName2 = "test_table2";
String tablePath2 = basePath + File.separator + tableName2;
HoodieTestUtils.init(jsc.hadoopConfiguration(), tablePath2, getTableType(), tableName2);
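// replay all but the latest commit (102) into the second table before syncing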
data.remove("102");
for (Map.Entry<String, Integer[]> entry : data.entrySet()) {
String key = entry.getKey();
Integer[] value = entry.getValue();
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath2, key, jsc.hadoopConfiguration(),
Option.of(value[0]), Option.of(value[1]));
}
CommandResult cr = getShell().executeCommand(String.format("commits sync --path %s", tablePath2));
assertTrue(cr.isSuccess());
String expected = String.format("Load sync state between %s and %s", tableName, tableName2);
assertEquals(expected, cr.getResult().toString());
}
}