| package org.apache.solr.common.cloud; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| |
| import org.noggit.CharArr; |
| import org.noggit.JSONParser; |
| import org.noggit.JSONWriter; |
| import org.noggit.ObjectBuilder; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.SolrException.ErrorCode; |
| import org.apache.solr.common.util.ByteUtils; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.Watcher.Event.EventType; |
| import org.apache.zookeeper.data.Stat; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class ZkStateReader { |
| private static Logger log = LoggerFactory.getLogger(ZkStateReader.class); |
| |
| public static final String BASE_URL_PROP = "base_url"; |
| public static final String NODE_NAME_PROP = "node_name"; |
| public static final String CORE_NODE_NAME_PROP = "core_node_name"; |
| public static final String ROLES_PROP = "roles"; |
| public static final String STATE_PROP = "state"; |
| public static final String CORE_NAME_PROP = "core"; |
| public static final String COLLECTION_PROP = "collection"; |
| public static final String SHARD_ID_PROP = "shard"; |
| public static final String SHARD_RANGE_PROP = "shard_range"; |
| public static final String SHARD_STATE_PROP = "shard_state"; |
| public static final String NUM_SHARDS_PROP = "numShards"; |
| public static final String LEADER_PROP = "leader"; |
| |
| public static final String COLLECTIONS_ZKNODE = "/collections"; |
| public static final String LIVE_NODES_ZKNODE = "/live_nodes"; |
| public static final String ALIASES = "/aliases.json"; |
| public static final String CLUSTER_STATE = "/clusterstate.json"; |
| |
| public static final String RECOVERING = "recovering"; |
| public static final String RECOVERY_FAILED = "recovery_failed"; |
| public static final String ACTIVE = "active"; |
| public static final String DOWN = "down"; |
| public static final String SYNC = "sync"; |
| |
| private volatile ClusterState clusterState; |
| |
| private static final long SOLRCLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("solrcloud.update.delay", "5000")); |
| |
| public static final String LEADER_ELECT_ZKNODE = "/leader_elect"; |
| |
| public static final String SHARD_LEADERS_ZKNODE = "leaders"; |
| |
| |
| |
| |
| // |
| // convenience methods... should these go somewhere else? |
| // |
| public static byte[] toJSON(Object o) { |
| CharArr out = new CharArr(); |
| new JSONWriter(out, 2).write(o); // indentation by default |
| return toUTF8(out); |
| } |
| |
| public static byte[] toUTF8(CharArr out) { |
| byte[] arr = new byte[out.size() << 2]; // is 4x the real worst-case upper-bound? |
| int nBytes = ByteUtils.UTF16toUTF8(out, 0, out.size(), arr, 0); |
| return Arrays.copyOf(arr, nBytes); |
| } |
| |
| public static Object fromJSON(byte[] utf8) { |
| // convert directly from bytes to chars |
| // and parse directly from that instead of going through |
| // intermediate strings or readers |
| CharArr chars = new CharArr(); |
| ByteUtils.UTF8toUTF16(utf8, 0, utf8.length, chars); |
| JSONParser parser = new JSONParser(chars.getArray(), chars.getStart(), chars.length()); |
| try { |
| return ObjectBuilder.getVal(parser); |
| } catch (IOException e) { |
| throw new RuntimeException(e); // should never happen w/o using real IO |
| } |
| } |
| |
| |
| private static class ZKTF implements ThreadFactory { |
| private static ThreadGroup tg = new ThreadGroup("ZkStateReader"); |
| @Override |
| public Thread newThread(Runnable r) { |
| Thread td = new Thread(tg, r); |
| td.setDaemon(true); |
| return td; |
| } |
| } |
| private ScheduledExecutorService updateCloudExecutor = Executors.newScheduledThreadPool(1, new ZKTF()); |
| |
| private boolean clusterStateUpdateScheduled; |
| |
| private SolrZkClient zkClient; |
| |
| private boolean closeClient = false; |
| |
| private ZkCmdExecutor cmdExecutor; |
| |
| private Aliases aliases = new Aliases(); |
| |
| private volatile boolean closed = false; |
| |
| public ZkStateReader(SolrZkClient zkClient) { |
| this.zkClient = zkClient; |
| initZkCmdExecutor(zkClient.getZkClientTimeout()); |
| } |
| |
| public ZkStateReader(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout) throws InterruptedException, TimeoutException, IOException { |
| closeClient = true; |
| initZkCmdExecutor(zkClientTimeout); |
| zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout, |
| // on reconnect, reload cloud info |
| new OnReconnect() { |
| |
| @Override |
| public void command() { |
| try { |
| ZkStateReader.this.createClusterStateWatchersAndUpdate(); |
| } catch (KeeperException e) { |
| log.error("", e); |
| throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, |
| "", e); |
| } catch (InterruptedException e) { |
| // Restore the interrupted status |
| Thread.currentThread().interrupt(); |
| log.error("", e); |
| throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, |
| "", e); |
| } |
| |
| } |
| }); |
| } |
| |
| private void initZkCmdExecutor(int zkClientTimeout) { |
| // we must retry at least as long as the session timeout |
| cmdExecutor = new ZkCmdExecutor(zkClientTimeout); |
| } |
| |
| // load and publish a new CollectionInfo |
| public void updateClusterState(boolean immediate) throws KeeperException, InterruptedException { |
| updateClusterState(immediate, false); |
| } |
| |
| // load and publish a new CollectionInfo |
| public void updateLiveNodes() throws KeeperException, InterruptedException { |
| updateClusterState(true, true); |
| } |
| |
| public Aliases getAliases() { |
| return aliases; |
| } |
| |
| public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException, |
| InterruptedException { |
| // We need to fetch the current cluster state and the set of live nodes |
| |
| synchronized (getUpdateLock()) { |
| cmdExecutor.ensureExists(CLUSTER_STATE, zkClient); |
| cmdExecutor.ensureExists(ALIASES, zkClient); |
| |
| log.info("Updating cluster state from ZooKeeper... "); |
| |
| zkClient.exists(CLUSTER_STATE, new Watcher() { |
| |
| @Override |
| public void process(WatchedEvent event) { |
| // session events are not change events, |
| // and do not remove the watcher |
| if (EventType.None.equals(event.getType())) { |
| return; |
| } |
| log.info("A cluster state change: {}, has occurred - updating... (live nodes size: {})", (event) , ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size()); |
| try { |
| |
| // delayed approach |
| // ZkStateReader.this.updateClusterState(false, false); |
| synchronized (ZkStateReader.this.getUpdateLock()) { |
| // remake watch |
| final Watcher thisWatch = this; |
| Stat stat = new Stat(); |
| byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, stat , |
| true); |
| List<String> liveNodes = zkClient.getChildren( |
| LIVE_NODES_ZKNODE, this, true); |
| |
| Set<String> liveNodesSet = new HashSet<String>(); |
| liveNodesSet.addAll(liveNodes); |
| Set<String> ln = ZkStateReader.this.clusterState.getLiveNodes(); |
| ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln); |
| // update volatile |
| ZkStateReader.this.clusterState = clusterState; |
| } |
| } catch (KeeperException e) { |
| if (e.code() == KeeperException.Code.SESSIONEXPIRED |
| || e.code() == KeeperException.Code.CONNECTIONLOSS) { |
| log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK"); |
| return; |
| } |
| log.error("", e); |
| throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, |
| "", e); |
| } catch (InterruptedException e) { |
| // Restore the interrupted status |
| Thread.currentThread().interrupt(); |
| log.warn("", e); |
| return; |
| } |
| } |
| |
| }, true); |
| } |
| |
| |
| synchronized (ZkStateReader.this.getUpdateLock()) { |
| List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE, |
| new Watcher() { |
| |
| @Override |
| public void process(WatchedEvent event) { |
| // session events are not change events, |
| // and do not remove the watcher |
| if (EventType.None.equals(event.getType())) { |
| return; |
| } |
| try { |
| // delayed approach |
| // ZkStateReader.this.updateClusterState(false, true); |
| synchronized (ZkStateReader.this.getUpdateLock()) { |
| List<String> liveNodes = zkClient.getChildren( |
| LIVE_NODES_ZKNODE, this, true); |
| log.info("Updating live nodes... ({})", liveNodes.size()); |
| Set<String> liveNodesSet = new HashSet<String>(); |
| liveNodesSet.addAll(liveNodes); |
| ClusterState clusterState = new ClusterState( |
| ZkStateReader.this.clusterState.getZkClusterStateVersion(), |
| liveNodesSet, ZkStateReader.this.clusterState |
| .getCollectionStates()); |
| ZkStateReader.this.clusterState = clusterState; |
| } |
| } catch (KeeperException e) { |
| if (e.code() == KeeperException.Code.SESSIONEXPIRED |
| || e.code() == KeeperException.Code.CONNECTIONLOSS) { |
| log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK"); |
| return; |
| } |
| log.error("", e); |
| throw new ZooKeeperException( |
| SolrException.ErrorCode.SERVER_ERROR, "", e); |
| } catch (InterruptedException e) { |
| // Restore the interrupted status |
| Thread.currentThread().interrupt(); |
| log.warn("", e); |
| return; |
| } |
| } |
| |
| }, true); |
| |
| Set<String> liveNodeSet = new HashSet<String>(); |
| liveNodeSet.addAll(liveNodes); |
| ClusterState clusterState = ClusterState.load(zkClient, liveNodeSet); |
| this.clusterState = clusterState; |
| |
| zkClient.exists(ALIASES, |
| new Watcher() { |
| |
| @Override |
| public void process(WatchedEvent event) { |
| // session events are not change events, |
| // and do not remove the watcher |
| if (EventType.None.equals(event.getType())) { |
| return; |
| } |
| try { |
| synchronized (ZkStateReader.this.getUpdateLock()) { |
| log.info("Updating aliases... "); |
| |
| // remake watch |
| final Watcher thisWatch = this; |
| Stat stat = new Stat(); |
| byte[] data = zkClient.getData(ALIASES, thisWatch, stat , |
| true); |
| |
| Aliases aliases = ClusterState.load(data); |
| |
| ZkStateReader.this.aliases = aliases; |
| } |
| } catch (KeeperException e) { |
| if (e.code() == KeeperException.Code.SESSIONEXPIRED |
| || e.code() == KeeperException.Code.CONNECTIONLOSS) { |
| log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK"); |
| return; |
| } |
| log.error("", e); |
| throw new ZooKeeperException( |
| SolrException.ErrorCode.SERVER_ERROR, "", e); |
| } catch (InterruptedException e) { |
| // Restore the interrupted status |
| Thread.currentThread().interrupt(); |
| log.warn("", e); |
| return; |
| } |
| } |
| |
| }, true); |
| } |
| updateAliases(); |
| } |
| |
| |
| // load and publish a new CollectionInfo |
| private synchronized void updateClusterState(boolean immediate, |
| final boolean onlyLiveNodes) throws KeeperException, |
| InterruptedException { |
| // build immutable CloudInfo |
| |
| if (immediate) { |
| ClusterState clusterState; |
| synchronized (getUpdateLock()) { |
| List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE, null, |
| true); |
| Set<String> liveNodesSet = new HashSet<String>(); |
| liveNodesSet.addAll(liveNodes); |
| |
| if (!onlyLiveNodes) { |
| log.info("Updating cloud state from ZooKeeper... "); |
| |
| clusterState = ClusterState.load(zkClient, liveNodesSet); |
| } else { |
| log.info("Updating live nodes from ZooKeeper... ({})", liveNodesSet.size()); |
| clusterState = new ClusterState( |
| ZkStateReader.this.clusterState.getZkClusterStateVersion(), liveNodesSet, |
| ZkStateReader.this.clusterState.getCollectionStates()); |
| } |
| this.clusterState = clusterState; |
| } |
| |
| } else { |
| if (clusterStateUpdateScheduled) { |
| log.info("Cloud state update for ZooKeeper already scheduled"); |
| return; |
| } |
| log.info("Scheduling cloud state update from ZooKeeper..."); |
| clusterStateUpdateScheduled = true; |
| updateCloudExecutor.schedule(new Runnable() { |
| |
| @Override |
| public void run() { |
| log.info("Updating cluster state from ZooKeeper..."); |
| synchronized (getUpdateLock()) { |
| clusterStateUpdateScheduled = false; |
| ClusterState clusterState; |
| try { |
| List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE, |
| null, true); |
| Set<String> liveNodesSet = new HashSet<String>(); |
| liveNodesSet.addAll(liveNodes); |
| |
| if (!onlyLiveNodes) { |
| log.info("Updating cloud state from ZooKeeper... "); |
| |
| clusterState = ClusterState.load(zkClient, liveNodesSet); |
| } else { |
| log.info("Updating live nodes from ZooKeeper... "); |
| clusterState = new ClusterState(ZkStateReader.this.clusterState.getZkClusterStateVersion(), liveNodesSet, ZkStateReader.this.clusterState.getCollectionStates()); |
| } |
| |
| ZkStateReader.this.clusterState = clusterState; |
| |
| } catch (KeeperException e) { |
| if (e.code() == KeeperException.Code.SESSIONEXPIRED |
| || e.code() == KeeperException.Code.CONNECTIONLOSS) { |
| log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK"); |
| return; |
| } |
| log.error("", e); |
| throw new ZooKeeperException( |
| SolrException.ErrorCode.SERVER_ERROR, "", e); |
| } catch (InterruptedException e) { |
| // Restore the interrupted status |
| Thread.currentThread().interrupt(); |
| log.error("", e); |
| throw new ZooKeeperException( |
| SolrException.ErrorCode.SERVER_ERROR, "", e); |
| } |
| // update volatile |
| ZkStateReader.this.clusterState = clusterState; |
| } |
| } |
| }, SOLRCLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS); |
| } |
| |
| } |
| |
| /** |
| * @return information about the cluster from ZooKeeper |
| */ |
| public ClusterState getClusterState() { |
| return clusterState; |
| } |
| |
| public Object getUpdateLock() { |
| return this; |
| } |
| |
| public void close() { |
| this.closed = true; |
| if (closeClient) { |
| zkClient.close(); |
| } |
| } |
| |
| abstract class RunnableWatcher implements Runnable { |
| Watcher watcher; |
| public RunnableWatcher(Watcher watcher){ |
| this.watcher = watcher; |
| } |
| |
| } |
| |
| public String getLeaderUrl(String collection, String shard, int timeout) |
| throws InterruptedException, KeeperException { |
| ZkCoreNodeProps props = new ZkCoreNodeProps(getLeaderRetry(collection, |
| shard, timeout)); |
| return props.getCoreUrl(); |
| } |
| |
| /** |
| * Get shard leader properties, with retry if none exist. |
| */ |
| public Replica getLeaderRetry(String collection, String shard) throws InterruptedException { |
| return getLeaderRetry(collection, shard, 1000); |
| } |
| |
| /** |
| * Get shard leader properties, with retry if none exist. |
| */ |
| public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException { |
| long timeoutAt = System.currentTimeMillis() + timeout; |
| while (System.currentTimeMillis() < timeoutAt && !closed) { |
| if (clusterState != null) { |
| Replica replica = clusterState.getLeader(collection, shard); |
| if (replica != null && getClusterState().liveNodesContain(replica.getNodeName())) { |
| return replica; |
| } |
| } |
| Thread.sleep(50); |
| } |
| throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "No registered leader was found, collection:" + collection + " slice:" + shard); |
| } |
| |
| /** |
| * Get path where shard leader properties live in zookeeper. |
| */ |
| public static String getShardLeadersPath(String collection, String shardId) { |
| return COLLECTIONS_ZKNODE + "/" + collection + "/" |
| + SHARD_LEADERS_ZKNODE + (shardId != null ? ("/" + shardId) |
| : ""); |
| } |
| |
| public List<ZkCoreNodeProps> getReplicaProps(String collection, |
| String shardId, String thisCoreNodeName, String coreName) { |
| return getReplicaProps(collection, shardId, thisCoreNodeName, coreName, null); |
| } |
| |
| public List<ZkCoreNodeProps> getReplicaProps(String collection, |
| String shardId, String thisCoreNodeName, String coreName, String mustMatchStateFilter) { |
| return getReplicaProps(collection, shardId, thisCoreNodeName, coreName, mustMatchStateFilter, null); |
| } |
| |
| public List<ZkCoreNodeProps> getReplicaProps(String collection, |
| String shardId, String thisCoreNodeName, String coreName, String mustMatchStateFilter, String mustNotMatchStateFilter) { |
| ClusterState clusterState = this.clusterState; |
| if (clusterState == null) { |
| return null; |
| } |
| Map<String,Slice> slices = clusterState.getSlicesMap(collection); |
| if (slices == null) { |
| throw new ZooKeeperException(ErrorCode.BAD_REQUEST, |
| "Could not find collection in zk: " + collection + " " |
| + clusterState.getCollections()); |
| } |
| |
| Slice replicas = slices.get(shardId); |
| if (replicas == null) { |
| throw new ZooKeeperException(ErrorCode.BAD_REQUEST, "Could not find shardId in zk: " + shardId); |
| } |
| |
| Map<String,Replica> shardMap = replicas.getReplicasMap(); |
| List<ZkCoreNodeProps> nodes = new ArrayList<ZkCoreNodeProps>(shardMap.size()); |
| for (Entry<String,Replica> entry : shardMap.entrySet()) { |
| ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue()); |
| |
| String coreNodeName = entry.getValue().getName(); |
| |
| if (clusterState.liveNodesContain(nodeProps.getNodeName()) && !coreNodeName.equals(thisCoreNodeName)) { |
| if (mustMatchStateFilter == null || mustMatchStateFilter.equals(nodeProps.getState())) { |
| if (mustNotMatchStateFilter == null || !mustNotMatchStateFilter.equals(nodeProps.getState())) { |
| nodes.add(nodeProps); |
| } |
| } |
| } |
| } |
| if (nodes.size() == 0) { |
| // no replicas - go local |
| return null; |
| } |
| |
| return nodes; |
| } |
| |
| public SolrZkClient getZkClient() { |
| return zkClient; |
| } |
| |
| public void updateAliases() throws KeeperException, InterruptedException { |
| byte[] data = zkClient.getData(ALIASES, null, null, true); |
| |
| Aliases aliases = ClusterState.load(data); |
| |
| ZkStateReader.this.aliases = aliases; |
| } |
| |
| } |