blob: e69dcbfc61f8352f1ab5b73e76da070acfba92fd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.overseer;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.codahale.metrics.Timer;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.Stats;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Collections.singletonMap;
/**
* ZkStateWriter is responsible for writing updates to the cluster state stored in ZooKeeper for
* both stateFormat=1 collection (stored in shared /clusterstate.json in ZK) and stateFormat=2 collections
* each of which get their own individual state.json in ZK.
*
* Updates to the cluster state are specified using the
* {@link #enqueueUpdate(ClusterState, List, ZkWriteCallback)} method. The class buffers updates
* to reduce the number of writes to ZK. The buffered updates are flushed during <code>enqueueUpdate</code>
* automatically if necessary. The {@link #writePendingUpdates()} can be used to force flush any pending updates.
*
* If either {@link #enqueueUpdate(ClusterState, List, ZkWriteCallback)} or {@link #writePendingUpdates()}
* throws a {@link org.apache.zookeeper.KeeperException.BadVersionException} then the internal buffered state of the
* class is suspect and the current instance of the class should be discarded and a new instance should be created
* and used for any future updates.
*/
public class ZkStateWriter {
private static final long MAX_FLUSH_INTERVAL = TimeUnit.NANOSECONDS.convert(Overseer.STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
* Represents a no-op {@link ZkWriteCommand} which will result in no modification to cluster state
*/
public static ZkWriteCommand NO_OP = ZkWriteCommand.noop();
protected final ZkStateReader reader;
protected final Stats stats;
protected Map<String, ZkWriteCommand> updates = new HashMap<>();
private int numUpdates = 0;
protected ClusterState clusterState = null;
protected boolean isClusterStateModified = false;
protected long lastUpdatedTime = 0;
/**
* Set to true if we ever get a BadVersionException so that we can disallow future operations
* with this instance
*/
protected boolean invalidState = false;
public ZkStateWriter(ZkStateReader zkStateReader, Stats stats) {
assert zkStateReader != null;
this.reader = zkStateReader;
this.stats = stats;
this.clusterState = zkStateReader.getClusterState();
}
/**
* Applies the given {@link ZkWriteCommand} on the <code>prevState</code>. The modified
* {@link ClusterState} is returned and it is expected that the caller will use the returned
* cluster state for the subsequent invocation of this method.
* <p>
* The modified state may be buffered or flushed to ZooKeeper depending on the internal buffering
* logic of this class. The {@link #hasPendingUpdates()} method may be used to determine if the
* last enqueue operation resulted in buffered state. The method {@link #writePendingUpdates()} can
* be used to force an immediate flush of pending cluster state changes.
*
* @param prevState the cluster state information on which the given <code>cmd</code> is applied
* @param cmds the list of {@link ZkWriteCommand} which specifies the change to be applied to cluster state in atomic
* @param callback a {@link org.apache.solr.cloud.overseer.ZkStateWriter.ZkWriteCallback} object to be used
* for any callbacks
* @return modified cluster state created after applying <code>cmd</code> to <code>prevState</code>. If
* <code>cmd</code> is a no-op ({@link #NO_OP}) then the <code>prevState</code> is returned unmodified.
* @throws IllegalStateException if the current instance is no longer usable. The current instance must be
* discarded.
* @throws Exception on an error in ZK operations or callback. If a flush to ZooKeeper results
* in a {@link org.apache.zookeeper.KeeperException.BadVersionException} this instance becomes unusable and
* must be discarded
*/
public ClusterState enqueueUpdate(ClusterState prevState, List<ZkWriteCommand> cmds, ZkWriteCallback callback) throws IllegalStateException, Exception {
if (invalidState) {
throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
}
if (cmds.isEmpty()) return prevState;
if (isNoOps(cmds)) return prevState;
boolean forceFlush = false;
if (cmds.size() == 1) {
//most messages result in only one command. let's deal with it right away
ZkWriteCommand cmd = cmds.get(0);
if (cmd.collection != null && cmd.collection.isPerReplicaState()) {
//we do not wish to batch any updates for collections with per-replica state because
// these changes go to individual ZK nodes and there is zero advantage to batching
//now check if there are any updates for the same collection already present
if (updates.containsKey(cmd.name)) {
//this should not happen
// but let's get those updates out anyway
writeUpdate(updates.remove(cmd.name));
}
//now let's write the current message
try {
return writeUpdate(cmd);
} finally {
if (callback !=null) callback.onWrite();
}
}
} else {
//there are more than one commands created as a result of this message
for (ZkWriteCommand cmd : cmds) {
if (cmd.collection != null && cmd.collection.isPerReplicaState()) {
// we don't try to optimize for this case. let's flush out all after this
forceFlush = true;
break;
}
}
}
for (ZkWriteCommand cmd : cmds) {
if (cmd == NO_OP) continue;
if (!isClusterStateModified && clusterStateGetModifiedWith(cmd, prevState)) {
isClusterStateModified = true;
}
prevState = prevState.copyWith(cmd.name, cmd.collection);
if (cmd.collection == null || cmd.collection.getStateFormat() != 1) {
updates.put(cmd.name, cmd);
numUpdates++;
}
}
clusterState = prevState;
if (forceFlush || maybeFlushAfter()) {
ClusterState state = writePendingUpdates();
if (callback != null) {
callback.onWrite();
}
return state;
}
return clusterState;
}
private boolean isNoOps(List<ZkWriteCommand> cmds) {
for (ZkWriteCommand cmd : cmds) {
if (cmd != NO_OP) return false;
}
return true;
}
/**
* Check whether {@value ZkStateReader#CLUSTER_STATE} (for stateFormat = 1) get changed given command
*/
private boolean clusterStateGetModifiedWith(ZkWriteCommand command, ClusterState state) {
DocCollection previousCollection = state.getCollectionOrNull(command.name);
boolean wasPreviouslyStateFormat1 = previousCollection != null && previousCollection.getStateFormat() == 1;
boolean isCurrentlyStateFormat1 = command.collection != null && command.collection.getStateFormat() == 1;
return wasPreviouslyStateFormat1 || isCurrentlyStateFormat1;
}
/**
* Logic to decide a flush after processing a list of ZkWriteCommand
*
* @return true if a flush to ZK is required, false otherwise
*/
private boolean maybeFlushAfter() {
return System.nanoTime() - lastUpdatedTime > MAX_FLUSH_INTERVAL || numUpdates > Overseer.STATE_UPDATE_BATCH_SIZE;
}
public boolean hasPendingUpdates() {
return numUpdates != 0 || isClusterStateModified;
}
public ClusterState writeUpdate(ZkWriteCommand command) throws IllegalStateException, KeeperException, InterruptedException {
Map<String, ZkWriteCommand> commands = new HashMap<>();
commands.put(command.name, command);
return writePendingUpdates(commands);
}
public ClusterState writePendingUpdates() throws KeeperException, InterruptedException {
return writePendingUpdates(updates);
}
/**
* Writes all pending updates to ZooKeeper and returns the modified cluster state
*
* @return the modified cluster state
* @throws IllegalStateException if the current instance is no longer usable and must be discarded
* @throws KeeperException if any ZooKeeper operation results in an error
* @throws InterruptedException if the current thread is interrupted
*/
public ClusterState writePendingUpdates(Map<String, ZkWriteCommand> updates) throws IllegalStateException, KeeperException, InterruptedException {
if (invalidState) {
throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
}
if ((updates == this.updates)
&& !hasPendingUpdates()) {
return clusterState;
}
Timer.Context timerContext = stats.time("update_state");
boolean success = false;
try {
if (!updates.isEmpty()) {
for (Map.Entry<String, ZkWriteCommand> entry : updates.entrySet()) {
String name = entry.getKey();
String path = ZkStateReader.getCollectionPath(name);
ZkWriteCommand cmd = entry.getValue();
DocCollection c = cmd.collection;
if (cmd.ops != null && cmd.ops.isPreOp()) {
cmd.ops.persist(path, reader.getZkClient());
clusterState = clusterState.copyWith(name,
cmd.collection.copyWith(PerReplicaStates.fetch(cmd.collection.getZNode(), reader.getZkClient(), null)));
}
if (!cmd.persistCollState) continue;
if (c == null) {
// let's clean up the state.json of this collection only, the rest should be clean by delete collection cmd
log.debug("going to delete state.json {}", path);
reader.getZkClient().clean(path);
} else if (c.getStateFormat() > 1) {
byte[] data = Utils.toJSON(singletonMap(c.getName(), c));
if (reader.getZkClient().exists(path, true)) {
if (log.isDebugEnabled()) {
log.debug("going to update_collection {} version: {}", path, c.getZNodeVersion());
}
Stat stat = reader.getZkClient().setData(path, data, c.getZNodeVersion(), true);
DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), stat.getVersion(), path);
clusterState = clusterState.copyWith(name, newCollection);
} else {
log.debug("going to create_collection {}", path);
reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true);
DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), 0, path);
clusterState = clusterState.copyWith(name, newCollection);
}
} else if (c.getStateFormat() == 1) {
isClusterStateModified = true;
}
if (cmd.ops != null && !cmd.ops.isPreOp()) {
cmd.ops.persist(path, reader.getZkClient());
DocCollection currentCollState = clusterState.getCollection(cmd.name);
if ( currentCollState != null) {
clusterState = clusterState.copyWith(name,
currentCollState.copyWith(PerReplicaStates.fetch(currentCollState.getZNode(), reader.getZkClient(), null)));
}
}
}
updates.clear();
numUpdates = 0;
}
if (isClusterStateModified) {
assert clusterState.getZkClusterStateVersion() >= 0;
byte[] data = Utils.toJSON(clusterState);
Stat stat = reader.getZkClient().setData(ZkStateReader.CLUSTER_STATE, data, clusterState.getZkClusterStateVersion(), true);
Map<String, DocCollection> collections = clusterState.getCollectionsMap();
// use the reader's live nodes because our cluster state's live nodes may be stale
clusterState = new ClusterState(stat.getVersion(), reader.getClusterState().getLiveNodes(), collections);
isClusterStateModified = false;
}
lastUpdatedTime = System.nanoTime();
success = true;
} catch (KeeperException.BadVersionException bve) {
// this is a tragic error, we must disallow usage of this instance
invalidState = true;
throw bve;
} finally {
timerContext.stop();
if (success) {
stats.success("update_state");
} else {
stats.error("update_state");
}
}
log.trace("New Cluster State is: {}", clusterState);
return clusterState;
}
/**
* @return the most up-to-date cluster state until the last enqueueUpdate operation
*/
public ClusterState getClusterState() {
return clusterState;
}
public interface ZkWriteCallback {
/**
* Called by ZkStateWriter if state is flushed to ZK
*/
void onWrite() throws Exception;
}
}