Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-spot into pr/122
diff --git a/spot-oa/oa/flow/flow_oa.py b/spot-oa/oa/flow/flow_oa.py
index 53cec6b..000d9d0 100644
--- a/spot-oa/oa/flow/flow_oa.py
+++ b/spot-oa/oa/flow/flow_oa.py
@@ -34,7 +34,6 @@
from utils import Util, ProgressBar
from components.data.data import Data
from components.geoloc.geoloc import GeoLocalization
-from components.reputation.gti import gti
from impala.util import as_pandas
import time
@@ -267,37 +266,49 @@
# read configuration.
self._logger.info("Reading reputation configuration file: {0}".format(reputation_conf_file))
rep_conf = json.loads(open(reputation_conf_file).read())
-
- if "gti" in rep_conf and os.path.isfile(rep_conf['gti']['refclient']):
- rep_conf = rep_conf['gti']
- # initialize gti module.
- self._logger.info("Initializing GTI component")
- flow_gti = gti.Reputation(rep_conf,self._logger)
- # get all src ips.
+ # initialize reputation services.
+ self._rep_services = []
+ self._logger.info("Initializing reputation services.")
+ for service in rep_conf:
+ config = rep_conf[service]
+ module = __import__("components.reputation.{0}.{0}".format(service), fromlist=['Reputation'])
+ self._rep_services.append(module.Reputation(config,self._logger))
+
+ if self._rep_services :
+
+ # get all src ips.
src_ip_index = self._conf["flow_score_fields"]["srcIP"]
dst_ip_index = self._conf["flow_score_fields"]["dstIP"]
- self._logger.info("Getting GTI reputation for src IPs")
flow_scores_src = iter(self._flow_scores)
# getting reputation for src IPs
src_ips = [ conn[src_ip_index] for conn in flow_scores_src ]
- src_rep_results = flow_gti.check(src_ips)
+ self._logger.info("Getting reputation for each service in config")
+ src_rep_results = {}
+ for rep_service in self._rep_services:
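+ # if more than one reputation service is defined, the FIRST service to return an entry for an ip wins: accumulated results come last in the merge, so they take precedence
+ # Services are visited in rep_conf key order, which is arbitrary for a plain dict. Example fb: returns an entry for every ip, including unknown ones,
+ # so whenever it is visited first it masks matches that later services would return. Same for dstip
+ # In future should consider a weighted merge, or UX should support multiple reps per IP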
+ src_rep_results = dict(rep_service.check(src_ips).items() + src_rep_results.items())
- self._logger.info("Getting GTI reputation for dst IPs")
flow_scores_dst = iter(self._flow_scores)
# getting reputation for dst IPs
dst_ips = [ conn[dst_ip_index] for conn in flow_scores_dst ]
- dst_rep_results = flow_gti.check(dst_ips)
+ dst_rep_results = {}
+ for rep_service in self._rep_services:
+ dst_rep_results = dict(rep_service.check(dst_ips).items() + dst_rep_results.items())
+
flow_scores_final = iter(self._flow_scores)
self._flow_scores = []
flow_scores = [conn + [src_rep_results[conn[src_ip_index]]] + [dst_rep_results[conn[dst_ip_index]]] for conn in flow_scores_final ]
self._flow_scores = flow_scores
-
+
else:
# add values to gtiSrcRep and gtiDstRep.
flow_scores = iter(self._flow_scores)
@@ -460,9 +471,3 @@
else:
self._logger.info("No data found for the ingest summary")
-
-
-
-
-
-