code refactoring
diff --git a/spot-ingest/README.md b/spot-ingest/README.md
index 372f12a..68262ae 100644
--- a/spot-ingest/README.md
+++ b/spot-ingest/README.md
@@ -159,8 +159,10 @@
 
 Furthermore, the list of the supported files must be given as regular expressions.
 For example, to support a filename like `nfcapd.20171103140000`, you have to set:<br />
+
     "supported_files" :["nfcapd.*"],
 <br />or<br />
+
     "supported_files": ["nfcapd.[0-9]{14}"],
 
 
@@ -227,6 +229,7 @@
                             cluster ("cluster")
       -g , --group-id       name of the consumer group to join for dynamic
                             partition assignment
+      -l , --log-level      set the logging level
       -m , --master         spark://host:port, mesos://host:port, yarn, or local
       -n , --app-name       name of the Spark Job to display on the cluster web UI
       -r , --redirect-spark-logs 
@@ -242,7 +245,7 @@
 
     END
 
-By default, it loads `ingest_conf.json` file, but using `-c , --config-file` option you can ovveride it and use another.
+By default, it loads the `ingest_conf.json` file, but you can override it with another file using the `-c , --config-file` option.
 
 Streaming Listener uses `spark-streaming` parameters from the configuration file:
 
diff --git a/spot-ingest/collector.py b/spot-ingest/collector.py
index 897a50d..e1f34e0 100644
--- a/spot-ingest/collector.py
+++ b/spot-ingest/collector.py
@@ -150,15 +150,25 @@
                     .format(conf['kafka']['kafka_server'], conf['kafka']['kafka_port'])]
             }
 
-            if 'max_request_size' in conf['kafka'].keys():
-                conf['producer']['max_request_size'] = conf['kafka']['max_request_size']
-
             conf['file_watcher'] = {
                 'path': conf['pipelines'][args.type]['collector_path'],
                 'supported_files': conf['pipelines'][args.type]['supported_files'],
                 'recursive': True
             }
 
+            # migrate configs: apply defaults and optional overrides
+            if 'local_staging' not in conf['pipelines'][args.type]:
+                conf['pipelines'][args.type]['local_staging'] = '/tmp'
+
+            if 'max_request_size' in conf['kafka']:
+                conf['producer']['max_request_size'] = conf['kafka']['max_request_size']
+
+            if 'process_opt' not in conf['pipelines'][args.type]:
+                conf['pipelines'][args.type]['process_opt'] = ''
+
+            if 'recursive' in conf['pipelines'][args.type]:
+                conf['file_watcher']['recursive'] = conf['pipelines'][args.type]['recursive']
+
             collector = cls(args.type, args.topic, args.skip_conversion, **conf)
             collector.start()
 
@@ -202,7 +212,7 @@
     logger.info('Processing raw file "{0}"...'.format(filename))
 
     proc_dir  = os.path.join(tmpdir, proc_name)
-    status    = True
+    allpassed = True
 
     try:
         module      = getattr(pipelines, datatype)
@@ -221,20 +231,27 @@
         logger.info('Group lines of text-converted file and prepare to publish them.')
 
         for segmentid, datatuple in enumerate(partitioner):
-            status &= send_async(segmentid, *datatuple)
+            try:
+                send_async(segmentid, *datatuple)
+            except RuntimeError:
+                allpassed = False
 
         os.remove(staging)
         logger.info('Remove CSV-converted file "{0}" from local staging area.'
             .format(staging))
 
+    except IOError as ioe:
+        logger.warning(ioe.message)
+        return
+
     except Exception as exc:
-        logger.error('[{0}] {1}'.format(exc.__class__.__name__, str(exc).strip()))
+        logger.error('[{0}] {1}'.format(exc.__class__.__name__, exc.message))
         return
 
     finally:
         if producer: producer.close()
 
-    if status:
+    if allpassed:
         logger.info('All segments of "{0}" published successfully to Kafka cluster.'
             .format(filename))
         return
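
The migration block added above backfills defaults for pipeline settings that older configuration files may not define. A standalone sketch of the same pattern; the key names follow the diff, while the helper function itself is hypothetical.

```python
# Sketch of the config-migration pattern: fill in defaults for optional
# pipeline settings so downstream code can rely on the keys being present.
def migrate_pipeline_conf(pipeline_conf):
    pipeline_conf.setdefault('local_staging', '/tmp')
    pipeline_conf.setdefault('process_opt', '')
    return pipeline_conf

conf = {'pipelines': {'flow': {'collector_path': '/collector/flow'}}}
migrate_pipeline_conf(conf['pipelines']['flow'])
assert conf['pipelines']['flow']['local_staging'] == '/tmp'
assert conf['pipelines']['flow']['process_opt'] == ''
```
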
diff --git a/spot-ingest/common/listener.py b/spot-ingest/common/listener.py
index b70275c..a67a99e 100644
--- a/spot-ingest/common/listener.py
+++ b/spot-ingest/common/listener.py
@@ -35,7 +35,7 @@
     '''
         Initialize the Spark job.
     '''
-    Util.get_logger('SPOT.INGEST')
+    Util.get_logger('SPOT.INGEST', kwargs.pop('log_level'))
 
     logger  = logging.getLogger('SPOT.INGEST.COMMON.LISTENER')
     logger.info('Initializing Spark Streaming Listener...')
@@ -99,6 +99,11 @@
         help='name of the consumer group to join for dynamic partition assignment',
         metavar='')
 
+    parser.add_argument('-l', '--log-level',
+        default='INFO',
+        help='set the logging level',
+        metavar='')
+
     parser.add_argument('-n', '--app-name',
         help='name of the Spark Job to display on the cluster web UI',
         metavar='')
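
The new `-l , --log-level` option is forwarded to `Util.get_logger` as a second argument. A hedged sketch of how a string level such as `INFO` or `DEBUG` can be applied to a Python logger; whether `Util.get_logger` resolves the level exactly this way is an assumption.

```python
import logging

# Illustrative only: resolve a string level name and apply it to a named logger.
def get_logger(name, level='INFO'):
    logging.basicConfig()
    logger = logging.getLogger(name)
    logger.setLevel(getattr(logging, level.upper(), logging.INFO))
    return logger

logger = get_logger('SPOT.INGEST', 'DEBUG')
logger.debug('Listener logging configured at DEBUG level.')
```
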
diff --git a/spot-ingest/common/producer.py b/spot-ingest/common/producer.py
index f92d42c..784b18d 100644
--- a/spot-ingest/common/producer.py
+++ b/spot-ingest/common/producer.py
@@ -71,7 +71,7 @@
 
         try:
             rawbytes = serialize(value)
-            if not rawbytes: return
+            if not rawbytes: raise RuntimeError('serialization of value failed')
 
             _future  = self.send(topic, rawbytes.getvalue(), key, partition, timestamp_ms)
             meta     = _future.get(timeout=10)
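
Raising `RuntimeError` when serialization yields nothing, instead of returning silently, lets callers account for every segment, as `collector.py` now does. A minimal sketch of that caller pattern; the `send_async` stub below is made up for illustration.

```python
# Hypothetical stand-in for send_async: fails when a payload cannot be used.
def send_async(segmentid, payload):
    if not payload:
        raise RuntimeError('segment {0} could not be serialized'.format(segmentid))

allpassed = True
for segmentid, payload in enumerate([b'record-1', b'', b'record-3']):
    try:
        send_async(segmentid, payload)
    except RuntimeError:
        allpassed = False   # keep publishing the remaining segments

print('all segments published' if allpassed else 'some segments failed')
```
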
diff --git a/spot-ingest/common/utils.py b/spot-ingest/common/utils.py
index dbfb9bf..ca443e0 100644
--- a/spot-ingest/common/utils.py
+++ b/spot-ingest/common/utils.py
@@ -32,7 +32,7 @@
 
 class Util(object):
     '''
-
+        Utility methods.
     '''
 
     @classmethod
diff --git a/spot-ingest/pipelines/dns/processing.py b/spot-ingest/pipelines/dns/processing.py
index d2cf840..17abaea 100644
--- a/spot-ingest/pipelines/dns/processing.py
+++ b/spot-ingest/pipelines/dns/processing.py
@@ -65,8 +65,8 @@
 
     :param csvfile     : Path of CSV-converted file; result of `convert` method.
     :param max_req_size: The maximum size of a request.
-    :returns           : A generator which yields a pair of: (timestamp; in milliseconds,
-                         list of lines from the CSV-converted file)
+    :returns           : A generator which yields the timestamp (in milliseconds) and a
+                         list of lines from the CSV-converted file.
     :rtype             : :class:`types.GeneratorType`
     :raises IOError    : If the given file has no any valid line.
     '''
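
The docstring above describes a generator that yields a timestamp in milliseconds together with a batch of lines from the CSV-converted file. A hedged sketch of consuming such a generator; `batch_lines` is a made-up stand-in, not the module's actual function.

```python
import time

# Made-up generator with the same shape of output: (timestamp_ms, [lines]).
def batch_lines(lines, batch_size=2):
    for i in range(0, len(lines), batch_size):
        yield int(time.time() * 1000), lines[i:i + batch_size]

for timestamp_ms, batch in batch_lines(['a,1', 'b,2', 'c,3']):
    print('{0} {1}'.format(timestamp_ms, batch))
```
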
diff --git a/spot-ingest/pipelines/flow/processing.py b/spot-ingest/pipelines/flow/processing.py
index 26986e6..a4455fe 100644
--- a/spot-ingest/pipelines/flow/processing.py
+++ b/spot-ingest/pipelines/flow/processing.py
@@ -49,8 +49,8 @@
     with tempfile.NamedTemporaryFile(prefix=prefix, dir=tmpdir, delete=False) as fp:
         command = COMMAND.format(netflow, opts, fp.name)
 
-        Util.popen(command, raises=True)
         logger.debug('Execute command: {0}'.format(command))
+        Util.popen(command, raises=True)
 
         return fp.name
 
diff --git a/spot-ingest/pipelines/flow/streaming.py b/spot-ingest/pipelines/flow/streaming.py
index 187f828..18c94d9 100644
--- a/spot-ingest/pipelines/flow/streaming.py
+++ b/spot-ingest/pipelines/flow/streaming.py
@@ -101,7 +101,7 @@
         '''
             Return the type of the received segments.
         '''
-        return 'netflow segment'
+        return 'netflow segments'
 
     @staticmethod
     def parse(fields):
@@ -115,7 +115,7 @@
         unix_tstamp = datetime.datetime.strptime(fields[0], '%Y-%m-%d %H:%M:%S')\
                         .strftime('%s')
         return [
-            str(fields[0]),
+            fields[0],
             long(unix_tstamp),
             int(fields[1]),
             int(fields[2]),
@@ -124,12 +124,12 @@
             int(fields[5]),
             int(fields[6]),
             float(fields[7]),
-            str(fields[8]),
-            str(fields[9]),
+            fields[8],
+            fields[9],
             int(fields[10]),
             int(fields[11]),
-            str(fields[12]),
-            str(fields[13]),
+            fields[12],
+            fields[13],
             int(fields[14]),
             int(fields[15]),
             long(fields[16]),
@@ -142,7 +142,7 @@
             int(fields[23]),
             int(fields[24]),
             int(fields[25]),
-            str(fields[26]),
+            fields[26],
             int(fields[1]),
             int(fields[2]),
             int(fields[3]),
diff --git a/spot-ingest/start_listener.py b/spot-ingest/start_listener.py
index 7bfb00b..a584434 100644
--- a/spot-ingest/start_listener.py
+++ b/spot-ingest/start_listener.py
@@ -60,6 +60,8 @@
     if kwargs['group_id'] is not None:
         spark_job += ' -g {0}'.format(kwargs.pop('group_id'))
 
+    spark_job += ' -l {0}'.format(kwargs.pop('log_level'))
+
     if kwargs['app_name'] is not None:
         spark_job += ' -n {0}'.format(kwargs.pop('app_name'))
 
@@ -150,6 +152,11 @@
         help='name of the consumer group to join for dynamic partition assignment',
         metavar='')
 
+    parser.add_argument('-l', '--log-level',
+        default='INFO',
+        help='set the logging level',
+        metavar='')
+
     parser.add_argument('-m', '--master',
         default='yarn',
         help='spark://host:port, mesos://host:port, yarn, or local',