blob: 649c5efad9b756cc726325400bdfc2c8d91d583c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.api.rest.server.spring.decoder;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.wayang.api.python.function.WrappedPythonFunction;
import org.apache.wayang.basic.operators.MapPartitionsOperator;
import org.apache.wayang.basic.operators.TextFileSink;
import org.apache.wayang.basic.operators.TextFileSource;
import org.apache.wayang.basic.operators.UnionAllOperator;
import org.apache.wayang.commons.serializable.OperatorProto;
import org.apache.wayang.commons.serializable.PlanProto;
import org.apache.wayang.commons.serializable.WayangPlanProto;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.function.MapPartitionsDescriptor;
import org.apache.wayang.core.plan.wayangplan.OperatorBase;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.*;
import java.util.stream.Collectors;
import java.util.Base64;
/**
 * Decodes a binary-serialized {@link WayangPlanProto} into an executable {@link WayangPlan}
 * together with the {@link WayangContext} (registered platform plugins) it should run on.
 *
 * <p>Construction is eager: the protobuf message is parsed and both the context and the plan are
 * built inside the constructor. Parse failures and malformed plans are reported as
 * {@link WayangException} instead of leaving the builder half-initialised.
 */
public class WayangPlanBuilder {

    private final WayangPlan wayangPlan;
    private final WayangContext wayangContext;

    /**
     * Builds the plan from a binary-serialized {@link WayangPlanProto} read from {@code planFile}.
     * This builder does not close the stream; that remains the caller's responsibility.
     *
     * @param planFile stream containing the serialized plan
     * @throws WayangException if the stream cannot be read or parsed, or the plan is malformed
     */
    public WayangPlanBuilder(FileInputStream planFile) {
        this(parsePlanStream(planFile));
    }

    /**
     * Builds the plan from a Base64-encoded, binary-serialized {@link WayangPlanProto}.
     *
     * @param writtenPlan Base64 text of the serialized plan
     * @throws IllegalArgumentException if {@code writtenPlan} is not valid Base64
     * @throws WayangException if the decoded bytes are not a valid plan, or the plan is malformed
     */
    public WayangPlanBuilder(String writtenPlan) {
        this(parseBase64Plan(writtenPlan));
    }

    /** Shared initialisation for both public constructors. */
    private WayangPlanBuilder(WayangPlanProto plan) {
        this.wayangContext = buildContext(plan);
        this.wayangPlan = buildPlan(plan);
    }

    /** Parses a serialized plan from a stream, converting I/O failures into WayangException. */
    private static WayangPlanProto parsePlanStream(FileInputStream planFile) {
        try {
            return WayangPlanProto.parseFrom(planFile);
        } catch (IOException e) {
            throw new WayangException("Could not read the Wayang plan from the given file", e);
        }
    }

    /** Decodes Base64 text and parses it as a serialized plan. */
    private static WayangPlanProto parseBase64Plan(String writtenPlan) {
        byte[] message = Base64.getDecoder().decode(writtenPlan);
        try {
            return WayangPlanProto.parseFrom(message);
        } catch (InvalidProtocolBufferException e) {
            throw new WayangException("Could not parse the Base64-encoded Wayang plan", e);
        }
    }

    /**
     * Creates a context with one plugin per platform listed in the plan's context.
     * Platform number 0 registers the Java basic plugin and 1 the Spark basic plugin; any other
     * value is ignored (no plugin is registered for it).
     */
    private static WayangContext buildContext(WayangPlanProto plan) {
        WayangContext ctx = new WayangContext();
        plan.getContext().getPlatformsList().forEach(platform -> {
            if (platform.getNumber() == 0) {
                ctx.with(Java.basicPlugin());
            } else if (platform.getNumber() == 1) {
                ctx.with(Spark.basicPlugin());
            }
        });
        return ctx;
    }

    /**
     * Materialises the operator graph described by the plan, starting from its sources.
     *
     * <p>Operators are built in queue order. An operator is built only once every one of its
     * predecessors exists. An operator that is not ready yet is moved to the back of the queue.
     * When a whole pass over the queue makes no progress, some predecessor can never be built, and
     * the method fails instead of looping forever.
     *
     * @throws WayangException if predecessors cannot be resolved or the plan has no sink
     */
    private WayangPlan buildPlan(WayangPlanProto plan) {
        PlanProto planProto = plan.getPlan();
        Deque<OperatorProto> pending = new ArrayDeque<>(planProto.getSourcesList());
        Map<String, OperatorBase> operators = new HashMap<>();
        List<OperatorBase> sinks = new ArrayList<>();
        // Number of consecutive deferrals since the last change. Once it reaches the queue size,
        // every queued entry has been checked against the current state and none can proceed.
        int deferredInARow = 0;

        while (!pending.isEmpty()) {
            OperatorProto proto = pending.pollFirst();

            if (operators.containsKey(proto.getId())) {
                // Operators must not be built twice; drop the duplicate rather than re-queueing it.
                deferredInARow = 0;
                continue;
            }

            if (!allPredecessorsBuilt(proto, operators)) {
                pending.addLast(proto);
                deferredInARow++;
                if (deferredInARow >= pending.size()) {
                    throw new WayangException("Cannot resolve the predecessors of operator(s): "
                            + pending.stream().map(OperatorProto::getId).collect(Collectors.joining(", ")));
                }
                continue;
            }
            deferredInARow = 0;

            OperatorBase operator = createOperatorByType(proto);
            operators.put(proto.getId(), operator);

            // TODO: connecting properly needs more detail about connection slots. Each predecessor
            // is wired from its output slot 0 into the next input slot of this operator, so this
            // only works without replicate topologies.
            int inputIndex = 0;
            for (String predecessorId : proto.getPredecessorsList()) {
                operators.get(predecessorId).connectTo(0, operator, inputIndex++);
            }

            // Register each sink exactly once, regardless of how many predecessors it has.
            if ("sink".equals(proto.getType())) {
                sinks.add(operator);
            }

            // Successors are queued now but are only built once all their parents exist.
            enqueueSuccessors(proto, planProto.getOperatorsList(), pending, operators);
            enqueueSuccessors(proto, planProto.getSinksList(), pending, operators);
        }

        if (sinks.isEmpty()) {
            throw new WayangException("The plan does not contain any reachable sink operator");
        }
        return new WayangPlan(sinks.toArray(new OperatorBase[0]));
    }

    /** Returns true when every predecessor id of {@code proto} already has a built operator. */
    private static boolean allPredecessorsBuilt(OperatorProto proto, Map<String, OperatorBase> built) {
        for (String predecessorId : proto.getPredecessorsList()) {
            if (!built.containsKey(predecessorId)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Appends to {@code pending} the candidates (in candidate-list order) that are direct
     * successors of {@code proto}, skipping any that are already queued or already built.
     */
    private static void enqueueSuccessors(OperatorProto proto, List<OperatorProto> candidates,
                                          Deque<OperatorProto> pending, Map<String, OperatorBase> built) {
        List<String> successorIds = proto.getSuccessorsList();
        for (OperatorProto candidate : candidates) {
            if (successorIds.contains(candidate.getId())
                    && !built.containsKey(candidate.getId())
                    && !pending.contains(candidate)) {
                pending.addLast(candidate);
            }
        }
    }

    /**
     * Creates the Wayang operator corresponding to the protobuf operator's type tag.
     * Supported tags are "source", "sink", "map_partition" and "union"; all operate on Strings.
     *
     * @param operator protobuf description of the operator
     * @return the new, unconnected operator
     * @throws WayangException if the type is unsupported or its file path cannot be turned into a URL
     */
    public OperatorBase createOperatorByType(OperatorProto operator) {
        String type = operator.getType();
        switch (type) {
            case "source":
                return new TextFileSource(toFileUrl(operator.getPath()));
            case "sink":
                return new TextFileSink<String>(toFileUrl(operator.getPath()), String.class);
            case "map_partition":
                return new MapPartitionsOperator<>(
                        new MapPartitionsDescriptor<String, String>(
                                // The identity lambda is paired with the serialized Python UDF bytes.
                                new WrappedPythonFunction<String, String>(l -> l, operator.getUdf()),
                                String.class,
                                String.class
                        )
                );
            case "union":
                return new UnionAllOperator<String>(String.class);
            default:
                throw new WayangException("Operator Type not supported: " + type);
        }
    }

    /** Converts a local file path into its {@code file:} URL string. */
    private static String toFileUrl(String path) {
        try {
            return new File(path).toURI().toURL().toString();
        } catch (MalformedURLException e) {
            throw new WayangException("Invalid file path for operator: " + path, e);
        }
    }

    public WayangContext getWayangContext() {
        return wayangContext;
    }

    public WayangPlan getWayangPlan() {
        return wayangPlan;
    }
}