blob: d4626c275a77c5b4cf87170a79fa7cc7ebccb26e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import org.apache.lucene.util.Version;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.LBHttp2SolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.cloud.api.collections.CreateCollectionCmd;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.cloud.overseer.ZkStateWriter;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.ParWorkExecutor;
import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrThread;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ConnectionManager;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.SysStats;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.zookeeper.AddWatchMode;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.OverseerConfigSetMessageHandler.CONFIGSETS_ACTION_PREFIX;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
/**
* <p>Cluster leader. Responsible for processing state updates, node assignments, creating/deleting
* collections, shards, replicas and setting various properties.</p>
*
* <p>The <b>Overseer</b> is a single elected node in the SolrCloud cluster that is in charge of interactions with
* ZooKeeper that require global synchronization. </p>
*
* <p>The Overseer deals with:</p>
* <ul>
* <li>Cluster State updates, i.e. updating Collections' <code>state.json</code> files in ZooKeeper,</li>
* <li>Collection API implementation, see
* {@link OverseerCollectionConfigSetProcessor} and {@link OverseerCollectionMessageHandler} (and the example below),</li>
* <li>Updating Config Sets, see {@link OverseerCollectionConfigSetProcessor} and {@link OverseerConfigSetMessageHandler},</li>
* </ul>
*
* <p>The nodes in the cluster communicate with the Overseer over queues implemented in ZooKeeper. There are essentially
* two queues:</p>
* <ol>
* <li>The <b>state update queue</b>, through which nodes request the Overseer to update the <code>state.json</code> file of a
* Collection in ZooKeeper. This queue is in Zookeeper at <code>/overseer/queue</code>,</li>
* <li>A queue shared between <b>Collection API and Config Set API</b> requests. This queue is in Zookeeper at
* <code>/overseer/collection-queue-work</code>.</li>
* </ol>
*
* <p>An example of the steps involved in the Overseer processing a Collection creation API call:</p>
* <ol>
* <li>Client uses the Collection API with <code>CREATE</code> action and reaches a node of the cluster,</li>
* <li>The node (via {@link CollectionsHandler}) enqueues the request into the <code>/overseer/collection-queue-work</code>
* queue in ZooKeeper,</li>
* <li>The {@link OverseerCollectionConfigSetProcessor} running on the Overseer node dequeues the message and using an
* executor service with a maximum pool size of {@link OverseerTaskProcessor#MAX_PARALLEL_TASKS} hands it for processing
* to {@link OverseerCollectionMessageHandler},</li>
* <li>Command {@link CreateCollectionCmd} then executes and does:
* <ol>
* <li>Update some state directly in ZooKeeper (creating collection znode),</li>
* <li>Compute replica placement on available nodes in the cluster,</li>
* <li>Enqueue a state change request for creating the <code>state.json</code> file for the collection in ZooKeeper.
* This is done by enqueuing a message in <code>/overseer/queue</code>,</li>
* <li>The command then waits for the update to be seen in ZooKeeper...</li>
* </ol></li>
* <li>The ClusterState Updater (also running on the Overseer node) dequeues the state change message and creates the
* <code>state.json</code> file in ZooKeeper for the Collection. All the work of the cluster state updater
* (creations, updates, deletes) is done sequentially for the whole cluster by a single thread.</li>
* <li>The {@link CreateCollectionCmd} sees the state change in
* ZooKeeper and:
* <ol start="5">
* <li>Builds and sends requests to each node to create the appropriate cores for all the replicas of all shards
* of the collection. Nodes create the replicas and set them to {@link org.apache.solr.common.cloud.Replica.State#ACTIVE}.</li>
* </ol></li>
* <li>The collection creation command has succeeded from the Overseer perspective,</li>
* <li>{@link CollectionsHandler} checks the replicas in Zookeeper and verifies they are all
* {@link org.apache.solr.common.cloud.Replica.State#ACTIVE},</li>
* <li>The client receives a success return.</li>
* </ol>
*/
public class Overseer implements SolrCloseable {
/** Message key naming the requested operation in Overseer queue payloads. */
public static final String QUEUE_OPERATION = "op";
/** ZK path of the queue shared by Collection API and ConfigSet API requests. */
public static final String OVERSEER_COLLECTION_QUEUE_WORK = "/overseer/collection-queue-work";
/** ZK path of the cluster state-update queue. */
public static final String OVERSEER_QUEUE = "/overseer/queue";
/** ZK path of the map of async request ids currently in use. */
public static final String OVERSEER_ASYNC_IDS = "/overseer/async_ids";
/** ZK path of the map of failed collection API tasks. */
public static final String OVERSEER_COLLECTION_MAP_FAILURE = "/overseer/collection-map-failure";
/** ZK path of the map of completed collection API tasks. */
public static final String OVERSEER_COLLECTION_MAP_COMPLETED = "/overseer/collection-map-completed";
/** ZK path of the map of currently running collection API tasks. */
public static final String OVERSEER_COLLECTION_MAP_RUNNING = "/overseer/collection-map-running";
/** Maximum queue size passed to the state-update {@link ZkDistributedQueue}. */
public static final int STATE_UPDATE_MAX_QUEUE = 20000;
/** Size limit for the completed/failure task maps. */
public static final int NUM_RESPONSES_TO_STORE = 10000;
/** ZK path of the overseer election znode. */
public static final String OVERSEER_ELECT = "/overseer/overseer_elect";
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// The Overseer instance currently running in this JVM, if any (set in start(), cleared in close()).
private static volatile Overseer OUR_JVM_OVERSEER = null;
// Set by closeAndDone(): permanent shutdown, no election rejoin in close().
private volatile boolean closeAndDone;
// Guards one-time creation of overseerOnlyClient/overseerLbClient in start().
private volatile boolean initedHttpClient = false;
// Watches OVERSEER_QUEUE for state-update messages.
private volatile QueueWatcher queueWatcher;
// Watches OVERSEER_COLLECTION_QUEUE_WORK for collection/configset API work.
private volatile CollectionWorkQueueWatcher collectionQueueWatcher;
// Executor for Overseer task processing; created in start(), shut down in close().
private volatile ParWorkExecutor taskExecutor;
// Executor for ZK state-writer work; created in start(), shut down in close().
private volatile ParWorkExecutor zkWriterExecutor;
/** @return true once {@link #closeAndDone()} has been requested. */
public boolean isDone() {
  return closeAndDone;
}
/** @return the Overseer task executor; {@code null} until {@link #start} has run. */
public ExecutorService getTaskExecutor() {
  return taskExecutor;
}
/** @return the executor used for ZK state-writer work; {@code null} until {@link #start} has run. */
public ExecutorService getTaskZkWriterExecutor() {
  return zkWriterExecutor;
}
/**
 * A {@link SolrThread} that wraps a worker which is both {@link Runnable} and
 * {@link Closeable}, so the Overseer can run it as a thread and later close it.
 */
public static class OverseerThread extends SolrThread implements Closeable {
  protected volatile boolean isClosed;
  private final Closeable thread;
  /**
   * @param ccTg thread group for the new thread
   * @param thread the worker; must implement both {@link Closeable} and {@link Runnable}
   *               (the cast below enforces the latter at construction time)
   * @param name thread name
   */
  public OverseerThread(ThreadGroup ccTg, Closeable thread, String name) {
    super(ccTg, (Runnable) thread, name);
    this.thread = thread;
  }
  // NOTE: a redundant run() override that only delegated to super.run() was removed
  /** Marks this thread closed and closes the wrapped worker. */
  @Override
  public void close() throws IOException {
    this.isClosed = true;
    thread.close();
  }
  /** @return the wrapped worker. */
  public Closeable getThread() {
    return thread;
  }
  /** @return true once {@link #close()} has been called. */
  public boolean isClosed() {
    return this.isClosed;
  }
}
// private volatile OverseerThread updaterThread;
// private volatile ExecutorService stateManagmentExecutor;
//
// private volatile ExecutorService taskExecutor;
// Writes per-collection state.json updates; created once in the constructor.
private final ZkStateWriter zkStateWriter;
private final UpdateShardHandler updateShardHandler;
// Admin handler path; passed to the collection work-queue watcher in start().
private final String adminPath;
private final ZkController zkController;
// Overseer-level operation statistics.
private volatile Stats stats;
// Overseer election id; assigned in start(String, ElectionContext, boolean).
private volatile String id;
// True until start() completes; set back to true on close().
private volatile boolean closed = true;
// One-shot guard for doCompatCheck(BiConsumer).
private volatile boolean systemCollCompatCheck = true;
private final CloudConfig config;
// HTTP clients used only while this node is the Overseer; created lazily in start().
public volatile Http2SolrClient overseerOnlyClient;
public volatile LBHttp2SolrClient overseerLbClient;
// overseer not responsible for closing reader
/**
 * Creates an Overseer bound to the given controller and config. The instance is
 * inert until {@link #start} is called after winning the overseer election.
 */
public Overseer(UpdateShardHandler updateShardHandler, String adminPath, ZkController zkController, CloudConfig config) {
  this.updateShardHandler = updateShardHandler;
  this.adminPath = adminPath;
  this.zkController = zkController;
  this.config = config;
  this.stats = new Stats();
  // the state writer tracks its own, independent Stats instance
  this.zkStateWriter = new ZkStateWriter(zkController.getZkStateReader(), new Stats(), this);
}
/**
 * Starts this Overseer after it has won the overseer election: creates the task and
 * ZK-writer executors, lazily builds the overseer-only HTTP clients, initializes the
 * state writer, and installs the two queue watchers.
 *
 * @param id the overseer election id for this instance
 * @param context the election context (currently unused here, kept for interface compatibility)
 * @param weAreReplacement true if this Overseer replaces a previously elected one
 * @throws KeeperException on ZooKeeper errors while installing watchers
 */
public synchronized void start(String id, ElectionContext context, boolean weAreReplacement) throws KeeperException {
  log.info("Starting Overseer");
  if (getCoreContainer().isShutDown() || closeAndDone) {
    if (log.isDebugEnabled()) log.debug("Already closed, exiting");
    return;
  }
  if (!closed) {
    // fixed typo: was "Startomg"
    log.warn("Starting an Overseer that was not closed");
    IOUtils.closeQuietly(zkController.overseerElector);
    IOUtils.closeQuietly(this);
  }
  OUR_JVM_OVERSEER = this;
  this.id = id;
  taskExecutor = (ParWorkExecutor) ParWork.getParExecutorService("overseerTaskExecutor",
      4, Math.max(4, SysStats.PROC_COUNT * 2), 1000, new BlockingArrayQueue<>(32, 64));
  // prestart core threads so initial tasks do not pay thread-creation latency
  for (int i = 0; i < 4; i++) {
    taskExecutor.prestartCoreThread();
  }
  zkWriterExecutor = (ParWorkExecutor) ParWork.getParExecutorService("overseerZkWriterExecutor",
      12, Math.max(12, SysStats.PROC_COUNT * 2), 1000, new BlockingArrayQueue<>(64, 128));
  for (int i = 0; i < 12; i++) {
    zkWriterExecutor.prestartCoreThread();
  }
  // build the overseer-only HTTP clients exactly once per instance
  if (overseerOnlyClient == null && !closeAndDone && !initedHttpClient) {
    overseerOnlyClient = new Http2SolrClient.Builder().idleTimeout(60000).connectionTimeout(5000).markInternalRequest().build();
    overseerOnlyClient.enableCloseLock();
    this.overseerLbClient = new LBHttp2SolrClient(overseerOnlyClient);
    initedHttpClient = true;
  }
  log.info("Overseer (id={}) starting", id);
  //systemCollectionCompatCheck(new StringBiConsumer());
  this.zkStateWriter.init();
  queueWatcher = new WorkQueueWatcher(getCoreContainer(), this);
  collectionQueueWatcher = new CollectionWorkQueueWatcher(getCoreContainer(), id, overseerLbClient, adminPath, stats, Overseer.this);
  try {
    queueWatcher.start(weAreReplacement);
    collectionQueueWatcher.start(weAreReplacement);
  } catch (InterruptedException e) {
    // NOTE(review): interrupt status is not restored here — confirm intended
    log.warn("interrupted", e);
  }
  closed = false;
  // TODO: don't track for a moment, can leak out of collection api tests
  // assert ObjectReleaseTracker.track(this);
}
/**
 * Runs a back-compat check of the {@code .system} collection once all of its shard
 * leaders are active. If some leaders are not yet active, registers a collection state
 * watcher and performs the check when they become active. Findings are reported
 * through {@code consumer}.
 */
public void systemCollectionCompatCheck(final BiConsumer<String, Object> consumer) {
  final ClusterState clusterState = zkController.getClusterState();
  if (clusterState == null) {
    log.warn("Unable to check back-compat of .system collection - can't obtain ClusterState.");
    return;
  }
  final DocCollection coll = clusterState.getCollectionOrNull(CollectionAdminParams.SYSTEM_COLL);
  if (coll == null) {
    return;
  }
  // check right away when every shard leader is currently active
  boolean leadersReady = coll.getActiveSlices().stream()
      .allMatch(s -> s.getLeader() != null && s.getLeader().isActive(zkController.getZkStateReader().getLiveNodes()));
  if (leadersReady) {
    doCompatCheck(consumer);
    return;
  }
  // otherwise, wait for all leaders to become active and then check
  zkController.zkStateReader.registerCollectionStateWatcher(CollectionAdminParams.SYSTEM_COLL, (liveNodes, state) -> {
    if (state == null || liveNodes.isEmpty()) {
      return true;
    }
    boolean active = state.getActiveSlices().stream()
        .allMatch(s -> s.getLeader() != null && s.getLeader().isActive(liveNodes));
    if (active) {
      doCompatCheck(consumer);
    }
    return active;
  });
}
/**
 * Performs the actual back-compat inspection of the {@code .system} collection:
 * queries collection status (with segment and field info), then reports schema
 * non-compliance and any mix of Lucene segment versions via {@code consumer}.
 * Runs at most once per Overseer instance (guarded by {@code systemCollCompatCheck}).
 */
private void doCompatCheck(BiConsumer<String, Object> consumer) {
  // one-shot guard: only the first caller performs the check
  if (systemCollCompatCheck) {
    systemCollCompatCheck = false;
  } else {
    return;
  }
  try {
    CloudHttp2SolrClient client = getCoreContainer().getZkController().getCloudSolrClient();
    CollectionAdminRequest.ColStatus req = CollectionAdminRequest.collectionStatus(CollectionAdminParams.SYSTEM_COLL)
        .setWithSegments(true)
        .setWithFieldInfo(true);
    CollectionAdminResponse rsp = req.process(client);
    NamedList<Object> status = (NamedList<Object>)rsp.getResponse().get(CollectionAdminParams.SYSTEM_COLL);
    Collection<String> nonCompliant = (Collection<String>)status.get("schemaNonCompliant");
    if (!nonCompliant.contains("(NONE)")) {
      consumer.accept("indexFieldsNotMatchingSchema", nonCompliant);
    }
    Set<Integer> segmentCreatedMajorVersions = new HashSet<>();
    Set<String> segmentVersions = new HashSet<>();
    int currentMajorVersion = Version.LATEST.major;
    String currentVersion = Version.LATEST.toString();
    segmentVersions.add(currentVersion);
    segmentCreatedMajorVersions.add(currentMajorVersion);
    NamedList<Object> shards = (NamedList<Object>)status.get("shards");
    for (Map.Entry<String, Object> entry : shards) {
      // only shard leaders are inspected
      NamedList<Object> leader = (NamedList<Object>)((NamedList<Object>)entry.getValue()).get("leader");
      if (leader == null) {
        continue;
      }
      NamedList<Object> segInfos = (NamedList<Object>)leader.get("segInfos");
      if (segInfos == null) {
        continue;
      }
      NamedList<Object> infos = (NamedList<Object>)segInfos.get("info");
      if (((Number)infos.get("numSegments")).intValue() > 0) {
        segmentVersions.add(infos.get("minSegmentLuceneVersion").toString());
      }
      if (infos.get("commitLuceneVersion") != null) {
        segmentVersions.add(infos.get("commitLuceneVersion").toString());
      }
      NamedList<Object> segmentInfos = (NamedList<Object>)segInfos.get("segments");
      segmentInfos.forEach((k, v) -> {
        NamedList<Object> segment = (NamedList<Object>)v;
        segmentVersions.add(segment.get("version").toString());
        if (segment.get("minVersion") != null) {
          // BUGFIX: previously re-added "version" here (copy-paste); record minVersion instead
          segmentVersions.add(segment.get("minVersion").toString());
        }
        if (segment.get("createdVersionMajor") != null) {
          segmentCreatedMajorVersions.add(((Number)segment.get("createdVersionMajor")).intValue());
        }
      });
    }
    if (segmentVersions.size() > 1) {
      consumer.accept("differentSegmentVersions", segmentVersions);
      consumer.accept("currentLuceneVersion", currentVersion);
    }
    if (segmentCreatedMajorVersions.size() > 1) {
      consumer.accept("differentMajorSegmentVersions", segmentCreatedMajorVersions);
      consumer.accept("currentLuceneMajorVersion", currentMajorVersion);
    }
  } catch (SolrServerException | IOException e) {
    log.warn("Unable to perform back-compat check of .system collection", e);
  }
}
/** @return the Overseer's operation statistics. */
public Stats getStats() {
  return stats;
}
ZkController getZkController(){
  return zkController;
}
public CoreContainer getCoreContainer() {
  return zkController.getCoreContainer();
}
public SolrCloudManager getSolrCloudManager() {
  return zkController.getSolrCloudManager();
}
/**
 * Permanently shuts down this Overseer. Unlike {@link #close()} alone, no attempt
 * is made to rejoin the overseer election afterwards.
 */
public void closeAndDone() {
  synchronized (this) {
    this.closed = true;
    this.closeAndDone = true;
  }
  close();
}
/** @return true if a permanent shutdown was requested via {@link #closeAndDone()}. */
public boolean isCloseAndDone() {
  return closeAndDone;
}
/**
 * Closes this Overseer: stops the queue watchers, shuts down the executors and the
 * overseer-only HTTP clients, and — unless a permanent shutdown was requested via
 * {@link #closeAndDone()} — rejoins the overseer election.
 */
public void close() {
  // fixed format string: was "closeAndDone={}}" (stray trailing brace in log output)
  log.info("Overseer (id={}) closing closeAndDone={}", id, closeAndDone);
  boolean cd = closeAndDone;
  OUR_JVM_OVERSEER = null;
  closed = true;
  IOUtils.closeQuietly(queueWatcher);
  IOUtils.closeQuietly(collectionQueueWatcher);
  if (taskExecutor != null) {
    taskExecutor.shutdown();
  }
  if (zkWriterExecutor != null) {
    zkWriterExecutor.shutdown();
  }
  if (overseerOnlyClient != null) {
    overseerOnlyClient.disableCloseLock();
  }
  // close the LB client before the underlying HTTP client it wraps
  if (overseerLbClient != null) {
    overseerLbClient.close();
    overseerLbClient = null;
  }
  if (overseerOnlyClient != null) {
    overseerOnlyClient.close();
    overseerOnlyClient = null;
  }
  if (taskExecutor != null) {
    try {
      // NOTE(review): zkWriterExecutor is shut down but never awaited — confirm intended
      taskExecutor.awaitTermination(15, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      ParWork.propagateInterrupt(e, true);
    }
  }
  if (!cd) {
    boolean retry;
    synchronized (this) {
      // only rejoin the election if neither the container nor ZK is shutting down
      retry = !zkController.getCoreContainer().isShutDown() && !zkController.isShutdownCalled() && !zkController.isClosed() && !closeAndDone;
    }
    if (retry && zkController.getZkClient().isAlive()) {
      log.info("rejoining the overseer election after closing");
      try {
        zkController.rejoinOverseerElection(false);
      } catch (AlreadyClosedException e) {
        // already shutting down; nothing to rejoin
      } catch (Exception e) {
        log.warn("Could not rejoin election", e);
      }
    }
  }
  if (log.isDebugEnabled()) {
    log.debug("doClose - end");
  }
  assert ObjectReleaseTracker.release(this);
}
@Override
public boolean isClosed() {
  return closed;
}
/**
 * Get queue that can be used to send messages to Overseer.
 * <p>
 * Any and all modifications to the cluster state must be sent to
 * the overseer via this queue. The complete list of overseer actions
 * supported by this queue are documented inside the {@link OverseerAction} enum.
 * <p>
 * Performance statistics on the returned queue
 * are <em>not</em> tracked by the Overseer Stats API,
 * see {@link org.apache.solr.common.params.CollectionParams.CollectionAction#OVERSEERSTATUS}.
 * Therefore, this method should be used only by clients for writing to the overseer queue.
 *
 * @return a {@link ZkDistributedQueue} object
 */
public ZkDistributedQueue getStateUpdateQueue() {
  return getStateUpdateQueue(new Stats());
}
/**
 * The overseer uses the returned queue to read any operations submitted by clients.
 * This method should not be used directly by anyone other than the Overseer itself.
 * This method will create the /overseer znode in ZooKeeper if it does not exist already.
 *
 * @param zkStats a {@link Stats} object which tracks statistics for all zookeeper operations performed by this queue
 * @return a {@link ZkDistributedQueue} object
 */
ZkDistributedQueue getStateUpdateQueue(Stats zkStats) {
  // use the shared OVERSEER_QUEUE constant rather than a duplicated literal path
  return new ZkDistributedQueue(zkController.getZkClient(), OVERSEER_QUEUE, zkStats, STATE_UPDATE_MAX_QUEUE, new ConnectionManager.IsClosed(){
    public boolean isClosed() {
      // the queue is considered closed once the Overseer or the whole node shuts down
      return Overseer.this.isClosed() || zkController.getCoreContainer().isShutDown();
    }
  });
}
// static ZkDistributedQueue getInternalWorkQueue(final SolrZkClient zkClient, Stats zkStats) {
// return new ZkDistributedQueue(zkClient, "/overseer/queue-work", zkStats);
// }
/* Internal map for currently running tasks, not to be used outside of the Overseer */
static DistributedMap getRunningMap(final SolrZkClient zkClient) throws KeeperException {
  // use the shared constant rather than a duplicated literal path
  return new DistributedMap(zkClient, OVERSEER_COLLECTION_MAP_RUNNING);
}
/* Size-limited map for successfully completed tasks; evicted entries also release their async ids */
static DistributedMap getCompletedMap(final SolrZkClient zkClient) throws KeeperException {
  // use the shared constant rather than a duplicated literal path
  return new SizeLimitedDistributedMap(zkClient, OVERSEER_COLLECTION_MAP_COMPLETED, NUM_RESPONSES_TO_STORE, (child) -> getAsyncIdsMap(zkClient).remove(child));
}
/* Size-limited map for failed tasks, not to be used outside of the Overseer; evicted entries also release their async ids */
static DistributedMap getFailureMap(final SolrZkClient zkClient) throws KeeperException {
  // use the shared constant rather than a duplicated literal path
  return new SizeLimitedDistributedMap(zkClient, OVERSEER_COLLECTION_MAP_FAILURE, NUM_RESPONSES_TO_STORE, (child) -> getAsyncIdsMap(zkClient).remove(child));
}
/* Map of async request ids currently in use, stored at OVERSEER_ASYNC_IDS */
static DistributedMap getAsyncIdsMap(final SolrZkClient zkClient) throws KeeperException {
  return new DistributedMap(zkClient, Overseer.OVERSEER_ASYNC_IDS);
}
/**
 * Get queue that can be used to submit collection API tasks to the Overseer.
 * <p>
 * This queue is used internally by the {@link CollectionsHandler} to submit collection API
 * tasks which are executed by the {@link OverseerCollectionMessageHandler}. The actions supported
 * by this queue are listed in the {@link org.apache.solr.common.params.CollectionParams.CollectionAction}
 * enum.
 * <p>
 * Performance statistics on the returned queue
 * are <em>not</em> tracked by the Overseer Stats API,
 * see {@link org.apache.solr.common.params.CollectionParams.CollectionAction#OVERSEERSTATUS}.
 *
 * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
 * @return an {@link OverseerTaskQueue} object
 */
OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient) {
  return getCollectionQueue(zkClient, new Stats());
}
/**
 * Get queue that can be used to read collection API tasks to the Overseer.
 * <p>
 * This queue is used internally by the {@link OverseerCollectionMessageHandler} to read collection API
 * tasks submitted by the {@link CollectionsHandler}. The actions supported
 * by this queue are listed in the {@link org.apache.solr.common.params.CollectionParams.CollectionAction}
 * enum.
 * <p>
 * Performance statistics on the returned queue are tracked by the Overseer Stats API,
 * see {@link org.apache.solr.common.params.CollectionParams.CollectionAction#OVERSEERSTATUS}.
 *
 * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
 * @param zkStats a {@link Stats} object which tracks statistics for all zookeeper operations performed by this queue
 * @return an {@link OverseerTaskQueue} object
 */
OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) {
  // use the shared OVERSEER_COLLECTION_QUEUE_WORK constant rather than a duplicated literal path
  return new OverseerTaskQueue(zkClient, OVERSEER_COLLECTION_QUEUE_WORK, zkStats);
}
/**
 * Get queue that can be used to submit configset API tasks to the Overseer.
 * <p>
 * This queue is used internally by the {@link org.apache.solr.handler.admin.ConfigSetsHandler} to submit
 * tasks which are executed by the {@link OverseerConfigSetMessageHandler}. The actions supported
 * by this queue are listed in the {@link org.apache.solr.common.params.ConfigSetParams.ConfigSetAction}
 * enum.
 * <p>
 * Performance statistics on the returned queue
 * are <em>not</em> tracked by the Overseer Stats API,
 * see {@link org.apache.solr.common.params.CollectionParams.CollectionAction#OVERSEERSTATUS}.
 *
 * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
 * @return an {@link OverseerTaskQueue} object
 */
OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient) {
  return getConfigSetQueue(zkClient, new Stats());
}
/**
 * Get queue that can be used to read configset API tasks to the Overseer.
 * <p>
 * This queue is used internally by the {@link OverseerConfigSetMessageHandler} to read configset API
 * tasks submitted by the {@link org.apache.solr.handler.admin.ConfigSetsHandler}. The actions supported
 * by this queue are listed in the {@link org.apache.solr.common.params.ConfigSetParams.ConfigSetAction}
 * enum.
 * <p>
 * Performance statistics on the returned queue are tracked by the Overseer Stats API,
 * see {@link org.apache.solr.common.params.CollectionParams.CollectionAction#OVERSEERSTATUS}.
 * <p>
 * For now, this internally returns the same queue as {@link #getCollectionQueue(SolrZkClient, Stats)}.
 * It is the responsibility of the client to ensure that configset API actions are prefixed with
 * {@link OverseerConfigSetMessageHandler#CONFIGSETS_ACTION_PREFIX} so that it is processed by
 * {@link OverseerConfigSetMessageHandler}.
 *
 * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
 * @param zkStats a {@link Stats} object which tracks statistics for all zookeeper operations performed by this queue
 * @return an {@link OverseerTaskQueue} object
 */
OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient, Stats zkStats) {
  // For now, we use the same queue as the collection queue, but ensure
  // that the actions are prefixed with a unique string.
  return getCollectionQueue(zkClient, zkStats);
}
/** @return the {@link ZkStateReader} owned by the {@link ZkController}. */
public ZkStateReader getZkStateReader() {
  return zkController.getZkStateReader();
}
/** @return the state writer created for this Overseer instance. */
public ZkStateWriter getZkStateWriter() {
  return zkStateWriter;
}
/** Offers a serialized state-update message to the Overseer state-update queue (non-blocking offer). */
public void offerStateUpdate(byte[] data) throws KeeperException, InterruptedException {
  getStateUpdateQueue().offer(data, false);
}
/** Asks the state writer to flush pending updates for {@code collection}; returns the write future. */
public Future writePendingUpdates(String collection) {
  return zkStateWriter.writePendingUpdates(collection);
}
/**
 * Base class for consumers of an Overseer ZooKeeper work queue. Subclasses install a
 * persistent watch in {@code start}; each child-change event re-fires
 * {@link #process(WatchedEvent)}, which reads the queue's sorted children and hands
 * them to {@link #processQueueItems(List, boolean, boolean)} under {@link #ourLock}.
 */
private static abstract class QueueWatcher implements Watcher, Closeable {
  protected final CoreContainer cc;
  protected final ZkController zkController;
  // ZooKeeper path of the queue znode this watcher consumes
  protected final String path;
  protected final Overseer overseer;
  // items already on the queue when the watcher started (set by subclasses in start)
  protected volatile List<String> startItems;
  protected volatile boolean closed;
  // fair lock serializing queue processing across watcher events
  protected final ReentrantLock ourLock = new ReentrantLock(true);
  public QueueWatcher(CoreContainer cc, Overseer overseer, String path) throws KeeperException {
    this.cc = cc;
    this.zkController = cc.getZkController();
    this.overseer = overseer;
    this.path = path;
  }
  /** Installs the watch and processes any items already on the queue. */
  public abstract void start(boolean weAreReplacement) throws KeeperException, InterruptedException;
  /** Reads the queue znode's children and returns them sorted. */
  private List<String> getItems() {
    try {
      if (log.isDebugEnabled()) log.debug("get items from Overseer work queue {}", path);
      List<String> children = zkController.getZkClient().getChildren(path, null, null, true, false);
      List<String> items = new ArrayList<>(children);
      Collections.sort(items);
      return items;
    } catch (KeeperException.SessionExpiredException e) {
      log.warn("ZooKeeper session expired");
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
    } catch (AlreadyClosedException e) {
      throw e;
    } catch (Exception e) {
      log.error("Unexpected error in Overseer state update loop", e);
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
    }
  }
  @Override
  public void process(WatchedEvent event) {
    // connection-state events carry no child change; ignore them
    if (Event.EventType.None.equals(event.getType())) {
      return;
    }
    if (this.closed || zkController.getZkClient().isClosed()) {
      log.info("Overseer is closed, do not process watcher for queue");
      return;
    }
    ourLock.lock();
    try {
      try {
        List<String> items = getItems();
        if (items.size() > 0) {
          processQueueItems(items, false, false);
        }
      } catch (AlreadyClosedException e) {
        // shutting down; nothing to process
      } catch (Exception e) {
        log.error("Exception during overseer queue queue processing", e);
      }
    } finally {
      ourLock.unlock();
    }
  }
  /** Handles a batch of queue items; always invoked while holding {@link #ourLock}. */
  protected abstract void processQueueItems(List<String> items, boolean onStart, boolean weAreReplacement);
  @Override
  public void close() {
    this.closed = true;
    closeWatcher();
  }
  /** Best-effort removal of this watcher from the queue znode. */
  private void closeWatcher() {
    try {
      zkController.getZkClient().removeWatches(path, this, WatcherType.Any, true);
    } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
      // watch already gone or client closed; nothing to do
    } catch (Exception e) {
      log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
    }
  }
}
public class WorkQueueWatcher extends QueueWatcher {
/** Watches the Overseer state-update queue at {@link Overseer#OVERSEER_QUEUE}. */
public WorkQueueWatcher(CoreContainer cc, Overseer overseer) throws KeeperException {
  super(cc, overseer, Overseer.OVERSEER_QUEUE);
}
/**
 * Installs a persistent watch on the state-update queue and drains any items that
 * were already present when this Overseer took over.
 */
@Override
public void start(boolean weAreReplacement) throws KeeperException, InterruptedException {
  if (closed) return;
  // a PERSISTENT watch survives firing, so no per-event re-registration is needed
  zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
  startItems = super.getItems();
  log.info("Overseer found entries on start {} {}", startItems, path);
  if (!startItems.isEmpty()) {
    processQueueItems(startItems, true, weAreReplacement);
  }
}
@Override
protected void processQueueItems(List<String> items, boolean onStart, boolean weAreReplacement) {
//if (closed) return;
List<String> fullPaths = new ArrayList<>(items.size());
CountDownLatch delCountDownLatch = null;
ourLock.lock();
String forceWrite = null;
boolean wroteUpdates = false;
try {
if (log.isDebugEnabled()) log.debug("Found state update queue items {}", items);
for (String item : items) {
fullPaths.add(path + "/" + item);
}
Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
final List<StateEntry> shardStateCollections = new ArrayList<>();
for (byte[] item : data.values()) {
final ZkNodeProps message = ZkNodeProps.load(item);
try {
StateEntry entry = new StateEntry();
entry.message = message;
log.debug("add state update {}", message);
shardStateCollections.add(entry);
} catch (Exception e) {
log.error("Overseer state update queue processing failed", e);
}
}
Map<String,ConcurrentHashMap<String,ZkStateWriter.StateUpdate>> collStateUpdates = new ConcurrentHashMap<>();
for (Overseer.StateEntry sentry : shardStateCollections) {
try {
ZkNodeProps stateUpdateMessage = sentry.message;
final String op = stateUpdateMessage.getStr(StatePublisher.OPERATION);
OverseerAction overseerAction = OverseerAction.get(op);
if (overseerAction == null) {
throw new RuntimeException("unknown operation:" + op + " contents:" + stateUpdateMessage.getProperties());
}
switch (overseerAction) {
case STATE:
if (log.isDebugEnabled()) log.debug("state cmd {}", stateUpdateMessage);
stateUpdateMessage.getProperties().remove(StatePublisher.OPERATION);
for (Map.Entry<String,Object> stateUpdateEntry : stateUpdateMessage.getProperties().entrySet()) {
OverseerAction oa = OverseerAction.get(stateUpdateEntry.getKey());
if (OverseerAction.RECOVERYNODE.equals(oa) || OverseerAction.DOWNNODE.equals(oa)) {
if (OverseerAction.DOWNNODE.equals(oa) && onStart && !weAreReplacement) {
continue;
}
Replica.State setState = null;
if (OverseerAction.DOWNNODE.equals(oa)) {
setState = Replica.State.DOWN;
} else if (OverseerAction.RECOVERYNODE.equals(oa)) {
setState = Replica.State.RECOVERING;
}
Replica.State finalSetState = setState;
Overseer.this.zkStateWriter.getCS().forEach((coll, docColl) -> {
String collId = Long.toString(docColl.getId());
ConcurrentHashMap<String,ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
if (updates == null) {
updates = new ConcurrentHashMap<>();
collStateUpdates.put(collId, updates);
}
List<Replica> replicas = docColl.getReplicas();
for (Replica replica : replicas) {
if (replica.getNodeName().equals(stateUpdateEntry.getValue())) {
if (log.isDebugEnabled()) log.debug("set {} node operation {} for replica {}", finalSetState, op, replica);
ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
update.id = replica.getId();
update.state = Replica.State.getShortState(finalSetState);
updates.put(update.id, update);
}
}
});
}
for (Map.Entry<String,Object> stateUpdateEntry2 : stateUpdateMessage.getProperties().entrySet()) {
// if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(stateUpdateEntry.getKey()));
OverseerAction oa2 = OverseerAction.get(stateUpdateEntry2.getKey());
if (OverseerAction.RECOVERYNODE.equals(oa2) || OverseerAction.DOWNNODE.equals(oa2)) {
continue;
}
String id = stateUpdateEntry2.getKey();
String stateString = (String) stateUpdateEntry2.getValue();
log.trace("stateString={}", stateString);
try {
String collId = id.substring(0, id.indexOf('-'));
ConcurrentHashMap<String,ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
if (updates == null) {
updates = new ConcurrentHashMap<>();
collStateUpdates.put(collId, updates);
}
ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
update.id = id;
update.state = stateString;
updates.put(id, update);
} catch (Exception e) {
log.error("error processing state update {} {}", id, stateString);
}
}
}
break;
// MRM TODO:
// case ADDROUTINGRULE:
// return new SliceMutator(cloudManager).addRoutingRule(clusterState, message);
// case REMOVEROUTINGRULE:
// return new SliceMutator(cloudManager).removeRoutingRule(clusterState, message);
case UPDATESHARDSTATE: // MRM TODO: look at how we handle this and make it so it can use StatePublisher
String collection = stateUpdateMessage.getStr("collection");
stateUpdateMessage.getProperties().remove("collection");
stateUpdateMessage.getProperties().remove(StatePublisher.OPERATION);
Long collIdLong = zkStateWriter.getCS().get(collection).getId();
if (collIdLong != null) {
String collId = Long.toString(collIdLong);
ConcurrentHashMap<String,ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
if (updates == null) {
updates = new ConcurrentHashMap<>();
collStateUpdates.put(collId, updates);
}
if (collIdLong != null) {
for (Map.Entry<String,Object> stateUpdateEntry : stateUpdateMessage.getProperties().entrySet()) {
// MRM TODO: slice state should be done like replica state, this is a hack
forceWrite = collection;
ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
update.sliceState = (String) stateUpdateEntry.getValue();
update.sliceName = stateUpdateEntry.getKey();
updates.put(update.sliceName, update);
}
}
}
break;
default:
throw new RuntimeException("unknown operation:" + op + " contents:" + stateUpdateMessage.getProperties());
}
} catch (Exception e) {
log.error("Overseer state update queue processing failed", e);
continue;
}
}
try {
getZkStateWriter().enqueueUpdate(null, collStateUpdates, true);
} catch (Exception e) {
log.error("Overseer state update queue processing failed", e);
}
Set<String> collections = overseer.zkStateWriter.getDirtyStateCollections();
for (String collection : collections) {
overseer.writePendingUpdates(collection);
}
if (collections.size() == 0 && forceWrite != null) {
overseer.writePendingUpdates(forceWrite);
}
wroteUpdates = true;
} catch (Exception e) {
log.error("Exception handling Overseer state updates",e);
} finally {
try {
if (fullPaths.size() > 0 && wroteUpdates) {
if (!zkController.getZkClient().isClosed()) {
try {
delCountDownLatch = zkController.getZkClient().delete(fullPaths, false);
} catch (Exception e) {
log.warn("Failed deleting processed items", e);
}
}
}
if (delCountDownLatch != null) {
try {
boolean success = delCountDownLatch.await(10, TimeUnit.SECONDS);
if (log.isDebugEnabled()) log.debug("done waiting on latch, success={}", success);
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
}
}
} finally {
ourLock.unlock();
}
}
}
}
/**
 * A single state-update entry read from the Overseer's state-update queue: the deserialized
 * message plus the name of the ZooKeeper queue node it was read from.
 */
public static class StateEntry {
  // The deserialized state-update message. NOTE(review): public mutable fields retained for
  // compatibility with existing callers that populate this holder directly.
  public ZkNodeProps message;
  // Name of the queue znode this entry came from (used when deleting processed items).
  public String znodeName;

  @Override
  public String toString() {
    // Added for debuggability: these entries appear in queue-processing logs.
    return "StateEntry{znodeName=" + znodeName + ", message=" + message + '}';
  }
}
/**
 * Watches the Overseer's collection API work queue in ZooKeeper and dispatches each queued
 * message to the collection or config-set message handler on the task writer executor.
 * <p>
 * Queue items are deleted once they have been read and handed off; responses are recorded in
 * the completed map so (async) callers can retrieve them.
 */
private static class CollectionWorkQueueWatcher extends QueueWatcher {

  private final OverseerCollectionMessageHandler collMessageHandler;
  private final OverseerConfigSetMessageHandler configMessageHandler;

  // ZK-backed maps recording task outcomes. failureMap and runningMap are not read in this
  // class; obtaining them presumably ensures their backing znodes exist —
  // NOTE(review): confirm that side effect is the intent before removing them.
  private final DistributedMap failureMap;
  private final DistributedMap runningMap;
  private final DistributedMap completedMap;

  public CollectionWorkQueueWatcher(CoreContainer cc, String myId, LBHttp2SolrClient overseerLbClient,
      String adminPath, Stats stats, Overseer overseer) throws KeeperException {
    super(cc, overseer, Overseer.OVERSEER_COLLECTION_QUEUE_WORK);
    collMessageHandler = new OverseerCollectionMessageHandler(cc, myId, overseerLbClient, adminPath, stats, overseer);
    configMessageHandler = new OverseerConfigSetMessageHandler(cc);
    failureMap = Overseer.getFailureMap(cc.getZkController().getZkClient());
    runningMap = Overseer.getRunningMap(cc.getZkController().getZkClient());
    completedMap = Overseer.getCompletedMap(cc.getZkController().getZkClient());
  }

  @Override
  public void close() {
    super.close();
    IOUtils.closeQuietly(collMessageHandler);
    IOUtils.closeQuietly(configMessageHandler);
  }

  @Override
  public void start(boolean weAreReplacement) throws KeeperException, InterruptedException {
    if (closed) return;

    // Persistent watch: keeps firing for future events without re-registration after each one.
    zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);

    startItems = super.getItems();
    log.info("Overseer found entries on start {}", startItems);
    if (!startItems.isEmpty()) {
      processQueueItems(startItems, true, weAreReplacement);
    }
  }

  @Override
  protected void processQueueItems(List<String> items, boolean onStart, boolean weAreReplacement) {
    if (closed) return;
    ourLock.lock();
    List<String> fullPaths = new ArrayList<>(items.size());
    try {
      log.debug("Found collection queue items {} onStart={}", items, onStart);
      for (String item : items) {
        fullPaths.add(path + "/" + item);
      }

      Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);

      if (!data.isEmpty()) {
        for (Map.Entry<String,byte[]> entry : data.entrySet()) {
          // Each message is handled asynchronously so one slow collection command does not
          // block draining of the rest of the queue.
          overseer.getTaskZkWriterExecutor().submit(() -> {
            MDCLoggingContext.setNode(zkController.getNodeName());
            try {
              runAsync(entry, onStart);
            } catch (Exception e) {
              // Fix: parameterized logging (was string concatenation) with the throwable last.
              log.error("failed processing collection queue items {}", items, e);
            }
          });
        }
      }

    } finally {
      try {
        // The items were handed off above; remove them so they are not reprocessed.
        if (zkController.getZkClient().isAlive()) {
          zkController.getZkClient().delete(fullPaths, true);
        }
      } catch (Exception e) {
        // Fix: was only e.getMessage(); keep the stack trace so failures are diagnosable.
        log.warn("Delete items failed", e);
      }
      ourLock.unlock();
    }
  }

  /**
   * Deserializes one queued message and dispatches it to the config-set handler (operations
   * prefixed with {@code CONFIGSETS_ACTION_PREFIX}) or the collection message handler, then
   * records the (possibly normalized) response in the completed map.
   *
   * @throws AlreadyClosedException if the Overseer's state writer is gone (i.e. it is closed)
   */
  private void runAsync(Map.Entry<String,byte[]> entry, boolean onStart) {
    ZkStateWriter zkWriter = overseer.getZkStateWriter();
    if (zkWriter == null) {
      log.warn("Overseer appears closed");
      throw new AlreadyClosedException();
    }

    try {
      byte[] item = entry.getValue();
      if (item == null) {
        log.error("empty item {}", entry.getKey());
        return;
      }

      // Queue znode names carry a sequence suffix after the last '-'; responses are keyed by it.
      String responsePath = Overseer.OVERSEER_COLLECTION_MAP_COMPLETED + "/" + OverseerTaskQueue.RESPONSE_PREFIX + entry.getKey()
          .substring(entry.getKey().lastIndexOf("-") + 1);

      final ZkNodeProps message = ZkNodeProps.load(item);
      try {
        String operation = message.getStr(Overseer.QUEUE_OPERATION);

        // MRM TODO: on start we could detect partially executed commands (response znode
        // exists with no data) and attempt cleanup/completion here; see VCS history for the
        // previously disabled implementation.

        if (operation == null) {
          log.error("Msg does not have required " + Overseer.QUEUE_OPERATION + ": {}", message);
          return;
        }

        final String asyncId = message.getStr(ASYNC);
        OverseerSolrResponse response;
        // operation was null-checked above; the redundant null test has been removed.
        if (operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
          response = configMessageHandler.processMessage(message, operation, zkWriter);
        } else {
          response = collMessageHandler.processMessage(message, operation, zkWriter);
        }

        if (log.isDebugEnabled()) log.debug("response {}", response);

        // Normalize a null or empty response into an explicit success so callers always see
        // a non-empty result.
        if (response == null) {
          NamedList nl = new NamedList();
          nl.add("success", "true");
          response = new OverseerSolrResponse(nl);
        } else if (response.getResponse().size() == 0) {
          response.getResponse().add("success", "true");
        }

        if (asyncId != null) {
          if (log.isDebugEnabled()) {
            log.debug("Updated completed map for task with zkid:[{}]", asyncId);
          }
          completedMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response), CreateMode.PERSISTENT);
        } else {
          byte[] sdata = OverseerSolrResponseSerializer.serialize(response);
          completedMap.update(entry.getKey().substring(entry.getKey().lastIndexOf("-") + 1), sdata);
          log.debug("Completed task:[{}] {} {}", message, response.getResponse(), responsePath);
        }

      } catch (Exception e) {
        // Fix: the exception was previously swallowed (logged without the throwable).
        log.error("Exception processing entry", e);
      }
    } catch (Exception e) {
      log.error("Exception processing entry", e);
    }
  }
}
}