| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.solr.search; |
| |
| import java.io.IOException; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayDeque; |
| import java.util.Arrays; |
| import java.util.Deque; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| |
| import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.lucene.util.QuickPatchThreadsFilter; |
| import org.apache.solr.SolrIgnoredThreadsFilter; |
| import org.apache.solr.SolrTestCaseJ4; |
| import org.apache.solr.cloud.hdfs.HdfsTestUtil; |
| import org.apache.solr.common.util.IOUtils; |
| import org.apache.solr.common.util.TimeSource; |
| import org.apache.solr.common.util.Utils; |
| import org.apache.solr.request.SolrQueryRequest; |
| import org.apache.solr.update.HdfsUpdateLog; |
| import org.apache.solr.update.UpdateHandler; |
| import org.apache.solr.update.UpdateLog; |
| import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase; |
| import org.apache.solr.util.BadHdfsThreadsFilter; |
| import org.apache.solr.util.TestInjection; |
| import org.apache.solr.util.TimeOut; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| |
| import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; |
| |
| @ThreadLeakFilters(defaultFilters = true, filters = { |
| SolrIgnoredThreadsFilter.class, |
| QuickPatchThreadsFilter.class, |
| BadHdfsThreadsFilter.class // hdfs currently leaks thread(s) |
| }) |
| // TODO: longer term this should be combined with TestRecovery somehow ?? |
| public class TestRecoveryHdfs extends SolrTestCaseJ4 { |
  // means that we've seen the leader and have version info (i.e. we are a non-leader replica)
  private static final String FROM_LEADER = DistribPhase.FROMLEADER.toString();

  // acquire timeout in seconds. change this to a huge number when debugging to prevent threads from advancing.
  private static final int TIMEOUT = 60;

  // shared HDFS mini-cluster for the whole class; created in beforeClass(), torn down in afterClass()
  private static MiniDFSCluster dfsCluster;
  // base hdfs:// URI of the mini-cluster; also used to point solr.ulog.dir at HDFS
  private static String hdfsUri;
  // client FileSystem handle used by tests to inspect and corrupt tlog files directly
  private static FileSystem fs;
| |
  @After
  public void afterTest() {
    // clear any injection flags (e.g. skipIndexWriterCommitOnClose) a test may have set
    TestInjection.reset(); // do after every test, don't wait for AfterClass
  }
| |
| @BeforeClass |
| public static void beforeClass() throws Exception { |
| dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath()); |
| hdfsUri = HdfsTestUtil.getURI(dfsCluster); |
| |
| try { |
| URI uri = new URI(hdfsUri); |
| Configuration conf = HdfsTestUtil.getClientConfiguration(dfsCluster); |
| fs = FileSystem.get(uri, conf); |
| } catch (IOException | URISyntaxException e) { |
| throw new RuntimeException(e); |
| } |
| |
| System.setProperty("solr.ulog.dir", hdfsUri + "/solr/shard1"); |
| |
| initCore("solrconfig-tlog.xml","schema15.xml"); |
| } |
| |
  /**
   * Tears down class-level state in the reverse order of setup. The nested
   * try/finally blocks guarantee that each cleanup step runs even if an
   * earlier one throws, and that the system properties are always cleared.
   */
  @AfterClass
  public static void afterClass() throws Exception {
    IOUtils.closeQuietly(fs);
    fs = null;
    try {
      deleteCore();
    } finally {
      try {
        HdfsTestUtil.teardownClass(dfsCluster);
      } finally {
        dfsCluster = null;
        hdfsUri = null;
        System.clearProperty("solr.ulog.dir");
        System.clearProperty("test.build.data");
        System.clearProperty("test.cache.data");
      }
    }
  }
| |
| @Test |
| public void testReplicationFactor() throws Exception { |
| clearIndex(); |
| |
| HdfsUpdateLog ulog = (HdfsUpdateLog) h.getCore().getUpdateHandler().getUpdateLog(); |
| |
| assertU(commit()); |
| addAndGetVersion(sdoc("id", "REP1"), null); |
| assertU(commit()); |
| |
| String[] logList = ulog.getLogList(new Path(ulog.getLogDir())); |
| boolean foundRep2 = false; |
| for (String tl : logList) { |
| FileStatus status = fs.getFileStatus(new Path(ulog.getLogDir(), tl)); |
| if (status.getReplication() == 2) { |
| foundRep2 = true; |
| break; |
| } |
| } |
| |
| assertTrue("Expected to find tlogs with a replication factor of 2", foundRep2); |
| } |
| |
  /**
   * Verifies tlog replay after an "unclean" shutdown: with index-writer commit
   * on close disabled, updates exist only in the tlog; on core reload they must
   * be replayed, versions must remain visible before/during/after replay, and a
   * reload with no new updates must NOT trigger another replay.
   */
  @Test
  public void testLogReplay() throws Exception {
    try {
      TestInjection.skipIndexWriterCommitOnClose = true;
      // replay hook blocks on this semaphore, letting the test control when replay advances
      final Semaphore logReplay = new Semaphore(0);
      final Semaphore logReplayFinish = new Semaphore(0);

      UpdateLog.testing_logReplayHook = () -> {
        try {
          assertTrue(logReplay.tryAcquire(TIMEOUT, TimeUnit.SECONDS));
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      };

      UpdateLog.testing_logReplayFinishHook = logReplayFinish::release;

      clearIndex();
      assertU(commit());

      // most recent version first (addFirst), matching the order getVersions returns
      Deque<Long> versions = new ArrayDeque<>();
      versions.addFirst(addAndGetVersion(sdoc("id", "A1"), null));
      versions.addFirst(addAndGetVersion(sdoc("id", "A11"), null));
      versions.addFirst(addAndGetVersion(sdoc("id", "A12"), null));
      versions.addFirst(deleteByQueryAndGetVersion("id:A11", null));
      versions.addFirst(addAndGetVersion(sdoc("id", "A13"), null));

      // nothing committed yet, so the index is still empty
      assertJQ(req("q","*:*"),"/response/numFound==0");

      assertJQ(req("qt","/get", "getVersions",""+versions.size()) ,"/versions==" + versions);

      h.close();
      createCore();
      // Solr should kick this off now
      // h.getCore().getUpdateHandler().getUpdateLog().recoverFromLog();

      // verify that previous close didn't do a commit
      // recovery should be blocked by our hook
      assertJQ(req("q","*:*") ,"/response/numFound==0");

      // make sure we can still access versions after a restart
      assertJQ(req("qt","/get", "getVersions",""+versions.size()),"/versions==" + versions);

      // unblock recovery
      logReplay.release(1000);

      // make sure we can still access versions during recovery
      assertJQ(req("qt","/get", "getVersions",""+versions.size()),"/versions==" + versions);

      // wait until recovery has finished
      assertTrue(logReplayFinish.tryAcquire(TIMEOUT, TimeUnit.SECONDS));

      // A1, A12, A13 survive (A11 was deleted by query before the restart)
      assertJQ(req("q","*:*") ,"/response/numFound==3");

      // make sure we can still access versions after recovery
      assertJQ(req("qt","/get", "getVersions",""+versions.size()) ,"/versions==" + versions);

      assertU(adoc("id","A2"));
      assertU(adoc("id","A3"));
      assertU(delI("A2"));
      assertU(adoc("id","A4"));

      assertJQ(req("q","*:*") ,"/response/numFound==3");

      h.close();
      createCore();
      // Solr should kick this off now
      // h.getCore().getUpdateHandler().getUpdateLog().recoverFromLog();

      // wait until recovery has finished
      assertTrue(logReplayFinish.tryAcquire(TIMEOUT, TimeUnit.SECONDS));
      assertJQ(req("q","*:*") ,"/response/numFound==5");
      assertJQ(req("q","id:A2") ,"/response/numFound==0");

      // no updates, so insure that recovery does not run
      h.close();
      int permits = logReplay.availablePermits();
      createCore();
      // Solr should kick this off now
      // h.getCore().getUpdateHandler().getUpdateLog().recoverFromLog();

      assertJQ(req("q","*:*") ,"/response/numFound==5");
      Thread.sleep(100);
      assertEquals(permits, logReplay.availablePermits()); // no updates, so insure that recovery didn't run

      assertEquals(UpdateLog.State.ACTIVE, h.getCore().getUpdateHandler().getUpdateLog().getState());

    } finally {
      UpdateLog.testing_logReplayHook = null;
      UpdateLog.testing_logReplayFinishHook = null;
    }
  }
| |
  /**
   * Exercises update-log buffering: updates received while BUFFERING are logged
   * but not applied/visible; applyBufferedUpdates() replays them; reordered
   * (lower-version) updates arriving during buffering or replay must be dropped;
   * a buffered delete-by-query must still remove buffered adds it covers.
   */
  @Test
  public void testBuffering() throws Exception {
    TestInjection.skipIndexWriterCommitOnClose = true;
    // replay hook blocks on this semaphore so the test can single-step replay
    final Semaphore logReplay = new Semaphore(0);
    final Semaphore logReplayFinish = new Semaphore(0);

    UpdateLog.testing_logReplayHook = () -> {
      try {
        assertTrue(logReplay.tryAcquire(TIMEOUT, TimeUnit.SECONDS));
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    };

    UpdateLog.testing_logReplayFinishHook = logReplayFinish::release;

    SolrQueryRequest req = req();
    UpdateHandler uhandler = req.getCore().getUpdateHandler();
    UpdateLog ulog = uhandler.getUpdateLog();

    try {
      clearIndex();
      assertU(commit());

      assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
      ulog.bufferUpdates();
      assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
      // applying with nothing buffered is a no-op and returns null
      Future<UpdateLog.RecoveryInfo> rinfoFuture = ulog.applyBufferedUpdates();
      assertNull(rinfoFuture);
      assertEquals(UpdateLog.State.ACTIVE, ulog.getState());

      ulog.bufferUpdates();
      assertEquals(UpdateLog.State.BUFFERING, ulog.getState());

      // simulate updates from a leader
      updateJ(jsonAdd(sdoc("id","B1", "_version_","1010")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
      updateJ(jsonAdd(sdoc("id","B11", "_version_","1015")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
      updateJ(jsonDelQ("id:B1 id:B11 id:B2 id:B3"), params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-1017"));
      updateJ(jsonAdd(sdoc("id","B2", "_version_","1020")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
      updateJ(jsonAdd(sdoc("id","B3", "_version_","1030")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
      deleteAndGetVersion("B1", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-2010"));

      // versions are tracked even while buffering (newest first; deletes are negative)
      assertJQ(req("qt","/get", "getVersions","6")
          ,"=={'versions':[-2010,1030,1020,-1017,1015,1010]}"
      );

      assertU(commit());

      assertJQ(req("qt","/get", "getVersions","6")
          ,"=={'versions':[-2010,1030,1020,-1017,1015,1010]}"
      );

      // updates should be buffered, so we should not see any results yet.
      assertJQ(req("q", "*:*")
          , "/response/numFound==0"
      );

      // real-time get should also not show anything (this could change in the future,
      // but it's currently used for validating version numbers too, so it would
      // be bad for updates to be visible if we're just buffering.
      assertJQ(req("qt","/get", "id","B3")
          ,"=={'doc':null}"
      );


      rinfoFuture = ulog.applyBufferedUpdates();
      assertNotNull(rinfoFuture);

      assertEquals(UpdateLog.State.APPLYING_BUFFERED, ulog.getState());

      logReplay.release(1000);

      UpdateLog.RecoveryInfo rinfo = rinfoFuture.get();
      assertEquals(UpdateLog.State.ACTIVE, ulog.getState());


      assertJQ(req("qt","/get", "getVersions","6")
          ,"=={'versions':[-2010,1030,1020,-1017,1015,1010]}"
      );


      // B2 and B3 survive: the DBQ (-1017) removed B1/B11, and B1's re-delete (-2010) was applied
      assertJQ(req("q", "*:*")
          , "/response/numFound==2"
      );

      // move back to recovering
      ulog.bufferUpdates();
      assertEquals(UpdateLog.State.BUFFERING, ulog.getState());

      Long ver = getVer(req("qt","/get", "id","B3"));
      assertEquals(1030L, ver.longValue());

      // add a reordered doc that shouldn't overwrite one in the index
      updateJ(jsonAdd(sdoc("id","B3", "_version_","3")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));

      // reorder two buffered updates
      updateJ(jsonAdd(sdoc("id","B4", "_version_","1040")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
      deleteAndGetVersion("B4", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-940")); // this update should not take affect
      updateJ(jsonAdd(sdoc("id","B6", "_version_","1060")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
      updateJ(jsonAdd(sdoc("id","B5", "_version_","1050")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
      updateJ(jsonAdd(sdoc("id","B8", "_version_","1080")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));

      // test that delete by query is at least buffered along with everything else so it will delete the
      // currently buffered id:8 (even if it doesn't currently support versioning)
      updateJ("{\"delete\": { \"query\":\"id:B2 OR id:B8\" }}", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-3000"));

      assertJQ(req("qt","/get", "getVersions","13")
          ,"=={'versions':[-3000,1080,1050,1060,-940,1040,3,-2010,1030,1020,-1017,1015,1010]}"  // the "3" appears because versions aren't checked while buffering
      );

      logReplay.drainPermits();
      rinfoFuture = ulog.applyBufferedUpdates();
      assertNotNull(rinfoFuture);
      assertEquals(UpdateLog.State.APPLYING_BUFFERED, ulog.getState());

      // apply a single update
      logReplay.release(1);

      // now add another update
      updateJ(jsonAdd(sdoc("id","B7", "_version_","1070")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));

      // a reordered update that should be dropped
      deleteAndGetVersion("B5", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-950"));

      deleteAndGetVersion("B6", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-2060"));

      logReplay.release(1000);
      UpdateLog.RecoveryInfo recInfo = rinfoFuture.get();

      assertJQ(req("q", "*:*", "sort","id asc", "fl","id,_version_")
          , "/response/docs==["
          + "{'id':'B3','_version_':1030}"
          + ",{'id':'B4','_version_':1040}"
          + ",{'id':'B5','_version_':1050}"
          + ",{'id':'B7','_version_':1070}"
          +"]"
      );

      assertEquals(1, recInfo.deleteByQuery);

      assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
    } finally {
      UpdateLog.testing_logReplayHook = null;
      UpdateLog.testing_logReplayFinishHook = null;

      req().close();
    }
  }
| |
  /**
   * Exercises dropBufferedUpdates(): buffered updates that are dropped must
   * never reach the index, whether buffering started at tlog position 0 or
   * mid-log, and a subsequent buffer/apply cycle must work normally.
   * Ignored on HDFS because dropping requires file truncate support (HDFS-3107).
   */
  @Test
  @Ignore("HDFS-3107: no truncate support yet")
  public void testDropBuffered() throws Exception {
    TestInjection.skipIndexWriterCommitOnClose = true;
    // replay hook blocks on this semaphore so the test controls replay pacing
    final Semaphore logReplay = new Semaphore(0);
    final Semaphore logReplayFinish = new Semaphore(0);

    UpdateLog.testing_logReplayHook = () -> {
      try {
        assertTrue(logReplay.tryAcquire(TIMEOUT, TimeUnit.SECONDS));
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    };

    UpdateLog.testing_logReplayFinishHook = logReplayFinish::release;


    SolrQueryRequest req = req();
    UpdateHandler uhandler = req.getCore().getUpdateHandler();
    UpdateLog ulog = uhandler.getUpdateLog();

    try {
      clearIndex();
      assertU(commit());

      assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
      ulog.bufferUpdates();
      assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
      // applying with nothing buffered is a no-op and returns null
      Future<UpdateLog.RecoveryInfo> rinfoFuture = ulog.applyBufferedUpdates();
      assertNull(rinfoFuture);
      assertEquals(UpdateLog.State.ACTIVE, ulog.getState());

      ulog.bufferUpdates();
      assertEquals(UpdateLog.State.BUFFERING, ulog.getState());

      // simulate updates from a leader
      updateJ(jsonAdd(sdoc("id","C1", "_version_","101")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
      updateJ(jsonAdd(sdoc("id","C2", "_version_","102")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
      updateJ(jsonAdd(sdoc("id","C3", "_version_","103")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));

      // drop C1-C3; only C4/C5 buffered after this should be applied
      assertTrue(ulog.dropBufferedUpdates());
      ulog.bufferUpdates();
      updateJ(jsonAdd(sdoc("id", "C4", "_version_","104")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
      updateJ(jsonAdd(sdoc("id", "C5", "_version_","105")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));

      logReplay.release(1000);
      rinfoFuture = ulog.applyBufferedUpdates();
      UpdateLog.RecoveryInfo rinfo = rinfoFuture.get();
      assertEquals(2, rinfo.adds);

      assertJQ(req("qt","/get", "getVersions","2")
          ,"=={'versions':[105,104]}"
      );

      // this time add some docs first before buffering starts (so tlog won't be at pos 0)
      updateJ(jsonAdd(sdoc("id","C100", "_version_","200")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
      updateJ(jsonAdd(sdoc("id","C101", "_version_","201")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));

      ulog.bufferUpdates();
      updateJ(jsonAdd(sdoc("id","C103", "_version_","203")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
      updateJ(jsonAdd(sdoc("id","C104", "_version_","204")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));

      // drop C103/C104 (mid-log truncate); buffer and apply C105/C106 instead
      assertTrue(ulog.dropBufferedUpdates());
      ulog.bufferUpdates();
      updateJ(jsonAdd(sdoc("id","C105", "_version_","205")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
      updateJ(jsonAdd(sdoc("id","C106", "_version_","206")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));

      rinfoFuture = ulog.applyBufferedUpdates();
      rinfo = rinfoFuture.get();
      assertEquals(2, rinfo.adds);

      assertJQ(req("q", "*:*", "sort","_version_ asc", "fl","id,_version_")
          , "/response/docs==["
          + "{'id':'C4','_version_':104}"
          + ",{'id':'C5','_version_':105}"
          + ",{'id':'C100','_version_':200}"
          + ",{'id':'C101','_version_':201}"
          + ",{'id':'C105','_version_':205}"
          + ",{'id':'C106','_version_':206}"
          +"]"
      );

      assertJQ(req("qt","/get", "getVersions","6")
          ,"=={'versions':[206,205,201,200,105,104]}"
      );

      ulog.bufferUpdates();
      assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
      updateJ(jsonAdd(sdoc("id","C301", "_version_","998")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
      updateJ(jsonAdd(sdoc("id","C302", "_version_","999")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
      assertTrue(ulog.dropBufferedUpdates());

      // make sure we can overwrite with a lower version
      // TODO: is this functionality needed?
      updateJ(jsonAdd(sdoc("id","C301", "_version_","301")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
      updateJ(jsonAdd(sdoc("id","C302", "_version_","302")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));

      assertU(commit());

      assertJQ(req("qt","/get", "getVersions","2")
          ,"=={'versions':[302,301]}"
      );

      assertJQ(req("q", "*:*", "sort","_version_ desc", "fl","id,_version_", "rows","2")
          , "/response/docs==["
          + "{'id':'C302','_version_':302}"
          + ",{'id':'C301','_version_':301}"
          +"]"
      );

      updateJ(jsonAdd(sdoc("id","C2", "_version_","302")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));

      assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
    } finally {
      UpdateLog.testing_logReplayHook = null;
      UpdateLog.testing_logReplayFinishHook = null;

      req().close();
    }
  }
| |
  /**
   * Verifies existOldBufferLog(): if the core dies while BUFFERING, the stale
   * buffer tlog is detected (and kept) across restarts, and is only cleared
   * after a normal buffer/apply cycle plus a subsequent regular update.
   */
  @Test
  public void testExistOldBufferLog() throws Exception {
    TestInjection.skipIndexWriterCommitOnClose = true;

    SolrQueryRequest req = req();
    UpdateHandler uhandler = req.getCore().getUpdateHandler();
    UpdateLog ulog = uhandler.getUpdateLog();

    try {
      clearIndex();
      assertU(commit());

      assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
      ulog.bufferUpdates();

      // simulate updates from a leader
      updateJ(jsonAdd(sdoc("id","Q1", "_version_","101")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
      updateJ(jsonAdd(sdoc("id","Q2", "_version_","102")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
      updateJ(jsonAdd(sdoc("id","Q3", "_version_","103")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
      assertEquals(UpdateLog.State.BUFFERING, ulog.getState());

      // "die" while still buffering, then restart
      req.close();
      h.close();
      createCore();

      // re-fetch handles: the old ones belong to the closed core
      req = req();
      uhandler = req.getCore().getUpdateHandler();
      ulog = uhandler.getUpdateLog();

      // the core no longer replay updates from buffer tlog on startup
      assertTrue(ulog.existOldBufferLog()); // since we died while buffering, we should see this last

      // buffer tlog won't be removed on restart
      req.close();
      h.close();
      createCore();

      req = req();
      uhandler = req.getCore().getUpdateHandler();
      ulog = uhandler.getUpdateLog();

      assertTrue(ulog.existOldBufferLog());

      ulog.bufferUpdates();
      // simulate receiving no updates
      ulog.applyBufferedUpdates();
      updateJ(jsonAdd(sdoc("id","Q7", "_version_","117")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); // do another add to make sure flags are back to normal

      req.close();
      h.close();
      createCore();

      req = req();
      uhandler = req.getCore().getUpdateHandler();
      ulog = uhandler.getUpdateLog();

      assertFalse(ulog.existOldBufferLog());
      // Timeout for Q7 get replayed, because it was added on tlog, therefore it will be replayed on restart
      TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
      timeout.waitFor("Timeout waiting for finish replay updates",
          () -> h.getCore().getUpdateHandler().getUpdateLog().getState() == UpdateLog.State.ACTIVE);
      assertJQ(req("qt","/get", "id", "Q7") ,"/doc/id==Q7");
    } finally {
      UpdateLog.testing_logReplayHook = null;
      UpdateLog.testing_logReplayFinishHook = null;

      req().close();
    }
  }
| |
| // make sure that on a restart, versions don't start too low |
| @Test |
| public void testVersionsOnRestart() throws Exception { |
| clearIndex(); |
| assertU(commit()); |
| |
| assertU(adoc("id","D1", "val_i","1")); |
| assertU(adoc("id","D2", "val_i","1")); |
| assertU(commit()); |
| long v1 = getVer(req("q","id:D1")); |
| long v1a = getVer(req("q","id:D2")); |
| |
| h.close(); |
| createCore(); |
| |
| assertU(adoc("id","D1", "val_i","2")); |
| assertU(commit()); |
| long v2 = getVer(req("q","id:D1")); |
| |
| assert(v2 > v1); |
| |
| assertJQ(req("qt","/get", "getVersions","2") |
| ,"/versions==[" + v2 + "," + v1a + "]" |
| ); |
| } |
| |
| // make sure that log isn't needlessly replayed after a clean close |
| @Test |
| public void testCleanShutdown() throws Exception { |
| final Semaphore logReplay = new Semaphore(0); |
| final Semaphore logReplayFinish = new Semaphore(0); |
| |
| UpdateLog.testing_logReplayHook = () -> { |
| try { |
| assertTrue(logReplay.tryAcquire(TIMEOUT, TimeUnit.SECONDS)); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| }; |
| |
| UpdateLog.testing_logReplayFinishHook = () -> logReplayFinish.release(); |
| |
| SolrQueryRequest req = req(); |
| UpdateHandler uhandler = req.getCore().getUpdateHandler(); |
| UpdateLog ulog = uhandler.getUpdateLog(); |
| |
| try { |
| clearIndex(); |
| assertU(commit()); |
| |
| assertU(adoc("id","E1", "val_i","1")); |
| assertU(adoc("id","E2", "val_i","1")); |
| |
| // set to a high enough number so this test won't hang on a bug |
| logReplay.release(10); |
| |
| h.close(); |
| createCore(); |
| |
| // make sure the docs got committed |
| assertJQ(req("q","*:*"),"/response/numFound==2"); |
| |
| // make sure no replay happened |
| assertEquals(10, logReplay.availablePermits()); |
| |
| } finally { |
| UpdateLog.testing_logReplayHook = null; |
| UpdateLog.testing_logReplayFinishHook = null; |
| |
| req().close(); |
| } |
| } |
| |
| private void addDocs(int nDocs, int start, LinkedList<Long> versions) throws Exception { |
| for (int i=0; i<nDocs; i++) { |
| versions.addFirst( addAndGetVersion( sdoc("id",Integer.toString(start + nDocs)) , null) ); |
| } |
| } |
| |
  /**
   * Verifies old tlog files are pruned once their updates are committed and no
   * longer needed (keeping enough to satisfy recent-versions requests), that
   * tlog references survive a recovery, and that a tlog clobbered with garbage
   * doesn't prevent startup or hide versions from earlier logs.
   */
  @Test
  public void testRemoveOldLogs() throws Exception {
    try {
      TestInjection.skipIndexWriterCommitOnClose = true;
      final Semaphore logReplay = new Semaphore(0);
      final Semaphore logReplayFinish = new Semaphore(0);

      UpdateLog.testing_logReplayHook = () -> {
        try {
          assertTrue(logReplay.tryAcquire(TIMEOUT, TimeUnit.SECONDS));
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      };

      UpdateLog.testing_logReplayFinishHook = logReplayFinish::release;


      clearIndex();
      assertU(commit());

      String logDir = h.getCore().getUpdateHandler().getUpdateLog().getLogDir();

      h.close();

      // start from a clean slate: remove every existing tlog file
      String[] files = HdfsUpdateLog.getLogList(fs, new Path(logDir));
      for (String file : files) {
        fs.delete(new Path(logDir, file), false);
      }

      assertEquals(0, HdfsUpdateLog.getLogList(fs, new Path(logDir)).length);

      createCore();

      int start = 0;
      int maxReq = 50;

      LinkedList<Long> versions = new LinkedList<>();
      addDocs(10, start, versions); start+=10;
      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
      assertU(commit());
      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));

      addDocs(10, start, versions); start+=10;
      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
      assertU(commit());
      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));

      // two commits so far => two tlog files
      assertEquals(2, HdfsUpdateLog.getLogList(fs, new Path(logDir)).length);

      addDocs(105, start, versions); start+=105;
      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
      assertU(commit());
      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));

      // previous two logs should be gone now
      assertEquals(1, HdfsUpdateLog.getLogList(fs, new Path(logDir)).length);

      addDocs(1, start, versions); start+=1;
      h.close();
      createCore();      // trigger recovery, make sure that tlog reference handling is correct

      // test we can get versions while replay is happening
      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));

      logReplay.release(1000);
      assertTrue(logReplayFinish.tryAcquire(TIMEOUT, TimeUnit.SECONDS));

      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));

      addDocs(105, start, versions); start+=105;
      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
      assertU(commit());
      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));

      // previous logs should be gone now
      assertEquals(1, HdfsUpdateLog.getLogList(fs, new Path(logDir)).length);

      //
      // test that a corrupt tlog file doesn't stop us from coming up, or seeing versions before that tlog file.
      //
      addDocs(1, start, new LinkedList<Long>());  // don't add this to the versions list because we are going to lose it...
      h.close();
      files = HdfsUpdateLog.getLogList(fs, new Path(logDir));
      Arrays.sort(files);

      // overwrite the newest tlog with garbage so it can't be opened
      FSDataOutputStream dos = fs.create(new Path(new Path(logDir), files[files.length-1]), (short)1);
      dos.writeUTF("This is a trashed log file that really shouldn't work at all, but we'll see..");
      dos.close();

      ignoreException("Failure to open existing");
      createCore();
      // we should still be able to get the list of versions (not including the trashed log file)
      assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, start)));
      resetExceptionIgnores();

    } finally {
      UpdateLog.testing_logReplayHook = null;
      UpdateLog.testing_logReplayFinishHook = null;
    }
  }
| |
| // |
| // test that a partially written last tlog entry (that will cause problems for both reverse reading and for |
| // log replay) doesn't stop us from coming up, and from recovering the documents that were not cut off. |
| // |
| @Test |
| public void testTruncatedLog() throws Exception { |
| try { |
| TestInjection.skipIndexWriterCommitOnClose = true; |
| final Semaphore logReplay = new Semaphore(0); |
| final Semaphore logReplayFinish = new Semaphore(0); |
| |
| UpdateLog.testing_logReplayHook = () -> { |
| try { |
| assertTrue(logReplay.tryAcquire(TIMEOUT, TimeUnit.SECONDS)); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| }; |
| |
| UpdateLog.testing_logReplayFinishHook = () -> logReplayFinish.release(); |
| |
| String logDir = h.getCore().getUpdateHandler().getUpdateLog().getLogDir(); |
| |
| clearIndex(); |
| assertU(commit()); |
| |
| assertU(adoc("id","F1")); |
| assertU(adoc("id","F2")); |
| assertU(adoc("id","F3")); |
| |
| h.close(); |
| |
| |
| |
| String[] files = HdfsUpdateLog.getLogList(fs, new Path(logDir)); |
| Arrays.sort(files); |
| |
| FSDataOutputStream dos = fs.append(new Path(logDir, files[files.length-1])); |
| |
| dos.writeLong(0xffffffffffffffffL); |
| dos.writeChars("This should be appended to a good log file, representing a bad partially written record."); |
| dos.close(); |
| |
| logReplay.release(1000); |
| logReplayFinish.drainPermits(); |
| ignoreException("OutOfBoundsException"); // this is what the corrupted log currently produces... subject to change. |
| createCore(); |
| assertTrue(logReplayFinish.tryAcquire(TIMEOUT, TimeUnit.SECONDS)); |
| resetExceptionIgnores(); |
| assertJQ(req("q","*:*") ,"/response/numFound==3"); |
| |
| // |
| // Now test that the bad log file doesn't mess up retrieving latest versions |
| // |
| |
| updateJ(jsonAdd(sdoc("id","F4", "_version_","104")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); |
| updateJ(jsonAdd(sdoc("id","F5", "_version_","105")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); |
| updateJ(jsonAdd(sdoc("id","F6", "_version_","106")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); |
| |
| // This currently skips the bad log file and also returns the version of the clearIndex (del *:*) |
| // assertJQ(req("qt","/get", "getVersions","6"), "/versions==[106,105,104]"); |
| assertJQ(req("qt","/get", "getVersions","3"), "/versions==[106,105,104]"); |
| |
| } finally { |
| UpdateLog.testing_logReplayHook = null; |
| UpdateLog.testing_logReplayFinishHook = null; |
| } |
| } |
| |
| // |
| // test that a corrupt tlog doesn't stop us from coming up |
| // |
| @Test |
| public void testCorruptLog() throws Exception { |
| try { |
| TestInjection.skipIndexWriterCommitOnClose = true; |
| |
| String logDir = h.getCore().getUpdateHandler().getUpdateLog().getLogDir(); |
| |
| clearIndex(); |
| assertU(commit()); |
| |
| assertU(adoc("id","G1")); |
| assertU(adoc("id","G2")); |
| assertU(adoc("id","G3")); |
| |
| h.close(); |
| |
| String[] files = HdfsUpdateLog.getLogList(fs, new Path(logDir)); |
| Arrays.sort(files); |
| |
| FSDataOutputStream dos = fs.create(new Path(logDir, files[files.length-1]), (short)1); |
| dos.write(new byte[800]); // zero out file |
| dos.close(); |
| |
| |
| ignoreException("Failure to open existing log file"); // this is what the corrupted log currently produces... subject to change. |
| createCore(); |
| resetExceptionIgnores(); |
| |
| // just make sure it responds |
| assertJQ(req("q","*:*") ,"/response/numFound==0"); |
| |
| // |
| // Now test that the bad log file doesn't mess up retrieving latest versions |
| // |
| |
| updateJ(jsonAdd(sdoc("id","G4", "_version_","104")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); |
| updateJ(jsonAdd(sdoc("id","G5", "_version_","105")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); |
| updateJ(jsonAdd(sdoc("id","G6", "_version_","106")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); |
| |
| // This currently skips the bad log file and also returns the version of the clearIndex (del *:*) |
| // assertJQ(req("qt","/get", "getVersions","6"), "/versions==[106,105,104]"); |
| assertJQ(req("qt","/get", "getVersions","3"), "/versions==[106,105,104]"); |
| |
| assertU(commit()); |
| |
| assertJQ(req("q","*:*") ,"/response/numFound==3"); |
| |
| // This messes up some other tests (on windows) if we don't remove the bad log. |
| // This *should* hopefully just be because the tests are too fragile and not because of real bugs - but it should be investigated further. |
| deleteLogs(); |
| |
| } finally { |
| UpdateLog.testing_logReplayHook = null; |
| UpdateLog.testing_logReplayFinishHook = null; |
| } |
| } |
| |
  // in rare circumstances, two logs can be left uncapped (lacking a commit at the end signifying that all the content in the log was committed)
  @Test
  public void testRecoveryMultipleLogs() throws Exception {
    try {
      // Leave the tlogs uncapped (no commit record) on core close.
      TestInjection.skipIndexWriterCommitOnClose = true;
      // Gates so the test controls when replay may proceed and can tell when it finished.
      final Semaphore logReplay = new Semaphore(0);
      final Semaphore logReplayFinish = new Semaphore(0);

      UpdateLog.testing_logReplayHook = () -> {
        try {
          // Replay blocks here until the test releases permits below.
          assertTrue(logReplay.tryAcquire(TIMEOUT, TimeUnit.SECONDS));
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      };

      UpdateLog.testing_logReplayFinishHook = () -> logReplayFinish.release();

      String logDir = h.getCore().getUpdateHandler().getUpdateLog().getLogDir();

      clearIndex();
      assertU(commit());

      // Three uncommitted docs that exist only in the (uncapped) transaction log.
      assertU(adoc("id","AAAAAA"));
      assertU(adoc("id","BBBBBB"));
      assertU(adoc("id","CCCCCC"));

      h.close();
      String[] files = HdfsUpdateLog.getLogList(fs, new Path(logDir));
      Arrays.sort(files);
      String fname = files[files.length-1];

      // Append a bogus half-record to the end of the newest (otherwise good) log.
      try(FSDataOutputStream dos = fs.append(new Path(logDir, files[files.length-1]))) {
        dos.writeLong(0xffffffffffffffffL);
        dos.writeChars("This should be appended to a good log file, representing a bad partially written record.");
      }

      try(FSDataInputStream dis = fs.open(new Path(logDir, files[files.length-1]))) {
        // NOTE(review): assumes available() reports the full file length for this
        // HDFS stream — TODO confirm; readFully would throw EOF if it over-reports.
        byte[] content = new byte[dis.available()];
        dis.readFully(content);

        // Now make a newer log file with just the IDs changed.  NOTE: this may not work if log format changes too much!
        findReplace("AAAAAA".getBytes(StandardCharsets.UTF_8), "aaaaaa".getBytes(StandardCharsets.UTF_8), content);
        findReplace("BBBBBB".getBytes(StandardCharsets.UTF_8), "bbbbbb".getBytes(StandardCharsets.UTF_8), content);
        findReplace("CCCCCC".getBytes(StandardCharsets.UTF_8), "cccccc".getBytes(StandardCharsets.UTF_8), content);

        // WARNING... assumes format of .00000n where n is less than 9
        long logNumber = Long.parseLong(fname.substring(fname.lastIndexOf(".") + 1));
        String fname2 = String.format(Locale.ROOT,
            UpdateLog.LOG_FILENAME_PATTERN,
            UpdateLog.TLOG_NAME,
            logNumber + 1);

        // Write the doctored copy as a second uncapped log with the next log number.
        try(FSDataOutputStream dos = fs.create(new Path(logDir, fname2), (short)1)) {
          dos.write(content);
        }
      }

      // Let replay run freely (plenty of permits), then restart the core so both
      // uncapped logs get replayed.
      logReplay.release(1000);
      logReplayFinish.drainPermits();
      ignoreException("OutOfBoundsException");  // this is what the corrupted log currently produces... subject to change.
      createCore();
      assertTrue(logReplayFinish.tryAcquire(TIMEOUT, TimeUnit.SECONDS));
      resetExceptionIgnores();
      // Three docs from each of the two logs should have been replayed.
      assertJQ(req("q","*:*") ,"/response/numFound==6");

    } finally {
      // Always clear the replay hooks so a failure here can't affect later tests.
      UpdateLog.testing_logReplayHook = null;
      UpdateLog.testing_logReplayFinishHook = null;
    }
  }
| |
| |
| // NOTE: replacement must currently be same size |
| private static void findReplace(byte[] from, byte[] to, byte[] data) { |
| int idx = -from.length; |
| for(;;) { |
| idx = indexOf(from, data, idx + from.length); // skip over previous match |
| if (idx < 0) break; |
| System.arraycopy(to, 0, data, idx, to.length); |
| } |
| } |
| |
| private static int indexOf(byte[] target, byte[] data, int start) { |
| outer: for (int i=start; i<data.length - target.length; i++) { |
| for (int j=0; j<target.length; j++) { |
| if (data[i+j] != target[j]) continue outer; |
| } |
| return i; |
| } |
| return -1; |
| } |
| |
  // stops the core, removes the transaction logs, restarts the core.
  void deleteLogs() throws Exception {
    String logDir = h.getCore().getUpdateHandler().getUpdateLog().getLogDir();

    h.close();

    try {
      // Delete each tlog file individually (non-recursive delete).
      String[] files = HdfsUpdateLog.getLogList(fs, new Path(logDir));
      for (String file : files) {
        //new File(logDir, file).delete();
        fs.delete(new Path(logDir, file), false);
      }

      assertEquals(0, HdfsUpdateLog.getLogList(fs, new Path(logDir)).length);
    } finally {
      // make sure we create the core again, even if the assert fails so it won't mess
      // up the next test.
      createCore();
      // NOTE(review): the match expression has no expected value after "==" — this
      // appears to only verify the core responds to a query, not a specific doc
      // count (the index may still hold committed docs); confirm this is intentional.
      assertJQ(req("q","*:*") ,"/response/numFound=="); // ensure it works
    }
  }
| |
| private static Long getVer(SolrQueryRequest req) throws Exception { |
| String response = JQ(req); |
| @SuppressWarnings({"rawtypes"}) |
| Map rsp = (Map) Utils.fromJSONString(response); |
| @SuppressWarnings({"rawtypes"}) |
| Map doc = null; |
| if (rsp.containsKey("doc")) { |
| doc = (Map)rsp.get("doc"); |
| } else if (rsp.containsKey("docs")) { |
| @SuppressWarnings({"rawtypes"}) |
| List lst = (List)rsp.get("docs"); |
| if (lst.size() > 0) { |
| doc = (Map)lst.get(0); |
| } |
| } else if (rsp.containsKey("response")) { |
| @SuppressWarnings({"rawtypes"}) |
| Map responseMap = (Map)rsp.get("response"); |
| @SuppressWarnings({"rawtypes"}) |
| List lst = (List)responseMap.get("docs"); |
| if (lst.size() > 0) { |
| doc = (Map)lst.get(0); |
| } |
| } |
| |
| if (doc == null) return null; |
| |
| return (Long)doc.get("_version_"); |
| } |
| } |
| |