blob: aa9ea3b0dde5618045fa679bac7c8185597d9e81 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.cdcr;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableMap;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.handler.CdcrParams;
import org.apache.solr.util.TimeOut;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CdcrBidirectionalTest extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Test
@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-12524")
public void testBiDir() throws Exception {
MiniSolrCloudCluster cluster2 = new MiniSolrCloudCluster(1, createTempDir("cdcr-cluster2"), buildJettyConfig("/solr"));
MiniSolrCloudCluster cluster1 = new MiniSolrCloudCluster(1, createTempDir("cdcr-cluster1"), buildJettyConfig("/solr"));
try {
if (log.isInfoEnabled()) {
log.info("cluster2 zkHost = {}", cluster2.getZkServer().getZkAddress());
}
System.setProperty("cdcr.cluster2.zkHost", cluster2.getZkServer().getZkAddress());
if (log.isInfoEnabled()) {
log.info("cluster1 zkHost = {}", cluster1.getZkServer().getZkAddress());
}
System.setProperty("cdcr.cluster1.zkHost", cluster1.getZkServer().getZkAddress());
cluster1.uploadConfigSet(configset("cdcr-cluster1"), "cdcr-cluster1");
CollectionAdminRequest.createCollection("cdcr-cluster1", "cdcr-cluster1", 2, 1)
.withProperty("solr.directoryFactory", "solr.StandardDirectoryFactory")
.setMaxShardsPerNode(2)
.process(cluster1.getSolrClient());
CloudSolrClient cluster1SolrClient = cluster1.getSolrClient();
cluster1SolrClient.setDefaultCollection("cdcr-cluster1");
cluster2.uploadConfigSet(configset("cdcr-cluster2"), "cdcr-cluster2");
CollectionAdminRequest.createCollection("cdcr-cluster2", "cdcr-cluster2", 2, 1)
.withProperty("solr.directoryFactory", "solr.StandardDirectoryFactory")
.setMaxShardsPerNode(2)
.process(cluster2.getSolrClient());
CloudSolrClient cluster2SolrClient = cluster2.getSolrClient();
cluster2SolrClient.setDefaultCollection("cdcr-cluster2");
UpdateRequest req = null;
CdcrTestsUtil.cdcrStart(cluster1SolrClient);
Thread.sleep(2000);
// ADD operation on cluster 1
int docs = (TEST_NIGHTLY ? 100 : 10);
int numDocs_c1 = 0;
for (int k = 0; k < docs; k++) {
req = new UpdateRequest();
for (; numDocs_c1 < (k + 1) * 100; numDocs_c1++) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "cluster1_" + numDocs_c1);
doc.addField("xyz", numDocs_c1);
req.add(doc);
}
req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
log.info("Adding {} docs with commit=true, numDocs={}", docs, numDocs_c1);
req.process(cluster1SolrClient);
}
QueryResponse response = cluster1SolrClient.query(new SolrQuery("*:*"));
assertEquals("cluster 1 docs mismatch", numDocs_c1, response.getResults().getNumFound());
assertEquals("cluster 2 docs mismatch", numDocs_c1, CdcrTestsUtil.waitForClusterToSync(numDocs_c1, cluster2SolrClient));
CdcrTestsUtil.cdcrStart(cluster2SolrClient); // FULL BI-DIRECTIONAL CDCR FORWARDING ON
Thread.sleep(2000);
// ADD operation on cluster 2
int numDocs_c2 = 0;
for (int k = 0; k < docs; k++) {
req = new UpdateRequest();
for (; numDocs_c2 < (k + 1) * 100; numDocs_c2++) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "cluster2_" + numDocs_c2);
doc.addField("xyz", numDocs_c2);
req.add(doc);
}
req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
log.info("Adding {} docs with commit=true, numDocs= {}", docs, numDocs_c2);
req.process(cluster2SolrClient);
}
int numDocs = numDocs_c1 + numDocs_c2;
response = cluster2SolrClient.query(new SolrQuery("*:*"));
assertEquals("cluster 2 docs mismatch", numDocs, response.getResults().getNumFound());
assertEquals("cluster 1 docs mismatch", numDocs, CdcrTestsUtil.waitForClusterToSync(numDocs, cluster1SolrClient));
// logging cdcr clusters queue response
response = CdcrTestsUtil.getCdcrQueue(cluster1SolrClient);
if (log.isInfoEnabled()) {
log.info("Cdcr cluster1 queue response: {}", response.getResponse());
}
response = CdcrTestsUtil.getCdcrQueue(cluster2SolrClient);
if (log.isInfoEnabled()) {
log.info("Cdcr cluster2 queue response: {}", response.getResponse());
}
// lets find and keep the maximum version assigned by cluster1 & cluster2 across all our updates
long maxVersion_c1 = Math.min((long)CdcrTestsUtil.getFingerPrintMaxVersion(cluster1SolrClient, "shard1", numDocs),
(long)CdcrTestsUtil.getFingerPrintMaxVersion(cluster1SolrClient, "shard2", numDocs));
long maxVersion_c2 = Math.min((long)CdcrTestsUtil.getFingerPrintMaxVersion(cluster2SolrClient, "shard1", numDocs),
(long)CdcrTestsUtil.getFingerPrintMaxVersion(cluster2SolrClient, "shard2", numDocs));
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.ACTION, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT.toString());
params.set(CommonParams.QT, "/cdcr");
response = cluster2SolrClient.query(params);
Long checkpoint_2 = (Long) response.getResponse().get(CdcrParams.CHECKPOINT);
assertNotNull(checkpoint_2);
params = new ModifiableSolrParams();
params.set(CommonParams.ACTION, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT.toString());
params.set(CommonParams.QT, "/cdcr");
response = cluster1SolrClient.query(params);
Long checkpoint_1 = (Long) response.getResponse().get(CdcrParams.CHECKPOINT);
assertNotNull(checkpoint_1);
log.info("v1: {}\tv2: {}\tcheckpoint1: {}\tcheckpoint2: {}"
, maxVersion_c1, maxVersion_c2, checkpoint_1, checkpoint_2);
assertEquals("COLLECTIONCHECKPOINT from cluster2 should have returned the maximum " +
"version across all updates made to cluster1", maxVersion_c1, checkpoint_2.longValue());
assertEquals("COLLECTIONCHECKPOINT from cluster1 should have returned the maximum " +
"version across all updates made to cluster2", maxVersion_c2, checkpoint_1.longValue());
assertEquals("max versions of updates in both clusters should be same", maxVersion_c1, maxVersion_c2);
// DELETE BY QUERY
String deleteByQuery = "id:cluster1_" +String.valueOf(random().nextInt(numDocs_c1));
response = cluster1SolrClient.query(new SolrQuery(deleteByQuery));
assertEquals("should match exactly one doc", 1, response.getResults().getNumFound());
cluster1SolrClient.deleteByQuery(deleteByQuery);
cluster1SolrClient.commit();
numDocs--;
numDocs_c1--;
response = cluster1SolrClient.query(new SolrQuery("*:*"));
assertEquals("cluster 1 docs mismatch", numDocs, response.getResults().getNumFound());
assertEquals("cluster 2 docs mismatch", numDocs, CdcrTestsUtil.waitForClusterToSync(numDocs, cluster2SolrClient));
// DELETE BY ID
SolrInputDocument doc;
String delete_id_query = "cluster2_" + random().nextInt(numDocs_c2);
cluster2SolrClient.deleteById(delete_id_query);
cluster2SolrClient.commit();
numDocs--;
numDocs_c2--;
response = cluster2SolrClient.query(new SolrQuery("*:*"));
assertEquals("cluster 2 docs mismatch", numDocs, response.getResults().getNumFound());
assertEquals("cluster 1 docs mismatch", numDocs, CdcrTestsUtil.waitForClusterToSync(numDocs, cluster1SolrClient));
// ATOMIC UPDATES
req = new UpdateRequest();
doc = new SolrInputDocument();
String atomicFieldName = "abc";
ImmutableMap.of("", "");
String atomicUpdateId = "cluster2_" + random().nextInt(numDocs_c2);
doc.addField("id", atomicUpdateId);
doc.addField("xyz", ImmutableMap.of("delete", ""));
doc.addField(atomicFieldName, ImmutableMap.of("set", "ABC"));
req.add(doc);
req.process(cluster2SolrClient);
cluster2SolrClient.commit();
String atomicQuery = "id:" + atomicUpdateId;
response = cluster2SolrClient.query(new SolrQuery(atomicQuery));
assertEquals("cluster 2 wrong doc", "ABC", response.getResults().get(0).get(atomicFieldName));
assertEquals("cluster 1 wrong doc", "ABC", getDocFieldValue(cluster1SolrClient, atomicQuery, "ABC", atomicFieldName ));
// logging cdcr clusters queue response
response = CdcrTestsUtil.getCdcrQueue(cluster1SolrClient);
if (log.isInfoEnabled()) {
log.info("Cdcr cluster1 queue response at end of testcase: {}", response.getResponse());
}
response = CdcrTestsUtil.getCdcrQueue(cluster2SolrClient);
if (log.isInfoEnabled()) {
log.info("Cdcr cluster2 queue response at end of testcase: {}", response.getResponse());
}
CdcrTestsUtil.cdcrStop(cluster1SolrClient);
CdcrTestsUtil.cdcrStop(cluster2SolrClient);
} finally {
if (cluster1 != null) {
cluster1.shutdown();
}
if (cluster2 != null) {
cluster2.shutdown();
}
}
}
private String getDocFieldValue(CloudSolrClient clusterSolrClient, String query, String match, String field) throws Exception {
TimeOut waitTimeOut = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!waitTimeOut.hasTimedOut()) {
clusterSolrClient.commit();
QueryResponse response = clusterSolrClient.query(new SolrQuery(query));
if (response.getResults().size() > 0 && match.equals(response.getResults().get(0).get(field))) {
return (String) response.getResults().get(0).get(field);
}
Thread.sleep(1000);
}
return null;
}
}