blob: f5082ea16a5b5427acc74773a37e859ab551ad43 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
/**
 * Processor that writes the content of each incoming FlowFile to Elasticsearch as part of a single bulk request.
 *
 * <p>Up to "Batch Size" FlowFiles are pulled per trigger. For each FlowFile the document identifier is read from
 * the attribute named by "Identifier Attribute", and the index, type, operation and charset are resolved against
 * the FlowFile's attributes. FlowFiles end up in one of three relationships: {@code success}, {@code failure}
 * or {@code retry}.</p>
 */
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@EventDriven
@SupportsBatching
@Tags({"elasticsearch", "insert", "update", "write", "put"})
@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as "
        + "the index to insert into and the type of the document. If the cluster has been configured for authorization "
        + "and/or secure transport (SSL/TLS) and the Shield plugin is available, secure connections can be made. This processor "
        + "supports Elasticsearch 2.x clusters.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
public class PutElasticsearch extends AbstractElasticsearchTransportClientProcessor {

    /** Destination for FlowFiles whose document was accepted by Elasticsearch. */
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
            .description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build();

    /** Destination for FlowFiles that could not be written (missing identifier, rejected item, unexpected error). */
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
            .description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build();

    /** Destination for FlowFiles whose bulk request hit a node-availability or timeout exception. */
    static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
            .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
            .build();

    /** Name of the FlowFile attribute that holds the document identifier. */
    public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder()
            .name("Identifier Attribute")
            .description("The name of the attribute containing the identifier for each FlowFile")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
            .build();

    /** Target index; evaluated per FlowFile. */
    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
            .name("Index")
            .description("The name of the index to insert into")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
                    AttributeExpression.ResultType.STRING, true))
            .build();

    /** Document type; evaluated per FlowFile. */
    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
            .name("Type")
            .description("The type of this document (used by Elasticsearch for indexing and searching)")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .build();

    /** Bulk operation to perform ("index", "update" or "upsert", matched case-insensitively); evaluated per FlowFile. */
    public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
            .name("Index Operation")
            .description("The type of the operation used to index (index, update, upsert)")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .defaultValue("index")
            .build();

    /** Maximum number of FlowFiles pulled from the session per trigger. */
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
            .name("Batch Size")
            .description("The preferred number of FlowFiles to put to the database in a single transaction")
            .required(true)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .defaultValue("100")
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;

    static {
        final Set<Relationship> _rels = new HashSet<>();
        _rels.add(REL_SUCCESS);
        _rels.add(REL_FAILURE);
        _rels.add(REL_RETRY);
        relationships = Collections.unmodifiableSet(_rels);

        // The list order is preserved as-is; connection-related descriptors come from the parent class.
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(CLUSTER_NAME);
        descriptors.add(HOSTS);
        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
        descriptors.add(PROP_SHIELD_LOCATION);
        descriptors.add(USERNAME);
        descriptors.add(PASSWORD);
        descriptors.add(PING_TIMEOUT);
        descriptors.add(SAMPLER_INTERVAL);
        descriptors.add(ID_ATTRIBUTE);
        descriptors.add(INDEX);
        descriptors.add(TYPE);
        descriptors.add(CHARSET);
        descriptors.add(BATCH_SIZE);
        descriptors.add(INDEX_OP);
        propertyDescriptors = Collections.unmodifiableList(descriptors);
    }

    /**
     * @return the unmodifiable set containing the success, failure and retry relationships
     */
    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * @return the unmodifiable, ordered list of properties supported by this processor
     */
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    /**
     * Delegates to the parent class's setup when the processor is scheduled.
     *
     * @param context the process context supplied by the framework
     */
    @OnScheduled
    public void setup(ProcessContext context) {
        super.setup(context);
    }

    /**
     * Pulls a batch of FlowFiles, adds one bulk action per FlowFile and executes the bulk request.
     *
     * <ul>
     *   <li>FlowFiles without a value in the identifier attribute go to {@code failure} and are not sent.</li>
     *   <li>If no FlowFile produced a bulk action, no request is sent to Elasticsearch.</li>
     *   <li>Items reported as failed in the bulk response go to {@code failure}; all others go to
     *       {@code success} with a provenance SEND event.</li>
     *   <li>Node-availability and timeout exceptions route the not-yet-transferred FlowFiles to {@code retry};
     *       any other exception routes them to {@code failure}. Both cases yield the processor.</li>
     * </ul>
     *
     * @param context the process context used to resolve property values
     * @param session the session providing and receiving the FlowFiles
     * @throws ProcessException declared by the overridden method; exceptions raised inside the try block are
     *                          caught and turned into routing decisions
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final ComponentLog logger = getLogger();
        final String idAttribute = context.getProperty(ID_ATTRIBUTE).getValue();
        final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();

        final List<FlowFile> flowFiles = session.get(batchSize);
        if (flowFiles.isEmpty()) {
            return;
        }

        // Keep track of the list of flow files that need to be transferred. As they are transferred, remove them
        // from the list. After the build loop this list holds exactly the FlowFiles that were added to the bulk
        // request, in request order.
        final List<FlowFile> flowFilesToTransfer = new LinkedList<>(flowFiles);
        try {
            final BulkRequestBuilder bulk = esClient.get().prepareBulk();
            if (authToken != null) {
                bulk.putHeader("Authorization", authToken);
            }

            for (FlowFile file : flowFiles) {
                final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue();
                final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(file).getValue();
                final String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(file).getValue();
                final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(file).getValue());
                final String id = file.getAttribute(idAttribute);

                if (id == null) {
                    logger.error("No value in identifier attribute {} for {}, transferring to failure", new Object[]{idAttribute, file});
                    flowFilesToTransfer.remove(file);
                    session.transfer(file, REL_FAILURE);
                } else {
                    session.read(file, new InputStreamCallback() {
                        @Override
                        public void process(final InputStream in) throws IOException {
                            // Line breaks in the content are replaced with spaces before the document is sent.
                            final String json = IOUtils.toString(in, charset)
                                    .replace("\r\n", " ").replace('\n', ' ').replace('\r', ' ');
                            final byte[] document = json.getBytes(charset);

                            if (indexOp.equalsIgnoreCase("index")) {
                                bulk.add(esClient.get().prepareIndex(index, docType, id)
                                        .setSource(document));
                            } else if (indexOp.equalsIgnoreCase("upsert")) {
                                bulk.add(esClient.get().prepareUpdate(index, docType, id)
                                        .setDoc(document)
                                        .setDocAsUpsert(true));
                            } else if (indexOp.equalsIgnoreCase("update")) {
                                bulk.add(esClient.get().prepareUpdate(index, docType, id)
                                        .setDoc(document));
                            } else {
                                throw new IOException("Index operation: " + indexOp + " not supported.");
                            }
                        }
                    });
                }
            }

            // Every FlowFile was rejected for a missing identifier, so there is nothing to send. Executing an
            // empty bulk request would only fail and be reported as a misleading insert error.
            if (bulk.numberOfActions() == 0) {
                return;
            }

            final BulkResponse response = bulk.execute().actionGet();

            // The hosts value does not depend on any FlowFile, so resolve it once for all provenance events.
            final String hosts = context.getProperty(HOSTS).evaluateAttributeExpressions().getValue();

            if (response.hasFailures()) {
                // Responses are guaranteed to be in order, remove them in reverse order
                final BulkItemResponse[] responses = response.getItems();
                if (responses != null && responses.length > 0) {
                    for (int i = responses.length - 1; i >= 0; i--) {
                        final FlowFile flowFile = flowFilesToTransfer.get(i);
                        if (responses[i].isFailed()) {
                            logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure",
                                    new Object[]{flowFile, responses[i].getFailure().getMessage()});
                            session.transfer(flowFile, REL_FAILURE);
                        } else {
                            session.getProvenanceReporter().send(flowFile, hosts + "/" + responses[i].getIndex());
                            session.transfer(flowFile, REL_SUCCESS);
                        }
                        flowFilesToTransfer.remove(flowFile);
                    }
                }
            }

            // Transfer any remaining flowfiles to success
            flowFilesToTransfer.forEach(file -> {
                session.transfer(file, REL_SUCCESS);
                // Record provenance event
                session.getProvenanceReporter().send(file, hosts + "/" +
                        context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue());
            });

        } catch (NoNodeAvailableException
                | ElasticsearchTimeoutException
                | ReceiveTimeoutTransportException
                | NodeClosedException exceptionToRetry) {
            // Authorization errors and other problems are often returned as NoNodeAvailableExceptions without a
            // traceable cause. However the cause seems to be logged, just not available to this caught exception.
            // Since the error message will show up as a bulletin, we make specific mention to check the logs for
            // more details.
            logger.error("Failed to insert into Elasticsearch due to {}. More detailed information may be available in " +
                            "the NiFi logs.",
                    new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry);
            session.transfer(flowFilesToTransfer, REL_RETRY);
            context.yield();

        } catch (Exception exceptionToFail) {
            logger.error("Failed to insert into Elasticsearch due to {}, transferring to failure",
                    new Object[]{exceptionToFail.getLocalizedMessage()}, exceptionToFail);
            session.transfer(flowFilesToTransfer, REL_FAILURE);
            context.yield();
        }
    }

    /**
     * Dispose of ElasticSearch client
     */
    @OnStopped
    public void closeClient() {
        super.closeClient();
    }
}