/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.cli.commands;

import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
import org.apache.hudi.common.model.BootstrapFileMapping;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.UtilHelpers;

import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils;

import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;

import scala.collection.JavaConverters;

/**
 * CLI commands to perform a bootstrap action and to display the bootstrap index.
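 * <p>
 * Example invocations from the Hudi CLI shell (table names, paths, field names and file ids below
 * are illustrative):
 * <pre>
 *   bootstrap run --srcPath /data/source_parquet --targetPath /tmp/hoodie/trips
 *       --tableName trips --tableType COPY_ON_WRITE --rowKeyField trip_id
 *   bootstrap index showmapping --partitionPath 2020/01/01 --fileIds fileId1,fileId2
 *   bootstrap index showpartitions
 * </pre>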
*/
@Component
public class BootstrapCommand implements CommandMarker {

@CliCommand(value = "bootstrap run", help = "Run a bootstrap action for current Hudi table")
public String bootstrap(
@CliOption(key = {"srcPath"}, mandatory = true, help = "Bootstrap source data path of the table") final String srcPath,
@CliOption(key = {"targetPath"}, mandatory = true,
help = "Base path for the target hoodie table") final String targetPath,
@CliOption(key = {"tableName"}, mandatory = true, help = "Hoodie table name") final String tableName,
@CliOption(key = {"tableType"}, mandatory = true, help = "Hoodie table type") final String tableType,
@CliOption(key = {"rowKeyField"}, mandatory = true, help = "Record key columns for bootstrap data") final String rowKeyField,
@CliOption(key = {"partitionPathField"}, unspecifiedDefaultValue = "",
help = "Partition fields for bootstrap source data") final String partitionPathField,
@CliOption(key = {"bootstrapIndexClass"}, unspecifiedDefaultValue = "org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex",
help = "Bootstrap Index Class") final String bootstrapIndexClass,
@CliOption(key = {"selectorClass"}, unspecifiedDefaultValue = "org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector",
help = "Selector class for bootstrap") final String selectorClass,
@CliOption(key = {"keyGeneratorClass"}, unspecifiedDefaultValue = "org.apache.hudi.keygen.SimpleKeyGenerator",
help = "Key generator class for bootstrap") final String keyGeneratorClass,
@CliOption(key = {"fullBootstrapInputProvider"}, unspecifiedDefaultValue = "org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider",
help = "Class for Full bootstrap input provider") final String fullBootstrapInputProvider,
@CliOption(key = {"schemaProviderClass"}, unspecifiedDefaultValue = "",
help = "SchemaProvider to attach schemas to bootstrap source data") final String schemaProviderClass,
@CliOption(key = {"payloadClass"}, unspecifiedDefaultValue = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
help = "Payload Class") final String payloadClass,
@CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "1500", help = "Bootstrap writer parallelism") final int parallelism,
@CliOption(key = {"sparkMaster"}, unspecifiedDefaultValue = "", help = "Spark Master") String master,
@CliOption(key = {"sparkMemory"}, unspecifiedDefaultValue = "4G", help = "Spark executor memory") final String sparkMemory,
@CliOption(key = {"enableHiveSync"}, unspecifiedDefaultValue = "false", help = "Enable Hive sync") final Boolean enableHiveSync,
@CliOption(key = {"propsFilePath"}, help = "path to properties file on localfs or dfs with configurations for hoodie client for importing",
unspecifiedDefaultValue = "") final String propsFilePath,
@CliOption(key = {"hoodieConfigs"}, help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
unspecifiedDefaultValue = "") final String[] configs)
throws IOException, InterruptedException, URISyntaxException {
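    // Launch the bootstrap as a separate Spark application and surface its console output in the CLI.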
String sparkPropertiesPath =
Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
String cmd = SparkCommand.BOOTSTRAP.toString();
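    // The positional argument order below must stay in sync with the parsing of the BOOTSTRAP
    // command's arguments in SparkMain.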
sparkLauncher.addAppArgs(cmd, master, sparkMemory, tableName, tableType, targetPath, srcPath, rowKeyField,
partitionPathField, String.valueOf(parallelism), schemaProviderClass, bootstrapIndexClass, selectorClass,
keyGeneratorClass, fullBootstrapInputProvider, payloadClass, String.valueOf(enableHiveSync), propsFilePath);
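    // Validate any extra "key=value" configs and pass them through to the launched application.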
UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to bootstrap source data to Hudi dataset";
}
return "Bootstrapped source data as Hudi dataset";
  }

@CliCommand(value = "bootstrap index showmapping", help = "Show bootstrap index mapping")
public String showBootstrapIndexMapping(
@CliOption(key = {"partitionPath"}, unspecifiedDefaultValue = "", help = "A valid partition path") String partitionPath,
@CliOption(key = {"fileIds"}, unspecifiedDefaultValue = "", help = "Valid fileIds split by comma") String fileIds,
@CliOption(key = {"limit"}, unspecifiedDefaultValue = "-1", help = "Limit rows to be displayed") Integer limit,
@CliOption(key = {"sortBy"}, unspecifiedDefaultValue = "", help = "Sorting Field") final String sortByField,
@CliOption(key = {"desc"}, unspecifiedDefaultValue = "false", help = "Ordering") final boolean descending,
@CliOption(key = {"headeronly"}, unspecifiedDefaultValue = "false", help = "Print Header Only")
final boolean headerOnly) {
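    // A fileId only identifies a file group within a partition, so fileIds require a partition path.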
if (partitionPath.isEmpty() && !fileIds.isEmpty()) {
throw new IllegalStateException("PartitionPath is mandatory when passing fileIds.");
}
BootstrapIndex.IndexReader indexReader = createBootstrapIndexReader();
List<String> indexedPartitions = indexReader.getIndexedPartitionPaths();
if (!partitionPath.isEmpty() && !indexedPartitions.contains(partitionPath)) {
      return partitionPath + " is not a valid indexed partition";
}
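    // Gather mappings for the requested file groups, for a single partition, or for all indexed partitions.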
List<BootstrapFileMapping> mappingList = new ArrayList<>();
if (!fileIds.isEmpty()) {
List<HoodieFileGroupId> fileGroupIds = Arrays.stream(fileIds.split(","))
.map(fileId -> new HoodieFileGroupId(partitionPath, fileId)).collect(Collectors.toList());
mappingList.addAll(indexReader.getSourceFileMappingForFileIds(fileGroupIds).values());
} else if (!partitionPath.isEmpty()) {
mappingList.addAll(indexReader.getSourceFileMappingForPartition(partitionPath));
} else {
for (String part : indexedPartitions) {
mappingList.addAll(indexReader.getSourceFileMappingForPartition(part));
}
}
final List<Comparable[]> rows = convertBootstrapSourceFileMapping(mappingList);
final TableHeader header = new TableHeader()
.addTableHeaderField("Hudi Partition")
.addTableHeaderField("FileId")
.addTableHeaderField("Source File Base Path")
.addTableHeaderField("Source File Partition")
.addTableHeaderField("Source File Path");
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending,
limit, headerOnly, rows);
  }

@CliCommand(value = "bootstrap index showpartitions", help = "Show bootstrap indexed partitions")
public String showBootstrapIndexPartitions() {
BootstrapIndex.IndexReader indexReader = createBootstrapIndexReader();
List<String> indexedPartitions = indexReader.getIndexedPartitionPaths();
String[] header = new String[] {"Indexed partitions"};
String[][] rows = new String[indexedPartitions.size()][1];
for (int i = 0; i < indexedPartitions.size(); i++) {
rows[i][0] = indexedPartitions.get(i);
}
return HoodiePrintHelper.print(header, rows);
}
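
  /**
   * Opens a reader over the bootstrap index of the current table, failing fast if the table was
   * never bootstrapped.
   */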
private BootstrapIndex.IndexReader createBootstrapIndexReader() {
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
if (!index.useIndex()) {
      throw new HoodieException("This is not a bootstrapped Hudi table; no bootstrap index information is available.");
}
return index.createReader();
}
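
  /**
   * Flattens each {@link BootstrapFileMapping} into a printable row: Hudi partition, file id, and
   * the bootstrap source file's base path, partition and full path.
   */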
private List<Comparable[]> convertBootstrapSourceFileMapping(List<BootstrapFileMapping> mappingList) {
final List<Comparable[]> rows = new ArrayList<>();
for (BootstrapFileMapping mapping : mappingList) {
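      // Note: getBoostrapFileStatus (missing "t") mirrors the accessor name declared on BootstrapFileMapping.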
rows.add(new Comparable[] {mapping.getPartitionPath(), mapping.getFileId(),
mapping.getBootstrapBasePath(), mapping.getBootstrapPartitionPath(), mapping.getBoostrapFileStatus().getPath().getUri()});
}
return rows;
}
}