/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.griffin.measure.datasource.connector

import scala.util.Try

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.scalatest.flatspec.AnyFlatSpec

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
import org.apache.griffin.measure.datasource.connector.batch.{
  BatchDataConnector,
  MySqlDataConnector
}
import org.apache.griffin.measure.datasource.connector.streaming.{
  KafkaStreamingStringDataConnector,
  StreamingDataConnector
}
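
/**
 * Minimal batch connector used to check that the factory can instantiate a
 * user-supplied "CUSTOM" class. Its `data` method returns no DataFrame and a
 * TimeRange beginning at the query timestamp, which the first test asserts on.
 */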
case class ExampleBatchDataConnector(
    @transient sparkSession: SparkSession,
    dcParam: DataConnectorParam,
    timestampStorage: TimestampStorage)
    extends BatchDataConnector {

  override def data(ms: Long): (Option[DataFrame], TimeRange) = (None, TimeRange(ms))
}
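
/**
 * Minimal streaming connector counterpart. The stream/transform/init members
 * are inert stubs: these tests only exercise instantiation through the
 * factory, never actual consumption of a stream.
 */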
case class ExampleStreamingDataConnector(
    @transient sparkSession: SparkSession,
    @transient ssc: StreamingContext,
    dcParam: DataConnectorParam,
    timestampStorage: TimestampStorage,
    streamingCacheClientOpt: Option[StreamingCacheClient])
    extends StreamingDataConnector {
  override type K = Unit
  override type V = Unit
  override type OUT = Unit

  override protected def stream(): Try[InputDStream[this.OUT]] = null

  override def transform(rdd: RDD[this.OUT]): Option[DataFrame] = None

  override def init(): Unit = ()
}
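
/** Negative fixture: extends no DataConnector trait, so the factory must reject it. */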
class NotDataConnector
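
/**
 * Negative fixture: a plain (non-case) class has no companion `apply`, so the
 * factory's reflective lookup is expected to fail with NoSuchMethodException.
 */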
class DataConnectorWithoutApply extends BatchDataConnector {
  override val sparkSession: SparkSession = null
  override val dcParam: DataConnectorParam = null
  override val timestampStorage: TimestampStorage = null
  override def data(ms: Long): (Option[DataFrame], TimeRange) = null
}
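
/**
 * Exercises DataConnectorFactory.getDataConnector with "CUSTOM" connector
 * params: the factory reads the "class" entry from the connector config and
 * reflectively invokes the companion object's
 * apply(SparkSession, DataConnectorParam, TimestampStorage) method, as the
 * expected failure message in the last test shows.
 */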
class DataConnectorFactorySpec extends AnyFlatSpec {
"DataConnectorFactory" should "be able to create custom batch connector" in {
val param = DataConnectorParam(
"CUSTOM",
null,
Map("class" -> classOf[ExampleBatchDataConnector].getCanonicalName),
Nil)
// apparently Scalamock can not mock classes without empty-paren constructor, providing nulls
val res = DataConnectorFactory.getDataConnector(null, null, param, null, None)
assert(res.get != null)
assert(res.isSuccess)
assert(res.get.isInstanceOf[ExampleBatchDataConnector])
assert(res.get.data(42)._2.begin == 42)
}
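
  // For reference, an equivalent (hypothetical) fragment in a measure config
  // JSON would look roughly like:
  //   { "type": "CUSTOM", "config": { "class": "com.example.MyConnector" } }
  // where "com.example.MyConnector" stands in for a real connector class.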
it should "be able to create MySqlDataConnector" in {
val param = DataConnectorParam(
"CUSTOM",
null,
Map("class" -> classOf[MySqlDataConnector].getCanonicalName),
Nil)
// apparently Scalamock can not mock classes without empty-paren constructor, providing nulls
val res = DataConnectorFactory.getDataConnector(null, null, param, null, None)
assert(res.isSuccess)
assert(res.get.isInstanceOf[MySqlDataConnector])
}
it should "be able to create KafkaStreamingStringDataConnector" in {
val param = DataConnectorParam(
"CUSTOM",
null,
Map("class" -> classOf[KafkaStreamingStringDataConnector].getCanonicalName),
Nil)
val res = DataConnectorFactory.getDataConnector(null, null, param, null, None)
assert(res.isSuccess)
assert(res.get.isInstanceOf[KafkaStreamingStringDataConnector])
}
it should "fail if class is not extending DataConnectors" in {
val param = DataConnectorParam(
"CUSTOM",
null,
Map("class" -> classOf[NotDataConnector].getCanonicalName),
Nil)
// apparently Scalamock can not mock classes without empty-paren constructor, providing nulls
val res = DataConnectorFactory.getDataConnector(null, null, param, null, None)
assert(res.isFailure)
assert(res.failed.get.isInstanceOf[ClassCastException])
assert(
res.failed.get.getMessage ==
"org.apache.griffin.measure.datasource.connector.NotDataConnector" +
" should extend BatchDataConnector or StreamingDataConnector")
}
it should "fail if class does not have apply() method" in {
val param = DataConnectorParam(
"CUSTOM",
null,
Map("class" -> classOf[DataConnectorWithoutApply].getCanonicalName),
Nil)
// apparently Scalamock can not mock classes without empty-paren constructor, providing nulls
val res = DataConnectorFactory.getDataConnector(null, null, param, null, None)
assert(res.isFailure)
assert(res.failed.get.isInstanceOf[NoSuchMethodException])
assert(
res.failed.get.getMessage ==
"org.apache.griffin.measure.datasource.connector.DataConnectorWithoutApply.apply" +
"(org.apache.spark.sql.SparkSession, " +
"org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam, " +
"org.apache.griffin.measure.datasource.TimestampStorage)")
}
}