* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.elasticsearch;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * NiFi processor that writes the content of incoming FlowFiles to Elasticsearch over HTTP.
 * FlowFiles are pulled in batches and sent in a single request to the {@code _bulk} path of the
 * configured Elasticsearch URL; user-added (dynamic) properties are appended to that URL as
 * query parameters.
 */
@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"})
@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as "
+ "the index to insert into and the type of the document.")
// NOTE(review): the four attributes below read like the arguments of a @DynamicProperty(...) annotation
// whose opening line is not visible in this view -- confirm against the full file.
name = "A URL query parameter",
value = "The value to set it to",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY,
description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
/** Receives every FlowFile whose document was accepted by Elasticsearch. */
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build();
/**
 * Receives FlowFiles that could not be written: missing index or index operation, unsupported operation,
 * missing identifier, invalid JSON content, per-item bulk errors, request exceptions, unparseable
 * responses, and any response status that is neither a success nor 5xx.
 */
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build();
/** Receives the still-pending FlowFiles of a batch when Elasticsearch answers with a 5xx status code. */
// NOTE(review): the terminating .build() call of this builder chain is not visible in this view.
public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
// NOTE(review): the builder chains in this section appear truncated in this view (no .name(...),
// validators, defaults or .build() calls are visible) -- consult the full file before editing.

/**
 * Name of the FlowFile attribute that carries the document identifier. onTrigger reads this property
 * with getValue() (no Expression Language evaluation) and then looks the attribute up on each FlowFile.
 */
public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder()
.displayName("Identifier Attribute")
.description("The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is \"index\", "
+ "this property may be left empty or evaluate to an empty value, in which case the document's identifier will be "
+ "auto-generated by Elasticsearch. For all other Index Operations, the attribute must evaluate to a non-empty value.")
/** Target index; evaluated against each FlowFile's attributes, and an empty result routes that FlowFile to failure. */
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
.description("The name of the index to insert into")
// NOTE(review): fragment of a validator expression; the start of the call is not visible here.
AttributeExpression.ResultType.STRING, true))
/** Document type; evaluated against each FlowFile's attributes and passed on to buildBulkCommand. */
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). "
+ "This must be unset or '_doc' for Elasticsearch 7.0+.")
/**
 * Bulk operation to perform; evaluated against each FlowFile's attributes and matched case-insensitively
 * against create, index, update, upsert and delete in onTrigger.
 */
public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
.displayName("Index Operation")
.description("The type of the operation used to index (create, index, update, upsert, delete)")
/** Maximum number of FlowFiles requested from the session per onTrigger invocation. */
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.displayName("Batch Size")
.description("The preferred number of flow files to put to the database in a single transaction. Note that the contents of the "
+ "flow files will be stored in memory until the bulk operation is performed. Also the results should be returned in the "
+ "same order the flow files were received.")
// Unmodifiable collections handed out by getRelationships() and getSupportedPropertyDescriptors().
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
// Builds both collections once and wraps them so callers cannot modify them.
static {
final Set<Relationship> _rels = new HashSet<>();
// NOTE(review): no additions to _rels are visible in this view; presumably REL_SUCCESS, REL_FAILURE
// and REL_RETRY are added before wrapping -- confirm against the full file.
relationships = Collections.unmodifiableSet(_rels);
// Start from a mutable copy of COMMON_PROPERTY_DESCRIPTORS (not declared in this view; presumably
// inherited from AbstractElasticsearchHttpProcessor).
final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
// NOTE(review): the processor-specific descriptors (ID_ATTRIBUTE, INDEX, ...) are presumably appended
// here; those lines are not visible in this view.
propertyDescriptors = Collections.unmodifiableList(descriptors);
/**
 * Returns the relationships this processor can route FlowFiles to.
 *
 * @return the unmodifiable set assigned in the static initializer
 */
public Set<Relationship> getRelationships() {
return relationships;
/**
 * Returns the property descriptors supported by this processor.
 *
 * @return the unmodifiable list assigned in the static initializer
 */
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
/**
 * Adds a processor-specific check on top of the parent's validation: when no Identifier Attribute
 * is configured, an Index Operation that needs an explicit document id is reported as a problem.
 *
 * @param validationContext gives access to the configured property values
 * @return the parent's validation results plus any problem found here
 */
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
// Since Expression Language is allowed for index operation, we can't guarantee that we can catch
// all invalid configurations, but we should catch them as soon as we can. For example, if the
// Identifier Attribute property is empty, the Index Operation must evaluate to "index" or "create".
// Both values are read with getValue(), i.e. without evaluating Expression Language.
String idAttribute = validationContext.getProperty(ID_ATTRIBUTE).getValue();
String indexOp = validationContext.getProperty(INDEX_OP).getValue();
if (StringUtils.isEmpty(idAttribute)) {
// NOTE(review): toLowerCase() is called without a null check -- confirm INDEX_OP always has a value.
switch (indexOp.toLowerCase()) {
// update, upsert, delete and an empty operation all share the same validation failure.
case "update":
case "upsert":
case "delete":
case "":
// NOTE(review): the rest of this builder chain (and the end of the switch) is not visible in this view.
problems.add(new ValidationResult.Builder()
.explanation("If Identifier Attribute is not set, Index Operation must evaluate to one of \"index\" or \"create\"")
return problems;
// NOTE(review): the body and any annotations of this method are not visible in this view; presumably
// it is the @OnScheduled hook (that annotation is imported by this file) -- confirm against the full file.
public void setup(ProcessContext context) {
/**
 * Sends a batch of FlowFiles to Elasticsearch in one bulk request and routes each FlowFile
 * according to the outcome.
 * <ol>
 *   <li>Requests up to Batch Size FlowFiles from the session.</li>
 *   <li>Builds the request URL from the Elasticsearch URL plus the {@code _bulk} path segment, adding
 *       every dynamic property with a non-null value as a query parameter.</li>
 *   <li>For each FlowFile, evaluates index, type and index operation, sends FlowFiles with invalid
 *       values or content to failure, and appends the rest to the bulk body via buildBulkCommand.</li>
 *   <li>Sends the request and routes the pending FlowFiles: per-item results on a success status,
 *       retry on 5xx, failure on any other status or on an exception.</li>
 * </ol>
 * NOTE(review): several lines of this method are not visible in this view (for example early
 * returns/continues, the URL assignment and the FlowFile read call); comments below describe only
 * what is visible.
 *
 * @param context provides the configured property values
 * @param session used to obtain, modify and transfer FlowFiles
 * @throws ProcessException if the evaluated Elasticsearch URL is empty
 */
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
final List<FlowFile> flowFiles = session.get(batchSize);
if (flowFiles.isEmpty()) {
// Read without Expression Language evaluation: the property holds an attribute *name*.
final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue();
// Authentication
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
OkHttpClient okHttpClient = getClient();
final ComponentLog logger = getLogger();
// Keep track of the list of flow files that need to be transferred. As they are transferred, remove them from the list.
List<FlowFile> flowFilesToTransfer = new LinkedList<>(flowFiles);
// Accumulates the body of the single bulk request for the whole batch.
final StringBuilder sb = new StringBuilder();
// NOTE(review): trim() runs before the emptiness check, so a null value would fail here rather than
// reach the ProcessException below -- confirm getValue() cannot return null for ES_URL.
final String baseUrl = context.getProperty(ES_URL).evaluateAttributeExpressions().getValue().trim();
if (StringUtils.isEmpty(baseUrl)) {
throw new ProcessException("Elasticsearch URL is empty or null, this indicates an invalid Expression (missing variables, e.g.)");
// NOTE(review): the result of HttpUrl.parse(baseUrl) is used without a null check -- confirm how an unparseable URL is handled.
HttpUrl.Builder urlBuilder = HttpUrl.parse(baseUrl).newBuilder().addPathSegment("_bulk");
// Find the user-added properties and set them as query parameters on the URL
for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
PropertyDescriptor pd = property.getKey();
if (pd.isDynamic()) {
if (property.getValue() != null) {
// The parameter value is the evaluated property value (no FlowFile attributes are supplied).
urlBuilder = urlBuilder.addQueryParameter(pd.getName(), context.getProperty(pd).evaluateAttributeExpressions().getValue());
// NOTE(review): the right-hand side of this assignment is missing in this view; presumably the URL built from urlBuilder.
final URL url =;
// Validate each FlowFile and append its bulk command to the request body.
for (FlowFile file : flowFiles) {
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue();
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(file).getValue());
if (StringUtils.isEmpty(index)) {
// NOTE(review): the message has one {} placeholder but two arguments, so the placeholder presumably
// receives id_attribute instead of the FlowFile; the wording "in for" also looks like a typo.
logger.error("No value for index in for {}, transferring to failure", new Object[]{id_attribute, file});
session.transfer(file, REL_FAILURE);
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(file).getValue();
String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(file).getValue();
if (StringUtils.isEmpty(indexOp)) {
logger.error("No Index operation specified for {}, transferring to failure.", new Object[]{file});
session.transfer(file, REL_FAILURE);
// The operation is matched case-insensitively against the five supported values.
switch (indexOp.toLowerCase()) {
case "create":
case "index":
case "update":
case "upsert":
case "delete":
// NOTE(review): presumably this is the default branch for unsupported operations; the case
// terminators and the default label are not visible in this view.
logger.error("Index operation {} not supported for {}, transferring to failure.", new Object[]{indexOp, file});
session.transfer(file, REL_FAILURE);
// No configured attribute name means no id; otherwise use the attribute's value (may be null).
final String id = (id_attribute != null) ? file.getAttribute(id_attribute) : null;
// The ID must be valid for all operations except "index" and "create". For those cases,
// a missing ID indicates one is to be auto-generated by Elasticsearch
if (id == null && !(indexOp.equalsIgnoreCase("index") || indexOp.equalsIgnoreCase("create"))) {
// NOTE(review): one {} placeholder but two arguments; the FlowFile presumably does not appear in the message.
logger.error("Index operation {} requires a valid identifier value from a flow file attribute, transferring to failure.",
new Object[]{indexOp, file});
session.transfer(file, REL_FAILURE);
// NOTE(review): this line is garbled in this view; presumably a call that reads the FlowFile content
// with the "in -> {...}" callback is missing between the declaration and the lambda.
final StringBuilder json = new StringBuilder();, in -> {
// Collapse every line break (\r\n, \n, \r) into a single space so the document occupies one line;
// presumably required because the bulk request body is newline-delimited.
json.append(IOUtils.toString(in, charset).replace("\r\n", " ").replace('\n', ' ').replace('\r', ' '));
String jsonString = json.toString();
// Ensure the JSON body is well-formed
// NOTE(review): the statement inside this try block is not visible in this view.
try {
} catch (IOException e) {
// NOTE(review): the message has no {} placeholders although two arguments are supplied.
logger.error("Flow file content is not valid JSON, penalizing and transferring to failure.",
new Object[]{indexOp, file});
file = session.penalize(file);
session.transfer(file, REL_FAILURE);
// Append this FlowFile's action and document to the shared bulk body.
buildBulkCommand(sb, index, docType, indexOp, id, jsonString);
// Only send a request when at least one FlowFile is still pending.
// NOTE(review): this relies on rejected FlowFiles having been removed from flowFilesToTransfer;
// those removals are not visible in this view.
if (!flowFilesToTransfer.isEmpty()) {
RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), sb.toString());
final Response getResponse;
try {
// The bulk body is sent with the PUT verb.
getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "PUT", requestBody);
} catch (final Exception e) {
// Any exception while sending penalizes every pending FlowFile and routes it to failure.
logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, e);
flowFilesToTransfer.forEach((flowFileToTransfer) -> {
flowFileToTransfer = session.penalize(flowFileToTransfer);
session.transfer(flowFileToTransfer, REL_FAILURE);
// NOTE(review): no close() of getResponse or its body is visible in this view -- confirm it is released.
final int statusCode = getResponse.code();
if (isSuccess(statusCode)) {
ResponseBody responseBody = getResponse.body();
try {
final byte[] bodyBytes = responseBody.bytes();
JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
// NOTE(review): assumes the response always has an "errors" field; get() is dereferenced without a null check.
boolean errors = responseJson.get("errors").asBoolean(false);
if (errors) {
ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items");
if (itemNodeArray.size() > 0) {
// All items are returned whether they succeeded or failed, so iterate through the item array
// at the same time as the flow file list, moving each to success or failure accordingly,
// but only keep the first error for logging
String errorReason = null;
// Walk backwards so that remove(i) does not shift the indices of entries still to be visited.
// Consequently the "first" error retained is the one from the last failing item in the response.
for (int i = itemNodeArray.size() - 1; i >= 0; i--) {
JsonNode itemNode = itemNodeArray.get(i);
// Item i is paired with pending FlowFile i; items without a matching FlowFile are skipped.
if (flowFilesToTransfer.size() > i) {
FlowFile flowFile = flowFilesToTransfer.remove(i);
int status = itemNode.findPath("status").asInt();
if (!isSuccess(status)) {
if (errorReason == null) {
// Use "result" if it is present; this happens for status codes like 404 Not Found, which may not have an error/reason
String reason = itemNode.findPath("result").asText();
if (StringUtils.isEmpty(reason)) {
// If there was no result, we expect an error with a string description in the "reason" field
reason = itemNode.findPath("reason").asText();
errorReason = reason;
// The reason captured above is logged and written to the "reason" attribute of every failing
// FlowFile in this batch, not only the one it came from.
logger.error("Failed to process {} due to {}, transferring to failure",
new Object[]{flowFile, errorReason});
flowFile = session.penalize(flowFile);
flowFile = session.putAttribute(flowFile, "reason", errorReason);
session.transfer(flowFile, REL_FAILURE);
} else {
session.transfer(flowFile, REL_SUCCESS);
// Record provenance event
session.getProvenanceReporter().send(flowFile, url.toString());
// Transfer any remaining flowfiles to success
flowFilesToTransfer.forEach(file -> {
session.transfer(file, REL_SUCCESS);
// Record provenance event
session.getProvenanceReporter().send(file, url.toString());
} catch (IOException ioe) {
// Something went wrong when parsing the response, log the error and route to failure
logger.error("Error parsing Bulk API response: {}", new Object[]{ioe.getMessage()}, ioe);
session.transfer(flowFilesToTransfer, REL_FAILURE);
} else if (statusCode / 100 == 5) {
// 5xx -> RETRY, but a server error might last a while, so yield
// NOTE(review): no yield call is visible in this view -- confirm the processor actually yields here.
logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to retry. This is likely a server problem, yielding...",
new Object[]{statusCode, getResponse.message()});
session.transfer(flowFilesToTransfer, REL_RETRY);
} else { // 1xx, 3xx, 4xx, etc. -> NO RETRY
logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()});
session.transfer(flowFilesToTransfer, REL_FAILURE);