Merge pull request #8 from duhenglucky/sdk_polish

 Add pull consumer interface and polished Admin interface
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
index 702d9a5..46f1e79 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
@@ -16,22 +16,6 @@
  */
 package org.apache.rocketmq.ons.api;
 
-import java.util.Properties;
+public interface Admin extends LifeCycle, Credentials {
 
-
-public interface Admin {
-
-    boolean isStarted();
-
-
-    boolean isClosed();
-
-
-    void start();
-
-
-    void updateCredential(Properties credentialProperties);
-
-
-    void shutdown();
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Consumer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Consumer.java
index a592559..03d8f7c 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Consumer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Consumer.java
@@ -16,14 +16,11 @@
  */
 package org.apache.rocketmq.ons.api;
 
-
-public interface Consumer extends Admin {
+public interface Consumer extends LifeCycle, Credentials {
 
     void subscribe(final String topic, final String subExpression, final MessageListener listener);
 
-
     void subscribe(final String topic, final MessageSelector selector, final MessageListener listener);
 
-
     void unsubscribe(final String topic);
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecuter.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Credentials.java
similarity index 78%
copy from ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecuter.java
copy to ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Credentials.java
index f3fe785..08a90c8 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecuter.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Credentials.java
@@ -14,13 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.ons.api;
+import java.util.Properties;
 
-package org.apache.rocketmq.ons.api.transaction;
-
-import org.apache.rocketmq.ons.api.Message;
-
-
-public interface LocalTransactionExecuter {
-
-    TransactionStatus execute(final Message msg, final Object arg);
+public interface Credentials {
+    void updateCredential(Properties credentialProperties);
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecuter.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/LifeCycle.java
similarity index 79%
copy from ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecuter.java
copy to ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/LifeCycle.java
index f3fe785..811411e 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecuter.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/LifeCycle.java
@@ -14,13 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.ons.api;
+public interface LifeCycle {
+    boolean isStarted();
 
-package org.apache.rocketmq.ons.api.transaction;
+    boolean isClosed();
 
-import org.apache.rocketmq.ons.api.Message;
+    void start();
 
-
-public interface LocalTransactionExecuter {
-
-    TransactionStatus execute(final Message msg, final Object arg);
+    void shutdown();
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Producer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Producer.java
index e45e613..3bd1a0d 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Producer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Producer.java
@@ -16,16 +16,10 @@
  */
 package org.apache.rocketmq.ons.api;
 
+import java.util.Collection;
 import java.util.concurrent.ExecutorService;
 
-public interface Producer extends Admin {
-
-
-    @Override
-    void start();
-
-    @Override
-    void shutdown();
+public interface Producer extends LifeCycle, Credentials {
 
     SendResult send(final Message message);
 
@@ -34,4 +28,8 @@
     void sendAsync(final Message message, final SendCallback sendCallback);
 
     void setCallbackExecutor(final ExecutorService callbackExecutor);
+
+    SendResult send(final Message message, final String shardingKey);
+
+    SendResult send(final Collection<Message> messages);
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
new file mode 100644
index 0000000..d7f60a8
--- /dev/null
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.ons.api;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.time.Duration;
+
+public interface PullConsumer {
+
+    interface TopicPartitionChangeListener {
+        /**
+         * This method will be invoked in the condition of partition numbers changed, These scenarios occur when the
+         * topic is expanded or shrunk.
+         *
+         * @param topicPartitions
+         */
+        void onChanged(Set<TopicPartition> topicPartitions);
+    }
+
+    /**
+     * Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it
+     * does not already have any metadata about the given topic.
+     *
+     * @param topic
+     * @return
+     */
+    Set<TopicPartition> topicPartitions(String topic);
+
+    /**
+     * Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment
+     * and will replace the previous assignment (if there is one).
+     *
+     * If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new
+     * assignment replaces the old one.
+     *
+     * @param topicPartitions
+     */
+    void assign(Collection<TopicPartition> topicPartitions);
+
+    /**
+     * Register a callback for sensing topic metadata changes.
+     *
+     * @param topic
+     * @param callback
+     */
+    void registerTopicPartitionChangedListener(String topic, TopicPartitionChangeListener callback);
+
+    /**
+     * Fetch data for the topics or partitions specified using assign API. It is an error to not have subscribed to any
+     * topics or partitions before polling for data.
+     *
+     * @param timeout
+     * @return
+     */
+    List<Message> poll(Duration timeout);
+
+    /**
+     * Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration)} }. If this API is invoked
+     * for the same message queue more than once, the latest offset will be used on the next poll(). Note that you may
+     * lose data if this API is arbitrarily used in the middle of consumption.
+     *
+     * @param topicPartition
+     * @param offset
+     */
+    void seek(TopicPartition topicPartition, long offset);
+
+    /**
+     * Overrides the fetch offsets with the beginning offset in server that the consumer will use on the next {@link
+     * #poll(Duration)} }.
+     *
+     * @param topicPartition
+     */
+    void seekToBeginning(TopicPartition topicPartition);
+
+    /**
+     * Overrides the fetch offsets with the end offset in server that the consumer will use on the next {@link
+     * #poll(Duration)} }.
+     *
+     * @param topicPartition
+     */
+    void seekToEnd(TopicPartition topicPartition);
+
+    /**
+     * Suspend fetching from the requested message queues. Future calls to {@link #poll(Duration)} will not return any
+     * records from these message queues until they have been resumed using {@link #resume(Collection)}.
+     *
+     * Note that this method does not affect message queue subscription. In particular, it does not cause a group
+     * rebalance.
+     *
+     * @param topicPartitions
+     */
+    void pause(Collection<TopicPartition> topicPartitions);
+
+    /**
+     * Resume specified message queues which have been paused with {@link #pause(Collection)}. New calls to {@link
+     * #poll(Duration)} will return records from these partitions if there are any to be fetched. If the message queues were
+     * not previously paused, this method is a no-op.
+     *
+     * @param topicPartitions
+     */
+    void resume(Collection<TopicPartition> topicPartitions);
+
+    /**
+     * Look up the offsets for the given message queue by timestamp. The returned offset for each message queue is the
+     * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding message
+     * queue.
+     *
+     * @param topicPartition
+     * @param timestamp
+     * @return
+     */
+    Long offsetForTimestamp(TopicPartition topicPartition, Long timestamp);
+
+    /**
+     * Get the last committed offset for the given message queue (whether the commit happened by this process or
+     * another). This offset will be used as the position for the consumer in the event of a failure.
+     *
+     * @param topicPartition
+     * @return
+     */
+    Long committed(TopicPartition topicPartition);
+
+    /**
+     * Sync commit current consumed offset to server.
+     */
+    void commitSync();
+}
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/TopicPartition.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/TopicPartition.java
new file mode 100644
index 0000000..82ff1b1
--- /dev/null
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/TopicPartition.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.ons.api;
+
+public class TopicPartition {
+    private String topic;
+
+    private String partition;
+
+    public TopicPartition(String topic, String partition) {
+        this.topic = topic;
+        this.partition = partition;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getPartition() {
+        return partition;
+    }
+
+    public void setPartition(String partition) {
+        this.partition = partition;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+        result = prime * result + ((partition == null) ? 0 : partition.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        TopicPartition other = (TopicPartition) obj;
+        if (partition == null) {
+            if (other.partition != null) {
+                return false;
+            }
+        } else if (!partition.equals(other.partition)) {
+            return false;
+        }
+        if (topic == null) {
+            if (other.topic != null)
+                return false;
+        } else if (!topic.equals(other.topic)) {
+            return false;
+        }
+        return true;
+    }
+}
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/batch/BatchConsumer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/batch/BatchConsumer.java
index 848a139..d5887bc 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/batch/BatchConsumer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/batch/BatchConsumer.java
@@ -17,9 +17,10 @@
 
 package org.apache.rocketmq.ons.api.batch;
 
-import org.apache.rocketmq.ons.api.Admin;
+import org.apache.rocketmq.ons.api.Credentials;
+import org.apache.rocketmq.ons.api.LifeCycle;
 
-public interface BatchConsumer extends Admin {
+public interface BatchConsumer extends LifeCycle, Credentials {
 
     void subscribe(final String topic, final String subExpression, final BatchMessageListener listener);
 
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java
index 2094158..1036253 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.ons.api.bean;
 
+import java.util.Collection;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import org.apache.rocketmq.ons.api.Message;
@@ -95,4 +96,12 @@
     public boolean isClosed() {
         return this.producer.isClosed();
     }
+
+    @Override public SendResult send(Message message, String shardingKey) {
+        return null;
+    }
+
+    @Override public SendResult send(Collection<Message> messages) {
+        return null;
+    }
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java
index cb55e14..ddf513c 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java
@@ -23,7 +23,7 @@
 import org.apache.rocketmq.ons.api.SendResult;
 import org.apache.rocketmq.ons.api.exception.ONSClientException;
 import org.apache.rocketmq.ons.api.transaction.LocalTransactionChecker;
-import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecuter;
+import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecutor;
 import org.apache.rocketmq.ons.api.transaction.TransactionProducer;
 
 
@@ -63,7 +63,7 @@
     }
 
     @Override
-    public SendResult send(Message message, LocalTransactionExecuter executer, Object arg) {
+    public SendResult send(Message message, LocalTransactionExecutor executer, Object arg) {
         return this.transactionProducer.send(message, executer, arg);
     }
 
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderConsumer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderConsumer.java
index 4e73c1b..09b9127 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderConsumer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderConsumer.java
@@ -17,11 +17,11 @@
 
 package org.apache.rocketmq.ons.api.order;
 
-import org.apache.rocketmq.ons.api.Admin;
+import org.apache.rocketmq.ons.api.Credentials;
+import org.apache.rocketmq.ons.api.LifeCycle;
 import org.apache.rocketmq.ons.api.MessageSelector;
 
-
-public interface OrderConsumer extends Admin {
+public interface OrderConsumer extends LifeCycle, Credentials {
 
     @Override
     void start();
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderProducer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderProducer.java
index 4e9a0bb..5a401fc 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderProducer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderProducer.java
@@ -17,12 +17,16 @@
 
 package org.apache.rocketmq.ons.api.order;
 
-import org.apache.rocketmq.ons.api.Admin;
+import org.apache.rocketmq.ons.api.Credentials;
+import org.apache.rocketmq.ons.api.LifeCycle;
 import org.apache.rocketmq.ons.api.Message;
+import org.apache.rocketmq.ons.api.Producer;
 import org.apache.rocketmq.ons.api.SendResult;
 
-
-public interface OrderProducer extends Admin {
+/**
+ * This interface will be removed in the year 2021, {@link Producer#send(Message, String)} is recommended
+ */
+public interface OrderProducer extends LifeCycle, Credentials {
 
     SendResult send(final Message message, final String shardingKey);
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java
index eb46593..cb3aa0a 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java
@@ -21,4 +21,6 @@
 public interface LocalTransactionChecker {
 
     TransactionStatus check(final Message msg);
+
+
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecuter.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecutor.java
similarity index 88%
rename from ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecuter.java
rename to ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecutor.java
index f3fe785..fad6493 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecuter.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecutor.java
@@ -20,7 +20,7 @@
 import org.apache.rocketmq.ons.api.Message;
 
 
-public interface LocalTransactionExecuter {
+public interface LocalTransactionExecutor {
 
-    TransactionStatus execute(final Message msg, final Object arg);
+    TransactionStatus execute(final Message message, final Object arg);
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/TransactionProducer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/TransactionProducer.java
index c139db1..f9878a4 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/TransactionProducer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/TransactionProducer.java
@@ -17,22 +17,14 @@
 
 package org.apache.rocketmq.ons.api.transaction;
 
-import org.apache.rocketmq.ons.api.Admin;
+import org.apache.rocketmq.ons.api.Credentials;
+import org.apache.rocketmq.ons.api.LifeCycle;
 import org.apache.rocketmq.ons.api.Message;
 import org.apache.rocketmq.ons.api.SendResult;
 
-
-public interface TransactionProducer extends Admin {
-
-    @Override
-    void start();
-
-
-    @Override
-    void shutdown();
-
+public interface TransactionProducer extends LifeCycle, Credentials {
 
     SendResult send(final Message message,
-        final LocalTransactionExecuter executer,
+        final LocalTransactionExecutor executer,
         final Object arg);
 }
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java
index cfbe212..2b6280a 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.ons.api.impl.rocketmq;
 
+import java.util.Collection;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
@@ -219,6 +220,14 @@
         return sendResult;
     }
 
+    @Override public SendResult send(Message message, String shardingKey) {
+        return null;
+    }
+
+    @Override public SendResult send(Collection<Message> messages) {
+        return null;
+    }
+
     private ONSClientException checkProducerException(String topic, String msgId, Throwable e) {
         if (e instanceof MQClientException) {
             if (e.getCause() != null) {
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java
index 13ed7e4..abcf954 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java
@@ -31,7 +31,7 @@
 import org.apache.rocketmq.ons.api.SendResult;
 import org.apache.rocketmq.ons.api.impl.tracehook.OnsClientSendMessageHookImpl;
 import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil;
-import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecuter;
+import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecutor;
 import org.apache.rocketmq.ons.api.transaction.TransactionProducer;
 import org.apache.rocketmq.ons.api.transaction.TransactionStatus;
 import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
@@ -123,7 +123,7 @@
     }
 
     @Override
-    public SendResult send(final Message message, final LocalTransactionExecuter executer, Object arg) {
+    public SendResult send(final Message message, final LocalTransactionExecutor executer, Object arg) {
         this.checkONSProducerServiceState(this.transactionMQProducer.getDefaultMQProducerImpl());
         org.apache.rocketmq.common.message.Message msgRMQ = ONSUtil.msgConvert(message);
         org.apache.rocketmq.client.producer.TransactionSendResult sendResultRMQ = null;
diff --git a/ons-core/pom.xml b/ons-core/pom.xml
index 27e4142..862c623 100644
--- a/ons-core/pom.xml
+++ b/ons-core/pom.xml
@@ -41,14 +41,12 @@
         <maven.jdoc.skip>true</maven.jdoc.skip>
         <downloadSources>true</downloadSources>
         <!-- compiler settings properties -->
-        <java_source_version>1.6</java_source_version>
-        <java_target_version>1.6</java_target_version>
+        <java_source_version>1.8</java_source_version>
+        <java_target_version>1.8</java_target_version>
         <file_encoding>UTF-8</file_encoding>
         <!-- Always use stable version of RocketMQ -->
         <rocketmq.version>4.5.1</rocketmq.version>
         <auth.version>${project.version}</auth.version>
-        <spring.version>4.1.2.RELEASE</spring.version>
-        <diamond.version>3.7.4</diamond.version>
     </properties>
     <build>
         <plugins>
diff --git a/ons-sample/pom.xml b/ons-sample/pom.xml
index 710b0af..667d6fe 100644
--- a/ons-sample/pom.xml
+++ b/ons-sample/pom.xml
@@ -31,12 +31,12 @@
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>ons-api</artifactId>
-            <version>1.0.1-SNAPSHOT</version>
+            <version>${project.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>ons-client</artifactId>
-            <version>1.0.1-SNAPSHOT</version>
+            <version>${project.version}</version>
         </dependency>
     </dependencies>
 
diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
index 4d16d51..992bd3b 100644
--- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
+++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
@@ -23,7 +23,7 @@
 import org.apache.rocketmq.ons.api.PropertyKeyConst;
 import org.apache.rocketmq.ons.api.SendResult;
 import org.apache.rocketmq.ons.api.exception.ONSClientException;
-import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecuter;
+import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecutor;
 import org.apache.rocketmq.ons.api.transaction.TransactionProducer;
 import org.apache.rocketmq.ons.api.transaction.TransactionStatus;
 import org.apache.rocketmq.ons.sample.MQConfig;
@@ -44,12 +44,13 @@
 
         for (int i = 0; i < 10; i++) {
             try {
-                SendResult sendResult = transactionProducer.send(message, new LocalTransactionExecuter() {
+                SendResult sendResult = transactionProducer.send(message, new LocalTransactionExecutor() {
                     @Override
                     public TransactionStatus execute(Message msg, Object arg) {
                         System.out.printf("Execute local transaction and return TransactionStatus. %n");
                         return TransactionStatus.CommitTransaction;
                     }
+
                 }, null);
                 assert sendResult != null;
             } catch (ONSClientException e) {