// blob: 8f7022d42af5116ca995e306a782991bafac3d51
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.cfg;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.util.Properties;
/**
 * Doris sink execution options.
 *
 * <p>Instances are normally created through {@link #builder()}, {@link #builderDefaults()} or
 * {@link #defaults()}. All settings are fixed at construction time except the 2PC flag, which can
 * still be changed through {@link #setEnable2PC(Boolean)}.
 */
public class DorisExecutionOptions implements Serializable {

    private static final long serialVersionUID = 1L;

    public static final int DEFAULT_CHECK_INTERVAL = 10000;
    public static final int DEFAULT_MAX_RETRY_TIMES = 1;
    private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
    private static final int DEFAULT_BUFFER_COUNT = 3;

    // Defaults for batch flush mode.
    private static final int DEFAULT_FLUSH_QUEUE_SIZE = 2;
    private static final int DEFAULT_BUFFER_FLUSH_MAX_ROWS = 50000;
    private static final int DEFAULT_BUFFER_FLUSH_MAX_BYTES = 10 * 1024 * 1024;
    private static final long DEFAULT_BUFFER_FLUSH_INTERVAL_MS = 10 * 1000;

    private final int checkInterval;
    private final int maxRetries;
    private final int bufferSize;
    private final int bufferCount;
    private final String labelPrefix;
    private final boolean useCache;

    /** Properties for the StreamLoad. */
    private final Properties streamLoadProp;

    private final Boolean enableDelete;

    // Not final: may be changed after construction through setEnable2PC.
    private Boolean enable2PC;
    private final boolean force2PC;

    // Batch mode parameters.
    private final int flushQueueSize;
    private final int bufferFlushMaxRows;
    private final int bufferFlushMaxBytes;
    private final long bufferFlushIntervalMs;
    private final boolean enableBatchMode;
    private final boolean ignoreUpdateBefore;

    /**
     * Creates the options from explicit values. Only {@code maxRetries} is validated here; all
     * other arguments are stored as given.
     *
     * @throws IllegalArgumentException if {@code maxRetries} is negative
     */
    public DorisExecutionOptions(int checkInterval,
                                 int maxRetries,
                                 int bufferSize,
                                 int bufferCount,
                                 String labelPrefix,
                                 boolean useCache,
                                 Properties streamLoadProp,
                                 Boolean enableDelete,
                                 Boolean enable2PC,
                                 boolean enableBatchMode,
                                 int flushQueueSize,
                                 int bufferFlushMaxRows,
                                 int bufferFlushMaxBytes,
                                 long bufferFlushIntervalMs,
                                 boolean ignoreUpdateBefore,
                                 boolean force2PC) {
        Preconditions.checkArgument(maxRetries >= 0,
                "maxRetries must be greater than or equal to 0, but was " + maxRetries);
        this.checkInterval = checkInterval;
        this.maxRetries = maxRetries;
        this.bufferSize = bufferSize;
        this.bufferCount = bufferCount;
        this.labelPrefix = labelPrefix;
        this.useCache = useCache;
        this.streamLoadProp = streamLoadProp;
        this.enableDelete = enableDelete;
        this.enable2PC = enable2PC;
        this.force2PC = force2PC;
        this.enableBatchMode = enableBatchMode;
        this.flushQueueSize = flushQueueSize;
        this.bufferFlushMaxRows = bufferFlushMaxRows;
        this.bufferFlushMaxBytes = bufferFlushMaxBytes;
        this.bufferFlushIntervalMs = bufferFlushIntervalMs;
        this.ignoreUpdateBefore = ignoreUpdateBefore;
    }

    /** Returns a builder whose stream load properties start out empty. */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Returns a builder pre-populated with the default stream load properties
     * ({@code format=json}, {@code read_json_by_line=true}).
     */
    public static Builder builderDefaults() {
        return new Builder().setStreamLoadProp(defaultStreamLoadProp());
    }

    /** Returns options built from {@link #builderDefaults()} without further changes. */
    public static DorisExecutionOptions defaults() {
        // Delegate so that both entry points always agree on the default properties.
        return builderDefaults().build();
    }

    /** Creates a fresh, independent set of default stream load properties. */
    private static Properties defaultStreamLoadProp() {
        Properties properties = new Properties();
        properties.setProperty("format", "json");
        properties.setProperty("read_json_by_line", "true");
        return properties;
    }

    public Integer checkInterval() {
        return checkInterval;
    }

    public Integer getMaxRetries() {
        return maxRetries;
    }

    public static long getSerialVersionUID() {
        return serialVersionUID;
    }

    public int getBufferSize() {
        return bufferSize;
    }

    public int getBufferCount() {
        return bufferCount;
    }

    public String getLabelPrefix() {
        return labelPrefix;
    }

    public boolean isUseCache() {
        return useCache;
    }

    /** Returns the stored stream load properties instance itself (no copy is made). */
    public Properties getStreamLoadProp() {
        return streamLoadProp;
    }

    public Boolean getDeletable() {
        return enableDelete;
    }

    public Boolean enabled2PC() {
        return enable2PC;
    }

    public int getFlushQueueSize() {
        return flushQueueSize;
    }

    public int getBufferFlushMaxRows() {
        return bufferFlushMaxRows;
    }

    public int getBufferFlushMaxBytes() {
        return bufferFlushMaxBytes;
    }

    public long getBufferFlushIntervalMs() {
        return bufferFlushIntervalMs;
    }

    public boolean enableBatchMode() {
        return enableBatchMode;
    }

    public boolean getIgnoreUpdateBefore() {
        return ignoreUpdateBefore;
    }

    /** Overrides the 2PC flag after construction. */
    public void setEnable2PC(Boolean enable2PC) {
        this.enable2PC = enable2PC;
    }

    /** Returns whether 2PC was explicitly requested through {@link Builder#enable2PC()}. */
    public boolean force2PC() {
        return force2PC;
    }

    /**
     * Builder of {@link DorisExecutionOptions}.
     */
    public static class Builder {
        private int checkInterval = DEFAULT_CHECK_INTERVAL;
        private int maxRetries = DEFAULT_MAX_RETRY_TIMES;
        private int bufferSize = DEFAULT_BUFFER_SIZE;
        private int bufferCount = DEFAULT_BUFFER_COUNT;
        private String labelPrefix = "";
        private boolean useCache = false;
        private Properties streamLoadProp = new Properties();
        private boolean enableDelete = true;
        private boolean enable2PC = true;

        // Flag used to decide whether 2PC is forcibly enabled. By default, the unique model
        // disables 2PC.
        private boolean force2PC = false;

        private int flushQueueSize = DEFAULT_FLUSH_QUEUE_SIZE;
        private int bufferFlushMaxRows = DEFAULT_BUFFER_FLUSH_MAX_ROWS;
        private int bufferFlushMaxBytes = DEFAULT_BUFFER_FLUSH_MAX_BYTES;
        private long bufferFlushIntervalMs = DEFAULT_BUFFER_FLUSH_INTERVAL_MS;
        private boolean enableBatchMode = false;
        private boolean ignoreUpdateBefore = true;

        /**
         * @throws NullPointerException if {@code checkInterval} is null
         */
        public Builder setCheckInterval(Integer checkInterval) {
            // The boxed argument is unboxed into a primitive field; fail with a clear message.
            this.checkInterval = Preconditions.checkNotNull(checkInterval, "checkInterval must not be null");
            return this;
        }

        /**
         * The non-negative check on the value happens in {@link #build()}.
         *
         * @throws NullPointerException if {@code maxRetries} is null
         */
        public Builder setMaxRetries(Integer maxRetries) {
            this.maxRetries = Preconditions.checkNotNull(maxRetries, "maxRetries must not be null");
            return this;
        }

        public Builder setBufferSize(int bufferSize) {
            this.bufferSize = bufferSize;
            return this;
        }

        public Builder setBufferCount(int bufferCount) {
            this.bufferCount = bufferCount;
            return this;
        }

        public Builder setLabelPrefix(String labelPrefix) {
            this.labelPrefix = labelPrefix;
            return this;
        }

        public Builder setUseCache(boolean useCache) {
            this.useCache = useCache;
            return this;
        }

        /** Replaces the stream load properties; the given instance is stored without copying. */
        public Builder setStreamLoadProp(Properties streamLoadProp) {
            this.streamLoadProp = streamLoadProp;
            return this;
        }

        /**
         * @throws NullPointerException if {@code enableDelete} is null
         */
        public Builder setDeletable(Boolean enableDelete) {
            this.enableDelete = Preconditions.checkNotNull(enableDelete, "enableDelete must not be null");
            return this;
        }

        /** Turns 2PC off. The force flag is left untouched. */
        public Builder disable2PC() {
            this.enable2PC = false;
            return this;
        }

        /** Turns 2PC on and marks it as forced. */
        public Builder enable2PC() {
            this.enable2PC = true;
            // Force open 2PC.
            this.force2PC = true;
            return this;
        }

        public Builder enableBatchMode() {
            this.enableBatchMode = true;
            return this;
        }

        public Builder setFlushQueueSize(int flushQueueSize) {
            this.flushQueueSize = flushQueueSize;
            return this;
        }

        /**
         * @throws IllegalStateException if {@code bufferFlushIntervalMs} is below 1000
         */
        public Builder setBufferFlushIntervalMs(long bufferFlushIntervalMs) {
            Preconditions.checkState(bufferFlushIntervalMs >= 1000, "bufferFlushIntervalMs must be greater than or equal to 1 second");
            this.bufferFlushIntervalMs = bufferFlushIntervalMs;
            return this;
        }

        public Builder setBufferFlushMaxRows(int bufferFlushMaxRows) {
            this.bufferFlushMaxRows = bufferFlushMaxRows;
            return this;
        }

        public Builder setBufferFlushMaxBytes(int bufferFlushMaxBytes) {
            this.bufferFlushMaxBytes = bufferFlushMaxBytes;
            return this;
        }

        public Builder setIgnoreUpdateBefore(boolean ignoreUpdateBefore) {
            this.ignoreUpdateBefore = ignoreUpdateBefore;
            return this;
        }

        /**
         * Creates the options from the current builder state.
         *
         * @throws IllegalArgumentException if the configured {@code maxRetries} is negative
         */
        public DorisExecutionOptions build() {
            return new DorisExecutionOptions(checkInterval, maxRetries, bufferSize, bufferCount, labelPrefix, useCache,
                    streamLoadProp, enableDelete, enable2PC, enableBatchMode, flushQueueSize, bufferFlushMaxRows,
                    bufferFlushMaxBytes, bufferFlushIntervalMs, ignoreUpdateBefore, force2PC);
        }
    }
}