/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities;

import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException;
import com.beust.jcommander.IValueValidator;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import scala.collection.JavaConversions;
import static org.apache.hudi.utilities.UtilHelpers.buildSparkConf;

/**
* Export the latest records of a Hudi dataset to a set of external files (e.g., plain parquet files).
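*
* <p>Example spark-submit invocation (the bundle jar name and the paths below are illustrative
* only, not prescribed by this class):
* <pre>{@code
*   spark-submit \
*     --class org.apache.hudi.utilities.HoodieSnapshotExporter \
*     hudi-utilities-bundle.jar \
*     --source-base-path /data/hudi/trips \
*     --target-output-path /tmp/hudi-snapshot/trips \
*     --output-format parquet
* }</pre>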
*/
public class HoodieSnapshotExporter {
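/**
* Strategy for arranging the exported rows before they are written. When --output-partitioner is
* not set, a default partitioner drops the Hudi metadata columns and optionally repartitions by
* --output-partition-field; a custom class named via --output-partitioner is loaded reflectively,
* so it should expose a no-argument constructor. A minimal sketch of a custom implementation,
* assuming a hypothetical "region" column in the source data:
* <pre>{@code
*   public class RegionPartitioner implements HoodieSnapshotExporter.Partitioner {
*     public DataFrameWriter<Row> partition(Dataset<Row> source) {
*       return source.repartition(new Column("region")).write().partitionBy("region");
*     }
*   }
* }</pre>
*/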
@FunctionalInterface
public interface Partitioner {
DataFrameWriter<Row> partition(Dataset<Row> source);
}
private static final Logger LOG = LogManager.getLogger(HoodieSnapshotExporter.class);
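// Validates --output-format: "hudi" copies files so the target stays a Hudi table, while the
// other formats are rewritten through a Spark datasource write.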
public static class OutputFormatValidator implements IValueValidator<String> {
public static final String HUDI = "hudi";
public static final List<String> FORMATS = CollectionUtils.createImmutableList("json", "parquet", "orc", HUDI);
@Override
public void validate(String name, String value) {
if (value == null || !FORMATS.contains(value)) {
throw new ParameterException(
String.format("Invalid output format: value:%s: supported formats:%s", value, FORMATS));
}
}
}
public static class Config implements Serializable {
@Parameter(names = {"--source-base-path"}, description = "Base path for the source Hudi dataset to be snapshotted", required = true)
public String sourceBasePath;
@Parameter(names = {"--target-output-path"}, description = "Base path for the target output files (snapshots)", required = true)
public String targetOutputPath;
@Parameter(names = {"--output-format"}, description = "Output format for the exported dataset; accept these values: json|parquet|orc|hudi", required = true,
validateValueWith = OutputFormatValidator.class)
public String outputFormat;
@Parameter(names = {"--output-partition-field"}, description = "A field to be used by Spark repartitioning")
public String outputPartitionField = null;
@Parameter(names = {"--output-partitioner"}, description = "A class to facilitate custom repartitioning")
public String outputPartitioner = null;
@Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for file listing")
public int parallelism = 0;
}
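/**
* Runs the export: verifies that the target path does not already exist, resolves the latest
* completed commit, lists the source partitions, writes them out in the requested format, and
* finally drops a _SUCCESS marker. A minimal programmatic sketch (the paths and the local Spark
* master are illustrative assumptions):
* <pre>{@code
*   JavaSparkContext jsc = new JavaSparkContext(buildSparkConf("snapshot-exporter", "local[*]"));
*   Config cfg = new Config();
*   cfg.sourceBasePath = "/data/hudi/trips";       // hypothetical source table
*   cfg.targetOutputPath = "/tmp/hudi-snapshot";   // must not exist yet
*   cfg.outputFormat = "parquet";
*   new HoodieSnapshotExporter().export(jsc, cfg);
* }</pre>
*/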
public void export(JavaSparkContext jsc, Config cfg) throws IOException {
FileSystem outputFs = FSUtils.getFs(cfg.targetOutputPath, jsc.hadoopConfiguration());
if (outputFs.exists(new Path(cfg.targetOutputPath))) {
throw new HoodieSnapshotExporterException("The target output path already exists.");
}
FileSystem sourceFs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
final String latestCommitTimestamp = getLatestCommitTimestamp(sourceFs, cfg)
.<HoodieSnapshotExporterException>orElseThrow(() ->
new HoodieSnapshotExporterException("No commits present. Nothing to snapshot."));
LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
latestCommitTimestamp));
final HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
final List<String> partitions = getPartitions(engineContext, cfg);
if (partitions.isEmpty()) {
throw new HoodieSnapshotExporterException("The source dataset has 0 partition to snapshot.");
}
LOG.info(String.format("The job needs to export %d partitions.", partitions.size()));
if (cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
exportAsHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp);
} else {
exportAsNonHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp);
}
createSuccessTag(outputFs, cfg);
}
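// Latest completed instant on the write timeline; the export never includes files written after it.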
private Option<String> getLatestCommitTimestamp(FileSystem fs, Config cfg) {
final HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(cfg.sourceBasePath).build();
Option<HoodieInstant> latestCommit = tableMetadata.getActiveTimeline().getWriteTimeline()
.filterCompletedInstants().lastInstant();
return latestCommit.isPresent() ? Option.of(latestCommit.get().getTimestamp()) : Option.empty();
}
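// Enumerate every partition path under the source table's base path.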
private List<String> getPartitions(HoodieEngineContext engineContext, Config cfg) {
return FSUtils.getAllPartitionPaths(engineContext, cfg.sourceBasePath, true, false);
}
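// Write an empty _SUCCESS marker at the target root once all data and metadata have been copied.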
private void createSuccessTag(FileSystem fs, Config cfg) throws IOException {
Path successTagPath = new Path(cfg.targetOutputPath + "/_SUCCESS");
if (!fs.exists(successTagPath)) {
LOG.info(String.format("Creating _SUCCESS under target output path: %s", cfg.targetOutputPath));
fs.createNewFile(successTagPath);
}
}
private void exportAsNonHudi(JavaSparkContext jsc, FileSystem sourceFs,
Config cfg, List<String> partitions, String latestCommitTimestamp) {
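// Default behavior: drop the Hudi metadata columns and, when --output-partition-field is set,
// repartition and partition the written files by that field.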
Partitioner defaultPartitioner = dataset -> {
Dataset<Row> hoodieDroppedDataset = dataset.drop(JavaConversions.asScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
return StringUtils.isNullOrEmpty(cfg.outputPartitionField)
? hoodieDroppedDataset.write()
: hoodieDroppedDataset.repartition(new Column(cfg.outputPartitionField)).write().partitionBy(cfg.outputPartitionField);
};
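// A custom Partitioner named via --output-partitioner is instantiated reflectively and takes
// precedence over the default.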
Partitioner partitioner = StringUtils.isNullOrEmpty(cfg.outputPartitioner)
? defaultPartitioner
: ReflectionUtils.loadClass(cfg.outputPartitioner);
HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
context.setJobStatus(this.getClass().getSimpleName(), "Exporting as non-HUDI dataset: " + cfg.targetOutputPath);
final BaseFileOnlyView fsView = getBaseFileOnlyView(sourceFs, cfg);
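// Collect the latest base files (not newer than latestCommitTimestamp) from every partition and
// read them back as one parquet dataset.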
Iterator<String> exportingFilePaths = jsc
.parallelize(partitions, partitions.size())
.flatMap(partition -> fsView
.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp)
.map(HoodieBaseFile::getPath).iterator())
.toLocalIterator();
Dataset<Row> sourceDataset = new SQLContext(jsc).read().parquet(JavaConversions.asScalaIterator(exportingFilePaths).toSeq());
partitioner.partition(sourceDataset)
.format(cfg.outputFormat)
.mode(SaveMode.ErrorIfExists)
.save(cfg.targetOutputPath);
}
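// Copies the latest base files, the per-partition metadata files, and the timeline under .hoodie
// so that the target remains readable as a Hudi table.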
private void exportAsHudi(JavaSparkContext jsc, FileSystem sourceFs,
Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
final int parallelism = cfg.parallelism == 0 ? jsc.defaultParallelism() : cfg.parallelism;
final BaseFileOnlyView fsView = getBaseFileOnlyView(sourceFs, cfg);
final HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
final SerializableConfiguration serConf = context.getHadoopConf();
context.setJobStatus(this.getClass().getSimpleName(), "Exporting as HUDI dataset");
List<Pair<String, String>> partitionAndFileList = context.flatMap(partitions, partition -> {
// Only take latest version files <= latestCommit.
List<Pair<String, String>> filePaths = fsView
.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp)
.map(f -> Pair.of(partition, f.getPath()))
.collect(Collectors.toList());
// also need to copy over partition metadata
FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
Path partitionMetaFile = HoodiePartitionMetadata.getPartitionMetafilePath(fs,
FSUtils.getPartitionPath(cfg.sourceBasePath, partition)).get();
if (fs.exists(partitionMetaFile)) {
filePaths.add(Pair.of(partition, partitionMetaFile.toString()));
}
return filePaths.stream();
}, parallelism);
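// Copy each selected file into the matching partition directory under the target base path.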
context.foreach(partitionAndFileList, partitionAndFile -> {
String partition = partitionAndFile.getLeft();
Path sourceFilePath = new Path(partitionAndFile.getRight());
Path toPartitionPath = FSUtils.getPartitionPath(cfg.targetOutputPath, partition);
FileSystem executorSourceFs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
FileSystem executorOutputFs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
if (!executorOutputFs.exists(toPartitionPath)) {
executorOutputFs.mkdirs(toPartitionPath);
}
FileUtil.copy(
executorSourceFs,
sourceFilePath,
executorOutputFs,
new Path(toPartitionPath, sourceFilePath.getName()),
false,
false,
executorOutputFs.getConf());
}, parallelism);
// Also copy the .commit files
LOG.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
FileStatus[] commitFilesToCopy =
sourceFs.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), commitFilePath -> {
if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
return true;
} else {
String instantTime = FSUtils.getCommitFromCommitFile(commitFilePath.getName());
return HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, latestCommitTimestamp
);
}
});
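// Recreate the .hoodie folder on the target if needed and copy each timeline file into it.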
context.foreach(Arrays.asList(commitFilesToCopy), commitFile -> {
Path targetFilePath =
new Path(cfg.targetOutputPath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitFile.getPath().getName());
FileSystem executorSourceFs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
FileSystem executorOutputFs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
if (!executorOutputFs.exists(targetFilePath.getParent())) {
executorOutputFs.mkdirs(targetFilePath.getParent());
}
FileUtil.copy(
executorSourceFs,
commitFile.getPath(),
executorOutputFs,
targetFilePath,
false,
false,
executorOutputFs.getConf());
}, parallelism);
}
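// File-system view over completed write instants only, so uncommitted base files are never exported.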
private BaseFileOnlyView getBaseFileOnlyView(FileSystem sourceFs, Config cfg) {
HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder()
.setConf(sourceFs.getConf())
.setBasePath(cfg.sourceBasePath)
.build();
return new HoodieTableFileSystemView(tableMetadata, tableMetadata
.getActiveTimeline().getWriteTimeline().filterCompletedInstants());
}
public static void main(String[] args) throws IOException {
final Config cfg = new Config();
new JCommander(cfg, null, args);
SparkConf sparkConf = buildSparkConf("Hoodie-snapshot-exporter", "local[*]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
LOG.info("Initializing spark job.");
try {
new HoodieSnapshotExporter().export(jsc, cfg);
} finally {
jsc.stop();
}
}
}