/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.standard;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
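
/**
 * Segments the content of an incoming FlowFile into fixed-size pieces on byte
 * boundaries, without inspecting or modifying the bytes themselves. Because the
 * boundaries are purely byte-based, a segment may end in the middle of a record;
 * MergeContent (using the Defragment strategy) can use the fragment.* attributes
 * written to each segment to reassemble the original content.
 */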
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"segment", "split"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Segments a FlowFile into multiple smaller segments on byte boundaries. Each segment is given the following attributes: "
        + "fragment.identifier, fragment.index, fragment.count, segment.original.filename; these attributes can then be used by the "
        + "MergeContent processor in order to reconstitute the original FlowFile")
@WritesAttributes({
    @WritesAttribute(attribute = "segment.identifier",
            description = "All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this "
                    + "attribute. This attribute is kept for backward compatibility, but fragment.identifier is preferred, as "
                    + "it is designed to work in conjunction with the MergeContent Processor"),
    @WritesAttribute(attribute = "segment.index",
            description = "A one-up number that indicates the ordering of the segments that were created from a single parent FlowFile. "
                    + "This attribute is kept for backward compatibility, but fragment.index is preferred, as it is designed "
                    + "to work in conjunction with the MergeContent Processor"),
    @WritesAttribute(attribute = "segment.count",
            description = "The number of segments generated from the parent FlowFile. This attribute is kept for backward compatibility, "
                    + "but fragment.count is preferred, as it is designed to work in conjunction with the MergeContent Processor"),
    @WritesAttribute(attribute = "fragment.identifier",
            description = "All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"),
    @WritesAttribute(attribute = "fragment.index",
            description = "A one-up number that indicates the ordering of the segments that were created from a single parent FlowFile"),
    @WritesAttribute(attribute = "fragment.count", description = "The number of segments generated from the parent FlowFile"),
    @WritesAttribute(attribute = "segment.original.filename", description = "The filename of the parent FlowFile")})
@SeeAlso(MergeContent.class)
public class SegmentContent extends AbstractProcessor {
    // Legacy segment.* attribute keys, retained for backward compatibility with older flows
    public static final String SEGMENT_ID = "segment.identifier";
    public static final String SEGMENT_INDEX = "segment.index";
    public static final String SEGMENT_COUNT = "segment.count";

    // Keys resolved from the shared FragmentAttributes enum, common to the splitting processors
    public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
    public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
    public static final PropertyDescriptor SIZE = new PropertyDescriptor.Builder()
            .name("Segment Size")
            .description("The maximum data size for each segment; may be specified with units, e.g. '64 KB' or '1 MB'")
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .required(true)
            .build();
    public static final Relationship REL_SEGMENTS = new Relationship.Builder()
            .name("segments")
            .description("All segments will be sent to this relationship. If the file was small enough that it was not segmented, "
                    + "a copy of the original is sent to this relationship, while the original itself is sent to 'original'")
            .build();
    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
            .name("original")
            .description("The original FlowFile will be sent to this relationship")
            .build();

    private Set<Relationship> relationships;
    private List<PropertyDescriptor> propertyDescriptors;
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SEGMENTS);
        relationships.add(REL_ORIGINAL);
        this.relationships = Collections.unmodifiableSet(relationships);

        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(SIZE);
        this.propertyDescriptors = Collections.unmodifiableList(descriptors);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }
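
    /**
     * For each incoming FlowFile: if the content fits within the configured segment
     * size, a single attributed clone is emitted; otherwise the content is cloned
     * range-by-range into ceil(size / segmentSize) segments.
     */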
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final String segmentId = UUID.randomUUID().toString();
        final long segmentSize = context.getProperty(SIZE).asDataSize(DataUnit.B).longValue();
        final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
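
        // Fast path: the content already fits in a single segment, so no splitting is
        // needed. A clone carrying the segment/fragment attributes goes to 'segments'
        // and the identically attributed original goes to 'original'.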
        if (flowFile.getSize() <= segmentSize) {
            flowFile = session.putAttribute(flowFile, SEGMENT_ID, segmentId);
            flowFile = session.putAttribute(flowFile, SEGMENT_INDEX, "1");
            flowFile = session.putAttribute(flowFile, SEGMENT_COUNT, "1");
            flowFile = session.putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName);
            flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId);
            flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX, "1");
            flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT, "1");

            final FlowFile clone = session.clone(flowFile);
            session.transfer(flowFile, REL_ORIGINAL);
            session.transfer(clone, REL_SEGMENTS);
            return;
        }
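
        // Integer ceiling of size / segmentSize: round up whenever the content size
        // is not an exact multiple of the segment size.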
        int totalSegments = (int) (flowFile.getSize() / segmentSize);
        if (totalSegments * segmentSize < flowFile.getSize()) {
            totalSegments++;
        }
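
        // Attributes shared by every segment; the per-segment index values are
        // overwritten on each loop iteration below.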
        final Map<String, String> segmentAttributes = new HashMap<>();
        segmentAttributes.put(SEGMENT_ID, segmentId);
        segmentAttributes.put(SEGMENT_COUNT, String.valueOf(totalSegments));
        segmentAttributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName);
        segmentAttributes.put(FRAGMENT_ID, segmentId);
        segmentAttributes.put(FRAGMENT_COUNT, String.valueOf(totalSegments));

        final Set<FlowFile> segmentSet = new HashSet<>();
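
        // Clone one byte range of the parent per segment; the final segment is
        // simply whatever remains, so it may be shorter than the configured size.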
        for (int i = 1; i <= totalSegments; i++) {
            final long segmentOffset = segmentSize * (i - 1);
            FlowFile segment = session.clone(flowFile, segmentOffset, Math.min(segmentSize, flowFile.getSize() - segmentOffset));
            segmentAttributes.put(SEGMENT_INDEX, String.valueOf(i));
            segmentAttributes.put(FRAGMENT_INDEX, String.valueOf(i));
            segment = session.putAllAttributes(segment, segmentAttributes);
            segmentSet.add(segment);
        }

        session.transfer(segmentSet, REL_SEGMENTS);
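
        // Tag the original FlowFile with the fragment identifier and count so that
        // downstream processors can correlate it with its segments.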
        flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, segmentId, totalSegments);
        session.transfer(flowFile, REL_ORIGINAL);

        if (totalSegments <= 10) {
            getLogger().info("Segmented {} into {} segments: {}", new Object[]{flowFile, totalSegments, segmentSet});
        } else {
            getLogger().info("Segmented {} into {} segments", new Object[]{flowFile, totalSegments});
        }
    }
}