blob: 8e77b0c0c37355191883dd3b2bcdce145082a6a7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.CoreContainer;
/**
* A ShardHandlerFactory that extends HttpShardHandlerFactory and
* tracks requests made to nodes/shards such that interested parties
* can watch such requests and make assertions inside tests
* <p>
* This is a test helper only and should *not* be used for production.
*/
public class TrackingShardHandlerFactory extends HttpShardHandlerFactory {
private Queue<ShardRequestAndParams> queue;
/**
* Set the tracking queue for this factory. All the ShardHandler instances
* created from this factory will share the queue and call {@link java.util.Queue#offer(Object)}
* with a {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams}
* instance whenever
* {@link org.apache.solr.handler.component.ShardHandler#submit(ShardRequest, String, org.apache.solr.common.params.ModifiableSolrParams)}
* is called before the request is actually submitted to the
* wrapped {@link org.apache.solr.handler.component.HttpShardHandlerFactory} instance.
* <p>
* If a tracking queue is already set then this call will overwrite and replace the
* previous queue with this one.
*
* @param queue the {@link java.util.Queue} to be used for tracking shard requests
*/
public synchronized void setTrackingQueue(Queue<ShardRequestAndParams> queue) {
this.queue = queue;
}
/**
* @return the {@link java.util.Queue} being used for tracking, null if none
* has been set
*/
public synchronized Queue<ShardRequestAndParams> getTrackingQueue() {
return queue;
}
/**
* @return true if a tracking queue has been set through
* {@link #setTrackingQueue(java.util.List, java.util.Queue)}, false otherwise
*/
public synchronized boolean isTracking() {
return queue != null;
}
@Override
public ShardHandler getShardHandler() {
return super.getShardHandler();
}
@Override
public ShardHandler getShardHandler(Http2SolrClient client) {
final ShardHandlerFactory factory = this;
final ShardHandler wrapped = super.getShardHandler(client);
return new HttpShardHandler(this, client) {
@Override
public void prepDistributed(ResponseBuilder rb) {
wrapped.prepDistributed(rb);
}
@Override
public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) {
synchronized (TrackingShardHandlerFactory.this) {
if (isTracking()) {
queue.offer(new ShardRequestAndParams(sreq, shard, params));
}
}
wrapped.submit(sreq, shard, params);
}
@Override
public ShardResponse takeCompletedIncludingErrors() {
return wrapped.takeCompletedIncludingErrors();
}
@Override
public ShardResponse takeCompletedOrError() {
return wrapped.takeCompletedOrError();
}
@Override
public void cancelAll() {
wrapped.cancelAll();
}
@Override
public ShardHandlerFactory getShardHandlerFactory() {
return factory;
}
};
}
@Override
public ShardHandler getShardHandler(HttpClient httpClient) {
final ShardHandlerFactory factory = this;
final ShardHandler wrapped = super.getShardHandler(httpClient);
return new HttpShardHandler(this, null) {
@Override
public void prepDistributed(ResponseBuilder rb) {
wrapped.prepDistributed(rb);
}
@Override
public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) {
synchronized (TrackingShardHandlerFactory.this) {
if (isTracking()) {
queue.offer(new ShardRequestAndParams(sreq, shard, params));
}
}
wrapped.submit(sreq, shard, params);
}
@Override
protected NamedList<Object> request(String url, @SuppressWarnings({"rawtypes"})SolrRequest req) throws IOException, SolrServerException {
try (SolrClient client = new HttpSolrClient.Builder(url).withHttpClient(httpClient).build()) {
return client.request(req);
}
}
@Override
public ShardResponse takeCompletedIncludingErrors() {
return wrapped.takeCompletedIncludingErrors();
}
@Override
public ShardResponse takeCompletedOrError() {
return wrapped.takeCompletedOrError();
}
@Override
public void cancelAll() {
wrapped.cancelAll();
}
@Override
public ShardHandlerFactory getShardHandlerFactory() {
return factory;
}
};
}
@Override
public void close() {
super.close();
}
/**
* Sets the tracking queue for all nodes participating in this cluster. Once this method returns,
* all search and core admin requests distributed to shards will be submitted to the given queue.
* <p>
* This is equivalent to calling:
* <code>TrackingShardHandlerFactory.setTrackingQueue(cluster.getJettySolrRunners(), queue)</code>
*
* @see org.apache.solr.handler.component.TrackingShardHandlerFactory#setTrackingQueue(java.util.List, java.util.Queue)
*/
public static void setTrackingQueue(MiniSolrCloudCluster cluster, Queue<ShardRequestAndParams> queue) {
setTrackingQueue(cluster.getJettySolrRunners(), queue);
}
/**
* Sets the tracking queue for all nodes participating in this cluster. Once this method returns,
* all search and core admin requests distributed to shards will be submitted to the given queue.
*
* @param runners a list of {@link org.apache.solr.client.solrj.embedded.JettySolrRunner} nodes
* @param queue an implementation of {@link java.util.Queue} which
* accepts {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams}
* instances
*/
public static void setTrackingQueue(List<JettySolrRunner> runners, Queue<ShardRequestAndParams> queue) {
for (JettySolrRunner runner : runners) {
CoreContainer container = runner.getCoreContainer();
if (container != null) {
ShardHandlerFactory factory = container.getShardHandlerFactory();
assert factory instanceof TrackingShardHandlerFactory : "not a TrackingShardHandlerFactory: "
+ factory.getClass();
@SuppressWarnings("resource")
TrackingShardHandlerFactory trackingShardHandlerFactory = (TrackingShardHandlerFactory) factory;
trackingShardHandlerFactory.setTrackingQueue(queue);
}
}
}
public static class ShardRequestAndParams {
public String shard;
public ShardRequest sreq;
public ModifiableSolrParams params;
public ShardRequestAndParams(ShardRequest sreq, String shard, ModifiableSolrParams params) {
this.sreq = sreq;
this.params = params;
this.shard = shard;
}
@Override
public String toString() {
return "ShardRequestAndParams{" +
"shard='" + shard + '\'' +
", sreq=" + sreq +
", params=" + params +
'}';
}
}
/**
* A queue having helper methods to select requests by shard and purpose.
*
* @see org.apache.solr.handler.component.TrackingShardHandlerFactory#setTrackingQueue(java.util.List, java.util.Queue)
*/
public static class RequestTrackingQueue extends LinkedList<ShardRequestAndParams> {
private final Map<String, List<ShardRequestAndParams>> requests = new ConcurrentHashMap<>();
@Override
public boolean offer(ShardRequestAndParams shardRequestAndParams) {
List<ShardRequestAndParams> list = requests.get(shardRequestAndParams.shard);
if (list == null) {
list = new ArrayList<>();
}
list.add(shardRequestAndParams);
requests.put(shardRequestAndParams.shard, list);
return super.offer(shardRequestAndParams);
}
@Override
public void clear() {
requests.clear();
}
/**
* Retrieve request recorded by this queue which were sent to given collection, shard and purpose
*
* @param zkStateReader the {@link org.apache.solr.common.cloud.ZkStateReader} from which cluster state is read
* @param collectionName the given collection name for which requests have to be extracted
* @param shardId the given shard name for which requests have to be extracted
* @param purpose the shard purpose
* @return instance of {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams}
* or null if none is found
* @throws java.lang.RuntimeException if more than one request is found to the same shard with the same purpose
*/
public ShardRequestAndParams getShardRequestByPurpose(ZkStateReader zkStateReader, String collectionName, String shardId, int purpose) throws RuntimeException {
List<TrackingShardHandlerFactory.ShardRequestAndParams> shardRequests = getShardRequests(zkStateReader, collectionName, shardId);
List<TrackingShardHandlerFactory.ShardRequestAndParams> result = new ArrayList<>(1);
for (TrackingShardHandlerFactory.ShardRequestAndParams request : shardRequests) {
if ((request.sreq.purpose & purpose) != 0) {
result.add(request);
}
}
if (result.size() > 1) {
throw new RuntimeException("Multiple requests to the same shard with the same purpose were found. Requests: " + result);
}
return result.isEmpty() ? null : result.get(0);
}
/**
* Retrieve all requests recorded by this queue which were sent to given collection and shard
*
* @param zkStateReader the {@link org.apache.solr.common.cloud.ZkStateReader} from which cluster state is read
* @param collectionName the given collection name for which requests have to be extracted
* @param shardId the given shard name for which requests have to be extracted
* @return a list of {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams}
* or empty list if none are found
*/
public List<ShardRequestAndParams> getShardRequests(ZkStateReader zkStateReader, String collectionName, String shardId) {
DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
assert collection != null;
Slice slice = collection.getSlice(shardId);
assert slice != null;
for (Map.Entry<String, List<ShardRequestAndParams>> entry : requests.entrySet()) {
// multiple shard addresses may be present separated by '|'
List<String> list = StrUtils.splitSmart(entry.getKey(), '|');
for (Map.Entry<String, Replica> replica : slice.getReplicasMap().entrySet()) {
String coreUrl = new ZkCoreNodeProps(replica.getValue()).getCoreUrl();
if (list.contains(coreUrl)) {
return new ArrayList<>(entry.getValue());
}
}
}
return Collections.emptyList();
}
/**
* Retrieves all core admin requests distributed to nodes by Collection API commands
*
* @return a list of {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams}
* or empty if none found
*/
public List<ShardRequestAndParams> getCoreAdminRequests() {
List<ShardRequestAndParams> results = new ArrayList<>();
Map<String, List<ShardRequestAndParams>> map = getAllRequests();
for (Map.Entry<String, List<ShardRequestAndParams>> entry : map.entrySet()) {
for (ShardRequestAndParams shardRequestAndParams : entry.getValue()) {
if (shardRequestAndParams.sreq.purpose == ShardRequest.PURPOSE_PRIVATE) {
results.add(shardRequestAndParams);
}
}
}
return results;
}
/**
* Retrieves all requests recorded by this collection as a Map of shard address (string url)
* to a list of {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams}
*
* @return a {@link java.util.concurrent.ConcurrentHashMap} of url strings to {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams} objects
* or empty map if none have been recorded
*/
public Map<String, List<ShardRequestAndParams>> getAllRequests() {
return requests;
}
}
}