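// Envelope pipeline configuration for Apache Spot netflow ingest: flow records
// are read from Kafka in 5-second micro-batches, mapped onto the spot.event
// schema, and appended to Hive.
// Launch sketch (assumed; the Envelope jar path and config file name below are
// placeholders):
//   spark-submit envelope-<version>.jar spot_flow_ingest.conf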
application {
name = "Spot flow ingest"
batch.milliseconds = 5000
executors = 1
executor.cores = 1
executor.memory = 1g
spark.conf.hive.exec.dynamic.partition = true
spark.conf.hive.exec.dynamic.partition.mode = nonstrict
}
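// Two steps: flow_received parses the raw Kafka records; flow_process maps
// them onto the spot.event schema and writes them to Hive.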
steps {
flow_received {
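// Consume raw flow lines from the spot_flow Kafka topic as strings, split each
// comma-delimited line into the fields below, and keep the original line in
// the raw_value field (append.raw.*) alongside the parsed columns.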
input {
type = kafka
brokers = "kafka-broker.yourdomain.com:9092"
topic = spot_flow
encoding = string
translator {
type = delimited
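// Field names follow the nfdump/nfcapd CSV column abbreviations (see the SANS
// mapping referenced below), e.g. tr = flow start time, sa/da = source and
// destination address, sp/dp = source and destination port, pr = protocol.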
field.names = [tr,try,trm,trd,tr_h,tr_m,tr_s,td,sa,da,sp,dp,pr,flg,fwd,
stos,ipkt,ibyt,opkt,obyt,in,out,sas,das,dtos,dir,ra]
field.types = [string,string,string,string,string,string,string,string,string,
string,string,string,string,string,string,string,string,string,
string,string,string,string,string,string,string,string,string]
delimiter = ","
append.raw.enabled = true
append.raw.value.field.name = raw_value
}
}
}
// mapping from https://www.sans.org/reading-room/whitepapers/incident/netflow-collection-analysis-nfcapd-python-splunk-35747 (page 21)
flow_process {
dependencies = [flow_received]
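// SQL deriver: rename the nfdump fields to the spot.event column names, derive
// event_time from the flow start timestamp, and add the p_dt partition value
// plus static vendor/device-type columns.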
deriver {
type = sql
query.literal = """
SELECT
'flow' as type,
raw_value as raw,
unix_timestamp(tr, "yyyy-MM-dd HH:mm:ss") as event_time,
td as duration,
sa as src_ip4_str,
da as dst_ip4_str,
sp as src_port,
dp as dst_port,
pr as n_proto,
flg as net_flags,
fwd as code,
stos as service,
ipkt as flow_in_packets,
ibyt as in_bytes,
opkt as flow_out_packets,
obyt as out_bytes,
`in` as flow_input,   -- backticks: `in` would otherwise clash with the SQL keyword IN
`out` as flow_output,
sas as src_asn,
das as dst_asn,
dtos as xref,
dir as net_direction,
ra as dvc_host,
'SOME_VENDOR' as p_dvc_vendor,
'SOME_DEVICE_TYPE' as p_dvc_type,
FROM_UNIXTIME(unix_timestamp(tr, "yyyy-MM-dd HH:mm:ss"), "yyyyMMdd") as p_dt
FROM flow_received"""
}
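// Append planner: every derived record is planned as a straight insert into
// the output (no updates or deletes).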
planner {
type = append
}
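// Hive output: append into spot.event; align.columns asks Envelope to align
// the derived columns with the table's column order, and the dynamic-partition
// settings in the application block allow writing into the p_dt partitions.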
output {
type = hive
table = "spot.event"
align.columns = true
}
}
}