/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.druid.sink;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.druid.exception.DruidConnectorException;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.datatype.joda.JodaModule;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;
import java.util.stream.Collectors;
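/**
* Sink writer that buffers rows as inline CSV and submits them to Druid as a parallel batch
* ingestion ({@code index_parallel}) task through the task API.
*/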
public class DruidWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportMultiTableSinkWriter<Void> {
private static final Logger LOG = LoggerFactory.getLogger(DruidWriter.class);
private static final String DEFAULT_LINE_DELIMITER = "\n";
private static final String DEFAULT_FIELD_DELIMITER = ",";
private static final String TIMESTAMP_SPEC_COLUMN_NAME = "timestamp";
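// Task submission path of the Druid indexer (Overlord) API; appended to coordinatorUrl in flush().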
private static final String DRUID_ENDPOINT = "/druid/indexer/v1/task";
private int batchSize;
private int currentBatchSize = 0;
private final DataSchema dataSchema;
private final long processTime;
private final transient StringBuffer data;
private final CloseableHttpClient httpClient;
private final ObjectMapper mapper;
private final String coordinatorUrl;
private final String datasource;
private final SeaTunnelRowType seaTunnelRowType;
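/**
* @param seaTunnelRowType schema of the incoming rows
* @param coordinatorUrl host:port used to reach the Druid task API (see {@code DRUID_ENDPOINT})
* @param datasource name of the target Druid datasource
* @param batchSize number of rows to buffer before an ingestion task is submitted
*/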
public DruidWriter(
SeaTunnelRowType seaTunnelRowType,
String coordinatorUrl,
String datasource,
int batchSize) {
this.seaTunnelRowType = seaTunnelRowType;
this.coordinatorUrl = coordinatorUrl;
this.datasource = datasource;
this.batchSize = batchSize;
this.mapper = provideDruidSerializer();
this.httpClient = HttpClients.createDefault();
this.dataSchema = provideDruidDataSchema();
this.processTime = System.currentTimeMillis();
this.data = new StringBuffer();
}
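/**
* Serializes one row as a CSV line, appends the ingestion timestamp as the last column, and
* triggers a flush once {@code batchSize} rows have been buffered.
*/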
@Override
public void write(SeaTunnelRow element) throws IOException {
final StringJoiner joiner = new StringJoiner(DEFAULT_FIELD_DELIMITER, "", "");
for (int i = 0; i < element.getArity(); i++) {
final Object v = element.getField(i);
// Null fields are written as empty CSV values so that column positions stay aligned.
joiner.add(v == null ? "" : v.toString());
}
// The timestamp column is a required field in Druid.
// See https://druid.apache.org/docs/24.0.0/ingestion/data-model.html#primary-timestamp
joiner.add(String.valueOf(processTime));
data.append(joiner);
data.append(DEFAULT_LINE_DELIMITER);
currentBatchSize++;
if (currentBatchSize >= batchSize) {
flush();
currentBatchSize = 0;
}
}
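/**
* Builds a parallel index ingestion spec from the buffered CSV data and submits it as a task to
* the Druid task API over HTTP.
*/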
public void flush() throws IOException {
final ParallelIndexIOConfig ioConfig = provideDruidIOConfig(data);
final ParallelIndexSupervisorTask indexTask = provideIndexTask(ioConfig);
final String inputJSON = provideInputJSONString(indexTask);
String uri = "http://" + this.coordinatorUrl + DRUID_ENDPOINT;
HttpPost post = new HttpPost(uri);
post.setHeader("Content-Type", "application/json");
post.setHeader("Accept", "application/json, text/plain, */*");
post.setEntity(new StringEntity(inputJSON));
try (CloseableHttpResponse response = httpClient.execute(post)) {
String responseBody =
response.getEntity() != null ? EntityUtils.toString(response.getEntity()) : "";
LOG.info("Druid write task has been sent, and the response is {}", responseBody);
}
// Clear the buffer so rows that have already been submitted are not sent again.
data.setLength(0);
}
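/** Flushes any remaining buffered rows and releases the underlying HTTP client. */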
@Override
public void close() throws IOException {
// Submit any rows that are still buffered before shutting down.
if (data.length() > 0) {
flush();
}
if (httpClient != null) {
httpClient.close();
}
}
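/**
* Configures the ObjectMapper used to serialize the Druid index task: registers the Joda module
* for Druid's Joda-Time based types and relies on explicit Jackson annotations by disabling
* property auto-detection.
*/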
private ObjectMapper provideDruidSerializer() {
final ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JodaModule());
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.configure(MapperFeature.AUTO_DETECT_GETTERS, false);
mapper.configure(MapperFeature.AUTO_DETECT_FIELDS, false);
mapper.configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false);
mapper.configure(MapperFeature.AUTO_DETECT_SETTERS, false);
mapper.configure(SerializationFeature.INDENT_OUTPUT, false);
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
return mapper;
}
/**
* One necessary piece of information to provide is the list of {@link DimensionSchema}, which
* declares the data type of each column. More details at
* https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html
*/
private DataSchema provideDruidDataSchema() {
final List<DimensionSchema> dimensionSchemas = transformToDimensionSchema();
return new DataSchema(
datasource,
new TimestampSpec(TIMESTAMP_SPEC_COLUMN_NAME, "auto", null),
new DimensionsSpec(dimensionSchemas),
null,
new UniformGranularitySpec(Granularities.HOUR, Granularities.MINUTE, false, null),
null);
}
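/**
* Maps each SeaTunnel field type to a Druid {@link DimensionSchema}: integral types become long
* dimensions, FLOAT becomes a float dimension, DECIMAL and DOUBLE become double dimensions, and
* BOOLEAN, TIMESTAMP and STRING are stored as string dimensions.
*/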
private List<DimensionSchema> transformToDimensionSchema() {
List<DimensionSchema> dimensionSchemas = new ArrayList<>();
String[] fieldNames = seaTunnelRowType.getFieldNames();
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
for (int i = 0; i < fieldNames.length; i++) {
String columnName = fieldNames[i];
switch (fieldTypes[i].getSqlType()) {
case BOOLEAN:
case TIMESTAMP:
case STRING:
dimensionSchemas.add(new StringDimensionSchema(columnName));
break;
case FLOAT:
dimensionSchemas.add(new FloatDimensionSchema(columnName));
break;
case DECIMAL:
case DOUBLE:
dimensionSchemas.add(new DoubleDimensionSchema(columnName));
break;
case TINYINT:
case SMALLINT:
case INT:
case BIGINT:
dimensionSchemas.add(new LongDimensionSchema(columnName));
break;
default:
throw new DruidConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
"Unsupported data type " + seaTunnelRowType.getFieldType(i));
}
}
return dimensionSchemas;
}
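/**
* Builds the IO config with an inline CSV input source. The column list is the row's field names
* followed by the timestamp column, matching the order in which {@link #write(SeaTunnelRow)}
* emits values.
*/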
ParallelIndexIOConfig provideDruidIOConfig(final StringBuffer data) {
List<String> formatList =
Arrays.stream(seaTunnelRowType.getFieldNames()).collect(Collectors.toList());
formatList.add(TIMESTAMP_SPEC_COLUMN_NAME);
return new ParallelIndexIOConfig(
null,
new InlineInputSource(data.toString()),
new CsvInputFormat(formatList, DEFAULT_LINE_DELIMITER, null, false, 0),
false,
null);
}
/**
* Provides a {@link ParallelIndexSupervisorTask} that can run multiple indexing tasks
* concurrently. See https://druid.apache.org/docs/latest/ingestion/native-batch.html for more
* information.
*/
@VisibleForTesting
ParallelIndexSupervisorTask provideIndexTask(final ParallelIndexIOConfig ioConfig) {
return new ParallelIndexSupervisorTask(
null, null, null, new ParallelIndexIngestionSpec(dataSchema, ioConfig, null), null);
}
/**
* Provides the JSON payload to be sent via the HTTP request. See the payload example at
* https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html
*/
String provideInputJSONString(final ParallelIndexSupervisorTask indexTask)
throws JsonProcessingException {
String taskJSON = mapper.writeValueAsString(indexTask);
final ObjectNode jsonObject = (ObjectNode) mapper.readTree(taskJSON);
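// Drop the task-level fields produced by serializing the task object (id, groupId, resource)
// and the tuningConfig, so that only the dataSchema and ioConfig are submitted.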
jsonObject.remove("id");
jsonObject.remove("groupId");
jsonObject.remove("resource");
final ObjectNode spec = (ObjectNode) jsonObject.get("spec");
spec.remove("tuningConfig");
jsonObject.set("spec", spec);
taskJSON = jsonObject.toString();
return taskJSON;
}
}