blob: 84cf2c51ebdb918f11c1e84020e7f52946d1dab8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.inlong.sort.base;
import org.apache.inlong.sort.base.sink.PartitionPolicy;
import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import java.util.Map;
import static org.apache.inlong.common.constant.Constants.METRICS_AUDIT_PROXY_HOSTS_KEY;
/**
* connector base option constant
*/
public final class Constants {
/**
* constants for metrics
*/
public static final String DIRTY_BYTES_OUT = "dirtyBytesOut";
public static final String DIRTY_RECORDS_OUT = "dirtyRecordsOut";
public static final String NUM_BYTES_OUT = "numBytesOut";
public static final String NUM_RECORDS_OUT = "numRecordsOut";
public static final String NUM_BYTES_OUT_FOR_METER = "numBytesOutForMeter";
public static final String NUM_RECORDS_OUT_FOR_METER = "numRecordsOutForMeter";
public static final String NUM_BYTES_OUT_PER_SECOND = "numBytesOutPerSecond";
public static final String NUM_RECORDS_OUT_PER_SECOND = "numRecordsOutPerSecond";
public static final String NUM_RECORDS_IN = "numRecordsIn";
public static final String NUM_BYTES_IN = "numBytesIn";
public static final String NUM_RECORDS_IN_FOR_METER = "numRecordsInForMeter";
public static final String NUM_BYTES_IN_FOR_METER = "numBytesInForMeter";
public static final String NUM_BYTES_IN_PER_SECOND = "numBytesInPerSecond";
public static final String NUM_RECORDS_IN_PER_SECOND = "numRecordsInPerSecond";
/**
* Timestamp when the read phase changed
*/
public static final String READ_PHASE_TIMESTAMP = "readPhaseTimestamp";
/**
* Time span in seconds
*/
public static final Integer TIME_SPAN_IN_SECONDS = 60;
/**
* Stream id used in inlong metric
*/
public static final String STREAM_ID = "streamId";
/**
* Group id used in inlong metric
*/
public static final String GROUP_ID = "groupId";
/**
* Node id used in inlong metric
*/
public static final String NODE_ID = "nodeId";
// sort received successfully
public static final String AUDIT_SORT_INPUT = "7";
// sort send successfully
public static final Integer AUDIT_SORT_OUTPUT = 8;
/**
* Database Name used in inlong metric
*/
public static final String DATABASE_NAME = "database";
/**
* Table Name used in inlong metric
*/
public static final String TABLE_NAME = "table";
/**
* Collection Name used in inlong metric
*/
public static final String COLLECTION_NAME = "collection";
/**
* Read Phase used in inlong metric
*/
public static final String READ_PHASE = "readPhase";
/**
* Schema Name used in inlong metric
*/
public static final String SCHEMA_NAME = "schema";
/**
* Topic Name used in inlong metric
*/
public static final String TOPIC_NAME = "topic";
/**
* It is used for 'inlong.metric.labels' or 'sink.dirty.labels'
*/
public static final String DELIMITER = "&";
/**
* It is used for metric data to build schema identify
*/
public static final String SEMICOLON = ".";
/**
* The caret symbol (^) at the start of a regular expression to indicate
* that a match must occur at the beginning of the searched text.
*/
public static final String CARET = "^";
/**
* The dollar symbol ($) at the end of a regular expression to indicate
* that a match must occur at the ending of the searched text.
*/
public static final String DOLLAR = "$";
/**
* It is used for metric data to spilt schema identify
*/
public static final String SPILT_SEMICOLON = "\\.";
/**
* The delimiter of key and value, it is used for 'inlong.metric.labels' or 'sink.dirty.labels'
*/
public static final String KEY_VALUE_DELIMITER = "=";
public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states";
/**
* It is used for jdbc url filter for avoiding url attack
* see also in https://su18.org/post/jdbc-connection-url-attack/
*/
public static final String AUTO_DESERIALIZE = "autoDeserialize";
public static final String AUTO_DESERIALIZE_TRUE = "autoDeserialize=true";
public static final String AUTO_DESERIALIZE_FALSE = "autoDeserialize=false";
public static final String DDL_FIELD_NAME = "ddl";
public static final String DDL_OP_ALTER = "ALTER";
public static final String DDL_OP_DROP = "DROP";
public static final String GHOST_TAG = "/* gh-ost */";
public static final ConfigOption<String> INLONG_METRIC =
ConfigOptions.key("inlong.metric.labels")
.stringType()
.noDefaultValue()
.withDescription("INLONG metric labels, format is 'key1=value1&key2=value2',"
+ "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'");
public static final ConfigOption<String> INLONG_AUDIT =
ConfigOptions.key(METRICS_AUDIT_PROXY_HOSTS_KEY)
.stringType()
.noDefaultValue()
.withDescription("Audit proxy host address for reporting audit metrics. \n"
+ "e.g. 127.0.0.1:10081,0.0.0.1:10081");
public static final ConfigOption<String> AUDIT_KEYS =
ConfigOptions.key("metrics.audit.key")
.stringType()
.defaultValue("")
.withDescription("Audit keys for metrics collecting");
public static final ConfigOption<Boolean> IGNORE_ALL_CHANGELOG =
ConfigOptions.key("sink.ignore.changelog")
.booleanType()
.defaultValue(false)
.withDescription("Regard upsert delete as insert kind.");
public static final ConfigOption<String> SINK_MULTIPLE_FORMAT =
ConfigOptions.key("sink.multiple.format")
.stringType()
.noDefaultValue()
.withDescription(
"The format of multiple sink, it represents the real format of the raw binary data");
public static final ConfigOption<String> PATTERN_PARTITION_MAP =
ConfigOptions.key("pattern.partition.map")
.stringType()
.noDefaultValue()
.withDescription(
"Pattern rules and partition map string, " +
"eg: databasePattern1&tablePattern1:partition1,"
+ "databasePattern2&tablePattern2:partition2,"
+ "DEFAULT_PARTITION:partition3");
public static final ConfigOption<Map<String, String>> DATASOURCE_PARTITION_MAP =
ConfigOptions.key("datasource.partition.map")
.mapType()
.noDefaultValue()
.withDescription(
"Datasource and partition maps, eg: <datasource1,partition1>");
public static final ConfigOption<String> SINK_MULTIPLE_DATABASE_PATTERN =
ConfigOptions.key("sink.multiple.database-pattern")
.stringType()
.noDefaultValue()
.withDescription("The option 'sink.multiple.database-pattern' "
+ "is used extract database name from the raw binary data, "
+ "this is only used in the multiple sink writing scenario.");
public static final ConfigOption<Boolean> SOURCE_MULTIPLE_ENABLE =
ConfigOptions.key("source.multiple.enable")
.booleanType()
.defaultValue(false)
.withDescription("Whether enable migrate multiple databases");
public static final ConfigOption<String> SINK_MULTIPLE_TABLE_PATTERN =
ConfigOptions.key("sink.multiple.table-pattern")
.stringType()
.noDefaultValue()
.withDescription("The option 'sink.multiple.table-pattern' "
+ "is used extract table name from the raw binary data, "
+ "this is only used in the multiple sink writing scenario.");
public static final ConfigOption<Boolean> SINK_MULTIPLE_ENABLE =
ConfigOptions.key("sink.multiple.enable")
.booleanType()
.defaultValue(false)
.withDescription("The option 'sink.multiple.enable' "
+ "is used to determine whether to support multiple sink writing, default is 'false'.");
public static final ConfigOption<SchemaUpdateExceptionPolicy> SINK_MULTIPLE_SCHEMA_UPDATE_POLICY =
ConfigOptions.key("sink.multiple.schema-update.policy")
.enumType(SchemaUpdateExceptionPolicy.class)
.defaultValue(SchemaUpdateExceptionPolicy.TRY_IT_BEST)
.withDescription("The action to deal with schema update in multiple sink.");
public static final ConfigOption<Boolean> SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS =
ConfigOptions.key("sink.multiple.ignore-single-table-errors")
.booleanType()
.defaultValue(true)
.withDescription("Whether ignore the single table erros when multiple sink writing scenario.");
public static final ConfigOption<Boolean> SINK_MULTIPLE_PK_AUTO_GENERATED =
ConfigOptions.key("sink.multiple.pk-auto-generated")
.booleanType()
.defaultValue(false)
.withDescription("Whether generated pk fields as whole data when source table does not have a "
+ "primary key.");
public static final ConfigOption<Boolean> SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK =
ConfigOptions.key("sink.multiple.typemap-compatible-with-spark")
.booleanType()
.defaultValue(false)
.withDescription("Because spark do not support iceberg data type: `timestamp without time zone` and"
+ "`time`, so type conversions must be mapped to types supported by spark.");
public static final ConfigOption<PartitionPolicy> SINK_PARTITION_POLICY =
ConfigOptions.key("sink.partition.policy")
.enumType(PartitionPolicy.class)
.defaultValue(PartitionPolicy.PROC_TIME)
.withDescription("The policy of partitioning table.");
public static final ConfigOption<String> SOURCE_PARTITION_FIELD_NAME =
ConfigOptions.key("source.partition.field.name")
.stringType()
.noDefaultValue()
.withDescription(
"The field name in source generic raw data, partition table by the field value dynamically."
+ "Support regex expressions to match many fields in source generic raw data.");
// ========================================= dirty configuration =========================================
public static final String DIRTY_PREFIX = "dirty.";
public static final ConfigOption<Boolean> DIRTY_IGNORE =
ConfigOptions.key("dirty.ignore")
.booleanType()
.defaultValue(false)
.withDescription("Whether ignore the dirty data, default value is 'false'");
public static final ConfigOption<String> DIRTY_IDENTIFIER =
ConfigOptions.key("dirty.identifier")
.stringType()
.noDefaultValue()
.withDescription(
"The identifier of dirty data, it will be used for filename generation of file dirty sink, "
+ "topic generation of mq dirty sink, tablename generation of database, etc."
+ "and it supports variable replace like '${variable}'."
+ "There are several system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] "
+ "are currently supported, "
+ "and the support of other variables is determined by the connector.");
public static final ConfigOption<Boolean> DIRTY_SIDE_OUTPUT_ENABLE =
ConfigOptions.key("dirty.side-output.enable")
.booleanType()
.defaultValue(false)
.withDescription("Whether supports dirty data side-output, default value is 'false'");
public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_CONNECTOR =
ConfigOptions.key("dirty.side-output.connector")
.stringType()
.noDefaultValue()
.withDescription("The connector of dirty side-output");
public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_FORMAT =
ConfigOptions.key("dirty.side-output.format")
.stringType()
.defaultValue("csv")
.withDescription(
"The format of dirty side-output, only support [csv|json] for now and default value is 'csv'");
public static final ConfigOption<Boolean> DIRTY_SIDE_OUTPUT_IGNORE_ERRORS =
ConfigOptions.key("dirty.side-output.ignore-errors")
.booleanType()
.defaultValue(true)
.withDescription("Whether ignore the dirty side-output erros, default value is 'true'.");
public static final ConfigOption<Boolean> DIRTY_SIDE_OUTPUT_LOG_ENABLE =
ConfigOptions.key("dirty.side-output.log.enable")
.booleanType()
.defaultValue(true)
.withDescription("Whether enable log print, default value is 'true'.");
public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_LABELS =
ConfigOptions.key("dirty.side-output.labels")
.stringType()
.noDefaultValue()
.withDescription(
"The labels of dirty side-output, format is 'key1=value1&key2=value2', "
+ "it supports variable replace like '${variable}',"
+ "There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] "
+ "are currently supported,"
+ " and the support of other variables is determined by the connector.");
public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_LOG_TAG =
ConfigOptions.key("dirty.side-output.log-tag")
.stringType()
.defaultValue("DirtyData")
.withDescription(
"The log tag of dirty side-output, it supports variable replace like '${variable}'."
+ "There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported,"
+ " and the support of other variables is determined by the connector.");
public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_FIELD_DELIMITER =
ConfigOptions.key("dirty.side-output.field-delimiter")
.stringType()
.defaultValue(",")
.withDescription("The field-delimiter of dirty side-output");
public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_LINE_DELIMITER =
ConfigOptions.key("dirty.side-output.line-delimiter")
.stringType()
.defaultValue("\n")
.withDescription("The line-delimiter of dirty sink");
public static final ConfigOption<Integer> DIRTY_SIDE_OUTPUT_BATCH_SIZE = ConfigOptions
.key("dirty.side-output.batch.size")
.intType()
.defaultValue(100)
.withDescription(
"The flush max size, over this number of records, will flush data. The default value is 100.");
public static final ConfigOption<Integer> DIRTY_SIDE_OUTPUT_RETRIES = ConfigOptions
.key("dirty.side-output.retries")
.intType()
.defaultValue(3)
.withDescription("The retry times if writing records failed.");
public static final ConfigOption<Long> DIRTY_SIDE_OUTPUT_BATCH_INTERVAL = ConfigOptions
.key("dirty.side-output.batch.interval")
.longType()
.defaultValue(60000L)
.withDescription(
"The flush interval mills, over this time, "
+ "asynchronous threads will flush data. The default value is 60s.");
public static final ConfigOption<Long> DIRTY_SIDE_OUTPUT_BATCH_BYTES = ConfigOptions
.key("dirty.side-output.batch.bytes")
.longType()
.defaultValue(10240L)
.withDescription(
"The flush max bytes, over this number in batch, will flush data. The default value is 10KB.");
public static final ConfigOption<Boolean> GH_OST_DDL_CHANGE = ConfigOptions
.key("gh-ost.ddl.change")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether parse ddl changes of gh-ost, default value is 'false'.");
public static final ConfigOption<String> GH_OST_TABLE_REGEX = ConfigOptions
.key("gh-ost.table.regex")
.stringType()
.defaultValue("^_(.*)_(gho|ghc|del)$")
.withDescription(
"Matcher the original table name from the ddl of gh-ost.");
public static final ConfigOption<Boolean> SINK_SCHEMA_CHANGE_ENABLE =
ConfigOptions.key("sink.schema-change.enable")
.booleanType()
.defaultValue(false)
.withDescription("Whether supports schema-change, default value is 'false'");
public static final ConfigOption<String> SINK_SCHEMA_CHANGE_POLICIES =
ConfigOptions.key("sink.schema-change.policies")
.stringType()
.noDefaultValue()
.withDescription("The policies of schema-change, format is 'key1=value1&key2=value2', "
+ "the key is the type of schema-change and the value is the support policy of schema-change");
}