blob: 5553cc5df5229f60b3e7769f07ed2fd37f3f4a27 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.logs;
import java.io.IOException;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.common.util.BaseOperator;
/**
* Parse Apache log lines one line at a time. 
* Regex (getAccessLogRegex) is used as a parser. 
* The fields extracted include i/p (outputIPAddress), url (outputUrl),
* status code (outputStatusCode), bytes (outputBytes), referer (outputReferer),
* and agent (outputAgent). Each of the fields are emitted to a separate output port.
* <p>
* Please refer to docs for {@link org.apache.apex.malhar.lib.logs.ApacheLogParseOperator} documentation.
* More output ports in this operator.
* </p>
* @displayName Apache Virtual Log Parse
* @category Tuple Converters
* @tags apache, parse
*
* @since 0.3.2
*/
@Stateless
public class ApacheVirtualLogParseOperator extends BaseOperator
{
// default date format
protected static final String dateFormat = "dd/MMM/yyyy:HH:mm:ss Z";
/**
*
*/
public final transient DefaultInputPort<String> data = new DefaultInputPort<String>()
{
@Override
public void process(String s)
{
try {
processTuple(s);
} catch (ParseException ex) {
// ignore
}
}
};
/**
* This output port emits the IPAddresses contained in log file lines.
*/
public final transient DefaultOutputPort<String> outputIPAddress = new DefaultOutputPort<String>();
/**
* This output port emits URLs contained in log file lines.
*/
public final transient DefaultOutputPort<String> outputUrl = new DefaultOutputPort<String>();
/**
* This output port emits status codes contained in log file lines.
*/
public final transient DefaultOutputPort<String> outputStatusCode = new DefaultOutputPort<String>();
/**
* This output pot emits a Map for each log file line,
* which contains all the information extracted from the log file line.
*/
public final transient DefaultOutputPort<Map<String, Integer>> outputBytes =
new DefaultOutputPort<Map<String, Integer>>();
/**
* This output port emits the referers contained in the log file lines.
*/
public final transient DefaultOutputPort<String> outputReferer = new DefaultOutputPort<String>();
/**
* This output port emits the agents contained in the log file lines.
*/
public final transient DefaultOutputPort<String> outputAgent = new DefaultOutputPort<String>();
/**
* This output port emits the servernames contained in the log file lines.
*/
public final transient DefaultOutputPort<String> outputServerName = new DefaultOutputPort<String>();
/**
* This output port emits the servernames contained in the log file lines.
*/
public final transient DefaultOutputPort<String> outputServerName1 = new DefaultOutputPort<String>();
/**
* This output port emits the status codes corresponding to each url in a log file line.
*/
public final transient DefaultOutputPort<Map<String, String>> outUrlStatus =
new DefaultOutputPort<Map<String, String>>();
/**
* This output port emits the status associated with each server in a log file line.
*/
public final transient DefaultOutputPort<Map<String, String>> outServerStatus =
new DefaultOutputPort<Map<String, String>>();
/**
* This output port emits client data usage contained in log file lines.
*/
public final transient DefaultOutputPort<Integer> clientDataUsage = new DefaultOutputPort<Integer>();
/**
* This output port emits the view counts contained in log file lines.
*/
public final transient DefaultOutputPort<Integer> viewCount = new DefaultOutputPort<Integer>();
protected static String getAccessLogRegex()
{
String regex0 = "^([^\"]+)";
String regex1 = " ([\\d\\.]+)"; // Client IP
String regex2 = " (\\S+)"; // -
String regex3 = " (\\S+)"; // -
String regex4 = " \\[([\\w:/]+\\s[+\\-]\\d{4})\\]"; // Date
String regex5 = " \"[A-Z]+ (.+?) HTTP/\\S+\""; // url
String regex6 = " (\\d{3})"; // HTTP code
String regex7 = " (\\d+)"; // Number of bytes
String regex8 = " \"([^\"]+)\""; // Referer
String regex9 = " \"([^\"]+)\""; // Agent
String regex10 = ".*"; // ignore the rest
return regex0 + regex1 + regex2 + regex3 + regex4 + regex5 + regex6 + regex7 + regex8 + regex9 + regex10;
}
/**
* Parses Apache combined access log, and prints out the following <br>1.
* Requester IP <br>2. Date of Request <br>3. Requested Page Path
*
* @param line : tuple to parsee
* @throws ParseException
* @throws IOException
*/
public void processTuple(String line) throws ParseException
{
// Apache log properties.
String url;
String httpStatusCode;
long numOfBytes;
String referer;
String agent;
String ipAddr;
String serverName;
// Parser log.
Pattern accessLogPattern = Pattern.compile(getAccessLogRegex(), Pattern.CASE_INSENSITIVE
| Pattern.DOTALL);
Matcher accessLogEntryMatcher;
accessLogEntryMatcher = accessLogPattern.matcher(line);
if (accessLogEntryMatcher.matches()) {
serverName = accessLogEntryMatcher.group(1);
ipAddr = accessLogEntryMatcher.group(2);
url = accessLogEntryMatcher.group(6);
httpStatusCode = accessLogEntryMatcher.group(7);
numOfBytes = Long.parseLong(accessLogEntryMatcher.group(8));
referer = accessLogEntryMatcher.group(9);
agent = accessLogEntryMatcher.group(10);
outputIPAddress.emit(ipAddr);
outputUrl.emit(url);
outputStatusCode.emit(httpStatusCode);
Map<String, Integer> ipdata = new HashMap<String, Integer>();
ipdata.put(ipAddr, (int)numOfBytes);
outputBytes.emit(ipdata);
outputReferer.emit(referer);
outputAgent.emit(agent);
outputServerName.emit(serverName);
outputServerName1.emit(serverName);
HashMap<String, String> urlStatus = new HashMap<String, String>();
urlStatus.put(url, httpStatusCode);
outUrlStatus.emit(urlStatus);
HashMap<String, String> serverStatus = new HashMap<String, String>();
serverStatus.put(serverName, httpStatusCode);
outServerStatus.emit(serverStatus);
clientDataUsage.emit((int)numOfBytes);
viewCount.emit(new Integer(1));
}
}
}