[WAYANG-#211] restructure the channels for Python-Platform
Signed-off-by: bertty <bertty@apache.org>
diff --git a/python/src/pywy/platforms/commons/__init__.py b/python/src/pywy/platforms/commons/__init__.py
new file mode 100644
index 0000000..8d2bad8
--- /dev/null
+++ b/python/src/pywy/platforms/commons/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
\ No newline at end of file
diff --git a/python/src/pywy/platforms/commons/channels.py b/python/src/pywy/platforms/commons/channels.py
new file mode 100644
index 0000000..743a716
--- /dev/null
+++ b/python/src/pywy/platforms/commons/channels.py
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import Callable
+from pywy.core import (Channel, ChannelDescriptor)
+
+
+class CommonsCallableChannel(Channel):
+
+ udf: Callable
+
+ def __init__(self):
+ Channel.__init__(self)
+
+ def provide_callable(self) -> Callable:
+ return self.udf
+
+ def accept_callable(self, udf: Callable) -> 'CommonsCallableChannel':
+ self.udf = udf
+ return self
+
+ @staticmethod
+ def concatenate(function_a: Callable, function_b: Callable):
+ def executable(iterable):
+ return function_a(function_b(iterable))
+ return executable
+
+
+class CommonsFileChannel(Channel):
+
+ path: str
+
+ def __init__(self):
+ Channel.__init__(self)
+
+ def provide_path(self) -> str:
+ return self.path
+
+ def accept_path(self, path: str) -> 'PyIteratorChannel':
+ self.path = path
+ return self
+
+
+COMMONS_CALLABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(CommonsCallableChannel()), False, False)
+COMMONS_FILE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(CommonsFileChannel()), False, False)
diff --git a/python/src/pywy/platforms/python/channels.py b/python/src/pywy/platforms/python/channels.py
index 0d0f65e..f79a67e 100644
--- a/python/src/pywy/platforms/python/channels.py
+++ b/python/src/pywy/platforms/python/channels.py
@@ -15,7 +15,7 @@
# limitations under the License.
#
-from typing import ( Iterable, Callable )
+from typing import Iterable
from pywy.core import (Channel, ChannelDescriptor)
@@ -34,42 +34,4 @@
return self
-class PyCallableChannel(Channel):
-
- udf: Callable
-
- def __init__(self):
- Channel.__init__(self)
-
- def provide_callable(self) -> Callable:
- return self.udf
-
- def accept_callable(self, udf: Callable) -> 'PyCallableChannel':
- self.udf = udf
- return self
-
- @staticmethod
- def concatenate(function_a: Callable, function_b: Callable):
- def executable(iterable):
- return function_a(function_b(iterable))
- return executable
-
-
-class PyFileChannel(Channel):
-
- path: str
-
- def __init__(self):
- Channel.__init__(self)
-
- def provide_path(self) -> str:
- return self.path
-
- def accept_path(self, path: str) -> 'PyIteratorChannel':
- self.path = path
- return self
-
-
PY_ITERATOR_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyIteratorChannel()), False, False)
-PY_CALLABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyCallableChannel()), False, False)
-PY_FILE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyFileChannel()), False, False)
diff --git a/python/src/pywy/platforms/python/operator/py_sink_textfile.py b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
index 9914382..7d8eec1 100644
--- a/python/src/pywy/platforms/python/operator/py_sink_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
@@ -17,11 +17,10 @@
from typing import Set, List, Type
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
from pywy.operators.sink import TextFileSink
from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
from pywy.platforms.python.channels import (
- ChannelDescriptor,
PyIteratorChannel,
PY_ITERATOR_CHANNEL_DESCRIPTOR
)
diff --git a/python/src/pywy/platforms/python/operator/py_source_textfile.py b/python/src/pywy/platforms/python/operator/py_source_textfile.py
index 3083188..245d090 100644
--- a/python/src/pywy/platforms/python/operator/py_source_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_source_textfile.py
@@ -17,14 +17,13 @@
from typing import Set, List, Type
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
from pywy.operators.source import TextFileSource
from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
from pywy.platforms.python.channels import (
- ChannelDescriptor,
PyIteratorChannel,
PY_ITERATOR_CHANNEL_DESCRIPTOR
- )
+)
class PyTextFileSourceOperator(TextFileSource, PyExecutionOperator):
diff --git a/python/src/pywy/platforms/python/operator/py_unary_filter.py b/python/src/pywy/platforms/python/operator/py_unary_filter.py
index 0788e97..2d80728 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_filter.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_filter.py
@@ -17,16 +17,17 @@
from typing import Set, List, Type
-from pywy.core.channel import CH_T
+from pywy.core.channel import CH_T, ChannelDescriptor
from pywy.operators.unary import FilterOperator
from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
+from pywy.platforms.commons.channels import (
+ COMMONS_CALLABLE_CHANNEL_DESCRIPTOR,
+ CommonsCallableChannel
+)
from pywy.platforms.python.channels import (
- ChannelDescriptor,
- PyIteratorChannel,
- PY_ITERATOR_CHANNEL_DESCRIPTOR,
- PY_CALLABLE_CHANNEL_DESCRIPTOR,
- PyCallableChannel
- )
+ PyIteratorChannel,
+ PY_ITERATOR_CHANNEL_DESCRIPTOR,
+)
class PyFilterOperator(FilterOperator, PyExecutionOperator):
@@ -43,15 +44,15 @@
py_in_iter_channel: PyIteratorChannel = inputs[0]
py_out_iter_channel: PyIteratorChannel = outputs[0]
py_out_iter_channel.accept_iterable(filter(udf, py_in_iter_channel.provide_iterable()))
- elif isinstance(inputs[0], PyCallableChannel):
- py_in_call_channel: PyCallableChannel = inputs[0]
- py_out_call_channel: PyCallableChannel = outputs[0]
+ elif isinstance(inputs[0], CommonsCallableChannel):
+ py_in_call_channel: CommonsCallableChannel = inputs[0]
+ py_out_call_channel: CommonsCallableChannel = outputs[0]
def func(iterator):
return filter(udf, iterator)
py_out_call_channel.accept_callable(
- PyCallableChannel.concatenate(
+ CommonsCallableChannel.concatenate(
func,
py_in_call_channel.provide_callable()
)
@@ -60,7 +61,7 @@
raise Exception("Channel Type does not supported")
def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
- return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+ return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
- return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+ return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
diff --git a/python/src/pywy/platforms/python/operator/py_unary_flatmap.py b/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
index 97f467d..72016a8 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
@@ -18,16 +18,17 @@
from itertools import chain
from typing import Set, List, Type
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
from pywy.operators.unary import FlatmapOperator
from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
+from pywy.platforms.commons.channels import (
+ COMMONS_CALLABLE_CHANNEL_DESCRIPTOR,
+ CommonsCallableChannel
+)
from pywy.platforms.python.channels import (
- ChannelDescriptor,
- PyIteratorChannel,
- PY_ITERATOR_CHANNEL_DESCRIPTOR,
- PY_CALLABLE_CHANNEL_DESCRIPTOR,
- PyCallableChannel
- )
+ PyIteratorChannel,
+ PY_ITERATOR_CHANNEL_DESCRIPTOR,
+)
class PyFlatmapOperator(FlatmapOperator, PyExecutionOperator):
@@ -43,15 +44,15 @@
py_in_iter_channel: PyIteratorChannel = inputs[0]
py_out_iter_channel: PyIteratorChannel = outputs[0]
py_out_iter_channel.accept_iterable(chain.from_iterable(map(udf, py_in_iter_channel.provide_iterable())))
- elif isinstance(inputs[0], PyCallableChannel):
- py_in_call_channel: PyCallableChannel = inputs[0]
- py_out_call_channel: PyCallableChannel = outputs[0]
+ elif isinstance(inputs[0], CommonsCallableChannel):
+ py_in_call_channel: CommonsCallableChannel = inputs[0]
+ py_out_call_channel: CommonsCallableChannel = outputs[0]
def fm_func(iterator):
return chain.from_iterable(map(udf, iterator))
py_out_call_channel.accept_callable(
- PyCallableChannel.concatenate(
+ CommonsCallableChannel.concatenate(
fm_func,
py_in_call_channel.provide_callable()
)
@@ -60,7 +61,7 @@
raise Exception("Channel Type does not supported")
def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
- return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+ return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
- return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+ return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
diff --git a/python/src/pywy/platforms/python/operator/py_unary_map.py b/python/src/pywy/platforms/python/operator/py_unary_map.py
index b8741a9..a8e53a4 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_map.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_map.py
@@ -17,16 +17,17 @@
from typing import Set, List, Type
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
from pywy.operators.unary import MapOperator
from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
+from pywy.platforms.commons.channels import (
+ COMMONS_CALLABLE_CHANNEL_DESCRIPTOR,
+ CommonsCallableChannel
+)
from pywy.platforms.python.channels import (
- ChannelDescriptor,
- PyIteratorChannel,
- PY_ITERATOR_CHANNEL_DESCRIPTOR,
- PY_CALLABLE_CHANNEL_DESCRIPTOR,
- PyCallableChannel
- )
+ PyIteratorChannel,
+ PY_ITERATOR_CHANNEL_DESCRIPTOR,
+)
class PyMapOperator(MapOperator, PyExecutionOperator):
@@ -43,15 +44,15 @@
py_in_iter_channel: PyIteratorChannel = inputs[0]
py_out_iter_channel: PyIteratorChannel = outputs[0]
py_out_iter_channel.accept_iterable(map(udf, py_in_iter_channel.provide_iterable()))
- elif isinstance(inputs[0], PyCallableChannel):
- py_in_call_channel: PyCallableChannel = inputs[0]
- py_out_call_channel: PyCallableChannel = outputs[0]
+ elif isinstance(inputs[0], CommonsCallableChannel):
+ py_in_call_channel: CommonsCallableChannel = inputs[0]
+ py_out_call_channel: CommonsCallableChannel = outputs[0]
def func(iterator):
return map(udf, iterator)
py_out_call_channel.accept_callable(
- PyCallableChannel.concatenate(
+ CommonsCallableChannel.concatenate(
func,
py_in_call_channel.provide_callable()
)
@@ -60,7 +61,7 @@
raise Exception("Channel Type does not supported")
def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
- return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+ return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
- return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+ return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}