| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.solr.cloud; |
| |
| import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; |
| import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; |
| import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP; |
| import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP; |
| import static org.apache.solr.common.cloud.ZkStateReader.UNSUPPORTED_SOLR_XML; |
| import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE; |
| import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.lang.reflect.Array; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.Supplier; |
| import org.apache.solr.client.solrj.SolrClient; |
| import org.apache.solr.client.solrj.cloud.SolrCloudManager; |
| import org.apache.solr.client.solrj.impl.CloudLegacySolrClient; |
| import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder; |
| import org.apache.solr.client.solrj.impl.SolrClientCloudManager; |
| import org.apache.solr.client.solrj.impl.SolrZkClientTimeout; |
| import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider; |
| import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState; |
| import org.apache.solr.cloud.overseer.ClusterStateMutator; |
| import org.apache.solr.cloud.overseer.OverseerAction; |
| import org.apache.solr.cloud.overseer.SliceMutator; |
| import org.apache.solr.common.AlreadyClosedException; |
| import org.apache.solr.common.MapWriter; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.SolrException.ErrorCode; |
| import org.apache.solr.common.cloud.ClusterState; |
| import org.apache.solr.common.cloud.DefaultConnectionStrategy; |
| import org.apache.solr.common.cloud.DefaultZkACLProvider; |
| import org.apache.solr.common.cloud.DefaultZkCredentialsInjector; |
| import org.apache.solr.common.cloud.DefaultZkCredentialsProvider; |
| import org.apache.solr.common.cloud.DocCollection; |
| import org.apache.solr.common.cloud.DocCollectionWatcher; |
| import org.apache.solr.common.cloud.LiveNodesListener; |
| import org.apache.solr.common.cloud.NodesSysPropsCacher; |
| import org.apache.solr.common.cloud.OnReconnect; |
| import org.apache.solr.common.cloud.PerReplicaStates; |
| import org.apache.solr.common.cloud.PerReplicaStatesOps; |
| import org.apache.solr.common.cloud.Replica; |
| import org.apache.solr.common.cloud.SecurityAwareZkACLProvider; |
| import org.apache.solr.common.cloud.Slice; |
| import org.apache.solr.common.cloud.SolrZkClient; |
| import org.apache.solr.common.cloud.ZkACLProvider; |
| import org.apache.solr.common.cloud.ZkClientConnectionStrategy; |
| import org.apache.solr.common.cloud.ZkCoreNodeProps; |
| import org.apache.solr.common.cloud.ZkCredentialsInjector; |
| import org.apache.solr.common.cloud.ZkCredentialsProvider; |
| import org.apache.solr.common.cloud.ZkMaintenanceUtils; |
| import org.apache.solr.common.cloud.ZkNodeProps; |
| import org.apache.solr.common.cloud.ZkStateReader; |
| import org.apache.solr.common.cloud.ZooKeeperException; |
| import org.apache.solr.common.params.CommonParams; |
| import org.apache.solr.common.params.SolrParams; |
| import org.apache.solr.common.util.Compressor; |
| import org.apache.solr.common.util.ExecutorUtil; |
| import org.apache.solr.common.util.IOUtils; |
| import org.apache.solr.common.util.ObjectReleaseTracker; |
| import org.apache.solr.common.util.SolrNamedThreadFactory; |
| import org.apache.solr.common.util.StrUtils; |
| import org.apache.solr.common.util.URLUtil; |
| import org.apache.solr.common.util.Utils; |
| import org.apache.solr.common.util.ZLibCompressor; |
| import org.apache.solr.core.CloseHook; |
| import org.apache.solr.core.CloudConfig; |
| import org.apache.solr.core.CoreContainer; |
| import org.apache.solr.core.CoreDescriptor; |
| import org.apache.solr.core.NodeRoles; |
| import org.apache.solr.core.SolrCore; |
| import org.apache.solr.core.SolrCoreInitializationException; |
| import org.apache.solr.handler.component.HttpShardHandler; |
| import org.apache.solr.handler.component.HttpShardHandlerFactory; |
| import org.apache.solr.logging.MDCLoggingContext; |
| import org.apache.solr.search.SolrIndexSearcher; |
| import org.apache.solr.update.UpdateLog; |
| import org.apache.solr.util.AddressUtils; |
| import org.apache.solr.util.RTimer; |
| import org.apache.solr.util.RefCounted; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.KeeperException.NoNodeException; |
| import org.apache.zookeeper.KeeperException.SessionExpiredException; |
| import org.apache.zookeeper.Op; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.data.ACL; |
| import org.apache.zookeeper.data.Stat; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Handle ZooKeeper interactions. |
| * |
| * <p>Notes: loads everything on init, creates what's not there; further updates are driven by |
| * ZooKeeper Watches. |
| * |
| * <p>TODO: exceptions during close on attempts to update cloud state |
| */ |
| public class ZkController implements Closeable { |
| |
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| static final int WAIT_DOWN_STATES_TIMEOUT_SECONDS = 60; |
| |
| private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery"); |
| |
| private final ZkDistributedQueue overseerJobQueue; |
| private final OverseerTaskQueue overseerCollectionQueue; |
| private final OverseerTaskQueue overseerConfigSetQueue; |
| |
| private final DistributedMap overseerRunningMap; |
| private final DistributedMap overseerCompletedMap; |
| private final DistributedMap overseerFailureMap; |
| private final DistributedMap asyncIdsMap; |
| |
| public static final String COLLECTION_PARAM_PREFIX = "collection."; |
| public static final String CONFIGNAME_PROP = "configName"; |
| |
| public static final byte[] TOUCHED_ZNODE_DATA = "{}".getBytes(StandardCharsets.UTF_8); |
| |
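| /** Key identifying a leader election context: a (collection, coreNodeName) pair. */ |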
| static class ContextKey { |
| |
| private final String collection; |
| private final String coreNodeName; |
| |
| public ContextKey(String collection, String coreNodeName) { |
| this.collection = collection; |
| this.coreNodeName = coreNodeName; |
| } |
| |
| @Override |
| public int hashCode() { |
| final int prime = 31; |
| int result = 1; |
| result = prime * result + ((collection == null) ? 0 : collection.hashCode()); |
| result = prime * result + ((coreNodeName == null) ? 0 : coreNodeName.hashCode()); |
| return result; |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (this == obj) return true; |
| if (!(obj instanceof ContextKey)) return false; |
| ContextKey other = (ContextKey) obj; |
| return Objects.equals(collection, other.collection) |
| && Objects.equals(coreNodeName, other.coreNodeName); |
| } |
| |
| @Override |
| public String toString() { |
| return collection + ':' + coreNodeName; |
| } |
| } |
| |
| private final Map<ContextKey, ElectionContext> electionContexts = |
| Collections.synchronizedMap(new HashMap<>()); |
| |
| private final SolrZkClient zkClient; |
| public final ZkStateReader zkStateReader; |
| private SolrCloudManager cloudManager; |
| private CloudLegacySolrClient cloudSolrClient; |
| |
| private final String zkServerAddress; // example: 127.0.0.1:54062/solr |
| |
| private final int localHostPort; // example: 54065 |
| private final String hostName; // example: 127.0.0.1 |
| private final String nodeName; // example: 127.0.0.1:54065_solr |
| private String baseURL; // example: http://127.0.0.1:54065/solr |
| |
| private final CloudConfig cloudConfig; |
| private final NodesSysPropsCacher sysPropsCacher; |
| |
| private final DistributedClusterStateUpdater distributedClusterStateUpdater; |
| |
| private LeaderElector overseerElector; |
| |
| private Map<String, ReplicateFromLeader> replicateFromLeaders = new ConcurrentHashMap<>(); |
| private final Map<String, ZkCollectionTerms> collectionToTerms = new HashMap<>(); |
| |
| // for now, this can be null in tests, in which case recovery will be inactive, and other features |
| // may accept defaults or use mocks rather than pulling things from a CoreContainer |
| private CoreContainer cc; |
| |
| protected volatile Overseer overseer; |
| |
| private int leaderVoteWait; |
| private int leaderConflictResolveWait; |
| |
| private boolean genericCoreNodeNames; |
| |
| private volatile boolean isClosed; |
| |
| private final ConcurrentHashMap<String, Throwable> replicasMetTragicEvent = |
| new ConcurrentHashMap<>(); |
| |
| @Deprecated |
| // keeps track of replicas that have been asked to recover by leaders running on this node |
| private final Map<String, String> replicasInLeaderInitiatedRecovery = new HashMap<>(); |
| |
| // This is an expert and unsupported development mode that does not create |
| // an Overseer or register a /live node. This lets you monitor the cluster |
| // and interact with ZooKeeper via the Solr admin UI on a node outside the cluster, |
| // one that will not be killed or stopped when testing. See developer cloud-scripts. |
| private boolean zkRunOnly = Boolean.getBoolean("zkRunOnly"); // expert |
| |
| // Keeps track of the objects that need to know when a new ZooKeeper session is created after |
| // expiration. The reference is held as a HashSet since we clone the set before notifying, |
| // to avoid holding the lock for too long. |
| private HashSet<OnReconnect> reconnectListeners = new HashSet<>(); |
| |
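| // Callable that registers a single core with ZooKeeper in the background; used to |
| // re-register cores after a session expiration. |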
| private class RegisterCoreAsync implements Callable<Object> { |
| |
| CoreDescriptor descriptor; |
| boolean recoverReloadedCores; |
| boolean afterExpiration; |
| |
| RegisterCoreAsync( |
| CoreDescriptor descriptor, boolean recoverReloadedCores, boolean afterExpiration) { |
| this.descriptor = descriptor; |
| this.recoverReloadedCores = recoverReloadedCores; |
| this.afterExpiration = afterExpiration; |
| } |
| |
| @Override |
| public Object call() throws Exception { |
| if (log.isInfoEnabled()) { |
| log.info("Registering core {} afterExpiration? {}", descriptor.getName(), afterExpiration); |
| } |
| register(descriptor.getName(), descriptor, recoverReloadedCores, afterExpiration, false); |
| return descriptor; |
| } |
| } |
| |
| // notifies registered listeners after the ZK reconnect in the background |
| private static class OnReconnectNotifyAsync implements Callable<Object> { |
| |
| private final OnReconnect listener; |
| |
| OnReconnectNotifyAsync(OnReconnect listener) { |
| this.listener = listener; |
| } |
| |
| @Override |
| public Object call() throws Exception { |
| listener.command(); |
| return null; |
| } |
| } |
| |
| /** |
| * @param cc Core container associated with this controller. Cannot be null. |
| * @param zkServerAddress where to connect to the zk server |
| * @param zkClientConnectTimeout timeout in ms |
| * @param cloudConfig configuration for this controller. TODO: possibly redundant with |
| * CoreContainer |
| * @param descriptorsSupplier a supplier of the current core descriptors. Used to know which |
| * cores to re-register on reconnect |
| */ |
| @SuppressWarnings({"unchecked"}) |
| public ZkController( |
| final CoreContainer cc, |
| String zkServerAddress, |
| int zkClientConnectTimeout, |
| CloudConfig cloudConfig, |
| final Supplier<List<CoreDescriptor>> descriptorsSupplier) |
| throws InterruptedException, TimeoutException, IOException { |
| |
| if (cc == null) throw new IllegalArgumentException("CoreContainer cannot be null."); |
| this.cc = cc; |
| |
| this.cloudConfig = cloudConfig; |
| |
| // Use the configured way to do cluster state update (Overseer queue vs distributed) |
| distributedClusterStateUpdater = |
| new DistributedClusterStateUpdater(cloudConfig.getDistributedClusterStateUpdates()); |
| |
| this.genericCoreNodeNames = cloudConfig.getGenericCoreNodeNames(); |
| |
| this.zkServerAddress = zkServerAddress; |
| this.localHostPort = cloudConfig.getSolrHostPort(); |
| this.hostName = normalizeHostName(cloudConfig.getHost()); |
| this.nodeName = generateNodeName(this.hostName, Integer.toString(this.localHostPort)); |
| MDCLoggingContext.setNode(nodeName); |
| this.leaderVoteWait = cloudConfig.getLeaderVoteWait(); |
| this.leaderConflictResolveWait = cloudConfig.getLeaderConflictResolveWait(); |
| |
| int clientTimeout = cloudConfig.getZkClientTimeout(); |
| |
| String connectionStrategy = System.getProperty("solr.zookeeper.connectionStrategy"); |
| ZkClientConnectionStrategy strat = |
| ZkClientConnectionStrategy.forName(connectionStrategy, new DefaultConnectionStrategy()); |
| |
| String zkCredentialsInjectorClass = cloudConfig.getZkCredentialsInjectorClass(); |
| ZkCredentialsInjector zkCredentialsInjector = |
| StrUtils.isNullOrEmpty(zkCredentialsInjectorClass) |
| ? new DefaultZkCredentialsInjector() |
| : cc.getResourceLoader() |
| .newInstance(zkCredentialsInjectorClass, ZkCredentialsInjector.class); |
| |
| String zkACLProviderClass = cloudConfig.getZkACLProviderClass(); |
| ZkACLProvider zkACLProvider = |
| StrUtils.isNullOrEmpty(zkACLProviderClass) |
| ? new DefaultZkACLProvider() |
| : cc.getResourceLoader().newInstance(zkACLProviderClass, ZkACLProvider.class); |
| zkACLProvider.setZkCredentialsInjector(zkCredentialsInjector); |
| |
| String zkCredentialsProviderClass = cloudConfig.getZkCredentialsProviderClass(); |
| ZkCredentialsProvider zkCredentialsProvider = |
| StrUtils.isNullOrEmpty(zkCredentialsProviderClass) |
| ? new DefaultZkCredentialsProvider() |
| : cc.getResourceLoader() |
| .newInstance(zkCredentialsProviderClass, ZkCredentialsProvider.class); |
| |
| zkCredentialsProvider.setZkCredentialsInjector(zkCredentialsInjector); |
| strat.setZkCredentialsToAddAutomatically(zkCredentialsProvider); |
| addOnReconnectListener(getConfigDirListener()); |
| |
| String stateCompressionProviderClass = cloudConfig.getStateCompressorClass(); |
| Compressor compressor = |
| StrUtils.isNullOrEmpty(stateCompressionProviderClass) |
| ? new ZLibCompressor() |
| : cc.getResourceLoader().newInstance(stateCompressionProviderClass, Compressor.class); |
| |
| zkClient = |
| new SolrZkClient.Builder() |
| .withUrl(zkServerAddress) |
| .withTimeout(clientTimeout, TimeUnit.MILLISECONDS) |
| .withConnTimeOut(zkClientConnectTimeout, TimeUnit.MILLISECONDS) |
| .withConnStrategy(strat) |
| .withReconnectListener(() -> onReconnect(descriptorsSupplier)) |
| .withBeforeConnect(() -> beforeReconnect(descriptorsSupplier)) |
| .withAclProvider(zkACLProvider) |
| .withClosedCheck(cc::isShutDown) |
| .withCompressor(compressor) |
| .build(); |
| // Refuse to start if ZK has a non-empty /clusterstate.json or a /solr.xml file |
| checkNoOldClusterstate(zkClient); |
| |
| this.overseerRunningMap = Overseer.getRunningMap(zkClient); |
| this.overseerCompletedMap = Overseer.getCompletedMap(zkClient); |
| this.overseerFailureMap = Overseer.getFailureMap(zkClient); |
| this.asyncIdsMap = Overseer.getAsyncIdsMap(zkClient); |
| |
| zkStateReader = |
| new ZkStateReader( |
| zkClient, |
| () -> { |
| if (cc != null) cc.securityNodeChanged(); |
| }); |
| |
| init(); |
| |
| if (distributedClusterStateUpdater.isDistributedStateUpdate()) { |
| this.overseerJobQueue = null; |
| } else { |
| this.overseerJobQueue = overseer.getStateUpdateQueue(); |
| } |
| this.overseerCollectionQueue = overseer.getCollectionQueue(zkClient); |
| this.overseerConfigSetQueue = overseer.getConfigSetQueue(zkClient); |
| this.sysPropsCacher = |
| new NodesSysPropsCacher( |
| ((HttpShardHandlerFactory) getCoreContainer().getShardHandlerFactory()).getClient(), |
| zkStateReader); |
| assert ObjectReleaseTracker.track(this); |
| } |
| |
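| // Called just before the ZK client attempts to establish a new session after expiration: |
| // stop Overseer work, close outstanding leader elections, and clear local leader flags so |
| // that nothing on this node still believes it is a leader under the old session. |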
| private void beforeReconnect(Supplier<List<CoreDescriptor>> descriptorsSupplier) { |
| try { |
| overseer.close(); |
| } catch (Exception e) { |
| log.error("Error trying to stop any Overseer threads", e); |
| } |
| closeOutstandingElections(descriptorsSupplier); |
| markAllAsNotLeader(descriptorsSupplier); |
| } |
| |
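| // Called once a new ZK session has been established: recreate watchers, rejoin the Overseer |
| // election, publish cores as DOWN, re-create the live node, re-register cores, and notify |
| // OnReconnect listeners. |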
| private void onReconnect(Supplier<List<CoreDescriptor>> descriptorsSupplier) |
| throws SessionExpiredException { |
| // on reconnect, reload cloud info |
| log.info("ZooKeeper session re-connected ... refreshing core states after session expiration."); |
| clearZkCollectionTerms(); |
| try { |
| // recreate our watchers first so that they exist even on any problems below |
| zkStateReader.createClusterStateWatchersAndUpdate(); |
| |
| // This is troublesome - we don't want to kill anything the old leader accepted, |
| // though sync will likely get those updates back. But only if this node is |
| // involved in the sync, and it certainly may not be. |
| // ExecutorUtil.shutdownAndAwaitTermination(cc.getCmdDistribExecutor()); |
| // We need to recreate all of our lost watches. |
| |
| // It seems we don't need to do this again... |
| // Overseer.createClientNodes(zkClient, getNodeName()); |
| |
| // Start the overseer first, as the following code may need its processing. |
| if (!zkRunOnly) { |
| ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName()); |
| |
| ElectionContext prevContext = overseerElector.getContext(); |
| if (prevContext != null) { |
| prevContext.cancelElection(); |
| prevContext.close(); |
| } |
| |
| overseerElector.setup(context); |
| |
| if (cc.nodeRoles.isOverseerAllowedOrPreferred()) { |
| overseerElector.joinElection(context, true); |
| } |
| } |
| |
| cc.cancelCoreRecoveries(); |
| |
| try { |
| registerAllCoresAsDown(descriptorsSupplier, false); |
| } catch (SessionExpiredException e) { |
| // zk has to reconnect and this will all be tried again |
| throw e; |
| } catch (Exception e) { |
| // this is really best effort - in case of races or failure cases where we now |
| // need to be the leader, if anything fails, just continue |
| log.warn("Exception while trying to register all cores as DOWN", e); |
| } |
| |
| // we have to register as live first to pick up docs in the buffer |
| createEphemeralLiveNode(); |
| |
| List<CoreDescriptor> descriptors = descriptorsSupplier.get(); |
| // re-register all descriptors |
| ExecutorService executorService = (cc != null) ? cc.getCoreZkRegisterExecutorService() : null; |
| if (descriptors != null) { |
| for (CoreDescriptor descriptor : descriptors) { |
| // TODO: we need to think carefully about what happens when it was a leader |
| // that was expired - as well as what to do about leaders/overseers with |
| // connection loss |
| try { |
| // unload solr cores that have been 'failed over' |
| throwErrorIfReplicaReplaced(descriptor); |
| |
| if (executorService != null) { |
| executorService.submit(new RegisterCoreAsync(descriptor, true, true)); |
| } else { |
| register(descriptor.getName(), descriptor, true, true, false); |
| } |
| } catch (Exception e) { |
| log.error("Error registering SolrCore", e); |
| } |
| } |
| } |
| |
| // notify any other objects that need to know when the session was re-connected |
| HashSet<OnReconnect> clonedListeners; |
| synchronized (reconnectListeners) { |
| clonedListeners = new HashSet<>(reconnectListeners); |
| } |
| // the OnReconnect operation can be expensive per listener, so do that async in |
| // the background |
| for (OnReconnect listener : clonedListeners) { |
| try { |
| if (executorService != null) { |
| executorService.submit(new OnReconnectNotifyAsync(listener)); |
| } else { |
| listener.command(); |
| } |
| } catch (Exception exc) { |
| // not much we can do here other than warn in the log |
| log.warn( |
| "Error when notifying OnReconnect listener {} after session re-connected.", |
| listener, |
| exc); |
| } |
| } |
| } catch (InterruptedException e) { |
| // Restore the interrupted status |
| Thread.currentThread().interrupt(); |
| throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e); |
| } catch (SessionExpiredException e) { |
| throw e; |
| } catch (Exception e) { |
| log.error("Exception during reconnect", e); |
| throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e); |
| } |
| } |
| |
| /** |
| * Verifies whether /clusterstate.json exists in ZooKeeper; if it does and is not empty, refuses |
| * to start and outputs a helpful message regarding collection migration. Also aborts if |
| * /solr.xml exists in ZooKeeper. |
| * |
| * <p>If /clusterstate.json exists and is empty, it is removed. |
| */ |
| private void checkNoOldClusterstate(final SolrZkClient zkClient) throws InterruptedException { |
| try { |
| if (zkClient.exists(UNSUPPORTED_SOLR_XML, true)) { |
| String message = |
| "solr.xml found in ZooKeeper. Loading solr.xml from ZooKeeper is no longer supported since Solr 10. " |
| + "Cannot start Solr. The file can be removed with command bin/solr zk rm /solr.xml -z host:port"; |
| log.error(message); |
| throw new SolrException(ErrorCode.INVALID_STATE, message); |
| } |
| if (!zkClient.exists(ZkStateReader.UNSUPPORTED_CLUSTER_STATE, true)) { |
| return; |
| } |
| final byte[] data = |
| zkClient.getData(ZkStateReader.UNSUPPORTED_CLUSTER_STATE, null, null, true); |
| |
| if (Arrays.equals("{}".getBytes(StandardCharsets.UTF_8), data)) { |
| // Empty json. This log will only occur once. |
| log.warn( |
| "{} no longer supported starting with Solr 9. Found empty file on Zookeeper, deleting it.", |
| ZkStateReader.UNSUPPORTED_CLUSTER_STATE); |
| zkClient.delete(ZkStateReader.UNSUPPORTED_CLUSTER_STATE, -1, true); |
| } else { |
| // /clusterstate.json not empty: refuse to start but do not automatically delete. A bit of a |
| // pain, but the user shouldn't have older collections at this stage anyway. |
| String message = |
| ZkStateReader.UNSUPPORTED_CLUSTER_STATE |
| + " no longer supported starting with Solr 9. " |
| + "It is present and not empty. Cannot start Solr. Please first migrate collections to stateFormat=2 using an " |
| + "older version of Solr or if you don't care about the data then delete the file from " |
| + "Zookeeper using a command line tool, for example: bin/solr zk rm /clusterstate.json -z host:port"; |
| log.error(message); |
| throw new SolrException(SolrException.ErrorCode.INVALID_STATE, message); |
| } |
| } catch (KeeperException.NoNodeException e) { |
| // N instances starting at the same time could attempt to delete the file, resulting in N-1 |
| // NoNodeExceptions. |
| // If we get to this point, then it's OK to suppress the exception and continue assuming |
| // success. |
| log.debug( |
| "NoNodeException attempting to delete {}. Another instance must have deleted it already", |
| ZkStateReader.UNSUPPORTED_CLUSTER_STATE, |
| e); |
| } catch (KeeperException e) { |
| // Convert checked exception to one acceptable by the caller (see also init() further down) |
| log.error("", e); |
| throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); |
| } |
| } |
| |
| public int getLeaderVoteWait() { |
| return leaderVoteWait; |
| } |
| |
| public int getLeaderConflictResolveWait() { |
| return leaderConflictResolveWait; |
| } |
| |
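| // Publishes this node (and thus all of its cores) as DOWN, then, for each core that does not |
| // look like it will become leader, makes a best effort to wait until the current leader has |
| // seen the DOWN state. |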
| private void registerAllCoresAsDown( |
| final Supplier<List<CoreDescriptor>> registerOnReconnect, boolean updateLastPublished) |
| throws SessionExpiredException { |
| List<CoreDescriptor> descriptors = registerOnReconnect.get(); |
| if (isClosed) return; |
| if (descriptors != null) { |
| // before registering as live, make sure everyone is in a |
| // down state |
| publishNodeAsDown(getNodeName()); |
| for (CoreDescriptor descriptor : descriptors) { |
| // if it looks like we are going to be the leader, we don't |
| // want to wait for the following stuff |
| CloudDescriptor cloudDesc = descriptor.getCloudDescriptor(); |
| String collection = cloudDesc.getCollectionName(); |
| String slice = cloudDesc.getShardId(); |
| try { |
| |
| int children = |
| zkStateReader |
| .getZkClient() |
| .getChildren( |
| ZkStateReader.COLLECTIONS_ZKNODE |
| + "/" |
| + collection |
| + "/leader_elect/" |
| + slice |
| + "/election", |
| null, |
| true) |
| .size(); |
| if (children == 0) { |
| log.debug( |
| "looks like we are going to be the leader for collection {} shard {}", |
| collection, |
| slice); |
| continue; |
| } |
| |
| } catch (NoNodeException e) { |
| log.debug( |
| "looks like we are going to be the leader for collection {} shard {}", |
| collection, |
| slice); |
| continue; |
| } catch (InterruptedException e2) { |
| Thread.currentThread().interrupt(); |
| } catch (SessionExpiredException e) { |
| // zk has to reconnect |
| throw e; |
| } catch (KeeperException e) { |
| log.warn("", e); |
| Thread.currentThread().interrupt(); |
| } |
| |
| final String coreZkNodeName = descriptor.getCloudDescriptor().getCoreNodeName(); |
| try { |
| log.debug( |
| "calling waitForLeaderToSeeDownState for coreZkNodeName={} collection={} shard={}", |
| coreZkNodeName, |
| collection, |
| slice); |
| waitForLeaderToSeeDownState(descriptor, coreZkNodeName); |
| } catch (Exception e) { |
| log.warn( |
| "There was a problem while making a best effort to ensure the leader has seen us as down, this is not unexpected as Zookeeper has just reconnected after a session expiration", |
| e); |
| if (isClosed) { |
| return; |
| } |
| } |
| } |
| } |
| } |
| |
| public NodesSysPropsCacher getSysPropsCacher() { |
| return sysPropsCacher; |
| } |
| |
| private void closeOutstandingElections(final Supplier<List<CoreDescriptor>> registerOnReconnect) { |
| List<CoreDescriptor> descriptors = registerOnReconnect.get(); |
| if (descriptors != null) { |
| for (CoreDescriptor descriptor : descriptors) { |
| closeExistingElectionContext(descriptor); |
| } |
| } |
| } |
| |
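| // Close and remove any election context previously registered for this core's |
| // collection/coreNodeName pair, returning the key. |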
| private ContextKey closeExistingElectionContext(CoreDescriptor cd) { |
| // look for old context - if we find it, cancel it |
| String collection = cd.getCloudDescriptor().getCollectionName(); |
| final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName(); |
| |
| ContextKey contextKey = new ContextKey(collection, coreNodeName); |
| ElectionContext prevContext = electionContexts.get(contextKey); |
| |
| if (prevContext != null) { |
| prevContext.close(); |
| electionContexts.remove(contextKey); |
| } |
| |
| return contextKey; |
| } |
| |
| private void markAllAsNotLeader(final Supplier<List<CoreDescriptor>> registerOnReconnect) { |
| List<CoreDescriptor> descriptors = registerOnReconnect.get(); |
| if (descriptors != null) { |
| for (CoreDescriptor descriptor : descriptors) { |
| descriptor.getCloudDescriptor().setLeader(false); |
| descriptor.getCloudDescriptor().setHasRegistered(false); |
| } |
| } |
| } |
| |
| public void preClose() { |
| this.isClosed = true; |
| |
| try { |
| this.removeEphemeralLiveNode(); |
| } catch (AlreadyClosedException |
| | SessionExpiredException |
| | KeeperException.ConnectionLossException e) { |
| |
| } catch (Exception e) { |
| log.warn("Error removing live node. Continuing to close CoreContainer", e); |
| } |
| |
| try { |
| if (getZkClient().getConnectionManager().isConnected()) { |
| log.info("Publish this node as DOWN..."); |
| publishNodeAsDown(getNodeName()); |
| } |
| } catch (Exception e) { |
| log.warn("Error publishing nodes as down. Continuing to close CoreContainer", e); |
| } |
| |
| ExecutorService customThreadPool = |
| ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("preCloseThreadPool")); |
| |
| try { |
| synchronized (collectionToTerms) { |
| customThreadPool.submit( |
| () -> collectionToTerms.values().parallelStream().forEach(ZkCollectionTerms::close)); |
| } |
| |
| customThreadPool.submit( |
| () -> |
| replicateFromLeaders.values().parallelStream() |
| .forEach(ReplicateFromLeader::stopReplication)); |
| } finally { |
| ExecutorUtil.shutdownAndAwaitTermination(customThreadPool); |
| } |
| } |
| |
| /** Closes the underlying ZooKeeper client. */ |
| @Override |
| public void close() { |
| if (!this.isClosed) preClose(); |
| |
| ExecutorService customThreadPool = |
| ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("closeThreadPool")); |
| |
| customThreadPool.submit(() -> IOUtils.closeQuietly(overseerElector.getContext())); |
| |
| customThreadPool.submit(() -> IOUtils.closeQuietly(overseer)); |
| |
| try { |
| customThreadPool.submit( |
| () -> { |
| Collection<ElectionContext> values = electionContexts.values(); |
| synchronized (electionContexts) { |
| values.forEach(IOUtils::closeQuietly); |
| } |
| }); |
| |
| } finally { |
| |
| sysPropsCacher.close(); |
| customThreadPool.submit(() -> IOUtils.closeQuietly(cloudSolrClient)); |
| customThreadPool.submit(() -> IOUtils.closeQuietly(cloudManager)); |
| |
| try { |
| try { |
| zkStateReader.close(); |
| } catch (Exception e) { |
| log.error("Error closing zkStateReader", e); |
| } |
| } finally { |
| try { |
| zkClient.close(); |
| } catch (Exception e) { |
| log.error("Error closing zkClient", e); |
| } finally { |
| |
| // just in case the OverseerElectionContext managed to start another Overseer |
| IOUtils.closeQuietly(overseer); |
| |
| ExecutorUtil.shutdownAndAwaitTermination(customThreadPool); |
| } |
| } |
| } |
| assert ObjectReleaseTracker.release(this); |
| } |
| |
| /** |
| * Best effort to give up the leadership of a shard in a core after hitting a tragic exception |
| * |
| * @param cd The current core descriptor |
| */ |
| public void giveupLeadership(CoreDescriptor cd) { |
| assert cd != null; |
| |
| String collection = cd.getCollectionName(); |
| if (collection == null) return; |
| |
| DocCollection dc = getClusterState().getCollectionOrNull(collection); |
| if (dc == null) return; |
| |
| Slice shard = dc.getSlice(cd.getCloudDescriptor().getShardId()); |
| if (shard == null) return; |
| |
| // if this replica is not a leader, it will be put in recovery state by the leader |
| String leader = cd.getCloudDescriptor().getCoreNodeName(); |
| if (!Objects.equals(shard.getReplica(leader), shard.getLeader())) return; |
| |
| Set<String> liveNodes = getClusterState().getLiveNodes(); |
| int numActiveReplicas = |
| shard |
| .getReplicas( |
| rep -> |
| rep.getState() == Replica.State.ACTIVE |
| && rep.getType().leaderEligible |
| && liveNodes.contains(rep.getNodeName())) |
| .size(); |
| |
| // The leader is at least still able to serve searches; we should give up leadership only if |
| // other replicas can take over. |
| if (numActiveReplicas >= 2) { |
| ContextKey key = new ContextKey(collection, leader); |
| ElectionContext context = electionContexts.get(key); |
| if (context instanceof ShardLeaderElectionContextBase) { |
| LeaderElector elector = ((ShardLeaderElectionContextBase) context).getLeaderElector(); |
| try { |
| log.warn("Leader {} met tragic exception, give up its leadership", key); |
| elector.retryElection(context, false); |
| } catch (KeeperException | InterruptedException e) { |
| SolrZkClient.checkInterrupted(e); |
| log.error("Met exception on give up leadership for {}", key, e); |
| } |
| } else { |
| // The node is probably already gone |
| log.warn("Could not get election context {} to give up leadership", key); |
| } |
| } |
| } |
| |
| /** |
| * @return information about the cluster from ZooKeeper |
| */ |
| public ClusterState getClusterState() { |
| return zkStateReader.getClusterState(); |
| } |
| |
| public DistributedClusterStateUpdater getDistributedClusterStateUpdater() { |
| return distributedClusterStateUpdater; |
| } |
| |
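| // Lazily creates the SolrCloudManager (and its backing cloud client) using double-checked |
| // locking; both are closed in close(). |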
| public SolrCloudManager getSolrCloudManager() { |
| if (cloudManager != null) { |
| return cloudManager; |
| } |
| synchronized (this) { |
| if (cloudManager != null) { |
| return cloudManager; |
| } |
| cloudSolrClient = |
| new CloudLegacySolrClient.Builder(new ZkClientClusterStateProvider(zkStateReader)) |
| .withHttpClient(cc.getUpdateShardHandler().getDefaultHttpClient()) |
| .withConnectionTimeout(15000, TimeUnit.MILLISECONDS) |
| .withSocketTimeout(30000, TimeUnit.MILLISECONDS) |
| .build(); |
| cloudManager = new SolrClientCloudManager(cloudSolrClient, cc.getObjectCache()); |
| cloudManager.getClusterStateProvider().connect(); |
| } |
| return cloudManager; |
| } |
| |
| // Normalize the host, removing any URL scheme. |
| // Input can be null, host, or url_prefix://host |
| private String normalizeHostName(String host) { |
| if (host == null || host.length() == 0) { |
| host = AddressUtils.getHostToAdvertise(); |
| } else { |
| if (URLUtil.hasScheme(host)) { |
| host = URLUtil.removeScheme(host); |
| } |
| } |
| |
| return host; |
| } |
| |
| public String getHostName() { |
| return hostName; |
| } |
| |
| public int getHostPort() { |
| return localHostPort; |
| } |
| |
| public SolrZkClient getZkClient() { |
| return zkClient; |
| } |
| |
| /** |
| * @return zookeeper server address |
| */ |
| public String getZkServerAddress() { |
| return zkServerAddress; |
| } |
| |
| boolean isClosed() { |
| return isClosed; |
| } |
| |
| /** |
| * Create the znodes necessary for a cluster to operate. |
| * |
| * @param zkClient a SolrZkClient |
| * @throws KeeperException if there is a ZooKeeper error |
| * @throws InterruptedException on interrupt |
| */ |
| public static void createClusterZkNodes(SolrZkClient zkClient) |
| throws KeeperException, InterruptedException, IOException { |
| ZkMaintenanceUtils.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient); |
| ZkMaintenanceUtils.ensureExists(ZkStateReader.NODE_ROLES, zkClient); |
| for (NodeRoles.Role role : NodeRoles.Role.values()) { |
| ZkMaintenanceUtils.ensureExists(NodeRoles.getZNodeForRole(role), zkClient); |
| for (String mode : role.supportedModes()) { |
| ZkMaintenanceUtils.ensureExists(NodeRoles.getZNodeForRoleMode(role, mode), zkClient); |
| } |
| } |
| |
| ZkMaintenanceUtils.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient); |
| ZkMaintenanceUtils.ensureExists(ZkStateReader.ALIASES, zkClient); |
| byte[] emptyJson = "{}".getBytes(StandardCharsets.UTF_8); |
| ZkMaintenanceUtils.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH, emptyJson, zkClient); |
| repairSecurityJson(zkClient); |
| } |
| |
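| // If /security.json was created with world-readable or otherwise non-secure ACLs while a |
| // stricter ACL provider is configured, attempt to update its ACLs in place (unless the |
| // solr.security.aclautorepair.disable system property is set). |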
| private static void repairSecurityJson(SolrZkClient zkClient) |
| throws KeeperException, InterruptedException { |
| List<ACL> securityConfAcl = zkClient.getACL(ZkStateReader.SOLR_SECURITY_CONF_PATH, null, true); |
| ZkACLProvider aclProvider = zkClient.getZkACLProvider(); |
| |
| boolean tryUpdate = false; |
| |
| if (OPEN_ACL_UNSAFE.equals(securityConfAcl)) { |
| List<ACL> aclToAdd = aclProvider.getACLsToAdd(ZkStateReader.SOLR_SECURITY_CONF_PATH); |
| if (OPEN_ACL_UNSAFE.equals(aclToAdd)) { |
| log.warn( |
| "Contents of zookeeper /security.json are world-readable;" |
| + " consider setting up ACLs as described in https://solr.apache.org/guide/solr/latest/deployment-guide/zookeeper-access-control.html"); |
| } else { |
| tryUpdate = true; |
| } |
| } else if (aclProvider instanceof SecurityAwareZkACLProvider) { |
| // Use Set to explicitly ignore order |
| Set<ACL> nonSecureACL = new HashSet<>(aclProvider.getACLsToAdd(null)); |
| // case where security.json was not treated as a secure path |
| if (nonSecureACL.equals(new HashSet<>(securityConfAcl))) { |
| tryUpdate = true; |
| } |
| } |
| |
| if (tryUpdate) { |
| if (Boolean.getBoolean("solr.security.aclautorepair.disable")) { |
| log.warn( |
| "Detected inconsistent ACLs for zookeeper /security.json, but self-repair is disabled."); |
| } else { |
| log.info("Detected inconsistent ACLs for zookeeper /security.json, attempting to repair."); |
| zkClient.updateACLs(ZkStateReader.SOLR_SECURITY_CONF_PATH); |
| } |
| } |
| } |
| |
| private void init() { |
| try { |
| createClusterZkNodes(zkClient); |
| zkStateReader.createClusterStateWatchersAndUpdate(); |
| |
| // note: Can't read cluster properties until createClusterState ^ is called |
| final String urlSchemeFromClusterProp = |
| zkStateReader.getClusterProperty(ZkStateReader.URL_SCHEME, ZkStateReader.HTTP); |
| |
| // this must happen after zkStateReader has initialized the cluster props |
| this.baseURL = Utils.getBaseUrlForNodeName(this.nodeName, urlSchemeFromClusterProp); |
| |
| checkForExistingEphemeralNode(); |
| registerLiveNodesListener(); |
| |
| // Start the overseer first, as the following code may need its processing. |
| if (!zkRunOnly) { |
| overseerElector = new LeaderElector(zkClient); |
| this.overseer = |
| new Overseer( |
| (HttpShardHandler) cc.getShardHandlerFactory().getShardHandler(), |
| cc.getUpdateShardHandler(), |
| CommonParams.CORES_HANDLER_PATH, |
| zkStateReader, |
| this, |
| cloudConfig); |
| ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName()); |
| overseerElector.setup(context); |
| if (cc.nodeRoles.isOverseerAllowedOrPreferred()) { |
| overseerElector.joinElection(context, false); |
| } |
| } |
| |
| Stat stat = zkClient.exists(ZkStateReader.LIVE_NODES_ZKNODE, null, true); |
| if (stat != null && stat.getNumChildren() > 0) { |
| publishAndWaitForDownStates(); |
| } |
| |
| // Do this last to signal we're up. |
| createEphemeralLiveNode(); |
| } catch (IOException e) { |
| log.error("", e); |
| throw new SolrException( |
| SolrException.ErrorCode.SERVER_ERROR, "Can't create ZooKeeperController", e); |
| } catch (InterruptedException e) { |
| // Restore the interrupted status |
| Thread.currentThread().interrupt(); |
| log.error("", e); |
| throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); |
| } catch (KeeperException e) { |
| log.error("", e); |
| throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); |
| } |
| } |
| |
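| // If an ephemeral live node from a previous session of this node still exists, wait up to |
| // twice the session timeout for ZooKeeper to expire it; fail startup if it never goes away |
| // (most likely another Solr process using the same port is still running). |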
| private void checkForExistingEphemeralNode() throws KeeperException, InterruptedException { |
| if (zkRunOnly) { |
| return; |
| } |
| String nodeName = getNodeName(); |
| String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName; |
| |
| if (!zkClient.exists(nodePath, true)) { |
| return; |
| } |
| |
| final CountDownLatch deletedLatch = new CountDownLatch(1); |
| Stat stat = |
| zkClient.exists( |
| nodePath, |
| event -> { |
| if (Watcher.Event.EventType.None.equals(event.getType())) { |
| return; |
| } |
| if (Watcher.Event.EventType.NodeDeleted.equals(event.getType())) { |
| deletedLatch.countDown(); |
| } |
| }, |
| true); |
| |
| if (stat == null) { |
| // znode suddenly disappeared but that's okay |
| return; |
| } |
| |
| boolean deleted = |
| deletedLatch.await(zkClient.getZooKeeper().getSessionTimeout() * 2L, TimeUnit.MILLISECONDS); |
| if (!deleted) { |
| throw new SolrException( |
| ErrorCode.SERVER_ERROR, |
| "A previous ephemeral live node still exists. " |
| + "Solr cannot continue. Please ensure that no other Solr process using the same port is running already."); |
| } |
| } |
| |
| private void registerLiveNodesListener() { |
| // this listener is used for generating nodeLost events, so we check only if |
| // some nodes went missing compared to last state |
| LiveNodesListener listener = |
| (oldNodes, newNodes) -> { |
| oldNodes.removeAll(newNodes); |
| if (oldNodes.isEmpty()) { // only added nodes |
| return false; |
| } |
| if (isClosed) { |
| return true; |
| } |
| // if this node is in the top three then attempt to create nodeLost message |
| int i = 0; |
| for (String n : newNodes) { |
| if (n.equals(getNodeName())) { |
| break; |
| } |
| if (i > 2) { |
| return false; // this node is not in the top three |
| } |
| i++; |
| } |
| return false; |
| }; |
| zkStateReader.registerLiveNodesListener(listener); |
| } |
| |
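| // Publishes this node as DOWN and waits (up to the given timeout) until every local replica |
| // is observed as DOWN in the cluster state, using a DocCollection watcher per collection |
| // that has a local replica. |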
| public void publishAndWaitForDownStates() throws KeeperException, InterruptedException { |
| publishAndWaitForDownStates(WAIT_DOWN_STATES_TIMEOUT_SECONDS); |
| } |
| |
| public void publishAndWaitForDownStates(int timeoutSeconds) |
| throws KeeperException, InterruptedException { |
| |
| publishNodeAsDown(getNodeName()); |
| |
| Set<String> collectionsWithLocalReplica = ConcurrentHashMap.newKeySet(); |
| for (CoreDescriptor descriptor : cc.getCoreDescriptors()) { |
| collectionsWithLocalReplica.add(descriptor.getCloudDescriptor().getCollectionName()); |
| } |
| |
| CountDownLatch latch = new CountDownLatch(collectionsWithLocalReplica.size()); |
| for (String collectionWithLocalReplica : collectionsWithLocalReplica) { |
| zkStateReader.registerDocCollectionWatcher( |
| collectionWithLocalReplica, |
| (collectionState) -> { |
| if (collectionState == null) return false; |
| boolean foundStates = true; |
| for (CoreDescriptor coreDescriptor : cc.getCoreDescriptors()) { |
| if (coreDescriptor |
| .getCloudDescriptor() |
| .getCollectionName() |
| .equals(collectionWithLocalReplica)) { |
| Replica replica = |
| collectionState.getReplica( |
| coreDescriptor.getCloudDescriptor().getCoreNodeName()); |
| if (replica == null || replica.getState() != Replica.State.DOWN) { |
| foundStates = false; |
| } |
| } |
| } |
| |
| if (foundStates && collectionsWithLocalReplica.remove(collectionWithLocalReplica)) { |
| latch.countDown(); |
| } |
| return foundStates; |
| }); |
| } |
| |
| boolean allPublishedDown = latch.await(timeoutSeconds, TimeUnit.SECONDS); |
| if (!allPublishedDown) { |
| log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state."); |
| } |
| } |
| |
| /** |
| * Validates that the chroot exists in ZooKeeper, optionally creating it: if create is true, |
| * this method will create the path in case it doesn't exist. |
| * |
| * @return true if the path exists or was created; false if the path doesn't exist and |
| *     'create' is false |
| */ |
| public static boolean checkChrootPath(String zkHost, boolean create) |
| throws KeeperException, InterruptedException { |
| if (!SolrZkClient.containsChroot(zkHost)) { |
| return true; |
| } |
| log.trace("zkHost includes chroot"); |
| String chrootPath = zkHost.substring(zkHost.indexOf('/')); |
| |
| SolrZkClient tmpClient = |
| new SolrZkClient.Builder() |
| .withUrl(zkHost.substring(0, zkHost.indexOf('/'))) |
| .withTimeout(SolrZkClientTimeout.DEFAULT_ZK_CLIENT_TIMEOUT, TimeUnit.MILLISECONDS) |
| .withConnTimeOut(SolrZkClientTimeout.DEFAULT_ZK_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS) |
| .build(); |
| boolean exists = tmpClient.exists(chrootPath, true); |
| if (!exists && create) { |
| log.info("creating chroot {}", chrootPath); |
| tmpClient.makePath(chrootPath, false, true); |
| exists = true; |
| } |
| tmpClient.close(); |
| return exists; |
| } |
| |
| public boolean isConnected() { |
| return zkClient.isConnected(); |
| } |
| |
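| // Creates the ephemeral /live_nodes entry for this node, plus an ephemeral znode per |
| // configured node role, in a single multi-op so they appear (and disappear) together. |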
| private void createEphemeralLiveNode() throws KeeperException, InterruptedException { |
| if (zkRunOnly) { |
| return; |
| } |
| |
| String nodeName = getNodeName(); |
| String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName; |
| log.info("Register node as live in ZooKeeper:{}", nodePath); |
| Map<NodeRoles.Role, String> roles = cc.nodeRoles.getRoles(); |
| List<Op> ops = new ArrayList<>(roles.size() + 1); |
| ops.add( |
| Op.create( |
| nodePath, |
| null, |
| zkClient.getZkACLProvider().getACLsToAdd(nodePath), |
| CreateMode.EPHEMERAL)); |
| |
| // Create the roles node as well |
| roles.forEach( |
| (role, mode) -> |
| ops.add( |
| Op.create( |
| NodeRoles.getZNodeForRoleMode(role, mode) + "/" + nodeName, |
| null, |
| zkClient.getZkACLProvider().getACLsToAdd(nodePath), |
| CreateMode.EPHEMERAL))); |
| |
| zkClient.multi(ops, true); |
| } |
| |
| public void removeEphemeralLiveNode() throws KeeperException, InterruptedException { |
| if (zkRunOnly) { |
| return; |
| } |
| String nodeName = getNodeName(); |
| String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName; |
| log.info("Remove node as live in ZooKeeper:{}", nodePath); |
| List<Op> ops = List.of(Op.delete(nodePath, -1)); |
| |
| try { |
| zkClient.multi(ops, true); |
| } catch (NoNodeException e) { |
| |
| } |
| } |
| |
| public String getNodeName() { |
| return nodeName; |
| } |
| |
| /** Returns true if the path exists */ |
| public boolean pathExists(String path) throws KeeperException, InterruptedException { |
| return zkClient.exists(path, true); |
| } |
| |
| /** |
| * Register shard with ZooKeeper. |
| * |
| * @return the shardId for the SolrCore |
| */ |
| public String register(String coreName, final CoreDescriptor desc, boolean skipRecovery) |
| throws Exception { |
| return register(coreName, desc, false, false, skipRecovery); |
| } |
| |
| /** |
| * Register shard with ZooKeeper. |
| * |
| * @return the shardId for the SolrCore |
| */ |
| public String register( |
| String coreName, |
| final CoreDescriptor desc, |
| boolean recoverReloadedCores, |
| boolean afterExpiration, |
| boolean skipRecovery) |
| throws Exception { |
| MDCLoggingContext.setCoreDescriptor(cc, desc); |
| try { |
| // pre-register has published our down state |
| final String baseUrl = getBaseUrl(); |
| final CloudDescriptor cloudDesc = desc.getCloudDescriptor(); |
| final String collection = cloudDesc.getCollectionName(); |
| final String shardId = cloudDesc.getShardId(); |
| final String coreZkNodeName = cloudDesc.getCoreNodeName(); |
| assert coreZkNodeName != null : "we should have a coreNodeName by now"; |
| |
| // check replica's existence in clusterstate first |
| try { |
| zkStateReader.waitForState( |
| collection, |
| 100, |
| TimeUnit.MILLISECONDS, |
| (collectionState) -> |
| getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null); |
| } catch (TimeoutException e) { |
| throw new SolrException( |
| ErrorCode.SERVER_ERROR, |
| "Error registering SolrCore, timeout waiting for replica present in clusterstate"); |
| } |
| Replica replica = |
| getReplicaOrNull( |
| zkStateReader.getClusterState().getCollectionOrNull(collection), |
| shardId, |
| coreZkNodeName); |
| if (replica == null) { |
| throw new SolrException( |
| ErrorCode.SERVER_ERROR, |
| "Error registering SolrCore, replica is removed from clusterstate"); |
| } |
| |
| if (replica.getType().leaderEligible) { |
| getCollectionTerms(collection).register(cloudDesc.getShardId(), coreZkNodeName); |
| } |
| |
| ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId()); |
| |
| log.debug( |
| "Register replica - core:{} address:{} collection:{} shard:{}", |
| coreName, |
| baseUrl, |
| collection, |
| shardId); |
| |
| try { |
| // If we're a preferred leader, insert ourselves at the head of the queue |
| boolean joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false); |
| if (replica.getType().leaderEligible) { |
| joinElection(desc, afterExpiration, joinAtHead); |
| } else { |
| if (joinAtHead) { |
| log.warn( |
| "Replica {} was designated as preferred leader but its type is {}; it won't join the election", |
| coreZkNodeName, |
| replica.getType()); |
| } |
| if (log.isDebugEnabled()) { |
| log.debug( |
| "Replica {} skipping election because its type is {}", |
| coreZkNodeName, |
| replica.getType()); |
| } |
| } |
| } catch (InterruptedException e) { |
| // Restore the interrupted status |
| Thread.currentThread().interrupt(); |
| throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); |
| } catch (KeeperException | IOException e) { |
| throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); |
| } |
| |
| // in this case, we want to wait for the leader as long as the leader might |
| // wait for a vote, at least - but also long enough that a large cluster has |
| // time to get its act together |
| String leaderUrl = getLeader(cloudDesc, leaderVoteWait + 600000); |
| |
| String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName); |
| log.debug("We are {} and leader is {}", ourUrl, leaderUrl); |
| boolean isLeader = leaderUrl.equals(ourUrl); |
| assert !isLeader || replica.getType().leaderEligible |
| : replica.getType().name() + " replica became leader!"; |
| |
| try (SolrCore core = cc.getCore(desc.getName())) { |
| |
| // recover from local transaction log and wait for it to complete before |
| // going active |
| // TODO: should this be moved to another thread? To recoveryStrat? |
| // TODO: should this actually be done earlier, before (or as part of) |
| // leader election perhaps? |
| |
| if (core == null) { |
| throw new SolrException( |
| ErrorCode.SERVICE_UNAVAILABLE, "SolrCore is no longer available to register"); |
| } |
| |
| UpdateLog ulog = core.getUpdateHandler().getUpdateLog(); |
| boolean isTlogReplicaAndNotLeader = replica.getType() == Replica.Type.TLOG && !isLeader; |
| if (isTlogReplicaAndNotLeader) { |
| String commitVersion = ReplicateFromLeader.getCommitVersion(core); |
| if (commitVersion != null) { |
| ulog.copyOverOldUpdates(Long.parseLong(commitVersion)); |
| } |
| } |
| // we will call register again after zk expiration and on reload |
| if (!afterExpiration && !core.isReloaded() && ulog != null && !isTlogReplicaAndNotLeader) { |
| // disable recovery in case shard is in construction state (for shard splits) |
| Slice slice = getClusterState().getCollection(collection).getSlice(shardId); |
| if (slice.getState() != Slice.State.CONSTRUCTION || !isLeader) { |
| Future<UpdateLog.RecoveryInfo> recoveryFuture = |
| core.getUpdateHandler().getUpdateLog().recoverFromLog(); |
| if (recoveryFuture != null) { |
| log.info( |
| "Replaying tlog for {} during startup... NOTE: This can take a while.", ourUrl); |
| recoveryFuture.get(); // NOTE: this could potentially block for minutes or more! |
| // TODO: publish as recovering in the meantime? |
| // TODO: in the future we could do peersync in parallel with recoverFromLog |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("No LogReplay needed for core={} baseURL={}", core.getName(), baseUrl); |
| } |
| } |
| } |
| } |
| boolean didRecovery = |
| checkRecovery( |
| recoverReloadedCores, |
| isLeader, |
| skipRecovery, |
| collection, |
| coreZkNodeName, |
| shardId, |
| core, |
| cc, |
| afterExpiration); |
| if (!didRecovery) { |
| if (replica.getType().replicateFromLeader && !isLeader) { |
| startReplicationFromLeader(coreName, replica.getType().requireTransactionLog); |
| } |
| publish(desc, Replica.State.ACTIVE); |
| } |
| |
| if (replica.getType().leaderEligible) { |
| // the watcher is added to a set, so multiple calls of this method will leave only one |
| // watcher |
| shardTerms.addListener( |
| new RecoveringCoreTermWatcher(core.getCoreDescriptor(), getCoreContainer())); |
| } |
| core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true); |
| } catch (Exception e) { |
| unregister(coreName, desc, false); |
| throw e; |
| } |
| |
| // make sure we have an updated cluster state right away |
| zkStateReader.forceUpdateCollection(collection); |
| // the watcher is added to a set, so multiple calls of this method will leave only one watcher |
| zkStateReader.registerDocCollectionWatcher( |
| cloudDesc.getCollectionName(), |
| new UnloadCoreOnDeletedWatcher(coreZkNodeName, shardId, desc.getName())); |
| return shardId; |
| } finally { |
| MDCLoggingContext.clear(); |
| } |
| } |
| |
| private Replica getReplicaOrNull(DocCollection docCollection, String shard, String coreNodeName) { |
| if (docCollection == null) return null; |
| |
| Slice slice = docCollection.getSlice(shard); |
| if (slice == null) return null; |
| |
| Replica replica = slice.getReplica(coreNodeName); |
| if (replica == null) return null; |
| if (!getNodeName().equals(replica.getNodeName())) return null; |
| |
| return replica; |
| } |
| |
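| // Starts a ReplicateFromLeader for the given core unless one is already running; the |
| // instance is synchronized on to prevent a concurrent stop racing with the start. |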
| public void startReplicationFromLeader(String coreName, boolean switchTransactionLog) { |
| log.info("{} starting background replication from leader", coreName); |
| ReplicateFromLeader replicateFromLeader = new ReplicateFromLeader(cc, coreName); |
| synchronized ( |
| replicateFromLeader) { // synchronize to prevent any stop before we finish the start |
| if (replicateFromLeaders.putIfAbsent(coreName, replicateFromLeader) == null) { |
| replicateFromLeader.startReplication(switchTransactionLog); |
| } else { |
| log.warn("A replicate from leader instance already exists for core {}", coreName); |
| } |
| } |
| } |
| |
| public void stopReplicationFromLeader(String coreName) { |
| log.info("{} stopping background replication from leader", coreName); |
| ReplicateFromLeader replicateFromLeader = replicateFromLeaders.remove(coreName); |
| if (replicateFromLeader != null) { |
| synchronized (replicateFromLeader) { |
| replicateFromLeader.stopReplication(); |
| } |
| } |
| } |
| |
| // timeoutms is the timeout for the first call to get the leader - there is then |
| // a longer wait to make sure that leader matches our local state |
| private String getLeader(final CloudDescriptor cloudDesc, int timeoutms) { |
| |
| String collection = cloudDesc.getCollectionName(); |
| String shardId = cloudDesc.getShardId(); |
| // rather than look in the cluster state file, we go straight to the zknodes |
| // here, because on cluster restart there could be stale leader info in the |
| // cluster state node that won't be updated for a moment |
| String leaderUrl; |
| try { |
| leaderUrl = getLeaderProps(collection, cloudDesc.getShardId(), timeoutms).getCoreUrl(); |
| |
| // now wait until our current cloud state contains the latest leader; since we found it in |
| // zk, we are willing to wait a while for it to show up in state |
| String clusterStateLeaderUrl = zkStateReader.getLeaderUrl(collection, shardId, timeoutms * 2); |
| int tries = 0; |
| final int msInSec = 1000; |
| int maxTries = leaderConflictResolveWait / msInSec; |
| while (!leaderUrl.equals(clusterStateLeaderUrl)) { |
| if (cc.isShutDown()) throw new AlreadyClosedException(); |
| if (tries > maxTries) { |
| throw new SolrException( |
| ErrorCode.SERVER_ERROR, |
| "There is conflicting information about the leader of shard: " |
| + cloudDesc.getShardId() |
| + " our state says:" |
| + clusterStateLeaderUrl |
| + " but zookeeper says:" |
| + leaderUrl); |
| } |
| tries++; |
| if (tries % 30 == 0) { |
| String warnMsg = |
| String.format( |
| Locale.ENGLISH, |
| "Still seeing conflicting information about the leader " |
| + "of shard %s for collection %s after %d seconds; our state says %s, but ZooKeeper says %s", |
| cloudDesc.getShardId(), |
| collection, |
| tries, |
| clusterStateLeaderUrl, |
| leaderUrl); |
| log.warn(warnMsg); |
| } |
| Thread.sleep(msInSec); |
| clusterStateLeaderUrl = zkStateReader.getLeaderUrl(collection, shardId, timeoutms); |
| leaderUrl = getLeaderProps(collection, cloudDesc.getShardId(), timeoutms).getCoreUrl(); |
| } |
| |
| } catch (AlreadyClosedException e) { |
| throw e; |
| } catch (Exception e) { |
| log.error("Error getting leader from zk", e); |
| throw new SolrException( |
| SolrException.ErrorCode.SERVER_ERROR, |
| "Error getting leader from zk for shard " + shardId, |
| e); |
| } |
| return leaderUrl; |
| } |
| |
| /** |
| * Get leader props directly from zk nodes. |
| * |
| * @throws SessionExpiredException on zk session expiration. |
| */ |
| public ZkCoreNodeProps getLeaderProps(final String collection, final String slice, int timeoutms) |
| throws InterruptedException, SessionExpiredException { |
| return getLeaderProps(collection, slice, timeoutms, true); |
| } |
| |
| /** |
| * Get leader props directly from zk nodes. |
| * |
| * @return leader props |
| * @throws SessionExpiredException on zk session expiration. |
| */ |
| public ZkCoreNodeProps getLeaderProps( |
| final String collection, |
| final String slice, |
| int timeoutms, |
| boolean failImmediatelyOnExpiration) |
| throws InterruptedException, SessionExpiredException { |
| int iterCount = timeoutms / 1000; |
| Exception exp = null; |
| while (iterCount-- > 0) { |
| try { |
| byte[] data = |
| zkClient.getData( |
| ZkStateReader.getShardLeadersPath(collection, slice), null, null, true); |
return new ZkCoreNodeProps(ZkNodeProps.load(data));
| } catch (InterruptedException e) { |
| throw e; |
| } catch (SessionExpiredException e) { |
| if (failImmediatelyOnExpiration) { |
| throw e; |
| } |
| exp = e; |
| Thread.sleep(1000); |
| } catch (Exception e) { |
| exp = e; |
| Thread.sleep(1000); |
| } |
| if (cc.isShutDown()) { |
| throw new AlreadyClosedException(); |
| } |
| } |
| throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Could not get leader props", exp); |
| } |
| |
| private void joinElection(CoreDescriptor cd, boolean afterExpiration, boolean joinAtHead) |
| throws InterruptedException, KeeperException, IOException { |
| // look for old context - if we find it, cancel it |
| String collection = cd.getCloudDescriptor().getCollectionName(); |
| final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName(); |
| |
| ContextKey contextKey = new ContextKey(collection, coreNodeName); |
| |
| ElectionContext prevContext = electionContexts.get(contextKey); |
| |
| if (prevContext != null) { |
| prevContext.cancelElection(); |
| } |
| |
| String shardId = cd.getCloudDescriptor().getShardId(); |
| |
| Map<String, Object> props = new HashMap<>(); |
| // we only put a subset of props into the leader node |
| props.put(ZkStateReader.CORE_NAME_PROP, cd.getName()); |
| props.put(ZkStateReader.NODE_NAME_PROP, getNodeName()); |
| props.put(ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(getNodeName())); |
| props.put(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName); |
| |
| ZkNodeProps ourProps = new ZkNodeProps(props); |
| |
| LeaderElector leaderElector = new LeaderElector(zkClient, contextKey, electionContexts); |
| ElectionContext context = |
| new ShardLeaderElectionContext( |
| leaderElector, shardId, collection, coreNodeName, ourProps, this, cc); |
| |
| leaderElector.setup(context); |
| electionContexts.put(contextKey, context); |
| leaderElector.joinElection(context, false, joinAtHead); |
| } |
| |
/** Returns whether a recovery was started. */
| private boolean checkRecovery( |
| boolean recoverReloadedCores, |
| final boolean isLeader, |
| boolean skipRecovery, |
| final String collection, |
| String coreZkNodeName, |
| String shardId, |
| SolrCore core, |
| CoreContainer cc, |
| boolean afterExpiration) { |
| if (SKIP_AUTO_RECOVERY) { |
| log.warn("Skipping recovery according to sys prop solrcloud.skip.autorecovery"); |
| return false; |
| } |
| boolean doRecovery = true; |
| if (!isLeader) { |
| |
| if (skipRecovery || (!afterExpiration && core.isReloaded() && !recoverReloadedCores)) { |
| doRecovery = false; |
| } |
| |
| if (doRecovery) { |
| if (log.isInfoEnabled()) { |
| log.info("Core needs to recover:{}", core.getName()); |
| } |
| core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor()); |
| return true; |
| } |
| |
| ZkShardTerms zkShardTerms = getShardTerms(collection, shardId); |
| if (zkShardTerms.registered(coreZkNodeName) |
| && !zkShardTerms.canBecomeLeader(coreZkNodeName)) { |
| if (log.isInfoEnabled()) { |
| log.info("Leader's term larger than core {}; starting recovery process", core.getName()); |
| } |
| core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor()); |
| return true; |
| } |
| } else { |
| log.info("I am the leader, no recovery necessary"); |
| } |
| |
| return false; |
| } |
| |
| public String getBaseUrl() { |
| return baseURL; |
| } |
| |
| public void publish(final CoreDescriptor cd, final Replica.State state) |
| throws KeeperException, InterruptedException { |
| publish(cd, state, true, false); |
| } |
| |
| /** Publish core state to overseer. */ |
| public void publish( |
| final CoreDescriptor cd, |
| final Replica.State state, |
| boolean updateLastState, |
| boolean forcePublish) |
| throws KeeperException, InterruptedException { |
| if (!forcePublish) { |
| try (SolrCore core = cc.getCore(cd.getName())) { |
| if (core == null || core.isClosed()) { |
| return; |
| } |
| } |
| } |
| MDCLoggingContext.setCoreDescriptor(cc, cd); |
| try { |
| String collection = cd.getCloudDescriptor().getCollectionName(); |
| |
| log.debug("publishing state={}", state); |
| Integer numShards = cd.getCloudDescriptor().getNumShards(); |
| |
| assert collection != null && collection.length() > 0; |
| |
| String shardId = cd.getCloudDescriptor().getShardId(); |
| |
| String coreNodeName = cd.getCloudDescriptor().getCoreNodeName(); |
| |
| MapWriter m = |
| props -> { |
| props.put(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower()); |
| props.put(ZkStateReader.STATE_PROP, state.toString()); |
| props.put(ZkStateReader.CORE_NAME_PROP, cd.getName()); |
| props.put(ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles()); |
| props.put(ZkStateReader.NODE_NAME_PROP, getNodeName()); |
| props.put( |
| ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(getNodeName())); |
| props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId()); |
| props.put(ZkStateReader.COLLECTION_PROP, collection); |
| props.put( |
| ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().toString()); |
| props.put(ZkStateReader.FORCE_SET_STATE_PROP, "false"); |
| if (numShards != null) { |
| props.put(ZkStateReader.NUM_SHARDS_PROP, numShards.toString()); |
| } |
| props.putIfNotNull(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName); |
| }; |
| |
| try (SolrCore core = cc.getCore(cd.getName())) { |
| if (core != null && state == Replica.State.ACTIVE) { |
| ensureRegisteredSearcher(core); |
| } |
if (core != null && core.getDirectoryFactory().isSharedStorage()) {
| m = |
| m.append( |
| props -> { |
| props.put(ZkStateReader.SHARED_STORAGE_PROP, "true"); |
| props.put("dataDir", core.getDataDir()); |
| UpdateLog ulog = core.getUpdateHandler().getUpdateLog(); |
| if (ulog != null) { |
| props.put("ulogDir", ulog.getLogDir()); |
| } |
| }); |
| } |
| } |
| } catch (SolrCoreInitializationException ex) { |
| // The core had failed to initialize (in a previous request, not this one), hence nothing to |
| // do here. |
| if (log.isInfoEnabled()) { |
| log.info("The core '{}' had failed to initialize before.", cd.getName()); |
| } |
| } |
| |
| // pull replicas are excluded because their terms are not considered |
| if (state == Replica.State.RECOVERING |
| && cd.getCloudDescriptor().getReplicaType().leaderEligible) { |
// the state is used by clients and can flip from RECOVERING to DOWN without recovery ever
// finishing; by registering the recovering term here we can later tell whether a replica
// actually finished recovery
| getShardTerms(collection, shardId).startRecovering(coreNodeName); |
| } |
| if (state == Replica.State.ACTIVE |
| && cd.getCloudDescriptor().getReplicaType().leaderEligible) { |
| getShardTerms(collection, shardId).doneRecovering(coreNodeName); |
| } |
| |
| if (updateLastState) { |
| cd.getCloudDescriptor().setLastPublished(state); |
| } |
| DocCollection coll = zkStateReader.getCollection(collection); |
| if (forcePublish || updateStateDotJson(coll, coreNodeName)) { |
| if (distributedClusterStateUpdater.isDistributedStateUpdate()) { |
| distributedClusterStateUpdater.doSingleStateUpdate( |
| DistributedClusterStateUpdater.MutatingCommand.ReplicaSetState, |
| new ZkNodeProps(m), |
| getSolrCloudManager(), |
| zkStateReader); |
| } else { |
| overseerJobQueue.offer(m); |
| } |
| } |
| // extra handling for PRS, we need to write the PRS entries from this node directly, |
| // as overseer does not and should not handle those entries |
| if (coll != null && coll.isPerReplicaState() && coreNodeName != null) { |
| PerReplicaStates perReplicaStates = |
| PerReplicaStatesOps.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates()); |
| PerReplicaStatesOps.flipState(coreNodeName, state, perReplicaStates) |
| .persist(coll.getZNode(), zkClient); |
| } |
| } finally { |
| MDCLoggingContext.clear(); |
| } |
| } |
| |
| /** |
* Returns {@code true} if a message needs to be sent to the overseer (or applied in a
* distributed way) to update state.json for the collection. For per-replica-state collections,
* this is typically only needed when the replica's shard is involved in a shard split.
| */ |
| static boolean updateStateDotJson(DocCollection coll, String replicaName) { |
| if (coll == null) return true; |
| if (!coll.isPerReplicaState()) return true; |
| Replica r = coll.getReplica(replicaName); |
| if (r == null) return true; |
| Slice shard = coll.getSlice(r.shard); |
| if (shard == null) return true; // very unlikely |
| if (shard.getParent() != null) return true; |
| for (Slice slice : coll.getSlices()) { |
| if (Objects.equals(shard.getName(), slice.getParent())) return true; |
| } |
| return false; |
| } |
| |
| public ZkShardTerms getShardTerms(String collection, String shardId) { |
| return getCollectionTerms(collection).getShard(shardId); |
| } |
| |
| private ZkCollectionTerms getCollectionTerms(String collection) { |
| synchronized (collectionToTerms) { |
return collectionToTerms.computeIfAbsent(
    collection, col -> new ZkCollectionTerms(col, zkClient));
| } |
| } |
| |
| public void clearZkCollectionTerms() { |
| synchronized (collectionToTerms) { |
| collectionToTerms.values().forEach(ZkCollectionTerms::close); |
| collectionToTerms.clear(); |
| } |
| } |
| |
| public void unregister(String coreName, CoreDescriptor cd) throws Exception { |
| unregister(coreName, cd, true); |
| } |
| |
| public void unregister(String coreName, CoreDescriptor cd, boolean removeCoreFromZk) |
| throws Exception { |
| final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName(); |
| final String collection = cd.getCloudDescriptor().getCollectionName(); |
| getCollectionTerms(collection).remove(cd.getCloudDescriptor().getShardId(), cd); |
| replicasMetTragicEvent.remove(collection + ":" + coreNodeName); |
| |
| if (StrUtils.isNullOrEmpty(collection)) { |
| log.error("No collection was specified."); |
| assert false : "No collection was specified [" + collection + "]"; |
| return; |
| } |
| final DocCollection docCollection = |
| zkStateReader.getClusterState().getCollectionOrNull(collection); |
| Replica replica = (docCollection == null) ? null : docCollection.getReplica(coreNodeName); |
| |
| if (replica == null || replica.getType().leaderEligible) { |
| ElectionContext context = electionContexts.remove(new ContextKey(collection, coreNodeName)); |
| |
| if (context != null) { |
| context.cancelElection(); |
| } |
| } |
| CloudDescriptor cloudDescriptor = cd.getCloudDescriptor(); |
| if (removeCoreFromZk) { |
| // extra handling for PRS, we need to write the PRS entries from this node directly, |
| // as overseer does not and should not handle those entries |
| if (docCollection != null && docCollection.isPerReplicaState() && coreNodeName != null) { |
| if (log.isDebugEnabled()) { |
| log.debug( |
| "Unregistering core with coreNodeName {} of collection {} - deleting the PRS entries from ZK", |
| coreNodeName, |
| docCollection.getName()); |
| } |
| PerReplicaStates perReplicaStates = |
| PerReplicaStatesOps.fetch( |
| docCollection.getZNode(), zkClient, docCollection.getPerReplicaStates()); |
| PerReplicaStatesOps.deleteReplica(coreNodeName, perReplicaStates) |
| .persist(docCollection.getZNode(), zkClient); |
| } |
| MapWriter m = |
| ew -> |
| ew.put(Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower()) |
| .put(ZkStateReader.CORE_NAME_PROP, coreName) |
| .put(ZkStateReader.NODE_NAME_PROP, getNodeName()) |
| .put( |
| ZkStateReader.BASE_URL_PROP, |
| zkStateReader.getBaseUrlForNodeName(getNodeName())) |
| .put(ZkStateReader.COLLECTION_PROP, cloudDescriptor.getCollectionName()) |
| .put(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName); |
| if (distributedClusterStateUpdater.isDistributedStateUpdate()) { |
| distributedClusterStateUpdater.doSingleStateUpdate( |
| DistributedClusterStateUpdater.MutatingCommand.SliceRemoveReplica, |
| new ZkNodeProps(m), |
| getSolrCloudManager(), |
| zkStateReader); |
| } else { |
| overseerJobQueue.offer(m); |
| } |
| } |
| } |
| |
| public ZkStateReader getZkStateReader() { |
| return zkStateReader; |
| } |
| |
| private void doGetShardIdAndNodeNameProcess(CoreDescriptor cd) { |
| final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName(); |
| |
| if (coreNodeName != null) { |
| waitForShardId(cd); |
| } else { |
| // if no explicit coreNodeName, we want to match by base url and core name |
| waitForCoreNodeName(cd); |
| waitForShardId(cd); |
| } |
| } |
| |
| private void waitForCoreNodeName(CoreDescriptor descriptor) { |
| log.debug("waitForCoreNodeName >>> look for our core node name"); |
| try { |
| DocCollection collection = |
| zkStateReader.waitForState( |
| descriptor.getCollectionName(), |
| 320L, |
| TimeUnit.SECONDS, |
| c -> |
| ClusterStateMutator.getAssignedCoreNodeName( |
| c, getNodeName(), descriptor.getName()) |
| != null); |
| // Read outside of the predicate to avoid multiple potential writes |
| String name = |
| ClusterStateMutator.getAssignedCoreNodeName( |
| collection, getNodeName(), descriptor.getName()); |
| descriptor.getCloudDescriptor().setCoreNodeName(name); |
| } catch (TimeoutException | InterruptedException e) { |
| SolrZkClient.checkInterrupted(e); |
| throw new SolrException(ErrorCode.SERVER_ERROR, "Failed waiting for collection state", e); |
| } |
| getCoreContainer().getCoresLocator().persist(getCoreContainer(), descriptor); |
| } |
| |
| private void waitForShardId(final CoreDescriptor cd) { |
| if (log.isDebugEnabled()) { |
| log.debug("waiting to find shard id in clusterstate for {}", cd.getName()); |
| } |
| try { |
| DocCollection collection = |
| zkStateReader.waitForState( |
| cd.getCollectionName(), |
| 320, |
| TimeUnit.SECONDS, |
| c -> c != null && c.getShardId(getNodeName(), cd.getName()) != null); |
| // Read outside of the predicate to avoid multiple potential writes |
| cd.getCloudDescriptor().setShardId(collection.getShardId(getNodeName(), cd.getName())); |
| } catch (TimeoutException | InterruptedException e) { |
| SolrZkClient.checkInterrupted(e); |
| throw new SolrException( |
| ErrorCode.SERVER_ERROR, "Failed getting shard id for core: " + cd.getName(), e); |
| } |
| } |
| |
| public String getCoreNodeName(CoreDescriptor descriptor) { |
| String coreNodeName = descriptor.getCloudDescriptor().getCoreNodeName(); |
| if (coreNodeName == null && !genericCoreNodeNames) { |
// derive the default coreNodeName from the node name and core name
| return getNodeName() + "_" + descriptor.getName(); |
| } |
| |
| return coreNodeName; |
| } |
| |
| public void preRegister(CoreDescriptor cd, boolean publishState) { |
| |
| String coreNodeName = getCoreNodeName(cd); |
| |
| // before becoming available, make sure we are not live and active |
| // this also gets us our assigned shard id if it was not specified |
| try { |
| checkStateInZk(cd); |
| |
| CloudDescriptor cloudDesc = cd.getCloudDescriptor(); |
| |
| // make sure the node name is set on the descriptor |
| if (cloudDesc.getCoreNodeName() == null) { |
| cloudDesc.setCoreNodeName(coreNodeName); |
| } |
| |
| // publishState == false on startup |
| if (publishState || isPublishAsDownOnStartup(cloudDesc)) { |
| publish(cd, Replica.State.DOWN, false, true); |
| } |
| String collectionName = cd.getCloudDescriptor().getCollectionName(); |
| DocCollection collection = |
| zkStateReader.getClusterState().getCollectionOrNull(collectionName); |
| if (log.isDebugEnabled()) { |
| log.debug( |
| collection == null |
| ? "Collection {} not visible yet, but flagging it so a watch is registered when it becomes visible" |
| : "Registering watch for collection {}", |
| collectionName); |
| } |
| } catch (KeeperException e) { |
| log.error("", e); |
| throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| log.error("", e); |
| throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); |
| } catch (NotInClusterStateException e) { |
| // make the stack trace less verbose |
| throw e; |
| } catch (Exception e) { |
| log.error("", e); |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", e); |
| } |
| |
| doGetShardIdAndNodeNameProcess(cd); |
| } |
| |
| /** |
* On startup, the node has already published all of its replicas as DOWN, so publishing this
* replica as down again can be skipped.
*
* @return whether the replica should be published as DOWN on startup
| */ |
| private boolean isPublishAsDownOnStartup(CloudDescriptor cloudDesc) { |
| Replica replica = |
| zkStateReader |
| .getClusterState() |
| .getCollection(cloudDesc.getCollectionName()) |
| .getSlice(cloudDesc.getShardId()) |
| .getReplica(cloudDesc.getCoreNodeName()); |
| return !replica.getNodeName().equals(getNodeName()); |
| } |
| |
| private void checkStateInZk(CoreDescriptor cd) |
| throws InterruptedException, NotInClusterStateException { |
| CloudDescriptor cloudDesc = cd.getCloudDescriptor(); |
| String nodeName = cloudDesc.getCoreNodeName(); |
| if (nodeName == null) { |
| throw new SolrException(ErrorCode.SERVER_ERROR, "No coreNodeName for " + cd); |
| } |
| final String coreNodeName = nodeName; |
| |
| if (cloudDesc.getShardId() == null) { |
| throw new SolrException(ErrorCode.SERVER_ERROR, "No shard id for " + cd); |
| } |
| |
| AtomicReference<String> errorMessage = new AtomicReference<>(); |
| try { |
| zkStateReader.waitForState( |
| cd.getCollectionName(), |
| 10, |
| TimeUnit.SECONDS, |
| (c) -> { |
| if (c == null) return false; |
| Slice slice = c.getSlice(cloudDesc.getShardId()); |
| if (slice == null) { |
| errorMessage.set("Invalid shard: " + cloudDesc.getShardId()); |
| return false; |
| } |
| Replica replica = slice.getReplica(coreNodeName); |
| if (replica == null) { |
| errorMessage.set( |
| "coreNodeName " |
| + coreNodeName |
| + " does not exist in shard " |
| + cloudDesc.getShardId() |
| + ", ignore the exception if the replica was deleted"); |
| return false; |
| } |
| return true; |
| }); |
| } catch (TimeoutException e) { |
| String error = errorMessage.get(); |
| if (error == null) |
| error = |
| "coreNodeName " |
| + coreNodeName |
| + " does not exist in shard " |
| + cloudDesc.getShardId() |
| + ", ignore the exception if the replica was deleted"; |
| throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error); |
| } |
| } |
| |
| /** Attempts to cancel all leader elections. This method should be called on node shutdown. */ |
| public void tryCancelAllElections() { |
| if (zkClient.isClosed()) { |
| return; |
| } |
| Collection<ElectionContext> values = electionContexts.values(); |
| synchronized (electionContexts) { |
| values.forEach( |
| context -> { |
| try { |
| context.cancelElection(); |
| context.close(); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } catch (KeeperException e) { |
| log.warn("Error on cancelling elections of {}", context.leaderPath, e); |
| } |
| }); |
| } |
| } |
| |
| private ZkCoreNodeProps waitForLeaderToSeeDownState( |
| CoreDescriptor descriptor, final String coreZkNodeName) throws SessionExpiredException { |
| // try not to wait too long here - if we are waiting too long, we should probably |
| // move along and join the election |
| |
| CloudDescriptor cloudDesc = descriptor.getCloudDescriptor(); |
| String collection = cloudDesc.getCollectionName(); |
| String shard = cloudDesc.getShardId(); |
| ZkCoreNodeProps leaderProps = null; |
| |
| int retries = 2; |
| for (int i = 0; i < retries; i++) { |
| try { |
| if (isClosed) { |
| throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "We have been closed"); |
| } |
| |
| // go straight to zk, not the cloud state - we want current info |
| leaderProps = getLeaderProps(collection, shard, 5000); |
| break; |
| } catch (SessionExpiredException e) { |
| throw e; |
| } catch (Exception e) { |
| log.info("Did not find the leader in Zookeeper", e); |
| try { |
| Thread.sleep(2000); |
| } catch (InterruptedException e1) { |
| Thread.currentThread().interrupt(); |
| } |
| if (i == retries - 1) { |
| throw new SolrException( |
| ErrorCode.SERVER_ERROR, "There was a problem finding the leader in zk"); |
| } |
| } |
| } |
| |
| String leaderBaseUrl = leaderProps.getBaseUrl(); |
| String leaderCoreName = leaderProps.getCoreName(); |
| |
| String myCoreNodeName = cloudDesc.getCoreNodeName(); |
| String myCoreName = descriptor.getName(); |
| String ourUrl = ZkCoreNodeProps.getCoreUrl(getBaseUrl(), myCoreName); |
| |
| boolean isLeader = leaderProps.getCoreUrl().equals(ourUrl); |
| if (!isLeader && !SKIP_AUTO_RECOVERY) { |
| if (!getShardTerms(collection, shard).canBecomeLeader(myCoreNodeName)) { |
| log.debug( |
| "Term of replica {} is already less than leader, so not waiting for leader to see down state.", |
| myCoreNodeName); |
| } else { |
| |
| if (log.isInfoEnabled()) { |
| log.info( |
| "replica={} is making a best effort attempt to wait for leader={} to see it's DOWN state.", |
| myCoreNodeName, |
| leaderProps.getCoreUrl()); |
| } |
| |
| // short timeouts, we may be in a storm and this is best effort, and maybe we should be the |
| // leader now |
| try (SolrClient client = |
| new Builder(leaderBaseUrl) |
| .withConnectionTimeout(8000, TimeUnit.MILLISECONDS) |
| .withSocketTimeout(30000, TimeUnit.MILLISECONDS) |
| .build()) { |
| WaitForState prepCmd = new WaitForState(); |
| prepCmd.setCoreName(leaderCoreName); |
| prepCmd.setNodeName(getNodeName()); |
| prepCmd.setCoreNodeName(coreZkNodeName); |
| prepCmd.setState(Replica.State.DOWN); |
| |
// let's give it another chance, but without taking too long
| retries = 3; |
| for (int i = 0; i < retries; i++) { |
| if (isClosed) { |
| throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "We have been closed"); |
| } |
| try { |
| client.request(prepCmd); |
| break; |
| } catch (Exception e) { |
| |
| // if the core container is shutdown, don't wait |
| if (cc.isShutDown()) { |
| throw new SolrException( |
| ErrorCode.SERVICE_UNAVAILABLE, "Core container is shutdown."); |
| } |
| |
| Throwable rootCause = SolrException.getRootCause(e); |
| if (rootCause instanceof IOException) { |
| // if there was a communication error talking to the leader, see if the leader is |
| // even alive |
| if (!zkStateReader.getClusterState().liveNodesContain(leaderProps.getNodeName())) { |
| throw new SolrException( |
| ErrorCode.SERVICE_UNAVAILABLE, |
| "Node " |
| + leaderProps.getNodeName() |
| + " hosting leader for " |
| + shard |
| + " in " |
| + collection |
| + " is not live!"); |
| } |
| } |
| |
| log.error("There was a problem making a request to the leader", e); |
| try { |
| Thread.sleep(2000); |
| } catch (InterruptedException e1) { |
| Thread.currentThread().interrupt(); |
| } |
| if (i == retries - 1) { |
| throw new SolrException( |
| ErrorCode.SERVER_ERROR, "There was a problem making a request to the leader"); |
| } |
| } |
| } |
| } catch (IOException e) { |
| log.error("Error closing HttpSolrClient", e); |
| } |
| } |
| } |
| return leaderProps; |
| } |
| |
| public static void linkConfSet(SolrZkClient zkClient, String collection, String confSetName) |
| throws KeeperException, InterruptedException { |
| String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection; |
| log.debug("Load collection config from:{}", path); |
| byte[] data; |
| try { |
| data = zkClient.getData(path, null, null, true); |
| } catch (NoNodeException e) { |
// if there is no node, try to create it first,
// in case we are pre-configuring
| ZkNodeProps props = new ZkNodeProps(CONFIGNAME_PROP, confSetName); |
| try { |
| |
| zkClient.makePath(path, Utils.toJSON(props), CreateMode.PERSISTENT, null, true); |
| } catch (KeeperException e2) { |
| // it's okay if the node already exists |
| if (e2.code() != KeeperException.Code.NODEEXISTS) { |
| throw e; |
| } |
// if creating fails, set the data instead
| // TODO: we should consider using version |
| zkClient.setData(path, Utils.toJSON(props), true); |
| } |
| return; |
| } |
| // we found existing data, let's update it |
| ZkNodeProps props = null; |
| if (data != null) { |
| props = ZkNodeProps.load(data); |
| Map<String, Object> newProps = new HashMap<>(props.getProperties()); |
| newProps.put(CONFIGNAME_PROP, confSetName); |
| props = new ZkNodeProps(newProps); |
| } else { |
| props = new ZkNodeProps(CONFIGNAME_PROP, confSetName); |
| } |
| |
| // TODO: we should consider using version |
| zkClient.setData(path, Utils.toJSON(props), true); |
| } |
| |
| public ZkDistributedQueue getOverseerJobQueue() { |
| if (distributedClusterStateUpdater.isDistributedStateUpdate()) { |
| throw new IllegalStateException( |
| "Cluster is configured with distributed state update, not expecting the queue to be retrieved"); |
| } |
| return overseerJobQueue; |
| } |
| |
| public OverseerTaskQueue getOverseerCollectionQueue() { |
| return overseerCollectionQueue; |
| } |
| |
| public OverseerTaskQueue getOverseerConfigSetQueue() { |
| return overseerConfigSetQueue; |
| } |
| |
| public DistributedMap getOverseerRunningMap() { |
| return overseerRunningMap; |
| } |
| |
| public DistributedMap getOverseerCompletedMap() { |
| return overseerCompletedMap; |
| } |
| |
| public DistributedMap getOverseerFailureMap() { |
| return overseerFailureMap; |
| } |
| |
| /** |
* When an operation needs to be performed in asynchronous mode, the asyncId must be claimed by
* calling this method, to make sure it is not a duplicate (i.e. has not been claimed by another
* request). If this method returns true, the given asyncId has been reserved for the operation
* and no other thread/operation can claim it. If, for whatever reason, the operation is not
* scheduled, the asyncId needs to be cleared using {@link #clearAsyncId(String)}. If this
* method returns false, no reservation has been made, and this asyncId can't be used, since it
* is being used by another operation (currently or in the past).
| * |
| * @param asyncId A string representing the asyncId of an operation. Can't be null. |
| * @return True if the reservation succeeds. False if this ID is already in use. |
| */ |
| public boolean claimAsyncId(String asyncId) throws KeeperException { |
| try { |
| return asyncIdsMap.putIfAbsent(asyncId, new byte[0]); |
| } catch (InterruptedException e) { |
| log.error("Could not claim asyncId={}", asyncId, e); |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| /** |
| * Clears an asyncId previously claimed by calling {@link #claimAsyncId(String)} |
| * |
| * @param asyncId A string representing the asyncId of an operation. Can't be null. |
| * @return True if the asyncId existed and was cleared. False if the asyncId didn't exist before. |
| */ |
| public boolean clearAsyncId(String asyncId) throws KeeperException { |
| try { |
| return asyncIdsMap.remove(asyncId); |
| } catch (InterruptedException e) { |
| log.error("Could not release asyncId={}", asyncId, e); |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public Overseer getOverseer() { |
| return overseer; |
| } |
| |
| public LeaderElector getOverseerElector() { |
| return overseerElector; |
| } |
| |
| /** |
| * Returns the nodeName that should be used based on the specified properties. |
| * |
| * @param hostName - must not be null or the empty string |
| * @param hostPort - must consist only of digits, must not be null or the empty string |
| * @lucene.experimental |
| * @see ZkStateReader#getBaseUrlForNodeName |
| */ |
| static String generateNodeName(final String hostName, final String hostPort) { |
| return hostName + ':' + hostPort + '_' + "solr"; |
| } |
| |
| public void rejoinOverseerElection(String electionNode, boolean joinAtHead) { |
| try { |
| final ElectionContext context = overseerElector.getContext(); |
| if (electionNode != null) { |
| // Check whether we came to this node by mistake |
| if (context != null |
| && context.leaderSeqPath != null |
| && !context.leaderSeqPath.endsWith(electionNode)) { |
| log.warn( |
| "Asked to rejoin with wrong election node : {}, current node is {}", |
| electionNode, |
| context.leaderSeqPath); |
// delete it anyway; this is possible when the last attempt at deleting the election
// node failed.
| if (electionNode.startsWith(getNodeName())) { |
| try { |
| zkClient.delete( |
| Overseer.OVERSEER_ELECT + LeaderElector.ELECTION_NODE + "/" + electionNode, |
| -1, |
| true); |
| } catch (NoNodeException e) { |
| // no problem |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } catch (Exception e) { |
| log.warn("Old election node exists , could not be removed ", e); |
| } |
| } |
| } else { // We're in the right place, now attempt to rejoin |
| overseerElector.retryElection( |
| new OverseerElectionContext(zkClient, overseer, getNodeName()), joinAtHead); |
| return; |
| } |
| } else { |
| overseerElector.retryElection(context, joinAtHead); |
| } |
| } catch (Exception e) { |
| throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to rejoin election", e); |
| } |
| } |
| |
| public void rejoinShardLeaderElection(SolrParams params) { |
| |
| String collectionName = params.get(COLLECTION_PROP); |
| String coreNodeName = params.get(CORE_NODE_NAME_PROP); |
| String coreName = params.get(CORE_NAME_PROP); |
| boolean rejoinAtHead = params.getBool(REJOIN_AT_HEAD_PROP, false); |
| |
| try { |
| MDCLoggingContext.setCoreDescriptor(cc, cc.getCoreDescriptor(coreName)); |
| |
| log.info("Rejoin the shard leader election."); |
| |
| ContextKey contextKey = new ContextKey(collectionName, coreNodeName); |
| |
| ElectionContext prevContext = electionContexts.get(contextKey); |
| |
| String baseUrl = zkStateReader.getBaseUrlForNodeName(getNodeName()); |
| String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName); |
| |
| LeaderElector elect = ((ShardLeaderElectionContextBase) prevContext).getLeaderElector(); |
| |
| elect.retryElection(prevContext, rejoinAtHead); |
| |
| try (SolrCore core = cc.getCore(coreName)) { |
| Replica.Type replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType(); |
| if (replicaType.replicateFromLeader) { |
| String leaderUrl = |
| getLeader( |
| core.getCoreDescriptor().getCloudDescriptor(), cloudConfig.getLeaderVoteWait()); |
| if (!leaderUrl.equals(ourUrl)) { |
| // restart the replication thread to ensure the replication is running in each new |
| // replica especially if previous role is "leader" (i.e., no replication thread) |
| stopReplicationFromLeader(coreName); |
| startReplicationFromLeader(coreName, replicaType.requireTransactionLog); |
| } |
| } |
| } |
| } catch (Exception e) { |
| throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to rejoin election", e); |
| } finally { |
| MDCLoggingContext.clear(); |
| } |
| } |
| |
| public void checkOverseerDesignate() { |
| try { |
| byte[] data = zkClient.getData(ZkStateReader.ROLES, null, new Stat(), true); |
| if (data == null) return; |
| Map<?, ?> roles = (Map<?, ?>) Utils.fromJSON(data); |
| if (roles == null) return; |
| List<?> nodeList = (List<?>) roles.get("overseer"); |
| if (nodeList == null) return; |
| if (nodeList.contains(getNodeName())) { |
| setPreferredOverseer(); |
| } |
| } catch (NoNodeException nne) { |
| return; |
| } catch (Exception e) { |
| log.warn("could not read the overseer designate ", e); |
| } |
| } |
| |
| public void setPreferredOverseer() throws KeeperException, InterruptedException { |
| MapWriter props = |
| ew -> |
| ew.put(Overseer.QUEUE_OPERATION, ADDROLE.toString().toLowerCase(Locale.ROOT)) |
| .put(getNodeName(), getNodeName()) |
| .put("role", "overseer") |
| .put("persist", "false"); |
| log.warn( |
| "Going to add role {}. It is deprecated to use ADDROLE and consider using Node Roles instead.", |
| props.jsonStr()); |
| getOverseerCollectionQueue().offer(props); |
| } |
| |
| public CoreContainer getCoreContainer() { |
| return cc; |
| } |
| |
| public void throwErrorIfReplicaReplaced(CoreDescriptor desc) { |
| ClusterState clusterState = getZkStateReader().getClusterState(); |
| if (clusterState != null) { |
| DocCollection collection = |
| clusterState.getCollectionOrNull(desc.getCloudDescriptor().getCollectionName()); |
| if (collection != null) { |
| CloudUtil.checkSharedFSFailoverReplaced(cc, desc); |
| } |
| } |
| } |
| |
| /** |
| * Add a listener to be notified once there is a new session created after a ZooKeeper session |
| * expiration occurs; in most cases, listeners will be components that have watchers that need to |
| * be re-created. |
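*
* <p>A minimal sketch (illustrative; what the listener does on reconnect is an assumption
* about a typical caller):
*
* <pre>{@code
* zkController.addOnReconnectListener(() -> {
*   // re-create any ZooKeeper watches this component owns
* });
* }</pre>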
| */ |
| public void addOnReconnectListener(OnReconnect listener) { |
| if (listener != null) { |
| synchronized (reconnectListeners) { |
| reconnectListeners.add(listener); |
| log.debug("Added new OnReconnect listener {}", listener); |
| } |
| } |
| } |
| |
| /** |
* Removes a previously registered OnReconnect listener, such as when a core is removed or
| * reloaded. |
| */ |
| public void removeOnReconnectListener(OnReconnect listener) { |
| if (listener != null) { |
| boolean wasRemoved; |
| synchronized (reconnectListeners) { |
| wasRemoved = reconnectListeners.remove(listener); |
| } |
| if (wasRemoved) { |
| log.debug("Removed OnReconnect listener {}", listener); |
| } else { |
| log.warn( |
| "Was asked to remove OnReconnect listener {}, but remove operation " |
| + "did not find it in the list of registered listeners.", |
| listener); |
| } |
| } |
| } |
| |
| @SuppressWarnings({"unchecked"}) |
| Set<OnReconnect> getCurrentOnReconnectListeners() { |
| HashSet<OnReconnect> clonedListeners; |
| synchronized (reconnectListeners) { |
| clonedListeners = (HashSet<OnReconnect>) reconnectListeners.clone(); |
| } |
| return clonedListeners; |
| } |
| |
| /** |
| * Persists a config file to ZooKeeper using optimistic concurrency. |
| * |
* @return the znode version of the persisted resource on success
| */ |
| public static int persistConfigResourceToZooKeeper( |
| ZkSolrResourceLoader zkLoader, |
| int znodeVersion, |
| String resourceName, |
| byte[] content, |
| boolean createIfNotExists) { |
| int latestVersion = znodeVersion; |
| final ZkController zkController = zkLoader.getZkController(); |
| final SolrZkClient zkClient = zkController.getZkClient(); |
| final String resourceLocation = zkLoader.getConfigSetZkPath() + "/" + resourceName; |
| String errMsg = "Failed to persist resource at {0} - old {1}"; |
| try { |
| try { |
| Stat stat = zkClient.setData(resourceLocation, content, znodeVersion, true); |
| // if the set succeeded, it should have incremented the version by one always |
| latestVersion = stat.getVersion(); |
| log.info("Persisted config data to node {} ", resourceLocation); |
| touchConfDir(zkLoader); |
| } catch (NoNodeException e) { |
| if (createIfNotExists) { |
| try { |
| zkClient.create(resourceLocation, content, CreateMode.PERSISTENT, true); |
| latestVersion = 0; // just created so version must be zero |
| touchConfDir(zkLoader); |
| } catch (KeeperException.NodeExistsException nee) { |
| try { |
| Stat stat = zkClient.exists(resourceLocation, null, true); |
| if (log.isDebugEnabled()) { |
| log.debug( |
| "failed to set data version in zk is {} and expected version is {} ", |
| stat.getVersion(), |
| znodeVersion); |
| } |
| } catch (Exception e1) { |
| log.warn("could not get stat"); |
| } |
| |
| if (log.isInfoEnabled()) { |
| log.info(StrUtils.formatString(errMsg, resourceLocation, znodeVersion)); |
| } |
| throw new ResourceModifiedInZkException( |
| ErrorCode.CONFLICT, |
| StrUtils.formatString(errMsg, resourceLocation, znodeVersion) + ", retry."); |
| } |
| } |
| } |
| |
| } catch (KeeperException.BadVersionException bve) { |
| try { |
| zkClient.exists(resourceLocation, null, true); |
| } catch (Exception e) { |
| log.error("Exception during ZooKeeper node checking ", e); |
| } |
| if (log.isInfoEnabled()) { |
log.info(StrUtils.formatString(errMsg, resourceLocation, znodeVersion));
| } |
| throw new ResourceModifiedInZkException( |
| ErrorCode.CONFLICT, |
| StrUtils.formatString(errMsg, resourceLocation, znodeVersion) + ", retry."); |
| } catch (ResourceModifiedInZkException e) { |
| throw e; |
| } catch (Exception e) { |
| if (e instanceof InterruptedException) { |
| Thread.currentThread().interrupt(); // Restore the interrupted status |
| } |
| final String msg = "Error persisting resource at " + resourceLocation; |
| log.error(msg, e); |
| throw new SolrException(ErrorCode.SERVER_ERROR, msg, e); |
| } |
| return latestVersion; |
| } |
| |
| public static void touchConfDir(ZkSolrResourceLoader zkLoader) { |
| SolrZkClient zkClient = zkLoader.getZkController().getZkClient(); |
| String configSetZkPath = zkLoader.getConfigSetZkPath(); |
| try { |
| // Ensure that version gets updated by replacing data with itself. |
| // If there is no existing data then set it to byte[] {0}. |
| // This should trigger any watchers if necessary as well. |
| zkClient.atomicUpdate(configSetZkPath, bytes -> bytes == null ? TOUCHED_ZNODE_DATA : bytes); |
| } catch (Exception e) { |
| if (e instanceof InterruptedException) { |
| Thread.currentThread().interrupt(); // Restore the interrupted status |
| } |
| final String msg = "Error 'touching' conf location " + configSetZkPath; |
| log.error(msg, e); |
| throw new SolrException(ErrorCode.SERVER_ERROR, msg, e); |
| } |
| } |
| |
| public static class ResourceModifiedInZkException extends SolrException { |
| public ResourceModifiedInZkException(ErrorCode code, String msg) { |
| super(code, msg); |
| } |
| } |
| |
| private void unregisterConfListener(String confDir, Runnable listener) { |
| synchronized (confDirectoryListeners) { |
| final Set<Runnable> listeners = confDirectoryListeners.get(confDir); |
| if (listeners == null) { |
| log.warn( |
| "{} has no more registered listeners, but a live one attempted to unregister!", |
| confDir); |
| return; |
| } |
| if (listeners.remove(listener)) { |
| log.debug("removed listener for config directory [{}]", confDir); |
| } |
| if (listeners.isEmpty()) { |
| // no more listeners for this confDir, remove it from the map |
| log.debug("No more listeners for config directory [{}]", confDir); |
| confDirectoryListeners.remove(confDir); |
| } |
| } |
| } |
| |
| /** |
| * This will give a callback to the listener whenever a child is modified in the conf directory. |
| * It is the responsibility of the listener to check if the individual item of interest has been |
* modified. When the last core interested in this conf directory is gone, the listeners
* will be removed automatically.
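*
* <p>A minimal usage sketch (illustrative; the reload action is an assumption about a typical
* listener):
*
* <pre>{@code
* zkController.registerConfListenerForCore(confDir, core, () -> {
*   // a child of confDir changed in ZK; check whether the resource of interest changed
*   // and, if so, reload it
* });
* }</pre>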
| */ |
| public void registerConfListenerForCore( |
| final String confDir, SolrCore core, final Runnable listener) { |
| if (listener == null) { |
| throw new NullPointerException("listener cannot be null"); |
| } |
| synchronized (confDirectoryListeners) { |
| final Set<Runnable> confDirListeners = getConfDirListeners(confDir); |
| confDirListeners.add(listener); |
| core.addCloseHook( |
| new CloseHook() { |
| @Override |
| public void preClose(SolrCore core) { |
| unregisterConfListener(confDir, listener); |
| } |
| }); |
| } |
| } |
| |
// this method must only be called while holding the confDirectoryListeners lock
| private Set<Runnable> getConfDirListeners(final String confDir) { |
| assert Thread.holdsLock(confDirectoryListeners) : "confDirListeners lock not held by thread"; |
| Set<Runnable> confDirListeners = confDirectoryListeners.get(confDir); |
| if (confDirListeners == null) { |
| log.debug("watch zkdir {}", confDir); |
| confDirListeners = new HashSet<>(); |
| confDirectoryListeners.put(confDir, confDirListeners); |
| setConfWatcher(confDir, new WatcherImpl(confDir), null); |
| } |
| return confDirListeners; |
| } |
| |
| private final Map<String, Set<Runnable>> confDirectoryListeners = new HashMap<>(); |
| |
| private class WatcherImpl implements Watcher { |
| private final String zkDir; |
| |
| private WatcherImpl(String dir) { |
| this.zkDir = dir; |
| } |
| |
| @Override |
| public void process(WatchedEvent event) { |
| // session events are not change events, and do not remove the watcher |
| if (Event.EventType.None.equals(event.getType())) { |
| return; |
| } |
| |
| Stat stat = null; |
| try { |
| stat = zkClient.exists(zkDir, null, true); |
| } catch (KeeperException e) { |
// ignore, it is not a big deal
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| |
| boolean resetWatcher = false; |
| try { |
| resetWatcher = fireEventListeners(zkDir); |
} finally {
// the early return above guarantees this is a change event, not a session event
if (resetWatcher) setConfWatcher(zkDir, this, stat);
else log.debug("A node got unwatched for {}", zkDir);
}
| } |
| } |
| |
| private boolean fireEventListeners(String zkDir) { |
| if (isClosed || cc.isShutDown()) { |
| return false; |
| } |
| synchronized (confDirectoryListeners) { |
| // if this is not among directories to be watched then don't set the watcher anymore |
| if (!confDirectoryListeners.containsKey(zkDir)) { |
| log.debug("Watcher on {} is removed ", zkDir); |
| return false; |
| } |
| final Set<Runnable> listeners = confDirectoryListeners.get(zkDir); |
| if (listeners != null && !listeners.isEmpty()) { |
| final Set<Runnable> listenersCopy = new HashSet<>(listeners); |
| // run these in a separate thread because this can be long running |
| Runnable work = |
| () -> { |
| log.debug("Running listeners for {}", zkDir); |
| for (final Runnable listener : listenersCopy) { |
| try { |
| listener.run(); |
| } catch (RuntimeException e) { |
| log.warn("listener throws error", e); |
| } |
| } |
| }; |
| cc.getCoreZkRegisterExecutorService().submit(work); |
| } |
| } |
| return true; |
| } |
| |
| private void setConfWatcher(String zkDir, Watcher watcher, Stat stat) { |
| try { |
| Stat newStat = zkClient.exists(zkDir, watcher, true); |
| if (stat != null && newStat.getVersion() > stat.getVersion()) { |
// a race condition where we missed a fired event,
// so fire the event listeners ourselves
| fireEventListeners(zkDir); |
| } |
| } catch (KeeperException e) { |
| log.error("failed to set watcher for conf dir {} ", zkDir); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| log.error("failed to set watcher for conf dir {} ", zkDir); |
| } |
| } |
| |
| public OnReconnect getConfigDirListener() { |
| return () -> { |
| synchronized (confDirectoryListeners) { |
| for (String s : confDirectoryListeners.keySet()) { |
| setConfWatcher(s, new WatcherImpl(s), null); |
| fireEventListeners(s); |
| } |
| } |
| }; |
| } |
| |
| /** |
| * @lucene.internal |
| */ |
| class UnloadCoreOnDeletedWatcher implements DocCollectionWatcher { |
| String coreNodeName; |
| String shard; |
| String coreName; |
| |
| public UnloadCoreOnDeletedWatcher(String coreNodeName, String shard, String coreName) { |
| this.coreNodeName = coreNodeName; |
| this.shard = shard; |
| this.coreName = coreName; |
| } |
| |
| @Override |
| // synchronized due to SOLR-11535 |
| // TODO: can we remove `synchronized`, now that SOLR-11535 is fixed? |
| public synchronized boolean onStateChanged(DocCollection collectionState) { |
| if (getCoreContainer().getCoreDescriptor(coreName) == null) return true; |
| |
| boolean replicaRemoved = getReplicaOrNull(collectionState, shard, coreNodeName) == null; |
| if (replicaRemoved) { |
| try { |
| log.info("Replica {} removed from clusterstate, remove it.", coreName); |
| getCoreContainer().unload(coreName, true, true, true); |
| } catch (SolrException e) { |
| if (!e.getMessage().contains("Cannot unload non-existent core")) { |
| // no need to log if the core was already unloaded |
| log.warn("Failed to unregister core:{}", coreName, e); |
| } |
| } catch (Exception e) { |
| log.warn("Failed to unregister core:{}", coreName, e); |
| } |
| } |
| return replicaRemoved; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) return true; |
| if (!(o instanceof UnloadCoreOnDeletedWatcher)) return false; |
| UnloadCoreOnDeletedWatcher that = (UnloadCoreOnDeletedWatcher) o; |
| return Objects.equals(coreNodeName, that.coreNodeName) |
| && Objects.equals(shard, that.shard) |
| && Objects.equals(coreName, that.coreName); |
| } |
| |
| @Override |
| public int hashCode() { |
| |
| return Objects.hash(coreNodeName, shard, coreName); |
| } |
| } |
| |
| /** Thrown during pre register process if the replica is not present in clusterstate */ |
| public static class NotInClusterStateException extends SolrException { |
| public NotInClusterStateException(ErrorCode code, String msg) { |
| super(code, msg); |
| } |
| } |
| |
| public boolean checkIfCoreNodeNameAlreadyExists(CoreDescriptor dcore) { |
| DocCollection collection = |
| zkStateReader.getClusterState().getCollectionOrNull(dcore.getCollectionName()); |
| if (collection != null) { |
| Collection<Slice> slices = collection.getSlices(); |
| |
| for (Slice slice : slices) { |
| Replica r = slice.getReplica(dcore.getCloudDescriptor().getCoreNodeName()); |
| if (r != null) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Best effort to set DOWN state for all replicas on node. |
| * |
| * @param nodeName to operate on |
| */ |
| public void publishNodeAsDown(String nodeName) { |
| log.info("Publish node={} as DOWN", nodeName); |
| if (distributedClusterStateUpdater.isDistributedStateUpdate()) { |
| // Note that with the current implementation, when distributed cluster state updates are |
| // enabled, we mark the node down synchronously from this thread, whereas the Overseer cluster |
// state update frees this thread right away and the Overseer will asynchronously mark the node
// down by updating all affected collections. If this is an issue (i.e. takes too long), then the
| // call below should be executed from another thread so that the calling thread can |
| // immediately return. |
| distributedClusterStateUpdater.executeNodeDownStateUpdate(nodeName, zkStateReader); |
| } else { |
| try { |
// Track which collections we have already processed to avoid repeating them
| Set<String> processedCollections = new HashSet<>(); |
| for (CoreDescriptor cd : cc.getCoreDescriptors()) { |
| String collName = cd.getCollectionName(); |
| DocCollection coll; |
| if (collName != null |
| && processedCollections.add(collName) |
| && (coll = zkStateReader.getCollection(collName)) != null |
| && coll.isPerReplicaState()) { |
| final List<String> replicasToDown = new ArrayList<>(coll.getSlicesMap().size()); |
| coll.forEachReplica( |
| (s, replica) -> { |
| if (replica.getNodeName().equals(nodeName)) { |
| replicasToDown.add(replica.getName()); |
| } |
| }); |
| PerReplicaStatesOps.downReplicas( |
| replicasToDown, |
| PerReplicaStatesOps.fetch( |
| coll.getZNode(), zkClient, coll.getPerReplicaStates())) |
| .persist(coll.getZNode(), zkClient); |
| } |
| } |
| |
| // We always send a down node event to overseer to be safe, but overseer will not need to do |
| // anything for PRS collections |
| overseer |
| .getStateUpdateQueue() |
| .offer( |
| m -> |
| m.put(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower()) |
| .put(ZkStateReader.NODE_NAME_PROP, nodeName)); |
| } catch (AlreadyClosedException e) { |
| log.info( |
| "Not publishing node as DOWN because a resource required to do so is already closed."); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| log.debug("Publish node as down was interrupted."); |
| } catch (KeeperException e) { |
| log.warn("Could not publish node as down: ", e); |
| } |
| } |
| } |
| |
| /** |
* Ensures that a searcher is registered for the given core, waiting until one is registered if
* necessary.
| */ |
| private static void ensureRegisteredSearcher(SolrCore core) throws InterruptedException { |
| if (!core.getSolrConfig().useColdSearcher) { |
| RefCounted<SolrIndexSearcher> registeredSearcher = core.getRegisteredSearcher(); |
| if (registeredSearcher != null) { |
| if (log.isDebugEnabled()) { |
| log.debug("Found a registered searcher: {} for core: {}", registeredSearcher.get(), core); |
| } |
| registeredSearcher.decref(); |
| } else { |
| @SuppressWarnings("unchecked") |
| Future<Void>[] waitSearcher = (Future<Void>[]) Array.newInstance(Future.class, 1); |
| if (log.isInfoEnabled()) { |
| log.info( |
| "No registered searcher found for core: {}, waiting until a searcher is registered before publishing as active", |
| core.getName()); |
| } |
| final RTimer timer = new RTimer(); |
| RefCounted<SolrIndexSearcher> searcher = null; |
| try { |
| searcher = core.getSearcher(false, true, waitSearcher, true); |
| boolean success = true; |
| if (waitSearcher[0] != null) { |
| if (log.isDebugEnabled()) { |
| log.debug( |
| "Waiting for first searcher of core {}, id: {} to be registered", |
| core.getName(), |
| core); |
| } |
| try { |
| waitSearcher[0].get(); |
| } catch (ExecutionException e) { |
| log.warn( |
| "Wait for a searcher to be registered for core {}, id: {} failed due to: {}", |
| core.getName(), |
| core, |
| e, |
| e); |
| success = false; |
| } |
| } |
| if (success) { |
| if (searcher == null) { |
| // should never happen |
| if (log.isDebugEnabled()) { |
| log.debug( |
| "Did not find a searcher even after the future callback for core: {}, id: {}!!!", |
| core.getName(), |
| core); |
| } |
| } else { |
| if (log.isInfoEnabled()) { |
| log.info( |
| "Found a registered searcher: {}, took: {} ms for core: {}, id: {}", |
| searcher.get(), |
| timer.getTime(), |
| core.getName(), |
| core); |
| } |
| } |
| } |
| } finally { |
| if (searcher != null) { |
| searcher.decref(); |
| } |
| } |
| } |
| RefCounted<SolrIndexSearcher> newestSearcher = core.getNewestSearcher(false); |
| if (newestSearcher != null) { |
| if (log.isDebugEnabled()) { |
| log.debug( |
| "Found newest searcher: {} for core: {}, id: {}", |
| newestSearcher.get(), |
| core.getName(), |
| core); |
| } |
| newestSearcher.decref(); |
| } |
| } |
| } |
| } |