| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.processors.elasticsearch; |
| |
| import com.fasterxml.jackson.databind.JsonNode; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.databind.node.ArrayNode; |
| import okhttp3.HttpUrl; |
| import okhttp3.MediaType; |
| import okhttp3.OkHttpClient; |
| import okhttp3.RequestBody; |
| import okhttp3.Response; |
| import okhttp3.ResponseBody; |
| import org.apache.commons.io.IOUtils; |
| import org.apache.nifi.annotation.behavior.DynamicProperty; |
| import org.apache.nifi.annotation.behavior.EventDriven; |
| import org.apache.nifi.annotation.behavior.SystemResourceConsideration; |
| import org.apache.nifi.annotation.behavior.InputRequirement; |
| import org.apache.nifi.annotation.behavior.SupportsBatching; |
| import org.apache.nifi.annotation.behavior.SystemResource; |
| import org.apache.nifi.annotation.documentation.CapabilityDescription; |
| import org.apache.nifi.annotation.documentation.Tags; |
| import org.apache.nifi.annotation.lifecycle.OnScheduled; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.components.ValidationContext; |
| import org.apache.nifi.components.ValidationResult; |
| import org.apache.nifi.expression.AttributeExpression; |
| import org.apache.nifi.expression.ExpressionLanguageScope; |
| import org.apache.nifi.flowfile.FlowFile; |
| import org.apache.nifi.logging.ComponentLog; |
| import org.apache.nifi.processor.ProcessContext; |
| import org.apache.nifi.processor.ProcessSession; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.processor.exception.ProcessException; |
| import org.apache.nifi.processor.util.StandardValidators; |
| import org.apache.nifi.util.StringUtils; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.IOException; |
| import java.net.URL; |
| import java.nio.charset.Charset; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| |
| @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) |
| @EventDriven |
| @SupportsBatching |
| @Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"}) |
| @CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as " |
| + "the index to insert into and the type of the document.") |
| @DynamicProperty( |
| name = "A URL query parameter", |
| value = "The value to set it to", |
| expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, |
| description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing") |
| @SystemResourceConsideration(resource = SystemResource.MEMORY) |
| public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { |
| |
| public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") |
| .description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build(); |
| |
| public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") |
| .description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build(); |
| |
| public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") |
| .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") |
| .build(); |
| |
| public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder() |
| .name("put-es-id-attr") |
| .displayName("Identifier Attribute") |
| .description("The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is \"index\", " |
| + "this property may be left empty or evaluate to an empty value, in which case the document's identifier will be " |
| + "auto-generated by Elasticsearch. For all other Index Operations, the attribute must evaluate to a non-empty value.") |
| .required(false) |
| .expressionLanguageSupported(ExpressionLanguageScope.NONE) |
| .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR) |
| .build(); |
| |
| public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder() |
| .name("put-es-index") |
| .displayName("Index") |
| .description("The name of the index to insert into") |
| .required(true) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .addValidator(StandardValidators.createAttributeExpressionLanguageValidator( |
| AttributeExpression.ResultType.STRING, true)) |
| .build(); |
| |
| public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder() |
| .name("put-es-type") |
| .displayName("Type") |
| .description("The type of this document (used by Elasticsearch for indexing and searching)") |
| .required(true) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) |
| .build(); |
| |
| public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder() |
| .name("put-es-index-op") |
| .displayName("Index Operation") |
| .description("The type of the operation used to index (index, update, upsert, delete)") |
| .required(true) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) |
| .defaultValue("index") |
| .build(); |
| |
| public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() |
| .name("put-es-batch-size") |
| .displayName("Batch Size") |
| .description("The preferred number of flow files to put to the database in a single transaction. Note that the contents of the " |
| + "flow files will be stored in memory until the bulk operation is performed. Also the results should be returned in the " |
| + "same order the flow files were received.") |
| .required(true) |
| .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) |
| .defaultValue("100") |
| .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) |
| .build(); |
| |
| private static final Set<Relationship> relationships; |
| private static final List<PropertyDescriptor> propertyDescriptors; |
| |
| private static final ObjectMapper mapper = new ObjectMapper(); |
| |
| static { |
| final Set<Relationship> _rels = new HashSet<>(); |
| _rels.add(REL_SUCCESS); |
| _rels.add(REL_FAILURE); |
| _rels.add(REL_RETRY); |
| relationships = Collections.unmodifiableSet(_rels); |
| |
| final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS); |
| descriptors.add(ID_ATTRIBUTE); |
| descriptors.add(INDEX); |
| descriptors.add(TYPE); |
| descriptors.add(BATCH_SIZE); |
| descriptors.add(INDEX_OP); |
| |
| propertyDescriptors = Collections.unmodifiableList(descriptors); |
| } |
| |
| @Override |
| public Set<Relationship> getRelationships() { |
| return relationships; |
| } |
| |
| @Override |
| public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { |
| return propertyDescriptors; |
| } |
| |
| @Override |
| protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { |
| final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext)); |
| // Since Expression Language is allowed for index operation, we can't guarantee that we can catch |
| // all invalid configurations, but we should catch them as soon as we can. For example, if the |
| // Identifier Attribute property is empty, the Index Operation must evaluate to "index". |
| String idAttribute = validationContext.getProperty(ID_ATTRIBUTE).getValue(); |
| String indexOp = validationContext.getProperty(INDEX_OP).getValue(); |
| |
| if (StringUtils.isEmpty(idAttribute)) { |
| switch (indexOp.toLowerCase()) { |
| case "update": |
| case "upsert": |
| case "delete": |
| case "": |
| problems.add(new ValidationResult.Builder() |
| .valid(false) |
| .subject(INDEX_OP.getDisplayName()) |
| .explanation("If Identifier Attribute is not set, Index Operation must evaluate to \"index\"") |
| .build()); |
| break; |
| default: |
| break; |
| } |
| } |
| return problems; |
| } |
| |
| @OnScheduled |
| public void setup(ProcessContext context) { |
| super.setup(context); |
| } |
| |
| @Override |
| public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { |
| final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); |
| |
| final List<FlowFile> flowFiles = session.get(batchSize); |
| if (flowFiles.isEmpty()) { |
| return; |
| } |
| |
| final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue(); |
| |
| // Authentication |
| final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); |
| final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); |
| |
| |
| OkHttpClient okHttpClient = getClient(); |
| final ComponentLog logger = getLogger(); |
| |
| // Keep track of the list of flow files that need to be transferred. As they are transferred, remove them from the list. |
| List<FlowFile> flowFilesToTransfer = new LinkedList<>(flowFiles); |
| |
| final StringBuilder sb = new StringBuilder(); |
| final String baseUrl = context.getProperty(ES_URL).evaluateAttributeExpressions().getValue().trim(); |
| if (StringUtils.isEmpty(baseUrl)) { |
| throw new ProcessException("Elasticsearch URL is empty or null, this indicates an invalid Expression (missing variables, e.g.)"); |
| } |
| HttpUrl.Builder urlBuilder = HttpUrl.parse(baseUrl).newBuilder().addPathSegment("_bulk"); |
| |
| // Find the user-added properties and set them as query parameters on the URL |
| for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) { |
| PropertyDescriptor pd = property.getKey(); |
| if (pd.isDynamic()) { |
| if (property.getValue() != null) { |
| urlBuilder = urlBuilder.addQueryParameter(pd.getName(), context.getProperty(pd).evaluateAttributeExpressions().getValue()); |
| } |
| } |
| } |
| final URL url = urlBuilder.build().url(); |
| |
| for (FlowFile file : flowFiles) { |
| final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue(); |
| final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(file).getValue()); |
| if (StringUtils.isEmpty(index)) { |
| logger.error("No value for index in for {}, transferring to failure", new Object[]{id_attribute, file}); |
| flowFilesToTransfer.remove(file); |
| session.transfer(file, REL_FAILURE); |
| continue; |
| } |
| final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(file).getValue(); |
| String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(file).getValue(); |
| if (StringUtils.isEmpty(indexOp)) { |
| logger.error("No Index operation specified for {}, transferring to failure.", new Object[]{file}); |
| flowFilesToTransfer.remove(file); |
| session.transfer(file, REL_FAILURE); |
| continue; |
| } |
| |
| switch (indexOp.toLowerCase()) { |
| case "index": |
| case "update": |
| case "upsert": |
| case "delete": |
| break; |
| default: |
| logger.error("Index operation {} not supported for {}, transferring to failure.", new Object[]{indexOp, file}); |
| flowFilesToTransfer.remove(file); |
| session.transfer(file, REL_FAILURE); |
| continue; |
| } |
| |
| final String id = (id_attribute != null) ? file.getAttribute(id_attribute) : null; |
| |
| // The ID must be valid for all operations except "index". For that case, |
| // a missing ID indicates one is to be auto-generated by Elasticsearch |
| if (id == null && !indexOp.equalsIgnoreCase("index")) { |
| logger.error("Index operation {} requires a valid identifier value from a flow file attribute, transferring to failure.", |
| new Object[]{indexOp, file}); |
| flowFilesToTransfer.remove(file); |
| session.transfer(file, REL_FAILURE); |
| continue; |
| } |
| |
| final StringBuilder json = new StringBuilder(); |
| session.read(file, in -> { |
| json.append(IOUtils.toString(in, charset).replace("\r\n", " ").replace('\n', ' ').replace('\r', ' ')); |
| }); |
| |
| String jsonString = json.toString(); |
| |
| // Ensure the JSON body is well-formed |
| try { |
| mapper.readTree(jsonString); |
| } catch (IOException e) { |
| logger.error("Flow file content is not valid JSON, penalizing and transferring to failure.", |
| new Object[]{indexOp, file}); |
| flowFilesToTransfer.remove(file); |
| file = session.penalize(file); |
| session.transfer(file, REL_FAILURE); |
| continue; |
| } |
| |
| buildBulkCommand(sb, index, docType, indexOp, id, jsonString); |
| } |
| if (!flowFilesToTransfer.isEmpty()) { |
| RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), sb.toString()); |
| final Response getResponse; |
| try { |
| getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "PUT", requestBody); |
| } catch (final Exception e) { |
| logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, e); |
| flowFilesToTransfer.forEach((flowFileToTransfer) -> { |
| flowFileToTransfer = session.penalize(flowFileToTransfer); |
| session.transfer(flowFileToTransfer, REL_FAILURE); |
| }); |
| flowFilesToTransfer.clear(); |
| return; |
| } |
| |
| final int statusCode = getResponse.code(); |
| |
| if (isSuccess(statusCode)) { |
| ResponseBody responseBody = getResponse.body(); |
| try { |
| final byte[] bodyBytes = responseBody.bytes(); |
| |
| JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes)); |
| boolean errors = responseJson.get("errors").asBoolean(false); |
| if (errors) { |
| ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items"); |
| if (itemNodeArray.size() > 0) { |
| // All items are returned whether they succeeded or failed, so iterate through the item array |
| // at the same time as the flow file list, moving each to success or failure accordingly, |
| // but only keep the first error for logging |
| String errorReason = null; |
| for (int i = itemNodeArray.size() - 1; i >= 0; i--) { |
| JsonNode itemNode = itemNodeArray.get(i); |
| if (flowFilesToTransfer.size() > i) { |
| FlowFile flowFile = flowFilesToTransfer.remove(i); |
| int status = itemNode.findPath("status").asInt(); |
| if (!isSuccess(status)) { |
| if (errorReason == null) { |
| // Use "result" if it is present; this happens for status codes like 404 Not Found, which may not have an error/reason |
| String reason = itemNode.findPath("result").asText(); |
| if (StringUtils.isEmpty(reason)) { |
| // If there was no result, we expect an error with a string description in the "reason" field |
| reason = itemNode.findPath("reason").asText(); |
| } |
| errorReason = reason; |
| logger.error("Failed to process {} due to {}, transferring to failure", |
| new Object[]{flowFile, errorReason}); |
| } |
| flowFile = session.penalize(flowFile); |
| flowFile = session.putAttribute(flowFile, "reason", errorReason); |
| session.transfer(flowFile, REL_FAILURE); |
| |
| } else { |
| session.transfer(flowFile, REL_SUCCESS); |
| // Record provenance event |
| session.getProvenanceReporter().send(flowFile, url.toString()); |
| } |
| } |
| } |
| } |
| } |
| // Transfer any remaining flowfiles to success |
| flowFilesToTransfer.forEach(file -> { |
| session.transfer(file, REL_SUCCESS); |
| // Record provenance event |
| session.getProvenanceReporter().send(file, url.toString()); |
| }); |
| } catch (IOException ioe) { |
| // Something went wrong when parsing the response, log the error and route to failure |
| logger.error("Error parsing Bulk API response: {}", new Object[]{ioe.getMessage()}, ioe); |
| session.transfer(flowFilesToTransfer, REL_FAILURE); |
| context.yield(); |
| } |
| } else if (statusCode / 100 == 5) { |
| // 5xx -> RETRY, but a server error might last a while, so yield |
| logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to retry. This is likely a server problem, yielding...", |
| new Object[]{statusCode, getResponse.message()}); |
| session.transfer(flowFilesToTransfer, REL_RETRY); |
| context.yield(); |
| } else { // 1xx, 3xx, 4xx, etc. -> NO RETRY |
| logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()}); |
| session.transfer(flowFilesToTransfer, REL_FAILURE); |
| } |
| getResponse.close(); |
| } |
| } |
| } |