#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import protobuf.pywayangplan_pb2 as pwb
import os
import cloudpickle
import logging
import requests
import base64
# Writes a Wayang plan assembled from several stages
class MessageWriter:
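    """Assembles a WayangPlanProto from sources, operators and sinks.

    Typical flow: process_pipeline() turns a staged operator graph into
    protobuf operators, set_dependencies() wires up predecessor/successor
    ids, and write_message()/send_message() serialize the resulting plan.
    """
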
    def __init__(self):
        # Instance-level state so that separate MessageWriter objects do not share plans
        self.sources = []
        self.operators = []
        self.sinks = []
        # Maps original node ids (as strings) to the protobuf operators that represent them
        self.operator_references = {}
        # Maps operator ids to their "start" (predecessor) and "end" (successor) node ids
        self.boundaries = {}
    # Creates and appends a source operator
def add_source(self, operator_id, operator_type, path):
source = pwb.OperatorProto()
source.id = str(operator_id)
source.type = operator_type
source.path = os.path.abspath(path)
        source.udf = chr(0).encode('utf-8')  # sources carry no UDF; store a single null byte as a placeholder
# source.parameters = {}
self.sources.append(source)
return source
    # Creates and appends a sink operator
def add_sink(self, operator_id, operator_type, path):
sink = pwb.OperatorProto()
sink.id = str(operator_id)
sink.type = operator_type
sink.path = os.path.abspath(path)
        sink.udf = chr(0).encode('utf-8')  # sinks carry no UDF; store a single null byte as a placeholder
# sink.parameters = {}
self.sinks.append(sink)
return sink
    # Creates and appends a Python operator.
    # Python operators don't require parameters; the UDF already contains the function ready to execute.
def add_operator(self, operator_id, operator_type, udf):
op = pwb.OperatorProto()
op.id = str(operator_id)
op.type = operator_type
op.udf = cloudpickle.dumps(udf)
op.path = str(None)
# op.parameters = {}
self.operators.append(op)
return op
# Creates and appends a Java operator
def add_java_operator(self, operator_id, operator_type, udf, parameters):
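        """Creates and appends an operator that is executed on the Java side.

        Parameter values are converted to strings before being stored in the
        operator's parameters map.
        """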
op = pwb.OperatorProto()
op.id = str(operator_id)
op.type = operator_type
op.udf = cloudpickle.dumps(udf)
op.path = str(None)
#op.parameters = parameters
for param in parameters:
            logging.debug("%s=%s", param, parameters[param])
op.parameters[param] = str(parameters[param])
self.operators.append(op)
return op
    # Receives a chain of operators and splits it into Wayang operators.
    # Compacts consecutive executable Python operators into a single map_partition Wayang operator.
def process_pipeline(self, stage):
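        """Walks the stage in reverse and turns each node into a protobuf operator.

        Consecutive Python-executable nodes are folded into one "map_partition"
        operator whose UDF is the composition of their UDFs; sources, sinks and
        Java operators are emitted individually.

        Illustrative example (hypothetical ids): a stage
        source(1) -> filter(2, Python) -> map(3, Python) -> sink(4)
        yields operators "1", "2,3" (a map_partition running filter then map) and "4".
        """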
nested_udf = None
nested_id = ""
nested_predecessors = None
nested_successors = None
for node in reversed(stage):
logging.debug(node.operator_type + " executable: " + str(node.python_exec) + " id: " + str(node.id))
if not node.python_exec:
if nested_udf is not None:
                    # Predecessors come from the last operator of the chain,
                    # successors from the first one.
op = self.add_operator(nested_id, "map_partition", nested_udf)
ids = str(nested_id).split(",")
for id in ids:
self.operator_references[str(id)] = op
self.boundaries[str(nested_id)] = {}
self.boundaries[str(nested_id)]["end"] = nested_successors
self.boundaries[str(nested_id)]["start"] = nested_predecessors
nested_udf = None
nested_id = ""
nested_predecessors = None
nested_successors = None
if node.operator.source:
op = self.add_source(node.id, node.operator_type, node.operator.udf)
self.operator_references[str(node.id)] = op
self.boundaries[str(node.id)] = {}
self.boundaries[str(node.id)]["end"] = node.successors.keys()
elif node.operator.sink:
op = self.add_sink(node.id, node.operator_type, node.operator.udf)
self.operator_references[str(node.id)] = op
self.boundaries[str(node.id)] = {}
self.boundaries[str(node.id)]["start"] = node.predecessors.keys()
                # Regular operator to be processed in Java.
                # Note that these may carry additional parameters for the Java side.
else:
op = self.add_java_operator(node.id, node.operator_type, node.operator.udf, node.operator.parameters)
self.operator_references[str(node.id)] = op
self.boundaries[str(node.id)] = {}
self.boundaries[str(node.id)]["start"] = node.predecessors.keys()
self.boundaries[str(node.id)]["end"] = node.successors.keys()
else:
if nested_udf is None:
nested_udf = node.operator.udf
nested_id = node.id
# It is the last operator to execute in the map partition
nested_successors = node.successors.keys()
else:
nested_udf = self.concatenate(nested_udf, node.operator.udf)
nested_id = str(node.id) + "," + str(nested_id)
                # Each iteration records the earliest predecessors seen so far
nested_predecessors = node.predecessors.keys()
        # Flush any remaining Python operators, in case a pipeline starts with Python operators
        if nested_udf is not None:
            op = self.add_operator(nested_id, "map_partition", nested_udf)
            ids = str(nested_id).split(",")
            for id in ids:
                self.operator_references[str(id)] = op
            self.boundaries[str(nested_id)] = {}
            self.boundaries[str(nested_id)]["end"] = nested_successors
            self.boundaries[str(nested_id)]["start"] = nested_predecessors
    # Takes two functions and composes them into a single function
@staticmethod
def concatenate(function_a, function_b):
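        """Composes two functions: concatenate(f, g)(data) == f(g(data)).

        function_b runs first; because process_pipeline traverses the stage in
        reverse, the earlier pipeline step becomes function_b and is applied first.
        """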
def executable(iterable):
return function_a(function_b(iterable))
return executable
    # Sets the dependencies between the final Wayang operators
def set_dependencies(self):
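        """Translates the recorded boundaries into predecessor/successor id lists.

        The original node ids stored under "start" and "end" are resolved through
        operator_references, so merged map_partition operators are referenced by
        their combined proto id.
        """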
for source in self.sources:
if 'end' in self.boundaries[source.id]:
op_successors = []
for op_id in self.boundaries[source.id]['end']:
op_successors.append(str(self.operator_references[str(op_id)].id))
source.successors.extend(op_successors)
for sink in self.sinks:
if 'start' in self.boundaries[sink.id]:
op_predecessors = []
for op_id in self.boundaries[sink.id]['start']:
op_predecessors.append(str(self.operator_references[str(op_id)].id))
sink.predecessors.extend(op_predecessors)
for op in self.operators:
if 'start' in self.boundaries[op.id]:
op_predecessors = []
for op_id in self.boundaries[op.id]['start']:
op_predecessors.append(str(self.operator_references[str(op_id)].id))
op.predecessors.extend(op_predecessors)
if 'end' in self.boundaries[op.id]:
op_successors = []
for op_id in self.boundaries[op.id]['end']:
op_successors.append(str(self.operator_references[str(op_id)].id))
op.successors.extend(op_successors)
# Writes the message to a local directory
def write_message(self, descriptor):
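        """Serializes the assembled WayangPlanProto to a local file.

        Note that finalpath is relative, so the output location depends on the
        current working directory of the caller.
        """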
finalpath = "../../protobuf/wayang_message"
plan_configuration = pwb.WayangPlanProto()
        try:
            with open(finalpath, "rb") as f:
                plan_configuration.ParseFromString(f.read())
        except IOError:
            logging.warning("File " + finalpath + " did not exist; a new file will be generated.")
plan = pwb.PlanProto()
plan.sources.extend(self.sources)
plan.operators.extend(self.operators)
plan.sinks.extend(self.sinks)
plan.input = pwb.PlanProto.string
plan.output = pwb.PlanProto.string
ctx = pwb.ContextProto()
# ctx.platforms.extend([pwb.ContextProto.PlatformProto.java])
for plug in descriptor.plugins:
ctx.platforms.append(plug.value)
# ctx.platforms.extend(descriptor.get_plugins())
plan_configuration.plan.CopyFrom(plan)
plan_configuration.context.CopyFrom(ctx)
f = open(finalpath, "wb")
f.write(plan_configuration.SerializeToString())
f.close()
pass
# Send message as bytes to the Wayang Rest API
def send_message(self, descriptor):
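        """Serializes the plan, base64-encodes it and POSTs it to the Wayang REST API.

        The endpoint (http://localhost:8080/plan/create) is currently hard-coded.
        """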
plan_configuration = pwb.WayangPlanProto()
plan = pwb.PlanProto()
plan.sources.extend(self.sources)
plan.operators.extend(self.operators)
plan.sinks.extend(self.sinks)
plan.input = pwb.PlanProto.string
plan.output = pwb.PlanProto.string
ctx = pwb.ContextProto()
# ctx.platforms.extend([pwb.ContextProto.PlatformProto.java])
for plug in descriptor.plugins:
ctx.platforms.append(plug.value)
# ctx.platforms.extend(descriptor.get_plugins())
plan_configuration.plan.CopyFrom(plan)
plan_configuration.context.CopyFrom(ctx)
print("plan!")
print(plan_configuration)
msg_bytes = plan_configuration.SerializeToString()
msg_64 = base64.b64encode(msg_bytes)
logging.debug(msg_bytes)
# response = requests.get("http://localhost:8080/plan/create/fromfile")
data = {
'message': msg_64
}
response = requests.post("http://localhost:8080/plan/create", data)
logging.debug(response)
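
# Illustrative usage sketch (hypothetical names: `stage` is a list of graph nodes
# exposing id/operator/operator_type/python_exec/predecessors/successors, and
# `descriptor` exposes a `plugins` iterable whose items carry a `.value`):
#
#   writer = MessageWriter()
#   writer.process_pipeline(stage)
#   writer.set_dependencies()
#   writer.send_message(descriptor)  # or writer.write_message(descriptor)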