blob: b06511d9d7ad8cf4ce4c505979f7e59c9a52870d [file] [log] [blame]
diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java b/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java
index 6554cc59da..7df98c68cf 100644
--- a/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java
@@ -18,7 +18,11 @@ package org.apache.lucene.index;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.DocValuesConsumer;
@@ -94,97 +98,114 @@ final class SegmentMerger {
* @throws IOException if there is a low-level IO error
*/
MergeState merge() throws IOException {
+ ForkJoinPool fjPool = ForkJoinPool.commonPool();
+
if (!shouldMerge()) {
throw new IllegalStateException("Merge would result in 0 document segment");
}
mergeFieldInfos();
- long t0 = 0;
- if (mergeState.infoStream.isEnabled("SM")) {
- t0 = System.nanoTime();
- }
- int numMerged = mergeFields();
- if (mergeState.infoStream.isEnabled("SM")) {
- long t1 = System.nanoTime();
- mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge stored fields [" + numMerged + " docs]");
- }
- assert numMerged == mergeState.segmentInfo.maxDoc(): "numMerged=" + numMerged + " vs mergeState.segmentInfo.maxDoc()=" + mergeState.segmentInfo.maxDoc();
+
final SegmentWriteState segmentWriteState = new SegmentWriteState(mergeState.infoStream, directory, mergeState.segmentInfo,
- mergeState.mergeFieldInfos, null, context);
+ mergeState.mergeFieldInfos, null, context);
final SegmentReadState segmentReadState = new SegmentReadState(directory, mergeState.segmentInfo, mergeState.mergeFieldInfos,
- IOContext.READ, segmentWriteState.segmentSuffix);
+ IOContext.READ, segmentWriteState.segmentSuffix);
- if (mergeState.mergeFieldInfos.hasNorms()) {
- if (mergeState.infoStream.isEnabled("SM")) {
- t0 = System.nanoTime();
- }
- mergeNorms(segmentWriteState);
- if (mergeState.infoStream.isEnabled("SM")) {
- long t1 = System.nanoTime();
- mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge norms [" + numMerged + " docs]");
- }
- }
+ ArrayList<ForkJoinTask<Void>> tasks = new ArrayList<>();
- if (mergeState.infoStream.isEnabled("SM")) {
- t0 = System.nanoTime();
- }
- try (NormsProducer norms = mergeState.mergeFieldInfos.hasNorms()
- ? codec.normsFormat().normsProducer(segmentReadState)
- : null) {
- NormsProducer normsMergeInstance = null;
- if (norms != null) {
- // Use the merge instance in order to reuse the same IndexInput for all terms
- normsMergeInstance = norms.getMergeInstance();
+ // norms
+ fjPool.submit(() -> {
+ long t0 = System.nanoTime();
+ if (mergeState.mergeFieldInfos.hasNorms()) {
+ mergeNorms(segmentWriteState);
+ if (mergeState.infoStream.isEnabled("SM")) {
+ long t1 = System.nanoTime();
+ mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge norms");
+ }
}
- mergeTerms(segmentWriteState, normsMergeInstance);
- }
- if (mergeState.infoStream.isEnabled("SM")) {
- long t1 = System.nanoTime();
- mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge postings [" + numMerged + " docs]");
- }
+ return (Void) null;
+ }).join();
- if (mergeState.infoStream.isEnabled("SM")) {
- t0 = System.nanoTime();
- }
- if (mergeState.mergeFieldInfos.hasDocValues()) {
- mergeDocValues(segmentWriteState);
- }
- if (mergeState.infoStream.isEnabled("SM")) {
- long t1 = System.nanoTime();
- mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge doc values [" + numMerged + " docs]");
+ // stored fields
+ {
+ tasks.add(fjPool.submit(() -> {
+ long t0 = System.nanoTime();
+ int numMerged = mergeFields();
+ if (mergeState.infoStream.isEnabled("SM")) {
+ long t1 = System.nanoTime();
+ mergeState.infoStream.message("SM", ((t1 - t0) / 1000000) + " msec to merge stored fields [" + numMerged + " docs]");
+ }
+ assert numMerged == mergeState.segmentInfo.maxDoc() : "numMerged=" + numMerged + " vs mergeState.segmentInfo.maxDoc()=" + mergeState.segmentInfo.maxDoc();
+ return (Void) null;
+ }));
}
- if (mergeState.infoStream.isEnabled("SM")) {
- t0 = System.nanoTime();
- }
- if (mergeState.mergeFieldInfos.hasPointValues()) {
- mergePoints(segmentWriteState);
- }
- if (mergeState.infoStream.isEnabled("SM")) {
- long t1 = System.nanoTime();
- mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge points [" + numMerged + " docs]");
- }
+ tasks.add(fjPool.submit(() -> {
+ long t0 = System.nanoTime();
+ if (mergeState.mergeFieldInfos.hasNorms()) {
+ try (NormsProducer producer = codec.normsFormat().normsProducer(segmentReadState)) {
+ mergeTerms(segmentWriteState, producer.getMergeInstance());
+ }
+ } else {
+ mergeTerms(segmentWriteState, null);
+ }
+ if (mergeState.infoStream.isEnabled("SM")) {
+ long t1 = System.nanoTime();
+ mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge postings");
+ }
+ return (Void) null;
+ }));
- if (mergeState.mergeFieldInfos.hasVectors()) {
+ tasks.add(fjPool.submit(() -> {
+ long t0 = System.nanoTime();
+ if (mergeState.mergeFieldInfos.hasDocValues()) {
+ mergeDocValues(segmentWriteState);
+ }
if (mergeState.infoStream.isEnabled("SM")) {
- t0 = System.nanoTime();
+ long t1 = System.nanoTime();
+ mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge doc values");
+ }
+ return (Void) null;
+ }));
+
+ tasks.add(fjPool.submit(() -> {
+ long t0 = System.nanoTime();
+ if (mergeState.mergeFieldInfos.hasPointValues()) {
+ mergePoints(segmentWriteState);
}
- numMerged = mergeVectors();
if (mergeState.infoStream.isEnabled("SM")) {
long t1 = System.nanoTime();
- mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge vectors [" + numMerged + " docs]");
+ mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge points");
}
- assert numMerged == mergeState.segmentInfo.maxDoc();
+ return (Void) null;
+ }));
+
+ if (mergeState.mergeFieldInfos.hasVectors()) {
+ tasks.add(fjPool.submit(() -> {
+ long t0 = System.nanoTime();
+ long numMerged = mergeVectors();
+ if (mergeState.infoStream.isEnabled("SM")) {
+ long t1 = System.nanoTime();
+ mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge vectors [" + numMerged + " docs]");
+ }
+ assert numMerged == mergeState.segmentInfo.maxDoc();
+ return (Void) null;
+ }));
+ }
+
+ for (ForkJoinTask<?> t : tasks) {
+ t.join();
}
-
+
// write the merged infos
+ long t0 = 0;
if (mergeState.infoStream.isEnabled("SM")) {
t0 = System.nanoTime();
}
codec.fieldInfosFormat().write(directory, mergeState.segmentInfo, "", mergeState.mergeFieldInfos, context);
if (mergeState.infoStream.isEnabled("SM")) {
long t1 = System.nanoTime();
- mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to write field infos [" + numMerged + " docs]");
+ mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to write field infos");
}
return mergeState;