[ISSUE #506] support send message with arbitrarily delay time (#515)

* support sync send message with arbitrarily delay time.

* support sync send message with arbitrarily delay time.

* fix check style error.
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index f1f7886..8b444e3 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -31,6 +31,7 @@
 import org.apache.rocketmq.common.message.MessageBatch;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.support.DelayMode;
 import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
 import org.apache.rocketmq.spring.support.RocketMQUtil;
 import org.slf4j.Logger;
@@ -536,6 +537,77 @@
     }
 
     /**
+     * Same to {@link #syncSend(String, Message)} with send delay time specified in addition.
+     *
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message}
+     * @param delayTime delay time in seconds for message
+     * @return {@link SendResult}
+     */
+    public SendResult syncSendDelayTimeSeconds(String destination, Message<?> message, long delayTime) {
+        return syncSend(destination, message, producer.getSendMsgTimeout(), delayTime, DelayMode.DELAY_SECONDS);
+    }
+
+    /**
+     * Same to {@link #syncSend(String, Object)} with send delayTime specified in addition.
+     *
+     * @param destination formats: `topicName:tags`
+     * @param payload the Object to use as payload
+     * @param delayTime delay time in seconds for message
+     * @return {@link SendResult}
+     */
+    public SendResult syncSendDelayTimeSeconds(String destination, Object payload, long delayTime) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        return syncSend(destination, message, producer.getSendMsgTimeout(), delayTime, DelayMode.DELAY_SECONDS);
+    }
+
+    /**
+     * Same to {@link #syncSend(String, Message)} with send timeout and delay time specified in addition.
+     *
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message}
+     * @param timeout send timeout with millis
+     * @param delayTime delay time for message
+     * @return {@link SendResult}
+     */
+    public SendResult syncSend(String destination, Message<?> message, long timeout, long delayTime, DelayMode mode) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("syncSend failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+        try {
+            long now = System.currentTimeMillis();
+            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
+            if (delayTime > 0 && Objects.nonNull(mode)) {
+                switch (mode) {
+                    case DELAY_SECONDS:
+                        rocketMsg.setDelayTimeSec(delayTime);
+                        break;
+                    case DELAY_MILLISECONDS:
+                        rocketMsg.setDelayTimeMs(delayTime);
+                        break;
+                    case DELIVER_TIME_MILLISECONDS:
+                        rocketMsg.setDeliverTimeMs(delayTime);
+                        break;
+                    default:
+                        log.warn("delay mode: {} not support", mode);
+                }
+            }
+            SendResult sendResult = producer.send(rocketMsg, timeout);
+            long costTime = System.currentTimeMillis() - now;
+            if (log.isDebugEnabled()) {
+                log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+            }
+            return sendResult;
+        } catch (Exception e) {
+            log.error("syncSend failed. destination:{}, message:{}, detail exception info: ", destination, message, e);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+
+
+    /**
      * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
      *
      * @param destination formats: `topicName:tags`
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DelayMode.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DelayMode.java
new file mode 100644
index 0000000..ff70c89
--- /dev/null
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DelayMode.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.support;
+
+public enum DelayMode {
+    DELAY_SECONDS,
+    DELAY_MILLISECONDS,
+    DELIVER_TIME_MILLISECONDS,
+}
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
index 49ce039..256345a 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
@@ -94,6 +94,12 @@
         } catch (MessagingException e) {
             assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed");
         }
+
+        try {
+            rocketMQTemplate.syncSendDelayTimeSeconds(topic, "payload", 10L);
+        } catch (MessagingException e) {
+            assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed");
+        }
     }
     @Test
     public void testAsyncBatchSendMessage() {