blob: f567b2ef2ca70705ca36254072656d683f1da9a4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.api.collections;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import org.apache.solr.cloud.ActiveReplicaWatcher;
import org.apache.solr.common.SolrCloseableLatch;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.TimeOut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.IN_PLACE_MOVE;
import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final OverseerCollectionMessageHandler ocmh;
private final TimeSource timeSource;
public MoveReplicaCmd(OverseerCollectionMessageHandler ocmh) {
this.ocmh = ocmh;
this.timeSource = ocmh.cloudManager.getTimeSource();
}
@Override
public void call(ClusterState state, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results) throws Exception {
moveReplica(ocmh.zkStateReader.getClusterState(), message, results);
}
private void moveReplica(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results) throws Exception {
if (log.isDebugEnabled()) {
log.debug("moveReplica() : {}", Utils.toJSONString(message));
}
ocmh.checkRequired(message, COLLECTION_PROP, CollectionParams.TARGET_NODE);
String extCollection = message.getStr(COLLECTION_PROP);
String targetNode = message.getStr(CollectionParams.TARGET_NODE);
boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
boolean inPlaceMove = message.getBool(IN_PLACE_MOVE, true);
int timeout = message.getInt(TIMEOUT, 10 * 60); // 10 minutes
String async = message.getStr(ASYNC);
boolean followAliases = message.getBool(FOLLOW_ALIASES, false);
String collection;
if (followAliases) {
collection = ocmh.cloudManager.getClusterStateProvider().resolveSimpleAlias(extCollection);
} else {
collection = extCollection;
}
DocCollection coll = clusterState.getCollection(collection);
if (coll == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
}
if (!clusterState.getLiveNodes().contains(targetNode)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target node: " + targetNode + " not in live nodes: " + clusterState.getLiveNodes());
}
Replica replica = null;
if (message.containsKey(REPLICA_PROP)) {
String replicaName = message.getStr(REPLICA_PROP);
replica = coll.getReplica(replicaName);
if (replica == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Collection: " + collection + " replica: " + replicaName + " does not exist");
}
} else {
String sourceNode = message.getStr(CollectionParams.SOURCE_NODE, message.getStr(CollectionParams.FROM_NODE));
if (sourceNode == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'" + CollectionParams.SOURCE_NODE +
" or '" + CollectionParams.FROM_NODE + "' is a required param");
}
String shardId = message.getStr(SHARD_ID_PROP);
if (shardId == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'" + SHARD_ID_PROP + "' is a required param");
}
Slice slice = coll.getSlice(shardId);
List<Replica> sliceReplicas = new ArrayList<>(slice.getReplicas(r -> sourceNode.equals(r.getNodeName())));
if (sliceReplicas.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Collection: " + collection + " node: " + sourceNode + " does not have any replica belonging to shard: " + shardId);
}
Collections.shuffle(sliceReplicas, OverseerCollectionMessageHandler.RANDOM);
replica = sliceReplicas.iterator().next();
}
if (coll.getStr(CollectionAdminParams.COLOCATED_WITH) != null) {
// we must ensure that moving this replica does not cause the co-location to break
String sourceNode = replica.getNodeName();
String colocatedCollectionName = coll.getStr(CollectionAdminParams.COLOCATED_WITH);
DocCollection colocatedCollection = clusterState.getCollectionOrNull(colocatedCollectionName);
if (colocatedCollection != null) {
if (colocatedCollection.getReplica((s, r) -> sourceNode.equals(r.getNodeName())) != null) {
// check if we have at least two replicas of the collection on the source node
// only then it is okay to move one out to another node
List<Replica> replicasOnSourceNode = coll.getReplicas(replica.getNodeName());
if (replicasOnSourceNode == null || replicasOnSourceNode.size() < 2) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Collection: " + collection + " is co-located with collection: " + colocatedCollectionName
+ " and has a single replica: " + replica.getName() + " on node: " + replica.getNodeName()
+ " so it is not possible to move it to another node");
}
}
}
}
log.info("Replica will be moved to node {}: {}", targetNode, replica);
Slice slice = null;
for (Slice s : coll.getSlices()) {
if (s.getReplicas().contains(replica)) {
slice = s;
}
}
assert slice != null;
Object dataDir = replica.get("dataDir");
boolean isSharedFS = replica.getBool(ZkStateReader.SHARED_STORAGE_PROP, false) && dataDir != null;
if (isSharedFS && inPlaceMove) {
log.debug("-- moveHdfsReplica");
moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice, timeout, waitForFinalState);
} else {
log.debug("-- moveNormalReplica (inPlaceMove={}, isSharedFS={}", inPlaceMove, isSharedFS);
moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice, timeout, waitForFinalState);
}
}
@SuppressWarnings({"unchecked"})
private void moveHdfsReplica(ClusterState clusterState, @SuppressWarnings({"rawtypes"})NamedList results, String dataDir, String targetNode, String async,
DocCollection coll, Replica replica, Slice slice, int timeout, boolean waitForFinalState) throws Exception {
String skipCreateReplicaInClusterState = "true";
if (clusterState.getLiveNodes().contains(replica.getNodeName())) {
skipCreateReplicaInClusterState = "false";
ZkNodeProps removeReplicasProps = new ZkNodeProps(
COLLECTION_PROP, coll.getName(),
SHARD_ID_PROP, slice.getName(),
REPLICA_PROP, replica.getName()
);
removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_DATA_DIR, false);
removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_INDEX, false);
if (async != null) removeReplicasProps.getProperties().put(ASYNC, async);
@SuppressWarnings({"rawtypes"})
NamedList deleteResult = new NamedList();
try {
ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null);
} catch (SolrException e) {
// assume this failed completely so there's nothing to roll back
deleteResult.add("failure", e.toString());
}
if (deleteResult.get("failure") != null) {
String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s, failure=%s",
coll.getName(), slice.getName(), replica.getName(), deleteResult.get("failure"));
log.warn(errorString);
results.add("failure", errorString);
return;
}
TimeOut timeOut = new TimeOut(20L, TimeUnit.SECONDS, timeSource);
while (!timeOut.hasTimedOut()) {
coll = ocmh.zkStateReader.getClusterState().getCollection(coll.getName());
if (coll.getReplica(replica.getName()) != null) {
timeOut.sleep(100);
} else {
break;
}
}
if (timeOut.hasTimedOut()) {
results.add("failure", "Still see deleted replica in clusterstate!");
return;
}
}
String ulogDir = replica.getStr(CoreAdminParams.ULOG_DIR);
ZkNodeProps addReplicasProps = new ZkNodeProps(
COLLECTION_PROP, coll.getName(),
SHARD_ID_PROP, slice.getName(),
CoreAdminParams.NODE, targetNode,
CoreAdminParams.CORE_NODE_NAME, replica.getName(),
CoreAdminParams.NAME, replica.getCoreName(),
WAIT_FOR_FINAL_STATE, String.valueOf(waitForFinalState),
SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, skipCreateReplicaInClusterState,
CoreAdminParams.ULOG_DIR, ulogDir.substring(0, ulogDir.lastIndexOf(UpdateLog.TLOG_NAME)),
CoreAdminParams.DATA_DIR, dataDir,
ZkStateReader.REPLICA_TYPE, replica.getType().name());
if(async!=null) addReplicasProps.getProperties().put(ASYNC, async);
@SuppressWarnings({"rawtypes"})
NamedList addResult = new NamedList();
try {
ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, addResult, null);
} catch (Exception e) {
// fatal error - try rolling back
String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
" on node=%s, failure=%s", coll.getName(), slice.getName(), targetNode, addResult.get("failure"));
results.add("failure", errorString);
log.warn("Error adding replica {} - trying to roll back...", addReplicasProps, e);
addReplicasProps = addReplicasProps.plus(CoreAdminParams.NODE, replica.getNodeName());
@SuppressWarnings({"rawtypes"})
NamedList rollback = new NamedList();
ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, rollback, null);
if (rollback.get("failure") != null) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica
+ ", collection may be inconsistent: " + rollback.get("failure"));
}
return;
}
if (addResult.get("failure") != null) {
String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
" on node=%s, failure=%s", coll.getName(), slice.getName(), targetNode, addResult.get("failure"));
log.warn(errorString);
results.add("failure", errorString);
log.debug("--- trying to roll back...");
// try to roll back
addReplicasProps = addReplicasProps.plus(CoreAdminParams.NODE, replica.getNodeName());
@SuppressWarnings({"rawtypes"})
NamedList rollback = new NamedList();
try {
ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, rollback, null);
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica
+ ", collection may be inconsistent!", e);
}
if (rollback.get("failure") != null) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica
+ ", collection may be inconsistent! Failure: " + rollback.get("failure"));
}
return;
} else {
String successString = String.format(Locale.ROOT, "MOVEREPLICA action completed successfully, moved replica=%s at node=%s " +
"to replica=%s at node=%s", replica.getCoreName(), replica.getNodeName(), replica.getCoreName(), targetNode);
results.add("success", successString);
}
}
@SuppressWarnings({"unchecked"})
private void moveNormalReplica(ClusterState clusterState, @SuppressWarnings({"rawtypes"})NamedList results, String targetNode, String async,
DocCollection coll, Replica replica, Slice slice, int timeout, boolean waitForFinalState) throws Exception {
String newCoreName = Assign.buildSolrCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), coll, slice.getName(), replica.getType());
ZkNodeProps addReplicasProps = new ZkNodeProps(
COLLECTION_PROP, coll.getName(),
SHARD_ID_PROP, slice.getName(),
CoreAdminParams.NODE, targetNode,
CoreAdminParams.NAME, newCoreName,
ZkStateReader.REPLICA_TYPE, replica.getType().name());
if (async != null) addReplicasProps.getProperties().put(ASYNC, async);
@SuppressWarnings({"rawtypes"})
NamedList addResult = new NamedList();
SolrCloseableLatch countDownLatch = new SolrCloseableLatch(1, ocmh);
ActiveReplicaWatcher watcher = null;
ZkNodeProps props = ocmh.addReplica(clusterState, addReplicasProps, addResult, null).get(0);
log.debug("props {}", props);
if (replica.equals(slice.getLeader()) || waitForFinalState) {
watcher = new ActiveReplicaWatcher(coll.getName(), null, Collections.singletonList(newCoreName), countDownLatch);
log.debug("-- registered watcher {}", watcher);
ocmh.zkStateReader.registerCollectionStateWatcher(coll.getName(), watcher);
}
if (addResult.get("failure") != null) {
String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
" on node=%s, failure=%s", coll.getName(), slice.getName(), targetNode, addResult.get("failure"));
log.warn(errorString);
results.add("failure", errorString);
if (watcher != null) { // unregister
ocmh.zkStateReader.removeCollectionStateWatcher(coll.getName(), watcher);
}
return;
}
// wait for the other replica to be active if the source replica was a leader
if (watcher != null) {
try {
log.debug("Waiting for leader's replica to recover.");
if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
String errorString = String.format(Locale.ROOT, "Timed out waiting for leader's replica to recover, collection=%s shard=%s" +
" on node=%s", coll.getName(), slice.getName(), targetNode);
log.warn(errorString);
results.add("failure", errorString);
return;
} else {
if (log.isDebugEnabled()) {
log.debug("Replica {} is active - deleting the source...", watcher.getActiveReplicas());
}
}
} finally {
ocmh.zkStateReader.removeCollectionStateWatcher(coll.getName(), watcher);
}
}
ZkNodeProps removeReplicasProps = new ZkNodeProps(
COLLECTION_PROP, coll.getName(),
SHARD_ID_PROP, slice.getName(),
REPLICA_PROP, replica.getName());
if (async != null) removeReplicasProps.getProperties().put(ASYNC, async);
@SuppressWarnings({"rawtypes"})
NamedList deleteResult = new NamedList();
try {
ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null);
} catch (SolrException e) {
deleteResult.add("failure", e.toString());
}
if (deleteResult.get("failure") != null) {
String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s, failure=%s",
coll.getName(), slice.getName(), replica.getName(), deleteResult.get("failure"));
log.warn(errorString);
results.add("failure", errorString);
} else {
String successString = String.format(Locale.ROOT, "MOVEREPLICA action completed successfully, moved replica=%s at node=%s " +
"to replica=%s at node=%s", replica.getCoreName(), replica.getNodeName(), newCoreName, targetNode);
results.add("success", successString);
}
}
}