topo ok
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java
index cee4637..2187072 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java
@@ -66,7 +66,7 @@
if (!this.parent.shuffleNode()) {
node = new ProcessorNode<>(name, parent.getName(), new AddTagSupplier<>());
} else if (windowInfo.getJoinStream() != null) {
- node = new ShuffleProcessorNode<>(name, parent.getName(), new AddTagSupplier<>("", windowInfo::getJoinStream));
+ node = new ShuffleProcessorNode<>(name, parent.getName(), new AddTagSupplier<>("addTagToStream", windowInfo::getJoinStream));
} else {
node = new ShuffleProcessorNode<>(name, parent.getName(), new AddTagSupplier<>());
}
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/JoinedStream.java b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/JoinedStream.java
index 5190cb9..8269ef4 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/JoinedStream.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/JoinedStream.java
@@ -59,8 +59,10 @@
}
public <T> WindowStream<K, T> window(WindowInfo windowInfo) {
- List<String> temp = new ArrayList<>();
+ List<String> temp = new ArrayList<>();
+ GraphNode commChild = new ProcessorNode<>("comm", temp, new AddTagSupplier<>());
+ Pipeline leftStreamPipeline = JoinedStream.this.leftStream.getPipeline();
{
GroupedStream<K, V1> leftGroupedStream = JoinedStream.this.leftStream.keyBy(leftKeySelectAction);
@@ -71,8 +73,9 @@
leftGroupedStream.window(leftWindowInfo);
- String leftParentName = getParentGraphNodeName(JoinedStream.this.leftStream.getPipeline());
- temp.add(leftParentName);
+ GraphNode lastNode = leftStreamPipeline.getLastNode();
+ temp.add(lastNode.getName());
+ commChild.addParent(lastNode);
}
{
@@ -86,46 +89,28 @@
rightGroupedStream.window(rightWindowInfo);
- String rightParentName = getParentGraphNodeName(JoinedStream.this.leftStream.getPipeline());
- temp.add(rightParentName);
+ Pipeline rightStreamPipeline = JoinedStream.this.rightStream.getPipeline();
+
+ GraphNode lastNode = rightStreamPipeline.getLastNode();
+ temp.add(lastNode.getName());
+ commChild.addParent(lastNode);
+ lastNode.addChild(commChild);
}
- Pipeline total = new Pipeline();
-
- GraphNode node = new ProcessorNode<>("comm", temp, new AddTagSupplier<>());
-
- return new WindowStreamImpl<>(total, node, windowInfo);
+ return new WindowStreamImpl<>(leftStreamPipeline, commChild, windowInfo);
}
- private <T> AggregateAction<K, T, T> createCommonAgg() {
- return (key, value, accumulator) -> value;
- }
-
- private <T> ValueJoinAction<V1, V2, T> createThreeWayPipeAction() {
- ValueJoinAction<V1, V2, T> action = new ValueJoinAction<V1, V2, T>() {
- @Override
- public T apply(V1 value1, V2 value2) {
- return null;
- }
- };
-
- return action;
- }
-
- private String getParentGraphNodeName(Pipeline pipeline) {
- GraphNode leftLastNode = pipeline.getLastNode();
- List<GraphNode> allParent = leftLastNode.getAllParent();
-
- return allParent.get(0).getName();
- }
private WindowInfo copy(WindowInfo windowInfo) {
WindowInfo result = new WindowInfo();
WindowInfo.JoinStream joinStream = windowInfo.getJoinStream();
- WindowInfo.JoinStream stream = new WindowInfo.JoinStream(joinStream.getJoinType(), joinStream.getStreamType());
- result.setJoinStream(stream);
+ if (joinStream != null) {
+ WindowInfo.JoinStream stream = new WindowInfo.JoinStream(joinStream.getJoinType(), joinStream.getStreamType());
+ result.setJoinStream(stream);
+ }
+
result.setSessionTimeout(windowInfo.getSessionTimeout());
result.setWindowType(windowInfo.getWindowType());
result.setWindowSize(windowInfo.getWindowSize());
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/Pipeline.java b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/Pipeline.java
index 8ce98a8..95aa985 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/Pipeline.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/Pipeline.java
@@ -73,6 +73,16 @@
return this.root;
}
+ public GraphNode getByName(String name) {
+ for (GraphNode virtualNode : virtualNodes) {
+ if (virtualNode.getName().equals(name)) {
+ return virtualNode;
+ }
+ }
+
+ return null;
+ }
+
public GraphNode getLastNode() {
return this.virtualNodes.get(virtualNodes.size() - 1);
}
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/topology/TopologyBuilder.java b/core/src/main/java/org/apache/rocketmq/streams/core/topology/TopologyBuilder.java
index c749ef3..92ecd28 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/topology/TopologyBuilder.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/topology/TopologyBuilder.java
@@ -60,7 +60,9 @@
realNodeFactory.put(name, processorFactory);
RealProcessorFactory<T> parentFactory = (RealProcessorFactory<T>) realNodeFactory.get(parentName);
- parentFactory.addChild(processorFactory);
+ if (parentFactory != null) {//join时两个parent,但是当前流中只有一个parent
+ parentFactory.addChild(processorFactory);
+ }
grouping(name, parentName);
}
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/topology/virtual/ProcessorNode.java b/core/src/main/java/org/apache/rocketmq/streams/core/topology/virtual/ProcessorNode.java
index f7200cb..3cf1774 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/topology/virtual/ProcessorNode.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/topology/virtual/ProcessorNode.java
@@ -25,31 +25,34 @@
public class ProcessorNode<T> extends AbstractGraphNode {
protected final Supplier<Processor<T>> supplier;
- protected List<String> parentNames = new ArrayList<>();
+ protected final List<String> parentNames;
protected boolean shuffle = false;
public ProcessorNode(String name, String parentName, Supplier<Processor<T>> supplier) {
super(name);
this.supplier = supplier;
+ this.parentNames = new ArrayList<>();
this.parentNames.add(parentName);
}
+
public ProcessorNode(String name, List<String> parentNames, Supplier<Processor<T>> supplier) {
super(name);
this.supplier = supplier;
- this.parentNames.addAll(parentNames);
+ this.parentNames = parentNames;
}
public ProcessorNode(String name, List<String> parentNames, boolean shuffle, Supplier<Processor<T>> supplier) {
super(name);
this.supplier = supplier;
- this.parentNames.addAll(parentNames);
+ this.parentNames = parentNames;
this.shuffle = shuffle;
}
public ProcessorNode(String name, String parentName, boolean shuffle, Supplier<Processor<T>> supplier) {
super(name);
this.supplier = supplier;
+ this.parentNames = new ArrayList<>();
this.parentNames.add(parentName);
this.shuffle = shuffle;
}