topo ok
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java
index cee4637..2187072 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java
@@ -66,7 +66,7 @@
         if (!this.parent.shuffleNode()) {
             node = new ProcessorNode<>(name, parent.getName(), new AddTagSupplier<>());
         } else if (windowInfo.getJoinStream() != null) {
-            node = new ShuffleProcessorNode<>(name, parent.getName(), new AddTagSupplier<>("", windowInfo::getJoinStream));
+            node = new ShuffleProcessorNode<>(name, parent.getName(), new AddTagSupplier<>("addTagToStream", windowInfo::getJoinStream));
         } else {
             node = new ShuffleProcessorNode<>(name, parent.getName(), new AddTagSupplier<>());
         }
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/JoinedStream.java b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/JoinedStream.java
index 5190cb9..8269ef4 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/JoinedStream.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/JoinedStream.java
@@ -59,8 +59,10 @@
         }
 
         public <T> WindowStream<K, T> window(WindowInfo windowInfo) {
-           List<String> temp = new ArrayList<>();
+            List<String> temp = new ArrayList<>();
+            GraphNode commChild = new ProcessorNode<>("comm", temp, new AddTagSupplier<>());
 
+            Pipeline leftStreamPipeline = JoinedStream.this.leftStream.getPipeline();
             {
                 GroupedStream<K, V1> leftGroupedStream = JoinedStream.this.leftStream.keyBy(leftKeySelectAction);
 
@@ -71,8 +73,9 @@
 
                 leftGroupedStream.window(leftWindowInfo);
 
-                String leftParentName = getParentGraphNodeName(JoinedStream.this.leftStream.getPipeline());
-                temp.add(leftParentName);
+                GraphNode lastNode = leftStreamPipeline.getLastNode();
+                temp.add(lastNode.getName());
+                commChild.addParent(lastNode);
             }
 
             {
@@ -86,46 +89,28 @@
 
                 rightGroupedStream.window(rightWindowInfo);
 
-                String rightParentName = getParentGraphNodeName(JoinedStream.this.leftStream.getPipeline());
-                temp.add(rightParentName);
+                Pipeline rightStreamPipeline = JoinedStream.this.rightStream.getPipeline();
+
+                GraphNode lastNode = rightStreamPipeline.getLastNode();
+                temp.add(lastNode.getName());
+                commChild.addParent(lastNode);
+                lastNode.addChild(commChild);
             }
 
-            Pipeline total = new Pipeline();
-
-            GraphNode node = new ProcessorNode<>("comm", temp, new AddTagSupplier<>());
-
-            return new WindowStreamImpl<>(total, node, windowInfo);
+            return new WindowStreamImpl<>(leftStreamPipeline, commChild, windowInfo);
         }
 
-        private <T> AggregateAction<K, T, T> createCommonAgg() {
-            return (key, value, accumulator) -> value;
-        }
-
-        private <T> ValueJoinAction<V1, V2, T> createThreeWayPipeAction() {
-            ValueJoinAction<V1, V2, T> action = new ValueJoinAction<V1, V2, T>() {
-                @Override
-                public T apply(V1 value1, V2 value2) {
-                    return null;
-                }
-            };
-
-            return action;
-        }
-
-        private String getParentGraphNodeName(Pipeline pipeline) {
-            GraphNode leftLastNode = pipeline.getLastNode();
-            List<GraphNode> allParent = leftLastNode.getAllParent();
-
-            return allParent.get(0).getName();
-        }
 
         private WindowInfo copy(WindowInfo windowInfo) {
             WindowInfo result = new WindowInfo();
 
             WindowInfo.JoinStream joinStream = windowInfo.getJoinStream();
-            WindowInfo.JoinStream stream = new WindowInfo.JoinStream(joinStream.getJoinType(), joinStream.getStreamType());
 
-            result.setJoinStream(stream);
+            if (joinStream != null) {
+                WindowInfo.JoinStream stream = new WindowInfo.JoinStream(joinStream.getJoinType(), joinStream.getStreamType());
+                result.setJoinStream(stream);
+            }
+
             result.setSessionTimeout(windowInfo.getSessionTimeout());
             result.setWindowType(windowInfo.getWindowType());
             result.setWindowSize(windowInfo.getWindowSize());
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/Pipeline.java b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/Pipeline.java
index 8ce98a8..95aa985 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/Pipeline.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/Pipeline.java
@@ -73,6 +73,16 @@
         return this.root;
     }
 
+    public GraphNode getByName(String name) {
+        for (GraphNode virtualNode : virtualNodes) {
+            if (virtualNode.getName().equals(name)) {
+                return virtualNode;
+            }
+        }
+
+        return null;
+    }
+
     public GraphNode getLastNode() {
         return this.virtualNodes.get(virtualNodes.size() - 1);
     }
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/topology/TopologyBuilder.java b/core/src/main/java/org/apache/rocketmq/streams/core/topology/TopologyBuilder.java
index c749ef3..92ecd28 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/topology/TopologyBuilder.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/topology/TopologyBuilder.java
@@ -60,7 +60,9 @@
         realNodeFactory.put(name, processorFactory);
 
         RealProcessorFactory<T> parentFactory = (RealProcessorFactory<T>) realNodeFactory.get(parentName);
-        parentFactory.addChild(processorFactory);
+        if (parentFactory != null) {//join时两个parent,但是当前流中只有一个parent
+            parentFactory.addChild(processorFactory);
+        }
 
         grouping(name, parentName);
     }
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/topology/virtual/ProcessorNode.java b/core/src/main/java/org/apache/rocketmq/streams/core/topology/virtual/ProcessorNode.java
index f7200cb..3cf1774 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/topology/virtual/ProcessorNode.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/topology/virtual/ProcessorNode.java
@@ -25,31 +25,34 @@
 
 public class ProcessorNode<T> extends AbstractGraphNode {
     protected final Supplier<Processor<T>> supplier;
-    protected List<String> parentNames = new ArrayList<>();
+    protected final List<String> parentNames;
     protected boolean shuffle = false;
 
 
     public ProcessorNode(String name, String parentName, Supplier<Processor<T>> supplier) {
         super(name);
         this.supplier = supplier;
+        this.parentNames = new ArrayList<>();
         this.parentNames.add(parentName);
     }
+
     public ProcessorNode(String name, List<String> parentNames, Supplier<Processor<T>> supplier) {
         super(name);
         this.supplier = supplier;
-        this.parentNames.addAll(parentNames);
+        this.parentNames = parentNames;
     }
 
     public ProcessorNode(String name, List<String> parentNames, boolean shuffle, Supplier<Processor<T>> supplier) {
         super(name);
         this.supplier = supplier;
-        this.parentNames.addAll(parentNames);
+        this.parentNames = parentNames;
         this.shuffle = shuffle;
     }
 
     public ProcessorNode(String name, String parentName, boolean shuffle, Supplier<Processor<T>> supplier) {
         super(name);
         this.supplier = supplier;
+        this.parentNames = new ArrayList<>();
         this.parentNames.add(parentName);
         this.shuffle = shuffle;
     }