| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.solr.cloud.api.collections; |
| |
| |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.Properties; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.solr.client.solrj.cloud.AlreadyExistsException; |
| import org.apache.solr.client.solrj.cloud.BadVersionException; |
| import org.apache.solr.client.solrj.cloud.DistribStateManager; |
| import org.apache.solr.client.solrj.cloud.NotEmptyException; |
| import org.apache.solr.client.solrj.cloud.SolrCloudManager; |
| import org.apache.solr.client.solrj.cloud.VersionedData; |
| import org.apache.solr.cloud.Overseer; |
| import org.apache.solr.cloud.ZkController; |
| import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker; |
| import org.apache.solr.cloud.overseer.ClusterStateMutator; |
| import org.apache.solr.cluster.placement.PlacementPlugin; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.SolrException.ErrorCode; |
| import org.apache.solr.common.cloud.Aliases; |
| import org.apache.solr.common.cloud.ClusterState; |
| import org.apache.solr.common.cloud.DocCollection; |
| import org.apache.solr.common.cloud.DocRouter; |
| import org.apache.solr.common.cloud.ImplicitDocRouter; |
| import org.apache.solr.common.cloud.Replica; |
| import org.apache.solr.common.cloud.ReplicaPosition; |
| import org.apache.solr.common.cloud.ZkConfigManager; |
| import org.apache.solr.common.cloud.ZkNodeProps; |
| import org.apache.solr.common.cloud.ZkStateReader; |
| import org.apache.solr.common.cloud.ZooKeeperException; |
| import org.apache.solr.common.params.CollectionAdminParams; |
| import org.apache.solr.common.params.CommonAdminParams; |
| import org.apache.solr.common.params.CoreAdminParams; |
| import org.apache.solr.common.params.ModifiableSolrParams; |
| import org.apache.solr.common.util.NamedList; |
| import org.apache.solr.common.util.SimpleOrderedMap; |
| import org.apache.solr.common.util.TimeSource; |
| import org.apache.solr.common.util.Utils; |
| import org.apache.solr.handler.component.ShardHandler; |
| import org.apache.solr.handler.component.ShardRequest; |
| import org.apache.solr.util.TimeOut; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.KeeperException.NoNodeException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS; |
| import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS; |
| import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR; |
| import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS; |
| import static org.apache.solr.common.params.CollectionAdminParams.ALIAS; |
| import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF; |
| import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA; |
| import static org.apache.solr.common.params.CommonAdminParams.ASYNC; |
| import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE; |
| import static org.apache.solr.common.params.CommonParams.NAME; |
| import static org.apache.solr.common.util.StrUtils.formatString; |
| import static org.apache.solr.handler.admin.ConfigSetsHandler.DEFAULT_CONFIGSET_NAME; |
| import static org.apache.solr.handler.admin.ConfigSetsHandler.getSuffixedNameForAutoGeneratedConfigSet; |
| |
| public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd { |
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| private final OverseerCollectionMessageHandler ocmh; |
| private final TimeSource timeSource; |
| private final DistribStateManager stateManager; |
| |
| public CreateCollectionCmd(OverseerCollectionMessageHandler ocmh) { |
| this.ocmh = ocmh; |
| this.stateManager = ocmh.cloudManager.getDistribStateManager(); |
| this.timeSource = ocmh.cloudManager.getTimeSource(); |
| } |
| |
| @Override |
| @SuppressWarnings({"unchecked"}) |
| public void call(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results) throws Exception { |
| if (ocmh.zkStateReader.aliasesManager != null) { // not a mock ZkStateReader |
| ocmh.zkStateReader.aliasesManager.update(); |
| } |
| final Aliases aliases = ocmh.zkStateReader.getAliases(); |
| final String collectionName = message.getStr(NAME); |
| final boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false); |
| final String alias = message.getStr(ALIAS, collectionName); |
| log.info("Create collection {}", collectionName); |
| if (clusterState.hasCollection(collectionName)) { |
| throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName); |
| } |
| if (aliases.hasAlias(collectionName)) { |
| throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection alias already exists: " + collectionName); |
| } |
| |
| String configName = getConfigName(collectionName, message); |
| if (configName == null) { |
| throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No config set found to associate with the collection."); |
| } |
| |
| ocmh.validateConfigOrThrowSolrException(configName); |
| |
| String router = message.getStr("router.name", DocRouter.DEFAULT_NAME); |
| |
| // fail fast if parameters are wrong or incomplete |
| List<String> shardNames = populateShardNames(message, router); |
| checkReplicaTypes(message); |
| |
| |
| |
| try { |
| |
| final String async = message.getStr(ASYNC); |
| |
| ZkStateReader zkStateReader = ocmh.zkStateReader; |
| |
| OverseerCollectionMessageHandler.createConfNode(stateManager, configName, collectionName); |
| |
| Map<String,String> collectionParams = new HashMap<>(); |
| Map<String,Object> collectionProps = message.getProperties(); |
| for (Map.Entry<String, Object> entry : collectionProps.entrySet()) { |
| String propName = entry.getKey(); |
| if (propName.startsWith(ZkController.COLLECTION_PARAM_PREFIX)) { |
| collectionParams.put(propName.substring(ZkController.COLLECTION_PARAM_PREFIX.length()), (String) entry.getValue()); |
| } |
| } |
| |
| createCollectionZkNode(stateManager, collectionName, collectionParams); |
| |
| ocmh.overseer.offerStateUpdate(Utils.toJSON(message)); |
| |
| // wait for a while until we see the collection |
| TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource); |
| boolean created = false; |
| while (! waitUntil.hasTimedOut()) { |
| waitUntil.sleep(100); |
| created = ocmh.cloudManager.getClusterStateProvider().getClusterState().hasCollection(collectionName); |
| if(created) break; |
| } |
| if (!created) { |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName); |
| } |
| |
| // refresh cluster state |
| clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState(); |
| |
| List<ReplicaPosition> replicaPositions = null; |
| try { |
| replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, clusterState.getCollection(collectionName), |
| message, shardNames, ocmh.overseer.getCoreContainer().getPlacementPluginFactory().createPluginInstance()); |
| } catch (Assign.AssignmentException e) { |
| ZkNodeProps deleteMessage = new ZkNodeProps("name", collectionName); |
| new DeleteCollectionCmd(ocmh).call(clusterState, deleteMessage, results); |
| // unwrap the exception |
| throw new SolrException(ErrorCode.BAD_REQUEST, e.getMessage(), e.getCause()); |
| } |
| |
| if (replicaPositions.isEmpty()) { |
| log.debug("Finished create command for collection: {}", collectionName); |
| return; |
| } |
| |
| final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(async); |
| if (log.isDebugEnabled()) { |
| log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , message : {2}", |
| collectionName, shardNames, message)); |
| } |
| Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>(); |
| ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(); |
| for (ReplicaPosition replicaPosition : replicaPositions) { |
| String nodeName = replicaPosition.node; |
| |
| String coreName = Assign.buildSolrCoreName(ocmh.cloudManager.getDistribStateManager(), |
| ocmh.cloudManager.getClusterStateProvider().getClusterState().getCollection(collectionName), |
| replicaPosition.shard, replicaPosition.type, true); |
| if (log.isDebugEnabled()) { |
| log.debug(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}" |
| , coreName, replicaPosition.shard, collectionName, nodeName)); |
| } |
| |
| String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName); |
| // create the replica in the collection's state.json in ZK prior to creating the core. |
| // Otherwise the core creation fails |
| ZkNodeProps props = new ZkNodeProps( |
| Overseer.QUEUE_OPERATION, ADDREPLICA.toString(), |
| ZkStateReader.COLLECTION_PROP, collectionName, |
| ZkStateReader.SHARD_ID_PROP, replicaPosition.shard, |
| ZkStateReader.CORE_NAME_PROP, coreName, |
| ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(), |
| ZkStateReader.NODE_NAME_PROP, nodeName, |
| ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(), |
| CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState)); |
| ocmh.overseer.offerStateUpdate(Utils.toJSON(props)); |
| |
| // Need to create new params for each request |
| ModifiableSolrParams params = new ModifiableSolrParams(); |
| params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString()); |
| |
| params.set(CoreAdminParams.NAME, coreName); |
| params.set(COLL_CONF, configName); |
| params.set(CoreAdminParams.COLLECTION, collectionName); |
| params.set(CoreAdminParams.SHARD, replicaPosition.shard); |
| params.set(ZkStateReader.NUM_SHARDS_PROP, shardNames.size()); |
| params.set(CoreAdminParams.NEW_COLLECTION, "true"); |
| params.set(CoreAdminParams.REPLICA_TYPE, replicaPosition.type.name()); |
| |
| if (async != null) { |
| String coreAdminAsyncId = async + Math.abs(System.nanoTime()); |
| params.add(ASYNC, coreAdminAsyncId); |
| shardRequestTracker.track(nodeName, coreAdminAsyncId); |
| } |
| ocmh.addPropertyParams(message, params); |
| |
| ShardRequest sreq = new ShardRequest(); |
| sreq.nodeName = nodeName; |
| params.set("qt", ocmh.adminPath); |
| sreq.purpose = 1; |
| sreq.shards = new String[]{baseUrl}; |
| sreq.actualShards = sreq.shards; |
| sreq.params = params; |
| |
| coresToCreate.put(coreName, sreq); |
| } |
| |
| // wait for all replica entries to be created |
| Map<String, Replica> replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet()); |
| for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) { |
| ShardRequest sreq = e.getValue(); |
| sreq.params.set(CoreAdminParams.CORE_NODE_NAME, replicas.get(e.getKey()).getName()); |
| shardHandler.submit(sreq, sreq.shards[0], sreq.params); |
| } |
| |
| shardRequestTracker.processResponses(results, shardHandler, false, null, Collections.emptySet()); |
| @SuppressWarnings({"rawtypes"}) |
| boolean failure = results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0; |
| if (failure) { |
| // Let's cleanup as we hit an exception |
| // We shouldn't be passing 'results' here for the cleanup as the response would then contain 'success' |
| // element, which may be interpreted by the user as a positive ack |
| ocmh.cleanupCollection(collectionName, new NamedList<Object>()); |
| log.info("Cleaned up artifacts for failed create collection for [{}]", collectionName); |
| throw new SolrException(ErrorCode.BAD_REQUEST, "Underlying core creation failed while creating collection: " + collectionName); |
| } else { |
| log.debug("Finished create command on all shards for collection: {}", collectionName); |
| |
| // Emit a warning about production use of data driven functionality |
| boolean defaultConfigSetUsed = message.getStr(COLL_CONF) == null || |
| message.getStr(COLL_CONF).equals(DEFAULT_CONFIGSET_NAME); |
| if (defaultConfigSetUsed) { |
| results.add("warning", "Using _default configset. Data driven schema functionality" |
| + " is enabled by default, which is NOT RECOMMENDED for production use. To turn it off:" |
| + " curl http://{host:port}/solr/" + collectionName + "/config -d '{\"set-user-property\": {\"update.autoCreateFields\":\"false\"}}'"); |
| } |
| } |
| |
| // create an alias pointing to the new collection, if different from the collectionName |
| if (!alias.equals(collectionName)) { |
| ocmh.zkStateReader.aliasesManager.applyModificationAndExportToZk(a -> a.cloneWithCollectionAlias(alias, collectionName)); |
| } |
| |
| } catch (SolrException ex) { |
| throw ex; |
| } catch (Exception ex) { |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, ex); |
| } |
| } |
| |
| private static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState, |
| DocCollection docCollection, |
| ZkNodeProps message, |
| List<String> shardNames, PlacementPlugin placementPlugin) throws IOException, InterruptedException, Assign.AssignmentException { |
| final String collectionName = message.getStr(NAME); |
| // look at the replication factor and see if it matches reality |
| // if it does not, find best nodes to create more cores |
| int numTlogReplicas = message.getInt(TLOG_REPLICAS, 0); |
| int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, numTlogReplicas>0?0:1)); |
| int numPullReplicas = message.getInt(PULL_REPLICAS, 0); |
| |
| int numSlices = shardNames.size(); |
| |
| // we need to look at every node and see how many cores it serves |
| // add our new cores to existing nodes serving the least number of cores |
| // but (for now) require that each core goes on a distinct node. |
| |
| List<ReplicaPosition> replicaPositions; |
| List<String> nodeList = Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, OverseerCollectionMessageHandler.RANDOM); |
| if (nodeList.isEmpty()) { |
| log.warn("It is unusual to create a collection ({}) without cores.", collectionName); |
| |
| replicaPositions = new ArrayList<>(); |
| } else { |
| int totalNumReplicas = numNrtReplicas + numTlogReplicas + numPullReplicas; |
| if (totalNumReplicas > nodeList.size()) { |
| log.warn("Specified number of replicas of {} on collection {} is higher than the number of Solr instances currently live or live and part of your {}({}). {}" |
| , totalNumReplicas |
| , collectionName |
| , OverseerCollectionMessageHandler.CREATE_NODE_SET |
| , nodeList.size() |
| , "It's unusual to run two replica of the same slice on the same Solr-instance."); |
| } |
| |
| Assign.AssignRequest assignRequest = new Assign.AssignRequestBuilder() |
| .forCollection(collectionName) |
| .forShard(shardNames) |
| .assignNrtReplicas(numNrtReplicas) |
| .assignTlogReplicas(numTlogReplicas) |
| .assignPullReplicas(numPullReplicas) |
| .onNodes(nodeList) |
| .build(); |
| Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(placementPlugin, clusterState, docCollection); |
| replicaPositions = assignStrategy.assign(cloudManager, assignRequest); |
| } |
| return replicaPositions; |
| } |
| |
| public static void checkReplicaTypes(ZkNodeProps message) { |
| int numTlogReplicas = message.getInt(TLOG_REPLICAS, 0); |
| int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, numTlogReplicas > 0 ? 0 : 1)); |
| |
| if (numNrtReplicas + numTlogReplicas <= 0) { |
| throw new SolrException(ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0"); |
| } |
| } |
| |
| public static List<String> populateShardNames(ZkNodeProps message, String router) { |
| List<String> shardNames = new ArrayList<>(); |
| Integer numSlices = message.getInt(OverseerCollectionMessageHandler.NUM_SLICES, null); |
| if (ImplicitDocRouter.NAME.equals(router)) { |
| ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null)); |
| numSlices = shardNames.size(); |
| } else { |
| if (numSlices == null) { |
| throw new SolrException(ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " is a required param (when using CompositeId router)."); |
| } |
| if (numSlices <= 0) { |
| throw new SolrException(ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " must be > 0"); |
| } |
| ClusterStateMutator.getShardNames(numSlices, shardNames); |
| } |
| return shardNames; |
| } |
| |
| String getConfigName(String coll, ZkNodeProps message) throws KeeperException, InterruptedException { |
| String configName = message.getStr(COLL_CONF); |
| |
| if (configName == null) { |
| // if there is only one conf, use that |
| List<String> configNames = null; |
| try { |
| configNames = ocmh.zkStateReader.getZkClient().getChildren(ZkConfigManager.CONFIGS_ZKNODE, null, true); |
| if (configNames.contains(DEFAULT_CONFIGSET_NAME)) { |
| if (CollectionAdminParams.SYSTEM_COLL.equals(coll)) { |
| return coll; |
| } else { |
| String intendedConfigSetName = getSuffixedNameForAutoGeneratedConfigSet(coll); |
| copyDefaultConfigSetTo(configNames, intendedConfigSetName); |
| return intendedConfigSetName; |
| } |
| } else if (configNames != null && configNames.size() == 1) { |
| configName = configNames.get(0); |
| // no config set named, but there is only 1 - use it |
| log.info("Only one config set found in zk - using it: {}", configName); |
| } |
| } catch (KeeperException.NoNodeException e) { |
| |
| } |
| } |
| return "".equals(configName)? null: configName; |
| } |
| |
| /** |
| * Copies the _default configset to the specified configset name (overwrites if pre-existing) |
| */ |
| private void copyDefaultConfigSetTo(List<String> configNames, String targetConfig) { |
| ZkConfigManager configManager = new ZkConfigManager(ocmh.zkStateReader.getZkClient()); |
| |
| // if a configset named collection exists, re-use it |
| if (configNames.contains(targetConfig)) { |
| log.info("There exists a configset by the same name as the collection we're trying to create: {}, re-using it.", targetConfig); |
| return; |
| } |
| // Copy _default into targetConfig |
| try { |
| configManager.copyConfigDir(DEFAULT_CONFIGSET_NAME, targetConfig, new HashSet<>()); |
| } catch (Exception e) { |
| throw new SolrException(ErrorCode.INVALID_STATE, "Error while copying _default to " + targetConfig, e); |
| } |
| } |
| |
| public static void createCollectionZkNode(DistribStateManager stateManager, String collection, Map<String,String> params) { |
| log.debug("Check for collection zkNode: {}", collection); |
| String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection; |
| // clean up old terms node |
| String termsPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/terms"; |
| try { |
| stateManager.removeRecursively(termsPath, true, true); |
| } catch (InterruptedException e) { |
| Thread.interrupted(); |
| throw new SolrException(ErrorCode.SERVER_ERROR, "Error deleting old term nodes for collection from Zookeeper", e); |
| } catch (KeeperException | IOException | NotEmptyException | BadVersionException e) { |
| throw new SolrException(ErrorCode.SERVER_ERROR, "Error deleting old term nodes for collection from Zookeeper", e); |
| } |
| try { |
| if (!stateManager.hasData(collectionPath)) { |
| log.debug("Creating collection in ZooKeeper: {}", collection); |
| |
| try { |
| Map<String,Object> collectionProps = new HashMap<>(); |
| |
| if (params.size() > 0) { |
| collectionProps.putAll(params); |
| // if the config name wasn't passed in, use the default |
| if (!collectionProps.containsKey(ZkController.CONFIGNAME_PROP)) { |
| // users can create the collection node and conf link ahead of time, or this may return another option |
| getConfName(stateManager, collection, collectionPath, collectionProps); |
| } |
| |
| } else if (System.getProperty("bootstrap_confdir") != null) { |
| String defaultConfigName = System.getProperty(ZkController.COLLECTION_PARAM_PREFIX + ZkController.CONFIGNAME_PROP, collection); |
| |
| // if we are bootstrapping a collection, default the config for |
| // a new collection to the collection we are bootstrapping |
| log.info("Setting config for collection: {} to {}", collection, defaultConfigName); |
| |
| Properties sysProps = System.getProperties(); |
| for (String sprop : System.getProperties().stringPropertyNames()) { |
| if (sprop.startsWith(ZkController.COLLECTION_PARAM_PREFIX)) { |
| collectionProps.put(sprop.substring(ZkController.COLLECTION_PARAM_PREFIX.length()), sysProps.getProperty(sprop)); |
| } |
| } |
| |
| // if the config name wasn't passed in, use the default |
| if (!collectionProps.containsKey(ZkController.CONFIGNAME_PROP)) |
| collectionProps.put(ZkController.CONFIGNAME_PROP, defaultConfigName); |
| |
| } else if (Boolean.getBoolean("bootstrap_conf")) { |
| // the conf name should should be the collection name of this core |
| collectionProps.put(ZkController.CONFIGNAME_PROP, collection); |
| } else { |
| getConfName(stateManager, collection, collectionPath, collectionProps); |
| } |
| |
| collectionProps.remove(ZkStateReader.NUM_SHARDS_PROP); // we don't put numShards in the collections properties |
| |
| ZkNodeProps zkProps = new ZkNodeProps(collectionProps); |
| stateManager.makePath(collectionPath, Utils.toJSON(zkProps), CreateMode.PERSISTENT, false); |
| |
| } catch (KeeperException e) { |
| //TODO shouldn't the stateManager ensure this does not happen; should throw AlreadyExistsException |
| // it's okay if the node already exists |
| if (e.code() != KeeperException.Code.NODEEXISTS) { |
| throw e; |
| } |
| } catch (AlreadyExistsException e) { |
| // it's okay if the node already exists |
| } |
| } else { |
| log.debug("Collection zkNode exists"); |
| } |
| |
| } catch (KeeperException e) { |
| // it's okay if another beats us creating the node |
| if (e.code() == KeeperException.Code.NODEEXISTS) { |
| return; |
| } |
| throw new SolrException(ErrorCode.SERVER_ERROR, "Error creating collection node in Zookeeper", e); |
| } catch (IOException e) { |
| throw new SolrException(ErrorCode.SERVER_ERROR, "Error creating collection node in Zookeeper", e); |
| } catch (InterruptedException e) { |
| Thread.interrupted(); |
| throw new SolrException(ErrorCode.SERVER_ERROR, "Error creating collection node in Zookeeper", e); |
| } |
| |
| } |
| |
| private static void getConfName(DistribStateManager stateManager, String collection, String collectionPath, Map<String,Object> collectionProps) throws IOException, |
| KeeperException, InterruptedException { |
| // check for configName |
| log.debug("Looking for collection configName"); |
| if (collectionProps.containsKey("configName")) { |
| if (log.isInfoEnabled()) { |
| log.info("configName was passed as a param {}", collectionProps.get("configName")); |
| } |
| return; |
| } |
| |
| List<String> configNames = null; |
| int retry = 1; |
| int retryLimt = 6; |
| for (; retry < retryLimt; retry++) { |
| if (stateManager.hasData(collectionPath)) { |
| VersionedData data = stateManager.getData(collectionPath); |
| ZkNodeProps cProps = ZkNodeProps.load(data.getData()); |
| if (cProps.containsKey(ZkController.CONFIGNAME_PROP)) { |
| break; |
| } |
| } |
| |
| try { |
| configNames = stateManager.listData(ZkConfigManager.CONFIGS_ZKNODE); |
| } catch (NoSuchElementException | NoNodeException e) { |
| // just keep trying |
| } |
| |
| // check if there's a config set with the same name as the collection |
| if (configNames != null && configNames.contains(collection)) { |
| log.info("Could not find explicit collection configName, but found config name matching collection name - using that set."); |
| collectionProps.put(ZkController.CONFIGNAME_PROP, collection); |
| break; |
| } |
| // if _default exists, use that |
| if (configNames != null && configNames.contains(DEFAULT_CONFIGSET_NAME)) { |
| log.info("Could not find explicit collection configName, but found _default config set - using that set."); |
| collectionProps.put(ZkController.CONFIGNAME_PROP, DEFAULT_CONFIGSET_NAME); |
| break; |
| } |
| // if there is only one conf, use that |
| if (configNames != null && configNames.size() == 1) { |
| // no config set named, but there is only 1 - use it |
| if (log.isInfoEnabled()) { |
| log.info("Only one config set found in zk - using it: {}", configNames.get(0)); |
| } |
| collectionProps.put(ZkController.CONFIGNAME_PROP, configNames.get(0)); |
| break; |
| } |
| |
| log.info("Could not find collection configName - pausing for 3 seconds and trying again - try: {}", retry); |
| Thread.sleep(3000); |
| } |
| if (retry == retryLimt) { |
| log.error("Could not find configName for collection {}", collection); |
| throw new ZooKeeperException( |
| SolrException.ErrorCode.SERVER_ERROR, |
| "Could not find configName for collection " + collection + " found:" + configNames); |
| } |
| } |
| } |