package org.apache.nifi.processors.elasticsearch;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.SearchResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
// Relationship named "original"; per its description it takes the incoming flowfiles that did not cause an error.
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
.description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
// Relationship named "hits"; transferResultFlowFiles sends the hit flowfiles here.
// NOTE(review): the builder chain is not terminated in this view - confirm that .build() follows.
public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
.description("Search hits are routed to this relationship.")
// Relationship named "aggregations"; handleAggregations sends the aggregation flowfiles here.
// NOTE(review): the builder chain is not terminated in this view - confirm that .build() follows.
public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
.description("Aggregations are routed to this relationship.")
// Split option selecting one flowfile per individual result. Its value is compared against the configured
// split settings in handleAggregations and handleHits to choose per-item output.
// NOTE(review): the constructor call is not terminated in this view - confirm the remaining argument(s).
public static final AllowableValue FLOWFILE_PER_HIT = new AllowableValue(
"Per Hit",
"Flowfile per hit."
// Split option selecting a single flowfile for the whole response.
// NOTE(review): the constructor call is not terminated in this view - confirm the remaining argument(s).
public static final AllowableValue FLOWFILE_PER_RESPONSE = new AllowableValue(
"Per Response",
"Flowfile per response."
// Property controlling how search hits are split across flowfiles; its value is read in onScheduled.
// NOTE(review): name, allowable values, default and .build() are not visible in this view - confirm they follow.
public static final PropertyDescriptor SEARCH_RESULTS_SPLIT = new PropertyDescriptor.Builder()
.displayName("Search Results Split")
.description("Output a flowfile containing all hits or one flowfile for each individual hit.")
// Property controlling how aggregations are split across flowfiles; its value is read in onScheduled.
// NOTE(review): name, allowable values, default and .build() are not visible in this view - confirm they follow.
public static final PropertyDescriptor AGGREGATION_RESULTS_SPLIT = new PropertyDescriptor.Builder()
.displayName("Aggregation Results Split")
.description("Output a flowfile containing all aggregations or one flowfile for each individual aggregation.")
// Unmodifiable set returned by getRelationships(); assigned in the static initializer.
private static final Set<Relationship> relationships;
// Unmodifiable list returned by getSupportedPropertyDescriptors(); assigned in the static initializer.
private static final List<PropertyDescriptor> propertyDescriptors;
// Holder for the Elasticsearch client service; created in onScheduled and set to null in onStopped.
AtomicReference<ElasticSearchClientService> clientService;
// Configured value of SEARCH_RESULTS_SPLIT, captured in onScheduled.
String splitUpHits;
// Configured value of AGGREGATION_RESULTS_SPLIT, captured in onScheduled.
private String splitUpAggregations;
// Jackson mapper used to serialise hits and aggregations to JSON strings.
final ObjectMapper mapper = new ObjectMapper();
// Builds the class-wide relationship set and property descriptor list and freezes them as unmodifiable views.
// NOTE(review): in this view nothing is added to either collection before it is frozen, so both are
// published empty - confirm that the statements registering the relationships and properties are present.
static {
final Set<Relationship> rels = new HashSet<>();
relationships = Collections.unmodifiableSet(rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
propertyDescriptors = Collections.unmodifiableList(descriptors);
public Set<Relationship> getRelationships() {
return relationships;
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
// Creates the descriptor used for a dynamic (user-added) property with the given name.
// NOTE(review): the builder chain is not visible past its first line - confirm the name, validators,
// expression language scope and .build() that presumably follow.
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
public void onScheduled(final ProcessContext context) {
clientService = new AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
splitUpHits = context.getProperty(SEARCH_RESULTS_SPLIT).getValue();
splitUpAggregations = context.getProperty(AGGREGATION_RESULTS_SPLIT).getValue();
public void onStopped() {
this.clientService = null;
/**
 * Runs one query cycle: optionally takes an incoming flowfile, builds the query parameters, executes the
 * query through doQuery and completes it through finishQuery. Any exception is logged and, when an input
 * flowfile exists, that flowfile is transferred to REL_FAILURE.
 *
 * @param context supplies connection state and is handed on to the query helpers
 * @param session used to fetch the input flowfile and to transfer it on failure
 */
public void onTrigger(final ProcessContext context, final ProcessSession session) {
// Stays null when the processor has no incoming connection or nothing is queued.
FlowFile input = null;
if (context.hasIncomingConnection()) {
input = session.get();
// NOTE(review): the body of this check is not visible here - presumably it ends the trigger when
// nothing was queued on a non-loop connection; confirm.
if (input == null && context.hasNonLoopConnection()) {
try {
final Q queryJsonParameters = buildJsonQueryParameters(input, context, session);
// Created here and handed down so that the query helpers can collect hit flowfiles in it.
List<FlowFile> hitsFlowFiles = new ArrayList<>();
final StopWatch stopWatch = new StopWatch(true);
final SearchResponse response = doQuery(queryJsonParameters, hitsFlowFiles, session, context, input, stopWatch);
finishQuery(input, queryJsonParameters, session, context, response);
} catch (Exception ex) {
getLogger().error("Error processing flowfile.", ex);
// There is nothing to route when the processor ran without an input flowfile.
if (input != null) {
session.transfer(input, REL_FAILURE);
/**
 * Builds the query parameter object for one trigger; called by onTrigger before the query is executed.
 *
 * @param input   the incoming flowfile, or null when onTrigger obtained none
 * @param context the current process context
 * @param session the current process session
 * @return the parameters that onTrigger passes to doQuery and finishQuery
 * @throws IOException declared for implementations; the conditions are implementation-specific
 */
abstract Q buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException;
/**
 * Resolves the values shared by every query: the query text (via getQuery), the index and type (evaluated
 * against the input flowfile's attributes) and, when QUERY_ATTRIBUTE is set, the evaluated value of that property.
 * NOTE(review): in this view the resolved values are never stored on queryJsonParameters - confirm that
 * the assignments which presumably follow are present.
 *
 * @param queryJsonParameters the parameter object to populate
 * @param input               the incoming flowfile used for expression evaluation; may be null
 * @param context             the process context the properties are read from
 * @param session             the process session, passed on to getQuery
 * @throws IOException declared by the signature; presumably raised by getQuery - confirm
 */
void populateCommonJsonQueryParameters(final Q queryJsonParameters, final FlowFile input, final ProcessContext context,
final ProcessSession session) throws IOException {
final String query = getQuery(input, context, session);
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
// The query attribute is optional; null signals that it was not configured.
final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet()
? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
: null;
/**
 * Executes the query; called by onTrigger after the parameters have been built.
 *
 * @param queryJsonParameters the parameters built by buildJsonQueryParameters
 * @param hitsFlowFiles       the list created in onTrigger for collecting hit flowfiles
 * @param session             the current process session
 * @param context             the current process context
 * @param input               the incoming flowfile, or null when there is none
 * @param stopWatch           the stop watch started in onTrigger just before this call
 * @return the response that onTrigger hands to finishQuery
 * @throws IOException declared for implementations; the conditions are implementation-specific
 */
abstract SearchResponse doQuery(final Q queryJsonParameters, final List<FlowFile> hitsFlowFiles, final ProcessSession session,
final ProcessContext context, final FlowFile input, final StopWatch stopWatch) throws IOException;
/**
 * Completes a query; called by onTrigger with the response returned from doQuery.
 *
 * @param input           the incoming flowfile, or null when there is none
 * @param queryParameters the parameters built by buildJsonQueryParameters
 * @param session         the current process session
 * @param context         the current process context
 * @param response        the response returned by doQuery
 * @throws IOException declared for implementations; the conditions are implementation-specific
 */
abstract void finishQuery(final FlowFile input, final Q queryParameters, final ProcessSession session, final ProcessContext context,
final SearchResponse response) throws IOException;
FlowFile createChildFlowFile(final ProcessSession session, final FlowFile parent) {
return parent != null ? session.create(parent) : session.create();
private FlowFile writeAggregationFlowFileContents(final String name, final Integer number, final String json,
final ProcessSession session, final FlowFile aggFlowFile,
final Map<String, String> attributes) {
FlowFile ff = session.write(aggFlowFile, out -> out.write(json.getBytes()));
ff = session.putAllAttributes(ff, new HashMap<String, String>(){{
if (name != null) {
put("", name);
if (number != null) {
put("aggregation.number", number.toString());
return session.putAllAttributes(ff, attributes);
private void handleAggregations(final Map<String, Object> aggregations, final ProcessSession session,
final FlowFile parent, final Map<String, String> attributes,
final String transitUri, final StopWatch stopWatch) throws IOException {
if (aggregations != null && !aggregations.isEmpty()) {
final List<FlowFile> aggsFlowFiles = new ArrayList<>();
if (splitUpAggregations.equals(FLOWFILE_PER_HIT.getValue())) {
int aggCount = 0;
for (final Map.Entry<String, Object> agg : aggregations.entrySet()) {
final FlowFile aggFlowFile = createChildFlowFile(session, parent);
final String aggJson = mapper.writeValueAsString(agg.getValue());
aggsFlowFiles.add(writeAggregationFlowFileContents(agg.getKey(), ++aggCount, aggJson, session, aggFlowFile, attributes));
} else {
final FlowFile aggFlowFile = createChildFlowFile(session, parent);
final String json = mapper.writeValueAsString(aggregations);
aggsFlowFiles.add(writeAggregationFlowFileContents(null, null, json, session, aggFlowFile, attributes));
if (!aggsFlowFiles.isEmpty()) {
session.transfer(aggsFlowFiles, REL_AGGREGATIONS);
aggsFlowFiles.forEach(ff -> session.getProvenanceReporter().receive(ff, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)));
private FlowFile writeHitFlowFile(final int count, final String json, final ProcessSession session,
final FlowFile hitFlowFile, final Map<String, String> attributes) {
final FlowFile ff = session.write(hitFlowFile, out -> out.write(json.getBytes()));
attributes.put("hit.count", Integer.toString(count));
return session.putAllAttributes(ff, attributes);
/**
 * The List<FlowFile> hitsFlowFiles parameter and return value exist to allow pagination of query results
 * in AbstractPaginatedJsonQueryElasticsearch. The List is created in onTrigger and passed through
 * doQuery => handleResponse => handleHits.
 * For non-paginated queries the returned List is always empty, because the FlowFiles will already have been transferred.
 * For paginated queries the List may contain one or more FlowFiles, to which further hits may be appended when the next
 * SearchResponse is processed; this approach allows recursion for paginated queries but is unnecessary for single-response queries.
 */
List<FlowFile> handleHits(final List<Map<String, Object>> hits, final Q queryJsonParameters, final ProcessSession session,
final FlowFile parent, final Map<String, String> attributes, final List<FlowFile> hitsFlowFiles,
final String transitUri, final StopWatch stopWatch) throws IOException {
// Converts the hits of a response into flowfiles, adds them to hitsFlowFiles, hands that list to
// transferResultFlowFiles and returns it. A null or empty hits list creates no flowfiles.
if (hits != null && !hits.isEmpty()) {
if (FLOWFILE_PER_HIT.getValue().equals(splitUpHits)) {
// Per-hit output: each hit is serialised into its own flowfile with a hit count of 1.
for (final Map<String, Object> hit : hits) {
final FlowFile hitFlowFile = createChildFlowFile(session, parent);
final String json = mapper.writeValueAsString(hit);
hitsFlowFiles.add(writeHitFlowFile(1, json, session, hitFlowFile, attributes));
} else {
// Per-response output: the whole hits list is serialised into one flowfile whose hit count is the list size.
final FlowFile hitFlowFile = createChildFlowFile(session, parent);
final String json = mapper.writeValueAsString(hits);
hitsFlowFiles.add(writeHitFlowFile(hits.size(), json, session, hitFlowFile, attributes));
// NOTE(review): block boundaries are not visible here - confirm whether the transfer below runs only
// when hits were present or on every call.
transferResultFlowFiles(session, hitsFlowFiles, transitUri, stopWatch);
return hitsFlowFiles;
private void transferResultFlowFiles(final ProcessSession session, final List<FlowFile> hitsFlowFiles, final String transitUri,
final StopWatch stopWatch) {
// output any results
if (!hitsFlowFiles.isEmpty()) {
session.transfer(hitsFlowFiles, REL_HITS);
hitsFlowFiles.forEach(ff -> session.getProvenanceReporter().receive(ff, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)));
List<FlowFile> handleResponse(final SearchResponse response, final boolean newQuery, final Q queryJsonParameters,
final List<FlowFile> hitsFlowFiles, final ProcessSession session, final FlowFile input,
final StopWatch stopWatch) throws IOException {
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
if (StringUtils.isNotBlank(queryJsonParameters.getQueryAttr())) {
attributes.put(queryJsonParameters.getQueryAttr(), queryJsonParameters.getQuery());
final String transitUri = clientService.get().getTransitUrl(queryJsonParameters.getIndex(), queryJsonParameters.getType());
if (newQuery) {
// only output aggregations from initial query
// (omitted from subsequent pages as aggs are calculated across the entire result set, not per page)
handleAggregations(response.getAggregations(), session, input, attributes, transitUri, stopWatch);
final List<FlowFile> resultFlowFiles = handleHits(response.getHits(), queryJsonParameters, session, input,
attributes, hitsFlowFiles, transitUri, stopWatch);
return resultFlowFiles;