/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.elasticsearch;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.SearchResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
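
/**
 * Base class for processors that run a JSON query against Elasticsearch through an
 * {@link ElasticSearchClientService} and write the hits and aggregations from each {@link SearchResponse}
 * to the "hits" and "aggregations" relationships, either as one FlowFile per response or as one FlowFile
 * per individual hit/aggregation. Pagination support is layered on top of this class; see the note above
 * {@link #handleHits}.
 */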
public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
            .description("Search hits are routed to this relationship.")
            .build();
    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
            .description("Aggregations are routed to this relationship.")
            .build();

    public static final AllowableValue FLOWFILE_PER_HIT = new AllowableValue(
            "splitUp-yes",
            "Per Hit",
            "Flowfile per hit."
    );
    public static final AllowableValue FLOWFILE_PER_RESPONSE = new AllowableValue(
            "splitUp-no",
            "Per Response",
            "Flowfile per response."
    );

    public static final PropertyDescriptor SEARCH_RESULTS_SPLIT = new PropertyDescriptor.Builder()
            .name("el-rest-split-up-hits")
            .displayName("Search Results Split")
            .description("Output a flowfile containing all hits or one flowfile for each individual hit.")
            .allowableValues(FLOWFILE_PER_RESPONSE, FLOWFILE_PER_HIT)
            .defaultValue(FLOWFILE_PER_RESPONSE.getValue())
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();
    public static final PropertyDescriptor AGGREGATION_RESULTS_SPLIT = new PropertyDescriptor.Builder()
            .name("el-rest-split-up-aggregations")
            .displayName("Aggregation Results Split")
            .description("Output a flowfile containing all aggregations or one flowfile for each individual aggregation.")
            .allowableValues(FLOWFILE_PER_RESPONSE, FLOWFILE_PER_HIT)
            .defaultValue(FLOWFILE_PER_RESPONSE.getValue())
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;

    AtomicReference<ElasticSearchClientService> clientService;

    String splitUpHits;
    private String splitUpAggregations;

    final ObjectMapper mapper = new ObjectMapper();
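
    // The relationship set and property descriptor list are the same for every instance, so both are built
    // once in a static initializer.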
    static {
        final Set<Relationship> rels = new HashSet<>();
        rels.add(REL_ORIGINAL);
        rels.add(REL_FAILURE);
        rels.add(REL_HITS);
        rels.add(REL_AGGREGATIONS);
        relationships = Collections.unmodifiableSet(rels);

        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(QUERY);
        descriptors.add(QUERY_ATTRIBUTE);
        descriptors.add(INDEX);
        descriptors.add(TYPE);
        descriptors.add(CLIENT_SERVICE);
        descriptors.add(SEARCH_RESULTS_SPLIT);
        descriptors.add(AGGREGATION_RESULTS_SPLIT);
        propertyDescriptors = Collections.unmodifiableList(descriptors);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
        return new PropertyDescriptor.Builder()
                .name(propertyDescriptorName)
                .required(false)
                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .dynamic(true)
                .build();
    }
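
    // Cache the client service and the hit/aggregation split settings when the processor is scheduled
    // (properties cannot change while the processor is running); the cached client is cleared on stop.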
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        clientService = new AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
        splitUpHits = context.getProperty(SEARCH_RESULTS_SPLIT).getValue();
        splitUpAggregations = context.getProperty(AGGREGATION_RESULTS_SPLIT).getValue();
    }

    @OnStopped
    public void onStopped() {
        this.clientService = null;
    }
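
    /**
     * Fetch the incoming FlowFile (if any), build the query parameters, run the query and let the concrete
     * subclass finish handling the response. Any failure routes the original FlowFile to the failure
     * relationship and yields the processor.
     */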
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile input = null;
        if (context.hasIncomingConnection()) {
            input = session.get();

            if (input == null && context.hasNonLoopConnection()) {
                return;
            }
        }

        try {
            final Q queryJsonParameters = buildJsonQueryParameters(input, context, session);

            List<FlowFile> hitsFlowFiles = new ArrayList<>();
            final StopWatch stopWatch = new StopWatch(true);
            final SearchResponse response = doQuery(queryJsonParameters, hitsFlowFiles, session, context, input, stopWatch);

            finishQuery(input, queryJsonParameters, session, context, response);
        } catch (Exception ex) {
            getLogger().error("Error processing flowfile.", ex);
            if (input != null) {
                session.transfer(input, REL_FAILURE);
            }
            context.yield();
        }
    }
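
    // Subclasses build their own JsonQueryParameters instance; the fields common to all implementations
    // can be filled in via populateCommonJsonQueryParameters below.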
    abstract Q buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException;

    void populateCommonJsonQueryParameters(final Q queryJsonParameters, final FlowFile input, final ProcessContext context,
                                           final ProcessSession session) throws IOException {
        final String query = getQuery(input, context, session);
        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
        final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
        final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet()
                ? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
                : null;

        queryJsonParameters.setQuery(query);
        queryJsonParameters.setIndex(index);
        queryJsonParameters.setType(type);
        queryJsonParameters.setQueryAttr(queryAttr);
    }

    abstract SearchResponse doQuery(final Q queryJsonParameters, final List<FlowFile> hitsFlowFiles, final ProcessSession session,
                                    final ProcessContext context, final FlowFile input, final StopWatch stopWatch) throws IOException;

    abstract void finishQuery(final FlowFile input, final Q queryParameters, final ProcessSession session, final ProcessContext context,
                              final SearchResponse response) throws IOException;
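
    // A FlowFile created from an incoming FlowFile keeps its lineage; without an input, a new FlowFile is created.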
    FlowFile createChildFlowFile(final ProcessSession session, final FlowFile parent) {
        return parent != null ? session.create(parent) : session.create();
    }

    private FlowFile writeAggregationFlowFileContents(final String name, final Integer number, final String json,
                                                      final ProcessSession session, final FlowFile aggFlowFile,
                                                      final Map<String, String> attributes) {
        FlowFile ff = session.write(aggFlowFile, out -> out.write(json.getBytes()));

        final Map<String, String> aggAttributes = new HashMap<>();
        if (name != null) {
            aggAttributes.put("aggregation.name", name);
        }
        if (number != null) {
            aggAttributes.put("aggregation.number", number.toString());
        }
        ff = session.putAllAttributes(ff, aggAttributes);

        return session.putAllAttributes(ff, attributes);
    }
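
    /**
     * Write the aggregations from a response to the "aggregations" relationship, either as a single FlowFile
     * containing all aggregations or as one FlowFile per named aggregation, depending on the
     * Aggregation Results Split property.
     */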
    private void handleAggregations(final Map<String, Object> aggregations, final ProcessSession session,
                                    final FlowFile parent, final Map<String, String> attributes,
                                    final String transitUri, final StopWatch stopWatch) throws IOException {
        if (aggregations != null && !aggregations.isEmpty()) {
            final List<FlowFile> aggsFlowFiles = new ArrayList<>();
            if (splitUpAggregations.equals(FLOWFILE_PER_HIT.getValue())) {
                int aggCount = 0;
                for (final Map.Entry<String, Object> agg : aggregations.entrySet()) {
                    final FlowFile aggFlowFile = createChildFlowFile(session, parent);
                    final String aggJson = mapper.writeValueAsString(agg.getValue());
                    aggsFlowFiles.add(writeAggregationFlowFileContents(agg.getKey(), ++aggCount, aggJson, session, aggFlowFile, attributes));
                }
            } else {
                final FlowFile aggFlowFile = createChildFlowFile(session, parent);
                final String json = mapper.writeValueAsString(aggregations);
                aggsFlowFiles.add(writeAggregationFlowFileContents(null, null, json, session, aggFlowFile, attributes));
            }

            if (!aggsFlowFiles.isEmpty()) {
                session.transfer(aggsFlowFiles, REL_AGGREGATIONS);
                aggsFlowFiles.forEach(ff -> session.getProvenanceReporter().receive(ff, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)));
            }
        }
    }
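
    // Note that the shared attributes Map is mutated here: "hit.count" is rewritten for each hit FlowFile produced.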
    private FlowFile writeHitFlowFile(final int count, final String json, final ProcessSession session,
                                      final FlowFile hitFlowFile, final Map<String, String> attributes) {
        final FlowFile ff = session.write(hitFlowFile, out -> out.write(json.getBytes()));
        attributes.put("hit.count", Integer.toString(count));

        return session.putAllAttributes(ff, attributes);
    }

    /*
     * The List<FlowFile> hitsFlowFiles parameter and return value are used to allow pagination of query results
     * in AbstractPaginatedJsonQueryElasticsearch. The List is created in onTrigger and passed through doQuery => handleResponse => handleHits.
     * For non-paginated queries the return value will always be an empty List, as the FlowFiles will already have been transferred;
     * for paginated queries, the List may contain one or more FlowFiles to which further hits are appended when the next
     * SearchResponse is processed. In other words, this approach allows recursion for paginated queries, but is unnecessary
     * for single-response queries.
     */
    List<FlowFile> handleHits(final List<Map<String, Object>> hits, final Q queryJsonParameters, final ProcessSession session,
                              final FlowFile parent, final Map<String, String> attributes, final List<FlowFile> hitsFlowFiles,
                              final String transitUri, final StopWatch stopWatch) throws IOException {
        if (hits != null && !hits.isEmpty()) {
            if (FLOWFILE_PER_HIT.getValue().equals(splitUpHits)) {
                for (final Map<String, Object> hit : hits) {
                    final FlowFile hitFlowFile = createChildFlowFile(session, parent);
                    final String json = mapper.writeValueAsString(hit);
                    hitsFlowFiles.add(writeHitFlowFile(1, json, session, hitFlowFile, attributes));
                }
            } else {
                final FlowFile hitFlowFile = createChildFlowFile(session, parent);
                final String json = mapper.writeValueAsString(hits);
                hitsFlowFiles.add(writeHitFlowFile(hits.size(), json, session, hitFlowFile, attributes));
            }
        }

        transferResultFlowFiles(session, hitsFlowFiles, transitUri, stopWatch);

        return hitsFlowFiles;
    }

    private void transferResultFlowFiles(final ProcessSession session, final List<FlowFile> hitsFlowFiles, final String transitUri,
                                         final StopWatch stopWatch) {
        // output any results
        if (!hitsFlowFiles.isEmpty()) {
            session.transfer(hitsFlowFiles, REL_HITS);
            hitsFlowFiles.forEach(ff -> session.getProvenanceReporter().receive(ff, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)));
            hitsFlowFiles.clear();
        }
    }
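
    /**
     * Common response handling: set the JSON MIME type (and the optional query attribute), output aggregations
     * for the initial query only, then output the hits and update the running hit count on the query parameters.
     */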
    List<FlowFile> handleResponse(final SearchResponse response, final boolean newQuery, final Q queryJsonParameters,
                                  final List<FlowFile> hitsFlowFiles, final ProcessSession session, final FlowFile input,
                                  final StopWatch stopWatch) throws IOException {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
        if (StringUtils.isNotBlank(queryJsonParameters.getQueryAttr())) {
            attributes.put(queryJsonParameters.getQueryAttr(), queryJsonParameters.getQuery());
        }

        final String transitUri = clientService.get().getTransitUrl(queryJsonParameters.getIndex(), queryJsonParameters.getType());

        if (newQuery) {
            // only output aggregations from initial query
            // (omitted from subsequent pages as aggs are calculated across the entire result set, not per page)
            handleAggregations(response.getAggregations(), session, input, attributes, transitUri, stopWatch);
        }

        final List<FlowFile> resultFlowFiles = handleHits(response.getHits(), queryJsonParameters, session, input,
                attributes, hitsFlowFiles, transitUri, stopWatch);
        queryJsonParameters.addHitCount(response.getHits().size());

        return resultFlowFiles;
    }
}