blob: 8c3707c78501d0300acf84975f1dc58dff883983 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import org.apache.mrql.gen.*;
/** Translates physical plans to BSP plans to be executed on Hama */
public class BSPTranslator extends TypeInference {
final static int orM = ClassImporter.find_method_number("or",#[bool,bool]);
final static int andM = ClassImporter.find_method_number("and",#[bool,bool]);
final static int notM = ClassImporter.find_method_number("not",#[bool]);
final static int eqM = ClassImporter.find_method_number("eq",#[int,int]);
final static int plusM = ClassImporter.find_method_number("plus",#[int,int]);
final static int neqM = ClassImporter.find_method_number("neq",#[int,int]);
final static int geqM = ClassImporter.find_method_number("geq",#[long,long]);
final static int gtM = ClassImporter.find_method_number("gt",#[int,int]);
final static int unionM = ClassImporter.find_method_number("union",#[bag(any),bag(any)]);
final static int countM = ClassImporter.find_method_number("count",#[bag(any)]);
final static int coerceM = ClassImporter.find_method_number("coerce",#[any,int]);
final static Trees planNames = plans_with_distributed_lambdas
.append(#[ParsedSource,Generator,BinarySource,GroupByJoin,Repeat,Closure,repeat,trace]);
private static int source_num = 0;
public static void reset () {
source_num = 0;
}
private static Tree new_source ( Tree e ) {
match e {
case `f(...):
if (planNames.member(#<`f>))
return preprocess(e);
};
return #<BSPSource(`(source_num++),`(preprocess(e)))>;
}
/** add a new source num to every BSP operation */
private static Tree preprocess ( Tree e ) {
match e {
case lambda(`v,`b):
return #<lambda(`v,`(preprocess(b)))>;
case Aggregate(`acc,`zero,`S):
return #<Aggregate(`acc,`zero,
`(new_source(S)))>;
case cMap(`m,`S):
return #<cMap(`m,
`(new_source(S)))>;
case AggregateMap(`m,`acc,`zero,`S):
return #<AggregateMap(`m,`acc,`zero,
`(new_source(S)))>;
case MapReduce(`m,`r,`S,`o):
return #<MapReduce(`m,`r,
`(new_source(S)),`o)>;
case MapAggregateReduce(`m,`r,`acc,`zero,`S,`o):
return #<MapAggregateReduce(`m,`r,`acc,`zero,
`(new_source(S)),`o)>;
case MapCombineReduce(`m,`c,`r,`S,`o):
return #<MapCombineReduce(`m,`c,`r,
`(new_source(S)),`o)>;
case MapReduce2(`mx,`my,`r,`x,`y,`o):
return #<MapReduce2(`mx,`my,`r,
`(new_source(x)),
`(new_source(y)),`o)>;
case MapReduce2(`mx,`my,`c,`r,`x,`y,`o):
return #<MapCombineReduce2(`mx,`my,`c,`r,
`(new_source(x)),
`(new_source(y)),`o)>;
case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,`o):
return #<MapAggregateReduce2(`mx,`my,`r,`acc,`zero,
`(new_source(x)),
`(new_source(y)),`o)>;
case MapJoin(`mx,`my,`r,`x,`y):
return #<MapJoin(`mx,`my,`r,
`(new_source(x)),
`(new_source(y)))>;
case MapAggregateJoin(`mx,`my,`r,`acc,`zero,`x,`y):
return #<MapAggregateJoin(`mx,`my,`r,`acc,`zero,
`(new_source(x)),
`(new_source(y)))>;
case CrossProduct(`mx,`my,`r,`x,`y):
return #<CrossProduct(`mx,`my,`r,
`(new_source(x)),
`(new_source(y)))>;
case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y):
return #<CrossAggregateProduct(`mx,`my,`r,`acc,`zero,
`(new_source(x)),
`(new_source(y)))>;
case GroupByJoin(`kx,`ky,`gx,`gy,`acc,`z,`r,`x,`y,`o):
return #<GroupByJoin(`kx,`ky,`gx,`gy,`acc,`z,`r,
`(new_source(x)),
`(new_source(y)),`o)>;
case Repeat(`f,`ds,`max):
return #<Repeat(`(preprocess(f)),`(preprocess(ds)),`max)>;
case repeat(`f,`ds,`max):
return #<repeat(`(preprocess(f)),`(preprocess(ds)),`max)>;
case Closure(`f,`ds,`max):
return #<Closure(`(preprocess(f)),`(preprocess(ds)),`max)>;
case trace(`msg,`tp,`x):
return #<trace(`msg,`tp,`(preprocess(x)))>;
case `f(...):
if (! #[ParsedSource,Generator,BinarySource].member(#<`f>))
fail;
return #<BSPSource(`(source_num++),`e)>;
case `f(...as):
Trees bs = #[];
for (Tree a: as)
bs = bs.append(preprocess(a));
return #<`f(...bs)>;
};
return e;
}
/** returns the source num of a BSP operation */
private static int source_num ( Tree e ) {
match e {
case trace(`msg,`tp,`x):
return source_num(x);
case BSP(tuple(`i,...),...):
return (int)((LongLeaf)i).value();
case BSP(`i,...):
return (int)((LongLeaf)i).value();
case BSPSource(`i,_):
return (int)((LongLeaf)i).value();
};
return -1;
}
private static Tree subst_getCache_num ( int n, int m, Tree e ) {
match e {
case getCache(`cache,`k):
if (!k.equals(#<`n>))
fail;
return #<getCache(`cache,`m)>;
case `f(...as):
Trees bs = #[];
for (Tree a: as)
bs = bs.append(subst_getCache_num(n,m,a));
return #<`f(...bs)>;
};
return e;
}
private static Tree subst_setCache_num ( int n, int m, Tree e ) {
match e {
case setCache(`cache,`k,...r):
if (!k.equals(#<`n>))
fail;
return #<setCache(`cache,`m,...r)>;
case `f(...as):
Trees bs = #[];
for (Tree a: as)
bs = bs.append(subst_setCache_num(n,m,a));
return #<`f(...bs)>;
};
return e;
}
private static Tree set_cache_num ( Tree e, int n ) {
match e {
case trace(`msg,`tp,`x):
return set_cache_num(x,n);
case BSP(`m,...r):
return subst_setCache_num(source_num(e),n,#<BSP(`n,...r)>);
case BSPSource(_,`x):
return #<BSPSource(`n,`x)>;
};
return e;
}
// return the source numbers of a BSP operation
private static Trees source_nums ( Tree e ) {
match e {
case trace(`msg,`tp,`x):
return source_nums(x);
case BSP(`i,...):
return #[`i];
case Merge(`x,`y):
return source_nums(x).append(source_nums(y));
case BSPSource(`i,_):
return #[`i];
};
return #[];
}
private static Trees source_nums ( Trees s ) {
Trees res = #[];
for ( Tree x: s )
res = res.append(source_nums(x));
return res;
}
private static Tree getCache ( Tree var, Tree e, Tree body ) {
Trees ns = source_nums(e);
if (ns.length() > 0)
return subst(var,#<getCache(cache,...ns)>,body);
return body;
}
private static Tree getCache ( Tree var, Trees s, Tree body ) {
Trees ns = source_nums(s);
return subst(var,#<getCache(cache,...ns)>,body);
}
/** optimize a BSP plan after BSP fusion */
private static Tree post_simplify ( Tree e ) {
match e {
case cmap(lambda(`x,`S(`a)),`b):
if (is_collection(S))
return post_simplify(#<map(lambda(`x,`a),`b)>);
else fail
case `f(...as):
Trees bs = #[];
for (Tree a: as)
bs = bs.append(post_simplify(a));
return #<`f(...bs)>;
};
return e;
}
private static Tree processBSP ( Tree e ) {
return post_simplify(Simplification.simplify_all(Simplification.rename(e)));
}
private static Tree mkBSP ( Tree superstep, Tree state, Tree orderp, Tree input ) {
int rn = source_num++;
superstep = processBSP(subst(#<o_>,#<`rn>,getCache(#<i_>,input,superstep)));
return #<BSP(`rn,`superstep,`state,`orderp,`input)>;
}
private static Tree mkBSPL ( Tree superstep, Tree state, Tree orderp, Trees input ) {
int rn = source_num++;
superstep = processBSP(subst(#<o_>,#<`rn>,getCache(#<i>,input,superstep)));
return #<BSP(`rn,`superstep,`state,`orderp,...input)>;
}
private static Tree mkBSPL ( int[] ns, Tree superstep, Tree state, Tree orderp, Trees input ) {
superstep = processBSP(subst(#<o_>,#<`(ns[0])>,getCache(#<i_>,input,superstep)));
Trees s = #[];
for ( int n: ns )
s = s.append(#<`n>);
return #<BSP(tuple(...s),`superstep,`state,`orderp,...input)>;
}
private static Tree mkBSPL ( int n, Tree superstep, Tree state, Tree orderp, Trees input ) {
superstep = processBSP(subst(#<o_>,#<`n>,getCache(#<i_>,input,superstep)));
return #<BSP(`n,`superstep,`state,`orderp,...input)>;
}
private static Tree mkBSP2 ( Tree superstep, Tree state, Tree orderp, Tree left, Tree right ) {
int rn = source_num++;
superstep = processBSP(subst(#<o_>,#<`rn>,getCache(#<i_>,left,
getCache(#<j_>,right,superstep))));
return #<BSP(`rn,`superstep,`state,`orderp,`left,`right)>;
}
/** construct a BSP plan from a physical plan
* @param e the physical plan
* @return the BSP plan
*/
private static Tree mr2bsp ( Tree e ) {
match e {
case Aggregate(`acc,`zero,`S):
return #<Aggregate(`acc,`zero,`(mr2bsp(S)))>;
case cMap(`m,`S):
return mkBSP(#<lambda(tuple(cache,ms,k,peer),
setCache(cache,o_,cmap(`m,i_),
tuple(BAG(),tuple(),TRUE())))>,
#<tuple()>,
#<false>,
mr2bsp(S));
case AggregateMap(`m,`acc,`zero,`S):
return mkBSP(#<lambda(tuple(cache,ms,k,peer),
setCache(cache,o_,aggregate(`acc,`zero,cmap(`m,i_)),
tuple(BAG(),tuple(),TRUE())))>,
#<tuple()>,
#<false>,
mr2bsp(S));
case MapReduce(`m,`r,`S,`o):
return mkBSP(#<lambda(tuple(cache,ms,map_step,peer),
if(map_step,
tuple(cmap(lambda(tuple(k,c),
bag(tuple(k,tuple(k,c)))),
cmap(`m,i_)),
false,
FALSE()),
setCache(cache,o_,`(o.equals(#<true>) // need to sort the result?
? #<cmap(lambda(tuple(k,s),
cmap(lambda(x,bag(tuple(x,k))),
apply(`r,tuple(k,s)))),
groupBy(ms))>
: #<cmap(`r,groupBy(ms))>),
tuple(BAG(),false,TRUE()))))>,
#<true>,
o,
mr2bsp(S));
case MapAggregateReduce(`m,`r,`acc,`zero,`S,_):
return mkBSP(#<lambda(tuple(cache,ms,map_step,peer),
if(map_step,
tuple(cmap(lambda(tuple(k,c),
bag(tuple(k,tuple(k,c)))),
cmap(`m,i_)),
false,
FALSE()),
setCache(cache,o_,aggregate(`acc,`zero,cmap(`r,groupBy(ms))),
tuple(BAG(),false,TRUE()))))>,
#<true>,
#<false>,
mr2bsp(S));
case MapCombineReduce(`m,`c,`r,`S,`o):
return mkBSP(#<lambda(tuple(cache,ms,map_step,peer),
if(map_step,
tuple(cmap(lambda(tuple(k,s),
cmap(lambda(x,bag(tuple(k,tuple(k,x)))),
apply(`c,tuple(k,s)))),
groupBy(cmap(`m,i_))),
false,
FALSE()),
setCache(cache,o_,`(o.equals(#<true>) // need to sort the result?
? #<cmap(lambda(tuple(k,s),
cmap(lambda(x,bag(tuple(x,k))),
apply(`r,tuple(k,s)))),
groupBy(ms))>
: #<cmap(`r,groupBy(ms))>),
tuple(BAG(),false,TRUE()))))>,
#<true>,
o,
mr2bsp(S));
case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o):
return mkBSP2(#<lambda(tuple(cache,ms,map_step,peer),
if(map_step,
tuple(callM(union,`unionM,
cmap(lambda(tuple(kx,x),
bag(tuple(kx,tuple(kx,tuple(1,x))))),
cmap(`mx,i_)),
cmap(lambda(tuple(ky,y),
bag(tuple(ky,tuple(ky,tuple(2,y))))),
cmap(`my,j_))),
false,
FALSE()),
setCache(cache,o_,cmap(lambda(tuple(k,s),
cmap(lambda(tuple(kk,ss),
cmap(lambda(x,bag(tuple(kk,x))),
apply(`c,tuple(kk,ss)))),
groupBy(apply(`r,
tuple(cmap(lambda(tuple(kx,x),
if(callM(eq,`eqM,kx,1),
bag(x),
bag())),
s),
cmap(lambda(tuple(ky,y),
if(callM(eq,`eqM,ky,2),
bag(y),
bag())),
s)))))),
groupBy(ms)),
tuple(BAG(),false,TRUE()))))>,
#<true>,
o,
mr2bsp(x),
mr2bsp(y));
case MapReduce2(`mx,`my,`r,`x,`y,`o):
return mkBSP2(#<lambda(tuple(cache,ms,map_step,peer),
if(map_step,
tuple(callM(union,`unionM,
cmap(lambda(tuple(kx,x),
bag(tuple(kx,tuple(kx,tuple(1,x))))),
cmap(`mx,i_)),
cmap(lambda(tuple(ky,y),
bag(tuple(ky,tuple(ky,tuple(2,y))))),
cmap(`my,j_))),
false,
FALSE()),
setCache(cache,o_,cmap(lambda(tuple(k,s),
cmap(lambda(x,bag(`(o.equals(#<true>) ? #<tuple(x,k)> : #<x>))),
apply(`r,
tuple(cmap(lambda(tuple(kx,x),
if(callM(eq,`eqM,kx,1),
bag(x),
bag())),
s),
cmap(lambda(tuple(ky,y),
if(callM(eq,`eqM,ky,2),
bag(y),
bag())),
s))))),
groupBy(ms)),
tuple(BAG(),false,TRUE()))))>,
#<true>,
o,
mr2bsp(x),
mr2bsp(y));
case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,_):
return mkBSP2(#<lambda(tuple(cache,ms,map_step,peer),
if(map_step,
tuple(callM(union,`unionM,
cmap(lambda(tuple(kx,x),
bag(tuple(kx,tuple(kx,tuple(1,x))))),
cmap(`mx,i_)),
cmap(lambda(tuple(ky,y),
bag(tuple(ky,tuple(ky,tuple(2,y))))),
cmap(`my,j_))),
false,
FALSE()),
setCache(cache,o_,aggregate(`acc,`zero,
cmap(lambda(tuple(k,s),
apply(`r,tuple(cmap(lambda(tuple(kx,x),
if(callM(eq,`eqM,kx,1),
bag(x),
bag())),
s),
cmap(lambda(tuple(ky,y),
if(callM(eq,`eqM,ky,2),
bag(y),
bag())),
s)))),
groupBy(ms))),
tuple(BAG(),false,TRUE()))))>,
#<true>,
#<false>,
mr2bsp(x),
mr2bsp(y));
case MapJoin(`mx,`my,`r,`x,`y):
return mr2bsp(#<MapReduce2(`mx,`my,
lambda(tuple(xs,ys),cmap(lambda(x,apply(`r,tuple(x,ys))),xs)),
`x,`y,false)>);
case MapAggregateJoin(`mx,`my,`r,`acc,`zero,`x,`y):
return mr2bsp(#<MapAggregateReduce2(`mx,`my,
lambda(tuple(xs,ys),cmap(lambda(x,apply(`r,tuple(x,ys))),xs)),
`acc,`zero,`x,`y)>);
case GroupByJoin(`kx,`ky,lambda(`vx,`gx),lambda(`vy,`gy),`acc,`zero,`r,`x,`y,`o):
int n = (int)Math.floor(Math.sqrt(Config.nodes));
int m = n;
// System.err.println("Using a groupBy join on a "+n+"*"+m+" grid of partitions");
Tree xkey = #<cmap(lambda(i,bag(tuple(call(plus,call(mod,call(hash_code,`gx),`m),call(times,`m,i)),
tuple(1,call(plus,call(mod,call(hash_code,`gx),`m),call(times,`m,i)),`vx)))),
range(0,`(n-1)))>;
Tree ykey = #<cmap(lambda(j,bag(tuple(call(plus,call(times,call(mod,call(hash_code,`gy),`n),`m),j),
tuple(2,call(plus,call(times,call(mod,call(hash_code,`gy),`n),`m),j),`vy)))),
range(0,`(m-1)))>;
type_inference(xkey);
type_inference(ykey);
xkey = PlanGeneration.makePlan(xkey);
ykey = PlanGeneration.makePlan(ykey);
return mkBSP2(#<lambda(tuple(cache,ms,map_step,peer),
if(map_step,
tuple(callM(union,`unionM,
cmap(lambda(`vx,`xkey),i_),
cmap(lambda(`vy,`ykey),j_)),
false,
FALSE()),
setCache(cache,o_,
mergeGroupByJoin(`kx,`ky,lambda(`vx,`gx),lambda(`vy,`gy),`acc,`zero,`r,
cmap(lambda(tuple(kx,p,x),
if(callM(eq,`eqM,kx,1),bag(tuple(p,x)),bag())),ms),
cmap(lambda(tuple(ky,p,y),
if(callM(eq,`eqM,ky,2),bag(tuple(p,y)),bag())),ms),`o),
tuple(BAG(),false,TRUE()))))>,
#<true>,
o,
mr2bsp(x),
mr2bsp(y));
case CrossProduct(`mx,`my,`r,`x,`y):
return mkBSP(#<lambda(tuple(cache,ms,ys,peer),
tuple(BAG(),
setCache(cache,o_,
cmap(lambda(x,
cmap(lambda(y,apply(`r,tuple(x,y))),
cmap(`my,ys))),
cmap(`mx,i_)),
tuple()),
TRUE()))>,
#<Collect(`(mr2bsp(y)))>,
#<false>,
mr2bsp(x));
case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y):
return mkBSP(#<lambda(tuple(cache,ms,ys,peer),
tuple(BAG(),
setCache(cache,o_,
aggregate(`acc,`zero,
cmap(lambda(x,
cmap(lambda(y,apply(`r,tuple(x,y))),
cmap(`my,ys))),
cmap(`mx,i_)),
tuple())),
TRUE()))>,
#<Collect(`(mr2bsp(y)))>,
#<false>,
mr2bsp(x));
case Repeat(lambda(`v,`b),`ds,`max):
Tree step = bspSimplify(mr2bsp(b));
int step_cache_num = source_num(step);
match step {
case BSP(`n,`s,`k0,_,...as):
Tree ds_source = mr2bsp(ds);
int ds_cache_num = source_num(ds_source);
ds_source = set_cache_num(ds_source,step_cache_num);
// the initial values of all data sources
Trees sources = #[`ds_source];
Tree step_input = #<0>;
for ( Tree x: as )
match x {
case BSPSource(`j,`y):
if (y.equals(v))
step_input = j;
else sources = sources.append(x);
case _: sources = sources.append(x);
};
s = subst_getCache_num((int)((LongLeaf)step_input).value(),step_cache_num,s);
return mkBSPL(step_cache_num,
#<lambda(tuple(cache,ms,tuple(k,steps),peer),
let(tuple(ts,kk,step_end),
apply(`s,tuple(cache,ms,k,peer)),
if(step_end, // end of repeat step
setCache(cache,`step_cache_num,
map(lambda(tuple(x,bb),x),
getCache(cache,`step_cache_num)),
tuple(bag(),
tuple(`k0,callM(plus,`plusM,steps,1)),
if(callM(gt,`gtM,steps,`max),
TRUE(), // if # of steps > limit, exit
callM(not,`notM, // ... else check the stopping condition
aggregate(lambda(tuple(x,y),callM(or,`orM,x,y)),
false,
map(lambda(tuple(x,bb),bb),
getCache(cache,`step_cache_num))))))),
tuple(ts,tuple(kk,steps),FALSE()))))>,
#<tuple(`k0,2)>,
#<false>,
sources);
case `x: throw new Error("Cannot compile the repeat function: "+x);
}
// when the repeat variable is in memory
case repeat(lambda(`v,`b),`ds,`max):
if (!Config.hadoop_mode)
fail;
Tree step = bspSimplify(mr2bsp(b));
int step_cache_num = source_num(step);
step = subst(v,#<map(lambda(tuple(x,b),x),getCache(cache,`step_cache_num))>,step);
match step {
case BSP(`n,`s,`k0,_,...as):
// the initial values of all data sources
Trees sources = #[];
for ( Tree x: as )
if (!x.equals(v))
sources = sources.append(x);
Tree res = mkBSPL(#<lambda(tuple(cache,ms,tuple(k,steps,firstp,S),peer),
let(ignore,if(firstp,setCache(cache,`step_cache_num,S,0),0),
let(tuple(ts,kk,step_end),
apply(`s,tuple(cache,ms,k,peer)),
if(step_end, // end of repeat step
setCache(cache,o_,map(lambda(tuple(x,b),x),getCache(cache,`step_cache_num)),
setCache(cache,`step_cache_num,
distribute(peer,getCache(cache,`step_cache_num)),
tuple(bag(),
tuple(`k0,callM(plus,`plusM,steps,1),false,bag()),
if(callM(gt,`gtM,steps,`max),
TRUE(), // if # of steps > limit, exit
callM(not,`notM, // ... else check the stopping condition
aggregate(lambda(tuple(x,y),callM(or,`orM,x,y)),
false,
map(lambda(tuple(x,bb),bb),
getCache(cache,`step_cache_num)))))))),
tuple(ts,tuple(kk,steps,false,bag()),FALSE())))))>,
#<tuple(`k0,2,true,map(lambda(x,tuple(x,false)),`ds))>,
#<false>,
sources);
return #<Collect(`res)>; // must return a memory bag
case `x: fail
}
case Closure(lambda(`v,`b),`ds,`max):
Tree step = bspSimplify(mr2bsp(b));
int step_cache_num = source_num(step);
match step {
case BSP(`n,`s,`k0,_,...as):
Tree ds_source = mr2bsp(ds);
int ds_cache_num = source_num(ds_source);
ds_source = set_cache_num(ds_source,step_cache_num);
// the initial values of all data sources
Trees sources = #[`ds_source];
Tree step_input = #<0>;
for ( Tree x: as )
match x {
case BSPSource(`j,`y):
if (y.equals(v))
step_input = j;
else sources = sources.append(x);
case _: sources = sources.append(x);
};
s = subst_getCache_num((int)((LongLeaf)step_input).value(),step_cache_num,s);
return mkBSPL(step_cache_num,
#<lambda(tuple(cache,ms,tuple(k,steps,len),peer),
let(tuple(ts,kk,step_end),
apply(`s,tuple(cache,ms,k,peer)),
if(step_end, // end of repeat step
let(newLen,callM(count,`countM,getCache(cache,`step_cache_num)),
tuple(bag(),
tuple(`k0,callM(plus,`plusM,steps,1),newLen),
if(callM(gt,`gtM,steps,`max),
TRUE(), // if # of steps > limit, exit
// ... else check if the new size is the same as the old size
callM(geq,`geqM,len,newLen)))),
tuple(ts,tuple(kk,steps,len),FALSE()))))>,
#<tuple(`k0,1,callM(coerce,`coerceM,0,4))>,
#<false>,
sources);
case `x: throw new Error("Cannot compile the closure function: "+x);
}
case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`max):
Tree[] steps = new Tree[vs.length()];
Tree[] inits = new Tree[vs.length()];
int[] cache_num = new int[vs.length()];
Tree[] k = new Tree[vs.length()];
Trees sources = #[];
Trees all_cache = #[];
boolean dont_fuse = false;
for ( int i = 0; i < vs.length(); i++ ) {
inits[i] = mr2bsp(ss.nth(i));
sources = sources.append(inits[i]);
cache_num[i] = source_num(inits[i]);
all_cache = all_cache.append(#<getCache(cache,`(cache_num[i]))>);
};
for ( int i = 0; i < vs.length(); i++ ) {
Tree ee = bspSimplify(mr2bsp(bs.nth(i)));
match ee {
case BSP(`n,`s,`k0,_,...as):
steps[i] = subst_setCache_num(source_num(ee),cache_num[i],s);
k[i] = k0;
loop: for ( Tree x: as )
match x {
case BSPSource(`m,`w):
if (!vs.member(w))
fail;
for ( int j = 0; j < vs.length(); j++ )
if (w.equals(vs.nth(j)))
steps[i] = subst_getCache_num(source_num(x),cache_num[j],steps[i]);
case BSPSource(`n1,`d1):
for ( Tree y: sources )
match y {
case BSPSource(`n2,`d2):
if (d1.equals(d2)) {
steps[i] = subst_getCache_num(source_num(x),source_num(y),steps[i]);
continue loop;
}
};
sources = sources.append(x);
case _: sources = sources.append(x);
};
case _: dont_fuse = true;
}
};
if (dont_fuse)
fail;
Tree code = #<tuple(BAG(),tuple(`(k[0]),0,1),TRUE())>;
for ( int i = 0; i < vs.length(); i++ )
code = #<if(callM(eq,`eqM,i,`i),
let(tuple(ts,kk,step_end),
apply(`(steps[i]),tuple(cache,ms,k,peer)),
if(step_end, // end of repeat step
`((i+1 < vs.length())
? #<tuple(bag(),tuple(`(k[i+1]),`(i+1),steps),FALSE())>
: #<tuple(bag(),
tuple(`(k[0]),0,callM(plus,`plusM,steps,1)),
if(callM(gt,`gtM,steps,`max), // if # of steps > limit, exit
TRUE(),
FALSE()))>),
tuple(ts,tuple(kk,i,steps),FALSE()))),
`code)>;
return mkBSPL(cache_num,
#<lambda(tuple(cache,ms,tuple(k,i,steps),peer),`code)>,
#<tuple(`(k[0]),0,2)>,
#<false>,
sources);
case `f(...as):
Trees bs = #[];
for ( Tree a: as )
bs = bs.append(mr2bsp(a));
return #<`f(...bs)>;
};
return e;
}
/** simplify the BSP plan by fusing consecutive BSP plans */
private static Tree bspSimplify ( Tree e ) {
match e {
case BSP(`n,`s2,`k2,`o,...r,BSP(_,`s1,`k1,_,...s),...t):
Trees ys = r.append(t);
Trees ns = #[];
loop: for ( Tree x: s )
match x {
case BSPSource(`n1,`d1):
for ( Tree y: ys )
match y {
case BSPSource(`n2,`d2):
if (d1.equals(d2)) {
s1 = subst_getCache_num(source_num(x),source_num(y),s1);
continue loop;
}
};
ns = ns.append(x);
case _: ns = ns.append(x);
};
int[] nvs = null;
match n {
case `f(...as):
nvs = new int[as.length()];
for ( int i = 0; i < as.length(); i++ )
nvs[i] = (int)((LongLeaf)as.nth(i)).value();
case _:
nvs = new int[1];
nvs[0] = (int)((LongLeaf)n).value();
};
return bspSimplify(mkBSPL(nvs,
#<lambda(tuple(cache,ms,tuple(first,k),peer),
if(first,
let(tuple(ts,kk,b),apply(`s1,tuple(cache,ms,k,peer)),
let(exit,
synchronize(peer,b), // poll all peers: do you want to exit?
// all peers must aggree to exit the inner BSP
// and proceed to the outer BSP
tuple(ts,tuple(callM(not,`notM,exit),
if(exit,`k2,kk)),FALSE()))),
let(tuple(ts,kk,bb),apply(`s2,tuple(cache,ms,k,peer)),
tuple(ts,tuple(false,kk),bb))))>,
#<tuple(true,`k1)>,
o,
#[...r,...ns,...t]));
case `f(...as):
Trees bs = #[];
for ( Tree a: as )
bs = bs.append(bspSimplify(a));
return #<`f(...bs)>;
};
return e;
}
private static Tree post_simplify_plan ( Tree e ) {
match e {
case setCache(`cache,`a,`v,`ret):
return post_simplify_plan(#<setNth(`cache,`a,materialize(`v),`ret)>);
case getCache(`cache,`a,...as):
Tree z = #<nth(`cache,`a)>;
for ( Tree x: as )
z = #<callM(union,`unionM,`z,nth(`cache,`x))>;
return z;
case `f(...as):
Trees bs = #[];
for (Tree a: as)
bs = bs.append(post_simplify_plan(a));
return #<`f(...bs)>;
};
return e;
}
/** construct and simplify the BSP plan from a physical plan
* @param plan the physical plan
* @return the BSP plan
*/
public static Tree constructBSPplan ( Tree plan ) {
return post_simplify_plan(bspSimplify(mr2bsp(preprocess(plan))));
}
}