[WAYANG-#8] Change structure for PywyPlan

Signed-off-by: bertty <bertty@apache.org>
diff --git a/python/src/pywy/context.py b/python/src/pywy/context.py
deleted file mode 100644
index 1f2e883..0000000
--- a/python/src/pywy/context.py
+++ /dev/null
@@ -1,35 +0,0 @@
-from pywy.platforms.basic.plugin import Plugin
-from pywy.dataquanta import DataQuanta
-from pywy.wayangplan.source import TextFileSource
-
-class WayangContext:
-  """
-  This is the entry point for users to work with Wayang.
-  """
-  def __init__(self):
-    self.plugins = set()
-
-  """
-  add a :class:`Plugin` to the :class:`Context`
-  """
-  def register(self, *p: Plugin):
-    self.plugins.add(p)
-    return self
-
-  """
-  remove a :class:`Plugin` from the :class:`Context`
-  """
-  def unregister(self, p: Plugin):
-    self.plugins.remove(p)
-    return self
-
-  def textFile(self, file_path: str) -> DataQuanta[str]:
-    return DataQuanta(TextFileSource(file_path))
-
-
-  def __str__(self):
-    return "Plugins: {} \n".format(str(self.plugins))
-
-  def __repr__(self):
-    return self.__str__()
-
diff --git a/python/src/pywy/dataquanta.py b/python/src/pywy/dataquanta.py
index c4e7b15..5544b3e 100644
--- a/python/src/pywy/dataquanta.py
+++ b/python/src/pywy/dataquanta.py
@@ -1,5 +1,37 @@
 from pywy.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO)
 from pywy.wayangplan import *
+from pywy.wayangplan.wayang import PywyPlan
+from pywy.platforms.basic.plugin import Plugin
+
+class WayangContext:
+  """
+  This is the entry point for users to work with Wayang.
+  """
+  def __init__(self):
+    self.plugins = set()
+
+  """
+  add a :class:`Plugin` to the :class:`Context`
+  """
+  def register(self, *p: Plugin):
+    self.plugins.add(p)
+    return self
+
+  """
+  remove a :class:`Plugin` from the :class:`Context`
+  """
+  def unregister(self, p: Plugin):
+    self.plugins.remove(p)
+    return self
+
+  def textFile(self, file_path: str) -> 'DataQuanta[str]':
+    return DataQuanta(self, TextFileSource(file_path))
+
+  def __str__(self):
+    return "Plugins: {} \n".format(str(self.plugins))
+
+  def __repr__(self):
+    return self.__str__()
 
 class DataQuanta(GenericTco):
     """
@@ -7,22 +39,30 @@
     """
     previous : WyOperator = None
 
-    def __init__(self, operator: WyOperator):
+    def __init__(self, context:WayangContext,  operator: WyOperator):
         self.operator = operator
+        self.context = context
 
     def filter(self: "DataQuanta[T]", p: Predicate) -> "DataQuanta[T]" :
-        return DataQuanta(FilterOperator(p))
+        return DataQuanta(self.context, self.__connect(FilterOperator(p)))
 
     def map(self: "DataQuanta[I]", f: Function) -> "DataQuanta[O]" :
-        return DataQuanta(MapOperator(f))
+        return DataQuanta(self.context,self.__connect(MapOperator(f)))
 
     def flatmap(self: "DataQuanta[I]", f: FlatmapFunction) -> "DataQuanta[IterableO]" :
-        return DataQuanta(FlatmapOperator(f))
+        return DataQuanta(self.context,self.__connect(FlatmapOperator(f)))
 
     def storeTextFile(self: "DataQuanta[I]", path: str) :
-        last = DataQuanta(TextFileSink(path))
+        last = self.__connect(TextFileSink(path))
+        plan = PywyPlan(self.context.plugins, [last])
+        plan.print()
+
         # TODO add the logic to execute the plan
 
+    def __connect(self, op:WyOperator, port_op: int = 0) -> WyOperator:
+        self.operator.connect(0, op, port_op)
+        return op
+
     def getOperator(self):
         return self.operator
 
diff --git a/python/src/pywy/platforms/python/operators/PyFilterOperator.py b/python/src/pywy/platforms/python/operators/PyFilterOperator.py
index 24b97d7..e01d431 100644
--- a/python/src/pywy/platforms/python/operators/PyFilterOperator.py
+++ b/python/src/pywy/platforms/python/operators/PyFilterOperator.py
@@ -13,7 +13,7 @@
         pass
 
     def execute(self, inputs: Channel, outputs: Channel):
-        self.validateChannels(inputs, outputs)
+        self.validate_channels(inputs, outputs)
         udf = self.predicate
         if isinstance(inputs[0], PyIteratorChannel) :
             py_in_iter_channel: PyIteratorChannel = inputs[0]
@@ -36,8 +36,8 @@
             raise Exception("Channel Type does not supported")
 
 
-    def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+    def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
         return {PyIteratorChannelDescriptor, PyCallableChannelDescriptor}
 
-    def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+    def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
         return {PyIteratorChannelDescriptor, PyCallableChannelDescriptor}
diff --git a/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py b/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py
index 8b24411..11a779b 100644
--- a/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py
+++ b/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py
@@ -16,7 +16,7 @@
         pass
 
     def execute(self, inputs: Channel, outputs: Channel):
-        self.validateChannels(inputs, outputs)
+        self.validate_channels(inputs, outputs)
         if isinstance(inputs[0], PyIteratorChannel) :
             file = open(self.path,'w')
             py_in_iter_channel: PyIteratorChannel = inputs[0]
@@ -29,8 +29,8 @@
             raise Exception("Channel Type does not supported")
 
 
-    def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+    def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
         return {PyIteratorChannelDescriptor}
 
-    def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+    def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
         raise Exception("The PyTextFileSource does not support Output Channels")
diff --git a/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py b/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py
index d228b0e..cc36605 100644
--- a/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py
+++ b/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py
@@ -17,7 +17,7 @@
         pass
 
     def execute(self, inputs: Channel, outputs: Channel):
-        self.validateChannels(inputs, outputs)
+        self.validate_channels(inputs, outputs)
         if isinstance(outputs[0], PyIteratorChannel) :
             py_out_iter_channel: PyIteratorChannel = outputs[0]
             py_out_iter_channel.accept_iterable(
@@ -31,8 +31,8 @@
             raise Exception("Channel Type does not supported")
 
 
-    def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+    def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
         raise Exception("The PyTextFileSource does not support Input Channels")
 
-    def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+    def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
         return {PyIteratorChannelDescriptor}
diff --git a/python/src/pywy/test.py b/python/src/pywy/test.py
index ec4fbad..de0153d 100644
--- a/python/src/pywy/test.py
+++ b/python/src/pywy/test.py
@@ -1,7 +1,7 @@
 from pywy.platforms.basic.platform import Platform
-from pywy.context import WayangContext
+from pywy.dataquanta import WayangContext
 from pywy.platforms.python.channels import Channel
-from pywy.platforms.basic.plugin import java, spark
+from pywy.plugins import java, spark
 from pywy.wayangplan.unary import *
 
 p = Platform("nana")
@@ -36,62 +36,62 @@
             .filter(pre)\
             .storeTextFile("/Users/bertty/databloom/blossom/python/resources/test.output")
 
-filterop: FilterOperator = fileop.filter(pre).getOperator()
-#fop_pre = filterop.getWrapper()
-#fop_pre_res = fop_pre(["la", "lala"])
-#for i in fop_pre_res:
-#    print(i)
-
-
-mapop: MapOperator = fileop.map(func).getOperator()
-mop_func = mapop.getWrapper()
-mop_func_res = mop_func(["la", "lala"])
-#for i in mop_func_res:
-#    print(i)
-
-
-fmop: FlatmapOperator = fileop.flatmap(fmfunc).getOperator()
-fmop_func = fmop.getWrapper()
-fmop_func_res = fmop_func([2, 3])
-#for i in fmop_func_res:
-#    print(i)
-
-def concatenate(function_a, function_b):
-    def executable(iterable):
-        return function_b(function_a(iterable))
-    return executable
-
-#res = concatenate(concatenate(fop_pre, mop_func), fmop_func)
-#res_pro = res(["la", "lala"])
-#for i in res_pro:
-#    print(i)
-
-from pywy.platforms.python.mappings import PywyOperatorMappings
-from pywy.platforms.python.operators import *
-
-print(PywyOperatorMappings)
-
-pyF = PyFilterOperator()
-print(pyF)
-print(pyF.getInputChannelDescriptors())
-print(type(pyF.getInputChannelDescriptors().pop().create_instance()))
-
-qq : Channel = pyF.getInputChannelDescriptors().pop().create_instance()
-print(qq)
-print(type(qq))
-print("ads")
-
-
-def pre_lala(a:str):
-    print("executed")
-    return len(a) > 3
-
-ou1 = filter(pre_lala, ["la", "lala"])
-print(ou1)
-
-for i in ou1:
-    print(i)
-
-pyFM = PywyOperatorMappings.get_instanceof(filterop)
-print(pyFM)
-print(type(pyFM))
\ No newline at end of file
+# filterop: FilterOperator = fileop.filter(pre).getOperator()
+# #fop_pre = filterop.getWrapper()
+# #fop_pre_res = fop_pre(["la", "lala"])
+# #for i in fop_pre_res:
+# #    print(i)
+#
+#
+# mapop: MapOperator = fileop.map(func).getOperator()
+# mop_func = mapop.getWrapper()
+# mop_func_res = mop_func(["la", "lala"])
+# #for i in mop_func_res:
+# #    print(i)
+#
+#
+# fmop: FlatmapOperator = fileop.flatmap(fmfunc).getOperator()
+# fmop_func = fmop.getWrapper()
+# fmop_func_res = fmop_func([2, 3])
+# #for i in fmop_func_res:
+# #    print(i)
+#
+# def concatenate(function_a, function_b):
+#     def executable(iterable):
+#         return function_b(function_a(iterable))
+#     return executable
+#
+# #res = concatenate(concatenate(fop_pre, mop_func), fmop_func)
+# #res_pro = res(["la", "lala"])
+# #for i in res_pro:
+# #    print(i)
+#
+# from pywy.platforms.python.mappings import PywyOperatorMappings
+# from pywy.platforms.python.operators import *
+#
+# print(PywyOperatorMappings)
+#
+# pyF = PyFilterOperator()
+# print(pyF)
+# print(pyF.get_input_channeldescriptors())
+# print(type(pyF.get_input_channeldescriptors().pop().create_instance()))
+#
+# qq : Channel = pyF.get_input_channeldescriptors().pop().create_instance()
+# print(qq)
+# print(type(qq))
+# print("ads")
+#
+#
+# def pre_lala(a:str):
+#     print("executed")
+#     return len(a) > 3
+#
+# ou1 = filter(pre_lala, ["la", "lala"])
+# print(ou1)
+#
+# for i in ou1:
+#     print(i)
+#
+# pyFM = PywyOperatorMappings.get_instanceof(filterop)
+# print(pyFM)
+# print(type(pyFM))
\ No newline at end of file
diff --git a/python/src/pywy/translate/translator.py b/python/src/pywy/translate/translator.py
index 303d497..8ec2939 100644
--- a/python/src/pywy/translate/translator.py
+++ b/python/src/pywy/translate/translator.py
@@ -1,10 +1,10 @@
 from pywy.platforms.basic.plugin import Plugin
-from pywy.wayangplan.wayang import WayangPlan
+from pywy.wayangplan.wayang import PywyPlan
 from pywy.platforms.basic.mapping import Mapping
 
 class Translator:
 
-    def __init__(self, plugin: Plugin, plan: WayangPlan):
+    def __init__(self, plugin: Plugin, plan: PywyPlan):
         self.plugin = plugin
         self.plan = plan
 
diff --git a/python/src/pywy/wayangplan/__init__.py b/python/src/pywy/wayangplan/__init__.py
index dae5987..265e2d2 100644
--- a/python/src/pywy/wayangplan/__init__.py
+++ b/python/src/pywy/wayangplan/__init__.py
@@ -1,15 +1,13 @@
-from pywy.wayangplan.wayang import WayangPlan
 from pywy.wayangplan.base import WyOperator
 from pywy.wayangplan.sink import TextFileSink
 from pywy.wayangplan.source import TextFileSource
 from pywy.wayangplan.unary import FilterOperator, MapOperator, FlatmapOperator
-
+#
 __ALL__= [
-    WayangPlan,
-    WyOperator,
-    TextFileSink,
-    TextFileSource,
-    FilterOperator,
-    MapOperator,
-    FlatmapOperator
+     WyOperator,
+     TextFileSink,
+     TextFileSource,
+     FilterOperator,
+#     MapOperator,
+#     FlatmapOperator
 ]
\ No newline at end of file
diff --git a/python/src/pywy/wayangplan/base.py b/python/src/pywy/wayangplan/base.py
index 90e0b62..61a4a5d 100644
--- a/python/src/pywy/wayangplan/base.py
+++ b/python/src/pywy/wayangplan/base.py
@@ -5,9 +5,11 @@
 
     inputSlot : List[TypeVar]
     inputChannel : ChannelDescriptor
+    inputOperator: List['WyOperator']
     inputs : int
     outputSlot : List[TypeVar]
-    OutputChannel: ChannelDescriptor
+    outputChannel: ChannelDescriptor
+    outputOperator: List['WyOperator']
     outputs: int
 
     def __init__(self,
@@ -22,8 +24,10 @@
         self.inputs = input_lenght
         self.outputSlot = output
         self.outputs = output_lenght
+        self.inputOperator = [None] * self.inputs
+        self.outputOperator = [None] * self.outputs
 
-    def validateInputs(self, vec):
+    def validate_inputs(self, vec):
         if len(vec) != self.inputs:
             raise Exception(
                 "the inputs channel contains {} elements and need to have {}".format(
@@ -31,7 +35,7 @@
                     self.inputs
                 )
             )
-    def validateOutputs(self, vec):
+    def validate_outputs(self, vec):
         if len(vec) != self.outputs:
             raise Exception(
                 "the output channel contains {} elements and need to have {}".format(
@@ -39,14 +43,18 @@
                     self.inputs
                 )
             )
-    def validateChannels(self, input, output):
-        self.validateInputs(input)
-        self.validateOutputs(output)
+    def validate_channels(self, input, output):
+        self.validate_inputs(input)
+        self.validate_outputs(output)
 
-    def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+    def connect(self, port:int, that: 'WyOperator', port_that:int):
+        self.outputOperator[port] = that
+        that.inputOperator[port_that] = self
+
+    def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
         pass
 
-    def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+    def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
         pass
 
     def __str__(self):
diff --git a/python/src/pywy/wayangplan/sink.py b/python/src/pywy/wayangplan/sink.py
index 8944324..01cb0e9 100644
--- a/python/src/pywy/wayangplan/sink.py
+++ b/python/src/pywy/wayangplan/sink.py
@@ -1,20 +1,15 @@
+from typing import Any
+
+from pywy.types import GenericTco
 from pywy.wayangplan.base import WyOperator
 
 class SinkOperator(WyOperator):
-
-    def __init__(self, name:str):
-        super().__init__(name, None, str, 0, 1)
-
-    def __str__(self):
-        return super().__str__()
-
-    def __repr__(self):
-        return super().__repr__()
+    pass
 
 class SinkUnaryOperator(SinkOperator):
 
-    def __init__(self, name:str):
-        super().__init__(name, None, str, 0, 1)
+    def __init__(self, name:str, input:GenericTco=Any):
+        super().__init__(name, input, None, 1, 0)
 
     def __str__(self):
         return super().__str__()
diff --git a/python/src/pywy/wayangplan/source.py b/python/src/pywy/wayangplan/source.py
index c9ae2ba..7428b90 100644
--- a/python/src/pywy/wayangplan/source.py
+++ b/python/src/pywy/wayangplan/source.py
@@ -3,7 +3,13 @@
 class SourceUnaryOperator(WyOperator):
 
     def __init__(self, name:str):
-        super().__init__(name, None, str, 0, 1)
+        super(SourceUnaryOperator, self).__init__(
+            name = name,
+            input = None,
+            output = str,
+            input_lenght = 0,
+            output_lenght = 1
+        )
 
     def __str__(self):
         return super().__str__()
@@ -18,7 +24,7 @@
     path: str
 
     def __init__(self, path: str):
-        super().__init__('TextFileSource')
+        super(TextFileSource, self).__init__('TextFileSource')
         self.path = path
 
     def __str__(self):
diff --git a/python/src/pywy/wayangplan/wayang.py b/python/src/pywy/wayangplan/wayang.py
index a08b572..7da13e1 100644
--- a/python/src/pywy/wayangplan/wayang.py
+++ b/python/src/pywy/wayangplan/wayang.py
@@ -1,11 +1,102 @@
-from typing import Iterable
+from typing import Iterable, Dict, Callable, NoReturn, List, Set
 
 from pywy.wayangplan.sink import SinkOperator
-from pywy.platforms.basic.platform import Platform
+from pywy.wayangplan.base import WyOperator
+from pywy.platforms.basic.plugin import Plugin
+
+class GraphNodeWayang:
+
+    current: WyOperator
+    visited: bool
+
+    def __init__(self, op: WyOperator):
+        self.current = op
+        self.visited = False
+
+    def successors(self, created: Dict[WyOperator, 'GraphNodeWayang']) -> Iterable['GraphNodeWayang']:
+        if self.current is None or self.current.outputs == 0:
+            return []
+
+        def wrap(op:WyOperator):
+            if op is None:
+                return None;
+            if op not in created:
+                created[op] = GraphNodeWayang(op)
+            return created[op]
+
+        adjacent = self.current.outputOperator
+        return map(wrap, adjacent)
+
+    def predecessors(self, created: Dict[WyOperator, 'GraphNodeWayang']) -> Iterable['GraphNodeWayang']:
+        print("predecessors")
+        print(self)
+        def wrap(op:WyOperator):
+            if op not in created:
+                created[op] = GraphNodeWayang(op)
+            return created[op]
+
+        adjacent = self.current.inputOperator
+        return map(wrap, adjacent)
+
+    def visit(self, parent: 'GraphNodeWayang', udf: Callable[['GraphNodeWayang', 'GraphNodeWayang'], NoReturn], visit_status: bool = True):
+        if(self.visited == visit_status):
+            return
+        udf(self, parent)
+        self.visited = visit_status
+
+class GraphWayang:
+
+    starting_nodes : List[GraphNodeWayang]
+    created_nodes : Dict[WyOperator, GraphNodeWayang]
+
+    def __init__(self, plan:'PywyPlan'):
+        self.created_nodes = {}
+        self.starting_nodes = list()
+        for sink in plan.sinks:
+            tmp = GraphNodeWayang(sink)
+            self.starting_nodes.append(tmp)
+            self.created_nodes[sink] = tmp
 
 
-class WayangPlan:
+    def traversal(
+            self,
+            origin: GraphNodeWayang,
+            nodes: Iterable[GraphNodeWayang],
+            udf: Callable[['GraphNodeWayang', 'GraphNodeWayang'], NoReturn]
+    ):
+        for node in nodes:
+            adjacents = node.predecessors(self.created_nodes)
+            self.traversal(node, adjacents, udf)
+            node.visit(origin, udf)
 
-    def __init__(self, platforms: Iterable[Platform], sinks: Iterable[SinkOperator]):
-        self.platforms = platforms
+class PywyPlan:
+
+    graph: GraphWayang
+
+    def __init__(self, plugins: Set[Plugin], sinks: Iterable[SinkOperator]):
+        self.plugins = plugins
         self.sinks = sinks
+        self.graph = GraphWayang(self)
+
+    def print(self):
+        def print_plan(current: GraphNodeWayang, previous: GraphNodeWayang):
+            if current is None:
+                print("this is source")
+                print(previous.current)
+                return
+            if previous is None:
+                print("this is sink")
+                print(current.current)
+                return
+
+            print(
+                "===========\n{}\n@@@@@ => previous is\n{}\n===========\n"
+                    .format(
+                        current.current,
+                        previous.current
+                     )
+            )
+        self.graph.traversal(None, self.graph.starting_nodes, print_plan)
+
+
+