| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.lucene.index; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.function.LongSupplier; |
| import java.util.stream.Collectors; |
| |
| import org.apache.lucene.store.AlreadyClosedException; |
| import org.apache.lucene.store.Directory; |
| import org.apache.lucene.util.CollectionUtil; |
| import org.apache.lucene.util.IOUtils; |
| import org.apache.lucene.util.InfoStream; |
| |
| /** Holds shared SegmentReader instances. IndexWriter uses |
| * SegmentReaders for 1) applying deletes/DV updates, 2) doing |
| * merges, 3) handing out a real-time reader. This pool |
| * reuses instances of the SegmentReaders in all these |
| * places if it is in "near real-time mode" (getReader() |
| * has been called on this instance). */ |
| final class ReaderPool implements Closeable { |
| |
| private final Map<SegmentCommitInfo,ReadersAndUpdates> readerMap = new HashMap<>(); |
| private final Directory directory; |
| private final Directory originalDirectory; |
| private final FieldInfos.FieldNumbers fieldNumbers; |
| private final LongSupplier completedDelGenSupplier; |
| private final InfoStream infoStream; |
| private final SegmentInfos segmentInfos; |
| private final String softDeletesField; |
| // This is a "write once" variable (like the organic dye |
| // on a DVD-R that may or may not be heated by a laser and |
| // then cooled to permanently record the event): it's |
| // false, by default until {@link #enableReaderPooling()} |
| // is called for the first time, |
| // at which point it's switched to true and never changes |
| // back to false. Once this is true, we hold open and |
| // reuse SegmentReader instances internally for applying |
| // deletes, doing merges, and reopening near real-time |
| // readers. |
| // in practice this should be called once the readers are likely |
| // to be needed and reused ie if IndexWriter#getReader is called. |
| private volatile boolean poolReaders; |
| private final AtomicBoolean closed = new AtomicBoolean(false); |
| |
| ReaderPool(Directory directory, Directory originalDirectory, SegmentInfos segmentInfos, |
| FieldInfos.FieldNumbers fieldNumbers, LongSupplier completedDelGenSupplier, InfoStream infoStream, |
| String softDeletesField, StandardDirectoryReader reader) throws IOException { |
| this.directory = directory; |
| this.originalDirectory = originalDirectory; |
| this.segmentInfos = segmentInfos; |
| this.fieldNumbers = fieldNumbers; |
| this.completedDelGenSupplier = completedDelGenSupplier; |
| this.infoStream = infoStream; |
| this.softDeletesField = softDeletesField; |
| if (reader != null) { |
| // Pre-enroll all segment readers into the reader pool; this is necessary so |
| // any in-memory NRT live docs are correctly carried over, and so NRT readers |
| // pulled from this IW share the same segment reader: |
| List<LeafReaderContext> leaves = reader.leaves(); |
| assert segmentInfos.size() == leaves.size(); |
| for (int i=0;i<leaves.size();i++) { |
| LeafReaderContext leaf = leaves.get(i); |
| SegmentReader segReader = (SegmentReader) leaf.reader(); |
| SegmentReader newReader = new SegmentReader(segmentInfos.info(i), segReader, segReader.getLiveDocs(), |
| segReader.getHardLiveDocs(), segReader.numDocs(), true); |
| readerMap.put(newReader.getOriginalSegmentInfo(), new ReadersAndUpdates(segmentInfos.getIndexCreatedVersionMajor(), |
| newReader, newPendingDeletes(newReader, newReader.getOriginalSegmentInfo()))); |
| } |
| } |
| } |
| |
| /** Asserts this info still exists in IW's segment infos */ |
| synchronized boolean assertInfoIsLive(SegmentCommitInfo info) { |
| int idx = segmentInfos.indexOf(info); |
| assert idx != -1: "info=" + info + " isn't live"; |
| assert segmentInfos.info(idx) == info: "info=" + info + " doesn't match live info in segmentInfos"; |
| return true; |
| } |
| |
| /** |
| * Drops reader for the given {@link SegmentCommitInfo} if it's pooled |
| * @return <code>true</code> if a reader is pooled |
| */ |
| synchronized boolean drop(SegmentCommitInfo info) throws IOException { |
| final ReadersAndUpdates rld = readerMap.get(info); |
| if (rld != null) { |
| assert info == rld.info; |
| readerMap.remove(info); |
| rld.dropReaders(); |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * Returns the sum of the ram used by all the buffered readers and updates in MB |
| */ |
| synchronized long ramBytesUsed() { |
| long bytes = 0; |
| for (ReadersAndUpdates rld : readerMap.values()) { |
| bytes += rld.ramBytesUsed.get(); |
| } |
| return bytes; |
| } |
| |
| /** |
| * Returns <code>true</code> iff any of the buffered readers and updates has at least one pending delete |
| */ |
| synchronized boolean anyDeletions() { |
| for(ReadersAndUpdates rld : readerMap.values()) { |
| if (rld.getDelCount() > 0) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Enables reader pooling for this pool. This should be called once the readers in this pool are shared with an |
| * outside resource like an NRT reader. Once reader pooling is enabled a {@link ReadersAndUpdates} will be kept around |
| * in the reader pool on calling {@link #release(ReadersAndUpdates, boolean)} until the segment get dropped via calls |
| * to {@link #drop(SegmentCommitInfo)} or {@link #dropAll()} or {@link #close()}. |
| * Reader pooling is disabled upon construction but can't be disabled again once it's enabled. |
| */ |
| void enableReaderPooling() { |
| poolReaders = true; |
| } |
| |
| boolean isReaderPoolingEnabled() { |
| return poolReaders; |
| } |
| |
| /** |
| * Releases the {@link ReadersAndUpdates}. This should only be called if the {@link #get(SegmentCommitInfo, boolean)} |
| * is called with the create paramter set to true. |
| * @return <code>true</code> if any files were written by this release call. |
| */ |
| synchronized boolean release(ReadersAndUpdates rld, boolean assertInfoLive) throws IOException { |
| boolean changed = false; |
| // Matches incRef in get: |
| rld.decRef(); |
| |
| if (rld.refCount() == 0) { |
| // This happens if the segment was just merged away, |
| // while a buffered deletes packet was still applying deletes/updates to it. |
| assert readerMap.containsKey(rld.info) == false: "seg=" + rld.info |
| + " has refCount 0 but still unexpectedly exists in the reader pool"; |
| } else { |
| |
| // Pool still holds a ref: |
| assert rld.refCount() > 0: "refCount=" + rld.refCount() + " reader=" + rld.info; |
| |
| if (poolReaders == false && rld.refCount() == 1 && readerMap.containsKey(rld.info)) { |
| // This is the last ref to this RLD, and we're not |
| // pooling, so remove it: |
| if (rld.writeLiveDocs(directory)) { |
| // Make sure we only write del docs for a live segment: |
| assert assertInfoLive == false || assertInfoIsLive(rld.info); |
| // Must checkpoint because we just |
| // created new _X_N.del and field updates files; |
| // don't call IW.checkpoint because that also |
| // increments SIS.version, which we do not want to |
| // do here: it was done previously (after we |
| // invoked BDS.applyDeletes), whereas here all we |
| // did was move the state to disk: |
| changed = true; |
| } |
| if (rld.writeFieldUpdates(directory, fieldNumbers, completedDelGenSupplier.getAsLong(), infoStream)) { |
| changed = true; |
| } |
| if (rld.getNumDVUpdates() == 0) { |
| rld.dropReaders(); |
| readerMap.remove(rld.info); |
| } else { |
| // We are forced to pool this segment until its deletes fully apply (no delGen gaps) |
| } |
| } |
| } |
| return changed; |
| } |
| |
| @Override |
| public synchronized void close() throws IOException { |
| if (closed.compareAndSet(false, true)) { |
| dropAll(); |
| } |
| } |
| |
| /** |
| * Writes all doc values updates to disk if there are any. |
| * @return <code>true</code> iff any files where written |
| */ |
| boolean writeAllDocValuesUpdates() throws IOException { |
| Collection<ReadersAndUpdates> copy; |
| synchronized (this) { |
| // this needs to be protected by the reader pool lock otherwise we hit ConcurrentModificationException |
| copy = new HashSet<>(readerMap.values()); |
| } |
| boolean any = false; |
| for (ReadersAndUpdates rld : copy) { |
| any |= rld.writeFieldUpdates(directory, fieldNumbers, completedDelGenSupplier.getAsLong(), infoStream); |
| } |
| return any; |
| } |
| |
| /** |
| * Writes all doc values updates to disk if there are any. |
| * @return <code>true</code> iff any files where written |
| */ |
| boolean writeDocValuesUpdatesForMerge(List<SegmentCommitInfo> infos) throws IOException { |
| boolean any = false; |
| for (SegmentCommitInfo info : infos) { |
| ReadersAndUpdates rld = get(info, false); |
| if (rld != null) { |
| any |= rld.writeFieldUpdates(directory, fieldNumbers, completedDelGenSupplier.getAsLong(), infoStream); |
| rld.setIsMerging(); |
| } |
| } |
| return any; |
| } |
| |
| /** |
| * Returns a list of all currently maintained ReadersAndUpdates sorted by it's ram consumption largest to smallest. |
| * This list can also contain readers that don't consume any ram at this point ie. don't have any updates buffered. |
| */ |
| synchronized List<ReadersAndUpdates> getReadersByRam() { |
| class RamRecordingHolder { |
| final ReadersAndUpdates updates; |
| final long ramBytesUsed; |
| RamRecordingHolder(ReadersAndUpdates updates) { |
| this.updates = updates; |
| this.ramBytesUsed = updates.ramBytesUsed.get(); |
| } |
| } |
| final ArrayList<RamRecordingHolder> readersByRam; |
| synchronized (this) { |
| if (readerMap.isEmpty()) { |
| return Collections.emptyList(); |
| } |
| readersByRam = new ArrayList<>(readerMap.size()); |
| for (ReadersAndUpdates rld : readerMap.values()) { |
| // we have to record the ram usage once and then sort |
| // since the ram usage can change concurrently and that will confuse the sort or hit an assertion |
| // the we can acquire here is not enough we would need to lock all ReadersAndUpdates to make sure it doesn't |
| // change |
| readersByRam.add(new RamRecordingHolder(rld)); |
| } |
| } |
| // Sort this outside of the lock by largest ramBytesUsed: |
| CollectionUtil.introSort(readersByRam, (a, b) -> Long.compare(b.ramBytesUsed, a.ramBytesUsed)); |
| return Collections.unmodifiableList(readersByRam.stream().map(h -> h.updates).collect(Collectors.toList())); |
| } |
| |
| |
| /** Remove all our references to readers, and commits |
| * any pending changes. */ |
| synchronized void dropAll() throws IOException { |
| Throwable priorE = null; |
| final Iterator<Map.Entry<SegmentCommitInfo,ReadersAndUpdates>> it = readerMap.entrySet().iterator(); |
| while(it.hasNext()) { |
| final ReadersAndUpdates rld = it.next().getValue(); |
| |
| // Important to remove as-we-go, not with .clear() |
| // in the end, in case we hit an exception; |
| // otherwise we could over-decref if close() is |
| // called again: |
| it.remove(); |
| |
| // NOTE: it is allowed that these decRefs do not |
| // actually close the SRs; this happens when a |
| // near real-time reader is kept open after the |
| // IndexWriter instance is closed: |
| try { |
| rld.dropReaders(); |
| } catch (Throwable t) { |
| priorE = IOUtils.useOrSuppress(priorE, t); |
| } |
| } |
| assert readerMap.size() == 0; |
| if (priorE != null) { |
| throw IOUtils.rethrowAlways(priorE); |
| } |
| } |
| |
| /** |
| * Commit live docs changes for the segment readers for |
| * the provided infos. |
| * |
| * @throws IOException If there is a low-level I/O error |
| */ |
| synchronized boolean commit(SegmentInfos infos) throws IOException { |
| boolean atLeastOneChange = false; |
| for (SegmentCommitInfo info : infos) { |
| final ReadersAndUpdates rld = readerMap.get(info); |
| if (rld != null) { |
| assert rld.info == info; |
| boolean changed = rld.writeLiveDocs(directory); |
| changed |= rld.writeFieldUpdates(directory, fieldNumbers, completedDelGenSupplier.getAsLong(), infoStream); |
| |
| if (changed) { |
| // Make sure we only write del docs for a live segment: |
| assert assertInfoIsLive(info); |
| |
| // Must checkpoint because we just |
| // created new _X_N.del and field updates files; |
| // don't call IW.checkpoint because that also |
| // increments SIS.version, which we do not want to |
| // do here: it was done previously (after we |
| // invoked BDS.applyDeletes), whereas here all we |
| // did was move the state to disk: |
| atLeastOneChange = true; |
| } |
| } |
| } |
| return atLeastOneChange; |
| } |
| |
| /** |
| * Returns <code>true</code> iff there are any buffered doc values updates. Otherwise <code>false</code>. |
| */ |
| synchronized boolean anyDocValuesChanges() { |
| for (ReadersAndUpdates rld : readerMap.values()) { |
| // NOTE: we don't check for pending deletes because deletes carry over in RAM to NRT readers |
| if (rld.getNumDVUpdates() != 0) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Obtain a ReadersAndLiveDocs instance from the |
| * readerPool. If create is true, you must later call |
| * {@link #release(ReadersAndUpdates, boolean)}. |
| */ |
| synchronized ReadersAndUpdates get(SegmentCommitInfo info, boolean create) { |
| assert info.info.dir == originalDirectory: "info.dir=" + info.info.dir + " vs " + originalDirectory; |
| if (closed.get()) { |
| assert readerMap.isEmpty() : "Reader map is not empty: " + readerMap; |
| throw new AlreadyClosedException("ReaderPool is already closed"); |
| } |
| |
| ReadersAndUpdates rld = readerMap.get(info); |
| if (rld == null) { |
| if (create == false) { |
| return null; |
| } |
| rld = new ReadersAndUpdates(segmentInfos.getIndexCreatedVersionMajor(), info, newPendingDeletes(info)); |
| // Steal initial reference: |
| readerMap.put(info, rld); |
| } else { |
| assert rld.info == info: "rld.info=" + rld.info + " info=" + info + " isLive?=" + assertInfoIsLive(rld.info) |
| + " vs " + assertInfoIsLive(info); |
| } |
| |
| if (create) { |
| // Return ref to caller: |
| rld.incRef(); |
| } |
| |
| assert noDups(); |
| |
| return rld; |
| } |
| |
| private PendingDeletes newPendingDeletes(SegmentCommitInfo info) { |
| return softDeletesField == null ? new PendingDeletes(info) : new PendingSoftDeletes(softDeletesField, info); |
| } |
| |
| private PendingDeletes newPendingDeletes(SegmentReader reader, SegmentCommitInfo info) { |
| return softDeletesField == null ? new PendingDeletes(reader, info) : |
| new PendingSoftDeletes(softDeletesField, reader, info); |
| } |
| |
| // Make sure that every segment appears only once in the |
| // pool: |
| private boolean noDups() { |
| Set<String> seen = new HashSet<>(); |
| for(SegmentCommitInfo info : readerMap.keySet()) { |
| assert !seen.contains(info.info.name) : "seen twice: " + info.info.name ; |
| seen.add(info.info.name); |
| } |
| return true; |
| } |
| } |