blob: c0ec6e7bd5cf8cf603498b31833d7566bbc57f51 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.entity.nosql.mongodb;
import java.net.UnknownHostException;
import java.util.concurrent.Callable;
import org.apache.brooklyn.core.location.access.BrooklynAccessUtils;
import org.apache.brooklyn.util.repeat.Repeater;
import org.apache.brooklyn.util.time.Duration;
import org.bson.BSONObject;
import org.bson.BasicBSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.net.HostAndPort;
import com.mongodb.BasicDBObject;
import com.mongodb.CommandResult;
import com.mongodb.DB;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoCredential;
import com.mongodb.MongoException;
import com.mongodb.ServerAddress;
/**
* Manages connections to standalone MongoDB servers.
*
* @see <a href="http://docs.mongodb.org/manual/reference/command/">MongoDB database command documentation</a>
*/
public class MongoDBClientSupport {
private static final Logger LOG = LoggerFactory.getLogger(MongoDBClientSupport.class);
private ServerAddress address;
private boolean usesAuthentication;
private String username;
private String password;
private String authenticationDatabase;
private MongoClient client() {
return baseClient(connectionOptions);
}
private MongoClient fastClient() {
MongoClientOptions fastConnectionOptions = MongoClientOptions.builder()
.connectTimeout(1000 * 10)
.maxWaitTime(1000 * 10)
.serverSelectionTimeout(1000 * 10)
.build();
return baseClient(fastConnectionOptions);
}
private MongoClient baseClient(MongoClientOptions connectionOptions) {
if (usesAuthentication) {
MongoCredential credential = MongoCredential.createMongoCRCredential(username, authenticationDatabase, password.toCharArray());
return new MongoClient(address, ImmutableList.of(credential), connectionOptions);
} else {
return new MongoClient(address, connectionOptions);
}
}
// Set client to automatically reconnect to servers.
private static final MongoClientOptions connectionOptions = MongoClientOptions.builder()
.socketKeepAlive(true)
.build();
private static final BasicBSONObject EMPTY_RESPONSE = new BasicBSONObject();
public MongoDBClientSupport(ServerAddress standalone) {
address = standalone;
usesAuthentication = false;
}
public MongoDBClientSupport(ServerAddress standalone, String username, String password, String authenticationDatabase) {
// We could also use a MongoClient to access an entire replica set. See MongoClient(List<ServerAddress>).
address = standalone;
this.usesAuthentication = true;
this.username = username;
this.password = password;
this.authenticationDatabase = authenticationDatabase;
}
/**
* Creates a {@link MongoDBClientSupport} instance in standalone mode.
*/
public static MongoDBClientSupport forServer(AbstractMongoDBServer standalone) throws UnknownHostException {
HostAndPort hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(standalone, standalone.getAttribute(MongoDBServer.PORT));
ServerAddress address = new ServerAddress(hostAndPort.getHostText(), hostAndPort.getPort());
if (MongoDBAuthenticationUtils.usesAuthentication(standalone)) {
return new MongoDBClientSupport(address, standalone.sensors().get(MongoDBAuthenticationMixins.ROOT_USERNAME),
standalone.sensors().get(MongoDBAuthenticationMixins.ROOT_PASSWORD), standalone.sensors().get(MongoDBAuthenticationMixins.AUTHENTICATION_DATABASE));
} else {
return new MongoDBClientSupport(address);
}
}
private ServerAddress getServerAddress() {
MongoClient client = client();
try {
return client.getServerAddressList().get(0);
} finally {
client.close();
}
}
private HostAndPort getServerHostAndPort() {
ServerAddress address = getServerAddress();
return HostAndPort.fromParts(address.getHost(), address.getPort());
}
public Optional<CommandResult> runDBCommand(String database, String command) {
return runDBCommand(database, new BasicDBObject(command, Boolean.TRUE));
}
private Optional<CommandResult> runDBCommand(String database, final DBObject command) {
MongoClient client = client();
try {
final DB db = client.getDB(database);
final CommandResult[] status = new CommandResult[1];
// The mongoDB client can occasionally fail to connect. Try up to 5 times to run the command
boolean commandResult = Repeater.create().backoff(Duration.ONE_SECOND, 1.5, null).limitIterationsTo(5)
.until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
try {
status[0] = db.command(command);
return true;
} catch (Exception e) {
LOG.warn("Command " + command + " on " + address.getHost() + " failed", e);
return false;
}
}
}).run();
if (!commandResult) {
return Optional.absent();
}
if (!status[0].ok()) {
LOG.debug("Unexpected result of {} on {}: {}",
new Object[] { command, getServerAddress(), status[0].getErrorMessage() });
}
return Optional.of(status[0]);
} finally {
client.close();
}
}
public long getShardCount() {
MongoClient client = client();
try {
return client.getDB("config").getCollection("shards").getCount();
} finally {
client.close();
}
}
public BasicBSONObject getServerStatus() {
Optional<CommandResult> result = runDBCommand("admin", "serverStatus");
if (result.isPresent() && result.get().ok()) {
return result.get();
} else {
return EMPTY_RESPONSE;
}
}
public boolean ping() {
MongoClient client = fastClient();
DBObject command = new BasicDBObject("ping", "1");
final DB db = client.getDB("admin");
try {
CommandResult status = db.command(command);
return status.ok();
} catch (MongoException e) {
LOG.warn("Pinging server {} failed with {}", address.getHost(), e);
} finally {
client.close();
}
return false;
}
public boolean initializeReplicaSet(String replicaSetName, Integer id) {
HostAndPort primary = getServerHostAndPort();
BasicBSONObject config = ReplicaSetConfig.builder(replicaSetName)
.member(primary, id)
.build();
BasicDBObject dbObject = new BasicDBObject("replSetInitiate", config);
LOG.debug("Initiating replica set with: " + dbObject);
Optional<CommandResult> result = runDBCommand("admin", dbObject);
if (result.isPresent() && result.get().ok() && LOG.isDebugEnabled()) {
LOG.debug("Completed initiating MongoDB replica set {} on entity {}", replicaSetName, this);
}
return result.isPresent() && result.get().ok();
}
/**
* Java equivalent of calling rs.conf() in the console.
*/
private BSONObject getReplicaSetConfig() {
MongoClient client = client();
try {
return client.getDB("local").getCollection("system.replset").findOne();
} catch (MongoException e) {
LOG.error("Failed to get replica set config on "+client, e);
return null;
} finally {
client.close();
}
}
/**
* Runs <code>replSetGetStatus</code> on the admin database.
*
* @return The result of <code>replSetGetStatus</code>, or
* an empty {@link BasicBSONObject} if the command threw an exception (e.g. if
* the connection was reset) or if the resultant {@link CommandResult#ok} was false.
*
* @see <a href="http://docs.mongodb.org/manual/reference/replica-status/">Replica set status reference</a>
* @see <a href="http://docs.mongodb.org/manual/reference/command/replSetGetStatus/">replSetGetStatus documentation</a>
*/
public BasicBSONObject getReplicaSetStatus() {
Optional<CommandResult> result = runDBCommand("admin", "replSetGetStatus");
if (result.isPresent() && result.get().ok()) {
return result.get();
} else {
return EMPTY_RESPONSE;
}
}
/**
* Reconfigures the replica set that this client is the primary member of to include a new member.
* <p/>
* Note that this can cause long downtime (typically 10-20s, even up to a minute).
*
* @param secondary New member of the set.
* @param id The id for the new set member. Must be unique within the set.
* @return True if successful
*/
public boolean addMemberToReplicaSet(MongoDBServer secondary, Integer id) {
// We need to:
// - get the existing configuration
// - update its version
// - add the new member to its list of members
// - run replSetReconfig with the new configuration.
BSONObject existingConfig = getReplicaSetConfig();
if (existingConfig == null) {
LOG.warn("Couldn't load existing config for replica set from {}. Server {} not added.",
getServerAddress(), secondary);
return false;
}
BasicBSONObject newConfig = ReplicaSetConfig.fromExistingConfig(existingConfig)
.primary(getServerHostAndPort())
.member(secondary, id)
.build();
return reconfigureReplicaSet(newConfig);
}
/**
* Reconfigures the replica set that this client is the primary member of to
* remove the given server.
* @param server The server to remove
* @return True if successful
*/
public boolean removeMemberFromReplicaSet(MongoDBServer server) {
BSONObject existingConfig = getReplicaSetConfig();
if (existingConfig == null) {
LOG.warn("Couldn't load existing config for replica set from {}. Server {} not removed.",
getServerAddress(), server);
return false;
}
BasicBSONObject newConfig = ReplicaSetConfig.fromExistingConfig(existingConfig)
.primary(getServerHostAndPort())
.remove(server)
.build();
return reconfigureReplicaSet(newConfig);
}
/**
* Runs replSetReconfig with the given BasicBSONObject. Returns true if the result's
* status is ok.
*/
private boolean reconfigureReplicaSet(BasicBSONObject newConfig) {
BasicDBObject command = new BasicDBObject("replSetReconfig", newConfig);
LOG.debug("Reconfiguring replica set to: " + command);
Optional<CommandResult> result = runDBCommand("admin", command);
return result.isPresent() && result.get().ok();
}
public boolean addShardToRouter(String hostAndPort) {
LOG.debug("Adding shard " + hostAndPort);
BasicDBObject command = new BasicDBObject("addShard", hostAndPort);
Optional<CommandResult> result = runDBCommand("admin", command);
return result.isPresent() && result.get().ok();
}
}