[ISSUE #199]Support rsqldb (#200)
* feat(nest join) support nest join
* maintain(example) remove MqttSourceExample
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
index dcb7abb..7586d3e 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
@@ -26,6 +26,7 @@
import org.apache.rocketmq.streams.common.channel.sinkcache.DataSourceAutoFlushTask;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
+import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.schedule.ScheduleManager;
import org.apache.rocketmq.streams.common.schedule.ScheduleTask;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
index 0b10543..70d9310 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
@@ -99,6 +99,8 @@
protected String msgRouteFromLable;//消息从哪里来的标签,标记上游节点的标记,主要是通过build table name来标记
+ private String originTable;
+
protected String logFingerprintValue;//日志指纹的值
public MessageHeader copy() {
@@ -359,4 +361,12 @@
public void setPipelineName(String pipelineName) {
this.pipelineName = pipelineName;
}
+
+ public String getOriginTable() {
+ return originTable;
+ }
+
+ public void setOriginTable(String originTable) {
+ this.originTable = originTable;
+ }
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
index 9641604..8774369 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
@@ -290,6 +290,7 @@
//boolean needFlush = needFlush(msg);
if (StringUtil.isNotEmpty(oriMsgPrewSourceName)) {
msg.getHeader().setMsgRouteFromLable(oriMsgPrewSourceName);
+ msg.getHeader().setOriginTable(oriMsgPrewSourceName);
}
boolean isContinue = executeStage(stage, msg, copyContext);
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java
index 9cb9df3..cf4b9e9 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java
@@ -44,8 +44,14 @@
@Override
protected IMessage doProcess(IMessage message, AbstractContext context) {
String lable = message.getHeader().getMsgRouteFromLable();
+ String originTable = message.getHeader().getOriginTable();
+
String joinFlag = null;
if (lable != null) {
+ if ((lable.equals("left") || lable.equals("right")) && originTable != null) {
+ lable = originTable;
+ }
+
if (lable.equals(rightDependentTableName)) {
joinFlag = MessageHeader.JOIN_RIGHT;
} else {
@@ -61,9 +67,7 @@
} else {
rightPipeline.doMessage(message, context);
}
- //if(!MessageGloableTrace.existFinshBranch(message)){
- // context.setBreak(true);
- //}
+
context.breakExecute();
return message;
}
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/MqttSourceExample.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/MqttSourceExample.java
deleted file mode 100644
index 33b8b0a..0000000
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/MqttSourceExample.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.examples.source;
-
-import com.alibaba.fastjson.JSONObject;
-import org.apache.rocketmq.streams.client.StreamBuilder;
-import org.apache.rocketmq.streams.client.source.DataStreamSource;
-import org.apache.rocketmq.streams.client.strategy.ShuffleStrategy;
-import org.apache.rocketmq.streams.client.transform.window.Time;
-import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
-
-public class MqttSourceExample {
-
- public static void main(String[] args) {
- DataStreamSource dataStream = StreamBuilder.dataStream("test_namespace", "graph_pipeline");
- dataStream.fromMqtt("xxxxx", "xxxx", "xxxxxx", "", "")
- .flatMap(message -> ((JSONObject) message).getJSONArray("Data"))
- .window(TumblingWindow.of(Time.minutes(1)))
- .groupBy("AttributeCode")
- .setLocalStorageOnly(true)
- .avg("Value", "avg_value")
- .toDataStream()
- .toPrint()
- .with(ShuffleStrategy.shuffleWithMemory())
- .start();
- }
-
-}
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
index b16002a..eea5986 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
@@ -102,7 +102,6 @@
if (WindowJoinType.left.name().equalsIgnoreCase(routeLabel)) {
storage.putWindowBaseValue(queueId, windowInstanceId, WindowType.JOIN_WINDOW, WindowJoinType.left, temp);
-
} else if (WindowJoinType.right.name().equalsIgnoreCase(routeLabel)) {
storage.putWindowBaseValue(queueId, windowInstanceId, WindowType.JOIN_WINDOW, WindowJoinType.right, temp);
} else {
@@ -490,6 +489,22 @@
if (needFlush) {
nextMessage.getHeader().setNeedFlush(true);
}
+
+ String routeLabel = nextMessage.getHeader().getMsgRouteFromLable();
+ if (routeLabel == null) {
+ //嵌套join,内部join后没有routeLabel,需要设置结果的routeLabel
+ String configureName = this.getConfigureName();
+ String[] tempList = configureName.split("_");
+ for (int i = tempList.length -1; i > 0; i--) {
+ if ("left".equalsIgnoreCase(tempList[i]) || "right".equalsIgnoreCase(tempList[i])) {
+ routeLabel = tempList[i];
+ System.out.println("nested join, routeLabel=" + routeLabel);
+ break;
+ }
+ }
+ nextMessage.getHeader().setMsgRouteFromLable(routeLabel);
+ }
+
AbstractContext context = new Context(nextMessage);
boolean isWindowTest = ComponentCreator.getPropertyBooleanValue("window.fire.isTest");
if (isWindowTest) {
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index bad436f..7b8175c 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -229,6 +229,11 @@
for (String splitId : splitIds) {
this.loadResult.put(splitId, future);
}
+
+ if (message.getHeader().isSystemMessage() && window.getFireReceiver() == null) {
+ return;
+ }
+
window.getFireReceiver().doMessage(message, context);
}