blob: 000d9d014e3c893c456ce506bf33f1ffdf2e0436 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import shutil
import os
import sys
import json
import numpy as np
import linecache, bisect
import csv, math
import pandas as pd
import subprocess
import numbers
import api.resources.hdfs_client as HDFSClient
import api.resources.impala_engine as impala
from collections import OrderedDict
from multiprocessing import Process
from utils import Util, ProgressBar
from components.data.data import Data
from components.geoloc.geoloc import GeoLocalization
from impala.util import as_pandas
import time
class OA(object):
def __init__(self,date,limit=500,logger=None):
self._initialize_members(date,limit,logger)
def _initialize_members(self,date,limit,logger):
# get logger if exists. if not, create new instance.
self._logger = logging.getLogger('OA.Flow') if logger else Util.get_logger('OA.Flow',create_file=False)
# initialize required parameters.
self._scrtip_path = os.path.dirname(os.path.abspath(__file__))
self._date = date
self._table_name = "flow"
self._flow_results = []
self._limit = limit
self._data_path = None
self._ipynb_path = None
self._ingest_summary_path = None
self._flow_scores = []
self._results_delimiter = '\t'
# get app configuration.
self._spot_conf = Util.get_spot_conf()
# # get scores fields conf
conf_file = "{0}/flow_conf.json".format(self._scrtip_path)
self._conf = json.loads(open (conf_file).read(),object_pairs_hook=OrderedDict)
# initialize data engine
self._db = self._spot_conf.get('conf', 'DBNAME').replace("'", "").replace('"', '')
def start(self):
####################
start = time.time()
####################
self._create_folder_structure()
self._clear_previous_executions()
self._add_ipynb()
self._get_flow_results()
self._add_network_context()
self._add_geo_localization()
self._add_reputation()
self._create_flow_scores()
self._get_oa_details()
self._ingest_summary()
##################
end = time.time()
print(end - start)
##################
def _clear_previous_executions(self):
self._logger.info("Cleaning data from previous executions for the day")
yr = self._date[:4]
mn = self._date[4:6]
dy = self._date[6:]
table_schema = []
HUSER = self._spot_conf.get('conf', 'HUSER').replace("'", "").replace('"', '')
table_schema=['suspicious', 'edge','chords','threat_investigation', 'timeline', 'storyboard', 'summary' ]
for path in table_schema:
HDFSClient.delete_folder("{0}/{1}/hive/oa/{2}/y={3}/m={4}/d={5}".format(HUSER,self._table_name,path,yr,int(mn),int(dy)),user="impala")
impala.execute_query("invalidate metadata")
#removes Feedback file
HDFSClient.delete_folder("{0}/{1}/scored_results/{2}{3}{4}/feedback/ml_feedback.csv".format(HUSER,self._table_name,yr,mn,dy))
#removes json files from the storyboard
HDFSClient.delete_folder("{0}/{1}/oa/{2}/{3}/{4}/{5}".format(HUSER,self._table_name,"storyboard",yr,mn,dy))
def _create_folder_structure(self):
self._logger.info("Creating folder structure for OA (data and ipynb)")
self._data_path,self._ingest_summary_path,self._ipynb_path = Util.create_oa_folders("flow",self._date)
def _add_ipynb(self):
if os.path.isdir(self._ipynb_path):
self._logger.info("Adding the advanced mode IPython Notebook")
shutil.copy("{0}/ipynb_templates/Advanced_Mode_master.ipynb".format(self._scrtip_path),"{0}/Advanced_Mode.ipynb".format(self._ipynb_path))
self._logger.info("Adding threat investigation IPython Notebook")
shutil.copy("{0}/ipynb_templates/Threat_Investigation_master.ipynb".format(self._scrtip_path),"{0}/Threat_Investigation.ipynb".format(self._ipynb_path))
else:
self._logger.error("There was a problem adding the IPython Notebooks, please check the directory exists.")
def _get_flow_results(self):
self._logger.info("Getting {0} Machine Learning Results from HDFS".format(self._date))
flow_results = "{0}/flow_results.csv".format(self._data_path)
# get hdfs path from conf file
HUSER = self._spot_conf.get('conf', 'HUSER').replace("'", "").replace('"', '')
hdfs_path = "{0}/flow/scored_results/{1}/scores/flow_results.csv".format(HUSER,self._date)
# get results file from hdfs
get_command = Util.get_ml_results_form_hdfs(hdfs_path,self._data_path)
self._logger.info("{0}".format(get_command))
# valdiate files exists
if os.path.isfile(flow_results):
# read number of results based in the limit specified.
self._logger.info("Reading {0} flow results file: {1}".format(self._date,flow_results))
self._flow_results = Util.read_results(flow_results,self._limit,self._results_delimiter)
if len(self._flow_results) == 0: self._logger.error("There are not flow results.");sys.exit(1)
else:
self._logger.error("There was an error getting ML results from HDFS")
sys.exit(1)
# filter results add rank.
self._logger.info("Filtering required columns based on configuration")
self._flow_scores.extend([ [ conn[i] for i in self._conf['column_indexes_filter'] ] + [n] for n, conn in enumerate(self._flow_results) ])
def _create_flow_scores(self):
# get date parameters.
yr = self._date[:4]
mn = self._date[4:6]
dy = self._date[6:]
value_string = ""
for row in self._flow_scores:
value_string += str(tuple(Util.cast_val(item) for item in row)) + ","
load_into_impala = ("""
INSERT INTO {0}.flow_scores partition(y={2}, m={3}, d={4}) VALUES {1}
""").format(self._db, value_string[:-1], yr, mn, dy)
impala.execute_query(load_into_impala)
def _add_network_context(self):
# use ipranges to see if the IPs are internals.
ip_ranges_file = "{0}/context/ipranges.csv".format(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
# add values to srcIpInternal and destIpInternal.
flow_scores = iter(self._flow_scores)
if os.path.isfile(ip_ranges_file):
self._logger.info("Start adding network context...")
# get ranges from configuration file.
self._logger.info("Reading network context file: {0}".format(ip_ranges_file))
with open(ip_ranges_file, 'rb') as f:
nc_ranges = [ map(Util.ip_to_int,line.strip('\n').split(',')) for line in f ]
# get src and dst IPs
src_ip_index = self._conf["flow_score_fields"]["srcIP"]
dst_ip_index = self._conf["flow_score_fields"]["dstIP"]
# add networkcontext per connection.
ip_internal_ranges = filter(None,nc_ranges)
self._logger.info("Adding networkcontext to suspicious connections.")
self._flow_scores = [ conn + [ self._is_ip_internal(conn[src_ip_index],ip_internal_ranges)]+[ self._is_ip_internal(conn[dst_ip_index],ip_internal_ranges)] for conn in flow_scores]
else:
self._flow_scores = [ conn + [0,0] for conn in flow_scores ]
self._logger.info("WARNING: Network context was not added because the file ipranges.csv does not exist.")
def _is_ip_internal(self,ip, ranges):
result = 0
for row in ranges:
if Util.ip_to_int(ip) >= row[0] and Util.ip_to_int(ip) <= row[1]:
result = 1
break
return result
def _add_geo_localization(self):
# use ipranges to see if the IPs are internals.
iploc_file = "{0}/context/iploc.csv".format(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
self._logger.info("Adding geo localization headers")
# add values to srcIpInternal and destIpInternal.
flow_scores = iter(self._flow_scores)
if os.path.isfile(iploc_file):
self._logger.info("Initializing geo localization component")
geo = GeoLocalization(iploc_file,self._logger)
src_ip_index = self._conf["flow_score_fields"]["srcIP"]
dst_ip_index = self._conf["flow_score_fields"]["dstIP"]
self._logger.info("Adding geo localization...")
self._flow_scores = []
for conn in flow_scores:
# get geo localizastin for src ip
self._logger.debug("Searching geo for src ip {0}".format(conn[src_ip_index]))
src_geo_dict = geo.get_ip_geo_localization(conn[src_ip_index])
# get goe localization for dst ip.
self._logger.debug("Searching geo for dst ip {0}".format(conn[dst_ip_index]))
dst_geo_dict = geo.get_ip_geo_localization(conn[dst_ip_index])
# adding columns to the current connection list.
conn.extend([src_geo_dict["geo_loc"],dst_geo_dict["geo_loc"],src_geo_dict["domain"],dst_geo_dict["domain"]])
self._flow_scores.extend([conn])
else:
self._flow_scores = [ conn + ["","","",""] for conn in flow_scores ]
self._logger.info("WARNING: IP location was not added because the file {0} does not exist.".format(iploc_file))
def _add_reputation(self):
reputation_conf_file = "{0}/components/reputation/reputation_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# read configuration.
self._logger.info("Reading reputation configuration file: {0}".format(reputation_conf_file))
rep_conf = json.loads(open(reputation_conf_file).read())
# initialize reputation services.
self._rep_services = []
self._logger.info("Initializing reputation services.")
for service in rep_conf:
config = rep_conf[service]
module = __import__("components.reputation.{0}.{0}".format(service), fromlist=['Reputation'])
self._rep_services.append(module.Reputation(config,self._logger))
if self._rep_services :
# get all src ips.
src_ip_index = self._conf["flow_score_fields"]["srcIP"]
dst_ip_index = self._conf["flow_score_fields"]["dstIP"]
flow_scores_src = iter(self._flow_scores)
# getting reputation for src IPs
src_ips = [ conn[src_ip_index] for conn in flow_scores_src ]
self._logger.info("Getting reputation for each service in config")
src_rep_results = {}
for rep_service in self._rep_services:
# if more than one reputation service is defined, the last ip match remains after merge
# Example fb: returns an entry for every ip, including unknown ones
# which overwrites other services that have previously returned a match. Same for dstip
# In future should consider a weigted merge, or UX should support multiple reps per IP
src_rep_results = dict(rep_service.check(src_ips).items() + src_rep_results.items())
flow_scores_dst = iter(self._flow_scores)
# getting reputation for dst IPs
dst_ips = [ conn[dst_ip_index] for conn in flow_scores_dst ]
dst_rep_results = {}
for rep_service in self._rep_services:
dst_rep_results = dict(rep_service.check(dst_ips).items() + dst_rep_results.items())
flow_scores_final = iter(self._flow_scores)
self._flow_scores = []
flow_scores = [conn + [src_rep_results[conn[src_ip_index]]] + [dst_rep_results[conn[dst_ip_index]]] for conn in flow_scores_final ]
self._flow_scores = flow_scores
else:
# add values to gtiSrcRep and gtiDstRep.
flow_scores = iter(self._flow_scores)
self._flow_scores = [ conn + ["",""] for conn in flow_scores ]
self._logger.info("WARNING: IP reputation was not added. No refclient configured")
def _get_oa_details(self):
self._logger.info("Getting OA Flow suspicious details/chord diagram")
# start suspicious connects details process.
p_sp = Process(target=self._get_suspicious_details)
p_sp.start()
# start chord diagram process.
p_ch = Process(target=self._get_chord_details)
p_ch.start()
p_sp.join()
p_ch.join()
def _get_suspicious_details(self,bar=None):
# skip header
sp_connections = iter(self._flow_scores)
# loop connections.
connections_added = []
for conn in sp_connections:
# validate if the connection's details are not already extracted.
if conn in connections_added:
continue
else:
connections_added.append(conn)
src_ip_index = self._conf["flow_score_fields"]["srcIP"]
dst_ip_index = self._conf["flow_score_fields"]["dstIP"]
# get src ip
sip = conn[src_ip_index]
# get dst ip
dip = conn[dst_ip_index]
# get hour and date (i.e. 2014-07-08 10:10:40)
date_array = conn[0].split(' ')
date_array_1 = date_array[0].split('-')
date_array_2 = date_array[1].split(':')
yr = date_array_1[0]
dy = date_array_1[2]
mh = date_array_1[1]
hr = date_array_2[0]
mm = date_array_2[1]
query_to_load = ("""
INSERT INTO TABLE {0}.flow_edge PARTITION (y={2}, m={3}, d={4})
SELECT treceived as tstart,sip as srcip,dip as dstip,sport as sport,dport as dport,proto as proto,flag as flags,
stos as tos,ibyt as ibyt,ipkt as ipkt, input as input, output as output,rip as rip, obyt as obyt,
opkt as opkt, h as hh, trminute as mn from {0}.{1} where ((sip='{7}' AND dip='{8}') or (sip='{8}' AND dip='{7}'))
AND y={2} AND m={3} AND d={4} AND h={5} AND trminute={6};
""").format(self._db,self._table_name,yr, mh, dy, hr, mm, sip,dip)
impala.execute_query(query_to_load)
def _get_chord_details(self,bar=None):
# skip header
sp_connections = iter(self._flow_scores)
src_ip_index = self._conf["flow_score_fields"]["srcIP"]
dst_ip_index = self._conf["flow_score_fields"]["dstIP"]
# get date parameters.
yr = self._date[:4]
mn = self._date[4:6]
dy = self._date[6:]
# get number of times each IP appears.
srcdict = {}
for conn in sp_connections:
if conn[src_ip_index] in srcdict:srcdict[conn[src_ip_index]] += 1
else:srcdict[conn[src_ip_index]] = 1
if conn[dst_ip_index] in srcdict:srcdict[conn[dst_ip_index]] += 1
else:srcdict[conn[dst_ip_index]] = 1
for (ip,n) in srcdict.items():
if n > 1:
ip_list = []
sp_connections = iter(self._flow_scores)
for row in sp_connections:
if ip == row[1] : ip_list.append(row[2])
if ip == row[2] :ip_list.append(row[1])
ips = list(set(ip_list))
if len(ips) > 1:
ips_filter = (",".join(str("'{0}'".format(ip)) for ip in ips))
query_to_load = ("""
INSERT INTO TABLE {0}.flow_chords PARTITION (y={2}, m={3}, d={4})
SELECT '{5}' as ip_threat, sip as srcip, dip as dstip, SUM(ibyt) as ibyt, SUM(ipkt) as ipkt from {0}.{1} where y={2} and m={3}
and d={4} and ((sip='{5}' and dip IN({6})) or (sip IN({6}) and dip='{5}')) group by sip,dip,m,d;
""").format(self._db,self._table_name,yr,mn,dy,ip,ips_filter)
impala.execute_query(query_to_load)
def _ingest_summary(self):
# get date parameters.
yr = self._date[:4]
mn = self._date[4:6]
dy = self._date[6:]
self._logger.info("Getting ingest summary data for the day")
ingest_summary_cols = ["date","total"]
result_rows = []
df_filtered = pd.DataFrame()
# get ingest summary.
query_to_load=("""
SELECT tryear, trmonth, trday, trhour, trminute, COUNT(*) as total
FROM {0}.{1} WHERE y={2} AND m={3} AND d={4}
AND unix_tstamp IS NOT NULL
AND sip IS NOT NULL
AND sport IS NOT NULL
AND dip IS NOT NULL
AND dport IS NOT NULL
AND ibyt IS NOT NULL
AND ipkt IS NOT NULL
AND tryear={2}
AND cast(treceived as timestamp) IS NOT NULL
GROUP BY tryear, trmonth, trday, trhour, trminute;
""").format(self._db,self._table_name, yr, mn, dy)
results = impala.execute_query(query_to_load)
if results:
df_results = as_pandas(results)
#Forms a new dataframe splitting the minutes from the time column
df_new = pd.DataFrame([["{0}-{1}-{2} {3}:{4}".format(val['tryear'],val['trmonth'],val['trday'], val['trhour'], val['trminute']), int(val['total']) if not math.isnan(val['total']) else 0 ] for key,val in df_results.iterrows()],columns = ingest_summary_cols)
value_string = ''
#Groups the data by minute
sf = df_new.groupby(by=['date'])['total'].sum()
df_per_min = pd.DataFrame({'date':sf.index, 'total':sf.values})
df_final = df_filtered.append(df_per_min, ignore_index=True).to_records(False,False)
if len(df_final) > 0:
query_to_insert=("""
INSERT INTO {0}.flow_ingest_summary PARTITION (y={1}, m={2}, d={3}) VALUES {4};
""").format(self._db, yr, mn, dy, tuple(df_final))
impala.execute_query(query_to_insert)
else:
self._logger.info("No data found for the ingest summary")