blob: e2346ef0de02ff837e2fd29b00ccbe23253e60ec [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef LIB_PRODUCERIMPL_H_
#define LIB_PRODUCERIMPL_H_
#include <mutex>
#include <boost/date_time/posix_time/ptime.hpp>
#include "ClientImpl.h"
#include "BlockingQueue.h"
#include "HandlerBase.h"
#include "SharedBuffer.h"
#include "CompressionCodec.h"
#include "MessageCrypto.h"
#include "stats/ProducerStatsDisabled.h"
#include "stats/ProducerStatsImpl.h"
#include "PulsarApi.pb.h"
#include "OpSendMsg.h"
#include "BatchMessageContainerBase.h"
#include "PendingFailures.h"
using namespace pulsar;
namespace pulsar {
typedef bool bool_type;
typedef std::shared_ptr<MessageCrypto> MessageCryptoPtr;
class PulsarFriend;
class Producer;
class ProducerImpl : public HandlerBase,
public std::enable_shared_from_this<ProducerImpl>,
public ProducerImplBase {
public:
ProducerImpl(ClientImplPtr client, const std::string& topic,
const ProducerConfiguration& producerConfiguration, int32_t partition = -1);
~ProducerImpl();
virtual const std::string& getTopic() const;
virtual void sendAsync(const Message& msg, SendCallback callback);
virtual void closeAsync(CloseCallback callback);
virtual Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture();
bool removeCorruptMessage(uint64_t sequenceId);
bool ackReceived(uint64_t sequenceId, MessageId& messageId);
virtual void disconnectProducer();
const std::string& getProducerName() const;
int64_t getLastSequenceId() const;
const std::string& getSchemaVersion() const;
uint64_t getProducerId() const;
int32_t partition() const noexcept { return partition_; }
virtual void start();
virtual void shutdown();
virtual bool isClosed();
virtual void triggerFlush();
virtual void flushAsync(FlushCallback callback);
protected:
ProducerStatsBasePtr producerStatsBasePtr_;
typedef BlockingQueue<OpSendMsg> MessageQueue;
void setMessageMetadata(const Message& msg, const uint64_t& sequenceId, const uint32_t& uncompressedSize);
void sendMessage(const OpSendMsg& opSendMsg);
void batchMessageTimeoutHandler(const boost::system::error_code& ec);
friend class PulsarFriend;
friend class Producer;
friend class BatchMessageContainerBase;
friend class BatchMessageContainer;
virtual void connectionOpened(const ClientConnectionPtr& connection);
virtual void connectionFailed(Result result);
virtual HandlerBaseWeakPtr get_weak_from_this() { return shared_from_this(); }
const std::string& getName() const;
private:
void printStats();
void handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const ResponseData& responseData);
void statsCallBackHandler(Result, const MessageId&, SendCallback, boost::posix_time::ptime);
void handleClose(Result result, ResultCallback callback, ProducerImplPtr producer);
void resendMessages(ClientConnectionPtr cnx);
void refreshEncryptionKey(const boost::system::error_code& ec);
bool encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload,
SharedBuffer& encryptedPayload);
void cancelTimers();
typedef std::unique_lock<std::mutex> Lock;
ProducerConfiguration conf_;
ExecutorServicePtr executor_;
MessageQueue pendingMessagesQueue_;
int32_t partition_; // -1 if topic is non-partitioned
std::string producerName_;
bool userProvidedProducerName_;
std::string producerStr_;
uint64_t producerId_;
int64_t msgSequenceGenerator_;
proto::BaseCommand cmd_;
std::unique_ptr<BatchMessageContainerBase> batchMessageContainer_;
DeadlineTimerPtr batchTimer_;
PendingFailures batchMessageAndSend(const FlushCallback& flushCallback = nullptr);
volatile int64_t lastSequenceIdPublished_;
std::string schemaVersion_;
DeadlineTimerPtr sendTimer_;
void handleSendTimeout(const boost::system::error_code& err);
Promise<Result, ProducerImplBaseWeakPtr> producerCreatedPromise_;
struct PendingCallbacks;
std::shared_ptr<PendingCallbacks> getPendingCallbacksWhenFailed();
std::shared_ptr<PendingCallbacks> getPendingCallbacksWhenFailedWithLock();
void failPendingMessages(Result result);
MessageCryptoPtr msgCrypto_;
DeadlineTimerPtr dataKeyGenTImer_;
uint32_t dataKeyGenIntervalSec_;
};
struct ProducerImplCmp {
bool operator()(const ProducerImplPtr& a, const ProducerImplPtr& b) const;
};
} /* namespace pulsar */
#endif /* LIB_PRODUCERIMPL_H_ */