blob: 25a694c5f178645f3368000bd1ff325d48dc598d [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hudi.integ.testsuite;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
import org.apache.hudi.integ.testsuite.dag.DagUtils;
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
import org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator;
import org.apache.hudi.integ.testsuite.dag.scheduler.DagScheduler;
import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* This is the entry point for running a Hudi Test Suite. Although this class has similarities with
* {@link HoodieDeltaStreamer} this class does not extend it since do not want to create a dependency on the changes in
* DeltaStreamer.
public class HoodieTestSuiteJob {
private static volatile Logger log = LoggerFactory.getLogger(HoodieTestSuiteJob.class);
private final HoodieTestSuiteConfig cfg;
* Bag of properties with source, hoodie client, key generator etc.
TypedProperties props;
* Schema provider that supplies the command for writing out the generated payloads.
private transient SchemaProvider schemaProvider;
* Filesystem used.
private transient FileSystem fs;
* Spark context.
private transient JavaSparkContext jsc;
* Spark Session.
private transient SparkSession sparkSession;
* Hive Config.
private transient HiveConf hiveConf;
private BuiltinKeyGenerator keyGenerator;
public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throws IOException {
this.cfg = cfg;
this.jsc = jsc;
this.sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
this.fs = FSUtils.getFs(cfg.inputBasePath, jsc.hadoopConfiguration());
this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();"Creating workload generator with configs : {}", props.toString());
this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jsc);
this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration());
this.keyGenerator = (BuiltinKeyGenerator) DataSourceUtils.createKeyGenerator(props);
if (!fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), cfg.targetBasePath,
HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived");
private static HiveConf getDefaultHiveConf(Configuration cfg) {
HiveConf hiveConf = new HiveConf();
return hiveConf;
public static void main(String[] args) throws Exception {
final HoodieTestSuiteConfig cfg = new HoodieTestSuiteConfig();
JCommander cmd = new JCommander(cfg, args);
if ( || args.length == 0) {
JavaSparkContext jssc = UtilHelpers.buildSparkContext("workload-generator-" + cfg.outputTypeName
+ "-" + cfg.inputFormatName, cfg.sparkMaster);
new HoodieTestSuiteJob(cfg, jssc).runTestSuite();
public void runTestSuite() {
try {
WorkflowDag workflowDag = this.cfg.workloadYamlPath == null ? ((WorkflowDagGenerator) ReflectionUtils
: DagUtils.convertYamlPathToDag(this.fs, this.cfg.workloadYamlPath);"Workflow Dag => " + DagUtils.convertDagToYaml(workflowDag));
long startTime = System.currentTimeMillis();
String schemaStr = schemaProvider.getSourceSchema().toString();
final HoodieTestSuiteWriter writer = new HoodieTestSuiteWriter(jsc, props, cfg, schemaStr);
final DeltaGenerator deltaGenerator = new DeltaGenerator(
new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName), DeltaInputType.valueOf(cfg.inputFormatName),
new SerializableConfiguration(jsc.hadoopConfiguration()), cfg.inputBasePath, cfg.targetBasePath,
schemaStr, cfg.limitFileSize), jsc, sparkSession, schemaStr, keyGenerator);
DagScheduler dagScheduler = new DagScheduler(workflowDag, writer, deltaGenerator);
dagScheduler.schedule();"Finished scheduling all tasks, Time taken {}", System.currentTimeMillis() - startTime);
} catch (Exception e) {
log.error("Failed to run Test Suite ", e);
throw new HoodieException("Failed to run Test Suite ", e);
} finally {
* The Hudi test suite uses {@link HoodieDeltaStreamer} to run some operations hence extend delta streamer config.
public static class HoodieTestSuiteConfig extends HoodieDeltaStreamer.Config {
@Parameter(names = {"--input-base-path"}, description = "base path for input data"
+ "(Will be created if did not exist first time around. If exists, more data will be added to that path)",
required = true)
public String inputBasePath;
@Parameter(names = {
"--workload-generator-classname"}, description = "WorkflowDag of operations to generate the workload",
required = true)
public String workloadDagGenerator = WorkflowDagGenerator.class.getName();
@Parameter(names = {
"--workload-yaml-path"}, description = "Workflow Dag yaml path to generate the workload")
public String workloadYamlPath;
@Parameter(names = {"--delta-output-type"}, description = "Subclass of "
+ "org.apache.hudi.testsuite.workload.DeltaOutputMode to readAvro data.")
public String outputTypeName =;
@Parameter(names = {"--delta-input-format"}, description = "Subclass of "
+ "org.apache.hudi.testsuite.workload.DeltaOutputMode to read avro data.")
public String inputFormatName =;
@Parameter(names = {"--input-file-size"}, description = "The min/max size of the input files to generate",
required = true)
public Long limitFileSize = 1024 * 1024 * 120L;
@Parameter(names = {"--use-deltastreamer"}, description = "Choose whether to use HoodieDeltaStreamer to "
+ "perform"
+ " ingestion. If set to false, HoodieWriteClient will be used")
public Boolean useDeltaStreamer = false;