blob: edc797ee2eee308b2c2a98b77d62b84448927bf4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.component;
import static org.apache.solr.common.params.CommonParams.DISTRIB;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.search.SolrIndexSearcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class IterativeMergeStrategy implements MergeStrategy {
protected volatile ExecutorService executorService;
protected volatile CloseableHttpClient httpClient;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public void merge(ResponseBuilder rb, ShardRequest sreq) {
rb._responseDocs = new SolrDocumentList(); // Null pointers will occur otherwise.
rb.onePassDistributedQuery = true; // Turn off the second pass distributed.
executorService = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("IterativeMergeStrategy"));
httpClient = getHttpClient();
try {
process(rb, sreq);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
HttpClientUtil.close(httpClient);
executorService.shutdownNow();
}
}
public boolean mergesIds() {
return true;
}
public int getCost() {
return 0;
}
public boolean handlesMergeFields() {
return false;
}
public void handleMergeFields(ResponseBuilder rb, SolrIndexSearcher searcher) {
}
public class CallBack implements Callable<CallBack> {
private HttpSolrClient solrClient;
private QueryRequest req;
private QueryResponse response;
private ShardResponse originalShardResponse;
public CallBack(ShardResponse originalShardResponse, QueryRequest req) {
this.solrClient = new Builder(originalShardResponse.getShardAddress())
.withHttpClient(httpClient)
.build();
this.req = req;
this.originalShardResponse = originalShardResponse;
req.setMethod(SolrRequest.METHOD.POST);
ModifiableSolrParams params = (ModifiableSolrParams)req.getParams();
params.add(DISTRIB, "false");
}
public QueryResponse getResponse() {
return this.response;
}
public ShardResponse getOriginalShardResponse() {
return this.originalShardResponse;
}
public CallBack call() throws Exception{
this.response = req.process(solrClient);
return this;
}
}
public List<Future<CallBack>> callBack(List<ShardResponse> responses, QueryRequest req) {
@SuppressWarnings({"unchecked", "rawtypes"})
List<Future<CallBack>> futures = new ArrayList();
for(ShardResponse response : responses) {
futures.add(this.executorService.submit(new CallBack(response, req)));
}
return futures;
}
public Future<CallBack> callBack(ShardResponse response, QueryRequest req) {
return this.executorService.submit(new CallBack(response, req));
}
protected abstract void process(ResponseBuilder rb, ShardRequest sreq) throws Exception;
private CloseableHttpClient getHttpClient() {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
CloseableHttpClient httpClient = HttpClientUtil.createClient(params);
return httpClient;
}
}