blob: 22cf3084348c6905d23b7e131f9d3c4acbf577d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.mongodb;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientURI;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoDatabase;
import java.util.ArrayList;
import java.util.List;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.ssl.SSLContextService;
@Tags({"mongo", "mongodb", "service"})
@CapabilityDescription(
"Provides a controller service that configures a connection to MongoDB and provides access to that connection to " +
"other Mongo-related components."
)
public class MongoDBControllerService extends AbstractControllerService implements MongoDBClientService {
private String uri;
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
this.uri = getURI(context);
this.createClient(context);
}
static List<PropertyDescriptor> descriptors = new ArrayList<>();
static {
descriptors.add(URI);
descriptors.add(DB_USER);
descriptors.add(DB_PASSWORD);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(CLIENT_AUTH);
}
protected MongoClient mongoClient;
// TODO: Remove duplicate code by refactoring shared method to accept PropertyContext
protected final void createClient(ConfigurationContext context) {
if (mongoClient != null) {
closeClient();
}
getLogger().info("Creating MongoClient");
// Set up the client for secure (SSL/TLS communications) if configured to do so
final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final SSLContext sslContext;
if (sslService == null) {
sslContext = null;
} else {
sslContext = sslService.createContext();
}
try {
if(sslContext == null) {
mongoClient = new MongoClient(new MongoClientURI(getURI(context)));
} else {
mongoClient = new MongoClient(new MongoClientURI(getURI(context), getClientOptions(sslContext)));
}
} catch (Exception e) {
getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e);
throw e;
}
}
protected MongoClientOptions.Builder getClientOptions(final SSLContext sslContext) {
MongoClientOptions.Builder builder = MongoClientOptions.builder();
builder.sslEnabled(true);
builder.sslContext(sslContext);
return builder;
}
@OnStopped
public final void closeClient() {
if (mongoClient != null) {
mongoClient.close();
mongoClient = null;
}
}
protected String getURI(final ConfigurationContext context) {
final String uri = context.getProperty(URI).evaluateAttributeExpressions().getValue();
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
if (!uri.contains("@") && user != null && passw != null) {
try {
return uri.replaceFirst("://", "://" + URLEncoder.encode(user, StandardCharsets.UTF_8.toString()) + ":" + URLEncoder.encode(passw, StandardCharsets.UTF_8.toString()) + "@");
} catch (final UnsupportedEncodingException e) {
getLogger().warn("Failed to URL encode username and/or password. Using original URI.");
return uri;
}
} else {
return uri;
}
}
@Override
public WriteConcern getWriteConcern(final ConfigurationContext context) {
final String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue();
WriteConcern writeConcern = null;
switch (writeConcernProperty) {
case WRITE_CONCERN_ACKNOWLEDGED:
writeConcern = WriteConcern.ACKNOWLEDGED;
break;
case WRITE_CONCERN_UNACKNOWLEDGED:
writeConcern = WriteConcern.UNACKNOWLEDGED;
break;
case WRITE_CONCERN_FSYNCED:
writeConcern = WriteConcern.JOURNALED;
getLogger().warn("Using deprecated write concern FSYNCED");
break;
case WRITE_CONCERN_JOURNALED:
writeConcern = WriteConcern.JOURNALED;
break;
case WRITE_CONCERN_REPLICA_ACKNOWLEDGED:
writeConcern = WriteConcern.W2;
getLogger().warn("Using deprecated write concern REPLICA_ACKNOWLEDGED");
break;
case WRITE_CONCERN_MAJORITY:
writeConcern = WriteConcern.MAJORITY;
break;
case WRITE_CONCERN_W1:
writeConcern = WriteConcern.W1;
break;
case WRITE_CONCERN_W2:
writeConcern = WriteConcern.W2;
break;
case WRITE_CONCERN_W3:
writeConcern = WriteConcern.W3;
break;
default:
writeConcern = WriteConcern.ACKNOWLEDGED;
}
return writeConcern;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnDisabled
public void onDisable() {
this.mongoClient.close();
}
@Override
public MongoDatabase getDatabase(String name) {
return mongoClient.getDatabase(name);
}
@Override
public String getURI() {
return uri;
}
}