blob: 46ac5abe400e3faa3b08e2c18ef51f0c98d0c947 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.samples.springboot;
import com.alibaba.fastjson.TypeReference;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Resource;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.samples.springboot.domain.OrderPaidEvent;
import org.apache.rocketmq.samples.springboot.domain.ProductWithPayload;
import org.apache.rocketmq.samples.springboot.domain.User;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalRequestCallback;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;
/**
* Producer, using RocketMQTemplate sends a variety of messages
*/
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Value("${demo.rocketmq.transTopic}")
private String springTransTopic;
@Value("${demo.rocketmq.topic}")
private String springTopic;
@Value("${demo.rocketmq.topic.user}")
private String userTopic;
@Value("${demo.rocketmq.orderTopic}")
private String orderPaidTopic;
@Value("${demo.rocketmq.msgExtTopic}")
private String msgExtTopic;
@Value("${demo.rocketmq.stringRequestTopic}")
private String stringRequestTopic;
@Value("${demo.rocketmq.bytesRequestTopic}")
private String bytesRequestTopic;
@Value("${demo.rocketmq.objectRequestTopic}")
private String objectRequestTopic;
@Value("${demo.rocketmq.genericRequestTopic}")
private String genericRequestTopic;
@Resource(name = "extRocketMQTemplate")
private RocketMQTemplate extRocketMQTemplate;
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
// Send string
SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello, World!");
System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
sendResult = rocketMQTemplate.syncSend(userTopic, new User().setUserAge((byte) 18).setUserName("Kitty"));
System.out.printf("syncSend1 to topic %s sendResult=%s %n", userTopic, sendResult);
sendResult = rocketMQTemplate.syncSend(userTopic, MessageBuilder.withPayload(
new User().setUserAge((byte) 21).setUserName("Lester")).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build());
System.out.printf("syncSend1 to topic %s sendResult=%s %n", userTopic, sendResult);
// Use the extRocketMQTemplate
sendResult = extRocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload("Hello, World!2222".getBytes()).build());
System.out.printf("extRocketMQTemplate.syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
// Send string with spring Message
sendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
System.out.printf("syncSend2 to topic %s sendResult=%s %n", springTopic, sendResult);
// Send user-defined object
rocketMQTemplate.asyncSend(orderPaidTopic, new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
@Override
public void onSuccess(SendResult var1) {
System.out.printf("async onSucess SendResult=%s %n", var1);
}
@Override
public void onException(Throwable var1) {
System.out.printf("async onException Throwable=%s %n", var1);
}
});
// Send message with special tag
rocketMQTemplate.convertAndSend(msgExtTopic + ":tag0", "I'm from tag0"); // tag0 will not be consumer-selected
System.out.printf("syncSend topic %s tag %s %n", msgExtTopic, "tag0");
rocketMQTemplate.convertAndSend(msgExtTopic + ":tag1", "I'm from tag1");
System.out.printf("syncSend topic %s tag %s %n", msgExtTopic, "tag1");
// Send a batch of strings
testBatchMessages();
// Send transactional messages using rocketMQTemplate
testRocketMQTemplateTransaction();
// Send transactional messages using extRocketMQTemplate
testExtRocketMQTemplateTransaction();
// Send request in sync mode and receive a reply of String type.
String replyString = rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", String.class);
System.out.printf("send %s and receive %s %n", "request string", replyString);
// Send request in sync mode with timeout parameter and receive a reply of byte[] type.
byte[] replyBytes = rocketMQTemplate.sendAndReceive(bytesRequestTopic, MessageBuilder.withPayload("request byte[]").build(), byte[].class, 3000);
System.out.printf("send %s and receive %s %n", "request byte[]", new String(replyBytes));
// Send request in sync mode with hashKey parameter and receive a reply of User type.
User requestUser = new User().setUserAge((byte) 9).setUserName("requestUserName");
User replyUser = rocketMQTemplate.sendAndReceive(objectRequestTopic, requestUser, User.class, "order-id");
System.out.printf("send %s and receive %s %n", requestUser, replyUser);
// Send request in sync mode with timeout and delayLevel parameter parameter and receive a reply of generic type.
ProductWithPayload<String> replyGenericObject = rocketMQTemplate.sendAndReceive(genericRequestTopic, "request generic",
new TypeReference<ProductWithPayload<String>>() {
}.getType(), 30000, 2);
System.out.printf("send %s and receive %s %n", "request generic", replyGenericObject);
// Send request in async mode and receive a reply of String type.
rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", new RocketMQLocalRequestCallback<String>() {
@Override public void onSuccess(String message) {
System.out.printf("send %s and receive %s %n", "request string", message);
}
@Override public void onException(Throwable e) {
e.printStackTrace();
}
});
// Send request in async mode and receive a reply of User type.
rocketMQTemplate.sendAndReceive(objectRequestTopic, new User().setUserAge((byte) 9).setUserName("requestUserName"), new RocketMQLocalRequestCallback<User>() {
@Override public void onSuccess(User message) {
System.out.printf("send user object and receive %s %n", message.toString());
}
@Override public void onException(Throwable e) {
e.printStackTrace();
}
}, 5000);
}
private void testBatchMessages() {
List<Message> msgs = new ArrayList<Message>();
for (int i = 0; i < 10; i++) {
msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
}
SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);
System.out.printf("--- Batch messages send result :" + sr);
}
private void testRocketMQTemplateTransaction() throws MessagingException {
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg = MessageBuilder.withPayload("rocketMQTemplate transactional message " + i).
setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i).build();
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(
springTransTopic + ":" + tags[i % tags.length], msg, null);
System.out.printf("------rocketMQTemplate send Transactional msg body = %s , sendResult=%s %n",
msg.getPayload(), sendResult.getSendStatus());
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void testExtRocketMQTemplateTransaction() throws MessagingException {
for (int i = 0; i < 10; i++) {
try {
Message msg = MessageBuilder.withPayload("extRocketMQTemplate transactional message " + i).
setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i).build();
SendResult sendResult = extRocketMQTemplate.sendMessageInTransaction(
springTransTopic, msg, null);
System.out.printf("------ExtRocketMQTemplate send Transactional msg body = %s , sendResult=%s %n",
msg.getPayload(), sendResult.getSendStatus());
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
}
}
@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<String, Integer>();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n",
transId);
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(transId, status);
if (status == 0) {
// Return local transaction with success(commit), in this case,
// this message will not be checked in checkLocalTransaction()
System.out.printf(" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());
return RocketMQLocalTransactionState.COMMIT;
}
if (status == 1) {
// Return local transaction with failure(rollback) , in this case,
// this message will not be checked in checkLocalTransaction()
System.out.printf(" # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());
return RocketMQLocalTransactionState.ROLLBACK;
}
System.out.printf(" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n");
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
Integer status = localTrans.get(transId);
if (null != status) {
switch (status) {
case 0:
retState = RocketMQLocalTransactionState.UNKNOWN;
break;
case 1:
retState = RocketMQLocalTransactionState.COMMIT;
break;
case 2:
retState = RocketMQLocalTransactionState.ROLLBACK;
break;
}
}
System.out.printf("------ !!! checkLocalTransaction is executed once," +
" msgTransactionId=%s, TransactionState=%s status=%s %n",
transId, retState, status);
return retState;
}
}
@RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate")
class ExtTransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.printf("ExtTransactionListenerImpl executeLocalTransaction and return UNKNOWN. \n");
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
System.out.printf("ExtTransactionListenerImpl checkLocalTransaction and return COMMIT. \n");
return RocketMQLocalTransactionState.COMMIT;
}
}
}