blob: 02fa2f570e6199546221472bef7cfb189b670363 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "Client.h"
#include "Consumer.h"
#include "Producer.h"
#include "Reader.h"
#include "Authentication.h"
#include <pulsar/c/client.h>
#include <pulsar/c/client_configuration.h>
#include <pulsar/c/result.h>
static const std::string CFG_SERVICE_URL = "serviceUrl";
static const std::string CFG_AUTH = "authentication";
static const std::string CFG_AUTH_PROP = "binding";
static const std::string CFG_OP_TIMEOUT = "operationTimeoutSeconds";
static const std::string CFG_IO_THREADS = "ioThreads";
static const std::string CFG_LISTENER_THREADS = "messageListenerThreads";
static const std::string CFG_CONCURRENT_LOOKUP = "concurrentLookupRequest";
static const std::string CFG_USE_TLS = "useTls";
static const std::string CFG_TLS_TRUST_CERT = "tlsTrustCertsFilePath";
static const std::string CFG_TLS_VALIDATE_HOSTNAME = "tlsValidateHostname";
static const std::string CFG_TLS_ALLOW_INSECURE = "tlsAllowInsecureConnection";
static const std::string CFG_STATS_INTERVAL = "statsIntervalInSeconds";
Napi::FunctionReference Client::constructor;
Napi::Object Client::Init(Napi::Env env, Napi::Object exports) {
Napi::HandleScope scope(env);
Napi::Function func = DefineClass(
env, "Client",
{InstanceMethod("createProducer", &Client::CreateProducer),
InstanceMethod("subscribe", &Client::Subscribe), InstanceMethod("createReader", &Client::CreateReader),
InstanceMethod("close", &Client::Close)});
constructor = Napi::Persistent(func);
constructor.SuppressDestruct();
exports.Set("Client", func);
return exports;
}
Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info) {
Napi::Env env = info.Env();
Napi::HandleScope scope(env);
Napi::Object clientConfig = info[0].As<Napi::Object>();
if (!clientConfig.Has(CFG_SERVICE_URL) || !clientConfig.Get(CFG_SERVICE_URL).IsString()) {
if (clientConfig.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty()) {
Napi::Error::New(env, "Service URL is required and must be specified as a string")
.ThrowAsJavaScriptException();
return;
}
}
Napi::String serviceUrl = clientConfig.Get(CFG_SERVICE_URL).ToString();
pulsar_client_configuration_t *cClientConfig = pulsar_client_configuration_create();
if (clientConfig.Has(CFG_AUTH) && clientConfig.Get(CFG_AUTH).IsObject()) {
Napi::Object obj = clientConfig.Get(CFG_AUTH).ToObject();
if (obj.Has(CFG_AUTH_PROP) && obj.Get(CFG_AUTH_PROP).IsObject()) {
Authentication *auth = Authentication::Unwrap(obj.Get(CFG_AUTH_PROP).ToObject());
pulsar_client_configuration_set_auth(cClientConfig, auth->GetCAuthentication());
}
}
if (clientConfig.Has(CFG_OP_TIMEOUT) && clientConfig.Get(CFG_OP_TIMEOUT).IsNumber()) {
int32_t operationTimeoutSeconds = clientConfig.Get(CFG_OP_TIMEOUT).ToNumber().Int32Value();
if (operationTimeoutSeconds > 0) {
pulsar_client_configuration_set_operation_timeout_seconds(cClientConfig, operationTimeoutSeconds);
}
}
if (clientConfig.Has(CFG_IO_THREADS) && clientConfig.Get(CFG_IO_THREADS).IsNumber()) {
int32_t ioThreads = clientConfig.Get(CFG_IO_THREADS).ToNumber().Int32Value();
if (ioThreads > 0) {
pulsar_client_configuration_set_io_threads(cClientConfig, ioThreads);
}
}
if (clientConfig.Has(CFG_LISTENER_THREADS) && clientConfig.Get(CFG_LISTENER_THREADS).IsNumber()) {
int32_t messageListenerThreads = clientConfig.Get(CFG_LISTENER_THREADS).ToNumber().Int32Value();
if (messageListenerThreads > 0) {
pulsar_client_configuration_set_message_listener_threads(cClientConfig, messageListenerThreads);
}
}
if (clientConfig.Has(CFG_CONCURRENT_LOOKUP) && clientConfig.Get(CFG_CONCURRENT_LOOKUP).IsNumber()) {
int32_t concurrentLookupRequest = clientConfig.Get(CFG_CONCURRENT_LOOKUP).ToNumber().Int32Value();
if (concurrentLookupRequest > 0) {
pulsar_client_configuration_set_concurrent_lookup_request(cClientConfig, concurrentLookupRequest);
}
}
if (clientConfig.Has(CFG_USE_TLS) && clientConfig.Get(CFG_USE_TLS).IsBoolean()) {
Napi::Boolean useTls = clientConfig.Get(CFG_USE_TLS).ToBoolean();
pulsar_client_configuration_set_use_tls(cClientConfig, useTls.Value());
}
if (clientConfig.Has(CFG_TLS_TRUST_CERT) && clientConfig.Get(CFG_TLS_TRUST_CERT).IsString()) {
Napi::String tlsTrustCertsFilePath = clientConfig.Get(CFG_TLS_TRUST_CERT).ToString();
pulsar_client_configuration_set_tls_trust_certs_file_path(cClientConfig,
tlsTrustCertsFilePath.Utf8Value().c_str());
}
if (clientConfig.Has(CFG_TLS_VALIDATE_HOSTNAME) &&
clientConfig.Get(CFG_TLS_VALIDATE_HOSTNAME).IsBoolean()) {
Napi::Boolean tlsValidateHostname = clientConfig.Get(CFG_TLS_VALIDATE_HOSTNAME).ToBoolean();
pulsar_client_configuration_set_validate_hostname(cClientConfig, tlsValidateHostname.Value());
}
if (clientConfig.Has(CFG_TLS_ALLOW_INSECURE) && clientConfig.Get(CFG_TLS_ALLOW_INSECURE).IsBoolean()) {
Napi::Boolean tlsAllowInsecureConnection = clientConfig.Get(CFG_TLS_ALLOW_INSECURE).ToBoolean();
pulsar_client_configuration_set_tls_allow_insecure_connection(cClientConfig,
tlsAllowInsecureConnection.Value());
}
if (clientConfig.Has(CFG_STATS_INTERVAL) && clientConfig.Get(CFG_STATS_INTERVAL).IsNumber()) {
uint32_t statsIntervalInSeconds = clientConfig.Get(CFG_STATS_INTERVAL).ToNumber().Uint32Value();
pulsar_client_configuration_set_stats_interval_in_seconds(cClientConfig, statsIntervalInSeconds);
}
this->cClient = pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig);
pulsar_client_configuration_free(cClientConfig);
}
Client::~Client() { pulsar_client_free(this->cClient); }
Napi::Value Client::CreateProducer(const Napi::CallbackInfo &info) {
return Producer::NewInstance(info, this->cClient);
}
Napi::Value Client::Subscribe(const Napi::CallbackInfo &info) {
return Consumer::NewInstance(info, this->cClient);
}
Napi::Value Client::CreateReader(const Napi::CallbackInfo &info) {
return Reader::NewInstance(info, this->cClient);
}
class ClientCloseWorker : public Napi::AsyncWorker {
public:
ClientCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient)
: AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
deferred(deferred),
cClient(cClient) {}
~ClientCloseWorker() {}
void Execute() {
pulsar_result result = pulsar_client_close(this->cClient);
if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
}
void OnOK() { this->deferred.Resolve(Env().Null()); }
void OnError(const Napi::Error &e) {
this->deferred.Reject(
Napi::Error::New(Env(), std::string("Failed to close client: ") + e.Message()).Value());
}
private:
Napi::Promise::Deferred deferred;
pulsar_client_t *cClient;
};
Napi::Value Client::Close(const Napi::CallbackInfo &info) {
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
ClientCloseWorker *wk = new ClientCloseWorker(deferred, this->cClient);
wk->Queue();
return deferred.Promise();
}