| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.solr.cloud.overseer; |
| |
| import static java.util.Collections.singletonMap; |
| |
| import com.codahale.metrics.Timer; |
| import java.lang.invoke.MethodHandles; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Function; |
| import org.apache.solr.cloud.Overseer; |
| import org.apache.solr.cloud.Stats; |
| import org.apache.solr.common.cloud.ClusterState; |
| import org.apache.solr.common.cloud.DocCollection; |
| import org.apache.solr.common.cloud.PerReplicaStatesFetcher; |
| import org.apache.solr.common.cloud.PerReplicaStatesOps; |
| import org.apache.solr.common.cloud.ZkStateReader; |
| import org.apache.solr.common.util.Utils; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.data.Stat; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * ZkStateWriter is responsible for writing updates to the cluster state stored in ZooKeeper for |
| * collections each of which gets their own individual state.json in ZK. |
| * |
| * <p>Updates to the cluster state are specified using the {@link #enqueueUpdate(ClusterState, List, |
| * ZkWriteCallback)} method. The class buffers updates to reduce the number of writes to ZK. The |
| * buffered updates are flushed during <code>enqueueUpdate</code> automatically if necessary. The |
| * {@link #writePendingUpdates()} can be used to force flush any pending updates. |
| * |
| * <p>If either {@link #enqueueUpdate(ClusterState, List, ZkWriteCallback)} or {@link |
| * #writePendingUpdates()} throws a {@link org.apache.zookeeper.KeeperException.BadVersionException} |
| * then the internal buffered state of the class is suspect and the current instance of the class |
| * should be discarded and a new instance should be created and used for any future updates. |
| */ |
| public class ZkStateWriter { |
| private static final long MAX_FLUSH_INTERVAL = |
| TimeUnit.NANOSECONDS.convert(Overseer.STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS); |
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| |
| /** |
| * Represents a no-op {@link ZkWriteCommand} which will result in no modification to cluster state |
| */ |
| public static ZkWriteCommand NO_OP = ZkWriteCommand.NO_OP; |
| |
| protected final ZkStateReader reader; |
| protected final Stats stats; |
| |
| protected Map<String, ZkWriteCommand> updates = new HashMap<>(); |
| private int numUpdates = 0; |
| protected ClusterState clusterState = null; |
| protected long lastUpdatedTime = 0; |
| |
| /** |
| * Set to true if we ever get a BadVersionException so that we can disallow future operations with |
| * this instance |
| */ |
| protected boolean invalidState = false; |
| |
| public ZkStateWriter(ZkStateReader zkStateReader, Stats stats) { |
| assert zkStateReader != null; |
| |
| this.reader = zkStateReader; |
| this.stats = stats; |
| this.clusterState = zkStateReader.getClusterState(); |
| } |
| |
| /** |
| * if any collection is updated not through this class (directly written to ZK, then it needs to |
| * be updated locally) |
| */ |
| public void updateClusterState(Function<ClusterState, ClusterState> fun) { |
| clusterState = fun.apply(clusterState); |
| } |
| |
| /** |
| * Applies the given {@link ZkWriteCommand} on the <code>prevState</code>. The modified {@link |
| * ClusterState} is returned and it is expected that the caller will use the returned cluster |
| * state for the subsequent invocation of this method. |
| * |
| * <p>The modified state may be buffered or flushed to ZooKeeper depending on the internal |
| * buffering logic of this class. The {@link #hasPendingUpdates()} method may be used to determine |
| * if the last enqueue operation resulted in buffered state. The method {@link |
| * #writePendingUpdates()} can be used to force an immediate flush of pending cluster state |
| * changes. |
| * |
| * @param prevState the cluster state information on which the given <code>cmd</code> is applied |
| * @param cmds the list of {@link ZkWriteCommand} which specifies the change to be applied to |
| * cluster state in atomic |
| * @param callback a {@link org.apache.solr.cloud.overseer.ZkStateWriter.ZkWriteCallback} object |
| * to be used for any callbacks |
| * @return modified cluster state created after applying <code>cmd</code> to <code>prevState |
| * </code>. If <code>cmd</code> is a no-op ({@link #NO_OP}) then the <code>prevState</code> is |
| * returned unmodified. |
| * @throws IllegalStateException if the current instance is no longer usable. The current instance |
| * must be discarded. |
| * @throws Exception on an error in ZK operations or callback. If a flush to ZooKeeper results in |
| * a {@link org.apache.zookeeper.KeeperException.BadVersionException} this instance becomes |
| * unusable and must be discarded |
| */ |
| public ClusterState enqueueUpdate( |
| ClusterState prevState, List<ZkWriteCommand> cmds, ZkWriteCallback callback) |
| throws IllegalStateException, Exception { |
| if (invalidState) { |
| throw new IllegalStateException( |
| "ZkStateWriter has seen a tragic error, this instance can no longer be used"); |
| } |
| if (cmds.isEmpty()) return prevState; |
| if (isNoOps(cmds)) return prevState; |
| |
| boolean forceFlush = false; |
| if (cmds.size() == 1) { |
| // most messages result in only one command. let's deal with it right away |
| ZkWriteCommand cmd = cmds.get(0); |
| if (cmd.collection != null && cmd.collection.isPerReplicaState()) { |
| // we do not wish to batch any updates for collections with per-replica state because |
| // these changes go to individual ZK nodes and there is zero advantage to batching |
| // now check if there are any updates for the same collection already present |
| if (updates.containsKey(cmd.name)) { |
| // this should not happen |
| // but let's get those updates out anyway |
| writeUpdate(updates.remove(cmd.name)); |
| } |
| // now let's write the current message |
| try { |
| return writeUpdate(cmd); |
| } finally { |
| if (callback != null) callback.onWrite(); |
| } |
| } |
| } else { |
| // there are more than one commands created as a result of this message |
| for (ZkWriteCommand cmd : cmds) { |
| if (cmd.collection != null && cmd.collection.isPerReplicaState()) { |
| // we don't try to optimize for this case. let's flush out all after this |
| forceFlush = true; |
| break; |
| } |
| } |
| } |
| |
| for (ZkWriteCommand cmd : cmds) { |
| if (cmd == NO_OP) continue; |
| prevState = prevState.copyWith(cmd.name, cmd.collection); |
| updates.put(cmd.name, cmd); |
| numUpdates++; |
| } |
| clusterState = prevState; |
| |
| if (forceFlush || maybeFlushAfter()) { |
| ClusterState state = writePendingUpdates(); |
| if (callback != null) { |
| callback.onWrite(); |
| } |
| return state; |
| } |
| |
| return clusterState; |
| } |
| |
| private boolean isNoOps(List<ZkWriteCommand> cmds) { |
| for (ZkWriteCommand cmd : cmds) { |
| if (cmd != NO_OP) return false; |
| } |
| return true; |
| } |
| |
| /** |
| * Logic to decide a flush after processing a list of ZkWriteCommand |
| * |
| * @return true if a flush to ZK is required, false otherwise |
| */ |
| private boolean maybeFlushAfter() { |
| return System.nanoTime() - lastUpdatedTime > MAX_FLUSH_INTERVAL |
| || numUpdates > Overseer.STATE_UPDATE_BATCH_SIZE; |
| } |
| |
| public boolean hasPendingUpdates() { |
| return numUpdates != 0; |
| } |
| |
| public ClusterState writeUpdate(ZkWriteCommand command) |
| throws IllegalStateException, KeeperException, InterruptedException { |
| Map<String, ZkWriteCommand> commands = new HashMap<>(); |
| commands.put(command.name, command); |
| return writePendingUpdates(commands, false); |
| } |
| |
| public ClusterState writePendingUpdates() throws KeeperException, InterruptedException { |
| return writePendingUpdates(updates, true); |
| } |
| /** |
| * Writes all pending updates to ZooKeeper and returns the modified cluster state |
| * |
| * @return the modified cluster state |
| * @throws IllegalStateException if the current instance is no longer usable and must be discarded |
| * @throws KeeperException if any ZooKeeper operation results in an error |
| * @throws InterruptedException if the current thread is interrupted |
| */ |
| public ClusterState writePendingUpdates( |
| Map<String, ZkWriteCommand> updates, boolean resetPendingUpdateCounters) |
| throws IllegalStateException, KeeperException, InterruptedException { |
| if (invalidState) { |
| throw new IllegalStateException( |
| "ZkStateWriter has seen a tragic error, this instance can no longer be used"); |
| } |
| |
| if (log.isDebugEnabled()) { |
| log.debug( |
| String.format( |
| Locale.ROOT, |
| "Request to write pending updates with updates of length: %d, " |
| + "pending updates of length: %d, writing all pending updates: %b", |
| updates.size(), |
| this.updates.size(), |
| updates == this.updates)); |
| } |
| |
| if ((updates == this.updates) && !hasPendingUpdates()) { |
| if (log.isDebugEnabled()) { |
| log.debug("Attempted to flush all pending updates, but there are no pending updates"); |
| } |
| return clusterState; |
| } |
| Timer.Context timerContext = stats.time("update_state"); |
| boolean success = false; |
| try { |
| if (!updates.isEmpty()) { |
| for (Map.Entry<String, ZkWriteCommand> entry : updates.entrySet()) { |
| String name = entry.getKey(); |
| String path = DocCollection.getCollectionPath(name); |
| ZkWriteCommand cmd = entry.getValue(); |
| DocCollection c = cmd.collection; |
| |
| // Update the Per Replica State znodes if needed |
| if (cmd.ops != null) { |
| cmd.ops.persist(path, reader.getZkClient()); |
| clusterState = |
| clusterState.copyWith( |
| name, |
| cmd.collection.copyWith( |
| PerReplicaStatesFetcher.fetch( |
| cmd.collection.getZNode(), reader.getZkClient(), null))); |
| } |
| |
| // Update the state.json file if needed |
| if (!cmd.persistJsonState) continue; |
| if (c == null) { |
| // let's clean up the state.json of this collection only, the rest should be cleaned by |
| // delete collection cmd |
| log.debug("going to delete state.json {}", path); |
| reader.getZkClient().clean(path); |
| } else { |
| byte[] data = Utils.toJSON(singletonMap(c.getName(), c)); |
| if (reader.getZkClient().exists(path, true)) { |
| if (log.isDebugEnabled()) { |
| log.debug("going to update_collection {} version: {}", path, c.getZNodeVersion()); |
| } |
| Stat stat = reader.getZkClient().setData(path, data, c.getZNodeVersion(), true); |
| DocCollection newCollection = |
| new DocCollection( |
| name, |
| c.getSlicesMap(), |
| c.getProperties(), |
| c.getRouter(), |
| stat.getVersion(), |
| new PerReplicaStatesFetcher.LazyPrsSupplier(reader.getZkClient(), path)); |
| clusterState = clusterState.copyWith(name, newCollection); |
| } else { |
| log.debug("going to create_collection {}", path); |
| reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true); |
| DocCollection newCollection = |
| new DocCollection( |
| name, |
| c.getSlicesMap(), |
| c.getProperties(), |
| c.getRouter(), |
| 0, |
| new PerReplicaStatesFetcher.LazyPrsSupplier(reader.getZkClient(), path)); |
| clusterState = clusterState.copyWith(name, newCollection); |
| } |
| } |
| |
| // When dealing with a per replica collection that did not do any update to the per |
| // replica states znodes but did update state.json, we add then remove a dummy node to |
| // change the cversion of the parent znode. This is not needed by Solr, there's no code |
| // watching the children and not watching the state.json node itself. It would be useful |
| // for external code watching the collection's Zookeeper state.json node children but not |
| // the node itself. |
| if (cmd.ops == null && cmd.isPerReplicaStateCollection) { |
| PerReplicaStatesOps.touchChildren().persist(path, reader.getZkClient()); |
| DocCollection currentCollState = clusterState.getCollection(cmd.name); |
| if (currentCollState != null) { |
| clusterState = |
| clusterState.copyWith( |
| name, |
| currentCollState.copyWith( |
| PerReplicaStatesFetcher.fetch( |
| currentCollState.getZNode(), reader.getZkClient(), null))); |
| } |
| } |
| } |
| |
| updates.clear(); |
| } |
| |
| if (resetPendingUpdateCounters) { |
| resetPendingUpdateCounters(); |
| } |
| success = true; |
| } catch (KeeperException.BadVersionException bve) { |
| // this is a tragic error, we must disallow usage of this instance |
| invalidState = true; |
| throw bve; |
| } finally { |
| timerContext.stop(); |
| if (success) { |
| stats.success("update_state"); |
| } else { |
| stats.error("update_state"); |
| } |
| } |
| |
| log.trace("New Cluster State is: {}", clusterState); |
| return clusterState; |
| } |
| |
| public void resetPendingUpdateCounters() { |
| lastUpdatedTime = System.nanoTime(); |
| numUpdates = 0; |
| } |
| |
| /** |
| * @return the most up-to-date cluster state until the last enqueueUpdate operation |
| */ |
| public ClusterState getClusterState() { |
| return clusterState; |
| } |
| |
| public interface ZkWriteCallback { |
| /** Called by ZkStateWriter if state is flushed to ZK */ |
| void onWrite() throws Exception; |
| } |
| } |