/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.cloud.SocketProxy;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Super basic testing, no shard restarting or anything.
*/
@Slow
public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final AtomicInteger NAME_COUNTER = new AtomicInteger(1);
@BeforeClass
public static void setupCluster() throws Exception {
// use a 5 node cluster so that with a typical 2x2 collection one node isn't involved;
// this helps randomly exercise the edge case of hitting a node that hosts no replicas of the collection
configureCluster(5).configure();
}
@After
public void purgeAllCollections() throws Exception {
cluster.deleteAllCollections();
cluster.getSolrClient().setDefaultCollection(null);
}
/**
* Creates a new 2x2 collection using a unique name, blocking until its state is fully active,
* and sets that collection as the default on the cluster's default CloudSolrClient.
*
* @return the name of the new collection
*/
public static String createAndSetNewDefaultCollection() throws Exception {
final CloudSolrClient cloudClient = cluster.getSolrClient();
final String name = "test_collection_" + NAME_COUNTER.getAndIncrement();
assertEquals(RequestStatusState.COMPLETED,
CollectionAdminRequest.createCollection(name, "_default", 2, 2)
.processAndWait(cloudClient, DEFAULT_TIMEOUT));
cloudClient.waitForState(name, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 2, 2));
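// the predicate above only passes once the collection reports 2 shards with 2 active replicas each,
// so callers can start indexing as soon as this method returns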
cloudClient.setDefaultCollection(name);
return name;
}
@Test
public void testBasicUpdates() throws Exception {
final CloudSolrClient cloudClient = cluster.getSolrClient();
final String collectionName = createAndSetNewDefaultCollection();
// add a doc, update it, and delete it
addUpdateDelete("doc1");
assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
// add 2 docs in a single request
addTwoDocsInOneRequest("doc2", "doc3");
assertEquals(2, cloudClient.query(params("q","*:*")).getResults().getNumFound());
// 2 deletes in a single request...
assertEquals(0, (new UpdateRequest().deleteById("doc2").deleteById("doc3"))
.process(cloudClient).getStatus());
assertEquals(0, cloudClient.commit().getStatus());
assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
// add a doc that we will then delete later after adding two other docs (all before next commit).
assertEquals(0, cloudClient.add(sdoc("id", "doc4", "content_s", "will_delete_later")).getStatus());
assertEquals(0, cloudClient.add(sdocs(sdoc("id", "doc5"),
sdoc("id", "doc6"))).getStatus());
assertEquals(0, cloudClient.deleteById("doc4").getStatus());
assertEquals(0, cloudClient.commit().getStatus());
assertEquals(0, cloudClient.query(params("q", "id:doc4")).getResults().getNumFound());
assertEquals(1, cloudClient.query(params("q", "id:doc5")).getResults().getNumFound());
assertEquals(1, cloudClient.query(params("q", "id:doc6")).getResults().getNumFound());
assertEquals(2, cloudClient.query(params("q","*:*")).getResults().getNumFound());
checkShardConsistency(params("q","*:*", "rows", "9999","_trace","post_doc_5_6"));
// delete everything....
assertEquals(0, cloudClient.deleteByQuery("*:*").getStatus());
assertEquals(0, cloudClient.commit().getStatus());
assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
checkShardConsistency(params("q","*:*", "rows", "9999","_trace","delAll"));
}
public void testDeleteByIdImplicitRouter() throws Exception {
final CloudSolrClient cloudClient = cluster.getSolrClient();
final String name = "implicit_collection_without_routerfield_" + NAME_COUNTER.getAndIncrement();
assertEquals(RequestStatusState.COMPLETED,
CollectionAdminRequest.createCollectionWithImplicitRouter(name, "_default", "shard1,shard2", 2)
.processAndWait(cloudClient, DEFAULT_TIMEOUT));
cloudClient.waitForState(name, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 2, 2));
cloudClient.setDefaultCollection(name);
final DocCollection docCol = cloudClient.getZkStateReader().getClusterState().getCollection(name);
try (SolrClient shard1 = getHttpSolrClient(docCol.getSlice("shard1").getLeader().getCoreUrl());
SolrClient shard2 = getHttpSolrClient(docCol.getSlice("shard2").getLeader().getCoreUrl())) {
// Add three documents to shard1
shard1.add(sdoc("id", "1", "title", "s1 one"));
shard1.add(sdoc("id", "2", "title", "s1 two"));
shard1.add(sdoc("id", "3", "title", "s1 three"));
shard1.commit();
final AtomicInteger docCounts1 = new AtomicInteger(3);
// Add two documents to shard2
shard2.add(sdoc("id", "4", "title", "s2 four"));
shard2.add(sdoc("id", "5", "title", "s2 five"));
shard2.commit();
final AtomicInteger docCounts2 = new AtomicInteger(2);
// A re-usable helper to verify that the expected number of documents can be found on each shard...
Runnable checkShardCounts = () -> {
try {
// including cloudClient helps us test the view from other nodes that aren't the leaders...
for (SolrClient c : Arrays.asList(cloudClient, shard1, shard2)) {
assertEquals(docCounts1.get() + docCounts2.get(), c.query(params("q", "*:*")).getResults().getNumFound());
assertEquals(docCounts1.get(), c.query(params("q", "*:*", "shards", "shard1")).getResults().getNumFound());
assertEquals(docCounts2.get(), c.query(params("q", "*:*", "shards", "shard2")).getResults().getNumFound());
assertEquals(docCounts1.get() + docCounts2.get(), c.query(params("q", "*:*", "shards", "shard2,shard1")).getResults().getNumFound());
}
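// distrib=false queries only the core that receives the request, so these verify each leader's local index directly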
assertEquals(docCounts1.get(), shard1.query(params("q", "*:*", "distrib", "false")).getResults().getNumFound());
assertEquals(docCounts2.get(), shard2.query(params("q", "*:*", "distrib", "false")).getResults().getNumFound());
} catch (Exception sse) {
throw new RuntimeException(sse);
}
};
checkShardCounts.run();
{ // Send a delete request for a doc on shard1 to core hosting shard1 with NO routing info
// Should delete (implicitly) since doc is (implicitly) located on this shard
final UpdateRequest deleteRequest = new UpdateRequest();
deleteRequest.deleteById("1");
shard1.request(deleteRequest);
shard1.commit();
docCounts1.decrementAndGet();
}
checkShardCounts.run();
{ // Send a delete request to core hosting shard1 with a route param for a document that is actually in shard2
// Should delete.
final UpdateRequest deleteRequest = new UpdateRequest();
deleteRequest.deleteById("4").withRoute("shard2");
shard1.request(deleteRequest);
shard1.commit();
docCounts2.decrementAndGet();
}
checkShardCounts.run();
{ // Send a delete request to core hosting shard1 with NO route param for a document that is actually in shard2
// Shouldn't delete, since deleteById requests are not broadcast to all shard leaders.
// (This is effectively a request to delete "5" if and only if it is on shard1)
final UpdateRequest deleteRequest = new UpdateRequest();
deleteRequest.deleteById("5");
shard1.request(deleteRequest);
shard1.commit();
}
checkShardCounts.run();
{ // Multiple deleteById commands for different shards in a single request
final UpdateRequest deleteRequest = new UpdateRequest();
deleteRequest.deleteById("2", "shard1");
deleteRequest.deleteById("5", "shard2");
shard1.request(deleteRequest);
shard1.commit();
docCounts1.decrementAndGet();
docCounts2.decrementAndGet();
}
checkShardCounts.run();
}
}
public void testDeleteByIdCompositeRouterWithRouterField() throws Exception {
final CloudSolrClient cloudClient = cluster.getSolrClient();
final String name = "composite_collection_with_routerfield_" + NAME_COUNTER.getAndIncrement();
assertEquals(RequestStatusState.COMPLETED,
CollectionAdminRequest.createCollection(name, "_default", 2, 2)
.setRouterName("compositeId")
.setRouterField("routefield_s")
.setShards("shard1,shard2")
.processAndWait(cloudClient, DEFAULT_TIMEOUT));
cloudClient.waitForState(name, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 2, 2));
cloudClient.setDefaultCollection(name);
final DocCollection docCol = cloudClient.getZkStateReader().getClusterState().getCollection(name);
try (SolrClient shard1 = getHttpSolrClient(docCol.getSlice("shard1").getLeader().getCoreUrl());
SolrClient shard2 = getHttpSolrClient(docCol.getSlice("shard2").getLeader().getCoreUrl())) {
// Add three documents w/diff routes (all sent to shard1 leader's core)
shard1.add(sdoc("id", "1", "routefield_s", "europe"));
shard1.add(sdoc("id", "3", "routefield_s", "europe"));
shard1.add(sdoc("id", "5", "routefield_s", "africa"));
shard1.commit();
// Add two documents w/diff routes (all sent to shard2 leader's core)
shard2.add(sdoc("id", "4", "routefield_s", "africa"));
shard2.add(sdoc("id", "2", "routefield_s", "europe"));
shard2.commit();
final AtomicInteger docCountsEurope = new AtomicInteger(3);
final AtomicInteger docCountsAfrica = new AtomicInteger(2);
// A re-usable helper to verify that the expected number of documents can be found based on _route_ key...
Runnable checkShardCounts = () -> {
try {
// including cloudClient helps us test the view from other nodes that aren't the leaders...
for (SolrClient c : Arrays.asList(cloudClient, shard1, shard2)) {
assertEquals(docCountsEurope.get() + docCountsAfrica.get(), c.query(params("q", "*:*")).getResults().getNumFound());
assertEquals(docCountsEurope.get(), c.query(params("q", "*:*", "_route_", "europe")).getResults().getNumFound());
assertEquals(docCountsAfrica.get(), c.query(params("q", "*:*", "_route_", "africa")).getResults().getNumFound());
}
} catch (Exception sse) {
throw new RuntimeException(sse);
}
};
checkShardCounts.run();
{ // Send a delete request to core hosting shard1 with a route param for a document that was originally added via core on shard2
final UpdateRequest deleteRequest = new UpdateRequest();
deleteRequest.deleteById("4", "africa");
shard1.request(deleteRequest);
shard1.commit();
docCountsAfrica.decrementAndGet();
}
checkShardCounts.run();
{ // Multiple deleteById commands with different routes in a single request
final UpdateRequest deleteRequest = new UpdateRequest();
deleteRequest.deleteById("2", "europe");
deleteRequest.deleteById("5", "africa");
shard1.request(deleteRequest);
shard1.commit();
docCountsEurope.decrementAndGet();
docCountsAfrica.decrementAndGet();
}
checkShardCounts.run();
}
}
public void testThatCantForwardToLeaderFails() throws Exception {
final CloudSolrClient cloudClient = cluster.getSolrClient();
final String collectionName = "test_collection_" + NAME_COUNTER.getAndIncrement();
cloudClient.setDefaultCollection(collectionName);
// get a random node for use in our collection before creating the one we'll partition..
final JettySolrRunner otherLeader = cluster.getRandomJetty(random());
// pick a (second) random node (which may be the same) for sending updates to
// (if it's the same, we're testing routing from another shard, if diff we're testing routing
// from a non-collection node)
final String indexingUrl = cluster.getRandomJetty(random()).getProxyBaseUrl() + "/" + collectionName;
// create a new node for the purpose of killing it...
final JettySolrRunner leaderToPartition = cluster.startJettySolrRunner();
try {
cluster.waitForNode(leaderToPartition, DEFAULT_TIMEOUT);
// HACK: we have to stop the node to wire in the proxy, then restart it,
// so that we can later "partition" it by closing the proxy
final SocketProxy proxy = new SocketProxy();
cluster.stopJettySolrRunner(leaderToPartition);
cluster.waitForJettyToStop(leaderToPartition);
leaderToPartition.setProxyPort(proxy.getListenPort());
cluster.startJettySolrRunner(leaderToPartition);
proxy.open(leaderToPartition.getBaseUrl().toURI());
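// from here on, client traffic to leaderToPartition's advertised port flows through the proxy,
// so closing the proxy later simulates a network partition of that node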
try {
log.info("leaderToPartition's Proxy: {}", proxy);
cluster.waitForNode(leaderToPartition, DEFAULT_TIMEOUT);
// create a 2x1 collection using a nodeSet that includes our leaderToPartition...
assertEquals(RequestStatusState.COMPLETED,
CollectionAdminRequest.createCollection(collectionName, 2, 1)
.setCreateNodeSet(leaderToPartition.getNodeName() + "," + otherLeader.getNodeName())
.processAndWait(cloudClient, DEFAULT_TIMEOUT));
cloudClient.waitForState(collectionName, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 2, 1));
{ // HACK: Check the leaderProps for the shard hosted on the node we're going to kill...
final Replica leaderProps = cloudClient.getZkStateReader()
.getClusterState().getCollection(collectionName)
.getLeaderReplicas(leaderToPartition.getNodeName()).get(0);
// No point in this test if these aren't true...
assertNotNull("Sanity check: leaderProps isn't a leader?: " + leaderProps.toString(),
leaderProps.getStr(Slice.LEADER));
assertTrue("Sanity check: leaderProps isn't using the proxy port?: " + leaderProps.toString(),
leaderProps.getCoreUrl().contains(""+proxy.getListenPort()));
}
// create client to send our updates to...
try (HttpSolrClient indexClient = getHttpSolrClient(indexingUrl)) {
// Sanity check: we should be able to send a bunch of updates that work right now...
for (int i = 0; i < 100; i++) {
final UpdateResponse rsp = indexClient.add
(sdoc("id", i, "text_t", TestUtil.randomRealisticUnicodeString(random(), 200)));
assertEquals(0, rsp.getStatus());
}
log.info("Closing leaderToPartition's proxy: {}", proxy);
proxy.close(); // NOTE: can't use halfClose, it doesn't guarantee a failure
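// with the leader for one shard now unreachable, any update routed to that shard can no longer be
// forwarded by the node we're sending to, and should eventually surface as a failure to the client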
final SolrException e = expectThrows(SolrException.class, () -> {
// start at 50 so that we have some "updates" to previous docs and some "adds"...
for (int i = 50; i < 250; i++) {
// Pure random odds of all of these docs belonging to the live shard are 1 in 2**200...
// Except we know the hashing algorithm isn't purely random,
// So the actual odds are "0" unless the hashing algorithm is changed to suck badly...
final UpdateResponse rsp = indexClient.add
(sdoc("id", i, "text_t", TestUtil.randomRealisticUnicodeString(random(), 200)));
// if the update didn't throw an exception, it better be a success..
assertEquals(0, rsp.getStatus());
}
});
assertEquals(500, e.code());
}
} finally {
proxy.close(); // don't leak this port
}
} finally {
cluster.stopJettySolrRunner(leaderToPartition); // don't let this jetty bleed into other tests
cluster.waitForJettyToStop(leaderToPartition);
}
}
/** NOTE: uses the cluster's CloudSolrClient and assumes the default collection has been set */
private void addTwoDocsInOneRequest(String docIdA, String docIdB) throws Exception {
final CloudSolrClient cloudClient = cluster.getSolrClient();
assertEquals(0, cloudClient.add(sdocs(sdoc("id", docIdA),
sdoc("id", docIdB))).getStatus());
assertEquals(0, cloudClient.commit().getStatus());
assertEquals(2, cloudClient.query(params("q","id:(" + docIdA + " OR " + docIdB + ")")
).getResults().getNumFound());
checkShardConsistency(params("q","*:*", "rows", "99","_trace","two_docs"));
}
/** NOTE: uses the cluster's CloudSolrClient and assumes the default collection has been set */
private void addUpdateDelete(String docId) throws Exception {
final CloudSolrClient cloudClient = cluster.getSolrClient();
// add the doc, confirm we can query it...
assertEquals(0, cloudClient.add(sdoc("id", docId, "content_t", "originalcontent")).getStatus());
assertEquals(0, cloudClient.commit().getStatus());
assertEquals(1, cloudClient.query(params("q", "id:" + docId)).getResults().getNumFound());
assertEquals(1, cloudClient.query(params("q", "content_t:originalcontent")).getResults().getNumFound());
assertEquals(1,
cloudClient.query(params("q", "content_t:originalcontent AND id:" + docId))
.getResults().getNumFound());
checkShardConsistency(params("q","id:" + docId, "rows", "99","_trace","original_doc"));
// update doc
assertEquals(0, cloudClient.add(sdoc("id", docId, "content_t", "updatedcontent")).getStatus());
assertEquals(0, cloudClient.commit().getStatus());
// confirm we can query the doc by updated content and not original...
assertEquals(0, cloudClient.query(params("q", "content_t:originalcontent")).getResults().getNumFound());
assertEquals(1, cloudClient.query(params("q", "content_t:updatedcontent")).getResults().getNumFound());
assertEquals(1,
cloudClient.query(params("q", "content_t:updatedcontent AND id:" + docId))
.getResults().getNumFound());
// delete the doc, confirm it no longer matches in queries...
assertEquals(0, cloudClient.deleteById(docId).getStatus());
assertEquals(0, cloudClient.commit().getStatus());
assertEquals(0, cloudClient.query(params("q", "id:" + docId)).getResults().getNumFound());
assertEquals(0, cloudClient.query(params("q", "content_t:updatedcontent")).getResults().getNumFound());
checkShardConsistency(params("q","id:" + docId, "rows", "99","_trace","del_updated_doc"));
}
public long testIndexQueryDeleteHierarchical() throws Exception {
final CloudSolrClient cloudClient = cluster.getSolrClient();
final String collectionName = createAndSetNewDefaultCollection();
// index
long docId = 42;
int topDocsNum = atLeast(5);
int childsNum = 5+random().nextInt(5);
for (int i = 0; i < topDocsNum; ++i) {
UpdateRequest uReq = new UpdateRequest();
SolrInputDocument topDocument = new SolrInputDocument();
topDocument.addField("id", docId++);
topDocument.addField("type_s", "parent");
topDocument.addField(i + "parent_f1_s", "v1");
topDocument.addField(i + "parent_f2_s", "v2");
for (int index = 0; index < childsNum; ++index) {
docId = addChildren("child", topDocument, index, false, docId);
}
uReq.add(topDocument);
assertEquals(i + "/" + docId,
0, uReq.process(cloudClient).getStatus());
}
assertEquals(0, cloudClient.commit().getStatus());
checkShardConsistency(params("q","*:*", "rows", "9999","_trace","added_all_top_docs_with_kids"));
// query
// parents
assertEquals(topDocsNum,
cloudClient.query(new SolrQuery("type_s:parent")).getResults().getNumFound());
// children
assertEquals(topDocsNum * childsNum,
cloudClient.query(new SolrQuery("type_s:child")).getResults().getNumFound());
// grandchildren
//
// each topDoc has t = childsNum children, and child number i has 2*i grandchildren,
// so each topDoc has x = 0 + 2 + 4 + ... + (t-1)*2 grandchildren
// x = 2 * (1 + 2 + 3 + ... + (t-1))  => arithmetic sum of 1..(t-1)
// x = 2 * ((t-1) * t / 2) = t * (t - 1)
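// e.g. (illustrative values only, the actual counts are randomized above): with childsNum = 5 each
// parent gets 0 + 2 + 4 + 6 + 8 = 20 = 5 * 4 grandchildren, so topDocsNum = 6 parents would yield
// 6 * 20 = 120 docs matching type_s:grand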
assertEquals(topDocsNum * childsNum * (childsNum - 1),
cloudClient.query(new SolrQuery("type_s:grand")).getResults().getNumFound());
//delete
assertEquals(0, cloudClient.deleteByQuery("*:*").getStatus());
assertEquals(0, cloudClient.commit().getStatus());
assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
checkShardConsistency(params("q","*:*", "rows", "9999","_trace","delAll"));
return docId;
}
/**
* Recursive helper function for building out child and grandchild docs
*/
private long addChildren(String prefix, SolrInputDocument topDocument, int childIndex, boolean lastLevel, long docId) {
SolrInputDocument childDocument = new SolrInputDocument();
childDocument.addField("id", docId++);
childDocument.addField("type_s", prefix);
for (int index = 0; index < childIndex; ++index) {
childDocument.addField(childIndex + prefix + index + "_s", childIndex + "value"+ index);
}
if (!lastLevel) {
for (int i = 0; i < childIndex * 2; ++i) {
docId = addChildren("grand", childDocument, i, true, docId);
}
}
topDocument.addChildDocument(childDocument);
return docId;
}
public void testIndexingOneDocPerRequestWithHttpSolrClient() throws Exception {
final CloudSolrClient cloudClient = cluster.getSolrClient();
final String collectionName = createAndSetNewDefaultCollection();
final int numDocs = atLeast(50);
for (int i = 0; i < numDocs; i++) {
assertEquals(0, cloudClient.add
(sdoc("id", i, "text_t", TestUtil.randomRealisticUnicodeString(random(), 200))).getStatus());
}
assertEquals(0, cloudClient.commit().getStatus());
assertEquals(numDocs, cloudClient.query(params("q","*:*")).getResults().getNumFound());
checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
}
public void testIndexingBatchPerRequestWithHttpSolrClient() throws Exception {
final CloudSolrClient cloudClient = cluster.getSolrClient();
final String collectionName = createAndSetNewDefaultCollection();
final int numDocsPerBatch = atLeast(5);
final int numBatchesPerThread = atLeast(5);
final CountDownLatch abort = new CountDownLatch(1);
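// the latch doubles as a shared "abort" flag: if any indexer thread fails it counts the latch down
// so the remaining threads stop submitting batches early (see keepGoing() below)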
class BatchIndexer implements Runnable {
private boolean keepGoing() {
return 0 < abort.getCount();
}
final int name;
public BatchIndexer(int name) {
this.name = name;
}
@Override
public void run() {
try {
for (int batchId = 0; batchId < numBatchesPerThread && keepGoing(); batchId++) {
final UpdateRequest req = new UpdateRequest();
for (int docId = 0; docId < numDocsPerBatch && keepGoing(); docId++) {
req.add(sdoc("id", "indexer" + name + "_" + batchId + "_" + docId,
"test_t", TestUtil.randomRealisticUnicodeString(random(), 200)));
}
assertEquals(0, req.process(cloudClient).getStatus());
}
} catch (Throwable e) {
abort.countDown();
throw new RuntimeException(e);
}
}
}
final ExecutorService executor = ExecutorUtil.newMDCAwareCachedThreadPool("batchIndexing");
final int numThreads = random().nextInt(TEST_NIGHTLY ? 4 : 2) + 1;
final List<Future<?>> futures = new ArrayList<>(numThreads);
for (int i = 0; i < numThreads; i++) {
futures.add(executor.submit(new BatchIndexer(i)));
}
final int totalDocsExpected = numThreads * numBatchesPerThread * numDocsPerBatch;
ExecutorUtil.shutdownAndAwaitTermination(executor);
for (@SuppressWarnings({"rawtypes"})Future result : futures) {
assertFalse(result.isCancelled());
assertTrue(result.isDone());
// all we care about is propagating any possible execution exception...
final Object ignored = result.get();
}
cloudClient.commit();
assertEquals(totalDocsExpected, cloudClient.query(params("q","*:*")).getResults().getNumFound());
checkShardConsistency(params("q","*:*", "rows", ""+totalDocsExpected, "_trace","batches_done"));
}
public void testConcurrentIndexing() throws Exception {
final CloudSolrClient cloudClient = cluster.getSolrClient();
final String collectionName = createAndSetNewDefaultCollection();
final int numDocs = atLeast(50);
final JettySolrRunner nodeToUpdate = cluster.getRandomJetty(random());
try (ConcurrentUpdateSolrClient indexClient
= getConcurrentUpdateSolrClient(nodeToUpdate.getProxyBaseUrl() + "/" + collectionName, 10, 2)) {
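// NOTE: the last two args are (presumably) the client's internal queue size and number of runner threads;
// adds are buffered and sent in the background, hence the blockUntilFinished() before the commit below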
for (int i = 0; i < numDocs; i++) {
indexClient.add(sdoc("id", i, "text_t",
TestUtil.randomRealisticUnicodeString(random(), 200)));
}
indexClient.blockUntilFinished();
assertEquals(0, indexClient.commit().getStatus());
assertEquals(numDocs, cloudClient.query(params("q","*:*")).getResults().getNumFound());
checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
}
}
/**
* Inspects the cluster to determine all active shards/replicas for the default collection, then
* executes a <code>distrib=false</code> query using the specified params against each core, and compares
* the resulting {@link SolrDocumentList}s, failing if any replica does not agree with its leader.
*
* @see #cluster
* @see CloudInspectUtil#showDiff
*/
private void checkShardConsistency(final SolrParams params) throws Exception {
// TODO: refactor into static in CloudInspectUtil w/ DocCollection param?
// TODO: refactor to take in a BiFunction<QueryResponse,QueryResponse,Boolean> ?
final SolrParams perReplicaParams = SolrParams.wrapDefaults(params("distrib", "false"),
params);
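// every per-core query below is forced to distrib=false so each leader/replica only reports the
// documents in its own local index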
final DocCollection collection = cluster.getSolrClient().getZkStateReader()
.getClusterState().getCollection(cluster.getSolrClient().getDefaultCollection());
log.info("Checking shard consistency via: {}", perReplicaParams);
for (Map.Entry<String,Slice> entry : collection.getActiveSlicesMap().entrySet()) {
final String shardName = entry.getKey();
final Slice slice = entry.getValue();
log.info("Checking: {} -> {}", shardName, slice);
final Replica leader = entry.getValue().getLeader();
try (HttpSolrClient leaderClient = getHttpSolrClient(leader.getCoreUrl())) {
final SolrDocumentList leaderResults = leaderClient.query(perReplicaParams).getResults();
log.debug("Shard {}: Leader results: {}", shardName, leaderResults);
for (Replica replica : slice) {
try (HttpSolrClient replicaClient = getHttpSolrClient(replica.getCoreUrl())) {
final SolrDocumentList replicaResults = replicaClient.query(perReplicaParams).getResults();
if (log.isDebugEnabled()) {
log.debug("Shard {}: Replica ({}) results: {}", shardName, replica.getCoreName(), replicaResults);
}
assertEquals("inconsistency w/leader: shard=" + shardName + "core=" + replica.getCoreName(),
Collections.emptySet(),
CloudInspectUtil.showDiff(leaderResults, replicaResults,
shardName + " leader: " + leader.getCoreUrl(),
shardName + ": " + replica.getCoreUrl()));
}
}
}
}
}
}