blob: 19996328870cca74ea63bc5794814b64ca7d6f15 [file] [log] [blame]
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* \class BatchMessageContainer
*
* \brief This class is a container for holding individual messages being published until they are batched and sent to broker.
*
* \note This class is not thread safe.
*/
#ifndef LIB_BATCHMESSAGECONTAINER_H_
#define LIB_BATCHMESSAGECONTAINER_H_
#include <string>
#include <vector>
#include <utility>
#include <pulsar/MessageBuilder.h>
#include "MessageImpl.h"
#include "CompressionCodec.h"
#include "Commands.h"
#include "LogUtils.h"
#include "ObjectPool.h"
#include <boost/bind.hpp>
#include "ExecutorService.h"
#include <boost/asio.hpp>
#include "ProducerImpl.h"
namespace pulsar {
class BatchMessageContainer {
public:
struct MessageContainer {
MessageContainer(Message message, SendCallback sendCallback)
: message_(message),
sendCallback_(sendCallback) {
}
Message message_;
SendCallback sendCallback_;
};
typedef std::vector<MessageContainer> MessageContainerList;
typedef boost::shared_ptr<MessageContainerList> MessageContainerListPtr;
BatchMessageContainer(ProducerImpl& producer);
~BatchMessageContainer();
void add(const Message& msg, SendCallback sendCallback, bool disableCheck = false);
SharedBuffer getBatchedPayload();
void clear();
static void batchMessageCallBack(Result r, MessageContainerListPtr messages);
friend inline std::ostream& operator<<(std::ostream& os, const BatchMessageContainer& batchMessageContainer);
friend class ProducerImpl;
private:
const CompressionType compressionType_;
const unsigned int maxAllowedNumMessagesInBatch_;
const unsigned long maxAllowedMessageBatchSizeInBytes_;
unsigned long batchSizeInBytes_;
/// Topic Name is used for creating descriptors in log messages
const std::string topicName_;
/// Producer Name is used for creating descriptors in log messages
std::string producerName_;
Message::MessageImplPtr impl_;
// This copy (to vector) is needed since OpSendMsg no long holds the individual message and w/o a container
// the impl_ Shared Pointer will delete the data.
MessageContainerListPtr messagesContainerListPtr_;
ProducerImpl& producer_;
DeadlineTimerPtr timer_;
unsigned long numberOfBatchesSent_;
double averageBatchSize_;
void compressPayLoad();
inline bool isEmpty() const;
inline bool isFull() const;
inline bool hasSpaceInBatch(const Message& msg) const;
void startTimer();
void sendMessage();
};
bool BatchMessageContainer::hasSpaceInBatch(const Message& msg) const {
return (msg.impl_->payload.readableBytes() + this->batchSizeInBytes_
<= this->maxAllowedMessageBatchSizeInBytes_)
&& (this->messagesContainerListPtr_->size() < this->maxAllowedNumMessagesInBatch_);
}
bool BatchMessageContainer::isEmpty() const {
return this->messagesContainerListPtr_->empty();
}
bool BatchMessageContainer::isFull() const {
return (this->batchSizeInBytes_ >= this->maxAllowedMessageBatchSizeInBytes_
|| this->messagesContainerListPtr_->size() >= this->maxAllowedNumMessagesInBatch_);
}
std::ostream& operator<<(std::ostream& os, const BatchMessageContainer& b) {
os << "{ BatchContainer [size = " << b.messagesContainerListPtr_->size() << "] [batchSizeInBytes_ = "
<< b.batchSizeInBytes_ << "] [maxAllowedMessageBatchSizeInBytes_ = "
<< b.maxAllowedMessageBatchSizeInBytes_ << "] [maxAllowedNumMessagesInBatch_ = "
<< b.maxAllowedNumMessagesInBatch_ << "] [topicName = " << b.topicName_
<< "] [producerName_ = " << b.producerName_ << "] [batchSizeInBytes_ = "
<< b.batchSizeInBytes_ << "] [numberOfBatchesSent = " << b.numberOfBatchesSent_
<< "] [averageBatchSize = " << b.averageBatchSize_ << "]}";
return os;
}
}
#endif /* LIB_BATCHMESSAGECONTAINER_H_ */