#!/usr/bin/python
"""
Copyright 2014 Cisco Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import sys
import os
import re
import csv
import json
import multiprocessing
import logging

logging.basicConfig(level=logging.INFO)
def is_field_excluded(fieldname=None):
    """
    Checks whether a field name is in the list of names to exclude, or matches
    one of the exclusion regexes. Modify the lists below to suit your own needs.

    :param fieldname: A string representing a field name
    :return: True if the field should be excluded, False otherwise
    """
    # Exact field names to exclude
    excluded_fields = [
        'Audit_auditUpdatedDate',
        #'domainName'
    ]
    if fieldname in excluded_fields:
        return True
    # Regexes to match for exclusion
    excluded_regexes = [
        ('_rawText$', re.IGNORECASE),
    ]
    for pattern, flags in excluded_regexes:
        if re.search(pattern, fieldname, flags):
            return True
    return False
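
# Illustrative results with the defaults above ('registrant_rawText' is just a
# hypothetical column name used for the example):
#   is_field_excluded('Audit_auditUpdatedDate')  -> True   (exact-name match)
#   is_field_excluded('registrant_rawText')      -> True   (matches the _rawText$ regex)
#   is_field_excluded('domainName')              -> False  (its entry above is commented out)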
def process_csv(in_filename, out_filename):
    """
    Processes a CSV file of WHOIS data and converts each row to a JSON object,
    skipping any fields excluded by is_field_excluded() (by default
    Audit_auditUpdatedDate and anything matching _rawText). If out_filename is
    None, the file is only analyzed for per-row field count mismatches.

    :param in_filename: Input CSV filename with full path
    :param out_filename: Output JSON filename with full path, or None to analyze only
    :return: None
    """
    out_fh = None
    if out_filename:
        out_fh = open(out_filename, 'wb')
        logging.debug('%s: Converting %s to %s' % (multiprocessing.current_process().name, in_filename, out_filename))
    else:
        logging.debug('%s: Analyzing %s' % (multiprocessing.current_process().name, in_filename))
    with open(in_filename, 'rb') as f:
        reader = csv.DictReader(f, delimiter=',', quotechar='"')
        line_num = 0
        try:
            for row in reader:
                line_num += 1
                try:
                    if out_filename:
                        # JSON conversion and output; copy only the fields that are not excluded
                        new_row = {}
                        for field in reader.fieldnames:
                            if not is_field_excluded(field):
                                new_row[field] = row.get(field)
                        json.dump(new_row, out_fh)
                        out_fh.write('\n')
                    else:
                        # analysis only: check that the header and CSV row field counts match
                        if len(row) != len(reader.fieldnames):
                            raise Exception('Field count mismatch: row: %s / fields: %s' % (len(row), len(reader.fieldnames)))
                except Exception as e:
                    logging.warn("Error with file %s, line %s: %s" % (in_filename, line_num, e))
            if not out_filename:
                logging.info('Analyzed %s: OK' % in_filename)
        except Exception as e:
            logging.warn(e)
    if out_fh:
        out_fh.close()
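
# Sketch of the conversion for a single row (hypothetical WHOIS columns; real
# headers vary by data source):
#   CSV header: domainName,registrant_name,registrant_rawText,Audit_auditUpdatedDate
#   CSV row:    example.com,John Doe,"...",2014-01-01
#   JSON line:  {"domainName": "example.com", "registrant_name": "John Doe"}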
##-------------------------------------------------------------------------
def process_files(source_dir, output_dir, max_processes=10, overwrite=False):
    """
    Fills a multiprocessing.Pool() with input and output files to be processed by process_csv().
    Files are added by walking source_dir and adding any file with a .csv extension. Output is
    placed into a single directory, and output filenames are prefixed with the name of the
    immediate parent directory, so a file named source_dir/com/1.csv becomes output_dir/com_1.json.

    :param source_dir: Source directory of CSV files
    :param output_dir: Output directory for resultant JSON files, or None to analyze only
    :param max_processes: Maximum number of worker processes to run
    :param overwrite: Overwrite existing files in the output directory
    :return: None
    """
logging.info("Processing Whois files from %s" % source_dir)
if output_dir and not os.path.exists(output_dir):
logging.debug("Creating output directory %s" % output_dir)
os.makedirs(output_dir)
logging.info("Starting %s pool workers" % max_processes)
if sys.version.startswith('2.6'):
        # maxtasksperchild is not available in Python 2.6
pool = multiprocessing.Pool(processes=max_processes)
else:
pool = multiprocessing.Pool(processes=max_processes, maxtasksperchild=4)
filecount = 0
    for dirname, dirnames, filenames in os.walk(source_dir):
        for filename in filenames:
            if filename.endswith('.csv'):
                if output_dir:
                    # output files go to output_dir and are named using the last subdirectory of dirname
                    out_filename = os.path.splitext(filename)[0] + '.json'
                    out_filename = os.path.join(output_dir, '%s_%s' % (os.path.split(dirname)[-1], out_filename))
                    # if the file does not exist, or if overwrite is true, add the file to the pool
                    if not os.path.isfile(out_filename) or overwrite:
                        pool.apply_async(process_csv, args=(os.path.join(dirname, filename), out_filename))
                        filecount += 1
                    else:
                        logging.info("Skipping %s, %s exists and overwrite is false" % (filename, out_filename))
                else:
                    # no output_dir, so we just analyze the files
                    pool.apply_async(process_csv, args=(os.path.join(dirname, filename), None))
                    filecount += 1
try:
pool.close()
logging.info("Starting activities on %s CSV files" % filecount)
pool.join()
except KeyboardInterrupt:
logging.info("Aborting")
pool.terminate()
logging.info("Completed")
##-------------------------------------------------------------------------
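# Example invocations (script name and paths are hypothetical):
#   python whois_csv_to_json.py -s /data/whois/csv -o /data/whois/json -p 8
#   python whois_csv_to_json.py -s /data/whois/csv -a    # analyze only, no JSON output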
if __name__ == "__main__":
max_cpu = multiprocessing.cpu_count()
from optparse import OptionParser
parser = OptionParser()
parser.add_option('-s', '--source', dest='source_dir', action='store',
help='Source directory to walk for CSV files')
parser.add_option('-o', '--output', dest='out_dir', action='store',
help='Output directory for JSON files')
parser.add_option('-O', '--overwrite', dest='overwrite', action='store_true',
help='Overwrite existing files in output directory')
parser.add_option('-p', '--processes', dest='max_processes', action='store', default=max_cpu, type='int',
help='Max number of processes to spawn')
parser.add_option('-a', '--analyze', dest='analyze', action='store_true',
help='Analyze CSV files for validity, no file output')
parser.add_option('-d', '--debug', dest='debug', action='store_true',
help='Enable debug messages')
(options, args) = parser.parse_args()
if not options.source_dir:
logging.error("Source directory required")
sys.exit(-1)
    if options.analyze:
        out_dir = None
    elif not options.out_dir:
        logging.error("Output directory or analysis option required")
        sys.exit(-1)
    else:
        out_dir = options.out_dir
if options.max_processes > max_cpu:
logging.warn('Max Processes (%s) is greater than available Processors (%s)' % (options.max_processes, max_cpu))
    if options.debug:
        # enable debug-level logging and multiprocessing debugging; basicConfig()
        # was already called at import time, so adjust the root logger directly
        logging.getLogger().setLevel(logging.DEBUG)
        multiprocessing.log_to_stderr(logging.DEBUG)
    process_files(options.source_dir, out_dir, options.max_processes, options.overwrite)