#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
'''
Methods to be used during the streaming process.
'''
import datetime


class StreamPipeline:
    '''
    Create an input stream that pulls netflow messages from Kafka.

    :param ssc : :class:`pyspark.streaming.context.StreamingContext` object.
    :param zkQuorum : Zookeeper quorum (host[:port],...).
    :param groupId : The group id for this consumer.
    :param topics : Dictionary of topic -> numOfPartitions to consume. Each
        partition is consumed in its own thread.
    '''

    def __init__(self, ssc, zkQuorum, groupId, topics):
        from common.serializer import deserialize
        from pyspark.streaming.kafka import KafkaUtils

        # Keep the Kafka message key untouched and deserialize the binary
        # value back into the list of netflow lines it carries.
        self.__dstream = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics,
                                                 keyDecoder=lambda x: x,
                                                 valueDecoder=deserialize)

    @property
    def dstream(self):
        '''
        Return the :class:`pyspark.streaming.DStream` of parsed rows: each
        Kafka message value is flattened into individual netflow lines, and
        each line is split into its comma-separated fields.
        '''
        return self.__dstream\
            .map(lambda x: x[1])\
            .flatMap(lambda x: x)\
            .map(lambda x: x.split(','))

    @property
    def schema(self):
        '''
        Return the data type that represents a row from the received data list.
        '''
        from pyspark.sql.types import (FloatType, IntegerType, LongType,
                                       ShortType, StringType, StructField,
                                       StructType)

        return StructType(
            [
                StructField('treceived', StringType(), True),
                StructField('unix_tstamp', LongType(), True),
                StructField('tryear', IntegerType(), True),
                StructField('trmonth', IntegerType(), True),
                StructField('trday', IntegerType(), True),
                StructField('trhour', IntegerType(), True),
                StructField('trminute', IntegerType(), True),
                StructField('trsecond', IntegerType(), True),
                StructField('tdur', FloatType(), True),
                StructField('sip', StringType(), True),
                StructField('dip', StringType(), True),
                StructField('sport', IntegerType(), True),
                StructField('dport', IntegerType(), True),
                StructField('proto', StringType(), True),
                StructField('flag', StringType(), True),
                StructField('fwd', IntegerType(), True),
                StructField('stos', IntegerType(), True),
                StructField('ipkt', LongType(), True),
                StructField('ibyt', LongType(), True),
                StructField('opkt', LongType(), True),
                StructField('obyt', LongType(), True),
                StructField('input', IntegerType(), True),
                StructField('output', IntegerType(), True),
                StructField('sas', IntegerType(), True),
                StructField('das', IntegerType(), True),
                StructField('dtos', IntegerType(), True),
                StructField('dir', IntegerType(), True),
                StructField('rip', StringType(), True),
                StructField('y', ShortType(), True),
                StructField('m', ShortType(), True),
                StructField('d', ShortType(), True),
                StructField('h', ShortType(), True)
            ]
        )

    @property
    def segtype(self):
        '''
        Return the type of the received segments.
        '''
        return 'netflow segments'

    @staticmethod
    def parse(fields):
        '''
        Parse and normalize the data of a row in preparation for import.

        :param fields: Column fields of a row.
        :returns : A list of type-cast fields, according to the table's schema.
        :rtype : ``list``
        '''
        # Convert the 'treceived' timestamp into seconds since the Unix epoch.
        unix_tstamp = datetime.datetime.strptime(fields[0], '%Y-%m-%d %H:%M:%S')\
            .strftime('%s')
        return [
            fields[0],              # treceived
            long(unix_tstamp),      # unix_tstamp
            int(fields[1]),         # tryear
            int(fields[2]),         # trmonth
            int(fields[3]),         # trday
            int(fields[4]),         # trhour
            int(fields[5]),         # trminute
            int(fields[6]),         # trsecond
            float(fields[7]),       # tdur
            fields[8],              # sip
            fields[9],              # dip
            int(fields[10]),        # sport
            int(fields[11]),        # dport
            fields[12],             # proto
            fields[13],             # flag
            int(fields[14]),        # fwd
            int(fields[15]),        # stos
            long(fields[16]),       # ipkt
            long(fields[17]),       # ibyt
            long(fields[18]),       # opkt
            long(fields[19]),       # obyt
            int(fields[20]),        # input
            int(fields[21]),        # output
            int(fields[22]),        # sas
            int(fields[23]),        # das
            int(fields[24]),        # dtos
            int(fields[25]),        # dir
            fields[26],             # rip
            # The partition columns reuse the year/month/day/hour fields of
            # the 'treceived' timestamp.
            int(fields[1]),         # y
            int(fields[2]),         # m
            int(fields[3]),         # d
            int(fields[4]),         # h
        ]
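

# A minimal, hedged usage sketch (not part of the original module): one way
# this pipeline could be wired into a Spark Streaming job. The Zookeeper
# quorum, consumer group, topic name and batch interval below are
# illustrative placeholders, and the sketch assumes it runs inside the
# spot-ingest environment where the `common.serializer` package is available.
if __name__ == '__main__':
    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName='StreamPipelineSketch')
    ssc = StreamingContext(sc, batchDuration=30)

    pipeline = StreamPipeline(ssc, 'localhost:2181', 'spot-ingest-group',
                              {'SPOT-INGEST-TOPIC': 1})

    # Each element of `pipeline.dstream` is a list of CSV fields; `parse`
    # casts them to the types declared by `pipeline.schema`.
    rows = pipeline.dstream.map(StreamPipeline.parse)

    def show(rdd):
        # Build a DataFrame from the parsed rows using the declared schema.
        if not rdd.isEmpty():
            sql_context = SQLContext.getOrCreate(rdd.context)
            sql_context.createDataFrame(rdd, pipeline.schema).show(5)

    rows.foreachRDD(show)

    ssc.start()
    ssc.awaitTermination()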