using J2N.Threading;
using J2N.Threading.Atomic;
using Lucene.Net.Diagnostics;
using Lucene.Net.Documents;
using Lucene.Net.Support;
using NUnit.Framework;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using Console = Lucene.Net.Util.SystemConsole;
namespace Lucene.Net.Index
{
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using Directory = Lucene.Net.Store.Directory;
using Document = Documents.Document;
using FieldType = Lucene.Net.Documents.FieldType;
using IndexSearcher = Lucene.Net.Search.IndexSearcher;
using LuceneTestCase = Lucene.Net.Util.LuceneTestCase;
using MockAnalyzer = Lucene.Net.Analysis.MockAnalyzer;
using Query = Lucene.Net.Search.Query;
using ScoreDoc = Lucene.Net.Search.ScoreDoc;
using TermQuery = Lucene.Net.Search.TermQuery;
using TestUtil = Lucene.Net.Util.TestUtil;
using TopDocs = Lucene.Net.Search.TopDocs;
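// Stress test for near-real-time (NRT) readers: writer threads concurrently update,
// delete, and commit documents while reader threads verify that the value stored for
// each id never goes backwards relative to the committed model.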
[TestFixture]
public class TestStressNRT : LuceneTestCase
{
private volatile DirectoryReader reader;
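// model holds the latest value written per id (negative = deleted); committedModel is
// the snapshot of model taken when the most recent commit/reopen was installed.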
private readonly ConcurrentDictionary<int, long> model = new ConcurrentDictionary<int, long>();
private IDictionary<int, long> committedModel = new Dictionary<int, long>();
private long snapshotCount;
private long committedModelClock;
private volatile int lastId;
private readonly string field = "val_l";
private object[] syncArr;
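// Resets the model: every id starts at -1 (nothing indexed yet) and gets its own lock
// object so writer threads can serialize updates to the same document.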
private void InitModel(int ndocs)
{
snapshotCount = 0;
committedModelClock = 0;
lastId = 0;
syncArr = new object[ndocs];
for (int i = 0; i < ndocs; i++)
{
model[i] = -1L;
syncArr[i] = new object();
}
committedModel.PutAll(model);
}
[Test]
public virtual void Test()
{
// update variables
int commitPercent = Random.Next(20);
int softCommitPercent = Random.Next(100); // what percent of the commits are soft
int deletePercent = Random.Next(50);
int deleteByQueryPercent = Random.Next(25);
int ndocs = AtLeast(50);
int nWriteThreads = TestUtil.NextInt32(Random, 1, TestNightly ? 10 : 5);
int maxConcurrentCommits = TestUtil.NextInt32(Random, 1, TestNightly ? 10 : 5); // number of committers at a time... needed if we want to avoid commit errors due to exceeding the max
bool tombstones = Random.NextBoolean();
// query variables
AtomicInt64 operations = new AtomicInt64(AtLeast(10000)); // number of query operations to perform in total
int nReadThreads = TestUtil.NextInt32(Random, 1, TestNightly ? 10 : 5);
InitModel(ndocs);
FieldType storedOnlyType = new FieldType();
storedOnlyType.IsStored = true;
if (Verbose)
{
Console.WriteLine("\n");
Console.WriteLine("TEST: commitPercent=" + commitPercent);
Console.WriteLine("TEST: softCommitPercent=" + softCommitPercent);
Console.WriteLine("TEST: deletePercent=" + deletePercent);
Console.WriteLine("TEST: deleteByQueryPercent=" + deleteByQueryPercent);
Console.WriteLine("TEST: ndocs=" + ndocs);
Console.WriteLine("TEST: nWriteThreads=" + nWriteThreads);
Console.WriteLine("TEST: nReadThreads=" + nReadThreads);
Console.WriteLine("TEST: maxConcurrentCommits=" + maxConcurrentCommits);
Console.WriteLine("TEST: tombstones=" + tombstones);
Console.WriteLine("TEST: operations=" + operations);
Console.WriteLine("\n");
}
AtomicInt32 numCommitting = new AtomicInt32();
IList<ThreadJob> threads = new List<ThreadJob>();
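// create the index, then commit and open an initial (empty) reader before any threads start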
Directory dir = NewDirectory();
RandomIndexWriter writer = new RandomIndexWriter(Random, dir, NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random)));
writer.DoRandomForceMergeAssert = false;
writer.Commit();
reader = DirectoryReader.Open(dir);
for (int i = 0; i < nWriteThreads; i++)
{
ThreadJob thread = new ThreadAnonymousInnerClassHelper(this, "WRITER" + i, commitPercent, softCommitPercent, deletePercent, deleteByQueryPercent, ndocs, maxConcurrentCommits, tombstones, operations, storedOnlyType, numCommitting, writer);
threads.Add(thread);
}
for (int i = 0; i < nReadThreads; i++)
{
ThreadJob thread = new ThreadAnonymousInnerClassHelper2(this, "READER" + i, ndocs, tombstones, operations);
threads.Add(thread);
}
foreach (ThreadJob thread in threads)
{
thread.Start();
}
foreach (ThreadJob thread in threads)
{
thread.Join();
}
writer.Dispose();
if (Verbose)
{
Console.WriteLine("TEST: close reader=" + reader);
}
reader.Dispose();
dir.Dispose();
}
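// Writer thread: on each iteration either performs a commit/reopen (bounded by
// maxConcurrentCommits) or updates/deletes a random document, keeping the shared
// model in sync with what was written.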
private class ThreadAnonymousInnerClassHelper : ThreadJob
{
private readonly TestStressNRT outerInstance;
private readonly int commitPercent;
private readonly int softCommitPercent;
private readonly int deletePercent;
private readonly int deleteByQueryPercent;
private readonly int ndocs;
private readonly int maxConcurrentCommits;
private readonly bool tombstones;
private readonly AtomicInt64 operations;
private readonly FieldType storedOnlyType;
private readonly AtomicInt32 numCommitting;
private readonly RandomIndexWriter writer;
public ThreadAnonymousInnerClassHelper(TestStressNRT outerInstance, string str, int commitPercent, int softCommitPercent, int deletePercent, int deleteByQueryPercent, int ndocs, int maxConcurrentCommits, bool tombstones, AtomicInt64 operations, FieldType storedOnlyType, AtomicInt32 numCommitting, RandomIndexWriter writer)
: base(str)
{
this.outerInstance = outerInstance;
this.commitPercent = commitPercent;
this.softCommitPercent = softCommitPercent;
this.deletePercent = deletePercent;
this.deleteByQueryPercent = deleteByQueryPercent;
this.ndocs = ndocs;
this.maxConcurrentCommits = maxConcurrentCommits;
this.tombstones = tombstones;
this.operations = operations;
this.storedOnlyType = storedOnlyType;
this.numCommitting = numCommitting;
this.writer = writer;
rand = new Random(Random.Next());
}
internal Random rand;
public override void Run()
{
try
{
while (operations > 0)
{
int oper = rand.Next(100);
if (oper < commitPercent)
{
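// limit the number of threads committing at the same time to maxConcurrentCommits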
if (numCommitting.IncrementAndGet() <= maxConcurrentCommits)
{
IDictionary<int, long> newCommittedModel;
long version;
DirectoryReader oldReader;
lock (outerInstance)
{
newCommittedModel = new Dictionary<int, long>(outerInstance.model); // take a snapshot
version = outerInstance.snapshotCount++;
oldReader = outerInstance.reader;
oldReader.IncRef(); // increment the reference since we will use this for reopening
}
DirectoryReader newReader;
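// soft commit: just reopen an NRT reader from the writer; otherwise do a real
// writer.Commit() followed by a regular reopen of the directory reader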
if (rand.Next(100) < softCommitPercent)
{
// assertU(h.Commit("softCommit","true"));
if (Random.NextBoolean())
{
if (Verbose)
{
Console.WriteLine("TEST: " + Thread.CurrentThread.Name + ": call writer.getReader");
}
newReader = writer.GetReader(true);
}
else
{
if (Verbose)
{
Console.WriteLine("TEST: " + Thread.CurrentThread.Name + ": reopen reader=" + oldReader + " version=" + version);
}
newReader = DirectoryReader.OpenIfChanged(oldReader, writer.IndexWriter, true);
}
}
else
{
// assertU(commit());
if (Verbose)
{
Console.WriteLine("TEST: " + Thread.CurrentThread.Name + ": commit+reopen reader=" + oldReader + " version=" + version);
}
writer.Commit();
if (Verbose)
{
Console.WriteLine("TEST: " + Thread.CurrentThread.Name + ": now reopen after commit");
}
newReader = DirectoryReader.OpenIfChanged(oldReader);
}
// Code below assumes newReader comes w/
// extra ref:
if (newReader == null)
{
oldReader.IncRef();
newReader = oldReader;
}
oldReader.DecRef();
lock (outerInstance)
{
// install the new reader if it's newest (and check the current version since another reader may have already been installed)
//Console.WriteLine(Thread.CurrentThread.Name + ": newVersion=" + newReader.Version);
if (Debugging.AssertsEnabled) Debugging.Assert(newReader.RefCount > 0);
if (Debugging.AssertsEnabled) Debugging.Assert(outerInstance.reader.RefCount > 0);
if (newReader.Version > outerInstance.reader.Version)
{
if (Verbose)
{
Console.WriteLine("TEST: " + Thread.CurrentThread.Name + ": install new reader=" + newReader);
}
outerInstance.reader.DecRef();
outerInstance.reader = newReader;
// Silly: forces fieldInfos to be
// loaded so we don't hit IOE on later
// reader.toString
newReader.ToString();
// install this snapshot only if it's newer than the current one
if (version >= outerInstance.committedModelClock)
{
if (Verbose)
{
Console.WriteLine("TEST: " + Thread.CurrentThread.Name + ": install new model version=" + version);
}
outerInstance.committedModel = newCommittedModel;
outerInstance.committedModelClock = version;
}
else
{
if (Verbose)
{
Console.WriteLine("TEST: " + Thread.CurrentThread.Name + ": skip install new model version=" + version);
}
}
}
else
{
// if the same reader, don't decRef.
if (Verbose)
{
Console.WriteLine("TEST: " + Thread.CurrentThread.Name + ": skip install new reader=" + newReader);
}
newReader.DecRef();
}
}
}
numCommitting.DecrementAndGet();
}
else
{
int id = rand.Next(ndocs);
object sync = outerInstance.syncArr[id];
// set the lastId before we actually change it sometimes to try and
// uncover more race conditions between writing and reading
bool before = Random.NextBoolean();
if (before)
{
outerInstance.lastId = id;
}
// We can't concurrently update the same document and retain our invariants of increasing values
// since we can't guarantee what order the updates will be executed.
lock (sync)
{
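// a negative model value marks a deleted doc; the magnitude of the value only ever increases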
long val = outerInstance.model[id];
long nextVal = Math.Abs(val) + 1;
if (oper < commitPercent + deletePercent)
{
// assertU("<delete><id>" + id + "</id></delete>");
// add tombstone first
if (tombstones)
{
Document d = new Document();
d.Add(NewStringField("id", "-" + Convert.ToString(id), Documents.Field.Store.YES));
d.Add(NewField(outerInstance.field, Convert.ToString(nextVal), storedOnlyType));
writer.UpdateDocument(new Term("id", "-" + Convert.ToString(id)), d);
}
if (Verbose)
{
Console.WriteLine("TEST: " + Thread.CurrentThread.Name + ": term delDocs id:" + id + " nextVal=" + nextVal);
}
writer.DeleteDocuments(new Term("id", Convert.ToString(id)));
outerInstance.model[id] = -nextVal;
}
else if (oper < commitPercent + deletePercent + deleteByQueryPercent)
{
//assertU("<delete><query>id:" + id + "</query></delete>");
// add tombstone first
if (tombstones)
{
Document d = new Document();
d.Add(NewStringField("id", "-" + Convert.ToString(id), Documents.Field.Store.YES));
d.Add(NewField(outerInstance.field, Convert.ToString(nextVal), storedOnlyType));
writer.UpdateDocument(new Term("id", "-" + Convert.ToString(id)), d);
}
if (Verbose)
{
Console.WriteLine("TEST: " + Thread.CurrentThread.Name + ": query delDocs id:" + id + " nextVal=" + nextVal);
}
writer.DeleteDocuments(new TermQuery(new Term("id", Convert.ToString(id))));
outerInstance.model[id] = -nextVal;
}
else
{
// assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
Document d = new Document();
d.Add(NewStringField("id", Convert.ToString(id), Documents.Field.Store.YES));
d.Add(NewField(outerInstance.field, Convert.ToString(nextVal), storedOnlyType));
if (Verbose)
{
Console.WriteLine("TEST: " + Thread.CurrentThread.Name + ": u id:" + id + " val=" + nextVal);
}
writer.UpdateDocument(new Term("id", Convert.ToString(id)), d);
if (tombstones)
{
// remove tombstone after new addition (this should be optional?)
writer.DeleteDocuments(new Term("id", "-" + Convert.ToString(id)));
}
outerInstance.model[id] = nextVal;
}
}
if (!before)
{
outerInstance.lastId = id;
}
}
}
}
catch (Exception e)
{
Console.WriteLine(Thread.CurrentThread.Name + ": FAILED: unexpected exception");
Console.WriteLine(e.StackTrace);
throw new Exception(e.Message, e);
}
}
}
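// Reader thread: picks an id (biased toward the most recently changed one), snapshots
// the committed value and current reader under lock, then verifies the index returns a
// value at least as large as the model's.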
private class ThreadAnonymousInnerClassHelper2 : ThreadJob
{
private readonly TestStressNRT outerInstance;
private readonly int ndocs;
private readonly bool tombstones;
private readonly AtomicInt64 operations;
public ThreadAnonymousInnerClassHelper2(TestStressNRT outerInstance, string str, int ndocs, bool tombstones, AtomicInt64 operations)
: base(str)
{
this.outerInstance = outerInstance;
this.ndocs = ndocs;
this.tombstones = tombstones;
this.operations = operations;
rand = new Random(Random.Next());
}
internal Random rand;
public override void Run()
{
try
{
IndexReader lastReader = null;
IndexSearcher lastSearcher = null;
while (operations.DecrementAndGet() >= 0)
{
// bias toward a recently changed doc
int id = rand.Next(100) < 25 ? outerInstance.lastId : rand.Next(ndocs);
// when indexing, we update the index, then the model
// so when querying, we should first check the model, and then the index
long val;
DirectoryReader r;
lock (outerInstance)
{
val = outerInstance.committedModel[id];
r = outerInstance.reader;
r.IncRef();
}
if (Verbose)
{
Console.WriteLine("TEST: " + Thread.CurrentThread.Name + ": s id=" + id + " val=" + val + " r=" + r.Version);
}
// sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
IndexSearcher searcher;
if (r == lastReader)
{
// Just re-use lastSearcher, else
// newSearcher may create too many thread
// pools (ExecutorService):
searcher = lastSearcher;
}
else
{
searcher = NewSearcher(
#if FEATURE_INSTANCE_TESTDATA_INITIALIZATION
outerInstance,
#endif
r);
lastReader = r;
lastSearcher = searcher;
}
Query q = new TermQuery(new Term("id", Convert.ToString(id)));
TopDocs results = searcher.Search(q, 10);
if (results.TotalHits == 0 && tombstones)
{
// if we couldn't find the doc, look for its tombstone
q = new TermQuery(new Term("id", "-" + Convert.ToString(id)));
results = searcher.Search(q, 1);
if (results.TotalHits == 0)
{
if (val == -1L)
{
// expected... no doc was added yet
r.DecRef();
continue;
}
Assert.Fail("No documents or tombstones found for id " + id + ", expected at least " + val + " reader=" + r);
}
}
if (results.TotalHits == 0 && !tombstones)
{
// nothing to do - we can't tell anything from a deleted doc without tombstones
}
else
{
// we should have found the document, or its tombstone
if (results.TotalHits != 1)
{
Console.WriteLine("FAIL: hits id:" + id + " val=" + val);
foreach (ScoreDoc sd in results.ScoreDocs)
{
Document doc = r.Document(sd.Doc);
Console.WriteLine(" docID=" + sd.Doc + " id:" + doc.Get("id") + " foundVal=" + doc.Get(outerInstance.field));
}
Assert.Fail("id=" + id + " reader=" + r + " totalHits=" + results.TotalHits);
}
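// the stored value may be newer than the committed model, but never older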
Document doc_ = searcher.Doc(results.ScoreDocs[0].Doc);
long foundVal = Convert.ToInt64(doc_.Get(outerInstance.field));
if (foundVal < Math.Abs(val))
{
Assert.Fail("foundVal=" + foundVal + " val=" + val + " id=" + id + " reader=" + r);
}
}
r.DecRef();
}
}
catch (Exception e)
{
operations.Value = -1L; // signal the other threads to stop
Console.WriteLine(Thread.CurrentThread.Name + ": FAILED: unexpected exception");
Console.WriteLine(e.StackTrace);
throw new Exception(e.Message, e);
}
}
}
}
}