Merge pull request #2 from ktzoulas/SPOT-INGEST-STREAMING

Spot ingest streaming for dns and proxy data
diff --git a/spot-ingest/README.md b/spot-ingest/README.md
index df88ef8..68262ae 100644
--- a/spot-ingest/README.md
+++ b/spot-ingest/README.md
@@ -128,19 +128,23 @@
     python worker.py -t "pipeline_configuration" -i "id of the worker (starts with 0)" --topic "my_topic"
 
 
-#### Ingestion using Spark Streaming
-A new functionality is now available where the Distributed Collector sends to Kafka cluster already processed files in a comma-separated output (CSV) format. Each CSV row corresponds to a table row in the Hive database, so the Streaming Listener consumes batches of CSV messages from the Kafka cluster and registers them in Hive database.
+## Ingestion using Spark Streaming
+A new functionality is now available where the Distributed Collector sends to Kafka cluster already processed files in a comma-separated output (CSV) format. Each row of the CSV, corresponds to a table row in the Hive database, so the Streaming Listener consumes batches of CSV messages from the Kafka cluster and registers them in Hive database.
 
 **Distributed Collector**
-The role of the Distributed Collector is the same, but instead of processing the data after transmission, it processes it before. Distributed Collector tracks a directory backwards for newly created files. When a file is detected, it converts it into a comma-separated output (CSV) format and stores the output in the local staging area. Then, it reads the CSV file line-by-line and creates smaller chunks of bytes. The size of each chunk depends on the maximum request size allowed by Kafka. Finally, it serializes each chunk into an Avro-encoded format and publishes them to Kafka cluster.
-Due to its architecture, Distributed Collector can run on an edge node of the Big Data infrastructure as well as on a remote host (proxy server, vNSF, etc). Additionally, option `--skip-conversion` has been added. If this is enabled, Distributed Collector expects already processed files in the form of CSV. So, when it detects one, it does not apply any transformation to it, just splits it into chunks and sends to the Kafka cluster.
+<br />
+The role of the Distributed Collector is the same, but instead of processing the data after transmission, it processes it before. Distributed Collector tracks a directory backwards for newly created files. When a file is detected, it converts it into a comma-separated output (CSV) format and stores the output in the local staging area. Then, it reads the CSV file line-by-line and creates smaller chunks of bytes. The size of each chunk depends on the maximum request size allowed by Kafka. Finally, it serializes each chunk into an Avro-encoded format and publishes them to Kafka cluster.<br />
+Due to its architecture, Distributed Collector can run **on an edge node** of the Big Data infrastructure as well as **on a remote host** (proxy server, vNSF, etc).<br />
+Additionally, option `--skip-conversion` has been added. If this is enabled, Distributed Collector expects already processed files in the form of CSV. So, when it detects one, it does not apply any transformation to it, just splits it into chunks and sends to the Kafka cluster.<br />
 Distributed Collector publishes to Apache Kafka only the CSV-converted file, and not the original one. The binary file remains to the local filesystem of the current host.
 
 **Streaming Listener**
-On the other hand, the Streaming Listener can only run on the central infrastructure. It listens to a specific Kafka topic and consumes incoming messages. Streaming data is divided into batches (according to a time interval). These batches are deserialized by the Listener, according to the supported Avro schema, parsed and registered in the corresponding table of Hive.
+<br />
+On the other hand, Streaming Listener can only run on the central infrastructure. It listens to a specific Kafka topic and consumes incoming messages. Streaming data is divided into batches (according to a time interval). These batches are deserialized by the Listener, according to the supported Avro schema, parsed and registered in the corresponding table of Hive.
 
-##### Configuration
-Both Distributed Collector and Streaming Listener use the same configuration file as the original Spot Ingest flavor. The only addition is unnder the `kafka` section:
+
+### Configuration
+Both Distributed Collector and Streaming Listener use the same configuration file as the original Spot Ingest flavor. The only addition is under `kafka` section:
 
     "kafka":{
         "kafka_server":"kafka ip",
@@ -151,26 +155,31 @@
         "max_request_size": 1048576
     },
 
-The `max_request_size` defines the maximum size of the chunks that are sent to Kafka cluster. If it is not set, then the default value is used (1MB).
+The `max_request_size` defines the maximum size of the chunks that are sent to Kafka cluster. If it is not set, then the default value is used [1MB].
 
-Furthermore, the list of supported files must be given as regular expressions. For example, to support a filename like `nfcapd.20171103140000`, you have to set:
+Furthermore, the list of the supported files must be given as regular expressions.
+For example, to support a filename like `nfcapd.20171103140000`, you have to set:<br />
+
     "supported_files" :["nfcapd.*"],
-or
+<br />or<br />
+
     "supported_files": ["nfcapd.[0-9]{14}"],
 
-##### Installation
+
+### Installation
 Installation requires a user with `sudo` privileges. Enter `spot-ingest` directory and run:
 <br />`./install_DC.sh`
 
-If you wish to install the Distributed Directory on a remote host, just copy `spot-ingest` folder to the remote host and run the above installation file. It should be mentioned again that the remote host should have access to the Kafka cluster, in order to work properly.
+If you wish to install the Distributed Collector on a remote host, just copy `spot-ingest` folder to the remote host and run the above installation file. It should be mentioned again that the remote host should have access to the Kafka cluster, in order to work properly.
 
-##### Getting Started
 
-**Start Distributed Collector**
-After the installation, enable the virtual environment:
+### Getting Started
+
+**Start Distributed Collector**<br />
+Enable the virtual environment
 <br />`source env/bin/activate`
 
-and check the usage message of the distributed collector.
+and check the usage message of the Distributed Collector.
 
     python collector.py --help
     usage: Distributed Collector Daemon of Apache Spot [-h] [-c] [-l]
@@ -193,20 +202,18 @@
 
 By default, it loads `ingest_conf.json` file, but using `-c , --config-file` option you can ovveride it and use another.
 
-Distributed Collector do not create a new topic, so you have to pass an existing one.
+Distributed Collector does not create a new topic, so you have to pass an existing one.
 
-To start Distributed collector:<br />
+To start Distributed Collector:<br />
 `python collector.py -t "pipeline_configuration" --topic "my_topic"`
 
-Some examples are given below:
-<br />`python collector.py -t flow --topic SPOT-INGEST-TEST-TOPIC`
-<br />
-<br />`python collector.py -t flow --topic SPOT-INGEST-TEST-TOPIC --config-file /tmp/another_ingest_conf.json`
-<br />
-<br />`python collector.py -t proxy --topic SPOT-PROXY-TOPIC --log-level DEBUG`
+Some examples are given below:<br />
+1. `python collector.py -t flow --topic SPOT-INGEST-TEST-TOPIC`<br />
+2. `python collector.py -t flow --topic SPOT-INGEST-TEST-TOPIC --config-file /tmp/another_ingest_conf.json`<br />
+3. `python collector.py -t proxy --topic SPOT-PROXY-TOPIC --log-level DEBUG`<br />
 
-**Start Streaming Listener**
-After the installation, try to print the usage message.
+**Start Streaming Listener**<br />
+Print usage message and check available options.
 
     python start_listener.py --help
     usage: Start Spark Job for Streaming Listener Daemon [-h] [-c] [-d] [-g] [-m]
@@ -222,6 +229,7 @@
                             cluster ("cluster")
       -g , --group-id       name of the consumer group to join for dynamic
                             partition assignment
+      -l , --log-level      determine the level of the logger
       -m , --master         spark://host:port, mesos://host:port, yarn, or local
       -n , --app-name       name of the Spark Job to display on the cluster web UI
       -r , --redirect-spark-logs 
@@ -237,9 +245,9 @@
 
     END
 
-By default, it loads `ingest_conf.json` file, but using `-c , --config-file` option you can ovveride it and use another.
+By default, it loads `ingest_conf.json` file, but using `-c , --config-file` option you can overide it and use another.
 
-Streaming Listener uses for Spark streaming the settings from the configuration file:
+Streaming Listener uses `spark-streaming` parameters from the configuration file:
 
     "spark-streaming":{
         "driver_memory":"",
@@ -250,16 +258,14 @@
 
 The `spark_batch_size` is the time interval (in seconds) at which streaming data will be divided into batches. The default value is 30 seconds.
 
-You can apply a Spark job in local, client or cluster mode (using `-m , --master` and `-d , --deploy-mode` options).
+You can apply a Spark job on local, client or cluster mode (using `-m , --master` and `-d , --deploy-mode` options).
 
-Moreover, you can isolate the logs from Spark using the option `-r , --redirect-spark-logs`. This is usefull in case of debugging.
+Moreover, you can isolate the logs from Spark, using the option `-r , --redirect-spark-logs`. This is usefull in case of debugging.
 
-To start Distributed collector:<br />
+To start Streaming Listener:<br />
 `python start_listener.py -t "pipeline_configuration" --topic "my_topic" -p "number of partitions to consume"`
 
-Some examples are given below:Some examples are given below:
-<br />`python start_listener.py -t flow --topic SPOT-INGEST-TOPIC -p 3 -g CUSTOM-GROUP-ID -n myApplication`
-<br />
-<br />`python start_listener.py -t flow --topic SPOT-INGEST-TOPIC -p 1 --master yarn --deploy-mode cluster`
-<br />
-<br />`python start_listener.py -t dns --topic SPOT-INGEST-DNS-TEST-TOPIC -p 4 --redirect-spark-logs /tmp/StreamingListener_Spark.log`
+Some examples are given below:<br />
+1. `python start_listener.py -t flow --topic SPOT-INGEST-TOPIC -p 3 -g CUSTOM-GROUP-ID -n myApplication`<br />
+2. `python start_listener.py -t flow --topic SPOT-INGEST-TOPIC -p 1 --master yarn --deploy-mode cluster`<br />
+3. `python start_listener.py -t dns --topic SPOT-INGEST-DNS-TEST-TOPIC -p 4 --redirect-spark-logs /tmp/StreamingListener_Spark.log`<br />
diff --git a/spot-ingest/collector.py b/spot-ingest/collector.py
index 897a50d..e1f34e0 100644
--- a/spot-ingest/collector.py
+++ b/spot-ingest/collector.py
@@ -150,15 +150,25 @@
                     .format(conf['kafka']['kafka_server'], conf['kafka']['kafka_port'])]
             }
 
-            if 'max_request_size' in conf['kafka'].keys():
-                conf['producer']['max_request_size'] = conf['kafka']['max_request_size']
-
             conf['file_watcher'] = {
                 'path': conf['pipelines'][args.type]['collector_path'],
                 'supported_files': conf['pipelines'][args.type]['supported_files'],
                 'recursive': True
             }
 
+            # .........................migrate configs
+            if not 'local_staging' in conf['pipelines'][args.type].keys():
+                conf['pipelines'][args.type]['local_staging'] = '/tmp'
+
+            if 'max_request_size' in conf['kafka'].keys():
+                conf['producer']['max_request_size'] = conf['kafka']['max_request_size']
+
+            if not 'process_opt' in conf['pipelines'][args.type].keys():
+                conf['pipelines'][args.type]['process_opt'] = ''
+
+            if 'recursive' in conf['pipelines'][args.type].keys():
+                conf['file_watcher']['recursive'] = conf['pipelines'][args.type]['recursive']
+
             collector = cls(args.type, args.topic, args.skip_conversion, **conf)
             collector.start()
 
@@ -202,7 +212,7 @@
     logger.info('Processing raw file "{0}"...'.format(filename))
 
     proc_dir  = os.path.join(tmpdir, proc_name)
-    status    = True
+    allpassed = True
 
     try:
         module      = getattr(pipelines, datatype)
@@ -221,20 +231,25 @@
         logger.info('Group lines of text-converted file and prepare to publish them.')
 
         for segmentid, datatuple in enumerate(partitioner):
-            status &= send_async(segmentid, *datatuple)
+            try: send_async(segmentid, *datatuple)
+            except RuntimeError: allpassed = False
 
         os.remove(staging)
         logger.info('Remove CSV-converted file "{0}" from local staging area.'
             .format(staging))
 
+    except IOError as ioe:
+        logger.warning(ioe.message)
+        return
+
     except Exception as exc:
-        logger.error('[{0}] {1}'.format(exc.__class__.__name__, str(exc).strip()))
+        logger.error('[{0}] {1}'.format(exc.__class__.__name__, exc.message))
         return
 
     finally:
         if producer: producer.close()
 
-    if status:
+    if allpassed:
         logger.info('All segments of "{0}" published successfully to Kafka cluster.'
             .format(filename))
         return
diff --git a/spot-ingest/common/listener.py b/spot-ingest/common/listener.py
index 60fef97..a67a99e 100644
--- a/spot-ingest/common/listener.py
+++ b/spot-ingest/common/listener.py
@@ -35,7 +35,7 @@
     '''
         Initialize the Spark job.
     '''
-    Util.get_logger('SPOT.INGEST')
+    Util.get_logger('SPOT.INGEST', kwargs.pop('log_level'))
 
     logger  = logging.getLogger('SPOT.INGEST.COMMON.LISTENER')
     logger.info('Initializing Spark Streaming Listener...')
@@ -99,6 +99,11 @@
         help='name of the consumer group to join for dynamic partition assignment',
         metavar='')
 
+    parser.add_argument('-l', '--log-level',
+        default='INFO',
+        help='determine the level of the logger',
+        metavar='')
+
     parser.add_argument('-n', '--app-name',
         help='name of the Spark Job to display on the cluster web UI',
         metavar='')
@@ -127,7 +132,7 @@
 
     return parser.parse_args()
 
-def store(rdd, hsc, dbtable, topic, schema=None, segtype='segment'):
+def store(rdd, hsc, dbtable, topic, schema=None, segtype='segments'):
     '''
         Interface for saving the content of the streaming :class:`DataFrame` out into
     Hive storage.
diff --git a/spot-ingest/common/producer.py b/spot-ingest/common/producer.py
index f92d42c..784b18d 100644
--- a/spot-ingest/common/producer.py
+++ b/spot-ingest/common/producer.py
@@ -71,7 +71,7 @@
 
         try:
             rawbytes = serialize(value)
-            if not rawbytes: return
+            if not rawbytes: raise RuntimeError
 
             _future  = self.send(topic, rawbytes.getvalue(), key, partition, timestamp_ms)
             meta     = _future.get(timeout=10)
diff --git a/spot-ingest/common/utils.py b/spot-ingest/common/utils.py
index dbfb9bf..ca443e0 100644
--- a/spot-ingest/common/utils.py
+++ b/spot-ingest/common/utils.py
@@ -32,7 +32,7 @@
 
 class Util(object):
     '''
-
+        Utility methods.
     '''
 
     @classmethod
diff --git a/spot-ingest/pipelines/dns/__init__.py b/spot-ingest/pipelines/dns/__init__.py
old mode 100755
new mode 100644
index ecb1860..313d9ab
--- a/spot-ingest/pipelines/dns/__init__.py
+++ b/spot-ingest/pipelines/dns/__init__.py
@@ -13,4 +13,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
\ No newline at end of file
+#
+
+from processing import convert, prepare
+from streaming  import StreamPipeline
+
+__all__ = ['convert', 'process', 'StreamPipeline']
diff --git a/spot-ingest/pipelines/dns/processing.py b/spot-ingest/pipelines/dns/processing.py
new file mode 100644
index 0000000..17abaea
--- /dev/null
+++ b/spot-ingest/pipelines/dns/processing.py
@@ -0,0 +1,117 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+'''
+    Methods that will be used to process and prepare dns data, before being sent to
+Kafka cluster.
+'''
+
+import logging
+import re
+import sys
+import tempfile
+
+from common.utils import Util
+from datetime     import datetime
+
+COMMAND = 'tshark -r {0} {1} > {2}'
+EPOCH   = datetime(1970, 1, 1)
+
+def convert(pcap, tmpdir, opts='', prefix=None):
+    '''
+        Convert `pcap` file to a comma-separated output format.
+
+    :param pcap    : Path of binary file.
+    :param tmpdir  : Path of local staging area.
+    :param opts    : A set of options for `tshark` command.
+    :param prefix  : If `prefix` is specified, the file name will begin with that;
+                     otherwise, a default `prefix` is used.
+    :returns       : Path of CSV-converted file.
+    :rtype         : ``str``
+    :raises OSError: If an error occurs while executing the `tshark` command.
+    '''
+    logger = logging.getLogger('SPOT.INGEST.DNS.PROCESS')
+
+    with tempfile.NamedTemporaryFile(prefix=prefix, dir=tmpdir, delete=False) as fp:
+        command = COMMAND.format(pcap, opts, fp.name)
+
+        logger.debug('Execute command: {0}'.format(command))
+        Util.popen(command, raises=True)
+
+        return fp.name
+
+def prepare(csvfile, max_req_size):
+    '''
+        Prepare text-formatted data for transmission through the Kafka cluster.
+
+        This method takes a CSV file and groups it into segments, according to the
+    pattern '%Y%m%d%h'. If the size of each segment is greater than the maximum size
+    of a request, then divides each segment into smaller ones so that they can be
+    transmitted.
+
+    :param csvfile     : Path of CSV-converted file; result of `convert` method.
+    :param max_req_size: The maximum size of a request.
+    :returns           : A generator which yields the timestamp (in milliseconds) and a
+                         list of lines from the CSV-converted file.
+    :rtype             : :class:`types.GeneratorType`
+    :raises IOError    : If the given file has no any valid line.
+    '''
+    msg_list  = []
+    msg_size  = segmentid = 0
+    logger    = logging.getLogger('SPOT.INGEST.DNS.PROCESS')
+    partition = timestamp = None
+    pattern   = re.compile('[0-9]{10}.[0-9]{9}')
+
+    with open(csvfile, 'r') as fp:
+        for line in fp:
+            value = line.strip()
+            if not value: continue
+
+            match = pattern.search(value.split(',')[2])
+            if not match: continue
+
+            size  = sys.getsizeof(value)
+            # .........................assume the first 15 characters of the line, as
+            # the `partition` - and not the result of `search` - e.g. 'Sep  8, 2017 11'
+            if value[:15] == partition and (msg_size + size) < max_req_size:
+                msg_list.append(value)
+                msg_size += size
+                continue
+
+            # .........................if the hour is different or the message size is
+            # above the maximum, then yield existing list and continue with an empty one
+            if timestamp:
+                logger.debug('Yield segment-{0}: {1} lines, {2} bytes'.format(segmentid,
+                    len(msg_list), msg_size))
+                segmentid += 1
+
+                yield (int(timestamp.total_seconds() * 1000), msg_list)
+
+            msg_list  = [value]
+            msg_size  = size
+            partition = value[:15]
+            timestamp = datetime.fromtimestamp(float(match.group())) - EPOCH
+
+    # .................................send the last lines from the file. The check of
+    # `timestamp` is in case the file is empty and `timestamp` is still ``None``.
+    if not timestamp:
+        raise IOError('CSV-converted file has no valid lines.')
+
+    logger.debug('Yield segment-{0}: {1} lines, {2} bytes'.format(segmentid,
+        len(msg_list), msg_size))
+
+    yield (int(timestamp.total_seconds() * 1000), msg_list)
diff --git a/spot-ingest/pipelines/dns/streaming.py b/spot-ingest/pipelines/dns/streaming.py
new file mode 100644
index 0000000..d6b7164
--- /dev/null
+++ b/spot-ingest/pipelines/dns/streaming.py
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+'''
+    Methods to be used during the streaming process.
+'''
+
+import datetime
+
+
+class StreamPipeline:
+    '''
+        Create an input stream that pulls pcap messages from Kafka.
+
+    :param ssc     : :class:`pyspark.streaming.context.StreamingContext` object.
+    :param zkQuorum: Zookeeper quorum (host[:port],...).
+    :param groupId : The group id for this consumer.
+    :param topics  : Dictionary of topic -> numOfPartitions to consume. Each partition
+                     is consumed in its own thread.
+    '''
+
+    def __init__(self, ssc, zkQuorum, groupId, topics):
+        from common.serializer       import deserialize
+        from pyspark.streaming.kafka import KafkaUtils
+
+        self.__dstream = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics,
+                            keyDecoder=lambda x: x, valueDecoder=deserialize)
+
+    @property
+    def dstream(self):
+        '''
+            Return the schema of this :class:`DataFrame` as a
+        :class:`pyspark.sql.types.StructType`.
+        '''
+        return self.__dstream\
+            .map(lambda x: x[1])\
+            .flatMap(lambda x: x)\
+            .map(lambda x: x.split(','))
+
+    @property
+    def schema(self):
+        '''
+            Return the data type that represents a row from the received data list.
+        '''
+        from pyspark.sql.types import (IntegerType, LongType, ShortType, StringType,
+                                        StructField, StructType)
+
+        return StructType(
+            [
+                StructField('frame_time', StringType(), True),
+                StructField('unix_tstamp', LongType(), True),
+                StructField('frame_len', IntegerType(), True),
+                StructField('ip_dst', StringType(), True),
+                StructField('ip_src', StringType(), True),
+                StructField('dns_qry_name', StringType(), True),
+                StructField('dns_qry_class', StringType(), True),
+                StructField('dns_qry_type', IntegerType(), True),
+                StructField('dns_qry_rcode', IntegerType(), True),
+                StructField('dns_a', StringType(), True),
+                StructField('y', ShortType(), True),
+                StructField('m', ShortType(), True),
+                StructField('d', ShortType(), True),
+                StructField('h', ShortType(), True)
+            ]
+        )
+
+    @property
+    def segtype(self):
+        '''
+            Return the type of the received segments.
+        '''
+        return 'pcap segments'
+
+    @staticmethod
+    def parse(fields):
+        '''
+            Parsing and normalization of data in preparation for import.
+
+        :param fields: Column fields of a row.
+        :returns     : A list of typecast-ed fields, according to the table's schema.
+        :rtype       : ``list``
+        '''
+        dt = datetime.datetime.fromtimestamp(float(fields[2]))
+
+        return [
+            '{0}, {1}'.format(fields[0], fields[1]),
+            long(float(fields[2])),
+            int(fields[3]),
+            fields[5],
+            fields[4],
+            fields[6],
+            fields[8],
+            int(fields[7]),
+            int(fields[9]),
+            fields[10],
+            dt.year,
+            dt.month,
+            dt.day,
+            dt.hour
+        ]
diff --git a/spot-ingest/pipelines/flow/processing.py b/spot-ingest/pipelines/flow/processing.py
index 26986e6..a4455fe 100644
--- a/spot-ingest/pipelines/flow/processing.py
+++ b/spot-ingest/pipelines/flow/processing.py
@@ -49,8 +49,8 @@
     with tempfile.NamedTemporaryFile(prefix=prefix, dir=tmpdir, delete=False) as fp:
         command = COMMAND.format(netflow, opts, fp.name)
 
-        Util.popen(command, raises=True)
         logger.debug('Execute command: {0}'.format(command))
+        Util.popen(command, raises=True)
 
         return fp.name
 
diff --git a/spot-ingest/pipelines/flow/streaming.py b/spot-ingest/pipelines/flow/streaming.py
index 187f828..18c94d9 100644
--- a/spot-ingest/pipelines/flow/streaming.py
+++ b/spot-ingest/pipelines/flow/streaming.py
@@ -101,7 +101,7 @@
         '''
             Return the type of the received segments.
         '''
-        return 'netflow segment'
+        return 'netflow segments'
 
     @staticmethod
     def parse(fields):
@@ -115,7 +115,7 @@
         unix_tstamp = datetime.datetime.strptime(fields[0], '%Y-%m-%d %H:%M:%S')\
                         .strftime('%s')
         return [
-            str(fields[0]),
+            fields[0],
             long(unix_tstamp),
             int(fields[1]),
             int(fields[2]),
@@ -124,12 +124,12 @@
             int(fields[5]),
             int(fields[6]),
             float(fields[7]),
-            str(fields[8]),
-            str(fields[9]),
+            fields[8],
+            fields[9],
             int(fields[10]),
             int(fields[11]),
-            str(fields[12]),
-            str(fields[13]),
+            fields[12],
+            fields[13],
             int(fields[14]),
             int(fields[15]),
             long(fields[16]),
@@ -142,7 +142,7 @@
             int(fields[23]),
             int(fields[24]),
             int(fields[25]),
-            str(fields[26]),
+            fields[26],
             int(fields[1]),
             int(fields[2]),
             int(fields[3]),
diff --git a/spot-ingest/pipelines/proxy/__init__.py b/spot-ingest/pipelines/proxy/__init__.py
index ecb1860..313d9ab 100644
--- a/spot-ingest/pipelines/proxy/__init__.py
+++ b/spot-ingest/pipelines/proxy/__init__.py
@@ -13,4 +13,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
\ No newline at end of file
+#
+
+from processing import convert, prepare
+from streaming  import StreamPipeline
+
+__all__ = ['convert', 'process', 'StreamPipeline']
diff --git a/spot-ingest/pipelines/proxy/processing.py b/spot-ingest/pipelines/proxy/processing.py
new file mode 100644
index 0000000..5b17ca7
--- /dev/null
+++ b/spot-ingest/pipelines/proxy/processing.py
@@ -0,0 +1,116 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+'''
+    Methods that will be used to process and prepare dns data, before being sent to
+Kafka cluster.
+'''
+
+import logging
+import re
+import sys
+import tempfile
+
+from common.utils import Util
+from datetime     import datetime
+
+COMMAND = 'cp {0} {1} {2}'
+EPOCH = datetime(1970, 1, 1)
+
+def convert(logfile, tmpdir, opts='', prefix=None):
+    '''
+        Copy log file to the local staging area.
+
+    :param logfile: Path of log file.
+    :param tmpdir : Path of local staging area.
+    :param opts   : A set of options for the `cp` command.
+    :param prefix : If `prefix` is specified, the file name will begin with that;
+                     otherwise, a default `prefix` is used.
+    :returns      : Path of log file in local staging area.
+    :rtype        : ``str``
+    '''
+    logger = logging.getLogger('SPOT.INGEST.PROXY.PROCESS')
+
+    with tempfile.NamedTemporaryFile(prefix=prefix, dir=tmpdir, delete=False) as fp:
+        command = COMMAND.format(opts, logfile, fp.name)
+
+        logger.debug('Execute command: {0}'.format(command))
+        Util.popen(command, raises=True)
+
+        return fp.name
+
+def prepare(logfile, max_req_size):
+    '''
+        Prepare text-formatted data for transmission through the Kafka cluster.
+
+        This method takes a log file and groups it into segments, according to the
+    pattern '%Y%m%d%h'. If the size of each segment is greater than the maximum size
+    of a request, then divides each segment into smaller ones so that they can be
+    transmitted.
+
+    :param logfile     : Path of log file; result of `convert` method.
+    :param max_req_size: The maximum size of a request.
+    :returns           : A generator which yields the timestamp (in milliseconds) and a
+                         list of lines from the log file.
+    :rtype             : :class:`types.GeneratorType`
+    :raises IOError    : If the given file has no any valid line.
+    '''
+    msg_list  = []
+    msg_size  = segmentid = 0
+    logger    = logging.getLogger('SPOT.INGEST.PROXY.PROCESS')
+    partition = timestamp = None
+    pattern   = re.compile('[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}')
+
+    with open(logfile, 'r') as fp:
+        for line in fp:
+            value = line.strip()
+            if not value: continue
+
+            match = pattern.search(' '.join(value.split()[:2]))
+            if not match: continue
+
+            size  = sys.getsizeof(value)
+            # .........................assume the first 13 characters of the `search`
+            # result as the `partition`, e.g. '2018-03-20 09'
+            if match.group()[:13] == partition and (msg_size + size) < max_req_size:
+                msg_list.append(value)
+                msg_size += size
+                continue
+
+            # .........................if the hour is different or the message size is
+            # above the maximum, then yield existing list and continue with an empty one
+            if timestamp:
+                logger.debug('Yield segment-{0}: {1} lines, {2} bytes'.format(segmentid,
+                    len(msg_list), msg_size))
+                segmentid += 1
+
+                yield (int(timestamp.total_seconds() * 1000), msg_list)
+
+            msg_list  = [value]
+            msg_size  = size
+            partition = match.group()[:13]
+            timestamp = datetime.strptime(match.group(), '%Y-%m-%d %H:%M:%S') - EPOCH
+
+    # .................................send the last lines from the file. The check of
+    # `timestamp` is in case the file is empty and `timestamp` is still ``None``
+    if not timestamp:
+        raise IOError('Text-converted file has no valid lines.')
+
+    logger.debug('Yield segment-{0}: {1} lines, {2} bytes'.format(segmentid,
+        len(msg_list), msg_size))
+
+    yield (int(timestamp.total_seconds() * 1000), msg_list)
diff --git a/spot-ingest/pipelines/proxy/streaming.py b/spot-ingest/pipelines/proxy/streaming.py
new file mode 100644
index 0000000..003f64a
--- /dev/null
+++ b/spot-ingest/pipelines/proxy/streaming.py
@@ -0,0 +1,170 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+'''
+    Methods to be used during the streaming process.
+'''
+
+import datetime
+import shlex
+
+def _analyzer(line):
+    '''
+        A lexical analyzer for simple shell-like syntaxes. Split given line into fields.
+
+    :param line: Line to split.
+    :returs    : List of fields.
+    :rtype     : ``list``
+    '''
+    lex                  = shlex.shlex(line)
+    lex.quotes           = '"'
+    lex.whitespace_split = True
+    lex.commenters       = ''
+
+    return list(lex)
+
+
+class StreamPipeline:
+    '''
+        Create an input stream that pulls proxy log messages from Kafka.
+
+    :param ssc     : :class:`pyspark.streaming.context.StreamingContext` object.
+    :param zkQuorum: Zookeeper quorum (host[:port],...)
+    :param groupId : The group id for this consumer.
+    :param topics  : Dictionary of topic -> numOfPartitions to consume. Each partition
+                     is consumed in its own thread.
+    '''
+
+    def __init__(self, ssc, zkQuorum, groupId, topics):
+        from common.serializer       import deserialize
+        from pyspark.streaming.kafka import KafkaUtils
+
+        self.__dstream = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics,
+                            keyDecoder=lambda x: x, valueDecoder=deserialize)
+
+    @property
+    def dstream(self):
+        '''
+            Return the schema of this :class:`DataFrame` as a
+        :class:`pyspark.sql.types.StructType`.
+        '''
+        return self.__dstream\
+            .map(lambda x: x[1])\
+            .flatMap(lambda x: x)\
+            .map(lambda x: _analyzer(x))
+
+    @property
+    def schema(self):
+        '''
+            Return the data type that represents a row from the received data list.
+        '''
+        from pyspark.sql.types import IntegerType, StringType, StructField, StructType
+
+        return StructType(
+            [
+                StructField('p_date', StringType(), True),
+                StructField('p_time', StringType(), True),
+                StructField('clientip', StringType(), True),
+                StructField('host', StringType(), True),
+                StructField('reqmethod', StringType(), True),
+                StructField('useragent', StringType(), True),
+                StructField('resconttype', StringType(), True),
+                StructField('duration', IntegerType(), True),
+                StructField('username', StringType(), True),
+                StructField('authgroup', StringType(), True),
+                StructField('exceptionid', StringType(), True),
+                StructField('filterresult', StringType(), True),
+                StructField('webcat', StringType(), True),
+                StructField('referer', StringType(), True),
+                StructField('respcode', StringType(), True),
+                StructField('action', StringType(), True),
+                StructField('urischeme', StringType(), True),
+                StructField('uriport', StringType(), True),
+                StructField('uripath', StringType(), True),
+                StructField('uriquery', StringType(), True),
+                StructField('uriextension', StringType(), True),
+                StructField('serverip', StringType(), True),
+                StructField('scbytes', IntegerType(), True),
+                StructField('csbytes', IntegerType(), True),
+                StructField('virusid', StringType(), True),
+                StructField('bcappname', StringType(), True),
+                StructField('bcappoper', StringType(), True),
+                StructField('fulluri', StringType(), True),
+                StructField('y', StringType(), True),
+                StructField('m', StringType(), True),
+                StructField('d', StringType(), True),
+                StructField('h', StringType(), True)
+            ]
+        )
+
+    @property
+    def segtype(self):
+        '''
+            Return the type of the received segments.
+        '''
+        return 'proxy-log segments'
+
+    @staticmethod
+    def parse(fields):
+        '''
+            Parsing and normalization of data in preparation for import.
+
+        :param fields: Column fields of a row.
+        :returns     : A list of typecast-ed fields, according to the table's schema.
+        :rtype       : ``list``
+        '''
+        if len(fields) <= 1: return []
+
+        dt       = datetime.datetime.strptime('{0} {1}'.format(fields[0], fields[1]),
+                    '%Y-%m-%d %H:%M:%S')
+        uripath  = fields[17] if len(fields[17]) > 1 else ''
+        uriquery = fields[18] if len(fields[18]) > 1 else ''
+
+        return [
+            fields[0],
+            fields[1],
+            fields[3],
+            fields[15],
+            fields[12],
+            fields[20],
+            fields[13],
+            int(fields[2]),
+            fields[4],
+            fields[5],
+            fields[6],
+            fields[7],
+            fields[8],
+            fields[9],
+            fields[10],
+            fields[11],
+            fields[14],
+            fields[16],
+            fields[17],
+            fields[18],
+            fields[19],
+            fields[21],
+            int(fields[22]),
+            int(fields[23]),
+            fields[24],
+            fields[25],
+            fields[26],
+            '{0}{1}{2}'.format(fields[15], uripath, uriquery),
+            str(dt.year),
+            str(dt.month).zfill(2),
+            str(dt.day).zfill(2),
+            str(dt.hour).zfill(2)
+        ]
diff --git a/spot-ingest/start_listener.py b/spot-ingest/start_listener.py
index 7bfb00b..a584434 100644
--- a/spot-ingest/start_listener.py
+++ b/spot-ingest/start_listener.py
@@ -60,6 +60,8 @@
     if kwargs['group_id'] is not None:
         spark_job += ' -g {0}'.format(kwargs.pop('group_id'))
 
+    spark_job += ' -l {0}'.format(kwargs.pop('log_level'))
+
     if kwargs['app_name'] is not None:
         spark_job += ' -n {0}'.format(kwargs.pop('app_name'))
 
@@ -150,6 +152,11 @@
         help='name of the consumer group to join for dynamic partition assignment',
         metavar='')
 
+    parser.add_argument('-l', '--log-level',
+        default='INFO',
+        help='determine the level of the logger',
+        metavar='')
+
     parser.add_argument('-m', '--master',
         default='yarn',
         help='spark://host:port, mesos://host:port, yarn, or local',