| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /* |
| * cdbparquetrleencoder.c |
| * |
| * Created on: Aug 22, 2013 |
| * Author: malili |
| */ |
| |
| #include "postgres.h" |
| #include "cdb/cdbparquetrleencoder.h" |
| |
| #define RDLEVEL_INIT_CAPACITY 1024 |
| |
| static void writeRLERun(RLEEncoder *encoder); |
| static void writeOrAppendBitPackedRun(RLEEncoder *encoder); |
| static void endPreviousBitPackedRun(RLEEncoder *encoder); |
| |
| static void readNextRun(RLEDecoder *decoder); |
| |
| void |
| RLEEncoder_Init(RLEEncoder *encoder, int bitWidth) |
| { |
| encoder->bitWidth = bitWidth; |
| encoder->packBuffer = (uint8_t *) palloc0(bitWidth); |
| |
| encoder->numBufferedValues = 0; |
| encoder->previousValue = 0; |
| encoder->repeatCount = 0; |
| encoder->bitPackedGroupCount = 0; |
| /* -1 indicate that no current bit-packed-run */ |
| encoder->bitPackedRunHeaderPos = -1; |
| |
| CapacityByteWriter_Init(&encoder->writer, /* capacity= */RDLEVEL_INIT_CAPACITY); |
| } |
| |
| void |
| RLEEncoder_WriteInt(RLEEncoder *encoder, int32_t value) |
| { |
| if (value == encoder->previousValue) |
| { |
| /* records how many times we have seen this value */ |
| encoder->repeatCount++; |
| if (encoder->repeatCount >= 8) |
| { |
| /* |
| * we've seen this at least 8 times, we're |
| * certainly going to write an rle-run, |
| * so just keep on counting repeats for now |
| */ |
| return; |
| } |
| } |
| else |
| { |
| /* This is a new value, check if it signals the end of an rle-run */ |
| if (encoder->repeatCount >= 8) { |
| writeRLERun(encoder); |
| } |
| /* re record the repeat count and value */ |
| encoder->repeatCount = 1; |
| encoder->previousValue = value; |
| } |
| |
| /* |
| * We have not seen enough repeats to justify an rle-run yet, |
| * so buffer this value in case we decide to write a bit-packed-run |
| */ |
| encoder->bufferedValues[encoder->numBufferedValues++] = value; |
| |
| if (encoder->numBufferedValues == 8) |
| { |
| /* |
| * we've encountered less than 8 repeated values, |
| * so either start a new bit-packed-run or |
| * append to the current bit-packed-run |
| */ |
| writeOrAppendBitPackedRun(encoder); |
| } |
| } |
| |
| int |
| RLEEncoder_Flush(RLEEncoder *encoder) |
| { |
| int i; |
| |
| /* write anything that is buffered / queued up for an rle-run */ |
| if (encoder->repeatCount >= 8) |
| { |
| writeRLERun(encoder); |
| } |
| else if (encoder->numBufferedValues > 0) |
| { |
| /* write buffered value to an bit-packed-run */ |
| for (i = encoder->numBufferedValues; i < 8; ++i) |
| encoder->bufferedValues[i] = 0; |
| writeOrAppendBitPackedRun(encoder); |
| endPreviousBitPackedRun(encoder); |
| } |
| else |
| { |
| endPreviousBitPackedRun(encoder); |
| } |
| |
| return encoder->writer.bufferPos; |
| } |
| |
| uint8_t * |
| RLEEncoder_Data(RLEEncoder *encoder) |
| { |
| return encoder->writer.buffer; |
| } |
| |
| int |
| RLEEncoder_Size(RLEEncoder *encoder) |
| { |
| return encoder->writer.bufferPos; |
| } |
| |
| void |
| writeOrAppendBitPackedRun(RLEEncoder *encoder) |
| { |
| if (encoder->bitPackedGroupCount >= BITPACK_RUN_MAX_GROUP_COUNT) |
| { |
| /* |
| * we've packed as many values as we can for this run, |
| * end it and start a new one |
| */ |
| endPreviousBitPackedRun(encoder); |
| } |
| |
| if (encoder->bitPackedRunHeaderPos == -1) |
| { |
| /* |
| * this is a new bit-packed-run, allocate a byte for the header |
| * and keep a "pointer" to it so that it can be mutated later |
| */ |
| encoder->bitPackedRunHeaderPos = encoder->writer.bufferPos; |
| CapacityByteWriter_WriteSingle(&encoder->writer, 0); |
| } |
| |
| pack8Values(encoder->bitWidth, |
| encoder->bufferedValues, 0, |
| encoder->packBuffer, 0); |
| |
| CapacityByteWriter_WriteMany(&encoder->writer, encoder->packBuffer, 0, encoder->bitWidth); |
| |
| /* empty the buffer, they've all been written */ |
| encoder->numBufferedValues = 0; |
| |
| /* |
| * clear the repeat count, as some repeated values, |
| * may have just been bit packed into this run |
| */ |
| encoder->repeatCount = 0; |
| |
| encoder->bitPackedGroupCount++; |
| |
| } |
| |
| /* |
| * If we are currently writing a bit-packed-run, update the |
| * bit-packed-header and consider this run to be over. |
| * |
| * Otherwise do nothing. |
| */ |
| void |
| endPreviousBitPackedRun(RLEEncoder *encoder) |
| { |
| if (encoder->bitPackedRunHeaderPos == -1) |
| return; |
| |
| /* create bit-packed-header, which needs to fit in 1 byte */ |
| uint8_t bitPackHeader = (uint8_t) ((encoder->bitPackedGroupCount << 1) | 1); |
| |
| /* update bit-packed-header */ |
| encoder->writer.buffer[encoder->bitPackedRunHeaderPos] = bitPackHeader; |
| |
| /* mark that this run is over */ |
| encoder->bitPackedRunHeaderPos = -1; |
| |
| /* reset the number of groups */ |
| encoder->bitPackedGroupCount = 0; |
| } |
| |
| |
| /** |
| * write out a rle running |
| */ |
| void |
| writeRLERun(RLEEncoder *encoder) |
| { |
| /* |
| * we may have been working on a bit-packed-run |
| * so close that run if it exists before writing this rle run |
| */ |
| endPreviousBitPackedRun(encoder); |
| |
| /* write the rle-header (lsb of 0 signifies a rle run) */ |
| writeUnsignedVarInt(&encoder->writer, encoder->repeatCount << 1); |
| |
| /* write the repeated-value */ |
| writeIntLittleEndianPaddedOnBitWidth(&encoder->writer, |
| encoder->previousValue, |
| encoder->bitWidth); |
| /* reset the repeat count */ |
| encoder->repeatCount = 0; |
| /* throw away all the buffered values, |
| * they were just repeats and they've been written */ |
| encoder->numBufferedValues = 0; |
| } |
| |
| void |
| RLEDecoder_Init(RLEDecoder *decoder, int bitWidth, uint8_t *in, int inputSize) |
| { |
| decoder->bitWidth = bitWidth; |
| decoder->input = in; |
| decoder->inputPos = 0; |
| decoder->inputSize = inputSize; |
| } |
| |
| int |
| RLEDecoder_ReadInt(RLEDecoder *decoder) |
| { |
| int result = -1; |
| |
| if (decoder->valueCount == 0) |
| { |
| readNextRun(decoder); |
| } |
| |
| switch (decoder->mode) |
| { |
| case MODE_RLE: |
| result = decoder->rleValue; |
| break; |
| case MODE_BITPACK: |
| result = decoder->bitpackBuffer[decoder->bitpackBufferSize - decoder->valueCount]; |
| break; |
| default: |
| /* TODO raise error */ |
| break; |
| } |
| |
| decoder->valueCount--; |
| return result; |
| } |
| |
| void |
| readNextRun(RLEDecoder *decoder) |
| { |
| int i, header, num_groups; |
| |
| if (decoder->inputPos >= decoder->inputSize) |
| { |
| return; /* TODO raise error */ |
| } |
| |
| decoder->inputPos += readUnsignedVarInt(decoder->input + decoder->inputPos, &header); |
| decoder->mode = ((header & 1) == 0) ? MODE_RLE : MODE_BITPACK; |
| |
| switch (decoder->mode) |
| { |
| case MODE_RLE: |
| decoder->valueCount = header >> 1; |
| decoder->inputPos += readIntLittleEndianPaddedOnBitWidth(decoder->bitWidth, |
| decoder->input + decoder->inputPos, |
| &decoder->rleValue); |
| break; |
| case MODE_BITPACK: |
| /* |
| * each bit-pack group contains 8 packed values, |
| * which takes up `bitWidth` bytes when encoded |
| */ |
| num_groups = header >> 1; |
| decoder->valueCount = num_groups * 8; |
| decoder->bitpackBufferSize = decoder->valueCount; |
| |
| for (i = 0; i < decoder->valueCount; i += 8) |
| { |
| unpack8Values(decoder->bitWidth, |
| decoder->input, |
| decoder->inputPos, |
| decoder->bitpackBuffer, |
| i); |
| decoder->inputPos += decoder->bitWidth; |
| } |
| break; |
| } |
| } |