blob: 989003aee7d2fca6ab9b0fe497dc3b7b1328d278 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.api.collections;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Map;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final OverseerCollectionMessageHandler ocmh;
public CreateShardCmd(OverseerCollectionMessageHandler ocmh) {
this.ocmh = ocmh;
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
String extCollectionName = message.getStr(COLLECTION_PROP);
String sliceName = message.getStr(SHARD_ID_PROP);
boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
log.info("Create shard invoked: {}", message);
if (extCollectionName == null || sliceName == null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters");
boolean followAliases = message.getBool(FOLLOW_ALIASES, false);
String collectionName;
if (followAliases) {
collectionName = ocmh.cloudManager.getClusterStateProvider().resolveSimpleAlias(extCollectionName);
} else {
collectionName = extCollectionName;
}
DocCollection collection = clusterState.getCollection(collectionName);
int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, collection.getInt(NRT_REPLICAS, collection.getInt(REPLICATION_FACTOR, 1))));
int numPullReplicas = message.getInt(PULL_REPLICAS, collection.getInt(PULL_REPLICAS, 0));
int numTlogReplicas = message.getInt(TLOG_REPLICAS, collection.getInt(TLOG_REPLICAS, 0));
if (numNrtReplicas + numTlogReplicas <= 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
}
//ZkStateReader zkStateReader = ocmh.zkStateReader;
ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
// wait for a while until we see the shard
//ocmh.waitForNewShard(collectionName, sliceName);
// wait for a while until we see the shard and update the local view of the cluster state
clusterState = ocmh.waitForNewShard(collectionName, sliceName);
String async = message.getStr(ASYNC);
ZkNodeProps addReplicasProps = new ZkNodeProps(
COLLECTION_PROP, collectionName,
SHARD_ID_PROP, sliceName,
ZkStateReader.NRT_REPLICAS, String.valueOf(numNrtReplicas),
ZkStateReader.TLOG_REPLICAS, String.valueOf(numTlogReplicas),
ZkStateReader.PULL_REPLICAS, String.valueOf(numPullReplicas),
OverseerCollectionMessageHandler.CREATE_NODE_SET, message.getStr(OverseerCollectionMessageHandler.CREATE_NODE_SET),
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
Map<String, Object> propertyParams = new HashMap<>();
ocmh.addPropertyParams(message, propertyParams);
addReplicasProps = addReplicasProps.plus(propertyParams);
if (async != null) addReplicasProps.getProperties().put(ASYNC, async);
final NamedList addResult = new NamedList();
try {
//ocmh.addReplica(zkStateReader.getClusterState(), addReplicasProps, addResult, () -> {
ocmh.addReplica(clusterState, addReplicasProps, addResult, () -> {
Object addResultFailure = addResult.get("failure");
if (addResultFailure != null) {
SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
if (failure == null) {
failure = new SimpleOrderedMap();
results.add("failure", failure);
}
failure.addAll((NamedList) addResultFailure);
} else {
SimpleOrderedMap success = (SimpleOrderedMap) results.get("success");
if (success == null) {
success = new SimpleOrderedMap();
results.add("success", success);
}
success.addAll((NamedList) addResult.get("success"));
}
});
} catch (Assign.AssignmentException e) {
// clean up the slice that we created
ZkNodeProps deleteShard = new ZkNodeProps(COLLECTION_PROP, collectionName, SHARD_ID_PROP, sliceName, ASYNC, async);
new DeleteShardCmd(ocmh).call(clusterState, deleteShard, results);
throw e;
}
log.info("Finished create command on all shards for collection: {}", collectionName);
}
}