Merge pull request #198 from ni-ze/supportRsqldb
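fix(left join): emit the left row once and merge matching right rows into it; right join now emits rows only on a shuffle-key match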
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
index b23bc93..b16002a 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
@@ -200,15 +200,18 @@
int index = 1;
if (WindowJoinType.left.name().equalsIgnoreCase(routeLabel) && rows.size() > 0) {
//left join(左联接):返回左表中的所有记录以及和右表中的联接字段相等的记录。
+ JSONObject object = messageBody.clone();
+ result.add(object);
+
for (Map<String, Object> raw : rows) {
- JSONObject object = messageBody.clone();
+
String shuffleTarget = (String) raw.get(SHUFFLE_KEY);
if (shuffleTarget != null && shuffleTarget.equals(shuffleValue)) {
+ //join后覆盖object中的字段
object.fluentPutAll(addAsName(raw, rightAsName));
+ object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
+ index++;
}
- object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
- index++;
- result.add(object);
}
} else if (WindowJoinType.left.name().equalsIgnoreCase(routeLabel) && rows.size() <= 0) {
JSONObject object = messageBody.clone();
@@ -217,17 +220,17 @@
} else if (WindowJoinType.right.name().equalsIgnoreCase(routeLabel) && rows.size() > 0) {
//right join(右联接):返回右表中的所有记录以及和左表中的联接字段相等的记录。
messageBody = addAsName(messageBody, rightAsName);
+ JSONObject object = messageBody.clone();
for (Map<String, Object> raw : rows) {
- JSONObject object = messageBody.clone();
String shuffleTarget = (String) raw.get(SHUFFLE_KEY);
if (shuffleTarget != null && shuffleTarget.equals(shuffleValue)) {
object.fluentPutAll(raw);
+ object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
+ //只有匹配上了才会输出
+ result.add(object);
+ index++;
}
-
- object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
- index++;
- result.add(object);
}
}
return result;