blob: 5a7152504ab652b2f2ee1b31b7101273e70e4c74 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrEventListener;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.handler.UpdateRequestHandler;
import org.apache.solr.request.SolrQueryRequestBase;
import org.apache.solr.response.SolrQueryResponse;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MaxSizeAutoCommitTest extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// Given an ID, returns an XML string for an "add document" request
private static String addDoc(int id) {
return adoc("id", Integer.toString(id));
}
// Given an ID, returns an XML string for a "delete document" request
private static String delDoc(int id) {
return delI(Integer.toString(id));
}
// How long to sleep while checking for commits
private static final int COMMIT_CHECKING_SLEEP_TIME_MS = 50;
// max TLOG file size
private static final int MAX_FILE_SIZE = 1000;
private SolrCore core;
private DirectUpdateHandler2 updateHandler;
private CommitTracker hardCommitTracker;
private UpdateRequestHandler updateRequestHandler;
private MockEventListener monitor;
@Before
public void setup() throws Exception {
System.setProperty("solr.ulog", "solr.UpdateLog");
initCore("solrconfig-tlog.xml", "schema.xml");
core = h.getCore();
updateHandler = (DirectUpdateHandler2) core.getUpdateHandler();
// we don't care about auto-commit's opening a new Searcher in this test, just skip it.
updateHandler.softCommitTracker.setOpenSearcher(false);
updateHandler.commitTracker.setOpenSearcher(false);
// we don't care about soft commit's at all
updateHandler.softCommitTracker.setTimeUpperBound(-1);
updateHandler.softCommitTracker.setDocsUpperBound(-1);
updateHandler.softCommitTracker.setTLogFileSizeUpperBound(-1);
hardCommitTracker = updateHandler.commitTracker;
// Only testing file-size based auto hard commits - disable other checks
hardCommitTracker.setTimeUpperBound(-1);
hardCommitTracker.setDocsUpperBound(-1);
hardCommitTracker.setTLogFileSizeUpperBound(MAX_FILE_SIZE);
monitor = new MockEventListener();
updateHandler.registerCommitCallback(monitor);
updateRequestHandler = new UpdateRequestHandler();
updateRequestHandler.init( null );
}
@After
public void tearDown() throws Exception {
if (null != monitor) {
monitor.assertSaneOffers();
monitor.clear();
}
super.tearDown();
System.clearProperty("solr.ulog");
deleteCore();
}
@Test
public void testAdds() throws Exception {
Assert.assertEquals("There have been no updates yet, so there shouldn't have been any commits", 0,
hardCommitTracker.getCommitCount());
long tlogSizePreUpdates = updateHandler.getUpdateLog().getCurrentLogSizeFromStream();
Assert.assertEquals("There have been no updates yet, so tlog should be empty", 0, tlogSizePreUpdates);
// Add a large number of docs - should trigger a commit
int numDocsToAdd = 500;
SolrQueryResponse updateResp = new SolrQueryResponse();
monitor.doStuffAndExpectAtLeastOneCommit(hardCommitTracker, updateHandler, () -> {
updateRequestHandler.handleRequest(constructBatchAddDocRequest(0, numDocsToAdd), updateResp);
});
}
@Test
public void testRedundantDeletes() throws Exception {
Assert.assertEquals("There have been no updates yet, so there shouldn't have been any commits", 0,
hardCommitTracker.getCommitCount());
long tlogSizePreUpdates = updateHandler.getUpdateLog().getCurrentLogSizeFromStream();
Assert.assertEquals("There have been no updates yet, so tlog should be empty", 0, tlogSizePreUpdates);
// Add docs
int numDocsToAdd = 150;
SolrQueryResponse updateResp = new SolrQueryResponse();
monitor.doStuffAndExpectAtLeastOneCommit(hardCommitTracker, updateHandler, () -> {
updateRequestHandler.handleRequest(constructBatchAddDocRequest(0, numDocsToAdd), updateResp);
});
// Send a bunch of redundant deletes
int numDeletesToSend = 500;
int docIdToDelete = 100;
SolrQueryRequestBase batchSingleDeleteRequest = new SolrQueryRequestBase(core, new MapSolrParams(new HashMap<>())) {};
List<String> docs = new ArrayList<>();
for (int i = 0; i < numDeletesToSend; i++) {
docs.add(delI(Integer.toString(docIdToDelete)));
}
batchSingleDeleteRequest.setContentStreams(toContentStreams(docs));
monitor.doStuffAndExpectAtLeastOneCommit(hardCommitTracker, updateHandler, () -> {
updateRequestHandler.handleRequest(batchSingleDeleteRequest, updateResp);
});
}
@Test
public void testDeletes() throws Exception {
Assert.assertEquals("There have been no updates yet, so there shouldn't have been any commits", 0,
hardCommitTracker.getCommitCount());
long tlogSizePreUpdates = updateHandler.getUpdateLog().getCurrentLogSizeFromStream();
Assert.assertEquals("There have been no updates yet, so tlog should be empty", 0, tlogSizePreUpdates);
// Add docs
int numDocsToAdd = 500;
SolrQueryResponse updateResp = new SolrQueryResponse();
monitor.doStuffAndExpectAtLeastOneCommit(hardCommitTracker, updateHandler, () -> {
updateRequestHandler.handleRequest(constructBatchAddDocRequest(0, numDocsToAdd), updateResp);
});
// Delete all documents - should trigger a commit
monitor.doStuffAndExpectAtLeastOneCommit(hardCommitTracker, updateHandler, () -> {
updateRequestHandler.handleRequest(constructBatchDeleteDocRequest(0, numDocsToAdd), updateResp);
});
}
/**
* Construct a batch add document request with a series of very simple Solr docs with increasing IDs.
* @param startId the document ID to begin with
* @param batchSize the number of documents to include in the batch
* @return a SolrQueryRequestBase
*/
private SolrQueryRequestBase constructBatchAddDocRequest(int startId, int batchSize) {
return constructBatchRequestHelper(startId, batchSize, MaxSizeAutoCommitTest::addDoc);
}
/**
* Construct a batch delete document request, with IDs incrementing from startId
* @param startId the document ID to begin with
* @param batchSize the number of documents to include in the batch
* @return a SolrQueryRequestBase
*/
private SolrQueryRequestBase constructBatchDeleteDocRequest(int startId, int batchSize) {
return constructBatchRequestHelper(startId, batchSize, MaxSizeAutoCommitTest::delDoc);
}
/**
* Helper for constructing a batch update request
* @param startId the document ID to begin with
* @param batchSize the number of documents to include in the batch
* @param requestFn a function that takes an (int) ID and returns an XML string of the request to add to the batch request
* @return a SolrQueryRequestBase
*/
private SolrQueryRequestBase constructBatchRequestHelper(int startId, int batchSize, Function<Integer, String> requestFn) {
SolrQueryRequestBase updateReq = new SolrQueryRequestBase(core, new MapSolrParams(new HashMap<>())) {};
List<String> docs = new ArrayList<>();
for (int i = startId; i < startId + batchSize; i++) {
docs.add(requestFn.apply(i));
}
updateReq.setContentStreams(toContentStreams(docs));
return updateReq;
}
/**
* Convert the given list of strings into a list of streams, for Solr update requests
* @param strs strings to convert into streams
* @return list of streams
*/
private List<ContentStream> toContentStreams(List<String> strs) {
ArrayList<ContentStream> streams = new ArrayList<>();
for (String str : strs) {
streams.addAll(ClientUtils.toContentStreams(str, "text/xml"));
}
return streams;
}
private static final class MockEventListener implements SolrEventListener {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public MockEventListener() {
/* No-Op */
}
// use capacity bound Queue just so we're sure we don't OOM
public final BlockingQueue<Long> hard = new LinkedBlockingQueue<>(1000);
// if non enpty, then at least one offer failed (queues full)
private StringBuffer fail = new StringBuffer();
@Override
public void init(@SuppressWarnings({"rawtypes"})NamedList args) {}
@Override
public void newSearcher(SolrIndexSearcher newSearcher, SolrIndexSearcher currentSearcher) {
// No-Op
}
@Override
public void postCommit() {
Long now = System.nanoTime();
if (!hard.offer(now)) fail.append(", hardCommit @ " + now);
}
@Override
public void postSoftCommit() {
// No-Op
}
public void clear() {
hard.clear();
fail.setLength(0);
}
public void doStuffAndExpectAtLeastOneCommit(final CommitTracker commitTracker,
final DirectUpdateHandler2 updateHandler,
final Runnable stuff) throws InterruptedException {
assertSaneOffers();
final int POLL_TIME = 5;
final TimeUnit POLL_UNIT = TimeUnit.SECONDS;
final int preAutoCommitCount = commitTracker.getCommitCount();
log.info("Auto-Commit count prior to doing work: {}", preAutoCommitCount);
stuff.run();
log.info("Work Completed");
int numIters = 0;
Long lastPostCommitTimeStampSeen = null;
final long startTimeNanos = System.nanoTime();
final long cutOffTime = startTimeNanos + TimeUnit.SECONDS.toNanos(300);
while (System.nanoTime() < cutOffTime) {
numIters++;
log.info("Polling at most {} {} for expected (post-)commit#{}", POLL_TIME, POLL_UNIT, numIters);
lastPostCommitTimeStampSeen = hard.poll(POLL_TIME, POLL_UNIT);
assertNotNull("(post-)commit#" + numIters + " didn't occur in allowed time frame",
lastPostCommitTimeStampSeen);
synchronized (commitTracker) {
final int currentAutoCommitCount = commitTracker.getCommitCount() - preAutoCommitCount;
final long currentFileSize = updateHandler.getUpdateLog().getCurrentLogSizeFromStream();
if ((currentFileSize < MAX_FILE_SIZE) &&
(currentAutoCommitCount == numIters) &&
( ! commitTracker.hasPending() )) {
// if all of these condiions are met, then we should be completely done
assertSaneOffers(); // last minute sanity check
return;
}
// else: log & loop...
log.info("(Auto-)commits triggered: {}; (post-)commits seen: {}; current tlog file size: {}",
currentAutoCommitCount, numIters, currentFileSize);
}
}
// if we didn't return already, then we ran out of time
fail("Exhausted cut off time polling for post-commit events (got " + numIters + ")");
}
public void assertSaneOffers() {
assertEquals("Failure of MockEventListener" + fail.toString(),
0, fail.length());
}
}
}