#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import sys
import subprocess
import fnmatch
import re
import pandas as pd
import datetime
from utilities import util
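
# Positional command-line arguments (see the execution example below):
#   1. old_oa_path       - path to the old spot-oa installation to migrate from
#   2. staging_db        - name of the temporary staging database
#   3. hdfs_staging_path - HDFS path backing the staging tables
#   4. dest_db           - destination database for the migrated data
#   5. impala_daemon     - hostname of the Impala daemon to connect to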
old_oa_path = sys.argv[1]
staging_db = sys.argv[2]
hdfs_staging_path = sys.argv[3]
dest_db = sys.argv[4]
impala_daemon = sys.argv[5]
# Execution example:
#./migrate_old_flow_data.py '/home/spotuser/incubator-spot_old/spot-oa' 'spot_migration' '/user/spotuser/spot_migration/' 'migrated' 'node01'
def main():
    log = util.get_logger('SPOT.MIGRATE.FLOW')

    cur_path = os.path.dirname(os.path.realpath(__file__))
    new_spot_path = os.path.split(os.path.split(cur_path)[0])[0]
    new_oa_path = '{0}/spot-oa'.format(new_spot_path)
    log.info('New Spot OA path: {0}'.format(new_oa_path))
    old_spot_path = os.path.split(old_oa_path)[0]

    log.info("Creating HDFS paths for Impala tables")
    util.create_hdfs_folder('{0}/flow/scores'.format(hdfs_staging_path),log)
    util.create_hdfs_folder('{0}/flow/chords'.format(hdfs_staging_path),log)
    util.create_hdfs_folder('{0}/flow/edge'.format(hdfs_staging_path),log)
    util.create_hdfs_folder('{0}/flow/summary'.format(hdfs_staging_path),log)
    util.create_hdfs_folder('{0}/flow/storyboard'.format(hdfs_staging_path),log)
    util.create_hdfs_folder('{0}/flow/threat_investigation'.format(hdfs_staging_path),log)
    util.create_hdfs_folder('{0}/flow/timeline'.format(hdfs_staging_path),log)
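    # Give the impala user full access to the staging path so Impala can read and write the staging tables.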
    util.execute_cmd('hdfs dfs -setfacl -R -m user:impala:rwx {0}'.format(hdfs_staging_path),log)

    log.info("Creating Staging tables in Impala")
    util.execute_cmd('impala-shell -i {0} --var=hpath={1} --var=dbname={2} -c -f create_flow_migration_tables.hql'.format(impala_daemon, hdfs_staging_path, staging_db),log)

    ## Flow Ingest Summary
    log.info('Processing Flow Ingest Summary')
    ing_sum_path = '{0}/data/flow/ingest_summary/'.format(old_oa_path)
    pattern = 'is_??????.csv'
    staging_table_name = 'flow_ingest_summary_tmp'
    dest_table_name = 'flow_ingest_summary'
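    # Each is_*.csv ingest summary file is split into one temporary CSV per day found in its
    # date column, loaded into the staging table, and inserted into the y/m/d-partitioned
    # destination table.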
    if os.path.exists(ing_sum_path):
        for file in fnmatch.filter(os.listdir(ing_sum_path), pattern):
            log.info('Processing file: {0}'.format(file))
            filepath = '{0}{1}'.format(ing_sum_path, file)
            df = pd.read_csv(filepath)
            s = df.iloc[:,0]
            l_dates = list(s.unique())
            l_dates = map(lambda x: x[0:10].strip(), l_dates)
            l_dates = filter(lambda x: re.match(r'\d{4}[-/]\d{2}[-/]\d{1}', x), l_dates)
            s_dates = set(l_dates)
            for date_str in s_dates:
                dt = datetime.datetime.strptime(date_str, '%Y-%m-%d')
                log.info('Processing day: {0} {1} {2} {3}'.format(date_str, dt.year, dt.month, dt.day))
                records = df[df['date'].str.contains(date_str)]
                filename = "ingest_summary_{0}{1}{2}.csv".format(dt.year, dt.month, dt.day)
                records.to_csv(filename, index=False)
                load_cmd = "LOAD DATA LOCAL INPATH '{0}' OVERWRITE INTO TABLE {1}.{2};".format(filename, staging_db, staging_table_name)
                util.execute_hive_cmd(load_cmd, log)
                insert_cmd = "INSERT INTO {0}.{1} PARTITION (y={2}, m={3}, d={4}) SELECT tdate, total FROM {5}.{6}".format(dest_db, dest_table_name, dt.year, dt.month, dt.day, staging_db, staging_table_name)
                util.execute_hive_cmd(insert_cmd, log)
                os.remove(filename)

    ## Iterating days
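    # Each day folder (named YYYYMMDD) under data/flow/ holds that day's OA output; its scores,
    # chords, edge, storyboard and timeline files are migrated into the matching y/m/d partitions.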
    days_path = '{0}/data/flow/'.format(old_oa_path)
    if os.path.exists(days_path):
        for day_folder in fnmatch.filter(os.listdir(days_path), '2*'):
            print(day_folder)
            dt = datetime.datetime.strptime(day_folder, '%Y%m%d')
            log.info('Processing day: {0} {1} {2} {3}'.format(day_folder, dt.year, dt.month, dt.day))
            full_day_path = '{0}{1}'.format(days_path,day_folder)

            ## Flow Scores and Flow Threat Investigation
            filename = '{0}/flow_scores.csv'.format(full_day_path)
            if os.path.isfile(filename):
                log.info("Processing Flow Scores")
                staging_table_name = 'flow_scores_tmp'
                dest_table_name = 'flow_scores'
                load_cmd = "LOAD DATA LOCAL INPATH '{0}' OVERWRITE INTO TABLE {1}.{2};".format(filename, staging_db, staging_table_name)
                util.execute_hive_cmd(load_cmd, log)
                insert_cmd = "INSERT INTO {0}.{1} PARTITION (y={2}, m={3}, d={4}) SELECT tstart,srcip,dstip,sport,dport,proto,ipkt,ibyt,opkt,obyt,score,rank,srcIpInternal,destIpInternal,srcGeo,dstGeo,srcDomain,dstDomain,srcIP_rep,dstIP_rep FROM {5}.{6}".format(dest_db, dest_table_name, dt.year, dt.month, dt.day, staging_db, staging_table_name)
                util.execute_hive_cmd(insert_cmd, log)
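
                # Threat-investigation rows come from the flow_scores staging table loaded above;
                # only records with a severity score (sev > 0) are migrated.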
log.info("Processing Flow Threat Investigation")
staging_table_name = 'flow_scores_tmp'
dest_table_name = 'flow_threat_investigation'
insert_cmd = "INSERT INTO {0}.{1} PARTITION (y={2}, m={3}, d={4}) SELECT tstart,srcip,dstip,sport,dport,sev FROM {5}.{6} WHERE sev > 0;".format(dest_db, dest_table_name, dt.year, dt.month, dt.day, staging_db, staging_table_name)
util.execute_hive_cmd(insert_cmd, log)

            ## Flow Chords
            log.info("Processing Flow Chords")
            staging_table_name = 'flow_chords_tmp'
            dest_table_name = 'flow_chords'
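            # Chord files are named chord-<ip>.tsv with dots in the IP replaced by underscores;
            # the IP recovered from the filename is stored as the first column of each migrated row.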
            for file in fnmatch.filter(os.listdir(full_day_path), 'chord*.tsv'):
                ip = re.findall(r"chord-(\S+).tsv", file)[0]
                ip = ip.replace('_', '.')
                log.info("Processing File: {0} with IP:{1}".format(file, ip))
                filename = '{0}/{1}'.format(full_day_path, file)
                load_cmd = "LOAD DATA LOCAL INPATH '{0}' OVERWRITE INTO TABLE {1}.{2};".format(filename, staging_db, staging_table_name)
                util.execute_hive_cmd(load_cmd, log)
                insert_cmd = "INSERT INTO {0}.{1} PARTITION (y={2}, m={3}, d={4}) SELECT '{5}', srcip, dstip, ibyt, ipkt FROM {6}.{7};".format(dest_db, dest_table_name, dt.year, dt.month, dt.day, ip, staging_db, staging_table_name)
                util.execute_hive_cmd(insert_cmd, log)

            ## Flow Edge
            log.info("Processing Flow Edge")
            staging_table_name = 'flow_edge_tmp'
            dest_table_name = 'flow_edge'
            pattern = 'edge*.tsv'
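            # The third and fourth dash-separated tokens of each edge-*.tsv filename are the hour
            # and minute of the edge query; both values are written into the destination rows.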
            edge_files = fnmatch.filter(os.listdir(full_day_path), pattern)
            for file in edge_files:
                parts = (re.findall(r"edge-(\S+).tsv", file)[0]).split('-')
                hh = int(parts[2])
                mn = int(parts[3])
                log.info("Processing File: {0} with HH: {1} and MN: {2}".format(file, hh, mn))
                filename = '{0}/{1}'.format(full_day_path, file)
                load_cmd = "LOAD DATA LOCAL INPATH '{0}' OVERWRITE INTO TABLE {1}.{2};".format(filename, staging_db, staging_table_name)
                util.execute_hive_cmd(load_cmd, log)
                insert_cmd = "INSERT INTO {0}.{1} PARTITION (y={2}, m={3}, d={4}) SELECT tstart, srcip, dstip, sport, dport, proto, flags, tos, ibyt, ipkt, input, output, rip, obyt, opkt, {5}, {6} FROM {7}.{8} WHERE srcip is not NULL;".format(dest_db, dest_table_name, dt.year, dt.month, dt.day, hh, mn, staging_db, staging_table_name)
                util.execute_hive_cmd(insert_cmd, log)

            ## Flow Storyboard
            log.info("Processing Flow Storyboard")
            staging_table_name = 'flow_storyboard_tmp'
            dest_table_name = 'flow_storyboard'
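            # Storyboard entries live in a per-day threats.csv holding the threat IP, title and
            # description text.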
            filename = '{0}/threats.csv'.format(full_day_path)
            if os.path.isfile(filename):
                load_cmd = "LOAD DATA LOCAL INPATH '{0}' OVERWRITE INTO TABLE {1}.{2};".format(filename, staging_db, staging_table_name)
                util.execute_hive_cmd(load_cmd, log)
                insert_cmd = "INSERT INTO {0}.{1} PARTITION (y={2}, m={3}, d={4}) SELECT ip_threat, title, text FROM {5}.{6};".format(dest_db, dest_table_name, dt.year, dt.month, dt.day, staging_db, staging_table_name)
                util.execute_hive_cmd(insert_cmd, log)

            ## Flow Timeline
            log.info("Processing Flow Timeline")
            staging_table_name = 'flow_timeline_tmp'
            dest_table_name = 'flow_timeline'
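            # Timeline files are named sbdet-<ip>.tsv; the IP taken from the filename is stored as
            # the first column of each migrated row.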
            for file in fnmatch.filter(os.listdir(full_day_path), 'sbdet*.tsv'):
                ip = re.findall(r"sbdet-(\S+).tsv", file)[0]
                log.info("Processing File: {0} with IP:{1}".format(file, ip))
                filename = '{0}/{1}'.format(full_day_path, file)
                load_cmd = "LOAD DATA LOCAL INPATH '{0}' OVERWRITE INTO TABLE {1}.{2};".format(filename, staging_db, staging_table_name)
                util.execute_hive_cmd(load_cmd, log)
                insert_cmd = "INSERT INTO {0}.{1} PARTITION (y={2}, m={3}, d={4}) SELECT '{5}', tstart, tend, srcip, dstip, proto, sport, dport, ipkt, ibyt FROM {6}.{7};".format(dest_db, dest_table_name, dt.year, dt.month, dt.day, ip, staging_db, staging_table_name)
                util.execute_hive_cmd(insert_cmd, log)
log.info("Dropping staging tables")
util.execute_cmd('impala-shell -i {0} --var=dbname={1} -c -f drop_flow_migration_tables.hql'.format(impala_daemon, staging_db),log)
log.info("Removing staging tables' path in HDFS")
util.execute_cmd('hadoop fs -rm -r {0}/flow/'.format(hdfs_staging_path),log)
log.info("Moving CSV data to backup folder")
util.execute_cmd('mkdir {0}/data/backup/'.format(old_oa_path),log)
util.execute_cmd('mv {0}/data/flow/ {0}/data/backup/'.format(old_oa_path),log)
log.info("Invalidating metadata in Impala to refresh tables content")
util.execute_cmd('impala-shell -i {0} -q "INVALIDATE METADATA;"'.format(impala_daemon),log)
log.info("Creating ipynb template structure and copying advanced mode and threat investigation ipynb templates for each pre-existing day in the new Spot location")
ipynb_pipeline_path = '{0}/ipynb/flow/'.format(old_oa_path)
if os.path.exists(ipynb_pipeline_path):
for folder in os.listdir(ipynb_pipeline_path):
log.info("Creating ipynb flow folders in new Spot locaiton: {0}".format(folder))
util.execute_cmd('mkdir -p {0}/ipynb/flow/{1}/'.format(new_oa_path, folder),log)
log.info("Copying advanced mode ipynb template")
util.execute_cmd('cp {0}/oa/flow/ipynb_templates/Advanced_Mode_master.ipynb {0}/ipynb/flow/{1}/Advanced_Mode.ipynb'.format(new_oa_path, folder),log)
log.info("Copying threat investigation ipynb template")
util.execute_cmd('cp {0}/oa/flow/ipynb_templates/Threat_Investigation_master.ipynb {0}/ipynb/flow/{1}/Threat_Investigation.ipynb'.format(new_oa_path, folder),log)


if __name__ == '__main__':
    main()