/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.griffin.measure.datasource.connector

import java.io.File
import java.time.ZonedDateTime

import scala.io.Source
import scala.util.Try

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType
import org.scalatest.matchers.should.Matchers

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.datasource.connector.batch.{
  BatchDataConnector,
  FileBasedDataConnector
}
import org.apache.griffin.measure.SparkSuiteBase
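
/**
 * A minimal custom [[BatchDataConnector]] used by the tests below: it serves a
 * fixed set of comma-separated crime-report strings as a single-column
 * ("value") DataFrame, so that pre.proc behaviour can be verified against
 * known input.
 */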
case class SampleBatchDataConnector(
    @transient sparkSession: SparkSession,
    dcParam: DataConnectorParam,
    timestampStorage: TimestampStorage)
    extends BatchDataConnector {

  val rawData: Seq[Row] = Seq(
    "2015-05-26T00:26:00Z,PENAL CODE/MISC (PENALMI),3900 Block BLOCK EL CAMINO REAL,PALO ALTO,94306",
    "2015-05-26T00:26:00Z,DRUNK IN PUBLIC ADULT/MISC (647FA),3900 Block BLOCK EL CAMINO REAL,PALO ALTO,94306",
    "2015-05-26T00:26:00Z,PENAL CODE/MISC (PENALMI),3900 Block BLOCK EL CAMINO REAL,PALO ALTO,94306",
    "2015-05-26T00:26:00Z,PENAL CODE/MISC (PENALMI),3900 Block BLOCK EL CAMINO REAL,PALO ALTO,94306",
    "2015-05-26T02:30:00Z,N&D/POSSESSION (11350),WILKIE WAY & JAMES RD,PALO ALTO,94306")
    .map(str => Row(str))

  override def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val schema = new StructType().add("value", "string", nullable = true)
    val df = sparkSession.createDataset(rawData)(RowEncoder(schema))
    (preProcess(Option(df), ms), TimeRange(ms))
  }
}
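
/**
 * Verifies the optional "pre.proc" rules of a data connector: each rule is a
 * SQL statement executed against the connector's output (exposed as the table
 * `this`) before the internal timestamp column is appended.
 */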
class DataConnectorPreProcTest extends SparkSuiteBase with Matchers {
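
  // Mirrors Spark's cast semantics on the expected side: values that cannot
  // be parsed as a timestamp / int become null, matching `cast(...)` in SQL.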
  private def castSeq(s: Seq[Any]): Seq[Any] = {
    val arr = s.toArray
    arr(0) =
      Try(java.sql.Timestamp.from(ZonedDateTime.parse(s(0).toString).toInstant)).getOrElse(null)
    arr(4) = Try(s(4).toString.toInt).getOrElse(null)
    arr.toSeq
  }
"DataConnector" should "return input data if no pre.proc rules are applied" in {
val param = DataConnectorParam(
"CUSTOM",
null,
Map("class" -> classOf[SampleBatchDataConnector].getCanonicalName),
Nil)
val timestampStorage = TimestampStorage()
val ts = 0L
val dc = DataConnectorFactory.getDataConnector(spark, null, param, timestampStorage, None)
assert(dc.isSuccess)
assert(dc.toOption.isDefined)
assert(dc.get.isInstanceOf[SampleBatchDataConnector])
assert(dc.get.data(ts)._1.isDefined)
val expectedRows =
dc.get
.asInstanceOf[SampleBatchDataConnector]
.rawData
.map(x => Row(x.toSeq.:+(ts): _*))
val actualRows = dc.get.data(ts)._1.get.collect()
actualRows should contain theSameElementsAs expectedRows
assert(actualRows.length == expectedRows.length)
}
it should "apply pre.proc rule if provided" in {
val param = DataConnectorParam(
"CUSTOM",
null,
Map("class" -> classOf[SampleBatchDataConnector].getCanonicalName),
List("select split(value, ',') as part from this"))
val timestampStorage = TimestampStorage()
val ts = 0L
val dc = DataConnectorFactory.getDataConnector(spark, null, param, timestampStorage, None)
assert(dc.isSuccess)
assert(dc.toOption.isDefined)
assert(dc.get.isInstanceOf[SampleBatchDataConnector])
assert(dc.get.data(ts)._1.isDefined)
val expectedRows = dc.get
.asInstanceOf[SampleBatchDataConnector]
.rawData
.map(x => Row(x.getString(0).split(",").toSeq, ts))
val actualRows = dc.get.data(ts)._1.get.collect()
actualRows should contain theSameElementsAs expectedRows
assert(actualRows.length == expectedRows.length)
}
it should "chain pre.proc rules in sequence if more than one rule is provided" in {
val param = DataConnectorParam(
"CUSTOM",
null,
Map("class" -> classOf[SampleBatchDataConnector].getCanonicalName),
List(
"select split(value, ',') as part from this",
"select part[0] as date_time, part[1] as incident, part[2] as address, " +
"part[3] as city, part[4] as zipcode from this",
"select cast(date_time as timestamp) as date_time, incident, address, city, " +
"cast(zipcode as int) as zipcode from this"))
val timestampStorage = TimestampStorage()
val ts = 0L
val dc = DataConnectorFactory.getDataConnector(spark, null, param, timestampStorage, None)
assert(dc.isSuccess)
assert(dc.toOption.isDefined)
assert(dc.get.isInstanceOf[SampleBatchDataConnector])
assert(dc.get.data(ts)._1.isDefined)
val expectedRows = dc.get
.asInstanceOf[SampleBatchDataConnector]
.rawData
.map(x => Row(x.getString(0).split(",").toSeq))
.map(r => Row(castSeq(r.getSeq(0)).:+(ts): _*))
val actualRows = dc.get.data(ts)._1.get.collect()
actualRows should contain theSameElementsAs expectedRows
assert(actualRows.length == expectedRows.length)
}
it should "pre process data from other connectors as well" in {
val dataFilePath = "src/test/resources/crime_report_test.csv"
val param = DataConnectorParam(
"file",
null,
Map(
"format" -> "csv",
"paths" -> Seq(dataFilePath),
"schema" -> Seq(
Map("name" -> "date_time", "type" -> "string"),
Map("name" -> "incident", "type" -> "string"),
Map("name" -> "address", "type" -> "string"),
Map("name" -> "city", "type" -> "string"),
Map("name" -> "zipcode", "type" -> "string"))),
List(
"select cast(date_time as timestamp) as date_time, incident, address, city, " +
"cast(zipcode as int) as zipcode from this"))
val timestampStorage = TimestampStorage()
val ts = 0L
val dc = DataConnectorFactory.getDataConnector(spark, null, param, timestampStorage, None)
assert(dc.isSuccess)
assert(dc.toOption.isDefined)
assert(dc.get.isInstanceOf[FileBasedDataConnector])
assert(dc.get.data(ts)._1.isDefined)
val source = Source.fromFile(new File(dataFilePath))
val expectedRows = source
.getLines()
.toSeq
.map(x => Row(x.split(",").toSeq))
.map(r => Row(castSeq(r.getSeq(0)).:+(ts): _*))
val actualRows = dc.get.data(ts)._1.get.collect()
actualRows should contain theSameElementsAs expectedRows
assert(actualRows.length == expectedRows.length)
}
}