to#550 fix async send message example (#551)
diff --git a/docs/02-producer/02message1.md b/docs/02-producer/02message1.md
index 2882d03..a8ec08f 100644
--- a/docs/02-producer/02message1.md
+++ b/docs/02-producer/02message1.md
@@ -114,26 +114,37 @@
// 启动producer
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
- for (int i = 0; i < 100; i++) {
- final int index = i;
- // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
- Message msg = new Message("TopicTest",
- "TagA",
- "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
- // 异步发送消息, 发送结果通过callback返回给客户端
- producer.send(msg, new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- System.out.printf("%-10d OK %s %n", index,
- sendResult.getMsgId());
+ int messageCount = 100;
+ final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
+ for (int i = 0; i < messageCount; i++) {
+ try {
+ final int index = i;
+ // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
+ Message msg = new Message("TopicTest",
+ "TagA",
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+ // 异步发送消息, 发送结果通过callback返回给客户端
+ producer.send(msg, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ System.out.printf("%-10d OK %s %n", index,
+ sendResult.getMsgId());
+ countDownLatch.countDown();
+ }
+ @Override
+ public void onException(Throwable e) {
+ System.out.printf("%-10d Exception %s %n", index, e);
+ e.printStackTrace();
+ countDownLatch.countDown();
+ }
+ });
+ } catch (Exception e) {
+ e.printStackTrace();
+ countDownLatch.countDown();
}
- @Override
- public void onException(Throwable e) {
- System.out.printf("%-10d Exception %s %n", index, e);
- e.printStackTrace();
- }
- });
}
+ //异步发送,如果要求可靠传输,必须要等回调接口返回明确结果后才能结束逻辑,否则立即关闭Producer可能导致部分消息尚未传输成功
+ countDownLatch.await(5, TimeUnit.SECONDS);
// 一旦producer不再使用,关闭producer
producer.shutdown();
}
diff --git a/i18n/en/docusaurus-plugin-content-docs/current/02-producer/02message1.md b/i18n/en/docusaurus-plugin-content-docs/current/02-producer/02message1.md
index eddb627..e09baa5 100644
--- a/i18n/en/docusaurus-plugin-content-docs/current/02-producer/02message1.md
+++ b/i18n/en/docusaurus-plugin-content-docs/current/02-producer/02message1.md
@@ -109,26 +109,37 @@
// Start Producer
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
- for (int i = 0; i < 100; i++) {
- final int index = i;
- // Create a message and set the topic, tag, body and so on. The tag can be understood as a label to categorize the message, and RocketMQ can filter the tag on the consumer side.
- Message msg = new Message("TopicTest",
- "TagA",
- "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
- // Send a message asynchronously, the result is returned to the client by callback
- producer.send(msg, new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- System.out.printf("%-10d OK %s %n", index,
- sendResult.getMsgId());
+ int messageCount = 100;
+ final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
+ for (int i = 0; i < messageCount; i++) {
+ try {
+ final int index = i;
+ // Create a message and set the topic, tag, body and so on. The tag can be understood as a label to categorize the message, and RocketMQ can filter the tag on the consumer side.
+ Message msg = new Message("TopicTest",
+ "TagA",
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+ // Send a message asynchronously, the result is returned to the client by callback
+ producer.send(msg, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ System.out.printf("%-10d OK %s %n", index,
+ sendResult.getMsgId());
+ countDownLatch.countDown();
+ }
+ @Override
+ public void onException(Throwable e) {
+ System.out.printf("%-10d Exception %s %n", index, e);
+ e.printStackTrace();
+ countDownLatch.countDown();
+ }
+ });
+ } catch (Exception e) {
+ e.printStackTrace();
+ countDownLatch.countDown();
}
- @Override
- public void onException(Throwable e) {
- System.out.printf("%-10d Exception %s %n", index, e);
- e.printStackTrace();
- }
- });
}
+ //If reliable transmission is required for asynchronous sending, the logic must not be terminated until a clear result is returned from the callback interface. Otherwise, closing the Producer immediately may result in some messages not being successfully transmitted.
+ countDownLatch.await(5, TimeUnit.SECONDS);
// Close the producer once it is no longer in use
producer.shutdown();
}