blob: 90bd461f4f7273839331c495f1600fc758d3c19c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.OperationResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public abstract class AbstractByQueryElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("If the \"by query\" operation fails, and a flowfile was read, it will be sent to this relationship.")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("If the \"by query\" operation succeeds, and a flowfile was read, it will be sent to this relationship.")
.build();
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
private volatile ElasticSearchClientService clientService;
static {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(QUERY);
descriptors.add(QUERY_ATTRIBUTE);
descriptors.add(INDEX);
descriptors.add(TYPE);
descriptors.add(CLIENT_SERVICE);
propertyDescriptors = Collections.unmodifiableList(descriptors);
}
abstract String getTookAttribute();
abstract String getErrorAttribute();
abstract OperationResponse performOperation(final ElasticSearchClientService clientService, final String query,
final String index, final String type,
final Map<String, String> requestParameters);
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.dynamic(true)
.build();
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile input = null;
if (context.hasIncomingConnection()) {
input = session.get();
if (input == null && context.hasNonLoopConnection()) {
return;
}
}
try {
final String query = getQuery(input, context, session);
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
final String type = context.getProperty(TYPE).isSet()
? context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue()
: null;
final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet()
? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
: null;
final OperationResponse or = performOperation(this.clientService, query, index, type, getUrlQueryParameters(context, input));
if (input == null) {
input = session.create();
}
final Map<String, String> attrs = new HashMap<>();
attrs.put(getTookAttribute(), String.valueOf(or.getTook()));
if (!StringUtils.isBlank(queryAttr)) {
attrs.put(queryAttr, query);
}
input = session.putAllAttributes(input, attrs);
session.transfer(input, REL_SUCCESS);
} catch (final Exception e) {
if (input != null) {
input = session.putAttribute(input, getErrorAttribute(), e.getMessage());
session.transfer(input, REL_FAILURE);
}
getLogger().error("Error running \"by query\" operation: ", e);
context.yield();
}
}
}