blob: 9bca6e8844254b24148eca777f4a3c4f6b2f5f10 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.io;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.parquet.column.ColumnReader;
import org.apache.parquet.column.impl.ColumnReadStoreImpl;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* used to read reassembled records
*
* @param <T> the type of the materialized record
*/
class RecordReaderImplementation<T> extends RecordReader<T> {
private static final Logger LOG = LoggerFactory.getLogger(RecordReaderImplementation.class);
public static class Case {
private int id;
private final int startLevel;
private final int depth;
private final int nextLevel;
private final boolean goingUp;
private final boolean goingDown;
private final int nextState;
private final boolean defined;
public Case(int startLevel, int depth, int nextLevel, int nextState, boolean defined) {
this.startLevel = startLevel;
this.depth = depth;
this.nextLevel = nextLevel;
this.nextState = nextState;
this.defined = defined;
// means going up the tree (towards the leaves) of the record
// true if we need to open up groups in this case
goingUp = startLevel <= depth;
// means going down the tree (towards the root) of the record
// true if we need to close groups in this case
goingDown = depth + 1 > nextLevel;
}
public void setID(int id) {
this.id = id;
}
@Override
// this implementation is buggy but the simpler one bellow has duplicates.
// it still works but generates more code than necessary
// a middle ground is necessary
// public int hashCode() {
// int hashCode = 0;
// if (goingUp) {
// hashCode += 1 * (1 + startLevel) + 2 * (1 + depth);
// }
// if (goingDown) {
// hashCode += 3 * (1 + depth) + 5 * (1 + nextLevel);
// }
// return hashCode;
// }
public int hashCode() {
int hashCode = 17;
hashCode += 31 * startLevel;
hashCode += 31 * depth;
hashCode += 31 * nextLevel;
hashCode += 31 * nextState;
hashCode += 31 * (defined ? 0 : 1);
return hashCode;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof Case) {
return equals((Case)obj);
}
return false;
};
// see comment for hashCode above
// public boolean equals(Case other) {
// if (goingUp && !other.goingUp || !goingUp && other.goingUp) {
// return false;
// }
// if (goingUp && other.goingUp && (startLevel != other.startLevel || depth != other.depth)) {
// return false;
// }
// if (goingDown && !other.goingDown || !goingDown && other.goingDown) {
// return false;
// }
// if (goingDown && other.goingDown && (depth != other.depth || nextLevel != other.nextLevel)) {
// return false;
// }
// return true;
// }
public boolean equals(Case other) {
return other != null
&& startLevel == other.startLevel
&& depth == other.depth
&& nextLevel == other.nextLevel
&& nextState == other.nextState
&& ((defined && other.defined) || (!defined && !other.defined));
}
public int getID() {
return id;
}
public int getStartLevel() {
return startLevel;
}
public int getDepth() {
return depth;
}
public int getNextLevel() {
return nextLevel;
}
public int getNextState() {
return nextState;
}
public boolean isGoingUp() {
return goingUp;
}
public boolean isGoingDown() {
return goingDown;
}
public boolean isDefined() {
return defined;
}
@Override
public String toString() {
return "Case " + startLevel + " -> " + depth + " -> " + nextLevel + "; goto sate_"+getNextState();
}
}
public static class State {
public final int id;
public final PrimitiveColumnIO primitiveColumnIO;
public final int maxDefinitionLevel;
public final int maxRepetitionLevel;
public final PrimitiveTypeName primitive;
public final ColumnReader column;
public final String[] fieldPath; // indexed by currentLevel
public final int[] indexFieldPath; // indexed by currentLevel
public final GroupConverter[] groupConverterPath;
public final PrimitiveConverter primitiveConverter;
public final String primitiveField;
public final int primitiveFieldIndex;
public final int[] nextLevel; //indexed by next r
private int[] definitionLevelToDepth; // indexed by current d
private State[] nextState; // indexed by next r
private Case[][][] caseLookup;
private List<Case> definedCases;
private List<Case> undefinedCases;
private State(int id, PrimitiveColumnIO primitiveColumnIO, ColumnReader column, int[] nextLevel, GroupConverter[] groupConverterPath, PrimitiveConverter primitiveConverter) {
this.id = id;
this.primitiveColumnIO = primitiveColumnIO;
this.maxDefinitionLevel = primitiveColumnIO.getDefinitionLevel();
this.maxRepetitionLevel = primitiveColumnIO.getRepetitionLevel();
this.column = column;
this.nextLevel = nextLevel;
this.groupConverterPath = groupConverterPath;
this.primitiveConverter = primitiveConverter;
this.primitive = primitiveColumnIO.getType().asPrimitiveType().getPrimitiveTypeName();
this.fieldPath = primitiveColumnIO.getFieldPath();
this.primitiveField = fieldPath[fieldPath.length - 1];
this.indexFieldPath = primitiveColumnIO.getIndexFieldPath();
this.primitiveFieldIndex = indexFieldPath[indexFieldPath.length - 1];
}
public int getDepth(int definitionLevel) {
return definitionLevelToDepth[definitionLevel];
}
public List<Case> getDefinedCases() {
return definedCases;
}
public List<Case> getUndefinedCases() {
return undefinedCases;
}
public Case getCase(int currentLevel, int d, int nextR) {
return caseLookup[currentLevel][d][nextR];
}
public State getNextState(int nextR) {
return nextState[nextR];
}
}
private final GroupConverter recordRootConverter;
private final RecordMaterializer<T> recordMaterializer;
private State[] states;
private ColumnReader[] columnReaders;
private boolean shouldSkipCurrentRecord = false;
/**
* @param root the root of the schema
* @param recordMaterializer responsible of materializing the records
* @param validating whether we should validate against the schema
* @param columnStore where to read the column data from
*/
public RecordReaderImplementation(MessageColumnIO root, RecordMaterializer<T> recordMaterializer, boolean validating, ColumnReadStoreImpl columnStore) {
this.recordMaterializer = recordMaterializer;
this.recordRootConverter = recordMaterializer.getRootConverter(); // TODO: validator(wrap(recordMaterializer), validating, root.getType());
PrimitiveColumnIO[] leaves = root.getLeaves().toArray(new PrimitiveColumnIO[0]);
columnReaders = new ColumnReader[leaves.length];
int[][] nextColumnIdxForRepLevel = new int[leaves.length][];
int[][] levelToClose = new int[leaves.length][];
GroupConverter[][] groupConverterPaths = new GroupConverter[leaves.length][];
PrimitiveConverter[] leafConverters = new PrimitiveConverter[leaves.length];
int[] firstIndexForLevel = new int[256]; // "256 levels of nesting ought to be enough for anybody"
// build the automaton
for (int i = 0; i < leaves.length; i++) {
PrimitiveColumnIO leafColumnIO = leaves[i];
//generate converters along the path from root to leaf
final int[] indexFieldPath = leafColumnIO.getIndexFieldPath();
groupConverterPaths[i] = new GroupConverter[indexFieldPath.length - 1];
GroupConverter current = this.recordRootConverter;
for (int j = 0; j < indexFieldPath.length - 1; j++) {
current = current.getConverter(indexFieldPath[j]).asGroupConverter();
groupConverterPaths[i][j] = current;
}
leafConverters[i] = current.getConverter(indexFieldPath[indexFieldPath.length - 1]).asPrimitiveConverter();
columnReaders[i] = columnStore.getColumnReader(leafColumnIO.getColumnDescriptor());
int maxRepetitionLevel = leafColumnIO.getRepetitionLevel();
nextColumnIdxForRepLevel[i] = new int[maxRepetitionLevel+1];
levelToClose[i] = new int[maxRepetitionLevel+1]; //next level
for (int nextRepLevel = 0; nextRepLevel <= maxRepetitionLevel; ++nextRepLevel) {
// remember which is the first for this level
if (leafColumnIO.isFirst(nextRepLevel)) {
firstIndexForLevel[nextRepLevel] = i;
}
int nextColIdx;
//TODO: when we use nextColumnIdxForRepLevel, should we provide current rep level or the rep level for next item
// figure out automaton transition
if (nextRepLevel == 0) { // 0 always means jump to the next (the last one being a special case)
nextColIdx = i + 1;
} else if (leafColumnIO.isLast(nextRepLevel)) { // when we are at the last of the next repetition level we jump back to the first
nextColIdx = firstIndexForLevel[nextRepLevel];
} else { // otherwise we just go back to the next.
nextColIdx = i + 1;
}
// figure out which level down the tree we need to go back
if (nextColIdx == leaves.length) { // reached the end of the record => close all levels
levelToClose[i][nextRepLevel] = 0;
} else if (leafColumnIO.isLast(nextRepLevel)) { // reached the end of this level => close the repetition level
ColumnIO parent = leafColumnIO.getParent(nextRepLevel);
levelToClose[i][nextRepLevel] = parent.getFieldPath().length - 1;
} else { // otherwise close until the next common parent
levelToClose[i][nextRepLevel] = getCommonParentLevel(
leafColumnIO.getFieldPath(),
leaves[nextColIdx].getFieldPath());
}
// sanity check: that would be a bug
if (levelToClose[i][nextRepLevel] > leaves[i].getFieldPath().length-1) {
throw new ParquetEncodingException(Arrays.toString(leaves[i].getFieldPath())+" -("+nextRepLevel+")-> "+levelToClose[i][nextRepLevel]);
}
nextColumnIdxForRepLevel[i][nextRepLevel] = nextColIdx;
}
}
states = new State[leaves.length];
for (int i = 0; i < leaves.length; i++) {
states[i] = new State(i, leaves[i], columnReaders[i], levelToClose[i], groupConverterPaths[i], leafConverters[i]);
int[] definitionLevelToDepth = new int[states[i].primitiveColumnIO.getDefinitionLevel() + 1];
// for each possible definition level, determine the depth at which to create groups
final ColumnIO[] path = states[i].primitiveColumnIO.getPath();
int depth = 0;
for (int d = 0; d < definitionLevelToDepth.length; ++d) {
while (depth < (states[i].fieldPath.length - 1)
&& d >= path[depth + 1].getDefinitionLevel()
) {
++ depth;
}
definitionLevelToDepth[d] = depth - 1;
}
states[i].definitionLevelToDepth = definitionLevelToDepth;
}
for (int i = 0; i < leaves.length; i++) {
State state = states[i];
int[] nextStateIds = nextColumnIdxForRepLevel[i];
state.nextState = new State[nextStateIds.length];
for (int j = 0; j < nextStateIds.length; j++) {
state.nextState[j] = nextStateIds[j] == states.length ? null : states[nextStateIds[j]];
}
}
for (int i = 0; i < states.length; i++) {
State state = states[i];
final Map<Case, Case> definedCases = new HashMap<Case, Case>();
final Map<Case, Case> undefinedCases = new HashMap<Case, Case>();
Case[][][] caseLookup = new Case[state.fieldPath.length][][];
for (int currentLevel = 0; currentLevel < state.fieldPath.length; ++ currentLevel) {
caseLookup[currentLevel] = new Case[state.maxDefinitionLevel+1][];
for (int d = 0; d <= state.maxDefinitionLevel; ++ d) {
caseLookup[currentLevel][d] = new Case[state.maxRepetitionLevel+1];
for (int nextR = 0; nextR <= state.maxRepetitionLevel; ++ nextR) {
int caseStartLevel = currentLevel;
int caseDepth = Math.max(state.getDepth(d), caseStartLevel - 1);
int caseNextLevel = Math.min(state.nextLevel[nextR], caseDepth + 1);
Case currentCase = new Case(caseStartLevel, caseDepth, caseNextLevel, getNextReader(state.id, nextR), d == state.maxDefinitionLevel);
Map<Case, Case> cases = currentCase.isDefined() ? definedCases : undefinedCases;
if (!cases.containsKey(currentCase)) {
currentCase.setID(cases.size());
cases.put(currentCase, currentCase);
} else {
currentCase = cases.get(currentCase);
}
caseLookup[currentLevel][d][nextR] = currentCase;
}
}
}
state.caseLookup = caseLookup;
state.definedCases = new ArrayList<Case>(definedCases.values());
state.undefinedCases = new ArrayList<Case>(undefinedCases.values());
Comparator<Case> caseComparator = new Comparator<Case>() {
@Override
public int compare(Case o1, Case o2) {
return o1.id - o2.id;
}
};
Collections.sort(state.definedCases, caseComparator);
Collections.sort(state.undefinedCases, caseComparator);
}
}
//TODO: have those wrappers for a converter
private RecordConsumer validator(RecordConsumer recordConsumer, boolean validating, MessageType schema) {
return validating ? new ValidatingRecordConsumer(recordConsumer, schema) : recordConsumer;
}
private RecordConsumer wrap(RecordConsumer recordConsumer) {
if (LOG.isDebugEnabled()) {
return new RecordConsumerLoggingWrapper(recordConsumer);
}
return recordConsumer;
}
/**
* @see org.apache.parquet.io.RecordReader#read()
*/
@Override
public T read() {
int currentLevel = 0;
recordRootConverter.start();
State currentState = states[0];
do {
ColumnReader columnReader = currentState.column;
int d = columnReader.getCurrentDefinitionLevel();
// creating needed nested groups until the current field (opening tags)
int depth = currentState.definitionLevelToDepth[d];
for (; currentLevel <= depth; ++currentLevel) {
currentState.groupConverterPath[currentLevel].start();
}
// currentLevel = depth + 1 at this point
// set the current value
if (d >= currentState.maxDefinitionLevel) {
// not null
columnReader.writeCurrentValueToConverter();
}
columnReader.consume();
int nextR = currentState.maxRepetitionLevel == 0 ? 0 : columnReader.getCurrentRepetitionLevel();
// level to go to close current groups
int next = currentState.nextLevel[nextR];
for (; currentLevel > next; currentLevel--) {
currentState.groupConverterPath[currentLevel - 1].end();
}
currentState = currentState.nextState[nextR];
} while (currentState != null);
recordRootConverter.end();
T record = recordMaterializer.getCurrentRecord();
shouldSkipCurrentRecord = record == null;
if (shouldSkipCurrentRecord) {
recordMaterializer.skipCurrentRecord();
}
return record;
}
@Override
public boolean shouldSkipCurrentRecord() {
return shouldSkipCurrentRecord;
}
private static void log(String string) {
LOG.debug(string);
}
int getNextReader(int current, int nextRepetitionLevel) {
State nextState = states[current].nextState[nextRepetitionLevel];
return nextState == null ? states.length : nextState.id;
}
int getNextLevel(int current, int nextRepetitionLevel) {
return states[current].nextLevel[nextRepetitionLevel];
}
private int getCommonParentLevel(String[] previous, String[] next) {
int i = 0;
while (i < Math.min(previous.length, next.length) && previous[i].equals(next[i])) {
++i;
}
return i;
}
protected int getStateCount() {
return states.length;
}
protected State getState(int i) {
return states[i];
}
protected RecordMaterializer<T> getMaterializer() {
return recordMaterializer;
}
protected Converter getRecordConsumer() {
return recordRootConverter;
}
protected Iterable<ColumnReader> getColumnReaders() {
// Converting the array to an iterable ensures that the array cannot be altered
return Arrays.asList(columnReaders);
}
}