blob: d1fd106518066d7150682d9e569920fb809cbbd7 [file] [log] [blame]
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PartitionedProducerImpl.h"
#include "LogUtils.h"
#include <boost/bind.hpp>
#include <sstream>
#include "RoundRobinMessageRouter.h"
#include "SinglePartitionMessageRouter.h"
#include "DestinationName.h"
#include "MessageImpl.h"
DECLARE_LOG_OBJECT()
namespace pulsar {
const std::string PartitionedProducerImpl::PARTITION_NAME_SUFFIX = "-partition-";
PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client,
const DestinationNamePtr destinationName,
const unsigned int numPartitions,
const ProducerConfiguration& config):client_(client),
destinationName_(destinationName),
topic_(destinationName_->toString()),
numPartitions_(numPartitions),
conf_(config),
state_(Pending)
{
numProducersCreated_ = 0;
cleanup_ = false;
if(config.getPartitionsRoutingMode() == ProducerConfiguration::RoundRobinDistribution) {
routerPolicy_ = boost::make_shared<RoundRobinMessageRouter>(numPartitions);
} else if (config.getPartitionsRoutingMode() == ProducerConfiguration::UseSinglePartition) {
routerPolicy_ = boost::make_shared<SinglePartitionMessageRouter>(numPartitions);
} else {
routerPolicy_ = config.getMessageRouterPtr();
}
}
PartitionedProducerImpl::~PartitionedProducerImpl() {
}
//override
const std::string& PartitionedProducerImpl::getTopic() const {
return topic_;
}
//override
void PartitionedProducerImpl::start() {
boost::shared_ptr<ProducerImpl> producer;
// create producer per partition
for (unsigned int i = 0; i < numPartitions_; i++) {
std::string topicPartitionName = destinationName_->getTopicPartitionName(i);
producer = boost::make_shared<ProducerImpl>(client_, topicPartitionName, conf_);
producer->getProducerCreatedFuture().addListener(boost::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
shared_from_this(), _1, _2, i));
producers_.push_back(producer);
LOG_DEBUG("Creating Producer for single Partition - " << topicPartitionName);
}
for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->start();
}
}
void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result,
ProducerImplBaseWeakPtr producerWeakPtr,
unsigned int partitionIndex) {
// to indicate, we are doing cleanup using closeAsync after producer create
// has failed and the invocation of closeAsync is not from client
CloseCallback closeCallback = NULL;
Lock lock(mutex_);
if (state_ == Failed) {
// Ignore, we have already informed client that producer creation failed
return;
}
assert(numProducersCreated_ <= numPartitions_);
if (result != ResultOk) {
state_ = Failed;
lock.unlock();
closeAsync(closeCallback);
partitionedProducerCreatedPromise_.setFailed(result);
LOG_DEBUG("Unable to create Producer for partition - " << partitionIndex << " Error - " << result);
return;
}
assert(partitionIndex <= numPartitions_);
numProducersCreated_++;
if(numProducersCreated_ == numPartitions_) {
lock.unlock();
partitionedProducerCreatedPromise_.setValue(shared_from_this());
}
}
//override
void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
//get partition for this message from router policy
short partition = (short)(routerPolicy_->getPartition(msg));
if (partition >= numPartitions_ || partition >= producers_.size()) {
LOG_ERROR("Got Invalid Partition for message from Router Policy, Partition - " << partition);
//change me: abort or notify failure in callback?
// change to appropriate error if callback
callback(ResultUnknownError, msg);
return;
}
//find a producer for that partition, index should start from 0
ProducerImplPtr& producer = producers_[partition];
msg.impl_->messageId.partition_ = partition;
//send message on that partition
producer->sendAsync(msg, callback);
}
//override
void PartitionedProducerImpl::shutdown() {
setState(Closed);
}
void PartitionedProducerImpl::setState(const PartitionedProducerState state) {
Lock lock(mutex_);
state_ = state;
lock.unlock();
}
/*
* if createProducerCallback is set, it means the closeAsync is called from CreateProducer API which failed to create
* one or many producers for partitions. So, we have to notify with ERROR on createProducerFailure
*/
void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
int producerIndex = 0;
unsigned int producerAlreadyClosed = 0;
for (ProducerList::const_iterator i = producers_.begin(); i != producers_.end(); i++) {
ProducerImplPtr prod = *i;
if(!prod->isClosed()) {
prod->closeAsync(boost::bind(&PartitionedProducerImpl::handleSinglePartitionProducerClose,
shared_from_this(), _1, producerIndex, closeCallback));
} else {
producerAlreadyClosed++;
}
}
/*
* No need to set state since:-
* a. If closeAsync before creation then state == Closed, since producers_.size() = producerAlreadyClosed = 0
* b. If closeAsync called after all sub partitioned producer connected - handleSinglePartitionProducerClose handles the closing
* c. If closeAsync called due to failure in creating just one sub producer then state is set by handleSinglePartitionProducerCreated
*/
if (producerAlreadyClosed == producers_.size() && closeCallback) {
setState(Closed);
closeCallback(ResultOk);
}
}
void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
const unsigned int partitionIndex,
CloseCallback callback) {
Lock lock(mutex_);
if (state_ == Failed) {
// we should have already notified the client by callback
return;
}
if (result != ResultOk) {
state_ = Failed;
lock.unlock();
LOG_ERROR("Closing the producer failed for partition - " << partitionIndex);
if (callback) {
callback(result);
}
return;
}
assert (partitionIndex < numPartitions_);
if(numProducersCreated_ > 0) {
numProducersCreated_--;
}
// closed all successfully
if(!numProducersCreated_) {
state_ = Closed;
lock.unlock();
// set the producerCreatedPromise to failure, if client called
// closeAsync and it's not failure to create producer, the promise
// is set second time here, first time it was successful. So check
// if there's any adverse effect of setting it again. It should not
// be but must check. MUSTCHECK changeme
partitionedProducerCreatedPromise_.setFailed(ResultUnknownError);
if (callback) {
callback(result);
}
return;
}
}
//override
Future<Result, ProducerImplBaseWeakPtr> PartitionedProducerImpl::getProducerCreatedFuture() {
return partitionedProducerCreatedPromise_.getFuture();
}
//override
bool PartitionedProducerImpl::isClosed() {
return state_ == Closed;
}
}