blob: e01df0d40425eb0f7dc6cd0edbec127169695da1 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Tests for column lineage.
import json
import logging
import os
import pytest
import re
import tempfile
import time
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIfABFS, SkipIfADLS, SkipIfS3, SkipIfGCS
LOG = logging.getLogger(__name__)
class TestLineage(CustomClusterTestSuite):
START_END_TIME_LINEAGE_LOG_DIR = tempfile.mkdtemp(prefix="start_end_time")
CREATE_TABLE_TIME_LINEAGE_LOG_DIR = tempfile.mkdtemp(prefix="create_table_time")
DDL_LINEAGE_LOG_DIR = tempfile.mkdtemp(prefix="ddl_lineage")
LINEAGE_TESTS_DIR = tempfile.mkdtemp(prefix="test_lineage")
def setup_class(cls):
super(TestLineage, cls).setup_class()
def get_workload(cls):
return 'functional-query'
def test_start_end_timestamp(self, vector):
"""Test that 'timestamp' and 'endTime' in the lineage graph are populated with valid
UNIX times.""""lineage_event_log_dir is {0}".format(self.START_END_TIME_LINEAGE_LOG_DIR))
before_time = int(time.time())
query = "select count(*) from functional.alltypes"
result = self.execute_query_expect_success(self.client, query)
profile_query_id ="Query \(id=(.*)\):", result.runtime_profile).group(1)
after_time = int(time.time())"before_time " + str(before_time) + " after_time " + str(after_time))
# Stop the cluster in order to flush the lineage log files.
for log_filename in os.listdir(self.START_END_TIME_LINEAGE_LOG_DIR):
log_path = os.path.join(self.START_END_TIME_LINEAGE_LOG_DIR, log_filename)
# Only the coordinator's log file will be populated.
if os.path.getsize(log_path) > 0:"examining file: " + log_path)
with open(log_path) as log_file:
lineage_json = json.load(log_file)
assert lineage_json["queryId"] == profile_query_id
timestamp = int(lineage_json["timestamp"])
end_time = int(lineage_json["endTime"])
assert before_time <= timestamp
assert timestamp <= end_time
assert end_time <= after_time
else:"empty file: " + log_path)
def test_create_table_timestamp(self, vector, unique_database):
"""Test that 'createTableTime' in the lineage graph are populated with valid value
from HMS."""
query = "create table {0}.lineage_test_tbl as select int_col, tinyint_col " \
"from functional.alltypes".format(unique_database)
result = self.execute_query_expect_success(self.client, query)
profile_query_id ="Query \(id=(.*)\):", result.runtime_profile).group(1)
# Wait to flush the lineage log files.
for log_filename in os.listdir(self.CREATE_TABLE_TIME_LINEAGE_LOG_DIR):
log_path = os.path.join(self.CREATE_TABLE_TIME_LINEAGE_LOG_DIR, log_filename)
# Only the coordinator's log file will be populated.
if os.path.getsize(log_path) > 0:
with open(log_path) as log_file:
lineage_json = json.load(log_file)
assert lineage_json["queryId"] == profile_query_id
vertices = lineage_json["vertices"]
for vertex in vertices:
if vertex["vertexId"] == "int_col":
assert "metadata" in vertex
table_name = vertex["metadata"]["tableName"]
table_create_time = int(vertex["metadata"]["tableCreateTime"])
assert "{0}.lineage_test_tbl".format(unique_database) == table_name
assert table_create_time != -1
def test_ddl_lineage(self, unique_database):
""" Test that DDLs like 'create table' have query text populated in the lineage
query = "create external table {0}.ddl_lineage_tbl (id int)".format(unique_database)
result = self.execute_query_expect_success(self.client, query)
profile_query_id ="Query \(id=(.*)\):", result.runtime_profile).group(1)
# Wait to flush the lineage log files.
for log_filename in os.listdir(self.DDL_LINEAGE_LOG_DIR):
log_path = os.path.join(self.DDL_LINEAGE_LOG_DIR, log_filename)
# Only the coordinator's log file will be populated.
if os.path.getsize(log_path) > 0:
with open(log_path) as log_file:
lineage_json = json.load(log_file)
assert lineage_json["queryId"] == profile_query_id
assert lineage_json["queryText"] is not None
assert lineage_json["queryText"] == query
assert lineage_json["tableLocation"] is not None
# Test explain statements don't create lineages.
query = "explain create table {0}.lineage_test_tbl as select int_col, " \
"tinyint_col from functional.alltypes".format(unique_database)
result = self.execute_query_expect_success(self.client, query)
profile_query_id ="Query \(id=(.*)\):", result.runtime_profile).group(1)
# Wait to flush the lineage log files.
for log_filename in os.listdir(self.DDL_LINEAGE_LOG_DIR):
log_path = os.path.join(self.DDL_LINEAGE_LOG_DIR, log_filename)
# Only the coordinator's log file will be populated.
if os.path.getsize(log_path) > 0:
with open(log_path) as log_file:
lineage_json = json.load(log_file)
assert lineage_json["queryId"] is not profile_query_id
def test_lineage_output(self, vector):
self.run_test_case('QueryTest/lineage', vector)
# Clean up the test database
db_cleanup = "drop database if exists lineage_test_db cascade"
self.execute_query_expect_success(self.client, db_cleanup)