blob: d47673311e40eff5a881a10c96d5a848c5cdc31b [file] [log] [blame]
"""
This script adds support for ingesting Bluecoat log files
into Apache Spot.
"""
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
import re
import shlex
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import HiveContext
from pyspark.sql.types import *
# Pattern for a YYYY-MM-DD date. Written as a raw string so that ``\d`` reaches
# the regex engine as-is instead of being an invalid string escape sequence.
rex_date = re.compile(r"\d{4}-\d{2}-\d{2}")
# Schema applied to parsed proxy records before they are written to Hive.
# Every column is nullable; `duration`, `scbytes` and `csbytes` are integers,
# all other columns are strings. The trailing `y`, `m`, `d`, `h` columns are
# the ones used for partitioning when the data frame is saved.
proxy_schema = StructType([
    StructField(column_name, column_type(), True)
    for column_name, column_type in (
        ("p_date", StringType),
        ("p_time", StringType),
        ("clientip", StringType),
        ("host", StringType),
        ("reqmethod", StringType),
        ("useragent", StringType),
        ("resconttype", StringType),
        ("duration", IntegerType),
        ("username", StringType),
        ("authgroup", StringType),
        ("exceptionid", StringType),
        ("filterresult", StringType),
        ("webcat", StringType),
        ("referer", StringType),
        ("respcode", StringType),
        ("action", StringType),
        ("urischeme", StringType),
        ("uriport", StringType),
        ("uripath", StringType),
        ("uriquery", StringType),
        ("uriextension", StringType),
        ("serverip", StringType),
        ("scbytes", IntegerType),
        ("csbytes", IntegerType),
        ("virusid", StringType),
        ("bcappname", StringType),
        ("bcappoper", StringType),
        ("fulluri", StringType),
        ("y", StringType),
        ("m", StringType),
        ("d", StringType),
        ("h", StringType),
    )
])
def main():
    """
    Handle commandline arguments and start the collector.

    All options are required. The numeric options (number of workers and
    batch size) are validated as integers while parsing, so a malformed value
    is reported as a usage error before any Spark context is created.
    """
    # input Parameters
    parser = argparse.ArgumentParser(description="Bluecoat Parser")
    parser.add_argument('-zk', '--zookeeper', dest='zk', required=True,
                        help='Zookeeper IP and port (i.e. 10.0.0.1:2181)', metavar='')
    parser.add_argument('-t', '--topic', dest='topic', required=True,
                        help='Topic to listen for Spark Streaming', metavar='')
    parser.add_argument('-db', '--database', dest='db', required=True,
                        help='Hive database where the data will be ingested', metavar='')
    parser.add_argument('-dt', '--db-table', dest='db_table', required=True,
                        help='Hive table where the data will be ingested', metavar='')
    parser.add_argument('-w', '--num_of_workers', dest='num_of_workers', required=True,
                        type=int,
                        help='Num of workers for Parallelism in Data Processing', metavar='')
    # The value is handed unchanged to StreamingContext as its batch duration,
    # which PySpark interprets in seconds (not milliseconds).
    parser.add_argument('-bs', '--batch-size', dest='batch_size', required=True,
                        type=int,
                        help='Batch Size (Seconds)', metavar='')
    args = parser.parse_args()

    # start the collector with the parsed settings.
    bluecoat_parse(args.zk, args.topic, args.db, args.db_table,
                   args.num_of_workers, args.batch_size)
def spot_decoder(s):
    """
    Pass-through decoder: hand the input back untouched.

    No decoding is performed; ``None`` stays ``None`` and any other
    value is returned as the very same object.

    :param s: input to decode
    :returns: s
    """
    return s
def split_log_entry(line):
    """
    Break one log line into its whitespace-separated fields.

    Text enclosed in double quotes is kept together as a single field
    (the surrounding quotes stay part of the field). Comment handling is
    switched off, so characters such as ``#`` are ordinary field content.

    :param line: line to split
    :returns: list
    """
    tokenizer = shlex.shlex(line)
    # No comment characters: '#' must not truncate the record.
    tokenizer.commenters = ''
    # Split on whitespace only, not on punctuation.
    tokenizer.whitespace_split = True
    # Only the double quote groups text; apostrophes are plain characters.
    tokenizer.quotes = '"'
    return list(tokenizer)
def proxy_parser(proxy_fields):
    """
    Parse and normalize data.

    Re-orders the raw log fields into the column order expected by the
    proxy schema, converts the three numeric fields (indexes 2, 22 and 23)
    to ``int``, and appends the full URI plus the year, month, day and hour
    taken from the date/time fields.

    :param proxy_fields: list with fields from log
    :returns: list with 32 values, or an empty list when the record has
              fewer than the 27 fields that are read
    :raises ValueError: if one of the numeric fields is not an integer
    """
    # The highest index read below is 26; anything shorter cannot be parsed
    # and is reported the same way as an empty record.
    if len(proxy_fields) < 27:
        return []

    # create full URI: host followed by path and query. Path/query values of
    # at most one character are left out of the full URI.
    proxy_uri_path = proxy_fields[17] if len(proxy_fields[17]) > 1 else ""
    proxy_uri_qry = proxy_fields[18] if len(proxy_fields[18]) > 1 else ""
    full_uri = "{0}{1}{2}".format(proxy_fields[15], proxy_uri_path, proxy_uri_qry)

    # partition values, zero-padded to two digits.
    date_parts = proxy_fields[0].split('-')
    year = date_parts[0]
    month = date_parts[1].zfill(2)
    day = date_parts[2].zfill(2)
    hour = proxy_fields[1].split(":")[0].zfill(2)

    # re-order fields.
    return [proxy_fields[0], proxy_fields[1], proxy_fields[3],
            proxy_fields[15], proxy_fields[12], proxy_fields[20],
            proxy_fields[13], int(proxy_fields[2]), proxy_fields[4],
            proxy_fields[5], proxy_fields[6], proxy_fields[7],
            proxy_fields[8], proxy_fields[9], proxy_fields[10],
            proxy_fields[11], proxy_fields[14], proxy_fields[16],
            proxy_fields[17], proxy_fields[18], proxy_fields[19],
            proxy_fields[21], int(proxy_fields[22]), int(proxy_fields[23]),
            proxy_fields[24], proxy_fields[25], proxy_fields[26],
            full_uri, year, month, day, hour]
def save_data(rdd, sqc, db, db_table, topic):
    """
    Append the records of one batch to a partitioned Hive table.

    When the batch holds no records nothing is written; a "listening"
    banner naming the topic is printed instead.

    :param rdd: collection of objects (Resilient Distributed Dataset) to store
    :param sqc: Apache Hive context
    :param db: Apache Hive database to save into
    :param db_table: table of `db` to save into
    :param topic: Apache Kafka topic name, only used in the banner printed
                  for an empty `rdd`
    """
    if rdd.isEmpty():
        print("------------------------LISTENING KAFKA TOPIC:{0}------------------------".format(topic))
        return

    frame = sqc.createDataFrame(rdd, proxy_schema)

    # Allow the partitions (y/m/d/h) to be derived from the data itself.
    sqc.setConf("hive.exec.dynamic.partition", "true")
    sqc.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

    target_table = "{0}.{1}".format(db, db_table)
    writer = frame.write.format("parquet").mode("append")
    writer = writer.partitionBy('y', 'm', 'd', 'h')
    writer.insertInto(target_table)
def bluecoat_parse(zk, topic, db, db_table, num_of_workers, batch_size):
    """
    Parse and save bluecoat logs.

    Builds a Spark streaming job that reads messages from Kafka, splits them
    into log lines, keeps the lines that start with a date, parses each one
    and hands every batch to ``save_data``. The call blocks until the
    streaming context terminates.

    :param zk: Apache ZooKeeper quorum
    :param topic: Apache Kafka topic (also used as the application name and
                  as the Kafka consumer group id)
    :param db: Apache Hive database to save into
    :param db_table: table of `db` to save into
    :param num_of_workers: value (converted to int) given for `topic` in the
                           topic map passed to the Kafka stream
    :param batch_size: batch duration (converted to int) for the Apache Spark
                       streaming context
    """
    app_name = topic
    wrks = int(num_of_workers)

    # create spark context
    sc = SparkContext(appName=app_name)
    ssc = StreamingContext(sc, int(batch_size))
    sqc = HiveContext(sc)

    tp_stream = KafkaUtils.createStream(ssc, zk, app_name, {topic: wrks},
                                        keyDecoder=spot_decoder,
                                        valueDecoder=spot_decoder)

    # Each Kafka record is a (key, value) pair; only the value carries log text.
    log_lines = tp_stream.map(lambda row: row[1]).flatMap(lambda row: row.split("\n"))

    # Keep only lines that begin with a date (drops headers and blank lines).
    dated_lines = log_lines.filter(lambda row: rex_date.match(row))

    # NOTE(review): the final replace(" ", " ") is a no-op as written; it may
    # have been meant to collapse double spaces -- confirm against history.
    clean_lines = dated_lines.map(
        lambda row: row.strip("\n").strip("\r").replace("\t", " ").replace(" ", " "))

    parsed_rows = clean_lines.map(split_log_entry).map(proxy_parser)

    # proxy_parser signals an unusable record with an empty list; such rows
    # cannot be fitted to the table schema, so drop them before saving.
    proxy_data = parsed_rows.filter(lambda row: len(row) > 0)

    proxy_data.foreachRDD(lambda rdd: save_data(rdd, sqc, db, db_table, topic))

    ssc.start()
    ssc.awaitTermination()
# Start the collector only when the file is executed as a script.
if __name__ == "__main__":
    main()