Implements the pull consumer
diff --git a/core/log.go b/core/log.go
new file mode 100644
index 0000000..d9b87e8
--- /dev/null
+++ b/core/log.go
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package rocketmq
+
+/*
+#include "rocketmq/CCommon.h"
+*/
+import "C"
+
+// LogLevel the log level
+type LogLevel int
+
+// predefined log level
+const (
+	LogLevelFatal = LogLevel(C.E_LOG_LEVEL_FATAL)
+	LogLevelError = LogLevel(C.E_LOG_LEVEL_ERROR)
+	LogLevelWarn  = LogLevel(C.E_LOG_LEVEL_WARN)
+	LogLevelInfo  = LogLevel(C.E_LOG_LEVEL_INFO)
+	LogLevelDebug = LogLevel(C.E_LOG_LEVEL_DEBUG)
+	LogLevelTrace = LogLevel(C.E_LOG_LEVEL_TRACE)
+	LogLevelNum   = LogLevel(C.E_LOG_LEVEL_LEVEL_NUM)
+)
+
+// LogConfig the log configuration for the pull consumer
+type LogConfig struct {
+	Path     string
+	FileNum  int
+	FileSize int64
+	Level    LogLevel
+}
diff --git a/core/pull_consumer.go b/core/pull_consumer.go
index b9c7338..e236ee6 100644
--- a/core/pull_consumer.go
+++ b/core/pull_consumer.go
@@ -14,6 +14,7 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
+
 package rocketmq
 
 /*
@@ -27,18 +28,37 @@
 import "C"
 
 import (
+	"errors"
 	"fmt"
 	"sync"
 	"unsafe"
 )
 
+// PullStatus pull status
+type PullStatus int
+
+// predefined pull status
+const (
+	PullFound         = PullStatus(C.E_FOUND)
+	PullNoNewMsg      = PullStatus(C.E_NO_NEW_MSG)
+	PullNoMatchedMsg  = PullStatus(C.E_NO_MATCHED_MSG)
+	PullOffsetIllegal = PullStatus(C.E_OFFSET_ILLEGAL)
+	PullBrokerTimeout = PullStatus(C.E_BROKER_TIMEOUT)
+)
+
+// PullConsumerConfig the configuration for the pull consumer
+type PullConsumerConfig struct {
+	GroupID     string
+	NameServer  string
+	Credentials *SessionCredentials
+	Log         *LogConfig
+}
+
 // DefaultPullConsumer default consumer pulling the message
 type DefaultPullConsumer struct {
-	config    *ConsumerConfig
+	PullConsumerConfig
 	cconsumer *C.struct_CPullConsumer
 	funcsMap  sync.Map
-
-	resources []*C.char
 }
 
 func (c *DefaultPullConsumer) String() string {
@@ -47,48 +67,141 @@
 		topics += key.(string) + ", "
 		return true
 	})
-	return fmt.Sprintf("[%s, subcribed topics: [%s]]", c.config, topics)
+	return fmt.Sprintf("[%+v, subcribed topics: [%s]]", c.PullConsumerConfig, topics)
+}
+
+// NewPullConsumer creates one pull consumer
+func NewPullConsumer(conf *PullConsumerConfig) (*DefaultPullConsumer, error) {
+	cs := C.CString(conf.GroupID)
+	cconsumer := C.CreatePullConsumer(cs)
+	C.free(unsafe.Pointer(cs))
+
+	cs = C.CString(conf.NameServer)
+	C.SetPullConsumerNameServerAddress(cconsumer, cs)
+	C.free(unsafe.Pointer(cs))
+
+	log := conf.Log
+	if log != nil {
+		cs = C.CString(log.Path)
+		if C.SetPullConsumerLogPath(cconsumer, cs) != 0 {
+			return nil, errors.New("new pull consumer error:set log path failed")
+		}
+		C.free(unsafe.Pointer(cs))
+
+		if C.SetPullConsumerLogFileNumAndSize(cconsumer, C.int(log.FileNum), C.long(log.FileSize)) != 0 {
+			return nil, errors.New("new pull consumer error:set log file num and size failed")
+		}
+		if C.SetPullConsumerLogLevel(cconsumer, C.CLogLevel(log.Level)) != 0 {
+			return nil, errors.New("new pull consumer error:set log level failed")
+		}
+	}
+
+	if conf.Credentials != nil {
+		ak := C.CString(conf.Credentials.AccessKey)
+		sk := C.CString(conf.Credentials.SecretKey)
+		ch := C.CString(conf.Credentials.Channel)
+		C.SetPullConsumerSessionCredentials(cconsumer, ak, sk, ch)
+
+		C.free(unsafe.Pointer(ak))
+		C.free(unsafe.Pointer(sk))
+		C.free(unsafe.Pointer(ch))
+	}
+
+	return &DefaultPullConsumer{PullConsumerConfig: *conf, cconsumer: cconsumer}, nil
 }
 
 // Start starts the pulling conumser
 func (c *DefaultPullConsumer) Start() error {
-	C.StartPullConsumer(c.cconsumer)
+	r := C.StartPullConsumer(c.cconsumer)
+	if r != 0 {
+		return fmt.Errorf("start failed, code:%d", r)
+	}
 	return nil
 }
 
 // Shutdown shutdown the pulling conumser
 func (c *DefaultPullConsumer) Shutdown() error {
-	C.ShutdownPullConsumer(c.cconsumer)
-	C.DestroyPullConsumer(c.cconsumer)
-	for _, r := range c.resources {
-		C.free(unsafe.Pointer(r))
+	r := C.ShutdownPullConsumer(c.cconsumer)
+	if r != 0 {
+		return fmt.Errorf("shutdown failed, code:%d", r)
+	}
+
+	r = C.DestroyPullConsumer(c.cconsumer)
+	if r != 0 {
+		return fmt.Errorf("destory failed, code:%d", r)
 	}
 	return nil
 }
 
+// FetchSubscriptionMessageQueues fetchs the topic's subcripted message queues
 func (c *DefaultPullConsumer) FetchSubscriptionMessageQueues(topic string) []MessageQueue {
+	var (
+		q    *C.struct__CMessageQueue_
+		size C.int
+	)
 
+	ctopic := C.CString(topic)
+	C.FetchSubscriptionMessageQueues(c.cconsumer, ctopic, &q, &size)
+	C.free(unsafe.Pointer(ctopic))
+	if size == 0 {
+		return nil
+	}
+
+	qs := make([]MessageQueue, size)
+	for i := range qs {
+		cq := (*C.struct__CMessageQueue_)(
+			unsafe.Pointer(uintptr(unsafe.Pointer(q)) + uintptr(i)*unsafe.Sizeof(*q)),
+		)
+		qs[i].ID, qs[i].Broker, qs[i].Topic = int(cq.queueId), C.GoString(&cq.brokerName[0]), topic
+	}
+	C.ReleaseSubscriptionMessageQueue(q)
+
+	return qs
 }
 
-/*
+// PullResult the pull result
+type PullResult struct {
+	NextBeginOffset int64
+	MinOffset       int64
+	MaxOffset       int64
+	Status          PullStatus
+	Messages        []*MessageExt
+}
 
-CPullConsumer *CreatePullConsumer(const char *groupId);
-int DestroyPullConsumer(CPullConsumer *consumer);
-int StartPullConsumer(CPullConsumer *consumer);
-int ShutdownPullConsumer(CPullConsumer *consumer);
-int SetPullConsumerGroupID(CPullConsumer *consumer, const char *groupId);
-const char *GetPullConsumerGroupID(CPullConsumer *consumer);
-int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char *namesrv);
-int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char *accessKey, const char *secretKey,
-                                     const char *channel);
-int SetPullConsumerLogPath(CPullConsumer *consumer, const char *logPath);
-int SetPullConsumerLogFileNumAndSize(CPullConsumer *consumer, int fileNum, long fileSize);
-int SetPullConsumerLogLevel(CPullConsumer *consumer, CLogLevel level);
+// Pull pulling the message from the specified message queue
+func (c *DefaultPullConsumer) Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult {
+	cmq := C.struct__CMessageQueue_{
+		queueId: C.int(mq.ID),
+	}
 
-int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, CMessageQueue **mqs , int* size);
-int ReleaseSubscriptionMessageQueue(CMessageQueue *mqs);
+	copy(cmq.topic[:], *(*[]C.char)(unsafe.Pointer(&mq.Topic)))
+	copy(cmq.brokerName[:], *(*[]C.char)(unsafe.Pointer(&mq.Broker)))
+	fmt.Printf("%+v\n", []byte(mq.Topic))
+	fmt.Printf("%+v\n", cmq.topic)
+	fmt.Printf("%+v\n", []byte(mq.Broker))
+	fmt.Printf("%+v\n", cmq.brokerName)
 
-CPullResult Pull(CPullConsumer *consumer,const CMessageQueue *mq, const char *subExpression, long long offset, int maxNums);
-int ReleasePullResult(CPullResult pullResult);
+	csubExpr := C.CString(subExpression)
+	cpullResult := C.Pull(c.cconsumer, &cmq, csubExpr, C.longlong(offset), C.int(maxNums))
 
-*/
+	pullResult := PullResult{
+		NextBeginOffset: int64(cpullResult.nextBeginOffset),
+		MinOffset:       int64(cpullResult.minOffset),
+		MaxOffset:       int64(cpullResult.maxOffset),
+		Status:          PullStatus(cpullResult.pullStatus),
+	}
+	if cpullResult.size > 0 {
+		msgs := make([]*MessageExt, cpullResult.size)
+		for i := range msgs {
+			msgs[i] = cmsgExtToGo((*C.struct_CMessageExt)(
+				unsafe.Pointer(
+					uintptr(unsafe.Pointer(cpullResult.msgFoundList)) + uintptr(i)*unsafe.Sizeof(*cpullResult.msgFoundList),
+				),
+			))
+		}
+	}
+
+	C.free(unsafe.Pointer(csubExpr))
+	C.ReleasePullResult(cpullResult)
+	return pullResult
+}
diff --git a/core/queue.go b/core/queue.go
new file mode 100644
index 0000000..47f10ef
--- /dev/null
+++ b/core/queue.go
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package rocketmq
+
+import "fmt"
+
+// MessageQueue the queue of the message
+type MessageQueue struct {
+	Topic  string
+	Broker string
+	ID     int
+}
+
+func (q *MessageQueue) String() string {
+	return fmt.Sprintf("broker:%s, topic:%s, id:%d", q.Broker, q.Topic, q.ID)
+}
diff --git a/examples/pullconsumer/consumer.go b/examples/pullconsumer/consumer.go
new file mode 100644
index 0000000..875d5eb
--- /dev/null
+++ b/examples/pullconsumer/consumer.go
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package main
+
+import (
+	"flag"
+	"fmt"
+	"time"
+
+	rocketmq "github.com/apache/rocketmq-client-go/core"
+)
+
+var (
+	namesrvAddrs string
+	topic        string
+	body         string
+	groupID      string
+	msgCount     int64
+	workerCount  int
+)
+
+func init() {
+	flag.StringVar(&namesrvAddrs, "n", "", "name server address")
+	flag.StringVar(&topic, "t", "", "topic")
+	flag.StringVar(&groupID, "g", "", "group")
+}
+
+// ./consumer -n "localhost:9988" -t test -g local_test
+func main() {
+	flag.Parse()
+
+	if namesrvAddrs == "" {
+		fmt.Println("empty namesrv")
+		return
+	}
+
+	if topic == "" {
+		fmt.Println("empty topic")
+		return
+	}
+
+	if groupID == "" {
+		fmt.Println("empty groupID")
+		return
+	}
+
+	consumer, err := rocketmq.NewPullConsumer(&rocketmq.PullConsumerConfig{
+		GroupID:    groupID,
+		NameServer: namesrvAddrs,
+		Log: &rocketmq.LogConfig{
+			Path: "example",
+		},
+	})
+	if err != nil {
+		fmt.Printf("new pull consumer error:%s\n", err)
+		return
+	}
+
+	err = consumer.Start()
+	if err != nil {
+		fmt.Printf("start consumer error:%s\n", err)
+		return
+	}
+	defer consumer.Shutdown()
+
+	mqs := consumer.FetchSubscriptionMessageQueues(topic)
+	fmt.Printf("fetch subscription mqs:%+v\n", mqs)
+
+	total, offsets, now := 0, map[int]int64{}, time.Now()
+
+PULL:
+	for {
+		for _, mq := range mqs {
+			pr := consumer.Pull(mq, "*", offsets[mq.ID], 32)
+			total += len(pr.Messages)
+			fmt.Printf("pull %s, result:%+v\n", mq.String(), pr)
+
+			switch pr.Status {
+			case rocketmq.PullNoNewMsg:
+				break PULL
+			case rocketmq.PullFound:
+				fallthrough
+			case rocketmq.PullNoMatchedMsg:
+				fallthrough
+			case rocketmq.PullOffsetIllegal:
+				offsets[mq.ID] = pr.NextBeginOffset
+			case rocketmq.PullBrokerTimeout:
+				fmt.Println("broker timeout occur")
+			}
+		}
+	}
+
+	var timePerMessage time.Duration
+	if total > 0 {
+		timePerMessage = time.Since(now) / time.Duration(total)
+	}
+	fmt.Printf("total message:%d, per message time:%d\n", total, timePerMessage)
+}