| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.solr.handler.component; |
| |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import net.jcip.annotations.NotThreadSafe; |
| import org.apache.solr.client.solrj.SolrRequest; |
| import org.apache.solr.client.solrj.SolrResponse; |
| import org.apache.solr.client.solrj.impl.LBHttp2SolrClient; |
| import org.apache.solr.client.solrj.impl.LBSolrClient; |
| import org.apache.solr.client.solrj.request.QueryRequest; |
| import org.apache.solr.client.solrj.routing.NoOpReplicaListTransformer; |
| import org.apache.solr.client.solrj.routing.ReplicaListTransformer; |
| import org.apache.solr.cloud.CloudDescriptor; |
| import org.apache.solr.cloud.ZkController; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.cloud.Replica; |
| import org.apache.solr.common.cloud.ZkCoreNodeProps; |
| import org.apache.solr.common.params.CommonParams; |
| import org.apache.solr.common.params.ModifiableSolrParams; |
| import org.apache.solr.common.params.ShardParams; |
| import org.apache.solr.common.params.SolrParams; |
| import org.apache.solr.common.util.NamedList; |
| import org.apache.solr.core.CoreDescriptor; |
| import org.apache.solr.request.SolrQueryRequest; |
| import org.apache.solr.request.SolrRequestInfo; |
| import org.apache.solr.security.AllowListUrlChecker; |
| |
| @NotThreadSafe |
| public class HttpShardHandler extends ShardHandler { |
| /** |
| * If the request context map has an entry with this key and Boolean.TRUE as value, {@link |
| * #prepDistributed(ResponseBuilder)} will only include {@link |
| * org.apache.solr.common.cloud.Replica.Type#NRT} replicas as possible destination of the |
| * distributed request (or a leader replica of type {@link |
| * org.apache.solr.common.cloud.Replica.Type#TLOG}). This is used by the RealtimeGet handler, |
| * since other types of replicas shouldn't respond to RTG requests |
| */ |
| public static String ONLY_NRT_REPLICAS = "distribOnlyRealtime"; |
| |
| private HttpShardHandlerFactory httpShardHandlerFactory; |
| private Map<ShardResponse, CompletableFuture<LBSolrClient.Rsp>> responseFutureMap; |
| private BlockingQueue<ShardResponse> responses; |
| private AtomicInteger pending; |
| private Map<String, List<String>> shardToURLs; |
| private LBHttp2SolrClient lbClient; |
| |
| public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory) { |
| this.httpShardHandlerFactory = httpShardHandlerFactory; |
| this.lbClient = httpShardHandlerFactory.loadbalancer; |
| this.pending = new AtomicInteger(0); |
| this.responses = new LinkedBlockingQueue<>(); |
| this.responseFutureMap = new HashMap<>(); |
| |
| // maps "localhost:8983|localhost:7574" to a shuffled |
| // List("http://localhost:8983","http://localhost:7574") |
| // This is primarily to keep track of what order we should use to query the replicas of a shard |
| // so that we use the same replica for all phases of a distributed request. |
| shardToURLs = new HashMap<>(); |
| } |
| |
| private static class SimpleSolrResponse extends SolrResponse { |
| |
| volatile long elapsedTime; |
| |
| volatile NamedList<Object> nl; |
| |
| @Override |
| public long getElapsedTime() { |
| return elapsedTime; |
| } |
| |
| @Override |
| public NamedList<Object> getResponse() { |
| return nl; |
| } |
| |
| @Override |
| public void setResponse(NamedList<Object> rsp) { |
| nl = rsp; |
| } |
| |
| @Override |
| public void setElapsedTime(long elapsedTime) { |
| this.elapsedTime = elapsedTime; |
| } |
| } |
| |
| // Not thread safe... don't use in Callable. |
| // Don't modify the returned URL list. |
| private List<String> getURLs(String shard) { |
| List<String> urls = shardToURLs.get(shard); |
| if (urls == null) { |
| urls = httpShardHandlerFactory.buildURLList(shard); |
| shardToURLs.put(shard, urls); |
| } |
| return urls; |
| } |
| |
| @Override |
| public void submit( |
| final ShardRequest sreq, final String shard, final ModifiableSolrParams params) { |
| // do this outside of the callable for thread safety reasons |
| final List<String> urls = getURLs(shard); |
| |
| params.remove(CommonParams.WT); // use default (currently javabin) |
| params.remove(CommonParams.VERSION); |
| QueryRequest req = makeQueryRequest(sreq, params, shard); |
| req.setMethod(SolrRequest.METHOD.POST); |
| |
| LBSolrClient.Req lbReq = httpShardHandlerFactory.newLBHttpSolrClientReq(req, urls); |
| |
| ShardResponse srsp = new ShardResponse(); |
| if (sreq.nodeName != null) { |
| srsp.setNodeName(sreq.nodeName); |
| } |
| srsp.setShardRequest(sreq); |
| srsp.setShard(shard); |
| SimpleSolrResponse ssr = new SimpleSolrResponse(); |
| srsp.setSolrResponse(ssr); |
| |
| pending.incrementAndGet(); |
| // if there are no shards available for a slice, urls.size()==0 |
| if (urls.isEmpty()) { |
| // TODO: what's the right error code here? We should use the same thing when |
| // all of the servers for a shard are down. |
| SolrException exception = |
| new SolrException( |
| SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard); |
| srsp.setException(exception); |
| srsp.setResponseCode(exception.code()); |
| responses.add(srsp); |
| return; |
| } |
| |
| long startTime = System.nanoTime(); |
| SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo(); |
| if (requestInfo != null) { |
| req.setUserPrincipal(requestInfo.getReq().getUserPrincipal()); |
| } |
| |
| CompletableFuture<LBSolrClient.Rsp> future = this.lbClient.requestAsync(lbReq); |
| future.whenComplete( |
| (rsp, throwable) -> { |
| if (rsp != null) { |
| ssr.nl = rsp.getResponse(); |
| srsp.setShardAddress(rsp.getServer()); |
| ssr.elapsedTime = |
| TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); |
| responses.add(srsp); |
| } else if (throwable != null) { |
| ssr.elapsedTime = |
| TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); |
| srsp.setException(throwable); |
| if (throwable instanceof SolrException) { |
| srsp.setResponseCode(((SolrException) throwable).code()); |
| } |
| responses.add(srsp); |
| } |
| }); |
| |
| responseFutureMap.put(srsp, future); |
| } |
| |
| /** Subclasses could modify the request based on the shard */ |
| protected QueryRequest makeQueryRequest( |
| final ShardRequest sreq, ModifiableSolrParams params, String shard) { |
| // use generic request to avoid extra processing of queries |
| return new QueryRequest(params); |
| } |
| |
| /** Subclasses could modify the Response based on the shard */ |
| protected ShardResponse transfomResponse( |
| final ShardRequest sreq, ShardResponse rsp, String shard) { |
| return rsp; |
| } |
| |
| /** |
| * returns a ShardResponse of the last response correlated with a ShardRequest. This won't return |
| * early if it runs into an error. |
| */ |
| @Override |
| public ShardResponse takeCompletedIncludingErrors() { |
| return take(false); |
| } |
| |
| /** |
| * returns a ShardResponse of the last response correlated with a ShardRequest, or immediately |
| * returns a ShardResponse if there was an error detected |
| */ |
| @Override |
| public ShardResponse takeCompletedOrError() { |
| return take(true); |
| } |
| |
| private ShardResponse take(boolean bailOnError) { |
| try { |
| while (pending.get() > 0) { |
| ShardResponse rsp = responses.take(); |
| responseFutureMap.remove(rsp); |
| |
| pending.decrementAndGet(); |
| if (bailOnError && rsp.getException() != null) |
| return rsp; // if exception, return immediately |
| // add response to the response list... we do this after the take() and |
| // not after the completion of "call" so we know when the last response |
| // for a request was received. Otherwise we might return the same |
| // request more than once. |
| rsp.getShardRequest().responses.add(rsp); |
| if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) { |
| return rsp; |
| } |
| } |
| } catch (InterruptedException e) { |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); |
| } |
| return null; |
| } |
| |
| @Override |
| public void cancelAll() { |
| for (CompletableFuture<LBSolrClient.Rsp> future : responseFutureMap.values()) { |
| future.cancel(true); |
| pending.decrementAndGet(); |
| } |
| responseFutureMap.clear(); |
| } |
| |
| @Override |
| public void prepDistributed(ResponseBuilder rb) { |
| final SolrQueryRequest req = rb.req; |
| final SolrParams params = req.getParams(); |
| final String shards = params.get(ShardParams.SHARDS); |
| |
| CoreDescriptor coreDescriptor = req.getCore().getCoreDescriptor(); |
| CloudDescriptor cloudDescriptor = req.getCloudDescriptor(); |
| ZkController zkController = req.getCoreContainer().getZkController(); |
| |
| final ReplicaListTransformer replicaListTransformer = |
| httpShardHandlerFactory.getReplicaListTransformer(req); |
| |
| AllowListUrlChecker urlChecker = req.getCoreContainer().getAllowListUrlChecker(); |
| if (shards != null |
| && zkController == null |
| && urlChecker.isEnabled() |
| && !urlChecker.hasExplicitAllowList()) { |
| throw new SolrException( |
| SolrException.ErrorCode.FORBIDDEN, |
| "solr.xml property '" |
| + AllowListUrlChecker.URL_ALLOW_LIST |
| + "' not configured but required (in lieu of ZkController and ClusterState) when using the '" |
| + ShardParams.SHARDS |
| + "' parameter. " |
| + AllowListUrlChecker.SET_SOLR_DISABLE_URL_ALLOW_LIST_CLUE); |
| } |
| |
| ReplicaSource replicaSource; |
| if (zkController != null) { |
| boolean onlyNrt = Boolean.TRUE == req.getContext().get(ONLY_NRT_REPLICAS); |
| |
| replicaSource = |
| new CloudReplicaSource.Builder() |
| .params(params) |
| .zkStateReader(zkController.getZkStateReader()) |
| .allowListUrlChecker(urlChecker) |
| .replicaListTransformer(replicaListTransformer) |
| .collection(cloudDescriptor.getCollectionName()) |
| .onlyNrt(onlyNrt) |
| .build(); |
| rb.slices = replicaSource.getSliceNames().toArray(new String[replicaSource.getSliceCount()]); |
| |
| if (canShortCircuit(rb.slices, onlyNrt, params, cloudDescriptor)) { |
| rb.isDistrib = false; |
| rb.shortCircuitedURL = |
| ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), coreDescriptor.getName()); |
| return; |
| // We shouldn't need to do anything to handle "shard.rows" since it was previously meant to |
| // be an optimization? |
| } |
| |
| if (!ShardParams.getShardsTolerantAsBool(params)) { |
| for (int i = 0; i < rb.slices.length; i++) { |
| if (replicaSource.getReplicasBySlice(i).isEmpty()) { |
| final ReplicaSource allActiveReplicaSource = |
| new CloudReplicaSource.Builder() |
| .params(params) |
| .zkStateReader(zkController.getZkStateReader()) |
| .allowListUrlChecker(AllowListUrlChecker.ALLOW_ALL) |
| .replicaListTransformer(NoOpReplicaListTransformer.INSTANCE) |
| .collection(cloudDescriptor.getCollectionName()) |
| .onlyNrt(false) |
| .build(); |
| final String adjective = |
| (allActiveReplicaSource.getReplicasBySlice(i).isEmpty() ? "active" : "eligible"); |
| // stop the check when there are no replicas available for a shard |
| // todo fix use of slices[i] which can be null if user specified urls in shards param |
| throw new SolrException( |
| SolrException.ErrorCode.SERVICE_UNAVAILABLE, |
| "no " + adjective + " servers hosting shard: " + rb.slices[i]); |
| } |
| } |
| } |
| } else { |
| replicaSource = |
| new StandaloneReplicaSource.Builder() |
| .allowListUrlChecker(urlChecker) |
| .shards(shards) |
| .build(); |
| rb.slices = new String[replicaSource.getSliceCount()]; |
| } |
| |
| rb.shards = new String[rb.slices.length]; |
| for (int i = 0; i < rb.slices.length; i++) { |
| rb.shards[i] = createSliceShardsStr(replicaSource.getReplicasBySlice(i)); |
| } |
| |
| String shards_rows = params.get(ShardParams.SHARDS_ROWS); |
| if (shards_rows != null) { |
| rb.shards_rows = Integer.parseInt(shards_rows); |
| } |
| String shards_start = params.get(ShardParams.SHARDS_START); |
| if (shards_start != null) { |
| rb.shards_start = Integer.parseInt(shards_start); |
| } |
| } |
| |
| private static String createSliceShardsStr(final List<String> shardUrls) { |
| return String.join("|", shardUrls); |
| } |
| |
| private boolean canShortCircuit( |
| String[] slices, |
| boolean onlyNrtReplicas, |
| SolrParams params, |
| CloudDescriptor cloudDescriptor) { |
| // Are we hosting the shard that this request is for, and are we active? If so, then handle it |
| // ourselves and make it a non-distributed request. |
| String ourSlice = cloudDescriptor.getShardId(); |
| String ourCollection = cloudDescriptor.getCollectionName(); |
| // Some requests may only be fulfilled by replicas of type Replica.Type.NRT |
| if (slices.length == 1 |
| && slices[0] != null |
| && (slices[0].equals(ourSlice) |
| || slices[0].equals( |
| ourCollection + "_" + ourSlice)) // handle the <collection>_<slice> format |
| && cloudDescriptor.getLastPublished() == Replica.State.ACTIVE |
| && (!onlyNrtReplicas || cloudDescriptor.getReplicaType() == Replica.Type.NRT)) { |
| // currently just a debugging parameter to check distrib search on a single node |
| boolean shortCircuit = params.getBool("shortCircuit", true); |
| |
| String targetHandler = params.get(ShardParams.SHARDS_QT); |
| // if a different handler is specified, don't short-circuit |
| shortCircuit = shortCircuit && targetHandler == null; |
| |
| return shortCircuit; |
| } |
| return false; |
| } |
| |
| @Override |
| public ShardHandlerFactory getShardHandlerFactory() { |
| return httpShardHandlerFactory; |
| } |
| } |