/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import static org.apache.pig.PigConfiguration.PIG_UDF_PROFILE;
import static org.apache.pig.PigConfiguration.PIG_UDF_PROFILE_FREQUENCY;
import static org.apache.pig.PigConstants.TIME_UDFS_ELAPSED_TIME_COUNTER;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LoadFuncDecorator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.apache.pig.tools.pigstats.PigStatusReporter;
/**
* A wrapper around the actual RecordReader and the LoadFunc - it is needed
* for two reasons:
* 1) To intercept the initialize() call from Hadoop and initialize the
* underlying actual RecordReader with the right Context object - this is
* achieved by looking up the Context corresponding to the input split this
* reader is supposed to process.
* 2) To give Hadoop consistent key-value types - {@link Text} and
* {@link Tuple} respectively. PigRecordReader calls the underlying loader's
* getNext() to get the Tuple value; the key is always null, since the key
* is not used as input to map() in Pig.
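* <p>
* For illustration only - a minimal sketch of how a Hadoop map task
* typically drives this reader (the setup around it is assumed, not part
* of this class):
* <pre>{@code
* reader.initialize(split, context);
* while (reader.nextKeyValue()) {
*     Text key = reader.getCurrentKey();      // always null in Pig
*     Tuple value = reader.getCurrentValue(); // tuple produced by the LoadFunc
*     // map(key, value) ...
* }
* reader.close();
* }</pre>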
*/
public class PigRecordReader extends RecordReader<Text, Tuple> {
private static final Log LOG = LogFactory.getLog(PigRecordReader.class);
private transient String counterGroup = "";
private long timingFrequency = 100L;
private boolean doTiming = false;
/**
* the current Tuple value as returned by underlying
* {@link LoadFunc#getNext()}
*/
Tuple curValue = null;
// the current wrapped RecordReader used by the loader
private RecordReader<?, ?> curReader;
// the loader object
private LoadFunc loadfunc;
// the LoadFuncDecorator
private LoadFuncDecorator decorator;
// the Hadoop counter name
private transient String counterName = null;
// the wrapped inputformat
private InputFormat<?, ?> inputformat;
// the PigSplit wrapping the underlying input splits
private PigSplit pigSplit;
// index of the wrapped split currently in use
private int idx;
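// bytes of the wrapped splits that have been fully processed, used for
// progress reporting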
private long progress;
private TaskAttemptContext context;
private PigStatusReporter reporter;
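// the maximum number of records to return; -1 means no limit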
private final long limit;
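// the number of records returned so far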
private long recordCount = 0;
/**
* The Configuration object with data specific to the input the underlying
* RecordReader will process (this is obtained after a
* {@link LoadFunc#setLocation(String, org.apache.hadoop.mapreduce.Job)}
* call and hence can contain specific properties the underlying
* {@link InputFormat} might have put in).
*/
private Configuration inputSpecificConf;
/**
* @param inputformat the wrapped InputFormat used to create record readers
* for the wrapped splits
* @param pigSplit the PigSplit wrapping the input splits to process
* @param decorator the LoadFuncDecorator wrapping the LoadFunc for this input
* @param context the TaskAttemptContext of the current task
* @param limit the maximum number of records to return, or -1 for no limit
* @throws IOException
* @throws InterruptedException
*/
public PigRecordReader(InputFormat<?, ?> inputformat, PigSplit pigSplit,
LoadFuncDecorator decorator, TaskAttemptContext context, long limit) throws IOException, InterruptedException {
this.inputformat = inputformat;
this.pigSplit = pigSplit;
this.decorator = decorator;
this.loadfunc = decorator.getLoader();
this.context = context;
this.reporter = PigStatusReporter.getInstance();
this.inputSpecificConf = context.getConfiguration();
curReader = null;
progress = 0;
idx = 0;
this.limit = limit;
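// open the reader for the first wrapped split; it is initialized later,
// when Hadoop invokes initialize() on this wrapper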
initNextRecordReader();
doTiming = inputSpecificConf.getBoolean(PIG_UDF_PROFILE, false);
if (doTiming) {
counterGroup = loadfunc.toString();
timingFrequency = inputSpecificConf.getLong(PIG_UDF_PROFILE_FREQUENCY, 100L);
}
}
@Override
public void close() throws IOException {
if (curReader != null) {
curReader.close();
curReader = null;
}
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
// In Pig the key is not used in the input to map() - so return
// null
return null;
}
@Override
public Tuple getCurrentValue() throws IOException, InterruptedException {
// Increment the multi-input record counter
if (counterName != null && curValue != null) {
reporter.incrCounter(PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP, counterName, 1);
}
return curValue;
}
@Override
public float getProgress() throws IOException, InterruptedException {
long subprogress = 0; // bytes processed in the current wrapped split
if (null != curReader) {
// idx is always one past the current subsplit's true index.
subprogress = (long)(curReader.getProgress() * pigSplit.getLength(idx - 1));
}
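// overall progress = bytes of fully-processed wrapped splits plus the
// estimated bytes of the current one, clamped to [0, 1]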
return Math.max(0.0f, Math.min(1.0f, (progress + subprogress)/(float)(pigSplit.getLength())));
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
// initialize the underlying actual RecordReader with the right Context
// object - this is achieved by merging the input-specific configuration
// captured at construction time into the configuration of the context
// passed in.
this.pigSplit = (PigSplit)split;
this.context = context;
ConfigurationUtil.mergeConf(context.getConfiguration(),
inputSpecificConf);
// Pass loader signature to LoadFunc and to InputFormat through
// the conf
PigInputFormat.passLoadSignature(loadfunc, pigSplit.getInputIndex(),
context.getConfiguration());
// now invoke initialize() on underlying RecordReader with
// the "adjusted" conf
if (null != curReader) {
curReader.initialize(pigSplit.getWrappedSplit(), context);
loadfunc.prepareToRead(curReader, pigSplit);
}
if (pigSplit.isMultiInputs() && !pigSplit.disableCounter()) {
counterName = getMultiInputsCounterName(pigSplit, inputSpecificConf);
if (counterName != null) {
// Create the counter. This is needed because incrCounter() may
// never be called in case of empty file.
reporter.incrCounter(PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP, counterName, 0);
}
}
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
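// a limit of -1 means there is no cap on the number of records returned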
if (limit != -1 && recordCount >= limit) {
return false;
}
boolean timeThis = doTiming && ((recordCount + 1) % timingFrequency == 0);
long startNanos = 0;
if (timeThis) {
startNanos = System.nanoTime();
}
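// keep advancing through the wrapped splits until the loader returns a
// record or all splits are exhausted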
while ((curReader == null) || (curValue = decorator.getNext()) == null) {
if (!initNextRecordReader()) {
return false;
}
}
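// only every timingFrequency-th record is timed, so the sampled elapsed
// time (converted to microseconds) is scaled up to approximate the total
// time spent reading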
if (timeThis) {
reporter.incrCounter(counterGroup, TIME_UDFS_ELAPSED_TIME_COUNTER,
(System.nanoTime() - startNanos) / 1000 * timingFrequency);
}
recordCount++;
return true;
}
@SuppressWarnings("unchecked")
private static String getMultiInputsCounterName(PigSplit pigSplit,
Configuration conf) throws IOException {
ArrayList<POLoad> inputs =
(ArrayList<POLoad>) ObjectSerializer.deserialize(
conf.get(PigInputFormat.PIG_LOADS));
String fname = inputs.get(pigSplit.getInputIndex()).getLFile().getFileName();
return PigStatsUtil.getMultiInputsCounterName(fname, pigSplit.getInputIndex());
}
/**
* Get the record reader for the next chunk in this PigSplit.
*/
protected boolean initNextRecordReader() throws IOException, InterruptedException {
if (curReader != null) {
curReader.close();
curReader = null;
if (idx > 0) {
progress += pigSplit.getLength(idx-1); // done processing so far
}
context.progress();
}
// if all chunks have been processed, nothing more to do.
if (idx == pigSplit.getNumPaths()) {
return false;
}
// get a record reader for the idx-th chunk
try {
pigSplit.setCurrentIdx(idx);
curReader = inputformat.createRecordReader(pigSplit.getWrappedSplit(), context);
LOG.info("Current split being processed "+pigSplit.getWrappedSplit());
if (idx > 0) {
// initialize() for the first RecordReader will be called by MapTask;
// we're responsible for initializing subsequent RecordReaders.
curReader.initialize(pigSplit.getWrappedSplit(), context);
loadfunc.prepareToRead(curReader, pigSplit);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
idx++;
return true;
}
}