Release rocketmq-flume 1.0.0 version
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..485dee6
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.idea
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..07a5fa6
--- /dev/null
+++ b/README.md
@@ -0,0 +1,32 @@
+# RocketMQ Externals
+
+There are some RocketMQ external projects, with the purpose of growing the RocketMQ community.
+
+## RocketMQ-Console-Ng
+A console for RocketMQ
+
+## RocketMQ-JMS
+RocketMQ-JMS is an implementation of the JMS specification that uses Apache RocketMQ as the broker. JMS 1.1 support is in progress, and JMS 2.0 is the final target.
+
+## RocketMQ-Flume-Ng
+
+This project is used to receive and send messages between
+[RocketMQ](http://rocketmq.incubator.apache.org/) and [Flume-ng](https://github.com/apache/flume)
+
+1. Firstly, please get familiar with [RocketMQ](http://rocketmq.incubator.apache.org/) and [Flume-ng](https://github.com/apache/flume).
+2. Ensure that the jars related to [RocketMQ](http://rocketmq.incubator.apache.org/dowloading/releases) exist in the local Maven repository.
+3. Execute the following command in rocketmq-flume root directory
+
+   `mvn clean install dependency:copy-dependencies`
+
+4. Copy the jars that rocketmq-flume depends on to `$FLUME_HOME/lib` (the specific jars are listed in the rocketmq-flume README).
+
+
+## RocketMQ-Spark
+
+Apache Spark-Streaming integration with RocketMQ. Both push and pull consumer modes are provided.
+For more details please refer to rocketmq-spark README.md.
+
+## RocketMQ-Docker
+Apache RocketMQ Docker provides a Dockerfile and bash scripts for building and running the Docker image.
+
diff --git a/rocketmq-flume/.gitignore b/rocketmq-flume/.gitignore
new file mode 100644
index 0000000..1d9a6d3
--- /dev/null
+++ b/rocketmq-flume/.gitignore
@@ -0,0 +1,8 @@
+.*
+*.class
+*.jar
+*.war
+*.iml
+.settings
+target
+!.gitignore
\ No newline at end of file
diff --git a/rocketmq-flume/LICENSE b/rocketmq-flume/LICENSE
new file mode 100644
index 0000000..7f77f44
--- /dev/null
+++ b/rocketmq-flume/LICENSE
@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
\ No newline at end of file
diff --git a/rocketmq-flume/LICENSE-BIN b/rocketmq-flume/LICENSE-BIN
new file mode 100644
index 0000000..7f77f44
--- /dev/null
+++ b/rocketmq-flume/LICENSE-BIN
@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
\ No newline at end of file
diff --git a/rocketmq-flume/NOTICE b/rocketmq-flume/NOTICE
new file mode 100644
index 0000000..5384857
--- /dev/null
+++ b/rocketmq-flume/NOTICE
@@ -0,0 +1,5 @@
+Apache RocketMQ (incubating)
+Copyright 2016-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file
diff --git a/rocketmq-flume/NOTICE-BIN b/rocketmq-flume/NOTICE-BIN
new file mode 100644
index 0000000..34641fa
--- /dev/null
+++ b/rocketmq-flume/NOTICE-BIN
@@ -0,0 +1,18 @@
+Apache RocketMQ (incubating)
+Copyright 2016-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+
+------
+This product bundles Apache Flume:
+
+Apache Flume
+Copyright 2012 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+Portions of this software were developed at
+Cloudera, Inc. (http://www.cloudera.com/).
diff --git a/rocketmq-flume/README.md b/rocketmq-flume/README.md
new file mode 100644
index 0000000..1d7a17e
--- /dev/null
+++ b/rocketmq-flume/README.md
@@ -0,0 +1,124 @@
+rocketmq-flume-ng Sink & Source
+==========================
+
+This project is used to receive and send messages between
+[RocketMQ](http://rocketmq.incubator.apache.org/) and [Flume-ng](https://github.com/apache/flume)
+
+1. Firstly, please get familiar with [RocketMQ](http://rocketmq.incubator.apache.org/) and [Flume-ng](https://github.com/apache/flume).
+2. Ensure that the jars related to [RocketMQ](http://rocketmq.incubator.apache.org/dowloading/releases) exist in the local Maven repository.
+3. Execute the following command in rocketmq-flume root directory
+
+   `mvn clean install dependency:copy-dependencies`
+
+4. Copy the jars that rocketmq-flume depends on to `$FLUME_HOME/lib` (the specific jars are listed below).
+
+## Sink
+
+### Sink configuration instruction
+
+| key           | nullable | default                |description|
+|---------------|----------|------------------------|-----------|
+| nameserver    | false    |                        |nameserver address|
+| topic         | true     | "FLUME_TOPIC"          |topic name|
+| tag           | true     | "FLUME_TAG"            |tag name|
+| producerGroup | true     | "FLUME_PRODUCER_GROUP" |producerGroup name|
+| batchSize     | true     | 1                      |maximum number of events taken per batch|
+| maxProcessTime| true     | 1000                   |maximum time spent taking a batch of events, in milliseconds (default is 1s)|
+
+### Sink example
+
+- Write the Flume configuration file
+
+```
+agent1.sources=source1
+agent1.channels=channel1
+agent1.sinks=sink1
+
+agent1.sources.source1.type=avro
+agent1.sources.source1.bind=0.0.0.0
+agent1.sources.source1.port=15151
+agent1.sources.source1.channels=channel1
+
+agent1.sinks.sink1.type=org.apache.rocketmq.flume.ng.sink.RocketMQSink
+agent1.sinks.sink1.nameserver=x.x.x.x:9876
+agent1.sinks.sink1.channel=channel1
+
+agent1.channels.channel1.type=memory
+agent1.channels.channel1.capacity=100
+agent1.channels.channel1.transactionCapacity=100
+agent1.channels.channel1.keep-alive=3
+```
+
+- Copy the jars below to `$FLUME_HOME/lib`
+
+```
+rocketmq-flume-sink-0.0.1-SNAPSHOT.jar (path: $PROJECT_HOME/rocketmq-flume-sink/target)
+fastjson-1.2.12.jar (path: $PROJECT_HOME/rocketmq-flume-sink/target/dependency)
+netty-all-4.0.36.Final.jar (path: $PROJECT_HOME/rocketmq-flume-sink/target/dependency)
+rocketmq-client-4.0.0-incubating.jar (path: $PROJECT_HOME/rocketmq-flume-sink/target/dependency)
+rocketmq-common-4.0.0-incubating.jar (path: $PROJECT_HOME/rocketmq-flume-sink/target/dependency)
+rocketmq-remoting-4.0.0-incubating.jar (path: $PROJECT_HOME/rocketmq-flume-sink/target/dependency)
+```
+
+- Execute the command and check the console output
+
+```
+shell1> $FLUME_HOME/bin/flume-ng agent -c conf -f conf/flume.conf -n agent1 -Dflume.root.logger=INFO,console
+shell2> $FLUME_HOME/bin/flume-ng avro-client -H localhost -p 15151 -F $FLUME_HOME/README
+```
+
+
+## Source
+
+### Source configuration instruction
+
+
+| key           | nullable | default              |description|
+|---------------|----------|----------------------|-----------|
+| nameserver    | false    |                      |nameserver address|
+| topic         | true     |"FLUME_TOPIC"         |topic name|
+| tag           | true     |"FLUME_TAG"           |tag name|
+| consumerGroup | true     |"FLUME_CONSUMER_GROUP"|consumerGroup name|
+| messageModel  | true     | "BROADCASTING"       |RocketMQ message model,"BROADCASTING" or "CLUSTERING"|
+| batchSize     | true     | 32                   |maximum number of messages consumed from RocketMQ per batch|
+
+
+### Source example
+- Write the Flume configuration file
+
+```
+agent1.sources=source1
+agent1.channels=channel1
+agent1.sinks=sink1
+
+agent1.sources.source1.type=org.apache.rocketmq.flume.ng.source.RocketMQSource
+agent1.sources.source1.nameserver=x.x.x.x:9876
+agent1.sources.source1.channels=channel1
+
+agent1.sinks.sink1.type=logger
+agent1.sinks.sink1.channel=channel1
+
+agent1.channels.channel1.type=memory
+agent1.channels.channel1.capacity=100
+agent1.channels.channel1.transactionCapacity=100
+agent1.channels.channel1.keep-alive=3
+```
+
+- Copy the jars below to `$FLUME_HOME/lib`
+
+```
+rocketmq-flume-souce-0.0.1-SNAPSHOT.jar (path: $PROJECT_HOME/rocketmq-flume-souce/target)
+fastjson-1.2.12.jar (path: $PROJECT_HOME/rocketmq-flume-sink/target/dependency)
+netty-all-4.0.36.Final.jar (path: $PROJECT_HOME/rocketmq-flume-sink/target/dependency)
+rocketmq-client-4.0.0-incubating.jar (path: $PROJECT_HOME/rocketmq-flume-sink/target/dependency)
+rocketmq-common-4.0.0-incubating.jar (path: $PROJECT_HOME/rocketmq-flume-sink/target/dependency)
+rocketmq-remoting-4.0.0-incubating.jar (path: $PROJECT_HOME/rocketmq-flume-sink/target/dependency)
+```
+
+- Send some test messages to RocketMQ
+
+- Execute the command and check the console output
+
+```
+$FLUME_HOME/bin/flume-ng agent -c conf -f conf/flume.conf -n agent1 -Dflume.root.logger=INFO,console
+```
diff --git a/rocketmq-flume/pom.xml b/rocketmq-flume/pom.xml
new file mode 100644
index 0000000..a2fda61
--- /dev/null
+++ b/rocketmq-flume/pom.xml
@@ -0,0 +1,141 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache</groupId>
+    <artifactId>rocketmq-flume</artifactId>
+    <version>1.0.0</version>
+    <name>rocketmq-flume</name>
+    <modules>
+        <module>rocketmq-flume-sink</module>
+        <module>rocketmq-flume-souce</module>
+    </modules>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <!-- Maven behaviour: tests are executed, Javadoc generation is skipped -->
+        <maven.test.skip>false</maven.test.skip>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <!-- Java language level; referenced by the maven-compiler-plugin configuration in this POM -->
+        <maven.compiler.source>1.7</maven.compiler.source>
+        <maven.compiler.target>1.7</maven.compiler.target>
+        <flume.version>1.7.0</flume.version> <!-- version of flume-ng-core -->
+        <rocketmq.version>4.0.0-incubating</rocketmq.version> <!-- shared by rocketmq-client, rocketmq-namesrv and rocketmq-broker -->
+    </properties>
+    <dependencies> <!-- declared directly (not under dependencyManagement), so every module inherits them -->
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flume</groupId>
+            <artifactId>flume-ng-core</artifactId>
+            <version>${flume.version}</version>
+        </dependency>
+        <!-- Test scope only: JUnit plus the name server and broker that the tests start in-process -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+            <version>4.12</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-namesrv</artifactId>
+            <version>${rocketmq.version}</version>
+            <scope>test</scope>
+            <exclusions> <!-- NOTE(review): presumably avoids a second SLF4J binding next to log4j; confirm -->
+                <exclusion>
+                    <groupId>ch.qos.logback</groupId>
+                    <artifactId>logback-classic</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-broker</artifactId>
+            <version>${rocketmq.version}</version>
+            <scope>test</scope>
+            <exclusions> <!-- same logback exclusion as for rocketmq-namesrv -->
+                <exclusion>
+                    <groupId>ch.qos.logback</groupId>
+                    <artifactId>logback-classic</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin> <!-- compiles with the language level defined in the properties section -->
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.5.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin> <!-- Apache RAT license-header audit; README.md is exempt -->
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <version>0.12</version>
+                <configuration>
+                    <excludes>
+                        <exclude>README.md</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin> <!-- runs checkstyle:check in the verify phase; test sources and test resources are not checked -->
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>2.17</version>
+                <executions>
+                    <execution>
+                        <id>verify</id>
+                        <phase>verify</phase>
+                        <configuration>
+                            <configLocation>style/rmq_checkstyle.xml</configLocation>
+                            <encoding>UTF-8</encoding>
+                            <consoleOutput>true</consoleOutput>
+                            <failsOnError>true</failsOnError>
+                            <includeTestSourceDirectory>false</includeTestSourceDirectory>
+                            <includeTestResources>false</includeTestResources>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+        <resources>
+            <resource> <!-- filtering enabled: Maven property placeholders in these files are substituted at build time -->
+                <directory>src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+    </build>
+    <packaging>pom</packaging>
+</project>
\ No newline at end of file
diff --git a/rocketmq-flume/rocketmq-flume-sink/pom.xml b/rocketmq-flume/rocketmq-flume-sink/pom.xml
new file mode 100644
index 0000000..30950d9
--- /dev/null
+++ b/rocketmq-flume/rocketmq-flume-sink/pom.xml
@@ -0,0 +1,27 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent> <!-- version, dependencies and plugin configuration are all inherited from the parent POM -->
+        <groupId>org.apache</groupId>
+        <artifactId>rocketmq-flume</artifactId>
+        <version>1.0.0</version>
+    </parent>
+    <artifactId>rocketmq-flume-sink</artifactId> <!-- module containing RocketMQSink -->
+</project>
diff --git a/rocketmq-flume/rocketmq-flume-sink/src/main/java/org/apache/rocketmq/flume/ng/sink/RocketMQSink.java b/rocketmq-flume/rocketmq-flume-sink/src/main/java/org/apache/rocketmq/flume/ng/sink/RocketMQSink.java
new file mode 100644
index 0000000..6a04f48
--- /dev/null
+++ b/rocketmq-flume/rocketmq-flume-sink/src/main/java/org/apache/rocketmq/flume/ng/sink/RocketMQSink.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flume.ng.sink;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.BATCH_SIZE_CONFIG;
+import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.BATCH_SIZE_DEFAULT;
+import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.MAX_PROCESS_TIME_CONFIG;
+import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.MAX_PROCESS_TIME_DEFAULT;
+import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.NAME_SERVER_CONFIG;
+import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.PRODUCER_GROUP_CONFIG;
+import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.PRODUCER_GROUP_DEFAULT;
+import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.TAG_CONFIG;
+import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.TAG_DEFAULT;
+import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.TOPIC_CONFIG;
+import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.TOPIC_DEFAULT;
+
+/**
+ * Flume sink that takes events from its channel in batches and publishes them to RocketMQ asynchronously.
+ */
+public class RocketMQSink extends AbstractSink implements Configurable {
+
+    private static final Logger log = LoggerFactory.getLogger(RocketMQSink.class);
+
+    private String nameServer;
+    private String topic;
+    private String tag;
+    private String producerGroup;
+    private int batchSize; // maximum number of events taken from the channel per transaction
+    private long maxProcessTime; // maximum time, in milliseconds, spent collecting one batch
+
+    /** Monitoring counter. */
+    private SinkCounter sinkCounter;
+
+    private DefaultMQProducer producer;
+
+    @Override
+    public void configure(Context context) {
+        // Only the name server address is mandatory; every other setting falls back to its default.
+        nameServer = context.getString(NAME_SERVER_CONFIG);
+        if (nameServer == null) {
+            throw new ConfigurationException("NameServer must not be null");
+        }
+
+        topic = context.getString(TOPIC_CONFIG, TOPIC_DEFAULT);
+        tag = context.getString(TAG_CONFIG, TAG_DEFAULT);
+        producerGroup = context.getString(PRODUCER_GROUP_CONFIG, PRODUCER_GROUP_DEFAULT);
+        batchSize = context.getInteger(BATCH_SIZE_CONFIG, BATCH_SIZE_DEFAULT);
+        maxProcessTime = context.getLong(MAX_PROCESS_TIME_CONFIG, MAX_PROCESS_TIME_DEFAULT);
+
+        if (sinkCounter == null) {
+            sinkCounter = new SinkCounter(getName());
+        }
+    }
+
+    @Override
+    public synchronized void start() {
+        // Creates and starts the producer; a start failure is counted and rethrown as FlumeException.
+        producer = new DefaultMQProducer(producerGroup);
+        producer.setNamesrvAddr(nameServer);
+        try {
+            producer.start();
+        } catch (MQClientException e) {
+            sinkCounter.incrementConnectionFailedCount();
+
+            log.error("RocketMQ producer start failed", e);
+            throw new FlumeException("Failed to start RocketMQ producer", e);
+        }
+
+        sinkCounter.incrementConnectionCreatedCount();
+        sinkCounter.start();
+
+        super.start();
+    }
+
+    @Override
+    public Status process() throws EventDeliveryException {
+        // READY after a committed batch; BACKOFF on an empty batch, a failed send or any exception (after rollback).
+        Channel channel = getChannel();
+        Transaction transaction = null;
+
+        try {
+            transaction = channel.getTransaction();
+            transaction.begin();
+
+            /*
+            Batch take: keep polling the channel until batchSize events are collected or maxProcessTime ms have elapsed.
+             */
+            List<Event> events = new ArrayList<>();
+            long beginTime = System.currentTimeMillis();
+            while (true) {
+                Event event = channel.take();
+                if (event != null) {
+                    events.add(event);
+                }
+
+                if (events.size() == batchSize
+                    || System.currentTimeMillis() - beginTime > maxProcessTime) {
+                    break;
+                }
+            }
+
+            if (events.size() == 0) {
+                sinkCounter.incrementBatchEmptyCount();
+
+                transaction.rollback();
+                return Status.BACKOFF;
+            }
+            /*
+            Async send: one callback per message; the latch opens once every callback has fired.
+             */
+            CountDownLatch latch = new CountDownLatch(events.size());
+            AtomicInteger errorNum = new AtomicInteger();
+
+            for (Event event : events) {
+                byte[] body = event.getBody();
+                Message message = new Message(topic, tag, body);
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Processing event,body={}", new String(body, "UTF-8"));
+                }
+                producer.send(message, new SendCallBackHandler(message, latch, errorNum));
+            }
+            latch.await(); // no timeout: returns only after every send has reported success or failure
+
+            sinkCounter.addToEventDrainAttemptCount(events.size());
+
+            if (errorNum.get() > 0) {
+                log.error("errorNum=" + errorNum + ",transaction will rollback");
+                transaction.rollback();
+                return Status.BACKOFF;
+            } else {
+                transaction.commit();
+
+                sinkCounter.addToEventDrainSuccessCount(events.size());
+
+                return Status.READY;
+            }
+
+        } catch (Exception e) {
+            log.error("Failed to processing event", e);
+
+            if (transaction != null) {
+                try {
+                    transaction.rollback();
+                } catch (Exception ex) {
+                    log.error("Failed to rollback transaction", ex);
+                    throw new EventDeliveryException("Failed to rollback transaction", ex);
+                }
+            }
+
+            return Status.BACKOFF;
+
+        } finally {
+            if (transaction != null) {
+                transaction.close();
+            }
+        }
+    }
+
+    @Override public synchronized void stop() {
+        producer.shutdown();
+
+        sinkCounter.incrementConnectionClosedCount();
+        sinkCounter.stop();
+
+        super.stop();
+    }
+
+    public class SendCallBackHandler implements SendCallback {
+
+        private final Message message;
+        private final CountDownLatch latch; // counted down exactly once per send, on success or failure
+        private final AtomicInteger errorNum; // number of failed sends in the current batch
+
+        SendCallBackHandler(Message message, CountDownLatch latch, AtomicInteger errorNum) {
+            this.message = message;
+            this.latch = latch;
+            this.errorNum = errorNum;
+        }
+
+        @Override
+        public void onSuccess(SendResult sendResult) {
+            // One send completed successfully: count the batch latch down by one.
+            latch.countDown();
+
+            if (log.isDebugEnabled()) {
+                try {
+                    log.debug("Sent event,body={},sendResult={}", new String(message.getBody(), "UTF-8"), sendResult);
+                } catch (UnsupportedEncodingException e) {
+                    log.error("Encoding error", e);
+                }
+            }
+        }
+
+        @Override
+        public void onException(Throwable e) {
+            errorNum.incrementAndGet(); // must precede countDown(): process() reads errorNum as soon as the latch opens
+            latch.countDown();
+
+            try {
+                log.error("Message publish failed,body=" + new String(message.getBody(), "UTF-8"), e);
+            } catch (UnsupportedEncodingException e1) {
+                log.error("Encoding error", e);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-flume/rocketmq-flume-sink/src/main/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkConstants.java b/rocketmq-flume/rocketmq-flume-sink/src/main/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkConstants.java
new file mode 100644
index 0000000..eafc771
--- /dev/null
+++ b/rocketmq-flume/rocketmq-flume-sink/src/main/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkConstants.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flume.ng.sink;
+
+/**
+ * Configuration keys and default values read by {@link RocketMQSink#configure}.
+ */
+public class RocketMQSinkConstants {
+
+    public static final String NAME_SERVER_CONFIG = "nameserver"; // mandatory: the sink rejects a missing value
+
+    public static final String TOPIC_CONFIG = "topic";
+    public static final String TOPIC_DEFAULT = "FLUME_TOPIC";
+
+    public static final String TAG_CONFIG = "tag";
+    public static final String TAG_DEFAULT = "FLUME_TAG";
+
+    public static final String PRODUCER_GROUP_CONFIG = "producerGroup";
+    public static final String PRODUCER_GROUP_DEFAULT = "FLUME_PRODUCER_GROUP";
+
+    public static final String BATCH_SIZE_CONFIG = "batchSize";
+    public static final int BATCH_SIZE_DEFAULT = 1; // maximum number of events taken per transaction
+
+    public static final String MAX_PROCESS_TIME_CONFIG = "maxProcessTime";
+    public static final long MAX_PROCESS_TIME_DEFAULT = 1000; // milliseconds (compared against System.currentTimeMillis())
+}
diff --git a/rocketmq-flume/rocketmq-flume-sink/src/test/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkTest.java b/rocketmq-flume/rocketmq-flume-sink/src/test/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkTest.java
new file mode 100644
index 0000000..2497dfe
--- /dev/null
+++ b/rocketmq-flume/rocketmq-flume-sink/src/test/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkTest.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flume.ng.sink;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import java.io.UnsupportedEncodingException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import org.apache.commons.lang.time.DateFormatUtils;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.BATCH_SIZE_CONFIG;
+import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.NAME_SERVER_CONFIG;
+import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.TAG_CONFIG;
+import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.TAG_DEFAULT;
+import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.TOPIC_DEFAULT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for {@link RocketMQSink} that run against a name server and broker started in-process.
+ */
+public class RocketMQSinkTest {
+
+    private static final Logger log = org.slf4j.LoggerFactory.getLogger(RocketMQSinkTest.class);
+
+    private static String nameServer = "localhost:9876";
+
+    private static NamesrvController namesrvController;
+    private static BrokerController brokerController;
+
+    private DefaultMQPullConsumer consumer;
+    private String tag = TAG_DEFAULT + "_SINK_TEST_" + new Random().nextInt(99);
+    private String consumerGroup = "CONSUMER_GROUP_SINK_TEST";
+    private int batchSize = 100;
+
+    @BeforeClass
+    public static void startMQ() throws Exception {
+
+        /*
+        start nameserver
+         */
+        startNamesrv();
+
+        /*
+        start broker
+         */
+        startBroker();
+
+        Thread.sleep(2000); // NOTE(review): presumably lets the broker register with the name server; confirm
+    }
+
+    private static void startNamesrv() throws Exception {
+        // Name server listens on 9876, the port used in the nameServer address above.
+        NamesrvConfig namesrvConfig = new NamesrvConfig();
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        nettyServerConfig.setListenPort(9876);
+
+        namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
+        boolean initResult = namesrvController.initialize();
+        if (!initResult) {
+            namesrvController.shutdown();
+            throw new Exception("Failed to initialize name server");
+        }
+        namesrvController.start();
+    }
+
+    private static void startBroker() throws Exception {
+        // Master broker on port 10911, registered with the name server started above.
+        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setNamesrvAddr(nameServer);
+        brokerConfig.setBrokerId(MixAll.MASTER_ID);
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        nettyServerConfig.setListenPort(10911);
+        NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+
+        brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig);
+        boolean initResult = brokerController.initialize();
+        if (!initResult) {
+            brokerController.shutdown();
+            throw new Exception("Failed to initialize broker");
+        }
+        brokerController.start();
+    }
+
+    @Test
+    public void testEvent() throws MQClientException, InterruptedException, EventDeliveryException, RemotingException, MQBrokerException, UnsupportedEncodingException {
+
+        /*
+        start sink
+         */
+        Context context = new Context();
+        context.put(NAME_SERVER_CONFIG, nameServer);
+        context.put(TAG_CONFIG, tag);
+        RocketMQSink sink = new RocketMQSink();
+        Configurables.configure(sink, context);
+        MemoryChannel channel = new MemoryChannel();
+        Configurables.configure(channel, context);
+        sink.setChannel(channel);
+        sink.start();
+
+        /*
+        mock flume source: put one event into the channel, then let the sink drain it
+         */
+        String sendMsg = "\"Hello RocketMQ\"" + "," + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        Event event = EventBuilder.withBody(sendMsg.getBytes("UTF-8"), null);
+        channel.put(event);
+        tx.commit();
+        tx.close();
+        log.info("publish message : {}", sendMsg);
+        Sink.Status status = sink.process();
+        if (status == Sink.Status.BACKOFF) {
+            fail("Error");
+        }
+
+        sink.stop();
+
+        /*
+        consume the message back with a pull consumer
+         */
+        consumer = new DefaultMQPullConsumer(consumerGroup);
+        consumer.setNamesrvAddr(nameServer);
+        consumer.setMessageModel(MessageModel.valueOf("BROADCASTING"));
+        consumer.registerMessageQueueListener(TOPIC_DEFAULT, null);
+        consumer.start();
+
+        String receiveMsg = null;
+        Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(TOPIC_DEFAULT);
+        for (MessageQueue queue : queues) {
+            long offset = getMessageQueueOffset(queue);
+            PullResult pullResult = consumer.pull(queue, tag, offset, 32);
+
+            if (pullResult.getPullStatus() == PullStatus.FOUND) {
+                for (MessageExt message : pullResult.getMsgFoundList()) {
+                    byte[] body = message.getBody();
+                    receiveMsg = new String(body, "UTF-8");
+                    log.info("receive message : {}", receiveMsg);
+                }
+
+                long nextBeginOffset = pullResult.getNextBeginOffset();
+                putMessageQueueOffset(queue, nextBeginOffset);
+            }
+        }
+        /*
+        wait for processQueueTable init
+         */
+        Thread.sleep(1000);
+
+        consumer.shutdown();
+
+        assertEquals(sendMsg, receiveMsg);
+    }
+
+    @Test
+    public void testBatchEvent() throws MQClientException, InterruptedException, EventDeliveryException, RemotingException, MQBrokerException, UnsupportedEncodingException {
+
+        /*
+        start sink
+         */
+        Context context = new Context();
+        context.put(NAME_SERVER_CONFIG, nameServer);
+        context.put(TAG_CONFIG, tag);
+        context.put(BATCH_SIZE_CONFIG, String.valueOf(batchSize));
+        RocketMQSink sink = new RocketMQSink();
+        Configurables.configure(sink, context);
+        MemoryChannel channel = new MemoryChannel();
+        Configurables.configure(channel, context);
+        sink.setChannel(channel);
+        sink.start();
+
+        /*
+        mock flume source: msgs maps each timestamp to its payload and is emptied as messages are received
+         */
+        Map<String, String> msgs = new HashMap<>();
+
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        int sendNum = 0;
+        for (int i = 0; i < batchSize; i++) {
+            String sendMsg = "\"Hello RocketMQ\"" + "," + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss:SSSS");
+            Event event = EventBuilder.withBody(sendMsg.getBytes("UTF-8"), null);
+            channel.put(event);
+            log.info("publish message : {}", sendMsg);
+            String[] sendMsgKv = sendMsg.split(",");
+            msgs.put(sendMsgKv[1], sendMsgKv[0]);
+            sendNum++;
+            Thread.sleep(10); // keeps the millisecond timestamps, used as map keys, distinct
+        }
+        log.info("send message num={}", sendNum);
+
+        tx.commit();
+        tx.close();
+        Sink.Status status = sink.process();
+        if (status == Sink.Status.BACKOFF) {
+            fail("Error");
+        }
+
+        sink.stop();
+
+        /*
+        consume the messages back with a pull consumer
+         */
+        consumer = new DefaultMQPullConsumer(consumerGroup);
+        consumer.setNamesrvAddr(nameServer);
+        consumer.setMessageModel(MessageModel.valueOf("BROADCASTING"));
+        consumer.registerMessageQueueListener(TOPIC_DEFAULT, null);
+        consumer.start();
+
+        int receiveNum = 0;
+        String receiveMsg = null;
+        Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(TOPIC_DEFAULT);
+        for (MessageQueue queue : queues) {
+            long offset = getMessageQueueOffset(queue);
+            PullResult pullResult = consumer.pull(queue, tag, offset, batchSize);
+
+            if (pullResult.getPullStatus() == PullStatus.FOUND) {
+                for (MessageExt message : pullResult.getMsgFoundList()) {
+                    byte[] body = message.getBody();
+                    receiveMsg = new String(body, "UTF-8");
+                    String[] receiveMsgKv = receiveMsg.split(",");
+                    msgs.remove(receiveMsgKv[1]);
+                    log.info("receive message : {}", receiveMsg);
+                    receiveNum++;
+                }
+                long nextBeginOffset = pullResult.getNextBeginOffset();
+                putMessageQueueOffset(queue, nextBeginOffset);
+            }
+        }
+        log.info("receive message num={}", receiveNum);
+
+        /*
+        wait for processQueueTable init
+         */
+        Thread.sleep(1000);
+
+        consumer.shutdown();
+
+        assertEquals(0, msgs.size());
+    }
+
+    @Test
+    public void testNullEvent() throws MQClientException, InterruptedException, EventDeliveryException, RemotingException, MQBrokerException, UnsupportedEncodingException {
+
+        /*
+        start sink on an empty channel: process() must report BACKOFF
+         */
+        Context context = new Context();
+        context.put(NAME_SERVER_CONFIG, nameServer);
+        context.put(TAG_CONFIG, tag);
+        RocketMQSink sink = new RocketMQSink();
+        Configurables.configure(sink, context);
+        MemoryChannel channel = new MemoryChannel();
+        Configurables.configure(channel, context);
+        sink.setChannel(channel);
+        sink.start();
+
+        Sink.Status status = sink.process();
+
+        assertEquals(Sink.Status.BACKOFF, status);
+
+        sink.stop();
+    }
+
+    private long getMessageQueueOffset(MessageQueue queue) throws MQClientException {
+        // A negative offset from fetchConsumeOffset is replaced by 0, i.e. the start of the queue.
+        long offset = consumer.fetchConsumeOffset(queue, false);
+        if (offset < 0) {
+            offset = 0;
+        }
+
+        return offset;
+    }
+
+    private void putMessageQueueOffset(MessageQueue queue, long offset) throws MQClientException {
+        consumer.updateConsumeOffset(queue, offset);
+    }
+
+    @AfterClass
+    public static void stop() {
+        // Shut down in reverse start order: broker first, then name server.
+        if (brokerController != null) {
+            brokerController.shutdown();
+        }
+
+        if (namesrvController != null) {
+            namesrvController.shutdown();
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/rocketmq-flume/rocketmq-flume-sink/src/test/resources/log4j.properties b/rocketmq-flume/rocketmq-flume-sink/src/test/resources/log4j.properties
new file mode 100644
index 0000000..dd15190
--- /dev/null
+++ b/rocketmq-flume/rocketmq-flume-sink/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+log4j.rootLogger = DEBUG, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
+
+log4j.logger.org.apache.rocketmq = DEBUG
diff --git a/rocketmq-flume/rocketmq-flume-souce/pom.xml b/rocketmq-flume/rocketmq-flume-souce/pom.xml
new file mode 100644
index 0000000..6063bb4
--- /dev/null
+++ b/rocketmq-flume/rocketmq-flume-souce/pom.xml
@@ -0,0 +1,27 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache</groupId>
+        <artifactId>rocketmq-flume</artifactId>
+        <version>1.0.0</version>
+    </parent>
+    <artifactId>rocketmq-flume-souce</artifactId>
+</project>
diff --git a/rocketmq-flume/rocketmq-flume-souce/src/main/java/org/apache/rocketmq/flume/ng/souce/RocketMQSource.java b/rocketmq-flume/rocketmq-flume-souce/src/main/java/org/apache/rocketmq/flume/ng/souce/RocketMQSource.java
new file mode 100644
index 0000000..9860c7d
--- /dev/null
+++ b/rocketmq-flume/rocketmq-flume-souce/src/main/java/org/apache/rocketmq/flume/ng/souce/RocketMQSource.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flume.ng.souce;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.source.AbstractPollableSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.BATCH_SIZE_CONFIG;
+import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.BATCH_SIZE_DEFAULT;
+import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.CONSUMER_GROUP_CONFIG;
+import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.CONSUMER_GROUP_DEFAULT;
+import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.HEADER_TAG_NAME;
+import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.HEADER_TOPIC_NAME;
+import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.MESSAGE_MODEL_CONFIG;
+import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.MESSAGE_MODEL_DEFAULT;
+import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.NAME_SERVER_CONFIG;
+import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.TAG_CONFIG;
+import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.TAG_DEFAULT;
+import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.TOPIC_CONFIG;
+import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.TOPIC_DEFAULT;
+
+/**
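+ * Flume pollable source that pulls messages from a RocketMQ topic and hands them to the channel processor as events.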
+ */
+public class RocketMQSource extends AbstractPollableSource implements Configurable {
+
+    private static final Logger log = LoggerFactory.getLogger(RocketMQSource.class);
+
+    private String nameServer;
+    private String topic;
+    private String tag;
+    private String consumerGroup;
+    private String messageModel;
+    private Integer batchSize;
+
+    /** Monitoring counter. */
+    private SourceCounter sourceCounter;
+
+    DefaultMQPullConsumer consumer;
+
+    @Override protected void doConfigure(Context context) throws FlumeException {
+        // The name server address has no default; configuration fails without it.
+        nameServer = context.getString(NAME_SERVER_CONFIG);
+        if (nameServer == null) {
+            throw new ConfigurationException("NameServer must not be null");
+        }
+        // All remaining settings fall back to the defaults from RocketMQSourceConstants.
+        topic = context.getString(TOPIC_CONFIG, TOPIC_DEFAULT);
+        tag = context.getString(TAG_CONFIG, TAG_DEFAULT);
+        consumerGroup = context.getString(CONSUMER_GROUP_CONFIG, CONSUMER_GROUP_DEFAULT);
+        messageModel = context.getString(MESSAGE_MODEL_CONFIG, MESSAGE_MODEL_DEFAULT);
+        batchSize = context.getInteger(BATCH_SIZE_CONFIG, BATCH_SIZE_DEFAULT);
+        // Create the counter once; a later doConfigure call keeps the existing instance.
+        if (sourceCounter == null) {
+            sourceCounter = new SourceCounter(getName());
+        }
+    }
+
+    @Override
+    protected void doStart() throws FlumeException {
+        // Build the pull consumer from the values read in doConfigure.
+        consumer = new DefaultMQPullConsumer(consumerGroup);
+        consumer.setNamesrvAddr(nameServer);
+        consumer.setMessageModel(MessageModel.valueOf(messageModel));
+        consumer.registerMessageQueueListener(topic, null);
+
+        try {
+            consumer.start();
+        } catch (MQClientException e) {
+            log.error("RocketMQ consumer start failed", e);
+            throw new FlumeException("Failed to start RocketMQ consumer", e);
+        }
+        // Reached only when the consumer started; a failed start throws above.
+        sourceCounter.start();
+    }
+
+    /**
+     * Pulls one batch per queue of the topic, hands all found messages to the channel processor
+     * and then stores the next offsets. Returns READY on progress, BACKOFF when idle or on error.
+     */
+    @Override
+    protected Status doProcess() throws EventDeliveryException {
+        List<Event> events = new ArrayList<>();
+        Map<MessageQueue, Long> offsets = new HashMap<>();
+        boolean progressed = false;
+        try {
+            Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(topic);
+            for (MessageQueue queue : queues) {
+                long offset = getMessageQueueOffset(queue);
+                PullResult pullResult = consumer.pull(queue, tag, offset, batchSize);
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Pull from queueId:{}, offset:{}, pullResult:{}", queue.getQueueId(), offset, pullResult);
+                }
+
+                if (pullResult.getPullStatus() == PullStatus.FOUND) {
+                    for (MessageExt msg : pullResult.getMsgFoundList()) {
+                        byte[] body = msg.getBody();
+                        Map<String, String> headers = new HashMap<>();
+                        headers.put(HEADER_TOPIC_NAME, topic);
+                        headers.put(HEADER_TAG_NAME, tag);
+                        if (log.isDebugEnabled()) {
+                            log.debug("Processing message,body={}", new String(body, "UTF-8"));
+                        }
+                        events.add(EventBuilder.withBody(body, headers));
+                    }
+                }
+                // Record the next offset for every pull status, not only FOUND: otherwise a
+                // NO_MATCHED_MSG or OFFSET_ILLEGAL result is re-pulled from the same offset forever.
+                long next = pullResult.getNextBeginOffset();
+                offsets.put(queue, next);
+                progressed |= next != offset;
+            }
+
+            if (!events.isEmpty()) {
+                sourceCounter.incrementAppendBatchReceivedCount();
+                sourceCounter.addToEventReceivedCount(events.size());
+
+                getChannelProcessor().processEventBatch(events);
+
+                sourceCounter.incrementAppendBatchAcceptedCount();
+                sourceCounter.addToEventAcceptedCount(events.size());
+            }
+
+            // Offsets are stored only after delivery, so a channel failure leaves them untouched.
+            for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
+                putMessageQueueOffset(entry.getKey(), entry.getValue());
+            }
+        } catch (Exception e) {
+            log.error("Failed to consumer message", e);
+            return Status.BACKOFF;
+        }
+        // Nothing moved: let the Flume runner back off instead of polling the broker in a tight loop.
+        return progressed ? Status.READY : Status.BACKOFF;
+    }
+
+    @Override
+    protected void doStop() throws FlumeException {
+        sourceCounter.stop();
+        // Release the pull consumer created in doStart.
+        consumer.shutdown();
+    }
+
+    /**
+     * Looks up the consume offset of the queue via the pull consumer.
+     * Negative results are replaced by 0.
+     */
+    private long getMessageQueueOffset(MessageQueue queue) throws MQClientException {
+        long stored = consumer.fetchConsumeOffset(queue, false);
+        return Math.max(stored, 0L);
+    }
+
+    private void putMessageQueueOffset(MessageQueue queue, long offset) throws MQClientException {
+        consumer.updateConsumeOffset(queue, offset); // hand the next offset for this queue to the pull consumer
+    }
+}
diff --git a/rocketmq-flume/rocketmq-flume-souce/src/main/java/org/apache/rocketmq/flume/ng/souce/RocketMQSourceConstants.java b/rocketmq-flume/rocketmq-flume-souce/src/main/java/org/apache/rocketmq/flume/ng/souce/RocketMQSourceConstants.java
new file mode 100644
index 0000000..48a7b02
--- /dev/null
+++ b/rocketmq-flume/rocketmq-flume-souce/src/main/java/org/apache/rocketmq/flume/ng/souce/RocketMQSourceConstants.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flume.ng.souce;
+
+/**
+ * Configuration keys, their default values and event header names used by RocketMQSource.
+ */
+public class RocketMQSourceConstants {
+    // Name server address key; no default is defined for it.
+    public static final String NAME_SERVER_CONFIG = "nameserver";
+    // Topic to pull from: key and default.
+    public static final String TOPIC_CONFIG = "topic";
+    public static final String TOPIC_DEFAULT = "FLUME_TOPIC";
+    // Tag expression passed to the pull call: key and default.
+    public static final String TAG_CONFIG = "tag";
+    public static final String TAG_DEFAULT = "FLUME_TAG";
+    // Consumer group name: key and default.
+    public static final String CONSUMER_GROUP_CONFIG = "consumerGroup";
+    public static final String CONSUMER_GROUP_DEFAULT = "FLUME_CONSUMER_GROUP";
+    // Message model: key and default; the value is parsed with MessageModel.valueOf.
+    public static final String MESSAGE_MODEL_CONFIG = "messageModel";
+    public static final String MESSAGE_MODEL_DEFAULT = "BROADCASTING";
+    // Maximum number of messages requested per pull: key and default.
+    public static final String BATCH_SIZE_CONFIG = "batchSize";
+    public static final int BATCH_SIZE_DEFAULT = 32;
+    // Names of the headers put on every event built by the source.
+    public static final String HEADER_TOPIC_NAME = "topic";
+    public static final String HEADER_TAG_NAME = "tag";
+}
diff --git a/rocketmq-flume/rocketmq-flume-souce/src/test/java/org/apache/rocketmq/flume/ng/souce/RocketMQSourceTest.java b/rocketmq-flume/rocketmq-flume-souce/src/test/java/org/apache/rocketmq/flume/ng/souce/RocketMQSourceTest.java
new file mode 100644
index 0000000..eb05c2b
--- /dev/null
+++ b/rocketmq-flume/rocketmq-flume-souce/src/test/java/org/apache/rocketmq/flume/ng/souce/RocketMQSourceTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flume.ng.souce;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.lang.time.DateFormatUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.PollableSource;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.NAME_SERVER_CONFIG;
+import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.TAG_CONFIG;
+import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.TAG_DEFAULT;
+import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.TOPIC_DEFAULT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration test that runs RocketMQSource against a name server and a broker started in-process.
+ */
+public class RocketMQSourceTest {
+
+    private static final Logger log = LoggerFactory.getLogger(RocketMQSourceTest.class);
+
+    private static String nameServer = "localhost:9876";
+
+    private static NamesrvController namesrvController;
+    private static BrokerController brokerController;
+
+    private String tag = TAG_DEFAULT + "_SOURCE_TEST_" + new Random().nextInt(99);
+    private String producerGroup = "PRODUCER_GROUP_SOURCE_TEST";
+
+    @BeforeClass
+    public static void startMQ() throws Exception {
+
+        /*
+        start nameserver (before the broker, which is configured with its address)
+         */
+        startNamesrv();
+
+        /*
+        start broker
+         */
+        startBroker();
+        // NOTE(review): presumably gives the broker time to register with the name server - confirm
+        Thread.sleep(2000);
+    }
+
+    /** Starts an in-process name server on port 9876 (the port used by {@code nameServer}). */
+    private static void startNamesrv() throws Exception {
+        NamesrvConfig namesrvConfig = new NamesrvConfig();
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        nettyServerConfig.setListenPort(9876);
+
+        namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
+        if (!namesrvController.initialize()) {
+            namesrvController.shutdown();
+            // name the failing component so a setup failure is diagnosable from the report
+            throw new Exception("Failed to initialize the name server on port 9876");
+        }
+        namesrvController.start();
+    }
+
+    /** Starts an in-process master broker on port 10911, configured with the {@code nameServer} address. */
+    private static void startBroker() throws Exception {
+        // NOTE(review): pins the remoting version property to the current version; presumably required by the broker - confirm
+        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setNamesrvAddr(nameServer);
+        brokerConfig.setBrokerId(MixAll.MASTER_ID);
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        nettyServerConfig.setListenPort(10911);
+        NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+
+        brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig);
+        if (!brokerController.initialize()) {
+            brokerController.shutdown();
+            throw new Exception("Failed to initialize the broker on port 10911");
+        }
+        brokerController.start();
+    }
+
+    /** Publishes one message, lets the source pull it into a memory channel and compares the event body with it. */
+    @Test
+    public void testEvent() throws EventDeliveryException, MQBrokerException, MQClientException, InterruptedException, UnsupportedEncodingException {
+        // publish test message
+        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
+        producer.setNamesrvAddr(nameServer);
+        // "dd" is day of month and "HH" the 24-hour clock ("DD" would be day of year, "hh" the 12-hour clock)
+        String sendMsg = "\"Hello Flume\"" + "," + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");
+        try {
+            producer.start();
+            Message msg = new Message(TOPIC_DEFAULT, tag, sendMsg.getBytes("UTF-8"));
+            SendResult sendResult = producer.send(msg);
+            log.info("publish message : {}, sendResult:{}", sendMsg, sendResult);
+        } catch (Exception e) {
+            throw new MQClientException("Failed to publish messages", e);
+        } finally {
+            producer.shutdown();
+        }
+
+        // start source
+        Context context = new Context();
+        context.put(NAME_SERVER_CONFIG, nameServer);
+        context.put(TAG_CONFIG, tag);
+        Channel channel = new MemoryChannel();
+        Configurables.configure(channel, context);
+        List<Channel> channels = new ArrayList<>();
+        channels.add(channel);
+        ChannelSelector channelSelector = new ReplicatingChannelSelector();
+        channelSelector.setChannels(channels);
+        ChannelProcessor channelProcessor = new ChannelProcessor(channelSelector);
+        RocketMQSource source = new RocketMQSource();
+        source.setChannelProcessor(channelProcessor);
+        Configurables.configure(source, context);
+        source.start();
+        try {
+            if (source.process() == PollableSource.Status.BACKOFF) {
+                fail("Error");
+            }
+            // wait for processQueueTable init
+            Thread.sleep(1000);
+        } finally {
+            source.stop();
+        }
+
+        // mock flume sink
+        Transaction transaction = channel.getTransaction();
+        transaction.begin();
+        Event event;
+        try {
+            event = channel.take();
+            transaction.commit();
+        } catch (RuntimeException e) {
+            transaction.rollback();
+            throw e;
+        } finally {
+            transaction.close();
+        }
+        if (event == null) {
+            fail("Error");
+        }
+        String receiveMsg = new String(event.getBody(), "UTF-8");
+        log.info("receive message : {}", receiveMsg);
+        assertEquals(sendMsg, receiveMsg);
+    }
+
+    @AfterClass
+    public static void stop() {
+        // Shut down the broker, then the name server; a controller that is still null is skipped.
+        if (brokerController != null) {
+            brokerController.shutdown();
+        }
+
+        if (namesrvController != null) {
+            namesrvController.shutdown();
+        }
+    }
+}
diff --git a/rocketmq-flume/rocketmq-flume-souce/src/test/resources/log4j.properties b/rocketmq-flume/rocketmq-flume-souce/src/test/resources/log4j.properties
new file mode 100644
index 0000000..dd15190
--- /dev/null
+++ b/rocketmq-flume/rocketmq-flume-souce/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+log4j.rootLogger = DEBUG, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
+
+log4j.logger.org.apache.rocketmq = DEBUG
diff --git a/rocketmq-flume/style/copyright/Apache.xml b/rocketmq-flume/style/copyright/Apache.xml
new file mode 100644
index 0000000..7ca71b5
--- /dev/null
+++ b/rocketmq-flume/style/copyright/Apache.xml
@@ -0,0 +1,24 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<component name="CopyrightManager">
+    <copyright>
+        <option name="myName" value="Apache"/>
+        <option name="notice"
+                value="Licensed to the Apache Software Foundation (ASF) under one or more&#10;contributor license agreements.  See the NOTICE file distributed with&#10;this work for additional information regarding copyright ownership.&#10;The ASF licenses this file to You under the Apache License, Version 2.0&#10;(the &quot;License&quot;); you may not use this file except in compliance with&#10;the License.  You may obtain a copy of the License at&#10;&#10;    http://www.apache.org/licenses/LICENSE-2.0&#10;&#10;Unless required by applicable law or agreed to in writing, software&#10;distributed under the License is distributed on an &quot;AS IS&quot; BASIS,&#10;WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.&#10;See the License for the specific language governing permissions and&#10;limitations under the License."/>
+    </copyright>
+</component>
\ No newline at end of file
diff --git a/rocketmq-flume/style/copyright/profiles_settings.xml b/rocketmq-flume/style/copyright/profiles_settings.xml
new file mode 100644
index 0000000..d326b8c
--- /dev/null
+++ b/rocketmq-flume/style/copyright/profiles_settings.xml
@@ -0,0 +1,64 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<component name="CopyrightManager">
+    <settings default="Apache">
+        <module2copyright>
+            <element module="All" copyright="Apache"/>
+        </module2copyright>
+        <LanguageOptions name="GSP">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="HTML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JAVA">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="addBlankAfter" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JSP">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JSPX">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="MXML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="Properties">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="block" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="SPI">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="block" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="XML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="__TEMPLATE__">
+            <option name="separateBefore" value="true"/>
+            <option name="lenBefore" value="1"/>
+        </LanguageOptions>
+    </settings>
+</component>
\ No newline at end of file
diff --git a/rocketmq-flume/style/rmq_checkstyle.xml b/rocketmq-flume/style/rmq_checkstyle.xml
new file mode 100644
index 0000000..776b305
--- /dev/null
+++ b/rocketmq-flume/style/rmq_checkstyle.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<!DOCTYPE module PUBLIC
+    "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+    "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+<!--Refer http://checkstyle.sourceforge.net/reports/google-java-style.html#s2.2-file-encoding -->
+<module name="Checker">
+
+    <property name="localeLanguage" value="en"/>
+
+    <!--To configure the check to report on the first instance in each file-->
+    <module name="FileTabCharacter"/>
+
+    <!-- header -->
+    <module name="RegexpHeader">
+        <property name="header" value="/\*\nLicensed to the Apache Software Foundation*"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="System\.out\.println"/>
+        <property name="message" value="Prohibit invoking System.out.println in source code !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//FIXME"/>
+        <property name="message" value="Recommended fix FIXME task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//TODO"/>
+        <property name="message" value="Recommended fix TODO task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="@alibaba"/>
+        <property name="message" value="Recommended remove @alibaba keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@taobao"/>
+        <property name="message" value="Recommended remove @taobao keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@author"/>
+        <property name="message" value="Recommended remove @author tag in javadoc!"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format"
+                  value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/>
+        <property name="message" value="Not allow chinese character !"/>
+    </module>
+
+    <module name="FileLength">
+        <property name="max" value="3000"/>
+    </module>
+
+    <module name="TreeWalker">
+
+        <module name="UnusedImports">
+            <property name="processJavadoc" value="true"/>
+        </module>
+        <module name="RedundantImport"/>
+
+        <!--<module name="IllegalImport" />-->
+
+        <!--Checks that classes that override equals() also override hashCode()-->
+        <module name="EqualsHashCode"/>
+        <!--Checks for over-complicated boolean expressions. Currently finds code like if (b == true), b || true, !false, etc.-->
+        <module name="SimplifyBooleanExpression"/>
+        <module name="OneStatementPerLine"/>
+        <module name="UnnecessaryParentheses"/>
+        <!--Checks for over-complicated boolean return statements, e.g. "if (valid()) return false; else return true;" instead of "return !valid();"-->
+        <module name="SimplifyBooleanReturn"/>
+
+        <!--Check that the default is after all the cases in a switch statement-->
+        <module name="DefaultComesLast"/>
+        <!--Detects empty statements (standalone ";" semicolon)-->
+        <module name="EmptyStatement"/>
+        <!--Checks that long constants are defined with an upper ell-->
+        <module name="UpperEll"/>
+        <module name="ConstantName">
+            <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
+        </module>
+        <!--Checks that local, non-final variable names conform to a format specified by the format property-->
+        <module name="LocalVariableName"/>
+        <!--Validates identifiers for local, final variables, including catch parameters-->
+        <module name="LocalFinalVariableName"/>
+        <!--Validates identifiers for non-static fields-->
+        <module name="MemberName"/>
+        <!--Validates identifiers for class type parameters-->
+        <module name="ClassTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <!--Validates identifiers for method type parameters-->
+        <module name="MethodTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <module name="PackageName"/>
+        <module name="ParameterName"/>
+        <module name="StaticVariableName"/>
+        <module name="TypeName"/>
+        <!--Checks that there are no import statements that use the * notation-->
+        <module name="AvoidStarImport"/>
+
+        <!--whitespace-->
+        <module name="GenericWhitespace"/>
+        <module name="NoWhitespaceBefore"/>
+        <module name="WhitespaceAfter"/>
+        <module name="NoWhitespaceAfter"/>
+        <module name="WhitespaceAround">
+            <property name="allowEmptyConstructors" value="true"/>
+            <property name="allowEmptyMethods" value="true"/>
+        </module>
+        <module name="Indentation"/>
+        <module name="MethodParamPad"/>
+        <module name="ParenPad"/>
+        <module name="TypecastParenPad"/>
+    </module>
+</module>
diff --git a/rocketmq-flume/style/rmq_codeStyle.xml b/rocketmq-flume/style/rmq_codeStyle.xml
new file mode 100644
index 0000000..c727f67
--- /dev/null
+++ b/rocketmq-flume/style/rmq_codeStyle.xml
@@ -0,0 +1,143 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<code_scheme name="rocketmq"> <!-- Code style scheme named "rocketmq": shared options first, then per-language sections. Option names resemble an IntelliJ IDEA code-style export (TODO confirm). -->
+    <option name="USE_SAME_INDENTS" value="true"/>
+    <option name="IGNORE_SAME_INDENTS_FOR_LANGUAGES" value="true"/>
+    <option name="OTHER_INDENT_OPTIONS"> <!-- indent group: size 4 for indent, continuation and tabs; tab characters and smart tabs are off -->
+        <value>
+            <option name="INDENT_SIZE" value="4"/>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+            <option name="TAB_SIZE" value="4"/>
+            <option name="USE_TAB_CHARACTER" value="false"/>
+            <option name="SMART_TABS" value="false"/>
+            <option name="LABEL_INDENT_SIZE" value="0"/>
+            <option name="LABEL_INDENT_ABSOLUTE" value="false"/>
+            <option name="USE_RELATIVE_INDENTS" value="false"/>
+        </value>
+    </option>
+    <option name="PREFER_LONGER_NAMES" value="false"/>
+    <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/> <!-- presumably a threshold high enough that class imports are never collapsed into on-demand (wildcard) form; verify in the IDE -->
+    <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/> <!-- same threshold value as the class count above; presumably applies to static names (TODO confirm) -->
+    <option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND"> <!-- empty value: no packages are listed here -->
+        <value/>
+    </option>
+    <option name="IMPORT_LAYOUT_TABLE"> <!-- layout entries in order: static="false" packages, an empty line, then static="true" packages -->
+        <value>
+            <package name="" withSubpackages="true" static="false"/>
+            <emptyLine/>
+            <package name="" withSubpackages="true" static="true"/>
+        </value>
+    </option>
+    <option name="JD_ALIGN_PARAM_COMMENTS" value="false"/> <!-- JD_* options: presumably Javadoc formatting switches; verify against the IDE documentation -->
+    <option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false"/>
+    <option name="JD_P_AT_EMPTY_LINES" value="false"/>
+    <option name="JD_KEEP_INVALID_TAGS" value="false"/>
+    <option name="JD_DO_NOT_WRAP_ONE_LINE_COMMENTS" value="true"/>
+    <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+    <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/> <!-- the three KEEP_BLANK_LINES_* options are all set to 1 -->
+    <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+    <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+    <option name="WHILE_ON_NEW_LINE" value="true"/>
+    <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+    <option name="ALIGN_MULTILINE_FOR" value="false"/>
+    <option name="SPACE_AFTER_TYPE_CAST" value="true"/>
+    <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+    <option name="METHOD_PARAMETERS_WRAP" value="1"/> <!-- NOTE(review): *_WRAP options use the numeric code 1; its meaning is not visible in this file, confirm in the IDE -->
+    <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
+    <option name="LABELED_STATEMENT_WRAP" value="1"/>
+    <option name="WRAP_COMMENTS" value="true"/>
+    <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+    <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+    <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+    <JavaCodeStyleSettings>
+        <option name="CLASS_NAMES_IN_JAVADOC" value="3"/> <!-- NOTE(review): meaning of the numeric code 3 is not visible in this file; confirm in the IDE -->
+    </JavaCodeStyleSettings>
+    <XML>
+        <option name="XML_LEGACY_SETTINGS_IMPORTED" value="true"/>
+    </XML>
+    <ADDITIONAL_INDENT_OPTIONS fileType="haml"> <!-- haml files get INDENT_SIZE 2 instead of the 4 used elsewhere in this scheme -->
+        <option name="INDENT_SIZE" value="2"/>
+    </ADDITIONAL_INDENT_OPTIONS>
+    <codeStyleSettings language="Groovy"> <!-- Groovy section: blank-line, alignment and wrap options plus CONTINUATION_INDENT_SIZE 4 -->
+        <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="HOCON"> <!-- HOCON section: only KEEP_BLANK_LINES_BEFORE_RBRACE is set -->
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+    </codeStyleSettings>
+    <codeStyleSettings language="JAVA"> <!-- Java section: each option listed here carries the same value as the top-level option of the same name -->
+        <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="WHILE_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
+        <option name="LABELED_STATEMENT_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="JSON"> <!-- JSON section: only KEEP_BLANK_LINES_IN_CODE is set -->
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+    </codeStyleSettings>
+    <codeStyleSettings language="Scala"> <!-- Scala section: its indentOptions set indent, continuation and tab size to 4 -->
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="WHILE_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="INDENT_SIZE" value="4"/>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+            <option name="TAB_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="XML"> <!-- XML section: sets only CONTINUATION_INDENT_SIZE, to 4 -->
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+</code_scheme>