| package org.apache.lucene.replicator.nrt; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import java.io.BufferedReader; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.io.UnsupportedEncodingException; |
| import java.nio.file.Path; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.regex.Pattern; |
| |
| import org.apache.lucene.document.Document; |
| import org.apache.lucene.util.IOUtils; |
| import org.apache.lucene.util.LineFileDocs; |
| import org.apache.lucene.util.LuceneTestCase.SuppressCodecs; |
| import org.apache.lucene.util.LuceneTestCase.SuppressSysoutChecks; |
| import org.apache.lucene.util.LuceneTestCase; |
| import org.apache.lucene.util.SuppressForbidden; |
| import org.apache.lucene.util.TestUtil; |
| |
| import com.carrotsearch.randomizedtesting.SeedUtils; |
| |
| // MockRandom's .sd file has no index header/footer: |
| @SuppressCodecs({"MockRandom", "Memory", "Direct", "SimpleText"}) |
| @SuppressSysoutChecks(bugUrl = "Stuff gets printed, important stuff for debugging a failure") |
| public class TestNRTReplication extends LuceneTestCase { |
| |
| /** Working directory (cwd) in which each child (server) node process is started; assigned per test in setUp. */ |
| private Path childTempDir; |
| |
| final AtomicLong nodeStartCounter = new AtomicLong(); /* incremented per startNode call and multiplied into the child's random seed */ |
| private long nextPrimaryGen; /* generation handed to the next node started as primary, then incremented */ |
| private long lastPrimaryGen; /* generation of the most recently started primary; replicas are started with it */ |
| LineFileDocs docs; /* shared document source: created in setUp, closed in tearDown */ |
| |
| /** |
|  * Launches a child "server" node in a separate JVM and blocks until the child prints {@code NODE STARTED} on its stdout. |
|  * The node is started as the primary when {@code primaryTCPPort} is -1, otherwise as a replica of the primary on that port. |
|  * |
|  * @param primaryTCPPort TCP port of the primary to attach to, or -1 to start this node as the primary |
|  * @param id node id, passed to the child and used in log messages and the pumper thread name |
|  * @param indexPath index location, passed to the child as {@code tests.nrtreplication.indexpath} |
|  * @param forcePrimaryVersion forwarded to the child only when it is started as the primary |
|  * @param willCrash whether a non-zero child exit status is expected; also turned on if the child logs "will crash after" |
|  * @return handle for the started child, carrying the port and initial versions it reported |
|  * @throws IOException if the child cannot be launched or its output cannot be read |
|  * @throws RuntimeException if the child's output ends before it reports that it started |
|  */ |
| @SuppressForbidden(reason = "ProcessBuilder requires java.io.File for CWD") |
| private NodeProcess startNode(int primaryTCPPort, final int id, Path indexPath, long forcePrimaryVersion, boolean willCrash) throws IOException { |
|   List<String> cmd = new ArrayList<>(); |
| |
|   cmd.add(System.getProperty("java.home") |
|       + System.getProperty("file.separator") |
|       + "bin" |
|       + System.getProperty("file.separator") |
|       + "java"); |
|   cmd.add("-Xmx512m"); |
| |
|   long myPrimaryGen; |
|   if (primaryTCPPort != -1) { |
|     // I am a replica: reuse the generation of the most recently started primary |
|     cmd.add("-Dtests.nrtreplication.primaryTCPPort=" + primaryTCPPort); |
|     myPrimaryGen = lastPrimaryGen; |
|   } else { |
|     // I am a primary: take a fresh generation and remember it for replicas started later |
|     myPrimaryGen = nextPrimaryGen++; |
|     lastPrimaryGen = myPrimaryGen; |
|   } |
|   cmd.add("-Dtests.nrtreplication.primaryGen=" + myPrimaryGen); |
|   cmd.add("-Dtests.nrtreplication.closeorcrash=false"); |
| |
|   cmd.add("-Dtests.nrtreplication.node=true"); |
|   cmd.add("-Dtests.nrtreplication.nodeid=" + id); |
|   cmd.add("-Dtests.nrtreplication.startNS=" + Node.globalStartNS); |
|   cmd.add("-Dtests.nrtreplication.indexpath=" + indexPath); |
|   cmd.add("-Dtests.nrtreplication.checkonclose=true"); |
| |
|   if (primaryTCPPort == -1) { |
|     // We are the primary node |
|     cmd.add("-Dtests.nrtreplication.isPrimary=true"); |
|     cmd.add("-Dtests.nrtreplication.forcePrimaryVersion=" + forcePrimaryVersion); |
|   } |
| |
|   // Mixin our own counter because this is called from a fresh thread which means the seed otherwise isn't changing each time we spawn a |
|   // new node: |
|   long seed = random().nextLong() * nodeStartCounter.incrementAndGet(); |
| |
|   cmd.add("-Dtests.seed=" + SeedUtils.formatSeed(seed)); |
|   cmd.add("-ea"); |
|   cmd.add("-cp"); |
|   cmd.add(System.getProperty("java.class.path")); |
|   cmd.add("org.junit.runner.JUnitCore"); |
|   cmd.add(getClass().getName().replace(getClass().getSimpleName(), "SimpleServer")); |
| |
|   message("child process command: " + cmd); |
|   ProcessBuilder pb = new ProcessBuilder(cmd); |
|   pb.redirectErrorStream(true); |
| |
|   // Important, so that the scary looking hs_err_<pid>.log appear under our test temp dir: |
|   pb.directory(childTempDir.toFile()); |
| |
|   Process p = pb.start(); |
| |
|   BufferedReader r; |
|   try { |
|     r = new BufferedReader(new InputStreamReader(p.getInputStream(), IOUtils.UTF_8)); |
|   } catch (UnsupportedEncodingException uee) { |
|     throw new RuntimeException(uee); |
|   } |
| |
|   int tcpPort = -1; |
|   long initCommitVersion = -1; |
|   long initInfosVersion = -1; |
|   Pattern logTimeStart = Pattern.compile("^[0-9\\.]+s .*"); |
| |
|   // Echo the child's startup output and pick out the values it reports, until it says it has started: |
|   while (true) { |
|     String l = r.readLine(); |
|     if (l == null) { |
|       // Output ended before NODE STARTED was seen |
|       message("top: node=" + id + " failed to start"); |
|       try { |
|         p.waitFor(); |
|       } catch (InterruptedException ie) { |
|         // Restore the interrupt status before propagating: |
|         Thread.currentThread().interrupt(); |
|         throw new RuntimeException(ie); |
|       } |
|       message("exit value=" + p.exitValue()); |
|       message("top: now fail test replica R" + id + " failed to start"); |
|       throw new RuntimeException("replica R" + id + " failed to start"); |
|     } |
| |
|     if (logTimeStart.matcher(l).matches()) { |
|       // Already a well-formed log output: |
|       System.out.println(l); |
|     } else { |
|       message(l); |
|     } |
| |
|     if (l.startsWith("PORT: ")) { |
|       tcpPort = Integer.parseInt(l.substring(6).trim()); |
|     } else if (l.startsWith("COMMIT VERSION: ")) { |
|       // Versions are longs, so parse them as such; Integer.parseInt rejects values beyond the int range: |
|       initCommitVersion = Long.parseLong(l.substring(16).trim()); |
|     } else if (l.startsWith("INFOS VERSION: ")) { |
|       initInfosVersion = Long.parseLong(l.substring(15).trim()); |
|     } else if (l.contains("will crash after")) { |
|       willCrash = true; |
|     } else if (l.startsWith("NODE STARTED")) { |
|       break; |
|     } |
|   } |
| |
|   final boolean finalWillCrash = willCrash; |
| |
|   // Baby sits the child process, pulling its stdout and printing to our stdout: |
|   AtomicBoolean nodeClosing = new AtomicBoolean(); |
|   Thread pumper = ThreadPumper.start( |
|       new Runnable() { |
|         @Override |
|         public void run() { |
|           message("now wait for process " + p); |
|           try { |
|             p.waitFor(); |
|           } catch (Throwable t) { |
|             throw new RuntimeException(t); |
|           } |
| |
|           message("done wait for process " + p); |
|           int exitValue = p.exitValue(); |
|           message("exit value=" + exitValue + " willCrash=" + finalWillCrash); |
|           if (exitValue != 0 && finalWillCrash == false) { |
|             // should fail test |
|             throw new RuntimeException("node " + id + " process had unexpected non-zero exit status=" + exitValue); |
|           } |
|         } |
|       }, r, System.out, null, nodeClosing); |
|   pumper.setName("pump" + id); |
| |
|   message("top: node=" + id + " started at tcpPort=" + tcpPort + " initCommitVersion=" + initCommitVersion + " initInfosVersion=" + initInfosVersion); |
|   return new NodeProcess(p, id, tcpPort, pumper, primaryTCPPort == -1, initCommitVersion, initInfosVersion, nodeClosing); |
| } |
| |
| /** Per-test initialization: resets the shared start timestamp, then creates the child working directory and the shared docs source. */ |
| @Override |
| public void setUp() throws Exception { |
|   super.setUp(); |
|   // message() prints times relative to this timestamp, and startNode passes it to each child: |
|   final long startNS = System.nanoTime(); |
|   Node.globalStartNS = startNS; |
|   this.childTempDir = createTempDir("child"); |
|   this.docs = new LineFileDocs(random()); |
| } |
| |
| /** Runs the superclass tear-down, then closes the shared docs source even when the superclass tear-down throws. */ |
| @Override |
| public void tearDown() throws Exception { |
|   try { |
|     super.tearDown(); |
|   } finally { |
|     // docs is still null if setUp failed before creating it: |
|     if (docs != null) { |
|       docs.close(); |
|     } |
|   } |
| } |
| |
| /** |
|  * Indexes 10 docs, deletes them all (one by one, or with a single delete-all), then indexes 10 docs again, checking after each |
|  * primary flush that the replica reaches the primary's version and the expected hit count. |
|  */ |
| public void testReplicateDeleteAllDocuments() throws Exception { |
| |
|   Path primaryPath = createTempDir("primary"); |
|   NodeProcess primary = startNode(-1, 0, primaryPath, -1, false); |
| |
|   Path replicaPath = createTempDir("replica"); |
|   NodeProcess replica = startNode(primary.tcpPort, 1, replicaPath, -1, false); |
| |
|   // Tell primary current replicas: |
|   sendReplicasToPrimary(primary, replica); |
| |
|   // try-with-resources so the indexing connection is closed even if an assertion below fails: |
|   try (Connection primaryC = new Connection(primary.tcpPort)) { |
|     // Index 10 docs into primary, drawing from the shared docs field (created in setUp, closed in tearDown): |
|     primaryC.out.writeByte(SimplePrimaryNode.CMD_INDEXING); |
|     for(int i=0;i<10;i++) { |
|       Document doc = docs.nextDoc(); |
|       primary.addOrUpdateDocument(primaryC, doc, false); |
|     } |
| |
|     // Nothing in replica index yet |
|     assertVersionAndHits(replica, 0, 0); |
| |
|     // Refresh primary, which also pushes to replica: |
|     long primaryVersion1 = primary.flush(0); |
|     assertTrue(primaryVersion1 > 0); |
| |
|     // Wait for replica to show the change |
|     waitForVersionAndHits(replica, primaryVersion1, 10); |
| |
|     // Delete all docs from primary |
|     if (random().nextBoolean()) { |
|       // Inefficiently: |
|       for(int id=0;id<10;id++) { |
|         primary.deleteDocument(primaryC, Integer.toString(id)); |
|       } |
|     } else { |
|       // Efficiently: |
|       primary.deleteAllDocuments(primaryC); |
|     } |
| |
|     // Replica still shows 10 docs: |
|     assertVersionAndHits(replica, primaryVersion1, 10); |
| |
|     // Refresh primary, which also pushes to replica: |
|     long primaryVersion2 = primary.flush(0); |
|     assertTrue(primaryVersion2 > primaryVersion1); |
| |
|     // Wait for replica to show the change |
|     waitForVersionAndHits(replica, primaryVersion2, 0); |
| |
|     // Index 10 docs again: |
|     for(int i=0;i<10;i++) { |
|       Document doc = docs.nextDoc(); |
|       primary.addOrUpdateDocument(primaryC, doc, false); |
|     } |
| |
|     // Refresh primary, which also pushes to replica: |
|     long primaryVersion3 = primary.flush(0); |
|     assertTrue(primaryVersion3 > primaryVersion2); |
| |
|     // Wait for replica to show the change |
|     waitForVersionAndHits(replica, primaryVersion3, 10); |
|   } |
| |
|   replica.close(); |
|   primary.close(); |
| } |
| |
| /** |
|  * Indexes two batches of 10 docs with a primary flush after each, force-merges the primary, flushes again, and checks that the |
|  * replica reaches the final version with all 20 docs. |
|  */ |
| public void testReplicateForceMerge() throws Exception { |
| |
|   Path primaryPath = createTempDir("primary"); |
|   NodeProcess primary = startNode(-1, 0, primaryPath, -1, false); |
| |
|   Path replicaPath = createTempDir("replica"); |
|   NodeProcess replica = startNode(primary.tcpPort, 1, replicaPath, -1, false); |
| |
|   sendReplicasToPrimary(primary, replica); |
| |
|   // try-with-resources so the indexing connection is closed even if an assertion below fails: |
|   try (Connection primaryC = new Connection(primary.tcpPort)) { |
|     // Index 10 docs into primary, drawing from the shared docs field (created in setUp, closed in tearDown): |
|     primaryC.out.writeByte(SimplePrimaryNode.CMD_INDEXING); |
|     for(int i=0;i<10;i++) { |
|       Document doc = docs.nextDoc(); |
|       primary.addOrUpdateDocument(primaryC, doc, false); |
|     } |
| |
|     // Refresh primary, which also pushes to replica: |
|     long primaryVersion1 = primary.flush(0); |
|     assertTrue(primaryVersion1 > 0); |
| |
|     // Index 10 more docs into primary: |
|     for(int i=0;i<10;i++) { |
|       Document doc = docs.nextDoc(); |
|       primary.addOrUpdateDocument(primaryC, doc, false); |
|     } |
| |
|     // Refresh primary, which also pushes to replica: |
|     long primaryVersion2 = primary.flush(0); |
|     assertTrue(primaryVersion2 > primaryVersion1); |
| |
|     primary.forceMerge(primaryC); |
| |
|     // Refresh primary, which also pushes to replica: |
|     long primaryVersion3 = primary.flush(0); |
|     assertTrue(primaryVersion3 > primaryVersion2); |
| |
|     // Wait for replica to show the change |
|     waitForVersionAndHits(replica, primaryVersion3, 20); |
|   } |
| |
|   replica.close(); |
|   primary.close(); |
| } |
| |
| /** Start up, index 10 docs, replicate, but crash and restart the replica without committing it. */ |
| public void testReplicaCrashNoCommit() throws Exception { |
| |
|   Path primaryPath = createTempDir("primary"); |
|   NodeProcess primary = startNode(-1, 0, primaryPath, -1, false); |
| |
|   Path replicaPath = createTempDir("replica"); |
|   NodeProcess replica = startNode(primary.tcpPort, 1, replicaPath, -1, true); |
| |
|   sendReplicasToPrimary(primary, replica); |
| |
|   // Index 10 docs into primary, drawing from the shared docs field (created in setUp, closed in tearDown): |
|   try (Connection c = new Connection(primary.tcpPort)) { |
|     c.out.writeByte(SimplePrimaryNode.CMD_INDEXING); |
|     for(int i=0;i<10;i++) { |
|       Document doc = docs.nextDoc(); |
|       primary.addOrUpdateDocument(c, doc, false); |
|     } |
|   } |
| |
|   // Refresh primary, which also pushes to replica: |
|   long primaryVersion1 = primary.flush(0); |
|   assertTrue(primaryVersion1 > 0); |
| |
|   // Wait for replica to sync up: |
|   waitForVersionAndHits(replica, primaryVersion1, 10); |
| |
|   // Crash replica: |
|   replica.crash(); |
| |
|   // Restart replica: |
|   replica = startNode(primary.tcpPort, 1, replicaPath, -1, false); |
| |
|   // On startup the replica searches the last commit (empty here): |
|   assertVersionAndHits(replica, 0, 0); |
| |
|   // Ask replica to sync: |
|   replica.newNRTPoint(primaryVersion1, 0, primary.tcpPort); |
|   waitForVersionAndHits(replica, primaryVersion1, 10); |
| |
|   replica.close(); |
|   primary.close(); |
| } |
| |
| /** Start up, index 10 docs, replicate, commit, crash and restart the replica. */ |
| public void testReplicaCrashWithCommit() throws Exception { |
| |
|   Path primaryPath = createTempDir("primary"); |
|   NodeProcess primary = startNode(-1, 0, primaryPath, -1, false); |
| |
|   Path replicaPath = createTempDir("replica"); |
|   NodeProcess replica = startNode(primary.tcpPort, 1, replicaPath, -1, true); |
| |
|   sendReplicasToPrimary(primary, replica); |
| |
|   // Index 10 docs into primary, drawing from the shared docs field (created in setUp, closed in tearDown): |
|   try (Connection c = new Connection(primary.tcpPort)) { |
|     c.out.writeByte(SimplePrimaryNode.CMD_INDEXING); |
|     for(int i=0;i<10;i++) { |
|       Document doc = docs.nextDoc(); |
|       primary.addOrUpdateDocument(c, doc, false); |
|     } |
|   } |
| |
|   // Refresh primary, which also pushes to replica: |
|   long primaryVersion1 = primary.flush(0); |
|   assertTrue(primaryVersion1 > 0); |
| |
|   // Wait for replica to sync up: |
|   waitForVersionAndHits(replica, primaryVersion1, 10); |
| |
|   // Commit and crash replica: |
|   replica.commit(); |
|   replica.crash(); |
| |
|   // Restart replica: |
|   replica = startNode(primary.tcpPort, 1, replicaPath, -1, false); |
| |
|   // On startup the replica searches the last commit: |
|   assertVersionAndHits(replica, primaryVersion1, 10); |
| |
|   replica.close(); |
|   primary.close(); |
| } |
| |
| // Start up, index 10 docs, replicate, commit, crash, index more docs, replicate, then restart the replica |
| public void testIndexingWhileReplicaIsDown() throws Exception { |
| |
| Path primaryPath = createTempDir("primary"); |
| NodeProcess primary = startNode(-1, 0, primaryPath, -1, false); |
| |
| Path replicaPath = createTempDir("replica"); |
| NodeProcess replica = startNode(primary.tcpPort, 1, replicaPath, -1, true); |
| |
| sendReplicasToPrimary(primary, replica); |
| |
| // Index 10 docs into primary: |
| LineFileDocs docs = new LineFileDocs(random()); |
| try (Connection c = new Connection(primary.tcpPort)) { |
| c.out.writeByte(SimplePrimaryNode.CMD_INDEXING); |
| for(int i=0;i<10;i++) { |
| Document doc = docs.nextDoc(); |
| primary.addOrUpdateDocument(c, doc, false); |
| } |
| } |
| |
| // Refresh primary, which also pushes to replica: |
| long primaryVersion1 = primary.flush(0); |
| assertTrue(primaryVersion1 > 0); |
| |
| // Wait for replica to sync up: |
| waitForVersionAndHits(replica, primaryVersion1, 10); |
| |
| // Commit and crash replica: |
| replica.commit(); |
| replica.crash(); |
| |
| sendReplicasToPrimary(primary); |
| |
| // Index 10 more docs, while replica is down |
| try (Connection c = new Connection(primary.tcpPort)) { |
| c.out.writeByte(SimplePrimaryNode.CMD_INDEXING); |
| for(int i=0;i<10;i++) { |
| Document doc = docs.nextDoc(); |
| primary.addOrUpdateDocument(c, doc, false); |
| } |
| } |
| |
| // And flush: |
| long primaryVersion2 = primary.flush(0); |
| assertTrue(primaryVersion2 > primaryVersion1); |
| |
| // Now restart replica: |
| replica = startNode(primary.tcpPort, 1, replicaPath, -1, false); |
| |
| sendReplicasToPrimary(primary, replica); |
| |
| // On startup the replica still searches its last commit: |
| assertVersionAndHits(replica, primaryVersion1, 10); |
| |
| // Now ask replica to sync: |
| replica.newNRTPoint(primaryVersion2, 0, primary.tcpPort); |
| |
| waitForVersionAndHits(replica, primaryVersion2, 20); |
| |
| replica.close(); |
| primary.close(); |
| } |
| |
| // Crash primary and promote a replica |
| public void testCrashPrimary1() throws Exception { |
| |
| Path path1 = createTempDir("1"); |
| NodeProcess primary = startNode(-1, 0, path1, -1, true); |
| |
| Path path2 = createTempDir("2"); |
| NodeProcess replica = startNode(primary.tcpPort, 1, path2, -1, true); |
| |
| sendReplicasToPrimary(primary, replica); |
| |
| // Index 10 docs into primary: |
| LineFileDocs docs = new LineFileDocs(random()); |
| try (Connection c = new Connection(primary.tcpPort)) { |
| c.out.writeByte(SimplePrimaryNode.CMD_INDEXING); |
| for(int i=0;i<10;i++) { |
| Document doc = docs.nextDoc(); |
| primary.addOrUpdateDocument(c, doc, false); |
| } |
| } |
| |
| // Refresh primary, which also pushes to replica: |
| long primaryVersion1 = primary.flush(0); |
| assertTrue(primaryVersion1 > 0); |
| |
| // Wait for replica to sync up: |
| waitForVersionAndHits(replica, primaryVersion1, 10); |
| |
| // Crash primary: |
| primary.crash(); |
| |
| // Promote replica: |
| replica.commit(); |
| replica.close(); |
| |
| primary = startNode(-1, 1, path2, -1, false); |
| |
| // Should still see 10 docs: |
| assertVersionAndHits(primary, primaryVersion1, 10); |
| |
| primary.close(); |
| } |
| |
| // Crash primary and then restart it |
| public void testCrashPrimary2() throws Exception { |
| |
| Path path1 = createTempDir("1"); |
| NodeProcess primary = startNode(-1, 0, path1, -1, true); |
| |
| Path path2 = createTempDir("2"); |
| NodeProcess replica = startNode(primary.tcpPort, 1, path2, -1, true); |
| |
| sendReplicasToPrimary(primary, replica); |
| |
| // Index 10 docs into primary: |
| LineFileDocs docs = new LineFileDocs(random()); |
| try (Connection c = new Connection(primary.tcpPort)) { |
| c.out.writeByte(SimplePrimaryNode.CMD_INDEXING); |
| for(int i=0;i<10;i++) { |
| Document doc = docs.nextDoc(); |
| primary.addOrUpdateDocument(c, doc, false); |
| } |
| } |
| |
| // Refresh primary, which also pushes to replica: |
| long primaryVersion1 = primary.flush(0); |
| assertTrue(primaryVersion1 > 0); |
| |
| // Wait for replica to sync up: |
| waitForVersionAndHits(replica, primaryVersion1, 10); |
| |
| primary.commit(); |
| |
| // Index 10 docs, but crash before replicating or committing: |
| try (Connection c = new Connection(primary.tcpPort)) { |
| c.out.writeByte(SimplePrimaryNode.CMD_INDEXING); |
| for(int i=0;i<10;i++) { |
| Document doc = docs.nextDoc(); |
| primary.addOrUpdateDocument(c, doc, false); |
| } |
| } |
| |
| // Crash primary: |
| primary.crash(); |
| |
| // Restart it: |
| primary = startNode(-1, 0, path1, -1, true); |
| |
| sendReplicasToPrimary(primary, replica); |
| |
| // Index 10 more docs |
| try (Connection c = new Connection(primary.tcpPort)) { |
| c.out.writeByte(SimplePrimaryNode.CMD_INDEXING); |
| for(int i=0;i<10;i++) { |
| Document doc = docs.nextDoc(); |
| primary.addOrUpdateDocument(c, doc, false); |
| } |
| } |
| |
| long primaryVersion2 = primary.flush(0); |
| assertTrue(primaryVersion2 > primaryVersion1); |
| |
| // Wait for replica to sync up: |
| waitForVersionAndHits(replica, primaryVersion2, 20); |
| |
| primary.close(); |
| replica.close(); |
| } |
| |
| /** |
|  * Crashes and restarts the primary while the replica node is down, then brings the replica back up and verifies it properly |
|  * "unforks" itself, i.e. ends up searching the restarted primary's version and docs. |
|  */ |
| public void testCrashPrimary3() throws Exception { |
| |
|   final Path primaryDir = createTempDir("1"); |
|   NodeProcess primary = startNode(-1, 0, primaryDir, -1, true); |
| |
|   final Path replicaDir = createTempDir("2"); |
|   NodeProcess replica = startNode(primary.tcpPort, 1, replicaDir, -1, true); |
| |
|   sendReplicasToPrimary(primary, replica); |
| |
|   // First batch: 10 docs into the primary |
|   try (Connection conn = new Connection(primary.tcpPort)) { |
|     conn.out.writeByte(SimplePrimaryNode.CMD_INDEXING); |
|     for (int remaining = 10; remaining > 0; remaining--) { |
|       primary.addOrUpdateDocument(conn, docs.nextDoc(), false); |
|     } |
|   } |
| |
|   // Flushing the primary also pushes the new point to the replica |
|   final long firstVersion = primary.flush(0); |
|   assertTrue(firstVersion > 0); |
| |
|   // Block until the replica has caught up |
|   waitForVersionAndHits(replica, firstVersion, 10); |
| |
|   // Commit and cleanly close the replica, then crash the primary |
|   replica.commit(); |
|   replica.close(); |
|   primary.crash(); |
| |
|   // The replica is now "in the future": it committed 10 docs, while the primary crashed before committing and so has 0 docs |
| |
|   // Bring the primary back |
|   primary = startNode(-1, 0, primaryDir, -1, true); |
| |
|   // Second batch: 20 docs into the restarted primary |
|   try (Connection conn = new Connection(primary.tcpPort)) { |
|     conn.out.writeByte(SimplePrimaryNode.CMD_INDEXING); |
|     for (int remaining = 20; remaining > 0; remaining--) { |
|       primary.addOrUpdateDocument(conn, docs.nextDoc(), false); |
|     } |
|   } |
| |
|   // Flush the primary; no replica is registered at this point, so nothing is synced |
|   final long secondVersion = primary.flush(0); |
| |
|   // Restart the replica; on init it should notice it is on a "lost branch", because the 10 docs it committed came from a |
|   // different primary node |
|   replica = startNode(primary.tcpPort, 1, replicaDir, -1, true); |
| |
|   assertVersionAndHits(replica, secondVersion, 20); |
| |
|   primary.close(); |
|   replica.close(); |
| } |
| |
| public void testCrashPrimaryWhileCopying() throws Exception { |
| |
| Path path1 = createTempDir("1"); |
| NodeProcess primary = startNode(-1, 0, path1, -1, true); |
| |
| Path path2 = createTempDir("2"); |
| NodeProcess replica = startNode(primary.tcpPort, 1, path2, -1, true); |
| |
| sendReplicasToPrimary(primary, replica); |
| |
| // Index 100 docs into primary: |
| LineFileDocs docs = new LineFileDocs(random()); |
| try (Connection c = new Connection(primary.tcpPort)) { |
| c.out.writeByte(SimplePrimaryNode.CMD_INDEXING); |
| for(int i=0;i<100;i++) { |
| Document doc = docs.nextDoc(); |
| primary.addOrUpdateDocument(c, doc, false); |
| } |
| } |
| |
| // Refresh primary, which also pushes (async) to replica: |
| long primaryVersion1 = primary.flush(0); |
| assertTrue(primaryVersion1 > 0); |
| |
| Thread.sleep(TestUtil.nextInt(random(), 1, 30)); |
| |
| // Crash primary, likely/hopefully while replica is still copying |
| primary.crash(); |
| |
| // Could see either 100 docs (replica finished before crash) or 0 docs: |
| try (Connection c = new Connection(replica.tcpPort)) { |
| c.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL); |
| c.flush(); |
| long version = c.in.readVLong(); |
| int hitCount = c.in.readVInt(); |
| if (version == 0) { |
| assertEquals(0, hitCount); |
| } else { |
| assertEquals(primaryVersion1, version); |
| assertEquals(100, hitCount); |
| } |
| } |
| |
| primary.close(); |
| replica.close(); |
| } |
| |
| public void testCrashReplica() throws Exception { |
| |
| Path path1 = createTempDir("1"); |
| NodeProcess primary = startNode(-1, 0, path1, -1, true); |
| |
| Path path2 = createTempDir("2"); |
| NodeProcess replica = startNode(primary.tcpPort, 1, path2, -1, true); |
| |
| sendReplicasToPrimary(primary, replica); |
| |
| // Index 10 docs into primary: |
| LineFileDocs docs = new LineFileDocs(random()); |
| try (Connection c = new Connection(primary.tcpPort)) { |
| c.out.writeByte(SimplePrimaryNode.CMD_INDEXING); |
| for(int i=0;i<10;i++) { |
| Document doc = docs.nextDoc(); |
| primary.addOrUpdateDocument(c, doc, false); |
| } |
| } |
| |
| // Refresh primary, which also pushes to replica: |
| long primaryVersion1 = primary.flush(0); |
| assertTrue(primaryVersion1 > 0); |
| |
| // Wait for replica to sync up: |
| waitForVersionAndHits(replica, primaryVersion1, 10); |
| |
| // Crash replica |
| replica.crash(); |
| |
| sendReplicasToPrimary(primary); |
| |
| // Lots of new flushes while replica is down: |
| long primaryVersion2 = 0; |
| for(int iter=0;iter<10;iter++) { |
| // Index 10 docs into primary: |
| try (Connection c = new Connection(primary.tcpPort)) { |
| c.out.writeByte(SimplePrimaryNode.CMD_INDEXING); |
| for(int i=0;i<10;i++) { |
| Document doc = docs.nextDoc(); |
| primary.addOrUpdateDocument(c, doc, false); |
| } |
| } |
| primaryVersion2 = primary.flush(0); |
| } |
| |
| // Start up replica again: |
| replica = startNode(primary.tcpPort, 1, path2, -1, true); |
| |
| sendReplicasToPrimary(primary, replica); |
| |
| // Now ask replica to sync: |
| replica.newNRTPoint(primaryVersion2, 0, primary.tcpPort); |
| |
| // Make sure it sees all docs that were indexed while it was down: |
| assertVersionAndHits(primary, primaryVersion2, 110); |
| |
| replica.close(); |
| primary.close(); |
| } |
| |
| public void testFullClusterCrash() throws Exception { |
| |
| Path path1 = createTempDir("1"); |
| NodeProcess primary = startNode(-1, 0, path1, -1, true); |
| |
| Path path2 = createTempDir("2"); |
| NodeProcess replica1 = startNode(primary.tcpPort, 1, path2, -1, true); |
| |
| Path path3 = createTempDir("3"); |
| NodeProcess replica2 = startNode(primary.tcpPort, 2, path3, -1, true); |
| |
| sendReplicasToPrimary(primary, replica1, replica2); |
| |
| // Index 50 docs into primary: |
| LineFileDocs docs = new LineFileDocs(random()); |
| long primaryVersion1 = 0; |
| for (int iter=0;iter<5;iter++) { |
| try (Connection c = new Connection(primary.tcpPort)) { |
| c.out.writeByte(SimplePrimaryNode.CMD_INDEXING); |
| for(int i=0;i<10;i++) { |
| Document doc = docs.nextDoc(); |
| primary.addOrUpdateDocument(c, doc, false); |
| } |
| } |
| |
| // Refresh primary, which also pushes to replicas: |
| primaryVersion1 = primary.flush(0); |
| assertTrue(primaryVersion1 > 0); |
| } |
| |
| // Wait for replicas to sync up: |
| waitForVersionAndHits(replica1, primaryVersion1, 50); |
| waitForVersionAndHits(replica2, primaryVersion1, 50); |
| |
| primary.commit(); |
| replica1.commit(); |
| replica2.commit(); |
| |
| // Index 10 more docs, but don't sync to replicas: |
| try (Connection c = new Connection(primary.tcpPort)) { |
| c.out.writeByte(SimplePrimaryNode.CMD_INDEXING); |
| for(int i=0;i<10;i++) { |
| Document doc = docs.nextDoc(); |
| primary.addOrUpdateDocument(c, doc, false); |
| } |
| } |
| |
| // Full cluster crash |
| primary.crash(); |
| replica1.crash(); |
| replica2.crash(); |
| |
| // Full cluster restart |
| primary = startNode(-1, 0, path1, -1, true); |
| replica1 = startNode(primary.tcpPort, 1, path2, -1, true); |
| replica2 = startNode(primary.tcpPort, 2, path3, -1, true); |
| |
| // Only 50 because we didn't commit primary before the crash: |
| |
| // It's -1 because it's unpredictable how IW changes segments version on init: |
| assertVersionAndHits(primary, -1, 50); |
| assertVersionAndHits(replica1, primary.initInfosVersion, 50); |
| assertVersionAndHits(replica2, primary.initInfosVersion, 50); |
| |
| primary.close(); |
| replica1.close(); |
| replica2.close(); |
| } |
| |
| /** |
|  * Sends the primary its current set of replicas: a count followed by each replica's id and TCP port, then waits for a |
|  * one-byte reply. Passing no replicas sends a count of zero. |
|  */ |
| private void sendReplicasToPrimary(NodeProcess primary, NodeProcess... replicas) throws IOException { |
|   try (Connection conn = new Connection(primary.tcpPort)) { |
|     conn.out.writeByte(SimplePrimaryNode.CMD_SET_REPLICAS); |
|     conn.out.writeVInt(replicas.length); |
|     for (NodeProcess each : replicas) { |
|       conn.out.writeVInt(each.id); |
|       conn.out.writeVInt(each.tcpPort); |
|     } |
|     conn.flush(); |
|     // Read (and discard) the single reply byte: |
|     conn.in.readByte(); |
|   } |
| } |
| |
| /** Verifies this node is currently searching the specified version with the specified total hit count, or that it eventually does when |
| * keepTrying is true. */ |
| private void assertVersionAndHits(NodeProcess node, long expectedVersion, int expectedHitCount) throws Exception { |
| try (Connection c = new Connection(node.tcpPort)) { |
| c.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL); |
| c.flush(); |
| long version = c.in.readVLong(); |
| int hitCount = c.in.readVInt(); |
| if (expectedVersion != -1) { |
| assertEquals("wrong searcher version, with hitCount=" + hitCount, expectedVersion, version); |
| } |
| assertEquals(expectedHitCount, hitCount); |
| } |
| } |
| |
| private void waitForVersionAndHits(NodeProcess node, long expectedVersion, int expectedHitCount) throws Exception { |
| try (Connection c = new Connection(node.tcpPort)) { |
| while (true) { |
| c.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL); |
| c.flush(); |
| long version = c.in.readVLong(); |
| int hitCount = c.in.readVInt(); |
| |
| if (version == expectedVersion) { |
| assertEquals(expectedHitCount, hitCount); |
| break; |
| } |
| |
| assertTrue(version < expectedVersion); |
| Thread.sleep(10); |
| } |
| } |
| } |
| |
| static void message(String message) { |
| long now = System.nanoTime(); |
| System.out.println(String.format(Locale.ROOT, |
| "%5.3fs : parent [%11s] %s", |
| (now-Node.globalStartNS)/1000000000., |
| Thread.currentThread().getName(), |
| message)); |
| } |
| } |