/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.integ.testsuite.configuration;

import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Configuration to hold the delta output mode and the delta input type.
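 *
 * <p>A minimal construction sketch; the enum constants shown ({@code DFS}, {@code AVRO}) are
 * illustrative and assumed to exist in {@link DeltaOutputMode} and {@link DeltaInputType}:
 * <pre>{@code
 * DeltaConfig deltaConfig = new DeltaConfig(
 *     DeltaOutputMode.DFS,
 *     DeltaInputType.AVRO,
 *     new SerializableConfiguration(new Configuration()));
 * Configuration hadoopConf = deltaConfig.getConfiguration();
 * }</pre>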
*/
public class DeltaConfig implements Serializable {
private final DeltaOutputMode deltaOutputMode;
private final DeltaInputType deltaInputType;
private final SerializableConfiguration configuration;
public DeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType deltaInputType,
SerializableConfiguration configuration) {
this.deltaOutputMode = deltaOutputMode;
this.deltaInputType = deltaInputType;
this.configuration = configuration;
}
public DeltaOutputMode getDeltaOutputMode() {
return deltaOutputMode;
}
public DeltaInputType getDeltaInputType() {
return deltaInputType;
}
public Configuration getConfiguration() {
return configuration.get();
}
/**
 * Represents any kind of workload operation for new data. Each workload can also carry an optional
 * sequence of actions that are executed in parallel.
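 *
 * <p>A minimal usage sketch of the fluent {@link Builder}; keys that are not set fall back to the
 * defaults hard-coded in the corresponding getters:
 * <pre>{@code
 * Config config = Config.newBuilder()
 *     .withName("insert_batch")
 *     .withNumRecordsToInsert(1000)
 *     .withNumInsertPartitions(2)
 *     .withRecordSize(1024)
 *     .build();
 * long numInserts = config.getNumRecordsInsert(); // 1000
 * }</pre>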
*/
public static class Config {
public static final String CONFIG_NAME = "config";
public static final String TYPE = "type";
public static final String NODE_NAME = "name";
public static final String DEPENDENCIES = "deps";
public static final String NO_DEPENDENCY_VALUE = "none";
public static final String CHILDREN = "children";
public static final String HIVE_QUERIES = "hive_queries";
public static final String HIVE_PROPERTIES = "hive_props";
public static final String PRESTO_QUERIES = "presto_queries";
public static final String PRESTO_PROPERTIES = "presto_props";
public static final String TRINO_QUERIES = "trino_queries";
public static final String TRINO_PROPERTIES = "trino_props";
private static String NUM_RECORDS_INSERT = "num_records_insert";
private static String NUM_RECORDS_UPSERT = "num_records_upsert";
private static String NUM_RECORDS_DELETE = "num_records_delete";
private static String REPEAT_COUNT = "repeat_count";
private static String RECORD_SIZE = "record_size";
private static String NUM_PARTITIONS_INSERT = "num_partitions_insert";
private static String NUM_PARTITIONS_UPSERT = "num_partitions_upsert";
private static String NUM_PARTITIONS_DELETE = "num_partitions_delete";
private static String NUM_FILES_UPSERT = "num_files_upsert";
private static String FRACTION_UPSERT_PER_FILE = "fraction_upsert_per_file";
private static String DISABLE_GENERATE = "disable_generate";
private static String DISABLE_INGEST = "disable_ingest";
private static String HIVE_LOCAL = "hive_local";
private static String REINIT_CONTEXT = "reinitialize_context";
private static String START_PARTITION = "start_partition";
private static String DELETE_INPUT_DATA = "delete_input_data";
private static String VALIDATE_HIVE = "validate_hive";
private static String VALIDATE_ONCE_EVERY_ITR = "validate_once_every_itr";
private static String EXECUTE_ITR_COUNT = "execute_itr_count";
private static String VALIDATE_ARCHIVAL = "validate_archival";
private static String VALIDATE_CLEAN = "validate_clean";
private static String SCHEMA_VERSION = "schema_version";
private static String NUM_ROLLBACKS = "num_rollbacks";
private static String ENABLE_ROW_WRITING = "enable_row_writing";
private static String ENABLE_METADATA_VALIDATE = "enable_metadata_validate";
private static String VALIDATE_FULL_DATA = "validate_full_data";
private static String DELETE_INPUT_DATA_EXCEPT_LATEST = "delete_input_data_except_latest";
private static String PARTITIONS_TO_DELETE = "partitions_to_delete";
private static String INPUT_PARTITIONS_TO_SKIP_VALIDATE = "input_partitions_to_skip_validate";
private static String MAX_WAIT_TIME_FOR_DELTASTREAMER_TO_CATCH_UP_MS = "max_wait_time_for_deltastreamer_catch_up_ms";
// Spark SQL Create Table
private static String TABLE_TYPE = "table_type";
private static String IS_EXTERNAL = "is_external";
private static String USE_CTAS = "use_ctas";
private static String PRIMARY_KEY = "primary_key";
private static String PRE_COMBINE_FIELD = "pre_combine_field";
private static String PARTITION_FIELD = "partition_field";
// Spark SQL Merge
private static String MERGE_CONDITION = "merge_condition";
private static String DEFAULT_MERGE_CONDITION = "target._row_key = source._row_key";
private static String MERGE_MATCHED_ACTION = "matched_action";
private static String DEFAULT_MERGE_MATCHED_ACTION = "update set *";
private static String MERGE_NOT_MATCHED_ACTION = "not_matched_action";
private static String DEFAULT_MERGE_NOT_MATCHED_ACTION = "insert *";
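// With the defaults above, a generated MERGE statement would look roughly like the following
// (the target/source aliases are illustrative and not defined in this class):
//   MERGE INTO target USING source
//   ON target._row_key = source._row_key
//   WHEN MATCHED THEN UPDATE SET *
//   WHEN NOT MATCHED THEN INSERT *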
// Spark SQL Update
// Column to update. The update expression is currently hard-coded to "fare = fare * 1.6";
// making it configurable remains a TODO.
private static String UPDATE_COLUMN = "update_column";
private static String DEFAULT_UPDATE_COLUMN = "fare";
private static String WHERE_CONDITION_COLUMN = "condition_column";
// The WHERE condition expression takes the form "begin_lon between 0.1 and 0.2"; the value range
// is derived from the ratio of records to update or delete.
// Only numeric columns are supported for now.
private static String DEFAULT_WHERE_CONDITION_COLUMN = "begin_lon";
// The ratio must be between 0.01 and 1.0; the ratio actually achieved is only approximate.
private static String RATIO_RECORDS_CHANGE = "ratio_records_change";
private static double DEFAULT_RATIO_RECORDS_CHANGE = 0.5;
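// With the defaults above, a generated UPDATE statement would look roughly like the following
// (the table name and the BETWEEN bounds are illustrative; the bounds depend on the requested ratio):
//   UPDATE <table> SET fare = fare * 1.6 WHERE begin_lon BETWEEN 0.1 AND 0.2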
private Map<String, Object> configsMap;
public Config(Map<String, Object> configsMap) {
this.configsMap = configsMap;
}
public static Builder newBuilder() {
return new Builder();
}
public long getNumRecordsInsert() {
return Long.valueOf(configsMap.getOrDefault(NUM_RECORDS_INSERT, 0).toString());
}
public long getNumRecordsUpsert() {
return Long.valueOf(configsMap.getOrDefault(NUM_RECORDS_UPSERT, 0).toString());
}
public long getNumRecordsDelete() {
return Long.valueOf(configsMap.getOrDefault(NUM_RECORDS_DELETE, 0).toString());
}
public int getRecordSize() {
return Integer.valueOf(configsMap.getOrDefault(RECORD_SIZE, 1024).toString());
}
public boolean isEnableMetadataValidate() {
return Boolean.valueOf(configsMap.getOrDefault(ENABLE_METADATA_VALIDATE, false).toString());
}
public int getNumInsertPartitions() {
return Integer.valueOf(configsMap.getOrDefault(NUM_PARTITIONS_INSERT, 1).toString());
}
public int getRepeatCount() {
return Integer.valueOf(configsMap.getOrDefault(REPEAT_COUNT, 1).toString());
}
public int getNumUpsertPartitions() {
return Integer.valueOf(configsMap.getOrDefault(NUM_PARTITIONS_UPSERT, 0).toString());
}
public int getSchemaVersion() {
return Integer.valueOf(configsMap.getOrDefault(SCHEMA_VERSION, Integer.MAX_VALUE).toString());
}
public int getNumRollbacks() {
return Integer.valueOf(configsMap.getOrDefault(NUM_ROLLBACKS, 1).toString());
}
public int getStartPartition() {
return Integer.valueOf(configsMap.getOrDefault(START_PARTITION, 0).toString());
}
public int getNumDeletePartitions() {
return Integer.valueOf(configsMap.getOrDefault(NUM_PARTITIONS_DELETE, 1).toString());
}
public int getNumUpsertFiles() {
return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 1).toString());
}
public double getFractionUpsertPerFile() {
return Double.valueOf(configsMap.getOrDefault(FRACTION_UPSERT_PER_FILE, 0.0).toString());
}
public boolean isDisableGenerate() {
return Boolean.valueOf(configsMap.getOrDefault(DISABLE_GENERATE, false).toString());
}
public boolean isDisableIngest() {
return Boolean.valueOf(configsMap.getOrDefault(DISABLE_INGEST, false).toString());
}
public String getPartitionsToDelete() {
return configsMap.getOrDefault(PARTITIONS_TO_DELETE, "").toString();
}
public boolean getReinitContext() {
return Boolean.valueOf(configsMap.getOrDefault(REINIT_CONTEXT, false).toString());
}
public boolean isDeleteInputData() {
return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA, false).toString());
}
public boolean isDeleteInputDataExceptLatest() {
return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA_EXCEPT_LATEST, false).toString());
}
public boolean isValidateHive() {
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_HIVE, false).toString());
}
public int validateOnceEveryIteration() {
return Integer.valueOf(configsMap.getOrDefault(VALIDATE_ONCE_EVERY_ITR, 1).toString());
}
public String inputPartitionsToSkipWithValidate() {
return configsMap.getOrDefault(INPUT_PARTITIONS_TO_SKIP_VALIDATE, "").toString();
}
public boolean isValidateFullData() {
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_FULL_DATA, false).toString());
}
public int getIterationCountToExecute() {
return Integer.valueOf(configsMap.getOrDefault(EXECUTE_ITR_COUNT, -1).toString());
}
public boolean validateArchival() {
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_ARCHIVAL, false).toString());
}
public boolean validateClean() {
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_CLEAN, false).toString());
}
public boolean enableRowWriting() {
return Boolean.valueOf(configsMap.getOrDefault(ENABLE_ROW_WRITING, false).toString());
}
public long maxWaitTimeForDeltastreamerToCatchupMs() {
return Long.valueOf(configsMap.getOrDefault(MAX_WAIT_TIME_FOR_DELTASTREAMER_TO_CATCH_UP_MS, 5 * 60 * 1000).toString());
}
public Option<String> getTableType() {
return !configsMap.containsKey(TABLE_TYPE) ? Option.empty()
: Option.of(configsMap.get(TABLE_TYPE).toString());
}
public boolean shouldUseCtas() {
return Boolean.valueOf(configsMap.getOrDefault(USE_CTAS, false).toString());
}
public boolean isTableExternal() {
return Boolean.valueOf(configsMap.getOrDefault(IS_EXTERNAL, false).toString());
}
public Option<String> getPrimaryKey() {
return !configsMap.containsKey(PRIMARY_KEY) ? Option.empty()
: Option.of(configsMap.get(PRIMARY_KEY).toString());
}
public Option<String> getPreCombineField() {
return !configsMap.containsKey(PRE_COMBINE_FIELD) ? Option.empty()
: Option.of(configsMap.get(PRE_COMBINE_FIELD).toString());
}
public Option<String> getPartitionField() {
return !configsMap.containsKey(PARTITION_FIELD) ? Option.empty()
: Option.of(configsMap.get(PARTITION_FIELD).toString());
}
public String getMergeCondition() {
return configsMap.getOrDefault(MERGE_CONDITION, DEFAULT_MERGE_CONDITION).toString();
}
public String getMatchedAction() {
return configsMap.getOrDefault(MERGE_MATCHED_ACTION, DEFAULT_MERGE_MATCHED_ACTION).toString();
}
public String getNotMatchedAction() {
return configsMap.getOrDefault(MERGE_NOT_MATCHED_ACTION, DEFAULT_MERGE_NOT_MATCHED_ACTION).toString();
}
public String getUpdateColumn() {
return configsMap.getOrDefault(UPDATE_COLUMN, DEFAULT_UPDATE_COLUMN).toString();
}
public String getWhereConditionColumn() {
return configsMap.getOrDefault(WHERE_CONDITION_COLUMN, DEFAULT_WHERE_CONDITION_COLUMN).toString();
}
public double getRatioRecordsChange() {
return Double.valueOf(configsMap.getOrDefault(RATIO_RECORDS_CHANGE, DEFAULT_RATIO_RECORDS_CHANGE).toString());
}
public Map<String, Object> getOtherConfigs() {
if (configsMap == null) {
return new HashMap<>();
}
return configsMap;
}
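// Each entry is expected to be a (query, expected result) pair, e.g. a COUNT(*) query paired with
// the value the validation should observe; the unchecked cast fails at use time if the configured
// value has a different shape.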
public List<Pair<String, Integer>> getHiveQueries() {
try {
return (List<Pair<String, Integer>>) this.configsMap.getOrDefault(HIVE_QUERIES, new ArrayList<>());
} catch (Exception e) {
throw new RuntimeException("unable to get hive queries from configs");
}
}
public boolean isHiveLocal() {
return Boolean.valueOf(configsMap.getOrDefault(HIVE_LOCAL, false).toString());
}
public List<String> getHiveProperties() {
return (List<String>) this.configsMap.getOrDefault(HIVE_PROPERTIES, new ArrayList<>());
}
public List<String> getPrestoProperties() {
return (List<String>) this.configsMap.getOrDefault(PRESTO_PROPERTIES, new ArrayList<>());
}
public List<String> getTrinoProperties() {
return (List<String>) this.configsMap.getOrDefault(TRINO_PROPERTIES, new ArrayList<>());
}
public List<Pair<String, Integer>> getPrestoQueries() {
try {
return (List<Pair<String, Integer>>) this.configsMap.getOrDefault(PRESTO_QUERIES, new ArrayList<>());
} catch (Exception e) {
throw new RuntimeException("unable to get presto queries from configs");
}
}
public List<Pair<String, Integer>> getTrinoQueries() {
try {
return (List<Pair<String, Integer>>) this.configsMap.getOrDefault(TRINO_QUERIES, new ArrayList<>());
} catch (Exception e) {
throw new RuntimeException("unable to get trino queries from configs");
}
}
@Override
public String toString() {
try {
return new ObjectMapper().writeValueAsString(this.configsMap);
} catch (Exception e) {
throw new RuntimeException("unable to generate string representation of config");
}
}
public static class Builder {
private Map<String, Object> configsMap = new HashMap<>();
public Builder() {
}
public Builder withNumRecordsToInsert(long numRecordsInsert) {
this.configsMap.put(NUM_RECORDS_INSERT, numRecordsInsert);
return this;
}
public Builder withNumRecordsToUpdate(long numRecordsUpsert) {
this.configsMap.put(NUM_RECORDS_UPSERT, numRecordsUpsert);
return this;
}
public Builder withNumRecordsToDelete(long numRecordsDelete) {
this.configsMap.put(NUM_RECORDS_DELETE, numRecordsDelete);
return this;
}
public Builder withNumInsertPartitions(int numInsertPartitions) {
this.configsMap.put(NUM_PARTITIONS_INSERT, numInsertPartitions);
return this;
}
public Builder withNumUpsertPartitions(int numUpsertPartitions) {
this.configsMap.put(NUM_PARTITIONS_UPSERT, numUpsertPartitions);
return this;
}
public Builder withNumDeletePartitions(int numDeletePartitions) {
this.configsMap.put(NUM_PARTITIONS_DELETE, numDeletePartitions);
return this;
}
public Builder withSchemaVersion(int version) {
this.configsMap.put(SCHEMA_VERSION, version);
return this;
}
public Builder withNumRollbacks(int numRollbacks) {
this.configsMap.put(NUM_ROLLBACKS, numRollbacks);
return this;
}
public Builder withNumUpsertFiles(int numUpsertFiles) {
this.configsMap.put(NUM_FILES_UPSERT, numUpsertFiles);
return this;
}
public Builder withFractionUpsertPerFile(double fractionUpsertPerFile) {
this.configsMap.put(FRACTION_UPSERT_PER_FILE, fractionUpsertPerFile);
return this;
}
public Builder withStartPartition(int startPartition) {
this.configsMap.put(START_PARTITION, startPartition);
return this;
}
public Builder withNumTimesToRepeat(int repeatCount) {
this.configsMap.put(REPEAT_COUNT, repeatCount);
return this;
}
public Builder withRecordSize(int recordSize) {
this.configsMap.put(RECORD_SIZE, recordSize);
return this;
}
public Builder disableGenerate(boolean generate) {
this.configsMap.put(DISABLE_GENERATE, generate);
return this;
}
public Builder disableIngest(boolean ingest) {
this.configsMap.put(DISABLE_INGEST, ingest);
return this;
}
public Builder reinitializeContext(boolean reinitContext) {
this.configsMap.put(REINIT_CONTEXT, reinitContext);
return this;
}
public Builder withConfig(String name, Object value) {
this.configsMap.put(name, value);
return this;
}
public Builder withHiveQueryAndResults(List<Pair<String, Integer>> hiveQueries) {
this.configsMap.put(HIVE_QUERIES, hiveQueries);
return this;
}
public Builder withHiveLocal(boolean startHiveLocal) {
this.configsMap.put(HIVE_LOCAL, startHiveLocal);
return this;
}
public Builder withHiveProperties(List<String> hiveProperties) {
this.configsMap.put(HIVE_PROPERTIES, hiveProperties);
return this;
}
public Builder withPrestoProperties(List<String> prestoProperties) {
this.configsMap.put(PRESTO_PROPERTIES, prestoProperties);
return this;
}
public Builder withTrinoProperties(List<String> trinoProperties) {
this.configsMap.put(TRINO_PROPERTIES, trinoProperties);
return this;
}
public Builder withPrestoQueryAndResults(List<Pair<String, Integer>> prestoQueries) {
this.configsMap.put(PRESTO_QUERIES, prestoQueries);
return this;
}
public Builder withTrinoQueryAndResults(List<Pair<String, Integer>> trinoQueries) {
this.configsMap.put(TRINO_QUERIES, trinoQueries);
return this;
}
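// Note: this replaces the builder's map wholesale rather than merging into it, so any keys set
// earlier on this builder are dropped.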
public Builder withConfigsMap(Map<String, Object> configsMap) {
this.configsMap = configsMap;
return this;
}
public Builder withName(String name) {
this.configsMap.put(CONFIG_NAME, name);
return this;
}
public Config build() {
return new Config(configsMap);
}
}
}
}