Refactor: first demo for developing this SDK (#1)
Refactor: first demo for developing this SDK
diff --git a/.eslintrc b/.eslintrc
new file mode 100644
index 0000000..4601220
--- /dev/null
+++ b/.eslintrc
@@ -0,0 +1,5 @@
+{
+ "extends": [
+ "rocketmq-style"
+ ]
+}
diff --git a/.github/ISSUE_TEMPLATE.md b/.github/ISSUE_TEMPLATE.md
new file mode 100644
index 0000000..1c4f079
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE.md
@@ -0,0 +1,20 @@
+<!--
+Thank you for reporting an issue.
+
+This issue tracker is for bugs and issues found within RocketMQ Node.js SDK.
+
+Please fill in as much of the template below as you're able.
+
+Version:
+Platform: output of `uname -a` (UNIX), or version and 32 or 64-bit (Windows)
+Subsystem: if known, please specify affected core module name
+
+If possible, please provide code that demonstrates the problem, keeping it as
+simple and free of external dependencies as you are able.
+-->
+
+* **Version**:
+* **Platform**:
+* **Subsystem**:
+
+<!-- Enter your issue details below this comment. -->
diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
new file mode 100644
index 0000000..c2f77b7
--- /dev/null
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -0,0 +1,16 @@
+<!--
+Thank you for your pull request. Please provide a description above and review
+the requirements below.
+
+Bug fixes and new features should include tests and possibly benchmarks.
+
+Contributors guide: https://github.com/apache/rocketmq-client-nodejs/blob/master/CONTRIBUTING.md
+-->
+
+##### Checklist
+<!-- Remove items that do not apply. For completed items, change [ ] to [x]. -->
+
+- [ ] `npm test` passes
+- [ ] tests and/or benchmarks are included
+- [ ] documentation is changed or added
+- [ ] commit message follows [commit guidelines](https://github.com/apache/rocketmq-client-nodejs/blob/master/PULL_REQUEST.md#commit-message-guidelines)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..c9d16aa
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,188 @@
+# Created by https://www.gitignore.io/api/c++,node,linux,macos,windows
+# Edit at https://www.gitignore.io/?templates=c++,node,linux,macos,windows
+
+### C++ ###
+# Prerequisites
+*.d
+
+# Compiled Object files
+*.slo
+*.lo
+*.o
+*.obj
+
+# Precompiled Headers
+*.gch
+*.pch
+
+# Compiled Dynamic libraries
+*.so
+*.dylib
+*.dll
+
+# Fortran module files
+*.mod
+*.smod
+
+# Compiled Static libraries
+*.lai
+*.la
+*.a
+*.lib
+
+# Executables
+*.exe
+*.out
+*.app
+
+### Linux ###
+*~
+
+# temporary files which can be created if a process still has a handle open of a deleted file
+.fuse_hidden*
+
+# KDE directory preferences
+.directory
+
+# Linux trash folder which might appear on any partition or disk
+.Trash-*
+
+# .nfs files are created when an open file is removed but is still being accessed
+.nfs*
+
+### macOS ###
+# General
+.DS_Store
+.AppleDouble
+.LSOverride
+
+# Icon must end with two \r
+Icon
+
+# Thumbnails
+._*
+
+# Files that might appear in the root of a volume
+.DocumentRevisions-V100
+.fseventsd
+.Spotlight-V100
+.TemporaryItems
+.Trashes
+.VolumeIcon.icns
+.com.apple.timemachine.donotpresent
+
+# Directories potentially created on remote AFP share
+.AppleDB
+.AppleDesktop
+Network Trash Folder
+Temporary Items
+.apdisk
+
+### Node ###
+# Logs
+logs
+*.log
+npm-debug.log*
+yarn-debug.log*
+yarn-error.log*
+
+# Runtime data
+pids
+*.pid
+*.seed
+*.pid.lock
+
+# Directory for instrumented libs generated by jscoverage/JSCover
+lib-cov
+
+# Coverage directory used by tools like istanbul
+coverage
+
+# nyc test coverage
+.nyc_output
+
+# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
+.grunt
+
+# Bower dependency directory (https://bower.io/)
+bower_components
+
+# node-waf configuration
+.lock-wscript
+
+# Compiled binary addons (https://nodejs.org/api/addons.html)
+build/Release
+
+# Dependency directories
+node_modules/
+jspm_packages/
+
+# TypeScript v1 declaration files
+typings/
+
+# Optional npm cache directory
+.npm
+
+# Optional eslint cache
+.eslintcache
+
+# Optional REPL history
+.node_repl_history
+
+# Output of 'npm pack'
+*.tgz
+
+# Yarn Integrity file
+.yarn-integrity
+
+# dotenv environment variables file
+.env
+
+# parcel-bundler cache (https://parceljs.org/)
+.cache
+
+# next.js build output
+.next
+
+# nuxt.js build output
+.nuxt
+
+# vuepress build output
+.vuepress/dist
+
+# Serverless directories
+.serverless
+
+# FuseBox cache
+.fusebox/
+
+### Windows ###
+# Windows thumbnail cache files
+Thumbs.db
+ehthumbs.db
+ehthumbs_vista.db
+
+# Dump file
+*.stackdump
+
+# Folder config file
+[Dd]esktop.ini
+
+# Recycle Bin used on file shares
+$RECYCLE.BIN/
+
+# Windows Installer files
+*.cab
+*.msi
+*.msix
+*.msm
+*.msp
+
+# Windows shortcuts
+*.lnk
+
+# End of https://www.gitignore.io/api/c++,node,linux,macos,windows
+
+build
+package-lock.json
+.vscode
diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..c9fc461
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,3 @@
+[submodule "deps/rocketmq"]
+path = deps/rocketmq
+url = https://github.com/apache/rocketmq-client-cpp.git
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..3ced58e
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,39 @@
+language: node_js
+
+node_js: 10
+
+before_script:
+ - wget http://us.mirrors.quenda.co/apache/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip
+ - unzip rocketmq-all-4.3.2-bin-release.zip
+ - cd rocketmq-all-4.3.2-bin-release
+ - perl -i -pe's/-Xms8g -Xmx8g -Xmn4g/-Xms2g -Xmx2g -Xmn1g/g' bin/runbroker.sh
+ - nohup sh bin/mqnamesrv &
+ - nohup sh bin/mqbroker -n localhost:9876 &
+ - sleep 10
+ - ./bin/mqadmin updateTopic -b '127.0.0.1:10911' –n '127.0.0.1:9876' -t test
+ - ./bin/mqadmin updateSubGroup -b '127.0.0.1:10911' –n '127.0.0.1:9876' -g testGroup
+
+script:
+ - npm test
+
+matrix:
+ include:
+ - os: linux
+ dist: trusty
+ - os: linux
+ dist: xenial
+ jdk: openjdk8
+ env:
+ JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64
+ apt:
+ packages:
+ - openjdk8
+ - os: windows
+ - os: osx
+ osx_image: xcode9.3
+ env:
+ JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_112.jdk/Contents/Home/jre
+ allow_failures:
+ - os: linux
+ dist: xenial
+ - os: windows
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 0000000..802ac1f
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1 @@
+## TODO
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..d53ea09
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (properties) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/README.md b/README.md
index 0743cbd..e4b3054 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,307 @@
-# rocketmq-client-nodejs
+# RocketMQ Client for Node.js
+
+This official Node.js client is a C++ binding of [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp), which has been proven robust and widely adopted within Alibaba Group by many business units for more than three years.
+
+> **Notice 1:** This client is still in `dev` version. Use it cautiously in production.
+
+> **Notice 2:** This SDK is now only support macOS and Ubuntu **14.04**. Ubuntu 16+ is not supported and CentOS is not tested yet.
+
+## Installation
+
+```shell
+$ npm install --save apache-rocketmq
+```
+
+## Examples
+
+You may view [example/producer.js](https://github.com/apache/rocketmq-client-nodejs/blob/master/example/producer.js) and
+[example/push_consumer.js](https://github.com/apache/rocketmq-client-nodejs/blob/master/example/push_consumer.js) for quick start.
+
+## Usage
+
+Require this package first.
+
+```javascript
+const { Producer, PushConsumer } = require("apache-rocketmq");
+```
+
+### Producer
+
+#### Constructor
+
+```javascript
+new Producer(groupId[, instanceName][, options]);
+```
+
+`Producer`'s constructor receives three parameters:
+
++ `groupId`: the group id of the producer;
++ `instanceName`: the instance name of the producer, **optional**;
++ `options`: the options object, **optional**;
+ - `nameServer`: the name server of RocketMQ;
+ - `groupName`: the group name of this producer;
+ - `compressLevel`: the compress level (0-9) of this producer, default to `5` where `0` is fastest and `9` is most compressed;
+ - `sendMessageTimeout`: send message timeout millisecond, default to `3000` and suggestion is 2000 - 3000ms;
+ - `maxMessageSize`: max message size with unit (B), default to `1024 * 128` which means 128K;
+ - `logFileNum`: C++ core logic log file number, default to 3 and log file path is `$HOME/logs/rocketmq-cpp`;
+ - `logFileSize`: size of each C++ core logic log file with unit (B);
+ - `logLevel`: C++ core logic log level in `"fatal"`, `"error"`, `"warn"`, `"info"`, `"debug"`, `"trace"` and `"num"`.
+
+e.g.
+
+```javascript
+const { Producer } = require("apache-rocketmq");
+const producer = new Producer("GROUP_ID", "INSTANCE_NAME", {
+ nameServer: "127.0.0.1:9876",
+});
+```
+
+#### start
+
+```javascript
+producer.start([callback]);
+```
+
+`.start` receives a callback function. If no callback passed, this function will return a Promise object.
+
+e.g.
+
+```javascript
+producer.start(function(err) {
+ if(err) {
+ //
+ }
+});
+
+// or
+
+producer.start().then(() => {
+ //
+}).catch(err => {
+ //
+});
+```
+
+#### shutdown
+
+```javascript
+producer.start([callback]);
+```
+
+`.shutdown` receives a callback function. If no callback passed, this function will return a Promise object.
+
+e.g.
+
+```javascript
+producer.shutdown(function(err) {
+ if(err) {
+ //
+ }
+});
+
+// or
+
+producer.shutdown().then(() => {
+ //
+}).catch(err => {
+ //
+});
+```
+
+#### send
+
+```javascript
+producer.send(topic, body[, options][, callback]);
+```
+
+`.send` receives 4 parameters including a callback. If no callback passed, this function will return a Promise object.
+
++ `topic`: the topic string;
++ `body`: the message body string;
++ `options`: the options object, **optional**;
+ - `keys`: the keys for this message;
+ - `tags`: the tags for this message;
++ `callback`: the callback function, **optional**.
+
+e.g.
+
+```javascript
+producer.send("test", `baz ${i}`, {
+ keys: "foo",
+ tags: "bar"
+}, function(err, result) {
+ if(err) {
+ // ...
+ } else {
+ console.log(result);
+
+ // console example:
+ //
+ // { status: 0,
+ // statusStr: 'OK',
+ // msgId: '0101007F0000367E0000309DD68B0700',
+ // offset: 0 }
+ }
+});
+```
+
+##### send `status` and `statusStr`
+
+| `status` | `statusStr` |
+|----------|-----------------------|
+| `0` | `OK` |
+| `1` | `FLUSH_DISK_TIMEOUT` |
+| `2` | `FLUSH_SLAVE_TIMEOUT` |
+| `3` | `SLAVE_NOT_AVAILABLE` |
+
+### PushConsumer
+
+#### Constructor
+
+```javascript
+new PushConsumer(groupId[, instanceName][, options]);
+```
+
+`PushConsumer`'s constructor receives three parameters:
+
++ `groupId`: the group id of the push consumer;
++ `instanceName`: the instance name of the push consumer, **optional**;
++ `options`: the options object, **optional**;
+ - `nameServer`: the name server of RocketMQ;
+ - `threadCount`: the thread number of underlying C++ logic;
+ - `maxBatchSize`: message max batch size;
+ - `logFileNum`: C++ core logic log file number, default to 3 and log file path is `$HOME/logs/rocketmq-cpp`;
+ - `logFileSize`: size of each C++ core logic log file with unit (B);
+ - `logLevel`: C++ core logic log level in `"fatal"`, `"error"`, `"warn"`, `"info"`, `"debug"`, `"trace"` and `"num"`.
+
+e.g.
+
+```javascript
+const { PushConsumer } = require("apache-rocketmq");
+const consumer = new PushConsumer("GROUP_ID", "INSTANCE_NAME", {
+ nameServer: "127.0.0.1:9876",
+ threadCount: 3
+});
+```
+
+#### start
+
+```javascript
+consumer.start([callback]);
+```
+
+`.start` receives a callback function. If no callback passed, this function will return a Promise object.
+
+e.g.
+
+```javascript
+consumer.start(function(err) {
+ if(err) {
+ //
+ }
+});
+
+// or
+
+consumer.start().then(() => {
+ //
+}).catch(err => {
+ //
+});
+```
+
+#### shutdown
+
+```javascript
+consumer.start([callback]);
+```
+
+`.shutdown` receives a callback function. If no callback passed, this function will return a Promise object.
+
+e.g.
+
+```javascript
+consumer.shutdown(function(err) {
+ if(err) {
+ //
+ }
+});
+
+// or
+
+consumer.shutdown().then(() => {
+ //
+}).catch(err => {
+ //
+});
+```
+
+#### subscribe
+
+Add a subscription relationship to consumer.
+
+```javascript
+consumer.subscribe(topic[, expression]);
+```
+
+`.subscribe` receives two parameters which the second parameter is optional.
+
++ `topic`: The topic to be subscribed;
++ `expression`: The additional expression to be subscribed, **optional**. e.g. `*`.
+
+#### On Message Event
+
+If you want to receive messages from RocketMQ Server, you should add a listener for `message` event which receives 2
+parameters.
+
+```javascript
+function YOUR_LISTENER(msg, ack) {
+ //
+}
+```
+
++ `msg`: the message object to be consumed;
++ `ack`: the Acknowledge object, which has a `.done()` function.
+
+`msg` object looks like:
+
+```javascript
+{ topic: 'test',
+ tags: 'bar',
+ keys: 'foo',
+ body: 'baz 7',
+ msgId: '0101007F0000367E0000339DD68B0800' }
+```
+
+You may call `ack.done()` to tell RocketMQ that you've finished your message successfully which is same as `ack.done(true)`. And you may call `ack.done(false)` to tell it that you've failed.
+
+e.g.
+
+```javascript
+consumer.on("message", function(msg, ack) {
+ console.log(msg);
+ ack.done();
+});
+```
+
+## Apache RocketMQ Community
+
++ [RocketMQ Community Projects](https://github.com/apache/rocketmq-externals)
+
+## Contact Us
+
++ Mailing Lists: https://rocketmq.apache.org/about/contact/
++ Home: https://rocketmq.apache.org
++ Docs: https://rocketmq.apache.org/docs/quick-start/
++ Issues: https://github.com/apache/rocketmq-client-nodejs/issues
++ Ask: https://stackoverflow.com/questions/tagged/rocketmq
++ Slack: https://rocketmq-community.slack.com/
+
+## How to Contribute
+
+Contributions are warmly welcome! Be it trivial cleanup, major new feature or other suggestion. Read this [how to contribute](CONTRIBUTING.md) guide for more details.
+
+## License
+
+[Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation
+
diff --git a/binding.gyp b/binding.gyp
new file mode 100644
index 0000000..697d1c4
--- /dev/null
+++ b/binding.gyp
@@ -0,0 +1,48 @@
+{
+ "targets": [
+ {
+ "target_name": "rocketmq",
+ "sources": [
+ "src/rocketmq.cpp",
+ "src/producer.cpp",
+ "src/push_consumer.cpp",
+ "src/consumer_ack.cpp",
+ "src/consumer_ack_inner.cpp"
+ ],
+ "include_dirs": [
+ "deps/rocketmq/include",
+ "<!(node -e \"require('nan')\")"
+ ],
+ "conditions": [
+ ["OS==\"linux\"", {
+ "libraries": [
+ "<(module_root_dir)/deps/lib/librocketmq.a"
+ ],
+ "cflags_cc!": [ "-fno-exceptions", "-pthread", "-Wl,--no-as-needed", "-ldl" ],
+ "cflags_cc": [ "-Wno-ignored-qualifiers" ],
+ "cflags": [ "-std=c++11", "-g" ]
+ }],
+ ["OS==\"win\"", {
+ "libraries": [
+ "<(module_root_dir)/deps/lib/rocketmq-client-cpp.lib"
+ ],
+ "copies": [
+ {
+ "destination": "<(module_root_dir)/build/Release/",
+ "files": [ "<(module_root_dir)/deps/lib/rocketmq-client-cpp.dll" ]
+ }
+ ]
+ }],
+ ["OS==\"mac\"", {
+ "xcode_settings": {
+ "GCC_ENABLE_CPP_EXCEPTIONS": "YES"
+ },
+ "cflags!": [ "-fno-exceptions" ],
+ "cflags_cc!": [ "-fno-exceptions", "-pthread", "-Wl,--no-as-needed", "-ldl" ],
+ "cflags_cc": [ "-Wno-ignored-qualifiers" ],
+ "cflags": [ "-std=c++11", "-stdlib=libc++" ]
+ }]
+ ]
+ }
+ ]
+}
diff --git a/deps/lib/.gitkeep b/deps/lib/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/deps/lib/.gitkeep
diff --git a/deps/rocketmq b/deps/rocketmq
new file mode 160000
index 0000000..d5887b6
--- /dev/null
+++ b/deps/rocketmq
@@ -0,0 +1 @@
+Subproject commit d5887b63ddbba16fec562f64dca7f77ce9ca0bb1
diff --git a/example/common.js b/example/common.js
new file mode 100644
index 0000000..efc5153
--- /dev/null
+++ b/example/common.js
@@ -0,0 +1,6 @@
+"use strict";
+
+module.exports = {
+ nameServer: "127.0.0.1:9876",
+ messageCount: 10
+};
diff --git a/example/producer.js b/example/producer.js
new file mode 100644
index 0000000..0f8ac0b
--- /dev/null
+++ b/example/producer.js
@@ -0,0 +1,47 @@
+"use strict";
+
+const co = require("co");
+
+const common = require("./common");
+const Producer = require("../").Producer;
+
+co(function *() {
+ const producer = new Producer("testGroup", {
+ nameServer: common.nameServer,
+ groupName: "testGroupName",
+ logFileNum: 5,
+ logFileSize: 1048576000,
+ logLevel: "debug",
+ compressLevel: 3,
+ sendMessageTimeout: 5000,
+ maxMessageSize: 1024 * 256
+ });
+
+ console.time("producer start");
+ try {
+ yield producer.start();
+ } catch(e) {
+ console.error(e);
+ process.exit(4);
+ }
+ console.timeEnd("producer start");
+ for(let i = 0; i < common.messageCount; i++) {
+ console.time(`send ${i}`);
+ try {
+ const ret = yield producer.send("test", `baz ${i}`, {
+ keys: "foo",
+ tags: "bar"
+ });
+ console.timeEnd(`send ${i}`);
+ console.log(ret);
+ } catch(e) {
+ console.error(e);
+ console.error(e.stack);
+ process.exit(4);
+ }
+ }
+
+ console.time("producer end");
+ yield producer.shutdown();
+ console.timeEnd("producer end");
+});
diff --git a/example/push_consumer.js b/example/push_consumer.js
new file mode 100644
index 0000000..9154a97
--- /dev/null
+++ b/example/push_consumer.js
@@ -0,0 +1,58 @@
+"use strict";
+
+const assert = require("assert");
+
+const co = require("co");
+
+const common = require("./common");
+const PushConsumer = require("../").PushConsumer;
+
+co(function *() {
+ const msgs = [];
+ const consumer = new PushConsumer("testGroup", {
+ nameServer: common.nameServer,
+ logFileNum: 5,
+ logFileSize: 1048576000,
+ logLevel: "debug"
+ });
+
+ consumer.subscribe("test", "*");
+ consumer.on("message", function(msg, ack) {
+ msgs.push(msg);
+ ack.done();
+
+ // console.log(msg);
+ // return;
+
+ if(msgs.length === common.messageCount) {
+ msgs.sort(function(a, b) {
+ return a.body < b.body ? -1 : 1;
+ });
+
+ console.log(msgs);
+
+ for(let i = 0; i < msgs.length; i++) {
+ assert.deepStrictEqual(msgs[i], {
+ topic: "test",
+ tags: "bar",
+ keys: "foo",
+ body: `baz ${i}`,
+ msgId: msgs[i].msgId
+ });
+ }
+
+ console.time("consumer end");
+ consumer.shutdown().then(() => {
+ console.timeEnd("consumer end");
+ process.exit(0);
+ }).catch(e => {
+ console.error(e);
+ process.exit(4);
+ });
+ }
+ });
+
+ console.time("consumer start");
+ yield consumer.start();
+ console.timeEnd("consumer start");
+});
diff --git a/index.js b/index.js
new file mode 100644
index 0000000..9127a66
--- /dev/null
+++ b/index.js
@@ -0,0 +1,4 @@
+"use strict";
+
+exports.Producer = require("./lib/producer");
+exports.PushConsumer = require("./lib/push_consumer");
diff --git a/lib/common.js b/lib/common.js
new file mode 100644
index 0000000..73f3ccb
--- /dev/null
+++ b/lib/common.js
@@ -0,0 +1,12 @@
+"use strict";
+
+exports.requireBinding = function(name) {
+ let mod;
+ try {
+ mod = require(`../build/Debug/${name}`);
+ } catch(e) {
+ mod = require(`../build/Release/${name}`);
+ }
+
+ return mod;
+};
diff --git a/lib/env_init.js b/lib/env_init.js
new file mode 100644
index 0000000..16d8d31
--- /dev/null
+++ b/lib/env_init.js
@@ -0,0 +1,16 @@
+"use strict";
+
+const os = require("os");
+const path = require("path");
+
+const common = require("./common");
+
+const binding = common.requireBinding("rocketmq");
+
+switch(os.platform()) {
+case "darwin":
+ process.env.EVENT_NOKQUEUE = "1";
+ binding.macosDLOpen(path.join(__dirname, "../deps/lib/librocketmq.dylib"));
+ break;
+default: break;
+}
diff --git a/lib/producer.js b/lib/producer.js
new file mode 100644
index 0000000..8c5b09c
--- /dev/null
+++ b/lib/producer.js
@@ -0,0 +1,184 @@
+"use strict";
+
+require("./env_init");
+
+const assert = require("assert");
+
+const common = require("./common");
+
+const binding = common.requireBinding("rocketmq");
+
+const START_OR_SHUTDOWN = Symbol("RocketMQProducer#startOrShutdown");
+
+const SEND_RESULT_STATUS = {
+ OK: 0,
+ FLUSH_DISK_TIMEOUT: 1,
+ FLUSH_SLAVE_TIMEOUT: 2,
+ SLAVE_NOT_AVAILABLE: 3
+};
+const SEND_RESULT_STATUS_STR = {
+ 0: "OK",
+ 1: "FLUSH_DISK_TIMEOUT",
+ 2: "FLUSH_SLAVE_TIMEOUT",
+ 3: "SLAVE_NOT_AVAILABLE"
+};
+const DEFAULT_OPTIONS = {};
+const START_STATUS = {
+ STOPPED: 0,
+ STARTED: 1,
+ STOPPING: 2,
+ STARTING: 3
+};
+const OPTIONS_LOG_LEVEL = {
+ FATAL: 1,
+ ERROR: 2,
+ WARN: 3,
+ INFO: 4,
+ DEBUG: 5,
+ TRACE: 6,
+ NUM: 7
+};
+
+let producerRef = 0;
+let timer;
+
+class RocketMQProducer {
+ /**
+ * RocketMQ Producer constructor
+ * @param {String} groupId the group id
+ * @param {String} [instanceName] the instance name
+ * @param {Object} options the options
+ */
+ constructor(groupId, instanceName, options) {
+ if(typeof instanceName !== "string" && !options) {
+ options = instanceName;
+ instanceName = null;
+ }
+
+ options = Object.assign({}, DEFAULT_OPTIONS, options || {});
+ if(options.logLevel && typeof options.logLevel === "string") {
+ options.logLevel = OPTIONS_LOG_LEVEL[options.logLevel.toUpperCase()] || OPTIONS_LOG_LEVEL.INFO;
+ }
+ this.core = new binding.Producer(groupId, instanceName, options);
+ this.status = START_STATUS.STOPPED;
+ }
+
+ /**
+ * Set session credentials (usually used in Alibaba MQ)
+ * @param {String} accessKey the access key
+ * @param {String} secretKey the secret key
+ * @param {String} onsChannel the ons channel
+ * @return {Number} the result
+ */
+ setSessionCredentials(accessKey, secretKey, onsChannel) {
+ assert(typeof accessKey === "string");
+ assert(typeof secretKey === "string");
+ assert(typeof onsChannel === "string");
+ return !this.core.setSessionCredentials(accessKey, secretKey, onsChannel);
+ }
+
+ [START_OR_SHUTDOWN](method, callback) {
+ let promise;
+ let resolve;
+ let reject;
+ if(!callback) {
+ promise = new Promise((_resolve, _reject) => {
+ resolve = _resolve;
+ reject = _reject;
+ });
+ } else {
+ resolve = reject = callback;
+ }
+
+ this.core[method]((err, ret) => {
+ if(err) return reject(err);
+
+ if(method === "start") {
+ this.status = START_STATUS.STARTED;
+ if(!producerRef) timer = setInterval(function() {}, 24 * 3600 * 1000);
+ producerRef++;
+ } else {
+ this.status = START_STATUS.STOPPED;
+ producerRef--;
+ if(!producerRef) clearInterval(timer);
+ }
+
+ return callback ? resolve(undefined, ret) : resolve(ret);
+ });
+
+ if(!callback) return promise;
+ }
+
+ /**
+ * Start the producer
+ * @param {Function} [callback] the callback function
+ * @return {Promise|undefined} returns a Promise if no callback
+ */
+ start(callback) {
+ assert(this.status === START_STATUS.STOPPED);
+ this.status = START_STATUS.STARTING;
+ return this[START_OR_SHUTDOWN]("start", callback);
+ }
+
+ /**
+ * Shutdown the producer
+ * @param {Function} [callback] the callback function
+ * @return {Promise|undefined} returns a Promise if no callback
+ */
+ shutdown(callback) {
+ assert(this.status === START_STATUS.STARTED);
+ this.status = START_STATUS.STOPPING;
+ return this[START_OR_SHUTDOWN]("shutdown", callback);
+ }
+
+ /**
+ * Send a message
+ * @param {String} topic the topic
+ * @param {String} body the body
+ * @param {Object} [options] the options
+ * @param {Function} [callback] the callback function
+ * @return {Promise|undefined} returns a Promise if no callback
+ */
+ send(topic, body, options, callback) {
+ assert(typeof topic === "string");
+ assert(typeof body === "string" || Buffer.isBuffer(body));
+
+ if(typeof options === "function") {
+ callback = options;
+ options = {};
+ }
+
+ options = options || {};
+
+ let promise;
+ let resolve;
+ let reject;
+ if(!callback) {
+ promise = new Promise((_resolve, _reject) => {
+ resolve = _resolve;
+ reject = _reject;
+ });
+ }
+
+ this.core.send(topic, body, options, function sendMessageCallback(err, status, msgId, offset) {
+ if(err) {
+ if(!callback) return reject(err);
+ return callback(err);
+ }
+
+ const ret = {
+ status,
+ statusStr: SEND_RESULT_STATUS_STR[status] || "UNKNOWN",
+ msgId,
+ offset
+ };
+ if(!callback) return resolve(ret);
+ callback(undefined, ret);
+ });
+ if(promise) return promise;
+ }
+}
+
+RocketMQProducer.SEND_RESULT = SEND_RESULT_STATUS;
+
+module.exports = RocketMQProducer;
diff --git a/lib/push_consumer.js b/lib/push_consumer.js
new file mode 100644
index 0000000..fde2e84
--- /dev/null
+++ b/lib/push_consumer.js
@@ -0,0 +1,126 @@
+"use strict";
+
+require("./env_init");
+
+const assert = require("assert");
+const EventEmitter = require("events").EventEmitter;
+
+const common = require("./common");
+
+const binding = common.requireBinding("rocketmq");
+
+const START_OR_SHUTDOWN = Symbol("RocketMQPushConsumer#startOrShutdown");
+const START_STATUS = {
+ STOPPED: 0,
+ STARTED: 1,
+ STOPPING: 2,
+ STARTING: 3
+};
+const OPTIONS_LOG_LEVEL = {
+ FATAL: 1,
+ ERROR: 2,
+ WARN: 3,
+ INFO: 4,
+ DEBUG: 5,
+ TRACE: 6,
+ NUM: 7
+};
+
+const DEFAULT_OPTIONS = {};
+
+let producerRef = 0;
+let timer;
+
+class RocketMQPushConsumer extends EventEmitter {
+ /**
+ * RocketMQ PushConsumer constructor
+ * @param {String} groupId the group id
+ * @param {String} [instanceName] the instance name
+ * @param {Object} options the options
+ */
+ constructor(groupId, instanceName, options) {
+ super();
+
+ if(typeof instanceName !== "string" && !options) {
+ options = instanceName;
+ instanceName = null;
+ }
+
+ options = Object.assign({}, DEFAULT_OPTIONS, options || {});
+ if(options.logLevel && typeof options.logLevel === "string") {
+ options.logLevel = OPTIONS_LOG_LEVEL[options.logLevel.toUpperCase()] || OPTIONS_LOG_LEVEL.INFO;
+ }
+ this.core = new binding.PushConsumer(groupId, instanceName, options);
+ this.core.setListener(this.emit.bind(this, "message"));
+ this.status = START_STATUS.STOPPED;
+ }
+
+ [START_OR_SHUTDOWN](method, callback) {
+ let promise;
+ let resolve;
+ let reject;
+ if(!callback) {
+ promise = new Promise((_resolve, _reject) => {
+ resolve = _resolve;
+ reject = _reject;
+ });
+ } else {
+ resolve = reject = callback;
+ }
+
+ this.core[method]((err, ret) => {
+ if(err) return reject(err);
+
+ if(method === "start") {
+ this.status = START_STATUS.STARTED;
+ if(!producerRef) timer = setInterval(function() {}, 24 * 3600 * 1000);
+ producerRef++;
+ } else {
+ this.status = START_STATUS.STOPPED;
+ producerRef--;
+ if(!producerRef) clearInterval(timer);
+ }
+
+ return callback ? resolve(undefined, ret) : resolve(ret);
+ });
+
+ if(!callback) return promise;
+ }
+
+ /**
+ * Start the push consumer
+ * @param {Function} [callback] the callback function
+ * @return {Promise|undefined} returns a Promise if no callback
+ */
+ start(callback) {
+ assert(this.status === START_STATUS.STOPPED);
+ this.status = START_STATUS.STARTING;
+ return this[START_OR_SHUTDOWN]("start", callback);
+ }
+
+ /**
+ * Shutdown the push consumer
+ * @param {Function} [callback] the callback function
+ * @return {Promise|undefined} returns a Promise if no callback
+ */
+ shutdown(callback) {
+ assert(this.status === START_STATUS.STARTED);
+ this.status = START_STATUS.STOPPING;
+ return this[START_OR_SHUTDOWN]("shutdown", callback);
+ }
+
+ /**
+ * subscribe a topic
+ * @param {String} topic the topic to be subscribed
+ * @param {String} [expression] the additional expression to be subscribed
+ * @return {Number} the subscribe status result
+ */
+ subscribe(topic, expression) {
+ assert(this.status === START_STATUS.STOPPED);
+ assert(topic && typeof topic === "string");
+ assert(!expression || expression && typeof expression === "string");
+ return this.core.subscribe(topic, expression || "");
+ }
+}
+
+module.exports = RocketMQPushConsumer;
diff --git a/package.json b/package.json
new file mode 100644
index 0000000..ae0df92
--- /dev/null
+++ b/package.json
@@ -0,0 +1,26 @@
+{
+ "name": "apache-rocketmq",
+ "version": "0.0.1-dev",
+ "cppSDKVersion": "1.2.0",
+ "description": "RocketMQ binding for Node.js",
+ "main": "index.js",
+ "scripts": {
+ "test": "npm run lint && echo 'temp example test' && node example/producer.js && node example/push_consumer.js",
+ "lint": "eslint .",
+ "install": "node ./script/download_lib.js && node-gyp rebuild"
+ },
+ "author": "XadillaX <i@2333.moe>",
+ "license": "Apache-2.0",
+ "dependencies": {
+ "co": "^4.6.0",
+ "destroy": "^1.0.4",
+ "getos": "^3.1.1",
+ "mkdirp": "^0.5.1",
+ "nan": "^2.11.1",
+ "urllib": "^2.31.3"
+ },
+ "devDependencies": {
+ "eslint": "^5.9.0",
+ "eslint-config-rocketmq-style": "^1.0.0"
+ }
+}
diff --git a/script/download_lib.js b/script/download_lib.js
new file mode 100644
index 0000000..2b57a4b
--- /dev/null
+++ b/script/download_lib.js
@@ -0,0 +1,115 @@
+"use strict";
+
+const fs = require("fs");
+const os = require("os");
+const path = require("path");
+
+const co = require("co");
+const destroy = require("destroy");
+const _mkdirp = require("mkdirp");
+const urllib = require("urllib");
+
+const getLinuxDistroRoute = require("./get_linux_distro_route");
+const pkg = require("../package");
+
+let REGISTRY_MIRROR =
+ process.env.NODE_ROCKETMQ_REGISTRY ||
+ "https://opensource-rocketmq-client.oss-cn-hangzhou.aliyuncs.com";
+if(!REGISTRY_MIRROR.endsWith("/")) REGISTRY_MIRROR += "/";
+
+const CPP_SDK_VERSION = pkg.cppSDKVersion;
+const LIB_DIR = path.join(__dirname, "..", "deps", "lib");
+const URL_ROOT = `${REGISTRY_MIRROR}cpp-client`;
+
+function mkdirp(dir) {
+ return new Promise((resolve, reject) => {
+ _mkdirp(dir, null, err => {
+ if(err) reject(err);
+ else resolve();
+ });
+ });
+}
+
+function *getUrlArray() {
+ const platform = os.platform();
+ const ret = [];
+ let distro;
+
+ switch(platform) {
+ case "win32":
+ ret.push(`${URL_ROOT}/windows/${CPP_SDK_VERSION}/rocketmq-client-cpp.dll`);
+ ret.push(`${URL_ROOT}/windows/${CPP_SDK_VERSION}/rocketmq-client-cpp.lib`);
+ break;
+
+ case "darwin":
+ ret.push(`${URL_ROOT}/mac/${CPP_SDK_VERSION}/librocketmq.dylib`);
+ break;
+
+ case "linux":
+ distro = yield getLinuxDistroRoute();
+ ret.push(`${URL_ROOT}/linux/${CPP_SDK_VERSION}/${distro}/librocketmq.a`);
+ break;
+
+ default: throw new Error(`Unsupported platform ${platform}`);
+ }
+
+ return ret;
+}
+
+co(function *() {
+ let urls;
+ try {
+ urls = yield getUrlArray();
+ } catch(e) {
+ console.error(`[rocketmq sdk] [error] ${e.message}`);
+ process.exit(4);
+ }
+
+ yield mkdirp(LIB_DIR);
+ let writeTimes = 0;
+ for(const url of urls) {
+ console.log(`[rocketmq sdk] [info] downloading [${url}]...`);
+
+ const resp = yield urllib.request(url, {
+ timeout: 60000 * 5,
+ followRedirect: true,
+ streaming: true
+ });
+
+ if(resp.status !== 200) {
+ destroy(resp.res);
+ console.error(`[rocketmq sdk] [error] error status ${resp.status} while downloading [${url}].`);
+ process.exit(4);
+ }
+
+ const readStream = resp.res;
+ const filename = path.join(LIB_DIR, path.basename(url));
+ const writeStream = fs.createWriteStream(filename, {
+ encoding: "binary"
+ });
+
+ // eslint-disable-next-line
+ function handleDownladCallback(err) {
+ if(err) {
+ console.error(`[rocketmq sdk] [error] error occurred while downloading [${url}] to [${filename}].`);
+ console.error(err.stack);
+ process.exit(4);
+ }
+
+ writeTimes++;
+ destroy(resp.res);
+
+ console.log(`[rocketmq sdk] [info] downloaded library [${url}].`);
+ if(writeTimes === urls.length) {
+ console.log("[rocketmq sdk] [info] all libraries have been written to disk.");
+ process.exit(0);
+ }
+ }
+
+ readStream.on("error", handleDownladCallback);
+ writeStream.on("error", handleDownladCallback);
+ writeStream.on("finish", handleDownladCallback);
+
+ readStream.pipe(writeStream);
+ }
+});
diff --git a/script/get_linux_distro_route.js b/script/get_linux_distro_route.js
new file mode 100644
index 0000000..d5245a1
--- /dev/null
+++ b/script/get_linux_distro_route.js
@@ -0,0 +1,86 @@
+"use strict";
+
+const assert = require("assert");
+
+const getos = require("getos");
+
+const RHEL_MAP_ARRAY = [
+ "Centos",
+ "Red Hat Linux",
+ "RHEL",
+ "Scientific Linux",
+ "ScientificSL",
+ "ScientificCERNSLC",
+ "ScientificFermiLTS",
+ "ScientificSLF"
+];
+const NORMAL_MAP_ARRAY = [
+ "Alpine Linux",
+ "Amazon Linux",
+ "Arch Linux",
+ "Chakra",
+ "Debian",
+ "elementary OS",
+ "IYCC",
+ "Linux Mint",
+ "Manjaro Linux",
+ "Ubuntu Linux"
+];
+
+function realGetLinuxDistroRoute(dist, release) {
+ let major;
+ if(release) {
+ major = Number(release.split(".")[0]);
+ }
+
+ // RHEL Distros
+ if(RHEL_MAP_ARRAY.includes(dist)) {
+ assert(major >= 5 && major <= 7, `Only support ${dist} 5-7.`);
+ return `RHEL${major}.x`;
+ }
+
+ // Fedora
+ if("Fedora" === dist) {
+ if(major <= 18) {
+ return "RHEL6.x";
+ } else if(major === 19) {
+ return "RHEL7.x";
+ }
+ }
+
+ if("Ubuntu Linux" === dist) {
+ assert([ 14, 16, 18 ].includes(major));
+ return `UBUNTU/${major}.04`;
+ }
+
+ if("Debian" === dist) {
+ assert(major >= 8 && major <= 10);
+ return `UBUNTU/${(major + 2) / 2}.04`;
+ }
+
+ // Ubuntu Distros
+ if(!NORMAL_MAP_ARRAY.includes(dist)) {
+ console.error(`[rocketmq sdk] [warn] ${dist} may not supported, fallback to use Ubuntu library.`);
+ }
+
+ return "UBUNTU/14.04";
+}
+
+function getLinuxDistroRoute() {
+ return new Promise((resolve, reject) => {
+ getos(function(err, ret) {
+ if(err) return reject(err);
+
+ let route;
+ try {
+ route = realGetLinuxDistroRoute(ret.dist, ret.release);
+ } catch(e) {
+ return reject(e);
+ }
+
+ resolve(route);
+ });
+ });
+}
+
+module.exports = getLinuxDistroRoute;
diff --git a/src/consumer_ack.cpp b/src/consumer_ack.cpp
new file mode 100644
index 0000000..d4243ea
--- /dev/null
+++ b/src/consumer_ack.cpp
@@ -0,0 +1,74 @@
+#include "consumer_ack.h"
+
+namespace __node_rocketmq__ {
+
+Nan::Persistent<Function> ConsumerAck::constructor;
+
+ConsumerAck::ConsumerAck() :
+ inner(NULL)
+{
+}
+
+ConsumerAck::~ConsumerAck()
+{
+ inner = NULL;
+}
+
+NAN_MODULE_INIT(ConsumerAck::Init)
+{
+ Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
+ tpl->SetClassName(Nan::New("ConsumerAck").ToLocalChecked());
+ tpl->InstanceTemplate()->SetInternalFieldCount(1);
+
+ Nan::SetPrototypeMethod(tpl, "done", Done);
+
+ constructor.Reset(tpl->GetFunction());
+ Nan::Set(target, Nan::New("ConsumerAck").ToLocalChecked(), tpl->GetFunction());
+}
+
+NAN_METHOD(ConsumerAck::New)
+{
+ Isolate* isolate = info.GetIsolate();
+ Local<Context> context = Context::New(isolate);
+
+ if(!info.IsConstructCall())
+ {
+ Local<Function> _constructor = Nan::New<Function>(constructor);
+ info.GetReturnValue().Set(_constructor->NewInstance(context, 0, 0).ToLocalChecked());
+ return;
+ }
+
+ ConsumerAck* producer = new ConsumerAck();
+ producer->Wrap(info.This());
+ info.GetReturnValue().Set(info.This());
+}
+
+NAN_METHOD(ConsumerAck::Done)
+{
+ ConsumerAck* ack = ObjectWrap::Unwrap<ConsumerAck>(info.Holder());
+ bool succ = true;
+
+ if(info.Length() >= 1)
+ {
+ succ = (info[0]->IsUndefined() || info[0]->IsNull() || Nan::To<bool>(info[0]).FromJust());
+ }
+
+ // call inner ack's `Ack` function to emit the true `Acker`'s `Ack` function
+ // and finish waiting of consume thread
+ CConsumeStatus status = succ ?
+ CConsumeStatus::E_CONSUME_SUCCESS :
+ CConsumeStatus::E_RECONSUME_LATER;
+ ack->Ack(status);
+}
+
+void ConsumerAck::Ack(CConsumeStatus status)
+{
+ if(inner)
+ {
+ // call inner ack in the main event loop
+ inner->Ack(status);
+ inner = NULL;
+ }
+}
+
+}
\ No newline at end of file
diff --git a/src/consumer_ack.h b/src/consumer_ack.h
new file mode 100644
index 0000000..a90e5ca
--- /dev/null
+++ b/src/consumer_ack.h
@@ -0,0 +1,50 @@
+#ifndef __ROCKETMQ_CONSUMER_ACK_H__
+#define __ROCKETMQ_CONSUMER_ACK_H__
+
+#include "consumer_ack_inner.h"
+#include <nan.h>
+
+namespace __node_rocketmq__ {
+
+using v8::Context;
+using v8::Function;
+using v8::FunctionTemplate;
+using v8::Isolate;
+using v8::Local;
+using v8::Object;
+using v8::String;
+using v8::Value;
+
+class ConsumerAck : public Nan::ObjectWrap {
+public:
+ static NAN_MODULE_INIT(Init);
+
+private:
+ explicit ConsumerAck();
+ ~ConsumerAck();
+
+ static NAN_METHOD(New);
+ static NAN_METHOD(Done);
+
+ void Ack(CConsumeStatus status);
+
+ static Nan::Persistent<v8::Function> constructor;
+
+public:
+ void SetInner(ConsumerAckInner* _inner)
+ {
+ inner = _inner;
+ }
+
+ static Nan::Persistent<v8::Function>& GetConstructor()
+ {
+ return constructor;
+ }
+
+private:
+ ConsumerAckInner* inner;
+};
+
+}
+
+#endif
\ No newline at end of file
diff --git a/src/consumer_ack_inner.cpp b/src/consumer_ack_inner.cpp
new file mode 100644
index 0000000..dcf5e89
--- /dev/null
+++ b/src/consumer_ack_inner.cpp
@@ -0,0 +1,61 @@
+#include "consumer_ack_inner.h"
+
+namespace __node_rocketmq__ {
+
+ConsumerAckInner::ConsumerAckInner() :
+ acked(false)
+{
+ uv_cond_init(&cond);
+ uv_mutex_init(&mutex);
+}
+
+ConsumerAckInner::~ConsumerAckInner()
+{
+ uv_mutex_destroy(&mutex);
+ uv_cond_destroy(&cond);
+}
+
+void ConsumerAckInner::Ack(CConsumeStatus _status)
+{
+ uv_mutex_lock(&mutex);
+ bool _acked = acked;
+
+ if(_acked)
+ {
+ uv_mutex_unlock(&mutex);
+ return;
+ }
+
+ status = _status;
+ acked = true;
+
+ // tell `this->WaitResult()` to continue
+ uv_cond_signal(&cond);
+ uv_mutex_unlock(&mutex);
+}
+
+CConsumeStatus ConsumerAckInner::WaitResult()
+{
+ uv_mutex_lock(&mutex);
+
+ // if `cond signal` send before `WaitResult()`,
+ // `uv_cond_wait` will be blocked and never continue
+ //
+ // so we have to return result directly without `uv_cond_wait`
+ if(acked)
+ {
+ CConsumeStatus _status = status;
+ uv_mutex_unlock(&mutex);
+ return _status;
+ }
+
+ // wait for `this->Ack()` and that will emit `uv_cond_signal` to let it stop wait
+ uv_cond_wait(&cond, &mutex);
+
+ CConsumeStatus _status = status;
+ uv_mutex_unlock(&mutex);
+
+ return _status;
+}
+
+}
diff --git a/src/consumer_ack_inner.h b/src/consumer_ack_inner.h
new file mode 100644
index 0000000..402de6b
--- /dev/null
+++ b/src/consumer_ack_inner.h
@@ -0,0 +1,26 @@
+#ifndef __ROCKETMQ_CONSUMER_ACK_INNER_H__
+#define __ROCKETMQ_CONSUMER_ACK_INNER_H__
+
+#include <uv.h>
+#include <CPushConsumer.h>
+
+namespace __node_rocketmq__ {
+
+class ConsumerAckInner {
+public:
+ ConsumerAckInner();
+ ~ConsumerAckInner();
+
+ void Ack(CConsumeStatus _status);
+ CConsumeStatus WaitResult();
+
+private:
+ bool acked;
+ CConsumeStatus status;
+ uv_mutex_t mutex;
+ uv_cond_t cond;
+};
+
+}
+
+#endif
\ No newline at end of file
diff --git a/src/producer.cpp b/src/producer.cpp
new file mode 100644
index 0000000..36996c6
--- /dev/null
+++ b/src/producer.cpp
@@ -0,0 +1,251 @@
+#include "producer.h"
+#include "workers/producer/send_message.h"
+#include "workers/producer/start_or_shutdown.h"
+
+#include <MQClientException.h>
+#include <string>
+using namespace std;
+
+namespace __node_rocketmq__ {
+
+#define NAN_GET_CPRODUCER() \
+ RocketMQProducer* _v8_producer = ObjectWrap::Unwrap<RocketMQProducer>(info.Holder()); \
+ CProducer* producer_ptr = _v8_producer->GetProducer();
+
+Nan::Persistent<Function> RocketMQProducer::constructor;
+
+RocketMQProducer::RocketMQProducer(const char* group_id, const char* instance_name)
+{
+ producer_ptr = CreateProducer(group_id);
+ if(instance_name)
+ {
+ SetProducerInstanceName(producer_ptr, instance_name);
+ }
+}
+
+RocketMQProducer::~RocketMQProducer()
+{
+ try
+ {
+ ShutdownProducer(producer_ptr);
+ }
+ catch (...)
+ {
+ //
+ }
+
+ DestroyProducer(producer_ptr);
+}
+
+void RocketMQProducer::SetOptions(Local<Object> options)
+{
+ // set name server
+ Local<Value> _name_server_v = Nan::Get(options, Nan::New<String>("nameServer").ToLocalChecked()).ToLocalChecked();
+ if(_name_server_v->IsString())
+ {
+ Nan::Utf8String namesrv(_name_server_v);
+ SetProducerNameServerAddress(producer_ptr, *namesrv);
+ }
+
+ // set group name
+ Local<Value> _group_name_v = Nan::Get(options, Nan::New<String>("groupName").ToLocalChecked()).ToLocalChecked();
+ if(_group_name_v->IsString())
+ {
+ Nan::Utf8String group_name(_group_name_v);
+ SetProducerGroupName(producer_ptr, *group_name);
+ }
+
+ // set log num & single log size
+ int file_num = 3;
+ int64 file_size = 104857600;
+ Local<Value> _log_file_num_v = Nan::Get(options, Nan::New<String>("logFileNum").ToLocalChecked()).ToLocalChecked();
+ Local<Value> _log_file_size_v = Nan::Get(options, Nan::New<String>("logFileSize").ToLocalChecked()).ToLocalChecked();
+ if(_log_file_num_v->IsNumber())
+ {
+ file_num = _log_file_num_v->Int32Value();
+ }
+ if(_log_file_size_v->IsNumber())
+ {
+ file_size = _log_file_size_v->Int32Value();
+ }
+ SetProducerLogFileNumAndSize(producer_ptr, file_num, file_size);
+
+ // set log level
+ Local<Value> _log_level_v = Nan::Get(options, Nan::New<String>("logLevel").ToLocalChecked()).ToLocalChecked();
+ if(_log_level_v->IsNumber())
+ {
+ int level = _log_level_v->Int32Value();
+ SetProducerLogLevel(producer_ptr, (CLogLevel)level);
+ }
+
+ // set compress level
+ Local<Value> _compress_level_v = Nan::Get(options, Nan::New<String>("compressLevel").ToLocalChecked()).ToLocalChecked();
+ if(_compress_level_v->IsNumber()) {
+ int level = _compress_level_v->Int32Value();
+ SetProducerCompressLevel(producer_ptr, level);
+ }
+
+ // set send message timeout
+ Local<Value> _send_message_timeout_v = Nan::Get(options, Nan::New<String>("sendMessageTimeout").ToLocalChecked()).ToLocalChecked();
+ if(_send_message_timeout_v->IsNumber())
+ {
+ int timeout = _send_message_timeout_v->Int32Value();
+ SetProducerSendMsgTimeout(producer_ptr, timeout);
+ }
+
+ // set max message size
+ Local<Value> _max_message_size_v = Nan::Get(options, Nan::New<String>("maxMessageSize").ToLocalChecked()).ToLocalChecked();
+ if(_max_message_size_v->IsNumber())
+ {
+ int size = _max_message_size_v->Int32Value();
+ SetProducerMaxMessageSize(producer_ptr, size);
+ }
+}
+
+NAN_MODULE_INIT(RocketMQProducer::Init)
+{
+ Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
+ tpl->SetClassName(Nan::New("RocketMQProducer").ToLocalChecked());
+ tpl->InstanceTemplate()->SetInternalFieldCount(1);
+
+ Nan::SetPrototypeMethod(tpl, "start", Start);
+ Nan::SetPrototypeMethod(tpl, "shutdown", Shutdown);
+ Nan::SetPrototypeMethod(tpl, "send", Send);
+ Nan::SetPrototypeMethod(tpl, "setSessionCredentials", SetSessionCredentials);
+
+ constructor.Reset(tpl->GetFunction());
+ Nan::Set(target, Nan::New("Producer").ToLocalChecked(), tpl->GetFunction());
+}
+
+NAN_METHOD(RocketMQProducer::New)
+{
+ Isolate* isolate = info.GetIsolate();
+ Local<Context> context = Context::New(isolate);
+
+ if(!info.IsConstructCall())
+ {
+ const int argc = 3;
+ Local<Value> argv[argc] = { info[0], info[1], info[2] };
+ Local<Function> _constructor = Nan::New<v8::Function>(constructor);
+ info.GetReturnValue().Set(_constructor->NewInstance(context, argc, argv).ToLocalChecked());
+ return;
+ }
+
+ Nan::Utf8String group_id(info[0]);
+ Nan::Utf8String instance_name(info[1]);
+ Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked();
+ RocketMQProducer* producer = new RocketMQProducer(*group_id, info[1]->IsNull() ? NULL : *instance_name);
+
+ producer->Wrap(info.This());
+
+ // try to set options
+ try
+ {
+ producer->SetOptions(options);
+ }
+ catch (runtime_error e)
+ {
+ Nan::ThrowError(e.what());
+ return;
+ }
+
+ info.GetReturnValue().Set(info.This());
+}
+
+NAN_METHOD(RocketMQProducer::Start)
+{
+ NAN_GET_CPRODUCER();
+
+ Nan::Callback* callback = (info[0]->IsFunction()) ?
+ new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
+ NULL;
+
+ Nan::AsyncQueueWorker(new ProducerStartOrShutdownWorker(callback, producer_ptr, ProducerWorkerType::START_PRODUCER));
+}
+
+NAN_METHOD(RocketMQProducer::Shutdown)
+{
+ NAN_GET_CPRODUCER();
+
+ Nan::Callback* callback = (info[0]->IsFunction()) ?
+ new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
+ NULL;
+
+ Nan::AsyncQueueWorker(new ProducerStartOrShutdownWorker(callback, producer_ptr, ProducerWorkerType::SHUTDOWN_PRODUCER));
+}
+
+NAN_METHOD(RocketMQProducer::SetSessionCredentials)
+{
+ NAN_GET_CPRODUCER();
+
+ Nan::Utf8String access_key(info[0]);
+ Nan::Utf8String secret_key(info[1]);
+ Nan::Utf8String ons_channel(info[2]);
+
+ int ret;
+ try
+ {
+ ret = SetProducerSessionCredentials(producer_ptr, *access_key, *secret_key, *ons_channel);
+ }
+ catch(runtime_error e)
+ {
+ Nan::ThrowError(e.what());
+ }
+ catch(std::exception& e)
+ {
+ Nan::ThrowError(e.what());
+ }
+ catch(rocketmq::MQException& e)
+ {
+ Nan::ThrowError(e.what());
+ }
+ info.GetReturnValue().Set(ret);
+}
+
+NAN_METHOD(RocketMQProducer::Send)
+{
+ Nan::Utf8String topic(info[0]);
+ Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked();
+
+ CMessage* msg = CreateMessage(*topic);
+
+ Local<Value> _tags_to_be_checked = Nan::Get(options, Nan::New<String>("tags").ToLocalChecked()).ToLocalChecked();
+ Local<Value> _keys_to_be_checked = Nan::Get(options, Nan::New<String>("keys").ToLocalChecked()).ToLocalChecked();
+
+ if(_tags_to_be_checked->IsString())
+ {
+ Nan::Utf8String tags(_tags_to_be_checked);
+ SetMessageTags(msg, *tags);
+ }
+
+ if(_keys_to_be_checked->IsString())
+ {
+ Nan::Utf8String keys(_keys_to_be_checked);
+ SetMessageKeys(msg, *keys);
+ }
+
+ // set message body:
+ // 1. if it's a string, call `SetMessageBody`;
+ // 2. if it's a buffer, call `SetByteMessageBody`.
+ if(info[1]->IsString())
+ {
+ Nan::Utf8String body(info[1]);
+ SetMessageBody(msg, *body);
+ }
+ else
+ {
+ Local<Object> node_buff_object = Nan::To<Object>(info[1]).ToLocalChecked();
+ unsigned int length = node::Buffer::Length(node_buff_object);
+ const char* buff = node::Buffer::Data(node_buff_object);
+ SetByteMessageBody(msg, buff, length);
+ }
+
+ Nan::Callback* callback = (info[3]->IsFunction()) ?
+ new Nan::Callback(Nan::To<Function>(info[3]).ToLocalChecked()) :
+ NULL;
+
+ RocketMQProducer* producer = ObjectWrap::Unwrap<RocketMQProducer>(info.Holder());
+ Nan::AsyncQueueWorker(new ProducerSendMessageWorker(callback, producer, msg));
+}
+
+}
diff --git a/src/producer.h b/src/producer.h
new file mode 100644
index 0000000..55966f9
--- /dev/null
+++ b/src/producer.h
@@ -0,0 +1,46 @@
+#ifndef __ROCKETMQ_PRODUCER_H__
+#define __ROCKETMQ_PRODUCER_H__
+
+#include <CProducer.h>
+#include <nan.h>
+
+namespace __node_rocketmq__ {
+
+using v8::Context;
+using v8::Function;
+using v8::FunctionTemplate;
+using v8::Isolate;
+using v8::Local;
+using v8::Object;
+using v8::String;
+using v8::Value;
+
+class RocketMQProducer : public Nan::ObjectWrap {
+public:
+ static NAN_MODULE_INIT(Init);
+
+public:
+ CProducer* GetProducer() { return producer_ptr; }
+
+private:
+ explicit RocketMQProducer(const char* group_id, const char* instance_name);
+ ~RocketMQProducer();
+
+ static NAN_METHOD(New);
+ static NAN_METHOD(Start);
+ static NAN_METHOD(Shutdown);
+ static NAN_METHOD(Send);
+ static NAN_METHOD(SetSessionCredentials);
+
+ static Nan::Persistent<Function> constructor;
+
+private:
+ void SetOptions(Local<Object> options);
+
+private:
+ CProducer* producer_ptr;
+};
+
+}
+
+#endif
diff --git a/src/push_consumer.cpp b/src/push_consumer.cpp
new file mode 100644
index 0000000..94a626b
--- /dev/null
+++ b/src/push_consumer.cpp
@@ -0,0 +1,378 @@
+#include <map>
+#include "push_consumer.h"
+#include "consumer_ack.h"
+#include "workers/push_consumer/start_or_shutdown.h"
+
+using namespace std;
+
+namespace __node_rocketmq__ {
+
+struct MessageHandlerParam
+{
+ RocketMQPushConsumer* consumer;
+ ConsumerAckInner* ack;
+ CMessageExt* msg;
+};
+char message_handler_param_keys[5][8] = { "topic", "tags", "keys", "body", "msgId" };
+
+uv_mutex_t _get_msg_ext_column_lock;
+
+map<CPushConsumer*, RocketMQPushConsumer*> _push_consumer_map;
+
+#define NAN_GET_CPUSHCONSUMER() \
+ RocketMQPushConsumer* _v8_consumer = ObjectWrap::Unwrap<RocketMQPushConsumer>(info.Holder()); \
+ CPushConsumer* consumer_ptr = _v8_consumer->GetConsumer();
+
+Nan::Persistent<Function> RocketMQPushConsumer::constructor;
+
+RocketMQPushConsumer::RocketMQPushConsumer(const char* group_id, const char* instance_name) :
+ consumer_ptr(NULL)
+{
+ consumer_ptr = CreatePushConsumer(group_id);
+
+ if(instance_name)
+ {
+ SetPushConsumerInstanceName(consumer_ptr, instance_name);
+ }
+
+ _push_consumer_map[consumer_ptr] = this;
+
+ RegisterMessageCallback(consumer_ptr, RocketMQPushConsumer::OnMessage);
+}
+
+RocketMQPushConsumer::~RocketMQPushConsumer()
+{
+ try
+ {
+ ShutdownPushConsumer(consumer_ptr);
+ auto it = _push_consumer_map.find(consumer_ptr);
+ if(it != _push_consumer_map.end())
+ {
+ _push_consumer_map.erase(consumer_ptr);
+ }
+ }
+ catch(...)
+ {
+ //
+ }
+
+ DestroyPushConsumer(consumer_ptr);
+ consumer_ptr = NULL;
+}
+
+void RocketMQPushConsumer::SetOptions(Local<Object> options)
+{
+ // set name server
+ Local<Value> _name_server_v = Nan::Get(options, Nan::New<String>("nameServer").ToLocalChecked()).ToLocalChecked();
+ if(_name_server_v->IsString())
+ {
+ Nan::Utf8String namesrv(_name_server_v);
+ SetPushConsumerNameServerAddress(consumer_ptr, *namesrv);
+ }
+
+ // set thread count
+ Local<Value> _thread_count_v = Nan::Get(options, Nan::New<String>("threadCount").ToLocalChecked()).ToLocalChecked();
+ if(_thread_count_v->IsNumber())
+ {
+ int thread_count = Nan::To<int32_t>(_thread_count_v).FromJust();
+ if(thread_count > 0)
+ {
+ SetPushConsumerThreadCount(consumer_ptr, thread_count);
+ }
+ }
+
+ // set message batch max size
+ Local<Value> _max_batch_size_v = Nan::Get(options, Nan::New<String>("maxBatchSize").ToLocalChecked()).ToLocalChecked();
+ if(_max_batch_size_v->IsNumber())
+ {
+ int max_batch_size = Nan::To<int32_t>(_max_batch_size_v).FromJust();
+ if(max_batch_size > 0)
+ {
+ SetPushConsumerMessageBatchMaxSize(consumer_ptr, max_batch_size);
+ }
+ }
+
+ // set log num & single log size
+ int file_num = 3;
+ int64 file_size = 104857600;
+ Local<Value> _log_file_num_v = Nan::Get(options, Nan::New<String>("logFileNum").ToLocalChecked()).ToLocalChecked();
+ Local<Value> _log_file_size_v = Nan::Get(options, Nan::New<String>("logFileSize").ToLocalChecked()).ToLocalChecked();
+ if(_log_file_num_v->IsNumber())
+ {
+ file_num = _log_file_num_v->Int32Value();
+ }
+ if(_log_file_size_v->IsNumber())
+ {
+ file_size = _log_file_size_v->Int32Value();
+ }
+ SetPushConsumerLogFileNumAndSize(consumer_ptr, file_num, file_size);
+
+ // set log level
+ Local<Value> _log_level_v = Nan::Get(options, Nan::New<String>("logLevel").ToLocalChecked()).ToLocalChecked();
+ if(_log_level_v->IsNumber())
+ {
+ int level = _log_level_v->Int32Value();
+ SetPushConsumerLogLevel(consumer_ptr, (CLogLevel) level);
+ }
+}
+
+NAN_MODULE_INIT(RocketMQPushConsumer::Init)
+{
+ uv_mutex_init(&_get_msg_ext_column_lock);
+ Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
+ tpl->SetClassName(Nan::New("RocketMQPushConsumer").ToLocalChecked());
+ tpl->InstanceTemplate()->SetInternalFieldCount(1);
+
+ Nan::SetPrototypeMethod(tpl, "start", Start);
+ Nan::SetPrototypeMethod(tpl, "shutdown", Shutdown);
+ Nan::SetPrototypeMethod(tpl, "subscribe", Subscribe);
+ Nan::SetPrototypeMethod(tpl, "setListener", SetListener);
+ Nan::SetPrototypeMethod(tpl, "setSessionCredentials", SetSessionCredentials);
+
+ constructor.Reset(tpl->GetFunction());
+ Nan::Set(target, Nan::New("PushConsumer").ToLocalChecked(), tpl->GetFunction());
+}
+
+NAN_METHOD(RocketMQPushConsumer::New)
+{
+ Isolate* isolate = info.GetIsolate();
+ Local<Context> context = Context::New(isolate);
+
+ if(!info.IsConstructCall())
+ {
+ const int argc = 3;
+ Local<Value> argv[argc] = { info[0], info[1], info[2] };
+ Local<Function> _constructor = Nan::New<v8::Function>(constructor);
+ info.GetReturnValue().Set(_constructor->NewInstance(context, argc, argv).ToLocalChecked());
+ return;
+ }
+
+ Nan::Utf8String v8_group_id(info[0]);
+ Nan::Utf8String v8_instance_name(info[1]);
+ string group_id = *v8_group_id;
+ string instance_name = *v8_instance_name;
+ Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked();
+ RocketMQPushConsumer* consumer = new RocketMQPushConsumer(group_id.c_str(), info[1]->IsNull() ? NULL : instance_name.c_str());
+
+ consumer->Wrap(info.This());
+
+ // try to set options
+ try
+ {
+ consumer->SetOptions(options);
+ }
+ catch(const runtime_error e)
+ {
+ Nan::ThrowError(e.what());
+ return;
+ }
+ catch(const std::exception& e)
+ {
+ Nan::ThrowError(e.what());
+ return;
+ }
+
+ info.GetReturnValue().Set(info.This());
+}
+
+NAN_METHOD(RocketMQPushConsumer::Start)
+{
+ NAN_GET_CPUSHCONSUMER();
+
+ Nan::Callback* callback = (info[0]->IsFunction()) ?
+ new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
+ NULL;
+
+ Nan::AsyncQueueWorker(new PushConsumerStartOrShutdownWorker(callback, consumer_ptr, PushConsumerWorkerType::START_PUSH_CONSUMER));
+}
+
+NAN_METHOD(RocketMQPushConsumer::Shutdown)
+{
+ NAN_GET_CPUSHCONSUMER();
+
+ Nan::Callback* callback = (info[0]->IsFunction()) ?
+ new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
+ NULL;
+
+ Nan::AsyncQueueWorker(new PushConsumerStartOrShutdownWorker(callback, consumer_ptr, PushConsumerWorkerType::SHUTDOWN_PUSH_CONSUMER));
+}
+
+NAN_METHOD(RocketMQPushConsumer::Subscribe)
+{
+ NAN_GET_CPUSHCONSUMER();
+
+ Nan::Utf8String v8_topic(info[0]);
+ Nan::Utf8String v8_expression(info[1]);
+ string topic = *v8_topic;
+ string expression = *v8_expression;
+
+ int ret;
+ try
+ {
+ ret = ::Subscribe(consumer_ptr, topic.c_str(), expression.c_str());
+ }
+ catch(const runtime_error e)
+ {
+ Nan::ThrowError(e.what());
+ return;
+ }
+ catch(const std::exception& e)
+ {
+ Nan::ThrowError(e.what());
+ return;
+ }
+
+ info.GetReturnValue().Set(ret);
+}
+
+NAN_METHOD(RocketMQPushConsumer::SetListener)
+{
+ RocketMQPushConsumer* consumer = ObjectWrap::Unwrap<RocketMQPushConsumer>(info.Holder());
+ if(!consumer->listener_func.IsEmpty())
+ {
+ consumer->listener_func.Reset();
+ }
+
+ consumer->listener_func.Reset(Nan::To<Function>(info[0]).ToLocalChecked());
+}
+
+NAN_METHOD(RocketMQPushConsumer::SetSessionCredentials)
+{
+ NAN_GET_CPUSHCONSUMER();
+
+ Nan::Utf8String access_key(info[0]);
+ Nan::Utf8String secret_key(info[1]);
+ Nan::Utf8String ons_channel(info[2]);
+
+ int ret;
+ try
+ {
+ ret = SetPushConsumerSessionCredentials(consumer_ptr, *access_key, *secret_key, *ons_channel);
+ }
+ catch(const runtime_error e)
+ {
+ Nan::ThrowError(e.what());
+ return;
+ }
+ catch(const std::exception& e)
+ {
+ Nan::ThrowError(e.what());
+ return;
+ }
+
+ info.GetReturnValue().Set(ret);
+}
+
+string RocketMQPushConsumer::GetMessageColumn(char* name, CMessageExt* msg)
+{
+ const char* orig = NULL;
+
+ uv_mutex_lock(&_get_msg_ext_column_lock);
+ switch(name[0])
+ {
+ // topic / tags
+ case 't':
+ orig = name[1] == 'o' ? GetMessageTopic(msg) : GetMessageTags(msg);
+ break;
+
+ // keys
+ case 'k':
+ orig = GetMessageKeys(msg);
+ break;
+
+ // body
+ case 'b':
+ orig = GetMessageBody(msg);
+ break;
+
+ // msgId
+ case 'm':
+ orig = GetMessageId(msg);
+ break;
+
+ default:
+ orig = NULL;
+ break;
+ }
+
+ uv_mutex_unlock(&_get_msg_ext_column_lock);
+
+ if(!orig) return "";
+ return orig;
+}
+
+void close_async_done(uv_handle_t* handle)
+{
+ free(handle);
+}
+
+void RocketMQPushConsumer::HandleMessageInEventLoop(uv_async_t* async)
+{
+ Nan::HandleScope scope;
+
+ Isolate* isolate = Isolate::GetCurrent();
+ Local<Context> context = isolate->GetCurrentContext();
+
+ MessageHandlerParam* param = (MessageHandlerParam*)(async->data);
+ RocketMQPushConsumer* consumer = param->consumer;
+ ConsumerAckInner* ack_inner = param->ack;
+ CMessageExt* msg = param->msg;
+
+ // create the JavaScript ack object and then set inner ack object
+ Local<Function> cons = Nan::New<Function>(ConsumerAck::GetConstructor());
+ Local<Object> ack_obj = cons->NewInstance(context, 0, 0).ToLocalChecked();
+ ConsumerAck* ack = ObjectWrap::Unwrap<ConsumerAck>(ack_obj);
+ ack->SetInner(ack_inner);
+
+ // TODO: const char *GetMessageProperty(CMessageExt *msgExt, const char *key);
+ Local<Object> result = Nan::New<Object>();
+ for(int i = 0; i < 5; i++)
+ {
+ Nan::Set(
+ result,
+ Nan::New(message_handler_param_keys[i]).ToLocalChecked(),
+ Nan::New(RocketMQPushConsumer::GetMessageColumn(message_handler_param_keys[i], msg)).ToLocalChecked());
+ }
+
+ Local<Value> argv[2] = {
+ result,
+ ack_obj
+ };
+ Nan::Callback* callback = consumer->GetListenerFunction();
+ callback->Call(2, argv);
+
+ uv_close((uv_handle_t*)async, close_async_done);
+}
+
+int RocketMQPushConsumer::OnMessage(CPushConsumer* consumer_ptr, CMessageExt* msg_ext)
+{
+ RocketMQPushConsumer* consumer = _push_consumer_map[consumer_ptr];
+ if (!consumer)
+ {
+ // TODO: error handle
+ return CConsumeStatus::E_RECONSUME_LATER;
+ }
+
+ ConsumerAckInner ack_inner;
+
+ // create async parameter
+ MessageHandlerParam param;
+ param.consumer = consumer;
+ param.ack = &ack_inner;
+ param.msg = msg_ext;
+
+ // create a new async handler and bind with `RocketMQPushConsumer::HandleMessageInEventLoop`
+ uv_async_t* async = (uv_async_t*)malloc(sizeof(uv_async_t));
+ uv_async_init(uv_default_loop(), async, RocketMQPushConsumer::HandleMessageInEventLoop);
+ async->data = (void*)(¶m);
+
+ // send async handler
+ uv_async_send(async);
+
+ // wait for result
+ CConsumeStatus status = ack_inner.WaitResult();
+
+ return status;
+}
+
+}
diff --git a/src/push_consumer.h b/src/push_consumer.h
new file mode 100644
index 0000000..8ee466e
--- /dev/null
+++ b/src/push_consumer.h
@@ -0,0 +1,62 @@
+#ifndef __ROCKETMQ_PUSH_CONSUMER_H__
+#define __ROCKETMQ_PUSH_CONSUMER_H__
+
+#include <CPushConsumer.h>
+#include <uv.h>
+#include <nan.h>
+#include <string>
+
+namespace __node_rocketmq__ {
+
+using v8::Context;
+using v8::Function;
+using v8::FunctionTemplate;
+using v8::Isolate;
+using v8::Local;
+using v8::Object;
+using v8::String;
+using v8::Value;
+
+class RocketMQPushConsumer : public Nan::ObjectWrap {
+public:
+ static NAN_MODULE_INIT(Init);
+ static int OnMessage(CPushConsumer* consumer_ptr, CMessageExt* msg_ext);
+ static std::string GetMessageColumn(char* name, CMessageExt* msg);
+
+private:
+ explicit RocketMQPushConsumer(const char* group_id, const char* instance_name);
+ ~RocketMQPushConsumer();
+
+ static NAN_METHOD(New);
+ static NAN_METHOD(Start);
+ static NAN_METHOD(Shutdown);
+ static NAN_METHOD(Subscribe);
+ static NAN_METHOD(SetListener);
+ static NAN_METHOD(SetSessionCredentials);
+
+ static Nan::Persistent<v8::Function> constructor;
+
+ void SetOptions(Local<Object>);
+ static void HandleMessageInEventLoop(uv_async_t* async);
+
+protected:
+ CPushConsumer* GetConsumer()
+ {
+ return consumer_ptr;
+ }
+
+ Nan::Callback* GetListenerFunction()
+ {
+ Nan::Callback* cb;
+ cb = &listener_func;
+ return cb;
+ }
+
+private:
+ CPushConsumer* consumer_ptr;
+ Nan::Callback listener_func;
+};
+
+}
+
+#endif
diff --git a/src/rocketmq.cpp b/src/rocketmq.cpp
new file mode 100644
index 0000000..bc29609
--- /dev/null
+++ b/src/rocketmq.cpp
@@ -0,0 +1,32 @@
+#include <nan.h>
+
+#include "producer.h"
+#include "push_consumer.h"
+#include "consumer_ack.h"
+
+namespace __node_rocketmq__ {
+
+#if defined(__APPLE__)
+uv_lib_t lib;
+
+NAN_METHOD(DLOpen)
+{
+ Nan::Utf8String filename(info[0]);
+ uv_dlopen(*filename, &lib);
+}
+#endif
+
+NAN_MODULE_INIT(Init)
+{
+ RocketMQProducer::Init(target);
+ RocketMQPushConsumer::Init(target);
+ ConsumerAck::Init(target);
+
+#if defined(__APPLE__)
+ Nan::Set(target, Nan::New("macosDLOpen").ToLocalChecked(), Nan::New<v8::FunctionTemplate>(DLOpen)->GetFunction());
+#endif
+}
+
+NODE_MODULE(rocketmq, Init)
+
+}
diff --git a/src/workers/producer/send_message.h b/src/workers/producer/send_message.h
new file mode 100644
index 0000000..f757319
--- /dev/null
+++ b/src/workers/producer/send_message.h
@@ -0,0 +1,64 @@
+#ifndef __ROCKETMQ_SEND_MESSAGE_H__
+#define __ROCKETMQ_SEND_MESSAGE_H__
+
+#include <nan.h>
+#include <CProducer.h>
+#include <MQClientException.h>
+
+namespace __node_rocketmq__ {
+
+using namespace std;
+
+class ProducerSendMessageWorker : public Nan::AsyncWorker {
+public:
+ ProducerSendMessageWorker(Nan::Callback* callback, RocketMQProducer* producer, CMessage* msg) :
+ AsyncWorker(callback),
+ msg(msg),
+ producer(producer)
+ {
+ }
+
+ ~ProducerSendMessageWorker()
+ {
+ DestroyMessage(msg);
+ }
+
+ void Execute()
+ {
+ try
+ {
+ SendMessageSync(producer->GetProducer(), msg, &send_ret);
+ }
+ catch(const runtime_error e)
+ {
+ SetErrorMessage(e.what());
+ }
+ catch(const std::exception& e)
+ {
+ SetErrorMessage(e.what());
+ }
+ }
+
+ void HandleOKCallback()
+ {
+ Nan::HandleScope scope;
+
+ Local<Value> argv[] = {
+ Nan::Undefined(),
+ Nan::New<v8::Number>((unsigned int)send_ret.sendStatus),
+ Nan::New<v8::String>(send_ret.msgId).ToLocalChecked(),
+ Nan::New<v8::Number>((long long)send_ret.offset)
+ };
+ callback->Call(4, argv);
+ }
+
+private:
+ CMessage* msg;
+ RocketMQProducer* producer;
+
+ CSendResult send_ret;
+};
+
+}
+
+#endif
diff --git a/src/workers/producer/start_or_shutdown.h b/src/workers/producer/start_or_shutdown.h
new file mode 100644
index 0000000..42aae60
--- /dev/null
+++ b/src/workers/producer/start_or_shutdown.h
@@ -0,0 +1,72 @@
+#ifndef __ROCKETMQ_PRODUCER_START_OR_SHUTDOWN_H__
+#define __ROCKETMQ_PRODUCER_START_OR_SHUTDOWN_H__
+
+#include <nan.h>
+#include <CProducer.h>
+#include <MQClientException.h>
+
+namespace __node_rocketmq__ {
+
+using namespace std;
+
+enum ProducerWorkerType {
+ START_PRODUCER = 0,
+ SHUTDOWN_PRODUCER
+};
+
+class ProducerStartOrShutdownWorker : public Nan::AsyncWorker {
+public:
+ ProducerStartOrShutdownWorker(Nan::Callback* callback, CProducer* producer_ptr, ProducerWorkerType type) :
+ AsyncWorker(callback),
+ producer_ptr(producer_ptr),
+ ret(0),
+ type(type)
+ {
+ }
+
+ ~ProducerStartOrShutdownWorker()
+ {
+ }
+
+ void Execute()
+ {
+ try
+ {
+ switch(type) {
+ case START_PRODUCER:
+ ret = StartProducer(producer_ptr); break;
+ case SHUTDOWN_PRODUCER:
+ ret = ShutdownProducer(producer_ptr); break;
+ default: break;
+ }
+ }
+ catch(const runtime_error e)
+ {
+ SetErrorMessage(e.what());
+ }
+ catch(const exception& e)
+ {
+ SetErrorMessage(e.what());
+ }
+ }
+
+ void HandleOKCallback()
+ {
+ Nan::HandleScope scope;
+
+ Local<Value> argv[] = {
+ Nan::Undefined(),
+ Nan::New<v8::Number>((int)ret),
+ };
+ callback->Call(2, argv);
+ }
+
+private:
+ CProducer* producer_ptr;
+ int ret;
+ ProducerWorkerType type;
+};
+
+}
+
+#endif
diff --git a/src/workers/push_consumer/start_or_shutdown.h b/src/workers/push_consumer/start_or_shutdown.h
new file mode 100644
index 0000000..84371b1
--- /dev/null
+++ b/src/workers/push_consumer/start_or_shutdown.h
@@ -0,0 +1,72 @@
+#ifndef __ROCKETMQ_PUSH_CONSUMER_START_OR_SHUTDOWN_H__
+#define __ROCKETMQ_PUSH_CONSUMER_START_OR_SHUTDOWN_H__
+
+#include <nan.h>
+#include <CPushConsumer.h>
+#include <MQClientException.h>
+
+namespace __node_rocketmq__ {
+
+using namespace std;
+
+enum PushConsumerWorkerType {
+ START_PUSH_CONSUMER = 0,
+ SHUTDOWN_PUSH_CONSUMER
+};
+
+class PushConsumerStartOrShutdownWorker : public Nan::AsyncWorker {
+public:
+ PushConsumerStartOrShutdownWorker(Nan::Callback* callback, CPushConsumer* consumer_ptr, PushConsumerWorkerType type) :
+ AsyncWorker(callback),
+ consumer_ptr(consumer_ptr),
+ ret(0),
+ type(type)
+ {
+ }
+
+ ~PushConsumerStartOrShutdownWorker()
+ {
+ }
+
+ void Execute()
+ {
+ try
+ {
+ switch(type) {
+ case START_PUSH_CONSUMER:
+ ret = StartPushConsumer(consumer_ptr); break;
+ case SHUTDOWN_PUSH_CONSUMER:
+ ret = ShutdownPushConsumer(consumer_ptr); break;
+ default: break;
+ }
+ }
+ catch(const runtime_error e)
+ {
+ SetErrorMessage(e.what());
+ }
+ catch(const exception& e)
+ {
+ SetErrorMessage(e.what());
+ }
+ }
+
+ void HandleOKCallback()
+ {
+ Nan::HandleScope scope;
+
+ Local<Value> argv[] = {
+ Nan::Undefined(),
+ Nan::New<v8::Number>((int)ret),
+ };
+ callback->Call(2, argv);
+ }
+
+private:
+ CPushConsumer* consumer_ptr;
+ int ret;
+ PushConsumerWorkerType type;
+};
+
+}
+
+#endif