| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nifi.elasticsearch; |
| |
| import com.fasterxml.jackson.annotation.JsonInclude; |
| import com.fasterxml.jackson.core.JsonProcessingException; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import org.apache.commons.io.IOUtils; |
| import org.apache.http.Header; |
| import org.apache.http.HttpEntity; |
| import org.apache.http.HttpHost; |
| import org.apache.http.auth.AuthScope; |
| import org.apache.http.auth.UsernamePasswordCredentials; |
| import org.apache.http.client.CredentialsProvider; |
| import org.apache.http.entity.ContentType; |
| import org.apache.http.impl.client.BasicCredentialsProvider; |
| import org.apache.http.nio.entity.NStringEntity; |
| import org.apache.nifi.annotation.lifecycle.OnDisabled; |
| import org.apache.nifi.annotation.lifecycle.OnEnabled; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.controller.AbstractControllerService; |
| import org.apache.nifi.controller.ConfigurationContext; |
| import org.apache.nifi.processor.exception.ProcessException; |
| import org.apache.nifi.reporting.InitializationException; |
| import org.apache.nifi.ssl.SSLContextService; |
| import org.apache.nifi.util.StopWatch; |
| import org.apache.nifi.util.StringUtils; |
| import org.elasticsearch.client.Request; |
| import org.elasticsearch.client.Response; |
| import org.elasticsearch.client.ResponseException; |
| import org.elasticsearch.client.RestClient; |
| import org.elasticsearch.client.RestClientBuilder; |
| |
| import javax.net.ssl.SSLContext; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.MalformedURLException; |
| import java.net.URL; |
| import java.nio.charset.Charset; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Collectors; |
| |
| public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService { |
| private ObjectMapper mapper; |
| |
| private static final List<PropertyDescriptor> properties; |
| |
| private RestClient client; |
| |
| private String url; |
| private Charset responseCharset; |
| |
| static { |
| final List<PropertyDescriptor> props = new ArrayList<>(); |
| props.add(ElasticSearchClientService.HTTP_HOSTS); |
| props.add(ElasticSearchClientService.USERNAME); |
| props.add(ElasticSearchClientService.PASSWORD); |
| props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE); |
| props.add(ElasticSearchClientService.CONNECT_TIMEOUT); |
| props.add(ElasticSearchClientService.SOCKET_TIMEOUT); |
| props.add(ElasticSearchClientService.RETRY_TIMEOUT); |
| props.add(ElasticSearchClientService.CHARSET); |
| props.add(ElasticSearchClientService.SUPPRESS_NULLS); |
| |
| properties = Collections.unmodifiableList(props); |
| } |
| |
| @Override |
| protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { |
| return properties; |
| } |
| |
| @OnEnabled |
| public void onEnabled(final ConfigurationContext context) throws InitializationException { |
| try { |
| setupClient(context); |
| responseCharset = Charset.forName(context.getProperty(CHARSET).getValue()); |
| |
| // re-create the ObjectMapper in case the SUPPRESS_NULLS property has changed - the JsonInclude settings aren't dynamic |
| mapper = new ObjectMapper(); |
| if (ALWAYS_SUPPRESS.getValue().equals(context.getProperty(SUPPRESS_NULLS).getValue())) { |
| mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); |
| mapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY); |
| } |
| } catch (final Exception ex) { |
| getLogger().error("Could not initialize ElasticSearch client.", ex); |
| throw new InitializationException(ex); |
| } |
| } |
| |
| @OnDisabled |
| public void onDisabled() throws IOException { |
| this.client.close(); |
| this.url = null; |
| } |
| |
| private void setupClient(final ConfigurationContext context) throws MalformedURLException, InitializationException { |
| final String hosts = context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue(); |
| final String[] hostsSplit = hosts.split(",[\\s]*"); |
| this.url = hostsSplit[0]; |
| final SSLContextService sslService = |
| context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); |
| final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); |
| final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); |
| |
| final Integer connectTimeout = context.getProperty(CONNECT_TIMEOUT).asInteger(); |
| final Integer readTimeout = context.getProperty(SOCKET_TIMEOUT).asInteger(); |
| |
| final HttpHost[] hh = new HttpHost[hostsSplit.length]; |
| for (int x = 0; x < hh.length; x++) { |
| final URL u = new URL(hostsSplit[x]); |
| hh[x] = new HttpHost(u.getHost(), u.getPort(), u.getProtocol()); |
| } |
| |
| final SSLContext sslContext; |
| try { |
| sslContext = (sslService != null && (sslService.isKeyStoreConfigured() || sslService.isTrustStoreConfigured())) |
| ? sslService.createContext() : null; |
| } catch (final Exception e) { |
| getLogger().error("Error building up SSL Context from the supplied configuration.", e); |
| throw new InitializationException(e); |
| } |
| |
| final RestClientBuilder builder = RestClient.builder(hh) |
| .setHttpClientConfigCallback(httpClientBuilder -> { |
| if (sslContext != null) { |
| httpClientBuilder.setSSLContext(sslContext); |
| } |
| |
| if (username != null && password != null) { |
| final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); |
| credentialsProvider.setCredentials(AuthScope.ANY, |
| new UsernamePasswordCredentials(username, password)); |
| httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); |
| } |
| |
| return httpClientBuilder; |
| }) |
| .setRequestConfigCallback(requestConfigBuilder -> { |
| requestConfigBuilder.setConnectTimeout(connectTimeout); |
| requestConfigBuilder.setSocketTimeout(readTimeout); |
| return requestConfigBuilder; |
| }); |
| |
| this.client = builder.build(); |
| } |
| |
| private Response runQuery(final String endpoint, final String query, final String index, final String type, final Map<String, String> requestParameters) { |
| final StringBuilder sb = new StringBuilder(); |
| if (StringUtils.isNotBlank(index)) { |
| sb.append("/").append(index); |
| } |
| if (StringUtils.isNotBlank(type)) { |
| sb.append("/").append(type); |
| } |
| |
| sb.append(String.format("/%s", endpoint)); |
| |
| final HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON); |
| try { |
| return performRequest("POST", sb.toString(), requestParameters, queryEntity); |
| } catch (final Exception e) { |
| throw new ElasticsearchError(e); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| private Map<String, Object> parseResponse(final Response response) { |
| final int code = response.getStatusLine().getStatusCode(); |
| |
| try { |
| if (code >= 200 && code < 300) { |
| final InputStream inputStream = response.getEntity().getContent(); |
| final byte[] result = IOUtils.toByteArray(inputStream); |
| inputStream.close(); |
| return mapper.readValue(new String(result, responseCharset), Map.class); |
| } else { |
| final String errorMessage = String.format("ElasticSearch reported an error while trying to run the query: %s", |
| response.getStatusLine().getReasonPhrase()); |
| throw new IOException(errorMessage); |
| } |
| } catch (final Exception ex) { |
| throw new ElasticsearchError(ex); |
| } |
| } |
| |
| private List<String> parseResponseWarningHeaders(final Response response) { |
| return Arrays.stream(response.getHeaders()) |
| .filter(h -> "Warning".equalsIgnoreCase(h.getName())) |
| .map(Header::getValue) |
| .peek(h -> getLogger().warn("Elasticsearch Warning: {}", h)) |
| .collect(Collectors.toList()); |
| } |
| |
| @Override |
| public IndexOperationResponse add(final IndexOperationRequest operation, final Map<String, String> requestParameters) { |
| return bulk(Collections.singletonList(operation), requestParameters); |
| } |
| |
| private String flatten(final String str) { |
| return str.replaceAll("[\\n\\r]", "\\\\n"); |
| } |
| |
| private String buildBulkHeader(final IndexOperationRequest request) throws JsonProcessingException { |
| final String operation = request.getOperation().equals(IndexOperationRequest.Operation.Upsert) |
| ? "update" |
| : request.getOperation().getValue(); |
| return buildBulkHeader(operation, request.getIndex(), request.getType(), request.getId()); |
| } |
| |
| private String buildBulkHeader(final String operation, final String index, final String type, final String id) throws JsonProcessingException { |
| final Map<String, Object> header = new HashMap<String, Object>() {{ |
| put(operation, new HashMap<String, Object>() {{ |
| put("_index", index); |
| if (StringUtils.isNotBlank(id)) { |
| put("_id", id); |
| } |
| if (StringUtils.isNotBlank(type)) { |
| put("_type", type); |
| } |
| }}); |
| }}; |
| |
| return flatten(mapper.writeValueAsString(header)); |
| } |
| |
| protected void buildRequest(final IndexOperationRequest request, final StringBuilder builder) throws JsonProcessingException { |
| final String header = buildBulkHeader(request); |
| builder.append(header).append("\n"); |
| switch (request.getOperation()) { |
| case Index: |
| case Create: |
| final String indexDocument = mapper.writeValueAsString(request.getFields()); |
| builder.append(indexDocument).append("\n"); |
| break; |
| case Update: |
| case Upsert: |
| final Map<String, Object> doc = new HashMap<String, Object>() {{ |
| put("doc", request.getFields()); |
| if (request.getOperation().equals(IndexOperationRequest.Operation.Upsert)) { |
| put("doc_as_upsert", true); |
| } |
| }}; |
| final String update = flatten(mapper.writeValueAsString(doc)).trim(); |
| builder.append(update).append("\n"); |
| break; |
| case Delete: |
| // nothing to do for Delete operations, it just needs the header |
| break; |
| default: |
| throw new IllegalArgumentException(String.format("Unhandled Index Operation type: %s", request.getOperation().name())); |
| } |
| } |
| |
| @Override |
| public IndexOperationResponse bulk(final List<IndexOperationRequest> operations, final Map<String, String> requestParameters) { |
| try { |
| final StringBuilder payload = new StringBuilder(); |
| for (final IndexOperationRequest or : operations) { |
| buildRequest(or, payload); |
| } |
| |
| if (getLogger().isDebugEnabled()) { |
| getLogger().debug(payload.toString()); |
| } |
| final HttpEntity entity = new NStringEntity(payload.toString(), ContentType.APPLICATION_JSON); |
| final StopWatch watch = new StopWatch(); |
| watch.start(); |
| final Response response = performRequest("POST", "_bulk", requestParameters, entity); |
| watch.stop(); |
| |
| final String rawResponse = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8); |
| parseResponseWarningHeaders(response); |
| |
| if (getLogger().isDebugEnabled()) { |
| getLogger().debug(String.format("Response was: %s", rawResponse)); |
| } |
| |
| return IndexOperationResponse.fromJsonResponse(rawResponse); |
| } catch (final Exception ex) { |
| throw new ElasticsearchError(ex); |
| } |
| } |
| |
| @Override |
| public Long count(final String query, final String index, final String type, final Map<String, String> requestParameters) { |
| final Response response = runQuery("_count", query, index, type, requestParameters); |
| final Map<String, Object> parsed = parseResponse(response); |
| |
| return ((Integer)parsed.get("count")).longValue(); |
| } |
| |
| @Override |
| public DeleteOperationResponse deleteById(final String index, final String type, final String id, final Map<String, String> requestParameters) { |
| return deleteById(index, type, Collections.singletonList(id), requestParameters); |
| } |
| |
| @Override |
| public DeleteOperationResponse deleteById(final String index, final String type, final List<String> ids, final Map<String, String> requestParameters) { |
| try { |
| final StringBuilder sb = new StringBuilder(); |
| for (final String id : ids) { |
| final String header = buildBulkHeader("delete", index, type, id); |
| sb.append(header).append("\n"); |
| } |
| final HttpEntity entity = new NStringEntity(sb.toString(), ContentType.APPLICATION_JSON); |
| final StopWatch watch = new StopWatch(); |
| watch.start(); |
| final Response response = performRequest("POST", "_bulk", requestParameters, entity); |
| watch.stop(); |
| |
| if (getLogger().isDebugEnabled()) { |
| getLogger().debug(String.format("Response for bulk delete: %s", |
| IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8))); |
| } |
| |
| parseResponseWarningHeaders(response); |
| return new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS)); |
| } catch (final Exception ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| @Override |
| public DeleteOperationResponse deleteByQuery(final String query, final String index, final String type, final Map<String, String> requestParameters) { |
| final StopWatch watch = new StopWatch(); |
| watch.start(); |
| final Response response = runQuery("_delete_by_query", query, index, type, requestParameters); |
| watch.stop(); |
| |
| // check for errors in response |
| parseResponse(response); |
| parseResponseWarningHeaders(response); |
| |
| return new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS)); |
| } |
| |
| |
| public UpdateOperationResponse updateByQuery(final String query, final String index, final String type, final Map<String, String> requestParameters) { |
| final long start = System.currentTimeMillis(); |
| final Response response = runQuery("_update_by_query", query, index, type, requestParameters); |
| final long end = System.currentTimeMillis(); |
| |
| // check for errors in response |
| parseResponse(response); |
| |
| return new UpdateOperationResponse(end - start); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public Map<String, Object> get(final String index, final String type, final String id, final Map<String, String> requestParameters) { |
| try { |
| final StringBuilder endpoint = new StringBuilder(); |
| endpoint.append(index); |
| if (StringUtils.isNotBlank(type)) { |
| endpoint.append("/").append(type); |
| } else { |
| endpoint.append("/_doc"); |
| } |
| endpoint.append("/").append(id); |
| |
| final Response response = performRequest("GET", endpoint.toString(), requestParameters, null); |
| final String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8); |
| parseResponseWarningHeaders(response); |
| |
| return (Map<String, Object>) mapper.readValue(body, Map.class).get("_source"); |
| } catch (final Exception ex) { |
| getLogger().error("", ex); |
| return null; |
| } |
| } |
| |
| /* |
| * In pre-7.X ElasticSearch, it should return just a number. 7.X and after they are returning a map. |
| */ |
| @SuppressWarnings("unchecked") |
| private int handleSearchCount(final Object raw) { |
| if (raw instanceof Number) { |
| return Integer.parseInt(raw.toString()); |
| } else if (raw instanceof Map) { |
| return (Integer)((Map<String, Object>)raw).get("value"); |
| } else { |
| throw new ProcessException("Unknown type for hit count."); |
| } |
| } |
| |
| @Override |
| public SearchResponse search(final String query, final String index, final String type, final Map<String, String> requestParameters) { |
| try { |
| final Response response = runQuery("_search", query, index, type, requestParameters); |
| return buildSearchResponse(response); |
| } catch (final Exception ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| @Override |
| public SearchResponse scroll(final String scroll) { |
| try { |
| final HttpEntity scrollEntity = new NStringEntity(scroll, ContentType.APPLICATION_JSON); |
| final Response response = performRequest("POST", "/_search/scroll", Collections.emptyMap(), scrollEntity); |
| return buildSearchResponse(response); |
| } catch (final Exception ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| @Override |
| public String initialisePointInTime(final String index, final String keepAlive) { |
| try { |
| final Map<String, String> params = new HashMap<String, String>() {{ |
| if (StringUtils.isNotBlank(keepAlive)) { |
| put("keep_alive", keepAlive); |
| } |
| }}; |
| final Response response = performRequest("POST", index + "/_pit", params, null); |
| final String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8); |
| parseResponseWarningHeaders(response); |
| |
| if (getLogger().isDebugEnabled()) { |
| getLogger().debug(String.format("Response for initialising Point in Time: %s", body)); |
| } |
| |
| return (String) mapper.readValue(body, Map.class).get("id"); |
| } catch (final Exception ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| @Override |
| public DeleteOperationResponse deletePointInTime(final String pitId) { |
| try { |
| final HttpEntity pitEntity = new NStringEntity(String.format("{\"id\": \"%s\"}", pitId), ContentType.APPLICATION_JSON); |
| |
| final StopWatch watch = new StopWatch(true); |
| final Response response = performRequest("DELETE", "/_pit", Collections.emptyMap(), pitEntity); |
| watch.stop(); |
| |
| if (getLogger().isDebugEnabled()) { |
| getLogger().debug(String.format("Response for deleting Point in Time: %s", |
| IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8)) |
| ); |
| } |
| |
| parseResponseWarningHeaders(response); |
| return new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS)); |
| } catch (final ResponseException re) { |
| if (404 == re.getResponse().getStatusLine().getStatusCode()) { |
| getLogger().debug("Point in Time {} not found in Elasticsearch for deletion, ignoring", pitId); |
| return new DeleteOperationResponse(0); |
| } else { |
| throw new RuntimeException(re); |
| } |
| } catch (final Exception ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| @Override |
| public DeleteOperationResponse deleteScroll(final String scrollId) { |
| try { |
| final HttpEntity scrollBody = new NStringEntity(String.format("{\"scroll_id\": \"%s\"}", scrollId), ContentType.APPLICATION_JSON); |
| |
| final StopWatch watch = new StopWatch(true); |
| final Response response = performRequest("DELETE", "/_search/scroll", Collections.emptyMap(), scrollBody); |
| watch.stop(); |
| |
| if (getLogger().isDebugEnabled()) { |
| getLogger().debug(String.format("Response for deleting Scroll: %s", |
| IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8)) |
| ); |
| } |
| |
| parseResponseWarningHeaders(response); |
| return new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS)); |
| } catch (final ResponseException re) { |
| if (404 == re.getResponse().getStatusLine().getStatusCode()) { |
| getLogger().debug("Scroll Id {} not found in Elasticsearch for deletion, ignoring", scrollId); |
| return new DeleteOperationResponse(0); |
| } else { |
| throw new RuntimeException(re); |
| } |
| } catch (final Exception ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| private SearchResponse buildSearchResponse(final Response response) throws JsonProcessingException { |
| final Map<String, Object> parsed = parseResponse(response); |
| final List<String> warnings = parseResponseWarningHeaders(response); |
| |
| final int took = (Integer)parsed.get("took"); |
| final boolean timedOut = (Boolean)parsed.get("timed_out"); |
| final String pitId = parsed.get("pit_id") != null ? (String)parsed.get("pit_id") : null; |
| final String scrollId = parsed.get("_scroll_id") != null ? (String)parsed.get("_scroll_id") : null; |
| final Map<String, Object> aggregations = parsed.get("aggregations") != null |
| ? (Map<String, Object>)parsed.get("aggregations") : new HashMap<>(); |
| final Map<String, Object> hitsParent = (Map<String, Object>)parsed.get("hits"); |
| final int count = handleSearchCount(hitsParent.get("total")); |
| final List<Map<String, Object>> hits = (List<Map<String, Object>>)hitsParent.get("hits"); |
| final String searchAfter = getSearchAfter(hits); |
| |
| final SearchResponse esr = new SearchResponse(hits, aggregations, pitId, scrollId, searchAfter, count, took, timedOut, warnings); |
| |
| if (getLogger().isDebugEnabled()) { |
| final String searchSummary = "******************" + |
| String.format(Locale.getDefault(), "Took: %d", took) + |
| String.format(Locale.getDefault(), "Timed out: %s", timedOut) + |
| String.format(Locale.getDefault(), "Aggregation count: %d", aggregations.size()) + |
| String.format(Locale.getDefault(), "Hit count: %d", hits.size()) + |
| String.format(Locale.getDefault(), "PIT Id: %s", pitId) + |
| String.format(Locale.getDefault(), "Scroll Id: %s", scrollId) + |
| String.format(Locale.getDefault(), "Search After: %s", searchAfter) + |
| String.format(Locale.getDefault(), "Total found: %d", count) + |
| String.format(Locale.getDefault(), "Warnings: %s", warnings) + |
| "******************"; |
| getLogger().debug(searchSummary); |
| } |
| |
| return esr; |
| } |
| |
| private String getSearchAfter(final List<Map<String, Object>> hits) throws JsonProcessingException { |
| String searchAfter = null; |
| if (!hits.isEmpty()) { |
| final Object lastHitSort = hits.get(hits.size() - 1).get("sort"); |
| if (lastHitSort != null && !"null".equalsIgnoreCase(lastHitSort.toString())) { |
| searchAfter = mapper.writeValueAsString(lastHitSort); |
| } |
| } |
| return searchAfter; |
| } |
| |
| @Override |
| public String getTransitUrl(final String index, final String type) { |
| return this.url + |
| (StringUtils.isNotBlank(index) ? "/" : "") + |
| (StringUtils.isNotBlank(index) ? index : "") + |
| (StringUtils.isNotBlank(type) ? "/" : "") + |
| (StringUtils.isNotBlank(type) ? type : ""); |
| } |
| |
| private Response performRequest(final String method, final String endpoint, final Map<String, String> parameters, final HttpEntity entity) throws IOException { |
| final Request request = new Request(method, endpoint); |
| if (parameters != null && !parameters.isEmpty()) { |
| request.addParameters(parameters); |
| } |
| if (entity != null) { |
| request.setEntity(entity); |
| } |
| return client.performRequest(request); |
| } |
| } |