blob: 3e207a7974c6447c063222eeb0320fb09e1d00c7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "push_consumer.h"

#include <exception>
#include <future>
#include <memory>

#include <napi.h>

#include <ClientRPCHook.h>
#include <LoggerConfig.h>
#include <MQMessageListener.h>

#include "consumer_ack.h"
using namespace std;
namespace __node_rocketmq__ {
// Registers the push consumer class with the addon.
//
// Defines the JS class "RocketMQPushConsumer" with its instance methods,
// stores a persistent reference to the constructor as the environment's
// instance data, publishes the class on `exports` under the key
// "PushConsumer", and returns `exports`.
Napi::Object RocketMQPushConsumer::Init(Napi::Env env, Napi::Object exports) {
  Napi::Function consumer_class = DefineClass(
      env,
      "RocketMQPushConsumer",
      {
          InstanceMethod<&RocketMQPushConsumer::Start>("start"),
          InstanceMethod<&RocketMQPushConsumer::Shutdown>("shutdown"),
          InstanceMethod<&RocketMQPushConsumer::Subscribe>("subscribe"),
          InstanceMethod<&RocketMQPushConsumer::SetListener>("setListener"),
          InstanceMethod<&RocketMQPushConsumer::SetSessionCredentials>(
              "setSessionCredentials"),
      });

  // Heap-allocated on purpose: the pointer is handed to the environment via
  // SetInstanceData and is never deleted by this function.
  auto* constructor_ref =
      new Napi::FunctionReference(Napi::Persistent(consumer_class));
  env.SetInstanceData<Napi::FunctionReference>(constructor_ref);

  exports.Set("PushConsumer", consumer_class);
  return exports;
}
// Builds the native consumer from the JS constructor arguments:
//   info[0]  group name     - applied only when it is a string
//   info[1]  instance name  - applied only when it is a string
//   info[2]  options        - forwarded to SetOptions() only when it is an
//                             object
// Arguments of any other type are skipped without raising an error. The
// options are applied last, so a "groupName" option takes precedence over
// info[0].
RocketMQPushConsumer::RocketMQPushConsumer(const Napi::CallbackInfo& info)
    : Napi::ObjectWrap<RocketMQPushConsumer>(info), consumer_("") {
  const Napi::Value group_arg = info[0];
  const Napi::Value instance_arg = info[1];
  const Napi::Value options_arg = info[2];

  if (group_arg.IsString()) {
    consumer_.set_group_name(group_arg.ToString());
  }

  if (instance_arg.IsString()) {
    consumer_.set_instance_name(instance_arg.ToString());
  }

  if (options_arg.IsObject()) {
    SetOptions(options_arg.ToObject());
  }
}
// Shuts the wrapped consumer down when the JS wrapper object is destroyed.
//
// Destructors are implicitly noexcept, so an exception escaping from
// shutdown() here would terminate the whole process. There is no caller to
// report a failure to at this point, so the shutdown is best-effort and any
// exception is deliberately contained.
RocketMQPushConsumer::~RocketMQPushConsumer() {
  try {
    consumer_.shutdown();
  } catch (...) {
    // Intentionally ignored: nothing can be propagated from a destructor.
  }
}
// Applies the recognised keys of a JS options object to the consumer and to
// the default logger configuration.
//
// Recognised keys (each is ignored unless it has the listed JS type):
//   nameServer   string  -> consumer name server address
//   groupName    string  -> consumer group name
//   threadCount  number  -> number of consume threads
//   maxBatchSize number  -> max size of a consume message batch
//   logLevel     number  -> logger level; applied only when it lies in
//                           [0, LOG_LEVEL_LEVEL_NUM)
//   logDir       string  -> logger output path
//   logFileSize  number  -> logger file size
//   logFileNum   number  -> logger file count
//
// Unknown keys and values of an unexpected type are skipped silently.
void RocketMQPushConsumer::SetOptions(const Napi::Object& options) {
  // set name server
  Napi::Value name_server = options.Get("nameServer");
  if (name_server.IsString()) {
    consumer_.set_namesrv_addr(name_server.ToString());
  }

  // set group name
  Napi::Value group_name = options.Get("groupName");
  if (group_name.IsString()) {
    consumer_.set_group_name(group_name.ToString());
  }

  // set thread count
  Napi::Value thread_count = options.Get("threadCount");
  if (thread_count.IsNumber()) {
    consumer_.set_consume_thread_nums(thread_count.ToNumber());
  }

  // set message batch max size
  Napi::Value max_batch_size = options.Get("maxBatchSize");
  if (max_batch_size.IsNumber()) {
    consumer_.set_consume_message_batch_max_size(max_batch_size.ToNumber());
  }

  // set log level
  Napi::Value log_level = options.Get("logLevel");
  if (log_level.IsNumber()) {
    int32_t level = log_level.ToNumber();
    // Out-of-range levels are dropped rather than cast to an invalid enum.
    if (level >= 0 && level < rocketmq::LogLevel::LOG_LEVEL_LEVEL_NUM) {
      rocketmq::GetDefaultLoggerConfig().set_level(
          static_cast<rocketmq::LogLevel>(level));
    }
  }

  // set log directory
  Napi::Value log_dir = options.Get("logDir");
  if (log_dir.IsString()) {
    rocketmq::GetDefaultLoggerConfig().set_path(log_dir.ToString());
  }

  // set log file size
  // This used to call set_file_count(), so the size was never applied and the
  // value was then overwritten by "logFileNum" below.
  Napi::Value log_file_size = options.Get("logFileSize");
  if (log_file_size.IsNumber()) {
    rocketmq::GetDefaultLoggerConfig().set_file_size(
        log_file_size.ToNumber().Int64Value());
  }

  // set log file num
  Napi::Value log_file_num = options.Get("logFileNum");
  if (log_file_num.IsNumber()) {
    rocketmq::GetDefaultLoggerConfig().set_file_count(log_file_num.ToNumber());
  }
}
// JS: setSessionCredentials(accessKey, secretKey, onsChannel)
//
// Wraps the three string arguments in a rocketmq::SessionCredentials, builds
// a rocketmq::ClientRPCHook around it and installs that hook on the consumer.
// The arguments are taken as strings without a prior type check.
// Always returns `undefined`.
Napi::Value RocketMQPushConsumer::SetSessionCredentials(
    const Napi::CallbackInfo& info) {
  const std::string access_key = info[0].As<Napi::String>();
  const std::string secret_key = info[1].As<Napi::String>();
  const std::string ons_channel = info[2].As<Napi::String>();

  consumer_.setRPCHook(std::make_shared<rocketmq::ClientRPCHook>(
      rocketmq::SessionCredentials(access_key, secret_key, ons_channel)));

  return info.Env().Undefined();
}
class ConsumerStartWorker : public Napi::AsyncWorker {
public:
ConsumerStartWorker(const Napi::Function& callback,
const rocketmq::DefaultMQPushConsumer& consumer)
: Napi::AsyncWorker(callback), consumer_(consumer) {}
void Execute() override { consumer_.start(); }
private:
rocketmq::DefaultMQPushConsumer consumer_;
};
// JS: start(callback)
//
// Queues a ConsumerStartWorker for this consumer, using info[0] as the
// worker's callback, and returns `undefined` immediately.
Napi::Value RocketMQPushConsumer::Start(const Napi::CallbackInfo& info) {
  // The worker is allocated with `new` and never deleted here; presumably
  // Napi::AsyncWorker releases itself once the queued job has finished.
  (new ConsumerStartWorker(info[0].As<Napi::Function>(), consumer_))->Queue();
  return info.Env().Undefined();
}
class ConsumerShutdownWorker : public Napi::AsyncWorker {
public:
ConsumerShutdownWorker(const Napi::Function& callback,
const rocketmq::DefaultMQPushConsumer& consumer)
: Napi::AsyncWorker(callback), consumer_(consumer) {}
void Execute() override { consumer_.shutdown(); }
private:
rocketmq::DefaultMQPushConsumer consumer_;
};
// JS: shutdown(callback)
//
// Queues a ConsumerShutdownWorker for this consumer, using info[0] as the
// worker's callback, and returns `undefined` immediately.
Napi::Value RocketMQPushConsumer::Shutdown(const Napi::CallbackInfo& info) {
  // The worker is allocated with `new` and never deleted here; presumably
  // Napi::AsyncWorker releases itself once the queued job has finished.
  (new ConsumerShutdownWorker(info[0].As<Napi::Function>(), consumer_))
      ->Queue();
  return info.Env().Undefined();
}
// JS: subscribe(topic, expression)
//
// Forwards both string arguments to the consumer's subscribe() call. The
// arguments are taken as strings without a prior type check.
// Always returns `undefined`.
Napi::Value RocketMQPushConsumer::Subscribe(const Napi::CallbackInfo& info) {
  const std::string topic = info[0].As<Napi::String>();
  const std::string expression = info[1].As<Napi::String>();

  consumer_.subscribe(topic, expression);

  return info.Env().Undefined();
}
// Payload passed from ConsumerMessageListener::consumeMessage to
// CallConsumerMessageJsListener: one message plus the promise through which
// the consume result is reported back.
//
// NOTE: consumeMessage brace-initializes this struct positionally as
// {message, promise}, so the member order must not change.
struct MessageAndPromise {
// Copy of the message being delivered to the JS listener.
rocketmq::MQMessageExt message;
// Fulfilled with `false` when the JS listener cannot be called; otherwise
// it is moved into the ConsumerAck object handed to the listener.
std::promise<bool> promise;
};
// Call-JS callback of the listener's TypedThreadSafeFunction.
//
// Converts `data->message` into a plain JS object (topic, tags, keys, body,
// msgId), creates a ConsumerAck that takes over `data->promise`, and invokes
// `listener(message, ack)`.
//
// - When `env` or `listener` is null the listener cannot be called, so the
//   promise is fulfilled with `false` right away.
// - When the listener throws a Napi::Error, the error is forwarded to the
//   ack via Done(); a std::future_error raised by that call is ignored.
void CallConsumerMessageJsListener(Napi::Env env,
                                   Napi::Function listener,
                                   std::nullptr_t*,
                                   MessageAndPromise* data) {
  const bool can_call_js = env != nullptr && listener != nullptr;
  if (!can_call_js) {
    data->promise.set_value(false);
    return;
  }

  // Build the JS view of the message.
  rocketmq::MQMessageExt& incoming = data->message;
  Napi::Object js_message = Napi::Object::New(env);
  js_message.Set("topic", incoming.topic());
  js_message.Set("tags", incoming.tags());
  js_message.Set("keys", incoming.keys());
  js_message.Set("body", incoming.body());
  js_message.Set("msgId", incoming.msg_id());

  // From here on the ack object owns the promise.
  Napi::Object js_ack = ConsumerAck::NewInstance(env);
  ConsumerAck* native_ack = Napi::ObjectWrap<ConsumerAck>::Unwrap(js_ack);
  native_ack->SetPromise(std::move(data->promise));

  try {
    listener.Call(Napi::Object::New(listener.Env()), {js_message, js_ack});
  } catch (const Napi::Error&) {
    try {
      native_ack->Done(std::current_exception());
    } catch (const std::future_error&) {
      // ignore
    }
  }
}
class ConsumerMessageListener : public rocketmq::MessageListenerConcurrently {
public:
ConsumerMessageListener(Napi::Env& env, Napi::Function&& callback)
: listener_(
Listener::New(env, callback, "RocketMQ Message Listener", 0, 1)) {}
~ConsumerMessageListener() { listener_.Release(); }
rocketmq::ConsumeStatus consumeMessage(
std::vector<rocketmq::MQMessageExt>& msgs) override {
for (auto& msg : msgs) {
MessageAndPromise data{msg, std::promise<bool>()};
auto future = data.promise.get_future();
listener_.BlockingCall(&data);
try {
if (!future.get()) {
return rocketmq::ConsumeStatus::RECONSUME_LATER;
}
} catch (const std::exception& e) {
return rocketmq::ConsumeStatus::RECONSUME_LATER;
}
}
return rocketmq::ConsumeStatus::CONSUME_SUCCESS;
};
private:
using Listener =
Napi::TypedThreadSafeFunction<std::nullptr_t,
MessageAndPromise,
&CallConsumerMessageJsListener>;
Listener listener_;
};
// JS: setListener(callback)
//
// Wraps info[0] in a new ConsumerMessageListener and registers it on the
// consumer. Always returns `undefined`.
//
// The new listener is registered BEFORE the previous one is destroyed, so
// the consumer never holds a pointer to an already-deleted listener. If
// construction or registration throws, `listener_` is left untouched and the
// half-built replacement is freed.
Napi::Value RocketMQPushConsumer::SetListener(const Napi::CallbackInfo& info) {
  Napi::Env env = info.Env();

  std::unique_ptr<ConsumerMessageListener> replacement(
      new ConsumerMessageListener(env, info[0].As<Napi::Function>()));
  consumer_.registerMessageListener(replacement.get());

  // Only now drop the previous listener and take ownership of the new one.
  listener_.reset(replacement.release());

  return env.Undefined();
}
} // namespace __node_rocketmq__