blob: ed42c07cb85653d26f765421a31157d9eff649a6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef LIB_BATCHMESSAGECONTAINERBASE_H_
#define LIB_BATCHMESSAGECONTAINERBASE_H_
#include <pulsar/Message.h>
#include <pulsar/Producer.h>
#include <pulsar/ProducerConfiguration.h>
#include <pulsar/Result.h>
#include <boost/noncopyable.hpp>
#include <memory>
#include <stdexcept>
#include <vector>
namespace pulsar {
class MessageCrypto;
class ProducerImpl;
class SharedBuffer;
struct OpSendMsg;
class MessageAndCallbackBatch;
namespace proto {
class MessageMetadata;
} // namespace proto
class BatchMessageContainerBase : public boost::noncopyable {
public:
BatchMessageContainerBase(const ProducerImpl& producer);
virtual ~BatchMessageContainerBase();
virtual bool hasMultiOpSendMsgs() const = 0;
/**
* Check the message will be the 1st message to be added to the batch
*
* This method is used to check if the reversed spot should be released. Because we won't released the
* reserved spot for 1st message. The released spot is to contain the whole batched message.
*
* @param msg the message to be added to the batch
* @return true if `msg` is the 1st message to be added to the batch
*/
virtual bool isFirstMessageToAdd(const Message& msg) const = 0;
/**
* Add a message to the batch message container
*
* @param msg message will add to the batch message container
* @param callback message send callback
* @return true if the batch is full, otherwise false
*/
virtual bool add(const Message& msg, const SendCallback& callback) = 0;
virtual std::unique_ptr<OpSendMsg> createOpSendMsg(const FlushCallback& flushCallback = nullptr) {
throw std::runtime_error("createOpSendMsg is not supported");
}
virtual std::vector<std::unique_ptr<OpSendMsg>> createOpSendMsgs(
const FlushCallback& flushCallback = nullptr) {
throw std::runtime_error("createOpSendMsgs is not supported");
}
/**
* Serialize into a std::ostream for logging
*
* @param os the std::ostream to serialize current batch container
*/
virtual void serialize(std::ostream& os) const = 0;
bool hasEnoughSpace(const Message& msg) const noexcept;
bool isEmpty() const noexcept;
protected:
// references to ProducerImpl's fields
const std::string topicName_;
const ProducerConfiguration& producerConfig_;
const std::string& producerName_;
const uint64_t& producerId_;
const std::weak_ptr<MessageCrypto> msgCryptoWeakPtr_;
unsigned int getMaxNumMessages() const noexcept { return producerConfig_.getBatchingMaxMessages(); }
unsigned long getMaxSizeInBytes() const noexcept {
return producerConfig_.getBatchingMaxAllowedSizeInBytes();
}
unsigned int numMessages_ = 0;
unsigned long sizeInBytes_ = 0;
bool isFull() const noexcept;
void updateStats(const Message& msg);
void resetStats();
std::unique_ptr<OpSendMsg> createOpSendMsgHelper(MessageAndCallbackBatch& flushCallback) const;
virtual void clear() = 0;
};
inline bool BatchMessageContainerBase::hasEnoughSpace(const Message& msg) const noexcept {
return (numMessages_ < getMaxNumMessages()) && (sizeInBytes_ + msg.getLength() <= getMaxSizeInBytes());
}
inline bool BatchMessageContainerBase::isFull() const noexcept {
return (numMessages_ >= getMaxNumMessages()) || (sizeInBytes_ >= getMaxSizeInBytes());
}
inline bool BatchMessageContainerBase::isEmpty() const noexcept { return numMessages_ == 0; }
inline void BatchMessageContainerBase::updateStats(const Message& msg) {
numMessages_++;
sizeInBytes_ += msg.getLength();
}
inline void BatchMessageContainerBase::resetStats() {
numMessages_ = 0;
sizeInBytes_ = 0;
}
inline std::ostream& operator<<(std::ostream& os, const BatchMessageContainerBase& container) {
container.serialize(os);
return os;
}
} // namespace pulsar
#endif // LIB_BATCHMESSAGECONTAINERBASE_H_