[GRIFFIN-319] Deprecate old Data Connectors
**What changes were proposed in this pull request?**
This ticket aims to inform users of the deprecated data source connectors.
Deprecated connectors:
- MySqlDataConnector in favour of JDBCBasedDataConnector
- AvroBatchDataConnector in favour of FileBasedDataConnector
- TextDirBatchDataConnector in favour of FileBasedDataConnector
The documentation is also updated corresponding to the new connectors for reference.
**Does this PR introduce any user-facing change?**
No
**How was this patch tested?**
Not Applicable
Author: chitralverma <chitralverma@gmail.com>
Closes #564 from chitralverma/deprecate-old-data-connectors.
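For reference, the deprecated connectors map onto the new ones at the configuration level. Below is a hedged before/after sketch of an `avro` connector rewritten as a `file` connector, based on the updated configuration guide in this patch (paths and file names are illustrative):

```
Deprecated:
"connectors": [
  { "type": "avro", "config": { "file.path": "/path/to/dir", "file.name": "data.avro" } }
]

Replacement:
"connectors": [
  { "type": "file", "config": { "format": "avro", "paths": ["/path/to/dir/data.avro"] } }
]
```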
diff --git a/griffin-doc/measure/measure-configuration-guide.md b/griffin-doc/measure/measure-configuration-guide.md
index ac7b5c2..d1a6c36 100644
--- a/griffin-doc/measure/measure-configuration-guide.md
+++ b/griffin-doc/measure/measure-configuration-guide.md
@@ -82,7 +82,7 @@
- **sinks**: This field configures list of metrics sink parameters, multiple sink ways are supported. Details of sink configuration [here](#sinks).
- **griffin.checkpoint**: This field configures list of griffin checkpoint parameters, multiple cache ways are supported. It is only for streaming dq case. Details of info cache configuration [here](#griffin-checkpoint).
-### <a name="sinks"></a>Sinks
+### Sinks
- **type**: Metrics and records sink type, "console", "hdfs", "http", "mongo", "custom".
- **config**: Configure parameters of each sink type.
+ console sink (aliases: "log")
@@ -105,7 +105,7 @@
User-provided data sink should be present in Spark job's class path, by providing custom jar as -jar parameter
to spark-submit or by adding to "jars" list in sparkProperties.json.
-### <a name="griffin-checkpoint"></a>Griffin Checkpoint
+### Griffin Checkpoint
- **type**: Griffin checkpoint type, "zk" for zookeeper checkpoint.
- **config**: Configure parameters of griffin checkpoint type.
+ zookeeper checkpoint
@@ -193,33 +193,138 @@
+ rules: List of rules, to define every rule step. Details of rule configuration [here](#rule).
- **sinks**: Whitelisted sink types for this job. Note: no sinks will be used, if empty or omitted.
-### <a name="data-connector"></a>Data Connector
-- **type**: Data connector type: "AVRO", "HIVE", "TEXT-DIR", "CUSTOM" for batch mode; "KAFKA", "CUSTOM" for streaming mode.
-- **version**: Version string of data connector type.
-- **config**: Configure parameters of each data connector type.
- + avro data connector
- * file.path: avro file path, optional, "" as default.
- * file.name: avro file name.
- + hive data connector
- * database: data base name, optional, "default" as default.
- * table.name: table name.
- * where: where conditions string, split by ",", optional.
- e.g. `dt=20170410 AND hour=15, dt=20170411 AND hour=15, dt=20170412 AND hour=15`
- + text dir data connector
- * dir.path: parent directory path.
- * data.dir.depth: integer, depth of data directories, 0 as default.
- * success.file: success file name,
- * done.file:
- + custom connector
- * class: class name for user-provided data connector implementation. For Batch
- it should be implementing BatchDataConnector trait and have static method with signature
- ```def apply(ctx: BatchDataConnectorContext): BatchDataConnector```.
- For Streaming, it should be implementing StreamingDataConnector and have static method
- ```def apply(ctx: StreamingDataConnectorContext): StreamingDataConnector```. User-provided
- data connector should be present in Spark job's class path, by providing custom jar as -jar parameter
- to spark-submit or by adding to "jars" list in sparkProperties.json.
+### Data Connector
-### <a name="rule"></a>Rule
+Data Connectors help connect to external data sources on which DQ checks can be applied.
+
+List of supported data connectors:
+ - Hive
+ - Kafka (Streaming only)
+ - **File based:** Parquet, Avro, ORC, CSV, TSV, Text.
+ - **JDBC based:** MySQL, PostgreSQL, etc.
+ - **Custom:** Cassandra, ElasticSearch
+
+ #### Configuration
+ A sample data connector configuration is as follows:
+
+ ```
+"connectors": [
+ {
+ "type": "file",
+ "version": "1.7",
+ "config": {
+ "key1": "value1",
+ "key2": "value2"
+ }
+ }
+]
+ ```
+
+ ##### Key Parameters:
+ | Name | Type | Description | Supported Values |
+ |:--------|:---------|:---------------------------------------|:-------------------------------------------------|
+ | type | `String` | Type of the Connector | file, hive, kafka (streaming only), jdbc, custom |
+ | version | `String` | Version String of connector (optional) | Depends on connector type |
+ | config | `Object` | Configuration params of the connector | Depends on connector type (see below) |
+
+ ##### For Custom Data Connectors:
+ - **config** object must contain the key **class** whose value specifies the class name of the user-provided data connector
+ implementation.
+ + For **Batch** it should implement BatchDataConnector trait.
+ + For **Streaming** it should implement StreamingDataConnector trait.
+ - Example:
+ ```
+ "connectors": [
+ {
+ "type": "custom",
+ "config": {
+ "class": "org.apache.griffin.measure.datasource.connector.batch.CassandraDataConnector",
+ ...
+ }
+ }
+ ]
+ ```
+
+ **Note:** User-provided data connector should be present in the Spark job's class path, either by providing a custom jar with the
+`--jars` parameter to spark-submit or by setting `spark.jars` in the `spark -> config` section of the environment config.
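+ For example, `spark.jars` can be set in the environment config roughly as follows (a hypothetical fragment; the jar path is illustrative):
+ ```
+ "spark": {
+   "config": {
+     "spark.jars": "/path/to/custom-connector.jar"
+   }
+ }
+ ```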
+
+ ##### For File based Data Connectors:
+
+ - Currently supported formats include Parquet, ORC, AVRO, Text, and delimited types like CSV and TSV.
+ - Local files can also be read by prepending `file://` namespace.
+ - **config** object supports the following keys,
+
+ | Name | Type | Description | Supported Values | Default Values |
+ |:-----------|:---------|:---------------------------------------|:-----------------|:-------------- |
+ | format | `String` | type of file source | parquet, avro, orc, csv, tsv, text | parquet |
+ | paths | `List` | path(s) to be read | | `Empty` |
+ | options | `Object` | format specific options | | `Empty` |
+ | skipOnError| `Boolean`| whether to continue execution if one or more paths are invalid | true, false | false |
+ | schema | `List` | given as list of key value pairs | See example below | `null` |
+
+ - Example:
+ ```
+ "connectors": [
+ {
+ "type": "file",
+ "config": {
+ "format": "csv",
+ "paths": ["/path/to/csv/dir/*", "/path/to/dir/test.csv"],
+ "options": {
+ "header": "true"
+ },
+ "skipOnError": "false",
+ "schema":[{"name":"user_id","type":"string","nullable":"true"},{"name":"age","type":"int","nullable":"false"}]
+ }
+ }
+ ]
+ ```
+
+ **Note:** Additional examples of schema:
+- "schema":[{"name":"user_id","type":"string","nullable":"true"},{"name":"age","type":"int","nullable":"false"}]
+- "schema":[{"name":"user_id","type":"decimal(5,2)","nullable":"true"}]
+- "schema":[{"name":"my_struct","type":"struct<f1:int,f2:string>","nullable":"true"}]
+
+ ##### For Hive Data Connectors:
+ - **config** object supports the following keys,
+ * database: database name, optional, "default" as default.
+ * table.name: table name.
+ * where: where conditions string, split by ",", optional.
+ e.g. `dt=20170410 AND hour=15, dt=20170411 AND hour=15, dt=20170412 AND hour=15`
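+ - Example (a hypothetical configuration; table name and where condition are illustrative):
+ ```
+ "connectors": [
+ {
+ "type": "hive",
+ "config": {
+ "database": "default",
+ "table.name": "test",
+ "where": "dt=20170410 AND hour=15"
+ }
+ }
+ ]
+ ```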
+
+ ##### For JDBC based Data Connectors:
+- **config** object supports the following keys,
+
+| Name | Type | Description | Default Values |
+|:-----------|:---------|:---------------------------------------|:-------------- |
+| database | `String` | database name | default |
+| tablename | `String` | table name to be read | `Empty` |
+| url | `String` | the connection string URL to database | `Empty` |
+| user | `String` | user for connection to database | `Empty` |
+| password | `String` | password for connection to database | `null` |
+| driver | `String` | driver class for JDBC connection to database | com.mysql.jdbc.Driver |
+| where | `String` | condition for reading data from table | `Empty` |
+
+- Example:
+ ```
+ "connectors": [
+ {
+ "type": "jdbc",
+ "config": {
+ "database": "default",
+ "tablename": "test",
+ "url": "jdbc:mysql://localhost:3306/default",
+ "user": "test_u",
+ "password": "test_p",
+ "driver": "com.mysql.jdbc.Driver",
+ "where": ""
+ }
+ }
+ ]
+ ```
+
+**Note:** The jar containing the driver class should be present in the Spark job's class path, either by providing a custom jar with the
+`--jars` parameter to spark-submit or by setting `spark.jars` in the `spark -> config` section of the environment config.
+
+### Rule
- **dsl.type**: Rule dsl type, "spark-sql", "df-ops" and "griffin-dsl".
- **dq.type**: DQ type of this rule, only for "griffin-dsl" type. Supported types: "ACCURACY", "PROFILING", "TIMELINESS", "UNIQUENESS", "COMPLETENESS".
- **out.dataframe.name** (step information): Output table name of this rule, could be used in the following rules.
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index deb9bd8..ccc5fda 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -32,15 +32,14 @@
object DataConnectorFactory extends Loggable {
+ @deprecated val AvroRegex: Regex = """^(?i)avro$""".r
+ @deprecated val TextDirRegex: Regex = """^(?i)text-dir$""".r
+
val HiveRegex: Regex = """^(?i)hive$""".r
- val AvroRegex: Regex = """^(?i)avro$""".r
val FileRegex: Regex = """^(?i)file$""".r
- val TextDirRegex: Regex = """^(?i)text-dir$""".r
-
val KafkaRegex: Regex = """^(?i)kafka$""".r
-
- val CustomRegex: Regex = """^(?i)custom$""".r
val JDBCRegex: Regex = """^(?i)jdbc$""".r
+ val CustomRegex: Regex = """^(?i)custom$""".r
/**
* create data connector
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
index dcedf48..d069356 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
@@ -28,6 +28,9 @@
/**
* batch data connector for avro file
*/
+@deprecated(
+ s"This class is deprecated. Use '${classOf[FileBasedDataConnector].getCanonicalName}'.",
+ "0.6.0")
case class AvroBatchDataConnector(
@transient sparkSession: SparkSession,
dcParam: DataConnectorParam,
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
index 086596b..3fbd73c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
@@ -42,7 +42,7 @@
* - format : [[String]] specifying the type of file source (parquet, orc, etc.).
* - paths : [[Seq]] specifying the paths to be read
* - options : [[Map]] of format specific options
- * - skipOnError : [[Boolean]] specifying where to continue execution if one or more paths are invalid.
+ * - skipOnError : [[Boolean]] specifying whether to continue execution if one or more paths are invalid.
* - schema : [[Seq]] of {colName, colType and isNullable} given as key value pairs. If provided, this can
* help skip the schema inference step for some underlying data sources.
*
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
index 31502b2..feacfc9 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
@@ -25,7 +25,7 @@
import org.apache.griffin.measure.utils.ParamUtil._
@deprecated(
- "This class is deprecated. Use 'org.apache.griffin.measure.datasource.connector.batch.JDBCBasedDataConnector'",
+ s"This class is deprecated. Use '${classOf[JDBCBasedDataConnector].getCanonicalName}'.",
"0.6.0")
case class MySqlDataConnector(
@transient sparkSession: SparkSession,
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
index 35bcaa3..946ff7c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
@@ -28,6 +28,9 @@
/**
* batch data connector for directory with text format data in the nth depth sub-directories
*/
+@deprecated(
+ s"This class is deprecated. Use '${classOf[FileBasedDataConnector].getCanonicalName}'.",
+ "0.6.0")
case class TextDirBatchDataConnector(
@transient sparkSession: SparkSession,
dcParam: DataConnectorParam,