/**
 * Copyright 2016 Yahoo Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef LIB_PRODUCERIMPL_H_
#define LIB_PRODUCERIMPL_H_

#include <boost/thread/mutex.hpp>
#include <boost/date_time/posix_time/ptime.hpp>



#include "ClientImpl.h"
#include "BlockingQueue.h"
#include "HandlerBase.h"
#include "SharedBuffer.h"
#include "CompressionCodec.h"

using namespace pulsar;

namespace pulsar {

class BatchMessageContainer;

typedef boost::shared_ptr<BatchMessageContainer> BatchMessageContainerPtr;

class PulsarFriend;

struct OpSendMsg {
    Message msg_;
    SendCallback sendCallback_;
    uint64_t producerId_;
    uint64_t sequenceId_;
    boost::posix_time::ptime timeout_;

    OpSendMsg();
    OpSendMsg(uint64_t producerId, uint64_t sequenceId, const Message& msg,
              const SendCallback& sendCallback, const ProducerConfiguration& conf);
};

class ProducerImpl : public HandlerBase, public boost::enable_shared_from_this<ProducerImpl>, public ProducerImplBase {
 public:

    ProducerImpl(ClientImplPtr client, const std::string& topic,
                 const ProducerConfiguration& producerConfiguration);
    ~ProducerImpl();

    virtual const std::string& getTopic() const;

    virtual void sendAsync(const Message& msg, SendCallback callback);

    virtual void closeAsync(CloseCallback callback);

    virtual Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture();

    bool removeCorruptMessage(uint64_t sequenceId);

    bool ackReceived(uint64_t sequenceId);

    virtual void disconnectProducer();

    uint64_t getProducerId() const;

    virtual void start();

    virtual void shutdown();

    bool isClosed();

 protected:

    typedef BlockingQueue<OpSendMsg> MessageQueue;

    void setMessageMetadata(const Message &msg, const uint64_t& sequenceId, const uint32_t& uncompressedSize);

    void sendMessage(const Message& msg, SendCallback callback);

    void batchMessageTimeoutHandler(const boost::system::error_code& ec);

    friend class PulsarFriend;

    friend class BatchMessageContainer;

    virtual void connectionOpened(const ClientConnectionPtr& connection);
    virtual void connectionFailed(Result result);

    virtual HandlerBaseWeakPtr get_weak_from_this() {
        return shared_from_this();
    }

    const std::string& getName() const;

 private:
    void printStats();

    void handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
                              const std::string& producerName);

    void handleClose(Result result, ResultCallback callback);

    void resendMessages(ClientConnectionPtr cnx);

    typedef boost::unique_lock<boost::mutex> Lock;

    ProducerConfiguration conf_;

    ExecutorServicePtr executor_;

    MessageQueue pendingMessagesQueue_;

    std::string producerName_;
    std::string producerStr_;
    uint64_t producerId_;
    uint64_t msgSequenceGenerator_;
    proto::BaseCommand cmd_;
    BatchMessageContainerPtr batchMessageContainer;

    typedef boost::shared_ptr<boost::asio::deadline_timer> TimerPtr;
    TimerPtr sendTimer_;
    void handleSendTimeout(const boost::system::error_code& err);

    Promise<Result, ProducerImplBaseWeakPtr> producerCreatedPromise_;

    void failPendingMessages(Result result);

    unsigned long numOfMsgPublished;
    unsigned long numOfSendAsyncCalls;
    unsigned long numOfMsgAckSuccessfully;
};

struct ProducerImplCmp {
    bool operator()(const ProducerImplPtr &a, const ProducerImplPtr &b) const;
};

} /* namespace pulsar */

#endif /* LIB_PRODUCERIMPL_H_ */
