blob: c827f9f8fe6dc635f84f819cf51da2b8cf7b43c8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "Producer.h"
#include "ProducerConfig.h"
#include "Message.h"
#include "MessageId.h"
#include "ThreadSafeDeferred.h"
#include <pulsar/c/result.h>
#include <memory>
Napi::FunctionReference Producer::constructor;
void Producer::Init(Napi::Env env, Napi::Object exports) {
Napi::HandleScope scope(env);
Napi::Function func =
DefineClass(env, "Producer",
{InstanceMethod("send", &Producer::Send), InstanceMethod("flush", &Producer::Flush),
InstanceMethod("close", &Producer::Close),
InstanceMethod("getProducerName", &Producer::GetProducerName),
InstanceMethod("getTopic", &Producer::GetTopic),
InstanceMethod("isConnected", &Producer::IsConnected)});
constructor = Napi::Persistent(func);
constructor.SuppressDestruct();
}
void Producer::SetCProducer(std::shared_ptr<pulsar_producer_t> cProducer) { this->cProducer = cProducer; }
struct ProducerNewInstanceContext {
ProducerNewInstanceContext(std::shared_ptr<ThreadSafeDeferred> deferred,
std::shared_ptr<pulsar_client_t> cClient,
std::shared_ptr<ProducerConfig> producerConfig)
: deferred(deferred), cClient(cClient), producerConfig(producerConfig){};
std::shared_ptr<ThreadSafeDeferred> deferred;
std::shared_ptr<pulsar_client_t> cClient;
std::shared_ptr<ProducerConfig> producerConfig;
};
Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient) {
auto deferred = ThreadSafeDeferred::New(info.Env());
auto config = info[0].As<Napi::Object>();
auto producerConfig = std::make_shared<ProducerConfig>(config);
const std::string &topic = producerConfig->GetTopic();
if (topic.empty()) {
deferred->Reject(
std::string("Topic is required and must be specified as a string when creating producer"));
return deferred->Promise();
}
auto ctx = new ProducerNewInstanceContext(deferred, cClient, producerConfig);
pulsar_client_create_producer_async(
cClient.get(), topic.c_str(), producerConfig->GetCProducerConfig().get(),
[](pulsar_result result, pulsar_producer_t *rawProducer, void *ctx) {
auto instanceContext = static_cast<ProducerNewInstanceContext *>(ctx);
auto deferred = instanceContext->deferred;
auto cClient = instanceContext->cClient;
delete instanceContext;
if (result != pulsar_result_Ok) {
return deferred->Reject(std::string("Failed to create producer: ") + pulsar_result_str(result));
}
std::shared_ptr<pulsar_producer_t> cProducer(rawProducer, pulsar_producer_free);
deferred->Resolve([cProducer](const Napi::Env env) {
Napi::Object obj = Producer::constructor.New({});
Producer *producer = Producer::Unwrap(obj);
producer->SetCProducer(cProducer);
return obj;
});
},
ctx);
return deferred->Promise();
}
Producer::Producer(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Producer>(info) {}
struct ProducerSendContext {
ProducerSendContext(std::shared_ptr<ThreadSafeDeferred> deferred,
std::shared_ptr<pulsar_message_t> cMessage)
: deferred(deferred), cMessage(cMessage){};
std::shared_ptr<ThreadSafeDeferred> deferred;
std::shared_ptr<pulsar_message_t> cMessage;
};
Napi::Value Producer::Send(const Napi::CallbackInfo &info) {
auto cMessage = Message::BuildMessage(info[0].As<Napi::Object>());
auto deferred = ThreadSafeDeferred::New(Env());
auto ctx = new ProducerSendContext(deferred, cMessage);
pulsar_producer_send_async(
this->cProducer.get(), cMessage.get(),
[](pulsar_result result, pulsar_message_id_t *msgId, void *ctx) {
auto producerSendContext = static_cast<ProducerSendContext *>(ctx);
auto deferred = producerSendContext->deferred;
auto cMessage = producerSendContext->cMessage;
delete producerSendContext;
std::shared_ptr<pulsar_message_id_t> cMessageId(msgId, pulsar_message_id_free);
if (result != pulsar_result_Ok) {
deferred->Reject(std::string("Failed to send message: ") + pulsar_result_str(result));
} else {
deferred->Resolve([cMessageId](const Napi::Env env) { return MessageId::NewInstance(cMessageId); });
}
},
ctx);
return deferred->Promise();
}
Napi::Value Producer::Flush(const Napi::CallbackInfo &info) {
auto deferred = ThreadSafeDeferred::New(Env());
auto ctx = new ExtDeferredContext(deferred);
pulsar_producer_flush_async(
this->cProducer.get(),
[](pulsar_result result, void *ctx) {
auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
auto deferred = deferredContext->deferred;
delete deferredContext;
if (result != pulsar_result_Ok) {
deferred->Reject(std::string("Failed to flush producer: ") + pulsar_result_str(result));
} else {
deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
}
},
ctx);
return deferred->Promise();
}
Napi::Value Producer::Close(const Napi::CallbackInfo &info) {
auto deferred = ThreadSafeDeferred::New(Env());
auto ctx = new ExtDeferredContext(deferred);
pulsar_producer_close_async(
this->cProducer.get(),
[](pulsar_result result, void *ctx) {
auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
auto deferred = deferredContext->deferred;
delete deferredContext;
if (result != pulsar_result_Ok) {
deferred->Reject(std::string("Failed to close producer: ") + pulsar_result_str(result));
} else {
deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
}
},
ctx);
return deferred->Promise();
}
Napi::Value Producer::GetProducerName(const Napi::CallbackInfo &info) {
Napi::Env env = info.Env();
return Napi::String::New(env, pulsar_producer_get_producer_name(this->cProducer.get()));
}
Napi::Value Producer::GetTopic(const Napi::CallbackInfo &info) {
Napi::Env env = info.Env();
return Napi::String::New(env, pulsar_producer_get_topic(this->cProducer.get()));
}
Napi::Value Producer::IsConnected(const Napi::CallbackInfo &info) {
Napi::Env env = info.Env();
return Napi::Boolean::New(env, pulsar_producer_is_connected(this->cProducer.get()));
}
Producer::~Producer() {}