blob: 4f488635099a4e5446f2253f937c02b79c7cb1b2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.api.rest.server.spring.general;
import com.google.protobuf.ByteString;
import org.apache.wayang.api.python.function.WrappedPythonFunction;
import org.apache.wayang.api.rest.server.spring.decoder.WayangPlanBuilder;
import org.apache.wayang.basic.operators.*;
import org.apache.wayang.commons.serializable.OperatorProto;
import org.apache.wayang.commons.serializable.PlanProto;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.function.MapPartitionsDescriptor;
import org.apache.wayang.core.plan.wayangplan.OperatorBase;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;
import org.apache.wayang.commons.serializable.WayangPlanProto;
import org.springframework.web.multipart.MultipartFile;
@RestController
public class WayangController {
@GetMapping("/plan/create/fromfile")
public String planFromFile(
//@RequestParam("file") MultipartFile file
){
try {
FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
WayangPlanBuilder wpb = new WayangPlanBuilder(inputStream);
/*TODO ADD id to executions*/
wpb.getWayangContext().execute(wpb.getWayangPlan());
} catch (IOException e) {
e.printStackTrace();
}
return "Builder works";
}
@PostMapping("/plan/create")
public String planFromMessage(
@RequestParam("message") String message
){
WayangPlanBuilder wpb = new WayangPlanBuilder(message);
/*TODO ADD id to executions*/
wpb.getWayangContext().execute(wpb.getWayangPlan());
return "";
}
@GetMapping("/")
public String all(){
System.out.println("detected!");
try {
FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
WayangPlanProto plan = WayangPlanProto.parseFrom(inputStream);
WayangContext wc = buildContext(plan);
WayangPlan wp = buildPlan(plan);
System.out.println("Plan!");
System.out.println(wp.toString());
wc.execute(wp);
return("Works!");
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return "Not working";
}
private WayangContext buildContext(WayangPlanProto plan){
WayangContext ctx = new WayangContext();
plan.getContext().getPlatformsList().forEach(platform -> {
if (platform.getNumber() == 0)
ctx.with(Java.basicPlugin());
else if (platform.getNumber() == 1)
ctx.with(Spark.basicPlugin());
});
//ctx.with(Spark.basicPlugin());
return ctx;
}
private WayangPlan buildPlan(WayangPlanProto plan){
System.out.println(plan);
PlanProto planProto = plan.getPlan();
LinkedList<OperatorProto> protoList = new LinkedList<>();
planProto.getSourcesList().forEach(protoList::addLast);
Map<String, OperatorBase> operators = new HashMap<>();
List<OperatorBase> sinks = new ArrayList<>();
while(! protoList.isEmpty()) {
OperatorProto proto = protoList.pollFirst();
/* Checking if protoOperator can be connected to the current WayangPlan*/
boolean processIt;
if(proto.getType().equals("source")) processIt = true;
else {
/* Checking if ALL predecessors were already processed */
processIt = true;
for(String predecessor : proto.getPredecessorsList()){
if (!operators.containsKey(predecessor)) {
processIt = false;
break;
}
}
}
/* Operators should not be processed twice*/
if(operators.containsKey(proto.getId())) processIt = false;
if(processIt) {
/* Create and store Wayang operator */
OperatorBase operator = createOperatorByType(proto);
operators.put(proto.getId(), operator);
/*TODO Connect with predecessors requires more details in connection slot*/
int order = 0;
for (String pre_id : proto.getPredecessorsList()) {
OperatorBase predecessor = operators.get(pre_id);
/* Only works without replicate topology */
predecessor.connectTo(0, operator, order);
order++;
if(proto.getType().equals("sink")){
sinks.add(operator);
//if(!sinks.contains(operator)) {
// sinks.add(operator);
//}
}
}
/*List of OperatorProto successors
* They will be added to the protoList
* nevertheless they must be processed only if the parents are in operators list */
List<OperatorProto> listSuccessors = planProto.getOperatorsList()
.stream()
.filter(t -> proto.getSuccessorsList().contains(t.getId()))
.collect(Collectors.toList());
for (OperatorProto successor : listSuccessors){
if(!protoList.contains(successor)){
protoList.addLast(successor);
}
}
List<OperatorProto> sinkSuccessors = planProto.getSinksList()
.stream()
.filter(t -> proto.getSuccessorsList().contains(t.getId()))
.collect(Collectors.toList());
for (OperatorProto successor : sinkSuccessors){
if(!protoList.contains(successor)){
protoList.addLast(successor);
}
}
} else {
/* In case we cannot process it yet, It must be added again at the end*/
protoList.addLast(proto);
}
}
WayangPlan wayangPlan = new WayangPlan(sinks.get(0));
return wayangPlan;
}
public OperatorBase createOperatorByType(OperatorProto operator){
System.out.println("Typo: " + operator.getType());
switch(operator.getType()){
case "source":
try {
String source_path = operator.getPath();
URL url = new File(source_path).toURI().toURL();
return new TextFileSource(url.toString());
} catch (MalformedURLException e) {
e.printStackTrace();
}
break;
case "sink":
try {
String sink_path = operator.getPath();
URL url = new File(sink_path).toURI().toURL();
return new TextFileSink<String>(
url.toString(),
String.class
);
} catch (MalformedURLException e) {
e.printStackTrace();
}
break;
case "reduce_by_key":
try {
/* Function to be applied in Python workers */
ByteString function = operator.getUdf();
/* Has dimension or positions that compose GroupKey */
Map<String, String> parameters = operator.getParametersMap();
PyWayangReduceByOperator<String, String> op = new PyWayangReduceByOperator(
operator.getParametersMap(),
operator.getUdf() ,
String.class,
String.class,
false
);
String sink_path = operator.getPath();
URL url = new File(sink_path).toURI().toURL();
return new TextFileSink<String>(
url.toString(),
String.class
);
} catch (MalformedURLException e) {
e.printStackTrace();
}
break;
case "map_partition":
return new MapPartitionsOperator<>(
new MapPartitionsDescriptor<String, String>(
new WrappedPythonFunction<String, String>(
l -> l,
operator.getUdf()
),
String.class,
String.class
)
);
case "union":
return new UnionAllOperator<String>(
String.class
);
}
throw new WayangException("Operator Type not supported");
}
public static URI createUri(String resourcePath) {
try {
return Thread.currentThread().getClass().getResource(resourcePath).toURI();
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Illegal URI.", e);
}
}
}