Optimizing Codebase Style (#19)

* Make PullConsumer be consistent with others files

* Optimizing struct of examples

* Porcessing error gracefully
diff --git a/cmd/producer.go b/cmd/producer.go
deleted file mode 100644
index b67f50d..0000000
--- a/cmd/producer.go
+++ /dev/null
@@ -1,58 +0,0 @@
-package main
-
-import (
-	"flag"
-	"fmt"
-	"github.com/apache/rocketmq-client-go/core"
-)
-
-var (
-	namesrvAddrs string
-	topic        string
-	body         string
-	groupID      string
-	keys         string
-)
-
-func init() {
-	flag.StringVar(&namesrvAddrs, "addr", "", "name server address")
-	flag.StringVar(&topic, "t", "", "topic name")
-	flag.StringVar(&groupID, "group", "", "producer group")
-	flag.StringVar(&body, "body", "", "message body")
-	flag.StringVar(&keys, "keys", "", "message keys")
-
-}
-
-func main() {
-	flag.Parse()
-	if namesrvAddrs == "" {
-		println("empty nameServer address")
-		return
-	}
-
-	if topic == "" {
-		println("empty topic")
-		return
-	}
-
-	if body == "" {
-		println("empty body")
-		return
-	}
-
-	if groupID == "" {
-		println("empty groupID")
-		return
-	}
-
-	cfg := &rocketmq.ProducerConfig{}
-	cfg.GroupID = groupID
-	cfg.NameServer = namesrvAddrs
-
-	producer, _ := rocketmq.NewProducer(cfg)
-	producer.Start()
-	defer producer.Shutdown()
-
-	result := producer.SendMessageSync(&rocketmq.Message{Topic: topic, Body: body, Keys: keys})
-	println(fmt.Sprintf("send message result: %s", result))
-}
diff --git a/core/api.go b/core/api.go
index 1ed6c82..8a6fdd3 100644
--- a/core/api.go
+++ b/core/api.go
@@ -32,7 +32,7 @@
 	LogC             *LogConfig
 }
 
-func (config *ClientConfig) string() string {
+func (config *ClientConfig) String() string {
 	// For security, don't print Credentials.
 	str := ""
 	str = strJoin(str, "GroupId", config.GroupID)
@@ -62,7 +62,7 @@
 }
 
 func (config *ProducerConfig) String() string {
-	str := "ProducerConfig=[" + config.ClientConfig.string()
+	str := "ProducerConfig=[" + config.ClientConfig.String()
 
 	if config.SendMsgTimeout > 0 {
 		str = strJoin(str, "SendMsgTimeout", config.SendMsgTimeout)
@@ -82,13 +82,17 @@
 type Producer interface {
 	baseAPI
 	// SendMessageSync send a message with sync
-	SendMessageSync(msg *Message) SendResult
+	SendMessageSync(msg *Message) (*SendResult, error)
 
 	// SendMessageOrderly send the message orderly
-	SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) SendResult
+	SendMessageOrderly(
+		msg *Message,
+		selector MessageQueueSelector,
+		arg interface{},
+		autoRetryTimes int) (*SendResult, error)
 
 	// SendMessageOneway send a message with oneway
-	SendMessageOneway(msg *Message)
+	SendMessageOneway(msg *Message) error
 }
 
 // NewPushConsumer create a new consumer with config.
@@ -124,7 +128,7 @@
 
 func (config *PushConsumerConfig) String() string {
 	// For security, don't print Credentials.
-	str := "PushConsumerConfig=[" + config.ClientConfig.string()
+	str := "PushConsumerConfig=[" + config.ClientConfig.String()
 
 	if config.ThreadCount > 0 {
 		str = strJoin(str, "ThreadCount", config.ThreadCount)
@@ -148,6 +152,15 @@
 	Subscribe(topic, expression string, consumeFunc func(msg *MessageExt) ConsumeStatus) error
 }
 
+// PullConsumerConfig the configuration for the pull consumer
+type PullConsumerConfig struct {
+	ClientConfig
+}
+
+func (config *PullConsumerConfig) String() string {
+	return "PushConsumerConfig=[" + config.ClientConfig.String() + "]"
+}
+
 // PullConsumer consumer pulling the message
 type PullConsumer interface {
 	baseAPI
@@ -176,7 +189,7 @@
 	Offset int64
 }
 
-func (result SendResult) String() string {
+func (result *SendResult) String() string {
 	return fmt.Sprintf("[status: %s, messageId: %s, offset: %d]", result.Status, result.MsgId, result.Offset)
 }
 
diff --git a/core/api_test.go b/core/api_test.go
index fc507f0..01f360b 100644
--- a/core/api_test.go
+++ b/core/api_test.go
@@ -37,8 +37,8 @@
 	pConfig.CompressLevel = 4
 	pConfig.MaxMessageSize = 1024
 
-	expect := "ProducerConfig=[GroupId: testGroup, NameServer: localhost:9876, NameServerDomain: NameServerDomain, " +
-		"GroupId: testGroup, InstanceName: testProducer, " +
+	expect := "ProducerConfig=[GroupId: testGroup, NameServer: localhost:9876, NameServerDomain: domain1, " +
+		"GroupName: producerGroupName, InstanceName: testProducer, " +
 		"LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576 Level:Debug}, S" +
 		"endMsgTimeout: 30, CompressLevel: 4, MaxMessageSize: 1024, ]"
 	assert.Equal(t, expect, pConfig.String())
diff --git a/core/error.go b/core/error.go
new file mode 100644
index 0000000..b50b4b2
--- /dev/null
+++ b/core/error.go
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package rocketmq
+
+/*
+#include "rocketmq/CCommon.h"
+*/
+import "C"
+import "fmt"
+
+type rmqError int
+
+const (
+	NIL                        = rmqError(C.OK)
+	ErrNullPoint               = rmqError(C.NULL_POINTER)
+	ErrMallocFailed            = rmqError(C.MALLOC_FAILED)
+	ErrProducerStartFailed     = rmqError(C.PRODUCER_START_FAILED)
+	ErrSendSyncFailed          = rmqError(C.PRODUCER_SEND_SYNC_FAILED)
+	ErrSendOnewayFailed        = rmqError(C.PRODUCER_SEND_ONEWAY_FAILED)
+	ErrSendOrderlyFailed       = rmqError(C.PRODUCER_SEND_ORDERLY_FAILED)
+	ErrPushConsumerStartFailed = rmqError(C.PUSHCONSUMER_ERROR_CODE_START)
+	ErrPullConsumerStartFailed = rmqError(C.PULLCONSUMER_START_FAILED)
+	ErrFetchMQFailed           = rmqError(C.PULLCONSUMER_FETCH_MQ_FAILED)
+	ErrFetchMessageFailed      = rmqError(C.PULLCONSUMER_FETCH_MESSAGE_FAILED)
+)
+
+func (e rmqError) Error() string {
+	switch e {
+	case ErrNullPoint:
+		return "null point"
+	case ErrMallocFailed:
+		return "malloc memory failed"
+	case ErrProducerStartFailed:
+		return "start producer failed"
+	case ErrSendSyncFailed:
+		return "send message with sync failed"
+	case ErrSendOrderlyFailed:
+		return "send message with orderly failed"
+	case ErrSendOnewayFailed:
+		return "send message with oneway failed"
+	case ErrPushConsumerStartFailed:
+		return "start push-consumer failed"
+	case ErrPullConsumerStartFailed:
+		return "start pull-consumer failed"
+	case ErrFetchMQFailed:
+		return "fetch MessageQueue failed"
+	case ErrFetchMessageFailed:
+		return "fetch Message failed"
+	default:
+		return fmt.Sprintf("unknow error: %v", int(e))
+	}
+}
diff --git a/core/producer.go b/core/producer.go
index f402589..8ad3ea4 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -33,7 +33,6 @@
 import "C"
 import (
 	"errors"
-	"fmt"
 	log "github.com/sirupsen/logrus"
 	"unsafe"
 )
@@ -81,34 +80,34 @@
 	C.free(unsafe.Pointer(cs))
 
 	if cproduer == nil {
-		return nil, errors.New("create Producer failed, please check cpp logs for details")
+		return nil, errors.New("create Producer failed")
 	}
 
-	var code int
+	var err rmqError
 	if config.NameServer != "" {
 		cs = C.CString(config.NameServer)
-		code = int(C.SetProducerNameServerAddress(cproduer, cs))
+		err = rmqError(C.SetProducerNameServerAddress(cproduer, cs))
 		C.free(unsafe.Pointer(cs))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set NameServerAddress error, code is: %d", code)
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.NameServerDomain != "" {
 		cs = C.CString(config.NameServerDomain)
-		code = int(C.SetProducerNameServerDomain(cproduer, cs))
+		err = rmqError(C.SetProducerNameServerDomain(cproduer, cs))
 		C.free(unsafe.Pointer(cs))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set NameServerDomain error, code is: %d", code)
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.InstanceName != "" {
 		cs = C.CString(config.InstanceName)
-		code = int(C.SetProducerInstanceName(cproduer, cs))
+		err = rmqError(C.SetProducerInstanceName(cproduer, cs))
 		C.free(unsafe.Pointer(cs))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set InstanceName error, code is: %d", code)
+		if err != NIL {
+			return nil, err
 		}
 	}
 
@@ -116,53 +115,53 @@
 		ak := C.CString(config.Credentials.AccessKey)
 		sk := C.CString(config.Credentials.SecretKey)
 		ch := C.CString(config.Credentials.Channel)
-		code = int(C.SetProducerSessionCredentials(cproduer, ak, sk, ch))
+		err = rmqError(C.SetProducerSessionCredentials(cproduer, ak, sk, ch))
 
 		C.free(unsafe.Pointer(ak))
 		C.free(unsafe.Pointer(sk))
 		C.free(unsafe.Pointer(ch))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set Credentials error, code is: %d", code)
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.LogC != nil {
 		cs = C.CString(config.LogC.Path)
-		code = int(C.SetProducerLogPath(cproduer, cs))
+		err = rmqError(C.SetProducerLogPath(cproduer, cs))
 		C.free(unsafe.Pointer(cs))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set LogPath error, code is: %d", code)
+		if err != NIL {
+			return nil, err
 		}
 
-		code = int(C.SetProducerLogFileNumAndSize(cproduer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set FileNumAndSize error, code is: %d", code)
+		err = rmqError(C.SetProducerLogFileNumAndSize(cproduer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+		if err != NIL {
+			return nil, err
 		}
 
-		code = int(C.SetProducerLogLevel(cproduer, C.CLogLevel(config.LogC.Level)))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set LogLevel error, code is: %d", code)
+		err = rmqError(C.SetProducerLogLevel(cproduer, C.CLogLevel(config.LogC.Level)))
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.SendMsgTimeout > 0 {
-		code = int(C.SetProducerSendMsgTimeout(cproduer, C.int(config.SendMsgTimeout)))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set SendMsgTimeout error, code is: %d", code)
+		err = rmqError(C.SetProducerSendMsgTimeout(cproduer, C.int(config.SendMsgTimeout)))
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.CompressLevel > 0 {
-		code = int(C.SetProducerCompressLevel(cproduer, C.int(config.CompressLevel)))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set CompressLevel error, code is: %d", code)
+		err = rmqError(C.SetProducerCompressLevel(cproduer, C.int(config.CompressLevel)))
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.MaxMessageSize > 0 {
-		code = int(C.SetProducerMaxMessageSize(cproduer, C.int(config.MaxMessageSize)))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set MaxMessageSize error, code is: %d", code)
+		err = rmqError(C.SetProducerMaxMessageSize(cproduer, C.int(config.MaxMessageSize)))
+		if err != NIL {
+			return nil, err
 		}
 	}
 
@@ -181,77 +180,84 @@
 
 // Start the producer.
 func (p *defaultProducer) Start() error {
-	code := int(C.StartProducer(p.cproduer))
-	if code != 0 {
-		return fmt.Errorf("start producer error, error code is: %d", code)
+	err := rmqError(C.StartProducer(p.cproduer))
+	if err != NIL {
+		return err
 	}
 	return nil
 }
 
 // Shutdown the producer.
 func (p *defaultProducer) Shutdown() error {
-	code := int(C.ShutdownProducer(p.cproduer))
+	err := rmqError(C.ShutdownProducer(p.cproduer))
 
-	if code != 0 {
-		log.Warnf("shutdown producer error, error code is: %d", code)
+	if err != NIL {
+		return err
 	}
 
-	code = int(int(C.DestroyProducer(p.cproduer)))
-	if code != 0 {
-		log.Warnf("destroy producer error, error code is: %d", code)
+	err = rmqError(int(C.DestroyProducer(p.cproduer)))
+	if err != NIL {
+		return err
 	}
 
-	return nil
+	return err
 }
 
-func (p *defaultProducer) SendMessageSync(msg *Message) SendResult {
+func (p *defaultProducer) SendMessageSync(msg *Message) (*SendResult, error) {
 	cmsg := goMsgToC(msg)
 	defer C.DestroyMessage(cmsg)
 
 	var sr C.struct__SendResult_
-	code := int(C.SendMessageSync(p.cproduer, cmsg, &sr))
+	err := rmqError(C.SendMessageSync(p.cproduer, cmsg, &sr))
 
-	if code != 0 {
-		log.Warnf("send message error, error code is: %d", code)
+	if err != NIL {
+		log.Warnf("send message error, error is: %s", err.Error())
+		return nil, err
 	}
 
-	result := SendResult{}
+	result := &SendResult{}
 	result.Status = SendStatus(sr.sendStatus)
 	result.MsgId = C.GoString(&sr.msgId[0])
 	result.Offset = int64(sr.offset)
-	return result
+	return result, nil
 }
 
-func (p *defaultProducer) SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) SendResult {
+func (p *defaultProducer) SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) (*SendResult, error) {
 	cmsg := goMsgToC(msg)
+	defer C.DestroyMessage(cmsg)
 	key := selectors.put(&messageQueueSelectorWrapper{selector: selector, m: msg, arg: arg})
 
 	var sr C.struct__SendResult_
-	C.SendMessageOrderly(
+	err := rmqError(C.SendMessageOrderly(
 		p.cproduer,
 		cmsg,
 		(C.QueueSelectorCallback)(unsafe.Pointer(C.queueSelectorCallback_cgo)),
 		unsafe.Pointer(&key),
 		C.int(autoRetryTimes),
-		&sr,
-	)
-	C.DestroyMessage(cmsg)
-
-	return SendResult{
+		&sr))
+	
+	if err != NIL {
+		log.Warnf("send message orderly error, error is: %s", err.Error())
+		return nil, err
+	}
+	
+	return &SendResult{
 		Status: SendStatus(sr.sendStatus),
 		MsgId:  C.GoString(&sr.msgId[0]),
 		Offset: int64(sr.offset),
-	}
+	}, nil
 }
 
-func (p *defaultProducer) SendMessageOneway(msg *Message) {
+func (p *defaultProducer) SendMessageOneway(msg *Message) error {
 	cmsg := goMsgToC(msg)
 	defer C.DestroyMessage(cmsg)
 
-	code := int(C.SendMessageOneway(p.cproduer, cmsg))
-	if code != 0 {
-		log.Warnf("send message with oneway error, error code is: %d", code)
-	} else {
-		log.Debugf("Send Message: %s with oneway success.", msg.String())
+	err := rmqError(C.SendMessageOneway(p.cproduer, cmsg))
+	if err != NIL {
+		log.Warnf("send message with oneway error, error is: %s", err.Error())
+		return err
 	}
+	
+	log.Debugf("Send Message: %s with oneway success.", msg.String())
+	return nil
 }
diff --git a/core/pull_consumer.go b/core/pull_consumer.go
index fdf8d76..74e4a0b 100644
--- a/core/pull_consumer.go
+++ b/core/pull_consumer.go
@@ -63,19 +63,14 @@
 	}
 }
 
-// PullConsumerConfig the configuration for the pull consumer
-type PullConsumerConfig struct {
-	ClientConfig
-}
-
-// DefaultPullConsumer default consumer pulling the message
-type DefaultPullConsumer struct {
+// defaultPullConsumer default consumer pulling the message
+type defaultPullConsumer struct {
 	PullConsumerConfig
 	cconsumer *C.struct_CPullConsumer
 	funcsMap  sync.Map
 }
 
-func (c *DefaultPullConsumer) String() string {
+func (c *defaultPullConsumer) String() string {
 	topics := ""
 	c.funcsMap.Range(func(key, value interface{}) bool {
 		topics += key.(string) + ", "
@@ -84,75 +79,105 @@
 	return fmt.Sprintf("[%+v, subcribed topics: [%s]]", c.PullConsumerConfig, topics)
 }
 
-// NewPullConsumer creates one pull consumer
-func NewPullConsumer(conf *PullConsumerConfig) (*DefaultPullConsumer, error) {
-	if conf == nil {
+// NewPullConsumer creates a pull consumer
+func NewPullConsumer(config *PullConsumerConfig) (PullConsumer, error) {
+	if config == nil {
 		return nil, errors.New("config is nil")
 	}
+	if config.GroupID == "" {
+		return nil, errors.New("GroupId is empty")
+	}
 
-	cs := C.CString(conf.GroupID)
+	if config.NameServer == "" && config.NameServerDomain == "" {
+		return nil, errors.New("NameServer and NameServerDomain is empty")
+	}
+
+	cs := C.CString(config.GroupID)
 	cconsumer := C.CreatePullConsumer(cs)
 	C.free(unsafe.Pointer(cs))
+	if cconsumer == nil {
+		return nil, errors.New("create PullConsumer failed")
+	}
 
-	cs = C.CString(conf.NameServer)
-	C.SetPullConsumerNameServerAddress(cconsumer, cs)
-	C.free(unsafe.Pointer(cs))
-
-	log := conf.LogC
-	if log != nil {
-		cs = C.CString(log.Path)
-		if C.SetPullConsumerLogPath(cconsumer, cs) != 0 {
-			return nil, errors.New("new pull consumer error:set log path failed")
-		}
+	var err rmqError
+	if config.NameServer != "" {
+		cs = C.CString(config.NameServer)
+		err = rmqError(C.SetPullConsumerNameServerAddress(cconsumer, cs))
 		C.free(unsafe.Pointer(cs))
-
-		if C.SetPullConsumerLogFileNumAndSize(cconsumer, C.int(log.FileNum), C.long(log.FileSize)) != 0 {
-			return nil, errors.New("new pull consumer error:set log file num and size failed")
-		}
-		if C.SetPullConsumerLogLevel(cconsumer, C.CLogLevel(log.Level)) != 0 {
-			return nil, errors.New("new pull consumer error:set log level failed")
+		if err != NIL {
+			return nil, err
 		}
 	}
 
-	if conf.Credentials != nil {
-		ak := C.CString(conf.Credentials.AccessKey)
-		sk := C.CString(conf.Credentials.SecretKey)
-		ch := C.CString(conf.Credentials.Channel)
-		C.SetPullConsumerSessionCredentials(cconsumer, ak, sk, ch)
+	if config.NameServerDomain != "" {
+		cs = C.CString(config.NameServerDomain)
+		err = rmqError(C.SetPullConsumerNameServerDomain(cconsumer, cs))
+		C.free(unsafe.Pointer(cs))
+		if err != NIL {
+			return nil, err
+		}
+	}
 
+	if config.Credentials != nil {
+		ak := C.CString(config.Credentials.AccessKey)
+		sk := C.CString(config.Credentials.SecretKey)
+		ch := C.CString(config.Credentials.Channel)
+		err = rmqError(C.SetPullConsumerSessionCredentials(cconsumer, ak, sk, ch))
 		C.free(unsafe.Pointer(ak))
 		C.free(unsafe.Pointer(sk))
 		C.free(unsafe.Pointer(ch))
+		if err != NIL {
+			return nil, err
+		}
 	}
 
-	return &DefaultPullConsumer{PullConsumerConfig: *conf, cconsumer: cconsumer}, nil
+	if config.LogC != nil {
+		cs = C.CString(config.LogC.Path)
+		err = rmqError(C.SetPullConsumerLogPath(cconsumer, cs))
+		C.free(unsafe.Pointer(cs))
+		if err != NIL {
+			return nil, err
+		}
+
+		err = rmqError(C.SetPullConsumerLogFileNumAndSize(cconsumer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+		if err != NIL {
+			return nil, err
+		}
+
+		err = rmqError(C.SetPullConsumerLogLevel(cconsumer, C.CLogLevel(config.LogC.Level)))
+		if err != NIL {
+			return nil, err
+		}
+	}
+
+	return &defaultPullConsumer{PullConsumerConfig: *config, cconsumer: cconsumer}, nil
 }
 
-// Start starts the pulling conumser
-func (c *DefaultPullConsumer) Start() error {
-	r := C.StartPullConsumer(c.cconsumer)
-	if r != 0 {
-		return fmt.Errorf("start failed, code:%d", int(r))
+// Start starts the pulling consumer
+func (c *defaultPullConsumer) Start() error {
+	err := rmqError(C.StartPullConsumer(c.cconsumer))
+	if err != NIL {
+		return err
 	}
 	return nil
 }
 
 // Shutdown shutdown the pulling consumer
-func (c *DefaultPullConsumer) Shutdown() error {
-	r := C.ShutdownPullConsumer(c.cconsumer)
-	if r != 0 {
-		return fmt.Errorf("shutdown failed, code:%d", int(r))
+func (c *defaultPullConsumer) Shutdown() error {
+	err := rmqError(C.ShutdownPullConsumer(c.cconsumer))
+	if err != NIL {
+		return err
 	}
 
-	r = C.DestroyPullConsumer(c.cconsumer)
-	if r != 0 {
-		return fmt.Errorf("destory failed, code:%d", int(r))
+	err = rmqError(C.DestroyPullConsumer(c.cconsumer))
+	if err != NIL {
+		return err
 	}
 	return nil
 }
 
 // FetchSubscriptionMessageQueues returns the topic's consume queue
-func (c *DefaultPullConsumer) FetchSubscriptionMessageQueues(topic string) []MessageQueue {
+func (c *defaultPullConsumer) FetchSubscriptionMessageQueues(topic string) []MessageQueue {
 	var (
 		q    *C.struct__CMessageQueue_
 		size C.int
@@ -191,7 +216,7 @@
 }
 
 // Pull pulling the message from the specified message queue
-func (c *DefaultPullConsumer) Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult {
+func (c *defaultPullConsumer) Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult {
 	cmq := C.struct__CMessageQueue_{
 		queueId: C.int(mq.ID),
 	}
diff --git a/core/push_consumer.go b/core/push_consumer.go
index c39dc09..e7ee1d7 100644
--- a/core/push_consumer.go
+++ b/core/push_consumer.go
@@ -92,32 +92,31 @@
 		return nil, errors.New("Create PushConsumer failed")
 	}
 
-	var code int
+	var err rmqError
 	if config.NameServer != "" {
 		cs = C.CString(config.NameServer)
-		code = int(C.SetPushConsumerNameServerAddress(cconsumer, cs))
+		err = rmqError(C.SetPushConsumerNameServerAddress(cconsumer, cs))
 		C.free(unsafe.Pointer(cs))
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set NameServerAddress error, code is: %d", code)
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.NameServerDomain != "" {
 		cs = C.CString(config.NameServerDomain)
-		code = int(C.SetPushConsumerNameServerDomain(cconsumer, cs))
+		err = rmqError(C.SetPushConsumerNameServerDomain(cconsumer, cs))
 		C.free(unsafe.Pointer(cs))
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set NameServerDomain error, code is: %d", code)
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.InstanceName != "" {
 		cs = C.CString(config.InstanceName)
-		code = int(C.SetPushConsumerInstanceName(cconsumer, cs))
+		err = rmqError(C.SetPushConsumerInstanceName(cconsumer, cs))
 		C.free(unsafe.Pointer(cs))
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set InstanceName error, code is: %d, "+
-				"please check cpp logs for details", code)
+		if err != NIL {
+			return nil, err
 		}
 	}
 
@@ -125,45 +124,45 @@
 		ak := C.CString(config.Credentials.AccessKey)
 		sk := C.CString(config.Credentials.SecretKey)
 		ch := C.CString(config.Credentials.Channel)
-		code = int(C.SetPushConsumerSessionCredentials(cconsumer, ak, sk, ch))
+		err = rmqError(C.SetPushConsumerSessionCredentials(cconsumer, ak, sk, ch))
 		C.free(unsafe.Pointer(ak))
 		C.free(unsafe.Pointer(sk))
 		C.free(unsafe.Pointer(ch))
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set Credentials error, code is: %d", int(code))
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.LogC != nil {
 		cs = C.CString(config.LogC.Path)
-		code = int(C.SetPushConsumerLogPath(cconsumer, cs))
+		err = rmqError(C.SetPushConsumerLogPath(cconsumer, cs))
 		C.free(unsafe.Pointer(cs))
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set LogPath error, code is: %d", code)
+		if err != NIL {
+			return nil, err
 		}
 
-		code = int(C.SetPushConsumerLogFileNumAndSize(cconsumer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set FileNumAndSize error, code is: %d", code)
+		err = rmqError(C.SetPushConsumerLogFileNumAndSize(cconsumer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+		if err != NIL {
+			return nil, err
 		}
 
-		code = int(C.SetPushConsumerLogLevel(cconsumer, C.CLogLevel(config.LogC.Level)))
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set LogLevel error, code is: %d", code)
+		err = rmqError(C.SetPushConsumerLogLevel(cconsumer, C.CLogLevel(config.LogC.Level)))
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.ThreadCount > 0 {
-		code = int(C.SetPushConsumerThreadCount(cconsumer, C.int(config.ThreadCount)))
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set ThreadCount error, code is: %d", int(code))
+		err = rmqError(C.SetPushConsumerThreadCount(cconsumer, C.int(config.ThreadCount)))
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.MessageBatchMaxSize > 0 {
-		code = int(C.SetPushConsumerMessageBatchMaxSize(cconsumer, C.int(config.MessageBatchMaxSize)))
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set MessageBatchMaxSize error, code is: %d", int(code))
+		err = rmqError(C.SetPushConsumerMessageBatchMaxSize(cconsumer, C.int(config.MessageBatchMaxSize)))
+		if err != NIL {
+			return nil, err
 		}
 	}
 
@@ -175,18 +174,18 @@
 		case Clustering:
 			mode = C.CLUSTERING
 		}
-		code = int(C.SetPushConsumerMessageModel(cconsumer, mode))
+		err = rmqError(C.SetPushConsumerMessageModel(cconsumer, mode))
 
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set ConsumerMessageModel error, code is: %d", int(code))
+		if err != NIL {
+			return nil, err
 		}
 
 	}
 
-	code = int(C.RegisterMessageCallback(cconsumer, (C.MessageCallBack)(unsafe.Pointer(C.callback_cgo))))
+	err = rmqError(C.RegisterMessageCallback(cconsumer, (C.MessageCallBack)(unsafe.Pointer(C.callback_cgo))))
 
-	if code != 0 {
-		return nil, fmt.Errorf("PushConsumer RegisterMessageCallback error, code is: %d", int(code))
+	if err != NIL {
+		return nil, err
 	}
 
 	consumer.cconsumer = cconsumer
@@ -195,23 +194,23 @@
 }
 
 func (c *defaultPushConsumer) Start() error {
-	code := C.StartPushConsumer(c.cconsumer)
-	if code != 0 {
-		return fmt.Errorf("start PushConsumer error, code is: %d", int(code))
+	err := rmqError(C.StartPushConsumer(c.cconsumer))
+	if err != NIL {
+		return err
 	}
 	return nil
 }
 
 func (c *defaultPushConsumer) Shutdown() error {
-	code := C.ShutdownPushConsumer(c.cconsumer)
+	err := rmqError(C.ShutdownPushConsumer(c.cconsumer))
 
-	if code != 0 {
-		log.Warnf("Shutdown PushConsumer error, code is: %d, please check cpp logs for details", code)
+	if err != NIL {
+		return err
 	}
 
-	C.DestroyPushConsumer(c.cconsumer)
-	if code != 0 {
-		log.Warnf("Destroy PushConsumer error, code is: %d, please check cpp logs for details", code)
+	err = rmqError(C.DestroyPushConsumer(c.cconsumer))
+	if err != NIL {
+		return err
 	}
 	return nil
 }
@@ -221,9 +220,9 @@
 	if exist {
 		return nil
 	}
-	code := C.Subscribe(c.cconsumer, C.CString(topic), C.CString(expression))
-	if code != 0 {
-		return fmt.Errorf("subscribe topic: %s failed, error code is: %d", topic, int(code))
+	err := rmqError(C.Subscribe(c.cconsumer, C.CString(topic), C.CString(expression)))
+	if err != NIL {
+		return err
 	}
 	c.funcsMap.Store(topic, consumeFunc)
 	log.Infof("subscribe topic[%s] with expression[%s] successfully.", topic, expression)
diff --git a/examples/main.go b/examples/main.go
new file mode 100644
index 0000000..72a2a68
--- /dev/null
+++ b/examples/main.go
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package main
+
+import (
+	"github.com/apache/rocketmq-client-go/core"
+	"gopkg.in/alecthomas/kingpin.v2"
+	"os"
+)
+
+var (
+	rmq     = kingpin.New("rocketmq", "RocketMQ cmd tools")
+	namesrv = rmq.Flag("namesrv", "NameServer address.").Default("localhost:9876").Short('n').String()
+	topic   = rmq.Flag("topic", "topic name.").Short('t').Required().String()
+	gid     = rmq.Flag("groupId", "group Id").Short('g').Default("testGroup").String()
+	amount  = rmq.Flag("amount", "how many message to produce or consume").Default("64").Short('a').Int()
+
+	produce     = rmq.Command("produce", "send messages to RocketMQ")
+	body        = produce.Flag("body", "message body").Short('b').Required().String()
+	workerCount = produce.Flag("workerCount", "works of send message with orderly").Default("1").Short('w').Int()
+	orderly     = produce.Flag("orderly", "send msg orderly").Short('o').Bool()
+
+	consume = rmq.Command("consume", "consumes message from RocketMQ")
+)
+
+func main() {
+	switch kingpin.MustParse(rmq.Parse(os.Args[1:])) {
+	case produce.FullCommand():
+		pConfig := &rocketmq.ProducerConfig{ClientConfig: rocketmq.ClientConfig{
+			GroupID:    *gid,
+			NameServer: *namesrv,
+			LogC: &rocketmq.LogConfig{
+				Path:     "example",
+				FileSize: 64 * 1 << 10,
+				FileNum:  1,
+				Level:    rocketmq.LogLevelDebug,
+			},
+		}}
+		if *orderly {
+			sendMessageOrderly(pConfig)
+		} else {
+			sendMessage(pConfig)
+		}
+	case consume.FullCommand():
+		cConfig := &rocketmq.PushConsumerConfig{ClientConfig: rocketmq.ClientConfig{
+			GroupID:    *gid,
+			NameServer: *namesrv,
+			LogC: &rocketmq.LogConfig{
+				Path:     "example",
+				FileSize: 64 * 1 << 10,
+				FileNum:  1,
+				Level:    rocketmq.LogLevelInfo,
+			},
+		}}
+
+		ConsumeWithPush(cConfig)
+	}
+}
diff --git a/examples/orderproducer/producer.go b/examples/orderproducer/producer.go
deleted file mode 100644
index f3d70c7..0000000
--- a/examples/orderproducer/producer.go
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package main
-
-import (
-	"flag"
-	"fmt"
-	"sync"
-	"sync/atomic"
-
-	rocketmq "github.com/apache/rocketmq-client-go/core"
-)
-
-type queueSelectorByOrderID struct{}
-
-func (s queueSelectorByOrderID) Select(size int, m *rocketmq.Message, arg interface{}) int {
-	return arg.(int) % size
-}
-
-var (
-	namesrvAddrs string
-	topic        string
-	body         string
-	groupID      string
-	msgCount     int64
-	workerCount  int
-)
-
-func init() {
-	flag.StringVar(&namesrvAddrs, "n", "", "name server address")
-	flag.StringVar(&topic, "t", "", "topic")
-	flag.StringVar(&groupID, "g", "", "group")
-	flag.StringVar(&body, "d", "", "body")
-	flag.Int64Var(&msgCount, "m", 0, "message count")
-	flag.IntVar(&workerCount, "w", 0, "worker count")
-}
-
-type worker struct {
-	p            rocketmq.Producer
-	leftMsgCount *int64
-}
-
-func (w *worker) run() {
-	selector := queueSelectorByOrderID{}
-	for atomic.AddInt64(w.leftMsgCount, -1) >= 0 {
-		r := w.p.SendMessageOrderly(
-			&rocketmq.Message{Topic: topic, Body: body}, selector, 7 /*orderID*/, 3,
-		)
-		fmt.Printf("send result:%+v\n", r)
-	}
-}
-
-// example:
-// ./producer -n "localhost:9876" -t local_test -g local_test -d data -m 100 -w 10
-func main() {
-	flag.Parse()
-
-	if namesrvAddrs == "" {
-		println("empty namesrv address")
-		return
-	}
-
-	if topic == "" {
-		println("empty topic")
-		return
-	}
-
-	if body == "" {
-		println("empty body")
-		return
-	}
-
-	if groupID == "" {
-		println("empty groupID")
-		return
-	}
-
-	if msgCount == 0 {
-		println("zero message count")
-		return
-	}
-
-	if workerCount == 0 {
-		println("zero worker count")
-		return
-	}
-
-	cfg := &rocketmq.ProducerConfig{}
-	cfg.GroupID = groupID
-	cfg.NameServer = namesrvAddrs
-
-	producer, err := rocketmq.NewProducer(cfg)
-	if err != nil {
-		fmt.Println("create Producer failed, error:", err)
-		return
-	}
-
-	producer.Start()
-	defer producer.Shutdown()
-
-	wg := sync.WaitGroup{}
-	wg.Add(workerCount)
-
-	workers := make([]worker, workerCount)
-	for i := range workers {
-		workers[i].p = producer
-		workers[i].leftMsgCount = &msgCount
-	}
-
-	for i := range workers {
-		go func(w *worker) { w.run(); wg.Done() }(&workers[i])
-	}
-
-	wg.Wait()
-}
diff --git a/examples/producer.go b/examples/producer.go
index 33eab07..01e8105 100644
--- a/examples/producer.go
+++ b/examples/producer.go
@@ -22,24 +22,29 @@
 	"github.com/apache/rocketmq-client-go/core"
 )
 
-func main() {
-	cfg := &rocketmq.ProducerConfig{}
-	cfg.GroupID = "testGroup"
-	cfg.NameServer = "localhost:9876"
-	producer, err := rocketmq.NewProducer(cfg)
+func sendMessage(config *rocketmq.ProducerConfig) {
+	producer, err := rocketmq.NewProducer(config)
+
 	if err != nil {
 		fmt.Println("create Producer failed, error:", err)
 		return
 	}
 
-	producer.Start()
+	err = producer.Start()
+	if err != nil {
+		fmt.Println("start producer error", err)
+		return
+	}
 	defer producer.Shutdown()
 
 	fmt.Printf("Producer: %s started... \n", producer)
-	for i := 0; i < 100; i++ {
-		msg := fmt.Sprintf("Hello RocketMQ-%d", i)
-		result := producer.SendMessageSync(&rocketmq.Message{Topic: "test", Body: msg})
-		fmt.Println(fmt.Sprintf("send message: %s result: %s", msg, result))
+	for i := 0; i < *amount; i++ {
+		msg := fmt.Sprintf("%s-%d", *body, i)
+		result, err := producer.SendMessageSync(&rocketmq.Message{Topic: "test", Body: msg})
+		if err != nil {
+			fmt.Println("Error:", err)
+		}
+		fmt.Printf("send message: %s result: %s\n", msg, result)
 	}
 	fmt.Println("shutdown producer.")
 }
diff --git a/examples/producer_orderly.go b/examples/producer_orderly.go
new file mode 100644
index 0000000..9943f5b
--- /dev/null
+++ b/examples/producer_orderly.go
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package main
+
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+
+	"github.com/apache/rocketmq-client-go/core"
+)
+
+type queueSelectorByOrderID struct{}
+
+func (s queueSelectorByOrderID) Select(size int, m *rocketmq.Message, arg interface{}) int {
+	return arg.(int) % size
+}
+
+type worker struct {
+	p            rocketmq.Producer
+	leftMsgCount int64
+}
+
+func (w *worker) run() {
+	selector := queueSelectorByOrderID{}
+	for atomic.AddInt64(&w.leftMsgCount, -1) >= 0 {
+		r, err := w.p.SendMessageOrderly(
+			&rocketmq.Message{Topic: *topic, Body: *body}, selector, 7 /*orderID*/, 3,
+		)
+		if err != nil {
+			println("Send Orderly Error:", err)
+		}
+		fmt.Printf("send orderly result:%+v\n", r)
+	}
+}
+
+func sendMessageOrderly(config *rocketmq.ProducerConfig) {
+	producer, err := rocketmq.NewProducer(config)
+	if err != nil {
+		fmt.Println("create Producer failed, error:", err)
+		return
+	}
+
+	producer.Start()
+	defer producer.Shutdown()
+
+	wg := sync.WaitGroup{}
+	wg.Add(*workerCount)
+
+	workers := make([]worker, *workerCount)
+	for i := range workers {
+		workers[i].p = producer
+		workers[i].leftMsgCount = (int64)(*amount)
+	}
+
+	for i := range workers {
+		go func(w *worker) { w.run(); wg.Done() }(&workers[i])
+	}
+
+	wg.Wait()
+}
diff --git a/examples/pullconsumer/consumer.go b/examples/pull_consumer.go
similarity index 69%
rename from examples/pullconsumer/consumer.go
rename to examples/pull_consumer.go
index d63729e..1b209c0 100644
--- a/examples/pullconsumer/consumer.go
+++ b/examples/pull_consumer.go
@@ -18,55 +18,15 @@
 package main
 
 import (
-	"flag"
 	"fmt"
 	"time"
 
-	rocketmq "github.com/apache/rocketmq-client-go/core"
+	"github.com/apache/rocketmq-client-go/core"
 )
 
-var (
-	namesrvAddrs string
-	topic        string
-	body         string
-	groupID      string
-	msgCount     int64
-	workerCount  int
-)
+func ConsumeWithPull(config *rocketmq.PullConsumerConfig, topic string) {
 
-func init() {
-	flag.StringVar(&namesrvAddrs, "n", "", "name server address")
-	flag.StringVar(&topic, "t", "", "topic")
-	flag.StringVar(&groupID, "g", "", "group")
-}
-
-// ./consumer -n "localhost:9876" -t test -g local_test
-func main() {
-	flag.Parse()
-
-	if namesrvAddrs == "" {
-		fmt.Println("empty namesrv")
-		return
-	}
-
-	if topic == "" {
-		fmt.Println("empty topic")
-		return
-	}
-
-	if groupID == "" {
-		fmt.Println("empty groupID")
-		return
-	}
-
-	cfg := &rocketmq.PullConsumerConfig{}
-	cfg.GroupID = groupID
-	cfg.NameServer = namesrvAddrs
-	cfg.LogC = &rocketmq.LogConfig{
-		Path: "example",
-	}
-
-	consumer, err := rocketmq.NewPullConsumer(cfg)
+	consumer, err := rocketmq.NewPullConsumer(config)
 	if err != nil {
 		fmt.Printf("new pull consumer error:%s\n", err)
 		return
diff --git a/examples/push_consumer.go b/examples/push_consumer.go
index d17559c..723e32f 100644
--- a/examples/push_consumer.go
+++ b/examples/push_consumer.go
@@ -20,31 +20,40 @@
 import (
 	"fmt"
 	"github.com/apache/rocketmq-client-go/core"
-	"time"
+	"sync/atomic"
 )
 
-func main() {
-	fmt.Println("Start Receiving Messages...")
-	cfg := &rocketmq.PushConsumerConfig{
-		ThreadCount:         2,
-		MessageBatchMaxSize: 16,
-	}
-	cfg.GroupID = "testGroupId"
-	cfg.NameServer = "localhost:9876"
-	consumer, err := rocketmq.NewPushConsumer(cfg)
+func ConsumeWithPush(config *rocketmq.PushConsumerConfig) {
+
+	consumer, err := rocketmq.NewPushConsumer(config)
 	if err != nil {
-		fmt.Println("create Consumer failed, error:", err)
+		println("create Consumer failed, error:", err)
 		return
 	}
 
+	ch := make(chan interface{})
+	var count = (int64)(*amount)
 	// MUST subscribe topic before consumer started.
 	consumer.Subscribe("test", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
 		fmt.Printf("A message received: \"%s\" \n", msg.Body)
+		if atomic.AddInt64(&count, -1) <= 0 {
+			ch <- "quit"
+		}
 		return rocketmq.ConsumeSuccess
 	})
 
-	consumer.Start()
-	defer consumer.Shutdown()
+	err = consumer.Start()
+	if err != nil {
+		println("consumer start failed,", err)
+		return
+	}
+
 	fmt.Printf("consumer: %s started...\n", consumer)
-	time.Sleep(10 * time.Minute)
+	<-ch
+	err = consumer.Shutdown()
+	if err != nil {
+		println("consumer shutdown failed")
+		return
+	}
+	println("consumer has shutdown.")
 }