Merge pull request #13 from wenfengwang/master
Implementing SendMessageOneway
diff --git a/core/api.go b/core/api.go
index 8ad6af0..58c1465 100644
--- a/core/api.go
+++ b/core/api.go
@@ -32,6 +32,22 @@
LogC *LogConfig
}
+func (config *clientConfig) string() string {
+ // For security, don't print Credentials.
+ str := ""
+ str = strJoin(str, "GroupId", config.GroupID)
+ str = strJoin(str, "NameServer", config.NameServer)
+ str = strJoin(str, "NameServerDomain", config.NameServerDomain)
+ str = strJoin(str, "GroupName", config.GroupName)
+ str = strJoin(str, "InstanceName", config.InstanceName)
+
+ if config.LogC != nil {
+ str = strJoin(str, "LogConfig", config.LogC.String())
+ }
+
+ return str
+}
+
// NewProducer create a new producer with config
func NewProducer(config *ProducerConfig) (Producer, error) {
return newDefaultProducer(config)
@@ -46,11 +62,21 @@
}
func (config *ProducerConfig) String() string {
- // For security, don't print Credentials default.
- return fmt.Sprintf("[GroupID: %s, NameServer: %s, NameServerDomain: %s, InstanceName: %s, NameServer: %s, "+
- "SendMsgTimeout: %d, CompressLevel: %d, MaxMessageSize: %d, ]", config.NameServer, config.GroupID,
- config.NameServerDomain, config.GroupName, config.InstanceName, config.SendMsgTimeout, config.CompressLevel,
- config.MaxMessageSize)
+ str := "ProducerConfig=[" + config.clientConfig.string()
+
+ if config.SendMsgTimeout > 0 {
+ str = strJoin(str, "SendMsgTimeout", config.SendMsgTimeout)
+ }
+
+ if config.CompressLevel > 0 {
+ str = strJoin(str, "CompressLevel", config.CompressLevel)
+ }
+
+ if config.MaxMessageSize > 0 {
+ str = strJoin(str, "MaxMessageSize", config.MaxMessageSize)
+ }
+
+ return str + "]"
}
type Producer interface {
@@ -61,8 +87,8 @@
// SendMessageOrderly send the message orderly
SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) SendResult
- // SendMessageAsync send a message with async
- SendMessageAsync(msg *Message)
+ // SendMessageOneway send a message with oneway
+ SendMessageOneway(msg *Message)
}
// NewPushConsumer create a new consumer with config.
@@ -70,6 +96,24 @@
return newPushConsumer(config)
}
+type MessageModel int
+
+const (
+ BroadCasting = MessageModel(1)
+ Clustering = MessageModel(2)
+)
+
+func (mode MessageModel) String() string {
+ switch mode {
+ case BroadCasting:
+ return "BroadCasting"
+ case Clustering:
+ return "Clustering"
+ default:
+ return "Unknown"
+ }
+}
+
// PushConsumerConfig define a new consumer.
type PushConsumerConfig struct {
clientConfig
@@ -79,9 +123,22 @@
}
func (config *PushConsumerConfig) String() string {
- return fmt.Sprintf("[GroupID: %s, NameServer: %s, NameServerDomain: %s, InstanceName: %s, "+
- "ThreadCount: %d, MessageBatchMaxSize: %d, Model: %v ]", config.NameServer, config.GroupID,
- config.NameServerDomain, config.InstanceName, config.ThreadCount, config.MessageBatchMaxSize, config.Model)
+ // For security, don't print Credentials.
+ str := "PushConsumerConfig=[" + config.clientConfig.string()
+
+ if config.ThreadCount > 0 {
+ str = strJoin(str, "ThreadCount", config.ThreadCount)
+ }
+
+ if config.MessageBatchMaxSize > 0 {
+ str = strJoin(str, "MessageBatchMaxSize", config.MessageBatchMaxSize)
+ }
+
+ if config.Model != 0 {
+ str = strJoin(str, "MessageModel", config.Model.String())
+ }
+
+ return str + "]"
}
type PushConsumer interface {
diff --git a/core/api_test.go b/core/api_test.go
index 05aa6cf..fc507f0 100644
--- a/core/api_test.go
+++ b/core/api_test.go
@@ -17,13 +17,61 @@
package rocketmq
import (
- "fmt"
+ "github.com/stretchr/testify/assert"
"testing"
)
-func TestVersion(test *testing.T) {
- fmt.Println("-----TestGetVersion Start----")
- version := Version()
- fmt.Println(version)
- fmt.Println("-----TestGetVersion Finish----")
+func TestProducerConfig_String(t *testing.T) {
+ pConfig := ProducerConfig{}
+ pConfig.GroupID = "testGroup"
+ pConfig.NameServer = "localhost:9876"
+ pConfig.NameServerDomain = "domain1"
+ pConfig.GroupName = "producerGroupName"
+ pConfig.InstanceName = "testProducer"
+ pConfig.LogC = &LogConfig{
+ Path: "/rocketmq/log",
+ FileNum: 16,
+ FileSize: 1 << 20,
+ Level: LogLevelDebug}
+ pConfig.SendMsgTimeout = 30
+ pConfig.CompressLevel = 4
+ pConfig.MaxMessageSize = 1024
+
+ expect := "ProducerConfig=[GroupId: testGroup, NameServer: localhost:9876, NameServerDomain: NameServerDomain, " +
+ "GroupId: testGroup, InstanceName: testProducer, " +
+ "LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576 Level:Debug}, S" +
+ "endMsgTimeout: 30, CompressLevel: 4, MaxMessageSize: 1024, ]"
+ assert.Equal(t, expect, pConfig.String())
+}
+
+func TestPushConsumerConfig_String(t *testing.T) {
+ pcConfig := PushConsumerConfig{}
+ pcConfig.GroupID = "testGroup"
+ pcConfig.NameServer = "localhost:9876"
+ pcConfig.GroupName = "consumerGroupName"
+ pcConfig.InstanceName = "testPushConsumer"
+ pcConfig.LogC = &LogConfig{
+ Path: "/rocketmq/log",
+ FileNum: 16,
+ FileSize: 1 << 20,
+ Level: LogLevelDebug}
+ pcConfig.ThreadCount = 4
+ expect := "PushConsumerConfig=[GroupId: testGroup, NameServer: localhost:9876, " +
+ "GroupName: consumerGroupName, InstanceName: testPushConsumer, " +
+ "LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576 Level:Debug}, ThreadCount: 4, ]"
+ assert.Equal(t, expect, pcConfig.String())
+
+ pcConfig.NameServerDomain = "domain1"
+ expect = "PushConsumerConfig=[GroupId: testGroup, NameServer: localhost:9876, NameServerDomain: domain1, " +
+ "GroupName: consumerGroupName, InstanceName: testPushConsumer, " +
+ "LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576 Level:Debug}, ThreadCount: 4, ]"
+ assert.Equal(t, expect, pcConfig.String())
+
+ pcConfig.MessageBatchMaxSize = 32
+ pcConfig.Model = Clustering
+ expect = "PushConsumerConfig=[GroupId: testGroup, NameServer: localhost:9876, NameServerDomain: domain1, " +
+ "GroupName: consumerGroupName, InstanceName: testPushConsumer, " +
+ "LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576 Level:Debug}, ThreadCount: 4," +
+ " MessageBatchMaxSize: 32, MessageModel: Clustering, ]"
+ assert.Equal(t, expect, pcConfig.String())
}
diff --git a/core/log_test.go b/core/log_test.go
index 8c4a449..e2246f0 100644
--- a/core/log_test.go
+++ b/core/log_test.go
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package rocketmq
import (
diff --git a/core/producer.go b/core/producer.go
index 77ed63b..f402589 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -90,8 +90,7 @@
code = int(C.SetProducerNameServerAddress(cproduer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set NameServerAddress error, code is: %d"+
- "please check cpp logs for details", code))
+ return nil, fmt.Errorf("producer Set NameServerAddress error, code is: %d", code)
}
}
@@ -100,8 +99,7 @@
code = int(C.SetProducerNameServerDomain(cproduer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set NameServerDomain error, code is: %d"+
- "please check cpp logs for details", code))
+ return nil, fmt.Errorf("producer Set NameServerDomain error, code is: %d", code)
}
}
@@ -110,8 +108,7 @@
code = int(C.SetProducerInstanceName(cproduer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set InstanceName error, code is: %d"+
- "please check cpp logs for details", code))
+ return nil, fmt.Errorf("producer Set InstanceName error, code is: %d", code)
}
}
@@ -125,7 +122,7 @@
C.free(unsafe.Pointer(sk))
C.free(unsafe.Pointer(ch))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set Credentials error, code is: %d", code))
+ return nil, fmt.Errorf("producer Set Credentials error, code is: %d", code)
}
}
@@ -134,38 +131,38 @@
code = int(C.SetProducerLogPath(cproduer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set LogPath error, code is: %d", code))
+ return nil, fmt.Errorf("producer Set LogPath error, code is: %d", code)
}
code = int(C.SetProducerLogFileNumAndSize(cproduer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set FileNumAndSize error, code is: %d", code))
+ return nil, fmt.Errorf("producer Set FileNumAndSize error, code is: %d", code)
}
code = int(C.SetProducerLogLevel(cproduer, C.CLogLevel(config.LogC.Level)))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set LogLevel error, code is: %d", code))
+ return nil, fmt.Errorf("producer Set LogLevel error, code is: %d", code)
}
}
if config.SendMsgTimeout > 0 {
code = int(C.SetProducerSendMsgTimeout(cproduer, C.int(config.SendMsgTimeout)))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set SendMsgTimeout error, code is: %d", code))
+ return nil, fmt.Errorf("producer Set SendMsgTimeout error, code is: %d", code)
}
}
if config.CompressLevel > 0 {
code = int(C.SetProducerCompressLevel(cproduer, C.int(config.CompressLevel)))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set CompressLevel error, code is: %d", code))
+ return nil, fmt.Errorf("producer Set CompressLevel error, code is: %d", code)
}
}
if config.MaxMessageSize > 0 {
code = int(C.SetProducerMaxMessageSize(cproduer, C.int(config.MaxMessageSize)))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set MaxMessageSize error, code is: %d", code))
+ return nil, fmt.Errorf("producer Set MaxMessageSize error, code is: %d", code)
}
}
@@ -186,7 +183,7 @@
func (p *defaultProducer) Start() error {
code := int(C.StartProducer(p.cproduer))
if code != 0 {
- return errors.New(fmt.Sprintf("start producer error, error code is: %d", code))
+ return fmt.Errorf("start producer error, error code is: %d", code)
}
return nil
}
@@ -247,6 +244,14 @@
}
}
-func (p *defaultProducer) SendMessageAsync(msg *Message) {
- // TODO
+func (p *defaultProducer) SendMessageOneway(msg *Message) {
+ cmsg := goMsgToC(msg)
+ defer C.DestroyMessage(cmsg)
+
+ code := int(C.SendMessageOneway(p.cproduer, cmsg))
+ if code != 0 {
+ log.Warnf("send message with oneway error, error code is: %d", code)
+ } else {
+ log.Debugf("Send Message: %s with oneway success.", msg.String())
+ }
}
diff --git a/core/push_consumer.go b/core/push_consumer.go
index 5f63b16..c39dc09 100644
--- a/core/push_consumer.go
+++ b/core/push_consumer.go
@@ -33,13 +33,11 @@
import (
"fmt"
"github.com/pkg/errors"
- "github.com/prometheus/common/log"
+ log "github.com/sirupsen/logrus"
"sync"
"unsafe"
)
-type MessageModel C.CMessageModel
-
type ConsumeStatus int
const (
@@ -100,7 +98,7 @@
code = int(C.SetPushConsumerNameServerAddress(cconsumer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf(fmt.Sprintf("PushConsumer Set NameServerAddress error, code is: %d", code)))
+ return nil, fmt.Errorf("PushConsumer Set NameServerAddress error, code is: %d", code)
}
}
@@ -109,7 +107,7 @@
code = int(C.SetPushConsumerNameServerDomain(cconsumer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("PushConsumer Set NameServerDomain error, code is: %d", code))
+ return nil, fmt.Errorf("PushConsumer Set NameServerDomain error, code is: %d", code)
}
}
@@ -118,8 +116,8 @@
code = int(C.SetPushConsumerInstanceName(cconsumer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("PushConsumer Set InstanceName error, code is: %d, "+
- "please check cpp logs for details", code))
+ return nil, fmt.Errorf("PushConsumer Set InstanceName error, code is: %d, "+
+ "please check cpp logs for details", code)
}
}
@@ -132,53 +130,63 @@
C.free(unsafe.Pointer(sk))
C.free(unsafe.Pointer(ch))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("PushConsumer Set Credentials error, code is: %d", int(code)))
+ return nil, fmt.Errorf("PushConsumer Set Credentials error, code is: %d", int(code))
}
}
if config.LogC != nil {
cs = C.CString(config.LogC.Path)
- code = int(C.SetProducerLogPath(cconsumer, cs))
+ code = int(C.SetPushConsumerLogPath(cconsumer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set LogPath error, code is: %d", code))
+ return nil, fmt.Errorf("PushConsumer Set LogPath error, code is: %d", code)
}
- code = int(C.SetProducerLogFileNumAndSize(cconsumer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+ code = int(C.SetPushConsumerLogFileNumAndSize(cconsumer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set FileNumAndSize error, code is: %d", code))
+ return nil, fmt.Errorf("PushConsumer Set FileNumAndSize error, code is: %d", code)
}
- code = int(C.SetProducerLogLevel(cconsumer, C.CLogLevel(config.LogC.Level)))
+ code = int(C.SetPushConsumerLogLevel(cconsumer, C.CLogLevel(config.LogC.Level)))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set LogLevel error, code is: %d", code))
+ return nil, fmt.Errorf("PushConsumer Set LogLevel error, code is: %d", code)
}
}
if config.ThreadCount > 0 {
code = int(C.SetPushConsumerThreadCount(cconsumer, C.int(config.ThreadCount)))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("PushConsumer Set ThreadCount error, code is: %d", int(code)))
+ return nil, fmt.Errorf("PushConsumer Set ThreadCount error, code is: %d", int(code))
}
}
if config.MessageBatchMaxSize > 0 {
code = int(C.SetPushConsumerMessageBatchMaxSize(cconsumer, C.int(config.MessageBatchMaxSize)))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("PushConsumer Set MessageBatchMaxSize error, code is: %d", int(code)))
+ return nil, fmt.Errorf("PushConsumer Set MessageBatchMaxSize error, code is: %d", int(code))
}
}
- code = int(C.SetPushConsumerMessageModel(cconsumer, (C.CMessageModel)(config.Model)))
+ if config.Model != 0 {
+ var mode C.CMessageModel
+ switch config.Model {
+ case BroadCasting:
+ mode = C.BROADCASTING
+ case Clustering:
+ mode = C.CLUSTERING
+ }
+ code = int(C.SetPushConsumerMessageModel(cconsumer, mode))
- if code != 0 {
- return nil, errors.New(fmt.Sprintf("PushConsumer Set ConsumerMessageModel error, code is: %d", int(code)))
+ if code != 0 {
+ return nil, fmt.Errorf("PushConsumer Set ConsumerMessageModel error, code is: %d", int(code))
+ }
+
}
code = int(C.RegisterMessageCallback(cconsumer, (C.MessageCallBack)(unsafe.Pointer(C.callback_cgo))))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("PushConsumer RegisterMessageCallback error, code is: %d", int(code)))
+ return nil, fmt.Errorf("PushConsumer RegisterMessageCallback error, code is: %d", int(code))
}
consumer.cconsumer = cconsumer
@@ -189,7 +197,7 @@
func (c *defaultPushConsumer) Start() error {
code := C.StartPushConsumer(c.cconsumer)
if code != 0 {
- return errors.New(fmt.Sprintf("start PushConsumer error, code is: %d", int(code)))
+ return fmt.Errorf("start PushConsumer error, code is: %d", int(code))
}
return nil
}
@@ -215,7 +223,7 @@
}
code := C.Subscribe(c.cconsumer, C.CString(topic), C.CString(expression))
if code != 0 {
- return errors.New(fmt.Sprintf("subscribe topic: %s failed, error code is: %d", topic, int(code)))
+ return fmt.Errorf("subscribe topic: %s failed, error code is: %d", topic, int(code))
}
c.funcsMap.Store(topic, consumeFunc)
log.Infof("subscribe topic[%s] with expression[%s] successfully.", topic, expression)
diff --git a/core/queue_selector.go b/core/queue_selector.go
index 311c378..7bf1927 100644
--- a/core/queue_selector.go
+++ b/core/queue_selector.go
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package rocketmq
import "C"
diff --git a/core/queue_selector_test.go b/core/queue_selector_test.go
index e9a68ee..74fff80 100644
--- a/core/queue_selector_test.go
+++ b/core/queue_selector_test.go
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package rocketmq
import (
diff --git a/core/utils.go b/core/utils.go
new file mode 100644
index 0000000..e9f83f1
--- /dev/null
+++ b/core/utils.go
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rocketmq
+
+import "fmt"
+
+func strJoin(str, key string, value interface{}) string {
+ if key == "" || value == "" {
+ return str
+ }
+
+ return str + key + ": " + fmt.Sprint(value) + ", "
+}
diff --git a/examples/orderproducer/producer.go b/examples/orderproducer/producer.go
index 5559d87..f3d70c7 100644
--- a/examples/orderproducer/producer.go
+++ b/examples/orderproducer/producer.go
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package main
import (
diff --git a/examples/producer.go b/examples/producer.go
index 56ecee2..33eab07 100644
--- a/examples/producer.go
+++ b/examples/producer.go
@@ -20,13 +20,12 @@
import (
"fmt"
"github.com/apache/rocketmq-client-go/core"
- "time"
)
func main() {
cfg := &rocketmq.ProducerConfig{}
cfg.GroupID = "testGroup"
- cfg.NameServer = "47.101.55.250:9876"
+ cfg.NameServer = "localhost:9876"
producer, err := rocketmq.NewProducer(cfg)
if err != nil {
fmt.Println("create Producer failed, error:", err)
@@ -39,8 +38,8 @@
fmt.Printf("Producer: %s started... \n", producer)
for i := 0; i < 100; i++ {
msg := fmt.Sprintf("Hello RocketMQ-%d", i)
- result := producer.SendMessageSync(&rocketmq.Message{Topic: "wwf1", Body: msg})
+ result := producer.SendMessageSync(&rocketmq.Message{Topic: "test", Body: msg})
fmt.Println(fmt.Sprintf("send message: %s result: %s", msg, result))
}
- time.Sleep(10 * time.Second)
+ fmt.Println("shutdown producer.")
}