Griffin DSL is a SQL-like language designed for data quality (DQ) measurement; it describes requests in the DQ domain.
Griffin DSL is case insensitive and easy to learn.
null, nan, true, false
not, and, or
in, between, like, is
select, distinct, from, as, where, group, by, having, order, desc, asc, limit
!, &&, ||, =, !=, <, >, <=, >=, <>
+, -, *, /, %
(, )
., [, ]
"test"
'string 1'
"hello \" world \" "
123, 33.5
3d, 5h, 4ms
true, false
source, target, `my table name`
*, source.*, target.*
source.age, target.name, user_id
source.attributes[3]
count(*), *.count(), source.user_id.count(), max(source.age)
source.user_id as id, target.user_name as name
123, max(1, 2, 3, 4), source.age, (source.age + 13)
-(100 - source.score)
source.age + 13, score * 2 + ratio
source.country in ("USA", "CHN", "RSA")
source.age between 3 and 30, source.age between (3, 30)
source.name like "%abc%"
source.desc is not null
is null
source.age is not nan
(source.user_id = target.user_id AND source.age > target.age)
NOT source.has_data, !(source.age = target.age)
Binary logical expressions combine operands with and, or, and comparison operators, e.g. source.age = target.age OR source.ticket = target.tck
max(source.age, target.age), count(*)
select user_id.count(), age.max() as max
source.user_id.count() as cnt, source.age.min()
from source, from `target`
where source.id = target.id and source.age = target.age
group by cntry
group by gender having count(*) > 50
order by name
order by first_name desc, age asc
limit 5
An accuracy rule expression in Griffin DSL is a logical expression that describes the mapping relation between data sources.
e.g. source.id = target.id and source.name = target.name and source.age between (target.age, target.age + 5)
A profiling rule expression in Griffin DSL is a SQL-like expression: a select clause first, followed by optional from, where, group-by, order-by, and limit clauses, in that order.
e.g. source.gender, source.id.count() where source.age > 20 group by source.gender
e.g. select country, max(age), min(age), count(*) as cnt from source group by country order by cnt desc limit 5
A uniqueness rule expression in Griffin DSL is a comma-separated list of selection expressions naming the columns to check for uniqueness.
e.g. name, age
e.g. name, (age + 1) as next_age
A distinctness rule expression in Griffin DSL is a comma-separated list of selection expressions naming the columns to check for distinctness.
e.g. name, age
e.g. name, (age + 1) as next_age
A timeliness rule expression in Griffin DSL is a comma-separated list of selection expressions giving the input time and the output time; if the output time is not set, the calculation time is used by default.
e.g. ts
e.g. ts, end_ts
Griffin DSL is defined for DQ measurement, to describe problems in the DQ domain.
Griffin takes Griffin DSL rules and translates them into spark-sql rules, which are then calculated in the spark-sql engine.
The DQ domain has multiple dimensions, and each one is translated in a different way.
For accuracy, we need the match count between source and target; the rule describes the mapping relation between the data sources. Griffin translates the DSL rule into multiple SQL rules.
For example, the DSL rule source.id = target.id and source.name = target.name represents the match condition of accuracy. After translation, the SQL rules are as follows:
SELECT source.* FROM source LEFT JOIN target ON coalesce(source.id, '') = coalesce(target.id, '') and coalesce(source.name, '') = coalesce(target.name, '') WHERE (NOT (source.id IS NULL AND source.name IS NULL)) AND (target.id IS NULL AND target.name IS NULL) (saved as table miss_items)
SELECT COUNT(*) AS miss FROM miss_items (saved as table miss_count)
SELECT COUNT(*) AS total FROM source (saved as table total_count)
SELECT miss_count.miss AS miss, total_count.total AS total, (total_count.total - miss_count.miss) AS matched FROM miss_count FULL JOIN total_count (saved as table accuracy)
After translation, the metrics are persisted in table accuracy.
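The accuracy steps can be tried out on a small data set. The following is a minimal sketch, not Griffin's implementation: it assumes Python's built-in sqlite3 module as a stand-in for the spark-sql engine, uses made-up sample rows, and replaces FULL JOIN with CROSS JOIN in the last step (both inputs are single-row tables, so the result is the same, and older SQLite versions lack FULL JOIN).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Hypothetical sample data: ids 1 and 2 match, id 3 has no counterpart in target.
cur.execute("CREATE TABLE source (id INTEGER, name TEXT)")
cur.execute("CREATE TABLE target (id INTEGER, name TEXT)")
cur.executemany("INSERT INTO source VALUES (?, ?)",
                [(1, "ann"), (2, "bob"), (3, "cat")])
cur.executemany("INSERT INTO target VALUES (?, ?)",
                [(1, "ann"), (2, "bob")])

# Step 1: source rows that have no matching target row.
cur.execute("""
    CREATE TABLE miss_items AS
    SELECT source.* FROM source LEFT JOIN target
      ON coalesce(source.id, '') = coalesce(target.id, '')
     AND coalesce(source.name, '') = coalesce(target.name, '')
    WHERE (NOT (source.id IS NULL AND source.name IS NULL))
      AND (target.id IS NULL AND target.name IS NULL)
""")

# Steps 2 and 3: count the missed rows and the total rows.
cur.execute("CREATE TABLE miss_count AS SELECT COUNT(*) AS miss FROM miss_items")
cur.execute("CREATE TABLE total_count AS SELECT COUNT(*) AS total FROM source")

# Step 4: combine the two single-row tables (CROSS JOIN is an assumption
# made for SQLite; the translated rule uses FULL JOIN).
cur.execute("""
    CREATE TABLE accuracy AS
    SELECT miss_count.miss AS miss, total_count.total AS total,
           (total_count.total - miss_count.miss) AS matched
    FROM miss_count CROSS JOIN total_count
""")

accuracy = cur.execute("SELECT miss, total, matched FROM accuracy").fetchone()
print(accuracy)  # (1, 3, 2): one miss out of three source rows
```

With this sample data, only the source row with id 3 lands in miss_items, so two of the three rows count as matched.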
For profiling, the request is always an aggregation over the data. The rule is mostly the same as SQL, but supports only the select, from, where, group-by, having, order-by, and limit clauses, which can describe most profiling requests. For more complicated requests, you can use a SQL rule directly.
For example, the DSL rule source.cntry, source.id.count(), source.age.max() group by source.cntry represents a profiling request. After translation, the SQL rule is as follows:
SELECT source.cntry, count(source.id), max(source.age) FROM source GROUP BY source.cntry (saved as table profiling)
After translation, the metrics are persisted in table profiling.
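To make the profiling translation concrete, here is a toy sketch of the rewrite for this one rule shape. It is not Griffin's parser: translate_profiling is a hypothetical helper that only handles method-style aggregates and an optional group-by clause, and sqlite3 with made-up rows stands in for the spark-sql engine.

```python
import re
import sqlite3

# Matches method-style aggregates such as source.id.count().
AGG = re.compile(r"((?:\w+\.)*\w+)\.(count|max|min|sum|avg)\(\)", re.IGNORECASE)


def translate_profiling(dsl_rule, table="source"):
    """Toy translation: 'a.b.count() group by c' -> 'SELECT count(a.b) FROM ... GROUP BY c'."""
    # Rewrite source.id.count() into count(source.id).
    body = AGG.sub(lambda m: f"{m.group(2)}({m.group(1)})", dsl_rule)
    # Split off an optional group-by clause.
    parts = re.split(r"\s+group\s+by\s+", body, maxsplit=1, flags=re.IGNORECASE)
    sql = f"SELECT {parts[0].strip()} FROM {table}"
    if len(parts) == 2:
        sql += f" GROUP BY {parts[1].strip()}"
    return sql


sql = translate_profiling(
    "source.cntry, source.id.count(), source.age.max() group by source.cntry"
)
print(sql)

# Run the translated rule on hypothetical sample data.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source (cntry TEXT, id INTEGER, age INTEGER)")
conn.executemany("INSERT INTO source VALUES (?, ?, ?)",
                 [("US", 1, 30), ("US", 2, 41), ("CN", 3, 25)])
rows = sorted(conn.execute(sql).fetchall())
print(rows)  # [('CN', 1, 25), ('US', 2, 41)]
```

The generated string matches the translated SQL rule shown above; clauses such as where, having, order-by, and limit are deliberately left out of the sketch.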
Uniqueness, also called duplicate, finds the duplicate items in the data and rolls up the item counts grouped by the number of duplications. For example, the DSL rule name, age represents a duplicate request; in this case, source and target are the same data set. After translation, the SQL rules are as follows:
SELECT name, age FROM source (saved as table src)
SELECT name, age FROM target (saved as table tgt)
SELECT src.name, src.age FROM tgt RIGHT JOIN src ON coalesce(src.name, '') = coalesce(tgt.name, '') AND coalesce(src.age, '') = coalesce(tgt.age, '') (saved as table joined)
SELECT name, age, (count(*) - 1) AS dup FROM joined GROUP BY name, age (saved as table grouped)
SELECT count(*) FROM source (saved as table total_metric)
SELECT * FROM grouped WHERE dup = 0 (saved as table unique_record)
SELECT count(*) FROM unique_record (saved as table unique_metric)
SELECT * FROM grouped WHERE dup > 0 (saved as table dup_record)
SELECT dup, count(*) AS num FROM dup_record GROUP BY dup (saved as table dup_metric)
After translation, the metrics are persisted in table dup_metric.
Distinctness finds the duplicate items in the data. It is the same as uniqueness in batch mode, but differs in streaming mode. In most cases, you need distinctness rather than uniqueness. For example, the DSL rule name, age represents a distinctness request; in this case, source and target are the same data set. After translation, the SQL rules are as follows:
SELECT COUNT(*) AS total FROM source (saved as table total_count)
SELECT name, age, (COUNT(*) - 1) AS dup, TRUE AS dist FROM source GROUP BY name, age (saved as table dup_count)
SELECT COUNT(*) AS dist_count FROM dup_count WHERE dist (saved as table distinct_metric)
SELECT source.*, dup_count.dup AS dup, dup_count.dist AS dist FROM source LEFT JOIN dup_count ON source.name = dup_count.name AND source.age = dup_count.age (saved as table dist_joined)
SELECT *, ROW_NUMBER() OVER (DISTRIBUTE BY name, age SORT BY dist) row_num FROM dist_joined (saved as table row_numbered)
SELECT name, age, dup FROM row_numbered WHERE NOT dist OR row_num > 1 (saved as table dup_records)
SELECT name, age, dup, COUNT(*) AS num FROM dup_records GROUP BY name, age, dup (saved as table dup_metric)
After translation, the metrics are persisted in tables distinct_metric and dup_metric.
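The first three distinctness steps can be reproduced on a few rows. This is a minimal sketch under stated assumptions: sqlite3 stands in for the spark-sql engine, the sample rows are made up, and 1 AS dist replaces TRUE AS dist because SQLite stores booleans as integers. The later steps rely on Spark-specific DISTRIBUTE BY and SORT BY syntax and are not reproduced here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Hypothetical sample data: ("ann", 30) appears twice, ("bob", 25) once.
cur.execute("CREATE TABLE source (name TEXT, age INTEGER)")
cur.executemany("INSERT INTO source VALUES (?, ?)",
                [("ann", 30), ("ann", 30), ("bob", 25)])

# Step 1: total number of rows.
cur.execute("CREATE TABLE total_count AS SELECT COUNT(*) AS total FROM source")

# Step 2: one row per distinct (name, age), with its number of extra copies.
cur.execute("""
    CREATE TABLE dup_count AS
    SELECT name, age, (COUNT(*) - 1) AS dup, 1 AS dist
    FROM source GROUP BY name, age
""")

# Step 3: number of distinct items.
cur.execute("""
    CREATE TABLE distinct_metric AS
    SELECT COUNT(*) AS dist_count FROM dup_count WHERE dist
""")

total = cur.execute("SELECT total FROM total_count").fetchone()[0]
dup_rows = sorted(cur.execute("SELECT name, age, dup, dist FROM dup_count").fetchall())
dist_count = cur.execute("SELECT dist_count FROM distinct_metric").fetchone()[0]
print(total, dup_rows, dist_count)
```

For this sample, three rows reduce to two distinct items, and ("ann", 30) is recorded with one extra copy.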
Timeliness measures the latency of each item and computes statistics over the latencies.
For example, in the DSL rule ts, out_ts the first column is the input time of the item and the second column is the output time. If the output time is not set, __tmst is used as the default output time column. After translation, the SQL rules are as follows:
SELECT *, ts AS _bts, out_ts AS _ets FROM source (saved as table origin_time)
SELECT *, (_ets - _bts) AS latency FROM origin_time (saved as table lat)
SELECT CAST(AVG(latency) AS BIGINT) AS avg, MAX(latency) AS max, MIN(latency) AS min FROM lat (saved as table time_metric)
After translation, the metrics are persisted in table time_metric.
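The timeliness steps can be checked the same way. This sketch assumes sqlite3 as a stand-in for the spark-sql engine and made-up timestamps expressed as integer milliseconds; the actual time unit depends on your data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Hypothetical sample data: input time (ts) and output time (out_ts) in milliseconds.
cur.execute("CREATE TABLE source (id INTEGER, ts INTEGER, out_ts INTEGER)")
cur.executemany("INSERT INTO source VALUES (?, ?, ?)",
                [(1, 1000, 1200), (2, 2000, 2500), (3, 3000, 3100)])

# Step 1: expose the begin and end timestamps under fixed names.
cur.execute("CREATE TABLE origin_time AS "
            "SELECT *, ts AS _bts, out_ts AS _ets FROM source")

# Step 2: per-item latency.
cur.execute("CREATE TABLE lat AS "
            "SELECT *, (_ets - _bts) AS latency FROM origin_time")

# Step 3: latency statistics (the average is truncated to an integer).
cur.execute("""
    CREATE TABLE time_metric AS
    SELECT CAST(AVG(latency) AS BIGINT) AS avg,
           MAX(latency) AS max, MIN(latency) AS min
    FROM lat
""")

metric = cur.execute("SELECT avg, max, min FROM time_metric").fetchone()
print(metric)  # (266, 500, 100) for latencies of 200, 500 and 100 ms
```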
You can use a Griffin DSL rule to describe most problems in the DQ domain; for more complicated requirements, you can use the alternative rules supported by Griffin.
Griffin supports spark-sql directly, so you can write a rule in SQL like this:
{ "dsl.type": "spark-sql", "name": "source", "rule": "SELECT count(id) AS cnt, max(timestamp) AS fresh_time FROM source" }
Griffin will calculate it in the spark-sql engine directly.
Griffin also supports some other operations on Spark data frames, such as converting a data frame of JSON strings into an extracted data frame with the extracted object schema. For example:
{ "dsl.type": "df-opr", "name": "ext_source", "rule": "from_json", "details": { "df.name": "json_source" } }
Griffin will perform the operation to extract the JSON strings.
You can also extend the df-opr engine and df-opr adaptor in Griffin to support more types of data frame operations.
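If you generate rule definitions from a script, the two rules above can be built as plain dictionaries and serialized to JSON. The sketch below is illustrative only: check_rule and REQUIRED_KEYS are hypothetical, and treating dsl.type, name, and rule as required is an assumption drawn from the two examples above, not from Griffin's schema.

```python
import json

# The two alternative rules shown above, as Python dictionaries.
spark_sql_rule = {
    "dsl.type": "spark-sql",
    "name": "source",
    "rule": "SELECT count(id) AS cnt, max(timestamp) AS fresh_time FROM source",
}
df_opr_rule = {
    "dsl.type": "df-opr",
    "name": "ext_source",
    "rule": "from_json",
    "details": {"df.name": "json_source"},
}

# Assumption: these keys appear in both examples, so the sketch requires them.
REQUIRED_KEYS = {"dsl.type", "name", "rule"}


def check_rule(rule):
    """Raise ValueError if a rule lacks one of the assumed required keys."""
    missing = REQUIRED_KEYS - set(rule)
    if missing:
        raise ValueError(f"rule is missing keys: {sorted(missing)}")
    return rule


rules_json = json.dumps(
    [check_rule(spark_sql_rule), check_rule(df_opr_rule)], indent=2
)
print(rules_json)
```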
The Griffin engine runs on Spark and may work in two phases: the pre-proc phase and the run phase.