Merge pull request #12 from wenfengwang/master

More configurable fields and more
diff --git a/cmd/producer.go b/cmd/producer.go
new file mode 100644
index 0000000..b67f50d
--- /dev/null
+++ b/cmd/producer.go
@@ -0,0 +1,73 @@
+// Command producer sends a single message to a RocketMQ topic, taking the
+// name server address, topic, producer group, body and keys from flags.
+package main
+
+import (
+	"flag"
+	"fmt"
+
+	"github.com/apache/rocketmq-client-go/core"
+)
+
+// Values of the command-line flags registered in init.
+var (
+	namesrvAddrs string
+	topic        string
+	body         string
+	groupID      string
+	keys         string
+)
+
+// init registers the command-line flags.
+func init() {
+	flag.StringVar(&namesrvAddrs, "addr", "", "name server address")
+	flag.StringVar(&topic, "t", "", "topic name")
+	flag.StringVar(&groupID, "group", "", "producer group")
+	flag.StringVar(&body, "body", "", "message body")
+	flag.StringVar(&keys, "keys", "", "message keys")
+}
+
+// main checks the required flags, creates and starts a producer, and sends
+// one message synchronously, printing the result.
+func main() {
+	flag.Parse()
+	if namesrvAddrs == "" {
+		println("empty nameServer address")
+		return
+	}
+
+	if topic == "" {
+		println("empty topic")
+		return
+	}
+
+	if body == "" {
+		println("empty body")
+		return
+	}
+
+	if groupID == "" {
+		println("empty groupID")
+		return
+	}
+
+	cfg := &rocketmq.ProducerConfig{}
+	cfg.GroupID = groupID
+	cfg.NameServer = namesrvAddrs
+
+	// The producer must not be used when NewProducer reports an error.
+	producer, err := rocketmq.NewProducer(cfg)
+	if err != nil {
+		println("create Producer failed, error:", err.Error())
+		return
+	}
+
+	if err := producer.Start(); err != nil {
+		println("start Producer failed, error:", err.Error())
+		return
+	}
+	defer producer.Shutdown()
+
+	result := producer.SendMessageSync(&rocketmq.Message{Topic: topic, Body: body, Keys: keys})
+	println(fmt.Sprintf("send message result: %s", result))
+}
diff --git a/core/api.go b/core/api.go
index bf38a78..8ad6af0 100644
--- a/core/api.go
+++ b/core/api.go
@@ -22,21 +22,35 @@
 	return GetVersion()
 }
 
+type clientConfig struct {
+	GroupID          string
+	NameServer       string
+	NameServerDomain string
+	GroupName        string
+	InstanceName     string
+	Credentials      *SessionCredentials
+	LogC             *LogConfig
+}
+
 // NewProducer create a new producer with config
-func NewProducer(config *ProducerConfig) Producer {
+func NewProducer(config *ProducerConfig) (Producer, error) {
 	return newDefaultProducer(config)
 }
 
 // ProducerConfig define a producer
 type ProducerConfig struct {
-	GroupID     string
-	NameServer  string
-	Credentials *SessionCredentials
+	clientConfig
+	SendMsgTimeout int
+	CompressLevel  int
+	MaxMessageSize int
 }
 
 func (config *ProducerConfig) String() string {
 	// For security, don't print Credentials default.
-	return fmt.Sprintf("[groupId: %s, nameServer: %s]", config.NameServer, config.GroupID)
+	return fmt.Sprintf("[GroupID: %s, NameServer: %s, NameServerDomain: %s, GroupName: %s, InstanceName: %s, "+
+		"SendMsgTimeout: %d, CompressLevel: %d, MaxMessageSize: %d, ]", config.GroupID, config.NameServer,
+		config.NameServerDomain, config.GroupName, config.InstanceName, config.SendMsgTimeout, config.CompressLevel,
+		config.MaxMessageSize)
 }
 
 type Producer interface {
@@ -52,27 +66,27 @@
 }
 
 // NewPushConsumer create a new consumer with config.
-func NewPushConsumer(config *ConsumerConfig) (PushConsumer, error) {
+func NewPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
 	return newPushConsumer(config)
 }
 
-// ConsumerConfig define a new consumer.
-type ConsumerConfig struct {
-	GroupID             string
-	NameServer          string
-	ConsumerThreadCount int
+// PushConsumerConfig define a new consumer.
+type PushConsumerConfig struct {
+	clientConfig
+	ThreadCount         int
 	MessageBatchMaxSize int
-	//ConsumerInstanceName int
-	Credentials *SessionCredentials
+	Model               MessageModel
 }
 
-func (config *ConsumerConfig) String() string {
-	return fmt.Sprintf("[groupId: %s, nameServer: %s, consumerThreadCount: %d, messageBatchMaxSize: %d]",
-		config.GroupID, config.NameServer, config.ConsumerThreadCount, config.MessageBatchMaxSize)
+func (config *PushConsumerConfig) String() string {
+	return fmt.Sprintf("[GroupID: %s, NameServer: %s, NameServerDomain: %s, InstanceName: %s, "+
+		"ThreadCount: %d, MessageBatchMaxSize: %d, Model: %v ]", config.GroupID, config.NameServer,
+		config.NameServerDomain, config.InstanceName, config.ThreadCount, config.MessageBatchMaxSize, config.Model)
 }
 
 type PushConsumer interface {
 	baseAPI
+
 	// Subscribe a new topic with specify filter expression and consume function.
 	Subscribe(topic, expression string, consumeFunc func(msg *MessageExt) ConsumeStatus) error
 }
@@ -80,8 +94,10 @@
 // PullConsumer consumer pulling the message
 type PullConsumer interface {
 	baseAPI
+
 	// Pull returns the messages from the consume queue by specify the offset and the max number
 	Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult
+
 	// FetchSubscriptionMessageQueues returns the consume queue of the topic
 	FetchSubscriptionMessageQueues(topic string) []MessageQueue
 }
@@ -105,7 +121,6 @@
 
 func (result SendResult) String() string {
 	return fmt.Sprintf("[status: %s, messageId: %s, offset: %d]", result.Status, result.MsgId, result.Offset)
-
 }
 
 type baseAPI interface {
diff --git a/core/cfuns.go b/core/cfuns.go
index 7fe4ffe..5b4cba9 100644
--- a/core/cfuns.go
+++ b/core/cfuns.go
@@ -36,6 +36,7 @@
 	}
 
 	msgExt := cmsgExtToGo(msg)
+	//C.DestroyMessageExt(msg)
 	cfunc, exist := consumer.(*defaultPushConsumer).funcsMap.Load(msgExt.Topic)
 	if !exist {
 		return C.int(ReConsumeLater)
diff --git a/core/log.go b/core/log.go
index e081e4c..5ff244e 100644
--- a/core/log.go
+++ b/core/log.go
@@ -37,6 +37,28 @@
 	LogLevelNum   = LogLevel(C.E_LOG_LEVEL_LEVEL_NUM)
 )
 
+// String returns the name of the log level, or "Unknown" for unrecognised values.
+func (l LogLevel) String() string {
+	switch l {
+	case LogLevelFatal:
+		return "Fatal"
+	case LogLevelError:
+		return "Error"
+	case LogLevelWarn:
+		return "Warn"
+	case LogLevelInfo:
+		return "Info"
+	case LogLevelDebug:
+		return "Debug"
+	case LogLevelTrace:
+		return "Trace"
+	case LogLevelNum:
+		return "Num"
+	default:
+		return "Unknown"
+	}
+}
+
 // LogConfig the log configuration for the pull consumer
 type LogConfig struct {
 	Path     string
diff --git a/core/log_test.go b/core/log_test.go
new file mode 100644
index 0000000..8c4a449
--- /dev/null
+++ b/core/log_test.go
@@ -0,0 +1,28 @@
+package rocketmq
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// TestLogConfig_String checks that LogConfig.String renders every field and
+// prints the level by name.
+func TestLogConfig_String(t *testing.T) {
+	logc := LogConfig{Path: "/log/path1", FileNum: 3, FileSize: 1 << 20}
+	cases := []struct {
+		level LogLevel
+		name  string
+	}{
+		{LogLevelDebug, "Debug"},
+		{LogLevelFatal, "Fatal"},
+		{LogLevelError, "Error"},
+		{LogLevelWarn, "Warn"},
+		{LogLevelInfo, "Info"},
+		{LogLevelTrace, "Trace"},
+	}
+	for _, c := range cases {
+		logc.Level = c.level
+		assert.Equal(t, "{Path:/log/path1 FileNum:3 FileSize:1048576 Level:"+c.name+"}", logc.String())
+	}
+}
diff --git a/core/message.go b/core/message.go
index 98dc6cb..8e75d18 100644
--- a/core/message.go
+++ b/core/message.go
@@ -16,80 +16,105 @@
  */
 package rocketmq
 
-//#cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
-//#include "rocketmq/CMessage.h"
-//#include "rocketmq/CMessageExt.h"
+/*
+#cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
+#include "rocketmq/CMessage.h"
+#include "rocketmq/CMessageExt.h"
+#include <stdlib.h>
+*/
 import "C"
-import "fmt"
+import (
+	"fmt"
+	"unsafe"
+)
 
 type Message struct {
-	Topic string
-	Keys  string
-	// improve: maybe []byte is better.
-	Body string
+	Topic          string
+	Tags           string
+	Keys           string
+	Body           string
+	DelayTimeLevel int
+	Property       map[string]string
 }
 
 func (msg *Message) String() string {
-	return fmt.Sprintf("[topic: %s, keys: %s, body: %s]", msg.Topic, msg.Keys, msg.Body)
+	return fmt.Sprintf("[Topic: %s, Tags: %s, Keys: %s, Body: %s, DelayTimeLevel: %d, Property: %v]",
+		msg.Topic, msg.Tags, msg.Keys, msg.Body, msg.DelayTimeLevel, msg.Property)
+}
+
+func goMsgToC(gomsg *Message) *C.struct_CMessage {
+	cs := C.CString(gomsg.Topic)
+	var cmsg = C.CreateMessage(cs)
+	C.free(unsafe.Pointer(cs))
+
+	cs = C.CString(gomsg.Tags)
+	C.SetMessageTags(cmsg, cs)
+	C.free(unsafe.Pointer(cs))
+
+	cs = C.CString(gomsg.Keys)
+	C.SetMessageKeys(cmsg, cs)
+	C.free(unsafe.Pointer(cs))
+
+	cs = C.CString(gomsg.Body)
+	C.SetMessageBody(cmsg, cs)
+	C.free(unsafe.Pointer(cs))
+
+	C.SetDelayTimeLevel(cmsg, C.int(gomsg.DelayTimeLevel))
+
+	for k, v := range gomsg.Property {
+		key := C.CString(k)
+		value := C.CString(v)
+		C.SetMessageProperty(cmsg, key, value)
+		C.free(unsafe.Pointer(key))
+		C.free(unsafe.Pointer(value))
+	}
+	return cmsg
 }
 
 type MessageExt struct {
 	Message
-	MessageID string
-	Tags      string
+	MessageID                 string
+	QueueId                   int
+	ReconsumeTimes            int
+	StoreSize                 int
+	BornTimestamp             int64
+	StoreTimestamp            int64
+	QueueOffset               int64
+	CommitLogOffset           int64
+	PreparedTransactionOffset int64
+
 	// improve: is there is a method convert c++ map to go variable?
 	cmsgExt *C.struct_CMessageExt
-	//Properties  string
 }
 
 func (msgExt *MessageExt) String() string {
-	return fmt.Sprintf("[messageId: %s, %s, Tags: %s]", msgExt.MessageID, msgExt.Message, msgExt.Tags)
+	return fmt.Sprintf("[MessageId: %s, %s, QueueId: %d, ReconsumeTimes: %d, StoreSize: %d, BornTimestamp: %d, "+
+		"StoreTimestamp: %d, QueueOffset: %d, CommitLogOffset: %d, PreparedTransactionOffset: %d]", msgExt.MessageID,
+		msgExt.Message.String(), msgExt.QueueId, msgExt.ReconsumeTimes, msgExt.StoreSize, msgExt.BornTimestamp,
+		msgExt.StoreTimestamp, msgExt.QueueOffset, msgExt.CommitLogOffset, msgExt.PreparedTransactionOffset)
 }
 
 func (msgExt *MessageExt) GetProperty(key string) string {
 	return C.GoString(C.GetMessageProperty(msgExt.cmsgExt, C.CString(key)))
 }
 
-func cmsgToGo(cmsg *C.struct_CMessage) *Message {
-	defer C.DestroyMessage(cmsg)
-	gomsg := &Message{}
-
-	return gomsg
-}
-
-func goMsgToC(gomsg *Message) *C.struct_CMessage {
-	var cmsg = C.CreateMessage(C.CString(gomsg.Topic))
-
-	// int(C.SetMessageKeys(msg.(*C.struct_CMessage),C.CString(keys)))
-	C.SetMessageKeys(cmsg, C.CString(gomsg.Keys))
-
-	// int(C.SetMessageBody(msg.(*C.struct_CMessage),C.CString(body)))
-	C.SetMessageBody(cmsg, C.CString(gomsg.Body))
-	return cmsg
-}
-
-//
 func cmsgExtToGo(cmsg *C.struct_CMessageExt) *MessageExt {
-	//defer C.DestroyMessageExt(cmsg)
-	gomsg := &MessageExt{}
+	gomsg := &MessageExt{cmsgExt: cmsg}
 
 	gomsg.Topic = C.GoString(C.GetMessageTopic(cmsg))
-	gomsg.Body = C.GoString(C.GetMessageBody(cmsg))
-	gomsg.Keys = C.GoString(C.GetMessageKeys(cmsg))
 	gomsg.Tags = C.GoString(C.GetMessageTags(cmsg))
+	gomsg.Keys = C.GoString(C.GetMessageKeys(cmsg))
+	gomsg.Body = C.GoString(C.GetMessageBody(cmsg))
 	gomsg.MessageID = C.GoString(C.GetMessageId(cmsg))
+	gomsg.DelayTimeLevel = int(C.GetMessageDelayTimeLevel(cmsg))
+	gomsg.QueueId = int(C.GetMessageQueueId(cmsg))
+	gomsg.ReconsumeTimes = int(C.GetMessageReconsumeTimes(cmsg))
+	gomsg.StoreSize = int(C.GetMessageStoreSize(cmsg))
+	gomsg.BornTimestamp = int64(C.GetMessageBornTimestamp(cmsg))
+	gomsg.StoreTimestamp = int64(C.GetMessageStoreTimestamp(cmsg))
+	gomsg.QueueOffset = int64(C.GetMessageQueueOffset(cmsg))
+	gomsg.CommitLogOffset = int64(C.GetMessageCommitLogOffset(cmsg))
+	gomsg.PreparedTransactionOffset = int64(C.GetMessagePreparedTransactionOffset(cmsg))
 
 	return gomsg
 }
-
-//
-//func goMsgExtToC(gomsg *MessageExt) *C.struct_CMessageExt {
-//	var cmsg = C.CreateMessage(C.CString(gomsg.Topic))
-//
-//	// int(C.SetMessageKeys(msg.(*C.struct_CMessage),C.CString(keys)))
-//	C.SetMessageKeys(cmsg, C.CString(gomsg.Keys))
-//
-//	// int(C.SetMessageBody(msg.(*C.struct_CMessage),C.CString(body)))
-//	C.SetMessageBody(cmsg, C.CString(gomsg.Body))
-//	return cmsg
-//}
diff --git a/core/message_test.go b/core/message_test.go
index 1d1f2a8..1df6ade 100644
--- a/core/message_test.go
+++ b/core/message_test.go
@@ -17,12 +17,44 @@
 package rocketmq
 
 import (
+	"github.com/stretchr/testify/assert"
 	"testing"
 )
 
-func TestGetMessageTopic(test *testing.T) {
-	//fmt.Println("-----TestGetMessageTopic Start----")
-	//msg := rocketmq.CreateMessage("testTopic")
-	//rocketmq.DestroyMessage(msg)
-	//fmt.Println("-----TestGetMessageTopic Finish----")
+func TestMessage_String(t *testing.T) {
+	msg := Message{
+		Topic:          "testTopic",
+		Tags:           "TagA, TagB",
+		Keys:           "Key1, Key2",
+		Body:           "Body1234567890",
+		DelayTimeLevel: 8}
+	expect := "[Topic: testTopic, Tags: TagA, TagB, Keys: Key1, Key2, Body: Body1234567890, DelayTimeLevel: 8," +
+		" Property: map[]]"
+	assert.Equal(t, expect, msg.String())
+}
+
+func TestMessageExt_String(t *testing.T) {
+	msg := Message{
+		Topic:          "testTopic",
+		Tags:           "TagA, TagB",
+		Keys:           "Key1, Key2",
+		Body:           "Body1234567890",
+		DelayTimeLevel: 8}
+	msgExt := MessageExt{
+		Message:                   msg,
+		MessageID:                 "messageId",
+		QueueId:                   2,
+		ReconsumeTimes:            13,
+		StoreSize:                 1 << 10,
+		BornTimestamp:             int64(1234567890897),
+		StoreTimestamp:            int64(1234567890),
+		QueueOffset:               int64(1234567890),
+		CommitLogOffset:           int64(1234567890),
+		PreparedTransactionOffset: int64(1234567890),
+	}
+	expect := "[MessageId: messageId, [Topic: testTopic, Tags: TagA, TagB, Keys: Key1, Key2, " +
+		"Body: Body1234567890, DelayTimeLevel: 8, Property: map[]], QueueId: 2, ReconsumeTimes: " +
+		"13, StoreSize: 1024, BornTimestamp: 1234567890897, StoreTimestamp: 1234567890, QueueOffset: 1234567890," +
+		" CommitLogOffset: 1234567890, PreparedTransactionOffset: 1234567890]"
+	assert.Equal(t, expect, msgExt.String())
 }
diff --git a/core/producer.go b/core/producer.go
index 3cccf21..77ed63b 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -20,6 +20,7 @@
 #cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
 
 #include <stdio.h>
+#include <stdlib.h>
 #include "rocketmq/CMessage.h"
 #include "rocketmq/CProducer.h"
 #include "rocketmq/CSendResult.h"
@@ -31,7 +32,9 @@
 */
 import "C"
 import (
+	"errors"
 	"fmt"
+	log "github.com/sirupsen/logrus"
 	"unsafe"
 )
 
@@ -59,21 +62,115 @@
 	}
 }
 
-func newDefaultProducer(config *ProducerConfig) *defaultProducer {
-	producer := &defaultProducer{config: config}
-	producer.cproduer = C.CreateProducer(C.CString(config.GroupID))
-	code := int(C.SetProducerNameServerAddress(producer.cproduer, C.CString(producer.config.NameServer)))
-	if config.Credentials != nil {
-		ret := C.SetProducerSessionCredentials(producer.cproduer,
-			C.CString(config.Credentials.AccessKey),
-			C.CString(config.Credentials.SecretKey),
-			C.CString(config.Credentials.Channel))
-		code = int(ret)
+// newDefaultProducer validates config and creates the configured native producer, destroying it if a setter fails.
+func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
+	if config == nil {
+		return nil, errors.New("config is nil")
 	}
-	switch code {
 
+	if config.GroupID == "" {
+		return nil, errors.New("GroupId is empty")
 	}
-	return producer
+
+	if config.NameServer == "" && config.NameServerDomain == "" {
+		return nil, errors.New("NameServer and NameServerDomain is empty")
+	}
+
+	cs := C.CString(config.GroupID)
+	cproduer := C.CreateProducer(cs)
+	C.free(unsafe.Pointer(cs))
+	if cproduer == nil {
+		return nil, errors.New("create Producer failed, please check cpp logs for details")
+	}
+
+	// fail destroys the native producer so a failed setter does not leak it.
+	fail := func(format string, args ...interface{}) (*defaultProducer, error) {
+		C.DestroyProducer(cproduer)
+		return nil, fmt.Errorf(format, args...)
+	}
+
+	var code int
+	if config.NameServer != "" {
+		cs = C.CString(config.NameServer)
+		code = int(C.SetProducerNameServerAddress(cproduer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return fail("Producer Set NameServerAddress error, code is: %d, please check cpp logs for details", code)
+		}
+	}
+
+	if config.NameServerDomain != "" {
+		cs = C.CString(config.NameServerDomain)
+		code = int(C.SetProducerNameServerDomain(cproduer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return fail("Producer Set NameServerDomain error, code is: %d, please check cpp logs for details", code)
+		}
+	}
+
+	if config.InstanceName != "" {
+		cs = C.CString(config.InstanceName)
+		code = int(C.SetProducerInstanceName(cproduer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return fail("Producer Set InstanceName error, code is: %d, please check cpp logs for details", code)
+		}
+	}
+
+	if config.Credentials != nil {
+		ak := C.CString(config.Credentials.AccessKey)
+		sk := C.CString(config.Credentials.SecretKey)
+		ch := C.CString(config.Credentials.Channel)
+		code = int(C.SetProducerSessionCredentials(cproduer, ak, sk, ch))
+		C.free(unsafe.Pointer(ak))
+		C.free(unsafe.Pointer(sk))
+		C.free(unsafe.Pointer(ch))
+		if code != 0 {
+			return fail("Producer Set Credentials error, code is: %d", code)
+		}
+	}
+
+	if config.LogC != nil {
+		cs = C.CString(config.LogC.Path)
+		code = int(C.SetProducerLogPath(cproduer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return fail("Producer Set LogPath error, code is: %d", code)
+		}
+
+		code = int(C.SetProducerLogFileNumAndSize(cproduer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+		if code != 0 {
+			return fail("Producer Set FileNumAndSize error, code is: %d", code)
+		}
+
+		code = int(C.SetProducerLogLevel(cproduer, C.CLogLevel(config.LogC.Level)))
+		if code != 0 {
+			return fail("Producer Set LogLevel error, code is: %d", code)
+		}
+	}
+
+	if config.SendMsgTimeout > 0 {
+		code = int(C.SetProducerSendMsgTimeout(cproduer, C.int(config.SendMsgTimeout)))
+		if code != 0 {
+			return fail("Producer Set SendMsgTimeout error, code is: %d", code)
+		}
+	}
+
+	if config.CompressLevel > 0 {
+		code = int(C.SetProducerCompressLevel(cproduer, C.int(config.CompressLevel)))
+		if code != 0 {
+			return fail("Producer Set CompressLevel error, code is: %d", code)
+		}
+	}
+
+	if config.MaxMessageSize > 0 {
+		code = int(C.SetProducerMaxMessageSize(cproduer, C.int(config.MaxMessageSize)))
+		if code != 0 {
+			return fail("Producer Set MaxMessageSize error, code is: %d", code)
+		}
+	}
+
+	return &defaultProducer{config: config, cproduer: cproduer}, nil
 }
 
 type defaultProducer struct {
@@ -87,19 +184,26 @@
 
 // Start the producer.
 func (p *defaultProducer) Start() error {
-	err := int(C.StartProducer(p.cproduer))
-	// TODO How to process err code.
-	fmt.Printf("producer start result: %v \n", err)
+	code := int(C.StartProducer(p.cproduer))
+	if code != 0 {
+		return errors.New(fmt.Sprintf("start producer error, error code is: %d", code))
+	}
 	return nil
 }
 
 // Shutdown the producer.
 func (p *defaultProducer) Shutdown() error {
-	defer C.DestroyProducer(p.cproduer)
-	err := C.ShutdownProducer(p.cproduer)
+	code := int(C.ShutdownProducer(p.cproduer))
 
-	// TODO How to process err code.
-	fmt.Printf("shutdown result: %v \n", err)
+	if code != 0 {
+		log.Warnf("shutdown producer error, error code is: %d", code)
+	}
+
+	code = int(C.DestroyProducer(p.cproduer))
+	if code != 0 {
+		log.Warnf("destroy producer error, error code is: %d", code)
+	}
+
 	return nil
 }
 
@@ -108,7 +212,11 @@
 	defer C.DestroyMessage(cmsg)
 
 	var sr C.struct__SendResult_
-	C.SendMessageSync(p.cproduer, cmsg, &sr)
+	code := int(C.SendMessageSync(p.cproduer, cmsg, &sr))
+
+	if code != 0 {
+		log.Warnf("send message error, error code is: %d", code)
+	}
 
 	result := SendResult{}
 	result.Status = SendStatus(sr.sendStatus)
diff --git a/core/pull_consumer.go b/core/pull_consumer.go
index 19a033c..6dcac43 100644
--- a/core/pull_consumer.go
+++ b/core/pull_consumer.go
@@ -65,10 +65,7 @@
 
 // PullConsumerConfig the configuration for the pull consumer
 type PullConsumerConfig struct {
-	GroupID     string
-	NameServer  string
-	Credentials *SessionCredentials
-	Log         *LogConfig
+	clientConfig
 }
 
 // DefaultPullConsumer default consumer pulling the message
@@ -89,6 +86,10 @@
 
 // NewPullConsumer creates one pull consumer
 func NewPullConsumer(conf *PullConsumerConfig) (*DefaultPullConsumer, error) {
+	if conf == nil {
+		return nil, errors.New("config is nil")
+	}
+
 	cs := C.CString(conf.GroupID)
 	cconsumer := C.CreatePullConsumer(cs)
 	C.free(unsafe.Pointer(cs))
@@ -97,7 +98,7 @@
 	C.SetPullConsumerNameServerAddress(cconsumer, cs)
 	C.free(unsafe.Pointer(cs))
 
-	log := conf.Log
+	log := conf.LogC
 	if log != nil {
 		cs = C.CString(log.Path)
 		if C.SetPullConsumerLogPath(cconsumer, cs) != 0 {
@@ -131,21 +132,21 @@
 func (c *DefaultPullConsumer) Start() error {
 	r := C.StartPullConsumer(c.cconsumer)
 	if r != 0 {
-		return fmt.Errorf("start failed, code:%d", r)
+		return fmt.Errorf("start failed, code:%d", int(r))
 	}
 	return nil
 }
 
-// Shutdown shutdown the pulling conumser
+// Shutdown stops the pull consumer and then destroys its native handle.
 func (c *DefaultPullConsumer) Shutdown() error {
 	r := C.ShutdownPullConsumer(c.cconsumer)
 	if r != 0 {
-		return fmt.Errorf("shutdown failed, code:%d", r)
+		return fmt.Errorf("shutdown failed, code:%d", int(r))
 	}
 
 	r = C.DestroyPullConsumer(c.cconsumer)
 	if r != 0 {
-		return fmt.Errorf("destory failed, code:%d", r)
+		return fmt.Errorf("destroy failed, code:%d", int(r))
 	}
 	return nil
 }
diff --git a/core/push_consumer.go b/core/push_consumer.go
index e431d95..5f63b16 100644
--- a/core/push_consumer.go
+++ b/core/push_consumer.go
@@ -18,9 +18,9 @@
 
 /*
 #cgo LDFLAGS: -L/usr/local/lib -lrocketmq
+#include <stdlib.h>
 #include "rocketmq/CMessageExt.h"
 #include "rocketmq/CPushConsumer.h"
-#include "stdio.h"
 
 extern int consumeMessageCallback(CPushConsumer *consumer, CMessageExt *msg);
 
@@ -32,10 +32,14 @@
 
 import (
 	"fmt"
+	"github.com/pkg/errors"
+	"github.com/prometheus/common/log"
 	"sync"
 	"unsafe"
 )
 
+type MessageModel C.CMessageModel
+
 type ConsumeStatus int
 
 const (
@@ -55,7 +59,7 @@
 }
 
 type defaultPushConsumer struct {
-	config    *ConsumerConfig
+	config    *PushConsumerConfig
 	cconsumer *C.struct_CPushConsumer
 	funcsMap  sync.Map
 }
@@ -69,18 +73,112 @@
 	return fmt.Sprintf("[%s, subcribed topics: [%s]]", c.config, topics)
 }
 
-func newPushConsumer(config *ConsumerConfig) (PushConsumer, error) {
+func newPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
+	if config == nil {
+		return nil, errors.New("config is nil")
+	}
+	if config.GroupID == "" {
+		return nil, errors.New("GroupId is empty.")
+	}
+
+	if config.NameServer == "" && config.NameServerDomain == "" {
+		return nil, errors.New("NameServer and NameServerDomain is empty.")
+	}
+
 	consumer := &defaultPushConsumer{config: config}
-	cconsumer := C.CreatePushConsumer(C.CString(config.GroupID))
-	C.SetPushConsumerNameServerAddress(cconsumer, C.CString(config.NameServer))
-	C.SetPushConsumerThreadCount(cconsumer, C.int(config.ConsumerThreadCount))
-	C.SetPushConsumerMessageBatchMaxSize(cconsumer, C.int(config.ConsumerThreadCount))
-	C.RegisterMessageCallback(cconsumer, (C.MessageCallBack)(unsafe.Pointer(C.callback_cgo)))
+	cs := C.CString(config.GroupID)
+	cconsumer := C.CreatePushConsumer(cs)
+	C.free(unsafe.Pointer(cs))
+
+	if cconsumer == nil {
+		return nil, errors.New("Create PushConsumer failed")
+	}
+
+	var code int
+	if config.NameServer != "" {
+		cs = C.CString(config.NameServer)
+		code = int(C.SetPushConsumerNameServerAddress(cconsumer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("PushConsumer Set NameServerAddress error, code is: %d", code))
+		}
+	}
+
+	if config.NameServerDomain != "" {
+		cs = C.CString(config.NameServerDomain)
+		code = int(C.SetPushConsumerNameServerDomain(cconsumer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("PushConsumer Set NameServerDomain error, code is: %d", code))
+		}
+	}
+
+	if config.InstanceName != "" {
+		cs = C.CString(config.InstanceName)
+		code = int(C.SetPushConsumerInstanceName(cconsumer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("PushConsumer Set InstanceName error, code is: %d, "+
+				"please check cpp logs for details", code))
+		}
+	}
+
 	if config.Credentials != nil {
-		C.SetPushConsumerSessionCredentials(cconsumer,
-			C.CString(config.Credentials.AccessKey),
-			C.CString(config.Credentials.SecretKey),
-			C.CString(config.Credentials.Channel))
+		ak := C.CString(config.Credentials.AccessKey)
+		sk := C.CString(config.Credentials.SecretKey)
+		ch := C.CString(config.Credentials.Channel)
+		code = int(C.SetPushConsumerSessionCredentials(cconsumer, ak, sk, ch))
+		C.free(unsafe.Pointer(ak))
+		C.free(unsafe.Pointer(sk))
+		C.free(unsafe.Pointer(ch))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("PushConsumer Set Credentials error, code is: %d", int(code)))
+		}
+	}
+
+	if config.LogC != nil {
+		cs = C.CString(config.LogC.Path)
+		code = int(C.SetPushConsumerLogPath(cconsumer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("PushConsumer Set LogPath error, code is: %d", code))
+		}
+
+		code = int(C.SetPushConsumerLogFileNumAndSize(cconsumer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("PushConsumer Set FileNumAndSize error, code is: %d", code))
+		}
+
+		code = int(C.SetPushConsumerLogLevel(cconsumer, C.CLogLevel(config.LogC.Level)))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("PushConsumer Set LogLevel error, code is: %d", code))
+		}
+	}
+
+	if config.ThreadCount > 0 {
+		code = int(C.SetPushConsumerThreadCount(cconsumer, C.int(config.ThreadCount)))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("PushConsumer Set ThreadCount error, code is: %d", int(code)))
+		}
+	}
+
+	if config.MessageBatchMaxSize > 0 {
+		code = int(C.SetPushConsumerMessageBatchMaxSize(cconsumer, C.int(config.MessageBatchMaxSize)))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("PushConsumer Set MessageBatchMaxSize error, code is: %d", int(code)))
+		}
+	}
+
+	code = int(C.SetPushConsumerMessageModel(cconsumer, (C.CMessageModel)(config.Model)))
+
+	if code != 0 {
+		return nil, errors.New(fmt.Sprintf("PushConsumer Set ConsumerMessageModel error, code is: %d", int(code)))
+	}
+
+	code = int(C.RegisterMessageCallback(cconsumer, (C.MessageCallBack)(unsafe.Pointer(C.callback_cgo))))
+
+	if code != 0 {
+		return nil, errors.New(fmt.Sprintf("PushConsumer RegisterMessageCallback error, code is: %d", int(code)))
 	}
 
 	consumer.cconsumer = cconsumer
@@ -89,13 +187,24 @@
 }
 
 func (c *defaultPushConsumer) Start() error {
-	C.StartPushConsumer(c.cconsumer)
+	code := C.StartPushConsumer(c.cconsumer)
+	if code != 0 {
+		return errors.New(fmt.Sprintf("start PushConsumer error, code is: %d", int(code)))
+	}
 	return nil
 }
 
 func (c *defaultPushConsumer) Shutdown() error {
-	C.ShutdownPushConsumer(c.cconsumer)
+	code := C.ShutdownPushConsumer(c.cconsumer)
+
+	if code != 0 {
+		log.Warnf("Shutdown PushConsumer error, code is: %d, please check cpp logs for details", code)
+	}
+
-	C.DestroyPushConsumer(c.cconsumer)
+	code = C.DestroyPushConsumer(c.cconsumer)
+	if code != 0 {
+		log.Warnf("Destroy PushConsumer error, code is: %d, please check cpp logs for details", code)
+	}
 	return nil
 }
 
@@ -104,9 +213,11 @@
 	if exist {
 		return nil
 	}
-	err := C.Subscribe(c.cconsumer, C.CString(topic), C.CString(expression))
-	fmt.Println("err:", err)
+	code := C.Subscribe(c.cconsumer, C.CString(topic), C.CString(expression))
+	if code != 0 {
+		return errors.New(fmt.Sprintf("subscribe topic: %s failed, error code is: %d", topic, int(code)))
+	}
 	c.funcsMap.Store(topic, consumeFunc)
-	fmt.Printf("subscribe topic[%s] with expression[%s] successfully. \n", topic, expression)
+	log.Infof("subscribe topic[%s] with expression[%s] successfully.", topic, expression)
 	return nil
 }
diff --git a/core/version.go b/core/version.go
index 5700714..4ac86d5 100644
--- a/core/version.go
+++ b/core/version.go
@@ -16,8 +16,8 @@
  */
 package rocketmq
 
-const GO_CLIENT_VERSION = "Go Client V1.0.0, BuildTime:2018.10.30"
+const GoClientVersion = "Go Client V1.0.0, BuildTime:2018.10.30"
 
 func GetVersion() (version string) {
-	return GO_CLIENT_VERSION
+	return GoClientVersion
 }
diff --git a/examples/orderproducer/producer.go b/examples/orderproducer/producer.go
index 11fb864..5559d87 100644
--- a/examples/orderproducer/producer.go
+++ b/examples/orderproducer/producer.go
@@ -83,11 +83,16 @@
 		return
 	}
 
-	producer := rocketmq.NewProducer(&rocketmq.ProducerConfig{
-		GroupID:    groupID,
-		NameServer: namesrvAddrs,
-	})
-  
+	cfg := &rocketmq.ProducerConfig{}
+	cfg.GroupID = groupID
+	cfg.NameServer = namesrvAddrs
+
+	producer, err := rocketmq.NewProducer(cfg)
+	if err != nil {
+		fmt.Println("create Producer failed, error:", err)
+		return
+	}
+
 	producer.Start()
 	defer producer.Shutdown()
 
diff --git a/examples/producer.go b/examples/producer.go
index 98a45b0..56ecee2 100644
--- a/examples/producer.go
+++ b/examples/producer.go
@@ -24,20 +24,23 @@
 )
 
 func main() {
-	SendMessage()
-}
+	cfg := &rocketmq.ProducerConfig{}
+	cfg.GroupID = "testGroup"
+	cfg.NameServer = "localhost:9876"
+	producer, err := rocketmq.NewProducer(cfg)
+	if err != nil {
+		fmt.Println("create Producer failed, error:", err)
+		return
+	}
 
-func SendMessage() {
-	producer := rocketmq.NewProducer(&rocketmq.ProducerConfig{GroupID: "testGroup", NameServer: "localhost:9876"})
 	producer.Start()
 	defer producer.Shutdown()
 
 	fmt.Printf("Producer: %s started... \n", producer)
 	for i := 0; i < 100; i++ {
 		msg := fmt.Sprintf("Hello RocketMQ-%d", i)
 		result := producer.SendMessageSync(&rocketmq.Message{Topic: "test", Body: msg})
 		fmt.Println(fmt.Sprintf("send message: %s result: %s", msg, result))
 	}
 	time.Sleep(10 * time.Second)
-	producer.Shutdown()
 }
diff --git a/examples/pullconsumer/consumer.go b/examples/pullconsumer/consumer.go
index 3643a22..d63729e 100644
--- a/examples/pullconsumer/consumer.go
+++ b/examples/pullconsumer/consumer.go
@@ -59,13 +59,14 @@
 		return
 	}
 
-	consumer, err := rocketmq.NewPullConsumer(&rocketmq.PullConsumerConfig{
-		GroupID:    groupID,
-		NameServer: namesrvAddrs,
-		Log: &rocketmq.LogConfig{
-			Path: "example",
-		},
-	})
+	cfg := &rocketmq.PullConsumerConfig{}
+	cfg.GroupID = groupID
+	cfg.NameServer = namesrvAddrs
+	cfg.LogC = &rocketmq.LogConfig{
+		Path: "example",
+	}
+
+	consumer, err := rocketmq.NewPullConsumer(cfg)
 	if err != nil {
 		fmt.Printf("new pull consumer error:%s\n", err)
 		return
diff --git a/examples/push_consumer.go b/examples/push_consumer.go
index e65613b..d17559c 100644
--- a/examples/push_consumer.go
+++ b/examples/push_consumer.go
@@ -24,13 +24,18 @@
 )
 
 func main() {
-	PushConsumeMessage()
-}
-
-func PushConsumeMessage() {
 	fmt.Println("Start Receiving Messages...")
-	consumer, _ := rocketmq.NewPushConsumer(&rocketmq.ConsumerConfig{GroupID: "testGroupId", NameServer: "localhost:9876",
-		ConsumerThreadCount: 2, MessageBatchMaxSize: 16})
+	cfg := &rocketmq.PushConsumerConfig{
+		ThreadCount:         2,
+		MessageBatchMaxSize: 16,
+	}
+	cfg.GroupID = "testGroupId"
+	cfg.NameServer = "localhost:9876"
+	consumer, err := rocketmq.NewPushConsumer(cfg)
+	if err != nil {
+		fmt.Println("create Consumer failed, error:", err)
+		return
+	}
 
 	// MUST subscribe topic before consumer started.
 	consumer.Subscribe("test", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {