/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_
#define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1

#include <thrift/transport/TTransport.h>
#include <thrift/Thrift.h>
#include <thrift/TProcessor.h>

#include <cstring>
#include <string>
#include <stdio.h>

#include <boost/atomic.hpp>
#include <thrift/stdcxx.h>

#include <thrift/concurrency/Mutex.h>
#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/concurrency/Thread.h>

namespace apache {
namespace thrift {
namespace transport {

using apache::thrift::TProcessor;
using apache::thrift::protocol::TProtocolFactory;
using apache::thrift::concurrency::Mutex;
using apache::thrift::concurrency::Monitor;

// Data pertaining to a single event.
//
// eventInfo owns eventBuff_: the destructor releases it with delete[], so
// any buffer stored there must come from new[] (or be NULL). Copying is
// disabled because a memberwise copy would share eventBuff_ and release it
// twice.
typedef struct eventInfo {
  uint8_t* eventBuff_;     // owned event bytes (new[]-allocated or NULL)
  uint32_t eventSize_;     // event size; presumably the byte count of eventBuff_
  uint32_t eventBuffPos_;  // position within eventBuff_ (by name; confirm in the .cpp)

  eventInfo() : eventBuff_(NULL), eventSize_(0), eventBuffPos_(0) {}

  ~eventInfo() {
    // delete[] on a NULL pointer is a no-op, so no explicit check is needed.
    delete[] eventBuff_;
  }

private:
  // Non-copyable (declared but never defined; pre-C++11 idiom).
  eventInfo(const eventInfo&);
  eventInfo& operator=(const eventInfo&);
} eventInfo;

// Information about the current read state.
//
// readState owns event_ (the event currently being assembled): it is
// deleted by resetAllValues() and by the destructor. Copying is disabled
// because a memberwise copy would share event_ and delete it twice.
typedef struct readState {
  eventInfo* event_;

  // keep track of event size: raw size bytes collected so far
  uint8_t eventSizeBuff_[4];
  uint8_t eventSizeBuffPos_;
  bool readingSize_;

  // read buffer variables
  int32_t bufferPtr_;
  int32_t bufferLen_;

  // last successful dispatch point
  int32_t lastDispatchPtr_;

  // Switch back to reading an event size, recording lastDispatchPtr as the
  // last successful dispatch point.
  void resetState(uint32_t lastDispatchPtr) {
    readingSize_ = true;
    eventSizeBuffPos_ = 0;
    // The stored field is signed; make the (pre-existing) conversion explicit.
    lastDispatchPtr_ = static_cast<int32_t>(lastDispatchPtr);
  }

  // Reset all state, including buffer positions, and free any partially
  // assembled event.
  void resetAllValues() {
    resetState(0);
    bufferPtr_ = 0;
    bufferLen_ = 0;
    delete event_; // deleting a null pointer is a no-op
    event_ = 0;
  }

  // Interpret eventSizeBuff_ as a uint32_t in host byte order.
  // memcpy is used instead of casting the uint8_t array to uint32_t*,
  // which violates strict aliasing (undefined behavior) and could also
  // perform a misaligned load.
  inline uint32_t getEventSize() {
    uint32_t eventSize;
    std::memcpy(&eventSize, eventSizeBuff_, sizeof(eventSize));
    return eventSize;
  }

  readState() {
    event_ = 0;
    resetAllValues();
  }

  ~readState() { delete event_; }

private:
  // Non-copyable (declared but never defined; pre-C++11 idiom).
  readState(const readState&);
  readState& operator=(const readState&);
} readState;

/**
 * TFileTransportBuffer - buffer class used by TFileTransport for queueing up events
 * to be written to disk.  Should be used in the following way:
 *  1) Buffer created
 *  2) Buffer written to (addEvent)
 *  3) Buffer read from (getNext)
 *  4) Buffer reset (reset)
 *  5) Go back to 2, or destroy buffer
 *
 * The buffer should never be written to after it is read from, unless it is reset first.
 * Note: The above rules are enforced mainly for debugging its sole client TFileTransport
 *       which uses the buffer in this way.
 *
 * The class holds a raw eventInfo** array (buffer_) and declares its own
 * destructor, so it is made non-copyable: a memberwise copy would share
 * buffer_ between two objects.
 */
class TFileTransportBuffer {
public:
  // size: number of event slots in the buffer
  TFileTransportBuffer(uint32_t size);
  ~TFileTransportBuffer();

  bool addEvent(eventInfo* event);
  eventInfo* getNext();
  void reset();
  bool isFull();
  bool isEmpty();

private:
  TFileTransportBuffer(); // should not be used

  // Non-copyable (declared but never defined; pre-C++11 idiom).
  // NOTE(review): the destructor is defined out of line; presumably it
  // releases buffer_ (and possibly the queued events) — confirm in the .cpp.
  TFileTransportBuffer(const TFileTransportBuffer&);
  TFileTransportBuffer& operator=(const TFileTransportBuffer&);

  enum mode { WRITE, READ };
  mode bufferMode_;

  uint32_t writePoint_;
  uint32_t readPoint_;
  uint32_t size_;
  eventInfo** buffer_;
};

/**
 * Abstract interface for transports used to read files.
 *
 * Besides the TTransport read API, implementations expose a read timeout
 * and chunk-based navigation of the underlying file.
 */
class TFileReaderTransport : virtual public TTransport {
public:
  // Read timeout accessors. NOTE(review): the unit and the meaning of
  // negative/zero values are implementation-defined — confirm per implementation.
  virtual int32_t getReadTimeout() = 0;
  virtual void setReadTimeout(int32_t readTimeout) = 0;

  // Number of chunks in the file and the index of the chunk currently being read.
  virtual uint32_t getNumChunks() = 0;
  virtual uint32_t getCurChunk() = 0;
  // Reposition reading at the given chunk. NOTE(review): the parameter is
  // signed while the chunk getters are unsigned — confirm how out-of-range
  // or negative values are handled by implementations.
  virtual void seekToChunk(int32_t chunk) = 0;
  // Reposition reading at the end of the file.
  virtual void seekToEnd() = 0;
};

/**
 * Abstract interface for transports used to write files.
 *
 * Besides the TTransport write API, implementations expose the chunk size
 * used when writing the file.
 */
class TFileWriterTransport : virtual public TTransport {
public:
  // Chunk size accessors. NOTE(review): the unit is presumably bytes — confirm
  // per implementation.
  virtual uint32_t getChunkSize() = 0;
  virtual void setChunkSize(uint32_t chunkSize) = 0;
};

/**
 * File implementation of a transport. Reads and writes are done to a
 * file on disk.
 *
 * Writes are queued into in-memory event buffers (enqueueBuffer_ /
 * dequeueBuffer_) that a background writer thread swaps and drains to the
 * file. The file is split into chunks of chunkSize_.
 *
 * Size-like setters (read buffer size, chunk size, event buffer size,
 * flush limits, EOF sleep time) ignore a value of 0 and keep the current
 * setting.
 */
class TFileTransport : public TFileReaderTransport, public TFileWriterTransport {
public:
  TFileTransport(std::string path, bool readOnly = false);
  ~TFileTransport();

  // TODO: what is the correct behaviour for this?
  // the log file is generally always open
  bool isOpen() { return true; }

  void write(const uint8_t* buf, uint32_t len);
  void flush();

  uint32_t readAll(uint8_t* buf, uint32_t len);
  uint32_t read(uint8_t* buf, uint32_t len);
  bool peek();

  // log-file specific functions
  void seekToChunk(int32_t chunk);
  void seekToEnd();
  uint32_t getNumChunks();
  uint32_t getCurChunk();

  // for changing the output file
  void resetOutputFile(int fd, std::string filename, off_t offset);

  // Setter/Getter functions for user-controllable options
  void setReadBuffSize(uint32_t readBuffSize) {
    if (readBuffSize) {
      readBuffSize_ = readBuffSize;
    }
  }
  uint32_t getReadBuffSize() { return readBuffSize_; }

  static const int32_t TAIL_READ_TIMEOUT = -1;
  static const int32_t NO_TAIL_READ_TIMEOUT = 0;
  void setReadTimeout(int32_t readTimeout) { readTimeout_ = readTimeout; }
  int32_t getReadTimeout() { return readTimeout_; }

  void setChunkSize(uint32_t chunkSize) {
    if (chunkSize) {
      chunkSize_ = chunkSize;
    }
  }
  uint32_t getChunkSize() { return chunkSize_; }

  // Sets the number of events each event buffer can hold. Only effective
  // before the writer thread and buffers are initialized. A size of 0 is
  // ignored (like the other size setters): a zero-capacity buffer could
  // never accept an event.
  void setEventBufferSize(uint32_t bufferSize) {
    if (bufferAndThreadInitialized_) {
      GlobalOutput("Cannot change the buffer size after writer thread started");
      return;
    }
    if (bufferSize) {
      eventBufferSize_ = bufferSize;
    }
  }

  uint32_t getEventBufferSize() { return eventBufferSize_; }

  void setFlushMaxUs(uint32_t flushMaxUs) {
    if (flushMaxUs) {
      flushMaxUs_ = flushMaxUs;
    }
  }
  uint32_t getFlushMaxUs() { return flushMaxUs_; }

  void setFlushMaxBytes(uint32_t flushMaxBytes) {
    if (flushMaxBytes) {
      flushMaxBytes_ = flushMaxBytes;
    }
  }
  uint32_t getFlushMaxBytes() { return flushMaxBytes_; }

  void setMaxEventSize(uint32_t maxEventSize) { maxEventSize_ = maxEventSize; }
  uint32_t getMaxEventSize() { return maxEventSize_; }

  void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) {
    maxCorruptedEvents_ = maxCorruptedEvents;
  }
  uint32_t getMaxCorruptedEvents() { return maxCorruptedEvents_; }

  void setEofSleepTimeUs(uint32_t eofSleepTime) {
    if (eofSleepTime) {
      eofSleepTime_ = eofSleepTime;
    }
  }
  uint32_t getEofSleepTimeUs() { return eofSleepTime_; }

  /*
   * Override TTransport *_virt() functions to invoke our implementations.
   * We cannot use TVirtualTransport to provide these, since we need to inherit
   * virtually from TTransport.
   */
  virtual uint32_t read_virt(uint8_t* buf, uint32_t len) { return this->read(buf, len); }
  virtual uint32_t readAll_virt(uint8_t* buf, uint32_t len) { return this->readAll(buf, len); }
  virtual void write_virt(const uint8_t* buf, uint32_t len) { this->write(buf, len); }

private:
  // helper functions for writing to a file
  void enqueueEvent(const uint8_t* buf, uint32_t eventLen);
  bool swapEventBuffers(struct timeval* deadline);
  bool initBufferAndWriteThread();

  // control for writer thread
  static void* startWriterThread(void* ptr) {
    static_cast<TFileTransport*>(ptr)->writerThread();
    return NULL;
  }
  void writerThread();

  // helper functions for reading from a file
  eventInfo* readEvent();

  // event corruption-related functions
  bool isEventCorrupted();
  void performRecovery();

  // Utility functions
  void openLogFile();
  void getNextFlushTime(struct timeval* ts_next_flush);

  // Class variables
  readState readState_;
  uint8_t* readBuff_;
  eventInfo* currentEvent_;

  uint32_t readBuffSize_;
  static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024;

  int32_t readTimeout_;
  static const int32_t DEFAULT_READ_TIMEOUT_MS = 200;

  // size of chunks that file will be split up into
  uint32_t chunkSize_;
  static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;

  // size of event buffers (number of events each buffer can hold)
  uint32_t eventBufferSize_;
  static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000;

  // max number of microseconds that can pass without flushing
  uint32_t flushMaxUs_;
  static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000;

  // max number of bytes that can be written without flushing
  uint32_t flushMaxBytes_;
  static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024;

  // max event size
  uint32_t maxEventSize_;
  static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0;

  // max number of corrupted events per chunk
  uint32_t maxCorruptedEvents_;
  static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0;

  // sleep duration when EOF is hit
  uint32_t eofSleepTime_;
  static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;

  // sleep duration when a corrupted event is encountered
  uint32_t corruptedEventSleepTime_;
  static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000;

  // sleep duration in microseconds when an IO error is encountered in the writer thread
  uint32_t writerThreadIOErrorSleepTime_;
  static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000;

  // writer thread
  apache::thrift::concurrency::PlatformThreadFactory threadFactory_;
  stdcxx::shared_ptr<apache::thrift::concurrency::Thread> writerThread_;

  // buffers to hold data before it is flushed. Each element of the buffer stores a msg that
  // needs to be written to the file.  The buffers are swapped by the writer thread.
  TFileTransportBuffer* dequeueBuffer_;
  TFileTransportBuffer* enqueueBuffer_;

  // conditions used to block when the buffer is full or empty
  Monitor notFull_, notEmpty_;
  boost::atomic<bool> closing_;

  // To keep track of whether the buffer has been flushed
  Monitor flushed_;
  boost::atomic<bool> forceFlush_;

  // Mutex that is grabbed when enqueueing and swapping the read/write buffers
  Mutex mutex_;

  // File information
  std::string filename_;
  int fd_;

  // Whether the writer thread and buffers have been initialized
  bool bufferAndThreadInitialized_;

  // Offset within the file
  off_t offset_;

  // event corruption information
  uint32_t lastBadChunk_;
  uint32_t numCorruptedEventsInChunk_;

  bool readOnly_;
};

// Exception thrown when EOF is hit
class TEOFException : public TTransportException {
public:
  TEOFException() : TTransportException(TTransportException::END_OF_FILE){};
};

// wrapper class to process events from a file containing thrift events
// read through a TFileReaderTransport and dispatched to a TProcessor.
class TFileProcessor {
public:
  /**
   * Constructor that defaults output transport to null transport
   *
   * @param processor processes log-file events
   * @param protocolFactory protocol factory
   * @param inputTransport file transport
   */
  TFileProcessor(stdcxx::shared_ptr<TProcessor> processor,
                 stdcxx::shared_ptr<TProtocolFactory> protocolFactory,
                 stdcxx::shared_ptr<TFileReaderTransport> inputTransport);

  /**
   * Constructor taking separate protocol factories for input and output.
   * NOTE(review): the output transport is presumably defaulted as in the
   * single-factory constructor — confirm in the .cpp.
   *
   * @param processor processes log-file events
   * @param inputProtocolFactory protocol factory for the input side
   * @param outputProtocolFactory protocol factory for the output side
   * @param inputTransport file transport
   */
  TFileProcessor(stdcxx::shared_ptr<TProcessor> processor,
                 stdcxx::shared_ptr<TProtocolFactory> inputProtocolFactory,
                 stdcxx::shared_ptr<TProtocolFactory> outputProtocolFactory,
                 stdcxx::shared_ptr<TFileReaderTransport> inputTransport);

  /**
   * Constructor
   *
   * @param processor processes log-file events
   * @param protocolFactory protocol factory
   * @param inputTransport input file transport
   * @param outputTransport output transport
   */
  TFileProcessor(stdcxx::shared_ptr<TProcessor> processor,
                 stdcxx::shared_ptr<TProtocolFactory> protocolFactory,
                 stdcxx::shared_ptr<TFileReaderTransport> inputTransport,
                 stdcxx::shared_ptr<TTransport> outputTransport);

  /**
   * processes events from the file
   *
   * @param numEvents number of events to process (0 for unlimited)
   * @param tail tails the file if true
   */
  void process(uint32_t numEvents, bool tail);

  /**
   * process events until the end of the chunk
   *
   */
  void processChunk();

private:
  // Shared ownership of the collaborators passed to the constructors.
  stdcxx::shared_ptr<TProcessor> processor_;
  stdcxx::shared_ptr<TProtocolFactory> inputProtocolFactory_;
  stdcxx::shared_ptr<TProtocolFactory> outputProtocolFactory_;
  stdcxx::shared_ptr<TFileReaderTransport> inputTransport_;
  stdcxx::shared_ptr<TTransport> outputTransport_;
};
}
}
} // apache::thrift::transport

#endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_

