[WAYANG-#211] seed structure for JVM-Platform

Signed-off-by: bertty <bertty@apache.org>
diff --git a/python/src/pywy/platforms/commons/channels.py b/python/src/pywy/platforms/commons/channels.py
index 743a716..57cebeb 100644
--- a/python/src/pywy/platforms/commons/channels.py
+++ b/python/src/pywy/platforms/commons/channels.py
@@ -17,6 +17,7 @@
 
 from typing import Callable
 from pywy.core import (Channel, ChannelDescriptor)
+from pywy.exception import PywyException
 
 
 class CommonsCallableChannel(Channel):
@@ -35,6 +36,11 @@
 
     @staticmethod
     def concatenate(function_a: Callable, function_b: Callable):
+        if function_a is None:
+            raise PywyException("the function_a can't be None")
+        if function_b is None:
+            return function_a
+
         def executable(iterable):
             return function_a(function_b(iterable))
         return executable
diff --git a/python/src/pywy/platforms/jvm/__init__.py b/python/src/pywy/platforms/jvm/__init__.py
new file mode 100644
index 0000000..d9e26de
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/__init__.py
@@ -0,0 +1,16 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
diff --git a/python/src/pywy/platforms/jvm/channels.py b/python/src/pywy/platforms/jvm/channels.py
new file mode 100644
index 0000000..9c1e76f
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/channels.py
@@ -0,0 +1,47 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+from typing import Callable
+
+from pywy.core import (Channel, ChannelDescriptor)
+from pywy.exception import PywyException
+from pywy.platforms.commons.channels import CommonsCallableChannel
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator, WayangJVMMappartitionOperator
+
+
+class DispatchableChannel(CommonsCallableChannel):
+
+    operator: WayangJVMOperator
+
+    def __init__(self):
+        Channel.__init__(self)
+        self.udf = None
+        self.operator = None
+
+    def provide_dispatchable(self, do_wrapper: bool = False) -> WayangJVMOperator:
+        if self.operator is None:
+            raise PywyException("The operator was not define")
+        if do_wrapper:
+            self.operator.udf = self.udf
+
+        return self.operator
+
+    def accept_dispatchable(self, operator: WayangJVMOperator) -> 'WayangJVMOperator':
+        self.operator = operator
+        return self
+
+
+DISPATCHABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(DispatchableChannel()), True, True)
diff --git a/python/src/pywy/platforms/jvm/execution.py b/python/src/pywy/platforms/jvm/execution.py
new file mode 100644
index 0000000..2dad76d
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/execution.py
@@ -0,0 +1,99 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+from pywy.core import Executor, ChannelDescriptor
+from pywy.core import PywyPlan
+from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR
+from pywy.platforms.jvm.graph import NodeDispatch, WGraphDispatch
+from pywy.platforms.jvm.operator import JVMExecutionOperator
+
+class JVMExecutor(Executor):
+
+    def __init__(self):
+        super(JVMExecutor, self).__init__()
+
+    def execute(self, plan):
+        pywyPlan: PywyPlan = plan
+        graph = WGraphDispatch(pywyPlan.sinks)
+
+        # TODO get this information by a configuration and ideally by the context
+        descriptor_default: ChannelDescriptor = DISPATCHABLE_CHANNEL_DESCRIPTOR
+
+        def execute(op_current: NodeDispatch, op_next: NodeDispatch):
+            if op_current is None:
+                return
+
+            jvm_current: JVMExecutionOperator = op_current.current
+            if jvm_current.outputs == 0:
+                jvm_current.execute(jvm_current.inputChannel, [])
+                return
+
+            if op_next is None:
+                return
+
+            jvm_next: JVMExecutionOperator = op_next.current
+            outputs = jvm_current.get_output_channeldescriptors()
+            inputs = jvm_next.get_input_channeldescriptors()
+
+            intersect = outputs.intersection(inputs)
+            if len(intersect) == 0:
+                raise Exception(
+                    "The operator(A) {} can't connect with (B) {}, "
+                    "because the output of (A) is {} and the input of (B) is {} ".format(
+                        jvm_current,
+                        jvm_next,
+                        outputs,
+                        inputs
+                    )
+                )
+
+            if len(intersect) > 1:
+                if descriptor_default is None:
+                    raise Exception(
+                        "The interaction between the operator (A) {} and (B) {}, "
+                        "can't be decided because are several channel availables {}".format(
+                            jvm_current,
+                            jvm_next,
+                            intersect
+                        )
+                    )
+                descriptor = descriptor_default
+            else:
+                descriptor = intersect.pop()
+
+            # TODO validate if is valite for several output
+            jvm_current.outputChannel[0] = descriptor.create_instance()
+
+            jvm_current.execute(jvm_current.inputChannel, jvm_current.outputChannel)
+
+            jvm_next.inputChannel = jvm_current.outputChannel
+
+        graph.traversal(graph.starting_nodes, execute)
+
+
+
+        starting: WayangJVMOperator = graph.starting_nodes[0].current.dispatch_operator
+        while starting.previous[0]:
+            print(starting)
+            #print(starting.nexts[0])
+            starting = starting.previous[0]
+            if len(starting.previous) == 0 :
+                break
+        print(starting)
+
+
+
diff --git a/python/src/pywy/platforms/jvm/graph.py b/python/src/pywy/platforms/jvm/graph.py
new file mode 100644
index 0000000..7863651
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/graph.py
@@ -0,0 +1,52 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+from typing import Iterable, List, Tuple
+
+from pywy.graph.graph import WayangGraph, GraphNode
+from pywy.operators.base import PO_T
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WJO_T
+
+
+class NodeDispatch(GraphNode[PO_T, PO_T]):
+    wop: WJO_T
+
+    def __init__(self, op: PO_T):
+        super(NodeDispatch, self).__init__(op)
+
+    def get_adjacents(self) -> List[PO_T]:
+        operator: PO_T = self.current
+        if operator is None or operator.inputs == 0:
+            return []
+        return operator.inputOperator
+
+    def build_node(self, t: PO_T) -> 'NodeDispatch':
+        return NodeDispatch(t)
+
+    def __str__(self):
+        return "NodeDispatch {}".format(self.current)
+
+    def __repr__(self):
+        return self.__str__()
+
+
+class WGraphDispatch(WayangGraph[PO_T, NodeDispatch]):
+
+    def __init__(self, nodes: Iterable[PO_T]):
+        super(WGraphDispatch, self).__init__(nodes)
+
+    def build_node(self, t: PO_T) -> NodeDispatch:
+        return NodeDispatch(t)
diff --git a/python/src/pywy/platforms/jvm/mappings.py b/python/src/pywy/platforms/jvm/mappings.py
new file mode 100644
index 0000000..4709a73
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/mappings.py
@@ -0,0 +1,26 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+from pywy.core import Mapping
+from pywy.platforms.jvm.operator import *
+
+
+JVM_OPERATOR_MAPPINGS = Mapping()
+
+JVM_OPERATOR_MAPPINGS.add_mapping(JVMFilterOperator())
+JVM_OPERATOR_MAPPINGS.add_mapping(JVMTextFileSourceOperator())
+JVM_OPERATOR_MAPPINGS.add_mapping(JVMTextFileSinkOperator())
diff --git a/python/src/pywy/platforms/jvm/operator/__init__.py b/python/src/pywy/platforms/jvm/operator/__init__.py
new file mode 100644
index 0000000..a0bc5d2
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/operator/__init__.py
@@ -0,0 +1,28 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+from pywy.platforms.jvm.operator.jvm_execution_operator import JVMExecutionOperator
+from pywy.platforms.jvm.operator.jvm_unary_filter import JVMFilterOperator
+from pywy.platforms.jvm.operator.jvm_source_textfile import JVMTextFileSourceOperator
+from pywy.platforms.jvm.operator.jvm_sink_textfile import JVMTextFileSinkOperator
+
+__ALL__ = [
+    JVMExecutionOperator,
+    JVMFilterOperator,
+    JVMTextFileSourceOperator,
+    JVMTextFileSinkOperator,
+]
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py b/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py
new file mode 100644
index 0000000..0049c5c
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py
@@ -0,0 +1,33 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+from typing import List, Type
+
+from pywy.core.channel import CH_T
+from pywy.operators.base import PywyOperator
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator
+
+
+class JVMExecutionOperator(PywyOperator):
+
+    dispatch_operator: WayangJVMOperator
+
+    def prefix(self) -> str:
+        return 'JVM'
+
+    def execute(self, inputs: List[Type[CH_T]], output: List[CH_T]):
+        pass
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py b/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py
new file mode 100644
index 0000000..555ba04
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py
@@ -0,0 +1,56 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+from typing import Set, List, Type
+
+from pywy.core.channel import (CH_T, ChannelDescriptor)
+from pywy.exception import PywyException
+from pywy.operators.sink import TextFileSink
+from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR, DispatchableChannel
+from pywy.platforms.jvm.operator.jvm_execution_operator import JVMExecutionOperator
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMTextFileSink
+
+
+class JVMTextFileSinkOperator(TextFileSink, JVMExecutionOperator):
+
+    def __init__(self, origin: TextFileSink = None):
+        path = None if origin is None else origin.path
+        type_class = None if origin is None else origin.inputSlot[0]
+        end_line = None if origin is None else origin.end_line
+        super().__init__(path, type_class, end_line)
+
+    def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
+        self.validate_channels(inputs, outputs)
+
+        if isinstance(inputs[0], DispatchableChannel):
+
+            py_in_dispatch_channel: DispatchableChannel = inputs[0]
+            operator = py_in_dispatch_channel.provide_dispatchable(do_wrapper=True)
+
+            sink: WayangJVMTextFileSink = WayangJVMTextFileSink(self.name, self.path)
+
+            operator.connect_to(0, sink, 0)
+
+            self.dispatch_operator = sink
+        else:
+            raise Exception("Channel Type does not supported")
+
+    def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        return {DISPATCHABLE_CHANNEL_DESCRIPTOR}
+
+    def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        raise Exception("The JVMTextFileSink does not support Output Channels")
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py b/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py
new file mode 100644
index 0000000..3ca3b91
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py
@@ -0,0 +1,48 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+from typing import Set, List, Type
+
+from pywy.core.channel import (CH_T, ChannelDescriptor)
+from pywy.operators.source import TextFileSource
+from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR, DispatchableChannel
+from pywy.platforms.jvm.operator.jvm_execution_operator import JVMExecutionOperator
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMTextFileSource
+
+
+class JVMTextFileSourceOperator(TextFileSource, JVMExecutionOperator):
+
+    def __init__(self, origin: TextFileSource = None):
+        path = None if origin is None else origin.path
+        super().__init__(path)
+        pass
+
+    def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
+        self.validate_channels(inputs, outputs)
+        if isinstance(outputs[0], DispatchableChannel):
+            py_out_dispatch_channel: DispatchableChannel = outputs[0]
+            py_out_dispatch_channel.accept_dispatchable(
+                WayangJVMTextFileSource(self.name, self.path)
+            )
+        else:
+            raise Exception("Channel Type does not supported")
+
+    def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        raise Exception("The JVMTextFileSource does not support Input Channels")
+
+    def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        return {DISPATCHABLE_CHANNEL_DESCRIPTOR}
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py b/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py
new file mode 100644
index 0000000..06ba9e1
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py
@@ -0,0 +1,72 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+from typing import Set, List, Type
+
+from pywy.core.channel import CH_T, ChannelDescriptor
+from pywy.operators.unary import FilterOperator
+from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR, DispatchableChannel
+from pywy.platforms.jvm.operator.jvm_execution_operator import JVMExecutionOperator
+from pywy.platforms.commons.channels import (
+    CommonsCallableChannel
+)
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMMappartitionOperator, WayangJVMOperator
+
+
+class JVMFilterOperator(FilterOperator, JVMExecutionOperator):
+
+    def __init__(self, origin: FilterOperator = None):
+        predicate = None if origin is None else origin.predicate
+        super().__init__(predicate)
+        pass
+
+    def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
+        self.validate_channels(inputs, outputs)
+        udf = self.predicate
+        if isinstance(inputs[0], DispatchableChannel):
+            py_in_dispatch_channel: DispatchableChannel = inputs[0]
+            py_out_dispatch_channel: DispatchableChannel = outputs[0]
+
+            def func(iterator):
+                return filter(udf, iterator)
+
+            py_out_dispatch_channel.accept_callable(
+                CommonsCallableChannel.concatenate(
+                    func,
+                    py_in_dispatch_channel.provide_callable()
+                )
+            )
+
+            op: WayangJVMOperator = py_in_dispatch_channel.provide_dispatchable()
+
+            if isinstance(op, WayangJVMMappartitionOperator):
+                py_out_dispatch_channel.accept_dispatchable(op)
+                return
+
+            current: WayangJVMMappartitionOperator = WayangJVMMappartitionOperator(self.name)
+            # TODO check for the case where the index matter
+            op.connect_to(0, current, 0)
+            py_out_dispatch_channel.accept_dispatchable(current)
+
+        else:
+            raise Exception("Channel Type does not supported")
+
+    def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        return {DISPATCHABLE_CHANNEL_DESCRIPTOR}
+
+    def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        return {DISPATCHABLE_CHANNEL_DESCRIPTOR}
diff --git a/python/src/pywy/platforms/jvm/platform.py b/python/src/pywy/platforms/jvm/platform.py
new file mode 100644
index 0000000..d52ac47
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/platform.py
@@ -0,0 +1,24 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+from pywy.core.platform import Platform
+
+
+class JVMPlatform(Platform):
+
+    def __init__(self):
+        super(JVMPlatform, self).__init__("JVM")
diff --git a/python/src/pywy/platforms/jvm/plugin.py b/python/src/pywy/platforms/jvm/plugin.py
new file mode 100644
index 0000000..eff1e04
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/plugin.py
@@ -0,0 +1,31 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+from pywy.core import Executor
+from pywy.core import Plugin
+from pywy.platforms.jvm.execution import JVMExecutor
+from pywy.platforms.jvm.mappings import JVM_OPERATOR_MAPPINGS
+from pywy.platforms.jvm.platform import JVMPlatform
+
+
+class JVMPlugin(Plugin):
+
+    def __init__(self):
+        super(JVMPlugin, self).__init__({JVMPlatform()}, JVM_OPERATOR_MAPPINGS)
+
+    def get_executor(self) -> Executor:
+        return JVMExecutor()
diff --git a/python/src/pywy/platforms/jvm/serializable/__init__.py b/python/src/pywy/platforms/jvm/serializable/__init__.py
new file mode 100644
index 0000000..d9e26de
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/serializable/__init__.py
@@ -0,0 +1,16 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
diff --git a/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py b/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py
new file mode 100644
index 0000000..d18f155
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py
@@ -0,0 +1,96 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+from typing import Callable, List, TypeVar
+
+from pywy.exception import PywyException
+
+
+class WayangJVMOperator:
+
+    kind: str
+    name: str
+    path: str
+    udf: Callable
+
+    previous: List['WayangJVMOperator']
+    nexts: List['WayangJVMOperator']
+
+    def __init__(self, kind, name):
+        self.name = name
+        self.kind = kind
+        self.previous = []
+        self.nexts = []
+
+    def validate_vector(self, vect: List['WayangJVMOperator'], index: int, op: 'WayangJVMOperator' = None):
+        if op is None:
+            op = self
+
+        if vect is None or len(vect) == 0:
+            vect = [None] * (index + 1)
+
+        if len(vect) < index:
+            vect.extend([None for i in range(index + 1 - len(vect))])
+
+        if vect[index] is not None:
+            raise PywyException(
+                'the position in the index "{}" is already in use for "{}" in the operator "{}"'.format(
+                    index,
+                    vect[index],
+                    op
+                )
+            )
+
+        return vect
+
+    def connect_to(self, nexts_index: int, operator: 'WayangJVMOperator', previous_index: int) -> 'WayangJVMOperator':
+        operator.previous = self.validate_vector(operator.previous, previous_index, operator)
+        self.nexts = self.validate_vector(self.nexts, nexts_index)
+
+        self.nexts[nexts_index] = operator
+        operator.previous[previous_index] = self
+        return self
+
+    def __str__(self):
+        return "WayangJVMOperator {}, previous.[{}], nexts.[{}]".format(
+            self.name,
+            self.previous,
+            self.nexts
+        )
+
+WJO_T = TypeVar('WJO_T', bound=WayangJVMOperator)
+
+
+class WayangJVMMappartitionOperator(WayangJVMOperator):
+
+    def __init__(self, name: str, udf: Callable = None):
+        super().__init__("MapPartitionOperator", name)
+        self.udf = udf
+
+
+class WayangJVMTextFileSource(WayangJVMOperator):
+
+    def __init__(self, name: str, path: str):
+        super().__init__("TextFileSource", name)
+        self.path = path
+
+
+class WayangJVMTextFileSink(WayangJVMOperator):
+
+    def __init__(self, name: str, path: str):
+        super().__init__("TextFileSink", name)
+        self.path = path
+
diff --git a/python/src/pywy/plugins.py b/python/src/pywy/plugins.py
index e3509b6..777d297 100644
--- a/python/src/pywy/plugins.py
+++ b/python/src/pywy/plugins.py
@@ -17,6 +17,7 @@
 
 from pywy.core.platform import Platform
 from pywy.core import Plugin
+from pywy.platforms.jvm.plugin import JVMPlugin
 from pywy.platforms.python.plugin import PythonPlugin
 
 # define the basic plugins that can be used
@@ -25,3 +26,4 @@
 FLINK = Plugin({Platform('flink')})
 # plugin for the python platform
 PYTHON = PythonPlugin()
+JVMs = JVMPlugin()
diff --git a/python/src/pywy/tests/integration/jvm_platform_test.py b/python/src/pywy/tests/integration/jvm_platform_test.py
new file mode 100644
index 0000000..987e733
--- /dev/null
+++ b/python/src/pywy/tests/integration/jvm_platform_test.py
@@ -0,0 +1,144 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+import logging
+import os
+import unittest
+import tempfile
+from itertools import chain
+from typing import List, Iterable
+
+from pywy.config import RC_TEST_DIR as ROOT
+from pywy.dataquanta import WayangContext
+from pywy.plugins import JVMs
+
+logger = logging.getLogger(__name__)
+
+
+class TestIntegrationJVMPlatform(unittest.TestCase):
+
+    file_10e0: str
+
+    def setUp(self):
+        self.file_10e0 = "{}/10e0MB.input".format(ROOT)
+
+    @staticmethod
+    def seed_small_grep(validation_file):
+        def pre(a: str) -> bool:
+            return 'six' in a
+
+        fd, path_tmp = tempfile.mkstemp()
+
+        dq = WayangContext() \
+            .register(JVMs) \
+            .textfile(validation_file) \
+            .filter(pre)
+
+        return dq, path_tmp, pre
+
+    def validate_files(self,
+                       validation_file,
+                       outputed_file,
+                       read_and_convert_validation,
+                       read_and_convert_outputed,
+                       delete_outputed=True,
+                       print_variable=False):
+        lines_filter: List[int]
+        with open(validation_file, 'r') as f:
+            lines_filter = list(read_and_convert_validation(f))
+            selectivity = len(lines_filter)
+
+        lines_platform: List[int]
+        with open(outputed_file, 'r') as fp:
+            lines_platform = list(read_and_convert_outputed(fp))
+            elements = len(lines_platform)
+
+        if delete_outputed:
+            os.remove(outputed_file)
+
+        if print_variable:
+            logger.info(f"{lines_platform=}")
+            logger.info(f"{lines_filter=}")
+            logger.info(f"{elements=}")
+            logger.info(f"{selectivity=}")
+
+        self.assertEqual(selectivity, elements)
+        self.assertEqual(lines_filter, lines_platform)
+
+    def test_grep(self):
+
+        dq, path_tmp, pre = self.seed_small_grep(self.file_10e0)
+
+        dq.store_textfile(path_tmp)
+
+        def convert_validation(file):
+            return filter(pre, file.readlines())
+
+        def convert_outputed(file):
+            return file.readlines()
+
+        self.validate_files(
+            self.file_10e0,
+            path_tmp,
+            convert_validation,
+            convert_outputed
+        )
+
+    # def test_dummy_map(self):
+    #
+    #     def convert(a: str) -> int:
+    #         return len(a)
+    #
+    #     dq, path_tmp, pre = self.seed_small_grep(self.file_10e0)
+    #
+    #     dq.map(convert) \
+    #         .store_textfile(path_tmp)
+    #
+    #     def convert_validation(file):
+    #         return map(convert, filter(pre, file.readlines()))
+    #
+    #     def convert_outputed(file):
+    #         return map(lambda x: int(x), file.read().splitlines())
+    #
+    #     self.validate_files(
+    #         self.file_10e0,
+    #         path_tmp,
+    #         convert_validation,
+    #         convert_outputed
+    #     )
+    #
+    # def test_dummy_flatmap(self):
+    #     def fm_func(string: str) -> Iterable[str]:
+    #         return string.strip().split(" ")
+    #
+    #     dq, path_tmp, pre = self.seed_small_grep(self.file_10e0)
+    #
+    #     dq.flatmap(fm_func) \
+    #         .store_textfile(path_tmp, '\n')
+    #
+    #     def convert_validation(file):
+    #         return chain.from_iterable(map(fm_func, filter(pre, file.readlines())))
+    #
+    #     def convert_outputed(file):
+    #         return file.read().splitlines()
+    #
+    #     self.validate_files(
+    #         self.file_10e0,
+    #         path_tmp,
+    #         convert_validation,
+    #         convert_outputed
+    #     )