// blob: 5b83f63f1fbf7c449af6174a191487d954862702 [file] [log] [blame]
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClosableThread;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.PlainIdRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OverseerCollectionProcessor implements Runnable, ClosableThread {
/** Message key holding the number of slices (shards) a new collection is created with. */
public static final String NUM_SLICES = "numShards";
/** Message key holding the number of replicas per slice; createCollection defaults it to 1. */
public static final String REPLICATION_FACTOR = "replicationFactor";
/** Message key capping how many shards a single node may host; createCollection defaults it to 1. */
public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
/** Message key holding a comma-separated list of nodes a new collection is restricted to. */
public static final String CREATE_NODE_SET = "createNodeSet";
// Values of the "operation" message property that processMessage dispatches on.
public static final String DELETECOLLECTION = "deletecollection";
public static final String CREATECOLLECTION = "createcollection";
public static final String RELOADCOLLECTION = "reloadcollection";
public static final String CREATEALIAS = "createalias";
public static final String DELETEALIAS = "deletealias";
public static final String SPLITSHARD = "splitshard";
// TODO: use from Overseer?
private static final String QUEUE_OPERATION = "operation";
private static final Logger log = LoggerFactory
    .getLogger(OverseerCollectionProcessor.class);
/** Queue the run() loop takes its work items from. */
private DistributedQueue workQueue;
/** Identity compared against the "id" stored under /overseer_elect/leader. */
private String myId;
/** Used to submit core admin requests to nodes and to collect their responses. */
private ShardHandler shardHandler;
/** Value sent as the "qt" parameter of every core admin request. */
private String adminPath;
private ZkStateReader zkStateReader;
// volatile: close() sets this flag while run() polls it in its loop condition; without
// volatile the looping thread is not guaranteed to ever observe a write made by another thread.
private volatile boolean isClosed;
/**
 * Creates a processor whose work queue is the one returned by
 * {@code Overseer.getCollectionQueue} for the given reader's ZooKeeper client.
 *
 * @param zkStateReader source of cluster state, aliases and the ZooKeeper client
 * @param myId          identity used by {@link #amILeader()} for the leadership check
 * @param shardHandler  handler used to send core admin requests
 * @param adminPath     value sent as the "qt" parameter of core admin requests
 */
public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath) {
  this(
      zkStateReader,
      myId,
      shardHandler,
      adminPath,
      Overseer.getCollectionQueue(zkStateReader.getZkClient()));
}
/**
 * Creates a processor that consumes an explicitly supplied work queue.
 *
 * @param zkStateReader source of cluster state, aliases and the ZooKeeper client
 * @param myId          identity used by {@link #amILeader()} for the leadership check
 * @param shardHandler  handler used to send core admin requests
 * @param adminPath     value sent as the "qt" parameter of core admin requests
 * @param workQueue     queue the {@link #run()} loop reads its messages from
 */
protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath, DistributedQueue workQueue) {
  // Plain field initialisation; the assignments are independent of each other.
  this.workQueue = workQueue;
  this.adminPath = adminPath;
  this.shardHandler = shardHandler;
  this.myId = myId;
  this.zkStateReader = zkStateReader;
}
/**
 * Main loop: while this instance is the overseer leader and has not been closed, peeks the
 * head of the work queue, processes it, stores the serialized response on the queue event
 * and removes the event from the queue.
 *
 * <p>The loop ends when leadership is lost, the processor is closed, the thread is
 * interrupted, or ZooKeeper reports an expired session / lost connection. Any other
 * {@link KeeperException} is rethrown as a {@link ZooKeeperException}; every other failure
 * is logged and the loop continues.
 */
@Override
public void run() {
  log.info("Process current queue of collection creations");
  for (;;) {
    // Same evaluation order as a "leader && !closed" loop condition.
    if (!amILeader() || isClosed) {
      return;
    }
    try {
      // NOTE(review): peek(true) presumably blocks until an item is available - confirm.
      final QueueEvent event = workQueue.peek(true);
      final ZkNodeProps message = ZkNodeProps.load(event.getBytes());
      log.info("Overseer Collection Processor: Get the message id:" + event.getId() + " message:" + message.toString());

      final SolrResponse response = processMessage(message, message.getStr(QUEUE_OPERATION));

      // Attach the outcome to the event before the event is taken off the queue.
      event.setBytes(SolrResponse.serializable(response));
      workQueue.remove(event);
      log.info("Overseer Collection Processor: Message id:" + event.getId() + " complete, response:"+ response.getResponse().toString());
    } catch (KeeperException e) {
      final KeeperException.Code code = e.code();
      final boolean zkUnreachable = code == KeeperException.Code.SESSIONEXPIRED
          || code == KeeperException.Code.CONNECTIONLOSS;
      if (zkUnreachable) {
        log.warn("Overseer cannot talk to ZK");
        return;
      }
      SolrException.log(log, "", e);
      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
    } catch (InterruptedException e) {
      // Keep the interrupt visible to whoever runs this Runnable, then stop.
      Thread.currentThread().interrupt();
      return;
    } catch (Throwable e) {
      SolrException.log(log, "", e);
    }
  }
}
/** Marks this processor as closed; {@link #run()} checks the flag in its loop condition. */
public void close() {
  this.isClosed = true;
}
/**
 * Checks whether the "id" stored under {@code /overseer_elect/leader} equals this
 * processor's id.
 *
 * @return {@code true} if the ids match; {@code false} if they differ, if ZooKeeper raised
 *         a {@link KeeperException} (logged), or if the thread was interrupted (the
 *         interrupt flag is restored)
 */
protected boolean amILeader() {
  boolean leader = false;
  try {
    final ZkNodeProps leaderProps = ZkNodeProps.load(zkStateReader.getZkClient().getData(
        "/overseer_elect/leader", null, null, true));
    leader = myId.equals(leaderProps.getStr("id"));
  } catch (KeeperException e) {
    log.warn("", e);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  }
  if (!leader) {
    log.info("According to ZK I (id=" + myId + ") am no longer a leader.");
  }
  return leader;
}
/**
 * Dispatches one queue message to the handler for its operation and wraps the outcome in
 * an {@link OverseerSolrResponse}.
 *
 * <p>Failures never propagate: any {@link Throwable} is logged and recorded in the result
 * under the keys {@code "Operation <op> caused exception:"} and {@code "exception"}
 * (with {@code msg} and {@code rspCode}, the latter -1 for non-Solr exceptions).
 *
 * @param message   the message taken from the work queue
 * @param operation the operation name read from the message
 * @return the response carrying per-shard results and, on failure, the exception details
 */
protected SolrResponse processMessage(ZkNodeProps message, String operation) {
  NamedList results = new NamedList();
  try {
    if (CREATECOLLECTION.equals(operation)) {
      createCollection(zkStateReader.getClusterState(), message, results);
    } else if (DELETECOLLECTION.equals(operation)) {
      final String collectionName = message.getStr("name");

      // Unload every core of the collection, removing instance and data directories.
      ModifiableSolrParams params = new ModifiableSolrParams();
      params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
      params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
      params.set(CoreAdminParams.DELETE_DATA_DIR, true);
      collectionCmd(zkStateReader.getClusterState(), message, params, results, null);

      // Ask the overseer to drop the collection from the cluster state.
      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
          Overseer.REMOVECOLLECTION, "name", collectionName);
      Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m));

      // wait for a while until we don't see the collection
      long now = System.currentTimeMillis();
      long timeout = now + 30000;
      boolean removed = false;
      while (System.currentTimeMillis() < timeout) {
        Thread.sleep(100);
        removed = !zkStateReader.getClusterState().getCollections().contains(collectionName);
        if (removed) {
          Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
          break;
        }
      }
      if (!removed) {
        throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully remove collection: " + collectionName);
      }
    } else if (RELOADCOLLECTION.equals(operation)) {
      ModifiableSolrParams params = new ModifiableSolrParams();
      params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
      // Only replicas whose state is ACTIVE are asked to reload.
      collectionCmd(zkStateReader.getClusterState(), message, params, results, ZkStateReader.ACTIVE);
    } else if (CREATEALIAS.equals(operation)) {
      createAlias(zkStateReader.getAliases(), message);
    } else if (DELETEALIAS.equals(operation)) {
      deleteAlias(zkStateReader.getAliases(), message);
    } else if (SPLITSHARD.equals(operation)) {
      splitShard(zkStateReader.getClusterState(), message, results);
    } else {
      throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
          + operation);
    }
  } catch (Throwable t) {
    if (t instanceof InterruptedException) {
      // The sleeps above can be interrupted; keep the interrupt visible to the caller's loop.
      Thread.currentThread().interrupt();
    }
    // Split shard messages carry the target under "collection", the other operations under "name".
    String target = message.getStr("collection");
    if (target == null) {
      target = message.getStr("name");
    }
    SolrException.log(log, "Collection operation " + operation + " for " + target
        + " failed", t);
    results.add("Operation " + operation + " caused exception:", t);
    SimpleOrderedMap nl = new SimpleOrderedMap();
    nl.add("msg", t.getMessage());
    nl.add("rspCode", t instanceof SolrException ? ((SolrException)t).code() : -1);
    results.add("exception", nl);
  }
  return new OverseerSolrResponse(results);
}
/**
 * Adds (or overwrites) a collection alias and writes the full alias map to ZooKeeper.
 *
 * <p>The new map is the current collection alias map plus {@code name -> collections}
 * from the message. After the write the method waits for the local alias view to show
 * the change, then sleeps briefly to give other nodes time to catch up.
 *
 * @param aliases the currently known aliases
 * @param message message carrying the alias under "name" and its targets under "collections"
 * @throws SolrException (SERVER_ERROR) if the ZooKeeper write fails or the thread is interrupted
 */
private void createAlias(Aliases aliases, ZkNodeProps message) {
  String aliasName = message.getStr("name");
  String collections = message.getStr("collections");

  Map<String,String> newCollectionAliasesMap = new HashMap<String,String>();
  Map<String,String> prevColAliases = aliases.getCollectionAliasMap();
  if (prevColAliases != null) {
    newCollectionAliasesMap.putAll(prevColAliases);
  }
  newCollectionAliasesMap.put(aliasName, collections);

  Map<String,Map<String,String>> newAliasesMap = new HashMap<String,Map<String,String>>();
  newAliasesMap.put("collection", newCollectionAliasesMap);
  Aliases newAliases = new Aliases(newAliasesMap);

  byte[] jsonBytes = null;
  if (newAliases.collectionAliasSize() > 0) { // only sub map right now
    jsonBytes = ZkStateReader.toJSON(newAliases.getAliasMap());
  }
  try {
    zkStateReader.getZkClient().setData(ZkStateReader.ALIASES,
        jsonBytes, true);
    checkForAlias(aliasName, collections);
    // some fudge for other nodes
    Thread.sleep(100);
  } catch (KeeperException e) {
    log.error("", e);
    throw new SolrException(ErrorCode.SERVER_ERROR, e);
  } catch (InterruptedException e) {
    // Restore the flag so the processing loop further up can notice the interruption.
    Thread.currentThread().interrupt();
    log.warn("", e);
    throw new SolrException(ErrorCode.SERVER_ERROR, e);
  }
}
/**
 * Polls the local alias view for up to 30 seconds until alias {@code name} resolves to
 * exactly {@code value}. Logs a warning (and returns normally) if that never happens.
 *
 * <p>Polling pauses briefly between attempts instead of spinning. If the thread is
 * interrupted while waiting, the interrupt flag is restored and the wait is abandoned.
 *
 * @param name  alias to look up
 * @param value expected collection list of the alias
 */
private void checkForAlias(String name, String value) {
  final long deadline = System.currentTimeMillis() + 30000;
  while (System.currentTimeMillis() < deadline) {
    String collections = zkStateReader.getAliases().getCollectionAlias(name);
    if (collections != null && collections.equals(value)) {
      return;
    }
    try {
      // Back off between polls so the wait does not burn a CPU core.
      Thread.sleep(50);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      log.warn("Interrupted while waiting to be notified of Alias change...");
      return;
    }
  }
  log.warn("Timeout waiting to be notified of Alias change...");
}
/**
 * Polls the local alias view for up to 30 seconds until alias {@code name} no longer
 * resolves to anything. Logs a warning (and returns normally) if it is still present.
 *
 * <p>Polling pauses briefly between attempts instead of spinning. If the thread is
 * interrupted while waiting, the interrupt flag is restored and the wait is abandoned.
 *
 * @param name alias expected to disappear
 */
private void checkForAliasAbsence(String name) {
  final long deadline = System.currentTimeMillis() + 30000;
  while (System.currentTimeMillis() < deadline) {
    if (zkStateReader.getAliases().getCollectionAlias(name) == null) {
      return;
    }
    try {
      // Back off between polls so the wait does not burn a CPU core.
      Thread.sleep(50);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      log.warn("Interrupted while waiting to be notified of Alias change...");
      return;
    }
  }
  log.warn("Timeout waiting to be notified of Alias change...");
}
/**
 * Removes a collection alias and writes the remaining alias map to ZooKeeper
 * ({@code null} data when no alias is left).
 *
 * <p>After the write the method waits for the local alias view to drop the alias, then
 * sleeps briefly to give other nodes time to catch up.
 *
 * @param aliases the currently known aliases
 * @param message message carrying the alias to delete under "name"
 * @throws SolrException (SERVER_ERROR) if the ZooKeeper write fails or the thread is interrupted
 */
private void deleteAlias(Aliases aliases, ZkNodeProps message) {
  String aliasName = message.getStr("name");

  Map<String,String> newCollectionAliasesMap = new HashMap<String,String>();
  Map<String,String> prevColAliases = aliases.getCollectionAliasMap();
  // Same guard as createAlias: there may be no collection alias map at all yet.
  if (prevColAliases != null) {
    newCollectionAliasesMap.putAll(prevColAliases);
  }
  newCollectionAliasesMap.remove(aliasName);

  Map<String,Map<String,String>> newAliasesMap = new HashMap<String,Map<String,String>>();
  newAliasesMap.put("collection", newCollectionAliasesMap);
  Aliases newAliases = new Aliases(newAliasesMap);

  byte[] jsonBytes = null;
  if (newAliases.collectionAliasSize() > 0) { // only sub map right now
    jsonBytes = ZkStateReader.toJSON(newAliases.getAliasMap());
  }
  try {
    zkStateReader.getZkClient().setData(ZkStateReader.ALIASES,
        jsonBytes, true);
    checkForAliasAbsence(aliasName);
    // some fudge for other nodes
    Thread.sleep(100);
  } catch (KeeperException e) {
    log.error("", e);
    throw new SolrException(ErrorCode.SERVER_ERROR, e);
  } catch (InterruptedException e) {
    // Restore the flag so the processing loop further up can notice the interruption.
    Thread.currentThread().interrupt();
    log.warn("", e);
    throw new SolrException(ErrorCode.SERVER_ERROR, e);
  }
}
/**
 * Splits one shard of a collection into two sub-shards.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Validate that the collection and shard exist and that the shard has a leader.</li>
 *   <li>Partition the parent's hash range into two sub-ranges.</li>
 *   <li>For each sub-range derive the sub-slice name ({@code <slice>_<i>}) and its first core
 *       ({@code <collection>_<subSlice>_replica1}). An existing sub-slice in ACTIVE state
 *       aborts the split; one in CONSTRUCTION state has its cores unloaded (best effort).</li>
 *   <li>Create the sub-shard cores on the parent leader's node in CONSTRUCTION state and
 *       wait for them to be reported active.</li>
 *   <li>Send the SPLIT command to the parent leader core with the sub-shard cores as targets.</li>
 *   <li>Ask each sub-shard core to apply its buffered updates.</li>
 *   <li>Create further replicas of each sub-shard on other live nodes, as many as the
 *       parent slice has replicas, and wait for them to become active.</li>
 *   <li>Offer an "updateshardstate" message marking the parent INACTIVE and the
 *       sub-slices ACTIVE.</li>
 * </ol>
 *
 * @param clusterState cluster state used for all lookups during the split
 * @param message      message carrying the collection name and the shard id
 * @param results      receives per-node "success" / "failure" entries
 * @return always {@code true} when the method returns normally
 * @throws SolrException BAD_REQUEST for an unknown collection/shard or an already active
 *         sub-shard; SERVER_ERROR for a missing leader, no node available for replicas,
 *         a failed step, or any other exception
 */
private boolean splitShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
  log.info("Split shard invoked");
  String collectionName = message.getStr("collection");
  String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
  Slice parentSlice = clusterState.getSlice(collectionName, slice);

  if (parentSlice == null) {
    if(clusterState.getCollections().contains(collectionName)) {
      throw new SolrException(ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice);
    } else {
      throw new SolrException(ErrorCode.BAD_REQUEST, "No collection with the specified name exists: " + collectionName);
    }
  }

  // find the leader for the shard
  Replica parentShardLeader = clusterState.getLeader(collectionName, slice);
  if (parentShardLeader == null) {
    // Without a leader there is no core to send the SPLIT command to; fail with a clear
    // message instead of a NullPointerException further down.
    throw new SolrException(ErrorCode.SERVER_ERROR, "No leader found for shard: " + slice
        + " of collection: " + collectionName + ". Cannot split shard.");
  }

  DocCollection collection = clusterState.getCollection(collectionName);
  DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
  DocRouter.Range range = parentSlice.getRange();
  if (range == null) {
    range = new PlainIdRouter().fullRange();
  }

  // todo: fixed to two partitions?
  // todo: accept the range as a param to api?
  List<DocRouter.Range> subRanges = router.partitionRange(2, range);
  try {
    List<String> subSlices = new ArrayList<String>(subRanges.size());
    List<String> subShardNames = new ArrayList<String>(subRanges.size());
    String nodeName = parentShardLeader.getNodeName();
    for (int i = 0; i < subRanges.size(); i++) {
      String subSlice = slice + "_" + i;
      subSlices.add(subSlice);
      String subShardName = collectionName + "_" + subSlice + "_replica1";
      subShardNames.add(subShardName);

      Slice oSlice = clusterState.getSlice(collectionName, subSlice);
      if (oSlice != null) {
        if (Slice.ACTIVE.equals(oSlice.getState())) {
          throw new SolrException(ErrorCode.BAD_REQUEST, "Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
        } else if (Slice.CONSTRUCTION.equals(oSlice.getState())) {
          // Leftover of an earlier attempt: unload its cores before re-creating them.
          for (Replica replica : oSlice.getReplicas()) {
            String core = replica.getStr("core");
            log.info("Unloading core: " + core + " from node: " + replica.getNodeName());
            ModifiableSolrParams params = new ModifiableSolrParams();
            params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
            params.set(CoreAdminParams.CORE, core);
            params.set(CoreAdminParams.DELETE_INDEX, "true");
            sendShardRequest(replica.getNodeName(), params);
          }
        }
      }
    }

    // do not abort splitshard if the unloading fails
    // this can happen because the replicas created previously may be down
    // the only side effect of this is that the sub shard may end up having more replicas than we want
    collectShardResponses(results, false, null);

    for (int i=0; i<subRanges.size(); i++) {
      String subSlice = subSlices.get(i);
      String subShardName = subShardNames.get(i);
      DocRouter.Range subRange = subRanges.get(i);

      log.info("Creating shard " + subShardName + " as part of slice "
          + subSlice + " of collection " + collectionName + " on "
          + nodeName);

      ModifiableSolrParams params = new ModifiableSolrParams();
      params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
      params.set(CoreAdminParams.NAME, subShardName);
      params.set(CoreAdminParams.COLLECTION, collectionName);
      params.set(CoreAdminParams.SHARD, subSlice);
      params.set(CoreAdminParams.SHARD_RANGE, subRange.toString());
      params.set(CoreAdminParams.SHARD_STATE, Slice.CONSTRUCTION);
      //params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices); todo: is it necessary, we're not creating collections?
      sendShardRequest(nodeName, params);

      // wait for parent leader to acknowledge the sub-shard core
      log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
      CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
      cmd.setCoreName(subShardName);
      cmd.setNodeName(nodeName);
      cmd.setCoreNodeName(nodeName + "_" + subShardName);
      cmd.setState(ZkStateReader.ACTIVE);
      cmd.setCheckLive(true);
      cmd.setOnlyIfLeader(true);
      sendShardRequest(nodeName, new ModifiableSolrParams(cmd.getParams()));
    }

    collectShardResponses(results, true,
        "SPLITSHARD failed to create subshard leaders or timed out waiting for them to come up");

    log.info("Successfully created all sub-shards for collection "
        + collectionName + " parent shard: " + slice + " on: " + parentShardLeader);

    log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice "
        + slice + " of collection " + collectionName + " on "
        + parentShardLeader);

    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set(CoreAdminParams.ACTION, CoreAdminAction.SPLIT.toString());
    params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
    for (int i = 0; i < subShardNames.size(); i++) {
      String subShardName = subShardNames.get(i);
      params.add(CoreAdminParams.TARGET_CORE, subShardName);
    }
    sendShardRequest(parentShardLeader.getNodeName(), params);

    collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command");

    log.info("Index on shard: " + nodeName + " split into two successfully");

    // apply buffered updates on sub-shards
    for (int i = 0; i < subShardNames.size(); i++) {
      String subShardName = subShardNames.get(i);

      log.info("Applying buffered updates on : " + subShardName);

      params = new ModifiableSolrParams();
      params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
      params.set(CoreAdminParams.NAME, subShardName);
      sendShardRequest(nodeName, params);
    }

    collectShardResponses(results, true,
        "SPLITSHARD failed while asking sub shard leaders to apply buffered updates");

    log.info("Successfully applied buffered updates on : " + subShardNames);

    // Replica creation for the new Slices

    // look at the replication factor and see if it matches reality
    // if it does not, find best nodes to create more cores

    // TODO: Have replication factor decided in some other way instead of numShards for the parent
    int repFactor = clusterState.getSlice(collectionName, slice).getReplicas().size();

    // we need to look at every node and see how many cores it serves
    // add our new cores to existing nodes serving the least number of cores
    // but (for now) require that each core goes on a distinct node.

    // TODO: add smarter options that look at the current number of cores per
    // node?
    // for now we just go random
    Set<String> nodes = clusterState.getLiveNodes();
    List<String> nodeList = new ArrayList<String>(nodes.size());
    nodeList.addAll(nodes);

    Collections.shuffle(nodeList);

    // TODO: Have maxShardsPerNode param for this operation?

    // Remove the node that hosts the parent shard for replica creation.
    nodeList.remove(nodeName);

    if (repFactor > 1 && nodeList.isEmpty()) {
      // The placement below indexes nodeList modulo its size; with no candidate node
      // that would be a division by zero.
      throw new SolrException(ErrorCode.SERVER_ERROR,
          "SPLITSHARD cannot create sub-shard replicas: no live node other than " + nodeName
          + " is available");
    }

    // TODO: change this to handle sharding a slice into > 2 sub-shards.

    for (int i = 1; i <= subSlices.size(); i++) {
      Collections.shuffle(nodeList);
      String sliceName = subSlices.get(i - 1);
      // replica1 already exists on the parent leader's node, so numbering starts at 2
      for (int j = 2; j <= repFactor; j++) {
        String subShardNodeName = nodeList.get((repFactor * (i - 1) + (j - 2)) % nodeList.size());
        String shardName = collectionName + "_" + sliceName + "_replica" + (j);

        log.info("Creating replica shard " + shardName + " as part of slice "
            + sliceName + " of collection " + collectionName + " on "
            + subShardNodeName);

        // Need to create new params for each request
        params = new ModifiableSolrParams();
        params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());

        params.set(CoreAdminParams.NAME, shardName);
        params.set(CoreAdminParams.COLLECTION, collectionName);
        params.set(CoreAdminParams.SHARD, sliceName);
        // TODO: Figure the config used by the parent shard and use it.
        //params.set("collection.configName", configName);

        //Not using this property. Do we really need to use it?
        //params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);

        sendShardRequest(subShardNodeName, params);

        // wait for the replicas to be seen as active on sub shard leader
        log.info("Asking sub shard leader to wait for: " + shardName + " to be alive on: " + subShardNodeName);
        CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
        cmd.setCoreName(subShardNames.get(i-1));
        cmd.setNodeName(subShardNodeName);
        cmd.setCoreNodeName(subShardNodeName + "_" + shardName);
        cmd.setState(ZkStateReader.ACTIVE);
        cmd.setCheckLive(true);
        cmd.setOnlyIfLeader(true);
        sendShardRequest(nodeName, new ModifiableSolrParams(cmd.getParams()));
      }
    }

    collectShardResponses(results, true,
        "SPLITSHARD failed to create subshard replicas or timed out waiting for them to come up");

    log.info("Successfully created all replica shards for all sub-slices "
        + subSlices);

    log.info("Requesting update shard state");
    DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
    Map<String, Object> propMap = new HashMap<String, Object>();
    propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
    propMap.put(slice, Slice.INACTIVE);
    for (String subSlice : subSlices) {
      propMap.put(subSlice, Slice.ACTIVE);
    }
    propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
    ZkNodeProps m = new ZkNodeProps(propMap);
    inQueue.offer(ZkStateReader.toJSON(m));

    return true;
  } catch (SolrException e) {
    throw e;
  } catch (Exception e) {
    log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
    throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
  }
}
/**
 * Drains the shard handler: takes responses until it hands back {@code null} and records
 * each one in {@code results}.
 *
 * @param results      receives the "success" / "failure" entries
 * @param abortOnError when {@code true}, the first response carrying an exception aborts
 *                     the drain (after that response has been recorded)
 * @param msgOnError   message of the exception thrown on abort
 * @throws SolrException (SERVER_ERROR) wrapping the shard's exception when aborting
 */
private void collectShardResponses(NamedList results, boolean abortOnError, String msgOnError) {
  ShardResponse current = shardHandler.takeCompletedOrError();
  while (current != null) {
    processResponse(results, current);
    if (abortOnError) {
      final Throwable cause = current.getException();
      if (cause != null) {
        throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, cause);
      }
    }
    current = shardHandler.takeCompletedOrError();
  }
}
/**
 * Submits a core admin request to a single node through the shard handler.
 *
 * <p>The "qt" parameter is set to the admin path on {@code params} itself, and a leading
 * {@code http://} is stripped from the node's base URL before it is used as the target.
 *
 * @param nodeName node whose base URL is looked up via the ZooKeeper client
 * @param params   request parameters; modified in place
 */
private void sendShardRequest(String nodeName, ModifiableSolrParams params) {
  final ShardRequest request = new ShardRequest();
  params.set("qt", adminPath);
  request.purpose = 1;

  final String baseUrl = zkStateReader.getZkClient().getBaseUrlForNodeName(nodeName);
  final String target = baseUrl.startsWith("http://") ? baseUrl.substring(7) : baseUrl;

  request.shards = new String[]{target};
  request.actualShards = request.shards;
  request.params = params;
  shardHandler.submit(request, target, request.params);
}
/**
 * Creates the cores of a new collection: {@code numShards} slices with
 * {@code replicationFactor} replicas each, spread round-robin over a shuffled list of
 * live nodes (optionally restricted to the nodes named in {@code createNodeSet}).
 *
 * <p>All create requests are submitted first; their responses are then collected into
 * {@code results}. Individual core failures are recorded, not thrown.
 *
 * @param clusterState cluster state used for the existence check and the live nodes
 * @param message      message carrying "name", "collection.configName" and the sizing keys
 * @param results      receives per-node "success" / "failure" entries
 * @throws SolrException BAD_REQUEST if the collection exists, a sizing parameter is missing
 *         or not positive, no eligible live node exists, or the requested number of cores
 *         exceeds {@code maxShardsPerNode * nodes}; SERVER_ERROR for any other exception
 */
private void createCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) {
  String collectionName = message.getStr("name");
  if (clusterState.getCollections().contains(collectionName)) {
    throw new SolrException(ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
  }

  try {
    // look at the replication factor and see if it matches reality
    // if it does not, find best nodes to create more cores

    int repFactor = msgStrToInt(message, REPLICATION_FACTOR, 1);
    Integer numSlices = msgStrToInt(message, NUM_SLICES, null);

    if (numSlices == null) {
      throw new SolrException(ErrorCode.BAD_REQUEST, NUM_SLICES + " is a required param");
    }

    int maxShardsPerNode = msgStrToInt(message, MAX_SHARDS_PER_NODE, 1);
    String createNodeSetStr = message.getStr(CREATE_NODE_SET);
    List<String> createNodeList = (createNodeSetStr == null)
        ? null
        : StrUtils.splitSmart(createNodeSetStr, ",", true);

    if (repFactor <= 0) {
      throw new SolrException(ErrorCode.BAD_REQUEST, REPLICATION_FACTOR + " must be greater than 0");
    }

    if (numSlices <= 0) {
      throw new SolrException(ErrorCode.BAD_REQUEST, NUM_SLICES + " must be > 0");
    }

    String configName = message.getStr("collection.configName");

    // we need to look at every node and see how many cores it serves
    // add our new cores to existing nodes serving the least number of cores
    // but (for now) require that each core goes on a distinct node.

    // TODO: add smarter options that look at the current number of cores per
    // node?
    // for now we just go random
    Set<String> nodes = clusterState.getLiveNodes();
    List<String> nodeList = new ArrayList<String>(nodes.size());
    nodeList.addAll(nodes);
    if (createNodeList != null) nodeList.retainAll(createNodeList);
    Collections.shuffle(nodeList);

    if (nodeList.size() <= 0) {
      throw new SolrException(ErrorCode.BAD_REQUEST, "Cannot create collection " + collectionName
          + ". No live Solr-instances" + ((createNodeList != null)?" among Solr-instances specified in " + CREATE_NODE_SET + ":" + createNodeSetStr:""));
    }

    if (repFactor > nodeList.size()) {
      log.warn("Specified "
          + REPLICATION_FACTOR
          + " of "
          + repFactor
          + " on collection "
          + collectionName
          + " is higher than the number of Solr instances currently live or part of your " + CREATE_NODE_SET + "("
          + nodeList.size()
          + "). It's unusual to run two replica of the same slice on the same Solr-instance.");
    }

    int maxShardsAllowedToCreate = maxShardsPerNode * nodeList.size();
    int requestedShardsToCreate = numSlices * repFactor;
    if (maxShardsAllowedToCreate < requestedShardsToCreate) {
      throw new SolrException(ErrorCode.BAD_REQUEST, "Cannot create collection " + collectionName + ". Value of "
          + MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
          + ", and the number of live nodes is " + nodeList.size()
          + ". This allows a maximum of " + maxShardsAllowedToCreate
          + " to be created. Value of " + NUM_SLICES + " is " + numSlices
          + " and value of " + REPLICATION_FACTOR + " is " + repFactor
          + ". This requires " + requestedShardsToCreate
          + " shards to be created (higher than the allowed number)");
    }

    for (int i = 1; i <= numSlices; i++) {
      for (int j = 1; j <= repFactor; j++) {
        // Walk the shuffled node list round-robin, wrapping when cores outnumber nodes.
        String nodeName = nodeList.get((repFactor * (i - 1) + (j - 1)) % nodeList.size());
        String sliceName = "shard" + i;
        String shardName = collectionName + "_" + sliceName + "_replica" + j;
        log.info("Creating shard " + shardName + " as part of slice "
            + sliceName + " of collection " + collectionName + " on "
            + nodeName);

        // Need to create new params for each request
        ModifiableSolrParams params = new ModifiableSolrParams();
        params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());

        params.set(CoreAdminParams.NAME, shardName);
        params.set("collection.configName", configName);
        params.set(CoreAdminParams.COLLECTION, collectionName);
        params.set(CoreAdminParams.SHARD, sliceName);
        params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);

        // Shared helper: sets "qt", resolves the node's base URL and submits the request.
        sendShardRequest(nodeName, params);
      }
    }

    // Record every response; individual core failures do not abort the creation.
    collectShardResponses(results, false, null);

    log.info("Finished create command on all shards for collection: "
        + collectionName);

  } catch (SolrException ex) {
    throw ex;
  } catch (Exception ex) {
    throw new SolrException(ErrorCode.SERVER_ERROR, null, ex);
  }
}
/**
 * Sends the given core admin command to every replica of a collection that lives on a
 * live node (and, when {@code stateMatcher} is given, whose state equals it), then
 * collects all responses into {@code results}.
 *
 * @param clusterState cluster state used to find the collection and the live nodes
 * @param message      message carrying the collection name under "name"
 * @param params       command parameters; copied per replica, never modified
 * @param results      receives per-node "success" / "failure" entries
 * @param stateMatcher required replica state, or {@code null} to address every live replica
 * @throws SolrException (BAD_REQUEST) if the collection lookup yields {@code null}
 */
private void collectionCmd(ClusterState clusterState, ZkNodeProps message, ModifiableSolrParams params, NamedList results, String stateMatcher) {
  log.info("Executing Collection Cmd : " + params);
  String collectionName = message.getStr("name");

  DocCollection coll = clusterState.getCollection(collectionName);

  if (coll == null) {
    throw new SolrException(ErrorCode.BAD_REQUEST,
        "Could not find collection:" + collectionName);
  }

  for (Slice slice : coll.getSlicesMap().values()) {
    for (ZkNodeProps node : slice.getReplicasMap().values()) {
      String nodeName = node.getStr(ZkStateReader.NODE_NAME_PROP);
      if (!clusterState.liveNodesContain(nodeName)) {
        continue;
      }
      // Compare from the matcher's side so a replica without a state property is
      // skipped rather than causing a NullPointerException.
      if (stateMatcher != null && !stateMatcher.equals(node.getStr(ZkStateReader.STATE_PROP))) {
        continue;
      }

      // For thread safety, only simple clone the ModifiableSolrParams
      ModifiableSolrParams cloneParams = new ModifiableSolrParams();
      cloneParams.add(params);
      cloneParams.set(CoreAdminParams.CORE,
          node.getStr(ZkStateReader.CORE_NAME_PROP));

      String replica = node.getStr(ZkStateReader.BASE_URL_PROP);
      ShardRequest sreq = new ShardRequest();
      sreq.nodeName = nodeName;
      // yes, they must use same admin handler path everywhere...
      cloneParams.set("qt", adminPath);
      sreq.purpose = 1;
      // TODO: this sucks
      if (replica.startsWith("http://")) replica = replica.substring(7);
      sreq.shards = new String[] {replica};
      sreq.actualShards = sreq.shards;
      sreq.params = cloneParams;
      log.info("Collection Admin sending CoreAdmin cmd to " + replica
          + " params:" + sreq.params);
      shardHandler.submit(sreq, replica, sreq.params);
    }
  }

  // Record every response; failures are reported in results, not thrown.
  collectShardResponses(results, false, null);
}
/**
 * Files one shard response into {@code results}: under "failure" (exception class name and
 * message, keyed by node name) when the response carries an exception, otherwise under
 * "success" (the Solr response payload, keyed by node name). The respective sub-map is
 * created on first use.
 *
 * @param results accumulator shared by all responses of one operation
 * @param srsp    the response to record
 */
private void processResponse(NamedList results, ShardResponse srsp) {
  final Throwable error = srsp.getException();

  if (error == null) {
    SimpleOrderedMap successes = (SimpleOrderedMap) results.get("success");
    if (successes == null) {
      successes = new SimpleOrderedMap();
      results.add("success", successes);
    }
    successes.add(srsp.getNodeName(), srsp.getSolrResponse().getResponse());
    return;
  }

  log.error("Error from shard: " + srsp.getShard(), error);
  SimpleOrderedMap failures = (SimpleOrderedMap) results.get("failure");
  if (failures == null) {
    failures = new SimpleOrderedMap();
    results.add("failure", failures);
  }
  failures.add(srsp.getNodeName(), error.getClass().getName() + ":" + error.getMessage());
}
/**
 * Reads an integer-valued property from a message.
 *
 * @param message message to read from
 * @param key     property name
 * @param def     value returned when the property is absent; may be {@code null}
 * @return the parsed value, or {@code def} (possibly {@code null}) when the property is absent
 * @throws Exception the parse failure (logged first) when the property is not a valid integer
 */
private Integer msgStrToInt(ZkNodeProps message, String key, Integer def)
    throws Exception {
  String str = message.getStr(key);
  if (str == null) {
    // Returned as-is. A conditional expression mixing Integer and int would have type int
    // and unbox def, throwing a NullPointerException for a null default.
    return def;
  }
  try {
    return Integer.valueOf(str);
  } catch (Exception ex) {
    SolrException.log(log, "Could not parse " + key, ex);
    throw ex;
  }
}
/** @return {@code true} once {@link #close()} has been called */
@Override
public boolean isClosed() {
  return this.isClosed;
}
}