/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.presto;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.metadata.datatype.*;
import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
import org.apache.carbondata.presto.readers.*;

| public class CarbonVectorBatch { |
| |
| private static final int DEFAULT_BATCH_SIZE = 4 * 1024; |
| |
| private final int capacity; |
| private final CarbonColumnVectorImpl[] columns; |
| // True if the row is filtered. |
| private final boolean[] filteredRows; |
| // Column indices that cannot have null values. |
| private final Set<Integer> nullFilteredColumns; |
| private int numRows; |
| // Total number of rows that have been filtered. |
| private int numRowsFiltered = 0; |
| |
| private CarbonVectorBatch(StructField[] schema, CarbonPrestoDecodeReadSupport readSupport, |
| int maxRows) { |
| this.capacity = maxRows; |
| this.columns = new CarbonColumnVectorImpl[schema.length]; |
| this.nullFilteredColumns = new HashSet<>(); |
| this.filteredRows = new boolean[maxRows]; |
| DataType[] dataTypes = readSupport.getDataTypes(); |
| |
| for (int i = 0; i < schema.length; ++i) { |
| columns[i] = createDirectStreamReader(maxRows, dataTypes[i], schema[i]); |
| } |
| } |
| |
| public static CarbonVectorBatch allocate(StructField[] schema, |
| CarbonPrestoDecodeReadSupport readSupport, boolean isDirectFill) { |
| if (isDirectFill) { |
| return new CarbonVectorBatch(schema, readSupport, |
| CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT); |
| } else { |
| return new CarbonVectorBatch(schema, readSupport, DEFAULT_BATCH_SIZE); |
| } |
| } |
| |
| public static CarbonColumnVectorImpl createDirectStreamReader(int batchSize, DataType dataType, |
| StructField field) { |
| if (dataType == DataTypes.BOOLEAN) { |
| return new BooleanStreamReader(batchSize, field.getDataType()); |
| } else if (dataType == DataTypes.SHORT) { |
| return new ShortStreamReader(batchSize, field.getDataType()); |
| } else if (dataType == DataTypes.INT || dataType == DataTypes.DATE) { |
| return new IntegerStreamReader(batchSize, field.getDataType()); |
| } else if (dataType == DataTypes.TIMESTAMP) { |
| return new TimestampStreamReader(batchSize, field.getDataType()); |
| } else if (dataType == DataTypes.LONG) { |
| return new LongStreamReader(batchSize, field.getDataType()); |
| } else if (dataType == DataTypes.DOUBLE) { |
| return new DoubleStreamReader(batchSize, field.getDataType()); |
| } else if (dataType == DataTypes.FLOAT) { |
| return new FloatStreamReader(batchSize, field.getDataType()); |
| } else if (dataType == DataTypes.BYTE) { |
| return new ByteStreamReader(batchSize, field.getDataType()); |
| } else if (dataType == DataTypes.STRING || dataType == DataTypes.VARCHAR |
| || dataType == DataTypes.BINARY) { |
| return new SliceStreamReader(batchSize, field.getDataType()); |
| } else if (DataTypes.isDecimal(dataType)) { |
| if (dataType instanceof DecimalType) { |
| return new DecimalSliceStreamReader(batchSize, field.getDataType(), (DecimalType) dataType); |
| } else { |
| return null; |
| } |
| } else if (field.getDataType().isComplexType()) { |
| return new ComplexTypeStreamReader(batchSize, field); |
| } else { |
| throw new UnsupportedOperationException("Datatype is invalid"); |
| } |
| } |
| |
| /** |
| * Resets the batch for writing. |
| */ |
| public void reset() { |
| for (int i = 0; i < numCols(); ++i) { |
| columns[i].reset(); |
| } |
| if (this.numRowsFiltered > 0) { |
| Arrays.fill(filteredRows, false); |
| } |
| this.numRows = 0; |
| this.numRowsFiltered = 0; |
| } |
| |
| /** |
| * Returns the number of columns that make up this batch. |
| */ |
| public int numCols() { |
| return columns.length; |
| } |
| |
| /** |
| * Sets the number of rows that are valid. Additionally, marks all rows as "filtered" if one or |
| * more of their attributes are part of a non-nullable column. |
| */ |
| public void setNumRows(int numRows) { |
| assert (numRows <= this.capacity); |
| this.numRows = numRows; |
| |
| for (int ordinal : nullFilteredColumns) { |
| for (int rowId = 0; rowId < numRows; rowId++) { |
| if (!filteredRows[rowId] && columns[ordinal].isNull(rowId)) { |
| filteredRows[rowId] = true; |
| ++numRowsFiltered; |
| } |
| } |
| } |
| } |
| |
| /** |
| * Returns the number of rows for read, including filtered rows. |
| */ |
| public int numRows() { |
| return numRows; |
| } |
| |
| /** |
| * Returns the number of valid rows. |
| */ |
| public int numValidRows() { |
| assert (numRowsFiltered <= numRows); |
| return numRows - numRowsFiltered; |
| } |
| |
| /** |
| * Returns the column at `ordinal`. |
| */ |
| public CarbonColumnVectorImpl column(int ordinal) { |
| return columns[ordinal]; |
| } |
| |
| /** |
| * Returns the max capacity (in number of rows) for this batch. |
| */ |
| public int capacity() { |
| return capacity; |
| } |
| |
| } |