| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hama.bsp; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.BlockLocation; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PathFilter; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.Reporter; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.net.NetworkTopology; |
| import org.apache.hadoop.net.NodeBase; |
| |
| /** |
| * An abstract {@link org.apache.hadoop.mapred.InputFormat}. Splits are |
| * constructed from the files under the input paths. A split cannot have files |
| * from different pools. Each split returned may contain blocks from different |
| * files. If a maxSplitSize is specified, then blocks on the same node are |
| * combined to form a single split. Blocks that are left over are then combined |
| * with other blocks in the same rack. If maxSplitSize is not specified, then |
| * blocks from the same rack are combined in a single split; no attempt is made |
| * to create node-local splits. If the maxSplitSize is equal to the block size, |
| * then this class is similar to the default spliting behaviour in Hadoop: each |
| * block is a locally processed split. Subclasses implement |
| * {@link org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit, JobConf, Reporter)} |
| * to construct <code>RecordReader</code>'s for <code>CombineFileSplit</code>'s. |
| * |
| * @see CombineFileSplit |
| */ |
| public abstract class CombineFileInputFormat<K, V> extends |
| FileInputFormat<K, V> { |
| |
| // ability to limit the size of a single split |
| private long maxSplitSize = 0; |
| private long minSplitSizeNode = 0; |
| private long minSplitSizeRack = 0; |
| |
| // A pool of input paths filters. A split cannot have blocks from files |
| // across multiple pools. |
| private ArrayList<MultiPathFilter> pools = new ArrayList<MultiPathFilter>(); |
| |
| // mapping from a rack name to the set of Nodes in the rack |
| private static HashMap<String, Set<String>> rackToNodes = new HashMap<String, Set<String>>(); |
| |
| /** |
| * This has to be overridden from concrete formats, we provide a SequenceFile |
| * version of it for partitioning. |
| * |
| * @param split |
| * @param context |
| * @return a {@link RecordReader} to read the input for processing. |
| * @throws IOException |
| */ |
| public abstract RecordReader<K, V> createRecordReader(InputSplit split, |
| TaskAttemptContext context) throws IOException; |
| |
| /** |
| * Specify the maximum size (in bytes) of each split. Each split is |
| * approximately equal to the specified size. |
| */ |
| protected void setMaxSplitSize(long maxSplitSize) { |
| this.maxSplitSize = maxSplitSize; |
| } |
| |
| /** |
| * Specify the minimum size (in bytes) of each split per node. This applies to |
| * data that is left over after combining data on a single node into splits |
| * that are of maximum size specified by maxSplitSize. This leftover data will |
| * be combined into its own split if its size exceeds minSplitSizeNode. |
| */ |
| protected void setMinSplitSizeNode(long minSplitSizeNode) { |
| this.minSplitSizeNode = minSplitSizeNode; |
| } |
| |
| /** |
| * Specify the minimum size (in bytes) of each split per rack. This applies to |
| * data that is left over after combining data on a single rack into splits |
| * that are of maximum size specified by maxSplitSize. This leftover data will |
| * be combined into its own split if its size exceeds minSplitSizeRack. |
| */ |
| protected void setMinSplitSizeRack(long minSplitSizeRack) { |
| this.minSplitSizeRack = minSplitSizeRack; |
| } |
| |
| /** |
| * Create a new pool and add the filters to it. A split cannot have files from |
| * different pools. |
| */ |
| protected void createPool(Configuration conf, List<PathFilter> filters) { |
| pools.add(new MultiPathFilter(filters)); |
| } |
| |
| /** |
| * Create a new pool and add the filters to it. A pathname can satisfy any one |
| * of the specified filters. A split cannot have files from different pools. |
| */ |
| protected void createPool(Configuration conf, PathFilter... filters) { |
| MultiPathFilter multi = new MultiPathFilter(); |
| for (PathFilter f : filters) { |
| multi.add(f); |
| } |
| pools.add(multi); |
| } |
| |
| /** |
| * default constructor |
| */ |
| public CombineFileInputFormat() { |
| } |
| |
| @Override |
| public InputSplit[] getSplits(BSPJob bspJob, int numSplits) |
| throws IOException { |
| |
| Configuration job = bspJob.getConfiguration(); |
| |
| long minSizeNode = 0; |
| long minSizeRack = 0; |
| long maxSize = 0; |
| |
| // the values specified by setxxxSplitSize() takes precedence over the |
| // values that might have been specified in the config |
| if (minSplitSizeNode != 0) { |
| minSizeNode = minSplitSizeNode; |
| } else { |
| minSizeNode = job.getLong("mapred.min.split.size.per.node", 0); |
| } |
| if (minSplitSizeRack != 0) { |
| minSizeRack = minSplitSizeRack; |
| } else { |
| minSizeRack = job.getLong("mapred.min.split.size.per.rack", 0); |
| } |
| if (maxSplitSize != 0) { |
| maxSize = maxSplitSize; |
| } else { |
| maxSize = job.getLong("mapred.max.split.size", 0); |
| } |
| if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) { |
| throw new IOException("Minimum split size pernode " + minSizeNode |
| + " cannot be larger than maximum split size " + maxSize); |
| } |
| if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) { |
| throw new IOException("Minimum split size per rack" + minSizeRack |
| + " cannot be larger than maximum split size " + maxSize); |
| } |
| if (minSizeRack != 0 && minSizeNode > minSizeRack) { |
| throw new IOException("Minimum split size per node" + minSizeNode |
| + " cannot be smaller than minimum split size per rack " |
| + minSizeRack); |
| } |
| |
| // all the files in input set |
| Path[] paths = FileUtil.stat2Paths(listStatus(bspJob)); |
| List<CombineFileSplit> splits = new ArrayList<CombineFileSplit>(); |
| if (paths.length == 0) { |
| return splits.toArray(new CombineFileSplit[splits.size()]); |
| } |
| |
| // In one single iteration, process all the paths in a single pool. |
| // Processing one pool at a time ensures that a split contans paths |
| // from a single pool only. |
| for (MultiPathFilter onepool : pools) { |
| ArrayList<Path> myPaths = new ArrayList<Path>(); |
| |
| // pick one input path. If it matches all the filters in a pool, |
| // add it to the output set |
| for (int i = 0; i < paths.length; i++) { |
| if (paths[i] == null) { // already processed |
| continue; |
| } |
| Path p = new Path(paths[i].toUri().getPath()); |
| if (onepool.accept(p)) { |
| myPaths.add(paths[i]); // add it to my output set |
| paths[i] = null; // already processed |
| } |
| } |
| // create splits for all files in this pool. |
| getMoreSplits(bspJob, myPaths.toArray(new Path[myPaths.size()]), maxSize, |
| minSizeNode, minSizeRack, splits); |
| } |
| |
| // Finally, process all paths that do not belong to any pool. |
| ArrayList<Path> myPaths = new ArrayList<Path>(); |
| for (Path path : paths) { |
| if (path == null) { // already processed |
| continue; |
| } |
| myPaths.add(path); |
| } |
| // create splits for all files that are not in any pool. |
| getMoreSplits(bspJob, myPaths.toArray(new Path[myPaths.size()]), maxSize, |
| minSizeNode, minSizeRack, splits); |
| |
| // free up rackToNodes map |
| rackToNodes.clear(); |
| return splits.toArray(new CombineFileSplit[splits.size()]); |
| } |
| |
| /** |
| * Return all the splits in the specified set of paths |
| */ |
| private static void getMoreSplits(BSPJob job, Path[] paths, long maxSize, |
| long minSizeNode, long minSizeRack, List<CombineFileSplit> splits) |
| throws IOException { |
| |
| // all blocks for all the files in input set |
| OneFileInfo[] files; |
| |
| // mapping from a rack name to the list of blocks it has |
| HashMap<String, List<OneBlockInfo>> rackToBlocks = new HashMap<String, List<OneBlockInfo>>(); |
| |
| // mapping from a block to the nodes on which it has replicas |
| HashMap<OneBlockInfo, String[]> blockToNodes = new HashMap<OneBlockInfo, String[]>(); |
| |
| // mapping from a node to the list of blocks that it contains |
| HashMap<String, List<OneBlockInfo>> nodeToBlocks = new HashMap<String, List<OneBlockInfo>>(); |
| |
| files = new OneFileInfo[paths.length]; |
| if (paths.length == 0) { |
| return; |
| } |
| |
| // populate all the blocks for all files |
| for (int i = 0; i < paths.length; i++) { |
| files[i] = new OneFileInfo(paths[i], job, rackToBlocks, blockToNodes, |
| nodeToBlocks); |
| } |
| |
| ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>(); |
| ArrayList<String> nodes = new ArrayList<String>(); |
| long curSplitSize = 0; |
| |
| // process all nodes and create splits that are local |
| // to a node. |
| for (Map.Entry<String, List<OneBlockInfo>> one : nodeToBlocks.entrySet()) { |
| |
| nodes.add(one.getKey()); |
| List<OneBlockInfo> blocksInNode = one.getValue(); |
| |
| // for each block, copy it into validBlocks. Delete it from |
| // blockToNodes so that the same block does not appear in |
| // two different splits. |
| for (OneBlockInfo oneblock : blocksInNode) { |
| if (blockToNodes.containsKey(oneblock)) { |
| validBlocks.add(oneblock); |
| blockToNodes.remove(oneblock); |
| curSplitSize += oneblock.length; |
| |
| // if the accumulated split size exceeds the maximum, then |
| // create this split. |
| if (maxSize != 0 && curSplitSize >= maxSize) { |
| // create an input split and add it to the splits array |
| addCreatedSplit(job, splits, nodes, validBlocks); |
| curSplitSize = 0; |
| validBlocks.clear(); |
| } |
| } |
| } |
| // if there were any blocks left over and their combined size is |
| // larger than minSplitNode, then combine them into one split. |
| // Otherwise add them back to the unprocessed pool. It is likely |
| // that they will be combined with other blocks from the same rack later |
| // on. |
| if (minSizeNode != 0 && curSplitSize >= minSizeNode) { |
| // create an input split and add it to the splits array |
| addCreatedSplit(job, splits, nodes, validBlocks); |
| } else { |
| for (OneBlockInfo oneblock : validBlocks) { |
| blockToNodes.put(oneblock, oneblock.hosts); |
| } |
| } |
| validBlocks.clear(); |
| nodes.clear(); |
| curSplitSize = 0; |
| } |
| |
| // if blocks in a rack are below the specified minimum size, then keep them |
| // in 'overflow'. After the processing of all racks is complete, these |
| // overflow |
| // blocks will be combined into splits. |
| ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>(); |
| ArrayList<String> racks = new ArrayList<String>(); |
| |
| // Process all racks over and over again until there is no more work to do. |
| while (blockToNodes.size() > 0) { |
| |
| // Create one split for this rack before moving over to the next rack. |
| // Come back to this rack after creating a single split for each of the |
| // remaining racks. |
| // Process one rack location at a time, Combine all possible blocks that |
| // reside on this rack as one split. (constrained by minimum and maximum |
| // split size). |
| |
| // iterate over all racks |
| for (Map.Entry<String, List<OneBlockInfo>> one : rackToBlocks.entrySet()) { |
| |
| racks.add(one.getKey()); |
| List<OneBlockInfo> blocks = one.getValue(); |
| |
| // for each block, copy it into validBlocks. Delete it from |
| // blockToNodes so that the same block does not appear in |
| // two different splits. |
| boolean createdSplit = false; |
| for (OneBlockInfo oneblock : blocks) { |
| if (blockToNodes.containsKey(oneblock)) { |
| validBlocks.add(oneblock); |
| blockToNodes.remove(oneblock); |
| curSplitSize += oneblock.length; |
| |
| // if the accumulated split size exceeds the maximum, then |
| // create this split. |
| if (maxSize != 0 && curSplitSize >= maxSize) { |
| // create an input split and add it to the splits array |
| addCreatedSplit(job, splits, getHosts(racks), validBlocks); |
| createdSplit = true; |
| break; |
| } |
| } |
| } |
| |
| // if we created a split, then just go to the next rack |
| if (createdSplit) { |
| curSplitSize = 0; |
| validBlocks.clear(); |
| racks.clear(); |
| continue; |
| } |
| |
| if (!validBlocks.isEmpty()) { |
| if (minSizeRack != 0 && curSplitSize >= minSizeRack) { |
| // if there is a mimimum size specified, then create a single split |
| // otherwise, store these blocks into overflow data structure |
| addCreatedSplit(job, splits, getHosts(racks), validBlocks); |
| } else { |
| // There were a few blocks in this rack that remained to be |
| // processed. |
| // Keep them in 'overflow' block list. These will be combined later. |
| overflowBlocks.addAll(validBlocks); |
| } |
| } |
| curSplitSize = 0; |
| validBlocks.clear(); |
| racks.clear(); |
| } |
| } |
| |
| assert blockToNodes.isEmpty(); |
| assert curSplitSize == 0; |
| assert validBlocks.isEmpty(); |
| assert racks.isEmpty(); |
| |
| // Process all overflow blocks |
| for (OneBlockInfo oneblock : overflowBlocks) { |
| validBlocks.add(oneblock); |
| curSplitSize += oneblock.length; |
| |
| // This might cause an exiting rack location to be re-added, |
| // but it should be ok. |
| Collections.addAll(racks, oneblock.racks); |
| |
| // if the accumulated split size exceeds the maximum, then |
| // create this split. |
| if (maxSize != 0 && curSplitSize >= maxSize) { |
| // create an input split and add it to the splits array |
| addCreatedSplit(job, splits, getHosts(racks), validBlocks); |
| curSplitSize = 0; |
| validBlocks.clear(); |
| racks.clear(); |
| } |
| } |
| |
| // Process any remaining blocks, if any. |
| if (!validBlocks.isEmpty()) { |
| addCreatedSplit(job, splits, getHosts(racks), validBlocks); |
| } |
| } |
| |
| /** |
| * Create a single split from the list of blocks specified in validBlocks Add |
| * this new split into splitList. |
| */ |
| private static void addCreatedSplit(BSPJob job, |
| List<CombineFileSplit> splitList, List<String> locations, |
| ArrayList<OneBlockInfo> validBlocks) { |
| // create an input split |
| Path[] fl = new Path[validBlocks.size()]; |
| long[] offset = new long[validBlocks.size()]; |
| long[] length = new long[validBlocks.size()]; |
| for (int i = 0; i < validBlocks.size(); i++) { |
| fl[i] = validBlocks.get(i).onepath; |
| offset[i] = validBlocks.get(i).offset; |
| length[i] = validBlocks.get(i).length; |
| } |
| |
| // add this split to the list that is returned |
| CombineFileSplit thissplit = new CombineFileSplit(job, fl, offset, length, |
| locations.toArray(new String[locations.size()])); |
| splitList.add(thissplit); |
| } |
| |
| /** |
| * information about one file from the File System |
| */ |
| private static class OneFileInfo { |
| private long fileSize; // size of the file |
| private OneBlockInfo[] blocks; // all blocks in this file |
| |
| OneFileInfo(Path path, BSPJob job, |
| HashMap<String, List<OneBlockInfo>> rackToBlocks, |
| HashMap<OneBlockInfo, String[]> blockToNodes, |
| HashMap<String, List<OneBlockInfo>> nodeToBlocks) throws IOException { |
| this.fileSize = 0; |
| |
| // get block locations from file system |
| FileSystem fs = path.getFileSystem(job.getConfiguration()); |
| FileStatus stat = fs.getFileStatus(path); |
| BlockLocation[] locations = fs.getFileBlockLocations(stat, 0, |
| stat.getLen()); |
| // create a list of all block and their locations |
| if (locations == null) { |
| blocks = new OneBlockInfo[0]; |
| } else { |
| blocks = new OneBlockInfo[locations.length]; |
| for (int i = 0; i < locations.length; i++) { |
| |
| fileSize += locations[i].getLength(); |
| OneBlockInfo oneblock = new OneBlockInfo(path, |
| locations[i].getOffset(), locations[i].getLength(), |
| locations[i].getHosts(), locations[i].getTopologyPaths()); |
| blocks[i] = oneblock; |
| |
| // add this block to the block --> node locations map |
| blockToNodes.put(oneblock, oneblock.hosts); |
| |
| // add this block to the rack --> block map |
| for (int j = 0; j < oneblock.racks.length; j++) { |
| String rack = oneblock.racks[j]; |
| List<OneBlockInfo> blklist = rackToBlocks.get(rack); |
| if (blklist == null) { |
| blklist = new ArrayList<OneBlockInfo>(); |
| rackToBlocks.put(rack, blklist); |
| } |
| blklist.add(oneblock); |
| // Add this host to rackToNodes map |
| addHostToRack(oneblock.racks[j], oneblock.hosts[j]); |
| } |
| |
| // add this block to the node --> block map |
| for (int j = 0; j < oneblock.hosts.length; j++) { |
| String node = oneblock.hosts[j]; |
| List<OneBlockInfo> blklist = nodeToBlocks.get(node); |
| if (blklist == null) { |
| blklist = new ArrayList<OneBlockInfo>(); |
| nodeToBlocks.put(node, blklist); |
| } |
| blklist.add(oneblock); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * information about one block from the File System |
| */ |
| private static class OneBlockInfo { |
| Path onepath; // name of this file |
| long offset; // offset in file |
| long length; // length of this block |
| String[] hosts; // nodes on whch this block resides |
| String[] racks; // network topology of hosts |
| |
| OneBlockInfo(Path path, long offset, long len, String[] hosts, |
| String[] pTopologyPaths) { |
| String[] topologyPaths = pTopologyPaths; |
| this.onepath = path; |
| this.offset = offset; |
| this.hosts = hosts; |
| this.length = len; |
| assert (hosts.length == topologyPaths.length || topologyPaths.length == 0); |
| |
| // if the file ystem does not have any rack information, then |
| // use dummy rack location. |
| if (topologyPaths.length == 0) { |
| topologyPaths = new String[hosts.length]; |
| for (int i = 0; i < topologyPaths.length; i++) { |
| topologyPaths[i] = (new NodeBase(hosts[i], |
| NetworkTopology.DEFAULT_RACK)).toString(); |
| } |
| } |
| |
| // The topology paths have the host name included as the last |
| // component. Strip it. |
| this.racks = new String[topologyPaths.length]; |
| for (int i = 0; i < topologyPaths.length; i++) { |
| this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation(); |
| } |
| } |
| } |
| |
| private static void addHostToRack(String rack, String host) { |
| Set<String> hosts = rackToNodes.get(rack); |
| if (hosts == null) { |
| hosts = new HashSet<String>(); |
| rackToNodes.put(rack, hosts); |
| } |
| hosts.add(host); |
| } |
| |
| private static List<String> getHosts(List<String> racks) { |
| List<String> hosts = new ArrayList<String>(); |
| for (String rack : racks) { |
| hosts.addAll(rackToNodes.get(rack)); |
| } |
| return hosts; |
| } |
| |
| /** |
| * Accept a path only if any one of filters given in the constructor do. |
| */ |
| private static class MultiPathFilter implements PathFilter { |
| private List<PathFilter> filters; |
| |
| public MultiPathFilter() { |
| this.filters = new ArrayList<PathFilter>(); |
| } |
| |
| public MultiPathFilter(List<PathFilter> filters) { |
| this.filters = filters; |
| } |
| |
| public void add(PathFilter one) { |
| filters.add(one); |
| } |
| |
| @Override |
| public boolean accept(Path path) { |
| for (PathFilter filter : filters) { |
| if (filter.accept(path)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| public String toString() { |
| StringBuffer buf = new StringBuffer(); |
| buf.append("["); |
| for (PathFilter f : filters) { |
| buf.append(f); |
| buf.append(","); |
| } |
| buf.append("]"); |
| return buf.toString(); |
| } |
| } |
| } |