/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRefBuilder;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.ToleratedUpdateError;
import org.apache.solr.common.ToleratedUpdateError.CmdType;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.MergeIndexesCommand;
import org.apache.solr.update.RollbackUpdateCommand;
import org.apache.solr.update.SolrCmdDistributor.Error;
import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>
* Suppresses errors for individual add/delete commands within a single request.
* Instead of failing on the first error, at most <code>maxErrors</code> errors (or an unlimited
* number if <code>-1==maxErrors</code>) are logged and recorded, and the batch continues.
* The client will receive a <code>status==200</code> response, which includes a list of errors
* that were tolerated.
* </p>
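* <p>
* A minimal client-side sketch for inspecting the tolerated errors (assuming a SolrJ
* <code>UpdateResponse</code> named <code>rsp</code>, which is outside the scope of this class):
* </p>
* <pre>
* Object tolerated = rsp.getResponseHeader().get("errors");
* </pre>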
* <p>
* If more than <code>maxErrors</code> errors occur, the first exception recorded will be re-thrown,
* Solr will respond with <code>status==5xx</code> or <code>status==4xx</code>
* (depending on the underlying exceptions), and it won't finish processing any more updates in the request
* (ie: subsequent update commands in the request will not be processed even if they are valid).
* </p>
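* <p>
* For example, a chain using this processor might be configured along these lines (an
* illustrative sketch; see {@link TolerantUpdateProcessorFactory} for configuration details):
* </p>
* <pre>
* &lt;updateRequestProcessorChain name="tolerant-chain"&gt;
*   &lt;processor class="solr.TolerantUpdateProcessorFactory"&gt;
*     &lt;int name="maxErrors"&gt;10&lt;/int&gt;
*   &lt;/processor&gt;
*   &lt;processor class="solr.RunUpdateProcessorFactory" /&gt;
* &lt;/updateRequestProcessorChain&gt;
* </pre>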
*
* <p>
* NOTE: In cloud based collections, this processor expects to <b>NOT</b> be used on {@link DistribPhase#FROMLEADER}
* requests (because any successes that occur locally on the leader are considered successes even if there is some
* subsequent error on a replica). {@link TolerantUpdateProcessorFactory} will short circuit it away in those
* requests.
* </p>
*
* @see TolerantUpdateProcessorFactory
*/
public class TolerantUpdateProcessor extends UpdateRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
* String to be used as document key for errors when a real uniqueKey can't be determined
*/
private static final String UNKNOWN_ID = "(unknown)";
/**
* Response Header
*/
private final NamedList<Object> header;
/**
* Number of errors this UpdateRequestProcessor will tolerate. If more than this number occur,
* the original exception will be thrown, interrupting the processing of the document
* batch
*/
private final int maxErrors;
/** The uniqueKey field */
private SchemaField uniqueKeyField;
private final SolrQueryRequest req;
private ZkController zkController;
/**
* Known errors that occurred in this batch, in the order encountered (which may not be the same
* as the order the commands were originally executed in, due to the async distributed updates).
*/
private final List<ToleratedUpdateError> knownErrors = new ArrayList<>();
// Kludge: Because deleteByQuery updates are forwarded to every leader, we can get identical
// errors reported by every leader for the same underlying problem.
//
// It would be nice if we could cleanly handle the unlikely (but possible) situation of an
// update stream that includes multiple identical DBQs, with identical failures, and
// to report each one once, for example...
// add: id#1
// dbq: foo:bar
// add: id#2
// add: id#3
// dbq: foo:bar
//
// ...but I can't figure out a way to accurately identify & return duplicate
// ToleratedUpdateErrors from duplicate identical underlying requests w/o erroneously returning identical
// ToleratedUpdateErrors for the *same* underlying request but from diff shards.
//
// So as a kludge, we keep track of them for deduping against identical remote failures
//
private Set<ToleratedUpdateError> knownDBQErrors = new HashSet<>();
private final FirstErrTracker firstErrTracker = new FirstErrTracker();
private final DistribPhase distribPhase;
public TolerantUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next, int maxErrors, DistribPhase distribPhase) {
super(next);
assert maxErrors >= -1;
header = rsp.getResponseHeader();
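// normalize the user supplied maxErrors (where -1 means unlimited) into an effective value that
// the simple "knownErrors.size() > maxErrors" checks below can compare against directly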
this.maxErrors = ToleratedUpdateError.getEffectiveMaxErrors(maxErrors);
this.req = req;
this.distribPhase = distribPhase;
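// the factory should have short circuited this processor away from FROMLEADER requests (see class javadoc)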
assert ! DistribPhase.FROMLEADER.equals(distribPhase);
this.zkController = this.req.getCore().getCoreContainer().getZkController();
this.uniqueKeyField = this.req.getCore().getLatestSchema().getUniqueKeyField();
assert null != uniqueKeyField : "Factory didn't enforce uniqueKey field?";
}
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
BytesRef id = null;
try {
// force AddUpdateCommand to validate+cache the id before proceeding
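// (so that if a downstream processor fails, we still have the id on hand for the error recorded below)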
id = cmd.getIndexedId();
super.processAdd(cmd);
} catch (Throwable t) {
firstErrTracker.caught(t);
knownErrors.add(new ToleratedUpdateError
(CmdType.ADD,
getPrintableId(id),
t.getMessage()));
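// fail fast: once we've tolerated more than maxErrors failures, re-throw the first exception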
if (knownErrors.size() > maxErrors) {
firstErrTracker.throwFirst();
}
}
}
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
try {
super.processDelete(cmd);
} catch (Throwable t) {
firstErrTracker.caught(t);
ToleratedUpdateError err = new ToleratedUpdateError(cmd.isDeleteById() ? CmdType.DELID : CmdType.DELQ,
cmd.isDeleteById() ? cmd.id : cmd.query,
t.getMessage());
knownErrors.add(err);
// NOTE: we're deliberately not using knownDBQErrors to dedup before adding to knownErrors here.
// if we're lucky enough to get an immediate local failure (ie: we're a leader, or some other processor
// failed) then recording the multiple failures is a good thing -- it helps us fail
// fast accurately if we exceed maxErrors
if (CmdType.DELQ.equals(err.getType())) {
knownDBQErrors.add(err);
}
if (knownErrors.size() > maxErrors) {
firstErrTracker.throwFirst();
}
}
}
@Override
public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
try {
super.processMergeIndexes(cmd);
} catch (Throwable t) {
// we're not tolerant of errors from this type of command, but we
// do need to track it so we can annotate it with any other errors we were already tolerant of
firstErrTracker.caught(t);
throw t;
}
}
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException {
try {
super.processCommit(cmd);
} catch (Throwable t) {
// we're not tolerant of errors from this type of command, but we
// do need to track it so we can annotate it with any other errors we were already tolerant of
firstErrTracker.caught(t);
throw t;
}
}
@Override
public void processRollback(RollbackUpdateCommand cmd) throws IOException {
try {
super.processRollback(cmd);
} catch (Throwable t) {
// we're not tolerant of errors from this type of command, but we
// do need to track it so we can annotate it with any other errors we were already tolerant of
firstErrTracker.caught(t);
throw t;
}
}
@Override
public void finish() throws IOException {
// even if processAdd threw an error, this.finish() is still called, and we might have additional
// errors from other remote leaders that surface in the finish() method of downstream processors
// (like DUP) -- so we need to check for them here
try {
super.finish();
} catch (DistributedUpdateProcessor.DistributedUpdatesAsyncException duae) {
firstErrTracker.caught(duae);
// adjust our stats based on each of the distributed errors
for (Error error : duae.errors) {
// we can't trust the req info from the Error, because multiple original requests might have been
// lumped together
//
// instead we trust the metadata that the TolerantUpdateProcessor running on the remote node added
// to the exception when it failed.
if ( ! (error.e instanceof SolrException) ) {
log.error("async update exception is not SolrException, no metadata to process", error.e);
continue;
}
SolrException remoteErr = (SolrException) error.e;
NamedList<String> remoteErrMetadata = remoteErr.getMetadata();
if (null == remoteErrMetadata) {
log.warn("remote error has no metadata to aggregate", remoteErr);
continue;
}
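// walk every metadata entry on the remote exception, looking for entries that encode ToleratedUpdateErrors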
for (int i = 0; i < remoteErrMetadata.size(); i++) {
ToleratedUpdateError err =
ToleratedUpdateError.parseMetadataIfToleratedUpdateError(remoteErrMetadata.getName(i),
remoteErrMetadata.getVal(i));
if (null == err) {
// some metadata unrelated to this update processor
continue;
}
if (CmdType.DELQ.equals(err.getType())) {
if (knownDBQErrors.contains(err)) {
// we've already seen this identical error, probably a dup from another shard
continue;
} else {
knownDBQErrors.add(err);
}
}
knownErrors.add(err);
}
}
}
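// report the full set of tolerated errors back to the client in the response header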
header.add("errors", ToleratedUpdateError.formatForResponseHeader(knownErrors));
// include in response so client knows what the effective value was (it may have come from server side config)
header.add("maxErrors", ToleratedUpdateError.getUserFriendlyMaxErrors(maxErrors));
// annotate any error that might be thrown (or was already thrown)
firstErrTracker.annotate(knownErrors);
// decide if we have hit a situation where we know an error needs to be thrown.
if ((DistribPhase.TOLEADER.equals(distribPhase) ? 0 : maxErrors) < knownErrors.size()) {
// NOTE: even if maxErrors wasn't exceeded, a leader that was forwarded to by another node must
// throw an error whenever it has *any* errors, so that the forwarding node knows we encountered
// problems and can aggregate the results
firstErrTracker.throwFirst();
}
}
/**
* Returns the output of {@link org.apache.solr.schema.FieldType#indexedToReadable(BytesRef, CharsRefBuilder)}
* from the field type of the uniqueKey, applied to the {@link BytesRef} passed as a parameter.
* <code>ref</code> should be the indexed representation of the id -- if it is null
* (possibly because the id is missing in the update) this method will return {@link #UNKNOWN_ID}.
*/
private String getPrintableId(BytesRef ref) {
if (ref == null) {
return UNKNOWN_ID;
}
return uniqueKeyField.getType().indexedToReadable(ref, new CharsRefBuilder()).toString();
}
/**
* Simple helper class for "tracking" any exceptions encountered.
*
* Only remembers the "first" exception encountered, and wraps it in a SolrException if needed, so that
* it can later be annotated with the metadata our users expect and re-thrown.
*
* NOTE: NOT THREAD SAFE
*/
private static final class FirstErrTracker {
SolrException first = null;
boolean thrown = false;
public FirstErrTracker() {
/* NOOP */
}
/**
* Call this method immediately anytime an exception is caught from a downstream method --
* even if you are going to ignore it (for now). If you plan to rethrow the Exception, use
* {@link #throwFirst} instead.
*/
public void caught(Throwable t) {
assert null != t;
if (null == first) {
if (t instanceof SolrException) {
first = (SolrException)t;
} else {
first = new SolrException(ErrorCode.SERVER_ERROR, "Tolerantly Caught Exception: " + t.getMessage(), t);
}
}
}
/**
* Call this method in any situation where you would normally (re)throw an exception that has
* already been passed to the {@link #caught} method -- for example, because maxErrors
* has been exceeded.
*
* This method will keep a record that this update processor has already thrown the exception, and do
* nothing on future calls, so subsequent update processor methods can update the metadata but won't
* inadvertently re-throw this (or any other) cascading exception by mistake.
*/
public void throwFirst() throws SolrException {
assert null != first : "caught was never called?";
if (! thrown) {
thrown = true;
throw first;
}
}
/**
* Annotates the first exception (which may already have been thrown, or be thrown in the future) with
* the metadata from this update processor. For use in {@link TolerantUpdateProcessor#finish}
*/
public void annotate(List<ToleratedUpdateError> errors) {
if (null == first) {
return; // no exception to annotate
}
assert null != errors : "how do we have an exception to annotate w/o any errors?";
NamedList<String> firstErrMetadata = first.getMetadata();
if (null == firstErrMetadata) { // obnoxious
firstErrMetadata = new NamedList<String>();
first.setMetadata(firstErrMetadata);
} else {
// any existing metadata representing ToleratedUpdateErrors in this single exception needs to be
// removed, so we can add *all* of the known ToleratedUpdateErrors (from this and other exceptions)
for (int i = 0; i < firstErrMetadata.size(); i++) {
if (null != ToleratedUpdateError.parseMetadataIfToleratedUpdateError
(firstErrMetadata.getName(i), firstErrMetadata.getVal(i))) {
firstErrMetadata.remove(i);
// NOTE: decrement the index so the entry that shifts into this slot isn't skipped
i--;
}
}
}
for (ToleratedUpdateError te : errors) {
firstErrMetadata.add(te.getMetadataKey(), te.getMetadataValue());
}
}
/** The first exception that was thrown (or may be thrown) whose metadata can be annotated. */
public SolrException getFirst() {
return first;
}
}
}