LUCENE-5438: test seems to pass now, even when master is migrated to an 'old' replica
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/branches/lucene5438@1580522 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lucene/core/src/java/org/apache/lucene/store/NRTCachingDirectory.java b/lucene/core/src/java/org/apache/lucene/store/NRTCachingDirectory.java
index 757fb07..a571e12 100644
--- a/lucene/core/src/java/org/apache/lucene/store/NRTCachingDirectory.java
+++ b/lucene/core/src/java/org/apache/lucene/store/NRTCachingDirectory.java
@@ -303,12 +303,13 @@
// happens during commit() or close():
synchronized(uncacheLock) {
if (true || VERBOSE) {
- System.out.println("nrtdir.unCache name=" + fileName);
+ System.out.println(Thread.currentThread().getName() + ": nrtdir.unCache name=" + fileName);
}
if (!cache.fileExists(fileName)) {
+ // nocommit why am I hitting this so much?
// Another thread beat us...
if (true || VERBOSE) {
- System.out.println("nrtdir.unCache skip name=" + fileName);
+ System.out.println(Thread.currentThread().getName() + ": nrtdir.unCache skip name=" + fileName);
}
return;
}
diff --git a/lucene/core/src/java/org/apache/lucene/util/TreeLogger.java b/lucene/core/src/java/org/apache/lucene/util/TreeLogger.java
new file mode 100644
index 0000000..af5b5dd
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/util/TreeLogger.java
@@ -0,0 +1,102 @@
+package org.apache.lucene.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TreeLogger {
+
+ private final List<String> stack = new ArrayList<String>();
+ private String indent = "";
+ private final String label;
+
+ private static Map<Thread,TreeLogger> loggers = new ConcurrentHashMap<Thread,TreeLogger>();
+
+ public static void setLogger(TreeLogger instance) {
+ loggers.put(Thread.currentThread(), instance);
+ }
+
+ public static TreeLogger getLogger() {
+ return loggers.get(Thread.currentThread());
+ }
+
+ public static void start(String label) {
+ loggers.get(Thread.currentThread()).start0(label);
+ }
+
+ public static void end(String label) {
+ loggers.get(Thread.currentThread()).end0(label);
+ }
+
+ public static void log(String message) {
+ loggers.get(Thread.currentThread()).log0(message);
+ }
+
+ public static void log(String message, Throwable t) {
+ loggers.get(Thread.currentThread()).log0(message, t);
+ }
+
+ public TreeLogger(String label) {
+ this.label = label;
+ }
+
+ public void start0(String label) {
+ stack.add(label);
+ indent += " ";
+ }
+
+ public void end0(String label) {
+ if (stack.isEmpty()) {
+ throw new IllegalStateException(Thread.currentThread().getName() + ": cannot end: label=" + label + " wasn't pushed");
+ }
+ String last = stack.get(stack.size()-1);
+ if (last.equals(label) == false) {
+ throw new IllegalStateException(Thread.currentThread().getName() + ": cannot end: label=" + label + " doesn't match current label=" + last);
+ }
+ stack.remove(stack.size()-1);
+ indent = indent.substring(0, indent.length()-2);
+ }
+
+ public void log0(String message, Throwable t) {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ t.printStackTrace(pw);
+ log(message + ":\n" + sw.toString());
+ }
+
+ public void log0(String message) {
+ StringBuilder b = new StringBuilder();
+ while (message.startsWith("\n")) {
+ b.append('\n');
+ message = message.substring(1);
+ }
+ b.append('[');
+ b.append(label);
+ b.append("] ");
+ b.append(indent);
+ b.append("- ");
+ b.append(message.replace("\n", "\n" + indent + " "));
+ System.out.println(b.toString());
+ }
+}
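TreeLogger keeps one logger per thread and prints each message prefixed with its label and the current indent, which grows on start() and shrinks on end(). A minimal usage sketch (the label and messages here are made up, not from the test):

    TreeLogger.setLogger(new TreeLogger("main"));
    TreeLogger.start("replicate");
    TreeLogger.log("copying files");  // prints: [main]   - copying files
    TreeLogger.end("replicate");      // must match the last start(), else IllegalStateException

Because end0() checks the top of the stack, mismatched or missing start/end pairs fail fast instead of silently corrupting the indent.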
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumDirectory.java b/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumDirectory.java
new file mode 100644
index 0000000..b81b40a
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumDirectory.java
@@ -0,0 +1,245 @@
+package org.apache.lucene.replicator;
+
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.TreeLogger;
+
+/** Stupid wrapper to inefficiently compute the CRC32 checksum of
+ * every written file after it's closed. */
+
+// nocommit this is a silly wrapper until we integrate
+// checksums at a lower level / written to each file
+
+public class SlowChecksumDirectory extends FilterDirectory {
+
+ private final Checksums checksums;
+
+ // Closed but not yet sync'd:
+ private final Map<String,Long> pendingChecksums = new ConcurrentHashMap<String,Long>();
+
+ public SlowChecksumDirectory(int id, Directory in) throws IOException {
+ super(in);
+ checksums = new Checksums(id, in);
+ }
+
+ @Override
+ public IndexOutput createOutput(String name, IOContext context) throws IOException {
+ IndexOutput other = in.createOutput(name, context);
+ return new SlowChecksumIndexOutput(this, name, other);
+ }
+
+ /** Stupid: this is called when an output is closed, and
+ * we go and re-read it to compute the checksum. */
+ void outputClosed(String name) throws IOException {
+ IndexInput input = in.openInput(name, IOContext.READONCE);
+ try {
+ byte[] bytes = new byte[4096];
+ long bytesLeft = in.fileLength(name);
+
+ Checksum checksum = new CRC32();
+ while (bytesLeft > 0) {
+ int chunk = (int) Math.min(bytesLeft, bytes.length);
+ input.readBytes(bytes, 0, chunk);
+ checksum.update(bytes, 0, chunk);
+ bytesLeft -= chunk;
+ }
+
+ long value = checksum.getValue();
+ System.out.println(Thread.currentThread().getName() + " id=" + checksums.id + " " + name + ": record pending checksum=" + value);
+ pendingChecksums.put(name, value);
+ } finally {
+ input.close();
+ }
+ }
+
+ public Long getChecksum(String name) {
+ Long v = pendingChecksums.get(name);
+ if (v != null) {
+ return v;
+ }
+ v = checksums.get(name);
+ if (v != null) {
+ return v;
+ }
+ return null;
+ }
+
+ @Override
+ public void sync(Collection<String> names) throws IOException {
+ System.out.println(Thread.currentThread().getName() + " id=" + checksums.id + " sync " + names);
+ in.sync(names);
+ for(String name : names) {
+ Long v = pendingChecksums.get(name);
+ if (v == null) {
+ assert checksums.get(name) != null: "name=" + name + " has no checksum";
+ } else {
+ checksums.add(name, v.longValue(), false);
+ }
+ }
+ checksums.save();
+
+ for(String name : names) {
+ pendingChecksums.remove(name);
+ }
+ }
+
+ @Override
+ public void deleteFile(String name) throws IOException {
+ System.out.println(Thread.currentThread().getName() + " id=" + checksums.id + " " + name + " now delete");
+ in.deleteFile(name);
+ pendingChecksums.remove(name);
+ checksums.remove(name);
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ checksums.save();
+ } finally {
+ in.close();
+ }
+ }
+
+ static class Checksums {
+
+ // nocommit we need to remove old checksum file when
+ // writing new one:
+
+ private static final String FILE_NAME_PREFIX = "checksums";
+ private static final String CHECKSUM_CODEC = "checksum";
+ private static final int CHECKSUM_VERSION_START = 0;
+ private static final int CHECKSUM_VERSION_CURRENT = CHECKSUM_VERSION_START;
+
+ // nocommit need to sometimes prune this map
+
+ final Map<String,Long> checksums = new HashMap<String,Long>();
+ private final Directory dir;
+ final int id;
+
+ private long nextWriteGen;
+
+ // nocommit need to test crashing after writing checksum
+ // & before committing
+
+ public Checksums(int id, Directory dir) throws IOException {
+ this.id = id;
+ this.dir = dir;
+ long maxGen = -1;
+ for (String fileName : dir.listAll()) {
+ if (fileName.startsWith(FILE_NAME_PREFIX)) {
+ long gen = Long.parseLong(fileName.substring(1+FILE_NAME_PREFIX.length()),
+ Character.MAX_RADIX);
+ if (gen > maxGen) {
+ maxGen = gen;
+ }
+ }
+ }
+
+ while (maxGen > -1) {
+ IndexInput in = dir.openInput(genToFileName(maxGen), IOContext.DEFAULT);
+ try {
+ int version = CodecUtil.checkHeader(in, CHECKSUM_CODEC, CHECKSUM_VERSION_START, CHECKSUM_VERSION_START);
+ if (version != CHECKSUM_VERSION_START) {
+ throw new CorruptIndexException("wrong checksum version");
+ }
+ int count = in.readVInt();
+ for(int i=0;i<count;i++) {
+ String name = in.readString();
+ long checksum = in.readLong();
+ checksums.put(name, checksum);
+ }
+ nextWriteGen = maxGen+1;
+ System.out.println(Thread.currentThread().getName() + ": id=" + id + " " + genToFileName(maxGen) + " loaded checksums");
+ break;
+ } catch (IOException ioe) {
+ // This file was truncated, probably due to
+ // crashing w/o syncing:
+ maxGen--;
+ } finally {
+ in.close();
+ }
+ }
+ }
+
+ public synchronized void remove(String name) {
+ checksums.remove(name);
+ }
+
+ private static String genToFileName(long gen) {
+ return FILE_NAME_PREFIX + "_" + Long.toString(gen, Character.MAX_RADIX);
+ }
+
+ public synchronized void add(String fileName, long checksum, boolean mustMatch) {
+ Long oldChecksum = checksums.put(fileName, checksum);
+ if (oldChecksum == null) {
+ System.out.println(Thread.currentThread().getName() + ": id=" + id + " " + fileName + " record checksum=" + checksum);
+ } else if (oldChecksum.longValue() != checksum) {
+ System.out.println(Thread.currentThread().getName() + ": id=" + id + " " + fileName + " record new checksum=" + checksum + " replaces old checksum=" + oldChecksum);
+ }
+ // cannot assert this: in the "master rolled back"
+ // case, this can easily happen:
+ // assert mustMatch == false || oldChecksum == null || oldChecksum.longValue() == checksum: "fileName=" + fileName + " oldChecksum=" + oldChecksum + " newChecksum=" + checksum;
+ }
+
+ public synchronized Long get(String name) {
+ return checksums.get(name);
+ }
+
+ public synchronized void save() throws IOException {
+ String fileName = genToFileName(nextWriteGen++);
+ System.out.println(Thread.currentThread().getName() + " id=" + id + " save checksums to file \"" + fileName + "\"; files=" + checksums.keySet());
+ IndexOutput out = dir.createOutput(fileName, IOContext.DEFAULT);
+ try {
+ CodecUtil.writeHeader(out, CHECKSUM_CODEC, CHECKSUM_VERSION_CURRENT);
+ out.writeVInt(checksums.size());
+ for(Map.Entry<String,Long> ent : checksums.entrySet()) {
+ out.writeString(ent.getKey());
+ out.writeLong(ent.getValue());
+ }
+ } finally {
+ out.close();
+ }
+
+ dir.sync(Collections.singletonList(fileName));
+
+ if (nextWriteGen > 1) {
+ String oldFileName = genToFileName(nextWriteGen-2);
+ try {
+ dir.deleteFile(oldFileName);
+ } catch (IOException ioe) {
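+ // Best effort: it is harmless if deleting the old gen file fails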
+ }
+ }
+ }
+ }
+}
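outputClosed() computes each file's checksum by re-reading it in 4 KB chunks; the value stays pending until sync() persists it into the next generation-named checksums_N file (gen in base 36, written under a codec header, fsync'd, previous gen then deleted). For reference, the same re-read loop as a standalone helper, using only the imports already shown above (crc32OfFile is illustrative, not part of the patch):

    static long crc32OfFile(Directory dir, String name) throws IOException {
      IndexInput in = dir.openInput(name, IOContext.READONCE);
      try {
        byte[] buf = new byte[4096];
        long left = in.length();
        Checksum crc = new CRC32();
        while (left > 0) {
          int chunk = (int) Math.min(left, buf.length);
          in.readBytes(buf, 0, chunk);  // read the next chunk...
          crc.update(buf, 0, chunk);    // ...and fold it into the running CRC32
          left -= chunk;
        }
        return crc.getValue();
      } finally {
        in.close();
      }
    }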
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumIndexOutput.java b/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumIndexOutput.java
new file mode 100644
index 0000000..11e1d49
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumIndexOutput.java
@@ -0,0 +1,73 @@
+package org.apache.lucene.replicator;
+
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+import org.apache.lucene.store.IndexOutput;
+import java.io.IOException;
+
+/** Silly: sole purpose is to tell SlowChecksumDirectory when an
+ * output is closed! */
+class SlowChecksumIndexOutput extends IndexOutput {
+ private final IndexOutput in;
+ private final SlowChecksumDirectory dir;
+ private final String name;
+
+ public SlowChecksumIndexOutput(SlowChecksumDirectory dir, String name, IndexOutput in) {
+ this.in = in;
+ this.dir = dir;
+ this.name = name;
+ }
+
+ @Override
+ public void writeByte(byte b) throws IOException {
+ in.writeByte(b);
+ }
+
+ @Override
+ public void writeBytes(byte[] b, int offset, int length) throws IOException {
+ in.writeBytes(b, offset, length);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ in.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ dir.outputClosed(name);
+ }
+
+ @Override
+ public long getFilePointer() {
+ return in.getFilePointer();
+ }
+
+ @Override
+ public long length() throws IOException {
+ return in.length();
+ }
+
+ @Override
+ public void setLength(long length) throws IOException {
+ in.setLength(length);
+ }
+}
+
+
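Taken together, the wrapper is transparent to callers: createOutput() returns a SlowChecksumIndexOutput, close() triggers the re-read in outputClosed(), and sync() promotes the pending value into the persisted map. A hypothetical end-to-end sketch (the path and file name are made up):

    SlowChecksumDirectory dir = new SlowChecksumDirectory(0, FSDirectory.open(new File("/tmp/repl")));
    IndexOutput out = dir.createOutput("_0.dat", IOContext.DEFAULT);
    out.writeLong(42L);
    out.close();                                // outputClosed(): file re-read, CRC32 recorded as pending
    dir.sync(Collections.singleton("_0.dat")); // fsync, then save the checksum into checksums_0
    Long crc = dir.getChecksum("_0.dat");      // pending checksums are consulted first, then the saved map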
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/TestNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/TestNRTReplication.java
index 0de9611..41802bf 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/TestNRTReplication.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/TestNRTReplication.java
@@ -89,6 +89,12 @@
import org.apache.lucene.util._TestUtil;
import org.junit.AfterClass;
+// nocommit add fang: master crashes
+
+// nocommit what about network partitioning
+
+// nocommit make MDW throw exceptions sometimes
+
// nocommit test warming merge w/ O_DIRECT via NativeUnixDir
// nocommit test rare bit errors during copy
@@ -108,11 +114,17 @@
// a down replica (ie, not just promoting an already running
// replica)
+// nocommit rewrite the test so each node has its own
+// threads, to be closer to the concurrency we'd "really"
+// see across N machines
+
+// nocommit checksums_N files never get deleted!
+
@SuppressCodecs({ "SimpleText", "Memory", "Direct" })
public class TestNRTReplication extends LuceneTestCase {
static volatile Master master;
- static Lock masterLock = new ReentrantLock();
+ static ReentrantLock masterLock = new ReentrantLock();
@AfterClass
public static void afterClass() {
@@ -129,6 +141,88 @@
}
}
+ /*
+ private static Map<String,Long> globalState = new HashMap<>();
+
+ private static void setGlobalStateKeyVal(String key, long value) {
+ Long cur = globalState.get(key);
+ assert cur == null || cur <= value;
+ TreeLogger.log(" push " + key + " cur=" + cur + " new=" + value);
+ globalState.put(key, value);
+ }
+ */
+
+ /** Called just before IndexWriter finishCommit on the
+ * current master, to push "next write gens" to global
+ * state. */
+ /*
+ static void pushGlobalState(SegmentInfos infos) {
+ TreeLogger.log("TEST: now pushGlobalState");
+ TreeLogger.start("pushGlobalState");
+ // NOTE: assumed externally sync'd, i.e. only one master
+ // across the cluster at a time
+ setGlobalStateKeyVal("segmentsGen", infos.getGeneration());
+ // nocommit weird that we must add 2 :)
+ setGlobalStateKeyVal("segmentsVersion", infos.getVersion()+2);
+
+ // Used to generate next segment file name:
+ setGlobalStateKeyVal("segmentsCounter", (long) infos.counter);
+
+ for(SegmentCommitInfo info : infos) {
+ setGlobalStateKeyVal(info.info.name + "_delGen", info.getNextDelGen());
+ setGlobalStateKeyVal(info.info.name + "_fieldInfosGen", info.getNextFieldInfosGen());
+ }
+ TreeLogger.end("pushGlobalState");
+ }
+ */
+
+ /** Called just before init of a new writer, to pull the
+ * "next write gens" and set them in the current infos. */
+ /*
+ static void pullGlobalState(SegmentInfos infos) {
+ TreeLogger.log("TEST: now pullGlobalState");
+ TreeLogger.start("pullGlobalState");
+ Long v = globalState.get("segmentsGen");
+ if (v == null) {
+ TreeLogger.log("no global state yet; skip");
+ TreeLogger.end("pullGlobalState");
+ return;
+ }
+ TreeLogger.log("pull global gen=" + v + " vs cur=" + infos.getGeneration());
+ assert infos.getGeneration() <= v.longValue(): "infos.generation=" + infos.getGeneration() + " global.generation=" + v;
+ infos.setGeneration(v.longValue());
+
+ v = globalState.get("segmentsVersion");
+ assert v != null;
+ assert infos.version <= v.longValue(): "infos.version=" + infos.version + " global.version=" + v;
+ TreeLogger.log("pull global version=" + v + " vs cur=" + infos.version);
+ infos.version = v.longValue();
+
+ v = globalState.get("segmentsCounter");
+ assert v != null;
+ assert infos.counter <= v.longValue(): "infos.counter=" + infos.counter + " global.counter=" + v;
+ TreeLogger.log("pull global counter=" + v + " vs cur=" + infos.counter);
+ infos.counter = v.intValue();
+
+ for(SegmentCommitInfo info : infos) {
+ String key = info.info.name + "_delGen";
+ v = globalState.get(key);
+ long value = v == null ? 1 : v.longValue();
+ assert info.getNextDelGen() <= value: "seg=" + info.info.name + " delGen=" + info.getNextDelGen() + " vs global=" + value;
+ TreeLogger.log("pull global del gen=" + v + " for seg=" + info.info.name + " vs cur=" + info.getNextDelGen());
+ info.setNextDelGen(value);
+
+ key = info.info.name + "_fieldInfosGen";
+ v = globalState.get(key);
+ value = v == null ? 1 : v.longValue();
+ assert info.getNextFieldInfosGen() <= value: "seg=" + info.info.name + " fieldInfosGen=" + info.getNextFieldInfosGen() + " vs global=" + value;
+ TreeLogger.log("pull global fieldInfos gen= " + v + " for seg=" + info.info.name + " vs cur=" + info.getNextFieldInfosGen());
+ info.setNextFieldInfosGen(value);
+ }
+ TreeLogger.end("pullGlobalState");
+ }
+ */
+
private void _test() throws Exception {
TreeLogger.setLogger(new TreeLogger("main"));
@@ -186,7 +280,7 @@
} else {
if (random().nextInt(100) == 57) {
closeMaster = true;
- TreeLogger.log("now move master");
+ TreeLogger.log("top: id=" + master.id + " now move master");
// Commits & closes current master and pull the
// infos of the final commit:
masterLock.lock();
@@ -195,7 +289,7 @@
} finally {
masterLock.unlock();
}
- TreeLogger.log("done shutdown master");
+ TreeLogger.log("top: done shutdown master; version=" + infos.version);
} else {
// Have writer do a full flush, and return the
@@ -231,7 +325,7 @@
// already moved those bytes to disk...
int totDocCount = docCount(infos);
- String extra = " master.sizeInBytes=" + ((NRTCachingDirectory) master.dir).sizeInBytes();
+ String extra = " master.sizeInBytes=" + ((NRTCachingDirectory) master.dir.getDelegate()).sizeInBytes();
TreeLogger.log("replicate docCount=" + totDocCount + " version=" + infos.version + extra + " segments=" + infos.toString(master.dir));
@@ -258,7 +352,7 @@
// nocommit improve this: load each file ONCE,
// push to the N replicas that need it
try {
- r.sync(master.dir, master.checksums, filesMetaData, infosBytes, infos.version);
+ r.sync(master.dir, filesMetaData, infosBytes, infos.version);
} catch (AlreadyClosedException ace) {
// Ignore this: it means the replica shut down
// while we were trying to sync. This
@@ -282,9 +376,8 @@
// needs to:
master.setInfos(infos);
} else {
- // nocommit awkward to do this "after close";
+
// clean this up:
- master.checksums.save(new HashSet<String>(infos.files(master.dir, true)));
TreeLogger.log("close old master dir dir.listAll()=" + Arrays.toString(master.dir.listAll()));
master.dir.close();
@@ -293,7 +386,11 @@
nodes[master.id] = null;
- // Must pick newest replica to promote
+ // nocommit go back to picking random replica
+
+ // Must pick newest replica to promote, else we
+ // can't overwrite open files when trying to copy
+ // to the newer replicas:
int bestIDX = -1;
long highestVersion = -1;
for(int idx=0;idx<nodes.length;idx++) {
@@ -317,12 +414,13 @@
}
if (nodes[idx] == null) {
- // Directly start up Master:
- TreeLogger.log("id=" + idx + " promote down node to master");
+ // Start up Master from scratch:
+ TreeLogger.log("top: id=" + idx + " promote down node to master");
nodes[idx] = master = new Master(dirs[idx], idx, nodes, versionDocCounts);
} else {
+ // Promote a running replica to Master:
assert nodes[idx] instanceof Replica;
- TreeLogger.log("id=" + idx + " promote replica to master");
+ TreeLogger.log("top: id=" + idx + " promote replica to master");
master = new Master((Replica) nodes[idx], nodes);
nodes[idx] = master;
}
@@ -368,20 +466,15 @@
static Map<String,FileMetaData> getFilesMetaData(Master master, Collection<String> files) throws IOException {
Map<String,FileMetaData> filesMetaData = new HashMap<String,FileMetaData>();
for(String file : files) {
- Long prevChecksum = master.checksums.get(file);
- long checksum;
- if (prevChecksum == null) {
- checksum = -1;
- } else {
- checksum = prevChecksum.longValue();
- }
- filesMetaData.put(file, new FileMetaData(master.dir.fileLength(file), checksum));
+ Long checksum = master.dir.getChecksum(file);
+ assert checksum != null;
+ filesMetaData.put(file, new FileMetaData(master.dir.fileLength(file), checksum.longValue()));
}
return filesMetaData;
}
- static Set<String> copyFiles(Directory src, Checksums srcChecksums, Replica dst, Map<String,FileMetaData> filesMetaData, boolean lowPriority) throws IOException {
+ static Set<String> copyFiles(SlowChecksumDirectory src, Replica dst, Map<String,FileMetaData> filesMetaData, boolean lowPriority) throws IOException {
long t0 = System.currentTimeMillis();
long totBytes = 0;
Set<String> toCopy = new HashSet<String>();
@@ -389,10 +482,11 @@
String fileName = ent.getKey();
// nocommit remove now unused metaData.checksum
FileMetaData metaData = ent.getValue();
- Long srcChecksum0 = srcChecksums.get(fileName);
- long srcChecksum = srcChecksum0 == null ? -1 : srcChecksum0.longValue();
-
- Long checksum = dst.checksums.get(fileName);
+ Long srcChecksum0 = src.getChecksum(fileName);
+ assert srcChecksum0 != null: "id=" + dst.id + " name=" + fileName;
+ long srcChecksum = srcChecksum0.longValue();
+
+ Long checksum = dst.dir.getChecksum(fileName);
if (dst.dir.fileExists(fileName) == false) {
TreeLogger.log("id=" + dst.id + " " + fileName + " will copy [does not exist] length=" + metaData.sizeInBytes + " srcChecksum=" + srcChecksum);
toCopy.add(fileName);
@@ -410,7 +504,7 @@
toCopy.add(fileName);
totBytes += metaData.sizeInBytes;
} else {
- TreeLogger.log("id=" + dst.id + " " + fileName + " skip copy checksum=" + srcChecksum);
+ TreeLogger.log("id=" + dst.id + " " + fileName + " skip copy checksum=" + srcChecksum + " file.length=" + metaData.sizeInBytes);
}
}
@@ -432,9 +526,23 @@
totBytes += bytes;
TreeLogger.log("id=" + dst.id + " " + f + " copy file");
- dst.copyOneFile(dst.id, src, srcChecksums, f, ioContext);
+ dst.copyOneFile(dst.id, src, f, ioContext);
+
+ // nocommit make test that exercises this
+ // Make sure no bits flipped during copy
+ Long v1 = dst.dir.getChecksum(f);
+ assert v1 != null;
+
+ Long v2 = src.getChecksum(f);
+ assert v2 != null;
+
+ if (v1.longValue() != v2.longValue()) {
+ throw new IOException("id=" + dst.id + " " + f + ": copy failed: wrong checksums src=" + v2 + " vs dst=" + v1);
+ }
if (lowPriority) {
+ // Rate limit low priority (copying a merged segment):
+ // nocommit use rate limiter
try {
Thread.sleep(bytes/100000);
} catch (InterruptedException ie) {
@@ -444,7 +552,7 @@
}
}
long t1 = System.currentTimeMillis();
- TreeLogger.log("replica " + dst.id + ": " + (lowPriority ? "low-priority " : "") + "took " + (t1-t0) + " millis for " + totBytes + " bytes; " + toCopy.size() + " files (of " + filesMetaData.size() + "); sizeInBytes=" + ((NRTCachingDirectory) dst.dir).sizeInBytes());
+ TreeLogger.log("replica " + dst.id + ": " + (lowPriority ? "low-priority " : "") + "took " + (t1-t0) + " millis for " + totBytes + " bytes; " + toCopy.size() + " files (of " + filesMetaData.size() + "); sizeInBytes=" + ((NRTCachingDirectory) dst.dir.getDelegate()).sizeInBytes());
return toCopy;
}
@@ -544,24 +652,24 @@
if (n instanceof Replica) {
Replica r = (Replica) n;
if (random().nextInt(100) == 17) {
- TreeLogger.log("id=" + i + " commit");
+ TreeLogger.log("top: id=" + i + " commit");
r.commit(false);
}
if (random().nextInt(100) == 17) {
// Shutdown this replica
nodes[i] = null;
- TreeLogger.log("id=" + i + " shutdown");
+ TreeLogger.log("top: id=" + i + " shutdown replica");
r.shutdown();
} else if (random().nextInt(100) == 17) {
// Crash the replica
nodes[i] = null;
- TreeLogger.log("id=" + i + " crash");
+ TreeLogger.log("top: id=" + i + " crash replica");
r.crash();
}
} else if (master != null && master.isClosed() == false) {
// Randomly commit master:
if (random().nextInt(100) == 17) {
- TreeLogger.log("id=" + i + " commit master");
+ TreeLogger.log("top: id=" + i + " commit master");
master.commit();
}
}
@@ -570,7 +678,7 @@
try {
nodes[i] = new Replica(dirs[i], i, versionDocCounts, null);
} catch (Throwable t) {
- TreeLogger.log("id=" + i + " FAIL startup", t);
+ TreeLogger.log("top: id=" + i + " FAIL startup", t);
throw t;
}
}
@@ -589,11 +697,12 @@
}
}
+ // nocommit should extend replica and just add writer
private static class Master {
final Set<String> finishedMergedSegments = Collections.newSetFromMap(new ConcurrentHashMap<String,Boolean>());
- final Directory dir;
+ final SlowChecksumDirectory dir;
final Object[] nodes;
final RandomIndexWriter writer;
@@ -601,11 +710,11 @@
final SearchThread searchThread;
final IndexThread indexThread;
final int id;
- final Checksums checksums;
private boolean isClosed;
SegmentInfos lastInfos;
+ /** Start up a master from scratch. */
public Master(File path,
int id, Object[] nodes, Map<Long,Integer> versionDocCounts) throws IOException {
final MockDirectoryWrapper dirOrig = newMockFSDirectory(path);
@@ -619,16 +728,15 @@
// nocommit put back
dirOrig.setCheckIndexOnClose(false);
- dir = new NRTCachingDirectory(dirOrig, 1.0, 10.0);
+ dir = new SlowChecksumDirectory(id, new NRTCachingDirectory(dirOrig, 1.0, 10.0));
//((NRTCachingDirectory) master).VERBOSE = true;
- checksums = new Checksums(id, dir);
SegmentInfos infos = new SegmentInfos();
try {
infos.read(dir);
} catch (IndexNotFoundException infe) {
}
- checksums.prune(new HashSet<String>(infos.files(dir, true)));
+ //pullGlobalState(infos);
mgr = new InfosSearcherManager(dir, id, infos);
searchThread = new SearchThread("master", mgr, versionDocCounts);
@@ -656,7 +764,10 @@
}
public void commit() throws IOException {
- checksums.save(null);
+ writer.w.prepareCommit();
+ //pushGlobalState(writer.w.getPendingCommit());
+ // It's harmless if we crash here, because the
+ // global state has already been updated
writer.w.commit();
}
@@ -669,7 +780,6 @@
this.nodes = nodes;
this.mgr = replica.mgr;
this.searchThread = replica.searchThread;
- this.checksums = replica.checksums;
// nocommit must somehow "stop" this replica? e.g. we
// don't want it doing any more deleting?
@@ -682,6 +792,7 @@
// "survive" over to the replica
SegmentInfos curInfos = replica.mgr.getCurrentInfos().clone();
+ //pullGlobalState(curInfos);
writer = new RandomIndexWriter(random(), dir, curInfos, iwc);
_TestUtil.reduceOpenFiles(writer.w);
@@ -717,7 +828,7 @@
if (n != null && n instanceof Replica) {
try {
// nocommit do we need to check for merge aborted...?
- ((Replica) n).warmMerge(info.info.name, dir, Master.this.checksums, filesMetaData);
+ ((Replica) n).warmMerge(info.info.name, dir, filesMetaData);
} catch (AlreadyClosedException ace) {
// Ignore this: it means the replica shut down
// while we were trying to copy files. This
@@ -771,6 +882,8 @@
// nocommit can we optionally NOT commit here? it
// should be decoupled from master migration?
+ commit();
+
// Don't wait for merges now; we already did above:
writer.close(false);
TreeLogger.log("done close writer");
@@ -814,6 +927,7 @@
public IndexThread(Master master) {
this.master = master;
+ setName("IndexThread id=" + master.id);
}
@Override
@@ -849,13 +963,12 @@
private static class Replica {
final int id;
- final Directory dir;
+ final SlowChecksumDirectory dir;
final InfosRefCounts deleter;
private final InfosSearcherManager mgr;
private volatile boolean stop;
private SearchThread searchThread;
- private final Checksums checksums;
private final Collection<String> lastCommitFiles;
private final Collection<String> lastNRTFiles;
@@ -864,7 +977,7 @@
private final IndexReaderWarmer mergedSegmentWarmer;
public Replica(File path, int id, Map<Long,Integer> versionDocCounts, IndexReaderWarmer mergedSegmentWarmer) throws IOException {
- TreeLogger.log("id=" + id + " replica startup path=" + path);
+ TreeLogger.log("top: id=" + id + " replica startup path=" + path);
TreeLogger.start("startup");
this.id = id;
@@ -878,11 +991,10 @@
// nocommit put back
((BaseDirectoryWrapper) fsDir).setCheckIndexOnClose(false);
- dir = new NRTCachingDirectory(fsDir, 1.0, 10.0);
- checksums = new Checksums(id, dir);
+ dir = new SlowChecksumDirectory(id, new NRTCachingDirectory(fsDir, 1.0, 10.0));
TreeLogger.log("id=" + id + " created dirs, checksums; dir.listAll=" + Arrays.toString(dir.listAll()));
- TreeLogger.log("id=" + id + " checksum files=" + checksums.checksums.keySet());
+ // nocommit don't need copiedFiles anymore:
// Startup sync to pull latest index over:
Set<String> copiedFiles = null;
@@ -903,36 +1015,32 @@
// need to be overwritten when the master is against
// an older index than our copy:
SegmentInfos infos = null;
- if (master != null) {
- masterLock.lock();
+ assert master == null || masterLock.isLocked();
+ if (master != null && master.isClosed() == false) {
SegmentInfos masterInfos = null;
try {
+ masterInfos = master.getInfos();
+ // Convert infos to byte[], to send "on the wire":
+ RAMOutputStream out = new RAMOutputStream();
+ masterInfos.write(out);
+ byte[] infosBytes = new byte[(int) out.getFilePointer()];
+ out.writeTo(infosBytes, 0);
- if (master.isClosed() == false) {
- masterInfos = master.getInfos();
- // Convert infos to byte[], to send "on the wire":
- RAMOutputStream out = new RAMOutputStream();
- masterInfos.write(out);
- byte[] infosBytes = new byte[(int) out.getFilePointer()];
- out.writeTo(infosBytes, 0);
+ Map<String,FileMetaData> filesMetaData = getFilesMetaData(master, masterInfos.files(master.dir, false));
- Map<String,FileMetaData> filesMetaData = getFilesMetaData(master, masterInfos.files(master.dir, false));
-
- try {
- // Copy files over to replica:
- copiedFiles = copyFiles(master.dir, master.checksums, this, filesMetaData, false);
- } catch (Throwable t) {
- TreeLogger.log("id=" + id + " FAIL", t);
- throw new RuntimeException(t);
- }
-
- // Turn byte[] back to SegmentInfos:
- infos = new SegmentInfos();
- infos.read(dir, new ByteArrayDataInput(infosBytes));
- lastNRTFiles.addAll(infos.files(dir, false));
+ try {
+ // Copy files over to replica:
+ copiedFiles = copyFiles(master.dir, this, filesMetaData, false);
+ } catch (Throwable t) {
+ TreeLogger.log("id=" + id + " FAIL", t);
+ throw new RuntimeException(t);
}
+
+ // Turn byte[] back to SegmentInfos:
+ infos = new SegmentInfos();
+ infos.read(dir, new ByteArrayDataInput(infosBytes));
+ lastNRTFiles.addAll(infos.files(dir, false));
} finally {
- masterLock.unlock();
if (masterInfos != null) {
master.releaseInfos(masterInfos);
}
@@ -985,59 +1093,22 @@
Set<String> validFiles = new HashSet<String>();
validFiles.addAll(lastNRTFiles);
validFiles.addAll(lastCommitFiles);
- checksums.prune(validFiles);
TreeLogger.end("pruneChecksums");
}
- public void copyOneFile(int id, Directory src, Checksums srcChecksums, String fileName, IOContext context) throws IOException {
- IndexOutput os = null;
- IndexInput is = null;
- IOException priorException = null;
+ public void copyOneFile(int id, Directory src, String fileName, IOContext context) throws IOException {
try {
- os = dir.createOutput(fileName, context);
- is = src.openInput(fileName, context);
- byte[] copyBuffer = new byte[16384];
- long left = is.length();
- Checksum checksum = new Adler32();
- while (left > 0) {
- final int toCopy;
- if (left > copyBuffer.length) {
- toCopy = copyBuffer.length;
- } else {
- toCopy = (int) left;
- }
- is.readBytes(copyBuffer, 0, toCopy);
- os.writeBytes(copyBuffer, 0, toCopy);
- checksum.update(copyBuffer, 0, toCopy);
- left -= toCopy;
- }
- long finalChecksum = checksum.getValue();
- checksums.add(fileName, finalChecksum, false);
- srcChecksums.add(fileName, finalChecksum, true);
+ src.copy(dir, fileName, fileName, context);
} catch (IOException ioe) {
- priorException = ioe;
TreeLogger.log("id=" + id + " " + fileName + " failed copy1", ioe);
- } finally {
- boolean success = false;
- try {
- IOUtils.closeWhileHandlingException(priorException, os, is);
- success = true;
- } finally {
- if (!success) {
- TreeLogger.log("id=" + id + " " + fileName + " failed copy2");
- try {
- dir.deleteFile(fileName);
- } catch (Throwable t) {
- }
- }
- }
+ throw ioe;
}
}
// nocommit move this to a thread so N replicas copy at
// once:
- public synchronized Set<String> sync(Directory master, Checksums masterChecksums, Map<String,FileMetaData> filesMetaData, byte[] infosBytes,
+ public synchronized Set<String> sync(SlowChecksumDirectory master, Map<String,FileMetaData> filesMetaData, byte[] infosBytes,
long infosVersion) throws IOException {
if (stop) {
@@ -1056,7 +1127,7 @@
*/
// Copy files over to replica:
- Set<String> copiedFiles = copyFiles(master, masterChecksums, this, filesMetaData, false);
+ Set<String> copiedFiles = copyFiles(master, this, filesMetaData, false);
// Turn byte[] back to SegmentInfos:
SegmentInfos infos = new SegmentInfos();
@@ -1095,12 +1166,12 @@
return copiedFiles;
}
- public synchronized void warmMerge(String segmentName, Directory master, Checksums masterChecksums, Map<String,FileMetaData> filesMetaData) throws IOException {
+ public synchronized void warmMerge(String segmentName, SlowChecksumDirectory master, Map<String,FileMetaData> filesMetaData) throws IOException {
if (stop) {
throw new AlreadyClosedException("replica closed");
}
TreeLogger.log("id=" + id + " replica warm merge " + segmentName);
- Set<String> copiedFiles = copyFiles(master, masterChecksums, this, filesMetaData, true);
+ Set<String> copiedFiles = copyFiles(master, this, filesMetaData, true);
// nocommit factor out & share this invalidation
@@ -1153,11 +1224,11 @@
public synchronized void crash() throws IOException, InterruptedException {
stop = true;
TreeLogger.log("id=" + id + " replica crash; dir.listAll()=" + Arrays.toString(dir.listAll()));
- TreeLogger.log("id=" + id + " replica crash; fsdir.listAll()=" + Arrays.toString(((NRTCachingDirectory) dir).getDelegate().listAll()));
+ TreeLogger.log("id=" + id + " replica crash; fsdir.listAll()=" + Arrays.toString(((NRTCachingDirectory) dir.getDelegate()).getDelegate().listAll()));
searchThread.finish();
mgr.close();
- ((MockDirectoryWrapper) ((NRTCachingDirectory) dir).getDelegate()).crash();
- ((NRTCachingDirectory) dir).getDelegate().close();
+ ((MockDirectoryWrapper) ((NRTCachingDirectory) dir.getDelegate()).getDelegate()).crash();
+ ((NRTCachingDirectory) dir.getDelegate()).getDelegate().close();
}
/** Commit latest SegmentInfos (fsync'ing all referenced
@@ -1172,7 +1243,6 @@
// Can only save those files that have been
// explicitly sync'd; a warmed, but not yet visible,
// merge cannot be sync'd:
- checksums.save(fileNames);
infos.commit(dir);
TreeLogger.log("id=" + id + " " + infos.getSegmentsFileName() + " committed");
@@ -1206,6 +1276,7 @@
this.id = id;
this.mgr = mgr;
this.versionDocCounts = versionDocCounts;
+ setName("SearchThread id=" + id);
}
@Override
@@ -1320,129 +1391,4 @@
}
}
- // nocommit unused currently:
- static class Checksums {
-
- // nocommit we need to remove old checksum file when
- // writing new one:
-
- private static final String FILE_NAME_PREFIX = "checksums";
- private static final String CHECKSUM_CODEC = "checksum";
- private static final int CHECKSUM_VERSION_START = 0;
- private static final int CHECKSUM_VERSION_CURRENT = CHECKSUM_VERSION_START;
-
- // nocommit need to sometimes prune this map
-
- final Map<String,Long> checksums = new HashMap<String,Long>();
- private final Directory dir;
- private final int id;
-
- private long nextWriteGen;
-
- // nocommit need to test crashing after writing checksum
- // & before committing
-
- public Checksums(int id, Directory dir) throws IOException {
- this.id = id;
- this.dir = dir;
- long maxGen = -1;
- for (String fileName : dir.listAll()) {
- if (fileName.startsWith(FILE_NAME_PREFIX)) {
- long gen = Long.parseLong(fileName.substring(1+FILE_NAME_PREFIX.length()),
- Character.MAX_RADIX);
- if (gen > maxGen) {
- maxGen = gen;
- }
- }
- }
-
- while (maxGen > -1) {
- IndexInput in = dir.openInput(genToFileName(maxGen), IOContext.DEFAULT);
- try {
- int version = CodecUtil.checkHeader(in, CHECKSUM_CODEC, CHECKSUM_VERSION_START, CHECKSUM_VERSION_START);
- if (version != CHECKSUM_VERSION_START) {
- throw new CorruptIndexException("wrong checksum version");
- }
- int count = in.readVInt();
- for(int i=0;i<count;i++) {
- String name = in.readString();
- long checksum = in.readLong();
- checksums.put(name, checksum);
- }
- nextWriteGen = maxGen+1;
- TreeLogger.log("id=" + id + " " + genToFileName(maxGen) + " loaded checksums");
- break;
- } catch (IOException ioe) {
- // This file was truncated, probably due to
- // crashing w/o syncing:
- maxGen--;
- } finally {
- in.close();
- }
- }
- }
-
- private static String genToFileName(long gen) {
- return FILE_NAME_PREFIX + "_" + Long.toString(gen, Character.MAX_RADIX);
- }
-
- public synchronized void add(String fileName, long checksum, boolean mustMatch) {
- Long oldChecksum = checksums.put(fileName, checksum);
- if (oldChecksum == null) {
- TreeLogger.log("id=" + id + " " + fileName + " record checksum=" + checksum);
- } else if (oldChecksum.longValue() != checksum) {
- TreeLogger.log("id=" + id + " " + fileName + " record new checksum=" + checksum + " replaces old checksum=" + oldChecksum);
- }
- // cannot assert this: in the "master rolled back"
- // case, this can easily happen:
- // assert mustMatch == false || oldChecksum == null || oldChecksum.longValue() == checksum: "fileName=" + fileName + " oldChecksum=" + oldChecksum + " newChecksum=" + checksum;
- }
-
- public synchronized Long get(String name) {
- return checksums.get(name);
- }
-
- public synchronized void save(Set<String> toSave) throws IOException {
- String fileName = genToFileName(nextWriteGen++);
- TreeLogger.log("id=" + id + " save checksums to file \"" + fileName + "\"; files=" + checksums.keySet());
- IndexOutput out = dir.createOutput(fileName, IOContext.DEFAULT);
- try {
- CodecUtil.writeHeader(out, CHECKSUM_CODEC, CHECKSUM_VERSION_CURRENT);
- int count;
- if (toSave == null) {
- count = checksums.size();
- } else {
- count = 0;
- for(Map.Entry<String,Long> ent : checksums.entrySet()) {
- if (toSave.contains(ent.getKey())) {
- count++;
- }
- }
- }
-
- out.writeVInt(count);
- for(Map.Entry<String,Long> ent : checksums.entrySet()) {
- if (toSave == null || toSave.contains(ent.getKey())) {
- out.writeString(ent.getKey());
- out.writeLong(ent.getValue());
- }
- }
- } finally {
- out.close();
- }
-
- dir.sync(Collections.singletonList(fileName));
- }
-
- public synchronized void prune(Set<String> validFileNames) {
- Iterator<String> it = checksums.keySet().iterator();
- while (it.hasNext()) {
- String fileName = it.next();
- if (validFileNames.contains(fileName) == false) {
- it.remove();
- TreeLogger.log("id=" + id + " " + fileName + " pruned from checksums");
- }
- }
- }
- }
}
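The test's copy path now verifies every transfer by comparing source and destination checksums once the copy completes; the destination checksum is recorded automatically when SlowChecksumDirectory closes the copied output. A condensed sketch of that verify-after-copy step (copyAndVerify is illustrative, not in the patch):

    static void copyAndVerify(SlowChecksumDirectory src, SlowChecksumDirectory dst,
                              String fileName, IOContext context) throws IOException {
      src.copy(dst, fileName, fileName, context);  // byte-level transfer via Directory.copy
      Long srcCrc = src.getChecksum(fileName);     // recorded when the source output was closed
      Long dstCrc = dst.getChecksum(fileName);     // recorded just now, when the copied output was closed
      if (srcCrc == null || dstCrc == null || srcCrc.longValue() != dstCrc.longValue()) {
        throw new IOException(fileName + ": copy failed: wrong checksums src=" + srcCrc + " vs dst=" + dstCrc);
      }
    }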
diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
index b37d461..01514ce 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
@@ -999,7 +999,7 @@
public static BaseDirectoryWrapper newDirectory(Random r, Directory d) throws IOException {
Directory impl = newDirectoryImpl(r, TEST_DIRECTORY);
for (String file : d.listAll()) {
- d.copy(impl, file, file, newIOContext(r));
+ d.copy(impl, file, file, newIOContext(r));
}
return wrapDirectory(r, impl, rarely(r));
}