blob: 84cf2c51ebdb918f11c1e84020e7f52946d1dab8 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.inlong.sort.base;
import org.apache.inlong.sort.base.sink.PartitionPolicy;
import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import java.util.Map;
import static org.apache.inlong.common.constant.Constants.METRICS_AUDIT_PROXY_HOSTS_KEY;
* connector base option constant
public final class Constants {
* constants for metrics
public static final String DIRTY_BYTES_OUT = "dirtyBytesOut";
public static final String DIRTY_RECORDS_OUT = "dirtyRecordsOut";
public static final String NUM_BYTES_OUT = "numBytesOut";
public static final String NUM_RECORDS_OUT = "numRecordsOut";
public static final String NUM_BYTES_OUT_FOR_METER = "numBytesOutForMeter";
public static final String NUM_RECORDS_OUT_FOR_METER = "numRecordsOutForMeter";
public static final String NUM_BYTES_OUT_PER_SECOND = "numBytesOutPerSecond";
public static final String NUM_RECORDS_OUT_PER_SECOND = "numRecordsOutPerSecond";
public static final String NUM_RECORDS_IN = "numRecordsIn";
public static final String NUM_BYTES_IN = "numBytesIn";
public static final String NUM_RECORDS_IN_FOR_METER = "numRecordsInForMeter";
public static final String NUM_BYTES_IN_FOR_METER = "numBytesInForMeter";
public static final String NUM_BYTES_IN_PER_SECOND = "numBytesInPerSecond";
public static final String NUM_RECORDS_IN_PER_SECOND = "numRecordsInPerSecond";
* Timestamp when the read phase changed
public static final String READ_PHASE_TIMESTAMP = "readPhaseTimestamp";
* Time span in seconds
public static final Integer TIME_SPAN_IN_SECONDS = 60;
* Stream id used in inlong metric
public static final String STREAM_ID = "streamId";
* Group id used in inlong metric
public static final String GROUP_ID = "groupId";
* Node id used in inlong metric
public static final String NODE_ID = "nodeId";
// sort received successfully
public static final String AUDIT_SORT_INPUT = "7";
// sort send successfully
public static final Integer AUDIT_SORT_OUTPUT = 8;
* Database Name used in inlong metric
public static final String DATABASE_NAME = "database";
* Table Name used in inlong metric
public static final String TABLE_NAME = "table";
* Collection Name used in inlong metric
public static final String COLLECTION_NAME = "collection";
* Read Phase used in inlong metric
public static final String READ_PHASE = "readPhase";
* Schema Name used in inlong metric
public static final String SCHEMA_NAME = "schema";
* Topic Name used in inlong metric
public static final String TOPIC_NAME = "topic";
* It is used for 'inlong.metric.labels' or 'sink.dirty.labels'
public static final String DELIMITER = "&";
* It is used for metric data to build schema identify
public static final String SEMICOLON = ".";
* The caret symbol (^) at the start of a regular expression to indicate
* that a match must occur at the beginning of the searched text.
public static final String CARET = "^";
* The dollar symbol ($) at the end of a regular expression to indicate
* that a match must occur at the ending of the searched text.
public static final String DOLLAR = "$";
* It is used for metric data to spilt schema identify
public static final String SPILT_SEMICOLON = "\\.";
* The delimiter of key and value, it is used for 'inlong.metric.labels' or 'sink.dirty.labels'
public static final String KEY_VALUE_DELIMITER = "=";
public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states";
* It is used for jdbc url filter for avoiding url attack
* see also in
public static final String AUTO_DESERIALIZE = "autoDeserialize";
public static final String AUTO_DESERIALIZE_TRUE = "autoDeserialize=true";
public static final String AUTO_DESERIALIZE_FALSE = "autoDeserialize=false";
public static final String DDL_FIELD_NAME = "ddl";
public static final String DDL_OP_ALTER = "ALTER";
public static final String DDL_OP_DROP = "DROP";
public static final String GHOST_TAG = "/* gh-ost */";
public static final ConfigOption<String> INLONG_METRIC =
.withDescription("INLONG metric labels, format is 'key1=value1&key2=value2',"
+ "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'");
public static final ConfigOption<String> INLONG_AUDIT =
.withDescription("Audit proxy host address for reporting audit metrics. \n"
+ "e.g.,");
public static final ConfigOption<String> AUDIT_KEYS =
.withDescription("Audit keys for metrics collecting");
public static final ConfigOption<Boolean> IGNORE_ALL_CHANGELOG =
.withDescription("Regard upsert delete as insert kind.");
public static final ConfigOption<String> SINK_MULTIPLE_FORMAT =
"The format of multiple sink, it represents the real format of the raw binary data");
public static final ConfigOption<String> PATTERN_PARTITION_MAP =
"Pattern rules and partition map string, " +
"eg: databasePattern1&tablePattern1:partition1,"
+ "databasePattern2&tablePattern2:partition2,"
+ "DEFAULT_PARTITION:partition3");
public static final ConfigOption<Map<String, String>> DATASOURCE_PARTITION_MAP =
"Datasource and partition maps, eg: <datasource1,partition1>");
public static final ConfigOption<String> SINK_MULTIPLE_DATABASE_PATTERN =
.withDescription("The option 'sink.multiple.database-pattern' "
+ "is used extract database name from the raw binary data, "
+ "this is only used in the multiple sink writing scenario.");
public static final ConfigOption<Boolean> SOURCE_MULTIPLE_ENABLE =
.withDescription("Whether enable migrate multiple databases");
public static final ConfigOption<String> SINK_MULTIPLE_TABLE_PATTERN =
.withDescription("The option 'sink.multiple.table-pattern' "
+ "is used extract table name from the raw binary data, "
+ "this is only used in the multiple sink writing scenario.");
public static final ConfigOption<Boolean> SINK_MULTIPLE_ENABLE =
.withDescription("The option 'sink.multiple.enable' "
+ "is used to determine whether to support multiple sink writing, default is 'false'.");
public static final ConfigOption<SchemaUpdateExceptionPolicy> SINK_MULTIPLE_SCHEMA_UPDATE_POLICY =
.withDescription("The action to deal with schema update in multiple sink.");
public static final ConfigOption<Boolean> SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS =
.withDescription("Whether ignore the single table erros when multiple sink writing scenario.");
public static final ConfigOption<Boolean> SINK_MULTIPLE_PK_AUTO_GENERATED =
.withDescription("Whether generated pk fields as whole data when source table does not have a "
+ "primary key.");
public static final ConfigOption<Boolean> SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK =
.withDescription("Because spark do not support iceberg data type: `timestamp without time zone` and"
+ "`time`, so type conversions must be mapped to types supported by spark.");
public static final ConfigOption<PartitionPolicy> SINK_PARTITION_POLICY =
.withDescription("The policy of partitioning table.");
public static final ConfigOption<String> SOURCE_PARTITION_FIELD_NAME =
"The field name in source generic raw data, partition table by the field value dynamically."
+ "Support regex expressions to match many fields in source generic raw data.");
// ========================================= dirty configuration =========================================
public static final String DIRTY_PREFIX = "dirty.";
public static final ConfigOption<Boolean> DIRTY_IGNORE =
.withDescription("Whether ignore the dirty data, default value is 'false'");
public static final ConfigOption<String> DIRTY_IDENTIFIER =
"The identifier of dirty data, it will be used for filename generation of file dirty sink, "
+ "topic generation of mq dirty sink, tablename generation of database, etc."
+ "and it supports variable replace like '${variable}'."
+ "There are several system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] "
+ "are currently supported, "
+ "and the support of other variables is determined by the connector.");
public static final ConfigOption<Boolean> DIRTY_SIDE_OUTPUT_ENABLE =
.withDescription("Whether supports dirty data side-output, default value is 'false'");
public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_CONNECTOR =
.withDescription("The connector of dirty side-output");
public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_FORMAT =
"The format of dirty side-output, only support [csv|json] for now and default value is 'csv'");
public static final ConfigOption<Boolean> DIRTY_SIDE_OUTPUT_IGNORE_ERRORS =
.withDescription("Whether ignore the dirty side-output erros, default value is 'true'.");
public static final ConfigOption<Boolean> DIRTY_SIDE_OUTPUT_LOG_ENABLE =
.withDescription("Whether enable log print, default value is 'true'.");
public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_LABELS =
"The labels of dirty side-output, format is 'key1=value1&key2=value2', "
+ "it supports variable replace like '${variable}',"
+ "There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] "
+ "are currently supported,"
+ " and the support of other variables is determined by the connector.");
public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_LOG_TAG =
"The log tag of dirty side-output, it supports variable replace like '${variable}'."
+ "There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported,"
+ " and the support of other variables is determined by the connector.");
public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_FIELD_DELIMITER =
.withDescription("The field-delimiter of dirty side-output");
public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_LINE_DELIMITER =
.withDescription("The line-delimiter of dirty sink");
public static final ConfigOption<Integer> DIRTY_SIDE_OUTPUT_BATCH_SIZE = ConfigOptions
"The flush max size, over this number of records, will flush data. The default value is 100.");
public static final ConfigOption<Integer> DIRTY_SIDE_OUTPUT_RETRIES = ConfigOptions
.withDescription("The retry times if writing records failed.");
public static final ConfigOption<Long> DIRTY_SIDE_OUTPUT_BATCH_INTERVAL = ConfigOptions
"The flush interval mills, over this time, "
+ "asynchronous threads will flush data. The default value is 60s.");
public static final ConfigOption<Long> DIRTY_SIDE_OUTPUT_BATCH_BYTES = ConfigOptions
"The flush max bytes, over this number in batch, will flush data. The default value is 10KB.");
public static final ConfigOption<Boolean> GH_OST_DDL_CHANGE = ConfigOptions
"Whether parse ddl changes of gh-ost, default value is 'false'.");
public static final ConfigOption<String> GH_OST_TABLE_REGEX = ConfigOptions
"Matcher the original table name from the ddl of gh-ost.");
public static final ConfigOption<Boolean> SINK_SCHEMA_CHANGE_ENABLE =
.withDescription("Whether supports schema-change, default value is 'false'");
public static final ConfigOption<String> SINK_SCHEMA_CHANGE_POLICIES =
.withDescription("The policies of schema-change, format is 'key1=value1&key2=value2', "
+ "the key is the type of schema-change and the value is the support policy of schema-change");