blob: 2fe78f5ee761604644f842a02acc4082556fe0cd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.logs;
import java.text.ParseException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.common.util.BaseOperator;
/**
 * Parses Apache access log lines, one line per tuple.
 * The regular expression returned by {@link #getAccessLogRegex()} is used as the parser.
 * For every line that matches, the operator emits the client IP address (outputIPAddress),
 * the requested url (outputUrl), the HTTP status code (outputStatusCode), the number of
 * bytes (outputBytes), the referer (outputReferer) and the agent (outputAgent).
 * Lines that do not match the expression produce no output.
 * <p>
 * This is a pass through operator<br>
 * <br>
 * <b>Stateful : No </b><br>
 * <b>Partitions : Yes</b>, No dependency among input values. <br>
 * <br>
 * <b>Ports</b>:<br>
 * <b>data</b>: expects String<br>
 * <b>outputIPAddress</b>: emits String<br>
 * <b>outputUrl</b>: emits String<br>
 * <b>outputStatusCode</b>: emits String<br>
 * <b>outputBytes</b>: emits Long<br>
 * <b>outputReferer</b>: emits String<br>
 * <b>outputAgent</b>: emits String<br>
 * <br>
 * <b>Properties</b>: none<br>
 * </p>
 * @displayName Apache Log Parse
 * @category Tuple Converters
 * @tags apache, parse
 *
 * @since 0.3.3
 */
@Stateless
@OperatorAnnotation(partitionable = true)
public class ApacheLogParseOperator extends BaseOperator
{
  /**
   * Access log pattern, compiled once for the whole class instead of once per tuple.
   * A compiled {@link Pattern} is immutable; only the {@link Matcher} created from it
   * per line carries state.
   */
  private static final Pattern ACCESS_LOG_PATTERN = Pattern.compile(getAccessLogRegex(),
      Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

  /**
   * This is the input port which receives apache log lines.
   */
  public final transient DefaultInputPort<String> data = new DefaultInputPort<String>()
  {
    @Override
    public void process(String s)
    {
      try {
        processTuple(s);
      } catch (ParseException ignored) {
        // Best effort by design: a line that cannot be parsed is dropped
        // so that one bad tuple does not stop the stream.
      }
    }
  };

  /**
   * Client IP address, output port.
   */
  public final transient DefaultOutputPort<String> outputIPAddress = new DefaultOutputPort<String>();

  /**
   * Access url, output port.
   */
  public final transient DefaultOutputPort<String> outputUrl = new DefaultOutputPort<String>();

  /**
   * HTTP status code, output port.
   */
  public final transient DefaultOutputPort<String> outputStatusCode = new DefaultOutputPort<String>();

  /**
   * Number of bytes served, output port.
   */
  public final transient DefaultOutputPort<Long> outputBytes = new DefaultOutputPort<Long>();

  /**
   * Referer name, output port.
   */
  public final transient DefaultOutputPort<String> outputReferer = new DefaultOutputPort<String>();

  /**
   * Agent, output port.
   */
  public final transient DefaultOutputPort<String> outputAgent = new DefaultOutputPort<String>();

  /**
   * Get apache log pattern regex.
   * The expression defines nine capturing groups, in this order: client IP, two
   * non-whitespace fields, date, url, HTTP status code, number of bytes, referer, agent.
   * The bytes group only accepts digits, so a line whose bytes field is not numeric
   * (for example "-") does not match.
   *
   * @return regex string.
   */
  protected static String getAccessLogRegex()
  {
    String regex1 = "^([\\d\\.]+)"; // Client IP
    String regex2 = " (\\S+)"; // -
    String regex3 = " (\\S+)"; // -
    String regex4 = " \\[([\\w:/]+\\s[+\\-]\\d{4})\\]"; // Date
    String regex5 = " \"[A-Z]+ (.+?) HTTP/\\S+\""; // url
    String regex6 = " (\\d{3})"; // HTTP code
    String regex7 = " (\\d+)"; // Number of bytes
    String regex8 = " \"([^\"]+)\""; // Referer
    String regex9 = " \"([^\"]+)\""; // Agent
    String regex10 = ".*"; // ignore the rest
    return regex1 + regex2 + regex3 + regex4 + regex5 + regex6 + regex7
        + regex8 + regex9 + regex10;
  }

  /**
   * Parses one Apache access log line and, when it matches the access log pattern,
   * emits the following on the corresponding output ports, in this order: <br>
   * 1. Client IP address <br>
   * 2. Requested url <br>
   * 3. HTTP status code <br>
   * 4. Number of bytes <br>
   * 5. Referer <br>
   * 6. Agent <br>
   * A line that does not match the pattern is ignored and nothing is emitted.
   *
   * @param line
   *          : tuple to parse
   * @throws ParseException
   *           if the line matches but its bytes field does not fit in a {@code long};
   *           nothing is emitted in that case
   */
  public void processTuple(String line) throws ParseException
  {
    Matcher accessLogEntryMatcher = ACCESS_LOG_PATTERN.matcher(line);
    if (!accessLogEntryMatcher.matches()) {
      return;
    }

    // Convert the byte count before emitting anything, so a failure here
    // never leaves a partially emitted record behind.
    String bytesField = accessLogEntryMatcher.group(7);
    long numOfBytes;
    try {
      numOfBytes = Long.parseLong(bytesField);
    } catch (NumberFormatException e) {
      // The regex guarantees digits only, so this can only be an overflow.
      // Report it through the declared checked exception instead of letting
      // an unchecked exception escape from the input port.
      ParseException parseFailure = new ParseException(
          "Number of bytes out of range: " + bytesField, accessLogEntryMatcher.start(7));
      parseFailure.initCause(e);
      throw parseFailure;
    }

    outputIPAddress.emit(accessLogEntryMatcher.group(1));
    outputUrl.emit(accessLogEntryMatcher.group(5));
    outputStatusCode.emit(accessLogEntryMatcher.group(6));
    outputBytes.emit(numOfBytes);
    outputReferer.emit(accessLogEntryMatcher.group(8));
    outputAgent.emit(accessLogEntryMatcher.group(9));
  }
}