SOLR-14354: Revert async HttpShardHandler requests for 8.x
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 3bfa47d..869f814 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -194,8 +194,6 @@
 Optimizations
 ---------------------
 
-* SOLR-14354: HttpShardHandler send requests in async (Cao Manh Dat).
-
 * SOLR-14610: ReflectMapWriter to use MethodHandle instead of old reflection (noble)
 
 * SOLR-13132: JSON Facet perf improvements to support "sweeping" collection of "relatedness()"
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index f4e4b1e..90c4e5d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -582,7 +582,7 @@
 
     ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
 
-    OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, getStateUpdateQueue(), adminPath, shardHandler.getShardHandlerFactory());
+    OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, getStateUpdateQueue(), adminPath, shardHandler.getShardHandlerFactory(), updateShardHandler.getDefaultHttpClient());
     overseerCollectionConfigSetProcessor = new OverseerCollectionConfigSetProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer);
     ccThread = new OverseerThread(ccTg, overseerCollectionConfigSetProcessor, "OverseerCollectionConfigSetProcessor-" + id);
     ccThread.setDaemon(true);
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java b/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
index d532f03..20e650a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
@@ -20,6 +20,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.http.client.HttpClient;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -28,6 +29,7 @@
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
 import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.handler.component.ShardHandlerFactory;
 import org.apache.solr.handler.component.ShardRequest;
@@ -51,11 +53,14 @@
 
   private ZkDistributedQueue stateUpdateQueue;
 
-  public OverseerNodePrioritizer(ZkStateReader zkStateReader, ZkDistributedQueue stateUpdateQueue, String adminPath, ShardHandlerFactory shardHandlerFactory) {
+  private HttpClient httpClient;
+
+  public OverseerNodePrioritizer(ZkStateReader zkStateReader, ZkDistributedQueue stateUpdateQueue, String adminPath, ShardHandlerFactory shardHandlerFactory, HttpClient httpClient) {
     this.zkStateReader = zkStateReader;
     this.adminPath = adminPath;
     this.shardHandlerFactory = shardHandlerFactory;
     this.stateUpdateQueue = stateUpdateQueue;
+    this.httpClient = httpClient;
   }
 
   public synchronized void prioritizeOverseerNodes(String overseerId) throws Exception {
@@ -103,7 +108,7 @@
 
   private void invokeOverseerOp(String electionNode, String op) {
     ModifiableSolrParams params = new ModifiableSolrParams();
-    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+    ShardHandler shardHandler = ((HttpShardHandlerFactory)shardHandlerFactory).getShardHandler(httpClient);
     params.set(CoreAdminParams.ACTION, CoreAdminAction.OVERSEEROP.toString());
     params.set("op", op);
     params.set("qt", adminPath);
diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
index c73e57b..5a1b8da 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
@@ -35,6 +35,7 @@
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
 import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.handler.component.ShardRequest;
 import org.apache.solr.handler.component.ShardResponse;
@@ -69,7 +70,7 @@
   public SyncStrategy(CoreContainer cc) {
     UpdateShardHandler updateShardHandler = cc.getUpdateShardHandler();
     client = updateShardHandler.getDefaultHttpClient();
-    shardHandler = cc.getShardHandlerFactory().getShardHandler();
+    shardHandler = ((HttpShardHandlerFactory)cc.getShardHandlerFactory()).getShardHandler(cc.getUpdateShardHandler().getDefaultHttpClient());
     updateExecutor = updateShardHandler.getUpdateExecutor();
   }
   
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
index b673e3f..e873669 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
@@ -170,7 +170,7 @@
     String backupName = request.getStr(NAME);
     String asyncId = request.getStr(ASYNC);
     String repoName = request.getStr(CoreAdminParams.BACKUP_REPOSITORY);
-    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
 
     String commitName = request.getStr(CoreAdminParams.COMMIT_NAME);
     Optional<CollectionSnapshotMetaData> snapshotMeta = Optional.empty();
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 702efaa..2d3e450 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -209,7 +209,7 @@
             collectionName, shardNames, message));
       }
       Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
-      ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+      ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
       for (ReplicaPosition replicaPosition : replicaPositions) {
         String nodeName = replicaPosition.node;
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
index e3d8ab5..a110952 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
@@ -96,7 +96,7 @@
     @SuppressWarnings({"rawtypes"})
     NamedList shardRequestResults = new NamedList();
     Map<String, Slice> shardByCoreName = new HashMap<>();
-    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
 
     final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
     for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
index ff168c4..c263203 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -232,7 +232,7 @@
               " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
     }
 
-    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
     String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
     String asyncId = message.getStr(ASYNC);
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
index 9e4388b..2f62139 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
@@ -77,7 +77,7 @@
     String asyncId = message.getStr(ASYNC);
     @SuppressWarnings({"rawtypes"})
     NamedList shardRequestResults = new NamedList();
-    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
     SolrZkClient zkClient = ocmh.zkStateReader.getZkClient();
 
     Optional<CollectionSnapshotMetaData> meta = SolrSnapshotManager.getCollectionLevelSnapshot(zkClient, collectionName, commitName);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
index 85bac4b..c41cb7f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
@@ -43,6 +43,7 @@
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
 import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.handler.component.ShardHandlerFactory;
 import org.apache.solr.update.SolrIndexSplitter;
@@ -160,7 +161,7 @@
     DocRouter.Range keyHashRange = sourceRouter.keyHashRange(splitKey);
 
     ShardHandlerFactory shardHandlerFactory = ocmh.shardHandlerFactory;
-    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+    ShardHandler shardHandler = ((HttpShardHandlerFactory)shardHandlerFactory).getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
 
     log.info("Hash range for split.key: {} is: {}", splitKey, keyHashRange);
     // intersect source range, keyHashRange and target range
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 720479a..94f1312 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -339,7 +339,7 @@
     sreq.shards = new String[] {baseUrl};
     sreq.actualShards = sreq.shards;
     sreq.params = params;
-    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
     shardHandler.submit(sreq, baseUrl, sreq.params);
   }
 
@@ -763,7 +763,7 @@
     log.info("Executing Collection Cmd={}, asyncId={}", params, asyncId);
     String collectionName = message.getStr(NAME);
     @SuppressWarnings("deprecation")
-    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
 
     ClusterState clusterState = zkStateReader.getClusterState();
     DocCollection coll = clusterState.getCollection(collectionName);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
index 2f31ead7..f314ebb 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
@@ -94,7 +94,7 @@
 
     String restoreCollectionName = message.getStr(COLLECTION_PROP);
     String backupName = message.getStr(NAME); // of backup
-    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
     String asyncId = message.getStr(ASYNC);
     String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index ecf9fed..c616051 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -211,7 +211,7 @@
       List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
 
       @SuppressWarnings("deprecation")
-      ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+      ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
 
 
       if (message.getBool(CommonAdminParams.SPLIT_BY_PREFIX, false)) {
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
index b6a8131..885a272 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
@@ -16,41 +16,30 @@
  */
 package org.apache.solr.handler.component;
 
-import java.util.HashMap;
+import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import io.opentracing.Span;
-import io.opentracing.Tracer;
-import io.opentracing.propagation.Format;
+import java.util.Set;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrResponse;
-import org.apache.solr.client.solrj.impl.LBHttp2SolrClient;
-import org.apache.solr.client.solrj.impl.LBSolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
-import org.apache.solr.client.solrj.util.Cancellable;
-import org.apache.solr.client.solrj.util.AsyncListener;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.annotation.SolrThreadUnsafe;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.ShardParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.request.SolrRequestInfo;
-import org.apache.solr.util.tracing.GlobalTracer;
-import org.apache.solr.util.tracing.SolrRequestCarrier;
 
 @SolrThreadUnsafe
 public class HttpShardHandler extends ShardHandler {
@@ -62,130 +51,33 @@
    */
   public static String ONLY_NRT_REPLICAS = "distribOnlyRealtime";
 
-  private HttpShardHandlerFactory httpShardHandlerFactory;
-  private Map<ShardResponse, Cancellable> responseCancellableMap;
-  private BlockingQueue<ShardResponse> responses;
-  private AtomicInteger pending;
-  private Map<String, List<String>> shardToURLs;
-  private LBHttp2SolrClient lbClient;
+  final HttpShardHandlerFactory httpShardHandlerFactory;
+  private CompletionService<ShardResponse> completionService;
+  private Set<Future<ShardResponse>> pending;
+  private Http2SolrClient httpClient;
 
-  public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory) {
+  public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory, Http2SolrClient httpClient) {
+    this.httpClient = httpClient;
     this.httpShardHandlerFactory = httpShardHandlerFactory;
-    this.lbClient = httpShardHandlerFactory.loadbalancer;
-    this.pending = new AtomicInteger(0);
-    this.responses = new LinkedBlockingQueue<>();
-    this.responseCancellableMap = new HashMap<>();
-
-    // maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
-    // This is primarily to keep track of what order we should use to query the replicas of a shard
-    // so that we use the same replica for all phases of a distributed request.
-    shardToURLs = new HashMap<>();
+    completionService = httpShardHandlerFactory.newCompletionService();
+    pending = new HashSet<>();
   }
 
 
-  private static class SimpleSolrResponse extends SolrResponse {
-
-    volatile long elapsedTime;
-
-    volatile NamedList<Object> nl;
-
-    @Override
-    public long getElapsedTime() {
-      return elapsedTime;
-    }
-
-    @Override
-    public NamedList<Object> getResponse() {
-      return nl;
-    }
-
-    @Override
-    public void setResponse(NamedList<Object> rsp) {
-      nl = rsp;
-    }
-
-    @Override
-    public void setElapsedTime(long elapsedTime) {
-      this.elapsedTime = elapsedTime;
-    }
-  }
-
-  // Not thread safe... don't use in Callable.
-  // Don't modify the returned URL list.
-  private List<String> getURLs(String shard) {
-    List<String> urls = shardToURLs.get(shard);
-    if (urls == null) {
-      urls = httpShardHandlerFactory.buildURLList(shard);
-      shardToURLs.put(shard, urls);
-    }
-    return urls;
-  }
-
   @Override
   public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
-    // do this outside of the callable for thread safety reasons
-    final List<String> urls = getURLs(shard);
-    final Tracer tracer = GlobalTracer.getTracer();
-    final Span span = tracer != null ? tracer.activeSpan() : null;
-
-    params.remove(CommonParams.WT); // use default (currently javabin)
-    params.remove(CommonParams.VERSION);
-    QueryRequest req = makeQueryRequest(sreq, params, shard);
-    req.setMethod(SolrRequest.METHOD.POST);
-
-    LBSolrClient.Req lbReq = httpShardHandlerFactory.newLBHttpSolrClientReq(req, urls);
-
-    ShardResponse srsp = new ShardResponse();
-    if (sreq.nodeName != null) {
-      srsp.setNodeName(sreq.nodeName);
+    ShardRequestor shardRequestor = new ShardRequestor(sreq, shard, params, this);
+    try {
+      shardRequestor.init();
+      pending.add(completionService.submit(shardRequestor));
+    } finally {
+      shardRequestor.end();
     }
-    srsp.setShardRequest(sreq);
-    srsp.setShard(shard);
-    SimpleSolrResponse ssr = new SimpleSolrResponse();
-    srsp.setSolrResponse(ssr);
+  }
 
-    pending.incrementAndGet();
-    // if there are no shards available for a slice, urls.size()==0
-    if (urls.size() == 0) {
-      // TODO: what's the right error code here? We should use the same thing when
-      // all of the servers for a shard are down.
-      SolrException exception = new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
-      srsp.setException(exception);
-      srsp.setResponseCode(exception.code());
-      responses.add(srsp);
-      return;
-    }
-
-    // all variables that set inside this listener must be at least volatile
-    responseCancellableMap.put(srsp, this.lbClient.asyncReq(lbReq, new AsyncListener<>() {
-      volatile long startTime = System.nanoTime();
-
-      @Override
-      public void onStart() {
-        if (tracer != null && span != null) {
-          tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new SolrRequestCarrier(req));
-        }
-        SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
-        if (requestInfo != null) req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
-      }
-
-      @Override
-      public void onSuccess(LBSolrClient.Rsp rsp) {
-        ssr.nl = rsp.getResponse();
-        srsp.setShardAddress(rsp.getServer());
-        ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-        responses.add(srsp);
-      }
-
-      public void onFailure(Throwable throwable) {
-        ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-        srsp.setException(throwable);
-        if (throwable instanceof SolrException) {
-          srsp.setResponseCode(((SolrException) throwable).code());
-        }
-        responses.add(srsp);
-      }
-    }));
+  protected NamedList<Object> request(String url, @SuppressWarnings({"rawtypes"})SolrRequest req) throws IOException, SolrServerException {
+    req.setBasePath(url);
+    return httpClient.request(req);
   }
 
   /**
@@ -223,12 +115,12 @@
   }
 
   private ShardResponse take(boolean bailOnError) {
-    try {
-      while (pending.get() > 0) {
-        ShardResponse rsp = responses.take();
-        responseCancellableMap.remove(rsp);
 
-        pending.decrementAndGet();
+    while (pending.size() > 0) {
+      try {
+        Future<ShardResponse> future = completionService.take();
+        pending.remove(future);
+        ShardResponse rsp = future.get();
         if (bailOnError && rsp.getException() != null) return rsp; // if exception, return immediately
         // add response to the response list... we do this after the take() and
         // not after the completion of "call" so we know when the last response
@@ -238,9 +130,13 @@
         if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
           return rsp;
         }
+      } catch (InterruptedException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+      } catch (ExecutionException e) {
+        // should be impossible... the problem with catching the exception
+        // at this level is we don't know what ShardRequest it applied to
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception", e);
       }
-    } catch (InterruptedException e) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
     }
     return null;
   }
@@ -248,11 +144,9 @@
 
   @Override
   public void cancelAll() {
-    for (Cancellable cancellable : responseCancellableMap.values()) {
-      cancellable.cancel();
-      pending.decrementAndGet();
+    for (Future<ShardResponse> future : pending) {
+      future.cancel(false);
     }
-    responseCancellableMap.clear();
   }
 
   @Override
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
index f97700a..9f290f4 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.handler.component;
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -27,6 +28,8 @@
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
@@ -34,8 +37,13 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.HttpClient;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.client.solrj.impl.HttpClientUtil;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.LBHttp2SolrClient;
 import org.apache.solr.client.solrj.impl.LBSolrClient;
 import org.apache.solr.client.solrj.routing.AffinityReplicaListTransformerFactory;
@@ -87,7 +95,7 @@
 
   protected volatile Http2SolrClient defaultClient;
   protected InstrumentedHttpListenerFactory httpListenerFactory;
-  protected LBHttp2SolrClient loadbalancer;
+  private LBHttp2SolrClient loadbalancer;
 
   int corePoolSize = 0;
   int maximumPoolSize = Integer.MAX_VALUE;
@@ -141,7 +149,27 @@
    */
   @Override
   public ShardHandler getShardHandler() {
-    return new HttpShardHandler(this);
+    return getShardHandler(defaultClient);
+  }
+
+  /**
+   * Get {@link ShardHandler} that uses custom http client.
+   */
+  public ShardHandler getShardHandler(final Http2SolrClient httpClient){
+    return new HttpShardHandler(this, httpClient);
+  }
+
+  @Deprecated
+  public ShardHandler getShardHandler(final HttpClient httpClient) {
+    // a little hack for backward-compatibility when we are moving from apache http client to jetty client
+    return new HttpShardHandler(this, null) {
+      @Override
+      protected NamedList<Object> request(String url, @SuppressWarnings({"rawtypes"})SolrRequest req) throws IOException, SolrServerException {
+        try (SolrClient client = new HttpSolrClient.Builder(url).withHttpClient(httpClient).build()) {
+          return client.request(req);
+        }
+      }
+    };
   }
 
   /**
@@ -288,7 +316,6 @@
     this.defaultClient = new Http2SolrClient.Builder()
         .connectionTimeout(connectionTimeout)
         .idleTimeout(soTimeout)
-        .withExecutor(commExecutor)
         .maxConnectionsPerHost(maxConnectionsPerHost).build();
     this.defaultClient.addListenerFactory(this.httpListenerFactory);
     this.loadbalancer = new LBHttp2SolrClient(defaultClient);
@@ -318,20 +345,32 @@
   @Override
   public void close() {
     try {
-      if (loadbalancer != null) {
-        loadbalancer.close();
-      }
+      ExecutorUtil.shutdownAndAwaitTermination(commExecutor);
     } finally {
       try {
+        if (loadbalancer != null) {
+          loadbalancer.close();
+        }
+      } finally {
         if (defaultClient != null) {
           IOUtils.closeQuietly(defaultClient);
         }
-      } finally {
-        ExecutorUtil.shutdownAndAwaitTermination(commExecutor);
       }
     }
   }
 
+  /**
+   * Makes a request to one or more of the given urls, using the configured load balancer.
+   *
+   * @param req The solr search request that should be sent through the load balancer
+   * @param urls The list of solr server urls to load balance across
+   * @return The response from the request
+   */
+  public LBSolrClient.Rsp makeLoadBalancedRequest(final QueryRequest req, List<String> urls)
+    throws SolrServerException, IOException {
+    return loadbalancer.request(newLBHttpSolrClientReq(req, urls));
+  }
+
   protected LBSolrClient.Req newLBHttpSolrClientReq(final QueryRequest req, List<String> urls) {
     int numServersToTry = (int)Math.floor(urls.size() * this.permittedLoadBalancerRequestsMaximumFraction);
     if (numServersToTry < this.permittedLoadBalancerRequestsMinimumAbsolute) {
@@ -378,6 +417,13 @@
   }
 
   /**
+   * Creates a new completion service for use by a single set of distributed requests.
+   */
+  public CompletionService<ShardResponse> newCompletionService() {
+    return new ExecutorCompletionService<>(commExecutor);
+  }
+
+  /**
    * Rebuilds the URL replacing the URL scheme of the passed URL with the
    * configured scheme replacement.If no scheme was configured, the passed URL's
    * scheme is left alone.
diff --git a/solr/core/src/java/org/apache/solr/handler/component/ShardRequestor.java b/solr/core/src/java/org/apache/solr/handler/component/ShardRequestor.java
new file mode 100644
index 0000000..c87f126
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/component/ShardRequestor.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import io.opentracing.Span;
+import io.opentracing.Tracer;
+import io.opentracing.propagation.Format;
+import java.net.ConnectException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.impl.LBSolrClient;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.request.SolrRequestInfo;
+import org.apache.solr.util.tracing.GlobalTracer;
+import org.apache.solr.util.tracing.SolrRequestCarrier;
+import org.slf4j.MDC;
+
+class ShardRequestor implements Callable<ShardResponse> {
+  private final ShardRequest sreq;
+  private final String shard;
+  private final ModifiableSolrParams params;
+  private final Tracer tracer;
+  private final Span span;
+  private final List<String> urls;
+  private final HttpShardHandler httpShardHandler;
+
+  // maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
+  // This is primarily to keep track of what order we should use to query the replicas of a shard
+  // so that we use the same replica for all phases of a distributed request.
+  private Map<String, List<String>> shardToURLs = new HashMap<>();
+
+  public ShardRequestor(ShardRequest sreq, String shard, ModifiableSolrParams params, HttpShardHandler httpShardHandler) {
+    this.sreq = sreq;
+    this.shard = shard;
+    this.params = params;
+    this.httpShardHandler = httpShardHandler;
+    // do this before call() for thread safety reasons
+    this.urls = getURLs(shard);
+    tracer = GlobalTracer.getTracer();
+    span = tracer != null ? tracer.activeSpan() : null;
+  }
+
+
+  // Not thread safe... don't use in Callable.
+  // Don't modify the returned URL list.
+  private List<String> getURLs(String shard) {
+    List<String> urls = shardToURLs.get(shard);
+    if (urls == null) {
+      urls = httpShardHandler.httpShardHandlerFactory.buildURLList(shard);
+      shardToURLs.put(shard, urls);
+    }
+    return urls;
+  }
+
+  void init() {
+    if (shard != null) {
+      MDC.put("ShardRequest.shards", shard);
+    }
+    if (urls != null && !urls.isEmpty()) {
+      MDC.put("ShardRequest.urlList", urls.toString());
+    }
+  }
+
+  void end() {
+    MDC.remove("ShardRequest.shards");
+    MDC.remove("ShardRequest.urlList");
+  }
+
+  @Override
+  public ShardResponse call() throws Exception {
+
+    ShardResponse srsp = new ShardResponse();
+    if (sreq.nodeName != null) {
+      srsp.setNodeName(sreq.nodeName);
+    }
+    srsp.setShardRequest(sreq);
+    srsp.setShard(shard);
+    SimpleSolrResponse ssr = new SimpleSolrResponse();
+    srsp.setSolrResponse(ssr);
+    long startTime = System.nanoTime();
+
+    try {
+      params.remove(CommonParams.WT); // use default (currently javabin)
+      params.remove(CommonParams.VERSION);
+
+      QueryRequest req = httpShardHandler.makeQueryRequest(sreq, params, shard);
+      if (tracer != null && span != null) {
+        tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new SolrRequestCarrier(req));
+      }
+      req.setMethod(SolrRequest.METHOD.POST);
+      SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
+      if (requestInfo != null) req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
+
+      // no need to set the response parser as binary is the defaultJab
+      // req.setResponseParser(new BinaryResponseParser());
+
+      // if there are no shards available for a slice, urls.size()==0
+      if (urls.size() == 0) {
+        // TODO: what's the right error code here? We should use the same thing when
+        // all of the servers for a shard are down.
+        throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
+      }
+
+      if (urls.size() <= 1) {
+        String url = urls.get(0);
+        srsp.setShardAddress(url);
+        ssr.nl = httpShardHandler.request(url, req);
+      } else {
+        LBSolrClient.Rsp rsp = httpShardHandler.httpShardHandlerFactory.makeLoadBalancedRequest(req, urls);
+        ssr.nl = rsp.getResponse();
+        srsp.setShardAddress(rsp.getServer());
+      }
+    } catch (ConnectException cex) {
+      srsp.setException(cex); //????
+    } catch (Exception th) {
+      srsp.setException(th);
+      if (th instanceof SolrException) {
+        srsp.setResponseCode(((SolrException) th).code());
+      } else {
+        srsp.setResponseCode(-1);
+      }
+    }
+
+    ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+
+    return httpShardHandler.transfomResponse(sreq, srsp, shard);
+  }
+
+  static class SimpleSolrResponse extends SolrResponse {
+
+    long elapsedTime;
+
+    NamedList<Object> nl;
+
+    @Override
+    public long getElapsedTime() {
+      return elapsedTime;
+    }
+
+    @Override
+    public NamedList<Object> getResponse() {
+      return nl;
+    }
+
+    @Override
+    public void setResponse(NamedList<Object> rsp) {
+      nl = rsp;
+    }
+
+    @Override
+    public void setElapsedTime(long elapsedTime) {
+      this.elapsedTime = elapsedTime;
+    }
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/handler/component/ShardResponse.java b/solr/core/src/java/org/apache/solr/handler/component/ShardResponse.java
index 9b4a66e..5da721c 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/ShardResponse.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/ShardResponse.java
@@ -22,9 +22,9 @@
   private ShardRequest req;
   private String shard;
   private String nodeName;
-  private volatile String shardAddress;  // the specific shard that this response was received from
+  private String shardAddress;  // the specific shard that this response was received from
   private int rspCode;
-  private volatile Throwable exception;
+  private Throwable exception;
   private SolrResponse rsp;
 
   @Override
diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java
index 6fb6626..f4ff08d 100644
--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
@@ -29,6 +29,7 @@
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Timer;
 import org.apache.http.NoHttpResponseException;
+import org.apache.http.client.HttpClient;
 import org.apache.http.conn.ConnectTimeoutException;
 import org.apache.lucene.util.BytesRef;
 import org.apache.solr.client.solrj.SolrServerException;
@@ -39,8 +40,8 @@
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.SolrInfoBean;
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
 import org.apache.solr.handler.component.ShardHandler;
-import org.apache.solr.handler.component.ShardHandlerFactory;
 import org.apache.solr.handler.component.ShardRequest;
 import org.apache.solr.handler.component.ShardResponse;
 import org.apache.solr.metrics.SolrMetricManager;
@@ -73,12 +74,13 @@
 
   private UpdateHandler uhandler;
   private UpdateLog ulog;
-  private ShardHandlerFactory shardHandlerFactory;
+  private HttpShardHandlerFactory shardHandlerFactory;
   private ShardHandler shardHandler;
   private List<SyncShardRequest> requests = new ArrayList<>();
 
   private final boolean cantReachIsSuccess;
   private final boolean doFingerprint;
+  private final HttpClient client;
   private final boolean onlyIfActive;
   private SolrCore core;
   private Updater updater;
@@ -110,13 +112,14 @@
     this.nUpdates = nUpdates;
     this.cantReachIsSuccess = cantReachIsSuccess;
     this.doFingerprint = doFingerprint && !("true".equals(System.getProperty("solr.disableFingerprint")));
+    this.client = core.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
     this.onlyIfActive = onlyIfActive;
     
     uhandler = core.getUpdateHandler();
     ulog = uhandler.getUpdateLog();
     // TODO: close
-    shardHandlerFactory = core.getCoreContainer().getShardHandlerFactory();
-    shardHandler = shardHandlerFactory.getShardHandler();
+    shardHandlerFactory = (HttpShardHandlerFactory) core.getCoreContainer().getShardHandlerFactory();
+    shardHandler = shardHandlerFactory.getShardHandler(client);
     this.updater = new Updater(msg(), core);
 
     core.getCoreMetricManager().registerMetricProducer(SolrInfoBean.Category.REPLICATION.toString(), this);
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index 84dce51..fa2ed4e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -40,6 +40,7 @@
 import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
 import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.cloud.Overseer.LeaderStatus;
 import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
@@ -150,7 +151,7 @@
         Overseer overseer,
         DistributedMap completedMap,
         DistributedMap failureMap) {
-      super(zkStateReader, myId, shardHandlerFactory, adminPath, new Stats(), overseer, new OverseerNodePrioritizer(zkStateReader, overseer.getStateUpdateQueue(), adminPath, shardHandlerFactory), workQueue, runningMap, completedMap, failureMap);
+      super(zkStateReader, myId, shardHandlerFactory, adminPath, new Stats(), overseer, new OverseerNodePrioritizer(zkStateReader, overseer.getStateUpdateQueue(), adminPath, shardHandlerFactory, null), workQueue, runningMap, completedMap, failureMap);
     }
     
     @Override
@@ -252,6 +253,8 @@
   
   protected Set<String> commonMocks(int liveNodesCount) throws Exception {
     when(shardHandlerFactoryMock.getShardHandler()).thenReturn(shardHandlerMock);
+    when(shardHandlerFactoryMock.getShardHandler(any(Http2SolrClient.class))).thenReturn(shardHandlerMock);
+    when(shardHandlerFactoryMock.getShardHandler(any(HttpClient.class))).thenReturn(shardHandlerMock);
     when(workQueueMock.peekTopN(anyInt(), any(), anyLong())).thenAnswer(invocation -> {
       Object result;
       int count = 0;
diff --git a/solr/core/src/test/org/apache/solr/security/hadoop/TestSolrCloudWithHadoopAuthPlugin.java b/solr/core/src/test/org/apache/solr/security/hadoop/TestSolrCloudWithHadoopAuthPlugin.java
index bfa3690..3043394 100644
--- a/solr/core/src/test/org/apache/solr/security/hadoop/TestSolrCloudWithHadoopAuthPlugin.java
+++ b/solr/core/src/test/org/apache/solr/security/hadoop/TestSolrCloudWithHadoopAuthPlugin.java
@@ -135,7 +135,7 @@
     CollectionAdminRequest.Delete deleteReq = CollectionAdminRequest.deleteCollection(collectionName);
     deleteReq.process(solrClient);
     AbstractDistribZkTestBase.waitForCollectionToDisappear(collectionName,
-        solrClient.getZkStateReader(), true,true, 330);
-    // cookie was used to avoid re-authentication
-    assertAuthMetricsMinimums(13, 8, 0, 5, 0, 0);  }
+        solrClient.getZkStateReader(), true, true, 330);
+    assertAuthMetricsMinimums(14, 8, 0, 6, 0, 0);
+  }
 }
diff --git a/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java b/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
index ab6cf83..0046c12 100644
--- a/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
+++ b/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
@@ -28,8 +28,6 @@
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.util.Cancellable;
-import org.apache.solr.client.solrj.util.AsyncListener;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.util.NamedList;
 
@@ -122,21 +120,21 @@
     return super.request(request, collection);
   }
 
-  @Override
-  public Cancellable asyncRequest(@SuppressWarnings({"rawtypes"}) SolrRequest request,
-                                  String collection, AsyncListener<NamedList<Object>> asyncListener) {
+  public NamedList<Object> request(@SuppressWarnings({"rawtypes"})SolrRequest request,
+                                   String collection, OnComplete onComplete)
+      throws SolrServerException, IOException {
     if (request instanceof UpdateRequest) {
       UpdateRequest ur = (UpdateRequest) request;
       // won't throw exception if request is DBQ
       if (ur.getDeleteQuery() != null && !ur.getDeleteQuery().isEmpty()) {
-        return super.asyncRequest(request, collection, asyncListener);
+        return super.request(request, collection, onComplete);
       }
     }
 
     if (exp != null) {
       if (oneExpPerReq) {
         if (reqGotException.contains(request)) {
-          return super.asyncRequest(request, collection, asyncListener);
+          return super.request(request, collection, onComplete);
         }
         else
           reqGotException.add(request);
@@ -145,12 +143,17 @@
       Exception e = exception();
       if (e instanceof IOException) {
         if (LuceneTestCase.random().nextBoolean()) {
-          e = new SolrServerException(e);
+          throw (IOException) e;
+        } else {
+          throw new SolrServerException(e);
         }
+      } else if (e instanceof SolrServerException) {
+        throw (SolrServerException) e;
+      } else {
+        throw new SolrServerException(e);
       }
-      asyncListener.onFailure(e);
     }
 
-    return super.asyncRequest(request, collection, asyncListener);
+    return super.request(request, collection, onComplete);
   }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index f9d546e..6f09278 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -40,6 +40,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -55,10 +56,8 @@
 import org.apache.solr.client.solrj.request.RequestWriter;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.request.V2Request;
-import org.apache.solr.client.solrj.util.Cancellable;
 import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.client.solrj.util.Constants;
-import org.apache.solr.client.solrj.util.AsyncListener;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.StringUtils;
 import org.apache.solr.common.params.CommonParams;
@@ -77,7 +76,9 @@
 import org.eclipse.jetty.client.ProtocolHandlers;
 import org.eclipse.jetty.client.api.Request;
 import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.api.Result;
 import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
+import org.eclipse.jetty.client.util.BufferingResponseListener;
 import org.eclipse.jetty.client.util.BytesContentProvider;
 import org.eclipse.jetty.client.util.FormContentProvider;
 import org.eclipse.jetty.client.util.InputStreamContentProvider;
@@ -97,8 +98,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteSolrException;
 import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteExecutionException;
+import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteSolrException;
 import static org.apache.solr.common.util.Utils.getObjectByPath;
 
 /**
@@ -135,8 +136,6 @@
    */
   private String serverBaseUrl;
   private boolean closeClient;
-  private ExecutorService executor;
-  private boolean shutdownExecutor;
 
   protected Http2SolrClient(String serverBaseUrl, Builder builder) {
     if (serverBaseUrl != null)  {
@@ -180,14 +179,8 @@
     HttpClient httpClient;
 
     BlockingArrayQueue<Runnable> queue = new BlockingArrayQueue<>(256, 256);
-    executor = builder.executor;
-    if (executor == null) {
-      this.executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(32,
-          256, 60, TimeUnit.SECONDS, queue, new SolrNamedThreadFactory("h2sc"));
-      shutdownExecutor = true;
-    } else {
-      shutdownExecutor = false;
-    }
+    ThreadPoolExecutor httpClientExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(32,
+        256, 60, TimeUnit.SECONDS, queue, new SolrNamedThreadFactory("h2sc"));
 
     SslContextFactory.Client sslContextFactory;
     boolean ssl;
@@ -218,7 +211,7 @@
       httpClient.setMaxConnectionsPerDestination(4);
     }
 
-    httpClient.setExecutor(this.executor);
+    httpClient.setExecutor(httpClientExecutor);
     httpClient.setStrictEventOrdering(false);
     httpClient.setConnectBlocking(true);
     httpClient.setFollowRedirects(false);
@@ -240,15 +233,14 @@
     asyncTracker.waitForComplete();
     if (closeClient) {
       try {
+        ExecutorService executor = (ExecutorService) httpClient.getExecutor();
         httpClient.setStopTimeout(1000);
         httpClient.stop();
+        ExecutorUtil.shutdownAndAwaitTermination(executor);
       } catch (Exception e) {
         throw new RuntimeException("Exception on closing client", e);
       }
     }
-    if (shutdownExecutor) {
-      ExecutorUtil.shutdownAndAwaitTermination(executor);
-    }
 
     assert ObjectReleaseTracker.release(this);
   }
@@ -370,100 +362,76 @@
     outStream.flush();
   }
 
-  private static final Exception CANCELLED_EXCEPTION = new Exception();
-  private static final Cancellable FAILED_MAKING_REQUEST_CANCELLABLE = () -> {};
-
-  public Cancellable asyncRequest(@SuppressWarnings({"rawtypes"}) SolrRequest solrRequest, String collection, AsyncListener<NamedList<Object>> asyncListener) {
-    Request req;
-    try {
-      req = makeRequest(solrRequest, collection);
-    } catch (SolrServerException | IOException e) {
-      asyncListener.onFailure(e);
-      return FAILED_MAKING_REQUEST_CANCELLABLE;
-    }
-    final ResponseParser parser = solrRequest.getResponseParser() == null
-        ? this.parser: solrRequest.getResponseParser();
-    req.onRequestQueued(asyncTracker.queuedListener)
-        .onComplete(asyncTracker.completeListener)
-        .send(new InputStreamResponseListener() {
-          @Override
-          public void onHeaders(Response response) {
-            super.onHeaders(response);
-            InputStreamResponseListener listener = this;
-            executor.execute(() -> {
-              InputStream is = listener.getInputStream();
-              assert ObjectReleaseTracker.track(is);
-              try {
-                NamedList<Object> body = processErrorsAndResponse(solrRequest, parser, response, is);
-                asyncListener.onSuccess(body);
-              } catch (RemoteSolrException e) {
-                if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) {
-                  asyncListener.onFailure(e);
-                }
-              } catch (SolrServerException e) {
-                asyncListener.onFailure(e);
-              }
-            });
-          }
-
-          @Override
-          public void onFailure(Response response, Throwable failure) {
-            super.onFailure(response, failure);
-            if (failure != CANCELLED_EXCEPTION) {
-              asyncListener.onFailure(new SolrServerException(failure.getMessage(), failure));
-            }
-          }
-        });
-    return () -> req.abort(CANCELLED_EXCEPTION);
-  }
-
-  @Override
-  public NamedList<Object> request(@SuppressWarnings({"rawtypes"}) SolrRequest solrRequest, String collection) throws SolrServerException, IOException {
+  public NamedList<Object> request(@SuppressWarnings({"rawtypes"})SolrRequest solrRequest,
+                                      String collection,
+                                      OnComplete onComplete) throws IOException, SolrServerException {
     Request req = makeRequest(solrRequest, collection);
     final ResponseParser parser = solrRequest.getResponseParser() == null
         ? this.parser: solrRequest.getResponseParser();
 
-    try {
-      InputStreamResponseListener listener = new InputStreamResponseListener();
-      req.send(listener);
-      Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS);
-      InputStream is = listener.getInputStream();
-      assert ObjectReleaseTracker.track(is);
+    if (onComplete != null) {
+      // This async call only suitable for indexing since the response size is limited by 5MB
+      req.onRequestQueued(asyncTracker.queuedListener)
+          .onComplete(asyncTracker.completeListener).send(new BufferingResponseListener(5 * 1024 * 1024) {
 
-      return processErrorsAndResponse(solrRequest, parser, response, is);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    } catch (TimeoutException e) {
-      throw new SolrServerException(
-          "Timeout occured while waiting response from server at: " + req.getURI(), e);
-    } catch (ExecutionException e) {
-      Throwable cause = e.getCause();
-      if (cause instanceof ConnectException) {
-        throw new SolrServerException("Server refused connection at: " + req.getURI(), cause);
-      }
-      if (cause instanceof SolrServerException) {
-        throw (SolrServerException) cause;
-      } else if (cause instanceof IOException) {
+        @Override
+        public void onComplete(Result result) {
+          if (result.isFailed()) {
+            onComplete.onFailure(result.getFailure());
+            return;
+          }
+
+          NamedList<Object> rsp;
+          try {
+            InputStream is = getContentAsInputStream();
+            assert ObjectReleaseTracker.track(is);
+            rsp = processErrorsAndResponse(result.getResponse(),
+                parser, is, getMediaType(), getEncoding(), isV2ApiRequest(solrRequest));
+            onComplete.onSuccess(rsp);
+          } catch (Exception e) {
+            onComplete.onFailure(e);
+          }
+        }
+      });
+      return null;
+    } else {
+      try {
+        InputStreamResponseListener listener = new InputStreamResponseListener();
+        req.send(listener);
+        Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS);
+        InputStream is = listener.getInputStream();
+        assert ObjectReleaseTracker.track(is);
+
+        ContentType contentType = getContentType(response);
+        String mimeType = null;
+        String encoding = null;
+        if (contentType != null) {
+          mimeType = contentType.getMimeType();
+          encoding = contentType.getCharset() != null? contentType.getCharset().name() : null;
+        }
+        return processErrorsAndResponse(response, parser, is, mimeType, encoding, isV2ApiRequest(solrRequest));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } catch (TimeoutException e) {
         throw new SolrServerException(
-            "IOException occured when talking to server at: " + getBaseURL(), cause);
+            "Timeout occured while waiting response from server at: " + req.getURI(), e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof ConnectException) {
+          throw new SolrServerException("Server refused connection at: " + req.getURI(), cause);
+        }
+        if (cause instanceof SolrServerException) {
+          throw (SolrServerException) cause;
+        } else if (cause instanceof IOException) {
+          throw new SolrServerException(
+              "IOException occured when talking to server at: " + getBaseURL(), cause);
+        }
+        throw new SolrServerException(cause.getMessage(), cause);
       }
-      throw new SolrServerException(cause.getMessage(), cause);
     }
   }
 
-  private NamedList<Object> processErrorsAndResponse(@SuppressWarnings({"rawtypes"})SolrRequest solrRequest,
-                                                     ResponseParser parser, Response response, InputStream is) throws SolrServerException {
-    ContentType contentType = getContentType(response);
-    String mimeType = null;
-    String encoding = null;
-    if (contentType != null) {
-      mimeType = contentType.getMimeType();
-      encoding = contentType.getCharset() != null? contentType.getCharset().name() : null;
-    }
-    return processErrorsAndResponse(response, parser, is, mimeType, encoding, isV2ApiRequest(solrRequest));
-  }
-
   private ContentType getContentType(Response response) {
     String contentType = response.getHeaders().get(HttpHeader.CONTENT_TYPE);
     return StringUtils.isEmpty(contentType)? null : ContentType.parse(contentType);
@@ -486,7 +454,6 @@
 
   private void decorateRequest(Request req, @SuppressWarnings({"rawtypes"})SolrRequest solrRequest) {
     req.header(HttpHeader.ACCEPT_ENCODING, null);
-    req.timeout(idleTimeout, TimeUnit.MILLISECONDS);
     if (solrRequest.getUserPrincipal() != null) {
       req.attribute(REQ_PRINCIPAL_KEY, solrRequest.getUserPrincipal());
     }
@@ -789,10 +756,21 @@
     }
   }
 
+  @Override
+  public NamedList<Object> request(@SuppressWarnings({"rawtypes"})SolrRequest request, String collection) throws SolrServerException, IOException {
+    return request(request, collection, null);
+  }
+
   public void setRequestWriter(RequestWriter requestWriter) {
     this.requestWriter = requestWriter;
   }
 
+  public interface OnComplete {
+    void onSuccess(NamedList<Object> result);
+
+    void onFailure(Throwable e);
+  }
+
   public void setFollowRedirects(boolean follow) {
     httpClient.setFollowRedirects(follow);
   }
@@ -849,7 +827,6 @@
     private Integer maxConnectionsPerHost;
     private boolean useHttp1_1 = Boolean.getBoolean("solr.http1");
     protected String baseSolrUrl;
-    private ExecutorService executor;
 
     public Builder() {
 
@@ -871,11 +848,6 @@
       return this;
     }
 
-    public Builder withExecutor(ExecutorService executor) {
-      this.executor = executor;
-      return this;
-    }
-
     public Builder withSSLConfig(SSLConfig sslConfig) {
       this.sslConfig = sslConfig;
       return this;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
index 8898092..ae7871a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
@@ -16,24 +16,9 @@
  */
 package org.apache.solr.client.solrj.impl;
 
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
 import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.request.IsUpdateRequest;
-import org.apache.solr.client.solrj.util.Cancellable;
-import org.apache.solr.client.solrj.util.AsyncListener;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.util.NamedList;
-import org.slf4j.MDC;
-
-import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
 
 /**
  * LBHttp2SolrClient or "LoadBalanced LBHttp2SolrClient" is a load balancing wrapper around
@@ -81,126 +66,4 @@
   protected SolrClient getClient(String baseUrl) {
     return httpClient;
   }
-
-  public Cancellable asyncReq(Req req, AsyncListener<Rsp> asyncListener) {
-    Rsp rsp = new Rsp();
-    boolean isNonRetryable = req.request instanceof IsUpdateRequest || ADMIN_PATHS.contains(req.request.getPath());
-    ServerIterator it = new ServerIterator(req, zombieServers);
-    asyncListener.onStart();
-    final AtomicBoolean cancelled = new AtomicBoolean(false);
-    AtomicReference<Cancellable> currentCancellable = new AtomicReference<>();
-    RetryListener retryListener = new RetryListener() {
-
-      @Override
-      public void onSuccess(Rsp rsp) {
-        asyncListener.onSuccess(rsp);
-      }
-
-      @Override
-      public void onFailure(Exception e, boolean retryReq) {
-        if (retryReq) {
-          String url;
-          try {
-            url = it.nextOrError(e);
-          } catch (SolrServerException ex) {
-            asyncListener.onFailure(e);
-            return;
-          }
-          try {
-            MDC.put("LBSolrClient.url", url);
-            synchronized (cancelled) {
-              if (cancelled.get()) {
-                return;
-              }
-              Cancellable cancellable = doRequest(url, req, rsp, isNonRetryable, it.isServingZombieServer(), this);
-              currentCancellable.set(cancellable);
-            }
-          } finally {
-            MDC.remove("LBSolrClient.url");
-          }
-        } else {
-          asyncListener.onFailure(e);
-        }
-      }
-    };
-    try {
-      Cancellable cancellable = doRequest(it.nextOrError(), req, rsp, isNonRetryable, it.isServingZombieServer(), retryListener);
-      currentCancellable.set(cancellable);
-    } catch (SolrServerException e) {
-      asyncListener.onFailure(e);
-    }
-    return () -> {
-      synchronized (cancelled) {
-        cancelled.set(true);
-        if (currentCancellable.get() != null) {
-          currentCancellable.get().cancel();
-        }
-      }
-    };
-  }
-
-  private interface RetryListener {
-    void onSuccess(Rsp rsp);
-    void onFailure(Exception e, boolean retryReq);
-  }
-
-  private Cancellable doRequest(String baseUrl, Req req, Rsp rsp, boolean isNonRetryable,
-                         boolean isZombie, RetryListener listener) {
-    rsp.server = baseUrl;
-    req.getRequest().setBasePath(baseUrl);
-    return ((Http2SolrClient)getClient(baseUrl)).asyncRequest(req.getRequest(), null, new AsyncListener<>() {
-      @Override
-      public void onSuccess(NamedList<Object> result) {
-        rsp.rsp = result;
-        if (isZombie) {
-          zombieServers.remove(baseUrl);
-        }
-        listener.onSuccess(rsp);
-      }
-
-      @Override
-      public void onFailure(Throwable oe) {
-        try {
-          throw (Exception) oe;
-        } catch (BaseHttpSolrClient.RemoteExecutionException e) {
-          listener.onFailure(e, false);
-        } catch (SolrException e) {
-          // we retry on 404 or 403 or 503 or 500
-          // unless it's an update - then we only retry on connect exception
-          if (!isNonRetryable && RETRY_CODES.contains(e.code())) {
-            listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
-          } else {
-            // Server is alive but the request was likely malformed or invalid
-            if (isZombie) {
-              zombieServers.remove(baseUrl);
-            }
-            listener.onFailure(e, false);
-          }
-        } catch (SocketException e) {
-          if (!isNonRetryable || e instanceof ConnectException) {
-            listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
-          } else {
-            listener.onFailure(e, false);
-          }
-        } catch (SocketTimeoutException e) {
-          if (!isNonRetryable) {
-            listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
-          } else {
-            listener.onFailure(e, false);
-          }
-        } catch (SolrServerException e) {
-          Throwable rootCause = e.getRootCause();
-          if (!isNonRetryable && rootCause instanceof IOException) {
-            listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
-          } else if (isNonRetryable && rootCause instanceof ConnectException) {
-            listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
-          } else {
-            listener.onFailure(e, false);
-          }
-        } catch (Exception e) {
-          listener.onFailure(new SolrServerException(e), false);
-        }
-      }
-    });
-  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
index e9dd998..9ca63fb 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
@@ -28,7 +28,6 @@
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -61,7 +60,7 @@
 public abstract class LBSolrClient extends SolrClient {
 
   // defaults
-  protected static final Set<Integer> RETRY_CODES = new HashSet<>(Arrays.asList(404, 403, 503, 500));
+  private static final Set<Integer> RETRY_CODES = new HashSet<>(Arrays.asList(404, 403, 503, 500));
   private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks
   private static final int NONSTANDARD_PING_LIMIT = 5;  // number of times we'll ping dead servers not in the server list
 
@@ -70,7 +69,7 @@
   private final Map<String, ServerWrapper> aliveServers = new LinkedHashMap<>();
   // access to aliveServers should be synchronized on itself
 
-  protected final Map<String, ServerWrapper> zombieServers = new ConcurrentHashMap<>();
+  private final Map<String, ServerWrapper> zombieServers = new ConcurrentHashMap<>();
 
   // changes to aliveServers are reflected in this array, no need to synchronize
   private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0];
@@ -137,101 +136,6 @@
     }
   }
 
-  protected static class ServerIterator {
-    String serverStr;
-    List<String> skipped;
-    int numServersTried;
-    Iterator<String> it;
-    Iterator<String> skippedIt;
-    String exceptionMessage;
-    long timeAllowedNano;
-    long timeOutTime;
-
-    final Map<String, ServerWrapper> zombieServers;
-    final Req req;
-
-    public ServerIterator(Req req, Map<String, ServerWrapper> zombieServers) {
-      this.it = req.getServers().iterator();
-      this.req = req;
-      this.zombieServers = zombieServers;
-      this.timeAllowedNano = getTimeAllowedInNanos(req.getRequest());
-      this.timeOutTime = System.nanoTime() + timeAllowedNano;
-      fetchNext();
-    }
-
-    public synchronized boolean hasNext() {
-      return serverStr != null;
-    }
-
-    private void fetchNext() {
-      serverStr = null;
-      if (req.numServersToTry != null && numServersTried > req.numServersToTry) {
-        exceptionMessage = "Time allowed to handle this request exceeded";
-        return;
-      }
-
-      while (it.hasNext()) {
-        serverStr = it.next();
-        serverStr = normalize(serverStr);
-        // if the server is currently a zombie, just skip to the next one
-        ServerWrapper wrapper = zombieServers.get(serverStr);
-        if (wrapper != null) {
-          final int numDeadServersToTry = req.getNumDeadServersToTry();
-          if (numDeadServersToTry > 0) {
-            if (skipped == null) {
-              skipped = new ArrayList<>(numDeadServersToTry);
-              skipped.add(wrapper.getBaseUrl());
-            } else if (skipped.size() < numDeadServersToTry) {
-              skipped.add(wrapper.getBaseUrl());
-            }
-          }
-          continue;
-        }
-
-        break;
-      }
-      if (serverStr == null && skipped != null) {
-        if (skippedIt == null) {
-          skippedIt = skipped.iterator();
-        }
-        if (skippedIt.hasNext()) {
-          serverStr = skippedIt.next();
-        }
-      }
-    }
-
-    boolean isServingZombieServer() {
-      return skippedIt != null;
-    }
-
-    public synchronized String nextOrError() throws SolrServerException {
-      return nextOrError(null);
-    }
-
-    public synchronized String nextOrError(Exception previousEx) throws SolrServerException {
-      String suffix = "";
-      if (previousEx == null) {
-        suffix = ":" + zombieServers.keySet();
-      }
-      // Skipping check time exceeded for the first request
-      if (numServersTried > 0 && isTimeExceeded(timeAllowedNano, timeOutTime)) {
-        throw new SolrServerException("Time allowed to handle this request exceeded"+suffix, previousEx);
-      }
-      if (serverStr == null) {
-        throw new SolrServerException("No live SolrServers available to handle this request"+suffix, previousEx);
-      }
-      numServersTried++;
-      if (req.getNumServersToTry() != null && numServersTried > req.getNumServersToTry()) {
-        throw new SolrServerException("No live SolrServers available to handle this request:"
-            + " numServersTried="+numServersTried
-            + " numServersToTry="+req.getNumServersToTry()+suffix, previousEx);
-      }
-      String rs = serverStr;
-      fetchNext();
-      return rs;
-    }
-  }
-
   public static class Req {
     @SuppressWarnings({"rawtypes"})
     protected SolrRequest request;
@@ -352,12 +256,45 @@
     Rsp rsp = new Rsp();
     Exception ex = null;
     boolean isNonRetryable = req.request instanceof IsUpdateRequest || ADMIN_PATHS.contains(req.request.getPath());
-    ServerIterator serverIterator = new ServerIterator(req, zombieServers);
-    String serverStr;
-    while ((serverStr = serverIterator.nextOrError(ex)) != null) {
+    List<ServerWrapper> skipped = null;
+
+    final Integer numServersToTry = req.getNumServersToTry();
+    int numServersTried = 0;
+
+    boolean timeAllowedExceeded = false;
+    long timeAllowedNano = getTimeAllowedInNanos(req.getRequest());
+    long timeOutTime = System.nanoTime() + timeAllowedNano;
+    for (String serverStr : req.getServers()) {
+      if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
+        break;
+      }
+
+      serverStr = normalize(serverStr);
+      // if the server is currently a zombie, just skip to the next one
+      ServerWrapper wrapper = zombieServers.get(serverStr);
+      if (wrapper != null) {
+        // System.out.println("ZOMBIE SERVER QUERIED: " + serverStr);
+        final int numDeadServersToTry = req.getNumDeadServersToTry();
+        if (numDeadServersToTry > 0) {
+          if (skipped == null) {
+            skipped = new ArrayList<>(numDeadServersToTry);
+            skipped.add(wrapper);
+          }
+          else if (skipped.size() < numDeadServersToTry) {
+            skipped.add(wrapper);
+          }
+        }
+        continue;
+      }
       try {
         MDC.put("LBSolrClient.url", serverStr);
-        ex = doRequest(serverStr, req, rsp, isNonRetryable, serverIterator.isServingZombieServer());
+
+        if (numServersToTry != null && numServersTried > numServersToTry.intValue()) {
+          break;
+        }
+
+        ++numServersTried;
+        ex = doRequest(serverStr, req, rsp, isNonRetryable, false);
         if (ex == null) {
           return rsp; // SUCCESS
         }
@@ -365,19 +302,61 @@
         MDC.remove("LBSolrClient.url");
       }
     }
-    throw new SolrServerException("No live SolrServers available to handle this request:" + zombieServers.keySet(), ex);
+
+    // try the servers we previously skipped
+    if (skipped != null) {
+      for (ServerWrapper wrapper : skipped) {
+        if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
+          break;
+        }
+
+        if (numServersToTry != null && numServersTried > numServersToTry.intValue()) {
+          break;
+        }
+
+        try {
+          MDC.put("LBSolrClient.url", wrapper.getBaseUrl());
+          ++numServersTried;
+          ex = doRequest(wrapper.baseUrl, req, rsp, isNonRetryable, true);
+          if (ex == null) {
+            return rsp; // SUCCESS
+          }
+        } finally {
+          MDC.remove("LBSolrClient.url");
+        }
+      }
+    }
+
+
+    final String solrServerExceptionMessage;
+    if (timeAllowedExceeded) {
+      solrServerExceptionMessage = "Time allowed to handle this request exceeded";
+    } else {
+      if (numServersToTry != null && numServersTried > numServersToTry.intValue()) {
+        solrServerExceptionMessage = "No live SolrServers available to handle this request:"
+            + " numServersTried="+numServersTried
+            + " numServersToTry="+numServersToTry.intValue();
+      } else {
+        solrServerExceptionMessage = "No live SolrServers available to handle this request";
+      }
+    }
+    if (ex == null) {
+      throw new SolrServerException(solrServerExceptionMessage);
+    } else {
+      throw new SolrServerException(solrServerExceptionMessage+":" + zombieServers.keySet(), ex);
+    }
   }
 
   /**
    * @return time allowed in nanos, returns -1 if no time_allowed is specified.
    */
-  private static long getTimeAllowedInNanos(@SuppressWarnings({"rawtypes"})final SolrRequest req) {
+  private long getTimeAllowedInNanos(@SuppressWarnings({"rawtypes"})final SolrRequest req) {
     SolrParams reqParams = req.getParams();
     return reqParams == null ? -1 :
         TimeUnit.NANOSECONDS.convert(reqParams.getInt(CommonParams.TIME_ALLOWED, -1), TimeUnit.MILLISECONDS);
   }
 
-  private static boolean isTimeExceeded(long timeAllowedNano, long timeOutTime) {
+  private boolean isTimeExceeded(long timeAllowedNano, long timeOutTime) {
     return timeAllowedNano > 0 && System.nanoTime() > timeOutTime;
   }
 
@@ -435,7 +414,7 @@
 
   protected abstract SolrClient getClient(String baseUrl);
 
-  protected Exception addZombie(String serverStr, Exception e) {
+  private Exception addZombie(String serverStr, Exception e) {
     ServerWrapper wrapper = createServerWrapper(serverStr);
     wrapper.standard = false;
     zombieServers.put(serverStr, wrapper);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/util/AsyncListener.java b/solr/solrj/src/java/org/apache/solr/client/solrj/util/AsyncListener.java
deleted file mode 100644
index be64275..0000000
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/util/AsyncListener.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.util;
-
-/**
- * Listener for async requests
- */
-public interface AsyncListener<T> {
-  /**
-   * Callback method invoked before processing the request
-   */
-  default void onStart() {
-
-  }
-  void onSuccess(T t);
-  void onFailure(Throwable throwable);
-
-}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java b/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java
deleted file mode 100644
index 323916a..0000000
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.util;
-
-public interface Cancellable {
-  void cancel();
-}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/LBSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/LBSolrClientTest.java
deleted file mode 100644
index 1c35507..0000000
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/LBSolrClientTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.impl;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.lucene.util.LuceneTestCase;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class LBSolrClientTest {
-
-  @Test
-  public void testServerIterator() throws SolrServerException {
-    LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(), Arrays.asList("1", "2", "3", "4"));
-    LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, new HashMap<>());
-    List<String> actualServers = new ArrayList<>();
-    while (serverIterator.hasNext()) {
-      actualServers.add(serverIterator.nextOrError());
-    }
-    assertEquals(Arrays.asList("1", "2", "3", "4"), actualServers);
-    assertFalse(serverIterator.hasNext());
-    LuceneTestCase.expectThrows(SolrServerException.class, serverIterator::nextOrError);
-  }
-
-  @Test
-  public void testServerIteratorWithZombieServers() throws SolrServerException {
-    HashMap<String, LBSolrClient.ServerWrapper> zombieServers = new HashMap<>();
-    LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(), Arrays.asList("1", "2", "3", "4"));
-    LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, zombieServers);
-    zombieServers.put("2", new LBSolrClient.ServerWrapper("2"));
-
-    assertTrue(serverIterator.hasNext());
-    assertEquals("1", serverIterator.nextOrError());
-    assertTrue(serverIterator.hasNext());
-    assertEquals("3", serverIterator.nextOrError());
-    assertTrue(serverIterator.hasNext());
-    assertEquals("4", serverIterator.nextOrError());
-    assertTrue(serverIterator.hasNext());
-    assertEquals("2", serverIterator.nextOrError());
-  }
-
-  @Test
-  public void testServerIteratorTimeAllowed() throws SolrServerException, InterruptedException {
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set(CommonParams.TIME_ALLOWED, 300);
-    LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(params), Arrays.asList("1", "2", "3", "4"), 2);
-    LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, new HashMap<>());
-    assertTrue(serverIterator.hasNext());
-    serverIterator.nextOrError();
-    Thread.sleep(300);
-    LuceneTestCase.expectThrows(SolrServerException.class, serverIterator::nextOrError);
-  }
-
-  @Test
-  public void testServerIteratorMaxRetry() throws SolrServerException {
-    LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(), Arrays.asList("1", "2", "3", "4"), 2);
-    LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, new HashMap<>());
-    assertTrue(serverIterator.hasNext());
-    serverIterator.nextOrError();
-    assertTrue(serverIterator.hasNext());
-    serverIterator.nextOrError();
-    LuceneTestCase.expectThrows(SolrServerException.class, serverIterator::nextOrError);
-  }
-}
diff --git a/solr/test-framework/src/java/org/apache/solr/handler/component/TrackingShardHandlerFactory.java b/solr/test-framework/src/java/org/apache/solr/handler/component/TrackingShardHandlerFactory.java
index 25437ff..8e77b0c 100644
--- a/solr/test-framework/src/java/org/apache/solr/handler/component/TrackingShardHandlerFactory.java
+++ b/solr/test-framework/src/java/org/apache/solr/handler/component/TrackingShardHandlerFactory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.handler.component;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -24,7 +25,13 @@
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.http.client.HttpClient;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
@@ -32,6 +39,7 @@
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.core.CoreContainer;
 
@@ -82,9 +90,14 @@
 
   @Override
   public ShardHandler getShardHandler() {
+    return super.getShardHandler();
+  }
+
+  @Override
+  public ShardHandler getShardHandler(Http2SolrClient client) {
     final ShardHandlerFactory factory = this;
-    final ShardHandler wrapped = super.getShardHandler();
-    return new HttpShardHandler(this) {
+    final ShardHandler wrapped = super.getShardHandler(client);
+    return new HttpShardHandler(this, client) {
       @Override
       public void prepDistributed(ResponseBuilder rb) {
         wrapped.prepDistributed(rb);
@@ -123,6 +136,55 @@
   }
 
   @Override
+  public ShardHandler getShardHandler(HttpClient httpClient) {
+    final ShardHandlerFactory factory = this;
+    final ShardHandler wrapped = super.getShardHandler(httpClient);
+    return new HttpShardHandler(this, null) {
+      @Override
+      public void prepDistributed(ResponseBuilder rb) {
+        wrapped.prepDistributed(rb);
+      }
+
+      @Override
+      public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) {
+        synchronized (TrackingShardHandlerFactory.this) {
+          if (isTracking()) {
+            queue.offer(new ShardRequestAndParams(sreq, shard, params));
+          }
+        }
+        wrapped.submit(sreq, shard, params);
+      }
+
+      @Override
+      protected NamedList<Object> request(String url, @SuppressWarnings({"rawtypes"})SolrRequest req) throws IOException, SolrServerException {
+        try (SolrClient client = new HttpSolrClient.Builder(url).withHttpClient(httpClient).build()) {
+          return client.request(req);
+        }
+      }
+
+      @Override
+      public ShardResponse takeCompletedIncludingErrors() {
+        return wrapped.takeCompletedIncludingErrors();
+      }
+
+      @Override
+      public ShardResponse takeCompletedOrError() {
+        return wrapped.takeCompletedOrError();
+      }
+
+      @Override
+      public void cancelAll() {
+        wrapped.cancelAll();
+      }
+
+      @Override
+      public ShardHandlerFactory getShardHandlerFactory() {
+        return factory;
+      }
+    };
+  }
+
+  @Override
   public void close() {
     super.close();
   }