| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.doris.flink.sink.writer; |
| |
| import org.apache.flink.annotation.VisibleForTesting; |
| import org.apache.flink.api.connector.sink2.Sink; |
| import org.apache.flink.metrics.groups.SinkWriterMetricGroup; |
| import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; |
| import org.apache.flink.util.concurrent.ExecutorThreadFactory; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.doris.flink.cfg.DorisExecutionOptions; |
| import org.apache.doris.flink.cfg.DorisOptions; |
| import org.apache.doris.flink.cfg.DorisReadOptions; |
| import org.apache.doris.flink.exception.DorisRuntimeException; |
| import org.apache.doris.flink.exception.StreamLoadException; |
| import org.apache.doris.flink.rest.models.RespContent; |
| import org.apache.doris.flink.sink.BackendUtil; |
| import org.apache.doris.flink.sink.DorisCommittable; |
| import org.apache.doris.flink.sink.HttpUtil; |
| import org.apache.doris.flink.sink.writer.serializer.DorisRecord; |
| import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT; |
| import static org.apache.doris.flink.sink.LoadStatus.SUCCESS; |
| |
| /** |
| * Doris Writer will load data to doris. |
| * |
| * @param <IN> |
| */ |
| public class DorisWriter<IN> |
| implements DorisAbstractWriter<IN, DorisWriterState, DorisCommittable> { |
| private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.class); |
| private static final List<String> DORIS_SUCCESS_STATUS = |
| new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); |
| private final long lastCheckpointId; |
| private long curCheckpointId; |
| private Map<String, DorisStreamLoad> dorisStreamLoadMap = new ConcurrentHashMap<>(); |
| private Map<String, LabelGenerator> labelGeneratorMap = new ConcurrentHashMap<>(); |
| volatile boolean globalLoading; |
| private Map<String, Boolean> loadingMap = new ConcurrentHashMap<>(); |
| private final DorisOptions dorisOptions; |
| private final DorisReadOptions dorisReadOptions; |
| private final DorisExecutionOptions executionOptions; |
| private final String labelPrefix; |
| private final int subtaskId; |
| private final int intervalTime; |
| private final DorisRecordSerializer<IN> serializer; |
| private final transient ScheduledExecutorService scheduledExecutorService; |
| private transient Thread executorThread; |
| private transient volatile Exception loadException = null; |
| private BackendUtil backendUtil; |
| private SinkWriterMetricGroup sinkMetricGroup; |
| private Map<String, DorisWriteMetrics> sinkMetricsMap = new ConcurrentHashMap<>(); |
| |
| public DorisWriter( |
| Sink.InitContext initContext, |
| Collection<DorisWriterState> state, |
| DorisRecordSerializer<IN> serializer, |
| DorisOptions dorisOptions, |
| DorisReadOptions dorisReadOptions, |
| DorisExecutionOptions executionOptions) { |
| this.lastCheckpointId = |
| initContext |
| .getRestoredCheckpointId() |
| .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1); |
| this.curCheckpointId = lastCheckpointId + 1; |
| LOG.info("restore checkpointId {}", lastCheckpointId); |
| LOG.info("labelPrefix {}", executionOptions.getLabelPrefix()); |
| this.labelPrefix = executionOptions.getLabelPrefix(); |
| this.subtaskId = initContext.getSubtaskId(); |
| this.scheduledExecutorService = |
| new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-check")); |
| this.serializer = serializer; |
| this.dorisOptions = dorisOptions; |
| this.dorisReadOptions = dorisReadOptions; |
| this.executionOptions = executionOptions; |
| this.intervalTime = executionOptions.checkInterval(); |
| this.globalLoading = false; |
| sinkMetricGroup = initContext.metricGroup(); |
| initializeLoad(state); |
| serializer.initial(); |
| } |
| |
| public void initializeLoad(Collection<DorisWriterState> state) { |
| this.backendUtil = BackendUtil.getInstance(dorisOptions, dorisReadOptions, LOG); |
| try { |
| if (executionOptions.enabled2PC()) { |
| abortLingeringTransactions(state); |
| } |
| } catch (Exception e) { |
| LOG.error("Failed to abort transaction.", e); |
| throw new DorisRuntimeException(e); |
| } |
| // get main work thread. |
| executorThread = Thread.currentThread(); |
| // when uploading data in streaming mode, we need to regularly detect whether there are |
| // exceptions. |
| scheduledExecutorService.scheduleWithFixedDelay( |
| this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS); |
| } |
| |
| private void abortLingeringTransactions(Collection<DorisWriterState> recoveredStates) |
| throws Exception { |
| List<String> alreadyAborts = new ArrayList<>(); |
| // abort label in state |
| for (DorisWriterState state : recoveredStates) { |
| // Todo: When the sink parallelism is reduced, |
| // the txn of the redundant task before aborting is also needed. |
| if (!state.getLabelPrefix().equals(labelPrefix)) { |
| LOG.warn( |
| "Label prefix from previous execution {} has changed to {}.", |
| state.getLabelPrefix(), |
| executionOptions.getLabelPrefix()); |
| } |
| if (state.getDatabase() == null || state.getTable() == null) { |
| LOG.warn( |
| "Transactions cannot be aborted when restore because the last used flink-doris-connector version less than 1.5.0."); |
| continue; |
| } |
| String key = state.getDatabase() + "." + state.getTable(); |
| DorisStreamLoad streamLoader = getStreamLoader(key); |
| streamLoader.abortPreCommit(state.getLabelPrefix(), curCheckpointId); |
| alreadyAborts.add(state.getLabelPrefix()); |
| } |
| |
| // TODO: In a multi-table scenario, if do not restore from checkpoint, |
| // when modify labelPrefix at startup, we cannot abort the previous label. |
| if (!alreadyAborts.contains(labelPrefix) |
| && StringUtils.isNotEmpty(dorisOptions.getTableIdentifier()) |
| && StringUtils.isNotEmpty(labelPrefix)) { |
| // abort current labelPrefix |
| DorisStreamLoad streamLoader = getStreamLoader(dorisOptions.getTableIdentifier()); |
| streamLoader.abortPreCommit(labelPrefix, curCheckpointId); |
| } |
| } |
| |
| @Override |
| public void write(IN in, Context context) throws IOException, InterruptedException { |
| checkLoadException(); |
| writeOneDorisRecord(serializer.serialize(in)); |
| } |
| |
| @Override |
| public void flush(boolean endOfInput) throws IOException, InterruptedException { |
| writeOneDorisRecord(serializer.flush()); |
| } |
| |
| public void writeOneDorisRecord(DorisRecord record) throws IOException, InterruptedException { |
| |
| if (record == null || record.getRow() == null) { |
| // ddl or value is null |
| return; |
| } |
| |
| // multi table load |
| String tableKey = dorisOptions.getTableIdentifier(); |
| if (record.getTableIdentifier() != null) { |
| tableKey = record.getTableIdentifier(); |
| } |
| |
| DorisStreamLoad streamLoader = getStreamLoader(tableKey); |
| if (!loadingMap.containsKey(tableKey)) { |
| // start stream load only when there has data |
| LabelGenerator labelGenerator = getLabelGenerator(tableKey); |
| String currentLabel = labelGenerator.generateTableLabel(curCheckpointId); |
| streamLoader.startLoad(currentLabel, false); |
| loadingMap.put(tableKey, true); |
| globalLoading = true; |
| registerMetrics(tableKey); |
| } |
| streamLoader.writeRecord(record.getRow()); |
| } |
| |
| @VisibleForTesting |
| public void setSinkMetricGroup(SinkWriterMetricGroup sinkMetricGroup) { |
| this.sinkMetricGroup = sinkMetricGroup; |
| } |
| |
| public void registerMetrics(String tableKey) { |
| if (sinkMetricsMap.containsKey(tableKey)) { |
| return; |
| } |
| DorisWriteMetrics metrics = DorisWriteMetrics.of(sinkMetricGroup, tableKey); |
| sinkMetricsMap.put(tableKey, metrics); |
| } |
| |
| @Override |
| public Collection<DorisCommittable> prepareCommit() throws IOException, InterruptedException { |
| // Verify whether data is written during a checkpoint |
| if (!globalLoading && loadingMap.values().stream().noneMatch(Boolean::booleanValue)) { |
| return Collections.emptyList(); |
| } |
| // disable exception checker before stop load. |
| globalLoading = false; |
| |
| // submit stream load http request |
| List<DorisCommittable> committableList = new ArrayList<>(); |
| for (Map.Entry<String, DorisStreamLoad> streamLoader : dorisStreamLoadMap.entrySet()) { |
| String tableIdentifier = streamLoader.getKey(); |
| if (!loadingMap.getOrDefault(tableIdentifier, false)) { |
| LOG.debug("skip table {}, no data need to load.", tableIdentifier); |
| continue; |
| } |
| DorisStreamLoad dorisStreamLoad = streamLoader.getValue(); |
| RespContent respContent = dorisStreamLoad.stopLoad(); |
| // refresh metrics |
| if (sinkMetricsMap.containsKey(tableIdentifier)) { |
| DorisWriteMetrics dorisWriteMetrics = sinkMetricsMap.get(tableIdentifier); |
| dorisWriteMetrics.flush(respContent); |
| } |
| if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { |
| String errMsg = |
| String.format( |
| "table %s stream load error: %s, see more in %s", |
| tableIdentifier, |
| respContent.getMessage(), |
| respContent.getErrorURL()); |
| throw new DorisRuntimeException(errMsg); |
| } |
| if (executionOptions.enabled2PC()) { |
| long txnId = respContent.getTxnId(); |
| committableList.add( |
| new DorisCommittable( |
| dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId)); |
| } |
| } |
| // clean loadingMap |
| loadingMap.clear(); |
| return committableList; |
| } |
| |
| @Override |
| public List<DorisWriterState> snapshotState(long checkpointId) throws IOException { |
| List<DorisWriterState> writerStates = new ArrayList<>(); |
| for (DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()) { |
| // Dynamic refresh backend |
| dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend()); |
| DorisWriterState writerState = |
| new DorisWriterState( |
| labelPrefix, |
| dorisStreamLoad.getDb(), |
| dorisStreamLoad.getTable(), |
| subtaskId); |
| writerStates.add(writerState); |
| } |
| this.curCheckpointId = checkpointId + 1; |
| return writerStates; |
| } |
| |
| private LabelGenerator getLabelGenerator(String tableKey) { |
| return labelGeneratorMap.computeIfAbsent( |
| tableKey, |
| v -> |
| new LabelGenerator( |
| labelPrefix, executionOptions.enabled2PC(), tableKey, subtaskId)); |
| } |
| |
| private DorisStreamLoad getStreamLoader(String tableKey) { |
| LabelGenerator labelGenerator = getLabelGenerator(tableKey); |
| dorisOptions.setTableIdentifier(tableKey); |
| return dorisStreamLoadMap.computeIfAbsent( |
| tableKey, |
| v -> |
| new DorisStreamLoad( |
| backendUtil.getAvailableBackend(), |
| dorisOptions, |
| executionOptions, |
| labelGenerator, |
| new HttpUtil().getHttpClient())); |
| } |
| |
| /** Check the streamload http request regularly. */ |
| private void checkDone() { |
| for (Map.Entry<String, DorisStreamLoad> streamLoadMap : dorisStreamLoadMap.entrySet()) { |
| checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue()); |
| } |
| } |
| |
| private void checkAllDone(String tableIdentifier, DorisStreamLoad dorisStreamLoad) { |
| // the load future is done and checked in prepareCommit(). |
| // this will check error while loading. |
| LOG.debug("start timer checker, interval {} ms", intervalTime); |
| if (dorisStreamLoad.getPendingLoadFuture() != null |
| && dorisStreamLoad.getPendingLoadFuture().isDone()) { |
| if (!globalLoading || !loadingMap.get(tableIdentifier)) { |
| LOG.debug("not loading, skip timer checker for table {}", tableIdentifier); |
| return; |
| } |
| |
| // double-check the future, to avoid getting the old future |
| if (dorisStreamLoad.getPendingLoadFuture() != null |
| && dorisStreamLoad.getPendingLoadFuture().isDone()) { |
| // error happened when loading, now we should stop receive data |
| // and abort previous txn(stream load) and start a new txn(stream load) |
| // use send cached data to new txn, then notify to restart the stream |
| if (executionOptions.isUseCache()) { |
| try { |
| dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend()); |
| if (executionOptions.enabled2PC()) { |
| dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId); |
| } |
| // start a new txn(stream load) |
| LOG.info( |
| "getting exception, breakpoint resume for checkpoint ID: {}, table {}", |
| curCheckpointId, |
| tableIdentifier); |
| LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier); |
| dorisStreamLoad.startLoad( |
| labelGenerator.generateTableLabel(curCheckpointId), true); |
| } catch (Exception e) { |
| throw new DorisRuntimeException(e); |
| } |
| } else { |
| String errorMsg; |
| try { |
| RespContent content = |
| dorisStreamLoad.handlePreCommitResponse( |
| dorisStreamLoad.getPendingLoadFuture().get()); |
| errorMsg = content.getMessage(); |
| } catch (Exception e) { |
| errorMsg = e.getMessage(); |
| } |
| |
| loadException = new StreamLoadException(errorMsg); |
| LOG.error( |
| "table {} stream load finished unexpectedly, interrupt worker thread! {}", |
| tableIdentifier, |
| errorMsg); |
| // set the executor thread interrupted in case blocking in write data. |
| executorThread.interrupt(); |
| } |
| } |
| } |
| } |
| |
| private void checkLoadException() { |
| if (loadException != null) { |
| throw new RuntimeException("error while loading data.", loadException); |
| } |
| } |
| |
| @VisibleForTesting |
| public boolean isLoading() { |
| return this.globalLoading; |
| } |
| |
| @VisibleForTesting |
| public void setDorisStreamLoadMap(Map<String, DorisStreamLoad> streamLoadMap) { |
| this.dorisStreamLoadMap = streamLoadMap; |
| } |
| |
| @VisibleForTesting |
| public void setDorisMetricsMap(Map<String, DorisWriteMetrics> metricsMap) { |
| this.sinkMetricsMap = metricsMap; |
| } |
| |
| @Override |
| public void close() throws Exception { |
| LOG.info("Close DorisWriter."); |
| if (scheduledExecutorService != null) { |
| scheduledExecutorService.shutdownNow(); |
| } |
| if (dorisStreamLoadMap != null && !dorisStreamLoadMap.isEmpty()) { |
| for (DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()) { |
| dorisStreamLoad.close(); |
| } |
| } |
| serializer.close(); |
| } |
| } |