blob: 3ae404779b0b9dc85bc203d2f7ab8a10d23c1870 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "producer.h"
#include "workers/producer/send_message.h"
#include "workers/producer/start_or_shutdown.h"
#include <MQClientException.h>
#include <string>
using namespace std;
namespace __node_rocketmq__ {
#define NAN_GET_CPRODUCER() \
RocketMQProducer* _v8_producer = ObjectWrap::Unwrap<RocketMQProducer>(info.Holder()); \
CProducer* producer_ptr = _v8_producer->GetProducer();
Nan::Persistent<Function> RocketMQProducer::constructor;
RocketMQProducer::RocketMQProducer(const char* group_id, const char* instance_name)
{
producer_ptr = CreateProducer(group_id);
if(instance_name)
{
SetProducerInstanceName(producer_ptr, instance_name);
}
}
RocketMQProducer::~RocketMQProducer()
{
try
{
ShutdownProducer(producer_ptr);
}
catch (...)
{
//
}
DestroyProducer(producer_ptr);
}
void RocketMQProducer::SetOptions(Local<Object> options)
{
// set name server
Local<Value> _name_server_v = Nan::Get(options, Nan::New<String>("nameServer").ToLocalChecked()).ToLocalChecked();
if(_name_server_v->IsString())
{
Nan::Utf8String namesrv(_name_server_v);
SetProducerNameServerAddress(producer_ptr, *namesrv);
}
// set group name
Local<Value> _group_name_v = Nan::Get(options, Nan::New<String>("groupName").ToLocalChecked()).ToLocalChecked();
if(_group_name_v->IsString())
{
Nan::Utf8String group_name(_group_name_v);
SetProducerGroupName(producer_ptr, *group_name);
}
// set log num & single log size
int file_num = 3;
int64 file_size = 104857600;
Local<Value> _log_file_num_v = Nan::Get(options, Nan::New<String>("logFileNum").ToLocalChecked()).ToLocalChecked();
Local<Value> _log_file_size_v = Nan::Get(options, Nan::New<String>("logFileSize").ToLocalChecked()).ToLocalChecked();
if(_log_file_num_v->IsNumber())
{
file_num = _log_file_num_v->Int32Value();
}
if(_log_file_size_v->IsNumber())
{
file_size = _log_file_size_v->Int32Value();
}
SetProducerLogFileNumAndSize(producer_ptr, file_num, file_size);
// set log level
Local<Value> _log_level_v = Nan::Get(options, Nan::New<String>("logLevel").ToLocalChecked()).ToLocalChecked();
if(_log_level_v->IsNumber())
{
int level = _log_level_v->Int32Value();
SetProducerLogLevel(producer_ptr, (CLogLevel)level);
}
// set compress level
Local<Value> _compress_level_v = Nan::Get(options, Nan::New<String>("compressLevel").ToLocalChecked()).ToLocalChecked();
if(_compress_level_v->IsNumber()) {
int level = _compress_level_v->Int32Value();
SetProducerCompressLevel(producer_ptr, level);
}
// set send message timeout
Local<Value> _send_message_timeout_v = Nan::Get(options, Nan::New<String>("sendMessageTimeout").ToLocalChecked()).ToLocalChecked();
if(_send_message_timeout_v->IsNumber())
{
int timeout = _send_message_timeout_v->Int32Value();
SetProducerSendMsgTimeout(producer_ptr, timeout);
}
// set max message size
Local<Value> _max_message_size_v = Nan::Get(options, Nan::New<String>("maxMessageSize").ToLocalChecked()).ToLocalChecked();
if(_max_message_size_v->IsNumber())
{
int size = _max_message_size_v->Int32Value();
SetProducerMaxMessageSize(producer_ptr, size);
}
}
NAN_MODULE_INIT(RocketMQProducer::Init)
{
Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
tpl->SetClassName(Nan::New("RocketMQProducer").ToLocalChecked());
tpl->InstanceTemplate()->SetInternalFieldCount(1);
Nan::SetPrototypeMethod(tpl, "start", Start);
Nan::SetPrototypeMethod(tpl, "shutdown", Shutdown);
Nan::SetPrototypeMethod(tpl, "send", Send);
Nan::SetPrototypeMethod(tpl, "setSessionCredentials", SetSessionCredentials);
constructor.Reset(tpl->GetFunction());
Nan::Set(target, Nan::New("Producer").ToLocalChecked(), tpl->GetFunction());
}
NAN_METHOD(RocketMQProducer::New)
{
Isolate* isolate = info.GetIsolate();
Local<Context> context = Context::New(isolate);
if(!info.IsConstructCall())
{
const int argc = 3;
Local<Value> argv[argc] = { info[0], info[1], info[2] };
Local<Function> _constructor = Nan::New<v8::Function>(constructor);
info.GetReturnValue().Set(_constructor->NewInstance(context, argc, argv).ToLocalChecked());
return;
}
Nan::Utf8String group_id(info[0]);
Nan::Utf8String instance_name(info[1]);
Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked();
RocketMQProducer* producer = new RocketMQProducer(*group_id, info[1]->IsNull() ? NULL : *instance_name);
producer->Wrap(info.This());
// try to set options
try
{
producer->SetOptions(options);
}
catch (runtime_error e)
{
Nan::ThrowError(e.what());
return;
}
info.GetReturnValue().Set(info.This());
}
NAN_METHOD(RocketMQProducer::Start)
{
NAN_GET_CPRODUCER();
Nan::Callback* callback = (info[0]->IsFunction()) ?
new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
NULL;
Nan::AsyncQueueWorker(new ProducerStartOrShutdownWorker(callback, producer_ptr, ProducerWorkerType::START_PRODUCER));
}
NAN_METHOD(RocketMQProducer::Shutdown)
{
NAN_GET_CPRODUCER();
Nan::Callback* callback = (info[0]->IsFunction()) ?
new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
NULL;
Nan::AsyncQueueWorker(new ProducerStartOrShutdownWorker(callback, producer_ptr, ProducerWorkerType::SHUTDOWN_PRODUCER));
}
NAN_METHOD(RocketMQProducer::SetSessionCredentials)
{
NAN_GET_CPRODUCER();
Nan::Utf8String access_key(info[0]);
Nan::Utf8String secret_key(info[1]);
Nan::Utf8String ons_channel(info[2]);
int ret;
try
{
ret = SetProducerSessionCredentials(producer_ptr, *access_key, *secret_key, *ons_channel);
}
catch(runtime_error e)
{
Nan::ThrowError(e.what());
}
catch(std::exception& e)
{
Nan::ThrowError(e.what());
}
catch(rocketmq::MQException& e)
{
Nan::ThrowError(e.what());
}
info.GetReturnValue().Set(ret);
}
NAN_METHOD(RocketMQProducer::Send)
{
Nan::Utf8String topic(info[0]);
Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked();
CMessage* msg = CreateMessage(*topic);
Local<Value> _tags_to_be_checked = Nan::Get(options, Nan::New<String>("tags").ToLocalChecked()).ToLocalChecked();
Local<Value> _keys_to_be_checked = Nan::Get(options, Nan::New<String>("keys").ToLocalChecked()).ToLocalChecked();
if(_tags_to_be_checked->IsString())
{
Nan::Utf8String tags(_tags_to_be_checked);
SetMessageTags(msg, *tags);
}
if(_keys_to_be_checked->IsString())
{
Nan::Utf8String keys(_keys_to_be_checked);
SetMessageKeys(msg, *keys);
}
// set message body:
// 1. if it's a string, call `SetMessageBody`;
// 2. if it's a buffer, call `SetByteMessageBody`.
if(info[1]->IsString())
{
Nan::Utf8String body(info[1]);
SetMessageBody(msg, *body);
}
else
{
Local<Object> node_buff_object = Nan::To<Object>(info[1]).ToLocalChecked();
unsigned int length = node::Buffer::Length(node_buff_object);
const char* buff = node::Buffer::Data(node_buff_object);
SetByteMessageBody(msg, buff, length);
}
Nan::Callback* callback = (info[3]->IsFunction()) ?
new Nan::Callback(Nan::To<Function>(info[3]).ToLocalChecked()) :
NULL;
RocketMQProducer* producer = ObjectWrap::Unwrap<RocketMQProducer>(info.Holder());
Nan::AsyncQueueWorker(new ProducerSendMessageWorker(callback, producer, msg));
}
}