/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.functional;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.CsvDFSSource;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource;
import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF4;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Basic tests against {@link HoodieDeltaStreamer}, issuing bulk_inserts, upserts and inserts, and checking record counts at the end.
*/
public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private static final Random RANDOM = new Random();
private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
public static final String PROPS_FILENAME_TEST_SOURCE1 = "test-source1.properties";
public static final String PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1 = "test-invalid-hive-sync-source1.properties";
public static final String PROPS_INVALID_FILE = "test-invalid-props.properties";
public static final String PROPS_INVALID_TABLE_CONFIG_FILE = "test-invalid-table-config.properties";
private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
private static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties";
protected static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
private static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties";
private static final String PROPS_FILENAME_TEST_MULTI_WRITER = "test-multi-writer.properties";
private static final String FIRST_PARQUET_FILE_NAME = "1.parquet";
private static String PARQUET_SOURCE_ROOT;
private static String JSON_KAFKA_SOURCE_ROOT;
private static final int PARQUET_NUM_RECORDS = 5;
private static final int CSV_NUM_RECORDS = 3;
private static final int JSON_KAFKA_NUM_RECORDS = 5;
// Required fields
private static final String TGT_BASE_PATH_PARAM = "--target-base-path";
private static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah";
private static final String TABLE_TYPE_PARAM = "--table-type";
private static final String TABLE_TYPE_VALUE = "COPY_ON_WRITE";
private static final String TARGET_TABLE_PARAM = "--target-table";
private static final String TARGET_TABLE_VALUE = "test";
private static final String BASE_FILE_FORMAT_PARAM = "--base-file-format";
private static final String BASE_FILE_FORMAT_VALUE = "PARQUET";
private static final String SOURCE_LIMIT_PARAM = "--source-limit";
private static final String SOURCE_LIMIT_VALUE = "500";
private static final String ENABLE_HIVE_SYNC_PARAM = "--enable-hive-sync";
private static final String HOODIE_CONF_PARAM = "--hoodie-conf";
private static final String HOODIE_CONF_VALUE1 = "hoodie.datasource.hive_sync.table=test_table";
private static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";
private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
public static KafkaTestUtils testUtils;
protected static String topicName;
protected static int testNum = 1;
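  /**
   * One-time setup: stands up the shared Kafka test cluster, copies the delta-streamer config and Avro schema
   * fixtures onto DFS, and writes out the various property files used by the tests below.
   */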
@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initClass(true);
PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles";
testUtils = new KafkaTestUtils();
testUtils.setup();
topicName = "topic" + testNum;
// prepare the configs.
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/config/base.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs,
dfsBasePath + "/sql-transformer.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc", dfs, dfsBasePath + "/target-flattened.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_short_trip_uber.avsc", dfs, dfsBasePath + "/source_short_trip_uber.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_uber.avsc", dfs, dfsBasePath + "/source_uber.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_short_trip_uber.avsc", dfs, dfsBasePath + "/target_short_trip_uber.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_uber.avsc", dfs, dfsBasePath + "/target_uber.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/invalid_hive_sync_uber_config.properties", dfs, dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/uber_config.properties", dfs, dfsBasePath + "/config/uber_config.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/short_trip_uber_config.properties", dfs, dfsBasePath + "/config/short_trip_uber_config.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/clusteringjob.properties", dfs, dfsBasePath + "/clusteringjob.properties");
TypedProperties props = new TypedProperties();
props.setProperty("include", "sql-transformer.properties");
props.setProperty("hoodie.datasource.write.keygenerator.class", TestGenerator.class.getName());
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
// Hive Configs
props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), "testdb1");
props.setProperty(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), "hive_trips");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "datestr");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
MultiPartKeysValueExtractor.class.getName());
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE);
    // Properties used for the delta-streamer, which incrementally pulls from the upstream Hudi source table and
    // writes to the downstream Hudi table
TypedProperties downstreamProps = new TypedProperties();
downstreamProps.setProperty("include", "base.properties");
downstreamProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
downstreamProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
    // The source schema is the target schema of the upstream table
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/target.avsc");
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs, dfsBasePath + "/test-downstream-source.properties");
// Properties used for testing invalid key generator
TypedProperties invalidProps = new TypedProperties();
invalidProps.setProperty("include", "sql-transformer.properties");
invalidProps.setProperty("hoodie.datasource.write.keygenerator.class", "invalid");
invalidProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
invalidProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);
TypedProperties props1 = new TypedProperties();
populateAllCommonProps(props1);
UtilitiesTestBase.Helpers.savePropsToDFS(props1, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE1);
TypedProperties properties = new TypedProperties();
populateInvalidTableConfigFilePathProps(properties);
UtilitiesTestBase.Helpers.savePropsToDFS(properties, dfs, dfsBasePath + "/" + PROPS_INVALID_TABLE_CONFIG_FILE);
TypedProperties invalidHiveSyncProps = new TypedProperties();
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
}
@AfterAll
public static void release() {
if (testUtils != null) {
testUtils.teardown();
}
}
private static void populateInvalidTableConfigFilePathProps(TypedProperties props) {
props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd");
props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_uber_config.properties");
}
private static void populateAllCommonProps(TypedProperties props) {
populateCommonProps(props);
populateCommonKafkaProps(props);
populateCommonHiveProps(props);
}
protected static void populateCommonProps(TypedProperties props) {
props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd");
props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/uber_config.properties");
props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile", dfsBasePath + "/config/short_trip_uber_config.properties");
}
protected static void populateCommonKafkaProps(TypedProperties props) {
    // Kafka source properties
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
props.setProperty(Config.KAFKA_AUTO_RESET_OFFSETS, "earliest");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000));
}
protected static void populateCommonHiveProps(TypedProperties props) {
// Hive Configs
props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), "testdb2");
props.setProperty(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY(), "false");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "datestr");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
MultiPartKeysValueExtractor.class.getName());
}
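  /**
   * Builds (and persists to DFS) the property file for the multi-writer tests: optimistic concurrency control with
   * LAZY failed-write cleaning and a Zookeeper-based lock provider (individual tests may override the provider).
   */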
protected static TypedProperties prepareMultiWriterProps(String propsFileName) throws IOException {
TypedProperties props = new TypedProperties();
populateAllCommonProps(props);
props.setProperty("include", "sql-transformer.properties");
props.setProperty("hoodie.datasource.write.keygenerator.class", TestGenerator.class.getName());
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
props.setProperty("include", "base.properties");
props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
props.setProperty("hoodie.cleaner.policy.failed.writes", "LAZY");
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
props.setProperty("hoodie.write.lock.hivemetastore.database", "testdb1");
props.setProperty("hoodie.write.lock.hivemetastore.table", "table1");
props.setProperty("hoodie.write.lock.zookeeper.url", "127.0.0.1");
props.setProperty("hoodie.write.lock.zookeeper.port", "2828");
props.setProperty("hoodie.write.lock.wait_time_ms", "1200000");
props.setProperty("hoodie.write.lock.num_retries", "10");
props.setProperty("hoodie.write.lock.zookeeper.lock_key", "test_table");
props.setProperty("hoodie.write.lock.zookeeper.base_path", "/test");
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName);
return props;
}
@AfterAll
public static void cleanupClass() {
UtilitiesTestBase.cleanupClass();
}
@BeforeEach
public void setup() throws Exception {
super.setup();
}
@AfterEach
public void teardown() throws Exception {
super.teardown();
}
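  /**
   * Static helpers for building {@link HoodieDeltaStreamer.Config} instances and asserting on the resulting table
   * state (record counts, commit timelines, checkpoint metadata).
   */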
static class TestHelpers {
static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, WriteOperationType op) {
return makeConfig(basePath, op, Collections.singletonList(DropAllTransformer.class.getName()));
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op) {
return makeConfig(basePath, op, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames) {
return makeConfig(basePath, op, transformerClassNames, PROPS_FILENAME_TEST_SOURCE, false);
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames,
String propsFilename, boolean enableHiveSync) {
return makeConfig(basePath, op, transformerClassNames, propsFilename, enableHiveSync, true,
false, null, null);
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames,
String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
String payloadClassName, String tableType) {
return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassNames, propsFilename, enableHiveSync,
useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp");
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName,
List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips";
cfg.tableType = tableType == null ? "COPY_ON_WRITE" : tableType;
cfg.sourceClassName = sourceClassName;
cfg.transformerClassNames = transformerClassNames;
cfg.operation = op;
cfg.enableHiveSync = enableHiveSync;
cfg.sourceOrderingField = sourceOrderingField;
cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
cfg.sourceLimit = sourceLimit;
if (updatePayloadClass) {
cfg.payloadClassName = payloadClassName;
}
if (useSchemaProviderClass) {
cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
}
return cfg;
}
static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, WriteOperationType op,
boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips_copy";
cfg.tableType = "COPY_ON_WRITE";
cfg.sourceClassName = HoodieIncrSource.class.getName();
cfg.operation = op;
cfg.sourceOrderingField = "timestamp";
cfg.propsFilePath = dfsBasePath + "/test-downstream-source.properties";
cfg.sourceLimit = 1000;
if (null != schemaProviderClassName) {
cfg.schemaProviderClassName = schemaProviderClassName;
}
List<String> cfgs = new ArrayList<>();
cfgs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" + addReadLatestOnMissingCkpt);
cfgs.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath);
      // Partition fields for the incremental source
cfgs.add("hoodie.deltastreamer.source.hoodieincr.partition.fields=datestr");
cfg.configs = cfgs;
return cfg;
}
static void assertRecordCount(long expected, String tablePath, SQLContext sqlContext) {
long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).count();
assertEquals(expected, recordCount);
}
static List<Row> countsPerCommit(String tablePath, SQLContext sqlContext) {
return sqlContext.read().format("org.apache.hudi").load(tablePath).groupBy("_hoodie_commit_time").count()
.sort("_hoodie_commit_time").collectAsList();
}
static void assertDistanceCount(long expected, String tablePath, SQLContext sqlContext) {
sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips");
long recordCount =
sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance is not NULL").count();
assertEquals(expected, recordCount);
}
static void assertDistanceCountWithExactValue(long expected, String tablePath, SQLContext sqlContext) {
sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips");
long recordCount =
sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance = 1.0").count();
assertEquals(expected, recordCount);
}
static void assertAtleastNCompactionCommits(int minExpected, String tablePath, FileSystem fs) {
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numCompactionCommits = (int) timeline.getInstants().count();
assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected);
}
static void assertAtleastNDeltaCommits(int minExpected, String tablePath, FileSystem fs) {
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numDeltaCommits = (int) timeline.getInstants().count();
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
}
static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath, FileSystem fs) {
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numCompactionCommits = (int) timeline.getInstants().count();
assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected);
}
static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath, FileSystem fs) {
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numDeltaCommits = (int) timeline.getInstants().count();
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
}
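    /**
     * Asserts the total number of completed commits and that the latest commit carries the expected checkpoint in
     * its metadata; returns the timestamp of that latest commit.
     */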
static String assertCommitMetadata(String expected, String tablePath, FileSystem fs, int totalCommits)
throws IOException {
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieInstant lastInstant = timeline.lastInstant().get();
HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(lastInstant).get(), HoodieCommitMetadata.class);
assertEquals(totalCommits, timeline.countInstants());
assertEquals(expected, commitMetadata.getMetadata(CHECKPOINT_KEY));
return lastInstant.getTimestamp();
}
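    /**
     * Polls the given condition every 3 seconds until it returns true or the delta-streamer future completes,
     * failing the test if the overall timeout elapses first.
     */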
static void waitTillCondition(Function<Boolean, Boolean> condition, Future dsFuture, long timeoutInSecs) throws Exception {
Future<Boolean> res = Executors.newSingleThreadExecutor().submit(() -> {
boolean ret = false;
while (!ret && !dsFuture.isDone()) {
try {
Thread.sleep(3000);
ret = condition.apply(true);
} catch (Throwable error) {
LOG.warn("Got error :", error);
ret = false;
}
}
return true;
});
res.get(timeoutInSecs, TimeUnit.SECONDS);
}
static void assertAtLeastNCommits(int minExpected, String tablePath, FileSystem fs) {
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().filterCompletedInstants();
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numDeltaCommits = (int) timeline.getInstants().count();
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
}
}
@Test
public void testProps() {
TypedProperties props =
new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getConfig();
assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism"));
assertEquals("_row_key", props.getString("hoodie.datasource.write.recordkey.field"));
assertEquals("org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestGenerator",
props.getString("hoodie.datasource.write.keygenerator.class"));
}
private static HoodieDeltaStreamer.Config getBaseConfig() {
// Base config with all required fields
HoodieDeltaStreamer.Config base = new HoodieDeltaStreamer.Config();
base.targetBasePath = TGT_BASE_PATH_VALUE;
base.tableType = TABLE_TYPE_VALUE;
base.targetTableName = TARGET_TABLE_VALUE;
return base;
}
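  /**
   * Provides pairs of (command line args, expected {@link HoodieDeltaStreamer.Config}) covering string, integer,
   * boolean and repeated list parameters, plus a superset of all of them.
   */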
private static Stream<Arguments> provideValidCliArgs() {
HoodieDeltaStreamer.Config base = getBaseConfig();
// String parameter
HoodieDeltaStreamer.Config conf1 = getBaseConfig();
conf1.baseFileFormat = BASE_FILE_FORMAT_VALUE;
// Integer parameter
HoodieDeltaStreamer.Config conf2 = getBaseConfig();
conf2.sourceLimit = Long.parseLong(SOURCE_LIMIT_VALUE);
// Boolean Parameter
HoodieDeltaStreamer.Config conf3 = getBaseConfig();
conf3.enableHiveSync = true;
// ArrayList Parameter with 1 value
HoodieDeltaStreamer.Config conf4 = getBaseConfig();
conf4.configs = Arrays.asList(HOODIE_CONF_VALUE1);
// ArrayList Parameter with comma separated values
HoodieDeltaStreamer.Config conf5 = getBaseConfig();
conf5.configs = Arrays.asList(HOODIE_CONF_VALUE2);
// Multiple ArrayList values
HoodieDeltaStreamer.Config conf6 = getBaseConfig();
conf6.configs = Arrays.asList(HOODIE_CONF_VALUE1, HOODIE_CONF_VALUE2);
// Super set of all cases
HoodieDeltaStreamer.Config conf = getBaseConfig();
conf.baseFileFormat = BASE_FILE_FORMAT_VALUE;
conf.sourceLimit = Long.parseLong(SOURCE_LIMIT_VALUE);
conf.enableHiveSync = true;
conf.configs = Arrays.asList(HOODIE_CONF_VALUE1, HOODIE_CONF_VALUE2);
String[] allConfig = new String[]{TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, SOURCE_LIMIT_PARAM,
SOURCE_LIMIT_VALUE, TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
BASE_FILE_FORMAT_PARAM, BASE_FILE_FORMAT_VALUE, ENABLE_HIVE_SYNC_PARAM, HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1,
HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2};
return Stream.of(
// Base
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE}, base),
// String
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
BASE_FILE_FORMAT_PARAM, BASE_FILE_FORMAT_VALUE}, conf1),
// Integer
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
SOURCE_LIMIT_PARAM, SOURCE_LIMIT_VALUE}, conf2),
// Boolean
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
ENABLE_HIVE_SYNC_PARAM}, conf3),
// Array List 1
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1}, conf4),
// Array List with comma
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2}, conf5),
// Array list with multiple values
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1, HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2}, conf6),
// All
Arguments.of(allConfig, conf)
);
}
@ParameterizedTest
@MethodSource("provideValidCliArgs")
public void testValidCommandLineArgs(String[] args, HoodieDeltaStreamer.Config expected) {
assertEquals(expected, HoodieDeltaStreamer.getConfig(args));
}
@Test
public void testKafkaConnectCheckpointProvider() throws IOException {
String tableBasePath = dfsBasePath + "/test_table";
String bootstrapPath = dfsBasePath + "/kafka_topic1";
String partitionPath = bootstrapPath + "/year=2016/month=05/day=01";
String filePath = partitionPath + "/kafka_topic1+0+100+200.parquet";
String checkpointProviderClass = "org.apache.hudi.utilities.checkpointing.KafkaConnectHdfsProvider";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
TypedProperties props =
new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getConfig();
props.put("hoodie.deltastreamer.checkpoint.provider.path", bootstrapPath);
cfg.initialCheckpointProvider = checkpointProviderClass;
// create regular kafka connect hdfs dirs
dfs.mkdirs(new Path(bootstrapPath));
dfs.mkdirs(new Path(partitionPath));
// generate parquet files using kafka connect naming convention
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
Helpers.saveParquetToDFS(Helpers.toGenericRecords(dataGenerator.generateInserts("000", 100)), new Path(filePath));
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc, dfs, hdfsTestService.getHadoopConf(), Option.ofNullable(props));
assertEquals("kafka_topic1,0:200", deltaStreamer.getConfig().checkpoint);
}
@Test
public void testPropsWithInvalidKeyGenerator() throws Exception {
Exception e = assertThrows(IOException.class, () -> {
String tableBasePath = dfsBasePath + "/test_table";
HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_INVALID, false), jsc);
deltaStreamer.sync();
}, "Should error out when setting the key generator class property to an invalid value");
// expected
LOG.debug("Expected error during getting the key generator", e);
assertTrue(e.getMessage().contains("Could not load key generator class"));
}
@Test
public void testTableCreation() throws Exception {
Exception e = assertThrows(TableNotFoundException.class, () -> {
dfs.mkdirs(new Path(dfsBasePath + "/not_a_table"));
HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(dfsBasePath + "/not_a_table", WriteOperationType.BULK_INSERT), jsc);
deltaStreamer.sync();
}, "Should error out when pointed out at a dir thats not a table");
// expected
LOG.debug("Expected error during table creation", e);
}
@Test
public void testBulkInsertsAndUpsertsWithBootstrap() throws Exception {
String tableBasePath = dfsBasePath + "/test_table";
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
// No new data => no commits.
cfg.sourceLimit = 0;
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
// upsert() #1
cfg.sourceLimit = 2000;
cfg.operation = WriteOperationType.UPSERT;
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1950, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1950, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
List<Row> counts = TestHelpers.countsPerCommit(tableBasePath + "/*/*.parquet", sqlContext);
assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
// Perform bootstrap with tableBasePath as source
String bootstrapSourcePath = dfsBasePath + "/src_bootstrapped";
Dataset<Row> sourceDf = sqlContext.read()
.format("org.apache.hudi")
.load(tableBasePath + "/*/*.parquet");
sourceDf.write().format("parquet").save(bootstrapSourcePath);
String newDatasetBasePath = dfsBasePath + "/test_dataset_bootstrapped";
cfg.runBootstrap = true;
cfg.configs.add(String.format("hoodie.bootstrap.base.path=%s", bootstrapSourcePath));
cfg.configs.add(String.format("hoodie.bootstrap.keygen.class=%s", SimpleKeyGenerator.class.getName()));
cfg.configs.add("hoodie.bootstrap.parallelism=5");
cfg.targetBasePath = newDatasetBasePath;
new HoodieDeltaStreamer(cfg, jsc).sync();
Dataset<Row> res = sqlContext.read().format("org.apache.hudi").load(newDatasetBasePath + "/*.parquet");
LOG.info("Schema :");
res.printSchema();
TestHelpers.assertRecordCount(1950, newDatasetBasePath + "/*.parquet", sqlContext);
res.registerTempTable("bootstrapped");
assertEquals(1950, sqlContext.sql("select distinct _hoodie_record_key from bootstrapped").count());
StructField[] fields = res.schema().fields();
List<String> fieldNames = Arrays.asList(res.schema().fieldNames());
List<String> expectedFieldNames = Arrays.asList(sourceDf.schema().fieldNames());
assertEquals(expectedFieldNames.size(), fields.length);
assertTrue(fieldNames.containsAll(HoodieRecord.HOODIE_META_COLUMNS));
assertTrue(fieldNames.containsAll(expectedFieldNames));
}
@Test
public void testUpsertsCOWContinuousMode() throws Exception {
testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
}
@Test
public void testUpsertsMORContinuousMode() throws Exception {
testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor");
}
@Test
public void testUpsertsCOWContinuousModeWithMultipleWriters() throws Exception {
    testUpsertsContinuousModeWithMultipleWriters(HoodieTableType.COPY_ON_WRITE, "continuous_cow_multiwriter");
}
@Test
public void testUpsertsMORContinuousModeWithMultipleWriters() throws Exception {
    testUpsertsContinuousModeWithMultipleWriters(HoodieTableType.MERGE_ON_READ, "continuous_mor_multiwriter");
}
@Test
public void testLatestCheckpointCarryOverWithMultipleWriters() throws Exception {
testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType.COPY_ON_WRITE, "continuous_cow_checkpoint");
}
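  /**
   * Runs the delta-streamer in continuous mode against the test data source, waits until the expected number of
   * (delta) commits and compactions show up, then verifies record and distance counts.
   */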
private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
String tableBasePath = dfsBasePath + "/" + tempDir;
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.continuousMode = true;
cfg.tableType = tableType.name();
cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
TestHelpers.assertAtleastNDeltaCommits(5, tableBasePath, dfs);
TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs);
} else {
TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath, dfs);
}
TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
return true;
});
}
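  /**
   * Exercises multi-writer support: a continuous ingestion job and a batch backfill job run in parallel, first with
   * overlapping upserts (expected to conflict) and then with non-overlapping inserts (expected to succeed).
   */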
private void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType, String tempDir) throws Exception {
    // NOTE: Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit tests but fail on Jenkins with connection timeouts
String tableBasePath = dfsBasePath + "/" + tempDir;
// enable carrying forward latest checkpoint
TypedProperties props = prepareMultiWriterProps(PROPS_FILENAME_TEST_MULTI_WRITER);
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER);
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
HoodieDeltaStreamer.Config cfgIngestionJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Arrays.asList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
cfgIngestionJob.continuousMode = true;
cfgIngestionJob.tableType = tableType.name();
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
// Prepare base dataset with some commits
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, dfs);
TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, dfs);
} else {
TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, dfs);
}
TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
return true;
});
// create a backfill job
HoodieDeltaStreamer.Config cfgBackfillJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Arrays.asList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
cfgBackfillJob.continuousMode = false;
cfgBackfillJob.tableType = tableType.name();
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc);
// re-init ingestion job to start sync service
HoodieDeltaStreamer ingestionJob2 = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
// run ingestion & backfill in parallel, create conflict and fail one
runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob2,
cfgIngestionJob, backfillJob, cfgBackfillJob, true);
// create new ingestion & backfill job config to generate only INSERTS to avoid conflict
props = prepareMultiWriterProps(PROPS_FILENAME_TEST_MULTI_WRITER);
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
props.setProperty("hoodie.test.source.generate.inserts", "true");
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER);
cfgBackfillJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
Arrays.asList(TestIdentityTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
cfgBackfillJob.continuousMode = false;
cfgBackfillJob.tableType = tableType.name();
meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
commitMetadata = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
cfgIngestionJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Arrays.asList(TestIdentityTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
cfgIngestionJob.continuousMode = true;
cfgIngestionJob.tableType = tableType.name();
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
// re-init ingestion job
HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
// re-init backfill job
HoodieDeltaStreamer backfillJob2 = new HoodieDeltaStreamer(cfgBackfillJob, jsc);
// run ingestion & backfill in parallel, avoid conflict and succeed both
runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob3,
cfgIngestionJob, backfillJob2, cfgBackfillJob, false);
}
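  /**
   * Verifies that a backfill job started from an older checkpoint does not regress the latest checkpoint recorded
   * on the timeline when multiple writers are involved.
   */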
private void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType, String tempDir) throws Exception {
    // NOTE: Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit tests but fail on Jenkins with connection timeouts
String tableBasePath = dfsBasePath + "/" + tempDir;
// enable carrying forward latest checkpoint
TypedProperties props = prepareMultiWriterProps(PROPS_FILENAME_TEST_MULTI_WRITER);
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER);
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
HoodieDeltaStreamer.Config cfgIngestionJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Arrays.asList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
cfgIngestionJob.continuousMode = true;
cfgIngestionJob.tableType = tableType.name();
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
// Prepare base dataset with some commits
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, dfs);
TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, dfs);
} else {
TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, dfs);
}
TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
return true;
});
// create a backfill job with checkpoint from the first instant
HoodieDeltaStreamer.Config cfgBackfillJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Arrays.asList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
cfgBackfillJob.continuousMode = false;
cfgBackfillJob.tableType = tableType.name();
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieCommitMetadata commitMetadataForFirstInstant = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
// get current checkpoint after preparing base dataset with some commits
HoodieCommitMetadata commitMetadataForLastInstant = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.lastInstant().get()).get(), HoodieCommitMetadata.class);
String lastCheckpointBeforeParallelBackfill = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY);
// run the backfill job, enable overriding checkpoint from the latest commit
props = prepareMultiWriterProps(PROPS_FILENAME_TEST_MULTI_WRITER);
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
props.setProperty("hoodie.write.meta.key.prefixes", CHECKPOINT_KEY);
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER);
// reset checkpoint to first instant to simulate a random checkpoint for backfill job
// checkpoint will move from 00000 to 00001 for this backfill job
cfgBackfillJob.checkpoint = commitMetadataForFirstInstant.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc);
backfillJob.sync();
// check if the checkpoint is carried over
timeline = meta.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
commitMetadataForLastInstant = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.lastInstant().get()).get(), HoodieCommitMetadata.class);
String lastCheckpointAfterParallelBackfill = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY);
Assertions.assertEquals(lastCheckpointBeforeParallelBackfill, lastCheckpointAfterParallelBackfill);
}
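  /**
   * Runs the continuous ingestion job and the batch backfill job concurrently. When a conflict is expected, the
   * test only passes if a ConcurrentModificationException surfaces from one of the writers; otherwise both jobs
   * must complete cleanly.
   */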
private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords,
HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob,
HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(2);
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
String lastSuccessfulCommit = timeline.lastInstant().get().getTimestamp();
// Condition for parallel ingestion job
Function<Boolean, Boolean> conditionForRegularIngestion = (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
TestHelpers.assertAtleastNDeltaCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath, dfs);
} else {
TestHelpers.assertAtleastNCompactionCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath, dfs);
}
TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
return true;
};
try {
Future regularIngestionJobFuture = service.submit(() -> {
try {
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, conditionForRegularIngestion);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
});
Future backfillJobFuture = service.submit(() -> {
try {
backfillJob.sync();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
});
backfillJobFuture.get();
regularIngestionJobFuture.get();
if (expectConflict) {
Assertions.fail("Failed to handle concurrent writes");
}
} catch (Exception e) {
      /*
       * Need to check getMessage().contains(...) because the exception coming from
       * org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.DeltaSyncService gets wrapped many times into RuntimeExceptions.
       */
if (expectConflict && e.getCause().getMessage().contains(ConcurrentModificationException.class.getName())) {
// expected ConcurrentModificationException since ingestion & backfill will have overlapping writes
} else {
throw e;
}
}
}
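  /**
   * Starts the delta-streamer sync on a separate thread, waits (up to 240 seconds) for the given condition to hold,
   * then shuts the streamer down gracefully and propagates any failure from the sync thread.
   */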
private void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition) throws Exception {
Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> {
try {
ds.sync();
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
});
TestHelpers.waitTillCondition(condition, dsFuture, 240);
ds.shutdownGracefully();
dsFuture.get();
}
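  /**
   * Verifies that inline clustering (triggered every 2 commits) produces a completed replace commit while the
   * delta-streamer runs in continuous mode.
   */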
@Test
public void testInlineClustering() throws Exception {
String tableBasePath = dfsBasePath + "/inlineClustering";
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true"));
cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP, "2"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
LOG.info("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize);
return completeReplaceSize > 0;
});
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
assertEquals(1, metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length);
}
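  /**
   * Builds a {@link HoodieClusteringJob.Config}, either to schedule a new clustering plan or to execute the plan at
   * the given instant, using the clusteringjob.properties file copied to DFS in {@link #initClass()}.
   */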
private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath,
String clusteringInstantTime, boolean runSchedule) {
HoodieClusteringJob.Config config = new HoodieClusteringJob.Config();
config.basePath = basePath;
config.clusteringInstantTime = clusteringInstantTime;
config.runSchedule = runSchedule;
config.propsFilePath = dfsBasePath + "/clusteringjob.properties";
return config;
}
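  /**
   * Runs the delta-streamer in continuous mode with async clustering enabled, then uses {@link HoodieClusteringJob}
   * to schedule and execute a clustering plan, asserting that a completed replace commit shows up on the timeline.
   */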
@Test
public void testHoodieAsyncClusteringJob() throws Exception {
String tableBasePath = dfsBasePath + "/asyncClustering";
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
cfg.configs.add(String.format("%s=true", HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
null, true);
HoodieClusteringJob scheduleClusteringJob = new HoodieClusteringJob(jsc, scheduleClusteringConfig);
Option<String> scheduleClusteringInstantTime = Option.empty();
try {
scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule();
} catch (Exception e) {
LOG.warn("Schedule clustering failed", e);
return false;
}
if (scheduleClusteringInstantTime.isPresent()) {
LOG.info("Schedule clustering success, now cluster with instant time " + scheduleClusteringInstantTime.get());
HoodieClusteringJob.Config clusterClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
scheduleClusteringInstantTime.get(), false);
HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig);
clusterClusteringJob.cluster(clusterClusteringConfig.retry);
LOG.info("Cluster success");
} else {
LOG.warn("Schedule clustering failed");
}
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
System.out.println("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize);
return completeReplaceSize > 0;
});
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
assertEquals(1, metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length);
}
  /**
   * Test Bulk Insert and upserts with hive syncing. Tests Hudi incremental processing using a 2-step pipeline.
   * The first step involves using a SQL template to transform a source:
   *
   *   TEST-DATA-SOURCE ==(incr-pull with transform)==> HUDI TABLE 1 ==(incr-pull)==> HUDI TABLE 2
   *
   * Hudi Table 1 is synced with Hive.
   */
@Test
public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() throws Exception {
String tableBasePath = dfsBasePath + "/test_table2";
String downstreamTableBasePath = dfsBasePath + "/test_downstream_table2";
HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, "hive_trips");
// Initial bulk insert to ingest to first hudi table
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true);
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCountWithExactValue(1000, tableBasePath + "/*/*.parquet", sqlContext);
String lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
// Now incrementally pull from the above hudi table and ingest to second table
HoodieDeltaStreamer.Config downstreamCfg =
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.BULK_INSERT,
true, null);
new HoodieDeltaStreamer(downstreamCfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCountWithExactValue(1000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, dfs, 1);
// No new data => no commits for upstream table
cfg.sourceLimit = 0;
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCountWithExactValue(1000, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
// With no change in the upstream table, there should be no change in the downstream table when pulled.
HoodieDeltaStreamer.Config downstreamCfg1 =
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath,
WriteOperationType.BULK_INSERT, true, DummySchemaProvider.class.getName());
new HoodieDeltaStreamer(downstreamCfg1, jsc).sync();
TestHelpers.assertRecordCount(1000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCountWithExactValue(1000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, dfs, 1);
// upsert() #1 on upstream hudi table
cfg.sourceLimit = 2000;
cfg.operation = WriteOperationType.UPSERT;
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1950, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1950, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCountWithExactValue(1950, tableBasePath + "/*/*.parquet", sqlContext);
lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
List<Row> counts = TestHelpers.countsPerCommit(tableBasePath + "/*/*.parquet", sqlContext);
assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
// Incrementally pull changes in upstream hudi table and apply to downstream table
downstreamCfg =
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.UPSERT,
false, null);
downstreamCfg.sourceLimit = 2000;
new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
TestHelpers.assertRecordCount(2000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(2000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCountWithExactValue(2000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
String finalInstant =
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, dfs, 2);
counts = TestHelpers.countsPerCommit(downstreamTableBasePath + "/*/*.parquet", sqlContext);
assertEquals(2000, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
// Test Hive integration
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), dfs);
assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName), "Table " + hiveSyncConfig.tableName + " should exist");
assertEquals(1, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"Table partitions should match the number of partitions we wrote");
assertEquals(lastInstantForUpstreamTable,
hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
"The last commit that was sycned should be updated in the TBLPROPERTIES");
}
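/**
* Verifies that DeltaStreamer fails fast with a clear error message when no schema provider is configured.
*/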
@Test
public void testNullSchemaProvider() throws Exception {
String tableBasePath = dfsBasePath + "/test_table";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
false, false, null, null);
Exception e = assertThrows(HoodieException.class, () -> {
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
}, "Should error out when schema provider is not provided");
LOG.debug("Expected error during reading data from source ", e);
assertTrue(e.getMessage().contains("Please provide a valid schema provider class!"));
}
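/**
* Verifies that re-instantiating DeltaStreamer with a new payload class updates hoodie.properties
* for a MERGE_ON_READ table.
*/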
@Test
public void testPayloadClassUpdate() throws Exception {
String dataSetBasePath = dfsBasePath + "/test_dataset_mor";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
true, false, null, "MERGE_ON_READ");
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, dataSetBasePath + "/*/*.parquet", sqlContext);
// now create one more DeltaStreamer instance and update the payload class
cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());
// now assert that the hoodie.properties file has the updated payload class name
Properties props = new Properties();
String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties";
FileSystem fs = FSUtils.getFs(cfg.targetBasePath, jsc.hadoopConfiguration());
try (FSDataInputStream inputStream = fs.open(new Path(metaPath))) {
props.load(inputStream);
}
assertEquals(DummyAvroPayload.class.getName(), props.getProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME));
}
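/**
* Same as {@link #testPayloadClassUpdate()} but for a COPY_ON_WRITE table, where the payload class
* property is expected to be absent from hoodie.properties.
*/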
@Test
public void testPayloadClassUpdateWithCOWTable() throws Exception {
String dataSetBasePath = dfsBasePath + "/test_dataset_cow";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
true, false, null, null);
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, dataSetBasePath + "/*/*.parquet", sqlContext);
// now create one more DeltaStreamer instance and update the payload class
cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
true, true, DummyAvroPayload.class.getName(), null);
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());
// now assert that the hoodie.properties file does not have the payload class prop since it is a COW table
Properties props = new Properties();
String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties";
FileSystem fs = FSUtils.getFs(cfg.targetBasePath, jsc.hadoopConfiguration());
try (FSDataInputStream inputStream = fs.open(new Path(metaPath))) {
props.load(inputStream);
}
assertFalse(props.containsKey(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME));
}
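/**
* Exercises the filterDupes flag: an INSERT with dupes filtered should add only new records, an UPSERT whose
* transformer drops all rows should still create a new (empty) commit, and enabling filterDupes together with
* UPSERT should be rejected.
*/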
@Test
public void testFilterDupes() throws Exception {
String tableBasePath = dfsBasePath + "/test_dupes_table";
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
// Generate the same 1000 records + 1000 new ones for upsert
cfg.filterDupes = true;
cfg.sourceLimit = 2000;
cfg.operation = WriteOperationType.INSERT;
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(2000, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
// 1000 records for commit 00000 & 1000 for commit 00001
List<Row> counts = TestHelpers.countsPerCommit(tableBasePath + "/*/*.parquet", sqlContext);
assertEquals(1000, counts.get(0).getLong(1));
assertEquals(1000, counts.get(1).getLong(1));
// Test with empty commits
HoodieTableMetaClient mClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
HoodieInstant lastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
cfg2.filterDupes = false;
cfg2.sourceLimit = 2000;
cfg2.operation = WriteOperationType.UPSERT;
cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg2, jsc);
ds2.sync();
mClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
HoodieInstant newLastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
assertTrue(HoodieTimeline.compareTimestamps(newLastFinished.getTimestamp(), HoodieTimeline.GREATER_THAN, lastFinished.getTimestamp()));
// Ensure it is empty
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(mClient.getActiveTimeline().getInstantDetails(newLastFinished).get(), HoodieCommitMetadata.class);
System.out.println("New Commit Metadata=" + commitMetadata);
assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty());
// Try UPSERT with filterDupes true. Expect exception
cfg2.filterDupes = true;
cfg2.operation = WriteOperationType.UPSERT;
Exception ex = assertThrows(IllegalArgumentException.class, () -> {
new HoodieDeltaStreamer(cfg2, jsc).sync();
}, "Should error out when filterDupes is enabled for UPSERT");
assertTrue(ex.getMessage().contains("'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."));
}
@Test
public void testDistributedTestDataSource() {
TypedProperties props = new TypedProperties();
props.setProperty(SourceConfigs.MAX_UNIQUE_RECORDS_PROP, "1000");
props.setProperty(SourceConfigs.NUM_SOURCE_PARTITIONS_PROP, "1");
props.setProperty(SourceConfigs.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS, "true");
DistributedTestDataSource distributedTestDataSource = new DistributedTestDataSource(props, jsc, sparkSession, null);
InputBatch<JavaRDD<GenericRecord>> batch = distributedTestDataSource.fetchNext(Option.empty(), 10000000);
batch.getBatch().get().cache();
long c = batch.getBatch().get().count();
assertEquals(1000, c);
}
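/**
* Writes generated trip records to a single parquet file under the given (or default) source root.
*/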
private static void prepareParquetDFSFiles(int numRecords) throws IOException {
prepareParquetDFSFiles(numRecords, PARQUET_SOURCE_ROOT);
}
protected static void prepareParquetDFSFiles(int numRecords, String baseParquetPath) throws IOException {
prepareParquetDFSFiles(numRecords, baseParquetPath, FIRST_PARQUET_FILE_NAME, false, null, null);
}
protected static void prepareParquetDFSFiles(int numRecords, String baseParquetPath, String fileName, boolean useCustomSchema,
String schemaStr, Schema schema) throws IOException {
String path = baseParquetPath + "/" + fileName;
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
if (useCustomSchema) {
Helpers.saveParquetToDFS(Helpers.toGenericRecords(
dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr),
schema), new Path(path), HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
} else {
Helpers.saveParquetToDFS(Helpers.toGenericRecords(
dataGenerator.generateInserts("000", numRecords)), new Path(path));
}
}
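/**
* Optionally creates the Kafka topic and publishes the given number of generated trip records as JSON messages.
*/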
private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) throws IOException {
if (createTopic) {
try {
testUtils.createTopic(topicName, 2);
} catch (TopicExistsException e) {
// no op
}
}
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.sendMessages(topicName, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", numRecords, HoodieTestDataGenerator.TRIP_SCHEMA)));
}
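/**
* Writes the properties file used by the delta-streamer parquet DFS source tests to DFS.
*/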
private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc",
PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false);
}
private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
String propsFileName, String parquetSourceRoot, boolean addCommonProps) throws IOException {
// Properties used for testing delta-streamer with Parquet source
TypedProperties parquetProps = new TypedProperties();
if (addCommonProps) {
populateCommonProps(parquetProps);
}
parquetProps.setProperty("include", "base.properties");
parquetProps.setProperty("hoodie.embed.timeline.server","false");
parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
if (useSchemaProvider) {
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + sourceSchemaFile);
if (hasTransformer) {
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/" + targetSchemaFile);
}
}
parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot);
UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + propsFileName);
}
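/**
* Runs delta-streamer against the parquet DFS source prepared above and asserts the expected record count.
*/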
private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null);
String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
useSchemaProvider, 100000, false, null, null, "timestamp"), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
testNum++;
}
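/**
* Writes the properties file used by the delta-streamer JsonKafka source tests to DFS.
*/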
private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName) throws IOException {
// Properties used for testing delta-streamer with JsonKafka source
TypedProperties props = new TypedProperties();
populateAllCommonProps(props);
props.setProperty("include", "base.properties");
props.setProperty("hoodie.embed.timeline.server","false");
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
props.setProperty("hoodie.deltastreamer.source.kafka.topic",topicName);
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc");
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc");
props.setProperty(Config.KAFKA_AUTO_RESET_OFFSETS, autoResetValue);
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName);
}
/**
* Tests DeltaStreamer with a parquet dfs source that then transitions to a JsonKafkaSource.
* @param autoResetToLatest true if the auto reset value should be set to LATEST; false to leave it as the default (i.e. EARLIEST)
* @throws Exception
*/
private void testDeltaStreamerTransitionFromParquetToKafkaSource(boolean autoResetToLatest) throws Exception {
// prep parquet source
PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfsToKafka" + testNum;
int parquetRecords = 10;
prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
prepareParquetDFSSource(true, false, "source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
PARQUET_SOURCE_ROOT, false);
// delta streamer w/ parquet source
String tableBasePath = dfsBasePath + "/test_dfs_to_kakfa" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
Collections.emptyList(), PROPS_FILENAME_TEST_PARQUET, false,
false, 100000, false, null, null, "timestamp"), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(parquetRecords, tableBasePath + "/*/*.parquet", sqlContext);
deltaStreamer.shutdownGracefully();
// prep json kafka source
topicName = "topic" + testNum;
prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, autoResetToLatest ? "latest" : "earliest", topicName);
// delta streamer w/ json kafka source
deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null, null, "timestamp"), jsc);
deltaStreamer.sync();
// if auto reset value is set to LATEST, the kafka records published so far will not be synced.
int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 0 : JSON_KAFKA_NUM_RECORDS);
TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext);
// verify 2nd batch to test LATEST auto reset value.
prepareJsonKafkaDFSFiles(20, false, topicName);
totalExpectedRecords += 20;
deltaStreamer.sync();
TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext);
testNum++;
}
@Test
public void testJsonKafkaDFSSource() throws Exception {
topicName = "topic" + testNum;
prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName);
String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null, null, "timestamp"), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
int totalRecords = JSON_KAFKA_NUM_RECORDS;
int records = 10;
totalRecords += records;
prepareJsonKafkaDFSFiles(records, false, topicName);
deltaStreamer.sync();
TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
}
@Test
public void testParquetSourceToKafkaSourceEarliestAutoResetValue() throws Exception {
testDeltaStreamerTransitionFromParquetToKafkaSource(false);
}
@Test
public void testParquetSourceToKafkaSourceLatestAutoResetValue() throws Exception {
testDeltaStreamerTransitionFromParquetToKafkaSource(true);
}
@Test
public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
testParquetDFSSource(false, null);
}
@Test
public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
testParquetDFSSource(false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
}
@Test
public void testParquetDFSSourceWithSourceSchemaFileAndNoTransformer() throws Exception {
testParquetDFSSource(true, null);
}
@Test
public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception {
testParquetDFSSource(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
}
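/**
* Writes the properties file for the delta-streamer CSV DFS source tests and a single CSV data file to DFS.
*/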
private void prepareCsvDFSSource(
boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer) throws IOException {
String sourceRoot = dfsBasePath + "/csvFiles";
String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : "_c0";
// Properties used for testing delta-streamer with CSV source
TypedProperties csvProps = new TypedProperties();
csvProps.setProperty("include", "base.properties");
csvProps.setProperty("hoodie.datasource.write.recordkey.field", recordKeyField);
csvProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
if (useSchemaProvider) {
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source-flattened.avsc");
if (hasTransformer) {
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target-flattened.avsc");
}
}
csvProps.setProperty("hoodie.deltastreamer.source.dfs.root", sourceRoot);
if (sep != ',') {
if (sep == '\t') {
csvProps.setProperty("hoodie.deltastreamer.csv.sep", "\\t");
} else {
csvProps.setProperty("hoodie.deltastreamer.csv.sep", Character.toString(sep));
}
}
if (hasHeader) {
csvProps.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(hasHeader));
}
UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_CSV);
String path = sourceRoot + "/1.csv";
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
UtilitiesTestBase.Helpers.saveCsvToDFS(
hasHeader, sep,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", CSV_NUM_RECORDS, true)),
dfs, path);
}
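/**
* Runs delta-streamer against the CSV DFS source prepared above and asserts the expected record count.
*/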
private void testCsvDFSSource(
boolean hasHeader, char sep, boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, transformerClassNames != null);
String tableBasePath = dfsBasePath + "/test_csv_table" + testNum;
String sourceOrderingField = (hasHeader || useSchemaProvider) ? "timestamp" : "_c0";
HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(
tableBasePath, WriteOperationType.INSERT, CsvDFSSource.class.getName(),
transformerClassNames, PROPS_FILENAME_TEST_CSV, false,
useSchemaProvider, 1000, false, null, null, sourceOrderingField), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
testNum++;
}
@Test
public void testCsvDFSSourceWithHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
// The CSV files have header, the columns are separated by ',', the default separator
// No schema provider is specified, no transformer is applied
// In this case, the source schema comes from the inferred schema of the CSV files
testCsvDFSSource(true, ',', false, null);
}
@Test
public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndNoTransformer() throws Exception {
// The CSV files have header, the columns are separated by '\t',
// which is passed in through the Hudi CSV properties
// No schema provider is specified, no transformer is applied
// In this case, the source schema comes from the inferred schema of the CSV files
testCsvDFSSource(true, '\t', false, null);
}
@Test
public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndNoTransformer() throws Exception {
// The CSV files have header, the columns are separated by '\t'
// File schema provider is used, no transformer is applied
// In this case, the source schema comes from the source Avro schema file
testCsvDFSSource(true, '\t', true, null);
}
@Test
public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndWithTransformer() throws Exception {
// The CSV files have header, the columns are separated by '\t'
// No schema provider is specified, transformer is applied
// In this case, the source schema comes from the inferred schema of the CSV files.
// Target schema is determined based on the Dataframe after transformation
testCsvDFSSource(true, '\t', false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
}
@Test
public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndTransformer() throws Exception {
// The CSV files have header, the columns are separated by '\t'
// File schema provider is used, transformer is applied
// In this case, the source and target schema come from the Avro schema files
testCsvDFSSource(true, '\t', true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
}
@Test
public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
// The CSV files do not have header, the columns are separated by '\t',
// which is passed in through the Hudi CSV properties
// No schema provider is specified, no transformer is applied
// In this case, the source schema comes from the inferred schema of the CSV files
// Using no CSV header and no schema provider at the same time is not recommended
// as the column names are not informative
testCsvDFSSource(false, '\t', false, null);
}
@Test
public void testCsvDFSSourceNoHeaderWithSchemaProviderAndNoTransformer() throws Exception {
// The CSV files do not have header, the columns are separated by '\t'
// File schema provider is used, no transformer is applied
// In this case, the source schema comes from the source Avro schema file
testCsvDFSSource(false, '\t', true, null);
}
@Test
public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndWithTransformer() throws Exception {
// The CSV files do not have header, the columns are separated by '\t'
// No schema provider is specified, transformer is applied
// In this case, the source schema comes from the inferred schema of the CSV files.
// Target schema is determined based on the Dataframe after transformation
// Using no CSV header and no schema provider at the same time is not recommended,
// as the transformer behavior may be unexpected
Exception e = assertThrows(AnalysisException.class, () -> {
testCsvDFSSource(false, '\t', false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
}, "Should error out when doing the transformation.");
LOG.debug("Expected error during transformation", e);
assertTrue(e.getMessage().contains("cannot resolve '`begin_lat`' given input columns:"));
}
@Test
public void testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Exception {
// The CSV files do not have header, the columns are separated by '\t'
// File schema provider is used, transformer is applied
// In this case, the source and target schema come from the Avro schema files
testCsvDFSSource(false, '\t', true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
}
/**
* UDF to calculate Haversine distance.
*/
public static class DistanceUDF implements UDF4<Double, Double, Double, Double, Double> {
/**
* Returns a random number as the distance between the points.
*
* @param lat1 Latitude of source
* @param lat2 Latitude of destination
* @param lon1 Longitude of source
* @param lon2 Longitude of destination
*/
@Override
public Double call(Double lat1, Double lat2, Double lon1, Double lon2) {
return RANDOM.nextDouble();
}
}
/**
* Adds a new field "haversine_distance" to the row.
*/
public static class TripsWithDistanceTransformer implements Transformer {
@Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
TypedProperties properties) {
rowDataset.sqlContext().udf().register("distance_udf", new DistanceUDF(), DataTypes.DoubleType);
return rowDataset.withColumn("haversine_distance", functions.callUDF("distance_udf", functions.col("begin_lat"),
functions.col("end_lat"), functions.col("begin_lon"), functions.col("end_lon")));
}
}
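/**
* Pass-through key generator that behaves exactly like {@link SimpleKeyGenerator}.
*/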
public static class TestGenerator extends SimpleKeyGenerator {
public TestGenerator(TypedProperties props) {
super(props);
}
}
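/**
* Payload that behaves exactly like {@link OverwriteWithLatestAvroPayload}; used to verify payload class
* updates in hoodie.properties (see {@link #testPayloadClassUpdate()}).
*/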
public static class DummyAvroPayload extends OverwriteWithLatestAvroPayload {
public DummyAvroPayload(GenericRecord gr, Comparable orderingVal) {
super(gr, orderingVal);
}
}
/**
* Transformer that returns an empty table (drops all rows).
*/
public static class DropAllTransformer implements Transformer {
@Override
public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
TypedProperties properties) {
System.out.println("DropAllTransformer called !!");
return sparkSession.createDataFrame(jsc.emptyRDD(), rowDataset.schema());
}
}
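/**
* Transformer that returns the input dataset unchanged.
*/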
public static class TestIdentityTransformer implements Transformer {
@Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
TypedProperties properties) {
return rowDataset;
}
}
}