/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestUtil;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.SocketProxy;
import org.apache.solr.client.solrj.embedded.JettyConfig;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.CoreAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.cloud.ZkController.NotInClusterStateException;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrQueuedThreadPool;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.Diagnostics;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.ReplicationHandler;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.SolrCmdDistributor;
import org.apache.solr.update.SolrIndexWriter;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.RestTestHarness;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.noggit.CharArr;
import org.noggit.JSONWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.util.Utils.makeMap;
/**
 * TODO: we should still test that this works with a custom update chain, in addition to
 * what we test now - the default update chain.
 */
@Slow
public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTestBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected static final boolean VERBOSE = false;
private static Path confDir;
protected volatile SolrQueuedThreadPool qtp;
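/**
 * Copies a config file from the test {@code collection1/conf} directory into the shared
 * temp conf dir, optionally under a different target name.
 */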
private static void copyConfigFileToTmpConf(Path confDir, String file) throws IOException {
Files.copy(Paths.get(SolrTestUtil.TEST_HOME(), "collection1", "conf", file),
Paths.get(confDir.toString(), file), StandardCopyOption.REPLACE_EXISTING);
}
private static void copyConfigFileToTmpConf(Path confDir, String file, String zkFile) throws IOException {
Files.copy(Paths.get(SolrTestUtil.TEST_HOME(), "collection1", "conf", file),
Paths.get(confDir.toString(), zkFile), StandardCopyOption.REPLACE_EXISTING);
}
@Before
public void beforeTest() throws Exception {
cloudInit = false;
qtp = getQtp();
try {
qtp.start();
} catch (Exception e) {
ParWork.propagateInterrupt(e);
throw new RuntimeException(e);
}
}
@After
public void afterTest() throws Exception {
if (qtp != null) {
qtp.close();
qtp = null;
}
}
public static final String SHARD1 = "s1";
public static final String SHARD2 = "s2";
protected boolean printLayoutOnTearDown = false;
String t1 = "a_t";
String i1 = "a_i1";
String tlong = "other_tl1";
String oddField = "oddField_s";
String missingField = "ignore_exception__missing_but_valid_field_t";
protected int sliceCount;
protected volatile CloudHttp2SolrClient controlClientCloud; // cloud version of the control client
protected volatile CloudHttp2SolrClient cloudClient;
protected final List<SolrClient> coreClients = Collections.synchronizedList(new ArrayList<>());
protected final List<CloudJettyRunner> cloudJettys = Collections.synchronizedList(new ArrayList<>());
protected final Map<String,List<CloudJettyRunner>> shardToJetty = new ConcurrentHashMap<>();
private AtomicInteger jettyIntCntr = new AtomicInteger(0);
protected volatile ChaosMonkey chaosMonkey;
protected Map<String,CloudJettyRunner> shardToLeaderJetty = new ConcurrentHashMap<>();
private static volatile boolean cloudInit;
protected volatile boolean useJettyDataDir = true;
private final Set<RestTestHarness> restTestHarnesses = ConcurrentHashMap.newKeySet();
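/**
 * Wraps a {@link JettySolrRunner} together with the ZK metadata (node name, core node name,
 * core URL, client) of the replica it hosts. Equality is based solely on the replica URL.
 */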
public static class CloudJettyRunner {
public JettySolrRunner jetty;
public String nodeName;
public String coreNodeName;
public String url;
public CloudSolrServerClient client;
public ZkNodeProps info;
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((url == null) ? 0 : url.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
CloudJettyRunner other = (CloudJettyRunner) obj;
if (url == null) {
if (other.url != null) return false;
} else if (!url.equals(other.url)) return false;
return true;
}
@Override
public String toString() {
return "CloudJettyRunner [url=" + url + "]";
}
}
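/**
 * Pairs a {@link SolrClient} with the port, shard/node name and ZK info of the replica it
 * targets. Equality is based solely on the wrapped client.
 */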
public static class CloudSolrServerClient {
SolrClient solrClient;
String shardName;
int port;
public ZkNodeProps info;
public CloudSolrServerClient() {}
public CloudSolrServerClient(SolrClient client) {
this.solrClient = client;
}
public SolrClient getSolrClient() {
return solrClient;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((solrClient == null) ? 0 : solrClient.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
CloudSolrServerClient other = (CloudSolrServerClient) obj;
if (solrClient == null) {
if (other.solrClient != null) return false;
} else if (!solrClient.equals(other.solrClient)) return false;
return true;
}
}
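/**
 * Installs a testing hook on {@link SolrCmdDistributor} that requests a thread dump whenever
 * a distributed update fails with a timeout; remove it again with {@link #clearErrorHook()}.
 */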
protected static void setErrorHook() {
SolrCmdDistributor.testing_errorHook = new Diagnostics.Callable() {
@Override
public void call(Object... data) {
Exception e = (Exception) data[0];
if (e == null) return;
String msg = e.getMessage();
if (msg != null && msg.contains("Timeout")) {
Diagnostics.logThreadDumps("REQUESTING THREAD DUMP DUE TO TIMEOUT: " + e.getMessage());
}
}
};
}
protected static void clearErrorHook() {
SolrCmdDistributor.testing_errorHook = null;
}
@Override
public void distribSetUp() throws Exception {
super.distribSetUp();
// ignoreException(".*");
Path tmpDir = SolrTestUtil.createTempDir();
confDir = Paths.get(tmpDir.toString(), "conf");
Files.createDirectories(confDir);
copyConfigFileToTmpConf(confDir, configString != null ? configString : "solrconfig.xml", "solrconfig.xml");
copyConfigFileToTmpConf(confDir, schemaString != null ? schemaString : "schema.xml", "schema.xml");
copyConfigFileToTmpConf(confDir, "solrconfig.snippet.randomindexconfig.xml");
copyConfigFileToTmpConf(confDir, "stopwords.txt");
copyConfigFileToTmpConf(confDir, "protwords.txt");
copyConfigFileToTmpConf(confDir, "currency.xml");
copyConfigFileToTmpConf(confDir, "open-exchange-rates.json");
copyConfigFileToTmpConf(confDir, "mapping-ISOLatin1Accent.txt");
copyConfigFileToTmpConf(confDir, "old_synonyms.txt");
copyConfigFileToTmpConf(confDir, "synonyms.txt");
copyConfigFileToTmpConf(confDir, "enumsConfig.xml");
System.setProperty("solr.suppressDefaultConfigBootstrap", "false");
System.setProperty("solr.default.confdir", confDir.toString());
cloudInit = false;
if (sliceCount > 0) {
System.setProperty("numShards", Integer.toString(sliceCount));
} else {
System.clearProperty("numShards");
}
if (isSSLMode()) {
System.clearProperty("urlScheme");
try (ZkStateReader zkStateReader = new ZkStateReader(zkServer.getZkAddress(),
AbstractZkTestCase.TIMEOUT, AbstractZkTestCase.TIMEOUT)) {
try {
zkStateReader.getZkClient().create(ZkStateReader.CLUSTER_PROPS,
Utils.toJSON(Collections.singletonMap("urlScheme", "https")),
CreateMode.PERSISTENT, true);
} catch (KeeperException.NodeExistsException e) {
ZkNodeProps props = ZkNodeProps.load(zkStateReader.getZkClient().getData(ZkStateReader.CLUSTER_PROPS,
null, null));
zkStateReader.getZkClient().setData(ZkStateReader.CLUSTER_PROPS, Utils.toJSON(props.plus("urlScheme", "https")), true);
}
}
}
if (useTlogReplicas()) {
log.info("Will use {} replicas unless explicitly asked otherwise", Replica.Type.TLOG);
} else {
log.info("Will use {} replicas unless explicitly asked otherwise", Replica.Type.NRT);
}
}
@BeforeClass
public static void beforeAbstractFullDistribZkTestBase() {
System.setProperty("solrcloud.update.delay", "0");
}
@AfterClass
public static void afterAbstractFullDistribZkTestBase() throws Exception {
}
public AbstractFullDistribZkTestBase() {
sliceCount = 2;
fixShardCount(4);
// TODO: for now, turn off stress because it uses regular clients, and we
// need the cloud client because we kill servers
stress = 0;
useExplicitNodeNames = random().nextBoolean();
}
protected String getDataDir(String dataDir) throws IOException {
return dataDir;
}
protected void initCloud() throws Exception {
assert(cloudInit == false);
cloudInit = true;
cloudClient = createCloudClient(DEFAULT_COLLECTION);
cloudClient.connect();
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
chaosMonkey = new ChaosMonkey(zkServer, zkStateReader, DEFAULT_COLLECTION,
shardToJetty, shardToLeaderJetty);
}
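/** Override to return true if the test should create {@link Replica.Type#TLOG} replicas by default instead of NRT ones. */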
protected boolean useTlogReplicas() {
return false;
}
protected CloudHttp2SolrClient createCloudClient(String defaultCollection) {
CloudHttp2SolrClient client = getCloudSolrClient(zkServer.getZkAddress(), random().nextBoolean());
if (defaultCollection != null) client.setDefaultCollection(defaultCollection);
return client;
}
@Override
protected void createServers(int numServers) throws Exception {
File controlJettyDir = SolrTestUtil.createTempDir("control").toFile();
setupJettySolrHome(controlJettyDir);
controlJetty = createJetty(controlJettyDir, useJettyDataDir ? getDataDir(testDir
+ "/control/data") : null);
controlJetty.start();
try (CloudHttp2SolrClient client = createCloudClient("control_collection")) {
assertEquals(0, CollectionAdminRequest
.createCollection("control_collection", "_default", 1, 1)
// .setCreateNodeSet(controlJetty.getNodeName())
.process(client).getStatus());
waitForActiveReplicaCount(client, "control_collection", 1);
}
controlClient = new HttpSolrClient.Builder(controlJetty.getBaseUrl() + "/control_collection").build();
if (sliceCount <= 0) {
// for now, just create the cloud client for the control if we don't
// create the normal cloud client.
// this can change if more tests need it.
controlClientCloud = createCloudClient("control_collection");
controlClientCloud.connect();
// NOTE: we are skipping creation of the chaos monkey by returning here
cloudClient = controlClientCloud; // temporary - some code needs/uses
// cloudClient
return;
}
initCloud();
createJettys(numServers);
}
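/**
 * Waits for the collection to appear in the cluster state, and then for it to reach the
 * expected number of slices, polling for up to ~7.5 seconds for each condition.
 */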
public static void waitForCollection(ZkStateReader reader, String collection, int slices) throws Exception {
log.info("waitForCollection ({}): slices={}", collection, slices);
// wait until shards have started registering...
int cnt = 30;
while (!reader.getClusterState().hasCollection(collection)) {
if (cnt == 0) {
throw new RuntimeException("timeout waiting for collection in cluster state: collection=" + collection);
}
cnt--;
Thread.sleep(250);
}
cnt = 30;
while (reader.getClusterState().getCollection(collection).getSlices().size() < slices) {
if (cnt == 0) {
throw new RuntimeException("timeout waiting for collection shards to come up: collection="+collection
+ ", slices.expected="+slices+ " slices.actual= " + reader.getClusterState().getCollection(collection).getSlices().size()
+ " slices : "+ reader.getClusterState().getCollection(collection).getSlices() );
}
cnt--;
Thread.sleep(250);
}
}
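/**
 * Creates {@code DEFAULT_COLLECTION} with an empty node set (numShards == sliceCount, but no
 * cores yet), then starts {@code numJettys} Jetty instances in parallel and adds an NRT or
 * TLOG replica (see {@link #useTlogReplicas()}), or a PULL replica per
 * {@link #getPullReplicaCount()}, to one of the shards for each instance. Finally waits for
 * all replicas to become active and for every shard to elect a leader.
 */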
protected List<JettySolrRunner> createJettys(int numJettys) throws Exception {
List<JettySolrRunner> jettys = Collections.synchronizedList(new ArrayList<>());
List<SolrClient> clients = Collections.synchronizedList(new ArrayList<>());
List<CollectionAdminRequest> createReplicaRequests = Collections.synchronizedList(new ArrayList<>());
List<CollectionAdminRequest> createPullReplicaRequests = Collections.synchronizedList(new ArrayList<>());
StringBuilder sb = new StringBuilder();
// HACK: Don't be fooled by the replication factor of '1'...
//
// This CREATE command asks for a repFactor of 1, but uses an empty nodeSet.
// This allows this method to create a collection with numShards == sliceCount,
// but no actual cores ... yet. The actual replicas are added later (once the actual
// jetty instances are started)
assertEquals(0, CollectionAdminRequest
.createCollection(DEFAULT_COLLECTION, "_default", sliceCount, 1) // not real rep factor!
.setCreateNodeSet(ZkStateReader.CREATE_NODE_SET_EMPTY) // empty node set prevents creation of cores
.process(cloudClient).getStatus());
AtomicInteger numOtherReplicas = new AtomicInteger(numJettys - getPullReplicaCount() * sliceCount);
if (log.isInfoEnabled()) {
log.info("Creating jetty instances pullReplicaCount={} numOtherReplicas={}"
, getPullReplicaCount(), numOtherReplicas);
}
AtomicInteger addReplicas = new AtomicInteger();
try (ParWork create = new ParWork(this, false, false)) {
for (int i = 1; i <= numJettys; i++) {
if (sb.length() > 0) sb.append(',');
int cnt = this.jettyIntCntr.incrementAndGet();
File jettyDir = SolrTestUtil.createTempDir("shard-" + i).toFile();
jettyDir.mkdirs();
setupJettySolrHome(jettyDir);
int currentI = i;
if (numOtherReplicas.get() > 0) {
numOtherReplicas.decrementAndGet();
if (useTlogReplicas()) {
if (log.isInfoEnabled()) {
log.info("create jetty {} in directory {} of type {} in shard {}"
, i, jettyDir, Replica.Type.TLOG, ((currentI % sliceCount) + 1)); // logOk
}
create.collect("Create Jettys", () -> {
try {
JettySolrRunner j = createJetty(jettyDir, useJettyDataDir ? getDataDir(testDir + "/jetty"
+ cnt) : null, null, "solrconfig.xml", null, Replica.Type.TLOG);
j.start();
jettys.add(j);
createReplicaRequests.add(CollectionAdminRequest
.addReplicaToShard(DEFAULT_COLLECTION, "s" + ((currentI % sliceCount) + 1))
.setNode(j.getNodeName())
.setType(Replica.Type.TLOG));
waitForLiveNode(j);
coreClients.add(createNewSolrClient(coreName, j.getLocalPort()));
SolrClient client = createNewSolrClient(j.getLocalPort());
clients.add(client);
addReplicas.incrementAndGet();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (Exception e) {
ParWork.propagateInterrupt(e);
throw new RuntimeException(e);
}
});
create.addCollect();
} else {
if (log.isInfoEnabled()) {
log.info("create jetty {} in directory {} of type {} for shard{}"
, i, jettyDir, Replica.Type.NRT, ((currentI % sliceCount) + 1)); // logOk
}
create.collect("Create Jettys", () -> {
try {
JettySolrRunner j = createJetty(jettyDir, useJettyDataDir ? getDataDir(testDir + "/jetty"
+ cnt) : null, null, "solrconfig.xml", null, null);
j.start();
jettys.add(j);
createReplicaRequests.add(CollectionAdminRequest
.addReplicaToShard(DEFAULT_COLLECTION, "s" + ((currentI % sliceCount) + 1))
.setNode(j.getNodeName())
.setType(Replica.Type.NRT));
waitForLiveNode(j);
coreClients.add(createNewSolrClient(coreName, j.getLocalPort()));
SolrClient client = createNewSolrClient(j.getLocalPort());
clients.add(client);
addReplicas.incrementAndGet();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (Exception e) {
ParWork.propagateInterrupt(e);
throw new RuntimeException(e);
}
});
create.addCollect();
}
} else {
log.info("create jetty {} in directory {} of type {} for shard{}", i, jettyDir, Replica.Type.PULL, ((currentI % sliceCount) + 1)); // logOk
create.collect("Create Jettys", () -> {
try {
JettySolrRunner j = createJetty(jettyDir, useJettyDataDir ? getDataDir(testDir + "/jetty"
+ cnt) : null, null, "solrconfig.xml", null, Replica.Type.PULL);
j.start();
jettys.add(j);
createPullReplicaRequests.add(CollectionAdminRequest
.addReplicaToShard(DEFAULT_COLLECTION, "s" + ((currentI % sliceCount) + 1))
.setNode(j.getNodeName())
.setType(Replica.Type.PULL));
waitForLiveNode(j);
coreClients.add(createNewSolrClient(coreName, j.getLocalPort()));
SolrClient client = createNewSolrClient(j.getLocalPort());
clients.add(client);
addReplicas.incrementAndGet();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (Exception e) {
ParWork.propagateInterrupt(e);
throw new RuntimeException(e);
}
});
create.addCollect();
}
}
}
cloudClient.getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() >= jettys.size());
try (ParWork closer = new ParWork(this)) {
log.info("creating replicas: {}", createReplicaRequests);
synchronized (createReplicaRequests) {
for (CollectionAdminRequest r : createReplicaRequests) {
closer.collect("createReplica", () -> {
CollectionAdminResponse response;
try {
response = (CollectionAdminResponse) r.process(cloudClient);
} catch (SolrServerException | IOException e) {
throw new RuntimeException(e);
}
assertTrue(response.getResponse().toString(), response.isSuccess());
});
}
}
closer.addCollect();
log.info("creating pull replicas: {}", createPullReplicaRequests);
synchronized (createPullReplicaRequests) {
for (CollectionAdminRequest r : createPullReplicaRequests) {
closer.collect("createPullReplicaRequests", () -> {
CollectionAdminResponse response;
try {
response = (CollectionAdminResponse) r.process(cloudClient);
} catch (SolrServerException | IOException e) {
throw new RuntimeException(e);
}
assertTrue(response.isSuccess());
String coreName = response.getCollectionCoresStatus().keySet().iterator().next();
});
}
}
}
// cloudClient.getZkStateReader().waitForState(DEFAULT_COLLECTION, 10, TimeUnit.SECONDS,
// MiniSolrCloudCluster.expectedShardsAndActiveReplicas(sliceCount, addReplicas.get()));
waitForActiveReplicaCount(cloudClient, DEFAULT_COLLECTION, addReplicas.get());
this.jettys.addAll(jettys);
this.clients.addAll(clients);
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
zkStateReader.waitForActiveCollection(cloudClient.getHttpClient(), DEFAULT_COLLECTION, 10, TimeUnit.SECONDS, false, sliceCount, addReplicas.get(), true, true);
// make sure we have a leader for each shard
for (int i = 1; i <= sliceCount; i++) {
zkStateReader.getLeaderRetry(DEFAULT_COLLECTION, "s" + i, 10000);
}
if (sliceCount > 0) {
updateMappingsFromZk(this.jettys, this.clients);
}
// build the shard string: pair the first half of the jettys with the second half using '|'
for (int i = 1; i <= numJettys / 2; i++) {
JettySolrRunner j = this.jettys.get(i);
JettySolrRunner j2 = this.jettys.get(i + (numJettys / 2 - 1));
if (sb.length() > 0) sb.append(',');
sb.append(buildUrl(j.getLocalPort()));
sb.append("|").append(buildUrl(j2.getLocalPort()));
}
shards = sb.toString();
return jettys;
}
protected void waitForLiveNode(JettySolrRunner j) throws InterruptedException, TimeoutException {
if (log.isInfoEnabled()) {
log.info("waitForLiveNode: {}", j.getNodeName());
}
cloudClient.getZkStateReader().waitForLiveNodes(30, TimeUnit.SECONDS, SolrCloudTestCase.containsLiveNode(j.getNodeName()));
}
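/**
 * Blocks until the cluster state reports exactly {@code expectedNumReplicas} active replicas
 * for the collection (15 second timeout); on timeout the ZK layout is printed and a
 * {@link NotInClusterStateException} is thrown.
 */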
protected void waitForActiveReplicaCount(CloudHttp2SolrClient client, String collection, int expectedNumReplicas) throws TimeoutException, NotInClusterStateException {
log.info("Waiting to see {} active replicas in collection: {}", expectedNumReplicas, collection);
AtomicInteger nReplicas = new AtomicInteger();
try {
client.getZkStateReader().waitForState(collection, 15, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
if (collectionState == null) {
return false;
}
int activeReplicas = 0;
for (Slice slice : collectionState) {
for (Replica replica : slice) {
if (replica.isActive(liveNodes)) {
activeReplicas++;
}
}
}
nReplicas.set(activeReplicas);
return (activeReplicas == expectedNumReplicas);
});
} catch (TimeoutException | InterruptedException e) {
try {
printLayout();
} catch (Exception e1) {
ParWork.propagateInterrupt(e1);
throw new RuntimeException(e1);
}
throw new NotInClusterStateException(ErrorCode.SERVER_ERROR,
"Number of replicas in the state does not match what we set:" + nReplicas + " vs " + expectedNumReplicas);
}
}
protected int getPullReplicaCount() {
return 0;
}
/**
 * Total number of replicas for all shards as indicated by the cluster state, regardless of status.
 *
 * @deprecated This method is virtually useless as it does not consider the status of either
 * the shard or replica, nor whether the node hosting each replica is alive.
 */
@Deprecated
protected int getTotalReplicas(DocCollection c, String collection) {
if (c == null) return 0; // support for when collection hasn't been created yet
int cnt = 0;
for (Slice slice : c.getSlices()) {
cnt += slice.getReplicas().size();
}
return cnt;
}
public JettySolrRunner createJetty(String dataDir, String ulogDir, String shardList,
String solrConfigOverride) throws Exception {
JettyConfig jettyconfig = JettyConfig.builder()
.setContext(context)
.stopAtShutdown(true)
.withServlets(getExtraServlets())
.withFilters(getExtraRequestFilters())
.withSSLConfig(sslConfig.buildServerSSLConfig())
.withExecutor(qtp)
.build();
Properties props = new Properties();
props.setProperty("solr.data.dir", getDataDir(dataDir));
props.setProperty("shards", shardList);
props.setProperty("solr.ulog.dir", ulogDir);
props.setProperty("solrconfig", solrConfigOverride);
JettySolrRunner jetty = new JettySolrRunner(getSolrHome(), props, jettyconfig);
jetty.start();
return jetty;
}
public final JettySolrRunner createJetty(File solrHome, String dataDir, String shardList, String solrConfigOverride, String schemaOverride) throws Exception {
return createJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride, null);
}
public JettySolrRunner createJetty(File solrHome, String dataDir, String shardList, String solrConfigOverride, String schemaOverride, Replica.Type replicaType) throws Exception {
// randomly test a relative solr.home path
if (random().nextBoolean()) {
solrHome = getRelativeSolrHomePath(solrHome);
}
JettyConfig jettyconfig = JettyConfig.builder()
.setContext(context)
.stopAtShutdown(true)
.withServlets(getExtraServlets())
.withFilters(getExtraRequestFilters())
.withSSLConfig(sslConfig.buildServerSSLConfig())
.build();
Properties props = new Properties();
if (solrConfigOverride != null)
props.setProperty("solrconfig", solrConfigOverride);
if (schemaOverride != null)
props.setProperty("schema", schemaOverride);
if (shardList != null)
props.setProperty("shards", shardList);
if (dataDir != null)
props.setProperty("solr.data.dir", getDataDir(dataDir));
if (replicaType != null) {
props.setProperty("replicaType", replicaType.toString());
} else if (random().nextBoolean()) {
props.setProperty("replicaType", Replica.Type.NRT.toString());
}
props.setProperty("coreRootDirectory", solrHome.toPath().resolve("cores").toAbsolutePath().toString());
JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), props, jettyconfig);
return jetty;
}
/**
 * Creates a JettySolrRunner with a socket proxy sitting in front of the Jetty server,
 * which gives us the ability to simulate network partitions without having to fuss
 * with IPTables.
 */
public JettySolrRunner createProxiedJetty(File solrHome, String dataDir,
String shardList, String solrConfigOverride, String schemaOverride, Replica.Type replicaType)
throws Exception {
JettyConfig jettyconfig = JettyConfig.builder()
.setContext(context)
.stopAtShutdown(true)
.withServlets(getExtraServlets())
.withFilters(getExtraRequestFilters())
.withSSLConfig(sslConfig.buildServerSSLConfig())
.enableProxy(true)
.build();
Properties props = new Properties();
if (solrConfigOverride != null)
props.setProperty("solrconfig", solrConfigOverride);
if (schemaOverride != null)
props.setProperty("schema", schemaOverride);
if (shardList != null)
props.setProperty("shards", shardList);
if (dataDir != null)
props.setProperty("solr.data.dir", getDataDir(dataDir));
if (replicaType != null) {
props.setProperty("replicaType", replicaType.toString());
} else if (random().nextBoolean()) {
props.setProperty("replicaType", Replica.Type.NRT.toString());
}
props.setProperty("coreRootDirectory", solrHome.toPath().resolve("cores").toAbsolutePath().toString());
JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), props, jettyconfig);
return jetty;
}
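/** Extracts the port from a replica's node name, which has the form {@code host:port_context}. */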
protected int getReplicaPort(Replica replica) {
String replicaNode = replica.getNodeName();
String tmp = replicaNode.substring(replicaNode.indexOf(':')+1);
if (tmp.indexOf('_') != -1)
tmp = tmp.substring(0,tmp.indexOf('_'));
return Integer.parseInt(tmp);
}
protected JettySolrRunner getJettyOnPort(int port) {
JettySolrRunner theJetty = null;
for (JettySolrRunner jetty : jettys) {
if (port == jetty.getLocalPort()) {
theJetty = jetty;
break;
}
}
if (theJetty == null) {
if (controlJetty.getLocalPort() == port) {
theJetty = controlJetty;
}
}
if (theJetty == null)
fail("Not able to find JettySolrRunner for port: "+port);
return theJetty;
}
protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
String replicaBaseUrl = replica.getBaseUrl();
assertNotNull(replicaBaseUrl);
List<JettySolrRunner> runners = new ArrayList<>(jettys);
runners.add(controlJetty);
for (JettySolrRunner j : runners) {
if (replicaBaseUrl.replaceAll("/$", "").equals(j.getProxyBaseUrl().replaceAll("/$", ""))) {
return j.getProxy();
}
}
printLayout();
fail("No proxy found for " + replicaBaseUrl + "!");
return null;
}
private File getRelativeSolrHomePath(File solrHome) {
final Path solrHomePath = solrHome.toPath();
final Path curDirPath = new File("").getAbsoluteFile().toPath();
if (!solrHomePath.getRoot().equals(curDirPath.getRoot())) {
// root of current directory and solrHome are not the same, therefore cannot relativize
return solrHome;
}
final Path root = solrHomePath.getRoot();
// relativize current directory to root: /tmp/foo -> /tmp/foo/../..
final File relativizedCurDir = new File(curDirPath.toFile(), curDirPath.relativize(root).toString());
// exclude the root from solrHome: /tmp/foo/solrHome -> tmp/foo/solrHome
final Path solrHomeRelativeToRoot = root.relativize(solrHomePath);
// create the relative solrHome: /tmp/foo/../../tmp/foo/solrHome
return new File(relativizedCurDir, solrHomeRelativeToRoot.toString()).getAbsoluteFile();
}
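/**
 * Rebuilds {@link #cloudJettys}, {@link #shardToJetty} and {@link #shardToLeaderJetty} from
 * the current cluster state, matching clients and jetty instances to replicas by port.
 */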
protected void updateMappingsFromZk(List<JettySolrRunner> jettys, List<SolrClient> clients) throws Exception {
updateMappingsFromZk(jettys, clients, false);
}
protected void updateMappingsFromZk(List<JettySolrRunner> jettys, List<SolrClient> clients, boolean allowOverSharding) throws Exception {
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
cloudJettys.clear();
shardToJetty.clear();
ClusterState clusterState = zkStateReader.getClusterState();
DocCollection coll = clusterState.getCollection(DEFAULT_COLLECTION);
List<CloudSolrServerClient> theClients = new ArrayList<>();
for (SolrClient client : clients) {
// find info for this client in zk
nextClient:
// we find out state by simply matching ports...
for (Slice slice : coll.getSlices()) {
for (Replica replica : slice.getReplicas()) {
int port = new URI(((Http2SolrClient) client).getBaseURL())
.getPort();
if (replica.getBaseUrl().contains(":" + port)) {
CloudSolrServerClient csc = new CloudSolrServerClient();
csc.solrClient = client;
csc.port = port;
csc.shardName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
csc.info = replica;
theClients.add(csc);
break nextClient;
}
}
}
}
for (JettySolrRunner jetty : jettys) {
int port = jetty.getLocalPort();
if (port == -1) {
throw new RuntimeException("Cannot find the port for jetty");
}
nextJetty:
for (Slice slice : coll.getSlices()) {
Set<Entry<String,Replica>> entries = slice.getReplicasMap().entrySet();
for (Entry<String,Replica> entry : entries) {
Replica replica = entry.getValue();
if (replica.getBaseUrl().contains(":" + port)) {
List<CloudJettyRunner> list = shardToJetty.get(slice.getName());
if (list == null) {
list = new ArrayList<>();
shardToJetty.put(slice.getName(), list);
}
boolean isLeader = slice.getLeader() == replica;
CloudJettyRunner cjr = new CloudJettyRunner();
cjr.jetty = jetty;
cjr.info = replica;
cjr.nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
cjr.coreNodeName = entry.getKey();
cjr.url = replica.getCoreUrl();
cjr.client = findClientByPort(port, theClients);
list.add(cjr);
if (isLeader) {
shardToLeaderJetty.put(slice.getName(), cjr);
}
cloudJettys.add(cjr);
break nextJetty;
}
}
}
}
// # of jetties may not match replicas in shard here, because we don't map
// jetties that are not running - every shard should have at least one
// running jetty though
for (Slice slice : coll.getSlices()) {
// check that things look right
List<CloudJettyRunner> jetties = shardToJetty.get(slice.getName());
if (!allowOverSharding) {
assertNotNull("Test setup problem: We found no jetties for shard: "
+ slice.getName() + " just:" + shardToJetty.keySet(), jetties);
assertEquals("slice:" + slice.getName(), slice.getReplicas().size(),
jetties.size());
}
}
}
private CloudSolrServerClient findClientByPort(int port, List<CloudSolrServerClient> theClients) {
for (CloudSolrServerClient client : theClients) {
if (client.port == port) {
return client;
}
}
throw new IllegalArgumentException("Client with the given port does not exist:" + port);
}
@Override
protected void setDistributedParams(ModifiableSolrParams params) {
if (r.nextBoolean()) {
// don't set shards, let that be figured out from the cloud state
} else {
// use shard ids rather than physical locations
StringBuilder sb = new StringBuilder();
for (int i = 0; i < sliceCount; i++) {
if (i > 0) sb.append(',');
sb.append("s").append(i + 1);
}
params.set("shards", sb.toString());
}
}
protected int sendDocsWithRetry(List<SolrInputDocument> batch, int minRf, int maxRetries, int waitBeforeRetry) throws Exception {
return sendDocsWithRetry(cloudClient, cloudClient.getDefaultCollection(), batch, minRf, maxRetries, waitBeforeRetry);
}
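/**
 * Indexes the given batch through the cloud client, retrying up to {@code maxRetries} times
 * (sleeping {@code waitBeforeRetry} seconds between attempts) and returning the minimum
 * achieved replication factor reported for the update. Illustrative usage (values are
 * arbitrary):
 * <pre>{@code
 * int rf = sendDocsWithRetry(cloudClient, "collection1", batch, 1, 3, 5);
 * }</pre>
 */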
@SuppressWarnings("rawtypes")
protected static int sendDocsWithRetry(CloudHttp2SolrClient cloudClient, String collection, List<SolrInputDocument> batch, int minRf, int maxRetries, int waitBeforeRetry) throws Exception {
UpdateRequest up = new UpdateRequest();
up.add(batch);
NamedList resp = null;
int numRetries = 0;
while(true) {
try {
resp = cloudClient.request(up, collection);
return cloudClient.getMinAchievedReplicationFactor(collection, resp);
} catch (Exception exc) {
ParWork.propagateInterrupt(exc);
Throwable rootCause = SolrException.getRootCause(exc);
if (++numRetries <= maxRetries) {
log.warn("ERROR: {} ... Sleeping for {} seconds before re-try ...", rootCause, waitBeforeRetry);
Thread.sleep(waitBeforeRetry * 1000L);
} else {
log.error("No more retries available! Add batch failed due to: {}", rootCause);
throw exc;
}
}
}
}
protected static void sendDocs(CloudHttp2SolrClient cloudClient, String collection, SolrInputDocument doc) throws Exception {
UpdateRequest up = new UpdateRequest();
up.add(doc);
NamedList resp = null;
try {
resp = cloudClient.request(up, collection);
} catch (Exception exc) {
ParWork.propagateInterrupt(exc);
Throwable rootCause = SolrException.getRootCause(exc);
log.error(" Add doc failed due to: {}", rootCause);
throw exc;
}
}
@Override
protected void indexDoc(SolrInputDocument doc) throws IOException,
SolrServerException {
UpdateRequest req = new UpdateRequest();
req.add(doc);
req.setParam("CONTROL", "TRUE");
req.process(controlClient);
// if we wanted to randomly pick a client - but sometimes they may be
// down...
// boolean pick = random.nextBoolean();
//
// int which = (doc.getField(id).toString().hashCode() & 0x7fffffff) %
// sliceCount;
//
// if (pick && sliceCount > 1) {
// which = which + ((shardCount / sliceCount) *
// random.nextInt(sliceCount-1));
// }
//
// HttpSolrServer client = (HttpSolrServer)
// clients.get(which);
UpdateRequest ureq = new UpdateRequest();
ureq.add(doc);
// ureq.setParam(UpdateParams.UPDATE_CHAIN, DISTRIB_UPDATE_CHAIN);
ureq.process(cloudClient);
}
@Override
protected void index_specific(int serverNumber, Object... fields)
throws Exception {
SolrInputDocument doc = new SolrInputDocument();
for (int i = 0; i < fields.length; i += 2) {
doc.addField((String) (fields[i]), fields[i + 1]);
}
controlClient.add(doc);
HttpSolrClient client = (HttpSolrClient) clients
.get(serverNumber);
UpdateRequest ureq = new UpdateRequest();
ureq.add(doc);
// ureq.setParam("update.chain", DISTRIB_UPDATE_CHAIN);
ureq.process(client);
}
protected void index_specific(SolrClient client, Object... fields)
throws Exception {
SolrInputDocument doc = new SolrInputDocument();
for (int i = 0; i < fields.length; i += 2) {
doc.addField((String) (fields[i]), fields[i + 1]);
}
UpdateRequest ureq = new UpdateRequest();
ureq.add(doc);
// ureq.setParam("update.chain", DISTRIB_UPDATE_CHAIN);
ureq.process(client);
// add to control second in case adding to shards fails
controlClient.add(doc);
}
protected ZkCoreNodeProps getLeaderUrlFromZk(String collection, String slice) {
ClusterState clusterState = getCommonCloudSolrClient().getZkStateReader().getClusterState();
final DocCollection docCollection = clusterState.getCollectionOrNull(collection);
if (docCollection != null && docCollection.getLeader(slice) != null) {
return new ZkCoreNodeProps(docCollection.getLeader(slice));
}
throw new RuntimeException("Could not find leader:" + collection + " " + slice);
}
@Override
protected void del(String q) throws Exception {
controlClient.deleteByQuery(q);
cloudClient.deleteByQuery(q);
/***
for (SolrServer client : clients) {
UpdateRequest ureq = new UpdateRequest();
// ureq.setParam("update.chain", DISTRIB_UPDATE_CHAIN);
ureq.deleteByQuery(q).process(client);
}
***/
} // serial commit...
protected void checkQueries() throws Exception {
handle.put("_version_", SKIPVAL);
query("q", "*:*", "sort", "n_tl1 desc");
handle.put("response", UNORDERED); // get?ids=a,b,c requests are unordered
String ids = "987654";
for (int i=0; i<20; i++) {
query("qt","/get", "id",Integer.toString(i));
query("qt","/get", "ids",Integer.toString(i));
ids = ids + ',' + Integer.toString(i);
query("qt","/get", "ids",ids);
}
handle.remove("response");
// random value sort
for (String f : fieldNames) {
query("q", "*:*", "sort", f + " desc");
query("q", "*:*", "sort", f + " asc");
}
// these queries should be exactly ordered and scores should exactly match
query("q", "*:*", "sort", i1 + " desc");
query("q", "*:*", "sort", i1 + " asc");
query("q", "*:*", "sort", i1 + " desc", "fl", "*,score");
query("q", "*:*", "sort", "n_tl1 asc", "fl", "score"); // test legacy
// behavior -
// "score"=="*,score"
query("q", "*:*", "sort", "n_tl1 desc");
handle.put("maxScore", SKIPVAL);
query("q", "{!func}" + i1);// does not expect maxScore. So if it comes
// ,ignore it.
// JavaBinCodec.writeSolrDocumentList()
// is agnostic of request params.
handle.remove("maxScore");
query("q", "{!func}" + i1, "fl", "*,score"); // even scores should match
// exactly here
handle.put("highlighting", UNORDERED);
handle.put("response", UNORDERED);
handle.put("maxScore", SKIPVAL);
query("q", "quick");
query("q", "all", "fl", "id", "start", "0");
query("q", "all", "fl", "foofoofoo", "start", "0"); // no fields in returned
// docs
query("q", "all", "fl", "id", "start", "100");
handle.put("score", SKIPVAL);
query("q", "quick", "fl", "*,score");
query("q", "all", "fl", "*,score", "start", "1");
query("q", "all", "fl", "*,score", "start", "100");
query("q", "now their fox sat had put", "fl", "*,score", "hl", "true",
"hl.fl", t1);
query("q", "now their fox sat had put", "fl", "foofoofoo", "hl", "true",
"hl.fl", t1);
query("q", "matchesnothing", "fl", "*,score");
query("q", "*:*", "rows", 100, "facet", "true", "facet.field", t1);
query("q", "*:*", "rows", 100, "facet", "true", "facet.field", t1,
"facet.limit", -1, "facet.sort", "count");
query("q", "*:*", "rows", 100, "facet", "true", "facet.field", t1,
"facet.limit", -1, "facet.sort", "count", "facet.mincount", 2);
query("q", "*:*", "rows", 100, "facet", "true", "facet.field", t1,
"facet.limit", -1, "facet.sort", "index");
query("q", "*:*", "rows", 100, "facet", "true", "facet.field", t1,
"facet.limit", -1, "facet.sort", "index", "facet.mincount", 2);
query("q", "*:*", "rows", 100, "facet", "true", "facet.field", t1,
"facet.limit", 1);
query("q", "*:*", "rows", 100, "facet", "true", "facet.query", "quick",
"facet.query", "all", "facet.query", "*:*");
query("q", "*:*", "rows", 100, "facet", "true", "facet.field", t1,
"facet.offset", 1);
query("q", "*:*", "rows", 100, "facet", "true", "facet.field", t1,
"facet.mincount", 2);
// test faceting multiple things at once
query("q", "*:*", "rows", 100, "facet", "true", "facet.query", "quick",
"facet.query", "all", "facet.query", "*:*", "facet.field", t1);
// test filter tagging, facet exclusion, and naming (multi-select facet
// support)
query("q", "*:*", "rows", 100, "facet", "true", "facet.query",
"{!key=myquick}quick", "facet.query", "{!key=myall ex=a}all",
"facet.query", "*:*", "facet.field", "{!key=mykey ex=a}" + t1,
"facet.field", "{!key=other ex=b}" + t1, "facet.field",
"{!key=again ex=a,b}" + t1, "facet.field", t1, "fq",
"{!tag=a}id:[1 TO 7]", "fq", "{!tag=b}id:[3 TO 9]");
query("q", "*:*", "facet", "true", "facet.field",
"{!ex=t1}SubjectTerms_mfacet", "fq",
"{!tag=t1}SubjectTerms_mfacet:(test 1)", "facet.limit", "10",
"facet.mincount", "1");
// test field that is valid in schema but missing in all shards
query("q", "*:*", "rows", 100, "facet", "true", "facet.field",
missingField, "facet.mincount", 2);
// test field that is valid in schema and missing in some shards
query("q", "*:*", "rows", 100, "facet", "true", "facet.field", oddField,
"facet.mincount", 2);
query("q", "*:*", "sort", i1 + " desc", "stats", "true", "stats.field", i1);
// Try to get better coverage for refinement queries by turning off over-requesting.
// This makes it much more likely that we may not get the top facet values, and hence
// we turn off that checking.
handle.put("facet_fields", SKIPVAL);
query("q", "*:*", "rows", 0, "facet", "true", "facet.field", t1,
"facet.limit", 5, "facet.shard.limit", 5);
// check a complex key name
query("q", "*:*", "rows", 0, "facet", "true", "facet.field",
"{!key='a b/c \\' \\} foo'}" + t1, "facet.limit", 5,
"facet.shard.limit", 5);
handle.remove("facet_fields");
query("q", "*:*", "sort", "n_tl1 desc");
// index the same document to two shards and make sure things
// don't blow up.
// assumes first n clients are first n shards
if (clients.size() >= 2) {
index(id, 100, i1, 107, t1, "oh no, a duplicate!");
for (int i = 0; i < getShardCount(); i++) {
index_specific(i, id, 100, i1, 107, t1, "oh no, a duplicate!");
}
commit();
query("q", "duplicate", "hl", "true", "hl.fl", t1);
query("q", "fox duplicate horses", "hl", "true", "hl.fl", t1);
query("q", "*:*", "rows", 100);
}
}
protected void indexAbunchOfDocs() throws Exception {
indexr(id, 2, i1, 50, t1, "to come to the aid of their country.");
indexr(id, 3, i1, 2, t1, "how now brown cow");
indexr(id, 4, i1, -100, t1,
"the quick fox jumped over the lazy dog");
indexr(id, 5, i1, 500, t1,
"the quick fox jumped way over the lazy dog");
indexr(id, 6, i1, -600, t1, "humpty dumpy sat on a wall");
indexr(id, 7, i1, 123, t1, "humpty dumpy had a great fall");
indexr(id, 8, i1, 876, t1,
"all the kings horses and all the kings men");
indexr(id, 9, i1, 7, t1, "couldn't put humpty together again");
indexr(id, 10, i1, 4321, t1, "this too shall pass");
indexr(id, 11, i1, -987, t1,
"An eye for eye only ends up making the whole world blind.");
indexr(id, 12, i1, 379, t1,
"Great works are performed, not by strength, but by perseverance.");
indexr(id, 13, i1, 232, t1, "no eggs on wall, lesson learned",
oddField, "odd man out");
indexr(id, 14, "SubjectTerms_mfacet", new String[] {"mathematical models",
"mathematical analysis"});
indexr(id, 15, "SubjectTerms_mfacet", new String[] {"test 1", "test 2",
"test3"});
indexr(id, 16, "SubjectTerms_mfacet", new String[] {"test 1", "test 2",
"test3"});
String[] vals = new String[100];
for (int i = 0; i < 100; i++) {
vals[i] = "test " + i;
}
indexr(id, 17, "SubjectTerms_mfacet", vals);
for (int i = 100; i < 150; i++) {
indexr(id, i);
}
}
/**
 * Executes a query against each live and active replica of the specified shard
 * and asserts that the results are identical.
 *
 * @see #queryAndCompare
 */
public QueryResponse queryAndCompareReplicas(SolrParams params, String shard)
throws Exception {
ArrayList<SolrClient> shardClients = new ArrayList<>(7);
updateMappingsFromZk(jettys, clients);
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
List<CloudJettyRunner> solrJetties = shardToJetty.get(shard);
assertNotNull("no jetties found for shard: " + shard, solrJetties);
for (CloudJettyRunner cjetty : solrJetties) {
ZkNodeProps props = cjetty.info;
String nodeName = props.getStr(ZkStateReader.NODE_NAME_PROP);
boolean active = Replica.State.getState(props.getStr(ZkStateReader.STATE_PROP)) == Replica.State.ACTIVE;
boolean live = zkStateReader.isNodeLive(nodeName);
if (active && live) {
shardClients.add(cjetty.client.solrClient);
}
}
return queryAndCompare(params, shardClients);
}
/**
* For each Shard, executes a query against each live and active replica of that shard
* and asserts that the results are identical for each replica of the same shard.
* Because results are not compared between replicas of different shards, this method
* should be safe for comparing the results of any query, even if it contains
* "distrib=false", because the replicas should all be identical.
*
* @see AbstractFullDistribZkTestBase#queryAndCompareReplicas(SolrParams, String)
*/
public void queryAndCompareShards(SolrParams params) throws Exception {
updateMappingsFromZk(jettys, clients);
List<String> shards = new ArrayList<>(shardToJetty.keySet());
for (String shard : shards) {
queryAndCompareReplicas(params, shard);
}
}
/**
 * Checks replicas within the same shard for a consistent number of documents, discarding the
 * result; see {@link #checkShardConsistency(String, boolean, boolean)} for the variant that
 * reports the differences found.
 */
protected void checkShardConsistency(String shard) throws Exception {
checkShardConsistency(shard, false, false);
}
/**
 * Returns a non-null string if replicas within the same shard do not have a
 * consistent number of documents.
 * If expectFailure==false, the exact differences found will be logged, since
 * this would be an unexpected failure.
 * verbose causes extra debugging info to be displayed, even if everything is
 * consistent.
 */
protected String checkShardConsistency(String shard, boolean expectFailure, boolean verbose)
throws Exception {
List<CloudJettyRunner> solrJetties = shardToJetty.get(shard);
if (solrJetties == null) {
throw new RuntimeException("shard not found:" + shard + " keys:"
+ shardToJetty.keySet());
}
long num = -1;
long lastNum = -1;
String failMessage = null;
if (verbose) System.err.println("check const of " + shard);
int cnt = 0;
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
assertEquals(
"The client count does not match up with the shard count for slice:"
+ shard,
zkStateReader.getClusterState().getCollection(DEFAULT_COLLECTION).getSlice(shard)
.getReplicasMap().size(), solrJetties.size());
CloudJettyRunner lastJetty = null;
for (CloudJettyRunner cjetty : solrJetties) {
ZkNodeProps props = cjetty.info;
if (verbose) System.err.println("client" + cnt++);
if (verbose) System.err.println("PROPS:" + props);
try {
SolrParams query = params("q","*:*", "rows","0", "distrib","false", "tests","checkShardConsistency"); // "tests" is just a tag that won't do anything except be echoed in logs
num = cjetty.client.solrClient.query(query).getResults().getNumFound();
} catch (SolrException | SolrServerException e) {
if (verbose) System.err.println("error contacting client: "
+ e.getMessage() + "\n");
continue;
}
boolean live = false;
String nodeName = props.getStr(ZkStateReader.NODE_NAME_PROP);
if (zkStateReader.isNodeLive(nodeName)) {
live = true;
}
if (verbose) System.err.println(" live:" + live);
if (verbose) System.err.println(" num:" + num + "\n");
boolean active = Replica.State.getState(props.getStr(ZkStateReader.STATE_PROP)) == Replica.State.ACTIVE;
if (active && live) {
if (lastNum > -1 && lastNum != num && failMessage == null) {
failMessage = shard + " is not consistent. Got " + lastNum + " from " + lastJetty.url + " (previous client)"
+ " and got " + num + " from " + cjetty.url;
if (!expectFailure || verbose) {
System.err.println("######" + failMessage);
SolrQuery query = new SolrQuery("*:*");
query.set("distrib", false);
query.set("fl","id,_version_");
query.set("rows","100000");
query.set("sort","id asc");
query.set("tests","checkShardConsistency/showDiff");
SolrDocumentList lst1 = lastJetty.client.solrClient.query(query).getResults();
SolrDocumentList lst2 = cjetty.client.solrClient.query(query).getResults();
CloudInspectUtil.showDiff(lst1, lst2, lastJetty.url, cjetty.url);
}
}
lastNum = num;
lastJetty = cjetty;
}
}
return failMessage;
}
public void showCounts() {
Set<String> theShards = shardToJetty.keySet();
for (String shard : theShards) {
List<CloudJettyRunner> solrJetties = shardToJetty.get(shard);
for (CloudJettyRunner cjetty : solrJetties) {
ZkNodeProps props = cjetty.info;
System.err.println("PROPS:" + props);
try {
// "tests" is just a tag that won't do anything except be echoed in logs
SolrParams query = params("q", "*:*", "rows", "0", "distrib", "false", "tests", "checkShardConsistency");
long num = cjetty.client.solrClient.query(query).getResults()
.getNumFound();
System.err.println("DOCS:" + num);
} catch (SolrServerException | SolrException | IOException e) {
System.err.println("error contacting client: " + e.getMessage()
+ "\n");
continue;
}
boolean live = false;
String nodeName = props.getStr(ZkStateReader.NODE_NAME_PROP);
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
if (zkStateReader.isNodeLive(nodeName)) {
live = true;
}
System.err.println(" live:" + live);
}
}
}
protected void randomlyEnableAutoSoftCommit() {
if (r.nextBoolean()) {
enableAutoSoftCommit(1000);
} else {
log.info("Not turning on auto soft commit");
}
}
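/** Sets the soft auto-commit time upper bound (in ms) on the commit tracker of every core in every mapped jetty. */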
protected void enableAutoSoftCommit(int time) {
log.info("Turning on auto soft commit: {}", time);
for (List<CloudJettyRunner> jettyList : shardToJetty.values()) {
for (CloudJettyRunner jetty : jettyList) {
CoreContainer cores = jetty.jetty.getCoreContainer();
for (SolrCore core : cores.getCores()) {
((DirectUpdateHandler2) core.getUpdateHandler())
.getSoftCommitTracker().setTimeUpperBound(time);
}
}
}
}
/* Checks both intra-shard replica consistency and consistency against the control shard.
 * The test will be failed if differences are found.
 */
protected void checkShardConsistency() throws Exception {
checkShardConsistency(true, false);
}
/* Checks shard consistency and optionally checks against the control shard.
* The test will be failed if differences are found.
*/
protected void checkShardConsistency(boolean checkVsControl, boolean verbose)
throws Exception {
checkShardConsistency(checkVsControl, verbose, null, null);
}
/* Checks shard consistency and optionally checks against the control shard.
* The test will be failed if differences are found.
*/
protected void checkShardConsistency(boolean checkVsControl, boolean verbose, Set<String> addFails, Set<String> deleteFails)
throws Exception {
updateMappingsFromZk(jettys, coreClients, true);
Set<String> theShards = shardToJetty.keySet();
String failMessage = null;
for (String shard : theShards) {
String shardFailMessage = checkShardConsistency(shard, false, verbose);
if (shardFailMessage != null && failMessage == null) {
failMessage = shardFailMessage;
}
}
if (failMessage != null) {
fail(failMessage);
}
if (!checkVsControl) return;
SolrParams q = params("q","*:*","rows","0", "tests","checkShardConsistency(vsControl)"); // add a tag to aid in debugging via logs
SolrDocumentList controlDocList = controlClient.query(q).getResults();
long controlDocs = controlDocList.getNumFound();
SolrDocumentList cloudDocList = cloudClient.query(q).getResults();
long cloudClientDocs = cloudDocList.getNumFound();
// now check that the right # are on each shard
theShards = shardToJetty.keySet();
int cnt = 0;
for (String s : theShards) {
int times = shardToJetty.get(s).size();
for (int i = 0; i < times; i++) {
try {
CloudJettyRunner cjetty = shardToJetty.get(s).get(i);
ZkNodeProps props = cjetty.info;
SolrClient client = cjetty.client.solrClient;
boolean active = Replica.State.getState(props.getStr(ZkStateReader.STATE_PROP)) == Replica.State.ACTIVE;
if (active) {
SolrQuery query = new SolrQuery("*:*");
query.set("distrib", false);
long results = client.query(query).getResults().getNumFound();
if (verbose) System.err.println(props + " : " + results);
if (verbose) System.err.println("shard:"
+ props.getStr(ZkStateReader.SHARD_ID_PROP));
cnt += results;
break;
}
} catch (Exception e) {
ParWork.propagateInterrupt(e);
// if we have a problem, try the next one
if (i == times - 1) {
throw e;
}
}
}
}
if (controlDocs != cnt || cloudClientDocs != controlDocs) {
String msg = "document count mismatch. control=" + controlDocs + " sum(shards)="+ cnt + " cloudClient="+cloudClientDocs;
log.error(msg);
boolean shouldFail = CloudInspectUtil.compareResults(controlClient, cloudClient, addFails, deleteFails);
if (shouldFail) {
fail(msg);
}
}
}
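/**
 * Returns the client for the replica hosted on the given node, or null if none is mapped
 * (note that {@code CloudSolrServerClient.shardName} actually stores the node name).
 */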
protected SolrClient getClient(String nodeName) {
for (CloudJettyRunner cjetty : cloudJettys) {
CloudSolrServerClient client = cjetty.client;
if (client.shardName.equals(nodeName)) {
return client.solrClient;
}
}
return null;
}
protected void assertDocCounts(boolean verbose) throws Exception {
// TODO: as we create the clients, we should build a map from shard to
// node/client
// and node/client to shard?
if (verbose) System.err.println("control docs:"
+ controlClient.query(new SolrQuery("*:*")).getResults().getNumFound()
+ "\n\n");
long controlCount = controlClient.query(new SolrQuery("*:*")).getResults()
.getNumFound();
// do some really inefficient mapping...
Map<String,Slice> slices = null;
ClusterState clusterState;
try (ZkStateReader zk = new ZkStateReader(zkServer.getZkAddress(), AbstractZkTestCase.TIMEOUT,
AbstractZkTestCase.TIMEOUT)) {
zk.createClusterStateWatchersAndUpdate();
clusterState = zk.getClusterState();
final DocCollection docCollection = clusterState.getCollectionOrNull(DEFAULT_COLLECTION);
slices = (docCollection != null) ? docCollection.getSlicesMap() : null;
}
if (slices == null) {
throw new RuntimeException("Could not find collection "
+ DEFAULT_COLLECTION + " in " + clusterState.getCollectionsMap().keySet());
}
for (CloudJettyRunner cjetty : cloudJettys) {
CloudSolrServerClient client = cjetty.client;
for (Map.Entry<String,Slice> slice : slices.entrySet()) {
Map<String,Replica> theShards = slice.getValue().getReplicasMap();
for (Map.Entry<String,Replica> shard : theShards.entrySet()) {
String shardName = new URI(
((HttpSolrClient) client.solrClient).getBaseURL()).getPort()
+ "_solr_";
if (verbose && shard.getKey().endsWith(shardName)) {
System.err.println("shard:" + slice.getKey());
System.err.println(shard.getValue());
}
}
}
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
long count = 0;
final Replica.State currentState = Replica.State.getState(cjetty.info.getStr(ZkStateReader.STATE_PROP));
if (currentState == Replica.State.ACTIVE
&& zkStateReader.isNodeLive(cjetty.info.getStr(ZkStateReader.NODE_NAME_PROP))) {
SolrQuery query = new SolrQuery("*:*");
query.set("distrib", false);
count = client.solrClient.query(query).getResults().getNumFound();
}
if (verbose) System.err.println("client docs:" + count + "\n\n");
}
if (verbose) System.err.println("control docs:"
+ controlClient.query(new SolrQuery("*:*")).getResults().getNumFound()
+ "\n\n");
SolrQuery query = new SolrQuery("*:*");
assertEquals("Doc Counts do not add up", controlCount,
cloudClient.query(query).getResults().getNumFound());
}
@Override
protected QueryResponse queryServer(ModifiableSolrParams params)
throws SolrServerException, IOException {
if (r.nextBoolean()) params.set("collection", DEFAULT_COLLECTION);
return cloudClient.query(params);
}
abstract static class StoppableThread extends Thread {
public StoppableThread(String name) {
super(name);
}
public abstract void safeStop();
}
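/**
 * Commits and re-checks shard consistency in a loop (up to ~20 seconds), tolerating
 * transient failures from nodes that have just been stopped, until all replicas agree.
 */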
public void waitForNoShardInconsistency() throws Exception {
log.info("Wait for no shard inconsistency");
int cnt = 0;
boolean retry = false;
do {
try {
commit();
} catch (Throwable t) {
log.error("", t);
// we don't care if this commit fails on some nodes
}
updateMappingsFromZk(jettys, clients);
Set<String> theShards = shardToJetty.keySet();
String failMessage = null;
for (String shard : theShards) {
try {
failMessage = checkShardConsistency(shard, true, true);
} catch (Exception e) {
ParWork.propagateInterrupt(e);
// we might hit a node we just stopped
failMessage="hit exception:" + e.getMessage();
}
}
if (failMessage != null) {
log.info("shard inconsistency - waiting ...");
retry = true;
} else {
retry = false;
}
cnt++;
if (cnt > 40) break;
Thread.sleep(500);
} while (retry);
}
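/**
 * Runs the given query against the cloud client and asserts that the returned ids exactly
 * match the comma-separated {@code expectedDocs} list, e.g. {@code doQuery("1,2,3", "q", "*:*")}.
 */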
void doQuery(String expectedDocs, String... queryParams) throws Exception {
Set<String> expectedIds = new HashSet<>( StrUtils.splitSmart(expectedDocs, ",", true) );
QueryResponse rsp = cloudClient.query(params(queryParams));
Set<String> obtainedIds = new HashSet<>();
for (SolrDocument doc : rsp.getResults()) {
obtainedIds.add((String) doc.get("id"));
}
assertEquals(expectedIds, obtainedIds);
}
@Override
public void distribTearDown() throws Exception {
try {
if (VERBOSE || printLayoutOnTearDown) {
super.printLayout();
}
closeRestTestHarnesses(); // TODO: close here or later?
} finally {
super.distribTearDown();
System.clearProperty("zkHost");
System.clearProperty("numShards");
}
}
@Override
protected void destroyServers() throws Exception {
try (ParWork closer = new ParWork(this, false, false)) {
closer.collect(commonCloudSolrClient, coreClients, controlClientCloud, cloudClient);
}
coreClients.clear();
super.destroyServers();
}
@Override
protected void commit() throws Exception {
controlClient.commit();
cloudClient.commit();
}
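// Convenience overloads; these delegate to the full createCollection(...) implementation below.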
protected CollectionAdminResponse createCollection(String collectionName, String configSetName, int numShards, int replicationFactor, int maxShardsPerNode) throws SolrServerException, IOException, InterruptedException, TimeoutException {
return createCollection(null, collectionName, configSetName, numShards, replicationFactor, maxShardsPerNode, null, null);
}
protected CollectionAdminResponse createCollection(Map<String,List<Integer>> collectionInfos, String collectionName, Map<String,Object> collectionProps, SolrClient client) throws SolrServerException, IOException, InterruptedException, TimeoutException {
return createCollection(collectionInfos, collectionName, collectionProps, client, "conf1");
}
// TODO: Use CollectionAdminRequest#createCollection() instead of a raw request
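/**
* Creates a collection through a raw Collections API request, derives the replica counts
* from {@code collectionProps}, records the expected [numShards, replicasPerShard] pair in
* {@code collectionInfos} (when non-null), and waits up to 30 seconds for the collection to
* reach the expected active shape.
*/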
protected CollectionAdminResponse createCollection(Map<String, List<Integer>> collectionInfos, String collectionName, Map<String, Object> collectionProps, SolrClient client, String confSetName) throws SolrServerException, IOException, InterruptedException, TimeoutException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionAction.CREATE.toString());
for (Map.Entry<String, Object> entry : collectionProps.entrySet()) {
if (entry.getValue() != null) params.set(entry.getKey(), String.valueOf(entry.getValue()));
}
Integer numShards = (Integer) collectionProps.get(ZkStateReader.NUM_SHARDS_PROP);
if (numShards == null) {
String shardNames = (String) collectionProps.get(OverseerCollectionMessageHandler.SHARDS_PROP);
numShards = StrUtils.splitSmart(shardNames, ',').size();
}
Integer numNrtReplicas = (Integer) collectionProps.get(ZkStateReader.NRT_REPLICAS);
if (numNrtReplicas == null) {
numNrtReplicas = (Integer) collectionProps.get(ZkStateReader.REPLICATION_FACTOR);
}
if (numNrtReplicas == null) {
numNrtReplicas = (Integer) OverseerCollectionMessageHandler.COLLECTION_PROPS_AND_DEFAULTS.get(ZkStateReader.REPLICATION_FACTOR);
}
if (numNrtReplicas == null) {
numNrtReplicas = Integer.valueOf(0);
}
Integer numTlogReplicas = (Integer) collectionProps.get(ZkStateReader.TLOG_REPLICAS);
if (numTlogReplicas == null) {
numTlogReplicas = Integer.valueOf(0);
}
Integer numPullReplicas = (Integer) collectionProps.get(ZkStateReader.PULL_REPLICAS);
if (numPullReplicas == null) {
numPullReplicas = Integer.valueOf(0);
}
if (confSetName != null) {
params.set("collection.configName", confSetName);
} else {
params.set("collection.configName", "_default");
}
int clientIndex = random().nextInt(2);
List<Integer> list = new ArrayList<>();
list.add(numShards);
list.add(numNrtReplicas + numTlogReplicas + numPullReplicas);
if (collectionInfos != null) {
collectionInfos.put(collectionName, list);
}
params.set("name", collectionName);
SolrRequest<QueryResponse> request = new QueryRequest(params);
request.setPath("/admin/collections");
CollectionAdminResponse res = new CollectionAdminResponse();
if (client == null) {
final String baseUrl = getBaseUrl((Http2SolrClient) clients.get(clientIndex));
try (SolrClient adminClient = createNewSolrClient("", baseUrl)) {
res.setResponse(adminClient.request(request));
}
} else {
res.setResponse(client.request(request));
}
try {
cloudClient.waitForState(collectionName, 30, TimeUnit.SECONDS, SolrCloudTestCase.activeClusterShape(numShards,
numShards * (numNrtReplicas + numTlogReplicas + numPullReplicas)));
} catch (TimeoutException e) {
throw new RuntimeException("Timeout waiting for " + numShards + " shards and " + (numNrtReplicas + numTlogReplicas + numPullReplicas) + " replicas.", e);
}
return res;
}
protected CollectionAdminResponse createCollection(Map<String,List<Integer>> collectionInfos,
String collectionName, String configSetName, int numShards, int replicationFactor, int maxShardsPerNode, SolrClient client, String createNodeSetStr) throws SolrServerException, IOException, InterruptedException, TimeoutException {
int numNrtReplicas = useTlogReplicas() ? 0 : replicationFactor;
int numTlogReplicas = useTlogReplicas() ? replicationFactor : 0;
return createCollection(collectionInfos, collectionName,
Utils.makeMap(
ZkStateReader.NUM_SHARDS_PROP, numShards,
ZkStateReader.NRT_REPLICAS, numNrtReplicas,
ZkStateReader.TLOG_REPLICAS, numTlogReplicas,
ZkStateReader.PULL_REPLICAS, getPullReplicaCount(),
ZkStateReader.CREATE_NODE_SET, createNodeSetStr,
ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode),
client, configSetName);
}
protected CollectionAdminResponse createCollection(Map<String, List<Integer>> collectionInfos,
String collectionName, int numShards, int replicationFactor, int maxShardsPerNode, SolrClient client, String createNodeSetStr, String configName) throws SolrServerException, IOException, InterruptedException, TimeoutException {
int numNrtReplicas = useTlogReplicas() ? 0 : replicationFactor;
int numTlogReplicas = useTlogReplicas() ? replicationFactor : 0;
return createCollection(collectionInfos, collectionName,
Utils.makeMap(
ZkStateReader.NUM_SHARDS_PROP, numShards,
ZkStateReader.NRT_REPLICAS, numNrtReplicas,
ZkStateReader.TLOG_REPLICAS, numTlogReplicas,
ZkStateReader.PULL_REPLICAS, getPullReplicaCount(),
ZkStateReader.CREATE_NODE_SET, createNodeSetStr,
ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode),
client, configName);
}
@Override
protected SolrClient createNewSolrClient(int port) {
return createNewSolrClient(DEFAULT_COLLECTION, port);
}
protected SolrClient createNewSolrClient(int port, int connectionTimeoutMillis, int socketTimeoutMillis) {
return createNewSolrClient(DEFAULT_COLLECTION, port, connectionTimeoutMillis, socketTimeoutMillis);
}
protected SolrClient createNewSolrClient(String coreName, int port) {
try {
// setup the server...
String baseUrl = buildUrl(port);
String url = baseUrl + (baseUrl.endsWith("/") ? "" : "/") + coreName;
Http2SolrClient client = getHttpSolrClient(url, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT_MILLIS);
return client;
} catch (Exception ex) {
ParWork.propagateInterrupt(ex);
throw new RuntimeException(ex);
}
}
protected SolrClient createNewSolrClient(String coreName, int port, int connectionTimeoutMillis, int socketTimeoutMillis) {
try {
// setup the server...
String baseUrl = buildUrl(port);
String url = baseUrl + (baseUrl.endsWith("/") ? "" : "/") + coreName;
Http2SolrClient client = getHttpSolrClient(url, connectionTimeoutMillis, socketTimeoutMillis);
return client;
} catch (Exception ex) {
ParWork.propagateInterrupt(ex);
throw new RuntimeException(ex);
}
}
protected SolrClient createNewSolrClient(String collection, String baseUrl) {
try {
// setup the server...
Http2SolrClient client = getHttpSolrClient(baseUrl + "/" + collection, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT_MILLIS);
return client;
}
catch (Exception ex) {
ParWork.propagateInterrupt(ex);
throw new RuntimeException(ex);
}
}
protected String getBaseUrl(HttpSolrClient client) {
return client.getBaseURL().substring(
0, client.getBaseURL().length()
- DEFAULT_COLLECTION.length() - 1);
}
protected String getBaseUrl(Http2SolrClient client) {
return client.getBaseURL().substring(
0, client.getBaseURL().length()
- DEFAULT_COLLECTION.length() - 1);
}
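/** Builds a {@link SolrInputDocument} from alternating field name/value pairs, e.g. {@code getDoc("id", "1", "name_s", "doc one")}. */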
public static SolrInputDocument getDoc(Object... fields) throws Exception {
SolrInputDocument doc = new SolrInputDocument();
addFields(doc, fields);
return doc;
}
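/**
* Compares the current cluster state against the expected [slices, replicasPerSlice] pair
* and, when given, verifies every replica landed on an allowed node.
*
* @return null if all expectations are met, otherwise a description of the first mismatch
*/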
private String checkCollectionExpectations(String collectionName, List<Integer> numShardsNumReplicaList, List<String> nodesAllowedToRunShards) {
ClusterState clusterState = getCommonCloudSolrClient().getZkStateReader().getClusterState();
int expectedSlices = numShardsNumReplicaList.get(0);
// Note: we expect replication-factor to be reduced if there are not enough live nodes to spread all shards of a collection over different nodes
int expectedShardsPerSlice = numShardsNumReplicaList.get(1);
int expectedTotalShards = expectedSlices * expectedShardsPerSlice;
if (clusterState.hasCollection(collectionName)) {
Map<String,Slice> slices = clusterState.getCollection(collectionName).getSlicesMap();
// did we find expectedSlices slices/shards?
if (slices.size() != expectedSlices) {
return "Found new collection " + collectionName + ", but mismatch on number of slices. Expected: " + expectedSlices + ", actual: " + slices.size();
}
int totalShards = 0;
for (String sliceName : slices.keySet()) {
for (Replica replica : slices.get(sliceName).getReplicas()) {
if (nodesAllowedToRunShards != null && !nodesAllowedToRunShards.contains(replica.getStr(ZkStateReader.NODE_NAME_PROP))) {
return "Shard " + replica.getName() + " created on node " + replica.getNodeName() + " not allowed to run shards for the created collection " + collectionName;
}
}
totalShards += slices.get(sliceName).getReplicas().size();
}
if (totalShards != expectedTotalShards) {
return "Found new collection " + collectionName + " with correct number of slices, but mismatch on number of shards. Expected: " + expectedTotalShards + ", actual: " + totalShards;
}
return null;
} else {
return "Could not find new collection " + collectionName;
}
}
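/**
* Polls the cluster state for up to 5 seconds until {@link #checkCollectionExpectations}
* passes; prints the cluster layout and fails the test if it never does.
*/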
protected void checkForCollection(String collectionName,
List<Integer> numShardsNumReplicaList,
List<String> nodesAllowedToRunShards) throws Exception {
// check for an expectedSlices new collection - we poll the state
final TimeOut timeout = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
boolean success = false;
String checkResult = "Didnt get to perform a single check";
while (! timeout.hasTimedOut()) {
checkResult = checkCollectionExpectations(collectionName,
numShardsNumReplicaList, nodesAllowedToRunShards);
if (checkResult == null) {
success = true;
break;
}
Thread.sleep(250);
}
if (!success) {
super.printLayout();
fail(checkResult);
}
}
private CloudHttp2SolrClient commonCloudSolrClient;
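/**
* Returns a lazily created, shared {@link CloudHttp2SolrClient} for this cluster that
* defaults to {@link #DEFAULT_COLLECTION}; the same instance is returned on every call.
*/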
protected CloudHttp2SolrClient getCommonCloudSolrClient() {
synchronized (this) {
if (commonCloudSolrClient == null) {
commonCloudSolrClient = getCloudSolrClient(zkServer.getZkAddress(), random().nextBoolean());
commonCloudSolrClient.setDefaultCollection(DEFAULT_COLLECTION);
commonCloudSolrClient.connect();
if (log.isInfoEnabled()) {
log.info("Created commonCloudSolrClient with updatesToLeaders={} and parallelUpdates={}",
commonCloudSolrClient.isUpdatesToLeaders(), commonCloudSolrClient.isParallelUpdates());
}
}
}
return commonCloudSolrClient;
}
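/**
* Returns the core URL of the first replica of {@code collection} that is hosted on a live
* node, or throws if none can be found.
*/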
public static String getUrlFromZk(ZkStateReader zkStateReader, ClusterState clusterState, String collection) {
Map<String,Slice> slices = clusterState.getCollection(collection).getSlicesMap();
if (slices == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection:" + collection);
}
for (Map.Entry<String,Slice> entry : slices.entrySet()) {
Slice slice = entry.getValue();
Map<String,Replica> shards = slice.getReplicasMap();
Set<Map.Entry<String,Replica>> shardEntries = shards.entrySet();
for (Map.Entry<String,Replica> shardEntry : shardEntries) {
final Replica node = shardEntry.getValue();
if (zkStateReader.isNodeLive(node.getStr(ZkStateReader.NODE_NAME_PROP))) {
return node.getCoreUrl();
}
}
}
throw new RuntimeException("Could not find a live node for collection:" + collection);
}
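/**
* Polls the given client with a match-all query for up to 30 seconds, treating 403, 404 and
* 503 responses as "collection not available yet"; any other SolrException is rethrown.
*/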
public static void waitForNon403or404or503(Http2SolrClient collectionClient)
throws Exception {
SolrException exp = null;
final TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeout.hasTimedOut()) {
boolean missing = false;
try {
collectionClient.query(new SolrQuery("*:*"));
} catch (SolrException e) {
if (!(e.code() == 403 || e.code() == 503 || e.code() == 404)) {
throw e;
}
exp = e;
missing = true;
}
if (!missing) {
return;
}
Thread.sleep(250);
}
fail("Could not find the new collection - " + exp.code() + " : " + collectionClient.getBaseURL());
}
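/**
* Creates a collection, computing a maxShardsPerNode value just large enough for the
* requested layout to fit on the currently live nodes.
*/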
protected void createCollection(String collName,
CloudSolrClient client,
int replicationFactor,
int numShards) throws Exception {
int maxShardsPerNode = (((numShards + 1) * replicationFactor) / getCommonCloudSolrClient()
.getZkStateReader().getLiveNodes().size()) + 1;
int numNrtReplicas = useTlogReplicas() ? 0 : replicationFactor;
int numTlogReplicas = useTlogReplicas() ? replicationFactor : 0;
Map<String, Object> props = makeMap(
ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode,
ZkStateReader.NRT_REPLICAS, numNrtReplicas,
ZkStateReader.TLOG_REPLICAS, numTlogReplicas,
ZkStateReader.PULL_REPLICAS, getPullReplicaCount(),
ZkStateReader.NUM_SHARDS_PROP, numShards);
Map<String,List<Integer>> collectionInfos = new HashMap<>();
createCollection(collectionInfos, collName, props, client);
}
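/**
* Creates a collection and, if the response reports a failure, deletes it and retries once
* before failing the test.
*/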
protected void createCollectionRetry(String testCollectionName, String configSetName, int numShards, int replicationFactor, int maxShardsPerNode)
throws SolrServerException, IOException, InterruptedException, TimeoutException {
CollectionAdminResponse resp = createCollection(testCollectionName, configSetName, numShards, replicationFactor, maxShardsPerNode);
if (resp.getResponse().get("failure") != null) {
CollectionAdminRequest.Delete req = CollectionAdminRequest.deleteCollection(testCollectionName);
req.process(cloudClient);
resp = createCollection(testCollectionName, configSetName, numShards, replicationFactor, maxShardsPerNode);
if (resp.getResponse().get("failure") != null) {
fail("Could not create " + testCollectionName);
}
}
}
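/** Waits up to {@code timeoutms} milliseconds for an active leader of the given shard and asserts that one was found. */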
protected Replica getShardLeader(String testCollectionName, String shardId, int timeoutms) throws Exception {
Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, shardId, timeoutms);
assertNotNull("Could not find active leader for " + shardId + " of " +
testCollectionName + " after "+timeoutms+" secs; clusterState: " +
printClusterStateInfo(testCollectionName), leader);
return leader;
}
protected String printClusterStateInfo() throws Exception {
return printClusterStateInfo(null);
}
protected String printClusterStateInfo(String collection) throws Exception {
String cs = null;
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
if (collection != null) {
cs = clusterState.getCollection(collection).toString();
} else {
Map<String,DocCollection> map = clusterState.getCollectionsMap();
CharArr out = new CharArr();
new JSONWriter(out, 2).write(map);
cs = out.toString();
}
return cs + "\nLive Nodes:" + cloudClient.getZkStateReader().getLiveNodes();
}
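/**
* Sends a Collections API RELOAD for {@code testCollectionName} and waits up to 30 seconds
* for the given replica's core to report a start time later than before the reload.
*
* @return true if the reload was observed to complete in time
*/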
protected boolean reloadCollection(Replica replica, String testCollectionName) throws Exception {
String coreName = replica.getName();
boolean reloadedOk = false;
try (Http2SolrClient client = getHttpSolrClient(replica.getBaseUrl())) {
CoreAdminResponse statusResp = CoreAdminRequest.getStatus(coreName, client);
long leaderCoreStartTime = statusResp.getStartTime(coreName).getTime();
// send reload command for the collection
log.info("Sending RELOAD command for {}", testCollectionName);
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.RELOAD.toString());
params.set("name", testCollectionName);
QueryRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
client.request(request);
// verify reload is done, waiting up to 30 seconds for slow test environments
long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
while (System.nanoTime() < timeout) {
statusResp = CoreAdminRequest.getStatus(coreName, client);
long startTimeAfterReload = statusResp.getStartTime(coreName).getTime();
if (startTimeAfterReload > leaderCoreStartTime) {
reloadedOk = true;
break;
}
// else ... still waiting to see the reloaded core report a later start time
Thread.sleep(250);
}
}
return reloadedOk;
}
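/**
* Logs a per-replica summary of the collection: which replicas are leaders, which are not
* active, and the /replication details of every active replica.
*/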
protected void logReplicaTypesReplicationInfo(String collectionName, ZkStateReader zkStateReader) throws KeeperException, InterruptedException, IOException {
log.info("## Collecting extra Replica.Type information of the cluster");
zkStateReader.updateLiveNodes();
StringBuilder builder = new StringBuilder();
DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
for (Slice s : collection.getSlices()) {
Replica leader = s.getLeader();
for (Replica r : s.getReplicas()) {
if (!r.isActive(zkStateReader.getLiveNodes())) {
builder.append(String.format(Locale.ROOT, "Replica %s not in liveNodes or is not active%s", r.getName(), System.lineSeparator()));
continue;
}
if (r.equals(leader)) {
builder.append(String.format(Locale.ROOT, "Replica %s is leader%s", r.getName(), System.lineSeparator()));
}
logReplicationDetails(r, builder);
}
}
log.info("Summary of the cluster: {}", builder);
}
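/**
* For every shard, waits until each live PULL and TLOG replica reports the same index
* version as its leader and the replica's registered searcher actually serves that version;
* fails the test if the supplied {@link TimeOut} expires first.
*/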
protected void waitForReplicationFromReplicas(String collectionName, ZkStateReader zkStateReader, TimeOut timeout) throws KeeperException, InterruptedException, IOException, TimeoutException {
log.info("waitForReplicationFromReplicas: {}", collectionName);
DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
Map<String, CoreContainer> containers = new HashMap<>();
for (JettySolrRunner runner : jettys) {
if (!runner.isRunning()) {
continue;
}
containers.put(runner.getNodeName(), runner.getCoreContainer());
}
for (Slice s : collection.getSlices()) {
Replica leader = zkStateReader.getLeaderRetry(collectionName, s.getName(), (int)timeout.timeLeft(TimeUnit.MILLISECONDS));
long leaderIndexVersion = -1;
while (!timeout.hasTimedOut()) {
leaderIndexVersion = getIndexVersion(leader);
if (leaderIndexVersion >= 0) {
break;
}
Thread.sleep(250);
}
if (timeout.hasTimedOut()) {
fail("Unable to get leader indexVersion");
}
// both PULL and TLOG replicas pull index updates from the leader
for (Replica pullReplica : s.getReplicas(EnumSet.of(Replica.Type.PULL, Replica.Type.TLOG))) {
if (!zkStateReader.isNodeLive(pullReplica.getNodeName())) {
continue;
}
while (true) {
long replicaIndexVersion = getIndexVersion(pullReplica);
if (leaderIndexVersion == replicaIndexVersion) {
if (log.isInfoEnabled()) {
log.info("Leader replica's version ({}) in sync with replica({}): {} == {}"
, leader.getName(), pullReplica.getName(), leaderIndexVersion, replicaIndexVersion);
}
// Make sure the host is serving the correct version
try (SolrCore core = containers.get(pullReplica.getNodeName()).getCore(pullReplica.getName())) {
RefCounted<SolrIndexSearcher> ref = core.getRegisteredSearcher();
try {
SolrIndexSearcher searcher = ref.get();
String servingVersion = searcher.getIndexReader().getIndexCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
if (Long.parseLong(servingVersion) == replicaIndexVersion) {
break;
} else {
if (log.isInfoEnabled()) {
log.info("Replica {} has the correct version replicated, but the searcher is not ready yet. Replicated version: {}, Serving version: {}"
, pullReplica.getName(), replicaIndexVersion, servingVersion);
}
}
} finally {
if (ref != null) ref.decref();
}
}
} else {
if (timeout.hasTimedOut()) {
logReplicaTypesReplicationInfo(collectionName, zkStateReader);
fail(String.format(Locale.ROOT, "Timed out waiting for replica %s (%d) to replicate from leader %s (%d)", pullReplica.getName(), replicaIndexVersion, leader.getName(), leaderIndexVersion));
}
if (leaderIndexVersion > replicaIndexVersion) {
if (log.isInfoEnabled()) {
log.info("{} version is {} and leader's is {}, will wait for replication"
, pullReplica.getName(), replicaIndexVersion, leaderIndexVersion);
}
} else {
if (log.isInfoEnabled()) {
log.info("Leader replica's version ({}) is lower than pull replica({}): {} < {}"
, leader.getName(), pullReplica.getName(), leaderIndexVersion, replicaIndexVersion);
}
}
}
Thread.sleep(250);
}
}
}
}
protected void waitForAllWarmingSearchers() throws InterruptedException {
log.info("waitForAllWarmingSearchers");
for (JettySolrRunner jetty : jettys) {
if (!jetty.isRunning()) {
continue;
}
for (SolrCore core : jetty.getCoreContainer().getCores()) {
waitForWarming(core);
}
}
}
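/**
* Asks the replica's /replication handler for its commits and returns the highest
* indexVersion seen, or -1 if the request fails so that callers can retry.
*/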
protected long getIndexVersion(Replica replica) throws IOException {
try (HttpSolrClient client = new HttpSolrClient.Builder(replica.getCoreUrl()).build()) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("qt", "/replication");
params.set(ReplicationHandler.COMMAND, ReplicationHandler.CMD_SHOW_COMMITS);
try {
QueryResponse response = client.query(params);
@SuppressWarnings("unchecked")
List<NamedList<Object>> commits = (List<NamedList<Object>>)response.getResponse().get(ReplicationHandler.CMD_SHOW_COMMITS);
return (long) Collections.max(commits, (a, b) -> ((Long) a.get("indexVersion")).compareTo((Long) b.get("indexVersion"))).get("indexVersion");
} catch (SolrServerException e) {
log.warn("Exception getting version from {}, will return an invalid version to retry.", replica.getName(), e);
return -1;
}
}
}
/**
* Logs a WARN if the collection can't be deleted, but does not fail or throw an exception
* @return true if success, else false
*/
protected static boolean attemptCollectionDelete(CloudHttp2SolrClient client, String collectionName) {
// try to clean up
try {
CollectionAdminRequest.deleteCollection(collectionName).process(client);
return true;
} catch (Exception e) {
ParWork.propagateInterrupt(e);
// don't fail the test
log.warn("Could not delete collection {} - ignoring", collectionName);
}
return false;
}
protected void logReplicationDetails(Replica replica, StringBuilder builder) throws IOException {
try (HttpSolrClient client = new HttpSolrClient.Builder(replica.getCoreUrl()).build()) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("qt", "/replication");
params.set(ReplicationHandler.COMMAND, ReplicationHandler.CMD_DETAILS);
try {
QueryResponse response = client.query(params);
builder.append(String.format(Locale.ROOT, "%s: %s%s", replica.getName(), response.getResponse(), System.lineSeparator()));
} catch (SolrServerException e) {
log.warn("Unable to ger replication details for replica {}", replica.getName(), e);
}
}
}
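/**
* Polls the async request status API until the request reports COMPLETED or FAILED or until
* {@code waitForSeconds} elapses, returning the last state seen.
*/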
public static RequestStatusState getRequestStateAfterCompletion(String requestId, int waitForSeconds, SolrClient client)
throws IOException, SolrServerException {
RequestStatusState state = null;
final TimeOut timeout = new TimeOut(waitForSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeout.hasTimedOut()) {
state = getRequestState(requestId, client);
if (state == RequestStatusState.COMPLETED || state == RequestStatusState.FAILED) {
return state;
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted whie waiting for request completion. Last state seen: " + state, e);
}
}
return state;
}
static RequestStatusState getRequestState(int requestId, SolrClient client) throws IOException, SolrServerException {
return getRequestState(String.valueOf(requestId), client);
}
static RequestStatusState getRequestState(String requestId, SolrClient client) throws IOException, SolrServerException {
CollectionAdminResponse response = getStatusResponse(requestId, client);
NamedList innerResponse = (NamedList) response.getResponse().get("status");
return RequestStatusState.fromKey((String) innerResponse.get("state"));
}
static CollectionAdminResponse getStatusResponse(String requestId, SolrClient client) throws SolrServerException, IOException {
return CollectionAdminRequest.requestStatus(requestId).process(client);
}
protected void setupRestTestHarnesses() {
for (final SolrClient client : clients) {
RestTestHarness harness = new RestTestHarness(() -> ((Http2SolrClient) client).getBaseURL(),
(Http2SolrClient) client, jettys.get(0).getCoreContainer().getCores().iterator().next().getSolrConfig().getResourceLoader());
restTestHarnesses.add(harness);
}
}
protected void closeRestTestHarnesses() throws IOException {
restTestHarnesses.forEach(restTestHarness -> IOUtils.closeQuietly(restTestHarness));
restTestHarnesses.clear();
}
protected RestTestHarness randomRestTestHarness() {
List<RestTestHarness> harnesses = new ArrayList<>(restTestHarnesses);
return harnesses.get(random().nextInt(harnesses.size()));
}
protected void forAllRestTestHarnesses(Consumer<RestTestHarness> op) {
restTestHarnesses.forEach(op);
}
}