blob: 90b4c41986aad73fca012e44dd6c415286a968b1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.util.Collection;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.detect.Detector;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaMetadataKeys;
import org.apache.tika.mime.MediaType;
import org.apache.tika.mime.MimeType;
import org.apache.tika.mime.MimeTypes;
import org.apache.tika.mime.MimeTypesFactory;
import org.apache.tika.mime.MimeTypeException;
/**
* <p>
* Attempts to detect the MIME Type of a FlowFile by examining its contents. If the MIME Type is determined, it is added
* to an attribute with the name mime.type. In addition, mime.extension is set if a common file extension is known.
* </p>
*
* <p>
* MIME Type detection is performed by Apache Tika; more information about detection is available at http://tika.apache.org.
*
* <ul>
* <li>application/flowfile-v3</li>
* <li>application/flowfile-v1</li>
* </ul>
* </p>
*/
@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"compression", "gzip", "bzip2", "zip", "MIME", "mime.type", "file", "identify"})
@CapabilityDescription("Attempts to identify the MIME Type used for a FlowFile. If the MIME Type can be identified, "
+ "an attribute with the name 'mime.type' is added with the value being the MIME Type. If the MIME Type cannot be determined, "
+ "the value will be set to 'application/octet-stream'. In addition, the attribute mime.extension will be set if a common file "
+ "extension for the MIME Type is known. If both Config File and Config Body are not set, the default NiFi MIME Types will "
+ "be used.")
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "This Processor sets the FlowFile's mime.type attribute to the detected MIME Type. "
+ "If unable to detect the MIME Type, the attribute's value will be set to application/octet-stream"),
@WritesAttribute(attribute = "mime.extension", description = "This Processor sets the FlowFile's mime.extension attribute to the file "
+ "extension associated with the detected MIME Type. "
+ "If there is no correlated extension, the attribute's value will be empty")
}
)
public class IdentifyMimeType extends AbstractProcessor {
public static final PropertyDescriptor USE_FILENAME_IN_DETECTION = new PropertyDescriptor.Builder()
.displayName("Use Filename In Detection")
.name("use-filename-in-detection")
.description("If true will pass the filename to Tika to aid in detection.")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final PropertyDescriptor MIME_CONFIG_FILE = new PropertyDescriptor.Builder()
.displayName("Config File")
.name("config-file")
.required(false)
.description("Path to MIME type config file. Only one of Config File or Config Body may be used.")
.addValidator(new StandardValidators.FileExistsValidator(true))
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MIME_CONFIG_BODY = new PropertyDescriptor.Builder()
.displayName("Config Body")
.name("config-body")
.required(false)
.description("Body of MIME type config file. Only one of Config File or Config Body may be used.")
.addValidator(Validator.VALID)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles are routed to success")
.build();
private Set<Relationship> relationships;
private List<PropertyDescriptor> properties;
private final TikaConfig config;
private Detector detector;
private MimeTypes mimeTypes;
public IdentifyMimeType() {
this.config = TikaConfig.getDefaultConfig();
}
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(USE_FILENAME_IN_DETECTION);
properties.add(MIME_CONFIG_BODY);
properties.add(MIME_CONFIG_FILE);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(rels);
}
@OnScheduled
public void setup(final ProcessContext context) {
String configBody = context.getProperty(MIME_CONFIG_BODY).getValue();
String configFile = context.getProperty(MIME_CONFIG_FILE).evaluateAttributeExpressions().getValue();
if (configBody == null && configFile == null){
this.detector = config.getDetector();
this.mimeTypes = config.getMimeRepository();
} else if (configBody != null) {
try {
this.detector = MimeTypesFactory.create(new ByteArrayInputStream(configBody.getBytes()));
this.mimeTypes = (MimeTypes)this.detector;
} catch (Exception e) {
context.yield();
throw new ProcessException("Failed to load config body", e);
}
} else {
try (final FileInputStream fis = new FileInputStream(configFile);
final InputStream bis = new BufferedInputStream(fis)) {
this.detector = MimeTypesFactory.create(bis);
this.mimeTypes = (MimeTypes)this.detector;
} catch (Exception e) {
context.yield();
throw new ProcessException("Failed to load config file", e);
}
}
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ComponentLog logger = getLogger();
final AtomicReference<String> mimeTypeRef = new AtomicReference<>(null);
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream stream) throws IOException {
try (final InputStream in = new BufferedInputStream(stream);
final TikaInputStream tikaStream = TikaInputStream.get(in)) {
Metadata metadata = new Metadata();
if (filename != null && context.getProperty(USE_FILENAME_IN_DETECTION).asBoolean()) {
metadata.add(TikaMetadataKeys.RESOURCE_NAME_KEY, filename);
}
// Get mime type
MediaType mediatype = detector.detect(tikaStream, metadata);
mimeTypeRef.set(mediatype.toString());
}
}
});
String mimeType = mimeTypeRef.get();
String extension = "";
try {
MimeType mimetype;
mimetype = mimeTypes.forName(mimeType);
extension = mimetype.getExtension();
} catch (MimeTypeException ex) {
logger.warn("MIME type extension lookup failed: {}", new Object[]{ex});
}
// Workaround for bug in Tika - https://issues.apache.org/jira/browse/TIKA-1563
if (mimeType != null && mimeType.equals("application/gzip") && extension.equals(".tgz")) {
extension = ".gz";
}
if (mimeType == null) {
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/octet-stream");
flowFile = session.putAttribute(flowFile, "mime.extension", "");
logger.info("Unable to identify MIME Type for {}; setting to application/octet-stream", new Object[]{flowFile});
} else {
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), mimeType);
flowFile = session.putAttribute(flowFile, "mime.extension", extension);
logger.info("Identified {} as having MIME Type {}", new Object[]{flowFile, mimeType});
}
session.getProvenanceReporter().modifyAttributes(flowFile);
session.transfer(flowFile, REL_SUCCESS);
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
Set<ValidationResult> results = new HashSet<>();
String body = validationContext.getProperty(MIME_CONFIG_BODY).getValue();
String file = validationContext.getProperty(MIME_CONFIG_FILE).getValue();
if(body != null && file != null) {
results.add(new ValidationResult.Builder()
.input(MIME_CONFIG_FILE.getName())
.subject(file)
.valid(false)
.explanation("Can only specify Config Body or Config File. Not both.")
.build());
}
return results;
}
}