blob: 0edcba66d3eb9ec8077f564f59de6bc10650000e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.HashSet;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"counter","debug", "instrumentation"})
@CapabilityDescription("This processor allows users to set specific counters and key points in their flow. It is useful for debugging and basic counting functions.")
@ReadsAttribute(attribute = "counterName", description = "The name of the counter to update/get.")
public class UpdateCounter extends AbstractProcessor {
static final PropertyDescriptor COUNTER_NAME = new PropertyDescriptor.Builder()
.name("counter-name")
.displayName("Counter Name")
.description("The name of the counter you want to set the value of - supports expression language like ${counterName}")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor DELTA = new PropertyDescriptor.Builder()
.name("delta")
.displayName("Delta")
.description("Adjusts the counter by the specified delta for each flow file received. May be a positive or negative integer.")
.required(true)
.defaultValue("1")
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final Relationship SUCCESS = new Relationship.Builder()
.name("success")
.description("Counter was updated/retrieved")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(COUNTER_NAME);
descriptors.add(DELTA);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
session.adjustCounter(context.getProperty(COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue(),
Long.parseLong(context.getProperty(DELTA).evaluateAttributeExpressions(flowFile).getValue()),
false
);
session.transfer(flowFile, SUCCESS);
}
}