[WAYANG-#8] add TextFileSinkOperator
Signed-off-by: bertty <bertty@apache.org>
diff --git a/python/src/pywayang/dataquanta.py b/python/src/pywayang/dataquanta.py
index 3ccc283..9cd892e 100644
--- a/python/src/pywayang/dataquanta.py
+++ b/python/src/pywayang/dataquanta.py
@@ -1,6 +1,7 @@
from pywayang.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO)
from pywayang.operator.base import (WyOperator)
from pywayang.operator.unary import (FilterOperator, MapOperator, FlatmapOperator)
+from pywayang.operator.sink import TextFileSink
class DataQuanta(GenericTco):
@@ -22,6 +23,10 @@
def flatmap(self: "DataQuanta[I]", f: FlatmapFunction) -> "DataQuanta[IterableO]" :
return DataQuanta(FlatmapOperator(f))
+ def storeTextFile(self: "DataQuanta[I]", path: str) :
+ last = DataQuanta(TextFileSink(path))
+ # TODO add the logic to execute the plan
+
def getOperator(self):
return self.operator
diff --git a/python/src/pywayang/operator/sink.py b/python/src/pywayang/operator/sink.py
new file mode 100644
index 0000000..52cbeb0
--- /dev/null
+++ b/python/src/pywayang/operator/sink.py
@@ -0,0 +1,28 @@
+from pywayang.operator.base import WyOperator
+
+class SinkUnaryOperator(WyOperator):
+
+ def __init__(self, name:str):
+ super().__init__(name, None, str, 0, 1)
+
+ def __str__(self):
+ return super().__str__()
+
+ def __repr__(self):
+ return super().__repr__()
+
+
+
+class TextFileSink(SinkUnaryOperator):
+
+ path: str
+
+ def __init__(self, path: str):
+ super().__init__('TextFileSink')
+ self.path = path
+
+ def __str__(self):
+ return super().__str__()
+
+ def __repr__(self):
+ return super().__repr__()
\ No newline at end of file
diff --git a/python/src/pywayang/platforms/python/mappings.py b/python/src/pywayang/platforms/python/mappings.py
index 55a8018..7060ba0 100644
--- a/python/src/pywayang/platforms/python/mappings.py
+++ b/python/src/pywayang/platforms/python/mappings.py
@@ -33,4 +33,5 @@
OperatorMappings.add_mapping(PyFilterOperator())
OperatorMappings.add_mapping(PyTextFileSourceOperator())
+OperatorMappings.add_mapping(PyTextFileSinkOperator())
diff --git a/python/src/pywayang/platforms/python/operators/PyTextFileSinkOperator.py b/python/src/pywayang/platforms/python/operators/PyTextFileSinkOperator.py
new file mode 100644
index 0000000..6589a63
--- /dev/null
+++ b/python/src/pywayang/platforms/python/operators/PyTextFileSinkOperator.py
@@ -0,0 +1,36 @@
+from pywayang.operator.sink import TextFileSink
+from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
+from pywayang.platforms.python.channels import (
+ Channel,
+ ChannelDescriptor,
+ PyIteratorChannel,
+ PyIteratorChannelDescriptor
+ )
+from typing import Set
+
+class PyTextFileSinkOperator(TextFileSink, PythonExecutionOperator):
+
+ def __init__(self, origin: TextFileSink = None):
+ path = None if origin is None else origin.path
+ super().__init__(path)
+ pass
+
+ def execute(self, inputs: Channel, outputs: Channel):
+ self.validateChannels(inputs, outputs)
+ if isinstance(inputs[0], PyIteratorChannel) :
+ file = open(self.path,'w')
+ py_in_iter_channel: PyIteratorChannel = inputs[0]
+ iterable = py_in_iter_channel.provide_iterable();
+ for element in iterable:
+ file.write(str(element))
+ file.close()
+
+ else:
+ raise Exception("Channel Type does not supported")
+
+
+ def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+ return {PyIteratorChannelDescriptor}
+
+ def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+ raise Exception("The PyTextFileSource does not support Output Channels")
diff --git a/python/src/pywayang/platforms/python/operators/__init__.py b/python/src/pywayang/platforms/python/operators/__init__.py
index 5db9243..7a55542 100644
--- a/python/src/pywayang/platforms/python/operators/__init__.py
+++ b/python/src/pywayang/platforms/python/operators/__init__.py
@@ -1,9 +1,11 @@
from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
from pywayang.platforms.python.operators.PyFilterOperator import PyFilterOperator
from pywayang.platforms.python.operators.PyTextFileSourceOperator import PyTextFileSourceOperator
+from pywayang.platforms.python.operators.PyTextFileSinkOperator import PyTextFileSinkOperator
__ALL__ = [
PythonExecutionOperator,
PyFilterOperator,
- PyTextFileSourceOperator
+ PyTextFileSourceOperator,
+ PyTextFileSinkOperator
]
\ No newline at end of file