SOLR-8744: Minimize the impact on ZK when there are a lot of blocked tasks
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index 54c0697..2cd09d1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -82,6 +82,7 @@
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.common.util.Utils;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardHandlerFactory;
@@ -210,6 +211,7 @@
}
@Override
+ @SuppressForbidden(reason = "Needs currentTimeMillis for mock requests")
@SuppressWarnings("unchecked")
public SolrResponse processMessage(ZkNodeProps message, String operation) {
log.info("OverseerCollectionMessageHandler.processMessage : "+ operation + " , "+ message.toString());
@@ -289,6 +291,8 @@
case MOCK_REPLICA_TASK: {
//only for test purposes
Thread.sleep(message.getInt("sleep", 1));
+ log.info("MOCK_TASK_EXECUTED time {} data {}",System.currentTimeMillis(), Utils.toJSONString(message));
+ results.add("MOCK_FINISHED", System.currentTimeMillis());
break;
}
default:
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index 092ed97..9c739c1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -19,15 +19,19 @@
import java.io.Closeable;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import com.google.common.collect.ImmutableSet;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.cloud.Overseer.LeaderStatus;
@@ -36,6 +40,7 @@
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import org.apache.solr.handler.component.ShardHandlerFactory;
import org.apache.solr.util.DefaultSolrThreadFactory;
@@ -63,6 +68,7 @@
* executed concurrently
*/
public static final int MAX_PARALLEL_TASKS = 100;
+ public static final int MAX_BLOCKED_TASKS = 1000;
public ExecutorService tpe;
@@ -74,7 +80,7 @@
private DistributedMap failureMap;
// Set that maintains a list of all the tasks that are running. This is keyed on zk id of the task.
- final private Set runningTasks;
+ final private Set<String> runningTasks;
// List of completed tasks. This is used to clean up workQueue in zk.
final private HashMap<String, QueueEvent> completedTasks;
@@ -91,6 +97,24 @@
// It may contain tasks that have completed execution, have been entered into the completed/failed map in zk but not
// deleted from the work-queue as that is a batched operation.
final private Set<String> runningZKTasks;
+ // This map may contain tasks which were read from the work queue but could not
+ // be executed because they are blocked or the execution queue is full.
+ // This is an optimization to ensure that we do not read the same tasks
+ // again and again from ZK.
+ final private Map<String, QueueEvent> blockedTasks = new LinkedHashMap<>();
+  /**
+   * Filter handed to the work queue when peeking for new tasks: a task id is excluded
+   * when it is already tracked in {@code runningTasks} or parked in {@code blockedTasks}.
+   */
+  final private Predicate<String> excludedTasks = new Predicate<String>() {
+    @Override
+    public boolean test(String s) {
+      // runningTasks is a plain HashSet and the other visible accesses to it are made while
+      // holding its monitor, so take the same monitor here instead of reading it unguarded.
+      synchronized (runningTasks) {
+        if (runningTasks.contains(s)) {
+          return true;
+        }
+      }
+      // NOTE(review): blockedTasks is left unguarded on the assumption that only the
+      // processor thread touches it - confirm against the rest of the class.
+      return blockedTasks.containsKey(s);
+    }
+
+    @Override
+    public String toString() {
+      // The elements are rendered inside join(), so the monitor must be held for the whole call.
+      synchronized (runningTasks) {
+        return StrUtils.join(ImmutableSet.of(runningTasks, blockedTasks.keySet()), ',');
+      }
+    }
+
+  };
+
private final Object waitLock = new Object();
private OverseerMessageHandlerSelector selector;
@@ -115,7 +139,7 @@
this.completedMap = completedMap;
this.failureMap = failureMap;
this.runningZKTasks = new HashSet<>();
- this.runningTasks = new HashSet();
+ this.runningTasks = new HashSet<>();
this.completedTasks = new HashMap<>();
}
@@ -189,17 +213,46 @@
if (waited)
cleanUpWorkQueue();
- List<QueueEvent> heads = workQueue.peekTopN(MAX_PARALLEL_TASKS, runningZKTasks, 2000L);
+
+ ArrayList<QueueEvent> heads = new ArrayList<>(blockedTasks.size() + MAX_PARALLEL_TASKS);
+ heads.addAll(blockedTasks.values());
+
+ // If we already have enough items in the blocked tasks, it makes
+ // no sense to read more items from the work queue. It makes sense
+ // to clear out at least a few items in the queue before we read more items.
+ if (heads.size() < MAX_BLOCKED_TASKS) {
+ // Instead of always reading MAX_PARALLEL_TASKS items, we should only fetch as many as we can execute.
+ int toFetch = Math.min(MAX_BLOCKED_TASKS - heads.size(), MAX_PARALLEL_TASKS - runningTasks.size());
+ List<QueueEvent> newTasks = workQueue.peekTopN(toFetch, excludedTasks, 2000L);
+ log.debug("Got {} tasks from work-queue : [{}]", newTasks.size(), newTasks);
+ heads.addAll(newTasks);
+ } else {
+ // Prevent free-spinning this loop.
+ Thread.sleep(1000);
+ }
+
+ if (isClosed) break;
+
if (heads.isEmpty()) {
continue;
}
- log.debug("Got {} tasks from work-queue : [{}]", heads.size(), heads.toString());
-
- if (isClosed) break;
+ blockedTasks.clear(); // clear it now; may get refilled below.
taskBatch.batchId++;
+ boolean tooManyTasks = false;
for (QueueEvent head : heads) {
+ if (!tooManyTasks) {
+ synchronized (runningTasks) {
+ tooManyTasks = runningTasks.size() >= MAX_PARALLEL_TASKS;
+ }
+ }
+ if (tooManyTasks) {
+ // Too many tasks are running, just shove the rest into the "blocked" queue.
+ if(blockedTasks.size() < MAX_BLOCKED_TASKS)
+ blockedTasks.put(head.getId(), head);
+ continue;
+ }
if (runningZKTasks.contains(head.getId())) continue;
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message);
@@ -217,6 +270,9 @@
OverseerMessageHandler.Lock lock = messageHandler.lockTask(message, taskBatch);
if (lock == null) {
log.debug("Exclusivity check failed for [{}]", message.toString());
+ // blockedTasks is capped at MAX_BLOCKED_TASKS; a task left out here is no longer excluded and may simply be read from the work queue again later, which is fine.
+ if (blockedTasks.size() < MAX_BLOCKED_TASKS)
+ blockedTasks.put(head.getId(), head);
continue;
}
try {
@@ -370,7 +426,6 @@
runningTasks.add(head.getId());
}
-// messageHandler.markExclusiveTask(taskKey, message);
if (asyncId != null)
runningMap.put(asyncId, null);
@@ -512,6 +567,7 @@
synchronized (runningTasks) {
log.debug("RunningTasks: {}", runningTasks.toString());
}
+ log.debug("BlockedTasks: {}", blockedTasks.keySet().toString());
synchronized (completedTasks) {
log.debug("CompletedTasks: {}", completedTasks.keySet().toString());
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index aae7df2..5719aa9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
+import java.util.function.Predicate;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -222,7 +223,7 @@
}
- public List<QueueEvent> peekTopN(int n, Set<String> excludeSet, long waitMillis)
+ public List<QueueEvent> peekTopN(int n, Predicate<String> excludeSet, long waitMillis)
throws KeeperException, InterruptedException {
ArrayList<QueueEvent> topN = new ArrayList<>();
@@ -232,7 +233,7 @@
else time = stats.time(dir + "_peekTopN_wait" + waitMillis);
try {
- for (Pair<String, byte[]> element : peekElements(n, waitMillis, child -> !excludeSet.contains(dir + "/" + child))) {
+ for (Pair<String, byte[]> element : peekElements(n, waitMillis, child -> !excludeSet.test(dir + "/" + child))) {
topN.add(new QueueEvent(dir + "/" + element.first(),
element.second(), null));
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
index c18b330..ec8a6c4 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
@@ -38,6 +38,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
+import static org.apache.solr.cloud.OverseerTaskProcessor.MAX_PARALLEL_TASKS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_COLL_TASK;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
/**
* Tests the Multi threaded Collections API.
*/
@@ -55,11 +60,58 @@
@Test
@ShardsFixed(num = 4)
public void test() throws Exception {
-
testParallelCollectionAPICalls();
testTaskExclusivity();
testDeduplicationOfSubmittedTasks();
testLongAndShortRunningParallelApiCalls();
+ testFillWorkQueue();
+ }
+
+  /**
+   * Queues more mock tasks for collection A_COLL than can run in parallel, then queues a single
+   * mock task for B_COLL, and asserts that the B_COLL task reports MOCK_FINISHED earlier than
+   * A_COLL task "2".
+   */
+  private void testFillWorkQueue() throws Exception {
+    try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) {
+      DistributedQueue workQueue = new DistributedQueue(
+          cloudClient.getZkStateReader().getZkClient(),
+          "/overseer/collection-queue-work",
+          new Overseer.Stats());
+
+      // Add more tasks than the number of parallel tasks. The first task sleeps for a full
+      // second and is meant to block the rest, which only sleep for 1ms each.
+      final int taskCount = MAX_PARALLEL_TASKS + 5;
+      for (int taskNo = 0; taskNo < taskCount; taskNo++) {
+        final String sleepMillis = (taskNo == 0) ? "1000" : "1";
+        workQueue.offer(Utils.toJSON(Utils.makeMap(
+            "collection", "A_COLL",
+            QUEUE_OPERATION, MOCK_COLL_TASK.toLower(),
+            ASYNC, String.valueOf(taskNo),
+            "sleep", sleepMillis
+        )));
+        log.info("MOCK task added {}", taskNo);
+      }
+
+      // Short pause before posting the next message.
+      Thread.sleep(10);
+
+      // This task works on a different collection, so it is not expected to be blocked.
+      workQueue.offer(Utils.toJSON(Utils.makeMap(
+          "collection", "B_COLL",
+          QUEUE_OPERATION, MOCK_COLL_TASK.toLower(),
+          ASYNC, "200",
+          "sleep", "1"
+      )));
+
+      // Poll up to 100 times, 100ms apart, until both tasks have reported a finish time.
+      Long finishedA = null;
+      Long finishedB = null;
+      int attempt = 0;
+      while (attempt < 100) {
+        if (finishedB == null) {
+          finishedB = (Long) getStatusResponse("200", client).getResponse().get("MOCK_FINISHED");
+        }
+        if (finishedA == null) {
+          finishedA = (Long) getStatusResponse("2", client).getResponse().get("MOCK_FINISHED");
+        }
+        if (!(finishedA == null || finishedB == null)) {
+          break;
+        }
+        Thread.sleep(100);
+        attempt++;
+      }
+      assertTrue(finishedA != null && finishedB != null);
+      assertTrue(finishedA > finishedB);
+    }
+
}
private void testParallelCollectionAPICalls() throws IOException, SolrServerException {
@@ -116,14 +168,14 @@
distributedQueue.offer(Utils.toJSON(Utils.makeMap(
"collection", "ocptest_shardsplit",
- Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(),
- CommonAdminParams.ASYNC, "1001",
+ QUEUE_OPERATION, MOCK_COLL_TASK.toLower(),
+ ASYNC, "1001",
"sleep", "100"
)));
distributedQueue.offer(Utils.toJSON(Utils.makeMap(
"collection", "ocptest_shardsplit",
- Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(),
- CommonAdminParams.ASYNC, "1002",
+ QUEUE_OPERATION, MOCK_COLL_TASK.toLower(),
+ ASYNC, "1002",
"sleep", "100"
)));
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index 4658367..8f4ee1e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -21,6 +21,7 @@
import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrResponse;
@@ -163,7 +164,7 @@
log.info("SHARDHANDLER");
return shardHandlerMock;
}).anyTimes();
- workQueueMock.peekTopN(EasyMock.anyInt(), anyObject(Set.class), EasyMock.anyLong());
+ workQueueMock.peekTopN(EasyMock.anyInt(), anyObject(Predicate.class), EasyMock.anyLong());
expectLastCall().andAnswer(() -> {
Object result;
int count = 0;
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
index 95cdd40..6380aac 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
@@ -20,6 +20,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Predicate;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.response.SolrResponseBase;
@@ -75,7 +77,7 @@
tq.createRequestNode(Utils.toJSON(props), watchID);
// Set a SolrResponse as the response node by removing the QueueEvent, as done in OverseerTaskProcessor
- List<OverseerTaskQueue.QueueEvent> queueEvents = tq.peekTopN(2, Collections.emptySet(), 1000);
+ List<OverseerTaskQueue.QueueEvent> queueEvents = tq.peekTopN(2, s -> false, 1000);
OverseerTaskQueue.QueueEvent requestId2Event = null;
for (OverseerTaskQueue.QueueEvent queueEvent : queueEvents) {
Map<String, Object> eventProps = (Map<String, Object>) Utils.fromJSON(queueEvent.getBytes());
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index e5e6ade..a7660d8 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -1992,12 +1992,16 @@
}
static RequestStatusState getRequestState(String requestId, SolrClient client) throws IOException, SolrServerException {
- CollectionAdminRequest.RequestStatus requestStatusRequest = new CollectionAdminRequest.RequestStatus();
- requestStatusRequest.setRequestId(requestId);
- CollectionAdminResponse response = requestStatusRequest.process(client);
+ CollectionAdminResponse response = getStatusResponse(requestId, client);
NamedList innerResponse = (NamedList) response.getResponse().get("status");
return RequestStatusState.fromKey((String) innerResponse.get("state"));
}
+  /**
+   * Builds a {@code CollectionAdminRequest.RequestStatus} for the given request id, runs it
+   * against {@code client} and returns the response unmodified.
+   */
+  static CollectionAdminResponse getStatusResponse(String requestId, SolrClient client) throws SolrServerException, IOException {
+    final CollectionAdminRequest.RequestStatus statusRequest = new CollectionAdminRequest.RequestStatus();
+    statusRequest.setRequestId(requestId);
+    final CollectionAdminResponse statusResponse = statusRequest.process(client);
+    return statusResponse;
+  }
+
}