/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import org.apache.mrql.gen.*;
import java.util.*;
import java.io.*;
/** Generate a physical plan from an algebraic expression */
final public class PlanGeneration extends AlgebraicOptimization {
/** return the monoid associated with the aggregation e */
private static Tree get_monoid ( Tree e ) {
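// e.g., for call(avg,u) where u is a collection, look for a monoid named avg
// whose type unifies with the element type of u; return none if there is no match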
match e {
case call(`f,`u):
Tree etp = #<none>;
match TypeInference.type_inference2(u) {
case `S(`tp): etp = tp;
case _: return #<none>;
};
for ( Tree monoid: monoids ) // system & user-defined aggregations
match monoid {
case `aggr(`mtp,`plus,`zero,`unit):
if (!aggr.equals(f.toString()))
continue;
if (TypeInference.unify(etp,mtp) == null)
continue;
return monoid;
}
};
return #<none>;
}
/** extract the combiner from the reducer in a MapReduce plan */
static class Aggregates {
public static Trees maps = #[];
public static Trees combines = #[];
public static Trees reduces = #[];
public static boolean can_use_combiner = true;
private static SymbolTable st = new SymbolTable();
private static void clear () {
maps = #[];
combines = #[];
reduces = #[];
can_use_combiner = true;
}
/** extend the combines, maps, and reduces lists with new entries; return the position of the reduce function in the reduces list */
private static int union_aggregates ( Tree reduce, Tree map, Tree combine ) {
Tree m = simplify_all(map);
Tree c = simplify_all(combine);
Tree rd = simplify_all(reduce);
int i = 0;
for ( Trees r = reduces; !r.is_empty(); r = r.tail(), i++ )
if (alpha_equivalent(rd,r.head()))
return i;
maps = maps.append(m);
reduces = reduces.append(rd);
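// the combine expression was built with the placeholder index -1; patch it with the actual position i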
combines = combines.append(subst(#<-1>,#<`i>,c));
return i;
}
/** Generate the MR combiner from the MR reducer.
* Find the aggregation calls (e.g., call(avg,_)) in the reducer
* @param e the body of the reducer
* @param map the map function
* @param mvar the variable of the map function
* @param rvar the variable of the reducer function
* @return the combiner; as a side effect, it extends the combines, maps, and reduces lists
*/
private static Tree derive_combiner ( Tree e, Tree map, Tree mvar, Tree rvar ) {
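// the reducer argument rvar is bound to a (key,group) pair; gvar denotes the group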
Tree gvar = #<nth(`rvar,1)>;
match e {
case call(`f,`u):
match get_monoid(e) {
case `nm(`mtp,`plus,`zero,`unit):
match u {
case cmap(`m,`v):
if (!v.equals(gvar) || occurences(rvar,m) > 0)
fail;
Tree ev = new_var();
Tree nv = new_var();
Tree mv = new_var();
int i = union_aggregates(e,
#<aggregate(lambda(`ev,apply(`plus,tuple(nth(`ev,0),apply(`unit,nth(`ev,1))))),
`zero,cmap(`m,cmap(lambda(x,bag(nth(x,1))),apply(`map,`mvar))))>,
#<aggregate(lambda(`nv,apply(`plus,tuple(nth(`nv,0),
nth(nth(`nv,1),-1)))),
`zero,`gvar)>);
return simplify_all(#<aggregate(lambda(`mv,apply(`plus,tuple(nth(`mv,0),
nth(nth(`mv,1),`i)))),
`zero,`gvar)>);
case `v:
if (!v.equals(gvar))
fail;
Tree ev = new_var();
Tree nv = new_var();
Tree mv = new_var();
int i = union_aggregates(e,
#<aggregate(lambda(`ev,apply(`plus,tuple(nth(`ev,0),apply(`unit,nth(`ev,1))))),
`zero,cmap(lambda(x,bag(nth(x,1))),apply(`map,`mvar)))>,
#<aggregate(lambda(`nv,apply(`plus,tuple(nth(`nv,0),nth(nth(`nv,1),-1)))),
`zero,`gvar)>);
return simplify_all(#<aggregate(lambda(`mv,apply(`plus,tuple(nth(`mv,0),
nth(nth(`mv,1),`i)))),
`zero,`gvar)>);
}
};
fail
case nth(`v,0):
if (v.is_variable())
return e;
else fail
case `f(...al):
Trees rs = #[];
for ( Tree a: al )
rs = rs.append(derive_combiner(a,map,mvar,rvar));
return #<`f(...rs)>;
};
if (#<nth(`e,1)>.equals(gvar))
Aggregates.can_use_combiner = false;
return e;
}
}
/** count how many times e accesses the bag x; if it is more than once, x cannot be streamed */
private static int number_of_accesses ( Tree x, Tree e ) {
if (e.equals(x))
return 1;
match e {
case cmap(`m,`s):
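// an access inside the mapped function m is weighted by a factor of 10,
// since m may be applied to every element of s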
return number_of_accesses(x,m)*10+number_of_accesses(x,s);
case map(`m,`s):
return number_of_accesses(x,m)*10+number_of_accesses(x,s);
case filter(`p,`m,`s):
return number_of_accesses(x,p)*10+number_of_accesses(x,m)*10+number_of_accesses(x,s);
case `f(...r):
int i = 0;
for ( Tree z: r )
i += number_of_accesses(x,z);
return i;
};
return 0;
}
/** can we process the second argument of the MapReduce reducer (a bag) as a stream? */
public static boolean streamed_MapReduce_reducer ( Tree x ) {
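// the reducer argument v is a (key,values) pair; the values bag nth(v,1)
// can be streamed only if the reducer body accesses it at most once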
match x {
case lambda(`v,`b):
return number_of_accesses(#<nth(`v,1)>,b) <= 1;
case compiled(_,lambda(`v,`b)):
return number_of_accesses(#<nth(`v,1)>,b) <= 1;
};
return false;
}
/** can we process the first argument of the MapReduce2 reducer (a bag) as a stream? */
public static boolean streamed_MapReduce2_reducer ( Tree x ) {
match x {
case lambda(`v,`b):
return number_of_accesses(#<nth(`v,0)>,b) <= 1;
case compiled(_,lambda(`v,`b)):
return number_of_accesses(#<nth(`v,0)>,b) <= 1;
};
return false;
}
/** true if e returns a dataset stored in HDFS */
public static boolean is_dataset_expr ( Tree e ) {
match TypeInference.type_inference2(e) {
case `T(_):
if (is_persistent_collection(T))
return true;
};
return false;
}
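/** true if e contains the term without, ignoring any occurrences nested inside the term with */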
private static boolean contains_without ( Tree e, Tree with, Tree without ) {
if (e.equals(with))
return false;
if (e.equals(without))
return true;
match e {
case `f(...r):
for ( Tree a: r )
if (contains_without(a,with,without))
return true;
};
return false;
}
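/** collect the aggregation calls in e that aggregate over the term with but do not otherwise use var (candidates for combining) */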
private static Trees extract_combiners ( Tree e, Tree with, Tree var ) {
Trees res = #[ ];
match e {
case call(`c,`u):
if (contains_without(e,with,var) || occurences(with,e) == 0)
fail;
if (get_monoid(e).equals(#<none>))
fail;
return #[`e];
case `f(...r):
for ( Tree a: r )
res = res.append(extract_combiners(a,with,var));
};
return res;
}
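/** find a nested MapReduce query in e whose input is a repeat variable and whose
* map function depends only on var and its own argument; return none otherwise */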
private static Tree get_nested_query ( Tree var, Tree e ) {
match e {
case MapReduce(lambda(`v,`b),`r,`s,...):
if (!repeat_variables.member(s))
fail;
if (!free_variables(b,#[`var,`v]).is_empty())
fail;
return e;
case mapReduce(lambda(`v,`b),`r,`s,...):
if (!repeat_variables.member(s))
fail;
if (!free_variables(b,#[`var,`v]).is_empty())
fail;
return e;
case `f(...r):
for ( Tree a: r )
match get_nested_query(var,a) {
case none: ;
case `c:
return c;
}
};
return #<none>;
}
/** compile an algebraic form to an algebraic plan
* @param e the algebraic form
* @return the algebraic plan
*/
public static Tree makePlan ( Tree e ) {
match e {
// combine groupBy with Join into a groupByJoin (generalized matrix multiplication)
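// (e.g., for matrix multiplication sum(x*y) with join key k and group-by keys
//  i and j: jx/jy are the join keys, gx/gy the group-by keys, and the
//  aggregations extracted from the reducer become the GroupByJoin accumulator)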
case mapReduce(lambda(`vm,bag(`bm)),lambda(`vr,bag(`br)),`s,`o):
if (!bm.equals(vm) || !is_dataset_expr(s))
fail;
match s {
case mapReduce2(lambda(`mvx,bag(tuple(`jx,`mx))),
lambda(`mvy,bag(tuple(`jy,`my))),
lambda(`v,cmap(lambda(`x,cmap(lambda(`y,bag(tuple(tuple(`gx,`gy),`mxy))),
nth(`vx,1))),
nth(`vy,0))),
`X,`Y,`o2):
if (!vx.equals(v) || !vy.equals(v) || !mx.equals(mvx) || !my.equals(mvy))
fail;
Tree gxx = gx;
Tree gyy = gy;
if (free_variables(gx,#[`y]).is_empty() && free_variables(gy,#[`x]).is_empty()) {
gxx = gy;
gyy = gx;
} else if (!free_variables(gx,#[`x]).is_empty() || !free_variables(gy,#[`y]).is_empty())
fail;
Trees combiners = extract_combiners(br,#<nth(`vr,1)>,vr);
if (combiners.is_empty())
fail;
Tree nvc = new_var();
Tree nvr = new_var();
Tree nm = subst(x,#<nth(nth(`nvc,1),0)>,subst(y,#<nth(nth(`nvc,1),1)>,mxy));
int i = 0;
Trees accs = #[ ];
Trees zeros = #[ ];
for ( Tree combiner: combiners )
match combiner {
case call(_,cmap(lambda(`vc,bag(`u)),_)):
match get_monoid(combiner) {
case `aggr(`mtp,`plus,`zero,`unit):
br = subst(combiner,#<nth(nth(`nvr,1),`i)>,br);
zeros = zeros.append(zero);
accs = accs.append(#<apply(`plus,tuple(nth(nth(`nvc,0),`i),
apply(`unit,apply(lambda(`vc,`u),`nm))))>);
i++;
}
case _: throw new Error("Unrecognized aggregation: "+combiner);
};
type_env.insert(nvc.toString(),TypeInference.type_inference(#<tuple(tuple(...zeros),tuple(`x,`y))>));
type_env.insert(nvr.toString(),
TypeInference.type_inference(#<tuple(nth(`vr,0),tuple(...zeros))>));
Tree reducer = makePlan(subst(#<nth(`vr,0)>,#<nth(`nvr,0)>,br));
if (!free_variables(reducer,#[`nvr]).is_empty())
fail;
Tree accumulator = makePlan(simplify_all(#<tuple(...accs)>));
Tree zero = makePlan(#<tuple(...zeros)>);
return #<GroupByJoin(lambda(`mvx,`(makePlan(jx))),
lambda(`mvy,`(makePlan(jy))),
lambda(`x,`(makePlan(gxx))),
lambda(`y,`(makePlan(gyy))),
lambda(`nvc,`accumulator),
`zero,
lambda(`nvr,`reducer),
`(makePlan(X)),
`(makePlan(Y)),`o)>;
};
fail
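// a mapReduce whose mapper contains a nested query over a repeat variable:
// materialize the nested query result in memory (DataSetCollect) and bind it to a fresh variable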
case mapReduce(lambda(`vm,`bm),`nr,`s,`o):
if (!is_dataset_expr(s) || Config.bsp_mode)
fail;
Tree pm = makePlan(bm);
match get_nested_query(vm,pm) {
case MapReduce(`km,`kr,`ks,`ko):
Tree nv = new_var();
pm = subst(#<MapReduce(`km,`kr,`ks,`ko)>,
#<mapReduce(`km,`kr,`nv,`ko)>,
pm); // must be done in memory
TypeInference.global_type_env.insert(nv.toString(),TypeInference.type_inference(ks));
return #<let(`nv,DataSetCollect(`ks),
MapReduce(lambda(`vm,`pm),
`(makePlan(nr)),
`(makePlan(s)),
`o))>;
case mapReduce(`km,`kr,`ks,`ko):
Tree nv = new_var();
pm = subst(ks,nv,pm);
TypeInference.global_type_env.insert(nv.toString(),TypeInference.type_inference(ks));
return #<let(`nv,DataSetCollect(`ks),
MapReduce(lambda(`vm,`pm),
`(makePlan(nr)),
`(makePlan(s)),
`o))>;
};
fail
// extract the mapReduce combiner
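// (e.g., a reducer that computes avg over each group is split into a map that
//  emits partial values, a combiner that pre-aggregates them on the map side,
//  and a reducer that merges the partial aggregates)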
case mapReduce(lambda(`vm,`bm),lambda(`vr,`br),`s,`o):
if (!Config.use_combiner || !is_dataset_expr(s))
fail;
TypeInference.type_inference(e);
Aggregates.clear();
Tree nv = new_var();
match TypeInference.type_inference(bm) {
case `S(`tp):
if (!is_collection(S))
fail;
type_env.insert(nv.toString(),tp);
};
Tree rd = Aggregates.derive_combiner(br,#<lambda(`vm,`bm)>,vm,vr);
if (!Aggregates.can_use_combiner || Aggregates.reduces.is_empty())
fail;
Tree vr2 = new_var();
Tree m = simplify_all(#<lambda(`vm,cmap(lambda(`nv,bag(tuple(nth(`nv,0),
tuple(...(Aggregates.maps))))),`bm))>);
Tree c = subst(vr,vr2,#<bag(tuple(...(Aggregates.combines)))>);
c = simplify_all(#<lambda(`vr2,`c)>);
Tree r = simplify_all(#<lambda(`vr,`rd)>);
Tree mtp = TypeInference.type_inference(#<bag(tuple(...(Aggregates.maps)))>);
Tree rtp = #<tuple(`(TypeInference.type_inference(#<nth(`vr,0)>)),`mtp)>;
type_env.insert(vr.toString(),rtp);
type_env.insert(vr2.toString(),rtp);
TypeInference.type_inference(m);
TypeInference.type_inference(c);
TypeInference.type_inference(r);
Tree combiner = makePlan(c);
Tree reducer = makePlan(r);
match makePlan(s) {
// if the MapCombineReduce input is a join, push the combiner to the join
case MapReduce2(`mx,`my,lambda(`rv,`rb),`x,`y,`o2):
Tree nr = makePlan(simplify_all(#<lambda(`rv,cmap(`m,`rb))>));
return #<MapReduce(lambda(`vm,bag(`vm)),`reducer,
MapCombineReduce2(`mx,`my,`combiner,`nr,`x,`y,`o2),`o)>;
case `input:
return #<MapCombineReduce(`(makePlan(m)),`combiner,`reducer,`input,`o)>;
};
fail
case mapReduce(`m,`r,`s,`o):
if (is_dataset_expr(s))
return #<MapReduce(`(makePlan(m)),
`(makePlan(r)),
`(makePlan(s)),`o)>;
else fail
case mapReduce2(`mx,`my,`r,`x,`y,`o):
if (is_dataset_expr(x) && is_dataset_expr(y) && streamed_MapReduce2_reducer(r))
return #<MapReduce2(`(makePlan(mx)),
`(makePlan(my)),
`(makePlan(r)),
`(makePlan(x)),
`(makePlan(y)),`o)>;
else fail
case mapReduce2(`mx,`my,lambda(`v,`b),`x,`y,`o):
if (!is_dataset_expr(x) || !is_dataset_expr(y))
fail;
// if the join reducer is not streaming, switch the inputs
Tree nv = new_var();
Tree nr = subst(#<nth(`v,0)>,#<nth(`nv,1)>,
subst(#<nth(`v,1)>,#<nth(`nv,0)>,b));
nr = #<lambda(`nv,`nr)>;
type_env.insert(nv.toString(),TypeInference.type_inference(#<tuple(nth(`v,1),nth(`v,0))>));
return #<MapReduce2(`(makePlan(my)),
`(makePlan(mx)),
`(makePlan(nr)),
`(makePlan(y)),
`(makePlan(x)),`o)>;
case crossProduct(`mx,`my,`r,`x,`y):
if (is_dataset_expr(x) && is_dataset_expr(y))
return #<CrossProduct(`(makePlan(mx)),
`(makePlan(my)),
`(makePlan(r)),
`(makePlan(x)),
`(makePlan(y)))>;
else fail
case outerMerge(`m,`x,`y):
if (is_dataset_expr(x) && is_dataset_expr(y))
return #<OuterMerge(`(makePlan(m)),
`(makePlan(x)),
`(makePlan(y)))>;
else fail
case rightOuterMerge(`m,`x,`y):
if (is_dataset_expr(x) && is_dataset_expr(y))
return #<RightOuterMerge(`(makePlan(m)),
`(makePlan(x)),
`(makePlan(y)))>;
else fail
case cmap(`m,`s):
if (is_dataset_expr(s))
return #<cMap(`(makePlan(m)),
`(makePlan(s)))>;
else fail
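// disabled rule: convert a cmap with a conditional body into a filter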
case cmap(lambda(`v,if(`p,`T(`u),`S())),`s):
if (false && is_collection(T) && is_collection(S))
return makePlan(#<filter(lambda(`v,`p),lambda(`v,`u),`s)>);
else fail
case call(source,binary,`file,`tp):
return #<BinarySource(`file,`tp)>;
case call(source,gen,`f,`len,`ulen):
return #<SequenceSource(`(makePlan(f)),`(makePlan(len)),
`(makePlan(ulen)))>;
case call(source,`parser,`file,...args):
Trees el = #[];
for ( Tree a: args )
el = el.append(makePlan(a));
return #<ParsedSource(`parser,`(makePlan(file)),...el)>;
case call(stream,binary,`file,`tp):
return #<BinaryStream(`file,`tp)>;
case call(stream,gen,`f,`len,`ulen):
return #<SequenceStream(`(makePlan(f)),`(makePlan(len)),
`(makePlan(ulen)))>;
case call(stream,`parser,`host,`port,...args):
if (!port.is_long())
fail;
Trees el = #[];
for ( Tree a: args )
el = el.append(makePlan(a));
return #<SocketStream(`parser,`(makePlan(host)),`port,...el)>;
case call(stream,`parser,`file,...args):
Trees el = #[];
for ( Tree a: args )
el = el.append(makePlan(a));
return #<ParsedStream(`parser,`(makePlan(file)),...el)>;
case stream(lambda(`v,`b),`u):
return #<Stream(lambda(`v,`(makePlan(b))),`(makePlan(u)))>;
case type(`x): return e;
case gen(`min,`max,`size):
return #<Generator(`(makePlan(min)),`(makePlan(max)),`(makePlan(size)))>;
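// iteration: record the loop variables in repeat_variables, so that nested
// queries over them can be detected (see get_nested_query)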
case repeat(lambda(`v,`b),`s,`n):
if (!is_dataset_expr(s))
fail;
repeat_variables = repeat_variables.cons(v);
return #<Repeat(lambda(`v,`(makePlan(b))),`(makePlan(s)),
`(makePlan(n)))>;
case repeat(lambda(`v,`b),`s):
if (!is_dataset_expr(s))
fail;
repeat_variables = repeat_variables.cons(v);
return #<Repeat(lambda(`v,`(makePlan(b))),`(makePlan(s)),`(Integer.MAX_VALUE))>;
case closure(lambda(`v,`b),`s,`n):
if (!is_dataset_expr(s))
fail;
repeat_variables = repeat_variables.cons(v);
return #<Closure(lambda(`v,`(makePlan(b))),`(makePlan(s)),
`(makePlan(n)))>;
case closure(lambda(`v,`b),`s):
if (!is_dataset_expr(s))
fail;
repeat_variables = repeat_variables.cons(v);
return #<Closure(lambda(`v,`(makePlan(b))),`(makePlan(s)),`(Integer.MAX_VALUE))>;
case loop(lambda(tuple(...vs),`b),tuple(...s),`n):
int i = 0;
for ( Tree v: vs )
if (!is_dataset_expr(s.nth(i++)))
fail;
repeat_variables = repeat_variables.append(vs);
return #<Loop(lambda(tuple(...vs),`(makePlan(b))),`(makePlan(#<tuple(...s)>)),`(makePlan(n)))>;
case incr(`z,lambda(tuple(`s,`v),`m),`a):
repeat_variables = repeat_variables.append(v);
return #<incr(`(makePlan(z)),lambda(tuple(`s,`v),`(makePlan(m))),`(makePlan(a)))>;
case record(...bl):
Trees el = #[];
for ( Tree b: bl )
match b {
case bind(_,`c):
el = el.append(c);
};
return makePlan(#<tuple(...el)>);
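// projection x.a: resolve the attribute a against the expanded type of x
// (record fields become positional nth accesses; XML and (string,value) maps are handled specially)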
case project(`x,`a):
Tree tp = TypeInference.type_inference(x);
if (tp.equals(#<XML>) || tp.equals(global_datatype_env.lookup(#<XML>.toString())))
return makePlan(#<call(XMLchildren,`(new StringLeaf(a.toString())),`x)>);
match tp {
case `S(`xtp):
if (is_collection(S))
if (xtp.equals(#<XML>) || xtp.equals(global_datatype_env.lookup(#<XML>.toString())))
return makePlan(#<call(XMLchildren,`(new StringLeaf(a.toString())),`x)>);
};
match TypeInference.expand(tp) {
case record(...bl):
int i = 0;
for ( Tree b: bl )
match b {
case bind(`c,_):
if (a.equals(c))
return makePlan(#<nth(`x,`i)>);
else fail
case _: i++;
};
case union(...tl):
for ( Tree t: tl )
match TypeInference.expand(t) {
case _(record(...bl)):
int i = 0;
for ( Tree b: bl )
match b {
case bind(`c,_):
if (a.equals(c))
return makePlan(#<nth(union_value(`x),`i)>);
else fail
case _: i++;
};
case _(bag(tuple(string,`tv))):
return #<map_index(`(makePlan(#<union_value(`x)>)),
`(new StringLeaf(a.toString())))>;
case `tt: error("wrong projection: "+e+" ("+tt+")");
};
case `S(`ttp):
if (!is_collection(S))
fail;
match TypeInference.expand(ttp) {
case tuple(string,`tv):
return #<map_index(`(makePlan(x)),
`(new StringLeaf(a.toString())))>;
case record(...bl):
Tree nv = new_var();
int i = 0;
for ( Tree b: bl )
match b {
case bind(`c,_):
if (a.equals(c))
return makePlan(#<cmap(lambda(`nv,nth(`nv,`i)),`x)>);
else fail
case _: i++;
};
case union(...tl):
Tree nv = new_var();
int j = 0;
for ( Tree t: tl ) {
match t {
case `c(record(...bl)):
int i = 0;
for ( Tree b: bl )
match b {
case bind(`w,_):
if (a.equals(w))
return makePlan(#<cmap(lambda(`nv,if(call(eq,union_tag(`nv),`j),
bag(nth(union_value(`nv),`i)),
bag())),`x)>);
else fail
case _: i++;
};
case `c(bag(tuple(string,`tv))):
return makePlan(#<cmap(lambda(`nv,if(call(eq,union_tag(`nv),`j),
bag(map_index(union_value(`nv),
`(new StringLeaf(a.toString())))),
bag())),`x)>);
case `tt: error("wrong projection: "+e+" ("+tt+")");
};
j++;
}
};
case `t: error("wrong projection: "+e+" ("+t+")");
};
case typed(`x,`tp):
if (tp.is_variable() && !tp.equals(#<string>) && MRContainer.type_code(tp.toString()) >= 0)
return makePlan(#<call(coerce,`x,
`(MRContainer.type_code(tp.toString())))>);
else fail
case index(`x,`n):
match TypeInference.type_inference2(x) {
case bag(tuple(`kt,`vt)):
if (TypeInference.type_inference2(n).equals(kt))
return #<map_index(`(makePlan(x)),
`(makePlan(n)))>;
case Bag(tuple(`kt,`vt)):
if (TypeInference.type_inference2(n).equals(kt))
return #<map_index(`(makePlan(x)),
`(makePlan(n)))>;
case union(...tl):
for ( Tree t: tl )
match TypeInference.expand(t) {
case _(bag(tuple(`kt,`vt))):
if (TypeInference.type_inference2(n).equals(kt))
return #<map_index(`(makePlan(#<union_value(`x)>)),
`(makePlan(n)))>;
case _(list(`tp)):
return #<index(`(makePlan(#<union_value(`x)>)),
`(makePlan(n)))>;
case `tt: error("wrong indexing: "+e+" ("+tt+")");
};
};
return #<index(`(makePlan(x)),
`(makePlan(n)))>;
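// count of a mapReduce result: count each group's reducer output and sum up the counts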
case call(count,mapReduce(`m,lambda(`vr,`br),`X,`o)):
Tree nv = new_var();
type_env.insert(nv.toString(),TypeInference.type_inference(vr));
Tree nr = simplify(#<lambda(`nv,bag(call(count,`(subst(vr,nv,br)))))>);
Tree plan = #<call(sum,mapReduce(`m,`nr,`X,false))>;
return makePlan(plan);
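// an aggregation call over a known monoid: fuse the aggregation into the
// input plan (e.g., a MapReduce input becomes a MapAggregateReduce)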
case call(`f,`u):
for ( Tree monoid: monoids )
match monoid {
case `aggr(`mtp,`plus,`zero,`unit):
if (aggr.equals(f.toString())) {
Tree plan = makePlan(u);
Tree nx = new_var();
Tree np = new_var();
Tree na = new_var();
Tree tp = TypeInference.type_inference(e);
type_env.insert(np.toString(),#<tuple(`tp,`tp)>);
match TypeInference.type_inference(u) {
case `T(`t):
if (TypeInference.unify(mtp,t) == null)
continue;
type_env.insert(na.toString(),#<tuple(`tp,`t)>);
type_env.insert(nx.toString(),t);
};
plus = makePlan(simplify_all(#<lambda(`np,apply(`plus,`np))>));
Tree acc = makePlan(simplify_all(#<lambda(`na,apply(`plus,tuple(nth(`na,0),
apply(`unit,nth(`na,1)))))>));
zero = makePlan((f.equals(#<avg>)) ? zero : #<typed(`zero,`tp)>);
match plan {
case MapCombineReduce(`m,`c,`r,`s,_):
plan = #<MapAggregateReduce(`m,`r,`acc,`zero,`s,false)>;
case MapReduce(`m,`r,`s,_):
plan = #<MapAggregateReduce(`m,`r,`acc,`zero,`s,false)>;
case MapReduce2(`mx,`my,`r,`x,`y,_):
plan = #<MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,false)>;
case MapJoin(`kx,`ky,`r,`x,`y):
plan = #<MapAggregateJoin(`kx,`ky,`r,`acc,`zero,`x,`y)>;
case CrossProduct(`mx,`my,`r,`x,`y):
plan = #<CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y)>;
case cMap(`m,`s):
plan = #<AggregateMap(`m,`acc,`zero,`s)>;
case _:
if (is_dataset_expr(u))
plan = #<AggregateMap(lambda(`nx,bag(`nx)),`acc,`zero,`plan)>;
else return #<aggregate(`acc,`zero,`plan)>;
};
if (is_dataset_expr(u))
return #<Aggregate(`plus,`zero,`plan)>;
else return #<aggregate(`plus,`zero,`plan)>;
}
};
fail
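// merge (bag union) of two persistent datasets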
case call(plus,`x,`y):
if (!is_dataset_expr(x) || !is_dataset_expr(y))
fail;
return #<Merge(`(makePlan(x)),`(makePlan(y)))>;
case trace(`msg,`tp,`x):
return #<trace(`msg,`tp,`(makePlan(x)))>;
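// a general call: a data constructor, a user-defined function, or an
// imported Java method (coercing arguments to match the method signature)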
case call(`f,...el):
if (!f.is_variable())
fail;
Tree ret = data_constructors.lookup(f.toString());
if (ret != null)
match ret {
case `v(`n,`tp):
Tree p = (el.length()==1)
? makePlan(el.head())
: makePlan(#<tuple(...el)>);
return #<tagged_union(`n,`p)>;
};
ret = global_type_env.lookup(f.toString());
if (ret != null)
match ret {
case arrow(_,_):
Trees args = #[];
for ( Tree a: el )
args = args.append(makePlan(a));
return #<apply(`f,tuple(...args))>;
};
Trees tps = #[];
for ( Tree a: el )
tps = tps.append(TypeInference.type_inference(a));
int i = ClassImporter.find_method_number(f.toString(),tps);
if (i < 0)
error("Method "+f+tps+" has no implementation");
Trees sig = ClassImporter.signature(i);
Trees args = #[];
for ( int j = 0; j < el.length(); j++ ) {
Tree b = sig.nth(j+1);
if (f.equals(#<coerce>) || b.equals(tps.nth(j)) || !b.is_variable()
|| b.equals(#<union>) || MRContainer.type_code(b.toString()) < 0)
args = args.append(makePlan(el.nth(j)));
else args = args.append(makePlan(#<call(coerce,`(el.nth(j)),
`(MRContainer.type_code(b.toString())))>));
};
return #<callM(`f,`i,...args)>;
case let(`v,`u,`body):
if (true)
fail; // this rule is disabled
body = makePlan(body);
match TypeInference.type_inference(u) {
case `S(_):
// if a variable bound to a collection is used more than once in the body,
// materialize the collection in memory
if (is_collection(S) && occurences(v,body) > 1)
body = #<let(`v,`(makePlan(#<call(materialize,`v)>)),`body)>;
};
return #<let(`v,`(makePlan(u)),`body)>;
case function(tuple(...params),`outp,`body):
boolean is_dataset = false;
for ( Tree p: params )
match p {
case dataset(`v,`tp):
is_dataset = true;
};
body = makePlan(body);
return #<function(tuple(...params),`outp,`body)>;
case provenance(`x,...s):
Tree px = makePlan(x);
if (is_dataset_expr(x))
return #<Provenance(`px,...s)>;
else return #<provenance(`px,...s)>;
case `f(...al):
Trees bl = #[];
for ( Tree a: al )
bl = bl.append(makePlan(a));
return #<`f(...bl)>;
};
return e;
}
/** convert an algebraic plan to a physical plan
* @param plan algebraic plan
* @return the physical plan
*/
public static Tree physical_plan ( Tree plan ) {
match plan {
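// a plain MapReduce2 is compiled as a MapAggregateReduce2 with no aggregation (null accumulator and zero)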
case MapReduce2(`mx,`my,`r,`x,`y,`o):
return physical_plan(#<MapAggregateReduce2(`mx,`my,`r,null,null,`x,`y,`o)>);
case MapAggregateReduce2(`mx,`my,lambda(`v,`b),`acc,`zero,`x,`y,false):
// disable map-joins in Flink and Spark mode: choosing between a map-join
// and a coGroup at run time requires knowing the input sizes
if (!Config.flink_mode && !Config.spark_mode)
fail;
return #<MapAggregateReduce2(`mx,`my,lambda(`v,`b),`acc,`zero,
`(physical_plan(x)),
`(physical_plan(y)),false)>;
// convert a reduce-side join to a fragment-replicate join, if either of the join
// inputs is small enough to fit in memory (dataset sizes are extracted from file statistics)
case MapAggregateReduce2(`mx,`my,
lambda(`v,cmap(lambda(`vx,cmap(lambda(`vy,`b),nth(`nx,1))),nth(`ny,0))),
null,null,`x,`y,false):
if (Config.noMapJoin)
fail;
if (!nx.equals(v) || !ny.equals(v) || occurences(v,b) > 0)
fail;
Tree X = new_var();
Tree Y = new_var();
Tree nv = new_var();
Tree L1 = simplify_all(#<lambda(`nv,cmap(lambda(`vx,cmap(lambda(`vy,`b),nth(`nv,1))),bag(nth(`nv,0))))>);
nv = new_var();
Tree L2 = simplify_all(#<lambda(`nv,cmap(lambda(`vx,cmap(lambda(`vy,`b),bag(nth(`nv,0)))),nth(`nv,1)))>);
nv = new_var();
Tree L3 = #<lambda(`nv,cmap(lambda(`vx,cmap(lambda(`vy,`b),nth(`nv,0))),nth(`nv,1)))>;
Tree cond1 = makePlan(#<call(leq,dataset_size(`Y),`(Config.mapjoin_size))>);
Tree cond2 = makePlan(#<call(leq,dataset_size(`X),`(Config.mapjoin_size))>);
Tree cond3 = makePlan(#<call(lt,dataset_size(`X),dataset_size(`Y))>);
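// decide at run time, based on dataset sizes: use a map-side join with the
// smaller input cached in memory if it fits; otherwise fall back to a
// reduce-side join, ordering the inputs by size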
return #<Let(`X,`(physical_plan(x)),Let(`Y,`(physical_plan(y)),
If(`cond1,
MapJoin(`mx,`my,`L1,`X,`Y),
If(`cond2,
MapJoin(`my,`mx,`L2,`Y,`X),
If(`cond3,
MapAggregateReduce2(`my,`mx,`L3,null,null,`Y,`X,false),
MapAggregateReduce2(`mx,`my,
lambda(`v,cmap(lambda(`vx,cmap(lambda(`vy,`b),nth(`nx,1))),
nth(`ny,0))),
null,null,`X,`Y,false))))))>;
case MapAggregateReduce2(`mx,`my,
lambda(`v,cmap(lambda(`vy,cmap(lambda(`vx,`b),nth(`ny,0))),nth(`nx,1))),
null,null,`x,`y,false):
if (Config.noMapJoin)
fail;
if (!nx.equals(v) || !ny.equals(v) || occurences(v,b) > 0)
fail;
return physical_plan(#<MapAggregateReduce2(`mx,`my,
lambda(`v,cmap(lambda(`vx,cmap(lambda(`vy,`b),nth(`nx,1))),nth(`ny,0))),
null,null,`x,`y,false)>);
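// disabled rule (the xxx prefix prevents it from matching any plan)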
case xxxMapAggregateReduce2(`mx,`my,lambda(`v,`b),null,null,`x,`y,false):
if (Config.noMapJoin)
fail;
Tree X = new_var();
Tree Y = new_var();
Tree nv = new_var();
Tree L1 = simplify_all(subst(#<nth(`v,0)>,#<bag(nth(`nv,0))>,
subst(#<nth(`v,1)>,#<nth(`nv,1)>,
#<lambda(`nv,`b)>)));
nv = new_var();
Tree L2 = subst(#<nth(`v,0)>,#<nth(`nv,1)>,
subst(#<nth(`v,1)>,#<nth(`nv,0)>,
#<lambda(`nv,`b)>));
Tree cond1 = makePlan(#<call(leq,dataset_size(`Y),`(Config.mapjoin_size))>);
Tree cond2 = makePlan(#<call(lt,dataset_size(`X),dataset_size(`Y))>);
return #<Let(`X,`(physical_plan(x)),Let(`Y,`(physical_plan(y)),
If(`cond1,
MapJoin(`mx,`my,`L1,`X,`Y),
If(`cond2,
MapAggregateReduce2(`my,`mx,`L2,null,null,`Y,`X,false),
MapAggregateReduce2(`mx,`my,lambda(`v,`b),null,null,`X,`Y,false)))))>;
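// order the join inputs by size at run time, placing the larger input first,
// provided that the reducer with swapped arguments can stream its first argument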
case MapAggregateReduce2(`mx,`my,lambda(`v,`b),`acc,`zero,`x,`y,false):
Tree X = new_var();
Tree Y = new_var();
Tree nv = new_var();
Tree nr = subst(#<nth(`v,0)>,#<nth(`nv,1)>,
subst(#<nth(`v,1)>,#<nth(`nv,0)>,b));
if (!streamed_MapReduce2_reducer(#<lambda(`nv,`nr)>))
fail;
Tree cond = makePlan(#<call(lt,dataset_size(`X),dataset_size(`Y))>);
return #<Let(`X,`(physical_plan(x)),Let(`Y,`(physical_plan(y)),
If(`cond,
MapAggregateReduce2(`my,`mx,lambda(`nv,`nr),`acc,`zero,`Y,`X,false),
MapAggregateReduce2(`mx,`my,lambda(`v,`b),`acc,`zero,`X,`Y,false))))>;
case CrossProduct(`mx,`my,`r,`x,`y):
return physical_plan(#<CrossAggregateProduct(`mx,`my,`r,null,null,`x,`y)>);
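// order the cross-product inputs by size at run time (larger input first)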
case CrossAggregateProduct(`mx,`my,lambda(`v,`b),`acc,`zero,`x,`y):
if (Config.flink_mode || Config.spark_mode)
fail;
Tree X = new_var();
Tree Y = new_var();
Tree nv = new_var();
Tree nr = subst(#<nth(`v,0)>,#<nth(`nv,1)>,
subst(#<nth(`v,1)>,#<nth(`nv,0)>,b));
Tree cond = makePlan(#<call(lt,dataset_size(`X),dataset_size(`Y))>);
return #<Let(`X,`(physical_plan(x)),Let(`Y,`(physical_plan(y)),
If(`cond,
CrossAggregateProduct(`my,`mx,lambda(`nv,`nr),`acc,`zero,`Y,`X),
CrossAggregateProduct(`mx,`my,lambda(`v,`b),`acc,`zero,`X,`Y))))>;
case `f(...al):
Trees bl = #[];
for ( Tree a: al )
bl = bl.append(physical_plan(a));
return #<`f(...bl)>;
};
return plan;
}
}