/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.bin;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.FlowFileSessionWrapper;
import org.apache.nifi.processor.util.StandardValidators;
/**
* Base class for file-binning processors.
*
*/
public abstract class BinFiles extends AbstractSessionFactoryProcessor {
public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
.name("Minimum Group Size")
.description("The minimum size of for the bundle")
.required(true)
.defaultValue("0 B")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
.name("Maximum Group Size")
.description("The maximum size for the bundle. If not specified, there is no maximum.")
.required(false)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor MIN_ENTRIES = new PropertyDescriptor.Builder()
.name("Minimum Number of Entries")
.description("The minimum number of files to include in a bundle")
.required(true)
.defaultValue("1")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_ENTRIES = new PropertyDescriptor.Builder()
.name("Maximum Number of Entries")
.description("The maximum number of files to include in a bundle. If not specified, there is no maximum.")
.required(false)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder()
.name("Maximum number of Bins")
.description("Specifies the maximum number of bins that can be held in memory at any one time")
.defaultValue("100")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder()
.name("Max Bin Age")
.description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> "
+ "where <duration> is a positive integer and time unit is one of seconds, minutes, hours")
.required(false)
.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
.build();
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("The FlowFiles that were used to create the bundle")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure")
.build();
private final BinManager binManager = new BinManager();
private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
@OnStopped
public final void resetState() {
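// Purge any partially filled bins and roll back the sessions of bins that became ready but were never processed.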
binManager.purge();
Bin bin;
while ((bin = readyBins.poll()) != null) {
for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
wrapper.getSession().rollback();
}
}
}
/**
* Allows general pre-processing of a flow file before it is offered to a bin. This is called before getGroupId().
*
* @param context context
* @param session session
* @param flowFile flowFile
* @return The flow file, possibly altered
*/
protected abstract FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile);
/**
* Returns a group ID representing a bin. This allows flow files to be binned into like groups.
*
* @param context context
* @param flowFile flowFile
* @return The appropriate group ID
*/
protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile);
/**
* Performs any additional setup of the bin manager. Called during the OnScheduled phase.
*
* @param binManager The bin manager
* @param context context
*/
protected abstract void setUpBinManager(BinManager binManager, ProcessContext context);
/**
* Processes a single bin. The implementing class is responsible for committing each session.
*
* @param unmodifiableBin A reference to a single bin of flow file/session wrappers
* @param binContents A copy of the contents of the bin
* @param context The context
* @param session A new session created for building the bundle; the framework commits it after this method returns normally
* @return Return true if the input bin was already committed. E.g., in case of a failure, the implementation may choose to transfer all binned files to Failure and commit their sessions. If
* false, the processBins() method will transfer the files to Original and commit the sessions
*
* @throws ProcessException if any problem arises while processing a bin of FlowFiles. All flow files in the bin will be transferred to failure and the ProcessSession provided by the 'session'
* argument rolled back
*/
protected abstract boolean processBin(Bin unmodifiableBin, List<FlowFileSessionWrapper> binContents, ProcessContext context, ProcessSession session) throws ProcessException;
/**
* Allows additional custom validation to be done. This will be called from the parent's customValidate() method.
*
* @param context The context
* @return Validation results indicating problems
*/
protected Collection<ValidationResult> additionalCustomValidation(final ValidationContext context) {
return new ArrayList<>();
}
@Override
public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
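// Only pull new FlowFiles into bins while the total bin count is below the configured maximum;
// otherwise wait for existing bins to be processed before creating more.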
final int totalBinCount = binManager.getBinCount() + readyBins.size();
final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
final int flowFilesBinned;
if (totalBinCount < maxBinCount) {
flowFilesBinned = binFlowFiles(context, sessionFactory);
getLogger().debug("Binned {} FlowFiles", new Object[] {flowFilesBinned});
} else {
flowFilesBinned = 0;
getLogger().debug("Will not bin any FlowFiles because {} bins already exist;"
+ "will wait until bins have been emptied before any more are created", new Object[] {totalBinCount});
}
if (!isScheduled()) {
return;
}
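// Move any bins that the BinManager reports as ready (full or expired) onto the ready queue,
// then process at most one ready bin during this trigger.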
final int binsMigrated = migrateBins(context);
final int binsProcessed = processBins(context, sessionFactory);
//If we accomplished nothing then let's yield
if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) {
context.yield();
}
}
private int migrateBins(final ProcessContext context) {
int added = 0;
for (final Bin bin : binManager.removeReadyBins(true)) {
this.readyBins.add(bin);
added++;
}
// if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
// this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
// bins. So we may as well expire it now.
if (added == 0 && (readyBins.size() + binManager.getBinCount()) >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
final Bin bin = binManager.removeOldestBin();
if (bin != null) {
added++;
this.readyBins.add(bin);
}
}
return added;
}
private int processBins(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
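// Handle at most one ready bin per invocation; returns the number of bins processed (0 or 1).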
final Bin bin = readyBins.poll();
if (bin == null) {
return 0;
}
final List<Bin> bins = new ArrayList<>();
bins.add(bin);
final ProcessorLog logger = getLogger();
final ProcessSession session = sessionFactory.createSession();
final List<FlowFileSessionWrapper> binCopy = new ArrayList<>(bin.getContents());
boolean binAlreadyCommitted = false;
try {
binAlreadyCommitted = this.processBin(bin, binCopy, context, session);
} catch (final ProcessException e) {
logger.error("Failed to process bundle of {} files due to {}", new Object[]{binCopy.size(), e});
for (final FlowFileSessionWrapper wrapper : binCopy) {
wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
wrapper.getSession().commit();
}
session.rollback();
return 1;
} catch (final Exception e) {
logger.error("Failed to process bundle of {} files due to {}; rolling back sessions", new Object[] {binCopy.size(), e});
for (final FlowFileSessionWrapper wrapper : binCopy) {
wrapper.getSession().rollback();
}
session.rollback();
return 1;
}
// we first commit the bundle's session before the originals' sessions because if we are restarted or crash
// between commits, we favor data redundancy over data loss. Since we have no Distributed Transaction capability
// across multiple sessions, we cannot guarantee atomicity across the sessions
session.commit();
// If the subclass already committed the originals' sessions (e.g., after a handled failure), there is
// nothing left to do; otherwise transfer the originals and commit each of their sessions.
if (!binAlreadyCommitted) {
for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
wrapper.getSession().transfer(wrapper.getFlowFile(), REL_ORIGINAL);
wrapper.getSession().commit();
}
}
return 1;
}
private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
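// Pull FlowFiles one at a time, each in its own session, and offer them to the BinManager
// until the bin limit is reached, the processor is unscheduled, or the incoming queue is empty.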
int flowFilesBinned = 0;
while (binManager.getBinCount() <= context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
if (!isScheduled()) {
break;
}
final ProcessSession session = sessionFactory.createSession();
FlowFile flowFile = session.get();
if (flowFile == null) {
break;
}
flowFile = this.preprocessFlowFile(context, session, flowFile);
String groupId = this.getGroupId(context, flowFile);
final boolean binned = binManager.offer(groupId, flowFile, session);
// could not be added to a bin -- probably too large by itself, so create a separate bin for just this FlowFile.
if (!binned) {
Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
bin.offer(flowFile, session);
this.readyBins.add(bin);
}
flowFilesBinned++;
}
return flowFilesBinned;
}
@OnScheduled
public final void onScheduled(final ProcessContext context) throws IOException {
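// Configure the BinManager from the processor properties; optional properties that are not set
// default to effectively unlimited values.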
binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
if (context.getProperty(MAX_BIN_AGE).isSet()) {
binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue());
} else {
binManager.setMaxBinAge(Integer.MAX_VALUE);
}
if (context.getProperty(MAX_SIZE).isSet()) {
binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
} else {
binManager.setMaximumSize(Long.MAX_VALUE);
}
binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger());
if (context.getProperty(MAX_ENTRIES).isSet()) {
binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue());
} else {
binManager.setMaximumEntries(Integer.MAX_VALUE);
}
this.setUpBinManager(binManager, context);
}
@Override
protected final Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
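// Verify that the configured minimum size/entries do not exceed the configured maximums.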
final long minBytes = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
final Double maxBytes = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
if (maxBytes != null && maxBytes.longValue() < minBytes) {
problems.add(
new ValidationResult.Builder()
.subject(MIN_SIZE.getName())
.input(context.getProperty(MIN_SIZE).getValue())
.valid(false)
.explanation("Min Size must be less than or equal to Max Size")
.build()
);
}
final Long min = context.getProperty(MIN_ENTRIES).asLong();
final Long max = context.getProperty(MAX_ENTRIES).asLong();
if (min != null && max != null) {
if (min > max) {
problems.add(
new ValidationResult.Builder().subject(MIN_ENTRIES.getName())
.input(context.getProperty(MIN_ENTRIES).getValue())
.valid(false)
.explanation("Min Entries must be less than or equal to Max Entries")
.build()
);
}
}
Collection<ValidationResult> otherProblems = this.additionalCustomValidation(context);
if (otherProblems != null) {
problems.addAll(otherProblems);
}
return problems;
}
}