/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;

import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

@EventDriven
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@SideEffectFree
@Tags({"split", "generic", "schema", "json", "csv", "avro", "log", "logs", "freeform", "text"})
@WritesAttributes({
    @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer for the FlowFiles routed to the 'splits' Relationship."),
    @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile. This is added to FlowFiles that are routed to the 'splits' Relationship."),
    @WritesAttribute(attribute = "fragment.identifier", description = "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"),
    @WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"),
    @WritesAttribute(attribute = "fragment.count", description = "The number of split FlowFiles generated from the parent FlowFile"),
    @WritesAttribute(attribute = "segment.original.filename", description = "The filename of the parent FlowFile")
})
@CapabilityDescription("Splits up an input FlowFile that is in a record-oriented data format into multiple smaller FlowFiles")
public class SplitRecord extends AbstractProcessor {

    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
    public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
    public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
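
    // The record reader and writer controller services define the incoming and outgoing record formats,
    // and Records Per Split caps how many records are written to each outgoing FlowFile.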
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
        .name("Record Reader")
        .description("Specifies the Controller Service to use for reading incoming data")
        .identifiesControllerService(RecordReaderFactory.class)
        .required(true)
        .build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
        .name("Record Writer")
        .description("Specifies the Controller Service to use for writing out the records")
        .identifiesControllerService(RecordSetWriterFactory.class)
        .required(true)
        .build();
    static final PropertyDescriptor RECORDS_PER_SPLIT = new PropertyDescriptor.Builder()
        .name("Records Per Split")
        .description("Specifies how many records should be written to each 'split' or 'segment' FlowFile")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
        .required(true)
        .build();

    static final Relationship REL_SPLITS = new Relationship.Builder()
        .name("splits")
        .description("The individual 'segments' of the original FlowFile will be routed to this relationship.")
        .build();
    static final Relationship REL_ORIGINAL = new Relationship.Builder()
        .name("original")
        .description("Upon successfully splitting an input FlowFile, the original FlowFile will be sent to this relationship.")
        .build();
    static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("If a FlowFile cannot be transformed from the configured input format to the configured output format, "
            + "the unchanged FlowFile will be routed to this relationship.")
        .build();

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(RECORD_READER);
        properties.add(RECORD_WRITER);
        properties.add(RECORDS_PER_SPLIT);
        return properties;
    }

    @Override
    public Set<Relationship> getRelationships() {
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SPLITS);
        relationships.add(REL_ORIGINAL);
        relationships.add(REL_FAILURE);
        return relationships;
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final FlowFile original = session.get();
        if (original == null) {
            return;
        }
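
        // Resolve the configured reader/writer services and the per-split record limit. Records Per Split
        // supports Expression Language against the incoming FlowFile, so the evaluated value is clamped to
        // a minimum of 1 to guard against zero or negative results.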
        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        final int maxRecords = Math.max(1, context.getProperty(RECORDS_PER_SPLIT).evaluateAttributeExpressions(original).asInteger());

        final List<FlowFile> splits = new ArrayList<>();
        final Map<String, String> originalAttributes = original.getAttributes();
        // Every split produced from this parent shares the same randomly generated fragment.identifier.
        final String fragmentId = UUID.randomUUID().toString();
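        // Stream the parent's content through the record reader, writing at most maxRecords records
        // into each newly created split FlowFile.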
        try {
            session.read(original, new InputStreamCallback() {
                @Override
                public void process(final InputStream in) throws IOException {
                    try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, original.getSize(), getLogger())) {
                        final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema());

                        final RecordSet recordSet = reader.createRecordSet();
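                        // Wrapping the record set allows a look-ahead, so a split FlowFile is only created
                        // when at least one more record is actually available.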
                        final PushBackRecordSet pushbackSet = new PushBackRecordSet(recordSet);

                        int fragmentIndex = 0;
                        while (pushbackSet.isAnotherRecord()) {
                            FlowFile split = session.create(original);

                            try {
                                final Map<String, String> attributes = new HashMap<>();
                                final WriteResult writeResult;

                                try (final OutputStream out = session.write(split);
                                    final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, split)) {
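
                                    // A single-record split is written directly; otherwise the record set is
                                    // limited so the writer consumes at most maxRecords records for this split.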
                                    if (maxRecords == 1) {
                                        final Record record = pushbackSet.next();
                                        writeResult = writer.write(record);
                                    } else {
                                        final RecordSet limitedSet = pushbackSet.limit(maxRecords);
                                        writeResult = writer.write(limitedSet);
                                    }
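
                                    // Capture the split's metadata: its record count, the writer's MIME type,
                                    // and the standard fragment attributes that tie it back to the parent.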
                                    attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                                    attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                                    attributes.put(FRAGMENT_INDEX, String.valueOf(fragmentIndex));
                                    attributes.put(FRAGMENT_ID, fragmentId);
                                    attributes.put(SEGMENT_ORIGINAL_FILENAME, original.getAttribute(CoreAttributes.FILENAME.key()));
                                    attributes.putAll(writeResult.getAttributes());

                                    session.adjustCounter("Records Split", writeResult.getRecordCount(), false);
                                }

                                split = session.putAllAttributes(split, attributes);
                            } finally {
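                                // Track every split that was created, even if writing failed, so it can be
                                // removed from the session if the parent is later routed to failure.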
                                splits.add(split);
                            }

                            fragmentIndex++;
                        }
                    } catch (final SchemaNotFoundException | MalformedRecordException e) {
                        throw new ProcessException("Failed to parse incoming data", e);
                    }
                }
            });
        } catch (final ProcessException pe) {
            // Any failure while reading or writing records discards the partial splits and routes the
            // original FlowFile, unchanged, to the failure relationship.
            getLogger().error("Failed to split {}", new Object[] {original, pe});
            session.remove(splits);
            session.transfer(original, REL_FAILURE);
            return;
        }
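
        // Enrich the parent FlowFile with the standard fragment attributes (identifier and count)
        // before routing it to 'original'.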
        final FlowFile originalFlowFile = FragmentAttributes.copyAttributesToOriginal(session, original, fragmentId, splits.size());
        session.transfer(originalFlowFile, REL_ORIGINAL);

        // Add the fragment count to each split. ProcessSession.putAttribute returns an updated FlowFile,
        // so the returned reference must be kept and transferred in place of the now-stale one.
        final List<FlowFile> updatedSplits = new ArrayList<>(splits.size());
        for (FlowFile split : splits) {
            updatedSplits.add(session.putAttribute(split, FRAGMENT_COUNT, String.valueOf(splits.size())));
        }
        session.transfer(updatedSplits, REL_SPLITS);

        getLogger().info("Successfully split {} into {} FlowFiles, each containing up to {} records", new Object[] {original, splits.size(), maxRecords});
    }
}