/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.pig.PigConfiguration;
import org.apache.pig.data.WritableByteArray;
import org.apache.pig.impl.plan.OperatorKey;
/**
* The main split class that maintains important
* information about the input split.
*
* The reason this class implements Configurable is so that Hadoop will call
* {@link Configurable#setConf(Configuration)} on the backend so we can use
* the Configuration to create the SerializationFactory to deserialize the
* wrapped InputSplit.
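*
* <p>A minimal round-trip sketch of that contract (illustrative only: in a
* real job PigInputFormat and Hadoop drive these calls, and {@code splits},
* {@code targetOps} and {@code conf} are assumed to already exist):
* <pre>{@code
* PigSplit split = new PigSplit(splits, 0, targetOps, 0);
* split.setConf(conf);                 // front-end: needed before write()
* org.apache.hadoop.io.DataOutputBuffer out = new org.apache.hadoop.io.DataOutputBuffer();
* split.write(out);
*
* PigSplit copy = new PigSplit();
* copy.setConf(conf);                  // back-end: Hadoop calls setConf first
* org.apache.hadoop.io.DataInputBuffer in = new org.apache.hadoop.io.DataInputBuffer();
* in.reset(out.getData(), out.getLength());
* copy.readFields(in);
* }</pre>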
*/
public class PigSplit extends InputSplit implements Writable, Configurable {
private static final String FILESPLIT_CLASSNAME = FileSplit.class.getName();
//The operators to which the tuples from this
//input file are attached. These are the successors
//of the load operator representing this input
private ArrayList<OperatorKey> targetOps;
// index starting from 0 representing the input number
// So if we have 3 inputs (say for a 3 way join), then the
// splits corresponding to the first input will have an index of 0, those
// corresponding to the second will have an index of 1 and so on
// This will be used to get the LoadFunc corresponding to the input
// in PigInputFormat and related code.
private int inputIndex;
// The real InputSplit this split is wrapping
private InputSplit[] wrappedSplits;
// index of the wrappedSplit in the list of splits returned by
// InputFormat.getSplits()
// This will be used by MergeJoinIndexer to record the split # in the
// index
private int splitIndex;
// index of the current split being processed
private int currentIdx;
// the flag indicates this map has multiple inputs (e.g. a join)
// so that custom Hadoop counters will be created in the
// back-end to track the number of records for each input.
private boolean isMultiInputs = false;
// the flag indicates the custom Hadoop counters should be disabled.
// This prevents the number of counters from exceeding the limit.
// The flag is controlled by the Pig property "pig.disable.counter"
// (the default value is 'false').
private boolean disableCounter = false;
/**
* the job Configuration
*/
private Configuration conf;
/**
* total number of splits - required by skew join
*/
private int totalSplits;
/**
* total length
*/
private long length = -1;
/**
* overall locations
*/
String[] locations = null;
/**
* overall splitLocationInfos
*/
SplitLocationInfo[] splitLocationInfos = null;
// this no-arg constructor seems necessary for Hadoop to instantiate
// this split on the backend
public PigSplit() {}
public PigSplit(InputSplit[] wrappedSplits, int inputIndex,
List<OperatorKey> targetOps, int splitIndex) {
this.wrappedSplits = wrappedSplits;
this.inputIndex = inputIndex;
this.targetOps = new ArrayList<OperatorKey>(targetOps);
this.splitIndex = splitIndex;
this.currentIdx = 0;
}
public List<OperatorKey> getTargetOps() {
return new ArrayList<OperatorKey>(targetOps);
}
/**
* This method returns the actual InputSplit (as returned by the
* {@link InputFormat}) which this class is wrapping.
* @return the wrappedSplit
*/
public InputSplit getWrappedSplit() {
return wrappedSplits[currentIdx];
}
/**
*
* @param idx the index into the wrapped splits
* @return the specified wrapped split
*/
public InputSplit getWrappedSplit(int idx) {
return wrappedSplits[idx];
}
@Override
@SuppressWarnings("unchecked")
public String[] getLocations() throws IOException, InterruptedException {
if (locations == null) {
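// weight each host by the total number of bytes of this split's data
// it serves, then keep only the largest hosts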
HashMap<String, Long> locMap = new HashMap<String, Long>();
Long lenInMap;
for (InputSplit split : wrappedSplits)
{
String[] locs = split.getLocations();
for (String loc : locs)
{
if ((lenInMap = locMap.get(loc)) == null)
locMap.put(loc, split.getLength());
else
locMap.put(loc, lenInMap + split.getLength());
}
}
Set<Map.Entry<String, Long>> entrySet = locMap.entrySet();
Map.Entry<String, Long>[] hostSize =
entrySet.toArray(new Map.Entry[entrySet.size()]);
Arrays.sort(hostSize, new Comparator<Map.Entry<String, Long>>() {
@Override
public int compare(Entry<String, Long> o1, Entry<String, Long> o2) {
// sort by total length, descending; Long.compare avoids the
// overflow that plain subtraction could hit
return Long.compare(o2.getValue(), o1.getValue());
}
});
// keep at most 5 locations in the list; see PIG-1648 for details
int nHost = Math.min(hostSize.length, 5);
locations = new String[nHost];
for (int i = 0; i < nHost; ++i) {
locations[i] = hostSize[i].getKey();
}
}
return locations;
}
@Override
public SplitLocationInfo[] getLocationInfo() throws IOException {
if (splitLocationInfos == null) {
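// same byte-weighted host ranking as getLocations(), but preserving
// the richer SplitLocationInfo (e.g. in-memory vs. on-disk replicas)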
HashMap<SplitLocationInfo, Long> locMap = new HashMap<SplitLocationInfo, Long>();
Long lenInMap;
for (InputSplit split : wrappedSplits) {
SplitLocationInfo[] locs = split.getLocationInfo();
if( locs != null) {
for (SplitLocationInfo loc : locs) {
try {
if ((lenInMap = locMap.get(loc)) == null)
locMap.put(loc, split.getLength());
else
locMap.put(loc, lenInMap + split.getLength());
} catch (InterruptedException e) {
throw new IOException("InputSplit.getLength throws exception: ", e);
}
}
}
}
Set<Map.Entry<SplitLocationInfo, Long>> entrySet = locMap.entrySet();
Map.Entry<SplitLocationInfo, Long>[] hostSize =
entrySet.toArray(new Map.Entry[entrySet.size()]);
Arrays.sort(hostSize, new Comparator<Map.Entry<SplitLocationInfo, Long>>() {
@Override
public int compare(Entry<SplitLocationInfo, Long> o1, Entry<SplitLocationInfo, Long> o2) {
// same descending-by-length ordering as in getLocations()
return Long.compare(o2.getValue(), o1.getValue());
}
});
// keep at most 5 locations in the list; see PIG-1648 for details
int nHost = Math.min(hostSize.length, 5);
splitLocationInfos = new SplitLocationInfo[nHost];
for (int i = 0; i < nHost; ++i) {
splitLocationInfos[i] = hostSize[i].getKey();
}
}
return splitLocationInfos;
}
@Override
public long getLength() throws IOException, InterruptedException {
if (length == -1) {
length = 0;
for (int i = 0; i < wrappedSplits.length; i++)
length += wrappedSplits[i].getLength();
}
return length;
}
/**
* Returns the length of the specified wrapped split.
* @param idx the index into the wrapped splits
* @return the length of the wrapped split at idx
*/
public long getLength(int idx) throws IOException, InterruptedException {
return wrappedSplits[idx].getLength();
}
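/*
* Wire format, kept in sync with write(DataOutput) below:
*   boolean disableCounter, boolean isMultiInputs,
*   int totalSplits, int splitIndex, int inputIndex,
*   length-prefixed Java-serialized targetOps,
*   int number of wrapped splits, int number of distinct split classes,
*   each distinct split class name once (writeUTF),
*   then per split: an int index into the class-name list followed by
*   the split serialized via Hadoop's SerializationFactory. When
*   compression is enabled, the per-split records are framed as a
*   single length-prefixed, deflate-compressed byte block.
*/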
@Override
@SuppressWarnings("unchecked")
public void readFields(DataInput is) throws IOException {
disableCounter = is.readBoolean();
isMultiInputs = is.readBoolean();
totalSplits = is.readInt();
splitIndex = is.readInt();
inputIndex = is.readInt();
targetOps = (ArrayList<OperatorKey>) readObject(is);
int splitLen = is.readInt();
int distinctSplitClassCount = is.readInt();
boolean nonFileSplit = false;
//construct the input split class name list
String[] distinctSplitClassName = new String[distinctSplitClassCount];
for (int i = 0; i < distinctSplitClassCount; i++) {
distinctSplitClassName[i] = is.readUTF();
if (!distinctSplitClassName[i].equals(FILESPLIT_CLASSNAME)) {
nonFileSplit = true;
}
}
try {
SerializationFactory sf = new SerializationFactory(conf);
// The contract for Deserializer is open, then deserialize; we must not
// close it, as that would also close the underlying input stream
wrappedSplits = new InputSplit[splitLen];
if (splitLen <= 0) {
return;
}
// Do not compress if every split is a FileSplit: FileSplits do not
// compress much, and deflating them adds a few seconds for 30K+ tasks
boolean compress = nonFileSplit && conf.getBoolean(
PigConfiguration.PIG_COMPRESS_INPUT_SPLITS,
PigConfiguration.PIG_COMPRESS_INPUT_SPLITS_DEFAULT);
DataInputStream dis = null;
if (compress) {
int numBytes = is.readInt();
byte[] buf = new byte[numBytes];
is.readFully(buf, 0, numBytes);
dis = new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(buf)));
}
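// from here on, read either from the inflated buffer or directly
// from the task's input stream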
DataInput dataIn = compress ? dis : is;
for (int i = 0; i < splitLen; i++)
{
//read the className index
int index = dataIn.readInt();
//get the split class name
String splitClassName = distinctSplitClassName[index];
Class splitClass = conf.getClassByName(splitClassName);
Deserializer d = sf.getDeserializer(splitClass);
d.open((InputStream) dataIn);
wrappedSplits[i] = (InputSplit)ReflectionUtils.newInstance(splitClass, conf);
d.deserialize(wrappedSplits[i]);
}
if (compress && splitLen > 0) {
dis.close();
}
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
}
@Override
@SuppressWarnings("unchecked")
public void write(DataOutput os) throws IOException {
os.writeBoolean(disableCounter);
os.writeBoolean(isMultiInputs);
os.writeInt(totalSplits);
os.writeInt(splitIndex);
os.writeInt(inputIndex);
writeObject(targetOps, os);
os.writeInt(wrappedSplits.length);
Set<String> splitClassNameSet = new HashSet<String>();
//first get the distinct split class name set
for ( int i= 0; i < wrappedSplits.length; i++) {
splitClassNameSet.add(wrappedSplits[i].getClass().getName());
}
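// writing each distinct class name once and referring to it by index
// below keeps the serialized size small when thousands of splits
// share the same class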
List<String> distinctSplitClassList = new ArrayList<String>();
distinctSplitClassList.addAll(splitClassNameSet);
boolean nonFileSplit = distinctSplitClassList.size() > 1 || (!distinctSplitClassList.contains(FILESPLIT_CLASSNAME));
//write the number of distinct split class names
os.writeInt(distinctSplitClassList.size());
//write each classname once
for (int i = 0 ; i < distinctSplitClassList.size(); i++) {
os.writeUTF(distinctSplitClassList.get(i));
}
SerializationFactory sf = new SerializationFactory(conf);
if (wrappedSplits.length <= 0) {
return;
}
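// mirror readFields(): only compress when at least one split is not a
// plain FileSplit, since FileSplits barely shrink under deflate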
boolean compress = nonFileSplit && conf.getBoolean(
PigConfiguration.PIG_COMPRESS_INPUT_SPLITS,
PigConfiguration.PIG_COMPRESS_INPUT_SPLITS_DEFAULT);
WritableByteArray byteStream = null;
Deflater deflater = null;
DataOutputStream dos = null;
if (compress) {
byteStream = new WritableByteArray(16384);
deflater = new Deflater(Deflater.BEST_COMPRESSION);
dos = new DataOutputStream(new DeflaterOutputStream(byteStream, deflater));
}
DataOutput dataOut = compress ? dos : os;
for (int i = 0; i < wrappedSplits.length; i++)
{
//find out the index of the split class name
int index = distinctSplitClassList.indexOf(wrappedSplits[i].getClass().getName());
dataOut.writeInt(index);
Serializer s = sf.getSerializer(wrappedSplits[i].getClass());
//fail fast with a clear message when no Serializer exists for this split class
if (s == null) {
throw new IllegalArgumentException("Could not find Serializer for class "+wrappedSplits[i].getClass()+". InputSplits must implement Writable.");
}
// The contract for Serializer is open, then serialize; we must not
// close it, as that would also close the underlying output stream
s.open((OutputStream) dataOut);
s.serialize(wrappedSplits[i]);
}
if (compress) {
//Get the compressed serialized bytes and write them
dos.close();
os.writeInt(byteStream.getLength());
os.write(byteStream.getData(), 0, byteStream.getLength());
deflater.end();
}
}
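/**
* Writes obj as a length-prefixed, Java-serialized byte block so it can
* be framed inside the surrounding Writable stream.
*/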
private void writeObject(Serializable obj, DataOutput os)
throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(obj);
oos.flush();
byte[] bytes = baos.toByteArray();
os.writeInt(bytes.length);
os.write(bytes);
}
private Object readObject(DataInput is) throws IOException {
byte[] bytes = new byte[is.readInt()];
is.readFully(bytes);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
bytes));
try {
return ois.readObject();
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
}
public int getSplitIndex() {
return splitIndex;
}
/**
* Indicates this map has multiple inputs (such as the result of
* a join operation).
* @param b true if the map has multiple inputs
*/
public void setMultiInputs(boolean b) {
isMultiInputs = b;
}
/**
* Returns true if the map has multiple inputs, else false
* @return true if the map has multiple inputs, else false
*/
public boolean isMultiInputs() {
return isMultiInputs;
}
@Override
public Configuration getConf() {
return conf;
}
/** (non-Javadoc)
* @see org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)
*
* This will be called by
* {@link PigInputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)}
* on the front-end so the Configuration can be used in
* {@link #write(DataOutput)} to serialize the wrapped splits.
*
* This will also be called by Hadoop on the back-end to set the right Job
* Configuration (Hadoop invokes this method because PigSplit implements
* {@link Configurable}); we need that Configuration in readFields() to
* deserialize the wrapped splits.
*/
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
// package level access because we don't want LoadFunc implementations
// to get this information - this is to be used only from
// PigInputFormat
int getInputIndex() {
return inputIndex;
}
/**
*
* @return the number of wrapped splits
*/
public int getNumPaths() {
return wrappedSplits.length;
}
/**
* @return the totalSplits
* package level access because we don't want LoadFunc implementations
* to get this information - this is to be used only from
* PigInputFormat
*/
int getTotalSplits() {
return totalSplits;
}
/**
* @param totalSplits the totalSplits to set
* package level access because we don't want LoadFunc implementations
* to get this information - this is to be used only from
* PigInputFormat
*/
void setTotalSplits(int totalSplits) {
this.totalSplits = totalSplits;
}
@Override
public String toString() {
StringBuilder st = new StringBuilder();
st.append("Number of splits :" + wrappedSplits.length+"\n");
try {
st.append("Total Length = "+ getLength()+"\n");
for (int i = 0; i < wrappedSplits.length; i++) {
// check for null before dereferencing the split
if (wrappedSplits[i] == null) {
continue;
}
st.append("Input split["+i+"]:\n Length = "+ wrappedSplits[i].getLength()+"\n ClassName: " +
wrappedSplits[i].getClass().getName() + "\n Locations:\n");
if (wrappedSplits[i].getLocations() != null) {
for (String location : wrappedSplits[i].getLocations())
st.append(" "+location+"\n");
st.append("\n-----------------------\n");
}
}
} catch (IOException e) {
return null;
} catch (InterruptedException e) {
return null;
}
return st.toString();
}
public void setDisableCounter(boolean disableCounter) {
this.disableCounter = disableCounter;
}
public boolean disableCounter() {
return disableCounter;
}
public void setCurrentIdx(int idx) {
this.currentIdx = idx;
}
}