blob: 328da8984f228da0ba4f5f162df03acede03bbbe [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "Consumer.h"
#include "ConsumerConfig.h"
#include "Message.h"
#include "MessageId.h"
#include "ThreadSafeDeferred.h"
#include "LogUtils.h"
#include <pulsar/c/result.h>
#include <atomic>
#include <thread>
#include <future>
#include <sstream>
Napi::FunctionReference Consumer::constructor;
void Consumer::Init(Napi::Env env, Napi::Object exports) {
Napi::HandleScope scope(env);
Napi::Function func =
DefineClass(env, "Consumer",
{
InstanceMethod("receive", &Consumer::Receive),
InstanceMethod("acknowledge", &Consumer::Acknowledge),
InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId),
InstanceMethod("negativeAcknowledge", &Consumer::NegativeAcknowledge),
InstanceMethod("negativeAcknowledgeId", &Consumer::NegativeAcknowledgeId),
InstanceMethod("acknowledgeCumulative", &Consumer::AcknowledgeCumulative),
InstanceMethod("acknowledgeCumulativeId", &Consumer::AcknowledgeCumulativeId),
InstanceMethod("seek", &Consumer::Seek),
InstanceMethod("seekTimestamp", &Consumer::SeekTimestamp),
InstanceMethod("isConnected", &Consumer::IsConnected),
InstanceMethod("close", &Consumer::Close),
InstanceMethod("unsubscribe", &Consumer::Unsubscribe),
});
constructor = Napi::Persistent(func);
constructor.SuppressDestruct();
}
struct MessageListenerProxyData {
std::shared_ptr<pulsar_message_t> cMessage;
Consumer *consumer;
std::function<void(void)> callback;
MessageListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage, Consumer *consumer,
std::function<void(void)> callback)
: cMessage(cMessage), consumer(consumer), callback(callback) {}
};
inline void logMessageListenerError(Consumer *consumer, const char *err) {
std::ostringstream ss;
ss << "[" << consumer->GetTopic() << "][" << consumer->GetSubscriptionName()
<< "] Message listener error in processing message: " << err;
LOG_ERROR(ss.str().c_str());
}
void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListenerProxyData *data) {
Napi::Object msg = Message::NewInstance({}, data->cMessage);
Consumer *consumer = data->consumer;
// `consumer` might be null in certain cases, segmentation fault might happend without this null check. We
// need to handle this rare case in future.
if (consumer) {
Napi::Value ret;
try {
ret = jsCallback.Call({msg, consumer->Value()});
} catch (std::exception &exception) {
logMessageListenerError(consumer, exception.what());
}
if (ret.IsPromise()) {
Napi::Promise promise = ret.As<Napi::Promise>();
Napi::Function catchFunc = promise.Get("catch").As<Napi::Function>();
ret = catchFunc.Call(promise, {Napi::Function::New(env, [consumer](const Napi::CallbackInfo &info) {
Napi::Error error = info[0].As<Napi::Error>();
logMessageListenerError(consumer, error.what());
})});
promise = ret.As<Napi::Promise>();
Napi::Function finallyFunc = promise.Get("finally").As<Napi::Function>();
finallyFunc.Call(
promise, {Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); })});
return;
}
}
data->callback();
}
void MessageListener(pulsar_consumer_t *rawConsumer, pulsar_message_t *rawMessage, void *ctx) {
std::shared_ptr<pulsar_message_t> cMessage(rawMessage, pulsar_message_free);
MessageListenerCallback *listenerCallback = (MessageListenerCallback *)ctx;
Consumer *consumer = (Consumer *)listenerCallback->consumer;
if (listenerCallback->callback.Acquire() != napi_ok) {
return;
}
std::promise<void> promise;
std::future<void> future = promise.get_future();
std::unique_ptr<MessageListenerProxyData> dataPtr(
new MessageListenerProxyData(cMessage, consumer, [&promise]() { promise.set_value(); }));
listenerCallback->callback.BlockingCall(dataPtr.get(), MessageListenerProxy);
listenerCallback->callback.Release();
future.wait();
}
void Consumer::SetCConsumer(std::shared_ptr<pulsar_consumer_t> cConsumer) { this->cConsumer = cConsumer; }
void Consumer::SetListenerCallback(MessageListenerCallback *listener) {
if (this->listener != nullptr) {
// It is only safe to set the listener once for the lifecycle of the Consumer
return;
}
if (listener != nullptr) {
listener->consumer = this;
// If a consumer listener is set, the Consumer instance is kept alive even if it goes out of scope in JS
// code.
this->Ref();
this->listener = listener;
}
}
Consumer::Consumer(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Consumer>(info), listener(nullptr) {}
struct ConsumerNewInstanceContext {
ConsumerNewInstanceContext(std::shared_ptr<ThreadSafeDeferred> deferred,
std::shared_ptr<pulsar_client_t> cClient,
std::shared_ptr<ConsumerConfig> consumerConfig)
: deferred(deferred), cClient(cClient), consumerConfig(consumerConfig){};
std::shared_ptr<ThreadSafeDeferred> deferred;
std::shared_ptr<pulsar_client_t> cClient;
std::shared_ptr<ConsumerConfig> consumerConfig;
static void subscribeCallback(pulsar_result result, pulsar_consumer_t *rawConsumer, void *ctx) {
auto instanceContext = static_cast<ConsumerNewInstanceContext *>(ctx);
auto deferred = instanceContext->deferred;
auto cClient = instanceContext->cClient;
auto consumerConfig = instanceContext->consumerConfig;
delete instanceContext;
if (result != pulsar_result_Ok) {
return deferred->Reject(std::string("Failed to create consumer: ") + pulsar_result_str(result));
}
auto cConsumer = std::shared_ptr<pulsar_consumer_t>(rawConsumer, pulsar_consumer_free);
auto listener = consumerConfig->GetListenerCallback();
if (listener) {
// pause, will resume in OnOK, to prevent MessageListener get a nullptr of consumer
pulsar_consumer_pause_message_listener(cConsumer.get());
}
deferred->Resolve([cConsumer, consumerConfig, listener](const Napi::Env env) {
Napi::Object obj = Consumer::constructor.New({});
Consumer *consumer = Consumer::Unwrap(obj);
consumer->SetCConsumer(cConsumer);
consumer->SetListenerCallback(listener);
if (listener) {
// resume to enable MessageListener function callback
resume_message_listener(cConsumer.get());
}
return obj;
});
}
};
Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient) {
auto deferred = ThreadSafeDeferred::New(info.Env());
auto config = info[0].As<Napi::Object>();
std::shared_ptr<ConsumerConfig> consumerConfig = std::make_shared<ConsumerConfig>(config, &MessageListener);
const std::string &topic = consumerConfig->GetTopic();
const std::vector<std::string> &topics = consumerConfig->GetTopics();
const std::string &topicsPattern = consumerConfig->GetTopicsPattern();
if (topic.empty() && topics.size() == 0 && topicsPattern.empty()) {
deferred->Reject(
std::string("Topic, topics or topicsPattern is required and must be specified as a string when "
"creating consumer"));
return deferred->Promise();
}
const std::string &subscription = consumerConfig->GetSubscription();
if (subscription.empty()) {
deferred->Reject(
std::string("Subscription is required and must be specified as a string when creating consumer"));
return deferred->Promise();
}
int32_t ackTimeoutMs = consumerConfig->GetAckTimeoutMs();
if (ackTimeoutMs != 0 && ackTimeoutMs < MIN_ACK_TIMEOUT_MILLIS) {
std::string msg("Ack timeout should be 0 or greater than or equal to " +
std::to_string(MIN_ACK_TIMEOUT_MILLIS));
deferred->Reject(msg);
return deferred->Promise();
}
int32_t nAckRedeliverTimeoutMs = consumerConfig->GetNAckRedeliverTimeoutMs();
if (nAckRedeliverTimeoutMs < 0) {
std::string msg("NAck timeout should be greater than or equal to zero");
deferred->Reject(msg);
return deferred->Promise();
}
auto ctx = new ConsumerNewInstanceContext(deferred, cClient, consumerConfig);
if (!topicsPattern.empty()) {
pulsar_client_subscribe_pattern_async(cClient.get(), topicsPattern.c_str(), subscription.c_str(),
consumerConfig->GetCConsumerConfig().get(),
&ConsumerNewInstanceContext::subscribeCallback, ctx);
} else if (topics.size() > 0) {
const char **cTopics = new const char *[topics.size()];
for (size_t i = 0; i < topics.size(); i++) {
cTopics[i] = topics[i].c_str();
}
pulsar_client_subscribe_multi_topics_async(cClient.get(), cTopics, topics.size(), subscription.c_str(),
consumerConfig->GetCConsumerConfig().get(),
&ConsumerNewInstanceContext::subscribeCallback, ctx);
delete[] cTopics;
} else {
pulsar_client_subscribe_async(cClient.get(), topic.c_str(), subscription.c_str(),
consumerConfig->GetCConsumerConfig().get(),
&ConsumerNewInstanceContext::subscribeCallback, ctx);
}
return deferred->Promise();
}
std::string Consumer::GetTopic() { return {pulsar_consumer_get_topic(this->cConsumer.get())}; }
std::string Consumer::GetSubscriptionName() {
return {pulsar_consumer_get_subscription_name(this->cConsumer.get())};
}
// We still need a receive worker because the c api is missing the equivalent async definition
class ConsumerReceiveWorker : public Napi::AsyncWorker {
public:
ConsumerReceiveWorker(const Napi::Promise::Deferred &deferred, std::shared_ptr<pulsar_consumer_t> cConsumer,
int64_t timeout = -1)
: AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
deferred(deferred),
cConsumer(cConsumer),
timeout(timeout) {}
~ConsumerReceiveWorker() {}
void Execute() {
pulsar_result result;
pulsar_message_t *rawMessage;
if (timeout > 0) {
result = pulsar_consumer_receive_with_timeout(this->cConsumer.get(), &rawMessage, timeout);
} else {
result = pulsar_consumer_receive(this->cConsumer.get(), &rawMessage);
}
if (result != pulsar_result_Ok) {
SetError(std::string("Failed to receive message: ") + pulsar_result_str(result));
} else {
this->cMessage = std::shared_ptr<pulsar_message_t>(rawMessage, pulsar_message_free);
}
}
void OnOK() {
Napi::Object obj = Message::NewInstance({}, this->cMessage);
this->deferred.Resolve(obj);
}
void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); }
private:
Napi::Promise::Deferred deferred;
std::shared_ptr<pulsar_consumer_t> cConsumer;
std::shared_ptr<pulsar_message_t> cMessage;
int64_t timeout;
};
Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
if (info[0].IsUndefined()) {
auto deferred = ThreadSafeDeferred::New(Env());
auto ctx = new ExtDeferredContext(deferred);
pulsar_consumer_receive_async(
this->cConsumer.get(),
[](pulsar_result result, pulsar_message_t *rawMessage, void *ctx) {
auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
auto deferred = deferredContext->deferred;
delete deferredContext;
if (result != pulsar_result_Ok) {
deferred->Reject(std::string("Failed to receive message: ") + pulsar_result_str(result));
} else {
deferred->Resolve([rawMessage](const Napi::Env env) {
Napi::Object obj = Message::NewInstance(
{}, std::shared_ptr<pulsar_message_t>(rawMessage, pulsar_message_free));
return obj;
});
}
},
ctx);
return deferred->Promise();
} else {
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
Napi::Number timeout = info[0].As<Napi::Object>().ToNumber();
ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer, timeout.Int64Value());
wk->Queue();
return deferred.Promise();
}
}
Napi::Value Consumer::Acknowledge(const Napi::CallbackInfo &info) {
auto obj = info[0].As<Napi::Object>();
auto msg = Message::Unwrap(obj);
auto deferred = ThreadSafeDeferred::New(Env());
auto ctx = new ExtDeferredContext(deferred);
pulsar_consumer_acknowledge_async(
this->cConsumer.get(), msg->GetCMessage().get(),
[](pulsar_result result, void *ctx) {
auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
auto deferred = deferredContext->deferred;
delete deferredContext;
if (result != pulsar_result_Ok) {
deferred->Reject(std::string("Failed to acknowledge: ") + pulsar_result_str(result));
} else {
deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
}
},
ctx);
return deferred->Promise();
}
Napi::Value Consumer::AcknowledgeId(const Napi::CallbackInfo &info) {
auto obj = info[0].As<Napi::Object>();
auto *msgId = MessageId::Unwrap(obj);
auto deferred = ThreadSafeDeferred::New(Env());
auto ctx = new ExtDeferredContext(deferred);
pulsar_consumer_acknowledge_async_id(
this->cConsumer.get(), msgId->GetCMessageId().get(),
[](pulsar_result result, void *ctx) {
auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
auto deferred = deferredContext->deferred;
delete deferredContext;
if (result != pulsar_result_Ok) {
deferred->Reject(std::string("Failed to acknowledge id: ") + pulsar_result_str(result));
} else {
deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
}
},
ctx);
return deferred->Promise();
}
void Consumer::NegativeAcknowledge(const Napi::CallbackInfo &info) {
Napi::Object obj = info[0].As<Napi::Object>();
Message *msg = Message::Unwrap(obj);
std::shared_ptr<pulsar_message_t> cMessage = msg->GetCMessage();
pulsar_consumer_negative_acknowledge(this->cConsumer.get(), cMessage.get());
}
void Consumer::NegativeAcknowledgeId(const Napi::CallbackInfo &info) {
Napi::Object obj = info[0].As<Napi::Object>();
MessageId *msgId = MessageId::Unwrap(obj);
std::shared_ptr<pulsar_message_id_t> cMessageId = msgId->GetCMessageId();
pulsar_consumer_negative_acknowledge_id(this->cConsumer.get(), cMessageId.get());
}
Napi::Value Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) {
auto obj = info[0].As<Napi::Object>();
auto *msg = Message::Unwrap(obj);
auto deferred = ThreadSafeDeferred::New(Env());
auto ctx = new ExtDeferredContext(deferred);
pulsar_consumer_acknowledge_cumulative_async(
this->cConsumer.get(), msg->GetCMessage().get(),
[](pulsar_result result, void *ctx) {
auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
auto deferred = deferredContext->deferred;
delete deferredContext;
if (result != pulsar_result_Ok) {
deferred->Reject(std::string("Failed to acknowledge cumulatively: ") + pulsar_result_str(result));
} else {
deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
}
},
ctx);
return deferred->Promise();
}
Napi::Value Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) {
auto obj = info[0].As<Napi::Object>();
auto *msgId = MessageId::Unwrap(obj);
auto deferred = ThreadSafeDeferred::New(Env());
auto ctx = new ExtDeferredContext(deferred);
pulsar_consumer_acknowledge_cumulative_async_id(
this->cConsumer.get(), msgId->GetCMessageId().get(),
[](pulsar_result result, void *ctx) {
auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
auto deferred = deferredContext->deferred;
delete deferredContext;
if (result != pulsar_result_Ok) {
deferred->Reject(std::string("Failed to acknowledge cumulatively by id: ") +
pulsar_result_str(result));
} else {
deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
}
},
ctx);
return deferred->Promise();
}
Napi::Value Consumer::Seek(const Napi::CallbackInfo &info) {
auto obj = info[0].As<Napi::Object>();
auto *msgId = MessageId::Unwrap(obj);
auto deferred = ThreadSafeDeferred::New(Env());
auto ctx = new ExtDeferredContext(deferred);
pulsar_consumer_seek_async(
this->cConsumer.get(), msgId->GetCMessageId().get(),
[](pulsar_result result, void *ctx) {
auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
auto deferred = deferredContext->deferred;
delete deferredContext;
if (result != pulsar_result_Ok) {
deferred->Reject(std::string("Failed to seek message by id: ") + pulsar_result_str(result));
} else {
deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
}
},
ctx);
return deferred->Promise();
}
Napi::Value Consumer::SeekTimestamp(const Napi::CallbackInfo &info) {
Napi::Number timestamp = info[0].As<Napi::Object>().ToNumber();
auto deferred = ThreadSafeDeferred::New(Env());
auto ctx = new ExtDeferredContext(deferred);
pulsar_consumer_seek_by_timestamp_async(
this->cConsumer.get(), timestamp.Int64Value(),
[](pulsar_result result, void *ctx) {
auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
auto deferred = deferredContext->deferred;
delete deferredContext;
if (result != pulsar_result_Ok) {
deferred->Reject(std::string("Failed to seek message by timestamp: ") + pulsar_result_str(result));
} else {
deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
}
},
ctx);
return deferred->Promise();
}
Napi::Value Consumer::IsConnected(const Napi::CallbackInfo &info) {
Napi::Env env = info.Env();
return Napi::Boolean::New(env, pulsar_consumer_is_connected(this->cConsumer.get()));
}
void Consumer::Cleanup() {
if (this->listener != nullptr) {
pulsar_consumer_pause_message_listener(this->cConsumer.get());
this->listener->callback.Release();
this->listener = nullptr;
this->Unref();
}
}
Napi::Value Consumer::Close(const Napi::CallbackInfo &info) {
auto deferred = ThreadSafeDeferred::New(Env());
auto ctx = new ExtDeferredContext(deferred);
this->Cleanup();
pulsar_consumer_close_async(
this->cConsumer.get(),
[](pulsar_result result, void *ctx) {
auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
auto deferred = deferredContext->deferred;
delete deferredContext;
if (result != pulsar_result_Ok) {
deferred->Reject(std::string("Failed to close consumer: ") + pulsar_result_str(result));
} else {
deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
}
},
ctx);
return deferred->Promise();
}
Napi::Value Consumer::Unsubscribe(const Napi::CallbackInfo &info) {
auto deferred = ThreadSafeDeferred::New(Env());
auto ctx = new ExtDeferredContext(deferred);
pulsar_consumer_pause_message_listener(this->cConsumer.get());
pulsar_consumer_unsubscribe_async(
this->cConsumer.get(),
[](pulsar_result result, void *ctx) {
auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
auto deferred = deferredContext->deferred;
delete deferredContext;
if (result != pulsar_result_Ok) {
deferred->Reject(std::string("Failed to unsubscribe consumer: ") + pulsar_result_str(result));
} else {
deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
}
},
ctx);
return deferred->Promise();
}
Consumer::~Consumer() { this->Cleanup(); }