| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.solr.search; |
| |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.SolrInputDocument; |
| import org.apache.solr.common.params.ModifiableSolrParams; |
| import org.apache.solr.common.util.Utils; |
| import org.apache.solr.request.SolrQueryRequest; |
| import org.apache.solr.util.RefCounted; |
| import org.apache.solr.util.TestHarness; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import static org.apache.solr.core.SolrCore.verbose; |
| import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; |
| |
| public class TestRealTimeGet extends TestRTGBase { |
| |
| @BeforeClass |
| public static void beforeClass() throws Exception { |
| randomizeUpdateLogImpl(); |
| initCore("solrconfig-tlog.xml","schema_latest.xml"); |
| } |
| |
| |
| @Test |
| public void testGetRealtime() throws Exception { |
| clearIndex(); |
| assertU(commit()); |
| |
| assertU(adoc("id","1", |
| "a_f","-1.5", "a_fd","-1.5", "a_fdS","-1.5", "a_fs","1.0","a_fs","2.5", "a_fds","1.0","a_fds","2.5", "a_fdsS","1.0","a_fdsS","2.5", |
| "a_d","-1.2E99", "a_dd","-1.2E99", "a_ddS","-1.2E99", "a_ds","1.0","a_ds","2.5", "a_dds","1.0","a_dds","2.5", "a_ddsS","1.0","a_ddsS","2.5", |
| "a_i","-1", "a_id","-1", "a_idS","-1", "a_is","1","a_is","2", "a_ids","1","a_ids","2", "a_idsS","1","a_idsS","2", |
| "a_l","-9999999999", "a_ld","-9999999999", "a_ldS","-9999999999", "a_ls","1","a_ls","9999999999", "a_lds","1","a_lds","9999999999", "a_ldsS","1","a_ldsS","9999999999", |
| "a_s", "abc", "a_sd", "bcd", "a_sdS", "cde", "a_ss","def","a_ss", "efg", "a_sds","fgh","a_sds","ghi", "a_sdsS","hij","a_sdsS","ijk", |
| "a_b", "false", "a_bd", "true", "a_bdS", "false", "a_bs","true","a_bs", "false", "a_bds","true","a_bds","false", "a_bdsS","true","a_bdsS","false" |
| )); |
| assertJQ(req("q","id:1") |
| ,"/response/numFound==0" |
| ); |
| assertJQ(req("qt","/get", "id","1", "fl","id, a_f,a_fd,a_fdS a_fs,a_fds,a_fdsS, " + |
| "a_d,a_dd,a_ddS, a_ds,a_dds,a_ddsS, a_i,a_id,a_idS a_is,a_ids,a_idsS, " + |
| "a_l,a_ld,a_ldS, a_ls,a_lds,a_ldsS, a_s,a_sd,a_sdS a_ss,a_sds,a_sdsS, " + |
| "a_b,a_bd,a_bdS, a_bs,a_bds,a_bdsS" |
| ) |
| ,"=={'doc':{'id':'1'" + |
| ", a_f:-1.5, a_fd:-1.5, a_fdS:-1.5, a_fs:[1.0,2.5], a_fds:[1.0,2.5],a_fdsS:[1.0,2.5]" + |
| ", a_d:-1.2E99, a_dd:-1.2E99, a_ddS:-1.2E99, a_ds:[1.0,2.5],a_dds:[1.0,2.5],a_ddsS:[1.0,2.5]" + |
| ", a_i:-1, a_id:-1, a_idS:-1, a_is:[1,2],a_ids:[1,2],a_idsS:[1,2]" + |
| ", a_l:-9999999999, a_ld:-9999999999, a_ldS:-9999999999, a_ls:[1,9999999999],a_lds:[1,9999999999],a_ldsS:[1,9999999999]" + |
| ", a_s:'abc', a_sd:'bcd', a_sdS:'cde', a_ss:['def','efg'],a_sds:['fgh','ghi'],a_sdsS:['hij','ijk']" + |
| ", a_b:false, a_bd:true, a_bdS:false, a_bs:[true,false],a_bds:[true,false],a_bdsS:[true,false]" + |
| " }}" |
| ); |
| assertJQ(req("qt","/get","ids","1", "fl","id") |
| ,"=={" + |
| " 'response':{'numFound':1,'start':0,'numFoundExact':true,'docs':[" + |
| " {" + |
| " 'id':'1'}]" + |
| " }}}" |
| ); |
| |
| assertU(commit()); |
| |
| assertJQ(req("q","id:1") |
| ,"/response/numFound==1" |
| ); |
| |
| // a cut-n-paste of the first big query, but this time it will be retrieved from the index rather than the transaction log |
| assertJQ(req("qt","/get", "id","1", "fl","id, a_f,a_fd,a_fdS a_fs,a_fds,a_fdsS, a_d,a_dd,a_ddS, a_ds,a_dds,a_ddsS, a_i,a_id,a_idS a_is,a_ids,a_idsS, a_l,a_ld,a_ldS a_ls,a_lds,a_ldsS") |
| ,"=={'doc':{'id':'1'" + |
| ", a_f:-1.5, a_fd:-1.5, a_fdS:-1.5, a_fs:[1.0,2.5], a_fds:[1.0,2.5],a_fdsS:[1.0,2.5]" + |
| ", a_d:-1.2E99, a_dd:-1.2E99, a_ddS:-1.2E99, a_ds:[1.0,2.5],a_dds:[1.0,2.5],a_ddsS:[1.0,2.5]" + |
| ", a_i:-1, a_id:-1, a_idS:-1, a_is:[1,2],a_ids:[1,2],a_idsS:[1,2]" + |
| ", a_l:-9999999999, a_ld:-9999999999, a_ldS:-9999999999, a_ls:[1,9999999999],a_lds:[1,9999999999],a_ldsS:[1,9999999999]" + |
| " }}" |
| ); |
| |
| assertJQ(req("qt","/get","id","1", "fl","id") |
| ,"=={'doc':{'id':'1'}}" |
| ); |
| assertJQ(req("qt","/get","ids","1", "fl","id") |
| ,"=={" + |
| " 'response':{'numFound':1,'start':0,'numFoundExact':true,'docs':[" + |
| " {" + |
| " 'id':'1'}]" + |
| " }}}" |
| ); |
| |
| assertU(delI("1")); |
| |
| assertJQ(req("q","id:1") |
| ,"/response/numFound==1" |
| ); |
| assertJQ(req("qt","/get","id","1") |
| ,"=={'doc':null}" |
| ); |
| assertJQ(req("qt","/get","ids","1") |
| ,"=={'response':{'numFound':0,'start':0,'numFoundExact':true,'docs':[]}}" |
| ); |
| |
| |
| assertU(adoc("id","10")); |
| assertU(adoc("id","11")); |
| assertJQ(req("qt","/get","id","10", "fl","id") |
| ,"=={'doc':{'id':'10'}}" |
| ); |
| assertU(delQ("id:10 foo_s:abcdef")); |
| assertJQ(req("qt","/get","id","10") |
| ,"=={'doc':null}" |
| ); |
| assertJQ(req("qt","/get","id","11", "fl","id") |
| ,"=={'doc':{'id':'11'}}" |
| ); |
| |
| // multivalued field |
| assertU(adoc("id","12", "val_ls","1", "val_ls","2")); |
| assertJQ(req("q","id:12") |
| ,"/response/numFound==0" |
| ); |
| assertJQ(req("qt","/get", "id","12", "fl","id,val_ls") |
| ,"=={'doc':{'id':'12', 'val_ls':[1,2]}}" |
| ); |
| |
| assertU(commit()); |
| |
| assertJQ(req("qt","/get", "id","12", "fl","id,val_ls") |
| ,"=={'doc':{'id':'12', 'val_ls':[1,2]}}" |
| ); |
| assertJQ(req("q","id:12") |
| ,"/response/numFound==1" |
| ); |
| |
| |
| SolrQueryRequest req = req(); |
| RefCounted<SolrIndexSearcher> realtimeHolder = req.getCore().getRealtimeSearcher(); |
| |
| // |
| // filters |
| // |
| assertU(adoc("id", "12")); |
| assertU(adoc("id", "13")); |
| |
| // this should not need to open another realtime searcher |
| assertJQ(req("qt","/get","id","11", "fl","id", "fq","id:11") |
| ,"=={doc:{id:'11'}}" |
| ); |
| |
| // assert that the same realtime searcher is still in effect (i.e. that we didn't |
| // open a new searcher when we didn't have to). |
| RefCounted<SolrIndexSearcher> realtimeHolder2 = req.getCore().getRealtimeSearcher(); |
| assertEquals(realtimeHolder.get(), realtimeHolder2.get()); // Autocommit could possibly cause this to fail? |
| realtimeHolder2.decref(); |
| |
| // filter most likely different segment |
| assertJQ(req("qt","/get","id","12", "fl","id", "fq","id:11") |
| ,"=={doc:null}" |
| ); |
| |
| // filter most likely same different segment |
| assertJQ(req("qt","/get","id","12", "fl","id", "fq","id:13") |
| ,"=={doc:null}" |
| ); |
| |
| assertJQ(req("qt","/get","id","12", "fl","id", "fq","id:12") |
| ,"=={doc:{id:'12'}}" |
| ); |
| |
| assertU(adoc("id", "14")); |
| assertU(adoc("id", "15")); |
| |
| // id list, with some in index and some not, first id from index. Also test mutiple fq params. |
| assertJQ(req("qt","/get","ids","12,14,13,15", "fl","id", "fq","id:[10 TO 14]", "fq","id:[13 TO 19]") |
| ,"/response/docs==[{id:'14'},{id:'13'}]" |
| ); |
| |
| assertU(adoc("id", "16")); |
| assertU(adoc("id", "17")); |
| |
| // id list, with some in index and some not, first id from tlog |
| assertJQ(req("qt","/get","ids","17,16,15,14", "fl","id", "fq","id:[15 TO 16]") |
| ,"/response/docs==[{id:'16'},{id:'15'}]" |
| ); |
| |
| // more complex filter |
| assertJQ(req("qt","/get","ids","17,16,15,14", "fl","id", "fq","{!frange l=15 u=16}id") |
| ,"/response/docs==[{id:'16'},{id:'15'}]" |
| ); |
| |
| realtimeHolder.decref(); |
| req.close(); |
| |
| } |
| |
| |
| @Test |
| public void testVersions() throws Exception { |
| clearIndex(); |
| assertU(commit()); |
| |
| long version = addAndGetVersion(sdoc("id","1") , null); |
| |
| assertJQ(req("q","id:1") |
| ,"/response/numFound==0" |
| ); |
| |
| // test version is there from rtg |
| assertJQ(req("qt","/get","id","1") |
| ,"=={'doc':{'id':'1','_version_':" + version + "}}" |
| ); |
| |
| // test version is there from the index |
| assertU(commit()); |
| assertJQ(req("qt","/get","id","1") |
| ,"=={'doc':{'id':'1','_version_':" + version + "}}" |
| ); |
| |
| // simulate an update from the leader |
| version += 10; |
| updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); |
| |
| // test version is there from rtg |
| assertJQ(req("qt","/get","id","1") |
| ,"=={'doc':{'id':'1','_version_':" + version + "}}" |
| ); |
| |
| // simulate reordering: test that a version less than that does not take affect |
| updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); |
| |
| // test that version hasn't changed |
| assertJQ(req("qt","/get","id","1") |
| ,"=={'doc':{'id':'1','_version_':" + version + "}}" |
| ); |
| |
| // simulate reordering: test that a delete w/ version less than that does not take affect |
| // TODO: also allow passing version on delete instead of on URL? |
| updateJ(jsonDelId("1"), params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_",Long.toString(version - 1))); |
| |
| // test that version hasn't changed |
| assertJQ(req("qt","/get","id","1") |
| ,"=={'doc':{'id':'1','_version_':" + version + "}}" |
| ); |
| |
| // make sure reordering detection also works after a commit |
| assertU(commit()); |
| |
| // simulate reordering: test that a version less than that does not take affect |
| updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); |
| |
| // test that version hasn't changed |
| assertJQ(req("qt","/get","id","1") |
| ,"=={'doc':{'id':'1','_version_':" + version + "}}" |
| ); |
| |
| // simulate reordering: test that a delete w/ version less than that does not take affect |
| updateJ(jsonDelId("1"), params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_",Long.toString(version - 1))); |
| |
| // test that version hasn't changed |
| assertJQ(req("qt","/get","id","1") |
| ,"=={'doc':{'id':'1','_version_':" + version + "}}" |
| ); |
| |
| // now simulate a normal delete from the leader |
| version += 5; |
| updateJ(jsonDelId("1"), params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_",Long.toString(version))); |
| |
| // make sure a reordered add doesn't take affect. |
| updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); |
| |
| // test that it's still deleted |
| assertJQ(req("qt","/get","id","1") |
| ,"=={'doc':null}" |
| ); |
| |
| // test that we can remember the version of a delete after a commit |
| assertU(commit()); |
| |
| // make sure a reordered add doesn't take affect. |
| long version2 = deleteByQueryAndGetVersion("id:2", null); |
| |
| // test that it's still deleted |
| assertJQ(req("qt","/get","id","1") |
| ,"=={'doc':null}" |
| ); |
| |
| version = addAndGetVersion(sdoc("id","2"), null); |
| version2 = deleteByQueryAndGetVersion("id:2", null); |
| assertTrue(Math.abs(version2) > version ); |
| |
| // test that it's deleted |
| assertJQ(req("qt","/get","id","2") |
| ,"=={'doc':null}"); |
| |
| |
| version2 = Math.abs(version2) + 1000; |
| updateJ(jsonAdd(sdoc("id","3", "_version_",Long.toString(version2+100))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); |
| updateJ(jsonAdd(sdoc("id","4", "_version_",Long.toString(version2+200))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); |
| |
| // this should only affect id:3 so far |
| deleteByQueryAndGetVersion("id:(3 4 5 6)", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_",Long.toString(-(version2+150))) ); |
| |
| assertJQ(req("qt","/get","id","3"),"=={'doc':null}"); |
| assertJQ(req("qt","/get","id","4", "fl","id"),"=={'doc':{'id':'4'}}"); |
| |
| updateJ(jsonAdd(sdoc("id","5", "_version_",Long.toString(version2+201))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); |
| updateJ(jsonAdd(sdoc("id","6", "_version_",Long.toString(version2+101))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); |
| |
| // the DBQ should also have caused id:6 to be removed |
| assertJQ(req("qt","/get","id","5", "fl","id"),"=={'doc':{'id':'5'}}"); |
| assertJQ(req("qt","/get","id","6"),"=={'doc':null}"); |
| |
| assertU(commit()); |
| |
| } |
| |
| @Test |
| public void testOptimisticLocking() throws Exception { |
| clearIndex(); |
| assertU(commit()); |
| |
| final long version = addAndGetVersion(sdoc("id","1") , null); |
| long version2; |
| |
| // try version added directly on doc |
| SolrException se = expectThrows(SolrException.class, "version should cause an error", |
| () -> addAndGetVersion(sdoc("id","1", "_version_", Long.toString(version-1)), null)); |
| assertEquals("version should cause a conflict", 409, se.code()); |
| |
| // try version added as a parameter on the request |
| se = expectThrows(SolrException.class, "version should cause an error", |
| () -> addAndGetVersion(sdoc("id","1"), params("_version_", Long.toString(version-1)))); |
| assertEquals("version should cause a conflict", 409, se.code()); |
| |
| // try an add specifying a negative version |
| se = expectThrows(SolrException.class, "negative version should cause a conflict", |
| () -> addAndGetVersion(sdoc("id","1"), params("_version_", Long.toString(-version)))); |
| assertEquals("version should cause a conflict", 409, se.code()); |
| |
| // try an add with a greater version |
| se = expectThrows(SolrException.class, "greater version should cause a conflict", |
| () -> addAndGetVersion(sdoc("id","1"), params("_version_", Long.toString(version+random().nextInt(1000)+1)))); |
| assertEquals("version should cause a conflict", 409, se.code()); |
| |
| // |
| // deletes |
| // |
| |
| // try a delete with version on the request |
| se = expectThrows(SolrException.class, "version should cause an error", |
| () -> deleteAndGetVersion("1", params("_version_", Long.toString(version-1)))); |
| assertEquals("version should cause a conflict", 409, se.code()); |
| |
| // try a delete with a negative version |
| se = expectThrows(SolrException.class, "negative version should cause an error", |
| () -> deleteAndGetVersion("1", params("_version_", Long.toString(-version)))); |
| assertEquals("version should cause a conflict", 409, se.code()); |
| |
| // try a delete with a greater version |
| se = expectThrows(SolrException.class, "greater version should cause an error", |
| () -> deleteAndGetVersion("1", params("_version_", Long.toString(version+random().nextInt(1000)+1)))); |
| assertEquals("version should cause a conflict", 409, se.code()); |
| |
| // try a delete of a document that doesn't exist, specifying a specific version |
| se = expectThrows(SolrException.class, "document does not exist should cause an error", |
| () -> deleteAndGetVersion("I_do_not_exist", params("_version_", Long.toString(version)))); |
| assertEquals("version should cause a conflict", 409, se.code()); |
| |
| |
| // try a delete of a document that doesn't exist, specifying that it should not |
| version2 = deleteAndGetVersion("I_do_not_exist", params("_version_", Long.toString(-1))); |
| assertTrue(version2 < 0); |
| |
| // overwrite the document |
| version2 = addAndGetVersion(sdoc("id","1", "_version_", Long.toString(version)), null); |
| assertTrue(version2 > version); |
| |
| // overwriting the previous version should now fail |
| se = expectThrows(SolrException.class, "overwriting previous version should fail", |
| () -> addAndGetVersion(sdoc("id","1"), params("_version_", Long.toString(version)))); |
| assertEquals(409, se.code()); |
| |
| // deleting the previous version should now fail |
| se = expectThrows(SolrException.class, "deleting the previous version should now fail", |
| () -> deleteAndGetVersion("1", params("_version_", Long.toString(version)))); |
| assertEquals(409, se.code()); |
| |
| final long prevVersion = version2; |
| |
| // deleting the current version should work |
| version2 = deleteAndGetVersion("1", params("_version_", Long.toString(prevVersion))); |
| |
| // overwriting the previous existing doc should now fail (since it was deleted) |
| se = expectThrows(SolrException.class, "overwriting the previous existing doc should now fail (since it was deleted)", |
| () -> addAndGetVersion(sdoc("id","1"), params("_version_", Long.toString(prevVersion)))); |
| assertEquals(409, se.code()); |
| |
| // deleting the previous existing doc should now fail (since it was deleted) |
| se = expectThrows(SolrException.class, "deleting the previous existing doc should now fail (since it was deleted)", |
| () -> deleteAndGetVersion("1", params("_version_", Long.toString(prevVersion)))); |
| assertEquals(409, se.code()); |
| |
| // overwriting a negative version should work |
| version2 = addAndGetVersion(sdoc("id","1"), params("_version_", Long.toString(-(prevVersion-1)))); |
| assertTrue(version2 > version); |
| long lastVersion = version2; |
| |
| // sanity test that we see the right version via rtg |
| assertJQ(req("qt","/get","id","1") |
| ,"=={'doc':{'id':'1','_version_':" + lastVersion + "}}" |
| ); |
| } |
| |
| |
| /*** |
| @Test |
| public void testGetRealtime() throws Exception { |
| SolrQueryRequest sr1 = req("q","foo"); |
| IndexReader r1 = sr1.getCore().getRealtimeReader(); |
| |
| assertU(adoc("id","1")); |
| |
| IndexReader r2 = sr1.getCore().getRealtimeReader(); |
| assertNotSame(r1, r2); |
| int refcount = r2.getRefCount(); |
| |
| // make sure a new reader wasn't opened |
| IndexReader r3 = sr1.getCore().getRealtimeReader(); |
| assertSame(r2, r3); |
| assertEquals(refcount+1, r3.getRefCount()); |
| |
| assertU(commit()); |
| |
| // this is not critical, but currently a commit does not refresh the reader |
| // if nothing has changed |
| IndexReader r4 = sr1.getCore().getRealtimeReader(); |
| assertEquals(refcount+2, r4.getRefCount()); |
| |
| |
| r1.decRef(); |
| r2.decRef(); |
| r3.decRef(); |
| r4.decRef(); |
| sr1.close(); |
| } |
| ***/ |
| |
| |
| @Test |
| public void testStressGetRealtime() throws Exception { |
| clearIndex(); |
| assertU(commit()); |
| |
| // req().getCore().getUpdateHandler().getIndexWriterProvider().getIndexWriter(req().getCore()).setInfoStream(System.out); |
| |
| final int commitPercent = 5 + random().nextInt(20); |
| final int softCommitPercent = 30+random().nextInt(75); // what percent of the commits are soft |
| final int deletePercent = 4+random().nextInt(25); |
| final int deleteByQueryPercent = 1+random().nextInt(5); |
| final int optimisticPercent = 1+random().nextInt(50); // percent change that an update uses optimistic locking |
| final int optimisticCorrectPercent = 25+random().nextInt(70); // percent change that a version specified will be correct |
| final int filteredGetPercent = random().nextInt( random().nextInt(20)+1 ); // percent of time that a get will be filtered... we normally don't want too high. |
| final int ndocs = 5 + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(200)); |
| int nWriteThreads = 5 + random().nextInt(25); |
| |
| final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... |
| |
| // query variables |
| final int percentRealtimeQuery = 60; |
| final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total |
| int nReadThreads = 5 + random().nextInt(25); |
| |
| |
| verbose("commitPercent=", commitPercent); |
| verbose("softCommitPercent=",softCommitPercent); |
| verbose("deletePercent=",deletePercent); |
| verbose("deleteByQueryPercent=", deleteByQueryPercent); |
| verbose("ndocs=", ndocs); |
| verbose("nWriteThreads=", nWriteThreads); |
| verbose("nReadThreads=", nReadThreads); |
| verbose("percentRealtimeQuery=", percentRealtimeQuery); |
| verbose("maxConcurrentCommits=", maxConcurrentCommits); |
| verbose("operations=", operations); |
| |
| |
| initModel(ndocs); |
| |
| final AtomicInteger numCommitting = new AtomicInteger(); |
| |
| List<Thread> threads = new ArrayList<>(); |
| |
| for (int i=0; i<nWriteThreads; i++) { |
| Thread thread = new Thread("WRITER"+i) { |
| Random rand = new Random(random().nextInt()); |
| |
| @Override |
| public void run() { |
| try { |
| while (operations.get() > 0) { |
| int oper = rand.nextInt(100); |
| |
| if (oper < commitPercent) { |
| if (numCommitting.incrementAndGet() <= maxConcurrentCommits) { |
| Map<Integer,DocInfo> newCommittedModel; |
| long version; |
| |
| synchronized(TestRealTimeGet.this) { |
| newCommittedModel = new HashMap<>(model); // take a snapshot |
| version = snapshotCount++; |
| verbose("took snapshot version=",version); |
| } |
| |
| if (rand.nextInt(100) < softCommitPercent) { |
| verbose("softCommit start"); |
| assertU(TestHarness.commit("softCommit","true")); |
| verbose("softCommit end"); |
| } else { |
| verbose("hardCommit start"); |
| assertU(commit()); |
| verbose("hardCommit end"); |
| } |
| |
| synchronized(TestRealTimeGet.this) { |
| // install this model snapshot only if it's newer than the current one |
| if (version >= committedModelClock) { |
| if (VERBOSE) { |
| verbose("installing new committedModel version="+committedModelClock); |
| } |
| committedModel = newCommittedModel; |
| committedModelClock = version; |
| } |
| } |
| } |
| numCommitting.decrementAndGet(); |
| continue; |
| } |
| |
| |
| int id = rand.nextInt(ndocs); |
| Object sync = syncArr[id]; |
| |
| // set the lastId before we actually change it sometimes to try and |
| // uncover more race conditions between writing and reading |
| boolean before = rand.nextBoolean(); |
| if (before) { |
| lastId = id; |
| } |
| |
| // We can't concurrently update the same document and retain our invariants of increasing values |
| // since we can't guarantee what order the updates will be executed. |
| // Even with versions, we can't remove the sync because increasing versions does not mean increasing vals. |
| synchronized (sync) { |
| DocInfo info = model.get(id); |
| |
| long val = info.val; |
| long nextVal = Math.abs(val)+1; |
| |
| if (oper < commitPercent + deletePercent) { |
| boolean opt = rand.nextInt() < optimisticPercent; |
| boolean correct = opt ? rand.nextInt() < optimisticCorrectPercent : false; |
| long badVersion = correct ? 0 : badVersion(rand, info.version); |
| |
| if (VERBOSE) { |
| if (!opt) { |
| verbose("deleting id",id,"val=",nextVal); |
| } else { |
| verbose("deleting id",id,"val=",nextVal, "existing_version=",info.version, (correct ? "" : (" bad_version=" + badVersion))); |
| } |
| } |
| |
| // assertU("<delete><id>" + id + "</id></delete>"); |
| Long version = null; |
| |
| if (opt) { |
| if (correct) { |
| version = deleteAndGetVersion(Integer.toString(id), params("_version_", Long.toString(info.version))); |
| } else { |
| SolrException se = expectThrows(SolrException.class, "should not get random version", |
| () -> deleteAndGetVersion(Integer.toString(id), params("_version_", Long.toString(badVersion)))); |
| assertEquals(409, se.code()); |
| } |
| } else { |
| version = deleteAndGetVersion(Integer.toString(id), null); |
| } |
| |
| if (version != null) { |
| model.put(id, new DocInfo(version, -nextVal)); |
| } |
| |
| if (VERBOSE) { |
| verbose("deleting id", id, "val=",nextVal,"DONE"); |
| } |
| } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) { |
| if (VERBOSE) { |
| verbose("deleteByQuery id ",id, "val=",nextVal); |
| } |
| |
| assertU("<delete><query>id:" + id + "</query></delete>"); |
| model.put(id, new DocInfo(-1L, -nextVal)); |
| if (VERBOSE) { |
| verbose("deleteByQuery id",id, "val=",nextVal,"DONE"); |
| } |
| } else { |
| boolean opt = rand.nextInt() < optimisticPercent; |
| boolean correct = opt ? rand.nextInt() < optimisticCorrectPercent : false; |
| long badVersion = correct ? 0 : badVersion(rand, info.version); |
| |
| if (VERBOSE) { |
| if (!opt) { |
| verbose("adding id",id,"val=",nextVal); |
| } else { |
| verbose("adding id",id,"val=",nextVal, "existing_version=",info.version, (correct ? "" : (" bad_version=" + badVersion))); |
| } |
| } |
| |
| Long version = null; |
| SolrInputDocument sd = sdoc("id", Integer.toString(id), FIELD, Long.toString(nextVal)); |
| |
| if (opt) { |
| if (correct) { |
| version = addAndGetVersion(sd, params("_version_", Long.toString(info.version))); |
| } else { |
| SolrException se = expectThrows(SolrException.class, "should not get bad version", |
| () -> addAndGetVersion(sd, params("_version_", Long.toString(badVersion)))); |
| assertEquals(409, se.code()); |
| } |
| } else { |
| version = addAndGetVersion(sd, null); |
| } |
| |
| |
| if (version != null) { |
| model.put(id, new DocInfo(version, nextVal)); |
| } |
| |
| if (VERBOSE) { |
| verbose("adding id", id, "val=", nextVal,"DONE"); |
| } |
| |
| } |
| } // end sync |
| |
| if (!before) { |
| lastId = id; |
| } |
| } |
| } catch (Throwable e) { |
| operations.set(-1L); |
| throw new RuntimeException(e); |
| } |
| } |
| }; |
| |
| threads.add(thread); |
| } |
| |
| |
| for (int i=0; i<nReadThreads; i++) { |
| Thread thread = new Thread("READER"+i) { |
| Random rand = new Random(random().nextInt()); |
| |
| @Override |
| public void run() { |
| try { |
| while (operations.decrementAndGet() >= 0) { |
| // bias toward a recently changed doc |
| int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs); |
| |
| // when indexing, we update the index, then the model |
| // so when querying, we should first check the model, and then the index |
| |
| boolean realTime = rand.nextInt(100) < percentRealtimeQuery; |
| DocInfo info; |
| |
| if (realTime) { |
| info = model.get(id); |
| } else { |
| synchronized(TestRealTimeGet.this) { |
| info = committedModel.get(id); |
| } |
| } |
| |
| if (VERBOSE) { |
| verbose("querying id", id); |
| } |
| |
| boolean filteredOut = false; |
| SolrQueryRequest sreq; |
| if (realTime) { |
| ModifiableSolrParams p = params("wt","json", "qt","/get", "ids",Integer.toString(id)); |
| if (rand.nextInt(100) < filteredGetPercent) { |
| int idToFilter = rand.nextBoolean() ? id : rand.nextInt(ndocs); |
| filteredOut = idToFilter != id; |
| p.add("fq", "id:"+idToFilter); |
| } |
| sreq = req(p); |
| } else { |
| sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true"); |
| } |
| |
| String response = h.query(sreq); |
| @SuppressWarnings({"rawtypes"}) |
| Map rsp = (Map) Utils.fromJSONString(response); |
| @SuppressWarnings({"rawtypes"}) |
| List doclist = (List)(((Map)rsp.get("response")).get("docs")); |
| if (doclist.size() == 0) { |
| // there's no info we can get back with a delete, so not much we can check without further synchronization |
| // This is also correct when filteredOut==true |
| } else { |
| assertEquals(1, doclist.size()); |
| long foundVal = (Long)(((Map)doclist.get(0)).get(FIELD)); |
| long foundVer = (Long)(((Map)doclist.get(0)).get("_version_")); |
| if (filteredOut || foundVal < Math.abs(info.val) |
| || (foundVer == info.version && foundVal != info.val) ) { // if the version matches, the val must |
| verbose("ERROR, id=", id, "found=",response,"model",info); |
| assertTrue(false); |
| } |
| } |
| } |
| } |
| catch (Throwable e) { |
| operations.set(-1L); |
| throw new RuntimeException(e); |
| } |
| } |
| }; |
| |
| threads.add(thread); |
| } |
| |
| |
| for (Thread thread : threads) { |
| thread.start(); |
| } |
| |
| for (Thread thread : threads) { |
| thread.join(); |
| } |
| |
| } |
| |
| |
| } |