Wayang 8 (#89)
* [WAYANG-8][API-PYTHON] Creation of functions to be consumed by MapPartitionsDescriptor
* [WAYANG-8][API-PYTHON] Included PythonProcessCaller that manages the python process execution and Java - Python connection
* [WAYANG-8][API-PYTHON] POM fixes plus minor test
* [WAYANG-8][API-PYTHON] Python connection through TCP socket enabled
* [WAYANG-8][API-PYTHON] Writing from Java to Python. Iterator datatypes are not handled yet.
* [WAYANG-8][API-PYTHON] Java Socket Writer improvements
* [WAYANG-8][API-PYTHON] Python UTF8 Deserializer included
* [WAYANG-8][API-PYTHON] Python UTF8 Reading Stream
* [WAYANG-8][API-PYTHON] Getting results from Python and continue processing
* [WAYANG-8][API-PYTHON] Config files for pywayang
* [WAYANG-8][API-PYTHON] Structures to store the plan in a functional style, plus the most basic operators
* [WAYANG-8][API-PYTHON] Main program to test plan executions locally
* [WAYANG-8][API-PYTHON] Minor comments and TODOs
* [WAYANG-8][API-PYTHON] Most basic test for protobuf communication with Java
* [WAYANG-8][API-PYTHON] Adjacency list from PyWayang Plan
* [WAYANG-8][API-PYTHON] Graph traversal implementation with visitor pattern
* [WAYANG-8][API-PYTHON] Protobuf python message generator
* [WAYANG-8][API-PYTHON] Wayang Web Service project structure
* [WAYANG-8][API-PYTHON] Protobuf message generation fixes
* [WAYANG-8][API-PYTHON] Wayang Web Service executes the most basic plans directly
* [WAYANG-8][API-PYTHON] Receiving Base64, decoding it into a byte array and unpickling
* [WAYANG-8][API-PYTHON] Updated classes to process a single Serialized UDF
* [WAYANG-8][API-PYTHON] New test with single UDF
* [WAYANG-8][API-PYTHON] Protobuf command
* [WAYANG-8][API-PYTHON] Protobuf message template updated
* [WAYANG-8][API-PYTHON] POM fixes
* [WAYANG-8][API-PYTHON] License comments added
* [WAYANG-8][API-PYTHON] Correction on missing licenses
* [WAYANG-8][API-PYTHON] Serializable module creation
* [WAYANG-8][API-PYTHON] adding protoc to travis
* [WAYANG-8][API-PYTHON] protoc executable path correction
* [WAYANG-8][API-PYTHON] Commenting objc_class_prefix
* [WAYANG-8][API-PYTHON] Obtaining pipelines
* [WAYANG-8][API-PYTHON] Dataquanta writing message
* [WAYANG-8][API-PYTHON] Plan writer pipeline based adjustments
* [WAYANG-8][API-PYTHON] Operator Python executable indicator
* [WAYANG-8][API-PYTHON] Plan writer improved to use less sockets
* [WAYANG-8][API-PYTHON] New version of Wayang protobuf message
* [WAYANG-8][API-PYTHON] Wayang REST improved to allow multi pipelined executions
* [WAYANG-8][API-PYTHON] More test programs
* [WAYANG-8][API-PYTHON] Comments and logging for Graph module
* [WAYANG-8][API-PYTHON] Comments and logging for Orchestrator module
* [WAYANG-8][API-PYTHON] Comments and logging for Protobuf module
* [WAYANG-8][API-PYTHON] Fix usage of relative paths
* [WAYANG-8][API-PYTHON] Scripts to compile protobuf have been deleted; Maven now handles the compilation
* [WAYANG-8][API-PYTHON] Execution Log configuration
* [WAYANG-8][API-PYTHON] Fix - Python Map partition with single operator
* [WAYANG-8][API-PYTHON] Unit testing preparing the Wayang Plan
* [WAYANG-8][API-PYTHON] Plugin selection through Plan Descriptor
* [WAYANG-8][API-PYTHON] Unit testing preparing the Wayang Plan with Spark execution
* [WAYANG-8][API-PYTHON] Pywayang sends protobuf message in API request as bytes using base64
* [WAYANG-8][API-PYTHON] New operators FlatMap, GroupBy, Reduce and ReduceByKey (Python side only)
* [WAYANG-8][API-PYTHON] Protobuf Wayang Plan message updated to allow more Complex Java-Python Operators
* [WAYANG-8][API-PYTHON] Adding first TPC-H test
* [WAYANG-8][API-PYTHON] Last changes, not working
* [WAYANG-8] Fixing errors with dependencies
* [WAYANG-8] Fix to Pom versions problem
* [WAYANG-8] Protoc path updated
* [WAYANG-8] Correction in the pom.xml for flags
Signed-off-by: bertty <bertty@apache.org>
Co-authored-by: berttty <bertty@scalytics.io>
Co-authored-by: Bertty Contreras-Rojas <bertty@databloom.ai>
diff --git a/.travis.yml b/.travis.yml
index efb62ac..d00414c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -48,6 +48,7 @@
- echo ' </server>' >> ~/.m2/settings.xml
- echo ' </servers>' >> ~/.m2/settings.xml
- echo '</settings>' >> ~/.m2/settings.xml
+ - sudo apt-get install -y protobuf-compiler
- mkdir -p travis/tmp
- echo "#" >> travis/wayang.properties
- echo "# Licensed to the Apache Software Foundation (ASF) under one or more" >> travis/wayang.properties
diff --git a/pom.xml b/pom.xml
index 3ff48f9..2f7fadd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1248,6 +1248,8 @@
<exclude>**/README.md</exclude>
<exclude>**/general-todos.md</exclude>
<exclude>**/scala_1*</exclude>
+
+ <exclude>**/*pb2.py</exclude>
</excludes>
</configuration>
</plugin>
@@ -1278,7 +1280,7 @@
</includedLicenses>
<failOnBlacklist>true</failOnBlacklist>
<excludedGroups>
- org.apache.spark.*|org.apache.hadoop.*|org.apache.giraph.*|org.antlr.*|junit.*|org.graphchi.*
+ org.apache.spark.*|org.apache.hadoop.*|org.apache.giraph.*|org.antlr.*|junit.*|org.graphchi.*|org.springframework.*
</excludedGroups>
<excludeTransitiveDependencies>true</excludeTransitiveDependencies>
</configuration>
diff --git a/pywayang/config/__init__.py b/pywayang/config/__init__.py
new file mode 100644
index 0000000..008475c
--- /dev/null
+++ b/pywayang/config/__init__.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from config.config_reader import get_source_types
+from config.config_reader import get_sink_types
+from config.config_reader import get_boundary_types
diff --git a/pywayang/config/config_reader.py b/pywayang/config/config_reader.py
new file mode 100644
index 0000000..c8f5873
--- /dev/null
+++ b/pywayang/config/config_reader.py
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import configparser
+import os
+
+# Resolve the .ini next to this module so the readers do not depend on the cwd
+_CONFIG_DIR = os.path.dirname(os.path.abspath(__file__))
+CONFIG_PATH = os.path.join(_CONFIG_DIR, 'pywayang_config.ini')
+
+
+def get_boundary_types():
+    """Return the operator types that split a plan into pipelines."""
+    config = configparser.ConfigParser()
+    config.read(CONFIG_PATH)
+    # items() also yields the [DEFAULT] keys, so drop the placeholder entry
+    boundary_types = dict(config.items('BOUNDARY_TYPES'))
+    boundary_types.pop("variable_to_access")
+    return boundary_types.values()
+
+
+def get_source_types():
+    """Return the operator types treated as sources."""
+    config = configparser.ConfigParser()
+    config.read(CONFIG_PATH)
+    source_types = dict(config.items('SOURCE_TYPES'))
+    source_types.pop("variable_to_access")
+    return source_types.values()
+
+
+def get_sink_types():
+    """Return the operator types treated as sinks."""
+    config = configparser.ConfigParser()
+    config.read(CONFIG_PATH)
+    sink_types = dict(config.items('SINK_TYPES'))
+    sink_types.pop("variable_to_access")
+    return sink_types.values()
\ No newline at end of file
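The `get_*_types` readers above rely on the fact that `ConfigParser.items(section)` also returns every key from `[DEFAULT]`, which is why each one pops `variable_to_access`. A minimal, self-contained sketch of that behaviour, reading an inline copy of the `[SINK_TYPES]` section with `read_string` instead of the real file; the helper names `section_items` and `read_types` are hypothetical.

```python
import configparser

# Same layout as pywayang_config.ini: a [DEFAULT] placeholder plus a typed section
INI_TEXT = """
[DEFAULT]
variable_to_access = value

[SINK_TYPES]
sink_type_1 = sink
sink_type_2 = sonk
"""


def section_items(text, section):
    """Return the raw key/value pairs ConfigParser reports for a section."""
    config = configparser.ConfigParser()
    config.read_string(text)
    return dict(config.items(section))  # includes the inherited [DEFAULT] keys


def read_types(text, section):
    """Return the type names of a section without the DEFAULT placeholder."""
    types = section_items(text, section)
    types.pop("variable_to_access", None)
    return list(types.values())


if __name__ == "__main__":
    print(section_items(INI_TEXT, "SINK_TYPES"))
    print(read_types(INI_TEXT, "SINK_TYPES"))
```

Passing a default to `pop` keeps the helper from raising `KeyError` if the placeholder is ever removed from `[DEFAULT]`.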
diff --git a/pywayang/config/pywayang_config.ini b/pywayang/config/pywayang_config.ini
new file mode 100644
index 0000000..78cc2b4
--- /dev/null
+++ b/pywayang/config/pywayang_config.ini
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+[DEFAULT]
+variable_to_access = value
+
+[INPUT]
+txnname_mod = string1
+txnmemo_mod = string2
+
+[MODIFY]
+txnname_mod = string3
+txnmemo_mod = string4
+
+[BOUNDARY_TYPES]
+boundary_type_1 = union
+
+[SOURCE_TYPES]
+source_type_1 = source
+source_type_2 = text
+
+[SINK_TYPES]
+sink_type_1 = sink
+sink_type_2 = sonk
\ No newline at end of file
diff --git a/pywayang/graph/__init__.py b/pywayang/graph/__init__.py
new file mode 100644
index 0000000..17e2deb
--- /dev/null
+++ b/pywayang/graph/__init__.py
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import graph.graph
+import graph.node
\ No newline at end of file
diff --git a/pywayang/graph/graph.py b/pywayang/graph/graph.py
new file mode 100644
index 0000000..be7a32f
--- /dev/null
+++ b/pywayang/graph/graph.py
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from graph.node import Node
+import logging
+
+
+# Adjacency list (Node objects keyed by operator id) used to analyse the plan
+class Graph:
+ def __init__(self):
+ self.graph = {}
+ self.nodes_no = 0
+ self.nodes = []
+
+ # Fills the Graph
+ def populate(self, sinks):
+ for sink in iter(sinks):
+ self.process_operator(sink)
+
+ # Add current operator and set dependencies
+ def process_operator(self, operator):
+ self.add_node(operator.operator_type, operator.id, operator)
+
+ if len(operator.previous) > 0:
+ for parent in operator.previous:
+ if parent:
+ self.add_node(parent.operator_type, parent.id, parent)
+ self.add_link(operator.id, parent.id, 1)
+ self.process_operator(parent)
+
+ def add_node(self, name, id, operator):
+ if id in self.nodes:
+ return
+
+ self.nodes_no += 1
+ self.nodes.append(id)
+ new_node = Node(name, id, operator)
+
+ self.graph[id] = new_node
+
+ def add_link(self, id_child, id_parent, e):
+ if id_child in self.nodes:
+ if id_parent in self.nodes:
+ self.graph[id_child].add_predecessor(id_parent, e)
+ self.graph[id_parent].add_successor(id_child, e)
+
+    def print_adjlist(self):
+        # logging formats lazily with %-placeholders; extra args need matching %s
+        for key in self.graph:
+            logging.debug("Node: %s - %s", self.graph[key].operator_type, key)
+            for key2 in self.graph[key].predecessors:
+                logging.debug("- Parent: %s - %s - %s", self.graph[key2].operator_type, self.graph[key].predecessors[key2], key2)
+            for key2 in self.graph[key].successors:
+                logging.debug("- Child: %s - %s - %s", self.graph[key2].operator_type, self.graph[key].successors[key2], key2)
+
+ def get_node(self, id):
+ return self.graph[id]
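`Graph.populate` builds the adjacency list by starting at the sinks and following each operator's `previous` links back towards the sources, recording every edge in both directions. The sketch below shows the same idea with an explicit stack and a visited check, so an operator shared by several children is expanded only once. `Op` and `build_adjacency` are hypothetical stand-ins, not pywayang classes, and edge weights are omitted.

```python
class Op:
    """Hypothetical stand-in for pywayang's Operator: an id, a type and parent links."""

    def __init__(self, op_id, operator_type, previous=()):
        self.id = op_id
        self.operator_type = operator_type
        self.previous = list(previous)


def build_adjacency(sinks):
    """Walk from the sinks through `previous`, recording each edge in both directions."""
    predecessors, successors = {}, {}
    stack = list(sinks)
    while stack:
        op = stack.pop()
        if op.id in predecessors:  # already expanded via another child
            continue
        predecessors[op.id] = set()
        successors.setdefault(op.id, set())
        for parent in op.previous:
            if parent is None:  # Graph.process_operator also skips empty parents
                continue
            predecessors[op.id].add(parent.id)
            successors.setdefault(parent.id, set()).add(op.id)
            stack.append(parent)
    return predecessors, successors


# A diamond: one source feeding two filters that a union merges again
src = Op("src", "source")
f1 = Op("f1", "filter", [src])
f2 = Op("f2", "filter", [src])
union = Op("union", "union", [f1, f2])
sink = Op("sink", "sink", [union])

preds, succs = build_adjacency([sink])
print(preds["union"], succs["src"])
```

The visited check matters for shapes like this diamond: without it, `src` would be expanded once per path that reaches it.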
diff --git a/pywayang/graph/node.py b/pywayang/graph/node.py
new file mode 100644
index 0000000..d0d696f
--- /dev/null
+++ b/pywayang/graph/node.py
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import abc
+
+
+class Element(metaclass=abc.ABCMeta):
+ @abc.abstractmethod
+ def accept(self, visitor, udf, orientation, last_iter):
+ pass
+
+
+# Describes an Operator in the Graph
+class Node(Element):
+ def __init__(self, operator_type, id, operator):
+ self.operator_type = operator_type
+ self.id = id
+ self.predecessors = {}
+ self.successors = {}
+ self.python_exec = operator.python_exec
+
+        # Temporary: keeps a reference to the original operator
+ self.operator = operator
+
+ def add_predecessor(self, id_parent, e):
+ self.predecessors[id_parent] = e
+
+ def add_successor(self, id_child, e):
+ self.successors[id_child] = e
+
+    # Nodes are visited by Visitant objects, which use them
+    # to apply a UDF while walking the Graph
+ def accept(self, visitor, udf, orientation, last_iter):
+ visitor.visit_node(self, udf, orientation, last_iter)
diff --git a/pywayang/graph/traversal.py b/pywayang/graph/traversal.py
new file mode 100644
index 0000000..e2dd851
--- /dev/null
+++ b/pywayang/graph/traversal.py
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from graph.visitant import Visitant
+import logging
+
+
+# Defines how a UDF will be applied over the Graph
+class Traversal:
+
+ def __init__(self, graph, origin, udf):
+ self.graph = graph
+ self.origin = origin
+ self.udf = udf
+ self.app = Visitant(graph, [])
+
+        # Starting from the sources walks successors; starting from the sinks walks predecessors
+ if origin[0].source:
+ self.orientation = "successors"
+ elif origin[0].sink:
+ self.orientation = "predecessors"
+ else:
+            logging.error("Traversal origin must be a source or a sink operator")
+ return
+
+ for operator in iter(origin):
+ logging.debug("operator origin: " + str(operator.id))
+ node = graph.get_node(operator.id)
+ self.app.visit_node(
+ node=node,
+ udf=self.udf,
+ orientation=self.orientation,
+ last_iter=None
+ )
+
+ def get_collected_data(self):
+ return self.app.get_collection()
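`Traversal` chooses the direction of the walk from its first origin: sources walk `successors`, sinks walk `predecessors`, and the chosen attribute name is later read with `getattr`. A hedged sketch of that selection on a three-node chain; the `Node`, `link`, `pick_orientation` and `walk` names are illustrative only, and the sketch raises an exception where the original logs an error and returns from `__init__`.

```python
class Node:
    """Hypothetical node with the attribute names the Visitant reads via getattr."""

    def __init__(self, node_id, source=False, sink=False):
        self.id = node_id
        self.source = source
        self.sink = sink
        self.predecessors = {}
        self.successors = {}


def link(child, parent):
    """Record an edge in both directions with weight 1, like Graph.add_link."""
    child.predecessors[parent.id] = 1
    parent.successors[child.id] = 1


def pick_orientation(origin):
    if origin.source:
        return "successors"
    if origin.sink:
        return "predecessors"
    raise ValueError("traversal origin must be a source or a sink")


def walk(nodes, origin):
    """Depth-first walk along the attribute chosen by pick_orientation (no visited set)."""
    orientation = pick_orientation(origin)
    order, stack = [], [origin.id]
    while stack:
        node = nodes[stack.pop()]
        order.append(node.id)
        stack.extend(getattr(node, orientation))  # iterating a dict yields its keys
    return order


src = Node("src", source=True)
filt = Node("filt")
snk = Node("snk", sink=True)
link(filt, src)
link(snk, filt)
nodes = {n.id: n for n in (src, filt, snk)}

print(walk(nodes, src))
print(walk(nodes, snk))
```

Failing loudly on a bad origin avoids leaving a half-initialised object whose `orientation` attribute was never set.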
diff --git a/pywayang/graph/visitant.py b/pywayang/graph/visitant.py
new file mode 100644
index 0000000..3d2f874
--- /dev/null
+++ b/pywayang/graph/visitant.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import abc
+import logging
+
+
+class Visitor(metaclass=abc.ABCMeta):
+ @abc.abstractmethod
+ def visit_node(self, node, udf, orientation, last_iter):
+ pass
+
+
+# Applies a UDF in current Node
+class Visitant(Visitor):
+
+ def __init__(self, graph, results):
+ self.collection = results
+ self.graph = graph
+
+    # The UDF may store results in self.collection whenever it needs to.
+    # last_iter holds the value the UDF returned for the previously visited node
+    def visit_node(self, node, udf, orientation, last_iter):
+        logging.debug("Applying UDF, orientation: " + str(orientation))
+ current_value = udf(node, last_iter, self.collection)
+ logging.debug("orientation result " + str(getattr(node, orientation)))
+ next_iter = getattr(node, orientation)
+ if len(next_iter) > 0:
+ for next_iter_id in next_iter:
+ if next_iter_id:
+ logging.debug("next_id: " + str(next_iter_id))
+ next_iter_node = self.graph.get_node(next_iter_id)
+ logging.debug("next_iter_node: " + next_iter_node.operator_type + " " + str(next_iter_node.id))
+ next_iter_node.accept(visitor=self, udf=udf, orientation=orientation, last_iter=current_value)
+ pass
+
+ def get_collection(self):
+ return self.collection
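`Visitant.visit_node` keeps no visited set: it applies the UDF and then recurses into every neighbour, passing along the value the UDF just returned. A node reachable along several paths, such as the operators after a `union`, is therefore visited once per path, which is presumably why `define_pipelines` deduplicates its stages. The sketch below reproduces that recursion on a hypothetical two-source plan; `Node`, `visit` and `record_path` are illustrative names, not pywayang code.

```python
class Node:
    """Hypothetical minimal node: an id plus the ids of its successors."""

    def __init__(self, node_id, successors=()):
        self.id = node_id
        self.successors = list(successors)


def visit(graph, node_id, udf, last_value, collection):
    """Same shape as Visitant.visit_node: apply the UDF, then recurse with its result.

    There is no visited set, so a node reachable along two paths is visited twice.
    """
    current = udf(graph[node_id], last_value, collection)
    for next_id in graph[node_id].successors:
        visit(graph, next_id, udf, current, collection)


def record_path(node, last_value, collection):
    """Example UDF: extend the path received from the previous node."""
    path = (last_value or []) + [node.id]  # new list; last_value is never mutated
    if not node.successors:  # reached a sink: keep the complete path
        collection.append(path)
    return path


graph = {
    "src_a": Node("src_a", ["union"]),
    "src_b": Node("src_b", ["union"]),
    "union": Node("union", ["sink"]),
    "sink": Node("sink"),
}
paths = []
for origin in ("src_a", "src_b"):
    visit(graph, origin, record_path, None, paths)
print(paths)
```

`record_path` builds a fresh list at each step instead of appending to the one it received, so sibling branches never share (and mutate) the same list object.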
diff --git a/pywayang/orchestrator/__init__.py b/pywayang/orchestrator/__init__.py
new file mode 100644
index 0000000..ed7d0ac
--- /dev/null
+++ b/pywayang/orchestrator/__init__.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import orchestrator.plan
+import orchestrator.dataquanta
+import graph.graph
diff --git a/pywayang/orchestrator/dataquanta.py b/pywayang/orchestrator/dataquanta.py
new file mode 100644
index 0000000..7d700eb
--- /dev/null
+++ b/pywayang/orchestrator/dataquanta.py
@@ -0,0 +1,330 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from orchestrator.operator import Operator
+from graph.graph import Graph
+from graph.traversal import Traversal
+from protobuf.planwriter import MessageWriter
+import itertools
+import collections
+import logging
+from functools import reduce
+import operator
+
+
+# Wraps a Source operation to create an iterable
+class DataQuantaBuilder:
+ def __init__(self, descriptor):
+ self.descriptor = descriptor
+
+ def source(self, source):
+
+ if type(source) is str:
+ source_ori = open(source, "r")
+ else:
+ source_ori = source
+ return DataQuanta(
+ Operator(
+ operator_type="source",
+ udf=source,
+ iterator=iter(source_ori),
+ previous=[],
+ python_exec=False
+ ),
+ descriptor=self.descriptor
+ )
+
+
+# Wraps an operation over an iterable
+class DataQuanta:
+ def __init__(self, operator=None, descriptor=None):
+ self.operator = operator
+ self.descriptor = descriptor
+ if self.operator.is_source():
+ self.descriptor.add_source(self.operator)
+ if self.operator.is_sink():
+ self.descriptor.add_sink(self.operator)
+
+ # Operational Functions
+ def filter(self, udf):
+ def func(iterator):
+ return filter(udf, iterator)
+
+ return DataQuanta(
+ Operator(
+ operator_type="filter",
+ udf=func,
+ previous=[self.operator],
+ python_exec=True
+ ),
+ descriptor=self.descriptor
+ )
+
+ def flatmap(self, udf):
+
+ def auxfunc(iterator):
+ return itertools.chain.from_iterable(map(udf, iterator))
+
+ def func(iterator):
+ mapped = map(udf, iterator)
+ flattened = flatten_single_dim(mapped)
+ yield from flattened
+
+ def flatten_single_dim(mapped):
+ for item in mapped:
+ for subitem in item:
+ yield subitem
+
+ return DataQuanta(
+ Operator(
+ operator_type="flatmap",
+ udf=func,
+ previous=[self.operator],
+ python_exec=True
+ ),
+ descriptor=self.descriptor
+ )
+
+ def group_by(self, udf):
+ def func(iterator):
+            # TODO key should be given by "udf"
+            # itertools.groupby only merges adjacent items, so sort by the key first
+            return itertools.groupby(sorted(iterator, key=operator.itemgetter(0)), key=operator.itemgetter(0))
+
+ return DataQuanta(
+ Operator(
+ operator_type="group_by",
+ udf=func,
+ previous=[self.operator],
+ python_exec=True
+ ),
+ descriptor=self.descriptor
+ )
+
+ def map(self, udf):
+ def func(iterator):
+ return map(udf, iterator)
+
+ return DataQuanta(
+ Operator(
+ operator_type="map",
+ udf=func,
+ previous=[self.operator],
+ python_exec=True
+ ),
+ descriptor=self.descriptor
+ )
+
+ # Key specifies pivot dimensions
+ # UDF specifies reducer function
+ def reduce_by_key(self, keys, udf):
+
+ op = Operator(
+ operator_type="reduce_by_key",
+ udf=udf,
+ previous=[self.operator],
+ python_exec=False
+ )
+
+        logging.debug("reduce_by_key keys (%d): %s", len(keys), keys)
+ for i in range(0, len(keys)):
+ """if keys[i] is int:
+ op.set_parameter("vector_position|"+str(i), keys[i])
+ else:
+ op.set_parameter("dimension_key|"+str(i), keys[i])"""
+
+ # TODO maybe would be better just leave the number as key
+ op.set_parameter("dimension|"+str(i+1), keys[i])
+
+ return DataQuanta(
+ op,
+ descriptor=self.descriptor
+ )
+
+ def reduce(self, udf):
+ def func(iterator):
+ return reduce(udf, iterator)
+
+ return DataQuanta(
+ Operator(
+ operator_type="reduce",
+ udf=func,
+ previous=[self.operator],
+ python_exec=True
+ ),
+ descriptor=self.descriptor
+ )
+
+ def sink(self, path, end="\n"):
+ def consume(iterator):
+ with open(path, 'w') as f:
+ for x in iterator:
+ f.write(str(x) + end)
+
+ def func(iterator):
+ consume(iterator)
+ # return self.__run(consume)
+
+ return DataQuanta(
+ Operator(
+ operator_type="sink",
+
+ udf=path,
+ # To execute directly uncomment
+ # udf=func,
+
+ previous=[self.operator],
+ python_exec=False
+ ),
+ descriptor=self.descriptor
+ )
+
+ def sort(self, udf):
+
+ def func(iterator):
+ return sorted(iterator, key=udf)
+
+ return DataQuanta(
+ Operator(
+ operator_type="sort",
+ udf=func,
+ previous=[self.operator],
+ python_exec=True
+ ),
+ descriptor=self.descriptor
+ )
+
+    # This function allows the union to be performed in Python.
+    # Nevertheless, the current configuration runs it on Java (python_exec=False)
+ def union(self, other):
+
+ def func(iterator):
+ return itertools.chain(iterator, other.operator.getIterator())
+
+ return DataQuanta(
+ Operator(
+ operator_type="union",
+ udf=func,
+ previous=[self.operator, other.operator],
+ python_exec=False
+ ),
+ descriptor=self.descriptor
+ )
+
+ def __run(self, consumer):
+ consumer(self.operator.getIterator())
+
+ # Execution Functions
+ def console(self, end="\n"):
+ def consume(iterator):
+ for x in iterator:
+ print(x, end=end)
+
+ self.__run(consume)
+
+ # Only for debugging purposes!
+ # To execute the plan directly in the program driver
+ def execute(self):
+        logging.warning("DEBUG Execution")
+ logging.info("Reminder to swap SINK UDF value from path to func")
+ logging.debug(self.operator.previous[0].operator_type)
+ if self.operator.is_sink():
+ logging.debug(self.operator.operator_type)
+ logging.debug(self.operator.udf)
+ logging.debug(len(self.operator.previous))
+ self.operator.udf(self.operator.previous[0].getIterator())
+ else:
+ logging.error("Plan must call execute from SINK type of operator")
+ raise RuntimeError
+
+ # Converts Python Functional Plan to valid Wayang Plan
+ def to_wayang_plan(self):
+
+ sinks = self.descriptor.get_sinks()
+ if len(sinks) == 0:
+ return
+
+ graph = Graph()
+ graph.populate(self.descriptor.get_sinks())
+
+ # Uncomment to check the Graph built
+ # graph.print_adjlist()
+
+        # Function to be consumed by the Traversal
+ # Separates Python Plan into a List of Pipelines
+ def define_pipelines(node1, current_pipeline, collection):
+ def store_unique(pipe_to_insert):
+ for pipe in collection:
+ if equivalent_lists(pipe, pipe_to_insert):
+ return
+ collection.append(pipe_to_insert)
+
+ def equivalent_lists(l1, l2):
+ if collections.Counter(l1) == collections.Counter(l2):
+ return True
+ else:
+ return False
+
+ if not current_pipeline:
+ current_pipeline = [node1]
+
+ elif node1.operator.is_boundary():
+ store_unique(current_pipeline.copy())
+ current_pipeline.clear()
+ current_pipeline.append(node1)
+
+ else:
+ current_pipeline.append(node1)
+
+ if node1.operator.sink:
+ store_unique(current_pipeline.copy())
+ current_pipeline.clear()
+
+ return current_pipeline
+
+ # Works over the graph
+ trans = Traversal(
+ graph=graph,
+ origin=self.descriptor.get_sources(),
+ # udf=lambda x, y, z: d(x, y, z)
+ # UDF always will receive:
+ # x: a Node object,
+ # y: an object representing the result of the last iteration,
+ # z: a collection to store final results inside your UDF
+ udf=lambda x, y, z: define_pipelines(x, y, z)
+ )
+
+ # Gets the results of the traverse process
+ collected_stages = trans.get_collected_data()
+
+ # Passing the Stages to a Wayang message writer
+ writer = MessageWriter()
+ a = 0
+ # Stage is composed of class Node objects
+ for stage in collected_stages:
+ a += 1
+ logging.info("///")
+ logging.info("stage" + str(a))
+ writer.process_pipeline(stage)
+
+ writer.set_dependencies()
+
+ # Uses a file to provide the plan
+ # writer.write_message(self.descriptor)
+
+ # Send the plan to Wayang REST api directly
+ writer.send_message(self.descriptor)
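`define_pipelines` cuts every source-to-sink walk into stages: a boundary operator (the `[BOUNDARY_TYPES]` entries, currently only `union`) closes the running stage and starts a new one, a sink closes the stage it ends, and stages with the same members are stored once (`collections.Counter` equality ignores order). A standalone sketch of that rule over precomputed paths of a hypothetical two-source, one-union plan; the `OP_TYPE` table and `split_pipelines` are illustrative, and operator ids stand in for `Node` objects.

```python
from collections import Counter

# Hypothetical id -> operator type table for a two-source plan with one union
OP_TYPE = {"src_a": "source", "src_b": "source", "filter_b": "filter",
           "union": "union", "sort": "sort", "sink": "sink"}
BOUNDARY_TYPES = {"union"}  # mirrors [BOUNDARY_TYPES] in pywayang_config.ini
SINK_TYPES = {"sink"}


def split_pipelines(paths):
    """Cut source-to-sink paths into stages, storing each distinct stage once."""
    stages = []

    def store_unique(stage):
        # Counter equality compares members regardless of order
        if all(Counter(stage) != Counter(existing) for existing in stages):
            stages.append(stage)

    for path in paths:
        current = []
        for op_id in path:
            if current and OP_TYPE[op_id] in BOUNDARY_TYPES:
                store_unique(current)  # a boundary closes the running stage
                current = []
            current.append(op_id)
            if OP_TYPE[op_id] in SINK_TYPES:
                store_unique(current)  # a sink closes the stage it ends
                current = []
    return stages


stages = split_pipelines([
    ["src_a", "union", "sort", "sink"],
    ["src_b", "filter_b", "union", "sort", "sink"],
])
print(stages)
```

The shared `union, sort, sink` stage is reached from both sources but stored only once, which is the role `store_unique` plays in the original.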
diff --git a/pywayang/orchestrator/execdirectly.py b/pywayang/orchestrator/execdirectly.py
new file mode 100644
index 0000000..452ccab
--- /dev/null
+++ b/pywayang/orchestrator/execdirectly.py
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from orchestrator.plan import Descriptor
+from orchestrator.dataquanta import DataQuantaBuilder
+import datetime
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_sort(descriptor):
+ plan = DataQuantaBuilder(descriptor)
+ sink_dataquanta = \
+ plan.source("../test/words.txt") \
+ .sort(lambda elem: elem.lower()) \
+ .sink("../test/output.txt", end="")
+ return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_sort_filter(descriptor):
+ plan = DataQuantaBuilder(descriptor)
+ sink_dataquanta = \
+ plan.source("../test/words.txt") \
+ .sort(lambda elem: elem.lower()) \
+ .filter(lambda elem: str(elem).startswith("f")) \
+ .sink("../test/output.txt", end="")
+ return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_filter_text(descriptor):
+ plan = DataQuantaBuilder(descriptor)
+
+ sink_dataquanta = \
+ plan.source("../test/words.txt") \
+ .filter(lambda elem: str(elem).startswith("f")) \
+ .sink("../test/output.txt", end="")
+
+ return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_filter(descriptor):
+ plan = DataQuantaBuilder(descriptor)
+
+ sink_dataquanta = \
+ plan.source("../test/numbers.txt") \
+ .filter(lambda elem: int(elem) % 2 != 0) \
+ .sink("../test/output.txt", end="")
+
+ return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_basic(descriptor):
+ plan = DataQuantaBuilder(descriptor)
+
+ sink_dataquanta = \
+ plan.source("../test/lines.txt") \
+ .sink("../test/output.txt", end="")
+
+ return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_junction(descriptor):
+
+ plan = DataQuantaBuilder(descriptor)
+
+ dq_source_a = plan.source("../test/lines.txt")
+ dq_source_b = plan.source("../test/morelines.txt") \
+ .filter(lambda elem: str(elem).startswith("I"))
+ dq_source_c = plan.source("../test/lastlines.txt") \
+ .filter(lambda elem: str(elem).startswith("W"))
+
+ sink_dataquanta = dq_source_a.union(dq_source_b) \
+ .union(dq_source_c) \
+ .sort(lambda elem: elem.lower()) \
+ .sink("../test/output.txt", end="")
+
+ return sink_dataquanta
+
+
+def plan_java_junction(descriptor):
+
+ plan = DataQuantaBuilder(descriptor)
+
+ dq_source_a = plan.source("../test/lines.txt")
+ dq_source_b = plan.source("../test/morelines.txt")
+ sink_dataquanta = dq_source_a.union(dq_source_b) \
+ .filter(lambda elem: str(elem).startswith("I")) \
+ .sort(lambda elem: elem.lower()) \
+ .sink("../test/output.txt", end="")
+
+ return sink_dataquanta
+
+
+def plan_tpch_q1(descriptor):
+
+ #TODO create reduce by
+ plan = DataQuantaBuilder(descriptor)
+
+ def reducer(obj1, obj2):
+ return obj1[0]
+
+ sink = plan.source("../test/lineitem.txt") \
+ .map(lambda elem: elem.split("|")) \
+ .filter(lambda elem: datetime.datetime.strptime(elem[10], '%Y-%m-%d') <= datetime.datetime.strptime("1998-09-02", '%Y-%m-%d')) \
+ .map(lambda elem:
+ [elem[8], elem[9], elem[4], elem[5],
+ float(elem[5]) * (1 - float(elem[6])),
+ float(elem[5]) * (1 - float(elem[6])) * (1 + float(elem[7])),
+ elem[4], elem[5],
+ elem[6], 1]) \
+ .sink("../test/output.txt", end="")
+ # .group_by(lambda elem: elem) \
+ # .reduce_by(reducer) \
+ # .flatmap(lambda elem: elem.split("|"))
+ # .map(lambda elem: (elem, elem.split("|"))) \
+ # L_RETURNFLAG 8
+ # L_LINESTATUS 9
+ # L_QUANTITY 4
+ # L_EXTENDEDPRICE 5
+ # discount 6
+ # tax 7
+
+ return sink
+
+
+def plan_full_java(descriptor):
+
+ plan = DataQuantaBuilder(descriptor)
+
+ dq_source_a = plan.source("../test/lines.txt")
+ dq_source_b = plan.source("../test/morelines.txt")
+ sink_dataquanta = dq_source_a.union(dq_source_b) \
+ .sink("../test/output.txt", end="")
+
+ return sink_dataquanta
+
+
+if __name__ == '__main__':
+
+ # The descriptor holds general information about the Wayang plan built here
+ descriptor = Descriptor()
+
+ plan_dataquanta_sink = plan_tpch_q1(descriptor)
+ plan_dataquanta_sink.execute()
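The demo plans are easiest to check against an in-memory reference. Below is a minimal sketch for `plan_junction`. It assumes that `union` keeps duplicates and that `sort` orders by the key function the same way Python's `sorted` does. The helper `expected_junction` is hypothetical, and plain lists stand in for the `../test/*.txt` files.

```python
from typing import Iterable, List


def expected_junction(lines: Iterable[str], more: Iterable[str], last: Iterable[str]) -> List[str]:
    """Hypothetical local reference for plan_junction (not part of pywayang)."""
    # Mirrors dq_source_b: keep only lines starting with "I"
    filtered_more = [elem for elem in more if str(elem).startswith("I")]
    # Mirrors dq_source_c: keep only lines starting with "W"
    filtered_last = [elem for elem in last if str(elem).startswith("W")]
    # Assumed bag-union semantics: duplicates are preserved
    united = list(lines) + filtered_more + filtered_last
    # Same key function as the plan's sort operator
    return sorted(united, key=lambda elem: elem.lower())


if __name__ == "__main__":
    print(expected_junction(["b line", "A line"], ["Ignored? no", "skip"], ["Wow", "nope"]))
```

Comparing this list with the lines written to `../test/output.txt` gives a quick sanity check of the union, filter and sort wiring.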
diff --git a/pywayang/orchestrator/main.py b/pywayang/orchestrator/main.py
new file mode 100644
index 0000000..b634eeb
--- /dev/null
+++ b/pywayang/orchestrator/main.py
@@ -0,0 +1,173 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from orchestrator.plan import Descriptor
+from orchestrator.dataquanta import DataQuantaBuilder
+import datetime
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_sort(descriptor):
+ plan = DataQuantaBuilder(descriptor)
+ sink_dataquanta = \
+ plan.source("../test/words.txt") \
+ .sort(lambda elem: elem.lower()) \
+ .sink("../test/output.txt", end="")
+ return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_sort_filter(descriptor):
+ plan = DataQuantaBuilder(descriptor)
+ sink_dataquanta = \
+ plan.source("../test/words.txt") \
+ .sort(lambda elem: elem.lower()) \
+ .filter(lambda elem: str(elem).startswith("f")) \
+ .sink("../test/output.txt", end="")
+ return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_filter_text(descriptor):
+ plan = DataQuantaBuilder(descriptor)
+
+ sink_dataquanta = \
+ plan.source("../test/words.txt") \
+ .filter(lambda elem: str(elem).startswith("f")) \
+ .sink("../test/output.txt", end="")
+
+ return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_filter(descriptor):
+ plan = DataQuantaBuilder(descriptor)
+
+ sink_dataquanta = \
+ plan.source("../test/numbers.txt") \
+ .filter(lambda elem: int(elem) % 2 != 0) \
+ .sink("../test/output.txt", end="")
+
+ return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_basic(descriptor):
+ plan = DataQuantaBuilder(descriptor)
+
+ sink_dataquanta = \
+ plan.source("../test/lines.txt") \
+ .sink("../test/output.txt", end="")
+
+ return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_junction(descriptor):
+
+ plan = DataQuantaBuilder(descriptor)
+
+ dq_source_a = plan.source("../test/lines.txt")
+ dq_source_b = plan.source("../test/morelines.txt") \
+ .filter(lambda elem: str(elem).startswith("I"))
+ dq_source_c = plan.source("../test/lastlines.txt") \
+ .filter(lambda elem: str(elem).startswith("W"))
+
+ sink_dataquanta = dq_source_a.union(dq_source_b) \
+ .union(dq_source_c) \
+ .sort(lambda elem: elem.lower()) \
+ .sink("../test/output.txt", end="")
+
+ return sink_dataquanta
+
+
+def plan_java_junction(descriptor):
+
+ plan = DataQuantaBuilder(descriptor)
+
+ dq_source_a = plan.source("../test/lines.txt")
+ dq_source_b = plan.source("../test/morelines.txt")
+ sink_dataquanta = dq_source_a.union(dq_source_b) \
+ .filter(lambda elem: str(elem).startswith("I")) \
+ .sort(lambda elem: elem.lower()) \
+ .sink("../test/output.txt", end="")
+
+ return sink_dataquanta
+
+
+def plan_tpch_q1(descriptor):
+
+ # TODO create reduce by
+ plan = DataQuantaBuilder(descriptor)
+
+ def reducer(obj1, obj2):
+ return obj1[0], obj1[1], obj1[2] + obj2[2], obj1[3] + obj2[3], obj1[4] + obj2[4], obj1[5] + obj2[5], \
+ obj1[6] + obj2[6], obj1[7] + obj2[7], obj1[8] + obj2[8], obj1[9] + obj2[9]
+
+ sink = plan.source("../test/lineitem.txt") \
+ .map(lambda elem: elem.split("|")) \
+ .sink("../test/output.txt", end="")
+ # Disabled until reduce_by_key exists; numeric fields are cast to float so the reducer sums them:
+ # .filter(lambda elem: datetime.datetime.strptime(elem[10], '%Y-%m-%d') <= datetime.datetime.strptime('1998-09-02', '%Y-%m-%d')) \
+ # .map(lambda elem:
+ # [elem[8], elem[9], float(elem[4]), float(elem[5]),
+ # float(elem[5]) * (1 - float(elem[6])),
+ # float(elem[5]) * (1 - float(elem[6])) * (1 + float(elem[7])),
+ # float(elem[4]), float(elem[5]),
+ # float(elem[6]), 1]) \
+ # .sink("../test/output.txt", end="")
+ # .reduce_by_key([0, 1], reducer) \
+
+
+ return sink
+
+
+def plan_full_java(descriptor):
+
+ plan = DataQuantaBuilder(descriptor)
+
+ dq_source_a = plan.source("../test/lines.txt")
+ dq_source_b = plan.source("../test/morelines.txt")
+ sink_dataquanta = dq_source_a.union(dq_source_b) \
+ .sink("../test/output.txt", end="")
+
+ return sink_dataquanta
+
+
+def plan_wordcount(descriptor):
+
+ plan = DataQuantaBuilder(descriptor)
+ sink_wordcount = plan.source("../test/lineitem.txt") \
+ .filter(lambda elem: len(str(elem).split("|")[0]) < 4) \
+ .flatmap(lambda elem: str(elem).split("|")) \
+ .sink("../test/output.txt", end="")
+
+ return sink_wordcount
+
+
+if __name__ == '__main__':
+
+ # The descriptor holds general information about the Wayang plan built here
+ descriptor = Descriptor()
+ descriptor.add_plugin(Descriptor.Plugin.spark)
+ descriptor.add_plugin(Descriptor.Plugin.java)
+
+ plan_dataquanta_sink = plan_wordcount(descriptor)
+ # plan_dataquanta_sink.execute()
+ # plan_dataquanta_sink.console()
+
+ plan_dataquanta_sink.to_wayang_plan()
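The `reducer` in `plan_tpch_q1` is written for a `reduce_by_key([0, 1], reducer)` step that does not exist yet (see the TODO). Below is a minimal local sketch of what that step would compute. It assumes the rows come from the disabled `map`, with the numeric fields already cast to `float`; string fields would be concatenated by `+` rather than summed. `reduce_by_key_local` and `q1_finalize` are hypothetical helpers, not DataQuanta methods, and the sample rows are made up.

```python
from typing import Dict, Iterable, Sequence, Tuple


def reducer(obj1, obj2):
    # Copied from plan_tpch_q1: keep the two grouping keys, sum every other field
    return obj1[0], obj1[1], obj1[2] + obj2[2], obj1[3] + obj2[3], obj1[4] + obj2[4], obj1[5] + obj2[5], \
        obj1[6] + obj2[6], obj1[7] + obj2[7], obj1[8] + obj2[8], obj1[9] + obj2[9]


def reduce_by_key_local(rows: Iterable[Sequence], key_fields: Sequence[int], fold) -> Dict[Tuple, Tuple]:
    """Hypothetical local reduce_by_key: fold all rows that share the same key fields."""
    groups: Dict[Tuple, Tuple] = {}
    for row in rows:
        key = tuple(row[i] for i in key_fields)
        groups[key] = fold(groups[key], row) if key in groups else tuple(row)
    return groups


def q1_finalize(group: Tuple) -> Dict[str, object]:
    """Turn one folded group into TPC-H Q1 columns; averages divide by the count in field 9."""
    flag, status, sum_qty, sum_price, sum_disc_price, sum_charge, qty, price, disc, count = group
    return {
        "l_returnflag": flag, "l_linestatus": status,
        "sum_qty": sum_qty, "sum_base_price": sum_price,
        "sum_disc_price": sum_disc_price, "sum_charge": sum_charge,
        "avg_qty": qty / count, "avg_price": price / count,
        "avg_disc": disc / count, "count_order": count,
    }


# Sample rows shaped like the disabled map: flag, status, qty, price, disc_price, charge, qty, price, discount, 1
ROWS = [
    ["A", "F", 10.0, 100.0, 50.0, 50.0, 10.0, 100.0, 0.5, 1],
    ["A", "F", 20.0, 200.0, 100.0, 100.0, 20.0, 200.0, 0.5, 1],
    ["N", "O", 5.0, 50.0, 50.0, 55.0, 5.0, 50.0, 0.0, 1],
]

groups = reduce_by_key_local(ROWS, (0, 1), reducer)
for key in sorted(groups):  # Q1 orders by l_returnflag, l_linestatus
    print(q1_finalize(groups[key]))
```

The reducer only produces sums. The averages in Q1 have to come from a follow-up `map` over each folded group, which is why the trailing `1` is carried through as a row count.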
diff --git a/pywayang/orchestrator/operator.py b/pywayang/orchestrator/operator.py
new file mode 100644
index 0000000..ecaa6bd
--- /dev/null
+++ b/pywayang/orchestrator/operator.py
@@ -0,0 +1,121 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pickle
+import cloudpickle
+from config.config_reader import get_source_types
+from config.config_reader import get_sink_types
+from config.config_reader import get_boundary_types
+import logging
+
+pickle_protocol = pickle.HIGHEST_PROTOCOL
+
+
+# Describes an Operation over an intermediate result
+# Each operation could be processed by Python or Java platforms
+class Operator:
+
+ def __init__(
+ self, operator_type=None, udf=None, previous=None,
+ iterator=None, python_exec=False
+ ):
+
+ # Operator ID
+ self.id = id(self)
+
+ # Operator Type
+ self.operator_type = operator_type
+
+ # Set Boundaries
+ if self.operator_type in get_boundary_types():
+ self.boundary = True
+ else:
+ self.boundary = False
+
+ # UDF Function
+ self.udf = udf
+
+ # Source types must come with an Iterator
+ self.iterator = iterator
+ if operator_type in get_source_types():
+ if iterator is None:
+ logging.error("Source Operator Type without an Iterator")
+ raise ValueError("Source operator type " + str(operator_type) + " requires an iterator")
+ else:
+ self.source = True
+ else:
+ self.source = False
+
+ # Sink Operators
+ if operator_type in get_sink_types():
+ self.sink = True
+ else:
+ self.sink = False
+
+ # TODO Why managing previous and predecessors per separate?
+ self.previous = previous
+
+ self.successor = []
+ self.predecessor = []
+
+ self.parameters = {}
+
+ # Set predecessors and successors from previous
+ if self.previous:
+ for prev in self.previous:
+ if prev is not None:
+ prev.set_successor(self)
+ self.set_predecessor(prev)
+
+ self.python_exec = python_exec
+
+ logging.info("Operator:" + str(self.getID()) + ", type:" + str(self.operator_type) + ", PythonExecutable: " +
+ str(self.python_exec) +
+ ", is boundary: " + str(self.is_boundary()) + ", is source: " +
+ str(self.source) + ", is sink: " + str(self.sink))
+
+ def getID(self):
+ return self.id
+
+ def is_source(self):
+ return self.source
+
+ def is_sink(self):
+ return self.sink
+
+ def is_boundary(self):
+ return self.boundary
+
+ def serialize_udf(self):
+ self.udf = cloudpickle.dumps(self.udf)
+
+ def getIterator(self):
+ if self.is_source():
+ return self.iterator
+ # TODO this should iterate through previous REDESIGN
+ return self.udf(self.previous[0].getIterator())
+
+ def set_parameter(self, key, value):
+ self.parameters[key] = value
+
+ def set_successor(self, suc):
+ if (not self.is_sink()) and self.successor.count(suc) == 0:
+ self.successor.append(suc)
+
+ def set_predecessor(self, pred):
+ if self.predecessor.count(pred) == 0:
+ self.predecessor.append(pred)
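`Operator.__init__` wires the graph eagerly. Every non-`None` entry of `previous` gets the new operator as a successor, and the new operator records that entry as a predecessor. Sinks never record successors, and duplicates are skipped. The sketch below isolates that logic in a hypothetical `LinkedOp` class, with no config lookups, so it can run on its own. It uses `not in` where the original uses `list.count(...) == 0`, which is equivalent here.

```python
from typing import List, Optional, Sequence


class LinkedOp:
    """Hypothetical stand-in for Operator that keeps only the graph-linking logic."""

    def __init__(self, name: str, previous: Optional[Sequence["LinkedOp"]] = None, sink: bool = False):
        self.name = name
        self.sink = sink
        self.successor: List["LinkedOp"] = []
        self.predecessor: List["LinkedOp"] = []
        # Same wiring as Operator.__init__: link both directions for each non-None previous operator
        for prev in previous or []:
            if prev is not None:
                prev.set_successor(self)
                self.set_predecessor(prev)

    def set_successor(self, suc: "LinkedOp") -> None:
        # Sinks end the plan, so they never record successors; repeats are ignored
        if not self.sink and suc not in self.successor:
            self.successor.append(suc)

    def set_predecessor(self, pred: "LinkedOp") -> None:
        if pred not in self.predecessor:
            self.predecessor.append(pred)


# Two sources feeding a union, which feeds a sink
source_a = LinkedOp("source_a")
source_b = LinkedOp("source_b")
union = LinkedOp("union", previous=[source_a, source_b])
sink = LinkedOp("sink", previous=[union], sink=True)
print([p.name for p in union.predecessor], [s.name for s in source_a.successor])
```

Because linking happens in the constructor, building the DataQuanta chain is enough to get a traversable graph. There is no separate "connect" step.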
diff --git a/pywayang/orchestrator/plan.py b/pywayang/orchestrator/plan.py
new file mode 100644
index 0000000..25610cc
--- /dev/null
+++ b/pywayang/orchestrator/plan.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+from enum import Enum
+
+class Descriptor:
+
+ def __init__(self):
+ self.sinks = []
+ self.sources = []
+ self.boundary_operators = None
+ logging.basicConfig(filename='../config/execution.log', level=logging.DEBUG)
+ self.plugins = []
+
+ class Plugin(Enum):
+ java = 0
+ spark = 1
+
+ def get_boundary_operators(self):
+ return self.boundary_operators
+
+ def add_source(self, operator):
+ self.sources.append(operator)
+
+ def get_sources(self):
+ return self.sources
+
+ def add_sink(self, operator):
+ self.sinks.append(operator)
+
+ def get_sinks(self):
+ return self.sinks
+
+ def add_plugin(self, plugin):
+ self.plugins.append(plugin)
+
+ def get_plugins(self):
+ return self.plugins
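`Descriptor.add_plugin` appends plugins without checking for repeats, and the plan writer forwards each `plug.value` into `ctx.platforms`. Below is a minimal sketch of that conversion. It assumes the integer values must match the numbering of the platform enum in the protobuf definition, which is not shown in this diff. The de-duplication is a suggested variation, not current behavior, and `platform_ids` is a hypothetical name.

```python
from enum import Enum
from typing import Iterable, List


class Plugin(Enum):
    # Mirrors Descriptor.Plugin in plan.py
    java = 0
    spark = 1


def platform_ids(plugins: Iterable[Plugin]) -> List[int]:
    """Convert plugins into the integer platform list, keeping first-seen order and dropping repeats."""
    ids: List[int] = []
    for plug in plugins:
        if plug.value not in ids:
            ids.append(plug.value)
    return ids


print(platform_ids([Plugin.spark, Plugin.java, Plugin.spark]))
```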
diff --git a/pywayang/protobuf/__init__.py b/pywayang/protobuf/__init__.py
new file mode 100644
index 0000000..15a80ad
--- /dev/null
+++ b/pywayang/protobuf/__init__.py
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import protobuf.pywayangplan_pb2
diff --git a/pywayang/protobuf/old_planwriter.py b/pywayang/protobuf/old_planwriter.py
new file mode 100644
index 0000000..e8700f0
--- /dev/null
+++ b/pywayang/protobuf/old_planwriter.py
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import protobuf.pywayangplan_pb2 as pwb
+import os
+import pickle
+import struct
+import base64
+
+
+class OldMessageWriter:
+
+ def __init__(self, descriptor):
+
+ sink = descriptor.get_sinks()[0]
+ source = descriptor.get_sources()[0]
+
+ op = source
+ visited = []
+ middle_operators = []
+ while op.sink is not True and len(op.successor) > 0:
+ pre = op.successor[0]
+ if pre not in visited and pre.sink is not True:
+ pre.serialize_udf()
+ middle_operators.append(pre)
+ """base64_bytes = base64.b64encode(pre.udf)
+ pre.udf = base64_bytes"""
+
+ """pre.serialize_udf()
+ print("pre.udf")
+ print(pre.udf)
+ func = pickle.loads(pre.udf)
+ print("func")
+ print(func)
+ middle_operators.append(pre)
+
+ # Testing
+ msg = pre.udf
+ base64_bytes = base64.b64encode(msg)
+ base64_message = base64.b64decode(base64_bytes)
+ func2 = pickle.loads(base64_message)
+ print(base64_message)
+ func3 = pickle.loads(b'\x80\x04\x955\x04\x00\x00\x00\x00\x00\x00\x8c\x17cloudpickle.cloudpickle\x94\x8c\r_builtin_type\x94\x93\x94\x8c\nLambdaType\x94\x85\x94R\x94(h\x02\x8c\x08CodeType\x94\x85\x94R\x94(K\x01K\x00K\x01K\x03K\x13C\nt\x00\x88\x00|\x00\x83\x02S\x00\x94N\x85\x94\x8c\x06filter\x94\x85\x94\x8c\x08iterator\x94\x85\x94\x8cS/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/orchestrator/dataquanta.py\x94\x8c\x04func\x94K%C\x02\x00\x01\x94\x8c\x03udf\x94\x85\x94)t\x94R\x94}\x94(\x8c\x0b__package__\x94\x8c\x0corchestrator\x94\x8c\x08__name__\x94\x8c\x17orchestrator.dataquanta\x94\x8c\x08__file__\x94\x8cS/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/orchestrator/dataquanta.py\x94uNNh\x00\x8c\x10_make_empty_cell\x94\x93\x94)R\x94\x85\x94t\x94R\x94\x8c\x1ccloudpickle.cloudpickle_fast\x94\x8c\x12_function_setstate\x94\x93\x94h"}\x94}\x94(h\x19h\x10\x8c\x0c__qualname__\x94\x8c\x1fDataQuanta.filter.<locals>.func\x94\x8c\x0f__annotations__\x94}\x94\x8c\x0e__kwdefaults__\x94N\x8c\x0c__defaults__\x94N\x8c\n__module__\x94h\x1a\x8c\x07__doc__\x94N\x8c\x0b__closure__\x94h\x00\x8c\n_make_cell\x94\x93\x94h\x05(h\x08(K\x01K\x00K\x01K\x02KSC\x10t\x00|\x00\x83\x01d\x01\x16\x00d\x02k\x03S\x00\x94NK\x02K\x00\x87\x94\x8c\x03int\x94\x85\x94\x8c\x04elem\x94\x85\x94\x8cM/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/orchestrator/main.py\x94\x8c\x08<lambda>\x94K\x18C\x00\x94))t\x94R\x94}\x94(h\x17Nh\x19\x8c\x08__main__\x94h\x1b\x8cM/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/orchestrator/main.py\x94uNNNt\x94R\x94h%hB}\x94}\x94(h\x19h:h(\x8c\x1dplan_filter.<locals>.<lambda>\x94h*}\x94h,Nh-Nh.h?h/Nh0N\x8c\x17_cloudpickle_submodules\x94]\x94\x8c\x0b__globals__\x94}\x94u\x86\x94\x86R0\x85\x94R\x94\x85\x94hG]\x94hI}\x94u\x86\x94\x86R0.')
+ for i in func3([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]):
+ print(i)"""
+ op = pre
+
+ """for mid in middle_operators:
+ print(mid.operator_type)
+ print(pickle.loads(mid.udf))
+ func = pickle.loads(mid.udf)
+ for i in func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]):
+ print(i)"""
+
+ finalpath = "/Users/rodrigopardomeza/wayang/incubator-wayang/protobuf/filter_message"
+ planconf = pwb.WayangPlan()
+ try:
+ f = open(finalpath, "rb")
+ planconf.ParseFromString(f.read())
+ f.close()
+ except IOError:
+ print(finalpath + ": Could not open file. Creating a new one.")
+
+ so = pwb.Source()
+ so.id = source.id
+ so.type = source.operator_type
+ so.path = os.path.abspath(source.udf)
+
+ operators = []
+ for mid in middle_operators:
+ op = pwb.Operator()
+ op.id = mid.id
+ op.type = mid.operator_type
+ op.udf = mid.udf
+ operators.append(op)
+
+ si = pwb.Sink()
+ si.id = sink.id
+ si.type = sink.operator_type
+ si.path = os.path.abspath(sink.udf)
+
+ plan = pwb.Plan()
+ plan.source.CopyFrom(so)
+ plan.sink.CopyFrom(si)
+ plan.operators.extend(operators)
+ plan.input = pwb.Plan.string
+ plan.output = pwb.Plan.string
+
+ ctx = pwb.Context()
+ ctx.platforms.extend([pwb.Context.Platform.java])
+
+ planconf.plan.CopyFrom(plan)
+ planconf.context.CopyFrom(ctx)
+
+ f = open(finalpath, "wb")
+ f.write(planconf.SerializeToString())
+ f.close()
+ pass
+
+class func_inteface:
+
+ def __init__(self, node, nested_udf):
+ self.node = node
+ self.nested_udf = nested_udf
+
+ def func(self, iterable):
+ return self.node.operator.udf(self.nested_udf(iterable))
+
+
+class MessageWriter:
+ sources = []
+ operators = []
+ sinks = []
+
+ def add_source(self, operator_id, operator_type, path, predecessors, successors):
+ source = pwb.OperatorProto()
+ source.id = operator_id
+ source.type = operator_type
+ source.path = os.path.abspath(path)
+ source.udf = None
+ source.predecessors = predecessors
+ source.successors = successors
+ self.sources.append(source)
+
+ def add_sink(self, operator_id, operator_type, path, predecessors, successors):
+ sink = pwb.OperatorProto()
+ sink.id = operator_id
+ sink.type = operator_type
+ sink.path = os.path.abspath(path)
+ sink.udf = None
+ sink.predecessors = predecessors
+ sink.successors = successors
+ self.sinks.append(sink)
+
+ def add_operator(self, operator_id, operator_type, udf, path, predecessors, successors):
+ op = pwb.OperatorProto()
+ op.id = operator_id
+ op.type = operator_type
+ op.udf = udf
+ op.path = path
+ op.predecessors = predecessors
+ op.successors = successors
+ self.operators.append(op)
+
+ def process_pipeline(self, stage):
+
+ nested_udf = None
+ nested_id = ""
+ for node in reversed(stage):
+ print("########")
+ print(node.operator_type, "executable:", node.python_exec, "id:", node.id)
+
+ if nested_udf is not None:
+ print("review pre")
+ print( nested_udf)
+ print( nested_udf(["Wilo","lifo","Wifo"]))
+
+ if not node.python_exec:
+ if nested_udf is not None:
+ """self.add_operator(nested_id, "map_partition", nested_udf, None
+ # obtain predecessors and successors
+ , successors=[node.id]
+ )"""
+ print("node", nested_id)
+ print(nested_udf)
+ print("flushing fused UDF before a non-Python operator")
+ print( nested_udf(["Wilo","lifo","Wifo"]))
+
+ t = nested_udf(["Wilo","lifo","Wifo"])
+ print("fused UDF output:")
+ print(t)
+ for i in t:
+ print(i)
+ nested_udf = None
+ nested_id = ""
+
+ """if node.operator.source:
+ self.add_source(
+ node.id, node.operator_type, node.operator.udf,
+ node.predecessors, node.operator.successor)
+ else:
+ self.add_operator(
+ node.id, node.operator_type, None, node.operator.udf,
+ node.predecessors, node.operator.successor)"""
+ else:
+ print("adding", node.id)
+ if nested_udf is None:
+ nested_udf = node.operator.udf
+ nested_id = node.id
+ else:
+ print("composing with the previously fused UDF")
+ tmp = nested_udf
+
+ print( tmp(["Wilo","lifo","Wifo"]))
+
+ #def func(_, iterable):
+ # return nested_udf(node.operator.udf(iterable))
+ nested_udf = self.concatenate(nested_udf, node.operator.udf)
+ print( nested_udf(["Wilo","lifo","Wifo"]))
+ print(nested_udf)
+
+ # nested_udf = func_inteface(node, nested_udf)
+ nested_id = str(node.id) + "," + str(nested_id)
+
+ if nested_udf is not None:
+ print("review")
+ print( nested_udf)
+ print( nested_udf(["Wilo","lifo","Wifo"]))
+
+ if nested_udf is not None:
+ """self.add_operator(nested_id, "map_partition", nested_udf, None
+ # obtain predecessors and successors
+ , successors=[node.id]
+ )"""
+ print("node", nested_id)
+ print(nested_udf)
+ t = nested_udf(["Wilo","lifo","Wifo"])
+ print("fused UDF output (end of stage):")
+ print(t)
+ for i in t:
+ print(i)
+ nested_udf = None
+ nested_id = ""
+
+ def __init__(self):
+ print("lala")
+
+ def concatenate(self, function_a, function_b):
+ def executable(iterable):
+ return function_a(function_b(iterable))
+ return executable
+
+ def old(self, descriptor):
+
+ sink = descriptor.get_sinks()[0]
+ source = descriptor.get_sources()[0]
+
+ op = source
+ visited = []
+ middle_operators = []
+ while op.sink is not True and len(op.successor) > 0:
+ pre = op.successor[0]
+ if pre not in visited and pre.sink is not True:
+ pre.serialize_udf()
+ middle_operators.append(pre)
+ op = pre
+
+ finalpath = "/Users/rodrigopardomeza/wayang/incubator-wayang/protobuf/filter_message"
+ planconf = pwb.WayangPlan()
+ try:
+ f = open(finalpath, "rb")
+ planconf.ParseFromString(f.read())
+ f.close()
+ except IOError:
+ print(finalpath + ": Could not open file. Creating a new one.")
+
+ so = pwb.Source()
+ so.id = source.id
+ so.type = source.operator_type
+ so.path = os.path.abspath(source.udf)
+
+ operators = []
+ for mid in middle_operators:
+ op = pwb.Operator()
+ op.id = mid.id
+ op.type = mid.operator_type
+ op.udf = mid.udf
+ operators.append(op)
+
+ si = pwb.Sink()
+ si.id = sink.id
+ si.type = sink.operator_type
+ si.path = os.path.abspath(sink.udf)
+
+ plan = pwb.Plan()
+ plan.source.CopyFrom(so)
+ plan.sink.CopyFrom(si)
+ plan.operators.extend(operators)
+ plan.input = pwb.Plan.string
+ plan.output = pwb.Plan.string
+
+ ctx = pwb.Context()
+ ctx.platforms.extend([pwb.Context.Platform.java])
+
+ planconf.plan.CopyFrom(plan)
+ planconf.context.CopyFrom(ctx)
+
+ f = open(finalpath, "wb")
+ f.write(planconf.SerializeToString())
+ f.close()
+ pass
+
+ def pipeline_singleton(self):
+ print("lala")
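Both writers' `process_pipeline` walks a stage in reverse and folds consecutive Python UDFs with `concatenate`, where `concatenate(a, b)` runs `b` first. Because the walk is reversed, the fused function still applies the operators in their original pipeline order. Below is a minimal sketch of that folding. It assumes each UDF takes and returns an iterable over one partition, as a `map_partition` function would. `fuse`, `keep_f` and `upper` are hypothetical names.

```python
from typing import Callable, Iterable, List, Optional

PartitionUdf = Callable[[Iterable], Iterable]


def concatenate(function_a: PartitionUdf, function_b: PartitionUdf) -> PartitionUdf:
    # Same shape as MessageWriter.concatenate: function_b runs first, function_a consumes its output
    def executable(iterable: Iterable) -> Iterable:
        return function_a(function_b(iterable))
    return executable


def fuse(stage: List[PartitionUdf]) -> Optional[PartitionUdf]:
    """Fold consecutive Python UDFs the way process_pipeline does: walk the stage backwards."""
    nested: Optional[PartitionUdf] = None
    for udf in reversed(stage):
        nested = udf if nested is None else concatenate(nested, udf)
    return nested


def keep_f(partition: Iterable) -> Iterable:
    # Partition-level filter, like the plan's startswith("f") filter
    return (w for w in partition if str(w).startswith("f"))


def upper(partition: Iterable) -> Iterable:
    # Partition-level map
    return (str(w).upper() for w in partition)


fused = fuse([keep_f, upper])  # pipeline order: filter, then map
print(list(fused(["foo", "bar", "fizz"])))
```

Swapping the stage order changes the result, because upper-casing first means no word starts with a lowercase "f" anymore. The reversed walk is what keeps the fused function faithful to the plan.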
diff --git a/pywayang/protobuf/planwriter.py b/pywayang/protobuf/planwriter.py
new file mode 100644
index 0000000..b63dcbb
--- /dev/null
+++ b/pywayang/protobuf/planwriter.py
@@ -0,0 +1,277 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import protobuf.pywayangplan_pb2 as pwb
+import os
+import cloudpickle
+import logging
+import pathlib
+import requests
+import base64
+
+
+# Writes Wayang Plan from several stages
+class MessageWriter:
+ sources = []
+ operators = []
+ sinks = []
+ operator_references = {}
+ boundaries = {}
+
+ # Creates and appends Source type of operator
+ def add_source(self, operator_id, operator_type, path):
+ source = pwb.OperatorProto()
+ source.id = str(operator_id)
+ source.type = operator_type
+ source.path = os.path.abspath(path)
+ source.udf = chr(0).encode('utf-8')
+ # source.parameters = {}
+ self.sources.append(source)
+ return source
+
+ # Creates and appends Sink type of operator
+ def add_sink(self, operator_id, operator_type, path):
+ sink = pwb.OperatorProto()
+ sink.id = str(operator_id)
+ sink.type = operator_type
+ sink.path = os.path.abspath(path)
+ sink.udf = chr(0).encode('utf-8')
+ # sink.parameters = {}
+ self.sinks.append(sink)
+ return sink
+
+ # Creates and appends a Python operator
+ # Python operators need no parameters: the UDF is ready to be executed directly
+ def add_operator(self, operator_id, operator_type, udf):
+ op = pwb.OperatorProto()
+ op.id = str(operator_id)
+ op.type = operator_type
+ op.udf = cloudpickle.dumps(udf)
+ op.path = str(None)
+ # op.parameters = {}
+ self.operators.append(op)
+ return op
+
+ # Creates and appends a Java operator
+ def add_java_operator(self, operator_id, operator_type, udf, parameters):
+ op = pwb.OperatorProto()
+ op.id = str(operator_id)
+ op.type = operator_type
+ op.udf = cloudpickle.dumps(udf)
+ op.path = str(None)
+ # Protobuf map fields cannot be assigned directly; copy entries one by one as strings
+ for param in parameters:
+ logging.debug("parameter " + str(param) + ": " + str(parameters[param]))
+ op.parameters[param] = str(parameters[param])
+ self.operators.append(op)
+ return op
+
+ # Receives a chain of operators and splits it into Wayang operators;
+ # consecutive Python-executable operators are compacted into one map_partition Wayang operator
+ def process_pipeline(self, stage):
+
+ nested_udf = None
+ nested_id = ""
+ nested_predecessors = None
+ nested_successors = None
+ for node in reversed(stage):
+ logging.debug(node.operator_type + " executable: " + str(node.python_exec) + " id: " + str(node.id))
+
+ if not node.python_exec:
+ if nested_udf is not None:
+
+ # Predecessors come from the first operator of the fused chain (visited last)
+ # Successors come from the last operator of the fused chain (visited first)
+ op = self.add_operator(nested_id, "map_partition", nested_udf)
+
+ ids = str(nested_id).split(",")
+ for op_id in ids:
+ self.operator_references[str(op_id)] = op
+
+ self.boundaries[str(nested_id)] = {}
+ self.boundaries[str(nested_id)]["end"] = nested_successors
+ self.boundaries[str(nested_id)]["start"] = nested_predecessors
+
+ nested_udf = None
+ nested_id = ""
+ nested_predecessors = None
+ nested_successors = None
+
+ if node.operator.source:
+ op = self.add_source(node.id, node.operator_type, node.operator.udf)
+ self.operator_references[str(node.id)] = op
+ self.boundaries[str(node.id)] = {}
+ self.boundaries[str(node.id)]["end"] = node.successors.keys()
+
+ elif node.operator.sink:
+ op = self.add_sink(node.id, node.operator_type, node.operator.udf)
+ self.operator_references[str(node.id)] = op
+ self.boundaries[str(node.id)] = {}
+ self.boundaries[str(node.id)]["start"] = node.predecessors.keys()
+
+ # Regular operator to be processed in Java
+ # Notice that those could include more parameters for Java
+ else:
+ op = self.add_java_operator(node.id, node.operator_type, node.operator.udf, node.operator.parameters)
+ self.operator_references[str(node.id)] = op
+ self.boundaries[str(node.id)] = {}
+ self.boundaries[str(node.id)]["start"] = node.predecessors.keys()
+ self.boundaries[str(node.id)]["end"] = node.successors.keys()
+
+ else:
+
+ if nested_udf is None:
+ nested_udf = node.operator.udf
+ nested_id = node.id
+ # It is the last operator to execute in the map partition
+ nested_successors = node.successors.keys()
+
+ else:
+ nested_udf = self.concatenate(nested_udf, node.operator.udf)
+ nested_id = str(node.id) + "," + str(nested_id)
+
+ # Each iteration overwrites the predecessors, so they end up being those of the chain's first operator
+ nested_predecessors = node.predecessors.keys()
+
+ # Just in case in the future some pipelines start with Python operators
+ if nested_udf is not None:
+ op = self.add_operator(nested_id, "map_partition", nested_udf)
+
+ ids = str(nested_id).split(",")
+ for op_id in ids:
+ self.operator_references[op_id] = op
+
+ self.boundaries[str(nested_id)] = {}
+ self.boundaries[str(nested_id)]["end"] = nested_successors
+ self.boundaries[str(nested_id)]["start"] = nested_predecessors
+
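+ def __init__(self):
+ # Per-instance state; otherwise the class-level lists and dicts are shared by every writer
+ self.sources = []
+ self.operators = []
+ self.sinks = []
+ self.operator_references = {}
+ self.boundaries = {}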
+
+ # Composes two functions into one: function_b runs first, then function_a on its output
+ @staticmethod
+ def concatenate(function_a, function_b):
+ def executable(iterable):
+ return function_a(function_b(iterable))
+
+ return executable
+
+ # Sets predecessor and successor ids on the final Wayang operators
+ def set_dependencies(self):
+
+ for source in self.sources:
+
+ if 'end' in self.boundaries[source.id]:
+ op_successors = []
+ for op_id in self.boundaries[source.id]['end']:
+ op_successors.append(str(self.operator_references[str(op_id)].id))
+ source.successors.extend(op_successors)
+
+ for sink in self.sinks:
+ if 'start' in self.boundaries[sink.id]:
+ op_predecessors = []
+ for op_id in self.boundaries[sink.id]['start']:
+ op_predecessors.append(str(self.operator_references[str(op_id)].id))
+ sink.predecessors.extend(op_predecessors)
+
+ for op in self.operators:
+ if 'start' in self.boundaries[op.id]:
+ op_predecessors = []
+ for op_id in self.boundaries[op.id]['start']:
+ op_predecessors.append(str(self.operator_references[str(op_id)].id))
+ op.predecessors.extend(op_predecessors)
+
+ if 'end' in self.boundaries[op.id]:
+ op_successors = []
+ for op_id in self.boundaries[op.id]['end']:
+ op_successors.append(str(self.operator_references[str(op_id)].id))
+ op.successors.extend(op_successors)
+
+ # Writes the message to a local directory
+ def write_message(self, descriptor):
+
+ finalpath = "../../protobuf/wayang_message"
+ plan_configuration = pwb.WayangPlanProto()
+
+ try:
+ with open(finalpath, "rb") as f:
+ plan_configuration.ParseFromString(f.read())
+ except IOError:
+ logging.warning("File " + finalpath + " did not exist. System generated a new file")
+
+ plan = pwb.PlanProto()
+ plan.sources.extend(self.sources)
+ plan.operators.extend(self.operators)
+ plan.sinks.extend(self.sinks)
+ plan.input = pwb.PlanProto.string
+ plan.output = pwb.PlanProto.string
+
+ ctx = pwb.ContextProto()
+ # ctx.platforms.extend([pwb.ContextProto.PlatformProto.java])
+ for plug in descriptor.plugins:
+ ctx.platforms.append(plug.value)
+ # ctx.platforms.extend(descriptor.get_plugins())
+
+ plan_configuration.plan.CopyFrom(plan)
+ plan_configuration.context.CopyFrom(ctx)
+
+ with open(finalpath, "wb") as f:
+ f.write(plan_configuration.SerializeToString())
+
+ # Sends the serialized plan, base64-encoded, to the Wayang REST API
+ def send_message(self, descriptor):
+
+ plan_configuration = pwb.WayangPlanProto()
+
+ plan = pwb.PlanProto()
+ plan.sources.extend(self.sources)
+ plan.operators.extend(self.operators)
+ plan.sinks.extend(self.sinks)
+ plan.input = pwb.PlanProto.string
+ plan.output = pwb.PlanProto.string
+
+ ctx = pwb.ContextProto()
+ # ctx.platforms.extend([pwb.ContextProto.PlatformProto.java])
+ for plug in descriptor.plugins:
+ ctx.platforms.append(plug.value)
+ # ctx.platforms.extend(descriptor.get_plugins())
+
+ plan_configuration.plan.CopyFrom(plan)
+ plan_configuration.context.CopyFrom(ctx)
+
+ logging.debug("Sending plan: " + str(plan_configuration))
+
+ msg_bytes = plan_configuration.SerializeToString()
+ msg_64 = base64.b64encode(msg_bytes)
+
+ logging.debug(msg_bytes)
+ # response = requests.get("http://localhost:8080/plan/create/fromfile")
+ data = {
+ 'message': msg_64
+ }
+ response = requests.post("http://localhost:8080/plan/create", data)
+ logging.debug(response)
+ pass
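After `process_pipeline` runs, `operator_references` maps every original operator id to the Wayang operator that now contains it. Fused operators get ids like `"2,3"`. Meanwhile, `boundaries` still stores original ids, and `set_dependencies` translates them. Below is a minimal sketch of that translation, with plain dictionaries standing in for the protobuf messages. The four-operator stage and the `remap` helper are hypothetical.

```python
from typing import Dict, Iterable, List


def remap(original_ids: Iterable, references: Dict[str, str]) -> List[str]:
    """Translate original operator ids into the ids of the Wayang operators that contain them."""
    return [references[str(op_id)] for op_id in original_ids]


# Hypothetical stage: source 1 -> filter 2 -> map 3 -> sink 4, where 2 and 3 were fused into "2,3"
references = {"1": "1", "2": "2,3", "3": "2,3", "4": "4"}
# Boundaries keep original ids: "start" = predecessors of the first fused node, "end" = successors of the last
boundaries = {
    "1": {"end": [2]},
    "2,3": {"start": [1], "end": [4]},
    "4": {"start": [3]},
}

successors = {op: remap(b.get("end", []), references) for op, b in boundaries.items()}
predecessors = {op: remap(b.get("start", []), references) for op, b in boundaries.items()}
print(successors)
print(predecessors)
```

The sink's recorded predecessor is the original operator `3`, but after remapping it points at the fused `"2,3"` operator. This is why every id of a fused chain must be registered in `operator_references`.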
diff --git a/pywayang/test/demo_testing.py b/pywayang/test/demo_testing.py
new file mode 100644
index 0000000..c096a89
--- /dev/null
+++ b/pywayang/test/demo_testing.py
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+
+class MyTestCase(unittest.TestCase):
+    @unittest.skip("IDE-generated placeholder: asserts True == False and always fails")
+    def test_something(self):
+        self.assertEqual(True, False)
+
+    def test_upper(self):
+        self.assertEqual('foo'.upper(), 'FOO')
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/pywayang/test/full_java_test.py b/pywayang/test/full_java_test.py
new file mode 100644
index 0000000..d17aedd
--- /dev/null
+++ b/pywayang/test/full_java_test.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+from orchestrator.plan import Descriptor
+from orchestrator.dataquanta import DataQuantaBuilder
+
+
+class MyTestCase(unittest.TestCase):
+
+ def test_most_basic(self):
+ descriptor = Descriptor()
+ descriptor.add_plugin(Descriptor.Plugin.java)
+
+ plan = DataQuantaBuilder(descriptor)
+ sink_dataquanta = \
+ plan.source("../test/lines.txt") \
+ .sink("../test/output.txt", end="")
+
+ sink_dataquanta.to_wayang_plan()
+
+
+ def test_single_juncture(self):
+ descriptor = Descriptor()
+ descriptor.add_plugin(Descriptor.Plugin.java)
+
+ plan = DataQuantaBuilder(descriptor)
+ dq_source_a = plan.source("../test/lines.txt")
+ dq_source_b = plan.source("../test/morelines.txt")
+ sink_dataquanta = dq_source_a.union(dq_source_b) \
+ .sink("../test/output.txt", end="")
+
+ sink_dataquanta.to_wayang_plan()
+
+
+ def test_multiple_juncture(self):
+ descriptor = Descriptor()
+ descriptor.add_plugin(Descriptor.Plugin.java)
+
+ plan = DataQuantaBuilder(descriptor)
+ dq_source_a = plan.source("../test/lines.txt")
+ dq_source_b = plan.source("../test/morelines.txt") \
+ .filter(lambda elem: str(elem).startswith("I"))
+ dq_source_c = plan.source("../test/lastlines.txt") \
+ .filter(lambda elem: str(elem).startswith("W"))
+
+ sink_dataquanta = dq_source_a.union(dq_source_b) \
+ .union(dq_source_c) \
+ .sort(lambda elem: elem.lower()) \
+ .sink("../test/output.txt", end="")
+
+ sink_dataquanta.to_wayang_plan()
+
+
+if __name__ == '__main__':
+ unittest.main()
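These tests only build the plan; they do not check what ends up in `../test/output.txt`. A plain-Python oracle for `test_multiple_juncture` could make such a check possible. The sketch below assumes that `union` keeps duplicates (bag semantics) and that `sort` orders elements by the given key; `expected_multiple_juncture` is a hypothetical helper, and the sample lines used with it are made up, not the contents of the test files.

```python
from typing import List


def expected_multiple_juncture(lines_a: List[str], lines_b: List[str],
                               lines_c: List[str]) -> List[str]:
    """Local re-computation of the test_multiple_juncture pipeline (assumed semantics)."""
    # .filter(lambda elem: str(elem).startswith("I")) on the second source
    filtered_b = [elem for elem in lines_b if str(elem).startswith("I")]
    # .filter(lambda elem: str(elem).startswith("W")) on the third source
    filtered_c = [elem for elem in lines_c if str(elem).startswith("W")]
    # .union(...).union(...), assumed to concatenate and keep duplicates
    combined = lines_a + filtered_b + filtered_c
    # .sort(lambda elem: elem.lower()): case-insensitive ordering
    return sorted(combined, key=lambda elem: elem.lower())
```

With the made-up inputs `["b", "A"]`, `["Ice", "x"]` and `["Wave", "y"]`, the oracle yields `["A", "b", "Ice", "Wave"]`.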
diff --git a/pywayang/test/full_spark_test.py b/pywayang/test/full_spark_test.py
new file mode 100644
index 0000000..9276ccc
--- /dev/null
+++ b/pywayang/test/full_spark_test.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+from orchestrator.plan import Descriptor
+from orchestrator.dataquanta import DataQuantaBuilder
+
+
+class MyTestCase(unittest.TestCase):
+
+    def test_most_basic(self):
+        descriptor = Descriptor()
+        descriptor.add_plugin(Descriptor.Plugin.spark)
+
+        plan = DataQuantaBuilder(descriptor)
+        sink_dataquanta = \
+            plan.source("../test/lines.txt") \
+            .sink("../test/output.txt", end="")
+
+        sink_dataquanta.to_wayang_plan()
+
+    def test_single_juncture(self):
+        descriptor = Descriptor()
+        descriptor.add_plugin(Descriptor.Plugin.spark)
+
+        plan = DataQuantaBuilder(descriptor)
+        dq_source_a = plan.source("../test/lines.txt")
+        dq_source_b = plan.source("../test/morelines.txt")
+        sink_dataquanta = dq_source_a.union(dq_source_b) \
+            .sink("../test/output.txt", end="")
+
+        sink_dataquanta.to_wayang_plan()
+
+    def test_multiple_juncture(self):
+        descriptor = Descriptor()
+        descriptor.add_plugin(Descriptor.Plugin.spark)
+
+        plan = DataQuantaBuilder(descriptor)
+        dq_source_a = plan.source("../test/lines.txt")
+        dq_source_b = plan.source("../test/morelines.txt") \
+            .filter(lambda elem: str(elem).startswith("I"))
+        dq_source_c = plan.source("../test/lastlines.txt") \
+            .filter(lambda elem: str(elem).startswith("W"))
+
+        sink_dataquanta = dq_source_a.union(dq_source_b) \
+            .union(dq_source_c) \
+            .sort(lambda elem: elem.lower()) \
+            .sink("../test/output.txt", end="")
+
+        sink_dataquanta.to_wayang_plan()
+
+
+if __name__ == '__main__':
+    unittest.main()
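`unittest` only collects test methods of `TestCase` subclasses; module-level functions named `test_*` are ignored even when they take a `self` parameter, so Spark tests written as bare functions would never run. The stdlib sketch below counts what the default loader collects from a synthetic module in both layouts; `count_collected_tests` and `spark_test_layouts` are hypothetical helpers.

```python
import types
import unittest


def count_collected_tests(namespace: dict) -> int:
    """Count the tests unittest's default loader finds in a module holding `namespace`."""
    module = types.ModuleType("collected_sketch")
    module.__dict__.update(namespace)
    return unittest.defaultTestLoader.loadTestsFromModule(module).countTestCases()


def spark_test_layouts():
    """Return (bare_functions, wrapped_in_class) namespaces mirroring the Spark test file."""

    def test_most_basic(self):  # module-level function with a `self` parameter
        pass

    class MyTestCase(unittest.TestCase):
        def test_most_basic(self):  # method of a TestCase subclass
            pass

    return {"test_most_basic": test_most_basic}, {"MyTestCase": MyTestCase}
```

Counting both layouts gives 0 collected tests for the bare functions and 1 for the class-based version.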
diff --git a/wayang-api/pom.xml b/wayang-api/pom.xml
index e31751b..c868801 100644
--- a/wayang-api/pom.xml
+++ b/wayang-api/pom.xml
@@ -38,6 +38,7 @@
<modules>
<module>wayang-api-scala-java</module>
<module>wayang-api-python</module>
+ <module>wayang-api-rest</module>
</modules>
</project>
diff --git a/wayang-api/wayang-api-python/pom.xml b/wayang-api/wayang-api-python/pom.xml
index 5a3ad44..647f07e 100644
--- a/wayang-api/wayang-api-python/pom.xml
+++ b/wayang-api/wayang-api-python/pom.xml
@@ -30,9 +30,54 @@
<version>0.6.1-SNAPSHOT</version>
<name>Wayang API Python</name>
- <description>Wayang implementation of an API of Python to be enable to work with code writed in python</description>
+    <description>Wayang implementation of a Python API that makes it possible to work with code written in Python</description>
<properties>
<java-module-name>org.apache.wayang.api</java-module-name>
+ <graphchi.version>0.2.2</graphchi.version>
+ <spark.version>2.4.0</spark.version>
+ <flink.version>1.7.1</flink.version>
+ <scala.mayor.version>2.11</scala.mayor.version>
+ <giraph.version>1.2.0-hadoop2</giraph.version>
</properties>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-commons</artifactId>
+ <version>0.6.1-SNAPSHOT</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-core</artifactId>
+ <version>0.6.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-java</artifactId>
+ <version>0.6.1-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>3.7.0</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+
</project>
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java
new file mode 100644
index 0000000..b463b36
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import com.google.protobuf.ByteString;
+import org.apache.wayang.api.python.function.PythonUdf;
+import org.apache.wayang.core.api.exception.WayangException;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.Map;
+
+public class ProcessFeeder<Input, Output> {
+
+ private Socket socket;
+ private PythonUdf<Input, Output> udf;
+ private ByteString serializedUDF;
+ private Iterable<Input> input;
+
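+    // Special length markers; they must match SpecialLengths in worker.py
+    //TODO add to a config file
+    private static final int END_OF_DATA_SECTION = -1;
+    private static final int NULL = -5;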
+
+ public ProcessFeeder(
+ Socket socket,
+ PythonUdf<Input, Output> udf,
+ ByteString serializedUDF,
+ Iterable<Input> input){
+
+ if(input == null) throw new WayangException("Nothing to process with Python API");
+
+ this.socket = socket;
+ this.udf = udf;
+ this.serializedUDF = serializedUDF;
+ this.input = input;
+
+ }
+
+ public void send(){
+
+ try{
+ //TODO use config buffer size
+ int BUFFER_SIZE = 8192;
+
+ BufferedOutputStream stream = new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE);
+ DataOutputStream dataOut = new DataOutputStream(stream);
+
+ writeUDF(serializedUDF, dataOut);
+ this.writeIteratorToStream(input.iterator(), dataOut);
+ dataOut.writeInt(END_OF_DATA_SECTION);
+ dataOut.flush();
+
+        } catch (IOException e) {
+            throw new WayangException("Could not send the UDF and input data to the Python worker", e);
+        }
+ }
+
+ public void writeUDF(ByteString serializedUDF, DataOutputStream dataOut){
+
+ //write(serializedUDF.toByteArray(), dataOut);
+ writeBytes(serializedUDF.toByteArray(), dataOut);
+ System.out.println("UDF written");
+
+ }
+
+ public void writeIteratorToStream(Iterator<Input> iter, DataOutputStream dataOut){
+
+        System.out.println("iterator being sent");
+ for (Iterator<Input> it = iter; it.hasNext(); ) {
+ Input elem = it.next();
+ //System.out.println(elem.toString());
+ write(elem, dataOut);
+ }
+ }
+
+ /*TODO Missing case PortableDataStream */
+ public void write(Object obj, DataOutputStream dataOut){
+ try {
+
+ if(obj == null)
+ dataOut.writeInt(this.NULL);
+
+            // Byte array cases
+            else if (obj instanceof Byte[] || obj instanceof byte[]) {
+                System.out.println("Writing Bytes");
+                writeBytes(obj, dataOut);
+            }
+            // String case
+            else if (obj instanceof String)
+                writeUTF((String) obj, dataOut);
+
+            // Key, Value case: key and value are written as two consecutive elements
+            else if (obj instanceof Map.Entry)
+                writeKeyValue((Map.Entry) obj, dataOut);
+
+ else{
+ throw new WayangException("Unexpected element type " + obj.getClass());
+ }
+
+
+        } catch (IOException e) {
+            throw new WayangException("Could not write an element to the Python worker", e);
+        }
+ }
+
+ public void writeBytes(Object obj, DataOutputStream dataOut){
+
+ try{
+
+ if (obj instanceof Byte[]) {
+
+ int length = ((Byte[]) obj).length;
+
+ byte[] bytes = new byte[length];
+ int j=0;
+
+ // Unboxing Byte values. (Byte[] to byte[])
+ for(Byte b: ((Byte[]) obj))
+ bytes[j++] = b.byteValue();
+
+ dataOut.writeInt(length);
+ dataOut.write(bytes);
+
+ } else if (obj instanceof byte[]) {
+
+ dataOut.writeInt(((byte[]) obj).length);
+ dataOut.write(((byte[]) obj));
+ }
+        } catch (IOException e) {
+            throw new WayangException("Could not write bytes to the Python worker", e);
+        }
+ }
+
+ public void writeUTF(String str, DataOutputStream dataOut){
+
+ byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
+
+ try {
+
+ dataOut.writeInt(bytes.length);
+ dataOut.write(bytes);
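+        } catch (IOException e) {
+            // SocketException is an IOException, so socket failures are reported instead of silently swallowed
+            throw new WayangException("Could not write a string to the Python worker", e);
+        }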
+ }
+
+ public void writeKeyValue(Map.Entry obj, DataOutputStream dataOut){
+
+ write(obj.getKey(), dataOut);
+ write(obj.getValue(), dataOut);
+ }
+
+}
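On the wire, `ProcessFeeder` produces a sequence of 4-byte big-endian lengths (`DataOutputStream.writeInt`), each followed by that many bytes: first the serialized UDF, then one frame per input element, with `NULL` (-5) for `null`, a `Map.Entry` written as its key frame followed by its value frame, and finally `END_OF_DATA_SECTION` (-1). The Python sketch below mirrors that byte layout so it can be inspected or used in tests; `encode_feed` is a hypothetical helper, and 2-tuples stand in for `Map.Entry`.

```python
import struct
from typing import Iterable

END_OF_DATA_SECTION = -1
NULL = -5


def _frame(payload: bytes) -> bytes:
    # DataOutputStream.writeInt: 4-byte big-endian signed int, then the raw bytes
    return struct.pack("!i", len(payload)) + payload


def _encode_element(elem, out: bytearray) -> None:
    if elem is None:
        out += struct.pack("!i", NULL)
    elif isinstance(elem, (bytes, bytearray)):
        out += _frame(bytes(elem))
    elif isinstance(elem, str):
        # String.getBytes(StandardCharsets.UTF_8) on the Java side
        out += _frame(elem.encode("utf-8"))
    elif isinstance(elem, tuple) and len(elem) == 2:
        # Map.Entry stand-in: key and value are written as two consecutive elements
        _encode_element(elem[0], out)
        _encode_element(elem[1], out)
    else:
        raise TypeError("Unexpected element type %s" % type(elem))


def encode_feed(serialized_udf: bytes, elements: Iterable) -> bytes:
    """Byte layout that ProcessFeeder.send() writes to the worker socket."""
    out = bytearray(_frame(serialized_udf))
    for elem in elements:
        _encode_element(elem, out)
    out += struct.pack("!i", END_OF_DATA_SECTION)
    return bytes(out)
```

Because the encoding is purely positional, the receiver cannot tell a key-value pair from two consecutive plain elements; it has to know the element type in advance.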
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessReceiver.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessReceiver.java
new file mode 100644
index 0000000..895245a
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessReceiver.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Iterator;
+/* TODO The output cannot always be a String; include type definitions for every operator,
+ * e.g. map(udf, inputType, outputType) */
+public class ProcessReceiver<Output> {
+
+ private ReaderIterator<Output> iterator;
+
+ public ProcessReceiver(Socket socket){
+ try{
+ //TODO use config buffer size
+ int BUFFER_SIZE = 8192;
+
+ DataInputStream stream = new DataInputStream(new BufferedInputStream(socket.getInputStream(), BUFFER_SIZE));
+ this.iterator = new ReaderIterator<>(stream);
+
+        } catch (IOException e) {
+            throw new java.io.UncheckedIOException("Could not open the input stream of the Python worker socket", e);
+        }
+ }
+
+    // Every call returns the same socket-backed iterator, so the result can be traversed only once
+    public Iterable<Output> getIterable(){
+        return () -> iterator;
+    }
+
+ public void print(){
+ iterator.forEachRemaining(x -> System.out.println(x.toString()));
+
+ }
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java
new file mode 100644
index 0000000..0d0e0b9
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import com.google.protobuf.ByteString;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.api.exception.WayangException;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PythonProcessCaller {
+
+ private Process process;
+ private Socket socket;
+ private ServerSocket serverSocket;
+ private boolean ready;
+
+ //TODO How to get the config
+ private Configuration configuration;
+
+ public PythonProcessCaller(ByteString serializedUDF){
+
+        // TODO Document how to provide this configuration; the file URL below is specific to one developer machine
+ this.configuration = new Configuration("file:///Users/rodrigopardomeza/wayang/incubator-wayang/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties");
+ this.ready = false;
+ byte[] addr = new byte[4];
+ addr[0] = 127; addr[1] = 0; addr[2] = 0; addr[3] = 1;
+
+ try {
+            // Port 0 lets the OS pick a free ephemeral port; backlog 1 because exactly one worker connects back
+ this.serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(addr));
+ ProcessBuilder pb = new ProcessBuilder(
+ Arrays.asList(
+ "python3",
+ this.configuration.getStringProperty("wayang.api.python.worker")
+ )
+ );
+ Map<String, String> workerEnv = pb.environment();
+ workerEnv.put("PYTHON_WORKER_FACTORY_PORT", String.valueOf(this.serverSocket.getLocalPort()));
+
+ // TODO See what is happening with ENV Python version
+ workerEnv.put("PYTHONPATH", "/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/:/Users/rodrigopardomeza/opt/anaconda3/");
+
+            // Forward the worker's stdout and stderr to this JVM's own streams
+            pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);
+            pb.redirectError(ProcessBuilder.Redirect.INHERIT);
+            this.process = pb.start();
+
+            // Wait up to 10 s for the worker to connect back to our socket;
+            // the worker learns the port from PYTHON_WORKER_FACTORY_PORT
+            this.serverSocket.setSoTimeout(10000);
+
+ try {
+ this.socket = this.serverSocket.accept();
+ this.serverSocket.setSoTimeout(0);
+
+ if(socket.isConnected())
+ this.ready = true;
+
+            } catch (Exception e) {
+                throw new WayangException("Python worker failed to connect back.", e);
+            }
+        } catch (WayangException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new WayangException("Python worker failed", e);
+        }
+ }
+
+ public Process getProcess() {
+ return process;
+ }
+
+ public Socket getSocket() {
+ return socket;
+ }
+
+ public boolean isReady(){
+ return ready;
+ }
+
+ public void close(){
+ try {
+ this.process.destroy();
+ this.socket.close();
+ this.serverSocket.close();
+ System.out.println("Everything closed");
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
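`PythonProcessCaller` relies on a connect-back handshake: bind a loopback server socket on port 0 so the OS picks a free port, pass that port to the child process in `PYTHON_WORKER_FACTORY_PORT`, then `accept()` with a timeout. The sketch below reproduces the same handshake entirely in Python, with a short inline child program standing in for `worker.py`; `launch_and_accept` and the `b"ready"` greeting are hypothetical.

```python
import os
import socket
import subprocess
import sys

# Child program: plays the role of worker.py's local_connect()
CHILD = (
    "import os, socket\n"
    "port = int(os.environ['PYTHON_WORKER_FACTORY_PORT'])\n"
    "s = socket.create_connection(('127.0.0.1', port), timeout=30)\n"
    "s.sendall(b'ready')\n"
    "s.close()\n"
)


def launch_and_accept(timeout: float = 10.0) -> bytes:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        # Port 0: the OS chooses a free port; backlog 1 since one worker connects
        server.bind(("127.0.0.1", 0))
        server.listen(1)
        server.settimeout(timeout)  # counterpart of setSoTimeout(10000)
        port = server.getsockname()[1]
        env = dict(os.environ, PYTHON_WORKER_FACTORY_PORT=str(port))
        proc = subprocess.Popen([sys.executable, "-c", CHILD], env=env)
        try:
            # Raises a timeout error if the child never connects back
            conn, _ = server.accept()
            with conn:
                conn.settimeout(timeout)
                data = b""
                while True:
                    chunk = conn.recv(1024)
                    if not chunk:  # child closed its end
                        break
                    data += chunk
            return data
        finally:
            proc.wait(timeout=timeout)
```

Binding to port 0 avoids races on a hard-coded port, and passing the port through the environment keeps the child's command line unchanged.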
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonWorkerManager.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonWorkerManager.java
new file mode 100644
index 0000000..09b394d
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonWorkerManager.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import com.google.protobuf.ByteString;
+import org.apache.wayang.api.python.function.PythonUdf;
+import org.apache.wayang.core.api.exception.WayangException;
+
+public class PythonWorkerManager<Input, Output> {
+
+ private PythonUdf<Input, Output> udf;
+ private ByteString serializedUDF;
+ private Iterable<Input> inputIterator;
+
+ public PythonWorkerManager(
+ PythonUdf<Input, Output> udf,
+ ByteString serializedUDF,
+ Iterable<Input> input
+ ){
+ this.udf = udf;
+ this.serializedUDF = serializedUDF;
+ this.inputIterator = input;
+ }
+
+ public Iterable<Output> execute(){
+ PythonProcessCaller worker = new PythonProcessCaller(this.serializedUDF);
+
+ if(worker.isReady()){
+
+ ProcessFeeder<Input, Output> feed = new ProcessFeeder<>(
+ worker.getSocket(),
+ this.udf,
+ this.serializedUDF,
+ this.inputIterator
+ );
+ feed.send();
+ ProcessReceiver<Output> r = new ProcessReceiver<>(worker.getSocket());
+
+ //r.print();
+ return r.getIterable();
+ //return (Iterable<Output>) this.inputIterator;
+
+ } else{
+
+ int port = worker.getSocket().getLocalPort();
+ worker.close();
+ throw new WayangException("Not possible to work with the Socket provided on port: " + port);
+ }
+
+ }
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ReaderIterator.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ReaderIterator.java
new file mode 100644
index 0000000..1f90358
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ReaderIterator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public class ReaderIterator <Output> implements Iterator<Output> {
+
+ private Output nextObj = null;
+ private boolean eos = false;
+ private boolean fst = false;
+ private DataInputStream stream = null;
+
+ public ReaderIterator(DataInputStream stream) {
+
+ this.stream = stream;
+ this.eos = false;
+ this.nextObj = null;
+ }
+
+ private Output read() {
+
+ int END_OF_DATA_SECTION = -1;
+
+ try {
+ int length = this.stream.readInt();
+
+            // length 0 encodes the empty string, which must be returned as "" rather than null
+            if (length >= 0) {
+ byte[] obj = new byte[length];
+ stream.readFully(obj);
+ String s = new String(obj, StandardCharsets.UTF_8);
+ Output it = (Output) s;
+ return it;
+ } else if (length == END_OF_DATA_SECTION) {
+ this.eos = true;
+ return null;
+ }
+ } catch (IOException e) {
+ //e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+
+    @Override
+    public boolean hasNext() {
+
+        // Read ahead at most once per element. `fst` is true while nextObj holds an
+        // element that has been read but not yet returned by next(), so repeated
+        // hasNext() calls do not consume (and silently drop) elements.
+        if (!this.fst && !this.eos) {
+            this.nextObj = read();
+            // A NULL marker (-5) yields a null element, which still counts as an element
+            this.fst = !this.eos;
+        }
+
+        return this.fst;
+    }
+
+    @Override
+    public Output next() {
+
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+
+        Output obj = this.nextObj;
+        this.nextObj = null;
+        this.fst = false;
+        return obj;
+    }
+
+}
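`ReaderIterator` has to honour the `java.util.Iterator` contract, under which `hasNext()` may be called any number of times before `next()` without consuming input. The stream it reads encodes elements as length-prefixed UTF-8 strings, `NULL` (-5) for a missing value and `END_OF_DATA_SECTION` (-1) at the end. A Python sketch of a reader with that read-ahead behaviour follows; `FrameReader` and its `has_next`/`next` methods are hypothetical names that mimic the Java interface.

```python
import io
import struct

END_OF_DATA_SECTION = -1
NULL = -5


class FrameReader:
    """Java-Iterator-style reader over length-prefixed UTF-8 frames."""

    def __init__(self, stream):
        self._stream = stream
        self._eos = False
        self._fetched = False  # True while an element has been read ahead but not returned
        self._next = None

    def _read(self):
        header = self._stream.read(4)
        if len(header) < 4:
            raise EOFError("stream ended before END_OF_DATA_SECTION")
        (length,) = struct.unpack("!i", header)
        if length == END_OF_DATA_SECTION:
            self._eos = True
            return None
        if length == NULL:
            return None
        if length < 0:
            raise ValueError("unexpected special length %d" % length)
        # A zero length is a valid, empty string
        return self._stream.read(length).decode("utf-8")

    def has_next(self) -> bool:
        # Idempotent: only touch the stream when nothing is buffered yet
        if not self._fetched and not self._eos:
            self._next = self._read()
            self._fetched = not self._eos
        return self._fetched

    def next(self):
        if not self.has_next():
            raise StopIteration
        value, self._next = self._next, None
        self._fetched = False
        return value
```

Reading the stream `"hi"`, `""`, `NULL`, end marker yields `"hi"`, `""` and `None` in that order, no matter how many times `has_next()` is called in between.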
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonUdf.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonUdf.java
new file mode 100644
index 0000000..88134b3
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonUdf.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.function;
+
+import org.apache.wayang.core.function.FunctionDescriptor;
+
+public interface PythonUdf<Input, Output> extends FunctionDescriptor.SerializableFunction<Iterable<Input>, Iterable<Output>>{
+
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/WrappedPythonFunction.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/WrappedPythonFunction.java
new file mode 100644
index 0000000..26382f4
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/WrappedPythonFunction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.function;
+
+import com.google.protobuf.ByteString;
+import org.apache.wayang.api.python.executor.PythonWorkerManager;
+import org.apache.wayang.core.function.FunctionDescriptor;
+
+public class WrappedPythonFunction<Input, Output> implements FunctionDescriptor.SerializableFunction<Iterable<Input>, Iterable<Output>> {
+
+ private PythonUdf<Input, Output> myUDF;
+ private ByteString serializedUDF;
+
+ public WrappedPythonFunction(PythonUdf<Input, Output> myUDF, ByteString serializedUDF){
+ this.myUDF = myUDF;
+ this.serializedUDF = serializedUDF;
+ }
+
+ @Override
+ public Iterable<Output> apply(Iterable<Input> input) {
+
+ PythonWorkerManager<Input, Output> manager = new PythonWorkerManager<>(myUDF, serializedUDF, input);
+ Iterable<Output> output = manager.execute();
+ return output;
+ }
+
+}
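`WrappedPythonFunction` maps a whole `Iterable<Input>` to an `Iterable<Output>`, which is the shape a MapPartitions operator expects (the commit history mentions `MapPartitionsDescriptor`). If a UDF such as the test plans' `sort` is executed this way, it sees one partition at a time, so the combined output is sorted per partition rather than globally unless all data sits in a single partition. The sketch below illustrates this with in-memory lists; `map_partitions` and `flatten` are hypothetical stand-ins, not Wayang's API.

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

InT = TypeVar("InT")
OutT = TypeVar("OutT")


def map_partitions(partitions: List[List[InT]],
                   udf: Callable[[Iterator[InT]], Iterable[OutT]]) -> List[List[OutT]]:
    """Apply a partition-level UDF to each partition independently."""
    # Each call only sees the iterator of its own partition
    return [list(udf(iter(partition))) for partition in partitions]


def flatten(partitions: List[List[OutT]]) -> List[OutT]:
    return [elem for partition in partitions for elem in partition]
```

For partitions `[["c", "a"], ["b"]]`, a case-insensitive sort UDF produces `[["a", "c"], ["b"]]`, which flattens to `["a", "c", "b"]`, not to a globally sorted list.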
diff --git a/wayang-api/wayang-api-python/src/main/python/worker.py b/wayang-api/wayang-api-python/src/main/python/worker.py
new file mode 100644
index 0000000..6e3d54a
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/python/worker.py
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import socket
+import struct
+import pickle
+import sys
+
+
+class SpecialLengths(object):
+ END_OF_DATA_SECTION = -1
+ PYTHON_EXCEPTION_THROWN = -2
+ TIMING_DATA = -3
+ END_OF_STREAM = -4
+ NULL = -5
+ START_ARROW_STREAM = -6
+
+
+def read_int(stream):
+ length = stream.read(4)
+ if not length:
+ raise EOFError
+ res = struct.unpack("!i", length)[0]
+ return res
+
+
+class UTF8Deserializer:
+ """
+ Deserializes streams written by String.getBytes.
+ """
+
+ def __init__(self, use_unicode=True):
+ self.use_unicode = use_unicode
+
+ def loads(self, stream):
+ length = read_int(stream)
+ if length == SpecialLengths.END_OF_DATA_SECTION:
+ raise EOFError
+ elif length == SpecialLengths.NULL:
+ return None
+ s = stream.read(length)
+ return s.decode("utf-8") if self.use_unicode else s
+
+ def load_stream(self, stream):
+ try:
+ while True:
+ yield self.loads(stream)
+ except struct.error:
+ return
+ except EOFError:
+ return
+
+ def __repr__(self):
+ return "UTF8Deserializer(%s)" % self.use_unicode
+
+
+def write_int(p, outfile):
+ outfile.write(struct.pack("!i", p))
+
+
+def write_with_length(obj, stream):
+    serialized = obj.encode('utf-8')
+    # The length prefix is a signed 32-bit int read by Java's DataInputStream.readInt()
+    if len(serialized) > (1 << 31) - 1:
+        raise ValueError("can not serialize object larger than 2 GiB - 1 byte")
+    write_int(len(serialized), stream)
+    stream.write(serialized)
+
+
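+def dump_stream(iterator, stream):
+    for obj in iterator:
+        if obj is None:
+            # Java's ReaderIterator turns the NULL marker into a null element
+            write_int(SpecialLengths.NULL, stream)
+        elif type(obj) is str:
+            write_with_length(obj, stream)
+        else:
+            # Only strings are supported for now; fail loudly instead of dropping elements
+            raise TypeError("unsupported output element type: %s" % type(obj))
+    print("Finished writing elements")
+    write_int(SpecialLengths.END_OF_DATA_SECTION, stream)
+    print("Wrote END_OF_DATA_SECTION")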
+
+
+def process(infile, outfile):
+    """Run one serialized UDF over the elements streamed in by Java.
+
+    Wire format (4-byte big-endian integers, as written by DataOutputStream.writeInt):
+      1. <udf_length><pickled UDF bytes>
+      2. zero or more <length><UTF-8 bytes> elements (NULL = -5 encodes None)
+      3. END_OF_DATA_SECTION (-1)
+    The results are written back to outfile in the same element format.
+    """
+    # TODO First we must receive the operator + UDF
+    udf_length = read_int(infile)
+    serialized_udf = infile.read(udf_length)
+    # The UDF is a pickled callable mapping an iterable of inputs to an iterable of
+    # outputs, e.g. lambda it: sorted(it, key=lambda elem: elem.lower()).
+    # Only unpickle data from a trusted peer (here: the local Wayang JVM).
+    func = pickle.loads(serialized_udf)
+
+    # TODO Here we are temporarily assuming that the user is exclusively sending UTF8. User has several types
+    iterator = UTF8Deserializer().load_stream(infile)
+    out_iter = func(iterator)
+    dump_stream(iterator=out_iter, stream=outfile)
+
+
+def local_connect(port):
+ sock = None
+ errors = []
+ # Support for both IPv4 and IPv6.
+ # On most of IPv6-ready systems, IPv6 will take precedence.
+ for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+ af, socktype, proto, _, sa = res
+ try:
+ sock = socket.socket(af, socktype, proto)
+ # sock.settimeout(int(os.environ.get("SPARK_AUTH_SOCKET_TIMEOUT", 15)))
+ sock.settimeout(30)
+ sock.connect(sa)
+ # sockfile = sock.makefile("rwb", int(os.environ.get("SPARK_BUFFER_SIZE", 65536)))
+ sockfile = sock.makefile("rwb", 65536)
+ # _do_server_auth(sockfile, auth_secret)
+ return (sockfile, sock)
+ except socket.error as e:
+ emsg = str(e)
+ errors.append("tried to connect to %s, but an error occurred: %s" % (sa, emsg))
+ sock.close()
+ sock = None
+ raise Exception("could not open socket: %s" % errors)
+
+
+if __name__ == '__main__':
+ print("Python version")
+ print (sys.version)
+ java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
+ sock_file, sock = local_connect(java_port)
+ process(sock_file, sock_file)
+ sock_file.flush()
+ exit()
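The worker speaks a length-prefixed framing protocol: the Java side sends the pickled UDF as one framed payload, then each UTF-8 record as its own framed payload, and `dump_stream` answers the same way before closing with `SpecialLengths.END_OF_DATA_SECTION`. A minimal in-memory sketch of that round trip follows. It is not the PR's code: it assumes `write_int`/`read_int` use 4-byte big-endian signed integers and that `END_OF_DATA_SECTION` is `-1`, as in PySpark's serializers, since neither definition is visible in this excerpt. The standard `pickle` module cannot serialize lambdas, so `functools.partial(sorted, key=str.lower)` stands in for the sort UDF; the real client may use a different serializer.

```python
import functools
import io
import pickle
import struct

# Assumption: PySpark-style framing; the worker's real write_int/read_int and
# SpecialLengths are defined outside this excerpt.
END_OF_DATA_SECTION = -1


def write_int(value, stream):
    # 4-byte big-endian signed integer
    stream.write(struct.pack("!i", value))


def read_int(stream):
    data = stream.read(4)
    if len(data) < 4:
        raise EOFError("stream ended while reading a length prefix")
    return struct.unpack("!i", data)[0]


def write_with_length(payload, stream):
    # Strings travel as UTF-8 bytes preceded by their byte length
    if isinstance(payload, str):
        payload = payload.encode("utf-8")
    write_int(len(payload), stream)
    stream.write(payload)


def read_utf8_stream(stream):
    # Yield decoded strings until the end-of-data marker arrives
    while True:
        length = read_int(stream)
        if length == END_OF_DATA_SECTION:
            return
        yield stream.read(length).decode("utf-8")


def run_worker(infile, outfile):
    # Mirrors process(): read the UDF, apply it to the record stream, write results back
    func = pickle.loads(infile.read(read_int(infile)))
    for item in func(read_utf8_stream(infile)):
        write_with_length(item, outfile)
    write_int(END_OF_DATA_SECTION, outfile)


# Java-side request: pickled UDF, then records, then the end marker
request = io.BytesIO()
write_with_length(pickle.dumps(functools.partial(sorted, key=str.lower)), request)
for word in ["banana", "Apple", "cherry"]:
    write_with_length(word, request)
write_int(END_OF_DATA_SECTION, request)
request.seek(0)

response = io.BytesIO()
run_worker(request, response)
response.seek(0)
result = list(read_utf8_stream(response))
print(result)  # ['Apple', 'banana', 'cherry']
```

Because the UDF receives the whole input iterator, a sorting UDF necessarily consumes every record before the first result is written back.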
diff --git a/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties b/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties
index ff3a99f..99b8470 100644
--- a/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties
+++ b/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties
@@ -15,4 +15,5 @@
# limitations under the License.
#
-# TODO: Add properties here
\ No newline at end of file
+wayang.api.python.worker = /Users/rodrigopardomeza/wayang/incubator-wayang/wayang-api/wayang-api-python/src/main/python/worker.py
+# wayang.api.python.worker = src/main/python/worker.py
diff --git a/wayang-api/wayang-api-python/src/test/java/org/apache/wayang/api/python/BasicPython.java b/wayang-api/wayang-api-python/src/test/java/org/apache/wayang/api/python/BasicPython.java
new file mode 100644
index 0000000..5108de2
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/test/java/org/apache/wayang/api/python/BasicPython.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python;
+
+import org.apache.wayang.api.python.function.WrappedPythonFunction;
+import org.apache.wayang.basic.operators.*;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.function.FlatMapDescriptor;
+import org.apache.wayang.core.function.MapPartitionsDescriptor;
+import org.apache.wayang.core.function.TransformationDescriptor;
+import org.apache.wayang.core.plan.wayangplan.WayangPlan;
+import org.apache.wayang.core.types.DataUnitType;
+import org.apache.wayang.java.Java;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+
+public class BasicPython {
+
+ public static URI createUri(String resourcePath) {
+ try {
+            return BasicPython.class.getResource(resourcePath).toURI();
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("Illegal URI.", e);
+ }
+
+ }
+
+ @Test
+ public void testOnJava() throws URISyntaxException, IOException {
+
+ URI FILE_SOME_LINES_TXT = createUri("/python-lines.txt");
+
+        // Instantiate Wayang and activate the Java backend.
+ WayangContext rheemContext = new WayangContext().with(Java.basicPlugin());
+ TextFileSource textFileSource = new TextFileSource(FILE_SOME_LINES_TXT.toString());
+
+ TextFileSink<String> sink = new TextFileSink<String>(
+ "file:///Users/rodrigopardomeza/wayang/incubator-wayang/wayang-api/wayang-api-python/src/main/resources/basic_output.txt",
+ String.class
+ );
+
+ textFileSource.connectTo(0, sink, 0);
+
+ rheemContext.execute(new WayangPlan(sink));
+ }
+}
+
diff --git a/wayang-api/wayang-api-python/src/test/java/org/apache/wayang/api/python/SortPython.java b/wayang-api/wayang-api-python/src/test/java/org/apache/wayang/api/python/SortPython.java
new file mode 100644
index 0000000..7b74e77
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/test/java/org/apache/wayang/api/python/SortPython.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.api.python;
+
+import org.apache.wayang.api.python.function.WrappedPythonFunction;
+import org.apache.wayang.basic.operators.*;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.function.FlatMapDescriptor;
+import org.apache.wayang.core.function.MapPartitionsDescriptor;
+import org.apache.wayang.core.function.TransformationDescriptor;
+import org.apache.wayang.core.plan.wayangplan.WayangPlan;
+import org.apache.wayang.core.types.DataUnitType;
+import org.apache.wayang.java.Java;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+
+public class SortPython {
+ /*
+ public static URI createUri(String resourcePath) {
+ try {
+            return SortPython.class.getResource(resourcePath).toURI();
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("Illegal URI.", e);
+ }
+
+ }
+
+ @Test
+ public void testOnJava() throws URISyntaxException, IOException {
+
+ URI FILE_SOME_LINES_TXT = createUri("/python-lines.txt");
+
+        // Instantiate Wayang and activate the Java backend.
+ WayangContext rheemContext = new WayangContext().with(Java.basicPlugin());
+ TextFileSource textFileSource = new TextFileSource(FILE_SOME_LINES_TXT.toString());
+
+ // for each line (input) output an iterator of the words
+ FlatMapOperator<String, String> flatMapOperator
+ = new FlatMapOperator<>(
+ new FlatMapDescriptor<>(
+ line -> Arrays.asList(
+ (String[]) line.split(" ")
+ ),
+ String.class,
+ String.class
+ )
+ );
+
+ SortOperator<String, String> sortJava
+ = new SortOperator<String, String>(
+ new TransformationDescriptor<String, String>(
+ (l) -> l.toLowerCase(),
+ DataUnitType.createBasic(String.class),
+ DataUnitType.createBasic(String.class)
+ )
+ );
+
+ MapPartitionsOperator<String, String> sortPython =
+ new MapPartitionsOperator<String, String>(
+ new MapPartitionsDescriptor<String, String>(
+ new WrappedPythonFunction<String, String>(
+ l -> l
+ ),
+ String.class,
+ String.class
+ )
+ );
+
+ //URI outputfile = Thread.currentThread().getClass().getResource("/output.txt").toURI();
+
+ TextFileSink<String> sink = new TextFileSink<String>(
+ "file:///Users/rodrigopardomeza/wayang/incubator-wayang/wayang-api/wayang-api-python/src/main/resources/output.txt",
+ String.class
+ );
+
+ textFileSource.connectTo(0, flatMapOperator, 0);
+ flatMapOperator.connectTo(0, sortPython, 0);
+ sortPython.connectTo(0, sink, 0);
+
+ rheemContext.execute(new WayangPlan(sink));
+ }
+ */
+}
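`SortPython` (currently commented out) wires a `FlatMapOperator` that splits each line on single spaces into a `MapPartitionsOperator` whose real work happens in the Python worker; the Java-side lambda `l -> l` is only a placeholder. Assuming the UDF shipped to the worker is the case-insensitive sort that appears in `worker.py`'s TODO, the expected output for a single partition can be sketched in plain Python, for instance as a test oracle. The helper names are hypothetical. With several partitions, each one would be sorted independently, so the combined output would not be globally sorted.

```python
# Hypothetical helpers; the UDF is assumed to be the case-insensitive sort from worker.py's TODO.


def flat_map_words(lines):
    # Same role as the FlatMapOperator: one output element per space-separated token.
    # Caveat: Java's String.split(" ") drops trailing empty strings; Python's split(" ") keeps them.
    for line in lines:
        yield from line.split(" ")


def sort_partition(partition):
    # The assumed Python UDF: sort one partition case-insensitively
    return sorted(partition, key=lambda elem: elem.lower())


lines = ["the Quick brown", "fox Jumps"]
print(sort_partition(flat_map_words(lines)))  # ['brown', 'fox', 'Jumps', 'Quick', 'the']
```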
diff --git a/wayang-api/wayang-api-rest/pom.xml b/wayang-api/wayang-api-rest/pom.xml
new file mode 100644
index 0000000..ef86974
--- /dev/null
+++ b/wayang-api/wayang-api-rest/pom.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>wayang-api</artifactId>
+ <groupId>org.apache.wayang</groupId>
+ <version>0.6.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>wayang-api-rest</artifactId>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-dependencies</artifactId>
+ <version>2.2.6.RELEASE</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+        <!-- TODO: add the protobuf version as a parameter -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>3.15.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-core</artifactId>
+ <version>0.6.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-java</artifactId>
+ <version>0.6.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-api-python</artifactId>
+ <version>0.6.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-serializable</artifactId>
+ <version>0.6.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-api-python</artifactId>
+ <version>0.6.1-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>java8</id>
+ <activation>
+ <jdk>1.8</jdk>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-spark_2.11</artifactId>
+ <version>0.6.1-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>java11</id>
+ <activation>
+ <jdk>11</jdk>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-spark_2.12</artifactId>
+ <version>0.6.1-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+</project>
\ No newline at end of file
diff --git a/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/WayangRESTAPI.java b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/WayangRESTAPI.java
new file mode 100644
index 0000000..8b37b7c
--- /dev/null
+++ b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/WayangRESTAPI.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.rest;
+
+public class WayangRESTAPI {
+}
diff --git a/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/WayangApplication.java b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/WayangApplication.java
new file mode 100644
index 0000000..b9b6a76
--- /dev/null
+++ b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/WayangApplication.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.rest.server.spring;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class WayangApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(WayangApplication.class, args);
+ }
+}
diff --git a/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/decoder/WayangPlanBuilder.java b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/decoder/WayangPlanBuilder.java
new file mode 100644
index 0000000..649c5ef
--- /dev/null
+++ b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/decoder/WayangPlanBuilder.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.rest.server.spring.decoder;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.wayang.api.python.function.WrappedPythonFunction;
+import org.apache.wayang.basic.operators.MapPartitionsOperator;
+import org.apache.wayang.basic.operators.TextFileSink;
+import org.apache.wayang.basic.operators.TextFileSource;
+import org.apache.wayang.basic.operators.UnionAllOperator;
+import org.apache.wayang.commons.serializable.OperatorProto;
+import org.apache.wayang.commons.serializable.PlanProto;
+import org.apache.wayang.commons.serializable.WayangPlanProto;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.function.MapPartitionsDescriptor;
+import org.apache.wayang.core.plan.wayangplan.OperatorBase;
+import org.apache.wayang.core.plan.wayangplan.WayangPlan;
+import org.apache.wayang.java.Java;
+import org.apache.wayang.spark.Spark;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.Base64;
+
+public class WayangPlanBuilder {
+
+ private WayangPlan wayangPlan;
+ private WayangContext wayangContext;
+
+ public WayangPlanBuilder(FileInputStream planFile){
+ try {
+
+ WayangPlanProto plan = WayangPlanProto.parseFrom(planFile);
+
+ this.wayangContext = buildContext(plan);
+ this.wayangPlan = buildPlan(plan);
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public WayangPlanBuilder(String writtenPlan){
+
+ System.out.println(writtenPlan);
+ byte[] message = Base64.getDecoder().decode(writtenPlan);
+        System.out.println("Decoded plan: " + message.length + " bytes");
+
+ try {
+ WayangPlanProto plan = WayangPlanProto.parseFrom(message);
+
+ this.wayangContext = buildContext(plan);
+ this.wayangPlan = buildPlan(plan);
+ } catch (InvalidProtocolBufferException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ private WayangContext buildContext(WayangPlanProto plan){
+
+ WayangContext ctx = new WayangContext();
+ plan.getContext().getPlatformsList().forEach(platform -> {
+ if (platform.getNumber() == 0)
+ ctx.with(Java.basicPlugin());
+ else if (platform.getNumber() == 1)
+ ctx.with(Spark.basicPlugin());
+ });
+ //ctx.with(Spark.basicPlugin());
+
+ return ctx;
+ }
+
+ private WayangPlan buildPlan(WayangPlanProto plan){
+
+ System.out.println(plan);
+
+ PlanProto planProto = plan.getPlan();
+ LinkedList<OperatorProto> protoList = new LinkedList<>();
+ planProto.getSourcesList().forEach(protoList::addLast);
+
+ Map<String, OperatorBase> operators = new HashMap<>();
+ List<OperatorBase> sinks = new ArrayList<>();
+ while(! protoList.isEmpty()) {
+
+ OperatorProto proto = protoList.pollFirst();
+
+ /* Checking if protoOperator can be connected to the current WayangPlan*/
+ boolean processIt;
+ if(proto.getType().equals("source")) processIt = true;
+
+ else {
+ /* Checking if ALL predecessors were already processed */
+ processIt = true;
+ for(String predecessor : proto.getPredecessorsList()){
+ if (!operators.containsKey(predecessor)) {
+ processIt = false;
+ break;
+ }
+ }
+ }
+
+ /* Operators should not be processed twice*/
+ if(operators.containsKey(proto.getId())) processIt = false;
+
+ if(processIt) {
+
+ /* Create and store Wayang operator */
+ OperatorBase operator = createOperatorByType(proto);
+ operators.put(proto.getId(), operator);
+
+ /*TODO Connect with predecessors requires more details in connection slot*/
+ int order = 0;
+ for (String pre_id : proto.getPredecessorsList()) {
+
+ OperatorBase predecessor = operators.get(pre_id);
+ /* Only works without replicate topology */
+ predecessor.connectTo(0, operator, order);
+ order++;
+
+ if(proto.getType().equals("sink")){
+ sinks.add(operator);
+ //if(!sinks.contains(operator)) {
+ // sinks.add(operator);
+ //}
+ }
+ }
+
+                /* Successors of the current OperatorProto.
+                 * They are appended to protoList, but each one is only
+                 * processed once all of its predecessors are in the operators map */
+ List<OperatorProto> listSuccessors = planProto.getOperatorsList()
+ .stream()
+ .filter(t -> proto.getSuccessorsList().contains(t.getId()))
+ .collect(Collectors.toList());
+ for (OperatorProto successor : listSuccessors){
+ if(!protoList.contains(successor)){
+ protoList.addLast(successor);
+ }
+ }
+
+ List<OperatorProto> sinkSuccessors = planProto.getSinksList()
+ .stream()
+ .filter(t -> proto.getSuccessorsList().contains(t.getId()))
+ .collect(Collectors.toList());
+ for (OperatorProto successor : sinkSuccessors){
+ if(!protoList.contains(successor)){
+ protoList.addLast(successor);
+ }
+ }
+
+ } else {
+
+                /* If it cannot be processed yet, append it again at the end of the queue */
+ protoList.addLast(proto);
+ }
+ }
+
+ WayangPlan wayangPlan = new WayangPlan(sinks.get(0));
+ return wayangPlan;
+ }
+
+ public OperatorBase createOperatorByType(OperatorProto operator){
+
+        System.out.println("Type: " + operator.getType());
+ switch(operator.getType()){
+ case "source":
+ try {
+ String source_path = operator.getPath();
+ URL url = new File(source_path).toURI().toURL();
+ return new TextFileSource(url.toString());
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ }
+ break;
+ case "sink":
+ try {
+ String sink_path = operator.getPath();
+ URL url = new File(sink_path).toURI().toURL();
+ return new TextFileSink<String>(
+ url.toString(),
+ String.class
+ );
+
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ }
+ break;
+ case "map_partition":
+ return new MapPartitionsOperator<>(
+ new MapPartitionsDescriptor<String, String>(
+ new WrappedPythonFunction<String, String>(
+ l -> l,
+ operator.getUdf()
+ ),
+ String.class,
+ String.class
+ )
+ );
+
+ case "union":
+ return new UnionAllOperator<String>(
+ String.class
+ );
+
+ }
+
+        throw new WayangException("Operator type not supported: " + operator.getType());
+ }
+
+ public WayangContext getWayangContext() {
+ return wayangContext;
+ }
+
+ public WayangPlan getWayangPlan() {
+ return wayangPlan;
+ }
+}
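`buildPlan` builds the operator graph with a work queue. It starts from the sources, creates an operator only once all of its predecessors exist, connects predecessor output slot 0 to input slot `order`, and re-queues anything that is not ready yet. The same ordering can be sketched in Python over a plain dictionary standing in for `PlanProto`; the dictionary layout and the function name are hypothetical. Two deliberate differences from the Java loop: the Java version never terminates if a predecessor id is missing from the plan, so the sketch raises after a full pass without progress, and it drops already-processed entries instead of re-queueing them.

```python
from collections import deque


def connection_order(plan, source_ids):
    """Return [(operator_id, [(predecessor_id, input_slot), ...]), ...] in build order.

    Hypothetical dict layout standing in for PlanProto/OperatorProto:
    plan[op_id] = {"type": str, "predecessors": [...], "successors": [...]}
    """
    queue = deque(source_ids)
    processed = set()
    order = []
    stalled = 0  # consecutive re-queues without progress
    while queue:
        op_id = queue.popleft()
        if op_id in processed:
            continue  # never build an operator twice
        node = plan[op_id]
        ready = node["type"] == "source" or all(p in processed for p in node["predecessors"])
        if not ready:
            queue.append(op_id)
            stalled += 1
            if stalled > len(queue):
                raise ValueError(f"predecessors of {op_id!r} never become available")
            continue
        stalled = 0
        processed.add(op_id)
        # Input slot i receives output slot 0 of the i-th predecessor, as in buildPlan
        order.append((op_id, [(pre, slot) for slot, pre in enumerate(node["predecessors"])]))
        for succ in node["successors"]:
            if succ not in queue:
                queue.append(succ)
    return order


plan = {
    "s1": {"type": "source", "predecessors": [], "successors": ["m"]},
    "s2": {"type": "source", "predecessors": [], "successors": ["u"]},
    "m": {"type": "map_partition", "predecessors": ["s1"], "successors": ["u"]},
    "u": {"type": "union", "predecessors": ["m", "s2"], "successors": ["k"]},
    "k": {"type": "sink", "predecessors": ["u"], "successors": []},
}
order = connection_order(plan, ["s1", "s2"])
print([op for op, _ in order])  # ['s1', 's2', 'm', 'u', 'k']
```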
diff --git a/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java
new file mode 100644
index 0000000..4f48863
--- /dev/null
+++ b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.rest.server.spring.general;
+
+import com.google.protobuf.ByteString;
+import org.apache.wayang.api.python.function.WrappedPythonFunction;
+import org.apache.wayang.api.rest.server.spring.decoder.WayangPlanBuilder;
+import org.apache.wayang.basic.operators.*;
+import org.apache.wayang.commons.serializable.OperatorProto;
+import org.apache.wayang.commons.serializable.PlanProto;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.function.MapPartitionsDescriptor;
+import org.apache.wayang.core.plan.wayangplan.OperatorBase;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.wayang.core.plan.wayangplan.WayangPlan;
+import org.apache.wayang.java.Java;
+import org.apache.wayang.spark.Spark;
+
+import org.apache.wayang.commons.serializable.WayangPlanProto;
+import org.springframework.web.multipart.MultipartFile;
+
+
+@RestController
+public class WayangController {
+
+ @GetMapping("/plan/create/fromfile")
+ public String planFromFile(
+ //@RequestParam("file") MultipartFile file
+ ){
+
+ try {
+ FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
+ WayangPlanBuilder wpb = new WayangPlanBuilder(inputStream);
+
+ /*TODO ADD id to executions*/
+ wpb.getWayangContext().execute(wpb.getWayangPlan());
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return "Builder works";
+ }
+
+ @PostMapping("/plan/create")
+ public String planFromMessage(
+ @RequestParam("message") String message
+ ){
+
+ WayangPlanBuilder wpb = new WayangPlanBuilder(message);
+
+ /*TODO ADD id to executions*/
+ wpb.getWayangContext().execute(wpb.getWayangPlan());
+
+ return "";
+ }
+
+ @GetMapping("/")
+ public String all(){
+ System.out.println("detected!");
+
+ try {
+ FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
+ WayangPlanProto plan = WayangPlanProto.parseFrom(inputStream);
+
+ WayangContext wc = buildContext(plan);
+ WayangPlan wp = buildPlan(plan);
+
+ System.out.println("Plan!");
+ System.out.println(wp.toString());
+
+ wc.execute(wp);
+ return("Works!");
+
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return "Not working";
+ }
+
+ private WayangContext buildContext(WayangPlanProto plan){
+
+ WayangContext ctx = new WayangContext();
+ plan.getContext().getPlatformsList().forEach(platform -> {
+ if (platform.getNumber() == 0)
+ ctx.with(Java.basicPlugin());
+ else if (platform.getNumber() == 1)
+ ctx.with(Spark.basicPlugin());
+ });
+ //ctx.with(Spark.basicPlugin());
+
+ return ctx;
+ }
+
+ private WayangPlan buildPlan(WayangPlanProto plan){
+
+ System.out.println(plan);
+
+ PlanProto planProto = plan.getPlan();
+ LinkedList<OperatorProto> protoList = new LinkedList<>();
+ planProto.getSourcesList().forEach(protoList::addLast);
+
+ Map<String, OperatorBase> operators = new HashMap<>();
+ List<OperatorBase> sinks = new ArrayList<>();
+ while(! protoList.isEmpty()) {
+
+ OperatorProto proto = protoList.pollFirst();
+
+ /* Checking if protoOperator can be connected to the current WayangPlan*/
+ boolean processIt;
+ if(proto.getType().equals("source")) processIt = true;
+
+ else {
+ /* Checking if ALL predecessors were already processed */
+ processIt = true;
+ for(String predecessor : proto.getPredecessorsList()){
+ if (!operators.containsKey(predecessor)) {
+ processIt = false;
+ break;
+ }
+ }
+ }
+
+ /* Operators should not be processed twice*/
+ if(operators.containsKey(proto.getId())) processIt = false;
+
+ if(processIt) {
+
+ /* Create and store Wayang operator */
+ OperatorBase operator = createOperatorByType(proto);
+ operators.put(proto.getId(), operator);
+
+ /*TODO Connect with predecessors requires more details in connection slot*/
+ int order = 0;
+ for (String pre_id : proto.getPredecessorsList()) {
+
+ OperatorBase predecessor = operators.get(pre_id);
+ /* Only works without replicate topology */
+ predecessor.connectTo(0, operator, order);
+ order++;
+
+ if(proto.getType().equals("sink")){
+ sinks.add(operator);
+ //if(!sinks.contains(operator)) {
+ // sinks.add(operator);
+ //}
+ }
+ }
+
+                /* Successors of the current OperatorProto.
+                 * They are appended to protoList, but each one is only
+                 * processed once all of its predecessors are in the operators map */
+ List<OperatorProto> listSuccessors = planProto.getOperatorsList()
+ .stream()
+ .filter(t -> proto.getSuccessorsList().contains(t.getId()))
+ .collect(Collectors.toList());
+ for (OperatorProto successor : listSuccessors){
+ if(!protoList.contains(successor)){
+ protoList.addLast(successor);
+ }
+ }
+
+ List<OperatorProto> sinkSuccessors = planProto.getSinksList()
+ .stream()
+ .filter(t -> proto.getSuccessorsList().contains(t.getId()))
+ .collect(Collectors.toList());
+ for (OperatorProto successor : sinkSuccessors){
+ if(!protoList.contains(successor)){
+ protoList.addLast(successor);
+ }
+ }
+
+ } else {
+
+                /* If it cannot be processed yet, append it again at the end of the queue */
+ protoList.addLast(proto);
+ }
+ }
+
+ WayangPlan wayangPlan = new WayangPlan(sinks.get(0));
+ return wayangPlan;
+ }
+
+ public OperatorBase createOperatorByType(OperatorProto operator){
+
+        System.out.println("Type: " + operator.getType());
+ switch(operator.getType()){
+ case "source":
+ try {
+ String source_path = operator.getPath();
+ URL url = new File(source_path).toURI().toURL();
+ return new TextFileSource(url.toString());
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ }
+ break;
+ case "sink":
+ try {
+ String sink_path = operator.getPath();
+ URL url = new File(sink_path).toURI().toURL();
+ return new TextFileSink<String>(
+ url.toString(),
+ String.class
+ );
+
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ }
+ break;
+            case "reduce_by_key": {
+                /* Function to be applied in Python workers */
+                ByteString function = operator.getUdf();
+
+                /* Dimensions or positions that compose the group key */
+                Map<String, String> parameters = operator.getParametersMap();
+
+                PyWayangReduceByOperator<String, String> op = new PyWayangReduceByOperator(
+                        parameters,
+                        function,
+                        String.class,
+                        String.class,
+                        false
+                );
+
+                return op;
+            }
+ case "map_partition":
+ return new MapPartitionsOperator<>(
+ new MapPartitionsDescriptor<String, String>(
+ new WrappedPythonFunction<String, String>(
+ l -> l,
+ operator.getUdf()
+ ),
+ String.class,
+ String.class
+ )
+ );
+
+ case "union":
+ return new UnionAllOperator<String>(
+ String.class
+ );
+
+ }
+
+        throw new WayangException("Operator type not supported: " + operator.getType());
+ }
+
+ public static URI createUri(String resourcePath) {
+ try {
+            return WayangController.class.getResource(resourcePath).toURI();
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("Illegal URI.", e);
+ }
+
+ }
+
+}
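`planFromMessage` expects the plan in a request parameter named `message` that holds a Base64-encoded `WayangPlanProto`, which `WayangPlanBuilder` decodes with `Base64.getDecoder()`. With `server.port = 8080` from `application.properties`, a client could prepare that POST as sketched below using only the Python standard library. The host, the helper name, and the origin of `plan_bytes` (for example `SerializeToString()` on a message from the generated protobuf classes) are assumptions. Form-encoding matters: standard Base64 contains `+` and `/`, and an unescaped `+` would be decoded as a space before the Java decoder sees it.

```python
import base64
import urllib.parse
import urllib.request


def build_plan_request(plan_bytes, host="localhost", port=8080):
    # Hypothetical client helper; plan_bytes is assumed to be a serialized WayangPlanProto.
    # Standard-alphabet Base64, which java.util.Base64.getDecoder() accepts
    message = base64.b64encode(plan_bytes).decode("ascii")
    # urlencode percent-escapes '+', '/' and '=' so they survive form decoding
    body = urllib.parse.urlencode({"message": message}).encode("ascii")
    return urllib.request.Request(
        f"http://{host}:{port}/plan/create",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


request = build_plan_request(b"\xfb\xff\x01")  # encodes to "+/8B"
print(request.get_method(), request.full_url)  # POST http://localhost:8080/plan/create
```

Passing the returned request to `urllib.request.urlopen` would send it to a running `WayangApplication`.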
diff --git a/wayang-api/wayang-api-rest/src/main/resources/application.properties b/wayang-api/wayang-api-rest/src/main/resources/application.properties
new file mode 100644
index 0000000..14b1a83
--- /dev/null
+++ b/wayang-api/wayang-api-rest/src/main/resources/application.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+server.port = 8080
diff --git a/wayang-commons/pom.xml b/wayang-commons/pom.xml
index a0f1d18..58db211 100644
--- a/wayang-commons/pom.xml
+++ b/wayang-commons/pom.xml
@@ -209,6 +209,7 @@
<module>wayang-core</module>
<module>wayang-basic</module>
<module>wayang-utils-profile-db</module>
+ <module>wayang-serializable</module>
</modules>
</project>
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/PyWayangReduceByOperator.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/PyWayangReduceByOperator.java
new file mode 100644
index 0000000..1951b31
--- /dev/null
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/PyWayangReduceByOperator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.basic.operators;
+
+import com.google.protobuf.ByteString;
+import org.apache.wayang.core.plan.wayangplan.UnaryToUnaryOperator;
+import org.apache.wayang.core.types.DataSetType;
+
+import java.util.Map;
+
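+/**
+ * Reduce-by-key operator whose reduce function is a serialized Python UDF applied in the Python workers.
+ * {@code parameters} holds the dimension or positions that compose the group key;
+ * {@code reduceDescriptor} holds the serialized UDF bytes.
+ */
+public class PyWayangReduceByOperator<Type, Key> extends UnaryToUnaryOperator<Type, Type> {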
+
+ protected final Map<String, String> parameters;
+ protected final ByteString reduceDescriptor;
+
+ public PyWayangReduceByOperator(Map<String, String> parameters,
+ ByteString reduceDescriptor,
+ DataSetType<Type> inputType, DataSetType<Type> outputType, boolean isSupportingBroadcastInputs) {
+ super(inputType, outputType, isSupportingBroadcastInputs);
+ this.parameters = parameters;
+ this.reduceDescriptor = reduceDescriptor;
+ }
+
+ public PyWayangReduceByOperator(Map<String, String> parameters,
+ ByteString reduceDescriptor,
+ Class<Type> inputType, Class<Type> outputType, boolean isSupportingBroadcastInputs) {
+ super(DataSetType.createDefault(inputType), DataSetType.createDefault(outputType), isSupportingBroadcastInputs);
+ this.parameters = parameters;
+ this.reduceDescriptor = reduceDescriptor;
+ }
+
+ public PyWayangReduceByOperator(PyWayangReduceByOperator<Type, Key> that) {
+ super(that);
+ this.parameters = that.getParameters();
+ this.reduceDescriptor = that.getReduceDescriptor();
+ }
+
+ public Map<String, String> getParameters() {
+ return this.parameters;
+ }
+
+ public ByteString getReduceDescriptor() {
+ return this.reduceDescriptor;
+ }
+
+}
\ No newline at end of file
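`PyWayangReduceByOperator` only stores the parameter map and the serialized reduce UDF; the comment in the `reduce_by_key` case says the map "has dimension or positions that compose GroupKey". The following is a minimal sketch of how a Java-side consumer might turn such positions into a group key for a delimited `String` record. It assumes a hypothetical `"positions"` entry holding comma-separated column indexes and a caller-supplied field separator; neither is defined by this PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupKeySketch {

    /** Hypothetical parameter name; this PR does not define the layout of the parameter map. */
    static final String POSITIONS = "positions";

    /**
     * Builds a group key from the record columns listed in parameters.get(POSITIONS),
     * e.g. "0,2" selects the first and third fields. The separator is a regex, as in String.split.
     */
    static List<String> groupKey(String record, Map<String, String> parameters, String separator) {
        String positions = parameters.get(POSITIONS);
        if (positions == null) {
            throw new IllegalArgumentException("Missing parameter: " + POSITIONS);
        }
        String[] fields = record.split(separator, -1); // -1 keeps trailing empty fields
        List<String> key = new ArrayList<>();
        for (String position : positions.split(",")) {
            key.add(fields[Integer.parseInt(position.trim())]);
        }
        return key;
    }

    public static void main(String[] args) {
        Map<String, String> parameters = new HashMap<>();
        parameters.put(POSITIONS, "0,2");
        System.out.println(groupKey("a,b,c", parameters, ",")); // prints [a, c]
    }
}
```

Returning a `List<String>` keeps multi-column keys comparable through `equals`/`hashCode`, so the key can be used directly as a `Map` key when grouping.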
diff --git a/wayang-commons/wayang-serializable/pom.xml b/wayang-commons/wayang-serializable/pom.xml
new file mode 100644
index 0000000..87b17ea
--- /dev/null
+++ b/wayang-commons/wayang-serializable/pom.xml
@@ -0,0 +1,131 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>wayang-commons</artifactId>
+ <groupId>org.apache.wayang</groupId>
+ <version>0.6.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>wayang-serializable</artifactId>
+ <packaging>jar</packaging>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <protobuf.version>3.15.2</protobuf.version>
+ <protoc.version>3.15.2</protoc.version>
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.6.2</version>
+ </extension>
+ </extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.5.1</version>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-python</goal>
+ <goal>test-compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <!-- Copy the generated Python protobuf code into the pywayang/protobuf folder -->
+ <resources>
+ <resource>
+ <!-- TODO: check whether these paths can be parameterized -->
+ <directory>${basedir}/target/generated-sources/protobuf/python</directory><!-- from -->
+ <targetPath>${basedir}/../../pywayang/protobuf</targetPath><!-- to -->
+ <includes><!-- Only python files -->
+ <include>**/*.py</include>
+ </includes>
+ </resource>
+ </resources>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>protof-unix</id>
+ <activation>
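+ <os>
+ <family>unix</family>
+ <!-- The unix family also matches macOS, so restrict this profile to Linux -->
+ <name>Linux</name>
+ </os>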
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <configuration>
+ <!-- Default protoc location on Linux -->
+ <protocExecutable>/usr/bin/protoc</protocExecutable>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>protof-mac</id>
+ <activation>
+ <os>
+ <family>mac</family>
+ </os>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <configuration>
+ <!-- Default protoc location on macOS -->
+ <protocExecutable>/usr/local/bin/protoc</protocExecutable>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+</project>
\ No newline at end of file
diff --git a/wayang-commons/wayang-serializable/src/main/proto/pywayangplan.proto b/wayang-commons/wayang-serializable/src/main/proto/pywayangplan.proto
new file mode 100644
index 0000000..5fb373e
--- /dev/null
+++ b/wayang-commons/wayang-serializable/src/main/proto/pywayangplan.proto
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax="proto2";
+
+option java_multiple_files = true;
+option java_package = "org.apache.wayang.commons.serializable";
+
+/* Java operator or a concatenated Python pipeline */
+message OperatorProto{
+ optional string id = 1;
+ optional string type = 2;
+ optional bytes udf = 3;
+ optional string path = 4;
+ // repeated OperatorProto predecessors = 5;
+ repeated string predecessors = 5;
+ // repeated OperatorProto successors = 6;
+ repeated string successors = 6;
+ map<string, string> parameters = 7;
+}
+
+message PlanProto{
+
+ enum TypesProto{
+ string = 0;
+ integer = 1;
+ }
+
+ repeated OperatorProto sources = 1;
+ repeated OperatorProto operators = 2;
+ repeated OperatorProto sinks = 3;
+ optional TypesProto input = 4;
+ optional TypesProto output = 5;
+
+}
+
+message ContextProto{
+
+ enum PlatformProto{
+ java = 0;
+ spark = 1;
+ }
+
+ repeated PlatformProto platforms = 1;
+}
+
+message WayangPlanProto{
+
+ optional PlanProto plan = 1;
+ optional ContextProto context = 2;
+
+}
\ No newline at end of file
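In `OperatorProto`, edges are encoded as operator ids in `predecessors`/`successors` rather than as nested messages (the nested form is commented out). A consumer therefore has to resolve the ids and build operators in dependency order, so that every input exists before the operator reading it is connected. The following is a minimal sketch of that ordering using Kahn's algorithm. It is not the PR's own traversal (the commit log mentions a visitor-based one), and it assumes a hypothetical plain-Java `Op` class standing in for the generated `OperatorProto`, mirroring only `id` and `predecessors`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PlanOrderSketch {

    /** Hypothetical stand-in for OperatorProto: an id plus the ids of its inputs. */
    static final class Op {
        final String id;
        final List<String> predecessors;

        Op(String id, String... predecessors) {
            this.id = id;
            this.predecessors = Arrays.asList(predecessors);
        }
    }

    /** Returns operator ids so that each id appears after all of its predecessors. */
    static List<String> topologicalOrder(List<Op> ops) {
        Map<String, Integer> inDegree = new LinkedHashMap<>(); // keeps plan order for ties
        Map<String, List<String>> successors = new HashMap<>();
        for (Op op : ops) {
            inDegree.put(op.id, op.predecessors.size());
            for (String pred : op.predecessors) {
                successors.computeIfAbsent(pred, k -> new ArrayList<>()).add(op.id);
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((id, degree) -> {
            if (degree == 0) ready.add(id); // sources have no predecessors
        });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String id = ready.poll();
            order.add(id);
            for (String succ : successors.getOrDefault(id, Collections.emptyList())) {
                if (inDegree.merge(succ, -1, Integer::sum) == 0) ready.add(succ);
            }
        }
        if (order.size() != ops.size()) {
            throw new IllegalStateException("Plan has a cycle or an unknown predecessor id");
        }
        return order;
    }

    public static void main(String[] args) {
        List<Op> plan = Arrays.asList(
                new Op("sink", "map"),
                new Op("map", "source"),
                new Op("source"));
        System.out.println(topologicalOrder(plan)); // prints [source, map, sink]
    }
}
```

Because the edges are plain string ids, a dangling predecessor id shows up here as an incomplete ordering instead of a null lookup later, while the operators are being wired.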
diff --git a/wayang-commons/wayang-serializable/src/main/python/pyplangenerator.sh b/wayang-commons/wayang-serializable/src/main/python/pyplangenerator.sh
new file mode 100644
index 0000000..f86efb5
--- /dev/null
+++ b/wayang-commons/wayang-serializable/src/main/python/pyplangenerator.sh
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Resolve paths relative to this script instead of a developer's home directory
+SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
+PROTO_DIR="$SCRIPT_DIR/../proto"
+REPO_ROOT="$SCRIPT_DIR/../../../../.."
+
+protoc -I="$PROTO_DIR" \
+--python_out="$REPO_ROOT/pywayang/protobuf/" \
+"$PROTO_DIR/pywayangplan.proto"
\ No newline at end of file