/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spot.dns

import org.apache.log4j.{Level, LogManager}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
import org.apache.spot.dns.DNSSchema._
import org.apache.spot.dns.model.DNSSuspiciousConnectsModel
import org.apache.spot.testutils.TestingSparkContextFlatSpec
import org.apache.spot.utilities.FloatPointPrecisionUtility32
import org.scalatest.Matchers
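
/**
  * A single toy DNS record; the fields mirror the columns of the DNS input schema
  * (frame time, unix timestamp, frame length, client IP, query name, query class, query type, response code).
  */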
case class DNSInput(frame_time: String,
                    unix_tstamp: Long,
                    frame_len: Int,
                    ip_dst: String,
                    dns_qry_name: String,
                    dns_qry_class: String,
                    dns_qry_type: Int,
                    dns_qry_rcode: Int)

class DNSSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec with Matchers {

  // Configuration for the tests that train with the EM LDA optimizer.
  val emTestConfig = SuspiciousConnectsConfig(analysis = "dns",
    inputPath = "",
    feedbackFile = "",
    duplicationFactor = 1,
    topicCount = 20,
    hdfsScoredConnect = "",
    threshold = 1.0d,
    maxResults = 1000,
    outputDelimiter = "\t",
    ldaPRGSeed = None,
    ldaMaxiterations = 20,
    ldaAlpha = 1.02,
    ldaBeta = 1.001,
    ldaOptimizer = "em")

  // The same analysis configuration, but training with the online LDA optimizer
  // (more iterations, much smaller alpha/beta priors).
  val onlineTestConfig = SuspiciousConnectsConfig(analysis = "dns",
    inputPath = "",
    feedbackFile = "",
    duplicationFactor = 1,
    topicCount = 20,
    hdfsScoredConnect = "",
    threshold = 1.0d,
    maxResults = 1000,
    outputDelimiter = "\t",
    ldaPRGSeed = None,
    ldaMaxiterations = 200,
    ldaAlpha = 0.0009,
    ldaBeta = 0.00001,
    ldaOptimizer = "online")

  // Configuration for the tests that round-trip scores through 32-bit Float precision.
  val testConfigFloatConversion = SuspiciousConnectsConfig(analysis = "dns",
    inputPath = "",
    feedbackFile = "",
    duplicationFactor = 1,
    topicCount = 20,
    hdfsScoredConnect = "",
    threshold = 1.0d,
    maxResults = 1000,
    outputDelimiter = "\t",
    ldaPRGSeed = None,
    ldaMaxiterations = 20,
    ldaAlpha = 1.02,
    ldaBeta = 1.001,
    precisionUtility = FloatPointPrecisionUtility32)
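
  // Each scoring test below builds a five-record toy data set: one anomalous record plus four copies of a
  // typical record. The tests expect the model to assign a probability near 1/5 = 0.2 to the anomalous
  // record and near 4/5 = 0.8 to each typical record, and the assertions check those values within a
  // 0.01 tolerance.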
"dns suspicious connects analysis" should "estimate correct probabilities in toy data with framelength anomaly using" +
" EMLDAOptimizer" in {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)
val anomalousRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 1, "172.16.9.132", "122.2o7.turner.com", "0x00000001", 1, 0)
val typicalRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "122.2o7.turner.com", "0x00000001", 1, 0)
val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
val model = DNSSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, emTestConfig, data)
val scoredData = model.score(sparkContext, sqlContext, data, emTestConfig.userDomain, emTestConfig.precisionUtility)
val anomalyScore = scoredData.filter(scoredData(FrameLength) === 1).first().getAs[Double](Score)
val typicalScores = scoredData.filter(scoredData(FrameLength) === 168).collect().map(_.getAs[Double](Score))
Math.abs(anomalyScore - 0.2d) should be <= 0.01d
typicalScores.length shouldBe 4
Math.abs(typicalScores(0) - 0.8d) should be <= 0.01d
Math.abs(typicalScores(1) - 0.8d) should be <= 0.01d
Math.abs(typicalScores(2) - 0.8d) should be <= 0.01d
Math.abs(typicalScores(3) - 0.8d) should be <= 0.01d
}
it should "estimate correct probabilities in toy data with framelength anomaly using" +
" OnlineLDAOptimizer" in {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)
val anomalousRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 1, "172.16.9.132", "122.2o7.turner.com", "0x00000001", 1, 0)
val typicalRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "122.2o7.turner.com", "0x00000001", 1, 0)
val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
val model = DNSSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, onlineTestConfig, data)
val scoredData = model.score(sparkContext, sqlContext, data, onlineTestConfig.userDomain, onlineTestConfig.precisionUtility)
val anomalyScore = scoredData.filter(scoredData(FrameLength) === 1).first().getAs[Double](Score)
val typicalScores = scoredData.filter(scoredData(FrameLength) === 168).collect().map(_.getAs[Double](Score))
Math.abs(anomalyScore - 0.2d) should be <= 0.01d
typicalScores.length shouldBe 4
Math.abs(typicalScores(0) - 0.8d) should be <= 0.01d
Math.abs(typicalScores(1) - 0.8d) should be <= 0.01d
Math.abs(typicalScores(2) - 0.8d) should be <= 0.01d
Math.abs(typicalScores(3) - 0.8d) should be <= 0.01d
}
it should "estimate correct probabilities in toy data with subdomain length anomaly " +
"using EMLDAOptimizer" in {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)
val anomalousRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT",
1463735425L,
168,
"172.16.9.132",
"1111111111111111111111111111111111111111111111111111111111111.tinker.turner.com",
"0x00000001",
1,
0)
val typicalRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT",
1463735425L,
168,
"172.16.9.132",
"tinker.turner.com",
"0x00000001",
1,
0)
val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
val model = DNSSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, emTestConfig, data)
val scoredData = model.score(sparkContext, sqlContext, data, emTestConfig.userDomain, emTestConfig.precisionUtility)
val anomalyScore = scoredData.
filter(scoredData(QueryName) === "1111111111111111111111111111111111111111111111111111111111111.tinker.turner.com").
first().
getAs[Double](Score)
val typicalScores = scoredData.filter(scoredData(QueryName) === "tinker.turner.com").collect().map(_.getAs[Double](Score))
Math.abs(anomalyScore - 0.2d) should be <= 0.01d
typicalScores.length shouldBe 4
Math.abs(typicalScores(0) - 0.8d) should be <= 0.01d
Math.abs(typicalScores(1) - 0.8d) should be <= 0.01d
Math.abs(typicalScores(2) - 0.8d) should be <= 0.01d
Math.abs(typicalScores(3) - 0.8d) should be <= 0.01d
}
it should "estimate correct probabilities in toy data with subdomain length anomaly " +
"using OnlineLDAOptimizer" in {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)
val anomalousRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT",
1463735425L,
168,
"172.16.9.132",
"1111111111111111111111111111111111111111111111111111111111111.tinker.turner.com",
"0x00000001",
1,
0)
val typicalRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT",
1463735425L,
168,
"172.16.9.132",
"tinker.turner.com",
"0x00000001",
1,
0)
val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
val model = DNSSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, onlineTestConfig, data)
val scoredData = model.score(sparkContext, sqlContext, data, onlineTestConfig.userDomain, onlineTestConfig.precisionUtility)
val anomalyScore = scoredData.
filter(scoredData(QueryName) === "1111111111111111111111111111111111111111111111111111111111111.tinker.turner.com").
first().
getAs[Double](Score)
val typicalScores = scoredData.filter(scoredData(QueryName) === "tinker.turner.com").collect().map(_.getAs[Double](Score))
Math.abs(anomalyScore - 0.2d) should be <= 0.01d
typicalScores.length shouldBe 4
Math.abs(typicalScores(0) - 0.8d) should be <= 0.01d
Math.abs(typicalScores(1) - 0.8d) should be <= 0.01d
Math.abs(typicalScores(2) - 0.8d) should be <= 0.01d
Math.abs(typicalScores(3) - 0.8d) should be <= 0.01d
}
it should "estimate correct probabilities in toy data with framelength anomaly converting probabilities to Float " +
"for transportation and converting back to Double" in {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)
val anomalousRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 1, "172.16.9.132", "122.2o7.turner.com", "0x00000001", 1, 0)
val typicalRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "122.2o7.turner.com", "0x00000001", 1, 0)
val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
val model = DNSSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, testConfigFloatConversion, data)
val scoredData = model.score(sparkContext, sqlContext, data, testConfigFloatConversion.userDomain,
testConfigFloatConversion.precisionUtility)
val anomalyScore = scoredData.filter(scoredData(FrameLength) === 1).first().getAs[Double](Score)
val typicalScores = scoredData.filter(scoredData(FrameLength) === 168).collect().map(_.getAs[Double](Score))
Math.abs(anomalyScore - 0.2d) should be <= 0.01d
typicalScores.length shouldBe 4
Math.abs(typicalScores(0) - 0.8d) should be <= 0.01d
Math.abs(typicalScores(1) - 0.8d) should be <= 0.01d
Math.abs(typicalScores(2) - 0.8d) should be <= 0.01d
Math.abs(typicalScores(3) - 0.8d) should be <= 0.01d
}
it should "estimate correct probabilities in toy data with subdomain length anomaly converting probabilities to " +
"Float for transportation and converting back to Double" in {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)
val anomalousRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT",
1463735425L,
168,
"172.16.9.132",
"1111111111111111111111111111111111111111111111111111111111111.tinker.turner.com",
"0x00000001",
1,
0)
val typicalRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT",
1463735425L,
168,
"172.16.9.132",
"tinker.turner.com",
"0x00000001",
1,
0)
val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
val model = DNSSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, testConfigFloatConversion, data)
val scoredData = model.score(sparkContext, sqlContext, data, testConfigFloatConversion.userDomain,
testConfigFloatConversion.precisionUtility)
val anomalyScore = scoredData.
filter(scoredData(QueryName) === "1111111111111111111111111111111111111111111111111111111111111.tinker.turner.com").
first().
getAs[Double](Score)
val typicalScores = scoredData.filter(scoredData(QueryName) === "tinker.turner.com").collect().map(_.getAs[Double](Score))
Math.abs(anomalyScore - 0.2d) should be <= 0.01d
typicalScores.length shouldBe 4
Math.abs(typicalScores(0) - 0.8d) should be <= 0.01d
Math.abs(typicalScores(1) - 0.8d) should be <= 0.01d
Math.abs(typicalScores(2) - 0.8d) should be <= 0.01d
Math.abs(typicalScores(3) - 0.8d) should be <= 0.01d
}
"filterRecords" should "return data set without garbage" in {
val cleanedDNSRecords = DNSSuspiciousConnectsAnalysis.filterRecords(testDNSRecords.inputDNSRecordsDF)
cleanedDNSRecords.count should be(8)
cleanedDNSRecords.schema.size should be(8)
}
"filterInvalidRecords" should "return invalid records" in {
val invalidDNSRecords = DNSSuspiciousConnectsAnalysis.filterInvalidRecords(testDNSRecords.inputDNSRecordsDF)
invalidDNSRecords.count should be(15)
invalidDNSRecords.schema.size should be(8)
}
"filterScoredRecords" should "return records with score less or equal to threshold" in {
val threshold = 10e-5
val scoredDNSRecords = DNSSuspiciousConnectsAnalysis
.filterScoredRecords(testDNSRecords.scoredDNSRecordsDF, threshold)
scoredDNSRecords.count should be(2)
}
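
  /**
    * Hand-built fixture for the filtering tests: 23 input rows (8 valid and 15 with a null, empty or
    * placeholder field) plus 5 scored rows whose scores range from -1 to 1.
    */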
  def testDNSRecords = new {

    val sqlContext = new SQLContext(sparkContext)

    val inputDNSRecordsRDD = sparkContext.parallelize(wrapRefArray(Array(
      // frame_time null, empty or "-"
      Seq(null, 1463735425L, 168, "172.16.9.132", "turner.com.122.2o...", "0x00000001", 1, 0),
      Seq("", 1463735425L, 168, "172.16.9.132", "turner.com.122.2o...", "0x00000001", 1, 0),
      Seq("-", 1463735425L, 168, "172.16.9.132", "turner.com.122.2o...", "0x00000001", 1, 0),
      // unix_tstamp or frame_len null
      Seq("May 20 2016 02:10:25.970987000 PDT", null, 168, "172.16.9.132", "turner.com.122.2o...", "0x00000001", 1, 0),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, null, "172.16.9.132", "turner.com.122.2o...", "0x00000001", 1, 0),
      // dns_qry_name null, empty, "-" or "(empty)"
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", null, "0x00000001", 1, 0),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "", "0x00000001", 1, 0),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "-", "0x00000001", 1, 0),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "(empty)", "0x00000001", 1, 0),
      // ip_dst null, empty or "-"
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, null, "turner.com.122.2o...", "0x00000001", 1, 0),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "", "turner.com.122.2o...", "0x00000001", 1, 0),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "-", "turner.com.122.2o...", "0x00000001", 1, 0),
      // various combinations of null, empty or "-" dns_qry_class, dns_qry_type and dns_qry_rcode
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "turner.com.122.2o...", null, null, null),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "turner.com.122.2o...", "", null, null),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "turner.com.122.2o...", "-", null, null),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "turner.com.122.2o...", null, 1, 0),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "turner.com.122.2o...", "", 1, 0),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "turner.com.122.2o...", "-", 1, 0),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "turner.com.122.2o...", "0x00000001", null, 0),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "turner.com.122.2o...", "0x00000001", 1, null),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "turner.com.122.2o...", null, 1, null),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "turner.com.122.2o...", null, 1, 0),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "turner.com.122.2o...", "0x00000001", null, null))
      .map(row => Row.fromSeq(row))))

    val inputDNSRecordsSchema = StructType(
      Array(TimestampField,
        UnixTimestampField,
        FrameLengthField,
        ClientIPField,
        QueryNameField,
        QueryClassField,
        QueryTypeField,
        QueryResponseCodeField))

    val inputDNSRecordsDF = sqlContext.createDataFrame(inputDNSRecordsRDD, inputDNSRecordsSchema)

    val scoredDNSRecordsRDD = sparkContext.parallelize(wrapRefArray(Array(
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "turner.com.122.2o...", "0x00000001", 1, 0, 1d),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "turner.com.122.2o...", "0x00000001", 1, 0, 0.0000005),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "turner.com.122.2o...", "0x00000001", 1, 0, 0.05),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "turner.com.122.2o...", "0x00000001", 1, 0, -1d),
      Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "turner.com.122.2o...", "0x00000001", 1, 0, 0.0001))
      .map(row => Row.fromSeq(row))))

    val scoredDNSRecordsSchema = StructType(
      Array(TimestampField,
        UnixTimestampField,
        FrameLengthField,
        ClientIPField,
        QueryNameField,
        QueryClassField,
        QueryTypeField,
        QueryResponseCodeField,
        ScoreField))

    val scoredDNSRecordsDF = sqlContext.createDataFrame(scoredDNSRecordsRDD, scoredDNSRecordsSchema)
  }
}