Support authentication
diff --git a/binding.gyp b/binding.gyp
index 88a6c70..cfcf8b7 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -29,12 +29,13 @@
"sources": [
"src/addon.cc",
"src/Message.cc",
+ "src/MessageId.cc",
+ "src/Authentication.cc",
"src/Client.cc",
"src/Producer.cc",
"src/ProducerConfig.cc",
"src/Consumer.cc",
"src/ConsumerConfig.cc",
- "src/MessageId.cc",
],
"libraries": ["-lpulsar"],
}
diff --git a/examples/consumer_tls_auth.js b/examples/consumer_tls_auth.js
new file mode 100644
index 0000000..e71b251
--- /dev/null
+++ b/examples/consumer_tls_auth.js
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+const Pulsar = require('../index.js');
+
+(async () => {
+ const auth = new Pulsar.AuthenticationTls({
+ certificatePath: '/path/to/client.crt',
+ privateKeyPath: '/path/to/client.key',
+ });
+
+ // Create a client
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar+ssl://localhost:6651',
+ authentication: auth,
+ tlsTrustCertsFilePath: '/path/to/server.crt',
+ });
+
+ // Create a consumer
+ const consumer = await client.subscribe({
+ topic: 'persistent://public/default/my-topic',
+ subscription: 'sub1',
+ });
+
+ // Receive messages
+ for (let i = 0; i < 10; i += 1) {
+ const msg = await consumer.receive();
+ console.log(msg.getData().toString());
+ consumer.acknowledge(msg);
+ }
+
+ await consumer.close();
+ await client.close();
+})();
diff --git a/examples/producer_tls_auth.js b/examples/producer_tls_auth.js
new file mode 100644
index 0000000..df4e33e
--- /dev/null
+++ b/examples/producer_tls_auth.js
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+const Pulsar = require('../index.js');
+
+(async () => {
+ const auth = new Pulsar.AuthenticationTls({
+ certificatePath: '/path/to/client.crt',
+ privateKeyPath: '/path/to/client.key',
+ });
+
+ // Create a client
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar+ssl://localhost:6651',
+ authentication: auth,
+ tlsTrustCertsFilePath: '/path/to/server.crt',
+ });
+
+ // Create a producer
+ const producer = await client.createProducer({
+ topic: 'persistent://public/default/my-topic',
+ });
+
+ // Send messages
+ for (let i = 0; i < 10; i += 1) {
+ const msg = `my-message-${i}`;
+ producer.send({
+ data: Buffer.from(msg),
+ });
+ console.log(`Sent message: ${msg}`);
+ }
+ await producer.flush();
+
+ await producer.close();
+ await client.close();
+})();
diff --git a/index.js b/index.js
index 03f3a08..97c83c6 100644
--- a/index.js
+++ b/index.js
@@ -18,11 +18,17 @@
*/
const PulsarBinding = require('bindings')('Pulsar');
+const AuthenticationTls = require('./src/AuthenticationTls.js');
+const AuthenticationAthenz = require('./src/AuthenticationAthenz.js');
+const AuthenticationToken = require('./src/AuthenticationToken.js');
const Pulsar = {
Client: PulsarBinding.Client,
Message: PulsarBinding.Message,
MessageId: PulsarBinding.MessageId,
+ AuthenticationTls,
+ AuthenticationAthenz,
+ AuthenticationToken,
};
module.exports = Pulsar;
diff --git a/src/Authentication.cc b/src/Authentication.cc
new file mode 100644
index 0000000..226fd0a
--- /dev/null
+++ b/src/Authentication.cc
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "Authentication.h"
+
+static const std::string PARAM_TLS_CERT = "certificatePath";
+static const std::string PARAM_TLS_KEY = "privateKeyPath";
+static const std::string PARAM_TOKEN = "token";
+
+Napi::FunctionReference Authentication::constructor;
+
+Napi::Object Authentication::Init(Napi::Env env, Napi::Object exports) {
+ Napi::HandleScope scope(env);
+
+ Napi::Function func = DefineClass(env, "Authentication", {});
+
+ constructor = Napi::Persistent(func);
+ constructor.SuppressDestruct();
+
+ exports.Set("Authentication", func);
+ return exports;
+}
+
+Authentication::Authentication(const Napi::CallbackInfo &info)
+ : Napi::ObjectWrap<Authentication>(info), cAuthentication(nullptr) {
+ Napi::Env env = info.Env();
+ Napi::HandleScope scope(env);
+
+ if (info.Length() < 1 || !info[0].IsString() || info[0].ToString().Utf8Value().empty()) {
+ Napi::Error::New(env, "Authentication method is not specified").ThrowAsJavaScriptException();
+ return;
+ }
+
+ std::string authMethod = info[0].ToString().Utf8Value();
+
+ if (authMethod == "tls" || authMethod == "token") {
+ if (info.Length() < 2 || !info[1].IsObject()) {
+ Napi::Error::New(env, "Authentication parameter must be a object").ThrowAsJavaScriptException();
+ return;
+ }
+
+ Napi::Object obj = info[1].ToObject();
+
+ if (authMethod == "tls") {
+ if (!obj.Has(PARAM_TLS_CERT) || !obj.Get(PARAM_TLS_CERT).IsString() || !obj.Has(PARAM_TLS_KEY) ||
+ !obj.Get(PARAM_TLS_KEY).IsString()) {
+ Napi::Error::New(env, "Missing required parameter").ThrowAsJavaScriptException();
+ return;
+ }
+ this->cAuthentication =
+ pulsar_authentication_tls_create(obj.Get(PARAM_TLS_CERT).ToString().Utf8Value().c_str(),
+ obj.Get(PARAM_TLS_KEY).ToString().Utf8Value().c_str());
+ } else if (authMethod == "token") {
+ if (!obj.Has(PARAM_TOKEN) || !obj.Get(PARAM_TOKEN).IsString()) {
+ Napi::Error::New(env, "Missing required parameter").ThrowAsJavaScriptException();
+ return;
+ }
+ this->cAuthentication =
+ pulsar_authentication_token_create(obj.Get(PARAM_TOKEN).ToString().Utf8Value().c_str());
+ }
+ } else if (authMethod == "athenz") {
+ if (info.Length() < 2 || !info[1].IsString()) {
+ Napi::Error::New(env, "Authentication parameter must be a JSON string").ThrowAsJavaScriptException();
+ return;
+ }
+ this->cAuthentication = pulsar_authentication_athenz_create(info[1].ToString().Utf8Value().c_str());
+ } else {
+ Napi::Error::New(env, "Unsupported authentication method").ThrowAsJavaScriptException();
+ return;
+ }
+}
+
+Authentication::~Authentication() {
+ if (this->cAuthentication != nullptr) {
+ pulsar_authentication_free(this->cAuthentication);
+ }
+}
+
+pulsar_authentication_t *Authentication::GetCAuthentication() { return this->cAuthentication; }
diff --git a/src/Authentication.h b/src/Authentication.h
new file mode 100644
index 0000000..3666bd8
--- /dev/null
+++ b/src/Authentication.h
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef AUTH_H
+#define AUTH_H
+
+#include <napi.h>
+#include <pulsar/c/authentication.h>
+
+class Authentication : public Napi::ObjectWrap<Authentication> {
+ public:
+ static Napi::Object Init(Napi::Env env, Napi::Object exports);
+ Authentication(const Napi::CallbackInfo &info);
+ ~Authentication();
+ pulsar_authentication_t *GetCAuthentication();
+
+ private:
+ static Napi::FunctionReference constructor;
+ pulsar_authentication_t *cAuthentication;
+};
+
+#endif
diff --git a/src/AuthenticationAthenz.js b/src/AuthenticationAthenz.js
new file mode 100644
index 0000000..edbce3b
--- /dev/null
+++ b/src/AuthenticationAthenz.js
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+const PulsarBinding = require('bindings')('Pulsar');
+
+class AuthenticationAthenz {
+ constructor(params) {
+ const paramsStr = (typeof params === 'object') ? JSON.stringify(params) : params;
+ this.binding = new PulsarBinding.Authentication('athenz', paramsStr);
+ }
+}
+
+module.exports = AuthenticationAthenz;
diff --git a/src/AuthenticationTls.js b/src/AuthenticationTls.js
new file mode 100644
index 0000000..f00b579
--- /dev/null
+++ b/src/AuthenticationTls.js
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+const PulsarBinding = require('bindings')('Pulsar');
+
+class AuthenticationTls {
+ constructor(params) {
+ this.binding = new PulsarBinding.Authentication('tls', params);
+ }
+}
+
+module.exports = AuthenticationTls;
diff --git a/src/AuthenticationToken.js b/src/AuthenticationToken.js
new file mode 100644
index 0000000..e40c892
--- /dev/null
+++ b/src/AuthenticationToken.js
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+const PulsarBinding = require('bindings')('Pulsar');
+
+class AuthenticationToken {
+ constructor(params) {
+ this.binding = new PulsarBinding.Authentication('token', params);
+ }
+}
+
+module.exports = AuthenticationToken;
diff --git a/src/Client.cc b/src/Client.cc
index 16fb13d..123fc0f 100644
--- a/src/Client.cc
+++ b/src/Client.cc
@@ -20,11 +20,14 @@
#include "Client.h"
#include "Consumer.h"
#include "Producer.h"
+#include "Authentication.h"
#include <pulsar/c/client.h>
#include <pulsar/c/client_configuration.h>
#include <pulsar/c/result.h>
static const std::string CFG_SERVICE_URL = "serviceUrl";
+static const std::string CFG_AUTH = "authentication";
+static const std::string CFG_AUTH_PROP = "binding";
static const std::string CFG_OP_TIMEOUT = "operationTimeoutSeconds";
static const std::string CFG_IO_THREADS = "ioThreads";
static const std::string CFG_LISTENER_THREADS = "messageListenerThreads";
@@ -68,6 +71,14 @@
pulsar_client_configuration_t *cClientConfig = pulsar_client_configuration_create();
+ if (clientConfig.Has(CFG_AUTH) && clientConfig.Get(CFG_AUTH).IsObject()) {
+ Napi::Object obj = clientConfig.Get(CFG_AUTH).ToObject();
+ if (obj.Has(CFG_AUTH_PROP) && obj.Get(CFG_AUTH_PROP).IsObject()) {
+ Authentication *auth = Authentication::Unwrap(obj.Get(CFG_AUTH_PROP).ToObject());
+ pulsar_client_configuration_set_auth(cClientConfig, auth->GetCAuthentication());
+ }
+ }
+
if (clientConfig.Has(CFG_OP_TIMEOUT) && clientConfig.Get(CFG_OP_TIMEOUT).IsNumber()) {
int32_t operationTimeoutSeconds = clientConfig.Get(CFG_OP_TIMEOUT).ToNumber().Int32Value();
if (operationTimeoutSeconds > 0) {
diff --git a/src/addon.cc b/src/addon.cc
index 9e75d3f..050ae12 100644
--- a/src/addon.cc
+++ b/src/addon.cc
@@ -17,16 +17,18 @@
* under the License.
*/
-#include "Client.h"
-#include "Consumer.h"
#include "Message.h"
#include "MessageId.h"
+#include "Authentication.h"
#include "Producer.h"
+#include "Consumer.h"
+#include "Client.h"
#include <napi.h>
Napi::Object InitAll(Napi::Env env, Napi::Object exports) {
Message::Init(env, exports);
MessageId::Init(env, exports);
+ Authentication::Init(env, exports);
Producer::Init(env, exports);
Consumer::Init(env, exports);
return Client::Init(env, exports);