Revert "Add new parallel merge task executor for parallel actions within a single merge action (#13124)"
This reverts commit e3a34bfe56ae555fdf36f7a43c2f0d2698101c89.
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index c11c775..2d98bc5 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -208,10 +208,6 @@
* GITHUB#13156: Hunspell: don't proceed with other suggestions if we found good REP ones (Peter Gromov)
-* GITHUB#13124: MergeScheduler can now provide an executor for intra-merge parallelism. The first
- implementation is the ConcurrentMergeScheduler and the Lucene99HnswVectorsFormat will use it if no other
- executor is provided. (Ben Trent)
-
Optimizations
---------------------
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsFormat.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsFormat.java
index bcea401..582155d 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsFormat.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsFormat.java
@@ -24,8 +24,6 @@
import org.apache.lucene.codecs.KnnVectorsReader;
import org.apache.lucene.codecs.KnnVectorsWriter;
import org.apache.lucene.codecs.lucene90.IndexedDISI;
-import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.search.DocIdSetIterator;
@@ -165,8 +163,7 @@
* @param numMergeWorkers number of workers (threads) that will be used when doing merge. If
* larger than 1, a non-null {@link ExecutorService} must be passed as mergeExec
* @param mergeExec the {@link ExecutorService} that will be used by ALL vector writers that are
- * generated by this format to do the merge. If null, the configured {@link
- * MergeScheduler#getIntraMergeExecutor(MergePolicy.OneMerge)} is used.
+ * generated by this format to do the merge
*/
public Lucene99HnswVectorsFormat(
int maxConn, int beamWidth, int numMergeWorkers, ExecutorService mergeExec) {
@@ -187,6 +184,10 @@
}
this.maxConn = maxConn;
this.beamWidth = beamWidth;
+ if (numMergeWorkers > 1 && mergeExec == null) {
+ throw new IllegalArgumentException(
+ "No executor service passed in when " + numMergeWorkers + " merge workers are requested");
+ }
if (numMergeWorkers == 1 && mergeExec != null) {
throw new IllegalArgumentException(
"No executor service is needed as we'll use single thread to merge");
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsWriter.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsWriter.java
index 1bd2c53..a236dd7 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsWriter.java
@@ -352,14 +352,7 @@
int[][] vectorIndexNodeOffsets = null;
if (scorerSupplier.totalVectorCount() > 0) {
// build graph
- HnswGraphMerger merger =
- createGraphMerger(
- fieldInfo,
- scorerSupplier,
- mergeState.intraMergeTaskExecutor == null
- ? null
- : new TaskExecutor(mergeState.intraMergeTaskExecutor),
- numMergeWorkers);
+ HnswGraphMerger merger = createGraphMerger(fieldInfo, scorerSupplier);
for (int i = 0; i < mergeState.liveDocs.length; i++) {
merger.addReader(
mergeState.knnVectorsReaders[i], mergeState.docMaps[i], mergeState.liveDocs[i]);
@@ -496,23 +489,11 @@
}
private HnswGraphMerger createGraphMerger(
- FieldInfo fieldInfo,
- RandomVectorScorerSupplier scorerSupplier,
- TaskExecutor parallelMergeTaskExecutor,
- int numParallelMergeWorkers) {
+ FieldInfo fieldInfo, RandomVectorScorerSupplier scorerSupplier) {
if (mergeExec != null) {
return new ConcurrentHnswMerger(
fieldInfo, scorerSupplier, M, beamWidth, mergeExec, numMergeWorkers);
}
- if (parallelMergeTaskExecutor != null) {
- return new ConcurrentHnswMerger(
- fieldInfo,
- scorerSupplier,
- M,
- beamWidth,
- parallelMergeTaskExecutor,
- numParallelMergeWorkers);
- }
return new IncrementalHnswGraphMerger(fieldInfo, scorerSupplier, M, beamWidth);
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
index 25b489a..8ffbbd7 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
@@ -21,12 +21,9 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
-import java.util.concurrent.Executor;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.index.MergePolicy.OneMerge;
+import org.apache.lucene.internal.tests.ConcurrentMergeSchedulerAccess;
import org.apache.lucene.internal.tests.TestSecrets;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
@@ -112,9 +109,6 @@
private double forceMergeMBPerSec = Double.POSITIVE_INFINITY;
- /** The executor provided for intra-merge parallelization */
- protected CachedExecutor intraMergeExecutor;
-
/** Sole constructor, with all settings set to default values. */
public ConcurrentMergeScheduler() {}
@@ -266,16 +260,6 @@
}
@Override
- public Executor getIntraMergeExecutor(OneMerge merge) {
- assert intraMergeExecutor != null : "scaledExecutor is not initialized";
- // don't do multithreaded merges for small merges
- if (merge.estimatedMergeBytes < MIN_BIG_MERGE_MB * 1024 * 1024) {
- return super.getIntraMergeExecutor(merge);
- }
- return intraMergeExecutor;
- }
-
- @Override
public Directory wrapForMerge(OneMerge merge, Directory in) {
Thread mergeThread = Thread.currentThread();
if (!MergeThread.class.isInstance(mergeThread)) {
@@ -462,13 +446,7 @@
@Override
public void close() {
- try {
- sync();
- } finally {
- if (intraMergeExecutor != null) {
- intraMergeExecutor.shutdown();
- }
- }
+ sync();
}
/**
@@ -532,9 +510,6 @@
void initialize(InfoStream infoStream, Directory directory) throws IOException {
super.initialize(infoStream, directory);
initDynamicDefaults(directory);
- if (intraMergeExecutor == null) {
- intraMergeExecutor = new CachedExecutor();
- }
}
@Override
@@ -780,16 +755,11 @@
@Override
public String toString() {
- return getClass().getSimpleName()
- + ": "
- + "maxThreadCount="
- + maxThreadCount
- + ", "
- + "maxMergeCount="
- + maxMergeCount
- + ", "
- + "ioThrottle="
- + doAutoIOThrottle;
+ StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": ");
+ sb.append("maxThreadCount=").append(maxThreadCount).append(", ");
+ sb.append("maxMergeCount=").append(maxMergeCount).append(", ");
+ sb.append("ioThrottle=").append(doAutoIOThrottle);
+ return sb.toString();
}
private boolean isBacklog(long now, OneMerge merge) {
@@ -932,58 +902,12 @@
}
static {
- TestSecrets.setConcurrentMergeSchedulerAccess(ConcurrentMergeScheduler::setSuppressExceptions);
- }
-
- /**
- * This executor provides intra-merge threads for parallel execution of merge tasks. It provides a
- * limited number of threads to execute merge tasks. In particular, if the number of
- * `mergeThreads` is equal to `maxThreadCount`, then the executor will execute the merge task in
- * the calling thread.
- */
- private class CachedExecutor implements Executor {
-
- private final AtomicInteger activeCount = new AtomicInteger(0);
- private final ThreadPoolExecutor executor;
-
- public CachedExecutor() {
- this.executor =
- new ThreadPoolExecutor(0, 1024, 1L, TimeUnit.MINUTES, new SynchronousQueue<>());
- }
-
- void shutdown() {
- executor.shutdown();
- }
-
- @Override
- public void execute(Runnable command) {
- final boolean isThreadAvailable;
- // we need to check if a thread is available before submitting the task to the executor
- // synchronize on CMS to get an accurate count of current threads
- synchronized (ConcurrentMergeScheduler.this) {
- int max = maxThreadCount - mergeThreads.size() - 1;
- int value = activeCount.get();
- if (value < max) {
- activeCount.incrementAndGet();
- assert activeCount.get() > 0 : "active count must be greater than 0 after increment";
- isThreadAvailable = true;
- } else {
- isThreadAvailable = false;
- }
- }
- if (isThreadAvailable) {
- executor.execute(
- () -> {
- try {
- command.run();
- } finally {
- activeCount.decrementAndGet();
- assert activeCount.get() >= 0 : "unexpected negative active count";
- }
- });
- } else {
- command.run();
- }
- }
+ TestSecrets.setConcurrentMergeSchedulerAccess(
+ new ConcurrentMergeSchedulerAccess() {
+ @Override
+ public void setSuppressExceptions(ConcurrentMergeScheduler cms) {
+ cms.setSuppressExceptions();
+ }
+ });
}
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 6797a74..c81abc3 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -3430,14 +3430,7 @@
}
SegmentMerger merger =
- new SegmentMerger(
- readers,
- segInfo,
- infoStream,
- trackingDir,
- globalFieldNumberMap,
- context,
- mergeScheduler.getIntraMergeExecutor(merge));
+ new SegmentMerger(readers, segInfo, infoStream, trackingDir, globalFieldNumberMap, context);
if (!merger.shouldMerge()) {
return;
@@ -5226,13 +5219,7 @@
final SegmentMerger merger =
new SegmentMerger(
- mergeReaders,
- merge.info.info,
- infoStream,
- dirWrapper,
- globalFieldNumberMap,
- context,
- mergeScheduler.getIntraMergeExecutor(merge));
+ mergeReaders, merge.info.info, infoStream, dirWrapper, globalFieldNumberMap, context);
merge.info.setSoftDelCount(Math.toIntExact(softDeleteCount.get()));
merge.checkAborted();
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java
index b6cfe73..1017204 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java
@@ -18,12 +18,10 @@
import java.io.Closeable;
import java.io.IOException;
-import java.util.concurrent.Executor;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RateLimitedIndexOutput;
import org.apache.lucene.util.InfoStream;
-import org.apache.lucene.util.SameThreadExecutorService;
/**
* Expert: {@link IndexWriter} uses an instance implementing this interface to execute the merges
@@ -34,8 +32,6 @@
*/
public abstract class MergeScheduler implements Closeable {
- private final Executor executor = new SameThreadExecutorService();
-
/** Sole constructor. (For invocation by subclass constructors, typically implicit.) */
protected MergeScheduler() {}
@@ -56,15 +52,6 @@
return in;
}
- /**
- * Provides an executor for parallelism during a single merge operation. By default, the method
- * returns a {@link SameThreadExecutorService} where all intra-merge actions occur in their
- * calling thread.
- */
- public Executor getIntraMergeExecutor(OneMerge merge) {
- return executor;
- }
-
/** Close this MergeScheduler. */
@Override
public abstract void close() throws IOException;
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeState.java b/lucene/core/src/java/org/apache/lucene/index/MergeState.java
index de3c8d8..6153a57 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergeState.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergeState.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.util.List;
import java.util.Locale;
-import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.codecs.FieldsProducer;
@@ -85,23 +84,15 @@
/** InfoStream for debugging messages. */
public final InfoStream infoStream;
- /** Executor for intra merge activity */
- public final Executor intraMergeTaskExecutor;
-
/** Indicates if the index needs to be sorted * */
public boolean needsIndexSort;
/** Sole constructor. */
- MergeState(
- List<CodecReader> readers,
- SegmentInfo segmentInfo,
- InfoStream infoStream,
- Executor intraMergeTaskExecutor)
+ MergeState(List<CodecReader> readers, SegmentInfo segmentInfo, InfoStream infoStream)
throws IOException {
verifyIndexSort(readers, segmentInfo);
this.infoStream = infoStream;
int numReaders = readers.size();
- this.intraMergeTaskExecutor = intraMergeTaskExecutor;
maxDocs = new int[numReaders];
fieldsProducers = new FieldsProducer[numReaders];
diff --git a/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java
index fd6d2a8..0142288 100644
--- a/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java
+++ b/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java
@@ -16,7 +16,6 @@
*/
package org.apache.lucene.index;
-import java.util.concurrent.Executor;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.store.Directory;
@@ -53,9 +52,4 @@
public MergeScheduler clone() {
return this;
}
-
- @Override
- public Executor getIntraMergeExecutor(OneMerge merge) {
- throw new UnsupportedOperationException("NoMergeScheduler does not support merges");
- }
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java b/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java
index 4589f7e..7fc2f04 100644
--- a/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java
@@ -17,10 +17,7 @@
package org.apache.lucene.index;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.DocValuesConsumer;
@@ -31,7 +28,6 @@
import org.apache.lucene.codecs.PointsWriter;
import org.apache.lucene.codecs.StoredFieldsWriter;
import org.apache.lucene.codecs.TermVectorsWriter;
-import org.apache.lucene.search.TaskExecutor;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.InfoStream;
@@ -60,14 +56,13 @@
InfoStream infoStream,
Directory dir,
FieldInfos.FieldNumbers fieldNumbers,
- IOContext context,
- Executor intraMergeTaskExecutor)
+ IOContext context)
throws IOException {
if (context.context != IOContext.Context.MERGE) {
throw new IllegalArgumentException(
"IOContext.context should be MERGE; got: " + context.context);
}
- mergeState = new MergeState(readers, segmentInfo, infoStream, intraMergeTaskExecutor);
+ mergeState = new MergeState(readers, segmentInfo, infoStream);
directory = dir;
this.codec = segmentInfo.getCodec();
this.context = context;
@@ -135,36 +130,19 @@
IOContext.READ,
segmentWriteState.segmentSuffix);
- TaskExecutor taskExecutor = new TaskExecutor(mergeState.intraMergeTaskExecutor);
- List<Callable<Void>> mergingTasks = new ArrayList<>();
- mergingTasks.add(
- () -> {
- if (mergeState.mergeFieldInfos.hasNorms()) {
- mergeWithLogging(
- this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged);
- }
+ if (mergeState.mergeFieldInfos.hasNorms()) {
+ mergeWithLogging(this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged);
+ }
- mergeWithLogging(
- this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged);
- return null;
- });
+ mergeWithLogging(this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged);
if (mergeState.mergeFieldInfos.hasDocValues()) {
- mergingTasks.add(
- () -> {
- mergeWithLogging(
- this::mergeDocValues, segmentWriteState, segmentReadState, "doc values", numMerged);
- return null;
- });
+ mergeWithLogging(
+ this::mergeDocValues, segmentWriteState, segmentReadState, "doc values", numMerged);
}
if (mergeState.mergeFieldInfos.hasPointValues()) {
- mergingTasks.add(
- () -> {
- mergeWithLogging(
- this::mergePoints, segmentWriteState, segmentReadState, "points", numMerged);
- return null;
- });
+ mergeWithLogging(this::mergePoints, segmentWriteState, segmentReadState, "points", numMerged);
}
if (mergeState.mergeFieldInfos.hasVectorValues()) {
@@ -177,14 +155,9 @@
}
if (mergeState.mergeFieldInfos.hasVectors()) {
- mergingTasks.add(
- () -> {
- mergeWithLogging(this::mergeTermVectors, "term vectors");
- return null;
- });
+ mergeWithLogging(this::mergeTermVectors, "term vectors");
}
- taskExecutor.invokeAll(mergingTasks);
// write the merged infos
mergeWithLogging(
this::mergeFieldInfos, segmentWriteState, segmentReadState, "field infos", numMerged);
diff --git a/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswQuantizedVectorsFormat.java b/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswQuantizedVectorsFormat.java
index 66ae7bc..382389b 100644
--- a/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswQuantizedVectorsFormat.java
+++ b/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswQuantizedVectorsFormat.java
@@ -180,6 +180,9 @@
() -> new Lucene99HnswScalarQuantizedVectorsFormat(20, 100, 0, 0.8f, null));
expectThrows(
IllegalArgumentException.class,
+ () -> new Lucene99HnswScalarQuantizedVectorsFormat(20, 100, 100, null, null));
+ expectThrows(
+ IllegalArgumentException.class,
() ->
new Lucene99HnswScalarQuantizedVectorsFormat(
20, 100, 1, null, new SameThreadExecutorService()));
diff --git a/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswVectorsFormat.java b/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswVectorsFormat.java
index 493b2cd..c444802 100644
--- a/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswVectorsFormat.java
+++ b/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswVectorsFormat.java
@@ -50,6 +50,8 @@
expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(512 + 1, 20));
expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, 3201));
expectThrows(
+ IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, 100, 100, null));
+ expectThrows(
IllegalArgumentException.class,
() -> new Lucene99HnswVectorsFormat(20, 100, 1, new SameThreadExecutorService()));
}
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
index 08b9bf9..17ea6e8 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
@@ -20,20 +20,16 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
-import org.apache.lucene.document.KnnFloatVectorField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
@@ -46,10 +42,7 @@
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.InfoStream;
-import org.apache.lucene.util.SameThreadExecutorService;
-import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.SuppressForbidden;
-import org.apache.lucene.util.Version;
public class TestConcurrentMergeScheduler extends LuceneTestCase {
@@ -97,22 +90,12 @@
|| (th instanceof IllegalStateException
&& th.getMessage().contains("this writer hit an unrecoverable error"));
}
-
- @Override
- // override here to ensure even tiny merges get the parallel executor
- public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) {
- assert intraMergeExecutor != null : "intraMergeExecutor is not initialized";
- return intraMergeExecutor;
- }
});
}
IndexWriter writer = new IndexWriter(directory, iwc);
Document doc = new Document();
Field idField = newStringField("id", "", Field.Store.YES);
- KnnFloatVectorField knnField = new KnnFloatVectorField("knn", new float[] {0.0f, 0.0f});
doc.add(idField);
- // Add knn float vectors to test parallel merge
- doc.add(knnField);
outer:
for (int i = 0; i < 10; i++) {
@@ -122,7 +105,6 @@
for (int j = 0; j < 20; j++) {
idField.setStringValue(Integer.toString(i * 20 + j));
- knnField.setVectorValue(new float[] {random().nextFloat(), random().nextFloat()});
writer.addDocument(doc);
}
@@ -244,35 +226,23 @@
Directory directory = newDirectory();
Document doc = new Document();
Field idField = newStringField("id", "", Field.Store.YES);
- KnnFloatVectorField knnField = new KnnFloatVectorField("knn", new float[] {0.0f, 0.0f});
doc.add(idField);
- doc.add(knnField);
- IndexWriterConfig iwc =
- newIndexWriterConfig(new MockAnalyzer(random()))
- // Force excessive merging:
- .setMaxBufferedDocs(2)
- .setMergePolicy(newLogMergePolicy(100))
- .setCommitOnClose(false);
- if (iwc.getMergeScheduler() instanceof ConcurrentMergeScheduler) {
- iwc.setMergeScheduler(
- new ConcurrentMergeScheduler() {
- @Override
- // override here to ensure even tiny merges get the parallel executor
- public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) {
- assert intraMergeExecutor != null : "scaledExecutor is not initialized";
- return intraMergeExecutor;
- }
- });
- }
- IndexWriter writer = new IndexWriter(directory, iwc);
+ IndexWriter writer =
+ new IndexWriter(
+ directory,
+ newIndexWriterConfig(new MockAnalyzer(random()))
+ .
+ // Force excessive merging:
+ setMaxBufferedDocs(2)
+ .setMergePolicy(newLogMergePolicy(100))
+ .setCommitOnClose(false));
int numIters = TEST_NIGHTLY ? 10 : 3;
for (int iter = 0; iter < numIters; iter++) {
for (int j = 0; j < 201; j++) {
idField.setStringValue(Integer.toString(iter * 201 + j));
- knnField.setVectorValue(new float[] {random().nextFloat(), random().nextFloat()});
writer.addDocument(doc);
}
@@ -394,118 +364,6 @@
dir.close();
}
- public void testSmallMergesDonNotGetThreads() throws IOException {
- Directory dir = newDirectory();
- IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
- iwc.setMaxBufferedDocs(2);
- iwc.setMergeScheduler(
- new ConcurrentMergeScheduler() {
- @Override
- protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge)
- throws IOException {
- assertTrue(this.getIntraMergeExecutor(merge) instanceof SameThreadExecutorService);
- super.doMerge(mergeSource, merge);
- }
- });
- IndexWriter w = new IndexWriter(dir, iwc);
- for (int i = 0; i < 10; i++) {
- Document doc = new Document();
- doc.add(new StringField("id", "" + i, Field.Store.NO));
- w.addDocument(doc);
- }
- w.forceMerge(1);
- w.close();
- dir.close();
- }
-
- @SuppressForbidden(reason = "Thread sleep")
- public void testIntraMergeThreadPoolIsLimitedByMaxThreads() throws IOException {
- ConcurrentMergeScheduler mergeScheduler = new ConcurrentMergeScheduler();
- MergeScheduler.MergeSource mergeSource =
- new MergeScheduler.MergeSource() {
- @Override
- public MergePolicy.OneMerge getNextMerge() {
- fail("should not be called");
- return null;
- }
-
- @Override
- public void onMergeFinished(MergePolicy.OneMerge merge) {
- fail("should not be called");
- }
-
- @Override
- public boolean hasPendingMerges() {
- fail("should not be called");
- return false;
- }
-
- @Override
- public void merge(MergePolicy.OneMerge merge) throws IOException {
- fail("should not be called");
- }
- };
- try (Directory dir = newDirectory();
- mergeScheduler) {
- MergePolicy.OneMerge merge =
- new MergePolicy.OneMerge(
- List.of(
- new SegmentCommitInfo(
- new SegmentInfo(
- dir,
- Version.LATEST,
- null,
- "test",
- 0,
- false,
- false,
- Codec.getDefault(),
- Collections.emptyMap(),
- StringHelper.randomId(),
- new HashMap<>(),
- null),
- 0,
- 0,
- 0,
- 0,
- 0,
- new byte[16])));
- mergeScheduler.initialize(InfoStream.NO_OUTPUT, dir);
- mergeScheduler.setMaxMergesAndThreads(6, 6);
- Executor executor = mergeScheduler.intraMergeExecutor;
- AtomicInteger threadsExecutedOnPool = new AtomicInteger();
- AtomicInteger threadsExecutedOnSelf = new AtomicInteger();
- for (int i = 0; i < 4; i++) {
- mergeScheduler.mergeThreads.add(
- mergeScheduler.new MergeThread(mergeSource, merge) {
- @Override
- @SuppressForbidden(reason = "Thread sleep")
- public void run() {
- executor.execute(
- () -> {
- if (Thread.currentThread() == this) {
- threadsExecutedOnSelf.incrementAndGet();
- } else {
- threadsExecutedOnPool.incrementAndGet();
- }
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- });
- }
- });
- }
- for (ConcurrentMergeScheduler.MergeThread thread : mergeScheduler.mergeThreads) {
- thread.start();
- }
- mergeScheduler.sync();
- assertEquals(3, threadsExecutedOnSelf.get());
- assertEquals(1, threadsExecutedOnPool.get());
- }
- }
-
private static class TrackingCMS extends ConcurrentMergeScheduler {
long totMergedBytes;
CountDownLatch atLeastOneMerge;
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDoc.java b/lucene/core/src/test/org/apache/lucene/index/TestDoc.java
index 0cdd645..3b245dd 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestDoc.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestDoc.java
@@ -45,7 +45,6 @@
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.InfoStream;
-import org.apache.lucene.util.SameThreadExecutorService;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.Version;
@@ -238,8 +237,7 @@
InfoStream.getDefault(),
trackingDir,
new FieldInfos.FieldNumbers(null, null),
- context,
- new SameThreadExecutorService());
+ context);
merger.merge();
r1.close();
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java b/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java
index 94a1849..615accb 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java
@@ -32,7 +32,6 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.InfoStream;
-import org.apache.lucene.util.SameThreadExecutorService;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.Version;
import org.apache.lucene.util.packed.PackedLongValues;
@@ -106,8 +105,7 @@
InfoStream.getDefault(),
mergedDir,
new FieldInfos.FieldNumbers(null, null),
- newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1))),
- new SameThreadExecutorService());
+ newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1))));
MergeState mergeState = merger.merge();
int docsMerged = mergeState.segmentInfo.maxDoc();
assertTrue(docsMerged == 2);