blob: e8c0bc9108c454c98e3cbf336b9798ca2ae0d61d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.parsing.parsers;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.apache.metron.parser.interfaces.MessageParser;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import org.krakenapps.pcap.decoder.ethernet.EthernetDecoder;
import org.krakenapps.pcap.decoder.ethernet.EthernetType;
import org.krakenapps.pcap.decoder.ip.IpDecoder;
import org.krakenapps.pcap.decoder.ip.Ipv4Packet;
import org.krakenapps.pcap.decoder.tcp.TcpPacket;
import org.krakenapps.pcap.decoder.udp.UdpPacket;
import org.krakenapps.pcap.file.GlobalHeader;
import org.krakenapps.pcap.packet.PacketHeader;
import org.krakenapps.pcap.packet.PcapPacket;
import org.krakenapps.pcap.util.Buffer;
import org.apache.metron.pcap.Constants;
import org.apache.metron.pcap.MetronEthernetDecoder;
import org.apache.metron.pcap.PacketInfo;
import org.apache.metron.pcap.PcapByteInputStream;
/**
 * Message parser that turns a raw pcap byte array into one JSON message per
 * decoded packet.
 *
 * <p>The ethernet decoder is created by {@link #init()}. {@link #getPacketInfo(byte[])}
 * creates it on demand if {@code init()} has not been called yet.
 */
public class PcapParser implements MessageParser<JSONObject>, Serializable {

  private static final Logger LOG = Logger.getLogger(PcapParser.class);

  /** Capture file read by {@link #main(String[])} when no path argument is supplied. */
  private static final String DEFAULT_SAMPLE_PCAP =
      "/Users/sheetal/Downloads/bad_packets/bad_packet_1405988125427.pcap";

  /** Decoder every packet is passed through; {@code null} until {@link #init()} runs. */
  private EthernetDecoder ethernetDecoder;

  /**
   * Divisor selected by {@link #withTsPrecision(String)}; 1 keeps nanoseconds.
   * This class only stores the value, it does not read it.
   */
  private long timePrecisionDivisor = 1L;

  /**
   * Selects the timestamp precision divisor.
   *
   * @param tsPrecision
   *          "MILLI", "MICRO" or "NANO" (case-insensitive); {@code null} or any
   *          other value falls back to NANO
   * @return this parser, to allow call chaining
   */
  public PcapParser withTsPrecision(String tsPrecision) {
    // Constant-first comparison so an unset (null) precision reaches the
    // default branch instead of throwing a NullPointerException.
    if ("MILLI".equalsIgnoreCase(tsPrecision)) {
      //Convert nanos to millis
      LOG.info("Configured for MILLI, setting timePrecisionDivisor to 1000000L" );
      timePrecisionDivisor = 1000000L;
    } else if ("MICRO".equalsIgnoreCase(tsPrecision)) {
      //Convert nanos to micro
      LOG.info("Configured for MICRO, setting timePrecisionDivisor to 1000L" );
      timePrecisionDivisor = 1000L;
    } else if ("NANO".equalsIgnoreCase(tsPrecision)) {
      //Keep nano as is.
      LOG.info("Configured for NANO, setting timePrecisionDivisor to 1L" );
      timePrecisionDivisor = 1L;
    } else {
      LOG.info("bolt.parser.ts.precision not set. Default to NANO");
      timePrecisionDivisor = 1L;
    }
    return this;
  }

  /**
   * Creates the ethernet decoder and registers an IP decoder for IPv4 frames.
   */
  @Override
  public void init() {
    ethernetDecoder = new MetronEthernetDecoder();
    IpDecoder ipDecoder = new IpDecoder();
    ethernetDecoder.register(EthernetType.IPV4, ipDecoder);
  }

  /**
   * Parses the pcap bytes and converts every decoded packet into a JSON message.
   *
   * @param pcap
   *          the raw pcap bytes
   * @return one JSON object per decoded packet; empty if reading the pcap
   *         failed with an {@link IOException}
   */
  @Override
  public List<JSONObject> parse(byte[] pcap) {
    List<JSONObject> messages = new ArrayList<>();
    List<PacketInfo> packetInfoList;
    try {
      packetInfoList = getPacketInfo(pcap);
    } catch (IOException e) {
      // Best effort: report through the logger (not stderr) and return no messages.
      LOG.error("Unable to read packets from pcap data; returning no messages.", e);
      return messages;
    }
    for (PacketInfo packetInfo : packetInfoList) {
      JSONObject message = (JSONObject) JSONValue.parse(packetInfo.getJsonIndexDoc());
      messages.add(message);
    }
    return messages;
  }

  /**
   * Accepts every message.
   *
   * @param message
   *          the message to validate (not inspected)
   * @return always {@code true}
   */
  @Override
  public boolean validate(JSONObject message) {
    return true;
  }

  /**
   * Reads every packet from the given pcap bytes and collects the decoded
   * headers and payloads.
   *
   * <p>A packet that triggers a {@link NegativeArraySizeException} while being
   * decoded is logged at debug level and skipped. Reading stops when the
   * stream signals {@link EOFException}.
   *
   * @param pcap
   *          the raw pcap bytes
   * @return the list of packet infos, in the order the packets were read
   * @throws IOException
   *           if reading fails for a reason other than end of input
   */
  public List<PacketInfo> getPacketInfo(byte[] pcap) throws IOException {
    // Callers such as main() may not have called init(); create the decoder
    // on demand rather than failing with a NullPointerException below.
    if (ethernetDecoder == null) {
      init();
    }
    List<PacketInfo> packetInfoList = new ArrayList<PacketInfo>();
    PcapByteInputStream pcapByteInputStream = new PcapByteInputStream(pcap);
    GlobalHeader globalHeader = pcapByteInputStream.getGlobalHeader();
    while (true) {
      try {
        PcapPacket packet = pcapByteInputStream.getPacket();
        TcpPacket tcpPacket = null;
        UdpPacket udpPacket = null;
        ethernetDecoder.decode(packet);
        PacketHeader packetHeader = packet.getPacketHeader();
        Ipv4Packet ipv4Packet = Ipv4Packet.parse(packet.getPacketData());
        if (ipv4Packet.getProtocol() == Constants.PROTOCOL_TCP) {
          tcpPacket = TcpPacket.parse(ipv4Packet);
        }
        if (ipv4Packet.getProtocol() == Constants.PROTOCOL_UDP) {
          udpPacket = parseUdpPacket(ipv4Packet);
        }
        packetInfoList.add(new PacketInfo(globalHeader, packetHeader, packet,
            ipv4Packet, tcpPacket, udpPacket));
      } catch (NegativeArraySizeException ignored) {
        LOG.debug("Ignorable exception while parsing packet.", ignored);
      } catch (EOFException eof) { // $codepro.audit.disable logExceptions
        // End of input: stop reading.
        break;
      }
    }
    return packetInfoList;
  }

  /**
   * Builds a UDP packet from the data buffer of an IPv4 packet.
   *
   * <p>Four unsigned shorts are read in order: source port, destination port,
   * length and checksum. The bytes read are then discarded from the buffer and
   * the buffer is attached to the packet as its data.
   */
  private static UdpPacket parseUdpPacket(Ipv4Packet ipv4Packet) {
    Buffer packetDataBuffer = ipv4Packet.getData();
    int sourcePort = packetDataBuffer.getUnsignedShort();
    int destinationPort = packetDataBuffer.getUnsignedShort();
    UdpPacket udpPacket = new UdpPacket(ipv4Packet, sourcePort, destinationPort);
    udpPacket.setLength(packetDataBuffer.getUnsignedShort());
    udpPacket.setChecksum(packetDataBuffer.getUnsignedShort());
    packetDataBuffer.discardReadBytes();
    udpPacket.setData(packetDataBuffer);
    return udpPacket;
  }

  /**
   * Ad-hoc benchmark: parses one pcap file repeatedly, prints each packet's
   * JSON document and then prints timing estimates.
   *
   * @param args
   *          optional; {@code args[0]} is the path of the pcap file to parse.
   *          When absent, a built-in sample path is used.
   * @throws IOException
   *           Signals that an I/O exception has occurred.
   * @throws InterruptedException
   *           declared for backward compatibility; not thrown by this method
   */
  public static void main(String[] args) throws IOException,
      InterruptedException {
    double totalIterations = 1000000;
    double parallelism = 64;
    double targetEvents = 1000000;
    PcapParser pcapParser = new PcapParser();
    // The decoder must exist before any packet is decoded.
    pcapParser.init();
    String pcapPath = (args != null && args.length > 0) ? args[0] : DEFAULT_SAMPLE_PCAP;
    File fin = new File(pcapPath);
    byte[] pcapBytes = FileUtils.readFileToByteArray(fin);
    long startTime = System.currentTimeMillis();
    for (int i = 0; i < totalIterations; i++) {
      List<PacketInfo> list = pcapParser.getPacketInfo(pcapBytes);
      for (PacketInfo packetInfo : list) {
        System.out.println(packetInfo.getJsonIndexDoc());
      }
    }
    long endTime = System.currentTimeMillis();
    long elapsedMillis = endTime - startTime;
    System.out.println("Time taken to process " + totalIterations + " events :"
        + elapsedMillis + " milliseconds");
    System.out
        .println("With parallelism of "
            + parallelism
            + " estimated time to process "
            + targetEvents
            + " events: "
            + ((((elapsedMillis / totalIterations) * targetEvents) / parallelism) / 1000)
            + " seconds");
    System.out.println("With parallelism of " + parallelism
        + " estimated # of events per second: "
        + ((parallelism * 1000 * totalIterations) / elapsedMillis)
        + " events");
    System.out.println("Expected Parallelism to process " + targetEvents
        + " events in a second: "
        + (targetEvents / ((1000 * totalIterations) / elapsedMillis)));
  }
}