merging from 8x
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index 5356a9f..f241a7a 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -248,7 +248,7 @@
     Map<String,Object> props;
     Map<String,Slice> slices;
 
-    if ("true".equals(String.valueOf(objs.get(DocCollection.PER_REPLICA_STATE)))) {
+    if (Boolean.parseBoolean(String.valueOf(objs.get(DocCollection.PER_REPLICA_STATE)))) {
       log.info("a collection {} has per-replica state", name); // nocommit should be a debug
       //this collection has replica states stored outside
       ReplicaStatesProvider rsp = REPLICASTATES_PROVIDER.get();
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index c35c750..738383b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -178,9 +178,8 @@
       case PULL_REPLICAS:
       case TLOG_REPLICAS:
         return Integer.parseInt(o.toString());
-      case READ_ONLY:
-        return Boolean.parseBoolean(o.toString());
       case PER_REPLICA_STATE:
+      case READ_ONLY:
         return Boolean.parseBoolean(o.toString());
       case "snitch":
       default:
@@ -302,7 +301,7 @@
 
   @Override
   public String toString() {
-    return "DocCollection("+name+"/" + znode + "/" + znodeVersion
+    return "DocCollection("+name+"/" + znode + "/" + znodeVersion+" "
         + (perReplicaStates == null ? "": perReplicaStates.toString())+")="
         + toJSONString(this);
   }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
index d66b4c5..284f082 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
@@ -28,6 +28,7 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiConsumer;
+import java.util.function.Function;
 
 import org.apache.solr.cluster.api.SimpleMap;
 import org.apache.solr.common.MapWriter;
@@ -55,6 +56,8 @@
 public class PerReplicaStates implements ReflectMapWriter {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public static final char SEPARATOR = ':';
+  //no:of times to retry in case of a CAS failure
+  public static final int MAX_RETRIES = 5;
 
 
   @JsonProperty
@@ -113,13 +116,15 @@
    */
   public static void persist(WriteOps ops, String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
     List<Operation> operations = ops.get();
-    for (int i = 0; i < 10; i++) {
+    for (int i = 0; i < MAX_RETRIES; i++) {
       try {
         persist(operations, znode, zkClient);
         return;
       } catch (KeeperException.NodeExistsException | KeeperException.NoNodeException e) {
         //state is stale
-        log.info("stale state for {}. retrying...", znode);
+        if(log.isInfoEnabled()) {
+          log.info("stale state for {} , attempt: {}. retrying...", znode, i);
+        }
         operations = ops.get(PerReplicaStates.fetch(znode, zkClient, null));
       }
     }
@@ -130,28 +135,20 @@
    */
   public static void persist(List<Operation> operations, String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
     if (operations == null || operations.isEmpty()) return;
-    log.debug("Per-replica state being persisted for :{}, ops: {}", znode, operations);
+    if (log.isDebugEnabled()) {
+      log.debug("Per-replica state being persisted for : '{}', ops: {}", znode, operations);
+    }
 
     List<Op> ops = new ArrayList<>(operations.size());
     for (Operation op : operations) {
       //the state of the replica is being updated
       String path = znode + "/" + op.state.asString;
-      List<ACL> acls = zkClient.getZkACLProvider().getACLsToAdd(path);
       ops.add(op.typ == Operation.Type.ADD ?
-          Op.create(path, null, acls, CreateMode.PERSISTENT) :
+          Op.create(path, null, zkClient.getZkACLProvider().getACLsToAdd(path), CreateMode.PERSISTENT) :
           Op.delete(path, -1));
     }
     try {
       zkClient.multi(ops, true);
-      if (log.isDebugEnabled()) {
-        //nocommit
-        try {
-          Stat stat = zkClient.exists(znode, null, true);
-          log.debug("After update, cversion : {}", stat.getCversion());
-        } catch (Exception e) {
-        }
-
-      }
     } catch (KeeperException e) {
       log.error("multi op exception : " + e.getMessage() + zkClient.getChildren(znode, null, true));
       throw e;
@@ -352,10 +349,19 @@
    * Do not directly manipulate the per replica states as it can become difficult to debug them
    *
    */
-  public static abstract class WriteOps {
+  public static class WriteOps {
     private PerReplicaStates rs;
     List<Operation> ops;
     private boolean preOp = true;
+    final Function<PerReplicaStates, List<Operation>> fun;
+
+    WriteOps(Function<PerReplicaStates, List<Operation>> fun) {
+      this.fun = fun;
+    }
+
+    public PerReplicaStates getPerReplicaStates() {
+      return rs;
+    }
 
     /**
      * state of a replica is changed
@@ -363,100 +369,83 @@
      * @param newState the new state
      */
     public static WriteOps flipState(String replica, Replica.State newState, PerReplicaStates rs) {
-      return new WriteOps() {
-        @Override
-        protected List<Operation> refresh(PerReplicaStates rs) {
-          List<Operation> ops = new ArrayList<>(2);
-          State existing = rs.get(replica);
-          if (existing == null) {
-            ops.add(new Operation(Operation.Type.ADD, new State(replica, newState, Boolean.FALSE, 0)));
-          } else {
-            ops.add(new Operation(Operation.Type.ADD, new State(replica, newState, existing.isLeader, existing.version + 1)));
-            addDeleteStaleNodes(ops, existing);
-          }
-          if (log.isDebugEnabled()) {
-            log.debug("flipState on {}, {} -> {}, ops :{}", rs.path, replica, newState, ops);
-          }
-          return ops;
+      return new WriteOps(prs -> {
+        List<Operation> operations = new ArrayList<>(2);
+        State existing = rs.get(replica);
+        if (existing == null) {
+          operations.add(new Operation(Operation.Type.ADD, new State(replica, newState, Boolean.FALSE, 0)));
+        } else {
+          operations.add(new Operation(Operation.Type.ADD, new State(replica, newState, existing.isLeader, existing.version + 1)));
+          addDeleteStaleNodes(operations, existing);
         }
-      }.init(rs);
+        if (log.isDebugEnabled()) {
+          log.debug("flipState on {}, {} -> {}, ops :{}", rs.path, replica, newState, operations);
+        }
+        return operations;
+      }).init(rs);
     }
 
-    public PerReplicaStates getPerReplicaStates() {
-      return rs;
-    }
-
-
-    /**Switch a collection from/to perReplicaState=true
+    /**
+     * Switch a collection from/to perReplicaState=true
      */
-    public static WriteOps modifyCollection(DocCollection coll, boolean enable, PerReplicaStates prs) {
-      return new WriteOps() {
-        @Override
-        List<Operation> refresh(PerReplicaStates prs) {
-          return enable ? enable(coll) : disable(prs);
-        }
+    public static WriteOps modifyCollection(DocCollection coll, boolean enable, PerReplicaStates rs) {
+      return new WriteOps(prs -> enable ? enable(coll) : disable(prs)).init(rs);
 
-        List<Operation> enable(DocCollection coll) {
-          List<Operation> result = new ArrayList<>();
-          coll.forEachReplica((s, r) -> result.add(new Operation(Operation.Type.ADD, new State(r.getName(), r.getState(), r.isLeader(), 0))));
-          return result;
-        }
+    }
 
-        List<Operation> disable(PerReplicaStates prs) {
-          List<Operation> result = new ArrayList<>();
-          prs.states.forEachEntry((s, state) -> result.add(new Operation(Operation.Type.DELETE, state)));
-          return result;
-        }
-      }.init(prs);
+    private static List<Operation> enable(DocCollection coll) {
+      List<Operation> result = new ArrayList<>();
+      coll.forEachReplica((s, r) -> result.add(new Operation(Operation.Type.ADD, new State(r.getName(), r.getState(), r.isLeader(), 0))));
+      return result;
+    }
 
+    private static List<Operation> disable(PerReplicaStates prs) {
+      List<Operation> result = new ArrayList<>();
+      prs.states.forEachEntry((s, state) -> result.add(new Operation(Operation.Type.DELETE, state)));
+      return result;
     }
 
     /**
      * Flip the leader replica to a new one
      *
-     * @param allReplicas  allReplicas of the shard
-     * @param next next leader
+     * @param allReplicas allReplicas of the shard
+     * @param next        next leader
      */
     public static WriteOps flipLeader(Set<String> allReplicas, String next, PerReplicaStates rs) {
-      return new WriteOps() {
-
-        @Override
-        protected List<Operation> refresh(PerReplicaStates rs) {
-          List<Operation> ops = new ArrayList<>();
-          if (next != null) {
-            State st = rs.get(next);
-            if (st != null) {
-              if (!st.isLeader) {
-                ops.add(new Operation(Operation.Type.ADD, new State(st.replica, Replica.State.ACTIVE, Boolean.TRUE, st.version + 1)));
-                ops.add(new Operation(Operation.Type.DELETE, st));
-              }
-              //else do not do anything , that node is the leader
-            } else {
-              //there is no entry for the new leader.
-              //create one
-              ops.add(new Operation(Operation.Type.ADD, new State(next, Replica.State.ACTIVE, Boolean.TRUE, 0)));
+      return new WriteOps(prs -> {
+        List<Operation> ops = new ArrayList<>();
+        if (next != null) {
+          State st = rs.get(next);
+          if (st != null) {
+            if (!st.isLeader) {
+              ops.add(new Operation(Operation.Type.ADD, new State(st.replica, Replica.State.ACTIVE, Boolean.TRUE, st.version + 1)));
+              ops.add(new Operation(Operation.Type.DELETE, st));
             }
+            //else do not do anything , that node is the leader
+          } else {
+            //there is no entry for the new leader.
+            //create one
+            ops.add(new Operation(Operation.Type.ADD, new State(next, Replica.State.ACTIVE, Boolean.TRUE, 0)));
           }
-
-          //now go through all other replicas and unset previous leader
-          for (String r : allReplicas) {
-            State st = rs.get(r);
-            if (st == null) continue;//unlikely
-            if (!Objects.equals(r, next)) {
-              if (st.isLeader) {
-                //some other replica is the leader now. unset
-                ops.add(new Operation(Operation.Type.ADD, new State(st.replica, st.state, Boolean.FALSE, st.version + 1)));
-                ops.add(new Operation(Operation.Type.DELETE, st));
-              }
-            }
-          }
-          if (log.isDebugEnabled()) {
-            log.debug("flipLeader on:{}, {} -> {}, ops: {}", rs.path, allReplicas, next, ops);
-          }
-          return ops;
         }
 
-      }.init(rs);
+        //now go through all other replicas and unset previous leader
+        for (String r : allReplicas) {
+          State st = rs.get(r);
+          if (st == null) continue;//unlikely
+          if (!Objects.equals(r, next)) {
+            if (st.isLeader) {
+              //some other replica is the leader now. unset
+              ops.add(new Operation(Operation.Type.ADD, new State(st.replica, st.state, Boolean.FALSE, st.version + 1)));
+              ops.add(new Operation(Operation.Type.DELETE, st));
+            }
+          }
+        }
+        if (log.isDebugEnabled()) {
+          log.debug("flipLeader on:{}, {} -> {}, ops: {}", rs.path, allReplicas, next, ops);
+        }
+        return ops;
+      }).init(rs);
     }
 
     /**
@@ -465,75 +454,61 @@
      * @param replica name of the replica to be deleted
      */
     public static WriteOps deleteReplica(String replica, PerReplicaStates rs) {
-      return new WriteOps() {
-        @Override
-        protected List<Operation> refresh(PerReplicaStates rs) {
-          List<Operation> result;
-          if (rs == null) {
-            result = Collections.emptyList();
-          } else {
-            State state = rs.get(replica);
-            result = addDeleteStaleNodes(new ArrayList<>(), state);
-          }
-          return result;
+      return new WriteOps(prs -> {
+        List<Operation> result;
+        if (rs == null) {
+          result = Collections.emptyList();
+        } else {
+          State state = rs.get(replica);
+          result = addDeleteStaleNodes(new ArrayList<>(), state);
         }
-      }.init(rs);
+        return result;
+      }).init(rs);
     }
 
     public static WriteOps addReplica(String replica, Replica.State state, boolean isLeader, PerReplicaStates rs) {
-      return new WriteOps() {
-        @Override
-        protected List<Operation> refresh(PerReplicaStates rs) {
-          return singletonList(new Operation(Operation.Type.ADD,
-              new State(replica, state, isLeader, 0)));
-        }
-      }.init(rs);
+      return new WriteOps(perReplicaStates -> singletonList(new Operation(Operation.Type.ADD,
+          new State(replica, state, isLeader, 0)))).init(rs);
     }
 
     /**
      * mark a bunch of replicas as DOWN
      */
     public static WriteOps downReplicas(List<String> replicas, PerReplicaStates rs) {
-      return new WriteOps() {
-        @Override
-        List<Operation> refresh(PerReplicaStates rs) {
-          List<Operation> ops = new ArrayList<>();
-          for (String replica : replicas) {
-            State r = rs.get(replica);
-            if (r != null) {
-              if (r.state == Replica.State.DOWN && !r.isLeader) continue;
-              ops.add(new Operation(Operation.Type.ADD, new State(replica, Replica.State.DOWN, Boolean.FALSE, r.version + 1)));
-              addDeleteStaleNodes(ops, r);
-            } else {
-              ops.add(new Operation(Operation.Type.ADD, new State(replica, Replica.State.DOWN, Boolean.FALSE, 0)));
-            }
+      return new WriteOps(prs -> {
+        List<Operation> operations = new ArrayList<>();
+        for (String replica : replicas) {
+          State r = rs.get(replica);
+          if (r != null) {
+            if (r.state == Replica.State.DOWN && !r.isLeader) continue;
+            operations.add(new Operation(Operation.Type.ADD, new State(replica, Replica.State.DOWN, Boolean.FALSE, r.version + 1)));
+            addDeleteStaleNodes(operations, r);
+          } else {
+            operations.add(new Operation(Operation.Type.ADD, new State(replica, Replica.State.DOWN, Boolean.FALSE, 0)));
           }
-          if (log.isDebugEnabled()) {
-            log.debug("for coll: {} down replicas {}, ops {}", rs, replicas, ops);
-          }
-          return ops;
         }
-      }.init(rs);
+        if (log.isDebugEnabled()) {
+          log.debug("for coll: {} down replicas {}, ops {}", rs, replicas, operations);
+        }
+        return operations;
+      }).init(rs);
     }
 
     /**
-     * Just creates and deletes a summy entry so that the {@link Stat#getCversion()} of states.json
+     * Just creates and deletes a dummy entry so that the {@link Stat#getCversion()} of states.json
      * is updated
      */
     public static WriteOps touchChildren() {
-      WriteOps result = new WriteOps() {
-        @Override
-        List<Operation> refresh(PerReplicaStates rs) {
-          List<Operation> ops = new ArrayList<>();
-          State st = new State(".dummy." + System.nanoTime(), Replica.State.DOWN, Boolean.FALSE, 0);
-          ops.add(new Operation(Operation.Type.ADD, st));
-          ops.add(new Operation(Operation.Type.DELETE, st));
-          if (log.isDebugEnabled()) {
-            log.debug("touchChildren {}", ops);
-          }
-          return ops;
+      WriteOps result = new WriteOps(prs -> {
+        List<Operation> operations = new ArrayList<>();
+        State st = new State(".dummy." + System.nanoTime(), Replica.State.DOWN, Boolean.FALSE, 0);
+        operations.add(new Operation(Operation.Type.ADD, st));
+        operations.add(new Operation(Operation.Type.DELETE, st));
+        if (log.isDebugEnabled()) {
+          log.debug("touchChildren {}", operations);
         }
-      };
+        return operations;
+      });
       result.preOp = false;
       result.ops = result.refresh(null);
       return result;
@@ -564,12 +539,17 @@
     }
 
     /**
+     * This method should compute the set of ZK operations for a given action
+     * for instance, a state change may result in
      * if a multi operation fails because the state got modified from behind,
      * refresh the operation and try again
      *
      * @param prs The new state
      */
-    abstract List<Operation> refresh(PerReplicaStates prs);
+    List<Operation> refresh(PerReplicaStates prs) {
+      if (fun != null) return fun.apply(prs);
+      return null;
+    }
 
     @Override
     public String toString() {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 31dac96..f805237 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -1271,12 +1271,9 @@
         replicaStates = zkClient.getChildren(collectionPath, this, stat, true);
         PerReplicaStates newStates = new PerReplicaStates(collectionPath, stat.getCversion(), replicaStates);
         DocCollection oldState = watchedCollectionStates.get(coll);
-        DocCollection newState = null;
-        if (oldState != null) {
-          newState = oldState.copyWith(newStates);
-        } else {
-          newState = fetchCollectionState(coll, null);
-        }
+        final DocCollection newState = oldState != null ?
+                oldState.copyWith(newStates) :
+                fetchCollectionState(coll, null);
         updateWatchedCollection(coll, newState);
         synchronized (getUpdateLock()) {
           constructState(Collections.singleton(coll));