| package org.apache.solr.common.cloud; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
 * Unless required by applicable law or agreed to in writing, software
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.SolrException.ErrorCode; |
| import org.apache.solr.common.util.ByteUtils; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.Watcher.Event.EventType; |
| import org.apache.zookeeper.data.Stat; |
| import org.noggit.CharArr; |
| import org.noggit.JSONParser; |
| import org.noggit.JSONWriter; |
| import org.noggit.ObjectBuilder; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.io.UnsupportedEncodingException; |
| import java.net.URLDecoder; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| |
| public class ZkStateReader { |
| private static Logger log = LoggerFactory.getLogger(ZkStateReader.class); |
| |
| public static final String BASE_URL_PROP = "base_url"; |
| public static final String NODE_NAME_PROP = "node_name"; |
| public static final String CORE_NODE_NAME_PROP = "core_node_name"; |
| public static final String ROLES_PROP = "roles"; |
| public static final String STATE_PROP = "state"; |
| public static final String CORE_NAME_PROP = "core"; |
| public static final String COLLECTION_PROP = "collection"; |
| public static final String SHARD_ID_PROP = "shard"; |
| public static final String REPLICA_PROP = "replica"; |
| public static final String SHARD_RANGE_PROP = "shard_range"; |
| public static final String SHARD_STATE_PROP = "shard_state"; |
| public static final String SHARD_PARENT_PROP = "shard_parent"; |
| public static final String NUM_SHARDS_PROP = "numShards"; |
| public static final String LEADER_PROP = "leader"; |
| |
| public static final String COLLECTIONS_ZKNODE = "/collections"; |
| public static final String LIVE_NODES_ZKNODE = "/live_nodes"; |
| public static final String ALIASES = "/aliases.json"; |
| public static final String CLUSTER_STATE = "/clusterstate.json"; |
| public static final String CLUSTER_PROPS = "/clusterprops.json"; |
| |
| |
| public static final String ROLES = "/roles.json"; |
| |
| public static final String RECOVERING = "recovering"; |
| public static final String RECOVERY_FAILED = "recovery_failed"; |
| public static final String ACTIVE = "active"; |
| public static final String DOWN = "down"; |
| public static final String SYNC = "sync"; |
| |
| public static final String CONFIGS_ZKNODE = "/configs"; |
| public final static String CONFIGNAME_PROP="configName"; |
| |
| public static final String LEGACY_CLOUD = "legacyCloud"; |
| |
| public static final String URL_SCHEME = "urlScheme"; |
| |
| private volatile ClusterState clusterState; |
| |
| private static final long SOLRCLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("solrcloud.update.delay", "5000")); |
| |
| public static final String LEADER_ELECT_ZKNODE = "/leader_elect"; |
| |
| public static final String SHARD_LEADERS_ZKNODE = "leaders"; |
| private final Set<String> watchedCollections = new HashSet<String>(); |
| |
| |
| /**These are collections which are actively watched by this instance . |
| * |
| */ |
| private Map<String , DocCollection> watchedCollectionStates = new ConcurrentHashMap<String, DocCollection>(); |
| private Set<String> allCollections = Collections.emptySet(); |
| |
| |
| |
| // |
| // convenience methods... should these go somewhere else? |
| // |
| public static byte[] toJSON(Object o) { |
| CharArr out = new CharArr(); |
| new JSONWriter(out, 2).write(o); // indentation by default |
| return toUTF8(out); |
| } |
| |
  /**
   * Encodes the CharArr's contents as UTF-8 bytes.
   * The scratch buffer allows 4 bytes per UTF-16 code unit; UTF-8 needs at
   * most 3 bytes per BMP char (and surrogate pairs use 4 bytes per 2 units),
   * so 4x over-allocates slightly but is always a safe upper bound.
   */
  public static byte[] toUTF8(CharArr out) {
    byte[] arr = new byte[out.size() << 2]; // 4x per code unit: safe (loose) worst-case bound
    int nBytes = ByteUtils.UTF16toUTF8(out, 0, out.size(), arr, 0);
    return Arrays.copyOf(arr, nBytes);
  }
| |
| public static Object fromJSON(byte[] utf8) { |
| // convert directly from bytes to chars |
| // and parse directly from that instead of going through |
| // intermediate strings or readers |
| CharArr chars = new CharArr(); |
| ByteUtils.UTF8toUTF16(utf8, 0, utf8.length, chars); |
| JSONParser parser = new JSONParser(chars.getArray(), chars.getStart(), chars.length()); |
| try { |
| return ObjectBuilder.getVal(parser); |
| } catch (IOException e) { |
| throw new RuntimeException(e); // should never happen w/o using real IO |
| } |
| } |
| |
| /** |
| * Returns config set name for collection. |
| * |
| * @param collection to return config set name for |
| */ |
| public String readConfigName(String collection) { |
| |
| String configName = null; |
| |
| String path = COLLECTIONS_ZKNODE + "/" + collection; |
| if (log.isInfoEnabled()) { |
| log.info("Load collection config from:" + path); |
| } |
| |
| try { |
| byte[] data = zkClient.getData(path, null, null, true); |
| |
| if(data != null) { |
| ZkNodeProps props = ZkNodeProps.load(data); |
| configName = props.getStr(CONFIGNAME_PROP); |
| } |
| |
| if (configName != null) { |
| if (!zkClient.exists(CONFIGS_ZKNODE + "/" + configName, true)) { |
| log.error("Specified config does not exist in ZooKeeper:" + configName); |
| throw new ZooKeeperException(ErrorCode.SERVER_ERROR, |
| "Specified config does not exist in ZooKeeper:" + configName); |
| } else if (log.isInfoEnabled()) { |
| log.info("path={} {}={} specified config exists in ZooKeeper", |
| new Object[] {path, CONFIGNAME_PROP, configName}); |
| } |
| } else { |
| throw new ZooKeeperException(ErrorCode.INVALID_STATE, "No config data found at path: " + path); |
| } |
| } |
| catch (KeeperException e) { |
| throw new SolrException(ErrorCode.SERVER_ERROR, "Error loading config name for collection " + collection, e); |
| } |
| catch (InterruptedException e) { |
| Thread.interrupted(); |
| throw new SolrException(ErrorCode.SERVER_ERROR, "Error loading config name for collection " + collection, e); |
| } |
| |
| return configName; |
| } |
| |
| |
| private static class ZKTF implements ThreadFactory { |
| private static ThreadGroup tg = new ThreadGroup("ZkStateReader"); |
| @Override |
| public Thread newThread(Runnable r) { |
| Thread td = new Thread(tg, r); |
| td.setDaemon(true); |
| return td; |
| } |
| } |
| private ScheduledExecutorService updateCloudExecutor = Executors.newScheduledThreadPool(1, new ZKTF()); |
| |
| private boolean clusterStateUpdateScheduled; |
| |
| private SolrZkClient zkClient; |
| |
| private boolean closeClient = false; |
| |
| private ZkCmdExecutor cmdExecutor; |
| |
| private volatile Aliases aliases = new Aliases(); |
| |
| private volatile boolean closed = false; |
| |
  /**
   * Creates a reader over an externally-managed ZooKeeper client; the caller
   * retains ownership of the client and is responsible for closing it.
   */
  public ZkStateReader(SolrZkClient zkClient) {
    this.zkClient = zkClient;
    initZkCmdExecutor(zkClient.getZkClientTimeout());
  }
| |
  /**
   * Creates a reader with its own ZooKeeper client, which {@link #close()}
   * will close. After a session loss and reconnect, the cluster-state
   * watchers are re-created and all state is re-read.
   */
  public ZkStateReader(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout) throws InterruptedException, TimeoutException, IOException {
    closeClient = true;
    initZkCmdExecutor(zkClientTimeout);
    zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
        // on reconnect, reload cloud info
        new OnReconnect() {

          @Override
          public void command() {
            try {
              ZkStateReader.this.createClusterStateWatchersAndUpdate();
            } catch (KeeperException e) {
              log.error("", e);
              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
                  "", e);
            } catch (InterruptedException e) {
              // Restore the interrupted status
              Thread.currentThread().interrupt();
              log.error("", e);
              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
                  "", e);
            }

          }
        });
  }
| |
  /** Creates the retrying ZK command executor used for ensureExists calls. */
  private void initZkCmdExecutor(int zkClientTimeout) {
    // we must retry at least as long as the session timeout
    cmdExecutor = new ZkCmdExecutor(zkClientTimeout);
  }
| |
  /**
   * Reloads the full cluster state (live nodes and collections) from
   * ZooKeeper, either immediately or via the delayed scheduler.
   */
  public void updateClusterState(boolean immediate) throws KeeperException, InterruptedException {
    updateClusterState(immediate, false);
  }
| |
  /** Immediately refreshes only the live-nodes set from ZooKeeper. */
  public void updateLiveNodes() throws KeeperException, InterruptedException {
    updateClusterState(true, true);
  }
| |
  /** Returns the most recently read collection aliases snapshot. */
  public Aliases getAliases() {
    return aliases;
  }
| |
| public Boolean checkValid(String coll, int version){ |
| DocCollection collection = clusterState.getCollectionOrNull(coll); |
| if(collection ==null) return null; |
| if(collection.getZnodeVersion() < version){ |
| log.info("server older than client {}<{}",collection.getZnodeVersion(),version); |
| DocCollection nu = getCollectionLive(this, coll); |
| if(nu.getZnodeVersion()> collection.getZnodeVersion()){ |
| updateWatchedCollection(nu); |
| collection = nu; |
| } |
| } |
| if(collection.getZnodeVersion() == version) return Boolean.TRUE; |
| log.info("wrong version from client {}!={} ",version, collection.getZnodeVersion()); |
| return Boolean.FALSE; |
| |
| } |
| |
  /**
   * Installs the three core ZooKeeper watchers (clusterstate.json, live_nodes,
   * aliases.json), performs an initial read of each, and finally re-adds
   * per-collection watches for any externally watched collections (needed
   * after a reconnect). Each watcher re-registers itself by passing
   * {@code this} back into the ZK read it performs on trigger.
   */
  public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
      InterruptedException {
    // We need to fetch the current cluster state and the set of live nodes

    synchronized (getUpdateLock()) {
      cmdExecutor.ensureExists(CLUSTER_STATE, zkClient);
      cmdExecutor.ensureExists(ALIASES, zkClient);

      log.info("Updating cluster state from ZooKeeper... ");

      // watch the shared clusterstate.json node
      zkClient.exists(CLUSTER_STATE, new Watcher() {

        @Override
        public void process(WatchedEvent event) {
          // session events are not change events,
          // and do not remove the watcher
          if (EventType.None.equals(event.getType())) {
            return;
          }
          log.info("A cluster state change: {}, has occurred - updating... (live nodes size: {})", (event) , ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size());
          try {

            // delayed approach
            // ZkStateReader.this.updateClusterState(false, false);
            synchronized (ZkStateReader.this.getUpdateLock()) {
              // remake watch
              final Watcher thisWatch = this;
              Stat stat = new Stat();
              byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, stat ,
                  true);
              // carry the current live-nodes set over into the new state
              Set<String> ln = ZkStateReader.this.clusterState.getLiveNodes();
              ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln,ZkStateReader.this, null);
              // update volatile
              ZkStateReader.this.clusterState = clusterState;

              updateCollectionNames();
              // HashSet<String> all = new HashSet<>(colls);;
              // all.addAll(clusterState.getAllInternalCollections());
              // all.remove(null);

            }
          } catch (KeeperException e) {
            if (e.code() == KeeperException.Code.SESSIONEXPIRED
                || e.code() == KeeperException.Code.CONNECTIONLOSS) {
              log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
              return;
            }
            log.error("", e);
            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
                "", e);
          } catch (InterruptedException e) {
            // Restore the interrupted status
            Thread.currentThread().interrupt();
            log.warn("", e);
            return;
          }
        }

      }, true);
    }


    synchronized (ZkStateReader.this.getUpdateLock()) {
      // watch /live_nodes children; initial read happens right here too
      List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE,
          new Watcher() {

            @Override
            public void process(WatchedEvent event) {
              // session events are not change events,
              // and do not remove the watcher
              if (EventType.None.equals(event.getType())) {
                return;
              }
              try {
                // delayed approach
                // ZkStateReader.this.updateClusterState(false, true);
                synchronized (ZkStateReader.this.getUpdateLock()) {
                  // re-register the watch while fetching the current children
                  List<String> liveNodes = zkClient.getChildren(
                      LIVE_NODES_ZKNODE, this, true);
                  log.debug("Updating live nodes... ({})", liveNodes.size());
                  Set<String> liveNodesSet = new HashSet<>();
                  liveNodesSet.addAll(liveNodes);

                  ClusterState clusterState = ZkStateReader.this.clusterState;

                  // NOTE(review): mutates the shared ClusterState in place
                  // rather than publishing a fresh instance
                  clusterState.setLiveNodes(liveNodesSet);
                }
              } catch (KeeperException e) {
                if (e.code() == KeeperException.Code.SESSIONEXPIRED
                    || e.code() == KeeperException.Code.CONNECTIONLOSS) {
                  log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
                  return;
                }
                log.error("", e);
                throw new ZooKeeperException(
                    SolrException.ErrorCode.SERVER_ERROR, "", e);
              } catch (InterruptedException e) {
                // Restore the interrupted status
                Thread.currentThread().interrupt();
                log.warn("", e);
                return;
              }
            }

          }, true);

      // initial full read of the cluster state
      Set<String> liveNodeSet = new HashSet<>();
      liveNodeSet.addAll(liveNodes);
      ClusterState clusterState = ClusterState.load(zkClient, liveNodeSet, ZkStateReader.this);
      this.clusterState = clusterState;
      updateCollectionNames();

      // watch /aliases.json
      zkClient.exists(ALIASES,
          new Watcher() {

            @Override
            public void process(WatchedEvent event) {
              // session events are not change events,
              // and do not remove the watcher
              if (EventType.None.equals(event.getType())) {
                return;
              }
              try {
                synchronized (ZkStateReader.this.getUpdateLock()) {
                  log.info("Updating aliases... ");

                  // remake watch
                  final Watcher thisWatch = this;
                  Stat stat = new Stat();
                  byte[] data = zkClient.getData(ALIASES, thisWatch, stat ,
                      true);

                  Aliases aliases = ClusterState.load(data);

                  ZkStateReader.this.aliases = aliases;
                }
              } catch (KeeperException e) {
                if (e.code() == KeeperException.Code.SESSIONEXPIRED
                    || e.code() == KeeperException.Code.CONNECTIONLOSS) {
                  log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
                  return;
                }
                log.error("", e);
                throw new ZooKeeperException(
                    SolrException.ErrorCode.SERVER_ERROR, "", e);
              } catch (InterruptedException e) {
                // Restore the interrupted status
                Thread.currentThread().interrupt();
                log.warn("", e);
                return;
              }
            }

          }, true);
    }
    updateAliases();
    //on reconnect of SolrZkClient re-add watchers for the watched external collections
    synchronized (this){
      for (String watchedCollection : watchedCollections) {
        addZkWatch(watchedCollection);
      }
    }
  }
| |
  /**
   * Rebuilds {@link #allCollections} as the union of external (state.json)
   * collection names and those embedded in the shared clusterstate.json.
   */
  public void updateCollectionNames() throws KeeperException, InterruptedException {
    Set<String> colls = getExternColls();
    colls.addAll(clusterState.getCollectionStates().keySet());
    allCollections = Collections.unmodifiableSet(colls);
  }
| |
| private Set<String> getExternColls() throws KeeperException, InterruptedException { |
| List<String> children = null; |
| try { |
| children = zkClient.getChildren(COLLECTIONS_ZKNODE, null, true); |
| } catch (KeeperException.NoNodeException e) { |
| log.warn("Error fetching collection names"); |
| |
| return new HashSet<>(); |
| } |
| if(children == null || children.isEmpty()) return new HashSet<>(); |
| HashSet<String> result = new HashSet<>(children.size()); |
| |
| for (String c : children) { |
| try { |
| if(zkClient.exists(getCollectionPath(c),true)) result.add(c); |
| } catch (Exception e) { |
| log.warn("Error checking external collections", e); |
| } |
| } |
| return result; |
| } |
| |
| |
  /**
   * Reloads cluster state (or just the live-nodes set when
   * {@code onlyLiveNodes} is true), either immediately or via a single
   * coalesced task on the delayed-update executor. Afterwards the cached
   * state of every actively watched collection is refreshed.
   */
  private synchronized void updateClusterState(boolean immediate,
      final boolean onlyLiveNodes) throws KeeperException,
      InterruptedException {
    // build immutable CloudInfo

    if (immediate) {
      ClusterState clusterState;
      synchronized (getUpdateLock()) {
        List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE, null,
            true);
        Set<String> liveNodesSet = new HashSet<>();
        liveNodesSet.addAll(liveNodes);

        if (!onlyLiveNodes) {
          log.info("Updating cloud state from ZooKeeper... ");

          clusterState = ClusterState.load(zkClient, liveNodesSet,this);
        } else {
          log.info("Updating live nodes from ZooKeeper... ({})", liveNodesSet.size());
          clusterState = this.clusterState;
          clusterState.setLiveNodes(liveNodesSet);
        }
        this.clusterState = clusterState;
        updateCollectionNames();
      }

    } else {
      // delayed path: coalesce bursts of triggers into one scheduled task
      if (clusterStateUpdateScheduled) {
        log.info("Cloud state update for ZooKeeper already scheduled");
        return;
      }
      log.info("Scheduling cloud state update from ZooKeeper...");
      clusterStateUpdateScheduled = true;
      updateCloudExecutor.schedule(new Runnable() {

        @Override
        public void run() {
          log.info("Updating cluster state from ZooKeeper...");
          synchronized (getUpdateLock()) {
            clusterStateUpdateScheduled = false;
            ClusterState clusterState;
            try {
              List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE,
                  null, true);
              Set<String> liveNodesSet = new HashSet<>();
              liveNodesSet.addAll(liveNodes);

              if (!onlyLiveNodes) {
                log.info("Updating cloud state from ZooKeeper... ");

                clusterState = ClusterState.load(zkClient, liveNodesSet,ZkStateReader.this);
              } else {
                log.info("Updating live nodes from ZooKeeper... ");
                clusterState = ZkStateReader.this.clusterState;
                clusterState.setLiveNodes(liveNodesSet);

              }

              // NOTE(review): this assignment is repeated after the try/catch
              // ("update volatile" below) - one of the two is redundant
              ZkStateReader.this.clusterState = clusterState;

            } catch (KeeperException e) {
              if (e.code() == KeeperException.Code.SESSIONEXPIRED
                  || e.code() == KeeperException.Code.CONNECTIONLOSS) {
                log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
                return;
              }
              log.error("", e);
              throw new ZooKeeperException(
                  SolrException.ErrorCode.SERVER_ERROR, "", e);
            } catch (InterruptedException e) {
              // Restore the interrupted status
              Thread.currentThread().interrupt();
              log.error("", e);
              throw new ZooKeeperException(
                  SolrException.ErrorCode.SERVER_ERROR, "", e);
            }
            // update volatile
            ZkStateReader.this.clusterState = clusterState;
          }
        }
      }, SOLRCLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS);
    }
    // refresh the cached state of every actively watched collection
    synchronized (this) {
      for (String watchedCollection : watchedCollections) {
        watchedCollectionStates.put(watchedCollection, getCollectionLive(this, watchedCollection));
      }
    }
  }
| |
  /**
   * @return information about the cluster from ZooKeeper; this is the cached
   *         volatile snapshot, not a fresh read
   */
  public ClusterState getClusterState() {
    return clusterState;
  }
| |
  /** Returns the monitor guarding cluster-state updates (this instance itself). */
  public Object getUpdateLock() {
    return this;
  }
| |
| public void close() { |
| this.closed = true; |
| if (closeClient) { |
| zkClient.close(); |
| } |
| } |
| |
  /**
   * Base class pairing a Runnable task with the Watcher that triggered it.
   * NOTE(review): no subclasses are visible in this file - possibly unused.
   */
  abstract class RunnableWatcher implements Runnable {
    Watcher watcher;
    public RunnableWatcher(Watcher watcher){
      this.watcher = watcher;
    }

  }
| |
| public String getLeaderUrl(String collection, String shard, int timeout) |
| throws InterruptedException, KeeperException { |
| ZkCoreNodeProps props = new ZkCoreNodeProps(getLeaderRetry(collection, |
| shard, timeout)); |
| return props.getCoreUrl(); |
| } |
| |
  /**
   * Get shard leader properties, with retry if none exist.
   * Uses a default timeout of 4000 ms.
   */
  public Replica getLeaderRetry(String collection, String shard) throws InterruptedException {
    return getLeaderRetry(collection, shard, 4000);
  }
| |
| /** |
| * Get shard leader properties, with retry if none exist. |
| */ |
| public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException { |
| long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS); |
| while (System.nanoTime() < timeoutAt && !closed) { |
| if (clusterState != null) { |
| Replica replica = clusterState.getLeader(collection, shard); |
| if (replica != null && getClusterState().liveNodesContain(replica.getNodeName())) { |
| return replica; |
| } |
| } |
| Thread.sleep(50); |
| } |
| throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "No registered leader was found after waiting for " |
| + timeout + "ms " + ", collection: " + collection + " slice: " + shard); |
| } |
| |
| /** |
| * Get path where shard leader properties live in zookeeper. |
| */ |
| public static String getShardLeadersPath(String collection, String shardId) { |
| return COLLECTIONS_ZKNODE + "/" + collection + "/" |
| + SHARD_LEADERS_ZKNODE + (shardId != null ? ("/" + shardId) |
| : ""); |
| } |
| |
  /** Overload of the six-arg getReplicaProps with no state filters applied. */
  public List<ZkCoreNodeProps> getReplicaProps(String collection,
      String shardId, String thisCoreNodeName, String coreName) {
    return getReplicaProps(collection, shardId, thisCoreNodeName, coreName, null);
  }
| |
  /** Overload of the six-arg getReplicaProps with no exclusion filter. */
  public List<ZkCoreNodeProps> getReplicaProps(String collection,
      String shardId, String thisCoreNodeName, String coreName, String mustMatchStateFilter) {
    return getReplicaProps(collection, shardId, thisCoreNodeName, coreName, mustMatchStateFilter, null);
  }
| |
| public List<ZkCoreNodeProps> getReplicaProps(String collection, |
| String shardId, String thisCoreNodeName, String coreName, String mustMatchStateFilter, String mustNotMatchStateFilter) { |
| assert thisCoreNodeName != null; |
| ClusterState clusterState = this.clusterState; |
| if (clusterState == null) { |
| return null; |
| } |
| Map<String,Slice> slices = clusterState.getSlicesMap(collection); |
| if (slices == null) { |
| throw new ZooKeeperException(ErrorCode.BAD_REQUEST, |
| "Could not find collection in zk: " + collection + " " |
| + clusterState.getCollections()); |
| } |
| |
| Slice replicas = slices.get(shardId); |
| if (replicas == null) { |
| throw new ZooKeeperException(ErrorCode.BAD_REQUEST, "Could not find shardId in zk: " + shardId); |
| } |
| |
| Map<String,Replica> shardMap = replicas.getReplicasMap(); |
| List<ZkCoreNodeProps> nodes = new ArrayList<>(shardMap.size()); |
| for (Entry<String,Replica> entry : shardMap.entrySet()) { |
| ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue()); |
| |
| String coreNodeName = entry.getValue().getName(); |
| |
| if (clusterState.liveNodesContain(nodeProps.getNodeName()) && !coreNodeName.equals(thisCoreNodeName)) { |
| if (mustMatchStateFilter == null || mustMatchStateFilter.equals(nodeProps.getState())) { |
| if (mustNotMatchStateFilter == null || !mustNotMatchStateFilter.equals(nodeProps.getState())) { |
| nodes.add(nodeProps); |
| } |
| } |
| } |
| } |
| if (nodes.size() == 0) { |
| // no replicas |
| return null; |
| } |
| |
| return nodes; |
| } |
| |
  /** Returns the underlying SolrZkClient used by this reader. */
  public SolrZkClient getZkClient() {
    return zkClient;
  }
  /** Returns the unmodifiable set of all known collection names (internal and external). */
  public Set<String> getAllCollections(){
    return allCollections;
  }
| |
| public void updateAliases() throws KeeperException, InterruptedException { |
| byte[] data = zkClient.getData(ALIASES, null, null, true); |
| |
| Aliases aliases = ClusterState.load(data); |
| |
| ZkStateReader.this.aliases = aliases; |
| } |
| public Map getClusterProps(){ |
| Map result = null; |
| try { |
| if(getZkClient().exists(ZkStateReader.CLUSTER_PROPS,true)){ |
| result = (Map) ZkStateReader.fromJSON(getZkClient().getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true)) ; |
| } else { |
| result= new LinkedHashMap(); |
| } |
| return result; |
| } catch (Exception e) { |
| throw new SolrException(ErrorCode.SERVER_ERROR,"Error reading cluster properties",e) ; |
| } |
| } |
| |
| /** |
| * Returns the baseURL corresponding to a given node's nodeName -- |
| * NOTE: does not (currently) imply that the nodeName (or resulting |
| * baseURL) exists in the cluster. |
| * @lucene.experimental |
| */ |
| public String getBaseUrlForNodeName(final String nodeName) { |
| final int _offset = nodeName.indexOf("_"); |
| if (_offset < 0) { |
| throw new IllegalArgumentException("nodeName does not contain expected '_' seperator: " + nodeName); |
| } |
| final String hostAndPort = nodeName.substring(0,_offset); |
| try { |
| final String path = URLDecoder.decode(nodeName.substring(1+_offset), "UTF-8"); |
| String urlScheme = (String) getClusterProps().get(URL_SCHEME); |
| if(urlScheme == null) { |
| urlScheme = "http"; |
| } |
| return urlScheme + "://" + hostAndPort + (path.isEmpty() ? "" : ("/" + path)); |
| } catch (UnsupportedEncodingException e) { |
| throw new IllegalStateException("JVM Does not seem to support UTF-8", e); |
| } |
| } |
| |
| public void updateWatchedCollection(DocCollection c) { |
| if(watchedCollections.contains(c.getName())){ |
| watchedCollectionStates.put(c.getName(), c); |
| log.info("Updated DocCollection "+c.getName()+" to: "); |
| } |
| } |
| |
| /** |
| * <b>Advance usage</b> |
| * This method can be used to fetch a collection object and control whether it hits |
| * the cache only or if information can be looked up from ZooKeeper. |
| * |
| * @param coll the collection name |
| * @param cachedCopyOnly whether to fetch data from cache only or if hitting Zookeeper is acceptable |
| * @return the {@link org.apache.solr.common.cloud.DocCollection} |
| */ |
| public DocCollection getCollection(String coll, boolean cachedCopyOnly) { |
| if(clusterState.getCollectionStates().get(coll) != null) return clusterState.getCollectionStates().get(coll); |
| if (watchedCollections.contains(coll) || cachedCopyOnly) { |
| DocCollection c = watchedCollectionStates.get(coll); |
| if (c != null || cachedCopyOnly) return c; |
| } |
| return getCollectionLive(this, coll); |
| } |
| // this is only set by Overseer not to be set by others. If Overseer has |
| // unfinished external collections which are yet to be persisted to ZK |
| // this map is populated and this class can use that information |
| public Map ephemeralCollectionData; |
| |
  /**
   * Reads a collection's state directly, bypassing this reader's caches.
   * Consults the Overseer's {@link #ephemeralCollectionData} first (state not
   * yet persisted to ZK), then falls back to the collection's state.json node.
   *
   * @return the collection state, or null if no state node exists
   */
  public static DocCollection getCollectionLive(ZkStateReader zkStateReader, String coll) {
    String collectionPath = getCollectionPath(coll);
    if(zkStateReader.ephemeralCollectionData !=null ){
      ClusterState cs = (ClusterState) zkStateReader.ephemeralCollectionData.get(collectionPath);
      if(cs !=null) {
        return cs.getCollectionStates().get(coll);
      }
    }
    try {
      if (!zkStateReader.getZkClient().exists(collectionPath, true)) return null;
      Stat stat = new Stat();
      byte[] data = zkStateReader.getZkClient().getData(collectionPath, null, stat, true);
      // live nodes are irrelevant for a single-collection load; pass an empty set
      ClusterState state = ClusterState.load(stat.getVersion(), data, Collections.<String>emptySet(), zkStateReader, collectionPath);
      return state.getCollectionStates().get(coll);
    } catch (KeeperException.NoNodeException e) {
      log.warn("No node available : " + collectionPath, e);
      return null;
    } catch (KeeperException e) {
      throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK:" + coll, e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK:" + coll, e);
    }
  }
| |
  /** Returns collection state, consulting caches first and ZooKeeper if needed. */
  public DocCollection getCollection(String coll) {
    return getCollection(coll, false);
  }
| |
  /** Returns the ZK path of a collection's external state node: /collections/&lt;coll&gt;/state.json */
  public static String getCollectionPath(String coll) {
    return COLLECTIONS_ZKNODE+"/"+coll + "/state.json";
  }
| |
| public void addCollectionWatch(String coll) throws KeeperException, InterruptedException { |
| synchronized (this){ |
| if(watchedCollections.contains(coll)) return; |
| else { |
| watchedCollections.add(coll); |
| } |
| addZkWatch(coll); |
| } |
| |
| } |
| |
| private void addZkWatch(final String coll) throws KeeperException, InterruptedException { |
| log.info("addZkWatch {}", coll); |
| final String fullpath = getCollectionPath(coll); |
| synchronized (getUpdateLock()){ |
| |
| cmdExecutor.ensureExists(fullpath, zkClient); |
| log.info("Updating collection state at {} from ZooKeeper... ",fullpath); |
| |
| Watcher watcher = new Watcher() { |
| |
| @Override |
| public void process(WatchedEvent event) { |
| // session events are not change events, |
| // and do not remove the watcher |
| if (EventType.None.equals(event.getType())) { |
| return; |
| } |
| log.info("A cluster state change: {}, has occurred - updating... ", (event), ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size()); |
| try { |
| |
| // delayed approach |
| // ZkStateReader.this.updateClusterState(false, false); |
| synchronized (ZkStateReader.this.getUpdateLock()) { |
| if(!watchedCollections.contains(coll)) { |
| log.info("Unwatched collection {}",coll); |
| return; |
| } |
| // remake watch |
| final Watcher thisWatch = this; |
| Stat stat = new Stat(); |
| byte[] data = zkClient.getData(fullpath, thisWatch, stat, true); |
| |
| if(data == null || data.length ==0){ |
| log.warn("No value set for collection state : {}", coll); |
| return; |
| |
| } |
| ClusterState clusterState = ClusterState.load(stat.getVersion(), data, Collections.<String>emptySet(),ZkStateReader.this,fullpath); |
| // update volatile |
| |
| DocCollection newState = clusterState.getCollectionStates().get(coll); |
| watchedCollectionStates.put(coll, newState); |
| log.info("Updating data for {} to ver {} ", coll , newState.getZnodeVersion()); |
| |
| } |
| } catch (KeeperException e) { |
| if (e.code() == KeeperException.Code.SESSIONEXPIRED |
| || e.code() == KeeperException.Code.CONNECTIONLOSS) { |
| log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK"); |
| return; |
| } |
| log.error("Unwatched collection :"+coll , e); |
| throw new ZooKeeperException(ErrorCode.SERVER_ERROR, |
| "", e); |
| |
| } catch (InterruptedException e) { |
| // Restore the interrupted status |
| Thread.interrupted(); |
| log.error("Unwatched collection :"+coll , e); |
| return; |
| } |
| } |
| |
| }; |
| zkClient.exists(fullpath, watcher, true); |
| } |
| |
| watchedCollectionStates.put(coll, getCollectionLive(this, coll)); |
| } |
| |
  /**This is not a public API. Only used by ZkController */
  public void removeZKWatch(final String coll){
    // only removes the collection from the watched set; the installed ZK
    // watcher notices this on its next trigger and stops re-registering
    // (see the contains() check in addZkWatch)
    synchronized (this){
      watchedCollections.remove(coll);
    }
  }
| |
| |
| |
| |
| } |