/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.document.mongo;

import java.util.Collection;
import java.util.Date;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import com.mongodb.ServerAddress;
import com.mongodb.event.ServerHeartbeatSucceededEvent;
import com.mongodb.event.ServerMonitorListenerAdapter;

import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Keeps track of the status of a replica set based on information provided
* by heartbeat events. This status provides a replica set lag estimate, which
* can be used to decide whether secondaries are sufficiently up-to-date and
* read operations can be sent to a secondary. This is particularly useful when
 * causally consistent client sessions are used with the MongoDB Java driver.
 * Read operations shouldn't be sent to a secondary when it lags too far
 * behind, otherwise the read operation will block until the secondary has
 * caught up.
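 * <p>
 * A minimal sketch of how a listener like this might be registered with the
 * driver (the connection string and client setup below are illustrative
 * assumptions, not part of this class):
 * <pre>{@code
 * ReplicaSetStatus status = new ReplicaSetStatus();
 * MongoClientSettings settings = MongoClientSettings.builder()
 *         .applyConnectionString(new ConnectionString("mongodb://host1,host2/?replicaSet=rs"))
 *         .applyToServerSettings(builder -> builder.addServerMonitorListener(status))
 *         .build();
 * MongoClient client = MongoClients.create(settings);
 * // only route a read to a secondary when the estimated lag is acceptable
 * boolean secondaryOk = status.getLagEstimate() < TimeUnit.SECONDS.toMillis(5);
 * }</pre>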
*/
public class ReplicaSetStatus extends ServerMonitorListenerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(ReplicaSetStatus.class);
public static final long UNKNOWN_LAG = Long.MAX_VALUE;
/**
* Most recent heartbeats from connected servers
*/
private final Map<ServerAddress, Heartbeat> heartbeats = new HashMap<>();
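/**
 * Addresses of all known replica set members, as reported by the "hosts"
 * field of the heartbeat replies.
 */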
private final Set<ServerAddress> members = new HashSet<>();
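/**
 * The most recent lag estimates, one added per successful heartbeat; only
 * the newest entries (up to the number of members) are kept for averaging.
 */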
private final Deque<Long> estimatesPerMember = new LinkedList<>();
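/**
 * Current lag estimate in milliseconds, or {@link #UNKNOWN_LAG} when not
 * enough heartbeat information is available.
 */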
private long lagEstimate = UNKNOWN_LAG;
@Override
public void serverHeartbeatSucceeded(ServerHeartbeatSucceededEvent event) {
synchronized (heartbeats) {
ServerAddress address = event.getConnectionId().getServerId().getAddress();
Heartbeat beat = new Heartbeat(event);
heartbeats.put(address, beat);
members.addAll(beat.getHosts());
if (!members.isEmpty()) {
updateLag();
}
}
}
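/**
 * @return the estimated replica set lag in milliseconds, or
 *         {@link #UNKNOWN_LAG} if the lag cannot be estimated yet.
 */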
public long getLagEstimate() {
return lagEstimate;
}
private void updateLag() {
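// only estimate the lag once a heartbeat has been received from every
// known member of the replica set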
if (!heartbeats.keySet().containsAll(members)) {
lagEstimate = UNKNOWN_LAG;
return;
}
long oldestUpdate = Long.MAX_VALUE;
long newestUpdate = Long.MIN_VALUE;
long oldestWrite = Long.MAX_VALUE;
long newestWrite = Long.MIN_VALUE;
for (Map.Entry<ServerAddress, Heartbeat> entry : heartbeats.entrySet()) {
if (!members.contains(entry.getKey())) {
continue;
}
Heartbeat beat = entry.getValue();
Date lastWrite = beat.getLastWrite();
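// a reply without a lastWrite date forces the maximum possible spread,
// resulting in a very large estimate for this round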
if (lastWrite == null) {
oldestWrite = 0;
newestWrite = Long.MAX_VALUE;
} else {
oldestWrite = Math.min(oldestWrite, lastWrite.getTime());
newestWrite = Math.max(newestWrite, lastWrite.getTime());
}
long updateTime = beat.getTime();
oldestUpdate = Math.min(oldestUpdate, updateTime);
newestUpdate = Math.max(newestUpdate, updateTime);
}
// heartbeats happen concurrently for all servers, so we may see a mix of
// fresh and stale heartbeats with update times up to heartbeatFreqMillis
// apart
long uncertaintyMillis = newestUpdate - oldestUpdate;
estimatesPerMember.addFirst(Math.max(0, newestWrite - oldestWrite - uncertaintyMillis));
// average the most recent estimates (at most one per member) and drop
// older entries
long estimate = 0;
int i = 0;
for (Iterator<Long> it = estimatesPerMember.iterator(); it.hasNext(); ) {
long v = it.next();
if (i++ < members.size()) {
estimate += v;
} else {
it.remove();
}
}
lagEstimate = estimate / members.size();
LOG.debug("lagEstimate: {} ms ({})", lagEstimate, estimatesPerMember);
}
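/**
 * Snapshot of the relevant fields of a single heartbeat reply: the member
 * addresses listed under "hosts", the optional last write date and the
 * server's local time.
 */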
private static class Heartbeat {
private final List<ServerAddress> hosts;
private final Date lastWrite;
private final long localTime;
Heartbeat(ServerHeartbeatSucceededEvent event) {
this.hosts = hostsFrom(event);
this.lastWrite = lastWriteFrom(event);
this.localTime = localTimeFrom(event).getTime();
}
Collection<ServerAddress> getHosts() {
return hosts;
}
long getTime() {
return localTime;
}
@Nullable
Date getLastWrite() {
return lastWrite;
}
}
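// extracts the replica set member addresses from the "hosts" array of the
// heartbeat reply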
private static List<ServerAddress> hostsFrom(ServerHeartbeatSucceededEvent event) {
return event.getReply().getArray("hosts", new BsonArray()).stream()
.map(bsonValue -> new ServerAddress(bsonValue.asString().getValue()))
.collect(Collectors.toList());
}
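// reads the server's local time from the heartbeat reply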
private static Date localTimeFrom(ServerHeartbeatSucceededEvent event) {
BsonDocument reply = event.getReply();
return new Date(reply.getDateTime("localTime").getValue());
}
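// reads the date of the last write from the heartbeat reply, or returns
// null if the reply does not contain a lastWrite section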
private static Date lastWriteFrom(ServerHeartbeatSucceededEvent event) {
BsonDocument reply = event.getReply();
if (!reply.containsKey("lastWrite")) {
return null;
}
return new Date(reply.getDocument("lastWrite")
.getDateTime("lastWriteDate").getValue());
}
}