blob: 6e9da2e338e9294c0beecaa8f95784404a9dca2b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.pattern;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import java.util.List;
import java.util.Objects;
/**
* Extended Put pattern capable of handling FlowFile groups.
* @param <FC> Function context class.
* @param <C> Connection class.
* @param <FFG> FlowFileGroup class.
*/
public class PutGroup<FC, C extends AutoCloseable, FFG extends PartialFunctions.FlowFileGroup> extends Put<FC, C> {
public PutGroup() {
// Just to make a composition valid.
this.putFlowFile = (context, session, functionContext, connection, inputFlowFile, result) -> {
throw new UnsupportedOperationException();
};
}
@FunctionalInterface
public interface PutFlowFiles<FC, C, FFG> {
void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection,
FFG inputFlowFileGroup, RoutingResult result) throws ProcessException;
}
@Override
protected void validateCompositePattern() {
super.validateCompositePattern();
Objects.requireNonNull(groupFlowFiles, "GroupFlowFiles function is required.");
}
/**
* PutGroup does not support PutFileFile function for single FlowFile.
* Throws UnsupportedOperationException if called.
*/
@Override
public void putFlowFile(PutFlowFile<FC, C> putFlowFile) {
throw new UnsupportedOperationException("PutFlowFile can not be used with PutGroup pattern. Specify PutFlowFiles instead.");
}
@FunctionalInterface
public interface GroupFlowFiles<FC, C, FFG> {
List<FFG> apply(ProcessContext context, ProcessSession session, FC functionContext, C connection, List<FlowFile> flowFiles, RoutingResult result) throws ProcessException;
}
private GroupFlowFiles<FC, C, FFG> groupFlowFiles;
private PutFlowFiles<FC, C, FFG> putFlowFiles;
/**
* Specify a function that groups input FlowFiles into FlowFile groups.
*/
public void groupFetchedFlowFiles(GroupFlowFiles<FC, C, FFG> f) {
groupFlowFiles = f;
}
/**
* Specify a function that puts an input FlowFile group to a target storage using a given connection.
*/
public void putFlowFiles(PutFlowFiles<FC, C, FFG> f) {
putFlowFiles = f;
}
@Override
protected void putFlowFiles(ProcessContext context, ProcessSession session, FC functionContext,
C connection, List<FlowFile> flowFiles, RoutingResult result) throws ProcessException {
final List<FFG> flowFileGroups = groupFlowFiles
.apply(context, session, functionContext, connection, flowFiles, result);
for (FFG group : flowFileGroups) {
putFlowFiles.apply(context, session, functionContext, connection, group, result);
}
}
}