[ISSUE #199]Support rsqldb (#200)

* feat(nest join) support nest join

* maintain(example) remove MqttSourceExample
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
index dcb7abb..7586d3e 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
@@ -26,6 +26,7 @@
 import org.apache.rocketmq.streams.common.channel.sinkcache.DataSourceAutoFlushTask;
 import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
 import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
+import org.apache.rocketmq.streams.common.context.Message;
 import org.apache.rocketmq.streams.common.schedule.ScheduleManager;
 import org.apache.rocketmq.streams.common.schedule.ScheduleTask;
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
index 0b10543..70d9310 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
@@ -99,6 +99,8 @@
 
     protected String msgRouteFromLable;//消息从哪里来的标签,标记上游节点的标记,主要是通过build table name来标记
 
+    private String originTable;
+
     protected String logFingerprintValue;//日志指纹的值
 
     public MessageHeader copy() {
@@ -359,4 +361,12 @@
     public void setPipelineName(String pipelineName) {
         this.pipelineName = pipelineName;
     }
+
+    public String getOriginTable() {
+        return originTable;
+    }
+
+    public void setOriginTable(String originTable) {
+        this.originTable = originTable;
+    }
 }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
index 9641604..8774369 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
@@ -290,6 +290,7 @@
             //boolean needFlush = needFlush(msg);
             if (StringUtil.isNotEmpty(oriMsgPrewSourceName)) {
                 msg.getHeader().setMsgRouteFromLable(oriMsgPrewSourceName);
+                msg.getHeader().setOriginTable(oriMsgPrewSourceName);
             }
             boolean isContinue = executeStage(stage, msg, copyContext);
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java
index 9cb9df3..cf4b9e9 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java
@@ -44,8 +44,14 @@
         @Override
         protected IMessage doProcess(IMessage message, AbstractContext context) {
             String lable = message.getHeader().getMsgRouteFromLable();
+            String originTable = message.getHeader().getOriginTable();
+
             String joinFlag = null;
             if (lable != null) {
+                if ((lable.equals("left") || lable.equals("right")) && originTable != null) {
+                    lable = originTable;
+                }
+
                 if (lable.equals(rightDependentTableName)) {
                     joinFlag = MessageHeader.JOIN_RIGHT;
                 } else {
@@ -61,9 +67,7 @@
             } else {
                 rightPipeline.doMessage(message, context);
             }
-            //if(!MessageGloableTrace.existFinshBranch(message)){
-            //    context.setBreak(true);
-            //}
+
             context.breakExecute();
             return message;
         }
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/MqttSourceExample.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/MqttSourceExample.java
deleted file mode 100644
index 33b8b0a..0000000
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/MqttSourceExample.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.examples.source;
-
-import com.alibaba.fastjson.JSONObject;
-import org.apache.rocketmq.streams.client.StreamBuilder;
-import org.apache.rocketmq.streams.client.source.DataStreamSource;
-import org.apache.rocketmq.streams.client.strategy.ShuffleStrategy;
-import org.apache.rocketmq.streams.client.transform.window.Time;
-import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
-
-public class MqttSourceExample {
-
-    public static void main(String[] args) {
-        DataStreamSource dataStream = StreamBuilder.dataStream("test_namespace", "graph_pipeline");
-        dataStream.fromMqtt("xxxxx", "xxxx", "xxxxxx", "", "")
-            .flatMap(message -> ((JSONObject) message).getJSONArray("Data"))
-            .window(TumblingWindow.of(Time.minutes(1)))
-            .groupBy("AttributeCode")
-            .setLocalStorageOnly(true)
-            .avg("Value", "avg_value")
-            .toDataStream()
-            .toPrint()
-            .with(ShuffleStrategy.shuffleWithMemory())
-            .start();
-    }
-
-}
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
index b16002a..eea5986 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
@@ -102,7 +102,6 @@
 
             if (WindowJoinType.left.name().equalsIgnoreCase(routeLabel)) {
                 storage.putWindowBaseValue(queueId, windowInstanceId, WindowType.JOIN_WINDOW, WindowJoinType.left, temp);
-
             } else if (WindowJoinType.right.name().equalsIgnoreCase(routeLabel)) {
                 storage.putWindowBaseValue(queueId, windowInstanceId, WindowType.JOIN_WINDOW, WindowJoinType.right, temp);
             } else {
@@ -490,6 +489,22 @@
         if (needFlush) {
             nextMessage.getHeader().setNeedFlush(true);
         }
+
+        String routeLabel = nextMessage.getHeader().getMsgRouteFromLable();
+        if (routeLabel == null) {
+            //嵌套join,内部join后没有routeLabel,需要设置结果的routeLabel
+            String configureName = this.getConfigureName();
+            String[] tempList = configureName.split("_");
+            for (int i = tempList.length -1; i > 0; i--) {
+                if ("left".equalsIgnoreCase(tempList[i]) || "right".equalsIgnoreCase(tempList[i])) {
+                    routeLabel = tempList[i];
+                    System.out.println("nested join, routeLabel=" + routeLabel);
+                    break;
+                }
+            }
+            nextMessage.getHeader().setMsgRouteFromLable(routeLabel);
+        }
+
         AbstractContext context = new Context(nextMessage);
         boolean isWindowTest = ComponentCreator.getPropertyBooleanValue("window.fire.isTest");
         if (isWindowTest) {
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index bad436f..7b8175c 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -229,6 +229,11 @@
         for (String splitId : splitIds) {
              this.loadResult.put(splitId, future);
         }
+
+        if (message.getHeader().isSystemMessage() && window.getFireReceiver() == null) {
+            return;
+        }
+
         window.getFireReceiver().doMessage(message, context);
     }