blob: 9e2a6d4ef57823cb8c4020ac79348c76d8cb1e70 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.nifi.processors.mongodb.gridfs;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.model.GridFSUploadOptions;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.mongodb.MongoDBClientService;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import org.bson.Document;
import org.bson.types.ObjectId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Processor that writes the content of each incoming FlowFile to a GridFS bucket, optionally
 * rejecting files that violate a configured uniqueness policy (filename, hash, or both).
 *
 * <p>NOTE(review): {@code InputRequirement} is imported but no {@code @InputRequirement}
 * annotation is visible here; onTrigger works on {@code session.get()} and checks for a null
 * FlowFile, so INPUT_REQUIRED looks intended — confirm.
 */
@Tags({"mongo", "gridfs", "put", "file", "store"})
@CapabilityDescription("Writes a file to a GridFS bucket.")
public class PutGridFS extends AbstractGridFSProcessor {
// Prefix selecting the FlowFile attributes that getMetadata copies into the metadata document
// of the stored GridFS file (evaluated per FlowFile with Expression Language).
// NOTE(review): the rest of this builder chain (name, validator, build()) is not visible here.
static final PropertyDescriptor PROPERTIES_PREFIX = new PropertyDescriptor.Builder()
.displayName("File Properties Prefix")
.description("Attributes that have this prefix will be added to the file stored in GridFS as metadata.")
// Allowed values for ENFORCE_UNIQUENESS. canUploadFile compares the configured value against
// NO_UNIQUE, UNIQUE_BOTH and UNIQUE_HASH; any other value (including UNIQUE_NAME) results in a
// filename-only duplicate check.
static final AllowableValue NO_UNIQUE = new AllowableValue("none", "None", "No uniqueness will be enforced.");
static final AllowableValue UNIQUE_NAME = new AllowableValue("name", "Name", "Only the filename must " +
"be unique.");
// NOTE(review): the remainder of this description literal is not visible here.
static final AllowableValue UNIQUE_HASH = new AllowableValue("hash", "Hash", "Only the file hash must be " +
static final AllowableValue UNIQUE_BOTH = new AllowableValue("both", "Both", "Both the filename and hash " +
"must be unique.");
// Uniqueness policy for the target bucket; onScheduled caches its value and canUploadFile applies
// it before every upload.
// NOTE(review): the description says an index is created, but the only enforcement visible in
// this class is the countDocuments pre-check in canUploadFile — confirm where the index is built.
static final PropertyDescriptor ENFORCE_UNIQUENESS = new PropertyDescriptor.Builder()
.displayName("Enforce Uniqueness")
.description("When enabled, this option will ensure that uniqueness is enforced on the bucket. It will do so by creating a MongoDB index " +
"that matches your selection. It should ideally be configured once when the bucket is created for the first time because " +
"it could take a long time to build on an existing bucket with a lot of data.")
// Name of the FlowFile attribute holding the file hash. canUploadFile requires it to be present
// and non-empty when the uniqueness mode involves the hash, and matches it against the "md5"
// field of files already stored in the bucket.
static final PropertyDescriptor HASH_ATTRIBUTE = new PropertyDescriptor.Builder()
.displayName("Hash Attribute")
.description("If uniqueness enforcement is enabled and the file hash is part of the constraint, this must be set to an attribute that " +
"exists on all incoming flowfiles.")
// Maximum size of each GridFS chunk; onTrigger evaluates it per FlowFile and converts it to a
// byte count via DataUnit.B.
static final PropertyDescriptor CHUNK_SIZE = new PropertyDescriptor.Builder()
.displayName("Chunk Size")
.description("Controls the maximum size of each chunk of a file uploaded into GridFS.")
.defaultValue("256 KB")
// Name under which the content is stored; onTrigger evaluates it per FlowFile and passes it to
// GridFSBucket.uploadFromStream.
static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder()
.displayName("File Name")
.description("The name of the file in the bucket that is the target of this processor. GridFS file names do not " +
"include path information because GridFS does not sort files into folders within a bucket.")
// Destination for FlowFiles rejected by canUploadFile (uniqueness policy violated).
static final Relationship REL_DUPLICATE = new Relationship.Builder()
.description("Flowfiles that fail the duplicate check are sent to this relationship.")
// FlowFile attribute that receives the ObjectId GridFS assigns to the uploaded file. It must be a
// non-empty name: an empty attribute key cannot be referenced by Expression Language or by
// downstream processors, which made the stored id effectively unreachable.
static final String ID_ATTRIBUTE = "gridfs.id";
// Unmodifiable views of the supported properties and relationships, assigned in the static
// initializer.
static final List<PropertyDescriptor> DESCRIPTORS;
static final Set<Relationship> RELATIONSHIP_SET;
// Static initializer that wraps the supported properties and relationships in unmodifiable
// collections.
// NOTE(review): the add(...) calls that populate _temp and _rels are not visible here.
// NOTE(review): _temp and _rels are raw types; List<PropertyDescriptor> / Set<Relationship>
// would let the compiler check what is added.
static {
List _temp = new ArrayList<>();
DESCRIPTORS = Collections.unmodifiableList(_temp);
Set _rels = new HashSet();
RELATIONSHIP_SET = Collections.unmodifiableSet(_rels);
private String uniqueness;
private String hashAttribute;
public void onScheduled(ProcessContext context) {
this.uniqueness = context.getProperty(ENFORCE_UNIQUENESS).getValue();
this.hashAttribute = context.getProperty(HASH_ATTRIBUTE).evaluateAttributeExpressions().getValue();
this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
// Returns the relationships this processor exposes.
// NOTE(review): the method body (and any @Override) is not visible here; presumably it returns
// RELATIONSHIP_SET.
public Set<Relationship> getRelationships() {
// Returns the property descriptors this processor supports.
// NOTE(review): the method body (and any @Override) is not visible here; presumably it returns
// DESCRIPTORS.
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
/**
 * Uploads the content of one FlowFile to the GridFS bucket resolved for it.
 *
 * <p>Visible flow:
 * <ol>
 *   <li>Take a FlowFile from the session. The null-FlowFile branch body is not visible here;
 *       presumably it returns.</li>
 *   <li>Resolve the bucket and route the FlowFile to REL_DUPLICATE when canUploadFile rejects it.</li>
 *   <li>Otherwise stream the content into GridFS under the evaluated FILE_NAME, with metadata
 *       built by getMetadata.</li>
 *   <li>For a non-null ObjectId, write the id to ID_ATTRIBUTE, route the FlowFile to REL_SUCCESS
 *       and report a provenance send event with the URI from getTransitUri.</li>
 *   <li>A null id, or any exception thrown inside the try block, routes the FlowFile to
 *       REL_FAILURE.</li>
 * </ol>
 *
 * @param context process context supplying property values
 * @param session session used to read, update and route the FlowFile
 * @throws ProcessException per the processor contract
 */
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile input = session.get();
if (input == null) {
GridFSBucket bucket = getBucket(input, context);
// canUploadFile runs outside the try/catch below, so an exception it throws (e.g. a missing
// hash attribute) propagates out of onTrigger instead of routing the FlowFile to REL_FAILURE.
if (!canUploadFile(context, input, bucket.getBucketName())) {
getLogger().error("Cannot upload the file because of the uniqueness policy configured.");
session.transfer(input, REL_DUPLICATE);
// NOTE(review): no return is visible after routing to REL_DUPLICATE; confirm the method exits
// there so the same FlowFile is not transferred a second time.
// NOTE(review): chunkSize is computed here, but no visible line passes it to
// GridFSUploadOptions (e.g. chunkSizeBytes) — confirm the Chunk Size property is applied.
final int chunkSize = context.getProperty(CHUNK_SIZE).evaluateAttributeExpressions(input).asDataSize(DataUnit.B).intValue();
// NOTE(review): the resource expression of this try-with-resources is not visible here
// (presumably session.read(input)).
try (InputStream fileInput = {
String fileName = context.getProperty(FILE_NAME).evaluateAttributeExpressions(input).getValue();
GridFSUploadOptions options = new GridFSUploadOptions()
.metadata(getMetadata(input, context));
ObjectId id = bucket.uploadFromStream(fileName, fileInput, options);
if (id != null) {
input = session.putAttribute(input, ID_ATTRIBUTE, id.toString());
session.transfer(input, REL_SUCCESS);
session.getProvenanceReporter().send(input, getTransitUri(id, input, context));
} else {
getLogger().error("ID was null, assuming failure.");
session.transfer(input, REL_FAILURE);
} catch (Exception ex) {
getLogger().error("Failed to upload file", ex);
session.transfer(input, REL_FAILURE);
private boolean canUploadFile(ProcessContext context, FlowFile input, String bucketName) {
boolean retVal;
if (uniqueness.equals(NO_UNIQUE.getValue())) {
retVal = true;
} else {
final String fileName = input.getAttribute(CoreAttributes.FILENAME.key());
final String fileColl = String.format("%s.files", bucketName);
final String hash = input.getAttribute(hashAttribute);
if ((uniqueness.equals(UNIQUE_BOTH.getValue()) || uniqueness.equals(UNIQUE_HASH.getValue())) && StringUtils.isEmpty(hash)) {
throw new RuntimeException(String.format("Uniqueness mode %s was set and the hash attribute %s was not found.", uniqueness, hashAttribute));
Document query;
if (uniqueness.equals(UNIQUE_BOTH.getValue())) {
query = new Document().append("filename", fileName).append("md5", hash);
} else if (uniqueness.equals(UNIQUE_HASH.getValue())) {
query = new Document().append("md5", hash);
} else {
query = new Document().append("filename", fileName);
retVal = getDatabase(input, context).getCollection(fileColl).countDocuments(query) == 0;
return retVal;
private Document getMetadata(FlowFile input, ProcessContext context) {
final String prefix = context.getProperty(PROPERTIES_PREFIX).evaluateAttributeExpressions(input).getValue();
Document doc;
if (StringUtils.isEmpty(prefix)) {
doc = Document.parse("{}");
} else {
doc = new Document();
Map<String, String> attributes = input.getAttributes();
for (Map.Entry<String, String> entry : attributes.entrySet()) {
if (entry.getKey().startsWith(prefix)) {
String cleanPrefix = prefix.endsWith(".") ? prefix : String.format("%s.", prefix);
String cleanKey = entry.getKey().replace(cleanPrefix, "");
doc.append(cleanKey, entry.getValue());
return doc;