| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.tools; |
| |
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.DeprecationDelta;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.mapreduce.lib.map.RegexMapper;
| |
/**
 * Logalyzer: A utility tool for archiving and analyzing hadoop logs.
 * <p>
 * This tool supports archiving and analyzing (sort/grep) of log-files.
 * It takes as input
 * a) Input uri which will serve uris of the logs to be archived.
 * b) Output directory (not mandatory).
 * c) Directory on dfs to archive the logs.
 * d) The sort/grep patterns for analyzing the files and separator for boundaries.
 * Usage:
 * Logalyzer -archive -archiveDir &lt;directory to archive logs&gt; -analysis &lt;directory&gt; -logs &lt;log-list uri&gt; -grep &lt;pattern&gt; -sort &lt;col1, col2&gt; -separator &lt;separator&gt;
 * <p>
 */
| |
| public class Logalyzer { |
  // Constants
  // Shared configuration used to resolve the default FileSystem and to seed
  // the JobConf for the archive/analysis jobs.
  private static Configuration fsConfig = new Configuration();
  // Job configuration key: comma-separated column indices that LogComparator
  // sorts by.
  public static final String SORT_COLUMNS =
    "logalizer.logcomparator.sort.columns";
  // Job configuration key: separator used to split log lines into columns.
  public static final String COLUMN_SEPARATOR =
    "logalizer.logcomparator.column.separator";

  static {
    // Map the legacy configuration keys onto the current ones so old job
    // configurations keep working.
    Configuration.addDeprecations(new DeprecationDelta[] {
      new DeprecationDelta("mapred.reducer.sort", SORT_COLUMNS),
      new DeprecationDelta("mapred.reducer.separator", COLUMN_SEPARATOR)
    });
  }
| |
| /** A {@link Mapper} that extracts text matching a regular expression. */ |
| public static class LogRegexMapper<K extends WritableComparable> |
| extends MapReduceBase |
| implements Mapper<K, Text, Text, LongWritable> { |
| |
| private Pattern pattern; |
| |
| public void configure(JobConf job) { |
| pattern = Pattern.compile(job.get(RegexMapper.PATTERN)); |
| } |
| |
| public void map(K key, Text value, |
| OutputCollector<Text, LongWritable> output, |
| Reporter reporter) |
| throws IOException { |
| String text = value.toString(); |
| Matcher matcher = pattern.matcher(text); |
| while (matcher.find()) { |
| output.collect(value, new LongWritable(1)); |
| } |
| } |
| |
| } |
| |
| /** A WritableComparator optimized for UTF8 keys of the logs. */ |
| public static class LogComparator extends Text.Comparator implements Configurable { |
| |
| private static Log LOG = LogFactory.getLog(Logalyzer.class); |
| private JobConf conf = null; |
| private String[] sortSpec = null; |
| private String columnSeparator = null; |
| |
| public void setConf(Configuration conf) { |
| if (conf instanceof JobConf) { |
| this.conf = (JobConf) conf; |
| } else { |
| this.conf = new JobConf(conf); |
| } |
| |
| //Initialize the specification for *comparision* |
| String sortColumns = this.conf.get(SORT_COLUMNS, null); |
| if (sortColumns != null) { |
| sortSpec = sortColumns.split(","); |
| } |
| |
| //Column-separator |
| columnSeparator = this.conf.get(COLUMN_SEPARATOR, ""); |
| } |
| |
| public Configuration getConf() { |
| return conf; |
| } |
| |
| public int compare(byte[] b1, int s1, int l1, |
| byte[] b2, int s2, int l2) { |
| |
| if (sortSpec == null) { |
| return super.compare(b1, s1, l1, b2, s2, l2); |
| } |
| |
| try { |
| Text logline1 = new Text(); |
| logline1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1))); |
| String line1 = logline1.toString(); |
| String[] logColumns1 = line1.split(columnSeparator); |
| |
| Text logline2 = new Text(); |
| logline2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2))); |
| String line2 = logline2.toString(); |
| String[] logColumns2 = line2.split(columnSeparator); |
| |
| if (logColumns1 == null || logColumns2 == null) { |
| return super.compare(b1, s1, l1, b2, s2, l2); |
| } |
| |
| //Compare column-wise according to *sortSpec* |
| for(int i=0; i < sortSpec.length; ++i) { |
| int column = (Integer.valueOf(sortSpec[i]).intValue()); |
| String c1 = logColumns1[column]; |
| String c2 = logColumns2[column]; |
| |
| //Compare columns |
| int comparision = super.compareBytes( |
| c1.getBytes(), 0, c1.length(), |
| c2.getBytes(), 0, c2.length() |
| ); |
| |
| //They differ! |
| if (comparision != 0) { |
| return comparision; |
| } |
| } |
| |
| } catch (IOException ioe) { |
| LOG.fatal("Caught " + ioe); |
| return 0; |
| } |
| |
| return 0; |
| } |
| |
| static { |
| // register this comparator |
| WritableComparator.define(Text.class, new LogComparator()); |
| } |
| } |
| |
| /** |
| * doArchive: Workhorse function to archive log-files. |
| * @param logListURI : The uri which will serve list of log-files to archive. |
| * @param archiveDirectory : The directory to store archived logfiles. |
| * @throws IOException |
| */ |
| public void |
| doArchive(String logListURI, String archiveDirectory) |
| throws IOException |
| { |
| String destURL = FileSystem.getDefaultUri(fsConfig) + archiveDirectory; |
| DistCpV1.copy(new JobConf(fsConfig), logListURI, destURL, null, true, false); |
| } |
| |
| /** |
| * doAnalyze: |
| * @param inputFilesDirectory : Directory containing the files to be analyzed. |
| * @param outputDirectory : Directory to store analysis (output). |
| * @param grepPattern : Pattern to *grep* for. |
| * @param sortColumns : Sort specification for output. |
| * @param columnSeparator : Column separator. |
| * @throws IOException |
| */ |
| public void |
| doAnalyze(String inputFilesDirectory, String outputDirectory, |
| String grepPattern, String sortColumns, String columnSeparator) |
| throws IOException |
| { |
| Path grepInput = new Path(inputFilesDirectory); |
| |
| Path analysisOutput = null; |
| if (outputDirectory.equals("")) { |
| analysisOutput = new Path(inputFilesDirectory, "logalyzer_" + |
| Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); |
| } else { |
| analysisOutput = new Path(outputDirectory); |
| } |
| |
| JobConf grepJob = new JobConf(fsConfig); |
| grepJob.setJobName("logalyzer-grep-sort"); |
| |
| FileInputFormat.setInputPaths(grepJob, grepInput); |
| grepJob.setInputFormat(TextInputFormat.class); |
| |
| grepJob.setMapperClass(LogRegexMapper.class); |
| grepJob.set(RegexMapper.PATTERN, grepPattern); |
| grepJob.set(SORT_COLUMNS, sortColumns); |
| grepJob.set(COLUMN_SEPARATOR, columnSeparator); |
| |
| grepJob.setCombinerClass(LongSumReducer.class); |
| grepJob.setReducerClass(LongSumReducer.class); |
| |
| FileOutputFormat.setOutputPath(grepJob, analysisOutput); |
| grepJob.setOutputFormat(TextOutputFormat.class); |
| grepJob.setOutputKeyClass(Text.class); |
| grepJob.setOutputValueClass(LongWritable.class); |
| grepJob.setOutputKeyComparatorClass(LogComparator.class); |
| |
| grepJob.setNumReduceTasks(1); // write a single file |
| |
| JobClient.runJob(grepJob); |
| } |
| |
| public static void main(String[] args) { |
| |
| Log LOG = LogFactory.getLog(Logalyzer.class); |
| |
| String version = "Logalyzer.0.0.1"; |
| String usage = "Usage: Logalyzer [-archive -logs <urlsFile>] " + |
| "-archiveDir <archiveDirectory> " + |
| "-grep <pattern> -sort <column1,column2,...> -separator <separator> " + |
| "-analysis <outputDirectory>"; |
| |
| System.out.println(version); |
| if (args.length == 0) { |
| System.err.println(usage); |
| System.exit(-1); |
| } |
| |
| //Command line arguments |
| boolean archive = false; |
| boolean grep = false; |
| boolean sort = false; |
| |
| String archiveDir = ""; |
| String logListURI = ""; |
| String grepPattern = ".*"; |
| String sortColumns = ""; |
| String columnSeparator = " "; |
| String outputDirectory = ""; |
| |
| for (int i = 0; i < args.length; i++) { // parse command line |
| if (args[i].equals("-archive")) { |
| archive = true; |
| } else if (args[i].equals("-archiveDir")) { |
| archiveDir = args[++i]; |
| } else if (args[i].equals("-grep")) { |
| grep = true; |
| grepPattern = args[++i]; |
| } else if (args[i].equals("-logs")) { |
| logListURI = args[++i]; |
| } else if (args[i].equals("-sort")) { |
| sort = true; |
| sortColumns = args[++i]; |
| } else if (args[i].equals("-separator")) { |
| columnSeparator = args[++i]; |
| } else if (args[i].equals("-analysis")) { |
| outputDirectory = args[++i]; |
| } |
| } |
| |
| LOG.info("analysisDir = " + outputDirectory); |
| LOG.info("archiveDir = " + archiveDir); |
| LOG.info("logListURI = " + logListURI); |
| LOG.info("grepPattern = " + grepPattern); |
| LOG.info("sortColumns = " + sortColumns); |
| LOG.info("separator = " + columnSeparator); |
| |
| try { |
| Logalyzer logalyzer = new Logalyzer(); |
| |
| // Archive? |
| if (archive) { |
| logalyzer.doArchive(logListURI, archiveDir); |
| } |
| |
| // Analyze? |
| if (grep || sort) { |
| logalyzer.doAnalyze(archiveDir, outputDirectory, grepPattern, sortColumns, columnSeparator); |
| } |
| } catch (IOException ioe) { |
| ioe.printStackTrace(); |
| System.exit(-1); |
| } |
| |
| } //main |
| |
| } //class Logalyzer |