/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spot.netflow.model
import org.apache.log4j.Logger
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
import org.apache.spot.lda.SpotLDAWrapperSchema._
import org.apache.spot.lda.{SpotLDAHelper, SpotLDAInput, SpotLDAResult, SpotLDAWrapper}
import org.apache.spot.netflow.FlowSchema._
import org.apache.spot.netflow.FlowWordCreator
import org.apache.spot.utilities.FloatPointPrecisionUtility
import org.apache.spot.utilities.data.validation.InvalidDataHandler
/**
* A probabilistic model of the netflow traffic observed in a network.
*
 * The model uses a topic-modelling approach in which:
 * 1. Each netflow record is simplified into two words, one at the source IP and another (possibly different)
 * at the destination IP.
 * 2. The words observed at each IP are treated as a document describing that IP's traffic.
 * 3. A topic-modelling routine infers a collection of "topics" that represent common profiles
 * of network traffic. These "topics" are probability distributions on words.
 * 4. Each IP is assigned a mix of topics corresponding to its behavior.
 * 5. The probability of a word appearing in the traffic at an IP is estimated by simplifying the netflow record
 * into a word and then combining the per-topic word probabilities using the topic mix of that IP, as made
 * explicit below.
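 * In symbols: Prob[word | ip] = sum over topics t of Prob[word | t] * Prob[t | ip].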
*
* Create these models using the factory in the companion object.
*
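 * A minimal scoring sketch (hypothetical names; assumes a trained `model`, a SparkSession `spark`,
 * netflow records `flowRecords`, and a precision utility `precisionUtility`):
 * {{{
 *   val scored = model.score(spark, flowRecords, precisionUtility)
 * }}}
 *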
* @param topicCount Number of topics (profiles of common traffic patterns) used in the topic modelling routine.
* @param ipToTopicMix DataFrame assigning a distribution on topics to each document or IP.
 * @param wordToPerTopicProb Map assigning to each word its per-topic probabilities,
 * i.e. Prob[word | t] for t = 0 to topicCount - 1.
*/
class FlowSuspiciousConnectsModel(topicCount: Int,
ipToTopicMix: DataFrame,
wordToPerTopicProb: Map[String, Array[Double]]) {
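  /**
    * Score netflow records against this model by joining in the topic mixes of the source and
    * destination IPs and applying the scoring UDF to each record.
    *
    * @param sparkSession Spark session.
    * @param flowRecords Netflow records to score; expected to contain the fields of the model schema.
    * @param precisionUtility Floating point precision utility used for the topic-mix values.
    * @return The input records with an appended score column.
    */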
  def score(sparkSession: SparkSession,
            flowRecords: DataFrame,
            precisionUtility: FloatPointPrecisionUtility): DataFrame = {
val wordToPerTopicProbBC = sparkSession.sparkContext.broadcast(wordToPerTopicProb)
    /** A left outer join (below) keeps all rows of the left DF. Rows for which the join expression is not
      * satisfied (for any entry in the right DF) are filled with 'null' values in the added columns.
      */
val dataWithSrcTopicMix = {
val recordsWithSrcIPTopicMixes = flowRecords.join(ipToTopicMix,
flowRecords(SourceIP) === ipToTopicMix(DocumentName), "left_outer")
val schemaWithSrcTopicMix = flowRecords.schema.fieldNames :+ TopicProbabilityMix
val dataWithSrcIpProb: DataFrame = recordsWithSrcIPTopicMixes.selectExpr(schemaWithSrcTopicMix: _*)
.withColumnRenamed(TopicProbabilityMix, SrcIpTopicMix)
val recordsWithIPTopicMixes = dataWithSrcIpProb.join(ipToTopicMix,
dataWithSrcIpProb(DestinationIP) === ipToTopicMix(DocumentName), "left_outer")
val schema = dataWithSrcIpProb.schema.fieldNames :+ TopicProbabilityMix
recordsWithIPTopicMixes.selectExpr(schema: _*).withColumnRenamed(TopicProbabilityMix, DstIpTopicMix)
}
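    // Score each record using the per-word topic probabilities (broadcast above) and both endpoint topic mixes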
val scoreFunction = new FlowScoreFunction(topicCount, wordToPerTopicProbBC)
val scoringUDF = udf((hour: Int,
srcIP: String,
dstIP: String,
srcPort: Int,
dstPort: Int,
protocol: String,
ibyt: Long,
ipkt: Long,
srcIpTopicMix: Seq[precisionUtility.TargetType],
dstIpTopicMix: Seq[precisionUtility.TargetType]) =>
scoreFunction.score(precisionUtility)(hour,
srcIP,
dstIP,
srcPort,
dstPort,
protocol,
ibyt,
ipkt,
srcIpTopicMix,
dstIpTopicMix))
dataWithSrcTopicMix.withColumn(Score,
scoringUDF(FlowSuspiciousConnectsModel.ModelColumns :+ col(SrcIpTopicMix) :+ col(DstIpTopicMix): _*))
}
}
/**
* Contains DataFrame schema information as well as the train-from-dataframe routine
* (which is a kind of factory routine) for [[FlowSuspiciousConnectsModel]] instances.
*
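 * A minimal training sketch (hypothetical names; assumes a SuspiciousConnectsConfig `config`, a log4j
 * `logger`, and netflow records `inputRecords`):
 * {{{
 *   val model = FlowSuspiciousConnectsModel.trainModel(spark, logger, config, inputRecords)
 *   val scored = model.score(spark, inputRecords, config.precisionUtility)
 * }}}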
*/
object FlowSuspiciousConnectsModel {
val ModelSchema = StructType(List(HourField,
SourceIPField,
DestinationIPField,
SourcePortField,
DestinationPortField,
ProtocolField,
IbytField,
IpktField))
val ModelColumns: List[Column] = ModelSchema.fieldNames.toList.map(col)
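  /**
    * Train a [[FlowSuspiciousConnectsModel]]: simplify records into words, count words per IP,
    * and fit an LDA topic model to the resulting corpus.
    *
    * @param sparkSession Spark session.
    * @param logger Application logger.
    * @param config Suspicious connects job configuration (LDA parameters, feedback file, precision utility).
    * @param inputRecords Netflow records to train on.
    * @return Trained [[FlowSuspiciousConnectsModel]].
    */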
def trainModel(sparkSession: SparkSession,
logger: Logger,
config: SuspiciousConnectsConfig,
inputRecords: DataFrame): FlowSuspiciousConnectsModel = {
logger.info("Training netflow suspicious connects model from " + config.inputPath)
val selectedRecords = inputRecords.select(ModelColumns: _*)
val totalRecords = selectedRecords.union(FlowFeedback.loadFeedbackDF(sparkSession,
config.feedbackFile,
config.duplicationFactor))
import sparkSession.implicits._
    // Simplify netflow log entries into "words"
val dataWithWords = totalRecords.withColumn(SourceWord, FlowWordCreator.srcWordUDF(ModelColumns: _*))
.withColumn(DestinationWord, FlowWordCreator.dstWordUDF(ModelColumns: _*))
// Aggregate per-word counts at each IP
val srcWordCounts = dataWithWords
.filter(dataWithWords(SourceWord).notEqual(InvalidDataHandler.WordError))
.select(SourceIP, SourceWord)
.map({ case Row(sourceIp: String, sourceWord: String) => (sourceIp, sourceWord) -> 1 })
.rdd
.reduceByKey(_ + _)
val dstWordCounts = dataWithWords
.filter(dataWithWords(DestinationWord).notEqual(InvalidDataHandler.WordError))
.select(DestinationIP, DestinationWord)
.map({ case Row(destinationIp: String, destinationWord: String) => (destinationIp, destinationWord) -> 1 })
.rdd
.reduceByKey(_ + _)
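    // Combine source and destination counts into a single corpus of ((IP, word), count) entries for LDA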
val ipWordCounts =
sparkSession.sparkContext.union(srcWordCounts, dstWordCounts)
.reduceByKey(_ + _)
.map({ case ((ip, word), count) => SpotLDAInput(ip, word, count) })
val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(ipWordCounts, config.precisionUtility, sparkSession)
val model = SpotLDAWrapper.run(config.topicCount,
logger,
config.ldaPRGSeed,
config.ldaAlpha,
config.ldaBeta,
config.ldaOptimizer,
config.ldaMaxiterations,
spotLDAHelper)
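    // Recover the per-IP topic mixes and per-word topic probabilities from the fitted model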
val results: SpotLDAResult = model.predict(spotLDAHelper)
new FlowSuspiciousConnectsModel(config.topicCount, results.documentToTopicMix, results.wordToTopicMix)
}
}