// blob: f67b85f5bb03b6c56b29846fd16c7defca377632 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.pcap.decoder;
import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII;
/**
 * Representation of a single TCP session reconstructed from captured packets.
 *
 * <p>Packets are added one at a time with {@link #addPacket(Packet)}. The first TCP packet
 * added defines the originator ("sender") side of the session; every later packet is filed as
 * coming either from the originator or from the remote ("receiver") side. The payload byte
 * arrays are only assembled when {@link #closeSession()} runs, so the data getters return
 * {@code null} until then.
 *
 * <p>Timestamps stored here come from {@code Packet.getTimestamp()} and are interpreted as
 * epoch milliseconds by {@link #getSessionStartTime()} and {@link #getSessionEndTime()}.
 */
public class TcpSession {
  private static final Logger logger = LoggerFactory.getLogger(TcpSession.class);

  private final List<Packet> packetsFromSender;
  private final List<Packet> packetsFromReceiver;
  private final TcpHandshake handshake;
  private final long sessionID;

  private long startTime;
  private long endTime;
  private int packetCount;
  private InetAddress srcIP;
  private InetAddress dstIP;
  private int srcPort;
  private int dstPort;
  private String srcMac;
  private String dstMac;
  private long synTime;
  private long ackTime;
  private long connectTime;
  private byte[] sentData;
  private byte[] receivedData;
  private int sentDataSize;
  private int receivedDataSize;
  private boolean hasCorruptedData = false;

  /**
   * Creates an empty session.
   *
   * @param sessionID identifier of this session; packets added after the first one must report
   *                  the same value from {@code getSessionHash()}
   */
  public TcpSession (long sessionID) {
    packetsFromSender = new ArrayList<>();
    packetsFromReceiver = new ArrayList<>();
    handshake = new TcpHandshake();
    this.sessionID = sessionID;
  }

  /**
   * Adds a packet to the TCP session.
   *
   * <p>Non-TCP packets are ignored. The first accepted packet fixes the session's addresses,
   * ports, MAC addresses and initial start time. Later packets whose session hash differs from
   * this session's ID are rejected with a warning. If the handshake reports the session as
   * closed after this packet, {@link #closeSession()} is invoked.
   *
   * @param p The Packet to be added to the session
   */
  public void addPacket(Packet p) {
    // Only attempt to add TCP packets to session
    if (!p.getPacketType().equalsIgnoreCase("TCP")) {
      return;
    }

    // These variables should be consistent within a TCP session
    if (packetCount == 0) {
      srcIP = p.getSrc_ip();
      dstIP = p.getDst_ip();
      srcPort = p.getSrc_port();
      dstPort = p.getDst_port();
      srcMac = p.getEthernetSource();
      dstMac = p.getEthernetDestination();
      startTime = p.getTimestamp();
    } else if (p.getSessionHash() != sessionID) {
      logger.warn("Attempting to add packet with session hash {} to incorrect TCP session {}.",
          p.getSessionHash(), sessionID);
      return;
    }

    // Direction is decided by source IP only.
    // NOTE(review): when both endpoints share an IP (e.g. loopback captures) every packet is
    // classified as coming from the originator - confirm whether ports should be compared too.
    final String packetSource = p.getSrc_ip().getHostAddress();
    final boolean fromOriginator = packetSource.equalsIgnoreCase(srcIP.getHostAddress());

    // Add packet to appropriate list and increment the data size counter
    final byte[] payload = p.getData();
    if (fromOriginator) {
      packetsFromSender.add(p);
      if (payload != null) {
        sentDataSize += payload.length;
      }
    } else {
      packetsFromReceiver.add(p);
      if (payload != null) {
        receivedDataSize += payload.length;
      }
    }

    if (!handshake.isConnected()) {
      trackOpeningHandshake(p, packetSource, fromOriginator);
    } else {
      trackClosingHandshake(p);
    }

    // Augment the packet counter
    packetCount++;

    if (p.isCorrupt()) {
      hasCorruptedData = true;
    }

    // Packets are not necessarily received in order, so track the minimum and maximum
    // timestamp. The comparison must use the same value that is stored (getTimestamp());
    // comparing against getTimestampMicro() mixed two different clocks' units.
    final long timestamp = p.getTimestamp();
    if (timestamp < startTime) {
      startTime = timestamp;
    }
    if (timestamp > endTime) {
      endTime = timestamp;
    }

    // Close the session if the closing handshake is complete
    if (handshake.isClosed()) {
      closeSession();
    }
  }

  /**
   * Updates the handshake while the connection has not yet been reported as established.
   *
   * <p>The three branches mirror the three steps of opening a TCP session: SYN from the
   * originator, SYN/ACK from the remote side, and the final ACK from the originator.
   */
  private void trackOpeningHandshake(Packet p, String packetSource, boolean fromOriginator) {
    if (p.getSynFlag() && fromOriginator) {
      // Part 1 of the handshake: the originator sends the first SYN packet
      handshake.setSyn();
      synTime = p.getTimestamp();
    } else if (p.getSynFlag() && p.getAckFlag() && packetSource.equalsIgnoreCase(dstIP.getHostAddress())) {
      // Part 2 of the handshake: the receiver answers with the SYN/ACK flags set
      handshake.setAck();
    } else if (p.getAckFlag() && fromOriginator) {
      // Part 3 of the handshake: the originator replies with an ACK; the time between the
      // recorded SYN and this ACK is kept as the connection time.
      handshake.setAck();
      ackTime = p.getTimestamp();
      connectTime = ackTime - synTime;
    }
  }

  /**
   * Updates the handshake once the connection is established, watching for session teardown.
   *
   * <p>Closing a TCP session is more difficult than opening one and there are a lot of ways
   * that it can go wrong. See
   * https://accedian.com/enterprises/blog/close-tcp-sessions-diagnose-disconnections/ for
   * references on closing TCP sessions.
   *
   * <p>To close a session correctly, there are four steps:
   * <ol>
   *   <li>Party A sends a frame to party B with the FIN flag</li>
   *   <li>Party B sends a frame with an ACK flag</li>
   *   <li>Party B then follows with a frame with the FIN flag set</li>
   *   <li>Party A then confirms with an ACK flag.</li>
   * </ol>
   *
   * <p>Technically, the session is closed upon the first FIN flag and resources can be released
   * at that point. However, a lot can go wrong, so either party can send a frame with the RST
   * flag set, which forces the closing of the session.
   */
  private void trackClosingHandshake(Packet p) {
    if (p.getRstFlag()) {
      // Forced close: the session is immediately closed.
      handshake.setRst();
    } else if (p.getFinFlag()) {
      // Beginning of the normal closure procedure. Once a FIN flag has been seen, the session
      // is basically closed even if one party continues to send data.
      handshake.setFin();
    }

    if (p.getAckFlag() && p.getFinFlag()) {
      handshake.setAck();
    }
  }

  /**
   * This function returns true if the TCP session has been established, false if not.
   * @return True if the session has been established, false if not.
   */
  public boolean connectionEstablished() {
    return handshake.isConnected();
  }

  /**
   * @return whatever the underlying handshake reports from {@code isClosed()}
   */
  public boolean connectionClosed() {
    return handshake.isClosed();
  }

  /**
   * Sorts the collected packets and assembles the sent and received payload byte arrays.
   *
   * <p>The sent and received bytes cannot be written until the session is closed. Since it
   * cannot be assumed that the packets were received in the correct order, each list is first
   * sorted using the packets' natural ordering (intended to be the TCP sequence number) and the
   * payloads are then concatenated in that order.
   */
  public void closeSession() {
    logger.debug("Closing session {}", sessionID);

    Collections.sort(packetsFromSender);
    Collections.sort(packetsFromReceiver);

    sentData = assemblePayload(packetsFromSender, sentDataSize);
    receivedData = assemblePayload(packetsFromReceiver, receivedDataSize);
  }

  /**
   * Concatenates the non-null payloads of {@code packets}, in list order, into a new array of
   * {@code totalSize} bytes.
   */
  private static byte[] assemblePayload(List<Packet> packets, int totalSize) {
    final byte[] assembled = new byte[totalSize];
    int offset = 0;
    for (Packet packet : packets) {
      final byte[] payload = packet.getData();
      if (payload != null) {
        System.arraycopy(payload, 0, assembled, offset, payload.length);
        offset += payload.length;
      }
    }
    return assembled;
  }

  /** @return the earliest packet timestamp seen, interpreted as epoch milliseconds */
  public Instant getSessionStartTime() {
    return Instant.ofEpochMilli(startTime);
  }

  /** @return the span between the earliest and latest packet timestamps seen */
  public Period getSessionDuration() {
    return new Period(endTime - startTime);
  }

  /** @return the time between the recorded SYN and the originator's ACK during connection setup */
  public Period getConnectionTime() {
    return new Period(connectTime);
  }

  /** @return the latest packet timestamp seen, interpreted as epoch milliseconds */
  public Instant getSessionEndTime() {
    return Instant.ofEpochMilli(endTime);
  }

  /** @return the Ethernet source address of the first packet added to the session */
  public String getSrcMac() {
    return srcMac;
  }

  /** @return the Ethernet destination address of the first packet added to the session */
  public String getDstMac() {
    return dstMac;
  }

  /** @return the source IP of the first packet; requires at least one packet to have been added */
  public String getSrcIP() {
    return srcIP.getHostAddress();
  }

  /** @return the destination IP of the first packet; requires at least one packet to have been added */
  public String getDstIP() {
    return dstIP.getHostAddress();
  }

  /** @return the source port of the first packet added to the session */
  public int getSrcPort() {
    return srcPort;
  }

  /** @return the destination port of the first packet added to the session */
  public int getDstPort() {
    return dstPort;
  }

  /** @return the identifier this session was created with */
  public long getSessionID() {
    return sessionID;
  }

  /** @return the total number of packets stored for both directions */
  public int getPacketCount() {
    return packetsFromReceiver.size() + packetsFromSender.size();
  }

  /** @return the number of packets stored as coming from the originator */
  public int getPacketCountFromOrigin() {
    return packetsFromSender.size();
  }

  /** @return the number of packets stored as coming from the remote side */
  public int getPacketCountFromRemote() {
    return packetsFromReceiver.size();
  }

  /** @return true if any packet added to this session reported itself as corrupt */
  public boolean hasCorruptedData() {
    return hasCorruptedData;
  }

  /**
   * @return the number of payload bytes seen from the originator. Uses the running counter
   *         rather than the assembled array so it is safe to call before the session is closed.
   */
  public int getDataVolumeFromOrigin() {
    return sentDataSize;
  }

  /**
   * @return the number of payload bytes seen from the remote side. Uses the running counter
   *         rather than the assembled array so it is safe to call before the session is closed.
   */
  public int getDataVolumeFromRemote() {
    return receivedDataSize;
  }

  /** @return the assembled originator payload, or {@code null} if {@link #closeSession()} has not run */
  public byte[] getDataFromOriginator() {
    return sentData;
  }

  /** @return the originator payload passed through {@code parseBytesToASCII} */
  public String getDataFromOriginatorAsString() {
    return parseBytesToASCII(sentData);
  }

  /** @return the assembled remote payload, or {@code null} if {@link #closeSession()} has not run */
  public byte[] getDataFromRemote() {
    return receivedData;
  }

  /** @return the remote payload passed through {@code parseBytesToASCII} */
  public String getDataFromRemoteAsString() {
    return parseBytesToASCII(receivedData);
  }
}