blob: 50686719fc281d39bf349c40c6392e69e8a2ca3f [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.mongodb;
import com.mongodb.MongoException;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.bson.Document;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Record-aware processor that reads the records in an incoming FlowFile with a configured
 * Record Reader and inserts them into a configured MongoDB collection in batches.
 * Insert-only: updates, deletes and upserts are not supported.
 */
@Tags({"mongodb", "insert", "record", "put"})
@CapabilityDescription("This processor is a record-aware processor for inserting data into MongoDB. It uses a configured record reader and " +
"schema to read an incoming record set from the body of a flowfile and then inserts batches of those records into " +
"a configured MongoDB collection. This processor does not support updates, deletes or upserts. The number of documents to insert at a time is controlled " +
"by the \"Insert Batch Size\" configuration property. This value should be set to a reasonable size to ensure " +
"that MongoDB is not overloaded with too many inserts at once.")
public class PutMongoRecord extends AbstractMongoProcessor {
/** Destination for FlowFiles processed without a caught read, schema or MongoDB error. */
static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("All FlowFiles that are written to MongoDB are routed to this relationship")
        .build();
/** Destination for FlowFiles whose processing raised a caught read, schema or MongoDB error. */
static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("All FlowFiles that cannot be written to MongoDB are routed to this relationship")
        .build();
/**
 * Record Reader controller service used in onTrigger to parse the FlowFile content and obtain its schema.
 * NOTE(review): this builder chain appears truncated in this copy (no name(...) call or terminating
 * build(); is visible) — confirm against the complete file before relying on it.
 */
static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
.displayName("Record Reader")
.description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
/**
 * Batch size: onTrigger reads this with asInteger() and flushes the pending document list
 * when its size reaches this value.
 * NOTE(review): same truncation as above; any name/default/validator settings are not visible here.
 */
static final PropertyDescriptor INSERT_COUNT = new PropertyDescriptor.Builder()
.displayName("Insert Batch Size")
.description("The number of records to group together for one single insert operation against MongoDB.")
// Unmodifiable views of the relationships and property descriptors this processor exposes.
private final static Set<Relationship> relationships;
private final static List<PropertyDescriptor> propertyDescriptors;
static {
// Build mutable collections, then publish unmodifiable views of them.
// NOTE(review): no add()/addAll() calls are visible between creation and wrapping in this copy,
// so as shown both collections would be empty — presumably the descriptors and relationships
// declared above are registered here; confirm against the complete file.
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
final Set<Relationship> _relationships = new HashSet<>();
relationships = Collections.unmodifiableSet(_relationships);
/**
 * Returns the relationships this processor can route FlowFiles to.
 *
 * @return the unmodifiable set built in the static initializer
 */
public Set<Relationship> getRelationships() {
return relationships;
/**
 * Returns the configuration properties supported by this processor.
 *
 * @return the unmodifiable list built in the static initializer
 */
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
/**
 * Reads the records of the incoming FlowFile with the configured Record Reader, converts each
 * record into a BSON Document, and accumulates the documents in batches whose size comes from
 * the "Insert Batch Size" property. If a schema, I/O, malformed-record or MongoDB exception is
 * caught, the FlowFile is routed to failure. Otherwise a SEND provenance event is recorded and
 * the FlowFile is routed to success.
 *
 * @param context source of property values (record reader, batch size, URI) and of the write concern
 * @param session used to obtain, route and report provenance for the FlowFile
 * @throws ProcessException as declared by the processor contract
 */
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
// Nothing to process when no FlowFile is available.
// NOTE(review): the body of this check (presumably an early return) is not visible in this copy — confirm.
if (flowFile == null) {
// Record Reader used to parse the FlowFile content.
// NOTE(review): this statement has no terminating call/semicolon in this copy — confirm it is complete.
final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
// Write concern applied to the target collection below.
final WriteConcern writeConcern = getWriteConcern(context);
// Documents pending for the current batch.
List<Document> inserts = new ArrayList<>();
// Batch size from the "Insert Batch Size" property.
int ceiling = context.getProperty(INSERT_COUNT).asInteger();
// Number of documents counted as inserted; reported in provenance and the log.
int added = 0;
// Set in the catch block so that the finally block skips success routing.
boolean error = false;
// NOTE(review): the InputStream initializer is missing in this copy (presumably session.read(flowFile)),
// and InputStream/IOException do not appear among the visible imports — confirm the java.io imports exist.
try (final InputStream inStream =;
final RecordReader reader = recordParserFactory.createRecordReader(flowFile, inStream, getLogger())) {
final MongoCollection<Document> collection = getCollection(context, flowFile).withWriteConcern(writeConcern);
// Field names are taken from the reader's schema, not from each record's own schema.
RecordSchema schema = reader.getSchema();
Record record;
while ((record = reader.nextRecord()) != null) {
// Convert each Record to a Map of field name -> value (unchecked cast) and copy the
// reader-schema fields into a Mongo document.
Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
Document document = new Document();
for (String name : schema.getFieldNames()) {
document.put(name, contentMap.get(name));
// Flush when the batch is full.
// NOTE(review): the add-to-batch and insert calls are not visible in this copy; as shown,
// "inserts" is never populated — confirm against the complete file.
if (inserts.size() == ceiling) {
added += inserts.size();
inserts = new ArrayList<>();
// Flush any remaining partial batch.
if (inserts.size() > 0) {
// These failures route the FlowFile to failure; other runtime exceptions are not caught here.
} catch (SchemaNotFoundException | IOException | MalformedRecordException | MongoException e) {
getLogger().error("PutMongoRecord failed with error:", e);
session.transfer(flowFile, REL_FAILURE);
error = true;
// NOTE(review): "error" is only set in the catch above, so an uncaught RuntimeException would still
// reach the success routing below before propagating — confirm this is intended (the framework
// presumably rolls the session back in that case).
} finally {
if (!error) {
// Provenance transit URI: the client service's URI when one is set, otherwise the URI
// property with Expression Language evaluated.
String url = clientService != null
? clientService.getURI()
: context.getProperty(URI).evaluateAttributeExpressions().getValue();
session.getProvenanceReporter().send(flowFile, url, String.format("Added %d documents to MongoDB.", added));
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Inserted {} records into MongoDB", new Object[]{ added });
private Document convertArrays(Document doc) {
Document retVal = new Document();
for (Map.Entry<String, Object> entry : doc.entrySet()) {
if (entry.getValue() != null && entry.getValue().getClass().isArray()) {
retVal.put(entry.getKey(), convertArrays((Object[])entry.getValue()));
} else if (entry.getValue() != null && (entry.getValue() instanceof Map || entry.getValue() instanceof Document)) {
retVal.put(entry.getKey(), convertArrays(new Document((Map)entry.getValue())));
} else {
retVal.put(entry.getKey(), entry.getValue());
return retVal;
private List convertArrays(Object[] input) {
List retVal = new ArrayList();
for (Object o : input) {
if (o != null && o.getClass().isArray()) {
} else if (o instanceof Map) {
retVal.add(convertArrays(new Document((Map)o)));
} else {
return retVal;