| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nifi.mongodb; |
| |
| import com.mongodb.MongoClient; |
| import com.mongodb.MongoClientOptions; |
| import com.mongodb.MongoClientURI; |
| import com.mongodb.WriteConcern; |
| import com.mongodb.client.MongoDatabase; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.io.UnsupportedEncodingException; |
| import java.net.URLEncoder; |
| import java.nio.charset.StandardCharsets; |
| import javax.net.ssl.SSLContext; |
| import org.apache.nifi.annotation.documentation.CapabilityDescription; |
| import org.apache.nifi.annotation.documentation.Tags; |
| import org.apache.nifi.annotation.lifecycle.OnDisabled; |
| import org.apache.nifi.annotation.lifecycle.OnEnabled; |
| import org.apache.nifi.annotation.lifecycle.OnStopped; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.controller.AbstractControllerService; |
| import org.apache.nifi.controller.ConfigurationContext; |
| import org.apache.nifi.ssl.SSLContextService; |
| |
| @Tags({"mongo", "mongodb", "service"}) |
| @CapabilityDescription( |
| "Provides a controller service that configures a connection to MongoDB and provides access to that connection to " + |
| "other Mongo-related components." |
| ) |
| public class MongoDBControllerService extends AbstractControllerService implements MongoDBClientService { |
| private String uri; |
| |
| @OnEnabled |
| public void onEnabled(final ConfigurationContext context) { |
| this.uri = getURI(context); |
| this.createClient(context); |
| } |
| |
| static List<PropertyDescriptor> descriptors = new ArrayList<>(); |
| |
| static { |
| descriptors.add(URI); |
| descriptors.add(DB_USER); |
| descriptors.add(DB_PASSWORD); |
| descriptors.add(SSL_CONTEXT_SERVICE); |
| descriptors.add(CLIENT_AUTH); |
| } |
| |
| protected MongoClient mongoClient; |
| |
| // TODO: Remove duplicate code by refactoring shared method to accept PropertyContext |
| protected final void createClient(ConfigurationContext context) { |
| if (mongoClient != null) { |
| closeClient(); |
| } |
| |
| getLogger().info("Creating MongoClient"); |
| |
| // Set up the client for secure (SSL/TLS communications) if configured to do so |
| final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); |
| final SSLContext sslContext; |
| |
| if (sslService == null) { |
| sslContext = null; |
| } else { |
| sslContext = sslService.createContext(); |
| } |
| |
| try { |
| if(sslContext == null) { |
| mongoClient = new MongoClient(new MongoClientURI(getURI(context))); |
| } else { |
| mongoClient = new MongoClient(new MongoClientURI(getURI(context), getClientOptions(sslContext))); |
| } |
| } catch (Exception e) { |
| getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e); |
| throw e; |
| } |
| } |
| |
| protected MongoClientOptions.Builder getClientOptions(final SSLContext sslContext) { |
| MongoClientOptions.Builder builder = MongoClientOptions.builder(); |
| builder.sslEnabled(true); |
| builder.sslContext(sslContext); |
| return builder; |
| } |
| |
| @OnStopped |
| public final void closeClient() { |
| if (mongoClient != null) { |
| mongoClient.close(); |
| mongoClient = null; |
| } |
| } |
| |
| protected String getURI(final ConfigurationContext context) { |
| final String uri = context.getProperty(URI).evaluateAttributeExpressions().getValue(); |
| final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue(); |
| final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue(); |
| if (!uri.contains("@") && user != null && passw != null) { |
| try { |
| return uri.replaceFirst("://", "://" + URLEncoder.encode(user, StandardCharsets.UTF_8.toString()) + ":" + URLEncoder.encode(passw, StandardCharsets.UTF_8.toString()) + "@"); |
| } catch (final UnsupportedEncodingException e) { |
| getLogger().warn("Failed to URL encode username and/or password. Using original URI."); |
| return uri; |
| } |
| } else { |
| return uri; |
| } |
| } |
| |
| @Override |
| public WriteConcern getWriteConcern(final ConfigurationContext context) { |
| final String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue(); |
| WriteConcern writeConcern = null; |
| switch (writeConcernProperty) { |
| case WRITE_CONCERN_ACKNOWLEDGED: |
| writeConcern = WriteConcern.ACKNOWLEDGED; |
| break; |
| case WRITE_CONCERN_UNACKNOWLEDGED: |
| writeConcern = WriteConcern.UNACKNOWLEDGED; |
| break; |
| case WRITE_CONCERN_FSYNCED: |
| writeConcern = WriteConcern.JOURNALED; |
| getLogger().warn("Using deprecated write concern FSYNCED"); |
| break; |
| case WRITE_CONCERN_JOURNALED: |
| writeConcern = WriteConcern.JOURNALED; |
| break; |
| case WRITE_CONCERN_REPLICA_ACKNOWLEDGED: |
| writeConcern = WriteConcern.W2; |
| getLogger().warn("Using deprecated write concern REPLICA_ACKNOWLEDGED"); |
| break; |
| case WRITE_CONCERN_MAJORITY: |
| writeConcern = WriteConcern.MAJORITY; |
| break; |
| case WRITE_CONCERN_W1: |
| writeConcern = WriteConcern.W1; |
| break; |
| case WRITE_CONCERN_W2: |
| writeConcern = WriteConcern.W2; |
| break; |
| case WRITE_CONCERN_W3: |
| writeConcern = WriteConcern.W3; |
| break; |
| default: |
| writeConcern = WriteConcern.ACKNOWLEDGED; |
| } |
| return writeConcern; |
| } |
| |
| @Override |
| protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { |
| return descriptors; |
| } |
| |
| @OnDisabled |
| public void onDisable() { |
| this.mongoClient.close(); |
| } |
| |
| |
| @Override |
| public MongoDatabase getDatabase(String name) { |
| return mongoClient.getDatabase(name); |
| } |
| |
| @Override |
| public String getURI() { |
| return uri; |
| } |
| } |