/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
#ifndef LIB_SHARED_BUFFER_H_
#define LIB_SHARED_BUFFER_H_

#include <boost/asio.hpp>

#include <algorithm>
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <vector>

namespace pulsar {
namespace detail {

/**
 * Byte storage used by SharedBuffer for buffers it allocates itself.
 *
 * A std::vector<char> with int-based sizing helpers and raw pointer access.
 */
class SharedBufferInternal : public std::vector<char> {
   public:
    /**
     * Create storage holding `size` value-initialized bytes.
     */
    SharedBufferInternal(int size) : std::vector<char>(size) {}

    /**
     * Return a pointer to the first byte of the storage.
     *
     * data() is used instead of &(*this)[0] because operator[] must not be
     * called on an empty vector, while data() is valid for any size.
     */
    inline char* ptr() { return std::vector<char>::data(); }

    /**
     * Resize the storage to `newSize` bytes (hides std::vector<char>::resize).
     */
    void resize(int newSize) { std::vector<char>::resize(newSize); }

    /**
     * Return the capacity of the underlying vector, narrowed to int.
     */
    int capacity() const { return static_cast<int>(std::vector<char>::capacity()); }
};
}  // namespace detail

class SharedBuffer {
   public:
    explicit SharedBuffer() : data_(), ptr_(0), readIdx_(0), writeIdx_(0), capacity_(0) {}

    /**
     * Allocate a buffer of given size
     */
    static SharedBuffer allocate(const uint32_t size) { return SharedBuffer(size); }

    /**
     * Create a buffer with a copy of memory pointed by ptr
     */
    static SharedBuffer copy(const char* ptr, uint32_t size) {
        SharedBuffer buf = allocate(size);
        buf.write(ptr, size);
        return buf;
    }

    static SharedBuffer copyFrom(const SharedBuffer& other, uint32_t capacity) {
        assert(other.readableBytes() <= capacity);
        SharedBuffer buf = allocate(capacity);
        buf.write(other.data(), other.readableBytes());
        return buf;
    }

    /**
     * Create a buffer that wraps the passed pointer, without copying the memory
     */
    static SharedBuffer wrap(char* ptr, size_t size) { return SharedBuffer(ptr, size); }

    inline const char* data() const { return ptr_ + readIdx_; }

    inline char* mutableData() { return ptr_ + writeIdx_; }

    /**
     * Return a shared buffer that include a portion of current buffer. No memory is copied
     */
    SharedBuffer slice(uint32_t offset) {
        SharedBuffer buf(*this);
        buf.consume(offset);
        return buf;
    }

    SharedBuffer slice(uint32_t offset, uint32_t length) {
        SharedBuffer buf(*this);
        buf.consume(offset);
        assert(buf.readableBytes() >= length);
        buf.writeIdx_ = buf.readIdx_ + length;
        return buf;
    }

    uint32_t readUnsignedInt() {
        assert(readableBytes() >= sizeof(uint32_t));
        uint32_t value = ntohl(*(uint32_t*)data());
        consume(sizeof(uint32_t));
        return value;
    }

    uint16_t readUnsignedShort() {
        assert(readableBytes() >= sizeof(uint16_t));
        uint16_t value = ntohs(*(uint16_t*)data());
        consume(sizeof(uint16_t));
        return value;
    }

    void writeUnsignedInt(uint32_t value) {
        assert(writableBytes() >= sizeof(uint32_t));
        *(uint32_t*)(mutableData()) = htonl(value);
        bytesWritten(sizeof(value));
    }

    void writeUnsignedShort(uint16_t value) {
        assert(writableBytes() >= sizeof(uint16_t));
        *(uint16_t*)(mutableData()) = htons(value);
        bytesWritten(sizeof(value));
    }

    inline uint32_t readableBytes() const { return writeIdx_ - readIdx_; }

    inline uint32_t writableBytes() const { return capacity_ - writeIdx_; }

    inline bool readable() const { return readableBytes() > 0; }

    inline bool writable() const { return writableBytes() > 0; }

    boost::asio::const_buffers_1 const_asio_buffer() const {
        return boost::asio::const_buffers_1(ptr_ + readIdx_, readableBytes());
    }

    boost::asio::mutable_buffers_1 asio_buffer() {
        assert(data_);
        return boost::asio::buffer(ptr_ + writeIdx_, writableBytes());
    }

    void write(const char* data, uint32_t size) {
        assert(size <= writableBytes());

        std::copy(data, data + size, mutableData());
        bytesWritten(size);
    }

    // Mark that some bytes were written into the buffer
    inline void bytesWritten(uint32_t size) {
        assert(size <= writableBytes());
        writeIdx_ += size;
    }

    // Return current writer index
    uint32_t writerIndex() { return writeIdx_; }

    // skip writerIndex
    void skipBytes(uint32_t size) {
        assert(writeIdx_ + size <= capacity_);
        writeIdx_ += size;
    }

    // set writerIndex
    void setWriterIndex(uint32_t index) {
        assert(index <= capacity_);
        writeIdx_ = index;
    }

    // Return current reader index
    uint32_t readerIndex() { return readIdx_; }

    // set readerIndex
    void setReaderIndex(uint32_t index) {
        assert(index <= capacity_);
        readIdx_ = index;
    }

    inline void consume(uint32_t size) {
        assert(size <= readableBytes());
        readIdx_ += size;
    }

    inline void rollback(uint32_t size) {
        assert(size <= readIdx_);
        readIdx_ -= size;
    }

    inline void reset() {
        readIdx_ = 0;
        writeIdx_ = 0;
    }

   private:
    typedef std::shared_ptr<detail::SharedBufferInternal> BufferPtr;

    BufferPtr data_;
    char* ptr_;
    uint32_t readIdx_;
    uint32_t writeIdx_;
    uint32_t capacity_;

    SharedBuffer(char* ptr, size_t size)
        : data_(), ptr_(ptr), readIdx_(0), writeIdx_(size), capacity_(size) {}

    explicit SharedBuffer(size_t size)
        : data_(std::make_shared<detail::SharedBufferInternal>(size)),
          ptr_(data_->ptr()),
          readIdx_(0),
          writeIdx_(0),
          capacity_(size) {}
};

template <int Size>
class CompositeSharedBuffer {
   public:
    void set(int idx, const SharedBuffer& buffer) {
        sharedBuffers_[idx] = buffer;
        asioBuffers_[idx] = buffer.const_asio_buffer();
    }

    // Implement the ConstBufferSequence requirements.
    typedef boost::asio::const_buffer value_type;
    typedef boost::asio::const_buffer* iterator;
    typedef const boost::asio::const_buffer* const_iterator;

    const boost::asio::const_buffer* begin() const { return &(asioBuffers_.at(0)); }

    const boost::asio::const_buffer* end() const { return begin() + Size; }

   private:
    std::array<SharedBuffer, Size> sharedBuffers_;
    std::array<boost::asio::const_buffer, Size> asioBuffers_;
};

// Composite of exactly two SharedBuffers.
using PairSharedBuffer = CompositeSharedBuffer<2>;
}  // namespace pulsar

#endif /* LIB_SHARED_BUFFER_H_ */
