application {
    name = Spot flow ingest
    batch.milliseconds = 5000
    executors = 1
    executor.cores = 1
    executor.memory = 1g
    spark.conf.hive.exec.dynamic.partition = true
    spark.conf.hive.exec.dynamic.partition.mode = nonstrict
}
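
// The two spark.conf.* entries above are passed through by Envelope to the
// Spark configuration so that the Hive output below can write into the
// date-partitioned spot.event table. A hedged sketch of scaling this ingest
// up; the figures are illustrative, not tuned recommendations:
//
//   executors = 4
//   executor.cores = 2
//   executor.memory = 4g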

steps {
    flow_received {
        input {
            type = kafka
            brokers = "kafka-broker.yourdomain.com:9092"
            topic = spot_flow
            encoding = string
            translator {
                type = delimited
                field.names = [tr,try,trm,trd,tr_h,tr_m,tr_s,td,sa,da,sp,dp,pr,flg,fwd,
                               stos,ipkt,ibyt,opkt,obyt,in,out,sas,das,dtos,dir,ra]
                field.types = [string,string,string,string,string,string,string,string,string,
                               string,string,string,string,string,string,string,string,string,
                               string,string,string,string,string,string,string,string,string]
                delimiter = ","
                append.raw.enabled = true
                append.raw.value.field.name = raw_value
            }
        }
    }
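
    // Illustrative only: a hypothetical nfdump-style CSV record that the
    // translator above would split into the 27 fields named in field.names
    // (every value here is made up):
    //
    //   2018-01-01 10:00:00,2018,01,01,10,00,00,0.342,10.0.0.1,10.0.0.2,52000,443,TCP,.A..S.,0,0,5,420,4,380,1,2,0,0,0,0,10.0.0.254
    //
    // With append.raw.enabled = true, the unparsed line also travels through
    // the pipeline in the raw_value field, which the SQL step below selects
    // as "raw".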

    // Field mapping based on https://www.sans.org/reading-room/whitepapers/incident/netflow-collection-analysis-nfcapd-python-splunk-35747 (page 21)
    flow_process {
        dependencies = [flow_received]
        deriver {
            type = sql
            query.literal = """
                SELECT
                    'flow' as type,
                    raw_value as raw,
                    unix_timestamp(tr, "yyyy-MM-dd HH:mm:ss") as event_time,
                    td as duration,
                    sa as src_ip4_str,
                    da as dst_ip4_str,
                    sp as src_port,
                    dp as dst_port,
                    pr as n_proto,
                    flg as net_flags,
                    fwd as code,
                    stos as service,
                    ipkt as flow_in_packets,
                    ibyt as in_bytes,
                    opkt as flow_out_packets,
                    obyt as out_bytes,
                    `in` as flow_input,
                    `out` as flow_output,
                    sas as src_asn,
                    das as dst_asn,
                    dtos as xref,
                    dir as net_direction,
                    ra as dvc_host,
                    'SOME_VENDOR' as p_dvc_vendor,
                    'SOME_DEVICE_TYPE' as p_dvc_type,
                    from_unixtime(unix_timestamp(tr, "yyyy-MM-dd HH:mm:ss"), "yyyyMMdd") as p_dt
                FROM flow_received"""
        }
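
        // All translated fields arrive as strings, so the SELECT above hands
        // string-typed columns to Hive. If spot.event declares numeric
        // columns, a hedged variant could cast explicitly in the same query,
        // for example (illustrative lines only):
        //
        //   CAST(td AS DOUBLE) as duration,
        //   CAST(ipkt AS BIGINT) as flow_in_packets,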
        planner {
            type = append
        }
        output {
            type = hive
            table = "spot.event"
            align.columns = true
        }
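
        // The append planner plus the hive output inserts each micro-batch
        // into spot.event. align.columns = true has Envelope align the
        // query's columns with the order of the table's schema, and the p_dt
        // column drives the dynamic partitioning enabled in the application
        // block.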
    }
}
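
// To run the pipeline, the usual Envelope pattern is to submit the Envelope
// jar with this configuration file as its single argument (the jar and file
// names below are placeholders):
//
//   spark2-submit envelope-*.jar spot_flow_ingest.conf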