Bug fix for reflecting a custom sink object

Bug fix for reflecting a custom sink object.

Author: wankunde <wankunde@163.com>

Closes #551 from wankunde/custom_sink.
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
index 2a6d335..6190137 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
@@ -82,7 +82,7 @@
   */
 case object MongoSinkType extends SinkType {
   val idPattern = "^(?i)mongo|mongodb$".r
-  val desc = "distinct"
+  val desc = "mongo"
 }
 
 /**
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
index 7b8bd31..baa1788 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
@@ -18,14 +18,15 @@
 */
 package org.apache.griffin.measure.sink
 
-import scala.util.{Success, Try}
+import scala.util.{Failure, Success, Try}
 
+import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
 import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.utils.ParamUtil._
 
 case class SinkFactory(sinkParamIter: Iterable[SinkParam],
-                       metricName: String) extends Serializable {
+                       metricName: String) extends Loggable with Serializable {
 
   /**
     * create sink
@@ -51,7 +52,9 @@
     }
     sinkTry match {
       case Success(sink) if (sink.available) => Some(sink)
-      case _ => None
+      case Failure(ex) =>
+        error("Failed to get sink", ex)
+        None
     }
   }
 
@@ -77,9 +80,17 @@
     val className = config.getString("class", "")
     val cls = Class.forName(className)
     if (classOf[Sink].isAssignableFrom(cls)) {
-      val ctx = SinkContext(config, metricName, timeStamp, block)
-      val method = cls.getDeclaredMethod("apply", classOf[SinkContext])
-      method.invoke(null, ctx).asInstanceOf[Sink]
+      val method = cls.getDeclaredMethod("apply",
+        classOf[Map[String, Any]],
+        classOf[String],
+        classOf[Long],
+        classOf[Boolean])
+      method.invoke(
+        null,
+        config,
+        metricName.asInstanceOf[Object],
+        timeStamp.asInstanceOf[Object],
+        block.asInstanceOf[Object]).asInstanceOf[Sink]
     } else {
       throw new ClassCastException(s"$className should extend Sink")
     }
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
index 01ccaba..0d7c4d6 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
@@ -25,14 +25,15 @@
 /**
   * sink records and metrics in memory for test.
   *
-  * @param sinkContext
+  * @param config sink configurations
+  * @param metricName
+  * @param timeStamp
+  * @param block
   */
-case class CustomSink(sinkContext: SinkContext) extends Sink {
-  val config: Map[String, Any] = sinkContext.config
-  val metricName: String = sinkContext.metricName
-  val timeStamp: Long = sinkContext.timeStamp
-  val block: Boolean = sinkContext.block
-
+case class CustomSink(config: Map[String, Any],
+                      metricName: String,
+                      timeStamp: Long,
+                      block: Boolean) extends Sink {
   def available(): Boolean = true
 
   def start(msg: String): Unit = {}