blob: c99367117baf534ec5d64254409a009bb4a5100e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.LeaderElector;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SkyHook;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.ConnectionManager;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.RoutingRule;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.MergeIndexesCommand;
import org.apache.solr.update.RollbackUpdateCommand;
import org.apache.solr.update.SolrCmdDistributor;
import org.apache.solr.update.SolrIndexSplitter;
import org.apache.solr.update.UpdateCommand;
import org.apache.solr.util.TestInjection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
// Cloud-specific descriptor of the core handling this request; supplies the shard id, collection name and replica type.
private final CloudDescriptor cloudDesc;
// Controller obtained from the core container; used for cluster state, leader lookups and the local base URL.
private final ZkController zkController;
// Distributor that forwards adds, deletes and commits to other nodes.
private final SolrCmdDistributor cmdDistrib;
// Descriptor of the local core; its name is compared against leader replica names to decide leadership.
private final CoreDescriptor desc;
// Target nodes for the command currently being processed, as computed by setupRequest; may be null.
protected volatile List<SolrCmdDistributor.Node> nodes;
// NOTE(review): not referenced in the visible part of this file - presumably the core node names skipped
// via the test skip-servers parameter; confirm against the rest of the class.
private volatile Set<String> skippedCoreNodeNames;
// Name of the collection the local core belongs to.
private final String collection;
// Read-only flag of the collection as seen in the cluster state when this processor was constructed
// (false when the collection was not found).
private final boolean readOnlyCollection;
// The cached immutable clusterState for the update... usually refreshed for each individual update.
// Different parts of this class used to request current clusterState views, which led to subtle bugs and race conditions
// such as SOLR-13815 (live split data loss.) Most likely, the only valid reasons for updating clusterState should be on
// certain types of failure + retry.
// Note: there may be other races related to
// 1) cluster topology change across multiple adds
// 2) use of methods directly on zkController that use a different clusterState
// 3) in general, not controlling carefully enough exactly when our view of clusterState is updated
protected volatile ClusterState clusterState;
// should we clone the document before sending it to replicas?
// this is set to true in the constructor if the next processors in the chain
// are custom and may modify the SolrInputDocument racing with its serialization for replication
private final boolean cloneRequiredOnLeader;
// used for keeping track of replicas that have processed an add/update from the leader;
// both trackers are handed to cmdDistrib when distributing adds and deletes
private volatile RollupRequestReplicationTracker rollupReplicationTracker = null;
private volatile LeaderRequestReplicationTracker leaderReplicationTracker = null;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
 * Builds the cloud-mode distributing processor for one update request.
 *
 * <p>Captures the core and cloud descriptors, creates the command distributor and takes an initial
 * snapshot of the cluster state from which the collection's read-only flag is derived. If anything
 * fails after the distributor has been created, the distributor is closed and the failure is
 * rethrown wrapped in a {@link SolrException} with {@code SERVER_ERROR}.
 *
 * @param req  the update request being processed
 * @param rsp  the response associated with the request
 * @param next the next processor in the chain
 */
public DistributedZkUpdateProcessor(SolrQueryRequest req,
    SolrQueryResponse rsp, UpdateRequestProcessor next) {
  super(req, rsp, next);
  final CoreContainer coreContainer = req.getCore().getCoreContainer();
  desc = req.getCore().getCoreDescriptor();
  cloudDesc = req.getCore().getCoreDescriptor().getCloudDescriptor();
  zkController = coreContainer.getZkController();
  cmdDistrib = new SolrCmdDistributor(zkController.getZkStateReader(), coreContainer.getUpdateShardHandler(), new IsCCClosed(req));
  try {
    cloneRequiredOnLeader = isCloneRequiredOnLeader(next);
    collection = cloudDesc.getCollectionName();
    clusterState = zkController.getClusterState();
    final DocCollection collectionState = clusterState.getCollectionOrNull(collection, true);
    // a collection that is absent from the cluster state is treated as writable
    readOnlyCollection = collectionState != null && collectionState.isReadOnly();
  } catch (Exception e) {
    // do not leak the distributor when construction fails part-way through
    cmdDistrib.close();
    throw new SolrException(ErrorCode.SERVER_ERROR, e);
  }
}
/**
 * Tells whether updates must be rejected.
 *
 * @return {@code true} when the collection was flagged read-only at construction time or the
 *         local core itself is marked read-only
 */
private boolean isReadOnly() {
  if (readOnlyCollection) {
    return true;
  }
  return req.getCore().readOnly;
}
/**
 * Walks the remainder of the processor chain looking for any processor other than the log, run and
 * tolerant processors.
 *
 * @param next the first processor after this one (may be {@code null})
 * @return {@code true} as soon as a processor of any other class is found, {@code false} if the
 *         rest of the chain consists only of those three classes (or is empty)
 */
private boolean isCloneRequiredOnLeader(UpdateRequestProcessor next) {
  for (UpdateRequestProcessor current = next; current != null; current = current.next) {
    final Class<? extends UpdateRequestProcessor> type = current.getClass();
    final boolean builtIn = type == LogUpdateProcessorFactory.LogUpdateProcessor.class
        || type == RunUpdateProcessorFactory.RunUpdateProcessor.class
        || type == TolerantUpdateProcessor.class;
    if (!builtIn) {
      return true;
    }
  }
  return false;
}
/**
 * Resolves the replica type of the local core.
 *
 * <p>The descriptor is looked up through the request rather than through {@code cloudDesc}
 * because the super class calls this before this class's constructor has assigned that field.
 *
 * @return the replica type recorded in the core's cloud descriptor
 */
@Override
protected Replica.Type computeReplicaType() {
  final CoreDescriptor coreDescriptor = req.getCore().getCoreDescriptor();
  final CloudDescriptor cloudDescriptor = coreDescriptor.getCloudDescriptor();
  return cloudDescriptor.getReplicaType();
}
/**
 * Processes a commit in cloud mode.
 *
 * <p>The shard leader is resolved first (retrying through the state reader when the cached slice
 * has no leader, or when this core is listed as leader but its elector disagrees). What happens
 * next depends on the {@code COMMIT_END_POINT} request parameter:
 * <ul>
 *   <li>{@code "terminal"}, or {@code dist=false}: commit locally and return;</li>
 *   <li>{@code "replicas"}: commit locally on NRT replicas, warn on PULL replicas;</li>
 *   <li>{@code "leaders"}: forward the commit to this shard's replicas and commit locally;</li>
 *   <li>absent: if this core is a leader and appears among the collection's nodes, handle the
 *       local shard as for {@code "leaders"}; then forward the commit to the remaining nodes with
 *       the end point set to {@code "leaders"}.</li>
 * </ul>
 * Distributed work is collected on a {@link ParWork} that is closed when this method leaves the
 * try-with-resources block.
 *
 * @param cmd the commit command
 * @throws IOException   if a local commit fails with an I/O error
 * @throws SolrException if the local shard or its leader cannot be determined, or the collection
 *                       is read-only
 */
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException {
  // read the routing-relevant request parameters once instead of on every branch
  final SolrParams reqParams = req.getParams();
  final String commitEndPoint = reqParams.get(COMMIT_END_POINT);

  Replica leaderReplica;
  DocCollection coll = clusterState.getCollection(collection);
  String localShardId = desc.getCloudDescriptor().getShardId();
  Slice slice = coll.getSlice(localShardId);
  if (slice == null) {
    // fail with a descriptive error rather than a NullPointerException
    throw new SolrException(ErrorCode.SERVER_ERROR, "No shard " + localShardId + " in " + coll);
  }
  String shardId = slice.getName();
  try {
    // Not equivalent to getLeaderProps, which retries to find a leader.
    leaderReplica = slice.getLeader();
    if (leaderReplica == null) {
      leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 10000);
    } else {
      isLeader = leaderReplica.getName().equals(desc.getName());
      if (isLeader) {
        // the cluster state names us as leader; double check with our own elector
        LeaderElector leaderElector = req.getCore().getCoreContainer().getZkController().getLeaderElector(req.getCore().getName());
        if (leaderElector == null || !leaderElector.isLeader()) {
          leaderReplica = zkController.getZkStateReader().getLeaderRetry(req.getCore().getCoreContainer().getUpdateShardHandler().getTheSharedHttpClient(), collection, shardId, 10000, true);
        }
      }
    }
  } catch (Exception e) {
    ParWork.propagateInterrupt(e);
    throw new SolrException(ErrorCode.SERVER_ERROR, "Exception finding leader for shard " + cloudDesc.getShardId(), e);
  }
  isLeader = leaderReplica.getName().equals(desc.getName());

  if (log.isDebugEnabled()) log.debug("processCommit - start commit isLeader={} commit_end_point={} replicaType={}", isLeader, commitEndPoint, replicaType);

  try (ParWork worker = new ParWork(this, false, false)) {
    clusterState = zkController.getClusterState();

    assert TestInjection.injectFailUpdateRequests();

    if (isReadOnly()) {
      throw new SolrException(ErrorCode.FORBIDDEN,
          "Collection " + collection + " is read-only.");
    }

    updateCommand = cmd;
    zkCheck();

    final Boolean dist = reqParams.getBool("dist");
    if ("terminal".equals(commitEndPoint) || (dist != null && !dist)) {
      if (log.isDebugEnabled()) log.debug(
          "processCommit - Do a local commit on single replica directly");
      doLocalCommit(cmd);
      return;
    }

    ModifiableSolrParams params = new ModifiableSolrParams(filterParams(reqParams));

    if (log.isDebugEnabled()) {
      log.debug(
          "processCommit - distrib commit isLeader={} commit_end_point={} replicaType={}",
          isLeader, commitEndPoint, replicaType);
    }

    if ("replicas".equals(commitEndPoint)) {
      if (replicaType == Replica.Type.PULL) {
        log.warn("processCommit - Commit not supported on replicas of type "
            + Replica.Type.PULL);
      } else if (replicaType == Replica.Type.NRT) {
        if (log.isDebugEnabled()) log.debug(
            "processCommit - Do a local commit on NRT endpoint for replica");
        doLocalCommit(cmd);
      }
    } else if ("leaders".equals(commitEndPoint)) {
      sendCommitToReplicasAndLocalCommit(cmd, worker, leaderReplica.getName(), params);
    } else if (commitEndPoint == null) {
      // zk
      final List<SolrCmdDistributor.Node> useNodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT), true);

      if (isLeader) {
        SolrCmdDistributor.Node removeNode = null;
        for (SolrCmdDistributor.Node node : useNodes) {
          if (node.getCoreName().equals(this.desc.getName())) {
            removeNode = node;
          }
        }
        // NOTE(review): when this leader is not among useNodes no local commit is issued here - confirm intended
        if (removeNode != null) {
          log.debug("remove leader node since we will do a local commit now {}", leaderReplica);
          useNodes.remove(removeNode);

          sendCommitToReplicasAndLocalCommit(cmd, worker, leaderReplica.getName(), params);
          if (log.isDebugEnabled()) log.debug("processCommit(CommitUpdateCommand) - end");
        }
      }

      // everything still in useNodes receives the commit addressed to leaders
      params.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
      params.set(COMMIT_END_POINT, "leaders");
      if (useNodes != null && useNodes.size() > 0) {
        if (log.isDebugEnabled()) log.debug("processCommit - send commit to leaders nodes={}", useNodes);
        params.set(DISTRIB_FROM, Replica.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));

        worker.collect("distCommit", () -> {
          cmdDistrib.distribCommit(cmd, useNodes, params);
        });
      }
      return;
    }
  }
  if (log.isDebugEnabled()) log.debug("processCommit(CommitUpdateCommand) - end");
}
/**
 * Queues a commit for this shard's replicas and a local commit on the given worker.
 *
 * <p>The supplied {@code params} are modified in place: the distrib phase becomes
 * {@code FROMLEADER}, the commit end point becomes {@code "replicas"}, and, when there is at least
 * one replica, {@code DISTRIB_FROM} is set to this core's URL.
 *
 * @param cmd        the commit command
 * @param worker     collector on which the distributed and local commits are registered
 * @param leaderName name of the leader replica whose peers should receive the commit
 * @param params     outgoing request parameters (mutated)
 */
private void sendCommitToReplicasAndLocalCommit(CommitUpdateCommand cmd, ParWork worker, String leaderName, ModifiableSolrParams params) {
  params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
  params.set(COMMIT_END_POINT, "replicas");

  final List<SolrCmdDistributor.Node> replicaNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderName);
  if (log.isDebugEnabled()) {
    log.debug("processCommit - Found the following replicas to send commit to {}", replicaNodes);
  }

  final boolean haveReplicas = replicaNodes != null && !replicaNodes.isEmpty();
  if (haveReplicas) {
    if (log.isDebugEnabled()) {
      log.debug("processCommit - send commit to replicas nodes={}", replicaNodes);
    }
    final String fromUrl = Replica.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName());
    params.set(DISTRIB_FROM, fromUrl);
    worker.collect("distCommit", () -> {
      cmdDistrib.distribCommit(cmd, replicaNodes, params);
    });
  }

  if (log.isDebugEnabled()) {
    log.debug("processCommit - Do a local commit for leader");
  }
  // the local commit is always registered, with or without replicas
  worker.collect("localCommit", () -> {
    try {
      doLocalCommit(cmd);
    } catch (Exception e) {
      log.error("Failed local leader commit", e);
      throw new SolrException(ErrorCode.SERVER_ERROR, e);
    }
  });
}
/**
 * Entry point for an add: refuses the update on a read-only collection, prepares the replication
 * tracker, then hands over to the base implementation.
 *
 * @param cmd the add command
 * @throws IOException        propagated from the base implementation
 * @throws SolrException      with {@code FORBIDDEN} when the collection is read-only
 * @throws ZooKeeperException rethrown (after logging) when setting up the replication tracker fails
 */
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
  if (isReadOnly()) {
    throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
  }

  try {
    // check if the client has requested minimum replication factor information; this sets the
    // replication tracker to null if we aren't the leader or subShardLeader
    checkReplicationTracker(cmd);
  } catch (ZooKeeperException e) {
    log.error("Got an exception that doesn't look good for further updates in this request, bailing", e);
    throw e;
  }

  super.processAdd(cmd);
}
/**
 * Distributes an add to other nodes.
 *
 * <p>When this core is a shard leader (and not a sub-shard leader) the document is first offered
 * to any sub-shard leaders and to nodes selected by routing rules; if routing rules matched, the
 * method returns right after that distribution. A non-leader returns early unless the update has
 * to be forwarded to the leader. Otherwise the command is sent to the nodes computed by
 * setupRequest, flagged FROMLEADER when this core is a (sub-shard) leader and TOLEADER when not.
 *
 * @param cmd the add command to distribute
 * @throws IOException if distributing the command fails
 */
@Override
protected void doDistribAdd(AddUpdateCommand cmd) throws IOException {
if (log.isDebugEnabled()) log.debug("Distribute add docid={} cmd={} to {} leader={} isSubShardLeader={}", cmd.getPrintableId(), cmd, nodes, isLeader, isSubShardLeader);
if (SkyHook.skyHookDoc != null) {
SkyHook.skyHookDoc.register(cmd.getPrintableId(), "do distrib add isLeader=" + isLeader + " isSubShardLeader=" + isSubShardLeader);
}
if (isLeader && !isSubShardLeader) {
DocCollection coll;
String routeId;
try {
coll = clusterState.getCollection(collection);
routeId = cmd.getRootIdUsingRouteParam();
} catch (Exception e) {
log.error("Error getting routeId docId={}", cmd.getPrintableId(), e);
throw new SolrException(ErrorCode.SERVER_ERROR, "Error getting routeId docId=" + cmd.getPrintableId(), e);
}
if (log.isDebugEnabled()) log.debug("going to maybe get sub shard leaders docid={} cmd={} to {} leader={} routeId={}", cmd.getPrintableId(), cmd, nodes, isLeader, routeId);
// without a route id neither sub-shard leaders nor routing rules are consulted
if (routeId != null) {
List<SolrCmdDistributor.Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), routeId, cmd.getSolrInputDocument(), cmd);
// the list<node> will actually have only one element for an add request
if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
params.set(DISTRIB_FROM, Replica.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId());
cmdDistrib.distribAdd(cmd, subShardLeaders, params, true);
}
if (log.isDebugEnabled()) log.debug("Distribute add getNodesByRoutingRules docid={} cmd={} to {} {}", cmd.getPrintableId(), cmd, nodes, isLeader);
final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(clusterState, coll, routeId, cmd.getSolrInputDocument());
if (log.isDebugEnabled()) log.debug("Distribute add got NodesByRoutingRules docid={} cmd={} to {} {}", cmd.getPrintableId(), cmd, nodesByRoutingRules, isLeader);
if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
try {
if (SkyHook.skyHookDoc != null) {
SkyHook.skyHookDoc.register(cmd.getPrintableId(), "do distrib to replicas with nodesByRoutingRules");
}
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
params.set(DISTRIB_FROM, Replica.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
params.set(DISTRIB_FROM_COLLECTION, collection);
params.set(DISTRIB_FROM_SHARD, cloudDesc.getShardId());
cmdDistrib.distribAdd(cmd, nodesByRoutingRules, params, true);
} catch (IOException e) {
log.error("", e);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
// NOTE(review): returning here skips the forward to `nodes` below whenever routing rules
// matched - confirm that this shard's own replicas are not meant to receive the add as well.
return;
}
}
} else {
if (log.isDebugEnabled()) {
log.debug("Not a shard or sub shard leader docId={}", cmd.getPrintableId());
}
// nothing to distribute unless the update has to be forwarded to the leader
if (!forwardToLeader) {
return;
}
}
if (log.isDebugEnabled()) {
log.debug("Using nodes {}", nodes);
}
if (log.isDebugEnabled()) log.debug("Distribute add using nodes if not null and larger than size 0 docid={} cmd={} to {} isLeader={}", cmd.getPrintableId(), cmd, nodes, isLeader);
if (nodes != null && nodes.size() > 0) {
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
// leaders push FROMLEADER to their replicas; everyone else forwards TOLEADER
params.set(DISTRIB_UPDATE_PARAM, (isLeader || isSubShardLeader ? DistribPhase.FROMLEADER.toString() : DistribPhase.TOLEADER.toString()));
params.set(DISTRIB_FROM, Replica.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) {
// TODO: Kept for rolling upgrades only. Should be removed in Solr 9
params.set(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT));
}
// for (SolrCmdDistributor.Node node : nodes) {
// if (node.getCoreName().equals(desc.getName())) {
// log.error("docId={} IllegalState, trying to send an update to ourself", cmd.getPrintableId());
// throw new IllegalStateException("IllegalState, trying to send an update to ourself");
// }
// }
if (cmd.isInPlaceUpdate()) {
// the receiver needs the version this in-place update depends on
params.set(DISTRIB_INPLACE_PREVVERSION, String.valueOf(cmd.prevVersion));
// Use synchronous=true so that a new connection is used, instead
// of the update being streamed through an existing streaming client.
// When using a streaming client, the previous update
// and the current in-place update (that depends on the previous update), if reordered
// in the stream, can result in the current update being bottled up behind the previous
// update in the stream and can lead to degraded performance.
if (log.isDebugEnabled()) log.debug("Distribute add inplaceupdate docid={} cmd={} to {} {}", cmd.getPrintableId(), cmd, nodes, isLeader);
cmdDistrib.distribAdd(cmd, nodes, params, true, rollupReplicationTracker, leaderReplicationTracker);
} else {
if (SkyHook.skyHookDoc != null) {
SkyHook.skyHookDoc.register(cmd.getPrintableId(), "send update to cmdDistrib nodes=" + nodes + " cmd=" + cmd);
}
if (log.isDebugEnabled()) log.debug("Distribute add, std old nodes docid={} cmd={} to {} {}", cmd.getPrintableId(), cmd, nodes, isLeader);
cmdDistrib.distribAdd(cmd, nodes, params, false, rollupReplicationTracker, leaderReplicationTracker);
}
}
}
/**
 * Entry point for a delete: refreshes the cached cluster state, refuses the delete on a read-only
 * collection and otherwise hands over to the base implementation.
 *
 * @param cmd the delete command
 * @throws IOException   propagated from the base implementation
 * @throws SolrException with {@code FORBIDDEN} when the collection is read-only
 */
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
  clusterState = zkController.getClusterState();

  if (!isReadOnly()) {
    super.processDelete(cmd);
    return;
  }

  throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
}
/**
 * Handles a delete-by-id: computes the target nodes for the command, logs the id, prepares the
 * replication tracker and then delegates to the base implementation.
 *
 * @param cmd the delete command
 * @throws IOException propagated from the base implementation
 */
@Override
protected void doDeleteById(DeleteUpdateCommand cmd) throws IOException {
// resolves leader/replica roles and fills in the target nodes for this command
setupRequest(cmd);
log.info("deletebyid {}", cmd.id);
// check if client has requested minimum replication factor information. will set replicationTracker to null if
// we aren't the leader or subShardLeader
checkReplicationTracker(cmd);
super.doDeleteById(cmd);
}
/**
 * Distributes a delete-by-id to other nodes.
 *
 * <p>A shard leader (that is not a sub-shard leader) first forwards the delete to any sub-shard
 * leaders and to nodes selected by routing rules. After that, whenever target nodes were computed
 * for this command, the delete is sent to them flagged {@code FROMLEADER} (when this core is a
 * leader or sub-shard leader) or {@code TOLEADER} (otherwise).
 *
 * @param cmd the delete command to distribute
 * @throws IOException if distributing the command fails
 */
@Override
protected void doDistribDeleteById(DeleteUpdateCommand cmd) throws IOException {
  final boolean actingAsShardLeader = isLeader && !isSubShardLeader;
  if (actingAsShardLeader) {
    final DocCollection coll = clusterState.getCollection(collection);

    final List<SolrCmdDistributor.Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getId(), null, cmd);
    // a delete-by-id targets a single document, so at most one sub-shard leader is expected
    if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
      final ModifiableSolrParams subShardParams = new ModifiableSolrParams(filterParams(req.getParams()));
      subShardParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
      subShardParams.set(DISTRIB_FROM, Replica.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
      subShardParams.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId());
      cmdDistrib.distribDelete(cmd, subShardLeaders, subShardParams, true, null, null);
    }

    final List<SolrCmdDistributor.Node> routedNodes = getNodesByRoutingRules(clusterState, coll, cmd.getId(), null);
    if (routedNodes != null && !routedNodes.isEmpty()) {
      final ModifiableSolrParams routedParams = new ModifiableSolrParams(filterParams(req.getParams()));
      routedParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
      routedParams.set(DISTRIB_FROM, Replica.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
      routedParams.set(DISTRIB_FROM_COLLECTION, collection);
      routedParams.set(DISTRIB_FROM_SHARD, cloudDesc.getShardId());
      cmdDistrib.distribDelete(cmd, routedNodes, routedParams, true, null, null);
    }
  }

  if (nodes == null) {
    return;
  }

  final ModifiableSolrParams outParams = new ModifiableSolrParams(filterParams(req.getParams()));
  final DistribPhase outPhase = (isLeader || isSubShardLeader) ? DistribPhase.FROMLEADER : DistribPhase.TOLEADER;
  outParams.set(DISTRIB_UPDATE_PARAM, outPhase.toString());
  outParams.set(DISTRIB_FROM, Replica.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
  if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) {
    // TODO: Kept for rolling upgrades only. Remove in Solr 9
    outParams.add(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT));
  }
  cmdDistrib.distribDelete(cmd, nodes, outParams, false, rollupReplicationTracker, leaderReplicationTracker);
}
/**
 * Handles a delete-by-query according to the distrib phase carried in the request.
 *
 * <ul>
 *   <li>NONE: we are the first to receive this deleteByQuery; it is forwarded to the leader of
 *       every targeted shard. If this core leads one of them, processing continues as TOLEADER,
 *       otherwise the method returns after forwarding.</li>
 *   <li>TOLEADER: we are a leader receiving a forwarded deleteByQuery; the replicas to forward to
 *       are looked up and passed on to the base implementation.</li>
 *   <li>FROMLEADER: we are a replica receiving the DBQ from our leader; it is only applied
 *       locally.</li>
 * </ul>
 *
 * @param cmd the delete-by-query command
 * @throws IOException   if distributing or applying the delete fails
 * @throws SolrException if the leader of a targeted shard cannot be determined
 */
@Override
protected void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
  zkCheck();

  // NONE: we are the first to receive this deleteByQuery
  // - it must be forwarded to the leader of every shard
  // TO: we are a leader receiving a forwarded deleteByQuery... we must:
  // - block all updates (use VersionInfo)
  // - flush *all* updates going to our replicas
  // - forward the DBQ to our replicas and wait for the response
  // - log + execute the local DBQ
  // FROM: we are a replica receiving a DBQ from our leader
  // - log + execute the local DBQ
  DistribPhase phase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
  DocCollection coll = clusterState.getCollection(collection);

  if (DistribPhase.NONE == phase) {
    if (rollupReplicationTracker == null) {
      rollupReplicationTracker = new RollupRequestReplicationTracker();
    }
    boolean leaderForAnyShard = false; // start off by assuming we are not a leader for any shard

    ModifiableSolrParams outParams = new ModifiableSolrParams(filterParams(req.getParams()));
    outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
    outParams.set(DISTRIB_FROM, Replica.getCoreUrl(
        zkController.getBaseUrl(), req.getCore().getName()));

    SolrParams params = req.getParams();
    String route = params.get(ShardParams._ROUTE_);
    Collection<Slice> slices = coll.getRouter().getSearchSlices(route, params, coll);

    List<SolrCmdDistributor.Node> leaders = new ArrayList<>(slices.size());
    for (Slice slice : slices) {
      String sliceName = slice.getName();
      Replica leader;
      try {
        leader = zkController.getZkStateReader().getLeaderRetry(collection, sliceName, 10000);
      } catch (Exception e) {
        // keep the thread's interrupt status intact, as the other leader lookups in this class do
        ParWork.propagateInterrupt(e);
        throw new SolrException(ErrorCode.SERVER_ERROR, "error getting leader", e);
      }

      // TODO: What if leaders changed in the meantime?
      // should we send out slice-at-a-time and if a node returns "hey, I'm not a leader" (or we get an error because it went down) then look up the new leader?

      // Am I the leader for this slice?
      String leaderCoreNodeName = leader.getName();
      String coreName = desc.getName();
      isLeader = coreName.equals(leaderCoreNodeName);

      if (isLeader) {
        // don't forward to ourself
        leaderForAnyShard = true;
      } else {
        leaders.add(new SolrCmdDistributor.ForwardNode(zkController.getZkStateReader(), leader, collection, sliceName));
      }
    }

    outParams.remove("commit"); // this will be distributed from the local commit

    if (params.get(UpdateRequest.MIN_REPFACT) != null) {
      // TODO: Kept this for rolling upgrades. Remove in Solr 9
      outParams.add(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT));
    }
    cmdDistrib.distribDelete(cmd, leaders, outParams, false, rollupReplicationTracker, null);

    if (!leaderForAnyShard) {
      return;
    }

    // change the phase to TOLEADER so we look up and forward to our own replicas (if any)
    phase = DistribPhase.TOLEADER;
  }

  List<SolrCmdDistributor.Node> replicas = null;

  if (DistribPhase.TOLEADER == phase) {
    // This core should be a leader
    isLeader = true;
    replicas = setupRequestForDBQ(desc.getName());
  } else if (DistribPhase.FROMLEADER == phase) {
    isLeader = false;
  }

  // check if client has requested minimum replication factor information. will set replicationTracker to null if
  // we aren't the leader or subShardLeader
  checkReplicationTracker(cmd);
  super.doDeleteByQuery(cmd, replicas, coll);
}
/**
 * Forwards a versioned delete-by-query from this core to the nodes that must apply it.
 *
 * <p>If this core is a sub-shard leader, the DBQ goes to the NRT and TLOG replicas of its own
 * shard that are buffering, recovering or active. If leader logic applies (this core is leader
 * and the command is neither a replay nor a peer-sync), the DBQ additionally goes to sub-shard
 * leaders, to nodes selected by routing rules, and to the supplied {@code replicas}.
 *
 * <p>Routing-rule targets get their own parameter set (tagged with the source collection and
 * shard). The same-shard targets keep the base parameter set, which carries the command version.
 *
 * @param cmd      the delete-by-query command, already versioned
 * @param replicas replicas of this shard to forward to, or {@code null} for none
 * @param coll     the collection state to evaluate sub-shards and routing rules against
 * @throws IOException   if distributing the command fails
 * @throws SolrException if the shard leader cannot be determined
 */
@Override
protected void doDistribDeleteByQuery(DeleteUpdateCommand cmd, List<SolrCmdDistributor.Node> replicas,
    DocCollection coll) throws IOException {
  boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
  boolean leaderLogic = isLeader && !isReplayOrPeersync;

  // base parameters for everything that stays within this shard hierarchy
  ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
  params.set(CommonParams.VERSION_FIELD, Long.toString(cmd.getVersion()));
  params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
  params.set(DISTRIB_FROM, Replica.getCoreUrl(
      zkController.getBaseUrl(), req.getCore().getName()));

  if (amISubShardLeader(coll, null, null, null)) {
    String myShardId = cloudDesc.getShardId();
    Replica leaderReplica;
    try {
      leaderReplica = zkController.getZkStateReader().getLeaderRetry(
          collection, myShardId);
    } catch (Exception e) {
      // keep the thread's interrupt status intact, as the other leader lookups in this class do
      ParWork.propagateInterrupt(e);
      throw new SolrException(ErrorCode.SERVER_ERROR, "error getting leader", e);
    }

    // DBQ forwarded to NRT and TLOG replicas
    Set<Replica.State> matchFilters = new HashSet<>(3);
    matchFilters.add(Replica.State.BUFFERING);
    matchFilters.add(Replica.State.RECOVERING);
    matchFilters.add(Replica.State.ACTIVE);
    List<Replica> replicaProps = zkController.getZkStateReader()
        .getReplicaProps(collection, myShardId, leaderReplica.getName(), matchFilters, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
    if (replicaProps != null) {
      final List<SolrCmdDistributor.Node> myReplicas = new ArrayList<>(replicaProps.size());
      for (Replica replicaProp : replicaProps) {
        myReplicas.add(new SolrCmdDistributor.StdNode(zkController.getZkStateReader(), replicaProp, collection, myShardId));
      }
      cmdDistrib.distribDelete(cmd, myReplicas, params, false, rollupReplicationTracker, leaderReplicationTracker);
    }
  }

  if (leaderLogic) {
    List<SolrCmdDistributor.Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), null, null, cmd);
    if (subShardLeaders != null) {
      cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, rollupReplicationTracker, leaderReplicationTracker);
    }

    final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(clusterState, coll, null, null);
    if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
      // separate parameter set: must not replace `params`, which the replica forward below still needs
      ModifiableSolrParams routingParams = new ModifiableSolrParams(filterParams(req.getParams()));
      routingParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
      routingParams.set(DISTRIB_FROM, Replica.getCoreUrl(
          zkController.getBaseUrl(), req.getCore().getName()));
      routingParams.set(DISTRIB_FROM_COLLECTION, collection);
      routingParams.set(DISTRIB_FROM_SHARD, cloudDesc.getShardId());
      cmdDistrib.distribDelete(cmd, nodesByRoutingRules, routingParams, true, rollupReplicationTracker, leaderReplicationTracker);
    }

    if (replicas != null) {
      cmdDistrib.distribDelete(cmd, replicas, params, false, rollupReplicationTracker, leaderReplicationTracker);
    }
  }
}
/**
 * Used for deleteByQuery to get the list of nodes this leader should forward to.
 *
 * <p>Turns off forwarding to the leader and collects the NRT and TLOG replicas of this core's
 * shard, other than {@code name}, that are buffering, recovering or active.
 *
 * @param name replica name handed to the state reader's replica lookup (callers pass this core's name)
 * @return one node per matching replica, or {@code null} when the lookup yields {@code null}
 * @throws ZooKeeperException wrapping any failure of the lookup
 */
private List<SolrCmdDistributor.Node> setupRequestForDBQ(String name) {
  final String shardId = cloudDesc.getShardId();
  // TODO: what if we are no longer the leader?
  forwardToLeader = false;
  try {
    final Set<Replica.State> wantedStates = new HashSet<>(
        Arrays.asList(Replica.State.BUFFERING, Replica.State.RECOVERING, Replica.State.ACTIVE));
    final List<Replica> replicaProps = zkController.getZkStateReader()
        .getReplicaProps(collection, shardId, name, wantedStates, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
    if (replicaProps == null) {
      return null;
    }
    final List<SolrCmdDistributor.Node> replicaNodes = new ArrayList<>(replicaProps.size());
    for (Replica replica : replicaProps) {
      replicaNodes.add(new SolrCmdDistributor.StdNode(zkController.getZkStateReader(), replica, collection, shardId));
    }
    return replicaNodes;
  } catch (Exception e) {
    ParWork.propagateInterrupt(e);
    throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
  }
}
/**
 * Returns the URL to fetch a missing update from.
 *
 * @param id id of the document concerned; only used for the ZooKeeper fallback
 * @return the {@code DISTRIB_FROM} request parameter when present, otherwise the leader URL
 *         resolved through ZooKeeper
 */
@Override
protected String getLeaderUrl(String id) {
  // prefer the sender recorded in the request; fall back to a zk lookup if it is missing
  final String senderUrl = req.getParams().get(DISTRIB_FROM);
  return senderUrl != null ? senderUrl : getLeaderUrlZk(id);
}
/**
 * Looks up the core URL of this shard's leader through the state reader, retrying for up to
 * 5000 ms.
 *
 * <p>Reached when an update we depend on did not arrive, which suggests the leader may be down or
 * partitioned from us.
 *
 * @param id id of the document concerned; used only in the error message
 * @return the leader's core URL
 * @throws SolrException when there is no ZooKeeper controller (not in cloud mode, e.g. a unit
 *                       test), or when the lookup is interrupted or times out
 */
private String getLeaderUrlZk(String id) {
  if (zkController != null) {
    try {
      final Replica leader = zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId(), 5000);
      return leader.getCoreUrl();
    } catch (InterruptedException | TimeoutException e) {
      ParWork.propagateInterrupt(e);
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception during fetching from leader.", e);
    }
  }
  // we should be in cloud mode here; without a controller there is nobody to ask
  throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
      "Can't find document with id=" + id + ", but fetching from leader failed since we're not in cloud mode.");
}
/**
 * Records the command being processed, runs the ZooKeeper check and computes the target nodes
 * for add and delete commands. For any other command type the target nodes are left untouched.
 *
 * @param cmd the update command about to be processed
 */
@Override
protected void setupRequest(UpdateCommand cmd) {
  updateCommand = cmd;
  zkCheck();

  if (cmd instanceof AddUpdateCommand) {
    final AddUpdateCommand add = (AddUpdateCommand) cmd;
    final String routeId = add.getRootIdUsingRouteParam();
    nodes = setupRequest(routeId, add.getSolrInputDocument());
    return;
  }

  if (cmd instanceof DeleteUpdateCommand) {
    final DeleteUpdateCommand delete = (DeleteUpdateCommand) cmd;
    // deletes carry no document, only an id
    nodes = setupRequest(delete.getId(), null);
  }
}
/**
 * Convenience overload of {@code setupRequest(String, SolrInputDocument, String)} that passes
 * no explicit route.
 *
 * @param id  id used for routing
 * @param doc document used for routing, may be {@code null}
 * @return whatever the three-argument overload returns for a {@code null} route
 */
protected List<SolrCmdDistributor.Node> setupRequest(String id, SolrInputDocument doc) {
  final String noRoute = null;
  return setupRequest(id, doc, noRoute);
}
/**
 * @return the target nodes most recently stored by {@code setupRequest(UpdateCommand)}; may be
 *         {@code null}
 */
protected List<SolrCmdDistributor.Node> getNodes() {
  return this.nodes;
}
/**
 * Works out where the current update has to be sent and records this replica's role for it in
 * the {@code isLeader}, {@code isSubShardLeader}, {@code forwardToLeader}, {@code clusterState}
 * and {@code skippedCoreNodeNames} fields.
 *
 * @param id    document id handed to the router (may be used together with {@code doc}/{@code route})
 * @param doc   the document handed to the router
 * @param route explicit route handed to the router
 * @return {@code null} when the update is only applied locally (replay/peer-sync, updates coming
 *         from the leader, or a leader whose shard has no other NRT/TLOG replicas); the list of
 *         replica nodes to forward to when this core is the leader or a sub-shard leader; or a
 *         single-element list holding the leader when the update must be forwarded to it
 * @throws SolrException if no target shard can be determined or the shard terms cannot be read
 * @throws IllegalStateException if the forward target turns out to be this very replica
 */
protected List<SolrCmdDistributor.Node> setupRequest(String id, SolrInputDocument doc, String route) {
// if we are in zk mode...
assert TestInjection.injectUpdateRandomPause();
// replay and peer-sync updates are never distributed
if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
forwardToLeader = false;
return null;
}
clusterState = zkController.getClusterState();
DistribPhase phase =
DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
DocCollection coll = clusterState.getCollection(collection);
Slice slice;
try {
slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
} catch (ImplicitDocRouter.NoShardException e) {
// The router does not know the shard yet: wait (5 seconds at most) for it to show up in the
// collection state, then ask the router again against a freshly read collection.
try {
zkController.getZkStateReader().waitForState(collection, 5, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
if (collectionState == null) {
return false;
}
if (collectionState.getSlice(e.getShard()) != null) {
return true;
}
return false;
});
} catch (Exception e2) {
throw new ImplicitDocRouter.NoShardException(ErrorCode.BAD_REQUEST, "No shard found for " + e.getShard(), e.getShard(), e2);
}
coll = zkController.getClusterState().getCollection(collection);
slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
}
// An update sent by the leader is applied locally, unless our own shard is in
// CONSTRUCTION/RECOVERY state (see couldIbeSubShardLeader).
if (DistribPhase.FROMLEADER == phase && !couldIbeSubShardLeader(coll)) {
assert TestInjection.injectFailReplicaRequests();
isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
forwardToLeader = false;
return null;
}
if (slice == null) {
// No slice found. Most strict routers will have already thrown an exception, so a null return is
// a signal to use the slice of this core.
// TODO: what if this core is not in the targeted collection?
String shardId = cloudDesc.getShardId();
slice = coll.getSlice(shardId);
if (slice == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll);
}
}
String shardId = slice.getName();
Replica leaderReplica;
try {
// Not equivalent to getLeaderProps, which retries to find a leader.
leaderReplica = slice.getLeader();
if (leaderReplica == null) {
// no leader in the slice we hold: ask the state reader, which retries
// (10000 is presumably a timeout in milliseconds -- TODO confirm)
leaderReplica = zkController.getZkStateReader().getLeaderRetry(req.getCore().getCoreContainer().getUpdateShardHandler().getTheSharedHttpClient(), collection, shardId, 10000, true);
} else {
isLeader = leaderReplica.getName().equals(desc.getName());
if (isLeader) {
// The slice names us as leader; if our own leader elector is missing or disagrees,
// resolve the leader again through the state reader.
LeaderElector leaderElector = req.getCore().getCoreContainer().getZkController().getLeaderElector(req.getCore().getName());
if (leaderElector == null || !leaderElector.isLeader()) {
leaderReplica = zkController.getZkStateReader().getLeaderRetry(req.getCore().getCoreContainer().getUpdateShardHandler().getTheSharedHttpClient(), collection, shardId, 10000, true);
}
}
}
// final verdict, based on whichever leader replica was resolved above
isLeader = leaderReplica != null && leaderReplica.getName().equals(desc.getName());
doDefensiveChecks(phase);
if (!isLeader) {
isSubShardLeader = amISubShardLeader(coll, slice, id, doc);
if (isSubShardLeader) {
leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId);
}
}
// if request is coming from another collection then we want it to be sent to all replicas
// even if its phase is FROMLEADER
String fromCollection = updateCommand.getReq().getParams().get(DISTRIB_FROM_COLLECTION);
if (DistribPhase.FROMLEADER == phase && !isSubShardLeader && fromCollection == null) {
// we are coming from the leader, just go local - add no urls
forwardToLeader = false;
return null;
} else if (isLeader || isSubShardLeader) {
// that means I want to forward onto my replicas...
// so get the replicas...
forwardToLeader = false;
String leaderCoreName = leaderReplica.getName();
List<Replica> replicas = clusterState.getCollection(collection)
.getSlice(shardId)
.getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
// the leader itself is not a forwarding target
replicas.removeIf((replica) -> replica.getName().equals(leaderCoreName));
if (replicas.isEmpty()) {
return null;
}
// check for test param that lets us miss replicas
String[] skipList = req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS);
Set<String> skipListSet = null;
if (skipList != null) {
skipListSet = new HashSet<>(skipList.length);
skipListSet.addAll(Arrays.asList(skipList));
log.info("test.distrib.skip.servers was found and contains:{}", skipListSet);
}
List<SolrCmdDistributor.Node> nodes = new ArrayList<>(replicas.size());
skippedCoreNodeNames = new HashSet<>();
ZkShardTerms zkShardTerms = null;
try {
zkShardTerms = zkController.getShardTerms(collection, shardId);
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
// Decide per replica: skipped by the test parameter (not recorded), skipped because of its
// shard term or because it is not live / DOWN (both recorded in skippedCoreNodeNames), or
// added as a forwarding target.
for (Replica replica : replicas) {
String coreNodeName = replica.getName();
if (skipList != null && skipListSet.contains(replica.getCoreUrl())) {
if (log.isInfoEnabled()) {
log.info("check url:{} against:{} result:true", replica.getCoreUrl(), skipListSet);
}
} else if(zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) {
log.info("skip url:{} cause its term is less than leader", replica.getCoreUrl());
skippedCoreNodeNames.add(replica.getName());
} else if (!zkController.getZkStateReader().getLiveNodes().contains(replica.getNodeName()) || (replica.getState() == Replica.State.DOWN)) {
skippedCoreNodeNames.add(replica.getName());
} else {
nodes.add(new SolrCmdDistributor.StdNode(zkController.getZkStateReader(), replica, collection, shardId, maxRetriesToFollowers));
}
}
if (log.isDebugEnabled()) log.debug("We are the leader {}, forward update to replicas.. {}", req.getCore().getName(), nodes);
return nodes;
} else {
// I need to forward on to the leader...
forwardToLeader = true;
assert !isLeader;
assert !isSubShardLeader;
List<SolrCmdDistributor.Node> nodes = Collections.singletonList(
new SolrCmdDistributor.ForwardNode(zkController.getZkStateReader(), leaderReplica, collection, shardId));
if (log.isDebugEnabled()) log.debug("Forward update to leader {}", nodes);
// sanity check: the forward target must never be this replica
if (desc.getName().equals(leaderReplica.getName())) {
IllegalStateException e = new IllegalStateException(
"We were asked to forward an update to ourself, which should not happen name=" + desc.getName() + " isLeader=" + isLeader);
log.error("Sending an update to ourself id={}", id, e);
throw e;
}
return nodes;
}
} catch (InterruptedException | TimeoutException e) {
// raised by the leader lookups above; reported as a ZooKeeper server error
ParWork.propagateInterrupt(e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
}
}
/**
 * Returns {@code true} only when {@code isLeader} is set and {@code nodes} is neither
 * {@code null} nor empty.
 */
@Override
protected boolean shouldCloneCmdDoc() {
  if (!isLeader || nodes == null) {
    return false;
  }
  return !nodes.isEmpty();
}
/**
 * Helper split out of processAdd: lazily allocates the replication trackers for a request.
 *
 * <ul>
 *   <li>When {@code cmd} carries no distrib-update parameter (or it equals
 *       {@code DistribPhase.NONE}) we are receiving the original request, so a
 *       {@link RollupRequestReplicationTracker} is allocated - only once, so that it accumulates
 *       over the course of a batch.</li>
 *   <li>When this replica is the leader a {@link LeaderRequestReplicationTracker} is allocated
 *       for this core's shard - again only once, so every document of a batch shares the
 *       <em>same</em> tracker.</li>
 * </ul>
 *
 * @param cmd the update command whose request parameters are inspected
 */
private void checkReplicationTracker(UpdateCommand cmd) {
  final String phaseParam = cmd.getReq().getParams().get(DISTRIB_UPDATE_PARAM);
  final boolean originalRequest = phaseParam == null || DistribPhase.NONE.toString().equals(phaseParam);

  if (originalRequest && rollupReplicationTracker == null) {
    rollupReplicationTracker = new RollupRequestReplicationTracker();
  }

  if (!isLeader || leaderReplicationTracker != null) {
    return;
  }
  final String ownShardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
  leaderReplicationTracker = new LeaderRequestReplicationTracker(ownShardId);
}
/**
 * Builds the list of live nodes of {@code collection} that an update should be sent to.
 *
 * @param collection  name of the collection to look up in the current cluster state
 * @param types       replica types to include; only consulted when {@code onlyLeaders} is false
 * @param onlyLeaders when true, only the leader of every slice is considered
 * @return the matching nodes whose Solr node is live, or {@code null} when there are none
 * @throws ZooKeeperException (BAD_REQUEST) when the collection, or its slice map, cannot be found
 */
private List<SolrCmdDistributor.Node> getCollectionUrls(String collection, EnumSet<Replica.Type> types, boolean onlyLeaders) {
  final DocCollection docCollection = clusterState.getCollectionOrNull(collection, true);
  // The lookup may return null; guard the DocCollection itself (the original only tested the
  // collection name, so a missing collection surfaced as a NullPointerException).
  if (collection == null || docCollection == null || docCollection.getSlicesMap() == null) {
    throw new ZooKeeperException(SolrException.ErrorCode.BAD_REQUEST,
        "Could not find collection in zk: " + clusterState);
  }
  final Map<String,Slice> slices = docCollection.getSlicesMap();
  final List<SolrCmdDistributor.Node> urls = new ArrayList<>(slices.size());
  for (Slice slice : slices.values()) {
    if (onlyLeaders) {
      // a slice without a (live) leader simply contributes nothing
      final Replica leader = docCollection.getLeader(slice.getName());
      if (leader != null && zkController.getZkStateReader().isNodeLive(leader.getNodeName())) {
        urls.add(new SolrCmdDistributor.StdNode(zkController.getZkStateReader(), leader, collection, slice.getName()));
      }
      continue;
    }
    for (Replica replica : slice.getReplicasMap().values()) {
      if (!types.contains(replica.getType())) {
        continue;
      }
      // kept from the original: the core name is written into the replica's properties
      replica.getProperties().put(ZkStateReader.CORE_NAME_PROP, replica.getName());
      if (zkController.getZkStateReader().isNodeLive(replica.getNodeName())) {
        urls.add(new SolrCmdDistributor.StdNode(zkController.getZkStateReader(), replica, collection, slice.getName()));
      }
    }
  }
  // callers receive null, not an empty list, when nothing qualifies
  if (urls.isEmpty()) {
    return null;
  }
  return urls;
}
/**
 * For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD}:
 * tells whether this core's own shard is in CONSTRUCTION or RECOVERY state, i.e. whether this
 * core could be the leader of such a shard.
 *
 * @param coll the collection in which this core's shard is looked up
 * @return {@code true} when the state of this core's shard is CONSTRUCTION or RECOVERY
 */
private boolean couldIbeSubShardLeader(DocCollection coll) {
  final Slice ownSlice = coll.getSlice(cloudDesc.getShardId());
  final Slice.State ownState = ownSlice.getState();
  if (Slice.State.CONSTRUCTION == ownState) {
    return true;
  }
  return Slice.State.RECOVERY == ownState;
}
/**
 * For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD}:
 * tells whether this core is the leader of a shard in CONSTRUCTION or RECOVERY state that
 * should receive the given update.
 *
 * @param coll        the collection in which this core's shard is looked up
 * @param parentSlice the slice the update was routed to; {@code null} in the delete-by-query case
 * @param id          document id passed to the router
 * @param doc         document passed to the router
 * @return {@code true} when our shard is in CONSTRUCTION/RECOVERY state, we are its leader and
 *         either {@code parentSlice} is {@code null} or our hash range is a subset of the
 *         parent's range and the router targets our shard for this document
 * @throws SolrException (SERVER_ERROR) when the leader of our shard cannot be obtained
 */
protected boolean amISubShardLeader(DocCollection coll, Slice parentSlice, String id, SolrInputDocument doc) {
  final String ownShardId = cloudDesc.getShardId();
  final Slice ownSlice = coll.getSlice(ownShardId);
  final Slice.State ownState = ownSlice.getState();
  if (ownState != Slice.State.CONSTRUCTION && ownState != Slice.State.RECOVERY) {
    return false;
  }

  final Replica ownShardLeader;
  try {
    ownShardLeader = zkController.getZkStateReader().getLeaderRetry(collection, ownShardId, 5000);
  } catch (Exception e) {
    throw new SolrException(ErrorCode.SERVER_ERROR, "error getting leader", e);
  }
  if (!ownShardLeader.getName().equals(desc.getName())) {
    return false;
  }

  // Does the document belong to my hash range as well? A slice without a range is treated
  // as covering the whole hash space.
  DocRouter.Range ownRange = ownSlice.getRange();
  if (ownRange == null) {
    ownRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
  }
  if (parentSlice == null) {
    // delete by query case -- as long as I am a sub shard leader we're fine
    return true;
  }
  return parentSlice.getRange() != null
      && ownRange.isSubsetOf(parentSlice.getRange())
      && coll.getRouter().isTargetSlice(id, doc, req.getParams(), ownShardId, coll);
}
/**
 * Collects the NRT/TLOG replicas of {@code shardId} that the leader should send an update to and
 * records the ones that were passed over in {@code skippedCoreNodeNames}.
 *
 * @param shardId    shard whose replicas are examined
 * @param leaderName name of the leader replica; it is excluded from the result
 * @return {@code null} when the shard has no NRT/TLOG replica besides the leader, otherwise the
 *         (possibly empty) list of replicas to send the update to
 * @throws SolrException (SERVER_ERROR) when the shard terms cannot be obtained
 */
protected List<SolrCmdDistributor.Node> getReplicaNodesForLeader(String shardId, String leaderName) {
  if (log.isDebugEnabled()) log.debug("leader is {}", leaderName);
  List<Replica> replicas = clusterState.getCollection(collection)
      .getSlice(shardId)
      .getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
  replicas.removeIf((replica) -> replica.getName().equals(leaderName));
  if (replicas.isEmpty()) {
    return null;
  }

  // check for test param that lets us miss replicas
  String[] skipList = req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS);
  Set<String> skipListSet = null;
  if (skipList != null) {
    skipListSet = new HashSet<>(Arrays.asList(skipList));
    log.info("test.distrib.skip.servers was found and contains:{}", skipListSet);
  }

  List<SolrCmdDistributor.Node> nodes = new ArrayList<>(replicas.size());
  skippedCoreNodeNames = new HashSet<>();
  final ZkShardTerms zkShardTerms;
  try {
    zkShardTerms = zkController.getShardTerms(collection, shardId);
  } catch (Exception e) {
    throw new SolrException(ErrorCode.SERVER_ERROR, e);
  }

  for (Replica replica : replicas) {
    String coreNodeName = replica.getName();
    if (skipList != null && skipListSet.contains(replica.getCoreUrl())) {
      // excluded by the test parameter; deliberately not recorded as skipped
      if (log.isInfoEnabled()) {
        log.info("check url:{} against:{} result:true", replica.getCoreUrl(), skipListSet);
      }
    } else if (zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) {
      // guard and log call now use the same level (was isDebugEnabled() around log.info)
      if (log.isDebugEnabled()) {
        log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl());
      }
      skippedCoreNodeNames.add(coreNodeName);
    } else if (!zkController.getZkStateReader().getLiveNodes().contains(replica.getNodeName())
        || replica.getState() != Replica.State.ACTIVE) {
      // node is gone or the replica is not ACTIVE
      skippedCoreNodeNames.add(coreNodeName);
    } else {
      nodes.add(new SolrCmdDistributor.StdNode(zkController.getZkStateReader(), replica, collection, shardId));
    }
  }
  return nodes;
}
/**
 * For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD}:
 * finds the live leaders of slices in CONSTRUCTION or RECOVERY state whose hash range is a subset
 * of the range of {@code shardId} and which should also receive the given update.
 *
 * @param coll    collection whose slices are scanned
 * @param shardId the (parent) shard the update is being applied to
 * @param docId   id of the document, or {@code null} for deletes, in which case every sub shard
 *                in range qualifies
 * @param doc     the document passed to the router
 * @param cmd     the command being processed; only used for the SkyHook trace
 * @return forward nodes for the qualifying sub shard leaders, or {@code null} when there are none
 * @throws SolrException (SERVER_ERROR) wrapping any non-{@link Error} failure
 */
protected List<SolrCmdDistributor.Node> getSubShardLeaders(DocCollection coll, String shardId, String docId, SolrInputDocument doc, UpdateCommand cmd) {
  if (SkyHook.skyHookDoc != null && cmd instanceof AddUpdateCommand) {
    SkyHook.skyHookDoc.register(((AddUpdateCommand) cmd).getPrintableId(), "getSubShardLeaders isLeader=true");
  }
  List<SolrCmdDistributor.Node> subLeaderNodes = null;
  try {
    Collection<Slice> allSlices = coll.getSlices();
    for (Slice aslice : allSlices) {
      final Slice.State state = aslice.getState();
      if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
        // a parent slice without a range is treated as covering the whole hash space
        DocRouter.Range myRange = coll.getSlice(shardId).getRange();
        if (myRange == null) myRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
        boolean isSubset = aslice.getRange() != null && aslice.getRange().isSubsetOf(myRange);
        if (isSubset && (docId == null // in case of deletes
            || coll.getRouter().isTargetSlice(docId, doc, req.getParams(), aslice.getName(), coll))) {
          Replica sliceLeader = aslice.getLeader();
          // slice leader can be null because node/shard is created zk before leader election
          if (sliceLeader != null && zkController.getZkStateReader().isNodeLive(sliceLeader.getNodeName())) {
            // Accumulate into the local result list. The original added to the `nodes` field
            // instead, so this method always returned null and polluted that field.
            if (subLeaderNodes == null) subLeaderNodes = new ArrayList<>();
            subLeaderNodes.add(new SolrCmdDistributor.ForwardNode(zkController.getZkStateReader(), sliceLeader, coll.getName(), aslice.getName()));
          }
        }
      }
    }
  } catch (Throwable t) {
    log.error("Exception getting sub shard leaders", t);
    if (t instanceof Error) {
      throw t;
    }
    throw new SolrException(ErrorCode.SERVER_ERROR, t);
  }
  if (log.isDebugEnabled()) log.debug("sub shard leaders are {}", subLeaderNodes);
  return subLeaderNodes;
}
/**
 * For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#MIGRATE}:
 * determines the extra nodes an update must be sent to because of the routing rules attached to
 * this core's slice. Routing rules are only consulted when the collection's router is a
 * {@link CompositeIdRouter}.
 *
 * @param cstate cluster state used to look up the target collections named by the rules
 * @param coll   the collection this core belongs to
 * @param id     document id, or {@code null} for the delete-by-query case
 * @param doc    the document passed to the router when hashing {@code id}
 * @return the target nodes, or {@code null} when no routing rule applies
 * @throws SolrException (SERVER_ERROR) wrapping any non-{@link Error} failure
 */
protected List<SolrCmdDistributor.Node> getNodesByRoutingRules(ClusterState cstate, DocCollection coll, String id, SolrInputDocument doc) {
try {
DocRouter router = coll.getRouter();
List<SolrCmdDistributor.Node> nodes = null;
if (router instanceof CompositeIdRouter) {
CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router;
String myShardId = cloudDesc.getShardId();
Slice slice = coll.getSlice(myShardId);
Map<String,RoutingRule> routingRules = slice.getRoutingRules();
if (routingRules != null) {
// delete by query case
if (id == null) {
// no id to match against a rule: for every rule, send to the leader of the first active
// slice of its target collection (rule expiry is not checked on this path)
for (Map.Entry<String,RoutingRule> entry : routingRules.entrySet()) {
String targetCollectionName = entry.getValue().getTargetCollectionName();
final DocCollection docCollection = cstate.getCollectionOrNull(targetCollectionName);
if (docCollection != null && docCollection.getActiveSlices().size() > 0) {
Collection<Slice> activeSlices = docCollection.getActiveSlices();
Slice any = activeSlices.iterator().next();
if (nodes == null) nodes = new ArrayList<>();
nodes.add(new SolrCmdDistributor.StdNode(zkController.getZkStateReader(), any.getLeader()));
}
}
return nodes;
}
String routeKey = SolrIndexSplitter.getRouteKey(id);
if (routeKey != null) {
// rules are keyed by the route key followed by '!'
RoutingRule rule = routingRules.get(routeKey + "!");
if (rule != null) {
if (!rule.isExpired()) {
List<DocRouter.Range> ranges = rule.getRouteRanges();
if (ranges != null && !ranges.isEmpty()) {
int hash = compositeIdRouter.sliceHash(id, doc, null, coll);
// the first range containing the hash decides; the update then goes to the leader of the
// first slice serving the id in the target collection
for (DocRouter.Range range : ranges) {
if (range.includes(hash)) {
DocCollection targetColl = cstate.getCollection(rule.getTargetCollectionName());
Collection<Slice> activeSlices = targetColl.getRouter().getSearchSlicesSingle(id, null, targetColl);
if (activeSlices == null || activeSlices.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "No active slices serving " + id + " found for target collection: " + rule.getTargetCollectionName());
}
Replica targetLeader = targetColl.getLeader(activeSlices.iterator().next().getName());
nodes = new ArrayList<>(1);
nodes.add(new SolrCmdDistributor.StdNode(zkController.getZkStateReader(), targetLeader));
break;
}
}
}
} else {
log.info("Going to expire routing rule");
try {
// MRM TODO: TODO: needs to use the statepublisher
// NOTE(review): the removal message is built but never sent because the offerStateUpdate
// call below is commented out, so an expired rule is currently only logged.
Map<String,Object> map = Utils
.makeMap(Overseer.QUEUE_OPERATION, OverseerAction.REMOVEROUTINGRULE.toLower(), ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.SHARD_ID_PROP, myShardId, "routeKey", routeKey + "!");
// zkController.getOverseer().offerStateUpdate(Utils.toJSON(map));
} catch (Exception e) {
log.error("Exception while removing routing rule for route key: {}", routeKey, e);
}
}
}
}
}
}
return nodes;
} catch (Throwable t) {
// everything is logged; Errors are rethrown untouched, anything else is wrapped
log.error("Error getting routing rules", t);
if (t instanceof Error) {
throw t;
}
throw new SolrException(ErrorCode.SERVER_ERROR, "Error getting routing rules", t);
}
}
/**
 * Sanity checks run before an update is distributed. Replay and peer-sync commands are exempt.
 *
 * <ul>
 *   <li>When we consider ourselves leader, or the request is in the TOLEADER phase, our leader
 *       elector must exist and report that we are the leader.</li>
 *   <li>When a FROMLEADER request carrying a {@code DISTRIB_FROM} parameter reaches a leader it
 *       must either come from a parent shard (we must then not be ACTIVE and our range must lie
 *       within the parent's range) or from another collection.</li>
 * </ul>
 *
 * @param phase the distribution phase of the current request
 * @throws IllegalStateException when the leader elector does not confirm our leadership
 * @throws SolrException (SERVICE_UNAVAILABLE) when a FROMLEADER request is inconsistent with our state
 */
private void doDefensiveChecks(DistribPhase phase) {
  // MRM TODO: could use LeaderElector elector = zkController.getLeaderElector(desc.getName());
  if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
    return;
  }

  final String from = req.getParams().get(DISTRIB_FROM);
  final DocCollection docCollection = clusterState.getCollection(collection);
  final Slice mySlice = docCollection.getSlice(cloudDesc.getShardId());

  if (isLeader || DistribPhase.TOLEADER == phase) {
    final LeaderElector elector = req.getCore().getCoreContainer().getZkController().getLeaderElector(req.getCore().getName());
    final boolean confirmedLeader = elector != null && elector.isLeader();
    if (!confirmedLeader) {
      throw new IllegalStateException(
          "Not the valid leader (replica=" + req.getCore().getName() + ")" + (elector == null ? "No leader elector" : "Elector state=" + elector.getState()) + " coll=" + req.getCore()
              .getCoreContainer().getZkController().getClusterState().getCollectionOrNull(req.getCore().getCoreDescriptor().getCollectionName()));
    }
  }

  // from will be null on log replay
  final boolean leaderReceivedFromLeader = DistribPhase.FROMLEADER == phase && isLeader && from != null;
  if (!leaderReceivedFromLeader) {
    return;
  }

  final String fromShard = req.getParams().get(DISTRIB_FROM_PARENT);
  if (fromShard == null) {
    // is it because of a routing rule?
    final String fromCollection = req.getParams().get(DISTRIB_FROM_COLLECTION);
    if (fromCollection != null) {
      return;
    }
    log.error("Request says it is coming from leader, but we are the leader: {}", req.getParamString());
    final SolrException leaderChanged = new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
    leaderChanged.setMetadata("cause", "LeaderChanged");
    throw leaderChanged;
  }

  if (mySlice.getState() == Slice.State.ACTIVE) {
    throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
        "Request says it is coming from parent shard leader but we are in active state");
  }
  // shard splitting case -- check ranges to see if we are a sub-shard
  DocRouter.Range parentRange = docCollection.getSlice(fromShard).getRange();
  if (parentRange == null) {
    parentRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
  }
  final DocRouter.Range ownRange = mySlice.getRange();
  if (ownRange != null && !ownRange.isSubsetOf(parentRange)) {
    throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
        "Request says it is coming from parent shard leader but parent hash range is not superset of my range");
  }
}
/**
 * Closes this processor: delegates to {@code super.doClose()} and afterwards closes
 * {@code cmdDistrib} when it is set.
 */
@Override
public void doClose() {
try {
super.doClose();
} finally {
// in a finally block so that cmdDistrib is closed even when super.doClose() throws
if (cmdDistrib != null) {
cmdDistrib.close();
}
}
}
/**
 * Rejects the merge with a FORBIDDEN error when {@code isReadOnly()} reports true; otherwise
 * hands the command to the superclass.
 *
 * @param cmd the merge-indexes command
 * @throws IOException propagated from the superclass
 */
@Override
public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
  if (!isReadOnly()) {
    super.processMergeIndexes(cmd);
    return;
  }
  throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
}
/**
 * Rejects the rollback with a FORBIDDEN error when {@code isReadOnly()} reports true; otherwise
 * hands the command to the superclass.
 *
 * @param cmd the rollback command
 * @throws IOException propagated from the superclass
 */
@Override
public void processRollback(RollbackUpdateCommand cmd) throws IOException {
  if (!isReadOnly()) {
    super.processRollback(cmd);
    return;
  }
  throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
}
// TODO: optionally fail if n replicas are not reached...
/**
 * Finishes the distributed part of a request: waits for the distributor, updates shard terms
 * when we are the leader and the index changed, classifies the collected errors, and reports
 * the replication factor.
 *
 * <p>Errors on forwarded requests, "LeaderChanged" failures and failures on replicas of another
 * shard are passed on to the client. Other failures on our own replicas are only logged, and the
 * replica is collected so that the leader's term is raised above it.
 *
 * @param cancelCmds commands whose recorded errors are discarded before evaluation
 * @throws DistributedUpdatesAsyncException when at least one error has to be reported to the client
 * @throws SolrException (SERVER_ERROR) when the shard terms cannot be updated
 */
protected void doDistribFinish(Set<UpdateCommand> cancelCmds) {
// TODO: if not a forward and replication req is not specified, we could
// send in a background thread
cmdDistrib.finish();
// errors belonging to cancelled commands are dropped
cancelCmds.forEach(updateCommand1 -> {
cmdDistrib.getErrors().remove(updateCommand1);
});
boolean shouldUpdateTerms = isLeader && isIndexChanged;
if (shouldUpdateTerms) {
ZkShardTerms zkShardTerms = null;
try {
zkShardTerms = zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId());
// replicas that were deliberately not sent the update must end up with a lower term than ours
if (skippedCoreNodeNames != null) {
zkShardTerms.ensureTermsIsHigher(desc.getName(), skippedCoreNodeNames);
}
// NOTE(review): the shard terms are looked up a second time, keyed by `collection` rather than
// cloudDesc.getCollectionName() -- presumably the same collection; confirm.
zkController.getShardTerms(collection, cloudDesc.getShardId()).ensureHighestTermsAreNotZero();
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
}
Collection<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors().values();
if (errors.size() > 0) {
log.warn("There were errors during the request {}", errors);
}
// TODO - we may need to tell about more than one error...
Set<SolrCmdDistributor.Error> errorsForClient = new HashSet<>(errors.size());
// replicas that failed an update from us and should be pushed into recovery via a lower term
Set<String> replicasShouldBeInLowerTerms = new HashSet<>();
errors.forEach(error -> {
if (error.req == null) return;
if (error.req.node instanceof SolrCmdDistributor.ForwardNode) {
// if it's a forward, any fail is a problem -
// otherwise we assume things are fine if we got it locally
// until we start allowing min replication param
errorsForClient.add(error);
return;
}
// else...
// for now we don't error - we assume if it was added locally, we
// succeeded
log.warn("Error sending update to {}", error.req.node.getBaseUrl(), error.t);
// Since it is not a forward request, for each fail, try to tell them to
// recover - the doc was already added locally, so it should have been
// legit
DistribPhase phase = DistribPhase.parseParam(error.req.uReq.getParams().get(DISTRIB_UPDATE_PARAM));
if (phase != DistribPhase.FROMLEADER) return; // don't have non-leaders try to recovery other nodes
// commits are special -- they can run on any node irrespective of whether it is a leader or not
// we don't want to run recovery on a node which missed a commit command
if (error.req.uReq.getParams().get(COMMIT_END_POINT) != null) return;
final String replicaUrl = error.req.node.getUrl();
// if the remote replica failed the request because of leader change (SOLR-6511), then fail the request
String cause = (error.t instanceof SolrException) ? ((SolrException) error.t).getMetadata("cause") : null;
if ("LeaderChanged".equals(cause)) {
// let's just fail this request and let the client retry? or just call processAdd again?
log.error("On {}, replica {} now thinks it is the leader! Failing the request to let the client retry!", desc.getName(), replicaUrl, error.t);
errorsForClient.add(error);
return;
}
// these locals shadow nothing the lambda needs from the enclosing instance: they describe the
// shard of the node that failed, not necessarily our own shard
String collection = null;
String shardId = null;
if (error.req.node instanceof SolrCmdDistributor.StdNode) {
SolrCmdDistributor.StdNode stdNode = (SolrCmdDistributor.StdNode) error.req.node;
collection = stdNode.getCollection();
shardId = stdNode.getShardId();
// before we go setting other replicas to down, make sure we're still the leader!
String leaderCoreNodeName = null;
Exception getLeaderExc = null;
Replica leaderProps = null;
try {
leaderProps = zkController.getZkStateReader().getLeader(collection, shardId);
if (leaderProps != null) {
leaderCoreNodeName = leaderProps.getName();
}
} catch (Exception exc) {
// remembered only so that it can be attached to the warning below
getLeaderExc = exc;
}
if (leaderCoreNodeName == null) {
log.warn("Failed to determine if {} is still the leader for collection={} shardId={} before putting {} into leader-initiated recovery", desc.getName(), collection, shardId, replicaUrl,
getLeaderExc);
}
// is the failing node one of the replicas of our own shard?
List<Replica> myReplicas = zkController.getZkStateReader().getReplicaProps(collection, cloudDesc.getShardId(), desc.getName());
boolean foundErrorNodeInReplicaList = false;
if (myReplicas != null) {
for (Replica replicaProp : myReplicas) {
if (((Replica) replicaProp).getName().equals(((Replica) stdNode.getNodeProps()).getName())) {
foundErrorNodeInReplicaList = true;
break;
}
}
}
if (leaderCoreNodeName != null && desc.getName().equals(leaderCoreNodeName) // we are still same leader
&& foundErrorNodeInReplicaList // we found an error for one of replicas
&& !stdNode.getNodeProps().getCoreUrl().equals(leaderProps.getCoreUrl())) { // we do not want to put ourself into LIR
// NOTE(review): the try body below only logs and adds to a local set; the catch block (and its
// "state to DOWN" message) looks like a leftover from an earlier implementation -- confirm.
try {
String coreNodeName = ((Replica) stdNode.getNodeProps()).getName();
// if false, then the node is probably not "live" anymore
// and we do not need to send a recovery message
Throwable rootCause = SolrException.getRootCause(error.t);
log.error("Setting up to try to start recovery on replica {} with url {} by increasing leader term", coreNodeName, replicaUrl, rootCause);
replicasShouldBeInLowerTerms.add(coreNodeName);
} catch (Exception exc) {
SolrZkClient.checkInterrupted(exc);
Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc);
log.error("Leader failed to set replica {} state to DOWN due to: {}", error.req.node.getUrl(), setLirZnodeFailedCause, setLirZnodeFailedCause);
}
} else {
// not the leader anymore maybe or the error'd node is not my replica?
if (!foundErrorNodeInReplicaList) {
log.warn("Core {} belonging to {} {}, does not have error'd node {} as a replica. No request recovery command will be sent! replicas={} node={}", desc.getName(), collection, cloudDesc.getShardId(),
stdNode.getNodeProps().getCoreUrl(), myReplicas, error.req.node.getClass().getSimpleName());
if (!shardId.equals(cloudDesc.getShardId())) {
// some replicas on other shard did not receive the updates (ex: during splitshard),
// exception must be notified to clients
errorsForClient.add(error);
}
} else {
log.warn("Core {} is no longer the leader for {} {} or we tried to put ourself into LIR, no request recovery command will be sent! replicas={}", desc.getName(), collection, shardId, myReplicas);
}
}
}
});
// one term update for all replicas collected above
if (!replicasShouldBeInLowerTerms.isEmpty()) {
try {
zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId()).ensureTermsIsHigher(desc.getName(), replicasShouldBeInLowerTerms);
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
}
// the replication factor is reported before any client-facing error is thrown
handleReplicationFactor();
if (0 < errorsForClient.size()) {
throw new DistributedUpdatesAsyncException(errorsForClient);
}
}
/**
 * If necessary, include in the response the achieved replication factor.
 *
 * <p>Nothing happens when there is no response object or when neither replication tracker has
 * been allocated. Otherwise the leader tracker's value (if any) is folded into the rollup
 * tracker, the resulting value is added to the response header under
 * {@code UpdateRequest.REPFACT}, and both trackers are reset.
 */
@SuppressWarnings("deprecation")
private void handleReplicationFactor() {
  // The original condition was `rsp != null && leader != null || rollup != null`; because &&
  // binds tighter than ||, a null rsp slipped through whenever a rollup tracker existed.
  if (rsp == null || (leaderReplicationTracker == null && rollupReplicationTracker == null)) {
    return;
  }

  int achievedRf = Integer.MAX_VALUE;
  if (leaderReplicationTracker != null) {
    achievedRf = leaderReplicationTracker.getAchievedRf();
    // Transfer this to the rollup tracker if it exists
    if (rollupReplicationTracker != null) {
      rollupReplicationTracker.testAndSetAchievedRf(achievedRf);
    }
  }
  // Rollup tracker has accumulated stats.
  if (rollupReplicationTracker != null) {
    achievedRf = rollupReplicationTracker.getAchievedRf();
  }

  // Fetch the header once and guard both writes with the same null check.
  final NamedList<Object> header = rsp.getResponseHeader();
  if (header != null) {
    final String minRf = req.getParams().get(UpdateRequest.MIN_REPFACT);
    if (minRf != null) {
      // Unused, but kept for back compatibility. To be removed in Solr 9
      header.add(UpdateRequest.MIN_REPFACT, Integer.parseInt(minRf));
    }
    header.add(UpdateRequest.REPFACT, achievedRf);
  }

  rollupReplicationTracker = null;
  leaderReplicationTracker = null;
}
/**
 * Refuses updates that a leader (or sub-shard leader) can no longer process safely.
 *
 * @throws SolrException (SERVICE_UNAVAILABLE) when we act as leader and either the core
 *         container is shut down or the ZooKeeper client is not alive
 */
private void zkCheck() {
  // Streaming updates can delay shutdown and cause big update reorderings (new streams can't be
  // initiated, but existing streams carry on). This is why we check if the CC is shutdown.
  // See SOLR-8203 and loop HdfsChaosMonkeyNothingIsSafeTest (and check for inconsistent shards) to test.
  if (isLeader || isSubShardLeader) {
    if (req.getCore().getCoreContainer().isShutDown()) {
      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "CoreContainer is shutting down.");
    }
  }

  // result intentionally unused; the lookup itself is kept from the original
  clusterState.getCollection(collection);

  final boolean replayOrPeerSync = (updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
  if (replayOrPeerSync) {
    // for log reply or peer sync, we don't need to be connected to ZK
    return;
  }

  if ((isLeader || isSubShardLeader) && !zkController.getZkClient().isAlive()) {
    throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Cannot talk to ZooKeeper - Updates are disabled.");
  }
}
/**
 * {@link ConnectionManager.IsClosed} implementation that reports "closed" once the core
 * container behind the given request has been shut down.
 */
private static class IsCCClosed extends ConnectionManager.IsClosed {

  /** Request through which the core container is reached. */
  private final SolrQueryRequest req;

  /**
   * @param req request whose core's container is queried on every {@link #isClosed()} call
   */
  public IsCCClosed(SolrQueryRequest req) {
    this.req = req;
  }

  /**
   * @return the current value of {@code isShutDown()} on the core container of the request's core
   */
  @Override
  public boolean isClosed() {
    final boolean containerShutDown = this.req.getCore().getCoreContainer().isShutDown();
    return containerShutDown;
  }
}
}