// blob: 436aa93d636d335d89e30148bc11b3fb98692c56 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.overseer;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
import org.apache.solr.cloud.CloudUtil;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.Assign;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.cloud.api.collections.SplitShardCmd;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.TestInjection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
import static org.apache.solr.cloud.overseer.CollectionMutator.checkKeyExistence;
import static org.apache.solr.cloud.overseer.SliceMutator.getZkClient;
import static org.apache.solr.common.params.CommonParams.NAME;
/**
 * Computes cluster-state changes for replica-level Overseer operations.
 *
 * <p>The public entry points ({@code addReplicaProperty}, {@code deleteReplicaProperty} and
 * {@code setState}) take the current {@link ClusterState} plus an operation message and return a
 * {@link ZkWriteCommand} describing the updated collection, or {@link ZkStateWriter#NO_OP} when
 * nothing has to be written.
 */
public class ReplicaMutator {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// Cloud manager handed to the constructor; also passed on to the other mutators created here.
protected final SolrCloudManager cloudManager;
// Distributed state manager obtained from cloudManager; used to read live-node data and to assign core node names.
protected final DistribStateManager stateManager;
// Client obtained through SliceMutator.getZkClient(cloudManager); used when fetching per-replica states.
protected SolrZkClient zkClient;
public ReplicaMutator(SolrCloudManager cloudManager) {
this.cloudManager = cloudManager;
this.stateManager = cloudManager.getDistribStateManager();
this.zkClient = getZkClient(cloudManager);
}
/**
 * Returns a replica that carries {@code key=value}.
 *
 * <p>The given replica is returned as-is when its current value already equals {@code value}
 * (ignoring case); otherwise a new {@link Replica} is built from a copy of its properties.
 *
 * @param replica the replica to start from
 * @param key property name, must not be null
 * @param value property value, must not be null
 * @return the same replica if no change is needed, else a new replica with the property set
 */
protected Replica setProperty(Replica replica, String key, String value) {
  assert key != null;
  assert value != null;

  final String currentValue = replica.getStr(key);
  if (StringUtils.equalsIgnoreCase(currentValue, value)) {
    // already the value we're going to set
    return replica;
  }

  final Map<String, Object> updatedProps = new LinkedHashMap<>(replica.getProperties());
  updatedProps.put(key, value);
  return new Replica(replica.getName(), updatedProps, replica.getCollection(), replica.getSlice());
}
/**
 * Returns a replica without the property {@code key}.
 *
 * <p>The given replica is returned as-is when it does not contain the key; otherwise a new
 * {@link Replica} is built from a copy of its properties with the key removed.
 *
 * @param replica the replica to start from
 * @param key property name, must not be null
 * @return the same replica if the key is absent, else a new replica lacking the key
 */
protected Replica unsetProperty(Replica replica, String key) {
  assert key != null;
  if (replica.containsKey(key)) {
    final Map<String, Object> remainingProps = new LinkedHashMap<>(replica.getProperties());
    remainingProps.remove(key);
    return new Replica(replica.getName(), remainingProps, replica.getCollection(), replica.getSlice());
  }
  return replica;
}
/**
 * Returns a replica whose {@link ZkStateReader#LEADER_PROP} property is {@code "true"}.
 *
 * @param replica the replica to flag as leader
 * @return the result of {@link #setProperty(Replica, String, String)} for the leader property
 */
protected Replica setLeader(Replica replica) {
  final Replica flagged = setProperty(replica, ZkStateReader.LEADER_PROP, "true");
  return flagged;
}
/**
 * Returns a replica without the {@link ZkStateReader#LEADER_PROP} property.
 *
 * @param replica the replica to clear the leader flag on
 * @return the result of {@link #unsetProperty(Replica, String)} for the leader property
 */
protected Replica unsetLeader(Replica replica) {
  final Replica cleared = unsetProperty(replica, ZkStateReader.LEADER_PROP);
  return cleared;
}
/**
 * Returns a replica whose {@link ZkStateReader#STATE_PROP} property is {@code state}.
 *
 * @param replica the replica to update
 * @param state the new state value, must not be null
 * @return the result of {@link #setProperty(Replica, String, String)} for the state property
 */
protected Replica setState(Replica replica, String state) {
  assert state != null;
  final String stateKey = ZkStateReader.STATE_PROP;
  return setProperty(replica, stateKey, state);
}
/**
 * Handles the Overseer ADDREPLICAPROP operation: sets a property on one replica of a shard.
 *
 * <p>The property name is lower-cased and prefixed with
 * {@link OverseerCollectionMessageHandler#COLL_PROP_PREFIX} when the prefix is missing. When the
 * property is treated as shard-unique (it is listed in
 * {@link SliceMutator#SLICE_UNIQUE_BOOLEAN_PROPERTIES}, or the message sets
 * {@link OverseerCollectionMessageHandler#SHARD_UNIQUE} to true) the property is removed from every
 * other replica of the shard.
 *
 * @param clusterState the current cluster state
 * @param message the operation message; must contain the collection, shard, replica, property and
 *                property value keys
 * @return {@link ZkStateWriter#NO_OP} when the replica already holds the value (compared ignoring
 *         case), otherwise a command carrying the updated collection
 * @throws SolrException with {@code BAD_REQUEST} when a required key is missing, when a
 *         shard-unique property is combined with a non-true shardUnique flag, or when the
 *         collection/shard/replica combination cannot be resolved
 */
public ZkWriteCommand addReplicaProperty(ClusterState clusterState, ZkNodeProps message) {
  if (!checkKeyExistence(message, ZkStateReader.COLLECTION_PROP) ||
      !checkKeyExistence(message, ZkStateReader.SHARD_ID_PROP) ||
      !checkKeyExistence(message, ZkStateReader.REPLICA_PROP) ||
      !checkKeyExistence(message, ZkStateReader.PROPERTY_PROP) ||
      !checkKeyExistence(message, ZkStateReader.PROPERTY_VALUE_PROP)) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
        "Overseer ADDREPLICAPROP requires " +
            ZkStateReader.COLLECTION_PROP + " and " + ZkStateReader.SHARD_ID_PROP + " and " +
            ZkStateReader.REPLICA_PROP + " and " + ZkStateReader.PROPERTY_PROP + " and " +
            ZkStateReader.PROPERTY_VALUE_PROP + " no action taken.");
  }

  String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
  String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
  String replicaName = message.getStr(ZkStateReader.REPLICA_PROP);

  // Normalize the property name: lower case, always carrying the custom-property prefix.
  String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT);
  if (StringUtils.startsWith(property, OverseerCollectionMessageHandler.COLL_PROP_PREFIX) == false) {
    property = OverseerCollectionMessageHandler.COLL_PROP_PREFIX + property;
  }
  // Lower-case again so the result is normalized even after the prefix was prepended.
  property = property.toLowerCase(Locale.ROOT);
  String propVal = message.getStr(ZkStateReader.PROPERTY_VALUE_PROP);
  String shardUnique = message.getStr(OverseerCollectionMessageHandler.SHARD_UNIQUE);

  boolean isUnique;
  if (SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(property)) {
    // These properties are always shard-unique; an explicit "false" contradicts that.
    if (StringUtils.isNotBlank(shardUnique) && Boolean.parseBoolean(shardUnique) == false) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Overseer ADDREPLICAPROP for " +
          property + " cannot have " + OverseerCollectionMessageHandler.SHARD_UNIQUE + " set to anything other than " +
          "'true'. No action taken");
    }
    isUnique = true;
  } else {
    isUnique = Boolean.parseBoolean(shardUnique);
  }

  DocCollection collection = clusterState.getCollection(collectionName);
  // Resolve the replica through the named shard so that an unknown shard, or a replica that
  // belongs to a different shard, is reported as a bad request instead of failing with an NPE.
  Slice slice = collection.getSlice(sliceName);
  Replica replica = slice == null ? null : slice.getReplica(replicaName);
  if (replica == null) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection/slice/replica " +
        collectionName + "/" + sliceName + "/" + replicaName + " no action taken.");
  }
  log.info("Setting property {} with value {} for collection {}", property, propVal, collectionName);
  log.debug("Full message: {}", message);
  if (StringUtils.equalsIgnoreCase(replica.getStr(property), propVal)) {
    return ZkStateWriter.NO_OP; // already the value we're going to set
  }

  // OK, there's no way we won't change the cluster state now
  // NOTE(review): the map is a copy, but the Replica values look like the same instances held by
  // the current state, so the put/remove calls below may edit their property maps in place - confirm.
  Map<String, Replica> replicas = slice.getReplicasCopy();
  if (isUnique == false) {
    replicas.get(replicaName).getProperties().put(property, propVal);
  } else { // Set prop for this replica, but remove it for all others.
    for (Replica rep : replicas.values()) {
      if (rep.getName().equalsIgnoreCase(replicaName)) {
        rep.getProperties().put(property, propVal);
      } else {
        rep.getProperties().remove(property);
      }
    }
  }
  Slice newSlice = new Slice(sliceName, replicas, slice.shallowCopy(), collectionName);
  DocCollection newCollection = CollectionMutator.updateSlice(collectionName, collection, newSlice);
  return new ZkWriteCommand(collectionName, newCollection);
}
/**
 * Handles the Overseer DELETEREPLICAPROP operation: removes a property from one replica of a shard.
 *
 * <p>The property name is lower-cased and prefixed with
 * {@link OverseerCollectionMessageHandler#COLL_PROP_PREFIX} when the prefix is missing.
 *
 * @param clusterState the current cluster state
 * @param message the operation message; must contain the collection, shard, replica and property keys
 * @return {@link ZkStateWriter#NO_OP} when the replica does not have the property, otherwise a
 *         command carrying the updated collection
 * @throws SolrException with {@code BAD_REQUEST} when a required key is missing or when the
 *         collection/shard/replica combination cannot be resolved
 */
public ZkWriteCommand deleteReplicaProperty(ClusterState clusterState, ZkNodeProps message) {
  if (checkKeyExistence(message, ZkStateReader.COLLECTION_PROP) == false ||
      checkKeyExistence(message, ZkStateReader.SHARD_ID_PROP) == false ||
      checkKeyExistence(message, ZkStateReader.REPLICA_PROP) == false ||
      checkKeyExistence(message, ZkStateReader.PROPERTY_PROP) == false) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
        "Overseer DELETEREPLICAPROP requires " +
            ZkStateReader.COLLECTION_PROP + " and " + ZkStateReader.SHARD_ID_PROP + " and " +
            ZkStateReader.REPLICA_PROP + " and " + ZkStateReader.PROPERTY_PROP + " no action taken.");
  }
  String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
  String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
  String replicaName = message.getStr(ZkStateReader.REPLICA_PROP);

  // Normalize the property name: lower case, always carrying the custom-property prefix.
  String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT);
  if (StringUtils.startsWith(property, OverseerCollectionMessageHandler.COLL_PROP_PREFIX) == false) {
    property = OverseerCollectionMessageHandler.COLL_PROP_PREFIX + property;
  }

  DocCollection collection = clusterState.getCollection(collectionName);
  // Resolve the replica through the named shard so that an unknown shard, or a replica that
  // belongs to a different shard, is reported as a bad request rather than passing a null or
  // unrelated slice on to SliceMutator.updateReplica.
  Slice slice = collection.getSlice(sliceName);
  Replica replica = slice == null ? null : slice.getReplica(replicaName);
  if (replica == null) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection/slice/replica " +
        collectionName + "/" + sliceName + "/" + replicaName + " no action taken.");
  }
  log.info("Deleting property {} for collection: {} slice: {} replica: {}", property, collectionName, sliceName, replicaName);
  log.debug("Full message: {}", message);
  if (replica.getStr(property) == null) {
    return ZkStateWriter.NO_OP; // not there anyway, nothing to do.
  }

  DocCollection newCollection = SliceMutator.updateReplica(collection,
      slice, replicaName, unsetProperty(replica, property));
  return new ZkWriteCommand(collectionName, newCollection);
}
/**
 * Applies a replica state message, choosing the legacy or the non-legacy code path based on
 * {@link Overseer#isLegacy} for the cluster state provider of the cloud manager.
 *
 * @param clusterState the current cluster state
 * @param message the state message
 * @return the write command produced by the selected path
 */
public ZkWriteCommand setState(ClusterState clusterState, ZkNodeProps message) {
  final boolean legacy = Overseer.isLegacy(cloudManager.getClusterStateProvider());
  return legacy ? updateState(clusterState, message) : updateStateNew(clusterState, message);
}
/**
 * Applies a replica state message. When the collection is not yet part of {@code prevState} and
 * the message specifies a shard count, the collection is first created through
 * {@link ClusterStateMutator} and the state update is applied on top of the resulting state.
 *
 * @param prevState the cluster state to update
 * @param message the state message
 * @return {@link ZkStateWriter#NO_OP} when the collection key check fails, otherwise the command
 *         produced by the detailed state update
 */
protected ZkWriteCommand updateState(final ClusterState prevState, ZkNodeProps message) {
  final String cName = message.getStr(ZkStateReader.COLLECTION_PROP);
  if (!checkCollectionKeyExistence(message)) {
    return ZkStateWriter.NO_OP;
  }
  final Integer numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, null);
  log.debug("Update state numShards={} message={}", numShards, message);

  final boolean collectionExists = prevState.hasCollection(cName);
  if (collectionExists || numShards == null) {
    // nothing to create up front; work directly on the given state
    return updateState(prevState, message, cName, numShards, collectionExists);
  }

  // collection does not yet exist, create placeholders since num shards is specified
  final List<String> shardNames = new ArrayList<>();
  ClusterStateMutator.getShardNames(numShards, shardNames);
  final Map<String, Object> createMsg = Utils.makeMap(NAME, cName);
  createMsg.putAll(message.getProperties());
  final ZkWriteCommand createCommand =
      new ClusterStateMutator(cloudManager).createCollection(prevState, new ZkNodeProps(createMsg));
  final DocCollection created = createCommand.collection;
  final ClusterState stateWithCollection = ClusterStateMutator.newState(prevState, cName, created);

  final ClusterState baseState = stateWithCollection != null ? stateWithCollection : prevState;
  return updateState(baseState, message, cName, numShards, collectionExists);
}
/**
 * Builds the write command for a replica state message against {@code prevState}.
 *
 * <p>Steps, in order: resolve the core node name and shard name when the message omits them,
 * carry selected properties over from an already registered replica, strip the message keys that
 * are not stored on the replica, run the shard-split completion check when the shard already
 * exists, and finally place the new replica into a new slice of the collection.
 *
 * @param prevState the cluster state to work on
 * @param message the state message; its properties become the replica properties
 * @param collectionName name of the collection the replica belongs to
 * @param numShards shard count taken from the message, may be null
 * @param collectionExists whether the caller found the collection before any placeholder creation
 * @return {@link ZkStateWriter#NO_OP} when force-set-state is disabled and the replica is unknown,
 *         otherwise a command carrying the updated collection
 */
private ZkWriteCommand updateState(final ClusterState prevState, ZkNodeProps message, String collectionName, Integer numShards, boolean collectionExists) {
String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
String coreNodeName = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
// force-set-state defaults to true when the message does not carry the flag
boolean forceSetState = message.getBool(ZkStateReader.FORCE_SET_STATE_PROP, true);
DocCollection collection = prevState.getCollectionOrNull(collectionName);
if (!forceSetState && !CloudUtil.replicaExists(prevState, collectionName, sliceName, coreNodeName)) {
log.info("Failed to update state because the replica does not exist, {}", message);
return ZkStateWriter.NO_OP;
}
// only consumed by the per-replica-state branch at the end of this method
boolean persistCollectionState = collection != null && collection.isPerReplicaState();
if (coreNodeName == null) {
// look the name up by node name and core name before assigning a new one
coreNodeName = ClusterStateMutator.getAssignedCoreNodeName(collection,
message.getStr(ZkStateReader.NODE_NAME_PROP), message.getStr(ZkStateReader.CORE_NAME_PROP));
if (coreNodeName != null) {
log.debug("node={} is already registered", coreNodeName);
} else {
if (!forceSetState) {
log.info("Failed to update state because the replica does not exist, {}", message);
return ZkStateWriter.NO_OP;
}
persistCollectionState = true;
// if coreNodeName is null, auto assign one
coreNodeName = Assign.assignCoreNodeName(stateManager, collection);
}
// the resolved name is written back into the message's own property map
message.getProperties().put(ZkStateReader.CORE_NODE_NAME_PROP,
coreNodeName);
}
// use the provided non null shardId
if (sliceName == null) {
//get shardId from ClusterState
sliceName = ClusterStateMutator.getAssignedId(collection, coreNodeName);
if (sliceName != null) {
log.debug("shard={} is already registered", sliceName);
}
persistCollectionState = true;
}
if (sliceName == null) {
//request new shardId
if (collectionExists) {
// use existing numShards
numShards = collection.getSlices().size();
log.debug("Collection already exists with {} = {}", ZkStateReader.NUM_SHARDS_PROP, numShards);
}
sliceName = Assign.assignShard(collection, numShards);
log.info("Assigning new node to shard shard={}", sliceName);
persistCollectionState = true;
}
Slice slice = collection != null ? collection.getSlice(sliceName) : null;
// start from a copy of the message properties; the message itself is not modified below
Map<String, Object> replicaProps = new LinkedHashMap<>(message.getProperties());
if (slice != null) {
Replica oldReplica = slice.getReplica(coreNodeName);
if (oldReplica != null) {
// the leader flag, replica type and custom (prefixed) properties of the existing replica override the message
if (oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
}
replicaProps.put(ZkStateReader.REPLICA_TYPE, oldReplica.getType().toString());
// Move custom props over.
for (Map.Entry<String, Object> ent : oldReplica.getProperties().entrySet()) {
if (ent.getKey().startsWith(OverseerCollectionMessageHandler.COLL_PROP_PREFIX)) {
replicaProps.put(ent.getKey(), ent.getValue());
}
}
}
}
// we don't put these in the clusterstate
replicaProps.remove(ZkStateReader.NUM_SHARDS_PROP);
replicaProps.remove(ZkStateReader.CORE_NODE_NAME_PROP);
replicaProps.remove(ZkStateReader.SHARD_ID_PROP);
replicaProps.remove(ZkStateReader.COLLECTION_PROP);
replicaProps.remove(Overseer.QUEUE_OPERATION);
// remove any props with null values
// (keys are collected first so the map is not modified while it is being iterated)
Set<Map.Entry<String, Object>> entrySet = replicaProps.entrySet();
List<String> removeKeys = new ArrayList<>();
for (Map.Entry<String, Object> entry : entrySet) {
if (entry.getValue() == null) {
removeKeys.add(entry.getKey());
}
}
for (String removeKey : removeKeys) {
replicaProps.remove(removeKey);
}
// this key was already removed above and nothing re-adds it in between, so this call has no effect
replicaProps.remove(ZkStateReader.CORE_NODE_NAME_PROP);
// remove shard specific properties
// (the removed values are kept: they seed the slice properties when the shard does not exist yet)
String shardRange = (String) replicaProps.remove(ZkStateReader.SHARD_RANGE_PROP);
String shardState = (String) replicaProps.remove(ZkStateReader.SHARD_STATE_PROP);
String shardParent = (String) replicaProps.remove(ZkStateReader.SHARD_PARENT_PROP);
Replica replica = new Replica(coreNodeName, replicaProps, collectionName, sliceName);
log.debug("Will update state for replica: {}", replica);
Map<String, Object> sliceProps = null;
Map<String, Replica> replicas;
if (slice != null) {
// existing shard: the split check may hand back a collection with updated shard states
collection = checkAndCompleteShardSplit(prevState, collection, coreNodeName, sliceName, replica);
// get the current slice again because it may have been updated due to checkAndCompleteShardSplit method
slice = collection.getSlice(sliceName);
sliceProps = slice.getProperties();
replicas = slice.getReplicasCopy();
} else {
// new shard: build its properties from the shard-specific values taken out of the message
replicas = new HashMap<>(1);
sliceProps = new HashMap<>();
sliceProps.put(Slice.RANGE, shardRange);
sliceProps.put(ZkStateReader.STATE_PROP, shardState);
sliceProps.put(Slice.PARENT, shardParent);
}
replicas.put(replica.getName(), replica);
slice = new Slice(sliceName, replicas, sliceProps, collectionName);
DocCollection newCollection = CollectionMutator.updateSlice(collectionName, collection, slice);
log.debug("Collection is now: {}", newCollection);
if (collection != null && collection.isPerReplicaState()) {
// per-replica-state collections: fetch the current per-replica states and attach a state flip for this replica
PerReplicaStates prs = PerReplicaStates.fetch(collection.getZNode(), zkClient, collection.getPerReplicaStates());
return new ZkWriteCommand(collectionName, newCollection, PerReplicaStatesOps.flipState(replica.getName(), replica.getState(), prs), persistCollectionState);
} else{
return new ZkWriteCommand(collectionName, newCollection);
}
}
/**
 * Handles non-legacy state updates.
 *
 * <p>Unlike the legacy path, the message must name both a collection and a shard, and that shard
 * must already exist in {@code clusterState}; otherwise nothing is written.
 *
 * @param clusterState the current cluster state
 * @param message the state message
 * @return {@link ZkStateWriter#NO_OP} when validation fails, otherwise the result of
 *         {@link #updateState(ClusterState, ZkNodeProps)}
 */
protected ZkWriteCommand updateStateNew(ClusterState clusterState, final ZkNodeProps message) {
  final String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
  if (!checkCollectionKeyExistence(message)) {
    return ZkStateWriter.NO_OP;
  }

  final String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
  if (collectionName == null || sliceName == null) {
    log.error("Invalid collection and slice {}", message);
    return ZkStateWriter.NO_OP;
  }

  final DocCollection collection = clusterState.getCollectionOrNull(collectionName);
  final boolean sliceKnown = collection != null && collection.getSlice(sliceName) != null;
  if (!sliceKnown) {
    log.error("No such slice exists {}", message);
    return ZkStateWriter.NO_OP;
  }

  return updateState(clusterState, message);
}
/**
 * Completes a shard split once the last replica of the sub-shards becomes active.
 *
 * <p>Nothing happens unless the shard {@code sliceName} is in RECOVERY state, the given replica is
 * ACTIVE, every other replica of the shard is ACTIVE, and every replica of each fellow sub-shard
 * (another shard in RECOVERY state with the same parent) is ACTIVE. When all of that holds, the
 * live-node entry of the recorded parent leader node is compared with the recorded ZK session id
 * (see SOLR-9438): if they still match, the parent shard is switched to INACTIVE and the sub-shards
 * to ACTIVE; if not, the sub-shards are switched to RECOVERY_FAILED. The split lock on the parent
 * shard is released on a best-effort basis in both cases.
 *
 * @param prevState the cluster state the shard state update is applied to
 * @param collection the collection that contains the shard
 * @param coreNodeName name of the replica whose state is being updated
 * @param sliceName name of the shard that contains the replica
 * @param replica the replica with its new state
 * @return the collection produced by the shard state update, or {@code collection} itself when the
 *         conditions above are not met or the leader check failed with an exception
 */
private DocCollection checkAndCompleteShardSplit(ClusterState prevState, DocCollection collection, String coreNodeName, String sliceName, Replica replica) {
  Slice slice = collection.getSlice(sliceName);
  Map<String, Object> sliceProps = slice.getProperties();
  if (slice.getState() != Slice.State.RECOVERY) {
    return collection;
  }
  log.info("Shard: {} is in recovery state", sliceName);
  // is this replica active?
  if (replica.getState() != Replica.State.ACTIVE) {
    return collection;
  }
  log.info("Shard: {} is in recovery state and coreNodeName: {} is active", sliceName, coreNodeName);
  // are all other replicas also active?
  boolean allActive = true;
  for (Map.Entry<String, Replica> entry : slice.getReplicasMap().entrySet()) {
    // the state of this replica comes from the incoming message, not from the stored slice
    if (coreNodeName.equals(entry.getKey())) continue;
    if (entry.getValue().getState() != Replica.State.ACTIVE) {
      allActive = false;
      break;
    }
  }
  if (!allActive) {
    return collection;
  }
  if (log.isInfoEnabled()) {
    log.info("Shard: {} - all {} replicas are active. Finding status of fellow sub-shards", sliceName, slice.getReplicasMap().size());
  }
  // find out about other sub shards
  Map<String, Slice> allSlicesCopy = new HashMap<>(collection.getSlicesMap());
  List<Slice> subShardSlices = new ArrayList<>();
  outer:
  for (Map.Entry<String, Slice> entry : allSlicesCopy.entrySet()) {
    if (sliceName.equals(entry.getKey())) {
      continue;
    }
    Slice otherSlice = entry.getValue();
    if (otherSlice.getState() == Slice.State.RECOVERY) {
      if (slice.getParent() != null && slice.getParent().equals(otherSlice.getParent())) {
        if (log.isInfoEnabled()) {
          log.info("Shard: {} - Fellow sub-shard: {} found", sliceName, otherSlice.getName());
        }
        // this is a fellow sub shard so check if all replicas are active
        for (Map.Entry<String, Replica> sliceEntry : otherSlice.getReplicasMap().entrySet()) {
          if (sliceEntry.getValue().getState() != Replica.State.ACTIVE) {
            allActive = false;
            break outer;
          }
        }
        if (log.isInfoEnabled()) {
          log.info("Shard: {} - Fellow sub-shard: {} has all {} replicas active", sliceName, otherSlice.getName(), otherSlice.getReplicasMap().size());
        }
        subShardSlices.add(otherSlice);
      }
    }
  }
  if (!allActive) {
    return collection;
  }

  // hurray, all sub shard replicas are active
  log.info("Shard: {} - All replicas across all fellow sub-shards are now ACTIVE.", sliceName);
  // NOTE(review): these remove() calls act on the map returned by slice.getProperties(); if that
  // is the slice's backing map, the early return in the catch block below leaves the slice
  // without these keys - confirm.
  String parentSliceName = (String) sliceProps.remove(Slice.PARENT);
  // now let's see if the parent leader is still the same or else there's a chance of data loss
  // see SOLR-9438 for details
  String shardParentZkSession = (String) sliceProps.remove("shard_parent_zk_session");
  String shardParentNode = (String) sliceProps.remove("shard_parent_node");
  boolean isLeaderSame = true;
  if (shardParentNode != null && shardParentZkSession != null) {
    log.info("Checking whether sub-shard leader node is still the same one at {} with ZK session id {}", shardParentNode, shardParentZkSession);
    try {
      VersionedData leaderZnode = null;
      try {
        leaderZnode = stateManager.getData(ZkStateReader.LIVE_NODES_ZKNODE
            + "/" + shardParentNode, null);
      } catch (NoSuchElementException e) {
        // ignore: a missing live-node entry is handled as "not live" right below
      }
      if (leaderZnode == null) {
        log.error("The shard leader node: {} is not live anymore!", shardParentNode);
        isLeaderSame = false;
      } else if (!shardParentZkSession.equals(leaderZnode.getOwner())) {
        log.error("The zk session id for shard leader node: {} has changed from {} to {}",
            shardParentNode, shardParentZkSession, leaderZnode.getOwner());
        isLeaderSame = false;
      }
    } catch (Exception e) {
      log.warn("Error occurred while checking if parent shard node is still live with the same zk session id. {}"
          , "We cannot switch shard states at this time.", e);
      return collection; // we aren't going to make any changes right now
    }
  }

  // Build the shard state update message: one entry per shard name mapping to its new state.
  Map<String, Object> propMap = new HashMap<>();
  propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
  propMap.put(ZkStateReader.COLLECTION_PROP, collection.getName());
  if (isLeaderSame) {
    log.info("Sub-shard leader node is still the same one at {} with ZK session id {}. Preparing to switch shard states.", shardParentNode, shardParentZkSession);
    propMap.put(parentSliceName, Slice.State.INACTIVE.toString());
    propMap.put(sliceName, Slice.State.ACTIVE.toString());
    long now = cloudManager.getTimeSource().getEpochTimeNs();
    for (Slice subShardSlice : subShardSlices) {
      propMap.put(subShardSlice.getName(), Slice.State.ACTIVE.toString());
      // the timestamp is only used to log how long the sub-shard took to recover
      String lastTimeStr = subShardSlice.getStr(ZkStateReader.STATE_TIMESTAMP_PROP);
      if (lastTimeStr != null) {
        long start = Long.parseLong(lastTimeStr);
        if (log.isInfoEnabled()) {
          log.info("TIMINGS: Sub-shard {} recovered in {} ms", subShardSlice.getName(),
              TimeUnit.MILLISECONDS.convert(now - start, TimeUnit.NANOSECONDS));
        }
      } else {
        if (log.isInfoEnabled()) {
          log.info("TIMINGS Sub-shard {} not available: {}", subShardSlice.getName(), subShardSlice);
        }
      }
    }
  } else {
    // we must mark the shard split as failed by switching sub-shards to recovery_failed state
    propMap.put(sliceName, Slice.State.RECOVERY_FAILED.toString());
    for (Slice subShardSlice : subShardSlices) {
      propMap.put(subShardSlice.getName(), Slice.State.RECOVERY_FAILED.toString());
    }
  }
  TestInjection.injectSplitLatch();
  try {
    SplitShardCmd.unlockForSplit(cloudManager, collection.getName(), parentSliceName);
  } catch (Exception e) {
    // Best effort: a failed unlock must not prevent the state switch, but keep the cause in the log.
    log.warn("Failed to unlock shard after {}successful split: {} / {}"
        , (isLeaderSame ? "" : "un"), collection.getName(), parentSliceName, e);
  }
  ZkNodeProps m = new ZkNodeProps(propMap);
  return new SliceMutator(cloudManager).updateShardState(prevState, m).collection;
}
}