[WAYANG-#211] restructure the channels for Python-Platform

Signed-off-by: bertty <bertty@apache.org>
diff --git a/python/src/pywy/platforms/commons/__init__.py b/python/src/pywy/platforms/commons/__init__.py
new file mode 100644
index 0000000..8d2bad8
--- /dev/null
+++ b/python/src/pywy/platforms/commons/__init__.py
@@ -0,0 +1,16 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
\ No newline at end of file
diff --git a/python/src/pywy/platforms/commons/channels.py b/python/src/pywy/platforms/commons/channels.py
new file mode 100644
index 0000000..743a716
--- /dev/null
+++ b/python/src/pywy/platforms/commons/channels.py
@@ -0,0 +1,59 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+from typing import Callable
+from pywy.core import (Channel, ChannelDescriptor)
+
+
+class CommonsCallableChannel(Channel):
+
+    udf: Callable
+
+    def __init__(self):
+        Channel.__init__(self)
+
+    def provide_callable(self) -> Callable:
+        return self.udf
+
+    def accept_callable(self, udf: Callable) -> 'CommonsCallableChannel':
+        self.udf = udf
+        return self
+
+    @staticmethod
+    def concatenate(function_a: Callable, function_b: Callable):
+        def executable(iterable):
+            return function_a(function_b(iterable))
+        return executable
+
+
+class CommonsFileChannel(Channel):
+
+    path: str
+
+    def __init__(self):
+        Channel.__init__(self)
+
+    def provide_path(self) -> str:
+        return self.path
+
+    def accept_path(self, path: str) -> 'PyIteratorChannel':
+        self.path = path
+        return self
+
+
+COMMONS_CALLABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(CommonsCallableChannel()), False, False)
+COMMONS_FILE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(CommonsFileChannel()), False, False)
diff --git a/python/src/pywy/platforms/python/channels.py b/python/src/pywy/platforms/python/channels.py
index 0d0f65e..f79a67e 100644
--- a/python/src/pywy/platforms/python/channels.py
+++ b/python/src/pywy/platforms/python/channels.py
@@ -15,7 +15,7 @@
 #  limitations under the License.
 #
 
-from typing import ( Iterable, Callable )
+from typing import Iterable
 from pywy.core import (Channel, ChannelDescriptor)
 
 
@@ -34,42 +34,4 @@
         return self
 
 
-class PyCallableChannel(Channel):
-
-    udf: Callable
-
-    def __init__(self):
-        Channel.__init__(self)
-
-    def provide_callable(self) -> Callable:
-        return self.udf
-
-    def accept_callable(self, udf: Callable) -> 'PyCallableChannel':
-        self.udf = udf
-        return self
-
-    @staticmethod
-    def concatenate(function_a: Callable, function_b: Callable):
-        def executable(iterable):
-            return function_a(function_b(iterable))
-        return executable
-
-
-class PyFileChannel(Channel):
-
-    path: str
-
-    def __init__(self):
-        Channel.__init__(self)
-
-    def provide_path(self) -> str:
-        return self.path
-
-    def accept_path(self, path: str) -> 'PyIteratorChannel':
-        self.path = path
-        return self
-
-
 PY_ITERATOR_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyIteratorChannel()), False, False)
-PY_CALLABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyCallableChannel()), False, False)
-PY_FILE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyFileChannel()), False, False)
diff --git a/python/src/pywy/platforms/python/operator/py_sink_textfile.py b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
index 9914382..7d8eec1 100644
--- a/python/src/pywy/platforms/python/operator/py_sink_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
@@ -17,11 +17,10 @@
 
 from typing import Set, List, Type
 
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
 from pywy.operators.sink import TextFileSink
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
 from pywy.platforms.python.channels import (
-    ChannelDescriptor,
     PyIteratorChannel,
     PY_ITERATOR_CHANNEL_DESCRIPTOR
 )
diff --git a/python/src/pywy/platforms/python/operator/py_source_textfile.py b/python/src/pywy/platforms/python/operator/py_source_textfile.py
index 3083188..245d090 100644
--- a/python/src/pywy/platforms/python/operator/py_source_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_source_textfile.py
@@ -17,14 +17,13 @@
 
 from typing import Set, List, Type
 
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
 from pywy.operators.source import TextFileSource
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
 from pywy.platforms.python.channels import (
-    ChannelDescriptor,
     PyIteratorChannel,
     PY_ITERATOR_CHANNEL_DESCRIPTOR
-                                            )
+)
 
 
 class PyTextFileSourceOperator(TextFileSource, PyExecutionOperator):
diff --git a/python/src/pywy/platforms/python/operator/py_unary_filter.py b/python/src/pywy/platforms/python/operator/py_unary_filter.py
index 0788e97..2d80728 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_filter.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_filter.py
@@ -17,16 +17,17 @@
 
 from typing import Set, List, Type
 
-from pywy.core.channel import CH_T
+from pywy.core.channel import CH_T, ChannelDescriptor
 from pywy.operators.unary import FilterOperator
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
+from pywy.platforms.commons.channels import (
+    COMMONS_CALLABLE_CHANNEL_DESCRIPTOR,
+    CommonsCallableChannel
+)
 from pywy.platforms.python.channels import (
-                                                ChannelDescriptor,
-                                                PyIteratorChannel,
-                                                PY_ITERATOR_CHANNEL_DESCRIPTOR,
-                                                PY_CALLABLE_CHANNEL_DESCRIPTOR,
-                                                PyCallableChannel
-                                            )
+    PyIteratorChannel,
+    PY_ITERATOR_CHANNEL_DESCRIPTOR,
+)
 
 
 class PyFilterOperator(FilterOperator, PyExecutionOperator):
@@ -43,15 +44,15 @@
             py_in_iter_channel: PyIteratorChannel = inputs[0]
             py_out_iter_channel: PyIteratorChannel = outputs[0]
             py_out_iter_channel.accept_iterable(filter(udf, py_in_iter_channel.provide_iterable()))
-        elif isinstance(inputs[0], PyCallableChannel):
-            py_in_call_channel: PyCallableChannel = inputs[0]
-            py_out_call_channel: PyCallableChannel = outputs[0]
+        elif isinstance(inputs[0], CommonsCallableChannel):
+            py_in_call_channel: CommonsCallableChannel = inputs[0]
+            py_out_call_channel: CommonsCallableChannel = outputs[0]
 
             def func(iterator):
                 return filter(udf, iterator)
 
             py_out_call_channel.accept_callable(
-                PyCallableChannel.concatenate(
+                CommonsCallableChannel.concatenate(
                     func,
                     py_in_call_channel.provide_callable()
                 )
@@ -60,7 +61,7 @@
             raise Exception("Channel Type does not supported")
 
     def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
-        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
 
     def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
-        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
diff --git a/python/src/pywy/platforms/python/operator/py_unary_flatmap.py b/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
index 97f467d..72016a8 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
@@ -18,16 +18,17 @@
 from itertools import chain
 from typing import Set, List, Type
 
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
 from pywy.operators.unary import FlatmapOperator
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
+from pywy.platforms.commons.channels import (
+    COMMONS_CALLABLE_CHANNEL_DESCRIPTOR,
+    CommonsCallableChannel
+)
 from pywy.platforms.python.channels import (
-                                                ChannelDescriptor,
-                                                PyIteratorChannel,
-                                                PY_ITERATOR_CHANNEL_DESCRIPTOR,
-                                                PY_CALLABLE_CHANNEL_DESCRIPTOR,
-                                                PyCallableChannel
-                                            )
+    PyIteratorChannel,
+    PY_ITERATOR_CHANNEL_DESCRIPTOR,
+)
 
 
 class PyFlatmapOperator(FlatmapOperator, PyExecutionOperator):
@@ -43,15 +44,15 @@
             py_in_iter_channel: PyIteratorChannel = inputs[0]
             py_out_iter_channel: PyIteratorChannel = outputs[0]
             py_out_iter_channel.accept_iterable(chain.from_iterable(map(udf, py_in_iter_channel.provide_iterable())))
-        elif isinstance(inputs[0], PyCallableChannel):
-            py_in_call_channel: PyCallableChannel = inputs[0]
-            py_out_call_channel: PyCallableChannel = outputs[0]
+        elif isinstance(inputs[0], CommonsCallableChannel):
+            py_in_call_channel: CommonsCallableChannel = inputs[0]
+            py_out_call_channel: CommonsCallableChannel = outputs[0]
 
             def fm_func(iterator):
                 return chain.from_iterable(map(udf, iterator))
 
             py_out_call_channel.accept_callable(
-                PyCallableChannel.concatenate(
+                CommonsCallableChannel.concatenate(
                     fm_func,
                     py_in_call_channel.provide_callable()
                 )
@@ -60,7 +61,7 @@
             raise Exception("Channel Type does not supported")
 
     def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
-        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
 
     def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
-        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
diff --git a/python/src/pywy/platforms/python/operator/py_unary_map.py b/python/src/pywy/platforms/python/operator/py_unary_map.py
index b8741a9..a8e53a4 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_map.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_map.py
@@ -17,16 +17,17 @@
 
 from typing import Set, List, Type
 
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
 from pywy.operators.unary import MapOperator
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
+from pywy.platforms.commons.channels import (
+    COMMONS_CALLABLE_CHANNEL_DESCRIPTOR,
+    CommonsCallableChannel
+)
 from pywy.platforms.python.channels import (
-                                                ChannelDescriptor,
-                                                PyIteratorChannel,
-                                                PY_ITERATOR_CHANNEL_DESCRIPTOR,
-                                                PY_CALLABLE_CHANNEL_DESCRIPTOR,
-                                                PyCallableChannel
-                                            )
+    PyIteratorChannel,
+    PY_ITERATOR_CHANNEL_DESCRIPTOR,
+)
 
 
 class PyMapOperator(MapOperator, PyExecutionOperator):
@@ -43,15 +44,15 @@
             py_in_iter_channel: PyIteratorChannel = inputs[0]
             py_out_iter_channel: PyIteratorChannel = outputs[0]
             py_out_iter_channel.accept_iterable(map(udf, py_in_iter_channel.provide_iterable()))
-        elif isinstance(inputs[0], PyCallableChannel):
-            py_in_call_channel: PyCallableChannel = inputs[0]
-            py_out_call_channel: PyCallableChannel = outputs[0]
+        elif isinstance(inputs[0], CommonsCallableChannel):
+            py_in_call_channel: CommonsCallableChannel = inputs[0]
+            py_out_call_channel: CommonsCallableChannel = outputs[0]
 
             def func(iterator):
                 return map(udf, iterator)
 
             py_out_call_channel.accept_callable(
-                PyCallableChannel.concatenate(
+                CommonsCallableChannel.concatenate(
                     func,
                     py_in_call_channel.provide_callable()
                 )
@@ -60,7 +61,7 @@
             raise Exception("Channel Type does not supported")
 
     def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
-        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
 
     def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
-        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}