/**
 * @file CompressContent.h
 * CompressContent class declaration
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef __COMPRESS_CONTENT_H__
#define __COMPRESS_CONTENT_H__

#include <cerrno>
#include <cinttypes>

#include "archive_entry.h"
#include "archive.h"

#include "FlowFileRecord.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
#include "core/Core.h"
#include "core/Resource.h"
#include "core/Property.h"
#include "core/logging/LoggerConfiguration.h"
#include "io/ZlibStream.h"

namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {

// String values for the compression format setting.
// The gzip / bzip2 / xz-lzma2 / lzma values are compared against the configured
// format by the write callbacks in this header to pick the libarchive filter.
// NOTE(review): COMPRESSION_FORMAT_ATTRIBUTE is not referenced in this header;
// presumably it asks the processor to derive the format from the flow file's
// mime.type attribute - confirm in the implementation file.
#define COMPRESSION_FORMAT_ATTRIBUTE "use mime.type attribute"
#define COMPRESSION_FORMAT_GZIP "gzip"
#define COMPRESSION_FORMAT_BZIP2 "bzip2"
#define COMPRESSION_FORMAT_XZ_LZMA2 "xz-lzma2"
#define COMPRESSION_FORMAT_LZMA "lzma"

// String values for the compression mode setting.
// The callbacks in this header only test for MODE_COMPRESS; every other value
// takes the decompression path.
#define MODE_COMPRESS "compress"
#define MODE_DECOMPRESS "decompress"

// CompressContent Class
class CompressContent: public core::Processor {
public:
  // Constructor
  /*!
   * Create a new processor.
   *
   * @param name processor name, forwarded to core::Processor
   * @param uuid processor identifier, forwarded to core::Processor
   *             (defaults to a default-constructed identifier)
   *
   * All scalar members are given a deterministic value here so that nothing is
   * read uninitialised if the processor is used before it has been scheduled.
   * NOTE(review): 0 for compressLevel_ is only a placeholder; the effective
   * level is presumably assigned in onSchedule() - confirm.
   */
  explicit CompressContent(std::string name, utils::Identifier uuid = utils::Identifier())
    : core::Processor(name, uuid)
    , logger_(logging::LoggerFactory<CompressContent>::getLogger())
    , compressLevel_(0)
    , updateFileName_(false)
    , encapsulateInTar_(false) {
  }
  // Destructor
  virtual ~CompressContent() = default;
  // Processor name
  static constexpr char const* ProcessorName = "CompressContent";
  // Supported properties. These are declarations only; the definitions are not
  // part of this header.
  // NOTE(review): each property presumably backs the similarly named private
  // member of this class (compressMode_, compressLevel_, ...) - confirm in the
  // implementation file.
  static core::Property CompressMode;
  static core::Property CompressLevel;
  static core::Property CompressFormat;
  static core::Property UpdateFileName;
  static core::Property EncapsulateInTar;

  // Supported relationships (declared here, defined outside this header).
  static core::Relationship Failure;
  static core::Relationship Success;

public:
  // Nest Callback Class for read stream from flow for compress
  class ReadCallbackCompress: public InputStreamCallback {
  public:
    ReadCallbackCompress(std::shared_ptr<core::FlowFile> &flow, struct archive *arch, struct archive_entry *entry) :
        flow_(flow), arch_(arch), entry_(entry), status_(0), logger_(logging::LoggerFactory<CompressContent>::getLogger()) {
    }
    ~ReadCallbackCompress() = default;
    int64_t process(std::shared_ptr<io::BaseStream> stream) {
      uint8_t buffer[4096U];
      int64_t ret = 0;
      uint64_t read_size = 0;

      ret = archive_write_header(arch_, entry_);
      if (ret != ARCHIVE_OK) {
        logger_->log_error("Compress Content archive error %s", archive_error_string(arch_));
        status_ = -1;
        return -1;
      }
      while (read_size < flow_->getSize()) {
        ret = stream->read(buffer, sizeof(buffer));
        if (ret < 0) {
          status_ = -1;
          return -1;
        }
        if (ret > 0) {
          ret = archive_write_data(arch_, buffer, gsl::narrow<size_t>(ret));
          if (ret < 0) {
            logger_->log_error("Compress Content archive error %s", archive_error_string(arch_));
            status_ = -1;
            return -1;
          }
          read_size += ret;
        } else {
          break;
        }
      }
      return read_size;
    }
    std::shared_ptr<core::FlowFile> flow_;
    struct archive *arch_;
    struct archive_entry *entry_;
    int status_;
    std::shared_ptr<logging::Logger> logger_;
  };
  /**
   * Nested input-stream callback used while decompressing.
   *
   * Every call to process() seeks to offset_, reads at most sizeof(buffer_)
   * bytes into buffer_, and advances offset_ by the number of bytes read, so
   * repeated calls walk through the flow file content chunk by chunk.
   */
  class ReadCallbackDecompress: public InputStreamCallback {
  public:
    // Records the flow file and its offset as reported at construction time.
    ReadCallbackDecompress(std::shared_ptr<core::FlowFile> &flow) :
        read_size_(0), offset_(0), origin_offset_(flow->getOffset()), flow_(flow) {
    }
    ~ReadCallbackDecompress() = default;

    /**
     * Reads the next chunk of the stream into buffer_.
     * @param stream input stream of the flow file content
     * @return the value returned by the stream read; the same value is stored
     *         in read_size_
     */
    int64_t process(std::shared_ptr<io::BaseStream> stream) {
      read_size_ = 0;
      stream->seek(offset_);

      const int chunk_len = stream->read(buffer_, sizeof(buffer_));
      read_size_ = chunk_len;
      if (chunk_len <= 0) {
        // Nothing was read (end of data or error): leave the offset untouched.
        return chunk_len;
      }
      offset_ += read_size_;
      return chunk_len;
    }

    int64_t read_size_;       // result of the most recent read
    uint8_t buffer_[8192];    // data of the most recent read
    uint64_t offset_;         // position the next read starts from
    uint64_t origin_offset_;  // flow file offset captured by the constructor
    std::shared_ptr<core::FlowFile> flow_;
  };
  // Nest Callback Class for write stream
  class WriteCallback: public OutputStreamCallback {
  public:
    WriteCallback(std::string &compress_mode, int compress_level, std::string &compress_format,
        std::shared_ptr<core::FlowFile> &flow, const std::shared_ptr<core::ProcessSession> &session) :
        compress_mode_(compress_mode), compress_level_(compress_level), compress_format_(compress_format),
        flow_(flow), session_(session),
        logger_(logging::LoggerFactory<CompressContent>::getLogger()),
        readDecompressCb_(flow) {
      size_ = 0;
      stream_ = nullptr;
      status_ = 0;
    }
    ~WriteCallback() = default;

    std::string compress_mode_;
    int compress_level_;
    std::string compress_format_;
    std::shared_ptr<core::FlowFile> flow_;
    std::shared_ptr<core::ProcessSession> session_;
    std::shared_ptr<io::BaseStream> stream_;
    int64_t size_;
    std::shared_ptr<logging::Logger> logger_;
    CompressContent::ReadCallbackDecompress readDecompressCb_;
    int status_;

    static la_ssize_t archive_write(struct archive *arch, void *context, const void *buff, size_t size) {
      WriteCallback *callback = (WriteCallback *) context;
      la_ssize_t ret = callback->stream_->write(reinterpret_cast<uint8_t*>(const_cast<void*>(buff)), size);
      if (ret > 0)
        callback->size_ += (int64_t) ret;
      return ret;
    }

    static la_ssize_t archive_read(struct archive *arch, void *context, const void **buff) {
      WriteCallback *callback = (WriteCallback *) context;
      callback->session_->read(callback->flow_, &callback->readDecompressCb_);
      if (callback->readDecompressCb_.read_size_ >= 0) {
        *buff = callback->readDecompressCb_.buffer_;
        return gsl::narrow<la_ssize_t>(callback->readDecompressCb_.read_size_);
      } else {
        archive_set_error(arch, EIO, "Error reading flowfile");
        return -1;
      }
    }

    static la_int64_t archive_skip(struct archive *a, void *client_data, la_int64_t request) {
      return 0;
    }

    void archive_write_log_error_cleanup(struct archive *arch) {
      logger_->log_error("Compress Content archive write error %s", archive_error_string(arch));
      status_ = -1;
      archive_write_free(arch);
    }

    void archive_read_log_error_cleanup(struct archive *arch) {
      logger_->log_error("Compress Content archive read error %s", archive_error_string(arch));
      status_ = -1;
      archive_read_free(arch);
    }

    int64_t process(std::shared_ptr<io::BaseStream> stream) {
      struct archive *arch;
      int r;

      if (compress_mode_ == MODE_COMPRESS) {
        arch = archive_write_new();
        if (!arch) {
          status_ = -1;
          return -1;
        }
        r = archive_write_set_format_ustar(arch);
        if (r != ARCHIVE_OK) {
          archive_write_log_error_cleanup(arch);
          return -1;
        }
        if (compress_format_ == COMPRESSION_FORMAT_GZIP) {
          r = archive_write_add_filter_gzip(arch);
          if (r != ARCHIVE_OK) {
            archive_write_log_error_cleanup(arch);
            return -1;
          }
          std::string option;
          option = "gzip:compression-level=" + std::to_string(compress_level_);
          r = archive_write_set_options(arch, option.c_str());
          if (r != ARCHIVE_OK) {
            archive_write_log_error_cleanup(arch);
            return -1;
          }
        } else if (compress_format_ == COMPRESSION_FORMAT_BZIP2) {
          r = archive_write_add_filter_bzip2(arch);
          if (r != ARCHIVE_OK) {
            archive_write_log_error_cleanup(arch);
            return -1;
          }
        } else if (compress_format_ == COMPRESSION_FORMAT_LZMA) {
          r = archive_write_add_filter_lzma(arch);
          if (r != ARCHIVE_OK) {
            archive_write_log_error_cleanup(arch);
            return -1;
          }
        } else if (compress_format_ == COMPRESSION_FORMAT_XZ_LZMA2) {
          r = archive_write_add_filter_xz(arch);
          if (r != ARCHIVE_OK) {
            archive_write_log_error_cleanup(arch);
            return -1;
          }
        } else {
            archive_write_log_error_cleanup(arch);
            return -1;
        }
        r = archive_write_set_bytes_per_block(arch, 0);
        if (r != ARCHIVE_OK) {
          archive_write_log_error_cleanup(arch);
          return -1;
        }
        this->stream_ = stream;
        r = archive_write_open(arch, this, NULL, archive_write, NULL);
        if (r != ARCHIVE_OK) {
          archive_write_log_error_cleanup(arch);
          return -1;
        }
        struct archive_entry *entry = archive_entry_new();
        if (!entry) {
          archive_write_log_error_cleanup(arch);
          return -1;
        }
        std::string fileName;
        flow_->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
        archive_entry_set_pathname(entry, fileName.c_str());
        archive_entry_set_size(entry, flow_->getSize());
        archive_entry_set_mode(entry, S_IFREG | 0755);
        ReadCallbackCompress readCb(flow_, arch, entry);
        session_->read(flow_, &readCb);
        if (readCb.status_ < 0) {
          archive_entry_free(entry);
          archive_write_log_error_cleanup(arch);
          status_ = -1;
          return -1;
        }
        archive_entry_free(entry);
        archive_write_close(arch);
        archive_write_free(arch);
        return size_;
      } else {
        arch = archive_read_new();
        if (!arch) {
          status_ = -1;
          return -1;
        }
        r = archive_read_support_format_all(arch);
        if (r != ARCHIVE_OK) {
          archive_read_log_error_cleanup(arch);
          return -1;
        }
        r = archive_read_support_filter_all(arch);
        if (r != ARCHIVE_OK) {
          archive_read_log_error_cleanup(arch);
          return -1;
        }
        this->stream_ = stream;
        r = archive_read_open2(arch, this, NULL, archive_read, archive_skip, NULL);
        if (r != ARCHIVE_OK) {
          archive_read_log_error_cleanup(arch);
          return -1;
        }
        struct archive_entry *entry;
        if (archive_read_next_header(arch, &entry) != ARCHIVE_OK) {
          archive_read_log_error_cleanup(arch);
          return -1;
        }
        int64_t entry_size = archive_entry_size(entry);
        logger_->log_debug("Decompress Content archive entry size %" PRId64, entry_size);
        size_ = 0;
        while (size_ < entry_size) {
          char buffer[8192];
          int ret = archive_read_data(arch, buffer, sizeof(buffer));
          if (ret < 0) {
            archive_read_log_error_cleanup(arch);
            return -1;
          }
          if (ret == 0)
            break;
          size_ += ret;
          ret = stream_->write(reinterpret_cast<uint8_t*>(buffer), ret);
          if (ret < 0) {
            archive_read_log_error_cleanup(arch);
            return -1;
          }
        }
        archive_read_close(arch);
        archive_read_free(arch);
        return size_;
      }
    }
  };

  /**
   * Output-stream callback that gzip-compresses or gzip-decompresses the flow
   * file content through a zlib filter stream layered on the output stream.
   * After process() has run, success_ tells whether the filter stream reported
   * that it had finished.
   */
  class GzipWriteCallback : public OutputStreamCallback {
   public:
    GzipWriteCallback(std::string compress_mode, int compress_level, std::shared_ptr<core::FlowFile> flow, std::shared_ptr<core::ProcessSession> session)
      : logger_(logging::LoggerFactory<CompressContent>::getLogger()),
        compress_mode_(std::move(compress_mode)),
        compress_level_(compress_level),
        flow_(std::move(flow)),
        session_(std::move(session)) {
    }

    std::shared_ptr<logging::Logger> logger_;
    std::string compress_mode_;
    int compress_level_;
    std::shared_ptr<core::FlowFile> flow_;
    std::shared_ptr<core::ProcessSession> session_;
    bool success_{false};

    /**
     * Pipes the flow file content through the zlib filter into outputStream.
     * @param outputStream stream receiving the filtered data
     * @return the size of the flow file as reported by flow_
     */
    int64_t process(std::shared_ptr<io::BaseStream> outputStream) override {
      // Feeds the flow file content, 16 KiB at a time, into the filter stream
      // and closes the filter once the content has been copied.
      class FilterFeeder : public InputStreamCallback {
       public:
        FilterFeeder(GzipWriteCallback& owner, std::shared_ptr<io::OutputStream> sink)
          : owner_(owner), sink_(std::move(sink)) {
        }

        int64_t process(std::shared_ptr<io::BaseStream> inputStream) override {
          std::vector<uint8_t> chunk(16 * 1024U);
          int64_t copied = 0;
          for (;;) {
            if (copied >= gsl::narrow<int64_t>(owner_.flow_->getSize())) {
              break;
            }
            const int got = inputStream->read(chunk.data(), chunk.size());
            if (got < 0) {
              return -1;
            }
            if (got == 0) {
              break;
            }
            // A short or failed write aborts without closing the filter.
            if (sink_->write(chunk.data(), got) != got) {
              return -1;
            }
            copied += got;
          }
          sink_->close();
          return copied;
        }

        GzipWriteCallback& owner_;
        std::shared_ptr<io::OutputStream> sink_;
      };

      std::shared_ptr<io::ZlibBaseStream> zlibStream;
      if (compress_mode_ != MODE_COMPRESS) {
        zlibStream = std::make_shared<io::ZlibDecompressStream>(gsl::make_not_null(outputStream.get()), io::ZlibCompressionFormat::GZIP);
      } else {
        zlibStream = std::make_shared<io::ZlibCompressStream>(gsl::make_not_null(outputStream.get()), io::ZlibCompressionFormat::GZIP, compress_level_);
      }

      FilterFeeder feeder(*this, zlibStream);
      session_->read(flow_, &feeder);

      success_ = zlibStream->isFinished();

      return flow_->getSize();
    }
  };

public:
  /**
   * Function that's executed when the processor is scheduled.
   * @param context process context.
   * @param sessionFactory process session factory that is used when creating
   * ProcessSession objects.
   */
  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
  // onTrigger overload taking raw pointers: deliberately empty in this header,
  // so calling it does nothing.
  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
  }
  // onTrigger overload taking shared pointers: declared here, defined outside
  // this header.
  // NOTE(review): presumably this is where flow files are compressed or
  // decompressed using the callbacks above - confirm in the implementation.
  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
  // initialize, overridden by CompressContent; defined outside this header.
  virtual void initialize(void);

protected:

private:
  // Logger obtained from LoggerFactory<CompressContent> in the constructor.
  std::shared_ptr<logging::Logger> logger_;
  // Processor configuration.
  // NOTE(review): presumably filled in from the CompressLevel / CompressMode /
  // CompressFormat / UpdateFileName / EncapsulateInTar properties when the
  // processor is scheduled - confirm in the implementation file.
  int compressLevel_;
  std::string compressMode_;
  std::string compressFormat_;
  bool updateFileName_;
  bool encapsulateInTar_;
  // Lookup tables; not populated anywhere in this header.
  // NOTE(review): judging by the names, these map a compression format to its
  // mime type and to a file extension - confirm key/value direction.
  std::map<std::string, std::string> compressionFormatMimeTypeMap_;
  std::map<std::string, std::string> fileExtension_;
};

REGISTER_RESOURCE(CompressContent, "Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type attribute as appropriate");

} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */

#endif
