/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.processors.standard;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.timebuffer.LongEntityAccess;
import org.apache.nifi.util.timebuffer.TimedBuffer;
import org.apache.nifi.util.timebuffer.TimestampedLong;

@SideEffectFree
@TriggerSerially
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"rate control", "throttle", "rate", "throughput"})
@CapabilityDescription("Controls the rate at which data is transferred to follow-on processors."
+ " If you configure a very small Time Duration, then the accuracy of the throttle gets worse."
+ " You can improve this accuracy by decreasing the Yield Duration, at the expense of more Tasks given to the processor.")
public class ControlRate extends AbstractProcessor {
public static final String DATA_RATE = "data rate";
public static final String FLOWFILE_RATE = "flowfile count";
public static final String ATTRIBUTE_RATE = "attribute value";
public static final AllowableValue DATA_RATE_VALUE = new AllowableValue(DATA_RATE, DATA_RATE,
"Rate is controlled by counting bytes transferred per time duration.");
public static final AllowableValue FLOWFILE_RATE_VALUE = new AllowableValue(FLOWFILE_RATE, FLOWFILE_RATE,
"Rate is controlled by counting flowfiles transferred per time duration");
public static final AllowableValue ATTRIBUTE_RATE_VALUE = new AllowableValue(ATTRIBUTE_RATE, ATTRIBUTE_RATE,
"Rate is controlled by accumulating the value of a specified attribute that is transferred per time duration");
// based on testing to balance commits and 10,000 FF swap limit
public static final int MAX_FLOW_FILES_PER_BATCH = 1000;
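// ThrottleFilter below returns ACCEPT_AND_TERMINATE once this many FlowFiles have been accepted
// in a single session, bounding how much work each onTrigger invocation commits.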
public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
.name("Rate Control Criteria")
.description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
.required(true)
.allowableValues(DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE)
.defaultValue(DATA_RATE)
.build();
public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder()
.name("Maximum Rate")
.description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
+ "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria
.build();
public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
.name("Rate Controlled Attribute")
.description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. "
+ "The value of the attribute referenced by this property must be a positive long, or the FlowFile will be routed to failure. "
+ "This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder()
.name("Time Duration")
.description("The amount of time to which the Maximum Rate pertains. Changing this value resets the rate counters.")
.required(true)
.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
.defaultValue("1 min")
.build();
public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
.name("Grouping Attribute")
.description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for "
+ "each value specified by the attribute with this name. Changing this value resets the rate counters.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles are transferred to this relationship under normal conditions")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles will be routed to this relationship if they are missing a necessary Rate Controlled Attribute or the attribute is not in the expected format")
.build();
private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*");
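// Whole-string match for a positive long, e.g. "42" or "007"; "0", negatives, and non-numeric values are rejected.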
private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
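// Sentinel map key used when no Grouping Attribute is configured or the FlowFile does not carry it.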
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>();
private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
private volatile String rateControlCriteria = null;
private volatile String rateControlAttribute = null;
private volatile String maximumRateStr = null;
private volatile String groupingAttributeName = null;
private volatile int timePeriodSeconds = 1;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(RATE_CONTROL_CRITERIA);
properties.add(MAX_RATE);
properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
properties.add(TIME_PERIOD);
properties.add(GROUPING_ATTRIBUTE_NAME);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
final Validator rateValidator;
switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
case DATA_RATE:
rateValidator = StandardValidators.DATA_SIZE_VALIDATOR;
break;
case ATTRIBUTE_RATE:
rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
if (rateAttr == null) {
validationResults.add(new ValidationResult.Builder()
.subject(RATE_CONTROL_ATTRIBUTE_NAME.getName())
.explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'")
.build());
}
break;
case FLOWFILE_RATE:
default:
rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
break;
}
final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context);
if (!rateResult.isValid()) {
validationResults.add(rateResult);
}
return validationResults;
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
super.onPropertyModified(descriptor, oldValue, newValue);
if (descriptor.equals(RATE_CONTROL_CRITERIA)
|| descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME)
|| descriptor.equals(GROUPING_ATTRIBUTE_NAME)
|| descriptor.equals(TIME_PERIOD)) {
// if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map.
throttleMap.clear();
} else if (descriptor.equals(MAX_RATE)) {
final long newRate;
if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue.toUpperCase()).matches()) {
newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue();
} else {
newRate = Long.parseLong(newValue);
}
for (final Throttle throttle : throttleMap.values()) {
throttle.setMaxRate(newRate);
}
}
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
rateControlCriteria = context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase();
rateControlAttribute = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
maximumRateStr = context.getProperty(MAX_RATE).getValue().toUpperCase();
groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
timePeriodSeconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS).intValue();
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
List<FlowFile> flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH));
if (flowFiles.isEmpty()) {
context.yield();
return;
}
// Periodically clear any Throttle that has not been used in more than 2 throttling periods
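// For example, with the default Time Duration of 1 min, a sweep runs at most once every 2 minutes
// and removes any group's Throttle whose last successful add predates the previous sweep.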
final long lastClearTime = lastThrottleClearTime.get();
final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
if (lastClearTime < throttleExpirationMillis) {
if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) {
final Iterator<Map.Entry<String, Throttle>> itr = throttleMap.entrySet().iterator();
while (itr.hasNext()) {
final Map.Entry<String, Throttle> entry = itr.next();
final Throttle throttle = entry.getValue();
if (throttle.tryLock()) {
try {
if (throttle.lastUpdateTime() < lastClearTime) {
itr.remove();
}
} finally {
throttle.unlock();
}
}
}
}
}
final ComponentLog logger = getLogger();
for (FlowFile flowFile : flowFiles) {
// re-check the accrual so that FlowFiles with a missing or invalid rate attribute are routed to failure
final long accrualAmount = getFlowFileAccrual(flowFile);
if (accrualAmount < 0) {
logger.error("Routing {} to 'failure' due to missing or invalid attribute", new Object[]{flowFile});
session.transfer(flowFile, REL_FAILURE);
} else {
logger.info("transferring {} to 'success'", new Object[]{flowFile});
session.transfer(flowFile, REL_SUCCESS);
}
}
}
/*
* Determine the amount this FlowFile will incur against the maximum allowed rate.
* If the value returned is negative then the flowfile given is missing the required attribute
* or the attribute has an invalid value for accrual.
*/
private long getFlowFileAccrual(FlowFile flowFile) {
long rateValue;
switch (rateControlCriteria) {
case DATA_RATE:
rateValue = flowFile.getSize();
break;
case FLOWFILE_RATE:
rateValue = 1;
break;
case ATTRIBUTE_RATE:
final String attributeValue = flowFile.getAttribute(rateControlAttribute);
if (attributeValue == null) {
return -1L;
}
if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
return -1L;
}
rateValue = Long.parseLong(attributeValue);
break;
default:
throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + rateControlCriteria);
}
return rateValue;
}
private static class Throttle extends ReentrantLock {
private final AtomicLong maxRate = new AtomicLong(1L);
private final long timePeriodMillis;
private final TimedBuffer<TimestampedLong> timedBuffer;
private final ComponentLog logger;
private volatile long penalizationPeriod = 0;
private volatile long penalizationExpired = 0;
private volatile long lastUpdateTime;
public Throttle(final int timePeriod, final TimeUnit unit, final ComponentLog logger) {
this.timePeriodMillis = TimeUnit.MILLISECONDS.convert(timePeriod, unit);
this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new LongEntityAccess());
this.logger = logger;
}
public void setMaxRate(final long maxRate) {
this.maxRate.set(maxRate);
}
public long lastUpdateTime() {
return lastUpdateTime;
}
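/*
 * Attempt to account for 'value' against the rolling time window. Returns false while a
 * penalization period is in effect or the window has already reached the maximum rate;
 * otherwise records the value, arming a penalization period if this addition overshoots
 * the maximum.
 */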
public boolean tryAdd(final long value) {
final long now = System.currentTimeMillis();
if (penalizationExpired > now) {
return false;
}
final long maxRateValue = maxRate.get();
final TimestampedLong sum = timedBuffer.getAggregateValue(timePeriodMillis);
if (sum != null && sum.getValue() >= maxRateValue) {
if (logger.isDebugEnabled()) {
logger.debug("current sum for throttle is {} at time {}, so not allowing rate of {} through", new Object[]{sum.getValue(), sum.getTimestamp(), value});
}
return false;
}
// Implement the Throttle penalization based on how much extra 'amountOver' was allowed through
if (penalizationPeriod > 0) {
if (logger.isDebugEnabled()) {
logger.debug("Starting Throttle penalization, expiring {} milliseconds from now", new Object[]{penalizationPeriod});
}
penalizationExpired = now + penalizationPeriod;
penalizationPeriod = 0;
return false;
}
if (logger.isDebugEnabled()) {
logger.debug("current sum for throttle is {} at time {}, so allowing rate of {} through",
new Object[]{sum == null ? 0 : sum.getValue(), sum == null ? 0 : sum.getTimestamp(), value});
}
final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue();
if (transferred > maxRateValue) {
final long amountOver = transferred - maxRateValue;
// determine how long it should take to transfer 'amountOver' and 'penalize' the Throttle for that long
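// e.g., with a max rate of 1,000,000 bytes per 60,000 ms window, a FlowFile that pushes the
// window total to 1,500,000 bytes leaves amountOver = 500,000, so the Throttle is penalized
// for 0.5 * 60,000 = 30,000 ms before it will admit more data.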
final double pct = (double) amountOver / (double) maxRateValue;
this.penalizationPeriod = (long) (timePeriodMillis * pct);
if (logger.isDebugEnabled()) {
logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[]{value, penalizationPeriod});
}
}
lastUpdateTime = now;
return true;
}
}
private class ThrottleFilter implements FlowFileFilter {
private final int flowFilesPerBatch;
private int flowFilesInBatch = 0;
ThrottleFilter(final int maxFFPerBatch) {
flowFilesPerBatch = maxFFPerBatch;
}
@Override
public FlowFileFilterResult filter(FlowFile flowFile) {
long accrual = getFlowFileAccrual(flowFile);
if (accrual < 0) {
// this FlowFile is invalid for this configuration so let the processor deal with it
return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
}
String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName);
// the flow file may not have the required attribute: in this case it is considered part
// of the DEFAULT_GROUP_ATTRIBUTE
if (groupName == null) {
groupName = DEFAULT_GROUP_ATTRIBUTE;
}
Throttle throttle = throttleMap.get(groupName);
if (throttle == null) {
throttle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
final long newRate;
if (DataUnit.DATA_SIZE_PATTERN.matcher(maximumRateStr).matches()) {
newRate = DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue();
} else {
newRate = Long.parseLong(maximumRateStr);
}
throttle.setMaxRate(newRate);
throttleMap.put(groupName, throttle);
}
throttle.lock();
try {
if (throttle.tryAdd(accrual)) {
flowFilesInBatch += 1;
if (flowFilesInBatch >= flowFilesPerBatch) {
flowFilesInBatch = 0;
return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
} else {
return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
}
}
} finally {
throttle.unlock();
}
// If we are not using a grouping attribute, then no FlowFile will be able to continue on. So we can
// just TERMINATE the iteration over FlowFiles.
// However, if we are using a grouping attribute, then another FlowFile in the queue may be able to proceed,
// so we want to continue our iteration.
if (groupingAttributeName == null) {
return FlowFileFilterResult.REJECT_AND_TERMINATE;
}
return FlowFileFilterResult.REJECT_AND_CONTINUE;
}
}
}
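
/*
 * Usage sketch (illustrative, assuming the nifi-mock TestRunner): five FlowFiles are queued
 * against a limit of 3 FlowFiles per minute, so only the first three should reach 'success'.
 * The property values here are arbitrary examples.
 *
 *   final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
 *   runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
 *   runner.setProperty(ControlRate.MAX_RATE, "3");
 *   runner.setProperty(ControlRate.TIME_PERIOD, "1 min");
 *   for (int i = 0; i < 5; i++) {
 *       runner.enqueue(new byte[0]);
 *   }
 *   runner.run(5);
 *   runner.assertTransferCount(ControlRate.REL_SUCCESS, 3);
 */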