/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "orc/Int128.hh"

#include "Adaptor.hh"
#include "ByteRLE.hh"
#include "ColumnReader.hh"
#include "orc/Exceptions.hh"
#include "RLE.hh"

#include <math.h>
#include <iostream>

namespace orc {

  StripeStreams::~StripeStreams() {
    // PASS
  }

  inline RleVersion convertRleVersion(proto::ColumnEncoding_Kind kind) {
    switch (static_cast<int64_t>(kind)) {
    case proto::ColumnEncoding_Kind_DIRECT:
    case proto::ColumnEncoding_Kind_DICTIONARY:
      return RleVersion_1;
    case proto::ColumnEncoding_Kind_DIRECT_V2:
    case proto::ColumnEncoding_Kind_DICTIONARY_V2:
      return RleVersion_2;
    default:
      throw ParseError("Unknown encoding in convertRleVersion");
    }
  }

  ColumnReader::ColumnReader(const Type& type,
                             StripeStreams& stripe
                             ): columnId(type.getColumnId()),
                                memoryPool(stripe.getMemoryPool()) {
    std::unique_ptr<SeekableInputStream> stream =
      stripe.getStream(columnId, proto::Stream_Kind_PRESENT, true);
    if (stream.get()) {
      notNullDecoder = createBooleanRleDecoder(std::move(stream));
    }
  }

  ColumnReader::~ColumnReader() {
    // PASS
  }

  uint64_t ColumnReader::skip(uint64_t numValues) {
    ByteRleDecoder* decoder = notNullDecoder.get();
    if (decoder) {
      // page through the values that we want to skip
      // and count how many are non-null
      const size_t MAX_BUFFER_SIZE = 32768;
      size_t bufferSize = std::min(MAX_BUFFER_SIZE,
                                   static_cast<size_t>(numValues));
      char buffer[MAX_BUFFER_SIZE];
      uint64_t remaining = numValues;
      while (remaining > 0) {
        uint64_t chunkSize =
          std::min(remaining,
                   static_cast<uint64_t>(bufferSize));
        decoder->next(buffer, chunkSize, nullptr);
        remaining -= chunkSize;
        for(uint64_t i=0; i < chunkSize; ++i) {
          if (!buffer[i]) {
            numValues -= 1;
          }
        }
      }
    }
    return numValues;
  }

  void ColumnReader::next(ColumnVectorBatch& rowBatch,
                          uint64_t numValues,
                          char* incomingMask) {
    if (numValues > rowBatch.capacity) {
      rowBatch.resize(numValues);
    }
    rowBatch.numElements = numValues;
    ByteRleDecoder* decoder = notNullDecoder.get();
    if (decoder) {
      char* notNullArray = rowBatch.notNull.data();
      decoder->next(notNullArray, numValues, incomingMask);
      // check to see if there are nulls in this batch
      for(uint64_t i=0; i < numValues; ++i) {
        if (!notNullArray[i]) {
          rowBatch.hasNulls = true;
          return;
        }
      }
    } else if (incomingMask) {
      // If we don't have a notNull stream, copy the incomingMask
      rowBatch.hasNulls = true;
      memcpy(rowBatch.notNull.data(), incomingMask, numValues);
      return;
    }
    rowBatch.hasNulls = false;
  }

  void ColumnReader::seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) {
    if (notNullDecoder.get()) {
      notNullDecoder->seek(positions.at(columnId));
    }
  }

  /**
   * Expand an array of bytes in place to the corresponding array of longs.
   * Has to work backwards so that they data isn't clobbered during the
   * expansion.
   * @param buffer the array of chars and array of longs that need to be
   *        expanded
   * @param numValues the number of bytes to convert to longs
   */
  void expandBytesToLongs(int64_t* buffer, uint64_t numValues) {
    for(size_t i=numValues - 1; i < numValues; --i) {
      buffer[i] = reinterpret_cast<char *>(buffer)[i];
    }
  }

  class BooleanColumnReader: public ColumnReader {
  private:
    std::unique_ptr<orc::ByteRleDecoder> rle;

  public:
    BooleanColumnReader(const Type& type, StripeStreams& stipe);
    ~BooleanColumnReader() override;

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch,
              uint64_t numValues,
              char* notNull) override;

    void seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) override;
  };

  BooleanColumnReader::BooleanColumnReader(const Type& type,
                                           StripeStreams& stripe
                                           ): ColumnReader(type, stripe){
    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
    if (stream == nullptr)
      throw ParseError("DATA stream not found in Boolean column");
    rle = createBooleanRleDecoder(std::move(stream));
  }

  BooleanColumnReader::~BooleanColumnReader() {
    // PASS
  }

  uint64_t BooleanColumnReader::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);
    rle->skip(numValues);
    return numValues;
  }

  void BooleanColumnReader::next(ColumnVectorBatch& rowBatch,
                                 uint64_t numValues,
                                 char *notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    // Since the byte rle places the output in a char* instead of long*,
    // we cheat here and use the long* and then expand it in a second pass.
    int64_t *ptr = dynamic_cast<LongVectorBatch&>(rowBatch).data.data();
    rle->next(reinterpret_cast<char*>(ptr),
              numValues, rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
    expandBytesToLongs(ptr, numValues);
  }

  void BooleanColumnReader::seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    rle->seek(positions.at(columnId));
  }

  class ByteColumnReader: public ColumnReader {
  private:
    std::unique_ptr<orc::ByteRleDecoder> rle;

  public:
    ByteColumnReader(const Type& type, StripeStreams& stipe);
    ~ByteColumnReader() override;

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch,
              uint64_t numValues,
              char* notNull) override;

    void seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) override;
  };

  ByteColumnReader::ByteColumnReader(const Type& type,
                                           StripeStreams& stripe
                                           ): ColumnReader(type, stripe){
    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
    if (stream == nullptr)
      throw ParseError("DATA stream not found in Byte column");
    rle = createByteRleDecoder(std::move(stream));
  }

  ByteColumnReader::~ByteColumnReader() {
    // PASS
  }

  uint64_t ByteColumnReader::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);
    rle->skip(numValues);
    return numValues;
  }

  void ByteColumnReader::next(ColumnVectorBatch& rowBatch,
                              uint64_t numValues,
                              char *notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    // Since the byte rle places the output in a char* instead of long*,
    // we cheat here and use the long* and then expand it in a second pass.
    int64_t *ptr = dynamic_cast<LongVectorBatch&>(rowBatch).data.data();
    rle->next(reinterpret_cast<char*>(ptr),
              numValues, rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
    expandBytesToLongs(ptr, numValues);
  }

  void ByteColumnReader::seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    rle->seek(positions.at(columnId));
  }

  class IntegerColumnReader: public ColumnReader {
  protected:
    std::unique_ptr<orc::RleDecoder> rle;

  public:
    IntegerColumnReader(const Type& type, StripeStreams& stripe);
    ~IntegerColumnReader() override;

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch,
              uint64_t numValues,
              char* notNull) override;

    void seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) override;
  };

  IntegerColumnReader::IntegerColumnReader(const Type& type,
                                           StripeStreams& stripe
                                           ): ColumnReader(type, stripe) {
    RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
    if (stream == nullptr)
      throw ParseError("DATA stream not found in Integer column");
    rle = createRleDecoder(std::move(stream), true, vers, memoryPool);
  }

  IntegerColumnReader::~IntegerColumnReader() {
    // PASS
  }

  uint64_t IntegerColumnReader::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);
    rle->skip(numValues);
    return numValues;
  }

  void IntegerColumnReader::next(ColumnVectorBatch& rowBatch,
                                 uint64_t numValues,
                                 char *notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    rle->next(dynamic_cast<LongVectorBatch&>(rowBatch).data.data(),
              numValues, rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
  }

  void IntegerColumnReader::seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    rle->seek(positions.at(columnId));
  }

  class TimestampColumnReader: public ColumnReader {
  private:
    std::unique_ptr<orc::RleDecoder> secondsRle;
    std::unique_ptr<orc::RleDecoder> nanoRle;
    const Timezone& writerTimezone;
    const int64_t epochOffset;

  public:
    TimestampColumnReader(const Type& type, StripeStreams& stripe);
    ~TimestampColumnReader() override;

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch,
              uint64_t numValues,
              char* notNull) override;

    void seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) override;
  };


  TimestampColumnReader::TimestampColumnReader(const Type& type,
                                               StripeStreams& stripe
                               ): ColumnReader(type, stripe),
                                  writerTimezone(stripe.getWriterTimezone()),
                                  epochOffset(writerTimezone.getEpoch()) {
    RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
    if (stream == nullptr)
      throw ParseError("DATA stream not found in Timestamp column");
    secondsRle = createRleDecoder(std::move(stream), true, vers, memoryPool);
    stream = stripe.getStream(columnId, proto::Stream_Kind_SECONDARY, true);
    if (stream == nullptr)
      throw ParseError("SECONDARY stream not found in Timestamp column");
    nanoRle = createRleDecoder(std::move(stream), false, vers, memoryPool);
  }

  TimestampColumnReader::~TimestampColumnReader() {
    // PASS
  }

  uint64_t TimestampColumnReader::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);
    secondsRle->skip(numValues);
    nanoRle->skip(numValues);
    return numValues;
  }

  void TimestampColumnReader::next(ColumnVectorBatch& rowBatch,
                                   uint64_t numValues,
                                   char *notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
    TimestampVectorBatch& timestampBatch =
      dynamic_cast<TimestampVectorBatch&>(rowBatch);
    int64_t *secsBuffer = timestampBatch.data.data();
    secondsRle->next(secsBuffer, numValues, notNull);
    int64_t *nanoBuffer = timestampBatch.nanoseconds.data();
    nanoRle->next(nanoBuffer, numValues, notNull);

    // Construct the values
    for(uint64_t i=0; i < numValues; i++) {
      if (notNull == nullptr || notNull[i]) {
        uint64_t zeros = nanoBuffer[i] & 0x7;
        nanoBuffer[i] >>= 3;
        if (zeros != 0) {
          for(uint64_t j = 0; j <= zeros; ++j) {
            nanoBuffer[i] *= 10;
          }
        }
        int64_t writerTime = secsBuffer[i] + epochOffset;
        secsBuffer[i] = writerTimezone.convertToUTC(writerTime);
        if (secsBuffer[i] < 0 && nanoBuffer[i] != 0) {
          secsBuffer[i] -= 1;
        }
      }
    }
  }

  void TimestampColumnReader::seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    secondsRle->seek(positions.at(columnId));
    nanoRle->seek(positions.at(columnId));
  }

  class DoubleColumnReader: public ColumnReader {
  public:
    DoubleColumnReader(const Type& type, StripeStreams& stripe);
    ~DoubleColumnReader() override;

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch,
              uint64_t numValues,
              char* notNull) override;

    void seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) override;

  private:
    std::unique_ptr<SeekableInputStream> inputStream;
    TypeKind columnKind;
    const uint64_t bytesPerValue ;
    const char *bufferPointer;
    const char *bufferEnd;

    unsigned char readByte() {
      if (bufferPointer == bufferEnd) {
        int length;
        if (!inputStream->Next
            (reinterpret_cast<const void**>(&bufferPointer), &length)) {
          throw ParseError("bad read in DoubleColumnReader::next()");
        }
        bufferEnd = bufferPointer + length;
      }
      return static_cast<unsigned char>(*(bufferPointer++));
    }

    double readDouble() {
      int64_t bits = 0;
      for (uint64_t i=0; i < 8; i++) {
        bits |= static_cast<int64_t>(readByte()) << (i*8);
      }
      double *result = reinterpret_cast<double*>(&bits);
      return *result;
    }

    double readFloat() {
      int32_t bits = 0;
      for (uint64_t i=0; i < 4; i++) {
        bits |= readByte() << (i*8);
      }
      float *result = reinterpret_cast<float*>(&bits);
      return static_cast<double>(*result);
    }
  };

  DoubleColumnReader::DoubleColumnReader(const Type& type,
                                         StripeStreams& stripe
                                         ): ColumnReader(type, stripe),
                                            columnKind(type.getKind()),
                                            bytesPerValue((type.getKind() ==
                                                           FLOAT) ? 4 : 8),
                                            bufferPointer(nullptr),
                                            bufferEnd(nullptr) {
    inputStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
    if (inputStream == nullptr)
      throw ParseError("DATA stream not found in Double column");
  }

  DoubleColumnReader::~DoubleColumnReader() {
    // PASS
  }

  uint64_t DoubleColumnReader::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);

    if (static_cast<size_t>(bufferEnd - bufferPointer) >=
        bytesPerValue * numValues) {
      bufferPointer += bytesPerValue * numValues;
    } else {
      size_t sizeToSkip = bytesPerValue * numValues -
                          static_cast<size_t>(bufferEnd - bufferPointer);
      const size_t cap = static_cast<size_t>(std::numeric_limits<int>::max());
      while (sizeToSkip != 0) {
        size_t step = sizeToSkip > cap ? cap : sizeToSkip;
        inputStream->Skip(static_cast<int>(step));
        sizeToSkip -= step;
      }
      bufferEnd = nullptr;
      bufferPointer = nullptr;
    }

    return numValues;
  }

  void DoubleColumnReader::next(ColumnVectorBatch& rowBatch,
                                uint64_t numValues,
                                char *notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    // update the notNull from the parent class
    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
    double* outArray = dynamic_cast<DoubleVectorBatch&>(rowBatch).data.data();

    if (columnKind == FLOAT) {
      if (notNull) {
        for(size_t i=0; i < numValues; ++i) {
          if (notNull[i]) {
            outArray[i] = readFloat();
          }
        }
      } else {
        for(size_t i=0; i < numValues; ++i) {
          outArray[i] = readFloat();
        }
      }
    } else {
      if (notNull) {
        for(size_t i=0; i < numValues; ++i) {
          if (notNull[i]) {
            outArray[i] = readDouble();
          }
        }
      } else {
        for(size_t i=0; i < numValues; ++i) {
          outArray[i] = readDouble();
        }
      }
    }
  }

  void readFully(char* buffer, int64_t bufferSize, SeekableInputStream* stream) {
    int64_t posn = 0;
    while (posn < bufferSize) {
      const void* chunk;
      int length;
      if (!stream->Next(&chunk, &length)) {
        throw ParseError("bad read in readFully");
      }
      if (posn + length > bufferSize) {
        throw ParseError("Corrupt dictionary blob in StringDictionaryColumn");
      }
      memcpy(buffer + posn, chunk, static_cast<size_t>(length));
      posn += length;
    }
  }

  void DoubleColumnReader::seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    inputStream->seek(positions.at(columnId));
  }

  class StringDictionaryColumnReader: public ColumnReader {
  private:
    std::shared_ptr<StringDictionary> dictionary;
    std::unique_ptr<RleDecoder> rle;

  public:
    StringDictionaryColumnReader(const Type& type, StripeStreams& stipe);
    ~StringDictionaryColumnReader() override;

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch,
              uint64_t numValues,
              char *notNull) override;

    void nextEncoded(ColumnVectorBatch& rowBatch,
                      uint64_t numValues,
                      char* notNull) override;

    void seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) override;
  };

  StringDictionaryColumnReader::StringDictionaryColumnReader
             (const Type& type,
              StripeStreams& stripe
              ): ColumnReader(type, stripe),
                 dictionary(new StringDictionary(stripe.getMemoryPool())) {
    RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId)
                                                .kind());
    uint32_t dictSize = stripe.getEncoding(columnId).dictionarysize();
    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
    if (stream == nullptr) {
      throw ParseError("DATA stream not found in StringDictionaryColumn");
    }
    rle = createRleDecoder(std::move(stream), false, rleVersion, memoryPool);
    stream = stripe.getStream(columnId, proto::Stream_Kind_LENGTH, false);
    if (dictSize > 0 && stream == nullptr) {
      throw ParseError("LENGTH stream not found in StringDictionaryColumn");
    }
    std::unique_ptr<RleDecoder> lengthDecoder =
        createRleDecoder(std::move(stream), false, rleVersion, memoryPool);
    dictionary->dictionaryOffset.resize(dictSize + 1);
    int64_t* lengthArray = dictionary->dictionaryOffset.data();
    lengthDecoder->next(lengthArray + 1, dictSize, nullptr);
    lengthArray[0] = 0;
    for(uint32_t i = 1; i < dictSize + 1; ++i) {
      if (lengthArray[i] < 0) {
        throw ParseError("Negative dictionary entry length");
      }
      lengthArray[i] += lengthArray[i - 1];
    }
    int64_t blobSize = lengthArray[dictSize];
    dictionary->dictionaryBlob.resize(static_cast<uint64_t>(blobSize));
    std::unique_ptr<SeekableInputStream> blobStream =
      stripe.getStream(columnId, proto::Stream_Kind_DICTIONARY_DATA, false);
    if (blobSize > 0 && blobStream == nullptr) {
      throw ParseError(
          "DICTIONARY_DATA stream not found in StringDictionaryColumn");
    }
    readFully(dictionary->dictionaryBlob.data(), blobSize, blobStream.get());
  }

  StringDictionaryColumnReader::~StringDictionaryColumnReader() {
    // PASS
  }

  uint64_t StringDictionaryColumnReader::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);
    rle->skip(numValues);
    return numValues;
  }

  void StringDictionaryColumnReader::next(ColumnVectorBatch& rowBatch,
                                          uint64_t numValues,
                                          char *notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    // update the notNull from the parent class
    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
    StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
    char *blob = dictionary->dictionaryBlob.data();
    int64_t *dictionaryOffsets = dictionary->dictionaryOffset.data();
    char **outputStarts = byteBatch.data.data();
    int64_t *outputLengths = byteBatch.length.data();
    rle->next(outputLengths, numValues, notNull);
    uint64_t dictionaryCount = dictionary->dictionaryOffset.size() - 1;
    if (notNull) {
      for(uint64_t i=0; i < numValues; ++i) {
        if (notNull[i]) {
          int64_t entry = outputLengths[i];
          if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount ) {
            throw ParseError("Entry index out of range in StringDictionaryColumn");
          }
          outputStarts[i] = blob + dictionaryOffsets[entry];
          outputLengths[i] = dictionaryOffsets[entry+1] -
            dictionaryOffsets[entry];
        }
      }
    } else {
      for(uint64_t i=0; i < numValues; ++i) {
        int64_t entry = outputLengths[i];
        if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount) {
          throw ParseError("Entry index out of range in StringDictionaryColumn");
        }
        outputStarts[i] = blob + dictionaryOffsets[entry];
        outputLengths[i] = dictionaryOffsets[entry+1] -
          dictionaryOffsets[entry];
      }
    }
  }

  void StringDictionaryColumnReader::nextEncoded(ColumnVectorBatch& rowBatch,
                                                  uint64_t numValues,
                                                  char* notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
    rowBatch.isEncoded = true;

    EncodedStringVectorBatch& batch = dynamic_cast<EncodedStringVectorBatch&>(rowBatch);
    batch.dictionary = this->dictionary;

    // Length buffer is reused to save dictionary entry ids
    rle->next(batch.index.data(), numValues, notNull);
  }

  void StringDictionaryColumnReader::seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    rle->seek(positions.at(columnId));
  }


  class StringDirectColumnReader: public ColumnReader {
  private:
    std::unique_ptr<RleDecoder> lengthRle;
    std::unique_ptr<SeekableInputStream> blobStream;
    const char *lastBuffer;
    size_t lastBufferLength;

    /**
     * Compute the total length of the values.
     * @param lengths the array of lengths
     * @param notNull the array of notNull flags
     * @param numValues the lengths of the arrays
     * @return the total number of bytes for the non-null values
     */
    size_t computeSize(const int64_t *lengths, const char *notNull,
                       uint64_t numValues);

  public:
    StringDirectColumnReader(const Type& type, StripeStreams& stipe);
    ~StringDirectColumnReader() override;

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch,
              uint64_t numValues,
              char *notNull) override;

    void seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) override;
  };

  StringDirectColumnReader::StringDirectColumnReader
                 (const Type& type,
                  StripeStreams& stripe
                  ): ColumnReader(type, stripe) {
    RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId)
                                                .kind());
    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
    if (stream == nullptr)
      throw ParseError("LENGTH stream not found in StringDirectColumn");
    lengthRle = createRleDecoder(
        std::move(stream), false, rleVersion, memoryPool);
    blobStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
    if (blobStream == nullptr)
      throw ParseError("DATA stream not found in StringDirectColumn");
    lastBuffer = nullptr;
    lastBufferLength = 0;
  }

  StringDirectColumnReader::~StringDirectColumnReader() {
    // PASS
  }

  uint64_t StringDirectColumnReader::skip(uint64_t numValues) {
    const size_t BUFFER_SIZE = 1024;
    numValues = ColumnReader::skip(numValues);
    int64_t buffer[BUFFER_SIZE];
    uint64_t done = 0;
    size_t totalBytes = 0;
    // read the lengths, so we know haw many bytes to skip
    while (done < numValues) {
      uint64_t step = std::min(BUFFER_SIZE,
                                    static_cast<size_t>(numValues - done));
      lengthRle->next(buffer, step, nullptr);
      totalBytes += computeSize(buffer, nullptr, step);
      done += step;
    }
    if (totalBytes <= lastBufferLength) {
      // subtract the needed bytes from the ones left over
      lastBufferLength -= totalBytes;
      lastBuffer += totalBytes;
    } else {
      // move the stream forward after accounting for the buffered bytes
      totalBytes -= lastBufferLength;
      const size_t cap = static_cast<size_t>(std::numeric_limits<int>::max());
      while (totalBytes != 0) {
        size_t step = totalBytes > cap ? cap : totalBytes;
        blobStream->Skip(static_cast<int>(step));
        totalBytes -= step;
      }
      lastBufferLength = 0;
      lastBuffer = nullptr;
    }
    return numValues;
  }

  size_t StringDirectColumnReader::computeSize(const int64_t* lengths,
                                               const char* notNull,
                                               uint64_t numValues) {
    size_t totalLength = 0;
    if (notNull) {
      for(size_t i=0; i < numValues; ++i) {
        if (notNull[i]) {
          totalLength += static_cast<size_t>(lengths[i]);
        }
      }
    } else {
      for(size_t i=0; i < numValues; ++i) {
        totalLength += static_cast<size_t>(lengths[i]);
      }
    }
    return totalLength;
  }

  void StringDirectColumnReader::next(ColumnVectorBatch& rowBatch,
                                      uint64_t numValues,
                                      char *notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    // update the notNull from the parent class
    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
    StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
    char **startPtr = byteBatch.data.data();
    int64_t *lengthPtr = byteBatch.length.data();

    // read the length vector
    lengthRle->next(lengthPtr, numValues, notNull);

    // figure out the total length of data we need from the blob stream
    const size_t totalLength = computeSize(lengthPtr, notNull, numValues);

    // Load data from the blob stream into our buffer until we have enough
    // to get the rest directly out of the stream's buffer.
    size_t bytesBuffered = 0;
    byteBatch.blob.resize(totalLength);
    char *ptr= byteBatch.blob.data();
    while (bytesBuffered + lastBufferLength < totalLength) {
      memcpy(ptr + bytesBuffered, lastBuffer, lastBufferLength);
      bytesBuffered += lastBufferLength;
      const void* readBuffer;
      int readLength;
      if (!blobStream->Next(&readBuffer, &readLength)) {
        throw ParseError("failed to read in StringDirectColumnReader.next");
      }
      lastBuffer = static_cast<const char*>(readBuffer);
      lastBufferLength = static_cast<size_t>(readLength);
    }

    if (bytesBuffered < totalLength) {
      size_t moreBytes = totalLength - bytesBuffered;
      memcpy(ptr + bytesBuffered, lastBuffer, moreBytes);
      lastBuffer += moreBytes;
      lastBufferLength -= moreBytes;
    }

    size_t filledSlots = 0;
    ptr = byteBatch.blob.data();
    if (notNull) {
      while (filledSlots < numValues) {
        if (notNull[filledSlots]) {
          startPtr[filledSlots] = const_cast<char*>(ptr);
          ptr += lengthPtr[filledSlots];
        }
        filledSlots += 1;
      }
    } else {
      while (filledSlots < numValues) {
        startPtr[filledSlots] = const_cast<char*>(ptr);
        ptr += lengthPtr[filledSlots];
        filledSlots += 1;
      }
    }
  }

  void StringDirectColumnReader::seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    blobStream->seek(positions.at(columnId));
    lengthRle->seek(positions.at(columnId));
  }

  class StructColumnReader: public ColumnReader {
  private:
    std::vector<std::unique_ptr<ColumnReader>> children;

  public:
    StructColumnReader(const Type& type, StripeStreams& stipe);

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch,
              uint64_t numValues,
              char *notNull) override;

    void nextEncoded(ColumnVectorBatch& rowBatch,
              uint64_t numValues,
              char *notNull) override;

    void seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) override;

  private:
    template<bool encoded>
    void nextInternal(ColumnVectorBatch& rowBatch,
                      uint64_t numValues,
                      char *notNull);
  };

  StructColumnReader::StructColumnReader(const Type& type,
                                         StripeStreams& stripe
                                         ): ColumnReader(type, stripe) {
    // count the number of selected sub-columns
    const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
    switch (static_cast<int64_t>(stripe.getEncoding(columnId).kind())) {
    case proto::ColumnEncoding_Kind_DIRECT:
      for(unsigned int i=0; i < type.getSubtypeCount(); ++i) {
        const Type& child = *type.getSubtype(i);
        if (selectedColumns[static_cast<uint64_t>(child.getColumnId())]) {
          children.push_back(buildReader(child, stripe));
        }
      }
      break;
    case proto::ColumnEncoding_Kind_DIRECT_V2:
    case proto::ColumnEncoding_Kind_DICTIONARY:
    case proto::ColumnEncoding_Kind_DICTIONARY_V2:
    default:
      throw ParseError("Unknown encoding for StructColumnReader");
    }
  }

  uint64_t StructColumnReader::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);
    for(auto& ptr : children) {
      ptr->skip(numValues);
    }
    return numValues;
  }

  void StructColumnReader::next(ColumnVectorBatch& rowBatch,
                                uint64_t numValues,
                                char *notNull) {
    nextInternal<false>(rowBatch, numValues, notNull);
  }

  void StructColumnReader::nextEncoded(ColumnVectorBatch& rowBatch,
                                uint64_t numValues,
                                char *notNull) {
    nextInternal<true>(rowBatch, numValues, notNull);
  }

  template<bool encoded>
  void StructColumnReader::nextInternal(ColumnVectorBatch& rowBatch,
                                uint64_t numValues,
                                char *notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    uint64_t i=0;
    notNull = rowBatch.hasNulls? rowBatch.notNull.data() : nullptr;
    for(auto iter = children.begin(); iter != children.end(); ++iter, ++i) {
      if (encoded) {
        (*iter)->nextEncoded(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]),
                    numValues, notNull);
      } else {
        (*iter)->next(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]),
                    numValues, notNull);
      }
    }
  }

  void StructColumnReader::seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);

    for(auto& ptr : children) {
      ptr->seekToRowGroup(positions);
    }
  }

  class ListColumnReader: public ColumnReader {
  private:
    std::unique_ptr<ColumnReader> child;
    std::unique_ptr<RleDecoder> rle;

  public:
    ListColumnReader(const Type& type, StripeStreams& stipe);
    ~ListColumnReader() override;

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch,
              uint64_t numValues,
              char *notNull) override;

    void nextEncoded(ColumnVectorBatch& rowBatch,
              uint64_t numValues,
              char *notNull) override;

    void seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) override;

  private:
    template<bool encoded>
    void nextInternal(ColumnVectorBatch& rowBatch,
                      uint64_t numValues,
                      char *notNull);
  };

  ListColumnReader::ListColumnReader(const Type& type,
                                     StripeStreams& stripe
                                     ): ColumnReader(type, stripe) {
    // count the number of selected sub-columns
    const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
    RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
    if (stream == nullptr)
      throw ParseError("LENGTH stream not found in List column");
    rle = createRleDecoder(std::move(stream), false, vers, memoryPool);
    const Type& childType = *type.getSubtype(0);
    if (selectedColumns[static_cast<uint64_t>(childType.getColumnId())]) {
      child = buildReader(childType, stripe);
    }
  }

  ListColumnReader::~ListColumnReader() {
    // PASS
  }

  uint64_t ListColumnReader::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);
    ColumnReader *childReader = child.get();
    if (childReader) {
      const uint64_t BUFFER_SIZE = 1024;
      int64_t buffer[BUFFER_SIZE];
      uint64_t childrenElements = 0;
      uint64_t lengthsRead = 0;
      while (lengthsRead < numValues) {
        uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE);
        rle->next(buffer, chunk, nullptr);
        for(size_t i=0; i < chunk; ++i) {
          childrenElements += static_cast<size_t>(buffer[i]);
        }
        lengthsRead += chunk;
      }
      childReader->skip(childrenElements);
    } else {
      rle->skip(numValues);
    }
    return numValues;
  }

  void ListColumnReader::next(ColumnVectorBatch& rowBatch,
                                      uint64_t numValues,
                                      char *notNull) {
    nextInternal<false>(rowBatch, numValues, notNull);
  }

  void ListColumnReader::nextEncoded(ColumnVectorBatch& rowBatch,
                                      uint64_t numValues,
                                      char *notNull) {
    nextInternal<true>(rowBatch, numValues, notNull);
  }

  template<bool encoded>
  void ListColumnReader::nextInternal(ColumnVectorBatch& rowBatch,
                              uint64_t numValues,
                              char *notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    ListVectorBatch &listBatch = dynamic_cast<ListVectorBatch&>(rowBatch);
    int64_t* offsets = listBatch.offsets.data();
    notNull = listBatch.hasNulls ? listBatch.notNull.data() : nullptr;
    rle->next(offsets, numValues, notNull);
    uint64_t totalChildren = 0;
    if (notNull) {
      for(size_t i=0; i < numValues; ++i) {
        if (notNull[i]) {
          uint64_t tmp = static_cast<uint64_t>(offsets[i]);
          offsets[i] = static_cast<int64_t>(totalChildren);
          totalChildren += tmp;
        } else {
          offsets[i] = static_cast<int64_t>(totalChildren);
        }
      }
    } else {
      for(size_t i=0; i < numValues; ++i) {
        uint64_t tmp = static_cast<uint64_t>(offsets[i]);
        offsets[i] = static_cast<int64_t>(totalChildren);
        totalChildren += tmp;
      }
    }
    offsets[numValues] = static_cast<int64_t>(totalChildren);
    ColumnReader *childReader = child.get();
    if (childReader) {
      if (encoded) {
        childReader->nextEncoded(*(listBatch.elements.get()), totalChildren, nullptr);
      } else {
        childReader->next(*(listBatch.elements.get()), totalChildren, nullptr);
      }
    }
  }

  void ListColumnReader::seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    rle->seek(positions.at(columnId));
    if (child.get()) {
      child->seekToRowGroup(positions);
    }
  }

  class MapColumnReader: public ColumnReader {
  private:
    std::unique_ptr<ColumnReader> keyReader;
    std::unique_ptr<ColumnReader> elementReader;
    std::unique_ptr<RleDecoder> rle;

  public:
    MapColumnReader(const Type& type, StripeStreams& stipe);
    ~MapColumnReader() override;

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch,
              uint64_t numValues,
              char *notNull) override;

    void nextEncoded(ColumnVectorBatch& rowBatch,
                     uint64_t numValues,
                     char *notNull) override;

    void seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) override;

  private:
    template<bool encoded>
    void nextInternal(ColumnVectorBatch& rowBatch,
                      uint64_t numValues,
                      char *notNull);
  };

  MapColumnReader::MapColumnReader(const Type& type,
                                   StripeStreams& stripe
                                   ): ColumnReader(type, stripe) {
    // Determine if the key and/or value columns are selected
    const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
    RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
    if (stream == nullptr)
      throw ParseError("LENGTH stream not found in Map column");
    rle = createRleDecoder(std::move(stream), false, vers, memoryPool);
    const Type& keyType = *type.getSubtype(0);
    if (selectedColumns[static_cast<uint64_t>(keyType.getColumnId())]) {
      keyReader = buildReader(keyType, stripe);
    }
    const Type& elementType = *type.getSubtype(1);
    if (selectedColumns[static_cast<uint64_t>(elementType.getColumnId())]) {
      elementReader = buildReader(elementType, stripe);
    }
  }

  MapColumnReader::~MapColumnReader() {
    // PASS
  }

  uint64_t MapColumnReader::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);
    ColumnReader *rawKeyReader = keyReader.get();
    ColumnReader *rawElementReader = elementReader.get();
    if (rawKeyReader || rawElementReader) {
      const uint64_t BUFFER_SIZE = 1024;
      int64_t buffer[BUFFER_SIZE];
      uint64_t childrenElements = 0;
      uint64_t lengthsRead = 0;
      while (lengthsRead < numValues) {
        uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE);
        rle->next(buffer, chunk, nullptr);
        for(size_t i=0; i < chunk; ++i) {
          childrenElements += static_cast<size_t>(buffer[i]);
        }
        lengthsRead += chunk;
      }
      if (rawKeyReader) {
        rawKeyReader->skip(childrenElements);
      }
      if (rawElementReader) {
        rawElementReader->skip(childrenElements);
      }
    } else {
      rle->skip(numValues);
    }
    return numValues;
  }

  void MapColumnReader::next(ColumnVectorBatch& rowBatch,
                             uint64_t numValues,
                             char *notNull)
  {
    nextInternal<false>(rowBatch, numValues, notNull);
  }

  void MapColumnReader::nextEncoded(ColumnVectorBatch& rowBatch,
                             uint64_t numValues,
                             char *notNull)
  {
    nextInternal<true>(rowBatch, numValues, notNull);
  }

  template<bool encoded>
  void MapColumnReader::nextInternal(ColumnVectorBatch& rowBatch,
                             uint64_t numValues,
                             char *notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    MapVectorBatch &mapBatch = dynamic_cast<MapVectorBatch&>(rowBatch);
    int64_t* offsets = mapBatch.offsets.data();
    notNull = mapBatch.hasNulls ? mapBatch.notNull.data() : nullptr;
    rle->next(offsets, numValues, notNull);
    uint64_t totalChildren = 0;
    if (notNull) {
      for(size_t i=0; i < numValues; ++i) {
        if (notNull[i]) {
          uint64_t tmp = static_cast<uint64_t>(offsets[i]);
          offsets[i] = static_cast<int64_t>(totalChildren);
          totalChildren += tmp;
        } else {
          offsets[i] = static_cast<int64_t>(totalChildren);
        }
      }
    } else {
      for(size_t i=0; i < numValues; ++i) {
        uint64_t tmp = static_cast<uint64_t>(offsets[i]);
        offsets[i] = static_cast<int64_t>(totalChildren);
        totalChildren += tmp;
      }
    }
    offsets[numValues] = static_cast<int64_t>(totalChildren);
    ColumnReader *rawKeyReader = keyReader.get();
    if (rawKeyReader) {
      if (encoded) {
        rawKeyReader->nextEncoded(*(mapBatch.keys.get()), totalChildren, nullptr);
      } else {
        rawKeyReader->next(*(mapBatch.keys.get()), totalChildren, nullptr);
      }
    }
    ColumnReader *rawElementReader = elementReader.get();
    if (rawElementReader) {
      if (encoded) {
        rawElementReader->nextEncoded(*(mapBatch.elements.get()), totalChildren, nullptr);
      } else {
        rawElementReader->next(*(mapBatch.elements.get()), totalChildren, nullptr);
      }
    }
  }

  void MapColumnReader::seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    rle->seek(positions.at(columnId));
    if (keyReader.get()) {
      keyReader->seekToRowGroup(positions);
    }
    if (elementReader.get()) {
      elementReader->seekToRowGroup(positions);
    }
  }

  class UnionColumnReader: public ColumnReader {
  private:
    std::unique_ptr<ByteRleDecoder> rle;
    std::vector<std::unique_ptr<ColumnReader>> childrenReader;
    std::vector<int64_t> childrenCounts;
    uint64_t numChildren;

  public:
    UnionColumnReader(const Type& type, StripeStreams& stipe);

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch,
              uint64_t numValues,
              char *notNull) override;

    void nextEncoded(ColumnVectorBatch& rowBatch,
                     uint64_t numValues,
                     char *notNull) override;

    void seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) override;

  private:
    template<bool encoded>
    void nextInternal(ColumnVectorBatch& rowBatch,
                      uint64_t numValues,
                      char *notNull);
  };

  UnionColumnReader::UnionColumnReader(const Type& type,
                                       StripeStreams& stripe
                                       ): ColumnReader(type, stripe) {
    numChildren = type.getSubtypeCount();
    childrenReader.resize(numChildren);
    childrenCounts.resize(numChildren);

    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
    if (stream == nullptr)
      throw ParseError("LENGTH stream not found in Union column");
    rle = createByteRleDecoder(std::move(stream));
    // figure out which types are selected
    const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
    for(unsigned int i=0; i < numChildren; ++i) {
      const Type &child = *type.getSubtype(i);
      if (selectedColumns[static_cast<size_t>(child.getColumnId())]) {
        childrenReader[i] = buildReader(child, stripe);
      }
    }
  }

  uint64_t UnionColumnReader::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);
    const uint64_t BUFFER_SIZE = 1024;
    char buffer[BUFFER_SIZE];
    uint64_t lengthsRead = 0;
    int64_t *counts = childrenCounts.data();
    memset(counts, 0, sizeof(int64_t) * numChildren);
    while (lengthsRead < numValues) {
      uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE);
      rle->next(buffer, chunk, nullptr);
      for(size_t i=0; i < chunk; ++i) {
        counts[static_cast<size_t>(buffer[i])] += 1;
      }
      lengthsRead += chunk;
    }
    for(size_t i=0; i < numChildren; ++i) {
      if (counts[i] != 0 && childrenReader[i] != nullptr) {
        childrenReader[i]->skip(static_cast<uint64_t>(counts[i]));
      }
    }
    return numValues;
  }

  void UnionColumnReader::next(ColumnVectorBatch& rowBatch,
                              uint64_t numValues,
                              char *notNull) {
    nextInternal<false>(rowBatch, numValues, notNull);
  }

  void UnionColumnReader::nextEncoded(ColumnVectorBatch& rowBatch,
                              uint64_t numValues,
                              char *notNull) {
    nextInternal<true>(rowBatch, numValues, notNull);
  }

  template<bool encoded>
  void UnionColumnReader::nextInternal(ColumnVectorBatch& rowBatch,
                               uint64_t numValues,
                               char *notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    UnionVectorBatch &unionBatch = dynamic_cast<UnionVectorBatch&>(rowBatch);
    uint64_t* offsets = unionBatch.offsets.data();
    int64_t* counts = childrenCounts.data();
    memset(counts, 0, sizeof(int64_t) * numChildren);
    unsigned char* tags = unionBatch.tags.data();
    notNull = unionBatch.hasNulls ? unionBatch.notNull.data() : nullptr;
    rle->next(reinterpret_cast<char *>(tags), numValues, notNull);
    // set the offsets for each row
    if (notNull) {
      for(size_t i=0; i < numValues; ++i) {
        if (notNull[i]) {
          offsets[i] =
            static_cast<uint64_t>(counts[static_cast<size_t>(tags[i])]++);
        }
      }
    } else {
      for(size_t i=0; i < numValues; ++i) {
        offsets[i] =
          static_cast<uint64_t>(counts[static_cast<size_t>(tags[i])]++);
      }
    }
    // read the right number of each child column
    for(size_t i=0; i < numChildren; ++i) {
      if (childrenReader[i] != nullptr) {
        if (encoded) {
          childrenReader[i]->nextEncoded(*(unionBatch.children[i]),
                                  static_cast<uint64_t>(counts[i]), nullptr);
        } else {
          childrenReader[i]->next(*(unionBatch.children[i]),
                                  static_cast<uint64_t>(counts[i]), nullptr);
        }
      }
    }
  }

  void UnionColumnReader::seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    rle->seek(positions.at(columnId));
    for(size_t i = 0; i < numChildren; ++i) {
      if (childrenReader[i] != nullptr) {
        childrenReader[i]->seekToRowGroup(positions);
      }
    }
  }

  /**
   * Destructively convert the number from zigzag encoding to the
   * natural signed representation.
   */
  void unZigZagInt128(Int128& value) {
    bool needsNegate = value.getLowBits() & 1;
    value >>= 1;
    if (needsNegate) {
      value.negate();
      value -= 1;
    }
  }

  class Decimal64ColumnReader: public ColumnReader {
  public:
    static const uint32_t MAX_PRECISION_64 = 18;
    static const uint32_t MAX_PRECISION_128 = 38;
    static const int64_t POWERS_OF_TEN[MAX_PRECISION_64 + 1];

  protected:
    std::unique_ptr<SeekableInputStream> valueStream;
    int32_t precision;
    int32_t scale;
    const char* buffer;
    const char* bufferEnd;

    std::unique_ptr<RleDecoder> scaleDecoder;

    /**
     * Read the valueStream for more bytes.
     */
    void readBuffer() {
      while (buffer == bufferEnd) {
        int length;
        if (!valueStream->Next(reinterpret_cast<const void**>(&buffer),
                               &length)) {
          throw ParseError("Read past end of stream in Decimal64ColumnReader "+
                           valueStream->getName());
        }
        bufferEnd = buffer + length;
      }
    }

    void readInt64(int64_t& value, int32_t currentScale) {
      value = 0;
      size_t offset = 0;
      while (true) {
        readBuffer();
        unsigned char ch = static_cast<unsigned char>(*(buffer++));
        value |= static_cast<uint64_t>(ch & 0x7f) << offset;
        offset += 7;
        if (!(ch & 0x80)) {
          break;
        }
      }
      value = unZigZag(static_cast<uint64_t>(value));
      if (scale > currentScale &&
          static_cast<uint64_t>(scale - currentScale) <= MAX_PRECISION_64) {
        value *= POWERS_OF_TEN[scale - currentScale];
      } else if (scale < currentScale &&
          static_cast<uint64_t>(currentScale - scale) <= MAX_PRECISION_64) {
        value /= POWERS_OF_TEN[currentScale - scale];
      } else if (scale != currentScale) {
        throw ParseError("Decimal scale out of range");
      }
    }

  public:
    Decimal64ColumnReader(const Type& type, StripeStreams& stipe);
    ~Decimal64ColumnReader() override;

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch,
              uint64_t numValues,
              char *notNull) override;

    void seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) override;
  };
  const uint32_t Decimal64ColumnReader::MAX_PRECISION_64;
  const uint32_t Decimal64ColumnReader::MAX_PRECISION_128;
  const int64_t Decimal64ColumnReader::POWERS_OF_TEN[MAX_PRECISION_64 + 1]=
    {1,
     10,
     100,
     1000,
     10000,
     100000,
     1000000,
     10000000,
     100000000,
     1000000000,
     10000000000,
     100000000000,
     1000000000000,
     10000000000000,
     100000000000000,
     1000000000000000,
     10000000000000000,
     100000000000000000,
     1000000000000000000};

  Decimal64ColumnReader::Decimal64ColumnReader(const Type& type,
                                               StripeStreams& stripe
                                               ): ColumnReader(type, stripe) {
    scale = static_cast<int32_t>(type.getScale());
    precision = static_cast<int32_t>(type.getPrecision());
    valueStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
    if (valueStream == nullptr)
      throw ParseError("DATA stream not found in Decimal64Column");
    buffer = nullptr;
    bufferEnd = nullptr;
    RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_SECONDARY, true);
    if (stream == nullptr)
      throw ParseError("SECONDARY stream not found in Decimal64Column");
    scaleDecoder = createRleDecoder(std::move(stream), true, vers, memoryPool);
  }

  Decimal64ColumnReader::~Decimal64ColumnReader() {
    // PASS
  }

  uint64_t Decimal64ColumnReader::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);
    uint64_t skipped = 0;
    while (skipped < numValues) {
      readBuffer();
      if (!(0x80 & *(buffer++))) {
        skipped += 1;
      }
    }
    scaleDecoder->skip(numValues);
    return numValues;
  }

  void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch,
                                   uint64_t numValues,
                                   char *notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
    Decimal64VectorBatch &batch =
      dynamic_cast<Decimal64VectorBatch&>(rowBatch);
    int64_t* values = batch.values.data();
    // read the next group of scales
    int64_t* scaleBuffer = batch.readScales.data();
    scaleDecoder->next(scaleBuffer, numValues, notNull);
    batch.precision = precision;
    batch.scale = scale;
    if (notNull) {
      for(size_t i=0; i < numValues; ++i) {
        if (notNull[i]) {
          readInt64(values[i], static_cast<int32_t>(scaleBuffer[i]));
        }
      }
    } else {
      for(size_t i=0; i < numValues; ++i) {
        readInt64(values[i], static_cast<int32_t>(scaleBuffer[i]));
      }
    }
  }

  void scaleInt128(Int128& value, uint32_t scale, uint32_t currentScale) {
    if (scale > currentScale) {
      while(scale > currentScale) {
        uint32_t scaleAdjust =
          std::min(Decimal64ColumnReader::MAX_PRECISION_64,
                   scale - currentScale);
        value *= Decimal64ColumnReader::POWERS_OF_TEN[scaleAdjust];
        currentScale += scaleAdjust;
      }
    } else if (scale < currentScale) {
      Int128 remainder;
      while(currentScale > scale) {
        uint32_t scaleAdjust =
          std::min(Decimal64ColumnReader::MAX_PRECISION_64,
                   currentScale - scale);
        value = value.divide(Decimal64ColumnReader::POWERS_OF_TEN[scaleAdjust],
                             remainder);
        currentScale -= scaleAdjust;
      }
    }
  }

  void Decimal64ColumnReader::seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    valueStream->seek(positions.at(columnId));
    scaleDecoder->seek(positions.at(columnId));
  }

  class Decimal128ColumnReader: public Decimal64ColumnReader {
  public:
    Decimal128ColumnReader(const Type& type, StripeStreams& stipe);
    ~Decimal128ColumnReader() override;

    void next(ColumnVectorBatch& rowBatch,
              uint64_t numValues,
              char *notNull) override;

  private:
    void readInt128(Int128& value, int32_t currentScale) {
      value = 0;
      Int128 work;
      uint32_t offset = 0;
      while (true) {
        readBuffer();
        unsigned char ch = static_cast<unsigned char>(*(buffer++));
        work = ch & 0x7f;
        work <<= offset;
        value |=  work;
        offset += 7;
        if (!(ch & 0x80)) {
          break;
        }
      }
      unZigZagInt128(value);
      scaleInt128(value, static_cast<uint32_t>(scale),
                  static_cast<uint32_t>(currentScale));
    }
  };

  Decimal128ColumnReader::Decimal128ColumnReader
                (const Type& type,
                 StripeStreams& stripe
                 ): Decimal64ColumnReader(type, stripe) {
    // PASS
  }

  Decimal128ColumnReader::~Decimal128ColumnReader() {
    // PASS
  }

  void Decimal128ColumnReader::next(ColumnVectorBatch& rowBatch,
                                   uint64_t numValues,
                                   char *notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
    Decimal128VectorBatch &batch =
      dynamic_cast<Decimal128VectorBatch&>(rowBatch);
    Int128* values = batch.values.data();
    // read the next group of scales
    int64_t* scaleBuffer = batch.readScales.data();
    scaleDecoder->next(scaleBuffer, numValues, notNull);
    batch.precision = precision;
    batch.scale = scale;
    if (notNull) {
      for(size_t i=0; i < numValues; ++i) {
        if (notNull[i]) {
          readInt128(values[i], static_cast<int32_t>(scaleBuffer[i]));
        }
      }
    } else {
      for(size_t i=0; i < numValues; ++i) {
        readInt128(values[i], static_cast<int32_t>(scaleBuffer[i]));
      }
    }
  }

  class DecimalHive11ColumnReader: public Decimal64ColumnReader {
  private:
    bool throwOnOverflow;
    std::ostream* errorStream;

    /**
     * Read an Int128 from the stream and correct it to the desired scale.
     */
    bool readInt128(Int128& value, int32_t currentScale) {
      // -/+ 99999999999999999999999999999999999999
      static const Int128 MIN_VALUE(-0x4b3b4ca85a86c47b, 0xf675ddc000000001);
      static const Int128 MAX_VALUE( 0x4b3b4ca85a86c47a, 0x098a223fffffffff);

      value = 0;
      Int128 work;
      uint32_t offset = 0;
      bool result = true;
      while (true) {
        readBuffer();
        unsigned char ch = static_cast<unsigned char>(*(buffer++));
        work = ch & 0x7f;
        // If we have read more than 128 bits, we flag the error, but keep
        // reading bytes so the stream isn't thrown off.
        if (offset > 128 || (offset == 126 && work > 3)) {
          result = false;
        }
        work <<= offset;
        value |=  work;
        offset += 7;
        if (!(ch & 0x80)) {
          break;
        }
      }

      if (!result) {
        return result;
      }
      unZigZagInt128(value);
      scaleInt128(value, static_cast<uint32_t>(scale),
                  static_cast<uint32_t>(currentScale));
      return value >= MIN_VALUE && value <= MAX_VALUE;
    }

  public:
    DecimalHive11ColumnReader(const Type& type, StripeStreams& stipe);
    ~DecimalHive11ColumnReader() override;

    void next(ColumnVectorBatch& rowBatch,
              uint64_t numValues,
              char *notNull) override;
  };

  DecimalHive11ColumnReader::DecimalHive11ColumnReader
                    (const Type& type,
                     StripeStreams& stripe
                     ): Decimal64ColumnReader(type, stripe) {
    scale = stripe.getForcedScaleOnHive11Decimal();
    throwOnOverflow = stripe.getThrowOnHive11DecimalOverflow();
    errorStream = stripe.getErrorStream();
  }

  DecimalHive11ColumnReader::~DecimalHive11ColumnReader() {
    // PASS
  }

  void DecimalHive11ColumnReader::next(ColumnVectorBatch& rowBatch,
                                       uint64_t numValues,
                                       char *notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
    Decimal128VectorBatch &batch =
      dynamic_cast<Decimal128VectorBatch&>(rowBatch);
    Int128* values = batch.values.data();
    // read the next group of scales
    int64_t* scaleBuffer = batch.readScales.data();

    scaleDecoder->next(scaleBuffer, numValues, notNull);

    batch.precision = precision;
    batch.scale = scale;
    if (notNull) {
      for(size_t i=0; i < numValues; ++i) {
        if (notNull[i]) {
          if (!readInt128(values[i],
                          static_cast<int32_t>(scaleBuffer[i]))) {
            if (throwOnOverflow) {
              throw ParseError("Hive 0.11 decimal was more than 38 digits.");
            } else {
              *errorStream << "Warning: "
                           << "Hive 0.11 decimal with more than 38 digits "
                           << "replaced by NULL.\n";
              notNull[i] = false;
            }
          }
        }
      }
    } else {
      for(size_t i=0; i < numValues; ++i) {
        if (!readInt128(values[i],
                        static_cast<int32_t>(scaleBuffer[i]))) {
          if (throwOnOverflow) {
            throw ParseError("Hive 0.11 decimal was more than 38 digits.");
          } else {
            *errorStream << "Warning: "
                         << "Hive 0.11 decimal with more than 38 digits "
                         << "replaced by NULL.\n";
            batch.hasNulls = true;
            batch.notNull[i] = false;
          }
        }
      }
    }
  }

  /**
   * Create a reader for the given stripe.
   */
  std::unique_ptr<ColumnReader> buildReader(const Type& type,
                                            StripeStreams& stripe) {
    switch (static_cast<int64_t>(type.getKind())) {
    case DATE:
    case INT:
    case LONG:
    case SHORT:
      return std::unique_ptr<ColumnReader>(
          new IntegerColumnReader(type, stripe));
    case BINARY:
    case CHAR:
    case STRING:
    case VARCHAR:
      switch (static_cast<int64_t>(stripe.getEncoding(type.getColumnId()).kind())){
      case proto::ColumnEncoding_Kind_DICTIONARY:
      case proto::ColumnEncoding_Kind_DICTIONARY_V2:
        return std::unique_ptr<ColumnReader>(
            new StringDictionaryColumnReader(type, stripe));
      case proto::ColumnEncoding_Kind_DIRECT:
      case proto::ColumnEncoding_Kind_DIRECT_V2:
        return std::unique_ptr<ColumnReader>(
            new StringDirectColumnReader(type, stripe));
      default:
        throw NotImplementedYet("buildReader unhandled string encoding");
      }

    case BOOLEAN:
      return std::unique_ptr<ColumnReader>(
          new BooleanColumnReader(type, stripe));

    case BYTE:
      return std::unique_ptr<ColumnReader>(
          new ByteColumnReader(type, stripe));

    case LIST:
      return std::unique_ptr<ColumnReader>(
          new ListColumnReader(type, stripe));

    case MAP:
      return std::unique_ptr<ColumnReader>(
          new MapColumnReader(type, stripe));

    case UNION:
      return std::unique_ptr<ColumnReader>(
          new UnionColumnReader(type, stripe));

    case STRUCT:
      return std::unique_ptr<ColumnReader>(
          new StructColumnReader(type, stripe));

    case FLOAT:
    case DOUBLE:
      return std::unique_ptr<ColumnReader>(
          new DoubleColumnReader(type, stripe));

    case TIMESTAMP:
      return std::unique_ptr<ColumnReader>
        (new TimestampColumnReader(type, stripe));

    case DECIMAL:
      // is this a Hive 0.11 or 0.12 file?
      if (type.getPrecision() == 0) {
        return std::unique_ptr<ColumnReader>
          (new DecimalHive11ColumnReader(type, stripe));

      // can we represent the values using int64_t?
      } else if (type.getPrecision() <=
                 Decimal64ColumnReader::MAX_PRECISION_64) {
        return std::unique_ptr<ColumnReader>
          (new Decimal64ColumnReader(type, stripe));

      // otherwise we use the Int128 implementation
      } else {
        return std::unique_ptr<ColumnReader>
          (new Decimal128ColumnReader(type, stripe));
      }

    default:
      throw NotImplementedYet("buildReader unhandled type");
    }
  }

}
