| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.solr.update; |
| |
| import java.io.Closeable; |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.FilenameFilter; |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.nio.charset.Charset; |
| import java.nio.file.Files; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Deque; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.ListIterator; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ExecutorCompletionService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.SynchronousQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import com.codahale.metrics.Gauge; |
| import com.codahale.metrics.Meter; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.lucene.util.BytesRef; |
| import org.apache.solr.common.SolrDocumentBase; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.SolrException.ErrorCode; |
| import org.apache.solr.common.SolrInputDocument; |
| import org.apache.solr.common.params.ModifiableSolrParams; |
| import org.apache.solr.common.params.SolrParams; |
| import org.apache.solr.common.util.ExecutorUtil; |
| import org.apache.solr.common.util.IOUtils; |
| import org.apache.solr.common.util.TimeSource; |
| import org.apache.solr.core.PluginInfo; |
| import org.apache.solr.core.SolrCore; |
| import org.apache.solr.core.SolrInfoBean; |
| import org.apache.solr.metrics.SolrMetricManager; |
| import org.apache.solr.metrics.SolrMetricProducer; |
| import org.apache.solr.request.LocalSolrQueryRequest; |
| import org.apache.solr.request.SolrQueryRequest; |
| import org.apache.solr.request.SolrRequestInfo; |
| import org.apache.solr.response.SolrQueryResponse; |
| import org.apache.solr.schema.IndexSchema; |
| import org.apache.solr.search.SolrIndexSearcher; |
| import org.apache.solr.update.processor.DistributedUpdateProcessor; |
| import org.apache.solr.update.processor.UpdateRequestProcessor; |
| import org.apache.solr.update.processor.UpdateRequestProcessorChain; |
| import org.apache.solr.common.util.SolrNamedThreadFactory; |
| import org.apache.solr.util.LongSet; |
| import org.apache.solr.util.OrderedExecutor; |
| import org.apache.solr.util.RTimer; |
| import org.apache.solr.util.RefCounted; |
| import org.apache.solr.util.TestInjection; |
| import org.apache.solr.util.TimeOut; |
| import org.apache.solr.util.plugin.PluginInfoInitialized; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER; |
| import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; |
| |
| |
| /** |
 * This holds references to the transaction logs. It also keeps a map from unique key to
 * location in the log (along with the update's version). This map is only cleared on soft or hard commit.
| * |
| * @lucene.experimental |
| */ |
| public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { |
| private static final long STATUS_TIME = TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS); |
  public static String LOG_FILENAME_PATTERN = "%s.%019d";
  public static String TLOG_NAME = "tlog";
  public static String BUFFER_TLOG_NAME = "buffer.tlog";
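  // e.g. String.format(Locale.ROOT, LOG_FILENAME_PATTERN, TLOG_NAME, 42) yields
  // "tlog.0000000000000000042"; the zero-padded id keeps a plain lexicographic sort of the
  // file names (see getLogList) in id order, and getLastLogId parses the id back out.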
| |
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| private boolean debug = log.isDebugEnabled(); |
| private boolean trace = log.isTraceEnabled(); |
| private boolean usableForChildDocs; |
| |
| // TODO: hack |
| public FileSystem getFs() { |
| return null; |
| } |
| |
  public enum SyncLevel { NONE, FLUSH, FSYNC;
    public static SyncLevel getSyncLevel(String level) {
      if (level == null) {
        return SyncLevel.FLUSH;
      }
      try {
        return SyncLevel.valueOf(level.toUpperCase(Locale.ROOT));
      } catch (Exception ex) {
        log.warn("There was an error reading the SyncLevel; defaulting to {}", SyncLevel.FLUSH, ex);
        return SyncLevel.FLUSH;
      }
    }
  }
| |
| // NOTE: when adding new states make sure to keep existing numbers, because external metrics |
| // monitoring may depend on these values being stable. |
| public enum State { REPLAYING(0), BUFFERING(1), APPLYING_BUFFERED(2), ACTIVE(3); |
| private final int value; |
| |
| State(final int value) { |
| this.value = value; |
| } |
| |
| public int getValue() { |
| return value; |
| } |
| } |
| |
| public static final int ADD = 0x01; |
| public static final int DELETE = 0x02; |
| public static final int DELETE_BY_QUERY = 0x03; |
| public static final int COMMIT = 0x04; |
| public static final int UPDATE_INPLACE = 0x08; |
| // For backward-compatibility, we should delete this field in 9.0 |
| public static final int OPERATION_MASK = 0x0f; // mask off flags to get the operation |
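  // A tlog entry's first element packs the operation and any flags into a single int; the
  // operation is recovered with e.g. "int oper = opAndFlags & OPERATION_MASK;" (see
  // RecentUpdates.update() and copyOverOldUpdates below).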
| |
| /** |
| * The index of the flags value in an entry from the transaction log. |
| */ |
| public static final int FLAGS_IDX = 0; |
| |
| /** |
| * The index of the _version_ value in an entry from the transaction log. |
| */ |
| public static final int VERSION_IDX = 1; |
| |
| /** |
| * The index of the previous pointer in an entry from the transaction log. |
| * This is only relevant if flags (indexed at FLAGS_IDX) includes UPDATE_INPLACE. |
| */ |
| public static final int PREV_POINTER_IDX = 2; |
| |
| /** |
| * The index of the previous version in an entry from the transaction log. |
| * This is only relevant if flags (indexed at FLAGS_IDX) includes UPDATE_INPLACE. |
| */ |
| public static final int PREV_VERSION_IDX = 3; |
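
  // Illustrative entry layouts, as read elsewhere in this class (the document, id bytes, or
  // query string is always the last element; subclasses such as CdcrTransactionLog may append
  // an extra element):
  //   add:             [flags, version, doc]
  //   in-place update: [flags, version, prevPointer, prevVersion, doc]
  //   delete:          [flags, version, idBytes]
  //   delete-by-query: [flags, version, queryString]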
| |
| public static class RecoveryInfo { |
| public long positionOfStart; |
| |
| public int adds; |
| public int deletes; |
| public int deleteByQuery; |
| public int errors; |
| |
| public boolean failed; |
| |
| @Override |
| public String toString() { |
| return "RecoveryInfo{adds="+adds+" deletes="+deletes+ " deleteByQuery="+deleteByQuery+" errors="+errors + " positionOfStart="+positionOfStart+"}"; |
| } |
| } |
| |
| long id = -1; |
| protected volatile State state = State.ACTIVE; |
| |
| protected TransactionLog bufferTlog; |
| protected TransactionLog tlog; |
| protected TransactionLog prevTlog; |
| protected TransactionLog prevTlogOnPrecommit; |
| protected final Deque<TransactionLog> logs = new LinkedList<>(); // list of recent logs, newest first |
| protected LinkedList<TransactionLog> newestLogsOnStartup = new LinkedList<>(); |
| protected int numOldRecords; // number of records in the recent logs |
| |
| protected Map<BytesRef,LogPtr> map = new HashMap<>(); |
| protected Map<BytesRef,LogPtr> prevMap; // used while committing/reopening is happening |
| protected Map<BytesRef,LogPtr> prevMap2; // used while committing/reopening is happening |
| protected TransactionLog prevMapLog; // the transaction log used to look up entries found in prevMap |
| protected TransactionLog prevMapLog2; // the transaction log used to look up entries found in prevMap2 |
| |
| protected final int numDeletesToKeep = 1000; |
| protected final int numDeletesByQueryToKeep = 100; |
| protected int numRecordsToKeep; |
| protected int maxNumLogsToKeep; |
| protected int numVersionBuckets; // This should only be used to initialize VersionInfo... the actual number of buckets may be rounded up to a power of two. |
| protected Long maxVersionFromIndex = null; |
| protected boolean existOldBufferLog = false; |
| |
| // keep track of deletes only... this is not updated on an add |
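  // Overriding LinkedHashMap.removeEldestEntry below turns this map into a simple bounded,
  // insertion-ordered cache: once size() exceeds numDeletesToKeep, the eldest entry is
  // evicted on the next insertion.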
| protected LinkedHashMap<BytesRef, LogPtr> oldDeletes = new LinkedHashMap<BytesRef, LogPtr>(numDeletesToKeep) { |
| @Override |
| protected boolean removeEldestEntry(@SuppressWarnings({"rawtypes"})Map.Entry eldest) { |
| return size() > numDeletesToKeep; |
| } |
| }; |
| |
| /** |
| * Holds the query and the version for a DeleteByQuery command |
| */ |
| public static class DBQ { |
| public String q; // the query string |
| public long version; // positive version of the DBQ |
| |
| @Override |
| public String toString() { |
| return "DBQ{version=" + version + ",q="+q+"}"; |
| } |
| } |
| |
| protected LinkedList<DBQ> deleteByQueries = new LinkedList<>(); |
| |
| protected String[] tlogFiles; |
| protected File tlogDir; |
| protected Collection<String> globalStrings; |
| |
| protected String dataDir; |
| protected String lastDataDir; |
| |
| protected VersionInfo versionInfo; |
| |
| protected SyncLevel defaultSyncLevel = SyncLevel.FLUSH; |
| |
| volatile UpdateHandler uhandler; // a core reload can change this reference! |
| protected volatile boolean cancelApplyBufferUpdate; |
| List<Long> startingVersions; |
| |
| // metrics |
| protected Gauge<Integer> bufferedOpsGauge; |
| protected Meter applyingBufferedOpsMeter; |
| protected Meter replayOpsMeter; |
| protected Meter copyOverOldUpdatesMeter; |
| protected SolrMetricManager metricManager; |
| protected String registryName; |
| |
| public static class LogPtr { |
| final long pointer; |
| final long version; |
| final long previousPointer; // used for entries that are in-place updates and need a pointer to a previous update command |
| |
| /** |
| * Creates an object that contains the position and version of an update. In this constructor, |
| * the effective value of the previousPointer is -1. |
| * |
| * @param pointer Position in the transaction log of an update |
| * @param version Version of the update at the given position |
| */ |
| public LogPtr(long pointer, long version) { |
| this(pointer, version, -1); |
| } |
| |
| /** |
| * |
| * @param pointer Position in the transaction log of an update |
| * @param version Version of the update at the given position |
| * @param previousPointer Position, in the transaction log, of an update on which the current update depends |
| */ |
| public LogPtr(long pointer, long version, long previousPointer) { |
| this.pointer = pointer; |
| this.version = version; |
| this.previousPointer = previousPointer; |
| } |
| |
| @Override |
| public String toString() { |
| return "LogPtr(" + pointer + ")"; |
| } |
| } |
| |
| public long getTotalLogsSize() { |
| long size = 0; |
| synchronized (this) { |
| for (TransactionLog log : logs) { |
| size += log.getLogSize(); |
| } |
| } |
| return size; |
| } |
| |
| /** |
| * @return the current transaction log's size (based on its output stream) |
| */ |
| public synchronized long getCurrentLogSizeFromStream() { |
| return tlog == null ? 0 : tlog.getLogSizeFromStream(); |
| } |
| |
| public long getTotalLogsNumber() { |
| synchronized (this) { |
| return logs.size(); |
| } |
| } |
| |
| public VersionInfo getVersionInfo() { |
| return versionInfo; |
| } |
| |
| public int getNumRecordsToKeep() { |
| return numRecordsToKeep; |
| } |
| |
| public int getMaxNumLogsToKeep() { |
| return maxNumLogsToKeep; |
| } |
| |
| public int getNumVersionBuckets() { |
| return numVersionBuckets; |
| } |
| |
  protected static int objToInt(Object obj, int def) {
    if (obj != null) {
      return Integer.parseInt(obj.toString());
    } else {
      return def;
    }
  }
| |
| @Override |
| public void init(PluginInfo info) { |
| dataDir = (String)info.initArgs.get("dir"); |
| defaultSyncLevel = SyncLevel.getSyncLevel((String)info.initArgs.get("syncLevel")); |
| |
| numRecordsToKeep = objToInt(info.initArgs.get("numRecordsToKeep"), 100); |
| maxNumLogsToKeep = objToInt(info.initArgs.get("maxNumLogsToKeep"), 10); |
| numVersionBuckets = objToInt(info.initArgs.get("numVersionBuckets"), 65536); |
    if (numVersionBuckets <= 0) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
          "Number of version buckets must be greater than 0!");
    }
| |
| log.info("Initializing UpdateLog: dataDir={} defaultSyncLevel={} numRecordsToKeep={} maxNumLogsToKeep={} numVersionBuckets={}", |
| dataDir, defaultSyncLevel, numRecordsToKeep, maxNumLogsToKeep, numVersionBuckets); |
| } |
| |
| /* Note, when this is called, uhandler is not completely constructed. |
| * This must be called when a new log is created, or |
| * for an existing log whenever the core or update handler changes. |
| */ |
| public void init(UpdateHandler uhandler, SolrCore core) { |
| dataDir = core.getUlogDir(); |
| |
| this.uhandler = uhandler; |
| |
| usableForChildDocs = core.getLatestSchema().isUsableForChildDocs(); |
| |
| if (dataDir.equals(lastDataDir)) { |
| versionInfo.reload(); |
| core.getCoreMetricManager().registerMetricProducer(SolrInfoBean.Category.TLOG.toString(), this); |
| |
| if (debug) { |
| log.debug("UpdateHandler init: tlogDir={}, next id={} this is a reopen...nothing else to do", tlogDir, id); |
| } |
| return; |
| } |
| lastDataDir = dataDir; |
| tlogDir = new File(dataDir, TLOG_NAME); |
| tlogDir.mkdirs(); |
| tlogFiles = getLogList(tlogDir); |
| id = getLastLogId() + 1; // add 1 since we will create a new log for the next update |
| |
| if (debug) { |
| log.debug("UpdateHandler init: tlogDir={}, existing tlogs={}, next id={}", tlogDir, Arrays.asList(tlogFiles), id); |
| } |
| |
| String[] oldBufferTlog = getBufferLogList(tlogDir); |
| if (oldBufferTlog != null && oldBufferTlog.length != 0) { |
| existOldBufferLog = true; |
| } |
| TransactionLog oldLog = null; |
| for (String oldLogName : tlogFiles) { |
| File f = new File(tlogDir, oldLogName); |
| try { |
| oldLog = newTransactionLog(f, null, true); |
| addOldLog(oldLog, false); // don't remove old logs on startup since more than one may be uncapped. |
| } catch (Exception e) { |
| SolrException.log(log, "Failure to open existing log file (non fatal) " + f, e); |
| deleteFile(f); |
| } |
| } |
| |
| // Record first two logs (oldest first) at startup for potential tlog recovery. |
| // It's possible that at abnormal close both "tlog" and "prevTlog" were uncapped. |
| for (TransactionLog ll : logs) { |
| newestLogsOnStartup.addFirst(ll); |
| if (newestLogsOnStartup.size() >= 2) break; |
| } |
| |
| try { |
| versionInfo = new VersionInfo(this, numVersionBuckets); |
| } catch (SolrException e) { |
| log.error("Unable to use updateLog: ", e); |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, |
| "Unable to use updateLog: " + e.getMessage(), e); |
| } |
| |
| // TODO: these startingVersions assume that we successfully recover from all non-complete tlogs. |
| try (RecentUpdates startingUpdates = getRecentUpdates()) { |
| startingVersions = startingUpdates.getVersions(numRecordsToKeep); |
| |
| // populate recent deletes list (since we can't get that info from the index) |
| for (int i = startingUpdates.deleteList.size() - 1; i >= 0; i--) { |
| DeleteUpdate du = startingUpdates.deleteList.get(i); |
| oldDeletes.put(new BytesRef(du.id), new LogPtr(-1, du.version)); |
| } |
| |
| // populate recent deleteByQuery commands |
| for (int i = startingUpdates.deleteByQueryList.size() - 1; i >= 0; i--) { |
| Update update = startingUpdates.deleteByQueryList.get(i); |
| @SuppressWarnings({"unchecked"}) |
| List<Object> dbq = (List<Object>) update.log.lookup(update.pointer); |
| long version = (Long) dbq.get(1); |
| String q = (String) dbq.get(2); |
| trackDeleteByQuery(q, version); |
| } |
| |
| } |
| core.getCoreMetricManager().registerMetricProducer(SolrInfoBean.Category.TLOG.toString(), this); |
| } |
| |
| @Override |
| public void initializeMetrics(SolrMetricManager manager, String registry, String tag, String scope) { |
| this.metricManager = manager; |
| this.registryName = registry; |
| bufferedOpsGauge = () -> { |
| if (state == State.BUFFERING) { |
| if (bufferTlog == null) return 0; |
| // numRecords counts header as a record |
| return bufferTlog.numRecords() - 1; |
| } |
| if (tlog == null) { |
| return 0; |
| } else if (state == State.APPLYING_BUFFERED) { |
| // numRecords counts header as a record |
| return tlog.numRecords() - 1 - recoveryInfo.adds - recoveryInfo.deleteByQuery - recoveryInfo.deletes - recoveryInfo.errors; |
| } else { |
| return 0; |
| } |
| }; |
| |
| manager.registerGauge(null, registry, bufferedOpsGauge, tag, true, "ops", scope, "buffered"); |
| manager.registerGauge(null, registry, () -> logs.size(), tag, true, "logs", scope, "replay", "remaining"); |
| manager.registerGauge(null, registry, () -> getTotalLogsSize(), tag, true, "bytes", scope, "replay", "remaining"); |
| applyingBufferedOpsMeter = manager.meter(null, registry, "ops", scope, "applyingBuffered"); |
| replayOpsMeter = manager.meter(null, registry, "ops", scope, "replay"); |
| copyOverOldUpdatesMeter = manager.meter(null, registry, "ops", scope, "copyOverOldUpdates"); |
| manager.registerGauge(null, registry, () -> state.getValue(), tag, true, "state", scope); |
| } |
| |
| /** |
   * Returns a new {@link org.apache.solr.update.TransactionLog}. Subclasses can override this method to
   * change the implementation of the transaction log.
| */ |
| public TransactionLog newTransactionLog(File tlogFile, Collection<String> globalStrings, boolean openExisting) { |
| return new TransactionLog(tlogFile, globalStrings, openExisting); |
| } |
| |
| public String getLogDir() { |
| return tlogDir.getAbsolutePath(); |
| } |
| |
| public List<Long> getStartingVersions() { |
| return startingVersions; |
| } |
| |
| public boolean existOldBufferLog() { |
| return existOldBufferLog; |
| } |
| |
| /* Takes over ownership of the log, keeping it until no longer needed |
| and then decrementing its reference and dropping it. |
| */ |
| protected synchronized void addOldLog(TransactionLog oldLog, boolean removeOld) { |
| if (oldLog == null) return; |
| |
| numOldRecords += oldLog.numRecords(); |
| |
| int currRecords = numOldRecords; |
| |
| if (oldLog != tlog && tlog != null) { |
| currRecords += tlog.numRecords(); |
| } |
| |
| while (removeOld && logs.size() > 0) { |
| TransactionLog log = logs.peekLast(); |
| int nrec = log.numRecords(); |
      // remove the oldest log if we don't need it to keep at least numRecordsToKeep, or if
      // we already have the limit of maxNumLogsToKeep log files.
| if (currRecords - nrec >= numRecordsToKeep || (maxNumLogsToKeep > 0 && logs.size() >= maxNumLogsToKeep)) { |
| currRecords -= nrec; |
| numOldRecords -= nrec; |
| logs.removeLast().decref(); // dereference so it will be deleted when no longer in use |
| continue; |
| } |
| |
| break; |
| } |
| |
| // don't incref... we are taking ownership from the caller. |
| logs.addFirst(oldLog); |
| } |
| |
| public String[] getBufferLogList(File directory) { |
| final String prefix = BUFFER_TLOG_NAME+'.'; |
| return directory.list((dir, name) -> name.startsWith(prefix)); |
| } |
| |
| /** |
| * Does update from old tlogs (not from buffer tlog)? |
| * If yes we must skip writing {@code cmd} to current tlog |
| */ |
| private boolean updateFromOldTlogs(UpdateCommand cmd) { |
| return (cmd.getFlags() & UpdateCommand.REPLAY) != 0 && state == State.REPLAYING; |
| } |
| |
  public String[] getLogList(File directory) {
    final String prefix = TLOG_NAME + '.';
    String[] names = directory.list((dir, name) -> name.startsWith(prefix));
    if (names == null) {
      throw new RuntimeException(new FileNotFoundException(directory.getAbsolutePath()));
    }
    Arrays.sort(names);
    return names;
  }
| |
| public long getLastLogId() { |
| if (id != -1) return id; |
| if (tlogFiles.length == 0) return -1; |
| String last = tlogFiles[tlogFiles.length-1]; |
| return Long.parseLong(last.substring(TLOG_NAME.length() + 1)); |
| } |
| |
| public void add(AddUpdateCommand cmd) { |
| add(cmd, false); |
| } |
| |
| public void add(AddUpdateCommand cmd, boolean clearCaches) { |
| // don't log if we are replaying from another log |
| // TODO: we currently need to log to maintain correct versioning, rtg, etc |
| // if ((cmd.getFlags() & UpdateCommand.REPLAY) != 0) return; |
| |
| // This hack could be removed after SOLR-15064 when we insist updates to child docs include _root_. |
| // Until then, if we're in a buffering mode, then the solrDoc won't have the _root_ field. |
| // Otherwise, it should already be there, placed by the client. |
| if (usableForChildDocs && cmd.useRouteAsRoot != null && cmd.solrDoc.getField(IndexSchema.ROOT_FIELD_NAME) == null) { |
| cmd.solrDoc.setField(IndexSchema.ROOT_FIELD_NAME, cmd.getIndexedIdStr()); |
| } |
| |
| synchronized (this) { |
| if ((cmd.getFlags() & UpdateCommand.BUFFERING) != 0) { |
| ensureBufferTlog(); |
| bufferTlog.write(cmd); |
| return; |
| } |
| |
| long pos = -1; |
| long prevPointer = getPrevPointerForUpdate(cmd); |
| |
| // don't log if we are replaying from another log |
| if (!updateFromOldTlogs(cmd)) { |
| ensureLog(); |
| pos = tlog.write(cmd, prevPointer); |
| } |
| |
| if (!clearCaches) { |
| // TODO: in the future we could support a real position for a REPLAY update. |
| // Only currently would be useful for RTG while in recovery mode though. |
| LogPtr ptr = new LogPtr(pos, cmd.getVersion(), prevPointer); |
| |
| map.put(cmd.getIndexedId(), ptr); |
| |
| if (trace) { |
| log.trace("TLOG: added id {} to {} {} map={}", cmd.getPrintableId(), tlog, ptr, System.identityHashCode(map)); |
| } |
| |
| } else { |
| openRealtimeSearcher(); |
| if (log.isTraceEnabled()) { |
| log.trace("TLOG: added id {} to {} clearCaches=true", cmd.getPrintableId(), tlog); |
| } |
| } |
| |
| } |
| } |
| |
| /** |
| * @return If cmd is an in-place update, then returns the pointer (in the tlog) of the previous |
| * update that the given update depends on. |
| * Returns -1 if this is not an in-place update, or if we can't find a previous entry in |
| * the tlog. Upon receiving a -1, it should be clear why it was -1: if the command's |
| * flags|UpdateLog.UPDATE_INPLACE is set, then this command is an in-place update whose |
| * previous update is in the index and not in the tlog; if that flag is not set, it is |
| * not an in-place update at all, and don't bother about the prevPointer value at |
| * all (which is -1 as a dummy value).) |
| */ |
| private synchronized long getPrevPointerForUpdate(AddUpdateCommand cmd) { |
    // note: sync required to ensure maps aren't changed out from under us
| if (cmd.isInPlaceUpdate()) { |
| BytesRef indexedId = cmd.getIndexedId(); |
| for (Map<BytesRef, LogPtr> currentMap : Arrays.asList(map, prevMap, prevMap2)) { |
| if (currentMap != null) { |
| LogPtr prevEntry = currentMap.get(indexedId); |
| if (null != prevEntry) { |
| return prevEntry.pointer; |
| } |
| } |
| } |
| } |
| return -1; |
| } |
| |
| |
| public void delete(DeleteUpdateCommand cmd) { |
| BytesRef br = cmd.getIndexedId(); |
| |
| synchronized (this) { |
| if ((cmd.getFlags() & UpdateCommand.BUFFERING) != 0) { |
| ensureBufferTlog(); |
| bufferTlog.writeDelete(cmd); |
| return; |
| } |
| |
| long pos = -1; |
| if (!updateFromOldTlogs(cmd)) { |
| ensureLog(); |
| pos = tlog.writeDelete(cmd); |
| } |
| |
| LogPtr ptr = new LogPtr(pos, cmd.version); |
| map.put(br, ptr); |
| oldDeletes.put(br, ptr); |
| |
| if (trace) { |
| log.trace("TLOG: added delete for id {} to {} {} map={}", cmd.id, tlog, ptr, System.identityHashCode(map)); |
| } |
| } |
| } |
| |
| public void deleteByQuery(DeleteUpdateCommand cmd) { |
| synchronized (this) { |
| if ((cmd.getFlags() & UpdateCommand.BUFFERING) != 0) { |
| ensureBufferTlog(); |
| bufferTlog.writeDeleteByQuery(cmd); |
| return; |
| } |
| |
| long pos = -1; |
| if (!updateFromOldTlogs(cmd)) { |
| ensureLog(); |
| pos = tlog.writeDeleteByQuery(cmd); |
| } |
| |
      // skip purging our caches in the case of a TLOG replica
| if ((cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) == 0) { |
| // given that we just did a delete-by-query, we don't know what documents were |
| // affected and hence we must purge our caches. |
| openRealtimeSearcher(); |
| trackDeleteByQuery(cmd.getQuery(), cmd.getVersion()); |
| |
| if (trace) { |
| LogPtr ptr = new LogPtr(pos, cmd.getVersion()); |
| int hash = System.identityHashCode(map); |
| log.trace("TLOG: added deleteByQuery {} to {} {} map = {}.", cmd.query, tlog, ptr, hash); |
| } |
| } |
| } |
| } |
| |
  /** Opens a new realtime searcher and clears the id caches.
   * This may also be called while updates are being buffered (from PeerSync/IndexFingerprint).
| */ |
| public void openRealtimeSearcher() { |
| log.debug("openRealtimeSearcher"); |
| synchronized (this) { |
| // We must cause a new IndexReader to be opened before anything looks at these caches again |
| // so that a cache miss will read fresh data. |
| try { |
| RefCounted<SolrIndexSearcher> holder = uhandler.core.openNewSearcher(true, true); |
| holder.decref(); |
| } catch (Exception e) { |
| SolrException.log(log, "Error opening realtime searcher", e); |
| return; |
| } |
| |
| if (map != null) map.clear(); |
| if (prevMap != null) prevMap.clear(); |
| if (prevMap2 != null) prevMap2.clear(); |
| } |
| } |
| |
| /** currently for testing only */ |
| public void deleteAll() { |
| synchronized (this) { |
| |
| try { |
| RefCounted<SolrIndexSearcher> holder = uhandler.core.openNewSearcher(true, true); |
| holder.decref(); |
| } catch (Exception e) { |
| SolrException.log(log, "Error opening realtime searcher for deleteByQuery", e); |
| } |
| |
| if (map != null) map.clear(); |
| if (prevMap != null) prevMap.clear(); |
| if (prevMap2 != null) prevMap2.clear(); |
| |
| oldDeletes.clear(); |
| deleteByQueries.clear(); |
| } |
| } |
| |
| |
| void trackDeleteByQuery(String q, long version) { |
| version = Math.abs(version); |
| DBQ dbq = new DBQ(); |
| dbq.q = q; |
| dbq.version = version; |
| |
| synchronized (this) { |
| if (deleteByQueries.isEmpty() || deleteByQueries.getFirst().version < version) { |
| // common non-reordered case |
| deleteByQueries.addFirst(dbq); |
| } else { |
| // find correct insertion point |
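        // e.g. inserting version 7 into a list holding versions [9, 5, 3] yields [9, 7, 5, 3];
        // the list is kept ordered newest (highest version) first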
| ListIterator<DBQ> iter = deleteByQueries.listIterator(); |
| iter.next(); // we already checked the first element in the previous "if" clause |
| while (iter.hasNext()) { |
| DBQ oldDBQ = iter.next(); |
| if (oldDBQ.version < version) { |
| iter.previous(); |
| break; |
| } else if (oldDBQ.version == version && oldDBQ.q.equals(q)) { |
| // a duplicate |
| return; |
| } |
| } |
| iter.add(dbq); // this also handles the case of adding at the end when hasNext() == false |
| } |
| |
| if (deleteByQueries.size() > numDeletesByQueryToKeep) { |
| deleteByQueries.removeLast(); |
| } |
| } |
| } |
| |
| public List<DBQ> getDBQNewer(long version) { |
| synchronized (this) { |
| if (deleteByQueries.isEmpty() || deleteByQueries.getFirst().version < version) { |
| // fast common case |
| return null; |
| } |
| |
| List<DBQ> dbqList = new ArrayList<>(); |
| for (DBQ dbq : deleteByQueries) { |
| if (dbq.version <= version) break; |
| dbqList.add(dbq); |
| } |
| return dbqList; |
| } |
| } |
| |
| protected void newMap() { |
| prevMap2 = prevMap; |
| prevMapLog2 = prevMapLog; |
| |
| prevMap = map; |
| prevMapLog = tlog; |
| |
| map = new HashMap<>(); |
| } |
| |
| private void clearOldMaps() { |
| prevMap = null; |
| prevMap2 = null; |
| } |
| |
| public boolean hasUncommittedChanges() { |
| return tlog != null; |
| } |
| |
| public void preCommit(CommitUpdateCommand cmd) { |
| synchronized (this) { |
| if (debug) { |
| log.debug("TLOG: preCommit"); |
| } |
| |
| if (getState() != State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { |
| // if we aren't in the active state, and this isn't a replay |
| // from the recovery process, then we shouldn't mess with |
| // the current transaction log. This normally shouldn't happen |
| // as DistributedUpdateProcessor will prevent this. Commits |
| // that don't use the processor are possible though. |
| return; |
| } |
| |
| // since we're changing the log, we must change the map. |
| newMap(); |
| |
| if (prevTlog != null) { |
| globalStrings = prevTlog.getGlobalStrings(); |
| } |
| |
| // since document additions can happen concurrently with commit, create |
| // a new transaction log first so that we know the old one is definitely |
| // in the index. |
| if (prevTlog != null) { |
        // postCommit for prevTlog was not called, so the index may be corrupted.
        // If we overwrite the prevTlog reference, the corresponding tlog would be leaked; close it first.
| postCommit(cmd); |
| } |
| prevTlog = tlog; |
| tlog = null; |
| id++; |
| } |
| } |
| |
| public void postCommit(CommitUpdateCommand cmd) { |
| synchronized (this) { |
| if (debug) { |
| log.debug("TLOG: postCommit"); |
| } |
| if (prevTlog != null) { |
| // if we made it through the commit, write a commit command to the log |
| // TODO: check that this works to cap a tlog we were using to buffer so we don't replay on startup. |
| prevTlog.writeCommit(cmd); |
| |
| addOldLog(prevTlog, true); |
| // the old log list will decref when no longer needed |
| // prevTlog.decref(); |
| prevTlog = null; |
| } |
| } |
| } |
| |
| public void preSoftCommit(CommitUpdateCommand cmd) { |
| debug = log.isDebugEnabled(); // refresh our view of debugging occasionally |
| trace = log.isTraceEnabled(); |
| |
| synchronized (this) { |
| |
| if (!cmd.softCommit) return; // already handled this at the start of the hard commit |
| newMap(); |
| |
| // start adding documents to a new map since we won't know if |
| // any added documents will make it into this commit or not. |
| // But we do know that any updates already added will definitely |
| // show up in the latest reader after the commit succeeds. |
| map = new HashMap<>(); |
| |
| if (debug) { |
| log.debug("TLOG: preSoftCommit: prevMap={} new map={}", System.identityHashCode(prevMap), System.identityHashCode(map)); |
| } |
| } |
| } |
| |
| public void postSoftCommit(CommitUpdateCommand cmd) { |
| synchronized (this) { |
| // We can clear out all old maps now that a new searcher has been opened. |
| // This currently only works since DUH2 synchronizes around preCommit to avoid |
| // it being called in the middle of a preSoftCommit, postSoftCommit sequence. |
| // If this DUH2 synchronization were to be removed, preSoftCommit should |
| // record what old maps were created and only remove those. |
| |
| if (debug) { |
| SolrCore.verbose("TLOG: postSoftCommit: disposing of prevMap="+ System.identityHashCode(prevMap) + ", prevMap2=" + System.identityHashCode(prevMap2)); |
| } |
| clearOldMaps(); |
| |
| } |
| } |
| |
| /** |
| * Goes over backwards, following the prevPointer, to merge all partial updates into the passed doc. Stops at either a full |
| * document, or if there are no previous entries to follow in the update log. |
| * |
| * @param id Binary representation of the unique key field |
| * @param prevPointer Pointer to the previous entry in the ulog, based on which the current in-place update was made. |
| * @param prevVersion Version of the previous entry in the ulog, based on which the current in-place update was made. |
| * @param onlyTheseFields When a non-null set of field names is passed in, the resolve process only attempts to populate |
| * the given fields in this set. When this set is null, it resolves all fields. |
| * @param latestPartialDoc Partial document that is to be populated |
| * @return Returns 0 if a full document was found in the log, -1 if no full document was found. If full document was supposed |
| * to be found in the tlogs, but couldn't be found (because the logs were rotated) then the prevPointer is returned. |
| */ |
| @SuppressWarnings({"unchecked"}) |
  public synchronized long applyPartialUpdates(BytesRef id, long prevPointer, long prevVersion,
      Set<String> onlyTheseFields, @SuppressWarnings({"rawtypes"}) SolrDocumentBase latestPartialDoc) {
| |
| SolrInputDocument partialUpdateDoc = null; |
| |
| List<TransactionLog> lookupLogs = Arrays.asList(tlog, prevMapLog, prevMapLog2); |
| while (prevPointer >= 0) { |
| //go through each partial update and apply it on the incoming doc one after another |
| @SuppressWarnings({"rawtypes"}) |
| List entry; |
| entry = getEntryFromTLog(prevPointer, prevVersion, lookupLogs); |
| if (entry == null) { |
| return prevPointer; // a previous update was supposed to be found, but wasn't found (due to log rotation) |
| } |
| int flags = (int) entry.get(UpdateLog.FLAGS_IDX); |
| |
| // since updates can depend only upon ADD updates or other UPDATE_INPLACE updates, we assert that we aren't |
| // getting something else |
      if ((flags & UpdateLog.ADD) != UpdateLog.ADD && (flags & UpdateLog.UPDATE_INPLACE) != UpdateLog.UPDATE_INPLACE) {
        throw new SolrException(ErrorCode.INVALID_STATE, entry + " should've been either an ADD or an UPDATE_INPLACE update" +
            ", while looking for id=" + id.utf8ToString());
      }
| // if this is an ADD (i.e. full document update), stop here |
| if ((flags & UpdateLog.ADD) == UpdateLog.ADD) { |
| partialUpdateDoc = (SolrInputDocument) entry.get(entry.size() - 1); |
| applyOlderUpdates(latestPartialDoc, partialUpdateDoc, onlyTheseFields); |
| return 0; // Full document was found in the tlog itself |
| } |
      if (entry.size() < 5) {
        throw new SolrException(ErrorCode.INVALID_STATE, entry + " is not a partial doc" +
            ", while looking for id=" + id.utf8ToString());
      }
| // This update is an inplace update, get the partial doc. The input doc is always at last position. |
| partialUpdateDoc = (SolrInputDocument) entry.get(entry.size() - 1); |
| applyOlderUpdates(latestPartialDoc, partialUpdateDoc, onlyTheseFields); |
| prevPointer = (long) entry.get(UpdateLog.PREV_POINTER_IDX); |
| prevVersion = (long) entry.get(UpdateLog.PREV_VERSION_IDX); |
| |
| if (onlyTheseFields != null && latestPartialDoc.keySet().containsAll(onlyTheseFields)) { |
| return 0; // all the onlyTheseFields have been resolved, safe to abort now. |
| } |
| } |
| |
| return -1; // last full document is not supposed to be in tlogs, but it must be in the index |
| } |
| |
| /** |
| * Add all fields from olderDoc into newerDoc if not already present in newerDoc |
| */ |
| private void applyOlderUpdates(@SuppressWarnings({"rawtypes"})SolrDocumentBase newerDoc, SolrInputDocument olderDoc, Set<String> mergeFields) { |
| for (String fieldName : olderDoc.getFieldNames()) { |
| // if the newerDoc has this field, then this field from olderDoc can be ignored |
| if (!newerDoc.containsKey(fieldName) && (mergeFields == null || mergeFields.contains(fieldName))) { |
| for (Object val : olderDoc.getFieldValues(fieldName)) { |
| newerDoc.addField(fieldName, val); |
| } |
| } |
| } |
| } |
| |
| |
| /*** |
| * Get the entry that has the given lookupVersion in the given lookupLogs at the lookupPointer position. |
| * |
| * @return The entry if found, otherwise null |
| */ |
| @SuppressWarnings({"rawtypes"}) |
| private synchronized List getEntryFromTLog(long lookupPointer, long lookupVersion, List<TransactionLog> lookupLogs) { |
| for (TransactionLog lookupLog : lookupLogs) { |
| if (lookupLog != null && lookupLog.getLogSize() > lookupPointer) { |
| lookupLog.incref(); |
| try { |
| Object obj = null; |
| |
| try { |
| obj = lookupLog.lookup(lookupPointer); |
| } catch (Exception | Error ex) { |
| // This can happen when trying to deserialize the entry at position lookupPointer, |
| // but from a different tlog than the one containing the desired entry. |
| // Just ignore the exception, so as to proceed to the next tlog. |
| log.debug("Exception reading the log (this is expected, don't worry)={}, for version={}. This can be ignored" |
| , lookupLog, lookupVersion); |
| } |
| |
          if (obj instanceof List) {
| List tmpEntry = (List) obj; |
| if (tmpEntry.size() >= 2 && |
| (tmpEntry.get(UpdateLog.VERSION_IDX) instanceof Long) && |
| ((Long) tmpEntry.get(UpdateLog.VERSION_IDX)).equals(lookupVersion)) { |
| return tmpEntry; |
| } |
| } |
| } finally { |
| lookupLog.decref(); |
| } |
| } |
| } |
| return null; |
| } |
| |
| public Object lookup(BytesRef indexedId) { |
| LogPtr entry; |
| TransactionLog lookupLog; |
| |
| synchronized (this) { |
| entry = map.get(indexedId); |
| lookupLog = tlog; // something found in "map" will always be in "tlog" |
| // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog); |
| if (entry == null && prevMap != null) { |
| entry = prevMap.get(indexedId); |
| // something found in prevMap will always be found in prevMapLog (which could be tlog or prevTlog) |
| lookupLog = prevMapLog; |
| // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog); |
| } |
| if (entry == null && prevMap2 != null) { |
| entry = prevMap2.get(indexedId); |
| // something found in prevMap2 will always be found in prevMapLog2 (which could be tlog or prevTlog) |
| lookupLog = prevMapLog2; |
| // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog); |
| } |
| |
| if (entry == null) { |
| return null; |
| } |
| lookupLog.incref(); |
| } |
| |
| try { |
| // now do the lookup outside of the sync block for concurrency |
| return lookupLog.lookup(entry.pointer); |
| } finally { |
| lookupLog.decref(); |
| } |
| |
| } |
| |
  /**
   * This method works like realtime-get: it only guarantees to return the latest
   * version of the <i>completed</i> update. There can be updates in progress concurrently
   * that have already grabbed higher version numbers. Higher-level coordination or
   * synchronization is needed for stronger guarantees (as VersionUpdateProcessor does).
   */
| public Long lookupVersion(BytesRef indexedId) { |
| LogPtr entry; |
| TransactionLog lookupLog; |
| |
| synchronized (this) { |
| entry = map.get(indexedId); |
| lookupLog = tlog; // something found in "map" will always be in "tlog" |
| // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog); |
| if (entry == null && prevMap != null) { |
| entry = prevMap.get(indexedId); |
| // something found in prevMap will always be found in prevMapLog (which could be tlog or prevTlog) |
| lookupLog = prevMapLog; |
| // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog); |
| } |
| if (entry == null && prevMap2 != null) { |
| entry = prevMap2.get(indexedId); |
| // something found in prevMap2 will always be found in prevMapLog2 (which could be tlog or prevTlog) |
| lookupLog = prevMapLog2; |
| // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog); |
| } |
| } |
| |
| if (entry != null) { |
| return entry.version; |
| } |
| |
| // Now check real index |
| Long version = versionInfo.getVersionFromIndex(indexedId); |
| |
| if (version != null) { |
| return version; |
| } |
| |
| // We can't get any version info for deletes from the index, so if the doc |
| // wasn't found, check a cache of recent deletes. |
| |
| synchronized (this) { |
| entry = oldDeletes.get(indexedId); |
| } |
| |
| if (entry != null) { |
| return entry.version; |
| } |
| |
| return null; |
| } |
| |
| public void finish(SyncLevel syncLevel) { |
| if (syncLevel == null) { |
| syncLevel = defaultSyncLevel; |
| } |
| if (syncLevel == SyncLevel.NONE) { |
| return; |
| } |
| |
| TransactionLog currLog; |
| synchronized (this) { |
| currLog = tlog; |
| if (currLog == null) return; |
| currLog.incref(); |
| } |
| |
| try { |
| currLog.finish(syncLevel); |
| } finally { |
| currLog.decref(); |
| } |
| } |
| |
| |
| public Future<RecoveryInfo> recoverFromLog() { |
| recoveryInfo = new RecoveryInfo(); |
| |
| List<TransactionLog> recoverLogs = new ArrayList<>(1); |
| for (TransactionLog ll : newestLogsOnStartup) { |
| if (!ll.try_incref()) continue; |
| |
| try { |
| if (ll.endsWithCommit()) { |
| ll.closeOutput(); |
| ll.decref(); |
| continue; |
| } |
| } catch (IOException e) { |
| log.error("Error inspecting tlog {}", ll, e); |
| ll.closeOutput(); |
| ll.decref(); |
| continue; |
| } |
| |
| recoverLogs.add(ll); |
| } |
| |
| if (recoverLogs.isEmpty()) return null; |
| |
| ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(recoveryExecutor); |
| LogReplayer replayer = new LogReplayer(recoverLogs, false); |
| |
| versionInfo.blockUpdates(); |
| try { |
| state = State.REPLAYING; |
| |
| // The deleteByQueries and oldDeletes lists |
| // would've been populated by items from the logs themselves (which we |
      // will replay now). So let's clear them out here before the replay.
| deleteByQueries.clear(); |
| oldDeletes.clear(); |
| } finally { |
| versionInfo.unblockUpdates(); |
| } |
| |
| // At this point, we are guaranteed that any new updates coming in will see the state as "replaying" |
| |
| return cs.submit(replayer, recoveryInfo); |
| } |
| |
| /** |
| * Replay current tlog, so all updates will be written to index. |
| * This is must do task for a tlog replica become a new leader. |
| * @return future of this task |
| */ |
| public Future<RecoveryInfo> recoverFromCurrentLog() { |
| if (tlog == null) { |
| return null; |
| } |
| map.clear(); |
| recoveryInfo = new RecoveryInfo(); |
| tlog.incref(); |
| |
| ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(recoveryExecutor); |
| LogReplayer replayer = new LogReplayer(Collections.singletonList(tlog), false, true); |
| |
| versionInfo.blockUpdates(); |
| try { |
| state = State.REPLAYING; |
| } finally { |
| versionInfo.unblockUpdates(); |
| } |
| |
| return cs.submit(replayer, recoveryInfo); |
| } |
| |
| /** |
| * Block updates, append a commit at current tlog, |
| * then copy over buffer updates to new tlog and bring back ulog to active state. |
| * So any updates which hasn't made it to the index is preserved in the current tlog, |
| * this also make RTG work |
| * @param cuc any updates that have version larger than the version of cuc will be copied over |
| */ |
| public void copyOverBufferingUpdates(CommitUpdateCommand cuc) { |
| versionInfo.blockUpdates(); |
| try { |
| synchronized (this) { |
| state = State.ACTIVE; |
| if (bufferTlog == null) { |
| return; |
| } |
        // by calling this, we won't switch to a new tlog (unlike applyBufferedUpdates());
        // if we switched to a new tlog we could possibly lose updates on the next fetch
| copyOverOldUpdates(cuc.getVersion(), bufferTlog); |
| dropBufferTlog(); |
| } |
| } finally { |
| versionInfo.unblockUpdates(); |
| } |
| } |
| |
| /** |
| * Block updates, append a commit at current tlog, then copy over updates to a new tlog. |
| * So any updates which hasn't made it to the index is preserved in the current tlog |
| * @param cuc any updates that have version larger than the version of cuc will be copied over |
| */ |
| public void commitAndSwitchToNewTlog(CommitUpdateCommand cuc) { |
| versionInfo.blockUpdates(); |
| try { |
| synchronized (this) { |
| if (tlog == null) { |
| return; |
| } |
| preCommit(cuc); |
| try { |
| copyOverOldUpdates(cuc.getVersion()); |
| } finally { |
| postCommit(cuc); |
| } |
| } |
| } finally { |
| versionInfo.unblockUpdates(); |
| } |
| } |
| |
| public void copyOverOldUpdates(long commitVersion) { |
| TransactionLog oldTlog = prevTlog; |
| if (oldTlog == null && !logs.isEmpty()) { |
| oldTlog = logs.getFirst(); |
| } |
| if (oldTlog == null || oldTlog.refcount.get() == 0) { |
| return; |
| } |
| |
| try { |
| if (oldTlog.endsWithCommit()) return; |
| } catch (IOException e) { |
| log.warn("Exception reading log", e); |
| return; |
| } |
| copyOverOldUpdates(commitVersion, oldTlog); |
| } |
| |
| /** |
| * Copy over updates from prevTlog or last tlog (in tlog folder) to a new tlog |
| * @param commitVersion any updates that have version larger than the commitVersion will be copied over |
| */ |
| public void copyOverOldUpdates(long commitVersion, TransactionLog oldTlog) { |
| copyOverOldUpdatesMeter.mark(); |
| |
| SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, |
| new ModifiableSolrParams()); |
| TransactionLog.LogReader logReader = oldTlog.getReader(0); |
| Object o = null; |
| try { |
| while ( (o = logReader.next()) != null ) { |
| try { |
| @SuppressWarnings({"rawtypes"}) |
| List entry = (List)o; |
| int operationAndFlags = (Integer) entry.get(0); |
| int oper = operationAndFlags & OPERATION_MASK; |
| long version = (Long) entry.get(1); |
| if (Math.abs(version) > commitVersion) { |
| switch (oper) { |
| case UpdateLog.UPDATE_INPLACE: |
| case UpdateLog.ADD: { |
| AddUpdateCommand cmd = convertTlogEntryToAddUpdateCommand(req, entry, oper, version); |
| cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT); |
| add(cmd); |
| break; |
| } |
| case UpdateLog.DELETE: { |
| byte[] idBytes = (byte[]) entry.get(2); |
| DeleteUpdateCommand cmd = new DeleteUpdateCommand(req); |
| cmd.setIndexedId(new BytesRef(idBytes)); |
| cmd.setVersion(version); |
| cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT); |
| delete(cmd); |
| break; |
| } |
| |
| case UpdateLog.DELETE_BY_QUERY: { |
| String query = (String) entry.get(2); |
| DeleteUpdateCommand cmd = new DeleteUpdateCommand(req); |
| cmd.query = query; |
| cmd.setVersion(version); |
| cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT); |
| deleteByQuery(cmd); |
| break; |
| } |
| |
| default: |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper); |
| } |
| } |
| } catch (ClassCastException e) { |
| log.warn("Unexpected log entry or corrupt log. Entry={}", o, e); |
| } |
| } |
| // Prev tlog will be closed, so nullify prevMap |
| if (prevTlog == oldTlog) { |
| prevMap = null; |
| } |
| } catch (IOException e) { |
| log.error("Exception reading versions from log",e); |
| } catch (InterruptedException e) { |
| log.warn("Exception reading log", e); |
| } finally { |
| if (logReader != null) logReader.close(); |
| } |
| } |
| |
| protected void ensureBufferTlog() { |
| if (bufferTlog != null) return; |
| String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, BUFFER_TLOG_NAME, System.nanoTime()); |
| bufferTlog = newTransactionLog(new File(tlogDir, newLogName), globalStrings, false); |
| bufferTlog.isBuffer = true; |
| } |
| |
  // Clean up old buffer tlogs
| protected void deleteBufferLogs() { |
| String[] oldBufferTlog = getBufferLogList(tlogDir); |
| if (oldBufferTlog != null && oldBufferTlog.length != 0) { |
| for (String oldBufferLogName : oldBufferTlog) { |
| deleteFile(new File(tlogDir, oldBufferLogName)); |
| } |
| } |
| } |
| |
| |
| protected void ensureLog() { |
| if (tlog == null) { |
| String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, TLOG_NAME, id); |
| tlog = newTransactionLog(new File(tlogDir, newLogName), globalStrings, false); |
| } |
| } |
| |
| |
| private void doClose(TransactionLog theLog, boolean writeCommit) { |
| if (theLog != null) { |
| if (writeCommit) { |
| // record a commit |
| log.info("Recording current closed for {} log={}", uhandler.core, theLog); |
| CommitUpdateCommand cmd = new CommitUpdateCommand(new LocalSolrQueryRequest(uhandler.core, new ModifiableSolrParams((SolrParams)null)), false); |
| theLog.writeCommit(cmd); |
| } |
| |
| theLog.deleteOnClose = false; |
| theLog.decref(); |
| theLog.forceClose(); |
| } |
| } |
| |
| public void close(boolean committed) { |
| close(committed, false); |
| } |
| |
| public void close(boolean committed, boolean deleteOnClose) { |
| recoveryExecutor.shutdown(); // no new tasks |
| |
| synchronized (this) { |
| |
| // Don't delete the old tlogs, we want to be able to replay from them and retrieve old versions |
| |
| doClose(prevTlog, committed); |
| doClose(tlog, committed); |
| |
| for (TransactionLog log : logs) { |
| if (log == prevTlog || log == tlog) continue; |
| log.deleteOnClose = false; |
| log.decref(); |
| log.forceClose(); |
| } |
| |
| if (bufferTlog != null) { |
        // should not delete bufferTlog on close; an existing bufferTlog is a signal to skip peerSync
| bufferTlog.deleteOnClose = false; |
| bufferTlog.decref(); |
| bufferTlog.forceClose(); |
| } |
| |
| } |
| |
| try { |
| ExecutorUtil.shutdownAndAwaitTermination(recoveryExecutor); |
| } catch (Exception e) { |
| SolrException.log(log, e); |
| } |
| } |
| |
| |
| static class Update { |
| TransactionLog log; |
| long version; |
| long previousVersion; // for in-place updates |
| long pointer; |
| } |
| |
| static class DeleteUpdate { |
| long version; |
| byte[] id; |
| |
| public DeleteUpdate(long version, byte[] id) { |
| this.version = version; |
| this.id = id; |
| } |
| } |
| |
| public class RecentUpdates implements Closeable { |
| |
| final Deque<TransactionLog> logList; // newest first |
| List<List<Update>> updateList; |
| HashMap<Long, Update> updates; |
| List<Update> deleteByQueryList; |
| List<DeleteUpdate> deleteList; |
| Set<Long> bufferUpdates = new HashSet<>(); |
| |
| public RecentUpdates(Deque<TransactionLog> logList) { |
| this.logList = logList; |
| boolean success = false; |
| try { |
| update(); |
| success = true; |
| } finally { |
| // defensive: if some unknown exception is thrown, |
| // make sure we close so that the tlogs are decref'd |
| if (!success) { |
| close(); |
| } |
| } |
| } |
| |
    public List<Long> getVersions(int n) {
      return getVersions(n, Long.MAX_VALUE);
    }
| |
| public Set<Long> getBufferUpdates() { |
| return Collections.unmodifiableSet(bufferUpdates); |
| } |
| |
| public List<Long> getVersions(int n, long maxVersion) { |
| List<Long> ret = new ArrayList<>(n); |
| LongSet set = new LongSet(n); |
| final int nInput = n; |
| |
| for (List<Update> singleList : updateList) { |
| for (Update ptr : singleList) { |
          if (Math.abs(ptr.version) > Math.abs(maxVersion)) continue;
| if (!set.add(ptr.version)) { |
| if (debug) { |
| log.debug("getVersions(n={}, maxVersion={}) not returning duplicate version = {}", nInput, maxVersion, ptr.version); |
| } |
| continue; |
| } |
| ret.add(ptr.version); |
| if (--n <= 0) return ret; |
| } |
| } |
| |
| return ret; |
| } |
| |
| public Object lookup(long version) { |
| Update update = updates.get(version); |
| if (update == null) return null; |
| |
| return update.log.lookup(update.pointer); |
| } |
| |
| /** Returns the list of deleteByQueries that happened after the given version */ |
| public List<Object> getDeleteByQuery(long afterVersion, LongSet updateVersions) { |
| List<Object> result = new ArrayList<>(deleteByQueryList.size()); |
| for (Update update : deleteByQueryList) { |
| if (Math.abs(update.version) > afterVersion) { |
| if (updateVersions.add(update.version)) { |
| Object dbq = update.log.lookup(update.pointer); |
| result.add(dbq); |
| } else { |
| if (debug) { |
| log.debug("UpdateLog.RecentUpdates.getDeleteByQuery(afterVersion={}) not returning duplicate version = {}", |
| afterVersion, update.version); |
| } |
| } |
| } |
| } |
| return result; |
| } |
| |
| private void update() { |
| int numUpdates = 0; |
| updateList = new ArrayList<>(logList.size()); |
| deleteByQueryList = new ArrayList<>(); |
| deleteList = new ArrayList<>(); |
| updates = new HashMap<>(numRecordsToKeep); |
| |
| for (TransactionLog oldLog : logList) { |
| List<Update> updatesForLog = new ArrayList<>(); |
| |
| TransactionLog.ReverseReader reader = null; |
| try { |
| reader = oldLog.getReverseReader(); |
| |
| while (numUpdates < numRecordsToKeep) { |
| Object o = null; |
| try { |
| o = reader.next(); |
              if (o == null) break;
| |
| // should currently be a List<Oper,Ver,Doc/Id> |
| @SuppressWarnings({"rawtypes"}) |
| List entry = (List)o; |
| |
| // TODO: refactor this out so we get common error handling |
| int opAndFlags = (Integer)entry.get(UpdateLog.FLAGS_IDX); |
| int oper = opAndFlags & UpdateLog.OPERATION_MASK; |
| long version = (Long) entry.get(UpdateLog.VERSION_IDX); |
| |
| if (oldLog.isBuffer) bufferUpdates.add(version); |
| |
| switch (oper) { |
| case UpdateLog.ADD: |
| case UpdateLog.UPDATE_INPLACE: |
| case UpdateLog.DELETE: |
| case UpdateLog.DELETE_BY_QUERY: |
| Update update = new Update(); |
| update.log = oldLog; |
| update.pointer = reader.position(); |
| update.version = version; |
| |
| if (oper == UpdateLog.UPDATE_INPLACE) { |
| if ((update.log instanceof CdcrTransactionLog && entry.size() == 6) || |
| (!(update.log instanceof CdcrTransactionLog) && entry.size() == 5)) { |
| update.previousVersion = (Long) entry.get(UpdateLog.PREV_VERSION_IDX); |
| } |
| } |
| updatesForLog.add(update); |
| updates.put(version, update); |
| |
| if (oper == UpdateLog.DELETE_BY_QUERY) { |
| deleteByQueryList.add(update); |
| } else if (oper == UpdateLog.DELETE) { |
| deleteList.add(new DeleteUpdate(version, (byte[])entry.get(2))); |
| } |
| |
| break; |
| |
| case UpdateLog.COMMIT: |
| break; |
| default: |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper); |
| } |
| } catch (ClassCastException cl) { |
| log.warn("Unexpected log entry or corrupt log. Entry={}", o, cl); |
| // would be caused by a corrupt transaction log |
| } catch (Exception ex) { |
| log.warn("Exception reverse reading log", ex); |
| break; |
| } |
| |
| numUpdates++; |
| } |
| |
| } catch (IOException | AssertionError e) { // catch AssertionError to handle certain test failures correctly |
| // failure to read a log record isn't fatal |
| log.error("Exception reading versions from log",e); |
| } finally { |
| if (reader != null) reader.close(); |
| } |
| |
| updateList.add(updatesForLog); |
| } |
| |
| } |
| |
| @Override |
| public void close() { |
| for (TransactionLog log : logList) { |
| log.decref(); |
| } |
| } |
| |
    public long getMaxRecentVersion() {
      long maxRecentVersion = 0L;
      if (updates != null) {
        for (Long key : updates.keySet()) {
          maxRecentVersion = Math.max(maxRecentVersion, Math.abs(key.longValue()));
        }
      }
      return maxRecentVersion;
    }
| } |
| |
| /** The RecentUpdates object returned must be closed after use */ |
| public RecentUpdates getRecentUpdates() { |
| Deque<TransactionLog> logList; |
| synchronized (this) { |
| logList = new LinkedList<>(logs); |
| for (TransactionLog log : logList) { |
| log.incref(); |
| } |
| if (prevTlog != null) { |
| prevTlog.incref(); |
| logList.addFirst(prevTlog); |
| } |
| if (tlog != null) { |
| tlog.incref(); |
| logList.addFirst(tlog); |
| } |
| if (bufferTlog != null) { |
| bufferTlog.incref(); |
| logList.addFirst(bufferTlog); |
| } |
| } |
| |
| // TODO: what if I hand out a list of updates, then do an update, then hand out another list (and |
| // one of the updates I originally handed out fell off the list). Over-request? |
| return new RecentUpdates(logList); |
| |
| } |
| |
| public void bufferUpdates() { |
| // recovery trips this assert under some race - even when |
| // it checks the state first |
| // assert state == State.ACTIVE; |
| |
| // block all updates to eliminate race conditions |
| // reading state and acting on it in the distributed update processor |
| versionInfo.blockUpdates(); |
| try { |
| if (state != State.ACTIVE && state != State.BUFFERING) { |
| // we don't currently have support for handling other states |
| log.warn("Unexpected state for bufferUpdates: {}, Ignoring request", state); |
| return; |
| } |
| dropBufferTlog(); |
| deleteBufferLogs(); |
| |
| recoveryInfo = new RecoveryInfo(); |
| |
| if (log.isInfoEnabled()) { |
| log.info("Starting to buffer updates. {}", this); |
| } |
| |
| state = State.BUFFERING; |
| } finally { |
| versionInfo.unblockUpdates(); |
| } |
| } |
| |
| /** Returns true if we were able to drop buffered updates and return to the ACTIVE state */ |
| public boolean dropBufferedUpdates() { |
| versionInfo.blockUpdates(); |
| try { |
| if (state != State.BUFFERING) return false; |
| |
| if (log.isInfoEnabled()) { |
| log.info("Dropping buffered updates {}", this); |
| } |
| |
| dropBufferTlog(); |
| |
| state = State.ACTIVE; |
| } finally { |
| versionInfo.unblockUpdates(); |
| } |
| return true; |
| } |
| |
| private void dropBufferTlog() { |
| synchronized (this) { |
| if (bufferTlog != null) { |
| bufferTlog.decref(); |
| bufferTlog = null; |
| } |
| } |
| } |
| |
| |
| /** Returns the Future to wait on, or null if no replay was needed */ |
| public Future<RecoveryInfo> applyBufferedUpdates() { |
| // recovery trips this assert under some race - even when |
| // it checks the state first |
| // assert state == State.BUFFERING; |
| |
| // block all updates to eliminate race conditions |
| // reading state and acting on it in the update processor |
| versionInfo.blockUpdates(); |
| try { |
| cancelApplyBufferUpdate = false; |
| if (state != State.BUFFERING) return null; |
| |
| synchronized (this) { |
| // handle case when no updates were received. |
| if (bufferTlog == null) { |
| state = State.ACTIVE; |
| return null; |
| } |
| bufferTlog.incref(); |
| } |
| |
| state = State.APPLYING_BUFFERED; |
| } finally { |
| versionInfo.unblockUpdates(); |
| } |
| |
| if (recoveryExecutor.isShutdown()) { |
| throw new RuntimeException("executor is not running..."); |
| } |
| ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(recoveryExecutor); |
| LogReplayer replayer = new LogReplayer(Collections.singletonList(bufferTlog), true); |
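// replay runs on the recovery executor; the incref above keeps the buffer
// tlog alive until the replay finishes, after which it is dropped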
| return cs.submit(() -> { |
| replayer.run(); |
| dropBufferTlog(); |
| }, recoveryInfo); |
| } |
| |
| public State getState() { |
| return state; |
| } |
| |
| @Override |
| public String toString() { |
| return "FSUpdateLog{state="+getState()+", tlog="+tlog+"}"; |
| } |
| |
| |
| public static Runnable testing_logReplayHook; // called before each log read |
| public static Runnable testing_logReplayFinishHook; // called when log replay has finished |
| |
| |
| |
| protected RecoveryInfo recoveryInfo; |
| |
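/**
 * Replays one or more transaction logs through the update processor chain.
 * When replaying the active log, incoming updates are blocked near the end of
 * the replay so that updates which raced in are applied before we return to
 * the ACTIVE state.
 */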
| class LogReplayer implements Runnable { |
private Logger loglog = log; // could be pointed at a dedicated replay logger
| |
| Deque<TransactionLog> translogs; |
| TransactionLog.LogReader tlogReader; |
| boolean activeLog; |
| boolean finishing = false; // state where we lock out other updates and finish those updates that snuck in before we locked |
| boolean debug = loglog.isDebugEnabled(); |
| boolean inSortedOrder; |
| |
| public LogReplayer(List<TransactionLog> translogs, boolean activeLog) { |
| this.translogs = new LinkedList<>(); |
| this.translogs.addAll(translogs); |
| this.activeLog = activeLog; |
| } |
| |
| public LogReplayer(List<TransactionLog> translogs, boolean activeLog, boolean inSortedOrder) { |
| this(translogs, activeLog); |
| this.inSortedOrder = inSortedOrder; |
| } |
| |
| private SolrQueryRequest req; |
| private SolrQueryResponse rsp; |
| |
| @Override |
| public void run() { |
| ModifiableSolrParams params = new ModifiableSolrParams(); |
| params.set(DISTRIB_UPDATE_PARAM, FROMLEADER.toString()); |
| params.set(DistributedUpdateProcessor.LOG_REPLAY, "true"); |
| req = new LocalSolrQueryRequest(uhandler.core, params); |
| rsp = new SolrQueryResponse(); |
| SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp)); // setting request info will help logging |
| |
| try { |
| for (; ; ) { |
| TransactionLog translog = translogs.pollFirst(); |
| if (translog == null) break; |
| doReplay(translog); |
| } |
| } catch (SolrException e) { |
| if (e.code() == ErrorCode.SERVICE_UNAVAILABLE.code) { |
| SolrException.log(log, e); |
| recoveryInfo.failed = true; |
| } else { |
| recoveryInfo.errors++; |
| SolrException.log(log, e); |
| } |
| } catch (Exception e) { |
| recoveryInfo.errors++; |
| SolrException.log(log, e); |
| } finally { |
| // change the state while updates are still blocked to prevent races |
| state = State.ACTIVE; |
| if (finishing) { |
| |
| // after replay, update the max from the index |
| log.info("Re-computing max version from index after log re-play."); |
| maxVersionFromIndex = null; |
| getMaxVersionFromIndex(); |
| |
| versionInfo.unblockUpdates(); |
| } |
| |
| // clean up in case we hit some unexpected exception and didn't get |
| // to more transaction logs |
| for (TransactionLog translog : translogs) { |
| log.error("ERROR: didn't get to recover from tlog {}", translog); |
| translog.decref(); |
| } |
| } |
| |
| loglog.warn("Log replay finished. recoveryInfo={}", recoveryInfo); |
| |
| if (testing_logReplayFinishHook != null) testing_logReplayFinishHook.run(); |
| |
| SolrRequestInfo.clearRequestInfo(); |
| } |
| |
| |
| public void doReplay(TransactionLog translog) { |
| try { |
| loglog.warn("Starting log replay {} active={} starting pos={} inSortedOrder={}", translog, activeLog, recoveryInfo.positionOfStart, inSortedOrder); |
| long lastStatusTime = System.nanoTime(); |
| if (inSortedOrder) { |
| tlogReader = translog.getSortedReader(recoveryInfo.positionOfStart); |
| } else { |
| tlogReader = translog.getReader(recoveryInfo.positionOfStart); |
| } |
| |
| // NOTE: we don't currently handle a core reload during recovery. This would cause the core |
| // to change underneath us. |
| |
| UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(null); |
| UpdateRequestProcessor proc = processorChain.createProcessor(req, rsp); |
| OrderedExecutor executor = inSortedOrder ? null : req.getCore().getCoreContainer().getReplayUpdatesExecutor(); |
| AtomicInteger pendingTasks = new AtomicInteger(0); |
| AtomicReference<SolrException> exceptionOnExecuteUpdate = new AtomicReference<>(); |
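// adds and deletes are dispatched to the replay executor keyed by the same
// bucket hash DistributedUpdateProcessor uses, so updates to the same id
// replay in order while independent ids proceed in parallel; DBQs (and
// sorted replays, where executor is null) are applied single-threaded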
| |
| long commitVersion = 0; |
| int operationAndFlags = 0; |
| long nextCount = 0; |
| |
| for (; ; ) { |
| Object o = null; |
| if (cancelApplyBufferUpdate) break; |
| try { |
| if (testing_logReplayHook != null) testing_logReplayHook.run(); |
| if (nextCount++ % 1000 == 0) { |
| long now = System.nanoTime(); |
| if (now - lastStatusTime > STATUS_TIME) { |
| lastStatusTime = now; |
| long cpos = tlogReader.currentPos(); |
| long csize = tlogReader.currentSize(); |
if (loglog.isInfoEnabled()) {
| loglog.info( |
| "log replay status {} active={} starting pos={} current pos={} current size={} % read={}", |
| translog, activeLog, recoveryInfo.positionOfStart, cpos, csize, |
| Math.floor(cpos / (double) csize * 100.)); |
| } |
| |
| } |
| } |
| |
o = tlogReader.next();
| if (o == null && activeLog) { |
| if (!finishing) { |
// we are about to block all updates, including tasks already submitted to
// the executor, so first wait for those tasks to finish
waitForAllUpdatesGetExecuted(pendingTasks);
// from this point on, any remaining updates are executed in a single thread
| executor = null; |
| // block to prevent new adds, but don't immediately unlock since |
| // we could be starved from ever completing recovery. Only unlock |
| // after we've finished this recovery. |
| // NOTE: our own updates won't be blocked since the thread holding a write lock can |
| // lock a read lock. |
| versionInfo.blockUpdates(); |
| finishing = true; |
| o = tlogReader.next(); |
| } else { |
| // we had previously blocked updates, so this "null" from the log is final. |
| |
| // Wait until our final commit to change the state and unlock. |
| // This is only so no new updates are written to the current log file, and is |
| // only an issue if we crash before the commit (and we are paying attention |
| // to incomplete log files). |
| // |
| // versionInfo.unblockUpdates(); |
| } |
| } |
| } catch (Exception e) { |
| SolrException.log(log, e); |
| } |
| |
| if (o == null) break; |
| // fail fast |
| if (exceptionOnExecuteUpdate.get() != null) throw exceptionOnExecuteUpdate.get(); |
| |
| try { |
| |
| // should currently be a List<Oper,Ver,Doc/Id> |
| @SuppressWarnings({"rawtypes"}) |
| List entry = (List) o; |
| operationAndFlags = (Integer) entry.get(UpdateLog.FLAGS_IDX); |
| int oper = operationAndFlags & OPERATION_MASK; |
| long version = (Long) entry.get(UpdateLog.VERSION_IDX); |
| |
| switch (oper) { |
| case UpdateLog.UPDATE_INPLACE: // fall through to ADD |
| case UpdateLog.ADD: { |
| recoveryInfo.adds++; |
| AddUpdateCommand cmd = convertTlogEntryToAddUpdateCommand(req, entry, oper, version); |
| cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT); |
| if (debug) log.debug("{} {}", oper == ADD ? "add" : "update", cmd); |
| execute(cmd, executor, pendingTasks, proc, exceptionOnExecuteUpdate); |
| break; |
| } |
| case UpdateLog.DELETE: { |
| recoveryInfo.deletes++; |
| byte[] idBytes = (byte[]) entry.get(2); |
| DeleteUpdateCommand cmd = new DeleteUpdateCommand(req); |
| cmd.setIndexedId(new BytesRef(idBytes)); |
| cmd.setVersion(version); |
| cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT); |
| if (debug) log.debug("delete {}", cmd); |
| execute(cmd, executor, pendingTasks, proc, exceptionOnExecuteUpdate); |
| break; |
| } |
| |
| case UpdateLog.DELETE_BY_QUERY: { |
| recoveryInfo.deleteByQuery++; |
| String query = (String) entry.get(2); |
| DeleteUpdateCommand cmd = new DeleteUpdateCommand(req); |
| cmd.query = query; |
| cmd.setVersion(version); |
| cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT); |
| if (debug) log.debug("deleteByQuery {}", cmd); |
| waitForAllUpdatesGetExecuted(pendingTasks); |
| // DBQ will be executed in the same thread |
| execute(cmd, null, pendingTasks, proc, exceptionOnExecuteUpdate); |
| break; |
| } |
| case UpdateLog.COMMIT: { |
| commitVersion = version; |
| break; |
| } |
| |
| default: |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper); |
| } |
| |
| if (rsp.getException() != null) { |
| loglog.error("REPLAY_ERR: Exception replaying log {}", rsp.getException()); |
| throw rsp.getException(); |
| } |
| if (state == State.REPLAYING) { |
| replayOpsMeter.mark(); |
| } else if (state == State.APPLYING_BUFFERED) { |
| applyingBufferedOpsMeter.mark(); |
| } else { |
| // XXX should not happen? |
| } |
| } catch (ClassCastException cl) { |
| recoveryInfo.errors++; |
| loglog.warn("REPLAY_ERR: Unexpected log entry or corrupt log. Entry={}", o, cl); |
| // would be caused by a corrupt transaction log |
| } catch (Exception ex) { |
| recoveryInfo.errors++; |
| loglog.warn("REPLAY_ERR: Exception replaying log", ex); |
| // something wrong with the request? |
| } |
| assert TestInjection.injectUpdateLogReplayRandomPause(); |
| } |
| |
| waitForAllUpdatesGetExecuted(pendingTasks); |
| if (exceptionOnExecuteUpdate.get() != null) throw exceptionOnExecuteUpdate.get(); |
| |
| CommitUpdateCommand cmd = new CommitUpdateCommand(req, false); |
| cmd.setVersion(commitVersion); |
| cmd.softCommit = false; |
| cmd.waitSearcher = true; |
| cmd.setFlags(UpdateCommand.REPLAY); |
| try { |
| if (debug) log.debug("commit {}", cmd); |
| uhandler.commit(cmd); // this should cause a commit to be added to the incomplete log and avoid it being replayed again after a restart. |
| } catch (IOException ex) { |
| recoveryInfo.errors++; |
| loglog.error("Replay exception: final commit.", ex); |
| } |
| |
| if (!activeLog) { |
| // if we are replaying an old tlog file, we need to add a commit to the end |
| // so we don't replay it again if we restart right after. |
| translog.writeCommit(cmd); |
| } |
| |
| try { |
| proc.finish(); |
| } catch (IOException ex) { |
| recoveryInfo.errors++; |
| loglog.error("Replay exception: finish()", ex); |
| } finally { |
| IOUtils.closeQuietly(proc); |
| } |
| |
| } finally { |
| if (tlogReader != null) tlogReader.close(); |
| translog.decref(); |
| } |
| } |
| |
| private void waitForAllUpdatesGetExecuted(AtomicInteger pendingTasks) { |
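// Integer.MAX_VALUE ms is roughly 24.8 days, i.e. effectively unbounded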
| TimeOut timeOut = new TimeOut(Integer.MAX_VALUE, TimeUnit.MILLISECONDS, TimeSource.CURRENT_TIME); |
| try { |
timeOut.waitFor("Timeout waiting for replay updates to finish", () -> {
// TODO: handle the case where there is no progress for a long time
| return pendingTasks.get() == 0; |
| }); |
| } catch (TimeoutException e) { |
| throw new SolrException(ErrorCode.SERVER_ERROR, e); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new SolrException(ErrorCode.SERVER_ERROR, e); |
| } |
| |
| } |
| |
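// mirrors DistributedUpdateProcessor's bucket hashing so per-id ordering
// during replay matches normal indexing; commands without an indexed id hash
// to null, which (presumably) lets the OrderedExecutor run them unordered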
| private Integer getBucketHash(UpdateCommand cmd) { |
| if (cmd instanceof AddUpdateCommand) { |
| BytesRef idBytes = ((AddUpdateCommand)cmd).getIndexedId(); |
| if (idBytes == null) return null; |
| return DistributedUpdateProcessor.bucketHash(idBytes); |
| } |
| |
| if (cmd instanceof DeleteUpdateCommand) { |
| BytesRef idBytes = ((DeleteUpdateCommand)cmd).getIndexedId(); |
| if (idBytes == null) return null; |
| return DistributedUpdateProcessor.bucketHash(idBytes); |
| } |
| |
| return null; |
| } |
| |
| private void execute(UpdateCommand cmd, OrderedExecutor executor, |
| AtomicInteger pendingTasks, UpdateRequestProcessor proc, |
| AtomicReference<SolrException> exceptionHolder) { |
| assert cmd instanceof AddUpdateCommand || cmd instanceof DeleteUpdateCommand; |
| |
| if (executor != null) { |
| // by using the same hash as DUP, independent updates can avoid waiting for same bucket |
| executor.execute(getBucketHash(cmd), () -> { |
| try { |
| // fail fast |
| if (exceptionHolder.get() != null) return; |
| if (cmd instanceof AddUpdateCommand) { |
| proc.processAdd((AddUpdateCommand) cmd); |
| } else { |
| proc.processDelete((DeleteUpdateCommand) cmd); |
| } |
| } catch (IOException e) { |
| recoveryInfo.errors++; |
| loglog.warn("REPLAY_ERR: IOException reading log", e); |
| // could be caused by an incomplete flush if recovering from log |
| } catch (SolrException e) { |
| if (e.code() == ErrorCode.SERVICE_UNAVAILABLE.code) { |
| exceptionHolder.compareAndSet(null, e); |
| return; |
| } |
| recoveryInfo.errors++; |
| loglog.warn("REPLAY_ERR: IOException reading log", e); |
| } finally { |
| pendingTasks.decrementAndGet(); |
| } |
| }); |
| pendingTasks.incrementAndGet(); |
| } else { |
| try { |
| if (cmd instanceof AddUpdateCommand) { |
| proc.processAdd((AddUpdateCommand) cmd); |
| } else { |
| proc.processDelete((DeleteUpdateCommand) cmd); |
| } |
| } catch (IOException e) { |
| recoveryInfo.errors++; |
| loglog.warn("REPLAY_ERR: IOException replaying log", e); |
| // could be caused by an incomplete flush if recovering from log |
| } catch (SolrException e) { |
| if (e.code() == ErrorCode.SERVICE_UNAVAILABLE.code) { |
| throw e; |
| } |
| recoveryInfo.errors++; |
| loglog.warn("REPLAY_ERR: IOException replaying log", e); |
| } |
| } |
| } |
| |
| |
| } |
| |
| /** |
* Given an entry from the transaction log containing a document, return a new AddUpdateCommand that
| * can be applied to ADD the document or do an UPDATE_INPLACE. |
| * |
| * @param req The request to use as the owner of the new AddUpdateCommand |
| * @param entry Entry from the transaction log that contains the document to be added |
| * @param operation The value of the operation flag; this must be either ADD or UPDATE_INPLACE -- |
| * if it is UPDATE_INPLACE then the previous version will also be read from the entry |
| * @param version Version already obtained from the entry. |
| */ |
| public static AddUpdateCommand convertTlogEntryToAddUpdateCommand(SolrQueryRequest req, |
| @SuppressWarnings({"rawtypes"})List entry, |
| int operation, long version) { |
| assert operation == UpdateLog.ADD || operation == UpdateLog.UPDATE_INPLACE; |
| SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size()-1); |
| AddUpdateCommand cmd = new AddUpdateCommand(req); |
| cmd.solrDoc = sdoc; |
| cmd.setVersion(version); |
| |
| if (operation == UPDATE_INPLACE) { |
| long prevVersion = (Long) entry.get(UpdateLog.PREV_VERSION_IDX); |
| cmd.prevVersion = prevVersion; |
| } |
| return cmd; |
| } |
| |
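// cached-thread-pool semantics: no core threads, unbounded maximum, 1 second
// idle timeout, and direct handoff via a SynchronousQueue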
| ThreadPoolExecutor recoveryExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, |
| Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), |
| new SolrNamedThreadFactory("recoveryExecutor")); |
| |
| |
| public static void deleteFile(File file) { |
| boolean success = false; |
| try { |
| Files.deleteIfExists(file.toPath()); |
| success = true; |
| } catch (Exception e) { |
| log.error("Error deleting file: {}", file, e); |
| } |
| |
| if (!success) { |
| try { |
| file.deleteOnExit(); |
| } catch (Exception e) { |
| log.error("Error deleting file on exit: {}", file, e); |
| } |
| } |
| } |
| |
| protected String getTlogDir(SolrCore core, PluginInfo info) { |
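// precedence: the core descriptor's ulogDir overrides the plugin's "dir"
// init arg; if neither is set, fall back to the core's data dir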
| String dataDir = (String) info.initArgs.get("dir"); |
| |
| String ulogDir = core.getCoreDescriptor().getUlogDir(); |
| if (ulogDir != null) { |
| dataDir = ulogDir; |
| } |
| |
if (dataDir == null || dataDir.isEmpty()) {
| dataDir = core.getDataDir(); |
| } |
| |
| return dataDir + "/" + TLOG_NAME; |
| } |
| |
| /** |
| * Clears the logs on the file system. Only call before init. |
| * |
| * @param core the SolrCore |
| * @param ulogPluginInfo the init info for the UpdateHandler |
| */ |
| public void clearLog(SolrCore core, PluginInfo ulogPluginInfo) { |
| if (ulogPluginInfo == null) return; |
| File tlogDir = new File(getTlogDir(core, ulogPluginInfo)); |
| if (tlogDir.exists()) { |
| String[] files = getLogList(tlogDir); |
| for (String file : files) { |
| File f = new File(tlogDir, file); |
| try { |
| Files.delete(f.toPath()); |
| } catch (IOException cause) { |
| // NOTE: still throws SecurityException as before. |
| log.error("Could not remove tlog file:{}", f, cause); |
| } |
| } |
| } |
| } |
| |
| public Long getCurrentMaxVersion() { |
| return maxVersionFromIndex; |
| } |
| |
| // this method is primarily used for unit testing and is not part of the public API for this class |
| Long getMaxVersionFromIndex() { |
| RefCounted<SolrIndexSearcher> newestSearcher = (uhandler != null && uhandler.core != null) |
| ? uhandler.core.getRealtimeSearcher() : null; |
| if (newestSearcher == null) |
| throw new IllegalStateException("No searcher available to lookup max version from index!"); |
| |
| try { |
| seedBucketsWithHighestVersion(newestSearcher.get()); |
| return getCurrentMaxVersion(); |
| } finally { |
| newestSearcher.decref(); |
| } |
| } |
| |
| /** |
| * Used to seed all version buckets with the max value of the version field in the index. |
| */ |
| protected Long seedBucketsWithHighestVersion(SolrIndexSearcher newSearcher, VersionInfo versions) { |
| Long highestVersion = null; |
| final RTimer timer = new RTimer(); |
| |
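// the index alone may lag behind: updates still sitting in the tlog can carry
// higher versions, so both sources are consulted before seeding the buckets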
| try (RecentUpdates recentUpdates = getRecentUpdates()) { |
| long maxVersionFromRecent = recentUpdates.getMaxRecentVersion(); |
| long maxVersionFromIndex = versions.getMaxVersionFromIndex(newSearcher); |
| |
| long maxVersion = Math.max(maxVersionFromIndex, maxVersionFromRecent); |
| if (maxVersion == 0L) { |
| maxVersion = versions.getNewClock(); |
| log.info("Could not find max version in index or recent updates, using new clock {}", maxVersion); |
| } |
| |
| // seed all version buckets with the highest value from recent and index |
| versions.seedBucketsWithHighestVersion(maxVersion); |
| |
| highestVersion = maxVersion; |
| } catch (IOException ioExc) { |
| log.warn("Failed to determine the max value of the version field due to: ", ioExc); |
| } |
| |
| if (debug) { |
| log.debug("Took {}ms to seed version buckets with highest version {}", |
| timer.getTime(), highestVersion); |
| } |
| |
| return highestVersion; |
| } |
| |
| public void seedBucketsWithHighestVersion(SolrIndexSearcher newSearcher) { |
| log.debug("Looking up max value of version field to seed version buckets"); |
| versionInfo.blockUpdates(); |
| try { |
| maxVersionFromIndex = seedBucketsWithHighestVersion(newSearcher, versionInfo); |
| } finally { |
| versionInfo.unblockUpdates(); |
| } |
| } |
| } |
| |