| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.doris.flink.sink.writer; |
| |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import org.apache.doris.flink.cfg.DorisExecutionOptions; |
| import org.apache.doris.flink.cfg.DorisOptions; |
| import org.apache.doris.flink.exception.DorisException; |
| import org.apache.doris.flink.exception.DorisRuntimeException; |
| import org.apache.doris.flink.exception.StreamLoadException; |
| import org.apache.doris.flink.rest.models.RespContent; |
| import org.apache.doris.flink.sink.HttpPutBuilder; |
| import org.apache.doris.flink.sink.ResponseUtil; |
| import org.apache.flink.annotation.VisibleForTesting; |
| import org.apache.flink.util.Preconditions; |
| import org.apache.flink.util.concurrent.ExecutorThreadFactory; |
| import org.apache.http.client.methods.CloseableHttpResponse; |
| import org.apache.http.entity.InputStreamEntity; |
| import org.apache.http.impl.client.CloseableHttpClient; |
| import org.apache.http.util.EntityUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.regex.Matcher; |
| |
| import static org.apache.doris.flink.sink.LoadStatus.FAIL; |
| import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST; |
| import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN; |
| import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT; |
| import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY; |
| |
| /** |
| * load data to doris. |
| **/ |
| public class DorisStreamLoad implements Serializable { |
| private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class); |
| private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); |
| private final LabelGenerator labelGenerator; |
| private final byte[] lineDelimiter; |
| private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load"; |
| private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc"; |
| private static final String JOB_EXIST_FINISHED = "FINISHED"; |
| |
| private String loadUrlStr; |
| private String hostPort; |
| private final String abortUrlStr; |
| private final String user; |
| private final String passwd; |
| private final String db; |
| private final String table; |
| private final boolean enable2PC; |
| private final Properties streamLoadProp; |
| private final RecordStream recordStream; |
| private Future<CloseableHttpResponse> pendingLoadFuture; |
| private final CloseableHttpClient httpClient; |
| private final ExecutorService executorService; |
| private boolean loadBatchFirstRecord; |
| |
| public DorisStreamLoad(String hostPort, |
| DorisOptions dorisOptions, |
| DorisExecutionOptions executionOptions, |
| LabelGenerator labelGenerator, |
| CloseableHttpClient httpClient) { |
| this.hostPort = hostPort; |
| String[] tableInfo = dorisOptions.getTableIdentifier().split("\\."); |
| this.db = tableInfo[0]; |
| this.table = tableInfo[1]; |
| this.user = dorisOptions.getUsername(); |
| this.passwd = dorisOptions.getPassword(); |
| this.labelGenerator = labelGenerator; |
| this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, table); |
| this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db); |
| this.enable2PC = executionOptions.enabled2PC(); |
| this.streamLoadProp = executionOptions.getStreamLoadProp(); |
| this.httpClient = httpClient; |
| this.executorService = new ThreadPoolExecutor(1, 1, |
| 0L, TimeUnit.MILLISECONDS, |
| new LinkedBlockingQueue<>(), new ExecutorThreadFactory("stream-load-upload")); |
| this.recordStream = new RecordStream(executionOptions.getBufferSize(), executionOptions.getBufferCount()); |
| lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT).getBytes(); |
| loadBatchFirstRecord = true; |
| } |
| |
| public String getDb() { |
| return db; |
| } |
| |
| public String getHostPort() { |
| return hostPort; |
| } |
| |
| public void setHostPort(String hostPort) { |
| this.hostPort = hostPort; |
| this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, this.db, this.table); |
| } |
| |
| public Future<CloseableHttpResponse> getPendingLoadFuture() { |
| return pendingLoadFuture; |
| } |
| |
| /** |
| * try to discard pending transactions with labels beginning with labelSuffix. |
| * @param labelSuffix |
| * @param chkID |
| * @throws Exception |
| */ |
| public void abortPreCommit(String labelSuffix, long chkID) throws Exception { |
| long startChkID = chkID; |
| LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID); |
| while (true) { |
| try { |
| String label = labelGenerator.generateLabel(startChkID); |
| HttpPutBuilder builder = new HttpPutBuilder(); |
| builder.setUrl(loadUrlStr) |
| .baseAuth(user, passwd) |
| .addCommonHeader() |
| .enable2PC() |
| .setLabel(label) |
| .setEmptyEntity() |
| .addProperties(streamLoadProp); |
| RespContent respContent = handlePreCommitResponse(httpClient.execute(builder.build())); |
| Preconditions.checkState("true".equals(respContent.getTwoPhaseCommit())); |
| if (LABEL_ALREADY_EXIST.equals(respContent.getStatus())) { |
| // label already exist and job finished |
| if (JOB_EXIST_FINISHED.equals(respContent.getExistingJobStatus())) { |
| throw new DorisException("Load status is " + LABEL_ALREADY_EXIST + " and load job finished, " + |
| "change you label prefix or restore from latest savepoint!"); |
| |
| } |
| // job not finished, abort. |
| Matcher matcher = LABEL_EXIST_PATTERN.matcher(respContent.getMessage()); |
| if (matcher.find()) { |
| Preconditions.checkState(label.equals(matcher.group(1))); |
| long txnId = Long.parseLong(matcher.group(2)); |
| LOG.info("abort {} for exist label {}", txnId, label); |
| abortTransaction(txnId); |
| } else { |
| LOG.error("response: {}", respContent.toString()); |
| throw new DorisException("Load Status is " + LABEL_ALREADY_EXIST + ", but no txnID associated with it!"); |
| } |
| } else { |
| LOG.info("abort {} for check label {}.", respContent.getTxnId(), label); |
| abortTransaction(respContent.getTxnId()); |
| break; |
| } |
| startChkID++; |
| } catch (Exception e) { |
| LOG.warn("failed to stream load data", e); |
| throw e; |
| } |
| } |
| LOG.info("abort for labelSuffix {} finished", labelSuffix); |
| } |
| |
| /** |
| * write record into stream. |
| * @param record |
| * @throws IOException |
| */ |
| public void writeRecord(byte[] record) throws IOException{ |
| if (loadBatchFirstRecord) { |
| loadBatchFirstRecord = false; |
| } else { |
| recordStream.write(lineDelimiter); |
| } |
| recordStream.write(record); |
| } |
| |
| @VisibleForTesting |
| public RecordStream getRecordStream() { |
| return recordStream; |
| } |
| |
| public RespContent handlePreCommitResponse(CloseableHttpResponse response) throws Exception{ |
| final int statusCode = response.getStatusLine().getStatusCode(); |
| if (statusCode == 200 && response.getEntity() != null) { |
| String loadResult = EntityUtils.toString(response.getEntity()); |
| LOG.info("load Result {}", loadResult); |
| return OBJECT_MAPPER.readValue(loadResult, RespContent.class); |
| } |
| throw new StreamLoadException("stream load error: " + response.getStatusLine().toString()); |
| } |
| |
| public RespContent stopLoad() throws IOException{ |
| recordStream.endInput(); |
| LOG.info("stream load stopped."); |
| Preconditions.checkState(pendingLoadFuture != null); |
| try { |
| return handlePreCommitResponse(pendingLoadFuture.get()); |
| } catch (Exception e) { |
| throw new DorisRuntimeException(e); |
| } |
| } |
| |
| /** |
| * start write data for new checkpoint. |
| * @param label |
| * @throws IOException |
| */ |
| public void startLoad(String label) throws IOException{ |
| loadBatchFirstRecord = true; |
| HttpPutBuilder putBuilder = new HttpPutBuilder(); |
| recordStream.startInput(); |
| LOG.info("stream load started for {}", label); |
| try { |
| InputStreamEntity entity = new InputStreamEntity(recordStream); |
| putBuilder.setUrl(loadUrlStr) |
| .baseAuth(user, passwd) |
| .addCommonHeader() |
| .setLabel(label) |
| .setEntity(entity) |
| .addProperties(streamLoadProp); |
| if (enable2PC) { |
| putBuilder.enable2PC(); |
| } |
| pendingLoadFuture = executorService.submit(() -> { |
| LOG.info("start execute load"); |
| return httpClient.execute(putBuilder.build()); |
| }); |
| } catch (Exception e) { |
| String err = "failed to stream load data with label: " + label; |
| LOG.warn(err, e); |
| throw e; |
| } |
| } |
| |
| private void abortTransaction(long txnID) throws Exception { |
| HttpPutBuilder builder = new HttpPutBuilder(); |
| builder.setUrl(abortUrlStr) |
| .baseAuth(user, passwd) |
| .addCommonHeader() |
| .addTxnId(txnID) |
| .setEmptyEntity() |
| .abort(); |
| CloseableHttpResponse response = httpClient.execute(builder.build()); |
| |
| int statusCode = response.getStatusLine().getStatusCode(); |
| if (statusCode != 200 || response.getEntity() == null) { |
| LOG.warn("abort transaction response: " + response.getStatusLine().toString()); |
| throw new DorisRuntimeException("Fail to abort transaction " + txnID + " with url " + abortUrlStr); |
| } |
| |
| ObjectMapper mapper = new ObjectMapper(); |
| String loadResult = EntityUtils.toString(response.getEntity()); |
| Map<String, String> res = mapper.readValue(loadResult, new TypeReference<HashMap<String, String>>(){}); |
| if (FAIL.equals(res.get("status"))) { |
| if (ResponseUtil.isCommitted(res.get("msg"))) { |
| throw new DorisException("try abort committed transaction, " + |
| "do you recover from old savepoint?"); |
| } |
| LOG.warn("Fail to abort transaction. error: {}", res.get("msg")); |
| } |
| } |
| |
| public void close() throws IOException { |
| if (null != httpClient) { |
| try { |
| httpClient.close(); |
| } catch (IOException e) { |
| throw new IOException("Closing httpClient failed.", e); |
| } |
| } |
| if (null != executorService) { |
| executorService.shutdownNow(); |
| } |
| } |
| } |