| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.lang.Runtime; |
| import java.net.URL; |
| import java.net.URLConnection; |
| import java.util.Arrays; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.StringTokenizer; |
| import java.util.TreeSet; |
| import javax.xml.parsers.DocumentBuilder; |
| import javax.xml.parsers.DocumentBuilderFactory; |
| import javax.xml.parsers.ParserConfigurationException; |
| import org.w3c.dom.Document; |
| import org.w3c.dom.NodeList; |
| import org.xml.sax.SAXException; |
| |
/**
 * This class repeatedly queries a namenode looking for corrupt replicas. If
 * any are found a provided hadoop job is launched and the output printed
 * to stdout.
 *
 * The syntax is:
 *
 * java BlockForensics http://[namenode]:[port]/corrupt_replicas_xml.jsp
 *                    [sleep time between namenode query for corrupt blocks
 *                      (in milliseconds)] [mapred jar location] [hdfs input path]
 *
 * All arguments are required. The environment variable HADOOP_PREFIX must
 * point at the hadoop installation whose bin/hadoop script is to be used.
 */
public class BlockForensics {

  /** Pause between two namenode queries when none (or an invalid one) is given, in ms. */
  private static final int DEFAULT_SLEEP_MS = 30000;

  /**
   * Concatenates the string form of every element of a list, placing a
   * separator between consecutive elements.
   *
   * @param l   the elements to join; must not be null
   * @param sep the separator inserted between two elements
   * @return the joined string, or the empty string for an empty list
   */
  public static String join(List<?> l, String sep) {
    StringBuilder sb = new StringBuilder();
    Iterator<?> it = l.iterator();

    while (it.hasNext()) {
      sb.append(it.next());
      if (it.hasNext()) {
        sb.append(sep);
      }
    }

    return sb.toString();
  }

  /**
   * Runs <code>bin/hadoop</code> with the given arguments and copies its
   * combined stdout/stderr to this process' stdout, line by line. The call
   * returns once the command has finished. The exit code is deliberately
   * ignored: some invocations (e.g. removing a directory that does not exist
   * yet) are expected to fail.
   *
   * If HADOOP_PREFIX is set the command is started with that directory as
   * its working directory.
   *
   * @param args arguments passed to the hadoop script
   * @throws IOException if the command cannot be started, its output cannot
   *         be read, or the calling thread is interrupted while waiting for
   *         the command to finish (the interrupt flag is restored first)
   */
  public static void runHadoopCmd(String ... args)
    throws IOException {
    String hadoopHome = System.getenv("HADOOP_PREFIX");

    List<String> cmd = new LinkedList<String>();
    cmd.add("bin/hadoop");
    cmd.addAll(Arrays.asList(args));

    ProcessBuilder pb = new ProcessBuilder(cmd);

    if (hadoopHome != null) {
      pb.directory(new File(hadoopHome));
    }

    // merge stderr into stdout so a single reader drains everything
    pb.redirectErrorStream(true);

    Process p = pb.start();
    BufferedReader br = new BufferedReader(
        new InputStreamReader(p.getInputStream()));

    try {
      String line;
      while ((line = br.readLine()) != null) {
        System.out.println(line);
      }
      // reap the child so it does not linger once its output is drained
      p.waitFor();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new java.io.InterruptedIOException(
          "Interrupted while waiting for: " + join(cmd, " "));
    } finally {
      // this method is called in an endless loop: always release the pipe
      // and the process handle (destroy() is a no-op after a normal exit)
      try {
        br.close();
      } finally {
        p.destroy();
      }
    }
  }

  /**
   * Parses the sleep time argument.
   *
   * @param arg the raw command line argument
   * @return the parsed value in milliseconds, or the default when the
   *         argument is not a non-negative integer
   */
  private static int parseSleepTime(String arg) {
    try {
      int parsed = Integer.parseInt(arg);
      if (parsed >= 0) {
        return parsed;
      }
      // a negative value would make Thread.sleep throw; treat it as invalid
    } catch (NumberFormatException e) {
      // handled below together with the negative case
    }

    System.out.println("The sleep time entered is invalid, "
                       + "using default value: "+DEFAULT_SLEEP_MS+"ms");
    return DEFAULT_SLEEP_MS;
  }

  /**
   * Downloads the corrupt replica report and returns the block ids that
   * have not been seen before. Every returned id is also added to
   * <code>seen</code>. Entries whose text is not a valid long are reported
   * on stderr and skipped.
   *
   * @param url     location of the XML report
   * @param builder parser used to read the report
   * @param seen    ids reported by earlier calls; updated in place
   * @return the ids that were not yet contained in <code>seen</code>
   * @throws SAXException if the report is not well-formed XML
   * @throws IOException  if the report cannot be downloaded
   */
  private static List<Long> fetchNewBlockIds(String url,
                                             DocumentBuilder builder,
                                             Set<Long> seen)
    throws SAXException, IOException {
    Document doc;
    InputStream xml = new URL(url).openConnection().getInputStream();
    try {
      doc = builder.parse(xml);
    } finally {
      // one connection is opened per poll; close it so they do not pile up
      xml.close();
    }

    NodeList corruptReplicaNodes = doc.getElementsByTagName("block_id");
    List<Long> newBlockIds = new LinkedList<Long>();

    for (int i = 0; i < corruptReplicaNodes.getLength(); i++) {
      String text = corruptReplicaNodes.item(i).getTextContent();
      text = (text == null) ? "" : text.trim();

      Long blockId;
      try {
        blockId = Long.valueOf(text);
      } catch (NumberFormatException e) {
        System.err.println("Skipping malformed block_id: '" + text + "'");
        continue;
      }

      // Set.add returns false for ids that were already reported
      if (seen.add(blockId)) {
        newBlockIds.add(blockId);
      }
    }

    return newBlockIds;
  }

  /**
   * Launches the map/reduce job for the given block ids and prints its
   * output, using a temporary HDFS directory that is removed before and
   * after the run.
   *
   * @param jar         location of the mapred jar
   * @param inputPath   hdfs input path handed to the job
   * @param tmpDir      hdfs directory used as job output
   * @param blockIdsStr comma delimited list of block ids to search for
   * @throws IOException if one of the hadoop commands cannot be run
   */
  private static void searchForBlocks(String jar, String inputPath,
                                      String tmpDir, String blockIdsStr)
    throws IOException {
    // delete tmp dir
    runHadoopCmd("fs", "-rmr", tmpDir);

    // launch mapred job
    runHadoopCmd("jar",
                 jar,        // jar location
                 inputPath,  // input dir
                 tmpDir,     // output dir
                 blockIdsStr // comma delimited list of blocks
                 );

    // cat output
    runHadoopCmd("fs", "-cat", tmpDir+"/part*");

    // delete temp dir
    runHadoopCmd("fs", "-rmr", tmpDir);
  }

  /**
   * Polls the namenode forever. Whenever the report lists block ids that
   * have not been seen yet, the forensics job is run for them and its
   * output printed.
   *
   * @param args namenode report url, sleep time in milliseconds, mapred jar
   *             location and hdfs input path, in that order
   */
  public static void main(String[] args)
    throws SAXException, ParserConfigurationException,
           InterruptedException, IOException {

    if (System.getenv("HADOOP_PREFIX") == null) {
      System.err.println("The environmental variable HADOOP_PREFIX is undefined");
      System.exit(1);
    }

    if (args.length < 4) {
      System.out.println("Usage: java BlockForensics [http://namenode:port/"
                         + "corrupt_replicas_xml.jsp] [sleep time between "
                         + "requests (in milliseconds)] [mapred jar location] "
                         + "[hdfs input path]");
      return;
    }

    int sleepTime = parseSleepTime(args[1]);

    // every block id reported so far; a block is only searched for once
    Set<Long> blockIds = new TreeSet<Long>();

    DocumentBuilderFactory fact = DocumentBuilderFactory.newInstance();
    Random random = new Random();

    while (true) {
      List<Long> searchBlockIds =
        fetchNewBlockIds(args[0], fact.newDocumentBuilder(), blockIds);

      if (!searchBlockIds.isEmpty()) {
        String blockIdsStr = join(searchBlockIds, ",");
        System.out.println("\nSearching for: " + blockIdsStr);

        String tmpDir = "/tmp-block-forensics-"
                        + random.nextInt(Integer.MAX_VALUE);
        System.out.println("Using temporary dir: "+tmpDir);

        searchForBlocks(args[2], args[3], tmpDir, blockIdsStr);

        int sleepSecs = sleepTime / 1000;
        System.out.print("Sleeping for "+sleepSecs
                         + " second"+(sleepSecs == 1?"":"s")+".");
      }

      // one progress dot per poll
      System.out.print(".");
      Thread.sleep(sleepTime);
    }
  }
}