| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.mrql; |
| |
| import org.apache.mrql.gen.*; |
| |
| /** Generates code for streaming queries */ |
| final public class Streaming extends AlgebraicOptimization { |
    // NOTE(review): istate is not referenced in the visible part of this file; presumably used further down -- confirm
    private static int istate = 0;
    // set it to true to debug the monoid inference (inference prints each term, its environment, and its monoid)
    private static boolean inference_tracing = false;
    // set it to true to debug the split function (split prints each term, its answer variable, and its result)
    private static boolean split_tracing = false;
| |
| /** An environment that binds variables to monoids */ |
| final static class Environment { |
| public String name; |
| public Tree monoid; |
| public Environment next; |
| |
| Environment ( String n, Tree m, Environment next ) { |
| name = n; |
| monoid = m; |
| this.next = next; |
| } |
| |
| public String toString () { |
| return name+": "+monoid+((next==null)?"":", "+next.toString()); |
| } |
| } |
| |
| /** Check if name is defined in the environment |
| * @param name a variable name |
| * @param env an environment that binds variables to monoids |
| * @return true if the name is defined in env |
| */ |
| private static boolean member ( String name, Environment env ) { |
| for ( Environment en = env; en != null; en = en.next ) |
| if (en.name.equals(name)) |
| return true; |
| return false; |
| } |
| |
| /** if name is defined in the environment, return its binding |
| * @param name a variable name |
| * @param env an environment that binds variables to monoids |
| * @return the monoid associated with the variable |
| */ |
| private static Tree find ( String name, Environment env ) { |
| for ( Environment en = env; en != null; en = en.next ) |
| if (en.name.equals(name)) |
| return en.monoid; |
| return #<none>; |
| } |
| |
| private static Environment repeat_environment = null; |
| |
| /* true if e is a variable in a repeat loop */ |
| private static boolean repeat_var ( Tree e ) { |
| return e.is_variable() && member(e.toString(),repeat_environment); |
| } |
| |
| private static int tab_count = -3; |
| |
| /** If e is a homomorphism, return the associated monoid; |
| * if it returns none, then it fails to find a monoid; |
| * if it returns fixed, then its invariant. |
| * @param e an MRQL algebraic term |
| * @param env binds variables to monoids |
| * @return the monoid, or none if it doesn't exist |
| */ |
| static Tree inference ( Tree e, Environment env ) { |
| if (inference_tracing) { |
| tab_count += 3; |
| System.out.println(Interpreter.tabs(tab_count)+e); |
| if (env != null) |
| System.out.println(Interpreter.tabs(tab_count)+env.toString()); |
| |
| }; |
| Tree res = inference_trace(e,env); |
| if (inference_tracing) { |
| System.out.println(Interpreter.tabs(tab_count)+"-> "+res); |
| tab_count -= 3; |
| }; |
| return res; |
| } |
| |
    /** Worker of inference (called through the tracing wrapper): derive the monoid of the term e.
     * The result is one of: none (not a homomorphism), fixed (invariant),
     * union, groupBy(m) / orderBy(m), product(...), record(...), count,
     * or the monoid m of a reduce(m,...).
     * @param e an MRQL algebraic term
     * @param env binds variables to monoids
     * @return the monoid of e, or none if it doesn't exist
     */
    private static Tree inference_trace ( Tree e, Environment env ) {
        match e {
        case groupBy(`x):
            // a groupBy wraps the monoid of its input
            match inference(x,env) {
            case none: return #<none>;
            case fixed: return #<fixed>;
            case `m:
                return #<groupBy(`m)>;
            }
        case orderBy(`x):
            // an orderBy wraps the monoid of its input
            match inference(x,env) {
            case none: return #<none>;
            case fixed: return #<fixed>;
            case `m:
                return #<orderBy(`m)>;
            }
        case coGroup(`x,`y):
            // a join is a groupBy over the pair of the two input monoids
            Tree mx = inference(x,env);
            Tree my = inference(y,env);
            if (mx.equals(#<none>) || my.equals(#<none>))
                return #<none>;
            else if (mx.equals(#<fixed>) && my.equals(#<fixed>))
                return #<fixed>;
            else return #<groupBy(product(`mx,`my))>;
        case cmap(lambda(`v,bag(`w)),`x):
            // identity map: same monoid as its input
            if (!w.equals(v))
                fail;
            return inference(x,env);
        case cmap(lambda(`v,bag(tuple(`k,`u))),`x):
            // a map over the (key,group) pairs of a groupBy/orderBy whose new key k is invariant:
            // v is bound to product(fixed,m) and the result keeps the groupBy/orderBy over u's monoid
            match inference(x,env) {
            case `gb(`m):
                if (! #[groupBy,orderBy].member(#<`gb>) ) fail;
                match inference(k,new Environment(v.toString(),#<product(fixed,`m)>,env)) {
                case fixed:
                    Tree b = inference(u,new Environment(v.toString(),#<product(fixed,`m)>,env));
                    if (b.equals(#<none>))
                        fail;
                    return #<`gb(`b)>;
                }
            };
            fail
        case cmap(lambda(`v,`b),`x):
            // general cmap: combine the monoid of the input x with the monoid of the body b
            match inference(x,env) {
            case union:
                match inference(b,new Environment(v.toString(),#<fixed>,env)) {
                case `gb(`m):
                    return #<`gb(`m)>;
                };
                return #<union>;
            case `gb(`m):
                if (! #[groupBy,orderBy].member(#<`gb>) )
                    fail;
                match inference(b,new Environment(v.toString(),#<product(fixed,`m)>,env)) {
                case `gb2(`m2):
                    if (! #[groupBy,orderBy].member(#<`gb2>) )
                        fail;
                    return #<`gb2(`m2)>;
                case union:
                    return #<union>;
                case fixed:
                    return #<fixed>;
                }
            case fixed:
                match inference(b,new Environment(v.toString(),#<fixed>,env)) {
                case union:
                    return #<union>;
                case fixed:
                    return #<fixed>;
                case `gb(`m):
                    if (! #[groupBy,orderBy].member(#<`gb>) )
                        fail;
                    return #<`gb(`m)>;
                }
            };
            fail
        case reduce(groupBy(count),`u):
            if (inference(u,env).equals(#<none>))
                fail;
            return #<groupBy(count)>;
        case reduce(groupBy(`m),`u):
            // a grouped reduction is a homomorphism only over a groupBy(union) input
            match inference(u,env) {
            case groupBy(union):
                return #<groupBy(`m)>;
            };
            return #<none>;
        case reduce(count,`u):
            if (inference(u,env).equals(#<none>))
                fail;
            return #<count>;
        case reduce(`m,`u):
            // reduce(m,u) has monoid m when u is a union; it is invariant when u is
            match inference(u,env) {
            case union:
                return m;
            case fixed:
                return #<fixed>;
            };
            return #<none>;
        case tuple(...s):
            // product of the component monoids; fixed if every component is fixed
            Trees ms = #[ ];
            for ( Tree x: s )
                match inference(x,env) {
                case none:
                    return #<none>;
                case `m:
                    ms = ms.append(m);
                };
            boolean fixed = true;
            for (Tree mm: ms)
                fixed &= mm.equals(#<fixed>);
            if (fixed)
                return #<fixed>;
            else return #<product(...ms)>;
        case record(...s):
            // record of the attribute monoids; fixed if every attribute is fixed
            Trees ms = #[ ];
            for ( Tree b: s )
                match b {
                case bind(`n,`x):
                    match inference(x,env) {
                    case none:
                        return #<none>;
                    case `m:
                        ms = ms.append(#<bind(`n,`m)>);
                    }
                };
            boolean fixed = true;
            for (Tree mm: ms)
                match mm {
                case bind(_,`m):
                    fixed &= m.equals(#<fixed>);
                };
            if (fixed)
                return #<fixed>;
            else return #<record(...ms)>;
        case nth(`u,`i):
            // select the i-th component of a product monoid
            int n = (int)i.longValue();
            match inference(u,env) {
            case fixed:
                return #<fixed>;
            case product(...ms):
                return ms.nth(n);
            };
            return #<none>;
        case project(`u,`a):
            // only an invariant record projects to an invariant value
            match inference(u,env) {
            case fixed:
                return #<fixed>;
            };
            return #<none>;
        case if(`p,`e1,`e2):
            // NOTE(review): only the then-branch is inspected; p and e2 are ignored
            // (split_trace/get_monoid only handle if(p,u,bag())) -- confirm this is intended
            return inference(e1,env);
        case bag(`u):
            match inference(u,env) {
            case fixed:
                return #<fixed>;
            };
            return #<none>;
        case call(stream,...):
            // a stream source grows by union
            return #<union>;
        case call(source,...):
            // a non-stream data source is invariant
            return #<fixed>;
        case call(`f,...s):
            // any other function call is invariant only if all its arguments are
            Trees ms = #[ ];
            for ( Tree x: s )
                match inference(x,env) {
                case none:
                    return #<none>;
                case `m:
                    ms = ms.append(m);
                };
            boolean fixed = true;
            for (Tree mm: ms)
                fixed &= mm.equals(#<fixed>);
            if (fixed)
                return #<fixed>;
            return #<none>;
        case bind(_,`u):
            return inference(u,env);
        case typed(`u,_):
            return inference(u,env);
        case _:
            // constants are invariant; variables are looked up in the repeat environment,
            // then in env, and are considered invariant if unbound
            if (e.is_string() || e.is_long() || e.is_double())
                return #<fixed>;
            else if (e.is_variable())
                if (repeat_var(e))
                    return find(e.toString(),repeat_environment);
                else if (member(e.toString(),env))
                    return find(e.toString(),env);
                else return #<fixed>;
        };
        return #<none>;
    }
| |
| public static Tree inference ( Tree e ) { |
| return inference(e,null); |
| } |
| |
| private static Tree smap1 ( Tree f, Tree e ) { |
| Tree nv = new_var(); |
| Tree b = new_var(); |
| return #<cmap(lambda(`nv,cmap(lambda(`b,bag(tuple(nth(`nv,0),`b))), |
| apply(`f,tuple(nth(nth(`nv,0),0),nth(`nv,1))))), |
| `e)>; |
| } |
| |
| private static Tree smap2 ( Tree f, Tree e ) { |
| Tree nv = new_var(); |
| Tree nw = new_var(); |
| return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(tuple(nth(`nw,0), |
| tuple(nth(`nv,0),nth(`nw,1))))), |
| apply(`f,tuple(nth(nth(`nv,0),0),nth(`nv,1))))), |
| `e)>; |
| } |
| |
| private static Tree smap3 ( Tree f, Tree e ) { |
| Tree nv = new_var(); |
| Tree nw = new_var(); |
| return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(tuple(nth(`nw,0), |
| tuple(tuple(),nth(`nw,1))))), |
| apply(`f,`nv))), |
| `e)>; |
| } |
| |
| private static Tree swap ( Tree e ) { |
| Tree nv = new_var(); |
| return #<cmap(lambda(`nv,bag(tuple(tuple(nth(`nv,0),nth(nth(`nv,1),0)), |
| nth(nth(`nv,1),1)))), |
| `e)>; |
| } |
| |
| private static Tree mix ( Tree e ) { |
| Tree nv = new_var(); |
| Tree x = new_var(); |
| Tree y = new_var(); |
| Tree b = #<bag(tuple(tuple(nth(`nv,0), |
| tuple(nth(`x,0),nth(`y,0))), |
| tuple(nth(`x,1),nth(`y,1))))>; |
| return #<cmap(lambda(`nv,cmap(lambda(`x,cmap(lambda(`y,`b), |
| groupBy(nth(nth(`nv,1),1)))), |
| groupBy(nth(nth(`nv,1),0)))), |
| `e)>; |
| } |
| |
    /** inject lineage (all groupBy/join keys) to a query e
     * Queries that read directly from a stream source or a repeat variable get an
     * empty lineage tuple(); otherwise the lineage is injected into the input
     * with inject_e and propagated through the cmap with smap1.
     * @param e an MRQL query
     * @return an algebraic term that associates lineage to every query result
     */
    public static Tree inject_q ( Tree e ) {
        match e {
        case reduce(`m,cmap(`f,call(stream,...s))):
            return #<bag(tuple(tuple(),`e))>;
        case reduce(`m,cmap(`f,`v)):
            if (!repeat_var(v))
                fail;
            return #<bag(tuple(tuple(),`e))>;
        case reduce(`m,cmap(`f,`u)):
            // the reduction becomes a grouped reduction on the lineage
            return #<reduce(groupBy(`m),`(smap1(f,inject_e(u))))>;
        case reduce(`m,call(stream,...s)):
            return #<bag(tuple(tuple(),`e))>;
        case reduce(`m,`v):
            if (!repeat_var(v))
                fail;
            return #<bag(tuple(tuple(),`e))>;
        case cmap(`f,call(stream,...s)):
            // pair every result of f with an empty lineage
            Tree a = new_var();
            Tree b = new_var();
            return #<cmap(lambda(`a,cmap(lambda(`b,bag(tuple(tuple(),`b))),
                                         apply(`f,`a))),
                          call(stream,...s))>;
        case cmap(`f,`v):
            if (!repeat_var(v))
                fail;
            Tree a = new_var();
            Tree b = new_var();
            return #<cmap(lambda(`a,cmap(lambda(`b,bag(tuple(tuple(),`b))),
                                         apply(`f,`a))),
                          `v)>;
        case cmap(`f,`u):
            return smap1(f,inject_e(u));
        };
        // otherwise, it's a constant
        return #<bag(tuple(tuple(),`e))>;
    }
| |
| private static boolean stream_source ( Tree e ) { |
| match e { |
| case call(stream,...): |
| return true; |
| }; |
| return repeat_var(e); |
| } |
| |
    /** inject lineage (all groupBy/join keys) to a groupBy/join term e
     * For a coGroup, a side that reads directly from a stream source contributes
     * an empty lineage tuple(); the other side gets its lineage from inject_c.
     * @param e a groupBy or coGroup (join) term
     * @return an algebraic term that associates lineage to every query result
     */
    static Tree inject_e ( Tree e ) {
        match e {
        case `gb(`c):
            if (! #[groupBy,orderBy].member(#<`gb>) )
                fail;
            return #<`gb(`(swap(inject_c(c))))>;
        case coGroup(`c1,`c2):
            match c1 {
            case cmap(_,`u):
                if (!stream_source(u))
                    fail;
                match c2 {
                case cmap(_,`s):
                    if (!stream_source(s))
                        fail;
                    // both sides read from streams: the lineage of both sides is empty
                    Tree nv = new_var();
                    return #<cmap(lambda(`nv,bag(tuple(tuple(nth(`nv,0),tuple(tuple(),tuple())),
                                                       nth(`nv,1)))),
                                  coGroup(`c1,`c2))>;
                };
                // only the left side reads from a stream: regroup the lineage of the right side
                Tree nv = new_var();
                Tree y = new_var();
                return #<cmap(lambda(`nv,cmap(lambda(`y,bag(tuple(tuple(nth(`nv,0),
                                                                        tuple(tuple(),nth(`y,0))),
                                                                  tuple(nth(nth(`nv,1),0),
                                                                        nth(`y,1))))),
                                              groupBy(nth(nth(`nv,1),1)))),
                              coGroup(`c1,`(inject_c(c2))))>;
            };
            match c2 {
            case cmap(_,`u):
                if (!stream_source(u))
                    fail;
                // only the right side reads from a stream: regroup the lineage of the left side
                Tree nv = new_var();
                Tree x = new_var();
                return #<cmap(lambda(`nv,cmap(lambda(`x,bag(tuple(tuple(nth(`nv,0),
                                                                        tuple(nth(`x,0),tuple())),
                                                                  tuple(nth(`x,1),
                                                                        nth(nth(`nv,1),1))))),
                                              groupBy(nth(nth(`nv,1),0)))),
                              coGroup(`(inject_c(c1)),`c2))>;
            };
            // neither side reads from a stream: combine the lineage of both sides
            return mix(#<coGroup(`(inject_c(c1)),`(inject_c(c2)))>);
        };
        return inject_c(e);
    }
| |
    /** Inject lineage (all groupBy/join keys) to a cMap e
     * Stream sources and repeat variables start with an empty lineage (smap3);
     * a cmap over any other input extends the lineage of that input (smap2).
     * @param e a cMap term
     * @return an algebraic term that associates lineage to every query result
     */
    static Tree inject_c ( Tree e ) {
        match e {
        case cmap(`f,call(stream,...s)):
            return smap3(f,#<call(stream,...s)>);
        case cmap(`f,`v):
            if (!repeat_var(v))
                fail;
            return smap3(f,v);
        case cmap(`f,`u):
            return smap2(f,inject_e(u));
        case call(stream,...s):
            // a bare stream is treated as the identity cmap over it
            return smap3(#<lambda(x,bag(x))>,#<call(stream,...s)>);
        case `v:
            if (!repeat_var(v))
                fail;
            return smap3(#<lambda(x,bag(x))>,v);
        };
        return e;
    }
| |
    /** find all homomorphic terms in the algebraic term e
     * A term whose inferred monoid is neither none nor fixed is returned as a whole;
     * otherwise its subterms are searched and the results are merged with union.
     * @param e an algebraic term
     * @param env binds variables to monoids
     * @return a list of homomorphic terms
     */
    static Trees find_homomorphisms ( Tree e, Environment env ) {
        if (! #[none,fixed].member(inference(e,env)))
            return #[ `e ];
        match e {
        case cmap(lambda(`v,bag(`b)),`x):
            match inference(x,env) {
            case union:
                return #[ `e ];
            case `gb(`m):
                if (! #[groupBy,orderBy].member(#<`gb>) ) fail;
                match inference(b,new Environment(v.toString(),#<product(fixed,`m)>,env)) {
                case product(...):
                    return #[ `e ];
                }
            case fixed:
                match inference(b,new Environment(v.toString(),#<fixed>,env)) {
                case union:
                    return #[ `e ];
                case fixed:
                    return #[ `e ];
                }
            };
            fail
        case record(...ds):
            Trees bs = #[ ];
            for ( Tree d: ds )
                match d {
                case bind(`n,`a):
                    bs = union(bs,find_homomorphisms(a,env));
                };
            return bs;
        case call(`f,...as):
            Trees bs = #[ ];
            for ( Tree a: as )
                bs = union(bs,find_homomorphisms(a,env));
            return bs;
        case nth(`u,`n):
            // keep the projection on top of every homomorphism found in u
            Trees bs = #[ ];
            for ( Tree x: find_homomorphisms(u,env) )
                bs = union(bs,#[nth(`x,`n)]);
            return bs;
        case project(`u,`a):
            Trees bs = #[ ];
            for ( Tree x: find_homomorphisms(u,env) )
                bs = union(bs,#[project(`x,`a)]);
            return bs;
        case `f(...as):
            Trees bs = #[ ];
            for ( Tree a: as )
                bs = union(bs,find_homomorphisms(a,env));
            return bs;
        };
        return #[ ];
    }
| |
| /** Split the term e into a homomorhism and an answer function |
| * @param e an algebraic term |
| * @param var the input variable of the answer function |
| * @param env binds variables to monoids |
| * @return a pair (answer,homomorphism) |
| */ |
| static Tree split ( Tree e, Tree var, Environment env ) { |
| if (split_tracing) { |
| tab_count += 3; |
| System.out.println(Interpreter.tabs(tab_count)+var+"\n" |
| +Interpreter.tabs(tab_count)+e.pretty(tab_count)); |
| |
| }; |
| Tree res = split_trace(e,var,env); |
| if (split_tracing) { |
| System.out.println(Interpreter.tabs(tab_count)+"-> "+res.pretty(tab_count+3)); |
| tab_count -= 3; |
| }; |
| return res; |
| } |
| |
| /** Split the term e into a homomorhism and an answer function |
| * @param e an algebraic term |
| * @param var the input variable of the answer function |
| * @param env binds variables to monoids |
| * @return a pair (answer,homomorphism) |
| */ |
| static Tree split_trace ( Tree e, Tree var, Environment env ) { |
| match e { |
| case cmap(lambda(`v,bag(tuple(`k,`b))),`u): |
| Environment nenv = env; |
| Tree nv = new_var(); |
| match inference(u,env) { |
| case union: |
| nv = #<nth(`nv,1)>; |
| nenv = new Environment(v.toString(),#<union>,nenv); |
| case `gb(`m): |
| if (! #[groupBy,orderBy].member(#<`gb>) ) |
| fail; |
| nenv = new Environment(v.toString(),#<product(fixed,`m)>,nenv); |
| }; |
| Trees hs = find_homomorphisms(b,nenv); |
| if (hs.length() == 1 && hs.nth(0).equals(b)) |
| return #<pair(`var,`e)>; |
| Tree ne = b; |
| int i = 0; |
| for ( Tree h: hs ) { |
| ne = subst(h,#<nth(nth(`nv,1),`i)>,ne); |
| i++; |
| }; |
| return #<pair(cmap(lambda(`nv,bag(tuple(nth(`nv,0),`ne))),`var), |
| cmap(lambda(`v,bag(tuple(`k,tuple(...hs)))),`u))>; |
| case cmap(lambda(`v,`b),`u): |
| match inference(u,env) { |
| case `gb(`m): |
| if (! #[groupBy,orderBy].member(#<`gb>) ) |
| fail; |
| Environment nenv = new Environment(v.toString(),#<product(fixed,`m)>,env); |
| match inference(b,nenv) { |
| case `gb2(`m2): |
| if (! #[groupBy,orderBy].member(#<`gb2>) ) |
| fail; |
| return #<pair(`var,`e)>; |
| case _: |
| Tree nv = new_var(); |
| match split(b,nv,nenv) { |
| case pair(`a,`h): |
| Tree nw = new_var(); |
| if (occurences(v,a) == 0) { |
| a = subst(nv,var,a); |
| return #<pair(`a,cmap(lambda(`v,`h),`u))>; |
| }; |
| // The answer function needs v; need to piggyback v along with the state |
| a = subst(nv,#<bag(tuple(nth(`nw,0),nth(nth(`nw,1),1)))>, |
| subst(v,#<nth(nth(`nw,1),0)>,a)); |
| return #<pair(cmap(lambda(`nw,`a),`var), |
| cmap(lambda(`v,cmap(lambda(`nv,bag(tuple(nth(`nv,0), |
| tuple(`v,nth(`nv,1))))), |
| `h)), |
| `u))>; |
| } |
| } |
| case `m: |
| if (! #[union,fixed].member(m)) |
| fail; |
| Tree nv = new_var(); |
| match split(b,nv,new Environment(v.toString(),m,env)) { |
| case pair(`a,`h): |
| Tree nw = new_var(); |
| if (occurences(v,a) == 0) { |
| a = subst(nv,var,a); |
| return #<pair(`a,cmap(lambda(`v,`h),`u))>; |
| }; |
| // The answer function needs v; need to piggyback v along with the state |
| a = subst(nv,#<bag(tuple(nth(`nw,0),nth(nth(`nw,1),1)))>, |
| subst(v,#<nth(nth(`nw,1),0)>,a)); |
| return #<pair(cmap(lambda(`nw,`a),`var), |
| cmap(lambda(`v,cmap(lambda(`nv,bag(tuple(nth(`nv,0), |
| tuple(`v,nth(`nv,1))))), |
| `h)), |
| `u))>; |
| } |
| } |
| case call(stream,...): |
| return #<pair(`var,`e)>; |
| case reduce(`m,`u): |
| match inference(u,env) { |
| case union: |
| return #<pair(`var,`e)>; |
| case _: |
| match split(u,var,env) { |
| case pair(`b,`h): |
| return #<pair(reduce(`m,`b),`h)>; |
| } |
| }; |
| case tuple(...as): |
| Trees bs = #[ ]; |
| Trees hs = #[ ]; |
| int i = 0; |
| for ( Tree a: as ) |
| match split(a,#<nth(`var,`(i++))>,env) { |
| case pair(`b,`h): |
| bs = bs.append(b); |
| hs = hs.append(h); |
| }; |
| return #<pair(tuple(...bs),tuple(...hs))>; |
| case record(...as): |
| Trees bs = #[ ]; |
| Trees hs = #[ ]; |
| for ( Tree a: as ) |
| match a { |
| case bind(`v,`u): |
| match split(u,#<project(`var,`a)>,env) { |
| case pair(`b,`h): |
| bs = bs.append(#<bind(`v,`b)>); |
| hs = hs.append(#<bind(`v,`h)>); |
| } |
| }; |
| return #<pair(record(...bs),record(...hs))>; |
| case call(`f,...as): |
| if (! #[fixed].member(inference(e,env))) |
| fail; |
| return #<pair(`e,tuple())>; |
| case call(`f,`a): |
| match split(a,var,env) { |
| case pair(`b,`h): |
| return #<pair(call(`f,`b),`h)>; |
| }; |
| case call(`f,...as): |
| Tree nv = new_var(); |
| Trees bs = #[ ]; |
| Trees hs = #[ ]; |
| int i = 0; |
| for ( Tree a: as ) |
| match split(a,#<nth(`nv,`(i++))>,env) { |
| case pair(`b,`h): |
| bs = bs.append(b); |
| hs = hs.append(h); |
| }; |
| return #<pair(apply(lambda(`nv,call(`f,...bs)),`var),tuple(...hs))>; |
| case nth(`u,`n): |
| match split(u,var,env) { |
| case pair(`b,`h): |
| return #<pair(nth(`b,`n),`h)>; |
| }; |
| case project(`u,`a): |
| match split(u,var,env) { |
| case pair(`b,`h): |
| return #<pair(project(`b,`a),`h)>; |
| }; |
| case bag(tuple(`k,`u)): |
| Tree ne = #<nth(call(elem,`var),1)>; |
| match TypeInference.type_inference(u) { |
| case `S(_): |
| if (!is_persistent_collection(S)) |
| fail; |
| ne = #<nth(call(elem,Collect(`var)),1)>; |
| }; |
| match split(u,ne,env) { |
| case pair(`b,`h): |
| return #<pair(`b,bag(tuple(`k,`h)))>; |
| }; |
| case if(`p,`u,bag()): |
| match split(u,var,env) { |
| case pair(`b,`h): |
| return #<pair(`b,if(`p,`h,bag()))>; |
| }; |
| case `f(...as): |
| Tree nv = new_var(); |
| Trees bs = #[ ]; |
| Trees hs = #[ ]; |
| int i = 0; |
| for ( Tree a: as ) |
| match split(a,#<nth(`nv,`(i++))>,env) { |
| case pair(`b,`h): |
| bs = bs.append(b); |
| hs = hs.append(h); |
| }; |
| return #<pair(apply(lambda(`nv,`f(...bs)),`var),tuple(...hs))>; |
| }; |
| return #<pair(`var,`e)>; |
| } |
| |
| /** Return the monoid of a query e */ |
| private static Tree get_monoid ( Tree e, Environment env ) { |
| match e { |
| case reduce(`m,`u): |
| return #<groupBy(`m)>; |
| case cmap(`f,call(stream,...s)): |
| return #<union>; |
| case cmap(`f,`v): |
| if (!repeat_var(v)) |
| fail; |
| return #<union>; |
| case cmap(lambda(`v,`b),`u): |
| match inference(u,env) { |
| case groupBy(`m): |
| return get_monoid(b,new Environment(v.toString(),#<product(fixed,`m)>,env)); |
| case `m: |
| return get_monoid(b,env); |
| } |
| case bag(tuple(`k,`v)): |
| Tree m = inference(v,env); |
| return #<groupBy(`m)>; |
| case if(`p,`e1,bag()): |
| return get_monoid(e1,env); |
| }; |
| throw new Error("Unable to find the merge function for: "+e); |
| //return #<groupBy(fixed)>; |
| } |
| |
    /** Convert a reduction with the monoid m over all the elements of the collection e
     * into an algebraic expression (used for the groups of a groupBy/orderBy state).
     * @param m a monoid (fixed, union, product, record, groupBy/orderBy, or an aggregation name)
     * @param e a collection term
     * @return an expression that reduces e with m
     */
    private static Tree convert_reduce_all ( Tree m, Tree e ) {
        match m {
        case fixed:
            // NOTE(review): picks one element; presumably all elements of an invariant are equal -- confirm
            return #<call(elem,`e)>;
        case union:
            // flatten the collection of collections
            Tree v = new_var();
            return #<cmap(lambda(`v,`v),`e)>;
        case product(...ps):
            // reduce every tuple component separately
            int i = 0;
            Trees as = #[ ];
            for ( Tree p: ps ) {
                Tree nv = new_var();
                as = as.append(convert_reduce_all(p,#<cmap(lambda(`nv,bag(nth(`nv,`(i++)))),`e)>));
            };
            return #<tuple(...as)>;
        case record(...bs):
            // reduce every record attribute separately
            Trees as = #[ ];
            for ( Tree b: bs )
                match b {
                case bind(`n,`p):
                    Tree nv = new_var();
                    Tree nb = convert_reduce_all(p,#<cmap(lambda(`nv,bag(project(`nv,`n))),`e)>);
                    as = as.append(#<bind(`n,`nb)>);
                };
            return #<record(...as)>;
        case `gb(`gm):
            // group e and reduce each group with the inner monoid gm
            if (! #[groupBy,orderBy].member(#<`gb>) )
                fail;
            Tree v = new_var();
            Tree me = convert_reduce_all(gm,#<nth(`v,1)>);
            return #<cmap(lambda(`v,bag(tuple(nth(`v,0),`me))),
                          `gb(`e))>;
        case _:
            // a user-defined aggregation: look up its plus and zero in the monoids list
            if (!m.is_variable())
                fail;
            for ( Tree monoid: monoids )
                match monoid {
                case `aggr(`mtp,`plus,`zero,`unit):
                    if (#<`aggr>.equals(m)) {
                        plus = Normalization.rename(plus);
                        return #<aggregate(`plus,`zero,`e)>;
                    }
                };
        };
        return #<call(elem,`e)>;
    }
| |
    /** Convert a reduce on the monoid m to an aggregation
     * fixed and union values are returned unchanged; products and records are converted
     * component-wise; groupBy/orderBy states are grouped and each group is reduced with
     * convert_reduce_all; a user-defined aggregation becomes call(aggr,e).
     * @param m a monoid
     * @param e the term to convert
     * @return the converted term
     */
    private static Tree convert_reduce ( Tree m, Tree e ) {
        match m {
        case fixed:
            return e;
        case union:
            return e;
        case product(...ps):
            int i = 0;
            Trees as = #[ ];
            for ( Tree p: ps )
                as = as.append(convert_reduce(p,#<nth(`e,`(i++))>));
            return #<tuple(...as)>;
        case record(...bs):
            Trees as = #[ ];
            for ( Tree b: bs )
                match b {
                case bind(`n,`p):
                    as = as.append(#<bind(`n,`(convert_reduce(p,#<project(`e,`n)>)))>);
                };
            return #<record(...as)>;
        case `gb(`gm):
            if (! #[groupBy,orderBy].member(#<`gb>) )
                fail;
            Tree v = new_var();
            Tree me = convert_reduce_all(gm,#<nth(`v,1)>);
            return #<cmap(lambda(`v,bag(tuple(nth(`v,0),`me))),
                          `gb(`e))>;
        case _:
            if (!m.is_variable())
                fail;
            for ( Tree monoid: monoids )
                match monoid {
                case `aggr(`mtp,`plus,`zero,`unit):
                    if (#<`aggr>.equals(m))
                        return #<call(`aggr,`e)>;
                };
        };
        return e;
    }
| |
    /** Convert coGroups back to joins and reduces to aggregations
     * A coGroup(x,y) of (key,value) pairs becomes a join on the keys whose reducer
     * emits (join_key(xs,ys), (values of xs, values of ys)).
     * @param e an algebraic term
     * @return the converted term
     */
    private static Tree convert_to_algebra ( Tree e ) {
        match e {
        case coGroup(`x,`y):
            Tree v = new_var();
            Tree vx = new_var();
            Tree vy = new_var();
            Tree cx = convert_to_algebra(x);
            Tree cy = convert_to_algebra(y);
            // NOTE(review): mx and my are never used below; they only advance the new_var counter
            Tree mx = new_var();
            Tree my = new_var();
            return #<join(lambda(`vx,nth(`vx,0)),lambda(`vy,nth(`vy,0)),
                          lambda(`v,bag(tuple(call(join_key,nth(`v,0),nth(`v,1)),
                                              tuple(cmap(lambda(`vx,bag(nth(`vx,1))),nth(`v,0)),
                                                    cmap(lambda(`vy,bag(nth(`vy,1))),nth(`v,1)))))),
                          `cx, `cy)>;
        case reduce(`aggr,`s):
            return convert_reduce(aggr,convert_to_algebra(s));
        case `f(...as):
            Trees bs = #[ ];
            for ( Tree a: as )
                bs = bs.append(convert_to_algebra(a));
            return #<`f(...bs)>;
        };
        return e;
    }
| |
    /** If the key of the join for merging states contains a float/double number,
     * use approximate equality: every float/double component of the key is wrapped
     * in call(round,...) so that keys are compared after rounding.
     * @param type the type of the key
     * @param e the key expression
     * @return e unchanged if type has no float/double, otherwise the rounded key
     */
    private static Tree key_equality ( Tree type, Tree e ) {
        if (occurences(#<float>,type) == 0 && occurences(#<double>,type) == 0)
            return e;
        match type {
        case tuple(...as):
            Trees bs = #[ ];
            int i = 0;
            for ( Tree a: as )
                bs = bs.append(key_equality(a,#<nth(`e,`(i++))>));
            return #<tuple(...bs)>;
        case record(...bs):
            Trees cs = #[ ];
            for ( Tree b: bs )
                match b {
                case bind(`n,`a):
                    cs = cs.append(#<bind(`n,`(key_equality(a,#<project(`e,`n)>)))>);
                }
            return #<record(...cs)>;
        case `T(`tp):
            if (!is_collection(T))
                fail;
            Tree nv = new_var();
            return #<cmap(lambda(`nv,`(key_equality(tp,nv))),`e)>;
        case float:
            return #<call(round,`e)>;
        case double:
            return #<call(round,`e)>;
        };
        return e;
    }
| |
    /** true if there is only one instance value of type tp
     * The empty tuple qualifies; collections, tuples, and records qualify when all
     * their element/component types do. Any other type returns false.
     */
    private static boolean unique_key ( Tree tp ) {
        match tp {
        case tuple():
            return true;
        case `T(`t):
            if (!is_collection(T))
                fail;
            return unique_key(t);
        case tuple(...as):
            for ( Tree a: as )
                if (!unique_key(a))
                    return false;
            return true;
        case record(...as):
            for ( Tree a: as )
                match a {
                case bind(_,`b):
                    if (!unique_key(b))
                        return false;
                }
            return true;
        };
        return false;
    }
| |
    /** Return the merge function (over X and Y) of the monoid m; type is used for key equality
     * @param m a monoid
     * @param type the type of the states X and Y
     * @param X the first state
     * @param Y the second state
     * @return a term that merges X and Y with m
     * @throws Error if type does not fit a groupBy/orderBy monoid, or m is not a known monoid
     */
    private static Tree merge ( Tree m, Tree type, Tree X, Tree Y ) {
        match m {
        case `gb(`n):
            if (! #[groupBy,orderBy].member(#<`gb>) )
                fail;
            match type {
            case `T(tuple(`keytp,`tp)):
                if (unique_key(keytp)) {
                    // at most one group per state: merge the two singleton groups directly
                    Tree vx = new_var();
                    Tree vy = new_var();
                    Tree v = merge(n,tp,#<nth(call(elem,`vx),1)>,#<nth(call(elem,`vy),1)>);
                    if (is_persistent_collection(T)) {
                        X = #<Collect(`X)>;
                        Y = #<Collect(`Y)>;
                    };
                    // NOTE(review): the key TYPE keytp is used as the key VALUE below; this works for
                    // tuple(), whose only value is also written tuple() -- confirm for collection key types
                    return #<let(`vx,`X,
                                 let(`vy,`Y,
                                     if(call(exists,`vx),
                                        if(call(exists,`vy),
                                           bag(tuple(`keytp,`v)),
                                           `vx),
                                        `vy)))>;
                };
                // needs an outer-join in the reducer function
                Tree v = new_var();
                Tree vx = new_var();
                Tree vy = new_var();
                Tree mx = new_var();
                Tree my = new_var();
                Tree mb = merge(n,tp,#<nth(`vx,1)>,#<nth(`vy,1)>);
                Tree b = #<cmap(lambda(`vx,cmap(lambda(`vy,bag(tuple(nth(`vx,0),`mb))),
                                                `my)),
                                `mx)>;
                return #<join(lambda(`vx,`(key_equality(keytp,#<nth(`vx,0)>))),
                              lambda(`vy,`(key_equality(keytp,#<nth(`vy,0)>))),
                              lambda(`v,let(`mx,nth(`v,0),
                                            let(`my,nth(`v,1),
                                                if(call(exists,`mx),
                                                   if(call(exists,`my),`b,`mx),
                                                   `my)))),
                              `X, `Y)>;
            };
            throw new Error("Unknown type for merge: "+type);
        case product(...ms):
            // merge component-wise; the children of type are the component types
            Trees tps = ((Node)type).children();
            Trees bs = #[ ];
            int i = 0;
            for ( Tree a: ms ) {
                bs = bs.append(merge(a,tps.nth(i),#<nth(`X,`i)>,#<nth(`Y,`i)>));
                i++;
            };
            return #<tuple(...bs)>;
        case record(...bs):
            // merge attribute-wise; the children of type are bind(name,type) pairs
            Trees tps = ((Node)type).children();
            Trees cs = #[ ];
            int i = 0;
            for ( Tree b: bs )
                match b {
                case bind(`n,`a):
                    cs = cs.append(#<bind(`n,`(merge(a,((Node)tps.nth(i++)).children().nth(1),
                                                     #<project(`X,`n)>,#<project(`Y,`n)>)))>);
                }
            return #<record(...cs)>;
        case union:
            return #<call(plus,`X,`Y)>;
        case fixed:
            // an invariant keeps the newer state
            return Y;
        case _:
            // a user-defined aggregation: apply its plus function
            if (!m.is_variable())
                fail;
            for ( Tree monoid: monoids )
                match monoid {
                case `aggr(`mtp,`plus,`zero,`unit):
                    if (#<`aggr>.equals(m))
                        return #<apply(`plus,tuple(`X,`Y))>;
                };
        };
        throw new Error("Undefined monoid: "+m);
    }
| |
    /** Return the zero element of the monoid m
     * groupBy/orderBy, fixed, and union have the empty bag as zero; products and
     * records are built component-wise; a user-defined aggregation uses the zero
     * registered in the monoids list.
     * @param m a monoid
     * @return the zero element of m
     * @throws Error if m is not a known monoid
     */
    private static Tree zero ( Tree m ) {
        match m {
        case `gb(`n):
            if (! #[groupBy,orderBy].member(#<`gb>) )
                fail;
            return #<bag()>;
        case product(...ms):
            Trees bs = #[ ];
            for ( Tree a: ms )
                bs = bs.append(zero(a));
            return #<tuple(...bs)>;
        case record(...bs):
            Trees cs = #[ ];
            for ( Tree b: bs )
                match b {
                case bind(`n,`a):
                    cs = cs.append(#<bind(`n,`(zero(a)))>);
                }
            return #<record(...cs)>;
        case fixed:
            return #<bag()>;
        case union:
            return #<bag()>;
        case _:
            if (!m.is_variable())
                fail;
            for ( Tree monoid: monoids )
                match monoid {
                case `aggr(`mtp,`plus,`zero,`unit):
                    // here zero is the pattern variable bound to the aggregation's zero element
                    if (#<`aggr>.equals(m))
                        return zero;
                };
        };
        throw new Error("Undefined monoid: "+m);
    }
| |
    /** Convert joins to coGroups, plus other transformations
     * Self-joins (both inputs derived from the same x) become a single groupBy over
     * tagged unions; other joins become coGroups of (key,value) pairs. The inputs of
     * groupBy/orderBy/coGroup/reduce are wrapped in an identity cmap when they are
     * not already cmaps, and calls to aggregation monoids become reduce terms.
     * @param e an algebraic term
     * @return the normalized term
     */
    private static Tree normalize_term ( Tree e ) {
        match e {
        case join(`kx,`ky,`r,cmap(`f,`x),cmap(`g,`y)):
            // self-join where both sides are cmaps over the same x
            if (!x.equals(y))
                fail;
            Tree v = new_var(), w = new_var(),
                wx = new_var(), wy = new_var(),
                vx = new_var(), vy = new_var();
            Tree xtp = TypeInference.type_inference(((Node)kx).children.head);
            Tree ytp = TypeInference.type_inference(((Node)ky).children.head);
            Tree T = #<union(inL(`xtp),inR(`ytp))>;
            Tree X = #<cmap(lambda(`w,call(plus,cmap(lambda(`wx,bag(tuple(apply(`kx,`wx),
                                                                          typed(tagged_union(0,`wx),`T)))),
                                                     apply(`f,`w)),
                                           cmap(lambda(`wy,bag(tuple(apply(`ky,`wy),
                                                                     typed(tagged_union(1,`wy),`T)))),
                                                apply(`g,`w)))),
                            `x)>;
            // separate the left (tag 0) and right (tag 1) values of each group
            Tree sx = #<cmap(lambda(`vx,if(call(eq,union_tag(`vx),0),bag(typed(union_value(`vx),`xtp)),bag())),
                             nth(`v,1))>;
            Tree sy = #<cmap(lambda(`vy,if(call(eq,union_tag(`vy),1),bag(typed(union_value(`vy),`ytp)),bag())),
                             nth(`v,1))>;
            return normalize_term(#<cmap(lambda(`v,apply(`r,tuple(`sx,`sy))),
                                         groupBy(`X))>);
        case join(`kx,`ky,`r,`x,cmap(`g,`y)):
            // self-join where only the right side is a cmap over x
            if (!x.equals(y))
                fail;
            Tree v = new_var(), w = new_var(), wy = new_var(),
                vx = new_var(), vy = new_var();
            Tree xtp = TypeInference.type_inference(((Node)kx).children.head);
            Tree ytp = TypeInference.type_inference(((Node)ky).children.head);
            Tree T = #<union(inL(`xtp),inR(`ytp))>;
            Tree X = #<cmap(lambda(`w,call(plus,bag(tuple(apply(`kx,`w),typed(tagged_union(0,`w),`T))),
                                           cmap(lambda(`wy,bag(tuple(apply(`ky,`wy),
                                                                     typed(tagged_union(1,`wy),`T)))),
                                                apply(`g,`w)))),
                            `x)>;
            Tree sx = #<cmap(lambda(`vx,if(call(eq,union_tag(`vx),0),bag(typed(union_value(`vx),`xtp)),bag())),
                             nth(`v,1))>;
            Tree sy = #<cmap(lambda(`vy,if(call(eq,union_tag(`vy),1),bag(typed(union_value(`vy),`ytp)),bag())),
                             nth(`v,1))>;
            return normalize_term(#<cmap(lambda(`v,apply(`r,tuple(`sx,`sy))),
                                         groupBy(`X))>);
        case join(`kx,`ky,`r,cmap(`f,`x),`y):
            // self-join where only the left side is a cmap over x
            if (!x.equals(y))
                fail;
            Tree v = new_var(), w = new_var(), wx = new_var(),
                vx = new_var(), vy = new_var();
            Tree xtp = TypeInference.type_inference(((Node)kx).children.head);
            Tree ytp = TypeInference.type_inference(((Node)ky).children.head);
            Tree T = #<union(inL(`xtp),inR(`ytp))>;
            Tree X = #<cmap(lambda(`w,call(plus,cmap(lambda(`wx,bag(tuple(apply(`kx,`wx),
                                                                          typed(tagged_union(0,`wx),`T)))),
                                                     apply(`f,`w)),
                                           bag(tuple(apply(`ky,`w),typed(tagged_union(1,`w),`T))))),
                            `x)>;
            Tree sx = #<cmap(lambda(`vx,if(call(eq,union_tag(`vx),0),bag(typed(union_value(`vx),`xtp)),bag())),
                             nth(`v,1))>;
            Tree sy = #<cmap(lambda(`vy,if(call(eq,union_tag(`vy),1),bag(typed(union_value(`vy),`ytp)),bag())),
                             nth(`v,1))>;
            return normalize_term(#<cmap(lambda(`v,apply(`r,tuple(`sx,`sy))),
                                         groupBy(`X))>);
        case join(`kx,`ky,`r,`x,`y):
            // general join: coGroup of (key,value) pairs from both sides
            Tree v = new_var();
            Tree vx = new_var();
            Tree vy = new_var();
            Tree X = #<cmap(lambda(`vx,bag(tuple(apply(`kx,`vx),`vx))),`x)>;
            Tree Y = #<cmap(lambda(`vy,bag(tuple(apply(`ky,`vy),`vy))),`y)>;
            return normalize_term(#<cmap(lambda(`v,apply(`r,tuple(nth(nth(`v,1),0),
                                                                  nth(nth(`v,1),1)))),
                                         coGroup(`X,`Y))>);
        case `gb(`u):
            if (! #[groupBy,orderBy].member(#<`gb>) )
                fail;
            match u {
            case cmap(...):
                return #<`gb(`(normalize_term(u)))>;
            };
            Tree nv = new_var();
            return #<`gb(cmap(lambda(`nv,bag(`nv)),`(normalize_term(u))))>;
        case coGroup(`e1,`e2):
            // NOTE(review): an empty `case cmap(...):` is meant to leave cmap inputs as they are;
            // in the `_` case the input is normalized here and again in the return below -- confirm
            match e1 {
            case cmap(...):
            case _:
                Tree v1 = new_var();
                e1 = #<cmap(lambda(`v1,bag(`v1)),`(normalize_term(e1)))>;
            };
            match e2 {
            case cmap(...):
            case _:
                Tree v2 = new_var();
                e2 = #<cmap(lambda(`v2,bag(`v2)),`(normalize_term(e2)))>;
            };
            return #<coGroup(`(normalize_term(e1)),`(normalize_term(e2)))>;
        case reduce(`m,`u):
            match u {
            case cmap(...):
                return #<reduce(`m,`(normalize_term(u)))>;
            };
            Tree nv = new_var();
            return #<reduce(`m,cmap(lambda(`nv,bag(`nv)),`(normalize_term(u))))>;
        case call(`f,`s):
            // a call to a user-defined aggregation becomes a reduce on its monoid
            for ( Tree monoid: monoids )
                match monoid {
                case `aggr(`mtp,`plus,`zero,`unit):
                    if (#<`aggr>.equals(f)) {
                        match s {
                        case cmap(...):
                            return #<reduce(`aggr,`(normalize_term(s)))>;
                        };
                        Tree nv = new_var();
                        return #<reduce(`aggr,cmap(lambda(`nv,bag(`nv)),`(normalize_term(s))))>;
                    }
                };
            fail
        case `f(...as):
            Trees bs = #[ ];
            for ( Tree a: as )
                bs = bs.append(normalize_term(a));
            return #<`f(...bs)>;
        };
        return e;
    }
| |
| /** Embed missing cmaps */ |
| private static Tree embed_missing_cmaps ( Tree e ) { |
| match e { |
| case project(`x,`a): |
| match TypeInference.type_inference(x) { |
| case `T(`tp): |
| if (!is_collection(T)) |
| fail; |
| Tree v = new_var(); |
| type_env.insert(v.toString(),tp); |
| return embed_missing_cmaps(#<cmap(lambda(`v,bag(project(`v,`a))),`x)>); |
| }; |
| fail |
| case nth(`x,`n): |
| match TypeInference.type_inference(x) { |
| case `T(`tp): |
| if (!is_collection(T)) |
| fail; |
| Tree v = new_var(); |
| type_env.insert(v.toString(),tp); |
| return embed_missing_cmaps(#<cmap(lambda(`v,bag(nth(`v,`n))),`x)>); |
| }; |
| fail |
| case `f(...as): |
| Trees bs = #[ ]; |
| for ( Tree a: as ) |
| bs = bs.append(embed_missing_cmaps(a)); |
| return #<`f(...bs)>; |
| }; |
| return e; |
| } |
| |
| private static Tree subst_pattern ( Tree pattern, Tree src, Tree dst ) { |
| match pattern { |
| case tuple(...as): |
| Tree res = dst; |
| int i = 0; |
| for ( Tree a: as ) |
| res = subst_pattern(a,#<nth(`src,`(i++))>,res); |
| return res; |
| }; |
| return subst_var(pattern,src,dst); |
| } |
| |
| /** Simplify the term e using rewrite rules */ |
| static Tree simplify_term ( Tree e ) { |
| match e { |
| case cmap(`f,cmap(lambda(`v,`u),`x)): |
| return simplify_term(#<cmap(lambda(`v,cmap(`f,`u)),`x)>); |
| case cmap(lambda(`x,`b),bag(`a)): |
| return simplify_term(subst_var(x,a,b)); |
| case cmap(lambda(`x,`b),bag()): |
| return #<bag()>; |
| case cmap(`f,if(`p,`e1,`e2)): |
| return simplify_term(#<if(`p,cmap(`f,`e1),cmap(`f,`e2))>); |
| case cmap(`f,call(plus,`x,`y)): |
| return simplify_term(#<call(plus,cmap(`f,`x),cmap(`f,`y))>); |
| case groupBy(bag()): |
| return #<bag()>; |
| case cmap(`f,join(`kx,`ky,lambda(`v,`r),`X,`Y)): |
| return simplify_term(#<join(`kx,`ky,lambda(`v,cmap(`f,`r)),`X,`Y)>); |
| case apply(lambda(`v,`b),`u): |
| if (!v.is_variable()) |
| fail; |
| return simplify_term(subst_var(v,u,b)); |
| case apply(lambda(`pattern,`b),`u): |
| return simplify_term(subst_pattern(pattern,u,b)); |
| case call(elem,bag(`u)): |
| return simplify_term(u); |
| case nth(tuple(...al),`n): |
| if (!n.is_long()) |
| fail; |
| int i = (int)n.longValue(); |
| if (i >= 0 && i < al.length()) |
| return simplify_term(al.nth(i)); |
| case project(record(...bl),`a): |
| for ( Tree b: bl ) |
| match b { |
| case bind(`v,`u): |
| if (v.equals(a)) |
| return simplify_term(u); |
| }; |
| case `f(...as): |
| Trees bs = #[ ]; |
| for ( Tree a: as ) |
| bs = bs.append(simplify_term(a)); |
| return #<`f(...bs)>; |
| }; |
| return e; |
| } |
| |
| /** Simplify the term e using rewrite rules */ |
| public static Tree SimplifyTerm ( Tree e ) { |
| Tree ne = simplify_term(e); |
| if (e.equals(ne)) |
| return e; |
| else return SimplifyTerm(ne); |
| } |
| |
| /** Does this term contain a stream source? */ |
| public static boolean is_streaming ( Tree e ) { |
| match e { |
| case call(stream,...): |
| return true; |
| case `f(...al): |
| for ( Tree a: al ) |
| if (is_streaming(a)) |
| return true; |
| }; |
| if (repeat_var(e)) |
| return true; |
| else return false; |
| } |
| |
| /** Collect all stream sources */ |
| private static Trees stream_bindings ( Tree e ) { |
| match e { |
| case call(stream,...): |
| return #[`e]; |
| case `f(...al): |
| if (#[BinaryStream,ParsedStream,SocketStream].member(#<`f>)) |
| return #[`e]; |
| Trees rs = #[]; |
| for ( Tree a: al ) |
| for ( Tree b: stream_bindings(a) ) |
| if (!rs.member(b)) |
| rs = rs.cons(b); |
| return rs; |
| }; |
| if (false && repeat_var(e)) |
| return #[`e]; |
| else return #[]; |
| } |
| |
/** Convert an algebraic expression into a stream-based one by pulling out its stream sources.
 * Every stream source s found by stream_bindings is replaced in the expression by a
 * fresh variable x (typed like s), and the expression is wrapped as Stream(lambda(x,...),s).
 * @param e algebraic expression
 * @return a stream-based expression
 */
public static Tree streamify ( Tree e ) {
    Tree result = e;
    for ( Tree source: stream_bindings(e) ) {
        Tree sv = new_var();
        Tree sourceType = TypeInference.type_inference(source);
        TypeInference.type_env.insert(sv.toString(),sourceType);
        result = #<Stream(lambda(`sv,`(subst(source,sv,result))),`source)>;
    };
    return result;
}
| |
| public static Tree unstreamify ( Tree e ) { |
| match e { |
| case call(stream,...r): |
| return #<call(source,...r)>; |
| case BinaryStream(...r): |
| return #<BinarySource(...r)>; |
| case ParsedStream(...r): |
| return #<ParsedSource(...r)>; |
| case `f(...al): |
| Trees bs = #[ ]; |
| for ( Tree a: al ) |
| bs = bs.append(unstreamify(a)); |
| return #<`f(...bs)>; |
| }; |
| return e; |
| } |
| |
| /** Return the answer function of a query e |
| * @param e a query |
| * @param var the input of the answer function |
| * @param merge the monoid associated with the input |
| * @return the answer function |
| */ |
| static Tree answer ( Tree e, Tree var, Tree merge ) { |
| match e { |
| case cmap(`f,call(stream,...s)): |
| return var; |
| case cmap(`f,`v): |
| if (!repeat_var(v)) |
| fail; |
| return var; |
| case cmap(lambda(`v,`b),`u): |
| match merge { |
| case `gb(`m): |
| if (! #[groupBy,orderBy].member(#<`gb>) ) |
| fail; |
| Tree nv = new_var(); |
| return #<reduce(`gb(`m), |
| cmap(lambda(`nv,bag(tuple(nth(nth(`nv,0),0),nth(`nv,1)))), |
| `var))>; |
| } |
| }; |
| return var; |
| } |
| |
| /** Remove the state from the query output */ |
| private static Tree strip_state ( Tree e, Tree var ) { |
| match e { |
| case reduce(`m,call(stream,...s)): |
| return var; |
| case reduce(`m,`v): |
| if (!repeat_var(v)) |
| fail; |
| return var; |
| case reduce(`m,`u): |
| return var; |
| case cmap(`f,`u): |
| Tree nn = new_var(); |
| return #<cmap(lambda(`nn,bag(nth(`nn,1))),`var)>; |
| }; |
| return var; |
| } |
| |
| /** Convert a stream-based query to an incremental stream-based program. |
| * It returns the quadruple (zero,merge,homomorphism,answer), |
| * where (zero,merge) is the monoid for the homomorphism |
| * and answer(homomorphism) is equal to the original query |
| * @param e a stream-based query |
| * @param env binds variables to monoids |
| * @return a quadruple that represents the incremental stream-based program |
| */ |
| static Tree generate_code ( Tree e, Environment env ) { |
| if (Config.trace) { |
| System.out.println("Subterm:"); |
| System.out.println(e.pretty(0)); |
| }; |
| Tree ne = SimplifyTerm(normalize_term(e)); |
| TypeInference.type_inference(ne); |
| ne = SimplifyTerm(embed_missing_cmaps(ne)); |
| if (Config.trace) { |
| System.out.println("Subterm normalized:"); |
| System.out.println(ne.pretty(0)); |
| }; |
| Tree qe = SimplifyTerm(inject_q(ne)); |
| if (Config.trace) { |
| System.out.println("After lineage injection:"); |
| System.out.println(qe.pretty(0)); |
| }; |
| Tree nv = new_var(); |
| match split(qe,nv,env) { |
| case pair(`a,`h): |
| if (Config.trace) { |
| System.out.println("After split:"); |
| System.out.println(a.pretty(0)+"\n"+h.pretty(0)); |
| }; |
| h = SimplifyTerm(h); |
| if (Config.trace) { |
| System.out.println("After split (simplified):"); |
| System.out.println(SimplifyTerm(a).pretty(0)+"\n"+h.pretty(0)); |
| }; |
| Tree m = get_monoid(h,env); |
| if (Config.trace) |
| System.out.println("Subterm monoid: "+m.pretty(0)); |
| Tree q = convert_to_algebra(h); |
| Tree tp = TypeInference.type_inference(q); |
| if (Config.trace) { |
| System.out.println("Subterm type: "+tp); |
| System.out.println("Merge function over X and Y: "); |
| System.out.println(SimplifyTerm(merge(m,tp,#<X>,#<Y>)).pretty(0)); |
| }; |
| type_env.insert(nv.toString(),tp); |
| Tree answer = answer(ne,nv,m); |
| answer = subst(nv,answer,a); |
| answer = strip_state(ne,answer); |
| answer = SimplifyTerm(convert_to_algebra(SimplifyTerm(answer))); |
| if (Config.trace) { |
| System.out.println("Answer function:"); |
| System.out.println(answer.pretty(0)); |
| }; |
| Tree zero = zero(m); |
| Tree my = new_var(); |
| Tree merge = convert_to_algebra(SimplifyTerm(merge(m,tp,nv,my))); |
| Tree res = #<tuple(`zero,lambda(tuple(`nv,`my),`merge),`q,lambda(`nv,`answer))>; |
| if (Config.trace) { |
| System.out.println("Incremental subterm:"); |
| System.out.println(res.pretty(0)); |
| }; |
| return res; |
| }; |
| throw new Error("Cannot generate incremental code: "+e); |
| } |
| |
| private static Tree generate_incremental_code ( Tree e, Environment env ) { |
| if (!is_streaming(e)) |
| return #<tuple(tuple(),lambda(tuple(tuple(),tuple()),tuple()),tuple(),lambda(tuple(),`e))>; |
| match e { |
| case call(stream,...): |
| Tree v = new_var(); |
| return generate_code(#<cmap(lambda(`v,bag(`v)),`e)>,env); |
| case call(`f,...qs): |
| boolean is_monoid = false; |
| for ( Tree monoid: monoids ) |
| match monoid { |
| case `aggr(`mtp,`plus,`zero,`unit): |
| if (#<`aggr>.equals(f)) |
| is_monoid = true; |
| }; |
| if (is_monoid) |
| fail; |
| Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[]; |
| int i = 0; |
| for ( Tree q: qs ) |
| match generate_incremental_code(q,env) { |
| case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a)): |
| zs = zs.append(z); hs = hs.append(h); as = as.append(a); |
| vs = vs.append(v); ws = ws.append(w); ms = ms.append(m); |
| i++; |
| }; |
| return #<tuple(tuple(...zs), |
| lambda(tuple(tuple(...vs),tuple(...ws)),tuple(...ms)), |
| tuple(...hs), |
| lambda(tuple(...vs),call(`f,...as)))>; |
| case record(...rs): |
| Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[]; |
| int i = 0; |
| for ( Tree r: rs ) |
| match r { |
| case bind(`k,`q): |
| match generate_incremental_code(q,env) { |
| case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a)): |
| zs = zs.append(z); hs = hs.append(h); as = as.append(a); |
| vs = vs.append(v); ws = ws.append(w); ms = ms.append(m); |
| i++; |
| } |
| }; |
| return #<tuple(tuple(...zs), |
| lambda(tuple(tuple(...vs),tuple(...ws)),tuple(...ms)), |
| tuple(...hs), |
| lambda(tuple(...vs),record(...as)))>; |
| case tuple(...qs): |
| Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[]; |
| int i = 0; |
| for ( Tree q: qs ) |
| match generate_incremental_code(q,env) { |
| case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a)): |
| zs = zs.append(z); hs = hs.append(h); as = as.append(a); |
| vs = vs.append(v); ws = ws.append(w); ms = ms.append(m); |
| i++; |
| }; |
| return #<tuple(tuple(...zs), |
| lambda(tuple(tuple(...vs),tuple(...ws)),tuple(...ms)), |
| tuple(...hs), |
| lambda(tuple(...vs),tuple(...as)))>; |
| }; |
| return generate_code(e,env); |
| } |
| |
| /** bind the pattern variables to types */ |
| private static void bind_pattern2type ( Tree pattern, Tree tp ) { |
| match pattern { |
| case tuple(...pl): |
| match tp { |
| case tuple(...tl): |
| int i = 0; |
| for ( Tree p: pl ) |
| bind_pattern2type(p,tl.nth(i++)); |
| return; |
| } |
| }; |
| TypeInference.type_env.insert(pattern.toString(),tp); |
| } |
| |
/** Convert a stream-based query to an incremental stream-based program.
 * A repeat(lambda(v,u),x,n) with a constant n >= 1 is compiled into an incr program
 * whose step runs a repeat of n-1 iterations over the merged state. Any other query
 * becomes incr(zero,merge,answer), built from the quadruple produced by the
 * structural generate_incremental_code.
 * @param e a stream-based query
 * @return the incremental stream-based program
 * @throws Error if the repeat count is not a constant >= 1, or no incremental code can be generated
 */
public static Tree generate_incremental_code ( Tree e ) {
    match e {
    case repeat(lambda(`rv,`body),`init,`count):
        if (!(count instanceof LongLeaf))
            throw new Error("The repeat must have a constant repetition: "+count);
        int times = (int)((LongLeaf)count).value();
        if (times < 1)
            throw new Error("Wrong repeat number: "+count);
        Tree pv = new_var();
        // keep only the first component of each element produced by the repeat body
        body = #<cmap(lambda(`pv,bag(nth(`pv,0))),`body)>;
        TypeInference.type_inference(body);
        // record the repeat variable in repeat_environment, bound to the monoid union
        repeat_environment = new Environment(rv.toString(),#<union>,repeat_environment);
        match generate_incremental_code(body,null) {
        case tuple(`z,lambda(tuple(`x1,`x2),`m),`h,lambda(`s,`a)):
            Tree deltaT = new_var();
            Tree w = new_var();
            TypeInference.type_env.insert(deltaT.toString(),TypeInference.type_inference(s));
            TypeInference.type_env.insert(w.toString(),TypeInference.type_inference(s));
            Tree stepMerge = subst(x1,deltaT,subst(x2,h,m));
            Tree firstStep = subst(rv,init,h);
            Tree loop = #<repeat(lambda(`deltaT,cmap(lambda(`w,bag(tuple(`w,true))),`stepMerge)),
                                 `firstStep,
                                 `(times-1))>;
            Tree res = #<incr(tuple(`z,`(unstreamify(init))),
                              lambda(tuple(`s,`rv),tuple(apply(lambda(tuple(`x1,`x2),`m),tuple(`s,`loop)),`a)),
                              lambda(tuple(`s,`rv),`rv))>;
            res = SimplifyTerm(res);
            if (Config.trace) {
                System.out.println("Incremental program:");
                System.out.println(res.pretty(0));
            };
            TypeInference.type_inference(res);
            return res;
        }
    };
    match generate_incremental_code(e,null) {
    case tuple(`z,lambda(tuple(`x1,`x2),`m),`h,lambda(_,`a)):
        bind_pattern2type(x1,TypeInference.type_inference(h));
        Tree merged = SimplifyTerm(#<apply(lambda(`x2,`m),`h)>);
        Tree answerFn = SimplifyTerm(a);
        Tree res = #<incr(`z,lambda(`x1,`merged),lambda(`x1,`answerFn))>;
        if (Config.trace) {
            System.out.println("Incremental program:");
            System.out.println(res.pretty(0));
        };
        TypeInference.type_inference(res);
        return res;
    };
    throw new Error("Cannot generate incremental code: "+e);
}
| } |