Merge pull request #198 from ni-ze/supportRsqldb

fix(left join) fix left join
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
index b23bc93..b16002a 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
@@ -200,15 +200,18 @@
         int index = 1;
         if (WindowJoinType.left.name().equalsIgnoreCase(routeLabel) && rows.size() > 0) {
             //left join(左联接):返回左表中的所有记录以及和右表中的联接字段相等的记录。
+            JSONObject object = messageBody.clone();
+            result.add(object);
+
             for (Map<String, Object> raw : rows) {
-                JSONObject object = messageBody.clone();
+
                 String shuffleTarget = (String) raw.get(SHUFFLE_KEY);
                 if (shuffleTarget != null && shuffleTarget.equals(shuffleValue)) {
+                    //join后覆盖object中的字段
                     object.fluentPutAll(addAsName(raw, rightAsName));
+                    object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
+                    index++;
                 }
-                object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
-                index++;
-                result.add(object);
             }
         } else if (WindowJoinType.left.name().equalsIgnoreCase(routeLabel) && rows.size() <= 0) {
             JSONObject object = messageBody.clone();
@@ -217,17 +220,17 @@
         } else if (WindowJoinType.right.name().equalsIgnoreCase(routeLabel) && rows.size() > 0) {
             //right join(右联接):返回右表中的所有记录以及和左表中的联接字段相等的记录。
             messageBody = addAsName(messageBody, rightAsName);
+            JSONObject object = messageBody.clone();
             for (Map<String, Object> raw : rows) {
-                JSONObject object = messageBody.clone();
 
                 String shuffleTarget = (String) raw.get(SHUFFLE_KEY);
                 if (shuffleTarget != null && shuffleTarget.equals(shuffleValue)) {
                     object.fluentPutAll(raw);
+                    object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
+                    //只有匹配上了才会输出
+                    result.add(object);
+                    index++;
                 }
-
-                object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
-                index++;
-                result.add(object);
             }
         }
         return result;