/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.flink.connector.doris.internal;

import org.apache.streampark.common.enums.Semantic;
import org.apache.streampark.connector.doris.conf.DorisConfig;
import org.apache.streampark.flink.connector.doris.bean.DorisSinkBufferEntry;
import org.apache.streampark.flink.connector.doris.bean.LoadStatusFailedException;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
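
/**
 * Asynchronous Doris sink writer: buffers rows per "database.table" key and flushes each
 * buffer to Doris via stream load. A flush is triggered when a buffer exceeds the configured
 * row or byte thresholds, by the interval scheduler (AT_LEAST_ONCE only), or explicitly via
 * {@link #flush(String, boolean)}.
 */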
public class DorisSinkWriter implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(DorisSinkWriter.class);
private final DorisConfig dorisConfig;
private final DorisStreamLoader streamLoader;
private ScheduledExecutorService schedule;
private ScheduledFuture<?> scheduledFuture;
private transient Counter totalFlushBytes;
private transient Counter totalFlushRows;
private transient Counter totalFlushTime;
private transient Counter totalFlushTimeWithoutRetries;
private transient Counter totalFlushSucceededTimes;
private transient Counter totalFlushFailedTimes;
private final Map<String, DorisSinkBufferEntry> bufferMap = new ConcurrentHashMap<>();
private static final long FLUSH_QUEUE_POLL_TIMEOUT_MS = 3000L;
private volatile boolean closed = false;
private volatile boolean flushThreadAlive = false;
private volatile Throwable flushException;
final LinkedBlockingDeque<DorisSinkBufferEntry> flushQueue = new LinkedBlockingDeque<>(10);
private static final String COUNTER_TOTAL_FLUSH_BYTES = "totalFlushBytes";
private static final String COUNTER_TOTAL_FLUSH_ROWS = "totalFlushRows";
private static final String COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES = "totalFlushTimeNsWithoutRetries";
private static final String COUNTER_TOTAL_FLUSH_COST_TIME = "totalFlushTimeNs";
private static final String COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES = "totalFlushSucceededTimes";
private static final String COUNTER_TOTAL_FLUSH_FAILED_TIMES = "totalFlushFailedTimes";
private final Semantic semantic;
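
/** Creates a writer bound to the given config; the delivery semantic is derived from {@code dorisConfig.semantic()}. */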
public DorisSinkWriter(DorisConfig dorisConfig) {
this.streamLoader = new DorisStreamLoader(dorisConfig);
this.dorisConfig = dorisConfig;
semantic = Semantic.of(dorisConfig.semantic());
}
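
/** Registers the flush metrics (bytes, rows, cost time, success and failure counts) on the task's metric group. */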
public void setRuntimeContext(RuntimeContext runtimeCtx) {
totalFlushBytes = runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_BYTES);
totalFlushRows = runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_ROWS);
totalFlushTime = runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME);
totalFlushTimeWithoutRetries = runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES);
totalFlushSucceededTimes = runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES);
totalFlushFailedTimes = runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_FAILED_TIMES);
}
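
/**
 * Starts the daemon flush thread that drains {@code flushQueue} and performs the actual
 * stream loads; it must be running before buffers are offered via {@code flush}.
 */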
public void startAsyncFlushing() {
final Thread flushThread = new Thread(() -> {
while (true) {
try {
if (!asyncFlush()) {
LOG.info("doris flush thread is about to exit.");
flushThreadAlive = false;
break;
}
} catch (Exception e) {
flushException = e;
}
}
});
flushThread.setUncaughtExceptionHandler((t, e) -> {
LOG.error("dorics flush thread uncaught exception occurred:" + e.getMessage(), e);
flushException = e;
flushThreadAlive = false;
});
flushThread.setName("doris-flush");
flushThread.setDaemon(true);
flushThread.start();
flushThreadAlive = true;
}
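
/**
 * (Re)schedules a one-shot interval flush after {@code flushInterval} milliseconds. Skipped
 * under EXACTLY_ONCE, where flushing is left to explicit {@code flush} calls instead of a timer.
 */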
public void startScheduler() {
if (semantic.equals(Semantic.EXACTLY_ONCE)) {
return;
}
stopSchedule();
this.schedule = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("doris-interval-sink"));
this.scheduledFuture = this.schedule.schedule(() -> {
synchronized (DorisSinkWriter.this) {
if (!closed) {
try {
LOG.info("doris interval sinking trigger");
if (bufferMap.isEmpty()) {
startScheduler();
}
flush(null, false);
} catch (Exception e) {
flushException = e;
}
}
}
}, dorisConfig.flushInterval(), TimeUnit.MILLISECONDS);
}
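
/** Cancels any pending interval flush task and shuts down its scheduler. */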
private void stopSchedule() {
if (this.scheduledFuture != null) {
scheduledFuture.cancel(false);
this.schedule.shutdown();
}
}
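
/**
 * Appends records to the buffer keyed by "database.table". Under EXACTLY_ONCE the data only
 * accumulates until {@code flush} is called explicitly; otherwise the buffer is handed to the
 * flush thread as soon as it reaches the configured max rows or max bytes.
 */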
public final synchronized void writeRecords(String database, String table, String... records) throws IOException {
checkFlushException();
try {
if (records.length == 0) {
return;
}
final String bufferKey = String.format("%s.%s", database, table);
final DorisSinkBufferEntry bufferEntity = bufferMap.computeIfAbsent(bufferKey,
k -> new DorisSinkBufferEntry(database, table, dorisConfig.lablePrefix()));
for (String record : records) {
byte[] bts = record.getBytes(StandardCharsets.UTF_8);
bufferEntity.addToBuffer(bts);
}
if (Semantic.EXACTLY_ONCE.equals(semantic)) {
return;
}
if (bufferEntity.getBatchCount() >= dorisConfig.sinkMaxRow() || bufferEntity.getBatchSize() >= dorisConfig.sinkMaxBytes()) {
LOG.info(String.format("doris buffer Sinking triggered: db: [%s] table: [%s] rows[%d] label[%s].", database, table, bufferEntity.getBatchCount(), bufferEntity.getLabel()));
flush(bufferKey, false);
}
} catch (Exception e) {
throw new IOException("Writing records to doris failed.", e);
}
}
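
/**
 * Queues buffered data for the flush thread: the buffer for {@code bufferKey}, or every
 * buffer when {@code bufferKey} is null. With {@code waitUntilDone} an empty barrier entry
 * is enqueued afterwards so that previously queued batches are drained first.
 */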
public synchronized void flush(String bufferKey, boolean waitUntilDone) throws Exception {
if (bufferMap.isEmpty()) {
flushInternal(null, waitUntilDone);
return;
}
if (null == bufferKey) {
for (String key : bufferMap.keySet()) {
flushInternal(key, waitUntilDone);
}
return;
}
flushInternal(bufferKey, waitUntilDone);
}
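
/** Moves the buffer for {@code bufferKey}, if present, onto the flush queue and removes it from the buffer map. */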
private synchronized void flushInternal(String bufferKey, boolean waitUntilDone) throws Exception {
checkFlushException();
if (null == bufferKey || bufferMap.isEmpty() || !bufferMap.containsKey(bufferKey)) {
if (waitUntilDone) {
waitAsyncFlushingDone();
}
return;
}
offer(bufferMap.get(bufferKey));
bufferMap.remove(bufferKey);
if (waitUntilDone) {
// wait the last flush
waitAsyncFlushingDone();
}
}
private void waitAsyncFlushingDone() throws InterruptedException {
// enqueue an empty barrier entry; the single flush thread drains the queue in FIFO
// order, so all previously queued batches are processed before the barrier is consumed
offer(new DorisSinkBufferEntry(null, null, null));
checkFlushException();
}
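
/**
 * Puts a buffer entry onto the bounded flush queue, failing fast when the flush thread is
 * not running or the queue stays full beyond the configured offer timeout.
 */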
private void offer(DorisSinkBufferEntry bufferEntity) throws InterruptedException {
if (!flushThreadAlive) {
throw new RuntimeException(String.format(
"Flush thread has already exited or was never started; call startAsyncFlushing() first. Ignoring offer request for label[%s].",
bufferEntity.getLabel()));
}
if (!flushQueue.offer(bufferEntity, dorisConfig.sinkOfferTimeout(), TimeUnit.MILLISECONDS)) {
throw new RuntimeException(
"Timeout while offering data to flushQueue, exceed " + dorisConfig.sinkOfferTimeout() + " ms, see " +
dorisConfig.sinkOption().sinkOfferTimeout().key());
}
}
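
/**
 * Runs on the flush thread: polls one buffered batch from the queue and stream-loads it,
 * retrying with linear backoff and regenerating the load label when the failure indicates
 * the old label cannot be reused. Returns false to signal the flush thread to exit.
 */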
private boolean asyncFlush() throws Exception {
final DorisSinkBufferEntry flushData = flushQueue.poll(FLUSH_QUEUE_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (flushData == null || flushData.getBatchCount() == 0) {
// nothing to load: the poll timed out or a barrier entry arrived; keep the thread alive unless the writer is closed
return !closed;
}
stopSchedule();
LOG.info(String.format("Async stream load: db[%s] table[%s] rows[%d] bytes[%d] label[%s].", flushData.getDatabase(),
flushData.getTable(), flushData.getBatchCount(), flushData.getBatchSize(), flushData.getLabel()));
long startWithRetries = System.nanoTime();
for (int i = 0; i < dorisConfig.sinkMaxRetries(); i++) {
try {
long start = System.nanoTime();
streamLoader.doStreamLoad(flushData);
LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));
if (null != totalFlushBytes) {
totalFlushBytes.inc(flushData.getBatchSize());
totalFlushRows.inc(flushData.getBatchCount());
totalFlushTime.inc(System.nanoTime() - startWithRetries);
totalFlushTimeWithoutRetries.inc(System.nanoTime() - start);
totalFlushSucceededTimes.inc();
}
startScheduler();
break;
} catch (Exception e) {
if (totalFlushFailedTimes != null) {
totalFlushFailedTimes.inc();
}
LOG.warn("Failed to flush batch data to doris, retry times = {}", i, e);
if (i >= dorisConfig.sinkMaxRetries()) {
throw e;
}
if (e instanceof LoadStatusFailedException && ((LoadStatusFailedException) e).needReCreateLabel()) {
String oldLabel = flushData.getLabel();
flushData.reGenerateLabel();
LOG.warn(String.format("Batch label changed from [%s] to [%s]", oldLabel, flushData.getLabel()));
}
}
try {
Thread.sleep(1000L * (i + 1));
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException("Unable to flush, interrupted while doing another attempt", ex);
}
}
return true;
}
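
/** Flushes any remaining buffers, stops the interval scheduler, and surfaces a pending flush failure, if any. */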
public synchronized void close() throws Exception {
if (!closed) {
closed = true;
LOG.info("Sink is about to close.");
flush(null, false);
this.bufferMap.clear();
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
schedule.shutdown();
}
}
checkFlushException();
}
private void checkFlushException() {
if (flushException != null) {
for (StackTraceElement frame : Thread.currentThread().getStackTrace()) {
LOG.error("{}.{} line:{}", frame.getClassName(), frame.getMethodName(), frame.getLineNumber());
}
throw new RuntimeException("Writing records to doris failed.", flushException);
}
}
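
/** Returns a copy of the current buffers, e.g. for capturing in-flight data into checkpoint state. */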
public Map<String, DorisSinkBufferEntry> getBufferedBatchMap() {
return new HashMap<>(bufferMap);
}
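
/** Restores buffers, e.g. from checkpoint state; only applied under EXACTLY_ONCE semantics. */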
public void setBufferedBatchMap(Map<String, DorisSinkBufferEntry> newBufferMap) {
if (Semantic.EXACTLY_ONCE.equals(semantic)) {
bufferMap.clear();
bufferMap.putAll(newBufferMap);
}
}
}