blob: b6320da8f704dbae7e54c5a4adc21155c25ac330 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.iotdb.config;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.io.Serializable;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
@Setter
@Getter
@ToString
public class SinkConfig extends CommonConfig {
public static final String BATCH_SIZE = "batch_size";
public static final String BATCH_INTERVAL_MS = "batch_interval_ms";
public static final String MAX_RETRIES = "max_retries";
public static final String RETRY_BACKOFF_MULTIPLIER_MS = "retry_backoff_multiplier_ms";
public static final String MAX_RETRY_BACKOFF_MS = "max_retry_backoff_ms";
public static final String DEFAULT_THRIFT_BUFFER_SIZE = "default_thrift_buffer_size";
public static final String MAX_THRIFT_FRAME_SIZE = "max_thrift_frame_size";
public static final String ZONE_ID = "zone_id";
public static final String ENABLE_RPC_COMPRESSION = "enable_rpc_compression";
public static final String CONNECTION_TIMEOUT_IN_MS = "connection_timeout_in_ms";
public static final String TIMESERIES_OPTIONS = "timeseries_options";
public static final String TIMESERIES_OPTION_PATH = "path";
public static final String TIMESERIES_OPTION_DATA_TYPE = "data_type";
private static final int DEFAULT_BATCH_SIZE = 1024;
private int batchSize = DEFAULT_BATCH_SIZE;
private Integer batchIntervalMs;
private int maxRetries;
private int retryBackoffMultiplierMs;
private int maxRetryBackoffMs;
private Integer thriftDefaultBufferSize;
private Integer thriftMaxFrameSize;
private ZoneId zoneId;
private Boolean enableRPCCompression;
private Integer connectionTimeoutInMs;
private List<TimeseriesOption> timeseriesOptions;
public SinkConfig(@NonNull List<String> nodeUrls,
@NonNull String username,
@NonNull String password) {
super(nodeUrls, username, password);
}
public static SinkConfig loadConfig(Config pluginConfig) {
SinkConfig sinkConfig = new SinkConfig(
pluginConfig.getStringList(NODE_URLS),
pluginConfig.getString(USERNAME),
pluginConfig.getString(PASSWORD));
if (pluginConfig.hasPath(BATCH_SIZE)) {
int batchSize = checkIntArgument(pluginConfig.getInt(BATCH_SIZE));
sinkConfig.setBatchSize(batchSize);
}
if (pluginConfig.hasPath(BATCH_INTERVAL_MS)) {
int batchIntervalMs = checkIntArgument(pluginConfig.getInt(BATCH_INTERVAL_MS));
sinkConfig.setBatchIntervalMs(batchIntervalMs);
}
if (pluginConfig.hasPath(MAX_RETRIES)) {
int maxRetries = checkIntArgument(pluginConfig.getInt(MAX_RETRIES));
sinkConfig.setMaxRetries(maxRetries);
}
if (pluginConfig.hasPath(RETRY_BACKOFF_MULTIPLIER_MS)) {
int retryBackoffMultiplierMs = checkIntArgument(pluginConfig.getInt(RETRY_BACKOFF_MULTIPLIER_MS));
sinkConfig.setRetryBackoffMultiplierMs(retryBackoffMultiplierMs);
}
if (pluginConfig.hasPath(MAX_RETRY_BACKOFF_MS)) {
int maxRetryBackoffMs = checkIntArgument(pluginConfig.getInt(MAX_RETRY_BACKOFF_MS));
sinkConfig.setMaxRetryBackoffMs(maxRetryBackoffMs);
}
if (pluginConfig.hasPath(DEFAULT_THRIFT_BUFFER_SIZE)) {
int thriftDefaultBufferSize = checkIntArgument(pluginConfig.getInt(DEFAULT_THRIFT_BUFFER_SIZE));
sinkConfig.setThriftDefaultBufferSize(thriftDefaultBufferSize);
}
if (pluginConfig.hasPath(MAX_THRIFT_FRAME_SIZE)) {
int thriftMaxFrameSize = checkIntArgument(pluginConfig.getInt(MAX_THRIFT_FRAME_SIZE));
sinkConfig.setThriftMaxFrameSize(thriftMaxFrameSize);
}
if (pluginConfig.hasPath(ZONE_ID)) {
sinkConfig.setZoneId(ZoneId.of(pluginConfig.getString(ZONE_ID)));
}
if (pluginConfig.hasPath(ENABLE_RPC_COMPRESSION)) {
sinkConfig.setEnableRPCCompression(pluginConfig.getBoolean(ENABLE_RPC_COMPRESSION));
}
if (pluginConfig.hasPath(CONNECTION_TIMEOUT_IN_MS)) {
int connectionTimeoutInMs = checkIntArgument(pluginConfig.getInt(CONNECTION_TIMEOUT_IN_MS));
checkNotNull(sinkConfig.getEnableRPCCompression());
sinkConfig.setConnectionTimeoutInMs(connectionTimeoutInMs);
}
if (pluginConfig.hasPath(TIMESERIES_OPTIONS)) {
List<? extends Config> timeseriesConfigs = pluginConfig.getConfigList(TIMESERIES_OPTIONS);
List<TimeseriesOption> timeseriesOptions = new ArrayList<>(timeseriesConfigs.size());
for (Config timeseriesConfig : timeseriesConfigs) {
String timeseriesPath = timeseriesConfig.getString(TIMESERIES_OPTION_PATH);
String timeseriesDataType = timeseriesConfig.getString(TIMESERIES_OPTION_DATA_TYPE);
TimeseriesOption timeseriesOption = new TimeseriesOption(
timeseriesPath, TSDataType.valueOf(timeseriesDataType));
timeseriesOptions.add(timeseriesOption);
}
sinkConfig.setTimeseriesOptions(timeseriesOptions);
}
return sinkConfig;
}
private static int checkIntArgument(int args) {
checkArgument(args > 0);
return args;
}
@Getter
@ToString
@AllArgsConstructor
public static class TimeseriesOption implements Serializable {
private String path;
private TSDataType dataType = TSDataType.TEXT;
}
}