| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.doris.flink.sink.batch; |
| |
| import org.apache.doris.flink.cfg.DorisExecutionOptions; |
| import org.apache.doris.flink.cfg.DorisOptions; |
| import org.apache.doris.flink.cfg.DorisReadOptions; |
| import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; |
| import org.apache.doris.flink.sink.writer.LabelGenerator; |
| import org.apache.flink.api.connector.sink2.Sink; |
| import org.apache.flink.api.connector.sink2.SinkWriter; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.util.Preconditions; |
| import org.apache.flink.util.StringUtils; |
| import org.apache.flink.util.concurrent.ExecutorThreadFactory; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| public class DorisBatchWriter<IN> implements SinkWriter<IN> { |
| private static final Logger LOG = LoggerFactory.getLogger(DorisBatchWriter.class); |
| private DorisBatchStreamLoad batchStreamLoad; |
| private final DorisOptions dorisOptions; |
| private final DorisReadOptions dorisReadOptions; |
| private final DorisExecutionOptions executionOptions; |
| private final String labelPrefix; |
| private final LabelGenerator labelGenerator; |
| private final long flushIntervalMs; |
| private final DorisRecordSerializer<IN> serializer; |
| private final transient ScheduledExecutorService scheduledExecutorService; |
| private transient volatile Exception flushException = null; |
| private String database; |
| private String table; |
| |
| public DorisBatchWriter(Sink.InitContext initContext, |
| DorisRecordSerializer<IN> serializer, |
| DorisOptions dorisOptions, |
| DorisReadOptions dorisReadOptions, |
| DorisExecutionOptions executionOptions) { |
| if(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())){ |
| String[] tableInfo = dorisOptions.getTableIdentifier().split("\\."); |
| Preconditions.checkState(tableInfo.length == 2, "tableIdentifier input error, the format is database.table"); |
| this.database = tableInfo[0]; |
| this.table = tableInfo[1]; |
| } |
| LOG.info("labelPrefix " + executionOptions.getLabelPrefix()); |
| this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId(); |
| this.labelGenerator = new LabelGenerator(labelPrefix, false); |
| this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-flush-interval")); |
| this.serializer = serializer; |
| this.dorisOptions = dorisOptions; |
| this.dorisReadOptions = dorisReadOptions; |
| this.executionOptions = executionOptions; |
| this.flushIntervalMs = executionOptions.getBufferFlushIntervalMs(); |
| } |
| |
| public void initializeLoad() throws IOException { |
| this.batchStreamLoad = new DorisBatchStreamLoad(dorisOptions, dorisReadOptions, executionOptions, labelGenerator); |
| // when uploading data in streaming mode, we need to regularly detect whether there are exceptions. |
| scheduledExecutorService.scheduleWithFixedDelay(this::intervalFlush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS); |
| } |
| |
| private void intervalFlush() { |
| try { |
| LOG.info("interval flush triggered."); |
| batchStreamLoad.flush(null, false); |
| } catch (InterruptedException e) { |
| flushException = e; |
| } |
| } |
| |
| @Override |
| public void write(IN in, Context context) throws IOException, InterruptedException { |
| checkFlushException(); |
| String db = this.database; |
| String tbl = this.table; |
| Tuple2<String, byte[]> rowTuple = serializer.serialize(in); |
| if(rowTuple == null || rowTuple.f1 == null){ |
| //ddl or value is null |
| return; |
| } |
| //multi table load |
| if(rowTuple.f0 != null){ |
| String[] tableInfo = rowTuple.f0.split("\\."); |
| db = tableInfo[0]; |
| tbl = tableInfo[1]; |
| } |
| batchStreamLoad.writeRecord(db, tbl, rowTuple.f1); |
| } |
| @Override |
| public void flush(boolean flush) throws IOException, InterruptedException { |
| checkFlushException(); |
| LOG.info("checkpoint flush triggered."); |
| batchStreamLoad.flush(null, true); |
| } |
| |
| @Override |
| public void close() throws Exception { |
| LOG.info("DorisBatchWriter Close"); |
| if (scheduledExecutorService != null) { |
| scheduledExecutorService.shutdownNow(); |
| } |
| batchStreamLoad.close(); |
| } |
| |
| private void checkFlushException() { |
| if (flushException != null) { |
| throw new RuntimeException("Writing records to streamload failed.", flushException); |
| } |
| } |
| } |