/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import org.apache.mrql.gen.*;
import java.io.*;
import java.net.URI;
import java.util.List;
import java.util.Vector;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.filecache.DistributedCache;
/** The CrossProduct physical operation (similar to a block nested-loop join) */
final public class CrossProductOperation extends MapReducePlan {
/** The mapper for the CrossProduct operation */
private final static class crossProductMapper extends Mapper<MRContainer,MRContainer,MRContainer,MRContainer> {
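// Block nested-loop strategy: the mapper buffers up to Config.map_cache_size values
// produced by the left (outer) mapper; each time the buffer fills up, it is joined
// against the right input, which is read from the distributed cache.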
private static String counter; // a Hadoop user-defined counter used in the repeat operation
private static Function reduce_fnc; // the reduce function
private static Function map_fnc; // the mapper function
private static DataSet cached_dataset;
private final static List<MRData> outer
    = new Vector<MRData>(Config.map_cache_size); // fixed-size cache (block) for the outer input
private static int index;
private static MRContainer last_key;
private static URI[] uris;
private static Path[] local_paths;
private static Function acc_fnc; // aggregator
private static MRData result; // aggregation result
private static Tuple pair = new Tuple(2);
private static MRContainer container = new MRContainer(new MR_int(0));
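/** Emit one value produced by the reduce function: fold it into the running
 *  aggregation if one is active; otherwise write it out, incrementing the
 *  user-defined counter when the repeat condition of the value is true */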
private void write ( MRContainer key, MRData value, Context context )
throws IOException, InterruptedException {
if (result != null) { // aggregation
pair.set(0,result);
pair.set(1,value);
result = acc_fnc.eval(pair);
} else if (counter.equals("-")) {
container.set(value);
context.write(key,container);
} else { // increment the repetition counter if the repeat condition is true
Tuple t = (Tuple)value;
if (((MR_bool)t.second()).get())
context.getCounter("mrql",counter).increment(1);
container.set(t.first());
context.write(key,container);
}
}
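/** Buffer the mapped left values into the fixed-size outer cache; whenever the
 *  cache fills up, join the buffered block with the cached right input */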
@Override
public void map ( MRContainer key, MRContainer value, Context context )
throws IOException, InterruptedException {
try {
last_key = key;
for ( MRData x: (Bag)map_fnc.eval(value.data()) )
    if (index++ == Config.map_cache_size) {
        // the outer block is full: join it with the cached right input
        for ( MRData y: cached_data(context.getConfiguration()) ) {
            pair.set(1,y);
            for ( MRData z: outer ) {
                pair.set(0,z);
                for ( MRData v: (Bag)reduce_fnc.eval(pair) )
                    write(key,v,context);
            }
        };
        outer.clear();
        outer.add(x);   // keep the element that triggered the flush for the next block
        index = 1;
    } else outer.add(x);
} catch (Exception e) {
throw new Error("Cannot perform the crossProduct: "+e);
}
}
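/** Read the right input from the distributed cache and return it as a Bag backed
 *  by lazy SequenceFile readers (jar files placed in the cache are skipped) */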
protected Bag cached_data ( final Configuration conf ) {
try {
Bag res = new Bag();
final FileSystem fs = FileSystem.getLocal(conf);
for ( int i = 0; i < local_paths.length; i++ ) {
// hadoop 0.20.2 distributed cache doesn't work in stand-alone
final Path path = (conf.get("mapred.job.tracker").equals("local"))
? new Path(uris[i].toString())
: local_paths[i];
if (path.getName().endsWith(".jar"))
continue;
res = res.union(new Bag(new BagIterator () {
final SequenceFile.Reader reader = new SequenceFile.Reader(fs,path,conf);
final MRContainer key = new MRContainer(new MR_int(0));
final MRContainer value = new MRContainer(new MR_int(0));
public boolean hasNext () {
try {
boolean done = reader.next(key,value);
if (!done)
reader.close();
return done;
} catch (IOException e) {
throw new Error("Cannot collect values from distributed cache");
}
}
public MRData next () {
return value.data();
}
}));
};
return res;
} catch (Exception e) {
throw new Error("Cannot setup the cross product: "+e);
}
}
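/** Deserialize the functional arguments (mapper, reducer, and the optional
 *  accumulator and zero) from the job configuration and locate the files
 *  that were placed in the distributed cache */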
@Override
protected void setup ( Context context ) throws IOException,InterruptedException {
super.setup(context);
try {
conf = context.getConfiguration();
Plan.conf = conf;
Config.read(Plan.conf);
Tree code = Tree.parse(conf.get("mrql.reducer"));
reduce_fnc = functional_argument(conf,code);
code = Tree.parse(conf.get("mrql.mapper"));
map_fnc = functional_argument(conf,code);
if (conf.get("mrql.zero") != null) {
code = Tree.parse(conf.get("mrql.zero"));
result = Interpreter.evalE(code);
code = Tree.parse(conf.get("mrql.accumulator"));
acc_fnc = functional_argument(conf,code);
} else result = null;
counter = conf.get("mrql.counter");
uris = DistributedCache.getCacheFiles(conf);
local_paths = DistributedCache.getLocalCacheFiles(conf);
index = 0;
} catch (Exception e) {
throw new Error("Cannot setup the crossProduct: "+e);
}
}
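/** Join the last, partially filled outer block with the cached right input and,
 *  when aggregating, emit the final aggregation result */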
@Override
protected void cleanup ( Context context ) throws IOException,InterruptedException {
if (index > 0)
try {
for ( MRData y: cached_data(context.getConfiguration()) ) {
pair.set(1,y);
for ( MRData z: outer ) {
pair.set(0,z);
for ( MRData v: (Bag)reduce_fnc.eval(pair) )
write(last_key,v,context);
}
};
} catch (Exception e) {
throw new Error("Cannot cleanup the crossProduct: "+e);
};
index = 0;
outer.clear();
if (result != null) // emit the result of aggregation
context.write(new MRContainer(new MR_int(0)),new MRContainer(result));
super.cleanup(context);
}
}
/** The CrossProduct physical operator (similar to a block nested-loop join)
* @param mx left mapper
* @param my right mapper
* @param reduce_fnc reducer
* @param acc_fnc optional accumulator function
* @param zero the optional zero value for the accumulator
* @param X the left source
* @param Y the right source (stored in distributed cache)
* @param stop_counter optional counter used in repeat operation
* @return a new data source that contains the result
*/
public final static DataSet crossProduct ( Tree mx, // left mapper
Tree my, // right mapper
Tree reduce_fnc, // reducer
Tree acc_fnc, // optional accumulator function
Tree zero, // the optional zero value for the accumulator
DataSet X, // the left source
DataSet Y, // the right source (stored in distributed cache)
String stop_counter ) // optional counter used in repeat operation
throws Exception {
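// apply the right mapper my to Y and materialize the result, so that its files
// can be shipped to every map task through the distributed cache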
DataSet ds = MapOperation.cMap(my,null,null,Y,"-");
conf = MapReduceEvaluator.clear_configuration(conf);
String newpath = new_path(conf);
conf.set("mrql.reducer",reduce_fnc.toString());
conf.set("mrql.mapper",mx.toString());
if (zero != null) {
conf.set("mrql.accumulator",acc_fnc.toString());
conf.set("mrql.zero",zero.toString());
} else conf.set("mrql.zero","");
conf.set("mrql.counter",stop_counter);
setupSplits(new DataSet[]{X,Y},conf);
Job job = new Job(conf,newpath);
distribute_compiled_arguments(job.getConfiguration());
job.setJarByClass(MapReducePlan.class);
job.setOutputKeyClass(MRContainer.class);
job.setOutputValueClass(MRContainer.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
PathFilter pf = new PathFilter () { public boolean accept ( Path path ) {
return !path.getName().startsWith("_");
} };
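// add every data file of the materialized right input to the distributed cache,
// skipping bookkeeping files whose names start with "_" (e.g. _SUCCESS)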
for (DataSource p: ds.source) {
Path path = new Path(p.path);
for ( FileStatus s: path.getFileSystem(conf).listStatus(path,pf) )
DistributedCache.addCacheFile(s.getPath().toUri(),job.getConfiguration());
};
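// only the left input X is read by the mappers; the cross product is computed
// entirely in the map phase, so the job runs with no reducers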
for (DataSource p: X.source)
MultipleInputs.addInputPath(job,new Path(p.path),(Class<? extends MapReduceMRQLFileInputFormat>)p.inputFormat,crossProductMapper.class);
FileOutputFormat.setOutputPath(job,new Path(newpath));
job.setNumReduceTasks(0);
job.waitForCompletion(true);
long c = (stop_counter.equals("-")) ? 0
: job.getCounters().findCounter("mrql",stop_counter).getValue();
return new DataSet(new BinaryDataSource(newpath,conf),c,outputRecords(job));
}
}