| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.solr.update.processor; |
| |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import org.apache.lucene.util.BytesRef; |
| import org.apache.lucene.util.CharsRefBuilder; |
| import org.apache.solr.client.solrj.SolrRequest; |
| import org.apache.solr.client.solrj.SolrRequest.METHOD; |
| import org.apache.solr.client.solrj.SolrServerException; |
| import org.apache.solr.client.solrj.cloud.DistributedQueue; |
| import org.apache.solr.client.solrj.impl.HttpSolrClient; |
| import org.apache.solr.client.solrj.request.GenericSolrRequest; |
| import org.apache.solr.client.solrj.request.UpdateRequest; |
| import org.apache.solr.client.solrj.response.SimpleSolrResponse; |
| import org.apache.solr.cloud.CloudDescriptor; |
| import org.apache.solr.cloud.Overseer; |
| import org.apache.solr.cloud.ZkController; |
| import org.apache.solr.cloud.ZkShardTerms; |
| import org.apache.solr.cloud.overseer.OverseerAction; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.SolrException.ErrorCode; |
| import org.apache.solr.common.SolrInputDocument; |
| import org.apache.solr.common.SolrInputField; |
| import org.apache.solr.common.cloud.ClusterState; |
| import org.apache.solr.common.cloud.CompositeIdRouter; |
| import org.apache.solr.common.cloud.DocCollection; |
| import org.apache.solr.common.cloud.DocRouter; |
| import org.apache.solr.common.cloud.Replica; |
| import org.apache.solr.common.cloud.RoutingRule; |
| import org.apache.solr.common.cloud.Slice; |
| import org.apache.solr.common.cloud.Slice.State; |
| import org.apache.solr.common.cloud.SolrZkClient; |
| import org.apache.solr.common.cloud.ZkCoreNodeProps; |
| import org.apache.solr.common.cloud.ZkStateReader; |
| import org.apache.solr.common.cloud.ZooKeeperException; |
| import org.apache.solr.common.params.CommonParams; |
| import org.apache.solr.common.params.ModifiableSolrParams; |
| import org.apache.solr.common.params.ShardParams; |
| import org.apache.solr.common.params.SolrParams; |
| import org.apache.solr.common.params.UpdateParams; |
| import org.apache.solr.common.util.Hash; |
| import org.apache.solr.common.util.NamedList; |
| import org.apache.solr.common.util.TimeSource; |
| import org.apache.solr.common.util.Utils; |
| import org.apache.solr.core.CoreContainer; |
| import org.apache.solr.handler.component.RealTimeGetComponent; |
| import org.apache.solr.request.SolrQueryRequest; |
| import org.apache.solr.response.SolrQueryResponse; |
| import org.apache.solr.schema.SchemaField; |
| import org.apache.solr.update.AddUpdateCommand; |
| import org.apache.solr.update.CommitUpdateCommand; |
| import org.apache.solr.update.DeleteUpdateCommand; |
| import org.apache.solr.update.SolrCmdDistributor; |
| import org.apache.solr.update.SolrCmdDistributor.Error; |
| import org.apache.solr.update.SolrCmdDistributor.Node; |
| import org.apache.solr.update.SolrCmdDistributor.ForwardNode; |
| import org.apache.solr.update.SolrCmdDistributor.StdNode; |
| import org.apache.solr.update.SolrIndexSplitter; |
| import org.apache.solr.update.UpdateCommand; |
| import org.apache.solr.update.UpdateLog; |
| import org.apache.solr.update.UpdateShardHandler; |
| import org.apache.solr.update.VersionBucket; |
| import org.apache.solr.update.VersionInfo; |
| import org.apache.solr.util.TestInjection; |
| import org.apache.solr.util.TimeOut; |
| import org.apache.zookeeper.KeeperException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static org.apache.solr.common.params.CommonParams.DISTRIB; |
| import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; |
| |
| // NOT mt-safe... create a new processor for each add thread |
| // TODO: we really should not wait for distrib after local? unless a certain replication factor is asked for |
| public class DistributedUpdateProcessor extends UpdateRequestProcessor { |
| |
| final static String PARAM_WHITELIST_CTX_KEY = DistributedUpdateProcessor.class + "PARAM_WHITELIST_CTX_KEY"; |
| public static final String DISTRIB_FROM_SHARD = "distrib.from.shard"; |
| public static final String DISTRIB_FROM_COLLECTION = "distrib.from.collection"; |
| public static final String DISTRIB_FROM_PARENT = "distrib.from.parent"; |
| public static final String DISTRIB_FROM = "distrib.from"; |
| public static final String DISTRIB_INPLACE_PREVVERSION = "distrib.inplace.prevversion"; |
| private static final String TEST_DISTRIB_SKIP_SERVERS = "test.distrib.skip.servers"; |
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| |
| /** |
| * Requests forwarded to the leader of a different shard will be retried up to this number of times by default |
| */ |
| static final int MAX_RETRIES_ON_FORWARD_DEAULT = 25; |
| |
| /** |
| * Requests from the leader to its followers will be retried this number of times by default |
| */ |
| static final int MAX_RETRIES_TO_FOLLOWERS_DEFAULT = 3; |
| |
| /** |
| * Values this processor supports for the <code>DISTRIB_UPDATE_PARAM</code>. |
| * This is an implementation detail exposed solely for tests. |
| * |
| * @see DistributingUpdateProcessorFactory#DISTRIB_UPDATE_PARAM |
| */ |
| public static enum DistribPhase { |
| NONE, TOLEADER, FROMLEADER; |
| |
| public static DistribPhase parseParam(final String param) { |
| if (param == null || param.trim().isEmpty()) { |
| return NONE; |
| } |
| try { |
| return valueOf(param); |
| } catch (IllegalArgumentException e) { |
| throw new SolrException |
| (SolrException.ErrorCode.BAD_REQUEST, "Illegal value for " + |
| DISTRIB_UPDATE_PARAM + ": " + param, e); |
| } |
| } |
| } |
| |
| public static final String COMMIT_END_POINT = "commit_end_point"; |
| public static final String LOG_REPLAY = "log_replay"; |
| |
| // used to assert we don't call finish more than once, see finish() |
| private boolean finished = false; |
| |
| private final SolrQueryRequest req; |
| private final SolrQueryResponse rsp; |
| private final UpdateRequestProcessor next; |
| private final AtomicUpdateDocumentMerger docMerger; |
| |
| private final UpdateLog ulog; |
| private final VersionInfo vinfo; |
| private final boolean versionsStored; |
| private boolean returnVersions; |
| |
| private NamedList<Object> addsResponse = null; |
| private NamedList<Object> deleteResponse = null; |
| private NamedList<Object> deleteByQueryResponse = null; |
| private CharsRefBuilder scratch; |
| |
| private final SchemaField idField; |
| |
| private SolrCmdDistributor cmdDistrib; |
| |
| private final boolean zkEnabled; |
| |
| private final CloudDescriptor cloudDesc; |
| private final String collection; |
| private final ZkController zkController; |
| |
| // these are setup at the start of each request processing |
| // method in this update processor |
| private boolean isLeader = true; |
| private boolean forwardToLeader = false; |
| private boolean isSubShardLeader = false; |
| private List<Node> nodes; |
| private Set<String> skippedCoreNodeNames; |
| private boolean isIndexChanged = false; |
| |
| /** |
| * Number of times requests forwarded to some other shard's leader can be retried |
| */ |
| private final int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD_DEAULT; |
| /** |
| * Number of times requests from leaders to followers can be retried |
| */ |
| private final int maxRetriesToFollowers = MAX_RETRIES_TO_FOLLOWERS_DEFAULT; |
| |
| private UpdateCommand updateCommand; // the current command this processor is working on. |
| |
| // used for keeping track of replicas that have processed an add/update from the leader |
| private RollupRequestReplicationTracker rollupReplicationTracker = null; |
| private LeaderRequestReplicationTracker leaderReplicationTracker = null; |
| |
| // should we clone the document before sending it to replicas? |
| // this is set to true in the constructor if the next processors in the chain |
| // are custom and may modify the SolrInputDocument racing with its serialization for replication |
| private final boolean cloneRequiredOnLeader; |
| private final Replica.Type replicaType; |
| |
| public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) { |
| this(req, rsp, new AtomicUpdateDocumentMerger(req), next); |
| } |
| |
| /** Specification of AtomicUpdateDocumentMerger is currently experimental. |
| * @lucene.experimental |
| */ |
| public DistributedUpdateProcessor(SolrQueryRequest req, |
| SolrQueryResponse rsp, AtomicUpdateDocumentMerger docMerger, UpdateRequestProcessor next) { |
| super(next); |
| this.rsp = rsp; |
| this.next = next; |
| this.docMerger = docMerger; |
| this.idField = req.getSchema().getUniqueKeyField(); |
| // version init |
| |
| this.ulog = req.getCore().getUpdateHandler().getUpdateLog(); |
| this.vinfo = ulog == null ? null : ulog.getVersionInfo(); |
| versionsStored = this.vinfo != null && this.vinfo.getVersionField() != null; |
| returnVersions = req.getParams().getBool(UpdateParams.VERSIONS ,false); |
| |
| // TODO: better way to get the response, or pass back info to it? |
| // SolrRequestInfo reqInfo = returnVersions ? SolrRequestInfo.getRequestInfo() : null; |
| |
| this.req = req; |
| |
| // this should always be used - see filterParams |
| DistributedUpdateProcessorFactory.addParamToDistributedRequestWhitelist |
| (this.req, UpdateParams.UPDATE_CHAIN, TEST_DISTRIB_SKIP_SERVERS, CommonParams.VERSION_FIELD, |
| UpdateParams.EXPUNGE_DELETES, UpdateParams.OPTIMIZE, UpdateParams.MAX_OPTIMIZE_SEGMENTS); |
| |
| CoreContainer cc = req.getCore().getCoreContainer(); |
| |
| this.zkEnabled = cc.isZooKeeperAware(); |
| zkController = cc.getZkController(); |
| if (zkEnabled) { |
| cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler()); |
| } |
| //this.rsp = reqInfo != null ? reqInfo.getRsp() : null; |
| cloudDesc = req.getCore().getCoreDescriptor().getCloudDescriptor(); |
| |
| if (cloudDesc != null) { |
| collection = cloudDesc.getCollectionName(); |
| replicaType = cloudDesc.getReplicaType(); |
| } else { |
| collection = null; |
| replicaType = Replica.Type.NRT; |
| } |
| |
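| // inspect the rest of the chain: if any processor other than the standard Log/Run/Tolerant |
| // processors follows this one, it may modify the document while it is being serialized for |
| // replication, so the leader must clone the document before distributing it |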
| boolean shouldClone = false; |
| UpdateRequestProcessor nextInChain = next; |
| while (nextInChain != null) { |
| Class<? extends UpdateRequestProcessor> klass = nextInChain.getClass(); |
| if (klass != LogUpdateProcessorFactory.LogUpdateProcessor.class |
| && klass != RunUpdateProcessor.class |
| && klass != TolerantUpdateProcessor.class) { |
| shouldClone = true; |
| break; |
| } |
| nextInChain = nextInChain.next; |
| } |
| cloneRequiredOnLeader = shouldClone; |
| } |
| |
| private List<Node> setupRequest(String id, SolrInputDocument doc) { |
| return setupRequest(id, doc, null); |
| } |
| |
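| /** |
| * Works out this core's role for the given document (leader, sub-shard leader, replica, or |
| * forwarder) from the current cluster state, setting isLeader, isSubShardLeader and |
| * forwardToLeader as a side effect. |
| * Returns the nodes this update must be distributed to, or null when the update only needs to |
| * be applied locally (non-cloud mode, replay/peer-sync, an update received from our leader, or |
| * a leader with no other live replicas to forward to). |
| */ |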
| private List<Node> setupRequest(String id, SolrInputDocument doc, String route) { |
| // nothing to set up unless we are in zk (SolrCloud) mode |
| if (!zkEnabled) { |
| return null; |
| } |
| |
| assert TestInjection.injectUpdateRandomPause(); |
| |
| if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) { |
| isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway. |
| forwardToLeader = false; |
| return null; |
| } |
| |
| ClusterState cstate = zkController.getClusterState(); |
| DocCollection coll = cstate.getCollection(collection); |
| Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll); |
| |
| if (slice == null) { |
| // No slice found. Most strict routers will have already thrown an exception, so a null return is |
| // a signal to use the slice of this core. |
| // TODO: what if this core is not in the targeted collection? |
| String shardId = cloudDesc.getShardId(); |
| slice = coll.getSlice(shardId); |
| if (slice == null) { |
| throw new SolrException(ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll); |
| } |
| } |
| |
| DistribPhase phase = |
| DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)); |
| |
| if (DistribPhase.FROMLEADER == phase && !couldIbeSubShardLeader(coll)) { |
| if (cloudDesc.isLeader()) { |
| // locally we think we are leader but the request says it came FROMLEADER |
| // that could indicate a problem, let the full logic below figure it out |
| } else { |
| |
| assert TestInjection.injectFailReplicaRequests(); |
| |
| isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway. |
| forwardToLeader = false; |
| return null; |
| } |
| } |
| |
| String shardId = slice.getName(); |
| |
| try { |
| // Not equivalent to getLeaderProps, which retries to find a leader. |
| // Replica leader = slice.getLeader(); |
| Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId); |
| isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName()); |
| |
| if (!isLeader) { |
| isSubShardLeader = amISubShardLeader(coll, slice, id, doc); |
| if (isSubShardLeader) { |
| shardId = cloudDesc.getShardId(); |
| leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId); |
| } |
| } |
| |
| doDefensiveChecks(phase); |
| |
| // if request is coming from another collection then we want it to be sent to all replicas |
| // even if its phase is FROMLEADER |
| String fromCollection = updateCommand.getReq().getParams().get(DISTRIB_FROM_COLLECTION); |
| |
| if (DistribPhase.FROMLEADER == phase && !isSubShardLeader && fromCollection == null) { |
| // we are coming from the leader, just go local - add no urls |
| forwardToLeader = false; |
| return null; |
| } else if (isLeader || isSubShardLeader) { |
| // that means I want to forward onto my replicas... |
| // so get the replicas... |
| forwardToLeader = false; |
| ClusterState clusterState = zkController.getZkStateReader().getClusterState(); |
| String leaderCoreNodeName = leaderReplica.getName(); |
| List<Replica> replicas = clusterState.getCollection(collection) |
| .getSlice(shardId) |
| .getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG)); |
| replicas.removeIf((replica) -> replica.getName().equals(leaderCoreNodeName)); |
| if (replicas.isEmpty()) { |
| return null; |
| } |
| |
| // check for test param that lets us miss replicas |
| String[] skipList = req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS); |
| Set<String> skipListSet = null; |
| if (skipList != null) { |
| skipListSet = new HashSet<>(skipList.length); |
| skipListSet.addAll(Arrays.asList(skipList)); |
| log.info("test.distrib.skip.servers was found and contains:" + skipListSet); |
| } |
| |
| List<Node> nodes = new ArrayList<>(replicas.size()); |
| skippedCoreNodeNames = new HashSet<>(); |
| ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId); |
| for (Replica replica: replicas) { |
| String coreNodeName = replica.getName(); |
| if (skipList != null && skipListSet.contains(replica.getCoreUrl())) { |
| log.info("check url:" + replica.getCoreUrl() + " against:" + skipListSet + " result:true"); |
| } else if(zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) { |
| log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl()); |
| skippedCoreNodeNames.add(replica.getName()); |
| } else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) || replica.getState() == Replica.State.DOWN) { |
| skippedCoreNodeNames.add(replica.getName()); |
| } else { |
| nodes.add(new StdNode(new ZkCoreNodeProps(replica), collection, shardId, maxRetriesToFollowers)); |
| } |
| } |
| return nodes; |
| |
| } else { |
| // I need to forward on to the leader... |
| forwardToLeader = true; |
| return Collections.singletonList( |
| new ForwardNode(new ZkCoreNodeProps(leaderReplica), zkController.getZkStateReader(), collection, shardId, maxRetriesOnForward)); |
| } |
| |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e); |
| } |
| } |
| |
| /** For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD} */ |
| private boolean couldIbeSubShardLeader(DocCollection coll) { |
| // Could I be the leader of a shard in "construction/recovery" state? |
| String myShardId = cloudDesc.getShardId(); |
| Slice mySlice = coll.getSlice(myShardId); |
| State state = mySlice.getState(); |
| return state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY; |
| } |
| |
| /** For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD} */ |
| private boolean amISubShardLeader(DocCollection coll, Slice parentSlice, String id, SolrInputDocument doc) throws InterruptedException { |
| // Am I the leader of a shard in "construction/recovery" state? |
| String myShardId = cloudDesc.getShardId(); |
| Slice mySlice = coll.getSlice(myShardId); |
| final State state = mySlice.getState(); |
| if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) { |
| Replica myLeader = zkController.getZkStateReader().getLeaderRetry(collection, myShardId); |
| boolean amILeader = myLeader.getName().equals(cloudDesc.getCoreNodeName()); |
| if (amILeader) { |
| // Does the document belong to my hash range as well? |
| DocRouter.Range myRange = mySlice.getRange(); |
| if (myRange == null) myRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE); |
| if (parentSlice != null) { |
| boolean isSubset = parentSlice.getRange() != null && myRange.isSubsetOf(parentSlice.getRange()); |
| return isSubset && coll.getRouter().isTargetSlice(id, doc, req.getParams(), myShardId, coll); |
| } else { |
| // delete by query case -- as long as I am a sub shard leader we're fine |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| /** For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD} */ |
| private List<Node> getSubShardLeaders(DocCollection coll, String shardId, String docId, SolrInputDocument doc) { |
| Collection<Slice> allSlices = coll.getSlices(); |
| List<Node> nodes = null; |
| for (Slice aslice : allSlices) { |
| final Slice.State state = aslice.getState(); |
| if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) { |
| DocRouter.Range myRange = coll.getSlice(shardId).getRange(); |
| if (myRange == null) myRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE); |
| boolean isSubset = aslice.getRange() != null && aslice.getRange().isSubsetOf(myRange); |
| if (isSubset && |
| (docId == null // in case of deletes |
| || coll.getRouter().isTargetSlice(docId, doc, req.getParams(), aslice.getName(), coll))) { |
| Replica sliceLeader = aslice.getLeader(); |
| // slice leader can be null because the node/shard is created in ZK before leader election completes |
| if (sliceLeader != null && zkController.getClusterState().liveNodesContain(sliceLeader.getNodeName())) { |
| if (nodes == null) nodes = new ArrayList<>(); |
| ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(sliceLeader); |
| nodes.add(new StdNode(nodeProps, coll.getName(), aslice.getName())); |
| } |
| } |
| } |
| } |
| return nodes; |
| } |
| |
| /** For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#MIGRATE} */ |
| private List<Node> getNodesByRoutingRules(ClusterState cstate, DocCollection coll, String id, SolrInputDocument doc) { |
| DocRouter router = coll.getRouter(); |
| List<Node> nodes = null; |
| if (router instanceof CompositeIdRouter) { |
| CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router; |
| String myShardId = cloudDesc.getShardId(); |
| Slice slice = coll.getSlice(myShardId); |
| Map<String, RoutingRule> routingRules = slice.getRoutingRules(); |
| if (routingRules != null) { |
| |
| // delete by query case |
| if (id == null) { |
| for (Entry<String, RoutingRule> entry : routingRules.entrySet()) { |
| String targetCollectionName = entry.getValue().getTargetCollectionName(); |
| final DocCollection docCollection = cstate.getCollectionOrNull(targetCollectionName); |
| if (docCollection != null && docCollection.getActiveSlicesArr().length > 0) { |
| final Slice[] activeSlices = docCollection.getActiveSlicesArr(); |
| Slice any = activeSlices[0]; |
| if (nodes == null) nodes = new ArrayList<>(); |
| nodes.add(new StdNode(new ZkCoreNodeProps(any.getLeader()))); |
| } |
| } |
| return nodes; |
| } |
| |
| String routeKey = SolrIndexSplitter.getRouteKey(id); |
| if (routeKey != null) { |
| RoutingRule rule = routingRules.get(routeKey + "!"); |
| if (rule != null) { |
| if (! rule.isExpired()) { |
| List<DocRouter.Range> ranges = rule.getRouteRanges(); |
| if (ranges != null && !ranges.isEmpty()) { |
| int hash = compositeIdRouter.sliceHash(id, doc, null, coll); |
| for (DocRouter.Range range : ranges) { |
| if (range.includes(hash)) { |
| DocCollection targetColl = cstate.getCollection(rule.getTargetCollectionName()); |
| Collection<Slice> activeSlices = targetColl.getRouter().getSearchSlicesSingle(id, null, targetColl); |
| if (activeSlices == null || activeSlices.isEmpty()) { |
| throw new SolrException(ErrorCode.SERVER_ERROR, |
| "No active slices serving " + id + " found for target collection: " + rule.getTargetCollectionName()); |
| } |
| Replica targetLeader = targetColl.getLeader(activeSlices.iterator().next().getName()); |
| nodes = new ArrayList<>(1); |
| nodes.add(new StdNode(new ZkCoreNodeProps(targetLeader))); |
| break; |
| } |
| } |
| } |
| } else { |
| ReentrantLock ruleExpiryLock = req.getCore().getRuleExpiryLock(); |
| if (!ruleExpiryLock.isLocked()) { |
| try { |
| if (ruleExpiryLock.tryLock(10, TimeUnit.MILLISECONDS)) { |
| log.info("Going to expire routing rule"); |
| try { |
| Map<String, Object> map = Utils.makeMap(Overseer.QUEUE_OPERATION, OverseerAction.REMOVEROUTINGRULE.toLower(), |
| ZkStateReader.COLLECTION_PROP, collection, |
| ZkStateReader.SHARD_ID_PROP, myShardId, |
| "routeKey", routeKey + "!"); |
| SolrZkClient zkClient = zkController.getZkClient(); |
| DistributedQueue queue = Overseer.getStateUpdateQueue(zkClient); |
| queue.offer(Utils.toJSON(map)); |
| } catch (KeeperException e) { |
| log.warn("Exception while removing routing rule for route key: " + routeKey, e); |
| } catch (Exception e) { |
| log.error("Exception while removing routing rule for route key: " + routeKey, e); |
| } finally { |
| ruleExpiryLock.unlock(); |
| } |
| } |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| return nodes; |
| } |
| |
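| /** |
| * Sanity checks the distrib.from* parameters against our local view of the cluster state and |
| * throws SERVICE_UNAVAILABLE when leadership or shard state appears to have changed since the |
| * request was routed: e.g. the request claims to come from our leader while we believe we are |
| * the leader, a parent-shard request arrives while our slice is already active, or ClusterState |
| * says we are the leader but the local core descriptor disagrees. |
| */ |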
| private void doDefensiveChecks(DistribPhase phase) { |
| boolean isReplayOrPeersync = (updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0; |
| if (isReplayOrPeersync) return; |
| |
| String from = req.getParams().get(DISTRIB_FROM); |
| ClusterState clusterState = zkController.getClusterState(); |
| |
| DocCollection docCollection = clusterState.getCollection(collection); |
| Slice mySlice = docCollection.getSlice(cloudDesc.getShardId()); |
| boolean localIsLeader = cloudDesc.isLeader(); |
| if (DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay |
| String fromShard = req.getParams().get(DISTRIB_FROM_PARENT); |
| if (fromShard != null) { |
| if (mySlice.getState() == Slice.State.ACTIVE) { |
| throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, |
| "Request says it is coming from parent shard leader but we are in active state"); |
| } |
| // shard splitting case -- check ranges to see if we are a sub-shard |
| Slice fromSlice = docCollection.getSlice(fromShard); |
| DocRouter.Range parentRange = fromSlice.getRange(); |
| if (parentRange == null) parentRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE); |
| if (mySlice.getRange() != null && !mySlice.getRange().isSubsetOf(parentRange)) { |
| throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, |
| "Request says it is coming from parent shard leader but parent hash range is not superset of my range"); |
| } |
| } else { |
| String fromCollection = req.getParams().get(DISTRIB_FROM_COLLECTION); // is it because of a routing rule? |
| if (fromCollection == null) { |
| log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString()); |
| SolrException solrExc = new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader"); |
| solrExc.setMetadata("cause", "LeaderChanged"); |
| throw solrExc; |
| } |
| } |
| } |
| |
| if ((isLeader && !localIsLeader) || (isSubShardLeader && !localIsLeader)) { |
| log.error("ClusterState says we are the leader, but locally we don't think so"); |
| throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, |
| "ClusterState says we are the leader (" + zkController.getBaseUrl() |
| + "/" + req.getCore().getName() + "), but locally we don't think so. Request came from " + from); |
| } |
| } |
| |
| |
| // used for deleteByQuery to get the list of nodes this leader should forward to |
| private List<Node> setupRequestForDBQ() { |
| List<Node> nodes = null; |
| String shardId = cloudDesc.getShardId(); |
| |
| try { |
| Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId); |
| isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName()); |
| |
| // TODO: what if we are no longer the leader? |
| |
| forwardToLeader = false; |
| List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader() |
| .getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG)); |
| if (replicaProps != null) { |
| nodes = new ArrayList<>(replicaProps.size()); |
| for (ZkCoreNodeProps props : replicaProps) { |
| nodes.add(new StdNode(props, collection, shardId)); |
| } |
| } |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); |
| } |
| |
| return nodes; |
| } |
| |
| |
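| // Distributed add flow: in cloud mode, work out our role for this document's slice via |
| // setupRequest(), apply versioning locally unless we are only forwarding, then distribute the |
| // command to sub-shard leaders, routing-rule targets and/or replicas as appropriate. |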
| @Override |
| public void processAdd(AddUpdateCommand cmd) throws IOException { |
| |
| assert TestInjection.injectFailUpdateRequests(); |
| |
| updateCommand = cmd; |
| |
| if (zkEnabled) { |
| zkCheck(); |
| nodes = setupRequest(cmd.getHashableId(), cmd.getSolrInputDocument()); |
| } else { |
| isLeader = getNonZkLeaderAssumption(req); |
| } |
| |
| // check if client has requested minimum replication factor information. will set replicationTracker to null if |
| // we aren't the leader or subShardLeader |
| checkReplicationTracker(cmd); |
| |
| // If we were sent a previous version, set this to the AddUpdateCommand (if not already set) |
| if (!cmd.isInPlaceUpdate()) { |
| cmd.prevVersion = cmd.getReq().getParams().getLong(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, -1); |
| } |
| // TODO: if minRf > 1 and we know the leader is the only active replica, we could fail |
| // the request right here but for now I think it is better to just return the status |
| // to the client that the minRf wasn't reached and let them handle it |
| |
| boolean dropCmd = false; |
| if (!forwardToLeader) { |
| dropCmd = versionAdd(cmd); |
| } |
| |
| if (dropCmd) { |
| // TODO: do we need to add anything to the response? |
| return; |
| } |
| |
| if (zkEnabled && isLeader && !isSubShardLeader) { |
| DocCollection coll = zkController.getClusterState().getCollection(collection); |
| List<Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getHashableId(), cmd.getSolrInputDocument()); |
| // the list<node> will actually have only one element for an add request |
| if (subShardLeaders != null && !subShardLeaders.isEmpty()) { |
| ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); |
| params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); |
| params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( |
| zkController.getBaseUrl(), req.getCore().getName())); |
| params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId()); |
| cmdDistrib.distribAdd(cmd, subShardLeaders, params, true); |
| } |
| final List<Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getHashableId(), cmd.getSolrInputDocument()); |
| if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) { |
| ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); |
| params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); |
| params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( |
| zkController.getBaseUrl(), req.getCore().getName())); |
| params.set(DISTRIB_FROM_COLLECTION, collection); |
| params.set(DISTRIB_FROM_SHARD, cloudDesc.getShardId()); |
| cmdDistrib.distribAdd(cmd, nodesByRoutingRules, params, true); |
| } |
| } |
| |
| if (nodes != null) { |
| ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); |
| params.set(DISTRIB_UPDATE_PARAM, |
| (isLeader || isSubShardLeader ? |
| DistribPhase.FROMLEADER.toString() : |
| DistribPhase.TOLEADER.toString())); |
| params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( |
| zkController.getBaseUrl(), req.getCore().getName())); |
| |
| if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) { |
| // TODO: Kept for rolling upgrades only. Should be removed in Solr 9 |
| params.set(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT)); |
| } |
| |
| if (cmd.isInPlaceUpdate()) { |
| params.set(DISTRIB_INPLACE_PREVVERSION, String.valueOf(cmd.prevVersion)); |
| |
| // Use synchronous=true so that a new connection is used, instead |
| // of the update being streamed through an existing streaming client. |
| // When using a streaming client, the previous update |
| // and the current in-place update (that depends on the previous update), if reordered |
| // in the stream, can result in the current update being bottled up behind the previous |
| // update in the stream and can lead to degraded performance. |
| cmdDistrib.distribAdd(cmd, nodes, params, true, rollupReplicationTracker, leaderReplicationTracker); |
| } else { |
| cmdDistrib.distribAdd(cmd, nodes, params, false, rollupReplicationTracker, leaderReplicationTracker); |
| } |
| } |
| |
| // TODO: what to do when no idField? |
| if (returnVersions && rsp != null && idField != null) { |
| if (addsResponse == null) { |
| addsResponse = new NamedList<>(1); |
| rsp.add("adds",addsResponse); |
| } |
| if (scratch == null) scratch = new CharsRefBuilder(); |
| idField.getType().indexedToReadable(cmd.getIndexedId(), scratch); |
| addsResponse.add(scratch.toString(), cmd.getVersion()); |
| } |
| |
| // TODO: keep track of errors? needs to be done at a higher level though since |
| // an id may fail before it gets to this processor. |
| // Given that, it may also make sense to move the version reporting out of this |
| // processor too. |
| |
| } |
| |
| // helper method, processAdd was getting a bit large. |
| // Sets replicationTracker = null if we aren't the leader |
| // We have two possibilities here: |
| // |
| // 1> we are a leader: allocate a LeaderTracker and, if we're receiving the original request, a RollupTracker |
| // 2> we're a follower: allocate a RollupTracker |
| // |
| private void checkReplicationTracker(UpdateCommand cmd) { |
| if (zkEnabled == false) { |
| rollupReplicationTracker = null; // never need one of these in stand-alone |
| leaderReplicationTracker = null; |
| return; |
| } |
| |
| SolrParams rp = cmd.getReq().getParams(); |
| String distribUpdate = rp.get(DISTRIB_UPDATE_PARAM); |
| // OK, we're receiving the original request, so we need a rollup tracker, but only one so we accumulate over the |
| // course of a batch. |
| if ((distribUpdate == null || DistribPhase.NONE.toString().equals(distribUpdate)) && |
| rollupReplicationTracker == null) { |
| rollupReplicationTracker = new RollupRequestReplicationTracker(); |
| } |
| // If we're a leader, we need a leader replication tracker, so let's do that. If there are multiple docs in |
| // a batch we need to use the _same_ leader replication tracker. |
| if (isLeader && leaderReplicationTracker == null) { |
| leaderReplicationTracker = new LeaderRequestReplicationTracker( |
| req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId()); |
| } |
| } |
| |
| |
| @Override |
| protected void doClose() { |
| if (cmdDistrib != null) { |
| cmdDistrib.close(); |
| } |
| } |
| |
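| // Waits for all distributed requests to complete and then inspects the errors: failed forwards |
| // are reported back to the client, while failed replica updates cause the leader to bump shard |
| // terms so those replicas will recover; the achieved replication factor is reported via |
| // handleReplicationFactor(). |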
| // TODO: optionally fail if n replicas are not reached... |
| private void doFinish() { |
| boolean shouldUpdateTerms = isLeader && isIndexChanged; |
| if (shouldUpdateTerms) { |
| ZkShardTerms zkShardTerms = zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId()); |
| if (skippedCoreNodeNames != null) { |
| zkShardTerms.ensureTermsIsHigher(cloudDesc.getCoreNodeName(), skippedCoreNodeNames); |
| } |
| zkController.getShardTerms(collection, cloudDesc.getShardId()).ensureHighestTermsAreNotZero(); |
| } |
| // TODO: if not a forward and replication req is not specified, we could |
| // send in a background thread |
| |
| cmdDistrib.finish(); |
| List<Error> errors = cmdDistrib.getErrors(); |
| // TODO - we may need to tell about more than one error... |
| |
| List<Error> errorsForClient = new ArrayList<>(errors.size()); |
| Set<String> replicasShouldBeInLowerTerms = new HashSet<>(); |
| for (final SolrCmdDistributor.Error error : errors) { |
| |
| if (error.req.node instanceof ForwardNode) { |
| // if it's a forward, any fail is a problem - |
| // otherwise we assume things are fine if we got it locally |
| // until we start allowing min replication param |
| errorsForClient.add(error); |
| continue; |
| } |
| |
| // else... |
| |
| // for now we don't error - we assume if it was added locally, we |
| // succeeded |
| if (log.isWarnEnabled()) { |
| log.warn("Error sending update to " + error.req.node.getBaseUrl(), error.e); |
| } |
| |
| // Since it is not a forward request, for each fail, try to tell them to |
| // recover - the doc was already added locally, so it should have been |
| // legit |
| |
| DistribPhase phase = DistribPhase.parseParam(error.req.uReq.getParams().get(DISTRIB_UPDATE_PARAM)); |
| if (phase != DistribPhase.FROMLEADER) |
| continue; // don't have non-leaders try to recover other nodes |
| |
| // commits are special -- they can run on any node irrespective of whether it is a leader or not |
| // we don't want to run recovery on a node which missed a commit command |
| if (error.req.uReq.getParams().get(COMMIT_END_POINT) != null) |
| continue; |
| |
| final String replicaUrl = error.req.node.getUrl(); |
| |
| // if the remote replica failed the request because of leader change (SOLR-6511), then fail the request |
| String cause = (error.e instanceof SolrException) ? ((SolrException)error.e).getMetadata("cause") : null; |
| if ("LeaderChanged".equals(cause)) { |
| // let's just fail this request and let the client retry? or just call processAdd again? |
| log.error("On "+cloudDesc.getCoreNodeName()+", replica "+replicaUrl+ |
| " now thinks it is the leader! Failing the request to let the client retry! "+error.e); |
| errorsForClient.add(error); |
| continue; |
| } |
| |
| String collection = null; |
| String shardId = null; |
| |
| if (error.req.node instanceof StdNode) { |
| StdNode stdNode = (StdNode)error.req.node; |
| collection = stdNode.getCollection(); |
| shardId = stdNode.getShardId(); |
| |
| // before we go setting other replicas to down, make sure we're still the leader! |
| String leaderCoreNodeName = null; |
| Exception getLeaderExc = null; |
| Replica leaderProps = null; |
| try { |
| leaderProps = zkController.getZkStateReader().getLeader(collection, shardId); |
| if (leaderProps != null) { |
| leaderCoreNodeName = leaderProps.getName(); |
| } |
| } catch (Exception exc) { |
| getLeaderExc = exc; |
| } |
| if (leaderCoreNodeName == null) { |
| log.warn("Failed to determine if {} is still the leader for collection={} shardId={} " + |
| "before putting {} into leader-initiated recovery", |
| cloudDesc.getCoreNodeName(), collection, shardId, replicaUrl, getLeaderExc); |
| } |
| |
| List<ZkCoreNodeProps> myReplicas = zkController.getZkStateReader().getReplicaProps(collection, |
| cloudDesc.getShardId(), cloudDesc.getCoreNodeName()); |
| boolean foundErrorNodeInReplicaList = false; |
| if (myReplicas != null) { |
| for (ZkCoreNodeProps replicaProp : myReplicas) { |
| if (((Replica) replicaProp.getNodeProps()).getName().equals(((Replica)stdNode.getNodeProps().getNodeProps()).getName())) { |
| foundErrorNodeInReplicaList = true; |
| break; |
| } |
| } |
| } |
| |
| if (leaderCoreNodeName != null && cloudDesc.getCoreNodeName().equals(leaderCoreNodeName) // we are still same leader |
| && foundErrorNodeInReplicaList // we found an error for one of replicas |
| && !stdNode.getNodeProps().getCoreUrl().equals(leaderProps.getCoreUrl())) { // we do not want to put ourself into LIR |
| try { |
| String coreNodeName = ((Replica) stdNode.getNodeProps().getNodeProps()).getName(); |
| // the doc was accepted locally, so mark this replica's term as lower than the leader's |
| // so that it will be asked to recover |
| Throwable rootCause = SolrException.getRootCause(error.e); |
| log.error("Setting up to try to start recovery on replica {} with url {} by increasing leader term", coreNodeName, replicaUrl, rootCause); |
| replicasShouldBeInLowerTerms.add(coreNodeName); |
| } catch (Exception exc) { |
| Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc); |
| log.error("Leader failed to set replica " + |
| error.req.node.getUrl() + " state to DOWN due to: " + setLirZnodeFailedCause, setLirZnodeFailedCause); |
| } |
| } else { |
| // maybe we're not the leader anymore, or the errored node is not one of my replicas? |
| if (!foundErrorNodeInReplicaList) { |
| log.warn("Core "+cloudDesc.getCoreNodeName()+" belonging to "+collection+" "+ |
| cloudDesc.getShardId()+", does not have error'd node " + stdNode.getNodeProps().getCoreUrl() + " as a replica. " + |
| "No request recovery command will be sent!"); |
| if (!shardId.equals(cloudDesc.getShardId())) { |
| // some replicas on other shard did not receive the updates (ex: during splitshard), |
| // exception must be notified to clients |
| errorsForClient.add(error); |
| } |
| } else { |
| log.warn("Core " + cloudDesc.getCoreNodeName() + " is no longer the leader for " + collection + " " |
| + shardId + " or we tried to put ourself into LIR, no request recovery command will be sent!"); |
| } |
| } |
| } |
| } |
| if (!replicasShouldBeInLowerTerms.isEmpty()) { |
| zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId()) |
| .ensureTermsIsHigher(cloudDesc.getCoreNodeName(), replicasShouldBeInLowerTerms); |
| } |
| handleReplicationFactor(); |
| if (0 < errorsForClient.size()) { |
| throw new DistributedUpdatesAsyncException(errorsForClient); |
| } |
| } |
| |
| /** |
| * If necessary, include in the response the achieved replication factor |
| */ |
| @SuppressWarnings("deprecation") |
| private void handleReplicationFactor() { |
| if (leaderReplicationTracker != null || rollupReplicationTracker != null) { |
| int achievedRf = Integer.MAX_VALUE; |
| |
| if (leaderReplicationTracker != null) { |
| |
| achievedRf = leaderReplicationTracker.getAchievedRf(); |
| |
| // Transfer this to the rollup tracker if it exists |
| if (rollupReplicationTracker != null) { |
| rollupReplicationTracker.testAndSetAchievedRf(achievedRf); |
| } |
| } |
| |
| // Rollup tracker has accumulated stats. |
| if (rollupReplicationTracker != null) { |
| achievedRf = rollupReplicationTracker.getAchievedRf(); |
| } |
| if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) { |
| // Unused, but kept for back compatibility. To be removed in Solr 9 |
| rsp.getResponseHeader().add(UpdateRequest.MIN_REPFACT, Integer.parseInt(req.getParams().get(UpdateRequest.MIN_REPFACT))); |
| } |
| rsp.getResponseHeader().add(UpdateRequest.REPFACT, achievedRf); |
| rollupReplicationTracker = null; |
| leaderReplicationTracker = null; |
| |
| } |
| } |
| |
| // must be synchronized by bucket |
| private void doLocalAdd(AddUpdateCommand cmd) throws IOException { |
| super.processAdd(cmd); |
| isIndexChanged = true; |
| } |
| |
| // must be synchronized by bucket |
| private void doLocalDelete(DeleteUpdateCommand cmd) throws IOException { |
| super.processDelete(cmd); |
| isIndexChanged = true; |
| } |
| |
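| // Computes the hash used to select a VersionBucket. It is based only on the raw uniqueKey bytes |
| // (murmurhash3) and is intentionally independent of the collection's document router. |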
| public static int bucketHash(BytesRef idBytes) { |
| assert idBytes != null; |
| return Hash.murmurhash3_x86_32(idBytes.bytes, idBytes.offset, idBytes.length, 0); |
| } |
| |
| /** |
| * @return whether or not to drop this cmd |
| * @throws IOException If there is a low-level I/O error. |
| */ |
| protected boolean versionAdd(AddUpdateCommand cmd) throws IOException { |
| BytesRef idBytes = cmd.getIndexedId(); |
| |
| if (idBytes == null) { |
| super.processAdd(cmd); |
| return false; |
| } |
| |
| if (vinfo == null) { |
| if (AtomicUpdateDocumentMerger.isAtomicUpdate(cmd)) { |
| throw new SolrException |
| (SolrException.ErrorCode.BAD_REQUEST, |
| "Atomic document updates are not supported unless <updateLog/> is configured"); |
| } else { |
| super.processAdd(cmd); |
| return false; |
| } |
| } |
| |
| // This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash here) |
| int bucketHash = bucketHash(idBytes); |
| |
| // at this point, there is an update we need to try and apply. |
| // we may or may not be the leader. |
| |
| // Find any existing version in the document |
| // TODO: don't reuse update commands any more! |
| long versionOnUpdate = cmd.getVersion(); |
| |
| if (versionOnUpdate == 0) { |
| SolrInputField versionField = cmd.getSolrInputDocument().getField(CommonParams.VERSION_FIELD); |
| if (versionField != null) { |
| Object o = versionField.getValue(); |
| versionOnUpdate = o instanceof Number ? ((Number) o).longValue() : Long.parseLong(o.toString()); |
| } else { |
| // Find the version |
| String versionOnUpdateS = req.getParams().get(CommonParams.VERSION_FIELD); |
| versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS); |
| } |
| } |
| |
| boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0; |
| boolean leaderLogic = isLeader && !isReplayOrPeersync; |
| boolean forwardedFromCollection = cmd.getReq().getParams().get(DISTRIB_FROM_COLLECTION) != null; |
| |
| VersionBucket bucket = vinfo.bucket(bucketHash); |
| |
| long dependentVersionFound = -1; |
| // if this is an in-place update, check and wait if we should be waiting for a previous update (on which |
| // this update depends), before entering the synchronized block |
| if (!leaderLogic && cmd.isInPlaceUpdate()) { |
| dependentVersionFound = waitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync, bucket); |
| if (dependentVersionFound == -1) { |
| // it means the document has been deleted by now at the leader. drop this update |
| return true; |
| } |
| } |
| |
| vinfo.lockForUpdate(); |
| try { |
| synchronized (bucket) { |
| bucket.notifyAll(); // just in case anyone is waiting, let them know that we have a new update |
| // we obtain the version when synchronized and then do the add so we can ensure that |
| // if version1 < version2 then version1 is actually added before version2. |
| |
| // even if we don't store the version field, synchronizing on the bucket |
| // will enable us to know what version happened first, and thus enable |
| // realtime-get to work reliably. |
| // TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason? |
| // there may be other reasons in the future for a version on the commands |
| |
| if (versionsStored) { |
| |
| long bucketVersion = bucket.highest; |
| |
| if (leaderLogic) { |
| |
| if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) { |
| // forwarded from a collection but we are not buffering so strip original version and apply our own |
| // see SOLR-5308 |
| log.info("Removing version field from doc: " + cmd.getPrintableId()); |
| cmd.solrDoc.remove(CommonParams.VERSION_FIELD); |
| versionOnUpdate = 0; |
| } |
| |
| boolean updated = getUpdatedDocument(cmd, versionOnUpdate); |
| |
| // leaders can also be in buffering state during "migrate" API call, see SOLR-5308 |
| if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE |
| && isReplayOrPeersync == false) { |
| // we're not in an active state, and this update isn't from a replay, so buffer it. |
| log.info("Leader logic applied but update log is buffering: " + cmd.getPrintableId()); |
| cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); |
| ulog.add(cmd); |
| return true; |
| } |
| |
| if (versionOnUpdate != 0) { |
| Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); |
| long foundVersion = lastVersion == null ? -1 : lastVersion; |
| if ( versionOnUpdate == foundVersion || (versionOnUpdate < 0 && foundVersion < 0) || (versionOnUpdate==1 && foundVersion > 0) ) { |
| // we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd |
| // specified it must exist (versionOnUpdate==1) and it does. |
| } else { |
| throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId() + " expected=" + versionOnUpdate + " actual=" + foundVersion); |
| } |
| } |
| |
| |
| long version = vinfo.getNewClock(); |
| cmd.setVersion(version); |
| cmd.getSolrInputDocument().setField(CommonParams.VERSION_FIELD, version); |
| bucket.updateHighest(version); |
| } else { |
| // The leader forwarded us this update. |
| cmd.setVersion(versionOnUpdate); |
| |
| if (shouldBufferUpdate(cmd, isReplayOrPeersync, ulog.getState())) { |
| // we're not in an active state, and this update isn't from a replay, so buffer it. |
| cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); |
| ulog.add(cmd); |
| return true; |
| } |
| |
| if (cmd.isInPlaceUpdate()) { |
| long prev = cmd.prevVersion; |
| Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); |
| if (lastVersion == null || Math.abs(lastVersion) < prev) { |
| // this was checked for (in waitForDependentUpdates()) before entering the synchronized block. |
| // So we shouldn't be here unless, by the time the synchronized block was entered, the prev |
| // update had been deleted by a DBQ. Since that update is no longer in the index, |
| // vinfo.lookupVersion() may be giving us a version from the deleted list |
| // (which might be older than the prev update!) |
| UpdateCommand fetchedFromLeader = fetchFullUpdateFromLeader(cmd, versionOnUpdate); |
| |
| if (fetchedFromLeader instanceof DeleteUpdateCommand) { |
| log.info("In-place update of {} failed to find valid lastVersion to apply to, and the document" |
| + " was deleted at the leader subsequently.", idBytes.utf8ToString()); |
| versionDelete((DeleteUpdateCommand)fetchedFromLeader); |
| return true; |
| } else { |
| assert fetchedFromLeader instanceof AddUpdateCommand; |
| // Newer document was fetched from the leader. Apply that document instead of this current in-place update. |
| log.info("In-place update of {} failed to find valid lastVersion to apply to, forced to fetch full doc from leader: {}", |
| idBytes.utf8ToString(), fetchedFromLeader); |
| |
| // Make this update to become a non-inplace update containing the full document obtained from the leader |
| cmd.solrDoc = ((AddUpdateCommand)fetchedFromLeader).solrDoc; |
| cmd.prevVersion = -1; |
| cmd.setVersion((long)cmd.solrDoc.getFieldValue(CommonParams.VERSION_FIELD)); |
| assert cmd.isInPlaceUpdate() == false; |
| } |
| } else { |
| if (lastVersion != null && Math.abs(lastVersion) > prev) { |
| // this means we got a newer full doc update and in that case it makes no sense to apply the older |
| // inplace update. Drop this update |
| log.info("Update was applied on version: " + prev + ", but last version I have is: " + lastVersion |
| + ". Dropping current update."); |
| return true; |
| } else { |
| // We're good, we should apply this update. First, update the bucket's highest. |
| if (bucketVersion != 0 && bucketVersion < versionOnUpdate) { |
| bucket.updateHighest(versionOnUpdate); |
| } |
| } |
| } |
| } else { |
| // if we aren't the leader, then we need to check that updates were not re-ordered |
| if (bucketVersion != 0 && bucketVersion < versionOnUpdate) { |
| // we're OK... this update has a version higher than anything we've seen |
| // in this bucket so far, so we know that no reordering has yet occurred. |
| bucket.updateHighest(versionOnUpdate); |
| } else { |
| // there have been updates higher than the current update. we need to check |
| // the specific version for this id. |
| Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); |
| if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) { |
| // This update is a repeat, or was reordered. We need to drop this update. |
| log.debug("Dropping add update due to version {}", idBytes.utf8ToString()); |
| return true; |
| } |
| } |
| } |
| if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { |
| cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER); |
| } |
| } |
| } |
| |
| boolean willDistrib = isLeader && nodes != null && nodes.size() > 0; |
| |
| SolrInputDocument clonedDoc = null; |
| if (willDistrib && cloneRequiredOnLeader) { |
| clonedDoc = cmd.solrDoc.deepCopy(); |
| } |
| |
| // TODO: possibly set checkDeleteByQueries as a flag on the command? |
| doLocalAdd(cmd); |
| |
| if (willDistrib && cloneRequiredOnLeader) { |
| cmd.solrDoc = clonedDoc; |
| } |
| |
| } // end synchronized (bucket) |
| } finally { |
| vinfo.unlockForUpdate(); |
| } |
| return false; |
| } |
| |
| @VisibleForTesting |
| boolean shouldBufferUpdate(AddUpdateCommand cmd, boolean isReplayOrPeersync, UpdateLog.State state) { |
| if (state == UpdateLog.State.APPLYING_BUFFERED |
| && !isReplayOrPeersync |
| && !cmd.isInPlaceUpdate()) { |
| // this is a new update sent from the leader; it contains the whole document, so it won't depend on other updates |
| return false; |
| } |
| |
| return state != UpdateLog.State.ACTIVE && isReplayOrPeersync == false; |
| } |
| |
| /** |
| * This method checks the update/transaction logs and index to find out if the update ("previous update") that the current update |
| * depends on (in the case that this current update is an in-place update) has already been completed. If not, |
| * this method will wait for the missing update until it has arrived. If it doesn't arrive within a timeout threshold, |
| * then this actively fetches from the leader. |
| * |
| * @return -1 if the current in-place update should be dropped, or the last found version if the previous update has been indexed. |
| */ |
| private long waitForDependentUpdates(AddUpdateCommand cmd, long versionOnUpdate, |
| boolean isReplayOrPeersync, VersionBucket bucket) throws IOException { |
| long lastFoundVersion = 0; |
| TimeOut waitTimeout = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME); |
| |
| vinfo.lockForUpdate(); |
| try { |
| synchronized (bucket) { |
| Long lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId()); |
| lastFoundVersion = lookedUpVersion == null ? 0L: lookedUpVersion; |
| |
| if (Math.abs(lastFoundVersion) < cmd.prevVersion) { |
| log.debug("Re-ordered inplace update. version={}, prevVersion={}, lastVersion={}, replayOrPeerSync={}, id={}", |
| (cmd.getVersion() == 0 ? versionOnUpdate : cmd.getVersion()), cmd.prevVersion, lastFoundVersion, isReplayOrPeersync, cmd.getPrintableId()); |
| } |
| |
| while (Math.abs(lastFoundVersion) < cmd.prevVersion && !waitTimeout.hasTimedOut()) { |
| try { |
| long timeLeft = waitTimeout.timeLeft(TimeUnit.MILLISECONDS); |
| if (timeLeft > 0) { // wait(0) waits forever until notified, but we don't want that. |
| bucket.wait(timeLeft); |
| } |
| } catch (InterruptedException ie) { |
| throw new RuntimeException(ie); |
| } |
| lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId()); |
| lastFoundVersion = lookedUpVersion == null ? 0L: lookedUpVersion; |
| } |
| } |
| } finally { |
| vinfo.unlockForUpdate(); |
| } |
| |
| if (Math.abs(lastFoundVersion) > cmd.prevVersion) { |
| // This must've been the case due to a higher version full update succeeding concurrently, while we were waiting or |
| // trying to index this partial update. Since a full update more recent than this partial update has succeeded, |
| // we can drop the current update. |
| if (log.isDebugEnabled()) { |
| log.debug("Update was applied on version: {}, but last version I have is: {}" |
| + ". Current update should be dropped. id={}", cmd.prevVersion, lastFoundVersion, cmd.getPrintableId()); |
| } |
| return -1; |
| } else if (Math.abs(lastFoundVersion) == cmd.prevVersion) { |
| assert 0 < lastFoundVersion : "prevVersion " + cmd.prevVersion + " found but is a delete!"; |
| if (log.isDebugEnabled()) { |
| log.debug("Dependent update found. id={}", cmd.getPrintableId()); |
| } |
| return lastFoundVersion; |
| } |
| |
| // We have waited long enough, but the dependent update didn't arrive. It's time to actively fetch it from the leader. |
| log.info("Missing update, on which the current in-place update depends, hasn't arrived. id={}, looking for version={}, last found version={}", |
| cmd.getPrintableId(), cmd.prevVersion, lastFoundVersion); |
| |
| UpdateCommand missingUpdate = fetchFullUpdateFromLeader(cmd, versionOnUpdate); |
| if (missingUpdate instanceof DeleteUpdateCommand) { |
| log.info("Tried to fetch document {} from the leader, but the leader says document has been deleted. " |
| + "Deleting the document here and skipping this update: Last found version: {}, was looking for: {}", cmd.getPrintableId(), lastFoundVersion, cmd.prevVersion); |
| versionDelete((DeleteUpdateCommand)missingUpdate); |
| return -1; |
| } else { |
| assert missingUpdate instanceof AddUpdateCommand; |
| log.debug("Fetched the document: {}", ((AddUpdateCommand)missingUpdate).getSolrInputDocument()); |
| versionAdd((AddUpdateCommand)missingUpdate); |
| log.info("Added the fetched document, id="+((AddUpdateCommand)missingUpdate).getPrintableId()+", version="+missingUpdate.getVersion()); |
| } |
| return missingUpdate.getVersion(); |
| } |
| |
| /** |
| * This method is used when an update on which a particular in-place update depends has been lost for some reason. It |
| * sends a request to the shard leader to fetch the latest full document as seen on the leader. |
| * @return AddUpdateCommand containing latest full doc at shard leader for the given id, or null if not found. |
| */ |
| private UpdateCommand fetchFullUpdateFromLeader(AddUpdateCommand inplaceAdd, long versionOnUpdate) throws IOException { |
| String id = inplaceAdd.getPrintableId(); |
| UpdateShardHandler updateShardHandler = inplaceAdd.getReq().getCore().getCoreContainer().getUpdateShardHandler(); |
| ModifiableSolrParams params = new ModifiableSolrParams(); |
| params.set(DISTRIB, false); |
| params.set("getInputDocument", id); |
| params.set("onlyIfActive", true); |
| SolrRequest<SimpleSolrResponse> ur = new GenericSolrRequest(METHOD.GET, "/get", params); |
| |
| String leaderUrl = req.getParams().get(DISTRIB_FROM); |
| |
| if (leaderUrl == null) { |
| // An update we're dependent upon didn't arrive! This is unexpected. Most likely our leader is |
| // down or partitioned from us for some reason. Let's force a refresh of the cluster state, and request the |
| // leader for the update. |
| if (zkController == null) { // we should be in cloud mode, but wtf? could be a unit test |
| throw new SolrException(ErrorCode.SERVER_ERROR, "Can't find document with id=" + id + ", but fetching from leader " |
| + "failed since we're not in cloud mode."); |
| } |
| Replica leader; |
| try { |
| leader = zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId()); |
| } catch (InterruptedException e) { |
| throw new SolrException(ErrorCode.SERVER_ERROR, "Exception during fetching from leader.", e); |
| } |
| leaderUrl = leader.getCoreUrl(); |
| } |
| |
| NamedList<Object> rsp = null; |
| try (HttpSolrClient hsc = new HttpSolrClient.Builder(leaderUrl). |
| withHttpClient(updateShardHandler.getUpdateOnlyHttpClient()).build()) { |
| rsp = hsc.request(ur); |
| } catch (SolrServerException e) { |
| throw new SolrException(ErrorCode.SERVER_ERROR, "Error during fetching [" + id + |
| "] from leader (" + leaderUrl + "): ", e); |
| } |
| Object inputDocObj = rsp.get("inputDocument"); |
| Long version = (Long)rsp.get("version"); |
| SolrInputDocument leaderDoc = (SolrInputDocument) inputDocObj; |
| |
| if (leaderDoc == null) { |
      // this doc was not found (deleted) on the leader. Let's delete it here as well.
| DeleteUpdateCommand del = new DeleteUpdateCommand(inplaceAdd.getReq()); |
| del.setIndexedId(inplaceAdd.getIndexedId()); |
| del.setId(inplaceAdd.getIndexedId().utf8ToString()); |
| del.setVersion((version == null || version == 0)? -versionOnUpdate: version); |
| return del; |
| } |
| |
| AddUpdateCommand cmd = new AddUpdateCommand(req); |
| cmd.solrDoc = leaderDoc; |
| cmd.setVersion((long)leaderDoc.getFieldValue(CommonParams.VERSION_FIELD)); |
| return cmd; |
| } |
| |
| // TODO: may want to switch to using optimistic locking in the future for better concurrency |
| // that's why this code is here... need to retry in a loop closely around/in versionAdd |
| boolean getUpdatedDocument(AddUpdateCommand cmd, long versionOnUpdate) throws IOException { |
| if (!AtomicUpdateDocumentMerger.isAtomicUpdate(cmd)) return false; |
| |
| Set<String> inPlaceUpdatedFields = AtomicUpdateDocumentMerger.computeInPlaceUpdatableFields(cmd); |
    if (!inPlaceUpdatedFields.isEmpty()) { // non-empty means this is suitable for in-place updates
| if (docMerger.doInPlaceUpdateMerge(cmd, inPlaceUpdatedFields)) { |
| return true; |
| } else { |
| // in-place update failed, so fall through and re-try the same with a full atomic update |
| } |
| } |
| |
| // full (non-inplace) atomic update |
| SolrInputDocument sdoc = cmd.getSolrInputDocument(); |
| BytesRef id = cmd.getIndexedId(); |
| SolrInputDocument oldDoc = RealTimeGetComponent.getInputDocument(cmd.getReq().getCore(), id); |
| |
| if (oldDoc == null) { |
| // create a new doc by default if an old one wasn't found |
| if (versionOnUpdate <= 0) { |
| oldDoc = new SolrInputDocument(); |
| } else { |
| // could just let the optimistic locking throw the error |
| throw new SolrException(ErrorCode.CONFLICT, "Document not found for update. id=" + cmd.getPrintableId()); |
| } |
| } else { |
| oldDoc.remove(CommonParams.VERSION_FIELD); |
| } |
| |
| |
| cmd.solrDoc = docMerger.merge(sdoc, oldDoc); |
| return true; |
| } |
| |
| @Override |
| public void processDelete(DeleteUpdateCommand cmd) throws IOException { |
| |
| assert TestInjection.injectFailUpdateRequests(); |
| |
| updateCommand = cmd; |
| |
| if (!cmd.isDeleteById()) { |
| doDeleteByQuery(cmd); |
| } else { |
| doDeleteById(cmd); |
| } |
| } |
| |
| // Implementing min_rf here was a bit tricky. When a request comes in for a delete by id to a replica that does _not_ |
| // have any documents specified by those IDs, the request is not forwarded to any other replicas on that shard. Thus |
| // we have to spoof the replicationTracker and set the achieved rf to the number of active replicas. |
| // |
| private void doDeleteById(DeleteUpdateCommand cmd) throws IOException { |
| if (zkEnabled) { |
| zkCheck(); |
| nodes = setupRequest(cmd.getId(), null, cmd.getRoute()); |
| } else { |
| isLeader = getNonZkLeaderAssumption(req); |
| } |
| |
| // check if client has requested minimum replication factor information. will set replicationTracker to null if |
| // we aren't the leader or subShardLeader |
| checkReplicationTracker(cmd); |
| |
| boolean dropCmd = false; |
| if (!forwardToLeader) { |
| dropCmd = versionDelete(cmd); |
| } |
| |
| if (dropCmd) { |
| // TODO: do we need to add anything to the response? |
| return; |
| } |
| |
| if (zkEnabled && isLeader && !isSubShardLeader) { |
| DocCollection coll = zkController.getClusterState().getCollection(collection); |
| List<Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getId(), null); |
      // the list<Node> will actually have only one element for a delete-by-id request
| if (subShardLeaders != null && !subShardLeaders.isEmpty()) { |
| ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); |
| params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); |
| params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( |
| zkController.getBaseUrl(), req.getCore().getName())); |
| params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId()); |
| cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, null, null); |
| } |
| |
| final List<Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getId(), null); |
| if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) { |
| ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); |
| params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); |
| params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( |
| zkController.getBaseUrl(), req.getCore().getName())); |
| params.set(DISTRIB_FROM_COLLECTION, collection); |
| params.set(DISTRIB_FROM_SHARD, cloudDesc.getShardId()); |
| cmdDistrib.distribDelete(cmd, nodesByRoutingRules, params, true, null, null); |
| } |
| } |
| |
| if (nodes != null) { |
| ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); |
| params.set(DISTRIB_UPDATE_PARAM, |
| (isLeader || isSubShardLeader ? DistribPhase.FROMLEADER.toString() |
| : DistribPhase.TOLEADER.toString())); |
| params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( |
| zkController.getBaseUrl(), req.getCore().getName())); |
| |
| if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) { |
| // TODO: Kept for rolling upgrades only. Remove in Solr 9 |
| params.add(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT)); |
| } |
| cmdDistrib.distribDelete(cmd, nodes, params, false, rollupReplicationTracker, leaderReplicationTracker); |
| } |
| |
    // cmd.getIndexedId() == null when delete by query
| // TODO: what to do when no idField? |
| if (returnVersions && rsp != null && cmd.getIndexedId() != null && idField != null) { |
| if (deleteResponse == null) { |
| deleteResponse = new NamedList<>(1); |
| rsp.add("deletes",deleteResponse); |
| } |
| if (scratch == null) scratch = new CharsRefBuilder(); |
| idField.getType().indexedToReadable(cmd.getIndexedId(), scratch); |
      deleteResponse.add(scratch.toString(), cmd.getVersion()); // we're returning the version of the delete, not the version of the doc we deleted.
| } |
| } |
| |
| /** @see DistributedUpdateProcessorFactory#addParamToDistributedRequestWhitelist */ |
| @SuppressWarnings("unchecked") |
| protected ModifiableSolrParams filterParams(SolrParams params) { |
| ModifiableSolrParams fparams = new ModifiableSolrParams(); |
| |
| Set<String> whitelist = (Set<String>) this.req.getContext().get(PARAM_WHITELIST_CTX_KEY); |
| assert null != whitelist : "whitelist can't be null, constructor adds to it"; |
| |
| for (String p : whitelist) { |
| passParam(params, fparams, p); |
| } |
| return fparams; |
| } |
| |
| private void passParam(SolrParams params, ModifiableSolrParams fparams, String param) { |
| String[] values = params.getParams(param); |
| if (values != null) { |
| for (String value : values) { |
| fparams.add(param, value); |
| } |
| } |
| } |
| |
| public void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException { |
| |
| // even in non zk mode, tests simulate updates from a leader |
    if (!zkEnabled) {
| isLeader = getNonZkLeaderAssumption(req); |
| } else { |
| zkCheck(); |
| } |
| |
    // NONE: we are the first to receive this deleteByQuery
    //       - it must be forwarded to the leader of every shard
    // TOLEADER: we are a leader receiving a forwarded deleteByQuery... we must:
    //       - block all updates (use VersionInfo)
    //       - flush *all* updates going to our replicas
    //       - forward the DBQ to our replicas and wait for the response
    //       - log + execute the local DBQ
    // FROMLEADER: we are a replica receiving a DBQ from our leader
    //       - log + execute the local DBQ
| DistribPhase phase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)); |
| |
| DocCollection coll = zkEnabled |
| ? zkController.getClusterState().getCollection(collection) : null; |
| |
| if (zkEnabled && DistribPhase.NONE == phase) { |
| if (rollupReplicationTracker == null) { |
| rollupReplicationTracker = new RollupRequestReplicationTracker(); |
| } |
| boolean leaderForAnyShard = false; // start off by assuming we are not a leader for any shard |
| |
| ModifiableSolrParams outParams = new ModifiableSolrParams(filterParams(req.getParams())); |
| outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString()); |
| outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( |
| zkController.getBaseUrl(), req.getCore().getName())); |
| |
| SolrParams params = req.getParams(); |
| String route = params.get(ShardParams._ROUTE_); |
| Collection<Slice> slices = coll.getRouter().getSearchSlices(route, params, coll); |
| |
| List<Node> leaders = new ArrayList<>(slices.size()); |
| for (Slice slice : slices) { |
| String sliceName = slice.getName(); |
| Replica leader; |
| try { |
| leader = zkController.getZkStateReader().getLeaderRetry(collection, sliceName); |
| } catch (InterruptedException e) { |
| throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + sliceName, e); |
| } |
| |
| // TODO: What if leaders changed in the meantime? |
| // should we send out slice-at-a-time and if a node returns "hey, I'm not a leader" (or we get an error because it went down) then look up the new leader? |
| |
| // Am I the leader for this slice? |
| ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leader); |
| String leaderCoreNodeName = leader.getName(); |
| String coreNodeName = cloudDesc.getCoreNodeName(); |
| isLeader = coreNodeName.equals(leaderCoreNodeName); |
| |
| if (isLeader) { |
          // don't forward to ourselves
| leaderForAnyShard = true; |
| } else { |
| leaders.add(new ForwardNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName, maxRetriesOnForward)); |
| } |
| } |
| |
| outParams.remove("commit"); // this will be distributed from the local commit |
| |
| |
| if (params.get(UpdateRequest.MIN_REPFACT) != null) { |
| // TODO: Kept this for rolling upgrades. Remove in Solr 9 |
| outParams.add(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT)); |
| } |
| cmdDistrib.distribDelete(cmd, leaders, outParams, false, rollupReplicationTracker, null); |
| |
| if (!leaderForAnyShard) { |
| return; |
| } |
| |
| // change the phase to TOLEADER so we look up and forward to our own replicas (if any) |
| phase = DistribPhase.TOLEADER; |
| } |
| |
| List<Node> replicas = null; |
| |
| if (zkEnabled && DistribPhase.TOLEADER == phase) { |
| // This core should be a leader |
| isLeader = true; |
| replicas = setupRequestForDBQ(); |
| } else if (DistribPhase.FROMLEADER == phase) { |
| isLeader = false; |
| } |
| |
| |
| // check if client has requested minimum replication factor information. will set replicationTracker to null if |
| // we aren't the leader or subShardLeader |
| checkReplicationTracker(cmd); |
| |
| if (vinfo == null) { |
| super.processDelete(cmd); |
| return; |
| } |
| |
| // at this point, there is an update we need to try and apply. |
| // we may or may not be the leader. |
| |
| boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0; |
| boolean leaderLogic = isLeader && !isReplayOrPeersync; |
| versionDeleteByQuery(cmd); |
| if (zkEnabled) { |
| // forward to all replicas |
| ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); |
| params.set(CommonParams.VERSION_FIELD, Long.toString(cmd.getVersion())); |
| params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); |
| params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( |
| zkController.getBaseUrl(), req.getCore().getName())); |
| |
| boolean someReplicas = false; |
| boolean subShardLeader = false; |
| try { |
| subShardLeader = amISubShardLeader(coll, null, null, null); |
| if (subShardLeader) { |
| String myShardId = cloudDesc.getShardId(); |
| Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry( |
| collection, myShardId); |
| // DBQ forwarded to NRT and TLOG replicas |
| List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader() |
| .getReplicaProps(collection, myShardId, leaderReplica.getName(), null, Replica.State.DOWN, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG)); |
| if (replicaProps != null) { |
| final List<Node> myReplicas = new ArrayList<>(replicaProps.size()); |
| for (ZkCoreNodeProps replicaProp : replicaProps) { |
| myReplicas.add(new StdNode(replicaProp, collection, myShardId)); |
| } |
| cmdDistrib.distribDelete(cmd, myReplicas, params, false, rollupReplicationTracker, leaderReplicationTracker); |
| someReplicas = true; |
| } |
| } |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e); |
| } |
| if (leaderLogic) { |
| List<Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), null, null); |
| if (subShardLeaders != null) { |
| cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, rollupReplicationTracker, leaderReplicationTracker); |
| } |
| final List<Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, null, null); |
| if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) { |
| params = new ModifiableSolrParams(filterParams(req.getParams())); |
| params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); |
| params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( |
| zkController.getBaseUrl(), req.getCore().getName())); |
| params.set(DISTRIB_FROM_COLLECTION, collection); |
| params.set(DISTRIB_FROM_SHARD, cloudDesc.getShardId()); |
| |
| cmdDistrib.distribDelete(cmd, nodesByRoutingRules, params, true, rollupReplicationTracker, leaderReplicationTracker); |
| } |
| if (replicas != null) { |
| cmdDistrib.distribDelete(cmd, replicas, params, false, rollupReplicationTracker, leaderReplicationTracker); |
| someReplicas = true; |
| } |
| } |
| |
| if (someReplicas) { |
| cmdDistrib.blockAndDoRetries(); |
| } |
| } |
| |
| |
| if (returnVersions && rsp != null) { |
| if (deleteByQueryResponse == null) { |
| deleteByQueryResponse = new NamedList<>(1); |
| rsp.add("deleteByQuery",deleteByQueryResponse); |
| } |
| deleteByQueryResponse.add(cmd.getQuery(), cmd.getVersion()); |
| } |
| } |
| |
| protected void versionDeleteByQuery(DeleteUpdateCommand cmd) throws IOException { |
| // Find the version |
| long versionOnUpdate = cmd.getVersion(); |
| if (versionOnUpdate == 0) { |
| String versionOnUpdateS = req.getParams().get(CommonParams.VERSION_FIELD); |
| versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS); |
| } |
| versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version |
| |
| boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0; |
| boolean leaderLogic = isLeader && !isReplayOrPeersync; |
| |
| if (!leaderLogic && versionOnUpdate == 0) { |
| throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader"); |
| } |
| |
| vinfo.blockUpdates(); |
| try { |
| |
| if (versionsStored) { |
| if (leaderLogic) { |
| long version = vinfo.getNewClock(); |
| cmd.setVersion(-version); |
| // TODO update versions in all buckets |
| |
| doLocalDelete(cmd); |
| |
| } else { |
| cmd.setVersion(-versionOnUpdate); |
| |
          if (ulog.getState() != UpdateLog.State.ACTIVE && !isReplayOrPeersync) {
| // we're not in an active state, and this update isn't from a replay, so buffer it. |
| cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); |
| ulog.deleteByQuery(cmd); |
| return; |
| } |
| |
| if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { |
| // TLOG replica not leader, don't write the DBQ to IW |
| cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER); |
| } |
| doLocalDelete(cmd); |
| } |
| } |
| |
      // since we don't know which documents were deleted, the easiest thing to do is to invalidate
      // all real-time caches (i.e. UpdateLog), which also involves getting a new version of the IndexReader
      // (so cache misses will see up-to-date data)
| |
| } finally { |
| vinfo.unblockUpdates(); |
| } |
| } |
| |
| // internal helper method to tell if we are the leader for an add or deleteById update |
| // NOTE: not called by this class! |
| boolean isLeader(UpdateCommand cmd) { |
| updateCommand = cmd; |
| |
| if (zkEnabled) { |
| zkCheck(); |
| if (cmd instanceof AddUpdateCommand) { |
| AddUpdateCommand acmd = (AddUpdateCommand)cmd; |
| nodes = setupRequest(acmd.getHashableId(), acmd.getSolrInputDocument()); |
| } else if (cmd instanceof DeleteUpdateCommand) { |
| DeleteUpdateCommand dcmd = (DeleteUpdateCommand)cmd; |
| nodes = setupRequest(dcmd.getId(), null); |
| } |
| } else { |
| isLeader = getNonZkLeaderAssumption(req); |
| } |
| |
| return isLeader; |
| } |
| |
| private void zkCheck() { |
| |
    // Streaming updates can delay shutdown and cause big update reorderings (new streams can't be
    // initiated, but existing streams carry on). This is why we check whether the CoreContainer is shut down.
    // See SOLR-8203; to test, loop HdfsChaosMonkeyNothingIsSafeTest and check for inconsistent shards.
| if (req.getCore().getCoreContainer().isShutDown()) { |
| throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "CoreContainer is shutting down."); |
| } |
| |
| if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) { |
      // for log replay or peer sync, we don't need to be connected to ZK
| return; |
| } |
| |
| if (!zkController.getZkClient().getConnectionManager().isLikelyExpired()) { |
| return; |
| } |
| |
| throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Cannot talk to ZooKeeper - Updates are disabled."); |
| } |
| |
| protected boolean versionDelete(DeleteUpdateCommand cmd) throws IOException { |
| |
| BytesRef idBytes = cmd.getIndexedId(); |
| |
| if (vinfo == null || idBytes == null) { |
| super.processDelete(cmd); |
| return false; |
| } |
| |
| // This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash here) |
| int bucketHash = bucketHash(idBytes); |
| |
| // at this point, there is an update we need to try and apply. |
| // we may or may not be the leader. |
| |
| // Find the version |
| long versionOnUpdate = cmd.getVersion(); |
| if (versionOnUpdate == 0) { |
| String versionOnUpdateS = req.getParams().get(CommonParams.VERSION_FIELD); |
| versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS); |
| } |
| long signedVersionOnUpdate = versionOnUpdate; |
| versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version |
| |
| boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0; |
| boolean leaderLogic = isLeader && !isReplayOrPeersync; |
| boolean forwardedFromCollection = cmd.getReq().getParams().get(DISTRIB_FROM_COLLECTION) != null; |
| |
| if (!leaderLogic && versionOnUpdate==0) { |
| throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader"); |
| } |
| |
| VersionBucket bucket = vinfo.bucket(bucketHash); |
| |
| vinfo.lockForUpdate(); |
| try { |
| |
| synchronized (bucket) { |
| if (versionsStored) { |
| long bucketVersion = bucket.highest; |
| |
| if (leaderLogic) { |
| |
| if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) { |
| // forwarded from a collection but we are not buffering so strip original version and apply our own |
| // see SOLR-5308 |
| log.info("Removing version field from doc: " + cmd.getId()); |
| versionOnUpdate = signedVersionOnUpdate = 0; |
| } |
| |
| // leaders can also be in buffering state during "migrate" API call, see SOLR-5308 |
| if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE |
| && !isReplayOrPeersync) { |
| // we're not in an active state, and this update isn't from a replay, so buffer it. |
| log.info("Leader logic applied but update log is buffering: " + cmd.getId()); |
| cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); |
| ulog.delete(cmd); |
| return true; |
| } |
| |
| if (signedVersionOnUpdate != 0) { |
| Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); |
| long foundVersion = lastVersion == null ? -1 : lastVersion; |
| if ( (signedVersionOnUpdate == foundVersion) || (signedVersionOnUpdate < 0 && foundVersion < 0) || (signedVersionOnUpdate == 1 && foundVersion > 0) ) { |
| // we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd |
| // specified it must exist (versionOnUpdate==1) and it does. |
| } else { |
| throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getId() + " expected=" + signedVersionOnUpdate + " actual=" + foundVersion); |
| } |
| } |
| |
| long version = vinfo.getNewClock(); |
| cmd.setVersion(-version); |
| bucket.updateHighest(version); |
| } else { |
| cmd.setVersion(-versionOnUpdate); |
| |
            if (ulog.getState() != UpdateLog.State.ACTIVE && !isReplayOrPeersync) {
| // we're not in an active state, and this update isn't from a replay, so buffer it. |
| cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); |
| ulog.delete(cmd); |
| return true; |
| } |
| |
| // if we aren't the leader, then we need to check that updates were not re-ordered |
| if (bucketVersion != 0 && bucketVersion < versionOnUpdate) { |
| // we're OK... this update has a version higher than anything we've seen |
              // in this bucket so far, so we know that no reordering has yet occurred.
| bucket.updateHighest(versionOnUpdate); |
| } else { |
| // there have been updates higher than the current update. we need to check |
| // the specific version for this id. |
| Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); |
| if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) { |
| // This update is a repeat, or was reordered. We need to drop this update. |
| log.debug("Dropping delete update due to version {}", idBytes.utf8ToString()); |
| return true; |
| } |
| } |
| |
| if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { |
| cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER); |
| } |
| } |
| } |
| |
| doLocalDelete(cmd); |
| return false; |
| } // end synchronized (bucket) |
| |
| } finally { |
| vinfo.unlockForUpdate(); |
| } |
| } |
| |
| @Override |
| public void processCommit(CommitUpdateCommand cmd) throws IOException { |
| |
| assert TestInjection.injectFailUpdateRequests(); |
| |
| updateCommand = cmd; |
| List<Node> nodes = null; |
| boolean singleLeader = false; |
| if (zkEnabled) { |
| zkCheck(); |
| |
| nodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG,Replica.Type.NRT)); |
| if (nodes == null) { |
| // This could happen if there are only pull replicas |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, |
| "Unable to distribute commit operation. No replicas available of types " + Replica.Type.TLOG + " or " + Replica.Type.NRT); |
| } |
| if (isLeader && nodes.size() == 1 && replicaType != Replica.Type.PULL) { |
| singleLeader = true; |
| } |
| } |
| |
| if (!zkEnabled || req.getParams().getBool(COMMIT_END_POINT, false) || singleLeader) { |
| if (replicaType == Replica.Type.TLOG) { |
| try { |
| Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry( |
| collection, cloudDesc.getShardId()); |
| isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName()); |
| if (isLeader) { |
| long commitVersion = vinfo.getNewClock(); |
| cmd.setVersion(commitVersion); |
| doLocalCommit(cmd); |
| } else { |
| assert TestInjection.waitForInSyncWithLeader(req.getCore(), |
| zkController, collection, cloudDesc.getShardId()): "Core " + req.getCore() + " not in sync with leader"; |
| } |
| } catch (InterruptedException e) { |
| throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + cloudDesc.getShardId(), e); |
| } |
| } else if (replicaType == Replica.Type.PULL) { |
| log.warn("Commit not supported on replicas of type " + Replica.Type.PULL); |
| } else { |
| // NRT replicas will always commit |
| if (vinfo != null) { |
| long commitVersion = vinfo.getNewClock(); |
| cmd.setVersion(commitVersion); |
| } |
| doLocalCommit(cmd); |
| } |
| } else { |
| ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); |
| if (!req.getParams().getBool(COMMIT_END_POINT, false)) { |
| params.set(COMMIT_END_POINT, true); |
| params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); |
| params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( |
| zkController.getBaseUrl(), req.getCore().getName())); |
| if (nodes != null) { |
| cmdDistrib.distribCommit(cmd, nodes, params); |
| cmdDistrib.blockAndDoRetries(); |
| } |
| } |
| } |
| } |
| |
| private void doLocalCommit(CommitUpdateCommand cmd) throws IOException { |
| if (vinfo != null) { |
| vinfo.lockForUpdate(); |
| } |
| try { |
| |
| if (ulog == null || ulog.getState() == UpdateLog.State.ACTIVE || (cmd.getFlags() & UpdateCommand.REPLAY) != 0) { |
| super.processCommit(cmd); |
| } else { |
| log.info("Ignoring commit while not ACTIVE - state: " + ulog.getState() + " replay: " + ((cmd.getFlags() & UpdateCommand.REPLAY) != 0)); |
| } |
| |
| } finally { |
| if (vinfo != null) { |
| vinfo.unlockForUpdate(); |
| } |
| } |
| } |
| |
| @Override |
| public void finish() throws IOException { |
| assert ! finished : "lifecycle sanity check"; |
| finished = true; |
| |
| if (zkEnabled) doFinish(); |
| |
| if (next != null && nodes == null) next.finish(); |
| } |
| |
| private List<Node> getCollectionUrls(String collection, EnumSet<Replica.Type> types) { |
| ClusterState clusterState = zkController.getClusterState(); |
| final DocCollection docCollection = clusterState.getCollectionOrNull(collection); |
    if (docCollection == null || docCollection.getSlicesMap() == null) {
| throw new ZooKeeperException(ErrorCode.BAD_REQUEST, |
| "Could not find collection in zk: " + clusterState); |
| } |
| Map<String,Slice> slices = docCollection.getSlicesMap(); |
| final List<Node> urls = new ArrayList<>(slices.size()); |
| for (Map.Entry<String,Slice> sliceEntry : slices.entrySet()) { |
      Slice replicas = sliceEntry.getValue();
| |
| Map<String,Replica> shardMap = replicas.getReplicasMap(); |
| |
| for (Entry<String,Replica> entry : shardMap.entrySet()) { |
| if (!types.contains(entry.getValue().getType())) { |
| continue; |
| } |
| ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue()); |
| if (clusterState.liveNodesContain(nodeProps.getNodeName())) { |
| urls.add(new StdNode(nodeProps, collection, replicas.getName())); |
| } |
| } |
| } |
| if (urls.isEmpty()) { |
| return null; |
| } |
| return urls; |
| } |
| |
| /** |
| * Returns a boolean indicating whether or not the caller should behave as |
| * if this is the "leader" even when ZooKeeper is not enabled. |
| * (Even in non zk mode, tests may simulate updates to/from a leader) |
| */ |
| public static boolean getNonZkLeaderAssumption(SolrQueryRequest req) { |
| DistribPhase phase = |
| DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)); |
| |
| // if we have been told we are coming from a leader, then we are |
| // definitely not the leader. Otherwise assume we are. |
| return DistribPhase.FROMLEADER != phase; |
| } |
| |
| public static final class DistributedUpdatesAsyncException extends SolrException { |
| public final List<Error> errors; |
| public DistributedUpdatesAsyncException(List<Error> errors) { |
| super(buildCode(errors), buildMsg(errors), null); |
| this.errors = errors; |
| |
| // create a merged copy of the metadata from all wrapped exceptions |
| NamedList<String> metadata = new NamedList<String>(); |
| for (Error error : errors) { |
| if (error.e instanceof SolrException) { |
| SolrException e = (SolrException) error.e; |
| NamedList<String> eMeta = e.getMetadata(); |
| if (null != eMeta) { |
| metadata.addAll(eMeta); |
| } |
| } |
| } |
| if (0 < metadata.size()) { |
| this.setMetadata(metadata); |
| } |
| } |
| |
| /** Helper method for constructor */ |
| private static int buildCode(List<Error> errors) { |
| assert null != errors; |
| assert 0 < errors.size(); |
| |
| int minCode = Integer.MAX_VALUE; |
| int maxCode = Integer.MIN_VALUE; |
| for (Error error : errors) { |
| log.trace("REMOTE ERROR: {}", error); |
| minCode = Math.min(error.statusCode, minCode); |
| maxCode = Math.max(error.statusCode, maxCode); |
| } |
| if (minCode == maxCode) { |
| // all codes are consistent, use that... |
| return minCode; |
| } else if (400 <= minCode && maxCode < 500) { |
| // all codes are 4xx, use 400 |
| return ErrorCode.BAD_REQUEST.code; |
| } |
| // ...otherwise use sensible default |
| return ErrorCode.SERVER_ERROR.code; |
| } |
| |
| /** Helper method for constructor */ |
| private static String buildMsg(List<Error> errors) { |
| assert null != errors; |
| assert 0 < errors.size(); |
| |
| if (1 == errors.size()) { |
| return "Async exception during distributed update: " + errors.get(0).e.getMessage(); |
| } else { |
| StringBuilder buf = new StringBuilder(errors.size() + " Async exceptions during distributed update: "); |
| for (Error error : errors) { |
| buf.append("\n"); |
| buf.append(error.e.getMessage()); |
| } |
| return buf.toString(); |
| } |
| } |
| } |
| |
| |
| // Keeps track of the replication factor achieved for a distributed update request |
  // that originated in this distributed update processor. A RollupRequestReplicationTracker is the only tracker that will
| // persist across sub-requests. |
| // |
| // Note that the replica that receives the original request has the only RollupReplicationTracker that exists for the |
| // lifetime of the batch. The leader for each shard keeps track of its own achieved replication for its shard |
| // and attaches that to the response to the originating node (i.e. the one with the RollupReplicationTracker). |
  // Followers in general do not need a tracker of any sort with the sole exception of the RollupRequestReplicationTracker
| // allocated on the original node that receives the top-level request. |
| // |
| // DeleteById is tricky. Since the docs are sent one at a time, there has to be some fancy dancing. In the |
| // deleteById case, here are the rules: |
| // |
| // If I'm leader, there are two possibilities: |
| // 1> I got the original request. This is the hard one. There are two sub-cases: |
| // a> Some document in the request is deleted from the shard I lead. In this case my computed replication |
| // factor counts. |
| // b> No document in the packet is deleted from my shard. In that case I have nothing to say about the |
| // achieved replication factor. |
| // |
| // 2> I'm a leader and I got the request from some other replica. In this case I can be certain of a couple of things: |
| // a> The document in the request will be deleted from my shard |
| // b> my replication factor counts. |
| // |
  // Even the DeleteById case follows the rules for whether a RollupRequestReplicationTracker is allocated.
| // This doesn't matter when it comes to delete-by-query since all leaders get the sub-request. |
| |
| |
| public static class RollupRequestReplicationTracker { |
| |
| private int achievedRf = Integer.MAX_VALUE; |
| |
| public int getAchievedRf() { |
| return achievedRf; |
| } |
| |
    // We want to report only the minimum _ever_ achieved...
| public void testAndSetAchievedRf(int rf) { |
| this.achievedRf = Math.min(this.achievedRf, rf); |
| } |
| |
| public String toString() { |
| StringBuilder sb = new StringBuilder("RollupRequestReplicationTracker") |
| .append(" achievedRf: ") |
| .append(achievedRf); |
| return sb.toString(); |
| } |
| } |
| |
| |
  // Allocate a LeaderRequestReplicationTracker if (and only if) we're a leader. If the request comes in to the leader
  // at first, allocate one of these as well as a RollupRequestReplicationTracker.
  //
  // Since these are leader-only, all they really have to do is track the individual update request for this shard
  // and return it to be added to the rollup tracker. This is fairly simple since SolrCmdDistributor gives us an
  // onSuccess callback.
| |
| public static class LeaderRequestReplicationTracker { |
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| |
| // Since we only allocate one of these on the leader and, by definition, the leader has been found and is running, |
| // we have a replication factor of one by default. |
| private int achievedRf = 1; |
| |
| private final String myShardId; |
| |
| public LeaderRequestReplicationTracker(String shardId) { |
| this.myShardId = shardId; |
| } |
| |
| // gives the replication factor that was achieved for this request |
| public int getAchievedRf() { |
| return achievedRf; |
| } |
| |
| public void trackRequestResult(Node node, boolean success) { |
| if (log.isDebugEnabled()) { |
| log.debug("trackRequestResult({}): success? {}, shardId={}", node, success, myShardId); |
| } |
| |
| if (success) { |
| ++achievedRf; |
| } |
| } |
| |
| public String toString() { |
| StringBuilder sb = new StringBuilder("LeaderRequestReplicationTracker"); |
| sb.append(", achievedRf=") |
| .append(getAchievedRf()) |
| .append(" for shard ") |
| .append(myShardId); |
| return sb.toString(); |
| } |
| } |
| } |