| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.nifi.processors.mongodb; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.mongodb.MongoClient; |
| import com.mongodb.MongoClientOptions; |
| import com.mongodb.MongoClientOptions.Builder; |
| import com.mongodb.MongoClientURI; |
| import com.mongodb.WriteConcern; |
| import com.mongodb.client.MongoCollection; |
| import com.mongodb.client.MongoDatabase; |
| import java.io.ByteArrayInputStream; |
| import java.io.IOException; |
| import java.io.UnsupportedEncodingException; |
| import java.text.DateFormat; |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.List; |
| import java.util.Map; |
| import javax.net.ssl.SSLContext; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.nifi.annotation.lifecycle.OnScheduled; |
| import org.apache.nifi.annotation.lifecycle.OnStopped; |
| import org.apache.nifi.components.AllowableValue; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.components.ValidationContext; |
| import org.apache.nifi.components.ValidationResult; |
| import org.apache.nifi.expression.ExpressionLanguageScope; |
| import org.apache.nifi.flowfile.FlowFile; |
| import org.apache.nifi.mongodb.MongoDBClientService; |
| import org.apache.nifi.processor.AbstractProcessor; |
| import org.apache.nifi.processor.ProcessContext; |
| import org.apache.nifi.processor.ProcessSession; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.processor.exception.ProcessException; |
| import org.apache.nifi.processor.util.StandardValidators; |
| import org.apache.nifi.security.util.ClientAuth; |
| import org.apache.nifi.ssl.SSLContextService; |
| import org.bson.Document; |
| |
| public abstract class AbstractMongoProcessor extends AbstractProcessor { |
| static final String WRITE_CONCERN_ACKNOWLEDGED = "ACKNOWLEDGED"; |
| static final String WRITE_CONCERN_UNACKNOWLEDGED = "UNACKNOWLEDGED"; |
| static final String WRITE_CONCERN_FSYNCED = "FSYNCED"; |
| static final String WRITE_CONCERN_JOURNALED = "JOURNALED"; |
| static final String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = "REPLICA_ACKNOWLEDGED"; |
| static final String WRITE_CONCERN_MAJORITY = "MAJORITY"; |
| static final String WRITE_CONCERN_W1 = "W1"; |
| static final String WRITE_CONCERN_W2 = "W2"; |
| static final String WRITE_CONCERN_W3 = "W3"; |
| |
| protected static final String JSON_TYPE_EXTENDED = "Extended"; |
| protected static final String JSON_TYPE_STANDARD = "Standard"; |
| protected static final AllowableValue JSON_EXTENDED = new AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON", |
| "Use MongoDB's \"extended JSON\". This is the JSON generated with toJson() on a MongoDB Document from the Java driver"); |
| protected static final AllowableValue JSON_STANDARD = new AllowableValue(JSON_TYPE_STANDARD, "Standard JSON", |
| "Generate a JSON document that conforms to typical JSON conventions instead of Mongo-specific conventions."); |
| |
| static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder() |
| .name("mongo-client-service") |
| .displayName("Client Service") |
| .description("If configured, this property will use the assigned client service for connection pooling.") |
| .required(false) |
| .identifiesControllerService(MongoDBClientService.class) |
| .build(); |
| |
| static final PropertyDescriptor URI = new PropertyDescriptor.Builder() |
| .name("Mongo URI") |
| .displayName("Mongo URI") |
| .description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]") |
| .required(false) |
| .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .build(); |
| |
| static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() |
| .name("Mongo Database Name") |
| .displayName("Mongo Database Name") |
| .description("The name of the database to use") |
| .required(true) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .build(); |
| |
| static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder() |
| .name("Mongo Collection Name") |
| .description("The name of the collection to use") |
| .required(true) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .build(); |
| |
| protected static final PropertyDescriptor JSON_TYPE = new PropertyDescriptor.Builder() |
| .allowableValues(JSON_EXTENDED, JSON_STANDARD) |
| .defaultValue(JSON_TYPE_EXTENDED) |
| .displayName("JSON Type") |
| .name("json-type") |
| .description("By default, MongoDB's Java driver returns \"extended JSON\". Some of the features of this variant of JSON" + |
| " may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " + |
| " controls whether to use extended JSON or provide a clean view that conforms to standard JSON.") |
| .expressionLanguageSupported(ExpressionLanguageScope.NONE) |
| .required(true) |
| .build(); |
| |
| public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() |
| .name("ssl-context-service") |
| .displayName("SSL Context Service") |
| .description("The SSL Context Service used to provide client certificate information for TLS/SSL " |
| + "connections.") |
| .required(false) |
| .identifiesControllerService(SSLContextService.class) |
| .build(); |
| |
| public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() |
| .name("ssl-client-auth") |
| .displayName("Client Auth") |
| .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. " |
| + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context " |
| + "has been defined and enabled.") |
| .required(false) |
| .allowableValues(ClientAuth.values()) |
| .defaultValue("REQUIRED") |
| .build(); |
| |
| public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder() |
| .name("Write Concern") |
| .displayName("Write Concern") |
| .description("The write concern to use") |
| .required(true) |
| .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED, |
| WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY, WRITE_CONCERN_W1, WRITE_CONCERN_W2, WRITE_CONCERN_W3) |
| .defaultValue(WRITE_CONCERN_ACKNOWLEDGED) |
| .build(); |
| |
| static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder() |
| .name("results-per-flowfile") |
| .displayName("Results Per FlowFile") |
| .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.") |
| .required(false) |
| .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) |
| .defaultValue("1") |
| .build(); |
| |
| static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() |
| .name("Batch Size") |
| .displayName("Batch Size") |
| .description("The number of elements returned from the server in one batch.") |
| .required(false) |
| .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) |
| .defaultValue("100") |
| .build(); |
| |
| static final PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder() |
| .name("mongo-query-attribute") |
| .displayName("Query Output Attribute") |
| .description("If set, the query will be written to a specified attribute on the output flowfiles.") |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) |
| .required(false) |
| .build(); |
| static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() |
| .name("mongo-charset") |
| .displayName("Character Set") |
| .description("Specifies the character set of the document data.") |
| .required(true) |
| .defaultValue("UTF-8") |
| .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .build(); |
| |
| static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder() |
| .name("mongo-date-format") |
| .displayName("Date Format") |
| .description("The date format string to use for formatting Date fields that are returned from Mongo. It is only " + |
| "applied when the JSON output format is set to Standard JSON. Full documentation for format characters can be " + |
| "found here: https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html") |
| .defaultValue("yyyy-MM-dd'T'HH:mm:ss'Z'") |
| .addValidator((subject, input, context) -> { |
| ValidationResult.Builder result = new ValidationResult.Builder() |
| .subject(subject) |
| .input(input); |
| try { |
| new SimpleDateFormat(input).format(new Date()); |
| result.valid(true); |
| } catch (Exception ex) { |
| result.valid(false) |
| .explanation(ex.getMessage()); |
| } |
| |
| return result.build(); |
| }) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .build(); |
| |
| static final List<PropertyDescriptor> descriptors; |
| |
| static { |
| List<PropertyDescriptor> _temp = new ArrayList<>(); |
| _temp.add(CLIENT_SERVICE); |
| _temp.add(URI); |
| _temp.add(DATABASE_NAME); |
| _temp.add(COLLECTION_NAME); |
| _temp.add(SSL_CONTEXT_SERVICE); |
| _temp.add(CLIENT_AUTH); |
| descriptors = Collections.unmodifiableList(_temp); |
| } |
| |
| protected ObjectMapper objectMapper; |
| protected MongoClient mongoClient; |
| protected MongoDBClientService clientService; |
| |
| @OnScheduled |
| public final void createClient(ProcessContext context) throws IOException { |
| if (context.getProperty(CLIENT_SERVICE).isSet()) { |
| clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class); |
| return; |
| } |
| |
| if (mongoClient != null) { |
| closeClient(); |
| } |
| |
| getLogger().info("Creating MongoClient"); |
| |
| // Set up the client for secure (SSL/TLS communications) if configured to do so |
| final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); |
| final SSLContext sslContext; |
| |
| if (sslService != null) { |
| sslContext = sslService.createContext(); |
| } else { |
| sslContext = null; |
| } |
| |
| try { |
| if(sslContext == null) { |
| mongoClient = new MongoClient(new MongoClientURI(getURI(context))); |
| } else { |
| mongoClient = new MongoClient(new MongoClientURI(getURI(context), getClientOptions(sslContext))); |
| } |
| } catch (Exception e) { |
| getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e); |
| throw e; |
| } |
| } |
| |
| protected Builder getClientOptions(final SSLContext sslContext) { |
| MongoClientOptions.Builder builder = MongoClientOptions.builder(); |
| builder.sslEnabled(true); |
| builder.sslContext(sslContext); |
| return builder; |
| } |
| |
| @OnStopped |
| public final void closeClient() { |
| if (mongoClient != null) { |
| getLogger().info("Closing MongoClient"); |
| mongoClient.close(); |
| mongoClient = null; |
| } |
| } |
| |
| protected MongoDatabase getDatabase(final ProcessContext context, final FlowFile flowFile) { |
| final String databaseName = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(flowFile).getValue(); |
| |
| return clientService!= null ? clientService.getDatabase(databaseName) : mongoClient.getDatabase(databaseName); |
| } |
| |
| protected MongoCollection<Document> getCollection(final ProcessContext context, final FlowFile flowFile) { |
| final String collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions(flowFile).getValue(); |
| if (StringUtils.isEmpty(collectionName)) { |
| throw new ProcessException("Collection name was empty after expression language evaluation."); |
| } |
| return getDatabase(context, flowFile).getCollection(collectionName); |
| } |
| |
| protected String getURI(final ProcessContext context) { |
| if (clientService != null) { |
| return clientService.getURI(); |
| } else { |
| return context.getProperty(URI).evaluateAttributeExpressions().getValue(); |
| } |
| } |
| |
| protected WriteConcern getWriteConcern(final ProcessContext context) { |
| final String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue(); |
| WriteConcern writeConcern = null; |
| switch (writeConcernProperty) { |
| case WRITE_CONCERN_ACKNOWLEDGED: |
| writeConcern = WriteConcern.ACKNOWLEDGED; |
| break; |
| case WRITE_CONCERN_UNACKNOWLEDGED: |
| writeConcern = WriteConcern.UNACKNOWLEDGED; |
| break; |
| case WRITE_CONCERN_FSYNCED: |
| writeConcern = WriteConcern.JOURNALED; |
| getLogger().warn("Using deprecated write concern FSYNCED"); |
| break; |
| case WRITE_CONCERN_JOURNALED: |
| writeConcern = WriteConcern.JOURNALED; |
| break; |
| case WRITE_CONCERN_REPLICA_ACKNOWLEDGED: |
| writeConcern = WriteConcern.W2; |
| getLogger().warn("Using deprecated write concern REPLICA_ACKNOWLEDGED"); |
| break; |
| case WRITE_CONCERN_MAJORITY: |
| writeConcern = WriteConcern.MAJORITY; |
| break; |
| case WRITE_CONCERN_W1: |
| writeConcern = WriteConcern.W1; |
| break; |
| case WRITE_CONCERN_W2: |
| writeConcern = WriteConcern.W2; |
| break; |
| case WRITE_CONCERN_W3: |
| writeConcern = WriteConcern.W3; |
| break; |
| default: |
| writeConcern = WriteConcern.ACKNOWLEDGED; |
| } |
| return writeConcern; |
| } |
| |
| protected void writeBatch(String payload, FlowFile parent, ProcessContext context, ProcessSession session, |
| Map<String, String> extraAttributes, Relationship rel) throws UnsupportedEncodingException { |
| String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(parent).getValue(); |
| |
| FlowFile flowFile = parent != null ? session.create(parent) : session.create(); |
| flowFile = session.importFrom(new ByteArrayInputStream(payload.getBytes(charset)), flowFile); |
| flowFile = session.putAllAttributes(flowFile, extraAttributes); |
| if (parent == null) { |
| session.getProvenanceReporter().receive(flowFile, getURI(context)); |
| } |
| session.transfer(flowFile, rel); |
| } |
| |
| protected synchronized void configureMapper(String setting, String dateFormat) { |
| objectMapper = new ObjectMapper(); |
| |
| if (setting.equals(JSON_TYPE_STANDARD)) { |
| objectMapper.registerModule(ObjectIdSerializer.getModule()); |
| DateFormat df = new SimpleDateFormat(dateFormat); |
| objectMapper.setDateFormat(df); |
| } |
| } |
| |
| @Override |
| protected Collection<ValidationResult> customValidate(ValidationContext context) { |
| List<ValidationResult> retVal = new ArrayList<>(); |
| |
| boolean clientIsSet = context.getProperty(CLIENT_SERVICE).isSet(); |
| boolean uriIsSet = context.getProperty(URI).isSet(); |
| |
| if (clientIsSet && uriIsSet) { |
| String msg = "The client service and URI fields cannot be set at the same time."; |
| retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build()); |
| } else if (!clientIsSet && !uriIsSet) { |
| String msg = "The client service or the URI field must be set."; |
| retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build()); |
| } |
| |
| return retVal; |
| } |
| } |