blob: 8a32d8e9dca8aff06dc274477837b5ee9f3f558d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef LIB_BATCHMESSAGECONTAINERBASE_H_
#define LIB_BATCHMESSAGECONTAINERBASE_H_
#include <pulsar/Result.h>
#include <pulsar/Message.h>
#include <pulsar/ProducerConfiguration.h>
#include <pulsar/Producer.h>
#include <memory>
#include <vector>
#include <boost/noncopyable.hpp>
#include "MessageAndCallbackBatch.h"
#include "OpSendMsg.h"
namespace pulsar {
class MessageCrypto;
class ProducerImpl;
class SharedBuffer;
namespace proto {
class MessageMetadata;
} // namespace proto
/**
* Abstract base class of the batch message containers used by a producer.
*
* It declares the pure virtual interface that concrete containers implement (adding messages, clearing,
* and turning the accumulated batch(es) into OpSendMsg objects) and keeps the bookkeeping shared by all of
* them: the number of messages and the number of bytes added so far, checked against the limits read from
* the producer configuration.
*
* The class is non-copyable (it derives from boost::noncopyable) and stores references to fields of the
* producer, see the member comments below.
*/
class BatchMessageContainerBase : public boost::noncopyable {
public:
/**
* Construct the container for a producer
*
* @param producer the producer that owns this container
* @note The definition is not in this header. Presumably it binds the reference members below to the
* corresponding fields of `producer`, so `producer` must outlive this container -- TODO confirm.
*/
BatchMessageContainerBase(const ProducerImpl& producer);
virtual ~BatchMessageContainerBase() {}
/**
* Get number of batches in the batch message container
*
* @return number of batches
*/
virtual size_t getNumBatches() const = 0;
/**
* Check whether the message will be the 1st message to be added to the batch
*
* This method is used to check if the reserved spot should be released, because we won't release the
* reserved spot for the 1st message. That reserved spot is kept to hold the whole batched message.
*
* @param msg the message to be added to the batch
* @return true if `msg` is the 1st message to be added to the batch
*/
virtual bool isFirstMessageToAdd(const Message& msg) const = 0;
/**
* Add a message to the batch message container
*
* @param msg the message to add to the batch message container
* @param callback message send callback
* @return true if the batch is full, otherwise false
*/
virtual bool add(const Message& msg, const SendCallback& callback) = 0;
/**
* Clear the batch message container
*/
virtual void clear() = 0;
/**
* Create a OpSendMsg object to send
*
* @param opSendMsg the OpSendMsg object to create
* @param flushCallback the callback to trigger after the OpSendMsg was completed
* @return ResultOk if create successfully
* @note OpSendMsg's sendCallback_ must be set even if it failed
*/
virtual Result createOpSendMsg(OpSendMsg& opSendMsg,
const FlushCallback& flushCallback = nullptr) const = 0;
/**
* Create a OpSendMsg list to send
*
* @param opSendMsgs the OpSendMsg list to create
* @param flushCallback the callback to trigger after the OpSendMsg was completed
* @return all create results of `opSendMsgs`, ResultOk means create successfully
* @note OpSendMsg's sendCallback_ must be set even if it failed
*/
virtual std::vector<Result> createOpSendMsgs(std::vector<OpSendMsg>& opSendMsgs,
const FlushCallback& flushCallback = nullptr) const = 0;
/**
* Serialize into a std::ostream for logging
*
* @param os the std::ostream to serialize current batch container
*/
virtual void serialize(std::ostream& os) const = 0;
/**
* Check whether `msg` can still be added without exceeding the configured limits
*
* @param msg the message that is about to be added
* @return true if the message count is below the maximum number of messages and the current size plus
* the length of `msg` does not exceed the maximum size in bytes
*/
bool hasEnoughSpace(const Message& msg) const noexcept;
/**
* @return true if no message has been counted since construction or the last resetStats()
*/
bool isEmpty() const noexcept;
protected:
// references to ProducerImpl's fields
const std::string& topicName_;
const ProducerConfiguration& producerConfig_;
const std::string& producerName_;
const uint64_t& producerId_;
// non-owning handle to the message crypto object (may have expired when it is locked)
const std::weak_ptr<MessageCrypto> msgCryptoWeakPtr_;
// Maximum number of messages per batch, read from the producer configuration on every call
unsigned int getMaxNumMessages() const noexcept { return producerConfig_.getBatchingMaxMessages(); }
// Maximum batch size in bytes, read from the producer configuration on every call
unsigned long getMaxSizeInBytes() const noexcept {
return producerConfig_.getBatchingMaxAllowedSizeInBytes();
}
// Number of messages counted by updateStats() since the last resetStats()
unsigned int numMessages_ = 0;
// Sum of the lengths of the messages counted by updateStats() since the last resetStats()
unsigned long sizeInBytes_ = 0;
// True once either the message count limit or the size limit has been reached
bool isFull() const noexcept;
// Account for one more message: increments the count and adds the length of `msg` to the size
void updateStats(const Message& msg);
// Reset both the message count and the accumulated size to zero
void resetStats();
// Helper for building a single OpSendMsg from `batch`; defined outside this header.
// NOTE(review): presumably shared by the createOpSendMsg()/createOpSendMsgs() implementations -- confirm.
Result createOpSendMsgHelper(OpSendMsg& opSendMsg, const FlushCallback& flushCallback,
const MessageAndCallbackBatch& batch) const;
};
/**
 * Tell whether `msg` still fits into the container.
 *
 * @param msg the message that is about to be added
 * @return true only if the message count is below the configured maximum and adding the length of
 * `msg` keeps the accumulated size within the configured maximum size in bytes
 */
inline bool BatchMessageContainerBase::hasEnoughSpace(const Message& msg) const noexcept {
    // The message count budget is checked first; the length of `msg` is only read when it passes.
    if (numMessages_ >= getMaxNumMessages()) {
        return false;
    }
    return sizeInBytes_ + msg.getLength() <= getMaxSizeInBytes();
}
/**
 * Tell whether the container has reached one of its limits.
 *
 * @return true if the message count reached the configured maximum or the accumulated size reached
 * the configured maximum size in bytes
 */
inline bool BatchMessageContainerBase::isFull() const noexcept {
    // Hitting the message count limit is enough; the size limit is only consulted otherwise.
    if (numMessages_ >= getMaxNumMessages()) {
        return true;
    }
    return sizeInBytes_ >= getMaxSizeInBytes();
}
/**
 * @return true if no message is currently counted in the container
 */
inline bool BatchMessageContainerBase::isEmpty() const noexcept {
    return 0 == numMessages_;
}
/**
 * Account for a message that was added to the container.
 *
 * @param msg the message whose length is added to the accumulated size
 */
inline void BatchMessageContainerBase::updateStats(const Message& msg) {
    ++numMessages_;
    const auto payloadLength = msg.getLength();
    sizeInBytes_ += payloadLength;
}
/**
 * Forget everything counted so far: both the accumulated size and the message count go back to zero.
 */
inline void BatchMessageContainerBase::resetStats() {
    sizeInBytes_ = 0;
    numMessages_ = 0;
}
/**
 * Stream insertion for batch message containers, intended for logging.
 *
 * @param os the stream to write to
 * @param container the container to print; the formatting is delegated to its virtual serialize()
 * @return `os`, so that insertions can be chained
 */
inline std::ostream& operator<<(std::ostream& os, const BatchMessageContainerBase& container) {
    std::ostream& out = os;
    container.serialize(out);
    return out;
}
} // namespace pulsar
#endif // LIB_BATCHMESSAGECONTAINERBASE_H_