blob: e8fe50247bd0b8e790a4cfc832e8f76da50c625c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.jackrabbit.oak.commons.json.JsopBuilder;
import org.apache.jackrabbit.util.ISO8601;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Represents the document stored in the settings collection containing a
* 'cluster view'.
* <p>
* A 'cluster view' is the state of the membership of instances that are or have
* all been connected to the same oak repository. The 'cluster view' is
* maintained by all instances in the cluster concurrently - the faster one
* wins. Its information is derived from the clusterNodes collection. From there
* the following three states are derived and instances are grouped into these:
* <ul>
* <li>Active: an instance is active and has no recoveryLock is currently
* acquired. The lease timeout is ignored. When the lease times out, this is
* noticed by one of the instances at some point and a recovery is started, at
* which point the instance transitions from 'Active' to 'Recovering'.</li>
* <li>Recovering: an instance that was active but currently has the
* recoveryLock acquired by one of the instances.</li>
* <li>Inactive: an instance is not set to active (in which case the
* recoveryLock is never set)</li>
* </ul>
* <p>
* Note that the states managed in this ClusterViewDocument differs from the one
* from ClusterView - since ClusterView also manages the fact that after a
* recovery of a crashed instance there could be a 'backlog' of changes which it
* doesn't yet see until a background read is performed.
*/
class ClusterViewDocument {
private static final Logger logger = LoggerFactory.getLogger(ClusterViewDocument.class);
/** the id of this document is always 'clusterView' **/
private static final String CLUSTERVIEW_DOC_ID = "clusterView";
// keys that we store in the root document - and in the history
/**
* document key that stores the monotonically incrementing sequence number
* of the cluster view. Any update will increase this by 1
**/
static final String VIEW_SEQ_NUM_KEY = "seqNum";
/**
* document key that stores the comma-separated list of active instance ids
**/
static final String ACTIVE_KEY = "active";
/**
* document key that stores the comma-separated list of inactive instance
* ids (they might still have a backlog, that is handled in ClusterView
* though, never persisted
*/
static final String INACTIVE_KEY = "inactive";
/**
* document key that stores the comma-separated list of recovering instance
* ids
**/
static final String RECOVERING_KEY = "recovering";
/**
* document key that stores the date and time when this view was created -
* for debugging purpose only
**/
private static final String CREATED_KEY = "created";
/**
* document key that stores the id of the instance that created this view -
* for debugging purpose only
**/
private static final String CREATOR_KEY = "creator";
/**
* document key that stores the date and time when this was was retired -
* for debugging purpose only
**/
private static final String RETIRED_KEY = "retired";
/**
* document key that stores the id of the instance that retired this view -
* for debugging purpose only
**/
private static final String RETIRER_KEY = "retirer";
/**
* document key that stores a short, limited history of previous cluster
* views - for debugging purpose only
**/
private static final String CLUSTER_VIEW_HISTORY_KEY = "clusterViewHistory";
/** number of elements kept in the CLUSTERVIEW_HISTORY_KEY field **/
static final int HISTORY_LIMIT = 10;
/** the monotonically incrementing sequence number of this cluster view **/
private final long viewSeqNum;
/** the ids of instances that are active at this moment **/
private final Integer[] activeIds;
/**
* the ids of instances that are recovering (lastRev-recovery) at this
* moment
**/
private final Integer[] recoveringIds;
/** the ids of instances that are inactive at this moment **/
private final Integer[] inactiveIds;
/**
* the short, limited history of previous cluster views, for debugging only
**/
private final Map<Object, String> viewHistory;
/** the date+time at which this view was created, for debugging only **/
private final String createdAt;
/** the id of the instance that created this view, for debugging only **/
private final Long createdBy;
/**
* Main method by which the ClusterViewDocument is updated in the settings
* collection
*
* @return the resulting ClusterViewDocument as just updated in the settings
* collection - or null if another instance was updating the
* clusterview concurrently (in which case the caller should re-read
* first and possibly re-update if needed)
*/
static ClusterViewDocument readOrUpdate(DocumentNodeStore documentNodeStore, Set<Integer> activeIds, Set<Integer> recoveringIds,
Set<Integer> inactiveIds) {
logger.trace("readOrUpdate: expected: activeIds: {}, recoveringIds: {}, inactiveIds: {}", activeIds, recoveringIds,
inactiveIds);
if (activeIds == null || activeIds.size() == 0) {
logger.info("readOrUpdate: activeIds must not be null or empty");
throw new IllegalStateException("activeIds must not be null or empty");
}
int localClusterId = documentNodeStore.getClusterId();
final ClusterViewDocument previousView = doRead(documentNodeStore);
if (previousView != null) {
if (previousView.matches(activeIds, recoveringIds, inactiveIds)) {
logger.trace("readOrUpdate: view unchanged, returning: {}", previousView);
return previousView;
}
}
logger.trace(
"readOrUpdate: view change detected, going to update from {} to activeIds: {}, recoveringIds: {}, inactiveIds: {}",
previousView, activeIds, recoveringIds, inactiveIds);
UpdateOp updateOp = new UpdateOp(CLUSTERVIEW_DOC_ID, true);
Date now = new Date();
updateOp.set(ACTIVE_KEY, setToCsv(activeIds));
updateOp.set(RECOVERING_KEY, setToCsv(recoveringIds));
updateOp.set(INACTIVE_KEY, setToCsv(inactiveIds));
updateOp.set(CREATED_KEY, ISO8601.format(now));
updateOp.set(CREATOR_KEY, (long)localClusterId);
if (previousView != null) {
Map<Object, String> previousHistory = previousView.getHistory();
if (previousHistory!=null) {
Map<Object, String> mapClone = new HashMap<Object, String>(previousHistory);
while(mapClone.size()>=HISTORY_LIMIT) {
Revision oldestRevision = oldestRevision(mapClone);
if (oldestRevision==null) {
break;
}
updateOp.removeMapEntry(CLUSTER_VIEW_HISTORY_KEY, oldestRevision);
if (mapClone.remove(oldestRevision)==null) {
// prevent an endless loop
break;
}
}
}
updateOp.setMapEntry(CLUSTER_VIEW_HISTORY_KEY, Revision.newRevision(localClusterId), asHistoryEntry(previousView, localClusterId, now));
}
final Long newViewSeqNum;
if (previousView == null) {
// we are the first ever, looks like, that the clusterview is
// defined
// so we can use viewId==1 and we make sure no other cluster node
// tries to create this first one simultaneously - so we use
// 'create'
ArrayList<UpdateOp> updateOps = new ArrayList<UpdateOp>();
newViewSeqNum = 1L;
updateOp.setNew(true); // paranoia as that's already set above
updateOp.set(VIEW_SEQ_NUM_KEY, newViewSeqNum);
updateOps.add(updateOp);
logger.debug("updateAndRead: trying to create the first ever clusterView - hence {}={}", VIEW_SEQ_NUM_KEY,
newViewSeqNum);
if (!documentNodeStore.getDocumentStore().create(Collection.SETTINGS, updateOps)) {
logger.debug("updateAndRead: someone else just created the first view ever while I tried - reread that one later");
return null;
}
} else {
// there were earlier clusterViews (the normal case) - thus we
// use 'findAndUpdate' with the condition that
// the view id is still at the previousview one
Long previousViewSeqNum = previousView.getViewSeqNum();
updateOp.setNew(false); // change to false from true above
updateOp.equals(VIEW_SEQ_NUM_KEY, null, previousViewSeqNum);
newViewSeqNum = previousViewSeqNum + 1;
updateOp.set(VIEW_SEQ_NUM_KEY, newViewSeqNum);
logger.debug("updateAndRead: trying to update the clusterView to {}={} ", VIEW_SEQ_NUM_KEY, newViewSeqNum);
if (documentNodeStore.getDocumentStore().findAndUpdate(Collection.SETTINGS, updateOp) == null) {
logger.debug(
"updateAndRead: someone else just updated the view which I wanted to do as well - reread that one later");
return null;
}
}
// whatever the outcome of the above - we don't care -
// re-reading will in any case definitely show what has been persisted
// and if the re-read view contains the same id, it is what we have
// written
// - otherwise someone else came in between and we have to step back and
// retry
ClusterViewDocument readResult = doRead(documentNodeStore);
if (readResult == null) {
logger.debug("updateAndRead: got null from read - whatever the exact reason, we must retry in a moment.");
return null;
} else if (newViewSeqNum.equals(readResult.getViewSeqNum())) {
logger.debug("updateAndRead: matching view - no change");
return readResult;
} else {
logger.debug("updateAndRead: someone else in the cluster was updating right after I also succeeded - re-read in a bit");
return null;
}
}
/**
* Determine the oldest history entry (to be removed to keep history
* limited to HISTORY_LIMIT)
*
* @param historyMap
*/
private static Revision oldestRevision(Map<Object, String> historyMap) {
String oldestRevision = null;
for (Iterator<Object> it = historyMap.keySet().iterator(); it.hasNext();) {
Object obj = it.next();
// obj can be a String or a Revision
// in case of it being a Revision the toString() will
// be appropriate, hence:
String r = obj.toString();
if (oldestRevision == null) {
oldestRevision = r;
} else if (Revision.getTimestampDifference(Revision.fromString(r), Revision.fromString(oldestRevision)) < 0) {
oldestRevision = r;
}
}
if (oldestRevision==null) {
return null;
} else {
return Revision.fromString(oldestRevision);
}
}
/** Converts a previous clusterView document into a history 'string' **/
private static String asHistoryEntry(final ClusterViewDocument previousView, int retiringClusterNodeId, Date retireTime) {
if (previousView==null) {
throw new IllegalArgumentException("previousView must not be null");
}
String h;
JsopBuilder b = new JsopBuilder();
b.object();
b.key(VIEW_SEQ_NUM_KEY);
b.value(previousView.getViewSeqNum());
b.key(CREATED_KEY);
b.value(String.valueOf(previousView.getCreatedAt()));
b.key(CREATOR_KEY);
b.value(previousView.getCreatedBy());
b.key(RETIRED_KEY);
b.value(String.valueOf(ISO8601.format(retireTime)));
b.key(RETIRER_KEY);
b.value(retiringClusterNodeId);
b.key(ACTIVE_KEY);
b.value(setToCsv(previousView.getActiveIds()));
b.key(RECOVERING_KEY);
b.value(setToCsv(previousView.getRecoveringIds()));
b.key(INACTIVE_KEY);
b.value(setToCsv(previousView.getInactiveIds()));
b.endObject();
h = b.toString();
return h;
}
/**
* helper method to convert a set to a comma-separated string (without using
* toString() for safety)
*
* @return null if set is null or empty, comma-separated string (no spaces)
* otherwise
*/
private static String setToCsv(Set<Integer> ids) {
if (ids == null || ids.size() == 0) {
return null;
}
StringBuffer sb = new StringBuffer();
for (Integer id : ids) {
if (sb.length() != 0) {
sb.append(",");
}
sb.append(id);
}
return sb.toString();
}
/**
* helper method to convert an array to a comma-separated string
*
* @return null if array is null or empty, comman-separated string (no
* spaces) otherwise
*/
static String arrayToCsv(Integer[] arr) {
if (arr == null || arr.length == 0) {
return null;
}
StringBuffer sb = new StringBuffer();
for (Integer a : arr) {
if (sb.length() != 0) {
sb.append(",");
}
sb.append(a);
}
return sb.toString();
}
/**
* inverse helper method which converts a comma-separated string into an
* integer array
**/
static Integer[] csvToIntegerArray(String csv) {
if (csv == null) {
return null;
}
String[] split = csv.split(",");
Integer[] result = new Integer[split.length];
for (int i = 0; i < split.length; i++) {
result[i] = Integer.parseInt(split[i]);
}
return result;
}
/**
* internal reader of an existing clusterView document from the settings
* collection
**/
private static ClusterViewDocument doRead(DocumentNodeStore documentNodeStore) {
DocumentStore documentStore = documentNodeStore.getDocumentStore();
Document doc = documentStore.find(Collection.SETTINGS, "clusterView",
-1 /* -1; avoid caching */);
if (doc == null) {
return null;
} else {
ClusterViewDocument clusterView = new ClusterViewDocument(doc);
if (clusterView.isValid()) {
return clusterView;
} else {
logger.warn("read: clusterView document is not valid: " + doc.format());
return null;
}
}
}
/** comparison helper that compares an integer array with a set **/
static boolean matches(Integer[] expected, Set<Integer> actual) {
boolean expectedIsEmpty = expected == null || expected.length == 0;
boolean actualIsEmpty = actual == null || actual.size() == 0;
if (expectedIsEmpty && actualIsEmpty) {
// if both are null or empty, they match
return true;
}
if (expectedIsEmpty != actualIsEmpty) {
// if one of them is only empty, but the other not, then they don't
// match
return false;
}
if (expected.length != actual.size()) {
// different size
return false;
}
for (int i = 0; i < expected.length; i++) {
Integer aMemberId = expected[i];
if (!actual.contains(aMemberId)) {
return false;
}
}
return true;
}
@SuppressWarnings("unchecked")
ClusterViewDocument(Document doc) {
if (doc == null) {
throw new IllegalArgumentException("doc must not be null");
}
this.viewSeqNum = (Long) doc.get(VIEW_SEQ_NUM_KEY);
this.createdAt = (String) doc.get(CREATED_KEY);
Object creatorId = doc.get(CREATOR_KEY);
if (creatorId instanceof Number) {
this.createdBy = ((Number) creatorId).longValue();
} else if (creatorId == null) {
this.createdBy = -1L;
} else {
throw new IllegalStateException("Unsupported type of creator: "+creatorId);
}
Object obj = doc.get(ACTIVE_KEY);
if (obj == null || !(obj instanceof String)) {
logger.trace("<init>: {} : {}", ACTIVE_KEY, obj);
this.activeIds = new Integer[0];
} else {
this.activeIds = csvToIntegerArray((String) obj);
}
Object obj2 = doc.get(RECOVERING_KEY);
if (obj2 == null || !(obj2 instanceof String)) {
logger.trace("<init>: {} : {}", RECOVERING_KEY, obj2);
this.recoveringIds = new Integer[0];
} else {
this.recoveringIds = csvToIntegerArray((String) obj2);
}
Object obj3 = doc.get(INACTIVE_KEY);
if (obj3 == null || !(obj3 instanceof String)) {
logger.trace("<init>: {} : {}", INACTIVE_KEY, obj3);
this.inactiveIds = new Integer[0];
} else {
this.inactiveIds = csvToIntegerArray((String) obj3);
}
Object obj4 = doc.get(CLUSTER_VIEW_HISTORY_KEY);
if (obj4 == null || !(obj4 instanceof Map)) {
logger.trace("<init> viewHistory is null");
this.viewHistory = null;
} else {
this.viewHistory = ((Map<Object, String>) obj4);
}
}
/** Returns the set of active ids of this cluster view **/
Set<Integer> getActiveIds() {
return new HashSet<Integer>(Arrays.asList(activeIds));
}
/** Returns the set of recovering ids of this cluster view **/
Set<Integer> getRecoveringIds() {
return new HashSet<Integer>(Arrays.asList(recoveringIds));
}
/** Returns the set of inactive ids of this cluster view **/
Set<Integer> getInactiveIds() {
return new HashSet<Integer>(Arrays.asList(inactiveIds));
}
/** Returns the history map **/
Map<Object, String> getHistory() {
return viewHistory;
}
@Override
public String toString() {
return "a ClusterView[valid=" + isValid() + ", viewSeqNum=" + viewSeqNum
+ ", activeIds=" + arrayToCsv(activeIds) + ", recoveringIds=" + arrayToCsv(recoveringIds) + ", inactiveIds="
+ arrayToCsv(inactiveIds) + "]";
}
boolean isValid() {
return viewSeqNum >= 0 && activeIds != null && activeIds.length > 0;
}
/**
* Returns the date+time when this view was created, for debugging purpose
* only
**/
String getCreatedAt() {
return createdAt;
}
/**
* Returns the id of the instance that created this view, for debugging
* purpose only
**/
long getCreatedBy() {
return createdBy;
}
/** Returns the monotonically incrementing sequenece number of this view **/
long getViewSeqNum() {
return viewSeqNum;
}
private boolean matches(Set<Integer> activeIds, Set<Integer> recoveringIds, Set<Integer> inactiveIds) {
if (!matches(this.activeIds, activeIds)) {
return false;
}
if (!matches(this.recoveringIds, recoveringIds)) {
return false;
}
if (!matches(this.inactiveIds, inactiveIds)) {
return false;
}
return true;
}
}