| package org.apache.lucene.replicator; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import java.io.Closeable; |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.nio.file.NoSuchFileException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Queue; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.zip.Adler32; |
| import java.util.zip.Checksum; |
| |
| import org.apache.lucene.analysis.MockAnalyzer; |
| import org.apache.lucene.codecs.CodecUtil; |
| import org.apache.lucene.document.Document; |
| import org.apache.lucene.document.Field; |
| import org.apache.lucene.index.AtomicReader; |
| import org.apache.lucene.index.AtomicReaderContext; |
| import org.apache.lucene.index.ConcurrentMergeScheduler; |
| import org.apache.lucene.index.CorruptIndexException; |
| import org.apache.lucene.index.DirectoryReader; |
| import org.apache.lucene.index.IndexFileNames; |
| import org.apache.lucene.index.IndexNotFoundException; |
| import org.apache.lucene.index.IndexReader; |
| import org.apache.lucene.index.IndexWriter.IndexReaderWarmer; |
| import org.apache.lucene.index.IndexWriter; |
| import org.apache.lucene.index.IndexWriterConfig; |
| import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; |
| import org.apache.lucene.index.MergePolicy; |
| import org.apache.lucene.index.RandomIndexWriter; |
| import org.apache.lucene.index.SegmentCommitInfo; |
| import org.apache.lucene.index.SegmentInfos; |
| import org.apache.lucene.index.SegmentReader; |
| import org.apache.lucene.index.StandardDirectoryReader; |
| import org.apache.lucene.index.Term; |
| import org.apache.lucene.search.IndexSearcher; |
| import org.apache.lucene.search.MatchAllDocsQuery; |
| import org.apache.lucene.search.ReferenceManager; |
| import org.apache.lucene.search.TopDocs; |
| import org.apache.lucene.store.AlreadyClosedException; |
| import org.apache.lucene.store.BaseDirectoryWrapper; |
| import org.apache.lucene.store.ByteArrayDataInput; |
| import org.apache.lucene.store.ChecksumIndexOutput; |
| import org.apache.lucene.store.DataInput; |
| import org.apache.lucene.store.Directory; |
| import org.apache.lucene.store.FlushInfo; |
| import org.apache.lucene.store.IOContext; |
| import org.apache.lucene.store.IndexInput; |
| import org.apache.lucene.store.IndexOutput; |
| import org.apache.lucene.store.MergeInfo; |
| import org.apache.lucene.store.MockDirectoryWrapper; |
| import org.apache.lucene.store.NRTCachingDirectory; |
| import org.apache.lucene.store.RAMDirectory; |
| import org.apache.lucene.store.RAMOutputStream; |
| import org.apache.lucene.store.RateLimitedIndexOutput; |
| import org.apache.lucene.store.RateLimiter; |
| import org.apache.lucene.util.IOUtils; |
| import org.apache.lucene.util.InfoStream; |
| import org.apache.lucene.util.LineFileDocs; |
| import org.apache.lucene.util.LuceneTestCase.SuppressCodecs; |
| import org.apache.lucene.util.LuceneTestCase; |
| import org.apache.lucene.util.PrintStreamInfoStream; |
| import org.apache.lucene.util.TreeLogger; |
| import org.apache.lucene.util._TestUtil; |
| import org.junit.AfterClass; |
| |
| // nocommit add SLM |
| |
| // nocommit also allow downgrade of Master to Replica, |
| // instead of Master.close then Replica init |
| |
| // nocommit make sure we are not over-IncRef'ing infos on master |
| |
| // nocommit test replicas that are slow to copy |
| |
| // nocommit add fang: master crashes |
| |
| // nocommit provoke more "going backwards", e.g. randomly |
| // sometimes shutdown whole cluster and pick random node to |
| // be the new master |
| |
| // nocommit should we have support for "flush as frequently |
| // as you can"? or at least, "do not flush so frequently |
| // that replicas can't finish copying before next flush"? |
| |
| // nocommit what about network partitioning |
| |
| // nocommit make MDW throw exceptions sometimes |
| |
| // nocommit test warming merge w/ O_DIRECT via NativeUnixDir |
| |
| // nocommit test rare bit errors during copy |
| |
| // nocommit test slow replica |
| |
| // nocommit test replica that "falls out" because it's too |
| // slow and then tries to join back again w/o having |
| // "properly" restarted |
| |
| // nocommit rewrite the test so each node has its own |
| // threads, to be closer to the concurrency we'd "really" |
| // see across N machines |
| |
| // nocommit also allow replicas pulling files from replicas; |
| // they need not always come from master |
| |
| @SuppressCodecs({ "SimpleText", "Memory", "Direct" }) |
| public class TestNRTReplication extends LuceneTestCase { |
| |
| static volatile Master master; |
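
  // Bumped under masterLock each time the master moves; a
  // replica snapshots this before copying files so it can
  // detect that a new master was elected mid-sync and
  // abort its cutover (see Replica.sync):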
| static final AtomicInteger masterCount = new AtomicInteger(); |
| static final Lock masterLock = new ReentrantLock(); |
| static Object[] nodes; |
| |
| @AfterClass |
| public static void afterClass() { |
| System.out.println("TEST: now afterClass"); |
| master = null; |
| nodes = null; |
| } |
| |
| public void test() throws Throwable { |
| try { |
| _test(); |
| } catch (Throwable t) { |
| System.out.println("FAILED:"); |
| t.printStackTrace(System.out); |
| throw t; |
| } |
| } |
| |
| private void _test() throws Exception { |
| |
| Thread.currentThread().setName("main"); |
| TreeLogger.setLogger(new TreeLogger("main")); |
| |
    // Maps each segmentInfos.getVersion() we've seen to the
    // expected doc count.  On each replica, when we run a
    // *:* search, we verify totalHits is correct:
| final Map<Long,Integer> versionDocCounts = new ConcurrentHashMap<Long,Integer>(); |
| |
| int numDirs = 1+_TestUtil.nextInt(random(), 2, 6); |
| // nocommit |
| //int numDirs = 2; |
| |
| final File[] dirs = new File[numDirs]; |
| |
    // One Master (initially node 0) and N-1 Replicas:
| nodes = new Object[numDirs]; |
| System.out.println("TEST: " + nodes.length + " nodes"); |
| for(int i=0;i<numDirs;i++) { |
| dirs[i] = _TestUtil.getTempDir("NRTReplication." + i + "_"); |
| if (i > 0) { |
| // Some replicas don't start on init: |
| if (random().nextInt(10) < 7) { |
| nodes[i] = new Replica(dirs[i], i, versionDocCounts, null); |
| } else { |
| System.out.println("TEST: skip replica " + i + " startup"); |
| } |
| } |
| } |
| |
| nodes[0] = master = new Master(dirs[0], 0, versionDocCounts); |
| |
| // nocommit test graceful full shutdown / restart |
| |
| // nocommit test all nodes hard shutdown |
| |
| long endTime = System.currentTimeMillis() + 60000; |
| //long endTime = System.currentTimeMillis() + 10000; |
| |
| while (System.currentTimeMillis() < endTime) { |
| Thread.sleep(_TestUtil.nextInt(random(), 2, 50)); |
| |
| assert master != null; |
| |
| TreeLogger.log("\nTEST: now replicate master id=" + master.id); |
| TreeLogger.start("replicate"); |
| |
| SegmentInfos infos; |
| boolean closeMaster = random().nextInt(100) == 57; |
| if (closeMaster) { |
| TreeLogger.log("top: id=" + master.id + " now move master"); |
        // Commits & closes the current master and pulls the
        // infos of the final commit:
| master.close(random().nextBoolean()); |
| TreeLogger.log("top: done shutdown master"); |
| } else { |
| |
| // Have writer do a full flush, and return the |
| // resulting segments, protected from deletion |
| // (incRef'd) just while we copy the files out to |
        // the replica(s).  This is just like pulling an
| // NRT reader, except we don't actually open the |
| // readers on the newly flushed segments: |
| TreeLogger.log("flush current master"); |
| master.flush(); |
| TreeLogger.log("done flush current master"); |
| } |
| |
| CopyState copyState = master.getCopyState(); |
| |
| // nocommit also allow downgrade Master -> Replica, |
| // NOT a full close |
| |
| int count = docCount(copyState.infos); |
| TreeLogger.log("record version=" + copyState.version + " count=" + count + " segs=" + copyState.infos.toString(copyState.dir)); |
| Integer oldCount = versionDocCounts.put(copyState.version, count); |
| |
| // Refresh the local searcher on master: |
| if (closeMaster == false) { |
| master.mgr.setCurrentInfos(copyState.infos); |
| master.mgr.maybeRefresh(); |
| } |
| |
| // nocommit break this into separate tests, so we can |
| // test the "clean" case where versions are "correct": |
| |
| // nocommit cannot do this: versions can go backwards |
| //if (oldCount != null) { |
| //assertEquals("version=" + infos.version + " oldCount=" + oldCount + " newCount=" + count, oldCount.intValue(), count); |
| //} |
| |
| // nocommit can we have commit commit the "older" SIS? |
| // they will commit quickly since the OS will have |
| // already moved those bytes to disk... |
| |
| String extra = " master.sizeInBytes=" + ((NRTCachingDirectory) master.dir.getDelegate()).sizeInBytes(); |
| |
| TreeLogger.log("replicate docCount=" + count + " version=" + copyState.version + extra + " segments=" + copyState.infos.toString(copyState.dir)); |
| |
| // nocommit test master crash (promoting replica to master) |
| |
| // nocommit simulate super-slow replica: it should not |
| // hold up the copying of other replicas, nor new |
| // flushing; the copy of a given SIS to a given |
| // replica should be fully concurrent/bg |
| |
| // Notify all running replicas that they should now |
| // pull the new flush over: |
| int upto = 0; |
| for(Object n : nodes) { |
        if (n instanceof Replica) {
| Replica r = (Replica) n; |
| // nocommit can we "broadcast" the new files |
| // instead of each replica pulling its own copy |
| // ... |
| TreeLogger.log("id=" + upto + ": signal new flush"); |
| r.newFlush(); |
| } else if (n == null) { |
| TreeLogger.log("id=" + upto + " skip down replica"); |
| } |
| upto++; |
| } |
| |
| master.releaseCopyState(copyState); |
| |
| if (closeMaster) { |
| if (random().nextBoolean()) { |
| TreeLogger.log("top: id=" + master.id + " now waitIdle"); |
| master.waitIdle(); |
| TreeLogger.log("top: id=" + master.id + " done waitIdle"); |
| } else { |
| TreeLogger.log("top: id=" + master.id + " skip waitIdle"); |
| Thread.sleep(random().nextInt(5)); |
| } |
| |
| TreeLogger.log("top: id=" + master.id + " close old master dir dir.listAll()=" + Arrays.toString(master.dir.listAll())); |
| master.dir.close(); |
| |
| masterLock.lock(); |
| try { |
| masterCount.incrementAndGet(); |
| } finally { |
| masterLock.unlock(); |
| } |
| |
| nodes[master.id] = null; |
| |
| // nocommit make sure we test race here, where |
| // replica is coming up just as we are electing a |
| // new master |
| |
| // Must pick newest replica to promote, else we |
| // can't overwrite open files when trying to copy |
| // to the newer replicas: |
| int bestIDX = -1; |
| long highestVersion = -1; |
| for (int idx=0;idx<nodes.length;idx++) { |
| if (nodes[idx] instanceof Replica) { |
| Replica r = (Replica) nodes[idx]; |
| long version = r.mgr.getCurrentInfos().version; |
| TreeLogger.log("top: id=" + r.id + " check version=" + version); |
| if (version > highestVersion) { |
| bestIDX = idx; |
| highestVersion = version; |
| TreeLogger.log("top: id=" + r.id + " check version=" + version + " max so far"); |
| } |
| } |
| } |
| |
| int idx; |
| if (bestIDX != -1) { |
| idx = bestIDX; |
| } else { |
| // All replicas are down; it doesn't matter |
| // which one we pick |
| idx = random().nextInt(nodes.length); |
| } |
| |
| if (nodes[idx] == null) { |
| // Start up Master from scratch: |
| TreeLogger.log("top: id=" + idx + " promote down node to master"); |
| nodes[idx] = master = new Master(dirs[idx], idx, versionDocCounts); |
| } else { |
| // Promote a running replica to Master: |
| assert nodes[idx] instanceof Replica; |
| TreeLogger.log("top: id=" + idx + " promote replica to master"); |
| master = new Master((Replica) nodes[idx]); |
| nodes[idx] = master; |
| } |
| } else { |
| if (random().nextInt(100) == 17) { |
| TreeLogger.log("top: id=" + master.id + " commit master"); |
| master.commit(); |
| } |
| } |
| |
| // Maybe restart a down replica, or commit / shutdown |
| // / crash one: |
| for(int i=0;i<nodes.length;i++) { |
| if (nodes[i] == null && random().nextInt(20) == 17) { |
| // Restart this replica |
| try { |
| nodes[i] = new Replica(dirs[i], i, versionDocCounts, null); |
| } catch (Throwable t) { |
| TreeLogger.log("top: id=" + i + " FAIL startup", t); |
| throw t; |
| } |
| } else { |
| if (nodes[i] instanceof Replica) { |
| Replica r = (Replica) nodes[i]; |
| |
| if (random().nextInt(100) == 17) { |
| TreeLogger.log("top: id=" + i + " commit replica"); |
| r.commit(false); |
| } |
| |
| if (random().nextInt(100) == 17) { |
| // Now shutdown this replica |
| TreeLogger.log("top: id=" + i + " shutdown replica"); |
| r.shutdown(); |
| nodes[i] = null; |
| break; |
| } else if (random().nextInt(100) == 17) { |
| // Now crash the replica |
| TreeLogger.log("top: id=" + i + " crash replica"); |
| r.crash(); |
| nodes[i] = null; |
| break; |
| } |
| } |
| } |
| } |
| |
| TreeLogger.end("replicate"); |
| } |
| |
| System.out.println("TEST: close replicas"); |
| for(Object n : nodes) { |
      if (n instanceof Replica) {
| ((Replica) n).shutdown(); |
| } |
| } |
| |
| if (master != null) { |
| System.out.println("TEST: close master"); |
| master.close(random().nextBoolean()); |
| master.dir.close(); |
| } |
| } |
| |
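  /** Length and checksum for one file in the master's
   *  index; replicas compare these against their local
   *  copy to decide which files must be (re)copied. */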
| static class FileMetaData { |
| public final long length; |
| public final long checksum; |
| |
| public FileMetaData(long length, long checksum) { |
| this.length = length; |
| this.checksum = checksum; |
| } |
| } |
| |
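  /** Builds a fileName -> (length, checksum) map for the
   *  given files on the master; this is what a replica
   *  diffs against in filterFilesToCopy. */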
| static Map<String,FileMetaData> getFilesMetaData(Master master, Collection<String> files) throws IOException { |
| Map<String,FileMetaData> filesMetaData = new HashMap<String,FileMetaData>(); |
| for(String file : files) { |
| filesMetaData.put(file, new FileMetaData(master.dir.fileLength(file), master.dir.getChecksum(file))); |
| } |
| |
| return filesMetaData; |
| } |
| |
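  /** Enqueues the given files onto the replica's file
   *  copier and blocks until all of them finish; returns
   *  null if any file failed to copy. */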
| static Map<String,FileMetaData> copyFiles(SlowChecksumDirectory src, Replica dst, Map<String,FileMetaData> files, boolean lowPriority, long totBytes) throws IOException { |
| long t0 = System.currentTimeMillis(); |
| |
| // nocommit should we "organize" the files to be copied |
| // by segment name? so that NRTCachingDir can |
| // separately decide on a seg by seg basis whether to |
| // cache the seg's files? |
| IOContext ioContext; |
| if (lowPriority) { |
| // nocommit do we need the tot docs / merge num segs? |
| ioContext = new IOContext(new MergeInfo(0, totBytes, false, 0)); |
| } else { |
| // nocommit can we get numDocs? |
| ioContext = new IOContext(new FlushInfo(0, totBytes)); |
| } |
| |
| CopyResult copyResult = dst.fileCopier.add(src, files, lowPriority, ioContext); |
| |
| try { |
| copyResult.done.await(); |
| } catch (InterruptedException ie) { |
| // nocommit |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException(ie); |
| } |
| |
| if (copyResult.failed.get()) { |
| TreeLogger.log("replica " + dst.id + ": " + (lowPriority ? "low-priority " : "") + " failed to copy some files"); |
| return null; |
| } |
| |
| long t1 = System.currentTimeMillis(); |
| TreeLogger.log("replica " + dst.id + ": " + (lowPriority ? "low-priority " : "") + "took " + (t1-t0) + " millis for " + totBytes + " bytes; " + files.size() + " files; nrtDir.sizeInBytes=" + ((NRTCachingDirectory) dst.dir.getDelegate()).sizeInBytes()); |
| |
| return files; |
| } |
| |
| /** Like SearcherManager, except it refreshes via an |
| * externally provided (NRT) SegmentInfos. */ |
| private static class InfosSearcherManager extends ReferenceManager<IndexSearcher> { |
| private volatile SegmentInfos currentInfos; |
| private final Directory dir; |
| |
| public InfosSearcherManager(Directory dir, int id, SegmentInfos infosIn) throws IOException { |
| this.dir = dir; |
| if (infosIn != null) { |
| currentInfos = infosIn; |
| TreeLogger.log("InfosSearcherManager.init id=" + id + ": use incoming infos=" + infosIn.toString(dir)); |
| } else { |
| currentInfos = new SegmentInfos(); |
| String fileName = SegmentInfos.getLastCommitSegmentsFileName(dir); |
| if (fileName != null) { |
| // Load last commit: |
| currentInfos.read(dir, fileName); |
| TreeLogger.log("InfosSearcherManager.init id=" + id + ": load initial segments file " + fileName + ": loaded version=" + currentInfos.version + " docCount=" + docCount(currentInfos)); |
| } |
| } |
| current = new IndexSearcher(StandardDirectoryReader.open(dir, currentInfos, null)); |
| } |
| |
| @Override |
| protected int getRefCount(IndexSearcher s) { |
| return s.getIndexReader().getRefCount(); |
| } |
| |
| @Override |
| protected boolean tryIncRef(IndexSearcher s) { |
| return s.getIndexReader().tryIncRef(); |
| } |
| |
| @Override |
| protected void decRef(IndexSearcher s) throws IOException { |
| s.getIndexReader().decRef(); |
| } |
| |
| public SegmentInfos getCurrentInfos() { |
| return currentInfos; |
| } |
| |
| public void setCurrentInfos(SegmentInfos infos) { |
| if (currentInfos != null) { |
| // So that if we commit, we will go to the next |
| // (unwritten so far) generation: |
| infos.updateGeneration(currentInfos); |
| TreeLogger.log("mgr.setCurrentInfos: carry over infos gen=" + infos.getSegmentsFileName()); |
| } |
| currentInfos = infos; |
| } |
| |
| // nocommit who manages the "ref" for currentInfos? |
| |
| @Override |
| protected IndexSearcher refreshIfNeeded(IndexSearcher old) throws IOException { |
| List<AtomicReader> subs; |
| if (old == null) { |
| subs = null; |
| } else { |
| subs = new ArrayList<AtomicReader>(); |
| for(AtomicReaderContext ctx : old.getIndexReader().leaves()) { |
| subs.add(ctx.reader()); |
| } |
| } |
| |
| return new IndexSearcher(StandardDirectoryReader.open(dir, currentInfos, subs)); |
| } |
| } |
| |
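  /** Point-in-time snapshot of one flush on the master:
   *  the files (with lengths + checksums), the infos
   *  version, and the serialized SegmentInfos bytes that a
   *  replica deserializes after copying the files over. */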
| private static class CopyState { |
| public final SlowChecksumDirectory dir; |
| public final Map<String,FileMetaData> files; |
| public final long version; |
| public final byte[] infosBytes; |
| public final SegmentInfos infos; |
| |
| public CopyState(SlowChecksumDirectory dir, Map<String,FileMetaData> files, long version, byte[] infosBytes, SegmentInfos infos) { |
| this.dir = dir; |
| this.files = files; |
| this.version = version; |
| this.infosBytes = infosBytes; |
| this.infos = infos; |
| } |
| } |
| |
  // nocommit should extend Replica and just add a writer
| private static class Master { |
| |
| final Set<String> finishedMergedSegments = Collections.newSetFromMap(new ConcurrentHashMap<String,Boolean>()); |
| |
| final SlowChecksumDirectory dir; |
| |
| final RandomIndexWriter writer; |
| final InfosSearcherManager mgr; |
| final SearchThread searchThread; |
| final IndexThread indexThread; |
| final int id; |
| private volatile boolean isClosed; |
| private AtomicInteger infosRefCount = new AtomicInteger(); |
| |
| SegmentInfos lastInfos; |
| |
| /** Start up a master from scratch. */ |
| public Master(File path, |
| int id, Map<Long,Integer> versionDocCounts) throws IOException { |
| final MockDirectoryWrapper dirOrig = newMockFSDirectory(path); |
| |
| // In some legitimate cases we will double-write: |
| dirOrig.setPreventDoubleWrite(false); |
| |
| this.id = id; |
| |
| // nocommit put back |
| dirOrig.setCheckIndexOnClose(false); |
| |
| // Master may legitimately close while replicas are |
| // still copying from it: |
| dirOrig.setAllowCloseWithOpenFiles(true); |
| |
| dir = new SlowChecksumDirectory(id, new NRTCachingDirectory(dirOrig, 1.0, 10.0)); |
| //((NRTCachingDirectory) master).VERBOSE = true; |
| SegmentInfos infos = new SegmentInfos(); |
| try { |
| infos.read(dir); |
      } catch (IndexNotFoundException infe) {
        // Fresh directory: no commit to load yet
      }
| |
| mgr = new InfosSearcherManager(dir, id, infos); |
| searchThread = new SearchThread("master", mgr, versionDocCounts); |
| searchThread.start(); |
| |
| SegmentInfos curInfos = mgr.getCurrentInfos().clone(); |
| |
| IndexWriterConfig iwc = getIWC(); |
| iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND); |
| |
| // nocommit also test periodically committing, and |
| // preserving multiple commit points; verify these |
| // "survive" over to the replica |
| writer = new RandomIndexWriter(random(), dir, curInfos, iwc); |
| _TestUtil.reduceOpenFiles(writer.w); |
| System.out.println("after reduce: " + writer.w.getConfig()); |
| |
| lastInfos = mgr.getCurrentInfos(); |
| // nocommit thread hazard here? IW could have already |
| // nuked some segments...? |
| writer.w.incRefDeleter(lastInfos); |
| setCopyState(); |
| |
| indexThread = new IndexThread(this); |
| indexThread.start(); |
| } |
| |
| public void commit() throws IOException { |
| writer.w.prepareCommit(); |
| // It's harmless if we crash here, because the |
| // global state has already been updated |
| writer.w.commit(); |
| } |
| |
| /** Promotes an existing Replica to Master, re-using the |
| * open NRTCachingDir, the SearcherManager, the search |
| * thread, etc. */ |
| public Master(Replica replica) throws IOException { |
| this.id = replica.id; |
| this.dir = replica.dir; |
| this.mgr = replica.mgr; |
| this.searchThread = replica.searchThread; |
| |
| // Master may legitimately close while replicas are |
| // still copying from it: |
| ((MockDirectoryWrapper) ((NRTCachingDirectory) dir.getDelegate()).getDelegate()).setAllowCloseWithOpenFiles(true); |
| |
| // Do not copy from ourself: |
| try { |
| replica.copyThread.finish(); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException(ie); |
| } |
| replica.fileCopier.close(); |
| |
| // nocommit must somehow "stop" this replica? e.g. we |
| // don't want it doing any more deleting? |
| |
| IndexWriterConfig iwc = getIWC(); |
| iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND); |
| |
| // nocommit also test periodically committing, and |
| // preserving multiple commit points; verify these |
| // "survive" over to the replica |
| SegmentInfos infos = replica.mgr.getCurrentInfos().clone(); |
| TreeLogger.log("top: id=" + replica.id + " sis version=" + infos.version); |
| |
      writer = new RandomIndexWriter(random(), dir, infos, iwc);
| _TestUtil.reduceOpenFiles(writer.w); |
| System.out.println("after reduce: " + writer.w.getConfig()); |
| |
      lastInfos = mgr.getCurrentInfos();

      // nocommit thread hazard here? IW could have already
      // nuked some segments...?
      writer.w.incRefDeleter(lastInfos);

      setCopyState();
| |
| indexThread = new IndexThread(this); |
| indexThread.start(); |
| } |
| |
| private IndexWriterConfig getIWC() { |
| IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); |
| |
| // Install a merged segment warmer that pre-copies (low |
| // priority) merged segment files out to replica(s) |
| // before the merge is swapped into the live segments; |
| // this way a large merge won't interfere with |
| // NRT turnaround time: |
| iwc.setMergedSegmentWarmer(new IndexReaderWarmer() { |
| @Override |
| public void warm(AtomicReader reader) throws IOException { |
| SegmentCommitInfo info = ((SegmentReader) reader).getSegmentInfo(); |
| if (TreeLogger.getLogger() == null) { |
| TreeLogger.setLogger(new TreeLogger("merge")); |
| } |
| //System.out.println("TEST: warm merged segment files " + info); |
| Map<String,FileMetaData> toCopy = getFilesMetaData(Master.this, info.files()); |
| long totBytes = 0; |
| for(FileMetaData metaData : toCopy.values()) { |
| totBytes += metaData.length; |
| } |
| TreeLogger.log("warm files=" + toCopy.keySet() + " totBytes=" + totBytes); |
| TreeLogger.start("warmMerge"); |
| |
| IOContext context = new IOContext(new MergeInfo(0, totBytes, false, 0)); |
| |
| List<CopyResult> copyResults = new ArrayList<>(); |
| for(Object n : nodes) { |
            if (n instanceof Replica) {
| Replica r = (Replica) n; |
| |
| try { |
| |
| // nocommit we could also have replica pre-warm a SegmentReader |
| // here, and add it onto subReader list |
| // for next reopen ...? |
| |
| // Must call filter, in case we are |
| // overwriting files and must invalidate |
| // the last commit: |
| int sizeBefore = toCopy.size(); |
| Map<String,FileMetaData> toCopy2 = r.filterFilesToCopy(toCopy); |
| |
                // Since this is a newly merged segment,
                // all files should be new and need
                // copying:
                assert toCopy2.size() == sizeBefore: "before=" + toCopy.keySet() + " after=" + toCopy2.keySet();
| |
| TreeLogger.log("copy to replica " + r.id); |
| copyResults.add(r.copyMergedFiles(Master.this.dir, toCopy2, context)); |
| } catch (AlreadyClosedException ace) { |
| // Ignore this: it means the replica shut down |
| // while we were trying to copy files. This |
| // "approximates" an exception the master would |
| // see trying to push file bytes to a replica |
| // that was just taken offline. |
| } |
| } |
| } |
| |
| TreeLogger.log("now wait for " + copyResults.size() + " replicas to finish copying"); |
| for(CopyResult result : copyResults) { |
| try { |
| result.done.await(); |
| } catch (InterruptedException ie) { |
| // nocommit |
| } |
| |
| // nocommit if there's an error ... what to |
| // do? |
| |
| // nocommit we should check merge.abort |
| // somehow in here, so if the master is in |
| // a hurry to shutdown, we respect that |
| } |
| TreeLogger.log("done warm merge for " + copyResults.size() + " replicas"); |
| TreeLogger.end("warmMerge"); |
| } |
| }); |
| |
| return iwc; |
| } |
| |
| /** Gracefully shuts down the master */ |
| public void close(boolean waitForMerges) throws IOException { |
| TreeLogger.log("id=" + id + " close master waitForMerges=" + waitForMerges); |
| |
| try { |
| searchThread.finish(); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException(ie); |
| } |
| |
| if (waitForMerges) { |
| // Do it here, instead of on close, so we can |
| // continue indexing while waiting for merges: |
| writer.w.waitForMerges(); |
| TreeLogger.log("waitForMerges done"); |
| } |
| |
| mgr.close(); |
| |
| try { |
| indexThread.finish(); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException(ie); |
| } |
| |
| TreeLogger.log("close writer"); |
| |
| // nocommit can we optionally NOT commit here? it |
| // should be decoupled from master migration? |
| |
| commit(); |
| |
| synchronized(this) { |
| isClosed = true; |
| if (lastInfos != null) { |
| writer.w.decRefDeleter(lastInfos); |
| lastInfos = null; |
| copyState = null; |
| } |
| |
| // nocommit what about exc from writer.close... |
| // Don't wait for merges now; we already did above: |
| writer.close(false); |
| TreeLogger.log("done close writer"); |
| |
| SegmentInfos infos = new SegmentInfos(); |
| infos.read(dir); |
| |
| TreeLogger.log("final infos=" + infos.toString(master.dir)); |
| |
| // nocommit caller must close |
| // dir.close(); |
| |
| lastInfos = infos; |
| setCopyState(); |
| } |
| } |
| |
| public synchronized boolean isClosed() { |
| return isClosed; |
| } |
| |
| private CopyState copyState; |
| |
| private synchronized void setCopyState() throws IOException { |
| RAMOutputStream out = new RAMOutputStream(); |
| lastInfos.write(out); |
| byte[] infosBytes = new byte[(int) out.getFilePointer()]; |
| out.writeTo(infosBytes, 0); |
| copyState = new CopyState(dir, |
| getFilesMetaData(this, lastInfos.files(dir, false)), |
| lastInfos.version, infosBytes, lastInfos); |
| } |
| |
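    /** Returns the current CopyState, taking two refs:
     *  IndexWriter's deleter ref (while the writer is still
     *  open, so the files are not deleted during the copy)
     *  and our own infosRefCount (so waitIdle knows when
     *  all copies are done); caller must eventually call
     *  releaseCopyState. */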
| public synchronized CopyState getCopyState() throws IOException { |
| //if (isClosed && lastInfos == null) { |
| //return null; |
| //} |
| TreeLogger.log("master.getCopyState version=" + lastInfos.version + " files=" + lastInfos.files(dir, false) + " writer=" + writer); |
| if (isClosed == false) { |
| writer.w.incRefDeleter(copyState.infos); |
| } |
| int count = infosRefCount.incrementAndGet(); |
| assert count >= 1; |
| return copyState; |
| } |
| |
    /** Flushes; holds a ref (in lastInfos / copyState) to
     *  the resulting infos. */
    public void flush() throws IOException {
      SegmentInfos newInfos = writer.w.flushAndIncRef();
| |
| synchronized(this) { |
| writer.w.decRefDeleter(lastInfos); |
| // Steals the reference returned by IW: |
| lastInfos = newInfos; |
| setCopyState(); |
| } |
| } |
| |
| public synchronized void releaseCopyState(CopyState copyState) throws IOException { |
| TreeLogger.log("master.releaseCopyState version=" + copyState.version + " files=" + copyState.files.keySet()); |
      // Must check because by the time the replica
      // releases, a different master may be running:
| if (copyState.dir == dir) { |
| if (isClosed == false) { |
| writer.w.decRefDeleter(copyState.infos); |
| } |
| int count = infosRefCount.decrementAndGet(); |
| assert count >= 0; |
| TreeLogger.log(" infosRefCount=" + infosRefCount); |
| if (count == 0) { |
| notify(); |
| } |
| } else { |
| TreeLogger.log(" skip: wrong master"); |
| } |
| } |
| |
| /** Waits until all outstanding infos refs are dropped. */ |
| public synchronized void waitIdle() throws InterruptedException { |
| while (true) { |
| if (infosRefCount.get() == 0) { |
| return; |
| } |
| wait(); |
| } |
| } |
| } |
| |
| static class IndexThread extends Thread { |
| |
| final Master master; |
| volatile boolean stop; |
| |
| public IndexThread(Master master) { |
| this.master = master; |
| setName("IndexThread id=" + master.id); |
| } |
| |
| @Override |
| public void run() { |
| try { |
| int docID = 0; |
| LineFileDocs docs = new LineFileDocs(random()); |
| while (stop == false) { |
| Document doc = docs.nextDoc(); |
| Field idField = doc.getField("docid"); |
| if (random().nextInt(10) == 9 && docID > 0) { |
| int randomDocID = random().nextInt(docID); |
| idField.setStringValue(""+randomDocID); |
| master.writer.updateDocument(new Term("docid", ""+randomDocID), doc); |
| } else { |
| idField.setStringValue(""+docID); |
| master.writer.addDocument(doc); |
| docID++; |
| } |
| // ~500 docs/sec |
| Thread.sleep(2); |
| } |
| } catch (Exception e) { |
| throw new RuntimeException("IndexThread on id=" + master.id + ": " + e, e); |
| } |
| } |
| |
| public void finish() throws InterruptedException { |
| stop = true; |
| join(); |
| } |
| } |
| |
| private static class Replica { |
| final int id; |
| final SlowChecksumDirectory dir; |
| final InfosRefCounts deleter; |
| |
| private final InfosSearcherManager mgr; |
| private volatile boolean stop; |
| final SearchThread searchThread; |
| final SimpleFileCopier fileCopier; |
| CopyThread copyThread; |
| |
| private final Collection<String> lastCommitFiles; |
| private final Collection<String> lastNRTFiles; |
| |
| // nocommit unused so far: |
| private final IndexReaderWarmer mergedSegmentWarmer; |
| |
| public Replica(File path, int id, Map<Long,Integer> versionDocCounts, IndexReaderWarmer mergedSegmentWarmer) throws IOException { |
| TreeLogger.log("top: id=" + id + " replica startup path=" + path); |
| TreeLogger.start("startup"); |
| |
| this.id = id; |
| this.mergedSegmentWarmer = mergedSegmentWarmer; |
| |
| Directory fsDir = newMockFSDirectory(path); |
| |
| // In some legitimate cases we will double-write: |
| ((MockDirectoryWrapper) fsDir).setPreventDoubleWrite(false); |
| |
| // nocommit put back |
| ((BaseDirectoryWrapper) fsDir).setCheckIndexOnClose(false); |
| |
| dir = new SlowChecksumDirectory(id, new NRTCachingDirectory(fsDir, 1.0, 10.0)); |
| TreeLogger.log("id=" + id + " created dirs, checksums; dir.listAll=" + Arrays.toString(dir.listAll())); |
| |
| fileCopier = new SimpleFileCopier(dir, id); |
| fileCopier.start(); |
| |
| lastCommitFiles = new HashSet<String>(); |
| lastNRTFiles = new HashSet<String>(); |
| |
| deleter = new InfosRefCounts(id, dir); |
| |
| String segmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(dir); |
| SegmentInfos lastCommit = null; |
| if (segmentsFileName != null) { |
| lastCommit = new SegmentInfos(); |
| TreeLogger.log("id=" + id + " " + segmentsFileName + " now load"); |
| lastCommit.read(dir, segmentsFileName); |
| lastCommitFiles.addAll(lastCommit.files(dir, true)); |
| TreeLogger.log("id=" + id + " incRef lastCommitFiles"); |
| deleter.incRef(lastCommitFiles); |
| TreeLogger.log("id=" + id + " loaded version=" + lastCommit.version + " lastCommitFiles = " + lastCommitFiles); |
| } |
| |
      // Must sync the latest index from the master before
      // starting up mgr, so that we don't hold open any
      // files that need to be overwritten when the master
      // has an older index than our copy, and so that we
      // roll back our version if we had been at a higher
      // version but were down when the master moved:
| SegmentInfos infos = null; |
| |
| // Startup sync to pull latest index over: |
| Map<String,FileMetaData> copiedFiles = null; |
| |
| if (master != null) { |
| |
        // nocommit this logic isn't right; otherwise, on a
        // full restart, how will a master be found?
| |
| // Repeat until we find a working master; this is to |
| // handle the case when a replica starts up but no |
| // new master has yet been selected when moving |
| // master: |
| while (true) { |
| Master curMaster = master; |
| CopyState copyState = curMaster.getCopyState(); |
| try { |
| // nocommit factor this out & share w/ CopyThread: |
| |
| deleter.incRef(copyState.files.keySet()); |
| |
| Map<String,FileMetaData> toCopy = filterFilesToCopy(copyState.files); |
| long totBytes = 0; |
| for(FileMetaData metaData : toCopy.values()) { |
| totBytes += metaData.length; |
| } |
| |
| // Copy files over to replica: |
| copiedFiles = copyFiles(copyState.dir, this, toCopy, false, totBytes); |
| if (copiedFiles == null) { |
| TreeLogger.log("id=" + id + " startup copyFiles failed"); |
| deleter.decRef(copyState.files.keySet()); |
| try { |
| Thread.sleep(5); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| } |
| continue; |
| } |
| |
| // Turn byte[] back to SegmentInfos: |
| infos = new SegmentInfos(); |
| infos.read(dir, new ByteArrayDataInput(copyState.infosBytes)); |
| lastNRTFiles.addAll(copyState.files.keySet()); |
| } finally { |
| curMaster.releaseCopyState(copyState); |
| } |
| break; |
| } |
| } else { |
| TreeLogger.log("id=" + id + " no master on startup; fallback to last commit"); |
| } |
| |
      if (infos != null && lastCommit != null) {
        infos.updateGeneration(lastCommit);
      }
| |
| mgr = new InfosSearcherManager(dir, id, infos); |
| |
| deleter.deleteUnknownFiles(); |
| |
| searchThread = new SearchThread(""+id, mgr, versionDocCounts); |
| searchThread.start(); |
| |
| copyThread = new CopyThread(this); |
| copyThread.start(); |
| |
| TreeLogger.log("done startup"); |
| TreeLogger.end("startup"); |
| } |
| |
| /** From the incoming files, determines which ones need |
| * copying on this replica, because they weren't yet |
| * copied, or they were previously copied but the |
| * checksum or file length is different. */ |
| public Map<String,FileMetaData> filterFilesToCopy(Map<String,FileMetaData> files) throws IOException { |
| Map<String,FileMetaData> toCopy = new HashMap<>(); |
| |
| for(Map.Entry<String,FileMetaData> ent : files.entrySet()) { |
| String fileName = ent.getKey(); |
| FileMetaData metaData = ent.getValue(); |
| long srcChecksum = ent.getValue().checksum; |
| |
| Long checksum = dir.getChecksum(fileName); |
| |
| long curLength; |
| try { |
| curLength = dir.fileLength(fileName); |
| } catch (FileNotFoundException | NoSuchFileException e) { |
| curLength = -1; |
| } |
| if (curLength != metaData.length) { |
| TreeLogger.log("id=" + id + " " + fileName + " will copy [wrong file length old=" + curLength + " new=" + metaData.length + "]"); |
| toCopy.put(fileName, metaData); |
| } else if (checksum == null) { |
| TreeLogger.log("id=" + id + " " + fileName + " will copy [no checksum] length=" + metaData.length + " srcChecksum=" + srcChecksum); |
| toCopy.put(fileName, metaData); |
| } else if (checksum.longValue() != srcChecksum) { |
| TreeLogger.log("id=" + id + " " + fileName + " will copy [wrong checksum: cur=" + checksum.longValue() + " new=" + srcChecksum + "] length=" + metaData.length); |
| toCopy.put(fileName, metaData); |
| } else { |
| TreeLogger.log("id=" + id + " " + fileName + " skip copy checksum=" + srcChecksum + " file.length=" + metaData.length); |
| } |
| } |
| |
| // nocommit what about multiple commit points? |
| |
| // Invalidate the current commit if we will overwrite |
| // any files from it: |
| |
| // nocommit do we need sync'd here? |
| for(String fileName : toCopy.keySet()) { |
| if (lastCommitFiles.contains(fileName)) { |
| TreeLogger.log("id=" + id + " " + fileName + " is being copied but is referenced by last commit; now drop last commit; lastCommitFiles=" + lastCommitFiles); |
| deleter.decRef(lastCommitFiles); |
| dir.deleteFile("segments.gen"); |
| lastCommitFiles.clear(); |
| break; |
| } |
| } |
| |
| return toCopy; |
| } |
| |
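    /** Called (from the test's main thread) after the
     *  master flushes; wakes up our CopyThread so it pulls
     *  the new files over. */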
| public void newFlush() { |
| copyThread.lock.lock(); |
| try { |
| copyThread.cond.signal(); |
| } finally { |
| copyThread.lock.unlock(); |
| } |
| } |
| |
| public CopyResult copyMergedFiles(SlowChecksumDirectory src, Map<String,FileMetaData> files, IOContext context) throws IOException { |
| if (stop) { |
| throw new AlreadyClosedException("replica closed"); |
| } |
| return fileCopier.add(src, files, true, context); |
| } |
| |
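    /** Copies any new/changed files in filesMetaData from
     *  the master, then cuts the searcher manager over to
     *  the new SegmentInfos; aborts (without cutover) if
     *  any copy failed or the master moved while we were
     *  copying. */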
    void sync(int curMasterMoveCount, SlowChecksumDirectory masterDir, Map<String,FileMetaData> filesMetaData, byte[] infosBytes,
              long infosVersion) throws IOException {
| |
| SegmentInfos currentInfos = mgr.getCurrentInfos(); |
| TreeLogger.log("id=" + id + " sync version=" + infosVersion + " vs current version=" + currentInfos.getVersion()); |
| TreeLogger.start("sync"); |
| |
| // nocommit make overall test "modal", ie up front |
| // decide whether any docs are allowed to be lost |
| // (version goes backwards on replicas) or not |
| |
| /* |
| if (currentInfos != null && currentInfos.getVersion() >= infosVersion) { |
| System.out.println(" replica id=" + id + ": skip sync current version=" + currentInfos.getVersion() + " vs new version=" + infosVersion); |
| return; |
| } |
| */ |
| |
| // Must incRef before filter in case filter decRefs |
| // the last commit: |
| deleter.incRef(filesMetaData.keySet()); |
| |
| Map<String,FileMetaData> toCopy = filterFilesToCopy(filesMetaData); |
| long totBytes = 0; |
| for(FileMetaData metaData : toCopy.values()) { |
| totBytes += metaData.length; |
| } |
| |
| // Copy files over to replica: |
      Map<String,FileMetaData> copiedFiles = copyFiles(masterDir, this, toCopy, false, totBytes);
| |
| if (copiedFiles == null) { |
| // At least one file failed to copy; skip cutover |
| TreeLogger.log("top: id=" + id + " replica sync failed; abort"); |
| deleter.decRef(filesMetaData.keySet()); |
| TreeLogger.end("sync"); |
| return; |
| } |
| |
| TreeLogger.log("top: id=" + id + " replica sync done file copy"); |
| |
| // OK all files copied successfully, now rebuild the |
| // infos and cutover searcher mgr |
| |
| // Turn byte[] back to SegmentInfos: |
| SegmentInfos infos = new SegmentInfos(); |
| infos.read(dir, new ByteArrayDataInput(infosBytes)); |
| TreeLogger.log("id=" + id + " replica sync version=" + infos.version + " segments=" + infos.toString(dir)); |
| |
| masterLock.lock(); |
| try { |
| if (curMasterMoveCount != masterCount.get()) { |
            // The master moved while we were copying; skip cutover
| TreeLogger.log("top: id=" + id + " master moved during sync; abort"); |
| deleter.decRef(filesMetaData.keySet()); |
| TreeLogger.end("sync"); |
| return; |
| } |
| |
| synchronized (this) { |
| mgr.setCurrentInfos(infos); |
| deleter.decRef(lastNRTFiles); |
| lastNRTFiles.clear(); |
| lastNRTFiles.addAll(filesMetaData.keySet()); |
| } |
| } finally { |
| masterLock.unlock(); |
| } |
| |
| // Cutover to new searcher |
| mgr.maybeRefresh(); |
| |
| TreeLogger.log("top: id=" + id + " done mgr refresh"); |
| |
| TreeLogger.end("sync"); |
| } |
| |
    /** Gracefully close & shut down this replica. */
| public void shutdown() throws IOException, InterruptedException { |
| if (stop) { |
| return; |
| } |
| stop = true; |
| |
| copyThread.finish(); |
| fileCopier.close(); |
| |
      // Sometimes shut down w/o committing
| TreeLogger.log("id=" + id + " replica shutdown"); |
| TreeLogger.start("shutdown"); |
| if (random().nextBoolean()) { |
| TreeLogger.log("id=" + id + " commit before shutdown"); |
| commit(true); |
| } |
| |
| searchThread.finish(); |
| mgr.close(); |
| |
| // Delete now un-referenced files: |
| TreeLogger.log("id=" + id + " now deletePending during shutdown"); |
| deleter.decRef(lastNRTFiles); |
| lastNRTFiles.clear(); |
| deleter.deletePending(); |
| TreeLogger.log("id=" + id + " at shutdown dir.listAll()=" + Arrays.toString(dir.listAll())); |
| dir.close(); |
| TreeLogger.end("shutdown"); |
| } |
| |
| /** Crashes the underlying directory, corrupting any |
| * un-sync'd files. */ |
| public void crash() throws IOException, InterruptedException { |
| if (stop) { |
| return; |
| } |
| stop = true; |
| TreeLogger.log("id=" + id + " replica crash; dir.listAll()=" + Arrays.toString(dir.listAll())); |
| TreeLogger.log("id=" + id + " replica crash; fsdir.listAll()=" + Arrays.toString(((NRTCachingDirectory) dir.getDelegate()).getDelegate().listAll())); |
| copyThread.finish(); |
| fileCopier.close(); |
| searchThread.finish(); |
| mgr.close(); |
| ((MockDirectoryWrapper) ((NRTCachingDirectory) dir.getDelegate()).getDelegate()).crash(); |
| ((NRTCachingDirectory) dir.getDelegate()).getDelegate().close(); |
| } |
| |
| /** Commit latest SegmentInfos (fsync'ing all referenced |
| * files). */ |
| public void commit(boolean deleteAll) throws IOException { |
| SegmentInfos infos; |
| Collection<String> newFiles; |
| |
| synchronized (this) { |
| infos = mgr.getCurrentInfos(); |
| if (infos != null) { |
| newFiles = infos.files(dir, false); |
| deleter.incRef(newFiles); |
| } else { |
| // nocommit is infos ever null? |
| newFiles = null; |
| } |
| } |
| |
| if (infos != null) { |
| TreeLogger.log("top: id=" + id + " commit deleteAll=" + deleteAll + "; infos.version=" + infos.getVersion() + " infos.files=" + newFiles + " segs=" + infos.toString(dir)); |
| TreeLogger.start("commit"); |
| dir.sync(newFiles); |
| infos.commit(dir); |
| |
| String segmentsFileName = infos.getSegmentsFileName(); |
| deleter.incRef(Collections.singletonList(segmentsFileName)); |
| |
| synchronized (this) { |
| // If a sync happened while we were committing, we |
| // must carry over the commit gen: |
| SegmentInfos curInfos = mgr.getCurrentInfos(); |
| if (curInfos != infos) { |
| curInfos.updateGeneration(infos); |
| } |
| } |
| |
| TreeLogger.log("top: id=" + id + " " + segmentsFileName + " committed; now decRef lastCommitFiles=" + lastCommitFiles); |
| |
| deleter.decRef(lastCommitFiles); |
| lastCommitFiles.clear(); |
| lastCommitFiles.addAll(newFiles); |
| lastCommitFiles.add(segmentsFileName); |
| TreeLogger.log("id=" + id + " " + infos.getSegmentsFileName() + " lastCommitFiles=" + lastCommitFiles); |
| |
| if (deleteAll) { |
          // nocommit this is messy: we may delete a merge's files
          // that were just copied over as we closed the writer:
| TreeLogger.log("id=" + id + " now deleteUnknownFiles during commit"); |
| deleter.deleteUnknownFiles(); |
| TreeLogger.log("id=" + id + " done deleteUnknownFiles during commit"); |
| } |
| |
| TreeLogger.end("commit"); |
| } |
| } |
| } |
| |
| static class CopyFileJob { |
| public final CopyResult result; |
| public final Directory src; |
| public final FileMetaData metaData; |
| public final String fileName; |
| public final IOContext context; |
| |
| public CopyFileJob(CopyResult result, IOContext context, Directory src, String fileName, FileMetaData metaData) { |
| this.result = result; |
| this.context = context; |
| this.src = src; |
| this.fileName = fileName; |
| this.metaData = metaData; |
| } |
| } |
| |
| static class CopyOneFile implements Closeable { |
| private long bytesLeft; |
| |
| // TODO: reuse...? |
| private final byte[] buffer = new byte[65536]; |
| |
| private final IndexInput in; |
| private final IndexOutput out; |
| private final SlowChecksumDirectory dest; |
| |
| public final CopyFileJob job; |
| public boolean failed; |
| |
| public CopyOneFile(CopyFileJob job, SlowChecksumDirectory dest, RateLimiter rateLimiter) throws IOException { |
| this.job = job; |
| this.dest = dest; |
| in = job.src.openInput(job.fileName, job.context); |
| boolean success = false; |
| try { |
| IndexOutput out0 = dest.createOutput(job.fileName, job.context); |
| if (rateLimiter == null) { |
| // No IO rate limiting |
| out = out0; |
| } else { |
| out = new RateLimitedIndexOutput(rateLimiter, out0); |
| } |
| success = true; |
| } finally { |
| if (success == false) { |
| IOUtils.closeWhileHandlingException(in); |
| } |
| } |
| |
| bytesLeft = job.metaData.length; |
| } |
| |
| public boolean visit() throws IOException { |
| int chunk = bytesLeft > buffer.length ? buffer.length : (int) bytesLeft; |
      try {
        in.readBytes(buffer, 0, chunk);
        out.writeBytes(buffer, 0, chunk);
      } catch (Exception e) {
        TreeLogger.log("failed to copy " + job.fileName, e);
        failed = true;
        // Stop copying; close() deletes the partial file:
        return false;
      }
      bytesLeft -= chunk;
      return bytesLeft != 0;
| } |
| |
| /** Abort the copy. */ |
| public void abort() { |
| TreeLogger.log("now abort copy file " + job.fileName); |
      try {
        close();
      } catch (IOException ioe) {
        // Best effort: we are aborting anyway
      }
      try {
        dest.deleteFile(job.fileName);
      } catch (IOException ioe) {
        // Best effort: the partial file may not exist
      }
| } |
| |
| @Override |
| public void close() throws IOException { |
| IOUtils.close(in, out); |
| if (failed) { |
| dest.deleteFile(job.fileName); |
| throw new IOException("copy failed"); |
| } |
| |
| Long actual = dest.getChecksum(job.fileName); |
| assert actual != null; |
| |
| if (actual.longValue() != job.metaData.checksum) { |
| // Uh oh, bits flipped during copy: |
| dest.deleteFile(job.fileName); |
| throw new IOException("checksum mismatch"); |
| } |
| } |
| } |
| |
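  /** Tracks one copy request: done counts down once per
   *  file, and failed is set if any file failed to copy. */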
| static class CopyResult { |
| public final CountDownLatch done; |
| public final AtomicBoolean failed = new AtomicBoolean(); |
| |
| public CopyResult(int fileCount) { |
| done = new CountDownLatch(fileCount); |
| } |
| } |
| |
| // TODO: abstract this, enable swapping in different |
| // low-level tools ... rsync, bittorrent, etc. |
| |
  /** Simple class to copy low-priority (merged segment)
   *  and high-priority (flushed segment) files.  It has a
   *  simplistic "ionice" implementation: if we are copying
   *  a low-pri (merge) file and a high-pri (flush) job
   *  shows up, we pause the low-pri copy, finish all
   *  high-pri copies, then resume it. */
| |
| static class SimpleFileCopier extends Thread implements Closeable { |
| |
| private final Queue<CopyFileJob> highPriorityJobs = new ConcurrentLinkedQueue<>(); |
| private final Queue<CopyFileJob> lowPriorityJobs = new ConcurrentLinkedQueue<>(); |
| private final SlowChecksumDirectory dest; |
| |
| // nocommit make rate limit (10 MB/sec now) controllable: |
| private final RateLimiter mergeRateLimiter = new RateLimiter.SimpleRateLimiter(10.0); |
| private final int id; |
    // Volatile: the copy loop reads this without holding our monitor:
    private volatile boolean stop;
| |
| /** We always copy files into this dest. */ |
| public SimpleFileCopier(SlowChecksumDirectory dest, int id) { |
| this.dest = dest; |
| this.id = id; |
| } |
| |
| // nocommit use Future here |
| public synchronized CopyResult add(SlowChecksumDirectory src, Map<String,FileMetaData> files, boolean lowPriority, IOContext context) { |
| if (stop) { |
| throw new AlreadyClosedException("closed"); |
| } |
| CopyResult result = new CopyResult(files.size()); |
| Queue<CopyFileJob> queue = lowPriority ? lowPriorityJobs : highPriorityJobs; |
| for(Map.Entry<String,FileMetaData> ent : files.entrySet()) { |
| queue.add(new CopyFileJob(result, context, src, ent.getKey(), ent.getValue())); |
| } |
| notify(); |
| return result; |
| } |
| |
| @Override |
| public void run() { |
| Thread.currentThread().setName("fileCopier id=" + id); |
| TreeLogger.setLogger(new TreeLogger("fileCopier id=" + id)); |
| |
| CopyOneFile curLowPri = null; |
| CopyOneFile curHighPri = null; |
| |
| boolean success = false; |
| |
| try { |
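        // Each pass copies at most one 64 KB chunk, so a
        // newly arrived high-priority (flush) job preempts
        // an in-progress low-priority (merge) copy at the
        // next chunk boundary: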
| while (stop == false) { |
| if (curHighPri != null) { |
| if (curHighPri.visit() == false) { |
| TreeLogger.log("id=" + id + " " + curHighPri.job.fileName + " high-priority copy done"); |
| // Finished copying; now close & verify checksums: |
| try { |
| curHighPri.close(); |
| } catch (IOException ioe) { |
| curHighPri.job.result.failed.set(true); |
| TreeLogger.log("WARNING: id=" + id + " " + curHighPri.job.fileName + ": failed to copy high-priority file"); |
| } finally { |
| curHighPri.job.result.done.countDown(); |
| } |
| curHighPri = null; |
| } |
| } else if (highPriorityJobs.isEmpty() == false) { |
| // Start new high-priority copy: |
| CopyFileJob job = highPriorityJobs.poll(); |
| try { |
| curHighPri = new CopyOneFile(job, dest, null); |
| TreeLogger.log("id=" + id + " " + curHighPri.job.fileName + " now start high-priority copy"); |
| } catch (AlreadyClosedException ace) { |
| TreeLogger.log("id=" + id + " " + job.fileName + " skip copy: hit AlreadyClosedException"); |
| job.result.failed.set(true); |
| job.result.done.countDown(); |
| } |
| } else if (curLowPri != null) { |
| if (curLowPri.visit() == false) { |
| TreeLogger.log("id=" + id + " " + curLowPri.job.fileName + " low-priority copy done"); |
| // Finished copying; now close & verify checksums: |
| try { |
| curLowPri.close(); |
| } catch (IOException ioe) { |
| TreeLogger.log("WARNING: id=" + id + " " + curLowPri.job.fileName + ": failed to copy low-priority file"); |
| curLowPri.job.result.failed.set(true); |
| } finally { |
| curLowPri.job.result.done.countDown(); |
| } |
| curLowPri = null; |
| } |
| } else if (lowPriorityJobs.isEmpty() == false) { |
| // Start new low-priority copy: |
| CopyFileJob job = lowPriorityJobs.poll(); |
| try { |
| curLowPri = new CopyOneFile(job, dest, mergeRateLimiter); |
| TreeLogger.log("id=" + id + " " + curLowPri.job.fileName + " now start low-priority copy"); |
| } catch (AlreadyClosedException ace) { |
| TreeLogger.log("id=" + id + " " + job.fileName + " skip copy: hit AlreadyClosedException"); |
| job.result.failed.set(true); |
| job.result.done.countDown(); |
| } |
| } else { |
| // Wait for another job: |
| synchronized (this) { |
| if (highPriorityJobs.isEmpty() && lowPriorityJobs.isEmpty()) { |
| TreeLogger.log("id=" + id + " copy thread now idle"); |
| try { |
| wait(); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| } |
| TreeLogger.log("id=" + id + " copy thread now wake up"); |
| } |
| } |
| } |
| } |
| success = true; |
| } catch (IOException ioe) { |
| // nocommit catch each op & retry instead? |
| throw new RuntimeException(ioe); |
| } finally { |
| synchronized(this) { |
| stop = true; |
| if (curLowPri != null) { |
| curLowPri.abort(); |
| curLowPri.job.result.failed.set(true); |
| curLowPri.job.result.done.countDown(); |
| } |
| if (curHighPri != null) { |
| curHighPri.abort(); |
| curHighPri.job.result.failed.set(true); |
| curHighPri.job.result.done.countDown(); |
| } |
| |
| for(CopyFileJob job : highPriorityJobs) { |
| job.result.failed.set(true); |
| job.result.done.countDown(); |
| } |
| |
| for(CopyFileJob job : lowPriorityJobs) { |
| job.result.failed.set(true); |
| job.result.done.countDown(); |
| } |
| } |
| } |
| } |
| |
| // nocommit cutover all other finishes to closeable |
| |
    @Override
    public void close() throws IOException {
      synchronized (this) {
        stop = true;
        notify();
      }
      // Join outside the monitor: the copy thread must
      // re-acquire it to return from wait(), so holding the
      // monitor across join() would deadlock:
      try {
        join();
      } catch (InterruptedException ie) {
        // nocommit
        throw new IOException(ie);
      }
    }
| } |
| |
  // TODO: we could pre-copy merged files out even before
  // warmMerge is called?  e.g. if we "notice" files being
  // written to the dir ... could give us a "head start"
| |
| /** Runs in each replica, to handle copying over new |
| * flushes. */ |
| static class CopyThread extends Thread { |
| final Lock lock; |
| final Condition cond; |
| private final Replica replica; |
| private volatile boolean stop; |
| |
| public CopyThread(Replica replica) { |
| this.lock = new ReentrantLock(); |
| this.cond = lock.newCondition(); |
| this.stop = false; |
| this.replica = replica; |
| } |
| |
| @Override |
| public void run() { |
| Thread.currentThread().setName("replica id=" + replica.id); |
| TreeLogger.setLogger(new TreeLogger("replica id=" + replica.id)); |
| |
| try { |
        // While loop to keep pulling newly flushed/merged
        // segments until we shut down
| while (stop == false) { |
| |
| try { |
| |
| // While loop to pull a single new segment: |
| long curVersion = replica.mgr.getCurrentInfos().version; |
| |
| CopyState copyState = null; |
| Master curMaster = null; |
| int curMasterMoveCount = -1; |
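
                // Poll the current master until it publishes
                // a CopyState newer than our current version;
                // newFlush() signals cond to wake us up: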
| while (stop == false) { |
| lock.lock(); |
| try { |
| curMaster = master; |
| if (curMaster != null) { |
| copyState = curMaster.getCopyState(); |
| curMasterMoveCount = masterCount.get(); |
| if (copyState != null) { |
| assert copyState.version >= curVersion: "copyState.version=" + copyState.version + " vs curVersion=" + curVersion; |
| if (copyState.version > curVersion) { |
| TreeLogger.log("got new copyState"); |
| break; |
| } else { |
| TreeLogger.log("skip newInfos"); |
| curMaster.releaseCopyState(copyState); |
| copyState = null; |
| } |
| } else { |
| // Master is closed |
| Thread.sleep(5); |
| } |
| } else { |
| // Master hasn't started yet |
| Thread.sleep(5); |
| } |
| cond.awaitUninterruptibly(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
              // We hold a ref on copyState at this point
| |
| // nocommit what if master closes now? |
| |
| if (stop) { |
| if (copyState != null) { |
| curMaster.releaseCopyState(copyState); |
| } |
| break; |
| } |
| |
| // nocommit need to sometimes crash during |
| // replication; ooh: we could just Thread.interrupt()? |
| |
| // nocommit needs to cross the "wire", ie turn into |
| // byte[] and back, and manage the "reservation" |
| // separately on master |
| |
| try { |
| // nocommit just pass copyState |
| replica.sync(curMasterMoveCount, copyState.dir, copyState.files, copyState.infosBytes, copyState.version); |
| } finally { |
| curMaster.releaseCopyState(copyState); |
| } |
| } catch (AlreadyClosedException ace) { |
            // OK: master closed while we were replicating;
            // we will just retry against the next master:
| TreeLogger.log("top: id=" + replica.id + " ignore AlreadyClosedException"); |
| Thread.sleep(5); |
| continue; |
| } |
| } |
| } catch (InterruptedException ie) { |
| throw new RuntimeException(ie); |
| } catch (IOException ioe) { |
| // nocommit how to properly handle... |
| throw new RuntimeException(ioe); |
| } |
| } |
| |
| /** Shuts down the thread and only returns once |
| * it's done. */ |
| public void finish() throws InterruptedException { |
| stop = true; |
| lock.lock(); |
| try { |
| cond.signal(); |
| } finally { |
| lock.unlock(); |
| } |
| join(); |
| } |
| } |
| |
| static class SearchThread extends Thread { |
| private volatile boolean stop; |
| private final InfosSearcherManager mgr; |
| private final Map<Long,Integer> versionDocCounts; |
| private final String id; |
| |
| public SearchThread(String id, InfosSearcherManager mgr, Map<Long,Integer> versionDocCounts) { |
| this.id = id; |
| this.mgr = mgr; |
| this.versionDocCounts = versionDocCounts; |
| setName("SearchThread id=" + id); |
| } |
| |
| @Override |
| public void run() { |
| try { |
| while (stop == false) { |
| IndexSearcher s = mgr.acquire(); |
| try { |
| // Sleep so that sometimes our searcher is "stale": |
| Thread.sleep(_TestUtil.nextInt(random(), 1, 10)); |
| // nocommit do more interesting searches |
| int totalHits = s.search(new MatchAllDocsQuery(), 10).totalHits; |
| if (totalHits > 0) { |
| long version = ((DirectoryReader) s.getIndexReader()).getVersion(); |
| Integer expectedCount = versionDocCounts.get(version); |
| assertNotNull("searcher " + s + " version=" + version + " is missing expected count", expectedCount); |
| // nocommit since master may roll back in time |
| // we cannot assert this: |
| //assertEquals("searcher version=" + version + " replica id=" + id + " searcher=" + s, expectedCount.intValue(), totalHits); |
| } |
| } finally { |
| mgr.release(s); |
| } |
| } |
| } catch (Exception e) { |
| System.out.println("FAILED: id=" + id); |
| e.printStackTrace(System.out); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public void finish() throws InterruptedException { |
| stop = true; |
| join(); |
| } |
| } |
| |
| static int docCount(SegmentInfos infos) { |
| int totDocCount = 0; |
| for(SegmentCommitInfo info : infos) { |
| totDocCount += info.info.getDocCount() - info.getDelCount(); |
| } |
| return totDocCount; |
| } |
| |
| // nocommit factor/share with IFD |
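  /** Ref-counts the index files on a replica so we only
   *  delete a file once nothing (last commit, last NRT
   *  infos, in-flight copies) references it; failed
   *  deletes are retried later via deletePending. */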
| static class InfosRefCounts { |
| private final Map<String,Integer> refCounts = new HashMap<String,Integer>(); |
| private final Set<String> pending = new HashSet<String>(); |
| private final Directory dir; |
| private final int id; |
| |
| public InfosRefCounts(int id, Directory dir) throws IOException { |
| this.dir = dir; |
| this.id = id; |
| } |
| |
| public synchronized void incRef(Collection<String> fileNames) { |
| for(String fileName : fileNames) { |
| Integer curCount = refCounts.get(fileName); |
| if (curCount == null) { |
| refCounts.put(fileName, 1); |
| } else { |
| refCounts.put(fileName, curCount.intValue() + 1); |
| } |
| |
| // Necessary in case we had tried to delete this |
| // fileName before, it failed, but then it was later |
| // overwritten: |
| pending.remove(fileName); |
| } |
| } |
| |
| public synchronized void decRef(Collection<String> fileNames) { |
| for(String fileName : fileNames) { |
| Integer curCount = refCounts.get(fileName); |
| assert curCount != null: "fileName=" + fileName; |
| assert curCount.intValue() > 0; |
| if (curCount.intValue() == 1) { |
| refCounts.remove(fileName); |
| delete(fileName); |
| } else { |
| refCounts.put(fileName, curCount.intValue() - 1); |
| } |
| } |
| } |
| |
| private synchronized void delete(String fileName) { |
| try { |
| TreeLogger.log("id=" + id + " " + fileName + " now delete"); |
| dir.deleteFile(fileName); |
| } catch (IOException ioe) { |
| // nocommit why do we keep trying to delete the file |
| // "segments" ... |
| TreeLogger.log("id=" + id + " " + fileName + " delete failed; will retry later"); |
| pending.add(fileName); |
| } |
| } |
| |
| public synchronized void deletePending() { |
| Set<String> copy = new HashSet<String>(pending); |
| pending.clear(); |
| for(String fileName : copy) { |
| delete(fileName); |
| } |
| } |
| |
| public synchronized void deleteUnknownFiles() throws IOException { |
| for(String fileName : dir.listAll()) { |
| if (refCounts.containsKey(fileName) == false && |
| fileName.startsWith("segments") == false && |
| fileName.startsWith("checksum") == false) { |
| TreeLogger.log("id=" + id + " delete unknown file \"" + fileName + "\""); |
| delete(fileName); |
| } |
| } |
| } |
| } |
| } |