// blob: afc78ed53b738ac0c519fe564ee84ab98d85118b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
import java.io.IOException;
import java.net.InetAddress;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
@Table(name="Hadoop",columnFamily="ClientTrace")
public class ClientTraceProcessor extends AbstractProcessor {
private static final String recordType = "ClientTrace";
private final SimpleDateFormat sdf =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
private final Matcher kvMatcher;
private final Matcher idMatcher;
private final Matcher ipMatcher;
// extract date, source
private final Pattern idPattern =
Pattern.compile("^(.{23}).*clienttrace.*");
// extract "key: value" pairs
private final Pattern kvPattern =
Pattern.compile("\\s+(\\w+):\\s+([^,]+)");
private final Pattern ipPattern =
Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
public ClientTraceProcessor() {
super();
kvMatcher = kvPattern.matcher("");
idMatcher = idPattern.matcher("");
ipMatcher = ipPattern.matcher("");
}
public enum Locality {
LOCAL("local"), INTRA("intra_rack"), INTER("inter_rack");
String lbl;
Locality(String lbl) {
this.lbl = lbl;
}
public String getLabel() {
return lbl;
}
};
protected Locality getLocality(String src, String dst) throws Exception {
if (null == src || null == dst) {
throw new IOException("Missing src/dst");
}
ipMatcher.reset(src);
if (!ipMatcher.find()) {
throw new IOException("Could not find src");
}
byte[] srcIP = InetAddress.getByName(ipMatcher.group(0)).getAddress();
ipMatcher.reset(dst);
if (!ipMatcher.find()) {
throw new IOException("Could not find dst");
}
byte[] dstIP = InetAddress.getByName(ipMatcher.group(0)).getAddress();
for (int i = 0; i < 4; ++i) {
if (srcIP[i] != dstIP[i]) {
return (3 == i && (srcIP[i] & 0xC0) == (dstIP[i] & 0xC0))
? Locality.INTRA
: Locality.INTER;
}
}
return Locality.LOCAL;
}
@Override
public void parse(String recordEntry,
OutputCollector<ChukwaRecordKey,ChukwaRecord> output, Reporter reporter)
throws Throwable {
try {
idMatcher.reset(recordEntry);
long ms;
long ms_fullresolution;
if (idMatcher.find()) {
ms = sdf.parse(idMatcher.group(1)).getTime();
ms_fullresolution = ms;
} else {
throw new IOException("Could not find date/source");
}
kvMatcher.reset(recordEntry);
if (!kvMatcher.find()) {
throw new IOException("Failed to find record");
}
ChukwaRecord rec = new ChukwaRecord();
do {
rec.add(kvMatcher.group(1), kvMatcher.group(2));
} while (kvMatcher.find());
Locality loc = getLocality(rec.getValue("src"), rec.getValue("dest"));
rec.add("locality", loc.getLabel());
calendar.setTimeInMillis(ms);
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MILLISECOND, 0);
ms = calendar.getTimeInMillis();
calendar.set(Calendar.MINUTE, 0);
key.setKey(calendar.getTimeInMillis() + "/" + loc.getLabel() + "/" +
rec.getValue("op").toLowerCase() + "/" + ms);
key.setReduceType("ClientTrace");
rec.setTime(ms);
rec.add(Record.tagsField, chunk.getTags());
rec.add(Record.sourceField, chunk.getSource());
rec.add(Record.applicationField, chunk.getStreamName());
rec.add("actual_time",Long.toString(ms_fullresolution));
output.collect(key, rec);
} catch (ParseException e) {
log.warn("Unable to parse the date in DefaultProcessor ["
+ recordEntry + "]", e);
e.printStackTrace();
throw e;
} catch (IOException e) {
log.warn("Unable to collect output in DefaultProcessor ["
+ recordEntry + "]", e);
e.printStackTrace();
throw e;
}
}
public String getDataType() {
return recordType;
}
}