/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb.gridfs;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.model.GridFSUploadOptions;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.mongodb.MongoDBClientService;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import org.bson.Document;
import org.bson.types.ObjectId;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"mongo", "gridfs", "put", "file", "store"})
@CapabilityDescription("Writes a file to a GridFS bucket.")
public class PutGridFS extends AbstractGridFSProcessor {
static final PropertyDescriptor PROPERTIES_PREFIX = new PropertyDescriptor.Builder()
.name("putgridfs-properties-prefix")
.displayName("File Properties Prefix")
.description("Attributes that have this prefix will be added to the file stored in GridFS as metadata.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(Validator.VALID)
.build();
static final AllowableValue NO_UNIQUE = new AllowableValue("none", "None", "No uniqueness will be enforced.");
static final AllowableValue UNIQUE_NAME = new AllowableValue("name", "Name", "Only the filename must " +
"be unique.");
static final AllowableValue UNIQUE_HASH = new AllowableValue("hash", "Hash", "Only the file hash must be " +
"unique.");
static final AllowableValue UNIQUE_BOTH = new AllowableValue("both", "Both", "Both the filename and hash " +
"must be unique.");
static final PropertyDescriptor ENFORCE_UNIQUENESS = new PropertyDescriptor.Builder()
.name("putgridfs-enforce-uniqueness")
.displayName("Enforce Uniqueness")
.description("When enabled, this option will ensure that uniqueness is enforced on the bucket. It will do so by creating a MongoDB index " +
"that matches your selection. It should ideally be configured once when the bucket is created for the first time because " +
"it could take a long time to build on an existing bucket wit a lot of data.")
.allowableValues(NO_UNIQUE, UNIQUE_BOTH, UNIQUE_NAME, UNIQUE_HASH)
.defaultValue(NO_UNIQUE.getValue())
.required(true)
.build();
static final PropertyDescriptor HASH_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("putgridfs-hash-attribute")
.displayName("Hash Attribute")
.description("If uniquness enforcement is enabled and the file hash is part of the constraint, this must be set to an attribute that " +
"exists on all incoming flowfiles.")
.defaultValue("hash.value")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor CHUNK_SIZE = new PropertyDescriptor.Builder()
.name("putgridfs-chunk-size")
.displayName("Chunk Size")
.description("Controls the maximum size of each chunk of a file uploaded into GridFS.")
.defaultValue("256 KB")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder()
.name("gridfs-file-name")
.displayName("File Name")
.description("The name of the file in the bucket that is the target of this processor. GridFS file names do not " +
"include path information because GridFS does not sort files into folders within a bucket.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final Relationship REL_DUPLICATE = new Relationship.Builder()
.name("duplicate")
.description("Flowfiles that fail the duplicate check are sent to this relationship.")
.build();
static final String ID_ATTRIBUTE = "gridfs.id";
static final List<PropertyDescriptor> DESCRIPTORS;
static final Set<Relationship> RELATIONSHIP_SET;
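// Assemble the full descriptor list and relationship set from the shared parent definitions
// plus the properties and relationship specific to this processor.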
static {
List<PropertyDescriptor> _temp = new ArrayList<>();
_temp.addAll(PARENT_PROPERTIES);
_temp.add(FILE_NAME);
_temp.add(PROPERTIES_PREFIX);
_temp.add(ENFORCE_UNIQUENESS);
_temp.add(HASH_ATTRIBUTE);
_temp.add(CHUNK_SIZE);
DESCRIPTORS = Collections.unmodifiableList(_temp);
Set<Relationship> _rels = new HashSet<>();
_rels.addAll(PARENT_RELATIONSHIPS);
_rels.add(REL_DUPLICATE);
RELATIONSHIP_SET = Collections.unmodifiableSet(_rels);
}
private String uniqueness;
private String hashAttribute;
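// Cache the uniqueness mode, hash attribute name, and MongoDB client service when the processor is scheduled.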
@OnScheduled
public void onScheduled(ProcessContext context) {
this.uniqueness = context.getProperty(ENFORCE_UNIQUENESS).getValue();
this.hashAttribute = context.getProperty(HASH_ATTRIBUTE).evaluateAttributeExpressions().getValue();
this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIP_SET;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile input = session.get();
if (input == null) {
return;
}
GridFSBucket bucket = getBucket(input, context);
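// Enforce the configured uniqueness policy before uploading; FlowFiles that match an existing file
// are routed to the duplicate relationship.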
if (!canUploadFile(context, input, bucket.getBucketName())) {
getLogger().error("Cannot upload the file because of the uniqueness policy configured.");
session.transfer(input, REL_DUPLICATE);
return;
}
final int chunkSize = context.getProperty(CHUNK_SIZE).evaluateAttributeExpressions(input).asDataSize(DataUnit.B).intValue();
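// Stream the FlowFile content into GridFS using the evaluated chunk size and any prefixed attributes as metadata.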
try (InputStream fileInput = session.read(input)) {
String fileName = context.getProperty(FILE_NAME).evaluateAttributeExpressions(input).getValue();
GridFSUploadOptions options = new GridFSUploadOptions()
.chunkSizeBytes(chunkSize)
.metadata(getMetadata(input, context));
ObjectId id = bucket.uploadFromStream(fileName, fileInput, options);
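// Explicitly close the content stream before the putAttribute/transfer calls below; the session may reject
// modifications to a FlowFile whose content stream is still open, even though try-with-resources would
// close it at the end of the block.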
fileInput.close();
if (id != null) {
input = session.putAttribute(input, ID_ATTRIBUTE, id.toString());
session.transfer(input, REL_SUCCESS);
session.getProvenanceReporter().send(input, getTransitUri(id, input, context));
} else {
getLogger().error("ID was null, assuming failure.");
session.transfer(input, REL_FAILURE);
}
} catch (Exception ex) {
getLogger().error("Failed to upload file", ex);
session.transfer(input, REL_FAILURE);
}
}
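/**
 * Checks the configured uniqueness policy against the bucket's "&lt;bucket&gt;.files" collection.
 * Returns true when uniqueness enforcement is disabled or when no existing document matches
 * the FlowFile's filename and/or hash, depending on the selected mode.
 */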
private boolean canUploadFile(ProcessContext context, FlowFile input, String bucketName) {
boolean retVal;
if (uniqueness.equals(NO_UNIQUE.getValue())) {
retVal = true;
} else {
final String fileName = input.getAttribute(CoreAttributes.FILENAME.key());
final String fileColl = String.format("%s.files", bucketName);
final String hash = input.getAttribute(hashAttribute);
if ((uniqueness.equals(UNIQUE_BOTH.getValue()) || uniqueness.equals(UNIQUE_HASH.getValue())) && StringUtils.isEmpty(hash)) {
throw new RuntimeException(String.format("Uniqueness mode %s was set and the hash attribute %s was not found.", uniqueness, hashAttribute));
}
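// Build the duplicate-lookup query against the files collection based on the uniqueness mode.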
Document query;
if (uniqueness.equals(UNIQUE_BOTH.getValue())) {
query = new Document().append("filename", fileName).append("md5", hash);
} else if (uniqueness.equals(UNIQUE_HASH.getValue())) {
query = new Document().append("md5", hash);
} else {
query = new Document().append("filename", fileName);
}
retVal = getDatabase(input, context).getCollection(fileColl).countDocuments(query) == 0;
}
return retVal;
}
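/**
 * Builds the GridFS metadata document from FlowFile attributes whose names start with the
 * configured File Properties Prefix; the prefix (with a trailing dot) is stripped from the keys.
 * Returns an empty document when no prefix is configured.
 */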
private Document getMetadata(FlowFile input, ProcessContext context) {
final String prefix = context.getProperty(PROPERTIES_PREFIX).evaluateAttributeExpressions(input).getValue();
Document doc;
if (StringUtils.isEmpty(prefix)) {
doc = Document.parse("{}");
} else {
doc = new Document();
Map<String, String> attributes = input.getAttributes();
for (Map.Entry<String, String> entry : attributes.entrySet()) {
if (entry.getKey().startsWith(prefix)) {
String cleanPrefix = prefix.endsWith(".") ? prefix : String.format("%s.", prefix);
String cleanKey = entry.getKey().replace(cleanPrefix, "");
doc.append(cleanKey, entry.getValue());
}
}
}
return doc;
}
}