Refactor: first demo for developing this SDK (#1)

Refactor: first demo for developing this SDK
diff --git a/.eslintrc b/.eslintrc
new file mode 100644
index 0000000..4601220
--- /dev/null
+++ b/.eslintrc
@@ -0,0 +1,5 @@
+{
+    "extends": [
+      "rocketmq-style"
+    ]
+}
diff --git a/.github/ISSUE_TEMPLATE.md b/.github/ISSUE_TEMPLATE.md
new file mode 100644
index 0000000..1c4f079
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE.md
@@ -0,0 +1,20 @@
+<!--
+Thank you for reporting an issue.
+
+This issue tracker is for bugs and issues found within RocketMQ Node.js SDK.
+
+Please fill in as much of the template below as you're able.
+
+Version:
+Platform: output of `uname -a` (UNIX), or version and 32 or 64-bit (Windows)
+Subsystem: if known, please specify affected core module name
+
+If possible, please provide code that demonstrates the problem, keeping it as
+simple and free of external dependencies as you are able.
+-->
+
+* **Version**:
+* **Platform**:
+* **Subsystem**:
+
+<!-- Enter your issue details below this comment. -->
diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
new file mode 100644
index 0000000..c2f77b7
--- /dev/null
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -0,0 +1,16 @@
+<!--
+Thank you for your pull request. Please provide a description above and review
+the requirements below.
+
+Bug fixes and new features should include tests and possibly benchmarks.
+
+Contributors guide: https://github.com/apache/rocketmq-client-nodejs/blob/master/CONTRIBUTING.md
+-->
+
+##### Checklist
+<!-- Remove items that do not apply. For completed items, change [ ] to [x]. -->
+
+- [ ] `npm test` passes
+- [ ] tests and/or benchmarks are included
+- [ ] documentation is changed or added
+- [ ] commit message follows [commit guidelines](https://github.com/apache/rocketmq-client-nodejs/blob/master/PULL_REQUEST.md#commit-message-guidelines)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..c9d16aa
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,188 @@
+# Created by https://www.gitignore.io/api/c++,node,linux,macos,windows
+# Edit at https://www.gitignore.io/?templates=c++,node,linux,macos,windows
+
+### C++ ###
+# Prerequisites
+*.d
+
+# Compiled Object files
+*.slo
+*.lo
+*.o
+*.obj
+
+# Precompiled Headers
+*.gch
+*.pch
+
+# Compiled Dynamic libraries
+*.so
+*.dylib
+*.dll
+
+# Fortran module files
+*.mod
+*.smod
+
+# Compiled Static libraries
+*.lai
+*.la
+*.a
+*.lib
+
+# Executables
+*.exe
+*.out
+*.app
+
+### Linux ###
+*~
+
+# temporary files which can be created if a process still has a handle open of a deleted file
+.fuse_hidden*
+
+# KDE directory preferences
+.directory
+
+# Linux trash folder which might appear on any partition or disk
+.Trash-*
+
+# .nfs files are created when an open file is removed but is still being accessed
+.nfs*
+
+### macOS ###
+# General
+.DS_Store
+.AppleDouble
+.LSOverride
+
+# Icon must end with two \r
+Icon
+
+# Thumbnails
+._*
+
+# Files that might appear in the root of a volume
+.DocumentRevisions-V100
+.fseventsd
+.Spotlight-V100
+.TemporaryItems
+.Trashes
+.VolumeIcon.icns
+.com.apple.timemachine.donotpresent
+
+# Directories potentially created on remote AFP share
+.AppleDB
+.AppleDesktop
+Network Trash Folder
+Temporary Items
+.apdisk
+
+### Node ###
+# Logs
+logs
+*.log
+npm-debug.log*
+yarn-debug.log*
+yarn-error.log*
+
+# Runtime data
+pids
+*.pid
+*.seed
+*.pid.lock
+
+# Directory for instrumented libs generated by jscoverage/JSCover
+lib-cov
+
+# Coverage directory used by tools like istanbul
+coverage
+
+# nyc test coverage
+.nyc_output
+
+# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
+.grunt
+
+# Bower dependency directory (https://bower.io/)
+bower_components
+
+# node-waf configuration
+.lock-wscript
+
+# Compiled binary addons (https://nodejs.org/api/addons.html)
+build/Release
+
+# Dependency directories
+node_modules/
+jspm_packages/
+
+# TypeScript v1 declaration files
+typings/
+
+# Optional npm cache directory
+.npm
+
+# Optional eslint cache
+.eslintcache
+
+# Optional REPL history
+.node_repl_history
+
+# Output of 'npm pack'
+*.tgz
+
+# Yarn Integrity file
+.yarn-integrity
+
+# dotenv environment variables file
+.env
+
+# parcel-bundler cache (https://parceljs.org/)
+.cache
+
+# next.js build output
+.next
+
+# nuxt.js build output
+.nuxt
+
+# vuepress build output
+.vuepress/dist
+
+# Serverless directories
+.serverless
+
+# FuseBox cache
+.fusebox/
+
+### Windows ###
+# Windows thumbnail cache files
+Thumbs.db
+ehthumbs.db
+ehthumbs_vista.db
+
+# Dump file
+*.stackdump
+
+# Folder config file
+[Dd]esktop.ini
+
+# Recycle Bin used on file shares
+$RECYCLE.BIN/
+
+# Windows Installer files
+*.cab
+*.msi
+*.msix
+*.msm
+*.msp
+
+# Windows shortcuts
+*.lnk
+
+# End of https://www.gitignore.io/api/c++,node,linux,macos,windows
+
+build
+package-lock.json
+.vscode
diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..c9fc461
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,3 @@
+[submodule "deps/rocketmq"]
+path = deps/rocketmq
+url = https://github.com/apache/rocketmq-client-cpp.git
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..3ced58e
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,39 @@
+language: node_js
+
+node_js: 10
+
+before_script:
+  - wget http://us.mirrors.quenda.co/apache/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip
+  - unzip rocketmq-all-4.3.2-bin-release.zip
+  - cd rocketmq-all-4.3.2-bin-release
+  - perl -i -pe's/-Xms8g -Xmx8g -Xmn4g/-Xms2g -Xmx2g -Xmn1g/g' bin/runbroker.sh
+  - nohup sh bin/mqnamesrv &
+  - nohup sh bin/mqbroker -n localhost:9876 &
+  - sleep 10
+  - ./bin/mqadmin updateTopic -b '127.0.0.1:10911' –n '127.0.0.1:9876' -t test
+  - ./bin/mqadmin updateSubGroup -b '127.0.0.1:10911' –n '127.0.0.1:9876' -g testGroup
+
+script:
+  - npm test
+
+matrix:
+  include:
+    - os: linux
+      dist: trusty
+    - os: linux
+      dist: xenial
+      jdk: openjdk8
+      env:
+        JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64
+      apt:
+        packages:
+          - openjdk8
+    - os: windows
+    - os: osx
+      osx_image: xcode9.3
+      env:
+        JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_112.jdk/Contents/Home/jre
+  allow_failures:
+    - os: linux
+      dist: xenial
+    - os: windows
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 0000000..802ac1f
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1 @@
+## TODO
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..d53ea09
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+                                Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (properties) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/README.md b/README.md
index 0743cbd..e4b3054 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,307 @@
-# rocketmq-client-nodejs
+# RocketMQ Client for Node.js
+
+This official Node.js client is a C++ binding of [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp), which has been proven robust and widely adopted within Alibaba Group by many business units for more than three years.
+
+> **Notice 1:** This client is still in `dev` version. Use it cautiously in production.
+
+> **Notice 2:** This SDK is now only support macOS and Ubuntu **14.04**. Ubuntu 16+ is not supported and CentOS is not tested yet.
+
+## Installation
+
+```shell
+$ npm install --save apache-rocketmq
+```
+
+## Examples
+
+You may view [example/producer.js](https://github.com/apache/rocketmq-client-nodejs/blob/master/example/producer.js) and
+[example/push_consumer.js](https://github.com/apache/rocketmq-client-nodejs/blob/master/example/push_consumer.js) for quick start.
+
+## Usage
+
+Require this package first.
+
+```javascript
+const { Producer, PushConsumer } = require("apache-rocketmq");
+```
+
+### Producer
+
+#### Constructor
+
+```javascript
+new Producer(groupId[, instanceName][, options]);
+```
+
+`Producer`'s constructor receives three parameters:
+
++ `groupId`: the group id of the producer;
++ `instanceName`: the instance name of the producer, **optional**;
++ `options`: the options object, **optional**;
+  - `nameServer`: the name server of RocketMQ;
+  - `groupName`: the group name of this producer;
+  - `compressLevel`: the compress level (0-9) of this producer, default to `5` where `0` is fastest and `9` is most compressed;
+  - `sendMessageTimeout`: send message timeout millisecond, default to `3000` and suggestion is 2000 - 3000ms;
+  - `maxMessageSize`: max message size with unit (B), default to `1024 * 128` which means 128K;
+  - `logFileNum`: C++ core logic log file number, default to 3 and log file path is `$HOME/logs/rocketmq-cpp`;
+  - `logFileSize`: size of each C++ core logic log file with unit (B);
+  - `logLevel`: C++ core logic log level in `"fatal"`, `"error"`, `"warn"`, `"info"`, `"debug"`, `"trace"` and `"num"`.
+
+e.g.
+
+```javascript
+const { Producer } = require("apache-rocketmq");
+const producer = new Producer("GROUP_ID", "INSTANCE_NAME", {
+    nameServer: "127.0.0.1:9876",
+});
+```
+
+#### start
+
+```javascript
+producer.start([callback]);
+```
+
+`.start` receives a callback function. If no callback passed, this function will return a Promise object.
+
+e.g.
+
+```javascript
+producer.start(function(err) {
+    if(err) {
+        //
+    }
+});
+
+// or
+
+producer.start().then(() => {
+    //
+}).catch(err => {
+    //
+});
+```
+
+#### shutdown
+
+```javascript
+producer.start([callback]);
+```
+
+`.shutdown` receives a callback function. If no callback passed, this function will return a Promise object.
+
+e.g.
+
+```javascript
+producer.shutdown(function(err) {
+    if(err) {
+        //
+    }
+});
+
+// or
+
+producer.shutdown().then(() => {
+    //
+}).catch(err => {
+    //
+});
+```
+
+#### send
+
+```javascript
+producer.send(topic, body[, options][, callback]);
+```
+
+`.send` receives 4 parameters including a callback. If no callback passed, this function will return a Promise object.
+
++ `topic`: the topic string;
++ `body`: the message body string;
++ `options`: the options object, **optional**;
+  - `keys`: the keys for this message;
+  - `tags`: the tags for this message;
++ `callback`: the callback function, **optional**.
+
+e.g.
+
+```javascript
+producer.send("test", `baz ${i}`, {
+    keys: "foo",
+    tags: "bar"
+}, function(err, result) {
+    if(err) {
+        // ...    
+    } else {
+        console.log(result);
+
+        // console example:
+        //
+        //  { status: 0,
+        //    statusStr: 'OK',
+        //    msgId: '0101007F0000367E0000309DD68B0700',
+        //    offset: 0 }
+    }
+});
+```
+
+##### send `status` and `statusStr`
+
+| `status` | `statusStr`           |
+|----------|-----------------------|
+| `0`      | `OK`                  |
+| `1`      | `FLUSH_DISK_TIMEOUT`  |
+| `2`      | `FLUSH_SLAVE_TIMEOUT` |
+| `3`      | `SLAVE_NOT_AVAILABLE` |
+
+### PushConsumer
+
+#### Constructor
+
+```javascript
+new PushConsumer(groupId[, instanceName][, options]);
+```
+
+`PushConsumer`'s constructor receives three parameters:
+
++ `groupId`: the group id of the push consumer;
++ `instanceName`: the instance name of the push consumer, **optional**;
++ `options`: the options object, **optional**;
+  - `nameServer`: the name server of RocketMQ;
+  - `threadCount`: the thread number of underlying C++ logic;
+  - `maxBatchSize`: message max batch size;
+  - `logFileNum`: C++ core logic log file number, default to 3 and log file path is `$HOME/logs/rocketmq-cpp`;
+  - `logFileSize`: size of each C++ core logic log file with unit (B);
+  - `logLevel`: C++ core logic log level in `"fatal"`, `"error"`, `"warn"`, `"info"`, `"debug"`, `"trace"` and `"num"`.
+
+e.g.
+
+```javascript
+const { PushConsumer } = require("apache-rocketmq");
+const consumer = new PushConsumer("GROUP_ID", "INSTANCE_NAME", {
+    nameServer: "127.0.0.1:9876",
+    threadCount: 3
+});
+```
+
+#### start
+
+```javascript
+consumer.start([callback]);
+```
+
+`.start` receives a callback function. If no callback passed, this function will return a Promise object.
+
+e.g.
+
+```javascript
+consumer.start(function(err) {
+    if(err) {
+        //
+    }
+});
+
+// or
+
+consumer.start().then(() => {
+    //
+}).catch(err => {
+    //
+});
+```
+
+#### shutdown
+
+```javascript
+consumer.start([callback]);
+```
+
+`.shutdown` receives a callback function. If no callback passed, this function will return a Promise object.
+
+e.g.
+
+```javascript
+consumer.shutdown(function(err) {
+    if(err) {
+        //
+    }
+});
+
+// or
+
+consumer.shutdown().then(() => {
+    //
+}).catch(err => {
+    //
+});
+```
+
+#### subscribe
+
+Add a subscription relationship to consumer.
+
+```javascript
+consumer.subscribe(topic[, expression]);
+```
+
+`.subscribe` receives two parameters which the second parameter is optional.
+
++ `topic`: The topic to be subscribed;
++ `expression`: The additional expression to be subscribed, **optional**. e.g. `*`.
+
+#### On Message Event
+
+If you want to receive messages from RocketMQ Server, you should add a listener for `message` event which receives 2
+parameters.
+
+```javascript
+function YOUR_LISTENER(msg, ack) {
+    //
+}
+```
+
++ `msg`: the message object to be consumed;
++ `ack`: the Acknowledge object, which has a `.done()` function.
+
+`msg` object looks like:
+
+```javascript
+{ topic: 'test',
+  tags: 'bar',
+  keys: 'foo',
+  body: 'baz 7',
+  msgId: '0101007F0000367E0000339DD68B0800' }
+```
+
+You may call `ack.done()` to tell RocketMQ that you've finished your message successfully which is same as `ack.done(true)`. And you may call `ack.done(false)` to tell it that you've failed.
+
+e.g.
+
+```javascript
+consumer.on("message", function(msg, ack) {
+    console.log(msg);
+    ack.done();
+});
+```
+
+## Apache RocketMQ Community
+
++ [RocketMQ Community Projects](https://github.com/apache/rocketmq-externals)
+
+## Contact Us
+
++ Mailing Lists: https://rocketmq.apache.org/about/contact/
++ Home: https://rocketmq.apache.org
++ Docs: https://rocketmq.apache.org/docs/quick-start/
++ Issues: https://github.com/apache/rocketmq-client-nodejs/issues
++ Ask: https://stackoverflow.com/questions/tagged/rocketmq
++ Slack: https://rocketmq-community.slack.com/
+
+## How to Contribute
+
+Contributions are warmly welcome! Be it trivial cleanup, major new feature or other suggestion. Read this [how to contribute](CONTRIBUTING.md) guide for more details.
+
+## License
+
+[Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation
+
diff --git a/binding.gyp b/binding.gyp
new file mode 100644
index 0000000..697d1c4
--- /dev/null
+++ b/binding.gyp
@@ -0,0 +1,48 @@
+{
+  "targets": [
+    {
+      "target_name": "rocketmq",
+      "sources": [
+        "src/rocketmq.cpp",
+        "src/producer.cpp",
+        "src/push_consumer.cpp",
+        "src/consumer_ack.cpp",
+        "src/consumer_ack_inner.cpp"
+      ],
+      "include_dirs": [
+        "deps/rocketmq/include",
+        "<!(node -e \"require('nan')\")"
+      ],
+      "conditions": [
+        ["OS==\"linux\"", {
+          "libraries": [
+            "<(module_root_dir)/deps/lib/librocketmq.a"
+          ],
+          "cflags_cc!": [ "-fno-exceptions", "-pthread", "-Wl,--no-as-needed", "-ldl" ],
+          "cflags_cc": [ "-Wno-ignored-qualifiers" ],
+          "cflags": [ "-std=c++11", "-g" ]
+        }],
+        ["OS==\"win\"", {
+          "libraries": [
+            "<(module_root_dir)/deps/lib/rocketmq-client-cpp.lib"
+          ],
+          "copies": [
+            {
+              "destination": "<(module_root_dir)/build/Release/",
+              "files": [ "<(module_root_dir)/deps/lib/rocketmq-client-cpp.dll" ]
+            }
+          ]
+        }],
+        ["OS==\"mac\"", {
+          "xcode_settings": {
+            "GCC_ENABLE_CPP_EXCEPTIONS": "YES"
+          },
+          "cflags!": [ "-fno-exceptions" ],
+          "cflags_cc!": [ "-fno-exceptions", "-pthread", "-Wl,--no-as-needed", "-ldl" ],
+          "cflags_cc": [ "-Wno-ignored-qualifiers" ],
+          "cflags": [ "-std=c++11", "-stdlib=libc++" ]
+        }]
+      ]
+    }
+  ]
+}
diff --git a/deps/lib/.gitkeep b/deps/lib/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/deps/lib/.gitkeep
diff --git a/deps/rocketmq b/deps/rocketmq
new file mode 160000
index 0000000..d5887b6
--- /dev/null
+++ b/deps/rocketmq
@@ -0,0 +1 @@
+Subproject commit d5887b63ddbba16fec562f64dca7f77ce9ca0bb1
diff --git a/example/common.js b/example/common.js
new file mode 100644
index 0000000..efc5153
--- /dev/null
+++ b/example/common.js
@@ -0,0 +1,6 @@
+"use strict";
+
+module.exports = {
+    nameServer: "127.0.0.1:9876",
+    messageCount: 10
+};
diff --git a/example/producer.js b/example/producer.js
new file mode 100644
index 0000000..0f8ac0b
--- /dev/null
+++ b/example/producer.js
@@ -0,0 +1,47 @@
+"use strict";
+
+const co = require("co");
+
+const common = require("./common");
+const Producer = require("../").Producer;
+
+co(function *() {
+    const producer = new Producer("testGroup", {
+        nameServer: common.nameServer,
+        groupName: "testGroupName",
+        logFileNum: 5,
+        logFileSize: 1048576000,
+        logLevel: "debug",
+        compressLevel: 3,
+        sendMessageTimeout: 5000,
+        maxMessageSize: 1024 * 256
+    });
+
+    console.time("producer start");
+    try {
+        yield producer.start();
+    } catch(e) {
+        console.error(e);
+        process.exit(4);
+    }
+    console.timeEnd("producer start");
+    for(let i = 0; i < common.messageCount; i++) {
+        console.time(`send ${i}`);
+        try {
+            const ret = yield producer.send("test", `baz ${i}`, {
+                keys: "foo",
+                tags: "bar"
+            });
+            console.timeEnd(`send ${i}`);
+            console.log(ret);
+        } catch(e) {
+            console.error(e);
+            console.error(e.stack);
+            process.exit(4);
+        }
+    }
+
+    console.time("producer end");
+    yield producer.shutdown();
+    console.timeEnd("producer end");
+});
diff --git a/example/push_consumer.js b/example/push_consumer.js
new file mode 100644
index 0000000..9154a97
--- /dev/null
+++ b/example/push_consumer.js
@@ -0,0 +1,58 @@
+"use strict";
+
+const assert = require("assert");
+
+const co = require("co");
+
+const common = require("./common");
+const PushConsumer = require("../").PushConsumer;
+
+co(function *() {
+    const msgs = [];
+    const consumer = new PushConsumer("testGroup", {
+        nameServer: common.nameServer,
+        logFileNum: 5,
+        logFileSize: 1048576000,
+        logLevel: "debug"
+    });
+
+    consumer.subscribe("test", "*");
+    consumer.on("message", function(msg, ack) {
+        msgs.push(msg);
+        ack.done();
+
+        // console.log(msg);
+        // return;
+
+        if(msgs.length === common.messageCount) {
+            msgs.sort(function(a, b) {
+                return a.body < b.body ? -1 : 1;
+            });
+
+            console.log(msgs);
+
+            for(let i = 0; i < msgs.length; i++) {
+                assert.deepStrictEqual(msgs[i], {
+                    topic: "test",
+                    tags: "bar",
+                    keys: "foo",
+                    body: `baz ${i}`,
+                    msgId: msgs[i].msgId
+                });
+            }
+
+            console.time("consumer end");
+            consumer.shutdown().then(() => {
+                console.timeEnd("consumer end");
+                process.exit(0);
+            }).catch(e => {
+                console.error(e);
+                process.exit(4);
+            });
+        }
+    });
+
+    console.time("consumer start");
+    yield consumer.start();
+    console.timeEnd("consumer start");
+});
diff --git a/index.js b/index.js
new file mode 100644
index 0000000..9127a66
--- /dev/null
+++ b/index.js
@@ -0,0 +1,4 @@
+"use strict";
+
+exports.Producer = require("./lib/producer");
+exports.PushConsumer = require("./lib/push_consumer");
diff --git a/lib/common.js b/lib/common.js
new file mode 100644
index 0000000..73f3ccb
--- /dev/null
+++ b/lib/common.js
@@ -0,0 +1,12 @@
+"use strict";
+
+exports.requireBinding = function(name) {
+    let mod;
+    try {
+        mod = require(`../build/Debug/${name}`);
+    } catch(e) {
+        mod = require(`../build/Release/${name}`);
+    }
+
+    return mod;
+};
diff --git a/lib/env_init.js b/lib/env_init.js
new file mode 100644
index 0000000..16d8d31
--- /dev/null
+++ b/lib/env_init.js
@@ -0,0 +1,16 @@
+"use strict";
+
+const os = require("os");
+const path = require("path");
+
+const common = require("./common");
+
+const binding = common.requireBinding("rocketmq");
+
+switch(os.platform()) {
+case "darwin":
+    process.env.EVENT_NOKQUEUE = "1";
+    binding.macosDLOpen(path.join(__dirname, "../deps/lib/librocketmq.dylib"));
+    break;
+default: break;
+}
diff --git a/lib/producer.js b/lib/producer.js
new file mode 100644
index 0000000..8c5b09c
--- /dev/null
+++ b/lib/producer.js
@@ -0,0 +1,184 @@
+"use strict";
+
+require("./env_init");
+
+const assert = require("assert");
+
+const common = require("./common");
+
+const binding = common.requireBinding("rocketmq");
+
+const START_OR_SHUTDOWN = Symbol("RocketMQProducer#startOrShutdown");
+
+const SEND_RESULT_STATUS = {
+    OK: 0,
+    FLUSH_DISK_TIMEOUT: 1,
+    FLUSH_SLAVE_TIMEOUT: 2,
+    SLAVE_NOT_AVAILABLE: 3
+};
+const SEND_RESULT_STATUS_STR = {
+    0: "OK",
+    1: "FLUSH_DISK_TIMEOUT",
+    2: "FLUSH_SLAVE_TIMEOUT",
+    3: "SLAVE_NOT_AVAILABLE"
+};
+const DEFAULT_OPTIONS = {};
+const START_STATUS = {
+    STOPPED: 0,
+    STARTED: 1,
+    STOPPING: 2,
+    STARTING: 3
+};
+const OPTIONS_LOG_LEVEL = {
+    FATAL: 1,
+    ERROR: 2,
+    WARN: 3,
+    INFO: 4,
+    DEBUG: 5,
+    TRACE: 6,
+    NUM: 7
+};
+
+let producerRef = 0;
+let timer;
+
+class RocketMQProducer {
+    /**
+     * RocketMQ Producer constructor
+     * @param {String} groupId the group id
+     * @param {String} [instanceName] the instance name
+     * @param {Object} options the options
+     */
+    constructor(groupId, instanceName, options) {
+        if(typeof instanceName !== "string" && !options) {
+            options = instanceName;
+            instanceName = null;
+        }
+
+        options = Object.assign({}, DEFAULT_OPTIONS, options || {});
+        if(options.logLevel && typeof options.logLevel === "string") {
+            options.logLevel = OPTIONS_LOG_LEVEL[options.logLevel.toUpperCase()] || OPTIONS_LOG_LEVEL.INFO;
+        }
+        this.core = new binding.Producer(groupId, instanceName, options);
+        this.status = START_STATUS.STOPPED;
+    }
+
+    /**
+     * Set session credentials (usually used in Alibaba MQ)
+     * @param {String} accessKey the access key
+     * @param {String} secretKey the secret key
+     * @param {String} onsChannel the ons channel
+     * @return {Number} the result
+     */
+    setSessionCredentials(accessKey, secretKey, onsChannel) {
+        assert(typeof accessKey === "string");
+        assert(typeof secretKey === "string");
+        assert(typeof onsChannel === "string");
+        return !this.core.setSessionCredentials(accessKey, secretKey, onsChannel);
+    }
+
+    [START_OR_SHUTDOWN](method, callback) {
+        let promise;
+        let resolve;
+        let reject;
+        if(!callback) {
+            promise = new Promise((_resolve, _reject) => {
+                resolve = _resolve;
+                reject = _reject;
+            });
+        } else {
+            resolve = reject = callback;
+        }
+
+        this.core[method]((err, ret) => {
+            if(err) return reject(err);
+
+            if(method === "start") {
+                this.status = START_STATUS.STARTED;
+                if(!producerRef) timer = setInterval(function() {}, 24 * 3600 * 1000);
+                producerRef++;
+            } else {
+                this.status = START_STATUS.STOPPED;
+                producerRef--;
+                if(!producerRef) clearInterval(timer);
+            }
+
+            return callback ? resolve(undefined, ret) : resolve(ret);
+        });
+
+        if(!callback) return promise;
+    }
+
+    /**
+     * Start the producer
+     * @param {Function} [callback] the callback function
+     * @return {Promise|undefined} returns a Promise if no callback
+     */
+    start(callback) {
+        assert(this.status === START_STATUS.STOPPED);
+        this.status = START_STATUS.STARTING;
+        return this[START_OR_SHUTDOWN]("start", callback);
+    }
+
+    /**
+     * Shutdown the producer
+     * @param {Function} [callback] the callback function
+     * @return {Promise|undefined} returns a Promise if no callback
+     */
+    shutdown(callback) {
+        assert(this.status === START_STATUS.STARTED);
+        this.status = START_STATUS.STOPPING;
+        return this[START_OR_SHUTDOWN]("shutdown", callback);
+    }
+
+    /**
+     * Send a message
+     * @param {String} topic the topic
+     * @param {String} body the body
+     * @param {Object} [options] the options
+     * @param {Function} [callback] the callback function
+     * @return {Promise|undefined} returns a Promise if no callback
+     */
+    send(topic, body, options, callback) {
+        assert(typeof topic === "string");
+        assert(typeof body === "string" || Buffer.isBuffer(body));
+
+        if(typeof options === "function") {
+            callback = options;
+            options = {};
+        }
+
+        options = options || {};
+
+        let promise;
+        let resolve;
+        let reject;
+        if(!callback) {
+            promise = new Promise((_resolve, _reject) => {
+                resolve = _resolve;
+                reject = _reject;
+            });
+        }
+
+        this.core.send(topic, body, options, function sendMessageCallback(err, status, msgId, offset) {
+            if(err) {
+                if(!callback) return reject(err);
+                return callback(err);
+            }
+
+            const ret = {
+                status,
+                statusStr: SEND_RESULT_STATUS_STR[status] || "UNKNOWN",
+                msgId,
+                offset
+            };
+            if(!callback) return resolve(ret);
+            callback(undefined, ret);
+        });
+        if(promise) return promise;
+    }
+}
+
+RocketMQProducer.SEND_RESULT = SEND_RESULT_STATUS;
+
+module.exports = RocketMQProducer;
diff --git a/lib/push_consumer.js b/lib/push_consumer.js
new file mode 100644
index 0000000..fde2e84
--- /dev/null
+++ b/lib/push_consumer.js
@@ -0,0 +1,126 @@
+"use strict";
+
+require("./env_init");
+
+const assert = require("assert");
+const EventEmitter = require("events").EventEmitter;
+
+const common = require("./common");
+
+const binding = common.requireBinding("rocketmq");
+
+const START_OR_SHUTDOWN = Symbol("RocketMQPushConsumer#startOrShutdown");
+const START_STATUS = {
+    STOPPED: 0,
+    STARTED: 1,
+    STOPPING: 2,
+    STARTING: 3
+};
+const OPTIONS_LOG_LEVEL = {
+    FATAL: 1,
+    ERROR: 2,
+    WARN: 3,
+    INFO: 4,
+    DEBUG: 5,
+    TRACE: 6,
+    NUM: 7
+};
+
+const DEFAULT_OPTIONS = {};
+
+let producerRef = 0;
+let timer;
+
+class RocketMQPushConsumer extends EventEmitter {
+    /**
+     * RocketMQ PushConsumer constructor
+     * @param {String} groupId the group id
+     * @param {String} [instanceName] the instance name
+     * @param {Object} options the options
+     */
+    constructor(groupId, instanceName, options) {
+        super();
+
+        if(typeof instanceName !== "string" && !options) {
+            options = instanceName;
+            instanceName = null;
+        }
+
+        options = Object.assign({}, DEFAULT_OPTIONS, options || {});
+        if(options.logLevel && typeof options.logLevel === "string") {
+            options.logLevel = OPTIONS_LOG_LEVEL[options.logLevel.toUpperCase()] || OPTIONS_LOG_LEVEL.INFO;
+        }
+        this.core = new binding.PushConsumer(groupId, instanceName, options);
+        this.core.setListener(this.emit.bind(this, "message"));
+        this.status = START_STATUS.STOPPED;
+    }
+
+    [START_OR_SHUTDOWN](method, callback) {
+        let promise;
+        let resolve;
+        let reject;
+        if(!callback) {
+            promise = new Promise((_resolve, _reject) => {
+                resolve = _resolve;
+                reject = _reject;
+            });
+        } else {
+            resolve = reject = callback;
+        }
+
+        this.core[method]((err, ret) => {
+            if(err) return reject(err);
+
+            if(method === "start") {
+                this.status = START_STATUS.STARTED;
+                if(!producerRef) timer = setInterval(function() {}, 24 * 3600 * 1000);
+                producerRef++;
+            } else {
+                this.status = START_STATUS.STOPPED;
+                producerRef--;
+                if(!producerRef) clearInterval(timer);
+            }
+
+            return callback ? resolve(undefined, ret) : resolve(ret);
+        });
+
+        if(!callback) return promise;
+    }
+
+    /**
+     * Start the push consumer
+     * @param {Function} [callback] the callback function
+     * @return {Promise|undefined} returns a Promise if no callback
+     */
+    start(callback) {
+        assert(this.status === START_STATUS.STOPPED);
+        this.status = START_STATUS.STARTING;
+        return this[START_OR_SHUTDOWN]("start", callback);
+    }
+
+    /**
+     * Shutdown the push consumer
+     * @param {Function} [callback] the callback function
+     * @return {Promise|undefined} returns a Promise if no callback
+     */
+    shutdown(callback) {
+        assert(this.status === START_STATUS.STARTED);
+        this.status = START_STATUS.STOPPING;
+        return this[START_OR_SHUTDOWN]("shutdown", callback);
+    }
+
+    /**
+     * subscribe a topic
+     * @param {String} topic the topic to be subscribed
+     * @param {String} [expression] the additional expression to be subscribed
+     * @return {Number} the subscribe status result
+     */
+    subscribe(topic, expression) {
+        assert(this.status === START_STATUS.STOPPED);
+        assert(topic && typeof topic === "string");
+        assert(!expression || expression && typeof expression === "string");
+        return this.core.subscribe(topic, expression || "");
+    }
+}
+
+module.exports = RocketMQPushConsumer;
diff --git a/package.json b/package.json
new file mode 100644
index 0000000..ae0df92
--- /dev/null
+++ b/package.json
@@ -0,0 +1,26 @@
+{
+  "name": "apache-rocketmq",
+  "version": "0.0.1-dev",
+  "cppSDKVersion": "1.2.0",
+  "description": "RocketMQ binding for Node.js",
+  "main": "index.js",
+  "scripts": {
+    "test": "npm run lint && echo 'temp example test' && node example/producer.js && node example/push_consumer.js",
+    "lint": "eslint .",
+    "install": "node ./script/download_lib.js && node-gyp rebuild"
+  },
+  "author": "XadillaX <i@2333.moe>",
+  "license": "Apache-2.0",
+  "dependencies": {
+    "co": "^4.6.0",
+    "destroy": "^1.0.4",
+    "getos": "^3.1.1",
+    "mkdirp": "^0.5.1",
+    "nan": "^2.11.1",
+    "urllib": "^2.31.3"
+  },
+  "devDependencies": {
+    "eslint": "^5.9.0",
+    "eslint-config-rocketmq-style": "^1.0.0"
+  }
+}
diff --git a/script/download_lib.js b/script/download_lib.js
new file mode 100644
index 0000000..2b57a4b
--- /dev/null
+++ b/script/download_lib.js
@@ -0,0 +1,115 @@
+"use strict";
+
+const fs = require("fs");
+const os = require("os");
+const path = require("path");
+
+const co = require("co");
+const destroy = require("destroy");
+const _mkdirp = require("mkdirp");
+const urllib = require("urllib");
+
+const getLinuxDistroRoute = require("./get_linux_distro_route");
+const pkg = require("../package");
+
+let REGISTRY_MIRROR =
+    process.env.NODE_ROCKETMQ_REGISTRY ||
+    "https://opensource-rocketmq-client.oss-cn-hangzhou.aliyuncs.com";
+if(!REGISTRY_MIRROR.endsWith("/")) REGISTRY_MIRROR += "/";
+
+const CPP_SDK_VERSION = pkg.cppSDKVersion;
+const LIB_DIR = path.join(__dirname, "..", "deps", "lib");
+const URL_ROOT = `${REGISTRY_MIRROR}cpp-client`;
+
+function mkdirp(dir) {
+    return new Promise((resolve, reject) => {
+        _mkdirp(dir, null, err => {
+            if(err) reject(err);
+            else resolve();
+        });
+    });
+}
+
+function *getUrlArray() {
+    const platform = os.platform();
+    const ret = [];
+    let distro;
+
+    switch(platform) {
+    case "win32":
+        ret.push(`${URL_ROOT}/windows/${CPP_SDK_VERSION}/rocketmq-client-cpp.dll`);
+        ret.push(`${URL_ROOT}/windows/${CPP_SDK_VERSION}/rocketmq-client-cpp.lib`);
+        break;
+
+    case "darwin":
+        ret.push(`${URL_ROOT}/mac/${CPP_SDK_VERSION}/librocketmq.dylib`);
+        break;
+
+    case "linux":
+        distro = yield getLinuxDistroRoute();
+        ret.push(`${URL_ROOT}/linux/${CPP_SDK_VERSION}/${distro}/librocketmq.a`);
+        break;
+
+    default: throw new Error(`Unsupported platform ${platform}`);
+    }
+
+    return ret;
+}
+
+co(function *() {
+    let urls;
+    try {
+        urls = yield getUrlArray();
+    } catch(e) {
+        console.error(`[rocketmq sdk] [error] ${e.message}`);
+        process.exit(4);
+    }
+
+    yield mkdirp(LIB_DIR);
+    let writeTimes = 0;
+    for(const url of urls) {
+        console.log(`[rocketmq sdk] [info] downloading [${url}]...`);
+
+        const resp = yield urllib.request(url, {
+            timeout: 60000 * 5,
+            followRedirect: true,
+            streaming: true
+        });
+
+        if(resp.status !== 200) {
+            destroy(resp.res);
+            console.error(`[rocketmq sdk] [error] error status ${resp.status} while downloading [${url}].`);
+            process.exit(4);
+        }
+
+        const readStream = resp.res;
+        const filename = path.join(LIB_DIR, path.basename(url));
+        const writeStream = fs.createWriteStream(filename, {
+            encoding: "binary"
+        });
+
+        // eslint-disable-next-line
+        function handleDownladCallback(err) {
+            if(err) {
+                console.error(`[rocketmq sdk] [error] error occurred while downloading [${url}] to [${filename}].`);
+                console.error(err.stack);
+                process.exit(4);
+            }
+
+            writeTimes++;
+            destroy(resp.res);
+
+            console.log(`[rocketmq sdk] [info] downloaded library [${url}].`);
+            if(writeTimes === urls.length) {
+                console.log("[rocketmq sdk] [info] all libraries have been written to disk.");
+                process.exit(0);
+            }
+        }
+
+        readStream.on("error", handleDownladCallback);
+        writeStream.on("error", handleDownladCallback);
+        writeStream.on("finish", handleDownladCallback);
+
+        readStream.pipe(writeStream);
+    }
+});
diff --git a/script/get_linux_distro_route.js b/script/get_linux_distro_route.js
new file mode 100644
index 0000000..d5245a1
--- /dev/null
+++ b/script/get_linux_distro_route.js
@@ -0,0 +1,86 @@
+"use strict";
+
+const assert = require("assert");
+
+const getos = require("getos");
+
+const RHEL_MAP_ARRAY = [
+    "Centos",
+    "Red Hat Linux",
+    "RHEL",
+    "Scientific Linux",
+    "ScientificSL",
+    "ScientificCERNSLC",
+    "ScientificFermiLTS",
+    "ScientificSLF"
+];
+const NORMAL_MAP_ARRAY = [
+    "Alpine Linux",
+    "Amazon Linux",
+    "Arch Linux",
+    "Chakra",
+    "Debian",
+    "elementary OS",
+    "IYCC",
+    "Linux Mint",
+    "Manjaro Linux",
+    "Ubuntu Linux"
+];
+
+function realGetLinuxDistroRoute(dist, release) {
+    let major;
+    if(release) {
+        major = Number(release.split(".")[0]);
+    }
+
+    // RHEL Distros
+    if(RHEL_MAP_ARRAY.includes(dist)) {
+        assert(major >= 5 && major <= 7, `Only support ${dist} 5-7.`);
+        return `RHEL${major}.x`;
+    }
+
+    // Fedora
+    if("Fedora" === dist) {
+        if(major <= 18) {
+            return "RHEL6.x";
+        } else if(major === 19) {
+            return "RHEL7.x";
+        }
+    }
+
+    if("Ubuntu Linux" === dist) {
+        assert([ 14, 16, 18 ].includes(major));
+        return `UBUNTU/${major}.04`;
+    }
+
+    if("Debian" === dist) {
+        assert(major >= 8 && major <= 10);
+        return `UBUNTU/${(major + 2) / 2}.04`;
+    }
+
+    // Ubuntu Distros
+    if(!NORMAL_MAP_ARRAY.includes(dist)) {
+        console.error(`[rocketmq sdk] [warn] ${dist} may not supported, fallback to use Ubuntu library.`);
+    }
+
+    return "UBUNTU/14.04";
+}
+
+function getLinuxDistroRoute() {
+    return new Promise((resolve, reject) => {
+        getos(function(err, ret) {
+            if(err) return reject(err);
+
+            let route;
+            try {
+                route = realGetLinuxDistroRoute(ret.dist, ret.release);
+            } catch(e) {
+                return reject(e);
+            }
+
+            resolve(route);
+        });
+    });
+}
+
+module.exports = getLinuxDistroRoute;
diff --git a/src/consumer_ack.cpp b/src/consumer_ack.cpp
new file mode 100644
index 0000000..d4243ea
--- /dev/null
+++ b/src/consumer_ack.cpp
@@ -0,0 +1,74 @@
+#include "consumer_ack.h"
+
+namespace __node_rocketmq__ {
+
+Nan::Persistent<Function> ConsumerAck::constructor;
+
+ConsumerAck::ConsumerAck() :
+    inner(NULL)
+{
+}
+
+ConsumerAck::~ConsumerAck()
+{
+    inner = NULL;
+}
+
+NAN_MODULE_INIT(ConsumerAck::Init)
+{
+    Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
+    tpl->SetClassName(Nan::New("ConsumerAck").ToLocalChecked());
+    tpl->InstanceTemplate()->SetInternalFieldCount(1);
+
+    Nan::SetPrototypeMethod(tpl, "done", Done);
+
+    constructor.Reset(tpl->GetFunction());
+    Nan::Set(target, Nan::New("ConsumerAck").ToLocalChecked(), tpl->GetFunction());
+}
+
+NAN_METHOD(ConsumerAck::New)
+{
+    Isolate* isolate = info.GetIsolate();
+    Local<Context> context = Context::New(isolate);
+
+    if(!info.IsConstructCall())
+    {
+        Local<Function> _constructor = Nan::New<Function>(constructor);
+        info.GetReturnValue().Set(_constructor->NewInstance(context, 0, 0).ToLocalChecked());
+        return;
+    }
+
+    ConsumerAck* producer = new ConsumerAck();
+    producer->Wrap(info.This());
+    info.GetReturnValue().Set(info.This());
+}
+
+NAN_METHOD(ConsumerAck::Done)
+{
+    ConsumerAck* ack = ObjectWrap::Unwrap<ConsumerAck>(info.Holder());
+    bool succ = true;
+
+    if(info.Length() >= 1)
+    {
+        succ = (info[0]->IsUndefined() || info[0]->IsNull() || Nan::To<bool>(info[0]).FromJust());
+    }
+
+    // call inner ack's `Ack` function to emit the true `Acker`'s `Ack` function
+    // and finish waiting of consume thread
+    CConsumeStatus status = succ ?
+        CConsumeStatus::E_CONSUME_SUCCESS :
+        CConsumeStatus::E_RECONSUME_LATER;
+    ack->Ack(status);
+}
+
+void ConsumerAck::Ack(CConsumeStatus status)
+{
+    if(inner)
+    {
+        // call inner ack in the main event loop
+        inner->Ack(status);
+        inner = NULL;
+    }
+}
+
+}
\ No newline at end of file
diff --git a/src/consumer_ack.h b/src/consumer_ack.h
new file mode 100644
index 0000000..a90e5ca
--- /dev/null
+++ b/src/consumer_ack.h
@@ -0,0 +1,50 @@
+#ifndef __ROCKETMQ_CONSUMER_ACK_H__
+#define __ROCKETMQ_CONSUMER_ACK_H__
+
+#include "consumer_ack_inner.h"
+#include <nan.h>
+
+namespace __node_rocketmq__ {
+
+using v8::Context;
+using v8::Function;
+using v8::FunctionTemplate;
+using v8::Isolate;
+using v8::Local;
+using v8::Object;
+using v8::String;
+using v8::Value;
+
+class ConsumerAck : public Nan::ObjectWrap {
+public:
+    static NAN_MODULE_INIT(Init);
+
+private:
+    explicit ConsumerAck();
+    ~ConsumerAck();
+
+    static NAN_METHOD(New);
+    static NAN_METHOD(Done);
+
+    void Ack(CConsumeStatus status);
+
+    static Nan::Persistent<v8::Function> constructor;
+
+public:
+    void SetInner(ConsumerAckInner* _inner)
+    {
+        inner = _inner;
+    }
+
+    static Nan::Persistent<v8::Function>& GetConstructor()
+    {
+        return constructor;
+    }
+
+private:
+    ConsumerAckInner* inner;
+};
+
+}
+
+#endif
\ No newline at end of file
diff --git a/src/consumer_ack_inner.cpp b/src/consumer_ack_inner.cpp
new file mode 100644
index 0000000..dcf5e89
--- /dev/null
+++ b/src/consumer_ack_inner.cpp
@@ -0,0 +1,61 @@
+#include "consumer_ack_inner.h"
+
+namespace __node_rocketmq__ {
+
+ConsumerAckInner::ConsumerAckInner() :
+    acked(false)
+{
+    uv_cond_init(&cond);
+    uv_mutex_init(&mutex);
+}
+
+ConsumerAckInner::~ConsumerAckInner()
+{
+    uv_mutex_destroy(&mutex);
+    uv_cond_destroy(&cond);
+}
+
+void ConsumerAckInner::Ack(CConsumeStatus _status)
+{
+    uv_mutex_lock(&mutex);
+    bool _acked = acked;
+
+    if(_acked)
+    {
+        uv_mutex_unlock(&mutex);
+        return;
+    }
+
+    status = _status;
+    acked = true;
+
+    // tell `this->WaitResult()` to continue
+    uv_cond_signal(&cond);
+    uv_mutex_unlock(&mutex);
+}
+
+CConsumeStatus ConsumerAckInner::WaitResult()
+{
+    uv_mutex_lock(&mutex);
+
+    // if `cond signal` send before `WaitResult()`,
+    // `uv_cond_wait` will be blocked and never continue
+    //
+    // so we have to return result directly without `uv_cond_wait`
+    if(acked)
+    {
+        CConsumeStatus _status = status;
+        uv_mutex_unlock(&mutex);
+        return _status;
+    }
+
+    // wait for `this->Ack()` and that will emit `uv_cond_signal` to let it stop wait
+    uv_cond_wait(&cond, &mutex);
+
+    CConsumeStatus _status = status;
+    uv_mutex_unlock(&mutex);
+
+    return _status;
+}
+
+}
diff --git a/src/consumer_ack_inner.h b/src/consumer_ack_inner.h
new file mode 100644
index 0000000..402de6b
--- /dev/null
+++ b/src/consumer_ack_inner.h
@@ -0,0 +1,26 @@
+#ifndef __ROCKETMQ_CONSUMER_ACK_INNER_H__
+#define __ROCKETMQ_CONSUMER_ACK_INNER_H__
+
+#include <uv.h>
+#include <CPushConsumer.h>
+
+namespace __node_rocketmq__ {
+
+class ConsumerAckInner {
+public:
+    ConsumerAckInner();
+    ~ConsumerAckInner();
+
+    void Ack(CConsumeStatus _status);
+    CConsumeStatus WaitResult();
+
+private:
+    bool acked;
+    CConsumeStatus status;
+    uv_mutex_t mutex;
+    uv_cond_t cond;
+};
+
+}
+
+#endif
\ No newline at end of file
diff --git a/src/producer.cpp b/src/producer.cpp
new file mode 100644
index 0000000..36996c6
--- /dev/null
+++ b/src/producer.cpp
@@ -0,0 +1,251 @@
+#include "producer.h"
+#include "workers/producer/send_message.h"
+#include "workers/producer/start_or_shutdown.h"
+
+#include <MQClientException.h>
+#include <string>
+using namespace std;
+
+namespace __node_rocketmq__ {
+
+#define NAN_GET_CPRODUCER() \
+    RocketMQProducer* _v8_producer = ObjectWrap::Unwrap<RocketMQProducer>(info.Holder()); \
+    CProducer* producer_ptr = _v8_producer->GetProducer();
+
+Nan::Persistent<Function> RocketMQProducer::constructor;
+
+RocketMQProducer::RocketMQProducer(const char* group_id, const char* instance_name)
+{
+    producer_ptr = CreateProducer(group_id);
+    if(instance_name)
+    {
+        SetProducerInstanceName(producer_ptr, instance_name);
+    }
+}
+
+RocketMQProducer::~RocketMQProducer()
+{
+    try
+    {
+        ShutdownProducer(producer_ptr);
+    }
+    catch (...)
+    {
+        //
+    }
+
+    DestroyProducer(producer_ptr);
+}
+
+void RocketMQProducer::SetOptions(Local<Object> options)
+{
+    // set name server
+    Local<Value> _name_server_v = Nan::Get(options, Nan::New<String>("nameServer").ToLocalChecked()).ToLocalChecked();
+    if(_name_server_v->IsString())
+    {
+        Nan::Utf8String namesrv(_name_server_v);
+        SetProducerNameServerAddress(producer_ptr, *namesrv);
+    }
+
+    // set group name
+    Local<Value> _group_name_v = Nan::Get(options, Nan::New<String>("groupName").ToLocalChecked()).ToLocalChecked();
+    if(_group_name_v->IsString())
+    {
+        Nan::Utf8String group_name(_group_name_v);
+        SetProducerGroupName(producer_ptr, *group_name);
+    }
+
+    // set log num & single log size
+    int file_num = 3;
+    int64 file_size = 104857600;
+    Local<Value> _log_file_num_v = Nan::Get(options, Nan::New<String>("logFileNum").ToLocalChecked()).ToLocalChecked();
+    Local<Value> _log_file_size_v = Nan::Get(options, Nan::New<String>("logFileSize").ToLocalChecked()).ToLocalChecked();
+    if(_log_file_num_v->IsNumber())
+    {
+        file_num = _log_file_num_v->Int32Value();
+    }
+    if(_log_file_size_v->IsNumber())
+    {
+        file_size = _log_file_size_v->Int32Value();
+    }
+    SetProducerLogFileNumAndSize(producer_ptr, file_num, file_size);
+
+    // set log level
+    Local<Value> _log_level_v = Nan::Get(options, Nan::New<String>("logLevel").ToLocalChecked()).ToLocalChecked();
+    if(_log_level_v->IsNumber())
+    {
+        int level = _log_level_v->Int32Value();
+        SetProducerLogLevel(producer_ptr, (CLogLevel)level);
+    }
+
+    // set compress level
+    Local<Value> _compress_level_v = Nan::Get(options, Nan::New<String>("compressLevel").ToLocalChecked()).ToLocalChecked();
+    if(_compress_level_v->IsNumber()) {
+        int level = _compress_level_v->Int32Value();
+        SetProducerCompressLevel(producer_ptr, level);
+    }
+
+    // set send message timeout
+    Local<Value> _send_message_timeout_v = Nan::Get(options, Nan::New<String>("sendMessageTimeout").ToLocalChecked()).ToLocalChecked();
+    if(_send_message_timeout_v->IsNumber())
+    {
+        int timeout = _send_message_timeout_v->Int32Value();
+        SetProducerSendMsgTimeout(producer_ptr, timeout);
+    }
+
+    // set max message size
+    Local<Value> _max_message_size_v = Nan::Get(options, Nan::New<String>("maxMessageSize").ToLocalChecked()).ToLocalChecked();
+    if(_max_message_size_v->IsNumber())
+    {
+        int size = _max_message_size_v->Int32Value();
+        SetProducerMaxMessageSize(producer_ptr, size);
+    }
+}
+
+NAN_MODULE_INIT(RocketMQProducer::Init)
+{
+    Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
+    tpl->SetClassName(Nan::New("RocketMQProducer").ToLocalChecked());
+    tpl->InstanceTemplate()->SetInternalFieldCount(1);
+
+    Nan::SetPrototypeMethod(tpl, "start", Start);
+    Nan::SetPrototypeMethod(tpl, "shutdown", Shutdown);
+    Nan::SetPrototypeMethod(tpl, "send", Send);
+    Nan::SetPrototypeMethod(tpl, "setSessionCredentials", SetSessionCredentials);
+
+    constructor.Reset(tpl->GetFunction());
+    Nan::Set(target, Nan::New("Producer").ToLocalChecked(), tpl->GetFunction());
+}
+
+NAN_METHOD(RocketMQProducer::New)
+{
+    Isolate* isolate = info.GetIsolate();
+    Local<Context> context = Context::New(isolate);
+
+    if(!info.IsConstructCall())
+    {
+        const int argc = 3;
+        Local<Value> argv[argc] = { info[0], info[1], info[2] };
+        Local<Function> _constructor = Nan::New<v8::Function>(constructor);
+        info.GetReturnValue().Set(_constructor->NewInstance(context, argc, argv).ToLocalChecked());
+        return;
+    }
+
+    Nan::Utf8String group_id(info[0]);
+    Nan::Utf8String instance_name(info[1]);
+    Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked();
+    RocketMQProducer* producer = new RocketMQProducer(*group_id, info[1]->IsNull() ? NULL : *instance_name);
+
+    producer->Wrap(info.This());
+
+    // try to set options
+    try
+    {
+        producer->SetOptions(options);
+    }
+    catch (runtime_error e)
+    {
+        Nan::ThrowError(e.what());
+        return;
+    }
+
+    info.GetReturnValue().Set(info.This());
+}
+
+NAN_METHOD(RocketMQProducer::Start)
+{
+    NAN_GET_CPRODUCER();
+
+    Nan::Callback* callback = (info[0]->IsFunction()) ?
+        new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
+        NULL;
+
+    Nan::AsyncQueueWorker(new ProducerStartOrShutdownWorker(callback, producer_ptr, ProducerWorkerType::START_PRODUCER));
+}
+
+NAN_METHOD(RocketMQProducer::Shutdown)
+{
+    NAN_GET_CPRODUCER();
+
+    Nan::Callback* callback = (info[0]->IsFunction()) ?
+        new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
+        NULL;
+
+    Nan::AsyncQueueWorker(new ProducerStartOrShutdownWorker(callback, producer_ptr, ProducerWorkerType::SHUTDOWN_PRODUCER));
+}
+
+NAN_METHOD(RocketMQProducer::SetSessionCredentials)
+{
+    NAN_GET_CPRODUCER();
+
+    Nan::Utf8String access_key(info[0]);
+    Nan::Utf8String secret_key(info[1]);
+    Nan::Utf8String ons_channel(info[2]);
+
+    int ret;
+    try
+    {
+        ret = SetProducerSessionCredentials(producer_ptr, *access_key, *secret_key, *ons_channel);
+    }
+    catch(runtime_error e)
+    {
+        Nan::ThrowError(e.what());
+    }
+    catch(std::exception& e)
+    {
+        Nan::ThrowError(e.what());
+    }
+    catch(rocketmq::MQException& e)
+    {
+        Nan::ThrowError(e.what());
+    }
+    info.GetReturnValue().Set(ret);
+}
+
+NAN_METHOD(RocketMQProducer::Send)
+{
+    Nan::Utf8String topic(info[0]);
+    Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked();
+
+    CMessage* msg = CreateMessage(*topic);
+
+    Local<Value> _tags_to_be_checked = Nan::Get(options, Nan::New<String>("tags").ToLocalChecked()).ToLocalChecked();
+    Local<Value> _keys_to_be_checked = Nan::Get(options, Nan::New<String>("keys").ToLocalChecked()).ToLocalChecked();
+
+    if(_tags_to_be_checked->IsString())
+    {
+        Nan::Utf8String tags(_tags_to_be_checked);
+        SetMessageTags(msg, *tags);
+    }
+
+    if(_keys_to_be_checked->IsString())
+    {
+        Nan::Utf8String keys(_keys_to_be_checked);
+        SetMessageKeys(msg, *keys);
+    }
+
+    // set message body:
+    //   1. if it's a string, call `SetMessageBody`;
+    //   2. if it's a buffer, call `SetByteMessageBody`.
+    if(info[1]->IsString())
+    {
+        Nan::Utf8String body(info[1]);
+        SetMessageBody(msg, *body);
+    }
+    else
+    {
+        Local<Object> node_buff_object = Nan::To<Object>(info[1]).ToLocalChecked();
+        unsigned int length = node::Buffer::Length(node_buff_object);
+        const char* buff = node::Buffer::Data(node_buff_object);
+        SetByteMessageBody(msg, buff, length);
+    }
+
+    Nan::Callback* callback = (info[3]->IsFunction()) ?
+        new Nan::Callback(Nan::To<Function>(info[3]).ToLocalChecked()) :
+        NULL;
+
+    RocketMQProducer* producer = ObjectWrap::Unwrap<RocketMQProducer>(info.Holder());
+    Nan::AsyncQueueWorker(new ProducerSendMessageWorker(callback, producer, msg));
+}
+
+}
diff --git a/src/producer.h b/src/producer.h
new file mode 100644
index 0000000..55966f9
--- /dev/null
+++ b/src/producer.h
@@ -0,0 +1,46 @@
+#ifndef __ROCKETMQ_PRODUCER_H__
+#define __ROCKETMQ_PRODUCER_H__
+
+#include <CProducer.h>
+#include <nan.h>
+
+namespace __node_rocketmq__ {
+
+using v8::Context;
+using v8::Function;
+using v8::FunctionTemplate;
+using v8::Isolate;
+using v8::Local;
+using v8::Object;
+using v8::String;
+using v8::Value;
+
+class RocketMQProducer : public Nan::ObjectWrap {
+public:
+    static NAN_MODULE_INIT(Init);
+
+public:
+    CProducer* GetProducer() { return producer_ptr; }
+
+private:
+    explicit RocketMQProducer(const char* group_id, const char* instance_name);
+    ~RocketMQProducer();
+
+    static NAN_METHOD(New);
+    static NAN_METHOD(Start);
+    static NAN_METHOD(Shutdown);
+    static NAN_METHOD(Send);
+    static NAN_METHOD(SetSessionCredentials);
+
+    static Nan::Persistent<Function> constructor;
+
+private:
+    void SetOptions(Local<Object> options);
+
+private:
+    CProducer* producer_ptr;
+};
+
+}
+
+#endif
diff --git a/src/push_consumer.cpp b/src/push_consumer.cpp
new file mode 100644
index 0000000..94a626b
--- /dev/null
+++ b/src/push_consumer.cpp
@@ -0,0 +1,378 @@
+#include <map>
+#include "push_consumer.h"
+#include "consumer_ack.h"
+#include "workers/push_consumer/start_or_shutdown.h"
+
+using namespace std;
+
+namespace __node_rocketmq__ {
+
+struct MessageHandlerParam
+{
+    RocketMQPushConsumer* consumer;
+    ConsumerAckInner* ack;
+    CMessageExt* msg;
+};
+char message_handler_param_keys[5][8] = { "topic", "tags", "keys", "body", "msgId" };
+
+uv_mutex_t _get_msg_ext_column_lock;
+
+map<CPushConsumer*, RocketMQPushConsumer*> _push_consumer_map;
+
+#define NAN_GET_CPUSHCONSUMER() \
+    RocketMQPushConsumer* _v8_consumer = ObjectWrap::Unwrap<RocketMQPushConsumer>(info.Holder()); \
+    CPushConsumer* consumer_ptr = _v8_consumer->GetConsumer();
+
+Nan::Persistent<Function> RocketMQPushConsumer::constructor;
+
+RocketMQPushConsumer::RocketMQPushConsumer(const char* group_id, const char* instance_name) :
+    consumer_ptr(NULL)
+{
+    consumer_ptr = CreatePushConsumer(group_id);
+
+    if(instance_name)
+    {
+        SetPushConsumerInstanceName(consumer_ptr, instance_name);
+    }
+
+    _push_consumer_map[consumer_ptr] = this;
+
+    RegisterMessageCallback(consumer_ptr, RocketMQPushConsumer::OnMessage);
+}
+
+RocketMQPushConsumer::~RocketMQPushConsumer()
+{
+    try
+    {
+        ShutdownPushConsumer(consumer_ptr);
+        auto it = _push_consumer_map.find(consumer_ptr);
+        if(it != _push_consumer_map.end())
+        {
+            _push_consumer_map.erase(consumer_ptr);
+        }
+    }
+    catch(...)
+    {
+        //
+    }
+
+    DestroyPushConsumer(consumer_ptr);
+    consumer_ptr = NULL;
+}
+
+void RocketMQPushConsumer::SetOptions(Local<Object> options)
+{
+    // set name server
+    Local<Value> _name_server_v = Nan::Get(options, Nan::New<String>("nameServer").ToLocalChecked()).ToLocalChecked();
+    if(_name_server_v->IsString())
+    {
+        Nan::Utf8String namesrv(_name_server_v);
+        SetPushConsumerNameServerAddress(consumer_ptr, *namesrv);
+    }
+
+    // set thread count
+    Local<Value> _thread_count_v = Nan::Get(options, Nan::New<String>("threadCount").ToLocalChecked()).ToLocalChecked();
+    if(_thread_count_v->IsNumber())
+    {
+        int thread_count = Nan::To<int32_t>(_thread_count_v).FromJust();
+        if(thread_count > 0)
+        {
+            SetPushConsumerThreadCount(consumer_ptr, thread_count);
+        }
+    }
+
+    // set message batch max size
+    Local<Value> _max_batch_size_v = Nan::Get(options, Nan::New<String>("maxBatchSize").ToLocalChecked()).ToLocalChecked();
+    if(_max_batch_size_v->IsNumber())
+    {
+        int max_batch_size = Nan::To<int32_t>(_max_batch_size_v).FromJust();
+        if(max_batch_size > 0)
+        {
+            SetPushConsumerMessageBatchMaxSize(consumer_ptr, max_batch_size);
+        }
+    }
+
+    // set log num & single log size
+    int file_num = 3;
+    int64 file_size = 104857600;
+    Local<Value> _log_file_num_v = Nan::Get(options, Nan::New<String>("logFileNum").ToLocalChecked()).ToLocalChecked();
+    Local<Value> _log_file_size_v = Nan::Get(options, Nan::New<String>("logFileSize").ToLocalChecked()).ToLocalChecked();
+    if(_log_file_num_v->IsNumber())
+    {
+        file_num = _log_file_num_v->Int32Value();
+    }
+    if(_log_file_size_v->IsNumber())
+    {
+        file_size = _log_file_size_v->Int32Value();
+    }
+    SetPushConsumerLogFileNumAndSize(consumer_ptr, file_num, file_size);
+
+    // set log level
+    Local<Value> _log_level_v = Nan::Get(options, Nan::New<String>("logLevel").ToLocalChecked()).ToLocalChecked();
+    if(_log_level_v->IsNumber())
+    {
+        int level = _log_level_v->Int32Value();
+        SetPushConsumerLogLevel(consumer_ptr, (CLogLevel) level);
+    }
+}
+
+NAN_MODULE_INIT(RocketMQPushConsumer::Init)
+{
+    uv_mutex_init(&_get_msg_ext_column_lock);
+    Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
+    tpl->SetClassName(Nan::New("RocketMQPushConsumer").ToLocalChecked());
+    tpl->InstanceTemplate()->SetInternalFieldCount(1);
+
+    Nan::SetPrototypeMethod(tpl, "start", Start);
+    Nan::SetPrototypeMethod(tpl, "shutdown", Shutdown);
+    Nan::SetPrototypeMethod(tpl, "subscribe", Subscribe);
+    Nan::SetPrototypeMethod(tpl, "setListener", SetListener);
+    Nan::SetPrototypeMethod(tpl, "setSessionCredentials", SetSessionCredentials);
+
+    constructor.Reset(tpl->GetFunction());
+    Nan::Set(target, Nan::New("PushConsumer").ToLocalChecked(), tpl->GetFunction());
+}
+
+NAN_METHOD(RocketMQPushConsumer::New)
+{
+    Isolate* isolate = info.GetIsolate();
+    Local<Context> context = Context::New(isolate);
+
+    if(!info.IsConstructCall())
+    {
+        const int argc = 3;
+        Local<Value> argv[argc] = { info[0], info[1], info[2] };
+        Local<Function> _constructor = Nan::New<v8::Function>(constructor);
+        info.GetReturnValue().Set(_constructor->NewInstance(context, argc, argv).ToLocalChecked());
+        return;
+    }
+
+    Nan::Utf8String v8_group_id(info[0]);
+    Nan::Utf8String v8_instance_name(info[1]);
+    string group_id = *v8_group_id;
+    string instance_name = *v8_instance_name;
+    Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked();
+    RocketMQPushConsumer* consumer = new RocketMQPushConsumer(group_id.c_str(), info[1]->IsNull() ? NULL : instance_name.c_str());
+
+    consumer->Wrap(info.This());
+
+    // try to set options
+    try
+    {
+        consumer->SetOptions(options);
+    }
+    catch(const runtime_error e)
+    {
+        Nan::ThrowError(e.what());
+        return;
+    }
+    catch(const std::exception& e)
+    {
+        Nan::ThrowError(e.what());
+        return;
+    }
+
+    info.GetReturnValue().Set(info.This());
+}
+
+NAN_METHOD(RocketMQPushConsumer::Start)
+{
+    NAN_GET_CPUSHCONSUMER();
+
+    Nan::Callback* callback = (info[0]->IsFunction()) ?
+        new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
+        NULL;
+
+    Nan::AsyncQueueWorker(new PushConsumerStartOrShutdownWorker(callback, consumer_ptr, PushConsumerWorkerType::START_PUSH_CONSUMER));
+}
+
+NAN_METHOD(RocketMQPushConsumer::Shutdown)
+{
+    NAN_GET_CPUSHCONSUMER();
+
+    Nan::Callback* callback = (info[0]->IsFunction()) ?
+        new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
+        NULL;
+
+    Nan::AsyncQueueWorker(new PushConsumerStartOrShutdownWorker(callback, consumer_ptr, PushConsumerWorkerType::SHUTDOWN_PUSH_CONSUMER));
+}
+
+NAN_METHOD(RocketMQPushConsumer::Subscribe)
+{
+    NAN_GET_CPUSHCONSUMER();
+
+    Nan::Utf8String v8_topic(info[0]);
+    Nan::Utf8String v8_expression(info[1]);
+    string topic = *v8_topic;
+    string expression = *v8_expression;
+
+    int ret;
+    try
+    {
+        ret = ::Subscribe(consumer_ptr, topic.c_str(), expression.c_str());
+    }
+    catch(const runtime_error e)
+    {
+        Nan::ThrowError(e.what());
+        return;
+    }
+    catch(const std::exception& e)
+    {
+        Nan::ThrowError(e.what());
+        return;
+    }
+
+    info.GetReturnValue().Set(ret);
+}
+
+NAN_METHOD(RocketMQPushConsumer::SetListener)
+{
+    RocketMQPushConsumer* consumer = ObjectWrap::Unwrap<RocketMQPushConsumer>(info.Holder());
+    if(!consumer->listener_func.IsEmpty())
+    {
+        consumer->listener_func.Reset();
+    }
+
+    consumer->listener_func.Reset(Nan::To<Function>(info[0]).ToLocalChecked());
+}
+
+NAN_METHOD(RocketMQPushConsumer::SetSessionCredentials)
+{
+    NAN_GET_CPUSHCONSUMER();
+
+    Nan::Utf8String access_key(info[0]);
+    Nan::Utf8String secret_key(info[1]);
+    Nan::Utf8String ons_channel(info[2]);
+
+    int ret;
+    try
+    {
+        ret = SetPushConsumerSessionCredentials(consumer_ptr, *access_key, *secret_key, *ons_channel);
+    }
+    catch(const runtime_error e)
+    {
+        Nan::ThrowError(e.what());
+        return;
+    }
+    catch(const std::exception& e)
+    {
+        Nan::ThrowError(e.what());
+        return;
+    }
+
+    info.GetReturnValue().Set(ret);
+}
+
+string RocketMQPushConsumer::GetMessageColumn(char* name, CMessageExt* msg)
+{
+    const char* orig = NULL;
+
+    uv_mutex_lock(&_get_msg_ext_column_lock);
+    switch(name[0])
+    {
+    // topic / tags
+    case 't':
+        orig = name[1] == 'o' ? GetMessageTopic(msg) : GetMessageTags(msg);
+        break;
+
+    // keys
+    case 'k':
+        orig = GetMessageKeys(msg);
+        break;
+
+    // body
+    case 'b':
+        orig = GetMessageBody(msg);
+        break;
+
+    // msgId
+    case 'm':
+        orig = GetMessageId(msg);
+        break;
+
+    default:
+        orig = NULL;
+        break;
+    }
+
+    uv_mutex_unlock(&_get_msg_ext_column_lock);
+
+    if(!orig) return "";
+    return orig;
+}
+
+void close_async_done(uv_handle_t* handle)
+{
+    free(handle);
+}
+
+void RocketMQPushConsumer::HandleMessageInEventLoop(uv_async_t* async)
+{
+    Nan::HandleScope scope;
+
+    Isolate* isolate = Isolate::GetCurrent();
+    Local<Context> context = isolate->GetCurrentContext();
+
+    MessageHandlerParam* param = (MessageHandlerParam*)(async->data);
+    RocketMQPushConsumer* consumer = param->consumer;
+    ConsumerAckInner* ack_inner = param->ack;
+    CMessageExt* msg = param->msg;
+
+    // create the JavaScript ack object and then set inner ack object
+    Local<Function> cons = Nan::New<Function>(ConsumerAck::GetConstructor());
+    Local<Object> ack_obj = cons->NewInstance(context, 0, 0).ToLocalChecked();
+    ConsumerAck* ack = ObjectWrap::Unwrap<ConsumerAck>(ack_obj);
+    ack->SetInner(ack_inner);
+
+    // TODO: const char *GetMessageProperty(CMessageExt *msgExt, const char *key);
+    Local<Object> result = Nan::New<Object>();
+    for(int i = 0; i < 5; i++)
+    {
+        Nan::Set(
+            result,
+            Nan::New(message_handler_param_keys[i]).ToLocalChecked(),
+            Nan::New(RocketMQPushConsumer::GetMessageColumn(message_handler_param_keys[i], msg)).ToLocalChecked());
+    }
+
+    Local<Value> argv[2] = {
+        result,
+        ack_obj
+    };
+    Nan::Callback* callback = consumer->GetListenerFunction();
+    callback->Call(2, argv);
+
+    uv_close((uv_handle_t*)async, close_async_done);
+}
+
+int RocketMQPushConsumer::OnMessage(CPushConsumer* consumer_ptr, CMessageExt* msg_ext)
+{
+    RocketMQPushConsumer* consumer = _push_consumer_map[consumer_ptr];
+    if (!consumer)
+    {
+        // TODO: error handle
+        return CConsumeStatus::E_RECONSUME_LATER;
+    }
+
+    ConsumerAckInner ack_inner;
+
+    // create async parameter
+    MessageHandlerParam param;
+    param.consumer = consumer;
+    param.ack = &ack_inner;
+    param.msg = msg_ext;
+
+    // create a new async handler and bind with `RocketMQPushConsumer::HandleMessageInEventLoop`
+    uv_async_t* async = (uv_async_t*)malloc(sizeof(uv_async_t));
+    uv_async_init(uv_default_loop(), async, RocketMQPushConsumer::HandleMessageInEventLoop);
+    async->data = (void*)(&param);
+
+    // send async handler
+    uv_async_send(async);
+
+    // wait for result
+    CConsumeStatus status = ack_inner.WaitResult();
+
+    return status;
+}
+
+}
diff --git a/src/push_consumer.h b/src/push_consumer.h
new file mode 100644
index 0000000..8ee466e
--- /dev/null
+++ b/src/push_consumer.h
@@ -0,0 +1,62 @@
+#ifndef __ROCKETMQ_PUSH_CONSUMER_H__
+#define __ROCKETMQ_PUSH_CONSUMER_H__
+
+#include <CPushConsumer.h>
+#include <uv.h>
+#include <nan.h>
+#include <string>
+
+namespace __node_rocketmq__ {
+
+using v8::Context;
+using v8::Function;
+using v8::FunctionTemplate;
+using v8::Isolate;
+using v8::Local;
+using v8::Object;
+using v8::String;
+using v8::Value;
+
+class RocketMQPushConsumer : public Nan::ObjectWrap {
+public:
+    static NAN_MODULE_INIT(Init);
+    static int OnMessage(CPushConsumer* consumer_ptr, CMessageExt* msg_ext);
+    static std::string GetMessageColumn(char* name, CMessageExt* msg);
+
+private:
+    explicit RocketMQPushConsumer(const char* group_id, const char* instance_name);
+    ~RocketMQPushConsumer();
+
+    static NAN_METHOD(New);
+    static NAN_METHOD(Start);
+    static NAN_METHOD(Shutdown);
+    static NAN_METHOD(Subscribe);
+    static NAN_METHOD(SetListener);
+    static NAN_METHOD(SetSessionCredentials);
+
+    static Nan::Persistent<v8::Function> constructor;
+
+    void SetOptions(Local<Object>);
+    static void HandleMessageInEventLoop(uv_async_t* async);
+
+protected:
+    CPushConsumer* GetConsumer()
+    {
+        return consumer_ptr;
+    }
+
+    Nan::Callback* GetListenerFunction()
+    {
+        Nan::Callback* cb;
+        cb = &listener_func;
+        return cb;
+    }
+
+private:
+    CPushConsumer* consumer_ptr;
+    Nan::Callback listener_func;
+};
+
+}
+
+#endif
diff --git a/src/rocketmq.cpp b/src/rocketmq.cpp
new file mode 100644
index 0000000..bc29609
--- /dev/null
+++ b/src/rocketmq.cpp
@@ -0,0 +1,32 @@
+#include <nan.h>
+
+#include "producer.h"
+#include "push_consumer.h"
+#include "consumer_ack.h"
+
+namespace __node_rocketmq__ {
+
+#if defined(__APPLE__)
+uv_lib_t lib;
+
+NAN_METHOD(DLOpen)
+{
+    Nan::Utf8String filename(info[0]);
+    uv_dlopen(*filename, &lib);
+}
+#endif
+
+NAN_MODULE_INIT(Init)
+{
+    RocketMQProducer::Init(target);
+    RocketMQPushConsumer::Init(target);
+    ConsumerAck::Init(target);
+
+#if defined(__APPLE__)
+    Nan::Set(target, Nan::New("macosDLOpen").ToLocalChecked(), Nan::New<v8::FunctionTemplate>(DLOpen)->GetFunction());
+#endif
+}
+
+NODE_MODULE(rocketmq, Init)
+
+}
diff --git a/src/workers/producer/send_message.h b/src/workers/producer/send_message.h
new file mode 100644
index 0000000..f757319
--- /dev/null
+++ b/src/workers/producer/send_message.h
@@ -0,0 +1,64 @@
+#ifndef __ROCKETMQ_SEND_MESSAGE_H__
+#define __ROCKETMQ_SEND_MESSAGE_H__
+
+#include <nan.h>
+#include <CProducer.h>
+#include <MQClientException.h>
+
+namespace __node_rocketmq__ {
+
+using namespace std;
+
+class ProducerSendMessageWorker : public Nan::AsyncWorker {
+public:
+    ProducerSendMessageWorker(Nan::Callback* callback, RocketMQProducer* producer, CMessage* msg) :
+        AsyncWorker(callback),
+        msg(msg),
+        producer(producer)
+    {
+    }
+
+    ~ProducerSendMessageWorker()
+    {
+        DestroyMessage(msg);
+    }
+
+    void Execute()
+    {
+        try
+        {
+            SendMessageSync(producer->GetProducer(), msg, &send_ret);
+        }
+        catch(const runtime_error e)
+        {
+            SetErrorMessage(e.what());
+        }
+        catch(const std::exception& e)
+        {
+            SetErrorMessage(e.what());
+        }
+    }
+
+    void HandleOKCallback()
+    {
+        Nan::HandleScope scope;
+
+        Local<Value> argv[] = {
+            Nan::Undefined(),
+            Nan::New<v8::Number>((unsigned int)send_ret.sendStatus),
+            Nan::New<v8::String>(send_ret.msgId).ToLocalChecked(),
+            Nan::New<v8::Number>((long long)send_ret.offset)
+        };
+        callback->Call(4, argv);
+    }
+
+private:
+    CMessage* msg;
+    RocketMQProducer* producer;
+
+    CSendResult send_ret;
+};
+
+}
+
+#endif
diff --git a/src/workers/producer/start_or_shutdown.h b/src/workers/producer/start_or_shutdown.h
new file mode 100644
index 0000000..42aae60
--- /dev/null
+++ b/src/workers/producer/start_or_shutdown.h
@@ -0,0 +1,72 @@
+#ifndef __ROCKETMQ_PRODUCER_START_OR_SHUTDOWN_H__
+#define __ROCKETMQ_PRODUCER_START_OR_SHUTDOWN_H__
+
+#include <nan.h>
+#include <CProducer.h>
+#include <MQClientException.h>
+
+namespace __node_rocketmq__ {
+
+using namespace std;
+
+enum ProducerWorkerType {
+    START_PRODUCER = 0,
+    SHUTDOWN_PRODUCER
+};
+
+class ProducerStartOrShutdownWorker : public Nan::AsyncWorker {
+public:
+    ProducerStartOrShutdownWorker(Nan::Callback* callback, CProducer* producer_ptr, ProducerWorkerType type) :
+        AsyncWorker(callback),
+        producer_ptr(producer_ptr),
+        ret(0),
+        type(type)
+    {
+    }
+
+    ~ProducerStartOrShutdownWorker()
+    {
+    }
+
+    void Execute()
+    {
+        try
+        {
+            switch(type) {
+            case START_PRODUCER:
+                ret = StartProducer(producer_ptr); break;
+            case SHUTDOWN_PRODUCER:
+                ret = ShutdownProducer(producer_ptr); break;
+            default: break;
+            }
+        }
+        catch(const runtime_error e)
+        {
+            SetErrorMessage(e.what());
+        }
+        catch(const exception& e)
+        {
+            SetErrorMessage(e.what());
+        }
+    }
+
+    void HandleOKCallback()
+    {
+        Nan::HandleScope scope;
+
+        Local<Value> argv[] = {
+            Nan::Undefined(),
+            Nan::New<v8::Number>((int)ret),
+        };
+        callback->Call(2, argv);
+    }
+
+private:
+    CProducer* producer_ptr;
+    int ret;
+    ProducerWorkerType type;
+};
+
+}
+
+#endif
diff --git a/src/workers/push_consumer/start_or_shutdown.h b/src/workers/push_consumer/start_or_shutdown.h
new file mode 100644
index 0000000..84371b1
--- /dev/null
+++ b/src/workers/push_consumer/start_or_shutdown.h
@@ -0,0 +1,72 @@
+#ifndef __ROCKETMQ_PUSH_CONSUMER_START_OR_SHUTDOWN_H__
+#define __ROCKETMQ_PUSH_CONSUMER_START_OR_SHUTDOWN_H__
+
+#include <nan.h>
+#include <CPushConsumer.h>
+#include <MQClientException.h>
+
+namespace __node_rocketmq__ {
+
+using namespace std;
+
+enum PushConsumerWorkerType {
+    START_PUSH_CONSUMER = 0,
+    SHUTDOWN_PUSH_CONSUMER
+};
+
+class PushConsumerStartOrShutdownWorker : public Nan::AsyncWorker {
+public:
+    PushConsumerStartOrShutdownWorker(Nan::Callback* callback, CPushConsumer* consumer_ptr, PushConsumerWorkerType type) :
+        AsyncWorker(callback),
+        consumer_ptr(consumer_ptr),
+        ret(0),
+        type(type)
+    {
+    }
+
+    ~PushConsumerStartOrShutdownWorker()
+    {
+    }
+
+    void Execute()
+    {
+        try
+        {
+            switch(type) {
+            case START_PUSH_CONSUMER:
+                ret = StartPushConsumer(consumer_ptr); break;
+            case SHUTDOWN_PUSH_CONSUMER:
+                ret = ShutdownPushConsumer(consumer_ptr); break;
+            default: break;
+            }
+        }
+        catch(const runtime_error e)
+        {
+            SetErrorMessage(e.what());
+        }
+        catch(const exception& e)
+        {
+            SetErrorMessage(e.what());
+        }
+    }
+
+    void HandleOKCallback()
+    {
+        Nan::HandleScope scope;
+
+        Local<Value> argv[] = {
+            Nan::Undefined(),
+            Nan::New<v8::Number>((int)ret),
+        };
+        callback->Call(2, argv);
+    }
+
+private:
+    CPushConsumer* consumer_ptr;
+    int ret;
+    PushConsumerWorkerType type;
+};
+
+}
+
+#endif