| application { |
| # Application-level settings for this pipeline. |
| name = Spot proxy ingest |
| # Micro-batch interval, expressed in milliseconds (5000 ms = 5 s). |
| batch.milliseconds = 5000 |
| # Executor sizing requested for the job: one executor with one core and 1g of memory. |
| executors = 1 |
| executor.cores = 1 |
| executor.memory = 1g |
| # Hive dynamic-partitioning settings supplied under the spark.conf prefix. |
| # NOTE(review): in HOCON the next two paths collide. The first assigns a scalar to |
| # "...dynamic.partition"; the second then assigns "...dynamic.partition.mode", which turns |
| # the same path into an object and replaces the earlier scalar. The effective configuration |
| # therefore likely contains only "...partition.mode = nonstrict" -- TODO confirm against the |
| # resolved config and rely on the engine default for "...partition" if that is acceptable. |
| spark.conf.hive.exec.dynamic.partition = true |
| spark.conf.hive.exec.dynamic.partition.mode = nonstrict |
| } |
| |
| steps { |
| proxy_received { |
| # Source step: reads proxy log messages from a Kafka topic and splits each message |
| # into named string columns. |
| input { |
| type = kafka |
| # Placeholder broker address; replace with the real Kafka bootstrap broker(s). |
| brokers = "kafka-broker.yourdomain.com:9092" |
| topic = spot_proxy |
| encoding = string |
| translator { |
| type = delimited |
| # Column names, in the order the fields appear in each message (26 columns). |
| field.names = [date,time,time_taken,c_ip,sc_status,s_action,sc_bytes,cs_bytes, |
| cs_method,cs_uri_scheme,cs_host,cs_uri_path,cs_uri_query,cs_username, |
| s_hierarchy,s_supplier_name,rs_Content_Type,cs_User_Agent, |
| sc_filter_result,sc_filter_category,x_virus_id,s_ip,s_sitename, |
| x_virus_details,x_icap_error_code,x_icap_error_details] |
| # One type per entry in field.names, position by position (26 entries); keep both |
| # lists the same length when adding or removing columns. |
| field.types = [string,string,string,string,string,string,string,string,string, |
| string,string,string,string,string,string,string,string,string, |
| string,string,string,string,string,string,string,string] |
| # Regex delimiter: a single space that is followed by an even number of double quotes |
| # up to the end of the line, so spaces inside a double-quoted value do not split it. |
| delimiter = " (?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)" |
| delimiter-regex = true |
| # Adds a raw_value column alongside the parsed ones (presumably the unsplit message; |
| # the downstream step selects it as "raw"). |
| append.raw.enabled = true |
| append.raw.value.field.name = raw_value |
| } |
| } |
| } |
| |
| proxy_process { |
| # Transform-and-load step: runs after proxy_received, reads its rows through the |
| # proxy_received table name used in the query, and appends the result to a Hive table. |
| dependencies = [proxy_received] |
| deriver { |
| type = sql |
| # The query renames the parsed proxy columns to the target column names and adds: |
| #   type        - the constant 'proxy' |
| #   raw         - the raw_value column from the input step |
| #   event_time  - date + ' ' + time parsed with pattern yyyy-MM-dd HH:mm:ss via unix_timestamp |
| #   p_dvc_vendor / p_dvc_type - placeholder constants; replace with real values |
| #   p_dt        - the same timestamp re-formatted as yyyyMMdd |
| # The p_* columns are presumably the Hive partition columns -- TODO confirm against the |
| # spot.event table definition. |
| # Do not put comments inside the triple-quoted literal: everything in it is sent as SQL. |
| query.literal = """ |
| SELECT |
| 'proxy' as type, |
| raw_value as raw, |
| unix_timestamp(concat(date, ' ', time), "yyyy-MM-dd HH:mm:ss") as event_time, |
| time_taken as duration, |
| c_ip as src_ip4_str, |
| sc_status as prx_code, |
| s_action as prx_action, |
| sc_bytes as in_bytes, |
| cs_bytes as out_bytes, |
| cs_method as prx_method, |
| cs_uri_scheme as prx_type, |
| cs_host as src_host, |
| cs_uri_path as cs_uri_path, |
| cs_uri_query as prx_query, |
| cs_username as user_name, |
| s_hierarchy as org, |
| s_supplier_name as name, |
| rs_Content_Type as category, |
| cs_User_Agent as prx_browser, |
| sc_filter_result as prx_filter_result, |
| sc_filter_category as prx_category, |
| x_virus_id as vuln_id, |
| s_ip as dst_ip4_str, |
| s_sitename as src_domain, |
| x_virus_details as vuln_status, |
| x_icap_error_code as code, |
| x_icap_error_details as vuln_severity, |
| 'SOME_VENDOR' as p_dvc_vendor, |
| 'SOME_DEVICE_TYPE' as p_dvc_type, |
| from_unixtime(unix_timestamp(concat(date, ' ', time), |
| "yyyy-MM-dd HH:mm:ss"), "yyyyMMdd") as p_dt |
| FROM proxy_received""" |
| } |
| # Rows are only ever appended; this step configures no update or delete behaviour. |
| planner { |
| type = append |
| } |
| output { |
| type = hive |
| table = "spot.event" |
| # NOTE(review): presumably aligns the derived columns to the target table's columns |
| # before writing -- confirm against the Hive output documentation. |
| align.columns = true |
| } |
| } |
| } |