blob: 5673a6d4874e74282e6bf12037c75b0e1a493195 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.api.collections;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.AlreadyExistsException;
import org.apache.solr.client.solrj.cloud.BadVersionException;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.LBHttp2SolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.LockTree;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerMessageHandler;
import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.cloud.OverseerTaskProcessor;
import org.apache.solr.cloud.Stats;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.overseer.CollectionMutator;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.cloud.overseer.ZkStateWriter;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkConfigManager;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.client.solrj.response.RequestStatusState.COMPLETED;
import static org.apache.solr.client.solrj.response.RequestStatusState.FAILED;
import static org.apache.solr.client.solrj.response.RequestStatusState.NOT_FOUND;
import static org.apache.solr.client.solrj.response.RequestStatusState.RUNNING;
import static org.apache.solr.client.solrj.response.RequestStatusState.SUBMITTED;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ALIASPROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.BACKUP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESNAPSHOT;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEALIAS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETENODE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESNAPSHOT;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MAINTAINROUTEDALIAS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_COLL_TASK;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_REPLICA_TASK;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_SHARD_TASK;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REINDEXCOLLECTION;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.RELOAD;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.RENAME;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REPLACENODE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.RESTORE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.util.Utils.makeMap;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
/**
* A {@link OverseerMessageHandler} that handles Collections API related
* overseer messages.
*/
public class OverseerCollectionMessageHandler implements OverseerMessageHandler, SolrCloseable {
// Default value of the CREATE_NODE_SET_SHUFFLE option.
public static final boolean CREATE_NODE_SET_SHUFFLE_DEFAULT = true;
// Alias for CollectionAdminParams.CREATE_NODE_SET_SHUFFLE_PARAM.
public static final String CREATE_NODE_SET_SHUFFLE = CollectionAdminParams.CREATE_NODE_SET_SHUFFLE_PARAM;
// Key names used in overseer messages and collection properties.
public static final String ROUTER = "router";
public static final String SHARDS_PROP = "shards";
public static final String REQUESTID = "requestid";
// Message keys starting with this prefix are copied through by the addPropertyParams(...) overloads.
public static final String COLL_PROP_PREFIX = "property.";
public static final String ONLY_IF_DOWN = "onlyIfDown";
public static final String SHARD_UNIQUE = "shardUnique";
public static final String ONLY_ACTIVE_NODES = "onlyactivenodes";
static final String SKIP_CREATE_REPLICA_IN_CLUSTER_STATE = "skipCreateReplicaInClusterState";
// Unmodifiable map of collection property names to their default values.
// NOTE(review): a null value presumably means "no default" -- confirm against the consumers of this map.
public static final Map<String, Object> COLLECTION_PROPS_AND_DEFAULTS = Collections.unmodifiableMap(makeMap(
ROUTER, DocRouter.DEFAULT_NAME,
ZkStateReader.REPLICATION_FACTOR, "1",
ZkStateReader.NRT_REPLICAS, "1",
ZkStateReader.TLOG_REPLICAS, "0",
ZkStateReader.PULL_REPLICAS, "0",
ZkStateReader.MAX_SHARDS_PER_NODE, "1",
WITH_COLLECTION, null,
COLOCATED_WITH, null));
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// Same names as the "failure" / "success" entries that processMessage looks up in the results list.
public static final String FAILURE_FIELD = "failure";
public static final String SUCCESS_FIELD = "success";
// Client handed to the shard handler factory when sending core admin requests (see processRebalanceLeaders).
final LBHttp2SolrClient overseerLbClient;
// Collaborators assigned once in the constructor.
Overseer overseer;
HttpShardHandlerFactory shardHandlerFactory;
// Admin handler path, sent as the "qt" parameter of core admin requests.
String adminPath;
ZkStateReader zkStateReader;
SolrCloudManager cloudManager;
String myId;
Stats stats;
TimeSource timeSource;
// Lock tree (not a plain set) associated with the tasks that are currently running.
// This is used for handling mutual exclusion of the tasks.
// NOTE(review): the code that acquires these locks is outside this section of the file.
final private LockTree lockTree = new LockTree();
// ExecutorService tpe = new ExecutorUtil.MDCAwareThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS,
// new SynchronousQueue<>(),
// new SolrNamedThreadFactory("OverseerCollectionMessageHandlerThreadFactory"));
public static final Random RANDOM;
static {
  // Tests set the "tests.seed" system property; deriving the seed from it keeps runs reproducible.
  // Without the property an unseeded Random is used.
  final String testSeed = System.getProperty("tests.seed");
  RANDOM = (testSeed != null) ? new Random(testSeed.hashCode()) : new Random();
}
// Immutable dispatch table from collection action to the command that implements it; built in the constructor.
final Map<CollectionAction, Cmd> commandMap;
// Closed flag; initialised to false by the constructor.
// NOTE(review): the code that sets or reads it afterwards is outside this section.
private volatile boolean isClosed;
/**
 * Wires up the handler's collaborators and builds the action-to-command dispatch table.
 *
 * <p>All fields are assigned before the command map is built because most commands are
 * constructed with a reference to this handler.
 *
 * @param cc core container; supplies the ZK state reader and the shard handler factory
 *           (which is cast to {@link HttpShardHandlerFactory})
 * @param myId stored in {@code myId}
 * @param overseerLbClient load-balancing client used for requests sent by this handler
 * @param adminPath core admin handler path
 * @param stats stored in {@code stats}
 * @param overseer owning overseer; also the source of the {@link SolrCloudManager} and, through it, the time source
 */
public OverseerCollectionMessageHandler(CoreContainer cc, String myId,
LBHttp2SolrClient overseerLbClient,
String adminPath,
Stats stats,
Overseer overseer) {
// TODO: can leak single instance of this oddly in AddReplicaTest
// assert ObjectReleaseTracker.track(this);
this.zkStateReader = cc.getZkController().getZkStateReader();
this.shardHandlerFactory = (HttpShardHandlerFactory) cc.getShardHandlerFactory();
this.overseerLbClient = overseerLbClient;
this.adminPath = adminPath;
this.myId = myId;
this.stats = stats;
this.overseer = overseer;
this.cloudManager = overseer.getSolrCloudManager();
this.timeSource = cloudManager.getTimeSource();
this.isClosed = false;
// Operations implemented in this class are registered as method references; the rest are dedicated Cmd classes.
commandMap = new ImmutableMap.Builder<CollectionAction, Cmd>()
.put(REPLACENODE, new ReplaceNodeCmd(this))
.put(DELETENODE, new DeleteNodeCmd(this))
.put(BACKUP, new BackupCmd(this))
.put(RESTORE, new RestoreCmd(this))
.put(CREATESNAPSHOT, new CreateSnapshotCmd(this))
.put(DELETESNAPSHOT, new DeleteSnapshotCmd(this))
.put(SPLITSHARD, new SplitShardCmd(this))
.put(MOCK_COLL_TASK, this::mockOperation)
.put(MOCK_SHARD_TASK, this::mockOperation)
.put(MOCK_REPLICA_TASK, this::mockOperation)
.put(CREATESHARD, new CreateShardCmd(this))
.put(MIGRATE, new MigrateCmd(this))
.put(CREATE, new CreateCollectionCmd(this, overseer.getCoreContainer(), cloudManager))
.put(MODIFYCOLLECTION, this::modifyCollection)
.put(ADDREPLICAPROP, this::processReplicaAddPropertyCommand)
.put(DELETEREPLICAPROP, this::processReplicaDeletePropertyCommand)
.put(BALANCESHARDUNIQUE, this::balanceProperty)
.put(REBALANCELEADERS, this::processRebalanceLeaders)
.put(RELOAD, this::reloadCollection)
.put(DELETE, new DeleteCollectionCmd(this))
.put(CREATEALIAS, new CreateAliasCmd(this))
.put(DELETEALIAS, new DeleteAliasCmd(this))
.put(ALIASPROP, new SetAliasPropCmd(this))
.put(MAINTAINROUTEDALIAS, new MaintainRoutedAliasCmd(this))
.put(OVERSEERSTATUS, new OverseerStatusCmd(this))
.put(DELETESHARD, new DeleteShardCmd(this))
.put(DELETEREPLICA, new DeleteReplicaCmd(this))
.put(ADDREPLICA, new CollectionCmdResponse(this))
.put(MOVEREPLICA, new MoveReplicaCmd(this))
.put(REINDEXCOLLECTION, new ReindexCollectionCmd(this))
.put(RENAME, new RenameCmd(this))
.build()
;
}
/**
 * Executes one Collections API operation taken from the overseer queue.
 *
 * <p>The operation name is resolved to a {@link Cmd} through {@code commandMap} and run against
 * the cluster state held by {@code zkWriter}. If the command returns a new cluster state that
 * contains the target collection, that collection is enqueued on the writer and a write of
 * pending updates is requested; an optional {@code asyncFinalRunner} supplied by the command is
 * run afterwards and its resulting state is enqueued the same way. When the message sets
 * {@code WAIT_FOR_FINAL_STATE} (and the action is not DELETE) the method blocks on all write
 * futures and adds the last written version to the results as {@code "csver"}.
 *
 * <p>Apart from {@link AlreadyClosedException}, which is logged and rethrown, failures do not
 * propagate: they are recorded in the returned response under {@code "exception"}.
 *
 * @param message   the queued operation; the collection name is read from {@code "collection"},
 *                  falling back to {@code "name"}
 * @param operation the operation name, resolved via {@link CollectionAction#get(String)}
 * @param zkWriter  state writer providing the current cluster state and receiving updates
 * @return a response wrapping the results list (success, failure and exception entries)
 * @throws InterruptedException declared by the interface; interrupts raised while running the
 *         command are recorded in the response and the thread's interrupt flag is restored
 */
@Override
@SuppressWarnings("unchecked")
public OverseerSolrResponse processMessage(ZkNodeProps message, String operation, ZkStateWriter zkWriter) throws InterruptedException {
  MDCLoggingContext.setCollection(message.getStr(COLLECTION));
  MDCLoggingContext.setCoreName(message.getStr(REPLICA_PROP));
  if (log.isDebugEnabled()) log.debug("OverseerCollectionMessageHandler.processMessage : {} , {}", operation, message);
  ClusterState clusterState = zkWriter.getClusterstate();
  @SuppressWarnings({"rawtypes"}) NamedList results = new NamedList();
  try {
    String collection = message.getStr("collection");
    if (collection == null) {
      collection = message.getStr("name");
    }
    // if (operation.equals("cleanup")) {
    // log.info("Found item that needs cleanup {}", message);
    // String op = message.getStr(Overseer.QUEUE_OPERATION);
    // CollectionAction action = getCollectionAction(op);
    // Cmd command = commandMap.get(action);
    // boolean drop = command.cleanup(message);
    // if (drop) {
    // return null;
    // }
    // return new OverseerSolrResponse(null);
    // }
    CollectionAction action = getCollectionAction(operation);
    Cmd command = commandMap.get(action);
    if (command == null) {
      throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation);
    }

    CollectionCmdResponse.Response response = command.call(clusterState, message, results);
    if (response == null) {
      throw new SolrException(ErrorCode.SERVER_ERROR, "CMD did not return a response:" + operation);
    }
    if (log.isDebugEnabled()) log.debug("Command returned clusterstate={} results={}", response.clusterState, results);

    Future<?> writeFuture = null;
    Future<?> writeFuture2 = null;
    CollectionCmdResponse.Response asyncResp = null;
    if (response.clusterState != null) {
      DocCollection docColl = response.clusterState.getCollectionOrNull(collection);
      if (docColl != null) {
        zkWriter.enqueueUpdate(docColl, null, false);
      }
      // The finalizer runs whether or not the collection is present in the returned state,
      // but always after the command's own update has been enqueued.
      if (response.asyncFinalRunner != null) {
        asyncResp = response.asyncFinalRunner.call();
      }
      // Only request a write here when the finalizer did not hand back its own write future.
      if (docColl != null && (asyncResp == null || asyncResp.writeFuture == null)) {
        writeFuture2 = overseer.writePendingUpdates(collection);
      }
    }

    // MRM TODO: consider
    if (asyncResp != null) {
      if (log.isDebugEnabled()) log.debug("Finalize after Command returned clusterstate={}", asyncResp.clusterState);
      if (asyncResp.clusterState != null) {
        DocCollection docColl = asyncResp.clusterState.getCollectionOrNull(collection);
        if (docColl != null) {
          zkWriter.enqueueUpdate(docColl, null, false);
          writeFuture = overseer.writePendingUpdates(collection);
        }
      }
    }

    if (message.getBool(WAIT_FOR_FINAL_STATE, false) && !action.equals(DELETE) && collection != null) {
      if (writeFuture != null) {
        writeFuture.get();
      }
      if (writeFuture2 != null) {
        writeFuture2.get();
      }
      if (response.writeFuture != null) {
        response.writeFuture.get();
      }
      if (asyncResp != null && asyncResp.writeFuture != null) {
        asyncResp.writeFuture.get();
      }
      // A null version is taken to mean the collection was deleted; nothing is reported then.
      Integer version = zkWriter.lastWrittenVersion(collection);
      if (version != null) {
        results.add("csver", version);
      }
    }

    if (results.get("success") == null) results.add("success", new NamedList<>());
    if (results.get("failure") != null) {
      SimpleOrderedMap<Object> nl = new SimpleOrderedMap<>();
      nl.add("msg", "Operation failed " + operation + " " + results.get("failure"));
      nl.add("rspCode", 500);
      results.add("exception", nl);
    }
  } catch (AlreadyClosedException e) {
    log.warn("Overseer hit already closed exception", e);
    throw e;
  } catch (Exception e) {
    // This catch also receives InterruptedException (e.g. from Future.get()). The failure is
    // still reported through the response, but the interrupt flag must not be lost.
    if (e instanceof InterruptedException) {
      Thread.currentThread().interrupt();
    }
    String collName = message.getStr("collection");
    if (collName == null) collName = message.getStr(NAME);
    if (log.isDebugEnabled()) {
      if (collName == null) {
        log.debug("Operation " + operation + " failed", e);
      } else {
        log.debug("Collection: " + collName + " operation: " + operation + " failed", e);
      }
    }
    results.add("Operation " + operation + " caused exception:", e);
    SimpleOrderedMap<Object> nl = new SimpleOrderedMap<>();
    nl.add("msg", e.getMessage());
    nl.add("rspCode", e instanceof SolrException ? ((SolrException) e).code() : -1);
    results.add("exception", nl);
  }
  return new OverseerSolrResponse(results);
}
/**
 * Test-only command: sleeps for the number of milliseconds given by the message's
 * {@code "sleep"} entry (default 1) and records the completion time under {@code "MOCK_FINISHED"}.
 *
 * @param state   unused
 * @param message the mock task message
 * @param results list that receives the {@code "MOCK_FINISHED"} timestamp
 * @return a response carrying {@code results} and no cluster state change; never {@code null},
 *         because processMessage reports a {@code null} command response as a server error
 * @throws InterruptedException if interrupted while sleeping
 */
@SuppressForbidden(reason = "Needs currentTimeMillis for mock requests")
@SuppressWarnings({"unchecked"})
private CollectionCmdResponse.Response mockOperation(ClusterState state, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results) throws InterruptedException {
  //only for test purposes
  Thread.sleep(message.getInt("sleep", 1));
  if (log.isInfoEnabled()) {
    log.info("MOCK_TASK_EXECUTED time {} data {}", System.currentTimeMillis(), Utils.toJSONString(message));
  }
  results.add("MOCK_FINISHED", System.currentTimeMillis());
  CollectionCmdResponse.Response response = new CollectionCmdResponse.Response();
  response.results = results;
  // The mock task does not touch the cluster state.
  response.clusterState = null;
  return response;
}
/**
 * Resolves an operation name to its {@link CollectionAction}.
 *
 * @param operation the operation name from the queued message
 * @return the matching action, never {@code null}
 * @throws SolrException with BAD_REQUEST when the name maps to no action
 */
private CollectionAction getCollectionAction(String operation) {
  final CollectionAction resolved = CollectionAction.get(operation);
  if (resolved != null) {
    return resolved;
  }
  throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation);
}
/**
 * Sends a core RELOAD request through {@code collectionCmd}, matching replicas in ACTIVE state.
 *
 * @param clusterState unused by this command
 * @param message      the reload message; its {@code ASYNC} entry is forwarded as the async id
 * @param results      list that collects the per-replica outcomes
 * @return a response carrying {@code results} with a {@code null} cluster state
 */
@SuppressWarnings({"unchecked"})
private CollectionCmdResponse.Response reloadCollection(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results) throws KeeperException, InterruptedException {
  final ModifiableSolrParams reloadParams = new ModifiableSolrParams();
  reloadParams.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
  collectionCmd(message, reloadParams, results, Replica.State.ACTIVE, message.getStr(ASYNC));

  final CollectionCmdResponse.Response reloadResponse = new CollectionCmdResponse.Response();
  // MRM TODO: - we don't change this for this cmd, we should be able to indicate that to caller
  reloadResponse.clusterState = null;
  reloadResponse.results = results;
  return reloadResponse;
}
/**
 * Submits a REJOINLEADERELECTION core admin request to the node named in the message.
 *
 * <p>The request is submitted through a shard handler and not awaited here.
 *
 * @param clusterState unused by this command
 * @param message      must contain the collection, node name, shard id, core name, election node
 *                     and rejoin-at-head properties
 * @param results      results list of the enclosing operation
 * @return a response carrying {@code results} and no cluster state change; never {@code null},
 *         because processMessage reports a {@code null} command response as a server error
 * @throws SolrException with BAD_REQUEST when a required property is missing
 */
@SuppressWarnings("unchecked")
private CollectionCmdResponse.Response processRebalanceLeaders(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results)
    throws Exception {
  checkRequired(message, COLLECTION_PROP, NODE_NAME_PROP, SHARD_ID_PROP, CORE_NAME_PROP, ELECTION_NODE_PROP, REJOIN_AT_HEAD_PROP);
  ModifiableSolrParams params = new ModifiableSolrParams();
  params.set(COLLECTION_PROP, message.getStr(COLLECTION_PROP));
  params.set(NODE_NAME_PROP, message.getStr(NODE_NAME_PROP));
  params.set("node", message.getStr("node"));
  params.set(SHARD_ID_PROP, message.getStr(SHARD_ID_PROP));
  params.set(REJOIN_AT_HEAD_PROP, message.getStr(REJOIN_AT_HEAD_PROP));
  params.set(CoreAdminParams.ACTION, CoreAdminAction.REJOINLEADERELECTION.toString());
  params.set(CORE_NAME_PROP, message.getStr(CORE_NAME_PROP));
  params.set(ELECTION_NODE_PROP, message.getStr(ELECTION_NODE_PROP));
  // Look the base URL up by the node name itself. The earlier code used the node name value as a
  // second message key, which resolved the URL from the wrong (normally absent) entry.
  String baseUrl = zkStateReader.getBaseUrlForNodeName(message.getStr(NODE_NAME_PROP));
  ShardRequest sreq = new ShardRequest();
  // NOTE(review): nodeName is populated with the core name here, as before -- confirm this is intended.
  sreq.nodeName = message.getStr(ZkStateReader.CORE_NAME_PROP);
  // yes, they must use same admin handler path everywhere...
  params.set("qt", adminPath);
  sreq.purpose = ShardRequest.PURPOSE_PRIVATE;
  sreq.shards = new String[] {baseUrl};
  sreq.actualShards = sreq.shards;
  sreq.params = params;
  ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseerLbClient);
  shardHandler.submit(sreq, baseUrl, sreq.params);

  CollectionCmdResponse.Response response = new CollectionCmdResponse.Response();
  response.results = results;
  // This command does not modify the cluster state itself.
  response.clusterState = null;
  return response;
}
/**
 * Validates an ADDREPLICAPROP message and forwards it to the overseer state update queue.
 *
 * @param clusterState unused by this command
 * @param message      the request; all properties listed in the {@code checkRequired} call must be present
 * @param results      results list of the enclosing operation
 * @return a response carrying {@code results} and no cluster state change; never {@code null},
 *         because processMessage reports a {@code null} command response as a server error
 * @throws SolrException with BAD_REQUEST when a required property is missing
 */
@SuppressWarnings("unchecked")
private CollectionCmdResponse.Response processReplicaAddPropertyCommand(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results)
    throws Exception {
  // NOTE(review): NUM_SHARDS_PROP and "shards" are required here but not by the delete counterpart -- confirm.
  checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, ZkStateReader.NUM_SHARDS_PROP, "shards", REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP);
  Map<String, Object> propMap = new HashMap<>(message.getProperties().size() + 1);
  propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICAPROP.toLower());
  // Copied after the operation entry, so an operation key present in the message takes precedence.
  propMap.putAll(message.getProperties());
  ZkNodeProps m = new ZkNodeProps(propMap);
  overseer.offerStateUpdate(Utils.toJSON(m));

  CollectionCmdResponse.Response response = new CollectionCmdResponse.Response();
  response.results = results;
  // The state change is applied by whoever consumes the offered update, not returned from here.
  response.clusterState = null;
  return response;
}
/**
 * Validates a DELETEREPLICAPROP message and forwards it to the overseer state update queue.
 *
 * @param clusterState unused by this command
 * @param message      the request; collection, shard id, replica and property must be present
 * @param results      results list of the enclosing operation
 * @return a response carrying {@code results} and no cluster state change; never {@code null},
 *         because processMessage reports a {@code null} command response as a server error
 * @throws SolrException with BAD_REQUEST when a required property is missing
 */
@SuppressWarnings("unchecked")
private CollectionCmdResponse.Response processReplicaDeletePropertyCommand(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results)
    throws Exception {
  checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP);
  Map<String, Object> propMap = new HashMap<>(message.getProperties().size() + 1);
  propMap.put(Overseer.QUEUE_OPERATION, DELETEREPLICAPROP.toLower());
  // Copied after the operation entry, so an operation key present in the message takes precedence.
  propMap.putAll(message.getProperties());
  ZkNodeProps m = new ZkNodeProps(propMap);
  overseer.offerStateUpdate(Utils.toJSON(m));

  CollectionCmdResponse.Response response = new CollectionCmdResponse.Response();
  response.results = results;
  // The state change is applied by whoever consumes the offered update, not returned from here.
  response.clusterState = null;
  return response;
}
/**
 * Validates a BALANCESHARDUNIQUE message and forwards it to the overseer state update queue.
 *
 * @param clusterState unused by this command
 * @param message      the request; the collection and property entries must be non-blank
 * @param results      results list of the enclosing operation
 * @return a response carrying {@code results} and no cluster state change; never {@code null},
 *         because processMessage reports a {@code null} command response as a server error
 * @throws SolrException with BAD_REQUEST when the collection or property entry is blank
 */
@SuppressWarnings("unchecked")
private CollectionCmdResponse.Response balanceProperty(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results) throws Exception {
  if (StringUtils.isBlank(message.getStr(COLLECTION_PROP)) || StringUtils.isBlank(message.getStr(PROPERTY_PROP))) {
    throw new SolrException(ErrorCode.BAD_REQUEST,
        "The '" + COLLECTION_PROP + "' and '" + PROPERTY_PROP +
            "' parameters are required for the BALANCESHARDUNIQUE operation, no action taken");
  }
  Map<String, Object> m = new HashMap<>(message.getProperties().size() + 1);
  m.put(Overseer.QUEUE_OPERATION, BALANCESHARDUNIQUE.toLower());
  // Copied after the operation entry, so an operation key present in the message takes precedence.
  m.putAll(message.getProperties());
  overseer.offerStateUpdate(Utils.toJSON(m));

  CollectionCmdResponse.Response response = new CollectionCmdResponse.Response();
  response.results = results;
  // The state change is applied by whoever consumes the offered update, not returned from here.
  response.clusterState = null;
  return response;
}
/**
 * Get collection status from cluster state.
 * Can return collection status restricted to the given shard names.
 *
 * <p>When shards are requested, the {@code "shards"} entry of the passed-in map is replaced by a
 * map holding only those shards. The replacement happens once, after every requested shard has
 * been found, so the input map is left untouched when the request fails.
 *
 * @param collection collection map parsed from JSON-serialized {@link ClusterState}
 * @param name  collection name
 * @param requestedShards a set of shards to be returned in the status.
 *                        An empty or null value indicates <b>all</b> shards.
 * @return map of collection properties (the same instance as {@code collection})
 * @throws SolrException with BAD_REQUEST when the collection is {@code null} or a requested
 *         shard is not present (including when the collection has no {@code "shards"} entry)
 */
@SuppressWarnings("unchecked")
private Map<String, Object> getCollectionStatus(Map<String, Object> collection, String name, Set<String> requestedShards) {
  if (collection == null) {
    throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + name + " not found");
  }
  if (requestedShards == null || requestedShards.isEmpty()) {
    return collection;
  }
  Map<String, Object> shards = (Map<String, Object>) collection.get("shards");
  Map<String, Object> selected = new HashMap<>(requestedShards.size());
  for (String selectedShard : requestedShards) {
    // A missing "shards" entry is reported like a missing shard instead of failing with an NPE.
    if (shards == null || !shards.containsKey(selectedShard)) {
      throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + name + " shard: " + selectedShard + " not found");
    }
    selected.put(selectedShard, shards.get(selectedShard));
  }
  collection.put("shards", selected);
  return collection;
}
/**
 * Runs the registered DELETEREPLICA command with the given arguments.
 *
 * @return whatever the {@link DeleteReplicaCmd} returns
 */
@SuppressWarnings("unchecked")
CollectionCmdResponse.Response deleteReplica(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results)
    throws Exception {
  final DeleteReplicaCmd deleteCmd = (DeleteReplicaCmd) commandMap.get(DELETEREPLICA);
  return deleteCmd.call(clusterState, message, results);
}
/**
 * Offers a DELETECORE state update for the given core to the overseer.
 *
 * @param collectionName collection the core belongs to
 * @param replicaName    not used by this method
 * @param replica        replica whose node name is put into the update
 * @param core           core name put into the update
 */
void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws Exception {
  final String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
  final ZkNodeProps deleteCoreMessage = new ZkNodeProps(
      Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
      ZkStateReader.CORE_NAME_PROP, core,
      ZkStateReader.NODE_NAME_PROP, nodeName,
      ZkStateReader.COLLECTION_PROP, collectionName);
  final byte[] payload = Utils.toJSON(deleteCoreMessage);
  overseer.offerStateUpdate(payload);
}
/**
 * Verifies that the message has a non-null value for every listed property.
 *
 * @param message the message to inspect
 * @param props   names of the required properties
 * @throws SolrException with BAD_REQUEST, listing all required names, as soon as one is missing
 */
void checkRequired(ZkNodeProps message, String... props) {
  final boolean anyMissing = Arrays.stream(props).anyMatch(required -> message.get(required) == null);
  if (anyMissing) {
    throw new SolrException(ErrorCode.BAD_REQUEST, StrUtils.join(Arrays.asList(props),',') +" are required params" );
  }
}
/**
 * Looks for a {@code "failure"} entry (or, failing that, an {@code "error"} entry) in the results
 * and reports it.
 *
 * @param label          text included in the error message
 * @param results        results to inspect
 * @param failureIsFatal when true a found failure is thrown, otherwise it is only logged
 * @throws SolrException with SERVER_ERROR when a failure is found and {@code failureIsFatal} is set
 */
void checkResults(String label, NamedList<Object> results, boolean failureIsFatal) throws SolrException {
  Object problem = results.get("failure");
  if (problem == null) {
    problem = results.get("error");
  }
  if (problem == null) {
    return;
  }
  final String msg = "Error: " + label + ": " + Utils.toJSONString(results);
  if (!failureIsFatal) {
    log.error(msg);
    return;
  }
  throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, msg);
}
/**
 * Issues a soft commit against the parent shard leader's core and records the outcome in
 * {@code results} via {@code processResponse}.
 *
 * @param results           list receiving the commit outcome
 * @param slice             slice name passed on to {@code processResponse}
 * @param parentShardLeader replica whose core URL receives the commit
 * @throws SolrException with SERVER_ERROR when the commit or its response handling fails
 */
@SuppressWarnings({"unchecked"})
void commit(@SuppressWarnings({"rawtypes"})NamedList results, String slice, Replica parentShardLeader) {
  log.debug("Calling soft commit to make sub shard updates visible");
  final String leaderCoreUrl = parentShardLeader.getCoreUrl();
  // HttpShardHandler is hard coded to send a QueryRequest hence we go direct
  // and we force open a searcher so that we have documents to show upon switching states
  UpdateResponse commitRsp = null;
  try {
    Http2SolrClient sharedClient = overseer.getCoreContainer().getUpdateShardHandler().getTheSharedHttpClient();
    commitRsp = softCommit(leaderCoreUrl, sharedClient);
    processResponse(results, null, leaderCoreUrl, commitRsp, slice, Collections.emptySet());
  } catch (Exception failure) {
    ParWork.propagateInterrupt(failure);
    processResponse(results, failure, leaderCoreUrl, commitRsp, slice, Collections.emptySet());
    throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to call distrib softCommit on: " + leaderCoreUrl, failure);
  }
}
/**
 * Sends a COMMIT update request to the given base URL.
 *
 * @param url        base path set on the request
 * @param httpClient client used to execute the request
 * @return the update response
 */
static UpdateResponse softCommit(String url, Http2SolrClient httpClient) throws SolrServerException, IOException {
  final UpdateRequest commitRequest = new UpdateRequest();
  commitRequest.setBasePath(url);
  final ModifiableSolrParams emptyParams = new ModifiableSolrParams();
  commitRequest.setParams(emptyParams);
  // Arguments are passed exactly as before: (COMMIT, false, true, true).
  commitRequest.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true, true);
  final UpdateResponse commitResponse = commitRequest.process(httpClient);
  return commitResponse;
}
/**
 * Blocks for up to 30 seconds until the named slice is present in the collection's state.
 *
 * @param collectionName collection to watch
 * @param sliceName      slice that must appear
 * @throws ZkController.NotInClusterStateException when the wait times out
 * @throws SolrException with SERVER_ERROR when the wait is interrupted
 */
void waitForNewShard(String collectionName, String sliceName) {
  log.debug("Waiting for slice {} of collection {} to be available", sliceName, collectionName);
  try {
    zkStateReader.waitForState(collectionName, 30, TimeUnit.SECONDS,
        (unused, coll) -> coll != null && coll.getSlice(sliceName) != null);
  } catch (TimeoutException timedOut) {
    throw new ZkController.NotInClusterStateException(ErrorCode.SERVER_ERROR, "Timeout waiting for new shard.");
  } catch (InterruptedException interrupted) {
    ParWork.propagateInterrupt(interrupted);
    throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted", interrupted);
  }
}
/**
 * Computes the overlap of two ranges.
 *
 * @return {@code null} when either range is {@code null} or they do not overlap; the contained
 *         range when one is a subset of the other; otherwise a new range built from the
 *         overlapping ends
 */
DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
  if (a == null || b == null || !a.overlaps(b)) {
    return null;
  }
  if (a.isSubsetOf(b)) {
    return a;
  }
  if (b.isSubsetOf(a)) {
    return b;
  }
  // Partial overlap: pick the ends depending on whether b reaches up to a's max.
  return b.includes(a.max)
      ? new DocRouter.Range(b.min, a.max)
      : new DocRouter.Range(a.min, b.max);
}
/**
 * Copies every message entry whose key starts with {@code COLL_PROP_PREFIX} into the request
 * parameters, keeping the key unchanged.
 *
 * @param message source of the property.key=value pairs
 * @param params  parameters to add the pairs to
 */
void addPropertyParams(ZkNodeProps message, ModifiableSolrParams params) {
  message.keySet().stream()
      .filter(name -> name.startsWith(COLL_PROP_PREFIX))
      .forEach(name -> params.set(name, message.getStr(name)));
}
/**
 * Copies every message entry whose key starts with {@code COLL_PROP_PREFIX} into the given map,
 * keeping the key unchanged and storing the value as a string.
 *
 * @param message source of the property.key=value pairs
 * @param map     map to add the pairs to
 */
void addPropertyParams(ZkNodeProps message, Map<String, Object> map) {
  for (String name : message.keySet()) {
    if (!name.startsWith(COLL_PROP_PREFIX)) {
      continue;
    }
    map.put(name, message.getStr(name));
  }
}
/**
 * Applies a MODIFYCOLLECTION message.
 *
 * <p>A config name in the message is removed from it, validated, written to the collection's
 * conf node and followed by a reload. The remaining properties are applied through
 * {@link CollectionMutator}. The collection is reloaded (again) when the message carries the
 * read-only property.
 *
 * @param clusterState state the modification is applied to
 * @param message      the modification request; its property map is mutated (config name removed)
 * @param results      list collecting reload outcomes
 * @return a response holding the modified cluster state
 */
private CollectionCmdResponse.Response modifyCollection(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results)
    throws Exception {
  final String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);

  //the rest of the processing is based on writing cluster state properties
  //remove the property here to avoid any errors down the pipeline due to this property appearing
  final String newConfigName = (String) message.getProperties().remove(CollectionAdminParams.COLL_CONF);
  if (newConfigName != null) {
    validateConfigOrThrowSolrException(newConfigName);
    createConfNode(cloudManager.getDistribStateManager(), newConfigName, collectionName);
    reloadCollection(null, new ZkNodeProps(NAME, collectionName), results);
  }

  final CollectionMutator mutator = new CollectionMutator(cloudManager);
  final ClusterState modifiedState = mutator.modifyCollection(clusterState, message);

  // if switching to/from read-only mode reload the collection
  final boolean touchesReadOnly = message.keySet().contains(ZkStateReader.READ_ONLY);
  if (touchesReadOnly) {
    reloadCollection(null, new ZkNodeProps(NAME, collectionName), results);
  }

  final CollectionCmdResponse.Response modifyResponse = new CollectionCmdResponse.Response();
  modifyResponse.clusterState = modifiedState;
  return modifyResponse;
}
/**
 * Runs the registered DELETE command for the named collection against the reader's current
 * cluster state.
 *
 * @param collectionName collection to delete
 * @param results        list collecting the delete outcomes
 * @return the DELETE command's response
 */
CollectionCmdResponse.Response cleanupCollection(String collectionName, @SuppressWarnings({"rawtypes"})NamedList results) throws Exception {
  log.error("Cleaning up collection [{}].", collectionName);
  final Map<String, Object> deleteProps = makeMap(
      Overseer.QUEUE_OPERATION, DELETE.toLower(),
      NAME, collectionName);
  return commandMap.get(DELETE).call(zkStateReader.getClusterState(), new ZkNodeProps(deleteProps), results);
}
/**
 * Waits up to 60 seconds until every given core URL is matched by a replica in the collection
 * state whose node is live and, if requested, whose state is ACTIVE.
 *
 * @param collectionName collection to watch
 * @param coreUrls       core URLs that must all be found; must not be empty (asserted)
 * @param requireActive  when true a replica only matches in ACTIVE state
 * @return map from core URL to the matching replica
 * @throws SolrException with SERVER_ERROR on timeout or interruption
 */
Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreUrls, boolean requireActive) {
  log.info("wait to see {} in clusterstate {}", coreUrls, zkStateReader.getClusterState().getCollection(collectionName));
  assert coreUrls.size() > 0;
  final AtomicReference<Map<String, Replica>> matchedReplicas = new AtomicReference<>();
  final AtomicReference<String> lastFailure = new AtomicReference<>();
  try {
    zkStateReader.waitForState(collectionName, 60, TimeUnit.SECONDS, (unused, coll) -> { // TODO config timeout up for prod, down for non nightly tests
      if (coll == null) {
        return false;
      }
      final Map<String, Replica> seen = new HashMap<>();
      for (String coreUrl : coreUrls) {
        if (seen.containsKey(coreUrl)) {
          continue;
        }
        final Collection<Slice> slices = coll.getSlices();
        if (slices == null) {
          continue;
        }
        for (Slice slice : slices) {
          for (Replica replica : slice.getReplicas()) {
            final boolean matches = coreUrl.equals(replica.getCoreUrl())
                && (!requireActive || replica.getState().equals(Replica.State.ACTIVE))
                && zkStateReader.isNodeLive(replica.getNodeName());
            if (matches) {
              seen.put(coreUrl, replica);
              // Leaves only the replica loop; the remaining slices are still scanned.
              break;
            }
          }
        }
      }
      if (seen.size() != coreUrls.size()) {
        lastFailure.set("Timed out waiting to see all replicas: " + coreUrls + " in cluster state. Last state: " + coll);
        return false;
      }
      matchedReplicas.set(seen);
      return true;
    });
  } catch (TimeoutException timedOut) {
    final String recorded = lastFailure.get();
    throw new SolrException(ErrorCode.SERVER_ERROR, recorded != null ? recorded : "Timeout waiting for collection state.");
  } catch (InterruptedException interrupted) {
    ParWork.propagateInterrupt(interrupted);
    throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted", interrupted);
  }
  return matchedReplicas.get();
}
/**
 * Runs the registered ADDREPLICA command with the given arguments.
 *
 * @return the command's response
 */
CollectionCmdResponse.Response addReplica(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results)
    throws Exception {
  final Cmd addReplicaCmd = commandMap.get(ADDREPLICA);
  return addReplicaCmd.call(clusterState, message, results);
}
/**
 * Runs the registered ADDREPLICA command, invoking it through its concrete
 * {@link CollectionCmdResponse} type.
 *
 * @return the command's response
 */
CollectionCmdResponse.Response addReplicaWithResp(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results)
    throws Exception {
  final CollectionCmdResponse addReplicaCmd = (CollectionCmdResponse) commandMap.get(ADDREPLICA);
  return addReplicaCmd.call(clusterState, message, results);
}
/**
 * Verifies that the config set node for {@code configName} has data.
 *
 * @param configName name of the config set, looked up below {@code ZkConfigManager.CONFIGS_ZKNODE}
 * @throws SolrException with {@code BAD_REQUEST} when the node has no data
 */
void validateConfigOrThrowSolrException(String configName) throws IOException, KeeperException, InterruptedException {
  final String configPath = ZkConfigManager.CONFIGS_ZKNODE + "/" + configName;
  if (cloudManager.getDistribStateManager().hasData(configPath)) {
    return;
  }
  throw new SolrException(ErrorCode.BAD_REQUEST, "Can not find the specified config set: " + configName);
}
/**
 * Writes the config name of a collection into the collection's node, creating the node if needed.
 * The config (path) itself is not validated here; that check has to happen before this call.
 *
 * @param stateManager used to read and write the collection node
 * @param configName config set name to store; must not be {@code null}
 * @param coll collection name, appended to {@code ZkStateReader.COLLECTIONS_ZKNODE}
 * @throws SolrException with {@code BAD_REQUEST} when {@code configName} is {@code null}
 */
public static void createConfNode(DistribStateManager stateManager, String configName, String coll) throws IOException, AlreadyExistsException, BadVersionException, KeeperException, InterruptedException {
  if (configName == null) {
    throw new SolrException(ErrorCode.BAD_REQUEST,"Unable to get config name");
  }
  final String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll;
  log.debug("creating collections conf node {} ", collectionPath);
  final byte[] confData = Utils.toJSON(makeMap(ZkController.CONFIGNAME_PROP, configName));
  if (!stateManager.hasData(collectionPath)) {
    stateManager.makePath(collectionPath, confData, CreateMode.PERSISTENT, false);
    return;
  }
  // the node already carries data: overwrite it regardless of its version
  stateManager.setData(collectionPath, confData, -1);
}
/**
 * Sends the request to all replicas of a collection, treating no exception as acceptable.
 *
 * @return what the six-argument overload returns for an empty set of okay exceptions
 */
private List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
    NamedList<Object> results, Replica.State stateMatcher, String asyncId) throws KeeperException, InterruptedException {
  final Set<String> noOkayExceptions = Collections.emptySet();
  return collectionCmd(message, params, results, stateMatcher, asyncId, noOkayExceptions);
}
/**
 * Sends the request to all replicas of a collection without a caller supplied shard handler or
 * request tracker; both are passed on as {@code null}.
 *
 * @return what the eight-argument overload returns
 */
List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
    NamedList<Object> results, Replica.State stateMatcher,
    String asyncId, Set<String> okayExceptions) throws KeeperException, InterruptedException {
  final ShardHandler noShardHandler = null;
  final ShardRequestTracker noTracker = null;
  return collectionCmd(message, params, results, stateMatcher, asyncId, okayExceptions, noShardHandler, noTracker);
}
/**
 * Send request to all replicas of a collection.
 *
 * <p>When no {@code shardHandler} is passed in, one is obtained from the shard handler factory and
 * the responses are processed before returning; with a caller supplied handler the responses are
 * left for the caller. A missing {@code shardRequestTracker} is replaced by a new one built from
 * {@code asyncId} and the message's {@code Overseer.QUEUE_OPERATION}.
 *
 * @return List of replicas which is not live for receiving the request, or {@code null} when the
 *         collection named by {@code NAME} is not in the cluster state
 */
List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
    NamedList<Object> results, Replica.State stateMatcher,
    String asyncId, Set<String> okayExceptions, ShardHandler shardHandler,
    ShardRequestTracker shardRequestTracker) throws KeeperException, InterruptedException {
  log.info("Executing Collection Cmd={}, asyncId={}", params, asyncId);
  final String collectionName = message.getStr(NAME);

  // only a handler created here has its responses processed here
  final boolean ownsShardHandler = shardHandler == null;
  if (ownsShardHandler) {
    shardHandler = shardHandlerFactory.getShardHandler(overseerLbClient);
  }

  final DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(collectionName);
  if (collection == null) {
    return null;
  }

  final List<Replica> notLiveReplicas = new ArrayList<>();
  if (shardRequestTracker == null) {
    shardRequestTracker = new ShardRequestTracker(asyncId, message.getStr(Overseer.QUEUE_OPERATION), adminPath, zkStateReader, shardHandlerFactory, overseer);
  }
  for (Slice slice : collection.getSlices()) {
    List<Replica> notLiveInSlice = shardRequestTracker.sliceCmd(params, stateMatcher, slice, shardHandler);
    notLiveReplicas.addAll(notLiveInSlice);
  }
  if (ownsShardHandler) {
    shardRequestTracker.processResponses(results, shardHandler, false, null, okayExceptions);
  }
  return notLiveReplicas;
}
/**
 * Unpacks exception, node name, response and shard from {@code srsp} and records the outcome in
 * {@code results}.
 */
private static void processResponse(NamedList<Object> results, ShardResponse srsp, Set<String> okayExceptions) {
  processResponse(
      results,
      srsp.getException(),
      srsp.getNodeName(),
      srsp.getSolrResponse(),
      srsp.getShard(),
      okayExceptions);
}
/**
 * Records one shard outcome in {@code results}.
 *
 * <p>The outcome is a success when there is no exception, or when the exception is a
 * {@code RemoteSolrException} whose root throwable is listed in {@code okayExceptions}; everything
 * else is logged and recorded as a failure under the node name.
 */
@SuppressWarnings("deprecation")
private static void processResponse(NamedList<Object> results, Throwable e, String nodeName, SolrResponse solrResponse, String shard, Set<String> okayExceptions) {
  final String rootThrowable = e instanceof BaseHttpSolrClient.RemoteSolrException
      ? ((BaseHttpSolrClient.RemoteSolrException) e).getRootThrowable()
      : null;
  final boolean tolerated = rootThrowable != null && okayExceptions.contains(rootThrowable);
  if (e == null || tolerated) {
    addSuccess(results, nodeName, solrResponse.getResponse());
    return;
  }
  log.error("Error from shard: {}", shard, e);
  addFailure(results, nodeName, e.getClass().getName() + ":" + e.getMessage());
}
/**
 * Adds {@code key -> value} to the {@code "failure"} entry of {@code results}, creating that entry
 * on first use.
 */
@SuppressWarnings("unchecked")
private static void addFailure(NamedList<Object> results, String key, Object value) {
  SimpleOrderedMap<Object> failures = (SimpleOrderedMap<Object>) results.get("failure");
  if (failures != null) {
    failures.add(key, value);
    return;
  }
  failures = new SimpleOrderedMap<>();
  results.add("failure", failures);
  failures.add(key, value);
}
/**
 * Adds {@code key -> value} to the {@code "success"} entry of {@code results}, creating that entry
 * on first use.
 */
@SuppressWarnings("unchecked")
private static void addSuccess(NamedList<Object> results, String key, Object value) {
  SimpleOrderedMap<Object> successes = (SimpleOrderedMap<Object>) results.get("success");
  if (successes != null) {
    successes.add(key, value);
    return;
  }
  successes = new SimpleOrderedMap<>();
  results.add("success", successes);
  successes.add(key, value);
}
// Latches registered by waitForCoreAdminAsyncCallToComplete; close() counts all of them down and
// clears the set. The field is static, so the set is shared by all instances of this class.
private static Set<CountDownLatch> latches = ConcurrentHashMap.newKeySet();
/**
 * Waits for an async core admin request to produce a response node and reports its status.
 *
 * <p>If {@code requestId} is in the overseer running map, watches are set on the request's
 * completed and failure nodes and the call blocks for up to 15 x 3 seconds until one of them fires.
 * Afterwards the completed map, failure map, running map and collection queue are consulted, in
 * that order, to build the {@code STATUS} entry of the response.
 *
 * @param nodeName node the request was sent to; here only used to resolve the url for the error message
 * @param requestId core admin async id to wait for
 * @param adminPath set as the {@code qt} param that shows up in the error message
 * @param zkStateReader source of the ZooKeeper client and of the node's base url
 * @param shardHandlerFactory factory a shard handler is obtained from
 * @param overseer gives access to the ZkController and the load balanced client
 * @return the response values holding the {@code STATUS} entry, when the task is completed or not found
 * @throws SolrException when no response node shows up in time, or the task is still running,
 *         failed, or in an unknown state
 */
private static NamedList<Object> waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId, String adminPath, ZkStateReader zkStateReader, HttpShardHandlerFactory shardHandlerFactory,
    Overseer overseer) throws KeeperException, InterruptedException {
  log.info("waitForCoreAdminAsyncCallToComplete {}", requestId);
  ZkController zkController = overseer.getCoreContainer().getZkController();
  // NOTE(review): this handler is never used below; the factory call is only kept so that this
  // change does not alter what the factory observes - confirm whether it can be dropped.
  ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseer.overseerLbClient);

  // these params are not sent anywhere; they only appear in the error message at the end
  ModifiableSolrParams params = new ModifiableSolrParams();
  params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTSTATUS.toString());
  params.set(CoreAdminParams.REQUESTID, requestId);
  params.set("qt", adminPath);
  // no retries happen in this method; the counter only feeds the "still running" message
  int counter = 0;
  String replica = zkStateReader.getBaseUrlForNodeName(nodeName);

  // mn- from DistributedMap
  final String successPath = "/overseer/collection-map-completed" + "/mn-" + requestId;
  final String failAsyncPathToWaitOn = "/overseer/collection-map-failure" + "/mn-" + requestId;

  if (zkController.getOverseerRunningMap().contains(requestId)) {
    awaitAsyncResponseNode(zkStateReader.getZkClient(), requestId, successPath, failAsyncPathToWaitOn);
  }

  if (log.isDebugEnabled()) log.debug("prepare response {}", requestId);
  SolrQueryResponse srsp = new SolrQueryResponse();
  final NamedList<Object> results = srsp.getValues();

  final String state;
  final String msg;
  if (zkController.getOverseerCompletedMap().contains(requestId)) {
    state = COMPLETED.toString();
    msg = "found [" + requestId + "] in completed tasks";
  } else if (zkController.getOverseerFailureMap().contains(requestId)) {
    state = FAILED.toString();
    msg = "found [" + requestId + "] in failed tasks";
  } else if (zkController.getOverseerRunningMap().contains(requestId)) {
    state = RUNNING.toString();
    msg = "found [" + requestId + "] in running tasks";
  } else if (zkController.getOverseerCollectionQueue().containsTaskWithRequestId(ASYNC, requestId)) {
    state = SUBMITTED.toString();
    msg = "found [" + requestId + "] in submitted tasks";
  } else {
    state = NOT_FOUND.toString();
    msg = "Did not find [" + requestId + "] in any tasks queue";
  }
  NamedList<String> status = new NamedList<>();
  status.add("state", state);
  status.add("msg", msg);
  results.add("STATUS", status);

  String r = state.toLowerCase(Locale.ROOT);
  if (r.equals("running")) {
    if (log.isDebugEnabled()) log.debug("The task is still RUNNING, continuing to wait.");
    throw new SolrException(ErrorCode.BAD_REQUEST,
        "Task is still running even after reporting complete requestId: " + requestId + "" + status + "retried " + counter + "times");
  } else if (r.equals("completed")) {
    // we're done with this entry in the DistributeMap
    overseer.getCoreContainer().getZkController().clearAsyncId(requestId);
    if (log.isDebugEnabled()) log.debug("The task is COMPLETED, returning");
    return srsp.getValues();
  } else if (r.equals("failed")) {
    // TODO: Improve this. Get more information.
    // NOTE(review): despite the log text, a failed task falls through to the SERVER_ERROR below,
    // so callers never receive a "failed" STATUS - confirm that this is intended.
    if (log.isDebugEnabled()) log.debug("The task is FAILED, returning");
  } else if (r.equals("not_found")) {
    if (log.isDebugEnabled()) log.debug("The task is notfound, retry");
    return srsp.getValues();
  } else {
    throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request " + status);
  }
  throw new SolrException(ErrorCode.SERVER_ERROR, "No response on request for async status url=" + replica + " params=" + params);
}

/**
 * Blocks until the completed or the failure node of an async request is reported, or 15 x 3 seconds
 * have passed.
 *
 * <p>The latch is only registered in {@code latches} while this method runs and is always
 * deregistered again. Each path gets its own watcher, because a watcher removes itself only from
 * the single path it was created for when it is closed.
 *
 * @throws SolrException with {@code SERVER_ERROR} when the latch was not released in time
 */
private static void awaitAsyncResponseNode(SolrZkClient zkClient, String requestId, String successPath, String failurePath)
    throws KeeperException, InterruptedException {
  CountDownLatch latch = new CountDownLatch(1);
  WatchForResponseNode successWatcher = new WatchForResponseNode(latch, zkClient, successPath);
  WatchForResponseNode failureWatcher = new WatchForResponseNode(latch, zkClient, failurePath);
  latches.add(latch);
  try {
    Stat successStat = zkClient.exists(successPath, successWatcher);
    if (log.isDebugEnabled()) log.debug("created watch for async response, stat={}", successStat);
    if (successStat != null) {
      // the node is already there, no event will announce it
      latch.countDown();
    }
    Stat failureStat = zkClient.exists(failurePath, failureWatcher);
    if (log.isDebugEnabled()) log.debug("created watch for async response, stat={}", failureStat);
    if (failureStat != null) {
      latch.countDown();
    }
    if (log.isDebugEnabled()) log.debug("created watch for response {}", requestId);
    boolean success = false;
    for (int i = 0; i < 15; i++) {
      success = latch.await(3, TimeUnit.SECONDS); // MRM TODO: - still need a central timeout strat
      if (success) {
        if (log.isDebugEnabled()) log.debug("latch was triggered {}", requestId);
        break;
      } else {
        if (log.isDebugEnabled()) log.debug("no latch, await again {}", requestId);
      }
    }
    if (!success) {
      throw new SolrException(ErrorCode.SERVER_ERROR, "Timeout waiting to see async zk node " + successPath);
    }
  } finally {
    IOUtils.closeQuietly(successWatcher);
    IOUtils.closeQuietly(failureWatcher);
    latch.countDown();
    latches.remove(latch);
  }
}
/** @return the fixed, human readable name of this message handler */
@Override
public String getName() {
  final String handlerName = "Overseer Collection Message Handler";
  return handlerName;
}
/**
 * @param operation name of the operation
 * @return {@code operation} prefixed with {@code "collection_"}
 */
@Override
public String getTimerName(String operation) {
  final String timerPrefix = "collection_";
  return timerPrefix + operation;
}
/**
 * @return the message's {@code COLLECTION_PROP} value when that key is present, otherwise its
 *         {@code NAME} value
 */
@Override
public String getTaskKey(ZkNodeProps message) {
  if (message.containsKey(COLLECTION_PROP)) {
    return message.getStr(COLLECTION_PROP);
  }
  return message.getStr(NAME);
}
// Batch id that lockTask compares against taskBatch.getId(); starts out as -1.
private long sessionId = -1;
// Lock session taken from lockTree.getSession() in lockTask; every lock is requested through it.
private LockTree.Session lockSession;
/**
 * Requests the lock for a task, using one lock session per task batch.
 *
 * <p>The lock is requested for the collection action named by the message's
 * {@code Overseer.QUEUE_OPERATION} on the path made of task key, shard id and replica.
 *
 * @param message the task; supplies the operation and the path to lock
 * @param taskBatch the batch the task belongs to; a change of its id starts a new lock session
 * @return whatever {@code LockTree.Session#lock} returns for the action and path
 */
@Override
public Lock lockTask(ZkNodeProps message, OverseerTaskProcessor.TaskBatch taskBatch) {
  if (lockSession == null || sessionId != taskBatch.getId()) {
    //this is always called in the same thread.
    //Each batch is supposed to have a new taskBatch
    //So if taskBatch changes we must create a new Session
    // also check if the running tasks are empty. If yes, clear lockTree
    // this will ensure that locks are not 'leaked'
    if (taskBatch.getRunningTasks() == 0) {
      lockTree.clear();
    }
    lockSession = lockTree.getSession();
    // remember the batch this session belongs to; without this the id check above never matches
    // and every task of a batch would get a session of its own
    sessionId = taskBatch.getId();
  }
  return lockSession.lock(getCollectionAction(message.getStr(Overseer.QUEUE_OPERATION)),
      Arrays.asList(
          getTaskKey(message),
          message.getStr(ZkStateReader.SHARD_ID_PROP),
          message.getStr(ZkStateReader.REPLICA_PROP))
  );
}
/**
 * Marks this handler as closed, then counts down and forgets every latch in {@code latches} so
 * that threads waiting on them are released.
 */
@Override
public void close() throws IOException {
  this.isClosed = true;
  latches.forEach(CountDownLatch::countDown);
  latches.clear();
  // assert ObjectReleaseTracker.release(this);
}
/** @return {@code true} once {@link #close()} has been called */
@Override
public boolean isClosed() {
  return this.isClosed;
}
/**
 * A collection command. Implementations are looked up in {@code commandMap} and run through
 * {@link #call}, as {@code addReplica} does for {@code ADDREPLICA}.
 */
protected interface Cmd {
/**
 * Executes the command.
 *
 * @param state cluster state handed in by the caller
 * @param message properties of the operation
 * @param results raw list handed in by the caller (presumably filled with the outcome - confirm against implementations)
 * @return the command's response
 * @throws Exception whatever the implementation throws
 */
CollectionCmdResponse.Response call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception;
/**
 * Cleanup hook; the default implementation does nothing and returns {@code false}.
 * NOTE(review): the meaning of the return value is not visible here - confirm against the caller.
 */
default boolean cleanup(ZkNodeProps message) {
return false;
}
}
/**
 * A parameterless step that yields a {@link CollectionCmdResponse.Response}.
 * NOTE(review): no caller is visible in this part of the file - confirm when it is invoked.
 */
protected interface Finalize {
CollectionCmdResponse.Response call() throws Exception;
}
/*
* Kept for backward compatibility: while this flag is true, the result of every async core admin
* call is also added at the top level of the results, keyed by its async ID
* (see ShardRequestTracker#waitForAsyncCallsToComplete).
* This can be removed in Solr 9
*/
@Deprecated
static boolean INCLUDE_TOP_LEVEL_RESPONSE = true;
/**
 * @return a new tracker without async id and operation; such a tracker neither tags requests with
 *         an async id nor waits for async calls
 */
public ShardRequestTracker syncRequestTracker() {
  final String noAsyncId = null;
  final String noOperation = null;
  return new ShardRequestTracker(noAsyncId, noOperation, adminPath, zkStateReader, shardHandlerFactory, overseer);
}
/**
 * @param asyncId async id of the collection request; prefix of the per-request core admin async ids
 * @param operation operation name, second part of the per-request core admin async ids
 * @return a new tracker bound to this handler's admin path, state reader, shard handler factory
 *         and overseer
 */
public ShardRequestTracker asyncRequestTracker(String asyncId, String operation) {
  return new ShardRequestTracker(
      asyncId, operation, adminPath, zkStateReader, shardHandlerFactory, overseer);
}
/**
 * Sends core admin requests to nodes and collects their responses. When created with an async id,
 * every request gets a core admin async id of its own, and {@code processResponses} additionally
 * waits for each of those async calls.
 */
public static class ShardRequestTracker{
  private final String asyncId;
  // Keyed by the core admin async id, not by node name: several requests can go to the same node
  // (one per replica hosted there) and each of them has to be waited for.
  private final Map<String,String> nodeNameByShardAsyncId = new ConcurrentHashMap<>();
  private final String adminPath;
  private final ZkStateReader zkStateReader;
  private final HttpShardHandlerFactory shardHandlerFactory;
  private final Overseer overseer;
  private final String operation;

  ShardRequestTracker(String asyncId, String operation, String adminPath, ZkStateReader reader, HttpShardHandlerFactory shardHandlerFactory, Overseer overseer) {
    this.asyncId = asyncId;
    this.adminPath = adminPath;
    this.operation = operation;
    this.zkStateReader = reader;
    this.shardHandlerFactory = shardHandlerFactory;
    this.overseer = overseer;
  }

  /**
   * Send request to all replicas of a slice
   *
   * @param params request params; copied per replica before the core name is set
   * @param stateMatcher when not {@code null}, only replicas in this state are considered
   * @return List of replicas which is not live for receiving the request
   */
  public List<Replica> sliceCmd(ModifiableSolrParams params, Replica.State stateMatcher,
                                Slice slice, ShardHandler shardHandler) {
    List<Replica> notLiveReplicas = new ArrayList<>();
    for (Replica replica : slice.getReplicas()) {
      if ((stateMatcher == null || Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) {
        if (zkStateReader.isNodeLive(replica.getNodeName())) {
          // For thread safety, only simple clone the ModifiableSolrParams
          ModifiableSolrParams cloneParams = new ModifiableSolrParams();
          cloneParams.add(params);
          cloneParams.set(CoreAdminParams.CORE, replica.getName());
          sendShardRequest(replica.getNodeName(), cloneParams, shardHandler);
        } else {
          notLiveReplicas.add(replica);
        }
      }
    }
    return notLiveReplicas;
  }

  /** Submits the request to {@code nodeName} using this tracker's admin path and state reader. */
  public void sendShardRequest(String nodeName, ModifiableSolrParams params,
                               ShardHandler shardHandler) {
    sendShardRequest(nodeName, params, shardHandler, adminPath, zkStateReader);
  }

  /**
   * Submits a request to the base url of {@code nodeName}. With an async id on this tracker the
   * request is tagged with a generated core admin async id, which is tracked for the later wait.
   * Note that {@code params} is modified: {@code qt} and, if applicable, the async id are set on it.
   */
  public void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler,
                               String adminPath, ZkStateReader zkStateReader) {
    if (asyncId != null) {
      String coreAdminAsyncId = asyncId + "-" + operation + "-" + Math.abs(System.nanoTime());
      params.set(ASYNC, coreAdminAsyncId);
      track(nodeName, coreAdminAsyncId);
    }
    ShardRequest sreq = new ShardRequest();
    params.set("qt", adminPath);
    sreq.purpose = 1;
    String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
    sreq.shards = new String[] {replica};
    sreq.actualShards = sreq.shards;
    sreq.nodeName = nodeName;
    sreq.params = params;
    shardHandler.submit(sreq, replica, sreq.params);
  }

  /** Same as the five-argument variant, treating no exception as acceptable. */
  void processResponses(NamedList<Object> results, ShardHandler shardHandler, boolean abortOnError, String msgOnError) throws KeeperException, InterruptedException {
    processResponses(results, shardHandler, abortOnError, msgOnError, Collections.emptySet());
  }

  /**
   * Takes responses from {@code shardHandler} until none are left and records each in
   * {@code results}; with an async id on this tracker it then waits for all tracked async calls.
   *
   * @param abortOnError when {@code true}, the first response carrying an exception makes this
   *        method drain the remaining responses and throw
   * @param msgOnError message of the exception thrown on abort
   * @param okayExceptions root throwables that do not turn a response into a failure
   * @throws SolrException with {@code SERVER_ERROR} on abort
   */
  void processResponses(NamedList<Object> results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
                        Set<String> okayExceptions) throws KeeperException, InterruptedException {
    // Processes all shard responses
    ShardResponse srsp;
    do {
      srsp = shardHandler.takeCompletedOrError();
      if (srsp != null) {
        processResponse(results, srsp, okayExceptions);
        Throwable exception = srsp.getException();
        if (abortOnError && exception != null) {
          // drain pending requests
          while (srsp != null) {
            srsp = shardHandler.takeCompletedOrError();
          }
          throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, exception);
        }
      }
    } while (srsp != null);
    // If request is async wait for the core admin to complete before returning
    if (asyncId != null) {
      waitForAsyncCallsToComplete(results); // TODO: Shouldn't we abort with msgOnError exception when failure?
      nodeNameByShardAsyncId.clear();
    }
  }

  /**
   * Waits for every tracked core admin async call and records its result as success or failure
   * under the node name; while {@code INCLUDE_TOP_LEVEL_RESPONSE} is set the result is also added
   * at the top level under the async id.
   */
  private void waitForAsyncCallsToComplete(NamedList<Object> results) {
    nodeNameByShardAsyncId.forEach((shardAsyncId, node) -> {
      log.debug("I am Waiting for :{}/{}", node, shardAsyncId);
      NamedList<Object> reqResult = null;
      try {
        reqResult = waitForCoreAdminAsyncCallToComplete(node, shardAsyncId, adminPath, zkStateReader, shardHandlerFactory, overseer);
      } catch (KeeperException e) {
        throw new SolrException(ErrorCode.SERVER_ERROR, e);
      } catch (InterruptedException e) {
        ParWork.propagateInterrupt(e);
        throw new SolrException(ErrorCode.SERVER_ERROR, e);
      }
      if (INCLUDE_TOP_LEVEL_RESPONSE) {
        results.add(shardAsyncId, reqResult);
      }
      if ("failed".equalsIgnoreCase(((NamedList<String>)reqResult.get("STATUS")).get("state"))) {
        log.error("Error from shard {}: {}", node, reqResult);
        addFailure(results, node, reqResult);
      } else {
        addSuccess(results, node, reqResult);
      }
    });
  }

  /** @deprecated consider to make it private after {@link CreateCollectionCmd} refactoring*/
  @Deprecated void track(String nodeName, String coreAdminAsyncId) {
    nodeNameByShardAsyncId.put(coreAdminAsyncId, nodeName);
  }
}
/**
 * Watcher that releases a latch when the watched node is created, deleted or its data changes.
 * Closing it stops the latch from being released by later events and removes the watcher from
 * {@code watchPath}.
 */
private static class WatchForResponseNode implements Watcher, Closeable {
  private final CountDownLatch latch;
  private final SolrZkClient zkClient;
  private final String watchPath;
  // Set by the thread that calls close() and read in process(), which ZooKeeper invokes from its
  // own event thread; volatile makes the update visible there.
  private volatile boolean closed;

  public WatchForResponseNode(CountDownLatch latch, SolrZkClient zkClient, String watchPath) {
    this.zkClient = zkClient;
    this.latch = latch;
    this.watchPath = watchPath;
  }

  /**
   * Counts the latch down for node created, node deleted and node data changed events; events of
   * type {@code None}, any other event type, and everything arriving after {@link #close()} are
   * ignored.
   */
  @Override
  public void process(WatchedEvent event) {
    if (log.isDebugEnabled()) log.debug("waitForAsyncId {}", event);
    if (Event.EventType.None.equals(event.getType()) || closed) {
      return;
    }
    switch (event.getType()) {
      case NodeCreated:
        if (log.isDebugEnabled()) log.debug("Overseer request response zk node created");
        break;
      case NodeDeleted:
        if (log.isDebugEnabled()) log.debug("Overseer request response zk node deleted");
        break;
      case NodeDataChanged:
        if (log.isDebugEnabled()) log.debug("Overseer request response zk node data changed");
        break;
      default:
        return;
    }
    latch.countDown();
  }

  /** Marks the watcher closed and removes it from {@code watchPath}; failures are not propagated. */
  @Override
  public void close() throws IOException {
    this.closed = true;
    try {
      zkClient.removeWatches(watchPath, this, WatcherType.Any, true);
    } catch (KeeperException.NoWatcherException | AlreadyClosedException ignored) {
      // nothing left to remove: the watch is already gone or the client is already closed
    } catch (Exception e) {
      log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
    }
  }
}
}