Merge pull request #2 from wenfengwang/master
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..485dee6
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.idea
diff --git a/README.md b/README.md
index 9b7deb9..26ed01e 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,7 @@
## RocketMQ Client Go [![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
-* RocketMQ Go client is developed on top of [rocketmq-client-go](https://github.com/apache/rocketmq-client-go), which has been proven robust and widely adopted within Alibaba Group by many business units for more than three years.
+* This client uses cgo to call [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp), which has proven robust and has been widely adopted within Alibaba Group by many business units for more than three years.
+
----------
## Features
diff --git a/core/api.go b/core/api.go
new file mode 100644
index 0000000..92619f3
--- /dev/null
+++ b/core/api.go
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rocketmq
+
+import "fmt"
+
+func Version() (version string) { // Version returns the client version string reported by GetVersion.
+ return GetVersion()
+}
+
+// NewProduer creates a Producer from config by delegating to newDefaultProducer.
+func NewProduer(config *ProducerConfig) Producer {
+ return newDefaultProducer(config)
+}
+
+// ProducerConfig holds the settings read by newDefaultProducer when creating a producer.
+type ProducerConfig struct {
+ GroupID string
+ NameServer string
+ Credentials *SessionCredentials // optional; only applied when non-nil
+}
+
+// String renders the group ID and name server in label order; Credentials are omitted for security.
+func (config *ProducerConfig) String() string {
+	return fmt.Sprintf("[groupId: %s, nameServer: %s]", config.GroupID, config.NameServer)
+}
+
+type Producer interface { // Producer is the message-sending API returned by NewProduer.
+ baseAPI
+ // SendMessageSync sends msg synchronously and returns the resulting SendResult.
+ SendMessageSync(msg *Message) SendResult
+
+ // SendMessageAsync sends msg asynchronously; no result is returned to the caller.
+ SendMessageAsync(msg *Message)
+}
+
+// NewPushConsumer creates a PushConsumer from config by delegating to newPushConsumer.
+func NewPushConsumer(config *ConsumerConfig) (PushConsumer, error) {
+ return newPushConsumer(config)
+}
+
+// ConsumerConfig defines the settings for a new consumer.
+type ConsumerConfig struct {
+ GroupID string
+ NameServer string
+ ConsumerThreadCount int
+ MessageBatchMaxSize int
+ //ConsumerInstanceName int
+ Credentials *SessionCredentials // optional; only applied when non-nil
+}
+
+// String renders the group ID, name server, consumer thread count and message batch size; Credentials are not printed.
+func (config *ConsumerConfig) String() string {
+	return fmt.Sprintf("[groupId: %s, nameServer: %s, consumerThreadCount: %d, messageBatchMaxSize: %d]", config.GroupID, config.NameServer, config.ConsumerThreadCount, config.MessageBatchMaxSize)
+}
+
+type PushConsumer interface {
+ baseAPI
+ // Subscribe registers consumeFunc as the handler for topic, using expression as the filter expression.
+ Subscribe(topic, expression string, consumeFunc func(msg *MessageExt) ConsumeStatus) error
+}
+
+type SessionCredentials struct { // SessionCredentials carries the values passed to the C client's session-credential setters.
+ AccessKey string
+ SecretKey string
+ Channel string
+}
+
+// String renders all three credential fields; NOTE that the secret key is included in plain text.
+func (session *SessionCredentials) String() string {
+	return fmt.Sprintf("[accessKey: %s, secretKey: %s, channel: %s]", session.AccessKey, session.SecretKey, session.Channel)
+}
+
+type SendResult struct { // SendResult is the value returned by Producer.SendMessageSync.
+ Status SendStatus
+ MsgId string
+ Offset int64
+}
+
+// String renders the status (through SendStatus.String), the message ID and the offset.
+func (result SendResult) String() string {
+	return fmt.Sprintf("[status: %s, messageId: %s, offset: %d]", result.Status, result.MsgId, result.Offset)
+}
+
+type baseAPI interface { // baseAPI is the lifecycle interface embedded by Producer and PushConsumer.
+ Start() error
+ Shutdown() error
+}
diff --git a/src/client/version.go b/core/api_test.go
similarity index 78%
copy from src/client/version.go
copy to core/api_test.go
index 2137825..05aa6cf 100644
--- a/src/client/version.go
+++ b/core/api_test.go
@@ -14,9 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package client
+package rocketmq
-var GO_CLIENT_VERSION = "Go Client V1.0.0, BuildTime:2018.10.30"
-func GetVersion() (version string){
- return GO_CLIENT_VERSION
+import (
+ "fmt"
+ "testing"
+)
+
+func TestVersion(test *testing.T) {
+	// Smoke test only: prints the version between two banners and asserts nothing.
+	fmt.Println("-----TestGetVersion Start----")
+	fmt.Println(Version())
+	fmt.Println("-----TestGetVersion Finish----")
}
diff --git a/core/cfuns.go b/core/cfuns.go
new file mode 100644
index 0000000..7fe4ffe
--- /dev/null
+++ b/core/cfuns.go
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rocketmq
+
+/*
+#cgo LDFLAGS: -L/usr/local/lib -lrocketmq
+#include "rocketmq/CMessageExt.h"
+#include "rocketmq/CPushConsumer.h"
+*/
+import "C"
+import (
+ "sync"
+)
+
+var pushConsumerMap sync.Map
+
+//export consumeMessageCallback
+func consumeMessageCallback(cconsumer *C.CPushConsumer, msg *C.CMessageExt) C.int { // called from C through callback_cgo
+ consumer, exist := pushConsumerMap.Load(cconsumer)
+ if !exist {
+ return C.int(ReConsumeLater) // no Go consumer is registered for this C handle
+ }
+
+ msgExt := cmsgExtToGo(msg)
+ cfunc, exist := consumer.(*defaultPushConsumer).funcsMap.Load(msgExt.Topic) // handlers are keyed by topic
+ if !exist {
+ return C.int(ReConsumeLater) // no handler was subscribed for this topic
+ }
+ return C.int(cfunc.(func(msg *MessageExt) ConsumeStatus)(msgExt))
+}
diff --git a/core/message.go b/core/message.go
new file mode 100644
index 0000000..98dc6cb
--- /dev/null
+++ b/core/message.go
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rocketmq
+
+//#cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
+//#include "rocketmq/CMessage.h"
+//#include "rocketmq/CMessageExt.h"
+import "C"
+import "fmt"
+
+type Message struct { // Message is the payload handed to Producer.SendMessageSync.
+ Topic string
+ Keys string
+ // TODO: []byte may be a better type for Body.
+ Body string
+}
+
+func (msg *Message) String() string { // String renders the topic, keys and body.
+ return fmt.Sprintf("[topic: %s, keys: %s, body: %s]", msg.Topic, msg.Keys, msg.Body)
+}
+
+type MessageExt struct {
+	Message
+	MessageID string
+	Tags      string
+	// cmsgExt is the C handle GetProperty reads from. TODO: convert the C++ property map to a Go value instead.
+	cmsgExt *C.struct_CMessageExt
+}
+
+// String renders the ID, the embedded Message (via its pointer-receiver String, which %s on the value skipped) and the tags.
+func (msgExt *MessageExt) String() string {
+	return fmt.Sprintf("[messageId: %s, %s, Tags: %s]", msgExt.MessageID, msgExt.Message.String(), msgExt.Tags)
+}
+
+func (msgExt *MessageExt) GetProperty(key string) string { // GetProperty looks up key on the underlying C message handle.
+ return C.GoString(C.GetMessageProperty(msgExt.cmsgExt, C.CString(key))) // NOTE(review): the C.CString copy of key is not freed here
+}
+
+func cmsgToGo(cmsg *C.struct_CMessage) *Message { // NOTE(review): stub - destroys cmsg and returns an empty Message
+ defer C.DestroyMessage(cmsg)
+ gomsg := &Message{} // no field is copied from cmsg
+
+ return gomsg
+}
+
+func goMsgToC(gomsg *Message) *C.struct_CMessage {
+ var cmsg = C.CreateMessage(C.CString(gomsg.Topic))
+
+ // int(C.SetMessageKeys(msg.(*C.struct_CMessage),C.CString(keys)))
+ C.SetMessageKeys(cmsg, C.CString(gomsg.Keys))
+
+ // int(C.SetMessageBody(msg.(*C.struct_CMessage),C.CString(body)))
+ C.SetMessageBody(cmsg, C.CString(gomsg.Body))
+ return cmsg
+}
+
+// cmsgExtToGo copies topic, body, keys, tags and ID out of cmsg. NOTE(review): confirm how long cmsg stays valid.
+func cmsgExtToGo(cmsg *C.struct_CMessageExt) *MessageExt {
+	// Keep the C handle (cmsg is not destroyed here): GetProperty reads cmsgExt, which was previously left nil.
+	gomsg := &MessageExt{cmsgExt: cmsg}
+
+	gomsg.Topic = C.GoString(C.GetMessageTopic(cmsg))
+	gomsg.Body = C.GoString(C.GetMessageBody(cmsg))
+	gomsg.Keys = C.GoString(C.GetMessageKeys(cmsg))
+	gomsg.Tags = C.GoString(C.GetMessageTags(cmsg))
+	gomsg.MessageID = C.GoString(C.GetMessageId(cmsg))
+
+	return gomsg
+}
+
+//
+//func goMsgExtToC(gomsg *MessageExt) *C.struct_CMessageExt {
+// var cmsg = C.CreateMessage(C.CString(gomsg.Topic))
+//
+// // int(C.SetMessageKeys(msg.(*C.struct_CMessage),C.CString(keys)))
+// C.SetMessageKeys(cmsg, C.CString(gomsg.Keys))
+//
+// // int(C.SetMessageBody(msg.(*C.struct_CMessage),C.CString(body)))
+// C.SetMessageBody(cmsg, C.CString(gomsg.Body))
+// return cmsg
+//}
diff --git a/src/test/client_test.go b/core/message_test.go
similarity index 75%
rename from src/test/client_test.go
rename to core/message_test.go
index 3b12421..1d1f2a8 100644
--- a/src/test/client_test.go
+++ b/core/message_test.go
@@ -14,17 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package client_test
+package rocketmq
import (
- "fmt"
- "testing"
- "../client"
+ "testing"
)
-func TestVersion(test *testing.T){
- fmt.Println("-----TestGetVersion Start----")
- version := client.Version();
- fmt.Println(version)
- fmt.Println("-----TestGetVersion Finish----")
+func TestGetMessageTopic(test *testing.T) { // TODO: placeholder - the body is entirely commented out, so nothing is tested.
+ //fmt.Println("-----TestGetMessageTopic Start----")
+ //msg := rocketmq.CreateMessage("testTopic")
+ //rocketmq.DestroyMessage(msg)
+ //fmt.Println("-----TestGetMessageTopic Finish----")
}
diff --git a/core/producer.go b/core/producer.go
new file mode 100644
index 0000000..3d7e022
--- /dev/null
+++ b/core/producer.go
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rocketmq
+
+//#cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
+//#include "rocketmq/CMessage.h"
+//#include "rocketmq/CProducer.h"
+//#include "rocketmq/CSendResult.h"
+import "C"
+import "fmt"
+
+type SendStatus int // SendStatus is a send outcome; its values are taken from the C client's E_SEND_* constants.
+
+const (
+ SendOK = SendStatus(C.E_SEND_OK)
+ SendFlushDiskTimeout = SendStatus(C.E_SEND_FLUSH_DISK_TIMEOUT)
+ SendFlushSlaveTimeout = SendStatus(C.E_SEND_FLUSH_SLAVE_TIMEOUT)
+ SendSlaveNotAvailable = SendStatus(C.E_SEND_SLAVE_NOT_AVAILABLE)
+)
+
+func (status SendStatus) String() string { // String returns the constant's name, or "Unknown" for any other value.
+ switch status {
+ case SendOK:
+ return "SendOK"
+ case SendFlushDiskTimeout:
+ return "SendFlushDiskTimeout"
+ case SendFlushSlaveTimeout:
+ return "SendFlushSlaveTimeout"
+ case SendSlaveNotAvailable:
+ return "SendSlaveNotAvailable"
+ default:
+ return "Unknown"
+ }
+}
+
+func newDefaultProducer(config *ProducerConfig) *defaultProducer { // creates the C producer, then applies the name server and optional credentials
+ producer := &defaultProducer{config: config}
+ producer.cproduer = C.CreateProducer(C.CString(config.GroupID))
+ code := int(C.SetProducerNameServerAddress(producer.cproduer, C.CString(producer.config.NameServer)))
+ if config.Credentials != nil {
+ ret := C.SetProducerSessionCredentials(producer.cproduer,
+ C.CString(config.Credentials.AccessKey),
+ C.CString(config.Credentials.SecretKey),
+ C.CString(config.Credentials.Channel))
+ code = int(ret) // overwrites the name-server return code
+ }
+ switch code {
+ // TODO: return codes are not handled yet; this empty switch only keeps code referenced.
+ }
+ return producer
+}
+
+type defaultProducer struct { // defaultProducer is the producer type built by newDefaultProducer.
+ config *ProducerConfig
+ cproduer *C.struct_CProducer // handle returned by C.CreateProducer
+}
+
+func (p *defaultProducer) String() string { // String delegates to the config's String method.
+ return p.config.String()
+}
+
+// Start starts the C producer. The C return code is only printed; Start always returns nil.
+func (p *defaultProducer) Start() error {
+ err := int(C.StartProducer(p.cproduer))
+ // TODO: decide how the return code should be turned into an error.
+ fmt.Printf("result: %v \n", err)
+ return nil
+}
+
+// Shutdown shuts the C producer down and then destroys it. The return code is only printed; nil is always returned.
+func (p *defaultProducer) Shutdown() error {
+ defer C.DestroyProducer(p.cproduer) // deferred so the destroy runs after ShutdownProducer
+ err := C.ShutdownProducer(p.cproduer)
+
+ // TODO: decide how the return code should be turned into an error.
+ fmt.Printf("result: %v \n", err)
+ return nil
+}
+
+// SendMessageSync converts msg to a C message, sends it through the C producer and
+// returns the status, message ID and offset copied out of the C result struct.
+func (p *defaultProducer) SendMessageSync(msg *Message) SendResult {
+	cmsg := goMsgToC(msg)
+	defer C.DestroyMessage(cmsg)
+	var sr C.struct__SendResult_
+	C.SendMessageSync(p.cproduer, cmsg, &sr)
+	return SendResult{
+		Status: SendStatus(sr.sendStatus),
+		MsgId:  C.GoString(&sr.msgId[0]),
+		Offset: int64(sr.offset),
+	}
+}
+
+func (p *defaultProducer) SendMessageAsync(msg *Message) {
+ // TODO: not implemented; the call currently does nothing with msg.
+}
diff --git a/core/producer_test.go b/core/producer_test.go
new file mode 100644
index 0000000..04c007b
--- /dev/null
+++ b/core/producer_test.go
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rocketmq
+
+//func TestCreateMessage(test *testing.T){
+// fmt.Println("-----TestCreateMessage Start----")
+// rocketmq.CreateMessage("testTopic")
+// fmt.Println("-----TestCreateMessage Finish----")
+//}
+//
+//func TestDestroyMessage(test *testing.T){
+// fmt.Println("-----TestCreateMessage Start----")
+// msg := rocketmq.CreateMessage("testTopic")
+// rocketmq.DestroyMessage(msg)
+// fmt.Println("-----TestCreateMessage Finish----")
+//}
+//func TestSetMessageKeys(test *testing.T){
+// fmt.Println("-----TestSetMessageKeys Start----")
+// msg := rocketmq.CreateMessage("testTopic")
+// len := rocketmq.SetMessageKeys(msg,"testKey")
+// fmt.Println("Len:",len)
+// rocketmq.DestroyMessage(msg)
+// fmt.Println("-----TestCreateMessage Finish----")
+//}
+//func TestCreateProducer(test *testing.T){
+// fmt.Println("-----TestCreateProducer Start----")
+// rocketmq.CreateProducer("testGroupId")
+// fmt.Println("-----TestCreateProducer Finish----")
+//}
diff --git a/core/push_consumer.go b/core/push_consumer.go
new file mode 100644
index 0000000..e431d95
--- /dev/null
+++ b/core/push_consumer.go
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rocketmq
+
+/*
+#cgo LDFLAGS: -L/usr/local/lib -lrocketmq
+#include "rocketmq/CMessageExt.h"
+#include "rocketmq/CPushConsumer.h"
+#include "stdio.h"
+
+extern int consumeMessageCallback(CPushConsumer *consumer, CMessageExt *msg);
+
+int callback_cgo(CPushConsumer *consumer, CMessageExt *msg) {
+ return consumeMessageCallback(consumer, msg);
+}
+*/
+import "C"
+
+import (
+ "fmt"
+ "sync"
+ "unsafe"
+)
+
+type ConsumeStatus int // ConsumeStatus is what a consume function returns; values come from the C client's constants.
+
+const (
+ ConsumeSuccess = ConsumeStatus(C.E_CONSUME_SUCCESS)
+ ReConsumeLater = ConsumeStatus(C.E_RECONSUME_LATER)
+)
+
+func (status ConsumeStatus) String() string { // String returns the constant's name, or "Unknown" for any other value.
+ switch status {
+ case ConsumeSuccess:
+ return "ConsumeSuccess"
+ case ReConsumeLater:
+ return "ReConsumeLater"
+ default:
+ return "Unknown"
+ }
+}
+
+type defaultPushConsumer struct { // defaultPushConsumer is the PushConsumer built by newPushConsumer.
+ config *ConsumerConfig
+ cconsumer *C.struct_CPushConsumer // handle returned by C.CreatePushConsumer
+ funcsMap sync.Map // topic (string) -> consume function stored by Subscribe
+}
+
+func (c *defaultPushConsumer) String() string { // String renders the config followed by the subscribed topics.
+ topics := ""
+ c.funcsMap.Range(func(key, value interface{}) bool {
+ topics += key.(string) + ", " // every topic, including the last, is followed by ", "
+ return true
+ })
+ return fmt.Sprintf("[%s, subcribed topics: [%s]]", c.config, topics)
+}
+
+// newPushConsumer creates the C push consumer for config.GroupID, applies the name server, thread count,
+// batch size and optional credentials, registers callback_cgo, and records the consumer in pushConsumerMap.
+func newPushConsumer(config *ConsumerConfig) (PushConsumer, error) {
+	cconsumer := C.CreatePushConsumer(C.CString(config.GroupID))
+	C.SetPushConsumerNameServerAddress(cconsumer, C.CString(config.NameServer))
+	C.SetPushConsumerThreadCount(cconsumer, C.int(config.ConsumerThreadCount))
+	// Fix: the batch size comes from MessageBatchMaxSize (ConsumerThreadCount was passed here before).
+	C.SetPushConsumerMessageBatchMaxSize(cconsumer, C.int(config.MessageBatchMaxSize))
+	C.RegisterMessageCallback(cconsumer, (C.MessageCallBack)(unsafe.Pointer(C.callback_cgo)))
+	if config.Credentials != nil {
+		C.SetPushConsumerSessionCredentials(cconsumer, C.CString(config.Credentials.AccessKey),
+			C.CString(config.Credentials.SecretKey), C.CString(config.Credentials.Channel))
+	}
+
+	consumer := &defaultPushConsumer{config: config, cconsumer: cconsumer}
+	pushConsumerMap.Store(cconsumer, consumer)
+	return consumer, nil
+}
+
+func (c *defaultPushConsumer) Start() error { // Start starts the C push consumer; its return value is ignored.
+ C.StartPushConsumer(c.cconsumer)
+ return nil
+}
+
+func (c *defaultPushConsumer) Shutdown() error { // Shutdown shuts the C push consumer down and then destroys it.
+ C.ShutdownPushConsumer(c.cconsumer)
+ C.DestroyPushConsumer(c.cconsumer) // NOTE(review): the pushConsumerMap entry for this handle is not removed here
+ return nil
+}
+
+func (c *defaultPushConsumer) Subscribe(topic, expression string, consumeFunc func(msg *MessageExt) ConsumeStatus) error { // stores consumeFunc under topic
+ _, exist := c.funcsMap.Load(topic)
+ if exist {
+ return nil // topic already has a handler: nothing is changed and no error is reported
+ }
+ err := C.Subscribe(c.cconsumer, C.CString(topic), C.CString(expression)) // NOTE(review): the return code is only printed, never checked
+ fmt.Println("err:", err)
+ c.funcsMap.Store(topic, consumeFunc)
+ fmt.Printf("subscribe topic[%s] with expression[%s] successfully. \n", topic, expression)
+ return nil
+}
diff --git a/src/client/version.go b/core/push_consumer_test.go
similarity index 69%
copy from src/client/version.go
copy to core/push_consumer_test.go
index 2137825..22c1444 100644
--- a/src/client/version.go
+++ b/core/push_consumer_test.go
@@ -14,9 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package client
+package rocketmq
-var GO_CLIENT_VERSION = "Go Client V1.0.0, BuildTime:2018.10.30"
-func GetVersion() (version string){
- return GO_CLIENT_VERSION
-}
+//import "fmt"
+//import "testing"
+//import "../client"
+//
+//func TestCreatePushConsumer(test *testing.T){
+// fmt.Println("-----TestCreateProducer Start----")
+// consumer := rocketmq.CreatePushConsumer("testGroupId")
+// rocketmq.DestroyPushConsumer(consumer)
+// fmt.Println("-----TestCreateProducer Finish----")
+//}
diff --git a/src/client/version.go b/core/version.go
similarity index 84%
rename from src/client/version.go
rename to core/version.go
index 2137825..2466671 100644
--- a/src/client/version.go
+++ b/core/version.go
@@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package client
+package rocketmq
-var GO_CLIENT_VERSION = "Go Client V1.0.0, BuildTime:2018.10.30"
-func GetVersion() (version string){
- return GO_CLIENT_VERSION
+var GO_CLIENT_VERSION = "Go Client V1.0.0, BuildTime:2018.10.30" // version string returned by GetVersion
+
+func GetVersion() (version string) { // GetVersion returns GO_CLIENT_VERSION.
+ return GO_CLIENT_VERSION
}
diff --git a/src/test/version_test.go b/core/version_test.go
similarity index 73%
rename from src/test/version_test.go
rename to core/version_test.go
index 59f35ba..85f43c1 100644
--- a/src/test/version_test.go
+++ b/core/version_test.go
@@ -14,17 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package client_test
+package rocketmq
-import (
- "fmt"
- "testing"
- "../client"
-)
-
-func TestGetVersion(test *testing.T){
- fmt.Println("-----TestGetVersion Start----")
- version := client.GetVersion();
- fmt.Println(version)
- fmt.Println("-----TestGetVersion Finish----")
-}
+//import (
+// "fmt"
+// "testing"
+// "../client"
+//)
+//
+//func TestGetVersion(test *testing.T){
+// fmt.Println("-----TestGetVersion Start----")
+// version := rocketmq.GetVersion();
+// fmt.Println(version)
+// fmt.Println("-----TestGetVersion Finish----")
+//}
diff --git a/examples/producer.go b/examples/producer.go
new file mode 100644
index 0000000..a6c801c
--- /dev/null
+++ b/examples/producer.go
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "fmt"
+ "github.com/apache/rocketmq-client-go/core"
+ "time"
+)
+
+func main() { // runs the producer example
+ SendMessage()
+}
+
+// SendMessage starts a producer, sends 100 messages to topic "test" and prints each send result.
+func SendMessage() {
+	producer := rocketmq.NewProduer(&rocketmq.ProducerConfig{GroupID: "testGroup", NameServer: "localhost:9876"})
+	producer.Start()
+	// Shutdown also destroys the C producer, so it must run exactly once: only through this defer.
+	defer producer.Shutdown()
+	fmt.Printf("Producer: %s started... \n", producer)
+	for i := 0; i < 100; i++ {
+		msg := fmt.Sprintf("Hello RocketMQ-%d", i)
+		result := producer.SendMessageSync(&rocketmq.Message{Topic: "test", Body: msg})
+		fmt.Printf("send message: %s result: %s\n", msg, result)
+	}
+	time.Sleep(10 * time.Second)
+}
diff --git a/examples/push_consumer.go b/examples/push_consumer.go
new file mode 100644
index 0000000..e65613b
--- /dev/null
+++ b/examples/push_consumer.go
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "fmt"
+ "github.com/apache/rocketmq-client-go/core"
+ "time"
+)
+
+func main() { // runs the push-consumer example
+ PushConsumeMessage()
+}
+
+func PushConsumeMessage() { // PushConsumeMessage subscribes to topic "test", starts the consumer and sleeps for ten minutes.
+ fmt.Println("Start Receiving Messages...")
+ consumer, _ := rocketmq.NewPushConsumer(&rocketmq.ConsumerConfig{GroupID: "testGroupId", NameServer: "localhost:9876",
+ ConsumerThreadCount: 2, MessageBatchMaxSize: 16})
+
+ // Topics must be subscribed before the consumer is started.
+ consumer.Subscribe("test", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
+ fmt.Printf("A message received: \"%s\" \n", msg.Body)
+ return rocketmq.ConsumeSuccess
+ })
+
+ consumer.Start()
+ defer consumer.Shutdown()
+ fmt.Printf("consumer: %s started...\n", consumer)
+ time.Sleep(10 * time.Minute)
+}
diff --git a/src/client/client.go b/src/client/client.go
deleted file mode 100644
index c4a9a59..0000000
--- a/src/client/client.go
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package client
-
-func Version() (version string){
- return GetVersion()
-}
-
-type Message interface {
-}
-type MessageExt interface {
-}
-type Producer interface {
-}
-type PushConsumer interface {
-}
\ No newline at end of file
diff --git a/src/client/consume.go b/src/client/consume.go
deleted file mode 100644
index 66aa2a6..0000000
--- a/src/client/consume.go
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package client
-
-//#cgo CFLAGS: -I/usr/local/include/rocketmq
-//#cgo LDFLAGS: -L/usr/local/lib -lrocketmq
-//#include "CMessageExt.h"
-//#include "CPushConsumer.h"
-import "C"
-
-//export ConsumeMessageCallback
-func ConsumeMessageCallback(consumer *C.struct_CPushConsumer,msg *C.struct_CMessageExt) (C.int) {
- return C.int(ConsumeMessageInner(consumer,msg))
-}
diff --git a/src/client/consumestatus.go b/src/client/consumestatus.go
deleted file mode 100644
index 55fd015..0000000
--- a/src/client/consumestatus.go
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package client
-
-type ConsumeStatus int
-
-const (
- ConsumeSuccess ConsumeStatus = iota // value --> 0
- ReConsumeLater // value --> 1
-)
-func (status ConsumeStatus) String() string {
- switch status {
- case ConsumeSuccess:
- return "ConsumeSuccess"
- case ReConsumeLater:
- return "ReConsumeLater"
- default:
- return "Unknown"
- }
-}
\ No newline at end of file
diff --git a/src/client/messageExt.go b/src/client/messageExt.go
deleted file mode 100644
index bccea21..0000000
--- a/src/client/messageExt.go
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package client
-
-//#cgo CFLAGS: -I/usr/local/include/rocketmq
-//#cgo LDFLAGS: -L/usr/local/lib -lrocketmq
-//#include "CMessageExt.h"
-import "C"
-
-//type MessageExt C.struct_CMessageExt
-
-func GetMessageTopic(msg MessageExt)(topic string){
- topic = C.GoString(C.GetMessageTopic(msg.(*C.struct_CMessageExt)))
- return
-}
-func GetMessageTags(msg MessageExt)(tags string){
- tags = C.GoString(C.GetMessageTags(msg.(*C.struct_CMessageExt)))
- return
-}
-func GetMessageKeys(msg MessageExt)(keys string){
- keys = C.GoString(C.GetMessageKeys(msg.(*C.struct_CMessageExt)))
- return
-}
-func GetMessageBody(msg MessageExt)(body string){
- body = C.GoString(C.GetMessageBody(msg.(*C.struct_CMessageExt)))
- return
-}
-func GetMessageProperty(msg MessageExt,key string)(value string){
- value = C.GoString(C.GetMessageProperty(msg.(*C.struct_CMessageExt),C.CString(key)))
- return
-}
-func GetMessageId(msg MessageExt)(msgId string){
- msgId = C.GoString(C.GetMessageId(msg.(*C.struct_CMessageExt)))
- return
-}
diff --git a/src/client/producer.go b/src/client/producer.go
deleted file mode 100644
index 95b2d66..0000000
--- a/src/client/producer.go
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package client
-
-//#cgo CFLAGS: -I/usr/local/include/rocketmq
-//#cgo LDFLAGS: -L/usr/local/lib -lrocketmq
-//#include "CMessage.h"
-//#include "CProducer.h"
-import "C"
-
-//type Producer C.struct_CProducer
-//type Message C.struct_CMessage
-//type SendResult C.struct_CSendResult
-//type PushConsumer C.struct_CPushConsumer
-//type MessageExt C.struct_CMessageExt
-
-func CreateMessage(topic string)(msg Message){
- msg = C.CreateMessage(C.CString(topic))
- return msg;
-}
-func DestroyMessage(msg Message){
- C.DestroyMessage(msg.(*C.struct_CMessage))
-}
-func SetMessageKeys(msg Message,keys string)(int){
- return int(C.SetMessageKeys(msg.(*C.struct_CMessage),C.CString(keys)))
-}
-func SetMessageBody(msg Message,body string)(int){
- return int(C.SetMessageBody(msg.(*C.struct_CMessage),C.CString(body)))
-}
-func CreateProducer(groupId string)(producer Producer){
- producer = C.CreateProducer(C.CString(groupId))
- return producer;
-}
-func DestroyProducer(producer Producer){
- C.DestroyProducer(producer.(*C.struct_CProducer))
-}
-func StartProducer(producer Producer)(int){
- return int(C.StartProducer(producer.(*C.struct_CProducer)))
-}
-func ShutdownProducer(producer Producer)(int){
- return int(C.ShutdownProducer(producer.(*C.struct_CProducer)))
-}
-func SetProducerNameServerAddress(producer Producer, nameServer string)(int){
- return int(C.SetProducerNameServerAddress(producer.(*C.struct_CProducer),C.CString(nameServer)))
-}
-func SetProducerSessionCredentials(producer Producer, accessKey string, secretKey string, channel string) (int) {
- ret := C.SetProducerSessionCredentials(producer.(*C.struct_CProducer),
- C.CString(accessKey),
- C.CString(secretKey),
- C.CString(channel))
- return int(ret)
-}
-func SendMessageSync(producer Producer, msg Message)(sendResult SendResult){
- var sr C.struct__SendResult_
- C.SendMessageSync(producer.(*C.struct_CProducer),msg.(*C.struct_CMessage),&sr)
- sendResult.Status = SendStatus(sr.sendStatus)
- sendResult.MsgId = C.GoString(&sr.msgId[0])
- sendResult.Offset = int64(sr.offset)
- return sendResult
-}
diff --git a/src/client/pushconsumer.go b/src/client/pushconsumer.go
deleted file mode 100644
index 9192862..0000000
--- a/src/client/pushconsumer.go
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package client
-
-//#cgo CFLAGS: -I/usr/local/include/rocketmq
-//#cgo LDFLAGS: -L/usr/local/lib -lrocketmq
-//#include "CMessageExt.h"
-//#include "CPushConsumer.h"
-//extern int ConsumeMessageCallback(CPushConsumer *consumer,CMessageExt *msg);
-//int ConsumerMessageCallBackInner(CPushConsumer *consumer, CMessageExt *msg) {
-//return ConsumeMessageCallback(consumer,msg);
-//}
-//int SetConsumerMessageCallBackInner(CPushConsumer *consumer) {
-//return RegisterMessageCallback(consumer,ConsumerMessageCallBackInner);
-//}
-import "C"
-import "fmt"
-
-//type PushConsumer C.struct_CPushConsumer
-//type MessageExt C.struct_CMessageExt
-type Callback func(msg MessageExt) ConsumeStatus
-
-func CreatePushConsumer(groupId string) (consumer PushConsumer) {
- consumer = C.CreatePushConsumer(C.CString(groupId))
- return consumer;
-}
-func DestroyPushConsumer(consumer PushConsumer) {
- consumer = C.DestroyPushConsumer(consumer.(*C.struct_CPushConsumer))
- return
-}
-func StartPushConsumer(consumer PushConsumer) int {
- return int(C.StartPushConsumer(consumer.(*C.struct_CPushConsumer)))
-}
-func ShutdownPushConsumer(consumer PushConsumer) int {
- return int(C.ShutdownPushConsumer(consumer.(*C.struct_CPushConsumer)))
-}
-func SetPushConsumerGroupID(consumer PushConsumer, groupId string) (int) {
- return int(C.SetPushConsumerGroupID(consumer.(*C.struct_CPushConsumer), C.CString(groupId)))
-}
-func SetPushConsumerNameServerAddress(consumer PushConsumer, name string) (int) {
- return int(C.SetPushConsumerNameServerAddress(consumer.(*C.struct_CPushConsumer), C.CString(name)))
-}
-func SetPushConsumerThreadCount(consumer PushConsumer, count int) (int) {
- return int(C.SetPushConsumerThreadCount(consumer.(*C.struct_CPushConsumer), C.int(count)))
-}
-func SetPushConsumerMessageBatchMaxSize(consumer PushConsumer, size int) (int) {
- return int(C.SetPushConsumerMessageBatchMaxSize(consumer.(*C.struct_CPushConsumer), C.int(size)))
-}
-func SetPushConsumerInstanceName(consumer PushConsumer, name string) (int) {
- return int(C.SetPushConsumerInstanceName(consumer.(*C.struct_CPushConsumer), C.CString(name)))
-}
-func SetPushConsumerSessionCredentials(consumer PushConsumer, accessKey string, secretKey string, channel string) (int) {
- ret := C.SetPushConsumerSessionCredentials(consumer.(*C.struct_CPushConsumer),
- C.CString(accessKey),
- C.CString(secretKey),
- C.CString(channel))
- return int(ret)
-}
-
-func Subscribe(consumer PushConsumer, topic string, expression string) (int) {
- return int(C.Subscribe(consumer.(*C.struct_CPushConsumer), C.CString(topic), C.CString(expression)))
-}
-
-var CallBackMap map[PushConsumer]Callback = map[PushConsumer]Callback{}
-
-func RegisterMessageCallback(consumer PushConsumer, callback Callback) (int) {
- CallBackMap[consumer] = callback
- ret := C.SetConsumerMessageCallBackInner(consumer.(*C.struct_CPushConsumer))
- return int(ret)
-}
-func ConsumeMessageInner(consumer PushConsumer, msg MessageExt) (ConsumeStatus) {
- fmt.Println("ConsumeMessageInner")
- callback,ok := CallBackMap[consumer]
- if ok {
- return callback(msg)
- }
- return ReConsumeLater
-}
diff --git a/src/client/sendresult.go b/src/client/sendresult.go
deleted file mode 100644
index 2dc0c73..0000000
--- a/src/client/sendresult.go
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package client
-
-type SendStatus int
-
-const (
- SendOK SendStatus = iota // value --> 0
- SendFlushDiskTimeout // value --> 1
- SendFlushSlaveTimeout // value --> 2
- SendSlaveNotAvailable // value --> 3
-)
-func (status SendStatus) String() string {
- switch status {
- case SendOK:
- return "SendOK"
- case SendFlushDiskTimeout:
- return "SendFlushDiskTimeout"
- case SendFlushSlaveTimeout:
- return "SendFlushSlaveTimeout"
- case SendSlaveNotAvailable:
- return "SendSlaveNotAvailable"
- default:
- return "Unknown"
- }
-}
-type SendResult struct {
- Status SendStatus
- MsgId string
- Offset int64
-}
\ No newline at end of file
diff --git a/src/sample/main.go b/src/sample/main.go
deleted file mode 100644
index eb868b8..0000000
--- a/src/sample/main.go
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-import "fmt"
-
-func main() {
- fmt.Println("Start Sample Main Function......")
- SampleVersion()
- SampleSendMessage()
- SamplePushConsumeMessage()
- fmt.Scan()
-}
diff --git a/src/sample/sample_common.go b/src/sample/sample_common.go
deleted file mode 100644
index 461199b..0000000
--- a/src/sample/sample_common.go
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-import "fmt"
-import "../client"
-
-func SampleVersion() {
- fmt.Println("Version :", client.GetVersion())
-}
\ No newline at end of file
diff --git a/src/sample/sample_producer.go b/src/sample/sample_producer.go
deleted file mode 100644
index 336812c..0000000
--- a/src/sample/sample_producer.go
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-import "fmt"
-import "../client"
-
-func SampleSendMessage() {
- fmt.Println("Start Send Message..")
- namesvr := "172.17.0.2:9876"
- topic := "T_TestTopic"
- keys := "testKeys"
- body := "testBody"
- //Create Producer
- producer := client.CreateProducer("testGroupId")
- fmt.Println("Create Producer")
- client.SetProducerNameServerAddress(producer, namesvr)
- fmt.Println("Set Producer Nameserver:", namesvr)
- client.StartProducer(producer)
- fmt.Println("Start Producer")
-
- for i := 1; i <= 10; i++ {
- //Create Message
- msg := client.CreateMessage(topic)
- fmt.Println("Create Message, Topic:", topic)
- client.SetMessageKeys(msg, keys)
- fmt.Println("Set Message Keys:", keys)
- client.SetMessageBody(msg, body)
- fmt.Println("Set Message body:", body)
-
- sendresult := client.SendMessageSync(producer, msg)
- fmt.Println("Send Message OK")
- fmt.Println("SendStatus:", sendresult.Status)
- fmt.Println("SendResult:", sendresult)
- client.DestroyMessage(msg)
- }
- client.ShutdownProducer(producer)
- client.DestroyProducer(producer)
-}
diff --git a/src/sample/sample_pushconsumer.go b/src/sample/sample_pushconsumer.go
deleted file mode 100644
index e70dd75..0000000
--- a/src/sample/sample_pushconsumer.go
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-import "fmt"
-import "../client"
-
-func SampleConsumeMessage(msg client.MessageExt) (client.ConsumeStatus) {
- fmt.Println("ConsumeMessageInSample")
- fmt.Println("Message topic",client.GetMessageTopic(msg))
- fmt.Println("MessageId",client.GetMessageId(msg))
- return client.ConsumeSuccess
-}
-
-func SamplePushConsumeMessage() {
- fmt.Println("Start Send Message..")
- namesvr := "172.17.0.2:9876"
- topic := "T_TestTopic"
- expression := "*"
- //Create Producer
- consumer := client.CreatePushConsumer("testGroupId")
- fmt.Println("Create Push Consumer")
- client.SetPushConsumerNameServerAddress(consumer, namesvr)
- fmt.Println("Set Push Consumer Nameserver:", namesvr)
-
- client.Subscribe(consumer, topic, expression)
- fmt.Println("Set Push Consumer Subscribe,Topic:", topic," Exp:", expression)
-
- client.RegisterMessageCallback(consumer,SampleConsumeMessage)
- client.StartPushConsumer(consumer)
- fmt.Println("Start Push Consumer")
- fmt.Scan()
- select{}
- client.ShutdownPushConsumer(consumer)
- client.DestroyPushConsumer(consumer)
-}
diff --git a/src/test/messageExt_test.go b/src/test/messageExt_test.go
deleted file mode 100644
index a241c28..0000000
--- a/src/test/messageExt_test.go
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package client_test
-
-import (
- "fmt"
- "testing"
- "../client"
-)
-
-func TestGetMessageTopic(test *testing.T){
- fmt.Println("-----TestGetMessageTopic Start----")
- msg := client.CreateMessage("testTopic")
- client.DestroyMessage(msg)
- fmt.Println("-----TestGetMessageTopic Finish----")
-}
diff --git a/src/test/producer_test.go b/src/test/producer_test.go
deleted file mode 100644
index 66e2caf..0000000
--- a/src/test/producer_test.go
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package client_test
-
-import "fmt"
-import "testing"
-import "../client"
-
-func TestCreateMessage(test *testing.T){
- fmt.Println("-----TestCreateMessage Start----")
- client.CreateMessage("testTopic")
- fmt.Println("-----TestCreateMessage Finish----")
-}
-func TestDestroyMessage(test *testing.T){
- fmt.Println("-----TestCreateMessage Start----")
- msg := client.CreateMessage("testTopic")
- client.DestroyMessage(msg)
- fmt.Println("-----TestCreateMessage Finish----")
-}
-func TestSetMessageKeys(test *testing.T){
- fmt.Println("-----TestSetMessageKeys Start----")
- msg := client.CreateMessage("testTopic")
- len := client.SetMessageKeys(msg,"testKey")
- fmt.Println("Len:",len)
- client.DestroyMessage(msg)
- fmt.Println("-----TestCreateMessage Finish----")
-}
-func TestCreateProducer(test *testing.T){
- fmt.Println("-----TestCreateProducer Start----")
- client.CreateProducer("testGroupId")
- fmt.Println("-----TestCreateProducer Finish----")
-}
diff --git a/src/test/pushconsumer_test.go b/src/test/pushconsumer_test.go
deleted file mode 100644
index 7083713..0000000
--- a/src/test/pushconsumer_test.go
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package client_test
-
-import "fmt"
-import "testing"
-import "../client"
-
-func TestCreatePushConsumer(test *testing.T){
- fmt.Println("-----TestCreateProducer Start----")
- consumer := client.CreatePushConsumer("testGroupId")
- client.DestroyPushConsumer(consumer)
- fmt.Println("-----TestCreateProducer Finish----")
-}