Merge pull request #144 from curtishoward/SPOT-181_envelope_ingest

[SPOT-181] spot-ingest for ODM with config-driven Spark streaming (Envelope)
diff --git a/docs/odm/SPOT_ODM_Ingest_Framework.png b/docs/odm/SPOT_ODM_Ingest_Framework.png
new file mode 100644
index 0000000..b6a9e74
--- /dev/null
+++ b/docs/odm/SPOT_ODM_Ingest_Framework.png
Binary files differ
diff --git a/spot-ingest/odm/README.md b/spot-ingest/odm/README.md
new file mode 100644
index 0000000..c4de0b2
--- /dev/null
+++ b/spot-ingest/odm/README.md
@@ -0,0 +1,80 @@
+Spot Ingest Framework for the Open Data Model (ODM)
+======
+Spot ingest for the Open Data Model uses [Envelope](https://github.com/cloudera-labs/envelope) to provide a configuration-driven Spark Streaming ingest application.
+ 
+![Ingest Framework](../../docs/odm/SPOT_ODM_Ingest_Framework.png)
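+
+Each `workers/spot_*.conf` file defines one complete pipeline. Trimmed to a skeleton, the DNS configuration reads the `spot_dns` Kafka topic, maps each record onto ODM columns with a SQL deriver, and appends the result to the `spot.event` Hive table:
+```
+steps {
+    dns_received {
+        input {
+            type = kafka
+            brokers = "kafka-broker.yourdomain.com:9092"
+            topic = spot_dns
+            encoding = string
+            // translator (omitted): names and types of the delimited CSV fields
+        }
+    }
+    dns_formatted {
+        dependencies = [dns_received]
+        // deriver (omitted): SQL that maps the raw fields onto ODM columns
+        planner { type = append }
+        output { type = hive, table = "spot.event" }
+    }
+}
+```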
+
+## Getting Started
+
+### Prerequisites
+* [spot-setup/odm](../spot-setup/odm)
+* Apache Spark 2.2 or higher
+* Apache Kafka 0.10 (Cloudera Kafka 2.1) or higher
+* Ingest user with sudo privileges (e.g. `spot`). This user executes all of the Ingest Framework processes and also needs access to the HDFS solution path (e.g. `/user/spot/`).
+
+### Install
+From this directory (`spot-ingest/odm`):
+* Get Envelope:
+```
+    git clone https://github.com/cloudera-labs/envelope.git
+```
+* Apply the Envelope patches that add features needed for Spot ODM ingest (NOTE: these steps will be unnecessary once Envelope v0.6 is released):
+```
+    cd envelope/
+    wget https://raw.githubusercontent.com/curtishoward/incubator-spot/SPOT-181_files/spot-ingest/odm/workers/envelope_mods/0001-ENV-252-Add-Hive-output-option-to-align-step-schema-.patch
+    wget https://raw.githubusercontent.com/curtishoward/incubator-spot/SPOT-181_files/spot-ingest/odm/workers/envelope_mods/0001-ENV-256-Add-an-option-to-the-delimited-translator-to.patch
+    wget https://raw.githubusercontent.com/curtishoward/incubator-spot/SPOT-181_files/spot-ingest/odm/workers/envelope_mods/0001-ENV-258-Fix-delimited-translator-to-handle-missing-f.patch
+    patch -p1 < 0001-ENV-258-Fix-delimited-translator-to-handle-missing-f.patch
+    patch -p1 < 0001-ENV-256-Add-an-option-to-the-delimited-translator-to.patch
+    patch -p1 < 0001-ENV-252-Add-Hive-output-option-to-align-step-schema-.patch
+```
+* Build Envelope
+```
+    mvn clean && mvn -DskipTests package
+```
+### Configuration and Operation
+
+**Required Roles**
+
+The following roles are required on all nodes where the Ingest Workers will run.
+* Kafka gateway
+* Spark gateway 
+* Hive gateway
+
+**Ingest Configuration**
+
+* Create the Spot ingest Kafka topics:
+```
+    kafka-topics --zookeeper zookeeper-host:2181  --create --topic spot_dns   --replication-factor 3 --partitions 4
+    kafka-topics --zookeeper zookeeper-host:2181  --create --topic spot_flow  --replication-factor 3 --partitions 4
+    kafka-topics --zookeeper zookeeper-host:2181  --create --topic spot_proxy --replication-factor 3 --partitions 4
+```
+* Update the Kafka `brokers` and `topic` parameters in the `workers/spot_*.conf` Envelope configuration files (see the excerpt below)
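+
+For example, the relevant lines in `workers/spot_dns.conf` are:
+```
+    brokers = "kafka-broker.yourdomain.com:9092"
+    topic = spot_dns
+```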
+
+**Starting the Ingest**
+
+Start the Spark Streaming application defined by the Envelope configuration (Spark driver logs will be in the working directory):
+```
+    bash start_ingest.sh [dns|flow|proxy]
+```
+
+**Collector Examples**
+
+The collector can be any application that acts as a Kafka producer for the source topic. Examples are provided that use nfdump, tshark and unzip (for flow, DNS and proxy data, respectively) to dissect batch files and then forward the records to the relevant Kafka topics using Flume.
+
+The following are required on all (Edge) nodes where the collector examples will be running:
+* [Flume gateway](https://flume.apache.org/download.html)
+* [Kafka gateway](https://kafka.apache.org/downloads)
+* [tshark](https://www.wireshark.org/download.html) (see also [how to install](https://github.com/Open-Network-Insight/open-network-insight/wiki/Install%20Ingest%20Prerequisites) ) _or_ [spot-nfdump](https://github.com/Open-Network-Insight/spot-nfdump)
+
+To run a collector example (from the `spot-ingest/odm/collectors` directory):
+* Update the Kafka `brokerList` parameter in each `spot_flume_*.conf` Flume configuration file
+* Start the following script, which 1) starts the Flume agent and 2) watches for new batch files for each source under the `<source_type>/new` directory, dissecting them as they arrive:
+```
+   bash process_files.sh [dns|flow|proxy]
+```
+* Move a new dns.pcap, flow.nfcap, or proxy.log file to the receiving directory.  See [SPOT-135](https://issues.apache.org/jira/browse/SPOT-135) for sample data.  For example, if `process_files.sh` has been started for DNS:
+```
+    mv sample.pcap dns/new
+```
+* Once the file has been dissected, you should begin to see records in the `spot.event` Hive table
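+
+A quick way to confirm end-to-end ingest (assuming the Hive shell or beeline is available on the node) is to count events by source type:
+```
+    SELECT type, COUNT(*) FROM spot.event GROUP BY type;
+```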
diff --git a/spot-ingest/odm/collectors/process_files.sh b/spot-ingest/odm/collectors/process_files.sh
new file mode 100644
index 0000000..7ef4ba4
--- /dev/null
+++ b/spot-ingest/odm/collectors/process_files.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+SOURCE_TYPE=$1
+
+# validate the argument up front; otherwise the polling loop below would run silently against a non-existent directory
+if [ "$SOURCE_TYPE" != "dns" ] && [ "$SOURCE_TYPE" != "flow" ] && [ "$SOURCE_TYPE" != "proxy" ]
+then
+  echo "USAGE:  process_files.sh <source_type> (valid source types:  \"proxy\", \"dns\", \"flow\")"
+  exit 1
+fi
+
+ROOT_DIR=.
+NEW_FILES_DIR=$ROOT_DIR/$SOURCE_TYPE/new
+PROCESSED_FILES_DIR=$ROOT_DIR/$SOURCE_TYPE/stage
+FLUME_SPOOL_DIR=$ROOT_DIR/$SOURCE_TYPE/flume_spool
+
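+# create the working directories (new, stage, flume_spool) for each source type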
+for src_type in dns flow proxy; do for flume_dir in new stage flume_spool; do mkdir -p $src_type/$flume_dir; done; done
+
+# start the flume agent in the background
+flume-ng agent --conf-file spot_flume_${SOURCE_TYPE}.conf --name a1 > $ROOT_DIR/spot_flume_${SOURCE_TYPE}.log 2>&1 &
+FLUME_PID=$!
+trap "kill -9 $FLUME_PID; exit 1" SIGINT SIGTERM
+
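+# poll for new batch files, dissect them, and hand the results to the Flume spooldir source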
+while true
+do
+  for fname in `ls $NEW_FILES_DIR`
+  do
+    if [ $SOURCE_TYPE = "dns" ]
+    then
+      tshark -r $NEW_FILES_DIR/$fname -E separator=, -E header=y -E occurrence=f -T fields -e frame.time -e frame.time_epoch -e frame.len -e ip.src -e ip.dst -e dns.resp.name -e dns.resp.type -e dns.resp.class -e dns.flags.rcode -e dns.a 'dns.flags.response == 1' > $PROCESSED_FILES_DIR/dns.log
+    elif [ $SOURCE_TYPE = "flow" ]
+    then
+      nfdump -r $NEW_FILES_DIR/$fname -o csv > $PROCESSED_FILES_DIR/flow.log
+    elif [ $SOURCE_TYPE = "proxy" ] 
+    then
+      unzip $NEW_FILES_DIR/$fname -d $PROCESSED_FILES_DIR
+    else
+      echo "USAGE:  process_files.sh <source_type> (valid source types:  \"proxy\", \"dns\", \"flow\")"
+      exit 1
+    fi 
+    mv $PROCESSED_FILES_DIR/*.log $FLUME_SPOOL_DIR
+    rm -f $NEW_FILES_DIR/$fname
+  done
+  rm -f $FLUME_SPOOL_DIR/*.COMPLETED
+  sleep 5
+done
diff --git a/spot-ingest/odm/collectors/spot_flume_dns.conf b/spot-ingest/odm/collectors/spot_flume_dns.conf
new file mode 100644
index 0000000..2d84099
--- /dev/null
+++ b/spot-ingest/odm/collectors/spot_flume_dns.conf
@@ -0,0 +1,17 @@
+a1.channels = ch-1
+a1.sources = src-1
+a1.sinks = sink-1
+
+a1.sources.src-1.type = spooldir
+a1.sources.src-1.channels = ch-1
+a1.sources.src-1.spoolDir = ./dns/flume_spool
+
+a1.channels.ch-1.type = memory
+a1.channels.ch-1.capacity = 10000
+a1.channels.ch-1.transactionCapacity = 1000
+
+a1.sinks.sink-1.type = org.apache.flume.sink.kafka.KafkaSink
+a1.sinks.sink-1.topic = spot_dns
+a1.sinks.sink-1.brokerList = kafka-broker-1.yourdomain.com:9092,kafka-broker-2.yourdomain.com:9092
+a1.sinks.sink-1.channel = ch-1
+a1.sinks.sink-1.batchSize = 20
diff --git a/spot-ingest/odm/collectors/spot_flume_flow.conf b/spot-ingest/odm/collectors/spot_flume_flow.conf
new file mode 100644
index 0000000..2fd0d14
--- /dev/null
+++ b/spot-ingest/odm/collectors/spot_flume_flow.conf
@@ -0,0 +1,17 @@
+a1.channels = ch-1
+a1.sources = src-1
+a1.sinks = sink-1
+
+a1.sources.src-1.type = spooldir
+a1.sources.src-1.channels = ch-1
+a1.sources.src-1.spoolDir = ./flow/flume_spool
+
+a1.channels.ch-1.type = memory
+a1.channels.ch-1.capacity = 10000
+a1.channels.ch-1.transactionCapacity = 1000
+
+a1.sinks.sink-1.type = org.apache.flume.sink.kafka.KafkaSink
+a1.sinks.sink-1.topic = spot_flow
+a1.sinks.sink-1.brokerList = kafka-broker-1.yourdomain.com:9092,kafka-broker-2.yourdomain.com:9092
+a1.sinks.sink-1.channel = ch-1
+a1.sinks.sink-1.batchSize = 20
diff --git a/spot-ingest/odm/collectors/spot_flume_proxy.conf b/spot-ingest/odm/collectors/spot_flume_proxy.conf
new file mode 100644
index 0000000..83b29a6
--- /dev/null
+++ b/spot-ingest/odm/collectors/spot_flume_proxy.conf
@@ -0,0 +1,17 @@
+a1.channels = ch-1
+a1.sources = src-1
+a1.sinks = sink-1
+
+a1.sources.src-1.type = spooldir
+a1.sources.src-1.channels = ch-1
+a1.sources.src-1.spoolDir = ./proxy/flume_spool
+
+a1.channels.ch-1.type = memory
+a1.channels.ch-1.capacity = 10000
+a1.channels.ch-1.transactionCapacity = 1000
+
+a1.sinks.sink-1.type = org.apache.flume.sink.kafka.KafkaSink
+a1.sinks.sink-1.topic = spot_proxy
+a1.sinks.sink-1.brokerList = kafka-broker-1.yourdomain.com:9092,kafka-broker-2.yourdomain.com:9092
+a1.sinks.sink-1.channel = ch-1
+a1.sinks.sink-1.batchSize = 20
diff --git a/spot-ingest/odm/start_ingest.sh b/spot-ingest/odm/start_ingest.sh
new file mode 100644
index 0000000..a47186f
--- /dev/null
+++ b/spot-ingest/odm/start_ingest.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+SOURCE_TYPE=$1
+ENVELOPE_JAR=envelope/build/envelope/target/envelope-*.jar
+
+if [ "$SOURCE_TYPE" = "dns" ] || [ "$SOURCE_TYPE" = "flow" ] || [ "$SOURCE_TYPE" = "proxy" ]
+then
+  SPARK_KAFKA_VERSION=0.10 spark2-submit $ENVELOPE_JAR workers/spot_${SOURCE_TYPE}.conf >> spot_${SOURCE_TYPE}_ingest_spark_driver.log 2>&1 &
+else
+  echo "USAGE:  start_ingest.sh <source_type> (valid source types:  \"proxy\", \"dns\", \"flow\")"
+fi
diff --git a/spot-ingest/odm/workers/spot_dns.conf b/spot-ingest/odm/workers/spot_dns.conf
new file mode 100644
index 0000000..834e36a
--- /dev/null
+++ b/spot-ingest/odm/workers/spot_dns.conf
@@ -0,0 +1,62 @@
+application {
+    name = Spot DNS ingest
+    batch.milliseconds = 5000
+    executors = 1 
+    executor.cores = 1 
+    executor.memory = 1g
+    spark.conf.hive.exec.dynamic.partition = true
+    spark.conf.hive.exec.dynamic.partition.mode = nonstrict
+}
+
+steps {
+    dns_received {
+        input {
+            type = kafka
+            brokers = "kafka-broker.yourdomain.com:9092"
+            topic = spot_dns
+            encoding = string
+            translator {
+                type = delimited
+                delimiter = ","
+                field.names = [frame_day,frame_time,unix_timestamp,frame_len,ip_src,ip_dst,dns_qry_name,
+                               dns_qry_type,dns_qry_class,dns_qry_rcode,dns_a]
+                field.types = [string,string,string,string,string,string,string,string,string,string,string]
+                append.raw.enabled = true
+                append.raw.value.field.name = raw_value
+            }
+        }
+    }
+
+    dns_formatted {
+        dependencies = [dns_received]
+        deriver {
+            type = sql
+            query.literal = """
+                SELECT
+                    'dns' as type,
+                    raw_value as raw,
+                    unix_timestamp(concat(frame_day, frame_time), "MMM dd yyyy HH:mm:ss") as event_time,
+                    ip_src as src_ip4_str,
+                    ip_dst as dst_ip4_str,
+                    dns_qry_class as dns_class,
+                    frame_len as dns_len,
+                    dns_qry_name as dns_query,
+                    dns_qry_rcode as dns_response_code,
+                    dns_a as dns_answers,
+                    dns_qry_type as dns_type,
+                    'SOME_VENDOR' as p_dvc_vendor,
+                    'SOME_DEVICE_TYPE' as p_dvc_type,
+                    FROM_UNIXTIME(unix_timestamp(concat(frame_day, frame_time), 
+                                                 "MMM dd yyyy HH:mm:ss"), "yyyyMMdd") as p_dt
+                 FROM dns_received"""
+        }
+        planner {
+            type = append
+        }
+        output {
+            type = hive
+            table = "spot.event"
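+            // align.columns comes from the ENV-252 Hive output patch applied during install (see ../README.md)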
+            align.columns = true
+        }
+    }
+}
diff --git a/spot-ingest/odm/workers/spot_flow.conf b/spot-ingest/odm/workers/spot_flow.conf
new file mode 100644
index 0000000..2ae539a
--- /dev/null
+++ b/spot-ingest/odm/workers/spot_flow.conf
@@ -0,0 +1,76 @@
+application {
+    name = Spot flow ingest
+    batch.milliseconds = 5000
+    executors = 1  
+    executor.cores = 1 
+    executor.memory = 1g
+    spark.conf.hive.exec.dynamic.partition = true
+    spark.conf.hive.exec.dynamic.partition.mode = nonstrict
+}
+
+steps {
+    flow_received {
+        input {
+            type = kafka
+            brokers = "kafka-broker.yourdomain.com:9092" 
+            topic = spot_flow
+            encoding = string
+            translator {
+                type = delimited
+                field.names = [tr,try,trm,trd,tr_h,tr_m,tr_s,td,sa,da,sp,dp,pr,flg,fwd,
+                               stos,ipkt,ibyt,opkt,obyt,in,out,sas,das,dtos,dir,ra]
+                field.types = [string,string,string,string,string,string,string,string,string,
+                               string,string,string,string,string,string,string,string,string,
+                               string,string,string,string,string,string,string,string,string]
+                delimiter = ","
+                append.raw.enabled = true
+                append.raw.value.field.name = raw_value
+            }
+        }
+    }
+
+// mapping from https://www.sans.org/reading-room/whitepapers/incident/netflow-collection-analysis-nfcapd-python-splunk-35747 (page 21)
+    flow_process {
+        dependencies = [flow_received]
+        deriver { 
+            type = sql
+            query.literal = """
+                SELECT
+                    'flow' as type,
+                    raw_value as raw,
+                    unix_timestamp(tr, "yyyy-MM-dd HH:mm:ss") as event_time,
+                    td as duration,
+                    sa as src_ip4_str,
+                    da as dst_ip4_str,
+                    sp as src_port,
+                    dp as dst_port,
+                    pr as n_proto,
+                    flg as net_flags,
+                    fwd as code,
+                    stos as service,
+                    ipkt as flow_in_packets,
+                    ibyt as in_bytes,
+                    opkt as flow_out_packets,
+                    obyt as out_bytes,
+                    in as flow_input,
+                    out as flow_output,
+                    sas as src_asn,
+                    das as dst_asn,
+                    dtos as xref,
+                    dir as net_direction,
+                    ra as dvc_host,
+                    'SOME_VENDOR' as p_dvc_vendor,
+                    'SOME_DEVICE_TYPE' as p_dvc_type,
+                    FROM_UNIXTIME(unix_timestamp(tr, "yyyy-MM-dd HH:mm:ss"), "yyyyMMdd") as p_dt
+                FROM flow_received"""
+        }
+        planner { 
+            type = append 
+        }
+        output { 
+            type = hive 
+            table = "spot.event"
+            align.columns = true
+        }
+    }
+}
diff --git a/spot-ingest/odm/workers/spot_proxy.conf b/spot-ingest/odm/workers/spot_proxy.conf
new file mode 100644
index 0000000..ae168b8
--- /dev/null
+++ b/spot-ingest/odm/workers/spot_proxy.conf
@@ -0,0 +1,84 @@
+application {
+    name = Spot proxy ingest
+    batch.milliseconds = 5000
+    executors = 1
+    executor.cores = 1 
+    executor.memory = 1g
+    spark.conf.hive.exec.dynamic.partition = true
+    spark.conf.hive.exec.dynamic.partition.mode = nonstrict
+}
+
+steps {
+    proxy_received {
+        input {
+            type = kafka
+            brokers = "kafka-broker.yourdomain.com:9092"
+            topic = spot_proxy
+            encoding = string
+            translator {
+                type = delimited
+                field.names = [date,time,time_taken,c_ip,sc_status,s_action,sc_bytes,cs_bytes,
+                               cs_method,cs_uri_scheme,cs_host,cs_uri_path,cs_uri_query,cs_username,
+                               s_hierarchy,s_supplier_name,rs_Content_Type,cs_User_Agent,
+                               sc_filter_result,sc_filter_category,x_virus_id,s_ip,s_sitename,
+                               x_virus_details,x_icap_error_code,x_icap_error_details]
+                field.types = [string,string,string,string,string,string,string,string,string,
+                               string,string,string,string,string,string,string,string,string,
+                               string,string,string,string,string,string,string,string]
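+                // regex delimiter: split on spaces that are not inside double-quoted values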
+                delimiter = " (?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"
+                delimiter-regex = true
+                append.raw.enabled = true
+                append.raw.value.field.name = raw_value
+            }
+        }
+    }
+
+    proxy_process {
+        dependencies = [proxy_received]
+        deriver { 
+            type = sql
+            query.literal = """
+                SELECT
+                    'proxy' as type,
+                    raw_value as raw,
+                    unix_timestamp(concat(date, ' ', time), "yyyy-MM-dd HH:mm:ss") as event_time,
+                    time_taken as duration,
+                    c_ip as src_ip4_str,
+                    sc_status as prx_code,
+                    s_action as prx_action,
+                    sc_bytes as in_bytes,
+                    cs_bytes as out_bytes,
+                    cs_method as prx_method,
+                    cs_uri_scheme as prx_type,
+                    cs_host as src_host,
+                    cs_uri_path as cs_uri_path,
+                    cs_uri_query as prx_query,
+                    cs_username as user_name,
+                    s_hierarchy as org,
+                    s_supplier_name as name,
+                    rs_Content_Type as category,
+                    cs_User_Agent as prx_browser,
+                    sc_filter_result as prx_filter_result,
+                    sc_filter_category as prx_category,
+                    x_virus_id as vuln_id,
+                    s_ip as dst_ip4_str,
+                    s_sitename as src_domain,
+                    x_virus_details as vuln_status,
+                    x_icap_error_code as code,
+                    x_icap_error_details as vuln_severity,
+                    'SOME_VENDOR' as p_dvc_vendor,
+                    'SOME_DEVICE_TYPE' as p_dvc_type,
+                    from_unixtime(unix_timestamp(concat(date, ' ', time), 
+                                                 "yyyy-MM-dd HH:mm:ss"), "yyyyMMdd") as p_dt
+                FROM proxy_received"""
+        }
+        planner { 
+            type = append 
+        }
+        output { 
+            type = hive 
+            table = "spot.event"
+            align.columns = true
+        }
+    }
+}