Code refactoring: configurable listener log level, collector config-defaults migration, and minor cleanups
diff --git a/spot-ingest/README.md b/spot-ingest/README.md
index 372f12a..68262ae 100644
--- a/spot-ingest/README.md
+++ b/spot-ingest/README.md
@@ -159,8 +159,10 @@
Furthermore, the list of the supported files must be given as regular expressions.
For example, to support a filename like `nfcapd.20171103140000`, you have to set:<br />
+
"supported_files" :["nfcapd.*"],
<br />or<br />
+
"supported_files": ["nfcapd.[0-9]{14}"],
@@ -227,6 +229,7 @@
cluster ("cluster")
-g , --group-id name of the consumer group to join for dynamic
partition assignment
+ -l , --log-level logging level of the logger
-m , --master spark://host:port, mesos://host:port, yarn, or local
-n , --app-name name of the Spark Job to display on the cluster web UI
-r , --redirect-spark-logs
@@ -242,7 +245,7 @@
END
-By default, it loads `ingest_conf.json` file, but using `-c , --config-file` option you can ovveride it and use another.
+By default, it loads the `ingest_conf.json` file, but you can override it with another one using the `-c , --config-file` option.
Streaming Listener uses `spark-streaming` parameters from the configuration file:
diff --git a/spot-ingest/collector.py b/spot-ingest/collector.py
index 897a50d..e1f34e0 100644
--- a/spot-ingest/collector.py
+++ b/spot-ingest/collector.py
@@ -150,15 +150,25 @@
.format(conf['kafka']['kafka_server'], conf['kafka']['kafka_port'])]
}
- if 'max_request_size' in conf['kafka'].keys():
- conf['producer']['max_request_size'] = conf['kafka']['max_request_size']
-
conf['file_watcher'] = {
'path': conf['pipelines'][args.type]['collector_path'],
'supported_files': conf['pipelines'][args.type]['supported_files'],
'recursive': True
}
+ # Migrate configuration: apply defaults for optional settings that older
+ # configuration files may omit.
+ if 'local_staging' not in conf['pipelines'][args.type]:
+     conf['pipelines'][args.type]['local_staging'] = '/tmp'
+
+ if 'max_request_size' in conf['kafka']:
+     conf['producer']['max_request_size'] = conf['kafka']['max_request_size']
+
+ if 'process_opt' not in conf['pipelines'][args.type]:
+     conf['pipelines'][args.type]['process_opt'] = ''
+
+ if 'recursive' in conf['pipelines'][args.type]:
+     conf['file_watcher']['recursive'] = conf['pipelines'][args.type]['recursive']
+
collector = cls(args.type, args.topic, args.skip_conversion, **conf)
collector.start()
@@ -202,7 +212,7 @@
logger.info('Processing raw file "{0}"...'.format(filename))
proc_dir = os.path.join(tmpdir, proc_name)
- status = True
+ allpassed = True
try:
module = getattr(pipelines, datatype)
@@ -221,20 +231,25 @@
logger.info('Group lines of text-converted file and prepare to publish them.')
for segmentid, datatuple in enumerate(partitioner):
- status &= send_async(segmentid, *datatuple)
+ try:
+     send_async(segmentid, *datatuple)
+ except RuntimeError:
+     allpassed = False
os.remove(staging)
logger.info('Remove CSV-converted file "{0}" from local staging area.'
.format(staging))
+ except IOError as ioe:
+ logger.warning(str(ioe))
+ return
+
except Exception as exc:
- logger.error('[{0}] {1}'.format(exc.__class__.__name__, str(exc).strip()))
+ logger.error('[{0}] {1}'.format(exc.__class__.__name__, exc.message))
return
finally:
if producer: producer.close()
- if status:
+ if allpassed:
logger.info('All segments of "{0}" published successfully to Kafka cluster.'
.format(filename))
return
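
With the migration block added above, older configuration files that omit `local_staging`, `process_opt` or `recursive` keep working. A hedged sketch of how a pipeline entry effectively looks once those defaults have been applied (the path and pattern values are illustrative, not project defaults):

    conf = {
        'pipelines': {
            'flow': {
                'collector_path': '/collector/flow',      # hypothetical path
                'supported_files': ['nfcapd.[0-9]{14}'],
                'local_staging': '/tmp',                  # default applied by the migration block
                'process_opt': '',                        # default applied by the migration block
                'recursive': True                         # forwarded to conf['file_watcher']
            }
        }
    }
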
diff --git a/spot-ingest/common/listener.py b/spot-ingest/common/listener.py
index b70275c..a67a99e 100644
--- a/spot-ingest/common/listener.py
+++ b/spot-ingest/common/listener.py
@@ -35,7 +35,7 @@
'''
Initialize the Spark job.
'''
- Util.get_logger('SPOT.INGEST')
+ Util.get_logger('SPOT.INGEST', kwargs.pop('log_level'))
logger = logging.getLogger('SPOT.INGEST.COMMON.LISTENER')
logger.info('Initializing Spark Streaming Listener...')
@@ -99,6 +99,11 @@
help='name of the consumer group to join for dynamic partition assignment',
metavar='')
+ parser.add_argument('-l', '--log-level',
+ default='INFO',
+ help='logging level of the logger',
+ metavar='')
+
parser.add_argument('-n', '--app-name',
help='name of the Spark Job to display on the cluster web UI',
metavar='')
diff --git a/spot-ingest/common/producer.py b/spot-ingest/common/producer.py
index f92d42c..784b18d 100644
--- a/spot-ingest/common/producer.py
+++ b/spot-ingest/common/producer.py
@@ -71,7 +71,7 @@
try:
rawbytes = serialize(value)
- if not rawbytes: return
+ if not rawbytes: raise RuntimeError('value could not be serialized')
_future = self.send(topic, rawbytes.getvalue(), key, partition, timestamp_ms)
meta = _future.get(timeout=10)
diff --git a/spot-ingest/common/utils.py b/spot-ingest/common/utils.py
index dbfb9bf..ca443e0 100644
--- a/spot-ingest/common/utils.py
+++ b/spot-ingest/common/utils.py
@@ -32,7 +32,7 @@
class Util(object):
'''
-
+ Utility methods.
'''
@classmethod
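
The listener above now passes the value of the new `-l` option into `Util.get_logger`, while this hunk only fills in the class docstring. The snippet below is not the project's implementation, just a minimal sketch of a helper with that call signature, assuming the second argument is a standard logging level name:

    import logging
    import sys

    class Util(object):
        '''
        Utility methods (sketch only; not the actual spot-ingest class).
        '''

        @classmethod
        def get_logger(cls, name, level='INFO'):
            '''Configure and return a named logger at the given level.'''
            logger = logging.getLogger(name)
            logger.setLevel(getattr(logging, level.upper(), logging.INFO))
            if not logger.handlers:
                handler = logging.StreamHandler(sys.stdout)
                handler.setFormatter(logging.Formatter(
                    '%(asctime)s %(name)s %(levelname)s: %(message)s'))
                logger.addHandler(handler)
            return logger
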
diff --git a/spot-ingest/pipelines/dns/processing.py b/spot-ingest/pipelines/dns/processing.py
index d2cf840..17abaea 100644
--- a/spot-ingest/pipelines/dns/processing.py
+++ b/spot-ingest/pipelines/dns/processing.py
@@ -65,8 +65,8 @@
:param csvfile : Path of CSV-converted file; result of `convert` method.
:param max_req_size: The maximum size of a request.
- :returns : A generator which yields a pair of: (timestamp; in milliseconds,
- list of lines from the CSV-converted file)
+ :returns : A generator which yields the timestamp (in milliseconds) and a
+ list of lines from the CSV-converted file.
:rtype : :class:`types.GeneratorType`
:raises IOError : If the given file has no any valid line.
'''
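
The collector (see the hunk above) consumes this generator with `enumerate` and unpacks each yielded pair into `send_async`. A self-contained consumption sketch; both functions below are stand-ins, not the project's code:

    def partitioner():
        # Stand-in generator: yields (timestamp in milliseconds, list of CSV lines).
        yield (1509717600000, ['line-1', 'line-2'])
        yield (1509717660000, ['line-3'])

    def send_async(segmentid, tstamp, lines):
        # Stand-in for the collector's wrapper around the Kafka producer.
        print('segment {0}: {1} lines at {2}'.format(segmentid, len(lines), tstamp))

    for segmentid, datatuple in enumerate(partitioner()):
        send_async(segmentid, *datatuple)
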
diff --git a/spot-ingest/pipelines/flow/processing.py b/spot-ingest/pipelines/flow/processing.py
index 26986e6..a4455fe 100644
--- a/spot-ingest/pipelines/flow/processing.py
+++ b/spot-ingest/pipelines/flow/processing.py
@@ -49,8 +49,8 @@
with tempfile.NamedTemporaryFile(prefix=prefix, dir=tmpdir, delete=False) as fp:
command = COMMAND.format(netflow, opts, fp.name)
- Util.popen(command, raises=True)
logger.debug('Execute command: {0}'.format(command))
+ Util.popen(command, raises=True)
return fp.name
diff --git a/spot-ingest/pipelines/flow/streaming.py b/spot-ingest/pipelines/flow/streaming.py
index 187f828..18c94d9 100644
--- a/spot-ingest/pipelines/flow/streaming.py
+++ b/spot-ingest/pipelines/flow/streaming.py
@@ -101,7 +101,7 @@
'''
Return the type of the received segments.
'''
- return 'netflow segment'
+ return 'netflow segments'
@staticmethod
def parse(fields):
@@ -115,7 +115,7 @@
unix_tstamp = datetime.datetime.strptime(fields[0], '%Y-%m-%d %H:%M:%S')\
.strftime('%s')
return [
- str(fields[0]),
+ fields[0],
long(unix_tstamp),
int(fields[1]),
int(fields[2]),
@@ -124,12 +124,12 @@
int(fields[5]),
int(fields[6]),
float(fields[7]),
- str(fields[8]),
- str(fields[9]),
+ fields[8],
+ fields[9],
int(fields[10]),
int(fields[11]),
- str(fields[12]),
- str(fields[13]),
+ fields[12],
+ fields[13],
int(fields[14]),
int(fields[15]),
long(fields[16]),
@@ -142,7 +142,7 @@
int(fields[23]),
int(fields[24]),
int(fields[25]),
- str(fields[26]),
+ fields[26],
int(fields[1]),
int(fields[2]),
int(fields[3]),
diff --git a/spot-ingest/start_listener.py b/spot-ingest/start_listener.py
index 7bfb00b..a584434 100644
--- a/spot-ingest/start_listener.py
+++ b/spot-ingest/start_listener.py
@@ -60,6 +60,8 @@
if kwargs['group_id'] is not None:
spark_job += ' -g {0}'.format(kwargs.pop('group_id'))
+ spark_job += ' -l {0}'.format(kwargs.pop('log_level'))
+
if kwargs['app_name'] is not None:
spark_job += ' -n {0}'.format(kwargs.pop('app_name'))
@@ -150,6 +152,11 @@
help='name of the consumer group to join for dynamic partition assignment',
metavar='')
+ parser.add_argument('-l', '--log-level',
+ default='INFO',
+ help='logging level of the logger',
+ metavar='')
+
parser.add_argument('-m', '--master',
default='yarn',
help='spark://host:port, mesos://host:port, yarn, or local',