blob: ea1b2779e436bd81a4ece9bd2f76744c2e85cfee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.api.collections;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.cloud.ActiveReplicaWatcher;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.common.SolrCloseableLatch;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
/** Class logger; the logger name is derived from the lookup class instead of a hard-coded class literal. */
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
* Name of the message property that switches off node selection for the add replica API.
* When it is set to true, {@code buildReplicaPositions} does not ask {@code Assign} for
* positions and places every requested replica on the node given by the "node" property,
* so a valid "node" should be specified.
*/
public static final String SKIP_NODE_ASSIGNMENT = "skipNodeAssignment";
/**
* Owning message handler; this command reads its cloud manager, ZK state reader, shard handler
* factory, overseer and thread pool from it.
*/
private final OverseerCollectionMessageHandler ocmh;
/**
 * Creates the command on top of the given message handler.
 *
 * @param ocmh handler whose collaborators (cloud manager, state reader, overseer, thread pool)
 *     are used while adding replicas; stored as-is, no validation is performed here
 */
public AddReplicaCmd(final OverseerCollectionMessageHandler ocmh) {
  this.ocmh = ocmh;
}
/**
 * Command entry point: adds the replica(s) described by {@code message} without a completion
 * callback. The list of created replicas returned by {@code addReplica} is discarded.
 *
 * @param state cluster state the request is evaluated against
 * @param message request properties of the ADDREPLICA operation
 * @param results accumulator that receives the per-replica responses
 * @throws Exception whatever {@code addReplica} throws
 */
@Override
public void call(ClusterState state, ZkNodeProps message, @SuppressWarnings({"rawtypes"}) NamedList results) throws Exception {
  final Runnable noCompletionCallback = null;
  addReplica(state, message, results, noCompletionCallback);
}
/**
 * Adds one or more replicas to a shard of an existing collection.
 *
 * <p>The request is validated, a node position is computed for every requested replica, and a
 * core-admin CREATE request is sent for each of them. Response processing then runs either
 * inline or, when the {@code parallel} property is set and {@code waitForFinalState} is not,
 * on the handler's thread pool.
 *
 * @param clusterState cluster state used to look up the collection and to place the replicas
 * @param message request properties (collection, shard, node/createNodeSet, replica type and
 *     counts, timeout, async id, ...)
 * @param results accumulator handed to the shard request tracker when responses are processed
 * @param onComplete optional callback run after the responses were processed and the core node
 *     names were seen; may be {@code null}
 * @return one {@link ZkNodeProps} per replica carrying collection, shard, core name and node name
 * @throws SolrException with {@code BAD_REQUEST} when the collection or shard does not exist, when
 *     both 'node' and 'createNodeSet' are given, when the replica type is unknown, or when 'name'
 *     / 'coreNodeName' is combined with more than one replica; with {@code SERVER_ERROR} when
 *     waiting for the final state times out
 */
@SuppressWarnings({"unchecked"})
List<ZkNodeProps> addReplica(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results, Runnable onComplete)
    throws IOException, InterruptedException, KeeperException {
  if (log.isDebugEnabled()) {
    log.debug("addReplica() : {}", Utils.toJSONString(message));
  }

  String extCollectionName = message.getStr(COLLECTION_PROP);
  boolean followAliases = message.getBool(FOLLOW_ALIASES, false);
  String shard = message.getStr(SHARD_ID_PROP);

  // NOTE(review): assignReplicaDetails (and therefore the props returned below) reads the
  // collection name from the message again instead of using the resolved name - confirm this is
  // intended when followAliases is set.
  final String collectionName;
  if (followAliases) {
    collectionName = ocmh.cloudManager.getClusterStateProvider().resolveSimpleAlias(extCollectionName);
  } else {
    collectionName = extCollectionName;
  }

  DocCollection coll = clusterState.getCollection(collectionName);
  if (coll == null) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collectionName + " does not exist");
  }
  if (coll.getSlice(shard) == null) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
        "Collection: " + collectionName + " shard: " + shard + " does not exist");
  }

  boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
  boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
  final String asyncId = message.getStr(ASYNC);

  String node = message.getStr(CoreAdminParams.NODE);
  String createNodeSetStr = message.getStr(CREATE_NODE_SET);
  if (node != null && createNodeSetStr != null) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Both 'node' and 'createNodeSet' parameters cannot be specified together.");
  }

  int timeout = message.getInt(TIMEOUT, 10 * 60); // 10 minutes
  boolean parallel = message.getBool("parallel", false);

  // The requested type only provides the default of 1 for its own count; explicit per-type
  // counts in the message take precedence.
  Replica.Type replicaType = parseReplicaType(message);
  EnumMap<Replica.Type, Integer> replicaTypesVsCount = new EnumMap<>(Replica.Type.class);
  replicaTypesVsCount.put(Replica.Type.NRT, message.getInt(NRT_REPLICAS, replicaType == Replica.Type.NRT ? 1 : 0));
  replicaTypesVsCount.put(Replica.Type.TLOG, message.getInt(TLOG_REPLICAS, replicaType == Replica.Type.TLOG ? 1 : 0));
  replicaTypesVsCount.put(Replica.Type.PULL, message.getInt(PULL_REPLICAS, replicaType == Replica.Type.PULL ? 1 : 0));

  int totalReplicas = 0;
  for (Map.Entry<Replica.Type, Integer> entry : replicaTypesVsCount.entrySet()) {
    totalReplicas += entry.getValue();
  }
  if (totalReplicas > 1) {
    // A single explicit name cannot be shared by several replicas.
    if (message.getStr(CoreAdminParams.NAME) != null) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create " + totalReplicas + " replicas if 'name' parameter is specified");
    }
    if (message.getStr(CoreAdminParams.CORE_NODE_NAME) != null) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create " + totalReplicas + " replicas if 'coreNodeName' parameter is specified");
    }
  }

  AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
  List<CreateReplica> createReplicas;
  try {
    createReplicas = buildReplicaPositions(ocmh.cloudManager, clusterState, collectionName, message, replicaTypesVsCount, sessionWrapper)
        .stream()
        .map(replicaPosition -> assignReplicaDetails(ocmh.cloudManager, clusterState, message, replicaPosition))
        .collect(Collectors.toList());
  } finally {
    // Release the session wrapper (if one was recorded) even when placement or validation failed.
    PolicyHelper.SessionWrapper wrapper = sessionWrapper.get();
    if (wrapper != null) {
      wrapper.release();
    }
  }

  ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
  ZkStateReader zkStateReader = ocmh.zkStateReader;
  final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
  for (CreateReplica createReplica : createReplicas) {
    assert createReplica.coreName != null;
    ModifiableSolrParams params = getReplicaParams(clusterState, message, results, collectionName, coll, skipCreateReplicaInClusterState, asyncId, shardHandler, createReplica);
    shardRequestTracker.sendShardRequest(createReplica.node, params, shardHandler);
  }

  // Completion work: collect the responses, wait for each core node name, notify the caller.
  Runnable runnable = () -> {
    shardRequestTracker.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica");
    for (CreateReplica replica : createReplicas) {
      ocmh.waitForCoreNodeName(collectionName, replica.node, replica.coreName);
    }
    if (onComplete != null) {
      onComplete.run();
    }
  };

  if (!parallel || waitForFinalState) {
    if (waitForFinalState) {
      SolrCloseableLatch latch = new SolrCloseableLatch(totalReplicas, ocmh);
      ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collectionName, null,
          createReplicas.stream().map(createReplica -> createReplica.coreName).collect(Collectors.toList()), latch);
      try {
        // Register the watcher before processing responses so no state change is missed.
        zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
        runnable.run();
        if (!latch.await(timeout, TimeUnit.SECONDS)) {
          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting " + timeout + " seconds for replica to become active.");
        }
      } finally {
        zkStateReader.removeCollectionStateWatcher(collectionName, watcher);
      }
    } else {
      runnable.run();
    }
  } else {
    ocmh.tpe.submit(runnable);
  }

  return createReplicas.stream()
      .map(createReplica -> new ZkNodeProps(
          ZkStateReader.COLLECTION_PROP, createReplica.collectionName,
          ZkStateReader.SHARD_ID_PROP, createReplica.sliceName,
          ZkStateReader.CORE_NAME_PROP, createReplica.coreName,
          ZkStateReader.NODE_NAME_PROP, createReplica.node
      ))
      .collect(Collectors.toList());
}

/**
 * Reads the requested replica type from the message, defaulting to NRT when none is given.
 *
 * @throws SolrException with {@code BAD_REQUEST} when the value does not name a replica type, so
 *     that a malformed request is reported like the other validation failures instead of
 *     surfacing as a bare {@link IllegalArgumentException}
 */
private static Replica.Type parseReplicaType(ZkNodeProps message) {
  String typeName = message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name());
  try {
    return Replica.Type.valueOf(typeName.toUpperCase(Locale.ROOT));
  } catch (IllegalArgumentException e) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown replica type: " + typeName, e);
  }
}
/**
 * Builds the core-admin CREATE parameters for one replica and performs the preparatory steps the
 * request depends on.
 *
 * <p>In order: makes sure a replica of the {@code withCollection} (if the collection declares one)
 * exists on the target node; unless the cluster is in legacy mode, optionally enqueues a DOWN
 * state entry for the new replica and waits until the replica is visible to obtain its core node
 * name; reads the collection's config name; and finally assembles the parameters.
 *
 * @param clusterState cluster state used to look up the {@code withCollection}
 * @param message original request; source of {@code _route_}, directory and property parameters
 * @param results accumulator passed on when a {@code withCollection} replica has to be added
 * @param collectionName name of the collection receiving the replica
 * @param coll the collection receiving the replica
 * @param skipCreateReplicaInClusterState when true, no state update is enqueued for the replica
 * @param asyncId not used by this method
 * @param shardHandler not used by this method
 * @param createReplica details of the replica to create
 * @return parameters of the core-admin CREATE request
 * @throws SolrException with {@code BAD_REQUEST} when the {@code withCollection} has more than one
 *     active shard or no target shard can be determined; with {@code SERVER_ERROR} when the state
 *     update cannot be enqueued
 */
private ModifiableSolrParams getReplicaParams(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results, String collectionName, DocCollection coll, boolean skipCreateReplicaInClusterState, String asyncId, ShardHandler shardHandler, CreateReplica createReplica) throws IOException, InterruptedException, KeeperException {
  final String withCollectionName = coll.getStr(WITH_COLLECTION);
  if (withCollectionName != null) {
    ensureWithCollectionReplica(clusterState, results, withCollectionName, createReplica);
  }

  final ModifiableSolrParams coreCreateParams = new ModifiableSolrParams();
  final ZkStateReader stateReader = ocmh.zkStateReader;

  if (!Overseer.isLegacy(stateReader)) {
    if (!skipCreateReplicaInClusterState) {
      enqueueDownReplicaState(collectionName, createReplica);
    }
    // The replica must be visible in the cluster state before its core node name can be read.
    final String visibleCoreNodeName = ocmh
        .waitToSeeReplicasInState(collectionName, Collections.singletonList(createReplica.coreName))
        .get(createReplica.coreName)
        .getName();
    coreCreateParams.set(CoreAdminParams.CORE_NODE_NAME, visibleCoreNodeName);
  }

  final String configName = stateReader.readConfigName(collectionName);

  coreCreateParams.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
  coreCreateParams.set(CoreAdminParams.NAME, createReplica.coreName);
  coreCreateParams.set(COLL_CONF, configName);
  coreCreateParams.set(CoreAdminParams.COLLECTION, collectionName);
  coreCreateParams.set(CoreAdminParams.REPLICA_TYPE, createReplica.replicaType.name());
  coreCreateParams.set(CoreAdminParams.SHARD, resolveTargetShard(message, coll, createReplica));

  setIfNotNull(coreCreateParams, CoreAdminParams.DATA_DIR, message.getStr(CoreAdminParams.DATA_DIR));
  setIfNotNull(coreCreateParams, CoreAdminParams.ULOG_DIR, message.getStr(CoreAdminParams.ULOG_DIR));
  setIfNotNull(coreCreateParams, CoreAdminParams.INSTANCE_DIR, message.getStr(CoreAdminParams.INSTANCE_DIR));
  // An explicitly requested core node name overrides the one read from the cluster state above.
  setIfNotNull(coreCreateParams, CoreAdminParams.CORE_NODE_NAME, createReplica.coreNodeName);

  ocmh.addPropertyParams(message, coreCreateParams);
  return coreCreateParams;
}

/** Adds a replica of the {@code withCollection} on the target node unless one is already there. */
private void ensureWithCollectionReplica(ClusterState clusterState, @SuppressWarnings({"rawtypes"})NamedList results, String withCollectionName, CreateReplica createReplica) throws IOException, InterruptedException, KeeperException {
  final DocCollection withCollection = clusterState.getCollection(withCollectionName);
  if (withCollection.getActiveSlices().size() > 1) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The `withCollection` must have only one shard, found: " + withCollection.getActiveSlices().size());
  }
  final String withCollectionShard = withCollection.getActiveSlices().iterator().next().getName();

  final List<Replica> onTargetNode = withCollection.getReplicas(createReplica.node);
  final boolean alreadyPresent = onTargetNode != null && !onTargetNode.isEmpty();
  if (alreadyPresent) {
    return;
  }

  // create a replica of withCollection on the identified node before proceeding further
  final ZkNodeProps withCollectionRequest = new ZkNodeProps(
      Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
      ZkStateReader.COLLECTION_PROP, withCollectionName,
      ZkStateReader.SHARD_ID_PROP, withCollectionShard,
      "node", createReplica.node,
      // since we already computed node assignments (which include assigning a node for this withCollection replica) we want to skip the assignment step
      SKIP_NODE_ASSIGNMENT, "true",
      // set to true because we want `withCollection` to be ready after this collection is created
      CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.TRUE.toString());
  addReplica(clusterState, withCollectionRequest, results, null);
}

/** Offers a state update to the overseer that registers the new replica in state DOWN. */
private void enqueueDownReplicaState(String collectionName, CreateReplica createReplica) {
  ZkNodeProps stateUpdate = new ZkNodeProps(
      Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
      ZkStateReader.COLLECTION_PROP, collectionName,
      ZkStateReader.SHARD_ID_PROP, createReplica.sliceName,
      ZkStateReader.CORE_NAME_PROP, createReplica.coreName,
      ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
      ZkStateReader.NODE_NAME_PROP, createReplica.node,
      ZkStateReader.REPLICA_TYPE, createReplica.replicaType.name());
  if (createReplica.coreNodeName != null) {
    stateUpdate = stateUpdate.plus(ZkStateReader.CORE_NODE_NAME_PROP, createReplica.coreNodeName);
  }
  try {
    ocmh.overseer.offerStateUpdate(Utils.toJSON(stateUpdate));
  } catch (Exception e) {
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception updating Overseer state queue", e);
  }
}

/** Returns the explicit shard of the replica, or else the first shard the router finds for {@code _route_}. */
private static String resolveTargetShard(ZkNodeProps message, DocCollection coll, CreateReplica createReplica) {
  if (createReplica.sliceName != null) {
    return createReplica.sliceName;
  }
  final String routeKey = message.getStr(ShardParams._ROUTE_);
  if (routeKey == null) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Specify either 'shard' or _route_ param");
  }
  final Collection<Slice> candidates = coll.getRouter().getSearchSlicesSingle(routeKey, null, coll);
  if (candidates.isEmpty()) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No active shard serving _route_=" + routeKey + " found");
  }
  return candidates.iterator().next().getName();
}

/** Sets {@code name} on {@code params} only when {@code value} is not null. */
private static void setIfNotNull(ModifiableSolrParams params, String name, String value) {
  if (value != null) {
    params.set(name, value);
  }
}
/**
 * Combines the request message with a computed replica position into a {@link CreateReplica}.
 *
 * <p>The core name is taken from the 'name' property, falling back to the prefixed property
 * variant; when neither is present a name is generated through {@code Assign}. An explicitly
 * given core name is checked for uniqueness within the collection unless the cluster state entry
 * is skipped.
 *
 * @param cloudManager provides the distributed state manager used to generate core names
 * @param clusterState cluster state used for the live-node check and the collection lookup
 * @param message request properties (collection, shard, name, coreNodeName, ...)
 * @param replicaPosition node and replica type chosen for the new replica
 * @return the details needed to create the replica
 * @throws SolrException with {@code BAD_REQUEST} when the node is not live or the requested core
 *     name is already used by a replica of the collection
 */
public static CreateReplica assignReplicaDetails(SolrCloudManager cloudManager, ClusterState clusterState,
    ZkNodeProps message, ReplicaPosition replicaPosition) {
  final boolean skipClusterStateEntry = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
  final String collectionName = message.getStr(COLLECTION_PROP);
  final String targetNode = replicaPosition.node;
  final String shardName = message.getStr(SHARD_ID_PROP);
  final String requestedCoreNodeName = message.getStr(CoreAdminParams.CORE_NODE_NAME);
  final Replica.Type type = replicaPosition.type;

  String coreName = message.getStr(CoreAdminParams.NAME);
  if (StringUtils.isBlank(coreName)) {
    coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME);
  }

  log.info("Node Identified {} for creating new replica of shard {} for collection {}", targetNode, shardName, collectionName);
  if (!clusterState.liveNodesContain(targetNode)) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + targetNode + " is not live");
  }

  final DocCollection docCollection = clusterState.getCollection(collectionName);
  if (coreName == null) {
    coreName = Assign.buildSolrCoreName(cloudManager.getDistribStateManager(), docCollection, shardName, type);
  } else if (!skipClusterStateEntry) {
    // Validate that the core name is unique in that collection
    final String candidate = coreName;
    final boolean nameTaken = docCollection.getSlices().stream()
        .flatMap(slice -> slice.getReplicas().stream())
        .anyMatch(existing -> candidate.equals(existing.getStr(CORE_NAME_PROP)));
    if (nameTaken) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Another replica with the same core name already exists" +
          " for this collection");
    }
  }

  log.info("Returning CreateReplica command.");
  return new CreateReplica(collectionName, shardName, targetNode, type, coreName, requestedCoreNodeName);
}
/**
 * Determines where the requested replicas are to be created.
 *
 * <p>If the message names a 'node' but no 'createNodeSet', the node is written into the message
 * as the create node set. Unless the cluster state entry or the node assignment is skipped, the
 * positions are computed by {@code Assign} and the last policy session wrapper is stored in
 * {@code sessionWrapper}. When no positions were computed, every requested replica is placed on
 * the explicitly given node.
 *
 * @param cloudManager passed through to {@code Assign}
 * @param clusterState cluster state used to look up the collection
 * @param collectionName name of the collection receiving the replicas
 * @param message request properties; may be modified as described above
 * @param replicaTypeVsCount number of replicas to create per type; must contain all three types
 * @param sessionWrapper receives the policy session wrapper when {@code Assign} was consulted
 * @return positions of the replicas to create
 * @throws SolrException with {@code SERVER_ERROR} when no positions were computed and no node
 *     was given
 */
public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState,
    String collectionName, ZkNodeProps message,
    EnumMap<Replica.Type, Integer> replicaTypeVsCount,
    AtomicReference< PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException {
  final boolean skipClusterStateEntry = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
  final boolean skipAssignment = message.getBool(SKIP_NODE_ASSIGNMENT, false);
  final String shardName = message.getStr(SHARD_ID_PROP);
  final DocCollection docCollection = clusterState.getCollection(collectionName);

  final int nrtCount = replicaTypeVsCount.get(Replica.Type.NRT);
  final int pullCount = replicaTypeVsCount.get(Replica.Type.PULL);
  final int tlogCount = replicaTypeVsCount.get(Replica.Type.TLOG);
  final int requestedTotal = nrtCount + pullCount + tlogCount;

  final String requestedNode = message.getStr(CoreAdminParams.NODE);
  Object nodeSet = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET);
  if (nodeSet == null && requestedNode != null) {
    // Restrict the assignment to the explicitly requested node.
    message.getProperties().put(OverseerCollectionMessageHandler.CREATE_NODE_SET, requestedNode);
    nodeSet = requestedNode;
  }

  if (!(skipClusterStateEntry || skipAssignment)) {
    final List<ReplicaPosition> assigned = Assign.getNodesForNewReplicas(clusterState, docCollection.getName(), shardName, nrtCount,
        tlogCount, pullCount, nodeSet, cloudManager);
    sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
    if (assigned != null) {
      return assigned;
    }
  }

  assert requestedNode != null;
  if (requestedNode == null) {
    // in case asserts are disabled
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
        "A node should have been identified to add replica but wasn't. Please inform solr developers at SOLR-9317");
  }

  // it is unlikely that multiple replicas have been requested to be created on
  // the same node, but we've got to accommodate.
  final List<ReplicaPosition> pinned = new ArrayList<>(requestedTotal);
  replicaTypeVsCount.forEach((type, count) -> {
    for (int created = 0; created < count; created++) {
      // The current list size is the running index of the position being added.
      pinned.add(new ReplicaPosition(shardName, pinned.size(), type, requestedNode));
    }
  });
  return pinned;
}
/**
* A data structure to keep all information required to create a new replica in one place.
* Think of it as a typed ZkNodeProps for replica creation.
* <p>
* The constructor is package-private; within this file instances are created by
* {@code assignReplicaDetails}. The two name fields are not final and may be reassigned.
*
* This is <b>not</b> a public API and can be changed at any time without notice.
*/
public static class CreateReplica {
/** Name of the collection the replica is created for. */
public final String collectionName;
/** Name of the shard (slice) the replica is added to. */
public final String sliceName;
/** Name of the node on which the replica is created. */
public final String node;
/** Type of the replica to create. */
public final Replica.Type replicaType;
/** Name of the core backing the replica. */
public String coreName;
/** Explicitly requested core node name; {@code null} when none was requested. */
public String coreNodeName;
/** Stores the given values unchanged; no validation is performed. */
CreateReplica(String collectionName, String sliceName, String node, Replica.Type replicaType, String coreName, String coreNodeName) {
this.collectionName = collectionName;
this.sliceName = sliceName;
this.node = node;
this.replicaType = replicaType;
this.coreName = coreName;
this.coreNodeName = coreNodeName;
}
}
}