// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "util/compress.h"

#include <algorithm>
#include <limits>

#include <bzlib.h>
#include <zlib.h>
#include <boost/crc.hpp>
#include <gutil/strings/substitute.h>
#undef DISALLOW_COPY_AND_ASSIGN // Snappy redefines this.
#include <lz4.h>
#include <snappy.h>
#include <zstd.h>
#include <zstd_errors.h>

#include "exec/read-write-util.h"
#include "runtime/mem-pool.h"

#include "common/names.h"

using boost::crc_32_type;
using namespace impala;
using namespace strings;

// Creates a zlib-based compressor producing 'format' output. The z_stream is only
// zero-filled here; Init() must be called before the first Compress().
GzipCompressor::GzipCompressor(Format format, MemPool* mem_pool, bool reuse_buffer)
  : Codec(mem_pool, reuse_buffer), format_(format) {
  // Start from an all-zero z_stream (zalloc/zfree/opaque == Z_NULL) so that zlib uses
  // its default allocator when the stream is initialized.
  bzero(&stream_, sizeof stream_);
}

// Releases zlib's internal deflate state.
GzipCompressor::~GzipCompressor() {
  // The result is deliberately ignored: a destructor has no way to report failure.
  static_cast<void>(deflateEnd(&stream_));
}

// Initializes the deflate stream for the configured format.
//  - DEFLATE: negative window bits request a raw deflate stream with no header.
//  - GZIP: GZIP_CODEC is added to the window bits to request the gzip wrapper.
//  - otherwise: the plain window bits (zlib wrapper).
// Returns an error Status if deflateInit2() fails.
Status GzipCompressor::Init() {
  int window_bits = WINDOW_BITS;
  if (format_ == DEFLATE) {
    window_bits = -window_bits;
  } else if (format_ == GZIP) {
    window_bits += GZIP_CODEC;
  }
  // Arguments after window_bits are memLevel (9) and the compression strategy.
  const int ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
      window_bits, 9, Z_DEFAULT_STRATEGY);
  if (ret != Z_OK) {
    // zlib does not set stream_.msg for every failure (the stream was zero-filled, so
    // it may still be null). Building a std::string from a null pointer is undefined
    // behavior, so fall back to the numeric return code.
    const string detail = stream_.msg != nullptr
        ? string(stream_.msg)
        : "zlib error code " + std::to_string(ret);
    return Status("zlib deflateInit failed: " + detail);
  }

  return Status::OK();
}

// Returns an upper bound on the compressed size of 'input_len' bytes for the current
// stream settings, as computed by deflateBound(). 'input' is not used.
int64_t GzipCompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
#if defined ZLIB_VERNUM && ZLIB_VERNUM > 0x1230
  return deflateBound(&stream_, input_len);
#else
  // zlib 1.2.3's deflateBound() returns too small a bound for an empty gzip input, so
  // use the value that zlib 1.2.3.1+ reports for that case.
  if (UNLIKELY(format_ == GZIP && input_len == 0)) return 23;

  // zlib 1.2.3 also leaves the gzip wrapper out of deflateBound(); this was fixed in
  // zlib 1.2.3.1 ("Take into account wrapper variations in deflateBound()", see
  // http://www.zlib.net/ChangeLog.txt). The zlib maintainer has stated that 12 bytes
  // must be added for gzip:
  // http://compgroups.net/comp.unix.programmer/gzip-compressing-an-in-memory-string-usi/54854
  // Pad by 32 instead, for a safe margin over any wrapper variation.
  const int64_t padding = 32;
  return deflateBound(&stream_, input_len) + padding;
#endif
}

// Deflates 'input' into the caller-supplied 'output' buffer as one complete stream
// (deflate() with Z_FINISH), then resets the stream so the compressor can be reused.
// On entry '*output_length' is the capacity of 'output'; on success it is set to the
// number of bytes written. The stream is also reset on failure, so the next call does
// not continue a half-finished stream.
Status GzipCompressor::Compress(int64_t input_length, const uint8_t* input,
    int64_t* output_length, uint8_t* output) {
  DCHECK_GE(*output_length, MaxOutputLen(input_length));
  stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
  stream_.avail_in = input_length;
  stream_.next_out = reinterpret_cast<Bytef*>(output);
  stream_.avail_out = *output_length;

  const int ret = deflate(&stream_, Z_FINISH);
  if (ret != Z_STREAM_END) {
    string error;
    if (ret == Z_OK) {
      // deflate() returns Z_OK (and stream_.msg is NOT set) when it ran out of output
      // space before the stream could be finished.
      error = Substitute(
          "zlib deflate failed: output buffer ($0) is too small.", *output_length);
    } else {
      // stream_.msg is not set for every error code (e.g. Z_BUF_ERROR). Streaming a
      // null char* is undefined behavior, so fall back to the numeric code.
      stringstream ss;
      ss << "zlib deflate failed: ";
      if (stream_.msg != nullptr) {
        ss << stream_.msg;
      } else {
        ss << "zlib error code " << ret;
      }
      error = ss.str();
    }
    // Discard the partially-finished stream so the next Compress() starts cleanly. The
    // message is built first because the reset may touch stream_.msg.
    (void)deflateReset(&stream_);
    return Status(error);
  }

  *output_length = *output_length - stream_.avail_out;

  if (deflateReset(&stream_) != Z_OK) {
    return Status(string("zlib deflateReset failed: ")
        + (stream_.msg != nullptr ? stream_.msg : "unknown error"));
  }
  return Status::OK();
}

// Compresses 'input' into '*output'.
//
// If 'output_preallocated', the caller's buffer (capacity '*output_length') must hold
// at least MaxOutputLen(input_length) bytes. Otherwise a buffer of that size is taken
// from 'memory_pool_', or the previous one is reused when 'reuse_buffer_' is set and it
// is large enough. On success '*output_length' is the number of compressed bytes.
Status GzipCompressor::ProcessBlock(bool output_preallocated,
    int64_t input_length, const uint8_t* input, int64_t* output_length,
    uint8_t** output) {
  DCHECK_GE(input_length, 0);
  DCHECK(!output_preallocated || (output_preallocated && *output_length > 0));
  const int64_t required_len = MaxOutputLen(input_length);

  if (output_preallocated) {
    // Caller owns the buffer: it must be able to hold the worst-case output.
    if (*output_length < required_len) {
      return Status("GzipCompressor::ProcessBlock: output length too small");
    }
  } else {
    // We own the buffer: allocate a fresh one unless a big-enough one can be reused.
    const bool can_reuse =
        reuse_buffer_ && out_buffer_ != nullptr && buffer_length_ >= required_len;
    if (!can_reuse) {
      DCHECK(memory_pool_ != nullptr) << "Can't allocate without passing in a mem pool";
      buffer_length_ = required_len;
      out_buffer_ = memory_pool_->Allocate(buffer_length_);
    }
    *output = out_buffer_;
    *output_length = buffer_length_;
  }

  RETURN_IF_ERROR(Compress(input_length, input, output_length, *output));
  return Status::OK();
}

// Creates a bzip2 compressor. No bzip2-specific setup is required up front.
BzipCompressor::BzipCompressor(MemPool* mem_pool, bool reuse_buffer)
  : Codec(mem_pool, reuse_buffer) {}

// No output bound is computed for bzip2. The -1 presumably tells callers that no bound
// is available (confirm against callers). Both arguments are ignored.
int64_t BzipCompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
  // TODO: is it possible to get a bound with bzip.
  constexpr int64_t NO_KNOWN_BOUND = -1;
  return NO_KNOWN_BOUND;
}

// Compresses 'input' with bzlib's one-shot BZ2_bzBuffToBuffCompress().
//
// If 'output_preallocated', '*output' / '*output_length' describe the caller's buffer,
// and an error is returned if the result does not fit. Otherwise the output buffer comes
// from 'temp_memory_pool_' (or the previous buffer is reused when 'reuse_buffer_' is
// set). It is doubled and retried until bzlib stops reporting BZ_OUTBUFF_FULL.
// On success '*output' points at the compressed bytes, '*output_length' holds their
// count, and the data of 'temp_memory_pool_' is transferred to 'memory_pool_'.
Status BzipCompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
    const uint8_t* input, int64_t *output_length, uint8_t** output) {
  // The bz2 library does not allow input to be nullptr, even when input_length is 0. This
  // should be OK because we do not write any file formats that support bzip compression.
  DCHECK(input != nullptr);
  DCHECK_GE(input_length, 0);
  // Floor for every buffer this function allocates itself. Doubling a zero-length buffer
  // never grows it, so without this floor the retry loop below would spin forever, e.g.
  // for input_length == 0 (even empty input produces a non-empty bzip2 stream).
  constexpr int64_t MIN_BUFFER_LENGTH = 64;

  if (output_preallocated) {
    buffer_length_ = *output_length;
    out_buffer_ = *output;
  } else if (!reuse_buffer_ || out_buffer_ == nullptr) {
    // Guess that we will need no more than the input length.
    buffer_length_ = std::max<int64_t>(input_length, MIN_BUFFER_LENGTH);
    out_buffer_ = temp_memory_pool_->Allocate(buffer_length_);
  }
  // NOTE(review): with reuse_buffer_ set, out_buffer_ may still point at a buffer a
  // caller passed in on an earlier preallocated call. Confirm that callers never mix
  // preallocated and non-preallocated calls on one reusing instance.

  unsigned int outlen = static_cast<unsigned int>(buffer_length_);
  int ret = BZ_OUTBUFF_FULL;
  while (ret == BZ_OUTBUFF_FULL) {
    if (out_buffer_ == nullptr) {
      // The previous attempt did not fit: drop it and retry with a larger buffer.
      DCHECK(!output_preallocated);
      temp_memory_pool_->Clear();
      buffer_length_ = std::max<int64_t>(buffer_length_ * 2, MIN_BUFFER_LENGTH);
      out_buffer_ = temp_memory_pool_->Allocate(buffer_length_);
    }
    outlen = static_cast<unsigned int>(buffer_length_);
    // The trailing arguments are blockSize100k (5), verbosity (2) and workFactor (0).
    ret = BZ2_bzBuffToBuffCompress(reinterpret_cast<char*>(out_buffer_), &outlen,
        const_cast<char*>(reinterpret_cast<const char*>(input)),
        static_cast<unsigned int>(input_length), 5, 2, 0);
    if (ret == BZ_OUTBUFF_FULL) {
      if (output_preallocated) {
        return Status("Too small buffer passed to BzipCompressor");
      }
      out_buffer_ = nullptr;
    }
  }
  if (ret != BZ_OK) {
    stringstream ss;
    ss << "bzlib BZ2_bzBuffToBuffCompressor failed: " << ret;
    return Status(ss.str());
  }

  *output = out_buffer_;
  *output_length = outlen;
  memory_pool_->AcquireData(temp_memory_pool_.get(), false);
  return Status::OK();
}

// Currently this is only used for testing the decompressor.
SnappyBlockCompressor::SnappyBlockCompressor(MemPool* mem_pool, bool reuse_buffer)
  : Codec(mem_pool, reuse_buffer) {
  // No SnappyBlockCompressor-specific initialization is needed.
}

// No output bound is computed for the block-framed snappy format. The -1 presumably
// means "unknown" to callers (confirm). Both arguments are ignored.
int64_t SnappyBlockCompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
  // TODO: is it possible to get a bound on this?
  constexpr int64_t NO_KNOWN_BOUND = -1;
  return NO_KNOWN_BOUND;
}

// Compresses 'input' using Hadoop's block framing on top of raw snappy (layout below).
//
// If 'output_preallocated', '*output' / '*output_length' describe the caller's buffer.
// An error Status is returned if it is smaller than the worst case for this input.
// Otherwise a buffer is taken from 'memory_pool_', or the previous one is reused when
// 'reuse_buffer_' is set and it is large enough. On success '*output_length' is the
// number of bytes written.
Status SnappyBlockCompressor::ProcessBlock(bool output_preallocated,
    int64_t input_length, const uint8_t* input, int64_t *output_length,
    uint8_t** output) {
  DCHECK_GE(input_length, 0);
  // Hadoop uses a block compression scheme on top of snappy. The layout is as follows:
  // - size of the entire decompressed data (4 bytes)
  // - size of the 1st compressed block (4 bytes)
  // - 1st compressed block
  // - size of the 2nd compressed block (4 bytes)
  // - 2nd compressed block
  // ...
  // For testing purposes we are going to generate two blocks if input_length >= 4K.
  vector<int64_t> block_sizes;
  // Worst-case size of the framed output for this input.
  int64_t length;
  if (input_length == 0) {
    length = sizeof (int32_t);
  } else if (input_length < 4 * 1024) {
    block_sizes.push_back(input_length);
    length = snappy::MaxCompressedLength(block_sizes[0]) + 2 * sizeof (int32_t);
  } else {
    block_sizes.push_back(input_length / 2);
    block_sizes.push_back(input_length - block_sizes[0]);
    length = snappy::MaxCompressedLength(block_sizes[0]) +
        snappy::MaxCompressedLength(block_sizes[1]) + 3 * sizeof (int32_t);
  }
  // Check at runtime, not only via DCHECK: in release builds a too-small caller buffer
  // would otherwise be overrun.
  if (output_preallocated && *output_length < length) {
    return Status("SnappyBlockCompressor::ProcessBlock: output length too small");
  }

  if (output_preallocated) {
    buffer_length_ = *output_length;
    out_buffer_ = *output;
  } else if (!reuse_buffer_ || out_buffer_ == nullptr || buffer_length_ < length) {
    DCHECK(memory_pool_ != nullptr) << "Can't allocate without passing in a mem pool";
    buffer_length_ = length;
    out_buffer_ = memory_pool_->Allocate(buffer_length_);
  }

  uint8_t* outp = out_buffer_;
  ReadWriteUtil::PutInt(outp, static_cast<uint32_t>(input_length));
  outp += sizeof (int32_t);
  for (int64_t block_size: block_sizes) {
    // TODO: should this be a while or a do-while loop? Check what Hadoop does.
    // Point at the spot to store the compressed size.
    uint8_t* sizep = outp;
    outp += sizeof (int32_t);
    size_t size;
    snappy::RawCompress(reinterpret_cast<const char*>(input),
        static_cast<size_t>(block_size), reinterpret_cast<char*>(outp), &size);

    ReadWriteUtil::PutInt(sizep, static_cast<uint32_t>(size));
    input += block_size;
    outp += size;
    DCHECK_LE(static_cast<int64_t>(outp - out_buffer_), length);
  }

  *output = out_buffer_;
  *output_length = outp - out_buffer_;
  return Status::OK();
}

// Creates a raw (unframed) snappy compressor. No snappy-specific setup is needed.
SnappyCompressor::SnappyCompressor(MemPool* mem_pool, bool reuse_buffer)
  : Codec(mem_pool, reuse_buffer) {}

// Worst-case raw snappy output size for 'input_len' bytes. 'input' is ignored.
int64_t SnappyCompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
  const size_t bound = snappy::MaxCompressedLength(static_cast<size_t>(input_len));
  return static_cast<int64_t>(bound);
}

// Compresses 'input' with raw snappy.
//
// If 'output_preallocated', the caller's buffer (capacity '*output_length') must be at
// least MaxOutputLen(input_length) bytes, otherwise an error is returned. If not
// preallocated, a buffer is taken from 'memory_pool_' unless the previous one can be
// reused. On success '*output_length' holds the compressed size.
Status SnappyCompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
    const uint8_t* input, int64_t* output_length, uint8_t** output) {
  DCHECK_GE(input_length, 0);
  const int64_t worst_case_len = MaxOutputLen(input_length);

  if (output_preallocated) {
    if (*output_length < worst_case_len) {
      return Status("SnappyCompressor::ProcessBlock: output length too small");
    }
  } else {
    const bool need_new_buffer = !reuse_buffer_ || buffer_length_ < worst_case_len;
    if (need_new_buffer) {
      DCHECK(memory_pool_ != nullptr) << "Can't allocate without passing in a mem pool";
      buffer_length_ = worst_case_len;
      out_buffer_ = memory_pool_->Allocate(buffer_length_);
    }
    *output = out_buffer_;
  }

  size_t compressed_len = 0;
  snappy::RawCompress(reinterpret_cast<const char*>(input),
      static_cast<size_t>(input_length), reinterpret_cast<char*>(*output),
      &compressed_len);
  *output_length = static_cast<int64_t>(compressed_len);
  return Status::OK();
}

uint32_t SnappyCompressor::ComputeChecksum(int64_t input_len, const uint8_t* input) {
  crc_32_type crc;
  crc.process_bytes(reinterpret_cast<const char*>(input), input_len);
  uint32_t chk = crc.checksum();
  // Snappy requires the checksum to be masked.
  return ((chk >> 15) | (chk << 17)) + 0xa282ead8;
}

// Creates a raw LZ4 compressor. No LZ4-specific setup is needed.
Lz4Compressor::Lz4Compressor(MemPool* mem_pool, bool reuse_buffer)
  : Codec(mem_pool, reuse_buffer) {}

// Worst-case LZ4 output size for 'input_len' bytes, or 0 if LZ4 cannot compress an input
// that large (the same convention LZ4_compressBound() uses). 'input' is ignored.
int64_t Lz4Compressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
  // LZ4_compressBound() takes an int. Lengths that do not fit would be truncated to an
  // unrelated value, so report them as unsupported instead.
  if (input_len > std::numeric_limits<int>::max()) return 0;
  return LZ4_compressBound(static_cast<int>(input_len));
}

// Compresses 'input' as a single raw LZ4 block into the caller-preallocated '*output'.
// On entry '*output_length' is the capacity of '*output'. On success it holds the number
// of compressed bytes (0 for empty input, for which nothing is written). Returns
// LZ4_COMPRESSION_INPUT_TOO_LARGE for inputs LZ4 cannot handle, and
// LZ4_COMPRESS_DEFAULT_FAILED if compression fails (e.g. the buffer is too small).
Status Lz4Compressor::ProcessBlock(bool output_preallocated, int64_t input_length,
    const uint8_t* input, int64_t* output_length, uint8_t** output) {
  DCHECK_GE(input_length, 0);
  CHECK(output_preallocated) << "Output was not allocated for Lz4 Codec";
  if (input_length == 0) {
    // Nothing is written. Report that, rather than leaving the buffer capacity in
    // '*output_length' where it would be read as the compressed size.
    *output_length = 0;
    return Status::OK();
  }
  // The LZ4 API takes int lengths. Reject lengths that do not fit (they would otherwise
  // be silently truncated) and lengths LZ4 reports as unsupported (bound == 0).
  if (input_length > std::numeric_limits<int>::max()
      || MaxOutputLen(input_length, input) == 0) {
    return Status(TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE, input_length);
  }
  // Clamp the capacity so that a buffer larger than INT_MAX is not truncated.
  const int capacity = static_cast<int>(
      std::min<int64_t>(*output_length, std::numeric_limits<int>::max()));
  const int compressed_len = LZ4_compress_default(reinterpret_cast<const char*>(input),
      reinterpret_cast<char*>(*output), static_cast<int>(input_length), capacity);
  // LZ4_compress_default() returns 0 on failure. Surface it instead of reporting a
  // successful zero-byte result.
  if (compressed_len <= 0) return Status(TErrorCode::LZ4_COMPRESS_DEFAULT_FAILED);
  *output_length = compressed_len;
  return Status::OK();
}

// Creates a zstd compressor. 'clevel' is stored and used as the compression level
// passed to ZSTD_compress().
ZstandardCompressor::ZstandardCompressor(MemPool* mem_pool, bool reuse_buffer, int clevel)
  : Codec(mem_pool, reuse_buffer),
    clevel_(clevel) {
}

// Worst-case single-shot zstd output size for 'input_len' bytes. 'input' is ignored.
int64_t ZstandardCompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
  const size_t bound = ZSTD_compressBound(static_cast<size_t>(input_len));
  return static_cast<int64_t>(bound);
}

// Compresses 'input' into the caller-preallocated '*output' with ZSTD_compress() at
// level 'clevel_'. On entry '*output_length' is the buffer capacity. On success it holds
// the compressed size (0 for empty input, for which nothing is written). '*output_length'
// is left untouched on failure.
Status ZstandardCompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
    const uint8_t* input, int64_t* output_length, uint8_t** output) {
  DCHECK_GE(input_length, 0);
  // A CHECK, like the Lz4 codec uses: without a preallocated buffer '*output' would be
  // written through unchecked in release builds.
  CHECK(output_preallocated) << "Output was not allocated for Zstd Codec";
  if (input_length == 0) {
    // Nothing is written. Do not leave the buffer capacity in '*output_length'.
    *output_length = 0;
    return Status::OK();
  }
  // Keep the raw size_t result: error codes are only meaningful as size_t values.
  const size_t ret = ZSTD_compress(*output, *output_length, input, input_length, clevel_);
  if (ZSTD_isError(ret)) {
    return Status(TErrorCode::ZSTD_ERROR, "ZSTD_compress",
        ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
  }
  *output_length = static_cast<int64_t>(ret);
  return Status::OK();
}

// Creates an LZ4 compressor that emits Hadoop-style block framing.
Lz4BlockCompressor::Lz4BlockCompressor(MemPool* mem_pool, bool reuse_buffer)
  : Codec(mem_pool, reuse_buffer) {}

// Worst-case framed output size for 'input_length' bytes. 'input' is ignored.
int64_t Lz4BlockCompressor::MaxOutputLen(int64_t input_length, const uint8_t* input) {
  // Hadoop uses a block compression scheme. For more details look at the comments for
  // the SnappyBlockCompressor implementation above.
  // If input_length == 0 then only the input_length will be stored in the compressed
  // block.
  if (input_length == 0) { return sizeof(int32_t); }

  // LZ4_compressBound() takes an int and returns 0 for sizes it cannot handle. Treat
  // lengths that do not fit in an int the same way, instead of letting them truncate
  // into a bound for a different, smaller input.
  const int lz4_bound = input_length > std::numeric_limits<int>::max()
      ? 0 : LZ4_compressBound(static_cast<int>(input_length));
  // The LZ4 bound plus two int32 fields for the uncompressed and compressed lengths.
  return lz4_bound + 2 * sizeof(int32_t);
}

// Compresses 'input' into the caller-preallocated '*output' using Hadoop's block
// framing: a 4-byte uncompressed length, then (for non-empty input) a 4-byte compressed
// length followed by one LZ4 block. '*output_length' must be at least MaxOutputLen(),
// otherwise this CHECK-fails. On success it is set to the number of bytes written.
Status Lz4BlockCompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
    const uint8_t* input, int64_t* output_length, uint8_t** output) {
  DCHECK_GE(input_length, 0);
  // The LZ4 API takes int lengths. A larger input would be truncated, and the header
  // would then claim more uncompressed bytes than were actually compressed.
  if (input_length > std::numeric_limits<int>::max()
      || LZ4_compressBound(static_cast<int>(input_length)) == 0) {
    return Status(TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE, input_length);
  }
  const int64_t length = MaxOutputLen(input_length, input);

  CHECK(output_preallocated && length <= *output_length)
    << " Output was not allocated for Lz4 Codec or is not sufficient."
    << " output_preallocated " << output_preallocated << " length: " << length
    << " output_length " << *output_length;

  uint8_t* outp = *output;
  ReadWriteUtil::PutInt(outp, static_cast<uint32_t>(input_length));
  outp += sizeof(int32_t);
  if (input_length > 0) {
    uint8_t* sizep = outp;
    outp += sizeof(int32_t);
    // Clamp the remaining capacity so a buffer larger than INT_MAX is not truncated.
    const int64_t remaining = *output_length - (outp - *output);
    const int capacity = static_cast<int>(
        std::min<int64_t>(remaining, std::numeric_limits<int>::max()));
    const int size = LZ4_compress_default(reinterpret_cast<const char*>(input),
        reinterpret_cast<char*>(outp), static_cast<int>(input_length), capacity);
    if (size <= 0) { return Status(TErrorCode::LZ4_COMPRESS_DEFAULT_FAILED); }
    ReadWriteUtil::PutInt(sizep, static_cast<uint32_t>(size));
    outp += size;
    DCHECK_LE(outp - *output, length);
  }

  *output_length = outp - *output;
  return Status::OK();
}
