blob: d97075cb168e1727c70d5e7b9c26c8212cfa8dcf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.flink.elasticsearch6.sink;
import static org.apache.seatunnel.flink.elasticsearch6.config.Config.DEFAULT_INDEX;
import static org.apache.seatunnel.flink.elasticsearch6.config.Config.DEFAULT_INDEX_TIME_FORMAT;
import static org.apache.seatunnel.flink.elasticsearch6.config.Config.DEFAULT_INDEX_TYPE;
import static org.apache.seatunnel.flink.elasticsearch6.config.Config.HOSTS;
import static org.apache.seatunnel.flink.elasticsearch6.config.Config.INDEX;
import static org.apache.seatunnel.flink.elasticsearch6.config.Config.INDEX_TIME_FORMAT;
import static org.apache.seatunnel.flink.elasticsearch6.config.Config.INDEX_TYPE;
import static org.apache.seatunnel.flink.elasticsearch6.config.Config.PARALLELISM;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.utils.StringTemplate;
import org.apache.seatunnel.flink.BaseFlinkSink;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSink;
import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import com.google.auto.service.AutoService;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.types.Row;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Flink sink plugin that writes {@link Row} records to Elasticsearch, usable from both
 * streaming ({@link FlinkStreamSink}) and batch ({@link FlinkBatchSink}) jobs.
 *
 * <p>Each row becomes one index request whose JSON source maps every field name of the
 * row type to the value found at the same position in the row.
 */
@AutoService(BaseFlinkSink.class)
public class Elasticsearch6 implements FlinkStreamSink, FlinkBatchSink {

    private static final long serialVersionUID = 8445868321245456793L;

    /** Number of default entries put into the fallback configuration. */
    private static final int DEFAULT_CONFIG_SIZE = 3;

    /** Separator between host name and port in a {@code hosts} entry. */
    private static final String HOST_PORT_SEPARATOR = ":";

    /** Scheme used for every Elasticsearch host. */
    private static final String HTTP_SCHEME = "http";

    private Config config;

    @Override
    public void setConfig(Config config) {
        this.config = config;
    }

    @Override
    public Config getConfig() {
        return config;
    }

    /**
     * Verifies that the only mandatory option, the host list, is present.
     *
     * @return the result produced by {@link CheckConfigUtil#checkAllExists}
     */
    @Override
    public CheckResult checkConfig() {
        return CheckConfigUtil.checkAllExists(config, HOSTS);
    }

    /**
     * Layers the default index, index type and index time format underneath the user
     * configuration, so those keys can be read later even when the user omitted them.
     *
     * @param env the Flink environment (not used here)
     */
    @Override
    public void prepare(FlinkEnvironment env) {
        Map<String, String> defaults = new HashMap<>(DEFAULT_CONFIG_SIZE);
        defaults.put(INDEX, DEFAULT_INDEX);
        defaults.put(INDEX_TYPE, DEFAULT_INDEX_TYPE);
        defaults.put(INDEX_TIME_FORMAT, DEFAULT_INDEX_TIME_FORMAT);
        config = config.withFallback(ConfigFactory.parseMap(defaults));
    }

    @Override
    public String getPluginName() {
        return "ElasticSearch";
    }

    /**
     * Attaches an Elasticsearch sink to the given stream.
     *
     * @param env        the Flink environment (not used here)
     * @param dataStream the stream of rows to index; its type must be a {@link RowTypeInfo}
     * @throws IllegalArgumentException if a configured host is not of the form {@code host:port}
     */
    @Override
    public void outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
        List<String> hosts = config.getStringList(HOSTS);
        List<HttpHost> httpHosts = new ArrayList<>(hosts.size());
        for (String host : hosts) {
            httpHosts.add(parseHttpHost(host));
        }

        ElasticsearchSinkFunction<Row> sinkFunction = buildSinkFunction((RowTypeInfo) dataStream.getType());
        ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, sinkFunction);

        // configuration for the bulk requests; this instructs the sink to emit after every element,
        // otherwise they would be buffered
        esSinkBuilder.setBulkFlushMaxActions(1);

        // finally, build and add the sink to the job's pipeline
        if (config.hasPath(PARALLELISM)) {
            dataStream.addSink(esSinkBuilder.build()).setParallelism(config.getInt(PARALLELISM));
        } else {
            dataStream.addSink(esSinkBuilder.build());
        }
    }

    /**
     * Attaches an Elasticsearch output format to the given data set.
     *
     * @param env     the Flink environment (not used here)
     * @param dataSet the rows to index; its type must be a {@link RowTypeInfo}
     */
    @Override
    public void outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
        ElasticsearchSinkFunction<Row> sinkFunction = buildSinkFunction((RowTypeInfo) dataSet.getType());
        DataSink<Row> dataSink = dataSet.output(new ElasticsearchOutputFormat<>(config, sinkFunction));
        if (config.hasPath(PARALLELISM)) {
            dataSink.setParallelism(config.getInt(PARALLELISM));
        }
    }

    /**
     * Builds the function that turns each row into an index request.
     *
     * <p>Index name, index type and field names are resolved once here and captured as
     * plain values, so the returned function does not hold a reference to this plugin
     * instance and does no configuration lookups per record.
     *
     * @param rowTypeInfo type information supplying the field names of the rows
     * @return the sink function shared by the stream and batch code paths
     */
    private ElasticsearchSinkFunction<Row> buildSinkFunction(RowTypeInfo rowTypeInfo) {
        final String[] fieldNames = rowTypeInfo.getFieldNames();
        // NOTE(review): StringTemplate.substitute presumably expands time placeholders in the
        // index name using the configured time format - confirm against StringTemplate.
        final String indexName =
            StringTemplate.substitute(config.getString(INDEX), config.getString(INDEX_TIME_FORMAT));
        final String indexType = config.getString(INDEX_TYPE);
        return (element, ctx, indexer) ->
            indexer.add(createIndexRequest(indexName, indexType, fieldNames, element));
    }

    /**
     * Parses one {@code host:port} entry of the host list into an {@link HttpHost} using
     * the {@code http} scheme. Surrounding whitespace of both parts is ignored.
     *
     * @param host the configured entry, e.g. {@code localhost:9200}
     * @return the parsed host
     * @throws IllegalArgumentException if the port is missing or is not a valid integer
     */
    private static HttpHost parseHttpHost(String host) {
        String[] hostAndPort = host.split(HOST_PORT_SEPARATOR);
        if (hostAndPort.length < 2) {
            throw new IllegalArgumentException(
                "Invalid Elasticsearch host '" + host + "': expected the format host:port");
        }
        final int port;
        try {
            port = Integer.parseInt(hostAndPort[1].trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "Invalid port in Elasticsearch host '" + host + "': expected the format host:port", e);
        }
        return new HttpHost(hostAndPort[0].trim(), port, HTTP_SCHEME);
    }

    /**
     * Creates the index request for a single row.
     *
     * @param indexName  target index
     * @param indexType  target document type
     * @param fieldNames field names, positionally aligned with the row's fields
     * @param element    the row to index
     * @return an index request whose source maps each field name to the row's value
     */
    private static IndexRequest createIndexRequest(String indexName, String indexType,
                                                   String[] fieldNames, Row element) {
        int arity = element.getArity();
        Map<String, Object> json = new HashMap<>(arity);
        for (int i = 0; i < arity; i++) {
            json.put(fieldNames[i], element.getField(i));
        }
        return Requests.indexRequest()
            .index(indexName)
            .type(indexType)
            .source(json);
    }
}