/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.api.collections;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.BasicDistributedZkTest;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cloud.StoppableIndexingThread;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.HashBasedRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.update.SolrIndexSplitter;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
@Slow
@LogLevel("org.apache.solr.common.cloud.PerReplicaStates=DEBUG;org.apache.solr.common.cloud=DEBUG;org.apache.solr.cloud.Overseer=DEBUG;org.apache.solr.cloud.overseer=DEBUG;org.apache.solr.cloud.api.collections=DEBUG;org.apache.solr.cloud.OverseerTaskProcessor=DEBUG;org.apache.solr.util.TestInjection=DEBUG")
public class ShardSplitTest extends BasicDistributedZkTest {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String SHARD1_0 = SHARD1 + "_0";
private static final String SHARD1_1 = SHARD1 + "_1";
public ShardSplitTest() {
schemaString = "schema15.xml"; // we need a string id
}
@Override
public void distribSetUp() throws Exception {
super.distribSetUp();
useFactory(null);
}
@Test
@Nightly
public void test() throws Exception {
waitForThingsToLevelOut(15);
if (usually()) {
log.info("Using legacyCloud=false for cluster");
CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false")
.process(cloudClient);
}
incompleteOrOverlappingCustomRangeTest();
splitByUniqueKeyTest();
splitByRouteFieldTest();
splitByRouteKeyTest();
// todo can't call waitForThingsToLevelOut because it looks for jettys of all shards
// and the new sub-shards don't have any.
waitForRecoveriesToFinish(true);
//waitForThingsToLevelOut(15);
}
/**
 * Creates a collection with replicationFactor=1 and splits a shard. Restarts the sub-shard
 * leader node, adds a replica, and ensures the doc count matches between leader and replica.
 */
@Test
public void testSplitStaticIndexReplication() throws Exception {
doSplitStaticIndexReplication(SolrIndexSplitter.SplitMethod.REWRITE);
}
@Test
public void testSplitStaticIndexReplicationLink() throws Exception {
doSplitStaticIndexReplication(SolrIndexSplitter.SplitMethod.LINK);
}
private void doSplitStaticIndexReplication(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
waitForThingsToLevelOut(15);
DocCollection defCol = cloudClient.getZkStateReader().getClusterState().getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
Replica replica = defCol.getReplicas().get(0);
String nodeName = replica.getNodeName();
String collectionName = "testSplitStaticIndexReplication_" + splitMethod.toLower();
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 1);
create.setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE);
create.setMaxShardsPerNode(5); // some high number so we can create replicas without hindrance
create.setCreateNodeSet(nodeName); // we want to create the leader on a fixed node so that we know which one to restart later
create.process(cloudClient);
cloudClient.waitForState(collectionName, 30, TimeUnit.SECONDS, SolrCloudTestCase.activeClusterShape(1, 1));
try (CloudSolrClient client = getCloudSolrClient(zkServer.getZkAddress(), true, cloudClient.getLbClient().getHttpClient())) {
client.setDefaultCollection(collectionName);
StoppableIndexingThread thread = new StoppableIndexingThread(controlClient, client, "i1", true);
try {
thread.start();
Thread.sleep(1000); // give the indexer some time to do its work
thread.safeStop();
thread.join();
client.commit();
controlClient.commit();
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName);
splitShard.setShardName(SHARD1);
splitShard.setSplitMethod(splitMethod.toLower());
String asyncId = splitShard.processAsync(client);
RequestStatusState state = CollectionAdminRequest.requestStatus(asyncId).waitFor(client, 120);
if (state == RequestStatusState.COMPLETED) {
waitForRecoveriesToFinish(collectionName, true);
// let's wait for the parent shard to become inactive
CountDownLatch latch = new CountDownLatch(1);
client.getZkStateReader().registerCollectionStateWatcher(collectionName, (liveNodes, collectionState) -> {
Slice parent = collectionState.getSlice(SHARD1);
Slice slice10 = collectionState.getSlice(SHARD1_0);
Slice slice11 = collectionState.getSlice(SHARD1_1);
if (slice10 != null && slice11 != null &&
parent.getState() == Slice.State.INACTIVE &&
slice10.getState() == Slice.State.ACTIVE &&
slice11.getState() == Slice.State.ACTIVE) {
latch.countDown();
return true; // removes the watch
}
return false;
});
latch.await(1, TimeUnit.MINUTES);
if (latch.getCount() != 0) {
// sanity check
fail("Sub-shards did not become active even after waiting for 1 minute");
}
int liveNodeCount = client.getZkStateReader().getClusterState().getLiveNodes().size();
// restart the sub-shard leader node
String stoppedNodeName = null;
boolean restarted = false;
for (JettySolrRunner jetty : jettys) {
int port = jetty.getBaseUrl().getPort();
if (replica.getBaseUrl().contains(":" + port)) {
stoppedNodeName = jetty.getNodeName();
jetty.stop();
jetty.start();
restarted = true;
break;
}
}
if (!restarted) {
// sanity check
fail("We could not find a jetty to kill for replica: " + replica.getCoreUrl());
}
cloudClient.getZkStateReader().waitForLiveNodes(30, TimeUnit.SECONDS, SolrCloudTestCase.containsLiveNode(stoppedNodeName));
// add a new replica for the sub-shard
CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(collectionName, SHARD1_0);
// use the control client because it is less likely to be on the node being restarted,
// which avoids test flakiness caused by NoHttpResponseExceptions
String control_collection = client.getZkStateReader().getClusterState().getCollection("control_collection").getReplicas().get(0).getBaseUrl();
try (HttpSolrClient control = new HttpSolrClient.Builder(control_collection).withHttpClient(client.getLbClient().getHttpClient()).build()) {
state = addReplica.processAndWait(control, 30);
}
cloudClient.waitForState(collectionName, 30, TimeUnit.SECONDS, SolrCloudTestCase.activeClusterShape(2, 4));
if (state == RequestStatusState.COMPLETED) {
CountDownLatch newReplicaLatch = new CountDownLatch(1);
client.getZkStateReader().registerCollectionStateWatcher(collectionName, (liveNodes, collectionState) -> {
if (liveNodes.size() != liveNodeCount) {
return false;
}
Slice slice = collectionState.getSlice(SHARD1_0);
if (slice.getReplicas().size() == 2) {
if (slice.getReplicas().stream().noneMatch(r -> r.getState() == Replica.State.RECOVERING)) {
// we see replicas and none of them are recovering
newReplicaLatch.countDown();
return true;
}
}
return false;
});
newReplicaLatch.await(30, TimeUnit.SECONDS);
// check consistency of the sub-shard replicas explicitly because the checkShardConsistency
// method doesn't handle new shards/replicas well
ClusterState clusterState = client.getZkStateReader().getClusterState();
DocCollection collection = clusterState.getCollection(collectionName);
int numReplicasChecked = assertConsistentReplicas(collection.getSlice(SHARD1_0));
assertEquals("We should have checked consistency for exactly 2 replicas of shard1_0", 2, numReplicasChecked);
} else {
fail("Adding a replica to sub-shard did not complete even after waiting for 30 seconds!. Saw state = " + state.getKey());
}
} else {
fail("We expected shard split to succeed on a static index but it didn't. Found state = " + state.getKey());
}
} finally {
thread.safeStop();
thread.join();
}
}
}
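/**
 * Queries each replica of the given shard directly (distrib=false) and asserts that they all
 * report the same number of documents. Returns the number of replicas checked.
 */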
private int assertConsistentReplicas(Slice shard) throws SolrServerException, IOException {
long numFound = Long.MIN_VALUE;
int count = 0;
for (Replica replica : shard.getReplicas()) {
// use try-with-resources so each per-replica client is closed instead of leaking
try (HttpSolrClient client = new HttpSolrClient.Builder(replica.getCoreUrl())
.withHttpClient(cloudClient.getLbClient().getHttpClient()).build()) {
QueryResponse response = client.query(new SolrQuery("q", "*:*", "distrib", "false"));
if (log.isInfoEnabled()) {
log.info("Found numFound={} on replica: {}", response.getResults().getNumFound(), replica.getCoreUrl());
}
if (numFound == Long.MIN_VALUE) {
numFound = response.getResults().getNumFound();
} else {
assertEquals("Shard " + shard.getName() + " replicas do not have same number of documents", numFound, response.getResults().getNumFound());
}
count++;
}
}
return count;
}
/**
* Used to test that we can split a shard when a previous split event
* left sub-shards in construction or recovery state.
*
* See SOLR-9439
*/
@Test
//05-Jul-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
// commented out on: 24-Dec-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 15-Sep-2018
public void testSplitAfterFailedSplit() throws Exception {
waitForThingsToLevelOut(15);
TestInjection.splitFailureBeforeReplicaCreation = "true:100"; // we definitely want split to fail
try {
splitAfterFailedSplit();
} finally {
TestInjection.reset();
}
}
private void splitAfterFailedSplit() throws KeeperException, InterruptedException {
try {
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
splitShard.setShardName(SHARD1);
splitShard.process(cloudClient);
fail("Shard split was not supposed to succeed after failure injection!");
} catch (Exception e) {
// expected
}
// assert that the failed split left no sub-shards behind, i.e. they were cleaned up
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
zkStateReader.forceUpdateCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
ClusterState state = zkStateReader.getClusterState();
DocCollection collection = state.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
// should be cleaned up
Slice shard10 = collection.getSlice(SHARD1_0);
assertNull(shard10);
Slice shard11 = collection.getSlice(SHARD1_1);
assertNull(shard11);
// let's retry the split
TestInjection.reset(); // let the split succeed
try {
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
splitShard.setShardName(SHARD1);
splitShard.process(cloudClient);
// Yay!
} catch (Exception e) {
log.error("Shard split failed", e);
fail("Shard split did not succeed after a previous failed split attempt left sub-shards in construction state");
}
}
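/**
 * Same as {@link #testSplitAfterFailedSplit()} except that the injected failure happens after
 * the sub-shard replicas have been created.
 */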
@Test
@Nightly
public void testSplitAfterFailedSplit2() throws Exception {
waitForThingsToLevelOut(15);
TestInjection.splitFailureAfterReplicaCreation = "true:100"; // we definitely want split to fail
try {
splitAfterFailedSplit();
} finally {
TestInjection.reset();
}
}
@Test
// commented out on: 17-Feb-2019 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 15-Sep-2018
public void testSplitMixedReplicaTypes() throws Exception {
doSplitMixedReplicaTypes(SolrIndexSplitter.SplitMethod.REWRITE);
}
@Test
// commented out on: 17-Feb-2019 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 14-Oct-2018
public void testSplitMixedReplicaTypesLink() throws Exception {
doSplitMixedReplicaTypes(SolrIndexSplitter.SplitMethod.LINK);
}
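/**
 * Creates a collection with 2 NRT and 2 PULL replicas, splits shard1, and verifies that each
 * sub-shard ends up with the same mix of replica types as the parent.
 */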
private void doSplitMixedReplicaTypes(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
waitForThingsToLevelOut(15);
String collectionName = "testSplitMixedReplicaTypes_" + splitMethod.toLower();
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 2, 0, 2); // TODO tlog replicas disabled right now.
create.setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE);
create.setMaxShardsPerNode(5); // some high number so we can create replicas without hindrance
create.process(cloudClient);
cloudClient.waitForState(collectionName, 30, TimeUnit.SECONDS, SolrCloudTestCase.activeClusterShape(1, 4));
waitForRecoveriesToFinish(collectionName, false);
for (int i = 0; i < 100; i++) {
cloudClient.add(collectionName, getDoc("id", "id-" + i, "foo_s", "bar " + i));
}
cloudClient.commit(collectionName);
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName);
splitShard.setShardName(SHARD1);
splitShard.setSplitMethod(splitMethod.toLower());
CollectionAdminResponse rsp = splitShard.process(cloudClient);
waitForThingsToLevelOut(30);
cloudClient.waitForState(collectionName, 30, TimeUnit.SECONDS, SolrCloudTestCase.activeClusterShape(2, 12));
cloudClient.getZkStateReader().forceUpdateCollection(collectionName);
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
DocCollection coll = clusterState.getCollection(collectionName);
log.info("coll: {}", coll);
// verify the original shard
verifyShard(coll, SHARD1, Slice.State.INACTIVE, 2, 0, 2);
// verify new sub-shards
verifyShard(coll, SHARD1_0, Slice.State.ACTIVE, 2, 0, 2);
verifyShard(coll, SHARD1_1, Slice.State.ACTIVE, 2, 0, 2);
}
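/**
 * Asserts that the given shard is in the expected state and has the expected number of
 * NRT, TLOG, and PULL replicas.
 */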
private void verifyShard(DocCollection coll, String shard, Slice.State expectedState, int numNrt, int numTlog, int numPull) throws Exception {
Slice s = coll.getSlice(shard);
assertEquals("unexpected shard state", expectedState, s.getState());
AtomicInteger actualNrt = new AtomicInteger();
AtomicInteger actualTlog = new AtomicInteger();
AtomicInteger actualPull = new AtomicInteger();
s.getReplicas().forEach(r -> {
switch (r.getType()) {
case NRT: actualNrt.incrementAndGet(); break;
case TLOG: actualTlog.incrementAndGet(); break;
case PULL: actualPull.incrementAndGet(); break;
}
});
assertEquals("actual NRT", numNrt, actualNrt.get());
assertEquals("actual TLOG", numTlog, actualTlog.get());
assertEquals("actual PULL", numPull, actualPull.get());
}
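/**
 * Splits a shard while a "monkey" thread stops the shard1 leader jetty as soon as the first
 * sub-shard replica shows up. Regardless of whether the split succeeds, asserts that no
 * indexed docs were lost; if the split did succeed, also asserts that the sub-shard replicas
 * are consistent with each other.
 */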
@Test
@Nightly
public void testSplitWithChaosMonkey() throws Exception {
waitForThingsToLevelOut(15);
log.info("Using legacyCloud=false for cluster");
CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false")
.process(cloudClient);
List<StoppableIndexingThread> indexers = new ArrayList<>();
try {
for (int i = 0; i < 1; i++) {
StoppableIndexingThread thread = new StoppableIndexingThread(controlClient, cloudClient, String.valueOf(i), true);
indexers.add(thread);
thread.start();
}
Thread.sleep(1000); // give the indexers some time to do their work
} catch (Exception e) {
log.error("Error in test", e);
} finally {
for (StoppableIndexingThread indexer : indexers) {
indexer.safeStop();
indexer.join();
}
}
cloudClient.commit();
controlClient.commit();
AtomicBoolean stop = new AtomicBoolean();
AtomicBoolean killed = new AtomicBoolean(false);
Runnable monkey = () -> {
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
zkStateReader.registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, (liveNodes, collectionState) -> {
if (stop.get()) {
return true; // abort and remove the watch
}
Slice slice = collectionState.getSlice(SHARD1_0);
if (slice != null && slice.getReplicas().size() > 1) {
// ensure that only one watcher invocation thread can kill!
if (killed.compareAndSet(false, true)) {
log.info("Monkey thread found 2 replicas for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
CloudJettyRunner cjetty = shardToLeaderJetty.get(SHARD1);
try {
Thread.sleep(1000 + random().nextInt(500));
cjetty.jetty.stop();
stop.set(true);
return true;
} catch (Exception e) {
log.error("Monkey unable to kill jetty at port {}", cjetty.jetty.getLocalPort(), e);
}
}
}
log.info("Monkey thread found only one replica for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
return false;
});
};
Thread monkeyThread = new Thread(monkey);
monkeyThread.start();
try {
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
splitShard.setShardName(SHARD1);
String asyncId = splitShard.processAsync(cloudClient);
RequestStatusState splitStatus = null;
try {
splitStatus = CollectionAdminRequest.requestStatus(asyncId).waitFor(cloudClient, 120);
} catch (Exception e) {
log.warn("Failed to get request status, maybe because the overseer node was shutdown by monkey", e);
}
// We don't care whether the split failed: we are injecting faults, so failure is likely.
// In any case we want to assert that all docs that got indexed are available in SolrCloud,
// and if the split succeeded then all replicas of each sub-shard must be consistent
// (i.e. have the same numDocs).
if (log.isInfoEnabled()) {
log.info("Shard split request state is {}", splitStatus == null ? "unknown" : splitStatus.getKey());
}
stop.set(true);
monkeyThread.join();
Set<String> addFails = new HashSet<>();
Set<String> deleteFails = new HashSet<>();
for (StoppableIndexingThread indexer : indexers) {
addFails.addAll(indexer.getAddFails());
deleteFails.addAll(indexer.getDeleteFails());
}
CloudJettyRunner cjetty = shardToLeaderJetty.get(SHARD1);
if (log.isInfoEnabled()) {
log.info("Starting shard1 leader jetty at port {}", cjetty.jetty.getLocalPort());
}
cjetty.jetty.start();
cloudClient.getZkStateReader().forceUpdateCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
if (log.isInfoEnabled()) {
log.info("Current collection state: {}", printClusterStateInfo(AbstractDistribZkTestBase.DEFAULT_COLLECTION));
}
// true if sub-shard states switch to 'active' eventually
AtomicBoolean areSubShardsActive = new AtomicBoolean(false);
if (splitStatus == RequestStatusState.COMPLETED) {
// all sub-shard replicas were created successfully so all cores must recover eventually
waitForRecoveriesToFinish(AbstractDistribZkTestBase.DEFAULT_COLLECTION, true);
// let's wait for the overseer to switch shard states
CountDownLatch latch = new CountDownLatch(1);
cloudClient.getZkStateReader().registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, (liveNodes, collectionState) -> {
Slice parent = collectionState.getSlice(SHARD1);
Slice slice10 = collectionState.getSlice(SHARD1_0);
Slice slice11 = collectionState.getSlice(SHARD1_1);
if (slice10 != null && slice11 != null &&
parent.getState() == Slice.State.INACTIVE &&
slice10.getState() == Slice.State.ACTIVE &&
slice11.getState() == Slice.State.ACTIVE) {
areSubShardsActive.set(true);
latch.countDown();
return true; // removes the watch
} else if (slice10 != null && slice11 != null &&
parent.getState() == Slice.State.ACTIVE &&
slice10.getState() == Slice.State.RECOVERY_FAILED &&
slice11.getState() == Slice.State.RECOVERY_FAILED) {
areSubShardsActive.set(false);
latch.countDown();
return true;
}
return false;
});
latch.await(2, TimeUnit.MINUTES);
if (latch.getCount() != 0) {
// sanity check
fail("We think that split was successful but sub-shard states were not updated even after 2 minutes.");
}
}
cloudClient.commit(); // for visibility of results on sub-shards
checkShardConsistency(true, true, addFails, deleteFails);
long ctrlDocs = controlClient.query(new SolrQuery("*:*")).getResults().getNumFound();
// ensure we have added more than 0 docs
long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
assertTrue("Found " + ctrlDocs + " control docs", cloudClientDocs > 0);
assertEquals("Found " + ctrlDocs + " control docs and " + cloudClientDocs + " cloud docs", ctrlDocs, cloudClientDocs);
// check consistency of the sub-shard replicas explicitly because the checkShardConsistency
// method doesn't handle new shards/replicas well
if (areSubShardsActive.get()) {
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
DocCollection collection = clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
int numReplicasChecked = assertConsistentReplicas(collection.getSlice(SHARD1_0));
assertEquals("We should have checked consistency for exactly 2 replicas of shard1_0", 2, numReplicasChecked);
numReplicasChecked = assertConsistentReplicas(collection.getSlice(SHARD1_1));
assertEquals("We should have checked consistency for exactly 2 replicas of shard1_1", 2, numReplicasChecked);
}
} finally {
stop.set(true);
monkeyThread.join();
}
}
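/**
 * Verifies split locking: holds a split mid-flight via {@link TestInjection#splitLatch} and
 * asserts that the shard's "-splitting" lock znode blocks a concurrent split of the same
 * shard until the first split is allowed to proceed.
 */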
@Test
public void testSplitLocking() throws Exception {
waitForThingsToLevelOut(15);
String collectionName = "testSplitLocking";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 2);
create.setMaxShardsPerNode(5); // some high number so we can create replicas without hindrance
create.process(cloudClient);
cloudClient.waitForState(collectionName, 30, TimeUnit.SECONDS, SolrCloudTestCase.activeClusterShape(1, 2));
waitForRecoveriesToFinish(collectionName, false);
TestInjection.splitLatch = new CountDownLatch(1); // simulate a long split operation
String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName + "/" + SHARD1 + "-splitting";
final AtomicReference<Exception> exc = new AtomicReference<>();
try {
Runnable r = () -> {
try {
trySplit(collectionName, null, SHARD1, 1);
} catch (Exception e) {
exc.set(e);
}
};
Thread t = new Thread(r);
t.start();
// wait for the split to start executing
TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeOut.hasTimedOut()) {
timeOut.sleep(500);
if (cloudClient.getZkStateReader().getZkClient().exists(path, true)) {
log.info("=== found lock node");
break;
}
}
assertFalse("timed out waiting for the lock znode to appear", timeOut.hasTimedOut());
assertNull("unexpected exception: " + exc.get(), exc.get());
log.info("=== trying second split");
try {
trySplit(collectionName, null, SHARD1, 1);
fail("expected to fail due to locking but succeeded");
} catch (Exception e) {
log.info("Expected failure: {}", e);
}
// make sure the lock still exists
assertTrue("lock znode expected but missing", cloudClient.getZkStateReader().getZkClient().exists(path, true));
// let the first split proceed
TestInjection.splitLatch.countDown();
timeOut = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeOut.hasTimedOut()) {
timeOut.sleep(500);
if (!cloudClient.getZkStateReader().getZkClient().exists(path, true)) {
break;
}
}
assertFalse("timed out waiting for the lock znode to disappear", timeOut.hasTimedOut());
} finally {
TestInjection.reset();
}
}
@Test
public void testSplitShardWithRule() throws Exception {
doSplitShardWithRule(SolrIndexSplitter.SplitMethod.REWRITE);
}
@Test
public void testSplitShardWithRuleLink() throws Exception {
doSplitShardWithRule(SolrIndexSplitter.SplitMethod.LINK);
}
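/**
 * Creates a collection with a replica placement rule, then splits a shard and expects the
 * split to succeed.
 */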
private void doSplitShardWithRule(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
waitForThingsToLevelOut(15);
if (usually()) {
log.info("Using legacyCloud=false for cluster");
CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false")
.process(cloudClient);
}
log.info("Starting testSplitShardWithRule");
String collectionName = "shardSplitWithRule_" + splitMethod.toLower();
CollectionAdminRequest.Create createRequest = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 2)
.setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.setRule("shard:*,replica:<2,node:*");
CollectionAdminResponse response = createRequest.process(cloudClient);
assertEquals(0, response.getStatus());
try {
cloudClient.waitForState(collectionName, 30, TimeUnit.SECONDS, SolrCloudTestCase.activeClusterShape(1, 2));
} catch (TimeoutException e) {
throw new RuntimeException("Timeout waiting for 1 shard and 2 replicas.", e);
}
CollectionAdminRequest.SplitShard splitShardRequest = CollectionAdminRequest.splitShard(collectionName)
.setShardName("shard1").setSplitMethod(splitMethod.toLower());
response = splitShardRequest.process(cloudClient);
assertEquals(String.valueOf(response.getErrorMessages()), 0, response.getStatus());
}
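/**
 * Verifies that SPLITSHARD rejects custom hash ranges that are incomplete, leave a hole
 * between ranges, or overlap.
 */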
private void incompleteOrOverlappingCustomRangeTest() throws Exception {
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
final DocRouter router = clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION).getRouter();
Slice shard1 = clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION).getSlice(SHARD1);
DocRouter.Range shard1Range = shard1.getRange() != null ? shard1.getRange() : router.fullRange();
List<DocRouter.Range> subRanges = new ArrayList<>();
List<DocRouter.Range> ranges = router.partitionRange(4, shard1Range);
// test with only one range
subRanges.add(ranges.get(0));
try {
splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null, false);
fail("Shard splitting with just one custom hash range should not succeed");
} catch (HttpSolrClient.RemoteSolrException e) {
log.info("Expected exception:", e);
}
subRanges.clear();
// test with ranges with a hole in between them
subRanges.add(ranges.get(3)); // order shouldn't matter
subRanges.add(ranges.get(0));
try {
splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null, false);
fail("Shard splitting with missing hashes in between given ranges should not succeed");
} catch (HttpSolrClient.RemoteSolrException e) {
log.info("Expected exception:", e);
}
subRanges.clear();
// test with overlapping ranges
subRanges.add(ranges.get(0));
subRanges.add(ranges.get(1));
subRanges.add(ranges.get(2));
subRanges.add(new DocRouter.Range(ranges.get(3).min - 15, ranges.get(3).max));
try {
splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null, false);
fail("Shard splitting with overlapping ranges should not succeed");
} catch (HttpSolrClient.RemoteSolrException e) {
log.info("Expected exception:", e);
}
subRanges.clear();
}
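/**
 * Splits shard1 on the default (hash-based) router while documents are concurrently added and
 * deleted, then verifies sub-shard states and per-range doc counts.
 */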
private void splitByUniqueKeyTest() throws Exception {
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
final DocRouter router = clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION).getRouter();
Slice shard1 = clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION).getSlice(SHARD1);
DocRouter.Range shard1Range = shard1.getRange() != null ? shard1.getRange() : router.fullRange();
List<DocRouter.Range> subRanges = new ArrayList<>();
if (usually()) {
List<DocRouter.Range> ranges = router.partitionRange(4, shard1Range);
// 75% of range goes to shard1_0 and the rest to shard1_1
subRanges.add(new DocRouter.Range(ranges.get(0).min, ranges.get(2).max));
subRanges.add(ranges.get(3));
} else {
subRanges = router.partitionRange(2, shard1Range);
}
final Set<String> documentIds = ConcurrentHashMap.newKeySet(1024);
final List<DocRouter.Range> ranges = subRanges;
final int[] docCounts = new int[ranges.size()];
int numReplicas = shard1.getReplicas().size();
cloudClient.getZkStateReader().forceUpdateCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
clusterState = cloudClient.getZkStateReader().getClusterState();
if (log.isDebugEnabled()) {
log.debug("-- COLLECTION: {}", clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION));
}
del("*:*");
for (int id = 0; id <= 100; id++) {
String shardKey = "" + (char)('a' + (id % 26)); // See comment in ShardRoutingTest for hash distribution
indexAndUpdateCount(router, ranges, docCounts, shardKey + "!" + String.valueOf(id), id, documentIds);
}
commit();
Thread indexThread = new Thread(() -> {
Random random = random();
int max = atLeast(random, 401);
int sleep = atLeast(random, 25);
log.info("SHARDSPLITTEST: Going to add {} number of docs at 1 doc per {} ms", max, sleep);
Set<String> deleted = new HashSet<>();
for (int id = 101; id < max; id++) {
try {
indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id, documentIds);
Thread.sleep(sleep);
if (usually(random)) {
String delId = String.valueOf(random.nextInt(id - 101 + 1) + 101);
if (deleted.contains(delId)) continue;
try {
deleteAndUpdateCount(router, ranges, docCounts, delId);
deleted.add(delId);
documentIds.remove(delId);
} catch (Exception e) {
log.error("Exception while deleting docs", e);
}
}
} catch (Exception e) {
log.error("Exception while adding doc id = {}", id, e);
// do not select this id for deletion ever
deleted.add(String.valueOf(id));
}
}
});
indexThread.start();
try {
for (int i = 0; i < 3; i++) {
try {
splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null, false);
log.info("Layout after split: \n");
printLayout();
break;
} catch (HttpSolrClient.RemoteSolrException e) {
if (e.code() != 500) {
throw e;
}
log.error("SPLITSHARD failed. {}", (i < 2 ? " Retring split" : ""), e);
if (i == 2) {
fail("SPLITSHARD was not successful even after three tries");
}
}
}
} finally {
try {
indexThread.join();
} catch (InterruptedException e) {
log.error("Indexing thread interrupted", e);
}
}
waitForRecoveriesToFinish(true);
checkDocCountsAndShardStates(docCounts, numReplicas, documentIds);
}
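/**
 * Splits a shard of a collection that routes documents on a separate field (router.field)
 * instead of the unique key, and verifies per-range doc counts on the sub-shards.
 */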
public void splitByRouteFieldTest() throws Exception {
log.info("Starting testSplitWithRouteField");
String collectionName = "routeFieldColl";
int numShards = 4;
int replicationFactor = 2;
int maxShardsPerNode = (((numShards * replicationFactor) / getCommonCloudSolrClient()
.getZkStateReader().getClusterState().getLiveNodes().size())) + 1;
HashMap<String, List<Integer>> collectionInfos = new HashMap<>();
String shard_fld = "shard_s";
try (CloudSolrClient client = createCloudClient(null)) {
Map<String, Object> props = Utils.makeMap(
REPLICATION_FACTOR, replicationFactor,
MAX_SHARDS_PER_NODE, maxShardsPerNode,
OverseerCollectionMessageHandler.NUM_SLICES, numShards,
"router.field", shard_fld);
if (SolrCloudTestCase.USE_PER_REPLICA_STATE) {
props.put(DocCollection.PER_REPLICA_STATE, Boolean.TRUE);
}
createCollection(collectionInfos, collectionName, props, client);
}
List<Integer> list = collectionInfos.get(collectionName);
checkForCollection(collectionName, list, null);
waitForRecoveriesToFinish(false);
String url = getUrlFromZk(getCommonCloudSolrClient().getZkStateReader().getClusterState(), collectionName);
try (HttpSolrClient collectionClient = getHttpSolrClient(url)) {
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
final DocRouter router = clusterState.getCollection(collectionName).getRouter();
Slice shard1 = clusterState.getCollection(collectionName).getSlice(SHARD1);
DocRouter.Range shard1Range = shard1.getRange() != null ? shard1.getRange() : router.fullRange();
final List<DocRouter.Range> ranges = router.partitionRange(2, shard1Range);
final int[] docCounts = new int[ranges.size()];
for (int i = 100; i <= 200; i++) {
String shardKey = "" + (char) ('a' + (i % 26)); // See comment in ShardRoutingTest for hash distribution
collectionClient.add(getDoc(id, i, "n_ti", i, shard_fld, shardKey));
int idx = getHashRangeIdx(router, ranges, shardKey);
if (idx != -1) {
docCounts[idx]++;
}
}
for (int i = 0; i < docCounts.length; i++) {
int docCount = docCounts[i];
log.info("Shard shard1_{} docCount = {}", i, docCount);
}
collectionClient.commit();
trySplit(collectionName, null, SHARD1, 3);
waitForRecoveriesToFinish(collectionName, false);
assertEquals(docCounts[0], collectionClient.query(new SolrQuery("*:*").setParam("shards", "shard1_0")).getResults().getNumFound());
assertEquals(docCounts[1], collectionClient.query(new SolrQuery("*:*").setParam("shards", "shard1_1")).getResults().getNumFound());
}
}
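/**
 * Splits shard1 by a route key (split.key) so the shard is partitioned into three ranges, and
 * verifies that all documents for the route key land in the middle sub-shard.
 */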
private void splitByRouteKeyTest() throws Exception {
log.info("Starting splitByRouteKeyTest");
String collectionName = "splitByRouteKeyTest";
int numShards = 4;
int replicationFactor = 2;
int maxShardsPerNode = (((numShards * replicationFactor) / getCommonCloudSolrClient()
.getZkStateReader().getClusterState().getLiveNodes().size())) + 1;
HashMap<String, List<Integer>> collectionInfos = new HashMap<>();
try (CloudSolrClient client = createCloudClient(null)) {
Map<String, Object> props = Utils.makeMap(
REPLICATION_FACTOR, replicationFactor,
MAX_SHARDS_PER_NODE, maxShardsPerNode,
OverseerCollectionMessageHandler.NUM_SLICES, numShards);
if (SolrCloudTestCase.USE_PER_REPLICA_STATE) {
props.put(DocCollection.PER_REPLICA_STATE, Boolean.TRUE);
}
createCollection(collectionInfos, collectionName, props, client);
}
List<Integer> list = collectionInfos.get(collectionName);
checkForCollection(collectionName, list, null);
waitForRecoveriesToFinish(false);
String url = getUrlFromZk(getCommonCloudSolrClient().getZkStateReader().getClusterState(), collectionName);
try (HttpSolrClient collectionClient = getHttpSolrClient(url)) {
String splitKey = "b!";
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
final DocRouter router = clusterState.getCollection(collectionName).getRouter();
Slice shard1 = clusterState.getCollection(collectionName).getSlice(SHARD1);
DocRouter.Range shard1Range = shard1.getRange() != null ? shard1.getRange() : router.fullRange();
final List<DocRouter.Range> ranges = ((CompositeIdRouter) router).partitionRangeByKey(splitKey, shard1Range);
final int[] docCounts = new int[ranges.size()];
int uniqIdentifier = (1 << 12);
int splitKeyDocCount = 0;
for (int i = 100; i <= 200; i++) {
String shardKey = "" + (char) ('a' + (i % 26)); // See comment in ShardRoutingTest for hash distribution
String idStr = shardKey + "!" + i;
collectionClient.add(getDoc(id, idStr, "n_ti", (shardKey + "!").equals(splitKey) ? uniqIdentifier : i));
int idx = getHashRangeIdx(router, ranges, idStr);
if (idx != -1) {
docCounts[idx]++;
}
if (splitKey.equals(shardKey + "!"))
splitKeyDocCount++;
}
for (int i = 0; i < docCounts.length; i++) {
int docCount = docCounts[i];
log.info("Shard shard1_{} docCount = {}", i, docCount);
}
log.info("Route key doc count = {}", splitKeyDocCount);
collectionClient.commit();
trySplit(collectionName, splitKey, null, 3);
waitForRecoveriesToFinish(collectionName, false);
SolrQuery solrQuery = new SolrQuery("*:*");
assertEquals("DocCount on shard1_0 does not match", docCounts[0], collectionClient.query(solrQuery.setParam("shards", "shard1_0")).getResults().getNumFound());
assertEquals("DocCount on shard1_1 does not match", docCounts[1], collectionClient.query(solrQuery.setParam("shards", "shard1_1")).getResults().getNumFound());
assertEquals("DocCount on shard1_2 does not match", docCounts[2], collectionClient.query(solrQuery.setParam("shards", "shard1_2")).getResults().getNumFound());
solrQuery = new SolrQuery("n_ti:" + uniqIdentifier);
assertEquals("shard1_0 must have 0 docs for route key: " + splitKey, 0, collectionClient.query(solrQuery.setParam("shards", "shard1_0")).getResults().getNumFound());
assertEquals("Wrong number of docs on shard1_1 for route key: " + splitKey, splitKeyDocCount, collectionClient.query(solrQuery.setParam("shards", "shard1_1")).getResults().getNumFound());
assertEquals("shard1_2 must have 0 docs for route key: " + splitKey, 0, collectionClient.query(solrQuery.setParam("shards", "shard1_2")).getResults().getNumFound());
}
}
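/**
 * Invokes SPLITSHARD on the given collection, retrying up to maxTries times when the request
 * fails with an HTTP 500 response.
 */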
private void trySplit(String collectionName, String splitKey, String shardId, int maxTries) throws SolrServerException, IOException {
for (int i = 0; i < maxTries; i++) {
try {
splitShard(collectionName, shardId, null, splitKey, false);
break;
} catch (HttpSolrClient.RemoteSolrException e) {
if (e.code() != 500) {
throw e;
}
log.error("SPLITSHARD failed. {}", (i < maxTries - 1 ? " Retring split" : ""), e);
if (i == 2) {
fail("SPLITSHARD was not successful even after three tries");
}
}
}
}
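/**
 * Waits for the sub-shards of shard1 to become active, then checks replica counts, sub-shard
 * consistency, and the per-range doc counts on the sub-shard leaders.
 */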
protected void checkDocCountsAndShardStates(int[] docCounts, int numReplicas, Set<String> documentIds) throws Exception {
ClusterState clusterState = null;
Slice slice1_0 = null, slice1_1 = null;
int i = 0;
for (i = 0; i < 10; i++) {
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
clusterState = zkStateReader.getClusterState();
slice1_0 = clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION).getSlice("shard1_0");
slice1_1 = clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION).getSlice("shard1_1");
if (slice1_0.getState() == Slice.State.ACTIVE && slice1_1.getState() == Slice.State.ACTIVE) {
break;
}
Thread.sleep(500);
}
log.info("ShardSplitTest waited for {} ms for shard state to be set to active", i * 500);
assertNotNull("Cluster state does not contain shard1_0", slice1_0);
assertNotNull("Cluster state does not contain shard1_0", slice1_1);
assertSame("shard1_0 is not active", Slice.State.ACTIVE, slice1_0.getState());
assertSame("shard1_1 is not active", Slice.State.ACTIVE, slice1_1.getState());
assertEquals("Wrong number of replicas created for shard1_0", numReplicas, slice1_0.getReplicas().size());
assertEquals("Wrong number of replicas created for shard1_1", numReplicas, slice1_1.getReplicas().size());
commit();
// can't use checkShardConsistency because it insists on jettys and clients for each shard
checkSubShardConsistency(SHARD1_0);
checkSubShardConsistency(SHARD1_1);
SolrQuery query = new SolrQuery("*:*").setRows(1000).setFields("id", "_version_");
query.set("distrib", false);
ZkCoreNodeProps shard1_0 = getLeaderUrlFromZk(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1_0);
QueryResponse response;
try (HttpSolrClient shard1_0Client = getHttpSolrClient(shard1_0.getCoreUrl())) {
response = shard1_0Client.query(query);
}
long shard10Count = response.getResults().getNumFound();
ZkCoreNodeProps shard1_1 = getLeaderUrlFromZk(
AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1_1);
QueryResponse response2;
try (HttpSolrClient shard1_1Client = getHttpSolrClient(shard1_1.getCoreUrl())) {
response2 = shard1_1Client.query(query);
}
long shard11Count = response2.getResults().getNumFound();
logDebugHelp(docCounts, response, shard10Count, response2, shard11Count, documentIds);
assertEquals("Wrong doc count on shard1_0. See SOLR-5309", docCounts[0], shard10Count);
assertEquals("Wrong doc count on shard1_1. See SOLR-5309", docCounts[1], shard11Count);
}
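/**
 * Queries every replica of the given sub-shard directly (distrib=false) and asserts that all
 * replicas agree on the number of documents.
 */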
protected void checkSubShardConsistency(String shard) throws SolrServerException, IOException {
SolrQuery query = new SolrQuery("*:*").setRows(1000).setFields("id", "_version_");
query.set("distrib", false);
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
Slice slice = clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION).getSlice(shard);
long[] numFound = new long[slice.getReplicasMap().size()];
int c = 0;
for (Replica replica : slice.getReplicas()) {
String coreUrl = new ZkCoreNodeProps(replica).getCoreUrl();
QueryResponse response;
try (HttpSolrClient client = getHttpSolrClient(coreUrl)) {
response = client.query(query);
}
numFound[c++] = response.getResults().getNumFound();
if (log.isInfoEnabled()) {
log.info("Shard: {} Replica: {} has {} docs", shard, coreUrl, String.valueOf(response.getResults().getNumFound()));
}
assertTrue("Shard: " + shard + " Replica: " + coreUrl + " has 0 docs", response.getResults().getNumFound() > 0);
}
for (int i = 0; i < slice.getReplicasMap().size(); i++) {
assertEquals(shard + " is not consistent", numFound[0], numFound[i]);
}
}
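/**
 * Sends a SPLITSHARD collection API request over HTTP, optionally with explicit hash ranges
 * or a split.key.
 */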
protected void splitShard(String collection, String shardId, List<DocRouter.Range> subRanges, String splitKey, boolean offline) throws SolrServerException, IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.SPLITSHARD.toString());
params.set("timing", "true");
params.set("offline", String.valueOf(offline));
params.set("collection", collection);
if (shardId != null) {
params.set("shard", shardId);
}
if (subRanges != null) {
StringBuilder ranges = new StringBuilder();
for (int i = 0; i < subRanges.size(); i++) {
DocRouter.Range subRange = subRanges.get(i);
ranges.append(subRange.toString());
if (i < subRanges.size() - 1)
ranges.append(",");
}
params.set("ranges", ranges.toString());
}
if (splitKey != null) {
params.set("split.key", splitKey);
}
@SuppressWarnings({"rawtypes"})
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
String baseUrl = ((HttpSolrClient) shardToJetty.get(SHARD1).get(0).client.getSolrClient()).getBaseURL();
baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
try (HttpSolrClient baseServer = getHttpSolrClient(baseUrl, 30000, 60000 * 5)) {
NamedList<Object> rsp = baseServer.request(request);
if (log.isInfoEnabled()) {
log.info("Shard split response: {}", Utils.toJSONString(rsp));
}
}
}
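/**
 * Indexes a document and, if its hash falls into one of the given ranges, increments the
 * expected doc count for that range and records the document id.
 */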
protected void indexAndUpdateCount(DocRouter router, List<DocRouter.Range> ranges, int[] docCounts, String id, int n, Set<String> documentIds) throws Exception {
index("id", id, "n_ti", n);
int idx = getHashRangeIdx(router, ranges, id);
if (idx != -1) {
docCounts[idx]++;
documentIds.add(String.valueOf(id));
}
}
protected void deleteAndUpdateCount(DocRouter router, List<DocRouter.Range> ranges, int[] docCounts, String id) throws Exception {
controlClient.deleteById(id);
cloudClient.deleteById(id);
int idx = getHashRangeIdx(router, ranges, id);
if (idx != -1) {
docCounts[idx]--;
}
}
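/**
 * Returns the index of the range that includes the hash of the given id, or -1 if no range
 * matches.
 */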
public static int getHashRangeIdx(DocRouter router, List<DocRouter.Range> ranges, String id) {
int hash = 0;
if (router instanceof HashBasedRouter) {
HashBasedRouter hashBasedRouter = (HashBasedRouter) router;
hash = hashBasedRouter.sliceHash(id, null, null, null);
}
for (int i = 0; i < ranges.size(); i++) {
DocRouter.Range range = ranges.get(i);
if (range.includes(hash))
return i;
}
return -1;
}
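/**
 * Logs expected vs. actual doc counts for the sub-shards and reports duplicate, extra, and
 * missing document ids across shard1_0 and shard1_1.
 */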
protected void logDebugHelp(int[] docCounts, QueryResponse response, long shard10Count, QueryResponse response2, long shard11Count, Set<String> documentIds) {
for (int i = 0; i < docCounts.length; i++) {
int docCount = docCounts[i];
log.info("Expected docCount for shard1_{} = {}", i, docCount);
}
Set<String> found = new HashSet<>(1024);
log.info("Actual docCount for shard1_0 = {}", shard10Count);
log.info("Actual docCount for shard1_1 = {}", shard11Count);
Map<String, String> idVsVersion = new HashMap<>();
Map<String, SolrDocument> shard10Docs = new HashMap<>();
Map<String, SolrDocument> shard11Docs = new HashMap<>();
for (int i = 0; i < response.getResults().size(); i++) {
SolrDocument document = response.getResults().get(i);
idVsVersion.put(document.getFieldValue("id").toString(), document.getFieldValue("_version_").toString());
SolrDocument old = shard10Docs.put(document.getFieldValue("id").toString(), document);
if (old != null) {
log.error("EXTRA: ID: {} on shard1_0. Old version: {} new version: {}", document.getFieldValue("id"), old.getFieldValue("_version_"), document.getFieldValue("_version_"));
}
found.add(document.getFieldValue("id").toString());
}
for (int i = 0; i < response2.getResults().size(); i++) {
SolrDocument document = response2.getResults().get(i);
String value = document.getFieldValue("id").toString();
String version = idVsVersion.get(value);
if (version != null) {
log.error("DUPLICATE: ID: {}, shard1_0Version {} shard1_1Version: {}", value, version, document.getFieldValue("_version_"));
}
SolrDocument old = shard11Docs.put(document.getFieldValue("id").toString(), document);
if (old != null) {
log.error("EXTRA: ID: {} on shard1_1. Old version: {} new version: {}"
,document.getFieldValue("id"), old.getFieldValue("_version_"), document.getFieldValue("_version_"));
}
found.add(document.getFieldValue("id").toString());
}
if (found.size() < documentIds.size()) {
documentIds.removeAll(found);
log.error("MISSING: ID: {}", documentIds);
} else if (found.size() > documentIds.size()) {
found.removeAll(documentIds);
log.error("EXTRA: ID: {}", found);
}
}
}