| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.core.datastore.page.encoding; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.DataInputStream; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.List; |
| |
| import org.apache.carbondata.core.datastore.ColumnType; |
| import org.apache.carbondata.core.datastore.TableSpec; |
| import org.apache.carbondata.core.datastore.page.ColumnPage; |
| import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaFloatingCodec; |
| import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec; |
| import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingCodec; |
| import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec; |
| import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec; |
| import org.apache.carbondata.core.datastore.page.encoding.rle.RLECodec; |
| import org.apache.carbondata.core.datastore.page.encoding.rle.RLEEncoderMeta; |
| import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector; |
| import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult; |
| import org.apache.carbondata.core.metadata.ValueEncoderMeta; |
| import org.apache.carbondata.core.metadata.datatype.DataType; |
| import org.apache.carbondata.core.metadata.datatype.DataTypes; |
| import org.apache.carbondata.core.util.CarbonUtil; |
| import org.apache.carbondata.format.Encoding; |
| |
| import static org.apache.carbondata.format.Encoding.ADAPTIVE_DELTA_FLOATING; |
| import static org.apache.carbondata.format.Encoding.ADAPTIVE_DELTA_INTEGRAL; |
| import static org.apache.carbondata.format.Encoding.ADAPTIVE_FLOATING; |
| import static org.apache.carbondata.format.Encoding.ADAPTIVE_INTEGRAL; |
| import static org.apache.carbondata.format.Encoding.BOOL_BYTE; |
| import static org.apache.carbondata.format.Encoding.DIRECT_COMPRESS; |
| import static org.apache.carbondata.format.Encoding.DIRECT_COMPRESS_VARCHAR; |
| import static org.apache.carbondata.format.Encoding.RLE_INTEGRAL; |
| |
| /** |
| * Base class for encoding factory implementation. |
| */ |
| public abstract class EncodingFactory { |
| |
| /** |
| * Return new encoder for specified column |
| */ |
| public abstract ColumnPageEncoder createEncoder(TableSpec.ColumnSpec columnSpec, |
| ColumnPage inputPage); |
| |
| /** |
| * Return new decoder based on encoder metadata read from file |
| */ |
| public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas, |
| String compressor) throws IOException { |
| return createDecoder(encodings, encoderMetas, compressor, false); |
| } |
| |
| /** |
| * Return new decoder based on encoder metadata read from file |
| * @param encodings encodings used to decode the page |
| * @param encoderMetas metadata of encodings to decode the data |
| * @param compressor Compressor name which will be used to decode data. |
| * @param fullVectorFill whether the flow should go to fill the given vector completely while |
| * decoding the data itself. |
| * @return decoder to decode page. |
| * @throws IOException |
| */ |
| public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas, |
| String compressor, boolean fullVectorFill) throws IOException { |
| assert (encodings.size() >= 1); |
| assert (encoderMetas.size() == 1); |
| boolean isComplexPrimitiveIntLengthEncoding = |
| encodings.contains(Encoding.INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY); |
| Encoding encoding = encodings.get(0); |
| byte[] encoderMeta = encoderMetas.get(0).array(); |
| ByteArrayInputStream stream = new ByteArrayInputStream(encoderMeta); |
| DataInputStream in = new DataInputStream(stream); |
| if (encoding == DIRECT_COMPRESS || encoding == DIRECT_COMPRESS_VARCHAR) { |
| ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta(); |
| metadata.setFillCompleteVector(fullVectorFill); |
| metadata.readFields(in); |
| DirectCompressCodec directCompressCodec = |
| new DirectCompressCodec(metadata.getStoreDataType()); |
| directCompressCodec.setComplexPrimitiveIntLengthEncoding(isComplexPrimitiveIntLengthEncoding); |
| return directCompressCodec.createDecoder(metadata); |
| } else if (encoding == ADAPTIVE_INTEGRAL) { |
| ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta(); |
| metadata.setFillCompleteVector(fullVectorFill); |
| metadata.readFields(in); |
| SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata); |
| return new AdaptiveIntegralCodec(metadata.getSchemaDataType(), metadata.getStoreDataType(), |
| stats, encodings.contains(Encoding.INVERTED_INDEX)).createDecoder(metadata); |
| } else if (encoding == ADAPTIVE_DELTA_INTEGRAL) { |
| ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta(); |
| metadata.setFillCompleteVector(fullVectorFill); |
| metadata.readFields(in); |
| SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata); |
| return new AdaptiveDeltaIntegralCodec(metadata.getSchemaDataType(), |
| metadata.getStoreDataType(), stats, encodings.contains(Encoding.INVERTED_INDEX)) |
| .createDecoder(metadata); |
| } else if (encoding == ADAPTIVE_FLOATING) { |
| ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta(); |
| metadata.setFillCompleteVector(fullVectorFill); |
| metadata.readFields(in); |
| SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata); |
| return new AdaptiveFloatingCodec(metadata.getSchemaDataType(), metadata.getStoreDataType(), |
| stats, encodings.contains(Encoding.INVERTED_INDEX)).createDecoder(metadata); |
| } else if (encoding == ADAPTIVE_DELTA_FLOATING) { |
| ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta(); |
| metadata.setFillCompleteVector(fullVectorFill); |
| metadata.readFields(in); |
| SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata); |
| return new AdaptiveDeltaFloatingCodec(metadata.getSchemaDataType(), |
| metadata.getStoreDataType(), stats, encodings.contains(Encoding.INVERTED_INDEX)) |
| .createDecoder(metadata); |
| } else if (encoding == RLE_INTEGRAL) { |
| RLEEncoderMeta metadata = new RLEEncoderMeta(); |
| metadata.readFields(in); |
| return new RLECodec().createDecoder(metadata); |
| } else if (encoding == BOOL_BYTE) { |
| ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta(); |
| metadata.setFillCompleteVector(fullVectorFill); |
| metadata.readFields(in); |
| return new DirectCompressCodec(metadata.getStoreDataType()).createDecoder(metadata); |
| } else { |
| // for backward compatibility |
| ValueEncoderMeta metadata = CarbonUtil.deserializeEncoderMetaV3(encoderMeta); |
| return createDecoderLegacy(metadata, compressor, fullVectorFill); |
| } |
| } |
| |
| /** |
| * Old way of creating decoder, based on algorithm |
| */ |
| public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata, String compressor) { |
| return createDecoderLegacy(metadata, compressor, false); |
| } |
| |
| /** |
| * Old way of creating decoder, based on algorithm |
| */ |
| private ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata, String compressor, |
| boolean fullVectorFill) { |
| if (null == metadata) { |
| throw new RuntimeException("internal error"); |
| } |
| SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata); |
| TableSpec.ColumnSpec spec = |
| TableSpec.ColumnSpec.newInstanceLegacy("legacy", stats.getDataType(), ColumnType.MEASURE); |
| DataType dataType = DataType.getDataType(metadata.getType()); |
| if (dataType == DataTypes.BYTE || |
| dataType == DataTypes.SHORT || |
| dataType == DataTypes.INT || |
| dataType == DataTypes.LONG) { |
| // create the codec based on algorithm and create decoder by recovering the metadata |
| ColumnPageCodec codec = |
| DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(stats, false, spec); |
| if (codec instanceof AdaptiveIntegralCodec) { |
| AdaptiveIntegralCodec adaptiveCodec = (AdaptiveIntegralCodec) codec; |
| ColumnPageEncoderMeta meta = |
| new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor); |
| meta.setFillCompleteVector(fullVectorFill); |
| return codec.createDecoder(meta); |
| } else if (codec instanceof AdaptiveDeltaIntegralCodec) { |
| AdaptiveDeltaIntegralCodec adaptiveCodec = (AdaptiveDeltaIntegralCodec) codec; |
| ColumnPageEncoderMeta meta = |
| new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor); |
| meta.setFillCompleteVector(fullVectorFill); |
| return codec.createDecoder(meta); |
| } else if (codec instanceof DirectCompressCodec) { |
| ColumnPageEncoderMeta meta = |
| new ColumnPageEncoderMeta(spec, DataType.getDataType(metadata.getType()), stats, |
| compressor); |
| meta.setFillCompleteVector(fullVectorFill); |
| return codec.createDecoder(meta); |
| } else { |
| throw new RuntimeException("internal error"); |
| } |
| } else if (dataType == DataTypes.FLOAT || dataType == DataTypes.DOUBLE) { |
| // create the codec based on algorithm and create decoder by recovering the metadata |
| ColumnPageCodec codec = |
| DefaultEncodingFactory.selectCodecByAlgorithmForFloating(stats, false, spec); |
| if (codec instanceof AdaptiveFloatingCodec) { |
| AdaptiveFloatingCodec adaptiveCodec = (AdaptiveFloatingCodec) codec; |
| ColumnPageEncoderMeta meta = |
| new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor); |
| meta.setFillCompleteVector(fullVectorFill); |
| return codec.createDecoder(meta); |
| } else if (codec instanceof DirectCompressCodec) { |
| ColumnPageEncoderMeta meta = |
| new ColumnPageEncoderMeta(spec, DataType.getDataType(metadata.getType()), stats, |
| compressor); |
| meta.setFillCompleteVector(fullVectorFill); |
| return codec.createDecoder(meta); |
| } else if (codec instanceof AdaptiveDeltaFloatingCodec) { |
| AdaptiveDeltaFloatingCodec adaptiveCodec = (AdaptiveDeltaFloatingCodec) codec; |
| ColumnPageEncoderMeta meta = |
| new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor); |
| meta.setFillCompleteVector(fullVectorFill); |
| return codec.createDecoder(meta); |
| } else { |
| throw new RuntimeException("internal error"); |
| } |
| } else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.BYTE_ARRAY) { |
| // no dictionary dimension |
| ColumnPageEncoderMeta meta = |
| new ColumnPageEncoderMeta(spec, stats.getDataType(), stats, compressor); |
| meta.setFillCompleteVector(fullVectorFill); |
| return new DirectCompressCodec(stats.getDataType()).createDecoder(meta); |
| } else { |
| throw new RuntimeException("unsupported data type: " + stats.getDataType()); |
| } |
| } |
| } |