blob: cfc1883b6e9824803715390a43ff5e6600fe1e62 [file] [log] [blame]
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIB_PRODUCERIMPL_H_
#define LIB_PRODUCERIMPL_H_
#include <boost/thread/mutex.hpp>
#include <boost/date_time/posix_time/ptime.hpp>
#include "ClientImpl.h"
#include "BlockingQueue.h"
#include "HandlerBase.h"
#include "SharedBuffer.h"
#include "CompressionCodec.h"
using namespace pulsar;
namespace pulsar {
class BatchMessageContainer;
typedef boost::shared_ptr<BatchMessageContainer> BatchMessageContainerPtr;
class PulsarFriend;
struct OpSendMsg {
Message msg_;
SendCallback sendCallback_;
uint64_t producerId_;
uint64_t sequenceId_;
boost::posix_time::ptime timeout_;
OpSendMsg();
OpSendMsg(uint64_t producerId, uint64_t sequenceId, const Message& msg,
const SendCallback& sendCallback, const ProducerConfiguration& conf);
};
class ProducerImpl : public HandlerBase, public boost::enable_shared_from_this<ProducerImpl>, public ProducerImplBase {
public:
ProducerImpl(ClientImplPtr client, const std::string& topic,
const ProducerConfiguration& producerConfiguration);
~ProducerImpl();
virtual const std::string& getTopic() const;
virtual void sendAsync(const Message& msg, SendCallback callback);
virtual void closeAsync(CloseCallback callback);
virtual Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture();
bool removeCorruptMessage(uint64_t sequenceId);
bool ackReceived(uint64_t sequenceId);
virtual void disconnectProducer();
uint64_t getProducerId() const;
virtual void start();
virtual void shutdown();
bool isClosed();
protected:
typedef BlockingQueue<OpSendMsg> MessageQueue;
void setMessageMetadata(const Message &msg, const uint64_t& sequenceId, const uint32_t& uncompressedSize);
void sendMessage(const Message& msg, SendCallback callback);
void batchMessageTimeoutHandler(const boost::system::error_code& ec);
friend class PulsarFriend;
friend class BatchMessageContainer;
virtual void connectionOpened(const ClientConnectionPtr& connection);
virtual void connectionFailed(Result result);
virtual HandlerBaseWeakPtr get_weak_from_this() {
return shared_from_this();
}
const std::string& getName() const;
private:
void printStats();
void handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const std::string& producerName);
void handleClose(Result result, ResultCallback callback);
void resendMessages(ClientConnectionPtr cnx);
typedef boost::unique_lock<boost::mutex> Lock;
ProducerConfiguration conf_;
ExecutorServicePtr executor_;
MessageQueue pendingMessagesQueue_;
std::string producerName_;
std::string producerStr_;
uint64_t producerId_;
uint64_t msgSequenceGenerator_;
proto::BaseCommand cmd_;
BatchMessageContainerPtr batchMessageContainer;
typedef boost::shared_ptr<boost::asio::deadline_timer> TimerPtr;
TimerPtr sendTimer_;
void handleSendTimeout(const boost::system::error_code& err);
Promise<Result, ProducerImplBaseWeakPtr> producerCreatedPromise_;
void failPendingMessages(Result result);
unsigned long numOfMsgPublished;
unsigned long numOfSendAsyncCalls;
unsigned long numOfMsgAckSuccessfully;
};
struct ProducerImplCmp {
bool operator()(const ProducerImplPtr &a, const ProducerImplPtr &b) const;
};
} /* namespace pulsar */
#endif /* LIB_PRODUCERIMPL_H_ */