| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.pig.backend.hadoop.executionengine.util; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PathFilter; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.pig.FuncSpec; |
| import org.apache.pig.PigConfiguration; |
| import org.apache.pig.PigException; |
| import org.apache.pig.backend.executionengine.ExecException; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; |
| import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; |
| import org.apache.pig.data.DataBag; |
| import org.apache.pig.data.DataType; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.data.TupleFactory; |
| import org.apache.pig.impl.PigContext; |
| import org.apache.pig.impl.builtin.PartitionSkewedKeys; |
| import org.apache.pig.impl.io.FileLocalizer; |
| import org.apache.pig.impl.io.FileSpec; |
| import org.apache.pig.impl.io.ReadToEndLoader; |
| import org.apache.pig.impl.plan.NodeIdGenerator; |
| import org.apache.pig.impl.plan.OperatorKey; |
| import org.apache.pig.impl.util.Pair; |
| import org.apache.pig.impl.util.UDFContext; |
| import org.apache.pig.impl.util.Utils; |
| |
| /** |
| * A class of utility static methods to be used in the hadoop map reduce backend |
| */ |
| public class MapRedUtil { |
| |
| private static Log log = LogFactory.getLog(MapRedUtil.class); |
| private static final TupleFactory tf = TupleFactory.getInstance(); |
| |
| public static final String FILE_SYSTEM_NAME = FileSystem.FS_DEFAULT_NAME_KEY; |
| |
| /** |
| * Loads the key distribution sampler file |
| * |
| * @param keyDistFile the name for the distribution file |
| * @param totalReducers gets set to the total number of reducers as found in the dist file |
| * @param keyType Type of the key to be stored in the return map. It currently treats Tuple as a special case. |
| */ |
| @SuppressWarnings("unchecked") |
| public static <E> Map<E, Pair<Integer, Integer>> loadPartitionFileFromLocalCache( |
| String keyDistFile, Integer[] totalReducers, byte keyType, Configuration mapConf) |
| throws IOException { |
| |
| Map<E, Pair<Integer, Integer>> reducerMap = new HashMap<E, Pair<Integer, Integer>>(); |
| |
| // use local file system to get the keyDistFile |
| Configuration conf = new Configuration(false); |
| |
| if (mapConf.get("yarn.resourcemanager.principal")!=null) { |
| conf.set("yarn.resourcemanager.principal", mapConf.get("yarn.resourcemanager.principal")); |
| } |
| |
| if (mapConf.get("fs.file.impl")!=null) |
| conf.set("fs.file.impl", mapConf.get("fs.file.impl")); |
| if (mapConf.get("fs.hdfs.impl")!=null) |
| conf.set("fs.hdfs.impl", mapConf.get("fs.hdfs.impl")); |
| |
| copyTmpFileConfigurationValues(PigMapReduce.sJobConfInternal.get(), conf); |
| |
| conf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); |
| |
| ReadToEndLoader loader = new ReadToEndLoader(Utils.getTmpFileStorageObject(PigMapReduce.sJobConfInternal.get()), conf, |
| keyDistFile, 0); |
| DataBag partitionList; |
| Tuple t = loader.getNext(); |
| if (t == null) { |
| // this could happen if the input directory for sampling is empty |
| log.warn("Empty dist file: " + keyDistFile); |
| return reducerMap; |
| } |
        // The keydist file is structured as (key, min, max),
        // min and max being reducer indexes
        Map<String, Object> distMap = (Map<String, Object>) t.get(0);
| partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST); |
| totalReducers[0] = Integer.valueOf(""+distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS)); |
| Iterator<Tuple> it = partitionList.iterator(); |
| while (it.hasNext()) { |
| Tuple idxTuple = it.next(); |
| Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1); |
| Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2); |
            // the sampled range may wrap past the last reducer; unwrap maxIndex
            // so that (maxIndex - minIndex) stays positive
| if (maxIndex < minIndex) { |
| maxIndex = totalReducers[0] + maxIndex; |
| } |
| E keyT; |
| |
| // if the join is on more than 1 key |
| if (idxTuple.size() > 3) { |
| // remove the last 2 fields of the tuple, i.e: minIndex and maxIndex and store |
| // it in the reducer map |
| Tuple keyTuple = tf.newTuple(); |
| for (int i=0; i < idxTuple.size() - 2; i++) { |
| keyTuple.append(idxTuple.get(i)); |
| } |
| keyT = (E) keyTuple; |
| } else { |
| if (keyType == DataType.TUPLE) { |
                    keyT = (E) tf.newTuple(1);
                    ((Tuple) keyT).set(0, idxTuple.get(0));
| } else { |
| keyT = (E) idxTuple.get(0); |
| } |
| } |
            // cnt is the span of reducer indexes (maxIndex - minIndex); consumers
            // add 1 to it to account for the 0-based index
            Integer cnt = maxIndex - minIndex;
            reducerMap.put(keyT, new Pair<Integer, Integer>(minIndex, cnt));
| } |
| return reducerMap; |
| } |
| |
| public static void copyTmpFileConfigurationValues(Configuration fromConf, Configuration toConf) { |
        // Currently these are used only by loaders (and not storers), so we do not need to copy
        // mapred properties that are required by {@link SequenceFileInterStorage}
| |
| if (fromConf.getBoolean(PigConfiguration.PIG_ENABLE_TEMP_FILE_COMPRESSION, false)) { |
| toConf.setBoolean(PigConfiguration.PIG_ENABLE_TEMP_FILE_COMPRESSION, true); |
| if (fromConf.get(PigConfiguration.PIG_TEMP_FILE_COMPRESSION_CODEC) != null) { |
| toConf.set(PigConfiguration.PIG_TEMP_FILE_COMPRESSION_CODEC, |
| fromConf.get(PigConfiguration.PIG_TEMP_FILE_COMPRESSION_CODEC)); |
| } |
| if (fromConf.get(PigConfiguration.PIG_TEMP_FILE_COMPRESSION_STORAGE) != null) { |
| toConf.set(PigConfiguration.PIG_TEMP_FILE_COMPRESSION_STORAGE, |
| fromConf.get(PigConfiguration.PIG_TEMP_FILE_COMPRESSION_STORAGE)); |
| } |
| } |
| } |
| |
| public static void setupUDFContext(Configuration job) throws IOException { |
| UDFContext udfc = UDFContext.getUDFContext(); |
| udfc.addJobConf(job); |
        // the UDF conf is only empty on the backend; don't deserialize in the front-end
| if (udfc.isUDFConfEmpty()) { |
| udfc.deserialize(); |
| } |
| } |
| |
| /** |
| * Sets up output and log dir paths for a single-store streaming job |
| * |
| * @param st - POStore of the current job |
| * @param pigContext |
| * @param conf |
| * @throws IOException |
| */ |
| public static void setupStreamingDirsConfSingle(POStore st, PigContext pigContext, |
| Configuration conf) throws IOException { |
| // set out filespecs |
| String outputPathString = st.getSFile().getFileName(); |
| if (HadoopShims.hasFileSystemImpl(new Path(outputPathString), conf)) { |
| conf.set("pig.streaming.log.dir", |
| new Path(outputPathString, JobControlCompiler.LOG_DIR).toString()); |
| } |
| else { |
| String tmpLocationStr = FileLocalizer.getTemporaryPath(pigContext).toString(); |
| Path tmpLocation = new Path(tmpLocationStr); |
| conf.set("pig.streaming.log.dir", |
| new Path(tmpLocation, JobControlCompiler.LOG_DIR).toString()); |
| } |
| conf.set("pig.streaming.task.output.dir", outputPathString); |
| } |
| |
| /** |
| * Sets up output and log dir paths for a multi-store streaming job |
| * |
| * @param pigContext |
| * @param conf |
| * @throws IOException |
| */ |
| public static void setupStreamingDirsConfMulti(PigContext pigContext, Configuration conf) |
| throws IOException { |
| |
| String tmpLocationStr = FileLocalizer.getTemporaryPath(pigContext).toString(); |
| Path tmpLocation = new Path(tmpLocationStr); |
| conf.set("pig.streaming.log.dir", |
| new Path(tmpLocation, JobControlCompiler.LOG_DIR).toString()); |
| conf.set("pig.streaming.task.output.dir", tmpLocation.toString()); |
| } |
| |
| public static FileSpec checkLeafIsStore( |
| PhysicalPlan plan, |
| PigContext pigContext) throws ExecException { |
| try { |
| PhysicalOperator leaf = plan.getLeaves().get(0); |
| FileSpec spec = null; |
| if(!(leaf instanceof POStore)){ |
| String scope = leaf.getOperatorKey().getScope(); |
| POStore str = new POStore(new OperatorKey(scope, |
| NodeIdGenerator.getGenerator().getNextNodeId(scope))); |
| spec = new FileSpec(FileLocalizer.getTemporaryPath( |
| pigContext).toString(), |
| new FuncSpec(Utils.getTmpFileCompressorName(pigContext))); |
| str.setSFile(spec); |
| plan.addAsLeaf(str); |
| } else{ |
| spec = ((POStore)leaf).getSFile(); |
| } |
| return spec; |
| } catch (Exception e) { |
| int errCode = 2045; |
| String msg = "Internal error. Not able to check if the leaf node is a store operator."; |
| throw new ExecException(msg, errCode, PigException.BUG, e); |
| } |
| } |
| |
| /** |
| * Get all files recursively from the given list of files |
| * |
| * @param files a list of FileStatus |
| * @param conf the configuration object |
| * @return the list of fileStatus that contains all the files in the given |
| * list and, recursively, all the files inside the directories in |
| * the given list |
| * @throws IOException |
| */ |
| public static List<FileStatus> getAllFileRecursively( |
| List<FileStatus> files, Configuration conf) throws IOException { |
| List<FileStatus> result = new ArrayList<FileStatus>(); |
| int len = files.size(); |
| for (int i = 0; i < len; ++i) { |
| FileStatus file = files.get(i); |
| if (file.isDir()) { |
| Path p = file.getPath(); |
| FileSystem fs = p.getFileSystem(conf); |
| addInputPathRecursively(result, fs, p, hiddenFileFilter); |
| } else { |
| result.add(file); |
| } |
| } |
| log.info("Total input paths to process : " + result.size()); |
| return result; |
| } |
| |
| private static void addInputPathRecursively(List<FileStatus> result, |
| FileSystem fs, Path path, PathFilter inputFilter) |
| throws IOException { |
| for (FileStatus stat: fs.listStatus(path, inputFilter)) { |
| if (stat.isDir()) { |
| addInputPathRecursively(result, fs, stat.getPath(), inputFilter); |
| } else { |
| result.add(stat); |
| } |
| } |
| } |
| |
| private static final PathFilter hiddenFileFilter = new PathFilter(){ |
| @Override |
| public boolean accept(Path p){ |
| String name = p.getName(); |
| return !name.startsWith("_") && !name.startsWith("."); |
| } |
| }; |
| |
| public static long getPathLength(FileSystem fs, FileStatus status) |
| throws IOException{ |
| return getPathLength(fs, status, Long.MAX_VALUE); |
| } |
| |
| /** |
| * Returns the total number of bytes for this file, or if a directory all |
| * files in the directory. |
| * |
| * @param fs FileSystem |
| * @param status FileStatus |
| * @param max Maximum value of total length that will trigger exit. Many |
| * times we're only interested whether the total length of files is greater |
| * than X or not. In such case, we can exit the function early as soon as |
| * the max is reached. |
| * @return |
| * @throws IOException |
| */ |
| public static long getPathLength(FileSystem fs, FileStatus status, long max) |
| throws IOException { |
| if (!status.isDir()) { |
| return status.getLen(); |
| } else { |
| FileStatus[] children = fs.listStatus( |
| status.getPath(), hiddenFileFilter); |
| long size = 0; |
| for (FileStatus child : children) { |
| size += getPathLength(fs, child, max); |
| if (size > max) return size; |
| } |
| return size; |
| } |
| } |
| |
    /* The following code is for split combination: see PIG-1518
     */
    // Orders nodes by Node.length, i.e. by the number of splits hosted on the
    // node, in ascending order
    private static Comparator<Node> nodeComparator = new Comparator<Node>() {
| @Override |
| public int compare(Node o1, Node o2) { |
| long cmp = o1.length - o2.length; |
| return cmp == 0 ? 0 : cmp < 0 ? -1 : 1; |
| } |
| }; |
| |
| private static final class ComparableSplit implements Comparable<ComparableSplit> { |
| private InputSplit rawInputSplit; |
| private HashSet<Node> nodes; |
| // id used as a tie-breaker when two splits are of equal size. |
| private long id; |
| ComparableSplit(InputSplit split, long id) { |
| rawInputSplit = split; |
| nodes = new HashSet<Node>(); |
| this.id = id; |
| } |
| |
| void add(Node node) { |
| nodes.add(node); |
| } |
| |
| void removeFromNodes() { |
| for (Node node : nodes) |
| node.remove(this); |
| } |
| |
| public InputSplit getSplit() { |
| return rawInputSplit; |
| } |
| |
| @Override |
| public boolean equals(Object other) { |
            // instanceof is false for null, so no separate null check is needed
            if (!(other instanceof ComparableSplit))
                return false;
| return (compareTo((ComparableSplit) other) == 0); |
| } |
| |
| @Override |
| public int hashCode() { |
            // a constant hash is consistent with equals(), which delegates to
            // compareTo(); these objects are not used as hash keys
            return 41;
| } |
| |
| @Override |
| public int compareTo(ComparableSplit other) { |
| try { |
| long cmp = rawInputSplit.getLength() - other.rawInputSplit.getLength(); |
| // in descending order |
| return cmp == 0 ? (id == other.id ? 0 : id < other.id ? -1 : 1) : cmp < 0 ? 1 : -1; |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
    // A placeholder split used only as a binary-search probe for a given
    // length; it is never actually processed
    private static class DummySplit extends InputSplit {
| private long length; |
| |
| @Override |
| public String[] getLocations() { |
| return null; |
| } |
| |
| @Override |
| public long getLength() { |
| return length; |
| } |
| |
| public void setLength(long length) { |
| this.length = length; |
| } |
| } |
| |
    // A host and the splits that have a replica on it; length tracks the
    // number of splits currently associated with the host, not a byte count
    private static class Node {
        private long length = 0;
        private ArrayList<ComparableSplit> splits;
        private boolean sorted;

        Node() {
            splits = new ArrayList<ComparableSplit>();
            sorted = false;
        }

        void add(ComparableSplit split) {
            splits.add(split);
            length++;
        }
| |
| void remove(ComparableSplit split) { |
| if (!sorted) |
| sort(); |
| int index = Collections.binarySearch(splits, split); |
| if (index >= 0) { |
| splits.remove(index); |
| length--; |
| } |
| } |
| |
| void sort() { |
| if (!sorted) { |
| Collections.sort(splits); |
| sorted = true; |
| } |
| } |
| |
| ArrayList<ComparableSplit> getSplits() { |
| return splits; |
| } |
| |
| public long getLength() { |
| return length; |
| } |
| } |
| |
| public static List<List<InputSplit>> getCombinePigSplits(List<InputSplit> |
| oneInputSplits, long maxCombinedSplitSize, Configuration conf) |
| throws IOException, InterruptedException { |
| ArrayList<Node> nodes = new ArrayList<Node>(); |
| HashMap<String, Node> nodeMap = new HashMap<String, Node>(); |
| List<List<InputSplit>> result = new ArrayList<List<InputSplit>>(); |
| List<Long> resultLengths = new ArrayList<Long>(); |
| long comparableSplitId = 0; |
| |
| int size = 0, nSplits = oneInputSplits.size(); |
| InputSplit lastSplit = null; |
| int emptyCnt = 0; |
| for (InputSplit split : oneInputSplits) { |
| if (split.getLength() == 0) { |
| emptyCnt++; |
| continue; |
| } |
| if (split.getLength() >= maxCombinedSplitSize) { |
| comparableSplitId++; |
| ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>(); |
| combinedSplits.add(split); |
| result.add(combinedSplits); |
| resultLengths.add(split.getLength()); |
| } else { |
| String[] locations = split.getLocations(); |
| if (locations.length == 0) { |
| // This split is missing blocks, or the split returned bad locations. |
| // Don't try to combine. |
| comparableSplitId++; |
| ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>(); |
| combinedSplits.add(split); |
| result.add(combinedSplits); |
| resultLengths.add(split.getLength()); |
| } else { |
| ComparableSplit csplit = new ComparableSplit(split, comparableSplitId++); |
| // sort the locations to stabilize the number of maps: PIG-1757 |
| Arrays.sort(locations); |
| HashSet<String> locationSeen = new HashSet<String>(); |
| for (String location : locations) |
| { |
| if (!locationSeen.contains(location)) |
| { |
| Node node = nodeMap.get(location); |
| if (node == null) { |
| node = new Node(); |
| nodes.add(node); |
| nodeMap.put(location, node); |
| } |
| node.add(csplit); |
| csplit.add(node); |
| locationSeen.add(location); |
| } |
| } |
| lastSplit = split; |
| size++; |
| } |
| } |
| } |
| /* verification code: debug purpose |
| { |
| ArrayList<ComparableSplit> leftoverSplits = new ArrayList<ComparableSplit>(); |
| HashSet<InputSplit> seen = new HashSet<InputSplit>(); |
| for (Node node : nodes) { |
| if (node.getLength() > 0) |
| { |
| ArrayList<ComparableSplit> splits = node.getSplits(); |
| for (ComparableSplit split : splits) { |
| if (!seen.contains(split.getSplit())) { |
                    // remove duplicates. The set has to be keyed on the raw input split,
                    // not the comparable split, because the latter overrides compareTo()
                    // and so changes the equality semantics, which is not what we want here
| seen.add(split.getSplit()); |
| leftoverSplits.add(split); |
| } |
| } |
| } |
| } |
| |
| int combinedSplitLen = 0; |
| for (PigSplit split : result) |
| combinedSplitLen += split.getNumPaths(); |
| if (combinedSplitLen + leftoverSplits.size()!= nSplits-emptyCnt) { |
| throw new AssertionError("number of combined splits {"+combinedSplitLen+"+"+leftoverSplits.size()+"-"+size+"} does not match the number of original splits ["+nSplits+"]."); |
| } |
| } |
| */ |
| if (nSplits > 0 && emptyCnt == nSplits) |
| { |
            // if all splits are empty, add a single empty split, as an empty
            // input directory is not otherwise handled properly downstream
| ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>(); |
| combinedSplits.add(oneInputSplits.get(0)); |
| result.add(combinedSplits); |
| } |
| else if (size == 1) { |
| ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>(); |
| combinedSplits.add(lastSplit); |
| result.add(combinedSplits); |
| } else if (size > 1) { |
| // combine small splits |
| Collections.sort(nodes, nodeComparator); |
| DummySplit dummy = new DummySplit(); |
| // dummy is used to search for next split of suitable size to be combined |
| ComparableSplit dummyComparableSplit = new ComparableSplit(dummy, -1); |
| for (Node node : nodes) { |
| // sort the splits on this node in descending order |
| node.sort(); |
| long totalSize = 0; |
| ArrayList<ComparableSplit> splits = node.getSplits(); |
| int idx; |
| int lenSplits; |
| ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>(); |
| ArrayList<ComparableSplit> combinedComparableSplits = new ArrayList<ComparableSplit>(); |
| while (!splits.isEmpty()) { |
| combinedSplits.add(splits.get(0).getSplit()); |
| combinedComparableSplits.add(splits.get(0)); |
| int startIdx = 1; |
| lenSplits = splits.size(); |
| totalSize += splits.get(0).getSplit().getLength(); |
| long spaceLeft = maxCombinedSplitSize - totalSize; |
| dummy.setLength(spaceLeft); |
| idx = Collections.binarySearch(node.getSplits().subList(startIdx, lenSplits), dummyComparableSplit); |
| idx = -idx-1+startIdx; |
| while (idx < lenSplits) |
| { |
| long thisLen = splits.get(idx).getSplit().getLength(); |
| combinedSplits.add(splits.get(idx).getSplit()); |
| combinedComparableSplits.add(splits.get(idx)); |
| totalSize += thisLen; |
| spaceLeft -= thisLen; |
| if (spaceLeft <= 0) |
| break; |
| // find next combinable chunk |
| startIdx = idx + 1; |
| if (startIdx >= lenSplits) |
| break; |
| dummy.setLength(spaceLeft); |
| idx = Collections.binarySearch(node.getSplits().subList(startIdx, lenSplits), dummyComparableSplit); |
| idx = -idx-1+startIdx; |
| } |
| if (totalSize > maxCombinedSplitSize/2) { |
| result.add(combinedSplits); |
| resultLengths.add(totalSize); |
| removeSplits(combinedComparableSplits); |
| totalSize = 0; |
| combinedSplits = new ArrayList<InputSplit>(); |
| combinedComparableSplits.clear(); |
| splits = node.getSplits(); |
| } else { |
| if (combinedSplits.size() != lenSplits) |
| throw new AssertionError("Combined split logic error!"); |
| break; |
| } |
| } |
| } |
| // handle leftovers |
| ArrayList<ComparableSplit> leftoverSplits = new ArrayList<ComparableSplit>(); |
| HashSet<InputSplit> seen = new HashSet<InputSplit>(); |
| for (Node node : nodes) { |
| for (ComparableSplit split : node.getSplits()) { |
| if (!seen.contains(split.getSplit())) { |
                    // remove duplicates. The set has to be keyed on the raw input split,
                    // not the comparable split, because the latter overrides compareTo()
                    // and so changes the equality semantics, which is not what we want here
| seen.add(split.getSplit()); |
| leftoverSplits.add(split); |
| } |
| } |
| } |
| |
| /* verification code |
| int combinedSplitLen = 0; |
| for (PigSplit split : result) |
| combinedSplitLen += split.getNumPaths(); |
| if (combinedSplitLen + leftoverSplits.size()!= nSplits-emptyCnt) |
| throw new AssertionError("number of combined splits ["+combinedSplitLen+"+"+leftoverSplits.size()+"] does not match the number of original splits ["+nSplits+"]."); |
| */ |
| if (!leftoverSplits.isEmpty()) |
| { |
| long totalSize = 0; |
| ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>(); |
| ArrayList<ComparableSplit> combinedComparableSplits = new ArrayList<ComparableSplit>(); |
| |
| int splitLen = leftoverSplits.size(); |
| for (int i = 0; i < splitLen; i++) |
| { |
| ComparableSplit split = leftoverSplits.get(i); |
| long thisLen = split.getSplit().getLength(); |
| if (totalSize + thisLen >= maxCombinedSplitSize) { |
| removeSplits(combinedComparableSplits); |
| result.add(combinedSplits); |
| resultLengths.add(totalSize); |
| combinedSplits = new ArrayList<InputSplit>(); |
| combinedComparableSplits.clear(); |
| totalSize = 0; |
| } |
| combinedSplits.add(split.getSplit()); |
| combinedComparableSplits.add(split); |
| totalSize += split.getSplit().getLength(); |
| if (i == splitLen - 1) { |
                    // last piece: it could be very small; see if it can be squeezed into any existing combined split
| for (int j =0; j < result.size(); j++) |
| { |
| if (resultLengths.get(j) + totalSize <= maxCombinedSplitSize) |
| { |
| List<InputSplit> isList = result.get(j); |
| for (InputSplit csplit : combinedSplits) { |
| isList.add(csplit); |
| } |
| removeSplits(combinedComparableSplits); |
| combinedSplits.clear(); |
| break; |
| } |
| } |
| if (!combinedSplits.isEmpty()) { |
                        // the last piece cannot be squeezed in; create a new combined split for it
| removeSplits(combinedComparableSplits); |
| result.add(combinedSplits); |
| } |
| } |
| } |
| } |
| } |
| /* verification codes |
| int combinedSplitLen = 0; |
| for (PigSplit split : result) |
| combinedSplitLen += split.getNumPaths(); |
| if (combinedSplitLen != nSplits-emptyCnt) |
| throw new AssertionError("number of combined splits ["+combinedSplitLen+"] does not match the number of original splits ["+nSplits+"]."); |
| |
| long totalLen = 0; |
| for (PigSplit split : result) |
| totalLen += split.getLength(); |
| |
| long origTotalLen = 0; |
| for (InputSplit split : oneInputSplits) |
| origTotalLen += split.getLength(); |
| if (totalLen != origTotalLen) |
| throw new AssertionError("The total length ["+totalLen+"] does not match the original ["+origTotalLen+"]"); |
| */ |
| log.info("Total input paths (combined) to process : " + result.size()); |
| return result; |
| } |
| |
| private static void removeSplits(List<ComparableSplit> splits) { |
| for (ComparableSplit split: splits) |
| split.removeFromNodes(); |
| } |
| |
    // for debugging purposes only
    public static String inputSplitToString(InputSplit[] splits) throws IOException, InterruptedException {
| StringBuilder st = new StringBuilder(); |
| st.append("Number of splits :" + splits.length+"\n"); |
| long len = 0; |
| for (InputSplit split: splits) |
| len += split.getLength(); |
| st.append("Total Length = "+ len +"\n"); |
| for (int i = 0; i < splits.length; i++) { |
| st.append("Input split["+i+"]:\n Length = "+ splits[i].getLength()+"\n Locations:\n"); |
| for (String location : splits[i].getLocations()) |
| st.append(" "+location+"\n"); |
| st.append("\n-----------------------\n"); |
| } |
| return st.toString(); |
| } |
| |
| /* verification code: debug purpose only |
| public String inputSplitToString(ArrayList<ComparableSplit> splits) throws IOException, InterruptedException { |
| StringBuilder st = new StringBuilder(); |
| st.append("Number of splits :" + splits.size()+"\n"); |
| long len = 0; |
| for (ComparableSplit split: splits) |
| len += split.getSplit().getLength(); |
| st.append("Total Length = "+ len +"\n"); |
| for (int i = 0; i < splits.size(); i++) { |
| st.append("Input split["+i+"]:\n Length = "+ splits.get(i).getSplit().getLength()+"\n Locations:\n"); |
| for (String location : splits.get(i).getSplit().getLocations()) |
| st.append(" "+location+"\n"); |
| st.append("\n-----------------------\n"); |
| } |
| return st.toString(); |
| } |
| */ |
| } |