blob: 57c172321d469fa78abf524d40ad6e8523f495f3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.DynamicRelationship;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.loading.LoadDistributionListener;
import org.apache.nifi.loading.LoadDistributionService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@TriggerWhenAnyDestinationAvailable
@Tags({"distribute", "load balance", "route", "round robin", "weighted"})
@CapabilityDescription("Distributes FlowFiles to downstream processors based on a Distribution Strategy. If using the Round Robin "
+ "strategy, the default is to assign each destination a weighting of 1 (evenly distributed). However, optional properties"
+ "can be added to the change this; adding a property with the name '5' and value '10' means that the relationship with name "
+ "'5' will be receive 10 FlowFiles in each iteration instead of 1.")
@DynamicProperty(name = "The relationship name(positive number)", value = "The relationship Weight(positive number)", description = "adding a "
+ "property with the name '5' and value '10' means that the relationship with name "
+ "'5' will be receive 10 FlowFiles in each iteration instead of 1.")
@DynamicRelationship(name = "A number 1..<Number Of Relationships>", description = "FlowFiles are sent to this relationship per the "
+ "<Distribution Strategy>")
@WritesAttributes(
@WritesAttribute(attribute = "distribute.load.relationship", description = "The name of the specific relationship the flow file has been routed through")
)
public class DistributeLoad extends AbstractProcessor {
public static final String STRATEGY_ROUND_ROBIN = "round robin";
public static final String STRATEGY_NEXT_AVAILABLE = "next available";
public static final String STRATEGY_LOAD_DISTRIBUTION_SERVICE = "load distribution service";
public static final PropertyDescriptor NUM_RELATIONSHIPS = new PropertyDescriptor.Builder()
.name("Number of Relationships")
.description("Determines the number of Relationships to which the load should be distributed")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("1")
.build();
public static final PropertyDescriptor DISTRIBUTION_STRATEGY = new PropertyDescriptor.Builder()
.name("Distribution Strategy")
.description("Determines how the load will be distributed. If using Round Robin, will not distribute any FlowFiles unless all "
+ "destinations can accept FlowFiles; when using Next Available, will distribute FlowFiles as long as at least 1 "
+ "destination can accept FlowFiles.")
.required(true)
.allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE)
.defaultValue(STRATEGY_ROUND_ROBIN)
.build();
public static final PropertyDescriptor HOSTNAMES = new PropertyDescriptor.Builder()
.name("Hostnames")
.description("List of remote servers to distribute across. Each server must be FQDN and use either ',', ';', or [space] as a delimiter")
.required(true)
.addValidator((subject, input, context) -> {
ValidationResult result = new ValidationResult.Builder().subject(subject).valid(true).input(input).explanation("Good FQDNs").build();
if (null == input) {
result = new ValidationResult.Builder().subject(subject).input(input).valid(false)
.explanation("Need to specify delimited list of FQDNs").build();
return result;
}
String[] hostNames = input.split("(?:,+|;+|\\s+)");
for (String hostName : hostNames) {
if (StringUtils.isNotBlank(hostName) && !hostName.contains(".")) {
result = new ValidationResult.Builder().subject(subject).input(input).valid(false)
.explanation("Need a FQDN rather than a simple host name.").build();
return result;
}
}
return result;
}).build();
public static final PropertyDescriptor LOAD_DISTRIBUTION_SERVICE_TEMPLATE = new PropertyDescriptor.Builder()
.name("Load Distribution Service ID")
.description("The identifier of the Load Distribution Service")
.required(true)
.identifiesControllerService(LoadDistributionService.class)
.build();
public static final String RELATIONSHIP_ATTRIBUTE = "distribute.load.relationship";
private List<PropertyDescriptor> properties;
private final AtomicReference<Set<Relationship>> relationshipsRef = new AtomicReference<>();
private final AtomicReference<DistributionStrategy> strategyRef = new AtomicReference<DistributionStrategy>(new RoundRobinStrategy());
private final AtomicReference<List<Relationship>> weightedRelationshipListRef = new AtomicReference<>();
private final AtomicBoolean doCustomValidate = new AtomicBoolean(false);
private volatile LoadDistributionListener myListener;
private final AtomicBoolean doSetProps = new AtomicBoolean(true);
@Override
protected void init(final ProcessorInitializationContext context) {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(createRelationship(1));
relationshipsRef.set(Collections.unmodifiableSet(relationships));
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(NUM_RELATIONSHIPS);
properties.add(DISTRIBUTION_STRATEGY);
this.properties = Collections.unmodifiableList(properties);
}
private static Relationship createRelationship(final int num) {
return new Relationship.Builder().name(String.valueOf(num))
.description("Where to route flowfiles for this relationship index").build();
}
@Override
public Set<Relationship> getRelationships() {
return relationshipsRef.get();
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (descriptor.equals(NUM_RELATIONSHIPS)) {
final Set<Relationship> relationships = new HashSet<>();
for (int i = 1; i <= Integer.parseInt(newValue); i++) {
relationships.add(createRelationship(i));
}
this.relationshipsRef.set(Collections.unmodifiableSet(relationships));
} else if (descriptor.equals(DISTRIBUTION_STRATEGY)) {
switch (newValue.toLowerCase()) {
case STRATEGY_ROUND_ROBIN:
strategyRef.set(new RoundRobinStrategy());
break;
case STRATEGY_NEXT_AVAILABLE:
strategyRef.set(new NextAvailableStrategy());
break;
case STRATEGY_LOAD_DISTRIBUTION_SERVICE:
strategyRef.set(new LoadDistributionStrategy());
}
doSetProps.set(true);
doCustomValidate.set(true);
}
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
if (strategyRef.get() instanceof LoadDistributionStrategy && doSetProps.getAndSet(false)) {
final List<PropertyDescriptor> props = new ArrayList<>(properties);
props.add(LOAD_DISTRIBUTION_SERVICE_TEMPLATE);
props.add(HOSTNAMES);
this.properties = Collections.unmodifiableList(props);
} else if (doSetProps.getAndSet(false)) {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(NUM_RELATIONSHIPS);
props.add(DISTRIBUTION_STRATEGY);
this.properties = Collections.unmodifiableList(props);
}
return properties;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
// validate that the property name is valid.
final int numRelationships = this.relationshipsRef.get().size();
try {
final int value = Integer.parseInt(propertyDescriptorName);
if (value <= 0 || value > numRelationships) {
return new PropertyDescriptor.Builder()
.addValidator(new InvalidPropertyNameValidator(propertyDescriptorName)).name(propertyDescriptorName).build();
}
} catch (final NumberFormatException e) {
return new PropertyDescriptor.Builder()
.addValidator(new InvalidPropertyNameValidator(propertyDescriptorName)).name(propertyDescriptorName).build();
}
// validate that the property value is valid
return new PropertyDescriptor.Builder().addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.name(propertyDescriptorName).dynamic(true).build();
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
Collection<ValidationResult> results = new ArrayList<>();
if (doCustomValidate.getAndSet(false)) {
String distStrat = validationContext.getProperty(DISTRIBUTION_STRATEGY).getValue();
if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) {
// make sure Hostnames and Controller service are set
PropertyValue propDesc = validationContext.getProperty(HOSTNAMES);
if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
results.add(new ValidationResult.Builder().subject(HOSTNAMES.getName())
.explanation("Must specify Hostnames when using 'Load Distribution Strategy'").valid(false).build());
}
propDesc = validationContext.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE);
if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
results.add(new ValidationResult.Builder()
.subject(LOAD_DISTRIBUTION_SERVICE_TEMPLATE.getName())
.explanation("Must specify 'Load Distribution Service ID' when using 'Load Distribution Strategy'")
.valid(false).build());
}
if (results.isEmpty()) {
int numRels = validationContext.getProperty(NUM_RELATIONSHIPS).asInteger();
String hostNamesValue = validationContext.getProperty(HOSTNAMES).getValue();
String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)");
int numHosts = 0;
for (String hostName : hostNames) {
if (StringUtils.isNotBlank(hostName)) {
hostNames[numHosts++] = hostName;
}
}
if (numHosts > numRels) {
results.add(new ValidationResult.Builder()
.subject("Number of Relationships and Hostnames")
.explanation("Number of Relationships must be equal to, or greater than, the number of host names")
.valid(false).build());
} else {
// create new relationships with descriptions of hostname
Set<Relationship> relsWithDesc = new TreeSet<>();
for (int i = 0; i < numHosts; i++) {
relsWithDesc.add(new Relationship.Builder().name(String.valueOf(i + 1))
.description(hostNames[i]).build());
}
// add add'l rels if configuration requires it...it probably shouldn't
for (int i = numHosts + 1; i <= numRels; i++) {
relsWithDesc.add(createRelationship(i));
}
relationshipsRef.set(Collections.unmodifiableSet(relsWithDesc));
}
}
}
}
return results;
}
@OnScheduled
public void createWeightedList(final ProcessContext context) {
final Map<Integer, Integer> weightings = new LinkedHashMap<>();
String distStrat = context.getProperty(DISTRIBUTION_STRATEGY).getValue();
if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) {
String hostNamesValue = context.getProperty(HOSTNAMES).getValue();
String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)");
Set<String> hostNameSet = new HashSet<>();
for (String hostName : hostNames) {
if (StringUtils.isNotBlank(hostName)) {
hostNameSet.add(hostName);
}
}
LoadDistributionService svc = context.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE).asControllerService(LoadDistributionService.class);
myListener = new LoadDistributionListener() {
@Override
public void update(Map<String, Integer> loadInfo) {
for (Relationship rel : relationshipsRef.get()) {
String hostname = rel.getDescription();
Integer weight = 1;
if (loadInfo.containsKey(hostname)) {
weight = loadInfo.get(hostname);
}
weightings.put(Integer.decode(rel.getName()), weight);
}
updateWeightedRelationships(weightings);
}
};
Map<String, Integer> loadInfo = svc.getLoadDistribution(hostNameSet, myListener);
for (Relationship rel : relationshipsRef.get()) {
String hostname = rel.getDescription();
Integer weight = 1;
if (loadInfo.containsKey(hostname)) {
weight = loadInfo.get(hostname);
}
weightings.put(Integer.decode(rel.getName()), weight);
}
} else {
final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger();
for (int i = 1; i <= numRelationships; i++) {
weightings.put(i, 1);
}
for (final PropertyDescriptor propDesc : context.getProperties().keySet()) {
if (!this.properties.contains(propDesc)) {
final int relationship = Integer.parseInt(propDesc.getName());
final int weighting = context.getProperty(propDesc).asInteger();
weightings.put(relationship, weighting);
}
}
}
updateWeightedRelationships(weightings);
}
private void updateWeightedRelationships(final Map<Integer, Integer> weightings) {
final List<Relationship> relationshipList = new ArrayList<>();
for (final Map.Entry<Integer, Integer> entry : weightings.entrySet()) {
final String relationshipName = String.valueOf(entry.getKey());
final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
for (int i = 0; i < entry.getValue(); i++) {
relationshipList.add(relationship);
}
}
this.weightedRelationshipListRef.set(Collections.unmodifiableList(relationshipList));
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final DistributionStrategy strategy = strategyRef.get();
final Set<Relationship> available = context.getAvailableRelationships();
final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger();
final boolean allDestinationsAvailable = (available.size() == numRelationships);
if (!allDestinationsAvailable && strategy.requiresAllDestinationsAvailable()) {
// can't transfer the FlowFiles. Roll back and yield
session.rollback();
context.yield();
return;
}
final Relationship relationship = strategy.mapToRelationship(context, flowFile);
if (relationship == null) {
// can't transfer the FlowFiles. Roll back and yield
session.rollback();
context.yield();
return;
}
// add an attribute capturing which relationship a flowfile was routed through
session.putAttribute(flowFile, RELATIONSHIP_ATTRIBUTE, relationship.getName());
session.transfer(flowFile, relationship);
session.getProvenanceReporter().route(flowFile, relationship);
}
private static class InvalidPropertyNameValidator implements Validator {
private final String propertyName;
public InvalidPropertyNameValidator(final String propertyName) {
this.propertyName = propertyName;
}
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext validationContext) {
return new ValidationResult.Builder().subject("Property Name").input(propertyName)
.explanation("Property Name must be a positive integer between 1 and the number of relationships (inclusive)")
.valid(false).build();
}
}
/**
* Implementations must be thread-safe.
*/
private static interface DistributionStrategy {
/**
* @param context context
* @param flowFile flowFile
* @return a mapping of FlowFile to Relationship or <code>null</code> if the needed relationships are not available to accept files
*/
Relationship mapToRelationship(ProcessContext context, FlowFile flowFile);
boolean requiresAllDestinationsAvailable();
}
private class LoadDistributionStrategy implements DistributionStrategy {
private final AtomicLong counter = new AtomicLong(0L);
@Override
public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) {
final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get();
final int numRelationships = relationshipList.size();
// create a HashSet that contains all of the available relationships, as calling #contains on HashSet
// is much faster than calling it on a List
boolean foundFreeRelationship = false;
Relationship relationship = null;
int attempts = 0;
while (!foundFreeRelationship) {
final long counterValue = counter.getAndIncrement();
final int idx = (int) (counterValue % numRelationships);
relationship = relationshipList.get(idx);
foundFreeRelationship = context.getAvailableRelationships().contains(relationship);
if (++attempts % numRelationships == 0 && !foundFreeRelationship) {
return null;
}
}
return relationship;
}
@Override
public boolean requiresAllDestinationsAvailable() {
return false;
}
}
private class RoundRobinStrategy implements DistributionStrategy {
private final AtomicLong counter = new AtomicLong(0L);
@Override
public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) {
final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get();
final long counterValue = counter.getAndIncrement();
final int idx = (int) (counterValue % relationshipList.size());
final Relationship relationship = relationshipList.get(idx);
return relationship;
}
@Override
public boolean requiresAllDestinationsAvailable() {
return true;
}
}
private class NextAvailableStrategy implements DistributionStrategy {
private final AtomicLong counter = new AtomicLong(0L);
@Override
public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) {
final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get();
final int numRelationships = relationshipList.size();
// create a HashSet that contains all of the available relationships, as calling #contains on HashSet
// is much faster than calling it on a List
boolean foundFreeRelationship = false;
Relationship relationship = null;
int attempts = 0;
while (!foundFreeRelationship) {
final long counterValue = counter.getAndIncrement();
final int idx = (int) (counterValue % numRelationships);
relationship = relationshipList.get(idx);
foundFreeRelationship = context.getAvailableRelationships().contains(relationship);
if (++attempts % numRelationships == 0 && !foundFreeRelationship) {
return null;
}
}
return relationship;
}
@Override
public boolean requiresAllDestinationsAvailable() {
return false;
}
}
}