| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.mrql; |
| |
| import org.apache.mrql.gen.*; |
| import java.util.*; |
| import java.io.*; |
| |
| |
| /** Generate a physical plan from an algebraic expression */ |
| final public class PlanGeneration extends AlgebraicOptimization { |
| |
/** Return the monoid associated with the aggregation call e, or none if no registered monoid matches */
| private static Tree get_monoid ( Tree e ) { |
| match e { |
| case call(`f,`u): |
| Tree etp = #<none>; |
| match TypeInference.type_inference2(u) { |
| case `S(`tp): etp = tp; |
| case _: return #<none>; |
| }; |
| for ( Tree monoid: monoids ) // system & user-defined aggregations |
| match monoid { |
| case `aggr(`mtp,`plus,`zero,`unit): |
| if (!aggr.equals(f.toString())) |
| continue; |
| if (TypeInference.unify(etp,mtp) == null) |
| continue; |
| return monoid; |
| } |
| }; |
| return #<none>; |
| } |
| |
| /** extract the combiner from the reducer in a MapReduce plan */ |
| static class Aggregates { |
| public static Trees maps = #[]; |
| public static Trees combines = #[]; |
| public static Trees reduces = #[]; |
| public static boolean can_use_combiner = true; |
| private static SymbolTable st = new SymbolTable(); |
| |
| private static void clear () { |
| maps = #[]; |
| combines = #[]; |
| reduces = #[]; |
| can_use_combiner = true; |
| } |
| |
/** Extend the combines, maps, and reduces lists with new entries and return the entry's index */
private static int union_aggregates ( Tree reduce, Tree map, Tree combine ) {
| Tree m = simplify_all(map); |
| Tree c = simplify_all(combine); |
| Tree rd = simplify_all(reduce); |
| int i = 0; |
| for ( Trees r = reduces; !r.is_empty(); r = r.tail(), i++ ) |
| if (alpha_equivalent(rd,r.head())) |
| return i; |
| maps = maps.append(m); |
| reduces = reduces.append(rd); |
| combines = combines.append(subst(#<-1>,#<`i>,c)); |
| return i; |
| } |
| |
/** Generate the MR combiner from the MR reducer.
 * Find the aggregation calls (e.g., call(avg,_)) in the reducer body
 * @param e the body of the reducer
 * @param map the map function
 * @param mvar the variable of the map function
 * @param rvar the variable of the reducer function
 * @return the combiner; as a side-effect, it extends the combines, maps, and reduces lists
 */
| private static Tree derive_combiner ( Tree e, Tree map, Tree mvar, Tree rvar ) { |
| Tree gvar = #<nth(`rvar,1)>; |
| match e { |
| case call(`f,`u): |
| match get_monoid(e) { |
| case `nm(`mtp,`plus,`zero,`unit): |
| match u { |
| case cmap(`m,`v): |
| if (!v.equals(gvar) || occurences(rvar,m) > 0) |
| fail; |
| Tree ev = new_var(); |
| Tree nv = new_var(); |
| Tree mv = new_var(); |
int i = union_aggregates(e,
| #<aggregate(lambda(`ev,apply(`plus,tuple(nth(`ev,0),apply(`unit,nth(`ev,1))))), |
| `zero,cmap(`m,cmap(lambda(x,bag(nth(x,1))),apply(`map,`mvar))))>, |
| #<aggregate(lambda(`nv,apply(`plus,tuple(nth(`nv,0), |
| nth(nth(`nv,1),-1)))), |
| `zero,`gvar)>); |
| return simplify_all(#<aggregate(lambda(`mv,apply(`plus,tuple(nth(`mv,0), |
| nth(nth(`mv,1),`i)))), |
| `zero,`gvar)>); |
| case `v: |
| if (!v.equals(gvar)) |
| fail; |
| Tree ev = new_var(); |
| Tree nv = new_var(); |
| Tree mv = new_var(); |
int i = union_aggregates(e,
| #<aggregate(lambda(`ev,apply(`plus,tuple(nth(`ev,0),apply(`unit,nth(`ev,1))))), |
| `zero,cmap(lambda(x,bag(nth(x,1))),apply(`map,`mvar)))>, |
| #<aggregate(lambda(`nv,apply(`plus,tuple(nth(`nv,0),nth(nth(`nv,1),-1)))), |
| `zero,`gvar)>); |
| return simplify_all(#<aggregate(lambda(`mv,apply(`plus,tuple(nth(`mv,0), |
| nth(nth(`mv,1),`i)))), |
| `zero,`gvar)>); |
| } |
| }; |
| fail |
| case nth(`v,0): |
| if (v.is_variable()) |
| return e; |
| else fail |
| case `f(...al): |
| Trees rs = #[]; |
| for ( Tree a: al ) |
| rs = rs.append(derive_combiner(a,map,mvar,rvar)); |
| return #<`f(...rs)>; |
| }; |
| if (#<nth(`e,1)>.equals(gvar)) |
| Aggregates.can_use_combiner = false; |
| return e; |
| } |
| } |
| |
/** How many times does e access the bag x? If more than once, x cannot be streamed */
| private static int number_of_accesses ( Tree x, Tree e ) { |
| if (e.equals(x)) |
| return 1; |
| match e { |
| case cmap(`m,`s): |
| return number_of_accesses(x,m)*10+number_of_accesses(x,s); |
| case map(`m,`s): |
| return number_of_accesses(x,m)*10+number_of_accesses(x,s); |
| case filter(`p,`m,`s): |
| return number_of_accesses(x,p)*10+number_of_accesses(x,m)*10+number_of_accesses(x,s); |
| case `f(...r): |
| int i = 0; |
| for ( Tree z: r ) |
| i += number_of_accesses(x,z); |
| return i; |
| }; |
| return 0; |
| } |
| |
/** can we process the second arg of the MapReduce reducer (a bag) as a stream? */
| public static boolean streamed_MapReduce_reducer ( Tree x ) { |
| match x { |
| case lambda(`v,`b): |
| return number_of_accesses(#<nth(`v,1)>,b) <= 1; |
| case compiled(_,lambda(`v,`b)): |
| return number_of_accesses(#<nth(`v,1)>,b) <= 1; |
| }; |
| return false; |
| } |
| |
/** can we process the first arg of the MapReduce2 reducer (a bag) as a stream? */
| public static boolean streamed_MapReduce2_reducer ( Tree x ) { |
| match x { |
| case lambda(`v,`b): |
| return number_of_accesses(#<nth(`v,0)>,b) <= 1; |
| case compiled(_,lambda(`v,`b)): |
| return number_of_accesses(#<nth(`v,0)>,b) <= 1; |
| }; |
| return false; |
| } |
| |
| /** true if e returns a dataset stored in HDFS */ |
| public static boolean is_dataset_expr ( Tree e ) { |
| match TypeInference.type_inference2(e) { |
| case `T(_): |
| if (is_persistent_collection(T)) |
| return true; |
| }; |
| return false; |
| } |
| |
| private static boolean contains_without ( Tree e, Tree with, Tree without ) { |
| if (e.equals(with)) |
| return false; |
| if (e.equals(without)) |
| return true; |
| match e { |
| case `f(...r): |
| for ( Tree a: r ) |
| if (contains_without(a,with,without)) |
| return true; |
| }; |
| return false; |
| } |
| |
| private static Trees extract_combiners ( Tree e, Tree with, Tree var ) { |
| Trees res = #[ ]; |
| match e { |
| case call(`c,`u): |
| if (contains_without(e,with,var) || occurences(with,e) == 0) |
| fail; |
| if (get_monoid(e).equals(#<none>)) |
| fail; |
| return #[`e]; |
| case `f(...r): |
| for ( Tree a: r ) |
| res = res.append(extract_combiners(a,with,var)); |
| }; |
| return res; |
| } |
| |
| private static Tree get_nested_query ( Tree var, Tree e ) { |
| match e { |
| case MapReduce(lambda(`v,`b),`r,`s,...): |
| if (!repeat_variables.member(s)) |
| fail; |
| if (!free_variables(b,#[`var,`v]).is_empty()) |
| fail; |
| return e; |
| case mapReduce(lambda(`v,`b),`r,`s,...): |
| if (!repeat_variables.member(s)) |
| fail; |
| if (!free_variables(b,#[`var,`v]).is_empty()) |
| fail; |
| return e; |
| case `f(...r): |
| for ( Tree a: r ) |
| match get_nested_query(var,a) { |
| case none: ; |
| case `c: |
| return c; |
| } |
| }; |
| return #<none>; |
| } |
| |
/** compile an algebraic form to an algebraic plan
| * @param e the algebraic form |
| * @return the algebraic plan |
| */ |
| public static Tree makePlan ( Tree e ) { |
| match e { |
// combine a groupBy over a join into a groupByJoin (generalized matrix multiplication)
| case mapReduce(lambda(`vm,bag(`bm)),lambda(`vr,bag(`br)),`s,`o): |
| if (!bm.equals(vm) || !is_dataset_expr(s)) |
| fail; |
| match s { |
| case mapReduce2(lambda(`mvx,bag(tuple(`jx,`mx))), |
| lambda(`mvy,bag(tuple(`jy,`my))), |
lambda(`v,cmap(lambda(`x,cmap(lambda(`y,bag(tuple(tuple(`gx,`gy),`mxy))),
nth(`vy,1))),
nth(`vx,0))),
| `X,`Y,`o2): |
| if (!vx.equals(v) || !vy.equals(v) || !mx.equals(mvx) || !my.equals(mvy)) |
| fail; |
| Tree gxx = gx; |
| Tree gyy = gy; |
| if (free_variables(gx,#[`y]).is_empty() && free_variables(gy,#[`x]).is_empty()) { |
| gxx = gy; |
| gyy = gx; |
| } else if (!free_variables(gx,#[`x]).is_empty() || !free_variables(gy,#[`y]).is_empty()) |
| fail; |
| Trees combiners = extract_combiners(br,#<nth(`vr,1)>,vr); |
| if (combiners.is_empty()) |
| fail; |
| Tree nvc = new_var(); |
| Tree nvr = new_var(); |
| Tree nm = subst(x,#<nth(nth(`nvc,1),0)>,subst(y,#<nth(nth(`nvc,1),1)>,mxy)); |
| int i = 0; |
| Trees accs = #[ ]; |
| Trees zeros = #[ ]; |
| for ( Tree combiner: combiners ) |
| match combiner { |
| case call(_,cmap(lambda(`vc,bag(`u)),_)): |
| match get_monoid(combiner) { |
| case `aggr(`mtp,`plus,`zero,`unit): |
| br = subst(combiner,#<nth(nth(`nvr,1),`i)>,br); |
| zeros = zeros.append(zero); |
| accs = accs.append(#<apply(`plus,tuple(nth(nth(`nvc,0),`i), |
| apply(`unit,apply(lambda(`vc,`u),`nm))))>); |
| i++; |
| } |
| case _: throw new Error("Unrecognized aggregation: "+combiner); |
| }; |
| type_env.insert(nvc.toString(),TypeInference.type_inference(#<tuple(tuple(...zeros),tuple(`x,`y))>)); |
| type_env.insert(nvr.toString(), |
| TypeInference.type_inference(#<tuple(nth(`vr,0),tuple(...zeros))>)); |
| Tree reducer = makePlan(subst(#<nth(`vr,0)>,#<nth(`nvr,0)>,br)); |
| if (!free_variables(reducer,#[`nvr]).is_empty()) |
| fail; |
| Tree accumulator = makePlan(simplify_all(#<tuple(...accs)>)); |
| Tree zero = makePlan(#<tuple(...zeros)>); |
| return #<GroupByJoin(lambda(`mvx,`(makePlan(jx))), |
| lambda(`mvy,`(makePlan(jy))), |
| lambda(`x,`(makePlan(gxx))), |
| lambda(`y,`(makePlan(gyy))), |
| lambda(`nvc,`accumulator), |
| `zero, |
| lambda(`nvr,`reducer), |
| `(makePlan(X)), |
| `(makePlan(Y)),`o)>; |
| }; |
| fail |
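// Schematically, a query such as matrix multiplication
//     sum(x.a*y.b) grouped by (x.i,y.j) over a join of X and Y on x.k = y.k
// matches the pattern above and compiles into a single
// GroupByJoin(jx,jy,gx,gy,accumulator,zero,reducer,X,Y) instead of a mapReduce
// over a mapReduce2.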
| case mapReduce(lambda(`vm,`bm),`nr,`s,`o): |
| if (!is_dataset_expr(s) || Config.bsp_mode) |
| fail; |
| Tree pm = makePlan(bm); |
| match get_nested_query(vm,pm) { |
| case MapReduce(`km,`kr,`ks,`ko): |
| Tree nv = new_var(); |
| pm = subst(#<MapReduce(`km,`kr,`ks,`ko)>, |
| #<mapReduce(`km,`kr,`nv,`ko)>, |
| pm); // must be done in memory |
| TypeInference.global_type_env.insert(nv.toString(),TypeInference.type_inference(ks)); |
| return #<let(`nv,DataSetCollect(`ks), |
| MapReduce(lambda(`vm,`pm), |
| `(makePlan(nr)), |
| `(makePlan(s)), |
| `o))>; |
| case mapReduce(`km,`kr,`ks,`ko): |
| Tree nv = new_var(); |
| pm = subst(ks,nv,pm); |
| TypeInference.global_type_env.insert(nv.toString(),TypeInference.type_inference(ks)); |
| return #<let(`nv,DataSetCollect(`ks), |
| MapReduce(lambda(`vm,`pm), |
| `(makePlan(nr)), |
| `(makePlan(s)), |
| `o))>; |
| }; |
| fail |
| // extract the mapReduce combiner |
| case mapReduce(lambda(`vm,`bm),lambda(`vr,`br),`s,`o): |
| if (!Config.use_combiner || !is_dataset_expr(s)) |
| fail; |
| TypeInference.type_inference(e); |
| Aggregates.clear(); |
| Tree nv = new_var(); |
| match TypeInference.type_inference(bm) { |
| case `S(`tp): |
| if (!is_collection(S)) |
| fail; |
| type_env.insert(nv.toString(),tp); |
| }; |
| Tree rd = Aggregates.derive_combiner(br,#<lambda(`vm,`bm)>,vm,vr); |
| if (!Aggregates.can_use_combiner || Aggregates.reduces.is_empty()) |
| fail; |
| Tree vr2 = new_var(); |
| Tree m = simplify_all(#<lambda(`vm,cmap(lambda(`nv,bag(tuple(nth(`nv,0), |
| tuple(...(Aggregates.maps))))),`bm))>); |
| Tree c = subst(vr,vr2,#<bag(tuple(...(Aggregates.combines)))>); |
| c = simplify_all(#<lambda(`vr2,`c)>); |
| Tree r = simplify_all(#<lambda(`vr,`rd)>); |
| Tree mtp = TypeInference.type_inference(#<bag(tuple(...(Aggregates.maps)))>); |
| Tree rtp = #<tuple(`(TypeInference.type_inference(#<nth(`vr,0)>)),`mtp)>; |
| type_env.insert(vr.toString(),rtp); |
| type_env.insert(vr2.toString(),rtp); |
| TypeInference.type_inference(m); |
| TypeInference.type_inference(c); |
| TypeInference.type_inference(r); |
| Tree combiner = makePlan(c); |
| Tree reducer = makePlan(r); |
| match makePlan(s) { |
| // if the MapCombineReduce input is a join, push the combiner to the join |
| case MapReduce2(`mx,`my,lambda(`rv,`rb),`x,`y,`o2): |
| Tree nr = makePlan(simplify_all(#<lambda(`rv,cmap(`m,`rb))>)); |
| return #<MapReduce(lambda(`vm,bag(`vm)),`reducer, |
| MapCombineReduce2(`mx,`my,`combiner,`nr,`x,`y,`o2),`o)>; |
| case `input: |
| return #<MapCombineReduce(`(makePlan(m)),`combiner,`reducer,`input,`o)>; |
| }; |
| fail |
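// Roughly: for a query like select (k,avg(v)) ... group by k, the derived map emits
// (k,partial aggregation state), the combiner merges partial states inside the map task,
// and the reducer computes the final value, yielding
// MapCombineReduce(map,combiner,reducer,input,o).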
| case mapReduce(`m,`r,`s,`o): |
| if (is_dataset_expr(s)) |
| return #<MapReduce(`(makePlan(m)), |
| `(makePlan(r)), |
| `(makePlan(s)),`o)>; |
| else fail |
| case mapReduce2(`mx,`my,`r,`x,`y,`o): |
| if (is_dataset_expr(x) && is_dataset_expr(y) && streamed_MapReduce2_reducer(r)) |
| return #<MapReduce2(`(makePlan(mx)), |
| `(makePlan(my)), |
| `(makePlan(r)), |
| `(makePlan(x)), |
| `(makePlan(y)),`o)>; |
| else fail |
| case mapReduce2(`mx,`my,lambda(`v,`b),`x,`y,`o): |
| if (!is_dataset_expr(x) || !is_dataset_expr(y)) |
| fail; |
| // if the join reducer is not streaming, switch the inputs |
| Tree nv = new_var(); |
| Tree nr = subst(#<nth(`v,0)>,#<nth(`nv,1)>, |
| subst(#<nth(`v,1)>,#<nth(`nv,0)>,b)); |
| nr = #<lambda(`nv,`nr)>; |
| type_env.insert(nv.toString(),TypeInference.type_inference(#<tuple(nth(`v,1),nth(`v,0))>)); |
| return #<MapReduce2(`(makePlan(my)), |
| `(makePlan(mx)), |
| `(makePlan(nr)), |
| `(makePlan(y)), |
| `(makePlan(x)),`o)>; |
| case crossProduct(`mx,`my,`r,`x,`y): |
| if (is_dataset_expr(x) && is_dataset_expr(y)) |
| return #<CrossProduct(`(makePlan(mx)), |
| `(makePlan(my)), |
| `(makePlan(r)), |
| `(makePlan(x)), |
| `(makePlan(y)))>; |
| else fail |
| case outerMerge(`m,`x,`y): |
| if (is_dataset_expr(x) && is_dataset_expr(y)) |
| return #<OuterMerge(`(makePlan(m)), |
| `(makePlan(x)), |
| `(makePlan(y)))>; |
| else fail |
| case rightOuterMerge(`m,`x,`y): |
| if (is_dataset_expr(x) && is_dataset_expr(y)) |
| return #<RightOuterMerge(`(makePlan(m)), |
| `(makePlan(x)), |
| `(makePlan(y)))>; |
| else fail |
case cmap(lambda(`v,if(`p,`T(`u),`S())),`s):
if (false && is_collection(T) && is_collection(S)) // disabled; placed before the general cmap case so it is not shadowed
return makePlan(#<filter(lambda(`v,`p),lambda(`v,`u),`s)>);
else fail
case cmap(`m,`s):
if (is_dataset_expr(s))
return #<cMap(`(makePlan(m)),
`(makePlan(s)))>;
else fail
| case call(source,binary,`file,`tp): |
| return #<BinarySource(`file,`tp)>; |
| case call(source,gen,`f,`len,`ulen): |
| return #<SequenceSource(`(makePlan(f)),`(makePlan(len)), |
| `(makePlan(ulen)))>; |
| case call(source,`parser,`file,...args): |
| Trees el = #[]; |
| for ( Tree a: args ) |
| el = el.append(makePlan(a)); |
| return #<ParsedSource(`parser,`(makePlan(file)),...el)>; |
| case call(stream,binary,`file,`tp): |
| return #<BinaryStream(`file,`tp)>; |
| case call(stream,gen,`f,`len,`ulen): |
| return #<SequenceStream(`(makePlan(f)),`(makePlan(len)), |
| `(makePlan(ulen)))>; |
| case call(stream,`parser,`host,`port,...args): |
| if (!port.is_long()) |
| fail; |
| Trees el = #[]; |
| for ( Tree a: args ) |
| el = el.append(makePlan(a)); |
| return #<SocketStream(`parser,`(makePlan(host)),`port,...el)>; |
| case call(stream,`parser,`file,...args): |
| Trees el = #[]; |
| for ( Tree a: args ) |
| el = el.append(makePlan(a)); |
| return #<ParsedStream(`parser,`(makePlan(file)),...el)>; |
| case stream(lambda(`v,`b),`u): |
| return #<Stream(lambda(`v,`(makePlan(b))),`(makePlan(u)))>; |
| case type(`x): return e; |
| case gen(`min,`max,`size): |
| return #<Generator(`(makePlan(min)),`(makePlan(max)),`(makePlan(size)))>; |
| case repeat(lambda(`v,`b),`s,`n): |
| if (!is_dataset_expr(s)) |
| fail; |
| repeat_variables = repeat_variables.cons(v); |
| return #<Repeat(lambda(`v,`(makePlan(b))),`(makePlan(s)), |
| `(makePlan(n)))>; |
| case repeat(lambda(`v,`b),`s): |
| if (!is_dataset_expr(s)) |
| fail; |
| repeat_variables = repeat_variables.cons(v); |
| return #<Repeat(lambda(`v,`(makePlan(b))),`(makePlan(s)),`(Integer.MAX_VALUE))>; |
| case closure(lambda(`v,`b),`s,`n): |
| if (!is_dataset_expr(s)) |
| fail; |
| repeat_variables = repeat_variables.cons(v); |
| return #<Closure(lambda(`v,`(makePlan(b))),`(makePlan(s)), |
| `(makePlan(n)))>; |
| case closure(lambda(`v,`b),`s): |
| if (!is_dataset_expr(s)) |
| fail; |
| repeat_variables = repeat_variables.cons(v); |
| return #<Closure(lambda(`v,`(makePlan(b))),`(makePlan(s)),`(Integer.MAX_VALUE))>; |
| case loop(lambda(tuple(...vs),`b),tuple(...s),`n): |
| int i = 0; |
| for ( Tree v: vs ) |
if (!is_dataset_expr(s.nth(i++)))
| fail; |
| repeat_variables = repeat_variables.append(vs); |
| return #<Loop(lambda(tuple(...vs),`(makePlan(b))),`(makePlan(#<tuple(...s)>)),`(makePlan(n)))>; |
| case incr(`z,lambda(tuple(`s,`v),`m),`a): |
| repeat_variables = repeat_variables.append(v); |
| return #<incr(`(makePlan(z)),lambda(tuple(`s,`v),`(makePlan(m))),`(makePlan(a)))>; |
| case record(...bl): |
| Trees el = #[]; |
| for ( Tree b: bl ) |
| match b { |
| case bind(_,`c): |
| el = el.append(c); |
| }; |
| return makePlan(#<tuple(...el)>); |
| case project(`x,`a): |
| Tree tp = TypeInference.type_inference(x); |
| if (tp.equals(#<XML>) || tp.equals(global_datatype_env.lookup(#<XML>.toString()))) |
| return makePlan(#<call(XMLchildren,`(new StringLeaf(a.toString())),`x)>); |
| match tp { |
| case `S(`xtp): |
| if (is_collection(S)) |
| if (xtp.equals(#<XML>) || xtp.equals(global_datatype_env.lookup(#<XML>.toString()))) |
| return makePlan(#<call(XMLchildren,`(new StringLeaf(a.toString())),`x)>); |
| }; |
| match TypeInference.expand(tp) { |
| case record(...bl): |
| int i = 0; |
| for ( Tree b: bl ) |
| match b { |
| case bind(`c,_): |
| if (a.equals(c)) |
| return makePlan(#<nth(`x,`i)>); |
| else fail |
| case _: i++; |
| }; |
| case union(...tl): |
| for ( Tree t: tl ) |
| match TypeInference.expand(t) { |
| case _(record(...bl)): |
| int i = 0; |
| for ( Tree b: bl ) |
| match b { |
| case bind(`c,_): |
| if (a.equals(c)) |
| return makePlan(#<nth(union_value(`x),`i)>); |
| else fail |
| case _: i++; |
| }; |
| case _(bag(tuple(string,`tv))): |
| return #<map_index(`(makePlan(#<union_value(`x)>)), |
| `(new StringLeaf(a.toString())))>; |
| case `tt: error("wrong projection: "+e+" ("+tt+")"); |
| }; |
| case `S(`ttp): |
| if (!is_collection(S)) |
| fail; |
| match TypeInference.expand(ttp) { |
| case tuple(string,`tv): |
| return #<map_index(`(makePlan(x)), |
| `(new StringLeaf(a.toString())))>; |
| case record(...bl): |
| Tree nv = new_var(); |
| int i = 0; |
| for ( Tree b: bl ) |
| match b { |
| case bind(`c,_): |
| if (a.equals(c)) |
| return makePlan(#<cmap(lambda(`nv,nth(`nv,`i)),`x)>); |
| else fail |
| case _: i++; |
| }; |
| case union(...tl): |
| Tree nv = new_var(); |
| int j = 0; |
| for ( Tree t: tl ) { |
| match t { |
| case `c(record(...bl)): |
| int i = 0; |
| for ( Tree b: bl ) |
| match b { |
| case bind(`w,_): |
| if (a.equals(w)) |
| return makePlan(#<cmap(lambda(`nv,if(call(eq,union_tag(`nv),`j), |
| bag(nth(union_value(`nv),`i)), |
| bag())),`x)>); |
| else fail |
| case _: i++; |
| }; |
| case `c(bag(tuple(string,`tv))): |
| return makePlan(#<cmap(lambda(`nv,if(call(eq,union_tag(`nv),`j), |
| bag(map_index(union_value(`nv), |
| `(new StringLeaf(a.toString())))), |
| bag())),`x)>); |
| case `tt: error("wrong projection: "+e+" ("+tt+")"); |
| }; |
| j++; |
| } |
| }; |
| case `t: error("wrong projection: "+e+" ("+t+")"); |
| }; |
| case typed(`x,`tp): |
| if (tp.is_variable() && !tp.equals(#<string>) && MRContainer.type_code(tp.toString()) >= 0) |
| return makePlan(#<call(coerce,`x, |
| `(MRContainer.type_code(tp.toString())))>); |
| else fail |
| case index(`x,`n): |
| match TypeInference.type_inference2(x) { |
| case bag(tuple(`kt,`vt)): |
| if (TypeInference.type_inference2(n).equals(kt)) |
| return #<map_index(`(makePlan(x)), |
| `(makePlan(n)))>; |
| case Bag(tuple(`kt,`vt)): |
| if (TypeInference.type_inference2(n).equals(kt)) |
| return #<map_index(`(makePlan(x)), |
| `(makePlan(n)))>; |
| case union(...tl): |
| for ( Tree t: tl ) |
| match TypeInference.expand(t) { |
| case _(bag(tuple(`kt,`vt))): |
| if (TypeInference.type_inference2(n).equals(kt)) |
| return #<map_index(`(makePlan(#<union_value(`x)>)), |
| `(makePlan(n)))>; |
| case _(list(`tp)): |
| return #<index(`(makePlan(#<union_value(`x)>)), |
| `(makePlan(n)))>; |
| case `tt: error("wrong indexing: "+e+" ("+tt+")"); |
| }; |
| }; |
| return #<index(`(makePlan(x)), |
| `(makePlan(n)))>; |
| case call(count,mapReduce(`m,lambda(`vr,`br),`X,`o)): |
| Tree nv = new_var(); |
| type_env.insert(nv.toString(),TypeInference.type_inference(vr)); |
| Tree nr = simplify(#<lambda(`nv,bag(call(count,`(subst(vr,nv,br)))))>); |
| Tree plan = #<call(sum,mapReduce(`m,`nr,`X,false))>; |
| return makePlan(plan); |
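// That is, a count over a grouping is rewritten into a sum of per-group counts:
//     count(mapReduce(m,r,X,o))  ->  sum(mapReduce(m,lambda(nv,bag(count(...))),X,false))
// so that the aggregation can be handled by the generic monoid case below.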
| case call(`f,`u): |
| for ( Tree monoid: monoids ) |
| match monoid { |
| case `aggr(`mtp,`plus,`zero,`unit): |
| if (aggr.equals(f.toString())) { |
| Tree plan = makePlan(u); |
| Tree nx = new_var(); |
| Tree np = new_var(); |
| Tree na = new_var(); |
| Tree tp = TypeInference.type_inference(e); |
| type_env.insert(np.toString(),#<tuple(`tp,`tp)>); |
| match TypeInference.type_inference(u) { |
| case `T(`t): |
| if (TypeInference.unify(mtp,t) == null) |
| continue; |
| type_env.insert(na.toString(),#<tuple(`tp,`t)>); |
| type_env.insert(nx.toString(),t); |
| }; |
| plus = makePlan(simplify_all(#<lambda(`np,apply(`plus,`np))>)); |
| Tree acc = makePlan(simplify_all(#<lambda(`na,apply(`plus,tuple(nth(`na,0), |
| apply(`unit,nth(`na,1)))))>)); |
| zero = makePlan((f.equals(#<avg>)) ? zero : #<typed(`zero,`tp)>); |
| match plan { |
| case MapCombineReduce(`m,`c,`r,`s,_): |
| plan = #<MapAggregateReduce(`m,`r,`acc,`zero,`s,false)>; |
| case MapReduce(`m,`r,`s,_): |
| plan = #<MapAggregateReduce(`m,`r,`acc,`zero,`s,false)>; |
| case MapReduce2(`mx,`my,`r,`x,`y,_): |
| plan = #<MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,false)>; |
| case MapJoin(`kx,`ky,`r,`x,`y): |
| plan = #<MapAggregateJoin(`kx,`ky,`r,`acc,`zero,`x,`y)>; |
| case CrossProduct(`mx,`my,`r,`x,`y): |
| plan = #<CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y)>; |
| case cMap(`m,`s): |
| plan = #<AggregateMap(`m,`acc,`zero,`s)>; |
| case _: |
| if (is_dataset_expr(u)) |
| plan = #<AggregateMap(lambda(`nx,bag(`nx)),`acc,`zero,`plan)>; |
| else return #<aggregate(`acc,`zero,`plan)>; |
| }; |
| if (is_dataset_expr(u)) |
| return #<Aggregate(`plus,`zero,`plan)>; |
| else return #<aggregate(`plus,`zero,`plan)>; |
| } |
| }; |
| fail |
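// e.g., sum over cMap(m,s) on a dataset becomes Aggregate(plus,zero,AggregateMap(m,acc,zero,s)):
// each map task folds its partition with acc starting from zero, and the partial results
// are merged with plus.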
| case call(plus,`x,`y): |
| if (!is_dataset_expr(x) || !is_dataset_expr(y)) |
| fail; |
| return #<Merge(`(makePlan(x)),`(makePlan(y)))>; |
| case trace(`msg,`tp,`x): |
| return #<trace(`msg,`tp,`(makePlan(x)))>; |
| case call(`f,...el): |
| if (!f.is_variable()) |
| fail; |
| Tree ret = data_constructors.lookup(f.toString()); |
| if (ret != null) |
| match ret { |
| case `v(`n,`tp): |
| Tree p = (el.length()==1) |
| ? makePlan(el.head()) |
| : makePlan(#<tuple(...el)>); |
| return #<tagged_union(`n,`p)>; |
| }; |
| ret = global_type_env.lookup(f.toString()); |
| if (ret != null) |
| match ret { |
| case arrow(_,_): |
| Trees args = #[]; |
| for ( Tree a: el ) |
| args = args.append(makePlan(a)); |
| return #<apply(`f,tuple(...args))>; |
| }; |
| Trees tps = #[]; |
| for ( Tree a: el ) |
| tps = tps.append(TypeInference.type_inference(a)); |
| int i = ClassImporter.find_method_number(f.toString(),tps); |
| if (i < 0) |
| error("Method "+f+tps+" has no implementation"); |
| Trees sig = ClassImporter.signature(i); |
| Trees args = #[]; |
| for ( int j = 0; j < el.length(); j++ ) { |
| Tree b = sig.nth(j+1); |
| if (f.equals(#<coerce>) || b.equals(tps.nth(j)) || !b.is_variable() |
| || b.equals(#<union>) || MRContainer.type_code(b.toString()) < 0) |
| args = args.append(makePlan(el.nth(j))); |
| else args = args.append(makePlan(#<call(coerce,`(el.nth(j)), |
| `(MRContainer.type_code(b.toString())))>)); |
| }; |
| return #<callM(`f,`i,...args)>; |
| case let(`v,`u,`body): |
if (true)
fail; // disabled
| body = makePlan(body); |
| match TypeInference.type_inference(u) { |
| case `S(_): |
| // if a variable bound to a collection is used more than once in the body, |
| // materialize the collection in memory |
| if (is_collection(S) && occurences(v,body) > 1) |
| body = #<let(`v,`(makePlan(#<call(materialize,`v)>)),`body)>; |
| }; |
| return #<let(`v,`(makePlan(u)),`body)>; |
| case function(tuple(...params),`outp,`body): |
| boolean is_dataset = false; |
| for ( Tree p: params ) |
| match p { |
| case dataset(`v,`tp): |
| is_dataset = true; |
| }; |
| body = makePlan(body); |
| return #<function(tuple(...params),`outp,`body)>; |
| case provenance(`x,...s): |
| Tree px = makePlan(x); |
| if (is_dataset_expr(x)) |
| return #<Provenance(`px,...s)>; |
| else return #<provenance(`px,...s)>; |
| case `f(...al): |
| Trees bl = #[]; |
| for ( Tree a: al ) |
| bl = bl.append(makePlan(a)); |
| return #<`f(...bl)>; |
| }; |
| return e; |
| } |
| |
| /** convert an algebraic plan to a physical plan |
| * @param plan algebraic plan |
| * @return the physical plan |
| */ |
| public static Tree physical_plan ( Tree plan ) { |
| match plan { |
| case MapReduce2(`mx,`my,`r,`x,`y,`o): |
| return physical_plan(#<MapAggregateReduce2(`mx,`my,`r,null,null,`x,`y,`o)>); |
| case MapAggregateReduce2(`mx,`my,lambda(`v,`b),`acc,`zero,`x,`y,false): |
// disable map-joins in Flink and Spark mode, because choosing between a map-join
// and a coGroup at run-time requires knowing the input sizes
| if (!Config.flink_mode && !Config.spark_mode) |
| fail; |
| return #<MapAggregateReduce2(`mx,`my,lambda(`v,`b),`acc,`zero, |
| `(physical_plan(x)), |
| `(physical_plan(y)),false)>; |
// convert a reduce-side join to a fragment-replicate join, if either of the join
// inputs is small enough to fit in memory (dataset sizes are extracted from file statistics)
| case MapAggregateReduce2(`mx,`my, |
| lambda(`v,cmap(lambda(`vx,cmap(lambda(`vy,`b),nth(`nx,1))),nth(`ny,0))), |
| null,null,`x,`y,false): |
| if (Config.noMapJoin) |
| fail; |
| if (!nx.equals(v) || !ny.equals(v) || occurences(v,b) > 0) |
| fail; |
| Tree X = new_var(); |
| Tree Y = new_var(); |
| Tree nv = new_var(); |
| Tree L1 = simplify_all(#<lambda(`nv,cmap(lambda(`vx,cmap(lambda(`vy,`b),nth(`nv,1))),bag(nth(`nv,0))))>); |
| nv = new_var(); |
| Tree L2 = simplify_all(#<lambda(`nv,cmap(lambda(`vx,cmap(lambda(`vy,`b),bag(nth(`nv,0)))),nth(`nv,1)))>); |
| nv = new_var(); |
| Tree L3 = #<lambda(`nv,cmap(lambda(`vx,cmap(lambda(`vy,`b),nth(`nv,0))),nth(`nv,1)))>; |
| Tree cond1 = makePlan(#<call(leq,dataset_size(`Y),`(Config.mapjoin_size))>); |
| Tree cond2 = makePlan(#<call(leq,dataset_size(`X),`(Config.mapjoin_size))>); |
| Tree cond3 = makePlan(#<call(lt,dataset_size(`X),dataset_size(`Y))>); |
| return #<Let(`X,`(physical_plan(x)),Let(`Y,`(physical_plan(y)), |
| If(`cond1, |
| MapJoin(`mx,`my,`L1,`X,`Y), |
| If(`cond2, |
| MapJoin(`my,`mx,`L2,`Y,`X), |
| If(`cond3, |
| MapAggregateReduce2(`my,`mx,`L3,null,null,`Y,`X,false), |
| MapAggregateReduce2(`mx,`my, |
| lambda(`v,cmap(lambda(`vx,cmap(lambda(`vy,`b),nth(`nx,1))), |
| nth(`ny,0))), |
| null,null,`X,`Y,false))))))>; |
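// The generated Let/If cascade defers the choice to run-time: replicate Y in a map-side
// join if it fits in mapjoin_size, else replicate X with the arguments swapped, else keep
// the reduce-side join, ordering the inputs so that the larger one is streamed and the
// smaller one is cached.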
| case MapAggregateReduce2(`mx,`my, |
| lambda(`v,cmap(lambda(`vy,cmap(lambda(`vx,`b),nth(`ny,0))),nth(`nx,1))), |
| null,null,`x,`y,false): |
| if (Config.noMapJoin) |
| fail; |
| if (!nx.equals(v) || !ny.equals(v) || occurences(v,b) > 0) |
| fail; |
| return physical_plan(#<MapAggregateReduce2(`mx,`my, |
| lambda(`v,cmap(lambda(`vx,cmap(lambda(`vy,`b),nth(`nx,1))),nth(`ny,0))), |
| null,null,`x,`y,false)>); |
case xxxMapAggregateReduce2(`mx,`my,lambda(`v,`b),null,null,`x,`y,false): // disabled: the xxx prefix keeps this alternative from matching
| if (Config.noMapJoin) |
| fail; |
| Tree X = new_var(); |
| Tree Y = new_var(); |
| Tree nv = new_var(); |
| Tree L1 = simplify_all(subst(#<nth(`v,0)>,#<bag(nth(`nv,0))>, |
| subst(#<nth(`v,1)>,#<nth(`nv,1)>, |
| #<lambda(`nv,`b)>))); |
| nv = new_var(); |
| Tree L2 = subst(#<nth(`v,0)>,#<nth(`nv,1)>, |
| subst(#<nth(`v,1)>,#<nth(`nv,0)>, |
| #<lambda(`nv,`b)>)); |
| Tree cond1 = makePlan(#<call(leq,dataset_size(`Y),`(Config.mapjoin_size))>); |
| Tree cond2 = makePlan(#<call(lt,dataset_size(`X),dataset_size(`Y))>); |
| return #<Let(`X,`(physical_plan(x)),Let(`Y,`(physical_plan(y)), |
| If(`cond1, |
| MapJoin(`mx,`my,`L1,`X,`Y), |
| If(`cond2, |
| MapAggregateReduce2(`my,`mx,`L2,null,null,`Y,`X,false), |
| MapAggregateReduce2(`mx,`my,lambda(`v,`b),null,null,`X,`Y,false)))))>; |
| case MapAggregateReduce2(`mx,`my,lambda(`v,`b),`acc,`zero,`x,`y,false): |
| Tree X = new_var(); |
| Tree Y = new_var(); |
| Tree nv = new_var(); |
| Tree nr = subst(#<nth(`v,0)>,#<nth(`nv,1)>, |
| subst(#<nth(`v,1)>,#<nth(`nv,0)>,b)); |
| if (!streamed_MapReduce2_reducer(#<lambda(`nv,`nr)>)) |
| fail; |
| Tree cond = makePlan(#<call(lt,dataset_size(`X),dataset_size(`Y))>); |
| return #<Let(`X,`(physical_plan(x)),Let(`Y,`(physical_plan(y)), |
| If(`cond, |
| MapAggregateReduce2(`my,`mx,lambda(`nv,`nr),`acc,`zero,`Y,`X,false), |
| MapAggregateReduce2(`mx,`my,lambda(`v,`b),`acc,`zero,`X,`Y,false))))>; |
| case CrossProduct(`mx,`my,`r,`x,`y): |
| return physical_plan(#<CrossAggregateProduct(`mx,`my,`r,null,null,`x,`y)>); |
| case CrossAggregateProduct(`mx,`my,lambda(`v,`b),`acc,`zero,`x,`y): |
| if (Config.flink_mode || Config.spark_mode) |
| fail; |
| Tree X = new_var(); |
| Tree Y = new_var(); |
| Tree nv = new_var(); |
| Tree nr = subst(#<nth(`v,0)>,#<nth(`nv,1)>, |
| subst(#<nth(`v,1)>,#<nth(`nv,0)>,b)); |
| Tree cond = makePlan(#<call(lt,dataset_size(`X),dataset_size(`Y))>); |
| return #<Let(`X,`(physical_plan(x)),Let(`Y,`(physical_plan(y)), |
| If(`cond, |
| CrossAggregateProduct(`my,`mx,lambda(`nv,`nr),`acc,`zero,`Y,`X), |
| CrossAggregateProduct(`mx,`my,lambda(`v,`b),`acc,`zero,`X,`Y))))>; |
| case `f(...al): |
| Trees bl = #[]; |
| for ( Tree a: al ) |
| bl = bl.append(physical_plan(a)); |
| return #<`f(...bl)>; |
| }; |
| return plan; |
| } |
| } |