| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.solr.update; |
| |
| import org.apache.lucene.util.BytesRef; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.SolrException.ErrorCode; |
| import org.apache.solr.common.SolrInputDocument; |
| import org.apache.solr.common.params.ModifiableSolrParams; |
| import org.apache.solr.common.params.SolrParams; |
| import org.apache.solr.common.params.UpdateParams; |
| import org.apache.solr.common.util.ExecutorUtil; |
| import org.apache.solr.common.util.NamedList; |
| import org.apache.solr.core.PluginInfo; |
| import org.apache.solr.core.SolrCore; |
| import org.apache.solr.request.LocalSolrQueryRequest; |
| import org.apache.solr.request.SolrQueryRequest; |
| import org.apache.solr.request.SolrRequestInfo; |
| import org.apache.solr.response.SolrQueryResponse; |
| import org.apache.solr.search.SolrIndexSearcher; |
| import org.apache.solr.update.processor.DistributedUpdateProcessor; |
| import org.apache.solr.update.processor.DistributedUpdateProcessorFactory; |
| import org.apache.solr.update.processor.DistributingUpdateProcessorFactory; |
| import org.apache.solr.update.processor.RunUpdateProcessorFactory; |
| import org.apache.solr.update.processor.UpdateRequestProcessor; |
| import org.apache.solr.update.processor.UpdateRequestProcessorChain; |
| import org.apache.solr.util.DefaultSolrThreadFactory; |
| import org.apache.solr.util.RefCounted; |
| import org.apache.solr.util.plugin.PluginInfoInitialized; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.FilenameFilter; |
| import java.io.IOException; |
| import java.util.*; |
| import java.util.concurrent.*; |
| |
| import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; |
| import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER; |
| |
| |
| /** @lucene.experimental */ |
| public class UpdateLog implements PluginInfoInitialized { |
| public static String LOG_FILENAME_PATTERN = "%s.%019d"; |
| public static String TLOG_NAME="tlog"; |
| |
| public static Logger log = LoggerFactory.getLogger(UpdateLog.class); |
| public boolean debug = log.isDebugEnabled(); |
| public boolean trace = log.isTraceEnabled(); |
| |
| |
| public enum SyncLevel { NONE, FLUSH, FSYNC; |
| public static SyncLevel getSyncLevel(String level){ |
| if (level == null) { |
| return SyncLevel.FLUSH; |
| } |
| try{ |
| return SyncLevel.valueOf(level.toUpperCase(Locale.ROOT)); |
| } catch(Exception ex){ |
| log.warn("There was an error reading the SyncLevel - default to " + SyncLevel.FLUSH, ex); |
| return SyncLevel.FLUSH; |
| } |
| } |
| } |
| public enum State { REPLAYING, BUFFERING, APPLYING_BUFFERED, ACTIVE } |
| |
| public static final int ADD = 0x01; |
| public static final int DELETE = 0x02; |
| public static final int DELETE_BY_QUERY = 0x03; |
| public static final int COMMIT = 0x04; |
| // Flag indicating that this is a buffered operation, and that a gap exists before buffering started. |
| // for example, if full index replication starts and we are buffering updates, then this flag should |
| // be set to indicate that replaying the log would not bring us into sync (i.e. peersync should |
| // fail if this flag is set on the last update in the tlog). |
| public static final int FLAG_GAP = 0x10; |
| public static final int OPERATION_MASK = 0x0f; // mask off flags to get the operation |
| |
| public static class RecoveryInfo { |
| public long positionOfStart; |
| |
| public int adds; |
| public int deletes; |
| public int deleteByQuery; |
| public int errors; |
| |
| public boolean failed; |
| |
| @Override |
| public String toString() { |
| return "RecoveryInfo{adds="+adds+" deletes="+deletes+ " deleteByQuery="+deleteByQuery+" errors="+errors + " positionOfStart="+positionOfStart+"}"; |
| } |
| } |
| |
| long id = -1; |
| private State state = State.ACTIVE; |
| private int operationFlags; // flags to write in the transaction log with operations (i.e. FLAG_GAP) |
| |
| private TransactionLog tlog; |
| private TransactionLog prevTlog; |
| private Deque<TransactionLog> logs = new LinkedList<TransactionLog>(); // list of recent logs, newest first |
| private LinkedList<TransactionLog> newestLogsOnStartup = new LinkedList<TransactionLog>(); |
| private int numOldRecords; // number of records in the recent logs |
| |
| private Map<BytesRef,LogPtr> map = new HashMap<BytesRef, LogPtr>(); |
| private Map<BytesRef,LogPtr> prevMap; // used while committing/reopening is happening |
| private Map<BytesRef,LogPtr> prevMap2; // used while committing/reopening is happening |
| private TransactionLog prevMapLog; // the transaction log used to look up entries found in prevMap |
| private TransactionLog prevMapLog2; // the transaction log used to look up entries found in prevMap |
| |
| private final int numDeletesToKeep = 1000; |
| private final int numDeletesByQueryToKeep = 100; |
| public final int numRecordsToKeep = 100; |
| |
| // keep track of deletes only... this is not updated on an add |
| private LinkedHashMap<BytesRef, LogPtr> oldDeletes = new LinkedHashMap<BytesRef, LogPtr>(numDeletesToKeep) { |
| @Override |
| protected boolean removeEldestEntry(Map.Entry eldest) { |
| return size() > numDeletesToKeep; |
| } |
| }; |
| |
| public class DBQ { |
| public String q; // the query string |
| public long version; // positive version of the DBQ |
| |
| @Override |
| public String toString() { |
| return "DBQ{version=" + version + ",q="+q+"}"; |
| } |
| } |
| |
| private LinkedList<DBQ> deleteByQueries = new LinkedList<DBQ>(); |
| |
| private String[] tlogFiles; |
| private File tlogDir; |
| private Collection<String> globalStrings; |
| |
| private String dataDir; |
| private String lastDataDir; |
| |
| private VersionInfo versionInfo; |
| |
| private SyncLevel defaultSyncLevel = SyncLevel.FLUSH; |
| |
| volatile UpdateHandler uhandler; // a core reload can change this reference! |
| private volatile boolean cancelApplyBufferUpdate; |
| List<Long> startingVersions; |
| int startingOperation; // last operation in the logs on startup |
| |
| public static class LogPtr { |
| final long pointer; |
| final long version; |
| |
| public LogPtr(long pointer, long version) { |
| this.pointer = pointer; |
| this.version = version; |
| } |
| |
| @Override |
| public String toString() { |
| return "LogPtr(" + pointer + ")"; |
| } |
| } |
| |
| |
| public VersionInfo getVersionInfo() { |
| return versionInfo; |
| } |
| |
| @Override |
| public void init(PluginInfo info) { |
| dataDir = (String)info.initArgs.get("dir"); |
| defaultSyncLevel = SyncLevel.getSyncLevel((String)info.initArgs.get("syncLevel")); |
| } |
| |
| public void init(UpdateHandler uhandler, SolrCore core) { |
| // ulogDir from CoreDescriptor overrides |
| String ulogDir = core.getCoreDescriptor().getUlogDir(); |
| if (ulogDir != null) { |
| dataDir = ulogDir; |
| } |
| |
| if (dataDir == null || dataDir.length()==0) { |
| dataDir = core.getDataDir(); |
| } |
| |
| this.uhandler = uhandler; |
| |
| if (dataDir.equals(lastDataDir)) { |
| if (debug) { |
| log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", next id=" + id, " this is a reopen... nothing else to do."); |
| } |
| |
| versionInfo.reload(); |
| |
| // on a normal reopen, we currently shouldn't have to do anything |
| return; |
| } |
| lastDataDir = dataDir; |
| tlogDir = new File(dataDir, TLOG_NAME); |
| tlogDir.mkdirs(); |
| tlogFiles = getLogList(tlogDir); |
| id = getLastLogId() + 1; // add 1 since we will create a new log for the next update |
| |
| if (debug) { |
| log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id); |
| } |
| |
| TransactionLog oldLog = null; |
| for (String oldLogName : tlogFiles) { |
| File f = new File(tlogDir, oldLogName); |
| try { |
| oldLog = new TransactionLog( f, null, true ); |
| addOldLog(oldLog, false); // don't remove old logs on startup since more than one may be uncapped. |
| } catch (Exception e) { |
| SolrException.log(log, "Failure to open existing log file (non fatal) " + f, e); |
| deleteFile(f); |
| } |
| } |
| |
| // Record first two logs (oldest first) at startup for potential tlog recovery. |
| // It's possible that at abnormal shutdown both "tlog" and "prevTlog" were uncapped. |
| for (TransactionLog ll : logs) { |
| newestLogsOnStartup.addFirst(ll); |
| if (newestLogsOnStartup.size() >= 2) break; |
| } |
| |
| try { |
| versionInfo = new VersionInfo(this, 256); |
| } catch (SolrException e) { |
| log.error("Unable to use updateLog: " + e.getMessage(), e); |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, |
| "Unable to use updateLog: " + e.getMessage(), e); |
| } |
| |
| // TODO: these startingVersions assume that we successfully recover from all non-complete tlogs. |
| UpdateLog.RecentUpdates startingUpdates = getRecentUpdates(); |
| try { |
| startingVersions = startingUpdates.getVersions(numRecordsToKeep); |
| startingOperation = startingUpdates.getLatestOperation(); |
| |
| // populate recent deletes list (since we can't get that info from the index) |
| for (int i=startingUpdates.deleteList.size()-1; i>=0; i--) { |
| DeleteUpdate du = startingUpdates.deleteList.get(i); |
| oldDeletes.put(new BytesRef(du.id), new LogPtr(-1,du.version)); |
| } |
| |
| // populate recent deleteByQuery commands |
| for (int i=startingUpdates.deleteByQueryList.size()-1; i>=0; i--) { |
| Update update = startingUpdates.deleteByQueryList.get(i); |
| List<Object> dbq = (List<Object>) update.log.lookup(update.pointer); |
| long version = (Long) dbq.get(1); |
| String q = (String) dbq.get(2); |
| trackDeleteByQuery(q, version); |
| } |
| |
| } finally { |
| startingUpdates.close(); |
| } |
| |
| } |
| |
| public File getLogDir() { |
| return tlogDir; |
| } |
| |
| public List<Long> getStartingVersions() { |
| return startingVersions; |
| } |
| |
| public int getStartingOperation() { |
| return startingOperation; |
| } |
| |
| /* Takes over ownership of the log, keeping it until no longer needed |
| and then decrementing it's reference and dropping it. |
| */ |
| private void addOldLog(TransactionLog oldLog, boolean removeOld) { |
| if (oldLog == null) return; |
| |
| numOldRecords += oldLog.numRecords(); |
| |
| int currRecords = numOldRecords; |
| |
| if (oldLog != tlog && tlog != null) { |
| currRecords += tlog.numRecords(); |
| } |
| |
| while (removeOld && logs.size() > 0) { |
| TransactionLog log = logs.peekLast(); |
| int nrec = log.numRecords(); |
| // remove oldest log if we don't need it to keep at least numRecordsToKeep, or if |
| // we already have the limit of 10 log files. |
| if (currRecords - nrec >= numRecordsToKeep || logs.size() >= 10) { |
| currRecords -= nrec; |
| numOldRecords -= nrec; |
| logs.removeLast().decref(); // dereference so it will be deleted when no longer in use |
| continue; |
| } |
| |
| break; |
| } |
| |
| // don't incref... we are taking ownership from the caller. |
| logs.addFirst(oldLog); |
| } |
| |
| |
| public static String[] getLogList(File directory) { |
| final String prefix = TLOG_NAME+'.'; |
| String[] names = directory.list(new FilenameFilter() { |
| @Override |
| public boolean accept(File dir, String name) { |
| return name.startsWith(prefix); |
| } |
| }); |
| Arrays.sort(names); |
| return names; |
| } |
| |
| |
| public long getLastLogId() { |
| if (id != -1) return id; |
| if (tlogFiles.length == 0) return -1; |
| String last = tlogFiles[tlogFiles.length-1]; |
| return Long.parseLong(last.substring(TLOG_NAME.length()+1)); |
| } |
| |
| |
| public void add(AddUpdateCommand cmd) { |
| add(cmd, false); |
| } |
| |
| |
| public void add(AddUpdateCommand cmd, boolean clearCaches) { |
| // don't log if we are replaying from another log |
| // TODO: we currently need to log to maintain correct versioning, rtg, etc |
| // if ((cmd.getFlags() & UpdateCommand.REPLAY) != 0) return; |
| |
| synchronized (this) { |
| long pos = -1; |
| |
| // don't log if we are replaying from another log |
| if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) { |
| ensureLog(); |
| pos = tlog.write(cmd, operationFlags); |
| } |
| |
| if (!clearCaches) { |
| // TODO: in the future we could support a real position for a REPLAY update. |
| // Only currently would be useful for RTG while in recovery mode though. |
| LogPtr ptr = new LogPtr(pos, cmd.getVersion()); |
| |
| // only update our map if we're not buffering |
| if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) { |
| map.put(cmd.getIndexedId(), ptr); |
| } |
| |
| if (trace) { |
| log.trace("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map)); |
| } |
| |
| } else { |
| // replicate the deleteByQuery logic. See deleteByQuery for comments. |
| |
| if (map != null) map.clear(); |
| if (prevMap != null) prevMap.clear(); |
| if (prevMap2 != null) prevMap2.clear(); |
| |
| try { |
| RefCounted<SolrIndexSearcher> holder = uhandler.core.openNewSearcher(true, true); |
| holder.decref(); |
| } catch (Throwable e) { |
| SolrException.log(log, "Error opening realtime searcher for deleteByQuery", e); |
| } |
| |
| if (trace) { |
| log.trace("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " clearCaches=true"); |
| } |
| |
| } |
| |
| } |
| } |
| |
| |
| |
| public void delete(DeleteUpdateCommand cmd) { |
| BytesRef br = cmd.getIndexedId(); |
| |
| synchronized (this) { |
| long pos = -1; |
| |
| // don't log if we are replaying from another log |
| if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) { |
| ensureLog(); |
| pos = tlog.writeDelete(cmd, operationFlags); |
| } |
| |
| LogPtr ptr = new LogPtr(pos, cmd.version); |
| |
| // only update our map if we're not buffering |
| if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) { |
| map.put(br, ptr); |
| |
| oldDeletes.put(br, ptr); |
| } |
| |
| if (trace) { |
| log.trace("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map)); |
| } |
| } |
| } |
| |
| public void deleteByQuery(DeleteUpdateCommand cmd) { |
| synchronized (this) { |
| long pos = -1; |
| // don't log if we are replaying from another log |
| if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) { |
| ensureLog(); |
| pos = tlog.writeDeleteByQuery(cmd, operationFlags); |
| } |
| |
| // only change our caches if we are not buffering |
| if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) { |
| // given that we just did a delete-by-query, we don't know what documents were |
| // affected and hence we must purge our caches. |
| if (map != null) map.clear(); |
| if (prevMap != null) prevMap.clear(); |
| if (prevMap2 != null) prevMap2.clear(); |
| |
| trackDeleteByQuery(cmd.getQuery(), cmd.getVersion()); |
| |
| // oldDeletes.clear(); |
| |
| // We must cause a new IndexReader to be opened before anything looks at these caches again |
| // so that a cache miss will read fresh data. |
| // |
| // TODO: FUTURE: open a new searcher lazily for better throughput with delete-by-query commands |
| try { |
| RefCounted<SolrIndexSearcher> holder = uhandler.core.openNewSearcher(true, true); |
| holder.decref(); |
| } catch (Throwable e) { |
| SolrException.log(log, "Error opening realtime searcher for deleteByQuery", e); |
| } |
| |
| } |
| |
| LogPtr ptr = new LogPtr(pos, cmd.getVersion()); |
| |
| if (trace) { |
| log.trace("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map)); |
| } |
| } |
| } |
| |
| /** currently for testing only */ |
| public void deleteAll() { |
| synchronized (this) { |
| |
| try { |
| RefCounted<SolrIndexSearcher> holder = uhandler.core.openNewSearcher(true, true); |
| holder.decref(); |
| } catch (Throwable e) { |
| SolrException.log(log, "Error opening realtime searcher for deleteByQuery", e); |
| } |
| |
| if (map != null) map.clear(); |
| if (prevMap != null) prevMap.clear(); |
| if (prevMap2 != null) prevMap2.clear(); |
| |
| oldDeletes.clear(); |
| deleteByQueries.clear(); |
| } |
| } |
| |
| |
| void trackDeleteByQuery(String q, long version) { |
| version = Math.abs(version); |
| DBQ dbq = new DBQ(); |
| dbq.q = q; |
| dbq.version = version; |
| |
| synchronized (this) { |
| if (deleteByQueries.isEmpty() || deleteByQueries.getFirst().version < version) { |
| // common non-reordered case |
| deleteByQueries.addFirst(dbq); |
| } else { |
| // find correct insertion point |
| ListIterator<DBQ> iter = deleteByQueries.listIterator(); |
| iter.next(); // we already checked the first element in the previous "if" clause |
| while (iter.hasNext()) { |
| DBQ oldDBQ = iter.next(); |
| if (oldDBQ.version < version) { |
| iter.previous(); |
| break; |
| } else if (oldDBQ.version == version && oldDBQ.q.equals(q)) { |
| // a duplicate |
| return; |
| } |
| } |
| iter.add(dbq); // this also handles the case of adding at the end when hasNext() == false |
| } |
| |
| if (deleteByQueries.size() > numDeletesByQueryToKeep) { |
| deleteByQueries.removeLast(); |
| } |
| } |
| } |
| |
| public List<DBQ> getDBQNewer(long version) { |
| synchronized (this) { |
| if (deleteByQueries.isEmpty() || deleteByQueries.getFirst().version < version) { |
| // fast common case |
| return null; |
| } |
| |
| List<DBQ> dbqList = new ArrayList<DBQ>(); |
| for (DBQ dbq : deleteByQueries) { |
| if (dbq.version <= version) break; |
| dbqList.add(dbq); |
| } |
| return dbqList; |
| } |
| } |
| |
| private void newMap() { |
| prevMap2 = prevMap; |
| prevMapLog2 = prevMapLog; |
| |
| prevMap = map; |
| prevMapLog = tlog; |
| |
| map = new HashMap<BytesRef, LogPtr>(); |
| } |
| |
| private void clearOldMaps() { |
| prevMap = null; |
| prevMap2 = null; |
| } |
| |
| public boolean hasUncommittedChanges() { |
| return tlog != null; |
| } |
| |
| public void preCommit(CommitUpdateCommand cmd) { |
| synchronized (this) { |
| if (debug) { |
| log.debug("TLOG: preCommit"); |
| } |
| |
| if (getState() != State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { |
| // if we aren't in the active state, and this isn't a replay |
| // from the recovery process, then we shouldn't mess with |
| // the current transaction log. This normally shouldn't happen |
| // as DistributedUpdateProcessor will prevent this. Commits |
| // that don't use the processor are possible though. |
| return; |
| } |
| |
| // since we're changing the log, we must change the map. |
| newMap(); |
| |
| if (prevTlog != null) { |
| globalStrings = prevTlog.getGlobalStrings(); |
| } |
| |
| // since document additions can happen concurrently with commit, create |
| // a new transaction log first so that we know the old one is definitely |
| // in the index. |
| prevTlog = tlog; |
| tlog = null; |
| id++; |
| } |
| } |
| |
| public void postCommit(CommitUpdateCommand cmd) { |
| synchronized (this) { |
| if (debug) { |
| log.debug("TLOG: postCommit"); |
| } |
| if (prevTlog != null) { |
| // if we made it through the commit, write a commit command to the log |
| // TODO: check that this works to cap a tlog we were using to buffer so we don't replay on startup. |
| prevTlog.writeCommit(cmd, operationFlags); |
| |
| addOldLog(prevTlog, true); |
| // the old log list will decref when no longer needed |
| // prevTlog.decref(); |
| prevTlog = null; |
| } |
| } |
| } |
| |
| public void preSoftCommit(CommitUpdateCommand cmd) { |
| debug = log.isDebugEnabled(); // refresh our view of debugging occasionally |
| trace = log.isTraceEnabled(); |
| |
| synchronized (this) { |
| |
| if (!cmd.softCommit) return; // already handled this at the start of the hard commit |
| newMap(); |
| |
| // start adding documents to a new map since we won't know if |
| // any added documents will make it into this commit or not. |
| // But we do know that any updates already added will definitely |
| // show up in the latest reader after the commit succeeds. |
| map = new HashMap<BytesRef, LogPtr>(); |
| |
| if (debug) { |
| log.debug("TLOG: preSoftCommit: prevMap="+ System.identityHashCode(prevMap) + " new map=" + System.identityHashCode(map)); |
| } |
| } |
| } |
| |
| public void postSoftCommit(CommitUpdateCommand cmd) { |
| synchronized (this) { |
| // We can clear out all old maps now that a new searcher has been opened. |
| // This currently only works since DUH2 synchronizes around preCommit to avoid |
| // it being called in the middle of a preSoftCommit, postSoftCommit sequence. |
| // If this DUH2 synchronization were to be removed, preSoftCommit should |
| // record what old maps were created and only remove those. |
| |
| if (debug) { |
| SolrCore.verbose("TLOG: postSoftCommit: disposing of prevMap="+ System.identityHashCode(prevMap) + ", prevMap2=" + System.identityHashCode(prevMap2)); |
| } |
| clearOldMaps(); |
| } |
| } |
| |
| public Object lookup(BytesRef indexedId) { |
| LogPtr entry; |
| TransactionLog lookupLog; |
| |
| synchronized (this) { |
| entry = map.get(indexedId); |
| lookupLog = tlog; // something found in "map" will always be in "tlog" |
| // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog); |
| if (entry == null && prevMap != null) { |
| entry = prevMap.get(indexedId); |
| // something found in prevMap will always be found in preMapLog (which could be tlog or prevTlog) |
| lookupLog = prevMapLog; |
| // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog); |
| } |
| if (entry == null && prevMap2 != null) { |
| entry = prevMap2.get(indexedId); |
| // something found in prevMap2 will always be found in preMapLog2 (which could be tlog or prevTlog) |
| lookupLog = prevMapLog2; |
| // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog); |
| } |
| |
| if (entry == null) { |
| return null; |
| } |
| lookupLog.incref(); |
| } |
| |
| try { |
| // now do the lookup outside of the sync block for concurrency |
| return lookupLog.lookup(entry.pointer); |
| } finally { |
| lookupLog.decref(); |
| } |
| |
| } |
| |
| // This method works like realtime-get... it only guarantees to return the latest |
| // version of the *completed* update. There can be updates in progress concurrently |
| // that have already grabbed higher version numbers. Higher level coordination or |
| // synchronization is needed for stronger guarantees (as VersionUpdateProcessor does). |
| public Long lookupVersion(BytesRef indexedId) { |
| LogPtr entry; |
| TransactionLog lookupLog; |
| |
| synchronized (this) { |
| entry = map.get(indexedId); |
| lookupLog = tlog; // something found in "map" will always be in "tlog" |
| // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog); |
| if (entry == null && prevMap != null) { |
| entry = prevMap.get(indexedId); |
| // something found in prevMap will always be found in preMapLog (which could be tlog or prevTlog) |
| lookupLog = prevMapLog; |
| // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog); |
| } |
| if (entry == null && prevMap2 != null) { |
| entry = prevMap2.get(indexedId); |
| // something found in prevMap2 will always be found in preMapLog2 (which could be tlog or prevTlog) |
| lookupLog = prevMapLog2; |
| // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog); |
| } |
| } |
| |
| if (entry != null) { |
| return entry.version; |
| } |
| |
| // Now check real index |
| Long version = versionInfo.getVersionFromIndex(indexedId); |
| |
| if (version != null) { |
| return version; |
| } |
| |
| // We can't get any version info for deletes from the index, so if the doc |
| // wasn't found, check a cache of recent deletes. |
| |
| synchronized (this) { |
| entry = oldDeletes.get(indexedId); |
| } |
| |
| if (entry != null) { |
| return entry.version; |
| } |
| |
| return null; |
| } |
| |
| public void finish(SyncLevel syncLevel) { |
| if (syncLevel == null) { |
| syncLevel = defaultSyncLevel; |
| } |
| if (syncLevel == SyncLevel.NONE) { |
| return; |
| } |
| |
| TransactionLog currLog; |
| synchronized (this) { |
| currLog = tlog; |
| if (currLog == null) return; |
| currLog.incref(); |
| } |
| |
| try { |
| currLog.finish(syncLevel); |
| } finally { |
| currLog.decref(); |
| } |
| } |
| |
| |
| public Future<RecoveryInfo> recoverFromLog() { |
| recoveryInfo = new RecoveryInfo(); |
| |
| List<TransactionLog> recoverLogs = new ArrayList<TransactionLog>(1); |
| for (TransactionLog ll : newestLogsOnStartup) { |
| if (!ll.try_incref()) continue; |
| |
| try { |
| if (ll.endsWithCommit()) { |
| ll.decref(); |
| continue; |
| } |
| } catch (IOException e) { |
| log.error("Error inspecting tlog " + ll); |
| ll.decref(); |
| continue; |
| } |
| |
| recoverLogs.add(ll); |
| } |
| |
| if (recoverLogs.isEmpty()) return null; |
| |
| ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<RecoveryInfo>(recoveryExecutor); |
| LogReplayer replayer = new LogReplayer(recoverLogs, false); |
| |
| versionInfo.blockUpdates(); |
| try { |
| state = State.REPLAYING; |
| } finally { |
| versionInfo.unblockUpdates(); |
| } |
| |
| // At this point, we are guaranteed that any new updates coming in will see the state as "replaying" |
| |
| return cs.submit(replayer, recoveryInfo); |
| } |
| |
| |
| private void ensureLog() { |
| if (tlog == null) { |
| String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, TLOG_NAME, id); |
| tlog = new TransactionLog(new File(tlogDir, newLogName), globalStrings); |
| } |
| } |
| |
| |
| private void doClose(TransactionLog theLog, boolean writeCommit) { |
| if (theLog != null) { |
| if (writeCommit) { |
| // record a commit |
| log.info("Recording current closed for " + uhandler.core + " log=" + theLog); |
| CommitUpdateCommand cmd = new CommitUpdateCommand(new LocalSolrQueryRequest(uhandler.core, new ModifiableSolrParams((SolrParams)null)), false); |
| theLog.writeCommit(cmd, operationFlags); |
| } |
| |
| theLog.deleteOnClose = false; |
| theLog.decref(); |
| theLog.forceClose(); |
| } |
| } |
| |
| public void close(boolean committed) { |
| close(committed, false); |
| } |
| |
| public void close(boolean committed, boolean deleteOnClose) { |
| synchronized (this) { |
| try { |
| ExecutorUtil.shutdownNowAndAwaitTermination(recoveryExecutor); |
| } catch (Throwable e) { |
| SolrException.log(log, e); |
| } |
| |
| // Don't delete the old tlogs, we want to be able to replay from them and retrieve old versions |
| |
| doClose(prevTlog, committed); |
| doClose(tlog, committed); |
| |
| for (TransactionLog log : logs) { |
| if (log == prevTlog || log == tlog) continue; |
| log.deleteOnClose = false; |
| log.decref(); |
| log.forceClose(); |
| } |
| |
| } |
| } |
| |
| |
| static class Update { |
| TransactionLog log; |
| long version; |
| long pointer; |
| } |
| |
| static class DeleteUpdate { |
| long version; |
| byte[] id; |
| |
| public DeleteUpdate(long version, byte[] id) { |
| this.version = version; |
| this.id = id; |
| } |
| } |
| |
| public class RecentUpdates { |
| Deque<TransactionLog> logList; // newest first |
| List<List<Update>> updateList; |
| HashMap<Long, Update> updates; |
| List<Update> deleteByQueryList; |
| List<DeleteUpdate> deleteList; |
| int latestOperation; |
| |
| public List<Long> getVersions(int n) { |
| List<Long> ret = new ArrayList(n); |
| |
| for (List<Update> singleList : updateList) { |
| for (Update ptr : singleList) { |
| ret.add(ptr.version); |
| if (--n <= 0) return ret; |
| } |
| } |
| |
| return ret; |
| } |
| |
| public Object lookup(long version) { |
| Update update = updates.get(version); |
| if (update == null) return null; |
| |
| return update.log.lookup(update.pointer); |
| } |
| |
| /** Returns the list of deleteByQueries that happened after the given version */ |
| public List<Object> getDeleteByQuery(long afterVersion) { |
| List<Object> result = new ArrayList<Object>(deleteByQueryList.size()); |
| for (Update update : deleteByQueryList) { |
| if (Math.abs(update.version) > afterVersion) { |
| Object dbq = update.log.lookup(update.pointer); |
| result.add(dbq); |
| } |
| } |
| return result; |
| } |
| |
| public int getLatestOperation() { |
| return latestOperation; |
| } |
| |
| |
| private void update() { |
| int numUpdates = 0; |
| updateList = new ArrayList<List<Update>>(logList.size()); |
| deleteByQueryList = new ArrayList<Update>(); |
| deleteList = new ArrayList<DeleteUpdate>(); |
| updates = new HashMap<Long,Update>(numRecordsToKeep); |
| |
| for (TransactionLog oldLog : logList) { |
| List<Update> updatesForLog = new ArrayList<Update>(); |
| |
| TransactionLog.ReverseReader reader = null; |
| try { |
| reader = oldLog.getReverseReader(); |
| |
| while (numUpdates < numRecordsToKeep) { |
| Object o = null; |
| try { |
| o = reader.next(); |
| if (o==null) break; |
| |
| // should currently be a List<Oper,Ver,Doc/Id> |
| List entry = (List)o; |
| |
| // TODO: refactor this out so we get common error handling |
| int opAndFlags = (Integer)entry.get(0); |
| if (latestOperation == 0) { |
| latestOperation = opAndFlags; |
| } |
| int oper = opAndFlags & UpdateLog.OPERATION_MASK; |
| long version = (Long) entry.get(1); |
| |
| switch (oper) { |
| case UpdateLog.ADD: |
| case UpdateLog.DELETE: |
| case UpdateLog.DELETE_BY_QUERY: |
| Update update = new Update(); |
| update.log = oldLog; |
| update.pointer = reader.position(); |
| update.version = version; |
| |
| updatesForLog.add(update); |
| updates.put(version, update); |
| |
| if (oper == UpdateLog.DELETE_BY_QUERY) { |
| deleteByQueryList.add(update); |
| } else if (oper == UpdateLog.DELETE) { |
| deleteList.add(new DeleteUpdate(version, (byte[])entry.get(2))); |
| } |
| |
| break; |
| |
| case UpdateLog.COMMIT: |
| break; |
| default: |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper); |
| } |
| } catch (ClassCastException cl) { |
| log.warn("Unexpected log entry or corrupt log. Entry=" + o, cl); |
| // would be caused by a corrupt transaction log |
| } catch (Exception ex) { |
| log.warn("Exception reverse reading log", ex); |
| break; |
| } |
| } |
| |
| } catch (IOException e) { |
| // failure to read a log record isn't fatal |
| log.error("Exception reading versions from log",e); |
| } finally { |
| if (reader != null) reader.close(); |
| } |
| |
| updateList.add(updatesForLog); |
| } |
| |
| } |
| |
| public void close() { |
| for (TransactionLog log : logList) { |
| log.decref(); |
| } |
| } |
| } |
| |
| /** The RecentUpdates object returned must be closed after use */ |
| public RecentUpdates getRecentUpdates() { |
| Deque<TransactionLog> logList; |
| synchronized (this) { |
| logList = new LinkedList<TransactionLog>(logs); |
| for (TransactionLog log : logList) { |
| log.incref(); |
| } |
| if (prevTlog != null) { |
| prevTlog.incref(); |
| logList.addFirst(prevTlog); |
| } |
| if (tlog != null) { |
| tlog.incref(); |
| logList.addFirst(tlog); |
| } |
| } |
| |
| // TODO: what if I hand out a list of updates, then do an update, then hand out another list (and |
| // one of the updates I originally handed out fell off the list). Over-request? |
| |
| boolean success = false; |
| RecentUpdates recentUpdates = null; |
| try { |
| recentUpdates = new RecentUpdates(); |
| recentUpdates.logList = logList; |
| recentUpdates.update(); |
| success = true; |
| } finally { |
| // defensive: if some unknown exception is thrown, |
| // make sure we close so that the tlogs are decref'd |
| if (!success && recentUpdates != null) { |
| recentUpdates.close(); |
| } |
| } |
| |
| return recentUpdates; |
| } |
| |
| public void bufferUpdates() { |
| // recovery trips this assert under some race - even when |
| // it checks the state first |
| // assert state == State.ACTIVE; |
| |
| recoveryInfo = new RecoveryInfo(); |
| |
| // block all updates to eliminate race conditions |
| // reading state and acting on it in the update processor |
| versionInfo.blockUpdates(); |
| try { |
| if (state != State.ACTIVE) return; |
| |
| if (log.isInfoEnabled()) { |
| log.info("Starting to buffer updates. " + this); |
| } |
| |
| // since we blocked updates, this synchronization shouldn't strictly be necessary. |
| synchronized (this) { |
| recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot(); |
| } |
| |
| state = State.BUFFERING; |
| |
| // currently, buffering is only called by recovery, meaning that there is most likely a gap in updates |
| operationFlags |= FLAG_GAP; |
| } finally { |
| versionInfo.unblockUpdates(); |
| } |
| } |
| |
| /** Returns true if we were able to drop buffered updates and return to the ACTIVE state */ |
| public boolean dropBufferedUpdates() { |
| versionInfo.blockUpdates(); |
| try { |
| if (state != State.BUFFERING) return false; |
| |
| if (log.isInfoEnabled()) { |
| log.info("Dropping buffered updates " + this); |
| } |
| |
| // since we blocked updates, this synchronization shouldn't strictly be necessary. |
| synchronized (this) { |
| if (tlog != null) { |
| tlog.rollback(recoveryInfo.positionOfStart); |
| } |
| } |
| |
| state = State.ACTIVE; |
| operationFlags &= ~FLAG_GAP; |
| } catch (IOException e) { |
| SolrException.log(log,"Error attempting to roll back log", e); |
| return false; |
| } |
| finally { |
| versionInfo.unblockUpdates(); |
| } |
| return true; |
| } |
| |
| |
| /** Returns the Future to wait on, or null if no replay was needed */ |
| public Future<RecoveryInfo> applyBufferedUpdates() { |
| // recovery trips this assert under some race - even when |
| // it checks the state first |
| // assert state == State.BUFFERING; |
| |
| // block all updates to eliminate race conditions |
| // reading state and acting on it in the update processor |
| versionInfo.blockUpdates(); |
| try { |
| cancelApplyBufferUpdate = false; |
| if (state != State.BUFFERING) return null; |
| operationFlags &= ~FLAG_GAP; |
| |
| // handle case when no log was even created because no updates |
| // were received. |
| if (tlog == null) { |
| state = State.ACTIVE; |
| return null; |
| } |
| tlog.incref(); |
| state = State.APPLYING_BUFFERED; |
| } finally { |
| versionInfo.unblockUpdates(); |
| } |
| |
| if (recoveryExecutor.isShutdown()) { |
| tlog.decref(); |
| throw new RuntimeException("executor is not running..."); |
| } |
| ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<RecoveryInfo>(recoveryExecutor); |
| LogReplayer replayer = new LogReplayer(Arrays.asList(new TransactionLog[]{tlog}), true); |
| return cs.submit(replayer, recoveryInfo); |
| } |
| |
| public State getState() { |
| return state; |
| } |
| |
| @Override |
| public String toString() { |
| return "FSUpdateLog{state="+getState()+", tlog="+tlog+"}"; |
| } |
| |
| |
| public static Runnable testing_logReplayHook; // called before each log read |
| public static Runnable testing_logReplayFinishHook; // called when log replay has finished |
| |
| |
| |
| private RecoveryInfo recoveryInfo; |
| |
| class LogReplayer implements Runnable { |
| private Logger loglog = log; // set to something different? |
| |
| Deque<TransactionLog> translogs; |
| TransactionLog.LogReader tlogReader; |
| boolean activeLog; |
| boolean finishing = false; // state where we lock out other updates and finish those updates that snuck in before we locked |
| boolean debug = loglog.isDebugEnabled(); |
| |
| public LogReplayer(List<TransactionLog> translogs, boolean activeLog) { |
| this.translogs = new LinkedList<TransactionLog>(); |
| this.translogs.addAll(translogs); |
| this.activeLog = activeLog; |
| } |
| |
| |
| |
| private SolrQueryRequest req; |
| private SolrQueryResponse rsp; |
| |
| |
| @Override |
| public void run() { |
| ModifiableSolrParams params = new ModifiableSolrParams(); |
| params.set(DISTRIB_UPDATE_PARAM, FROMLEADER.toString()); |
| params.set(DistributedUpdateProcessor.LOG_REPLAY, "true"); |
| req = new LocalSolrQueryRequest(uhandler.core, params); |
| rsp = new SolrQueryResponse(); |
| SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp)); // setting request info will help logging |
| |
| try { |
| for(;;) { |
| TransactionLog translog = translogs.pollFirst(); |
| if (translog == null) break; |
| doReplay(translog); |
| } |
| } catch (SolrException e) { |
| if (e.code() == ErrorCode.SERVICE_UNAVAILABLE.code) { |
| SolrException.log(log, e); |
| recoveryInfo.failed = true; |
| } else { |
| recoveryInfo.errors++; |
| SolrException.log(log, e); |
| } |
| } catch (Throwable e) { |
| recoveryInfo.errors++; |
| SolrException.log(log, e); |
| } finally { |
| // change the state while updates are still blocked to prevent races |
| state = State.ACTIVE; |
| if (finishing) { |
| versionInfo.unblockUpdates(); |
| } |
| |
| // clean up in case we hit some unexpected exception and didn't get |
| // to more transaction logs |
| for (TransactionLog translog : translogs) { |
| log.error("ERROR: didn't get to recover from tlog " + translog); |
| translog.decref(); |
| } |
| } |
| |
| loglog.warn("Log replay finished. recoveryInfo=" + recoveryInfo); |
| |
| if (testing_logReplayFinishHook != null) testing_logReplayFinishHook.run(); |
| |
| SolrRequestInfo.clearRequestInfo(); |
| } |
| |
| |
| public void doReplay(TransactionLog translog) { |
| try { |
| loglog.warn("Starting log replay " + translog + " active="+activeLog + " starting pos=" + recoveryInfo.positionOfStart); |
| |
| tlogReader = translog.getReader(recoveryInfo.positionOfStart); |
| |
| // NOTE: we don't currently handle a core reload during recovery. This would cause the core |
| // to change underneath us. |
| |
| UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(null); |
| UpdateRequestProcessor proc = processorChain.createProcessor(req, rsp); |
| |
| long commitVersion = 0; |
| int operationAndFlags = 0; |
| |
| for(;;) { |
| Object o = null; |
| if (cancelApplyBufferUpdate) break; |
| try { |
| if (testing_logReplayHook != null) testing_logReplayHook.run(); |
| o = null; |
| o = tlogReader.next(); |
| if (o == null && activeLog) { |
| if (!finishing) { |
| // block to prevent new adds, but don't immediately unlock since |
| // we could be starved from ever completing recovery. Only unlock |
| // after we've finished this recovery. |
| // NOTE: our own updates won't be blocked since the thread holding a write lock can |
| // lock a read lock. |
| versionInfo.blockUpdates(); |
| finishing = true; |
| o = tlogReader.next(); |
| } else { |
| // we had previously blocked updates, so this "null" from the log is final. |
| |
| // Wait until our final commit to change the state and unlock. |
| // This is only so no new updates are written to the current log file, and is |
| // only an issue if we crash before the commit (and we are paying attention |
| // to incomplete log files). |
| // |
| // versionInfo.unblockUpdates(); |
| } |
| } |
| } catch (InterruptedException e) { |
| SolrException.log(log,e); |
| } catch (IOException e) { |
| SolrException.log(log,e); |
| } catch (Throwable e) { |
| SolrException.log(log,e); |
| } |
| |
| if (o == null) break; |
| |
| try { |
| |
| // should currently be a List<Oper,Ver,Doc/Id> |
| List entry = (List)o; |
| |
| operationAndFlags = (Integer)entry.get(0); |
| int oper = operationAndFlags & OPERATION_MASK; |
| long version = (Long) entry.get(1); |
| |
| switch (oper) { |
| case UpdateLog.ADD: |
| { |
| recoveryInfo.adds++; |
| // byte[] idBytes = (byte[]) entry.get(2); |
| SolrInputDocument sdoc = (SolrInputDocument)entry.get(entry.size()-1); |
| AddUpdateCommand cmd = new AddUpdateCommand(req); |
| // cmd.setIndexedId(new BytesRef(idBytes)); |
| cmd.solrDoc = sdoc; |
| cmd.setVersion(version); |
| cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT); |
| if (debug) log.debug("add " + cmd); |
| |
| proc.processAdd(cmd); |
| break; |
| } |
| case UpdateLog.DELETE: |
| { |
| recoveryInfo.deletes++; |
| byte[] idBytes = (byte[]) entry.get(2); |
| DeleteUpdateCommand cmd = new DeleteUpdateCommand(req); |
| cmd.setIndexedId(new BytesRef(idBytes)); |
| cmd.setVersion(version); |
| cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT); |
| if (debug) log.debug("delete " + cmd); |
| proc.processDelete(cmd); |
| break; |
| } |
| |
| case UpdateLog.DELETE_BY_QUERY: |
| { |
| recoveryInfo.deleteByQuery++; |
| String query = (String)entry.get(2); |
| DeleteUpdateCommand cmd = new DeleteUpdateCommand(req); |
| cmd.query = query; |
| cmd.setVersion(version); |
| cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT); |
| if (debug) log.debug("deleteByQuery " + cmd); |
| proc.processDelete(cmd); |
| break; |
| } |
| |
| case UpdateLog.COMMIT: |
| { |
| commitVersion = version; |
| break; |
| } |
| |
| default: |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper); |
| } |
| |
| if (rsp.getException() != null) { |
| loglog.error("REPLAY_ERR: Exception replaying log", rsp.getException()); |
| throw rsp.getException(); |
| } |
| } catch (IOException ex) { |
| recoveryInfo.errors++; |
| loglog.warn("REYPLAY_ERR: IOException reading log", ex); |
| // could be caused by an incomplete flush if recovering from log |
| } catch (ClassCastException cl) { |
| recoveryInfo.errors++; |
| loglog.warn("REPLAY_ERR: Unexpected log entry or corrupt log. Entry=" + o, cl); |
| // would be caused by a corrupt transaction log |
| } catch (SolrException ex) { |
| if (ex.code() == ErrorCode.SERVICE_UNAVAILABLE.code) { |
| throw ex; |
| } |
| recoveryInfo.errors++; |
| loglog.warn("REYPLAY_ERR: IOException reading log", ex); |
| // could be caused by an incomplete flush if recovering from log |
| } catch (Throwable ex) { |
| recoveryInfo.errors++; |
| loglog.warn("REPLAY_ERR: Exception replaying log", ex); |
| // something wrong with the request? |
| } |
| } |
| |
| CommitUpdateCommand cmd = new CommitUpdateCommand(req, false); |
| cmd.setVersion(commitVersion); |
| cmd.softCommit = false; |
| cmd.waitSearcher = true; |
| cmd.setFlags(UpdateCommand.REPLAY); |
| try { |
| if (debug) log.debug("commit " + cmd); |
| uhandler.commit(cmd); // this should cause a commit to be added to the incomplete log and avoid it being replayed again after a restart. |
| } catch (IOException ex) { |
| recoveryInfo.errors++; |
| loglog.error("Replay exception: final commit.", ex); |
| } |
| |
| if (!activeLog) { |
| // if we are replaying an old tlog file, we need to add a commit to the end |
| // so we don't replay it again if we restart right after. |
| |
| // if the last operation we replayed had FLAG_GAP set, we want to use that again so we don't lose it |
| // as the flag on the last operation. |
| translog.writeCommit(cmd, operationFlags | (operationAndFlags & ~OPERATION_MASK)); |
| } |
| |
| try { |
| proc.finish(); |
| } catch (IOException ex) { |
| recoveryInfo.errors++; |
| loglog.error("Replay exception: finish()", ex); |
| } |
| |
| } finally { |
| if (tlogReader != null) tlogReader.close(); |
| translog.decref(); |
| } |
| } |
| } |
| |
| public void cancelApplyBufferedUpdates() { |
| this.cancelApplyBufferUpdate = true; |
| } |
| |
| ThreadPoolExecutor recoveryExecutor = new ThreadPoolExecutor(0, |
| Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), |
| new DefaultSolrThreadFactory("recoveryExecutor")); |
| |
| |
| public static void deleteFile(File file) { |
| boolean success = false; |
| try { |
| success = file.delete(); |
| if (!success) { |
| log.error("Error deleting file: " + file); |
| } |
| } catch (Exception e) { |
| log.error("Error deleting file: " + file, e); |
| } |
| |
| if (!success) { |
| try { |
| file.deleteOnExit(); |
| } catch (Exception e) { |
| log.error("Error deleting file on exit: " + file, e); |
| } |
| } |
| } |
| |
| public static File getTlogDir(SolrCore core, PluginInfo info) { |
| String dataDir = (String) info.initArgs.get("dir"); |
| |
| String ulogDir = core.getCoreDescriptor().getUlogDir(); |
| if (ulogDir != null) { |
| dataDir = ulogDir; |
| } |
| |
| if (dataDir == null || dataDir.length() == 0) { |
| dataDir = core.getDataDir(); |
| } |
| |
| return new File(dataDir, TLOG_NAME); |
| } |
| |
| } |
| |
| |
| |