blob: 4773982c5f3dc429bcd7f39460c1d933c0ff0b01 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.cdcr;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.CdcrParams;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.NUM_SLICES;
import static org.apache.solr.common.cloud.ZkStateReader.CLUSTER_PROPS;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.handler.admin.CoreAdminHandler.COMPLETED;
import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
/**
* <p>
* Abstract class for CDCR unit testing. This class emulates two clusters, a source and target, by using different
* collections in the same SolrCloud cluster. Therefore, the two clusters will share the same Zookeeper cluster. In
* real scenario, the two collections/clusters will likely have their own zookeeper cluster.
* </p>
* <p>
* This class will automatically create two collections, the source and the target. Each collection will have
* {@link #shardCount} shards, and {@link #replicationFactor} replicas per shard. One jetty instance will
* be created per core.
* </p>
* <p>
* The source and target collection can be reinitialised at will by calling {@link #clearSourceCollection()} and
* {@link #clearTargetCollection()}. After reinitialisation, a collection will have a new fresh index and update log.
* </p>
* <p>
* Servers can be restarted at will by calling
* {@link #restartServer(BaseCdcrDistributedZkTest.CloudJettyRunner)} or
* {@link #restartServers(java.util.List)}.
* </p>
* <p>
* The creation of the target collection can be disabled with the flag {@link #createTargetCollection};
* </p>
* <p>
* NB: We cannot use multiple cores per jetty instance, as jetty will load only one core when restarting. It seems
* that this is a limitation of the {@link org.apache.solr.client.solrj.embedded.JettySolrRunner}. This class
* tries to ensure that there always is one single core per jetty instance.
* </p>
*/
public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
  // Number of shards per collection; together with replicationFactor it drives how many
  // jetty instances are started (one core per jetty, see the class javadoc).
  protected int shardCount = 2;
  protected int replicationFactor = 2;
  // Subclasses may set this to false before setUp to skip creating the target collection.
  protected boolean createTargetCollection = true;
  // Request path of the CDCR handler on each core.
  private static final String CDCR_PATH = "/cdcr";
  protected static final String SOURCE_COLLECTION = "source_collection";
  protected static final String TARGET_COLLECTION = "target_collection";
  public static final String SHARD1 = "shard1";
  public static final String SHARD2 = "shard2";
@Override
protected String getCloudSolrConfig() {
return "solrconfig-cdcr.xml";
}
  @Override
  public void distribSetUp() throws Exception {
    super.distribSetUp();
    // Publish the shard count to the cluster via the "numShards" system property.
    if (shardCount > 0) {
      System.setProperty("numShards", Integer.toString(shardCount));
    } else {
      System.clearProperty("numShards");
    }
    if (isSSLMode()) {
      // In SSL mode the url scheme must come from the cluster properties stored in ZK,
      // not from a system property, so write urlScheme=https to the cluster-props node.
      System.clearProperty("urlScheme");
      ZkStateReader zkStateReader = new ZkStateReader(zkServer.getZkAddress(),
          AbstractZkTestCase.TIMEOUT, AbstractZkTestCase.TIMEOUT);
      try {
        zkStateReader.getZkClient().create(ZkStateReader.CLUSTER_PROPS,
            Utils.toJSON(Collections.singletonMap("urlScheme", "https")),
            CreateMode.PERSISTENT, true);
      } catch (KeeperException.NodeExistsException e) {
        // The node already exists (e.g. left over from an earlier test): merge the property
        // into the existing cluster props instead of creating the node.
        ZkNodeProps props = ZkNodeProps.load(zkStateReader.getZkClient().getData(ZkStateReader.CLUSTER_PROPS,
            null, null, true));
        props = props.plus("urlScheme", "https");
        zkStateReader.getZkClient().setData(CLUSTER_PROPS, Utils.toJSON(props), true);
      } finally {
        zkStateReader.close();
      }
    }
  }
  // Intentionally a no-op: servers are started explicitly through startServers(int) from
  // createSourceCollection()/createTargetCollection(), so each collection gets its own
  // dedicated set of jetty instances.
  @Override
  protected void createServers(int numServers) throws Exception {
  }
  @BeforeClass
  public static void beforeClass() {
    // Disable the cloud update delay so updates are forwarded immediately during tests.
    System.setProperty("solrcloud.update.delay", "0");
  }
  @AfterClass
  public static void afterClass() throws Exception {
    // Undo the property set in beforeClass() so other test classes are unaffected.
    System.clearProperty("solrcloud.update.delay");
  }
  @Before
  @SuppressWarnings({"rawtypes"})
  public void baseBefore() throws Exception {
    // Create the source collection, and (unless the subclass disabled it) the target collection.
    this.createSourceCollection();
    if (this.createTargetCollection) this.createTargetCollection();
    RandVal.uniqueValues = new HashSet(); //reset random values
  }
@After
public void baseAfter() throws Exception {
for (List<CloudJettyRunner> runners : cloudJettys.values()) {
for (CloudJettyRunner runner : runners) {
runner.client.close();
}
}
destroyServers();
}
protected CloudSolrClient createCloudClient(String defaultCollection) {
CloudSolrClient server = getCloudSolrClient(zkServer.getZkAddress(), random().nextBoolean());
if (defaultCollection != null) server.setDefaultCollection(defaultCollection);
return server;
}
protected SolrInputDocument getDoc(Object... fields) throws Exception {
SolrInputDocument doc = new SolrInputDocument();
addFields(doc, fields);
return doc;
}
protected void index(String collection, SolrInputDocument doc) throws IOException, SolrServerException {
CloudSolrClient client = createCloudClient(collection);
try {
client.add(doc);
client.commit(true, true);
} finally {
client.close();
}
}
protected void index(String collection, List<SolrInputDocument> docs) throws IOException, SolrServerException {
CloudSolrClient client = createCloudClient(collection);
try {
client.add(docs);
client.commit(true, true);
} finally {
client.close();
}
}
protected void deleteById(String collection, List<String> ids) throws IOException, SolrServerException {
CloudSolrClient client = createCloudClient(collection);
try {
client.deleteById(ids);
client.commit(true, true);
} finally {
client.close();
}
}
protected void deleteByQuery(String collection, String q) throws IOException, SolrServerException {
CloudSolrClient client = createCloudClient(collection);
try {
client.deleteByQuery(q);
client.commit(true, true);
} finally {
client.close();
}
}
/**
* Invokes a commit on the given collection.
*/
protected void commit(String collection) throws IOException, SolrServerException {
CloudSolrClient client = createCloudClient(collection);
try {
client.commit(true, true);
} finally {
client.close();
}
}
/**
* Assert the number of documents in a given collection
*/
protected void assertNumDocs(int expectedNumDocs, String collection)
throws SolrServerException, IOException, InterruptedException {
CloudSolrClient client = createCloudClient(collection);
try {
int cnt = 30; // timeout after 15 seconds
AssertionError lastAssertionError = null;
while (cnt > 0) {
try {
assertEquals(expectedNumDocs, client.query(new SolrQuery("*:*")).getResults().getNumFound());
return;
}
catch (AssertionError e) {
lastAssertionError = e;
cnt--;
Thread.sleep(500);
}
}
throw new AssertionError("Timeout while trying to assert number of documents @ " + collection, lastAssertionError);
} finally {
client.close();
}
}
/**
* Invokes a CDCR action on a given node.
*/
@SuppressWarnings({"rawtypes"})
protected NamedList invokeCdcrAction(CloudJettyRunner jetty, CdcrParams.CdcrAction action) throws Exception {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.ACTION, action.toString());
SolrRequest request = new QueryRequest(params);
request.setPath(CDCR_PATH);
return jetty.client.request(request);
}
protected void waitForCdcrStateReplication(String collection) throws Exception {
log.info("Wait for CDCR state to replicate - collection: {}", collection);
int cnt = 30;
while (cnt > 0) {
@SuppressWarnings({"rawtypes"})
NamedList status = null;
boolean allEquals = true;
for (CloudJettyRunner jetty : cloudJettys.get(collection)) { // check all replicas
@SuppressWarnings({"rawtypes"})
NamedList rsp = invokeCdcrAction(jetty, CdcrParams.CdcrAction.STATUS);
if (status == null) {
status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
continue;
}
allEquals &= status.equals(rsp.get(CdcrParams.CdcrAction.STATUS.toLower()));
}
if (allEquals) {
break;
}
else {
if (cnt == 0) {
throw new RuntimeException("Timeout waiting for CDCR state to replicate: collection="+collection);
}
cnt--;
Thread.sleep(500);
}
}
log.info("CDCR state is identical across nodes - collection: {}", collection);
}
/**
* Assert the state of CDCR on each nodes of the given collection.
*/
protected void assertState(String collection, CdcrParams.ProcessState processState, CdcrParams.BufferState bufferState)
throws Exception {
this.waitForCdcrStateReplication(collection); // ensure that cdcr state is replicated and stable
for (CloudJettyRunner jetty : cloudJettys.get(collection)) { // check all replicas
@SuppressWarnings({"rawtypes"})
NamedList rsp = invokeCdcrAction(jetty, CdcrParams.CdcrAction.STATUS);
@SuppressWarnings({"rawtypes"})
NamedList status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
assertEquals(processState.toLower(), status.get(CdcrParams.ProcessState.getParam()));
assertEquals(bufferState.toLower(), status.get(CdcrParams.BufferState.getParam()));
}
}
  /**
   * A mapping between collection and node names. Populated by {@link #createSourceCollection()}
   * and {@link #createTargetCollection()}, and used when creating the collection in
   * {@link #createCollection(String)} to pin the collection to its reserved nodes.
   */
  private Map<String, List<String>> collectionToNodeNames = new HashMap<>();
/**
* Starts the servers, saves and associates the node names to the source collection,
* and finally creates the source collection.
*/
private void createSourceCollection() throws Exception {
List<String> nodeNames = this.startServers(shardCount * replicationFactor);
this.collectionToNodeNames.put(SOURCE_COLLECTION, nodeNames);
this.createCollection(SOURCE_COLLECTION);
this.waitForRecoveriesToFinish(SOURCE_COLLECTION, true);
this.updateMappingsFromZk(SOURCE_COLLECTION);
}
/**
* Clear the source collection. It will delete then create the collection through the collection API.
* The collection will have a new fresh index, i.e., including a new update log.
*/
protected void clearSourceCollection() throws Exception {
this.deleteCollection(SOURCE_COLLECTION);
this.waitForCollectionToDisappear(SOURCE_COLLECTION);
this.createCollection(SOURCE_COLLECTION);
this.waitForRecoveriesToFinish(SOURCE_COLLECTION, true);
this.updateMappingsFromZk(SOURCE_COLLECTION);
}
/**
* Starts the servers, saves and associates the node names to the target collection,
* and finally creates the target collection.
*/
private void createTargetCollection() throws Exception {
List<String> nodeNames = this.startServers(shardCount * replicationFactor);
this.collectionToNodeNames.put(TARGET_COLLECTION, nodeNames);
this.createCollection(TARGET_COLLECTION);
this.waitForRecoveriesToFinish(TARGET_COLLECTION, true);
this.updateMappingsFromZk(TARGET_COLLECTION);
}
  /**
   * Clear the target collection. It will delete then create the collection through the collection API.
   * The collection will have a new fresh index, i.e., including a new update log.
   * (NOTE: the previous javadoc said "source collection" — a copy-paste error.)
   */
  protected void clearTargetCollection() throws Exception {
    this.deleteCollection(TARGET_COLLECTION);
    this.waitForCollectionToDisappear(TARGET_COLLECTION);
    this.createCollection(TARGET_COLLECTION);
    this.waitForRecoveriesToFinish(TARGET_COLLECTION, true);
    this.updateMappingsFromZk(TARGET_COLLECTION);
  }
/**
* Create a new collection through the Collection API. It enforces the use of one max shard per node.
* It will define the nodes to spread the new collection across by using the mapping {@link #collectionToNodeNames},
* to ensure that a node will not host more than one core (which will create problem when trying to restart servers).
*/
private void createCollection(String name) throws Exception {
CloudSolrClient client = createCloudClient(null);
try {
// Create the target collection
Map<String, List<Integer>> collectionInfos = new HashMap<>();
int maxShardsPerNode = 1;
StringBuilder sb = new StringBuilder();
for (String nodeName : collectionToNodeNames.get(name)) {
sb.append(nodeName);
sb.append(',');
}
sb.deleteCharAt(sb.length() - 1);
createCollection(collectionInfos, name, shardCount, replicationFactor, maxShardsPerNode, client, sb.toString());
} finally {
client.close();
}
}
  /**
   * Creates {@code collectionName} with the given shard/replica counts on an explicit node
   * set, delegating to the properties-based overload with config set "conf1".
   */
  private CollectionAdminResponse createCollection(Map<String, List<Integer>> collectionInfos,
                                                   String collectionName, int numShards, int replicationFactor,
                                                   int maxShardsPerNode, SolrClient client, String createNodeSetStr)
      throws SolrServerException, IOException {
    return createCollection(collectionInfos, collectionName,
        Utils.makeMap(
            NUM_SLICES, numShards,
            REPLICATION_FACTOR, replicationFactor,
            CREATE_NODE_SET, createNodeSetStr,
            MAX_SHARDS_PER_NODE, maxShardsPerNode),
        client, "conf1");
  }
private CollectionAdminResponse createCollection(Map<String, List<Integer>> collectionInfos, String collectionName,
Map<String, Object> collectionProps, SolrClient client,
String confSetName)
throws SolrServerException, IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.CREATE.toString());
for (Map.Entry<String, Object> entry : collectionProps.entrySet()) {
if (entry.getValue() != null) params.set(entry.getKey(), String.valueOf(entry.getValue()));
}
Integer numShards = (Integer) collectionProps.get(OverseerCollectionMessageHandler.NUM_SLICES);
if (numShards == null) {
String shardNames = (String) collectionProps.get(OverseerCollectionMessageHandler.SHARDS_PROP);
numShards = StrUtils.splitSmart(shardNames, ',').size();
}
Integer replicationFactor = (Integer) collectionProps.get(REPLICATION_FACTOR);
if (replicationFactor == null) {
replicationFactor = (Integer) OverseerCollectionMessageHandler.COLLECTION_PROPS_AND_DEFAULTS.get(REPLICATION_FACTOR);
}
if (confSetName != null) {
params.set("collection.configName", confSetName);
}
List<Integer> list = new ArrayList<>();
list.add(numShards);
list.add(replicationFactor);
if (collectionInfos != null) {
collectionInfos.put(collectionName, list);
}
params.set("name", collectionName);
@SuppressWarnings({"rawtypes"})
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
CollectionAdminResponse res = new CollectionAdminResponse();
res.setResponse(client.request(request));
return res;
}
/**
* Delete a collection through the Collection API.
*/
protected CollectionAdminResponse deleteCollection(String collectionName) throws Exception {
SolrClient client = createCloudClient(null);
CollectionAdminResponse res;
try {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.DELETE.toString());
params.set("name", collectionName);
QueryRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
res = new CollectionAdminResponse();
res.setResponse(client.request(request));
} catch (Exception e) {
log.warn("Error while deleting the collection {}", collectionName, e);
return new CollectionAdminResponse();
} finally {
client.close();
}
return res;
}
private void waitForCollectionToDisappear(String collection) throws Exception {
CloudSolrClient client = this.createCloudClient(null);
try {
client.connect();
ZkStateReader zkStateReader = client.getZkStateReader();
AbstractDistribZkTestBase.waitForCollectionToDisappear(collection, zkStateReader, false, true, 15);
} finally {
client.close();
}
}
private void waitForRecoveriesToFinish(String collection, boolean verbose) throws Exception {
CloudSolrClient client = this.createCloudClient(null);
try {
client.connect();
ZkStateReader zkStateReader = client.getZkStateReader();
super.waitForRecoveriesToFinish(collection, zkStateReader, verbose);
} finally {
client.close();
}
}
/**
* Asserts that the collection has the correct number of shards and replicas
*/
protected void assertCollectionExpectations(String collectionName) throws Exception {
CloudSolrClient client = this.createCloudClient(null);
try {
client.connect();
ClusterState clusterState = client.getZkStateReader().getClusterState();
assertTrue("Could not find new collection " + collectionName, clusterState.hasCollection(collectionName));
Map<String, Slice> shards = clusterState.getCollection(collectionName).getSlicesMap();
// did we find expectedSlices shards/shards?
assertEquals("Found new collection " + collectionName + ", but mismatch on number of shards.", shardCount, shards.size());
int totalShards = 0;
for (String shardName : shards.keySet()) {
totalShards += shards.get(shardName).getReplicas().size();
}
int expectedTotalShards = shardCount * replicationFactor;
assertEquals("Found new collection " + collectionName + " with correct number of shards, but mismatch on number " +
"of shards.", expectedTotalShards, totalShards);
} finally {
client.close();
}
}
  /**
   * Restart a server (stop + start of the wrapped jetty), then wait for the node to recover
   * and refresh the ZK mappings.
   */
  protected void restartServer(CloudJettyRunner server) throws Exception {
    // it seems we need to set the collection property to have the jetty properly restarted
    System.setProperty("collection", server.collection);
    JettySolrRunner jetty = server.jetty;
    jetty.stop();
    jetty.start();
    System.clearProperty("collection");
    waitForRecoveriesToFinish(server.collection, true);
    updateMappingsFromZk(server.collection); // must update the mapping as the core node name might have changed
  }
/**
* Restarts a list of servers.
*/
protected void restartServers(List<CloudJettyRunner> servers) throws Exception {
for (CloudJettyRunner server : servers) {
this.restartServer(server);
}
}
  // All jetty instances started by this test, in start order; torn down in destroyServers().
  private List<JettySolrRunner> jettys = new ArrayList<>();
  /**
   * Creates and starts a given number of servers. A temporary collection is created with one
   * replica per jetty (round-robin over the shards) purely to discover the node names; it is
   * deleted again before returning.
   *
   * @return the node names of the started servers, to be reserved for one collection
   */
  protected List<String> startServers(int nServer) throws Exception {
    String temporaryCollection = "tmp_collection";
    for (int i = 1; i <= nServer; i++) {
      // give everyone their own solrhome
      File jettyDir = createTempDir("jetty").toFile();
      jettyDir.mkdirs();
      setupJettySolrHome(jettyDir);
      JettySolrRunner jetty = createJetty(jettyDir, null, "shard" + i);
      jetty.start();
      jettys.add(jetty);
    }
    // create an empty temporary collection, then add one replica per jetty, round-robin
    // over the shards, so every node ends up hosting exactly one core
    try (SolrClient client = createCloudClient(temporaryCollection)) {
      assertEquals(0, CollectionAdminRequest
          .createCollection(temporaryCollection, "conf1", shardCount, 1)
          .setCreateNodeSet("")
          .process(client).getStatus());
      for (int i = 0; i < jettys.size(); i++) {
        assertTrue(CollectionAdminRequest
            .addReplicaToShard(temporaryCollection, "shard"+((i % shardCount) + 1))
            .setNode(jettys.get(i).getNodeName())
            .process(client).isSuccess());
      }
    }
    ZkStateReader zkStateReader = jettys.get(0).getCoreContainer().getZkController().getZkStateReader();
    // now wait till we see the leader for each shard
    for (int i = 1; i <= shardCount; i++) {
      zkStateReader.getLeaderRetry(temporaryCollection, "shard" + i, 15000);
    }
    // store the node names
    List<String> nodeNames = new ArrayList<>();
    for (Slice shard : zkStateReader.getClusterState().getCollection(temporaryCollection).getSlices()) {
      for (Replica replica : shard.getReplicas()) {
        nodeNames.add(replica.getNodeName());
      }
    }
    this.waitForRecoveriesToFinish(temporaryCollection,zkStateReader, true);
    // delete the temporary collection - we will create our own collections later
    this.deleteCollection(temporaryCollection);
    this.waitForCollectionToDisappear(temporaryCollection);
    System.clearProperty("collection");
    return nodeNames;
  }
@Override
protected void destroyServers() throws Exception {
for (JettySolrRunner runner : jettys) {
try {
runner.stop();
} catch (Exception e) {
log.error("", e);
}
}
jettys.clear();
}
  /**
   * Mapping from collection to jettys. Rebuilt by {@link #updateMappingsFromZk(String)}.
   */
  protected Map<String, List<CloudJettyRunner>> cloudJettys = new HashMap<>();
  /**
   * Mapping from collection/shard to jettys. Rebuilt by {@link #updateMappingsFromZk(String)}.
   */
  protected Map<String, Map<String, List<CloudJettyRunner>>> shardToJetty = new HashMap<>();
  /**
   * Mapping from collection/shard leader to jettys. Rebuilt by {@link #updateMappingsFromZk(String)}.
   */
  protected Map<String, Map<String, CloudJettyRunner>> shardToLeaderJetty = new HashMap<>();
  /**
   * Updates the mappings between the jetty's instances and the zookeeper cluster state.
   * For each running jetty, finds the replica whose base URL contains the jetty's port and
   * records it per-collection, per-shard, and (if leader) per-shard-leader.
   */
  protected void updateMappingsFromZk(String collection) throws Exception {
    List<CloudJettyRunner> cloudJettys = new ArrayList<>();
    Map<String, List<CloudJettyRunner>> shardToJetty = new HashMap<>();
    Map<String, CloudJettyRunner> shardToLeaderJetty = new HashMap<>();
    CloudSolrClient cloudClient = this.createCloudClient(null);
    try {
      cloudClient.connect();
      ZkStateReader zkStateReader = cloudClient.getZkStateReader();
      ClusterState clusterState = zkStateReader.getClusterState();
      DocCollection coll = clusterState.getCollection(collection);
      for (JettySolrRunner jetty : jettys) {
        int port = jetty.getLocalPort();
        if (port == -1) {
          throw new RuntimeException("Cannot find the port for jetty");
        }
        nextJetty:
        for (Slice shard : coll.getSlices()) {
          Set<Map.Entry<String, Replica>> entries = shard.getReplicasMap().entrySet();
          for (Map.Entry<String, Replica> entry : entries) {
            Replica replica = entry.getValue();
            // match the jetty to its replica by port; assumes one core per jetty
            if (replica.getStr(ZkStateReader.BASE_URL_PROP).contains(":" + port)) {
              if (!shardToJetty.containsKey(shard.getName())) {
                shardToJetty.put(shard.getName(), new ArrayList<CloudJettyRunner>());
              }
              // reference equality is intentional here: leader must be the same Replica object
              boolean isLeader = shard.getLeader() == replica;
              CloudJettyRunner cjr = new CloudJettyRunner(jetty, replica, collection, shard.getName(), entry.getKey());
              shardToJetty.get(shard.getName()).add(cjr);
              if (isLeader) {
                shardToLeaderJetty.put(shard.getName(), cjr);
              }
              cloudJettys.add(cjr);
              break nextJetty;
            }
          }
        }
      }
      // close the clients of any runners recorded by a previous call for this collection
      // NOTE(review): putIfAbsent is used only to fetch the previous value; the unconditional
      // put below makes the insertion side of putIfAbsent redundant.
      List<CloudJettyRunner> oldRunners = this.cloudJettys.putIfAbsent(collection, cloudJettys);
      if (oldRunners != null) {
        // must close resources for the old entries
        for (CloudJettyRunner oldRunner : oldRunners) {
          IOUtils.closeQuietly(oldRunner.client);
        }
      }
      this.cloudJettys.put(collection, cloudJettys);
      this.shardToJetty.put(collection, shardToJetty);
      this.shardToLeaderJetty.put(collection, shardToLeaderJetty);
    } finally {
      cloudClient.close();
    }
  }
/**
* Wrapper around a {@link org.apache.solr.client.solrj.embedded.JettySolrRunner} to map the jetty
* instance to various information of the cloud cluster, such as the collection and shard
* that is served by the jetty instance, the node name, core node name, url, etc.
*/
public static class CloudJettyRunner {
public JettySolrRunner jetty;
public String nodeName;
public String coreNodeName;
public String url;
public SolrClient client;
public Replica info;
public String shard;
public String collection;
public CloudJettyRunner(JettySolrRunner jetty, Replica replica,
String collection, String shard, String coreNodeName) {
this.jetty = jetty;
this.info = replica;
this.collection = collection;
Properties nodeProperties = jetty.getNodeProperties();
// we need to update the jetty's shard so that it registers itself to the right shard when restarted
this.shard = shard;
nodeProperties.setProperty(CoreDescriptor.CORE_SHARD, this.shard);
// we need to update the jetty's shard so that it registers itself under the right core name when restarted
this.coreNodeName = coreNodeName;
nodeProperties.setProperty(CoreDescriptor.CORE_NODE_NAME, this.coreNodeName);
this.nodeName = replica.getNodeName();
ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(info);
this.url = coreNodeProps.getCoreUrl();
// strip the trailing slash as this can cause issues when executing requests
this.client = createNewSolrServer(this.url.substring(0, this.url.length() - 1));
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((url == null) ? 0 : url.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
CloudJettyRunner other = (CloudJettyRunner) obj;
if (url == null) {
if (other.url != null) return false;
} else if (!url.equals(other.url)) return false;
return true;
}
@Override
public String toString() {
return "CloudJettyRunner [url=" + url + "]";
}
}
protected static SolrClient createNewSolrServer(String baseUrl) {
try {
// setup the server...
HttpSolrClient s = getHttpSolrClient(baseUrl, DEFAULT_CONNECTION_TIMEOUT);
return s;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
protected void waitForBootstrapToComplete(String collectionName, String shardId) throws Exception {
@SuppressWarnings({"rawtypes"})
NamedList rsp;// we need to wait until bootstrap is complete otherwise the replicator thread will never start
TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeOut.hasTimedOut()) {
rsp = invokeCdcrAction(shardToLeaderJetty.get(collectionName).get(shardId), CdcrParams.CdcrAction.BOOTSTRAP_STATUS);
if (rsp.get(RESPONSE_STATUS).toString().equals(COMPLETED)) {
break;
}
Thread.sleep(1000);
}
}
  /**
   * Polls the CDCR queue size of the shard leader until it drains to 0, or throws after
   * ~15 seconds.
   */
  protected void waitForReplicationToComplete(String collectionName, String shardId) throws Exception {
    int cnt = 15;
    while (cnt > 0) {
      log.info("Checking queue size @ {}:{}", collectionName, shardId);
      long size = this.getQueueSize(collectionName, shardId);
      // size == 0 means the queue is drained and replication is done; a size of -1 means the
      // log reader is not yet initialised, and intentionally does NOT match this check so
      // that we keep waiting for it
      if (size == 0) {
        return;
      }
      log.info("Waiting for replication to complete. Queue size: {} @ {}:{}", size, collectionName, shardId);
      cnt--;
      Thread.sleep(1000); // wait a bit for the replication to complete
    }
    throw new RuntimeException("Timeout waiting for CDCR replication to complete @" + collectionName + ":" + shardId);
  }
  /**
   * Returns the CDCR queue size towards {@link #TARGET_COLLECTION} as reported by the shard
   * leader's QUEUES action. Per the caller's convention, -1 indicates the log reader is not
   * yet initialised.
   */
  protected long getQueueSize(String collectionName, String shardId) throws Exception {
    @SuppressWarnings({"rawtypes"})
    NamedList rsp = this.invokeCdcrAction(shardToLeaderJetty.get(collectionName).get(shardId), CdcrParams.CdcrAction.QUEUES);
    // the QUEUES response is keyed by target host; this takes the first (assumed only) target
    @SuppressWarnings({"rawtypes"})
    NamedList host = (NamedList) ((NamedList) rsp.get(CdcrParams.QUEUES)).getVal(0);
    @SuppressWarnings({"rawtypes"})
    NamedList status = (NamedList) host.get(TARGET_COLLECTION);
    return (Long) status.get(CdcrParams.QUEUE_SIZE);
  }
protected CollectionInfo collectInfo(String collection) throws Exception {
CollectionInfo info = new CollectionInfo(collection);
for (String shard : shardToJetty.get(collection).keySet()) {
List<CloudJettyRunner> jettyRunners = shardToJetty.get(collection).get(shard);
for (CloudJettyRunner jettyRunner : jettyRunners) {
for (SolrCore core : jettyRunner.jetty.getCoreContainer().getCores()) {
info.addCore(core, shard, shardToLeaderJetty.get(collection).containsValue(jettyRunner));
}
}
}
return info;
}
protected static class CollectionInfo {
List<CoreInfo> coreInfos = new ArrayList<>();
String collection;
CollectionInfo(String collection) {
this.collection = collection;
}
/**
* @return Returns a map shard -> list of cores
*/
Map<String, List<CoreInfo>> getShardToCoresMap() {
Map<String, List<CoreInfo>> map = new HashMap<>();
for (CoreInfo info : coreInfos) {
List<CoreInfo> list = map.get(info.shard);
if (list == null) {
list = new ArrayList<>();
map.put(info.shard, list);
}
list.add(info);
}
return map;
}
CoreInfo getLeader(String shard) {
List<CoreInfo> coreInfos = getShardToCoresMap().get(shard);
for (CoreInfo info : coreInfos) {
if (info.isLeader) {
return info;
}
}
assertTrue(String.format(Locale.ENGLISH, "There is no leader for collection %s shard %s", collection, shard), false);
return null;
}
List<CoreInfo> getReplicas(String shard) {
List<CoreInfo> coreInfos = getShardToCoresMap().get(shard);
coreInfos.remove(getLeader(shard));
return coreInfos;
}
void addCore(SolrCore core, String shard, boolean isLeader) throws Exception {
CoreInfo info = new CoreInfo();
info.collectionName = core.getName();
info.shard = shard;
info.isLeader = isLeader;
info.ulogDir = core.getUpdateHandler().getUpdateLog().getLogDir();
this.coreInfos.add(info);
}
public static class CoreInfo {
String collectionName;
String shard;
boolean isLeader;
String ulogDir;
}
}
}