/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive;

import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.sync.common.HoodieSyncConfig;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParametersDelegate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;

import javax.annotation.concurrent.Immutable;

import java.util.Properties;

/**
* Configs needed to sync data into the Hive Metastore.
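*
* <p>A minimal usage sketch (illustrative only; the property value shown is an
* assumption, and the caller is expected to supply the required base sync properties):
* <pre>{@code
* TypedProperties props = new TypedProperties();
* props.setProperty(HiveSyncConfig.HIVE_SYNC_MODE.key(), "hms");
* HiveSyncConfig config = new HiveSyncConfig(props, new Configuration());
* }</pre>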
*/
@Immutable
@ConfigClassProperty(name = "Hive Sync Configs",
groupName = ConfigGroups.Names.META_SYNC,
description = "Configurations used by the Hudi to sync metadata to Hive Metastore.")
public class HiveSyncConfig extends HoodieSyncConfig {
/*
* Config constants below are retained here for backward compatibility (BWC) purposes.
*/
public static final ConfigProperty<String> HIVE_SYNC_ENABLED = HiveSyncConfigHolder.HIVE_SYNC_ENABLED;
public static final ConfigProperty<String> HIVE_USER = HiveSyncConfigHolder.HIVE_USER;
public static final ConfigProperty<String> HIVE_PASS = HiveSyncConfigHolder.HIVE_PASS;
public static final ConfigProperty<String> HIVE_URL = HiveSyncConfigHolder.HIVE_URL;
public static final ConfigProperty<String> HIVE_USE_PRE_APACHE_INPUT_FORMAT = HiveSyncConfigHolder.HIVE_USE_PRE_APACHE_INPUT_FORMAT;
/**
* @deprecated Use {@link #HIVE_SYNC_MODE} instead of this config since 0.9.0
*/
@Deprecated
public static final ConfigProperty<String> HIVE_USE_JDBC = HiveSyncConfigHolder.HIVE_USE_JDBC;
public static final ConfigProperty<String> METASTORE_URIS = HiveSyncConfigHolder.METASTORE_URIS;
public static final ConfigProperty<String> HIVE_AUTO_CREATE_DATABASE = HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE;
public static final ConfigProperty<String> HIVE_IGNORE_EXCEPTIONS = HiveSyncConfigHolder.HIVE_IGNORE_EXCEPTIONS;
public static final ConfigProperty<String> HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE = HiveSyncConfigHolder.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE;
public static final ConfigProperty<String> HIVE_SUPPORT_TIMESTAMP_TYPE = HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
public static final ConfigProperty<String> HIVE_TABLE_PROPERTIES = HiveSyncConfigHolder.HIVE_TABLE_PROPERTIES;
public static final ConfigProperty<String> HIVE_TABLE_SERDE_PROPERTIES = HiveSyncConfigHolder.HIVE_TABLE_SERDE_PROPERTIES;
public static final ConfigProperty<String> HIVE_SYNC_AS_DATA_SOURCE_TABLE = HiveSyncConfigHolder.HIVE_SYNC_AS_DATA_SOURCE_TABLE;
public static final ConfigProperty<Integer> HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD = HiveSyncConfigHolder.HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD;
public static final ConfigProperty<Boolean> HIVE_CREATE_MANAGED_TABLE = HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
public static final ConfigProperty<Boolean> HIVE_SYNC_OMIT_METADATA_FIELDS = HiveSyncConfigHolder.HIVE_SYNC_OMIT_METADATA_FIELDS;
public static final ConfigProperty<Integer> HIVE_BATCH_SYNC_PARTITION_NUM = HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
public static final ConfigProperty<String> HIVE_SYNC_MODE = HiveSyncConfigHolder.HIVE_SYNC_MODE;
public static final ConfigProperty<Boolean> HIVE_SYNC_BUCKET_SYNC = HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
public static final ConfigProperty<String> HIVE_SYNC_BUCKET_SYNC_SPEC = HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
public static final ConfigProperty<String> HIVE_SYNC_COMMENT = HiveSyncConfigHolder.HIVE_SYNC_COMMENT;
public static final ConfigProperty<String> HIVE_SYNC_TABLE_STRATEGY = HiveSyncConfigHolder.HIVE_SYNC_TABLE_STRATEGY;
public static final ConfigProperty<Boolean> HIVE_SYNC_FILTER_PUSHDOWN_ENABLED = ConfigProperty
.key("hoodie.datasource.hive_sync.filter_pushdown_enabled")
.defaultValue(false)
.markAdvanced()
.withDocumentation("Whether to enable push down partitions by filter");
public static final ConfigProperty<Integer> HIVE_SYNC_FILTER_PUSHDOWN_MAX_SIZE = ConfigProperty
.key("hoodie.datasource.hive_sync.filter_pushdown_max_size")
.defaultValue(1000)
.markAdvanced()
.withDocumentation("Max size limit to push down partition filters, if the estimate push down "
+ "filters exceed this size, will directly try to fetch all partitions");
public static String getBucketSpec(String bucketCols, int bucketNum) {
return "CLUSTERED BY (" + bucketCols + ") INTO " + bucketNum + " BUCKETS";
}
public HiveSyncConfig(Properties props) {
super(props);
validateParameters();
}
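/**
* Builds the config from the given properties and Hadoop configuration; the Hadoop
* configuration is reused directly when it is already a {@link HiveConf}, and is
* wrapped into a new {@link HiveConf} otherwise.
*/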
public HiveSyncConfig(Properties props, Configuration hadoopConf) {
super(props, hadoopConf);
HiveConf hiveConf = hadoopConf instanceof HiveConf
? (HiveConf) hadoopConf : new HiveConf(hadoopConf, HiveConf.class);
setHadoopConf(hiveConf);
validateParameters();
}
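/**
* Returns the stored Hadoop configuration as a {@link HiveConf}; the cast assumes the
* config was built through the constructor that stores a {@link HiveConf}.
*/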
public HiveConf getHiveConf() {
return (HiveConf) getHadoopConf();
}
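/** Returns whether bucket-style sync is enabled, per {@link #HIVE_SYNC_BUCKET_SYNC}. */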
public boolean useBucketSync() {
return getBooleanOrDefault(HIVE_SYNC_BUCKET_SYNC);
}
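/**
* JCommander-annotated command line parameters for the Hive sync tool; the captured
* values are translated into Hudi sync properties via {@link #toProps()}.
*/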
public static class HiveSyncConfigParams {
@ParametersDelegate()
public final HoodieSyncConfigParams hoodieSyncConfigParams = new HoodieSyncConfigParams();
@Parameter(names = {"--user"}, description = "Hive username")
public String hiveUser;
@Parameter(names = {"--pass"}, description = "Hive password")
public String hivePass;
@Parameter(names = {"--jdbc-url"}, description = "Hive jdbc connect url")
public String jdbcUrl;
@Parameter(names = {"--use-pre-apache-input-format"},
description = "Use InputFormat under com.uber.hoodie package "
+ "instead of org.apache.hudi package. Use this when you are in the process of migrating from "
+ "com.uber.hoodie to org.apache.hudi. Stop using this once you have migrated the table definition to "
+ "org.apache.hudi input format.")
public Boolean usePreApacheInputFormat;
@Deprecated
@Parameter(names = {"--use-jdbc"}, description = "Hive jdbc connect url")
public Boolean useJdbc;
@Parameter(names = {"--metastore-uris"}, description = "Hive metastore uris")
public String metastoreUris;
@Parameter(names = {"--sync-mode"}, description = "Mode to choose for Hive ops. Valid values are hms,glue,jdbc and hiveql")
public String syncMode;
@Parameter(names = {"--auto-create-database"}, description = "Auto create hive database")
public Boolean autoCreateDatabase;
@Parameter(names = {"--ignore-exceptions"}, description = "Ignore hive exceptions")
public Boolean ignoreExceptions;
@Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
public Boolean skipROSuffix;
@Parameter(names = {"--table-properties"}, description = "Table properties to hive table")
public String tableProperties;
@Parameter(names = {"--serde-properties"}, description = "Serde properties to hive table")
public String serdeProperties;
@Parameter(names = {"--support-timestamp"}, description = "'INT64' with original type TIMESTAMP_MICROS is converted to hive 'timestamp' type."
+ "Disabled by default for backward compatibility.")
public Boolean supportTimestamp;
@Parameter(names = {"--managed-table"}, description = "Create a managed table")
public Boolean createManagedTable;
@Parameter(names = {"--omit-metafields"}, description = "Omit metafields in schema")
public Boolean omitMetaFields;
@Parameter(names = {"--batch-sync-num"}, description = "The number of partitions one batch when synchronous partitions to hive")
public Integer batchSyncNum;
@Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table.")
public Boolean syncAsSparkDataSourceTable;
@Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore.")
public Integer sparkSchemaLengthThreshold;
@Parameter(names = {"--bucket-sync"}, description = "use bucket sync")
public Boolean bucketSync;
@Parameter(names = {"--bucket-spec"}, description = "bucket spec stored in metastore")
public String bucketSpec;
@Parameter(names = {"--sync-comment"}, description = "synchronize table comments to hive")
public Boolean syncComment;
@Parameter(names = {"--sync-strategy"}, description = "Hive table synchronization strategy. Available option: ONLY_RO, ONLY_RT, ALL")
public Boolean syncStrategy;
public boolean isHelp() {
return hoodieSyncConfigParams.isHelp();
}
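/**
* Translates each non-null command line parameter into its corresponding
* {@code hoodie.datasource.hive_sync.*} property, on top of the base sync properties.
*/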
public TypedProperties toProps() {
final TypedProperties props = hoodieSyncConfigParams.toProps();
props.setPropertyIfNonNull(HIVE_USER.key(), hiveUser);
props.setPropertyIfNonNull(HIVE_PASS.key(), hivePass);
props.setPropertyIfNonNull(HIVE_URL.key(), jdbcUrl);
props.setPropertyIfNonNull(HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), usePreApacheInputFormat);
props.setPropertyIfNonNull(HIVE_USE_JDBC.key(), useJdbc);
props.setPropertyIfNonNull(HIVE_SYNC_MODE.key(), syncMode);
props.setPropertyIfNonNull(METASTORE_URIS.key(), metastoreUris);
props.setPropertyIfNonNull(HIVE_AUTO_CREATE_DATABASE.key(), autoCreateDatabase);
props.setPropertyIfNonNull(HIVE_IGNORE_EXCEPTIONS.key(), ignoreExceptions);
props.setPropertyIfNonNull(HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.key(), skipROSuffix);
props.setPropertyIfNonNull(HIVE_SUPPORT_TIMESTAMP_TYPE.key(), supportTimestamp);
props.setPropertyIfNonNull(HIVE_TABLE_PROPERTIES.key(), tableProperties);
props.setPropertyIfNonNull(HIVE_TABLE_SERDE_PROPERTIES.key(), serdeProperties);
props.setPropertyIfNonNull(HIVE_SYNC_AS_DATA_SOURCE_TABLE.key(), syncAsSparkDataSourceTable);
props.setPropertyIfNonNull(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key(), sparkSchemaLengthThreshold);
props.setPropertyIfNonNull(HIVE_CREATE_MANAGED_TABLE.key(), createManagedTable);
props.setPropertyIfNonNull(HIVE_SYNC_OMIT_METADATA_FIELDS.key(), omitMetaFields);
props.setPropertyIfNonNull(HIVE_BATCH_SYNC_PARTITION_NUM.key(), batchSyncNum);
props.setPropertyIfNonNull(HIVE_SYNC_BUCKET_SYNC.key(), bucketSync);
props.setPropertyIfNonNull(HIVE_SYNC_BUCKET_SYNC_SPEC.key(), bucketSpec);
props.setPropertyIfNonNull(HIVE_SYNC_COMMENT.key(), syncComment);
props.setPropertyIfNonNull(HIVE_SYNC_TABLE_STRATEGY.key(), syncStrategy);
return props;
}
}
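/**
* Validates the resolved config; currently checks that the partition batch size
* ({@link #HIVE_BATCH_SYNC_PARTITION_NUM}) is greater than zero.
*/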
public void validateParameters() {
ValidationUtils.checkArgument(getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM) > 0, "batch-sync-num for syncing the Hive table must be greater than 0, please check your parameters");
}
}