blob: 834e36a5cebaa5431b1abd87f447896df6f36883 [file] [log] [blame]
# Application-wide settings for the "Spot DNS ingest" pipeline.
application {
# Display name for the application.
name = Spot DNS ingest
# Presumably the streaming micro-batch interval, in milliseconds (5 s) — confirm
# against the runner's documentation.
batch.milliseconds = 5000
# Spark executor resources: executor count, cores, and memory.
executors = 1
executor.cores = 1
executor.memory = 1g
# Hive settings passed through under spark.conf (presumably forwarded to the
# Spark/Hive configuration with the "spark.conf." prefix stripped — verify).
# Non-strict dynamic partitioning is presumably needed because the output's
# partition values are computed per row rather than given statically.
# NOTE(review): HOCON treats these dotted keys as nested paths. The second line
# turns `...dynamic.partition` into an object (holding `mode`), and a later
# object overrides an earlier non-object value, so
# `hive.exec.dynamic.partition = true` is silently dropped and only
# `hive.exec.dynamic.partition.mode = nonstrict` survives. Quoting the keys
# (e.g. spark.conf."hive.exec.dynamic.partition") avoids the clash only if the
# consumer strips the quotes from the resulting path keys — confirm first.
spark.conf.hive.exec.dynamic.partition = true
spark.conf.hive.exec.dynamic.partition.mode = nonstrict
}
# Pipeline steps: dns_received reads delimited DNS records from Kafka, and
# dns_formatted reshapes them into the event table's columns and appends them
# to Hive.
steps {
  # Source step: consume comma-delimited DNS records from the spot_dns topic.
  dns_received {
    input {
      type = kafka
      # Placeholder broker address; set per deployment.
      brokers = "kafka-broker.yourdomain.com:9092"
      topic = spot_dns
      encoding = string
      # Split each message on "," into eleven columns, all typed as string.
      # NOTE(review): the textual timestamp appears to arrive split across
      # frame_day and frame_time (i.e. the upstream timestamp itself seems to
      # contain a comma that the delimiter consumes) — confirm against the
      # producer.
      translator {
        type = delimited
        delimiter = ","
        field.names = [frame_day,frame_time,unix_timestamp,frame_len,ip_src,ip_dst,dns_qry_name,
          dns_qry_type,dns_qry_class,dns_qry_rcode,dns_a]
        field.types = [string,string,string,string,string,string,string,string,string,string,string]
        # Keep the whole original message in a raw_value column; dns_formatted
        # exposes it as `raw`.
        append.raw.enabled = true
        append.raw.value.field.name = raw_value
      }
    }
  }
  # Derived step: map received records onto the event table's columns.
  dns_formatted {
    dependencies = [dns_received]
    deriver {
      type = sql
      # The timestamp text is parsed exactly once, in the inner query, and p_dt
      # is formatted from that same event_time. This way the event time and the
      # date value cannot drift apart if the pattern is edited.
      # concat() inserts no separator: this relies on frame_time carrying its
      # own leading space — TODO confirm against real input.
      # The pattern has no zone field, so Spark interprets the text in the
      # session time zone. The `unix_timestamp` input column is not used here.
      # 'SOME_VENDOR' / 'SOME_DEVICE_TYPE' look like placeholders to be set per
      # deployment.
      query.literal = """
        SELECT
          'dns' AS type,
          raw_value AS raw,
          event_time,
          ip_src AS src_ip4_str,
          ip_dst AS dst_ip4_str,
          dns_qry_class AS dns_class,
          frame_len AS dns_len,
          dns_qry_name AS dns_query,
          dns_qry_rcode AS dns_response_code,
          dns_a AS dns_answers,
          dns_qry_type AS dns_type,
          'SOME_VENDOR' AS p_dvc_vendor,
          'SOME_DEVICE_TYPE' AS p_dvc_type,
          FROM_UNIXTIME(event_time, "yyyyMMdd") AS p_dt
        FROM (
          SELECT *,
            unix_timestamp(concat(frame_day, frame_time), "MMM dd yyyy HH:mm:ss") AS event_time
          FROM dns_received
        ) parsed"""
    }
    # Append planner: derived rows are presumably only inserted, never updated.
    planner {
      type = append
    }
    # Write to the Hive table spot.event. align.columns presumably matches the
    # derived columns to the table's columns by name — confirm in the runner's
    # output documentation.
    output {
      type = hive
      table = "spot.event"
      align.columns = true
    }
  }
}