// blob: 2dcb552b7d88c515703ccb64a4ce3dd2c67d8b83 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.search;
import java.io.IOException;
import java.util.List;
import org.apache.lucene.document.DoublePoint;
import org.apache.lucene.document.FloatPoint;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.util.PriorityQueue;
import org.apache.lucene.util.mutable.MutableValue;
import org.apache.lucene.util.mutable.MutableValueDate;
import org.apache.lucene.util.mutable.MutableValueDouble;
import org.apache.lucene.util.mutable.MutableValueFloat;
import org.apache.lucene.util.mutable.MutableValueInt;
import org.apache.lucene.util.mutable.MutableValueLong;
import org.apache.solr.schema.SchemaField;
/**
* Merge multiple numeric point fields (segments) together.
*
* @lucene.internal
* @lucene.experimental
*/
public class PointMerger {
/**
 * Target number of elements to cache across all segments. Used as the default total buffer
 * size by the two-argument ValueIterator constructor.
 */
public static int TOTAL_BUFFER_SIZE = 1000000; // target number of elements to cache across all segments
/**
 * Minimum buffer size on any segment (to limit unnecessary exception throws). Used as the
 * default per-segment floor by the two-argument ValueIterator constructor.
 */
public static int MIN_SEG_BUFFER_SIZE = 100; // minimum buffer size on any segment (to limit unnecessary exception throws)
/**
 * Walks the distinct values of a numeric point field across several segments and reports, for
 * each value, how many indexed points carry it. Values are produced in the order defined by
 * the segment queue (smallest current value first, per {@link MutableValue#compareTo}).
 *
 * <p>Typical use: construct the iterator, fetch {@link #getMutableValue()} once, then call
 * {@link #getNextCount()} repeatedly until it returns -1. No filtering of deleted documents is
 * applied by this class.
 */
public static class ValueIterator {
  /** One cursor per segment that still has values to deliver. */
  PQueue queue;
  /** Holder for the value reported by the most recent getNextCount() call; null if there are no values at all. */
  MutableValue topVal;

  /**
   * Creates an iterator using the class-wide default buffer sizes.
   *
   * @param field the point field to iterate
   * @param readers the segments to merge
   * @throws IOException if reading point values fails
   */
  public ValueIterator(SchemaField field, List<LeafReaderContext> readers) throws IOException {
    this(field, readers, TOTAL_BUFFER_SIZE, MIN_SEG_BUFFER_SIZE);
  }

  /**
   * Creates an iterator with explicit buffer sizing.
   *
   * @param field the point field to iterate; its type must be a point field
   * @param readers the segments to merge; may be empty
   * @param totalBufferSize target number of buffered values across all segments; each segment
   *     gets a share proportional to its maxDoc
   * @param minSegBufferSize lower bound for the buffer of any single segment
   * @throws IOException if reading point values fails
   * @throws IllegalArgumentException if the field's number type is not one of the supported types
   */
  public ValueIterator(SchemaField field, List<LeafReaderContext> readers, int totalBufferSize, int minSegBufferSize) throws IOException {
    assert field.getType().isPointField();
    queue = new PQueue(readers.size());
    if (readers.isEmpty()) {
      return;
    }
    // Total document count, derived from the last segment's base offset plus its size.
    LeafReaderContext lastLeaf = readers.get(readers.size() - 1);
    long ndocs = lastLeaf.docBase + lastLeaf.reader().maxDoc();
    for (LeafReaderContext ctx : readers) {
      PointValues pv = ctx.reader().getPointValues(field.getName());
      if (pv == null) continue; // this segment has no points for the field

      // Give each segment a share of the total buffer proportional to its size, but never
      // less than the configured minimum.
      int capacity = (int) ((long) totalBufferSize * ctx.reader().maxDoc() / ndocs);
      capacity = Math.max(capacity, minSegBufferSize);

      BaseSeg seg;
      switch (field.getType().getNumberType()) {
        case INTEGER:
          seg = new IntSeg(pv, capacity);
          break;
        case LONG:
          seg = new LongSeg(pv, capacity);
          break;
        case FLOAT:
          seg = new FloatSeg(pv, capacity);
          break;
        case DOUBLE:
          seg = new DoubleSeg(pv, capacity);
          break;
        case DATE:
          seg = new DateSeg(pv, capacity);
          break;
        default:
          // Fail with a descriptive error instead of a NullPointerException further down.
          throw new IllegalArgumentException(
              "Unsupported number type " + field.getType().getNumberType() + " for field " + field.getName());
      }

      // Position the cursor on its first value; a negative count means the segment is empty.
      int count = seg.setNextValue();
      if (count >= 0) {
        queue.add(seg);
      }
    }
    if (queue.size() > 0) topVal = queue.top().getMutableValue().duplicate();
  }

  // gets the mutable value that is updated after every call to getNextCount().
  // getMutableValue only needs to be called a single time since the instance is reused for every call to getNextCount().
  public MutableValue getMutableValue() {
    return topVal;
  }

  /**
   * Advances to the next distinct value and returns the summed count over all segments whose
   * current value equals it. The value itself is copied into the holder returned by
   * {@link #getMutableValue()}.
   *
   * @return the total count for the next value, or -1 when there are no more values
   * @throws IOException if reading point values fails
   */
  public long getNextCount() throws IOException {
    if (queue.size() == 0) return -1;
    BaseSeg seg = queue.top();
    topVal.copy(seg.getMutableValue());
    long count = 0;
    do {
      count += seg.getCurrentCount();
      int nextCount = seg.setNextValue();
      if (nextCount < 0) {
        // this segment is exhausted; drop it from the queue
        queue.pop();
        if (queue.size() == 0) break;
      } else {
        // the segment moved to a new value; restore queue order
        queue.updateTop();
      }
      seg = queue.top();
    } while (seg.getMutableValue().equalsSameType(topVal));
    return count;
  }
}
/** Priority queue of segment cursors, ordered by each cursor's current value. */
static class PQueue extends PriorityQueue<BaseSeg> {
  public PQueue(int maxSize) {
    super(maxSize);
  }

  /** A cursor sorts before another when its current value compares strictly lower. */
  @Override
  protected boolean lessThan(BaseSeg left, BaseSeg right) {
    int cmp = left.currentValue.compareTo(right.currentValue);
    return cmp < 0;
  }
}
/**
 * Cursor over the distinct values of one segment's point field. Values are pulled from the
 * point tree in batches: {@link #refill()} runs an intersect pass with this object as the
 * visitor, and the type-specific subclass stores the visited values and their counts in
 * parallel arrays that {@link #setNextValue()} then steps through.
 */
abstract static class BaseSeg implements PointValues.IntersectVisitor {
  final PointValues points;
  /** Per-buffered-value occurrence counts, parallel to the subclass's value array. */
  final int[] count;
  int pos = -1; // index of the last valid entry
  int readPos = -1; // last position read from
  MutableValue currentValue; // subclass constructor will fill this in
  int currentCount;

  /**
   * @param points the segment's point values for the field
   * @param capacity maximum number of distinct values buffered per refill
   */
  BaseSeg(PointValues points, int capacity) {
    this.points = points;
    this.count = new int[capacity];
  }

  /** Returns true if the current value of {@code a} compares strictly lower than that of {@code b}. */
  public static boolean lessThan(BaseSeg a, BaseSeg b) {
    return a.currentValue.compareTo(b.currentValue) < 0;
  }

  /** Returns the holder that is updated in place on every call to setNextValue(). */
  public MutableValue getMutableValue() {
    return currentValue;
  }

  // returns -1 count if there are no more values
  public int getCurrentCount() {
    return currentCount;
  }

  /**
   * Sets the next value and returns getCurrentCount().
   *
   * <p>Declared abstract: a default implementation cannot advance the cursor, and one that
   * returned a non-negative count would make the segment look like it never runs out.
   *
   * @return the count for the new current value, or -1 if there are no more values
   * @throws IOException if reading point values fails
   */
  public abstract int setNextValue() throws IOException;

  /**
   * Resets the buffer and re-runs the intersect pass to load the next batch of values. The
   * visitor ends a pass early by throwing the shared BreakException once the buffer is full;
   * that exception is swallowed here on purpose.
   */
  void refill() throws IOException {
    assert readPos >= pos; // the previous batch must be fully consumed
    readPos = -1;
    pos = -1;
    try {
      points.intersect(this);
    } catch (BreakException e) {
      // nothing to do
    }
  }

  /** Not supported: this visitor always needs the packed value, never a bare doc id. */
  @Override
  public void visit(int docID) throws IOException {
    throw new UnsupportedOperationException();
  }
}
/**
 * Segment cursor for int point values.
 *
 * <p>{@code last} plays two roles: while a refill pass is running it is the most recently
 * buffered value, and between passes it is turned into the inclusive lower bound for the next
 * pass. NOTE(review): the counting logic assumes the intersect pass delivers values in
 * ascending order - confirm against the PointValues implementation in use.
 */
static class IntSeg extends BaseSeg {
  final int[] values;
  int last = Integer.MIN_VALUE;
  final MutableValueInt mval;
  /** False until the first refill has run; the first pass must not skip Integer.MIN_VALUE. */
  private boolean started;

  IntSeg(PointValues points, int capacity) {
    super(points, capacity);
    this.values = new int[capacity];
    this.currentValue = this.mval = new MutableValueInt();
  }

  /**
   * Advances to the next distinct value, refilling the buffer when it has been consumed.
   *
   * @return the count for the new current value, or -1 if there are no more values
   */
  @Override
  public int setNextValue() throws IOException {
    if (readPos >= pos) {
      if (!started) {
        // First pass: the lower bound is Integer.MIN_VALUE itself, inclusive. Incrementing
        // before this pass would silently drop points equal to Integer.MIN_VALUE.
        started = true;
        refill();
      } else if (last != Integer.MAX_VALUE) {
        // Continue strictly after the last value already delivered.
        ++last;
        refill();
      }
      if (readPos >= pos) {
        // Nothing new was buffered: mark the cursor as exhausted for good.
        last = Integer.MAX_VALUE;
        currentCount = -1;
        return -1;
      }
    }
    ++readPos;
    mval.value = values[readPos];
    currentCount = count[readPos];
    return currentCount;
  }

  @Override
  public void visit(int docID, byte[] packedValue) throws IOException {
    // TODO: handle filter or deleted documents?
    int v = IntPoint.decodeDimension(packedValue, 0);
    if (v < last) return; // below the lower bound: already delivered in an earlier pass
    if (v == last && pos >= 0) {
      // same value as the newest buffered entry
      count[pos]++;
    } else {
      if (pos+1 < values.length) {
        last = v;
        ++pos;
        values[pos] = v;
        count[pos] = 1;
      } else {
        // a new value we don't have room for
        throw breakException;
      }
    }
  }

  @Override
  public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
    // Always answer "crosses" for relevant cells so that visit(docID, packedValue) is used.
    int v = IntPoint.decodeDimension(maxPackedValue, 0);
    if (v >= last) {
      return PointValues.Relation.CELL_CROSSES_QUERY;
    } else {
      return PointValues.Relation.CELL_OUTSIDE_QUERY;
    }
  }
}
/**
 * Segment cursor for long point values (also the base for date values).
 *
 * <p>{@code last} plays two roles: while a refill pass is running it is the most recently
 * buffered value, and between passes it is turned into the inclusive lower bound for the next
 * pass. NOTE(review): the counting logic assumes the intersect pass delivers values in
 * ascending order - confirm against the PointValues implementation in use.
 */
static class LongSeg extends BaseSeg {
  final long[] values;
  long last = Long.MIN_VALUE;
  MutableValueLong mval; // not final: DateSeg swaps in its own holder
  /** False until the first refill has run; the first pass must not skip Long.MIN_VALUE. */
  private boolean started;

  LongSeg(PointValues points, int capacity) {
    super(points, capacity);
    this.values = new long[capacity];
    this.currentValue = this.mval = new MutableValueLong();
  }

  /**
   * Advances to the next distinct value, refilling the buffer when it has been consumed.
   *
   * @return the count for the new current value, or -1 if there are no more values
   */
  @Override
  public int setNextValue() throws IOException {
    if (readPos >= pos) {
      if (!started) {
        // First pass: the lower bound is Long.MIN_VALUE itself, inclusive. Incrementing
        // before this pass would silently drop points equal to Long.MIN_VALUE.
        started = true;
        refill();
      } else if (last != Long.MAX_VALUE) {
        // Continue strictly after the last value already delivered.
        ++last;
        refill();
      }
      if (readPos >= pos) {
        // Nothing new was buffered: mark the cursor as exhausted for good.
        last = Long.MAX_VALUE;
        currentCount = -1;
        return -1;
      }
    }
    ++readPos;
    mval.value = values[readPos];
    currentCount = count[readPos];
    return currentCount;
  }

  @Override
  public void visit(int docID, byte[] packedValue) throws IOException {
    // TODO: handle filter or deleted documents?
    long v = LongPoint.decodeDimension(packedValue, 0);
    if (v < last) return; // below the lower bound: already delivered in an earlier pass
    if (v == last && pos >= 0) {
      // same value as the newest buffered entry
      count[pos]++;
    } else {
      if (pos+1 < values.length) {
        last = v;
        ++pos;
        values[pos] = v;
        count[pos] = 1;
      } else {
        // a new value we don't have room for
        throw breakException;
      }
    }
  }

  @Override
  public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
    // Always answer "crosses" for relevant cells so that visit(docID, packedValue) is used.
    long v = LongPoint.decodeDimension(maxPackedValue, 0);
    if (v >= last) {
      return PointValues.Relation.CELL_CROSSES_QUERY;
    } else {
      return PointValues.Relation.CELL_OUTSIDE_QUERY;
    }
  }
}
/**
 * Segment cursor for float point values.
 *
 * <p>{@code last} plays two roles: while a refill pass is running it is the most recently
 * buffered value, and between passes it is turned into the inclusive lower bound for the next
 * pass. The value range covered is negative infinity through positive infinity.
 *
 * <p>NOTE(review): NaN is not handled specially. Every ordered comparison against NaN is
 * false, so compare() reports a cell whose max is NaN as outside, and visit() never merges NaN
 * entries. Confirm whether NaN can be indexed for these fields. The counting logic also
 * assumes the intersect pass delivers values in ascending order - confirm.
 */
static class FloatSeg extends BaseSeg {
  final float[] values;
  float last = Float.NEGATIVE_INFINITY;
  final MutableValueFloat mval;
  /** False until the first refill has run; the first pass must not skip the lowest values. */
  private boolean started;

  FloatSeg(PointValues points, int capacity) {
    super(points, capacity);
    this.values = new float[capacity];
    this.currentValue = this.mval = new MutableValueFloat();
  }

  /**
   * Advances to the next distinct value, refilling the buffer when it has been consumed.
   *
   * @return the count for the new current value, or -1 if there are no more values
   */
  @Override
  public int setNextValue() throws IOException {
    if (readPos >= pos) {
      if (!started) {
        // First pass: start at negative infinity, inclusive, so that neither -Infinity nor
        // -Float.MAX_VALUE is skipped.
        started = true;
        refill();
      } else if (last != Float.POSITIVE_INFINITY) {
        // Continue strictly after the last value already delivered. Positive infinity is the
        // end of the range: Math.nextUp(+Infinity) is +Infinity again, so refilling from
        // there would re-read the same points forever.
        last = Math.nextUp(last);
        refill();
      }
      if (readPos >= pos) {
        // Nothing new was buffered: mark the cursor as exhausted for good.
        last = Float.POSITIVE_INFINITY;
        currentCount = -1;
        return -1;
      }
    }
    ++readPos;
    mval.value = values[readPos];
    currentCount = count[readPos];
    return currentCount;
  }

  @Override
  public void visit(int docID, byte[] packedValue) throws IOException {
    // TODO: handle filter or deleted documents?
    float v = FloatPoint.decodeDimension(packedValue, 0);
    if (v < last) return; // below the lower bound: already delivered in an earlier pass
    if (v == last && pos >= 0) {
      // same value as the newest buffered entry
      count[pos]++;
    } else {
      if (pos+1 < values.length) {
        last = v;
        ++pos;
        values[pos] = v;
        count[pos] = 1;
      } else {
        // a new value we don't have room for
        throw breakException;
      }
    }
  }

  @Override
  public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
    // Always answer "crosses" for relevant cells so that visit(docID, packedValue) is used.
    float v = FloatPoint.decodeDimension(maxPackedValue, 0);
    if (v >= last) {
      return PointValues.Relation.CELL_CROSSES_QUERY;
    } else {
      return PointValues.Relation.CELL_OUTSIDE_QUERY;
    }
  }
}
/**
 * Segment cursor for double point values.
 *
 * <p>{@code last} plays two roles: while a refill pass is running it is the most recently
 * buffered value, and between passes it is turned into the inclusive lower bound for the next
 * pass. The value range covered is negative infinity through positive infinity.
 *
 * <p>NOTE(review): NaN is not handled specially. Every ordered comparison against NaN is
 * false, so compare() reports a cell whose max is NaN as outside, and visit() never merges NaN
 * entries. Confirm whether NaN can be indexed for these fields. The counting logic also
 * assumes the intersect pass delivers values in ascending order - confirm.
 */
static class DoubleSeg extends BaseSeg {
  final double[] values;
  double last = Double.NEGATIVE_INFINITY;
  final MutableValueDouble mval;
  /** False until the first refill has run; the first pass must not skip the lowest values. */
  private boolean started;

  DoubleSeg(PointValues points, int capacity) {
    super(points, capacity);
    this.values = new double[capacity];
    this.currentValue = this.mval = new MutableValueDouble();
  }

  /**
   * Advances to the next distinct value, refilling the buffer when it has been consumed.
   *
   * @return the count for the new current value, or -1 if there are no more values
   */
  @Override
  public int setNextValue() throws IOException {
    if (readPos >= pos) {
      if (!started) {
        // First pass: start at negative infinity, inclusive, so that neither -Infinity nor
        // -Double.MAX_VALUE is skipped.
        started = true;
        refill();
      } else if (last != Double.POSITIVE_INFINITY) {
        // Continue strictly after the last value already delivered. Positive infinity is the
        // end of the range: Math.nextUp(+Infinity) is +Infinity again, so refilling from
        // there would re-read the same points forever.
        last = Math.nextUp(last);
        refill();
      }
      if (readPos >= pos) {
        // Nothing new was buffered: mark the cursor as exhausted for good.
        last = Double.POSITIVE_INFINITY;
        currentCount = -1;
        return -1;
      }
    }
    ++readPos;
    mval.value = values[readPos];
    currentCount = count[readPos];
    return currentCount;
  }

  @Override
  public void visit(int docID, byte[] packedValue) throws IOException {
    // TODO: handle filter or deleted documents?
    double v = DoublePoint.decodeDimension(packedValue, 0);
    if (v < last) return; // below the lower bound: already delivered in an earlier pass
    if (v == last && pos >= 0) {
      // same value as the newest buffered entry
      count[pos]++;
    } else {
      if (pos+1 < values.length) {
        last = v;
        ++pos;
        values[pos] = v;
        count[pos] = 1;
      } else {
        // a new value we don't have room for
        throw breakException;
      }
    }
  }

  @Override
  public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
    // Always answer "crosses" for relevant cells so that visit(docID, packedValue) is used.
    double v = DoublePoint.decodeDimension(maxPackedValue, 0);
    if (v >= last) {
      return PointValues.Relation.CELL_CROSSES_QUERY;
    } else {
      return PointValues.Relation.CELL_OUTSIDE_QUERY;
    }
  }
}
/**
 * Segment cursor for date values. Behaves exactly like the long cursor, except that the value
 * holder handed to callers is a MutableValueDate.
 */
static class DateSeg extends LongSeg {
  DateSeg(PointValues points, int capacity) {
    super(points, capacity);
    // Replace the holder installed by the parent constructor with a date-typed one.
    MutableValueDate dateHolder = new MutableValueDate();
    this.mval = dateHolder;
    this.currentValue = dateHolder;
  }
}
/**
 * Signal used to abort a point-tree traversal early. It carries no stack trace: filling one in
 * is skipped so that throwing it stays cheap.
 */
static class BreakException extends RuntimeException {
  @Override
  public synchronized Throwable fillInStackTrace() {
    // Deliberately do not capture the stack; hand back this same instance.
    final Throwable self = this;
    return self;
  }
}
/**
 * Shared instance thrown by the segment visitors to stop a traversal once their buffer is
 * full. Declared final so the singleton cannot be reassigned.
 */
static final BreakException breakException = new BreakException();
}