Wayang 8 (#89)

* [WAYANG-8][API-PYTHON] Creation of functions to be consumed by MapPartitionsDescriptor

* [WAYANG-8][API-PYTHON] Included PythonProcessCaller that manages the python process execution and Java - Python connection

* [WAYANG-8][API-PYTHON] POM fixes plus minor test

* [WAYANG-8][API-PYTHON] Python connection through TCP socket enabled

* [WAYANG-8][API-PYTHON] Writing from Java to Python. Iterator datatypes are not handled yet.

* [WAYANG-8][API-PYTHON] Java Socket Writer improvements

* [WAYANG-8][API-PYTHON] Python UTF8 Deserializer included

* [WAYANG-8][API-PYTHON] Python UTF8 Reading Stream

* [WAYANG-8][API-PYTHON] Getting results from Python and continuing processing

* [WAYANG-8][API-PYTHON] Config files for pywayang

* [WAYANG-8][API-PYTHON] Structures to store the plan in a functional fashion, plus the most basic operators

* [WAYANG-8][API-PYTHON] Main program to test plan executions locally

* [WAYANG-8][API-PYTHON] Minor comments and TODOs

* [WAYANG-8][API-PYTHON] Most basic test for protobuf communication with Java

* [WAYANG-8][API-PYTHON] Adjacency list from PyWayang Plan

* [WAYANG-8][API-PYTHON] Graph traversal implementation with visitor pattern

* [WAYANG-8][API-PYTHON] Protobuf python message generator

* [WAYANG-8][API-PYTHON] Wayang Web Service project structure

* [WAYANG-8][API-PYTHON] Protobuf message generation fixes

* [WAYANG-8][API-PYTHON] Wayang Web Service executes most basic plans directly

* [WAYANG-8][API-PYTHON] Receiving Base64 passing to byte array and unpickling

* [WAYANG-8][API-PYTHON] Updated classes to process a single Serialized UDF

* [WAYANG-8][API-PYTHON] New test with single UDF

* [WAYANG-8][API-PYTHON] Protobuf command

* [WAYANG-8][API-PYTHON] Protobuf message template updated

* [WAYANG-8][API-PYTHON] POM fixes

* [WAYANG-8][API-PYTHON] License comments added

* [WAYANG-8][API-PYTHON] Correction on missing licenses

* [WAYANG-8][API-PYTHON] Serializable module creation

* [WAYANG-8][API-PYTHON] adding protoc to travis

* [WAYANG-8][API-PYTHON] protoc executable path correction

* [WAYANG-8][API-PYTHON] Commenting objc_class_prefix

* [WAYANG-8][API-PYTHON] Obtaining pipelines

* [WAYANG-8][API-PYTHON] Dataquanta writing message

* [WAYANG-8][API-PYTHON] Plan writer pipeline based adjustments

* [WAYANG-8][API-PYTHON] Operator Python executable indicator

* [WAYANG-8][API-PYTHON] Plan writer improved to use less sockets

* [WAYANG-8][API-PYTHON] New version of Wayang protobuf message

* [WAYANG-8][API-PYTHON] Wayang REST improved to allow multi pipelined executions

* [WAYANG-8][API-PYTHON] More test programs

* [WAYANG-8][API-PYTHON] Comments and logging for Graph module

* [WAYANG-8][API-PYTHON] Comments and logging for Orchestrator module

* [WAYANG-8][API-PYTHON] Comments and logging for Protobuf module

* [WAYANG-8][API-PYTHON] Fix usage of relative paths

* [WAYANG-8][API-PYTHON] Scripts to compile protobuf have been deleted; Maven now runs the compilation

* [WAYANG-8][API-PYTHON] Execution Log configuration

* [WAYANG-8][API-PYTHON] Fix - Python Map partition with single operator

* [WAYANG-8][API-PYTHON] Unit tests preparing the Wayang Plan

* [WAYANG-8][API-PYTHON] Plugin selection through Plan Descriptor

* [WAYANG-8][API-PYTHON] Unit tests preparing the Wayang Plan with Spark execution

* [WAYANG-8][API-PYTHON] Pywayang sends protobuf message in API request as bytes using base64

* [WAYANG-8][API-PYTHON] New operators flatmap, group_by, reduce and reduce_by_key (Python side only)

* [WAYANG-8][API-PYTHON] Protobuf Wayang Plan message updated to allow more Complex Java-Python Operators

* [WAYANG-8][API-PYTHON] Adding TPC-H 1st Test

* [WAYANG-8][API-PYTHON] Last changes, not working

* [WAYANG-8] Fixing errors with dependencies

* [WAYANG-8] Fix to Pom versions problem

* [WAYANG-8] Protoc path updated

* [WAYANG-8] Correction in the pom.xml for flags
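
For reviewers, the visitor-based traversal added under pywayang/graph can be pictured roughly as follows. This is a minimal, self-contained sketch and not the committed code: `Node` and `Visitant` are simplified stand-ins for the classes in this change, while `link` and `collect_path` are hypothetical helpers introduced only for illustration. The UDF receives the current node, the value returned by the previous step, and a shared collection, as in the committed `visit_node`.

```python
# Simplified stand-ins for graph/node.py and graph/visitant.py (illustrative only).
class Node:
    def __init__(self, node_id, operator_type):
        self.id = node_id
        self.operator_type = operator_type
        self.predecessors = {}  # parent id -> edge weight
        self.successors = {}    # child id -> edge weight

    def accept(self, visitor, udf, orientation, last_iter):
        # Double dispatch: the node hands itself to the visitor.
        visitor.visit_node(self, udf, orientation, last_iter)


class Visitant:
    def __init__(self, graph, results):
        self.graph = graph          # dict: node id -> Node
        self.collection = results   # shared store the UDF may append to

    def visit_node(self, node, udf, orientation, last_iter):
        current_value = udf(node, last_iter, self.collection)
        # orientation is "successors" or "predecessors"
        for next_id in getattr(node, orientation):
            self.graph[next_id].accept(self, udf, orientation, current_value)


def link(graph, child_id, parent_id):
    # Hypothetical helper: record the edge on both endpoints.
    graph[child_id].predecessors[parent_id] = 1
    graph[parent_id].successors[child_id] = 1


def collect_path(node, path_so_far, collection):
    # Hypothetical UDF: extend the path and store it once a leaf is reached.
    path = (path_so_far or []) + [node.operator_type]
    if not node.successors:
        collection.append(path)
    return path


graph = {i: Node(i, t) for i, t in enumerate(["source", "filter", "map", "sink"])}
for child, parent in [(1, 0), (2, 1), (3, 2)]:
    link(graph, child, parent)

visitant = Visitant(graph, [])
visitant.visit_node(graph[0], collect_path, "successors", None)
print(visitant.collection)  # [['source', 'filter', 'map', 'sink']]
```

Starting from a source walks "successors"; starting from a sink would walk "predecessors" with the same visitor and UDF.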

Signed-off-by: bertty <bertty@apache.org>

Co-authored-by: berttty <bertty@scalytics.io>
Co-authored-by: Bertty Contreras-Rojas <bertty@databloom.ai>
diff --git a/.travis.yml b/.travis.yml
index efb62ac..d00414c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -48,6 +48,7 @@
   - echo '  </server>' >> ~/.m2/settings.xml
   - echo ' </servers>' >> ~/.m2/settings.xml
   - echo '</settings>' >> ~/.m2/settings.xml
+  - sudo apt-get install -y protobuf-compiler
   - mkdir -p travis/tmp
   - echo "#" >> travis/wayang.properties
   - echo "# Licensed to the Apache Software Foundation (ASF) under one or more" >> travis/wayang.properties
diff --git a/pom.xml b/pom.xml
index 3ff48f9..2f7fadd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1248,6 +1248,8 @@
                         <exclude>**/README.md</exclude>
                         <exclude>**/general-todos.md</exclude>
                         <exclude>**/scala_1*</exclude>
+
+                        <exclude>**/*pb2.py</exclude>
                     </excludes>
                 </configuration>
             </plugin>
@@ -1278,7 +1280,7 @@
                     </includedLicenses>
                     <failOnBlacklist>true</failOnBlacklist>
                     <excludedGroups>
-                        org.apache.spark.*|org.apache.hadoop.*|org.apache.giraph.*|org.antlr.*|junit.*|org.graphchi.*
+                        org.apache.spark.*|org.apache.hadoop.*|org.apache.giraph.*|org.antlr.*|junit.*|org.graphchi.*|org.springframework.*
                     </excludedGroups>
                     <excludeTransitiveDependencies>true</excludeTransitiveDependencies>
                 </configuration>
diff --git a/pywayang/config/__init__.py b/pywayang/config/__init__.py
new file mode 100644
index 0000000..008475c
--- /dev/null
+++ b/pywayang/config/__init__.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from config.config_reader import get_source_types
+from config.config_reader import get_sink_types
+from config.config_reader import get_boundary_types
diff --git a/pywayang/config/config_reader.py b/pywayang/config/config_reader.py
new file mode 100644
index 0000000..c8f5873
--- /dev/null
+++ b/pywayang/config/config_reader.py
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import configparser
+import os
+
+
+def get_boundary_types():
+    config = configparser.ConfigParser()
+    config.sections()
+    config.read('../config/pywayang_config.ini')
+    boundary_types = dict(config.items('BOUNDARY_TYPES'))
+    boundary_types.pop("variable_to_access")
+    return boundary_types.values()
+
+
+def get_source_types():
+    config = configparser.ConfigParser()
+    #print("path: ", os.getcwd())
+    config.read("../config/pywayang_config.ini")
+    source_types = dict(config.items('SOURCE_TYPES'))
+    source_types.pop("variable_to_access")
+    return source_types.values()
+    #sections_list = config.sections()
+    #for section in sections_list:
+    #    print(section)
+    #print("source_types")
+    #for x in source_types.values():
+    #    print(x)
+
+def get_sink_types():
+    config = configparser.ConfigParser()
+    #print("path: ", os.getcwd())
+    config.read("../config/pywayang_config.ini")
+    sink_types = dict(config.items('SINK_TYPES'))
+    sink_types.pop("variable_to_access")
+    return sink_types.values()
\ No newline at end of file
diff --git a/pywayang/config/pywayang_config.ini b/pywayang/config/pywayang_config.ini
new file mode 100644
index 0000000..78cc2b4
--- /dev/null
+++ b/pywayang/config/pywayang_config.ini
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+[DEFAULT]
+variable_to_access = value
+
+[INPUT]
+txnname_mod = string1
+txnmemo_mod = string2
+
+[MODIFY]
+txnname_mod = string3
+txnmemo_mod = string4
+
+[BOUNDARY_TYPES]
+boundary_type_1 = union
+
+[SOURCE_TYPES]
+source_type_1 = source
+source_type_2 = text
+
+[SINK_TYPES]
+sink_type_1 = sink
+sink_type_2 = sonk
\ No newline at end of file
diff --git a/pywayang/graph/__init__.py b/pywayang/graph/__init__.py
new file mode 100644
index 0000000..17e2deb
--- /dev/null
+++ b/pywayang/graph/__init__.py
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import graph.graph
+import graph.node
\ No newline at end of file
diff --git a/pywayang/graph/graph.py b/pywayang/graph/graph.py
new file mode 100644
index 0000000..be7a32f
--- /dev/null
+++ b/pywayang/graph/graph.py
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from graph.node import Node
+import logging
+
+
+# Adjacency list used to analyze the plan
+class Graph:
+    def __init__(self):
+        self.graph = {}
+        self.nodes_no = 0
+        self.nodes = []
+
+    # Fills the Graph
+    def populate(self, sinks):
+        for sink in iter(sinks):
+            self.process_operator(sink)
+
+    # Add current operator and set dependencies
+    def process_operator(self, operator):
+        self.add_node(operator.operator_type, operator.id, operator)
+
+        if len(operator.previous) > 0:
+            for parent in operator.previous:
+                if parent:
+                    self.add_node(parent.operator_type, parent.id, parent)
+                    self.add_link(operator.id, parent.id, 1)
+                    self.process_operator(parent)
+
+    def add_node(self, name, id, operator):
+        if id in self.nodes:
+            return
+
+        self.nodes_no += 1
+        self.nodes.append(id)
+        new_node = Node(name, id, operator)
+
+        self.graph[id] = new_node
+
+    def add_link(self, id_child, id_parent, e):
+        if id_child in self.nodes:
+            if id_parent in self.nodes:
+                self.graph[id_child].add_predecessor(id_parent, e)
+                self.graph[id_parent].add_successor(id_child, e)
+
+    def print_adjlist(self):
+
+        for key in self.graph:
+            logging.debug("Node: %s - %s", self.graph[key].operator_type, key)
+            for key2 in self.graph[key].predecessors:
+                logging.debug("- Parent: %s - %s - %s", self.graph[key2].operator_type, self.graph[key].predecessors[key2], key2)
+            for key2 in self.graph[key].successors:
+                logging.debug("- Child: %s - %s - %s", self.graph[key2].operator_type, self.graph[key].successors[key2], key2)
+
+    def get_node(self, id):
+        return self.graph[id]
diff --git a/pywayang/graph/node.py b/pywayang/graph/node.py
new file mode 100644
index 0000000..d0d696f
--- /dev/null
+++ b/pywayang/graph/node.py
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import abc
+
+
+class Element(metaclass=abc.ABCMeta):
+    @abc.abstractmethod
+    def accept(self, visitor, udf, orientation, last_iter):
+        pass
+
+
+# Describes an Operator in the Graph
+class Node(Element):
+    def __init__(self, operator_type, id, operator):
+        self.operator_type = operator_type
+        self.id = id
+        self.predecessors = {}
+        self.successors = {}
+        self.python_exec = operator.python_exec
+
+        # Temporal
+        self.operator = operator
+
+    def add_predecessor(self, id_parent, e):
+        self.predecessors[id_parent] = e
+
+    def add_successor(self, id_child, e):
+        self.successors[id_child] = e
+
+    # Nodes are visited by objects of class Visitant.
+    # Visitants are being used to execute a UDF through the Graph
+    def accept(self, visitor, udf, orientation, last_iter):
+        visitor.visit_node(self, udf, orientation, last_iter)
diff --git a/pywayang/graph/traversal.py b/pywayang/graph/traversal.py
new file mode 100644
index 0000000..e2dd851
--- /dev/null
+++ b/pywayang/graph/traversal.py
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from graph.visitant import Visitant
+import logging
+
+
+# Defines how a UDF will be applied over the Graph
+class Traversal:
+
+    def __init__(self, graph, origin, udf):
+        self.graph = graph
+        self.origin = origin
+        self.udf = udf
+        self.app = Visitant(graph, [])
+
+        # Starting from Sinks or Sources sets a specific orientation
+        if origin[0].source:
+            self.orientation = "successors"
+        elif origin[0].sink:
+            self.orientation = "predecessors"
+        else:
+            logging.error("Origin point to traverse the plan wrongly defined")
+            return
+
+        for operator in iter(origin):
+            logging.debug("operator origin: " + str(operator.id))
+            node = graph.get_node(operator.id)
+            self.app.visit_node(
+                node=node,
+                udf=self.udf,
+                orientation=self.orientation,
+                last_iter=None
+            )
+
+    def get_collected_data(self):
+        return self.app.get_collection()
diff --git a/pywayang/graph/visitant.py b/pywayang/graph/visitant.py
new file mode 100644
index 0000000..3d2f874
--- /dev/null
+++ b/pywayang/graph/visitant.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import abc
+import logging
+
+
+class Visitor(metaclass=abc.ABCMeta):
+    @abc.abstractmethod
+    def visit_node(self, node, udf, orientation, last_iter):
+        pass
+
+
+# Applies a UDF in current Node
+class Visitant(Visitor):
+
+    def __init__(self, graph, results):
+        self.collection = results
+        self.graph = graph
+
+    # The UDF can store results in self.collection whenever it needs to.
+    # last_iter holds the value produced by the previous iteration
+    def visit_node(self, node, udf, orientation, last_iter):
+        logging.debug("Applying UDF, orientation: " + str(orientation))
+        current_value = udf(node, last_iter, self.collection)
+        logging.debug("orientation result " + str(getattr(node, orientation)))
+        next_iter = getattr(node, orientation)
+        if len(next_iter) > 0:
+            for next_iter_id in next_iter:
+                if next_iter_id:
+                    logging.debug("next_id: " + str(next_iter_id))
+                    next_iter_node = self.graph.get_node(next_iter_id)
+                    logging.debug("next_iter_node: " + next_iter_node.operator_type + " " + str(next_iter_node.id))
+                    next_iter_node.accept(visitor=self, udf=udf, orientation=orientation, last_iter=current_value)
+        pass
+
+    def get_collection(self):
+        return self.collection
diff --git a/pywayang/orchestrator/__init__.py b/pywayang/orchestrator/__init__.py
new file mode 100644
index 0000000..ed7d0ac
--- /dev/null
+++ b/pywayang/orchestrator/__init__.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import orchestrator.plan
+import orchestrator.dataquanta
+import graph.graph
diff --git a/pywayang/orchestrator/dataquanta.py b/pywayang/orchestrator/dataquanta.py
new file mode 100644
index 0000000..7d700eb
--- /dev/null
+++ b/pywayang/orchestrator/dataquanta.py
@@ -0,0 +1,330 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from orchestrator.operator import Operator
+from graph.graph import Graph
+from graph.traversal import Traversal
+from protobuf.planwriter import MessageWriter
+import itertools
+import collections
+import logging
+from functools import reduce
+import operator
+
+
+# Wraps a Source operation to create an iterable
+class DataQuantaBuilder:
+    def __init__(self, descriptor):
+        self.descriptor = descriptor
+
+    def source(self, source):
+
+        if type(source) is str:
+            source_ori = open(source, "r")
+        else:
+            source_ori = source
+        return DataQuanta(
+            Operator(
+                operator_type="source",
+                udf=source,
+                iterator=iter(source_ori),
+                previous=[],
+                python_exec=False
+            ),
+            descriptor=self.descriptor
+        )
+
+
+# Wraps an operation over an iterable
+class DataQuanta:
+    def __init__(self, operator=None, descriptor=None):
+        self.operator = operator
+        self.descriptor = descriptor
+        if self.operator.is_source():
+            self.descriptor.add_source(self.operator)
+        if self.operator.is_sink():
+            self.descriptor.add_sink(self.operator)
+
+    # Operational Functions
+    def filter(self, udf):
+        def func(iterator):
+            return filter(udf, iterator)
+
+        return DataQuanta(
+            Operator(
+                operator_type="filter",
+                udf=func,
+                previous=[self.operator],
+                python_exec=True
+            ),
+            descriptor=self.descriptor
+        )
+
+    def flatmap(self, udf):
+
+        def auxfunc(iterator):
+            return itertools.chain.from_iterable(map(udf, iterator))
+
+        def func(iterator):
+            mapped = map(udf, iterator)
+            flattened = flatten_single_dim(mapped)
+            yield from flattened
+
+        def flatten_single_dim(mapped):
+            for item in mapped:
+                for subitem in item:
+                    yield subitem
+
+        return DataQuanta(
+            Operator(
+                operator_type="flatmap",
+                udf=func,
+                previous=[self.operator],
+                python_exec=True
+            ),
+            descriptor=self.descriptor
+        )
+
+    def group_by(self, udf):
+        def func(iterator):
+            # TODO key should be given by "udf"
+            return itertools.groupby(iterator, key=operator.itemgetter(0))
+            # return itertools.groupby(sorted(iterator), key=operator.itemgetter(0))
+
+        return DataQuanta(
+            Operator(
+                operator_type="group_by",
+                udf=func,
+                previous=[self.operator],
+                python_exec=True
+            ),
+            descriptor=self.descriptor
+        )
+
+    def map(self, udf):
+        def func(iterator):
+            return map(udf, iterator)
+
+        return DataQuanta(
+            Operator(
+                operator_type="map",
+                udf=func,
+                previous=[self.operator],
+                python_exec=True
+            ),
+            descriptor=self.descriptor
+        )
+
+    # Key specifies pivot dimensions
+    # UDF specifies reducer function
+    def reduce_by_key(self, keys, udf):
+
+        op = Operator(
+            operator_type="reduce_by_key",
+            udf=udf,
+            previous=[self.operator],
+            python_exec=False
+        )
+
+        logging.debug("%s %s", len(keys), keys)
+        for i in range(0, len(keys)):
+            """if keys[i] is int:
+                op.set_parameter("vector_position|"+str(i), keys[i])
+            else:
+                op.set_parameter("dimension_key|"+str(i), keys[i])"""
+
+            # TODO maybe would be better just leave the number as key
+            op.set_parameter("dimension|"+str(i+1), keys[i])
+
+        return DataQuanta(
+            op,
+            descriptor=self.descriptor
+        )
+
+    def reduce(self, udf):
+        def func(iterator):
+            return reduce(udf, iterator)
+
+        return DataQuanta(
+            Operator(
+                operator_type="reduce",
+                udf=func,
+                previous=[self.operator],
+                python_exec=True
+            ),
+            descriptor=self.descriptor
+        )
+
+    def sink(self, path, end="\n"):
+        def consume(iterator):
+            with open(path, 'w') as f:
+                for x in iterator:
+                    f.write(str(x) + end)
+
+        def func(iterator):
+            consume(iterator)
+            # return self.__run(consume)
+
+        return DataQuanta(
+            Operator(
+                operator_type="sink",
+
+                udf=path,
+                # To execute directly uncomment
+                # udf=func,
+
+                previous=[self.operator],
+                python_exec=False
+            ),
+            descriptor=self.descriptor
+        )
+
+    def sort(self, udf):
+
+        def func(iterator):
+            return sorted(iterator, key=udf)
+
+        return DataQuanta(
+            Operator(
+                operator_type="sort",
+                udf=func,
+                previous=[self.operator],
+                python_exec=True
+            ),
+            descriptor=self.descriptor
+        )
+
+    # This function allows the union to be performed by Python
+    # Nevertheless, the current configuration runs it in Java
+    def union(self, other):
+
+        def func(iterator):
+            return itertools.chain(iterator, other.operator.getIterator())
+
+        return DataQuanta(
+            Operator(
+                operator_type="union",
+                udf=func,
+                previous=[self.operator, other.operator],
+                python_exec=False
+            ),
+            descriptor=self.descriptor
+        )
+
+    def __run(self, consumer):
+        consumer(self.operator.getIterator())
+
+    # Execution Functions
+    def console(self, end="\n"):
+        def consume(iterator):
+            for x in iterator:
+                print(x, end=end)
+
+        self.__run(consume)
+
+    # Only for debugging purposes!
+    # To execute the plan directly in the program driver
+    def execute(self):
+        logging.warning("DEBUG Execution")
+        logging.info("Reminder to swap SINK UDF value from path to func")
+        logging.debug(self.operator.previous[0].operator_type)
+        if self.operator.is_sink():
+            logging.debug(self.operator.operator_type)
+            logging.debug(self.operator.udf)
+            logging.debug(len(self.operator.previous))
+            self.operator.udf(self.operator.previous[0].getIterator())
+        else:
+            logging.error("Plan must call execute from SINK type of operator")
+            raise RuntimeError
+
+    # Converts Python Functional Plan to valid Wayang Plan
+    def to_wayang_plan(self):
+
+        sinks = self.descriptor.get_sinks()
+        if len(sinks) == 0:
+            return
+
+        graph = Graph()
+        graph.populate(self.descriptor.get_sinks())
+
+        # Uncomment to check the Graph built
+        # graph.print_adjlist()
+
+        # Function to be consumed by Traverse
+        # Separates Python Plan into a List of Pipelines
+        def define_pipelines(node1, current_pipeline, collection):
+            def store_unique(pipe_to_insert):
+                for pipe in collection:
+                    if equivalent_lists(pipe, pipe_to_insert):
+                        return
+                collection.append(pipe_to_insert)
+
+            def equivalent_lists(l1, l2):
+                if collections.Counter(l1) == collections.Counter(l2):
+                    return True
+                else:
+                    return False
+
+            if not current_pipeline:
+                current_pipeline = [node1]
+
+            elif node1.operator.is_boundary():
+                store_unique(current_pipeline.copy())
+                current_pipeline.clear()
+                current_pipeline.append(node1)
+
+            else:
+                current_pipeline.append(node1)
+
+            if node1.operator.sink:
+                store_unique(current_pipeline.copy())
+                current_pipeline.clear()
+
+            return current_pipeline
+
+        # Works over the graph
+        trans = Traversal(
+            graph=graph,
+            origin=self.descriptor.get_sources(),
+            # udf=lambda x, y, z: d(x, y, z)
+            # UDF always will receive:
+            # x: a Node object,
+            # y: an object representing the result of the last iteration,
+            # z: a collection to store final results inside your UDF
+            udf=lambda x, y, z: define_pipelines(x, y, z)
+        )
+
+        # Gets the results of the traverse process
+        collected_stages = trans.get_collected_data()
+
+        # Passing the Stages to a Wayang message writer
+        writer = MessageWriter()
+        a = 0
+        # Stage is composed of class Node objects
+        for stage in collected_stages:
+            a += 1
+            logging.info("///")
+            logging.info("stage" + str(a))
+            writer.process_pipeline(stage)
+
+        writer.set_dependencies()
+
+        # Uses a file to provide the plan
+        # writer.write_message(self.descriptor)
+
+        # Send the plan to Wayang REST api directly
+        writer.send_message(self.descriptor)
diff --git a/pywayang/orchestrator/execdirectly.py b/pywayang/orchestrator/execdirectly.py
new file mode 100644
index 0000000..452ccab
--- /dev/null
+++ b/pywayang/orchestrator/execdirectly.py
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from orchestrator.plan import Descriptor
+from orchestrator.dataquanta import DataQuantaBuilder
+import datetime
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_sort(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+    sink_dataquanta = \
+        plan.source("../test/words.txt") \
+            .sort(lambda elem: elem.lower()) \
+            .sink("../test/output.txt", end="")
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_sort_filter(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+    sink_dataquanta = \
+        plan.source("../test/words.txt") \
+            .sort(lambda elem: elem.lower()) \
+            .filter(lambda elem: str(elem).startswith("f")) \
+            .sink("../test/output.txt", end="")
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_filter_text(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+
+    sink_dataquanta = \
+        plan.source("../test/words.txt") \
+            .filter(lambda elem: str(elem).startswith("f")) \
+            .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_filter(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+
+    sink_dataquanta = \
+        plan.source("../test/numbers.txt") \
+            .filter(lambda elem: int(elem) % 2 != 0) \
+            .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_basic(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+
+    sink_dataquanta = \
+        plan.source("../test/lines.txt") \
+            .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_junction(descriptor):
+
+    plan = DataQuantaBuilder(descriptor)
+
+    dq_source_a = plan.source("../test/lines.txt")
+    dq_source_b = plan.source("../test/morelines.txt") \
+        .filter(lambda elem: str(elem).startswith("I"))
+    dq_source_c = plan.source("../test/lastlines.txt") \
+        .filter(lambda elem: str(elem).startswith("W"))
+
+    sink_dataquanta = dq_source_a.union(dq_source_b) \
+        .union(dq_source_c) \
+        .sort(lambda elem: elem.lower()) \
+        .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+def plan_java_junction(descriptor):
+
+    plan = DataQuantaBuilder(descriptor)
+
+    dq_source_a = plan.source("../test/lines.txt")
+    dq_source_b = plan.source("../test/morelines.txt")
+    sink_dataquanta = dq_source_a.union(dq_source_b) \
+        .filter(lambda elem: str(elem).startswith("I")) \
+        .sort(lambda elem: elem.lower()) \
+        .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+def plan_tpch_q1(descriptor):
+
+    # TODO create reduce by
+    plan = DataQuantaBuilder(descriptor)
+
+    def reducer(obj1, obj2):
+        return obj1[0]
+
+    sink = plan.source("../test/lineitem.txt") \
+        .map(lambda elem: elem.split("|")) \
+        .filter(lambda elem: datetime.datetime.strptime(elem[10], '%Y-%m-%d') <= datetime.datetime.strptime("1998-09-02", '%Y-%m-%d')) \
+        .map(lambda elem:
+           [elem[8], elem[9], elem[4], elem[5],
+            float(elem[5]) * (1 - float(elem[6])),
+            float(elem[5]) * (1 - float(elem[6])) * (1 + float(elem[7])),
+            elem[4], elem[5],
+            elem[6], 1]) \
+        .sink("../test/output.txt", end="")
+        # .group_by(lambda elem: elem) \
+        # .reduce_by(reducer) \
+        # .flatmap(lambda elem: elem.split("|"))
+        # .map(lambda elem: (elem, elem.split("|"))) \
+        # L_RETURNFLAG 8
+        # L_LINESTATUS 9
+        # L_QUANTITY 4
+        # L_EXTENDEDPRICE 5
+        # discount 6
+        # tax 7
+
+    return sink
+
+
+def plan_full_java(descriptor):
+
+    plan = DataQuantaBuilder(descriptor)
+
+    dq_source_a = plan.source("../test/lines.txt")
+    dq_source_b = plan.source("../test/morelines.txt")
+    sink_dataquanta = dq_source_a.union(dq_source_b) \
+        .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+if __name__ == '__main__':
+
+    # Descriptor will contain general info about the Wayang Plan created here
+    descriptor = Descriptor()
+
+    plan_dataquanta_sink = plan_tpch_q1(descriptor)
+    plan_dataquanta_sink.execute()
diff --git a/pywayang/orchestrator/main.py b/pywayang/orchestrator/main.py
new file mode 100644
index 0000000..b634eeb
--- /dev/null
+++ b/pywayang/orchestrator/main.py
@@ -0,0 +1,173 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from orchestrator.plan import Descriptor
+from orchestrator.dataquanta import DataQuantaBuilder
+import datetime
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_sort(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+    sink_dataquanta = \
+        plan.source("../test/words.txt") \
+            .sort(lambda elem: elem.lower()) \
+            .sink("../test/output.txt", end="")
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_sort_filter(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+    sink_dataquanta = \
+        plan.source("../test/words.txt") \
+            .sort(lambda elem: elem.lower()) \
+            .filter(lambda elem: str(elem).startswith("f")) \
+            .sink("../test/output.txt", end="")
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_filter_text(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+
+    sink_dataquanta = \
+        plan.source("../test/words.txt") \
+            .filter(lambda elem: str(elem).startswith("f")) \
+            .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_filter(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+
+    sink_dataquanta = \
+        plan.source("../test/numbers.txt") \
+            .filter(lambda elem: int(elem) % 2 != 0) \
+            .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_basic(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+
+    sink_dataquanta = \
+        plan.source("../test/lines.txt") \
+            .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_junction(descriptor):
+
+    plan = DataQuantaBuilder(descriptor)
+
+    dq_source_a = plan.source("../test/lines.txt")
+    dq_source_b = plan.source("../test/morelines.txt") \
+        .filter(lambda elem: str(elem).startswith("I"))
+    dq_source_c = plan.source("../test/lastlines.txt") \
+        .filter(lambda elem: str(elem).startswith("W"))
+
+    sink_dataquanta = dq_source_a.union(dq_source_b) \
+        .union(dq_source_c) \
+        .sort(lambda elem: elem.lower()) \
+        .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+def plan_java_junction(descriptor):
+
+    plan = DataQuantaBuilder(descriptor)
+
+    dq_source_a = plan.source("../test/lines.txt")
+    dq_source_b = plan.source("../test/morelines.txt")
+    sink_dataquanta = dq_source_a.union(dq_source_b) \
+        .filter(lambda elem: str(elem).startswith("I")) \
+        .sort(lambda elem: elem.lower()) \
+        .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+def plan_tpch_q1(descriptor):
+
+    # TODO create reduce by
+    plan = DataQuantaBuilder(descriptor)
+
+    def reducer(obj1, obj2):
+        return obj1[0], obj1[1], obj1[2] + obj2[2], obj1[3] + obj2[3], obj1[4] + obj2[4], obj1[5] + obj2[5], \
+               obj1[6] + obj2[6], obj1[7] + obj2[7], obj1[8] + obj2[8], obj1[9] + obj2[9]
+
+    sink = plan.source("../test/lineitem.txt") \
+        .map(lambda elem: elem.split("|")) \
+        .sink("../test/output.txt", end="")
+    """
+        .filter(lambda elem: datetime.datetime.strptime(elem[10], '%Y-%m-%d') <= datetime.datetime.strptime('1998-09-02', '%Y-%m-%d')) \
+        .map(lambda elem:
+             [elem[8], elem[9], elem[4], elem[5],
+              float(elem[5]) * (1 - float(elem[6])),
+              float(elem[5]) * (1 - float(elem[6])) * (1 + float(elem[7])),
+              elem[4], elem[5],
+              elem[6], 1]) \
+        .sink("../test/output.txt", end="")"""
+        # .reduce_by_key([0, 1], reducer) \
+
+
+    return sink
+
+
+def plan_full_java(descriptor):
+
+    plan = DataQuantaBuilder(descriptor)
+
+    dq_source_a = plan.source("../test/lines.txt")
+    dq_source_b = plan.source("../test/morelines.txt")
+    sink_dataquanta = dq_source_a.union(dq_source_b) \
+        .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+def plan_wordcount(descriptor):
+
+    plan = DataQuantaBuilder(descriptor)
+    sink_wordcount = plan.source("../test/lineitem.txt") \
+        .filter(lambda elem: len(str(elem).split("|")[0]) < 4) \
+        .flatmap(lambda elem: str(elem).split("|")) \
+        .sink("../test/output.txt", end="")
+
+    return sink_wordcount
+
+
+if __name__ == '__main__':
+
+    # Descriptor will contain general info about the Wayang Plan created here
+    descriptor = Descriptor()
+    descriptor.add_plugin(Descriptor.Plugin.spark)
+    descriptor.add_plugin(Descriptor.Plugin.java)
+
+    plan_dataquanta_sink = plan_wordcount(descriptor)
+    # plan_dataquanta_sink.execute()
+    # plan_dataquanta_sink.console()
+
+    plan_dataquanta_sink.to_wayang_plan()
diff --git a/pywayang/orchestrator/operator.py b/pywayang/orchestrator/operator.py
new file mode 100644
index 0000000..ecaa6bd
--- /dev/null
+++ b/pywayang/orchestrator/operator.py
@@ -0,0 +1,121 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pickle
+import cloudpickle
+from config.config_reader import get_source_types
+from config.config_reader import get_sink_types
+from config.config_reader import get_boundary_types
+import logging
+
+pickle_protocol = pickle.HIGHEST_PROTOCOL
+
+
+# Describes an Operation over an intermediate result
+# Each operation could be processed by Python or Java platforms
+class Operator:
+
+    def __init__(
+            self, operator_type=None, udf=None, previous=None,
+            iterator=None, python_exec=False
+    ):
+
+        # Operator ID
+        self.id = id(self)
+
+        # Operator Type
+        self.operator_type = operator_type
+
+        # Set Boundaries
+        if self.operator_type in get_boundary_types():
+            self.boundary = True
+        else:
+            self.boundary = False
+
+        # UDF Function
+        self.udf = udf
+
+        # Source types must come with an Iterator
+        self.iterator = iterator
+        if operator_type in get_source_types():
+            if iterator is None:
+                raise ValueError(
+                    "Source Operator Type without an Iterator")
+            else:
+                self.source = True
+        else:
+            self.source = False
+
+        # Sink Operators
+        if operator_type in get_sink_types():
+            self.sink = True
+        else:
+            self.sink = False
+
+        # TODO Why are previous and predecessors managed separately?
+        self.previous = previous
+
+        self.successor = []
+        self.predecessor = []
+
+        self.parameters = {}
+
+        # Set predecessors and successors from previous
+        if self.previous:
+            for prev in self.previous:
+                if prev is not None:
+                    prev.set_successor(self)
+                    self.set_predecessor(prev)
+
+        self.python_exec = python_exec
+
+        logging.info("Operator:" + str(self.getID()) + ", type:" + str(self.operator_type) + ", PythonExecutable: " +
+                     str(self.python_exec) +
+                     ", is boundary: " + str(self.is_boundary()) + ", is source: " +
+                     str(self.source) + ", is sink: " + str(self.sink))
+
+    def getID(self):
+        return self.id
+
+    def is_source(self):
+        return self.source
+
+    def is_sink(self):
+        return self.sink
+
+    def is_boundary(self):
+        return self.boundary
+
+    def serialize_udf(self):
+        self.udf = cloudpickle.dumps(self.udf)
+
+    def getIterator(self):
+        if self.is_source():
+            return self.iterator
+        # TODO this should iterate through previous REDESIGN
+        return self.udf(self.previous[0].getIterator())
+
+    def set_parameter(self, key, value):
+        self.parameters[key] = value
+
+    def set_successor(self, suc):
+        if (not self.is_sink()) and self.successor.count(suc) == 0:
+            self.successor.append(suc)
+
+    def set_predecessor(self, pre):
+        if self.predecessor.count(pre) == 0:
+            self.predecessor.append(pre)
diff --git a/pywayang/orchestrator/plan.py b/pywayang/orchestrator/plan.py
new file mode 100644
index 0000000..25610cc
--- /dev/null
+++ b/pywayang/orchestrator/plan.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+from enum import Enum
+
+class Descriptor:
+
+    def __init__(self):
+        self.sinks = []
+        self.sources = []
+        self.boundary_operators = None
+        logging.basicConfig(filename='../config/execution.log', level=logging.DEBUG)
+        self.plugins = []
+
+    class Plugin(Enum):
+        java = 0
+        spark = 1
+
+    def get_boundary_operators(self):
+        return self.boundary_operators
+
+    def add_source(self, operator):
+        self.sources.append(operator)
+
+    def get_sources(self):
+        return self.sources
+
+    def add_sink(self, operator):
+        self.sinks.append(operator)
+
+    def get_sinks(self):
+        return self.sinks
+
+    def add_plugin(self, plugin):
+        self.plugins.append(plugin)
+
+    def get_plugins(self):
+        return self.plugins
diff --git a/pywayang/protobuf/__init__.py b/pywayang/protobuf/__init__.py
new file mode 100644
index 0000000..15a80ad
--- /dev/null
+++ b/pywayang/protobuf/__init__.py
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import protobuf.pywayangplan_pb2
diff --git a/pywayang/protobuf/old_planwriter.py b/pywayang/protobuf/old_planwriter.py
new file mode 100644
index 0000000..e8700f0
--- /dev/null
+++ b/pywayang/protobuf/old_planwriter.py
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import protobuf.pywayangplan_pb2 as pwb
+import os
+import pickle
+import struct
+import base64
+
+
+class OldMessageWriter:
+
+    def __init__(self, descriptor):
+
+        sink = descriptor.get_sinks()[0]
+        source = descriptor.get_sources()[0]
+
+        op = source
+        visited = []
+        middle_operators = []
+        while op.sink is not True and len(op.successor) > 0:
+            pre = op.successor[0]
+            if pre not in visited and pre.sink is not True:
+                pre.serialize_udf()
+                middle_operators.append(pre)
+                """base64_bytes = base64.b64encode(pre.udf)
+                pre.udf = base64_bytes"""
+
+                """pre.serialize_udf()
+                print("pre.udf")
+                print(pre.udf)
+                func = pickle.loads(pre.udf)
+                print("func")
+                print(func)
+                middle_operators.append(pre)
+
+                # Testing
+                msg = pre.udf
+                base64_bytes = base64.b64encode(msg)
+                base64_message = base64.b64decode(base64_bytes)
+                func2 = pickle.loads(base64_message)
+                print(base64_message)
+                func3 = pickle.loads(b'\x80\x04\x955\x04\x00\x00\x00\x00\x00\x00\x8c\x17cloudpickle.cloudpickle\x94\x8c\r_builtin_type\x94\x93\x94\x8c\nLambdaType\x94\x85\x94R\x94(h\x02\x8c\x08CodeType\x94\x85\x94R\x94(K\x01K\x00K\x01K\x03K\x13C\nt\x00\x88\x00|\x00\x83\x02S\x00\x94N\x85\x94\x8c\x06filter\x94\x85\x94\x8c\x08iterator\x94\x85\x94\x8cS/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/orchestrator/dataquanta.py\x94\x8c\x04func\x94K%C\x02\x00\x01\x94\x8c\x03udf\x94\x85\x94)t\x94R\x94}\x94(\x8c\x0b__package__\x94\x8c\x0corchestrator\x94\x8c\x08__name__\x94\x8c\x17orchestrator.dataquanta\x94\x8c\x08__file__\x94\x8cS/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/orchestrator/dataquanta.py\x94uNNh\x00\x8c\x10_make_empty_cell\x94\x93\x94)R\x94\x85\x94t\x94R\x94\x8c\x1ccloudpickle.cloudpickle_fast\x94\x8c\x12_function_setstate\x94\x93\x94h"}\x94}\x94(h\x19h\x10\x8c\x0c__qualname__\x94\x8c\x1fDataQuanta.filter.<locals>.func\x94\x8c\x0f__annotations__\x94}\x94\x8c\x0e__kwdefaults__\x94N\x8c\x0c__defaults__\x94N\x8c\n__module__\x94h\x1a\x8c\x07__doc__\x94N\x8c\x0b__closure__\x94h\x00\x8c\n_make_cell\x94\x93\x94h\x05(h\x08(K\x01K\x00K\x01K\x02KSC\x10t\x00|\x00\x83\x01d\x01\x16\x00d\x02k\x03S\x00\x94NK\x02K\x00\x87\x94\x8c\x03int\x94\x85\x94\x8c\x04elem\x94\x85\x94\x8cM/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/orchestrator/main.py\x94\x8c\x08<lambda>\x94K\x18C\x00\x94))t\x94R\x94}\x94(h\x17Nh\x19\x8c\x08__main__\x94h\x1b\x8cM/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/orchestrator/main.py\x94uNNNt\x94R\x94h%hB}\x94}\x94(h\x19h:h(\x8c\x1dplan_filter.<locals>.<lambda>\x94h*}\x94h,Nh-Nh.h?h/Nh0N\x8c\x17_cloudpickle_submodules\x94]\x94\x8c\x0b__globals__\x94}\x94u\x86\x94\x86R0\x85\x94R\x94\x85\x94hG]\x94hI}\x94u\x86\x94\x86R0.')
+                for i in func3([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]):
+                    print(i)"""
+            op = pre
+
+        """for mid in middle_operators:
+            print(mid.operator_type)
+            print(pickle.loads(mid.udf))
+            func = pickle.loads(mid.udf)
+            for i in func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]):
+                print(i)"""
+
+        finalpath = "/Users/rodrigopardomeza/wayang/incubator-wayang/protobuf/filter_message"
+        planconf = pwb.WayangPlan()
+        try:
+            f = open(finalpath, "rb")
+            planconf.ParseFromString(f.read())
+            f.close()
+        except IOError:
+            print(finalpath + ": Could not open file.  Creating a new one.")
+
+        so = pwb.Source()
+        so.id = source.id
+        so.type = source.operator_type
+        so.path = os.path.abspath(source.udf)
+
+        operators = []
+        for mid in middle_operators:
+            op = pwb.Operator()
+            op.id = mid.id
+            op.type = mid.operator_type
+            op.udf = mid.udf
+            operators.append(op)
+
+        si = pwb.Sink()
+        si.id = sink.id
+        si.type = sink.operator_type
+        si.path = os.path.abspath(sink.udf)
+
+        plan = pwb.Plan()
+        plan.source.CopyFrom(so)
+        plan.sink.CopyFrom(si)
+        plan.operators.extend(operators)
+        plan.input = pwb.Plan.string
+        plan.output = pwb.Plan.string
+
+        ctx = pwb.Context()
+        ctx.platforms.extend([pwb.Context.Platform.java])
+
+        planconf.plan.CopyFrom(plan)
+        planconf.context.CopyFrom(ctx)
+
+        f = open(finalpath, "wb")
+        f.write(planconf.SerializeToString())
+        f.close()
+        pass
+
+class func_inteface:
+
+    def __init__(self, node, nested_udf):
+        self.node = node
+        self.nested_udf = nested_udf
+
+    def func(self, iterable):
+        return self.node.operator.udf(self.nested_udf(iterable))
+
+
+class MessageWriter:
+    sources = []
+    operators = []
+    sinks = []
+
+    def add_source(self, operator_id, operator_type, path, predecessors, successors):
+        source = pwb.OperatorProto()
+        source.id = operator_id
+        source.type = operator_type
+        source.path = os.path.abspath(path)
+        source.udf = None
+        source.predecessors = predecessors
+        source.successors = successors
+        self.sources.append(source)
+
+    def add_sink(self, operator_id, operator_type, path, predecessors, successors):
+        sink = pwb.OperatorProto()
+        sink.id = operator_id
+        sink.type = operator_type
+        sink.path = os.path.abspath(path)
+        sink.udf = None
+        sink.predecessors = predecessors
+        sink.successors = successors
+        self.sinks.append(sink)
+
+    def add_operator(self, operator_id, operator_type, udf, path, predecessors, successors):
+        op = pwb.OperatorProto()
+        op.id = operator_id
+        op.type = operator_type
+        op.udf = udf
+        op.path = path
+        op.predecessors = predecessors
+        op.successors = successors
+        self.operators.append(op)
+
+    def process_pipeline(self, stage):
+
+        nested_udf = None
+        nested_id = ""
+        for node in reversed(stage):
+            print("########")
+            print(node.operator_type, "executable:", node.python_exec, "id:", node.id)
+
+            if nested_udf is not None:
+                print("review pre")
+                print( nested_udf)
+                print( nested_udf(["Wilo","lifo","Wifo"]))
+
+            if not node.python_exec:
+                if nested_udf is not None:
+                    """self.add_operator(nested_id, "map_partition", nested_udf, None
+                                      # obtain predecessors and successors
+                                      , successors=[node.id]
+                                      )"""
+                    print("node", nested_id)
+                    print(nested_udf)
+                    print("I have died")
+                    print( nested_udf(["Wilo","lifo","Wifo"]))
+
+                    t = nested_udf(["Wilo","lifo","Wifo"])
+                    print("jajajarvard")
+                    print(t)
+                    for i in t:
+                        print(i)
+                    nested_udf = None
+                    nested_id = ""
+
+                """if node.operator.source:
+                    self.add_source(
+                        node.id, node.operator_type, node.operator.udf,
+                        node.predecessors, node.operator.successor)
+                else:
+                    self.add_operator(
+                        node.id, node.operator_type, None, node.operator.udf,
+                        node.predecessors, node.operator.successor)"""
+            else:
+                print("adding", node.id)
+                if nested_udf is None:
+                    nested_udf = node.operator.udf
+                    nested_id = node.id
+                else:
+                    print("paseeeeeee viste")
+                    tmp = nested_udf
+
+                    print( tmp(["Wilo","lifo","Wifo"]))
+
+                    #def func(_, iterable):
+                    #    return nested_udf(node.operator.udf(iterable))
+                    nested_udf = self.concatenate(nested_udf, node.operator.udf)
+                    print( nested_udf(["Wilo","lifo","Wifo"]))
+                    print(nested_udf)
+
+                    # nested_udf = func_inteface(node, nested_udf)
+                    nested_id = str(node.id) + "," + str(nested_id)
+
+            if nested_udf is not None:
+                print("review")
+                print( nested_udf)
+                print( nested_udf(["Wilo","lifo","Wifo"]))
+
+        if nested_udf is not None:
+            """self.add_operator(nested_id, "map_partition", nested_udf, None
+                              # obtain predecessors and successors
+                              , successors=[node.id]
+                              )"""
+            print("node", nested_id)
+            print(nested_udf)
+            t = nested_udf(["Wilo","lifo","Wifo"])
+            print("jajajarvard2")
+            print(t)
+            for i in t:
+                print(i)
+            nested_udf = None
+            nested_id = ""
+
+    def __init__(self):
+        print("lala")
+
+    def concatenate(self, function_a, function_b):
+        def executable(iterable):
+            return function_a(function_b(iterable))
+        return executable
+
+    def old(self, descriptor):
+
+        sink = descriptor.get_sinks()[0]
+        source = descriptor.get_sources()[0]
+
+        op = source
+        visited = []
+        middle_operators = []
+        while op.sink is not True and len(op.successor) > 0:
+            pre = op.successor[0]
+            if pre not in visited and pre.sink is not True:
+                pre.serialize_udf()
+                middle_operators.append(pre)
+            op = pre
+
+        finalpath = "/Users/rodrigopardomeza/wayang/incubator-wayang/protobuf/filter_message"
+        planconf = pwb.WayangPlan()
+        try:
+            f = open(finalpath, "rb")
+            planconf.ParseFromString(f.read())
+            f.close()
+        except IOError:
+            print(finalpath + ": Could not open file.  Creating a new one.")
+
+        so = pwb.Source()
+        so.id = source.id
+        so.type = source.operator_type
+        so.path = os.path.abspath(source.udf)
+
+        operators = []
+        for mid in middle_operators:
+            op = pwb.Operator()
+            op.id = mid.id
+            op.type = mid.operator_type
+            op.udf = mid.udf
+            operators.append(op)
+
+        si = pwb.Sink()
+        si.id = sink.id
+        si.type = sink.operator_type
+        si.path = os.path.abspath(sink.udf)
+
+        plan = pwb.Plan()
+        plan.source.CopyFrom(so)
+        plan.sink.CopyFrom(si)
+        plan.operators.extend(operators)
+        plan.input = pwb.Plan.string
+        plan.output = pwb.Plan.string
+
+        ctx = pwb.Context()
+        ctx.platforms.extend([pwb.Context.Platform.java])
+
+        planconf.plan.CopyFrom(plan)
+        planconf.context.CopyFrom(ctx)
+
+        f = open(finalpath, "wb")
+        f.write(planconf.SerializeToString())
+        f.close()
+        pass
+
+    def pipeline_singleton(self):
+        print("lala")
diff --git a/pywayang/protobuf/planwriter.py b/pywayang/protobuf/planwriter.py
new file mode 100644
index 0000000..b63dcbb
--- /dev/null
+++ b/pywayang/protobuf/planwriter.py
@@ -0,0 +1,277 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import protobuf.pywayangplan_pb2 as pwb
+import os
+import cloudpickle
+import logging
+import pathlib
+import requests
+import base64
+
+
+# Writes Wayang Plan from several stages
+class MessageWriter:
+    sources = []
+    operators = []
+    sinks = []
+    operator_references = {}
+    boundaries = {}
+
+    # Creates and appends Source type of operator
+    def add_source(self, operator_id, operator_type, path):
+        source = pwb.OperatorProto()
+        source.id = str(operator_id)
+        source.type = operator_type
+        source.path = os.path.abspath(path)
+        source.udf = chr(0).encode('utf-8')
+        # source.parameters = {}
+        self.sources.append(source)
+        return source
+
+    # Creates and appends Sink type of operator
+    def add_sink(self, operator_id, operator_type, path):
+        sink = pwb.OperatorProto()
+        sink.id = str(operator_id)
+        sink.type = operator_type
+        sink.path = os.path.abspath(path)
+        sink.udf = chr(0).encode('utf-8')
+        # sink.parameters = {}
+        self.sinks.append(sink)
+        return sink
+
+    # Creates and appends a Python operator
+    # Python operators don't require parameters; the UDF holds the function ready to be executed directly
+    def add_operator(self, operator_id, operator_type, udf):
+        op = pwb.OperatorProto()
+        op.id = str(operator_id)
+        op.type = operator_type
+        op.udf = cloudpickle.dumps(udf)
+        op.path = str(None)
+        # op.parameters = {}
+        self.operators.append(op)
+        return op
+
+    # Creates and appends a Java operator
+    def add_java_operator(self, operator_id, operator_type, udf, parameters):
+        op = pwb.OperatorProto()
+        op.id = str(operator_id)
+        op.type = operator_type
+        op.udf = cloudpickle.dumps(udf)
+        op.path = str(None)
+        #op.parameters = parameters
+        for param in parameters:
+            logging.debug("%s %s", param, parameters[param])
+            op.parameters[param] = str(parameters[param])
+            # op.parameters[]
+        #m.mapfield[5] = 10
+        self.operators.append(op)
+        return op
+
+    # Receives a chain of operators and separates them into Wayang operators
+    # Compacts several Python-executable operators into one Map Partition Wayang operator
+    def process_pipeline(self, stage):
+
+        nested_udf = None
+        nested_id = ""
+        nested_predecessors = None
+        nested_successors = None
+        for node in reversed(stage):
+            logging.debug(node.operator_type + " executable: " + str(node.python_exec) + " id: " + str(node.id))
+
+            if not node.python_exec:
+                if nested_udf is not None:
+
+                    # Predecessors depends on last operator
+                    # Successors depends on first operator
+                    op = self.add_operator(nested_id, "map_partition", nested_udf)
+
+                    ids = str(nested_id).split(",")
+                    for op_id in ids:
+                        self.operator_references[str(op_id)] = op
+
+                    self.boundaries[str(nested_id)] = {}
+                    self.boundaries[str(nested_id)]["end"] = nested_successors
+                    self.boundaries[str(nested_id)]["start"] = nested_predecessors
+
+                    nested_udf = None
+                    nested_id = ""
+                    nested_predecessors = None
+                    nested_successors = None
+
+                if node.operator.source:
+                    op = self.add_source(node.id, node.operator_type, node.operator.udf)
+                    self.operator_references[str(node.id)] = op
+                    self.boundaries[str(node.id)] = {}
+                    self.boundaries[str(node.id)]["end"] = node.successors.keys()
+
+                elif node.operator.sink:
+                    op = self.add_sink(node.id, node.operator_type, node.operator.udf)
+                    self.operator_references[str(node.id)] = op
+                    self.boundaries[str(node.id)] = {}
+                    self.boundaries[str(node.id)]["start"] = node.predecessors.keys()
+
+                # Regular operator to be processed in Java
+                # Notice that those could include more parameters for Java
+                else:
+                    op = self.add_java_operator(node.id, node.operator_type, node.operator.udf, node.operator.parameters)
+                    self.operator_references[str(node.id)] = op
+                    self.boundaries[str(node.id)] = {}
+                    self.boundaries[str(node.id)]["start"] = node.predecessors.keys()
+                    self.boundaries[str(node.id)]["end"] = node.successors.keys()
+
+            else:
+
+                if nested_udf is None:
+                    nested_udf = node.operator.udf
+                    nested_id = node.id
+                    # It is the last operator to execute in the map partition
+                    nested_successors = node.successors.keys()
+
+                else:
+                    nested_udf = self.concatenate(nested_udf, node.operator.udf)
+                    nested_id = str(node.id) + "," + str(nested_id)
+
+                # Every iteration assign the first known predecessors
+                nested_predecessors = node.predecessors.keys()
+
+        # In case a pipeline starts with Python operators, flush the pending nested UDF
+        if nested_udf is not None:
+            op = self.add_operator(nested_id, "map_partition", nested_udf)
+
+            ids = str(nested_id).split(",")
+            for op_id in ids:
+                self.operator_references[op_id] = op
+
+            self.boundaries[str(nested_id)] = {}
+            self.boundaries[str(nested_id)]["end"] = nested_successors
+            self.boundaries[str(nested_id)]["start"] = nested_predecessors
+
+    def __init__(self):
+        pass
+
+    # Takes two functions and composes them into a single function
+    @staticmethod
+    def concatenate(function_a, function_b):
+        def executable(iterable):
+            return function_a(function_b(iterable))
+
+        return executable
+
+    # Set dependencies over final Wayang Operators
+    def set_dependencies(self):
+
+        for source in self.sources:
+
+            if 'end' in self.boundaries[source.id]:
+                op_successors = []
+                for op_id in self.boundaries[source.id]['end']:
+                    op_successors.append(str(self.operator_references[str(op_id)].id))
+                source.successors.extend(op_successors)
+
+        for sink in self.sinks:
+            if 'start' in self.boundaries[sink.id]:
+                op_predecessors = []
+                for op_id in self.boundaries[sink.id]['start']:
+                    op_predecessors.append(str(self.operator_references[str(op_id)].id))
+                sink.predecessors.extend(op_predecessors)
+
+        for op in self.operators:
+            if 'start' in self.boundaries[op.id]:
+                op_predecessors = []
+                for op_id in self.boundaries[op.id]['start']:
+                    op_predecessors.append(str(self.operator_references[str(op_id)].id))
+                op.predecessors.extend(op_predecessors)
+
+            if 'end' in self.boundaries[op.id]:
+                op_successors = []
+                for op_id in self.boundaries[op.id]['end']:
+                    op_successors.append(str(self.operator_references[str(op_id)].id))
+                op.successors.extend(op_successors)
+
+    # Writes the message to a local directory
+    def write_message(self, descriptor):
+
+        finalpath = "../../protobuf/wayang_message"
+        plan_configuration = pwb.WayangPlanProto()
+
+        try:
+            # Load the existing message, if any, through a context manager
+            with open(finalpath, "rb") as f:
+                plan_configuration.ParseFromString(f.read())
+        except IOError:
+            logging.warning("File " + finalpath + " does not exist. A new file will be generated")
+
+        plan = pwb.PlanProto()
+        plan.sources.extend(self.sources)
+        plan.operators.extend(self.operators)
+        plan.sinks.extend(self.sinks)
+        plan.input = pwb.PlanProto.string
+        plan.output = pwb.PlanProto.string
+
+        ctx = pwb.ContextProto()
+        # ctx.platforms.extend([pwb.ContextProto.PlatformProto.java])
+        for plug in descriptor.plugins:
+            ctx.platforms.append(plug.value)
+        # ctx.platforms.extend(descriptor.get_plugins())
+
+        plan_configuration.plan.CopyFrom(plan)
+        plan_configuration.context.CopyFrom(ctx)
+
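+        # Serialize first, then write through a context manager so the file is always closed
+        serialized = plan_configuration.SerializeToString()
+        with open(finalpath, "wb") as f:
+            f.write(serialized)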
+
+    # Send message as bytes to the Wayang Rest API
+    def send_message(self, descriptor):
+
+        plan_configuration = pwb.WayangPlanProto()
+
+        plan = pwb.PlanProto()
+        plan.sources.extend(self.sources)
+        plan.operators.extend(self.operators)
+        plan.sinks.extend(self.sinks)
+        plan.input = pwb.PlanProto.string
+        plan.output = pwb.PlanProto.string
+
+        ctx = pwb.ContextProto()
+        # ctx.platforms.extend([pwb.ContextProto.PlatformProto.java])
+        for plug in descriptor.plugins:
+            ctx.platforms.append(plug.value)
+        # ctx.platforms.extend(descriptor.get_plugins())
+
+        plan_configuration.plan.CopyFrom(plan)
+        plan_configuration.context.CopyFrom(ctx)
+
+        logging.debug("plan!")
+        logging.debug(plan_configuration)
+
+        msg_bytes = plan_configuration.SerializeToString()
+        msg_64 = base64.b64encode(msg_bytes)
+
+        logging.debug(msg_bytes)
+        # response = requests.get("http://localhost:8080/plan/create/fromfile")
+        data = {
+            'message': msg_64
+        }
+        response = requests.post("http://localhost:8080/plan/create", data)
+        logging.debug(response)
+        # f = open(finalpath, "wb")
+        # f.write(plan_configuration.SerializeToString())
+        # f.close()
+        pass
diff --git a/pywayang/test/demo_testing.py b/pywayang/test/demo_testing.py
new file mode 100644
index 0000000..c096a89
--- /dev/null
+++ b/pywayang/test/demo_testing.py
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+
+class MyTestCase(unittest.TestCase):
+
+    def test_something(self):
+        self.assertEqual(True, True)  # placeholder assertion; the template default always failed
+
+    def test_upper(self):
+        self.assertEqual('foo'.upper(), 'FOO')
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/pywayang/test/full_java_test.py b/pywayang/test/full_java_test.py
new file mode 100644
index 0000000..d17aedd
--- /dev/null
+++ b/pywayang/test/full_java_test.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+from orchestrator.plan import Descriptor
+from orchestrator.dataquanta import DataQuantaBuilder
+
+
+class MyTestCase(unittest.TestCase):
+
+    def test_most_basic(self):
+        descriptor = Descriptor()
+        descriptor.add_plugin(Descriptor.Plugin.java)
+
+        plan = DataQuantaBuilder(descriptor)
+        sink_dataquanta = \
+            plan.source("../test/lines.txt") \
+                .sink("../test/output.txt", end="")
+
+        sink_dataquanta.to_wayang_plan()
+
+
+    def test_single_juncture(self):
+        descriptor = Descriptor()
+        descriptor.add_plugin(Descriptor.Plugin.java)
+
+        plan = DataQuantaBuilder(descriptor)
+        dq_source_a = plan.source("../test/lines.txt")
+        dq_source_b = plan.source("../test/morelines.txt")
+        sink_dataquanta = dq_source_a.union(dq_source_b) \
+            .sink("../test/output.txt", end="")
+
+        sink_dataquanta.to_wayang_plan()
+
+
+    def test_multiple_juncture(self):
+        descriptor = Descriptor()
+        descriptor.add_plugin(Descriptor.Plugin.java)
+
+        plan = DataQuantaBuilder(descriptor)
+        dq_source_a = plan.source("../test/lines.txt")
+        dq_source_b = plan.source("../test/morelines.txt") \
+            .filter(lambda elem: str(elem).startswith("I"))
+        dq_source_c = plan.source("../test/lastlines.txt") \
+            .filter(lambda elem: str(elem).startswith("W"))
+
+        sink_dataquanta = dq_source_a.union(dq_source_b) \
+            .union(dq_source_c) \
+            .sort(lambda elem: elem.lower()) \
+            .sink("../test/output.txt", end="")
+
+        sink_dataquanta.to_wayang_plan()
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/pywayang/test/full_spark_test.py b/pywayang/test/full_spark_test.py
new file mode 100644
index 0000000..9276ccc
--- /dev/null
+++ b/pywayang/test/full_spark_test.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+from orchestrator.plan import Descriptor
+from orchestrator.dataquanta import DataQuantaBuilder
+
+
+class MyTestCase(unittest.TestCase):
+
+    def test_most_basic(self):
+        descriptor = Descriptor()
+        descriptor.add_plugin(Descriptor.Plugin.spark)
+
+        plan = DataQuantaBuilder(descriptor)
+        sink_dataquanta = \
+            plan.source("../test/lines.txt") \
+                .sink("../test/output.txt", end="")
+
+        sink_dataquanta.to_wayang_plan()
+
+    def test_single_juncture(self):
+        descriptor = Descriptor()
+        descriptor.add_plugin(Descriptor.Plugin.spark)
+
+        plan = DataQuantaBuilder(descriptor)
+        dq_source_a = plan.source("../test/lines.txt")
+        dq_source_b = plan.source("../test/morelines.txt")
+        sink_dataquanta = dq_source_a.union(dq_source_b) \
+            .sink("../test/output.txt", end="")
+
+        sink_dataquanta.to_wayang_plan()
+
+    def test_multiple_juncture(self):
+        descriptor = Descriptor()
+        descriptor.add_plugin(Descriptor.Plugin.spark)
+
+        plan = DataQuantaBuilder(descriptor)
+        dq_source_a = plan.source("../test/lines.txt")
+        dq_source_b = plan.source("../test/morelines.txt") \
+            .filter(lambda elem: str(elem).startswith("I"))
+        dq_source_c = plan.source("../test/lastlines.txt") \
+            .filter(lambda elem: str(elem).startswith("W"))
+
+        sink_dataquanta = dq_source_a.union(dq_source_b) \
+            .union(dq_source_c) \
+            .sort(lambda elem: elem.lower()) \
+            .sink("../test/output.txt", end="")
+
+        sink_dataquanta.to_wayang_plan()
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/wayang-api/pom.xml b/wayang-api/pom.xml
index e31751b..c868801 100644
--- a/wayang-api/pom.xml
+++ b/wayang-api/pom.xml
@@ -38,6 +38,7 @@
     <modules>
         <module>wayang-api-scala-java</module>
         <module>wayang-api-python</module>
+        <module>wayang-api-rest</module>
     </modules>
 
 </project>
diff --git a/wayang-api/wayang-api-python/pom.xml b/wayang-api/wayang-api-python/pom.xml
index 5a3ad44..647f07e 100644
--- a/wayang-api/wayang-api-python/pom.xml
+++ b/wayang-api/wayang-api-python/pom.xml
@@ -30,9 +30,54 @@
     <version>0.6.1-SNAPSHOT</version>
 
     <name>Wayang API Python</name>
-    <description>Wayang implementation of an API of Python to be enable to work with code writed in python</description>
+    <description>Wayang implementation of a Python API that enables working with code written in Python</description>
 
     <properties>
         <java-module-name>org.apache.wayang.api</java-module-name>
+        <graphchi.version>0.2.2</graphchi.version>
+        <spark.version>2.4.0</spark.version>
+        <flink.version>1.7.1</flink.version>
+        <scala.mayor.version>2.11</scala.mayor.version>
+        <giraph.version>1.2.0-hadoop2</giraph.version>
     </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.wayang</groupId>
+                <artifactId>wayang-commons</artifactId>
+                <version>0.6.1-SNAPSHOT</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-core</artifactId>
+            <version>0.6.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-java</artifactId>
+            <version>0.6.1-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>3.7.0</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
 </project>
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java
new file mode 100644
index 0000000..b463b36
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import com.google.protobuf.ByteString;
+import org.apache.wayang.api.python.function.PythonUdf;
+import org.apache.wayang.core.api.exception.WayangException;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.Map;
+
+public class ProcessFeeder<Input, Output> {
+
+    private Socket socket;
+    private PythonUdf<Input, Output> udf;
+    private ByteString serializedUDF;
+    private Iterable<Input> input;
+
+    //TODO add to a config file
+    private static final int END_OF_DATA_SECTION = -1;
+    private static final int NULL = -5;
+
+    public ProcessFeeder(
+            Socket socket,
+            PythonUdf<Input, Output> udf,
+            ByteString serializedUDF,
+            Iterable<Input> input){
+
+        if(input == null) throw new WayangException("Nothing to process with Python API");
+
+        this.socket = socket;
+        this.udf = udf;
+        this.serializedUDF = serializedUDF;
+        this.input = input;
+
+    }
+
+    public void send(){
+
+        try{
+            //TODO use config buffer size
+            int BUFFER_SIZE = 8192;
+
+            BufferedOutputStream stream = new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE);
+            DataOutputStream dataOut = new DataOutputStream(stream);
+
+            writeUDF(serializedUDF, dataOut);
+            this.writeIteratorToStream(input.iterator(), dataOut);
+            dataOut.writeInt(END_OF_DATA_SECTION);
+            dataOut.flush();
+
+        } catch (IOException e) {
+            throw new WayangException("Failed to send data to the Python worker", e);
+        }
+    }
+
+    public void writeUDF(ByteString serializedUDF, DataOutputStream dataOut){
+
+        //write(serializedUDF.toByteArray(), dataOut);
+        writeBytes(serializedUDF.toByteArray(), dataOut);
+        System.out.println("UDF written");
+
+    }
+
+    public void writeIteratorToStream(Iterator<Input> iter, DataOutputStream dataOut){
+
+        System.out.println("iterator being sent");
+        for (Iterator<Input> it = iter; it.hasNext(); ) {
+            Input elem = it.next();
+            //System.out.println(elem.toString());
+            write(elem, dataOut);
+        }
+    }
+
+    /*TODO Missing case PortableDataStream */
+    public void write(Object obj, DataOutputStream dataOut){
+        try {
+
+            if(obj == null)
+                dataOut.writeInt(this.NULL);
+
+            /**
+             * Byte Array cases
+             */
+            else if (obj instanceof Byte[] || obj instanceof byte[]) {
+                System.out.println("Writing Bytes");
+                writeBytes(obj, dataOut);
+            }
+            /**
+             * String case
+             * */
+            else if (obj instanceof String)
+                writeUTF((String) obj, dataOut);
+
+            /**
+             * Key, Value case
+             * */
+            else if (obj instanceof Map.Entry)
+                writeKeyValue((Map.Entry<?, ?>) obj, dataOut);
+
+            else{
+                throw new WayangException("Unexpected element type " + obj.getClass());
+            }
+
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void writeBytes(Object obj, DataOutputStream dataOut){
+
+        try{
+
+            if (obj instanceof Byte[]) {
+
+                int length = ((Byte[]) obj).length;
+
+                byte[] bytes = new byte[length];
+                int j=0;
+
+                // Unboxing Byte values. (Byte[] to byte[])
+                for(Byte b: ((Byte[]) obj))
+                    bytes[j++] = b.byteValue();
+
+                dataOut.writeInt(length);
+                dataOut.write(bytes);
+
+            } else if (obj instanceof byte[]) {
+
+                dataOut.writeInt(((byte[]) obj).length);
+                dataOut.write(((byte[]) obj));
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void writeUTF(String str, DataOutputStream dataOut){
+
+        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
+
+        try {
+
+            dataOut.writeInt(bytes.length);
+            dataOut.write(bytes);
+        } catch (SocketException e){
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void writeKeyValue(Map.Entry<?, ?> obj, DataOutputStream dataOut){
+
+        write(obj.getKey(), dataOut);
+        write(obj.getValue(), dataOut);
+    }
+
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessReceiver.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessReceiver.java
new file mode 100644
index 0000000..895245a
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessReceiver.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Iterator;
+/*TODO the output cannot always be a String; include type definitions for every operator,
+*  like: map(udf, inputtype, outputtype)*/
+public class ProcessReceiver<Output> {
+
+    private ReaderIterator<Output> iterator;
+
+    public ProcessReceiver(Socket socket){
+        try{
+            //TODO use config buffer size
+            int BUFFER_SIZE = 8192;
+
+            DataInputStream stream = new DataInputStream(new BufferedInputStream(socket.getInputStream(), BUFFER_SIZE));
+            this.iterator = new ReaderIterator<>(stream);
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public Iterable<Output> getIterable(){
+        return () -> iterator;
+    }
+
+    public void print(){
+        iterator.forEachRemaining(x -> System.out.println(x.toString()));
+
+    }
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java
new file mode 100644
index 0000000..0d0e0b9
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import com.google.protobuf.ByteString;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.api.exception.WayangException;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PythonProcessCaller {
+
+    private Process process;
+    private Socket socket;
+    private ServerSocket serverSocket;
+    private boolean ready;
+
+    //TODO How to get the config
+    private Configuration configuration;
+
+    public PythonProcessCaller(ByteString serializedUDF){
+
+        //TODO document how to set up the configuration in the code
+        this.configuration = new Configuration("file:///Users/rodrigopardomeza/wayang/incubator-wayang/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties");
+        this.ready = false;
+        byte[] addr = new byte[4];
+        addr[0] = 127; addr[1] = 0; addr[2] = 0; addr[3] = 1;
+
+        try {
+            /*TODO should NOT be assigned a specific port, set port as 0 (zero)*/
+            this.serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(addr));
+            ProcessBuilder pb = new ProcessBuilder(
+                    Arrays.asList(
+                            "python3",
+                            this.configuration.getStringProperty("wayang.api.python.worker")
+                    )
+            );
+            Map<String, String> workerEnv = pb.environment();
+            workerEnv.put("PYTHON_WORKER_FACTORY_PORT", String.valueOf(this.serverSocket.getLocalPort()));
+
+            // TODO See what is happening with ENV Python version
+            workerEnv.put("PYTHONPATH", "/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/:/Users/rodrigopardomeza/opt/anaconda3/");
+
+            pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);
+            pb.redirectError(ProcessBuilder.Redirect.INHERIT);
+            this.process = pb.start();
+
+
+            // Redirect worker stdout and stderr
+            //IDK redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)
+
+            // Wait for it to connect to our socket
+            this.serverSocket.setSoTimeout(10000);
+
+            try {
+                this.socket = this.serverSocket.accept();
+                this.serverSocket.setSoTimeout(0);
+
+                if(socket.isConnected())
+                    this.ready = true;
+
+            } catch (Exception e) {
+                System.out.println(e);
+                throw new WayangException("Python worker failed to connect back.", e);
+            }
+        } catch (Exception e){
+            System.out.println(e);
+            throw new WayangException("Python worker failed", e);
+        }
+    }
+
+    public Process getProcess() {
+        return process;
+    }
+
+    public Socket getSocket() {
+        return socket;
+    }
+
+    public boolean isReady(){
+        return ready;
+    }
+
+    public void close(){
+        try {
+            this.process.destroy();
+            this.socket.close();
+            this.serverSocket.close();
+            System.out.println("Everything closed");
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonWorkerManager.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonWorkerManager.java
new file mode 100644
index 0000000..09b394d
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonWorkerManager.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import com.google.protobuf.ByteString;
+import org.apache.wayang.api.python.function.PythonUdf;
+import org.apache.wayang.core.api.exception.WayangException;
+
+public class PythonWorkerManager<Input, Output> {
+
+    private PythonUdf<Input, Output> udf;
+    private ByteString serializedUDF;
+    private Iterable<Input> inputIterator;
+
+    public PythonWorkerManager(
+            PythonUdf<Input, Output> udf,
+            ByteString serializedUDF,
+            Iterable<Input> input
+    ){
+        this.udf = udf;
+        this.serializedUDF = serializedUDF;
+        this.inputIterator = input;
+    }
+
+    public Iterable<Output> execute(){
+        PythonProcessCaller worker = new PythonProcessCaller(this.serializedUDF);
+
+        if(worker.isReady()){
+
+            ProcessFeeder<Input, Output> feed = new ProcessFeeder<>(
+                    worker.getSocket(),
+                    this.udf,
+                    this.serializedUDF,
+                    this.inputIterator
+            );
+            feed.send();
+            ProcessReceiver<Output> r = new ProcessReceiver<>(worker.getSocket());
+
+            //r.print();
+            return r.getIterable();
+            //return (Iterable<Output>) this.inputIterator;
+
+        } else{
+
+            int port = worker.getSocket().getLocalPort();
+            worker.close();
+            throw new WayangException("Unable to communicate with the Python worker over the socket on port: " + port);
+        }
+
+    }
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ReaderIterator.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ReaderIterator.java
new file mode 100644
index 0000000..1f90358
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ReaderIterator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public class ReaderIterator <Output> implements Iterator<Output> {
+
+    private Output nextObj = null;
+    private boolean eos = false;
+    private boolean fst = false;
+    private DataInputStream stream = null;
+
+    public ReaderIterator(DataInputStream stream) {
+
+        this.stream = stream;
+        this.eos = false;
+        this.nextObj = null;
+    }
+
+    private Output read() {
+
+        int END_OF_DATA_SECTION = -1;
+
+        try {
+            int length = this.stream.readInt();
+
+            if (length > 0) {
+                byte[] obj = new byte[length];
+                stream.readFully(obj);
+                String s = new String(obj, StandardCharsets.UTF_8);
+                Output it = (Output) s;
+                return it;
+            } else if (length == END_OF_DATA_SECTION) {
+                this.eos = true;
+                return null;
+            }
+        } catch (IOException e) {
+            //e.printStackTrace();
+            throw new RuntimeException(e);
+        }
+        return null;
+    }
+
+    @Override
+    public boolean hasNext() {
+
+        if(!this.eos){
+            // Read only when nothing is buffered, so repeated hasNext() calls do not skip elements.
+            if(this.nextObj == null) nextObj = read();
+
+            /* To support null values, the NULL marker (-5) is supposed to be used.
+            if(this.nextObj == null){
+                return false;
+            }*/
+
+            return !this.eos;
+        }
+
+        return false;
+    }
+
+    @Override
+    public Output next() {
+
+        if(!this.eos){
+            Output obj = nextObj;
+            nextObj = null;
+            return obj;
+        }
+
+        throw new NoSuchElementException();
+    }
+
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonUdf.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonUdf.java
new file mode 100644
index 0000000..88134b3
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonUdf.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.function;
+
+import org.apache.wayang.core.function.FunctionDescriptor;
+
+public interface PythonUdf<Input, Output> extends FunctionDescriptor.SerializableFunction<Iterable<Input>, Iterable<Output>>{
+
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/WrappedPythonFunction.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/WrappedPythonFunction.java
new file mode 100644
index 0000000..26382f4
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/WrappedPythonFunction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.function;
+
+import com.google.protobuf.ByteString;
+import org.apache.wayang.api.python.executor.PythonWorkerManager;
+import org.apache.wayang.core.function.FunctionDescriptor;
+
+public class WrappedPythonFunction<Input, Output>  implements FunctionDescriptor.SerializableFunction<Iterable<Input>, Iterable<Output>> {
+
+    private PythonUdf<Input, Output> myUDF;
+    private ByteString serializedUDF;
+
+    public WrappedPythonFunction(PythonUdf<Input, Output> myUDF, ByteString serializedUDF){
+        this.myUDF = myUDF;
+        this.serializedUDF = serializedUDF;
+    }
+
+    @Override
+    public Iterable<Output> apply(Iterable<Input> input) {
+
+        PythonWorkerManager<Input, Output> manager = new PythonWorkerManager<>(myUDF, serializedUDF, input);
+        Iterable<Output> output = manager.execute();
+        return output;
+    }
+
+}
diff --git a/wayang-api/wayang-api-python/src/main/python/worker.py b/wayang-api/wayang-api-python/src/main/python/worker.py
new file mode 100644
index 0000000..6e3d54a
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/python/worker.py
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import socket
+import struct
+import pickle
+import base64
+import re
+import sys
+
+
+class SpecialLengths(object):
+    END_OF_DATA_SECTION = -1
+    PYTHON_EXCEPTION_THROWN = -2
+    TIMING_DATA = -3
+    END_OF_STREAM = -4
+    NULL = -5
+    START_ARROW_STREAM = -6
+
+
+def read_int(stream):
+    length = stream.read(4)
+    if not length:
+        raise EOFError
+    res = struct.unpack("!i", length)[0]
+    return res
+
+
+class UTF8Deserializer:
+    """
+    Deserializes streams written by String.getBytes.
+    """
+
+    def __init__(self, use_unicode=True):
+        self.use_unicode = use_unicode
+
+    def loads(self, stream):
+        length = read_int(stream)
+        if length == SpecialLengths.END_OF_DATA_SECTION:
+            raise EOFError
+        elif length == SpecialLengths.NULL:
+            return None
+        s = stream.read(length)
+        return s.decode("utf-8") if self.use_unicode else s
+
+    def load_stream(self, stream):
+        try:
+            while True:
+                yield self.loads(stream)
+        except struct.error:
+            return
+        except EOFError:
+            return
+
+    def __repr__(self):
+        return "UTF8Deserializer(%s)" % self.use_unicode
+
+
+def write_int(p, outfile):
+    outfile.write(struct.pack("!i", p))
+
+
+def write_with_length(obj, stream):
+    serialized = obj.encode('utf-8')
+    if serialized is None:
+        raise ValueError("serialized value should not be None")
+    if len(serialized) >= (1 << 31):
+        raise ValueError("cannot serialize object of 2 GiB or larger")
+    write_int(len(serialized), stream)
+    stream.write(serialized)
+
+
+def dump_stream(iterator, stream):
+
+    for obj in iterator:
+        if type(obj) is str:
+            write_with_length(obj, stream)
+        ## elif type(obj) is list:
+        ##    write_with_length(obj, stream)
+    print("Finished writing results")
+    write_int(SpecialLengths.END_OF_DATA_SECTION, stream)
+    print("Wrote end-of-data marker")
+
+
+def process(infile, outfile):
+    """udf64 = os.environ["UDF"]
+    print("udf64")
+    print(udf64)
+    #serialized_udf = binascii.a2b_base64(udf64)
+    #serialized_udf = base64.b64decode(udf64)
+    serialized_udf = bytearray(udf64, encoding='utf-16')
+    # NOT VALID TO BE UTF8  serialized_udf = bytes(udf64, 'UTF-8')
+    print("serialized_udf")
+    print(serialized_udf)
+    # input to be ast.literal_eval(serialized_udf)
+    func = pickle.loads(serialized_udf, encoding="bytes")
+    print ("func")
+    print (func)
+    print(func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))
+    # func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])"""
+
+
+
+    # TODO First we must receive the operator + UDF
+    """udf = lambda elem: elem.lower()
+
+    def func(it):
+        return sorted(it, key=udf)"""
+    udf_length = read_int(infile)
+    print("udf_length")
+    print(udf_length)
+    serialized_udf = infile.read(udf_length)
+    print("serialized_udf")
+    print(serialized_udf)
+    #base64_message = base64.b64decode(serialized_udf + "===")
+    #print("base64_message")
+    #print(base64_message)
+    func = pickle.loads(serialized_udf)
+    #func = ori.lala(serialized_udf)
+    #print (func)
+    #for x in func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]): print(x)
+
+    """print("example")
+    for x in func("2344|234|efrf|$#|ffrf"): print(x)"""
+    # TODO: For now we assume the input consists exclusively of UTF-8 strings; other data types are not supported yet.
+    iterator = UTF8Deserializer().load_stream(infile)
+    # out_iter = sorted(iterator, key=lambda elem: elem.lower())
+    out_iter = func(iterator)
+    dump_stream(iterator=out_iter, stream=outfile)
+
+
+def local_connect(port):
+    sock = None
+    errors = []
+    # getaddrinfo is address-family agnostic, but the literal 127.0.0.1 only yields IPv4 addresses.
+    # Each returned address is tried in turn until one connection succeeds.
+    for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+        af, socktype, proto, _, sa = res
+        try:
+            sock = socket.socket(af, socktype, proto)
+            # sock.settimeout(int(os.environ.get("SPARK_AUTH_SOCKET_TIMEOUT", 15)))
+            sock.settimeout(30)
+            sock.connect(sa)
+            # sockfile = sock.makefile("rwb", int(os.environ.get("SPARK_BUFFER_SIZE", 65536)))
+            sockfile = sock.makefile("rwb", 65536)
+            # _do_server_auth(sockfile, auth_secret)
+            return (sockfile, sock)
+        except socket.error as e:
+            errors.append("tried to connect to %s, but an error occurred: %s" % (sa, e))
+            if sock is not None:
+                sock.close()
+            sock = None
+    raise Exception("could not open socket: %s" % errors)
+
+
+if __name__ == '__main__':
+    print("Python version")
+    print (sys.version)
+    java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
+    sock_file, sock = local_connect(java_port)
+    process(sock_file, sock_file)
+    sock_file.flush()
+    sys.exit()
diff --git a/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties b/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties
index ff3a99f..99b8470 100644
--- a/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties
+++ b/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties
@@ -15,4 +15,5 @@
 # limitations under the License.
 #
 
-# TODO: Add properties here
\ No newline at end of file
+wayang.api.python.worker = /Users/rodrigopardomeza/wayang/incubator-wayang/wayang-api/wayang-api-python/src/main/python/worker.py
+# wayang.api.python.worker = src/main/python/worker.py
diff --git a/wayang-api/wayang-api-python/src/test/java/org/apache/wayang/api/python/BasicPython.java b/wayang-api/wayang-api-python/src/test/java/org/apache/wayang/api/python/BasicPython.java
new file mode 100644
index 0000000..5108de2
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/test/java/org/apache/wayang/api/python/BasicPython.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python;
+
+import org.apache.wayang.api.python.function.WrappedPythonFunction;
+import org.apache.wayang.basic.operators.*;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.function.FlatMapDescriptor;
+import org.apache.wayang.core.function.MapPartitionsDescriptor;
+import org.apache.wayang.core.function.TransformationDescriptor;
+import org.apache.wayang.core.plan.wayangplan.WayangPlan;
+import org.apache.wayang.core.types.DataUnitType;
+import org.apache.wayang.java.Java;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+
+public class BasicPython {
+
+    public static URI createUri(String resourcePath) {
+        try {
+            return Thread.currentThread().getClass().getResource(resourcePath).toURI();
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Illegal URI.", e);
+        }
+
+    }
+
+    @Test
+    public void testOnJava() throws URISyntaxException, IOException {
+
+        URI FILE_SOME_LINES_TXT = createUri("/python-lines.txt");
+
+        // Instantiate Wayang and activate the backend.
+        WayangContext rheemContext = new WayangContext().with(Java.basicPlugin());
+        TextFileSource textFileSource = new TextFileSource(FILE_SOME_LINES_TXT.toString());
+
+        TextFileSink<String> sink = new TextFileSink<String>(
+                "file:///Users/rodrigopardomeza/wayang/incubator-wayang/wayang-api/wayang-api-python/src/main/resources/basic_output.txt",
+                String.class
+        );
+
+        textFileSource.connectTo(0, sink, 0);
+
+        rheemContext.execute(new WayangPlan(sink));
+    }
+}
+
diff --git a/wayang-api/wayang-api-python/src/test/java/org/apache/wayang/api/python/SortPython.java b/wayang-api/wayang-api-python/src/test/java/org/apache/wayang/api/python/SortPython.java
new file mode 100644
index 0000000..7b74e77
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/test/java/org/apache/wayang/api/python/SortPython.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.api.python;
+
+import org.apache.wayang.api.python.function.WrappedPythonFunction;
+import org.apache.wayang.basic.operators.*;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.function.FlatMapDescriptor;
+import org.apache.wayang.core.function.MapPartitionsDescriptor;
+import org.apache.wayang.core.function.TransformationDescriptor;
+import org.apache.wayang.core.plan.wayangplan.WayangPlan;
+import org.apache.wayang.core.types.DataUnitType;
+import org.apache.wayang.java.Java;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+
+public class SortPython {
+    /*
+    public static URI createUri(String resourcePath) {
+        try {
+            return Thread.currentThread().getClass().getResource(resourcePath).toURI();
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Illegal URI.", e);
+        }
+
+    }
+
+    @Test
+    public void testOnJava() throws URISyntaxException, IOException {
+
+        URI FILE_SOME_LINES_TXT = createUri("/python-lines.txt");
+
+        // Instantiate Wayang and activate the backend.
+        WayangContext rheemContext = new WayangContext().with(Java.basicPlugin());
+        TextFileSource textFileSource = new TextFileSource(FILE_SOME_LINES_TXT.toString());
+
+        // for each line (input) output an iterator of the words
+        FlatMapOperator<String, String> flatMapOperator
+                = new FlatMapOperator<>(
+                new FlatMapDescriptor<>(
+                        line -> Arrays.asList(
+                                (String[]) line.split(" ")
+                        ),
+                        String.class,
+                        String.class
+                )
+        );
+
+        SortOperator<String, String> sortJava
+                = new SortOperator<String, String>(
+                new TransformationDescriptor<String, String>(
+                        (l) -> l.toLowerCase(),
+                        DataUnitType.createBasic(String.class),
+                        DataUnitType.createBasic(String.class)
+                )
+        );
+
+        MapPartitionsOperator<String, String> sortPython =
+                new MapPartitionsOperator<String, String>(
+                        new MapPartitionsDescriptor<String, String>(
+                                new WrappedPythonFunction<String, String>(
+                                        l -> l
+                                ),
+                                String.class,
+                                String.class
+                        )
+                );
+
+        //URI outputfile = Thread.currentThread().getClass().getResource("/output.txt").toURI();
+
+        TextFileSink<String> sink = new TextFileSink<String>(
+                "file:///Users/rodrigopardomeza/wayang/incubator-wayang/wayang-api/wayang-api-python/src/main/resources/output.txt",
+                String.class
+        );
+
+        textFileSource.connectTo(0, flatMapOperator, 0);
+        flatMapOperator.connectTo(0, sortPython, 0);
+        sortPython.connectTo(0, sink, 0);
+
+        rheemContext.execute(new WayangPlan(sink));
+    }
+     */
+}
diff --git a/wayang-api/wayang-api-rest/pom.xml b/wayang-api/wayang-api-rest/pom.xml
new file mode 100644
index 0000000..ef86974
--- /dev/null
+++ b/wayang-api/wayang-api-rest/pom.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>wayang-api</artifactId>
+        <groupId>org.apache.wayang</groupId>
+        <version>0.6.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>wayang-api-rest</artifactId>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-dependencies</artifactId>
+                <version>2.2.6.RELEASE</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+<!--        TODO: set the protobuf version through a property -->
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>3.15.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-core</artifactId>
+            <version>0.6.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-java</artifactId>
+            <version>0.6.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-api-python</artifactId>
+            <version>0.6.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-serializable</artifactId>
+            <version>0.6.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-api-python</artifactId>
+            <version>0.6.1-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>java8</id>
+            <activation>
+                <jdk>1.8</jdk>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.wayang</groupId>
+                    <artifactId>wayang-spark_2.11</artifactId>
+                    <version>0.6.1-SNAPSHOT</version>
+                </dependency>
+            </dependencies>
+        </profile>
+        <profile>
+            <id>java11</id>
+            <activation>
+                <jdk>11</jdk>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.wayang</groupId>
+                    <artifactId>wayang-spark_2.12</artifactId>
+                    <version>0.6.1-SNAPSHOT</version>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
+</project>
\ No newline at end of file
diff --git a/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/WayangRESTAPI.java b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/WayangRESTAPI.java
new file mode 100644
index 0000000..8b37b7c
--- /dev/null
+++ b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/WayangRESTAPI.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.rest;
+
+public class WayangRESTAPI {
+}
diff --git a/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/WayangApplication.java b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/WayangApplication.java
new file mode 100644
index 0000000..b9b6a76
--- /dev/null
+++ b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/WayangApplication.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.rest.server.spring;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class WayangApplication {
+
+    public static void main(String[] args) {
+        SpringApplication.run(WayangApplication.class, args);
+    }
+}
diff --git a/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/decoder/WayangPlanBuilder.java b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/decoder/WayangPlanBuilder.java
new file mode 100644
index 0000000..649c5ef
--- /dev/null
+++ b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/decoder/WayangPlanBuilder.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.rest.server.spring.decoder;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.wayang.api.python.function.WrappedPythonFunction;
+import org.apache.wayang.basic.operators.MapPartitionsOperator;
+import org.apache.wayang.basic.operators.TextFileSink;
+import org.apache.wayang.basic.operators.TextFileSource;
+import org.apache.wayang.basic.operators.UnionAllOperator;
+import org.apache.wayang.commons.serializable.OperatorProto;
+import org.apache.wayang.commons.serializable.PlanProto;
+import org.apache.wayang.commons.serializable.WayangPlanProto;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.function.MapPartitionsDescriptor;
+import org.apache.wayang.core.plan.wayangplan.OperatorBase;
+import org.apache.wayang.core.plan.wayangplan.WayangPlan;
+import org.apache.wayang.java.Java;
+import org.apache.wayang.spark.Spark;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.Base64;
+
+public class WayangPlanBuilder {
+
+    private WayangPlan wayangPlan;
+    private WayangContext wayangContext;
+
+    public WayangPlanBuilder(FileInputStream planFile){
+        try {
+
+            WayangPlanProto plan = WayangPlanProto.parseFrom(planFile);
+
+            this.wayangContext = buildContext(plan);
+            this.wayangPlan = buildPlan(plan);
+
+        } catch (IOException e) {
+            throw new WayangException("Could not parse the Wayang plan from the given file.", e);
+        }
+    }
+
+    public WayangPlanBuilder(String writtenPlan){
+
+        System.out.println(writtenPlan);
+        byte[] message = Base64.getDecoder().decode(writtenPlan);
+        System.out.println("Decoded plan message: " + message.length + " bytes");
+
+        try {
+            WayangPlanProto plan = WayangPlanProto.parseFrom(message);
+
+            this.wayangContext = buildContext(plan);
+            this.wayangPlan = buildPlan(plan);
+        } catch (InvalidProtocolBufferException e) {
+            throw new WayangException("Could not parse the Wayang plan from the given message.", e);
+        }
+
+    }
+
+    private WayangContext buildContext(WayangPlanProto plan){
+
+        WayangContext ctx = new WayangContext();
+        plan.getContext().getPlatformsList().forEach(platform -> {
+            if (platform.getNumber() == 0)
+                ctx.with(Java.basicPlugin());
+            else if (platform.getNumber() == 1)
+                ctx.with(Spark.basicPlugin());
+        });
+        //ctx.with(Spark.basicPlugin());
+
+        return ctx;
+    }
+
+    private WayangPlan buildPlan(WayangPlanProto plan){
+
+        System.out.println(plan);
+
+        PlanProto planProto = plan.getPlan();
+        LinkedList<OperatorProto> protoList = new LinkedList<>();
+        planProto.getSourcesList().forEach(protoList::addLast);
+
+        Map<String, OperatorBase> operators = new HashMap<>();
+        List<OperatorBase> sinks = new ArrayList<>();
+        while(! protoList.isEmpty()) {
+
+            OperatorProto proto = protoList.pollFirst();
+
+            /* Checking if protoOperator can be connected to the current WayangPlan*/
+            boolean processIt;
+            if(proto.getType().equals("source")) processIt = true;
+
+            else {
+                /* Checking if ALL predecessors were already processed */
+                processIt = true;
+                for(String predecessor : proto.getPredecessorsList()){
+                    if (!operators.containsKey(predecessor)) {
+                        processIt = false;
+                        break;
+                    }
+                }
+            }
+
+            /* Operators must not be processed twice; drop duplicates instead of re-queueing them */
+            if(operators.containsKey(proto.getId())) continue;
+
+            if(processIt) {
+
+                /* Create and store Wayang operator */
+                OperatorBase operator = createOperatorByType(proto);
+                operators.put(proto.getId(), operator);
+
+                /* TODO: connecting to predecessors needs more detail about the connection slots */
+                int order = 0;
+                for (String pre_id : proto.getPredecessorsList()) {
+
+                    OperatorBase predecessor = operators.get(pre_id);
+                    /* Only works without replicate topology */
+                    predecessor.connectTo(0, operator, order);
+                    order++;
+
+                    /* A sink may have several predecessors;
+                     * register it only once */
+                    if(proto.getType().equals("sink")
+                            && !sinks.contains(operator)){
+                        sinks.add(operator);
+                    }
+                }
+
+                /* Collect the OperatorProto successors and enqueue them.
+                 * A successor is only processed once all of its
+                 * predecessors are in the operators map. */
+                List<OperatorProto> listSuccessors = planProto.getOperatorsList()
+                        .stream()
+                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
+                        .collect(Collectors.toList());
+                for (OperatorProto successor : listSuccessors){
+                    if(!protoList.contains(successor)){
+                        protoList.addLast(successor);
+                    }
+                }
+
+                List<OperatorProto> sinkSuccessors = planProto.getSinksList()
+                        .stream()
+                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
+                        .collect(Collectors.toList());
+                for (OperatorProto successor : sinkSuccessors){
+                    if(!protoList.contains(successor)){
+                        protoList.addLast(successor);
+                    }
+                }
+
+            } else {
+
+                /* Not processable yet: re-enqueue it at the end of the list */
+                protoList.addLast(proto);
+            }
+        }
+
+        WayangPlan wayangPlan = new WayangPlan(sinks.get(0));
+        return wayangPlan;
+    }
+
+    public OperatorBase createOperatorByType(OperatorProto operator){
+
+        System.out.println("Type: " + operator.getType());
+        switch(operator.getType()){
+            case "source":
+                try {
+                    String source_path = operator.getPath();
+                    URL url = new File(source_path).toURI().toURL();
+                    return new TextFileSource(url.toString());
+                } catch (MalformedURLException e) {
+                    e.printStackTrace();
+                }
+                break;
+            case "sink":
+                try {
+                    String sink_path = operator.getPath();
+                    URL url = new File(sink_path).toURI().toURL();
+                    return new TextFileSink<String>(
+                            url.toString(),
+                            String.class
+                    );
+
+                } catch (MalformedURLException e) {
+                    e.printStackTrace();
+                }
+                break;
+            case "map_partition":
+                return new MapPartitionsOperator<>(
+                        new MapPartitionsDescriptor<String, String>(
+                                new WrappedPythonFunction<String, String>(
+                                        l -> l,
+                                        operator.getUdf()
+                                ),
+                                String.class,
+                                String.class
+                        )
+                );
+
+            case "union":
+                return new UnionAllOperator<String>(
+                        String.class
+                );
+
+        }
+
+        throw new WayangException("Operator Type not supported");
+    }
+
+    public WayangContext getWayangContext() {
+        return wayangContext;
+    }
+
+    public WayangPlan getWayangPlan() {
+        return wayangPlan;
+    }
+}
diff --git a/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java
new file mode 100644
index 0000000..4f48863
--- /dev/null
+++ b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.rest.server.spring.general;
+
+import com.google.protobuf.ByteString;
+import org.apache.wayang.api.python.function.WrappedPythonFunction;
+import org.apache.wayang.api.rest.server.spring.decoder.WayangPlanBuilder;
+import org.apache.wayang.basic.operators.*;
+import org.apache.wayang.commons.serializable.OperatorProto;
+import org.apache.wayang.commons.serializable.PlanProto;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.function.MapPartitionsDescriptor;
+import org.apache.wayang.core.plan.wayangplan.OperatorBase;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.wayang.core.plan.wayangplan.WayangPlan;
+import org.apache.wayang.java.Java;
+import org.apache.wayang.spark.Spark;
+
+import org.apache.wayang.commons.serializable.WayangPlanProto;
+import org.springframework.web.multipart.MultipartFile;
+
+
+@RestController
+public class WayangController {
+
+    @GetMapping("/plan/create/fromfile")
+    public String planFromFile(
+            //@RequestParam("file") MultipartFile file
+    ){
+
+        try {
+            FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
+            WayangPlanBuilder wpb = new WayangPlanBuilder(inputStream);
+
+            /*TODO ADD id to executions*/
+            wpb.getWayangContext().execute(wpb.getWayangPlan());
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        return "Builder works";
+    }
+
+    @PostMapping("/plan/create")
+    public String planFromMessage(
+            @RequestParam("message") String message
+    ){
+
+        WayangPlanBuilder wpb = new WayangPlanBuilder(message);
+
+        /*TODO ADD id to executions*/
+        wpb.getWayangContext().execute(wpb.getWayangPlan());
+
+        return "";
+    }
+
+    @GetMapping("/")
+    public String all(){
+        System.out.println("detected!");
+
+        try {
+            FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
+            WayangPlanProto plan = WayangPlanProto.parseFrom(inputStream);
+
+            WayangContext wc = buildContext(plan);
+            WayangPlan wp = buildPlan(plan);
+
+            System.out.println("Plan!");
+            System.out.println(wp.toString());
+
+            wc.execute(wp);
+            return("Works!");
+
+        } catch (FileNotFoundException e) {
+            e.printStackTrace();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        return "Not working";
+    }
+
+    private WayangContext buildContext(WayangPlanProto plan){
+
+        WayangContext ctx = new WayangContext();
+        plan.getContext().getPlatformsList().forEach(platform -> {
+            if (platform.getNumber() == 0)
+                ctx.with(Java.basicPlugin());
+            else if (platform.getNumber() == 1)
+                ctx.with(Spark.basicPlugin());
+        });
+        //ctx.with(Spark.basicPlugin());
+
+        return ctx;
+    }
+
+    private WayangPlan buildPlan(WayangPlanProto plan){
+
+        System.out.println(plan);
+
+        PlanProto planProto = plan.getPlan();
+        LinkedList<OperatorProto> protoList = new LinkedList<>();
+        planProto.getSourcesList().forEach(protoList::addLast);
+
+        Map<String, OperatorBase> operators = new HashMap<>();
+        List<OperatorBase> sinks = new ArrayList<>();
+        while(! protoList.isEmpty()) {
+
+            OperatorProto proto = protoList.pollFirst();
+
+            /* Checking if protoOperator can be connected to the current WayangPlan*/
+            boolean processIt;
+            if(proto.getType().equals("source")) processIt = true;
+
+            else {
+                /* Checking if ALL predecessors were already processed */
+                processIt = true;
+                for(String predecessor : proto.getPredecessorsList()){
+                    if (!operators.containsKey(predecessor)) {
+                        processIt = false;
+                        break;
+                    }
+                }
+            }
+
+            /* Operators must not be processed twice; drop duplicates instead of re-queueing them */
+            if(operators.containsKey(proto.getId())) continue;
+
+            if(processIt) {
+
+                /* Create and store Wayang operator */
+                OperatorBase operator = createOperatorByType(proto);
+                operators.put(proto.getId(), operator);
+
+                /* TODO: connecting to predecessors needs more detail about the connection slots */
+                int order = 0;
+                for (String pre_id : proto.getPredecessorsList()) {
+
+                    OperatorBase predecessor = operators.get(pre_id);
+                    /* Only works without replicate topology */
+                    predecessor.connectTo(0, operator, order);
+                    order++;
+
+                    /* A sink may have several predecessors;
+                     * register it only once */
+                    if(proto.getType().equals("sink")
+                            && !sinks.contains(operator)){
+                        sinks.add(operator);
+                    }
+                }
+
+                /* Collect the OperatorProto successors and enqueue them.
+                 * A successor is only processed once all of its
+                 * predecessors are in the operators map. */
+                List<OperatorProto> listSuccessors = planProto.getOperatorsList()
+                        .stream()
+                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
+                        .collect(Collectors.toList());
+                for (OperatorProto successor : listSuccessors){
+                    if(!protoList.contains(successor)){
+                        protoList.addLast(successor);
+                    }
+                }
+
+                List<OperatorProto> sinkSuccessors = planProto.getSinksList()
+                        .stream()
+                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
+                        .collect(Collectors.toList());
+                for (OperatorProto successor : sinkSuccessors){
+                    if(!protoList.contains(successor)){
+                        protoList.addLast(successor);
+                    }
+                }
+
+            } else {
+
+                /* Not processable yet: re-enqueue it at the end of the list */
+                protoList.addLast(proto);
+            }
+        }
+
+        WayangPlan wayangPlan = new WayangPlan(sinks.get(0));
+        return wayangPlan;
+    }
+
+    public OperatorBase createOperatorByType(OperatorProto operator){
+
+        System.out.println("Type: " + operator.getType());
+        switch(operator.getType()){
+            case "source":
+                try {
+                    String source_path = operator.getPath();
+                    URL url = new File(source_path).toURI().toURL();
+                    return new TextFileSource(url.toString());
+                } catch (MalformedURLException e) {
+                    e.printStackTrace();
+                }
+                break;
+            case "sink":
+                try {
+                    String sink_path = operator.getPath();
+                    URL url = new File(sink_path).toURI().toURL();
+                    return new TextFileSink<String>(
+                            url.toString(),
+                            String.class
+                    );
+
+                } catch (MalformedURLException e) {
+                    e.printStackTrace();
+                }
+                break;
+            case "reduce_by_key":
+                try {
+                    /* Function to be applied in Python workers */
+                    ByteString function = operator.getUdf();
+
+                    /* Dimensions or positions that compose the group key */
+                    Map<String, String> parameters = operator.getParametersMap();
+
+                    PyWayangReduceByOperator<String, String> op = new PyWayangReduceByOperator<>(
+                        parameters,
+                        function,
+                        String.class,
+                        String.class,
+                        false
+                    );
+                    /* TODO: op is built but not wired into the plan yet; a TextFileSink is returned for now */
+                    String sink_path = operator.getPath();
+                    URL url = new File(sink_path).toURI().toURL();
+                    return new TextFileSink<String>(
+                            url.toString(),
+                            String.class
+                    );
+
+                } catch (MalformedURLException e) {
+                    e.printStackTrace();
+                }
+                break;
+            case "map_partition":
+                return new MapPartitionsOperator<>(
+                    new MapPartitionsDescriptor<String, String>(
+                        new WrappedPythonFunction<String, String>(
+                            l -> l,
+                            operator.getUdf()
+                        ),
+                        String.class,
+                        String.class
+                    )
+                );
+
+            case "union":
+                return new UnionAllOperator<String>(
+                        String.class
+                );
+
+        }
+
+        throw new WayangException("Operator Type not supported");
+    }
+
+    public static URI createUri(String resourcePath) {
+        try {
+            return Thread.currentThread().getClass().getResource(resourcePath).toURI();
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Illegal URI.", e);
+        }
+
+    }
+
+}
diff --git a/wayang-api/wayang-api-rest/src/main/resources/application.properties b/wayang-api/wayang-api-rest/src/main/resources/application.properties
new file mode 100644
index 0000000..14b1a83
--- /dev/null
+++ b/wayang-api/wayang-api-rest/src/main/resources/application.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+server.port = 8080
diff --git a/wayang-commons/pom.xml b/wayang-commons/pom.xml
index a0f1d18..58db211 100644
--- a/wayang-commons/pom.xml
+++ b/wayang-commons/pom.xml
@@ -209,6 +209,7 @@
         <module>wayang-core</module>
         <module>wayang-basic</module>
         <module>wayang-utils-profile-db</module>
+        <module>wayang-serializable</module>
     </modules>
 
 </project>
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/PyWayangReduceByOperator.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/PyWayangReduceByOperator.java
new file mode 100644
index 0000000..1951b31
--- /dev/null
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/PyWayangReduceByOperator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.basic.operators;
+
+import com.google.protobuf.ByteString;
+import org.apache.wayang.core.plan.wayangplan.UnaryToUnaryOperator;
+import org.apache.wayang.core.types.DataSetType;
+
+import java.util.Map;
+
+public class PyWayangReduceByOperator<Type, Key> extends UnaryToUnaryOperator<Type, Type> {
+
+    protected final Map<String, String> parameters;
+    protected final ByteString reduceDescriptor;
+
+    public PyWayangReduceByOperator(Map<String, String> parameters,
+                                    ByteString reduceDescriptor,
+                                    DataSetType<Type> inputType, DataSetType<Type> outputType, boolean isSupportingBroadcastInputs) {
+        super(inputType, outputType, isSupportingBroadcastInputs);
+        this.parameters = parameters;
+        this.reduceDescriptor = reduceDescriptor;
+    }
+
+    public PyWayangReduceByOperator(Map<String, String> parameters,
+                                    ByteString reduceDescriptor,
+                                    Class<Type> inputType, Class<Type> outputType, boolean isSupportingBroadcastInputs) {
+        super(DataSetType.createDefault(inputType), DataSetType.createDefault(outputType), isSupportingBroadcastInputs);
+        this.parameters = parameters;
+        this.reduceDescriptor = reduceDescriptor;
+    }
+
+    public PyWayangReduceByOperator(PyWayangReduceByOperator<Type, Key> that) {
+        super(that);
+        this.parameters = that.getParameters();
+        this.reduceDescriptor = that.getReduceDescriptor();
+    }
+
+    public Map<String, String> getParameters() {
+        return this.parameters;
+    }
+
+    public ByteString getReduceDescriptor() {
+        return this.reduceDescriptor;
+    }
+
+}
\ No newline at end of file
diff --git a/wayang-commons/wayang-serializable/pom.xml b/wayang-commons/wayang-serializable/pom.xml
new file mode 100644
index 0000000..87b17ea
--- /dev/null
+++ b/wayang-commons/wayang-serializable/pom.xml
@@ -0,0 +1,131 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>wayang-commons</artifactId>
+        <groupId>org.apache.wayang</groupId>
+        <version>0.6.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>wayang-serializable</artifactId>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <protobuf.version>3.15.2</protobuf.version>
+        <protoc.version>3.15.2</protoc.version>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <extensions>
+            <extension>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>1.6.2</version>
+            </extension>
+        </extensions>
+        <plugins>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>0.5.1</version>
+                <configuration>
+                    <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>compile-python</goal>
+                            <goal>test-compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+        <!-- Copy all the protobuf code to the folder PyWayang -->
+        <resources>
+            <resource>
+                <!-- TODO: check whether these paths can be parameterized -->
+                <directory>${basedir}/target/generated-sources/protobuf/python</directory><!-- from -->
+                <targetPath>${basedir}/../../pywayang/protobuf</targetPath><!-- to -->
+                <includes><!-- Only python files -->
+                    <include>**/*.py</include>
+                </includes>
+            </resource>
+        </resources>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>protof-unix</id>
+            <activation>
+                <os>
+                    <family>unix</family>
+                </os>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.xolstice.maven.plugins</groupId>
+                        <artifactId>protobuf-maven-plugin</artifactId>
+                        <configuration>
+                            <!-- This one is for Linux installations -->
+                            <protocExecutable>/usr/bin/protoc</protocExecutable>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <profile>
+            <id>protof-mac</id>
+            <activation>
+                <os>
+                    <family>mac</family>
+                </os>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.xolstice.maven.plugins</groupId>
+                        <artifactId>protobuf-maven-plugin</artifactId>
+                        <configuration>
+                            <!-- This one is for Mac executions -->
+                            <protocExecutable>/usr/local/bin/protoc</protocExecutable>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
+</project>
\ No newline at end of file
diff --git a/wayang-commons/wayang-serializable/src/main/proto/pywayangplan.proto b/wayang-commons/wayang-serializable/src/main/proto/pywayangplan.proto
new file mode 100644
index 0000000..5fb373e
--- /dev/null
+++ b/wayang-commons/wayang-serializable/src/main/proto/pywayangplan.proto
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax="proto2";
+
+option java_multiple_files = true;
+option java_package = "org.apache.wayang.commons.serializable";
+
+/*Java Operator or concatenated Python Pipeline*/
+message OperatorProto{
+  optional string id = 1;
+  optional string type = 2;
+  optional bytes udf = 3;
+  optional string path = 4;
+  // repeated OperatorProto predecessors = 5;
+  repeated string predecessors = 5;
+  // repeated OperatorProto successors = 6;
+  repeated string successors = 6;
+  map<string, string> parameters = 7;
+}
+
+message PlanProto{
+
+  enum TypesProto{
+    string = 0;
+    integer = 1;
+  }
+
+  repeated OperatorProto sources = 1;
+  repeated OperatorProto operators = 2;
+  repeated OperatorProto sinks = 3;
+  optional TypesProto input = 4;
+  optional TypesProto output = 5;
+
+}
+
+message ContextProto{
+
+  enum PlatformProto{
+    java = 0;
+    spark = 1;
+  }
+
+  repeated PlatformProto platforms = 1;
+}
+
+message WayangPlanProto{
+
+  optional PlanProto plan = 1;
+  optional ContextProto context = 2;
+
+}
\ No newline at end of file
diff --git a/wayang-commons/wayang-serializable/src/main/python/pyplangenerator.sh b/wayang-commons/wayang-serializable/src/main/python/pyplangenerator.sh
new file mode 100644
index 0000000..f86efb5
--- /dev/null
+++ b/wayang-commons/wayang-serializable/src/main/python/pyplangenerator.sh
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+protoc -I=/Users/rodrigopardomeza/wayang/incubator-wayang/protobuf \
+--python_out=/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/protobuf/ \
+/Users/rodrigopardomeza/wayang/incubator-wayang/protobuf/pywayangplan.proto
\ No newline at end of file