/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
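/**
 * Tests for the SPLITSHARD collection API: sub-shard count limits, mutually
 * exclusive split options, split fuzz, and splitting a shard under live indexing.
 */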
public class SplitShardTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
  private static final String COLLECTION_NAME = "splitshardtest-collection";
@BeforeClass
public static void setupCluster() throws Exception {
System.setProperty("metricsEnabled", "true");
configureCluster(1)
.addConfig("conf", configset("cloud-minimal"))
.configure();
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
}
@After
@Override
public void tearDown() throws Exception {
super.tearDown();
cluster.deleteAllCollections();
}
@Test
public void doTest() throws IOException, SolrServerException {
CollectionAdminRequest
.createCollection(COLLECTION_NAME, "conf", 2, 1)
.setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.setMaxShardsPerNode(100)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(COLLECTION_NAME, 2, 2);
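    // Split shard1 into 5 sub-shards; values outside the allowed 2..8 range are exercised below.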
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(COLLECTION_NAME)
.setNumSubShards(5)
.setShardName("shard1");
splitShard.process(cluster.getSolrClient());
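    // After the split the parent shard1 goes inactive: 6 active slices (shard2 plus the 5
    // sub-shards) and 7 replicas in total, since the inactive parent's replica is still counted
    // (same reasoning as the 3*repFactor comment in doLiveSplitShard below).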
waitForState("Timed out waiting for sub shards to be active. Number of active shards=" +
cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME).getActiveSlices().size(),
COLLECTION_NAME, activeClusterShape(6, 7));
try {
splitShard = CollectionAdminRequest.splitShard(COLLECTION_NAME).setShardName("shard2").setNumSubShards(10);
splitShard.process(cluster.getSolrClient());
fail("SplitShard should throw an exception when numSubShards > 8");
} catch (HttpSolrClient.RemoteSolrException ex) {
assertTrue(ex.getMessage().contains("A shard can only be split into 2 to 8 subshards in one split request."));
}
try {
splitShard = CollectionAdminRequest.splitShard(COLLECTION_NAME).setShardName("shard2").setNumSubShards(1);
splitShard.process(cluster.getSolrClient());
fail("SplitShard should throw an exception when numSubShards < 2");
} catch (HttpSolrClient.RemoteSolrException ex) {
assertTrue(ex.getMessage().contains("A shard can only be split into 2 to 8 subshards in one split request. Provided numSubShards=1"));
}
}
@Test
public void multipleOptionsSplitTest() throws IOException, SolrServerException {
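    // numSubShards cannot be combined with ranges (or split.key). This appears to be
    // enforced by SolrJ when the request parameters are built, which is presumably why
    // the request can fail without the collection ever being created in this test.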
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(COLLECTION_NAME)
.setNumSubShards(5)
.setRanges("0-c,d-7fffffff")
.setShardName("shard1");
boolean expectedException = false;
try {
splitShard.process(cluster.getSolrClient());
fail("An exception should have been thrown");
} catch (SolrException ex) {
expectedException = true;
}
assertTrue("Expected SolrException but it didn't happen", expectedException);
}
@Test
public void testSplitFuzz() throws Exception {
String collectionName = "splitFuzzCollection";
CollectionAdminRequest
.createCollection(collectionName, "conf", 2, 1)
.setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.setMaxShardsPerNode(100)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(collectionName, 2, 2);
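    // splitFuzz=0.5 deliberately skews the split point away from the exact midpoint of shard1's range.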
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName)
.setSplitFuzz(0.5f)
.setShardName("shard1");
splitShard.process(cluster.getSolrClient());
waitForState("Timed out waiting for sub shards to be active. Number of active shards=" +
cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(collectionName).getActiveSlices().size(),
collectionName, activeClusterShape(3, 4));
DocCollection coll = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(collectionName);
Slice s1_0 = coll.getSlice("shard1_0");
Slice s1_1 = coll.getSlice("shard1_1");
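    // shard1 spans half the 32-bit hash ring (width ~Integer.MAX_VALUE). A fuzz of 0.5 should
    // shift the split point by 2^28, so the two sub-ranges differ in width by twice that amount.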
long fuzz = ((long)Integer.MAX_VALUE >> 3) + 1L;
long delta0 = s1_0.getRange().max - s1_0.getRange().min;
long delta1 = s1_1.getRange().max - s1_1.getRange().min;
long expected0 = (Integer.MAX_VALUE >> 1) + fuzz;
long expected1 = (Integer.MAX_VALUE >> 1) - fuzz;
assertEquals("wrong range in s1_0", expected0, delta0);
assertEquals("wrong range in s1_1", expected1, delta1);
}
CloudSolrClient createCollection(String collectionName, int repFactor) throws Exception {
CollectionAdminRequest
.createCollection(collectionName, "conf", 1, repFactor)
.setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.setMaxShardsPerNode(100)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(collectionName, 1, repFactor);
CloudSolrClient client = cluster.getSolrClient();
client.setDefaultCollection(collectionName);
return client;
}
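  /**
   * Counts documents via non-distributed queries against each replica, asserting that all
   * replicas of every active slice agree, then cross-checks the total against a distributed
   * query through the cloud client.
   */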
long getNumDocs(CloudSolrClient client) throws Exception {
String collectionName = client.getDefaultCollection();
DocCollection collection = client.getZkStateReader().getClusterState().getCollection(collectionName);
Collection<Slice> slices = collection.getSlices();
long totCount = 0;
for (Slice slice : slices) {
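      // skip inactive slices, e.g. parent shards left behind by a completed split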
if (!slice.getState().equals(Slice.State.ACTIVE)) continue;
long lastReplicaCount = -1;
for (Replica replica : slice.getReplicas()) {
SolrClient replicaClient = getHttpSolrClient(replica.getBaseUrl() + "/" + replica.getCoreName());
long numFound = 0;
try {
numFound = replicaClient.query(params("q", "*:*", "distrib", "false")).getResults().getNumFound();
log.info("Replica count={} for {}", numFound, replica);
} finally {
replicaClient.close();
}
if (lastReplicaCount >= 0) {
assertEquals("Replica doc count for " + replica, lastReplicaCount, numFound);
}
lastReplicaCount = numFound;
}
totCount += lastReplicaCount;
}
long cloudClientDocs = client.query(new SolrQuery("*:*")).getResults().getNumFound();
assertEquals("Sum of shard count should equal distrib query doc count", totCount, cloudClientDocs);
return totCount;
}
void doLiveSplitShard(String collectionName, int repFactor, int nThreads) throws Exception {
final CloudSolrClient client = createCollection(collectionName, repFactor);
final ConcurrentHashMap<String,Long> model = new ConcurrentHashMap<>(); // what the index should contain
final AtomicBoolean doIndex = new AtomicBoolean(true);
final AtomicInteger docsIndexed = new AtomicInteger();
Thread[] indexThreads = new Thread[nThreads];
try {
for (int i=0; i<nThreads; i++) {
indexThreads[i] = new Thread(() -> {
while (doIndex.get()) {
try {
// Thread.sleep(10); // cap indexing rate at 100 docs per second per thread
int currDoc = docsIndexed.incrementAndGet();
String docId = "doc_" + currDoc;
              // one doc per update request (batching all docs into a single request is a possible variation)
UpdateRequest updateReq = new UpdateRequest();
updateReq.add(sdoc("id", docId));
// UpdateResponse ursp = updateReq.commit(client, collectionName); // uncomment this if you want a commit each time
UpdateResponse ursp = updateReq.process(client, collectionName);
assertEquals(0, ursp.getStatus()); // for now, don't accept any failures
if (ursp.getStatus() == 0) {
model.put(docId, 1L); // in the future, keep track of a version per document and reuse ids to keep index from growing too large
}
} catch (Exception e) {
fail(e.getMessage());
break;
}
}
});
}
for (Thread thread : indexThreads) {
thread.start();
}
Thread.sleep(100); // wait for a few docs to be indexed before invoking split
int docCount = model.size();
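      // Split shard1 while the indexing threads keep writing; the point of the test is that
      // no update is lost while the split is in progress.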
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName)
.setShardName("shard1");
splitShard.process(client);
waitForState("Timed out waiting for sub shards to be active.",
collectionName, activeClusterShape(2, 3*repFactor)); // 2 repFactor for the new split shards, 1 repFactor for old replicas
// make sure that docs were able to be indexed during the split
assertTrue(model.size() > docCount);
Thread.sleep(100); // wait for a few more docs to be indexed after split
} finally {
// shut down the indexers
doIndex.set(false);
for (Thread thread : indexThreads) {
thread.join();
}
}
client.commit(); // final commit is needed for visibility
long numDocs = getNumDocs(client);
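    // On a mismatch, report which docs from the model are missing from the index to aid debugging.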
if (numDocs != model.size()) {
SolrDocumentList results = client.query(new SolrQuery("q","*:*", "fl","id", "rows", Integer.toString(model.size()) )).getResults();
Map<String,Long> leftover = new HashMap<>(model);
for (SolrDocument doc : results) {
String id = (String) doc.get("id");
leftover.remove(id);
}
log.error("MISSING DOCUMENTS: {}", leftover);
}
assertEquals("Documents are missing!", docsIndexed.get(), numDocs);
log.info("Number of documents indexed and queried : {}", numDocs);
}
@Test
public void testLiveSplit() throws Exception {
    // Debugging tips: if this fails, it may be easier to debug by lowering the number of threads to 1 and looping the test
    // until you get another failure.
    // You may need to further instrument things like DistributedZkUpdateProcessor to display the cluster state for the collection, etc.
    // Using more threads increases the chance of hitting a concurrency bug, but too many threads can overwhelm the single-threaded
    // buffered-update replay that follows the low-level index split, leaving sub-shard leaders unable to catch up and
    // become active (a known issue that still needs to be resolved).
doLiveSplitShard("livesplit1", 1, 4);
}
}