# blob: ae168b8913bbbc9f3b338b59b380b54115aae6e9 [file] [log] [blame]
# Application-level settings for the Spot proxy ingest pipeline: name, batch
# interval, executor sizing, and Spark/Hive configuration overrides.
application {
name = Spot proxy ingest
# Batch interval in milliseconds (per the key name).
batch.milliseconds = 5000
executors = 1
executor.cores = 1
executor.memory = 1g
# NOTE(review): under HOCON path semantics the next two keys collide. The first
# assigns a boolean at the path ...dynamic.partition; the second assigns
# ...dynamic.partition.mode, which makes ...dynamic.partition an object and,
# per HOCON's duplicate-key rule (later non-object/object mismatch wins),
# replaces the earlier boolean. Only the ".mode" setting is expected to survive
# parsing. Confirm how the consumer reads spark.conf.* keys before relying on
# the first line or changing the order of these two lines.
spark.conf.hive.exec.dynamic.partition = true
spark.conf.hive.exec.dynamic.partition.mode = nonstrict
}
steps {
# Step: read raw proxy log lines from Kafka and split each line into named
# string fields.
proxy_received {
  input {
    type = kafka
    # Placeholder broker address; replace with the real Kafka broker(s).
    brokers = "kafka-broker.yourdomain.com:9092"
    topic = spot_proxy
    encoding = string
    translator {
      type = delimited
      # 26 fields, in the order they appear in each log line.
      field.names = [date,time,time_taken,c_ip,sc_status,s_action,sc_bytes,cs_bytes,
                     cs_method,cs_uri_scheme,cs_host,cs_uri_path,cs_uri_query,cs_username,
                     s_hierarchy,s_supplier_name,rs_Content_Type,cs_User_Agent,
                     sc_filter_result,sc_filter_category,x_virus_id,s_ip,s_sitename,
                     x_virus_details,x_icap_error_code,x_icap_error_details]
      # One type per entry in field.names (26 entries, grouped row-for-row with
      # the names above). The list previously carried a surplus 27th "string"
      # with no matching field name.
      field.types = [string,string,string,string,string,string,string,string,
                     string,string,string,string,string,string,
                     string,string,string,string,
                     string,string,string,string,string,
                     string,string,string]
      # Regex delimiter: matches a single space only when the remainder of the
      # line contains an even number of double quotes, i.e. a space that is not
      # inside a double-quoted value.
      delimiter = " (?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"
      delimiter-regex = true
      # Keep the untranslated message alongside the parsed fields; the
      # proxy_process query selects it as raw_value.
      append.raw.enabled = true
      append.raw.value.field.name = raw_value
    }
  }
}
# Step: reshape the parsed proxy records from proxy_received into the column
# layout written to the spot.event Hive table, then append them to that table.
proxy_process {
  dependencies = [proxy_received]
  deriver {
    type = sql
    # Projection notes:
    #  - event_time is the epoch-seconds value of the log's date + time fields.
    #  - p_dt is the same timestamp formatted as yyyyMMdd.
    #  - p_dvc_vendor / p_dvc_type are constant placeholder literals.
    #    NOTE(review): the p_* columns look like partition columns of the
    #    target table; confirm against the spot.event DDL and replace the
    #    placeholders with real values.
    #  - Every projection uses an explicit "as" alias; x_icap_error_details
    #    previously relied on an implicit alias, which reads like a missing comma.
    query.literal = """
      SELECT
        'proxy' as type,
        raw_value as raw,
        unix_timestamp(concat(date, ' ', time), "yyyy-MM-dd HH:mm:ss") as event_time,
        time_taken as duration,
        c_ip as src_ip4_str,
        sc_status as prx_code,
        s_action as prx_action,
        sc_bytes as in_bytes,
        cs_bytes as out_bytes,
        cs_method as prx_method,
        cs_uri_scheme as prx_type,
        cs_host as src_host,
        cs_uri_path as cs_uri_path,
        cs_uri_query as prx_query,
        cs_username as user_name,
        s_hierarchy as org,
        s_supplier_name as name,
        rs_Content_Type as category,
        cs_User_Agent as prx_browser,
        sc_filter_result as prx_filter_result,
        sc_filter_category as prx_category,
        x_virus_id as vuln_id,
        s_ip as dst_ip4_str,
        s_sitename as src_domain,
        x_virus_details as vuln_status,
        x_icap_error_code as code,
        x_icap_error_details as vuln_severity,
        'SOME_VENDOR' as p_dvc_vendor,
        'SOME_DEVICE_TYPE' as p_dvc_type,
        from_unixtime(unix_timestamp(concat(date, ' ', time),
          "yyyy-MM-dd HH:mm:ss"), "yyyyMMdd") as p_dt
      FROM proxy_received"""
  }
  planner {
    # Rows are only appended; no updates or deletes are planned.
    type = append
  }
  output {
    type = hive
    table = "spot.event"
    # NOTE(review): presumably maps the derived columns onto the table's column
    # layout by name; confirm against the output's documentation.
    align.columns = true
  }
}
}