[GRIFFIN-301] Update custom data connector to have the same parameters as built-in data connector
Currently, custom data connectors take different parameters from the built-in data connectors, which will confuse users.
For example : https://issues.apache.org/jira/browse/GRIFFIN-300
Author: wankunde <wankunde@163.com>
Closes #556 from wankunde/custom_data_connector.
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index 371fb7b..35d8ab8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -89,21 +89,30 @@
}
}
- private def getCustomConnector(session: SparkSession,
- context: StreamingContext,
- param: DataConnectorParam,
- storage: TimestampStorage,
- maybeClient: Option[StreamingCacheClient]): DataConnector = {
- val className = param.getConfig("class").asInstanceOf[String]
+ private def getCustomConnector(sparkSession: SparkSession,
+ ssc: StreamingContext,
+ dcParam: DataConnectorParam,
+ timestampStorage: TimestampStorage,
+ streamingCacheClientOpt: Option[StreamingCacheClient]): DataConnector = {
+ val className = dcParam.getConfig("class").asInstanceOf[String]
val cls = Class.forName(className)
if (classOf[BatchDataConnector].isAssignableFrom(cls)) {
- val ctx = BatchDataConnectorContext(session, param, storage)
- val meth = cls.getDeclaredMethod("apply", classOf[BatchDataConnectorContext])
- meth.invoke(null, ctx).asInstanceOf[BatchDataConnector]
+ val method = cls.getDeclaredMethod("apply",
+ classOf[SparkSession],
+ classOf[DataConnectorParam],
+ classOf[TimestampStorage]
+ )
+ method.invoke(null, sparkSession, dcParam, timestampStorage).asInstanceOf[BatchDataConnector]
} else if (classOf[StreamingDataConnector].isAssignableFrom(cls)) {
- val ctx = StreamingDataConnectorContext(session, context, param, storage, maybeClient)
- val meth = cls.getDeclaredMethod("apply", classOf[StreamingDataConnectorContext])
- meth.invoke(null, ctx).asInstanceOf[StreamingDataConnector]
+ val method = cls.getDeclaredMethod("apply",
+ classOf[SparkSession],
+ classOf[StreamingContext],
+ classOf[DataConnectorParam],
+ classOf[TimestampStorage],
+ classOf[Option[StreamingCacheClient]]
+ )
+ method.invoke(null, sparkSession, ssc, dcParam, timestampStorage, streamingCacheClientOpt)
+ .asInstanceOf[StreamingDataConnector]
} else {
throw new ClassCastException(s"$className should extend BatchDataConnector or StreamingDataConnector")
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnectorContext.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnectorContext.scala
deleted file mode 100644
index c77fb35..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnectorContext.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.datasource.connector.batch
-
-import org.apache.spark.sql.SparkSession
-
-import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
-import org.apache.griffin.measure.datasource.TimestampStorage
-
-case class BatchDataConnectorContext(@transient sparkSession: SparkSession,
- dcParam: DataConnectorParam,
- timestampStorage: TimestampStorage)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnectorContext.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnectorContext.scala
deleted file mode 100644
index ec7a9ff..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnectorContext.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.datasource.connector.streaming
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.StreamingContext
-
-import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
-import org.apache.griffin.measure.datasource.TimestampStorage
-import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
-
-case class StreamingDataConnectorContext(@transient sparkSession: SparkSession,
- @transient ssc: StreamingContext,
- dcParam: DataConnectorParam,
- timestampStorage: TimestampStorage,
- streamingCacheClientOpt: Option[StreamingCacheClient])
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/DataConnectorFactorySpec.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
similarity index 68%
rename from measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/DataConnectorFactorySpec.scala
rename to measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
index 0310557..e2c51d3 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/DataConnectorFactorySpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
@@ -16,13 +16,14 @@
specific language governing permissions and limitations
under the License.
*/
-package org.apache.griffin.measure.configuration.dqdefinition.reader
+package org.apache.griffin.measure.datasource.connector
import scala.util.Try
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.InputDStream
+import org.apache.spark.streaming.StreamingContext
import org.scalatest.FlatSpec
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
@@ -30,19 +31,23 @@
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
import org.apache.griffin.measure.datasource.connector.DataConnectorFactory
-import org.apache.griffin.measure.datasource.connector.batch.{BatchDataConnector, BatchDataConnectorContext}
-import org.apache.griffin.measure.datasource.connector.streaming.{StreamingDataConnector, StreamingDataConnectorContext}
+import org.apache.griffin.measure.datasource.connector.batch.{BatchDataConnector, MySqlDataConnector}
+import org.apache.griffin.measure.datasource.connector.streaming.{KafkaStreamingStringDataConnector, StreamingDataConnector}
-case class ExampleBatchDataConnector(ctx: BatchDataConnectorContext) extends BatchDataConnector {
- override val sparkSession: SparkSession = ctx.sparkSession
- override val dcParam: DataConnectorParam = ctx.dcParam
- override val timestampStorage: TimestampStorage = ctx.timestampStorage
+case class ExampleBatchDataConnector(@transient sparkSession: SparkSession,
+ dcParam: DataConnectorParam,
+ timestampStorage: TimestampStorage) extends BatchDataConnector {
override def data(ms: Long): (Option[DataFrame], TimeRange) = (None, TimeRange(ms))
}
-case class ExampleStreamingDataConnector(ctx: StreamingDataConnectorContext) extends StreamingDataConnector {
+case class ExampleStreamingDataConnector(@transient sparkSession: SparkSession,
+ @transient ssc: StreamingContext,
+ dcParam: DataConnectorParam,
+ timestampStorage: TimestampStorage,
+ streamingCacheClientOpt: Option[StreamingCacheClient]
+ ) extends StreamingDataConnector {
override type K = Unit
override type V = Unit
override type OUT = Unit
@@ -51,11 +56,6 @@
override def transform(rdd: RDD[this.OUT]): Option[DataFrame] = None
- override val streamingCacheClientOpt: Option[StreamingCacheClient] = ctx.streamingCacheClientOpt
- override val sparkSession: SparkSession = ctx.sparkSession
- override val dcParam: DataConnectorParam = ctx.dcParam
- override val timestampStorage: TimestampStorage = ctx.timestampStorage
-
override def init(): Unit = ()
}
@@ -86,16 +86,25 @@
assert(res.get.data(42)._2.begin == 42)
}
- it should "be able to create custom streaming connector" in {
+ it should "be able to create MySqlDataConnector" in {
val param = DataConnectorParam(
"CUSTOM", null, null,
- Map("class" -> classOf[ExampleStreamingDataConnector].getCanonicalName), Nil)
+ Map("class" -> classOf[MySqlDataConnector].getCanonicalName), Nil)
// apparently Scalamock can not mock classes without empty-paren constructor, providing nulls
val res = DataConnectorFactory.getDataConnector(
null, null, param, null, None)
assert(res.isSuccess)
- assert(res.get.isInstanceOf[ExampleStreamingDataConnector])
- assert(res.get.data(0)._2 == TimeRange.emptyTimeRange)
+ assert(res.get.isInstanceOf[MySqlDataConnector])
+ }
+
+ it should "be able to create KafkaStreamingStringDataConnector" in {
+ val param = DataConnectorParam(
+ "CUSTOM", null, null,
+ Map("class" -> classOf[KafkaStreamingStringDataConnector].getCanonicalName), Nil)
+ val res = DataConnectorFactory.getDataConnector(
+ null, null, param, null, None)
+ assert(res.isSuccess)
+ assert(res.get.isInstanceOf[KafkaStreamingStringDataConnector])
}
it should "fail if class is not extending DataConnectors" in {
@@ -108,7 +117,7 @@
assert(res.isFailure)
assert(res.failed.get.isInstanceOf[ClassCastException])
assert(res.failed.get.getMessage ==
- "org.apache.griffin.measure.configuration.dqdefinition.reader.NotDataConnector" +
+ "org.apache.griffin.measure.datasource.connector.NotDataConnector" +
" should extend BatchDataConnector or StreamingDataConnector")
}
@@ -122,8 +131,10 @@
assert(res.isFailure)
assert(res.failed.get.isInstanceOf[NoSuchMethodException])
assert(res.failed.get.getMessage ==
- "org.apache.griffin.measure.configuration.dqdefinition.reader.DataConnectorWithoutApply" +
- ".apply(org.apache.griffin.measure.datasource.connector.batch.BatchDataConnectorContext)")
+ "org.apache.griffin.measure.datasource.connector.DataConnectorWithoutApply.apply" +
+ "(org.apache.spark.sql.SparkSession, " +
+ "org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam, " +
+ "org.apache.griffin.measure.datasource.TimestampStorage)")
}
}