// blob: d95f946578b0c366714ded67a157375297970ea4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.api.collections.RoutedAlias;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.SolrCmdDistributor;
import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
/**
 * Distributes update requests to a series of collections partitioned by a "routing" field. Issues
 * requests to create new collections on-demand.
 *
 * <p>Depends on this collection (or, as a legacy fallback, this core) having a special property
 * that points to the alias name that this collection is a part of, and further requires certain
 * properties on the alias. Collections pointed to by the alias must be named for the alias plus an
 * underscore ('_') and a routing specifier specific to the type of routed alias. These collections
 * should not be created by the user, but are created automatically by the routed alias.
 *
 * @since 7.2.0 (formerly known as TimeRoutedAliasUpdateProcessor)
 */
public class RoutedAliasUpdateProcessor extends UpdateRequestProcessor {

  /** Request parameter carrying the alias-level distribution phase (see {@link #wrap}). */
  private static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param

  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  // make sure we don't request collection properties any more frequently than once a minute during
  // slow continuous indexing, and even less frequently during bulk indexing. (cache is updated by zk
  // watch instead of re-requested until indexing has been stopped for the duration specified here)
  public static final int CACHE_FOR_MILLIS = 60000;

  // refs to std infrastructure
  private final SolrQueryRequest req;
  private final SolrCmdDistributor cmdDistrib;
  private final ZkController zkController;

  // Stuff specific to this class
  private final String thisCollection;
  private final RoutedAlias routedAlias;
  private final SolrParams outParamsToLeader;

  /**
   * Wraps {@code next} in a {@link RoutedAliasUpdateProcessor} when the request's core belongs to a
   * routed alias and the request has not yet been distributed; otherwise returns {@code next}
   * untouched.
   *
   * <p>The alias name is read from the collection properties first and, if blank there, from the
   * (legacy) core property.
   *
   * @param req the current update request
   * @param next the processor to delegate to
   * @return either {@code next} itself or a new processor that routes in front of it
   * @throws SolrException with {@code SERVER_ERROR} if the alias properties cannot be loaded or parsed
   */
  public static UpdateRequestProcessor wrap(SolrQueryRequest req, UpdateRequestProcessor next) {
    String aliasName = null;
    // Demeter please don't arrest us... hide your eyes :(
    // todo: a core should have a more direct way of finding a collection name, and the collection properties
    final SolrCore core = req.getCore();
    final CoreDescriptor coreDescriptor = core.getCoreDescriptor();
    final CloudDescriptor cloudDescriptor = coreDescriptor.getCloudDescriptor();
    if (cloudDescriptor != null) {
      final String collectionName = cloudDescriptor.getCollectionName();
      final ZkStateReader zkStateReader = core.getCoreContainer().getZkController().getZkStateReader();
      final Map<String, String> collectionProperties =
          zkStateReader.getCollectionProperties(collectionName, CACHE_FOR_MILLIS);
      aliasName = collectionProperties.get(RoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP);
    }
    // fall back on core properties (legacy)
    if (StringUtils.isBlank(aliasName)) {
      aliasName = coreDescriptor.getCoreProperty(RoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, null);
    }

    final DistribPhase shardDistribPhase =
        DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
    final DistribPhase aliasDistribPhase =
        DistribPhase.parseParam(req.getParams().get(ALIAS_DISTRIB_UPDATE_PARAM));

    if (aliasName == null
        || aliasDistribPhase != DistribPhase.NONE
        || shardDistribPhase != DistribPhase.NONE) {
      // if aliasDistribPhase is not NONE, then there is no further collection routing to be done here.
      //    TODO this may eventually not be true but at the moment it is
      // if shardDistribPhase is not NONE, then the phase is after the scope of this URP
      return next;
    }

    try {
      final RoutedAlias alias = RoutedAlias.fromProps(aliasName, getAliasProps(req, aliasName));
      return new RoutedAliasUpdateProcessor(req, next, aliasDistribPhase, alias);
    } catch (Exception e) { // ensure we throw SERVER_ERROR not BAD_REQUEST at this stage
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Routed alias has invalid properties: " + e, e);
    }
  }

  /**
   * Fetches the properties stored on the given alias.
   *
   * @param req the current request, used to reach the ZooKeeper state reader
   * @param aliasName name of the alias to look up
   * @return the alias properties; never empty
   * @throws SolrException (via {@code RoutedAlias.newAliasMustExistException}) if no properties are found
   */
  private static Map<String, String> getAliasProps(SolrQueryRequest req, String aliasName) {
    final ZkController zkController = req.getCore().getCoreContainer().getZkController();
    final Map<String, String> aliasProperties =
        zkController.getZkStateReader().getAliases().getCollectionAliasProperties(aliasName);
    if (aliasProperties.isEmpty()) {
      // an empty map means no properties are stored for this alias, so it cannot be a routed alias
      throw RoutedAlias.newAliasMustExistException(aliasName);
    }
    return aliasProperties;
  }

  /**
   * Builds the processor and pre-computes the parameters sent along with every forwarded command.
   *
   * @param req the current update request
   * @param next the processor to delegate to when the document already is in the right collection
   * @param aliasDistribPhase alias-level phase of the request; asserted to be {@code NONE}
   * @param routedAlias the routed alias this core's collection belongs to
   */
  private RoutedAliasUpdateProcessor(SolrQueryRequest req, UpdateRequestProcessor next,
                                     DistribPhase aliasDistribPhase, RoutedAlias routedAlias) {
    super(next);
    this.routedAlias = routedAlias;
    assert aliasDistribPhase == DistribPhase.NONE;
    final SolrCore core = req.getCore();
    final CoreContainer cc = core.getCoreContainer();
    this.thisCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
    this.req = req;
    this.zkController = cc.getZkController();
    this.cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());

    final ModifiableSolrParams outParams = new ModifiableSolrParams(req.getParams());
    // Don't distribute these params; they will be distributed from the local processCommit separately.
    //   (See RequestHandlerUtils.handleCommit from which this list was retrieved from)
    outParams.remove(UpdateParams.OPTIMIZE);
    outParams.remove(UpdateParams.COMMIT);
    outParams.remove(UpdateParams.SOFT_COMMIT);
    outParams.remove(UpdateParams.PREPARE_COMMIT);
    outParams.remove(UpdateParams.ROLLBACK);
    // Add these...
    //  Ensures we skip over URPs prior to DistributedURP (see UpdateRequestProcessorChain)
    outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.NONE.toString());
    //  Signal this is a distributed update from this URP (see #wrap())
    outParams.set(ALIAS_DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
    outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), core.getName()));
    outParamsToLeader = outParams;
  }

  /** Returns the name of the routed alias this processor serves. */
  private String getAliasName() {
    return routedAlias.getAliasName();
  }

  /**
   * Validates the document's route value, asks the routed alias for the target collection (creating
   * collections if required) and then either passes the command down the local chain or forwards it
   * to a shard leader of the target collection.
   */
  @Override
  public void processAdd(AddUpdateCommand cmd) throws IOException {
    routedAlias.validateRouteValue(cmd);

    // to avoid potential for race conditions, this next method should not get called again unless
    // we have created a collection synchronously
    routedAlias.updateParsedCollectionAliases(zkController.getZkStateReader(), false);

    final String targetCollection = routedAlias.createCollectionsIfRequired(cmd);

    if (thisCollection.equals(targetCollection)) {
      // pass on through; we've reached the right collection
      super.processAdd(cmd);
    } else {
      // send to the right collection
      final SolrCmdDistributor.Node targetLeaderNode =
          routeDocToSlice(targetCollection, cmd.getSolrInputDocument());
      cmdDistrib.distribAdd(cmd, Collections.singletonList(targetLeaderNode),
          new ModifiableSolrParams(outParamsToLeader));
    }
  }

  /** Forwards the delete to one shard leader of every collection currently in the alias. */
  @Override
  public void processDelete(DeleteUpdateCommand cmd) throws IOException {
    final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
    cmdDistrib.distribDelete(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
  }

  /**
   * Forwards the commit to one shard leader of every collection currently in the alias and then
   * blocks on the distributor.
   */
  @Override
  public void processCommit(CommitUpdateCommand cmd) throws IOException {
    final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
    cmdDistrib.distribCommit(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
    cmdDistrib.blockAndDoRetries(); //TODO shouldn't distribCommit do this implicitly?  It doesn't.
  }

  // processRollback is intentionally not overridden:
  // not supported by SolrCmdDistributor and is sketchy any way.

  /**
   * Finishes the command distributor and surfaces any errors it collected; the rest of the chain is
   * always finished afterwards, even when an error is raised.
   *
   * @throws DistributedUpdateProcessor.DistributedUpdatesAsyncException if the distributor reports errors
   */
  @Override
  public void finish() throws IOException {
    try {
      cmdDistrib.finish();
      final List<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors();
      if (!errors.isEmpty()) {
        throw new DistributedUpdateProcessor.DistributedUpdatesAsyncException(errors);
      }
    } finally {
      super.finish();
    }
  }

  /** Closes the command distributor, then lets the superclass close as well. */
  @Override
  protected void doClose() {
    try {
      cmdDistrib.close();
    } finally {
      super.doClose();
    }
  }

  /**
   * Picks the slice of {@code collection} the document belongs to, using that collection's router,
   * and returns a node for the slice leader.
   *
   * @param collection the target collection
   * @param doc the document being added
   * @return a forwarding node pointing at the leader of the chosen slice
   * @throws SolrException with {@code BAD_REQUEST} if the schema declares a uniqueKey field but the
   *     document has no value for it
   */
  private SolrCmdDistributor.Node routeDocToSlice(String collection, SolrInputDocument doc) {
    final SchemaField uniqueKeyField = req.getSchema().getUniqueKeyField();
    String idValue = null; // schema might not have key field...
    if (uniqueKeyField != null) {
      final String idFieldName = uniqueKeyField.getName();
      final Object rawId = doc.getFieldValue(idFieldName);
      if (rawId == null) {
        // fail with a meaningful client error instead of a NullPointerException
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
            "Document is missing mandatory uniqueKey field: " + idFieldName);
      }
      idValue = rawId.toString();
    }

    final DocCollection coll = zkController.getClusterState().getCollection(collection);
    final Slice slice = coll.getRouter().getTargetSlice(idValue, doc, null, req.getParams(), coll);
    return getLeaderNode(collection, slice);
  }

  /**
   * Resolves one leader node per collection listed under this alias.
   *
   * @throws SolrException (via {@code RoutedAlias.newAliasMustExistException}) if the alias is not
   *     present in the current alias map
   */
  private List<SolrCmdDistributor.Node> lookupShardLeadersOfCollections() {
    final Aliases aliases = zkController.getZkStateReader().getAliases();
    final List<String> collections = aliases.getCollectionAliasListMap().get(getAliasName());
    if (collections == null) {
      throw RoutedAlias.newAliasMustExistException(getAliasName());
    }
    return collections.stream()
        .map(this::lookupShardLeaderOfCollection)
        .collect(Collectors.toList());
  }

  /**
   * Returns a node for the leader of the first active slice of {@code collection}.
   *
   * @throws SolrException with {@code SERVICE_UNAVAILABLE} if the collection has no active slices
   */
  private SolrCmdDistributor.Node lookupShardLeaderOfCollection(String collection) {
    final Slice[] activeSlices =
        zkController.getClusterState().getCollection(collection).getActiveSlicesArr();
    if (activeSlices.length == 0) {
      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Cannot route to collection " + collection);
    }
    final Slice slice = activeSlices[0];
    return getLeaderNode(collection, slice);
  }

  /**
   * Builds a forwarding node for the leader replica of {@code slice}.
   *
   * @throws SolrException with {@code SERVICE_UNAVAILABLE} if the slice currently has no leader
   */
  private SolrCmdDistributor.Node getLeaderNode(String collection, Slice slice) {
    //TODO when should we do StdNode vs RetryNode?
    final Replica leader = slice.getLeader();
    if (leader == null) {
      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
          "No 'leader' replica available for shard " + slice.getName() + " of collection " + collection);
    }
    return new SolrCmdDistributor.ForwardNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(),
        collection, slice.getName(), DistributedUpdateProcessor.MAX_RETRIES_ON_FORWARD_DEAULT);
  }
}