/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import org.apache.mrql.gen.*;
import java.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
/** A superclass for the Hadoop MapReduce physical operators (defined in the *Operator.java files) */
public class MapReducePlan extends Plan {
MapReducePlan () {}
/** Find the number of records in the output of a Hadoop MapReduce job
 * (falls back to the map output records when the job is map-only) */
public final static long outputRecords ( Job job ) throws Exception {
CounterGroup cg = job.getCounters().getGroup("org.apache.hadoop.mapred.Task$Counter");
long rc = cg.findCounter("REDUCE_OUTPUT_RECORDS").getValue();
if (rc == 0)
return cg.findCounter("MAP_OUTPUT_RECORDS").getValue();
return rc;
}
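/** A usage sketch (hypothetical helper, not part of the original class):
 * run a job to completion and report how many records it produced. */
private static long runAndCountRecords ( Job job ) throws Exception {
    job.waitForCompletion(false);   // submit the job and block until it finishes
    return outputRecords(job);      // reduce output records, or map output records for map-only jobs
}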
/** Set the Hadoop min and max input split size based on the number of requested worker nodes */
public static void setupSplits ( DataSet[] dsv, Configuration conf ) throws IOException {
int len = 0;
for ( DataSet ds: dsv )
len += ds.source.size();
long[] sizes = new long[len];
int i = 0;
for ( DataSet ds: dsv )
for ( DataSource s: ds.source ) {
sizes[i] = s.size(conf);
i++;
};
long total_size = 0;
for ( long size: sizes )
total_size += size;
long split_size = Math.max(total_size/Config.nodes,1024L);  // initial guess: divide the input evenly, at least 1KB per split
int tasks = 0;
int c = 0;
do { // grow split_size (by 1% per round, for a bounded number of rounds) until at most Config.nodes map tasks are generated
tasks = 0;
for ( long size: sizes )
tasks += (int)Math.ceil(size/(double)split_size);
if (tasks > Config.nodes)
split_size = (long)Math.ceil((double)split_size*1.01);
} while (c++ < 20 && tasks > Config.nodes);
conf.setLong("mapred.min.split.size",split_size);
conf.setLong("mapred.max.split.size",split_size);
}
/** Same as above, but for a single input DataSet */
public static void setupSplits ( DataSet ds, Configuration conf ) throws IOException {
setupSplits(new DataSet[]{ds},conf);
}
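/** A usage sketch (hypothetical helper, not part of the original class):
 * tune the input splits of a job before submission, so that roughly
 * Config.nodes map tasks are generated for the total input size. */
private static boolean runWithTunedSplits ( DataSet ds, Job job ) throws Exception {
    setupSplits(ds,job.getConfiguration());  // sets mapred.min/max.split.size on the job configuration
    return job.waitForCompletion(false);     // submit and wait; true if the job succeeded
}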
/** The Aggregate physical operator
* @param acc_fnc the accumulator function from (T,T) to T
* @param zero the zero element of type T
* @param S the dataset that contains the bag of values {T}
* @return the aggregation result of type T
*/
public final static MRData aggregate ( final Tree acc_fnc,
final Tree zero,
final DataSet S ) throws Exception {
MRData res = Interpreter.evalE(zero);
Function accumulator = functional_argument(Plan.conf,acc_fnc);
Tuple pair = new Tuple(2);
for ( DataSource s: S.source )
if (s.inputFormat != MapReduceBinaryInputFormat.class) { // non-binary source: aggregate it through an identity cMap and fold in the partial result
pair.set(0,res);
pair.set(1,aggregate(acc_fnc,zero,
MapOperation.cMap(Interpreter.identity_mapper,acc_fnc,zero,
new DataSet(s,0,0),"-")));
res = accumulator.eval(pair);
} else { // binary source: scan its SequenceFiles directly and fold each value into the result
Path path = new Path(s.path);
final FileSystem fs = path.getFileSystem(conf);
final FileStatus[] ds
= fs.listStatus(path,
new PathFilter () {
public boolean accept ( Path path ) {
return !path.getName().startsWith("_"); // skip _SUCCESS, _logs, and other hidden job files
}
});
MRContainer key = new MRContainer(new MR_int(0));
MRContainer value = new MRContainer(new MR_int(0));
for ( int i = 0; i < ds.length; i++ ) {
SequenceFile.Reader reader = new SequenceFile.Reader(fs,ds[i].getPath(),conf);
while (reader.next(key,value)) {
pair.set(0,res);
pair.set(1,value.data());
res = accumulator.eval(pair);
};
reader.close();
}
};
return res;
}
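/** A minimal sketch (hypothetical helper, not part of the original class) of the
 * fold pattern that aggregate applies to each stored value: thread the
 * accumulated value through a sequence of elements using the accumulator. */
private static MRData fold ( Function accumulator, MRData zero, Iterable<MRData> values ) {
    MRData res = zero;
    Tuple pair = new Tuple(2);
    for ( MRData v: values ) {
        pair.set(0,res);               // the value accumulated so far
        pair.set(1,v);                 // the next element of the bag
        res = accumulator.eval(pair);  // combine them into the new accumulated value
    };
    return res;
}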
/** The repeat physical operator. It repeats the loop until all values satisfy
 * the termination condition (that is, until dataset.counter == 0) or until
 * max_num iterations have been performed
* @param loop the function from DataSet to DataSet to be repeated
* @param init the initial input DataSet for the loop
* @param max_num max number of repetitions
* @return the resulting DataSet from the loop
*/
public final static DataSet repeat ( Function loop, DataSet init, int max_num ) {
MR_dataset s = new MR_dataset(init);
int i = 0;
do {
s.dataset = ((MR_dataset)loop.eval(s)).dataset;
i++;
if (!Config.testing)
System.err.println("Repeat #"+i+": "+s.dataset.counter+" true results");
} while (s.dataset.counter != 0 && i < max_num);
return s.dataset;
}
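/** A usage sketch (hypothetical helper, not part of the original class): drive
 * repeat with a one-step Function, assumed to map an MR_dataset to an
 * MR_dataset and to set dataset.counter to the number of changed tuples. */
private static DataSet fixpoint ( final Function one_step, DataSet init, int max_num ) {
    return repeat(new Function() {
                      public MRData eval ( MRData x ) {
                          DataSet current = ((MR_dataset)x).dataset;  // unwrap the current loop dataset
                          return (MR_dataset)one_step.eval(new MR_dataset(current));  // one loop iteration
                      }
                  },init,max_num);
}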
/** The closure physical operator. It repeats the loop until the DataSet
 * stops growing (its number of records does not increase) or until
 * max_num iterations have been performed
* @param loop the function from DataSet to DataSet to be repeated
* @param init the initial input DataSet for the loop
* @param max_num max number of repetitions
* @return the resulting DataSet from the loop
*/
public final static DataSet closure ( Function loop, DataSet init, int max_num ) {
MR_dataset s = new MR_dataset(init);
int i = 0;
long n = 0;
long old = 0;
do {
s.dataset = ((MR_dataset)loop.eval(s)).dataset;
i++;
if (!Config.testing)
System.err.println("Repeat #"+i+": "+(s.dataset.records-n)+" new records");
old = n;
n = s.dataset.records;
} while (old < n && i < max_num);
return s.dataset;
}
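/** A usage sketch (hypothetical helper, not part of the original class): a
 * transitive-closure style computation, where extend_paths is assumed to be a
 * monotone step (e.g., join the current paths with the edge set and union the
 * results); closure stops when the path count stops growing. */
private static DataSet transitiveClosure ( Function extend_paths, DataSet edges ) {
    return closure(extend_paths,edges,20);  // at most 20 rounds
}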
}