blob: e3a19d52386d6fffcd6a3fe84ced6dad90f40439 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.flink.doris.sink;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.StringJoiner;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Flink {@link RichOutputFormat} that buffers records and writes them to Doris via {@link DorisStreamLoad}.
 *
 * <p>Incoming records must be either a {@link Row} or a pre-serialized {@link String}. A {@link Row} is
 * converted to:
 * <ul>
 *   <li>a field-name-to-value map, when the stream load property {@code format} is {@code json}; or</li>
 *   <li>a delimited text line, otherwise.</li>
 * </ul>
 *
 * <p>The buffer is flushed in three situations:
 * <ul>
 *   <li>when it reaches {@code batchSize} records (if {@code batchSize > 0});</li>
 *   <li>periodically every {@code batchIntervalMs} milliseconds, from a background thread (only when
 *       {@code batchIntervalMs > 0} and {@code batchSize != 1});</li>
 *   <li>on {@link #close()}.</li>
 * </ul>
 * Each flush attempts the load up to {@code maxRetries + 1} times.
 *
 * <p>{@link #writeRecord}, {@link #flush} and {@link #close} are {@code synchronized}, and the background
 * flush task synchronizes on this instance as well, so the buffer is never accessed concurrently.
 *
 * @param <T> record type; only {@link Row} and {@link String} are accepted at runtime
 **/
public class DorisOutputFormat<T> extends RichOutputFormat<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DorisOutputFormat.class);
    private static final long serialVersionUID = -4514164348993670086L;
    /** Base unit of the linear back-off between load retries. */
    private static final long DEFAULT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final String FIELD_DELIMITER_KEY = "column_separator";
    private static final String FIELD_DELIMITER_DEFAULT = "\t";
    private static final String LINE_DELIMITER_KEY = "line_delimiter";
    private static final String LINE_DELIMITER_DEFAULT = "\n";
    private static final String FORMAT_KEY = "format";
    private static final String FORMAT_JSON_VALUE = "json";
    /** Marker written for SQL NULL in the delimited (non-JSON) format. */
    private static final String NULL_VALUE = "\\N";
    private static final String ESCAPE_DELIMITERS_KEY = "escape_delimiters";
    private static final String ESCAPE_DELIMITERS_DEFAULT = "false";
    /** Matches a {@code \xHH} escape; HH is a two-digit hexadecimal character code. */
    private static final Pattern DELIMITER_PATTERN = Pattern.compile("\\\\x([0-9a-fA-F]{2})");

    private final String[] fieldNames;
    private final boolean jsonFormat;
    private final int batchSize;
    private final int maxRetries;
    private final long batchIntervalMs;
    /** Buffered records: Strings (delimited lines or raw input) or Maps (JSON rows). */
    private final List<Object> batch = new ArrayList<>();
    private String fieldDelimiter;
    private String lineDelimiter;
    private final DorisStreamLoad dorisStreamLoad;
    private transient ScheduledExecutorService scheduler;
    private transient ScheduledFuture<?> scheduledFuture;
    /** Failure raised by the background flush task; rethrown on the task thread by checkFlushException(). */
    private transient volatile Exception flushException;
    private transient volatile boolean closed = false;

    /**
     * Creates the output format and resolves the delimiters from the stream load properties.
     *
     * @param dorisStreamLoad stream load client; its properties may be modified (see {@link #parseDelimiter()})
     * @param fieldNames      column names, used as JSON keys when the {@code json} format is configured
     * @param batchSize       number of buffered records that triggers a flush; values {@code <= 0} disable
     *                        size-based flushing
     * @param batchIntervalMs period of the background flush in milliseconds; values {@code <= 0} disable it
     * @param maxRetries      number of retries after a failed load attempt
     */
    public DorisOutputFormat(DorisStreamLoad dorisStreamLoad,
                             String[] fieldNames,
                             int batchSize, long batchIntervalMs, int maxRetries) {
        this.dorisStreamLoad = dorisStreamLoad;
        parseDelimiter();
        this.fieldNames = fieldNames;
        this.batchSize = batchSize;
        this.batchIntervalMs = batchIntervalMs;
        this.maxRetries = maxRetries;
        this.jsonFormat = FORMAT_JSON_VALUE.equals(dorisStreamLoad.getStreamLoadProp().getProperty(FORMAT_KEY));
    }

    /**
     * Resolves the column and line delimiters from the stream load properties.
     *
     * <p>When {@code escape_delimiters} is {@code true}, {@code \xHH} escapes in the configured delimiters
     * are decoded to the corresponding character, and the {@code escape_delimiters} key is removed from the
     * (shared) stream load properties. Otherwise the delimiters are used verbatim.
     */
    private void parseDelimiter() {
        Properties streamLoadProp = dorisStreamLoad.getStreamLoadProp();
        boolean ifEscape = Boolean.parseBoolean(streamLoadProp.getProperty(ESCAPE_DELIMITERS_KEY, ESCAPE_DELIMITERS_DEFAULT));
        if (ifEscape) {
            this.fieldDelimiter = escapeString(streamLoadProp.getProperty(FIELD_DELIMITER_KEY,
                    FIELD_DELIMITER_DEFAULT));
            this.lineDelimiter = escapeString(streamLoadProp.getProperty(LINE_DELIMITER_KEY,
                    LINE_DELIMITER_DEFAULT));
            // Properties.contains(Object) searches VALUES, not keys, so the old guard never matched.
            // remove() is already a no-op when the key is absent.
            streamLoadProp.remove(ESCAPE_DELIMITERS_KEY);
        } else {
            this.fieldDelimiter = streamLoadProp.getProperty(FIELD_DELIMITER_KEY,
                    FIELD_DELIMITER_DEFAULT);
            this.lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY,
                    LINE_DELIMITER_DEFAULT);
        }
    }

    /**
     * Replaces every {@code \xHH} escape in {@code s} with the character whose code is the hexadecimal
     * value {@code HH}; all other text is copied unchanged.
     *
     * @param s delimiter as configured
     * @return the delimiter with hex escapes decoded
     */
    private String escapeString(String s) {
        Matcher m = DELIMITER_PATTERN.matcher(s);
        StringBuffer buf = new StringBuffer();
        while (m.find()) {
            // Doris interprets "\x.." delimiters as hexadecimal, so decode with radix 16 to match the server.
            char decoded = (char) Integer.parseInt(m.group(1), 16);
            // quoteReplacement: a decoded '$' or '\' would otherwise be treated as replacement syntax.
            m.appendReplacement(buf, Matcher.quoteReplacement(String.valueOf(decoded)));
        }
        m.appendTail(buf);
        return buf.toString();
    }

    @Override
    public void configure(Configuration configuration) {
        // No configuration needed: everything is supplied through the constructor.
    }

    /**
     * Starts the background flush task when periodic flushing is enabled
     * ({@code batchIntervalMs > 0} and {@code batchSize != 1}).
     */
    @Override
    public void open(int taskNumber, int numTasks) {
        if (batchIntervalMs > 0 && batchSize != 1) {
            this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("doris-streamload-outputformat"));
            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
                synchronized (DorisOutputFormat.this) {
                    if (!closed) {
                        try {
                            flush();
                        } catch (Exception e) {
                            // Cannot throw from the scheduler thread; surface it on the next write/flush/close.
                            flushException = e;
                        }
                    }
                }
            }, batchIntervalMs, batchIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Rethrows a failure recorded by the background flush task.
     *
     * @throws RuntimeException wrapping the recorded failure, if any
     */
    private void checkFlushException() {
        if (flushException != null) {
            throw new RuntimeException("Writing records to streamload failed.", flushException);
        }
    }

    /**
     * Buffers one record and flushes when the buffer reaches {@code batchSize} (if {@code batchSize > 0}).
     *
     * @throws IOException      if the triggered flush fails after all retries
     * @throws RuntimeException if a background flush previously failed or the record type is unsupported
     */
    @Override
    public synchronized void writeRecord(T row) throws IOException {
        checkFlushException();
        addBatch(row);
        if (batchSize > 0 && batch.size() >= batchSize) {
            flush();
        }
    }

    /**
     * Converts {@code row} to its buffered form and appends it to the batch.
     *
     * <p>A {@link Row} becomes a field-name-to-value map (JSON format; nulls kept as {@code null}) or a
     * line joined with the column delimiter (nulls written as {@code \N}). A {@link String} is buffered as is.
     *
     * @throws RuntimeException if {@code row} is neither a {@link Row} nor a {@link String}
     */
    private void addBatch(T row) {
        if (row instanceof Row) {
            Row rowData = (Row) row;
            Map<String, String> valueMap = Maps.newHashMap();
            StringJoiner value = new StringJoiner(this.fieldDelimiter);
            for (int i = 0; i < rowData.getArity(); ++i) {
                Object field = rowData.getField(i);
                if (jsonFormat) {
                    String data = field != null ? field.toString() : null;
                    valueMap.put(this.fieldNames[i], data);
                } else {
                    String data = field != null ? field.toString() : NULL_VALUE;
                    value.add(data);
                }
            }
            Object data = jsonFormat ? valueMap : value.toString();
            batch.add(data);
        } else if (row instanceof String) {
            batch.add(row);
        } else {
            throw new RuntimeException("The type of element should be 'Row' or 'String' only.");
        }
    }

    /**
     * Stops the background flush task (if any) and flushes the remaining buffered records.
     * Subsequent calls only rethrow a recorded background failure.
     *
     * @throws RuntimeException if the final flush fails or a background flush previously failed
     */
    @Override
    public synchronized void close() {
        if (!closed) {
            closed = true;
            if (this.scheduledFuture != null) {
                scheduledFuture.cancel(false);
                this.scheduler.shutdown();
            }
            try {
                flush();
            } catch (Exception e) {
                throw new RuntimeException("Writing records to doris failed.", e);
            }
        }
        checkFlushException();
    }

    /**
     * Sends all buffered records to Doris in a single stream load and clears the buffer on success.
     *
     * <p>The payload is a JSON array in the {@code json} format, otherwise the buffered lines joined with
     * the line delimiter. The load is attempted up to {@code maxRetries + 1} times. Before retry {@code i}
     * the thread sleeps {@code i * DEFAULT_INTERVAL_MS}, so the first retry is immediate. On failure the
     * buffer is kept, so a later flush re-sends it.
     *
     * @throws IOException      if every attempt fails, or the thread is interrupted while backing off
     * @throws RuntimeException if a background flush previously failed
     */
    public synchronized void flush() throws IOException {
        checkFlushException();
        if (batch.isEmpty()) {
            return;
        }
        String result;
        if (jsonFormat) {
            if (batch.get(0) instanceof String) {
                // Raw String records: ArrayList.toString() renders "[a, b, ...]", i.e. a JSON array
                // provided each element is itself a JSON document.
                result = batch.toString();
            } else {
                result = OBJECT_MAPPER.writeValueAsString(batch);
            }
        } else {
            result = String.join(this.lineDelimiter, batch.toArray(new CharSequence[batch.size()]));
        }
        for (int i = 0; i <= maxRetries; i++) {
            try {
                dorisStreamLoad.load(result);
                batch.clear();
                break;
            } catch (Exception e) {
                LOGGER.error("doris sink error, retry times = {}", i, e);
                if (i >= maxRetries) {
                    throw new IOException(e);
                }
                try {
                    Thread.sleep(DEFAULT_INTERVAL_MS * i);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new IOException("unable to flush; interrupted while doing another attempt", e);
                }
            }
        }
    }
}