| # Application-level settings for the job named "Spot DNS ingest". |
| application { |
| name = Spot DNS ingest |
| # 5000 ms; presumably the streaming micro-batch interval -- confirm against the consumer's docs. |
| batch.milliseconds = 5000 |
| # Resource request: one executor with one core and 1g of memory. |
| executors = 1 |
| executor.cores = 1 |
| executor.memory = 1g |
| # Keys under spark.conf.* are presumably forwarded to the Spark configuration with the prefix removed. |
| # NOTE(review): if this file is parsed as HOCON, the two paths below conflict: the first makes |
| # "...dynamic.partition" a scalar, the second turns the same path into an object holding "mode". |
| # Under HOCON merge rules the later object replaces the earlier scalar, so |
| # hive.exec.dynamic.partition = true may be dropped silently. Verify which settings actually reach Spark. |
| spark.conf.hive.exec.dynamic.partition = true |
| spark.conf.hive.exec.dynamic.partition.mode = nonstrict |
| } |
| |
| # Pipeline steps: dns_received reads delimited DNS records from Kafka; dns_formatted reshapes them |
| # with SQL and appends the result to the Hive table spot.event. |
| steps { |
| # Step 1: read string-encoded messages from the Kafka topic spot_dns. |
| dns_received { |
| input { |
| type = kafka |
| # NOTE(review): the broker address looks like a placeholder -- replace with the real broker list. |
| brokers = "kafka-broker.yourdomain.com:9092" |
| topic = spot_dns |
| encoding = string |
| # Split each message on "," into the 11 named fields below; every field is typed as string. |
| translator { |
| type = delimited |
| delimiter = "," |
| # NOTE(review): the unix_timestamp field is declared here but never selected by the dns_formatted |
| # query, which only calls the SQL function of the same name. |
| field.names = [frame_day,frame_time,unix_timestamp,frame_len,ip_src,ip_dst,dns_qry_name, |
| dns_qry_type,dns_qry_class,dns_qry_rcode,dns_a] |
| field.types = [string,string,string,string,string,string,string,string,string,string,string] |
| # Also expose the unparsed message as the field raw_value (selected as "raw" in dns_formatted). |
| append.raw.enabled = true |
| append.raw.value.field.name = raw_value |
| } |
| } |
| } |
| |
| # Step 2: rename and derive columns from dns_received, then append them to spot.event. |
| dns_formatted { |
| dependencies = [dns_received] |
| # The SQL query aliases the input fields to output column names, adds the constant columns |
| # type, p_dvc_vendor and p_dvc_type, and derives event_time (epoch seconds) and p_dt (yyyyMMdd) |
| # by parsing concat(frame_day, frame_time) with the pattern "MMM dd yyyy HH:mm:ss". |
| # NOTE(review): frame_day and frame_time are concatenated with no separator although the pattern |
| # contains spaces; presumably the fields carry their own whitespace -- confirm against sample data. |
| # NOTE(review): 'SOME_VENDOR' and 'SOME_DEVICE_TYPE' look like placeholders to be replaced per deployment. |
| deriver { |
| type = sql |
| query.literal = """ |
| SELECT |
| 'dns' as type, |
| raw_value as raw, |
| unix_timestamp(concat(frame_day, frame_time), "MMM dd yyyy HH:mm:ss") as event_time, |
| ip_src as src_ip4_str, |
| ip_dst as dst_ip4_str, |
| dns_qry_class as dns_class, |
| frame_len as dns_len, |
| dns_qry_name as dns_query, |
| dns_qry_rcode as dns_response_code, |
| dns_a as dns_answers, |
| dns_qry_type as dns_type, |
| 'SOME_VENDOR' as p_dvc_vendor, |
| 'SOME_DEVICE_TYPE' as p_dvc_type, |
| FROM_UNIXTIME(unix_timestamp(concat(frame_day, frame_time), |
| "MMM dd yyyy HH:mm:ss"), "yyyyMMdd") as p_dt |
| FROM dns_received""" |
| } |
| # Rows are appended (planner type "append"). |
| planner { |
| type = append |
| } |
| # Write to the Hive table spot.event; align.columns presumably matches derived columns to the |
| # table's columns -- confirm against the Hive output documentation. |
| output { |
| type = hive |
| table = "spot.event" |
| align.columns = true |
| } |
| } |
| } |