blob: e644b83f539bcdce2a119d2af835c2a11a3d0560 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import org.apache.mrql.gen.*;
import java.util.*;
/** if the plan refers to a variable bound to a stream-based Bag and occurs in the code
* multiple times, embed code to materialize this Bag in memory
*/
final public class Materialization extends Translator {
// terms that access bulk terms directly
private static Trees access_terms = #[];
// access_terms that are repeated more than once
private static Trees repeat_terms = #[];
/** is this a direct-access term? (not the results of a bulk operation) */
private static boolean access_variable ( Tree e ) {
match e {
case nth(`x,_):
return access_variable(x);
case project(`x,_):
return access_variable(x);
case typed(union_value(`x),_):
return access_variable(x);
case index(`x,`n):
return access_variable(x);
case `v:
if (v.is_variable())
return true;
};
return false;
}
/** check if the term e is a direct access term that extracts a collection */
private static boolean transient_term ( Tree e ) {
match e {
case materialize(_):
return false;
};
if (e.is_variable() || !access_variable(e))
return false;
match TypeInference.type_inference(e) {
case `T(_):
if (is_collection(T))
return true;
};
return false;
}
/** derive the access_terms and the repated_terms */
private static void find_repeated_terms ( Tree e ) {
if (transient_term(e))
if (access_terms.member(e)) {
if (!repeat_terms.member(e))
repeat_terms = repeat_terms.cons(e);
} else access_terms = access_terms.cons(e);
else match e {
case type(_): ;
case materialize(_): ;
case `f(...as):
for ( Tree a: as)
find_repeated_terms(a);
}
}
/** if the term e is a bag processing operation that accesses a term
in access_terms and it is inside the functional of another
bag processing operation, it is a repeated_term */
private static void repeated_terms ( Tree e, int level ) {
if (access_terms.member(e) && level > 0) {
if (!repeat_terms.member(e))
repeat_terms = repeat_terms.cons(e);
} else match e {
case `f(lambda(`v,`b),`x,...):
if (! #[cmap,map,filter,aggregate].member(#<`f>))
fail;
repeated_terms(b,level+1); // the map function is repeated
repeated_terms(x,level);
case mapReduce(lambda(`v1,`b1),lambda(`v2,`b2),`x,...):
repeated_terms(b1,level+1); // the map function is repeated
repeated_terms(b2,level+1); // the reduce function is repeated
repeated_terms(x,level);
case `f(lambda(`v1,`b1),lambda(`v2,`b2),lambda(`v3,`b3),`x,`y,...):
if (! #[join,mapReduce2,crossProduct].member(#<`f>))
fail;
repeated_terms(b1,level+1); // the left key function is repeated
repeated_terms(b2,level+1); // the right key function is repeated
repeated_terms(b3,level+1); // the reduce function is repeated
repeated_terms(x,level);
repeated_terms(y,level);
case `f(...as):
for ( Tree a: as)
repeated_terms(a,level);
}
}
/** if a direct-access term that constructs a lazy bag (an iterator)
is used more than once, materialize it in memory */
public static Tree materialize_terms ( Tree e ) {
access_terms = #[];
repeat_terms = #[];
find_repeated_terms(e);
repeated_terms(e,0);
for ( Tree x: repeat_terms )
e = subst(x,#<materialize(`x)>,e);
return e;
}
}