/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.configuration;
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Hoodie Flink config options.
*
* <p>It has the options for Hoodie table read and write. It also defines some utilities.
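 *
 * <p>For illustration, a minimal sketch of setting a few of these options on a Flink
 * {@code Configuration}; the path and table name below are placeholders:
 * <pre>{@code
 *   Configuration conf = new Configuration();
 *   conf.setString(FlinkOptions.PATH, "file:///tmp/hoodie_table"); // hypothetical base path
 *   conf.setString(FlinkOptions.TABLE_NAME, "t1");                 // hypothetical table name
 *   conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
 * }</pre>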
*/
@ConfigClassProperty(name = "Flink Options",
groupName = ConfigGroups.Names.FLINK_SQL,
description = "Flink jobs using the SQL can be configured through the options in WITH clause."
+ " The actual datasource level configs are listed below.")
public class FlinkOptions extends HoodieConfig {
private FlinkOptions() {
}
// ------------------------------------------------------------------------
// Base Options
// ------------------------------------------------------------------------
public static final ConfigOption<String> PATH = ConfigOptions
.key("path")
.stringType()
.noDefaultValue()
.withDescription("Base path for the target hoodie table.\n"
+ "The path would be created if it does not exist,\n"
+ "otherwise a Hoodie table expects to be initialized successfully");
// ------------------------------------------------------------------------
// Common Options
// ------------------------------------------------------------------------
public static final ConfigOption<String> PARTITION_DEFAULT_NAME = ConfigOptions
.key("partition.default_name")
.stringType()
.defaultValue("__DEFAULT_PARTITION__")
.withDescription("The default partition name in case the dynamic partition"
+ " column value is null/empty string");
// ------------------------------------------------------------------------
// Metadata table Options
// ------------------------------------------------------------------------
public static final ConfigOption<Boolean> METADATA_ENABLED = ConfigOptions
.key("metadata.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Enable the internal metadata table which serves table metadata like level file listings, default false");
public static final ConfigOption<Integer> METADATA_COMPACTION_DELTA_COMMITS = ConfigOptions
.key("metadata.compaction.delta_commits")
.intType()
.defaultValue(24)
.withDescription("Max delta commits for metadata table to trigger compaction, default 24");
// ------------------------------------------------------------------------
// Index Options
// ------------------------------------------------------------------------
public static final ConfigOption<Boolean> INDEX_BOOTSTRAP_ENABLED = ConfigOptions
.key("index.bootstrap.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Whether to bootstrap the index state from existing hoodie table, default false");
public static final ConfigOption<Double> INDEX_STATE_TTL = ConfigOptions
.key("index.state.ttl")
.doubleType()
.defaultValue(1.5D)
.withDescription("Index state ttl in days, default 1.5 day");
public static final ConfigOption<Boolean> INDEX_GLOBAL_ENABLED = ConfigOptions
.key("index.global.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Whether to update index for the old partition path\n"
+ "if same key record with different partition path came in, default false");
public static final ConfigOption<String> INDEX_PARTITION_REGEX = ConfigOptions
.key("index.partition.regex")
.stringType()
.defaultValue(".*")
.withDescription("Whether to load partitions in state if partition path matching, default *");
// ------------------------------------------------------------------------
// Read Options
// ------------------------------------------------------------------------
public static final ConfigOption<Integer> READ_TASKS = ConfigOptions
.key("read.tasks")
.intType()
.defaultValue(4)
.withDescription("Parallelism of tasks that do actual read, default is 4");
public static final ConfigOption<String> SOURCE_AVRO_SCHEMA_PATH = ConfigOptions
.key("source.avro-schema.path")
.stringType()
.noDefaultValue()
.withDescription("Source avro schema file path, the parsed schema is used for deserialization");
public static final ConfigOption<String> SOURCE_AVRO_SCHEMA = ConfigOptions
.key("source.avro-schema")
.stringType()
.noDefaultValue()
.withDescription("Source avro schema string, the parsed schema is used for deserialization");
public static final String QUERY_TYPE_SNAPSHOT = "snapshot";
public static final String QUERY_TYPE_READ_OPTIMIZED = "read_optimized";
public static final String QUERY_TYPE_INCREMENTAL = "incremental";
public static final ConfigOption<String> QUERY_TYPE = ConfigOptions
.key("hoodie.datasource.query.type")
.stringType()
.defaultValue(QUERY_TYPE_SNAPSHOT)
.withDescription("Decides how data files need to be read, in\n"
+ "1) Snapshot mode (obtain latest view, based on row & columnar data);\n"
+ "2) incremental mode (new data since an instantTime);\n"
+ "3) Read Optimized mode (obtain latest view, based on columnar data)\n."
+ "Default: snapshot");
public static final String REALTIME_SKIP_MERGE = "skip_merge";
public static final String REALTIME_PAYLOAD_COMBINE = "payload_combine";
public static final ConfigOption<String> MERGE_TYPE = ConfigOptions
.key("hoodie.datasource.merge.type")
.stringType()
.defaultValue(REALTIME_PAYLOAD_COMBINE)
.withDescription("For Snapshot query on merge on read table. Use this key to define how the payloads are merged, in\n"
+ "1) skip_merge: read the base file records plus the log file records;\n"
+ "2) payload_combine: read the base file records first, for each record in base file, checks whether the key is in the\n"
+ " log file records(combines the two records with same key for base and log file records), then read the left log file records");
public static final ConfigOption<Boolean> UTC_TIMEZONE = ConfigOptions
.key("read.utc-timezone")
.booleanType()
.defaultValue(true)
.withDescription("Use UTC timezone or local timezone to the conversion between epoch"
+ " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x"
+ " use UTC timezone, by default true");
public static final ConfigOption<Boolean> READ_AS_STREAMING = ConfigOptions
.key("read.streaming.enabled")
.booleanType()
.defaultValue(false)// default read as batch
.withDescription("Whether to read as streaming source, default false");
public static final ConfigOption<Integer> READ_STREAMING_CHECK_INTERVAL = ConfigOptions
.key("read.streaming.check-interval")
.intType()
.defaultValue(60)// default 1 minute
.withDescription("Check interval for streaming read of SECOND, default 1 minute");
public static final ConfigOption<String> READ_STREAMING_START_COMMIT = ConfigOptions
.key("read.streaming.start-commit")
.stringType()
.noDefaultValue()
.withDescription("Start commit instant for streaming read, the commit time format should be 'yyyyMMddHHmmss', "
+ "by default reading from the latest instant");
// ------------------------------------------------------------------------
// Write Options
// ------------------------------------------------------------------------
public static final ConfigOption<String> TABLE_NAME = ConfigOptions
.key(HoodieWriteConfig.TABLE_NAME.key())
.stringType()
.noDefaultValue()
.withDescription("Table name to register to Hive metastore");
public static final String TABLE_TYPE_COPY_ON_WRITE = HoodieTableType.COPY_ON_WRITE.name();
public static final String TABLE_TYPE_MERGE_ON_READ = HoodieTableType.MERGE_ON_READ.name();
public static final ConfigOption<String> TABLE_TYPE = ConfigOptions
.key("table.type")
.stringType()
.defaultValue(TABLE_TYPE_COPY_ON_WRITE)
.withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
public static final ConfigOption<Boolean> INSERT_ALLOW_DUP = ConfigOptions
.key("write.insert.allow_dup")
.booleanType()
.defaultValue(true)
.withDescription("Whether to allow data duplication for INSERT operation, if enabled, writes the base files directly, default true");
public static final ConfigOption<String> OPERATION = ConfigOptions
.key("write.operation")
.stringType()
.defaultValue("upsert")
.withDescription("The write operation, that this write should do");
public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
.key("write.precombine.field")
.stringType()
.defaultValue("ts")
.withDescription("Field used in preCombining before actual write. When two records have the same\n"
+ "key value, we will pick the one with the largest value for the precombine field,\n"
+ "determined by Object.compareTo(..)");
public static final ConfigOption<String> PAYLOAD_CLASS = ConfigOptions
.key("write.payload.class")
.stringType()
.defaultValue(OverwriteWithLatestAvroPayload.class.getName())
.withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n"
+ "This will render any value set for the option in-effective");
/**
* Flag to indicate whether to drop duplicates upon insert.
* By default insert will accept duplicates, to gain extra performance.
*/
public static final ConfigOption<Boolean> INSERT_DROP_DUPS = ConfigOptions
.key("write.insert.drop.duplicates")
.booleanType()
.defaultValue(false)
.withDescription("Flag to indicate whether to drop duplicates upon insert.\n"
+ "By default insert will accept duplicates, to gain extra performance");
public static final ConfigOption<Integer> RETRY_TIMES = ConfigOptions
.key("write.retry.times")
.intType()
.defaultValue(3)
.withDescription("Flag to indicate how many times streaming job should retry for a failed checkpoint batch.\n"
+ "By default 3");
public static final ConfigOption<Long> RETRY_INTERVAL_MS = ConfigOptions
.key("write.retry.interval.ms")
.longType()
.defaultValue(2000L)
.withDescription("Flag to indicate how long (by millisecond) before a retry should issued for failed checkpoint batch.\n"
+ "By default 2000 and it will be doubled by every retry");
public static final ConfigOption<Boolean> IGNORE_FAILED = ConfigOptions
.key("write.ignore.failed")
.booleanType()
.defaultValue(true)
.withDescription("Flag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch.\n"
+ "By default true (in favor of streaming progressing over data integrity)");
public static final ConfigOption<String> RECORD_KEY_FIELD = ConfigOptions
.key(KeyGeneratorOptions.RECORDKEY_FIELD.key())
.stringType()
.defaultValue("uuid")
.withDescription("Record key field. Value to be used as the `recordKey` component of `HoodieKey`.\n"
+ "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using "
+ "the dot notation eg: `a.b.c`");
public static final ConfigOption<String> PARTITION_PATH_FIELD = ConfigOptions
.key(KeyGeneratorOptions.PARTITIONPATH_FIELD.key())
.stringType()
.defaultValue("")
.withDescription("Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`.\n"
+ "Actual value obtained by invoking .toString(), default ''");
public static final ConfigOption<Boolean> URL_ENCODE_PARTITIONING = ConfigOptions
.key(KeyGeneratorOptions.URL_ENCODE_PARTITIONING.key())
.booleanType()
.defaultValue(false)
.withDescription("Whether to encode the partition path url, default false");
public static final ConfigOption<Boolean> HIVE_STYLE_PARTITIONING = ConfigOptions
.key(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING.key())
.booleanType()
.defaultValue(false)
.withDescription("Whether to use Hive style partitioning.\n"
+ "If set true, the names of partition folders follow <partition_column_name>=<partition_value> format.\n"
+ "By default false (the names of partition folders are only partition values)");
public static final ConfigOption<String> KEYGEN_CLASS = ConfigOptions
.key(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP.key())
.stringType()
.defaultValue("")
.withDescription("Key generator class, that implements will extract the key out of incoming record");
public static final ConfigOption<String> KEYGEN_TYPE = ConfigOptions
.key(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key())
.stringType()
.defaultValue(KeyGeneratorType.SIMPLE.name())
.withDescription("Key generator type, that implements will extract the key out of incoming record");
public static final ConfigOption<Integer> INDEX_BOOTSTRAP_TASKS = ConfigOptions
.key("write.index_bootstrap.tasks")
.intType()
.noDefaultValue()
.withDescription("Parallelism of tasks that do index bootstrap, default is 4");
public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions
.key("write.bucket_assign.tasks")
.intType()
.noDefaultValue()
.withDescription("Parallelism of tasks that do bucket assign, default is 4");
public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
.key("write.tasks")
.intType()
.defaultValue(4)
.withDescription("Parallelism of tasks that do actual write, default is 4");
public static final ConfigOption<Double> WRITE_TASK_MAX_SIZE = ConfigOptions
.key("write.task.max.size")
.doubleType()
.defaultValue(1024D) // 1GB
.withDescription("Maximum memory in MB for a write task, when the threshold hits,\n"
+ "it flushes the max size data bucket to avoid OOM, default 1GB");
public static final ConfigOption<Long> WRITE_RATE_LIMIT = ConfigOptions
.key("write.rate.limit")
.longType()
.defaultValue(0L) // default no limit
.withDescription("Write record rate limit per second to prevent traffic jitter and improve stability, default 0 (no limit)");
public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions
.key("write.batch.size")
.doubleType()
.defaultValue(64D) // 64MB
.withDescription("Batch buffer size in MB to flush data into the underneath filesystem, default 64MB");
public static final ConfigOption<Integer> WRITE_LOG_BLOCK_SIZE = ConfigOptions
.key("write.log_block.size")
.intType()
.defaultValue(128)
.withDescription("Max log block size in MB for log file, default 128MB");
public static final ConfigOption<Integer> WRITE_LOG_MAX_SIZE = ConfigOptions
.key("write.log.max.size")
.intType()
.defaultValue(1024)
.withDescription("Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB");
public static final ConfigOption<Integer> WRITE_MERGE_MAX_MEMORY = ConfigOptions
.key("write.merge.max_memory")
.intType()
.defaultValue(100) // default 100 MB
.withDescription("Max memory in MB for merge, default 100MB");
// this is only for internal use
public static final ConfigOption<Long> WRITE_COMMIT_ACK_TIMEOUT = ConfigOptions
.key("write.commit.ack.timeout")
.longType()
.defaultValue(-1L) // default at least once
.withDescription("Timeout limit for a writer task after it finishes a checkpoint and\n"
+ "waits for the instant commit success, only for internal use");
public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION = ConfigOptions
.key("write.bulk_insert.shuffle_by_partition")
.booleanType()
.defaultValue(true)
.withDescription("Whether to shuffle the inputs by partition path for bulk insert tasks, default true");
public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SORT_BY_PARTITION = ConfigOptions
.key("write.bulk_insert.sort_by_partition")
.booleanType()
.defaultValue(true)
.withDescription("Whether to sort the inputs by partition path for bulk insert tasks, default true");
public static final ConfigOption<Integer> WRITE_SORT_MEMORY = ConfigOptions
.key("write.sort.memory")
.intType()
.defaultValue(128)
.withDescription("Sort memory in MB, default 128MB");
// ------------------------------------------------------------------------
// Compaction Options
// ------------------------------------------------------------------------
public static final ConfigOption<Boolean> COMPACTION_SCHEDULE_ENABLED = ConfigOptions
.key("compaction.schedule.enabled")
.booleanType()
.defaultValue(true) // default true for MOR write
.withDescription("Schedule the compaction plan, enabled by default for MOR");
public static final ConfigOption<Boolean> COMPACTION_ASYNC_ENABLED = ConfigOptions
.key("compaction.async.enabled")
.booleanType()
.defaultValue(true) // default true for MOR write
.withDescription("Async Compaction, enabled by default for MOR");
public static final ConfigOption<Integer> COMPACTION_TASKS = ConfigOptions
.key("compaction.tasks")
.intType()
.defaultValue(10) // default WRITE_TASKS * COMPACTION_DELTA_COMMITS * 0.5 (assumes two commits generate one bucket)
.withDescription("Parallelism of tasks that do actual compaction, default is 10");
public static final String NUM_COMMITS = "num_commits";
public static final String TIME_ELAPSED = "time_elapsed";
public static final String NUM_AND_TIME = "num_and_time";
public static final String NUM_OR_TIME = "num_or_time";
public static final ConfigOption<String> COMPACTION_TRIGGER_STRATEGY = ConfigOptions
.key("compaction.trigger.strategy")
.stringType()
      .defaultValue(NUM_COMMITS) // default 'num_commits'
.withDescription("Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits;\n"
+ "'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;\n"
+ "'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;\n"
+ "'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied.\n"
+ "Default is 'num_commits'");
public static final ConfigOption<Integer> COMPACTION_DELTA_COMMITS = ConfigOptions
.key("compaction.delta_commits")
.intType()
.defaultValue(5)
.withDescription("Max delta commits needed to trigger compaction, default 5 commits");
public static final ConfigOption<Integer> COMPACTION_DELTA_SECONDS = ConfigOptions
.key("compaction.delta_seconds")
.intType()
.defaultValue(3600) // default 1 hour
.withDescription("Max delta seconds time needed to trigger compaction, default 1 hour");
public static final ConfigOption<Integer> COMPACTION_MAX_MEMORY = ConfigOptions
.key("compaction.max_memory")
.intType()
.defaultValue(100) // default 100 MB
.withDescription("Max memory in MB for compaction spillable map, default 100MB");
public static final ConfigOption<Long> COMPACTION_TARGET_IO = ConfigOptions
.key("compaction.target_io")
.longType()
.defaultValue(5120L) // default 5 GB
.withDescription("Target IO per compaction (both read and write), default 5 GB");
public static final ConfigOption<Boolean> CLEAN_ASYNC_ENABLED = ConfigOptions
.key("clean.async.enabled")
.booleanType()
.defaultValue(true)
.withDescription("Whether to cleanup the old commits immediately on new commits, enabled by default");
public static final ConfigOption<Integer> CLEAN_RETAIN_COMMITS = ConfigOptions
.key("clean.retain_commits")
.intType()
.defaultValue(10)// default 10 commits
.withDescription("Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
+ "This also directly translates into how much you can incrementally pull on this table, default 10");
public static final ConfigOption<Integer> ARCHIVE_MAX_COMMITS = ConfigOptions
.key("archive.max_commits")
.intType()
.defaultValue(30)// default max 30 commits
.withDescription("Max number of commits to keep before archiving older commits into a sequential log, default 30");
public static final ConfigOption<Integer> ARCHIVE_MIN_COMMITS = ConfigOptions
.key("archive.min_commits")
.intType()
.defaultValue(20)// default min 20 commits
.withDescription("Min number of commits to keep before archiving older commits into a sequential log, default 20");
// ------------------------------------------------------------------------
// Hive Sync Options
// ------------------------------------------------------------------------
public static final ConfigOption<Boolean> HIVE_SYNC_ENABLED = ConfigOptions
.key("hive_sync.enable")
.booleanType()
.defaultValue(false)
.withDescription("Asynchronously sync Hive meta to HMS, default false");
public static final ConfigOption<String> HIVE_SYNC_DB = ConfigOptions
.key("hive_sync.db")
.stringType()
.defaultValue("default")
.withDescription("Database name for hive sync, default 'default'");
public static final ConfigOption<String> HIVE_SYNC_TABLE = ConfigOptions
.key("hive_sync.table")
.stringType()
.defaultValue("unknown")
.withDescription("Table name for hive sync, default 'unknown'");
public static final ConfigOption<String> HIVE_SYNC_FILE_FORMAT = ConfigOptions
.key("hive_sync.file_format")
.stringType()
.defaultValue("PARQUET")
.withDescription("File format for hive sync, default 'PARQUET'");
public static final ConfigOption<String> HIVE_SYNC_MODE = ConfigOptions
.key("hive_sync.mode")
.stringType()
.defaultValue("jdbc")
.withDescription("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql, default 'jdbc'");
public static final ConfigOption<String> HIVE_SYNC_USERNAME = ConfigOptions
.key("hive_sync.username")
.stringType()
.defaultValue("hive")
.withDescription("Username for hive sync, default 'hive'");
public static final ConfigOption<String> HIVE_SYNC_PASSWORD = ConfigOptions
.key("hive_sync.password")
.stringType()
.defaultValue("hive")
.withDescription("Password for hive sync, default 'hive'");
public static final ConfigOption<String> HIVE_SYNC_JDBC_URL = ConfigOptions
.key("hive_sync.jdbc_url")
.stringType()
.defaultValue("jdbc:hive2://localhost:10000")
.withDescription("Jdbc URL for hive sync, default 'jdbc:hive2://localhost:10000'");
public static final ConfigOption<String> HIVE_SYNC_METASTORE_URIS = ConfigOptions
.key("hive_sync.metastore.uris")
.stringType()
.defaultValue("")
.withDescription("Metastore uris for hive sync, default ''");
public static final ConfigOption<String> HIVE_SYNC_PARTITION_FIELDS = ConfigOptions
.key("hive_sync.partition_fields")
.stringType()
.defaultValue("")
.withDescription("Partition fields for hive sync, default ''");
public static final ConfigOption<String> HIVE_SYNC_PARTITION_EXTRACTOR_CLASS = ConfigOptions
.key("hive_sync.partition_extractor_class")
.stringType()
.defaultValue(SlashEncodedDayPartitionValueExtractor.class.getCanonicalName())
.withDescription("Tool to extract the partition value from HDFS path, "
+ "default 'SlashEncodedDayPartitionValueExtractor'");
public static final ConfigOption<Boolean> HIVE_SYNC_ASSUME_DATE_PARTITION = ConfigOptions
.key("hive_sync.assume_date_partitioning")
.booleanType()
.defaultValue(false)
.withDescription("Assume partitioning is yyyy/mm/dd, default false");
public static final ConfigOption<Boolean> HIVE_SYNC_USE_JDBC = ConfigOptions
.key("hive_sync.use_jdbc")
.booleanType()
.defaultValue(true)
.withDescription("Use JDBC when hive synchronization is enabled, default true");
public static final ConfigOption<Boolean> HIVE_SYNC_AUTO_CREATE_DB = ConfigOptions
.key("hive_sync.auto_create_db")
.booleanType()
.defaultValue(true)
.withDescription("Auto create hive database if it does not exists, default true");
public static final ConfigOption<Boolean> HIVE_SYNC_IGNORE_EXCEPTIONS = ConfigOptions
.key("hive_sync.ignore_exceptions")
.booleanType()
.defaultValue(false)
.withDescription("Ignore exceptions during hive synchronization, default false");
public static final ConfigOption<Boolean> HIVE_SYNC_SKIP_RO_SUFFIX = ConfigOptions
.key("hive_sync.skip_ro_suffix")
.booleanType()
.defaultValue(false)
.withDescription("Skip the _ro suffix for Read optimized table when registering, default false");
public static final ConfigOption<Boolean> HIVE_SYNC_SUPPORT_TIMESTAMP = ConfigOptions
.key("hive_sync.support_timestamp")
.booleanType()
.defaultValue(false)
.withDescription("INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n"
+ "Disabled by default for backward compatibility.");
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
// Prefix for Hoodie specific properties.
private static final String PROPERTIES_PREFIX = "properties.";
/**
* Collects the config options that start with 'properties.' into a 'key'='value' list.
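   *
   * <p>For illustration, a sketch with made-up option values:
   * <pre>{@code
   *   Map<String, String> options = new HashMap<>();
   *   options.put("properties.hoodie.parquet.max.file.size", "268435456"); // hypothetical value
   *   options.put("path", "/tmp/hoodie_table");                            // not collected: no 'properties.' prefix
   *   FlinkOptions.getHoodieProperties(options);
   *   // => {hoodie.parquet.max.file.size=268435456}
   * }</pre>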
*/
public static Map<String, String> getHoodieProperties(Map<String, String> options) {
return getHoodiePropertiesWithPrefix(options, PROPERTIES_PREFIX);
}
/**
   * Collects the config options that start with the specified prefix {@code prefix} into a 'key'='value' list.
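   *
   * <p>A sketch with a made-up prefix, key and value:
   * <pre>{@code
   *   Map<String, String> options = new HashMap<>();
   *   options.put("hadoop.fs.defaultFS", "hdfs://nameservice1"); // hypothetical key and value
   *   FlinkOptions.getHoodiePropertiesWithPrefix(options, "hadoop.");
   *   // => {fs.defaultFS=hdfs://nameservice1}
   * }</pre>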
*/
  public static Map<String, String> getHoodiePropertiesWithPrefix(Map<String, String> options, String prefix) {
    final Map<String, String> hoodieProperties = new HashMap<>();
    if (hasPropertyOptions(options, prefix)) {
      options.keySet().stream()
          .filter(key -> key.startsWith(prefix))
          .forEach(key -> {
            final String value = options.get(key);
            final String subKey = key.substring(prefix.length());
            hoodieProperties.put(subKey, value);
          });
    }
    return hoodieProperties;
  }
/**
   * Collects all the config options; the 'properties.' prefix is removed if an option key starts with it.
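   *
   * <p>A sketch of the flattening, with made-up values:
   * <pre>{@code
   *   Configuration conf = new Configuration();
   *   conf.setString("properties.hoodie.cleaner.commits.retained", "10"); // hypothetical value
   *   conf.setString("table.type", "MERGE_ON_READ");
   *   Configuration flat = FlinkOptions.flatOptions(conf);
   *   // flat now contains: hoodie.cleaner.commits.retained=10 and table.type=MERGE_ON_READ
   * }</pre>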
*/
public static Configuration flatOptions(Configuration conf) {
final Map<String, String> propsMap = new HashMap<>();
conf.toMap().forEach((key, value) -> {
final String subKey = key.startsWith(PROPERTIES_PREFIX)
? key.substring((PROPERTIES_PREFIX).length())
: key;
propsMap.put(subKey, value);
});
return fromMap(propsMap);
}
  private static boolean hasPropertyOptions(Map<String, String> options, String prefix) {
    return options.keySet().stream().anyMatch(k -> k.startsWith(prefix));
  }
/**
* Creates a new configuration that is initialized with the options of the given map.
*/
public static Configuration fromMap(Map<String, String> map) {
final Configuration configuration = new Configuration();
map.forEach(configuration::setString);
return configuration;
}
/**
   * Returns whether the given conf leaves the option {@code option} unset or sets it to its default value.
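   *
   * <p>For example (illustrative):
   * <pre>{@code
   *   Configuration conf = new Configuration();
   *   FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.TABLE_TYPE); // true: option not set
   *   conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
   *   FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.TABLE_TYPE); // false: set to a non-default value
   * }</pre>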
*/
public static <T> boolean isDefaultValueDefined(Configuration conf, ConfigOption<T> option) {
return !conf.getOptional(option).isPresent()
|| conf.get(option).equals(option.defaultValue());
}
/**
* Returns all the optional config options.
*/
public static Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>(allOptions());
options.remove(PATH);
return options;
}
/**
* Returns all the config options.
*/
public static List<ConfigOption<?>> allOptions() {
Field[] declaredFields = FlinkOptions.class.getDeclaredFields();
List<ConfigOption<?>> options = new ArrayList<>();
for (Field field : declaredFields) {
if (java.lang.reflect.Modifier.isStatic(field.getModifiers())
&& field.getType().equals(ConfigOption.class)) {
try {
options.add((ConfigOption<?>) field.get(ConfigOption.class));
} catch (IllegalAccessException e) {
throw new HoodieException("Error while fetching static config option", e);
}
}
}
return options;
}
}