blob: 9e874d20e73183a7453f547876ea4e0c735cd731 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.util;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;
import org.apache.hadoop.chukwa.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
public class ConstRateValidator extends Configured implements Tool{
public static class ByteRange implements WritableComparable<ByteRange> {
String stream;
String split ="";
public long start;
public long len;
public ByteRange() {
start=len=0;
}
public ByteRange(ChunkImpl val) {
len = val.getLength();
start = val.getSeqID() - len;
this.stream = val.getSource()+":"+val.getStreamName() ;
}
@Override
public void readFields(DataInput in) throws IOException {
stream = in.readUTF();
split = in.readUTF();
start = in.readLong();
len = in.readLong();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(stream);
out.writeUTF(split);
out.writeLong(start);
out.writeLong(len);
}
public static ByteRange read(DataInput in) throws IOException {
ByteRange b = new ByteRange();
b.readFields(in);
return b;
}
@Override
public int compareTo(ByteRange o) {
int c = stream.compareTo(o.stream);
if(c != 0)
return c;
if(start > o.start)
return 1;
else if (start < o.start)
return -1;
else {
if(len > o.len)
return 1;
else if(len < o.len)
return -1;
else
return split.compareTo(o.split);
}
}
public boolean equals(Object o) {
if(o instanceof ByteRange) {
ByteRange rhs = (ByteRange) o;
return stream.equals(rhs.stream) &&
split.equals(rhs.split)&& rhs.start == start && rhs.len == len;
} else
return false;
}
public int hashCode() {
return (int) (
stream.hashCode() ^ (len>>32) ^ (len & 0xFFFFFFFF) ^ (start >> 32)
^ (start & 0xFFFFFFFF));
}
}
/////// State machine; expects chunks in order ////////
public static class ValidatorSM {
public long ok=0, missingBytes=0,dupBytes=0;
long consecDupchunks=0;
long nextExpectedStart = 0;
public long chunks;
public long dupChunks;
public Set<String> filesContaining = new LinkedHashSet<String>();
public String closeSM() {
if(consecDupchunks > 0)
return consecDupchunks + " consecutive duplicate chunks ending at " + consecDupchunks;
else
return null;
}
public String advanceSM(ByteRange b) {
if(!b.split.equals(""))
filesContaining.add(b.split);
chunks++;
if(b.start == nextExpectedStart) {
String msg = null;
if(consecDupchunks > 0)
msg = consecDupchunks + " consecutive duplicative chunks ending at " + b.start;
consecDupchunks = 0;
nextExpectedStart += b.len;
ok += b.len;
return msg;
} else{
// Text msg = new Text(b.stream + " " + consecOKchunks +
// "consecutive OK chunks ending at " + nextExpectedStart);
String msg;
if(b.start < nextExpectedStart) { //duplicate bytes
consecDupchunks ++;
dupChunks++;
long duplicatedBytes;
if(b.start + b.len <= nextExpectedStart) {
duplicatedBytes = b.len;
msg =" dupchunk of length " + b.len + " at " + b.start;
} else {
duplicatedBytes = b.start + b.len - nextExpectedStart;
ok += b.len - duplicatedBytes;
msg = " overlap of " + duplicatedBytes+ " starting at " + b.start +
" (total chunk len ="+b.len+")";
}
dupBytes += duplicatedBytes;
nextExpectedStart = Math.max(b.start + b.len, nextExpectedStart);
} else { //b.start > nextExpectedStart ==> missing bytes
consecDupchunks = 0;
long missing = (b.start - nextExpectedStart);
msg = "==Missing "+ missing+ " bytes starting from " + nextExpectedStart;
nextExpectedStart = b.start + b.len;
if(b.start < 0 || b.len < 0)
System.out.println("either len or start was negative; something is seriously wrong");
missingBytes += missing;
}
return msg;
} //end not-OK
} //end advance
} //end class
/////// Map Class /////////
public static class MapClass extends Mapper <ChukwaArchiveKey, ChunkImpl, ByteRange, NullWritable> {
@Override
protected void map(ChukwaArchiveKey key, ChunkImpl val,
Mapper<ChukwaArchiveKey, ChunkImpl,ByteRange, NullWritable>.Context context)
throws IOException, InterruptedException
{
boolean valid = ConstRateAdaptor.checkChunk(val);
String fname = "unknown";
ByteRange ret = new ByteRange(val);
InputSplit inSplit = context.getInputSplit();
if(inSplit instanceof FileSplit) {
FileSplit fs = (FileSplit) inSplit;
fname = fs.getPath().getName();
}
ret.split = fname;
if(!valid) {
context.getCounter("app", "badchunks").increment(1);
}
context.write(ret, NullWritable.get());
}
}
public static class ReduceClass extends Reducer<ByteRange, NullWritable, Text,Text> {
ValidatorSM sm;
String curStream = "";
public ReduceClass() {
sm = new ValidatorSM();
}
// @Override
// protected void setup(Reducer<ByteRange, NullWritable, Text,Text>.Context context) { }
@Override
protected void reduce(ByteRange b, Iterable<NullWritable> vals,
Reducer<ByteRange, NullWritable, Text,Text>.Context context) {
try {
if(!curStream.equals(b.stream)) {
if(!curStream.equals("")) {
printEndOfStream(context);
}
System.out.println("rolling over to new stream " + b.stream);
curStream = b.stream;
sm = new ValidatorSM();
}
String msg = sm.advanceSM(b);
if(msg != null)
context.write(new Text(b.stream), new Text(msg));
} catch(InterruptedException e) {
} catch(IOException e) {
e.printStackTrace();
}
}
@Override
protected void cleanup(Reducer<ByteRange, NullWritable, Text,Text>.Context context)
throws IOException, InterruptedException{
printEndOfStream(context);
}
public void printEndOfStream(Reducer<ByteRange, NullWritable, Text,Text>.Context context)
throws IOException, InterruptedException {
Text cs = new Text(curStream);
String t = sm.closeSM();
if(t != null)
context.write(cs, new Text(t));
if(!sm.filesContaining.isEmpty()) {
StringBuilder sb = new StringBuilder();
sb.append("Data contained in");
for(String s: sm.filesContaining)
sb.append(" ").append(s);
context.write(cs, new Text(sb.toString()));
}
context.write(cs, new Text("total of " + sm.chunks + " chunks ("
+ sm.dupChunks + " dups). " +" High byte =" + (sm.nextExpectedStart-1)));
context.getCounter("app", "missing bytes").increment(sm.missingBytes);
context.getCounter("app", "duplicate bytes").increment(sm.dupBytes);
context.getCounter("app", "OK Bytes").increment(sm.ok);
}
} //end reduce class
public static void main(String[] args) throws Exception {
// System.out.println("specify -D textOutput=true for text output");
int res = ToolRunner.run(new Configuration(),
new ConstRateValidator(), args);
return;
}
@Override
public int run(String[] real_args) throws Exception {
GenericOptionsParser gop = new GenericOptionsParser(getConf(), real_args);
Configuration conf = gop.getConfiguration();
String[] args = gop.getRemainingArgs();
Job validate = new Job(conf);
validate.setJobName("Chukwa Test pattern validator");
validate.setJarByClass(this.getClass());
validate.setInputFormatClass(SequenceFileInputFormat.class);
validate.setMapperClass(MapClass.class);
validate.setMapOutputKeyClass(ByteRange.class);
validate.setMapOutputValueClass(NullWritable.class);
validate.setReducerClass(ReduceClass.class);
validate.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(validate, new Path(args[0]));
FileOutputFormat.setOutputPath(validate, new Path(args[1]));
validate.submit();
return 0;
}
}