merging from 8x
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index 5356a9f..f241a7a 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -248,7 +248,7 @@
Map<String,Object> props;
Map<String,Slice> slices;
- if ("true".equals(String.valueOf(objs.get(DocCollection.PER_REPLICA_STATE)))) {
+ if (Boolean.parseBoolean(String.valueOf(objs.get(DocCollection.PER_REPLICA_STATE)))) {
log.info("a collection {} has per-replica state", name); // nocommit should be a debug
//this collection has replica states stored outside
ReplicaStatesProvider rsp = REPLICASTATES_PROVIDER.get();
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index c35c750..738383b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -178,9 +178,8 @@
case PULL_REPLICAS:
case TLOG_REPLICAS:
return Integer.parseInt(o.toString());
- case READ_ONLY:
- return Boolean.parseBoolean(o.toString());
case PER_REPLICA_STATE:
+ case READ_ONLY:
return Boolean.parseBoolean(o.toString());
case "snitch":
default:
@@ -302,7 +301,7 @@
@Override
public String toString() {
- return "DocCollection("+name+"/" + znode + "/" + znodeVersion
+ return "DocCollection("+name+"/" + znode + "/" + znodeVersion+" "
+ (perReplicaStates == null ? "": perReplicaStates.toString())+")="
+ toJSONString(this);
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
index d66b4c5..284f082 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
@@ -28,6 +28,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
+import java.util.function.Function;
import org.apache.solr.cluster.api.SimpleMap;
import org.apache.solr.common.MapWriter;
@@ -55,6 +56,8 @@
public class PerReplicaStates implements ReflectMapWriter {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final char SEPARATOR = ':';
+ // number of attempts made in case of a CAS failure
+ public static final int MAX_RETRIES = 5;
@JsonProperty
@@ -113,13 +116,15 @@
*/
public static void persist(WriteOps ops, String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
List<Operation> operations = ops.get();
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < MAX_RETRIES; i++) {
try {
persist(operations, znode, zkClient);
return;
} catch (KeeperException.NodeExistsException | KeeperException.NoNodeException e) {
//state is stale
- log.info("stale state for {}. retrying...", znode);
+ if(log.isInfoEnabled()) {
+ log.info("stale state for {} , attempt: {}. retrying...", znode, i);
+ }
operations = ops.get(PerReplicaStates.fetch(znode, zkClient, null));
}
}
@@ -130,28 +135,20 @@
*/
public static void persist(List<Operation> operations, String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
if (operations == null || operations.isEmpty()) return;
- log.debug("Per-replica state being persisted for :{}, ops: {}", znode, operations);
+ if (log.isDebugEnabled()) {
+ log.debug("Per-replica state being persisted for : '{}', ops: {}", znode, operations);
+ }
List<Op> ops = new ArrayList<>(operations.size());
for (Operation op : operations) {
//the state of the replica is being updated
String path = znode + "/" + op.state.asString;
- List<ACL> acls = zkClient.getZkACLProvider().getACLsToAdd(path);
ops.add(op.typ == Operation.Type.ADD ?
- Op.create(path, null, acls, CreateMode.PERSISTENT) :
+ Op.create(path, null, zkClient.getZkACLProvider().getACLsToAdd(path), CreateMode.PERSISTENT) :
Op.delete(path, -1));
}
try {
zkClient.multi(ops, true);
- if (log.isDebugEnabled()) {
- //nocommit
- try {
- Stat stat = zkClient.exists(znode, null, true);
- log.debug("After update, cversion : {}", stat.getCversion());
- } catch (Exception e) {
- }
-
- }
} catch (KeeperException e) {
log.error("multi op exception : " + e.getMessage() + zkClient.getChildren(znode, null, true));
throw e;
@@ -352,10 +349,19 @@
* Do not directly manipulate the per replica states as it can become difficult to debug them
*
*/
- public static abstract class WriteOps {
+ public static class WriteOps {
private PerReplicaStates rs;
List<Operation> ops;
private boolean preOp = true;
+ final Function<PerReplicaStates, List<Operation>> fun;
+
+ WriteOps(Function<PerReplicaStates, List<Operation>> fun) {
+ this.fun = fun;
+ }
+
+ public PerReplicaStates getPerReplicaStates() {
+ return rs;
+ }
/**
* state of a replica is changed
@@ -363,100 +369,83 @@
* @param newState the new state
*/
public static WriteOps flipState(String replica, Replica.State newState, PerReplicaStates rs) {
- return new WriteOps() {
- @Override
- protected List<Operation> refresh(PerReplicaStates rs) {
- List<Operation> ops = new ArrayList<>(2);
- State existing = rs.get(replica);
- if (existing == null) {
- ops.add(new Operation(Operation.Type.ADD, new State(replica, newState, Boolean.FALSE, 0)));
- } else {
- ops.add(new Operation(Operation.Type.ADD, new State(replica, newState, existing.isLeader, existing.version + 1)));
- addDeleteStaleNodes(ops, existing);
- }
- if (log.isDebugEnabled()) {
- log.debug("flipState on {}, {} -> {}, ops :{}", rs.path, replica, newState, ops);
- }
- return ops;
+ return new WriteOps(prs -> {
+ List<Operation> operations = new ArrayList<>(2);
+ State existing = prs.get(replica); // read the state handed to the lambda (not the captured rs) so a retry sees refreshed data
+ if (existing == null) {
+ operations.add(new Operation(Operation.Type.ADD, new State(replica, newState, Boolean.FALSE, 0)));
+ } else {
+ operations.add(new Operation(Operation.Type.ADD, new State(replica, newState, existing.isLeader, existing.version + 1)));
+ addDeleteStaleNodes(operations, existing);
}
- }.init(rs);
+ if (log.isDebugEnabled()) {
+ log.debug("flipState on {}, {} -> {}, ops :{}", prs.path, replica, newState, operations);
+ }
+ return operations;
+ }).init(rs);
}
- public PerReplicaStates getPerReplicaStates() {
- return rs;
- }
-
-
- /**Switch a collection from/to perReplicaState=true
+ /**
+ * Switch a collection to perReplicaState=true (enable: one ADD per replica) or away from it (disable: one DELETE per stored state)
*/
- public static WriteOps modifyCollection(DocCollection coll, boolean enable, PerReplicaStates prs) {
- return new WriteOps() {
- @Override
- List<Operation> refresh(PerReplicaStates prs) {
- return enable ? enable(coll) : disable(prs);
- }
+ public static WriteOps modifyCollection(DocCollection coll, boolean enable, PerReplicaStates rs) {
+ return new WriteOps(prs -> enable ? enable(coll) : disable(prs)).init(rs);
- List<Operation> enable(DocCollection coll) {
- List<Operation> result = new ArrayList<>();
- coll.forEachReplica((s, r) -> result.add(new Operation(Operation.Type.ADD, new State(r.getName(), r.getState(), r.isLeader(), 0))));
- return result;
- }
+ }
- List<Operation> disable(PerReplicaStates prs) {
- List<Operation> result = new ArrayList<>();
- prs.states.forEachEntry((s, state) -> result.add(new Operation(Operation.Type.DELETE, state)));
- return result;
- }
- }.init(prs);
+ private static List<Operation> enable(DocCollection coll) {
+ List<Operation> result = new ArrayList<>();
+ coll.forEachReplica((s, r) -> result.add(new Operation(Operation.Type.ADD, new State(r.getName(), r.getState(), r.isLeader(), 0))));
+ return result;
+ }
+ private static List<Operation> disable(PerReplicaStates prs) {
+ List<Operation> result = new ArrayList<>();
+ prs.states.forEachEntry((s, state) -> result.add(new Operation(Operation.Type.DELETE, state)));
+ return result;
}
/**
* Flip the leader replica to a new one
*
- * @param allReplicas allReplicas of the shard
- * @param next next leader
+ * @param allReplicas allReplicas of the shard
+ * @param next next leader
*/
public static WriteOps flipLeader(Set<String> allReplicas, String next, PerReplicaStates rs) {
- return new WriteOps() {
-
- @Override
- protected List<Operation> refresh(PerReplicaStates rs) {
- List<Operation> ops = new ArrayList<>();
- if (next != null) {
- State st = rs.get(next);
- if (st != null) {
- if (!st.isLeader) {
- ops.add(new Operation(Operation.Type.ADD, new State(st.replica, Replica.State.ACTIVE, Boolean.TRUE, st.version + 1)));
- ops.add(new Operation(Operation.Type.DELETE, st));
- }
- //else do not do anything , that node is the leader
- } else {
- //there is no entry for the new leader.
- //create one
- ops.add(new Operation(Operation.Type.ADD, new State(next, Replica.State.ACTIVE, Boolean.TRUE, 0)));
+ return new WriteOps(prs -> {
+ List<Operation> ops = new ArrayList<>();
+ if (next != null) {
+ State st = prs.get(next); // read the state handed to the lambda (not the captured rs) so a retry sees refreshed data
+ if (st != null) {
+ if (!st.isLeader) {
+ ops.add(new Operation(Operation.Type.ADD, new State(st.replica, Replica.State.ACTIVE, Boolean.TRUE, st.version + 1)));
+ ops.add(new Operation(Operation.Type.DELETE, st));
}
+ //else do not do anything , that node is the leader
+ } else {
+ //there is no entry for the new leader.
+ //create one
+ ops.add(new Operation(Operation.Type.ADD, new State(next, Replica.State.ACTIVE, Boolean.TRUE, 0)));
}
-
- //now go through all other replicas and unset previous leader
- for (String r : allReplicas) {
- State st = rs.get(r);
- if (st == null) continue;//unlikely
- if (!Objects.equals(r, next)) {
- if (st.isLeader) {
- //some other replica is the leader now. unset
- ops.add(new Operation(Operation.Type.ADD, new State(st.replica, st.state, Boolean.FALSE, st.version + 1)));
- ops.add(new Operation(Operation.Type.DELETE, st));
- }
- }
- }
- if (log.isDebugEnabled()) {
- log.debug("flipLeader on:{}, {} -> {}, ops: {}", rs.path, allReplicas, next, ops);
- }
- return ops;
}
- }.init(rs);
+ //now go through all other replicas and unset previous leader
+ for (String r : allReplicas) {
+ State st = prs.get(r); // same reason: look up each replica in the refreshed state
+ if (st == null) continue;//unlikely
+ if (!Objects.equals(r, next)) {
+ if (st.isLeader) {
+ //some other replica is the leader now. unset
+ ops.add(new Operation(Operation.Type.ADD, new State(st.replica, st.state, Boolean.FALSE, st.version + 1)));
+ ops.add(new Operation(Operation.Type.DELETE, st));
+ }
+ }
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("flipLeader on:{}, {} -> {}, ops: {}", prs.path, allReplicas, next, ops);
+ }
+ return ops;
+ }).init(rs);
}
/**
@@ -465,75 +454,61 @@
* @param replica name of the replica to be deleted
*/
public static WriteOps deleteReplica(String replica, PerReplicaStates rs) {
- return new WriteOps() {
- @Override
- protected List<Operation> refresh(PerReplicaStates rs) {
- List<Operation> result;
- if (rs == null) {
- result = Collections.emptyList();
- } else {
- State state = rs.get(replica);
- result = addDeleteStaleNodes(new ArrayList<>(), state);
- }
- return result;
+ return new WriteOps(prs -> {
+ List<Operation> result;
+ if (prs == null) { // test the state handed to the lambda (not the captured rs) so a retry sees refreshed data
+ result = Collections.emptyList();
+ } else {
+ State state = prs.get(replica);
+ result = addDeleteStaleNodes(new ArrayList<>(), state);
}
- }.init(rs);
+ return result;
+ }).init(rs);
}
public static WriteOps addReplica(String replica, Replica.State state, boolean isLeader, PerReplicaStates rs) {
- return new WriteOps() {
- @Override
- protected List<Operation> refresh(PerReplicaStates rs) {
- return singletonList(new Operation(Operation.Type.ADD,
- new State(replica, state, isLeader, 0)));
- }
- }.init(rs);
+ return new WriteOps(perReplicaStates -> singletonList(new Operation(Operation.Type.ADD,
+ new State(replica, state, isLeader, 0)))).init(rs);
}
/**
* mark a bunch of replicas as DOWN
*/
public static WriteOps downReplicas(List<String> replicas, PerReplicaStates rs) {
- return new WriteOps() {
- @Override
- List<Operation> refresh(PerReplicaStates rs) {
- List<Operation> ops = new ArrayList<>();
- for (String replica : replicas) {
- State r = rs.get(replica);
- if (r != null) {
- if (r.state == Replica.State.DOWN && !r.isLeader) continue;
- ops.add(new Operation(Operation.Type.ADD, new State(replica, Replica.State.DOWN, Boolean.FALSE, r.version + 1)));
- addDeleteStaleNodes(ops, r);
- } else {
- ops.add(new Operation(Operation.Type.ADD, new State(replica, Replica.State.DOWN, Boolean.FALSE, 0)));
- }
+ return new WriteOps(prs -> {
+ List<Operation> operations = new ArrayList<>();
+ for (String replica : replicas) {
+ State r = prs.get(replica); // read the state handed to the lambda (not the captured rs) so a retry sees refreshed data
+ if (r != null) {
+ if (r.state == Replica.State.DOWN && !r.isLeader) continue;
+ operations.add(new Operation(Operation.Type.ADD, new State(replica, Replica.State.DOWN, Boolean.FALSE, r.version + 1)));
+ addDeleteStaleNodes(operations, r);
+ } else {
+ operations.add(new Operation(Operation.Type.ADD, new State(replica, Replica.State.DOWN, Boolean.FALSE, 0)));
}
- if (log.isDebugEnabled()) {
- log.debug("for coll: {} down replicas {}, ops {}", rs, replicas, ops);
- }
- return ops;
}
- }.init(rs);
+ if (log.isDebugEnabled()) {
+ log.debug("for coll: {} down replicas {}, ops {}", prs, replicas, operations);
+ }
+ return operations;
+ }).init(rs);
}
/**
- * Just creates and deletes a summy entry so that the {@link Stat#getCversion()} of states.json
+ * Just creates and deletes a dummy entry so that the {@link Stat#getCversion()} of states.json
* is updated
*/
public static WriteOps touchChildren() {
- WriteOps result = new WriteOps() {
- @Override
- List<Operation> refresh(PerReplicaStates rs) {
- List<Operation> ops = new ArrayList<>();
- State st = new State(".dummy." + System.nanoTime(), Replica.State.DOWN, Boolean.FALSE, 0);
- ops.add(new Operation(Operation.Type.ADD, st));
- ops.add(new Operation(Operation.Type.DELETE, st));
- if (log.isDebugEnabled()) {
- log.debug("touchChildren {}", ops);
- }
- return ops;
+ WriteOps result = new WriteOps(prs -> {
+ List<Operation> operations = new ArrayList<>();
+ State st = new State(".dummy." + System.nanoTime(), Replica.State.DOWN, Boolean.FALSE, 0);
+ operations.add(new Operation(Operation.Type.ADD, st));
+ operations.add(new Operation(Operation.Type.DELETE, st));
+ if (log.isDebugEnabled()) {
+ log.debug("touchChildren {}", operations);
}
- };
+ return operations;
+ });
result.preOp = false;
result.ops = result.refresh(null);
return result;
@@ -564,12 +539,17 @@
}
/**
+ * Computes the list of ZK operations for this action by applying {@code fun} to the given state.
+ * For instance, a state change may result in an ADD operation carrying the new state.
* if a multi operation fails because the state got modified from behind,
* refresh the operation and try again
*
* @param prs The new state
*/
- abstract List<Operation> refresh(PerReplicaStates prs);
+ List<Operation> refresh(PerReplicaStates prs) {
+ if (fun != null) return fun.apply(prs);
+ return null; // no function was supplied, so there is nothing to compute
+ }
@Override
public String toString() {
@Override
public String toString() {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 31dac96..f805237 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -1271,12 +1271,9 @@
replicaStates = zkClient.getChildren(collectionPath, this, stat, true);
PerReplicaStates newStates = new PerReplicaStates(collectionPath, stat.getCversion(), replicaStates);
DocCollection oldState = watchedCollectionStates.get(coll);
- DocCollection newState = null;
- if (oldState != null) {
- newState = oldState.copyWith(newStates);
- } else {
- newState = fetchCollectionState(coll, null);
- }
+ final DocCollection newState = oldState != null ?
+ oldState.copyWith(newStates) :
+ fetchCollectionState(coll, null);
updateWatchedCollection(coll, newState);
synchronized (getUpdateLock()) {
constructState(Collections.singleton(coll));