blob: e91befaf57bd0a63d2f27dd352588564dbf3989e [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# This module contains utility functions for testing Parquet files,
# and other functions used for checking for strings in files and
# directories.
import os
import re
from subprocess import check_call
from tests.util.filesystem_utils import get_fs_path
def create_table_from_parquet(impala_client, unique_database, table_name):
  """Utility function to create a database table from a Parquet file. A Parquet file must
  exist in $IMPALA_HOME/testdata/data with the name 'table_name'.parquet.

  The file is uploaded into the table's warehouse directory and the table's schema is
  inferred from the file itself via 'create table ... like parquet'.
  """
  filename = '{0}.parquet'.format(table_name)
  # NOTE(review): the path components were truncated in the original line; the docstring
  # pins the location to $IMPALA_HOME/testdata/data, reconstructed here.
  local_file = os.path.join(os.environ['IMPALA_HOME'], 'testdata/data', filename)
  assert os.path.isfile(local_file)
  # The table doesn't exist, so create the table's directory
  tbl_dir = get_fs_path('/test-warehouse/{0}.db/{1}'.format(unique_database, table_name))
  check_call(['hdfs', 'dfs', '-mkdir', '-p', tbl_dir])
  # Put the parquet file in the table's directory
  # Note: -d skips a staging copy
  check_call(['hdfs', 'dfs', '-put', '-f', '-d', local_file, tbl_dir])
  # Create the table, inferring the schema from the uploaded Parquet file.
  hdfs_file = '{0}/{1}'.format(tbl_dir, filename)
  qualified_table_name = '{0}.{1}'.format(unique_database, table_name)
  impala_client.execute('create table {0} like parquet "{1}" stored as parquet'.format(
      qualified_table_name, hdfs_file))
def create_table_and_copy_files(impala_client, create_stmt, unique_database, table_name,
                                files):
  """Utility function to create a table and copy local data files into its directory.

  'create_stmt' is a CREATE TABLE statement template with '{db}' and '{tbl}'
  placeholders. 'files' is a list of file paths relative to $IMPALA_HOME (a leading '/'
  is tolerated and stripped); every file must exist locally.
  """
  # Create the directory
  hdfs_dir = get_fs_path('/test-warehouse/{0}.db/{1}'.format(unique_database, table_name))
  check_call(['hdfs', 'dfs', '-mkdir', '-p', hdfs_dir])
  # Copy the files
  # - build a list of source files
  # - issue a single put to the hdfs_dir ( -d skips a staging copy)
  source_files = []
  for local_file in files:
    # Cut off leading '/' to make os.path.join() happy
    local_file = local_file if local_file[0] != '/' else local_file[1:]
    local_file = os.path.join(os.environ['IMPALA_HOME'], local_file)
    assert os.path.isfile(local_file)
    # BUG FIX: the list was never populated, so the 'put' below copied nothing.
    source_files.append(local_file)
  if source_files:
    check_call(['hdfs', 'dfs', '-put', '-f', '-d'] + source_files + [hdfs_dir])
  # Create the table
  create_stmt = create_stmt.format(db=unique_database, tbl=table_name)
  # BUG FIX: the statement was formatted but never executed.
  impala_client.execute(create_stmt)
def grep_dir(dir, search, filename_search=""):
  """Recursively search for files that contain 'search' and have a filename that matches
  'filename_search' and return a list of matched lines grouped by file.

  Returns a dict mapping each matching file name to the list of its matching lines.
  The default 'filename_search' of "" matches every file name. Symlinks are skipped
  to avoid following cycles and double-counting.
  """
  filename_matcher = re.compile(filename_search)
  matching_files = dict()
  for dir_name, _, file_names in os.walk(dir):
    for file_name in file_names:
      file_path = os.path.join(dir_name, file_name)
      # Skip symlinks and files whose names don't match the filename filter.
      # NOTE(review): the second operand of this condition was truncated in the
      # original; reconstructed as the filename_matcher check, the only unused name.
      if os.path.islink(file_path) or not filename_matcher.search(file_name):
        continue
      with open(file_path) as file:
        matching_lines = grep_file(file, search)
        if matching_lines:
          matching_files[file_name] = matching_lines
  return matching_files
def grep_file(file, search):
  """Return lines in 'file' that match the 'search' regex. 'file' must already be
  open (any iterable of lines works). Matched lines keep their trailing newline."""
  matcher = re.compile(search)
  matching_lines = list()
  for line in file:
    # BUG FIX: the loop body was missing; collect every line the regex matches.
    if matcher.search(line):
      matching_lines.append(line)
  return matching_lines
def assert_file_in_dir_contains(dir, search):
  '''Asserts that at least one file in the 'dir' contains the 'search' term.'''
  matches = grep_dir(dir, search)
  failure_msg = "%s should have a file containing '%s' but no file was found" \
      % (dir, search)
  assert matches, failure_msg
def assert_no_files_in_dir_contain(dir, search):
  '''Asserts that no files in the 'dir' contains the 'search' term.'''
  matches = grep_dir(dir, search)
  failure_msg = "%s should not have any file containing '%s' but a file was found" \
      % (dir, search)
  assert not matches, failure_msg