// blob: 4f1cd086d8fa25cf784e5b15b262a768c1c58f9a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.elasticsearch;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.node.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
/**
 * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 1.x.
 *
 * <p>Depending on which constructor is used, the bridge talks to the cluster either through an
 * embedded client-only {@link Node} or through a {@link TransportClient} connected to the
 * user-provided transport addresses.
 */
@Internal
public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge<Client> {

    private static final long serialVersionUID = -2632363720584123682L;

    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch1ApiCallBridge.class);

    /** User-provided transport addresses. This is null if we are using an embedded {@link Node} for communication. */
    private final List<TransportAddress> transportAddresses;

    /** The embedded {@link Node} used for communication. This is null if we are using a TransportClient. */
    private transient Node node;

    /**
     * Constructor for use of an embedded {@link Node} for communication with the Elasticsearch cluster.
     */
    Elasticsearch1ApiCallBridge() {
        this.transportAddresses = null;
    }

    /**
     * Constructor for use of a {@link TransportClient} for communication with the Elasticsearch cluster.
     *
     * @param transportAddresses the addresses to connect to; must be non-null and non-empty
     * @throws IllegalArgumentException if {@code transportAddresses} is null or empty
     */
    Elasticsearch1ApiCallBridge(List<TransportAddress> transportAddresses) {
        Preconditions.checkArgument(transportAddresses != null && !transportAddresses.isEmpty());
        this.transportAddresses = transportAddresses;
    }

    /**
     * Creates the Elasticsearch {@link Client}.
     *
     * <p>If no transport addresses were configured, an embedded client-only node (no data, HTTP
     * disabled) is started and its client is returned. Otherwise a {@link TransportClient} is
     * created, all configured addresses are registered, and the connection is verified.
     *
     * @param clientConfig user-provided settings that are passed to the Elasticsearch settings builder
     * @return the client to use for communication with the cluster
     * @throws RuntimeException if the {@link TransportClient} is not connected to any node
     */
    @Override
    public Client createClient(Map<String, String> clientConfig) {
        if (transportAddresses == null) {
            return createEmbeddedNodeClient(clientConfig);
        } else {
            return createTransportClient(clientConfig);
        }
    }

    /** Starts the embedded node and returns its client. */
    private Client createEmbeddedNodeClient(Map<String, String> clientConfig) {
        // Make sure that we disable http access to our embedded node
        Settings settings = settingsBuilder()
            .put(clientConfig)
            .put("http.enabled", false)
            .build();

        node = nodeBuilder()
            .settings(settings)
            .client(true)
            .data(false)
            .node();

        Client client = node.client();

        if (LOG.isInfoEnabled()) {
            LOG.info("Created Elasticsearch client from embedded node");
        }

        return client;
    }

    /** Creates a {@link TransportClient}, closing it again if it cannot be set up or is not connected. */
    private Client createTransportClient(Map<String, String> clientConfig) {
        Settings settings = settingsBuilder()
            .put(clientConfig)
            .build();

        TransportClient transportClient = new TransportClient(settings);
        try {
            for (TransportAddress transport : transportAddresses) {
                transportClient.addTransportAddress(transport);
            }

            // verify that we actually are connected to a cluster
            if (transportClient.connectedNodes().isEmpty()) {
                throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
            }
        } catch (RuntimeException e) {
            // The client is never handed to the caller on this path, so nobody else could
            // release it; close it here to avoid leaking its resources.
            closeQuietly(transportClient, e);
            throw e;
        }

        if (LOG.isInfoEnabled()) {
            LOG.info("Created Elasticsearch TransportClient with connected nodes {}", transportClient.connectedNodes());
        }

        return transportClient;
    }

    /** Closes the client, attaching any failure during close to {@code cause} instead of masking it. */
    private static void closeQuietly(TransportClient transportClient, RuntimeException cause) {
        try {
            transportClient.close();
        } catch (RuntimeException closeFailure) {
            cause.addSuppressed(closeFailure);
        }
    }

    /**
     * Creates a {@link BulkProcessor.Builder} for the given client and listener.
     *
     * @param client the client the bulk processor sends requests with
     * @param listener the listener passed to the bulk processor builder
     * @return a new bulk processor builder
     */
    @Override
    public BulkProcessor.Builder createBulkProcessorBuilder(Client client, BulkProcessor.Listener listener) {
        return BulkProcessor.builder(client, listener);
    }

    /**
     * Extracts the failure cause from a bulk item response.
     *
     * @param bulkItemResponse the response of a single bulk item
     * @return a {@link RuntimeException} carrying the failure message, or {@code null} if the item did not fail
     */
    @Override
    @Nullable
    public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
        if (!bulkItemResponse.isFailed()) {
            return null;
        } else {
            return new RuntimeException(bulkItemResponse.getFailureMessage());
        }
    }

    /**
     * No-op apart from logging a warning: the builder is left untouched and the given policy is ignored.
     *
     * @param builder the bulk processor builder (not modified)
     * @param flushBackoffPolicy the requested backoff policy (ignored)
     */
    @Override
    public void configureBulkProcessorBackoff(
            BulkProcessor.Builder builder,
            @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
        // Elasticsearch 1.x does not support backoff retries for failed bulk requests
        LOG.warn("Elasticsearch 1.x does not support backoff retries.");
    }

    /**
     * Closes the embedded {@link Node} if one was started and is not yet closed.
     */
    @Override
    public void cleanup() {
        if (node != null && !node.isClosed()) {
            node.close();
            node = null;
        }
    }
}