/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import org.apache.commons.lang3.StringUtils;
import org.apache.solr.client.solrj.cloud.DistributedLock;
import org.apache.solr.client.solrj.cloud.LockListener;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DefaultZkACLProvider;
import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.NodesSysPropsCacher;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Replica.Type;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkACLProvider;
import org.apache.solr.common.cloud.ZkConfigManager;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkCredentialsProvider;
import org.apache.solr.common.cloud.ZkMaintenanceUtils;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.CloseTracker;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.URLUtil;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CloseHook;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.admin.ConfigSetsHandlerApi;
import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.packagemanager.PackageUtils;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.servlet.SolrLifcycleListener;
import org.apache.solr.update.UpdateLog;
import org.apache.zookeeper.AddWatchMode;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTIONS_ZKNODE;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.packagemanager.PackageUtils.getMapper;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.URL;
import java.net.URLEncoder;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
/**
* Handle ZooKeeper interactions.
* <p>
 * notes: loads everything on init and creates what's not there; further updates
 * are driven by ZooKeeper watches.
* <p>
* TODO: exceptions during close on attempts to update cloud state
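 * <p>
 * Illustrative lifecycle sketch (hedged: simplified, and the real wiring lives in
 * CoreContainer; names like {@code coreContainer} are assumptions, not from this file):
 * <pre>{@code
 * ZkController zk = new ZkController(coreContainer, zkClient, cloudConfig);
 * zk.start();            // watchers, ephemeral live node, overseer election
 * // ... node serves traffic ...
 * zk.disconnect(true);   // publish DOWNNODE and remove the ephemeral live node
 * zk.close();            // release ZooKeeper-related resources
 * }</pre>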
*/
public class ZkController implements Closeable, Runnable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String CLUSTER_SHUTDOWN = "/cluster/shutdown";
public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
private final ZkACLProvider zkACLProvider;
private boolean closeZkClient = false;
private volatile StatePublisher statePublisher;
private volatile ZkDistributedQueue overseerJobQueue;
private volatile OverseerTaskQueue overseerCollectionQueue;
private volatile OverseerTaskQueue overseerConfigSetQueue;
private volatile DistributedMap overseerRunningMap;
private volatile DistributedMap overseerCompletedMap;
private volatile DistributedMap overseerFailureMap;
private volatile DistributedMap asyncIdsMap;
public final static String COLLECTION_PARAM_PREFIX = "collection.";
public final static String CONFIGNAME_PROP = "configName";
private boolean isShutdownCalled;
private volatile boolean dcCalled;
private volatile boolean started;
@Override
public void run() {
disconnect(true);
log.info("Continuing to Solr shutdown");
}
public boolean isDcCalled() {
return dcCalled;
}
public LeaderElector removeShardLeaderElector(String name) {
LeaderElector elector = leaderElectors.remove(name);
IOUtils.closeQuietly(elector);
return elector;
}
public LeaderElector getLeaderElector(String name) {
return leaderElectors.get(name);
}
static class ContextKey {
private final String collection;
private final String coreNodeName;
public ContextKey(String collection, String coreNodeName) {
this.collection = collection;
this.coreNodeName = coreNodeName;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result
+ ((collection == null) ? 0 : collection.hashCode());
result = prime * result
+ ((coreNodeName == null) ? 0 : coreNodeName.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
ContextKey other = (ContextKey) obj;
if (collection == null) {
if (other.collection != null) return false;
} else if (!collection.equals(other.collection)) return false;
if (coreNodeName == null) {
return other.coreNodeName == null;
} else return coreNodeName.equals(other.coreNodeName);
}
}
private static final byte[] emptyJson = Utils.toJSON(Collections.emptyMap());
private final Map<String, LeaderElector> leaderElectors = new ConcurrentHashMap<>(16);
// private final Map<ContextKey, ElectionContext> electionContexts = new ConcurrentHashMap<>(16) {
// @Override
// public ElectionContext put(ContextKey key, ElectionContext value) {
// if (ZkController.this.isClosed || cc.isShutDown()) {
// throw new AlreadyClosedException();
// }
// return super.put(key, value);
// }
// };
// private final Map<ContextKey, ElectionContext> overseerContexts = new ConcurrentHashMap<>() {
// @Override
// public ElectionContext put(ContextKey key, ElectionContext value) {
// if (ZkController.this.isClosed || cc.isShutDown()) {
// throw new AlreadyClosedException();
// }
// return super.put(key, value);
// }
// };
private final SolrZkClient zkClient;
public volatile ZkStateReader zkStateReader;
private volatile SolrCloudManager cloudManager;
private volatile CloudHttp2SolrClient cloudSolrClient;
private final String zkServerAddress; // example: 127.0.0.1:54062/solr
private final int localHostPort; // example: 54065
private final String hostName; // example: 127.0.0.1
private final String nodeName; // example: 127.0.0.1:54065_solr
private volatile String baseURL; // example: http://127.0.0.1:54065/solr
private final CloudConfig cloudConfig;
private volatile NodesSysPropsCacher sysPropsCacher;
protected volatile LeaderElector overseerElector;
private final Map<String, ReplicateFromLeader> replicateFromLeaders = new ConcurrentHashMap<>(16, 0.75f, 16);
private final Map<String, ZkCollectionTerms> collectionToTerms = new ConcurrentHashMap<>(16, 0.75f, 16);
//private final ReentrantLock collectionToTermsLock = new ReentrantLock(true);
// for now, this can be null in tests, in which case recovery will be inactive, and other features
// may accept defaults or use mocks rather than pulling things from a CoreContainer
private final CoreContainer cc;
protected volatile Overseer overseer;
private final int leaderVoteWait;
private final int leaderConflictResolveWait;
private final int clientTimeout;
private volatile boolean isClosed;
private final Object initLock = new Object();
private final ConcurrentHashMap<String, Throwable> replicasMetTragicEvent = new ConcurrentHashMap<>(16, 0.75f, 1);
@Deprecated
// keeps track of replicas that have been asked to recover by leaders running on this node
private final Map<String, String> replicasInLeaderInitiatedRecovery = new HashMap<>();
// keeps track of a list of objects that need to know a new ZooKeeper session was created after expiration occurred
// held as a concurrent set so listeners can be added and notified without holding a lock for long
private final Set<OnReconnect> reconnectListeners = ConcurrentHashMap.newKeySet();
private static class MyLockListener implements LockListener {
private final CountDownLatch lockWaitLatch;
public MyLockListener(CountDownLatch lockWaitLatch) {
this.lockWaitLatch = lockWaitLatch;
}
@Override
public void lockAcquired() {
lockWaitLatch.countDown();
}
@Override
public void lockReleased() {
}
}
public static class RegisterCoreAsync implements Callable<Object> {
private final ZkController zkController;
final CoreDescriptor descriptor;
final boolean afterExpiration;
public RegisterCoreAsync(ZkController zkController, CoreDescriptor descriptor, boolean afterExpiration) {
this.descriptor = descriptor;
this.afterExpiration = afterExpiration;
this.zkController = zkController;
}
public Object call() throws Exception {
MDCLoggingContext.setCoreName(descriptor.getName());
try {
log.info("Registering core with ZK {} afterExpiration? {}", descriptor.getName(), afterExpiration);
if (zkController.isDcCalled() || zkController.getCoreContainer().isShutDown() || (afterExpiration && !descriptor.getCloudDescriptor().hasRegistered())) {
return null;
}
if (zkController.cc.getAllCoreNames().contains(descriptor.getName())) {
try {
zkController.register(descriptor.getName(), descriptor, afterExpiration);
} catch (AlreadyClosedException e) {
log.warn("Error registering core name={} afterExpireation={}", descriptor.getName(), afterExpiration, e);
} catch (Exception e) {
log.error("Error registering core name={} afterExpireation={}", descriptor.getName(), afterExpiration, e);
}
}
return descriptor;
} finally {
MDCLoggingContext.clear();
}
}
}
// notifies registered listeners after the ZK reconnect in the background
private static class OnReconnectNotifyAsync implements Callable<Object> {
private final OnReconnect listener;
OnReconnectNotifyAsync(OnReconnect listener) {
this.listener = listener;
}
@Override
public Object call() throws Exception {
listener.command();
return null;
}
}
public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig) throws InterruptedException, IOException, TimeoutException {
this(cc, new SolrZkClient(zkServerAddress, zkClientConnectTimeout), cloudConfig);
this.closeZkClient = true;
}
/**
* @param cc Core container associated with this controller. Cannot be null.
* @param cloudConfig configuration for this controller. TODO: possibly redundant with CoreContainer
*/
public ZkController(final CoreContainer cc, SolrZkClient zkClient, CloudConfig cloudConfig)
throws InterruptedException, TimeoutException, IOException {
assert new CloseTracker() != null;
if (cc == null) {
log.error("null corecontainer");
throw new IllegalArgumentException("CoreContainer cannot be null.");
}
try {
this.cc = cc;
this.cloudConfig = cloudConfig;
this.zkClient = zkClient;
// be forgiving and strip this off leading/trailing slashes
// this allows us to support users specifying hostContext="/" in
// solr.xml to indicate the root context, instead of hostContext=""
// which means the default of "solr"
String localHostContext = trimLeadingAndTrailingSlashes(cloudConfig.getSolrHostContext());
this.zkServerAddress = zkClient.getZkServerAddress();
this.localHostPort = cloudConfig.getSolrHostPort();
if (log.isDebugEnabled()) log.debug("normalize hostname {}", cloudConfig.getHost());
this.hostName = normalizeHostName(cloudConfig.getHost());
if (log.isDebugEnabled()) log.debug("generate node name");
this.nodeName = generateNodeName(this.hostName, Integer.toString(this.localHostPort), localHostContext);
log.info("node name={}", nodeName);
MDCLoggingContext.setNode(nodeName);
this.leaderVoteWait = cloudConfig.getLeaderVoteWait();
this.leaderConflictResolveWait = cloudConfig.getLeaderConflictResolveWait();
this.clientTimeout = cloudConfig.getZkClientTimeout();
String zkACLProviderClass = cloudConfig.getZkACLProviderClass();
if (zkACLProviderClass != null && zkACLProviderClass.trim().length() > 0) {
zkACLProvider = cc.getResourceLoader().newInstance(zkACLProviderClass, ZkACLProvider.class);
} else {
zkACLProvider = new DefaultZkACLProvider();
}
} catch (Exception e) {
ParWork.propagateInterrupt(e);
log.error("Exception during ZkController init", e);
throw e;
}
assert ObjectReleaseTracker.track(this);
}
public void start() {
if (started) throw new IllegalStateException("Already started");
started = true;
try {
if (zkClient.exists( ZkStateReader.LIVE_NODES_ZKNODE + "/" + getNodeName())) {
removeEphemeralLiveNode();
}
} catch (Exception e) {
ParWork.propagateInterrupt("Error Removing ephemeral live node. Continuing to close CoreContainer", e);
}
boolean isRegistered = SolrLifcycleListener.isRegistered(this);
if (!isRegistered) {
SolrLifcycleListener.registerShutdown(this);
}
String zkCredentialsProviderClass = cloudConfig.getZkCredentialsProviderClass();
if (zkCredentialsProviderClass != null && zkCredentialsProviderClass.trim().length() > 0) {
zkClient.getConnectionManager().setZkCredentialsToAddAutomatically(cc.getResourceLoader().newInstance(zkCredentialsProviderClass, ZkCredentialsProvider.class));
} else {
zkClient.getConnectionManager().setZkCredentialsToAddAutomatically(new DefaultZkCredentialsProvider());
}
addOnReconnectListener(getConfigDirListener());
zkClient.setAclProvider(zkACLProvider);
zkClient.getConnectionManager().setOnReconnect(new OnReconnect() {
@Override
public void command() {
synchronized (initLock) {
if (cc.isShutDown() || isClosed() || isShutdownCalled) {
log.info("skipping zk reconnect logic due to shutdown");
return;
}
ParWork.getRootSharedExecutor().submit(() -> {
log.info("ZooKeeper session re-connected ... refreshing core states after session expiration.");
try {
removeEphemeralLiveNode();
statePublisher.clearStatCache();
// recreate our watchers first so that they exist even on any problems below
zkStateReader.createClusterStateWatchersAndUpdate();
// this is troublesome - we don't want to kill anything the old leader accepted,
// though sync will likely get those updates back... but only if this node is
// involved in the sync, and it may well not be
// ExecutorUtil.shutdownAndAwaitTermination(cc.getCmdDistribExecutor());
// we need to create all of our lost watches
// seems we don't need to do this again...
// Overseer.createClientNodes(zkClient, getNodeName());
// start the overseer first as the following code may need its processing
overseerElector.retryElection(false);
List<CoreDescriptor> descriptors = getCoreContainer().getCoreDescriptors();
// re register all descriptors
if (descriptors != null) {
for (CoreDescriptor descriptor : descriptors) {
// TODO: we need to think carefully about what happens when it was
// a leader that was expired - as well as what to do about
// leaders/overseers with connection loss
try {
// unload solrcores that have been 'failed over'
// throwErrorIfReplicaReplaced(descriptor);
ParWork.getRootSharedExecutor().submit(new RegisterCoreAsync(ZkController.this, descriptor, true));
} catch (Exception e) {
SolrException.log(log, "Error registering SolrCore", e);
}
}
}
// notify any other objects that need to know when the session was re-connected
// the OnReconnect operation can be expensive per listener, so do that async in the background
try (ParWork work = new ParWork(this, true, false)) {
reconnectListeners.forEach(listener -> {
try {
work.collect(new OnReconnectNotifyAsync(listener));
} catch (Exception exc) {
// not much we can do here other than warn in the log
log.warn("Error when notifying OnReconnect listener {} after session re-connected.", listener, exc);
}
});
}
createEphemeralLiveNode();
} catch (AlreadyClosedException e) {
log.info("Already closed");
return;
} catch (Exception e) {
SolrException.log(log, "", e);
}
});
}
}
@Override
public String getName() {
return "ZkController";
}
});
zkClient.setDisconnectListener(() -> {
try (ParWork worker = new ParWork("disconnected", true, false)) {
worker.collect(ZkController.this.overseerElector);
worker.collect(ZkController.this.overseer);
worker.collect(leaderElectors.values());
leaderElectors.clear();
// I don't think so...
// worker.collect("clearZkCollectionTerms", () -> {
// clearZkCollectionTerms();
// });
}
});
init();
}
private ElectionContext getOverseerContext() {
return new OverseerElectionContext(getNodeName(), zkClient, overseer);
}
public int getLeaderVoteWait() {
return leaderVoteWait;
}
public int getLeaderConflictResolveWait() {
return leaderConflictResolveWait;
}
public NodesSysPropsCacher getSysPropsCacher() {
return sysPropsCacher;
}
public void disconnect(boolean publishDown) {
log.info("disconnect");
this.dcCalled = true;
try {
removeEphemeralLiveNode();
} catch (Exception e) {
ParWork.propagateInterrupt("Error Removing ephemeral live node. Continuing to close CoreContainer", e);
}
try (ParWork closer = new ParWork(this, true, false)) {
closer.collect("replicateFromLeaders", replicateFromLeaders);
closer.collect(leaderElectors);
}
if (publishDown) {
try {
publishNodeAs(getNodeName(), OverseerAction.DOWNNODE);
} catch (Exception e) {
log.warn("Problem publish node as DOWN", e);
}
}
}
/**
* Closes the underlying ZooKeeper client.
*/
public void close() {
if (log.isDebugEnabled()) log.debug("Closing ZkController");
//assert closeTracker.close();
this.isShutdownCalled = true;
this.isClosed = true;
try (ParWork closer = new ParWork(this, true, false)) {
closer.collect(leaderElectors);
closer.collect(sysPropsCacher);
closer.collect(cloudManager);
closer.collect(cloudSolrClient);
collectionToTerms.forEach((s, zkCollectionTerms) -> closer.collect(zkCollectionTerms));
} finally {
if (statePublisher != null) {
statePublisher.submitState(StatePublisher.TERMINATE_OP);
}
IOUtils.closeQuietly(statePublisher);
IOUtils.closeQuietly(overseerElector);
if (overseer != null) {
try {
overseer.closeAndDone();
} catch (Exception e) {
log.warn("Exception closing Overseer", e);
}
}
if (zkStateReader != null) {
zkStateReader.disableCloseLock();
}
IOUtils.closeQuietly(zkStateReader);
if (closeZkClient && zkClient != null) {
zkClient.disableCloseLock();
IOUtils.closeQuietly(zkClient);
}
SolrLifcycleListener.removeShutdown(this);
assert ObjectReleaseTracker.release(this);
}
}
/**
* Best effort to give up the leadership of a shard in a core after hitting a tragic exception
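 * <p>
 * As the code below shows, on a qualifying tragic error this enqueues a
 * {@code deletereplica} followed by an {@code addreplica} operation on the
 * Overseer collection queue, so the replica is rebuilt asynchronously.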
* @param cd The current core descriptor
* @param tragicException The tragic exception from the {@code IndexWriter}
*/
public void giveupLeadership(CoreDescriptor cd, Throwable tragicException) {
assert tragicException != null;
assert cd != null;
DocCollection dc = getClusterState().getCollectionOrNull(cd.getCollectionName());
if (dc == null) return;
Slice shard = dc.getSlice(cd.getCloudDescriptor().getShardId());
if (shard == null) return;
// if this replica is not a leader, it will be put in recovery state by the leader
if (shard.getReplica(cd.getName()) != shard.getLeader()) return;
int numActiveReplicas = shard.getReplicas(
rep -> rep.getState() == Replica.State.ACTIVE
&& rep.getType() != Type.PULL
&& getZkStateReader().getLiveNodes().contains(rep.getNodeName())
).size();
// besides the leader, at least one other active replica can serve searches, so we should give up leadership and let another replica take over
if (numActiveReplicas >= 2) {
String key = cd.getCollectionName() + ":" + cd.getName();
//TODO better handling the case when delete replica was failed
if (replicasMetTragicEvent.putIfAbsent(key, tragicException) == null) {
log.warn("Leader {} met tragic exception, give up its leadership", key, tragicException);
try {
// by using Overseer to remove and add replica back, we can do the task in an async/robust manner
Map<String,Object> props = new HashMap<>();
props.put(Overseer.QUEUE_OPERATION, "deletereplica");
props.put(COLLECTION_PROP, cd.getCollectionName());
props.put(SHARD_ID_PROP, shard.getName());
props.put(REPLICA_PROP, cd.getName());
getOverseerCollectionQueue().offer(Utils.toJSON(new ZkNodeProps(props)), false);
props.clear();
props.put(Overseer.QUEUE_OPERATION, "addreplica");
props.put(COLLECTION_PROP, cd.getCollectionName());
props.put(SHARD_ID_PROP, shard.getName());
props.put(ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().name().toUpperCase(Locale.ROOT));
props.put(CoreAdminParams.NODE, getNodeName());
getOverseerCollectionQueue().offer(Utils.toJSON(new ZkNodeProps(props)), false);
} catch (Exception e) {
ParWork.propagateInterrupt(e);
// Exceptions are not bubbled up. giveupLeadership is best effort, and is only called
// after some other unrecoverable error has already happened
log.error("Met exception on give up leadership for {}", key, e);
replicasMetTragicEvent.remove(key);
}
}
}
}
/**
* Returns true if config file exists
*/
public boolean configFileExists(String collection, String fileName)
throws KeeperException, InterruptedException {
Stat stat = zkClient.exists(ZkConfigManager.CONFIGS_ZKNODE + "/" + collection + "/" + fileName, null);
return stat != null;
}
/**
* @return information about the cluster from ZooKeeper
*/
public ClusterState getClusterState() {
return zkStateReader.getClusterState();
}
public SolrCloudManager getSolrCloudManager() {
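// lazily build the SolrCloudManager using double-checked locking (cloudManager is a volatile field)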
if (cloudManager != null) {
return cloudManager;
}
synchronized(this) {
if (cloudManager != null) {
return cloudManager;
}
cloudSolrClient = new CloudHttp2SolrClient.Builder(zkStateReader)
.withHttpClient(cc.getUpdateShardHandler().getTheSharedHttpClient())
.build();
cloudSolrClient.connect();
cloudManager = new SolrClientCloudManager(
new ZkDistributedQueueFactory(zkClient),
cloudSolrClient,
cc.getObjectCache(), cc.getUpdateShardHandler().getTheSharedHttpClient());
cloudManager.getClusterStateProvider().connect();
}
return cloudManager;
}
public CloudHttp2SolrClient getCloudSolrClient() {
return cloudSolrClient;
}
// normalize host removing any url scheme.
// input can be null, host, or url_prefix://host
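// illustrative examples (not exhaustive): "http://node1:8983" -> "node1:8983",
// "node1" -> "node1", null/empty -> best local address we can find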
public static String normalizeHostName(String host) {
if (host == null || host.length() == 0) {
String hostaddress;
try {
hostaddress = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
hostaddress = "127.0.0.1"; // cannot resolve system hostname, fall through
}
// Re-resolve the IP when we got "127.0.0.1"; in any other case we trust
// that the hosts file is right.
if ("127.0.0.1".equals(hostaddress)) {
Enumeration<NetworkInterface> netInterfaces = null;
try {
netInterfaces = NetworkInterface.getNetworkInterfaces();
while (netInterfaces.hasMoreElements()) {
NetworkInterface ni = netInterfaces.nextElement();
Enumeration<InetAddress> ips = ni.getInetAddresses();
while (ips.hasMoreElements()) {
InetAddress ip = ips.nextElement();
if (ip.isSiteLocalAddress()) {
hostaddress = ip.getHostAddress();
}
}
}
} catch (Exception e) {
ParWork.propagateInterrupt(e);
SolrException.log(log,
"Error while looking for a better host name than 127.0.0.1", e);
}
}
host = hostaddress;
} else {
if (log.isDebugEnabled()) log.debug("remove host scheme");
if (URLUtil.hasScheme(host)) {
host = URLUtil.removeScheme(host);
}
}
if (log.isDebugEnabled()) log.debug("return host {}", host);
return host;
}
public String getHostName() {
return hostName;
}
public int getHostPort() {
return localHostPort;
}
public SolrZkClient getZkClient() {
return zkClient;
}
/**
* @return zookeeper server address
*/
public String getZkServerAddress() {
return zkServerAddress;
}
boolean isClosed() {
return isClosed;
}
boolean isShutdownCalled() {
return isShutdownCalled;
}
/**
* Create the zknodes necessary for a cluster to operate
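 * <p>
 * Illustrative usage during first-time cluster init (mirroring how {@code init()}
 * below invokes it under the cluster lock):
 * <pre>{@code
 * if (!zkClient.exists("/cluster/init")) {
 *   ZkController.createClusterZkNodes(zkClient);
 * }
 * }</pre>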
*
* @param zkClient a SolrZkClient
* @throws KeeperException if there is a Zookeeper error
* @throws InterruptedException on interrupt
*/
public static void createClusterZkNodes(SolrZkClient zkClient)
throws KeeperException, InterruptedException, IOException {
log.info("Creating cluster zk nodes");
// we want to have a full zk layout at the start
// this is especially important so that we don't miss creating
// any watchers with ZkStateReader on startup
Map<String,byte[]> paths = new HashMap<>(45);
paths.put(ZkStateReader.LIVE_NODES_ZKNODE, null);
paths.put(ZkStateReader.CONFIGS_ZKNODE, null);
paths.put(ZkStateReader.ALIASES, emptyJson);
paths.put("/overseer", null);
paths.put(Overseer.OVERSEER_ELECT, null);
paths.put(Overseer.OVERSEER_ELECT + LeaderElector.ELECTION_NODE, null);
paths.put(Overseer.OVERSEER_QUEUE, null);
paths.put(Overseer.OVERSEER_COLLECTION_QUEUE_WORK, null);
paths.put(Overseer.OVERSEER_COLLECTION_MAP_RUNNING, null);
paths.put(Overseer.OVERSEER_COLLECTION_MAP_COMPLETED, null);
paths.put(Overseer.OVERSEER_COLLECTION_MAP_FAILURE, null);
paths.put(Overseer.OVERSEER_ASYNC_IDS, null);
//
// operations.add(zkClient.createPathOp(ZkStateReader.CLUSTER_PROPS, emptyJson));
paths.put(ZkStateReader.SOLR_PKGS_PATH, getMapper().writeValueAsString(Collections.emptyMap()).getBytes(StandardCharsets.UTF_8));
paths.put(PackageUtils.REPOSITORIES_ZK_PATH, "[]".getBytes(StandardCharsets.UTF_8));
paths.put(ZkStateReader.ROLES, emptyJson);
paths.put("/clusterstate.json", emptyJson);
paths.put(COLLECTIONS_ZKNODE, null);
//
//
// // we create the collection znode last to indicate successful cluster init
// operations.add(zkClient.createPathOp(ZkStateReader.COLLECTIONS_ZKNODE));
zkClient.mkdirs(paths);
//
try {
zkClient.mkDirs(ZkStateReader.SOLR_SECURITY_CONF_PATH, emptyJson);
} catch (KeeperException.NodeExistsException e) {
// okay, can be prepopulated
}
try {
zkClient.mkDirs(ZkStateReader.CLUSTER_PROPS, emptyJson);
} catch (KeeperException.NodeExistsException e) {
// okay, can be prepopulated
}
if (!Boolean.getBoolean("solr.suppressDefaultConfigBootstrap")) {
bootstrapDefaultConfigSet(zkClient);
} else {
log.info("Supressing upload of default config set");
}
if (log.isDebugEnabled()) log.debug("Creating final {} node", "/cluster/init");
zkClient.mkdir( "/cluster/init");
}
private static void bootstrapDefaultConfigSet(SolrZkClient zkClient) throws KeeperException, InterruptedException, IOException {
if (!zkClient.exists("/configs/_default")) {
String configDirPath = getDefaultConfigDirPath();
if (configDirPath == null) {
log.warn("The _default configset could not be uploaded. Please provide 'solr.default.confdir' parameter that points to a configset {} {}"
, "intended to be the default. Current 'solr.default.confdir' value:"
, System.getProperty(SolrDispatchFilter.SOLR_DEFAULT_CONFDIR_ATTRIBUTE));
} else {
ZkMaintenanceUtils.upConfig(zkClient, Paths.get(configDirPath), ConfigSetsHandlerApi.DEFAULT_CONFIGSET_NAME);
}
}
}
/**
* Gets the absolute filesystem path of the _default configset to bootstrap from.
* First tries the sysprop "solr.default.confdir". If not found, tries to find
* the _default dir relative to the sysprop "solr.install.dir".
* Returns null if not found anywhere.
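 * <p>
 * Illustrative (the path below is an example, not from this file):
 * <pre>{@code
 * // JVM started with -Dsolr.default.confdir=/opt/solr/server/solr/configsets/_default/conf
 * String path = ZkController.getDefaultConfigDirPath();
 * }</pre>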
*
* @lucene.internal
* @see SolrDispatchFilter#SOLR_DEFAULT_CONFDIR_ATTRIBUTE
*/
public static String getDefaultConfigDirPath() {
String configDirPath = null;
String serverSubPath = "solr" + File.separator +
"configsets" + File.separator + "_default" +
File.separator + "conf";
String subPath = File.separator + "server" + File.separator + serverSubPath;
String defaultConfigSet = System.getProperty(SolrDispatchFilter.SOLR_DEFAULT_CONFDIR_ATTRIBUTE);
log.info("{} set to {}", SolrDispatchFilter.SOLR_DEFAULT_CONFDIR_ATTRIBUTE, defaultConfigSet);
if (defaultConfigSet != null) {
configDirPath = new File(defaultConfigSet).getAbsolutePath();
} else if (defaultConfigSet != null &&
new File(System.getProperty(defaultConfigSet + subPath)).exists()) {
configDirPath = new File(defaultConfigSet + subPath).getAbsolutePath();
}
return configDirPath;
}
private void init() {
// MRM TODO:
// Runtime.getRuntime().addShutdownHook(new Thread() {
// public void run() {
// shutdown();
// ParWork.close(ParWork.getExecutor());
// }
//
// });
// synchronized (initLock) {
if (log.isDebugEnabled()) log.debug("making shutdown watcher for cluster");
try {
zkClient.exists(CLUSTER_SHUTDOWN, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (Event.EventType.None.equals(event.getType())) {
return;
}
log.info("Got event for shutdown {}", event);
if (event.getType().equals(Event.EventType.NodeCreated)) {
log.info("Shutdown zk node created, shutting down");
shutdown();
} else {
log.info("Remaking shutdown watcher");
Stat stat;
try {
stat = zkClient.exists(CLUSTER_SHUTDOWN, this);
} catch (KeeperException | InterruptedException e) {
SolrException.log(log, e);
return;
}
if (stat != null) {
log.info("Got shutdown even while remaking watcher, shutting down");
shutdown();
}
}
}
});
} catch (KeeperException e) {
log.error("Zk Exception", e);
return;
} catch (InterruptedException e) {
log.info("interrupted");
return;
}
try {
zkClient.mkdirs("/cluster/cluster_lock");
} catch (KeeperException.NodeExistsException e) {
// okay
} catch (KeeperException e) {
log.error("Zk Exception", e);
return;
}
boolean createdClusterNodes = false;
try {
CountDownLatch lockWaitLatch = new CountDownLatch(1);
boolean create = false;
DistributedLock lock = new DistributedLock(zkClient, "/cluster/cluster_lock", zkClient.getZkACLProvider().getACLsToAdd("/cluster/cluster_lock"), new MyLockListener(lockWaitLatch));
try {
// if (log.isDebugEnabled()) log.debug("get cluster lock");
if (lock.lock()) {
create = true;
}
if (create) {
if (log.isDebugEnabled()) log.debug("got cluster lock");
// CountDownLatch latch = new CountDownLatch(1);
// zkClient.getSolrZooKeeper().sync("/cluster/init", (rc, path, ctx) -> {
// latch.countDown();
// }, new Object());
// boolean success = latch.await(10, TimeUnit.SECONDS);
// if (!success) {
// throw new SolrException(ErrorCode.SERVER_ERROR, "Timeout calling sync on collection zknode");
// }
if (!zkClient.exists("/cluster/init")) {
try {
createClusterZkNodes(zkClient);
} catch (Exception e) {
ParWork.propagateInterrupt(e);
log.error("Failed creating initial zk layout", e);
return;
}
createdClusterNodes = true;
} else {
//if (log.isDebugEnabled()) log.debug("Cluster zk nodes already exist");
//int currentLiveNodes = zkClient.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, null, true).size();
//if (log.isDebugEnabled()) log.debug("Current live nodes {}", currentLiveNodes);
// if (currentLiveNodes == 0) {
// log.info("Delete Overseer queues");
// // cluster is in a startup state, clear zk queues
// List<String> pathsToDelete = Arrays.asList(new String[]{Overseer.OVERSEER_QUEUE, Overseer.OVERSEER_QUEUE_WORK,
// Overseer.OVERSEER_COLLECTION_QUEUE_WORK, Overseer.OVERSEER_COLLECTION_MAP_RUNNING,
// Overseer.OVERSEER_COLLECTION_MAP_COMPLETED, Overseer.OVERSEER_COLLECTION_MAP_FAILURE, Overseer.OVERSEER_ASYNC_IDS});
// CountDownLatch latch = new CountDownLatch(pathsToDelete.size());
// int[] code = new int[1];
// String[] path = new String[1];
// boolean[] failed = new boolean[1];
//
// for (String delPath : pathsToDelete) {
// zkClient.getSolrZooKeeper().delete(delPath, -1,
// (resultCode, zkpath, context) -> {
// code[0] = resultCode;
// if (resultCode != 0) {
// failed[0] = true;
// path[0] = "" + zkpath;
// }
//
// latch.countDown();
// }, "");
// }
// boolean success = false;
// log.info("Wait for delete Overseer queues");
// try {
// success = latch.await(15, TimeUnit.SECONDS);
// } catch (InterruptedException e) {
// ParWork.propegateInterrupt(e);
//
// throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
// }
//
// // MRM TODO:, still haackey, do fails right
// if (code[0] != 0) {
// System.out.println("fail code: "+ code[0]);
// KeeperException e = KeeperException.create(KeeperException.Code.get(code[0]), path[0]);
// if (e instanceof NoNodeException) {
// // okay
// } else {
// throw e;
// }
//
// }
//
// if (!success) {
// throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting for operations to complete");
// }
// }
}
}
} finally {
if (log.isDebugEnabled()) log.debug("release cluster lock");
lock.unlock();
lock.close();
}
if (!createdClusterNodes) {
// wait?
}
zkStateReader = new ZkStateReader(zkClient, () -> {
if (cc != null) cc.securityNodeChanged();
});
zkStateReader.enableCloseLock();
zkStateReader.setNode(nodeName);
zkStateReader.setLeaderChecker(name -> {
LeaderElector elector = leaderElectors.get(name);
return elector != null && elector.isLeader();
});
zkStateReader.setCollectionRemovedListener(this::removeCollectionTerms);
this.baseURL = zkStateReader.getBaseUrlForNodeName(this.nodeName);
zkStateReader.createClusterStateWatchersAndUpdate();
this.overseer = new Overseer(cc.getUpdateShardHandler(), CommonParams.CORES_HANDLER_PATH, this, cloudConfig);
try {
this.overseerRunningMap = Overseer.getRunningMap(zkClient);
this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
this.overseerFailureMap = Overseer.getFailureMap(zkClient);
this.asyncIdsMap = Overseer.getAsyncIdsMap(zkClient);
} catch (KeeperException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
this.overseerJobQueue = overseer.getStateUpdateQueue();
this.overseerCollectionQueue = overseer.getCollectionQueue(zkClient);
this.overseerConfigSetQueue = overseer.getConfigSetQueue(zkClient);
statePublisher = new StatePublisher(overseerJobQueue, zkStateReader, cc);
statePublisher.start();
this.sysPropsCacher = new NodesSysPropsCacher(getSolrCloudManager().getNodeStateProvider(), getNodeName(), zkStateReader);
overseerElector = new LeaderElector(this);
//try (ParWork worker = new ParWork(this, false, true)) {
// start the overseer first as the following code may need its processing
// worker.collect("startOverseer", () -> {
ElectionContext context = getOverseerContext();
if (log.isDebugEnabled()) log.debug("Overseer setting up context {}", context.leaderProps.getNodeName());
overseerElector.setup(context);
log.info("Overseer joining election {}", context.leaderProps.getNodeName());
overseerElector.joinElection(false);
publishNodeAs(getNodeName(), OverseerAction.RECOVERYNODE);
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
}
// }
}
private synchronized void shutdown() {
if (this.isShutdownCalled) return;
this.isShutdownCalled = true;
log.info("Cluster shutdown initiated");
URL url;
try {
// note: assumed plain http; without a scheme, new URL(...) would throw MalformedURLException
url = new URL("http://" + getHostName() + ":" + getHostPort() + "/shutdown?token=" + "solrrocks");
} catch (MalformedURLException e) {
SolrException.log(log, e);
return;
}
try {
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
int responseCode = connection.getResponseCode();
log.info("Shutting down {}: {} {}", url, responseCode, connection.getResponseMessage());
} catch (SocketException e) {
SolrException.log(log, e);
// Okay - the server is not running
} catch (IOException e) {
SolrException.log(log, e);
return;
}
}
public void publishDownStates() throws KeeperException {
publishNodeAs(getNodeName(), OverseerAction.DOWNNODE);
}
/**
* Validates if the chroot exists in zk (or if it is successfully created).
* Optionally, if create is set to true this method will create the path in
* case it doesn't exist
*
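 * <p>
 * Example: a zkHost of {@code "zk1:2181,zk2:2181/solr"} includes the chroot {@code /solr}.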
* @return true if the path exists or is created; false if the path doesn't
* exist and 'create' = false
*/
public static boolean checkChrootPath(String zkHost, boolean create) // MRM TODO:
throws KeeperException, InterruptedException {
return true;
// if (!SolrZkClient.containsChroot(zkHost)) {
// return true;
// }
// log.trace("zkHost includes chroot");
// String chrootPath = zkHost.substring(zkHost.indexOf("/"), zkHost.length());
//
// SolrZkClient tmpClient = new SolrZkClient(zkHost.substring(0,
// zkHost.indexOf("/")), 60000, 30000, null, null, null);
// boolean exists = tmpClient.exists(chrootPath);
// if (!exists && create) {
// tmpClient.makePath(chrootPath, false, true);
// exists = true;
// }
// tmpClient.close();
// return exists;
}
public boolean isConnected() {
return zkClient.isConnected();
}
public void createEphemeralLiveNode() {
String nodeName = getNodeName();
String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
log.info("Create our ephemeral live node {}", ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName);
createLiveNodeImpl(nodePath);
}
private void createLiveNodeImpl(String nodePath) {
try {
zkClient.create(nodePath, (byte[]) null, CreateMode.EPHEMERAL, true);
} catch (KeeperException.NodeExistsException e) {
// throw directly instead of letting a broad outer catch re-wrap the SolrException
log.warn("Found our ephemeral live node already exists. This must be a quick restart after a hard shutdown? ... {}", nodePath);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
} catch (Exception e) {
ParWork.propagateInterrupt(e);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
}
public void removeEphemeralLiveNode() {
if (zkClient.isAlive()) {
log.info("Removing our ephemeral live node");
String nodeName = getNodeName();
String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
try {
zkClient.delete(nodePath, -1, true, false);
} catch (NoNodeException | SessionExpiredException e) {
// okay
} catch (Exception e) {
log.warn("Could not remove ephemeral live node {}", nodePath, e);
}
}
}
public String getNodeName() {
return nodeName;
}
/**
* Returns true if the path exists
*/
public boolean pathExists(String path) throws KeeperException,
InterruptedException {
return zkClient.exists(path);
}
public void registerUnloadWatcher(String collection, String shardId, String name) {
// MRM TODO: - this thing is currently bad
// zkStateReader.registerDocCollectionWatcher(collection,
// new UnloadCoreOnDeletedWatcher(shardId, name));
}
public String register(String coreName, final CoreDescriptor desc) throws Exception {
return register(coreName, desc, false);
}
public boolean isOverseerLeader() {
return overseerElector != null && overseerElector.isLeader();
}
public static volatile Predicate<CoreDescriptor> testing_beforeRegisterInZk;
/**
* Register shard with ZooKeeper.
*
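 * <p>
 * Illustrative call site (hedged; the public overload
 * {@link #register(String, CoreDescriptor)} delegates here with
 * {@code afterExpiration=false}):
 * <pre>{@code
 * String shardId = zkController.register(core.getName(), core.getCoreDescriptor());
 * }</pre>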
* @return the shardId for the SolrCore
*/
private String register(String coreName, final CoreDescriptor desc, boolean afterExpiration) {
if (getCoreContainer().isShutDown() || isDcCalled()) {
throw new AlreadyClosedException();
}
if (testing_beforeRegisterInZk != null) {
boolean didTrigger = testing_beforeRegisterInZk.test(desc);
if (log.isDebugEnabled()) {
log.debug("{} pre-zk hook", (didTrigger ? "Ran" : "Skipped"));
}
}
MDCLoggingContext.setCoreName(desc.getName());
ZkShardTerms shardTerms = null;
LeaderElector leaderElector = null;
try {
final String baseUrl = getBaseUrl();
final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
final String collection = cloudDesc.getCollectionName();
final String shardId = cloudDesc.getShardId();
log.debug("Register replica - core={} id={} address={} collection={} shard={} type={}", coreName, desc.getCoreProperties().get("id"), baseUrl, collection, shardId, cloudDesc.getReplicaType());
log.debug("Register terms for replica {}", coreName);
registerShardTerms(collection, cloudDesc.getShardId(), coreName);
log.info("Create leader elector for replica {}", coreName);
// leaderElector = leaderElectors.get(coreName);
// if (leaderElector == null) {
// leaderElector = new LeaderElector(this);
// LeaderElector oldElector = leaderElectors.putIfAbsent(coreName, leaderElector);
//
// if (oldElector != null) {
// IOUtils.closeQuietly(leaderElector);
// }
//
// if (cc.isShutDown()) {
// IOUtils.closeQuietly(leaderElector);
// IOUtils.closeQuietly(oldElector);
// IOUtils.closeQuietly(getShardTermsOrNull(collection, shardId));
// throw new AlreadyClosedException();
// }
// }
// If we're a preferred leader, insert ourselves at the head of the queue
boolean joinAtHead = false; //replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
if (cloudDesc.getReplicaType() != Type.PULL) {
//getCollectionTerms(collection).register(cloudDesc.getShardId(), coreName);
// MRM TODO: review joinAtHead
joinElection(desc, joinAtHead);
}
log.info("Wait to see leader for {}, {}", collection, shardId);
String leaderName = null;
for (int i = 0; i < 15; i++) {
if (isClosed() || isDcCalled() || cc.isShutDown()) {
throw new AlreadyClosedException();
}
leaderElector = leaderElectors.get(coreName);
if (leaderElector != null && leaderElector.isLeader()) {
leaderName = coreName;
break;
}
try {
DocCollection coll = zkStateReader.getClusterState().getCollectionOrNull(collection);
if (coll != null) {
Slice slice = coll.getSlice(shardId);
if (slice != null) {
Replica leaderReplica = slice.getLeader();
if (leaderReplica != null) {
if (leaderReplica.getNodeName().equals(getNodeName())) {
leaderElector = leaderElectors.get(leaderReplica.getName());
if (leaderElector != null && leaderElector.isLeader()) {
leaderName = leaderReplica.getName();
break;
}
}
}
}
}
Replica leader = zkStateReader.getLeaderRetry(getCoreContainer().getUpdateShardHandler().getTheSharedHttpClient(),collection, shardId, Integer.getInteger("solr.getleader.looptimeout", 2000), true);
leaderName = leader.getName();
break;
} catch (TimeoutException | InterruptedException e) {
if (isClosed() || isDcCalled() || cc.isShutDown()) {
throw new AlreadyClosedException();
}
log.debug("Timeout waiting to see leader, retry collection={} shard={}", collection, shardId);
}
}
if (leaderName == null) {
log.error("No leader found while trying to register " + coreName + " with zookeeper collection={}", zkStateReader.getCollectionOrNull(collection));
throw new SolrException(ErrorCode.SERVER_ERROR, "No leader found while trying to register " + coreName + " with zookeeper");
}
boolean isLeader = leaderName.equals(coreName);
log.debug("We are {} and leader is {} isLeader={}", coreName, leaderName, isLeader);
log.debug("Check if we should recover isLeader={}", isLeader);
//assert !(isLeader && replica.getType() == Type.PULL) : "Pull replica became leader!";
// recover from local transaction log and wait for it to complete before
// going active
// TODO: should this be moved to another thread? To recoveryStrat?
// TODO: should this actually be done earlier, before (or as part of)
// leader election perhaps?
try (SolrCore core = cc.getCore(coreName)) {
if (core == null || core.isClosing() || getCoreContainer().isShutDown()) {
throw new AlreadyClosedException();
}
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
boolean isTlogReplicaAndNotLeader = cloudDesc.getReplicaType() == Replica.Type.TLOG && !isLeader;
if (isTlogReplicaAndNotLeader) {
String commitVersion = ReplicateFromLeader.getCommitVersion(core);
if (commitVersion != null) {
ulog.copyOverOldUpdates(Long.parseLong(commitVersion));
}
}
// we will call register again after zk expiration and on reload
if (!afterExpiration && !core.isReloaded() && ulog != null && !isTlogReplicaAndNotLeader) {
// disable recovery in case shard is in construction state (for shard splits)
DocCollection coll = getClusterState().getCollectionOrNull(collection);
Slice slice = null;
if (coll != null) {
slice = coll.getSlice(shardId);
}
if ((slice != null && slice.getState() != Slice.State.CONSTRUCTION) || !isLeader) {
Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler().getUpdateLog().recoverFromLog();
if (recoveryFuture != null) {
// MRM TODO: if we publish active early (like we will) log replay will not be done
log.info("Replaying tlog for {} during startup... NOTE: This can take a while.", core);
recoveryFuture.get(); // NOTE: this could potentially block for
// minutes or more!
// TODO: public as recovering in the mean time?
// TODO: in the future we could do peersync in parallel with recoverFromLog
} else {
if (log.isDebugEnabled()) {
log.debug("No LogReplay needed for core={} baseURL={}", core.getName(), baseUrl);
}
}
}
}
if (cloudDesc.getReplicaType() != Type.PULL && !isLeader) {
checkRecovery(core, cc);
} else if (isTlogReplicaAndNotLeader) {
startReplicationFromLeader(coreName, true);
}
if (cloudDesc.getReplicaType() == Type.PULL) {
startReplicationFromLeader(coreName, false);
}
if (cloudDesc.getReplicaType() != Type.PULL) {
shardTerms = getShardTerms(collection, cloudDesc.getShardId());
// the watcher is added to a set, so multiple calls of this method will leave only one watcher
if (log.isDebugEnabled()) log.debug("add shard terms listener for {}", coreName);
shardTerms.addListener(desc.getName(), new RecoveringCoreTermWatcher(core.getCoreDescriptor(), getCoreContainer()));
}
}
// the watcher is added to a set, so multiple calls of this method will leave only one watcher
// MRM TODO:
// registerUnloadWatcher(cloudDesc.getCollectionName(), cloudDesc.getShardId(), desc.getName());
log.debug("SolrCore Registered, core{} baseUrl={} collection={}, shard={}", coreName, baseUrl, collection, shardId);
desc.getCloudDescriptor().setHasRegistered(true);
return shardId;
} catch (AlreadyClosedException e) {
log.warn("Won't register with ZooKeeper, already shutting down core={}", desc.getName());
throw e;
} catch (Exception e) {
log.error("Error registering SolrCore with Zookeeper core={}", desc, e);
throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore with Zookeeper", e);
} finally {
if (isDcCalled() || isClosed()) {
IOUtils.closeQuietly(leaderElector);
}
MDCLoggingContext.clear();
}
}
private Replica getReplicaOrNull(DocCollection docCollection, String shard, String coreName) {
if (docCollection == null) return null;
return docCollection.getReplica(coreName);
}
public void startReplicationFromLeader(String coreName, boolean switchTransactionLog) throws InterruptedException {
if (isClosed()) throw new AlreadyClosedException();
log.info("{} starting background replication from leader", coreName);
stopReplicationFromLeader(coreName);
ReplicateFromLeader replicateFromLeader = new ReplicateFromLeader(cc, coreName);
if (isDcCalled() || isClosed || cc.isShutDown()) {
return;
}
ReplicateFromLeader prev = replicateFromLeaders.putIfAbsent(coreName, replicateFromLeader);
if (prev == null) {
replicateFromLeader.startReplication(switchTransactionLog);
} else {
log.warn("A replicate from leader instance already exists for core {}", coreName);
try {
prev.close();
} catch (Exception e) {
ParWork.propagateInterrupt("Error closing previous replication attempt", e);
}
// if (isClosed()) throw new AlreadyClosedException();
replicateFromLeader.startReplication(switchTransactionLog);
}
}
public void stopReplicationFromLeader(String coreName) {
log.info("{} stopping background replication from leader", coreName);
ReplicateFromLeader replicateFromLeader = replicateFromLeaders.remove(coreName);
if (replicateFromLeader != null) {
IOUtils.closeQuietly(replicateFromLeader);
}
}
/**
* Get leader props directly from zk nodes.
* @throws SessionExpiredException on zk session expiration.
*/
public Replica getLeaderProps(final String collection, long id,
final String slice, int timeoutms) throws InterruptedException, SessionExpiredException {
return getLeaderProps(collection, id, slice, timeoutms, true);
}
/**
* Get leader props directly from zk nodes.
*
* @return leader props
* @throws SessionExpiredException on zk session expiration.
*/
public Replica getLeaderProps(final String collection, long id, final String slice, int timeoutms, boolean failImmediatelyOnExpiration)
throws InterruptedException, SessionExpiredException { // MRM TODO: look at failImmediatelyOnExpiration
try {
getZkStateReader().waitForState(collection, timeoutms, TimeUnit.MILLISECONDS, (n, c) -> c != null && c.getLeader(slice) != null); // timeoutms is milliseconds, per the parameter name
byte[] data = zkClient.getData(ZkStateReader.getShardLeadersPath(collection, slice), null, null);
ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(ZkNodeProps.load(data));
return new Replica(leaderProps.getNodeProps().getStr(CORE_NAME_PROP), leaderProps.getNodeProps().getProperties(), collection, id, slice, zkStateReader);
} catch (Exception e) {
SolrZkClient.checkInterrupted(e);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
}
private LeaderElector joinElection(CoreDescriptor cd, boolean joinAtHead) {
log.info("joinElection {}", cd.getName());
// look for old context - if we find it, cancel it
String collection = cd.getCloudDescriptor().getCollectionName();
String shardId = cd.getCloudDescriptor().getShardId();
Map<String, Object> props = new HashMap<>();
// we only put a subset of props into the leader node
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
props.put(CORE_NAME_PROP, cd.getName());
String id = cd.getCoreProperty("id", "-1");
if (id.equals("-1")) {
throw new IllegalArgumentException("no id found props=" + cd.getCoreProperties());
}
props.put("id", id);
String collId = cd.getCoreProperty("collId", "-1");
if (collId.equals("-1")) {
throw new IllegalArgumentException("no id found props=" + cd.getCoreProperties());
}
Replica replica = new Replica(cd.getName(), props, collection, Long.parseLong(collId), shardId, zkStateReader);
LeaderElector leaderElector;
if (isDcCalled() || isClosed) {
return null;
}
leaderElector = leaderElectors.get(replica.getName());
if (leaderElector == null) {
leaderElector = new LeaderElector(this);
LeaderElector oldElector = leaderElectors.put(replica.getName(), leaderElector);
IOUtils.closeQuietly(oldElector);
} else {
leaderElector.cancel();
}
ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
collection, replica, this, cc, cd);
leaderElector.setup(context);
log.info("Joining election ...");
leaderElector.joinElection( false, joinAtHead);
return leaderElector;
}
/**
* Kicks off recovery for the given core.
*/
private void checkRecovery(SolrCore core, CoreContainer cc) {
if (log.isInfoEnabled()) {
log.info("Core needs to recover:{}", core.getName());
}
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
}
public String getBaseUrl() {
return baseURL;
}
public void publish(final CoreDescriptor cd, final Replica.State state) throws Exception {
publish(cd, state, true);
}
/**
* Publish core state to overseer.
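 * <p>
 * Illustrative usage (an assumed call site, not from this file):
 * <pre>{@code
 * zkController.publish(coreDescriptor, Replica.State.ACTIVE);
 * }</pre>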
*/
public void publish(final CoreDescriptor cd, final Replica.State state, boolean updateLastState) throws Exception {
MDCLoggingContext.setCoreName(cd.getName());
log.debug("publishing state={}", state);
String collection = cd.getCloudDescriptor().getCollectionName();
String shardId = cd.getCloudDescriptor().getShardId();
Map<String,Object> props = new HashMap<>();
try (SolrCore core = cc.getCore(cd.getName())) {
// MRM TODO: if we publish anything but ACTIVE, cancel any possible election?
// System.out.println(Thread.currentThread().getStackTrace()[3]);
Integer numShards = cd.getCloudDescriptor().getNumShards();
if (numShards == null) { // XXX sys prop hack
log.debug("numShards not found on descriptor - reading it from system property");
}
props.put(Overseer.QUEUE_OPERATION, "state");
props.put(ZkStateReader.STATE_PROP, state.toString());
props.put("id", cd.getCoreProperty("collId", "-1") + "-" + cd.getCoreProperty("id", "-1"));
// props.put(ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles());
props.put(CORE_NAME_PROP, cd.getName());
// props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
// props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
props.put(ZkStateReader.COLLECTION_PROP, collection);
props.put(ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().toString());
// try {
// if (core.getDirectoryFactory().isSharedStorage()) {
// // MRM TODO: currently doesn't publish anywhere
// if (core.getDirectoryFactory().isSharedStorage()) {
// props.put(ZkStateReader.SHARED_STORAGE_PROP, "true");
// props.put("dataDir", core.getDataDir());
// UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
// if (ulog != null) {
// props.put("ulogDir", ulog.getLogDir());
// }
// }
// }
// } catch (SolrCoreInitializationException ex) {
// // The core had failed to initialize (in a previous request, not this one), hence nothing to do here.
// if (log.isInfoEnabled()) {
// log.info("The core '{}' had failed to initialize before.", cd.getName());
// }
// }
// pull replicas are excluded because their terms are not considered
if ((state == Replica.State.RECOVERING || state == Replica.State.BUFFERING) && cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
// state is used by the client; a replica's state can change from RECOVERING to DOWN without needing to finish recovery
// by calling this we will know whether a replica actually finished recovery or not
ZkShardTerms shardTerms = getShardTerms(collection, shardId);
shardTerms.startRecovering(cd.getName());
shardTerms.setTermEqualsToLeader(cd.getName());
}
if (state == Replica.State.ACTIVE && cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
ZkShardTerms shardTerms = getShardTerms(collection, shardId);
shardTerms.doneRecovering(cd.getName());
}
ZkNodeProps m = new ZkNodeProps(props);
if (updateLastState) {
cd.getCloudDescriptor().setLastPublished(state);
}
statePublisher.submitState(m);
} finally {
MDCLoggingContext.clear();
}
}
public void publish(ZkNodeProps message) {
statePublisher.submitState(message);
}
public void registerShardTerms(String collection, String shardId, String corename) throws Exception {
ZkCollectionTerms ct = getCollectionTerms(collection);
if (ct == null) {
ct = createCollectionTerms(collection);
}
ct.register(shardId, corename);
}
public ZkShardTerms getShardTerms(String collection, String shardId) throws Exception {
ZkCollectionTerms ct = getCollectionTerms(collection);
if (ct == null) {
ct = createCollectionTerms(collection);
}
return ct.getShard(shardId);
}
public ZkShardTerms getShardTermsOrNull(String collection, String shardId) {
ZkCollectionTerms ct = getCollectionTerms(collection);
if (ct == null) return null;
return ct.getShardOrNull(shardId);
}
public void removeCollectionTerms(String collection) {
ZkCollectionTerms collectionTerms = collectionToTerms.remove(collection);
IOUtils.closeQuietly(collectionTerms);
}
public ZkCollectionTerms getCollectionTerms(String collection) {
return collectionToTerms.get(collection);
}
public ZkCollectionTerms createCollectionTerms(String collection) {
ZkCollectionTerms ct = new ZkCollectionTerms(collection, zkClient);
ZkCollectionTerms returned = collectionToTerms.putIfAbsent(collection, ct);
if (returned == null) {
return ct;
} else {
IOUtils.closeQuietly(ct);
}
return returned;
}
public void clearZkCollectionTerms() {
collectionToTerms.values().forEach(ZkCollectionTerms::close);
}
public void unregister(String coreName, String collection, String shardId) throws KeeperException, InterruptedException {
log.info("Unregister core from zookeeper {}", coreName);
try {
removeShardLeaderElector(coreName);
ZkCollectionTerms ct = collectionToTerms.get(collection);
if (ct != null) {
ct.remove(shardId, coreName);
}
replicasMetTragicEvent.remove(collection + ":" + coreName);
} finally {
try {
zkStateReader.unregisterCore(collection, coreName);
} finally {
if (statePublisher != null) {
statePublisher.clearStatCache(coreName);
}
}
}
// if (Strings.isNullOrEmpty(collection)) {
// log.error("No collection was specified.");
// assert false : "No collection was specified [" + collection + "]";
// return;
// }
// final DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collection);
// Replica replica = (docCollection == null) ? null : docCollection.getReplica(coreName);
}
public ZkStateReader getZkStateReader() {
return zkStateReader;
}
public static void linkConfSet(SolrZkClient zkClient, String collection, String confSetName) throws KeeperException, InterruptedException {
String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
log.debug("Load collection config from:{}", path);
byte[] data;
try {
data = zkClient.getData(path, null, null);
} catch (NoNodeException e) {
// if there is no node, we will try to create it
// first try to create it, in case we are pre-configuring
ZkNodeProps props = new ZkNodeProps(CONFIGNAME_PROP, confSetName);
try {
zkClient.makePath(path, Utils.toJSON(props),
CreateMode.PERSISTENT, null, true);
} catch (KeeperException e2) {
// it's okay if the node already exists
if (e2.code() != KeeperException.Code.NODEEXISTS) {
throw e;
}
// if creation failed because the node already exists, set the data instead
// TODO: we should consider using version
zkClient.setData(path, Utils.toJSON(props), true);
}
return;
}
// we found existing data, let's update it
ZkNodeProps props;
if (data != null) {
props = ZkNodeProps.load(data);
Map<String, Object> newProps = new HashMap<>(props.getProperties());
newProps.put(CONFIGNAME_PROP, confSetName);
props = new ZkNodeProps(newProps);
} else {
props = new ZkNodeProps(CONFIGNAME_PROP, confSetName);
}
// TODO: we should consider using version
zkClient.setData(path, Utils.toJSON(props), true);
}
/**
* If in SolrCloud mode, upload config sets for each SolrCore in solr.xml.
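* <p>A minimal invocation sketch (the startup wiring around the call is illustrative):
* <pre>{@code
* if (Boolean.getBoolean("bootstrap_conf")) {
*   ZkController.bootstrapConf(zkClient, coreContainer);
* }
* }</pre>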
*/
public static void bootstrapConf(SolrZkClient zkClient, CoreContainer cc) throws IOException, KeeperException {
ZkConfigManager configManager = new ZkConfigManager(zkClient);
List<CoreDescriptor> cds = cc.getCoresLocator().discover(cc);
if (log.isInfoEnabled()) {
log.info("bootstrapping config for {} cores into ZooKeeper using solr.xml from {}", cds.size(), cc.getSolrHome());
}
for (CoreDescriptor cd : cds) {
String coreName = cd.getName();
String confName = cd.getCollectionName();
if (StringUtils.isEmpty(confName)) {
confName = coreName;
}
Path udir = cd.getInstanceDir().resolve("conf");
log.info("Uploading directory {} with name {} for solrCore {}", udir, confName, coreName);
configManager.uploadConfigDir(udir, confName);
}
}
public ZkDistributedQueue getOverseerJobQueue() {
return overseerJobQueue;
}
public OverseerTaskQueue getOverseerCollectionQueue() {
return overseerCollectionQueue;
}
public OverseerTaskQueue getOverseerConfigSetQueue() {
return overseerConfigSetQueue;
}
public DistributedMap getOverseerRunningMap() {
return overseerRunningMap;
}
public DistributedMap getOverseerCompletedMap() {
return overseerCompletedMap;
}
public DistributedMap getOverseerFailureMap() {
return overseerFailureMap;
}
/**
* When an operation needs to be performed in an asynchronous mode, the asyncId needs
* to be claimed by calling this method to make sure it's not a duplicate (hasn't been
* claimed by another request). If this method returns true, the asyncId in the parameter
* has been reserved for the operation, meaning that no other thread/operation can claim
* it. If for whatever reason the operation is not scheduled, the asyncId needs to be
* cleared using {@link #clearAsyncId(String)}.
* If this method returns false, no reservation has been made, and this asyncId can't
* be used, since it's being used by another operation (currently or in the past).
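* <p>A minimal usage sketch (the scheduling call and error handling are illustrative,
* not part of this API):
* <pre>{@code
* if (!zkController.claimAsyncId(asyncId)) {
*   throw new SolrException(ErrorCode.BAD_REQUEST, "Duplicate asyncId: " + asyncId);
* }
* try {
*   scheduleAsyncTask(asyncId); // hypothetical scheduling call
* } catch (Exception e) {
*   zkController.clearAsyncId(asyncId); // not scheduled, release the id
*   throw e;
* }
* }</pre>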
* @param asyncId A string representing the asyncId of an operation. Can't be null.
* @return True if the reservation succeeds.
* False if this ID is already in use.
*/
public boolean claimAsyncId(String asyncId) throws KeeperException {
try {
return asyncIdsMap.putIfAbsent(asyncId, EMPTY_BYTE_ARRAY);
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
throw new RuntimeException(e);
}
}
/**
* Clears an asyncId previously claimed by calling {@link #claimAsyncId(String)}
* @param asyncId A string representing the asyncId of an operation. Can't be null.
* @return True if the asyncId existed and was cleared.
* False if the asyncId didn't exist before.
*/
public boolean clearAsyncId(String asyncId) throws KeeperException {
try {
return asyncIdsMap.remove(asyncId);
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
throw new RuntimeException(e);
}
}
public void clearStatePublisher() {
this.statePublisher.clearStatCache();
}
public void clearCachedState(String coreName) {
this.statePublisher.clearStatCache(coreName);
}
public int getClientTimeout() {
return clientTimeout;
}
public Overseer getOverseer() {
return overseer;
}
/**
* Returns the nodeName that should be used based on the specified properties.
*
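* <p>For example (values are illustrative):
* <pre>{@code
* generateNodeName("host1.example.com", "8983", "/solr"); // => "host1.example.com:8983_solr"
* }</pre>
*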
* @param hostName - must not be null or the empty string
* @param hostPort - must consist only of digits, must not be null or the empty string
* @param hostContext - should not begin or end with a slash (leading/trailing slashes will be ignored), must not be null, may be the empty string to denote the root context
* @lucene.experimental
* @see ZkStateReader#getBaseUrlForNodeName
*/
public static String generateNodeName(final String hostName,
final String hostPort,
final String hostContext) {
return hostName + ':' + hostPort + '_' +
URLEncoder.encode(trimLeadingAndTrailingSlashes(hostContext), StandardCharsets.UTF_8);
}
public static String generateNodeName(final String url) {
return URLEncoder.encode(trimLeadingAndTrailingSlashes(url), StandardCharsets.UTF_8);
}
/**
* Utility method for trimming a leading and/or trailing slash from
* its input. May return the empty string. May return null if and only
* if the input is null.
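* <p>For example: {@code trimLeadingAndTrailingSlashes("/solr/")} returns {@code "solr"},
* and {@code trimLeadingAndTrailingSlashes("/")} returns the empty string.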
*/
public static String trimLeadingAndTrailingSlashes(final String in) {
if (null == in) return in;
String out = in;
if (out.startsWith("/")) {
out = out.substring(1);
}
if (out.endsWith("/")) {
out = out.substring(0, out.length() - 1);
}
return out;
}
public void rejoinOverseerElection(boolean joinAtHead) {
// a missing overseer is treated as already closed and done
boolean closeAndDone = overseer == null || overseer.isCloseAndDone();
if (overseerElector == null || isClosed() || isShutdownCalled || closeAndDone) {
return;
}
ElectionContext context = overseerElector.getContext();
if (context == null) {
return;
}
try {
overseerElector.retryElection(joinAtHead);
} catch (Exception e) {
ParWork.propagateInterrupt(e);
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to rejoin election", e);
}
}
public void rejoinShardLeaderElection(SolrParams params) {
String coreName = params.get(CORE_NAME_PROP);
try {
log.info("Rejoin the shard leader election.");
LeaderElector elect = leaderElectors.get(coreName);
if (elect != null) {
elect.retryElection(params.getBool(REJOIN_AT_HEAD_PROP, false));
}
try (SolrCore core = getCoreContainer().getCore(coreName)) {
core.getSolrCoreState().doRecovery(core);
}
} catch (Exception e) {
ParWork.propagateInterrupt(e);
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to rejoin election", e);
}
}
public CoreContainer getCoreContainer() {
return cc;
}
public void throwErrorIfReplicaReplaced(CoreDescriptor desc) {
ClusterState clusterState = getZkStateReader().getClusterState();
if (clusterState != null && desc != null) {
CloudUtil.checkSharedFSFailoverReplaced(cc, desc);
}
}
/**
* Add a listener to be notified once there is a new session created after a ZooKeeper session expiration occurs;
* in most cases, listeners will be components that have watchers that need to be re-created.
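* <p>A minimal registration sketch ({@code recreateWatches()} is a hypothetical
* re-registration hook, not part of this API):
* <pre>{@code
* zkController.addOnReconnectListener(new OnReconnect() {
*   public void command() {
*     recreateWatches(); // re-create any watchers lost with the expired session
*   }
*   public String getName() {
*     return "myComponent";
*   }
* });
* }</pre>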
*/
public void addOnReconnectListener(OnReconnect listener) {
if (listener != null) {
reconnectListeners.add(listener);
if (log.isDebugEnabled()) log.debug("Added new OnReconnect listener {}", listener);
}
}
/**
* Removes a previously registered OnReconnect listener, such as when a core is removed or reloaded.
*/
public void removeOnReconnectListener(OnReconnect listener) {
if (listener != null) {
boolean wasRemoved = reconnectListeners.remove(listener);
if (wasRemoved) {
if (log.isDebugEnabled()) log.debug("Removed OnReconnect listener {}", listener);
} else {
log.warn("Was asked to remove OnReconnect listener {}, but remove operation " +
"did not find it in the list of registered listeners."
, listener);
}
}
}
Set<OnReconnect> getCurrentOnReconnectListeners() {
return Collections.unmodifiableSet(reconnectListeners);
}
/**
* Persists a config file to ZooKeeper using optimistic concurrency.
*
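* <p>A minimal usage sketch (the resource name and version source are illustrative):
* <pre>{@code
* int newVersion = ZkController.persistConfigResourceToZooKeeper(
*     zkLoader, knownZnodeVersion, "managed-schema", updatedBytes, true);
* // a concurrent modification raises ResourceModifiedInZkException; callers typically retry
* }</pre>
*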
* @return the znode version of the resource after it was persisted
*/
public static int persistConfigResourceToZooKeeper(ZkSolrResourceLoader zkLoader, int znodeVersion,
String resourceName, byte[] content,
boolean createIfNotExists) {
int latestVersion = znodeVersion;
final SolrZkClient zkClient = zkLoader.getZkClient();
final String resourceLocation = zkLoader.getConfigSetZkPath() + "/" + resourceName;
String errMsg = "Failed to persist resource at {0} - old {1}";
try {
try {
Stat stat = zkClient.setData(resourceLocation, content, znodeVersion, true);
latestVersion = stat.getVersion(); // if the set succeeded, it should always have incremented the version by one
log.info("Persisted config data to node {} ", resourceLocation);
touchConfDir(zkLoader);
} catch (NoNodeException e) {
if (createIfNotExists) {
try {
zkClient.create(resourceLocation, content, CreateMode.PERSISTENT, true);
latestVersion = 0;//just created so version must be zero
touchConfDir(zkLoader);
} catch (KeeperException.NodeExistsException nee) {
try {
Stat stat = zkClient.exists(resourceLocation, null);
if (log.isDebugEnabled()) {
log.debug("failed to set data version in zk is {} and expected version is {} ", stat.getVersion(), znodeVersion);
}
} catch (Exception e1) {
ParWork.propagateInterrupt(e1);
log.warn("could not get stat");
}
if (log.isInfoEnabled()) {
log.info(StrUtils.formatString(errMsg, resourceLocation, znodeVersion));
}
throw new ResourceModifiedInZkException(ErrorCode.CONFLICT, StrUtils.formatString(errMsg, resourceLocation, znodeVersion) + ", retry.");
}
}
}
} catch (KeeperException.BadVersionException bve) {
if (log.isInfoEnabled()) {
log.info(StrUtils.formatString("%s zkVersion= %d %s %d", errMsg, resourceLocation, znodeVersion));
}
throw new ResourceModifiedInZkException(ErrorCode.CONFLICT, StrUtils.formatString(errMsg, resourceLocation, znodeVersion) + ", retry.");
} catch (ResourceModifiedInZkException e) {
throw e;
} catch (Exception e) {
ParWork.propagateInterrupt(e);
final String msg = "Error persisting resource at " + resourceLocation;
log.error(msg, e);
throw new SolrException(ErrorCode.SERVER_ERROR, msg, e);
}
return latestVersion;
}
public static void touchConfDir(ZkSolrResourceLoader zkLoader) {
SolrZkClient zkClient = zkLoader.getZkClient();
try {
zkClient.setData(zkLoader.getConfigSetZkPath(), new byte[]{0}, true);
} catch (Exception e) {
ParWork.propagateInterrupt(e);
final String msg = "Error 'touching' conf location " + zkLoader.getConfigSetZkPath();
log.error(msg, e);
throw new SolrException(ErrorCode.SERVER_ERROR, msg, e);
}
}
public static class ResourceModifiedInZkException extends SolrException {
public ResourceModifiedInZkException(ErrorCode code, String msg) {
super(code, msg);
}
}
private void unregisterConfListener(String confDir, Runnable listener) {
synchronized (confDirectoryListeners) {
final ConfListeners confListeners = confDirectoryListeners.get(confDir);
if (confListeners == null) {
log.warn("{} has no more registered listeners, but a live one attempted to unregister!", confDir);
return;
}
if (confListeners.confDirListeners.remove(listener)) {
if (log.isDebugEnabled()) log.debug("removed listener for config directory [{}]", confDir);
}
if (confListeners.confDirListeners.isEmpty()) {
// no more listeners for this confDir, remove it from the map
confDirectoryListeners.remove(confDir);
if (log.isDebugEnabled()) log.debug("No more listeners for config directory [{}]", confDir);
try {
zkClient.removeWatches(COLLECTIONS_ZKNODE, confListeners.watcher, Watcher.WatcherType.Any, true);
} catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
// expected if the watch is already gone or we are shutting down; nothing to do
} catch (Exception e) {
log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
}
}
}
}
/**
* This will give a callback to the listener whenever a child is modified in the
* conf directory. It is the listener's responsibility to check whether the individual
* item of interest has been modified. When the last core interested in
* this conf directory is gone, the listeners are removed automatically.
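* <p>A minimal registration sketch ({@code reloadIfChanged(core)} stands in for
* whatever the caller wants to do on a change; it is not part of this API):
* <pre>{@code
* zkController.registerConfListenerForCore(confDir, core, () -> reloadIfChanged(core));
* }</pre>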
*/
public void registerConfListenerForCore(final String confDir, SolrCore core, final Runnable listener) {
if (listener == null) {
throw new NullPointerException("listener cannot be null");
}
final ConfListeners confDirListeners = getConfDirListeners(confDir);
confDirListeners.confDirListeners.add(listener);
core.addCloseHook(new CloseHook() {
@Override
public void preClose(SolrCore core) {
unregisterConfListener(confDir, listener);
}
@Override
public void postClose(SolrCore core) {
}
});
}
private static class ConfListeners {
private final Set<Runnable> confDirListeners;
private final Watcher watcher;
ConfListeners(Set<Runnable> confDirListeners, Watcher watcher) {
this.confDirListeners = confDirListeners;
this.watcher = watcher;
}
}
private ConfListeners getConfDirListeners(final String confDir) {
synchronized (confDirectoryListeners) {
ConfListeners confDirListeners = confDirectoryListeners.get(confDir);
if (confDirListeners == null) {
if (log.isTraceEnabled()) log.trace("watch zkdir {}", confDir);
ConfDirWatcher watcher = new ConfDirWatcher(confDir, cc, confDirectoryListeners);
confDirListeners = new ConfListeners(ConcurrentHashMap.newKeySet(), watcher);
confDirectoryListeners.put(confDir, confDirListeners);
setConfWatcher(confDir, watcher, null, cc, confDirectoryListeners, cc.getZkController().getZkClient());
}
return confDirListeners;
}
}
private final Map<String, ConfListeners> confDirectoryListeners = new ConcurrentHashMap<>();
private static class ConfDirWatcher implements Watcher {
private final String zkDir;
private final CoreContainer cc;
private final SolrZkClient zkClient;
private final Map<String, ConfListeners> confDirectoryListeners;
private ConfDirWatcher(String dir, CoreContainer cc, Map<String, ConfListeners> confDirectoryListeners) {
this.zkDir = dir;
this.cc = cc;
this.zkClient = cc.getZkController().getZkClient();
this.confDirectoryListeners = confDirectoryListeners;
}
@Override
public void process(WatchedEvent event) {
// session events are not change events, and do not remove the watcher
if (Event.EventType.None.equals(event.getType())) {
return;
}
if (cc.getZkController().isClosed() || cc.isShutDown() || cc.getZkController().isDcCalled()) {
return;
}
try {
// probe ZK; the result is unused, but an interrupt here means we bail out without firing listeners
zkClient.exists(zkDir, null);
} catch (KeeperException e) {
log.info(e.getMessage(), e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.info("ConfDirWatcher interrupted");
return;
}
fireEventListeners(zkDir, confDirectoryListeners, cc);
}
}
private static boolean fireEventListeners(String zkDir, Map<String, ConfListeners> confDirectoryListeners, CoreContainer cc) {
if (cc.isShutDown()) {
return false;
}
// if this is not among directories to be watched then don't set the watcher anymore
if (!confDirectoryListeners.containsKey(zkDir)) {
if (log.isDebugEnabled()) log.debug("Watcher on {} is removed ", zkDir);
return false;
}
final Set<Runnable> listeners = confDirectoryListeners.get(zkDir).confDirListeners;
if (listeners != null) {
if (cc.isShutDown() || cc.getZkController().isDcCalled()) {
return false;
}
listeners.forEach(runnable -> ParWork.getRootSharedExecutor().submit(runnable));
}
return true;
}
private static void setConfWatcher(String zkDir, Watcher watcher, Stat stat, CoreContainer cc, Map<String, ConfListeners> confDirectoryListeners, SolrZkClient zkClient) {
try {
zkClient.addWatch(zkDir, watcher, AddWatchMode.PERSISTENT);
Stat newStat = zkClient.exists(zkDir, null);
if (stat != null && newStat != null && newStat.getVersion() > stat.getVersion()) {
// a race condition where we missed an event being fired,
// so fire the event listeners
fireEventListeners(zkDir, confDirectoryListeners, cc);
}
} catch (KeeperException e) {
log.error("failed to set watcher for conf dir {} ", zkDir);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("failed to set watcher for conf dir {} ", zkDir);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
}
public OnReconnect getConfigDirListener() {
return new ZkControllerOnReconnect(confDirectoryListeners, cc);
}
/**
* Thrown during pre register process if the replica is not present in clusterstate
*/
public static class NotInClusterStateException extends SolrException {
public NotInClusterStateException(ErrorCode code, String msg) {
super(code, msg);
}
}
public boolean checkIfCoreNodeNameAlreadyExists(CoreDescriptor dcore) {
DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(dcore.getCollectionName());
if (collection != null) {
Replica r = collection.getReplica(dcore.getName());
return r != null;
}
return false;
}
/**
* Best effort to publish the given state (such as DOWN) for all replicas on a node.
*
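* <p>A minimal usage sketch, e.g. during node shutdown:
* <pre>{@code
* zkController.publishNodeAs(zkController.getNodeName(), OverseerAction.DOWNNODE);
* }</pre>
*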
* @param nodeName the node to operate on
* @param state the overseer action representing the state to publish
*/
public void publishNodeAs(String nodeName, OverseerAction state) throws KeeperException {
log.info("Publish node={} as {}", nodeName, state);
if (overseer == null) {
log.warn("Could not publish node as down, no overseer was started yet");
return;
}
ZkNodeProps m = new ZkNodeProps(StatePublisher.OPERATION, state.toLower(),
ZkStateReader.NODE_NAME_PROP, nodeName);
try {
statePublisher.submitState(m);
} catch (AlreadyClosedException e) {
ParWork.propagateInterrupt("Not publishing node as " + state + " because a resource required to do so is already closed.", null, true);
return;
}
// Collection<SolrCore> cores = cc.getCores();
// for (SolrCore core : cores) {
// CoreDescriptor desc = core.getCoreDescriptor();
// String collection = desc.getCollectionName();
// try {
// zkStateReader.waitForState(collection, 3, TimeUnit.SECONDS, (n,c) -> {
// if (c != null) {
// List<Replica> replicas = c.getReplicas();
// for (Replica replica : replicas) {
// if (replica.getNodeName().equals(getNodeName())) {
// if (!replica.getState().equals(Replica.State.DOWN)) {
// log.info("Found state {} {}", replica.getState(), replica.getNodeName());
// return false;
// }
// }
// }
// }
// return true;
// });
// } catch (InterruptedException e) {
// ParWork.propegateInterrupt(e);
// return;
// } catch (TimeoutException e) {
// log.error("Timeout", e);
// }
// }
}
private static class ZkControllerOnReconnect implements OnReconnect {
private final Map<String, ConfListeners> confDirectoryListeners;
private final CoreContainer cc;
ZkControllerOnReconnect(Map<String, ConfListeners> confDirectoryListeners, CoreContainer cc) {
this.confDirectoryListeners = confDirectoryListeners;
this.cc = cc;
}
@Override
public void command() {
confDirectoryListeners.forEach((s, runnables) -> {
setConfWatcher(s, new ConfDirWatcher(s, cc, confDirectoryListeners), null, cc, confDirectoryListeners, cc.getZkController().getZkClient());
fireEventListeners(s, confDirectoryListeners, cc);
});
}
@Override
public String getName() {
return null;
}
}
}