Bug fix for reflecting a custom sink object
Bug fix for reflecting a custom sink object.
Author: wankunde <wankunde@163.com>
Closes #551 from wankunde/custom_sink.
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
index 2a6d335..6190137 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
@@ -82,7 +82,7 @@
*/
case object MongoSinkType extends SinkType {
val idPattern = "^(?i)mongo|mongodb$".r
- val desc = "distinct"
+ val desc = "mongo"
}
/**
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
index 7b8bd31..baa1788 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
@@ -18,14 +18,15 @@
*/
package org.apache.griffin.measure.sink
-import scala.util.{Success, Try}
+import scala.util.{Failure, Success, Try}
+import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.utils.ParamUtil._
case class SinkFactory(sinkParamIter: Iterable[SinkParam],
- metricName: String) extends Serializable {
+ metricName: String) extends Loggable with Serializable {
/**
* create sink
@@ -51,7 +52,9 @@
}
sinkTry match {
case Success(sink) if (sink.available) => Some(sink)
- case _ => None
+ case Failure(ex) =>
+ error("Failed to get sink", ex)
+ None
}
}
@@ -77,9 +80,17 @@
val className = config.getString("class", "")
val cls = Class.forName(className)
if (classOf[Sink].isAssignableFrom(cls)) {
- val ctx = SinkContext(config, metricName, timeStamp, block)
- val method = cls.getDeclaredMethod("apply", classOf[SinkContext])
- method.invoke(null, ctx).asInstanceOf[Sink]
+ val method = cls.getDeclaredMethod("apply",
+ classOf[Map[String, Any]],
+ classOf[String],
+ classOf[Long],
+ classOf[Boolean])
+ method.invoke(
+ null,
+ config,
+ metricName.asInstanceOf[Object],
+ timeStamp.asInstanceOf[Object],
+ block.asInstanceOf[Object]).asInstanceOf[Sink]
} else {
throw new ClassCastException(s"$className should extend Sink")
}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
index 01ccaba..0d7c4d6 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
@@ -25,14 +25,15 @@
/**
* sink records and metrics in memory for test.
*
- * @param sinkContext
+ * @param config sink configurations
+ * @param metricName
+ * @param timeStamp
+ * @param block
*/
-case class CustomSink(sinkContext: SinkContext) extends Sink {
- val config: Map[String, Any] = sinkContext.config
- val metricName: String = sinkContext.metricName
- val timeStamp: Long = sinkContext.timeStamp
- val block: Boolean = sinkContext.block
-
+case class CustomSink(config: Map[String, Any],
+ metricName: String,
+ timeStamp: Long,
+ block: Boolean) extends Sink {
def available(): Boolean = true
def start(msg: String): Unit = {}