/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import org.apache.mrql.gen.*;
import java.io.*;
import java.util.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
/**
* A map-reduce job that captures a join combined with a group-by, similar to matrix multiplication.<br>
* It captures queries of the form:
* <pre>
* select r((kx,ky),sum(z))
* from x in X, y in Y, z = (x,y)
* where jx(x) = jy(y)
* group by (kx,ky): (gx(x),gy(y));
* </pre>
* where:<br>
* jx: left join key function from a to k<br>
* jy: right join key function from b to k<br>
* gx: group-by key function from a to k1<br>
* gy: group-by key function from b to k2<br>
* sum: a summation from {(a,b)} to c based on the accumulator
* acc from (c,(a,b)) to c with a left zero of type c<br>
* r: reducer from ((k1,k2),c) to d<br>
* X: left input of type {a}<br>
* Y: right input of type {b}<br>
* It returns a value of type {d}<br>
* <br>
* Example: matrix multiplication:
* <pre>
* select ( s(z), i, j )
* from (x,i,k) in X, (y,k,j) in Y, z = (x,y)
* group by (i,j);
* </pre>
* where the summation s is based on the accumulator acc(c,(x,y))=c+x*y and zero=0<br>
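* In terms of the general form above, this example corresponds to
* jx(x,i,k)=k, jy(y,k,j)=k, gx(x,i,k)=i, gy(y,k,j)=j, and r((i,j),c)=(c,i,j).<br>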
* It uses n*m partitions, chosen so that n/m = |X|/|Y| and a hash table of size
* (|X|/n)*(|Y|/m) fits in a memory of size M; that is, n = |X|/sqrt(M) and m = |Y|/sqrt(M).
* Each partition generates (|X|/n)*(|Y|/m) data; X is replicated n times and Y m times.
* Each reducer uses an in-memory hash table H of size (|X|/n)*(|Y|/m).<br>
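* For instance (illustrative sizes): with |X| = 10^6, |Y| = 4*10^6, and room for M = 10^6
* hash-table entries, we get:
* <pre>
* n = |X|/sqrt(M) = 10^6/10^3 = 1000
* m = |Y|/sqrt(M) = 4*10^6/10^3 = 4000
* n/m = 1000/4000 = |X|/|Y| and (|X|/n)*(|Y|/m) = 1000*1000 = M
* </pre>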
* MapReduce pseudo-code:
* <pre>
* mapX ( x )
* for i = 0,n-1
* emit ( ((hash(gx(x)) % m)+m*i, jx(x), 1), (1,x) )
*
* mapY ( y )
* for i = 0,m-1
* emit ( ((hash(gy(y)) % n)*m+i, jy(y), 2), (2,y) )
* </pre>
* mapper output key: (partition,joinkey,tag), value: (tag,data) <br>
* Partitioner: over partition <br>
* GroupingComparator: over partition and joinkey <br>
* SortComparator: major partition, minor joinkey, sub-minor tag <br>
* <pre>
* reduce ( (p,_,_), s )
* if p != current_partition
* flush()
* current_partition = p
* read the x values from s first and store them in xs
* for each y from the rest of s
* for each x in xs
* H[(gx(x),gy(y))] = acc( H[(gx(x),gy(y))], (x,y) )
* </pre>
* where flush() is: for each ((kx,ky),v) in H: emit r((kx,ky),v)
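* For example (illustrative numbers): with n=2 and m=3, a left x with hash(gx(x))=7 is sent to
* partitions {7%3+3*i : i=0,1} = {1,4}, and a right y with hash(gy(y))=5 is sent to partitions
* {(5%2)*3+i : i=0,1,2} = {3,4,5}; if jx(x)=jy(y), the pair meets exactly once, at partition 4.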
*/
final public class GroupByJoinPlan extends MapReducePlan {
/** mapper output key: (partition,joinkey,tag) */
private final static class GroupByJoinKey implements Writable {
public int partition; // one of n*m
public byte tag; // 1 or 2
public MRData key;
GroupByJoinKey () {}
GroupByJoinKey ( int p, byte t, MRData k ) {
partition = p;
tag = t;
key = k;
}
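// serialized layout: [tag: 1 byte][partition: VInt][key bytes];
// the raw comparators below index directly into these bytes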
public void write ( DataOutput out ) throws IOException {
out.writeByte(tag);
WritableUtils.writeVInt(out,partition);
key.write(out);
}
public void readFields ( DataInput in ) throws IOException {
tag = in.readByte();
partition = WritableUtils.readVInt(in);
key = MRContainer.read(in);
}
public String toString () { return "["+partition+","+tag+","+key+"]"; }
}
/** partition based on key.partition only (mod numPartitions, so several logical partitions may share a reducer) */
private final static class GroupByJoinPartitioner extends Partitioner<GroupByJoinKey,MRContainer> {
final public int getPartition ( GroupByJoinKey key, MRContainer value, int numPartitions ) {
return key.partition % numPartitions;
}
}
/** sorting with major order key.partition, minor key.key, sub-minor key.tag */
private final static class GroupByJoinSortComparator implements RawComparator<GroupByJoinKey> {
int[] container_size;
public GroupByJoinSortComparator () {
container_size = new int[1];
}
final public int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl ) {
try {
int c = WritableComparator.readVInt(x,xs+1)-WritableComparator.readVInt(y,ys+1);
if (c != 0)
return c;
int tsize = 1+WritableUtils.decodeVIntSize(x[xs+1]);
c = MRContainer.compare(x,xs+tsize,xl-tsize,y,ys+tsize,yl-tsize,container_size);
if (c != 0)
return c;
return x[xs] - y[ys];
} catch (IOException e) {
throw new Error(e);
}
}
final public int compare ( GroupByJoinKey x, GroupByJoinKey y ) {
int c = x.partition - y.partition;
if (c != 0)
return c;
c = x.key.compareTo(y.key);
if (c != 0)
return c;
return x.tag - y.tag;
}
}
/** grouping by key.partition and key.key */
private final static class GroupByJoinGroupingComparator implements RawComparator<GroupByJoinKey> {
int[] container_size;
public GroupByJoinGroupingComparator() {
container_size = new int[1];
}
final public int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl ) {
try {
int c = WritableComparator.readVInt(x,xs+1)-WritableComparator.readVInt(y,ys+1);
if (c != 0)
return c;
int tsize = 1+WritableUtils.decodeVIntSize(x[xs+1]);
return MRContainer.compare(x,xs+tsize,xl-tsize,y,ys+tsize,yl-tsize,container_size);
} catch (IOException e) {
throw new Error(e);
}
}
final public int compare ( GroupByJoinKey x, GroupByJoinKey y ) {
int c = x.partition - y.partition;
return (c != 0) ? c : x.key.compareTo(y.key);
}
}
/** the left GroupByJoin mapper */
private final static class MapperLeft extends Mapper<MRContainer,MRContainer,GroupByJoinKey,MRContainer> {
private static int n, m;
private static Function left_join_key_fnc;
private static Function left_groupby_fnc;
private static GroupByJoinKey ckey = new GroupByJoinKey(0,(byte)1,new MR_int(0));
private static Tuple tvalue = (new Tuple(2)).set(0,new MR_byte(1));
private static MRContainer cvalue = new MRContainer(tvalue);
@Override
public void map ( MRContainer key, MRContainer value, Context context )
throws IOException, InterruptedException {
MRData data = value.data();
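// replicate x to all n rows of the grid column hash(gx(x)) % m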
for ( int i = 0; i < n; i++ ) {
ckey.partition = (left_groupby_fnc.eval(data).hashCode() % m)+m*i;
ckey.key = left_join_key_fnc.eval(data);
tvalue.set(1,data);
context.write(ckey,cvalue);
}
}
@Override
protected void setup ( Context context ) throws IOException,InterruptedException {
super.setup(context);
try {
Configuration conf = context.getConfiguration();
Config.read(conf);
if (Plan.conf == null)
Plan.conf = conf;
Tree code = Tree.parse(conf.get("mrql.join.key.left"));
left_join_key_fnc = functional_argument(conf,code);
code = Tree.parse(conf.get("mrql.groupby.left"));
left_groupby_fnc = functional_argument(conf,code);
m = conf.getInt("mrql.m",1);
n = conf.getInt("mrql.n",1);
} catch (Exception e) {
throw new Error("Cannot retrieve the left mapper plan");
}
}
}
/** the right GroupByJoin mapper */
private final static class MapperRight extends Mapper<MRContainer,MRContainer,GroupByJoinKey,MRContainer> {
private static int n, m;
private static Function right_join_key_fnc;
private static Function right_groupby_fnc;
private static GroupByJoinKey ckey = new GroupByJoinKey(0,(byte)2,new MR_int(0));
private static Tuple tvalue = (new Tuple(2)).set(0,new MR_byte(2));
private static MRContainer cvalue = new MRContainer(tvalue);
@Override
public void map ( MRContainer key, MRContainer value, Context context )
throws IOException, InterruptedException {
MRData data = value.data();
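// replicate y to all m columns of the grid row hash(gy(y)) % n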
for ( int i = 0; i < m; i++ ) {
ckey.partition = (right_groupby_fnc.eval(data).hashCode() % n)*m+i;
ckey.key = right_join_key_fnc.eval(data);
tvalue.set(1,data);
context.write(ckey,cvalue);
}
}
@Override
protected void setup ( Context context ) throws IOException,InterruptedException {
super.setup(context);
try {
Configuration conf = context.getConfiguration();
Config.read(conf);
if (Plan.conf == null)
Plan.conf = conf;
Tree code = Tree.parse(conf.get("mrql.join.key.right"));
right_join_key_fnc = functional_argument(conf,code);
code = Tree.parse(conf.get("mrql.groupby.right"));
right_groupby_fnc = functional_argument(conf,code);
m = conf.getInt("mrql.m",1);
n = conf.getInt("mrql.n",1);
} catch (Exception e) {
throw new Error("Cannot retrieve the right mapper plan");
}
}
}
/** the GroupByJoin reducer */
private static class JoinReducer extends Reducer<GroupByJoinKey,MRContainer,MRContainer,MRContainer> {
private static String counter; // a Hadoop user-defined counter used in the repeat operation
private static int n, m; // n*m partitioners
private static Function left_groupby_fnc; // left group-by function
private static Function right_groupby_fnc;// right group-by function
private static Function accumulator_fnc; // the accumulator function
private static MRData zero_value; // the left zero of the accumulator
private static Function reduce_fnc; // the reduce function
private static Bag left = new Bag(); // a cached bag of input fragments from left input
private static int current_partition = -1;
private static Hashtable<MRData,MRData> hashTable; // in-reducer combiner
private static Tuple pair = (new Tuple(2)).set(1,new Tuple(2));
private static MRContainer ckey = new MRContainer(new MR_int(0));
private static MRContainer container = new MRContainer(new MR_int(0));
private static void write ( MRContainer key, MRData value, Context context )
throws IOException, InterruptedException {
if (counter.equals("-")) {
container.set(value);
context.write(key,container);
} else { // increment the repetition counter if the repeat condition is true
Tuple t = (Tuple)value;
if (((MR_bool)t.second()).get())
context.getCounter("mrql",counter).increment(1);
container.set(t.first());
context.write(key,container);
}
}
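// in-reducer combining: fold value into H[key] using the accumulator,
// starting from the zero value when the key is not yet present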
private void store ( MRData key, MRData value ) throws IOException {
MRData old = hashTable.get(key);
if (old == null)
old = zero_value;
pair.set(0,old);
pair.set(1,value);
hashTable.put(key,accumulator_fnc.eval(pair));
}
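// the flush() of the pseudo-code above: emit r((kx,ky),v) for each entry (kx,ky)->v in H, then clear H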
protected static void flush_table ( Context context ) throws IOException, InterruptedException {
Tuple pair = new Tuple(2);
for ( Map.Entry<MRData,MRData> me: hashTable.entrySet() ) {
ckey.set(me.getKey());
pair.set(0,me.getKey());
pair.set(1,me.getValue());
write(ckey,reduce_fnc.eval(pair),context);
};
hashTable.clear();
}
@Override
public void reduce ( GroupByJoinKey key, Iterable<MRContainer> values, Context context )
throws IOException, InterruptedException {
if (key.partition != current_partition) {
// at the end of a partition, flush the hash table
flush_table(context);
current_partition = key.partition;
};
left.clear();
Tuple p = null;
final Iterator<MRContainer> i = values.iterator();
// left tuples arrive before right tuples (tag is the sub-minor sort key); cache the left values in the left bag
while (i.hasNext()) {
p = (Tuple)i.next().data();
if (((MR_byte)p.first()).get() == 2)
break;
left.add(p.second());
p = null;
};
// if p is non-null, the last tuple read was the first right tuple
if (p != null) {
MRData y = p.second();
MRData gy = right_groupby_fnc.eval(y);
// cross product with left (must use new Tuples)
for ( MRData x: left )
store(new Tuple(left_groupby_fnc.eval(x),gy),new Tuple(x,y));
// the rest of values are from right
while (i.hasNext()) {
y = ((Tuple)i.next().data()).second();
gy = right_groupby_fnc.eval(y);
// cross product with left (must use new Tuples)
for ( MRData x: left )
store(new Tuple(left_groupby_fnc.eval(x),gy),new Tuple(x,y));
}
}
}
@Override
protected void setup ( Context context ) throws IOException,InterruptedException {
super.setup(context);
try {
conf = context.getConfiguration();
Plan.conf = conf;
Config.read(Plan.conf);
Tree code = Tree.parse(conf.get("mrql.groupby.left"));
left_groupby_fnc = functional_argument(conf,code);
code = Tree.parse(conf.get("mrql.groupby.right"));
right_groupby_fnc = functional_argument(conf,code);
m = conf.getInt("mrql.m",1);
n = conf.getInt("mrql.n",1);
code = Tree.parse(conf.get("mrql.accumulator"));
accumulator_fnc = functional_argument(conf,code);
code = Tree.parse(conf.get("mrql.zero"));
zero_value = Interpreter.evalE(code);
code = Tree.parse(conf.get("mrql.reducer"));
reduce_fnc = functional_argument(conf,code);
counter = conf.get("mrql.counter");
hashTable = new Hashtable<MRData,MRData>(Config.map_cache_size);
} catch (Exception e) {
throw new Error("Cannot retrieve the reducer plan");
}
}
@Override
protected void cleanup ( Context context ) throws IOException,InterruptedException {
if (hashTable != null)
flush_table(context);
hashTable = null; // allow it to be garbage-collected
super.cleanup(context);
}
}
/** the GroupByJoin operation:
* an equi-join combined with a group-by implemented using hashing
* @param left_join_key_fnc left join key function from a to k
* @param right_join_key_fnc right join key function from b to k
* @param left_groupby_fnc left group-by function from a to k1
* @param right_groupby_fnc right group-by function from b to k2
* @param accumulator_fnc accumulator function from (c,(a,b)) to c
* @param zero the left zero of accumulator of type c
* @param reduce_fnc reduce function from ((k1,k2),c) to d
* @param X left data set of type {a}
* @param Y right data set of type {b}
* @param num_reducers number of reducers
* @param n left dimension of the reducer grid
* @param m right dimension of the reducer grid
* @param stop_counter optional counter used in repeat operation
* @return a DataSet that contains the result of type {d}
*/
public final static DataSet groupByJoin
( Tree left_join_key_fnc, // left join key function
Tree right_join_key_fnc, // right join key function
Tree left_groupby_fnc, // left group-by function
Tree right_groupby_fnc, // right group-by function
Tree accumulator_fnc, // accumulator function
Tree zero, // the left zero of accumulator
Tree reduce_fnc, // reduce function
DataSet X, // left data set
DataSet Y, // right data set
int num_reducers, // number of reducers
int n, int m, // dimensions of the reducer grid
String stop_counter ) // optional counter used in repeat operation
throws Exception {
conf = MapReduceEvaluator.clear_configuration(conf);
String newpath = new_path(conf);
conf.set("mrql.join.key.left",left_join_key_fnc.toString());
conf.set("mrql.join.key.right",right_join_key_fnc.toString());
conf.set("mrql.groupby.left",left_groupby_fnc.toString());
conf.set("mrql.groupby.right",right_groupby_fnc.toString());
conf.setInt("mrql.m",m);
conf.setInt("mrql.n",n);
conf.set("mrql.accumulator",accumulator_fnc.toString());
conf.set("mrql.zero",zero.toString());
conf.set("mrql.reducer",reduce_fnc.toString());
conf.set("mrql.counter",stop_counter);
setupSplits(new DataSet[]{X,Y},conf);
Job job = new Job(conf,newpath);
distribute_compiled_arguments(job.getConfiguration());
job.setMapOutputKeyClass(GroupByJoinKey.class);
job.setJarByClass(GroupByJoinPlan.class);
job.setOutputKeyClass(MRContainer.class);
job.setOutputValueClass(MRContainer.class);
job.setPartitionerClass(GroupByJoinPartitioner.class);
job.setSortComparatorClass(GroupByJoinSortComparator.class);
job.setGroupingComparatorClass(GroupByJoinGroupingComparator.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileOutputFormat.setOutputPath(job,new Path(newpath));
for (DataSource p: X.source)
MultipleInputs.addInputPath(job,new Path(p.path),(Class<? extends MapReduceMRQLFileInputFormat>)p.inputFormat,MapperLeft.class);
for (DataSource p: Y.source)
MultipleInputs.addInputPath(job,new Path(p.path),(Class<? extends MapReduceMRQLFileInputFormat>)p.inputFormat,MapperRight.class);
job.setReducerClass(JoinReducer.class);
if (num_reducers > 0)
job.setNumReduceTasks(num_reducers);
job.waitForCompletion(true);
long c = (stop_counter.equals("-")) ? 0
: job.getCounters().findCounter("mrql",stop_counter).getValue();
DataSource s = new BinaryDataSource(newpath,conf);
s.to_be_merged = false;
return new DataSet(s,c,MapReducePlan.outputRecords(job));
}
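// Example use (hypothetical; the Tree arguments are MRQL plan expressions produced
// by the query compiler rather than written by hand):
//   DataSet result = groupByJoin(jx,jy,gx,gy,acc,zero,r,X,Y,num_reducers,n,m,"-");
// where the stop counter "-" disables the repeat-loop counter.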
}