/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.integ.testsuite;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
import org.apache.hudi.integ.testsuite.dag.DagUtils;
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
import org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator;
import org.apache.hudi.integ.testsuite.dag.scheduler.DagScheduler;
import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is the entry point for running a Hudi Test Suite. Although this class has similarities with
 * {@link HoodieDeltaStreamer}, it does not extend it, since we do not want to create a dependency on changes in
 * DeltaStreamer. An example invocation is sketched in the Javadoc of {@link #main(String[])}.
*/
public class HoodieTestSuiteJob {
private static volatile Logger log = LoggerFactory.getLogger(HoodieTestSuiteJob.class);
private final HoodieTestSuiteConfig cfg;
/**
* Bag of properties with source, hoodie client, key generator etc.
*/
TypedProperties props;
/**
 * Schema provider that supplies the schema for writing out the generated payloads.
*/
private transient SchemaProvider schemaProvider;
/**
* Filesystem used.
*/
private transient FileSystem fs;
/**
* Spark context.
*/
private transient JavaSparkContext jsc;
/**
* Spark Session.
*/
private transient SparkSession sparkSession;
/**
* Hive Config.
*/
private transient HiveConf hiveConf;
private BuiltinKeyGenerator keyGenerator;
public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throws IOException {
this.cfg = cfg;
this.jsc = jsc;
this.sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
this.fs = FSUtils.getFs(cfg.inputBasePath, jsc.hadoopConfiguration());
this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
log.info("Creating workload generator with configs : {}", props.toString());
this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jsc);
this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration());
this.keyGenerator = (BuiltinKeyGenerator) DataSourceUtils.createKeyGenerator(props);
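    // Initialize the Hudi table under the target base path if it has not been created yet.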
if (!fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), cfg.targetBasePath,
HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived");
}
}
private static HiveConf getDefaultHiveConf(Configuration cfg) {
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(cfg);
return hiveConf;
}
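  /**
   * Command line entry point. A minimal sketch of a launch command (the bundle jar name and all
   * paths below are illustrative placeholders; parameter names come from
   * {@link HoodieTestSuiteConfig} and its parent {@link HoodieDeltaStreamer.Config}, and the full
   * set of required options is defined by their {@code @Parameter} annotations):
   *
   * <pre>
   * spark-submit \
   *   --class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \
   *   hudi-integ-test-bundle.jar \
   *   --props /path/to/test.properties \
   *   --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
   *   --input-base-path /tmp/hudi-test-suite/input \
   *   --target-base-path /tmp/hudi-test-suite/output \
   *   --target-table test_table \
   *   --table-type COPY_ON_WRITE \
   *   --workload-yaml-path /path/to/workflow.yaml
   * </pre>
   */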
public static void main(String[] args) throws Exception {
final HoodieTestSuiteConfig cfg = new HoodieTestSuiteConfig();
JCommander cmd = new JCommander(cfg, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
JavaSparkContext jssc = UtilHelpers.buildSparkContext("workload-generator-" + cfg.outputTypeName
+ "-" + cfg.inputFormatName, cfg.sparkMaster);
new HoodieTestSuiteJob(cfg, jssc).runTestSuite();
}
public void runTestSuite() {
try {
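      // Build the workflow DAG either programmatically via the configured WorkflowDagGenerator,
      // or from the supplied YAML definition.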
      WorkflowDag workflowDag = this.cfg.workloadYamlPath == null
          ? ((WorkflowDagGenerator) ReflectionUtils.loadClass(this.cfg.workloadDagGenerator)).build()
          : DagUtils.convertYamlPathToDag(this.fs, this.cfg.workloadYamlPath);
log.info("Workflow Dag => " + DagUtils.convertDagToYaml(workflowDag));
long startTime = System.currentTimeMillis();
String schemaStr = schemaProvider.getSourceSchema().toString();
final HoodieTestSuiteWriter writer = new HoodieTestSuiteWriter(jsc, props, cfg, schemaStr);
final DeltaGenerator deltaGenerator = new DeltaGenerator(
new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName), DeltaInputType.valueOf(cfg.inputFormatName),
new SerializableConfiguration(jsc.hadoopConfiguration()), cfg.inputBasePath, cfg.targetBasePath,
schemaStr, cfg.limitFileSize), jsc, sparkSession, schemaStr, keyGenerator);
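      // Execute the workflow DAG; nodes use the DeltaGenerator to produce input data and the
      // HoodieTestSuiteWriter to ingest it.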
DagScheduler dagScheduler = new DagScheduler(workflowDag, writer, deltaGenerator);
dagScheduler.schedule();
log.info("Finished scheduling all tasks, Time taken {}", System.currentTimeMillis() - startTime);
} catch (Exception e) {
log.error("Failed to run Test Suite ", e);
throw new HoodieException("Failed to run Test Suite ", e);
} finally {
jsc.stop();
}
}
/**
* The Hudi test suite uses {@link HoodieDeltaStreamer} to run some operations hence extend delta streamer config.
*/
public static class HoodieTestSuiteConfig extends HoodieDeltaStreamer.Config {
@Parameter(names = {"--input-base-path"}, description = "base path for input data"
+ "(Will be created if did not exist first time around. If exists, more data will be added to that path)",
required = true)
public String inputBasePath;
  @Parameter(names = {
      "--workload-generator-classname"}, description = "Class name of the WorkflowDagGenerator that builds the "
      + "WorkflowDag of operations used to generate the workload",
      required = true)
  public String workloadDagGenerator = WorkflowDagGenerator.class.getName();
@Parameter(names = {
"--workload-yaml-path"}, description = "Workflow Dag yaml path to generate the workload")
public String workloadYamlPath;
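    // A hypothetical sketch of the shape such a YAML might take (node names, config keys and node
    // types are illustrative only; the authoritative format is whatever DagUtils.convertYamlPathToDag
    // accepts):
    //
    //   first_insert:
    //     config:
    //       record_size: 1000
    //       num_insert_partitions: 1
    //       num_records_insert: 100
    //     type: InsertNode
    //     deps: none
    //   first_upsert:
    //     config:
    //       record_size: 1000
    //       num_records_upsert: 50
    //     type: UpsertNode
    //     deps: first_insert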
@Parameter(names = {"--delta-output-type"}, description = "Subclass of "
+ "org.apache.hudi.testsuite.workload.DeltaOutputMode to readAvro data.")
public String outputTypeName = DeltaOutputMode.DFS.name();
@Parameter(names = {"--delta-input-format"}, description = "Subclass of "
+ "org.apache.hudi.testsuite.workload.DeltaOutputMode to read avro data.")
public String inputFormatName = DeltaInputType.AVRO.name();
@Parameter(names = {"--input-file-size"}, description = "The min/max size of the input files to generate",
required = true)
public Long limitFileSize = 1024 * 1024 * 120L;
@Parameter(names = {"--use-deltastreamer"}, description = "Choose whether to use HoodieDeltaStreamer to "
+ "perform"
+ " ingestion. If set to false, HoodieWriteClient will be used")
public Boolean useDeltaStreamer = false;
}
}