/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
import org.junit.Test;
import static org.hamcrest.core.StringContains.containsString;
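/**
 * Tests PeerSync by issuing realtime-get requests with the "sync" parameter against cores using
 * the tlog-enabled config. Covers adds, deletes and delete-by-query (including reordered
 * application), index fingerprinting, in-place updates on docValues-only fields, and documents
 * with children.
 */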
@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
public class PeerSyncTest extends BaseDistributedSearchTestCase {
protected static int numVersions = 100; // number of versions to use when syncing
protected static final String FROM_LEADER = DistribPhase.FROMLEADER.toString();
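// All updates in this test are sent with DISTRIB_UPDATE_PARAM=FROMLEADER, so each node accepts the
// supplied _version_ the way a replica accepts an update forwarded by its shard leader.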
protected static final ModifiableSolrParams seenLeader =
params(DISTRIB_UPDATE_PARAM, FROM_LEADER);
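/**
 * Uses the tlog-enabled config so an update log is available for PeerSync, and verifies that
 * _version_ and val_i_dvo are docValues-only (neither indexed nor stored), which the in-place
 * update checks below rely on.
 */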
public PeerSyncTest() {
stress = 0;
// TODO: a better way to do this?
configString = "solrconfig-tlog.xml";
schemaString = "schema.xml";
// validate that the schema was not changed to an unexpected state
try {
initCore(configString, schemaString);
} catch (Exception e) {
throw new RuntimeException(e);
}
IndexSchema schema = h.getCore().getLatestSchema();
assertTrue(schema.getFieldOrNull("_version_").hasDocValues() && !schema.getFieldOrNull("_version_").indexed()
&& !schema.getFieldOrNull("_version_").stored());
assertTrue(!schema.getFieldOrNull("val_i_dvo").indexed() && !schema.getFieldOrNull("val_i_dvo").stored() &&
schema.getFieldOrNull("val_i_dvo").hasDocValues());
}
@Test
@ShardsFixed(num = 3)
public void test() throws Exception {
Set<Integer> docsAdded = new LinkedHashSet<>();
handle.clear();
handle.put("timestamp", SKIPVAL);
handle.put("score", SKIPVAL);
handle.put("maxScore", SKIPVAL);
SolrClient client0 = clients.get(0);
SolrClient client1 = clients.get(1);
SolrClient client2 = clients.get(2);
long v = 0;
add(client0, seenLeader, sdoc("id","1","_version_",++v));
// this fails because client1 has no context (i.e. no updates of its own to judge whether applying the updates
// from client0 will bring it into sync with client0)
assertSync(client1, numVersions, false, shardsArr[0]);
// bring client1 back into sync with client0 by adding the doc
add(client1, seenLeader, sdoc("id","1","_version_",v));
// both have the same version list, so sync should now return true
assertSync(client1, numVersions, true, shardsArr[0]);
// TODO: test that updates weren't necessary
client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*"), client0, client1);
add(client0, seenLeader, addRandFields(sdoc("id","2","_version_",++v)));
// now client1 has the context to sync
assertSync(client1, numVersions, true, shardsArr[0]);
client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*"), client0, client1);
add(client0, seenLeader, addRandFields(sdoc("id","3","_version_",++v)));
add(client0, seenLeader, addRandFields(sdoc("id","4","_version_",++v)));
add(client0, seenLeader, addRandFields(sdoc("id","5","_version_",++v)));
add(client0, seenLeader, addRandFields(sdoc("id","6","_version_",++v)));
add(client0, seenLeader, addRandFields(sdoc("id","7","_version_",++v)));
add(client0, seenLeader, addRandFields(sdoc("id","8","_version_",++v)));
add(client0, seenLeader, addRandFields(sdoc("id","9","_version_",++v)));
add(client0, seenLeader, addRandFields(sdoc("id","10","_version_",++v)));
for (int i=0; i<10; i++) docsAdded.add(i+1);
assertSync(client1, numVersions, true, shardsArr[0]);
validateDocs(docsAdded, client0, client1);
testOverlap(docsAdded, client0, client1, v);
// test delete and deleteByQuery
v=1000;
SolrInputDocument doc = sdoc("id","1000","_version_",++v);
add(client0, seenLeader, doc);
add(client0, seenLeader, sdoc("id","1001","_version_",++v));
delQ(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "id:1001 OR id:1002");
add(client0, seenLeader, sdoc("id","1002","_version_",++v));
del(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "1000");
docsAdded.add(1002); // 1002 added
assertSync(client1, numVersions, true, shardsArr[0]);
validateDocs(docsAdded, client0, client1);
// test that a delete-by-query is returned even if not requested, and that it doesn't delete newer documents than it should
v=2000;
SolrClient client = client0;
add(client, seenLeader, sdoc("id","2000","_version_",++v));
add(client, seenLeader, sdoc("id","2001","_version_",++v));
delQ(client, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "id:2001 OR id:2002");
add(client, seenLeader, sdoc("id","2002","_version_",++v));
del(client, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "2000");
docsAdded.add(2002); // 2002 added
v=2000;
client = client1;
add(client, seenLeader, sdoc("id","2000","_version_",++v));
++v; // pretend we missed the add of 2001. PeerSync should retrieve it, but should also retrieve any deleteByQuery objects after it
// add(client, seenLeader, sdoc("id","2001","_version_",++v));
delQ(client, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "id:2001 OR id:2002");
add(client, seenLeader, sdoc("id","2002","_version_",++v));
del(client, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "2000");
assertSync(client1, numVersions, true, shardsArr[0]);
validateDocs(docsAdded, client0, client1);
//
// Test that handling of reorders works when applying docs retrieved from a peer
//
// this should cause us to retrieve the delete (but not the following add)
// the reorder in application shouldn't affect anything
add(client0, seenLeader, sdoc("id","3000","_version_",3001));
add(client1, seenLeader, sdoc("id","3000","_version_",3001));
del(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","3000"), "3000");
docsAdded.add(3000);
// this should cause us to retrieve an add that was previously deleted
add(client0, seenLeader, sdoc("id","3001","_version_",3003));
del(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","3001"), "3004");
del(client1, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","3001"), "3004");
// this should cause us to retrieve an older add that was overwritten
add(client0, seenLeader, sdoc("id","3002","_version_",3004));
add(client0, seenLeader, sdoc("id","3002","_version_",3005));
add(client1, seenLeader, sdoc("id","3002","_version_",3005));
docsAdded.add(3001); // 3001 added
docsAdded.add(3002); // 3002 added
assertSync(client1, numVersions, true, shardsArr[0]);
validateDocs(docsAdded, client0, client1);
// now let's check that fingerprinting causes appropriate failures
v = 4000;
add(client0, seenLeader, sdoc("id",Integer.toString((int)v),"_version_",v));
docsAdded.add(4000);
int toAdd = numVersions+10;
for (int i=0; i<toAdd; i++) {
add(client0, seenLeader, sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
add(client1, seenLeader, sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
docsAdded.add((int)v+i+1);
}
// client0 now has an additional add beyond our window and the fingerprint should cause this to fail
assertSync(client1, numVersions, false, shardsArr[0]);
// if we turn off fingerprinting, it should succeed
System.setProperty("solr.disableFingerprint", "true");
try {
assertSync(client1, numVersions, true, shardsArr[0]);
} finally {
System.clearProperty("solr.disableFingerprint");
}
// let's add the missing document and verify that order doesn't matter
add(client1, seenLeader, sdoc("id",Integer.toString((int)v),"_version_",v));
assertSync(client1, numVersions, true, shardsArr[0]);
// let's do some overwrites to ensure that repeated updates and maxDoc don't matter
for (int i=0; i<10; i++) {
add(client0, seenLeader, sdoc("id", Integer.toString((int) v + i + 1), "_version_", v + i + 1));
}
assertSync(client1, numVersions, true, shardsArr[0]);
validateDocs(docsAdded, client0, client1);
// let's add some in-place updates
add(client0, seenLeader, sdoc("id", "5000", "val_i_dvo", 0, "title", "mytitle", "_version_", 5000)); // full update
docsAdded.add(5000);
assertSync(client1, numVersions, true, shardsArr[0]);
// verify the in-place updated document (id=5000) has correct fields
assertEquals(0, client1.getById("5000").get("val_i_dvo"));
assertEquals(client0.getById("5000")+" and "+client1.getById("5000"),
"mytitle", client1.getById("5000").getFirstValue("title"));
ModifiableSolrParams inPlaceParams = new ModifiableSolrParams(seenLeader);
inPlaceParams.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, "5000");
add(client0, inPlaceParams, sdoc("id", "5000", "val_i_dvo", 1, "_version_", 5001)); // in-place update
assertSync(client1, numVersions, true, shardsArr[0]);
// verify the in-place updated document (id=5000) has correct fields
assertEquals(1, client1.getById("5000").get("val_i_dvo"));
assertEquals(client0.getById("5000")+" and "+client1.getById("5000"),
"mytitle", client1.getById("5000").getFirstValue("title"));
// interleave the in-place updates with a few deletes to other documents
del(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","5002"), 4001);
delQ(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","5003"), "id:4002");
docsAdded.remove(4001);
docsAdded.remove(4002);
inPlaceParams.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, "5001");
add(client0, inPlaceParams, sdoc("id", 5000, "val_i_dvo", 2, "_version_", 5004)); // in-place update
assertSync(client1, numVersions, true, shardsArr[0]);
// verify the in-place updated document (id=5000) has correct fields
assertEquals(2, client1.getById("5000").get("val_i_dvo"));
assertEquals(client0.getById("5000")+" and "+client1.getById("5000"),
"mytitle", client1.getById("5000").getFirstValue("title"));
// a DBQ that matches on the in-place updated value
delQ(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","5005"), "val_i_dvo:1"); // current val is 2, so this should not delete anything
assertSync(client1, numVersions, true, shardsArr[0]);
add(client0, seenLeader, sdoc("id", "5000", "val_i_dvo", 0, "title", "mytitle", "_version_", 5000)); // full update
docsAdded.add(5000);
assertSync(client1, numVersions, true, shardsArr[0]);
inPlaceParams.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, "5004");
add(client0, inPlaceParams, sdoc("id", 5000, "val_i_dvo", 3, "_version_", 5006));
assertSync(client1, numVersions, true, shardsArr[0]);
// verify the in-place updated document (id=5000) has correct fields
assertEquals(3, client1.getById("5000").get("val_i_dvo"));
assertEquals(client0.getById("5000")+" and "+client1.getById("5000"),
"mytitle", client1.getById("5000").getFirstValue("title"));
validateDocs(docsAdded, client0, client1);
del(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","5007"), 5000);
docsAdded.remove(5000);
assertSync(client1, numVersions, true, shardsArr[0]);
validateDocs(docsAdded, client0, client1);
// if the doc with id=6000 is deleted, further in-place updates should fail
add(client0, seenLeader, sdoc("id", "6000", "val_i_dvo", 6, "title", "mytitle", "_version_", 6000)); // full update
delQ(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","6004"), "val_i_dvo:6"); // current val is 6, so this will delete id=6000
assertSync(client1, numVersions, true, shardsArr[0]);
SolrException ex = expectThrows(SolrException.class, () -> {
inPlaceParams.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, "6000");
add(client0, inPlaceParams, sdoc("id", 6000, "val_i_dvo", 6003, "_version_", 5007));
});
assertEquals(ex.toString(), SolrException.ErrorCode.SERVER_ERROR.code, ex.code());
assertThat(ex.getMessage(), containsString("Can't find document with id=6000"));
// Reordered DBQ with child documents (SOLR-10114)
docsAdded.clear();
// Reordered full delete should not delete child-docs
add(client0, seenLeader, sdocWithChildren(7001, "7001", 2)); // add with later version
docsAdded.add(7001);
docsAdded.add(7001001);
docsAdded.add(7001002);
delQ(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","7000"), "id:*"); // reordered delete
assertSync(client1, numVersions, true, shardsArr[0]);
validateDocs(docsAdded, client0, client1);
// Reordered DBQ should not affect update
add(client0, seenLeader, sdocWithChildren(8000, "8000", 5)); // add with later version
delQ(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","8002"), "id:8500"); // not found, arrives earlier
add(client0, seenLeader, sdocWithChildren(8000, "8001", 2)); // update with two children
docsAdded.add(8000);
docsAdded.add(8000001);
docsAdded.add(8000002);
assertSync(client1, numVersions, true, shardsArr[0]);
validateDocs(docsAdded, client0, client1);
}
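/**
 * Adds roughly numVersions*0.95 updates to client0 only, leaving too little overlap with client1's
 * recent versions for sync to be trusted, so the first sync attempt fails; then re-adds about a
 * quarter of those updates to client1, which restores enough overlap for sync to succeed.
 */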
protected void testOverlap(Set<Integer> docsAdded, SolrClient client0, SolrClient client1, long v) throws IOException, SolrServerException {
int toAdd = (int)(numVersions *.95);
for (int i=0; i<toAdd; i++) {
add(client0, seenLeader, sdoc("id",Integer.toString(i+11),"_version_",v+i+1));
docsAdded.add(i+11);
}
// sync should fail since there's not enough overlap to give us confidence
assertSync(client1, numVersions, false, shardsArr[0]);
// add some of the docs that were missing... just enough to give enough overlap
int toAdd2 = (int)(numVersions * .25);
for (int i=0; i<toAdd2; i++) {
add(client1, seenLeader, sdoc("id",Integer.toString(i+11),"_version_",v+i+1));
}
assertSync(client1, numVersions, true, shardsArr[0]);
validateDocs(docsAdded, client0, client1);
}
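/**
 * Commits both clients, runs the same match-all query (sorted by _version_ desc) against each via
 * queryAndCompare, and then checks the returned ids against docsAdded.
 */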
protected void validateDocs(Set<Integer> docsAdded, SolrClient client0, SolrClient client1) throws SolrServerException, IOException {
client0.commit();
client1.commit();
QueryResponse qacResponse;
qacResponse = queryAndCompare(params("q", "*:*", "rows", "10000", "sort","_version_ desc"), client0, client1);
validateQACResponse(docsAdded, qacResponse);
}
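/**
 * Issues a realtime-get (/get) request with getVersions=numVersions and a sync parameter listing
 * the peer URLs, which makes the receiving node attempt a PeerSync against those peers, and asserts
 * the boolean "sync" flag in the response.
 */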
void assertSync(SolrClient client, int numVersions, boolean expectedResult, String... syncWith) throws IOException, SolrServerException {
QueryRequest qr = new QueryRequest(params("qt","/get", "getVersions",Integer.toString(numVersions), "sync", StrUtils.join(Arrays.asList(syncWith), ',')));
@SuppressWarnings({"rawtypes"})
NamedList rsp = client.request(qr);
assertEquals(expectedResult, (Boolean) rsp.get("sync"));
}
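/**
 * Asserts that the ids returned by the query exactly match docsAdded and that numFound equals its size.
 */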
void validateQACResponse(Set<Integer> docsAdded, QueryResponse qacResponse) {
Set<Integer> qacDocs = new LinkedHashSet<>();
for (int i=0; i<qacResponse.getResults().size(); i++) {
qacDocs.add(Integer.parseInt(qacResponse.getResults().get(i).getFieldValue("id").toString()));
}
assertEquals(docsAdded, qacDocs);
assertEquals(docsAdded.size(), qacResponse.getResults().getNumFound());
}
}