/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
/** Holds shared SegmentReader instances. IndexWriter uses
 * SegmentReaders for 1) applying deletes/DV updates, 2) doing
 * merges, 3) handing out a real-time reader. This pool
 * reuses instances of the SegmentReaders in all these
 * places if it is in "near real-time mode", i.e.
 * {@link #enableReaderPooling()} has been called on this instance. */
final class ReaderPool implements Closeable {
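// Illustrative usage sketch, not code taken from IndexWriter: the names pool, info and rld below
// are hypothetical caller-side variables. The intended lifecycle is to enable pooling once readers
// will be shared, pair every get(info, true) with a release(...), and close() at the end:
//
//   pool.enableReaderPooling();                    // e.g. once an NRT reader is first requested
//   ReadersAndUpdates rld = pool.get(info, true);  // incRef'd for the caller
//   try {
//     // ... apply deletes / doc values updates through rld ...
//   } finally {
//     pool.release(rld, true);                     // matches the incRef taken in get()
//   }
//   pool.close();                                  // drops all pooled readers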
private final Map<SegmentCommitInfo,ReadersAndUpdates> readerMap = new HashMap<>();
private final Directory directory;
private final Directory originalDirectory;
private final FieldInfos.FieldNumbers fieldNumbers;
private final LongSupplier completedDelGenSupplier;
private final InfoStream infoStream;
private final SegmentInfos segmentInfos;
private final String softDeletesField;
// This is a "write once" variable (like the organic dye
// on a DVD-R that may or may not be heated by a laser and
// then cooled to permanently record the event): it's
// false by default, until {@link #enableReaderPooling()}
// is called for the first time,
// at which point it's switched to true and never changes
// back to false. Once this is true, we hold open and
// reuse SegmentReader instances internally for applying
// deletes, doing merges, and reopening near real-time
// readers.
// In practice, pooling should be enabled once the readers are likely
// to be needed and reused, i.e. when IndexWriter#getReader is called.
private volatile boolean poolReaders;
private final AtomicBoolean closed = new AtomicBoolean(false);
ReaderPool(Directory directory, Directory originalDirectory, SegmentInfos segmentInfos,
FieldInfos.FieldNumbers fieldNumbers, LongSupplier completedDelGenSupplier, InfoStream infoStream,
String softDeletesField, StandardDirectoryReader reader) throws IOException {
this.directory = directory;
this.originalDirectory = originalDirectory;
this.segmentInfos = segmentInfos;
this.fieldNumbers = fieldNumbers;
this.completedDelGenSupplier = completedDelGenSupplier;
this.infoStream = infoStream;
this.softDeletesField = softDeletesField;
if (reader != null) {
// Pre-enroll all segment readers into the reader pool; this is necessary so
// any in-memory NRT live docs are correctly carried over, and so NRT readers
// pulled from this IW share the same segment reader:
List<LeafReaderContext> leaves = reader.leaves();
assert segmentInfos.size() == leaves.size();
for (int i=0;i<leaves.size();i++) {
LeafReaderContext leaf = leaves.get(i);
SegmentReader segReader = (SegmentReader) leaf.reader();
SegmentReader newReader = new SegmentReader(segmentInfos.info(i), segReader, segReader.getLiveDocs(),
segReader.getHardLiveDocs(), segReader.numDocs(), true);
readerMap.put(newReader.getOriginalSegmentInfo(), new ReadersAndUpdates(segmentInfos.getIndexCreatedVersionMajor(),
newReader, newPendingDeletes(newReader, newReader.getOriginalSegmentInfo())));
}
}
}
/** Asserts this info still exists in IW's segment infos */
synchronized boolean assertInfoIsLive(SegmentCommitInfo info) {
int idx = segmentInfos.indexOf(info);
assert idx != -1: "info=" + info + " isn't live";
assert segmentInfos.info(idx) == info: "info=" + info + " doesn't match live info in segmentInfos";
return true;
}
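// This is intended to be invoked from inside an assert statement, so it compiles away when
// assertions are disabled; see the uses further down in this class, e.g.:
//
//   assert assertInfoIsLive(info);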
/**
* Drops the pooled reader for the given {@link SegmentCommitInfo}, if there is one.
* @return <code>true</code> if a pooled reader was found and dropped
*/
synchronized boolean drop(SegmentCommitInfo info) throws IOException {
final ReadersAndUpdates rld = readerMap.get(info);
if (rld != null) {
assert info == rld.info;
readerMap.remove(info);
rld.dropReaders();
return true;
}
return false;
}
/**
* Returns the sum of the RAM, in bytes, used by all the buffered readers and updates
*/
synchronized long ramBytesUsed() {
long bytes = 0;
for (ReadersAndUpdates rld : readerMap.values()) {
bytes += rld.ramBytesUsed.get();
}
return bytes;
}
/**
* Returns <code>true</code> iff any of the buffered readers and updates has at least one pending delete
*/
synchronized boolean anyDeletions() {
for(ReadersAndUpdates rld : readerMap.values()) {
if (rld.getDelCount() > 0) {
return true;
}
}
return false;
}
/**
* Enables reader pooling for this pool. This should be called once the readers in this pool are shared with an
* outside resource like an NRT reader. Once reader pooling is enabled, a {@link ReadersAndUpdates} will be kept around
* in the reader pool on calling {@link #release(ReadersAndUpdates, boolean)} until the segment gets dropped via calls
* to {@link #drop(SegmentCommitInfo)}, {@link #dropAll()} or {@link #close()}.
* Reader pooling is disabled upon construction and can't be disabled again once it's enabled.
*/
void enableReaderPooling() {
poolReaders = true;
}
boolean isReaderPoolingEnabled() {
return poolReaders;
}
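// Hedged sketch with a hypothetical owner-side pool variable: pooling is typically enabled lazily,
// the first time readers are handed out, and the flag can be checked but never cleared:
//
//   if (pool.isReaderPoolingEnabled() == false) {
//     pool.enableReaderPooling();
//   }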
/**
* Releases the {@link ReadersAndUpdates}. This should only be called if {@link #get(SegmentCommitInfo, boolean)}
* was called with the create parameter set to true.
* @return <code>true</code> if any files were written by this release call.
*/
synchronized boolean release(ReadersAndUpdates rld, boolean assertInfoLive) throws IOException {
boolean changed = false;
// Matches incRef in get:
rld.decRef();
if (rld.refCount() == 0) {
// This happens if the segment was just merged away,
// while a buffered deletes packet was still applying deletes/updates to it.
assert readerMap.containsKey(rld.info) == false: "seg=" + rld.info
+ " has refCount 0 but still unexpectedly exists in the reader pool";
} else {
// Pool still holds a ref:
assert rld.refCount() > 0: "refCount=" + rld.refCount() + " reader=" + rld.info;
if (poolReaders == false && rld.refCount() == 1 && readerMap.containsKey(rld.info)) {
// This is the last ref to this RLD, and we're not
// pooling, so remove it:
if (rld.writeLiveDocs(directory)) {
// Make sure we only write del docs for a live segment:
assert assertInfoLive == false || assertInfoIsLive(rld.info);
// Must checkpoint because we just
// created new _X_N.del and field updates files;
// don't call IW.checkpoint because that also
// increments SIS.version, which we do not want to
// do here: it was done previously (after we
// invoked BDS.applyDeletes), whereas here all we
// did was move the state to disk:
changed = true;
}
if (rld.writeFieldUpdates(directory, fieldNumbers, completedDelGenSupplier.getAsLong(), infoStream)) {
changed = true;
}
if (rld.getNumDVUpdates() == 0) {
rld.dropReaders();
readerMap.remove(rld.info);
} else {
// We are forced to pool this segment until its deletes fully apply (no delGen gaps)
}
}
}
return changed;
}
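// Hedged caller-side sketch (hypothetical names): the return value tells the caller whether this
// release wrote new files; per the comment above, the owner would then record a checkpoint without
// bumping the SegmentInfos version:
//
//   boolean wroteFiles = pool.release(rld, true);
//   if (wroteFiles) {
//     // ... run the owner's checkpointing logic here ...
//   }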
@Override
public synchronized void close() throws IOException {
if (closed.compareAndSet(false, true)) {
dropAll();
}
}
/**
* Writes all doc values updates to disk if there are any.
* @return <code>true</code> iff any files were written
*/
boolean writeAllDocValuesUpdates() throws IOException {
Collection<ReadersAndUpdates> copy;
synchronized (this) {
// this needs to be protected by the reader pool lock otherwise we hit ConcurrentModificationException
copy = new HashSet<>(readerMap.values());
}
boolean any = false;
for (ReadersAndUpdates rld : copy) {
any |= rld.writeFieldUpdates(directory, fieldNumbers, completedDelGenSupplier.getAsLong(), infoStream);
}
return any;
}
/**
* Writes all doc values updates for the given segments to disk if there are any, and marks the
* corresponding pooled {@link ReadersAndUpdates} as merging.
* @return <code>true</code> iff any files were written
*/
boolean writeDocValuesUpdatesForMerge(List<SegmentCommitInfo> infos) throws IOException {
boolean any = false;
for (SegmentCommitInfo info : infos) {
ReadersAndUpdates rld = get(info, false);
if (rld != null) {
any |= rld.writeFieldUpdates(directory, fieldNumbers, completedDelGenSupplier.getAsLong(), infoStream);
rld.setIsMerging();
}
}
return any;
}
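// Hedged sketch of the merge-side flow, with a hypothetical mergingSegments list: before a merge
// starts, buffered doc values updates for the segments being merged are moved to disk and each
// pooled ReadersAndUpdates is marked as merging (see setIsMerging above):
//
//   boolean wroteDVUpdates = pool.writeDocValuesUpdatesForMerge(mergingSegments);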
/**
* Returns a list of all currently maintained ReadersAndUpdates sorted by their RAM consumption, largest to smallest.
* This list can also contain readers that don't consume any RAM at this point, i.e. don't have any updates buffered.
*/
synchronized List<ReadersAndUpdates> getReadersByRam() {
class RamRecordingHolder {
final ReadersAndUpdates updates;
final long ramBytesUsed;
RamRecordingHolder(ReadersAndUpdates updates) {
this.updates = updates;
this.ramBytesUsed = updates.ramBytesUsed.get();
}
}
final ArrayList<RamRecordingHolder> readersByRam;
synchronized (this) {
if (readerMap.isEmpty()) {
return Collections.emptyList();
}
readersByRam = new ArrayList<>(readerMap.size());
for (ReadersAndUpdates rld : readerMap.values()) {
// we have to record the ram usage once and then sort
// since the ram usage can change concurrently and that will confuse the sort or hit an assertion
// the lock we can acquire here is not enough; we would need to lock all ReadersAndUpdates to make sure it doesn't
// change
readersByRam.add(new RamRecordingHolder(rld));
}
}
// Sort this outside of the lock by largest ramBytesUsed:
CollectionUtil.introSort(readersByRam, (a, b) -> Long.compare(b.ramBytesUsed, a.ramBytesUsed));
return Collections.unmodifiableList(readersByRam.stream().map(h -> h.updates).collect(Collectors.toList()));
}
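// Hedged sketch (hypothetical pool and ramBudgetBytes): a caller wanting to reclaim RAM could walk
// this list, which is ordered largest consumer first, and flush buffered updates until it is back
// under budget:
//
//   for (ReadersAndUpdates rld : pool.getReadersByRam()) {  // largest RAM consumer first
//     if (pool.ramBytesUsed() <= ramBudgetBytes) {
//       break;
//     }
//     // ... write this reader's buffered doc values updates to disk ...
//   }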
/** Removes all our references to readers; any pending
 * changes that have not been written yet are dropped. */
synchronized void dropAll() throws IOException {
Throwable priorE = null;
final Iterator<Map.Entry<SegmentCommitInfo,ReadersAndUpdates>> it = readerMap.entrySet().iterator();
while(it.hasNext()) {
final ReadersAndUpdates rld = it.next().getValue();
// Important to remove as-we-go, not with .clear()
// in the end, in case we hit an exception;
// otherwise we could over-decref if close() is
// called again:
it.remove();
// NOTE: it is allowed that these decRefs do not
// actually close the SRs; this happens when a
// near real-time reader is kept open after the
// IndexWriter instance is closed:
try {
rld.dropReaders();
} catch (Throwable t) {
priorE = IOUtils.useOrSuppress(priorE, t);
}
}
assert readerMap.size() == 0;
if (priorE != null) {
throw IOUtils.rethrowAlways(priorE);
}
}
/**
* Writes any pending live docs and doc values updates to disk for
* the segment readers of the provided infos.
*
* @return <code>true</code> iff any files were written
* @throws IOException If there is a low-level I/O error
*/
synchronized boolean commit(SegmentInfos infos) throws IOException {
boolean atLeastOneChange = false;
for (SegmentCommitInfo info : infos) {
final ReadersAndUpdates rld = readerMap.get(info);
if (rld != null) {
assert rld.info == info;
boolean changed = rld.writeLiveDocs(directory);
changed |= rld.writeFieldUpdates(directory, fieldNumbers, completedDelGenSupplier.getAsLong(), infoStream);
if (changed) {
// Make sure we only write del docs for a live segment:
assert assertInfoIsLive(info);
// Must checkpoint because we just
// created new _X_N.del and field updates files;
// don't call IW.checkpoint because that also
// increments SIS.version, which we do not want to
// do here: it was done previously (after we
// invoked BDS.applyDeletes), whereas here all we
// did was move the state to disk:
atLeastOneChange = true;
}
}
}
return atLeastOneChange;
}
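// Hedged sketch (hypothetical pendingCommitInfos): during a commit the owning writer would hand in
// the SegmentInfos it is about to commit; a true return means new del/doc values files were written
// and, per the comment above, a checkpoint without a SegmentInfos version bump is needed:
//
//   if (pool.commit(pendingCommitInfos)) {
//     // ... record the newly written files via the owner's checkpointing logic ...
//   }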
/**
* Returns <code>true</code> iff there are any buffered doc values updates. Otherwise <code>false</code>.
*/
synchronized boolean anyDocValuesChanges() {
for (ReadersAndUpdates rld : readerMap.values()) {
// NOTE: we don't check for pending deletes because deletes carry over in RAM to NRT readers
if (rld.getNumDVUpdates() != 0) {
return true;
}
}
return false;
}
/**
* Obtain a {@link ReadersAndUpdates} instance from the
* reader pool. If create is true, you must later call
* {@link #release(ReadersAndUpdates, boolean)}.
*/
synchronized ReadersAndUpdates get(SegmentCommitInfo info, boolean create) {
assert info.info.dir == originalDirectory: "info.dir=" + info.info.dir + " vs " + originalDirectory;
if (closed.get()) {
assert readerMap.isEmpty() : "Reader map is not empty: " + readerMap;
throw new AlreadyClosedException("ReaderPool is already closed");
}
ReadersAndUpdates rld = readerMap.get(info);
if (rld == null) {
if (create == false) {
return null;
}
rld = new ReadersAndUpdates(segmentInfos.getIndexCreatedVersionMajor(), info, newPendingDeletes(info));
// Steal initial reference:
readerMap.put(info, rld);
} else {
assert rld.info == info: "rld.info=" + rld.info + " info=" + info + " isLive?=" + assertInfoIsLive(rld.info)
+ " vs " + assertInfoIsLive(info);
}
if (create) {
// Return ref to caller:
rld.incRef();
}
assert noDups();
return rld;
}
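// Hedged sketch (hypothetical names): with create == false the pool is only probed and no reference
// is taken, so callers must handle a null result for segments that were never pooled:
//
//   ReadersAndUpdates rld = pool.get(info, false);  // no incRef, may be null
//   if (rld != null) {
//     // ... read-only inspection, e.g. rld.getDelCount() ...
//   }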
private PendingDeletes newPendingDeletes(SegmentCommitInfo info) {
return softDeletesField == null ? new PendingDeletes(info) : new PendingSoftDeletes(softDeletesField, info);
}
private PendingDeletes newPendingDeletes(SegmentReader reader, SegmentCommitInfo info) {
return softDeletesField == null ? new PendingDeletes(reader, info) :
new PendingSoftDeletes(softDeletesField, reader, info);
}
// Make sure that every segment appears only once in the
// pool:
private boolean noDups() {
Set<String> seen = new HashSet<>();
for(SegmentCommitInfo info : readerMap.keySet()) {
assert !seen.contains(info.info.name) : "seen twice: " + info.info.name ;
seen.add(info.info.name);
}
return true;
}
}