/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.impl.util.UriUtil;
/**
* Class that estimates the number of reducers based on input size.
* The number of reducers is determined by two properties:
* <ul>
* <li>pig.exec.reducers.bytes.per.reducer -
* how many bytes of input per reducer (default is 1000*1000*1000)</li>
* <li>pig.exec.reducers.max -
* constrain the maximum number of reducer tasks (default is 999)</li>
* </ul>
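* <p>
* Both properties can typically be overridden from a Pig script with the
* <code>set</code> command; the values below are illustrative only:
* <pre>
* set pig.exec.reducers.bytes.per.reducer 536870912;
* set pig.exec.reducers.max 200;
* </pre>
* <p>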
* If the loader implements LoadMetadata the reported input size is used;
* otherwise Pig attempts to determine the size from the filesystem.
* <p>
* e.g. if the following is your Pig script
* <pre>
* a = load '/data/a';
* b = load '/data/b';
* c = join a by $0, b by $0;
* store c into '/tmp';
* </pre>
* and the size of /data/a is 1000*1000*1000 and the size of /data/b is
* 2*1000*1000*1000, then the estimated number of reducers will be
* (1000*1000*1000+2*1000*1000*1000)/(1000*1000*1000)=3
*
*/
public class InputSizeReducerEstimator implements PigReducerEstimator {
private static final Log log = LogFactory.getLog(InputSizeReducerEstimator.class);
/**
* Determines the number of reducers to be used.
*
* @param job job instance
* @param mapReduceOper map-reduce operator whose map plan contains the loads to be sized
* @return the estimated number of reducers, or -1 if the total input size could not be determined
* @throws java.io.IOException if the input size cannot be read from the filesystem
*/
@Override
public int estimateNumberOfReducers(Job job, MapReduceOper mapReduceOper) throws IOException {
Configuration conf = job.getConfiguration();
long bytesPerReducer = conf.getLong(BYTES_PER_REDUCER_PARAM, DEFAULT_BYTES_PER_REDUCER);
int maxReducers = conf.getInt(MAX_REDUCER_COUNT_PARAM, DEFAULT_MAX_REDUCER_COUNT_PARAM);
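// the estimate is driven by the combined size of every input loaded in the map plan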
List<POLoad> poLoads = PlanHelper.getPhysicalOperators(mapReduceOper.mapPlan, POLoad.class);
long totalInputFileSize = getTotalInputFileSize(conf, poLoads, job);
log.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+ maxReducers + " totalInputFileSize=" + totalInputFileSize);
// if totalInputFileSize == -1, we couldn't get the input size so we can't estimate.
if (totalInputFileSize == -1) { return -1; }
int reducers = (int)Math.ceil((double)totalInputFileSize / bytesPerReducer);
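// clamp the estimate to the [1, maxReducers] range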
reducers = Math.max(1, reducers);
reducers = Math.min(maxReducers, reducers);
return reducers;
}
public static long getTotalInputFileSize(Configuration conf,
List<POLoad> lds, Job job) throws IOException {
return getTotalInputFileSize(conf, lds, job, Long.MAX_VALUE);
}
/**
* Get the input size for as many inputs as possible. Inputs that do not report
* their size and whose size Pig cannot look up itself are excluded from this total.
*
* @param conf Configuration
* @param lds List of POLoads
* @param job Job
* @param max Maximum total input size that triggers an early exit. Often we are
* only interested in whether the total input size is greater than some
* threshold; in that case the summation can stop as soon as the max is reached.
* @return total input size in bytes of the inputs whose size could be determined
* @throws IOException
*/
static long getTotalInputFileSize(Configuration conf,
List<POLoad> lds, Job job, long max) throws IOException {
long totalInputFileSize = 0;
for (POLoad ld : lds) {
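// prefer the size reported by the loader itself (via LoadMetadata), if available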
long size = getInputSizeFromLoader(ld, job);
if (size > -1) {
totalInputFileSize += size;
continue;
} else {
// the input file location might be a list of comma separated files,
// separate them out
for (String location : LoadFunc.getPathStrings(ld.getLFile().getFileName())) {
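// only HDFS, local, and S3N locations can be sized directly through the filesystem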
if (UriUtil.isHDFSFileOrLocalOrS3N(location, conf)) {
Path path = new Path(location);
FileSystem fs = path.getFileSystem(conf);
FileStatus[] status = fs.globStatus(path);
if (status != null) {
for (FileStatus s : status) {
totalInputFileSize += MapRedUtil.getPathLength(fs, s, max);
if (totalInputFileSize > max) {
break;
}
}
} else {
// If file is not found, we should report -1
return -1;
}
} else {
// If we cannot estimate size of a location, we should report -1
return -1;
}
}
}
}
return totalInputFileSize;
}
/**
* Get the input size in bytes for a single load by looking at the statistics
* provided by loaders that implement {@link LoadMetadata}.
* @param ld load operator whose input size is wanted
* @param job job instance
* @return input size in bytes, or -1 if unknown or incomplete
* @throws IOException on error
*/
static long getInputSizeFromLoader(POLoad ld, Job job) throws IOException {
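// statistics are only available when the loader implements LoadMetadata and the input location is known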
if (ld.getLoadFunc() == null
|| !(ld.getLoadFunc() instanceof LoadMetadata)
|| ld.getLFile() == null
|| ld.getLFile().getFileName() == null) {
return -1;
}
ResourceStatistics statistics;
try {
statistics = ((LoadMetadata) ld.getLoadFunc())
.getStatistics(ld.getLFile().getFileName(), job);
} catch (Exception e) {
log.warn("Couldn't get statistics from LoadFunc: " + ld.getLoadFunc(), e);
return -1;
}
if (statistics == null || statistics.getSizeInBytes() == null) {
return -1;
}
return statistics.getSizeInBytes();
}
}