[WAYANG-#211] seed structure for JVM-Platform
Signed-off-by: bertty <bertty@apache.org>
diff --git a/python/src/pywy/platforms/commons/channels.py b/python/src/pywy/platforms/commons/channels.py
index 743a716..57cebeb 100644
--- a/python/src/pywy/platforms/commons/channels.py
+++ b/python/src/pywy/platforms/commons/channels.py
@@ -17,6 +17,7 @@
from typing import Callable
from pywy.core import (Channel, ChannelDescriptor)
+from pywy.exception import PywyException
class CommonsCallableChannel(Channel):
@@ -35,6 +36,11 @@
@staticmethod
def concatenate(function_a: Callable, function_b: Callable):
+ if function_a is None:  # function_a is mandatory, there is nothing to return without it
+ raise PywyException("the function_a can't be None")
+ if function_b is None:  # nothing to compose with, function_a is handed back unchanged
+ return function_a
+ # Otherwise build the composition: function_b is applied first, function_a on its result.
def executable(iterable):
return function_a(function_b(iterable))
return executable
diff --git a/python/src/pywy/platforms/jvm/__init__.py b/python/src/pywy/platforms/jvm/__init__.py
new file mode 100644
index 0000000..d9e26de
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/python/src/pywy/platforms/jvm/channels.py b/python/src/pywy/platforms/jvm/channels.py
new file mode 100644
index 0000000..9c1e76f
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/channels.py
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from typing import Callable
+
+from pywy.core import (Channel, ChannelDescriptor)
+from pywy.exception import PywyException
+from pywy.platforms.commons.channels import CommonsCallableChannel
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator, WayangJVMMappartitionOperator
+
+
+class DispatchableChannel(CommonsCallableChannel):
+    """Channel that carries a WayangJVMOperator (and a udf) from one JVM operator to the next."""
+    operator: WayangJVMOperator
+    def __init__(self):
+        Channel.__init__(self)  # NOTE(review): skips CommonsCallableChannel.__init__ - confirm intended
+        self.udf = None
+        self.operator = None
+
+    def provide_dispatchable(self, do_wrapper: bool = False) -> WayangJVMOperator:
+        """Return the stored operator; with do_wrapper, copy this channel's udf onto it first."""
+        if self.operator is None:
+            raise PywyException("The operator was not define")
+        if do_wrapper:
+            self.operator.udf = self.udf
+        return self.operator
+
+    def accept_dispatchable(self, operator: WayangJVMOperator) -> 'DispatchableChannel':
+        """Store the operator and return this channel (not the operator)."""
+        self.operator = operator
+        return self
+
+
+DISPATCHABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(DispatchableChannel, True, True)  # pass the class itself; no throwaway instance
diff --git a/python/src/pywy/platforms/jvm/execution.py b/python/src/pywy/platforms/jvm/execution.py
new file mode 100644
index 0000000..2dad76d
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/execution.py
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pywy.core import Executor, ChannelDescriptor
+from pywy.core import PywyPlan
+from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR
+from pywy.platforms.jvm.graph import NodeDispatch, WGraphDispatch
+from pywy.platforms.jvm.operator import JVMExecutionOperator
+
+class JVMExecutor(Executor):
+ # Executor of the JVM plugin: traverses the plan graph and links consecutive operators through channels.
+ def __init__(self):
+ super(JVMExecutor, self).__init__()
+
+ def execute(self, plan):
+ pywyPlan: PywyPlan = plan
+ graph = WGraphDispatch(pywyPlan.sinks)  # the graph is built from the sinks of the plan
+
+ # TODO get this information by a configuration and ideally by the context
+ descriptor_default: ChannelDescriptor = DISPATCHABLE_CHANNEL_DESCRIPTOR
+ # Callback handed to graph.traversal; presumably called with a node and its successor - confirm.
+ def execute(op_current: NodeDispatch, op_next: NodeDispatch):
+ if op_current is None:
+ return
+ # An operator without outputs is executed right away with an empty output list.
+ jvm_current: JVMExecutionOperator = op_current.current
+ if jvm_current.outputs == 0:
+ jvm_current.execute(jvm_current.inputChannel, [])
+ return
+
+ if op_next is None:
+ return
+ # The channel type must be offered by the current operator and accepted by the next one.
+ jvm_next: JVMExecutionOperator = op_next.current
+ outputs = jvm_current.get_output_channeldescriptors()
+ inputs = jvm_next.get_input_channeldescriptors()
+
+ intersect = outputs.intersection(inputs)
+ if len(intersect) == 0:
+ raise Exception(
+ "The operator(A) {} can't connect with (B) {}, "
+ "because the output of (A) is {} and the input of (B) is {} ".format(
+ jvm_current,
+ jvm_next,
+ outputs,
+ inputs
+ )
+ )
+ # Several candidates: fall back to the default descriptor; exactly one: use it.
+ if len(intersect) > 1:
+ if descriptor_default is None:
+ raise Exception(
+ "The interaction between the operator (A) {} and (B) {}, "
+ "can't be decided because are several channel availables {}".format(
+ jvm_current,
+ jvm_next,
+ intersect
+ )
+ )
+ descriptor = descriptor_default
+ else:
+ descriptor = intersect.pop()
+
+ # TODO validate whether this is valid for several outputs
+ jvm_current.outputChannel[0] = descriptor.create_instance()
+
+ jvm_current.execute(jvm_current.inputChannel, jvm_current.outputChannel)
+ # The output channels of the current operator become the input channels of the next one.
+ jvm_next.inputChannel = jvm_current.outputChannel
+
+ graph.traversal(graph.starting_nodes, execute)
+
+ # Debug output: follow the previous[0] links from the first starting node and print each operator.
+ # NOTE(review): WayangJVMOperator is only used in the annotation below and is not imported in this module.
+ starting: WayangJVMOperator = graph.starting_nodes[0].current.dispatch_operator
+ while starting.previous[0]:  # NOTE(review): raises IndexError if the first operator has an empty previous list
+ print(starting)
+ #print(starting.nexts[0])
+ starting = starting.previous[0]
+ if len(starting.previous) == 0 :
+ break
+ print(starting)
+
+
+
diff --git a/python/src/pywy/platforms/jvm/graph.py b/python/src/pywy/platforms/jvm/graph.py
new file mode 100644
index 0000000..7863651
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/graph.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from typing import Iterable, List, Tuple
+
+from pywy.graph.graph import WayangGraph, GraphNode
+from pywy.operators.base import PO_T
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WJO_T
+
+
+class NodeDispatch(GraphNode[PO_T, PO_T]):
+    """Graph node that wraps one operator of the plan."""
+    wop: WJO_T
+    def __init__(self, op: PO_T):
+        super().__init__(op)
+
+    def get_adjacents(self) -> List[PO_T]:
+        """Return the input operators of the wrapped operator, or [] when it has none."""
+        wrapped: PO_T = self.current
+        has_inputs = wrapped is not None and wrapped.inputs != 0
+        return wrapped.inputOperator if has_inputs else []
+
+    def build_node(self, t: PO_T) -> 'NodeDispatch':
+        return NodeDispatch(t)
+
+    def __str__(self):
+        return "NodeDispatch {}".format(self.current)
+
+    def __repr__(self):
+        return self.__str__()
+
+
+class WGraphDispatch(WayangGraph[PO_T, NodeDispatch]):
+    """Wayang graph whose nodes are NodeDispatch instances."""
+    def __init__(self, nodes: Iterable[PO_T]):
+        super().__init__(nodes)
+
+    def build_node(self, t: PO_T) -> NodeDispatch:
+        return NodeDispatch(t)
diff --git a/python/src/pywy/platforms/jvm/mappings.py b/python/src/pywy/platforms/jvm/mappings.py
new file mode 100644
index 0000000..4709a73
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/mappings.py
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pywy.core import Mapping
+from pywy.platforms.jvm.operator import *
+
+
+JVM_OPERATOR_MAPPINGS = Mapping()
+# Each supported operator is registered through an instance built without an origin.
+JVM_OPERATOR_MAPPINGS.add_mapping(JVMFilterOperator())
+JVM_OPERATOR_MAPPINGS.add_mapping(JVMTextFileSourceOperator())
+JVM_OPERATOR_MAPPINGS.add_mapping(JVMTextFileSinkOperator())
diff --git a/python/src/pywy/platforms/jvm/operator/__init__.py b/python/src/pywy/platforms/jvm/operator/__init__.py
new file mode 100644
index 0000000..a0bc5d2
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/operator/__init__.py
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pywy.platforms.jvm.operator.jvm_execution_operator import JVMExecutionOperator
+from pywy.platforms.jvm.operator.jvm_unary_filter import JVMFilterOperator
+from pywy.platforms.jvm.operator.jvm_source_textfile import JVMTextFileSourceOperator
+from pywy.platforms.jvm.operator.jvm_sink_textfile import JVMTextFileSinkOperator
+
+__all__ = [  # must be lower-case and hold names as strings to be honoured by "import *"
+    'JVMExecutionOperator',
+    'JVMFilterOperator',
+    'JVMTextFileSourceOperator',
+    'JVMTextFileSinkOperator',
+]
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py b/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py
new file mode 100644
index 0000000..0049c5c
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import List, Type
+
+from pywy.core.channel import CH_T
+from pywy.operators.base import PywyOperator
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator
+
+
+class JVMExecutionOperator(PywyOperator):
+    """Common base of the operators handled by the JVM platform."""
+    dispatch_operator: WayangJVMOperator
+
+    def prefix(self) -> str:
+        return 'JVM'
+
+    def execute(self, inputs: List[Type[CH_T]], output: List[CH_T]):
+        """Do nothing; the concrete JVM operators provide their own execute."""
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py b/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py
new file mode 100644
index 0000000..555ba04
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import Set, List, Type
+
+from pywy.core.channel import (CH_T, ChannelDescriptor)
+from pywy.exception import PywyException
+from pywy.operators.sink import TextFileSink
+from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR, DispatchableChannel
+from pywy.platforms.jvm.operator.jvm_execution_operator import JVMExecutionOperator
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMTextFileSink
+
+
+class JVMTextFileSinkOperator(TextFileSink, JVMExecutionOperator):
+    """Text-file sink of the JVM platform; it closes the dispatched operator chain."""
+
+    def __init__(self, origin: TextFileSink = None):
+        if origin is None:
+            path, type_class, end_line = None, None, None
+        else:
+            path, type_class, end_line = origin.path, origin.inputSlot[0], origin.end_line
+        super().__init__(path, type_class, end_line)
+
+    def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
+        """Connect the operator provided by inputs[0] to a new WayangJVMTextFileSink."""
+        self.validate_channels(inputs, outputs)
+        # Guard clause: only a DispatchableChannel can provide the upstream operator.
+        if not isinstance(inputs[0], DispatchableChannel):
+            raise Exception("Channel Type does not supported")
+
+        in_channel: DispatchableChannel = inputs[0]
+        # do_wrapper=True makes the channel copy its udf onto the upstream operator.
+        upstream = in_channel.provide_dispatchable(do_wrapper=True)
+        jvm_sink: WayangJVMTextFileSink = WayangJVMTextFileSink(self.name, self.path)
+        upstream.connect_to(0, jvm_sink, 0)
+        self.dispatch_operator = jvm_sink
+
+    def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        return {DISPATCHABLE_CHANNEL_DESCRIPTOR}
+
+    def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        raise Exception("The JVMTextFileSink does not support Output Channels")
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py b/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py
new file mode 100644
index 0000000..3ca3b91
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import Set, List, Type
+
+from pywy.core.channel import (CH_T, ChannelDescriptor)
+from pywy.operators.source import TextFileSource
+from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR, DispatchableChannel
+from pywy.platforms.jvm.operator.jvm_execution_operator import JVMExecutionOperator
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMTextFileSource
+
+
+class JVMTextFileSourceOperator(TextFileSource, JVMExecutionOperator):
+    """Text-file source of the JVM platform; it starts the dispatched operator chain."""
+
+    def __init__(self, origin: TextFileSource = None):
+        # Without an origin the instance is built with no path.
+        super().__init__(origin.path if origin is not None else None)
+
+    def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
+        """Hand a new WayangJVMTextFileSource for this operator to outputs[0]."""
+        self.validate_channels(inputs, outputs)
+        # Guard clause: only a DispatchableChannel can receive the JVM operator.
+        if not isinstance(outputs[0], DispatchableChannel):
+            raise Exception("Channel Type does not supported")
+        out_channel: DispatchableChannel = outputs[0]
+        jvm_source = WayangJVMTextFileSource(self.name, self.path)
+        out_channel.accept_dispatchable(jvm_source)
+
+    def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        raise Exception("The JVMTextFileSource does not support Input Channels")
+
+    def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        return {DISPATCHABLE_CHANNEL_DESCRIPTOR}
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py b/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py
new file mode 100644
index 0000000..06ba9e1
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import Set, List, Type
+
+from pywy.core.channel import CH_T, ChannelDescriptor
+from pywy.operators.unary import FilterOperator
+from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR, DispatchableChannel
+from pywy.platforms.jvm.operator.jvm_execution_operator import JVMExecutionOperator
+from pywy.platforms.commons.channels import (
+ CommonsCallableChannel
+)
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMMappartitionOperator, WayangJVMOperator
+
+
+class JVMFilterOperator(FilterOperator, JVMExecutionOperator):
+    """Filter of the JVM platform, dispatched as a map-partition operator."""
+
+    def __init__(self, origin: FilterOperator = None):
+        # Without an origin the instance is built with no predicate.
+        super().__init__(origin.predicate if origin is not None else None)
+
+    def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
+        """Chain the predicate onto the incoming callable and forward a JVM operator.
+
+        An incoming WayangJVMMappartitionOperator is forwarded as-is; any other
+        operator gets a new WayangJVMMappartitionOperator connected after it.
+        """
+        self.validate_channels(inputs, outputs)
+        predicate = self.predicate
+        # Guard clause: only a DispatchableChannel is understood here.
+        if not isinstance(inputs[0], DispatchableChannel):
+            raise Exception("Channel Type does not supported")
+
+        in_channel: DispatchableChannel = inputs[0]
+        out_channel: DispatchableChannel = outputs[0]
+
+        def keep_matching(iterator):
+            return filter(predicate, iterator)
+
+        # The filter is applied on top of whatever callable the input channel provides.
+        chained = CommonsCallableChannel.concatenate(keep_matching, in_channel.provide_callable())
+        out_channel.accept_callable(chained)
+
+        # Forward an upstream map-partition operator, otherwise append a new one.
+        upstream: WayangJVMOperator = in_channel.provide_dispatchable()
+        if not isinstance(upstream, WayangJVMMappartitionOperator):
+            # TODO check for the case where the index matter
+            mappartition: WayangJVMMappartitionOperator = WayangJVMMappartitionOperator(self.name)
+            upstream.connect_to(0, mappartition, 0)
+            upstream = mappartition
+        out_channel.accept_dispatchable(upstream)
+
+    def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        return {DISPATCHABLE_CHANNEL_DESCRIPTOR}
+
+    def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        return {DISPATCHABLE_CHANNEL_DESCRIPTOR}
diff --git a/python/src/pywy/platforms/jvm/platform.py b/python/src/pywy/platforms/jvm/platform.py
new file mode 100644
index 0000000..d52ac47
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/platform.py
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pywy.core.platform import Platform
+
+
+class JVMPlatform(Platform):
+    """Platform registered under the name "JVM"."""
+    def __init__(self):
+        super().__init__("JVM")
diff --git a/python/src/pywy/platforms/jvm/plugin.py b/python/src/pywy/platforms/jvm/plugin.py
new file mode 100644
index 0000000..eff1e04
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/plugin.py
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pywy.core import Executor
+from pywy.core import Plugin
+from pywy.platforms.jvm.execution import JVMExecutor
+from pywy.platforms.jvm.mappings import JVM_OPERATOR_MAPPINGS
+from pywy.platforms.jvm.platform import JVMPlatform
+
+
+class JVMPlugin(Plugin):
+    """Plugin that bundles the JVM platform, its operator mappings and its executor."""
+    def __init__(self):
+        super().__init__({JVMPlatform()}, JVM_OPERATOR_MAPPINGS)
+
+    def get_executor(self) -> Executor:
+        return JVMExecutor()
diff --git a/python/src/pywy/platforms/jvm/serializable/__init__.py b/python/src/pywy/platforms/jvm/serializable/__init__.py
new file mode 100644
index 0000000..d9e26de
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/serializable/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py b/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py
new file mode 100644
index 0000000..d18f155
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from typing import Callable, List, TypeVar
+
+from pywy.exception import PywyException
+
+
+class WayangJVMOperator:
+    """Node of the operator graph built for the JVM, linked through previous/nexts slots."""
+    kind: str
+    name: str
+    path: str
+    udf: Callable
+
+    previous: List['WayangJVMOperator']
+    nexts: List['WayangJVMOperator']
+
+    def __init__(self, kind, name):
+        self.name = name
+        self.kind = kind
+        self.previous = []
+        self.nexts = []
+
+    def validate_vector(self, vect: List['WayangJVMOperator'], index: int, op: 'WayangJVMOperator' = None):
+        """Return ``vect`` padded with ``None`` so that ``vect[index]`` exists and is free.
+
+        :param vect: slot list to check; ``None`` or an empty list yields a new list
+        :param index: position of the slot that is about to be assigned
+        :param op: operator named in the error message, ``self`` when omitted
+        :raises PywyException: when the slot at ``index`` is already in use
+        """
+        if op is None:
+            op = self
+        if vect is None or len(vect) == 0:
+            vect = [None] * (index + 1)
+        # "<=" rather than "<": a list of length ``index`` has no slot ``index`` yet.
+        if len(vect) <= index:
+            vect.extend([None] * (index + 1 - len(vect)))
+        if vect[index] is not None:
+            raise PywyException(
+                'the position in the index "{}" is already in use for "{}" in the operator "{}"'.format(
+                    index, vect[index], op
+                )
+            )
+        return vect
+
+    def connect_to(self, nexts_index: int, operator: 'WayangJVMOperator', previous_index: int) -> 'WayangJVMOperator':
+        """Link ``self.nexts[nexts_index]`` to ``operator`` and back; return ``self``."""
+        operator.previous = self.validate_vector(operator.previous, previous_index, operator)
+        self.nexts = self.validate_vector(self.nexts, nexts_index)
+        self.nexts[nexts_index] = operator
+        operator.previous[previous_index] = self
+        return self
+
+    def __str__(self):
+        return "WayangJVMOperator {}, previous.[{}], nexts.[{}]".format(
+            self.name, self.previous, self.nexts
+        )
+
+WJO_T = TypeVar('WJO_T', bound=WayangJVMOperator)  # type variable bounded by WayangJVMOperator
+
+
+class WayangJVMMappartitionOperator(WayangJVMOperator):
+    """JVM operator of kind "MapPartitionOperator" that stores an optional udf."""
+    def __init__(self, name: str, udf: Callable = None):
+        super().__init__(kind="MapPartitionOperator", name=name)
+        self.udf = udf
+
+
+class WayangJVMTextFileSource(WayangJVMOperator):
+    """JVM operator of kind "TextFileSource" that stores the given path."""
+    def __init__(self, name: str, path: str):
+        super().__init__(kind="TextFileSource", name=name)
+        self.path = path
+
+
+class WayangJVMTextFileSink(WayangJVMOperator):
+    """JVM operator of kind "TextFileSink" that stores the given path."""
+    def __init__(self, name: str, path: str):
+        super().__init__(kind="TextFileSink", name=name)
+        self.path = path
+
diff --git a/python/src/pywy/plugins.py b/python/src/pywy/plugins.py
index e3509b6..777d297 100644
--- a/python/src/pywy/plugins.py
+++ b/python/src/pywy/plugins.py
@@ -17,6 +17,7 @@
from pywy.core.platform import Platform
from pywy.core import Plugin
+from pywy.platforms.jvm.plugin import JVMPlugin
from pywy.platforms.python.plugin import PythonPlugin
# define the basic plugins that can be used
@@ -25,3 +26,4 @@
FLINK = Plugin({Platform('flink')})
# plugin for the python platform
PYTHON = PythonPlugin()
+JVMs = JVMPlugin()  # plugin for the JVM platform
diff --git a/python/src/pywy/tests/integration/jvm_platform_test.py b/python/src/pywy/tests/integration/jvm_platform_test.py
new file mode 100644
index 0000000..987e733
--- /dev/null
+++ b/python/src/pywy/tests/integration/jvm_platform_test.py
@@ -0,0 +1,144 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import os
+import unittest
+import tempfile
+from itertools import chain
+from typing import List, Iterable
+
+from pywy.config import RC_TEST_DIR as ROOT
+from pywy.dataquanta import WayangContext
+from pywy.plugins import JVMs
+
+logger = logging.getLogger(__name__)
+
+
+class TestIntegrationJVMPlatform(unittest.TestCase):
+    """Integration tests that run small plans with the JVM plugin registered."""
+    file_10e0: str
+
+    def setUp(self):
+        self.file_10e0 = "{}/10e0MB.input".format(ROOT)
+
+    @staticmethod
+    def seed_small_grep(validation_file):
+        """Return (dataquanta, temporary output path, predicate) for a grep of 'six'."""
+        def pre(a: str) -> bool:
+            return 'six' in a
+
+        fd, path_tmp = tempfile.mkstemp()
+        # mkstemp returns an open descriptor; only the path is used, so close it to avoid a leak.
+        os.close(fd)
+        dq = WayangContext() \
+            .register(JVMs) \
+            .textfile(validation_file) \
+            .filter(pre)
+
+        return dq, path_tmp, pre
+
+    def validate_files(self,
+                       validation_file,
+                       outputed_file,
+                       read_and_convert_validation,
+                       read_and_convert_outputed,
+                       delete_outputed=True,
+                       print_variable=False):
+        """Assert that both files yield the same list once read through their converters."""
+        with open(validation_file, 'r') as f:
+            lines_filter = list(read_and_convert_validation(f))
+        selectivity = len(lines_filter)
+
+        with open(outputed_file, 'r') as fp:
+            lines_platform = list(read_and_convert_outputed(fp))
+        elements = len(lines_platform)
+
+        if delete_outputed:
+            os.remove(outputed_file)
+
+        if print_variable:
+            logger.info(f"{lines_platform=}")
+            logger.info(f"{lines_filter=}")
+            logger.info(f"{elements=}")
+            logger.info(f"{selectivity=}")
+
+        self.assertEqual(selectivity, elements)
+        self.assertEqual(lines_filter, lines_platform)
+
+    def test_grep(self):
+        """Store the grep result and compare it with the filter applied locally."""
+        dq, path_tmp, pre = self.seed_small_grep(self.file_10e0)
+        dq.store_textfile(path_tmp)
+
+        def convert_validation(file):
+            return filter(pre, file.readlines())
+
+        def convert_outputed(file):
+            return file.readlines()
+
+        self.validate_files(
+            self.file_10e0,
+            path_tmp,
+            convert_validation,
+            convert_outputed
+        )
+
+ # def test_dummy_map(self):
+ #
+ # def convert(a: str) -> int:
+ # return len(a)
+ #
+ # dq, path_tmp, pre = self.seed_small_grep(self.file_10e0)
+ #
+ # dq.map(convert) \
+ # .store_textfile(path_tmp)
+ #
+ # def convert_validation(file):
+ # return map(convert, filter(pre, file.readlines()))
+ #
+ # def convert_outputed(file):
+ # return map(lambda x: int(x), file.read().splitlines())
+ #
+ # self.validate_files(
+ # self.file_10e0,
+ # path_tmp,
+ # convert_validation,
+ # convert_outputed
+ # )
+ #
+ # def test_dummy_flatmap(self):
+ # def fm_func(string: str) -> Iterable[str]:
+ # return string.strip().split(" ")
+ #
+ # dq, path_tmp, pre = self.seed_small_grep(self.file_10e0)
+ #
+ # dq.flatmap(fm_func) \
+ # .store_textfile(path_tmp, '\n')
+ #
+ # def convert_validation(file):
+ # return chain.from_iterable(map(fm_func, filter(pre, file.readlines())))
+ #
+ # def convert_outputed(file):
+ # return file.read().splitlines()
+ #
+ # self.validate_files(
+ # self.file_10e0,
+ # path_tmp,
+ # convert_validation,
+ # convert_outputed
+ # )