blob: f54cad8454eb44d40fe8b0de74c0394fb80c0a47 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import org.apache.mrql.gen.*;
/** Generates code for streaming queries */
final public class Streaming extends AlgebraicOptimization {
// NOTE(review): istate is not referenced in this part of the file; presumably a counter used further below — confirm
private static int istate = 0;
// set it to true to print a trace of the monoid inference (see inference)
private static boolean inference_tracing = false;
// set it to true to print a trace of the split function (see split)
private static boolean split_tracing = false;
/** A singly-linked list of bindings from variable names to monoids.
 * Callers extend it by prepending a node (new Environment(name,monoid,env)),
 * so a lookup that scans from the head sees the most recent binding first.
 */
final static class Environment {
    public String name;
    public Tree monoid;
    public Environment next;
    Environment ( String n, Tree m, Environment next ) {
        this.name = n;
        this.monoid = m;
        this.next = next;
    }
    /** Render the bindings from head to tail as "name: monoid, name: monoid, ..." */
    public String toString () {
        StringBuilder out = new StringBuilder();
        for ( Environment cur = this; cur != null; cur = cur.next ) {
            if (cur != this)
                out.append(", ");
            out.append(cur.name).append(": ").append(cur.monoid);
        }
        return out.toString();
    }
}
/** Check whether a variable is bound in an environment
 * @param name a variable name
 * @param env an environment that binds variables to monoids (null is the empty environment)
 * @return true if some binding in env has this name
 */
private static boolean member ( String name, Environment env ) {
    Environment cur = env;
    while (cur != null) {
        if (cur.name.equals(name))
            return true;
        cur = cur.next;
    }
    return false;
}
/** Look up the monoid bound to a variable; the first (most recent) binding wins
 * @param name a variable name
 * @param env an environment that binds variables to monoids (null is the empty environment)
 * @return the monoid associated with the variable, or none if it is not bound
 */
private static Tree find ( String name, Environment env ) {
    Environment cur = env;
    while (cur != null && !cur.name.equals(name))
        cur = cur.next;
    if (cur == null)
        return #<none>;
    return cur.monoid;
}
/** Bindings (variable name to monoid) of repeat-loop variables; initially null (empty).
 * Presumably populated by the repeat-loop code elsewhere in this file. */
private static Environment repeat_environment = null;
/** True if e is a variable that is bound in repeat_environment, i.e., a repeat-loop variable */
private static boolean repeat_var ( Tree e ) {
    if (!e.is_variable())
        return false;
    return member(e.toString(),repeat_environment);
}
/** Indentation (in columns) of the debugging traces printed by inference and split.
 * Each traced call adds 3 on entry and removes 3 on exit, so the first call prints at column 0. */
private static int tab_count = -3;
/** If e is a homomorphism, return the associated monoid.
 * A result of none means that no monoid could be found; a result of fixed means
 * that e is invariant (does not depend on the stream).
 * When inference_tracing is set, the term, the environment, and the result are printed.
 * @param e an MRQL algebraic term
 * @param env binds variables to monoids
 * @return the monoid, fixed, or none if it doesn't exist
 */
static Tree inference ( Tree e, Environment env ) {
    if (inference_tracing) {
        tab_count += 3;
        String indent = Interpreter.tabs(tab_count);
        System.out.println(indent+e);
        if (env != null)
            System.out.println(indent+env);
    }
    Tree result = inference_trace(e,env);
    if (inference_tracing) {
        System.out.println(Interpreter.tabs(tab_count)+"-> "+result);
        tab_count -= 3;
    }
    return result;
}
/** The untraced body of inference: find the monoid under which e is a homomorphism.
* Stream sources (call(stream,...)) yield union; terms that do not depend on a stream
* (constants, call(source,...), variables bound neither in env nor in repeat_environment)
* yield fixed; grouped terms yield groupBy(m) or orderBy(m); tuples and records yield
* product(...) or record(...) of their components' monoids (or fixed when every component
* is fixed); none means that no monoid was found.
* @param e an MRQL algebraic term
* @param env binds variables to monoids
* @return the inferred monoid, fixed, or none
*/
private static Tree inference_trace ( Tree e, Environment env ) {
match e {
case groupBy(`x):
// grouping keeps the monoid m of its input, wrapped as groupBy(m)
match inference(x,env) {
case none: return #<none>;
case fixed: return #<fixed>;
case `m:
return #<groupBy(`m)>;
}
case orderBy(`x):
match inference(x,env) {
case none: return #<none>;
case fixed: return #<fixed>;
case `m:
return #<orderBy(`m)>;
}
case coGroup(`x,`y):
// a coGroup is grouped over the product of its two inputs' monoids; fixed only if both are
Tree mx = inference(x,env);
Tree my = inference(y,env);
if (mx.equals(#<none>) || my.equals(#<none>))
return #<none>;
else if (mx.equals(#<fixed>) && my.equals(#<fixed>))
return #<fixed>;
else return #<groupBy(product(`mx,`my))>;
case cmap(lambda(`v,bag(`w)),`x):
// identity cmap (v -> bag(v)): same monoid as its input
if (!w.equals(v))
fail;
return inference(x,env);
case cmap(lambda(`v,bag(tuple(`k,`u))),`x):
// re-keying a grouped input: the new key k must be invariant (fixed) and the
// monoid of the new value u becomes the new group monoid
match inference(x,env) {
case `gb(`m):
if (! #[groupBy,orderBy].member(#<`gb>) ) fail;
match inference(k,new Environment(v.toString(),#<product(fixed,`m)>,env)) {
case fixed:
Tree b = inference(u,new Environment(v.toString(),#<product(fixed,`m)>,env));
if (b.equals(#<none>))
fail;
return #<`gb(`b)>;
}
};
fail
case cmap(lambda(`v,`b),`x):
// general cmap: combine the monoid of the input x with the monoid of the body b,
// where v is bound to fixed (for union/fixed inputs) or product(fixed,m) (for grouped inputs)
match inference(x,env) {
case union:
match inference(b,new Environment(v.toString(),#<fixed>,env)) {
case `gb(`m):
return #<`gb(`m)>;
};
return #<union>;
case `gb(`m):
if (! #[groupBy,orderBy].member(#<`gb>) )
fail;
match inference(b,new Environment(v.toString(),#<product(fixed,`m)>,env)) {
case `gb2(`m2):
if (! #[groupBy,orderBy].member(#<`gb2>) )
fail;
return #<`gb2(`m2)>;
case union:
return #<union>;
case fixed:
return #<fixed>;
}
case fixed:
match inference(b,new Environment(v.toString(),#<fixed>,env)) {
case union:
return #<union>;
case fixed:
return #<fixed>;
case `gb(`m):
if (! #[groupBy,orderBy].member(#<`gb>) )
fail;
return #<`gb(`m)>;
}
};
fail
case reduce(groupBy(count),`u):
// counting only requires u to be a homomorphism, whatever its monoid
if (inference(u,env).equals(#<none>))
fail;
return #<groupBy(count)>;
case reduce(groupBy(`m),`u):
// any other grouped reduction requires u to be groupBy(union)
match inference(u,env) {
case groupBy(union):
return #<groupBy(`m)>;
};
return #<none>;
case reduce(count,`u):
if (inference(u,env).equals(#<none>))
fail;
return #<count>;
case reduce(`m,`u):
// a reduction over a union input has monoid m; over a fixed input it stays fixed
match inference(u,env) {
case union:
return m;
case fixed:
return #<fixed>;
};
return #<none>;
case tuple(...s):
// product of the component monoids; none if any component has no monoid
Trees ms = #[ ];
for ( Tree x: s )
match inference(x,env) {
case none:
return #<none>;
case `m:
ms = ms.append(m);
};
boolean fixed = true;
for (Tree mm: ms)
fixed &= mm.equals(#<fixed>);
if (fixed)
return #<fixed>;
else return #<product(...ms)>;
case record(...s):
// record of the attribute monoids; none if any attribute has no monoid
Trees ms = #[ ];
for ( Tree b: s )
match b {
case bind(`n,`x):
match inference(x,env) {
case none:
return #<none>;
case `m:
ms = ms.append(#<bind(`n,`m)>);
}
};
boolean fixed = true;
for (Tree mm: ms)
match mm {
case bind(_,`m):
fixed &= m.equals(#<fixed>);
};
if (fixed)
return #<fixed>;
else return #<record(...ms)>;
case nth(`u,`i):
// nth of a product selects the monoid of the i-th component
int n = (int)i.longValue();
match inference(u,env) {
case fixed:
return #<fixed>;
case product(...ms):
return ms.nth(n);
};
return #<none>;
case project(`u,`a):
// a projection is only supported when the projected term is fixed
match inference(u,env) {
case fixed:
return #<fixed>;
};
return #<none>;
case if(`p,`e1,`e2):
// NOTE(review): only the then-branch e1 is inspected; p and e2 are ignored — confirm this is intended
return inference(e1,env);
case bag(`u):
match inference(u,env) {
case fixed:
return #<fixed>;
};
return #<none>;
case call(stream,...):
return #<union>;
case call(source,...):
return #<fixed>;
case call(`f,...s):
// any other function call is only supported when all its arguments are fixed
Trees ms = #[ ];
for ( Tree x: s )
match inference(x,env) {
case none:
return #<none>;
case `m:
ms = ms.append(m);
};
boolean fixed = true;
for (Tree mm: ms)
fixed &= mm.equals(#<fixed>);
if (fixed)
return #<fixed>;
return #<none>;
case bind(_,`u):
return inference(u,env);
case typed(`u,_):
return inference(u,env);
case _:
// leaves: constants are fixed; variables are looked up in repeat_environment first,
// then in env; variables bound in neither are fixed
if (e.is_string() || e.is_long() || e.is_double())
return #<fixed>;
else if (e.is_variable())
if (repeat_var(e))
return find(e.toString(),repeat_environment);
else if (member(e.toString(),env))
return find(e.toString(),env);
else return #<fixed>;
};
return #<none>;
}
/** Infer the monoid of e with no local variable bindings
 * (repeat-loop variables are still looked up in repeat_environment).
 * @param e an MRQL algebraic term
 * @return the monoid, fixed if e is invariant, or none if e is not a homomorphism
 */
public static Tree inference ( Tree e ) {
    Environment empty = null;
    return inference(e,empty);
}
/** Build cmap(lambda(x,cmap(lambda(y,bag(tuple(nth(x,0),y))),f(tuple(nth(nth(x,0),0),nth(x,1))))),e):
 * for each pair x of e, apply f to (first component of nth(x,0), nth(x,1)) and pair every
 * result y with the whole nth(x,0) (presumably the lineage produced by inject_e).
 */
private static Tree smap1 ( Tree f, Tree e ) {
    Tree x = new_var();
    Tree y = new_var();
    Tree arg = #<tuple(nth(nth(`x,0),0),nth(`x,1))>;
    Tree body = #<cmap(lambda(`y,bag(tuple(nth(`x,0),`y))),apply(`f,`arg))>;
    return #<cmap(lambda(`x,`body),`e)>;
}
/** Build a cmap over e that applies f to (first component of nth(x,0), nth(x,1)) for each
 * element x, and turns each result pair w into (nth(w,0),(nth(x,0),nth(w,1))).
 */
private static Tree smap2 ( Tree f, Tree e ) {
    Tree outer = new_var();
    Tree inner = new_var();
    Tree arg = #<tuple(nth(nth(`outer,0),0),nth(`outer,1))>;
    Tree result = #<bag(tuple(nth(`inner,0),tuple(nth(`outer,0),nth(`inner,1))))>;
    return #<cmap(lambda(`outer,cmap(lambda(`inner,`result),apply(`f,`arg))),`e)>;
}
/** Build a cmap over e that applies f to each element and turns each result pair w
 * into (nth(w,0),(tuple(),nth(w,1))), i.e., attaches the empty lineage tuple().
 */
private static Tree smap3 ( Tree f, Tree e ) {
    Tree outer = new_var();
    Tree inner = new_var();
    Tree result = #<bag(tuple(nth(`inner,0),tuple(tuple(),nth(`inner,1))))>;
    return #<cmap(lambda(`outer,cmap(lambda(`inner,`result),apply(`f,`outer))),`e)>;
}
/** Build a cmap over e that reshapes each element (k,(a,b)) into ((k,a),b) */
private static Tree swap ( Tree e ) {
    Tree p = new_var();
    Tree key = #<tuple(nth(`p,0),nth(nth(`p,1),0))>;
    Tree value = #<nth(nth(`p,1),1)>;
    return #<cmap(lambda(`p,bag(tuple(`key,`value))),`e)>;
}
/** Build a cmap over a coGroup result e: for each element g = (k,(xs,ys)), group xs and ys
 * and emit ((k,(kx,ky)),(vx,vy)) for every pair of groups (kx,vx) of xs and (ky,vy) of ys.
 */
private static Tree mix ( Tree e ) {
    Tree group = new_var();
    Tree left = new_var();
    Tree right = new_var();
    Tree key = #<tuple(nth(`group,0),tuple(nth(`left,0),nth(`right,0)))>;
    Tree value = #<tuple(nth(`left,1),nth(`right,1))>;
    Tree rights = #<cmap(lambda(`right,bag(tuple(`key,`value))),
                         groupBy(nth(nth(`group,1),1)))>;
    Tree lefts = #<cmap(lambda(`left,`rights),
                        groupBy(nth(nth(`group,1),0)))>;
    return #<cmap(lambda(`group,`lefts),`e)>;
}
/** inject lineage (all groupBy/join keys) to a query e.
* Terms that read directly from a stream source (call(stream,...)) or from a repeat-loop
* variable, as well as constants, get the empty lineage tuple().
* @param e an MRQL query
* @return an algebraic term that associates lineage to every query result
*/
public static Tree inject_q ( Tree e ) {
match e {
case reduce(`m,cmap(`f,call(stream,...s))):
// a reduction directly over a stream source: empty lineage
return #<bag(tuple(tuple(),`e))>;
case reduce(`m,cmap(`f,`v)):
if (!repeat_var(v))
fail;
return #<bag(tuple(tuple(),`e))>;
case reduce(`m,cmap(`f,`u)):
// inject lineage into the input, then reduce per lineage group
return #<reduce(groupBy(`m),`(smap1(f,inject_e(u))))>;
case reduce(`m,call(stream,...s)):
return #<bag(tuple(tuple(),`e))>;
case reduce(`m,`v):
if (!repeat_var(v))
fail;
return #<bag(tuple(tuple(),`e))>;
case cmap(`f,call(stream,...s)):
// a cmap directly over a stream source: tag every result with the empty lineage
Tree a = new_var();
Tree b = new_var();
return #<cmap(lambda(`a,cmap(lambda(`b,bag(tuple(tuple(),`b))),
apply(`f,`a))),
call(stream,...s))>;
case cmap(`f,`v):
if (!repeat_var(v))
fail;
Tree a = new_var();
Tree b = new_var();
return #<cmap(lambda(`a,cmap(lambda(`b,bag(tuple(tuple(),`b))),
apply(`f,`a))),
`v)>;
case cmap(`f,`u):
return smap1(f,inject_e(u));
};
// otherwise, it's a constant
return #<bag(tuple(tuple(),`e))>;
}
/** True if e is a stream source: either a call(stream,...) term or a repeat-loop variable */
private static boolean stream_source ( Tree e ) {
    if (repeat_var(e))
        return true;
    match e {
    case call(stream,...):
        return true;
    };
    return false;
}
/** inject lineage (all groupBy/join keys) to a groupBy/join term e
* @param e a groupBy or coGroup (join) term; any other term is handled by inject_c
* @return an algebraic term that associates lineage to every query result
*/
static Tree inject_e ( Tree e ) {
match e {
case `gb(`c):
if (! #[groupBy,orderBy].member(#<`gb>) )
fail;
// group on (key,lineage) instead of key alone
return #<`gb(`(swap(inject_c(c))))>;
case coGroup(`c1,`c2):
match c1 {
case cmap(_,`u):
if (!stream_source(u))
fail;
match c2 {
case cmap(_,`s):
if (!stream_source(s))
fail;
// both inputs read directly from stream sources: both lineages are empty
Tree nv = new_var();
return #<cmap(lambda(`nv,bag(tuple(tuple(nth(`nv,0),tuple(tuple(),tuple())),
nth(`nv,1)))),
coGroup(`c1,`c2))>;
};
// only the left input reads from a stream source: regroup the right input on its lineage
Tree nv = new_var();
Tree y = new_var();
return #<cmap(lambda(`nv,cmap(lambda(`y,bag(tuple(tuple(nth(`nv,0),
tuple(tuple(),nth(`y,0))),
tuple(nth(nth(`nv,1),0),
nth(`y,1))))),
groupBy(nth(nth(`nv,1),1)))),
coGroup(`c1,`(inject_c(c2))))>;
};
match c2 {
case cmap(_,`u):
if (!stream_source(u))
fail;
// only the right input reads from a stream source: regroup the left input on its lineage
Tree nv = new_var();
Tree x = new_var();
return #<cmap(lambda(`nv,cmap(lambda(`x,bag(tuple(tuple(nth(`nv,0),
tuple(nth(`x,0),tuple())),
tuple(nth(`x,1),
nth(nth(`nv,1),1))))),
groupBy(nth(nth(`nv,1),0)))),
coGroup(`(inject_c(c1)),`c2))>;
};
// neither input reads directly from a stream source: combine both lineages
return mix(#<coGroup(`(inject_c(c1)),`(inject_c(c2)))>);
};
return inject_c(e);
}
/** Inject lineage (all groupBy/join keys) to a cMap e
* @param e a cMap term (a bare stream source or repeat-loop variable is treated as an identity cMap)
* @return an algebraic term that associates lineage to every query result
*/
static Tree inject_c ( Tree e ) {
match e {
case cmap(`f,call(stream,...s)):
// directly over a stream source: attach the empty lineage
return smap3(f,#<call(stream,...s)>);
case cmap(`f,`v):
if (!repeat_var(v))
fail;
return smap3(f,v);
case cmap(`f,`u):
// over a groupBy/coGroup: propagate the lineage injected into u
return smap2(f,inject_e(u));
case call(stream,...s):
return smap3(#<lambda(x,bag(x))>,#<call(stream,...s)>);
case `v:
if (!repeat_var(v))
fail;
return smap3(#<lambda(x,bag(x))>,v);
};
return e;
}
/** find all homomorphic terms in the algebraic term e.
* A term whose inferred monoid is neither none nor fixed is returned as a single
* homomorphism; otherwise its subterms are searched and the results are combined with
* union (a helper defined elsewhere; presumably it removes duplicates — confirm).
* @param e an algebraic term
* @param env binds variables to monoids
* @return a list of homomorphic terms
*/
static Trees find_homomorphisms ( Tree e, Environment env ) {
if (! #[none,fixed].member(inference(e,env)))
return #[ `e ];
match e {
case cmap(lambda(`v,bag(`b)),`x):
// a cmap with a bag body is a homomorphism when its input is a union, or when
// its body is a product over a grouped input, or union/fixed over a fixed input
match inference(x,env) {
case union:
return #[ `e ];
case `gb(`m):
if (! #[groupBy,orderBy].member(#<`gb>) ) fail;
match inference(b,new Environment(v.toString(),#<product(fixed,`m)>,env)) {
case product(...):
return #[ `e ];
}
case fixed:
match inference(b,new Environment(v.toString(),#<fixed>,env)) {
case union:
return #[ `e ];
case fixed:
return #[ `e ];
}
};
fail
case record(...ds):
Trees bs = #[ ];
for ( Tree d: ds )
match d {
case bind(`n,`a):
bs = union(bs,find_homomorphisms(a,env));
};
return bs;
case call(`f,...as):
Trees bs = #[ ];
for ( Tree a: as )
bs = union(bs,find_homomorphisms(a,env));
return bs;
case nth(`u,`n):
// homomorphisms found under nth/project are returned wrapped in the same nth/project
Trees bs = #[ ];
for ( Tree x: find_homomorphisms(u,env) )
bs = union(bs,#[nth(`x,`n)]);
return bs;
case project(`u,`a):
Trees bs = #[ ];
for ( Tree x: find_homomorphisms(u,env) )
bs = union(bs,#[project(`x,`a)]);
return bs;
case `f(...as):
Trees bs = #[ ];
for ( Tree a: as )
bs = union(bs,find_homomorphisms(a,env));
return bs;
};
return #[ ];
}
/** Split the term e into a homomorphism and an answer function.
 * When split_tracing is set, the input variable, the term, and the result are printed.
 * @param e an algebraic term
 * @param var the input variable of the answer function
 * @param env binds variables to monoids
 * @return a pair (answer,homomorphism)
 */
static Tree split ( Tree e, Tree var, Environment env ) {
    if (split_tracing) {
        tab_count += 3;
        String indent = Interpreter.tabs(tab_count);
        System.out.println(indent+var+"\n"+indent+e.pretty(tab_count));
    }
    Tree result = split_trace(e,var,env);
    if (split_tracing) {
        System.out.println(Interpreter.tabs(tab_count)+"-> "+result.pretty(tab_count+3));
        tab_count -= 3;
    }
    return result;
}
/** The untraced body of split: split the term e into a homomorphism and an answer function.
* The homomorphism computes a state from the stream; the answer function is a term over
* var (which stands for that state) that rebuilds the result of e. For example, a term that
* is itself a homomorphism splits into pair(var,e).
* @param e an algebraic term
* @param var the input variable of the answer function
* @param env binds variables to monoids
* @return a pair (answer,homomorphism)
*/
static Tree split_trace ( Tree e, Tree var, Environment env ) {
match e {
case cmap(lambda(`v,bag(tuple(`k,`b))),`u):
// key/value cmap: move the homomorphisms found in the value b into the state,
// and rebuild b in the answer function from the state components
Environment nenv = env;
Tree nv = new_var();
match inference(u,env) {
case union:
// NOTE(review): nv becomes a nth(...) term here and is later used as a lambda
// parameter in the answer function below — confirm this is intended
nv = #<nth(`nv,1)>;
nenv = new Environment(v.toString(),#<union>,nenv);
case `gb(`m):
if (! #[groupBy,orderBy].member(#<`gb>) )
fail;
nenv = new Environment(v.toString(),#<product(fixed,`m)>,nenv);
};
Trees hs = find_homomorphisms(b,nenv);
// the value b is itself a single homomorphism: the whole term is the state
if (hs.length() == 1 && hs.nth(0).equals(b))
return #<pair(`var,`e)>;
Tree ne = b;
int i = 0;
// replace the i-th homomorphism in b by the i-th component of the state value
for ( Tree h: hs ) {
ne = subst(h,#<nth(nth(`nv,1),`i)>,ne);
i++;
};
return #<pair(cmap(lambda(`nv,bag(tuple(nth(`nv,0),`ne))),`var),
cmap(lambda(`v,bag(tuple(`k,tuple(...hs)))),`u))>;
case cmap(lambda(`v,`b),`u):
match inference(u,env) {
case `gb(`m):
if (! #[groupBy,orderBy].member(#<`gb>) )
fail;
Environment nenv = new Environment(v.toString(),#<product(fixed,`m)>,env);
match inference(b,nenv) {
case `gb2(`m2):
if (! #[groupBy,orderBy].member(#<`gb2>) )
fail;
return #<pair(`var,`e)>;
case _:
Tree nv = new_var();
match split(b,nv,nenv) {
case pair(`a,`h):
Tree nw = new_var();
if (occurences(v,a) == 0) {
a = subst(nv,var,a);
return #<pair(`a,cmap(lambda(`v,`h),`u))>;
};
// The answer function needs v; need to piggyback v along with the state
a = subst(nv,#<bag(tuple(nth(`nw,0),nth(nth(`nw,1),1)))>,
subst(v,#<nth(nth(`nw,1),0)>,a));
return #<pair(cmap(lambda(`nw,`a),`var),
cmap(lambda(`v,cmap(lambda(`nv,bag(tuple(nth(`nv,0),
tuple(`v,nth(`nv,1))))),
`h)),
`u))>;
}
}
case `m:
if (! #[union,fixed].member(m))
fail;
Tree nv = new_var();
match split(b,nv,new Environment(v.toString(),m,env)) {
case pair(`a,`h):
Tree nw = new_var();
if (occurences(v,a) == 0) {
a = subst(nv,var,a);
return #<pair(`a,cmap(lambda(`v,`h),`u))>;
};
// The answer function needs v; need to piggyback v along with the state
a = subst(nv,#<bag(tuple(nth(`nw,0),nth(nth(`nw,1),1)))>,
subst(v,#<nth(nth(`nw,1),0)>,a));
return #<pair(cmap(lambda(`nw,`a),`var),
cmap(lambda(`v,cmap(lambda(`nv,bag(tuple(nth(`nv,0),
tuple(`v,nth(`nv,1))))),
`h)),
`u))>;
}
}
case call(stream,...):
return #<pair(`var,`e)>;
case reduce(`m,`u):
match inference(u,env) {
case union:
return #<pair(`var,`e)>;
case _:
match split(u,var,env) {
case pair(`b,`h):
return #<pair(reduce(`m,`b),`h)>;
}
};
case tuple(...as):
// split each component; the i-th answer reads the i-th component of the state
Trees bs = #[ ];
Trees hs = #[ ];
int i = 0;
for ( Tree a: as )
match split(a,#<nth(`var,`(i++))>,env) {
case pair(`b,`h):
bs = bs.append(b);
hs = hs.append(h);
};
return #<pair(tuple(...bs),tuple(...hs))>;
case record(...as):
// split each attribute; the answer for attribute v reads attribute v of the state
Trees bs = #[ ];
Trees hs = #[ ];
for ( Tree a: as )
match a {
case bind(`v,`u):
// project on the attribute name v (not on the whole bind term a)
match split(u,#<project(`var,`v)>,env) {
case pair(`b,`h):
bs = bs.append(#<bind(`v,`b)>);
hs = hs.append(#<bind(`v,`h)>);
}
};
return #<pair(record(...bs),record(...hs))>;
case call(`f,...as):
// a fixed call does not depend on the stream: keep it in the answer, with an empty state
if (! #[fixed].member(inference(e,env)))
fail;
return #<pair(`e,tuple())>;
case call(`f,`a):
match split(a,var,env) {
case pair(`b,`h):
return #<pair(call(`f,`b),`h)>;
};
case call(`f,...as):
Tree nv = new_var();
Trees bs = #[ ];
Trees hs = #[ ];
int i = 0;
for ( Tree a: as )
match split(a,#<nth(`nv,`(i++))>,env) {
case pair(`b,`h):
bs = bs.append(b);
hs = hs.append(h);
};
return #<pair(apply(lambda(`nv,call(`f,...bs)),`var),tuple(...hs))>;
case nth(`u,`n):
match split(u,var,env) {
case pair(`b,`h):
return #<pair(nth(`b,`n),`h)>;
};
case project(`u,`a):
match split(u,var,env) {
case pair(`b,`h):
return #<pair(project(`b,`a),`h)>;
};
case bag(tuple(`k,`u)):
// a singleton key/value result: the answer reads the value of the single state element
Tree ne = #<nth(call(elem,`var),1)>;
match TypeInference.type_inference(u) {
case `S(_):
if (!is_persistent_collection(S))
fail;
ne = #<nth(call(elem,Collect(`var)),1)>;
};
match split(u,ne,env) {
case pair(`b,`h):
return #<pair(`b,bag(tuple(`k,`h)))>;
};
case if(`p,`u,bag()):
match split(u,var,env) {
case pair(`b,`h):
return #<pair(`b,if(`p,`h,bag()))>;
};
case `f(...as):
Tree nv = new_var();
Trees bs = #[ ];
Trees hs = #[ ];
int i = 0;
for ( Tree a: as )
match split(a,#<nth(`nv,`(i++))>,env) {
case pair(`b,`h):
bs = bs.append(b);
hs = hs.append(h);
};
return #<pair(apply(lambda(`nv,`f(...bs)),`var),tuple(...hs))>;
};
return #<pair(`var,`e)>;
}
/** Return the monoid of a query e (the monoid that is used to merge its states)
* @param e a query term
* @param env binds variables to monoids
* @return the monoid of e
* @throws Error if no monoid can be determined for e
*/
private static Tree get_monoid ( Tree e, Environment env ) {
match e {
case reduce(`m,`u):
return #<groupBy(`m)>;
case cmap(`f,call(stream,...s)):
return #<union>;
case cmap(`f,`v):
if (!repeat_var(v))
fail;
return #<union>;
case cmap(lambda(`v,`b),`u):
// NOTE(review): unlike the rest of this file, only groupBy (not orderBy) inputs bind v
// to product(fixed,m) here; an orderBy input falls to the next case — confirm
match inference(u,env) {
case groupBy(`m):
return get_monoid(b,new Environment(v.toString(),#<product(fixed,`m)>,env));
case `m:
return get_monoid(b,env);
}
case bag(tuple(`k,`v)):
Tree m = inference(v,env);
return #<groupBy(`m)>;
case if(`p,`e1,bag()):
return get_monoid(e1,env);
};
throw new Error("Unable to find the merge function for: "+e);
//return #<groupBy(fixed)>;
}
/** Convert a reduction of the collection e under the monoid m into an expression:
* fixed takes the single element of e, union flattens e, product/record reduce each
* component, groupBy/orderBy group e and reduce every group, and a user-defined monoid
* (looked up by name in monoids) becomes an aggregate with its plus and zero.
* @param m a monoid
* @param e a collection term
* @return the reduction of e under m
*/
private static Tree convert_reduce_all ( Tree m, Tree e ) {
match m {
case fixed:
return #<call(elem,`e)>;
case union:
// cmap(lambda(v,v),e) flattens a collection of collections
Tree v = new_var();
return #<cmap(lambda(`v,`v),`e)>;
case product(...ps):
int i = 0;
Trees as = #[ ];
for ( Tree p: ps ) {
Tree nv = new_var();
as = as.append(convert_reduce_all(p,#<cmap(lambda(`nv,bag(nth(`nv,`(i++)))),`e)>));
};
return #<tuple(...as)>;
case record(...bs):
Trees as = #[ ];
for ( Tree b: bs )
match b {
case bind(`n,`p):
Tree nv = new_var();
Tree nb = convert_reduce_all(p,#<cmap(lambda(`nv,bag(project(`nv,`n))),`e)>);
as = as.append(#<bind(`n,`nb)>);
};
return #<record(...as)>;
case `gb(`gm):
if (! #[groupBy,orderBy].member(#<`gb>) )
fail;
Tree v = new_var();
Tree me = convert_reduce_all(gm,#<nth(`v,1)>);
return #<cmap(lambda(`v,bag(tuple(nth(`v,0),`me))),
`gb(`e))>;
case _:
if (!m.is_variable())
fail;
for ( Tree monoid: monoids )
match monoid {
case `aggr(`mtp,`plus,`zero,`unit):
if (#<`aggr>.equals(m)) {
plus = Normalization.rename(plus);
return #<aggregate(`plus,`zero,`e)>;
}
};
};
return #<call(elem,`e)>;
}
/** Convert a reduce on the monoid m to an aggregation.
* fixed and union leave e as is; product/record convert each component of e;
* groupBy/orderBy group e and reduce every group with convert_reduce_all; a user-defined
* monoid (looked up by name in monoids) becomes a call to that aggregation.
* @param m a monoid
* @param e the term being reduced
* @return the converted term
*/
private static Tree convert_reduce ( Tree m, Tree e ) {
match m {
case fixed:
return e;
case union:
return e;
case product(...ps):
int i = 0;
Trees as = #[ ];
for ( Tree p: ps )
as = as.append(convert_reduce(p,#<nth(`e,`(i++))>));
return #<tuple(...as)>;
case record(...bs):
Trees as = #[ ];
for ( Tree b: bs )
match b {
case bind(`n,`p):
as = as.append(#<bind(`n,`(convert_reduce(p,#<project(`e,`n)>)))>);
};
return #<record(...as)>;
case `gb(`gm):
if (! #[groupBy,orderBy].member(#<`gb>) )
fail;
Tree v = new_var();
Tree me = convert_reduce_all(gm,#<nth(`v,1)>);
return #<cmap(lambda(`v,bag(tuple(nth(`v,0),`me))),
`gb(`e))>;
case _:
if (!m.is_variable())
fail;
for ( Tree monoid: monoids )
match monoid {
case `aggr(`mtp,`plus,`zero,`unit):
if (#<`aggr>.equals(m))
return #<call(`aggr,`e)>;
};
};
return e;
}
/** Convert coGroups back to joins and reduces to aggregations
* @param e an algebraic term
* @return the same computation expressed with join and aggregation operators
*/
private static Tree convert_to_algebra ( Tree e ) {
match e {
case coGroup(`x,`y):
// both coGroup inputs are collections of (key,value) pairs: join them on the keys
// and, for every key, pair the key with the values of both sides
Tree v = new_var();
Tree vx = new_var();
Tree vy = new_var();
Tree cx = convert_to_algebra(x);
Tree cy = convert_to_algebra(y);
return #<join(lambda(`vx,nth(`vx,0)),lambda(`vy,nth(`vy,0)),
lambda(`v,bag(tuple(call(join_key,nth(`v,0),nth(`v,1)),
tuple(cmap(lambda(`vx,bag(nth(`vx,1))),nth(`v,0)),
cmap(lambda(`vy,bag(nth(`vy,1))),nth(`v,1)))))),
`cx, `cy)>;
case reduce(`aggr,`s):
return convert_reduce(aggr,convert_to_algebra(s));
case `f(...as):
Trees bs = #[ ];
for ( Tree a: as )
bs = bs.append(convert_to_algebra(a));
return #<`f(...bs)>;
};
return e;
}
/** If the key of the join for merging states contains a float/double number,
* use approximate equality: every float/double component of the key is wrapped in
* call(round,...), so keys that round to the same value join.
* @param type the type of the key
* @param e the key term
* @return e unchanged if type contains no float/double, otherwise the rounded key term
*/
private static Tree key_equality ( Tree type, Tree e ) {
if (occurences(#<float>,type) == 0 && occurences(#<double>,type) == 0)
return e;
match type {
case tuple(...as):
Trees bs = #[ ];
int i = 0;
for ( Tree a: as )
bs = bs.append(key_equality(a,#<nth(`e,`(i++))>));
return #<tuple(...bs)>;
case record(...bs):
Trees cs = #[ ];
for ( Tree b: bs )
match b {
case bind(`n,`a):
cs = cs.append(#<bind(`n,`(key_equality(a,#<project(`e,`n)>)))>);
}
return #<record(...cs)>;
case `T(`tp):
if (!is_collection(T))
fail;
Tree nv = new_var();
return #<cmap(lambda(`nv,`(key_equality(tp,nv))),`e)>;
case float:
return #<call(round,`e)>;
case double:
return #<call(round,`e)>;
};
return e;
}
/** true if there is only one instance value of type tp, i.e., tp is tuple() or is built
* from such types with tuples, records, and collections.
* NOTE(review): a collection of tuple() is accepted although it can have several values
* (empty, one element, ...) — confirm that callers rely on this.
* @param tp a type
* @return true if tp is considered to have a single value
*/
private static boolean unique_key ( Tree tp ) {
match tp {
case tuple():
return true;
case `T(`t):
if (!is_collection(T))
fail;
return unique_key(t);
case tuple(...as):
for ( Tree a: as )
if (!unique_key(a))
return false;
return true;
case record(...as):
for ( Tree a: as )
match a {
case bind(_,`b):
if (!unique_key(b))
return false;
}
return true;
};
return false;
}
/** Return the merge function (over X and Y) of the monoid m; type is used for key equality
* @param m the monoid of the states
* @param type the type of the states (a grouped state has type T(tuple(keytype,valuetype)))
* @param X the first state term
* @param Y the second state term
* @param first true at the outermost grouping level, which is merged with an outerMerge;
* nested grouping levels are merged directly (unique key) or with an outer join
* @return a term that merges X and Y under m
* @throws Error if a grouped monoid has a state type of unexpected shape, or m is undefined
*/
private static Tree merge ( Tree m, Tree type, Tree X, Tree Y, boolean first ) {
match m {
case `gb(`n):
if (! #[groupBy,orderBy].member(#<`gb>) )
fail;
match type {
case `T(tuple(`keytp,`tp)):
if (first) {
Tree nv = new_var();
Tree mb = merge(n,tp,#<nth(`nv,0)>,#<nth(`nv,1)>,false);
return #<outerMerge(lambda(`nv,`mb),`X,`Y)>;
} else if (unique_key(keytp)) {
// at most one group per side: merge the single elements directly.
// NOTE(review): the key TYPE keytp is emitted as the key value; this presumably relies on
// unique-key types such as tuple() being spelled like their only value — confirm
Tree vx = new_var();
Tree vy = new_var();
Tree v = merge(n,tp,#<nth(call(elem,`vx),1)>,#<nth(call(elem,`vy),1)>,false);
if (is_persistent_collection(T)) {
X = #<Collect(`X)>;
Y = #<Collect(`Y)>;
};
return #<let(`vx,`X,
let(`vy,`Y,
if(call(exists,`vx),
if(call(exists,`vy),
bag(tuple(`keytp,`v)),
`vx),
`vy)))>;
};
// needs an outer-join in the reducer function
Tree v = new_var();
Tree vx = new_var();
Tree vy = new_var();
Tree mx = new_var();
Tree my = new_var();
Tree mb = merge(n,tp,#<nth(`vx,1)>,#<nth(`vy,1)>,false);
Tree b = #<cmap(lambda(`vx,cmap(lambda(`vy,bag(tuple(nth(`vx,0),`mb))),
`my)),
`mx)>;
return #<join(lambda(`vx,`(key_equality(keytp,#<nth(`vx,0)>))),
lambda(`vy,`(key_equality(keytp,#<nth(`vy,0)>))),
lambda(`v,let(`mx,nth(`v,0),
let(`my,nth(`v,1),
if(call(exists,`mx),
if(call(exists,`my),`b,`mx),
`my)))),
`X, `Y)>;
};
throw new Error("Unknown type for merge: "+type);
case product(...ms):
// merge component-wise; type is expected to be a tuple of the component types
Trees tps = ((Node)type).children();
Trees bs = #[ ];
int i = 0;
for ( Tree a: ms ) {
bs = bs.append(merge(a,tps.nth(i),#<nth(`X,`i)>,#<nth(`Y,`i)>,first));
i++;
};
return #<tuple(...bs)>;
case record(...bs):
// merge attribute-wise; type is expected to be a record whose i-th child is bind(name,type)
Trees tps = ((Node)type).children();
Trees cs = #[ ];
int i = 0;
for ( Tree b: bs )
match b {
case bind(`n,`a):
cs = cs.append(#<bind(`n,`(merge(a,((Node)tps.nth(i++)).children().nth(1),
#<project(`X,`n)>,#<project(`Y,`n)>,first)))>);
}
return #<record(...cs)>;
case union:
return #<call(plus,`X,`Y)>;
case fixed:
// an invariant value: keep the second state
return Y;
case _:
if (!m.is_variable())
fail;
for ( Tree monoid: monoids )
match monoid {
case `aggr(`mtp,`plus,`zero,`unit):
if (#<`aggr>.equals(m))
return #<apply(`plus,tuple(`X,`Y))>;
};
};
throw new Error("Undefined monoid: "+m);
}
/** Return a one-sided merge function (over X and Y) of the monoid m; type is used for key equality.
* Same as merge for non-grouped monoids. For grouped monoids, the outermost level uses a
* rightOuterMerge instead of an outerMerge. Nested levels keep Y's group when X has no
* matching group (presumably X only contributes where Y already has a group — confirm).
* @param m the monoid of the states
* @param type the type of the states (a grouped state has type T(tuple(keytype,valuetype)))
* @param X the first state term
* @param Y the second state term
* @param first true at the outermost grouping level
* @return a term that merges X into Y under m
* @throws Error if a grouped monoid has a state type of unexpected shape, or m is undefined
*/
private static Tree merge_left ( Tree m, Tree type, Tree X, Tree Y, boolean first ) {
match m {
case `gb(`n):
if (! #[groupBy,orderBy].member(#<`gb>) )
fail;
match type {
case `T(tuple(`keytp,`tp)):
if (first) {
// NOTE(review): the per-group merge here is built with merge, not merge_left — confirm
Tree nv = new_var();
Tree mb = merge(n,tp,#<nth(`nv,0)>,#<nth(`nv,1)>,false);
return #<rightOuterMerge(lambda(`nv,`mb),`X,`Y)>;
} else if (unique_key(keytp)) {
Tree vx = new_var();
Tree v = merge_left(n,tp,#<nth(call(elem,`vx),1)>,#<nth(call(elem,`Y),1)>,false);
if (is_persistent_collection(T))
X = #<Collect(`X)>;
return #<let(`vx,`X,
if(call(exists,`vx),
bag(tuple(`keytp,`v)),
call(elem,`Y)))>;
};
// needs an outer-join in the reducer function
Tree v = new_var();
Tree vx = new_var();
Tree vy = new_var();
Tree mx = new_var();
Tree mb = merge_left(n,tp,#<nth(`vx,1)>,#<nth(`vy,1)>,false);
Tree b = #<cmap(lambda(`vx,cmap(lambda(`vy,bag(tuple(nth(`vx,0),`mb))),
nth(`v,1))),
`mx)>;
return #<join(lambda(`vx,`(key_equality(keytp,#<nth(`vx,0)>))),
lambda(`vy,`(key_equality(keytp,#<nth(`vy,0)>))),
lambda(`v,let(`mx,nth(`v,0),
if(call(exists,`mx),`b,nth(`v,1)))),
`X, `Y)>;
};
throw new Error("Unknown type for merge_left: "+type);
case product(...ms):
Trees tps = ((Node)type).children();
Trees bs = #[ ];
int i = 0;
for ( Tree a: ms ) {
bs = bs.append(merge_left(a,tps.nth(i),#<nth(`X,`i)>,#<nth(`Y,`i)>,first));
i++;
};
return #<tuple(...bs)>;
case record(...bs):
Trees tps = ((Node)type).children();
Trees cs = #[ ];
int i = 0;
for ( Tree b: bs )
match b {
case bind(`n,`a):
cs = cs.append(#<bind(`n,`(merge_left(a,((Node)tps.nth(i++)).children().nth(1),
#<project(`X,`n)>,#<project(`Y,`n)>,first)))>);
}
return #<record(...cs)>;
case union:
return #<call(plus,`X,`Y)>;
case fixed:
return Y;
case _:
if (!m.is_variable())
fail;
for ( Tree monoid: monoids )
match monoid {
case `aggr(`mtp,`plus,`zero,`unit):
if (#<`aggr>.equals(m))
return #<apply(`plus,tuple(`X,`Y))>;
};
};
throw new Error("Undefined monoid: "+m);
}
/** Return the zero element of the monoid m: the empty bag for groupBy/orderBy, fixed, and union;
* a tuple/record of zeros for product/record; the declared zero of a user-defined monoid.
* @param m a monoid
* @return the zero element of m
* @throws Error if m is not a known monoid
*/
private static Tree zero ( Tree m ) {
match m {
case `gb(`n):
if (! #[groupBy,orderBy].member(#<`gb>) )
fail;
return #<bag()>;
case product(...ms):
Trees bs = #[ ];
for ( Tree a: ms )
bs = bs.append(zero(a));
return #<tuple(...bs)>;
case record(...bs):
Trees cs = #[ ];
for ( Tree b: bs )
match b {
case bind(`n,`a):
cs = cs.append(#<bind(`n,`(zero(a)))>);
}
return #<record(...cs)>;
case fixed:
return #<bag()>;
case union:
return #<bag()>;
case _:
if (!m.is_variable())
fail;
for ( Tree monoid: monoids )
match monoid {
case `aggr(`mtp,`plus,`zero,`unit):
if (#<`aggr>.equals(m))
return zero;
};
};
throw new Error("Undefined monoid: "+m);
}
/** Convert joins to coGroups, plus other transformations:
* a join whose inputs come from the same source x becomes a groupBy over a tagged union of
* both sides; any other join becomes a coGroup; inputs of groupBy/orderBy/coGroup/reduce that
* are not cmaps are wrapped in identity cmaps; calls to user-defined aggregations become reduces.
* @param e an algebraic term
* @return the normalized term
*/
private static Tree normalize_term ( Tree e ) {
match e {
case join(`kx,`ky,`r,cmap(`f,`x),cmap(`g,`y)):
// self-join over the same source x: tag left values with 0 and right values with 1,
// group them by key, then separate the two sides again in the reducer
if (!x.equals(y))
fail;
Tree v = new_var(), w = new_var(),
wx = new_var(), wy = new_var(),
vx = new_var(), vy = new_var();
Tree xtp = TypeInference.type_inference(((Node)kx).children.head);
Tree ytp = TypeInference.type_inference(((Node)ky).children.head);
Tree T = #<union(inL(`xtp),inR(`ytp))>;
Tree X = #<cmap(lambda(`w,call(plus,cmap(lambda(`wx,bag(tuple(apply(`kx,`wx),
typed(tagged_union(0,`wx),`T)))),
apply(`f,`w)),
cmap(lambda(`wy,bag(tuple(apply(`ky,`wy),
typed(tagged_union(1,`wy),`T)))),
apply(`g,`w)))),
`x)>;
Tree sx = #<cmap(lambda(`vx,if(call(eq,union_tag(`vx),0),bag(typed(union_value(`vx),`xtp)),bag())),
nth(`v,1))>;
Tree sy = #<cmap(lambda(`vy,if(call(eq,union_tag(`vy),1),bag(typed(union_value(`vy),`ytp)),bag())),
nth(`v,1))>;
return normalize_term(#<cmap(lambda(`v,apply(`r,tuple(`sx,`sy))),
groupBy(`X))>);
case join(`kx,`ky,`r,`x,cmap(`g,`y)):
// self-join where only the right side is a cmap over x
if (!x.equals(y))
fail;
Tree v = new_var(), w = new_var(), wy = new_var(),
vx = new_var(), vy = new_var();
Tree xtp = TypeInference.type_inference(((Node)kx).children.head);
Tree ytp = TypeInference.type_inference(((Node)ky).children.head);
Tree T = #<union(inL(`xtp),inR(`ytp))>;
Tree X = #<cmap(lambda(`w,call(plus,bag(tuple(apply(`kx,`w),typed(tagged_union(0,`w),`T))),
cmap(lambda(`wy,bag(tuple(apply(`ky,`wy),
typed(tagged_union(1,`wy),`T)))),
apply(`g,`w)))),
`x)>;
Tree sx = #<cmap(lambda(`vx,if(call(eq,union_tag(`vx),0),bag(typed(union_value(`vx),`xtp)),bag())),
nth(`v,1))>;
Tree sy = #<cmap(lambda(`vy,if(call(eq,union_tag(`vy),1),bag(typed(union_value(`vy),`ytp)),bag())),
nth(`v,1))>;
return normalize_term(#<cmap(lambda(`v,apply(`r,tuple(`sx,`sy))),
groupBy(`X))>);
case join(`kx,`ky,`r,cmap(`f,`x),`y):
// self-join where only the left side is a cmap over x
if (!x.equals(y))
fail;
Tree v = new_var(), w = new_var(), wx = new_var(),
vx = new_var(), vy = new_var();
Tree xtp = TypeInference.type_inference(((Node)kx).children.head);
Tree ytp = TypeInference.type_inference(((Node)ky).children.head);
Tree T = #<union(inL(`xtp),inR(`ytp))>;
Tree X = #<cmap(lambda(`w,call(plus,cmap(lambda(`wx,bag(tuple(apply(`kx,`wx),
typed(tagged_union(0,`wx),`T)))),
apply(`f,`w)),
bag(tuple(apply(`ky,`w),typed(tagged_union(1,`w),`T))))),
`x)>;
Tree sx = #<cmap(lambda(`vx,if(call(eq,union_tag(`vx),0),bag(typed(union_value(`vx),`xtp)),bag())),
nth(`v,1))>;
Tree sy = #<cmap(lambda(`vy,if(call(eq,union_tag(`vy),1),bag(typed(union_value(`vy),`ytp)),bag())),
nth(`v,1))>;
return normalize_term(#<cmap(lambda(`v,apply(`r,tuple(`sx,`sy))),
groupBy(`X))>);
case join(`kx,`ky,`r,`x,`y):
// general join: key both inputs and coGroup them
Tree v = new_var();
Tree vx = new_var();
Tree vy = new_var();
Tree X = #<cmap(lambda(`vx,bag(tuple(apply(`kx,`vx),`vx))),`x)>;
Tree Y = #<cmap(lambda(`vy,bag(tuple(apply(`ky,`vy),`vy))),`y)>;
return normalize_term(#<cmap(lambda(`v,apply(`r,tuple(nth(nth(`v,1),0),
nth(nth(`v,1),1)))),
coGroup(`X,`Y))>);
case `gb(`u):
if (! #[groupBy,orderBy].member(#<`gb>) )
fail;
// make sure the grouped input is a cmap
match u {
case cmap(...):
return #<`gb(`(normalize_term(u)))>;
};
Tree nv = new_var();
return #<`gb(cmap(lambda(`nv,bag(`nv)),`(normalize_term(u))))>;
case coGroup(`e1,`e2):
// make sure both coGroup inputs are cmaps
match e1 {
case cmap(...):
case _:
Tree v1 = new_var();
e1 = #<cmap(lambda(`v1,bag(`v1)),`(normalize_term(e1)))>;
};
match e2 {
case cmap(...):
case _:
Tree v2 = new_var();
e2 = #<cmap(lambda(`v2,bag(`v2)),`(normalize_term(e2)))>;
};
return #<coGroup(`(normalize_term(e1)),`(normalize_term(e2)))>;
case reduce(`m,`u):
// make sure the reduced input is a cmap
match u {
case cmap(...):
return #<reduce(`m,`(normalize_term(u)))>;
};
Tree nv = new_var();
return #<reduce(`m,cmap(lambda(`nv,bag(`nv)),`(normalize_term(u))))>;
case call(`f,`s):
// a call to a user-defined aggregation becomes a reduce over a cmap
for ( Tree monoid: monoids )
match monoid {
case `aggr(`mtp,`plus,`zero,`unit):
if (#<`aggr>.equals(f)) {
match s {
case cmap(...):
return #<reduce(`aggr,`(normalize_term(s)))>;
};
Tree nv = new_var();
return #<reduce(`aggr,cmap(lambda(`nv,bag(`nv)),`(normalize_term(s))))>;
}
};
fail
case `f(...as):
Trees bs = #[ ];
for ( Tree a: as )
bs = bs.append(normalize_term(a));
return #<`f(...bs)>;
};
return e;
}
/** Embed missing cmaps: a projection or nth applied directly to a collection is rewritten
* into a cmap that applies it to each element. The type of the new element variable is
* recorded in type_env.
* @param e an algebraic term
* @return the rewritten term
*/
private static Tree embed_missing_cmaps ( Tree e ) {
match e {
case project(`x,`a):
match TypeInference.type_inference(x) {
case `T(`tp):
if (!is_collection(T))
fail;
Tree v = new_var();
type_env.insert(v.toString(),tp);
return embed_missing_cmaps(#<cmap(lambda(`v,bag(project(`v,`a))),`x)>);
};
fail
case nth(`x,`n):
match TypeInference.type_inference(x) {
case `T(`tp):
if (!is_collection(T))
fail;
Tree v = new_var();
type_env.insert(v.toString(),tp);
return embed_missing_cmaps(#<cmap(lambda(`v,bag(nth(`v,`n))),`x)>);
};
fail
case `f(...as):
Trees bs = #[ ];
for ( Tree a: as )
bs = bs.append(embed_missing_cmaps(a));
return #<`f(...bs)>;
};
return e;
}
/** Substitute the variables of a (possibly nested) tuple pattern in dst:
 * a variable gets src itself, and the i-th component of a tuple pattern gets nth(src,i).
 * @param pattern a variable or a tuple of patterns
 * @param src the term matched against the pattern
 * @param dst the term in which the substitution is performed
 * @return dst with the pattern variables replaced
 */
private static Tree subst_pattern ( Tree pattern, Tree src, Tree dst ) {
    match pattern {
    case tuple(...components):
        Tree result = dst;
        int index = 0;
        for ( Tree component: components ) {
            result = subst_pattern(component,#<nth(`src,`index)>,result);
            index++;
        };
        return result;
    };
    return subst_var(pattern,src,dst);
}
/** Simplify the term e using one pass of rewrite rules
* (cmap fusion, beta reduction, pushing cmaps into if/plus/join, and
* constant-folding nth/project on literal tuples/records); SimplifyTerm applies it to a fixpoint.
* @param e an algebraic term
* @return the simplified term
*/
static Tree simplify_term ( Tree e ) {
match e {
case cmap(`f,cmap(lambda(`v,`u),`x)):
// cmap fusion
return simplify_term(#<cmap(lambda(`v,cmap(`f,`u)),`x)>);
case cmap(lambda(`x,`b),bag(`a)):
return simplify_term(subst_var(x,a,b));
case cmap(lambda(`x,`b),bag()):
return #<bag()>;
case cmap(`f,if(`p,`e1,`e2)):
return simplify_term(#<if(`p,cmap(`f,`e1),cmap(`f,`e2))>);
case cmap(`f,call(plus,`x,`y)):
return simplify_term(#<call(plus,cmap(`f,`x),cmap(`f,`y))>);
case groupBy(bag()):
return #<bag()>;
case cmap(`f,join(`kx,`ky,lambda(`v,`r),`X,`Y)):
return simplify_term(#<join(`kx,`ky,lambda(`v,cmap(`f,`r)),`X,`Y)>);
case apply(lambda(`v,`b),`u):
if (!v.is_variable())
fail;
return simplify_term(subst_var(v,u,b));
case apply(lambda(`pattern,`b),`u):
return simplify_term(subst_pattern(pattern,u,b));
case call(elem,bag(`u)):
return simplify_term(u);
case nth(tuple(...al),`n):
if (!n.is_long())
fail;
int i = (int)n.longValue();
if (i >= 0 && i < al.length())
return simplify_term(al.nth(i));
// out-of-range index: keep the nth, but still simplify its subterms (next case)
fail
case project(record(...bl),`a):
for ( Tree b: bl )
match b {
case bind(`v,`u):
if (v.equals(a))
return simplify_term(u);
};
// no such attribute: keep the projection, but still simplify its subterms (next case)
fail
case `f(...as):
Trees bs = #[ ];
for ( Tree a: as )
bs = bs.append(simplify_term(a));
return #<`f(...bs)>;
};
return e;
}
/** Simplify the term e by applying simplify_term repeatedly until the term
 * no longer changes.
 * @param e an algebraic term
 * @return the first term in the rewrite sequence that equals its own simplification
 */
public static Tree SimplifyTerm ( Tree e ) {
    Tree current = e;
    while (true) {
        Tree next = simplify_term(current);
        if (next.equals(current))
            return current;
        current = next;
    }
}
/** Does this term contain a stream source?
 * @param e an algebraic term
 * @return true if e contains a call(stream,...) subterm, or if repeat_var
 *         holds for e or for one of its subterms
 */
public static boolean is_streaming ( Tree e ) {
    match e {
    case call(stream,...):
        return true;
    case `f(...al):
        for ( Tree child: al )
            if (is_streaming(child))
                return true;
    };
    return repeat_var(e);
}
/** Collect all stream sources */
private static Trees stream_bindings ( Tree e ) {
match e {
case call(stream,...):
return #[`e];
case `f(...al):
if (#[BinaryStream,ParsedStream,SocketStream].member(#<`f>))
return #[`e];
Trees rs = #[];
for ( Tree a: al )
for ( Tree b: stream_bindings(a) )
if (!rs.member(b))
rs = rs.cons(b);
return rs;
};
if (false && repeat_var(e))
return #[`e];
else return #[];
}
/** Convert an algebraic expression into a stream-based one by pulling out
 * all its stream sources: each source s is replaced by a fresh variable w,
 * and the result is wrapped as Stream(lambda(w,...),s).
 * @param e algebraic expression
 * @return a stream-based expression
 */
public static Tree streamify ( Tree e ) {
    Tree result = e;
    for ( Tree source: stream_bindings(e) ) {
        Tree sv = new_var();
        Tree source_type = TypeInference.type_inference(source);
        // the fresh variable has the type of the source it stands for
        TypeInference.type_env.insert(sv.toString(),source_type);
        result = #<Stream(lambda(`sv,`(subst(source,sv,result))),`source)>;
    };
    return result;
}
/** Replace the stream sources of e by the corresponding data sources:
 * call(stream,...) becomes call(source,...), BinaryStream becomes BinarySource,
 * and ParsedStream becomes ParsedSource. All other nodes are rebuilt recursively.
 * NOTE(review): SocketStream (collected by stream_bindings) is not converted here;
 * confirm this is intended.
 * @param e an algebraic term
 * @return the term with its stream sources replaced
 */
public static Tree unstreamify ( Tree e ) {
    match e {
    case call(stream,...args):
        return #<call(source,...args)>;
    case BinaryStream(...args):
        return #<BinarySource(...args)>;
    case ParsedStream(...args):
        return #<ParsedSource(...args)>;
    case `f(...al):
        Trees converted = #[ ];
        for ( Tree child: al )
            converted = converted.append(unstreamify(child));
        return #<`f(...converted)>;
    };
    return e;
}
/** Return the answer function of a query e
 * @param e a query
 * @param var the input of the answer function
 * @param merge the monoid associated with the input
 * @return the answer function. This is var itself, except when e is a
 *         cmap(lambda(v,b),u) over an input that is neither a stream nor a repeat
 *         variable and merge is groupBy(m) or orderBy(m). In that case the result
 *         is a reduce with that monoid over var, which re-shapes each element
 */
static Tree answer ( Tree e, Tree var, Tree merge ) {
match e {
case cmap(`f,call(stream,...s)):
// cmap directly over a stream: the state is the answer
return var;
case cmap(`f,`v):
if (!repeat_var(v))
fail;
// cmap directly over a repeat variable: the state is the answer
return var;
case cmap(lambda(`v,`b),`u):
match merge {
case `gb(`m):
if (! #[groupBy,orderBy].member(#<`gb>) )
fail;
Tree nv = new_var();
// each state element nv becomes tuple(nth(nth(nv,0),0),nth(nv,1)) and is reduced with gb(m)
return #<reduce(`gb(`m),
cmap(lambda(`nv,bag(tuple(nth(nth(`nv,0),0),nth(`nv,1)))),
`var))>;
}
};
return var;
}
/** Remove the state from the query output */
private static Tree strip_state ( Tree e, Tree var ) {
match e {
case reduce(`m,call(stream,...s)):
return var;
case reduce(`m,`v):
if (!repeat_var(v))
fail;
return var;
case reduce(`m,`u):
return var;
case cmap(`f,`u):
Tree nn = new_var();
return #<cmap(lambda(`nn,bag(nth(`nn,1))),`var)>;
};
return var;
}
/** Convert a stream-based query to an incremental stream-based program.
 * It returns the 5-tuple tuple(zero,lambda(tuple(x,y),merge),homomorphism,lambda(x,answer),monoid),
 * where (zero,merge) is the monoid for the homomorphism,
 * answer(homomorphism) is equal to the original query,
 * and monoid is the monoid inferred for the homomorphism (get_monoid)
 * @param e a stream-based query
 * @param env binds variables to monoids
 * @return a 5-tuple that represents the incremental stream-based program
 * @throws Error if the query cannot be split into an answer and a homomorphism
 */
static Tree generate_code ( Tree e, Environment env ) {
if (Config.trace) {
System.out.println("Subterm:");
System.out.println(e.pretty(0));
};
// normalize, type-check, and make all implicit cmaps explicit
Tree ne = SimplifyTerm(normalize_term(e));
TypeInference.type_inference(ne);
ne = SimplifyTerm(embed_missing_cmaps(ne));
if (Config.trace) {
System.out.println("Subterm normalized:");
System.out.println(ne.pretty(0));
};
Tree qe = SimplifyTerm(inject_q(ne));
if (Config.trace) {
System.out.println("After lineage injection:");
System.out.println(qe.pretty(0));
};
// nv is the placeholder used by split (and later the merge/answer input variable)
Tree nv = new_var();
match split(qe,nv,env) {
case pair(`a,`h):
// a: answer part (refers to nv); h: homomorphism part
if (Config.trace) {
System.out.println("After split:");
System.out.println(a.pretty(0)+"\n"+h.pretty(0));
};
h = SimplifyTerm(h);
if (Config.trace) {
System.out.println("After split (simplified):");
System.out.println(SimplifyTerm(a).pretty(0)+"\n"+h.pretty(0));
};
Tree m = get_monoid(h,env);
if (Config.trace)
System.out.println("Subterm monoid: "+m.pretty(0));
Tree q = convert_to_algebra(h);
Tree tp = TypeInference.type_inference(q);
if (Config.trace) {
System.out.println("Subterm type: "+tp);
System.out.println("Merge function over X and Y: ");
System.out.println(SimplifyTerm(merge(m,tp,#<X>,#<Y>,true)).pretty(0));
};
// nv stands for the homomorphism result, so it gets the homomorphism's type
type_env.insert(nv.toString(),tp);
Tree answer = answer(ne,nv,m);
// plug the answer of the homomorphism state into the answer part of the split
answer = subst(nv,answer,a);
answer = strip_state(ne,answer);
answer = SimplifyTerm(convert_to_algebra(SimplifyTerm(answer)));
if (Config.trace) {
System.out.println("Answer function:");
System.out.println(answer.pretty(0));
};
Tree zero = zero(m);
Tree my = new_var();
// merge function over two states nv and my
Tree merge = convert_to_algebra(SimplifyTerm(merge(m,tp,nv,my,true)));
Tree res = #<tuple(`zero,lambda(tuple(`nv,`my),`merge),`q,lambda(`nv,`answer),`m)>;
if (Config.trace) {
System.out.println("Incremental subterm:");
System.out.println(res.pretty(0));
};
return res;
};
throw new Error("Cannot generate incremental code: "+e);
}
/** Convert a stream-based query to an incremental program, component-wise.
 * Returns tuple(zero,lambda(tuple(x,y),merge),homomorphism,lambda(x,answer),monoid).
 * A non-streaming query gets an empty state, a fixed monoid, and itself as the answer.
 * Calls to non-aggregation functions, records, and tuples are translated per
 * argument/component, and the per-component results are combined into tuples.
 * Everything else (including aggregations) is translated by generate_code.
 * @param e a stream-based query
 * @param env binds variables to monoids
 * @return the 5-tuple that represents the incremental program
 */
private static Tree generate_incremental_code ( Tree e, Environment env ) {
    // no stream source: empty state, and the answer is e itself
    if (!is_streaming(e))
        return #<tuple(tuple(),lambda(tuple(tuple(),tuple()),tuple()),tuple(),lambda(tuple(),`e),fixed)>;
    match e {
    case call(stream,...):
        // a bare stream source is treated as the identity cmap over the stream
        Tree v = new_var();
        return generate_code(#<cmap(lambda(`v,bag(`v)),`e)>,env);
    case call(`f,...qs):
        boolean is_monoid = false;
        for ( Tree monoid: monoids )
            match monoid {
            case `aggr(`mtp,`plus,`zero,`unit):
                if (#<`aggr>.equals(f))
                    is_monoid = true;
            };
        // aggregations are left to generate_code
        if (is_monoid)
            fail;
        Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[], ns = #[];
        for ( Tree q: qs )
            match generate_incremental_code(q,env) {
            case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a),`n):
                zs = zs.append(z); hs = hs.append(h); as = as.append(a);
                vs = vs.append(v); ws = ws.append(w); ms = ms.append(m);
                ns = ns.append(n);
            };
        return #<tuple(tuple(...zs),
                       lambda(tuple(tuple(...vs),tuple(...ws)),tuple(...ms)),
                       tuple(...hs),
                       lambda(tuple(...vs),call(`f,...as)),
                       tuple(...ns))>;
    case record(...rs):
        Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[], ns = #[];
        for ( Tree r: rs )
            match r {
            case bind(`k,`q):
                match generate_incremental_code(q,env) {
                case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a),`n):
                    zs = zs.append(z); hs = hs.append(h);
                    // keep the field name: record components are bind(name,value)
                    as = as.append(#<bind(`k,`a)>);
                    vs = vs.append(v); ws = ws.append(w); ms = ms.append(m);
                    ns = ns.append(n);
                }
            };
        return #<tuple(tuple(...zs),
                       lambda(tuple(tuple(...vs),tuple(...ws)),tuple(...ms)),
                       tuple(...hs),
                       lambda(tuple(...vs),record(...as)),
                       tuple(...ns))>;
    case tuple(...qs):
        Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[], ns = #[];
        for ( Tree q: qs )
            match generate_incremental_code(q,env) {
            case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a),`n):
                zs = zs.append(z); hs = hs.append(h); as = as.append(a);
                vs = vs.append(v); ws = ws.append(w); ms = ms.append(m);
                ns = ns.append(n);
            };
        return #<tuple(tuple(...zs),
                       lambda(tuple(tuple(...vs),tuple(...ws)),tuple(...ms)),
                       tuple(...hs),
                       lambda(tuple(...vs),tuple(...as)),
                       tuple(...ns))>;
    };
    return generate_code(e,env);
}
/** Bind the pattern variables to types in TypeInference.type_env.
 * A tuple pattern matched against a tuple type binds its components
 * positionally (recursively); otherwise the whole pattern is bound to tp.
 * NOTE(review): assumes the tuple type has at least as many components as the
 * pattern; behavior of tl.nth on a shorter list is not visible here.
 * @param pattern a variable or a (nested) tuple of variables
 * @param tp the type of the value matched by the pattern
 */
private static void bind_pattern2type ( Tree pattern, Tree tp ) {
match pattern {
case tuple(...pl):
match tp {
case tuple(...tl):
int i = 0;
for ( Tree p: pl )
bind_pattern2type(p,tl.nth(i++));
return;
}
};
// a variable, or a tuple pattern whose type is not a tuple type
TypeInference.type_env.insert(pattern.toString(),tp);
}
/** Convert a stream-based query to an incremental stream-based program.
 * A query repeat(lambda(v,u),x,n) is handled specially. Its body u is made
 * incremental with v bound to the union monoid, and the remaining n-1
 * iterations are expressed as a repeat that produces the merged state.
 * Any other query is translated by generate_incremental_code(e,null) and
 * wrapped into an incr(zero,merge,answer) term.
 * @param e a stream-based query
 * @return the incremental stream-based program
 * @throws Error if the repeat count is not a constant, is less than 1,
 *         or if no incremental code can be generated
 */
public static Tree generate_incremental_code ( Tree e ) {
    match e {
    case repeat(lambda(`v,`u),`x,`n):
        // the repeat is translated whether or not x is a stream
        if (!(n instanceof LongLeaf))
            throw new Error("The repeat must have a constant repetition: "+n);
        int nn = (int)((LongLeaf)n).value();
        if (nn < 1)
            throw new Error("Wrong repeat number: "+n);
        Tree nv = new_var();
        // keep only component 0 of each element produced by the repeat body
        u = #<cmap(lambda(`nv,bag(nth(`nv,0))),`u)>;
        TypeInference.type_inference(u);
        // NOTE(review): repeat_environment is extended here and never restored
        repeat_environment = new Environment(v.toString(),#<union>,repeat_environment);
        match generate_incremental_code(u,null) {
        case tuple(`z,lambda(tuple(`v1,`v2),`m),`h,lambda(`s,`a),`monoid):
            Tree deltaT = new_var();
            Tree w = new_var();
            Tree tp = TypeInference.type_inference(s);
            TypeInference.type_env.insert(deltaT.toString(),tp);
            match tp {
            case `T(`etp):
                TypeInference.type_env.insert(T.toString(),etp);
            };
            Tree m_hat = convert_to_algebra(SimplifyTerm(merge_left(monoid,tp,v1,v2,true)));
            if (Config.trace) {
                System.out.println("Left-merge function over X and Y: ");
                System.out.println(SimplifyTerm(merge_left(monoid,tp,#<X>,#<Y>,true)).pretty(0));
            };
            Tree nm = subst(v1,s,subst(v2,subst(v,subst(s,deltaT,a),h),m_hat));
            // the first iteration: the homomorphism applied to the initial value x
            Tree m0 = subst(v,x,h);
            Tree q = #<repeat(lambda(`deltaT,cmap(lambda(`w,bag(tuple(`w,true))),`nm)),
                              `m0,
                              `(nn-1))>;
            Tree res = #<incr(`z,
                              lambda(`s,apply(lambda(tuple(`v1,`v2),`m),tuple(`s,`q))),
                              lambda(`s,`a))>;
            res = SimplifyTerm(res);
            if (Config.trace) {
                System.out.println("Incremental program:");
                System.out.println(res.pretty(0));
            };
            TypeInference.type_inference(res);
            return res;
        }
    };
    match generate_incremental_code(e,null) {
    case tuple(`z,lambda(tuple(`v1,`v2),`m),`h,lambda(_,`a),_):
        Tree tp = TypeInference.type_inference(h);
        // the state variable v1 has the type of the homomorphism
        bind_pattern2type(v1,tp);
        Tree nm = SimplifyTerm(#<apply(lambda(`v2,`m),`h)>);
        a = SimplifyTerm(a);
        Tree res = #<incr(`z,lambda(`v1,`nm),lambda(`v1,`a))>;
        if (Config.trace) {
            System.out.println("Incremental program:");
            System.out.println(res.pretty(0));
        };
        TypeInference.type_inference(res);
        return res;
    };
    throw new Error("Cannot generate incremental code: "+e);
}
}