// blob: c263203dcc89c65d10f822ac0c7bc8a87a4cf56a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.api.collections;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.Cmd;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Collection command that deletes one or more replicas of a collection.
 *
 * <p>Two modes are supported, selected by the presence of {@code COUNT_PROP} in the message:
 * <ul>
 *   <li><b>named replica</b> - {@code COLLECTION_PROP}, {@code SHARD_ID_PROP} and
 *       {@code REPLICA_PROP} identify exactly one replica to delete;</li>
 *   <li><b>by count</b> - {@code COUNT_PROP} replicas are removed from the given shard, or from
 *       every shard of the collection when no shard is given.</li>
 * </ul>
 *
 * <p>If the message carries {@code parallel=true}, the per-replica work is submitted to
 * {@code ocmh.tpe} instead of being run on the calling thread.
 */
public class DeleteReplicaCmd implements Cmd {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /**
   * Timeout passed to every {@code ocmh.waitForCoreNodeGone(...)} call.
   * NOTE(review): presumably milliseconds - confirm against OverseerCollectionMessageHandler.
   */
  private static final int CORE_NODE_GONE_TIMEOUT = 30000;

  private final OverseerCollectionMessageHandler ocmh;

  public DeleteReplicaCmd(OverseerCollectionMessageHandler ocmh) {
    this.ocmh = ocmh;
  }

  /**
   * Entry point of the command; delegates to
   * {@link #deleteReplica(ClusterState, ZkNodeProps, NamedList, Runnable)} without a completion callback.
   */
  @Override
  @SuppressWarnings("unchecked")
  public void call(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results) throws Exception {
    deleteReplica(clusterState, message, results, null);
  }

  /**
   * Deletes the replica(s) described by {@code message}.
   *
   * <p>When {@code COUNT_PROP} is present the request is handed to
   * {@link #deleteReplicaBasedOnCount}; otherwise collection, shard and replica are all required.
   * With {@code FOLLOW_ALIASES=true} the collection name is resolved through the cluster state
   * provider before the lookup.
   *
   * @param clusterState cluster state used to look up the collection and shard
   * @param message      the request properties
   * @param results      receives the shard responses and failure entries
   * @param onComplete   optional callback run after each replica deletion attempt finishes; may be null
   * @throws SolrException with BAD_REQUEST if the shard does not exist in the collection
   */
  @SuppressWarnings("unchecked")
  void deleteReplica(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results, Runnable onComplete)
      throws KeeperException, InterruptedException {
    if (log.isDebugEnabled()) {
      log.debug("deleteReplica() : {}", Utils.toJSONString(message));
    }
    boolean parallel = message.getBool("parallel", false);

    // If a count is specified the strategy needs to be different
    if (message.getStr(COUNT_PROP) != null) {
      deleteReplicaBasedOnCount(clusterState, message, results, onComplete, parallel);
      return;
    }

    ocmh.checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP);
    String extCollectionName = message.getStr(COLLECTION_PROP);
    String shard = message.getStr(SHARD_ID_PROP);
    String replicaName = message.getStr(REPLICA_PROP);

    boolean followAliases = message.getBool(FOLLOW_ALIASES, false);
    String collectionName;
    if (followAliases) {
      collectionName = ocmh.cloudManager.getClusterStateProvider().resolveSimpleAlias(extCollectionName);
    } else {
      collectionName = extCollectionName;
    }

    DocCollection coll = clusterState.getCollection(collectionName);
    Slice slice = coll.getSlice(shard);
    if (slice == null) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
          "Invalid shard name : " + shard + " in collection : " + collectionName);
    }

    deleteCore(slice, collectionName, replicaName, message, shard, results, onComplete, parallel);
  }

  /**
   * Delete replicas based on count for a given collection. If a shard is passed, uses that
   * else deletes given num replicas across all shards for the given collection.
   *
   * <p>For every processed shard a {@code shard_id} and a {@code replicas_deleted} entry are
   * added to {@code results}.
   *
   * <p>NOTE(review): unlike {@link #deleteReplica}, this path reads {@code COLLECTION_PROP}
   * as-is and does not honour {@code FOLLOW_ALIASES} - confirm whether that is intended.
   *
   * @throws SolrException with BAD_REQUEST if the count is not a non-negative integer, the shard
   *         does not exist, or a shard does not have enough replicas
   */
  @SuppressWarnings({"unchecked"})
  void deleteReplicaBasedOnCount(ClusterState clusterState,
                                 ZkNodeProps message,
                                 @SuppressWarnings({"rawtypes"})NamedList results,
                                 Runnable onComplete,
                                 boolean parallel)
      throws KeeperException, InterruptedException {
    ocmh.checkRequired(message, COLLECTION_PROP, COUNT_PROP);
    int count = parseCount(message.getStr(COUNT_PROP));
    String collectionName = message.getStr(COLLECTION_PROP);
    String shard = message.getStr(SHARD_ID_PROP);
    DocCollection coll = clusterState.getCollection(collectionName);
    Slice slice = null;

    // Validate if shard is passed.
    if (shard != null) {
      slice = coll.getSlice(shard);
      if (slice == null) {
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
            "Invalid shard name : " + shard + " in collection : " + collectionName);
      }
    }

    // All shards are validated and their victims picked before the first deletion starts.
    Map<Slice, Set<String>> shardToReplicasMapping = new HashMap<>();
    if (slice != null) {
      Set<String> replicasToBeDeleted = pickReplicasTobeDeleted(slice, shard, collectionName, count);
      shardToReplicasMapping.put(slice, replicasToBeDeleted);
    } else {
      // No shard given: remove 'count' replicas from every shard of the collection.
      Collection<Slice> allSlices = coll.getSlices();
      for (Slice individualSlice : allSlices) {
        Set<String> replicasToBeDeleted = pickReplicasTobeDeleted(individualSlice, individualSlice.getName(), collectionName, count);
        shardToReplicasMapping.put(individualSlice, replicasToBeDeleted);
      }
    }

    for (Map.Entry<Slice, Set<String>> entry : shardToReplicasMapping.entrySet()) {
      Slice shardSlice = entry.getKey();
      String shardId = shardSlice.getName();
      Set<String> replicas = entry.getValue();
      // callDeleteReplica on all replicas
      for (String replica : replicas) {
        log.debug("Deleting replica {} for shard {} based on count {}", replica, shardId, count);
        // Pass the name of the slice being processed, not the (possibly null) shard from the
        // request; deleteCore uses it for waitForCoreNodeGone and in its error messages.
        deleteCore(shardSlice, collectionName, replica, message, shardId, results, onComplete, parallel);
      }
      results.add("shard_id", shardId);
      results.add("replicas_deleted", replicas);
    }
  }

  /**
   * Parses the requested replica count.
   *
   * @param rawCount the raw {@code COUNT_PROP} value from the message
   * @return the parsed, non-negative count
   * @throws SolrException with BAD_REQUEST if the value is not an integer or is negative; a
   *         negative value would otherwise never reach the {@code count == 0} stop condition
   *         and select every non-leader replica
   */
  private static int parseCount(String rawCount) {
    final int count;
    try {
      count = Integer.parseInt(rawCount);
    } catch (NumberFormatException e) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
          "Invalid value for " + COUNT_PROP + " : " + rawCount + ". An integer is required.", e);
    }
    if (count < 0) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
          "Invalid value for " + COUNT_PROP + " : " + count + ". The value must not be negative.");
    }
    return count;
  }

  /**
   * Pick replicas to be deleted. Avoid picking the leader.
   *
   * @return names of up to {@code count} replicas of {@code slice}, never including the leader
   *         when the slice reports one
   */
  private Set<String> pickReplicasTobeDeleted(Slice slice, String shard, String collectionName, int count) {
    validateReplicaAvailability(slice, shard, collectionName, count);
    Collection<Replica> allReplicas = slice.getReplicas();
    Set<String> replicasToBeRemoved = new HashSet<>();
    // May be null when the slice reports no leader; in that case any replica may be picked.
    Replica leader = slice.getLeader();
    for (Replica replica : allReplicas) {
      if (count == 0) {
        break;
      }
      // Try avoiding to pick up the leader to minimize activity on the cluster.
      if (leader != null && leader.getCoreName().equals(replica.getCoreName())) {
        continue;
      }
      replicasToBeRemoved.add(replica.getName());
      count--;
    }
    return replicasToBeRemoved;
  }

  /**
   * Validates that the shard has more replicas than were requested for removal. Also errors out
   * if there is only one replica available. A null {@code slice} is accepted and skips all checks.
   *
   * @throws SolrException with BAD_REQUEST if any of the checks fails
   */
  private void validateReplicaAvailability(Slice slice, String shard, String collectionName, int count) {
    // If there is a specific shard passed, validate if there are any or just 1 replica left
    if (slice != null) {
      Collection<Replica> allReplicasForShard = slice.getReplicas();
      if (allReplicasForShard == null) {
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No replicas found in shard/collection: " +
            shard + "/" + collectionName);
      }

      if (allReplicasForShard.size() == 1) {
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "There is only one replica available in shard/collection: " +
            shard + "/" + collectionName + ". Cannot delete that.");
      }

      // NOTE(review): the wording below is inverted - this branch fires when the requested count
      // is greater than or equal to the number of available replicas. The text is left untouched
      // in case callers or tests match on it.
      if (allReplicasForShard.size() <= count) {
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "There are lesser num replicas requested to be deleted than are available in shard/collection : " +
            shard + "/" + collectionName + " Requested: " + count + " Available: " + allReplicasForShard.size() + ".");
      }
    }
  }

  /**
   * Unloads the core of a single replica and makes sure its entry disappears from the cluster state.
   *
   * <p>If the replica's node is live, a core UNLOAD request is sent to it and the responses are
   * collected into {@code results}. If the core node entry is still present afterwards (or the
   * node was not live), {@code ocmh.deleteCoreNode(...)} is called and the wait is repeated.
   * {@code onComplete}, when non-null, runs once the attempt has finished, whether it succeeded or not.
   *
   * <p>With {@code parallel == true} the work is submitted to {@code ocmh.tpe} and this method
   * returns without waiting.
   * NOTE(review): the returned future is discarded, so in that mode a failure is only visible
   * through the {@code "failure"} entry added to {@code results}.
   *
   * @param slice          the shard that owns the replica
   * @param collectionName name of the collection, used for waiting and in messages
   * @param replicaName    name of the replica to delete
   * @param message        request properties (onlyIfDown, delete* flags, async id)
   * @param shard          name of the shard, used for waiting and in messages
   * @param results        receives the shard responses and failure entries
   * @param onComplete     optional completion callback; may be null
   * @param parallel       whether to run the deletion on {@code ocmh.tpe}
   * @throws SolrException with BAD_REQUEST if the replica is unknown or onlyIfDown is violated,
   *         with SERVER_ERROR if the replica could not be removed (non-parallel mode only)
   */
  @SuppressWarnings({"unchecked"})
  void deleteCore(Slice slice, String collectionName, String replicaName,ZkNodeProps message, String shard, @SuppressWarnings({"rawtypes"})NamedList results, Runnable onComplete, boolean parallel) throws KeeperException, InterruptedException {

    Replica replica = slice.getReplica(replicaName);
    if (replica == null) {
      // List the replicas that do exist so the caller can correct the request.
      ArrayList<String> l = new ArrayList<>();
      for (Replica r : slice.getReplicas()) {
        l.add(r.getName());
      }
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid replica : " + replicaName + " in shard/collection : " +
          shard + "/" + collectionName + " available replicas are " + StrUtils.join(l, ','));
    }

    // If users are being safe and only want to remove a replica if it is down, they can specify onlyIfDown=true
    // on the command.
    if (Boolean.parseBoolean(message.getStr(OverseerCollectionMessageHandler.ONLY_IF_DOWN)) && replica.getState() != Replica.State.DOWN) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
          "Attempted to remove replica : " + collectionName + "/" + shard + "/" + replicaName +
              " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
    }

    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
    String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
    String asyncId = message.getStr(ASYNC);

    // Core UNLOAD request; every delete* flag defaults to true unless the message overrides it.
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.add(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString());
    params.add(CoreAdminParams.CORE, core);

    params.set(CoreAdminParams.DELETE_INDEX, message.getBool(CoreAdminParams.DELETE_INDEX, true));
    params.set(CoreAdminParams.DELETE_INSTANCE_DIR, message.getBool(CoreAdminParams.DELETE_INSTANCE_DIR, true));
    params.set(CoreAdminParams.DELETE_DATA_DIR, message.getBool(CoreAdminParams.DELETE_DATA_DIR, true));
    params.set(CoreAdminParams.DELETE_METRICS_HISTORY, message.getBool(CoreAdminParams.DELETE_METRICS_HISTORY, true));

    // The unload request is only sent when the hosting node is currently live.
    boolean isLive = ocmh.zkStateReader.getClusterState().getLiveNodes().contains(replica.getNodeName());
    final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
    if (isLive) {
      shardRequestTracker.sendShardRequest(replica.getNodeName(), params, shardHandler);
    }

    Callable<Boolean> callable = () -> {
      try {
        if (isLive) {
          shardRequestTracker.processResponses(results, shardHandler, false, null);

          // check if the core unload removed the corenode zk entry
          if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, CORE_NODE_GONE_TIMEOUT)) return Boolean.TRUE;
        }

        // try and ensure core info is removed from cluster state
        ocmh.deleteCoreNode(collectionName, replicaName, replica, core);
        if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, CORE_NODE_GONE_TIMEOUT)) return Boolean.TRUE;
        return Boolean.FALSE;
      } catch (Exception e) {
        results.add("failure", "Could not complete delete " + e.getMessage());
        throw e;
      } finally {
        if (onComplete != null) onComplete.run();
      }
    };

    if (!parallel) {
      try {
        if (!callable.call())
          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
              "Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName);
      } catch (InterruptedException | KeeperException e) {
        // Declared by this method; let them propagate unwrapped.
        throw e;
      } catch (Exception ex) {
        throw new SolrException(SolrException.ErrorCode.UNKNOWN, "Error waiting for corenode gone", ex);
      }

    } else {
      ocmh.tpe.submit(callable);
    }
  }
}