/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.elasticsearch;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.conn.NHttpClientConnectionManager;
import org.apache.http.nio.conn.NoopIOSessionStrategy;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.refresh.RefreshRequest;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkProcessor;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Node;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.Requests;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
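
/**
 * Client wrapper around the OpenSearch {@link RestHighLevelClient} used by the
 * Pulsar Elasticsearch sink. It handles index creation, single and bulk
 * indexing/deletion, record ack/fail, retries and connection management.
 */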
@Slf4j
public class ElasticSearchClient implements AutoCloseable {
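// Elasticsearch error types treated as malformed-document failures,
// handled according to the configured malformedDocAction.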
static final String[] MALFORMED_ERRORS = {
"mapper_parsing_exception",
"action_request_validation_exception",
"illegal_argument_exception"
};
private ElasticSearchConfig config;
private ConfigCallback configCallback;
private RestHighLevelClient client;
final Set<String> indexCache = new HashSet<>();
final Map<String, String> topicToIndexCache = new HashMap<>();
final RandomExponentialRetry backoffRetry;
final BulkProcessor bulkProcessor;
final ConcurrentMap<DocWriteRequest<?>, Record> records = new ConcurrentHashMap<>();
final AtomicReference<Exception> irrecoverableError = new AtomicReference<>();
final ScheduledExecutorService executorService;
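// Builds the high-level REST client, the optional bulk processor and a
// scheduled task that evicts idle and expired connections.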
ElasticSearchClient(ElasticSearchConfig elasticSearchConfig) {
this.config = elasticSearchConfig;
this.configCallback = new ConfigCallback();
this.backoffRetry = new RandomExponentialRetry(elasticSearchConfig.getMaxRetryTimeInSec());
if (!config.isBulkEnabled()) {
bulkProcessor = null;
} else {
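// Bulk mode: buffer index/delete requests and flush by action count, size or interval;
// records are acked or failed from the listener callbacks below.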
BulkProcessor.Builder builder = BulkProcessor.builder(
(bulkRequest, bulkResponseActionListener)
-> client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener),
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
log.trace("Bulk request id={} size={}:", l, bulkRequest.requests().size());
for (int i = 0; i < bulkResponse.getItems().length; i++) {
DocWriteRequest<?> request = bulkRequest.requests().get(i);
Record record = records.get(request);
BulkItemResponse bulkItemResponse = bulkResponse.getItems()[i];
if (bulkItemResponse.isFailed()) {
record.fail();
try {
hasIrrecoverableError(bulkItemResponse);
} catch (Exception e) {
log.warn("Unrecoverable error:", e);
}
} else {
record.ack();
}
records.remove(request);
}
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
log.warn("Bulk request id={} failed:", l, throwable);
for (DocWriteRequest<?> request : bulkRequest.requests()) {
Record record = records.remove(request);
record.fail();
}
}
}
)
.setBulkActions(config.getBulkActions())
.setBulkSize(new ByteSizeValue(config.getBulkSizeInMb(), ByteSizeUnit.MB))
.setConcurrentRequests(config.getBulkConcurrentRequests())
.setBackoffPolicy(new RandomExponentialBackoffPolicy(backoffRetry,
config.getRetryBackoffInMs(),
config.getMaxRetries()
));
if (config.getBulkFlushIntervalInMs() > 0) {
builder.setFlushInterval(new TimeValue(config.getBulkFlushIntervalInMs(), TimeUnit.MILLISECONDS));
}
this.bulkProcessor = builder.build();
}
// idle+expired connection evictor thread
this.executorService = Executors.newSingleThreadScheduledExecutor();
this.executorService.scheduleAtFixedRate(() -> {
configCallback.connectionManager.closeExpiredConnections();
configCallback.connectionManager.closeIdleConnections(
config.getConnectionIdleTimeoutInMs(), TimeUnit.MILLISECONDS);
},
config.getConnectionIdleTimeoutInMs(),
config.getConnectionIdleTimeoutInMs(),
TimeUnit.MILLISECONDS
);
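// Build the low-level REST client: per-request timeouts, HTTP client customization
// (connection pooling, TLS, credentials) and node failure logging.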
log.info("ElasticSearch URL {}", config.getElasticSearchUrl());
HttpHost[] hosts = getHttpHosts(config);
RestClientBuilder builder = RestClient.builder(hosts)
.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
return builder
.setContentCompressionEnabled(config.isCompressionEnabled())
.setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs())
.setConnectTimeout(config.getConnectTimeoutInMs())
.setSocketTimeout(config.getSocketTimeoutInMs());
}
})
.setHttpClientConfigCallback(this.configCallback)
.setFailureListener(new RestClient.FailureListener() {
@Override
public void onFailure(Node node) {
log.warn("Node host={} failed", node.getHost());
}
});
this.client = new RestHighLevelClient(builder);
}
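// Stores the first irrecoverable error; once set, checkNotFailed() rejects further writes.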
void failed(Exception e) throws Exception {
if (irrecoverableError.compareAndSet(null, e)) {
log.error("Irrecoverable error:", e);
}
}
boolean isFailed() {
return irrecoverableError.get() != null;
}
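// Matches a failed bulk item against MALFORMED_ERRORS and applies the configured malformedDocAction.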
void hasIrrecoverableError(BulkItemResponse bulkItemResponse) throws Exception {
for (String error : MALFORMED_ERRORS) {
if (bulkItemResponse.getFailureMessage().contains(error)) {
switch (config.getMalformedDocAction()) {
case IGNORE:
break;
case WARN:
log.warn("Ignoring malformed document index={} id={}",
bulkItemResponse.getIndex(),
bulkItemResponse.getId(),
bulkItemResponse.getFailure().getCause());
break;
case FAIL:
log.error("Failure due to the malformed document index={} id={}",
bulkItemResponse.getIndex(),
bulkItemResponse.getId(),
bulkItemResponse.getFailure().getCause());
failed(bulkItemResponse.getFailure().getCause());
break;
}
}
}
}
IndexRequest makeIndexRequest(Record<GenericObject> record, Pair<String, String> idAndDoc) throws IOException {
IndexRequest indexRequest = Requests.indexRequest(indexName(record.getTopicName()));
if (!Strings.isNullOrEmpty(idAndDoc.getLeft())) {
indexRequest.id(idAndDoc.getLeft());
}
indexRequest.type(config.getTypeName());
indexRequest.source(idAndDoc.getRight(), XContentType.JSON);
return indexRequest;
}
DeleteRequest makeDeleteRequest(Record<GenericObject> record, String id) throws IOException {
DeleteRequest deleteRequest = Requests.deleteRequest(indexName(record.getTopicName()));
deleteRequest.id(id);
deleteRequest.type(config.getTypeName());
return deleteRequest;
}
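/**
 * Bulk index an elasticsearch document. The record is acked or failed
 * asynchronously from the bulk processor listener.
 * @param record the Pulsar record to ack or fail
 * @param idAndDoc pair of the document id (may be empty) and the JSON source
 * @throws Exception if the client has already failed or the request cannot be queued
 */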
public void bulkIndex(Record record, Pair<String, String> idAndDoc) throws Exception {
try {
checkNotFailed();
checkIndexExists(record.getTopicName());
IndexRequest indexRequest = makeIndexRequest(record, idAndDoc);
records.put(indexRequest, record);
bulkProcessor.add(indexRequest);
} catch (Exception e) {
log.debug("index failed id=" + idAndDoc.getLeft(), e);
record.fail();
throw e;
}
}
/**
* Index an elasticsearch document synchronously and ack the record on success.
* @param record the Pulsar record to ack or fail
* @param idAndDoc pair of the document id (may be empty) and the JSON source
* @return true if the document was created or updated, false otherwise
* @throws Exception if the client has already failed or the index request fails
*/
public boolean indexDocument(Record<GenericObject> record, Pair<String, String> idAndDoc) throws Exception {
try {
checkNotFailed();
checkIndexExists(record.getTopicName());
IndexResponse indexResponse = client.index(makeIndexRequest(record, idAndDoc), RequestOptions.DEFAULT);
if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)
|| indexResponse.getResult().equals(DocWriteResponse.Result.UPDATED)) {
record.ack();
return true;
} else {
record.fail();
return false;
}
} catch (final Exception ex) {
log.error("index failed id=" + idAndDoc.getLeft(), ex);
record.fail();
throw ex;
}
}
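/**
 * Bulk delete an elasticsearch document. The record is acked or failed
 * asynchronously from the bulk processor listener.
 * @param record the Pulsar record to ack or fail
 * @param id the id of the document to delete
 * @throws Exception if the client has already failed or the request cannot be queued
 */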
public void bulkDelete(Record<GenericObject> record, String id) throws Exception {
try {
checkNotFailed();
checkIndexExists(record.getTopicName());
DeleteRequest deleteRequest = makeDeleteRequest(record, id);
records.put(deleteRequest, record);
bulkProcessor.add(deleteRequest);
} catch (Exception e) {
log.debug("delete failed id=" + id, e);
record.fail();
throw e;
}
}
/**
* Delete an elasticsearch document and ack the record.
* @param record the Pulsar record to ack or fail
* @param id the id of the document to delete
* @return true if the document was deleted or not found, false otherwise
* @throws Exception if the client has already failed or the delete request fails
*/
public boolean deleteDocument(Record<GenericObject> record, String id) throws Exception {
try {
checkNotFailed();
checkIndexExists(record.getTopicName());
DeleteResponse deleteResponse = client.delete(makeDeleteRequest(record, id), RequestOptions.DEFAULT);
log.debug("delete result=" + deleteResponse.getResult());
if (deleteResponse.getResult().equals(DocWriteResponse.Result.DELETED)
|| deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) {
record.ack();
return true;
}
record.fail();
return false;
} catch (final Exception ex) {
log.debug("index failed id=" + id, ex);
record.fail();
throw ex;
}
}
/**
* Flushes the bulk processor.
*/
public void flush() {
bulkProcessor.flush();
}
@Override
public void close() {
try {
if (bulkProcessor != null) {
bulkProcessor.awaitClose(5000L, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Elasticsearch bulk processor close error:", e);
}
try {
this.executorService.shutdown();
if (this.client != null) {
this.client.close();
}
} catch (IOException e) {
log.warn("Elasticsearch client close error:", e);
}
}
private void checkNotFailed() throws Exception {
if (irrecoverableError.get() != null) {
throw irrecoverableError.get();
}
}
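// Lazily creates the target index (guarded by a local cache) when createIndexIfNeeded is enabled.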
private void checkIndexExists(Optional<String> topicName) throws IOException {
if (!config.isCreateIndexIfNeeded()) {
return;
}
String indexName = indexName(topicName);
if (!indexCache.contains(indexName)) {
synchronized (this) {
if (!indexCache.contains(indexName)) {
createIndexIfNeeded(indexName);
indexCache.add(indexName);
}
}
}
}
private String indexName(Optional<String> topicName) throws IOException {
if (config.getIndexName() != null) {
// Use the configured indexName if provided.
return config.getIndexName();
}
if (!topicName.isPresent()) {
throw new IOException("Elasticsearch index name configuration and topic name are empty");
}
return topicToIndexName(topicName.get());
}
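/**
 * Converts a Pulsar topic name to a valid Elasticsearch index name:
 * lowercased, stripped of the persistent://tenant/namespace prefix and
 * truncated to 255 bytes.
 */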
@VisibleForTesting
public String topicToIndexName(String topicName) {
return topicToIndexCache.computeIfAbsent(topicName, k -> {
// see elasticsearch limitations https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html#indices-create-api-path-params
String indexName = topicName.toLowerCase(Locale.ROOT);
// remove the pulsar topic info persistent://tenant/namespace/topic
String[] parts = indexName.split("/");
if (parts.length > 1) {
indexName = parts[parts.length - 1];
}
// truncate to the max bytes length
while (indexName.getBytes(StandardCharsets.UTF_8).length > 255) {
indexName = indexName.substring(0, indexName.length() - 1);
}
if (indexName.length() <= 0 || !indexName.matches("[a-zA-Z\\.0-9][a-zA-Z_\\.\\-\\+0-9]*")) {
throw new RuntimeException(new IOException("Cannot convert the topic name='"
+ topicName + "' to a valid elasticsearch index name"));
}
if (log.isDebugEnabled()) {
log.debug("Translate topic={} to index={}", k, indexName);
}
return indexName;
});
}
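/**
 * Creates the index with the configured number of shards and replicas if it
 * does not already exist.
 * @return true if the index was created, false if it already existed
 */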
@VisibleForTesting
public boolean createIndexIfNeeded(String indexName) throws IOException {
if (indexExists(indexName)) {
return false;
}
final CreateIndexRequest cireq = new CreateIndexRequest(indexName);
cireq.settings(Settings.builder()
.put("index.number_of_shards", config.getIndexNumberOfShards())
.put("index.number_of_replicas", config.getIndexNumberOfReplicas()));
return retry(() -> {
CreateIndexResponse resp = client.indices().create(cireq, RequestOptions.DEFAULT);
if (!resp.isAcknowledged() || !resp.isShardsAcknowledged()) {
throw new IOException("Unable to create index.");
}
return true;
}, "create index");
}
public boolean indexExists(final String indexName) throws IOException {
final GetIndexRequest request = new GetIndexRequest(indexName);
return retry(() -> client.indices().exists(request, RequestOptions.DEFAULT), "index exists");
}
@VisibleForTesting
protected long totalHits(String indexName) throws IOException {
return search(indexName).getHits().getTotalHits().value;
}
@VisibleForTesting
protected SearchResponse search(String indexName) throws IOException {
client.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
return client.search(
new SearchRequest()
.indices(indexName)
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
RequestOptions.DEFAULT);
}
@VisibleForTesting
protected AcknowledgedResponse delete(String indexName) throws IOException {
return client.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
}
private <T> T retry(Callable<T> callable, String source) {
try {
return backoffRetry.retry(callable, config.getMaxRetries(), config.getRetryBackoffInMs(), source);
} catch (Exception e) {
log.error("error in command {} wth retry", source, e);
throw new ElasticSearchConnectionException(source + " failed", e);
}
}
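/**
 * HttpClientConfigCallback installing a pooling NIO connection manager
 * (with optional TLS) and basic authentication credentials.
 */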
public class ConfigCallback implements RestClientBuilder.HttpClientConfigCallback {
final NHttpClientConnectionManager connectionManager;
final CredentialsProvider credentialsProvider;
public ConfigCallback() {
this.connectionManager = buildConnectionManager(ElasticSearchClient.this.config);
this.credentialsProvider = buildCredentialsProvider(ElasticSearchClient.this.config);
}
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder builder) {
builder.setMaxConnPerRoute(config.getBulkConcurrentRequests());
builder.setMaxConnTotal(config.getBulkConcurrentRequests());
builder.setConnectionManager(connectionManager);
if (this.credentialsProvider != null) {
builder.setDefaultCredentialsProvider(credentialsProvider);
}
return builder;
}
public NHttpClientConnectionManager buildConnectionManager(ElasticSearchConfig config) {
try {
IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setConnectTimeout(config.getConnectTimeoutInMs())
.setSoTimeout(config.getSocketTimeoutInMs())
.build();
ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
PoolingNHttpClientConnectionManager connManager;
if (config.getSsl().isEnabled()) {
ElasticSearchSslConfig sslConfig = config.getSsl();
HostnameVerifier hostnameVerifier = config.getSsl().isHostnameVerification()
? SSLConnectionSocketFactory.getDefaultHostnameVerifier()
: new NoopHostnameVerifier();
String[] cipherSuites = null;
if (!Strings.isNullOrEmpty(sslConfig.getCipherSuites())) {
cipherSuites = sslConfig.getCipherSuites().split(",");
}
String[] protocols = null;
if (!Strings.isNullOrEmpty(sslConfig.getProtocols())) {
protocols = sslConfig.getProtocols().split(",");
}
Registry<SchemeIOSessionStrategy> registry = RegistryBuilder.<SchemeIOSessionStrategy>create()
.register("http", NoopIOSessionStrategy.INSTANCE)
.register("https", new SSLIOSessionStrategy(
buildSslContext(config),
protocols,
cipherSuites,
hostnameVerifier))
.build();
connManager = new PoolingNHttpClientConnectionManager(ioReactor, registry);
} else {
connManager = new PoolingNHttpClientConnectionManager(ioReactor);
}
return connManager;
} catch (Exception e) {
throw new ElasticSearchConnectionException(e);
}
}
private SSLContext buildSslContext(ElasticSearchConfig config)
throws NoSuchAlgorithmException, KeyManagementException, CertificateException,
KeyStoreException, IOException, UnrecoverableKeyException {
ElasticSearchSslConfig sslConfig = config.getSsl();
SSLContextBuilder sslContextBuilder = SSLContexts.custom();
if (!Strings.isNullOrEmpty(sslConfig.getProvider())) {
sslContextBuilder.setProvider(sslConfig.getProvider());
}
if (!Strings.isNullOrEmpty(sslConfig.getProtocols())) {
sslContextBuilder.setProtocol(sslConfig.getProtocols());
}
if (!Strings.isNullOrEmpty(sslConfig.getTruststorePath())
&& !Strings.isNullOrEmpty(sslConfig.getTruststorePassword())) {
sslContextBuilder.loadTrustMaterial(
new File(sslConfig.getTruststorePath()), sslConfig.getTruststorePassword().toCharArray());
}
if (!Strings.isNullOrEmpty(sslConfig.getKeystorePath())
&& !Strings.isNullOrEmpty(sslConfig.getKeystorePassword())) {
sslContextBuilder.loadKeyMaterial(new File(sslConfig.getKeystorePath()),
sslConfig.getKeystorePassword().toCharArray(),
sslConfig.getKeystorePassword().toCharArray());
}
return sslContextBuilder.build();
}
private CredentialsProvider buildCredentialsProvider(ElasticSearchConfig config) {
if (StringUtils.isEmpty(config.getUsername()) || StringUtils.isEmpty(config.getPassword())) {
return null;
}
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
return credentialsProvider;
}
}
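// Parses the comma-separated elasticSearchUrl config into HttpHost entries.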
private static HttpHost[] getHttpHosts(ElasticSearchConfig elasticSearchConfig) {
String url = elasticSearchConfig.getElasticSearchUrl();
return Arrays.stream(url.split(",")).map(host -> {
try {
URL hostUrl = new URL(host);
return new HttpHost(hostUrl.getHost(), hostUrl.getPort(),
hostUrl.getProtocol());
} catch (MalformedURLException e) {
throw new RuntimeException("Invalid elasticsearch url: " + host, e);
}
}).toArray(HttpHost[]::new);
}
RestHighLevelClient getClient() {
return client;
}
}