package org.apache.lucene.index;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.LiveDocsFormat;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.NumericFieldUpdates.UpdatesIterator;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.MutableBits;
// Used by IndexWriter to hold open SegmentReaders (for
// searching or merging), plus pending deletes and updates,
// for a given segment
class ReadersAndUpdates {
public final SegmentCommitInfo info;
// Tracks how many consumers are using this instance:
private final AtomicInteger refCount = new AtomicInteger(1);
private final IndexWriter writer;
// Initially null; set by getReader, swapped for a reopened reader after
// field updates are written, and cleared by dropReaders:
private SegmentReader reader;
// Holds the current shared (readable and writable)
// liveDocs. This is null when there are no deleted
// docs, and it's copy-on-write (cloned whenever we need
// to change it but it's been shared to an external NRT
// reader).
private Bits liveDocs;
// How many further deletions we've done against
// liveDocs since we last loaded or wrote it:
private int pendingDeleteCount;
// True if the current liveDocs is referenced by an
// external NRT reader:
private boolean liveDocsShared;
// Indicates whether this segment is currently being merged. While a segment
// is merging, all field updates are also registered in the
// mergingNumericUpdates map. Also, calls to writeFieldUpdates merge the
// updates with mergingNumericUpdates.
// That way, when the segment is done merging, IndexWriter can apply the
// updates on the merged segment too.
private boolean isMerging = false;
private final Map<String,NumericFieldUpdates> mergingNumericUpdates = new HashMap<String,NumericFieldUpdates>();
public ReadersAndUpdates(IndexWriter writer, SegmentCommitInfo info) {
this.info = info;
this.writer = writer;
liveDocsShared = true;
}
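// Illustrative sketch (not code from this file): a consumer such as
// IndexWriter's reader pool is assumed to bracket its use of this instance
// with incRef()/decRef(); the instance is created with refCount == 1, held
// by the pool itself. For example:
//
//   rld.incRef();
//   try {
//     SegmentReader sr = rld.getReader(IOContext.READ); // adds a reader ref
//     // ... search or merge using sr ...
//     rld.release(sr);                                  // drops the reader ref
//   } finally {
//     rld.decRef();
//   }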
public void incRef() {
final int rc = refCount.incrementAndGet();
assert rc > 1;
}
public void decRef() {
final int rc = refCount.decrementAndGet();
assert rc >= 0;
}
public int refCount() {
final int rc = refCount.get();
assert rc >= 0;
return rc;
}
public synchronized int getPendingDeleteCount() {
return pendingDeleteCount;
}
// Call only from assert!
public synchronized boolean verifyDocCounts() {
int count;
if (liveDocs != null) {
count = 0;
for(int docID=0;docID<info.info.getDocCount();docID++) {
if (liveDocs.get(docID)) {
count++;
}
}
} else {
count = info.info.getDocCount();
}
assert info.info.getDocCount() - info.getDelCount() - pendingDeleteCount == count: "info.docCount=" + info.info.getDocCount() + " info.getDelCount()=" + info.getDelCount() + " pendingDeleteCount=" + pendingDeleteCount + " count=" + count;
return true;
}
/** Returns a {@link SegmentReader}. */
public SegmentReader getReader(IOContext context) throws IOException {
if (reader == null) {
// We steal returned ref:
reader = new SegmentReader(info, context);
if (liveDocs == null) {
liveDocs = reader.getLiveDocs();
}
}
// Ref for caller
reader.incRef();
return reader;
}
public synchronized void release(SegmentReader sr) throws IOException {
assert info == sr.getSegmentInfo();
sr.decRef();
}
public synchronized boolean delete(int docID) {
assert liveDocs != null;
assert Thread.holdsLock(writer);
assert docID >= 0 && docID < liveDocs.length() : "out of bounds: docid=" + docID + " liveDocsLength=" + liveDocs.length() + " seg=" + info.info.name + " docCount=" + info.info.getDocCount();
assert !liveDocsShared;
final boolean didDelete = liveDocs.get(docID);
if (didDelete) {
((MutableBits) liveDocs).clear(docID);
pendingDeleteCount++;
//System.out.println(" new del seg=" + info + " docID=" + docID + " pendingDelCount=" + pendingDeleteCount + " totDelCount=" + (info.docCount-liveDocs.count()));
}
return didDelete;
}
// NOTE: removes caller's ref
public synchronized void dropReaders() throws IOException {
// TODO: can we somehow use IOUtils here? Problem is
// we are calling .decRef, not .close...
if (reader != null) {
//System.out.println(" pool.drop info=" + info + " rc=" + reader.getRefCount());
try {
reader.decRef();
} finally {
reader = null;
}
}
decRef();
}
/**
 * Returns a ref to a clone. NOTE: you should decRef() the reader when you're
 * done (i.e., do not call close()).
 */
public synchronized SegmentReader getReadOnlyClone(IOContext context) throws IOException {
if (reader == null) {
getReader(context).decRef();
assert reader != null;
}
liveDocsShared = true;
if (liveDocs != null) {
return new SegmentReader(reader.getSegmentInfo(), reader, liveDocs, info.info.getDocCount() - info.getDelCount() - pendingDeleteCount);
} else {
assert reader.getLiveDocs() == liveDocs;
reader.incRef();
return reader;
}
}
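// Note the two cases above: when there are deletes, the clone is a new
// SegmentReader that shares the pooled reader's core but pins the current
// liveDocs and doc count; when there are none, the pooled reader itself is
// returned with an extra ref for the caller.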
public synchronized void initWritableLiveDocs() throws IOException {
assert Thread.holdsLock(writer);
assert info.info.getDocCount() > 0;
//System.out.println("initWritableLivedocs seg=" + info + " liveDocs=" + liveDocs + " shared=" + shared);
if (liveDocsShared) {
// Copy on write: this means we've cloned a
// SegmentReader sharing the current liveDocs
// instance; must now make a private clone so we can
// change it:
LiveDocsFormat liveDocsFormat = info.info.getCodec().liveDocsFormat();
if (liveDocs == null) {
//System.out.println("create BV seg=" + info);
liveDocs = liveDocsFormat.newLiveDocs(info.info.getDocCount());
} else {
liveDocs = liveDocsFormat.newLiveDocs(liveDocs);
}
liveDocsShared = false;
}
}
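// Illustrative copy-on-write sequence (assumed typical caller flow, not
// code from this file); both calls are made while holding the IndexWriter
// lock, per the asserts:
//
//   rld.initWritableLiveDocs(); // clones liveDocs if an NRT reader shares it
//   rld.delete(docID);          // clears the bit, bumps pendingDeleteCount
//
// A later getReadOnlyClone() marks liveDocs shared again, so the next
// deletion pays for another clone.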
public synchronized Bits getLiveDocs() {
assert Thread.holdsLock(writer);
return liveDocs;
}
public synchronized Bits getReadOnlyLiveDocs() {
//System.out.println("getROLiveDocs seg=" + info);
assert Thread.holdsLock(writer);
liveDocsShared = true;
//if (liveDocs != null) {
//System.out.println(" liveCount=" + liveDocs.count());
//}
return liveDocs;
}
public synchronized void dropChanges() {
// Discard (don't save) changes when we are dropping
// the reader; this is used only on the sub-readers
// after a successful merge. If deletes had
// accumulated on those sub-readers while the merge
// is running, by now we have carried forward those
// deletes onto the newly merged segment, so we can
// discard them on the sub-readers:
pendingDeleteCount = 0;
dropMergingUpdates();
}
// Commit live docs (writes new _X_N.del files) to the directory; returns
// true if it wrote a new deletes file, and false if there were no new
// deletes to write:
public synchronized boolean writeLiveDocs(Directory dir) throws IOException {
assert Thread.holdsLock(writer);
//System.out.println("rld.writeLiveDocs seg=" + info + " pendingDelCount=" + pendingDeleteCount + " numericUpdates=" + numericUpdates);
if (pendingDeleteCount == 0) {
return false;
}
// We have new deletes
assert liveDocs.length() == info.info.getDocCount();
// Do this so we can delete any created files on
// exception; this saves all codecs from having to do
// it:
TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
// We can write directly to the actual name (vs to a
// .tmp & renaming it) because the file is not live
// until segments file is written:
boolean success = false;
try {
Codec codec = info.info.getCodec();
codec.liveDocsFormat().writeLiveDocs((MutableBits)liveDocs, trackingDir, info, pendingDeleteCount, IOContext.DEFAULT);
success = true;
} finally {
if (!success) {
// Advance only the nextWriteDelGen so that a 2nd
// attempt to write will write to a new file
info.advanceNextWriteDelGen();
// Delete any partially created file(s):
for (String fileName : trackingDir.getCreatedFiles()) {
try {
dir.deleteFile(fileName);
} catch (Throwable t) {
// Ignore so we throw only the first exc
}
}
}
}
// If we hit an exc in the line above (eg disk full)
// then info's delGen remains pointing to the previous
// (successfully written) del docs:
info.advanceDelGen();
info.setDelCount(info.getDelCount() + pendingDeleteCount);
pendingDeleteCount = 0;
return true;
}
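// Worked example of the del-gen scheme above (segment name and gens assumed
// for illustration): if segment _0 has delGen 3 and nextWriteDelGen 4, a
// successful write creates _0_4.del and advanceDelGen() makes gen 4 current.
// On failure, only nextWriteDelGen advances (to 5), so gen 3 remains the
// last valid deletes file and a retry writes _0_5.del instead.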
// Writes field updates (new _X_N updates files) to the directory
public synchronized void writeFieldUpdates(Directory dir, Map<String,NumericFieldUpdates> numericFieldUpdates) throws IOException {
assert Thread.holdsLock(writer);
//System.out.println("rld.writeFieldUpdates: seg=" + info + " numericFieldUpdates=" + numericFieldUpdates);
assert numericFieldUpdates != null && !numericFieldUpdates.isEmpty();
// Do this so we can delete any created files on
// exception; this saves all codecs from having to do
// it:
TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
FieldInfos fieldInfos = null;
boolean success = false;
try {
final Codec codec = info.info.getCodec();
// reader could be null e.g. for a just merged segment (from
// IndexWriter.commitMergedDeletes).
final SegmentReader reader = this.reader == null ? new SegmentReader(info, IOContext.READONCE) : this.reader;
try {
// clone FieldInfos so that we can update their dvGen separately from
// the reader's infos and write them to a new fieldInfos_gen file
FieldInfos.Builder builder = new FieldInfos.Builder(writer.globalFieldNumberMap);
// cannot use builder.add(reader.getFieldInfos()) because it does not
// clone FI.attributes as well as FI.dvGen
for (FieldInfo fi : reader.getFieldInfos()) {
FieldInfo clone = builder.add(fi);
// copy the stuff FieldInfos.Builder doesn't copy
if (fi.attributes() != null) {
for (Entry<String,String> e : fi.attributes().entrySet()) {
clone.putAttribute(e.getKey(), e.getValue());
}
}
clone.setDocValuesGen(fi.getDocValuesGen());
}
// create new fields or update existing ones to have NumericDV type
for (String f : numericFieldUpdates.keySet()) {
builder.addOrUpdate(f, NumericDocValuesField.TYPE);
}
fieldInfos = builder.finish();
final long nextFieldInfosGen = info.getNextFieldInfosGen();
final String segmentSuffix = Long.toString(nextFieldInfosGen, Character.MAX_RADIX);
final SegmentWriteState state = new SegmentWriteState(null, trackingDir, info.info, fieldInfos, null, IOContext.DEFAULT, segmentSuffix);
final DocValuesFormat docValuesFormat = codec.docValuesFormat();
final DocValuesConsumer fieldsConsumer = docValuesFormat.fieldsConsumer(state);
boolean fieldsConsumerSuccess = false;
try {
// System.out.println("[" + Thread.currentThread().getName() + "] RLD.writeLiveDocs: applying updates; seg=" + info + " updates=" + numericUpdates);
for (Entry<String,NumericFieldUpdates> e : numericFieldUpdates.entrySet()) {
final String field = e.getKey();
final NumericFieldUpdates fieldUpdates = e.getValue();
final FieldInfo fieldInfo = fieldInfos.fieldInfo(field);
assert fieldInfo != null;
fieldInfo.setDocValuesGen(nextFieldInfosGen);
// write the numeric updates to a new gen'd docvalues file
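// Worked example of the merge performed by this Iterable (values assumed
// for illustration): with maxDoc=5, updates {1->17, 3->null} and existing
// values v0..v4, iteration yields v0, 17, v2, null, v4 -- updated docs take
// the new value (null meaning the value is unset), every other doc keeps
// its current value, or null if it never had one.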
fieldsConsumer.addNumericField(fieldInfo, new Iterable<Number>() {
final NumericDocValues currentValues = reader.getNumericDocValues(field);
final Bits docsWithField = reader.getDocsWithField(field);
final int maxDoc = reader.maxDoc();
final UpdatesIterator updatesIter = fieldUpdates.getUpdates();
@Override
public Iterator<Number> iterator() {
updatesIter.reset();
return new Iterator<Number>() {
int curDoc = -1;
int updateDoc = updatesIter.nextDoc();
@Override
public boolean hasNext() {
return curDoc < maxDoc - 1;
}
@Override
public Number next() {
if (++curDoc >= maxDoc) {
throw new NoSuchElementException("no more documents to return values for");
}
if (curDoc == updateDoc) { // this document has an updated value
Long value = updatesIter.value(); // either null (unset value) or updated value
updateDoc = updatesIter.nextDoc(); // prepare for next round
return value;
} else {
// no update for this document
assert curDoc < updateDoc;
if (currentValues != null && docsWithField.get(curDoc)) {
// only read the current value if the document had a value before
return currentValues.get(curDoc);
} else {
return null;
}
}
}
@Override
public void remove() {
throw new UnsupportedOperationException("this iterator does not support removing elements");
}
};
}
});
}
codec.fieldInfosFormat().getFieldInfosWriter().write(trackingDir, info.info.name, segmentSuffix, fieldInfos, IOContext.DEFAULT);
fieldsConsumerSuccess = true;
} finally {
if (fieldsConsumerSuccess) {
fieldsConsumer.close();
} else {
IOUtils.closeWhileHandlingException(fieldsConsumer);
}
}
} finally {
if (reader != this.reader) {
// System.out.println("[" + Thread.currentThread().getName() + "] RLD.writeLiveDocs: closeReader " + reader);
reader.close();
}
}
success = true;
} finally {
if (!success) {
// Advance only the nextWriteFieldInfosGen so that a 2nd
// attempt to write will write to a new file
info.advanceNextWriteFieldInfosGen();
// Delete any partially created file(s):
for (String fileName : trackingDir.getCreatedFiles()) {
try {
dir.deleteFile(fileName);
} catch (Throwable t) {
// Ignore so we throw only the first exc
}
}
}
}
info.advanceFieldInfosGen();
// copy all the updates to mergingUpdates, so they can later be applied to the merged segment
if (isMerging) {
for (Entry<String,NumericFieldUpdates> e : numericFieldUpdates.entrySet()) {
NumericFieldUpdates fieldUpdates = mergingNumericUpdates.get(e.getKey());
if (fieldUpdates == null) {
mergingNumericUpdates.put(e.getKey(), e.getValue());
} else {
fieldUpdates.merge(e.getValue());
}
}
}
// create a new map, keeping only the gens that are in use
Map<Long,Set<String>> genUpdatesFiles = info.getUpdatesFiles();
Map<Long,Set<String>> newGenUpdatesFiles = new HashMap<Long,Set<String>>();
final long fieldInfosGen = info.getFieldInfosGen();
for (FieldInfo fi : fieldInfos) {
long dvGen = fi.getDocValuesGen();
if (dvGen != -1 && !newGenUpdatesFiles.containsKey(dvGen)) {
if (dvGen == fieldInfosGen) {
newGenUpdatesFiles.put(fieldInfosGen, trackingDir.getCreatedFiles());
} else {
newGenUpdatesFiles.put(dvGen, genUpdatesFiles.get(dvGen));
}
}
}
info.setGenUpdatesFiles(newGenUpdatesFiles);
// wrote new files, should checkpoint()
writer.checkpoint();
// if there is a reader open, reopen it to reflect the updates
if (reader != null) {
SegmentReader newReader = new SegmentReader(info, reader, liveDocs, info.info.getDocCount() - info.getDelCount() - pendingDeleteCount);
boolean reopened = false;
try {
reader.decRef();
reader = newReader;
reopened = true;
} finally {
if (!reopened) {
newReader.decRef();
}
}
}
}
/**
* Returns a reader for merge. This method applies field updates if there are
* any and marks that this segment is currently merging.
*/
synchronized SegmentReader getReaderForMerge(IOContext context) throws IOException {
assert Thread.holdsLock(writer);
// must execute these two statements as an atomic operation, otherwise we
// could lose updates if e.g. another thread calls writeFieldUpdates in
// between, or the updates are applied to the obtained reader, but then
// re-applied in IW.commitMergedDeletes (unnecessary work and potential
// bugs).
isMerging = true;
return getReader(context);
}
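// Illustrative lifecycle of merging updates (assumed typical IndexWriter
// flow, not code from this file):
//
//   SegmentReader merger = rld.getReaderForMerge(context); // isMerging = true
//   // ... merge runs; concurrent writeFieldUpdates() calls also record
//   // their updates in mergingNumericUpdates ...
//   Map<String,NumericFieldUpdates> carried = rld.getMergingFieldUpdates();
//   // ... IndexWriter re-applies `carried` to the merged segment ...
//   rld.dropMergingUpdates(); // isMerging = false, map cleared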
/**
* Drops all merging updates. Called from IndexWriter after this segment
* finished merging (whether successfully or not).
*/
public synchronized void dropMergingUpdates() {
mergingNumericUpdates.clear();
isMerging = false;
}
/** Returns updates that came in while this segment was merging. */
public synchronized Map<String,NumericFieldUpdates> getMergingFieldUpdates() {
return mergingNumericUpdates;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("ReadersAndLiveDocs(seg=").append(info);
sb.append(" pendingDeleteCount=").append(pendingDeleteCount);
sb.append(" liveDocsShared=").append(liveDocsShared);
return sb.toString();
}
}