blob: 853b3e3d5f513fdf21f85db2ed95632f7efa02d0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.impl;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.client.solrj.response.SolrPingResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.handler.admin.ConfigSetsHandler;
import org.apache.solr.handler.admin.CoreAdminHandler;
import org.apache.solr.util.LogLevel;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
/**
* This test would be faster if we simulated the zk state instead.
*/
@Slow
@LogLevel("org.apache.solr.common.cloud.PerReplicaStatesOps=DEBUG;org.apache.solr.cloud.Overseer=INFO;org.apache.solr.common.cloud=INFO;org.apache.solr.cloud.api.collections=INFO;org.apache.solr.cloud.overseer=INFO")
public class CloudSolrClientTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String COLLECTION = "collection1";
private static final String COLLECTION2 = "2nd_collection";
private static final String id = "id";
private static final int TIMEOUT = 30;
private static final int NODE_COUNT = 3;
private static CloudSolrClient httpBasedCloudSolrClient = null;
@Before
public void setupCluster() throws Exception {
System.setProperty("metricsEnabled", "true");
configureCluster(NODE_COUNT)
.addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
.configure();
final List<String> solrUrls = new ArrayList<>();
solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
httpBasedCloudSolrClient = new CloudSolrClient.Builder(solrUrls).build();
}
@After
public void tearDown() throws Exception {
if (httpBasedCloudSolrClient != null) {
try {
httpBasedCloudSolrClient.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
shutdownCluster();
super.tearDown();
}
@AfterClass
public static void cleanUpAfterClass() throws Exception {
httpBasedCloudSolrClient = null;
}
/**
* Randomly return the cluster's ZK based CSC, or HttpClusterProvider based CSC.
*/
private CloudSolrClient getRandomClient() {
return random().nextBoolean() ? cluster.getSolrClient() : httpBasedCloudSolrClient;
}
@Test
public void testParallelUpdateQTime() throws Exception {
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1)
.setPerReplicaState(USE_PER_REPLICA_STATE)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(COLLECTION, 2, 2);
UpdateRequest req = new UpdateRequest();
for (int i = 0; i < 10; i++) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", String.valueOf(TestUtil.nextInt(random(), 1000, 1100)));
req.add(doc);
}
UpdateResponse response = req.process(getRandomClient(), COLLECTION);
// See SOLR-6547, we just need to ensure that no exception is thrown here
assertTrue(response.getQTime() >= 0);
}
@Test
public void testOverwriteOption() throws Exception {
CollectionAdminRequest.createCollection("overwrite", "conf", 1, 1)
.setPerReplicaState(USE_PER_REPLICA_STATE)
.processAndWait(cluster.getSolrClient(), TIMEOUT);
cluster.waitForActiveCollection("overwrite", 1, 1);
new UpdateRequest()
.add("id", "0", "a_t", "hello1")
.add("id", "0", "a_t", "hello2")
.commit(cluster.getSolrClient(), "overwrite");
QueryResponse resp = cluster.getSolrClient().query("overwrite", new SolrQuery("*:*"));
assertEquals("There should be one document because overwrite=true", 1, resp.getResults().getNumFound());
new UpdateRequest()
.add(new SolrInputDocument(id, "1", "a_t", "hello1"), /* overwrite = */ false)
.add(new SolrInputDocument(id, "1", "a_t", "hello2"), false)
.commit(cluster.getSolrClient(), "overwrite");
resp = getRandomClient().query("overwrite", new SolrQuery("*:*"));
assertEquals("There should be 3 documents because there should be two id=1 docs due to overwrite=false", 3, resp.getResults().getNumFound());
}
@Test
public void testAliasHandling() throws Exception {
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1)
.setPerReplicaState(USE_PER_REPLICA_STATE)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(COLLECTION, 2, 2);
CollectionAdminRequest.createCollection(COLLECTION2, "conf", 2, 1)
.setPerReplicaState(USE_PER_REPLICA_STATE)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(COLLECTION2, 2, 2);
CloudSolrClient client = getRandomClient();
SolrInputDocument doc = new SolrInputDocument("id", "1", "title_s", "my doc");
client.add(COLLECTION, doc);
client.commit(COLLECTION);
CollectionAdminRequest.createAlias("testalias", COLLECTION).process(cluster.getSolrClient());
SolrInputDocument doc2 = new SolrInputDocument("id", "2", "title_s", "my doc too");
client.add(COLLECTION2, doc2);
client.commit(COLLECTION2);
CollectionAdminRequest.createAlias("testalias2", COLLECTION2).process(cluster.getSolrClient());
CollectionAdminRequest.createAlias("testaliascombined", COLLECTION + "," + COLLECTION2).process(cluster.getSolrClient());
// ensure that the aliases have been registered
Map<String, String> aliases = new CollectionAdminRequest.ListAliases().process(cluster.getSolrClient()).getAliases();
assertEquals(COLLECTION, aliases.get("testalias"));
assertEquals(COLLECTION2, aliases.get("testalias2"));
assertEquals(COLLECTION + "," + COLLECTION2, aliases.get("testaliascombined"));
assertEquals(1, client.query(COLLECTION, params("q", "*:*")).getResults().getNumFound());
assertEquals(1, client.query("testalias", params("q", "*:*")).getResults().getNumFound());
assertEquals(1, client.query(COLLECTION2, params("q", "*:*")).getResults().getNumFound());
assertEquals(1, client.query("testalias2", params("q", "*:*")).getResults().getNumFound());
assertEquals(2, client.query("testaliascombined", params("q", "*:*")).getResults().getNumFound());
ModifiableSolrParams paramsWithBothCollections = params("q", "*:*", "collection", COLLECTION + "," + COLLECTION2);
assertEquals(2, client.query(null, paramsWithBothCollections).getResults().getNumFound());
ModifiableSolrParams paramsWithBothAliases = params("q", "*:*", "collection", "testalias,testalias2");
assertEquals(2, client.query(null, paramsWithBothAliases).getResults().getNumFound());
ModifiableSolrParams paramsWithCombinedAlias = params("q", "*:*", "collection", "testaliascombined");
assertEquals(2, client.query(null, paramsWithCombinedAlias).getResults().getNumFound());
ModifiableSolrParams paramsWithMixedCollectionAndAlias = params("q", "*:*", "collection", "testalias," + COLLECTION2);
assertEquals(2, client.query(null, paramsWithMixedCollectionAndAlias).getResults().getNumFound());
}
@Test
public void testRouting() throws Exception {
CollectionAdminRequest.createCollection("routing_collection", "conf", 2, 1)
.setPerReplicaState(USE_PER_REPLICA_STATE)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection("routing_collection", 2, 2);
AbstractUpdateRequest request = new UpdateRequest()
.add(id, "0", "a_t", "hello1")
.add(id, "2", "a_t", "hello2")
.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
// Test single threaded routed updates for UpdateRequest
NamedList<Object> response = getRandomClient().request(request, "routing_collection");
if (getRandomClient().isDirectUpdatesToLeadersOnly()) {
checkSingleServer(response);
}
CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response;
Map<String, LBHttpSolrClient.Req> routes = rr.getRoutes();
Iterator<Map.Entry<String, LBHttpSolrClient.Req>> it = routes.entrySet()
.iterator();
while (it.hasNext()) {
Map.Entry<String, LBHttpSolrClient.Req> entry = it.next();
String url = entry.getKey();
UpdateRequest updateRequest = (UpdateRequest) entry.getValue()
.getRequest();
SolrInputDocument doc = updateRequest.getDocuments().get(0);
String id = doc.getField("id").getValue().toString();
ModifiableSolrParams params = new ModifiableSolrParams();
params.add("q", "id:" + id);
params.add("distrib", "false");
QueryRequest queryRequest = new QueryRequest(params);
try (HttpSolrClient solrClient = getHttpSolrClient(url)) {
QueryResponse queryResponse = queryRequest.process(solrClient);
SolrDocumentList docList = queryResponse.getResults();
assertTrue(docList.getNumFound() == 1);
}
}
// Test the deleteById routing for UpdateRequest
final UpdateResponse uResponse = new UpdateRequest()
.deleteById("0")
.deleteById("2")
.commit(cluster.getSolrClient(), "routing_collection");
if (getRandomClient().isDirectUpdatesToLeadersOnly()) {
checkSingleServer(uResponse.getResponse());
}
QueryResponse qResponse = getRandomClient().query("routing_collection", new SolrQuery("*:*"));
SolrDocumentList docs = qResponse.getResults();
assertEquals(0, docs.getNumFound());
// Test Multi-Threaded routed updates for UpdateRequest
try (CloudSolrClient threadedClient = new CloudSolrClientBuilder
(Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
.withParallelUpdates(true)
.build()) {
threadedClient.setDefaultCollection("routing_collection");
response = threadedClient.request(request);
if (threadedClient.isDirectUpdatesToLeadersOnly()) {
checkSingleServer(response);
}
rr = (CloudSolrClient.RouteResponse) response;
routes = rr.getRoutes();
it = routes.entrySet()
.iterator();
while (it.hasNext()) {
Map.Entry<String, LBHttpSolrClient.Req> entry = it.next();
String url = entry.getKey();
UpdateRequest updateRequest = (UpdateRequest) entry.getValue()
.getRequest();
SolrInputDocument doc = updateRequest.getDocuments().get(0);
String id = doc.getField("id").getValue().toString();
ModifiableSolrParams params = new ModifiableSolrParams();
params.add("q", "id:" + id);
params.add("distrib", "false");
QueryRequest queryRequest = new QueryRequest(params);
try (HttpSolrClient solrClient = getHttpSolrClient(url)) {
QueryResponse queryResponse = queryRequest.process(solrClient);
SolrDocumentList docList = queryResponse.getResults();
assertTrue(docList.getNumFound() == 1);
}
}
}
// Test that queries with _route_ params are routed by the client
// Track request counts on each node before query calls
ClusterState clusterState = cluster.getSolrClient().getZkStateReader().getClusterState();
DocCollection col = clusterState.getCollection("routing_collection");
Map<String, Long> requestCountsMap = Maps.newHashMap();
for (Slice slice : col.getSlices()) {
for (Replica replica : slice.getReplicas()) {
String baseURL = replica.getBaseUrl();
requestCountsMap.put(baseURL, getNumRequests(baseURL, "routing_collection"));
}
}
// Collect the base URLs of the replicas of shard that's expected to be hit
DocRouter router = col.getRouter();
Collection<Slice> expectedSlices = router.getSearchSlicesSingle("0", null, col);
Set<String> expectedBaseURLs = Sets.newHashSet();
for (Slice expectedSlice : expectedSlices) {
for (Replica replica : expectedSlice.getReplicas()) {
expectedBaseURLs.add(replica.getBaseUrl());
}
}
assertTrue("expected urls is not fewer than all urls! expected=" + expectedBaseURLs
+ "; all=" + requestCountsMap.keySet(),
expectedBaseURLs.size() < requestCountsMap.size());
// Calculate a number of shard keys that route to the same shard.
int n;
if (TEST_NIGHTLY) {
n = random().nextInt(999) + 2;
} else {
n = random().nextInt(9) + 2;
}
List<String> sameShardRoutes = Lists.newArrayList();
sameShardRoutes.add("0");
for (int i = 1; i < n; i++) {
String shardKey = Integer.toString(i);
Collection<Slice> slices = router.getSearchSlicesSingle(shardKey, null, col);
log.info("Expected Slices {}", slices);
if (expectedSlices.equals(slices)) {
sameShardRoutes.add(shardKey);
}
}
assertTrue(sameShardRoutes.size() > 1);
// Do N queries with _route_ parameter to the same shard
for (int i = 0; i < n; i++) {
ModifiableSolrParams solrParams = new ModifiableSolrParams();
solrParams.set(CommonParams.Q, "*:*");
solrParams.set(ShardParams._ROUTE_, sameShardRoutes.get(random().nextInt(sameShardRoutes.size())));
if (log.isInfoEnabled()) {
log.info("output: {}", getRandomClient().query("routing_collection", solrParams));
}
}
// Request counts increase from expected nodes should aggregate to 1000, while there should be
// no increase in unexpected nodes.
int increaseFromExpectedUrls = 0;
int increaseFromUnexpectedUrls = 0;
Map<String, Long> numRequestsToUnexpectedUrls = Maps.newHashMap();
for (Slice slice : col.getSlices()) {
for (Replica replica : slice.getReplicas()) {
String baseURL = replica.getBaseUrl();
Long prevNumRequests = requestCountsMap.get(baseURL);
Long curNumRequests = getNumRequests(baseURL, "routing_collection");
long delta = curNumRequests - prevNumRequests;
if (expectedBaseURLs.contains(baseURL)) {
increaseFromExpectedUrls += delta;
} else {
increaseFromUnexpectedUrls += delta;
numRequestsToUnexpectedUrls.put(baseURL, delta);
}
}
}
assertEquals("Unexpected number of requests to expected URLs", n, increaseFromExpectedUrls);
assertEquals("Unexpected number of requests to unexpected URLs: " + numRequestsToUnexpectedUrls,
0, increaseFromUnexpectedUrls);
}
/**
* Tests if the specification of 'preferLocalShards' in the query-params
* limits the distributed query to locally hosted shards only
*/
@Test
// commented 4-Sep-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 2-Aug-2018
public void preferLocalShardsTest() throws Exception {
String collectionName = "localShardsTestColl";
int liveNodes = cluster.getJettySolrRunners().size();
// For preferLocalShards to succeed in a test, every shard should have
// all its cores on the same node.
// Hence the below configuration for our collection
CollectionAdminRequest.createCollection(collectionName, "conf", liveNodes, liveNodes)
.setPerReplicaState(USE_PER_REPLICA_STATE)
.setMaxShardsPerNode(liveNodes * liveNodes)
.processAndWait(cluster.getSolrClient(), TIMEOUT);
cluster.waitForActiveCollection(collectionName, liveNodes, liveNodes * liveNodes);
// Add some new documents
new UpdateRequest()
.add(id, "0", "a_t", "hello1")
.add(id, "2", "a_t", "hello2")
.add(id, "3", "a_t", "hello2")
.commit(getRandomClient(), collectionName);
// Run the actual test for 'preferLocalShards'
queryWithShardsPreferenceRules(getRandomClient(), false, collectionName);
queryWithShardsPreferenceRules(getRandomClient(), true, collectionName);
}
@SuppressWarnings("deprecation")
private void queryWithShardsPreferenceRules(CloudSolrClient cloudClient,
boolean useShardsPreference,
String collectionName)
throws Exception {
SolrQuery qRequest = new SolrQuery("*:*");
ModifiableSolrParams qParams = new ModifiableSolrParams();
if (useShardsPreference) {
qParams.add(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION + ":" + ShardParams.REPLICA_LOCAL);
} else {
qParams.add(CommonParams.PREFER_LOCAL_SHARDS, "true");
}
qParams.add(ShardParams.SHARDS_INFO, "true");
qRequest.add(qParams);
// CloudSolrClient sends the request to some node.
// And since all the nodes are hosting cores from all shards, the
// distributed query formed by this node will select cores from the
// local shards only
QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
assertNotNull("Unable to obtain " + ShardParams.SHARDS_INFO, shardsInfo);
// Iterate over shards-info and check what cores responded
SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>) shardsInfo;
@SuppressWarnings({"unchecked"})
Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
List<String> shardAddresses = new ArrayList<String>();
while (itr.hasNext()) {
Map.Entry<String, ?> e = itr.next();
assertTrue("Did not find map-type value in " + ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
String shardAddress = (String) ((Map) e.getValue()).get("shardAddress");
assertNotNull(ShardParams.SHARDS_INFO + " did not return 'shardAddress' parameter", shardAddress);
shardAddresses.add(shardAddress);
}
if (log.isInfoEnabled()) {
log.info("Shards giving the response: {}", Arrays.toString(shardAddresses.toArray()));
}
// Make sure the distributed queries were directed to a single node only
Set<Integer> ports = new HashSet<Integer>();
for (String shardAddr : shardAddresses) {
URL url = new URL(shardAddr);
ports.add(url.getPort());
}
// This assertion would hold true as long as every shard has a core on each node
assertTrue("Response was not received from shards on a single node",
shardAddresses.size() > 1 && ports.size() == 1);
}
/**
* Tests if the 'shards.preference' parameter works with single-sharded collections.
*/
@Test
public void singleShardedPreferenceRules() throws Exception {
String collectionName = "singleShardPreferenceTestColl";
int liveNodes = cluster.getJettySolrRunners().size();
// For testing replica.type, we want to have all replica types available for the collection
CollectionAdminRequest.createCollection(collectionName, "conf", 1, liveNodes / 3, liveNodes / 3, liveNodes / 3)
.setPerReplicaState(USE_PER_REPLICA_STATE)
.setMaxShardsPerNode(liveNodes)
.processAndWait(cluster.getSolrClient(), TIMEOUT);
cluster.waitForActiveCollection(collectionName, 1, liveNodes);
// Add some new documents
new UpdateRequest()
.add(id, "0", "a_t", "hello1")
.add(id, "2", "a_t", "hello2")
.add(id, "3", "a_t", "hello2")
.commit(getRandomClient(), collectionName);
// Run the actual test for 'queryReplicaType'
queryReplicaType(getRandomClient(), Replica.Type.PULL, collectionName);
queryReplicaType(getRandomClient(), Replica.Type.TLOG, collectionName);
queryReplicaType(getRandomClient(), Replica.Type.NRT, collectionName);
}
private void queryReplicaType(CloudSolrClient cloudClient,
Replica.Type typeToQuery,
String collectionName)
throws Exception {
SolrQuery qRequest = new SolrQuery("*:*");
ModifiableSolrParams qParams = new ModifiableSolrParams();
qParams.add(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":" + typeToQuery.toString());
qParams.add(ShardParams.SHARDS_INFO, "true");
qRequest.add(qParams);
Map<String, String> replicaTypeToReplicas = mapReplicasToReplicaType(getCollectionState(collectionName));
QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
assertNotNull("Unable to obtain " + ShardParams.SHARDS_INFO, shardsInfo);
// Iterate over shards-info and check what cores responded
SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>) shardsInfo;
@SuppressWarnings({"unchecked"})
Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
List<String> shardAddresses = new ArrayList<String>();
while (itr.hasNext()) {
Map.Entry<String, ?> e = itr.next();
assertTrue("Did not find map-type value in " + ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
String shardAddress = (String) ((Map) e.getValue()).get("shardAddress");
if (shardAddress.endsWith("/")) {
shardAddress = shardAddress.substring(0, shardAddress.length() - 1);
}
assertNotNull(ShardParams.SHARDS_INFO + " did not return 'shardAddress' parameter", shardAddress);
shardAddresses.add(shardAddress);
}
assertEquals("Shard addresses must be of size 1, since there is only 1 shard in the collection", 1, shardAddresses.size());
assertEquals("Make sure that the replica queried was the replicaType desired", typeToQuery.toString().toUpperCase(Locale.ROOT), replicaTypeToReplicas.get(shardAddresses.get(0)).toUpperCase(Locale.ROOT));
}
private Long getNumRequests(String baseUrl, String collectionName) throws
SolrServerException, IOException {
return getNumRequests(baseUrl, collectionName, "QUERY", "/select", null, false);
}
private Long getNumRequests(String baseUrl, String collectionName, String category, String key, String scope, boolean returnNumErrors) throws
SolrServerException, IOException {
NamedList<Object> resp;
try (HttpSolrClient client = getHttpSolrClient(baseUrl + "/" + collectionName, 15000, 60000)) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("qt", "/admin/mbeans");
params.set("stats", "true");
params.set("key", key);
params.set("cat", category);
// use generic request to avoid extra processing of queries
QueryRequest req = new QueryRequest(params);
resp = client.request(req);
}
String name;
if (returnNumErrors) {
name = category + "." + (scope != null ? scope : key) + ".errors";
} else {
name = category + "." + (scope != null ? scope : key) + ".requests";
}
@SuppressWarnings({"unchecked"})
Map<String, Object> map = (Map<String, Object>) resp.findRecursive("solr-mbeans", category, key, "stats");
if (map == null) {
return null;
}
if (scope != null) { // admin handler uses a meter instead of counter here
return (Long) map.get(name + ".count");
} else {
return (Long) map.get(name);
}
}
@Test
public void testNonRetryableRequests() throws Exception {
try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress())) {
// important to have one replica on each node
RequestStatusState state = CollectionAdminRequest.createCollection("foo", "conf", 1, NODE_COUNT).processAndWait(client, 60);
if (state == RequestStatusState.COMPLETED) {
cluster.waitForActiveCollection("foo", 1, NODE_COUNT);
client.setDefaultCollection("foo");
Map<String, String> adminPathToMbean = new HashMap<>(CommonParams.ADMIN_PATHS.size());
adminPathToMbean.put(CommonParams.COLLECTIONS_HANDLER_PATH, CollectionsHandler.class.getName());
adminPathToMbean.put(CommonParams.CORES_HANDLER_PATH, CoreAdminHandler.class.getName());
adminPathToMbean.put(CommonParams.CONFIGSETS_HANDLER_PATH, ConfigSetsHandler.class.getName());
// we do not add the authc/authz handlers because they do not currently expose any mbeans
for (String adminPath : adminPathToMbean.keySet()) {
long errorsBefore = 0;
for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
Long numRequests = getNumRequests(runner.getBaseUrl().toString(), "foo", "ADMIN", adminPathToMbean.get(adminPath), adminPath, true);
errorsBefore += numRequests;
if (log.isInfoEnabled()) {
log.info("Found {} requests to {} on {}", numRequests, adminPath, runner.getBaseUrl());
}
}
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("qt", adminPath);
params.set("action", "foobar"); // this should cause an error
QueryRequest req = new QueryRequest(params);
try {
NamedList<Object> resp = client.request(req);
fail("call to foo for admin path " + adminPath + " should have failed");
} catch (Exception e) {
// expected
}
long errorsAfter = 0;
for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
Long numRequests = getNumRequests(runner.getBaseUrl().toString(), "foo", "ADMIN", adminPathToMbean.get(adminPath), adminPath, true);
errorsAfter += numRequests;
if (log.isInfoEnabled()) {
log.info("Found {} requests to {} on {}", numRequests, adminPath, runner.getBaseUrl());
}
}
assertEquals(errorsBefore + 1, errorsAfter);
}
} else {
fail("Collection could not be created within 60 seconds");
}
}
}
@Test
public void checkCollectionParameters() throws Exception {
try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress())) {
String async1 = CollectionAdminRequest.createCollection("multicollection1", "conf", 2, 1)
.setPerReplicaState(USE_PER_REPLICA_STATE)
.processAsync(client);
String async2 = CollectionAdminRequest.createCollection("multicollection2", "conf", 2, 1)
.setPerReplicaState(USE_PER_REPLICA_STATE)
.processAsync(client);
CollectionAdminRequest.waitForAsyncRequest(async1, client, TIMEOUT);
CollectionAdminRequest.waitForAsyncRequest(async2, client, TIMEOUT);
cluster.waitForActiveCollection("multicollection1", 2, 2);
cluster.waitForActiveCollection("multicollection2", 2, 2);
client.setDefaultCollection("multicollection1");
List<SolrInputDocument> docs = new ArrayList<>(3);
for (int i = 0; i < 3; i++) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField(id, Integer.toString(i));
doc.addField("a_t", "hello");
docs.add(doc);
}
client.add(docs); // default - will add them to multicollection1
client.commit();
ModifiableSolrParams queryParams = new ModifiableSolrParams();
queryParams.add("q", "*:*");
assertEquals(3, client.query(queryParams).getResults().size());
assertEquals(0, client.query("multicollection2", queryParams).getResults().size());
SolrQuery query = new SolrQuery("*:*");
query.set("collection", "multicollection2");
assertEquals(0, client.query(query).getResults().size());
client.add("multicollection2", docs);
client.commit("multicollection2");
assertEquals(3, client.query("multicollection2", queryParams).getResults().size());
}
}
@Test
public void stateVersionParamTest() throws Exception {
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
cluster.waitForActiveCollection(COLLECTION, 2, 2);
DocCollection coll = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
Replica r = coll.getSlices().iterator().next().getReplicas().iterator().next();
SolrQuery q = new SolrQuery().setQuery("*:*");
HttpSolrClient.RemoteSolrException sse = null;
final String url = r.getBaseUrl() + "/" + COLLECTION;
try (HttpSolrClient solrClient = getHttpSolrClient(url)) {
if (log.isInfoEnabled()) {
log.info("should work query, result {}", solrClient.query(q));
}
//no problem
q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + coll.getZNodeVersion());
if (log.isInfoEnabled()) {
log.info("2nd query , result {}", solrClient.query(q));
}
//no error yet good
q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + (coll.getZNodeVersion() - 1)); //an older version expect error
QueryResponse rsp = solrClient.query(q);
@SuppressWarnings({"rawtypes"})
Map m = (Map) rsp.getResponse().get(CloudSolrClient.STATE_VERSION, rsp.getResponse().size() - 1);
assertNotNull("Expected an extra information from server with the list of invalid collection states", m);
assertNotNull(m.get(COLLECTION));
}
//now send the request to another node that does not serve the collection
Set<String> allNodesOfColl = new HashSet<>();
for (Slice slice : coll.getSlices()) {
for (Replica replica : slice.getReplicas()) {
allNodesOfColl.add(replica.getBaseUrl());
}
}
String theNode = null;
Set<String> liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
for (String s : liveNodes) {
String n = cluster.getSolrClient().getZkStateReader().getBaseUrlForNodeName(s);
if (!allNodesOfColl.contains(n)) {
theNode = n;
break;
}
}
log.info("the node which does not serve this collection{} ", theNode);
assertNotNull(theNode);
final String solrClientUrl = theNode + "/" + COLLECTION;
try (SolrClient solrClient = getHttpSolrClient(solrClientUrl)) {
q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + (coll.getZNodeVersion() - 1));
try {
QueryResponse rsp = solrClient.query(q);
log.info("error was expected");
} catch (HttpSolrClient.RemoteSolrException e) {
sse = e;
}
assertNotNull(sse);
assertEquals(" Error code should be 510", SolrException.ErrorCode.INVALID_STATE.code, sse.code());
}
}
@Test
public void testShutdown() throws IOException {
try (CloudSolrClient client = getCloudSolrClient(DEAD_HOST_1)) {
client.setZkConnectTimeout(100);
SolrException ex = expectThrows(SolrException.class, client::connect);
assertTrue(ex.getCause() instanceof TimeoutException);
}
}
@Rule
public ExpectedException exception = ExpectedException.none();
@Test
public void testWrongZkChrootTest() throws IOException {
try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress() + "/xyz/foo")) {
client.setZkClientTimeout(1000 * 60);
SolrException ex = expectThrows(SolrException.class, client::connect);
assertTrue(ex.getMessage().contains("cluster not found/not ready"));
}
}
@Test
public void forwardCompatZkChrootTest() throws IOException, InterruptedException, KeeperException {
String newPath = "/forward/compat";
// Make sure that the ZKStateReader can connect when only a collections ZNode exists, not the clusterstate.json
cluster.getZkServer().getZkClient().makePath(newPath + ZkStateReader.COLLECTIONS_ZKNODE, new byte[0], CreateMode.PERSISTENT, true);
cluster.getZkServer().getZkClient().makePath(newPath + ZkStateReader.ALIASES, new byte[0], CreateMode.PERSISTENT, true);
try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress() + newPath)) {
client.setZkClientTimeout(1000 * 60);
client.connect();
}
}
@Test
public void customHttpClientTest() throws IOException {
CloseableHttpClient client = HttpClientUtil.createClient(null);
try (CloudSolrClient solrClient = getCloudSolrClient(cluster.getZkServer().getZkAddress(), client)) {
assertTrue(solrClient.getLbClient().getHttpClient() == client);
} finally {
HttpClientUtil.close(client);
}
}
@Test
public void testVersionsAreReturned() throws Exception {
CollectionAdminRequest.createCollection("versions_collection", "conf", 2, 1)
.setPerReplicaState(USE_PER_REPLICA_STATE)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection("versions_collection", 2, 2);
// assert that "adds" are returned
UpdateRequest updateRequest = new UpdateRequest()
.add("id", "1", "a_t", "hello1")
.add("id", "2", "a_t", "hello2");
updateRequest.setParam(UpdateParams.VERSIONS, Boolean.TRUE.toString());
NamedList<Object> response = updateRequest.commit(getRandomClient(), "versions_collection").getResponse();
Object addsObject = response.get("adds");
assertNotNull("There must be a adds parameter", addsObject);
assertTrue(addsObject instanceof NamedList<?>);
NamedList<?> adds = (NamedList<?>) addsObject;
assertEquals("There must be 2 versions (one per doc)", 2, adds.size());
Map<String, Long> versions = new HashMap<>();
Object object = adds.get("1");
assertNotNull("There must be a version for id 1", object);
assertTrue("Version for id 1 must be a long", object instanceof Long);
versions.put("1", (Long) object);
object = adds.get("2");
assertNotNull("There must be a version for id 2", object);
assertTrue("Version for id 2 must be a long", object instanceof Long);
versions.put("2", (Long) object);
QueryResponse resp = getRandomClient().query("versions_collection", new SolrQuery("*:*"));
assertEquals("There should be one document because overwrite=true", 2, resp.getResults().getNumFound());
for (SolrDocument doc : resp.getResults()) {
Long version = versions.get(doc.getFieldValue("id"));
assertEquals("Version on add must match _version_ field", version, doc.getFieldValue("_version_"));
}
// assert that "deletes" are returned
UpdateRequest deleteRequest = new UpdateRequest().deleteById("1");
deleteRequest.setParam(UpdateParams.VERSIONS, Boolean.TRUE.toString());
response = deleteRequest.commit(getRandomClient(), "versions_collection").getResponse();
Object deletesObject = response.get("deletes");
assertNotNull("There must be a deletes parameter", deletesObject);
@SuppressWarnings({"rawtypes"})
NamedList deletes = (NamedList) deletesObject;
assertEquals("There must be 1 version", 1, deletes.size());
}
@Test
public void testInitializationWithSolrUrls() throws Exception {
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1)
.setPerReplicaState(USE_PER_REPLICA_STATE)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(COLLECTION, 2, 2);
CloudSolrClient client = httpBasedCloudSolrClient;
SolrInputDocument doc = new SolrInputDocument("id", "1", "title_s", "my doc");
client.add(COLLECTION, doc);
client.commit(COLLECTION);
assertEquals(1, client.query(COLLECTION, params("q", "*:*")).getResults().getNumFound());
}
@Test
public void testCollectionDoesntExist() throws Exception {
CloudSolrClient client = getRandomClient();
SolrInputDocument doc = new SolrInputDocument("id", "1", "title_s", "my doc");
SolrException ex = expectThrows(SolrException.class, () -> client.add("boguscollectionname", doc));
assertEquals("Collection not found: boguscollectionname", ex.getMessage());
}
public void testRetryUpdatesWhenClusterStateIsStale() throws Exception {
final String COL = "stale_state_test_col";
assert cluster.getJettySolrRunners().size() >= 2;
final JettySolrRunner old_leader_node = cluster.getJettySolrRunners().get(0);
final JettySolrRunner new_leader_node = cluster.getJettySolrRunners().get(1);
// start with exactly 1 shard/replica...
assertEquals("Couldn't create collection", 0,
CollectionAdminRequest.createCollection(COL, "conf", 1, 1)
.setPerReplicaState(USE_PER_REPLICA_STATE)
.setCreateNodeSet(old_leader_node.getNodeName())
.process(cluster.getSolrClient()).getStatus());
cluster.waitForActiveCollection(COL, 1, 1);
// determine the coreNodeName of only current replica
Collection<Slice> slices = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COL).getSlices();
assertEquals(1, slices.size()); // sanity check
Slice slice = slices.iterator().next();
assertEquals(1, slice.getReplicas().size()); // sanity check
final String old_leader_core_node_name = slice.getLeader().getName();
// NOTE: creating our own CloudSolrClient whose settings we can muck with...
try (CloudSolrClient stale_client = new CloudSolrClientBuilder
(Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
.sendDirectUpdatesToAnyShardReplica()
.withParallelUpdates(true)
.build()) {
// don't let collection cache entries get expired, even on a slow machine...
stale_client.setCollectionCacheTTl(Integer.MAX_VALUE);
stale_client.setDefaultCollection(COL);
// do a query to populate stale_client's cache...
assertEquals(0, stale_client.query(new SolrQuery("*:*")).getResults().getNumFound());
// add 1 replica on a diff node...
assertEquals("Couldn't create collection", 0,
CollectionAdminRequest.addReplicaToShard(COL, "shard1")
.setNode(new_leader_node.getNodeName())
// NOTE: don't use our stale_client for this -- don't tip it off of a collection change
.process(cluster.getSolrClient()).getStatus());
AbstractDistribZkTestBase.waitForRecoveriesToFinish
(COL, cluster.getSolrClient().getZkStateReader(), true, true, 330);
// ...and delete our original leader.
assertEquals("Couldn't create collection", 0,
CollectionAdminRequest.deleteReplica(COL, "shard1", old_leader_core_node_name)
// NOTE: don't use our stale_client for this -- don't tip it off of a collection change
.process(cluster.getSolrClient()).getStatus());
AbstractDistribZkTestBase.waitForRecoveriesToFinish
(COL, cluster.getSolrClient().getZkStateReader(), true, true, 330);
// stale_client's collection state cache should now only point at a leader that no longer exists.
// attempt a (direct) update that should succeed in spite of cached cluster state
// pointing solely to a node that's no longer part of our collection...
assertEquals(0, (new UpdateRequest().add("id", "1").commit(stale_client, COL)).getStatus());
assertEquals(1, stale_client.query(new SolrQuery("*:*")).getResults().getNumFound());
}
}
private static void checkSingleServer(NamedList<Object> response) {
final CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response;
final Map<String, LBHttpSolrClient.Req> routes = rr.getRoutes();
final Iterator<Map.Entry<String, LBHttpSolrClient.Req>> it =
routes.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, LBHttpSolrClient.Req> entry = it.next();
assertEquals("wrong number of servers: " + entry.getValue().getServers(),
1, entry.getValue().getServers().size());
}
}
/**
* Tests if the specification of 'preferReplicaTypes` in the query-params
* limits the distributed query to locally hosted shards only
*/
@Test
// commented 15-Sep-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 2-Aug-2018
public void preferReplicaTypesTest() throws Exception {
String collectionName = "replicaTypesTestColl";
int liveNodes = cluster.getJettySolrRunners().size();
// For these tests we need to have multiple replica types.
// Hence the below configuration for our collection
int pullReplicas = Math.max(1, liveNodes - 2);
CollectionAdminRequest.createCollection(collectionName, "conf", liveNodes, 1, 1, pullReplicas)
.setPerReplicaState(USE_PER_REPLICA_STATE)
.setMaxShardsPerNode(liveNodes)
.processAndWait(cluster.getSolrClient(), TIMEOUT);
cluster.waitForActiveCollection(collectionName, liveNodes, liveNodes * (2 + pullReplicas));
// Add some new documents
new UpdateRequest()
.add(id, "0", "a_t", "hello1")
.add(id, "2", "a_t", "hello2")
.add(id, "3", "a_t", "hello2")
.commit(getRandomClient(), collectionName);
// Run the actual tests for 'shards.preference=replica.type:*'
queryWithPreferReplicaTypes(getRandomClient(), "PULL", false, collectionName);
queryWithPreferReplicaTypes(getRandomClient(), "PULL|TLOG", false, collectionName);
queryWithPreferReplicaTypes(getRandomClient(), "TLOG", false, collectionName);
queryWithPreferReplicaTypes(getRandomClient(), "TLOG|PULL", false, collectionName);
queryWithPreferReplicaTypes(getRandomClient(), "NRT", false, collectionName);
queryWithPreferReplicaTypes(getRandomClient(), "NRT|PULL", false, collectionName);
// Test to verify that preferLocalShards=true doesn't break this
queryWithPreferReplicaTypes(getRandomClient(), "PULL", true, collectionName);
queryWithPreferReplicaTypes(getRandomClient(), "PULL|TLOG", true, collectionName);
queryWithPreferReplicaTypes(getRandomClient(), "TLOG", true, collectionName);
queryWithPreferReplicaTypes(getRandomClient(), "TLOG|PULL", true, collectionName);
queryWithPreferReplicaTypes(getRandomClient(), "NRT", false, collectionName);
queryWithPreferReplicaTypes(getRandomClient(), "NRT|PULL", true, collectionName);
CollectionAdminRequest.deleteCollection(collectionName)
.processAndWait(cluster.getSolrClient(), TIMEOUT);
}
private void queryWithPreferReplicaTypes(CloudSolrClient cloudClient,
String preferReplicaTypes,
boolean preferLocalShards,
String collectionName)
throws Exception {
SolrQuery qRequest = new SolrQuery("*:*");
ModifiableSolrParams qParams = new ModifiableSolrParams();
final List<String> preferredTypes = Arrays.asList(preferReplicaTypes.split("\\|"));
StringBuilder rule = new StringBuilder();
preferredTypes.forEach(type -> {
if (rule.length() != 0) {
rule.append(',');
}
rule.append(ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE);
rule.append(':');
rule.append(type);
});
if (preferLocalShards) {
if (rule.length() != 0) {
rule.append(',');
}
rule.append(ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION);
rule.append(":local");
}
qParams.add(ShardParams.SHARDS_PREFERENCE, rule.toString());
qParams.add(ShardParams.SHARDS_INFO, "true");
qRequest.add(qParams);
// CloudSolrClient sends the request to some node.
// And since all the nodes are hosting cores from all shards, the
// distributed query formed by this node will select cores from the
// local shards only
QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
assertNotNull("Unable to obtain " + ShardParams.SHARDS_INFO, shardsInfo);
Map<String, String> replicaTypeMap = new HashMap<String, String>();
DocCollection collection = getCollectionState(collectionName);
for (Slice slice : collection.getSlices()) {
for (Replica replica : slice.getReplicas()) {
String coreUrl = replica.getCoreUrl();
// It seems replica reports its core URL with a trailing slash while shard
// info returned from the query doesn't. Oh well.
if (coreUrl.endsWith("/")) {
coreUrl = coreUrl.substring(0, coreUrl.length() - 1);
}
replicaTypeMap.put(coreUrl, replica.getType().toString());
}
}
// Iterate over shards-info and check that replicas of correct type responded
SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>) shardsInfo;
@SuppressWarnings({"unchecked"})
Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
List<String> shardAddresses = new ArrayList<String>();
while (itr.hasNext()) {
Map.Entry<String, ?> e = itr.next();
assertTrue("Did not find map-type value in " + ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
String shardAddress = (String) ((Map) e.getValue()).get("shardAddress");
assertNotNull(ShardParams.SHARDS_INFO + " did not return 'shardAddress' parameter", shardAddress);
assertTrue(replicaTypeMap.containsKey(shardAddress));
assertTrue(preferredTypes.indexOf(replicaTypeMap.get(shardAddress)) == 0);
shardAddresses.add(shardAddress);
}
assertTrue("No responses", shardAddresses.size() > 0);
if (log.isInfoEnabled()) {
log.info("Shards giving the response: {}", Arrays.toString(shardAddresses.toArray()));
}
}
@Test
public void testPing() throws Exception {
final String testCollection = "ping_test";
CollectionAdminRequest.createCollection(testCollection, "conf", 2, 1)
.setPerReplicaState(USE_PER_REPLICA_STATE)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(testCollection, 2, 2);
final SolrClient clientUnderTest = getRandomClient();
final SolrPingResponse response = clientUnderTest.ping(testCollection);
assertEquals("This should be OK", 0, response.getStatus());
}
public void testPerReplicaStateCollection() throws Exception {
CollectionAdminRequest.createCollection("versions_collection", "conf", 2, 1)
.process(cluster.getSolrClient());
String testCollection = "perReplicaState_test";
int liveNodes = cluster.getJettySolrRunners().size();
CollectionAdminRequest.createCollection(testCollection, "conf", 2, 2)
.setMaxShardsPerNode(liveNodes)
.setPerReplicaState(Boolean.TRUE)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(testCollection, 2, 4);
final SolrClient clientUnderTest = getRandomClient();
final SolrPingResponse response = clientUnderTest.ping(testCollection);
assertEquals("This should be OK", 0, response.getStatus());
DocCollection c = cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
PerReplicaStates prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
assertEquals(4, prs.states.size());
JettySolrRunner jsr = cluster.startJettySolrRunner();
// Now let's do an add replica
CollectionAdminRequest
.addReplicaToShard(testCollection, "shard1")
.process(cluster.getSolrClient());
prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
assertEquals(5, prs.states.size());
c = cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
Replica toDelete = c.getReplica((shard, replica) -> "shard1".equals(shard) && !replica.isLeader());
CollectionAdminRequest.deleteReplica(testCollection, "shard1", toDelete.getName()).process(cluster.getSolrClient());
cluster.waitForActiveCollection(testCollection, 2, 4);
testCollection = "perReplicaState_testv2";
new V2Request.Builder("/collections")
.withMethod(POST)
.withPayload("{create: {name: perReplicaState_testv2, config : conf, numShards : 2, nrtReplicas : 2, perReplicaState : true, maxShardsPerNode : 5}}")
.build()
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(testCollection, 2, 4);
c = cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
assertEquals(4, prs.states.size());
}
}