[WAYANG-#8] add TextFileSourceOperator
Signed-off-by: bertty <bertty@apache.org>
diff --git a/python/src/pywayang/platforms/python/channels.py b/python/src/pywayang/platforms/python/channels.py
index f611a25..a286367 100644
--- a/python/src/pywayang/platforms/python/channels.py
+++ b/python/src/pywayang/platforms/python/channels.py
@@ -32,7 +32,7 @@
def provide_iterable(self) -> Iterable:
return self.iterable
- def accept_iterable(self, iterable) -> 'PyIteratorChannel':
+ def accept_iterable(self, iterable: Iterable) -> 'PyIteratorChannel':
self.iterable = iterable
return self
@@ -56,5 +56,20 @@
return function_a(function_b(iterable))
return executable
+class PyFileChannel(Channel):
+
+ path : str
+
+ def __init__(self):
+ Channel.__init__(self)
+
+ def provide_path(self) -> str:
+ return self.path
+
+ def accept_path(self, path: str) -> 'PyIteratorChannel':
+ self.path = path
+ return self
+
PyIteratorChannelDescriptor = ChannelDescriptor(type(PyIteratorChannel()), False, False)
-PyCallableChannelDescriptor = ChannelDescriptor(type(PyCallableChannel()), False, False)
\ No newline at end of file
+PyCallableChannelDescriptor = ChannelDescriptor(type(PyCallableChannel()), False, False)
+PyFileChannelDescriptor = ChannelDescriptor(type(PyFileChannel()), False, False)
\ No newline at end of file
diff --git a/python/src/pywayang/platforms/python/mappings.py b/python/src/pywayang/platforms/python/mappings.py
index 977ccad..55a8018 100644
--- a/python/src/pywayang/platforms/python/mappings.py
+++ b/python/src/pywayang/platforms/python/mappings.py
@@ -32,4 +32,5 @@
OperatorMappings = Mapping()
OperatorMappings.add_mapping(PyFilterOperator())
+OperatorMappings.add_mapping(PyTextFileSourceOperator())
diff --git a/python/src/pywayang/platforms/python/operators/PyTextFileSourceOperator.py b/python/src/pywayang/platforms/python/operators/PyTextFileSourceOperator.py
new file mode 100644
index 0000000..ccfbec4
--- /dev/null
+++ b/python/src/pywayang/platforms/python/operators/PyTextFileSourceOperator.py
@@ -0,0 +1,38 @@
+from pywayang.operator.source import TextFileSource
+from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
+from pywayang.platforms.python.channels import (
+ Channel,
+ ChannelDescriptor,
+ PyIteratorChannel,
+ PyIteratorChannelDescriptor,
+ PyFileChannelDescriptor
+ )
+from typing import Set
+
+class PyTextFileSourceOperator(TextFileSource, PythonExecutionOperator):
+
+ def __init__(self, origin: TextFileSource = None):
+ path = None if origin is None else origin.path
+ super().__init__(path)
+ pass
+
+ def execute(self, inputs: Channel, outputs: Channel):
+ self.validateChannels(inputs, outputs)
+ if isinstance(outputs[0], PyIteratorChannel) :
+ py_out_iter_channel: PyIteratorChannel = outputs[0]
+ py_out_iter_channel.accept_iterable(
+ open(
+ self.path,
+ 'r'
+ )
+ )
+
+ else:
+ raise Exception("Channel Type does not supported")
+
+
+ def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+ raise Exception("The PyTextFileSource does not support Input Channels")
+
+ def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+ return {PyIteratorChannelDescriptor}
diff --git a/python/src/pywayang/platforms/python/operators/__init__.py b/python/src/pywayang/platforms/python/operators/__init__.py
index 208a2fc..5db9243 100644
--- a/python/src/pywayang/platforms/python/operators/__init__.py
+++ b/python/src/pywayang/platforms/python/operators/__init__.py
@@ -1,7 +1,9 @@
from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
from pywayang.platforms.python.operators.PyFilterOperator import PyFilterOperator
+from pywayang.platforms.python.operators.PyTextFileSourceOperator import PyTextFileSourceOperator
__ALL__ = [
PythonExecutionOperator,
- PyFilterOperator
+ PyFilterOperator,
+ PyTextFileSourceOperator
]
\ No newline at end of file