blob: 1530ca7b61a85d2c75deea1c86c0ea3b11f601e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.cdcr;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.LinkedHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.CdcrParams;
import org.apache.solr.util.TimeOut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CdcrTestsUtil extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected static void cdcrRestart(CloudSolrClient client) throws SolrServerException, IOException {
cdcrStop(client);
cdcrStart(client);
}
protected static void cdcrStart(CloudSolrClient client) throws SolrServerException, IOException {
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.START);
assertEquals("started", ((NamedList) response.getResponse().get("status")).get("process"));
}
protected static void cdcrStop(CloudSolrClient client) throws SolrServerException, IOException {
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.STOP);
assertEquals("stopped", ((NamedList) response.getResponse().get("status")).get("process"));
}
protected static void cdcrEnableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.ENABLEBUFFER);
assertEquals("enabled", ((NamedList) response.getResponse().get("status")).get("buffer"));
}
protected static void cdcrDisableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.DISABLEBUFFER);
assertEquals("disabled", ((NamedList) response.getResponse().get("status")).get("buffer"));
}
protected static QueryResponse invokeCdcrAction(CloudSolrClient client, CdcrParams.CdcrAction action) throws IOException, SolrServerException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.QT, "/cdcr");
params.set(CommonParams.ACTION, action.toLower());
return client.query(params);
}
protected static QueryResponse getCdcrQueue(CloudSolrClient client) throws SolrServerException, IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.QT, "/cdcr");
params.set(CommonParams.ACTION, CdcrParams.QUEUES);
return client.query(params);
}
protected static Object getFingerPrintMaxVersion(CloudSolrClient client, String shardNames, int numDocs) throws SolrServerException, IOException, InterruptedException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.QT, "/get");
params.set("fingerprint", true);
params.set("shards", shardNames);
params.set("getVersions", numDocs);
QueryResponse response = null;
long start = System.nanoTime();
while (System.nanoTime() - start <= TimeUnit.NANOSECONDS.convert(20, TimeUnit.SECONDS)) {
response = client.query(params);
if (response.getResponse() != null && response.getResponse().get("fingerprint") != null) {
return (long) ((LinkedHashMap) response.getResponse().get("fingerprint")).get("maxVersionEncountered");
}
Thread.sleep(200);
}
log.error("maxVersionEncountered not found for client : {} in 20 attempts", client);
return null;
}
protected static long waitForClusterToSync(long numDocs, CloudSolrClient clusterSolrClient) throws Exception {
return waitForClusterToSync((int) numDocs, clusterSolrClient, "*:*");
}
protected static long waitForClusterToSync(int numDocs, CloudSolrClient clusterSolrClient) throws Exception {
return waitForClusterToSync(numDocs, clusterSolrClient, "*:*");
}
protected static long waitForClusterToSync(int numDocs, CloudSolrClient clusterSolrClient, String query) throws Exception {
long start = System.nanoTime();
QueryResponse response = null;
while (System.nanoTime() - start <= TimeUnit.NANOSECONDS.convert(120, TimeUnit.SECONDS)) {
clusterSolrClient.commit();
response = clusterSolrClient.query(new SolrQuery(query));
if (response.getResults().getNumFound() == numDocs) {
break;
}
Thread.sleep(1000);
}
return response != null ? response.getResults().getNumFound() : null;
}
protected static boolean assertShardInSync(String collection, String shard, CloudSolrClient client) throws IOException, SolrServerException {
TimeOut waitTimeOut = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
DocCollection docCollection = client.getZkStateReader().getClusterState().getCollection(collection);
Slice correctSlice = null;
for (Slice slice : docCollection.getSlices()) {
if (shard.equals(slice.getName())) {
correctSlice = slice;
break;
}
}
assertNotNull(correctSlice);
long leaderDocCount;
try (HttpSolrClient leaderClient = new HttpSolrClient.Builder(correctSlice.getLeader().getCoreUrl()).withHttpClient(client.getHttpClient()).build()) {
leaderDocCount = leaderClient.query(new SolrQuery("*:*").setParam("distrib", "false")).getResults().getNumFound();
}
while (!waitTimeOut.hasTimedOut()) {
int replicasInSync = 0;
for (Replica replica : correctSlice.getReplicas()) {
try (HttpSolrClient leaderClient = new HttpSolrClient.Builder(replica.getCoreUrl()).withHttpClient(client.getHttpClient()).build()) {
long replicaDocCount = leaderClient.query(new SolrQuery("*:*").setParam("distrib", "false")).getResults().getNumFound();
if (replicaDocCount == leaderDocCount) replicasInSync++;
}
}
if (replicasInSync == correctSlice.getReplicas().size()) {
return true;
}
}
return false;
}
public static void indexRandomDocs(Integer start, Integer count, CloudSolrClient solrClient) throws Exception {
// ADD operation on cluster 1
int docs = 0;
if (count == 0) {
docs = (TEST_NIGHTLY ? 100 : 10);
} else {
docs = count;
}
for (int k = start; k < docs; k++) {
UpdateRequest req = new UpdateRequest();
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", k);
req.add(doc);
req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
req.process(solrClient);
}
}
public static void indexRandomDocs(Integer count, CloudSolrClient solrClient) throws Exception {
indexRandomDocs(0, count, solrClient);
}
public static void index(MiniSolrCloudCluster cluster, String collection, SolrInputDocument doc, boolean doCommit) throws IOException, SolrServerException {
CloudSolrClient client = createCloudClient(cluster, collection);
try {
client.add(doc);
if (doCommit) {
client.commit(true, true);
} else {
client.commit(true, false);
}
} finally {
client.close();
}
}
public static void index(MiniSolrCloudCluster cluster, String collection, SolrInputDocument doc) throws IOException, SolrServerException {
index(cluster, collection, doc, false);
}
public static CloudSolrClient createCloudClient(MiniSolrCloudCluster cluster, String defaultCollection) {
CloudSolrClient server = getCloudSolrClient(cluster.getZkServer().getZkAddress(), random().nextBoolean());
if (defaultCollection != null) server.setDefaultCollection(defaultCollection);
return server;
}
public static void restartClusterNode(MiniSolrCloudCluster cluster, String collection, int index) throws Exception {
System.setProperty("collection", collection);
restartNode(cluster.getJettySolrRunner(index));
System.clearProperty("collection");
}
public static void restartClusterNodes(MiniSolrCloudCluster cluster, String collection) throws Exception {
System.setProperty("collection", collection);
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
restartNode(jetty);
}
System.clearProperty("collection");
}
public static void restartNode(JettySolrRunner jetty) throws Exception {
jetty.stop();
jetty.start();
Thread.sleep(10000);
}
public static int numberOfFiles(String dir) {
File file = new File(dir);
if (!file.isDirectory()) {
assertTrue("Path to tlog " + dir + " does not exists or it's not a directory.", false);
}
if (log.isDebugEnabled()) {
log.debug("Update log dir {} contains: {}", dir, file.listFiles());
}
return file.listFiles().length;
}
public static int getNumberOfTlogFilesOnReplicas(MiniSolrCloudCluster cluster) throws Exception {
int count = 0;
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
for (SolrCore core : jetty.getCoreContainer().getCores()) {
count += numberOfFiles(core.getUlogDir() + "/tlog");
}
}
return count;
}
public static String getNonLeaderNode(MiniSolrCloudCluster cluster, String collection) throws Exception {
String leaderNode = getLeaderNode(cluster, collection);
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
if (!jetty.getNodeName().equals(leaderNode)) {
return jetty.getNodeName();
}
}
return cluster.getJettySolrRunners().get(0).getNodeName();
}
public static String getLeaderNode(MiniSolrCloudCluster cluster, String collection) throws Exception {
for (Replica replica : cluster.getSolrClient().getClusterStateProvider().getCollection(collection).getReplicas()) {
if (cluster.getSolrClient().getClusterStateProvider().getCollection(collection).getLeader("shard1") == replica) {
return replica.getNodeName();
}
}
return "";
}
}