LUCENE-5438: test seems to pass now, even when master is migrated to an 'old' replica

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/branches/lucene5438@1580522 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lucene/core/src/java/org/apache/lucene/store/NRTCachingDirectory.java b/lucene/core/src/java/org/apache/lucene/store/NRTCachingDirectory.java
index 757fb07..a571e12 100644
--- a/lucene/core/src/java/org/apache/lucene/store/NRTCachingDirectory.java
+++ b/lucene/core/src/java/org/apache/lucene/store/NRTCachingDirectory.java
@@ -303,12 +303,13 @@
     // happens during commit() or close():
     synchronized(uncacheLock) {
       if (true || VERBOSE) {
-        System.out.println("nrtdir.unCache name=" + fileName);
+        System.out.println(Thread.currentThread().getName() + ": nrtdir.unCache name=" + fileName);
       }
       if (!cache.fileExists(fileName)) {
+        // nocommit why am I hitting this so much?
         // Another thread beat us...
         if (true || VERBOSE) {
-          System.out.println("nrtdir.unCache skip name=" + fileName);
+          System.out.println(Thread.currentThread().getName() + ": nrtdir.unCache skip name=" + fileName);
         }
         return;
       }
diff --git a/lucene/core/src/java/org/apache/lucene/util/TreeLogger.java b/lucene/core/src/java/org/apache/lucene/util/TreeLogger.java
new file mode 100644
index 0000000..af5b5dd
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/util/TreeLogger.java
@@ -0,0 +1,102 @@
+package org.apache.lucene.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
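+/** Simple logger that indents messages according to nested
+ *  start/end calls; each thread registers and uses its own
+ *  instance via the static methods. */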
+public class TreeLogger {
+  
+  private final List<String> stack = new ArrayList<String>();
+  private String indent = "";
+  private final String label;
+
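+  // Per-thread registry; a thread must call setLogger before using the static methods: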
+  private static Map<Thread,TreeLogger> loggers = new ConcurrentHashMap<Thread,TreeLogger>();
+
+  public static void setLogger(TreeLogger instance) {
+    loggers.put(Thread.currentThread(), instance);
+  }
+
+  public static TreeLogger getLogger() {
+    return loggers.get(Thread.currentThread());
+  }
+
+  public static void start(String label) {
+    loggers.get(Thread.currentThread()).start0(label);
+  }
+
+  public static void end(String label) {
+    loggers.get(Thread.currentThread()).end0(label);
+  }
+
+  public static void log(String message) {
+    loggers.get(Thread.currentThread()).log0(message);
+  }
+
+  public static void log(String message, Throwable t) {
+    loggers.get(Thread.currentThread()).log0(message, t);
+  }
+
+  public TreeLogger(String label) {
+    this.label = label;
+  }
+
+  public void start0(String label) {
+    stack.add(label);
+    indent += "  ";
+  }
+
+  public void end0(String label) {
+    if (stack.isEmpty()) {
+      throw new IllegalStateException(Thread.currentThread().getName() + ": cannot end: label=" + label + " wasn't pushed");
+    }
+    String last = stack.get(stack.size()-1);
+    if (last.equals(label) == false) {
+      throw new IllegalStateException(Thread.currentThread().getName() + ": cannot end: label=" + label + " doesn't match current label=" + last);
+    }
+    stack.remove(stack.size()-1);
+    indent = indent.substring(0, indent.length()-2);
+  }
+
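+  /** Logs the message followed by the stack trace of t. */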
+  public void log0(String message, Throwable t) {
+    StringWriter sw = new StringWriter();
+    PrintWriter pw = new PrintWriter(sw);
+    t.printStackTrace(pw);
+    log0(message + ":\n" + sw.toString());
+  }
+
+  public void log0(String message) {
+    StringBuilder b = new StringBuilder();
+    while (message.startsWith("\n")) {
+      b.append('\n');
+      message = message.substring(1);
+    }
+    b.append('[');
+    b.append(label);
+    b.append("] ");
+    b.append(indent);
+    b.append("- ");
+    b.append(message.replace("\n", "\n" + indent + "  "));
+    System.out.println(b.toString());
+  }
+}
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumDirectory.java b/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumDirectory.java
new file mode 100644
index 0000000..b81b40a
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumDirectory.java
@@ -0,0 +1,245 @@
+package org.apache.lucene.replicator;
+
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.TreeLogger;
+
+/** Stupid wrapper to inefficiently compute the CRC32 checksum of
+ *  every written file after it's closed. */
+
+// nocommit this is a silly wrapper until we integrate
+// checksums at a lower level / written to each file
+
+public class SlowChecksumDirectory extends FilterDirectory {
+
+  private final Checksums checksums;
+
+  // Closed but not yet sync'd:
+  private final Map<String,Long> pendingChecksums = new ConcurrentHashMap<String,Long>();
+
+  public SlowChecksumDirectory(int id, Directory in) throws IOException {
+    super(in);
+    checksums = new Checksums(id, in);
+  }
+
+  @Override
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    IndexOutput other = in.createOutput(name, context);
+    return new SlowChecksumIndexOutput(this, name, other);
+  }
+
+  /** Stupid: this is called when an output is closed, and
+   *  we go and re-read it to compute the checksum. */
+  void outputClosed(String name) throws IOException {
+    IndexInput input = in.openInput(name, IOContext.READONCE);
+    try {
+      byte[] bytes = new byte[4096];
+      long bytesLeft = in.fileLength(name);
+
+      Checksum checksum = new CRC32();
+      while (bytesLeft > 0) {
+        int chunk = (int) Math.min(bytesLeft, bytes.length);
+        input.readBytes(bytes, 0, chunk);
+        checksum.update(bytes, 0, chunk);
+        bytesLeft -= chunk;
+      }
+
+      long value = checksum.getValue();
+      System.out.println(Thread.currentThread().getName() + " id=" + checksums.id + " " + name + ": record pending checksum=" + value);
+      pendingChecksums.put(name, value);
+    } finally {
+      input.close();
+    }
+  }
+
+  public Long getChecksum(String name) {
+    Long v = pendingChecksums.get(name);
+    if (v != null) {
+      return v;
+    }
+    return checksums.get(name);
+  }
+
+  public void sync(Collection<String> names) throws IOException {
+    System.out.println(Thread.currentThread().getName() + " id=" + checksums.id + " sync " + names);
+    in.sync(names);
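+    // The files are durable now; promote their pending checksums and persist the table: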
+    for(String name : names) {
+      Long v = pendingChecksums.get(name);
+      if (v == null) {
+        assert checksums.get(name) != null: "name=" + name + " has no checksum";
+      } else {
+        checksums.add(name, v.longValue(), false);
+      }
+    }
+    checksums.save();
+
+    for(String name : names) {
+      pendingChecksums.remove(name);
+    }
+  }
+
+  @Override
+  public void deleteFile(String name) throws IOException {
+    System.out.println(Thread.currentThread().getName() + " id=" + checksums.id + " " + name + " now delete");
+    in.deleteFile(name);
+    pendingChecksums.remove(name);
+    checksums.remove(name);
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      checksums.save();
+    } finally {
+      in.close();
+    }
+  }
+
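+  /** Persists the file name -> checksum map in generation-stamped
+   *  "checksums_N" files; the loader falls back to an older
+   *  generation if the newest file is truncated (e.g. by a crash
+   *  before sync). */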
+  static class Checksums {
+
+    // nocommit we need to remove old checksum file when
+    // writing new one:
+
+    private static final String FILE_NAME_PREFIX = "checksums";
+    private static final String CHECKSUM_CODEC = "checksum";
+    private static final int CHECKSUM_VERSION_START = 0;
+    private static final int CHECKSUM_VERSION_CURRENT = CHECKSUM_VERSION_START;
+
+    // nocommit need to sometimes prune this map
+
+    final Map<String,Long> checksums = new HashMap<String,Long>();
+    private final Directory dir;
+    final int id;
+
+    private long nextWriteGen;
+
+    // nocommit need to test crashing after writing checksum
+    // & before committing
+
+    public Checksums(int id, Directory dir) throws IOException {
+      this.id = id;
+      this.dir = dir;
+      long maxGen = -1;
+      for (String fileName : dir.listAll()) {
+        if (fileName.startsWith(FILE_NAME_PREFIX)) {
+          long gen = Long.parseLong(fileName.substring(1+FILE_NAME_PREFIX.length()),
+                                    Character.MAX_RADIX);
+          if (gen > maxGen) {
+            maxGen = gen;
+          }
+        }
+      }
+
+      while (maxGen > -1) {
+        IndexInput in = dir.openInput(genToFileName(maxGen), IOContext.DEFAULT);
+        try {
+          int version = CodecUtil.checkHeader(in, CHECKSUM_CODEC, CHECKSUM_VERSION_START, CHECKSUM_VERSION_START);
+          if (version != CHECKSUM_VERSION_START) {
+            throw new CorruptIndexException("wrong checksum version");
+          }
+          int count = in.readVInt();
+          for(int i=0;i<count;i++) {
+            String name = in.readString();
+            long checksum = in.readLong();
+            checksums.put(name, checksum);
+          }
+          nextWriteGen = maxGen+1;
+          System.out.println(Thread.currentThread().getName() + ": id=" + id + " " + genToFileName(maxGen) + " loaded checksums");
+          break;
+        } catch (IOException ioe) {
+          // This file was truncated, probably due to
+          // crashing w/o syncing:
+          maxGen--;
+        } finally {
+          in.close();
+        }
+      }
+    }
+
+    public synchronized void remove(String name) {
+      checksums.remove(name);
+    }
+
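+    /** E.g. gen 0 -> "checksums_0", gen 42 -> "checksums_16" (base 36). */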
+    private static String genToFileName(long gen) {
+      return FILE_NAME_PREFIX + "_" + Long.toString(gen, Character.MAX_RADIX);
+    }
+    
+    public synchronized void add(String fileName, long checksum, boolean mustMatch) {
+      Long oldChecksum = checksums.put(fileName, checksum);
+      if (oldChecksum == null) {
+        System.out.println(Thread.currentThread().getName() + ": id=" + id + " " + fileName + " record checksum=" + checksum);
+      } else if (oldChecksum.longValue() != checksum) {
+        System.out.println(Thread.currentThread().getName() + ": id=" + id + " " + fileName + " record new checksum=" + checksum + " replaces old checksum=" + oldChecksum);
+      }
+      // cannot assert this: in the "master rolled back"
+      // case, this can easily happen:
+      // assert mustMatch == false || oldChecksum == null || oldChecksum.longValue() == checksum: "fileName=" + fileName + " oldChecksum=" + oldChecksum + " newChecksum=" + checksum;
+    }
+
+    public synchronized Long get(String name) {
+      return checksums.get(name);
+    }
+
+    public synchronized void save() throws IOException {
+      String fileName = genToFileName(nextWriteGen++);
+      System.out.println(Thread.currentThread().getName() + " id=" + id + " save checksums to file \"" + fileName + "\"; files=" + checksums.keySet());
+      IndexOutput out = dir.createOutput(fileName, IOContext.DEFAULT);
+      try {
+        CodecUtil.writeHeader(out, CHECKSUM_CODEC, CHECKSUM_VERSION_CURRENT);
+        out.writeVInt(checksums.size());
+        for(Map.Entry<String,Long> ent : checksums.entrySet()) {
+          out.writeString(ent.getKey());
+          out.writeLong(ent.getValue());
+        }
+      } finally {
+        out.close();
+      }
+
+      dir.sync(Collections.singletonList(fileName));
+
+      if (nextWriteGen > 1) {
+        String oldFileName = genToFileName(nextWriteGen-2);
+        try {
+          dir.deleteFile(oldFileName);
+        } catch (IOException ioe) {
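+          // Best effort: a leftover old checksums file is harmless,
+          // since we always load the newest generation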
+        }
+      }
+    }
+  }
+}
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumIndexOutput.java b/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumIndexOutput.java
new file mode 100644
index 0000000..11e1d49
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumIndexOutput.java
@@ -0,0 +1,73 @@
+package org.apache.lucene.replicator;
+
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+import org.apache.lucene.store.IndexOutput;
+import java.io.IOException;
+
+/** Silly: its sole purpose is to tell SlowChecksumDirectory when it's
+ *  closed! */
+class SlowChecksumIndexOutput extends IndexOutput {
+  private final IndexOutput in;
+  private final SlowChecksumDirectory dir;
+  private final String name;
+
+  public SlowChecksumIndexOutput(SlowChecksumDirectory dir, String name, IndexOutput in) {
+    this.in = in;
+    this.dir = dir;
+    this.name = name;
+  }
+
+  @Override
+  public void writeByte(byte b) throws IOException {
+    in.writeByte(b);
+  }
+
+  @Override
+  public void writeBytes(byte[] b, int offset, int length) throws IOException {
+    in.writeBytes(b, offset, length);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    in.flush();
+  }
+
+  @Override
+  public void close() throws IOException {
+    in.close();
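+    // The file is complete: let the directory re-read it and record its checksum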
+    dir.outputClosed(name);
+  }
+
+  @Override
+  public long getFilePointer() {
+    return in.getFilePointer();
+  }
+
+  @Override
+  public long length() throws IOException {
+    return in.length();
+  }
+
+  @Override
+  public void setLength(long length) throws IOException {
+    in.setLength(length);
+  }
+}
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/TestNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/TestNRTReplication.java
index 0de9611..41802bf 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/TestNRTReplication.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/TestNRTReplication.java
@@ -89,6 +89,12 @@
 import org.apache.lucene.util._TestUtil;
 import org.junit.AfterClass;
 
+// nocommit add fang: master crashes
+
+// nocommit what about network partitioning
+
+// nocommit make MDW throw exceptions sometimes
+
 // nocommit test warming merge w/ O_DIRECT via NativeUnixDir
 
 // nocommit test rare bit errors during copy
@@ -108,11 +114,17 @@
 // a down replica (ie, not just promoting an already running
 // replica)
 
+// nocommit rewrite the test so each node has its own
+// threads, to be closer to the concurrency we'd "really"
+// see across N machines
+
+// nocommit checksum_N files never delete!
+
 @SuppressCodecs({ "SimpleText", "Memory", "Direct" })
 public class TestNRTReplication extends LuceneTestCase {
 
   static volatile Master master;
-  static Lock masterLock = new ReentrantLock();
+  static ReentrantLock masterLock = new ReentrantLock();
 
   @AfterClass
   public static void afterClass() {
@@ -129,6 +141,88 @@
     }
   }
 
+  /*
+  private static Map<String,Long> globalState = new HashMap<>();
+
+  private static void setGlobalStateKeyVal(String key, long value) {
+    Long cur = globalState.get(key);
+    assert cur == null || cur <= value;
+    TreeLogger.log("  push " + key + " cur=" + cur + " new=" + value);
+    globalState.put(key, value);
+  }
+  */
+
+  /** Called just before IndexWriter finishCommit on the
+   *  current master, to push "next write gens" to global
+   *  state. */
+  /*
+  static void pushGlobalState(SegmentInfos infos) {
+    TreeLogger.log("TEST: now pushGlobalState");
+    TreeLogger.start("pushGlobalState");    
+    // NOTE: assumed externally sync'd, i.e. only one master
+    // across the cluster at a time
+    setGlobalStateKeyVal("segmentsGen", infos.getGeneration());
+    // nocommit weird that we must add 2 :)
+    setGlobalStateKeyVal("segmentsVersion", infos.getVersion()+2);
+
+    // Used to generate next segment file name:
+    setGlobalStateKeyVal("segmentsCounter", (long) infos.counter);
+
+    for(SegmentCommitInfo info : infos) {
+      setGlobalStateKeyVal(info.info.name + "_delGen",  info.getNextDelGen());
+      setGlobalStateKeyVal(info.info.name + "_fieldInfosGen",  info.getNextFieldInfosGen());
+    }
+    TreeLogger.end("pushGlobalState");
+  }
+  */
+
+  /** Called just before init of a new writer, to pull the
+   *  "next write gens" and set them in the current infos. */
+  /*
+  static void pullGlobalState(SegmentInfos infos) {
+    TreeLogger.log("TEST: now pullGlobalState");
+    TreeLogger.start("pullGlobalState");
+    Long v = globalState.get("segmentsGen");
+    if (v == null) {
+      TreeLogger.log("no global state yet; skip");
+      TreeLogger.end("pullGlobalState");
+      return;
+    }
+    TreeLogger.log("pull global gen=" + v + " vs cur=" + infos.getGeneration());
+    assert infos.getGeneration() <= v.longValue(): "infos.generation=" + infos.getGeneration() + " global.generation=" + v;
+    infos.setGeneration(v.longValue());
+
+    v = globalState.get("segmentsVersion");
+    assert v != null;
+    assert infos.version <= v.longValue(): "infos.version=" + infos.version + " global.version=" + v;
+    TreeLogger.log("pull global version=" + v + " vs cur=" + infos.version);
+    infos.version = v.longValue();
+
+    v = globalState.get("segmentsCounter");
+    assert v != null;
+    assert infos.counter <= v.longValue(): "infos.counter=" + infos.counter + " global.counter=" + v;
+    TreeLogger.log("pull global counter=" + v + " vs cur=" + infos.counter);
+    infos.counter = v.intValue();
+
+    for(SegmentCommitInfo info : infos) {
+      String key = info.info.name + "_delGen";
+      v = globalState.get(key);
+      long value = v == null ? 1 : v.longValue();
+      assert info.getNextDelGen() <= value: "seg=" + info.info.name + " delGen=" + info.getNextDelGen() + " vs global=" + value;
+      TreeLogger.log("pull global del gen=" + v + " for seg=" + info.info.name + " vs cur=" + info.getNextDelGen());
+      info.setNextDelGen(value);
+
+      key = info.info.name + "_fieldInfosGen";
+      v = globalState.get(key);
+      value = v == null ? 1 : v.longValue();
+      assert info.getNextFieldInfosGen() <= value: "seg=" + info.info.name + " fieldInfosGen=" + info.getNextFieldInfosGen() + " vs global=" + value;
+      TreeLogger.log("pull global fieldInfos gen= " + v + " for seg=" + info.info.name + " vs cur=" + info.getNextFieldInfosGen());
+      info.setNextFieldInfosGen(value);
+    }
+    TreeLogger.end("pullGlobalState");
+  }
+  */
+  
   private void _test() throws Exception {
 
     TreeLogger.setLogger(new TreeLogger("main"));
@@ -186,7 +280,7 @@
       } else {
         if (random().nextInt(100) == 57) {
           closeMaster = true;
-          TreeLogger.log("now move master");
+          TreeLogger.log("top: id=" + master.id + " now move master");
           // Commits & closes current master and pull the
           // infos of the final commit:
           masterLock.lock();
@@ -195,7 +289,7 @@
           } finally {
             masterLock.unlock();
           }
-          TreeLogger.log("done shutdown master");
+          TreeLogger.log("top: done shutdown master; version=" + infos.version);
         } else {
 
           // Have writer do a full flush, and return the
@@ -231,7 +325,7 @@
         // already moved those bytes to disk...
 
         int totDocCount = docCount(infos);
-        String extra = " master.sizeInBytes=" + ((NRTCachingDirectory) master.dir).sizeInBytes();
+        String extra = " master.sizeInBytes=" + ((NRTCachingDirectory) master.dir.getDelegate()).sizeInBytes();
 
         TreeLogger.log("replicate docCount=" + totDocCount + " version=" + infos.version + extra + " segments=" + infos.toString(master.dir));
 
@@ -258,7 +352,7 @@
             // nocommit improve this: load each file ONCE,
             // push to the N replicas that need it
             try {
-              r.sync(master.dir, master.checksums, filesMetaData, infosBytes, infos.version);
+              r.sync(master.dir, filesMetaData, infosBytes, infos.version);
             } catch (AlreadyClosedException ace) {
               // Ignore this: it means the replica shut down
               // while we were trying to sync.  This
@@ -282,9 +376,8 @@
         // needs to:
         master.setInfos(infos);
       } else {
-        // nocommit awkward to do this "after close";
+
         // clean this up:
-        master.checksums.save(new HashSet<String>(infos.files(master.dir, true)));
         TreeLogger.log("close old master dir dir.listAll()=" + Arrays.toString(master.dir.listAll()));
         master.dir.close();
 
@@ -293,7 +386,11 @@
 
           nodes[master.id] = null;
 
-          // Must pick newest replica to promote
+          // nocommit go back to picking random replica
+
+          // Must pick newest replica to promote, else we
+          // can't overwrite open files when trying to copy
+          // to the newer replicas:
           int bestIDX = -1;
           long highestVersion = -1;
           for(int idx=0;idx<nodes.length;idx++) {
@@ -317,12 +414,13 @@
           }
 
           if (nodes[idx] == null) {
-            // Directly start up Master:
-            TreeLogger.log("id=" + idx + " promote down node to master");
+            // Start up Master from scratch:
+            TreeLogger.log("top: id=" + idx + " promote down node to master");
             nodes[idx] = master = new Master(dirs[idx], idx, nodes, versionDocCounts);
           } else {
+            // Promote a running replica to Master:
             assert nodes[idx] instanceof Replica;
-            TreeLogger.log("id=" + idx + " promote replica to master");
+            TreeLogger.log("top: id=" + idx + " promote replica to master");
             master = new Master((Replica) nodes[idx], nodes);
             nodes[idx] = master;
           }
@@ -368,20 +466,15 @@
   static Map<String,FileMetaData> getFilesMetaData(Master master, Collection<String> files) throws IOException {
     Map<String,FileMetaData> filesMetaData = new HashMap<String,FileMetaData>();
     for(String file : files) {
-      Long prevChecksum = master.checksums.get(file);
-      long checksum;
-      if (prevChecksum == null) {
-        checksum = -1;
-      } else {
-        checksum = prevChecksum.longValue();
-      }
-      filesMetaData.put(file, new FileMetaData(master.dir.fileLength(file), checksum));
+      Long checksum = master.dir.getChecksum(file);
+      assert checksum != null;
+      filesMetaData.put(file, new FileMetaData(master.dir.fileLength(file), checksum.longValue()));
     }
 
     return filesMetaData;
   }
 
-  static Set<String> copyFiles(Directory src, Checksums srcChecksums, Replica dst, Map<String,FileMetaData> filesMetaData, boolean lowPriority) throws IOException {
+  static Set<String> copyFiles(SlowChecksumDirectory src, Replica dst, Map<String,FileMetaData> filesMetaData, boolean lowPriority) throws IOException {
     long t0 = System.currentTimeMillis();
     long totBytes = 0;
     Set<String> toCopy = new HashSet<String>();
@@ -389,10 +482,11 @@
       String fileName = ent.getKey();
       // nocommit remove now unused metaData.checksum
       FileMetaData metaData = ent.getValue();
-      Long srcChecksum0 = srcChecksums.get(fileName);
-      long srcChecksum = srcChecksum0 == null ? -1 : srcChecksum0.longValue();
-      
-      Long checksum = dst.checksums.get(fileName);
+      Long srcChecksum0 = src.getChecksum(fileName);
+      assert srcChecksum0 != null: "id=" + dst.id + " name=" + fileName;
+      long srcChecksum = srcChecksum0.longValue();
+
+      Long checksum = dst.dir.getChecksum(fileName);
       if (dst.dir.fileExists(fileName) == false) {
         TreeLogger.log("id=" + dst.id + " " + fileName + " will copy [does not exist] length=" + metaData.sizeInBytes + " srcChecksum=" + srcChecksum);
         toCopy.add(fileName);
@@ -410,7 +504,7 @@
         toCopy.add(fileName);
         totBytes += metaData.sizeInBytes;
       } else {
-        TreeLogger.log("id=" + dst.id + " " + fileName + " skip copy checksum=" + srcChecksum);
+        TreeLogger.log("id=" + dst.id + " " + fileName + " skip copy checksum=" + srcChecksum + " file.length=" + metaData.sizeInBytes);
       }
     }
 
@@ -432,9 +526,23 @@
       totBytes += bytes;
       TreeLogger.log("id=" + dst.id + " " + f + " copy file");
 
-      dst.copyOneFile(dst.id, src, srcChecksums, f, ioContext);
+      dst.copyOneFile(dst.id, src, f, ioContext);
+
+      // nocommit make test that exercises this
+      // Make sure no bits flipped during copy
+      Long v1 = dst.dir.getChecksum(f);
+      assert v1 != null;
+
+      Long v2 = src.getChecksum(f);
+      assert v2 != null;
+
+      if (v1.longValue() != v2.longValue()) {
+        throw new IOException("id=" + dst.id + " " + f + ": copy failed: wrong checksums src=" + v2 + " vs dst=" + v1);
+      }
 
       if (lowPriority) {
+        // Rate limit low priority (copying a merged segment):
+        // nocommit use rate limiter
         try {
           Thread.sleep(bytes/100000);
         } catch (InterruptedException ie) {
@@ -444,7 +552,7 @@
       }
     }
     long t1 = System.currentTimeMillis();
-    TreeLogger.log("replica " + dst.id + ": " + (lowPriority ? "low-priority " : "") + "took " + (t1-t0) + " millis for " + totBytes + " bytes; " + toCopy.size() + " files (of " + filesMetaData.size() + "); sizeInBytes=" + ((NRTCachingDirectory) dst.dir).sizeInBytes());
+    TreeLogger.log("replica " + dst.id + ": " + (lowPriority ? "low-priority " : "") + "took " + (t1-t0) + " millis for " + totBytes + " bytes; " + toCopy.size() + " files (of " + filesMetaData.size() + "); sizeInBytes=" + ((NRTCachingDirectory) dst.dir.getDelegate()).sizeInBytes());
 
     return toCopy;
   }
@@ -544,24 +652,24 @@
               if (n instanceof Replica) {
                 Replica r = (Replica) n;
                 if (random().nextInt(100) == 17) {
-                  TreeLogger.log("id=" + i + " commit");
+                  TreeLogger.log("top: id=" + i + " commit");
                   r.commit(false);
                 }
                 if (random().nextInt(100) == 17) {
                   // Shutdown this replica
                   nodes[i] = null;
-                  TreeLogger.log("id=" + i + " shutdown");
+                  TreeLogger.log("top: id=" + i + " shutdown replica");
                   r.shutdown();
                 } else if (random().nextInt(100) == 17) {
                   // Crash the replica
                   nodes[i] = null;
-                  TreeLogger.log("id=" + i + " crash");
+                  TreeLogger.log("top: id=" + i + " crash replica");
                   r.crash();
                 }
               } else if (master != null && master.isClosed() == false) {
                 // Randomly commit master:
                 if (random().nextInt(100) == 17) {
-                  TreeLogger.log("id=" + i + " commit master");
+                  TreeLogger.log("top: id=" + i + " commit master");
                   master.commit();
                 }
               }
@@ -570,7 +678,7 @@
               try {
                 nodes[i] = new Replica(dirs[i], i, versionDocCounts, null);
               } catch (Throwable t) {
-                TreeLogger.log("id=" + i + " FAIL startup", t);
+                TreeLogger.log("top: id=" + i + " FAIL startup", t);
                 throw t;
               }
             }
@@ -589,11 +697,12 @@
     }
   }
 
+  // nocommit should extend replica and just add writer
   private static class Master {
 
     final Set<String> finishedMergedSegments = Collections.newSetFromMap(new ConcurrentHashMap<String,Boolean>());
 
-    final Directory dir;
+    final SlowChecksumDirectory dir;
     final Object[] nodes;
 
     final RandomIndexWriter writer;
@@ -601,11 +710,11 @@
     final SearchThread searchThread;
     final IndexThread indexThread;
     final int id;
-    final Checksums checksums;
     private boolean isClosed;
 
     SegmentInfos lastInfos;
 
+    /** Start up a master from scratch. */
     public Master(File path,
                   int id, Object[] nodes, Map<Long,Integer> versionDocCounts) throws IOException {
       final MockDirectoryWrapper dirOrig = newMockFSDirectory(path);
@@ -619,16 +728,15 @@
       // nocommit put back
       dirOrig.setCheckIndexOnClose(false);
 
-      dir = new NRTCachingDirectory(dirOrig, 1.0, 10.0);
+      dir = new SlowChecksumDirectory(id, new NRTCachingDirectory(dirOrig, 1.0, 10.0));
       //((NRTCachingDirectory) master).VERBOSE = true;
-      checksums = new Checksums(id, dir);
       SegmentInfos infos = new SegmentInfos();
       try {
         infos.read(dir);
       } catch (IndexNotFoundException infe) {
       }
 
-      checksums.prune(new HashSet<String>(infos.files(dir, true)));
+      //pullGlobalState(infos);
 
       mgr = new InfosSearcherManager(dir, id, infos);
       searchThread = new SearchThread("master", mgr, versionDocCounts);
@@ -656,7 +764,10 @@
     }
 
     public void commit() throws IOException {
-      checksums.save(null);
+      writer.w.prepareCommit();
+      //pushGlobalState(writer.w.getPendingCommit());
+      // It's harmless if we crash here, because the
+      // global state has already been updated
       writer.w.commit();
     }
 
@@ -669,7 +780,6 @@
       this.nodes = nodes;
       this.mgr = replica.mgr;
       this.searchThread = replica.searchThread;
-      this.checksums = replica.checksums;
 
       // nocommit must somehow "stop" this replica?  e.g. we
       // don't want it doing any more deleting?
@@ -682,6 +792,7 @@
       // "survive" over to the replica
 
       SegmentInfos curInfos = replica.mgr.getCurrentInfos().clone();
+      //pullGlobalState(curInfos);
 
       writer = new RandomIndexWriter(random(), dir, curInfos, iwc);
       _TestUtil.reduceOpenFiles(writer.w);
@@ -717,7 +828,7 @@
               if (n != null && n instanceof Replica) {
                 try {
                   // nocommit do we need to check for merge aborted...?
-                  ((Replica) n).warmMerge(info.info.name, dir, Master.this.checksums, filesMetaData);
+                  ((Replica) n).warmMerge(info.info.name, dir, filesMetaData);
                 } catch (AlreadyClosedException ace) {
                   // Ignore this: it means the replica shut down
                   // while we were trying to copy files.  This
@@ -771,6 +882,8 @@
       // nocommit can we optionally NOT commit here?  it
       // should be decoupled from master migration?
 
+      commit();
+
       // Don't wait for merges now; we already did above:
       writer.close(false);
       TreeLogger.log("done close writer");
@@ -814,6 +927,7 @@
 
     public IndexThread(Master master) {
       this.master = master;
+      setName("IndexThread id=" + master.id);
     }
 
     @Override
@@ -849,13 +963,12 @@
 
   private static class Replica {
     final int id;
-    final Directory dir;
+    final SlowChecksumDirectory dir;
     final InfosRefCounts deleter;
 
     private final InfosSearcherManager mgr;
     private volatile boolean stop;
     private SearchThread searchThread;
-    private final Checksums checksums;
 
     private final Collection<String> lastCommitFiles;
     private final Collection<String> lastNRTFiles;
@@ -864,7 +977,7 @@
     private final IndexReaderWarmer mergedSegmentWarmer;
 
     public Replica(File path, int id, Map<Long,Integer> versionDocCounts, IndexReaderWarmer mergedSegmentWarmer) throws IOException {
-      TreeLogger.log("id=" + id + " replica startup path=" + path);
+      TreeLogger.log("top: id=" + id + " replica startup path=" + path);
       TreeLogger.start("startup");
       
       this.id = id;
@@ -878,11 +991,10 @@
       // nocommit put back
       ((BaseDirectoryWrapper) fsDir).setCheckIndexOnClose(false);
 
-      dir = new NRTCachingDirectory(fsDir, 1.0, 10.0);
-      checksums = new Checksums(id, dir);
+      dir = new SlowChecksumDirectory(id, new NRTCachingDirectory(fsDir, 1.0, 10.0));
       TreeLogger.log("id=" + id + " created dirs, checksums; dir.listAll=" + Arrays.toString(dir.listAll()));
-      TreeLogger.log("id=" + id + " checksum files=" + checksums.checksums.keySet());
 
+      // nocommit don't need copiedFiles anymore:
       // Startup sync to pull latest index over:
       Set<String> copiedFiles = null;
 
@@ -903,36 +1015,32 @@
       // need to be overwritten when the master is against
       // an older index than our copy:
       SegmentInfos infos = null;
-      if (master != null) {
-        masterLock.lock();
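+      // Caller must already hold masterLock while we copy startup files from the master: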
+      assert master == null || masterLock.isLocked();
+      if (master != null && master.isClosed() == false) {
         SegmentInfos masterInfos = null;
         try {
+          masterInfos = master.getInfos();
+          // Convert infos to byte[], to send "on the wire":
+          RAMOutputStream out = new RAMOutputStream();
+          masterInfos.write(out);
+          byte[] infosBytes = new byte[(int) out.getFilePointer()];
+          out.writeTo(infosBytes, 0);
 
-          if (master.isClosed() == false) {
-            masterInfos = master.getInfos();
-            // Convert infos to byte[], to send "on the wire":
-            RAMOutputStream out = new RAMOutputStream();
-            masterInfos.write(out);
-            byte[] infosBytes = new byte[(int) out.getFilePointer()];
-            out.writeTo(infosBytes, 0);
+          Map<String,FileMetaData> filesMetaData = getFilesMetaData(master, masterInfos.files(master.dir, false));
 
-            Map<String,FileMetaData> filesMetaData = getFilesMetaData(master, masterInfos.files(master.dir, false));
-
-            try {
-              // Copy files over to replica:
-              copiedFiles = copyFiles(master.dir, master.checksums, this, filesMetaData, false);
-            } catch (Throwable t) {
-              TreeLogger.log("id=" + id + " FAIL", t);
-              throw new RuntimeException(t);
-            }
-      
-            // Turn byte[] back to SegmentInfos:
-            infos = new SegmentInfos();
-            infos.read(dir, new ByteArrayDataInput(infosBytes));
-            lastNRTFiles.addAll(infos.files(dir, false));
+          try {
+            // Copy files over to replica:
+            copiedFiles = copyFiles(master.dir, this, filesMetaData, false);
+          } catch (Throwable t) {
+            TreeLogger.log("id=" + id + " FAIL", t);
+            throw new RuntimeException(t);
           }
+      
+          // Turn byte[] back to SegmentInfos:
+          infos = new SegmentInfos();
+          infos.read(dir, new ByteArrayDataInput(infosBytes));
+          lastNRTFiles.addAll(infos.files(dir, false));
         } finally {
-          masterLock.unlock();
           if (masterInfos != null) {
             master.releaseInfos(masterInfos);
           }
@@ -985,59 +1093,22 @@
       Set<String> validFiles = new HashSet<String>();
       validFiles.addAll(lastNRTFiles);
       validFiles.addAll(lastCommitFiles);
-      checksums.prune(validFiles);
       TreeLogger.end("pruneChecksums");
     }
 
-    public void copyOneFile(int id, Directory src, Checksums srcChecksums, String fileName, IOContext context) throws IOException {
-      IndexOutput os = null;
-      IndexInput is = null;
-      IOException priorException = null;
+    public void copyOneFile(int id, Directory src, String fileName, IOContext context) throws IOException {
       try {
-        os = dir.createOutput(fileName, context);
-        is = src.openInput(fileName, context);
-        byte[] copyBuffer = new byte[16384];
-        long left = is.length();
-        Checksum checksum = new Adler32();
-        while (left > 0) {
-          final int toCopy;
-          if (left > copyBuffer.length) {
-            toCopy = copyBuffer.length;
-          } else {
-            toCopy = (int) left;
-          }
-          is.readBytes(copyBuffer, 0, toCopy);
-          os.writeBytes(copyBuffer, 0, toCopy);
-          checksum.update(copyBuffer, 0, toCopy);
-          left -= toCopy;
-        }
-        long finalChecksum = checksum.getValue();
-        checksums.add(fileName, finalChecksum, false);
-        srcChecksums.add(fileName, finalChecksum, true);
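+        // Directory.copy replaces the manual copy loop; the checksum is
+        // recorded when the copied output is closed: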
+        src.copy(dir, fileName, fileName, context);
       } catch (IOException ioe) {
-        priorException = ioe;
         TreeLogger.log("id=" + id + " " + fileName + " failed copy1", ioe);
-      } finally {
-        boolean success = false;
-        try {
-          IOUtils.closeWhileHandlingException(priorException, os, is);
-          success = true;
-        } finally {
-          if (!success) {
-            TreeLogger.log("id=" + id + " " + fileName + " failed copy2");
-            try {
-              dir.deleteFile(fileName);
-            } catch (Throwable t) {
-            }
-          }
-        }
+        throw ioe;
       }
     }
 
     // nocommit move this to a thread so N replicas copy at
     // once:
 
-    public synchronized Set<String> sync(Directory master, Checksums masterChecksums, Map<String,FileMetaData> filesMetaData, byte[] infosBytes,
+    public synchronized Set<String> sync(SlowChecksumDirectory master, Map<String,FileMetaData> filesMetaData, byte[] infosBytes,
                                          long infosVersion) throws IOException {
 
       if (stop) {
@@ -1056,7 +1127,7 @@
       */
 
       // Copy files over to replica:
-      Set<String> copiedFiles = copyFiles(master, masterChecksums, this, filesMetaData, false);
+      Set<String> copiedFiles = copyFiles(master, this, filesMetaData, false);
 
       // Turn byte[] back to SegmentInfos:
       SegmentInfos infos = new SegmentInfos();
@@ -1095,12 +1166,12 @@
       return copiedFiles;
     }
 
-    public synchronized void warmMerge(String segmentName, Directory master, Checksums masterChecksums, Map<String,FileMetaData> filesMetaData) throws IOException {
+    public synchronized void warmMerge(String segmentName, SlowChecksumDirectory master, Map<String,FileMetaData> filesMetaData) throws IOException {
       if (stop) {
         throw new AlreadyClosedException("replica closed");
       }
       TreeLogger.log("id=" + id + " replica warm merge " + segmentName);
-      Set<String> copiedFiles = copyFiles(master, masterChecksums, this, filesMetaData, true);
+      Set<String> copiedFiles = copyFiles(master, this, filesMetaData, true);
 
       // nocommit factor out & share this invalidation
 
@@ -1153,11 +1224,11 @@
     public synchronized void crash() throws IOException, InterruptedException {
       stop = true;
       TreeLogger.log("id=" + id + " replica crash; dir.listAll()=" + Arrays.toString(dir.listAll()));
-      TreeLogger.log("id=" + id + " replica crash; fsdir.listAll()=" + Arrays.toString(((NRTCachingDirectory) dir).getDelegate().listAll()));
+      TreeLogger.log("id=" + id + " replica crash; fsdir.listAll()=" + Arrays.toString(((NRTCachingDirectory) dir.getDelegate()).getDelegate().listAll()));
       searchThread.finish();
       mgr.close();
-      ((MockDirectoryWrapper) ((NRTCachingDirectory) dir).getDelegate()).crash();
-      ((NRTCachingDirectory) dir).getDelegate().close();
+      ((MockDirectoryWrapper) ((NRTCachingDirectory) dir.getDelegate()).getDelegate()).crash();
+      ((NRTCachingDirectory) dir.getDelegate()).getDelegate().close();
     }
 
     /** Commit latest SegmentInfos (fsync'ing all referenced
@@ -1172,7 +1243,6 @@
         // Can only save those files that have been
         // explicitly sync'd; a warmed, but not yet visible,
         // merge cannot be sync'd:
-        checksums.save(fileNames);
         infos.commit(dir);
         TreeLogger.log("id=" + id + " " + infos.getSegmentsFileName() + " committed");
 
@@ -1206,6 +1276,7 @@
       this.id = id;
       this.mgr = mgr;
       this.versionDocCounts = versionDocCounts;
+      setName("SearchThread id=" + id);
     }
 
     @Override
@@ -1320,129 +1391,4 @@
     }
   }
 
-  // nocommit unused currently:
-  static class Checksums {
-
-    // nocommit we need to remove old checksum file when
-    // writing new one:
-
-    private static final String FILE_NAME_PREFIX = "checksums";
-    private static final String CHECKSUM_CODEC = "checksum";
-    private static final int CHECKSUM_VERSION_START = 0;
-    private static final int CHECKSUM_VERSION_CURRENT = CHECKSUM_VERSION_START;
-
-    // nocommit need to sometimes prune this map
-
-    final Map<String,Long> checksums = new HashMap<String,Long>();
-    private final Directory dir;
-    private final int id;
-
-    private long nextWriteGen;
-
-    // nocommit need to test crashing after writing checksum
-    // & before committing
-
-    public Checksums(int id, Directory dir) throws IOException {
-      this.id = id;
-      this.dir = dir;
-      long maxGen = -1;
-      for (String fileName : dir.listAll()) {
-        if (fileName.startsWith(FILE_NAME_PREFIX)) {
-          long gen = Long.parseLong(fileName.substring(1+FILE_NAME_PREFIX.length()),
-                                    Character.MAX_RADIX);
-          if (gen > maxGen) {
-            maxGen = gen;
-          }
-        }
-      }
-
-      while (maxGen > -1) {
-        IndexInput in = dir.openInput(genToFileName(maxGen), IOContext.DEFAULT);
-        try {
-          int version = CodecUtil.checkHeader(in, CHECKSUM_CODEC, CHECKSUM_VERSION_START, CHECKSUM_VERSION_START);
-          if (version != CHECKSUM_VERSION_START) {
-            throw new CorruptIndexException("wrong checksum version");
-          }
-          int count = in.readVInt();
-          for(int i=0;i<count;i++) {
-            String name = in.readString();
-            long checksum = in.readLong();
-            checksums.put(name, checksum);
-          }
-          nextWriteGen = maxGen+1;
-          TreeLogger.log("id=" + id + " " + genToFileName(maxGen) + " loaded checksums");
-          break;
-        } catch (IOException ioe) {
-          // This file was truncated, probably due to
-          // crashing w/o syncing:
-          maxGen--;
-        } finally {
-          in.close();
-        }
-      }
-    }
-
-    private static String genToFileName(long gen) {
-      return FILE_NAME_PREFIX + "_" + Long.toString(gen, Character.MAX_RADIX);
-    }
-    
-    public synchronized void add(String fileName, long checksum, boolean mustMatch) {
-      Long oldChecksum = checksums.put(fileName, checksum);
-      if (oldChecksum == null) {
-        TreeLogger.log("id=" + id + " " + fileName + " record checksum=" + checksum);
-      } else if (oldChecksum.longValue() != checksum) {
-        TreeLogger.log("id=" + id + " " + fileName + " record new checksum=" + checksum + " replaces old checksum=" + oldChecksum);
-      }
-      // cannot assert this: in the "master rolled back"
-      // case, this can easily happen:
-      // assert mustMatch == false || oldChecksum == null || oldChecksum.longValue() == checksum: "fileName=" + fileName + " oldChecksum=" + oldChecksum + " newChecksum=" + checksum;
-    }
-
-    public synchronized Long get(String name) {
-      return checksums.get(name);
-    }
-
-    public synchronized void save(Set<String> toSave) throws IOException {
-      String fileName = genToFileName(nextWriteGen++);
-      TreeLogger.log("id=" + id + " save checksums to file \"" + fileName + "\"; files=" + checksums.keySet());
-      IndexOutput out = dir.createOutput(fileName, IOContext.DEFAULT);
-      try {
-        CodecUtil.writeHeader(out, CHECKSUM_CODEC, CHECKSUM_VERSION_CURRENT);
-        int count;
-        if (toSave == null) {
-          count = checksums.size();
-        } else {
-          count = 0;
-          for(Map.Entry<String,Long> ent : checksums.entrySet()) {
-            if (toSave.contains(ent.getKey())) {
-              count++;
-            }
-          }
-        }
-
-        out.writeVInt(count);
-        for(Map.Entry<String,Long> ent : checksums.entrySet()) {
-          if (toSave == null || toSave.contains(ent.getKey())) {
-            out.writeString(ent.getKey());
-            out.writeLong(ent.getValue());
-          }
-        }
-      } finally {
-        out.close();
-      }
-
-      dir.sync(Collections.singletonList(fileName));
-    }
-
-    public synchronized void prune(Set<String> validFileNames) {
-      Iterator<String> it = checksums.keySet().iterator();
-      while (it.hasNext()) {
-        String fileName = it.next();
-        if (validFileNames.contains(fileName) == false) {
-          it.remove();
-          TreeLogger.log("id=" + id + " " + fileName + " pruned from checksums");
-        }
-      }
-    }
-  }
 }
diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
index b37d461..01514ce 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
@@ -999,7 +999,7 @@
   public static BaseDirectoryWrapper newDirectory(Random r, Directory d) throws IOException {
     Directory impl = newDirectoryImpl(r, TEST_DIRECTORY);
     for (String file : d.listAll()) {
-     d.copy(impl, file, file, newIOContext(r));
+      d.copy(impl, file, file, newIOContext(r));
     }
     return wrapDirectory(r, impl, rarely(r));
   }