blob: c1a97317cbb7e62be725d87caaf68b56e6d53803 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.util.TestInjection;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.apache.solr.common.util.Utils.fromJSONString;
@Nightly
public class CdcrUpdateLogTest extends SolrTestCaseJ4 {
/**
 * Semaphore acquire timeout, in seconds, used by the log-replay tests.
 * Change this to a huge number when debugging to prevent threads from advancing.
 */
private static int timeout = 60;
// TODO: fix this test to not require FSDirectory
/**
 * System property value captured in {@code beforeClass()}; {@code afterClass()} restores it,
 * or clears the property when this is {@code null}.
 */
static String savedFactory;
/**
 * Forces the file-system based mock directory factory for this test class and initialises
 * the core used by the tests.
 *
 * <p>The previous value of the {@code solr.directoryFactory} system property is remembered in
 * {@code savedFactory} so that {@code afterClass()} can put it back. System property keys are
 * case-sensitive, so the value has to be read under exactly the key that is overwritten
 * below; reading a differently-cased key would silently lose a pre-existing setting.
 */
@BeforeClass
public static void beforeClass() throws Exception {
  savedFactory = System.getProperty("solr.directoryFactory");
  System.setProperty("solr.directoryFactory", "org.apache.solr.core.MockFSDirectoryFactory");
  initCore("solrconfig-cdcrupdatelog.xml", "schema15.xml");
}
/**
 * Puts the {@code solr.directoryFactory} system property back the way {@code savedFactory}
 * describes it: set to the saved value when there is one, cleared otherwise.
 */
@AfterClass
public static void afterClass() {
  if (savedFactory != null) {
    System.setProperty("solr.directoryFactory", savedFactory);
    return;
  }
  System.clearProperty("solr.directoryFactory");
}
/**
 * Resets the test core: clears the index, commits, closes the harness, deletes every
 * transaction log file listed for the update log directory, verifies that none is left and
 * finally creates a fresh core.
 *
 * @throws IOException e.g. when a transaction log file cannot be deleted
 */
private void clearCore() throws IOException {
  clearIndex();
  assertU(commit());

  UpdateLog updateLog = h.getCore().getUpdateHandler().getUpdateLog();
  File tlogDir = new File(updateLog.getLogDir());

  // close the harness before touching the tlog files on disk
  h.close();

  for (String tlogName : updateLog.getLogList(tlogDir)) {
    // Files.delete throws when the file cannot be removed. Should we really error out here?
    Files.delete(new File(tlogDir, tlogName).toPath());
  }
  assertEquals(0, updateLog.getLogList(tlogDir).length);

  createCore();
}
/**
 * Convenience wrapper: delegates to {@code deleteByQueryAndGetVersion(q, null)} and ignores
 * whatever that call returns.
 *
 * @param q the delete query, passed through unchanged
 */
private void deleteByQuery(String q) throws Exception {
deleteByQueryAndGetVersion(q, null);
}
/**
 * Adds {@code nDocs} documents whose ids are the consecutive integers starting at
 * {@code start}, and records the version returned for each add.
 *
 * @param nDocs number of documents to add
 * @param start id of the first document
 * @param versions receives each new version at its head, so the newest version comes first
 */
private void addDocs(int nDocs, int start, LinkedList<Long> versions) throws Exception {
  for (int offset = 0; offset < nDocs; offset++) {
    String docId = Integer.toString(start + offset);
    versions.addFirst(addAndGetVersion(sdoc("id", docId), null));
  }
}
/**
 * Runs {@code req} through {@code JQ}, parses the JSON response and extracts the
 * {@code _version_} field of the first document it can find.
 *
 * <p>The document is taken from the first of these keys that is present in the response:
 * {@code doc} (the document itself), {@code docs} (first list element) or {@code response}
 * (first element of its nested {@code docs} list).
 *
 * @return the {@code _version_} value, or {@code null} when no document is available
 */
@SuppressWarnings({"rawtypes"})
private static Long getVer(SolrQueryRequest req) throws Exception {
  Map parsed = (Map) fromJSONString(JQ(req));

  List candidates;
  if (parsed.containsKey("doc")) {
    // single-document response: no list to unwrap
    Map single = (Map) parsed.get("doc");
    return single == null ? null : (Long) single.get("_version_");
  } else if (parsed.containsKey("docs")) {
    candidates = (List) parsed.get("docs");
  } else if (parsed.containsKey("response")) {
    Map responseMap = (Map) parsed.get("response");
    candidates = (List) responseMap.get("docs");
  } else {
    return null;
  }

  if (candidates.size() == 0) {
    return null;
  }
  Map first = (Map) candidates.get(0);
  return first == null ? null : (Long) first.get("_version_");
}
/**
 * Checks that a reader opened on an empty updates log returns every logged operation in
 * order (each batch of adds followed by its commit), returns {@code null} once exhausted,
 * and picks up updates that are logged after it was exhausted.
 */
@Test
public void testLogReaderNext() throws Exception {
  this.clearCore();

  CdcrUpdateLog cdcrLog = (CdcrUpdateLog) h.getCore().getUpdateHandler().getUpdateLog();
  // test reader on empty updates log
  CdcrUpdateLog.CdcrLogReader reader = cdcrLog.newLogReader();

  LinkedList<Long> versions = new LinkedList<>();
  int nextId = 0;

  addDocs(10, nextId, versions);
  nextId += 10;
  assertU(commit());

  addDocs(11, nextId, versions);
  nextId += 11;
  assertU(commit());

  // first batch: 10 adds ...
  for (int remaining = 10; remaining > 0; remaining--) {
    assertNotNull(reader.next());
  }
  // ... terminated by a commit
  Object record = reader.next();
  assertNotNull(record);
  @SuppressWarnings({"rawtypes"})
  List fields = (List) record;
  int opAndFlags = (Integer) fields.get(0);
  assertEquals(UpdateLog.COMMIT, opAndFlags & UpdateLog.OPERATION_MASK);

  // second batch: 11 adds ...
  for (int remaining = 11; remaining > 0; remaining--) {
    assertNotNull(reader.next());
  }
  // ... terminated by a commit
  record = reader.next();
  assertNotNull(record);
  fields = (List) record;
  opAndFlags = (Integer) fields.get(0);
  assertEquals(UpdateLog.COMMIT, opAndFlags & UpdateLog.OPERATION_MASK);

  // nothing left to read
  assertNull(reader.next());

  // add a new tlog after having exhausted the reader
  addDocs(10, nextId, versions);
  nextId += 10;
  assertU(commit());

  // the reader should pick up the new tlog: 10 adds + 1 commit
  for (int remaining = 11; remaining > 0; remaining--) {
    assertNotNull(reader.next());
  }
  assertNull(reader.next());
}
/**
 * Check the seek method of the log reader: seeking to a logged version must succeed and
 * position the reader on that exact entry, while seeking to a version below the oldest
 * logged one must fail.
 */
@Test
public void testLogReaderSeek() throws Exception {
  this.clearCore();

  CdcrUpdateLog cdcrLog = (CdcrUpdateLog) h.getCore().getUpdateHandler().getUpdateLog();
  CdcrUpdateLog.CdcrLogReader reader1 = cdcrLog.newLogReader();
  CdcrUpdateLog.CdcrLogReader reader2 = cdcrLog.newLogReader();
  CdcrUpdateLog.CdcrLogReader reader3 = cdcrLog.newLogReader();

  LinkedList<Long> versions = new LinkedList<>();
  int nextId = 0;

  addDocs(10, nextId, versions);
  nextId += 10;
  assertU(commit());

  addDocs(11, nextId, versions);
  nextId += 11;
  assertU(commit());

  addDocs(10, nextId, versions);
  nextId += 10;
  assertU(commit());

  // Test case where target version is equal to startVersion of tlog file
  long wanted = getVer(req("q", "id:10"));
  assertTrue(reader1.seek(wanted));
  Object record = reader1.next();
  assertNotNull(record);
  @SuppressWarnings({"rawtypes"})
  List fields = (List) record;
  long found = (Long) fields.get(1);
  assertEquals(wanted, found);
  assertNotNull(reader1.next());

  // test case where target version is superior to startVersion of tlog file
  wanted = getVer(req("q", "id:26"));
  assertTrue(reader2.seek(wanted));
  record = reader2.next();
  assertNotNull(record);
  fields = (List) record;
  found = (Long) fields.get(1);
  assertEquals(wanted, found);
  assertNotNull(reader2.next());

  // test case where target version is inferior to startVersion of oldest tlog file
  wanted = getVer(req("q", "id:0")) - 1;
  assertFalse(reader3.seek(wanted));
}
/**
 * Check that the log reader is able to read the new tlog
 * and pick up new entries as they appear, without waiting for a commit.
 */
@Test
public void testLogReaderNextOnNewTLog() throws Exception {
  this.clearCore();

  CdcrUpdateLog cdcrLog = (CdcrUpdateLog) h.getCore().getUpdateHandler().getUpdateLog();
  CdcrUpdateLog.CdcrLogReader reader = cdcrLog.newLogReader();

  LinkedList<Long> versions = new LinkedList<>();
  int nextId = 0;

  addDocs(10, nextId, versions);
  nextId += 10;
  assertU(commit());

  // these adds are deliberately not followed by a commit
  addDocs(11, nextId, versions);
  nextId += 11;

  // 21 adds + 1 commit
  for (int remaining = 22; remaining > 0; remaining--) {
    assertNotNull(reader.next());
  }

  // we should have reached the end of the new tlog
  assertNull(reader.next());

  addDocs(5, nextId, versions);
  nextId += 5;

  // the reader should now pick up the new updates: 5 adds
  for (int remaining = 5; remaining > 0; remaining--) {
    assertNotNull(reader.next());
  }
  assertNull(reader.next());
}
/**
 * Checks the removal of old tlogs in the presence of a log reader: the expected number of
 * tlog files is asserted after each commit, after advancing the reader, and after
 * re-creating the core. Along the way the versions returned by {@code /get} with
 * {@code getVersions} are compared against the locally recorded ones.
 */
@Test
public void testRemoveOldLogs() throws Exception {
  this.clearCore();

  UpdateLog updateLog = h.getCore().getUpdateHandler().getUpdateLog();
  File tlogDir = new File(updateLog.getLogDir());

  final int maxReq = 50;
  int nextId = 0;
  LinkedList<Long> versions = new LinkedList<>();

  addDocs(10, nextId, versions);
  nextId += 10;
  assertNewestVersionsViaGet(maxReq, nextId, versions);
  assertU(commit());
  assertNewestVersionsViaGet(maxReq, nextId, versions);

  addDocs(10, nextId, versions);
  nextId += 10;
  assertNewestVersionsViaGet(maxReq, nextId, versions);
  assertU(commit());
  assertNewestVersionsViaGet(maxReq, nextId, versions);

  assertEquals(2, updateLog.getLogList(tlogDir).length);

  // Get a cdcr log reader to initialise a log pointer
  CdcrUpdateLog.CdcrLogReader reader = ((CdcrUpdateLog) updateLog).newLogReader();

  addDocs(105, nextId, versions);
  nextId += 105;
  assertNewestVersionsViaGet(maxReq, nextId, versions);
  assertU(commit());
  assertNewestVersionsViaGet(maxReq, nextId, versions);

  // the previous two tlogs should not be removed
  assertEquals(3, updateLog.getLogList(tlogDir).length);

  // move the pointer past the first tlog. 12 reads: presumably the 10 adds + 1 commit of
  // the first tlog plus one more entry so that the pointer steps into the next tlog.
  for (int read = 0; read < 12; read++) {
    assertNotNull(reader.next());
  }

  addDocs(10, nextId, versions);
  nextId += 10;
  assertNewestVersionsViaGet(maxReq, nextId, versions);
  assertU(commit());
  assertNewestVersionsViaGet(maxReq, nextId, versions);

  // the first tlog should be removed
  assertEquals(3, updateLog.getLogList(tlogDir).length);

  // restart the core; the reader created above belongs to the old update log instance
  h.close();
  createCore();
  updateLog = h.getCore().getUpdateHandler().getUpdateLog();

  addDocs(105, nextId, versions);
  nextId += 105;
  assertNewestVersionsViaGet(maxReq, nextId, versions);
  assertU(commit());
  assertNewestVersionsViaGet(maxReq, nextId, versions);

  // previous tlogs should be gone now
  assertEquals(1, updateLog.getLogList(tlogDir).length);
}

/**
 * Asserts that a {@code /get} request with {@code getVersions=maxReq} returns the first
 * {@code min(maxReq, indexed)} entries of {@code versions}, which holds the newest
 * version first.
 */
private void assertNewestVersionsViaGet(int maxReq, int indexed, LinkedList<Long> versions) throws Exception {
  assertJQ(req("qt", "/get", "getVersions", "" + maxReq),
      "/versions==" + versions.subList(0, Math.min(maxReq, indexed)));
}
/**
 * Check that the removal of old logs is taking into consideration
 * multiple log pointers. Check also that the removal takes into consideration the
 * numRecordsToKeep limit, even if the log pointers are ahead.
 */
@Test
public void testRemoveOldLogsMultiplePointers() throws Exception {
  this.clearCore();

  UpdateLog updateLog = h.getCore().getUpdateHandler().getUpdateLog();
  File tlogDir = new File(updateLog.getLogDir());
  CdcrUpdateLog.CdcrLogReader reader1 = ((CdcrUpdateLog) updateLog).newLogReader();
  CdcrUpdateLog.CdcrLogReader reader2 = ((CdcrUpdateLog) updateLog).newLogReader();

  LinkedList<Long> versions = new LinkedList<>();
  int nextId = 0;

  addDocs(10, nextId, versions);
  nextId += 10;
  assertU(commit());

  addDocs(10, nextId, versions);
  nextId += 10;
  assertU(commit());

  addDocs(105, nextId, versions);
  nextId += 105;
  assertU(commit());

  // the previous two tlogs should not be removed
  assertEquals(3, updateLog.getLogList(tlogDir).length);

  // move the first pointer past the first tlog (12 reads: its 10 adds + 1 commit, plus one
  // further entry)
  for (int read = 0; read < 12; read++) {
    assertNotNull(reader1.next());
  }

  addDocs(10, nextId, versions);
  nextId += 10;
  assertU(commit());

  // the first tlog should not be removed
  assertEquals(4, updateLog.getLogList(tlogDir).length);

  // move the second pointer past the first tlog (same 12 reads as above)
  for (int read = 0; read < 12; read++) {
    assertNotNull(reader2.next());
  }

  addDocs(10, nextId, versions);
  nextId += 10;
  assertU(commit());

  // the first tlog should be removed
  assertEquals(4, updateLog.getLogList(tlogDir).length);

  // exhaust the readers
  while (reader1.next() != null) {
    // keep reading until the reader reports the end of the log
  }
  while (reader2.next() != null) {
    // keep reading until the reader reports the end of the log
  }

  // the readers should point to the new tlog
  // now add enough documents to trigger the numRecordsToKeep limit
  addDocs(80, nextId, versions);
  nextId += 80;
  assertU(commit());

  // the update log should have kept the last 3 tlogs, which sum up to 100 records
  assertEquals(3, updateLog.getLogList(tlogDir).length);
}
/**
 * Check that the output stream of an uncapped tlog is correctly reopened
 * and that the commit is written during recovery.
 *
 * <p>Log replay is held back by a semaphore-based hook until the test has verified that the
 * uncommitted documents are not yet visible; a second hook signals the end of the replay.
 */
@Test
public void testClosingOutputStreamAfterLogReplay() throws Exception {
  this.clearCore();
  try {
    TestInjection.skipIndexWriterCommitOnClose = true;

    final Semaphore replayGate = new Semaphore(0);
    final Semaphore replayDone = new Semaphore(0);

    // the replay hook waits on the gate; failing to get a permit within the timeout fails
    // the assertion
    UpdateLog.testing_logReplayHook = () -> {
      try {
        assertTrue(replayGate.tryAcquire(timeout, TimeUnit.SECONDS));
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    };
    UpdateLog.testing_logReplayFinishHook = replayDone::release;

    Deque<Long> versions = new ArrayDeque<>();
    for (String docId : new String[] {"A11", "A12", "A13"}) {
      versions.addFirst(addAndGetVersion(sdoc("id", docId), null));
    }

    // nothing was committed, so nothing is searchable, but the versions are already known
    assertJQ(req("q", "*:*"), "/response/numFound==0");
    assertJQ(req("qt", "/get", "getVersions", "" + versions.size()), "/versions==" + versions);

    h.close();
    createCore();
    // Solr should kick this off now
    // h.getCore().getUpdateHandler().getUpdateLog().recoverFromLog();

    // verify that previous close didn't do a commit
    // recovery should be blocked by our hook
    assertJQ(req("q", "*:*"), "/response/numFound==0");

    // unblock recovery
    replayGate.release(1000);

    // wait until recovery has finished
    assertTrue(replayDone.tryAcquire(timeout, TimeUnit.SECONDS));

    assertJQ(req("q", "*:*"), "/response/numFound==3");

    // The transaction log should have written a commit and closed its output stream
    UpdateLog recoveredLog = h.getCore().getUpdateHandler().getUpdateLog();
    assertEquals(0, recoveredLog.logs.peekLast().refcount.get());
    assertNull(recoveredLog.logs.peekLast().channel);

    // reopen the output stream to check if it ends with a commit
    recoveredLog.logs.peekLast().incref();
    assertTrue(recoveredLog.logs.peekLast().endsWithCommit());
    recoveredLog.logs.peekLast().decref();
  } finally {
    // reset the injected behaviour and both hooks
    TestInjection.skipIndexWriterCommitOnClose = false;
    UpdateLog.testing_logReplayHook = null;
    UpdateLog.testing_logReplayFinishHook = null;
  }
}
/**
 * Check the buffering of the old tlogs: while the buffer is enabled no tlog is expected to
 * be removed, and once it is disabled the old tlogs are expected to be removed again.
 */
@Test
public void testBuffering() throws Exception {
  this.clearCore();

  CdcrUpdateLog cdcrLog = (CdcrUpdateLog) h.getCore().getUpdateHandler().getUpdateLog();
  File tlogDir = new File(cdcrLog.getLogDir());

  LinkedList<Long> versions = new LinkedList<>();
  int nextId = 0;

  addDocs(10, nextId, versions);
  nextId += 10;
  assertU(commit());

  addDocs(10, nextId, versions);
  nextId += 10;
  assertU(commit());

  addDocs(105, nextId, versions);
  nextId += 105;
  assertU(commit());

  // the first two tlogs should have been removed
  assertEquals(1, cdcrLog.getLogList(tlogDir).length);

  // enable buffer
  cdcrLog.enableBuffer();

  addDocs(10, nextId, versions);
  nextId += 10;
  assertU(commit());

  addDocs(10, nextId, versions);
  nextId += 10;
  assertU(commit());

  addDocs(105, nextId, versions);
  nextId += 105;
  assertU(commit());

  // no tlog should have been removed
  assertEquals(4, cdcrLog.getLogList(tlogDir).length);

  // disable buffer
  cdcrLog.disableBuffer();

  addDocs(10, nextId, versions);
  nextId += 10;
  assertU(commit());

  // old tlogs should have been removed
  assertEquals(2, cdcrLog.getLogList(tlogDir).length);
}
/**
 * Checks sub readers: a sub reader obtained from a parent reader advances independently,
 * the parent keeps its own position (so its tlog is not removed), and
 * {@code forwardSeek} moves the parent to the position of the sub reader.
 */
@Test
public void testSubReader() throws Exception {
  this.clearCore();

  CdcrUpdateLog ulog = (CdcrUpdateLog) h.getCore().getUpdateHandler().getUpdateLog();
  File logDir = new File(h.getCore().getUpdateHandler().getUpdateLog().getLogDir());
  CdcrUpdateLog.CdcrLogReader reader = ulog.newLogReader();

  int start = 0;
  LinkedList<Long> versions = new LinkedList<>();

  addDocs(10, start, versions);
  start += 10;
  assertU(commit());

  addDocs(10, start, versions);
  start += 10;
  assertU(commit());

  assertEquals(2, ulog.getLogList(logDir).length);

  // start to read the first tlog
  for (int i = 0; i < 10; i++) {
    assertNotNull(reader.next());
  }

  // instantiate a sub reader, and finish to read the first tlog (commit operation), plus start to read the
  // second tlog (first five adds)
  CdcrUpdateLog.CdcrLogReader subReader = reader.getSubReader();
  for (int i = 0; i < 6; i++) {
    assertNotNull(subReader.next());
  }

  // Five adds + one commit
  assertEquals(6, subReader.getNumberOfRemainingRecords());

  // Generate a new tlog
  addDocs(105, start, versions);
  start += 105;
  assertU(commit());

  // Even if the subreader is past the first tlog, the first tlog should not have been removed
  // since the parent reader is still pointing to it
  assertEquals(3, ulog.getLogList(logDir).length);

  // fast forward the parent reader with the subreader
  reader.forwardSeek(subReader);
  subReader.close();

  // After fast forward, the parent reader should be positioned on the doc15
  @SuppressWarnings({"rawtypes"})
  List o = (List) reader.next();
  assertNotNull(o);
  // the failure message needs a separating space before the dumped entry
  assertTrue("Expected SolrInputDocument but got " + o, o.get(3) instanceof SolrInputDocument);
  assertEquals("15", ((SolrInputDocument) o.get(3)).getFieldValue("id"));

  // Finish to read the second tlog, and start to read the third one
  for (int i = 0; i < 6; i++) {
    assertNotNull(reader.next());
  }

  assertEquals(105, reader.getNumberOfRemainingRecords());

  // Generate a new tlog to activate tlog cleaning
  addDocs(10, start, versions);
  start += 10;
  assertU(commit());

  // If the parent reader was correctly fast forwarded, it should be on the third tlog, and the first two should
  // have been removed.
  assertEquals(2, ulog.getLogList(logDir).length);
}
/**
 * Check that the reader is correctly reset to its last position: after every read the
 * reader is reset and must return the very same operation (same version number) again.
 */
@Test
public void testResetToLastPosition() throws Exception {
  this.clearCore();

  CdcrUpdateLog cdcrLog = (CdcrUpdateLog) h.getCore().getUpdateHandler().getUpdateLog();
  File tlogDir = new File(cdcrLog.getLogDir());
  CdcrUpdateLog.CdcrLogReader reader = cdcrLog.newLogReader();

  LinkedList<Long> versions = new LinkedList<>();
  int nextId = 0;

  addDocs(10, nextId, versions);
  nextId += 10;
  assertU(commit());

  addDocs(10, nextId, versions);
  nextId += 10;
  assertU(commit());

  assertEquals(2, cdcrLog.getLogList(tlogDir).length);

  // 20 adds + 2 commits
  for (int remaining = 22; remaining > 0; remaining--) {
    Object firstRead = reader.next();
    assertNotNull(firstRead);

    // reset to last position
    reader.resetToLastPosition();

    // we should read the same update operation, i.e., same version number
    assertEquals(((List) firstRead).get(1), ((List) reader.next()).get(1));
  }
  assertNull(reader.next());
}
/**
 * Check that the reader reports the right number of remaining records: after re-opening a
 * capped and an uncapped tlog, after each single read, once exhausted, and after new
 * updates have been added.
 */
@Test
public void testGetNumberOfRemainingRecords() throws Exception {
  try {
    TestInjection.skipIndexWriterCommitOnClose = true;

    final Semaphore replayDone = new Semaphore(0);
    UpdateLog.testing_logReplayFinishHook = replayDone::release;

    this.clearCore();

    LinkedList<Long> versions = new LinkedList<>();
    int nextId = 0;

    addDocs(10, nextId, versions);
    nextId += 10;
    assertU(commit());

    // second batch is left without a commit
    addDocs(10, nextId, versions);
    nextId += 10;

    h.close();
    // forget permits released so far, so that only the upcoming replay is waited for
    replayDone.drainPermits();
    createCore();

    // At this stage, we have re-opened a capped tlog, and an uncapped tlog.
    // check that the number of remaining records is correctly computed in these two cases

    CdcrUpdateLog cdcrLog = (CdcrUpdateLog) h.getCore().getUpdateHandler().getUpdateLog();
    CdcrUpdateLog.CdcrLogReader reader = cdcrLog.newLogReader();

    // wait for the replay to finish
    assertTrue(replayDone.tryAcquire(timeout, TimeUnit.SECONDS));

    // 20 records + 2 commits
    assertEquals(22, reader.getNumberOfRemainingRecords());

    // each read must lower the remaining count by one: 21, 20, ..., 0
    for (int expectedLeft = 21; expectedLeft >= 0; expectedLeft--) {
      Object record = reader.next();
      assertNotNull(record);
      assertEquals(expectedLeft, reader.getNumberOfRemainingRecords());
    }

    assertNull(reader.next());
    assertEquals(0, reader.getNumberOfRemainingRecords());

    // It should pick up the new tlog files
    addDocs(10, nextId, versions);
    assertEquals(10, reader.getNumberOfRemainingRecords());
  } finally {
    // reset the injected behaviour and the hook
    TestInjection.skipIndexWriterCommitOnClose = false;
    UpdateLog.testing_logReplayFinishHook = null;
  }
}
/**
 * Check that the initialisation of the log reader is picking up the tlog file that is currently being
 * written: the reader is created only after the first documents were indexed and must still
 * return them.
 */
@Test
public void testLogReaderInitOnNewTlog() throws Exception {
  this.clearCore();

  LinkedList<Long> versions = new LinkedList<>();
  int nextId = 0;

  // Start to index some documents to instantiate the new tlog
  addDocs(10, nextId, versions);
  nextId += 10;

  // Create the reader after the instantiation of the new tlog
  CdcrUpdateLog cdcrLog = (CdcrUpdateLog) h.getCore().getUpdateHandler().getUpdateLog();
  CdcrUpdateLog.CdcrLogReader reader = cdcrLog.newLogReader();

  // Continue to index documents and commits
  addDocs(11, nextId, versions);
  nextId += 11;
  assertU(commit());

  // check that the log reader was initialised with the new tlog: 21 adds + 1 commit
  for (int remaining = 22; remaining > 0; remaining--) {
    assertNotNull(reader.next());
  }

  // we should have reached the end of the new tlog
  assertNull(reader.next());
}
/**
 * Check that the absolute version number is used for the update log index and for the last entry read:
 * delete entries carry a negative version, yet the starting versions of the tlogs and the
 * reader's last version are expected to be positive.
 */
@Test
public void testAbsoluteLastVersion() throws Exception {
  this.clearCore();

  CdcrUpdateLog cdcrLog = (CdcrUpdateLog) h.getCore().getUpdateHandler().getUpdateLog();
  File tlogDir = new File(cdcrLog.getLogDir());
  CdcrUpdateLog.CdcrLogReader reader = cdcrLog.newLogReader();

  LinkedList<Long> versions = new LinkedList<>();
  int nextId = 0;

  addDocs(10, nextId, versions);
  nextId += 10;
  deleteByQuery("*:*");
  assertU(commit());

  deleteByQuery("*:*");
  addDocs(10, nextId, versions);
  nextId += 10;
  assertU(commit());

  assertEquals(2, cdcrLog.getLogList(tlogDir).length);

  for (long startingVersion : cdcrLog.getStartingVersions()) {
    assertTrue(startingVersion > 0);
  }

  // skip the first 10 adds
  for (int remaining = 10; remaining > 0; remaining--) {
    reader.next();
  }

  // first delete
  Object record = reader.next();
  long deleteVersion = (Long) ((List) record).get(1);
  assertTrue(deleteVersion < 0);
  assertTrue(reader.getLastVersion() > 0);

  reader.next(); // commit

  // second delete
  record = reader.next();
  deleteVersion = (Long) ((List) record).get(1);
  assertTrue(deleteVersion < 0);
  assertTrue(reader.getLastVersion() > 0);
}
}