blob: d965db8458b9d0a0486ce34880f951f4c576eceb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "producer.h"
#include <cstddef>
#include <exception>
#include <string>
#include <napi.h>
#include <ClientRPCHook.h>
#include <LoggerConfig.h>
#include <MQException.h>
#include <MQMessage.h>
#include <SendCallback.h>
namespace __node_rocketmq__ {
Napi::Object RocketMQProducer::Init(Napi::Env env, Napi::Object exports) {
Napi::Function func =
DefineClass(env,
"RocketMQProducer",
{
InstanceMethod<&RocketMQProducer::Start>("start"),
InstanceMethod<&RocketMQProducer::Shutdown>("shutdown"),
InstanceMethod<&RocketMQProducer::Send>("send"),
InstanceMethod<&RocketMQProducer::SetSessionCredentials>(
"setSessionCredentials"),
});
Napi::FunctionReference* constructor = new Napi::FunctionReference();
*constructor = Napi::Persistent(func);
env.SetInstanceData<Napi::FunctionReference>(constructor);
exports.Set("Producer", func);
return exports;
}
RocketMQProducer::RocketMQProducer(const Napi::CallbackInfo& info)
: Napi::ObjectWrap<RocketMQProducer>(info), producer_("") {
const Napi::Value group_name = info[0];
if (group_name.IsString()) {
producer_.set_group_name(group_name.ToString());
}
const Napi::Value instance_name = info[1];
if (instance_name.IsString()) {
producer_.set_instance_name(instance_name.ToString());
}
const Napi::Value options = info[2];
if (options.IsObject()) {
// try to set options
SetOptions(options.ToObject());
}
}
RocketMQProducer::~RocketMQProducer() {
producer_.shutdown();
}
void RocketMQProducer::SetOptions(const Napi::Object& options) {
// set name server
Napi::Value name_server = options.Get("nameServer");
if (name_server.IsString()) {
producer_.set_namesrv_addr(name_server.ToString());
}
// set group name
Napi::Value group_name = options.Get("groupName");
if (group_name.IsString()) {
producer_.set_group_name(group_name.ToString());
}
// set max message size
Napi::Value max_message_size = options.Get("maxMessageSize");
if (max_message_size.IsNumber()) {
producer_.set_max_message_size(max_message_size.ToNumber());
}
// set compress level
Napi::Value compress_level = options.Get("compressLevel");
if (compress_level.IsNumber()) {
producer_.set_compress_level(compress_level.ToNumber());
}
// set send message timeout
Napi::Value send_message_timeout = options.Get("sendMessageTimeout");
if (send_message_timeout.IsNumber()) {
producer_.set_send_msg_timeout(send_message_timeout.ToNumber());
}
// set log level
Napi::Value log_level = options.Get("logLevel");
if (log_level.IsNumber()) {
int32_t level = log_level.ToNumber();
if (level >= 0 && level < rocketmq::LogLevel::LOG_LEVEL_LEVEL_NUM) {
rocketmq::GetDefaultLoggerConfig().set_level(
static_cast<rocketmq::LogLevel>(level));
}
}
// set log directory
Napi::Value log_dir = options.Get("logDir");
if (log_dir.IsString()) {
rocketmq::GetDefaultLoggerConfig().set_path(log_dir.ToString());
}
// set log file size
Napi::Value log_file_size = options.Get("logFileSize");
if (log_file_size.IsNumber()) {
rocketmq::GetDefaultLoggerConfig().set_file_count(log_file_size.ToNumber());
}
// set log file num
Napi::Value log_file_num = options.Get("logFileNum");
if (log_file_num.IsNumber()) {
rocketmq::GetDefaultLoggerConfig().set_file_count(log_file_num.ToNumber());
}
}
Napi::Value RocketMQProducer::SetSessionCredentials(
const Napi::CallbackInfo& info) {
Napi::String access_key = info[0].As<Napi::String>();
Napi::String secret_key = info[1].As<Napi::String>();
Napi::String ons_channel = info[2].As<Napi::String>();
auto rpc_hook = std::make_shared<rocketmq::ClientRPCHook>(
rocketmq::SessionCredentials(access_key, secret_key, ons_channel));
producer_.setRPCHook(rpc_hook);
return info.Env().Undefined();
}
class ProducerStartWorker : public Napi::AsyncWorker {
public:
ProducerStartWorker(const Napi::Function& callback,
const rocketmq::DefaultMQProducer& producer)
: Napi::AsyncWorker(callback), producer_(producer) {}
void Execute() override { producer_.start(); }
private:
rocketmq::DefaultMQProducer producer_;
};
Napi::Value RocketMQProducer::Start(const Napi::CallbackInfo& info) {
Napi::Function callback = info[0].As<Napi::Function>();
auto* worker = new ProducerStartWorker(callback, producer_);
worker->Queue();
return info.Env().Undefined();
}
class ProducerShutdownWorker : public Napi::AsyncWorker {
public:
ProducerShutdownWorker(const Napi::Function& callback,
const rocketmq::DefaultMQProducer& producer)
: Napi::AsyncWorker(callback), producer_(producer) {}
void Execute() override { producer_.shutdown(); }
private:
rocketmq::DefaultMQProducer producer_;
};
Napi::Value RocketMQProducer::Shutdown(const Napi::CallbackInfo& info) {
Napi::Function callback = info[0].As<Napi::Function>();
auto* worker = new ProducerShutdownWorker(callback, producer_);
worker->Queue();
return info.Env().Undefined();
}
struct ResultOrException {
std::unique_ptr<rocketmq::SendResult> result;
std::exception_ptr exception;
};
void CallProducerSendJsCallback(Napi::Env env,
Napi::Function callback,
std::nullptr_t*,
ResultOrException* data) {
std::unique_ptr<ResultOrException> data_guard(data);
if (env != nullptr) {
if (callback != nullptr) {
if (data->exception) {
try {
std::rethrow_exception(data->exception);
} catch (const std::exception& e) {
callback.Call(Napi::Object::New(callback.Env()),
{Napi::Error::New(env, e.what()).Value()});
}
} else {
callback.Call(Napi::Object::New(callback.Env()),
{env.Undefined(),
Napi::Number::New(env, data->result->send_status()),
Napi::String::New(env, data->result->msg_id()),
Napi::Number::New(env, data->result->queue_offset())});
}
}
}
}
class ProducerSendCallback : public rocketmq::AutoDeleteSendCallback {
public:
ProducerSendCallback(Napi::Env&& env, Napi::Function&& callback)
: callback_(
Callback::New(env, callback, "RocketMQ Send Callback", 0, 1)) {}
~ProducerSendCallback() { callback_.Release(); }
void onSuccess(rocketmq::SendResult& send_result) override {
auto* data =
new ResultOrException{std::unique_ptr<rocketmq::SendResult>(
new rocketmq::SendResult(send_result)),
nullptr};
napi_status status = callback_.BlockingCall(data);
if (status != napi_ok) {
// TODO: Handle error
std::exit(-1);
}
}
void onException(rocketmq::MQException& exception) noexcept override {
auto* data =
new ResultOrException{nullptr, std::make_exception_ptr(exception)};
napi_status status = callback_.BlockingCall(data);
if (status != napi_ok) {
// TODO: Handle error
std::exit(-1);
}
}
private:
using Callback = Napi::TypedThreadSafeFunction<std::nullptr_t,
ResultOrException,
&CallProducerSendJsCallback>;
Callback callback_;
};
Napi::Value RocketMQProducer::Send(const Napi::CallbackInfo& info) {
rocketmq::MQMessage message = [&]() {
Napi::String topic = info[0].As<Napi::String>();
Napi::Value body = info[1];
if (body.IsString()) {
return rocketmq::MQMessage(topic, body.ToString());
} else {
Napi::Buffer<char> buffer = body.As<Napi::Buffer<char>>();
return rocketmq::MQMessage(topic,
std::string(buffer.Data(), buffer.Length()));
}
}();
const Napi::Value options_v = info[2];
if (options_v.IsObject()) {
const Napi::Object options = options_v.ToObject();
Napi::Value tags = options.Get("tags");
if (tags.IsString()) {
message.set_tags(tags.ToString());
}
Napi::Value keys = options.Get("keys");
if (keys.IsString()) {
message.set_keys(keys.ToString());
}
}
auto* send_callback =
new ProducerSendCallback(info.Env(), info[3].As<Napi::Function>());
producer_.send(message, send_callback);
return info.Env().Undefined();
}
} // namespace __node_rocketmq__