blob: 6953a3240187ac511c2587e71bb68bca79502517 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.cdcr;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.update.processor.CdcrUpdateProcessor;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CdcrVersionReplicationTest extends BaseCdcrDistributedZkTest {
// Logger for this test class.
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// Short alias for CommonParams.VERSION_FIELD; used below both as a document field name and as a request parameter name.
private static final String vfield = CommonParams.VERSION_FIELD;
// Client used by the vadd/vdelete/doRealTimeGet/doDeleteByQuery helpers; assigned at the start of doTestCdcrDocVersions.
SolrClient solrServer;
/**
 * Configures the inherited test settings: no target collection is created and the schema is
 * switched to "schema15.xml".
 */
public CdcrVersionReplicationTest() {
  createTargetCollection = false;
  // we need a string id
  schemaString = "schema15.xml";
}
/**
 * Picks the client that will drive the test, based on a random roll in [0, 100).
 *
 * <p>Rolls below 80 yield a cloud client for the source collection; rolls in [80, 90) yield a
 * client pointed at one of the first two jetties of SHARD1; the remaining rolls do the same for
 * SHARD2.
 *
 * @return a newly created client; the caller is responsible for closing it
 */
SolrClient createClientRandomly() throws Exception {
  final int roll = random().nextInt(100);
  // testing the smart cloud client (requests to leaders) is more important than testing the forwarding logic
  if (roll < 80) {
    return createCloudClient(SOURCE_COLLECTION);
  }
  // the remaining rolls are split evenly between a direct client on SHARD1 and one on SHARD2
  final String shard = (roll < 90) ? SHARD1 : SHARD2;
  return createNewSolrServer(shardToJetty.get(SOURCE_COLLECTION).get(shard).get(random().nextInt(2)).url);
}
/**
 * Entry point of the test: creates a randomly chosen client, runs the version scenarios in
 * {@code doTestCdcrDocVersions} against the source collection and issues a final commit.
 */
@Test
@ShardsFixed(num = 4)
public void testCdcrDocVersions() throws Exception {
  // try-with-resources closes the client on every path and, unlike a finally block, keeps a
  // failure raised by the test body as the primary exception if close() also throws
  try (SolrClient client = createClientRandomly()) {
    handle.clear();
    handle.put("timestamp", SKIPVAL);
    doTestCdcrDocVersions(client);
    commit(SOURCE_COLLECTION); // work around SOLR-5628
  }
}
/**
 * Runs the add and delete version scenarios against the source collection.
 *
 * <p>The given client is stored in {@code solrServer} so that the helper methods (vadd, vdelete,
 * doRealTimeGet, doDeleteByQuery) send their requests through it. The statements below depend on
 * each other's side effects and must stay in this order.
 *
 * @param solrClient client used for all update and query requests of this scenario
 */
private void doTestCdcrDocVersions(SolrClient solrClient) throws Exception {
  this.solrServer = solrClient;
  log.info("### STARTING doCdcrTestDocVersions - Add commands, client: {}", solrClient);

  vadd("doc1", 10, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "10");
  vadd("doc2", 11, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "11");
  vadd("doc3", 10, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "10");
  vadd("doc4", 11, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "11");
  commit(SOURCE_COLLECTION);

  // versions are preserved and verifiable both by query and by real-time get
  doQuery(solrClient, "doc1,10,doc2,11,doc3,10,doc4,11", "q", "*:*");
  doRealTimeGet("doc1,doc2,doc3,doc4", "10,11,10,11");

  vadd("doc1", 5, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "5");
  vadd("doc2", 10, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "10");
  vadd("doc3", 9, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "9");
  vadd("doc4", 8, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "8");

  // lower versions are ignored
  doRealTimeGet("doc1,doc2,doc3,doc4", "10,11,10,11");

  vadd("doc1", 12, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "12");
  vadd("doc2", 12, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "12");
  vadd("doc3", 12, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "12");
  vadd("doc4", 12, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "12");

  // higher versions are accepted
  doRealTimeGet("doc1,doc2,doc3,doc4", "12,12,12,12");

  // non-cdcr update requests throw a version conflict exception for non-equal versions (optimistic locking feature)
  vaddFail("doc1", 13, 409);
  vaddFail("doc2", 13, 409);
  vaddFail("doc3", 13, 409);
  commit(SOURCE_COLLECTION);

  // versions are still as they were
  doQuery(solrClient, "doc1,12,doc2,12,doc3,12,doc4,12", "q", "*:*");

  // query all shard replicas individually
  doQueryShardReplica(SHARD1, "doc1,12,doc2,12,doc3,12,doc4,12", "q", "*:*");
  doQueryShardReplica(SHARD2, "doc1,12,doc2,12,doc3,12,doc4,12", "q", "*:*");

  // optimistic locking update
  vadd("doc4", 12);
  commit(SOURCE_COLLECTION);
  QueryResponse rsp = solrClient.query(params("qt", "/get", "ids", "doc4"));
  long version = (long) rsp.getResults().get(0).get(vfield);

  // update accepted and a new version number was generated
  assertTrue(version > 1_000_000_000_000L);

  log.info("### STARTING doCdcrTestDocVersions - Delete commands");

  // send a delete update with an older version number
  vdelete("doc1", 5, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "5");
  // must ignore the delete
  doRealTimeGet("doc1", "12");

  // send a delete update with a higher version number
  vdelete("doc1", 13, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "13");
  // must be deleted
  doRealTimeGet("doc1", "");

  // send a delete update with a higher version number
  vdelete("doc4", version + 1, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "" + (version + 1));
  // must be deleted
  doRealTimeGet("doc4", "");
  commit(SOURCE_COLLECTION);

  // query each shard replica individually
  doQueryShardReplica(SHARD1, "doc2,12,doc3,12", "q", "*:*");
  doQueryShardReplica(SHARD2, "doc2,12,doc3,12", "q", "*:*");

  // version conflict thanks to optimistic locking
  // TODO: it seems that optimistic locking doesn't work with forwarding, test with shard2 client
  if (solrClient instanceof CloudSolrClient) {
    vdeleteFail("doc2", 50, 409);
  }

  // cleanup after ourselves for the next run
  // deleteByQuery should work as usual with the CDCR_UPDATE param
  doDeleteByQuery("id:doc*", CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, Long.toString(1));
  commit(SOURCE_COLLECTION);

  // deleteByQuery with a version lower than anything else should have no effect
  doQuery(solrClient, "doc2,12,doc3,12", "q", "*:*");

  doDeleteByQuery("id:doc*", CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, Long.toString(51));
  commit(SOURCE_COLLECTION);

  // deleteByQuery with a version higher than everything else should delete all remaining docs
  doQuery(solrClient, "", "q", "*:*");

  // check that replicas are as expected too
  doQueryShardReplica(SHARD1, "", "q", "*:*");
  doQueryShardReplica(SHARD2, "", "q", "*:*");
}
// ------------------ auxiliary methods ------------------
/**
 * Runs {@code doQuery} with the same expectation against the client of every jetty registered
 * for the given shard of the source collection.
 *
 * @param shard        name of the shard whose jetties are queried
 * @param expectedDocs expected "id,version,..." list, passed through to {@code doQuery}
 * @param queryParams  flat name/value query parameters, passed through to {@code doQuery}
 */
void doQueryShardReplica(String shard, String expectedDocs, String... queryParams) throws Exception {
  for (CloudJettyRunner replica : shardToJetty.get(SOURCE_COLLECTION).get(shard)) {
    final SolrClient replicaClient = replica.client;
    doQuery(replicaClient, expectedDocs, queryParams);
  }
}
/**
 * Sends a delete-by-id request through {@code solrServer}, carrying the given version as the
 * version request parameter.
 *
 * @param id      id of the document to delete
 * @param version value sent as the version request parameter
 * @param params  additional request parameters as a flat name/value list; they are applied after
 *                the version parameter and therefore override it when they use the same name
 */
void vdelete(String id, long version, String... params) throws Exception {
  final UpdateRequest deleteRequest = new UpdateRequest();
  deleteRequest.deleteById(id);
  deleteRequest.setParam(vfield, String.valueOf(version));
  // walk the flat list two entries at a time: name followed by its value
  int idx = 0;
  while (idx < params.length) {
    final String paramName = params[idx];
    final String paramValue = params[idx + 1];
    deleteRequest.setParam(paramName, paramValue);
    idx += 2;
  }
  solrServer.request(deleteRequest);
}
/**
 * Sends a delete through {@code vdelete} and asserts that it is rejected with the given error code.
 *
 * <p>The rejection is accepted either as a {@link SolrException} (using the code of a nested
 * SolrException cause when there is one) or as a {@link SolrServerException} whose cause is a
 * SolrException. Any other outcome is logged and makes the final assertion fail.
 *
 * @param id      id of the document to delete
 * @param version version sent with the delete
 * @param errCode expected error code of the rejection
 * @param params  additional request parameters as a flat name/value list
 */
void vdeleteFail(String id, long version, int errCode, String... params) throws Exception {
  boolean failed = false;
  try {
    vdelete(id, version, params);
  } catch (SolrException e) {
    failed = true;
    // prefer the code of a wrapped SolrException when one is nested as the cause
    SolrException reported = e;
    if (e.getCause() instanceof SolrException && e.getCause() != e) {
      reported = (SolrException) e.getCause();
    }
    assertEquals(errCode, reported.code());
  } catch (SolrServerException ex) {
    Throwable t = ex.getCause();
    if (t instanceof SolrException) {
      failed = true;
      assertEquals(errCode, ((SolrException) t).code());
    } else {
      // not the rejection we are looking for: log it so the assertion failure below can be diagnosed
      log.error("ERROR", ex);
    }
  } catch (Exception e) {
    log.error("ERROR", e);
  }
  assertTrue("expected delete of " + id + " with version " + version
      + " to be rejected with error code " + errCode, failed);
}
/**
 * Sends an add request through {@code solrServer} for a document that has only an id and a
 * version field.
 *
 * @param id      value of the "id" field
 * @param version value of the version field of the document
 * @param params  request parameters as a flat name/value list
 */
void vadd(String id, long version, String... params) throws Exception {
  final UpdateRequest addRequest = new UpdateRequest();
  addRequest.add(sdoc("id", id, vfield, version));
  // walk the flat list two entries at a time: name followed by its value
  int idx = 0;
  while (idx < params.length) {
    final String paramName = params[idx];
    final String paramValue = params[idx + 1];
    addRequest.setParam(paramName, paramValue);
    idx += 2;
  }
  solrServer.request(addRequest);
}
/**
 * Sends an add through {@code vadd} and asserts that it is rejected with the given error code.
 *
 * <p>The rejection is accepted either as a {@link SolrException} (using the code of a nested
 * SolrException cause when there is one) or as a {@link SolrServerException} whose cause is a
 * SolrException. Any other outcome is logged and makes the final assertion fail.
 *
 * @param id      id of the document to add
 * @param version version field value of the document
 * @param errCode expected error code of the rejection
 * @param params  request parameters as a flat name/value list
 */
void vaddFail(String id, long version, int errCode, String... params) throws Exception {
  boolean failed = false;
  try {
    vadd(id, version, params);
  } catch (SolrException e) {
    failed = true;
    // prefer the code of a wrapped SolrException when one is nested as the cause
    SolrException reported = e;
    if (e.getCause() instanceof SolrException && e.getCause() != e) {
      reported = (SolrException) e.getCause();
    }
    assertEquals(errCode, reported.code());
  } catch (SolrServerException ex) {
    Throwable t = ex.getCause();
    if (t instanceof SolrException) {
      failed = true;
      assertEquals(errCode, ((SolrException) t).code());
    } else {
      // not the rejection we are looking for: log it so the assertion failure below can be diagnosed
      log.error("ERROR", ex);
    }
  } catch (Exception e) {
    log.error("ERROR", e);
  }
  assertTrue("expected add of " + id + " with version " + version
      + " to be rejected with error code " + errCode, failed);
}
/**
 * Runs a query and asserts that the returned documents match the expected id/version pairs.
 *
 * @param ss           client the query is sent through
 * @param expectedDocs flat comma-separated list of the form "id,version,id,version,..."; each
 *                     version is parsed as a long
 * @param queryParams  flat name/value query parameters
 */
void doQuery(SolrClient ss, String expectedDocs, String... queryParams) throws Exception {
  final List<String> tokens = StrUtils.splitSmart(expectedDocs, ",", true);
  final Map<String, Object> expected = new HashMap<>();
  // consume the tokens pairwise: an id followed by its version
  int pos = 0;
  while (pos < tokens.size()) {
    final String docId = tokens.get(pos++);
    final Long docVersion = Long.valueOf(tokens.get(pos++));
    expected.put(docId, docVersion);
  }

  final QueryResponse response = ss.query(params(queryParams));
  final Map<String, Object> actual = new HashMap<>();
  for (SolrDocument hit : response.getResults()) {
    actual.put((String) hit.get("id"), hit.get(vfield));
  }

  assertEquals(expected, actual);
}
/**
 * Fetches the given ids through the "/get" handler using {@code solrServer} and asserts that the
 * returned id/version pairs match the expectation.
 *
 * @param ids      comma-separated document ids to fetch
 * @param versions comma-separated expected versions, positionally matching {@code ids}; when no
 *                 versions are given, no documents are expected in the response
 */
void doRealTimeGet(String ids, String versions) throws Exception {
  final List<String> idList = StrUtils.splitSmart(ids, ",", true);
  final List<String> versionList = StrUtils.splitSmart(versions, ",", true);

  final Map<String, Object> expected = new HashMap<>();
  if (!versionList.isEmpty()) {
    int pos = 0;
    for (String docId : idList) {
      expected.put(docId, Long.valueOf(versionList.get(pos)));
      pos++;
    }
  }

  final QueryResponse response = solrServer.query(params("qt", "/get", "ids", ids));
  final Map<String, Object> actual = new HashMap<>();
  for (SolrDocument hit : response.getResults()) {
    actual.put((String) hit.get("id"), hit.get(vfield));
  }

  assertEquals(expected, actual);
}
/**
 * Sends a delete-by-query request through {@code solrServer}.
 *
 * @param q         query selecting the documents to delete
 * @param reqParams request parameters as a flat name/value list
 */
void doDeleteByQuery(String q, String... reqParams) throws Exception {
  final UpdateRequest deleteByQueryRequest = new UpdateRequest();
  deleteByQueryRequest.deleteByQuery(q);
  deleteByQueryRequest.setParams(params(reqParams));
  deleteByQueryRequest.process(solrServer);
}
}