[MRQL-92] Use outer-joins for incremental queries in Spark streaming mode
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index a9b16cb..77da529 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -72,12 +72,12 @@
BSP_SPLIT_INPUT=
-# Optional: Spark configuration. Supports versions 1.0.0, 1.0.2, 1.1.0, 1.1.1, 1.2.0, 1.3.0, 1.3.1, and 1.6.0
+# Optional: Spark configuration. Supports versions 1.0.0, 1.0.2, 1.1.0, 1.1.1, 1.2.0, 1.3.0, 1.3.1, 1.6.0, and 1.6.2
# (Spark versions 0.8.1, 0.9.0, and 0.9.1 are supported by MRQL 0.9.0)
# You may use the Spark prebuilts bin-hadoop1 or bin-hadoop2 (Yarn)
# For distributed mode, give write permission to /tmp: hadoop fs -chmod -R 777 /tmp
# Tested in local, standalone deploy, and Yarn modes
-SPARK_HOME=${HOME}/spark-1.6.0-bin-hadoop2.6
+SPARK_HOME=${HOME}/spark-1.6.2-bin-hadoop2.6
# URI of the Spark master node:
# to run Spark on Standalone Mode, set it to spark://`hostname`:7077
# to run Spark on a YARN cluster, set it to "yarn-client"
diff --git a/core/src/main/java/org/apache/mrql/Evaluator.java b/core/src/main/java/org/apache/mrql/Evaluator.java
index ac89f4e..a504087 100644
--- a/core/src/main/java/org/apache/mrql/Evaluator.java
+++ b/core/src/main/java/org/apache/mrql/Evaluator.java
@@ -159,7 +159,7 @@
}
/** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
- public void streaming ( Tree plan, Environment env, Function f ) {
+ public void streaming ( Tree plan, Environment env, Environment dataset_env, Function f ) {
throw new Error("MRQL Streaming is not supported in this evaluation mode yet");
}
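The extra dataset_env parameter lets a back end thread per-batch RDD bindings through evaluation separately from the ordinary MRData bindings in env. A minimal sketch of the lookup order the Spark back end adopts for variables (illustrative only: variable_lookup, Environment, MR_rdd, and global_rdds are the existing MRQL names, but this helper itself is hypothetical):

    // hypothetical helper: resolve a variable, preferring per-batch RDD bindings
    static MRData lookup ( String name, Environment rdd_env, Environment env ) {
        MRData x = variable_lookup(name,rdd_env);     // RDDs bound in this batch
        if (x == null)
            x = variable_lookup(name,global_rdds);    // RDDs kept across batches
        if (x == null)
            x = variable_lookup(name,env);            // ordinary MRData bindings
        return x;
    }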
diff --git a/core/src/main/java/org/apache/mrql/Interpreter.gen b/core/src/main/java/org/apache/mrql/Interpreter.gen
index 054963c..600f5aa 100644
--- a/core/src/main/java/org/apache/mrql/Interpreter.gen
+++ b/core/src/main/java/org/apache/mrql/Interpreter.gen
@@ -606,7 +606,7 @@
return ((Lambda)fnc).lambda().eval(t);
case Stream(...): // streaming
final Tree qt = query_type;
- Evaluator.evaluator.streaming(e,env,new Function(){
+ Evaluator.evaluator.streaming(e,env,null,new Function(){
public MRData eval ( final MRData data ) {
System.out.println(print(data,qt));
return data;
diff --git a/core/src/main/java/org/apache/mrql/Main.java b/core/src/main/java/org/apache/mrql/Main.java
index 087f439..cafc62f 100644
--- a/core/src/main/java/org/apache/mrql/Main.java
+++ b/core/src/main/java/org/apache/mrql/Main.java
@@ -28,7 +28,7 @@
final public class Main {
- public final static String version = "0.9.6";
+ public final static String version = "0.9.8";
public static PrintStream print_stream;
public static Configuration conf;
diff --git a/core/src/main/java/org/apache/mrql/PlanGeneration.gen b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
index 1b724da..a42380c 100644
--- a/core/src/main/java/org/apache/mrql/PlanGeneration.gen
+++ b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
@@ -418,6 +418,18 @@
`(makePlan(x)),
`(makePlan(y)))>;
else fail
+ case outerMerge(`m,`x,`y):
+ if (is_dataset_expr(x) && is_dataset_expr(y))
+ return #<OuterMerge(`(makePlan(m)),
+ `(makePlan(x)),
+ `(makePlan(y)))>;
+ else fail
+ case rightOuterMerge(`m,`x,`y):
+ if (is_dataset_expr(x) && is_dataset_expr(y))
+ return #<RightOuterMerge(`(makePlan(m)),
+ `(makePlan(x)),
+ `(makePlan(y)))>;
+ else fail
case cmap(`m,`s):
if (is_dataset_expr(s))
return #<cMap(`(makePlan(m)),
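The two new physical operators realize the state/delta merge of an incremental query as outer joins: OuterMerge keeps state entries whose key has no delta match and delta entries whose key has no state match, merging the rest, while RightOuterMerge drops state entries whose key is absent from the delta. A self-contained sketch of the same semantics over hash maps, assuming integer values and addition as the merge function:

    import java.util.HashMap;
    import java.util.Map;

    class MergeSemantics {
        // full outer merge: unmatched entries on both sides survive
        static Map<String,Integer> outerMerge ( Map<String,Integer> state,
                                                Map<String,Integer> delta ) {
            Map<String,Integer> res = new HashMap<String,Integer>(state);
            for ( Map.Entry<String,Integer> e: delta.entrySet() )
                res.merge(e.getKey(),e.getValue(),Integer::sum);
            return res;
        }
        // right outer merge: only keys present in the delta survive
        static Map<String,Integer> rightOuterMerge ( Map<String,Integer> state,
                                                     Map<String,Integer> delta ) {
            Map<String,Integer> res = new HashMap<String,Integer>();
            for ( Map.Entry<String,Integer> e: delta.entrySet() ) {
                Integer old = state.get(e.getKey());
                res.put(e.getKey(),old == null ? e.getValue() : old+e.getValue());
            }
            return res;
        }
    }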
diff --git a/core/src/main/java/org/apache/mrql/Printer.gen b/core/src/main/java/org/apache/mrql/Printer.gen
index 18975c9..03fc28e 100644
--- a/core/src/main/java/org/apache/mrql/Printer.gen
+++ b/core/src/main/java/org/apache/mrql/Printer.gen
@@ -342,6 +342,12 @@
case CrossProduct(`mx,`my,`r,`x,`y):
return "CrossProduct:\n"+tab(n+3)+"left: "+print_plan(x,n+9,true)+"\n"
+tab(n+3)+"right: "+print_plan(y,n+10,true);
+ case OuterMerge(`merge,`x,`y):
+ return "OuterMerge:\n"+tab(n+3)+"state: "+print_plan(x,n+10,true)+"\n"
+ +tab(n+3)+"delta: "+print_plan(y,n+10,true);
+ case RightOuterMerge(`merge,`x,`y):
+ return "RightOuterMerge:\n"+tab(n+3)+"state: "+print_plan(x,n+10,true)+"\n"
+ +tab(n+3)+"delta: "+print_plan(y,n+10,true);
case CrossAggregateProduct(`mx,`my,`r,`a,null,`x,`y):
return "CrossProduct:\n"+tab(n+3)+"left: "+print_plan(x,n+9,true)+"\n"
+tab(n+3)+"right: "+print_plan(y,n+10,true);
diff --git a/core/src/main/java/org/apache/mrql/Streaming.gen b/core/src/main/java/org/apache/mrql/Streaming.gen
index 99ce205..f54cad8 100644
--- a/core/src/main/java/org/apache/mrql/Streaming.gen
+++ b/core/src/main/java/org/apache/mrql/Streaming.gen
@@ -932,17 +932,21 @@
}
/** Return the merge function (over X and Y) of the monoid m; type is used for key equality */
- private static Tree merge ( Tree m, Tree type, Tree X, Tree Y ) {
+ private static Tree merge ( Tree m, Tree type, Tree X, Tree Y, boolean first ) {
match m {
case `gb(`n):
if (! #[groupBy,orderBy].member(#<`gb>) )
fail;
match type {
case `T(tuple(`keytp,`tp)):
- if (unique_key(keytp)) {
+ if (first) {
+ Tree nv = new_var();
+ Tree mb = merge(n,tp,#<nth(`nv,0)>,#<nth(`nv,1)>,false);
+ return #<outerMerge(lambda(`nv,`mb),`X,`Y)>;
+ } else if (unique_key(keytp)) {
Tree vx = new_var();
Tree vy = new_var();
- Tree v = merge(n,tp,#<nth(call(elem,`vx),1)>,#<nth(call(elem,`vy),1)>);
+ Tree v = merge(n,tp,#<nth(call(elem,`vx),1)>,#<nth(call(elem,`vy),1)>,false);
if (is_persistent_collection(T)) {
X = #<Collect(`X)>;
Y = #<Collect(`Y)>;
@@ -961,7 +965,7 @@
Tree vy = new_var();
Tree mx = new_var();
Tree my = new_var();
- Tree mb = merge(n,tp,#<nth(`vx,1)>,#<nth(`vy,1)>);
+ Tree mb = merge(n,tp,#<nth(`vx,1)>,#<nth(`vy,1)>,false);
Tree b = #<cmap(lambda(`vx,cmap(lambda(`vy,bag(tuple(nth(`vx,0),`mb))),
`my)),
`mx)>;
@@ -980,7 +984,7 @@
Trees bs = #[ ];
int i = 0;
for ( Tree a: ms ) {
- bs = bs.append(merge(a,tps.nth(i),#<nth(`X,`i)>,#<nth(`Y,`i)>));
+ bs = bs.append(merge(a,tps.nth(i),#<nth(`X,`i)>,#<nth(`Y,`i)>,first));
i++;
};
return #<tuple(...bs)>;
@@ -992,7 +996,82 @@
match b {
case bind(`n,`a):
cs = cs.append(#<bind(`n,`(merge(a,((Node)tps.nth(i++)).children().nth(1),
- #<project(`X,`n)>,#<project(`Y,`n)>)))>);
+ #<project(`X,`n)>,#<project(`Y,`n)>,first)))>);
+ }
+ return #<record(...cs)>;
+ case union:
+ return #<call(plus,`X,`Y)>;
+ case fixed:
+ return Y;
+ case _:
+ if (!m.is_variable())
+ fail;
+ for ( Tree monoid: monoids )
+ match monoid {
+ case `aggr(`mtp,`plus,`zero,`unit):
+ if (#<`aggr>.equals(m))
+ return #<apply(`plus,tuple(`X,`Y))>;
+ };
+ };
+ throw new Error("Undefined monoid: "+m);
+ }
+
+ /** Like merge, but right-dominant: it produces a rightOuterMerge, so state entries (X) whose key is absent from the delta (Y) are dropped; type is used for key equality */
+ private static Tree merge_left ( Tree m, Tree type, Tree X, Tree Y, boolean first ) {
+ match m {
+ case `gb(`n):
+ if (! #[groupBy,orderBy].member(#<`gb>) )
+ fail;
+ match type {
+ case `T(tuple(`keytp,`tp)):
+ if (first) {
+ Tree nv = new_var();
+ Tree mb = merge(n,tp,#<nth(`nv,0)>,#<nth(`nv,1)>,false);
+ return #<rightOuterMerge(lambda(`nv,`mb),`X,`Y)>;
+ } else if (unique_key(keytp)) {
+ Tree vx = new_var();
+ Tree v = merge_left(n,tp,#<nth(call(elem,`vx),1)>,#<nth(call(elem,`Y),1)>,false);
+ if (is_persistent_collection(T))
+ X = #<Collect(`X)>;
+ return #<let(`vx,`X,
+ if(call(exists,`vx),
+ bag(tuple(`keytp,`v)),
+ call(elem,`Y)))>;
+ };
+ // needs an outer-join in the reducer function
+ Tree v = new_var();
+ Tree vx = new_var();
+ Tree vy = new_var();
+ Tree mx = new_var();
+ Tree mb = merge_left(n,tp,#<nth(`vx,1)>,#<nth(`vy,1)>,false);
+ Tree b = #<cmap(lambda(`vx,cmap(lambda(`vy,bag(tuple(nth(`vx,0),`mb))),
+ nth(`v,1))),
+ `mx)>;
+ return #<join(lambda(`vx,`(key_equality(keytp,#<nth(`vx,0)>))),
+ lambda(`vy,`(key_equality(keytp,#<nth(`vy,0)>))),
+ lambda(`v,let(`mx,nth(`v,0),
+ if(call(exists,`mx),`b,nth(`v,1)))),
+ `X, `Y)>;
+ };
+ throw new Error("Unknown type for merge_left: "+type);
+ case product(...ms):
+ Trees tps = ((Node)type).children();
+ Trees bs = #[ ];
+ int i = 0;
+ for ( Tree a: ms ) {
+ bs = bs.append(merge_left(a,tps.nth(i),#<nth(`X,`i)>,#<nth(`Y,`i)>,first));
+ i++;
+ };
+ return #<tuple(...bs)>;
+ case record(...bs):
+ Trees tps = ((Node)type).children();
+ Trees cs = #[ ];
+ int i = 0;
+ for ( Tree b: bs )
+ match b {
+ case bind(`n,`a):
+ cs = cs.append(#<bind(`n,`(merge_left(a,((Node)tps.nth(i++)).children().nth(1),
+ #<project(`X,`n)>,#<project(`Y,`n)>,first)))>);
}
return #<record(...cs)>;
case union:
@@ -1431,7 +1510,7 @@
if (Config.trace) {
System.out.println("Subterm type: "+tp);
System.out.println("Merge function over X and Y: ");
- System.out.println(SimplifyTerm(merge(m,tp,#<X>,#<Y>)).pretty(0));
+ System.out.println(SimplifyTerm(merge(m,tp,#<X>,#<Y>,true)).pretty(0));
};
type_env.insert(nv.toString(),tp);
Tree answer = answer(ne,nv,m);
@@ -1444,8 +1523,8 @@
};
Tree zero = zero(m);
Tree my = new_var();
- Tree merge = convert_to_algebra(SimplifyTerm(merge(m,tp,nv,my)));
- Tree res = #<tuple(`zero,lambda(tuple(`nv,`my),`merge),`q,lambda(`nv,`answer))>;
+ Tree merge = convert_to_algebra(SimplifyTerm(merge(m,tp,nv,my,true)));
+ Tree res = #<tuple(`zero,lambda(tuple(`nv,`my),`merge),`q,lambda(`nv,`answer),`m)>;
if (Config.trace) {
System.out.println("Incremental subterm:");
System.out.println(res.pretty(0));
@@ -1457,7 +1536,7 @@
private static Tree generate_incremental_code ( Tree e, Environment env ) {
if (!is_streaming(e))
- return #<tuple(tuple(),lambda(tuple(tuple(),tuple()),tuple()),tuple(),lambda(tuple(),`e))>;
+ return #<tuple(tuple(),lambda(tuple(tuple(),tuple()),tuple()),tuple(),lambda(tuple(),`e),fixed)>;
match e {
case call(stream,...):
Tree v = new_var();
@@ -1472,50 +1551,53 @@
};
if (is_monoid)
fail;
- Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[];
+ Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[], ns = #[];
int i = 0;
for ( Tree q: qs )
match generate_incremental_code(q,env) {
- case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a)):
+ case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a),`n):
zs = zs.append(z); hs = hs.append(h); as = as.append(a);
vs = vs.append(v); ws = ws.append(w); ms = ms.append(m);
- i++;
+ ns = ns.append(n); i++;
};
return #<tuple(tuple(...zs),
lambda(tuple(tuple(...vs),tuple(...ws)),tuple(...ms)),
tuple(...hs),
- lambda(tuple(...vs),call(`f,...as)))>;
+ lambda(tuple(...vs),call(`f,...as)),
+ tuple(...ns))>;
case record(...rs):
- Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[];
+ Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[], ns = #[];
int i = 0;
for ( Tree r: rs )
match r {
case bind(`k,`q):
match generate_incremental_code(q,env) {
- case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a)):
+ case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a),`n):
zs = zs.append(z); hs = hs.append(h); as = as.append(a);
vs = vs.append(v); ws = ws.append(w); ms = ms.append(m);
- i++;
+ ns = ns.append(n); i++;
}
};
return #<tuple(tuple(...zs),
lambda(tuple(tuple(...vs),tuple(...ws)),tuple(...ms)),
tuple(...hs),
- lambda(tuple(...vs),record(...as)))>;
+ lambda(tuple(...vs),record(...as)),
+ tuple(...ns))>;
case tuple(...qs):
- Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[];
+ Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[], ns = #[];
int i = 0;
for ( Tree q: qs )
match generate_incremental_code(q,env) {
- case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a)):
+ case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a),`n):
zs = zs.append(z); hs = hs.append(h); as = as.append(a);
vs = vs.append(v); ws = ws.append(w); ms = ms.append(m);
- i++;
+ ns = ns.append(n); i++;
};
return #<tuple(tuple(...zs),
lambda(tuple(tuple(...vs),tuple(...ws)),tuple(...ms)),
tuple(...hs),
- lambda(tuple(...vs),tuple(...as)))>;
+ lambda(tuple(...vs),tuple(...as)),
+ tuple(...ns))>;
};
return generate_code(e,env);
}
@@ -1554,7 +1636,50 @@
TypeInference.type_inference(u);
repeat_environment = new Environment(v.toString(),#<union>,repeat_environment);
match generate_incremental_code(u,null) {
- case tuple(`z,lambda(tuple(`v1,`v2),`m),`h,lambda(`s,`a)):
+ case tuple(`z,lambda(tuple(`v1,`v2),`m),`h,lambda(`s,`a),`monoid):
+ Tree deltaT = new_var();
+ Tree w = new_var();
+ Tree tp = TypeInference.type_inference(s);
+ TypeInference.type_env.insert(deltaT.toString(),tp);
+ match tp {
+ case `T(`etp):
+ TypeInference.type_env.insert(T.toString(),etp);
+ };
+ Tree m_hat = convert_to_algebra(SimplifyTerm(merge_left(monoid,tp,v1,v2,true)));
+ if (Config.trace) {
+ System.out.println("Left-merge function over X and Y: ");
+ System.out.println(SimplifyTerm(merge_left(monoid,tp,#<X>,#<Y>,true)).pretty(0));
+ };
+ Tree nm = subst(v1,s,subst(v2,subst(v,subst(s,deltaT,a),h),m_hat));
+ Tree m0 = subst(v,x,h); // subst(v1,s,subst(v2,subst(v,x,h),m_hat));
+ Tree q = #<repeat(lambda(`deltaT,cmap(lambda(`w,bag(tuple(`w,true))),`nm)),
+ `m0,
+ `(nn-1))>;
+ Tree res = #<incr(`z,
+ lambda(`s,apply(lambda(tuple(`v1,`v2),`m),tuple(`s,`q))),
+ lambda(`s,`a))>;
+ res = SimplifyTerm(res);
+ if (Config.trace) {
+ System.out.println("Incremental program:");
+ System.out.println(res.pretty(0));
+ };
+ TypeInference.type_inference(res);
+ return res;
+ }
+ case repeat(lambda(`v,`u),`x,`n):
+ if (false && !is_streaming(x))
+ fail;
+ if (!(n instanceof LongLeaf))
+ throw new Error("The repeat must have a constant repetition: "+n);
+ int nn = (int)((LongLeaf)n).value();
+ if (nn < 1)
+ throw new Error("Wrong repeat number: "+n);
+ Tree nv = new_var();
+ u = #<cmap(lambda(`nv,bag(nth(`nv,0))),`u)>;
+ TypeInference.type_inference(u);
+ repeat_environment = new Environment(v.toString(),#<union>,repeat_environment);
+ match generate_incremental_code(u,null) {
+ case tuple(`z,lambda(tuple(`v1,`v2),`m),`h,lambda(`s,`a),_):
Tree deltaT = new_var();
Tree w = new_var();
TypeInference.type_env.insert(deltaT.toString(),TypeInference.type_inference(s));
@@ -1576,8 +1701,8 @@
return res;
}
};
- match generate_incremental_code(e,null) {
- case tuple(`z,lambda(tuple(`v1,`v2),`m),`h,lambda(_,`a)):
+ match generate_incremental_code(e,null) {
+ case tuple(`z,lambda(tuple(`v1,`v2),`m),`h,lambda(_,`a),_):
Tree tp = TypeInference.type_inference(h);
bind_pattern2type(v1,tp);
Tree nm = SimplifyTerm(#<apply(lambda(`v2,`m),`h)>);
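With the new boolean flag, the top-level call merge(m,tp,X,Y,true) no longer expands the state/delta merge into the unique-key or join encodings: for a groupBy monoid it emits a single outerMerge whose body combines the two values bound to each key. For a grouping whose value monoid is +, for example, the generated term is (schematically):

    outerMerge(lambda(v,apply(+,tuple(nth(v,0),nth(v,1)))),X,Y)

merge_left generates the same shape with rightOuterMerge; the repeat case uses it so that each iteration propagates only the state entries actually touched by the delta.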
diff --git a/core/src/main/java/org/apache/mrql/TopLevel.gen b/core/src/main/java/org/apache/mrql/TopLevel.gen
index 13f4d68..9c5ecfa 100644
--- a/core/src/main/java/org/apache/mrql/TopLevel.gen
+++ b/core/src/main/java/org/apache/mrql/TopLevel.gen
@@ -321,7 +321,7 @@
Evaluator.evaluator.initialize_query();
final Tree qt = query_type;
final String file = s.stringValue();
- Evaluator.evaluator.streaming(plan,null,new Function(){
+ Evaluator.evaluator.streaming(plan,null,null,new Function(){
long i = 0;
public MRData eval ( final MRData data ) {
try {
@@ -342,7 +342,7 @@
Evaluator.evaluator.initialize_query();
final Tree qt = query_type;
final String file = s.stringValue();
- Evaluator.evaluator.streaming(plan,null,new Function(){
+ Evaluator.evaluator.streaming(plan,null,null,new Function(){
long i = 0;
public MRData eval ( final MRData data ) {
try {
@@ -365,27 +365,10 @@
case incr(`zero,lambda(`state,`merge),lambda(_,`answer)):
final Tree qt = make_persistent_type(query_type);
MRData z = evalE(zero,null);
- final Environment env = bind_list(state,z,null);
- final boolean incr_test = false;
- if (incr_test) {
- // for code testing only
- long t = System.currentTimeMillis();
- Environment nenv = bind_list(state,Streaming.unstreamify(merge),env);
- for ( Environment ev = nenv; ev != null; ev = ev.next )
- if (ev.value instanceof MR_dataset) {
- DataSet ds = ((MR_dataset)ev.value).dataset();
- List<MRData> vals = ds.take(Config.max_bag_size_print);
- System.out.println(ev.name+" = "+vals.size()+ " "+vals);
- };
- MRData a = Evaluator.evaluator.evalE(answer,nenv);
- System.out.println(print(a,qt));
- if (!Config.quiet_execution)
- System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs");
- return;
- };
+ final Environment env = bind_list(state,z,null);
final Tree ans = answer;
merge = Streaming.streamify(merge);
- Evaluator.evaluator.streaming(#<lambda(`state,`merge)>,env,
+ Evaluator.evaluator.streaming(#<lambda(`state,`merge)>,env,null,
new Function() {
public MRData eval ( final MRData state ) {
MRData a = Evaluator.evaluator.evalE(ans,env);
diff --git a/core/src/main/java/org/apache/mrql/Translator.gen b/core/src/main/java/org/apache/mrql/Translator.gen
index 3be0d9f..2d9d2fe 100644
--- a/core/src/main/java/org/apache/mrql/Translator.gen
+++ b/core/src/main/java/org/apache/mrql/Translator.gen
@@ -178,7 +178,8 @@
static Trees plans_with_distributed_lambdas
= #[MapReduce,MapAggregateReduce,MapCombineReduce,FroupByJoin,Aggregate,
MapReduce2,MapCombineReduce2,MapAggregateReduce2,MapJoin,MapAggregateJoin,
- CrossProduct,CrossAggregateProduct,cMap,AggregateMap,BSP,GroupByJoin];
+ CrossProduct,CrossAggregateProduct,cMap,AggregateMap,BSP,GroupByJoin,
+ OuterMerge,RightOuterMerge];
static Trees algebraic_operators
= #[mapReduce,mapReduce2,cmap,join,groupBy,orderBy,aggregate,map,filter,repeat,closure];
diff --git a/core/src/main/java/org/apache/mrql/TypeInference.gen b/core/src/main/java/org/apache/mrql/TypeInference.gen
index 7c80240..f6769f4 100644
--- a/core/src/main/java/org/apache/mrql/TypeInference.gen
+++ b/core/src/main/java/org/apache/mrql/TypeInference.gen
@@ -32,12 +32,11 @@
public static Tree make_persistent_type ( Tree tp ) {
match tp {
- case `f(...al):
- Trees bs = #[];
- for ( Tree a: al )
- bs = bs.append(make_persistent_type(a));
- String g = persistent_collection(f);
- return #<`g(...bs)>;
+ case `T(`t):
+ if (!is_collection(T))
+ fail;
+ String S = persistent_collection(T);
+ return #<`S(`t)>;
};
return tp;
}
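make_persistent_type now promotes only the outermost collection constructor instead of rewriting every constructor in the type:

    // before: bag(tuple(int,bag(string))) -> Bag(tuple(int,Bag(string)))
    // after:  bag(tuple(int,bag(string))) -> Bag(tuple(int,bag(string)))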
@@ -1200,6 +1199,32 @@
};
case cstep(`x): // used in QueryPlan for closure
return type_inference(x);
+ case outerMerge(lambda(`v,`b),`s,`d):
+ Tree ts = type_inference(s);
+ if (unify(type_inference(d),ts) == null)
+ type_error(e,"Wrong outerMerge");
+ match ts {
+ case `S(tuple(`k,`tp)):
+ if (!is_collection(S))
+ fail;
+ type_env.insert(v.toString(),#<tuple(`tp,`tp)>);
+ type_inference(b);
+ return ts;
+ };
+ type_error(e,"Wrong OuterMerge");
+ case rightOuterMerge(lambda(`v,`b),`s,`d):
+ Tree ts = type_inference(s);
+ if (unify(type_inference(d),ts) == null)
+ type_error(e,"Wrong RightOuterMerge");
+ match ts {
+ case `S(tuple(`k,`tp)):
+ if (!is_collection(S))
+ fail;
+ type_env.insert(v.toString(),#<tuple(`tp,`tp)>);
+ type_inference(b);
+ return ts;
+ };
+ type_error(e,"Wrong rightOuterMerge");
case if(`p,`x,`y):
if (unify(type_inference2(p),#<bool>) == null)
type_error(e,"Expected a boolean predicate in if-then-else: "+print_query(p));
diff --git a/pom.xml b/pom.xml
index bd9079b..cb1ac62 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,7 +47,7 @@
<hadoop.version>1.2.1</hadoop.version>
<yarn.version>2.7.1</yarn.version>
<hama.version>0.7.0</hama.version>
- <spark.version>1.6.0</spark.version>
+ <spark.version>1.6.2</spark.version>
<scala.version>2.10</scala.version>
<flink.version>1.0.2</flink.version>
<skipTests>true</skipTests>
diff --git a/queries/factorization3.mrql b/queries/factorization3.mrql
index de61b5d..3aca55b 100644
--- a/queries/factorization3.mrql
+++ b/queries/factorization3.mrql
@@ -44,7 +44,7 @@
// cell-wise addition:
macro Cadd ( X, Y ) {
- select ( x*y, i, j )
+ select ( x+y, i, j )
from (x,i,j) in X, (y,i,j) in Y
};
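The Cadd macro computed the cell-wise product instead of the sum, so the factorization query was wrong wherever it added matrices: for cells (1.0,i,j) and (2.0,i,j) it returned (2.0,i,j) where (3.0,i,j) was intended.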
diff --git a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
index 60a9bb3..616da62 100644
--- a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
+++ b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
@@ -19,10 +19,7 @@
import org.apache.mrql.gen.*;
import java_cup.runtime.Scanner;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Hashtable;
-import java.util.Iterator;
+import java.util.*;
import java.io.*;
import java.util.Enumeration;
import org.apache.log4j.*;
@@ -59,11 +56,14 @@
final static Bag empty_bag = new Bag();
// an HDFS tmp file used to hold the data source directory information in distributed mode
final static String data_source_dir_name = "tmp/"+System.getenv("USER")+"_data_source_dir.txt";
- private static boolean first_time = true; // true at the beginning of a query execution
+ // holds the cached RDDs that need to be unpersisted
+ private static ArrayList<JavaRDD<MRData>> cached_rdds = new ArrayList<JavaRDD<MRData>>();
+ static Environment global_rdds = null;
/** initialize the Spark evaluator */
final public void init ( Configuration conf ) {
Config.spark_mode = true;
+ global_rdds = null;
SparkConf spark_conf = new SparkConf();
spark_conf.setAppName("MRQL");
if (Config.hadoop_mode && Config.local_mode) {
@@ -100,7 +100,6 @@
}
final public void initialize_query () {
- first_time = true;
Plan.distribute_compiled_arguments(Plan.conf);
if (spark_context != null && Config.compile_functional_arguments)
spark_context.addJar(Plan.conf.get("mrql.jar.path"));
@@ -125,9 +124,15 @@
return SparkGeneratorInputFormat.class;
}
+ final public static void GC_cached_rdds () {
+ for ( JavaRDD<MRData> rdd: cached_rdds )
+ rdd.unpersist();
+ cached_rdds = new ArrayList<JavaRDD<MRData>>();
+ }
+
/** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
- final public void streaming ( Tree plan, Environment env, org.apache.mrql.Function f ) {
- SparkStreaming.evaluate(plan,env,f);
+ final public void streaming ( Tree plan, Environment env, Environment dataset_env, org.apache.mrql.Function f ) {
+ SparkStreaming.evaluate(plan,env,dataset_env,f);
}
/** used by the master to send parsing details (eg, record types) to workers */
@@ -179,6 +184,7 @@
return new Tuple2<MRContainer,MRContainer>(zero,new MRContainer(value));
}
}).saveAsHadoopFile(file,MRContainer.class,MRContainer.class,SequenceFileOutputFormat.class);
+ rdd.unpersist();
} else super.dump(file,type,data);
}
@@ -206,23 +212,14 @@
rdd.saveAsTextFile(file);
};
Config.max_bag_size_print = ps;
+ rdd.unpersist();
} else super.dump_text(file,type,data);
}
- private static void set_global_env ( Environment env ) {
- if (true || first_time) { // Bug: it doesn't need to be executed more than once per worker
- // pass the global bindings to workers
- set_global_bindings(env);
- first_time = false;
- }
- }
-
private static Function2<MRData,MRData,MRData> accumulator ( final Tree acc_fnc, final Environment env ) {
- final Environment master_env = global_env;
final org.apache.mrql.Function f = evalF(acc_fnc,env);
return new Function2<MRData,MRData,MRData>() {
public MRData call ( MRData x, MRData y ) {
- set_global_env(master_env);
return f.eval(new Tuple(x,y));
}
};
@@ -243,27 +240,27 @@
MRData z = evalE(zero,env);
match plan {
case AggregateMap(`m,`acc,_,`s):
- return evalD(#<cMap(`m,`s)>,env)
+ return evalD(#<cMap(`m,`s)>,env,null)
.aggregate(z,accumulator(acc,env),f2);
case MapAggregateReduce(`m,`r,`acc,_,`s,`o):
if (acc.equals(#<null>))
fail;
- return evalD(#<MapReduce(`m,`r,`s,`o)>,env)
+ return evalD(#<MapReduce(`m,`r,`s,`o)>,env,null)
.aggregate(z,accumulator(acc,env),f2);
case CrossAggregateProduct(`mx,`my,`r,`acc,_,`x,`y):
if (acc.equals(#<null>))
fail;
- return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)
+ return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env,null)
.aggregate(z,accumulator(acc,env),f2);
case MapAggregateReduce2(`mx,`my,`r,`acc,_,`x,`y,`o):
if (acc.equals(#<null>))
fail;
- return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env)
+ return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env,null)
.aggregate(z,accumulator(acc,env),f2);
case MapAggregateJoin(`mx,`my,`r,`acc,_,`x,`y):
if (acc.equals(#<null>))
fail;
- return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env)
+ return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env,null)
.aggregate(z,accumulator(acc,env),f2);
};
throw new Error("Unrecognized aggregation: "+plan);
@@ -276,13 +273,15 @@
int limit = ((MR_int)evalE(num,env)).get();
MR_rdd[] s = new MR_rdd[vs.length()];
for ( int i = 0; i < vs.length(); i++ )
- s[i] = new MR_rdd(eval(ss.nth(i),env).cache());
+ s[i] = new MR_rdd(eval(ss.nth(i),env,(Environment)null).cache());
for ( int n = 0; n < limit; n++ ) {
- Environment nenv = env;
+ Environment nenv = null;
for ( int i = 0; i < vs.length(); i ++ )
nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
- for ( int i = 0; i < vs.length(); i ++ )
- s[i] = new MR_rdd(eval(bs.nth(i),nenv).cache());
+ for ( int i = 0; i < vs.length(); i ++ ) {
+ s[i].rdd().unpersist();
+ s[i] = new MR_rdd(eval(bs.nth(i),env,nenv).cache());
+ }
};
Tuple t = new Tuple(vs.length());
for ( int i = 0; i < vs.length(); i++ )
@@ -304,57 +303,18 @@
});
}
- // Return the RDD elements at the given position
- private static Iterator<MRData> partition ( final JavaRDD<MRData> rdd, final int position ) {
- /* Doesn't work (needs the right TaskContext)
- TaskContext context = new TaskContext(0,0,(long)0,Config.local_mode,null);
- List<Partition> ps = rdd.splits();
- return rdd.iterator(ps.get(position),context);
- */
- return rdd.mapPartitionsWithIndex(new Function2<Integer,Iterator<MRData>,Iterator<MRData>>() {
- public Iterator<MRData> call ( Integer partition, Iterator<MRData> values ) {
- if (partition == position)
- return values;
- else return new ArrayList<MRData>().iterator();
- }
- },true).collect().iterator();
- }
-
- final static int MAX_CACHE_SIZE = 1000;
-
/** Convert a Spark RDD into a lazy bag
* @param rdd the Spark RDD
* @return a lazy bag that contains all RDD elements
*/
public static Bag bag ( final JavaRDD<MRData> rdd ) throws IOException {
- final JavaRDD<MRData> rd = rdd.cache();
- if (rd.count() <= MAX_CACHE_SIZE) { // small RDD
- final Iterator<MRData> i = rd.collect().iterator();
- return new Bag(new BagIterator() {
- public MRData next () {
- return i.next();
- }
- public boolean hasNext () {
- return i.hasNext();
- }
- });
- };
- // return the RDD elements lazily, one partition at a time
- final int splits = rd.splits().size();
+ final Iterator<MRData> i = rdd.toLocalIterator();
return new Bag(new BagIterator() {
- Iterator<MRData> i = null;
- int c = 0;
public MRData next () {
return i.next();
}
public boolean hasNext () {
- do {
- if (i != null && i.hasNext())
- return true;
- if (c >= splits)
- return false;
- i = partition(rd,c++);
- } while (true);
+ return i.hasNext();
}
});
}
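toLocalIterator subsumes the removed partition-at-a-time plumbing: Spark runs one job per partition and ships each partition to the driver only when the iterator reaches it, so the lazy Bag holds at most one partition in driver memory and the MAX_CACHE_SIZE special case becomes unnecessary. The trade-off is that fully consuming the bag launches one Spark job per partition.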
@@ -370,15 +330,6 @@
}
}
- private static JavaRDD<MRData> materialize ( JavaRDD<MRData> rdd ) {
- return rdd.map(new Function<MRData,MRData>() {
- public MRData call ( MRData value ) {
- value.materializeAll();
- return value;
- };
- }).cache();
- }
-
private final static Function<MRData,MRData> get_first
= new Function<MRData,MRData>() {
public MRData call ( MRData value ) {
@@ -395,9 +346,10 @@
final public DataSet eval ( final Tree e,
final Environment env,
final String counter ) {
- JavaRDD<MRData> rd = eval(e,env);
+ JavaRDD<MRData> rd = eval(e,env,(Environment)null);
long count = 0;
rd = rd.cache();
+ cached_rdds.add(rd);
if (!counter.equals("-")) {
final Accumulator<Integer> c = spark_context.intAccumulator(0);
rd.foreach(new VoidFunction<MRData>() {
@@ -415,14 +367,15 @@
/** Evaluate an MRQL physical plan using Spark and print tracing info
* @param e the physical plan
* @param env contains bindings from variables to values (MRData)
+ * @param rdd_env contains bindings from variables to RDDs
* @return a Spark RDD
*/
- final public JavaRDD<MRData> eval ( final Tree e, final Environment env ) {
+ final public JavaRDD<MRData> eval ( final Tree e, final Environment env, final Environment rdd_env ) {
if (Config.trace_execution) {
tab_count += 3;
System.out.println(tabs(tab_count)+print_query(e));
};
- final JavaRDD<MRData> res = evalD(e,env);
+ final JavaRDD<MRData> res = evalD(e,env,rdd_env);
if (Config.trace_execution)
try {
System.out.println(tabs(tab_count)+"-> "+res.take(Config.max_bag_size_print));
@@ -435,11 +388,11 @@
/* convert an MRQL lambda to a Spark Function */
private static FlatMapFunction<MRData,MRData> cmap_fnc ( Tree fnc, Environment env ) {
- final Environment master_env = global_env;
+ final Broadcast<Environment> broadcast_env = spark_context.broadcast(global_env);
final org.apache.mrql.Function f = evalF(fnc,env);
return new FlatMapFunction<MRData,MRData>() {
public Iterable<MRData> call ( MRData value ) {
- set_global_env(master_env);
+ global_env = broadcast_env.value();
return (Bag)f.eval(value);
}
};
@@ -449,17 +402,18 @@
private static JavaRDD<MRData> groupBy ( JavaRDD<MRData> s, Tree fnc, Environment env, Tree o ) {
match o {
case true: // the result must be sorted
- return s.groupBy(get_first)
- .sortByKey()
- .map(new Function<Tuple2<MRData,Iterable<MRData>>,MRData>() {
- public MRData call ( Tuple2<MRData,Iterable<MRData>> value ) {
- return new Tuple(value._1,bag(value._2));
+ return s.mapToPair(new PairFunction<MRData,MRData,MRData>() {
+ public Tuple2<MRData,MRData> call ( MRData value ) {
+ Tuple t = (Tuple)value;
+ return new Tuple2(t.first(),t.second());
+ }
+ })
+ .sortByKey(true)
+ .map(new Function<Tuple2<MRData,MRData>,MRData>() {
+ public MRData call ( Tuple2<MRData,MRData> value ) {
+ return new Tuple(value._1,new Bag(value._2));
}})
- .flatMap(cmap_fnc(fnc,env)).map(new Function<MRData,MRData>() {
- public MRData call ( MRData value ) {
- return ((Tuple)value).second();
- }
- });
+ .flatMap(cmap_fnc(fnc,env));
};
return s.groupBy(get_first)
.map(new Function<Tuple2<MRData,Iterable<MRData>>,MRData> () {
@@ -479,10 +433,10 @@
}
static JavaRDD<MRData> containerData ( JavaPairRDD<MRContainer,MRContainer> rd ) {
- final Environment master_env = global_env;
+ final Broadcast<Environment> broadcast_env = spark_context.broadcast(global_env);
return rd.map(new Function<Tuple2<MRContainer,MRContainer>,MRData>() {
public MRData call ( Tuple2<MRContainer,MRContainer> value ) {
- set_global_env(master_env);
+ global_env = broadcast_env.value();
return value._2.data();
}
});
@@ -506,10 +460,8 @@
}
private static FlatMapFunction<Iterator<MRData>,MRData> combiner_fnc ( final org.apache.mrql.Function f ) {
- final Environment master_env = global_env;
return new FlatMapFunction<Iterator<MRData>,MRData>() {
public Iterable<MRData> call ( final Iterator<MRData> i ) {
- set_global_env(master_env);
return MapReduceAlgebra.cmap(new org.apache.mrql.Function() {
public MRData eval ( MRData x ) {
final MRData key = ((Tuple)x).first();
@@ -535,77 +487,93 @@
};
}
- private static Hashtable<MRData,Bag> make_built_table ( List<Tuple2<MRData,MRData>> values ) {
- Hashtable<MRData,Bag> built_table = new Hashtable<MRData,Bag>(Config.map_cache_size);
- for ( Tuple2<MRData,MRData> t: values ) {
- Bag entries = built_table.get(t._1);
- built_table.put(t._1,
- (entries == null)
- ? (new Bag(t._2))
- : entries.add_element(t._2));
- };
- return built_table;
- }
-
/** Evaluate MRQL physical operators using Spark
* @param e the physical plan
- * @param env contains bindings fro variables to values (MRData)
+ * @param env contains bindings from variables to values (MRData)
+ * @param rdd_env contains bindings from variables to RDDs
* @return a Spark RDD
*/
- final public JavaRDD<MRData> evalD ( final Tree e, final Environment env ) {
- final Environment master_env = global_env;
+ final public JavaRDD<MRData> evalD ( final Tree e, final Environment env, final Environment rdd_env ) {
try {
match e {
case MapAggregateReduce(`m,`r,null,_,`s,`o):
- return evalD(#<MapReduce(`m,`r,`s,`o)>,env);
+ return evalD(#<MapReduce(`m,`r,`s,`o)>,env,rdd_env);
case CrossAggregateProduct(`mx,`my,`r,null,_,`x,`y):
- return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env);
+ return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env,rdd_env);
case MapAggregateReduce2(`mx,`my,`r,null,_,`x,`y,`o):
- return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env);
+ return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env,rdd_env);
case MapAggregateJoin(`mx,`my,`r,null,_,`x,`y):
- return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env);
+ return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env,rdd_env);
case cMap(`f,`s):
- return eval(s,env).flatMap(cmap_fnc(f,env));
+ return eval(s,env,rdd_env).flatMap(cmap_fnc(f,env));
case MapReduce(`m,`r,`s,`o):
- return groupBy(eval(s,env).flatMap(cmap_fnc(m,env)),r,env,o);
+ return groupBy(eval(s,env,rdd_env).flatMap(cmap_fnc(m,env)),r,env,o);
case MapCombineReduce(`m,`c,`r,`s,`o):
- return groupBy(eval(s,env).flatMap(cmap_fnc(m,env))
+ return groupBy(eval(s,env,rdd_env).flatMap(cmap_fnc(m,env))
.mapPartitions(combiner_fnc(evalF(c,env))),r,env,o);
case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o):
- return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env).mapPartitions(combiner_fnc(evalF(c,env)));
+ return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env,rdd_env).mapPartitions(combiner_fnc(evalF(c,env)));
case CrossProduct(`mx,`my,`r,`x,`y):
final org.apache.mrql.Function fr = evalF(r,env);
- return eval(x,env)
+ return eval(x,env,rdd_env)
.flatMap(cmap_fnc(mx,env))
- .cartesian(eval(y,env).flatMap(cmap_fnc(my,env)))
+ .cartesian(eval(y,env,rdd_env).flatMap(cmap_fnc(my,env)))
.flatMap(new FlatMapFunction<Tuple2<MRData,MRData>,MRData>() {
public Iterable<MRData> call ( Tuple2<MRData,MRData> value ) {
- set_global_env(master_env);
return (Bag)fr.eval(new Tuple(value._1,value._2));
}
});
case MapReduce2(`mx,`my,`r,`x,`y,`o):
- final org.apache.mrql.Function fx = evalF(mx,env);
- final org.apache.mrql.Function fy = evalF(my,env);
final org.apache.mrql.Function fr = evalF(r,env);
+ boolean sfx = false;
+ match mx {
+ case compiled(`fmx,lambda(`vx,bag(`mex))):
+ sfx = true;
+ mx = #<lambda(`vx,`mex)>;
+ case lambda(`vx,bag(`mex)):
+ sfx = true;
+ mx = #<lambda(`vx,`mex)>;
+ };
+ final org.apache.mrql.Function fx = evalF(mx,env);
JavaPairRDD<MRData,MRData> xs
- = eval(x,env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
+ = (sfx)
+ ? eval(x,env,rdd_env).mapToPair(new PairFunction<MRData,MRData,MRData>() {
+ public Tuple2<MRData,MRData> call ( MRData value ) {
+ Tuple t = (Tuple)fx.eval(value);
+ return new Tuple2(t.first(),t.second());
+ }
+ })
+ : eval(x,env,rdd_env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
- set_global_env(master_env);
return joinIterator(((Bag)fx.eval(value)).iterator());
}
});
+ boolean sfy = false;
+ match my {
+ case compiled(`fmy,lambda(`vy,bag(`mey))):
+ sfy = true;
+ my = #<lambda(`vy,`mey)>;
+ case lambda(`vy,bag(`mey)):
+ sfy = true;
+ my = #<lambda(`vy,`mey)>;
+ };
+ final org.apache.mrql.Function fy = evalF(my,env);
JavaPairRDD<MRData,MRData> ys
- = eval(y,env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
+ = (sfy)
+ ? eval(y,env,rdd_env).mapToPair(new PairFunction<MRData,MRData,MRData>() {
+ public Tuple2<MRData,MRData> call ( MRData value ) {
+ Tuple t = (Tuple)fy.eval(value);
+ return new Tuple2(t.first(),t.second());
+ }
+ })
+ : eval(y,env,rdd_env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
- set_global_env(master_env);
return joinIterator(((Bag)fy.eval(value)).iterator());
}
});
return xs.cogroup(ys)
.flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
public Iterable<MRData> call ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
- set_global_env(master_env);
return (Bag)fr.eval(new Tuple(bag(value._2._1),bag(value._2._2)));
}
});
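When a map function has the shape lambda(v,bag(e)) — possibly under a compiled(...) wrapper — it emits exactly one pair per input element, so the join inputs can be built with mapToPair instead of flatMapToPair, skipping a singleton Bag allocation per element. The difference in plain Spark terms, assuming words is a JavaRDD<String> (a fragment sketch, not the MRQL code):

    // one pair per element: no intermediate collection
    JavaPairRDD<String,Integer> a =
        words.mapToPair(new PairFunction<String,String,Integer>() {
            public Tuple2<String,Integer> call ( String w ) {
                return new Tuple2<String,Integer>(w,w.length());
            }
        });
    // zero-or-more pairs per element: each call builds an Iterable
    JavaPairRDD<String,Integer> b =
        words.flatMapToPair(new PairFlatMapFunction<String,String,Integer>() {
            public Iterable<Tuple2<String,Integer>> call ( String w ) {
                return Arrays.asList(new Tuple2<String,Integer>(w,w.length()));
            }
        });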
@@ -623,9 +591,8 @@
final MRData one = new MR_byte(1);
final MRData two = new MR_byte(2);
final JavaPairRDD<MRData,MRData> xs
- = eval(x,env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
+ = eval(x,env,rdd_env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
- set_global_env(master_env);
return new Iterable<Tuple2<MRData,MRData>>() {
public Iterator<Tuple2<MRData,MRData>> iterator() {
return new Iterator<Tuple2<MRData,MRData>>() {
@@ -645,9 +612,8 @@
}
});
final JavaPairRDD<MRData,MRData> ys
- = eval(y,env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
+ = eval(y,env,rdd_env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
- set_global_env(master_env);
return new Iterable<Tuple2<MRData,MRData>>() {
public Iterator<Tuple2<MRData,MRData>> iterator() {
return new Iterator<Tuple2<MRData,MRData>>() {
@@ -666,7 +632,7 @@
};
}
});
- return xs.union(ys).groupByKey(Config.nodes)
+ return xs.union(ys).groupByKey()
.mapPartitions(new FlatMapFunction<Iterator<Tuple2<MRData,Iterable<MRData>>>,MRData>() {
public Iterable<MRData> call ( final Iterator<Tuple2<MRData,Iterable<MRData>>> value ) {
Bag xb = new Bag();
@@ -688,22 +654,116 @@
}
};
}
+ });
+ case OuterMerge(`merge,`state,`data):
+ final org.apache.mrql.Function fm = evalF(merge,env);
+ JavaPairRDD<MRData,MRData> S = eval(state,env,rdd_env)
+ .mapPartitionsToPair(new PairFlatMapFunction<Iterator<MRData>,MRData,MRData>() {
+ public Iterable<Tuple2<MRData,MRData>> call ( final Iterator<MRData> i ) {
+ return joinIterator(i);
+ }
+ },true); // do not repartition the state S
+ JavaPairRDD<MRData,MRData> ds = eval(data,env,rdd_env)
+ .mapToPair(new PairFunction<MRData,MRData,MRData>() {
+ public Tuple2<MRData,MRData> call ( MRData value ) {
+ Tuple t = (Tuple)value;
+ return new Tuple2(t.first(),t.second());
+ }
});
+ JavaRDD<MRData> res = S.cogroup(ds)
+ .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
+ public Iterable<MRData> call ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
+ final Iterator<MRData> ix = value._2._1.iterator();
+ final Iterator<MRData> iy = value._2._2.iterator();
+ ArrayList<MRData> a = new ArrayList<MRData>();
+ if (ix.hasNext())
+ if (iy.hasNext())
+ a.add(new Tuple(value._1,fm.eval(new Tuple(ix.next(),iy.next()))));
+ else a.add(new Tuple(value._1,ix.next()));
+ else if (iy.hasNext())
+ a.add(new Tuple(value._1,iy.next()));
+ return a;
+ }
+ }).cache();
+ cached_rdds.add(res);
+ return res;
+ case RightOuterMerge(`merge,`state,`data):
+ final org.apache.mrql.Function fm = evalF(merge,env);
+ JavaPairRDD<MRData,MRData> S = eval(state,env,rdd_env)
+ .mapPartitionsToPair(new PairFlatMapFunction<Iterator<MRData>,MRData,MRData>() {
+ public Iterable<Tuple2<MRData,MRData>> call ( final Iterator<MRData> i ) {
+ return joinIterator(i);
+ }
+ },true); // do not repartition the state S
+ JavaPairRDD<MRData,MRData> ds = eval(data,env,rdd_env)
+ .mapToPair(new PairFunction<MRData,MRData,MRData>() {
+ public Tuple2<MRData,MRData> call ( MRData value ) {
+ Tuple t = (Tuple)value;
+ return new Tuple2(t.first(),t.second());
+ }
+ });
+ JavaRDD<MRData> res = S.cogroup(ds)
+ .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
+ public Iterable<MRData> call ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
+ final Iterator<MRData> ix = value._2._1.iterator();
+ final Iterator<MRData> iy = value._2._2.iterator();
+ return (ix.hasNext())
+ ? ((iy.hasNext())
+ ? new Bag(new Tuple(value._1,fm.eval(new Tuple(ix.next(),iy.next()))))
+ : new Bag())
+ : ((iy.hasNext())
+ ? new Bag(new Tuple(value._1,iy.next()))
+ : new Bag());
+ }
+ }).cache();
+ cached_rdds.add(res);
+ return res;
+ case MapJoin(lambda(`vx,bag(`mx)),lambda(`vy,bag(`my)),`r,`x,`y):
+ final org.apache.mrql.Function fx = evalF(#<lambda(`vx,`mx)>,env);
+ final org.apache.mrql.Function fy = evalF(#<lambda(`vy,`my)>,env);
+ final org.apache.mrql.Function fr = evalF(r,env);
+ final Broadcast<Map<MRData,MRData>> ys
+ = spark_context.broadcast(eval(y,env,rdd_env)
+ .mapToPair(new PairFunction<MRData,MRData,MRData>() {
+ public Tuple2<MRData,MRData> call ( MRData value ) {
+ Tuple t = (Tuple)fy.eval(value);
+ return new Tuple2(t.first(),t.second());
+ }
+ }).collectAsMap());
+ return eval(x,env,rdd_env).mapPartitions(new FlatMapFunction<Iterator<MRData>,MRData>() {
+ public Iterable<MRData> call ( final Iterator<MRData> i ) {
+ final Map<MRData,MRData> m = ys.value();
+ return new Iterable<MRData>() {
+ public Iterator<MRData> iterator() {
+ return new Iterator<MRData> () {
+ public MRData next () {
+ final Tuple p = (Tuple)fx.eval(i.next());
+ final MRData pd = m.get(p.first());
+ return ((Bag)fr.eval(new Tuple(p.second(),(pd == null) ? empty_bag : pd)));
+ }
+ public boolean hasNext () {
+ return i.hasNext();
+ }
+ public void remove () {}
+ };
+ }
+ };
+ }
+ },true);
case MapJoin(`mx,`my,`r,`x,`y):
final org.apache.mrql.Function fx = evalF(mx,env);
final org.apache.mrql.Function fy = evalF(my,env);
final org.apache.mrql.Function fr = evalF(r,env);
- final Broadcast<List<Tuple2<MRData,MRData>>> ys
- = spark_context.broadcast(eval(y,env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
- public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
- set_global_env(master_env);
- return joinIterator(((Bag)fy.eval(value)).iterator());
- }
- }).collect());
- return eval(x,env).flatMap(new FlatMapFunction<MRData,MRData>() {
- final Hashtable<MRData,Bag> built_table = make_built_table(ys.value());
+ final Broadcast<Map<MRData,MRData>> ys
+ = spark_context.broadcast(eval(y,env,rdd_env)
+ .flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
+ public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+ return joinIterator(((Bag)fy.eval(value)).iterator());
+ }
+ }).collectAsMap());
+ return eval(x,env,rdd_env).flatMap(new FlatMapFunction<MRData,MRData>() {
public Iterable<MRData> call ( MRData value ) {
- set_global_env(master_env);
+ final Map<MRData,MRData> m = ys.value();
final Iterator<MRData> i = ((Bag)fx.eval(value)).iterator();
return new Iterable<MRData>() {
public Iterator<MRData> iterator() {
@@ -718,7 +778,7 @@
return true;
while (i.hasNext()) {
p = (Tuple)i.next();
- MRData pd = built_table.get(p.first());
+ MRData pd = m.get(p.first());
Bag bb = ((Bag)fr.eval(new Tuple(p.second(),(pd == null) ? empty_bag : pd)));
ix = bb.iterator();
if (ix.hasNext())
@@ -749,19 +809,18 @@
MRContainer.class,MRContainer.class,
Config.nodes));
case Merge(`x,`y):
- return eval(x,env).union(eval(y,env));
+ return eval(x,env,rdd_env).union(eval(y,env,rdd_env));
case Repeat(lambda(`v,`b),`s,`n):
int max_num = ((MR_int)evalE(n,env)).get();
JavaRDD<MRData> rd;
- JavaRDD<MRData> res = eval(s,env);
- //res = materialize(res);
+ JavaRDD<MRData> res = eval(s,env,rdd_env).cache();
int i = 0;
boolean cont = true;
do {
- rd = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
- res = rd.map(get_first);
- //res = materialize(res);
- long size = res.count();
+ rd = eval(b,env,new Environment(v.toString(),new MR_rdd(res),rdd_env)).cache();
+ res.unpersist();
+ res = rd.map(get_first).cache();
+ long size = rd.count();
Integer true_results
= rd.aggregate(new Integer(0),
new Function2<Integer,MRData,Integer>() {
@@ -778,23 +837,28 @@
if (size == 0)
System.err.println("Repeat #"+i);
else System.err.println("Repeat #"+i+": "+true_results+" true results out of "+size);
+ rd.unpersist();
} while (cont);
+ cached_rdds.add(res);
return res;
case Closure(lambda(`v,`b),`s,`m):
int max_num = ((MR_int)evalE(m,env)).get();
- JavaRDD<MRData> res = eval(s,env).cache();
+ JavaRDD<MRData> res = eval(s,env,rdd_env).cache();
long n = 0;
long old = 0;
int i = 0;
boolean cont = true;
do {
- res = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
+ JavaRDD<MRData> rd = eval(b,env,new Environment(v.toString(),new MR_rdd(res),rdd_env)).cache();
+ res.unpersist();
+ res = rd;
old = n;
n = res.count();
i++;
if (!Config.testing)
System.err.println("Repeat #"+i+": "+(old-n)+" new records");
} while (old < n && i < max_num);
+ cached_rdds.add(res);
return res;
case Generator(`min,`max,`size):
DataSet ds = Plan.generator(((MR_long)evalE(min,env)).get(),
@@ -815,20 +879,23 @@
else if (x instanceof Bag)
((Bag)x).materialize();
new_distributed_binding(v.toString(),x);
- return eval(body,new Environment(v.toString(),x,env));
+ return eval(body,new Environment(v.toString(),x,env),rdd_env);
case let(`v,`u,`body):
MRData val = evalE(u,env);
Interpreter.new_global_binding(v.toString(),val);
- return eval(body,new Environment(v.toString(),val,env));
+ return eval(body,new Environment(v.toString(),val,env),rdd_env);
case Let(`v,`u,`body):
- return eval(body,new Environment(v.toString(),new MR_rdd(eval(u,env).cache()),env));
+ JavaRDD<MRData> val = eval(u,env,rdd_env).cache();
+ JavaRDD<MRData> rd = eval(body,env,new Environment(v.toString(),new MR_rdd(val),rdd_env));
+ cached_rdds.add(val);
+ return rd;
case If(`c,`x,`y):
if (((MR_bool)evalE(c,env)).get())
- return eval(x,env);
- else return eval(y,env);
+ return eval(x,env,rdd_env);
+ else return eval(y,env,rdd_env);
case trace(`msg,`tp,`x):
long n = pre_trace(((MR_string)evalE(msg,env)).get());
- JavaRDD<MRData> ds = evalD(x,env);
+ JavaRDD<MRData> ds = evalD(x,env,rdd_env);
trace(n,tp,new Bag(ds.take(Config.max_bag_size_print)));
return ds;
case apply(`f,`arg):
@@ -839,7 +906,13 @@
case `v:
if (!v.is_variable())
fail;
- MRData x = variable_lookup(v.toString(),env);
+ MRData x = variable_lookup(v.toString(),rdd_env);
+ if (x != null && x instanceof MR_rdd)
+ return ((MR_rdd)x).rdd();
+ x = variable_lookup(v.toString(),global_rdds);
+ if (x != null && x instanceof MR_rdd)
+ return ((MR_rdd)x).rdd();
+ x = variable_lookup(v.toString(),global_env);
if (x != null)
if (x instanceof MR_dataset)
return ((RDDDataSource)((MR_dataset)x).dataset().source.get(0)).rdd;
@@ -855,12 +928,10 @@
l.add(x);
return spark_context.parallelize(l);
};
- x = variable_lookup(v.toString(),global_env);
+ x = variable_lookup(v.toString(),env);
if (x != null)
if (x instanceof MR_dataset)
return ((RDDDataSource)((MR_dataset)x).dataset().source.get(0)).rdd;
- else if (x instanceof MR_rdd)
- return ((MR_rdd)x).rdd();
else if (x instanceof Bag) {
ArrayList<MRData> l = new ArrayList<MRData>();
for ( MRData a: (Bag)x )
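Both merge operators reduce to a single cogroup, exploiting that the state and the delta each hold at most one value per key. A self-contained Spark 1.6 sketch of the OuterMerge case with String keys, Integer values, and addition as the merge function (illustrative; class and variable names are made up):

    import java.util.Arrays;
    import java.util.Iterator;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    public class OuterMergeSketch {
        public static void main ( String[] args ) {
            JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("outer-merge").setMaster("local[2]"));
            JavaPairRDD<String,Integer> state = sc.parallelizePairs(
                Arrays.asList(new Tuple2<String,Integer>("a",1),
                              new Tuple2<String,Integer>("b",2)));
            JavaPairRDD<String,Integer> delta = sc.parallelizePairs(
                Arrays.asList(new Tuple2<String,Integer>("b",10),
                              new Tuple2<String,Integer>("c",3)));
            JavaPairRDD<String,Integer> merged = state.cogroup(delta).mapToPair(
                new PairFunction<Tuple2<String,Tuple2<Iterable<Integer>,Iterable<Integer>>>,String,Integer>() {
                    public Tuple2<String,Integer> call (
                            Tuple2<String,Tuple2<Iterable<Integer>,Iterable<Integer>>> v ) {
                        Iterator<Integer> ix = v._2()._1().iterator();  // state value, if any
                        Iterator<Integer> iy = v._2()._2().iterator();  // delta value, if any
                        int m = ix.hasNext()
                                ? (iy.hasNext() ? ix.next()+iy.next() : ix.next())
                                : iy.next();
                        return new Tuple2<String,Integer>(v._1(),m);
                    }
                });
            System.out.println(merged.collectAsMap());  // {a=1, b=12, c=3}, in some order
            sc.stop();
        }
    }

For RightOuterMerge the same skeleton applies, except that keys with a state value but no delta value are filtered out instead of passed through.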
diff --git a/spark/src/main/java/org/apache/mrql/SparkStreaming.gen b/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
index d0ec379..1ae28b1 100644
--- a/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
+++ b/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
@@ -29,6 +29,7 @@
import scala.collection.Seq;
import scala.collection.JavaConversions;
import scala.runtime.AbstractFunction2;
+import org.apache.spark.storage.RDDInfo;
/** Evaluates physical plans in Apache Spark stream mode */
@@ -74,28 +75,29 @@
return plan;
}
- private final static Evaluator ef = Evaluator.evaluator;
+ private final static SparkEvaluator ef = (SparkEvaluator)Evaluator.evaluator;
/** bind the pattern variables to values */
- private static Environment bind_list ( Tree pattern, Tree src, Environment env ) {
+ private static void bind_list ( Tree pattern, Tree src, Environment env, Environment rdd_env ) {
match pattern {
case tuple(...ps):
int i = 0;
match src {
case tuple(...es):
for ( Tree p: ps )
- env = bind_list(p,es.nth(i++),env);
- return env;
+ bind_list(p,es.nth(i++),env,rdd_env);
}
};
- MRData value = ef.evalE(src,env);
- env.replace(pattern.toString(),value);
+ MRData value = new MR_rdd(ef.eval(src,env,rdd_env));
+ if (variable_lookup(pattern.toString(),global_rdds) == null)
+ global_rdds = new Environment(pattern.toString(),value,global_rdds);
+ else global_rdds.replace(pattern.toString(),value);
if (repeat_variables.member(pattern))
Interpreter.new_distributed_binding(pattern.toString(),value);
- return env;
}
- private static void stream_processing ( final Tree plan, final Environment env, final Function f ) {
+ private static void stream_processing ( final Tree plan, final Environment env,
+ final Environment dataset_env, final Function f ) {
if (streams.size() == 0)
throw new Error("No input streams in query");
ArrayList<DStream<?>> rdds = new ArrayList<DStream<?>>();
@@ -106,7 +108,7 @@
new AbstractFunction2<Seq<RDD<?>>,Time,RDD<MRData>>() {
public RDD<MRData> apply ( Seq<RDD<?>> rdds, Time tm ) {
long t = System.currentTimeMillis();
- Environment new_env = env;
+ Environment rdd_env = dataset_env;
int i = 0;
for ( RDD<?> rdd: JavaConversions.seqAsJavaList(rdds) ) {
try {
@@ -115,17 +117,25 @@
} catch (Exception ex) {
return SparkEvaluator.spark_context.sc().emptyRDD(classtag);
};
- new_env = new Environment(vars.get(i++),new MR_rdd(new JavaRDD(rdd,classtag)),new_env);
+ MRData d = new MR_rdd(new JavaRDD(rdd,classtag));
+ global_rdds = new Environment(vars.get(i),d,global_rdds);
+ rdd_env = new Environment(vars.get(i++),d,rdd_env);
};
match plan {
case lambda(`pat,`e):
- new_env = bind_list(pat,e,new_env);
+ bind_list(pat,e,env,rdd_env);
f.eval(null);
case _: // non-incremental streaming
- f.eval(ef.evalE(plan,new_env));
+ f.eval(ef.evalE(plan,env));
};
if (!Config.quiet_execution)
System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs");
+ for ( RDD<?> rdd: JavaConversions.seqAsJavaList(rdds) )
+ rdd.unpersist(true);
+ GC_cached_rdds();
+ System.gc();
+ for ( RDDInfo rdi: SparkEvaluator.spark_context.sc().getRDDStorageInfo() )
+ System.err.println("Warning: cached RDD "+rdi);
if (++tries >= Config.stream_tries) { // for performance experiments only
tries = 0;
SparkEvaluator.stream_context.stop(true,true);
@@ -138,16 +148,16 @@
}
/** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
- final public static void evaluate ( Tree plan, Environment env, Function f ) {
+ final public static void evaluate ( Tree plan, Environment env, Environment dataset_env, Function f ) {
streams = new ArrayList<JavaInputDStream<MRData>>();
stream_names = new ArrayList<String>();
match plan {
case lambda(`p,`b):
b = get_streams(b,env);
- stream_processing(#<lambda(`p,`b)>,env,f);
+ stream_processing(#<lambda(`p,`b)>,env,null,f);
case _: // non-incremental streaming
plan = get_streams(plan,env);
- stream_processing(plan,env,f);
+ stream_processing(plan,env,dataset_env,f);
};
stream_context.start();
stream_context.awaitTermination();
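The per-batch sweep matters because cache() pins an RDD in executor storage until it is explicitly unpersisted; the old loop cached fresh state every batch without releasing the previous one, so a long-running stream could slowly exhaust storage memory. The getRDDStorageInfo scan then surfaces anything that escaped the sweep as a warning. The same discipline in a standalone loop (hypothetical sketch; step stands for one incremental merge):

    // roll the cached state forward without leaking the old one
    JavaRDD<MRData> state = initial.cache();
    for ( int batch = 0; batch < num_batches; batch++ ) {
        JavaRDD<MRData> next = step(state).cache();
        next.count();        // materialize before dropping the old state
        state.unpersist();
        state = next;
    }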