| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.seatunnel.connectors.doris.sink.committer; |
| |
| import org.apache.seatunnel.api.sink.SinkCommitter; |
| import org.apache.seatunnel.connectors.doris.config.DorisConfig; |
| import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; |
| import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; |
| import org.apache.seatunnel.connectors.doris.sink.HttpPutBuilder; |
| import org.apache.seatunnel.connectors.doris.sink.LoadStatus; |
| import org.apache.seatunnel.connectors.doris.util.HttpUtil; |
| import org.apache.seatunnel.connectors.doris.util.ResponseUtil; |
| |
| import org.apache.http.client.methods.CloseableHttpResponse; |
| import org.apache.http.impl.client.CloseableHttpClient; |
| import org.apache.http.util.EntityUtils; |
| |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import lombok.extern.slf4j.Slf4j; |
| |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| /** The committer to commit transaction. */ |
| @Slf4j |
| public class DorisCommitter implements SinkCommitter<DorisCommitInfo> { |
| private static final String COMMIT_PATTERN = "http://%s/api/%s/_stream_load_2pc"; |
| private static final int HTTP_TEMPORARY_REDIRECT = 200; |
| private final CloseableHttpClient httpClient; |
| private final DorisConfig dorisConfig; |
| int maxRetry; |
| |
| public DorisCommitter(DorisConfig dorisConfig) { |
| this(dorisConfig, new HttpUtil().getHttpClient()); |
| } |
| |
| public DorisCommitter(DorisConfig dorisConfig, CloseableHttpClient client) { |
| this.dorisConfig = dorisConfig; |
| this.httpClient = client; |
| } |
| |
| @Override |
| public List<DorisCommitInfo> commit(List<DorisCommitInfo> commitInfos) throws IOException { |
| for (DorisCommitInfo commitInfo : commitInfos) { |
| commitTransaction(commitInfo); |
| } |
| return Collections.emptyList(); |
| } |
| |
| @Override |
| public void abort(List<DorisCommitInfo> commitInfos) throws IOException { |
| for (DorisCommitInfo commitInfo : commitInfos) { |
| abortTransaction(commitInfo); |
| } |
| } |
| |
| private void commitTransaction(DorisCommitInfo committable) |
| throws IOException, DorisConnectorException { |
| int statusCode = -1; |
| String reasonPhrase = null; |
| int retry = 0; |
| String hostPort = committable.getHostPort(); |
| CloseableHttpResponse response = null; |
| while (retry++ <= dorisConfig.getMaxRetries()) { |
| HttpPutBuilder putBuilder = new HttpPutBuilder(); |
| putBuilder |
| .setUrl(String.format(COMMIT_PATTERN, hostPort, committable.getDb())) |
| .baseAuth(dorisConfig.getUsername(), dorisConfig.getPassword()) |
| .addCommonHeader() |
| .addTxnId(committable.getTxbID()) |
| .setEmptyEntity() |
| .commit(); |
| try { |
| response = httpClient.execute(putBuilder.build()); |
| } catch (IOException e) { |
| log.error("commit transaction failed: ", e); |
| hostPort = dorisConfig.getFrontends(); |
| continue; |
| } |
| statusCode = response.getStatusLine().getStatusCode(); |
| reasonPhrase = response.getStatusLine().getReasonPhrase(); |
| if (statusCode != HTTP_TEMPORARY_REDIRECT) { |
| log.warn("commit failed with {}, reason {}", hostPort, reasonPhrase); |
| hostPort = dorisConfig.getFrontends(); |
| } else { |
| break; |
| } |
| } |
| |
| if (statusCode != HTTP_TEMPORARY_REDIRECT) { |
| throw new DorisConnectorException( |
| DorisConnectorErrorCode.STREAM_LOAD_FAILED, reasonPhrase); |
| } |
| |
| ObjectMapper mapper = new ObjectMapper(); |
| if (response.getEntity() != null) { |
| String loadResult = EntityUtils.toString(response.getEntity()); |
| Map<String, String> res = |
| mapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {}); |
| if (!LoadStatus.SUCCESS.equals(res.get("status"))) { |
| log.error( |
| "commit transaction error url:{},TxnId:{},result:{}", |
| String.format(COMMIT_PATTERN, hostPort, committable.getDb()), |
| committable.getTxbID(), |
| loadResult); |
| throw new DorisConnectorException( |
| DorisConnectorErrorCode.COMMIT_FAILED, loadResult); |
| } else { |
| log.info("load result {}", loadResult); |
| } |
| } |
| } |
| |
| private void abortTransaction(DorisCommitInfo committable) |
| throws IOException, DorisConnectorException { |
| int statusCode; |
| int retry = 0; |
| String hostPort = committable.getHostPort(); |
| CloseableHttpResponse response = null; |
| while (retry++ <= maxRetry) { |
| HttpPutBuilder builder = new HttpPutBuilder(); |
| builder.setUrl(String.format(COMMIT_PATTERN, hostPort, committable.getDb())) |
| .baseAuth(dorisConfig.getUsername(), dorisConfig.getPassword()) |
| .addCommonHeader() |
| .addTxnId(committable.getTxbID()) |
| .setEmptyEntity() |
| .abort(); |
| response = httpClient.execute(builder.build()); |
| statusCode = response.getStatusLine().getStatusCode(); |
| if (statusCode != HTTP_TEMPORARY_REDIRECT || response.getEntity() == null) { |
| log.warn("abort transaction response: " + response.getStatusLine().toString()); |
| throw new DorisConnectorException( |
| DorisConnectorErrorCode.STREAM_LOAD_FAILED, |
| "Fail to abort transaction " |
| + committable.getTxbID() |
| + " with url " |
| + String.format(COMMIT_PATTERN, hostPort, committable.getDb())); |
| } |
| } |
| |
| ObjectMapper mapper = new ObjectMapper(); |
| String loadResult = EntityUtils.toString(response.getEntity()); |
| Map<String, String> res = |
| mapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {}); |
| if (!LoadStatus.SUCCESS.equals(res.get("status"))) { |
| if (ResponseUtil.isCommitted(res.get("msg"))) { |
| throw new DorisConnectorException( |
| DorisConnectorErrorCode.STREAM_LOAD_FAILED, |
| "try abort committed transaction, " + "do you recover from old savepoint?"); |
| } |
| log.warn( |
| "Fail to abort transaction. txnId: {}, error: {}", |
| committable.getTxbID(), |
| res.get("msg")); |
| } |
| } |
| } |