| package org.apache.solr.update; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CompletionService; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorCompletionService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.RejectedExecutionException; |
| |
| import org.apache.solr.client.solrj.SolrServerException; |
| import org.apache.solr.client.solrj.impl.HttpSolrServer; |
| import org.apache.solr.client.solrj.request.AbstractUpdateRequest; |
| import org.apache.solr.client.solrj.request.UpdateRequestExt; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.SolrException.ErrorCode; |
| import org.apache.solr.common.cloud.ZkCoreNodeProps; |
| import org.apache.solr.common.params.ModifiableSolrParams; |
| import org.apache.solr.common.util.NamedList; |
| import org.apache.solr.core.Diagnostics; |
| import org.apache.solr.core.SolrCore; |
| import org.apache.solr.util.AdjustableSemaphore; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| public class SolrCmdDistributor { |
  // Max number of times a failed forwarded request is re-submitted before the
  // error is recorded in the response.
  private static final int MAX_RETRIES_ON_FORWARD = 15;
  public static Logger log = LoggerFactory.getLogger(SolrCmdDistributor.class);

  // JVM-wide throttle on concurrently executing distributed requests; shared
  // by all instances and resized in the constructor based on host count.
  static AdjustableSemaphore semaphore = new AdjustableSemaphore(8);

  // Async plumbing: completed request futures arrive via completionService and
  // are tracked in 'pending' until consumed by checkResponses().
  CompletionService<Request> completionService;
  Set<Future<Request>> pending;

  // Per-node buffering thresholds before a combined request is flushed.
  int maxBufferedAddsPerServer = 10;
  int maxBufferedDeletesPerServer = 10;

  // Accumulates errors (and the last failing request) for this distributor.
  private Response response = new Response();

  // Buffered adds/deletes awaiting flush, keyed by destination node.
  private final Map<Node,List<AddRequest>> adds = new HashMap<Node,List<AddRequest>>();
  private final Map<Node,List<DeleteRequest>> deletes = new HashMap<Node,List<DeleteRequest>>();
  private UpdateShardHandler updateShardHandler;
| |
  // Buffered add: a cloned AddUpdateCommand plus the request params it was
  // submitted with. Queued per-node in 'adds' until flushed.
  class AddRequest {
    AddUpdateCommand cmd;
    ModifiableSolrParams params;
  }
| |
  // Buffered delete: a cloned DeleteUpdateCommand plus the request params it
  // was submitted with. Queued per-node in 'deletes' until flushed.
  class DeleteRequest {
    DeleteUpdateCommand cmd;
    ModifiableSolrParams params;
  }
| |
| public static interface AbortCheck { |
| public boolean abortCheck(); |
| } |
| |
  /**
   * Creates a distributor backed by the shared executor/HttpClient of the
   * given handler.
   *
   * @param numHosts number of hosts updates will be distributed to; scales the
   *        shared semaphore (16 permits per host, minimum 16)
   * @param updateShardHandler supplies the executor and HttpClient used to
   *        dispatch requests
   */
  public SolrCmdDistributor(int numHosts, UpdateShardHandler updateShardHandler) {
    int maxPermits = Math.max(16, numHosts * 16);
    // limits how many tasks can actually execute at once
    if (maxPermits != semaphore.getMaxPermits()) {
      semaphore.setMaxPermits(maxPermits);
    }

    this.updateShardHandler = updateShardHandler;
    completionService = new ExecutorCompletionService<Request>(updateShardHandler.getCmdDistribExecutor());
    pending = new HashSet<Future<Request>>();
  }
| |
  /**
   * Flushes all buffered adds and deletes and then blocks until every
   * outstanding request has completed; any errors are collected on the
   * response rather than thrown.
   */
  public void finish() {

    flushAdds(1);
    flushDeletes(1);

    checkResponses(true);
  }
| |
| public void distribDelete(DeleteUpdateCommand cmd, List<Node> urls, ModifiableSolrParams params) throws IOException { |
| checkResponses(false); |
| |
| if (cmd.isDeleteById()) { |
| doDelete(cmd, urls, params); |
| } else { |
| doDelete(cmd, urls, params); |
| } |
| } |
| |
| public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException { |
| checkResponses(false); |
| |
| // make sure any pending deletes are flushed |
| flushDeletes(1); |
| |
| // TODO: this is brittle |
| // need to make a clone since these commands may be reused |
| AddUpdateCommand clone = new AddUpdateCommand(null); |
| |
| clone.solrDoc = cmd.solrDoc; |
| clone.commitWithin = cmd.commitWithin; |
| clone.overwrite = cmd.overwrite; |
| clone.setVersion(cmd.getVersion()); |
| AddRequest addRequest = new AddRequest(); |
| addRequest.cmd = clone; |
| addRequest.params = params; |
| |
| for (Node node : nodes) { |
| List<AddRequest> alist = adds.get(node); |
| if (alist == null) { |
| alist = new ArrayList<AddRequest>(2); |
| adds.put(node, alist); |
| } |
| alist.add(addRequest); |
| } |
| |
| flushAdds(maxBufferedAddsPerServer); |
| } |
| |
| /** |
| * Synchronous (blocking) add to specified node. Any error returned from node is propagated. |
| */ |
| public void syncAdd(AddUpdateCommand cmd, Node node, ModifiableSolrParams params) throws IOException { |
| log.info("SYNCADD on {} : {}", node, cmd.getPrintableId()); |
| checkResponses(false); |
| // flush all pending deletes |
| flushDeletes(1); |
| // flush all pending adds |
| flushAdds(1); |
| // finish with the pending requests |
| checkResponses(false); |
| |
| UpdateRequestExt ureq = new UpdateRequestExt(); |
| ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite); |
| ureq.setParams(params); |
| syncRequest(node, ureq); |
| } |
| |
| public void syncDelete(DeleteUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException { |
| log.info("SYNCDELETE on {} : ", nodes, cmd); |
| checkResponses(false); |
| // flush all pending adds |
| flushAdds(1); |
| // flush all pending deletes |
| flushDeletes(1); |
| // finish pending requests |
| checkResponses(false); |
| |
| DeleteUpdateCommand clonedCmd = clone(cmd); |
| DeleteRequest deleteRequest = new DeleteRequest(); |
| deleteRequest.cmd = clonedCmd; |
| deleteRequest.params = params; |
| |
| UpdateRequestExt ureq = new UpdateRequestExt(); |
| if (cmd.isDeleteById()) { |
| ureq.deleteById(cmd.getId(), cmd.getVersion()); |
| } else { |
| ureq.deleteByQuery(cmd.query); |
| } |
| ureq.setParams(params); |
| for (Node node : nodes) { |
| syncRequest(node, ureq); |
| } |
| } |
| |
| private void syncRequest(Node node, UpdateRequestExt ureq) { |
| Request sreq = new Request(); |
| sreq.node = node; |
| sreq.ureq = ureq; |
| |
| String url = node.getUrl(); |
| String fullUrl; |
| if (!url.startsWith("http://") && !url.startsWith("https://")) { |
| fullUrl = "http://" + url; |
| } else { |
| fullUrl = url; |
| } |
| |
| HttpSolrServer server = new HttpSolrServer(fullUrl, |
| updateShardHandler.getHttpClient()); |
| |
| try { |
| sreq.ursp = server.request(ureq); |
| } catch (Exception e) { |
| throw new SolrException(ErrorCode.SERVER_ERROR, "Failed synchronous update on shard " + sreq.node, sreq.exception); |
| } |
| } |
| |
| public void distribCommit(CommitUpdateCommand cmd, List<Node> nodes, |
| ModifiableSolrParams params) throws IOException { |
| |
| // make sure we are ordered |
| flushAdds(1); |
| flushDeletes(1); |
| |
| |
| // Wait for all outstanding responses to make sure that a commit |
| // can't sneak in ahead of adds or deletes we already sent. |
| // We could do this on a per-server basis, but it's more complex |
| // and this solution will lead to commits happening closer together. |
| checkResponses(true); |
| |
| // currently, we dont try to piggy back on outstanding adds or deletes |
| |
| UpdateRequestExt ureq = new UpdateRequestExt(); |
| ureq.setParams(params); |
| |
| addCommit(ureq, cmd); |
| |
| log.info("Distrib commit to:" + nodes + " params:" + params); |
| |
| for (Node node : nodes) { |
| submit(ureq, node); |
| } |
| |
| // if the command wanted to block until everything was committed, |
| // then do that here. |
| |
| if (cmd.waitSearcher) { |
| checkResponses(true); |
| } |
| } |
| |
| private void doDelete(DeleteUpdateCommand cmd, List<Node> nodes, |
| ModifiableSolrParams params) { |
| |
| flushAdds(1); |
| |
| DeleteUpdateCommand clonedCmd = clone(cmd); |
| DeleteRequest deleteRequest = new DeleteRequest(); |
| deleteRequest.cmd = clonedCmd; |
| deleteRequest.params = params; |
| for (Node node : nodes) { |
| List<DeleteRequest> dlist = deletes.get(node); |
| |
| if (dlist == null) { |
| dlist = new ArrayList<DeleteRequest>(2); |
| deletes.put(node, dlist); |
| } |
| dlist.add(deleteRequest); |
| } |
| |
| flushDeletes(maxBufferedDeletesPerServer); |
| } |
| |
| void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) { |
| if (cmd == null) return; |
| ureq.setAction(cmd.optimize ? AbstractUpdateRequest.ACTION.OPTIMIZE |
| : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes); |
| } |
| |
| boolean flushAdds(int limit) { |
| // check for pending deletes |
| |
| Set<Node> removeNodes = new HashSet<Node>(); |
| Set<Node> nodes = adds.keySet(); |
| |
| for (Node node : nodes) { |
| List<AddRequest> alist = adds.get(node); |
| if (alist == null || alist.size() < limit) continue; |
| |
| UpdateRequestExt ureq = new UpdateRequestExt(); |
| |
| ModifiableSolrParams combinedParams = new ModifiableSolrParams(); |
| |
| for (AddRequest aReq : alist) { |
| AddUpdateCommand cmd = aReq.cmd; |
| combinedParams.add(aReq.params); |
| |
| ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite); |
| } |
| |
| if (ureq.getParams() == null) ureq.setParams(new ModifiableSolrParams()); |
| ureq.getParams().add(combinedParams); |
| |
| removeNodes.add(node); |
| |
| submit(ureq, node); |
| } |
| |
| for (Node node : removeNodes) { |
| adds.remove(node); |
| } |
| |
| return true; |
| } |
| |
| boolean flushDeletes(int limit) { |
| // check for pending deletes |
| |
| Set<Node> removeNodes = new HashSet<Node>(); |
| Set<Node> nodes = deletes.keySet(); |
| for (Node node : nodes) { |
| List<DeleteRequest> dlist = deletes.get(node); |
| if (dlist == null || dlist.size() < limit) continue; |
| UpdateRequestExt ureq = new UpdateRequestExt(); |
| |
| ModifiableSolrParams combinedParams = new ModifiableSolrParams(); |
| |
| for (DeleteRequest dReq : dlist) { |
| DeleteUpdateCommand cmd = dReq.cmd; |
| combinedParams.add(dReq.params); |
| if (cmd.isDeleteById()) { |
| ureq.deleteById(cmd.getId(), cmd.getVersion()); |
| } else { |
| ureq.deleteByQuery(cmd.query); |
| } |
| |
| if (ureq.getParams() == null) ureq |
| .setParams(new ModifiableSolrParams()); |
| ureq.getParams().add(combinedParams); |
| } |
| |
| removeNodes.add(node); |
| submit(ureq, node); |
| } |
| |
| for (Node node : removeNodes) { |
| deletes.remove(node); |
| } |
| |
| return true; |
| } |
| |
| private DeleteUpdateCommand clone(DeleteUpdateCommand cmd) { |
| DeleteUpdateCommand c = (DeleteUpdateCommand)cmd.clone(); |
| // TODO: shouldnt the clone do this? |
| c.setFlags(cmd.getFlags()); |
| c.setVersion(cmd.getVersion()); |
| return c; |
| } |
| |
  /**
   * State for one in-flight (possibly retried) update request to a single node.
   */
  public static class Request {
    public Node node;
    UpdateRequestExt ureq;
    // Response from the node; null until the request completes successfully.
    NamedList<Object> ursp;
    // 0 = success; otherwise the HTTP status code, or -1 for non-Solr failures.
    int rspCode;
    public Exception exception;
    // Number of times this request has been re-submitted after a failure.
    int retries;
  }
| |
| void submit(UpdateRequestExt ureq, Node node) { |
| Request sreq = new Request(); |
| sreq.node = node; |
| sreq.ureq = ureq; |
| submit(sreq); |
| } |
| |
  /**
   * Asynchronously sends sreq to its node on the shared executor. Acquires a
   * permit from the global semaphore before submission, throttling callers
   * when too many requests are already in flight; the permit is released in
   * the task's finally block (or here, if submission is rejected). Results
   * surface through completionService and are examined in checkResponses().
   */
  public void submit(final Request sreq) {

    final String url = sreq.node.getUrl();

    Callable<Request> task = new Callable<Request>() {
      @Override
      public Request call() throws Exception {
        // Work against a copy so a later retry of sreq cannot race with
        // this task's result fields.
        Request clonedRequest = null;
        try {
          clonedRequest = new Request();
          clonedRequest.node = sreq.node;
          clonedRequest.ureq = sreq.ureq;
          clonedRequest.retries = sreq.retries;

          String fullUrl;
          if (!url.startsWith("http://") && !url.startsWith("https://")) {
            fullUrl = "http://" + url;
          } else {
            fullUrl = url;
          }

          HttpSolrServer server = new HttpSolrServer(fullUrl,
              updateShardHandler.getHttpClient());

          // Bail out early on shutdown rather than issuing a doomed request.
          if (Thread.currentThread().isInterrupted()) {
            clonedRequest.rspCode = 503;
            clonedRequest.exception = new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Shutting down.");
            return clonedRequest;
          }

          clonedRequest.ursp = server.request(clonedRequest.ureq);

          // currently no way to get the request body.
        } catch (Exception e) {
          // Record the failure; checkResponses() decides whether to retry.
          clonedRequest.exception = e;
          if (e instanceof SolrException) {
            clonedRequest.rspCode = ((SolrException) e).code();
          } else {
            clonedRequest.rspCode = -1;
          }
        } finally {
          // Always free the permit acquired before this task was submitted.
          semaphore.release();
        }
        return clonedRequest;
      }
    };
    try {
      semaphore.acquire();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Update thread interrupted", e);
    }
    try {
      pending.add(completionService.submit(task));
    } catch (RejectedExecutionException e) {
      // The task never ran, so release the permit its finally would have freed.
      semaphore.release();
      throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Shutting down", e);
    }

  }
| |
  // Test-only hook invoked from checkResponses() when a forwarded request fails.
  public static Diagnostics.Callable testing_errorHook; // called on error when forwarding request. Currently data=[this, Request]
| |
| void checkResponses(boolean block) { |
| |
| while (pending != null && pending.size() > 0) { |
| try { |
| Future<Request> future = block ? completionService.take() |
| : completionService.poll(); |
| if (future == null) return; |
| pending.remove(future); |
| |
| try { |
| Request sreq = future.get(); |
| if (sreq.rspCode != 0) { |
| // error during request |
| |
| if (testing_errorHook != null) Diagnostics.call(testing_errorHook, this, sreq); |
| |
| // if there is a retry url, we want to retry... |
| boolean isRetry = sreq.node.checkRetry(); |
| boolean doRetry = false; |
| int rspCode = sreq.rspCode; |
| |
| // this can happen in certain situations such as shutdown |
| if (isRetry) { |
| if (rspCode == 404 || rspCode == 403 || rspCode == 503 |
| || rspCode == 500) { |
| doRetry = true; |
| } |
| |
| // if its an ioexception, lets try again |
| if (sreq.exception instanceof IOException) { |
| doRetry = true; |
| } else if (sreq.exception instanceof SolrServerException) { |
| if (((SolrServerException) sreq.exception).getRootCause() instanceof IOException) { |
| doRetry = true; |
| } |
| } |
| } |
| |
| if (isRetry && sreq.retries < MAX_RETRIES_ON_FORWARD && doRetry) { |
| sreq.retries++; |
| sreq.rspCode = 0; |
| sreq.exception = null; |
| SolrException.log(SolrCmdDistributor.log, "forwarding update to " + sreq.node.getUrl() + " failed - retrying ... "); |
| Thread.sleep(500); |
| submit(sreq); |
| } else { |
| Exception e = sreq.exception; |
| Error error = new Error(); |
| error.e = e; |
| error.node = sreq.node; |
| response.errors.add(error); |
| response.sreq = sreq; |
| SolrException.log(SolrCmdDistributor.log, "shard update error " |
| + sreq.node, sreq.exception); |
| } |
| } |
| |
| } catch (ExecutionException e) { |
| // shouldn't happen since we catch exceptions ourselves |
| SolrException.log(SolrCore.log, |
| "error sending update request to shard", e); |
| } |
| |
| } catch (InterruptedException e) { |
| throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, |
| "interrupted waiting for shard update response", e); |
| } |
| } |
| } |
| |
  /** Aggregate outcome: all errors seen plus the most recent failing request. */
  public static class Response {
    public Request sreq;
    public List<Error> errors = new ArrayList<Error>();
  }
| |
  /** A single failed node update: the node and the exception it produced. */
  public static class Error {
    public Node node;
    public Exception e;
  }
| |
  /** Returns the accumulated errors and last failing request for this distributor. */
  public Response getResponse() {
    return response;
  }
| |
  /**
   * Abstraction of an update target. Implementations supply its URLs and
   * decide whether a failed request against it may be retried.
   */
  public static abstract class Node {
    public abstract String getUrl();
    // Returns true if a failed request to this node may be re-submitted.
    public abstract boolean checkRetry();
    public abstract String getCoreName();
    public abstract String getBaseUrl();
    public abstract ZkCoreNodeProps getNodeProps();
  }
| |
| public static class StdNode extends Node { |
| protected ZkCoreNodeProps nodeProps; |
| |
| public StdNode(ZkCoreNodeProps nodeProps) { |
| this.nodeProps = nodeProps; |
| } |
| |
| @Override |
| public String getUrl() { |
| return nodeProps.getCoreUrl(); |
| } |
| |
| @Override |
| public String toString() { |
| return this.getClass().getSimpleName() + ": " + nodeProps.getCoreUrl(); |
| } |
| |
| @Override |
| public boolean checkRetry() { |
| return false; |
| } |
| |
| @Override |
| public String getBaseUrl() { |
| return nodeProps.getBaseUrl(); |
| } |
| |
| @Override |
| public String getCoreName() { |
| return nodeProps.getCoreName(); |
| } |
| |
| @Override |
| public int hashCode() { |
| final int prime = 31; |
| int result = 1; |
| String baseUrl = nodeProps.getBaseUrl(); |
| String coreName = nodeProps.getCoreName(); |
| String url = nodeProps.getCoreUrl(); |
| result = prime * result + ((baseUrl == null) ? 0 : baseUrl.hashCode()); |
| result = prime * result + ((coreName == null) ? 0 : coreName.hashCode()); |
| result = prime * result + ((url == null) ? 0 : url.hashCode()); |
| return result; |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (this == obj) return true; |
| if (obj == null) return false; |
| if (getClass() != obj.getClass()) return false; |
| StdNode other = (StdNode) obj; |
| String baseUrl = nodeProps.getBaseUrl(); |
| String coreName = nodeProps.getCoreName(); |
| String url = nodeProps.getCoreUrl(); |
| if (baseUrl == null) { |
| if (other.nodeProps.getBaseUrl() != null) return false; |
| } else if (!baseUrl.equals(other.nodeProps.getBaseUrl())) return false; |
| if (coreName == null) { |
| if (other.nodeProps.getCoreName() != null) return false; |
| } else if (!coreName.equals(other.nodeProps.getCoreName())) return false; |
| if (url == null) { |
| if (other.nodeProps.getCoreUrl() != null) return false; |
| } else if (!url.equals(other.nodeProps.getCoreUrl())) return false; |
| return true; |
| } |
| |
| @Override |
| public ZkCoreNodeProps getNodeProps() { |
| return nodeProps; |
| } |
| } |
| |
| } |
| |
| |