/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spot.proxy

import org.apache.log4j.{Level, LogManager}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
import org.apache.spot.proxy.ProxySchema._
import org.apache.spot.testutils.TestingSparkContextFlatSpec
import org.apache.spot.utilities.FloatPointPrecisionUtility32
import org.scalatest.Matchers
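
// Typed representation of a single proxy log record, used to build test DataFrames.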
case class ProxyInput(p_date: String,
                      p_time: String,
                      clientip: String,
                      host: String,
                      reqmethod: String,
                      useragent: String,
                      resconttype: String,
                      duration: Int,
                      username: String,
                      webcat: String,
                      referer: String,
                      respcode: String,
                      uriport: Int,
                      uripath: String,
                      uriquery: String,
                      serverip: String,
                      scbytes: Int,
                      csbytes: Int,
                      fulluri: String)
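
/**
  * Tests for the proxy suspicious connects analysis: LDA model training and scoring
  * on toy data, plus the record-filtering helpers.
  */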
class ProxySuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec with Matchers {
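
  // Configuration for training with the EM LDA optimizer.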
  val emTestConfigProxy = SuspiciousConnectsConfig(analysis = "proxy",
    inputPath = "",
    feedbackFile = "",
    duplicationFactor = 1,
    topicCount = 20,
    hdfsScoredConnect = "",
    threshold = 1.0d,
    maxResults = 1000,
    outputDelimiter = "\t",
    ldaPRGSeed = None,
    ldaMaxiterations = 20,
    ldaAlpha = 1.02,
    ldaBeta = 1.001,
    ldaOptimizer = "em")
  val onlineTestConfigProxy = SuspiciousConnectsConfig(analysis = "proxy",
    inputPath = "",
    feedbackFile = "",
    duplicationFactor = 1,
    topicCount = 20,
    hdfsScoredConnect = "",
    threshold = 1.0d,
    maxResults = 1000,
    outputDelimiter = "\t",
    ldaPRGSeed = None,
    ldaMaxiterations = 200,
    ldaAlpha = 0.0009,
    ldaBeta = 0.00001,
    ldaOptimizer = "online")
  val testConfigProxyFloatConversion = SuspiciousConnectsConfig(analysis = "proxy",
    inputPath = "",
    feedbackFile = "",
    duplicationFactor = 1,
    topicCount = 20,
    hdfsScoredConnect = "",
    threshold = 1.0d,
    maxResults = 1000,
    outputDelimiter = "\t",
    ldaPRGSeed = None,
    ldaMaxiterations = 20,
    ldaAlpha = 1.02,
    ldaBeta = 1.001,
    precisionUtility = FloatPointPrecisionUtility32)
"proxy supicious connects analysis" should "estimate correct probabilities in toy data with top domain anomaly " +
"using EMLDAOptimizer" in {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)
val (anomalousRecord, typicalRecord) = anomalousAndTypicalRecords()
val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord,
typicalRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
val model = ProxySuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, emTestConfigProxy, data)
val scoredData = model.score(sparkContext, data, emTestConfigProxy.precisionUtility)
val anomalyScore = scoredData.filter(scoredData(Host) === "intel.com").first().getAs[Double](Score)
val typicalScores = scoredData.filter(scoredData(Host) === "maw.bronto.com").collect().map(_.getAs[Double](Score))
Math.abs(anomalyScore - 0.1d) should be <= 0.01d
typicalScores.length shouldBe 9
Math.abs(typicalScores(0) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(1) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(2) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(3) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(4) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(5) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(6) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(7) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(8) - 0.9d) should be <= 0.01d
}
"proxy supicious connects analysis" should "estimate correct probabilities in toy data with top domain anomaly " +
"using OnlineLDAOptimizer" in {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)
val anomalousRecord = ProxyInput("2016-10-03", "04:57:36", "127.0.0.1", "intel.com", "PUT",
"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36",
"text/plain", 230, "-", "Technology/Internet", "http://www.spoonflower.com/tags/color", "202", 80,
"/sites/c37i4q22szvir8ga3m8mtxaft7gwnm5fio8hfxo35mu81absi1/carts/4b3a313d-50f6-4117-8ffd-4e804fd354ef/fiddle",
"-", "127.0.0.1", 338, 647,
"maw.bronto.com/sites/c37i4q22szvir8ga3m8mtxaft7gwnm5fio8hfxo35mu81absi1/carts/4b3a313d-50f6-4117-8ffd-4e804fd354ef/fiddle")
val typicalRecord = ProxyInput("2016-10-03", "04:57:36", "127.0.0.1", "maw.bronto.com", "PUT",
"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36",
"text/plain", 230, "-", "Technology/Internet", "http://www.spoonflower.com/tags/color", "202", 80,
"/sites/c37i4q22szvir8ga3m8mtxaft7gwnm5fio8hfxo35mu81absi1/carts/4b3a313d-50f6-4117-8ffd-4e804fd354ef/fiddle",
"-", "127.0.0.1", 338, 647,
"maw.bronto.com/sites/c37i4q22szvir8ga3m8mtxaft7gwnm5fio8hfxo35mu81absi1/carts/4b3a313d-50f6-4117-8ffd-4e804fd354ef/fiddle")
val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord,
typicalRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
val model = ProxySuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, onlineTestConfigProxy, data)
val scoredData = model.score(sparkContext, data, onlineTestConfigProxy.precisionUtility)
val anomalyScore = scoredData.filter(scoredData(Host) === "intel.com").first().getAs[Double](Score)
val typicalScores = scoredData.filter(scoredData(Host) === "maw.bronto.com").collect().map(_.getAs[Double](Score))
Math.abs(anomalyScore - 0.1d) should be <= 0.01d
typicalScores.length shouldBe 9
Math.abs(typicalScores(0) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(1) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(2) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(3) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(4) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(5) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(6) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(7) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(8) - 0.9d) should be <= 0.01d
}
"proxy suspicious connects analysis" should "estimate correct probabilities in toy data with top domain anomaly " +
"converting probabilities to Float for transportation and converting back to Double" in {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)
val (anomalousRecord, typicalRecord) = anomalousAndTypicalRecords()
val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord,
typicalRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
val model = ProxySuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger,
testConfigProxyFloatConversion, data)
val scoredData = model.score(sparkContext, data, testConfigProxyFloatConversion.precisionUtility)
val anomalyScore = scoredData.filter(scoredData(Host) === "intel.com").first().getAs[Double](Score)
val typicalScores = scoredData.filter(scoredData(Host) === "maw.bronto.com").collect().map(_.getAs[Double](Score))
Math.abs(anomalyScore - 0.1d) should be <= 0.01d
typicalScores.length shouldBe 9
Math.abs(typicalScores(0) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(1) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(2) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(3) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(4) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(5) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(6) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(7) - 0.9d) should be <= 0.01d
Math.abs(typicalScores(8) - 0.9d) should be <= 0.01d
}
"filterRecords" should "return data without garbage" in {
val cleanedProxyRecords = ProxySuspiciousConnectsAnalysis
.filterRecords(testProxyRecords.inputProxyRecordsDF)
cleanedProxyRecords.count should be(1)
cleanedProxyRecords.schema.size should be(19)
}
"filterInvalidRecords" should "return invalir records" in {
val invalidProxyRecords = ProxySuspiciousConnectsAnalysis
.filterInvalidRecords(testProxyRecords.inputProxyRecordsDF)
invalidProxyRecords.count should be(5)
invalidProxyRecords.schema.size should be(19)
}
"filterScoredRecords" should "return records with score less or equal to threshold" in {
val threshold = 10e-5
val scoredProxyRecords = ProxySuspiciousConnectsAnalysis
.filterScoredRecords(testProxyRecords.scoredProxyRecordsDF, threshold)
scoredProxyRecords.count should be(2)
}
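
  // Builds one anomalous and one typical record; they differ only in the host field
  // ("intel.com" vs "maw.bronto.com").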
  def anomalousAndTypicalRecords(): (ProxyInput, ProxyInput) = {

    val anomalousRecord = ProxyInput("2016-10-03", "04:57:36", "127.0.0.1", "intel.com", "PUT",
      "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36",
      "text/plain", 230, "-", "Technology/Internet", "http://www.spoonflower.com/tags/color", "202", 80,
      "/sites/c37i4q22szvir8ga3m8mtxaft7gwnm5fio8hfxo35mu81absi1/carts/4b3a313d-50f6-4117-8ffd-4e804fd354ef/fiddle",
      "-", "127.0.0.1", 338, 647,
      "maw.bronto.com/sites/c37i4q22szvir8ga3m8mtxaft7gwnm5fio8hfxo35mu81absi1/carts/4b3a313d-50f6-4117-8ffd-4e804fd354ef/fiddle")

    val typicalRecord = ProxyInput("2016-10-03", "04:57:36", "127.0.0.1", "maw.bronto.com", "PUT",
      "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36",
      "text/plain", 230, "-", "Technology/Internet", "http://www.spoonflower.com/tags/color", "202", 80,
      "/sites/c37i4q22szvir8ga3m8mtxaft7gwnm5fio8hfxo35mu81absi1/carts/4b3a313d-50f6-4117-8ffd-4e804fd354ef/fiddle",
      "-", "127.0.0.1", 338, 647,
      "maw.bronto.com/sites/c37i4q22szvir8ga3m8mtxaft7gwnm5fio8hfxo35mu81absi1/carts/4b3a313d-50f6-4117-8ffd-4e804fd354ef/fiddle")

    (anomalousRecord, typicalRecord)
  }
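
  // Fixture created fresh on each access: six raw input rows (one clean, five each with a null
  // in one column) plus five pre-scored rows for exercising threshold filtering.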
  def testProxyRecords = new {

    val sqlContext = new SQLContext(sparkContext)

    val inputProxyRecordsRDD = sparkContext.parallelize(wrapRefArray(Array(
      Seq(null, "00:09:13", "10.239.160.152", "cn.archive.ubuntu...", "GET", "Debian APT-HTTP/...", "text/html", 448, "-",
        "-", "-", "404", "80", "/ubuntu/dists/tru...", "-", "10.239.4.160", 2864, 218, "cn.archive.ubuntu..."),
      Seq("2016-10-03", null, "10.239.160.152", "cn.archive.ubuntu...", "GET", "Debian APT-HTTP/...", "text/html", 448, "-",
        "-", "-", "404", "80", "/ubuntu/dists/tru...", "-", "10.239.4.160", 2864, 218, "cn.archive.ubuntu..."),
      Seq("2016-10-03", "00:09:13", null, "cn.archive.ubuntu...", "GET", "Debian APT-HTTP/...", "text/html", 448, "-",
        "-", "-", "404", "80", "/ubuntu/dists/tru...", "-", "10.239.4.160", 2864, 218, "cn.archive.ubuntu..."),
      Seq("2016-10-03", "00:09:13", "10.239.160.152", null, "GET", "Debian APT-HTTP/...", "text/html", 448, "-",
        "-", "-", "404", "80", "/ubuntu/dists/tru...", "-", "10.239.4.160", 2864, 218, "cn.archive.ubuntu..."),
      Seq("2016-10-03", "00:09:13", "10.239.160.152", "cn.archive.ubuntu...", "GET", "Debian APT-HTTP/...", "text/html", 448, "-",
        "-", "-", "404", "80", "/ubuntu/dists/tru...", "-", "10.239.4.160", 2864, 218, null),
      Seq("2016-10-03", "00:09:13", "10.239.160.152", "cn.archive.ubuntu...", "GET", "Debian APT-HTTP/...", "text/html", 448, "-",
        "-", "-", "404", "80", "/ubuntu/dists/tru...", "-", "10.239.4.160", 2864, 218, "cn.archive.ubuntu..."))
      .map(row => Row.fromSeq(row))))

    val inputProxyRecordsSchema = StructType(
      Array(DateField,
        TimeField,
        ClientIPField,
        HostField,
        ReqMethodField,
        UserAgentField,
        ResponseContentTypeField,
        DurationField,
        UserNameField,
        WebCatField,
        RefererField,
        RespCodeField,
        URIPortField,
        URIPathField,
        URIQueryField,
        ServerIPField,
        SCBytesField,
        CSBytesField,
        FullURIField))

    val inputProxyRecordsDF = sqlContext.createDataFrame(inputProxyRecordsRDD, inputProxyRecordsSchema)

    val scoredProxyRecordsRDD = sparkContext.parallelize(wrapRefArray(Array(
      Seq("2016-10-03", "00:09:13", "10.239.160.152", "cn.archive.ubuntu...", "GET", "Debian APT-HTTP/...", "text/html", 448, "-",
        "-", "-", "404", "80", "/ubuntu/dists/tru...", "-", "10.239.4.160", 2864, 218, "cn.archive.ubuntu...", "a word", -1d),
      Seq("2016-10-03", "00:09:13", "10.239.160.152", "cn.archive.ubuntu...", "GET", "Debian APT-HTTP/...", "text/html", 448, "-",
        "-", "-", "404", "80", "/ubuntu/dists/tru...", "-", "10.239.4.160", 2864, 218, "cn.archive.ubuntu...", "a word", 1d),
      Seq("2016-10-03", "00:09:13", "10.239.160.152", "cn.archive.ubuntu...", "GET", "Debian APT-HTTP/...", "text/html", 448, "-",
        "-", "-", "404", "80", "/ubuntu/dists/tru...", "-", "10.239.4.160", 2864, 218, "cn.archive.ubuntu...", "a word", 0.0000005),
      Seq("2016-10-03", "00:09:13", "10.239.160.152", "cn.archive.ubuntu...", "GET", "Debian APT-HTTP/...", "text/html", 448, "-",
        "-", "-", "404", "80", "/ubuntu/dists/tru...", "-", "10.239.4.160", 2864, 218, "cn.archive.ubuntu...", "a word", 0.05),
      Seq("2016-10-03", "00:09:13", "10.239.160.152", "cn.archive.ubuntu...", "GET", "Debian APT-HTTP/...", "text/html", 448, "-",
        "-", "-", "404", "80", "/ubuntu/dists/tru...", "-", "10.239.4.160", 2864, 218, "cn.archive.ubuntu...", "a word", 0.0001)
    ).map(row => Row.fromSeq(row))))

    val scoredProxyRecordsSchema = StructType(
      Array(DateField,
        TimeField,
        ClientIPField,
        HostField,
        ReqMethodField,
        UserAgentField,
        ResponseContentTypeField,
        DurationField,
        UserNameField,
        WebCatField,
        RefererField,
        RespCodeField,
        URIPortField,
        URIPathField,
        URIQueryField,
        ServerIPField,
        SCBytesField,
        CSBytesField,
        FullURIField,
        WordField,
        ScoreField))

    val scoredProxyRecordsDF = sqlContext.createDataFrame(scoredProxyRecordsRDD, scoredProxyRecordsSchema)
  }
}