blob: 15be50e55d842f0383e08b10ad60560b89a73aba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.internal.column.columnindex;
import static java.util.Objects.requireNonNull;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Formatter;
import java.util.List;
import java.util.PrimitiveIterator;
import java.util.function.IntPredicate;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.filter2.predicate.Operators.And;
import org.apache.parquet.filter2.predicate.Operators.Eq;
import org.apache.parquet.filter2.predicate.Operators.Gt;
import org.apache.parquet.filter2.predicate.Operators.GtEq;
import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
import org.apache.parquet.filter2.predicate.Operators.Lt;
import org.apache.parquet.filter2.predicate.Operators.LtEq;
import org.apache.parquet.filter2.predicate.Operators.Not;
import org.apache.parquet.filter2.predicate.Operators.NotEq;
import org.apache.parquet.filter2.predicate.Operators.Or;
import org.apache.parquet.filter2.predicate.Operators.UserDefined;
import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveComparator;
import org.apache.parquet.schema.PrimitiveStringifier;
import org.apache.parquet.schema.PrimitiveType;
import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
import it.unimi.dsi.fastutil.booleans.BooleanList;
import it.unimi.dsi.fastutil.booleans.BooleanLists;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongList;
import it.unimi.dsi.fastutil.longs.LongLists;
/**
* Builder implementation to create {@link ColumnIndex} objects.
*/
public abstract class ColumnIndexBuilder {

  /**
   * Common base of the type-specific {@link ColumnIndex} implementations.
   * <p>
   * Min/max values are only stored for pages that are not entirely null, so an index into the min/max storage (an
   * "array index") may differ from the index of the page it describes. {@link #pageIndexes} maps array indexes to
   * page indexes.
   *
   * @param <C> the value type compared by the column's {@link PrimitiveComparator}
   */
  static abstract class ColumnIndexBase<C> implements ColumnIndex {

    /*
     * A class containing the value to be compared to the min/max values. This way we only need to do the deboxing
     * once per predicate execution instead of for every comparison.
     */
    abstract class ValueComparator {
      /* Compares the contained value to the min value stored at arrayIndex. */
      abstract int compareValueToMin(int arrayIndex);

      /* Compares the contained value to the max value stored at arrayIndex. */
      abstract int compareValueToMax(int arrayIndex);

      /* Returns the number of stored min/max values, i.e. the number of pages that are not entirely null. */
      int arrayLength() {
        return pageIndexes.length;
      }

      /* Translates an index into the min/max storage to the index of the page it belongs to. */
      int translate(int arrayIndex) {
        return pageIndexes[arrayIndex];
      }
    }

    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
    private static final int MAX_VALUE_LENGTH_FOR_TOSTRING = 40;
    private static final String TOSTRING_TRUNCATION_MARKER = "(...)";
    private static final int TOSTRING_TRUNCATION_START_POS = (MAX_VALUE_LENGTH_FOR_TOSTRING
        - TOSTRING_TRUNCATION_MARKER.length()) / 2;
    private static final int TOSTRING_TRUNCATION_END_POS = MAX_VALUE_LENGTH_FOR_TOSTRING
        - TOSTRING_TRUNCATION_MARKER.length() - TOSTRING_TRUNCATION_START_POS;
    private static final String TOSTRING_MISSING_VALUE_MARKER = "<none>";

    final PrimitiveStringifier stringifier;
    final PrimitiveComparator<C> comparator;
    // One entry per page: true if the page consists of nulls entirely
    private boolean[] nullPages;
    private BoundaryOrder boundaryOrder;
    // Storing the page index for each array index (min/max values are not stored for null-pages)
    private int[] pageIndexes;
    // might be null (null counts are optional)
    private long[] nullCounts;

    /*
     * Shortens str to MAX_VALUE_LENGTH_FOR_TOSTRING characters by keeping its head and tail and placing
     * TOSTRING_TRUNCATION_MARKER in between. Strings that already fit are returned unchanged.
     */
    static String truncate(String str) {
      if (str.length() <= MAX_VALUE_LENGTH_FOR_TOSTRING) {
        return str;
      }
      return str.substring(0, TOSTRING_TRUNCATION_START_POS) + TOSTRING_TRUNCATION_MARKER
          + str.substring(str.length() - TOSTRING_TRUNCATION_END_POS);
    }

    ColumnIndexBase(PrimitiveType type) {
      comparator = type.comparator();
      stringifier = type.stringifier();
    }

    @Override
    public BoundaryOrder getBoundaryOrder() {
      return boundaryOrder;
    }

    /*
     * Returns an unmodifiable view of the per-page null counts, or null if null counts were not provided.
     */
    @Override
    public List<Long> getNullCounts() {
      if (nullCounts == null) {
        return null;
      }
      return LongLists.unmodifiable(LongArrayList.wrap(nullCounts));
    }

    /* Returns an unmodifiable view of the per-page "consists of nulls entirely" flags. */
    @Override
    public List<Boolean> getNullPages() {
      return BooleanLists.unmodifiable(BooleanArrayList.wrap(nullPages));
    }

    /*
     * Returns one min value per page. Null pages have no stored min value, so an empty buffer is placed at their
     * position.
     */
    @Override
    public List<ByteBuffer> getMinValues() {
      List<ByteBuffer> list = new ArrayList<>(getPageCount());
      int arrayIndex = 0;
      for (int i = 0, n = getPageCount(); i < n; ++i) {
        if (isNullPage(i)) {
          list.add(EMPTY_BYTE_BUFFER);
        } else {
          list.add(getMinValueAsBytes(arrayIndex++));
        }
      }
      return list;
    }

    /*
     * Returns one max value per page. Null pages have no stored max value, so an empty buffer is placed at their
     * position.
     */
    @Override
    public List<ByteBuffer> getMaxValues() {
      List<ByteBuffer> list = new ArrayList<>(getPageCount());
      int arrayIndex = 0;
      for (int i = 0, n = getPageCount(); i < n; ++i) {
        if (isNullPage(i)) {
          list.add(EMPTY_BYTE_BUFFER);
        } else {
          list.add(getMaxValueAsBytes(arrayIndex++));
        }
      }
      return list;
    }

    /*
     * Renders the boundary order followed by a table with one row per page: the null count and the (truncated)
     * min/max values. Missing values are shown as TOSTRING_MISSING_VALUE_MARKER.
     */
    @Override
    public String toString() {
      try (Formatter formatter = new Formatter()) {
        formatter.format("Boundary order: %s\n", boundaryOrder);
        String minMaxPart = " %-" + MAX_VALUE_LENGTH_FOR_TOSTRING + "s %-" + MAX_VALUE_LENGTH_FOR_TOSTRING + "s\n";
        formatter.format("%-10s %20s" + minMaxPart, "", "null count", "min", "max");
        String format = "page-%-5d %20s" + minMaxPart;
        int arrayIndex = 0;
        for (int i = 0, n = nullPages.length; i < n; ++i) {
          String nullCount = nullCounts == null ? TOSTRING_MISSING_VALUE_MARKER : Long.toString(nullCounts[i]);
          String min, max;
          if (nullPages[i]) {
            min = max = TOSTRING_MISSING_VALUE_MARKER;
          } else {
            min = truncate(getMinValueAsString(arrayIndex));
            max = truncate(getMaxValueAsString(arrayIndex++));
          }
          formatter.format(format, i, nullCount, min, max);
        }
        return formatter.toString();
      }
    }

    int getPageCount() {
      return nullPages.length;
    }

    boolean isNullPage(int pageIndex) {
      return nullPages[pageIndex];
    }

    /*
     * Returns the min value for arrayIndex as a ByteBuffer. (Min values are not stored for null-pages so arrayIndex
     * might not be equal to pageIndex.)
     */
    abstract ByteBuffer getMinValueAsBytes(int arrayIndex);

    /*
     * Returns the max value for arrayIndex as a ByteBuffer. (Max values are not stored for null-pages so arrayIndex
     * might not be equal to pageIndex.)
     */
    abstract ByteBuffer getMaxValueAsBytes(int arrayIndex);

    /*
     * Returns the min value for arrayIndex as a String. (Min values are not stored for null-pages so arrayIndex might
     * not be equal to pageIndex.)
     */
    abstract String getMinValueAsString(int arrayIndex);

    /*
     * Returns the max value for arrayIndex as a String. (Max values are not stored for null-pages so arrayIndex might
     * not be equal to pageIndex.)
     */
    abstract String getMaxValueAsString(int arrayIndex);

    /* Creates a Statistics object for filtering. Used for user defined predicates. */
    abstract <T extends Comparable<T>> org.apache.parquet.filter2.predicate.Statistics<T> createStats(int arrayIndex);

    /* Creates a ValueComparator object containing the specified value to be compared for min/max values */
    abstract ValueComparator createValueComparator(Object value);

    @Override
    public PrimitiveIterator.OfInt visit(And and) {
      throw new UnsupportedOperationException("AND shall not be used on column index directly");
    }

    @Override
    public PrimitiveIterator.OfInt visit(Not not) {
      throw new UnsupportedOperationException("NOT shall not be used on column index directly");
    }

    @Override
    public PrimitiveIterator.OfInt visit(Or or) {
      throw new UnsupportedOperationException("OR shall not be used on column index directly");
    }

    /*
     * Equality: a null value selects the pages that contain nulls (all pages when null counts are unknown). Any other
     * value is searched via the boundary order.
     */
    @Override
    public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(Eq<T> eq) {
      T value = eq.getValue();
      if (value == null) {
        if (nullCounts == null) {
          // Searching for nulls so if we don't have null related statistics we have to return all pages
          return IndexIterator.all(getPageCount());
        } else {
          return IndexIterator.filter(getPageCount(), pageIndex -> nullCounts[pageIndex] > 0);
        }
      }
      return getBoundaryOrder().eq(createValueComparator(value));
    }

    @Override
    public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(Gt<T> gt) {
      return getBoundaryOrder().gt(createValueComparator(gt.getValue()));
    }

    @Override
    public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(GtEq<T> gtEq) {
      return getBoundaryOrder().gtEq(createValueComparator(gtEq.getValue()));
    }

    @Override
    public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(Lt<T> lt) {
      return getBoundaryOrder().lt(createValueComparator(lt.getValue()));
    }

    @Override
    public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(LtEq<T> ltEq) {
      return getBoundaryOrder().ltEq(createValueComparator(ltEq.getValue()));
    }

    /*
     * Inequality: a null value selects the pages that are not entirely null. For any other value, the pages that
     * contain nulls match too (nulls are not equal to the value), so they are merged with the boundary-order result.
     */
    @Override
    public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(NotEq<T> notEq) {
      T value = notEq.getValue();
      if (value == null) {
        return IndexIterator.filter(getPageCount(), pageIndex -> !nullPages[pageIndex]);
      }

      if (nullCounts == null) {
        // Nulls match so if we don't have null related statistics we have to return all pages
        return IndexIterator.all(getPageCount());
      }

      // Merging value filtering with pages containing nulls
      IntSet matchingIndexes = new IntOpenHashSet();
      getBoundaryOrder().notEq(createValueComparator(value))
          .forEachRemaining((int index) -> matchingIndexes.add(index));
      return IndexIterator.filter(getPageCount(),
          pageIndex -> nullCounts[pageIndex] > 0 || matchingIndexes.contains(pageIndex));
    }

    /*
     * User defined predicate: null pages match iff the predicate accepts nulls. Other pages match if they contain
     * nulls and nulls are accepted, or if the predicate cannot drop them based on their min/max statistics.
     */
    @Override
    public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> PrimitiveIterator.OfInt visit(
        UserDefined<T, U> udp) {
      final UserDefinedPredicate<T> predicate = udp.getUserDefinedPredicate();
      final boolean acceptNulls = predicate.acceptsNullValue();

      if (acceptNulls && nullCounts == null) {
        // Nulls match so if we don't have null related statistics we have to return all pages
        return IndexIterator.all(getPageCount());
      }

      return IndexIterator.filter(getPageCount(), new IntPredicate() {
        // Tracks the array index of the current non-null page. NOTE(review): this stateful counter assumes
        // IndexIterator.filter tests every page index exactly once in ascending order — confirm.
        private int arrayIndex = -1;

        @Override
        public boolean test(int pageIndex) {
          if (isNullPage(pageIndex)) {
            return acceptNulls;
          } else {
            ++arrayIndex;
            if (acceptNulls && nullCounts[pageIndex] > 0) {
              return true;
            }
            org.apache.parquet.filter2.predicate.Statistics<T> stats = createStats(arrayIndex);
            return !predicate.canDrop(stats);
          }
        }
      });
    }

    /*
     * Negated user defined predicate: same shape as the positive case, but null acceptance is inverted and pages are
     * dropped via inverseCanDrop.
     */
    @Override
    public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> PrimitiveIterator.OfInt visit(
        LogicalNotUserDefined<T, U> udp) {
      final UserDefinedPredicate<T> inversePredicate = udp.getUserDefined().getUserDefinedPredicate();
      final boolean acceptNulls = !inversePredicate.acceptsNullValue();

      if (acceptNulls && nullCounts == null) {
        // Nulls match so if we don't have null related statistics we have to return all pages
        return IndexIterator.all(getPageCount());
      }

      return IndexIterator.filter(getPageCount(), new IntPredicate() {
        // Tracks the array index of the current non-null page. NOTE(review): this stateful counter assumes
        // IndexIterator.filter tests every page index exactly once in ascending order — confirm.
        private int arrayIndex = -1;

        @Override
        public boolean test(int pageIndex) {
          if (isNullPage(pageIndex)) {
            return acceptNulls;
          } else {
            ++arrayIndex;
            if (acceptNulls && nullCounts[pageIndex] > 0) {
              return true;
            }
            org.apache.parquet.filter2.predicate.Statistics<T> stats = createStats(arrayIndex);
            return !inversePredicate.inverseCanDrop(stats);
          }
        }
      });
    }
  }

  // Builder that ignores every statistics object; build() always returns null.
  private static final ColumnIndexBuilder NO_OP_BUILDER = new ColumnIndexBuilder() {
    @Override
    public ColumnIndex build() {
      return null;
    }

    @Override
    public void add(Statistics<?> stats) {
    }

    @Override
    void addMinMax(Object min, Object max) {
    }

    @Override
    ColumnIndexBase<?> createColumnIndex(PrimitiveType type) {
      return null;
    }

    @Override
    void clearMinMax() {
    }

    @Override
    void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
    }

    @Override
    int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
      return 0;
    }

    @Override
    int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
      return 0;
    }

    @Override
    int sizeOf(Object value) {
      return 0;
    }
  };

  // Set by getBuilder(); the static build(...) path passes the type explicitly instead
  private PrimitiveType type;
  private final BooleanList nullPages = new BooleanArrayList();
  private final LongList nullCounts = new LongArrayList();
  // Running sum of the sizes of all collected min/max values
  private long minMaxSize;
  // Page index for each collected min/max pair (null pages have no min/max)
  private final IntList pageIndexes = new IntArrayList();
  // Index that the next added page will get
  private int nextPageIndex;

  /**
   * @return a no-op builder that does not collect statistics objects and therefore returns {@code null} at
   *         {@link #build()}.
   */
  public static ColumnIndexBuilder getNoOpBuilder() {
    return NO_OP_BUILDER;
  }

  /**
   * @param type
   *          the type this builder is to be created for
   * @param truncateLength
   *          the length to be used for truncating binary values if possible
   * @return a {@link ColumnIndexBuilder} instance to be used for creating {@link ColumnIndex} objects
   * @throws IllegalArgumentException
   *           if the primitive type of {@code type} is not supported
   */
  public static ColumnIndexBuilder getBuilder(PrimitiveType type, int truncateLength) {
    ColumnIndexBuilder builder = createNewBuilder(type, truncateLength);
    builder.type = type;
    return builder;
  }

  /*
   * Instantiates the type-specific builder for the primitive type name of type. truncateLength only applies to the
   * binary-like types.
   */
  private static ColumnIndexBuilder createNewBuilder(PrimitiveType type, int truncateLength) {
    switch (type.getPrimitiveTypeName()) {
      case BINARY:
      case FIXED_LEN_BYTE_ARRAY:
      case INT96:
        return new BinaryColumnIndexBuilder(type, truncateLength);
      case BOOLEAN:
        return new BooleanColumnIndexBuilder();
      case DOUBLE:
        return new DoubleColumnIndexBuilder();
      case FLOAT:
        return new FloatColumnIndexBuilder();
      case INT32:
        return new IntColumnIndexBuilder();
      case INT64:
        return new LongColumnIndexBuilder();
      default:
        throw new IllegalArgumentException("Unsupported type for column index: " + type);
    }
  }

  /**
   * @param type
   *          the primitive type
   * @param boundaryOrder
   *          the boundary order of the min/max values
   * @param nullPages
   *          the null pages (one boolean value for each page that signifies whether the page consists of nulls
   *          entirely)
   * @param nullCounts
   *          the number of null values for each page (may be {@code null} as null counts are optional)
   * @param minValues
   *          the min values for each page
   * @param maxValues
   *          the max values for each page
   * @return the newly created {@link ColumnIndex} object based on the specified arguments, or {@code null} if no
   *         column index can be created (there are no pages, or the type-specific builder rejected the min/max
   *         values)
   * @throws IllegalArgumentException
   *           if the sizes of the lists differ
   * @throws NullPointerException
   *           if {@code boundaryOrder} is {@code null}
   */
  public static ColumnIndex build(
      PrimitiveType type,
      BoundaryOrder boundaryOrder,
      List<Boolean> nullPages,
      List<Long> nullCounts,
      List<ByteBuffer> minValues,
      List<ByteBuffer> maxValues) {
    ColumnIndexBuilder builder = createNewBuilder(type, Integer.MAX_VALUE);
    builder.fill(nullPages, nullCounts, minValues, maxValues);
    ColumnIndexBase<?> columnIndex = builder.build(type);
    BoundaryOrder order = requireNonNull(boundaryOrder);
    if (columnIndex == null) {
      // Empty page list or invalid min/max values: previously this dereferenced null and threw a bare NPE
      return null;
    }
    columnIndex.boundaryOrder = order;
    return columnIndex;
  }

  ColumnIndexBuilder() {
    // Shall be able to be created inside this package only
  }

  /**
   * Adds the data from the specified statistics to this builder
   *
   * @param stats
   *          the statistics to be added
   */
  public void add(Statistics<?> stats) {
    if (stats.hasNonNullValue()) {
      nullPages.add(false);
      Object min = stats.genericGetMin();
      Object max = stats.genericGetMax();
      addMinMax(min, max);
      pageIndexes.add(nextPageIndex);
      minMaxSize += sizeOf(min);
      minMaxSize += sizeOf(max);
    } else {
      // No min/max is stored for a page without non-null values
      nullPages.add(true);
    }
    nullCounts.add(stats.getNumNulls());
    ++nextPageIndex;
  }

  /* Stores a min/max pair given in its serialized form (used when rebuilding an index from its lists). */
  abstract void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max);

  /* Stores a min/max pair given as the objects returned by Statistics.genericGetMin/genericGetMax. */
  abstract void addMinMax(Object min, Object max);

  /*
   * Resets this builder and loads it from the per-page lists of an existing column index. All lists must have the
   * same size; nullCounts may be null.
   */
  private void fill(List<Boolean> nullPages, List<Long> nullCounts, List<ByteBuffer> minValues,
      List<ByteBuffer> maxValues) {
    clear();
    int pageCount = nullPages.size();
    if ((nullCounts != null && nullCounts.size() != pageCount) || minValues.size() != pageCount
        || maxValues.size() != pageCount) {
      throw new IllegalArgumentException(
          String.format("Not all sizes are equal (nullPages:%d, nullCounts:%s, minValues:%d, maxValues:%d)",
              nullPages.size(), nullCounts == null ? "null" : nullCounts.size(), minValues.size(), maxValues.size()));
    }
    this.nullPages.addAll(nullPages);
    // Null counts are optional in the format
    if (nullCounts != null) {
      this.nullCounts.addAll(nullCounts);
    }
    for (int i = 0; i < pageCount; ++i) {
      if (!nullPages.get(i)) {
        ByteBuffer min = minValues.get(i);
        ByteBuffer max = maxValues.get(i);
        addMinMaxFromBytes(min, max);
        pageIndexes.add(i);
        minMaxSize += min.remaining();
        minMaxSize += max.remaining();
      }
    }
    // Keep the page counter consistent with the loaded pages, as add() would have left it
    nextPageIndex = pageCount;
  }

  /**
   * @return the newly created column index or {@code null} if the {@link ColumnIndex} would be empty
   */
  public ColumnIndex build() {
    ColumnIndexBase<?> columnIndex = build(type);
    if (columnIndex == null) {
      return null;
    }
    columnIndex.boundaryOrder = calculateBoundaryOrder(type.comparator());
    return columnIndex;
  }

  /*
   * Creates the type-specific column index and copies the collected page data into it. Returns null if no pages were
   * added or the specialized builder could not create an index. The boundary order is left for the caller to set.
   */
  private ColumnIndexBase<?> build(PrimitiveType type) {
    if (nullPages.isEmpty()) {
      return null;
    }
    ColumnIndexBase<?> columnIndex = createColumnIndex(type);
    if (columnIndex == null) {
      // Might happen if the specialized builder discovers invalid min/max values
      return null;
    }
    columnIndex.nullPages = nullPages.toBooleanArray();
    // Null counts is optional so keep it null if the builder has no values
    if (!nullCounts.isEmpty()) {
      columnIndex.nullCounts = nullCounts.toLongArray();
    }
    columnIndex.pageIndexes = pageIndexes.toIntArray();
    return columnIndex;
  }

  /* Determines the ordering of the collected min/max values; UNORDERED if neither ascending nor descending. */
  private BoundaryOrder calculateBoundaryOrder(PrimitiveComparator<Binary> comparator) {
    if (isAscending(comparator)) {
      return BoundaryOrder.ASCENDING;
    } else if (isDescending(comparator)) {
      return BoundaryOrder.DESCENDING;
    } else {
      return BoundaryOrder.UNORDERED;
    }
  }

  // min[i] <= min[i+1] && max[i] <= max[i+1] over the stored values (null pages have none and are skipped)
  private boolean isAscending(PrimitiveComparator<Binary> comparator) {
    for (int i = 1, n = pageIndexes.size(); i < n; ++i) {
      if (compareMinValues(comparator, i - 1, i) > 0 || compareMaxValues(comparator, i - 1, i) > 0) {
        return false;
      }
    }
    return true;
  }

  // min[i] >= min[i+1] && max[i] >= max[i+1] over the stored values (null pages have none and are skipped)
  private boolean isDescending(PrimitiveComparator<Binary> comparator) {
    for (int i = 1, n = pageIndexes.size(); i < n; ++i) {
      if (compareMinValues(comparator, i - 1, i) < 0 || compareMaxValues(comparator, i - 1, i) < 0) {
        return false;
      }
    }
    return true;
  }

  /* Compares the stored min values at the two array indexes. */
  abstract int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2);

  /* Compares the stored max values at the two array indexes. */
  abstract int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2);

  /* Resets all collected page data to the initial, empty state. */
  private void clear() {
    nullPages.clear();
    nullCounts.clear();
    clearMinMax();
    minMaxSize = 0;
    nextPageIndex = 0;
    pageIndexes.clear();
  }

  /* Discards the min/max values stored by the type-specific builder. */
  abstract void clearMinMax();

  /* Creates the type-specific column index from the stored min/max values, or returns null if that is not possible. */
  abstract ColumnIndexBase<?> createColumnIndex(PrimitiveType type);

  /* Returns the size in bytes that value contributes to getMinMaxSize(). */
  abstract int sizeOf(Object value);

  /**
   * @return the number of pages added so far to this builder
   */
  public int getPageCount() {
    return nullPages.size();
  }

  /**
   * @return the sum of size in bytes of the min/max values added so far to this builder
   */
  public long getMinMaxSize() {
    return minMaxSize;
  }
}