[WAYANG-#8] Seed creation of Platforms/python
Signed-off-by: bertty <bertty@apache.org>
diff --git a/python/src/pywayang/dataquanta.py b/python/src/pywayang/dataquanta.py
index 5f74094..3ccc283 100644
--- a/python/src/pywayang/dataquanta.py
+++ b/python/src/pywayang/dataquanta.py
@@ -1,5 +1,5 @@
from pywayang.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO)
-from pywayang.operator.base import (BaseOperator)
+from pywayang.operator.base import (WyOperator)
from pywayang.operator.unary import (FilterOperator, MapOperator, FlatmapOperator)
@@ -7,9 +7,9 @@
"""
Represents an intermediate result/data flow edge in a [[WayangPlan]].
"""
- previous : BaseOperator = None
+ previous : WyOperator = None
- def __init__(self, operator: BaseOperator):
+ def __init__(self, operator: WyOperator):
self.operator = operator
diff --git a/python/src/pywayang/operator/base.py b/python/src/pywayang/operator/base.py
index ad2deed..b7834d1 100644
--- a/python/src/pywayang/operator/base.py
+++ b/python/src/pywayang/operator/base.py
@@ -1,11 +1,13 @@
-from typing import (TypeVar, Optional, List)
+from typing import (TypeVar, Optional, List, Set)
+from pywayang.platforms.python.channels import ChannelDescriptor
-
-class BaseOperator:
+class WyOperator:
inputSlot : List[TypeVar]
+ inputChannel : ChannelDescriptor
inputs : int
outputSlot : List[TypeVar]
+ OutputChannel: ChannelDescriptor
outputs: int
def __init__(self,
@@ -21,6 +23,32 @@
self.outputSlot = output
self.outputs = output_lenght
+ def validateInputs(self, vec):
+ if len(vec) != self.inputs:
+ raise Exception(
+ "the inputs channel contains {} elements and need to have {}".format(
+ len(vec),
+ self.inputs
+ )
+ )
+ def validateOutputs(self, vec):
+ if len(vec) != self.outputs:
+ raise Exception(
+ "the output channel contains {} elements and need to have {}".format(
+ len(vec),
+ self.inputs
+ )
+ )
+ def validateChannels(self, input, output):
+ self.validateInputs(input)
+ self.validateOutputs(output)
+
+ def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+ pass
+
+ def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+ pass
+
def __str__(self):
return "BaseOperator: \n\t- name: {}\n\t- inputs: {} {}\n\t- outputs: {} {} \n".format(
str(self.name),
diff --git a/python/src/pywayang/operator/source.py b/python/src/pywayang/operator/source.py
index 34a1664..97aff75 100644
--- a/python/src/pywayang/operator/source.py
+++ b/python/src/pywayang/operator/source.py
@@ -1,6 +1,6 @@
-from pywayang.operator.base import BaseOperator
+from pywayang.operator.base import WyOperator
-class SourceUnaryOperator(BaseOperator):
+class SourceUnaryOperator(WyOperator):
def __init__(self, name:str):
super().__init__(name, None, str, 0, 1)
diff --git a/python/src/pywayang/operator/unary.py b/python/src/pywayang/operator/unary.py
index 24e5df2..559effa 100644
--- a/python/src/pywayang/operator/unary.py
+++ b/python/src/pywayang/operator/unary.py
@@ -1,4 +1,4 @@
-from pywayang.operator.base import BaseOperator
+from pywayang.operator.base import WyOperator
from pywayang.types import (
GenericTco,
GenericUco,
@@ -12,7 +12,7 @@
from itertools import chain
-class UnaryToUnaryOperator(BaseOperator):
+class UnaryToUnaryOperator(WyOperator):
def __init__(self, name:str, input:GenericTco, output:GenericUco):
super().__init__(name, input, output, 1, 1)
@@ -30,16 +30,10 @@
predicate: Predicate
def __init__(self, predicate: Predicate):
- type = getTypePredicate(predicate)
+ type = getTypePredicate(predicate) if predicate else None
super().__init__("FilterOperator", type, type)
self.predicate = predicate
- def getWrapper(self):
- udf = self.predicate
- def func(iterator):
- return filter(udf, iterator)
- return func
-
def __str__(self):
return super().__str__()
@@ -51,7 +45,7 @@
function: Function
def __init__(self, function: Function):
- types = getTypeFunction(function)
+ types = getTypeFunction(function) if function else (None, None)
super().__init__("MapOperator", types[0], types[1])
self.function = function
@@ -73,7 +67,7 @@
fmfunction: FlatmapFunction
def __init__(self, fmfunction: FlatmapFunction):
- types = getTypeFlatmapFunction(fmfunction)
+ types = getTypeFlatmapFunction(fmfunction) if fmfunction else (None, None)
super().__init__("FlatmapOperator", types[0], types[1])
self.fmfunction = fmfunction
diff --git a/python/src/pywayang/platforms/__init__.py b/python/src/pywayang/platforms/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/python/src/pywayang/platforms/__init__.py
diff --git a/python/src/pywayang/platforms/python/__init__.py b/python/src/pywayang/platforms/python/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/python/src/pywayang/platforms/python/__init__.py
diff --git a/python/src/pywayang/platforms/python/channels.py b/python/src/pywayang/platforms/python/channels.py
new file mode 100644
index 0000000..f611a25
--- /dev/null
+++ b/python/src/pywayang/platforms/python/channels.py
@@ -0,0 +1,60 @@
+from typing import ( Iterable, Callable )
+
+class Channel:
+
+ def __init__(self):
+ pass
+
+ def getchannel(self) -> 'Channel':
+ return self
+
+ def gettype(self):
+ return type(self)
+
+class ChannelDescriptor:
+
+ def __init__(self, channelType: type, isReusable: bool, isSuitableForBreakpoint: bool):
+ self.channelType = channelType
+ self.isReusable = isReusable
+ self.isSuitableForBreakpoint = isSuitableForBreakpoint
+
+ def create_instance(self) -> Channel:
+ return self.channelType()
+
+
+class PyIteratorChannel(Channel):
+
+ iterable : Iterable
+
+ def __init__(self):
+ Channel.__init__(self)
+
+ def provide_iterable(self) -> Iterable:
+ return self.iterable
+
+ def accept_iterable(self, iterable) -> 'PyIteratorChannel':
+ self.iterable = iterable
+ return self
+
+class PyCallableChannel(Channel):
+
+ udf : Callable
+
+ def __init__(self):
+ Channel.__init__(self)
+
+ def provide_callable(self) -> Callable:
+ return self.udf
+
+ def accept_callable(self, udf: Callable) -> 'PyCallableChannel':
+ self.udf = udf
+ return self
+
+ @staticmethod
+ def concatenate(function_a: Callable, function_b: Callable):
+ def executable(iterable):
+ return function_a(function_b(iterable))
+ return executable
+
+PyIteratorChannelDescriptor = ChannelDescriptor(type(PyIteratorChannel()), False, False)
+PyCallableChannelDescriptor = ChannelDescriptor(type(PyCallableChannel()), False, False)
\ No newline at end of file
diff --git a/python/src/pywayang/platforms/python/compiler/__init__.py b/python/src/pywayang/platforms/python/compiler/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/python/src/pywayang/platforms/python/compiler/__init__.py
diff --git a/python/src/pywayang/platforms/python/execution/__init__.py b/python/src/pywayang/platforms/python/execution/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/python/src/pywayang/platforms/python/execution/__init__.py
diff --git a/python/src/pywayang/platforms/python/mappings.py b/python/src/pywayang/platforms/python/mappings.py
new file mode 100644
index 0000000..977ccad
--- /dev/null
+++ b/python/src/pywayang/platforms/python/mappings.py
@@ -0,0 +1,35 @@
+from typing import Dict
+
+from pywayang.operator.base import WyOperator
+from pywayang.platforms.python.operators import *
+
+class Mapping:
+ mappings: Dict[str, type]
+
+ def __init__(self):
+ self.mappings = {}
+
+ def add_mapping(self, operator: PythonExecutionOperator):
+ self.mappings[operator.name] = type(operator)
+
+ def get_instanceof(self, operator: WyOperator):
+ template = self.mappings[operator.name]
+ if template is None:
+ raise Exception(
+ "the operator {} does not have valid mapping".format(
+ operator.name
+ )
+ )
+ return template(operator)
+
+
+ def __str__(self):
+ return str(self.mappings)
+
+ def __repr__(self):
+ return self.__str__()
+
+OperatorMappings = Mapping()
+
+OperatorMappings.add_mapping(PyFilterOperator())
+
diff --git a/python/src/pywayang/platforms/python/operators/PyFilterOperator.py b/python/src/pywayang/platforms/python/operators/PyFilterOperator.py
new file mode 100644
index 0000000..f1d7dcb
--- /dev/null
+++ b/python/src/pywayang/platforms/python/operators/PyFilterOperator.py
@@ -0,0 +1,43 @@
+from pywayang.operator.unary import FilterOperator
+from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
+from pywayang.platforms.python.channels import (Channel, ChannelDescriptor, PyIteratorChannel,
+ PyIteratorChannelDescriptor, PyCallableChannelDescriptor,
+ PyCallableChannel)
+from typing import Set
+
+class PyFilterOperator(FilterOperator, PythonExecutionOperator):
+
+ def __init__(self, origin: FilterOperator = None):
+ predicate = None if origin is None else origin.predicate
+ super().__init__(predicate)
+ pass
+
+ def execute(self, inputs: Channel, outputs: Channel):
+ self.validateChannels(inputs, outputs)
+ udf = self.predicate
+ if isinstance(inputs[0], PyIteratorChannel) :
+ py_in_iter_channel: PyIteratorChannel = inputs[0]
+ py_out_iter_channel: PyIteratorChannel = outputs[0]
+ py_out_iter_channel.accept_iterable(filter(udf, py_in_iter_channel.provide_iterable()))
+ elif isinstance(inputs[0], PyCallableChannel) :
+ py_in_call_channel: PyCallableChannel = inputs[0]
+ py_out_call_channel: PyCallableChannel = outputs[0]
+
+ def func(iterator):
+ return filter(udf, iterator)
+
+ py_out_call_channel.accept_callable(
+ PyCallableChannel.concatenate(
+ func,
+ py_in_call_channel.provide_callable()
+ )
+ )
+ else:
+ raise Exception("Channel Type does not supported")
+
+
+ def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+ return {PyIteratorChannelDescriptor, PyCallableChannelDescriptor}
+
+ def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+ return {PyIteratorChannelDescriptor, PyCallableChannelDescriptor}
diff --git a/python/src/pywayang/platforms/python/operators/PythonExecutionOperator.py b/python/src/pywayang/platforms/python/operators/PythonExecutionOperator.py
new file mode 100644
index 0000000..4a79616
--- /dev/null
+++ b/python/src/pywayang/platforms/python/operators/PythonExecutionOperator.py
@@ -0,0 +1,7 @@
+from pywayang.operator.base import WyOperator
+from pywayang.platforms.python.channels import Channel
+
+class PythonExecutionOperator(WyOperator):
+
+ def execute(self, inputs: Channel, output: Channel):
+ pass
\ No newline at end of file
diff --git a/python/src/pywayang/platforms/python/operators/__init__.py b/python/src/pywayang/platforms/python/operators/__init__.py
new file mode 100644
index 0000000..208a2fc
--- /dev/null
+++ b/python/src/pywayang/platforms/python/operators/__init__.py
@@ -0,0 +1,7 @@
+from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
+from pywayang.platforms.python.operators.PyFilterOperator import PyFilterOperator
+
+__ALL__ = [
+ PythonExecutionOperator,
+ PyFilterOperator
+]
\ No newline at end of file
diff --git a/python/src/pywayang/platforms/python/platform/__init__.py b/python/src/pywayang/platforms/python/platform/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/python/src/pywayang/platforms/python/platform/__init__.py
diff --git a/python/src/pywayang/platforms/python/plugin/__init__.py b/python/src/pywayang/platforms/python/plugin/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/python/src/pywayang/platforms/python/plugin/__init__.py
diff --git a/python/src/pywayang/test.py b/python/src/pywayang/test.py
index 66ddab0..884acfc 100644
--- a/python/src/pywayang/test.py
+++ b/python/src/pywayang/test.py
@@ -2,11 +2,17 @@
from pywayang.platform import Platform
from pywayang.context import WayangContext
+from pywayang.platforms.python.channels import Channel
from pywayang.plugin import java, spark
from pywayang.operator.unary import *
p = Platform("nana")
-print(p)
+print("LALA "+str(p))
+pt = type(p)
+print(pt)
+p2 = pt("chao")
+print(p2)
+print(type(p2))
print(str(WayangContext().register(java, spark)))
@@ -31,8 +37,8 @@
.textFile("here")\
filterop: FilterOperator = fileop.filter(pre).getOperator()
-fop_pre = filterop.getWrapper()
-fop_pre_res = fop_pre(["la", "lala"])
+#fop_pre = filterop.getWrapper()
+#fop_pre_res = fop_pre(["la", "lala"])
#for i in fop_pre_res:
# print(i)
@@ -55,7 +61,37 @@
return function_b(function_a(iterable))
return executable
-res = concatenate(concatenate(fop_pre, mop_func), fmop_func)
-res_pro = res(["la", "lala"])
-for i in res_pro:
- print(i)
\ No newline at end of file
+#res = concatenate(concatenate(fop_pre, mop_func), fmop_func)
+#res_pro = res(["la", "lala"])
+#for i in res_pro:
+# print(i)
+
+from pywayang.platforms.python.mappings import OperatorMappings
+from pywayang.platforms.python.operators import *
+
+print(OperatorMappings)
+
+pyF = PyFilterOperator()
+print(pyF)
+print(pyF.getInputChannelDescriptors())
+print(type(pyF.getInputChannelDescriptors().pop().create_instance()))
+
+qq : Channel = pyF.getInputChannelDescriptors().pop().create_instance()
+print(qq)
+print(type(qq))
+print("ads")
+
+
+def pre_lala(a:str):
+ print("executed")
+ return len(a) > 3
+
+ou1 = filter(pre_lala, ["la", "lala"])
+print(ou1)
+
+for i in ou1:
+ print(i)
+
+pyFM = OperatorMappings.get_instanceof(filterop)
+print(pyFM)
+print(type(pyFM))
\ No newline at end of file