SOLR-13834: ZkController#getSolrCloudManager() now uses the same ZkStateReader instance instead of instantiating a new one

ZkController#getSolrCloudManager() created a new instance of ZkStateReader, thereby causing a mismatch in the
visibility of the cluster state and, as a result, undesired race conditions.
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 3418d17..7ffbe52 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -314,6 +314,9 @@
 
 * SOLR-13829: RecursiveEvaluator casts Continuous numbers to Discrete Numbers, causing mismatch (Trey Grainger, Joel Bernstein)
 
+* SOLR-13834: ZkController#getSolrCloudManager() created a new instance of ZkStateReader, thereby causing a mismatch in the
+  visibility of the cluster state and, as a result, undesired race conditions (Clay Goddard via Ishan Chattopadhyaya)
+
 Other Changes
 ----------------------
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 403f282..3e9e738 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -40,7 +40,6 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -59,6 +58,7 @@
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
 import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
+import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
 import org.apache.solr.cloud.overseer.OverseerAction;
@@ -746,7 +746,7 @@
       if (cloudManager != null) {
         return cloudManager;
       }
-      cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zkServerAddress), Optional.empty()).withSocketTimeout(30000).withConnectionTimeout(15000)
+      cloudSolrClient = new CloudSolrClient.Builder(new ZkClientClusterStateProvider(zkStateReader)).withSocketTimeout(30000).withConnectionTimeout(15000)
           .withHttpClient(cc.getUpdateShardHandler().getDefaultHttpClient())
           .withConnectionTimeout(15000).withSocketTimeout(30000).build();
       cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cloudSolrClient);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
index 08d27e5..bb1081d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -77,10 +77,13 @@
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
     }
 
-    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    //ZkStateReader zkStateReader = ocmh.zkStateReader;
     ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
     // wait for a while until we see the shard
-    ocmh.waitForNewShard(collectionName, sliceName);
+    //ocmh.waitForNewShard(collectionName, sliceName);
+    // wait for a while until we see the shard and update the local view of the cluster state
+    clusterState = ocmh.waitForNewShard(collectionName, sliceName);
+
     String async = message.getStr(ASYNC);
     ZkNodeProps addReplicasProps = new ZkNodeProps(
         COLLECTION_PROP, collectionName,
@@ -97,7 +100,8 @@
     if (async != null) addReplicasProps.getProperties().put(ASYNC, async);
     final NamedList addResult = new NamedList();
     try {
-      ocmh.addReplica(zkStateReader.getClusterState(), addReplicasProps, addResult, () -> {
+      //ocmh.addReplica(zkStateReader.getClusterState(), addReplicasProps, addResult, () -> {
+      ocmh.addReplica(clusterState, addReplicasProps, addResult, () -> {
         Object addResultFailure = addResult.get("failure");
         if (addResultFailure != null) {
           SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 64b0ef9..54b7f5b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -553,12 +553,14 @@
     throw new SolrException(ErrorCode.SERVER_ERROR, "Could not find coreNodeName");
   }
 
-  void waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException {
+  ClusterState waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException {
     log.debug("Waiting for slice {} of collection {} to be available", sliceName, collectionName);
     RTimer timer = new RTimer();
     int retryCount = 320;
     while (retryCount-- > 0) {
-      DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
+      ClusterState clusterState = zkStateReader.getClusterState();
+      DocCollection collection = clusterState.getCollection(collectionName);
+
       if (collection == null) {
         throw new SolrException(ErrorCode.SERVER_ERROR,
             "Unable to find collection: " + collectionName + " in clusterstate");
@@ -567,7 +569,7 @@
       if (slice != null) {
         log.debug("Waited for {}ms for slice {} of collection {} to be available",
             timer.getTime(), sliceName, collectionName);
-        return;
+        return clusterState;
       }
       Thread.sleep(1000);
     }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index da098af..32329ff 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -310,11 +310,8 @@
 
         ocmh.overseer.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
 
-        // wait until we are able to see the new shard in cluster state
-        ocmh.waitForNewShard(collectionName, subSlice);
-
-        // refresh cluster state
-        clusterState = zkStateReader.getClusterState();
+        // wait until we are able to see the new shard in cluster state and refresh the local view of the cluster state
+        clusterState = ocmh.waitForNewShard(collectionName, subSlice);
 
         log.debug("Adding first replica " + subShardName + " as part of slice " + subSlice + " of collection " + collectionName
             + " on " + nodeName);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 0b08780..24748ca 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -253,7 +253,14 @@
     public Builder(List<String> solrUrls) {
       this.solrUrls = solrUrls;
     }
-    
+
+    /**
+     * Provide an already created {@link ClusterStateProvider} instance; the given reference is stored as-is (no copy, no null check).
+     */
+    public Builder(ClusterStateProvider stateProvider) {
+      this.stateProvider = stateProvider;
+    }
+
     /**
      * Provide a series of ZK hosts which will be used when configuring {@link CloudSolrClient} instances.
      *