// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.spark.load;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.SparkSettings;
import org.apache.doris.spark.exception.IllegalArgumentException;
import org.apache.doris.spark.exception.StreamLoadException;
import org.apache.doris.spark.rest.RestService;
import org.apache.doris.spark.rest.models.BackendV2;
import org.apache.doris.spark.rest.models.RespContent;
import org.apache.doris.spark.util.ResponseUtil;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;
/**
* DorisStreamLoad writes Spark rows to Doris through the HTTP stream load API,
* with optional two-phase commit (2PC) and gzip compression for CSV data.
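*
* A minimal usage sketch (the {@code settings}, {@code rowIterator} and
* {@code schema} names are assumptions for illustration):
* <pre>{@code
* DorisStreamLoad streamLoad = new DorisStreamLoad(settings);
* long txnId = streamLoad.load(rowIterator, schema);
* // when ConfigurationOptions.DORIS_SINK_ENABLE_2PC is enabled, the
* // transaction must be committed explicitly
* streamLoad.commit(txnId);
* }</pre>
**/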
public class DorisStreamLoad implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));
private static final String loadUrlPattern = "http://%s/api/%s/%s/_stream_load";
private static final String abortUrlPattern = "http://%s/api/%s/_stream_load_2pc";
private String loadUrlStr;
private final String db;
private final String tbl;
private final String authEncoded;
private final String columns;
private final String maxFilterRatio;
private final Map<String, String> streamLoadProp;
private boolean addDoubleQuotes;
private static final long cacheExpireTimeout = 4 * 60;
private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
private final String fenodes;
private final String benodes;
private final DataFormat dataFormat;
private String FIELD_DELIMITER;
private final String LINE_DELIMITER;
private boolean streamingPassthrough = false;
private final boolean enable2PC;
private final Integer txnRetries;
private final Integer txnIntervalMs;
private final boolean autoRedirect;
private final String compressType;
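/**
* Build a stream loader from Spark settings: resolves the target database and
* table, encodes the authentication header, and derives the data format,
* delimiters and stream load properties.
*/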
public DorisStreamLoad(SparkSettings settings) {
String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
this.db = dbTable[0];
this.tbl = dbTable[1];
this.fenodes = settings.getProperty(ConfigurationOptions.DORIS_FENODES);
this.benodes = settings.getProperty(ConfigurationOptions.DORIS_BENODES);
String user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
String passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
this.authEncoded = getAuthEncoded(user, passwd);
this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
this.streamLoadProp = getStreamLoadProp(settings);
cache = CacheBuilder.newBuilder().expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES).build(new BackendCacheLoader(settings));
dataFormat = DataFormat.valueOf(streamLoadProp.getOrDefault("format", "csv").toUpperCase());
switch (dataFormat) {
case CSV:
FIELD_DELIMITER = escapeString(streamLoadProp.getOrDefault("column_separator", "\t"));
this.addDoubleQuotes = Boolean.parseBoolean(streamLoadProp.getOrDefault("add_double_quotes", "false"));
if (addDoubleQuotes) {
LOG.info("set add_double_quotes for csv mode, add trim_double_quotes to true for prop.");
streamLoadProp.put("trim_double_quotes", "true");
}
break;
case JSON:
streamLoadProp.put("read_json_by_line", "true");
break;
}
LINE_DELIMITER = escapeString(streamLoadProp.getOrDefault("line_delimiter", "\n"));
this.streamingPassthrough = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_STREAMING_PASSTHROUGH,
ConfigurationOptions.DORIS_SINK_STREAMING_PASSTHROUGH_DEFAULT);
this.enable2PC = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_ENABLE_2PC,
ConfigurationOptions.DORIS_SINK_ENABLE_2PC_DEFAULT);
this.txnRetries = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TXN_RETRIES,
ConfigurationOptions.DORIS_SINK_TXN_RETRIES_DEFAULT);
this.txnIntervalMs = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TXN_INTERVAL_MS,
ConfigurationOptions.DORIS_SINK_TXN_INTERVAL_MS_DEFAULT);
this.autoRedirect = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT,
ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT_DEFAULT);
compressType = settings.getProperty(ConfigurationOptions.DORIS_SINK_DATA_COMPRESS_TYPE);
}
public String getLoadUrlStr() {
if (StringUtils.isEmpty(loadUrlStr)) {
return "";
}
return loadUrlStr;
}
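/**
* Build an http client that follows redirects for all methods, since the Doris FE
* answers a stream load PUT with a 307 redirect to a backend node, while
* {@link DefaultRedirectStrategy} only redirects GET and HEAD by default.
*/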
private CloseableHttpClient getHttpClient() {
HttpClientBuilder httpClientBuilder = HttpClients
.custom()
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
return httpClientBuilder.build();
}
private HttpPut getHttpPut(String label, String loadUrlStr, Boolean enable2PC, StructType schema) {
HttpPut httpPut = new HttpPut(loadUrlStr);
addCommonHeader(httpPut);
httpPut.setHeader("label", label);
if (StringUtils.isNotBlank(columns)) {
httpPut.setHeader("columns", columns);
} else {
if (schema != null && !schema.isEmpty()) {
String dfColumns = Arrays.stream(schema.fieldNames()).collect(Collectors.joining(","));
httpPut.setHeader("columns", dfColumns);
}
}
if (StringUtils.isNotBlank(maxFilterRatio)) {
httpPut.setHeader("max_filter_ratio", maxFilterRatio);
}
if (enable2PC) {
httpPut.setHeader("two_phase_commit", "true");
}
if (MapUtils.isNotEmpty(streamLoadProp)) {
streamLoadProp.forEach((k, v) -> {
if (!"strip_outer_array".equalsIgnoreCase(k)) {
httpPut.setHeader(k, v);
}
});
}
return httpPut;
}
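/**
* Holder for the status code, reason phrase and body of a stream load response.
*/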
public static class LoadResponse {
public int status;
public String respMsg;
public String respContent;
public LoadResponse(HttpResponse response) throws IOException {
this.status = response.getStatusLine().getStatusCode();
this.respMsg = response.getStatusLine().getReasonPhrase();
this.respContent = EntityUtils.toString(new BufferedHttpEntity(response.getEntity()), StandardCharsets.UTF_8);
}
@Override
public String toString() {
return "status: " + status + ", resp msg: " + respMsg + ", resp content: " + respContent;
}
}
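/**
* Execute one stream load request against a backend and return the transaction id
* reported in the response. When 2PC is enabled, a failed request is aborted by
* label with up to {@code txnRetries} attempts.
*
* @param rows rows to load
* @param schema row schema
* @return stream load transaction id
* @throws StreamLoadException if the request fails or Doris reports a non-success status
*/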
public long load(Iterator<InternalRow> rows, StructType schema)
throws StreamLoadException, JsonProcessingException {
String label = generateLoadLabel();
LoadResponse loadResponse;
try (CloseableHttpClient httpClient = getHttpClient()) {
String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl);
this.loadUrlStr = loadUrlStr;
HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC, schema);
if (StringUtils.isNotEmpty(compressType)) {
if ("gz".equalsIgnoreCase(compressType) && dataFormat == DataFormat.CSV) {
RecordBatchString recordBatchString = new RecordBatchString(RecordBatch.newBuilder(rows)
.format(dataFormat)
.sep(FIELD_DELIMITER)
.delim(LINE_DELIMITER)
.schema(schema)
.addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
String content = recordBatchString.getContent();
byte[] compressedData = compressByGZ(content);
httpPut.setEntity(new ByteArrayEntity(compressedData));
} else {
throw new StreamLoadException("Compress type [" + compressType + "] is not supported for data format [" + dataFormat + "]");
}
} else {
RecordBatchInputStream recordBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows)
.format(dataFormat)
.sep(FIELD_DELIMITER)
.delim(LINE_DELIMITER)
.schema(schema)
.addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
httpPut.setEntity(new InputStreamEntity(recordBatchInputStream));
}
HttpResponse httpResponse = httpClient.execute(httpPut);
loadResponse = new LoadResponse(httpResponse);
} catch (IOException e) {
if (enable2PC) {
int retries = txnRetries;
while (retries > 0) {
try {
abortByLabel(label);
break;
} catch (StreamLoadException ex) {
LockSupport.parkNanos(Duration.ofMillis(txnIntervalMs).toNanos());
retries--;
}
}
}
throw new StreamLoadException("load execute failed", e);
}
if (loadResponse.status != HttpStatus.SC_OK) {
LOG.error("Stream load http status is not OK, status: {}, response: {}", loadResponse.status, loadResponse);
throw new StreamLoadException(
String.format("stream load error, http status:%d, response:%s", loadResponse.status, loadResponse));
} else {
try {
RespContent respContent = MAPPER.readValue(loadResponse.respContent, RespContent.class);
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
LOG.error("Stream load status is not success, status:{}, response:{}", respContent.getStatus(), loadResponse);
throw new StreamLoadException(
String.format("stream load error, load status:%s, response:%s", respContent.getStatus(), loadResponse));
}
LOG.info("Stream load Response:{}", loadResponse);
return respContent.getTxnId();
} catch (IOException e) {
throw new StreamLoadException(e);
}
}
}
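/**
* Load rows from a streaming query, applying pass-through handling for json
* data before delegating to {@link #load(Iterator, StructType)}.
*/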
public Long loadStream(Iterator<InternalRow> rows, StructType schema)
throws StreamLoadException, JsonProcessingException {
if (this.streamingPassthrough) {
handleStreamPassThrough();
}
return load(rows, schema);
}
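/**
* Commit a two-phase-commit transaction by id.
*
* @param txnId transaction id
* @throws StreamLoadException if the commit request fails
*/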
public void commit(long txnId) throws StreamLoadException {
try (CloseableHttpClient client = getHttpClient()) {
String backend = getBackend();
String abortUrl = String.format(abortUrlPattern, backend, db);
HttpPut httpPut = new HttpPut(abortUrl);
addCommonHeader(httpPut);
httpPut.setHeader("txn_operation", "commit");
httpPut.setHeader("txn_id", String.valueOf(txnId));
CloseableHttpResponse response = client.execute(httpPut);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != 200 || response.getEntity() == null) {
LOG.warn("commit transaction response: " + response.getStatusLine().toString());
throw new StreamLoadException("Fail to commit transaction " + txnId + " with url " + abortUrl);
}
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, String> res = MAPPER.readValue(loadResult, new TypeReference<HashMap<String, String>>() {
});
if ("Fail".equals(res.get("status")) && !ResponseUtil.isCommitted(res.get("msg"))) {
throw new StreamLoadException("Commit failed " + loadResult);
} else {
LOG.info("load result {}", loadResult);
}
} catch (IOException e) {
throw new StreamLoadException(e);
}
}
/**
* abort transaction by id
*
* @param txnId transaction id
* @throws StreamLoadException if the abort request fails
*/
public void abortById(long txnId) throws StreamLoadException {
LOG.info("start abort transaction {}.", txnId);
try {
doAbort(httpPut -> httpPut.setHeader("txn_id", String.valueOf(txnId)));
} catch (StreamLoadException e) {
LOG.error("abort transaction by id: {} failed.", txnId);
throw e;
}
LOG.info("abort transaction {} succeed.", txnId);
}
/**
* abort transaction by label
*
* @param label label
* @throws StreamLoadException if the abort request fails
*/
public void abortByLabel(String label) throws StreamLoadException {
LOG.info("start abort transaction by label: {}.", label);
try {
doAbort(httpPut -> httpPut.setHeader("label", label));
} catch (StreamLoadException e) {
LOG.error("abort transaction by label: {} failed.", label);
throw e;
}
LOG.info("abort transaction by label {} succeed.", label);
}
/**
* execute abort
*
* @param putConsumer http put process function
* @throws StreamLoadException if the abort request fails or the transaction was already committed
*/
private void doAbort(Consumer<HttpPut> putConsumer) throws StreamLoadException {
try (CloseableHttpClient client = getHttpClient()) {
String abortUrl = String.format(abortUrlPattern, getBackend(), db);
HttpPut httpPut = new HttpPut(abortUrl);
addCommonHeader(httpPut);
httpPut.setHeader("txn_operation", "abort");
putConsumer.accept(httpPut);
CloseableHttpResponse response = client.execute(httpPut);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != 200 || response.getEntity() == null) {
LOG.error("abort transaction response: " + response.getStatusLine().toString());
throw new IOException("Fail to abort transaction with url " + abortUrl);
}
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, String> res = MAPPER.readValue(loadResult, new TypeReference<HashMap<String, String>>() {
});
if (!"Success".equals(res.get("status"))) {
if (ResponseUtil.isCommitted(res.get("msg"))) {
throw new IOException("try abort committed transaction");
}
LOG.error("Fail to abort transaction. error: {}", res.get("msg"));
throw new IOException(String.format("Fail to abort transaction. error: %s", res.get("msg")));
}
} catch (IOException e) {
throw new StreamLoadException(e);
}
}
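/**
* Collect stream load properties from Spark settings by stripping the
* {@link ConfigurationOptions#STREAM_LOAD_PROP_PREFIX} prefix, so that a setting
* of the form {@code <prefix>format=json} becomes the stream load property
* {@code format=json}.
*/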
public Map<String, String> getStreamLoadProp(SparkSettings sparkSettings) {
Map<String, String> streamLoadPropMap = new HashMap<>();
Properties properties = sparkSettings.asProperties();
for (String key : properties.stringPropertyNames()) {
if (key.startsWith(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX)) {
String subKey = key.substring(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX.length());
streamLoadPropMap.put(subKey, properties.getProperty(key));
}
}
return streamLoadPropMap;
}
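/**
* Pick the node to receive the load request: a random frontend when auto redirect
* is enabled, otherwise a random backend from the cached backend list.
*/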
private String getBackend() {
try {
if (autoRedirect) {
return RestService.randomEndpoint(fenodes, LOG);
}
// get backends from cache
List<BackendV2.BackendRowV2> backends = cache.get("backends");
Collections.shuffle(backends);
BackendV2.BackendRowV2 backend = backends.get(0);
return backend.getIp() + ":" + backend.getHttpPort();
} catch (ExecutionException e) {
throw new RuntimeException("get backends info fail", e);
} catch (IllegalArgumentException e) {
throw new RuntimeException("get frontend info fail", e);
}
}
private static String getAuthEncoded(String user, String passwd) {
String raw = String.format("%s:%s", user, passwd);
return Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
}
/**
* serializable backend cache loader
*/
private static class BackendCacheLoader extends CacheLoader<String, List<BackendV2.BackendRowV2>> implements Serializable {
private final SparkSettings settings;
public BackendCacheLoader(SparkSettings settings) {
this.settings = settings;
}
@Override
public List<BackendV2.BackendRowV2> load(String key) throws Exception {
return RestService.getBackendRows(settings, LOG);
}
}
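/**
* Generate a unique load label of the form
* {@code spark_streamload_yyyyMMdd_HHmmss_<uuid>}.
*/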
private String generateLoadLabel() {
Calendar calendar = Calendar.getInstance();
return String.format("spark_streamload_%s%02d%02d_%02d%02d%02d_%s",
calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
UUID.randomUUID().toString().replaceAll("-", ""));
}
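/**
* Unescape a delimiter given in hex form, e.g. {@code \x09} becomes a tab
* character; any other value is returned unchanged.
*/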
private String escapeString(String hexData) {
if (hexData.startsWith("\\x") || hexData.startsWith("\\X")) {
try {
hexData = hexData.substring(2);
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < hexData.length(); i += 2) {
String hexByte = hexData.substring(i, i + 2);
int decimal = Integer.parseInt(hexByte, 16);
char character = (char) decimal;
stringBuilder.append(character);
}
return stringBuilder.toString();
} catch (Exception e) {
throw new RuntimeException("escape column_separator or line_delimiter error.{}", e);
}
}
return hexData;
}
/**
* add common header to http request
*
* @param httpReq http request
*/
private void addCommonHeader(HttpRequestBase httpReq) {
httpReq.setHeader(HttpHeaders.AUTHORIZATION, "Basic " + authEncoded);
httpReq.setHeader(HttpHeaders.EXPECT, "100-continue");
httpReq.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");
}
/**
* handle stream sink data pass through
* if load format is json, set read_json_by_line to true and remove strip_outer_array parameter
*/
private void handleStreamPassThrough() {
if (dataFormat.equals(DataFormat.JSON)) {
LOG.info("handle stream pass through, force set read_json_by_line is true for json format");
streamLoadProp.put("read_json_by_line", "true");
streamLoadProp.remove("strip_outer_array");
}
}
/**
* compress data by gz compression algorithm
*/
public byte[] compressByGZ(String content) throws IOException {
byte[] compressedData;
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(baos)) {
gzipOutputStream.write(content.getBytes(StandardCharsets.UTF_8));
gzipOutputStream.finish();
compressedData = baos.toByteArray();
}
return compressedData;
}
}