pip install -r requirements.txt
Adding Kafka Service:
The Ingest framework needs Kafka for real-time streaming. Add the Kafka service using Cloudera Manager. If you are using a Cloudera Manager version earlier than 5.4.1, you will need to add the Kafka parcel manually.
The ingest module uses a default message size of 900000 bytes. If you modify this size in the ingest configuration file, you will also need to raise the matching Kafka broker properties, at minimum message.max.bytes and replica.fetch.max.bytes, so the brokers can accept and replicate messages of that size.
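As a sketch, assuming the brokers are configured through Kafka's server.properties (in Cloudera Manager the same broker properties are exposed in the Kafka service configuration page):

# Broker limits must be at least as large as the ingest message_size.
# Values shown assume the 900000-byte default; adjust to match your setting.
message.max.bytes=900000
replica.fetch.max.bytes=900000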
Download the following jar file: spark-streaming-kafka-0-8-assembly_2.11. This jar adds support for Spark Streaming + Kafka and must be placed in spot-ingest/common, keeping the same name.
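A minimal sketch of the download, assuming Spark 2.1.0 (match the version to the Spark release on your cluster) and fetching from Maven Central:

cd spot-ingest/common
# -O keeps the file name the ingest scripts expect
wget -O spark-streaming-kafka-0-8-assembly_2.11.jar https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8-assembly_2.11/2.1.0/spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar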
Required Roles
The following roles are required on all nodes where the Ingest Framework will run.
Ingest Configuration:
The file ingest_conf.json contains all the configuration required to start the ingest module.
Configuration example:
"dbname" : "database name", "hdfs_app_path" : "hdfs application path", "collector_processes":5, "ingestion_interval":1, "spark-streaming":{ "driver_memory":"", "spark_exec":"", "spark_executor_memory":"", "spark_executor_cores":"", "spark_batch_size":"" }, "kafka":{ "kafka_server":"kafka ip", "kafka_port":"kafka port", "zookeper_server":"zk ip", "zookeper_port":"zk port", "message_size":900000 }, "pipelines":{ "flow_internals":{ "type":"flow", "collector_path":"/path_to_flow_collector", "local_staging":"/tmp/", "process_opt":"" }, "flow_externals":{ "type":"flow", "collector_path":"/path_to_flow_collector", "local_staging":"/tmp/", "process_opt":"" }, "dns_server_1":{ "type":"dns", "collector_path":"/path_to_dns_collector", "local_staging":"/tmp/", "pkt_num":"650000", "pcap_split_staging":"/tmp", "process_opt":"-E separator=, -E header=y -E occurrence=f -T fields -e frame.time -e frame.time_epoch -e frame.len -e ip.src -e ip.dst -e dns.resp.name -e dns.resp.type -e dns.resp.class -e dns.flags.rcode -e dns.a 'dns.flags.response == 1'" }
Starting the Ingest
Running in Standalone Mode:
bash start_standalone_ingest.sh "pipeline_configuration" "num of workers"
Following the previous configuration example, starting the ingest module in standalone mode looks like:
bash start_standalone_ingest.sh flow_internals 4
Running in Cluster Mode:
Running the Master: The master needs to run on the same server where the collector path is located.
python master_collector.py -t "pipeline_configuration" -w "number of workers"
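Following the same configuration example, starting the master for the flow_internals pipeline with four workers would look like:

python master_collector.py -t flow_internals -w 4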
Running Workers: Each worker needs to run on a server where the required processing program is installed (e.g. nfdump). Each worker also needs to be identified with a specific id, starting at 0; this id is required to attach the worker to a Kafka partition.
Example:
python worker.py -t "pipeline_configuration" -i "id of the worker (starts with 0)" --topic "my_topic"
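Continuing the same example with four workers, and assuming the topic name my_topic used above, the workers would be started with ids 0 through 3, one per Kafka partition:

python worker.py -t flow_internals -i 0 --topic my_topic
python worker.py -t flow_internals -i 1 --topic my_topic
python worker.py -t flow_internals -i 2 --topic my_topic
python worker.py -t flow_internals -i 3 --topic my_topic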