blob: 6fdb8e05f67e13273cd8b74145b8438c6271ef3f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
import static org.apache.solr.common.SolrException.ErrorCode.CONFLICT;
import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.queries.function.FunctionValues;
import org.apache.lucene.queries.function.ValueSource;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.RealTimeGetComponent;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.UpdateCommand;
import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DocBasedVersionConstraintsProcessor extends UpdateRequestProcessor {
private static final String[] EMPTY_STR_ARR = new String[0];
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final String[] versionFieldNames;
private final SchemaField[] userVersionFields;
private final SchemaField solrVersionField;
private final boolean ignoreOldUpdates;
private final boolean supportMissingVersionOnOldDocs;
private final String[] deleteVersionParamNames;
private final SolrCore core;
private final NamedList<Object> tombstoneConfig;
private final DistributedUpdateProcessor distribProc; // the distributed update processor following us
private final DistributedUpdateProcessor.DistribPhase phase;
private final boolean useFieldCache;
private long oldSolrVersion; // current _version_ of the doc in the index/update log
public DocBasedVersionConstraintsProcessor(List<String> versionFields,
boolean ignoreOldUpdates,
List<String> deleteVersionParamNames,
boolean supportMissingVersionOnOldDocs,
boolean useFieldCache,
NamedList<Object> tombstoneConfig,
SolrQueryRequest req,
UpdateRequestProcessor next ) {
super(next);
this.ignoreOldUpdates = ignoreOldUpdates;
this.deleteVersionParamNames = deleteVersionParamNames.toArray(EMPTY_STR_ARR);
this.supportMissingVersionOnOldDocs = supportMissingVersionOnOldDocs;
this.core = req.getCore();
this.versionFieldNames = versionFields.toArray(EMPTY_STR_ARR);
IndexSchema schema = core.getLatestSchema();
userVersionFields = new SchemaField[versionFieldNames.length];
for (int i = 0; i < versionFieldNames.length; i++) {
userVersionFields[i] = schema.getField(versionFieldNames[i]);
}
this.solrVersionField = schema.getField(CommonParams.VERSION_FIELD);
this.useFieldCache = useFieldCache;
this.distribProc = getDistributedUpdateProcessor(next);
this.phase = DistributedUpdateProcessor.DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
this.tombstoneConfig = tombstoneConfig;
}
private static DistributedUpdateProcessor getDistributedUpdateProcessor(UpdateRequestProcessor next) {
for (UpdateRequestProcessor proc = next; proc != null; proc = proc.next) {
if (proc instanceof DistributedUpdateProcessor) {
return (DistributedUpdateProcessor)proc;
}
}
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "DistributedUpdateProcessor must follow DocBasedVersionConstraintsProcessor");
}
/**
* Inspects a raw field value (which may come from a doc in the index, or a
* doc in the UpdateLog that still has String values, or a String sent by
* the user as a param) and if it is a String, asks the versionField FieldType
* to convert it to an Object suitable for comparison.
*/
private static Object convertFieldValueUsingType(final Object rawValue, SchemaField field) {
if (rawValue instanceof CharSequence) {
// in theory, the FieldType might still be CharSequence based,
// but in that case trust it to do an identity conversion...
FieldType fieldType = field.getType();
BytesRefBuilder term = new BytesRefBuilder();
fieldType.readableToIndexed((CharSequence)rawValue, term);
return fieldType.toObject(field, term.get());
}
// else...
return rawValue;
}
private Object[] convertFieldValuesUsingType(Object[] rawValues) {
Object[] returnArr = new Object[rawValues.length];
for (int i = 0; i < returnArr.length; i++) {
returnArr[i] = convertFieldValueUsingType(rawValues[i], userVersionFields[i]);
}
return returnArr;
}
/**
* Returns true if the specified new version values are greater the the ones
* already known to exist for the document, or if the document does not already
* exist.
* Returns false if the specified new versions are not high enough but the
* processor has been configured with ignoreOldUpdates=true
* Throws a SolrException if the version is not high enough and
* ignoreOldUpdates=false
*/
private boolean isVersionNewEnough(BytesRef indexedDocId,
Object[] newUserVersions) throws IOException {
assert null != indexedDocId;
assert null != newUserVersions;
newUserVersions = convertFieldValuesUsingType(newUserVersions);
final DocFoundAndOldUserAndSolrVersions docFoundAndOldUserVersions;
if (useFieldCache) {
docFoundAndOldUserVersions = getOldUserVersionsFromFieldCache(indexedDocId);
} else {
docFoundAndOldUserVersions = getOldUserVersionsFromStored(indexedDocId);
}
oldSolrVersion = docFoundAndOldUserVersions.oldSolrVersion;
if (!docFoundAndOldUserVersions.found) {
return true;
}
final Object[] oldUserVersions = docFoundAndOldUserVersions.oldUserVersions;
validateUserVersions(oldUserVersions, versionFieldNames, "Doc exists in index, but has null versionField: ");
return versionInUpdateIsAcceptable(newUserVersions, oldUserVersions);
}
private void validateUserVersions(Object[] userVersions, String[] fieldNames, String errorMessage) {
assert userVersions.length == fieldNames.length;
for (int i = 0; i < userVersions.length; i++) {
Object userVersion = userVersions[i];
if (supportMissingVersionOnOldDocs && null == userVersion) {
userVersions[i] = (Comparable<Object>) o -> -1;
} else if (null == userVersion) {
// could happen if they turn this feature on after building an index
// w/o the versionField, or if validating a new doc, not present.
throw new SolrException(SERVER_ERROR, errorMessage + fieldNames[i]);
}
}
}
private DocFoundAndOldUserAndSolrVersions getOldUserVersionsFromFieldCache(BytesRef indexedDocId) {
SolrInputDocument oldDoc = RealTimeGetComponent.getInputDocumentFromTlog(core, indexedDocId, null, null, true);
if (oldDoc == RealTimeGetComponent.DELETED) {
return DocFoundAndOldUserAndSolrVersions.NOT_FOUND;
}
if (oldDoc == null) {
// need to look up in index now...
RefCounted<SolrIndexSearcher> newestSearcher = core.getRealtimeSearcher();
try {
SolrIndexSearcher searcher = newestSearcher.get();
long lookup = searcher.lookupId(indexedDocId);
if (lookup < 0) {
// doc not in index either...
return DocFoundAndOldUserAndSolrVersions.NOT_FOUND;
}
final LeafReaderContext segmentContext = searcher.getTopReaderContext().leaves().get((int)(lookup>>32));
final int docIdInSegment = (int)lookup;
long oldSolrVersion = getFunctionValues(segmentContext, solrVersionField, searcher).longVal(docIdInSegment);
Object[] oldUserVersions = getObjectValues(segmentContext, userVersionFields, searcher, docIdInSegment);
return new DocFoundAndOldUserAndSolrVersions(oldUserVersions, oldSolrVersion);
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error reading version from index", e);
} finally {
if (newestSearcher != null) { //TODO can this ever be null?
newestSearcher.decref();
}
}
} else {
return getUserVersionAndSolrVersionFromDocument(oldDoc);
}
}
private DocFoundAndOldUserAndSolrVersions getOldUserVersionsFromStored(BytesRef indexedDocId) throws IOException {
// stored fields only...
SolrInputDocument oldDoc = RealTimeGetComponent.getInputDocument(core, indexedDocId, RealTimeGetComponent.Resolution.DOC);
if (null == oldDoc) {
return DocFoundAndOldUserAndSolrVersions.NOT_FOUND;
} else {
return getUserVersionAndSolrVersionFromDocument(oldDoc);
}
}
private static final class DocFoundAndOldUserAndSolrVersions {
private static final DocFoundAndOldUserAndSolrVersions NOT_FOUND = new DocFoundAndOldUserAndSolrVersions();
private final boolean found;
private final Object[] oldUserVersions;
private final long oldSolrVersion;
private DocFoundAndOldUserAndSolrVersions() {
this.found = false;
this.oldSolrVersion = -1;
this.oldUserVersions = null;
}
private DocFoundAndOldUserAndSolrVersions(Object[] oldUserVersions, long oldSolrVersion) {
this.found = true;
this.oldUserVersions = oldUserVersions;
this.oldSolrVersion = oldSolrVersion;
}
}
private DocFoundAndOldUserAndSolrVersions getUserVersionAndSolrVersionFromDocument(SolrInputDocument oldDoc) {
Object[] oldUserVersions = getUserVersionsFromDocument(oldDoc);
Object o = oldDoc.getFieldValue(solrVersionField.getName());
if (o == null) {
throw new SolrException(SERVER_ERROR, "No _version_ for document " + oldDoc);
}
long solrVersion = o instanceof Number ? ((Number) o).longValue() : Long.parseLong(o.toString());
return new DocFoundAndOldUserAndSolrVersions(oldUserVersions, solrVersion);
}
private Object[] getUserVersionsFromDocument(SolrInputDocument doc) {
Object[] versions = new Object[versionFieldNames.length];
for (int i = 0; i < versionFieldNames.length; i++) {
String fieldName = versionFieldNames[i];
SchemaField schemaField = userVersionFields[i];
Object userVersion = doc.getFieldValue(fieldName);
// Make the FieldType resolve any conversion we need.
userVersion = convertFieldValueUsingType(userVersion, schemaField);
versions[i] = userVersion;
}
return versions;
}
/**
* Returns whether or not the versions in the command are acceptable to be indexed.
* If the instance is set to ignoreOldUpdates==false, it will throw a SolrException
* with CONFLICT in the event the version is not acceptable rather than return false.
*
* @param newUserVersions New versions in update request
* @param oldUserVersions Old versions currently in solr index
* @return True if acceptable, false if not (or will throw exception)
*/
protected boolean versionInUpdateIsAcceptable(Object[] newUserVersions,
Object[] oldUserVersions) {
for (int i = 0; i < oldUserVersions.length; i++) {
Object oldUserVersion = oldUserVersions[i];
Object newUserVersion = newUserVersions[i];
if (!(oldUserVersion instanceof Comparable && newUserVersion instanceof Comparable)) {
throw new SolrException(BAD_REQUEST,
"old version and new version are not comparable: " +
oldUserVersion.getClass() + " vs " + newUserVersion.getClass());
}
try {
if (newUpdateComparePasses((Comparable) newUserVersion, (Comparable) oldUserVersion, versionFieldNames[i])) {
return true;
}
} catch (ClassCastException e) {
throw new SolrException(BAD_REQUEST,
"old version and new version are not comparable: " +
oldUserVersion.getClass() + " vs " + newUserVersion.getClass() +
": " + e.getMessage(), e);
}
}
if (ignoreOldUpdates) {
if (log.isDebugEnabled()) {
log.debug("Dropping update since user version is not high enough: {}; old user version={}",
Arrays.toString(newUserVersions), Arrays.toString(oldUserVersions));
}
return false;
} else {
throw new SolrException(CONFLICT,
"user version is not high enough: " + Arrays.toString(newUserVersions));
}
}
/**
* Given two comparable user versions, returns whether the new version is acceptable
* to replace the old version.
* @param newUserVersion User-specified version on the new version of the document
* @param oldUserVersion User-specified version on the old version of the document
* @param userVersionFieldName Field name of the user versions being compared
* @return True if acceptable, false if not.
*/
@SuppressWarnings({"unchecked"})
protected boolean newUpdateComparePasses(@SuppressWarnings({"rawtypes"})Comparable newUserVersion,
@SuppressWarnings({"rawtypes"})Comparable oldUserVersion, String userVersionFieldName) {
return oldUserVersion.compareTo(newUserVersion) < 0;
}
private static Object[] getObjectValues(LeafReaderContext segmentContext,
SchemaField[] fields,
SolrIndexSearcher searcher,
int docIdInSegment) throws IOException {
FunctionValues[] functionValues = getManyFunctionValues(segmentContext, fields, searcher);
Object[] objectValues = new Object[functionValues.length];
for (int i = 0; i < functionValues.length; i++) {
objectValues[i] = functionValues[i].objectVal(docIdInSegment);
}
return objectValues;
}
private static FunctionValues[] getManyFunctionValues(LeafReaderContext segmentContext,
SchemaField[] fields,
SolrIndexSearcher searcher) throws IOException {
FunctionValues[] values = new FunctionValues[fields.length];
for (int i = 0; i < fields.length; i++) {
values[i] = getFunctionValues(segmentContext, fields[i], searcher);
}
return values;
}
@SuppressWarnings({"unchecked"})
private static FunctionValues getFunctionValues(LeafReaderContext segmentContext,
SchemaField field,
SolrIndexSearcher searcher) throws IOException {
ValueSource vs = field.getType().getValueSource(field, null);
@SuppressWarnings({"rawtypes"})
Map context = ValueSource.newContext(searcher);
vs.createWeight(context, searcher);
return vs.getValues(context, segmentContext);
}
private boolean isNotLeader(UpdateCommand cmd) {
if ((cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
return true;
}
if (phase == DistributedUpdateProcessor.DistribPhase.FROMLEADER) {
return true;
}
// if phase==TOLEADER, we can't just assume we are the leader... let the normal logic check.
distribProc.setupRequest(cmd);
return !distribProc.isLeader();
}
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
if (isNotLeader(cmd)) {
super.processAdd(cmd);
return;
}
final SolrInputDocument newDoc = cmd.getSolrInputDocument();
Object[] newVersions = getUserVersionsFromDocument(newDoc);
validateUserVersions(newVersions, versionFieldNames, "Doc does not have versionField: ");
for (int i=0; ;i++) {
logOverlyFailedRetries(i, cmd);
if (!isVersionNewEnough(cmd.getIndexedId(), newVersions)) {
// drop older update
return;
}
try {
cmd.setVersion(oldSolrVersion); // use optimistic concurrency to ensure that the doc has not changed in the meantime
super.processAdd(cmd);
return;
} catch (SolrException e) {
if (e.code() == 409) {
continue; // if a version conflict, retry
}
throw e; // rethrow
}
}
}
private static void logOverlyFailedRetries(int i, UpdateCommand cmd) {
// Log a warning every 256 retries.... even a few retries should normally be very unusual.
if ((i&0xff) == 0xff) {
log.warn("Unusual number of optimistic concurrency retries: retries={} cmd={}", i, cmd);
}
}
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
if (deleteVersionParamNames.length == 0) {
// not suppose to look at deletes at all
super.processDelete(cmd);
return;
}
if ( ! cmd.isDeleteById() ) {
// nothing to do
super.processDelete(cmd);
return;
}
String[] deleteParamValues = getDeleteParamValuesFromRequest(cmd);
validateDeleteParamValues(deleteParamValues);
if (isNotLeader(cmd)) {
// transform delete to add earlier rather than later
SolrInputDocument newDoc = createTombstoneDocument(core.getLatestSchema(), cmd.getId(), versionFieldNames, deleteParamValues, this.tombstoneConfig);
AddUpdateCommand newCmd = new AddUpdateCommand(cmd.getReq());
newCmd.solrDoc = newDoc;
newCmd.commitWithin = cmd.commitWithin;
super.processAdd(newCmd);
return;
}
for (int i=0; ;i++) {
logOverlyFailedRetries(i, cmd);
if (!isVersionNewEnough(cmd.getIndexedId(), deleteParamValues)) {
// drop this older update
return;
}
// :TODO: should this logic be split and driven by two params?
// - deleteVersionParam to do a version check
// - some new boolean param to determine if a stub document gets added in place?
try {
// drop the delete, and instead propagate an AddDoc that
// replaces the doc with a new "empty" one that records the deleted version
SolrInputDocument newDoc = createTombstoneDocument(core.getLatestSchema(), cmd.getId(), versionFieldNames, deleteParamValues, this.tombstoneConfig);
AddUpdateCommand newCmd = new AddUpdateCommand(cmd.getReq());
newCmd.solrDoc = newDoc;
newCmd.commitWithin = cmd.commitWithin;
newCmd.setVersion(oldSolrVersion); // use optimistic concurrency to ensure that the doc has not changed in the meantime
super.processAdd(newCmd);
return;
} catch (SolrException e) {
if (e.code() == 409) {
continue; // if a version conflict, retry
}
throw e; // rethrow
}
}
}
/**
* Creates a tombstone document, that will be used to update a document that's being deleted by ID. The returned document will contain:
* <ul>
* <li>uniqueKey</li>
* <li>versions (All the fields configured as in the {@code versionField} parameter)</li>
* <li>All the fields set in the {@code tombstoneConfig} parameter</li>
* </ul>
*
* Note that if the schema contains required fields (other than version fields or uniqueKey), they should be populated by the {@code tombstoneConfig},
* otherwise this tombstone will fail when indexing (and consequently the delete will fail). Alternatively, required fields must be populated by some
* other means (like {@code DocBasedVersionConstraintsProcessorFactory} or similar)
*/
protected SolrInputDocument createTombstoneDocument(IndexSchema schema, String id, String[] versionFieldNames, String[] deleteParamValues, NamedList<Object> tombstoneConfig) {
final SolrInputDocument newDoc = new SolrInputDocument();
if (tombstoneConfig != null) {
tombstoneConfig.forEach((k,v) -> newDoc.addField(k, v));
}
newDoc.setField(schema.getUniqueKeyField().getName(), id);
setDeleteParamValues(newDoc, versionFieldNames, deleteParamValues);
return newDoc;
}
private String[] getDeleteParamValuesFromRequest(DeleteUpdateCommand cmd) {
SolrParams params = cmd.getReq().getParams();
String[] returnArr = new String[deleteVersionParamNames.length];
for (int i = 0; i < deleteVersionParamNames.length; i++) {
String deleteVersionParamName = deleteVersionParamNames[i];
String deleteParamValue = params.get(deleteVersionParamName);
returnArr[i] = deleteParamValue;
}
return returnArr;
}
private void validateDeleteParamValues(String[] values) {
for (int i = 0; i < values.length; i++) {
String deleteParamValue = values[i];
if (null == deleteParamValue) {
String deleteVersionParamName = deleteVersionParamNames[i];
throw new SolrException(BAD_REQUEST,
"Delete by ID must specify doc version param: " +
deleteVersionParamName);
}
}
}
private static void setDeleteParamValues(SolrInputDocument doc, String[] versionFieldNames, String[] values) {
for (int i = 0; i < values.length; i++) {
String versionFieldName = versionFieldNames[i];
String value = values[i];
doc.setField(versionFieldName, value);
}
}
}