| diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java b/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java |
| index 6554cc59da..7df98c68cf 100644 |
| --- a/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java |
| +++ b/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java |
| @@ -18,7 +18,11 @@ package org.apache.lucene.index; |
| |
| |
| import java.io.IOException; |
| +import java.util.ArrayList; |
| import java.util.List; |
| +import java.util.concurrent.Callable; |
| +import java.util.concurrent.ForkJoinPool; |
| +import java.util.concurrent.ForkJoinTask; |
| |
| import org.apache.lucene.codecs.Codec; |
| import org.apache.lucene.codecs.DocValuesConsumer; |
| @@ -94,97 +98,114 @@ final class SegmentMerger { |
| * @throws IOException if there is a low-level IO error |
| */ |
| MergeState merge() throws IOException { |
| + ForkJoinPool fjPool = ForkJoinPool.commonPool(); |
| + |
| if (!shouldMerge()) { |
| throw new IllegalStateException("Merge would result in 0 document segment"); |
| } |
| mergeFieldInfos(); |
| - long t0 = 0; |
| - if (mergeState.infoStream.isEnabled("SM")) { |
| - t0 = System.nanoTime(); |
| - } |
| - int numMerged = mergeFields(); |
| - if (mergeState.infoStream.isEnabled("SM")) { |
| - long t1 = System.nanoTime(); |
| - mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge stored fields [" + numMerged + " docs]"); |
| - } |
| - assert numMerged == mergeState.segmentInfo.maxDoc(): "numMerged=" + numMerged + " vs mergeState.segmentInfo.maxDoc()=" + mergeState.segmentInfo.maxDoc(); |
| + |
| |
| final SegmentWriteState segmentWriteState = new SegmentWriteState(mergeState.infoStream, directory, mergeState.segmentInfo, |
| - mergeState.mergeFieldInfos, null, context); |
| + mergeState.mergeFieldInfos, null, context); |
| final SegmentReadState segmentReadState = new SegmentReadState(directory, mergeState.segmentInfo, mergeState.mergeFieldInfos, |
| - IOContext.READ, segmentWriteState.segmentSuffix); |
| + IOContext.READ, segmentWriteState.segmentSuffix); |
| |
| - if (mergeState.mergeFieldInfos.hasNorms()) { |
| - if (mergeState.infoStream.isEnabled("SM")) { |
| - t0 = System.nanoTime(); |
| - } |
| - mergeNorms(segmentWriteState); |
| - if (mergeState.infoStream.isEnabled("SM")) { |
| - long t1 = System.nanoTime(); |
| - mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge norms [" + numMerged + " docs]"); |
| - } |
| - } |
| + ArrayList<ForkJoinTask<Void>> tasks = new ArrayList<>(); |
| |
| - if (mergeState.infoStream.isEnabled("SM")) { |
| - t0 = System.nanoTime(); |
| - } |
| - try (NormsProducer norms = mergeState.mergeFieldInfos.hasNorms() |
| - ? codec.normsFormat().normsProducer(segmentReadState) |
| - : null) { |
| - NormsProducer normsMergeInstance = null; |
| - if (norms != null) { |
| - // Use the merge instance in order to reuse the same IndexInput for all terms |
| - normsMergeInstance = norms.getMergeInstance(); |
| + // norms |
| + fjPool.submit(() -> { |
| + long t0 = System.nanoTime(); |
| + if (mergeState.mergeFieldInfos.hasNorms()) { |
| + mergeNorms(segmentWriteState); |
| + if (mergeState.infoStream.isEnabled("SM")) { |
| + long t1 = System.nanoTime(); |
| + mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge norms"); |
| + } |
| } |
| - mergeTerms(segmentWriteState, normsMergeInstance); |
| - } |
| - if (mergeState.infoStream.isEnabled("SM")) { |
| - long t1 = System.nanoTime(); |
| - mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge postings [" + numMerged + " docs]"); |
| - } |
| + return (Void) null; |
| + }).join(); |
| |
| - if (mergeState.infoStream.isEnabled("SM")) { |
| - t0 = System.nanoTime(); |
| - } |
| - if (mergeState.mergeFieldInfos.hasDocValues()) { |
| - mergeDocValues(segmentWriteState); |
| - } |
| - if (mergeState.infoStream.isEnabled("SM")) { |
| - long t1 = System.nanoTime(); |
| - mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge doc values [" + numMerged + " docs]"); |
| + // stored fields |
| + { |
| + tasks.add(fjPool.submit(() -> { |
| + long t0 = System.nanoTime(); |
| + int numMerged = mergeFields(); |
| + if (mergeState.infoStream.isEnabled("SM")) { |
| + long t1 = System.nanoTime(); |
| + mergeState.infoStream.message("SM", ((t1 - t0) / 1000000) + " msec to merge stored fields [" + numMerged + " docs]"); |
| + } |
| + assert numMerged == mergeState.segmentInfo.maxDoc() : "numMerged=" + numMerged + " vs mergeState.segmentInfo.maxDoc()=" + mergeState.segmentInfo.maxDoc(); |
| + return (Void) null; |
| + })); |
| } |
| |
| - if (mergeState.infoStream.isEnabled("SM")) { |
| - t0 = System.nanoTime(); |
| - } |
| - if (mergeState.mergeFieldInfos.hasPointValues()) { |
| - mergePoints(segmentWriteState); |
| - } |
| - if (mergeState.infoStream.isEnabled("SM")) { |
| - long t1 = System.nanoTime(); |
| - mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge points [" + numMerged + " docs]"); |
| - } |
| + tasks.add(fjPool.submit(() -> { |
| + long t0 = System.nanoTime(); |
| + if (mergeState.mergeFieldInfos.hasNorms()) { |
| + try (NormsProducer producer = codec.normsFormat().normsProducer(segmentReadState)) { |
| + mergeTerms(segmentWriteState, producer.getMergeInstance()); |
| + } |
| + } else { |
| + mergeTerms(segmentWriteState, null); |
| + } |
| + if (mergeState.infoStream.isEnabled("SM")) { |
| + long t1 = System.nanoTime(); |
| + mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge postings"); |
| + } |
| + return (Void) null; |
| + })); |
| |
| - if (mergeState.mergeFieldInfos.hasVectors()) { |
| + tasks.add(fjPool.submit(() -> { |
| + long t0 = System.nanoTime(); |
| + if (mergeState.mergeFieldInfos.hasDocValues()) { |
| + mergeDocValues(segmentWriteState); |
| + } |
| if (mergeState.infoStream.isEnabled("SM")) { |
| - t0 = System.nanoTime(); |
| + long t1 = System.nanoTime(); |
| + mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge doc values"); |
| + } |
| + return (Void) null; |
| + })); |
| + |
| + tasks.add(fjPool.submit(() -> { |
| + long t0 = System.nanoTime(); |
| + if (mergeState.mergeFieldInfos.hasPointValues()) { |
| + mergePoints(segmentWriteState); |
| } |
| - numMerged = mergeVectors(); |
| if (mergeState.infoStream.isEnabled("SM")) { |
| long t1 = System.nanoTime(); |
| - mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge vectors [" + numMerged + " docs]"); |
| + mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge points"); |
| } |
| - assert numMerged == mergeState.segmentInfo.maxDoc(); |
| + return (Void) null; |
| + })); |
| + |
| + if (mergeState.mergeFieldInfos.hasVectors()) { |
| + tasks.add(fjPool.submit(() -> { |
| + long t0 = System.nanoTime(); |
| + long numMerged = mergeVectors(); |
| + if (mergeState.infoStream.isEnabled("SM")) { |
| + long t1 = System.nanoTime(); |
| + mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to merge vectors [" + numMerged + " docs]"); |
| + } |
| + assert numMerged == mergeState.segmentInfo.maxDoc(); |
| + return (Void) null; |
| + })); |
| + } |
| + |
| + for (ForkJoinTask<?> t : tasks) { |
| + t.join(); |
| } |
| - |
| + |
| // write the merged infos |
| + long t0 = 0; |
| if (mergeState.infoStream.isEnabled("SM")) { |
| t0 = System.nanoTime(); |
| } |
| codec.fieldInfosFormat().write(directory, mergeState.segmentInfo, "", mergeState.mergeFieldInfos, context); |
| if (mergeState.infoStream.isEnabled("SM")) { |
| long t1 = System.nanoTime(); |
| - mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to write field infos [" + numMerged + " docs]"); |
| + mergeState.infoStream.message("SM", ((t1-t0)/1000000) + " msec to write field infos"); |
| } |
| |
| return mergeState; |