blob: 7dbde872caf73ea55e516619db5e48df5541c9cb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.document.mongo;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.mongodb.BasicDBObject;
import com.mongodb.ClientSessionOptions;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientException;
import com.mongodb.MongoCommandException;
import com.mongodb.MongoQueryException;
import com.mongodb.ReadConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.event.ServerHeartbeatFailedEvent;
import com.mongodb.event.ServerHeartbeatStartedEvent;
import com.mongodb.event.ServerHeartbeatSucceededEvent;
import com.mongodb.event.ServerMonitorListener;
import com.mongodb.session.ClientSession;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Collects status information about the MongoDB deployment reachable through
 * a {@link MongoClient}: server status, build info, server version, support
 * for the majority read concern and for client sessions.
 * <p>
 * Results of the individual checks are obtained lazily on first use and then
 * cached in instance fields. The class also acts as a
 * {@link ServerMonitorListener} and forwards every heartbeat event to an
 * internal {@link ReplicaSetStatus}, which provides the replica-set lag
 * estimate.
 * <p>
 * NOTE(review): the cached fields are assigned without synchronization;
 * confirm that callers do not rely on concurrent access to the getters.
 */
public class MongoStatus implements ServerMonitorListener {

    private static final Logger LOG = LoggerFactory.getLogger(MongoStatus.class);

    /** Keys of the serverStatus document that are reported by {@link #getServerDetails()}. */
    private static final ImmutableSet<String> SERVER_DETAIL_FIELD_NAMES
            = ImmutableSet.<String>builder()
            .add("host", "process", "connections", "repl", "storageEngine", "mem")
            .build();

    /**
     * Matches a version string that starts with {@code <major>.<minor>.} and
     * captures the two numeric components. Compiled once because a
     * {@link Pattern} is immutable and can be shared.
     */
    private static final Pattern VERSION_PATTERN
            = Pattern.compile("^(\\d+)\\.(\\d+)\\..*");

    private final MongoClient client;

    private final String dbName;

    // lazily populated caches, see the corresponding getters
    private BasicDBObject serverStatus;

    private BasicDBObject buildInfo;

    private String version;

    private Boolean majorityReadConcernSupported;

    private Boolean majorityReadConcernEnabled;

    private Boolean clientSessionSupported;

    private final ReplicaSetStatus replicaSetStatus = new ReplicaSetStatus();

    /**
     * Creates a new status object. No command is sent to MongoDB here; the
     * individual getters contact the server on first use.
     *
     * @param client the client used to run commands and queries.
     * @param dbName the name of the database the commands are run against.
     */
    public MongoStatus(@NotNull MongoClient client,
                       @NotNull String dbName) {
        this.client = client;
        this.dbName = dbName;
    }

    /**
     * Verifies that the connected MongoDB is at least version 2.6.
     *
     * @throws RuntimeException if the version is lower than 2.6.
     * @throws IllegalArgumentException if the version string reported by
     *          MongoDB cannot be parsed.
     */
    public void checkVersion() {
        if (!isVersion(2, 6)) {
            String msg = "MongoDB version 2.6.0 or higher required. " +
                    "Currently connected to a MongoDB with version: " + version;
            throw new RuntimeException(msg);
        }
    }

    /**
     * Check if the majority read concern is supported by this storage engine.
     * The fact that read concern is supported doesn't mean it can be used - it
     * also has to be enabled.
     * <p>
     * When the server status is empty, this method falls back to
     * {@link #isMajorityReadConcernEnabled()}.
     *
     * @return true if the majority read concern is supported
     */
    public boolean isMajorityReadConcernSupported() {
        if (majorityReadConcernSupported == null) {
            BasicDBObject stat = getServerStatus();
            if (stat.isEmpty()) {
                LOG.debug("User doesn't have privileges to get server status; falling back to the isMajorityReadConcernEnabled()");
                // the result of the fallback is cached by isMajorityReadConcernEnabled()
                return isMajorityReadConcernEnabled();
            }
            if (stat.containsField("storageEngine")) {
                BasicDBObject storageEngine = (BasicDBObject) stat.get("storageEngine");
                majorityReadConcernSupported = storageEngine.getBoolean("supportsCommittedReads");
            } else {
                majorityReadConcernSupported = false;
            }
        }
        return majorityReadConcernSupported;
    }

    /**
     * Check if the majority read concern is enabled and can be used for queries.
     *
     * @return true if the majority read concern is enabled
     */
    public boolean isMajorityReadConcernEnabled() {
        if (majorityReadConcernEnabled == null) {
            // Mongo API doesn't seem to provide an option to check whether the
            // majority read concern has been enabled, so we have to try to use
            // it and optionally catch the exception.
            MongoCollection<?> emptyCollection = client.getDatabase(dbName)
                    .getCollection("emptyCollection-" + System.currentTimeMillis());
            try (MongoCursor<?> cursor = emptyCollection
                    .withReadConcern(ReadConcern.MAJORITY)
                    .find(new BasicDBObject()).iterator()) {
                // only executed to find out whether the query fails
                cursor.hasNext();
                majorityReadConcernEnabled = true;
            } catch (MongoQueryException | IllegalArgumentException e) {
                majorityReadConcernEnabled = false;
            }
        }
        return majorityReadConcernEnabled;
    }

    /**
     * Returns selected entries of the server status as a string. Only the keys
     * listed in {@link #SERVER_DETAIL_FIELD_NAMES} that are present in the
     * server status are included.
     *
     * @return the string representation of a map with the selected entries.
     */
    @NotNull
    public String getServerDetails() {
        BasicDBObject status = getServerStatus();
        Map<String, Object> details = Maps.newHashMap();
        for (String key : SERVER_DETAIL_FIELD_NAMES) {
            Object value = status.get(key);
            if (value != null) {
                details.put(key, value);
            }
        }
        return details.toString();
    }

    /**
     * Returns the version of the connected MongoDB. The version is read from
     * the server status and, if it is not available there, from the build
     * info.
     *
     * @return the MongoDB version string.
     */
    @NotNull
    public String getVersion() {
        if (version == null) {
            String v = getServerStatus().getString("version");
            if (v == null) {
                // OAK-4841: serverStatus was probably unauthorized,
                // use buildInfo command to get version
                v = getBuildInfo().getString("version");
            }
            version = v;
        }
        return version;
    }

    /**
     * Checks whether the connected MongoDB has at least the given version.
     *
     * @param requiredMajor the required major version.
     * @param requiredMinor the required minor version.
     * @return {@code true} if the major version is higher than
     *          {@code requiredMajor}, or equal with a minor version that is at
     *          least {@code requiredMinor}; {@code false} otherwise.
     * @throws IllegalArgumentException if the version is unavailable or does
     *          not start with {@code <major>.<minor>.}
     */
    boolean isVersion(int requiredMajor, int requiredMinor) {
        String v = getVersion();
        // a missing version is reported like a malformed one instead of
        // failing with a NullPointerException inside the matcher
        Matcher m = v == null ? null : VERSION_PATTERN.matcher(v);
        if (m == null || !m.matches()) {
            throw new IllegalArgumentException("Malformed MongoDB version: " + v);
        }
        int major = Integer.parseInt(m.group(1));
        int minor = Integer.parseInt(m.group(2));
        if (major != requiredMajor) {
            return major > requiredMajor;
        }
        return minor >= requiredMinor;
    }

    /**
     * Checks whether client sessions can be used. This requires a MongoDB
     * version of at least 3.6 and a successful attempt to start a causally
     * consistent session.
     *
     * @return {@code true} if client sessions are supported.
     */
    boolean isClientSessionSupported() {
        if (clientSessionSupported == null) {
            // must be at least 3.6
            if (isVersion(3, 6)) {
                ClientSessionOptions options = ClientSessionOptions.builder()
                        .causallyConsistent(true).build();
                // the session is only started as a probe and closed right away
                try (ClientSession ignored = client.startSession(options)) {
                    clientSessionSupported = true;
                } catch (MongoClientException e) {
                    clientSessionSupported = false;
                }
            } else {
                clientSessionSupported = false;
            }
        }
        return clientSessionSupported;
    }

    /**
     * Returns an estimate of the replica-set lag in milliseconds. The returned
     * value is not an accurate measurement of the replication lag and should
     * only be used as a rough estimate to decide whether secondaries can be
     * used for queries in general.
     * <p>
     * This method may return {@link ReplicaSetStatus#UNKNOWN_LAG} if the value
     * is currently unknown.
     *
     * @return an estimate of the replica-set lag in milliseconds.
     */
    long getReplicaSetLagEstimate() {
        return replicaSetStatus.getLagEstimate();
    }

    //------------------------< ServerMonitorListener >-------------------------

    // the spelling of this method name is defined by ServerMonitorListener
    @Override
    public void serverHearbeatStarted(ServerHeartbeatStartedEvent event) {
        LOG.debug("serverHeartbeatStarted {}", event.getConnectionId());
        replicaSetStatus.serverHearbeatStarted(event);
    }

    @Override
    public void serverHeartbeatSucceeded(ServerHeartbeatSucceededEvent event) {
        LOG.debug("serverHeartbeatSucceeded {}, {}", event.getConnectionId(), event.getReply());
        replicaSetStatus.serverHeartbeatSucceeded(event);
    }

    @Override
    public void serverHeartbeatFailed(ServerHeartbeatFailedEvent event) {
        // the throwable is passed in addition to the two placeholder arguments
        LOG.debug("serverHeartbeatFailed {} ({} ms)", event.getConnectionId(),
                event.getElapsedTime(TimeUnit.MILLISECONDS),
                event.getThrowable());
        replicaSetStatus.serverHeartbeatFailed(event);
    }

    //-------------------------------< internal >-------------------------------

    /**
     * Returns the cached result of the {@code serverStatus} command, running
     * the command on first use. An empty object is cached when the command
     * fails with error code -1 or 13; any other command failure is rethrown.
     *
     * @return the server status, possibly empty.
     */
    private BasicDBObject getServerStatus() {
        if (serverStatus == null) {
            try {
                serverStatus = client.getDatabase(dbName).runCommand(
                        new BasicDBObject("serverStatus", 1), BasicDBObject.class);
            } catch (MongoCommandException e) {
                int code = e.getErrorCode();
                // -1: OAK-7485, workaround when running on MongoDB Atlas
                //     shared instances
                // 13: "Unauthorized", user is not authorized to run the
                //     serverStatus command (OAK-8122)
                if (code == -1 || code == 13) {
                    serverStatus = new BasicDBObject();
                } else {
                    throw e;
                }
            }
        }
        return serverStatus;
    }

    /**
     * Returns the cached result of the {@code buildInfo} command, running the
     * command on first use.
     *
     * @return the build info.
     */
    private BasicDBObject getBuildInfo() {
        if (buildInfo == null) {
            buildInfo = client.getDatabase(dbName).runCommand(
                    new BasicDBObject("buildInfo", 1), BasicDBObject.class);
        }
        return buildInfo;
    }

    // for testing purposes

    /**
     * Overrides the cached version.
     *
     * @param version the version string to report from now on.
     */
    void setVersion(String version) {
        this.version = version;
    }

    /**
     * Overrides the cached server status and clears the cached result of
     * {@link #isMajorityReadConcernSupported()}, which is derived from it.
     *
     * @param serverStatus the server status to use from now on.
     */
    void setServerStatus(BasicDBObject serverStatus) {
        this.majorityReadConcernSupported = null;
        this.serverStatus = serverStatus;
    }
}