/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import org.apache.mrql.gen.*;
import java.io.*;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Enumeration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
/** The MapReduce2 physical operator (a reduce-side join) */
final public class JoinOperation extends MapReducePlan {
/** The composite key of the reduce-side join. We attach a tag (1 for the
 * right input, 2 for the left) to the join key before the right and the
 * left tuples are mixed together in the same reduce group */
public final static class JoinKey implements Writable {
public byte tag; // 1 for the right input, 2 for the left
public MRData key;
JoinKey () {}
JoinKey ( byte t, MRData k ) { tag = t; key = k; }
public void write ( DataOutput out ) throws IOException {
out.writeByte(tag);
key.write(out);
}
public void readFields ( DataInput in ) throws IOException {
tag = in.readByte();
key = MRContainer.read(in);
}
public String toString () {
return "<"+tag+":"+key+">";
}
}
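    /** Distribute the join tuples by hashing the join key only (ignoring the
     * tag), so that the matching left and right tuples meet at the same reducer */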
public final static class MRContainerJoinPartitioner extends Partitioner<JoinKey,MRContainer> {
final public int getPartition ( JoinKey key, MRContainer value, int numPartitions ) {
            return (key.key.hashCode() & Integer.MAX_VALUE) % numPartitions; // mask the sign bit, since Math.abs(Integer.MIN_VALUE) is negative
}
}
/** The sorting of the joined values uses the join key for major order and the tag for minor order */
public final static class MRContainerSortComparator implements RawComparator<JoinKey> {
int[] container_size;
public MRContainerSortComparator () {
container_size = new int[1];
}
final public int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl ) {
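            // a serialized JoinKey starts with the tag byte (written by JoinKey.write),
            // so compare the keys that follow it first and break ties on the tag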
int cmp = MRContainer.compare(x,xs+1,xl-1,y,ys+1,yl-1,container_size);
return (cmp == 0) ? x[xs]-y[ys] : cmp;
}
final public int compare ( JoinKey x, JoinKey y ) {
int c = x.key.compareTo(y.key);
return (c == 0) ? x.tag-y.tag : c;
}
}
/** The grouping of the joined values is based on join key only */
public final static class MRContainerGroupingComparator implements RawComparator<JoinKey> {
int[] container_size;
public MRContainerGroupingComparator() {
container_size = new int[1];
}
final public int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl ) {
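            // compare the keys only, skipping the tag byte, so that the left and
            // right tuples with the same join key fall into the same reduce group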
return MRContainer.compare(x,xs+1,xl-1,y,ys+1,yl-1,container_size);
}
final public int compare ( JoinKey x, JoinKey y ) {
return x.key.compareTo(y.key);
}
}
/** The left mapper for MapReduce2: it tags each tuple with 2, so that left tuples sort after the right tuples of the same join key */
private final static class MapperLeft extends Mapper<MRContainer,MRContainer,JoinKey,MRContainer> {
private static Function mx; // the left mapper function
private static JoinKey join_key = new JoinKey((byte)2,new MR_int(0));
private static Tuple tvalue = (new Tuple(2)).set(0,new MR_byte(2));
private static MRContainer cvalue = new MRContainer(tvalue);
@Override
public void map ( MRContainer key, MRContainer value, Context context )
throws IOException, InterruptedException {
for ( MRData e: (Bag)mx.eval(value.data()) ) {
Tuple p = (Tuple)e;
join_key.key = p.first();
tvalue.set(1,p.second());
cvalue.set(tvalue);
context.write(join_key,cvalue);
}
}
@Override
protected void setup ( Context context ) throws IOException,InterruptedException {
super.setup(context);
try {
Configuration conf = context.getConfiguration();
Config.read(conf);
if (Plan.conf == null)
Plan.conf = conf;
Tree code = Tree.parse(conf.get("mrql.mapper.left"));
mx = functional_argument(conf,code);
} catch (Exception e) {
throw new Error("Cannot retrieve the left mapper plan");
}
}
@Override
protected void cleanup ( Context context ) throws IOException,InterruptedException {
super.cleanup(context);
}
}
/** The right mapper for MapReduce2: it tags each tuple with 1, so that right tuples sort before the left tuples of the same join key */
private final static class MapperRight extends Mapper<MRContainer,MRContainer,JoinKey,MRContainer> {
private static Function my; // the right mapper function
private static JoinKey join_key = new JoinKey((byte)1,new MR_int(0));
private static Tuple tvalue = (new Tuple(2)).set(0,new MR_byte(1));
private static MRContainer cvalue = new MRContainer(tvalue);
@Override
public void map ( MRContainer key, MRContainer value, Context context )
throws IOException, InterruptedException {
for ( MRData e: (Bag)my.eval(value.data()) ) {
Tuple p = (Tuple)e;
join_key.key = p.first();
tvalue.set(1,p.second());
cvalue.set(tvalue);
context.write(join_key,cvalue);
}
}
@Override
protected void setup ( Context context ) throws IOException,InterruptedException {
super.setup(context);
try {
Configuration conf = context.getConfiguration();
Config.read(conf);
if (Plan.conf == null)
Plan.conf = conf;
Tree code = Tree.parse(conf.get("mrql.mapper.right"));
my = functional_argument(conf,code);
} catch (Exception e) {
throw new Error("Cannot retrieve the right mapper plan");
}
}
@Override
protected void cleanup ( Context context ) throws IOException,InterruptedException {
super.cleanup(context);
}
}
/** The reducer for MapReduce2 */
private static class JoinReducer extends Reducer<JoinKey,MRContainer,MRContainer,MRContainer> {
private static String counter; // a Hadoop user-defined counter used in the repeat operation
private static Function combine_fnc; // the combiner function
private static Function reduce_fnc; // the reduce function
private static Bag left = new Bag(); // a cached bag of input fragments from left input
private static Bag right = new Bag(); // a cached bag of input fragments from right input
private static Function acc_fnc; // aggregator
private static MRData result; // aggregation result
private static Hashtable<MRData,MRData> hashTable; // hash table used to combine results before they are written (an in-reducer combiner)
private static int index;
private static Tuple tkey = new Tuple(2);
private static Bag tbag = new Bag(2);
private static boolean streamed = false;
private static Tuple pair = new Tuple(2);
private static MRContainer ckey = new MRContainer(new MR_int(0));
private static MRContainer cvalue = new MRContainer(new MR_int(0));
private static MRContainer container = new MRContainer(new MR_int(0));
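        /** Emit one result pair, depending on the job mode: fold it into the
         * aggregation result, combine it through the hash table, or write it out
         * (incrementing the repeat counter when the repeat condition holds) */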
private void write ( MRContainer key, MRData value, Context context )
throws IOException, InterruptedException {
if (result != null) { // aggregation
pair.set(0,result);
pair.set(1,value);
result = acc_fnc.eval(pair);
} else if (hashTable != null) {
MRData k = ((Tuple)value).get(0);
MRData v = ((Tuple)value).get(1);
MRData old = hashTable.get(k);
if (old == null) {
if (index++ == Config.map_cache_size)
flush_table(context);
hashTable.put(k,v);
} else {
tkey.set(0,key.data());
tbag.clear();
tbag.add_element(v).add_element(old);
tkey.set(1,tbag);
for ( MRData x: (Bag)combine_fnc.eval(tkey) )
hashTable.put(k,x); // normally, done once
}
} else if (counter.equals("-")) {
container.set(value);
context.write(key,container);
} else { // increment the repetition counter if the repeat condition is true
Tuple t = (Tuple)value;
if (((MR_bool)t.second()).get())
context.getCounter("mrql",counter).increment(1);
container.set(t.first());
context.write(key,container);
}
}
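        /** Write out the contents of the combiner hash table and clear it */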
        protected static void flush_table ( Context context ) throws IOException, InterruptedException {
            Enumeration<MRData> en = hashTable.keys();
            while (en.hasMoreElements()) {
                MRData key = en.nextElement();
                MRData value = hashTable.get(key);
                ckey.set(key);
                if (value != null)
                    context.write(ckey,new MRContainer(new Tuple(key,value)));
            };
            index = 0;
            hashTable.clear();
        }
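        /** Reduce over one join-key group. The sort comparator guarantees that the
         * right tuples (tag 1) of a group arrive before the left tuples (tag 2);
         * in streamed mode, we therefore cache only the right bag and consume the
         * left tuples lazily as a stream-based Bag */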
@Override
public void reduce ( JoinKey key, Iterable<MRContainer> values, Context context )
throws IOException, InterruptedException {
if (!streamed) {
left.clear();
right.clear();
for ( MRContainer val: values ) {
Tuple p = (Tuple)val.data();
if (((MR_byte)p.first()).get() == 1)
right.add(p.second());
else left.add(p.second());
};
} else { // the left input is processed lazily (as a stream-based bag)
right.clear();
Tuple p = null;
final Iterator<MRContainer> i = values.iterator();
while (i.hasNext()) {
p = (Tuple)i.next().data();
if (((MR_byte)p.first()).get() == 2)
break;
right.add(p.second());
p = null;
};
final Tuple data = p;
left = new Bag(new BagIterator () {
boolean first_time = data != null;
public boolean hasNext () {
return first_time || i.hasNext();
}
public MRData next () {
if (!first_time) {
Tuple t = (Tuple)i.next().data();
assert(((MR_byte)(t.first())).get() == 2);
return t.second();
};
first_time = false;
return data.second();
}
});
};
pair.set(0,left);
pair.set(1,right);
Bag s = (Bag)reduce_fnc.eval(pair);
s.materialize();
for ( MRData e: s ) {
ckey.set(key.key);
write(ckey,e,context);
}
}
@Override
protected void setup ( Context context ) throws IOException,InterruptedException {
super.setup(context);
try {
conf = context.getConfiguration();
Plan.conf = conf;
Config.read(Plan.conf);
Tree code = Tree.parse(conf.get("mrql.reducer"));
reduce_fnc = functional_argument(conf,code);
streamed = PlanGeneration.streamed_MapReduce2_reducer(code);
if (conf.get("mrql.zero") != null) {
code = Tree.parse(conf.get("mrql.zero"));
result = Interpreter.evalE(code);
code = Tree.parse(conf.get("mrql.accumulator"));
acc_fnc = functional_argument(conf,code);
} else result = null;
counter = conf.get("mrql.counter");
code = Tree.parse(conf.get("mrql.combiner"));
hashTable = null;
if (code != null && !code.equals(new VariableLeaf("null"))) {
combine_fnc = functional_argument(conf,code);
hashTable = new Hashtable<MRData,MRData>(Config.map_cache_size);
index = 0;
}
} catch (Exception e) {
throw new Error("Cannot retrieve the reducer plan");
}
}
@Override
protected void cleanup ( Context context ) throws IOException,InterruptedException {
if (result != null) // emit the result of aggregation
context.write(new MRContainer(new MR_int(0)),new MRContainer(result));
if (hashTable != null)
flush_table(context);
hashTable = null; // garbage-collect it
super.cleanup(context);
}
}
/** The MapReduce2 physical operator (a reduce-side join)
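     *
     * <p>A minimal invocation sketch (hypothetical values: the plan trees mx, my,
     * and reduce_fnc come from the MRQL plan generator, and X and Y from earlier
     * physical operators):
     * <pre>{@code
     *   DataSet result = JoinOperation.mapReduce2(mx,my,null,reduce_fnc,null,null,
     *                                             X,Y,8,"-",false);
     * }</pre>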
* @param mx left mapper function
* @param my right mapper function
* @param combine_fnc optional in-mapper combiner function
* @param reduce_fnc reducer function
* @param acc_fnc optional accumulator function
* @param zero optional zero value for the accumulator
* @param X left data set
* @param Y right data set
* @param num_reduces number of reducers
* @param stop_counter optional counter used in repeat operation
* @param orderp does the result need to be ordered?
* @return a new data source that contains the result
*/
public final static DataSet mapReduce2 ( Tree mx, // left mapper function
Tree my, // right mapper function
Tree combine_fnc, // optional in-mapper combiner function
Tree reduce_fnc, // reducer function
Tree acc_fnc, // optional accumulator function
Tree zero, // optional zero value for the accumulator
DataSet X, // left data set
DataSet Y, // right data set
int num_reduces, // number of reducers
String stop_counter, // optional counter used in repeat operation
boolean orderp ) // does the result need to be ordered?
throws Exception {
conf = MapReduceEvaluator.clear_configuration(conf);
String newpath = new_path(conf);
conf.set("mrql.mapper.left",mx.toString());
conf.set("mrql.mapper.right",my.toString());
if (combine_fnc != null)
conf.set("mrql.combiner",combine_fnc.toString());
conf.set("mrql.reducer",reduce_fnc.toString());
if (zero != null) {
conf.set("mrql.accumulator",acc_fnc.toString());
conf.set("mrql.zero",zero.toString());
} else conf.set("mrql.zero","");
conf.set("mrql.counter",stop_counter);
setupSplits(new DataSet[]{X,Y},conf);
Job job = new Job(conf,newpath);
distribute_compiled_arguments(job.getConfiguration());
job.setMapOutputKeyClass(JoinKey.class);
job.setJarByClass(MapReducePlan.class);
job.setOutputKeyClass(MRContainer.class);
job.setOutputValueClass(MRContainer.class);
job.setPartitionerClass(MRContainerJoinPartitioner.class);
job.setSortComparatorClass(MRContainerSortComparator.class);
job.setGroupingComparatorClass(MRContainerGroupingComparator.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileOutputFormat.setOutputPath(job,new Path(newpath));
for (DataSource p: X.source)
MultipleInputs.addInputPath(job,new Path(p.path),(Class<? extends MapReduceMRQLFileInputFormat>)p.inputFormat,MapperLeft.class);
for (DataSource p: Y.source)
MultipleInputs.addInputPath(job,new Path(p.path),(Class<? extends MapReduceMRQLFileInputFormat>)p.inputFormat,MapperRight.class);
if (Config.trace && PlanGeneration.streamed_MapReduce2_reducer(reduce_fnc))
System.out.println("Streamed MapReduce2 reducer");
job.setReducerClass(JoinReducer.class);
if (num_reduces > 0)
job.setNumReduceTasks(num_reduces);
job.waitForCompletion(true);
long c = (stop_counter.equals("-")) ? 0
: job.getCounters().findCounter("mrql",stop_counter).getValue();
DataSource s = new BinaryDataSource(newpath,conf);
s.to_be_merged = orderp;
return new DataSet(s,c,outputRecords(job));
}
}