blob: 9d7ee7bc4192d489b7ca259d07b1c3a4e2f32323 [file] [log] [blame]
#!/usr/bin/env python
# coding=utf-8
# Copyright [2019] [Apache Software Foundation]
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import mock
except ImportError:
import unittest.mock as mock
from marvin_python_toolbox.management import hive
@mock.patch('marvin_python_toolbox.management.hive.json')
def test_hive_generateconf_write_file_with_json(mocked_json):
    """hive_generateconf must write the default template to 'hive_dataimport.conf' as JSON."""
    expected_conf = [{
        "origin_host": "xxx_host_name",
        "origin_db": "xxx_db_name",
        "origin_queue": "marvin",
        "target_table_name": "xxx_table_name",
        "sample_sql": "SELECT * FROM XXX",
        "sql_id": "1"
    }]
    open_mock = mock.mock_open()
    with mock.patch('marvin_python_toolbox.management.hive.open', open_mock, create=True):
        hive.hive_generateconf(None)
    open_mock.assert_called_once_with('hive_dataimport.conf', 'w')
    mocked_json.dump.assert_called_once_with(expected_conf, open_mock(), indent=2)
@mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.reset_remote_tables')
def test_hive_resetremote_call_HiveDataImporter_reset_remote_tables(reset_mocked):
    """hive_resetremote should delegate straight to HiveDataImporter.reset_remote_tables."""
    hive.hive_resetremote(ctx=None, host="test", engine="test", queue="test")
    # Delegation happens exactly once and takes no arguments.
    reset_mocked.assert_called_once_with()
@mock.patch('marvin_python_toolbox.management.hive.read_config')
@mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.__init__')
def test_hive_dataimport_without_config(init_mocked, read_config_mocked):
    """If read_config yields nothing, no HiveDataImporter may ever be constructed."""
    read_config_mocked.return_value = None
    # hive_dataimport takes 15 positional CLI arguments; none of them matter
    # for this scenario, so every one is passed as None:
    # ctx, conf, sql_id, engine, skip_remote_preparation, force_copy_files,
    # validate, force, force_remote, max_query_size, destination_host,
    # destination_port, destination_host_username, destination_host_password,
    # destination_hdfs_root_path.
    hive.hive_dataimport(
        None, None, None, None,
        None, None, None, None,
        None, None, None, None,
        None, None, None
    )
    init_mocked.assert_not_called()
@mock.patch('marvin_python_toolbox.management.hive.read_config')
@mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.__init__')
@mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.table_exists')
@mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.import_sample')
def test_hive_dataimport_with_config(import_sample_mocked, table_exists_mocked, init_mocked, read_config_mocked):
    """With a single config entry, hive_dataimport builds one HiveDataImporter
    from the merged CLI/config values and runs import_sample on it."""
    read_config_mocked.return_value = [{'origin_db': 'test', 'target_table_name': 'test'}]
    # __init__ is patched out, so it must explicitly return None.
    init_mocked.return_value = None
    # All arguments irrelevant to this scenario are set to None in one chain.
    ctx = sql_id = engine = \
        skip_remote_preparation = force_copy_files = validate =\
        force_remote = max_query_size = destination_port =\
        destination_host_username = destination_host_password = destination_hdfs_root_path = None
    # force=True: presumably bypasses the table_exists pre-check (table_exists
    # is not asserted here) — see the force=False test below for the contrast.
    force = True
    conf = '/path/to/conf'
    destination_host = 'test'
    hive.hive_dataimport(
        ctx, conf, sql_id, engine,
        skip_remote_preparation, force_copy_files, validate, force,
        force_remote, max_query_size, destination_host, destination_port,
        destination_host_username, destination_host_password, destination_hdfs_root_path
    )
    # The importer is constructed with the CLI values plus the config entry's
    # origin_db/target_table_name.
    init_mocked.assert_called_once_with(
        max_query_size=max_query_size,
        destination_host=destination_host,
        destination_port=destination_port,
        destination_host_username=destination_host_username,
        destination_host_password=destination_host_password,
        destination_hdfs_root_path=destination_hdfs_root_path,
        origin_db='test',
        target_table_name='test',
        engine=engine,
    )
    import_sample_mocked.assert_called_once_with(
        create_temp_table=True,
        copy_files=None,
        validate_query=None,
        force_create_remote_table=None
    )
@mock.patch('marvin_python_toolbox.management.hive.read_config')
@mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.__init__')
@mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.table_exists')
@mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.import_sample')
def test_hive_dataimport_with_config_sql_id(import_sample_mocked, table_exists_mocked, init_mocked, read_config_mocked):
    """With a sql_id filter, only the matching config entry drives the import.

    Fix: ``sql_id`` used to be dead-stored to None inside the chained
    assignment and immediately reassigned to 'test' (also with non-PEP8
    spacing, ``sql_id= 'test'``); it is now assigned exactly once.
    """
    read_config_mocked.return_value = [
        {'origin_db': 'test', 'target_table_name': 'test', 'sql_id': 'test'},
        {'origin_db': 'bla', 'target_table_name': 'bla', 'sql_id': 'bla'},
    ]
    # __init__ is patched out, so it must explicitly return None.
    init_mocked.return_value = None
    # Arguments irrelevant to this scenario are all None.
    ctx = engine = \
        skip_remote_preparation = force_copy_files = validate =\
        force_remote = max_query_size = destination_port =\
        destination_host_username = destination_host_password = destination_hdfs_root_path = None
    sql_id = 'test'  # selects only the first config entry
    force = True
    conf = '/path/to/conf'
    destination_host = 'test'
    hive.hive_dataimport(
        ctx, conf, sql_id, engine,
        skip_remote_preparation, force_copy_files, validate, force,
        force_remote, max_query_size, destination_host, destination_port,
        destination_host_username, destination_host_password, destination_hdfs_root_path
    )
    # Exactly one importer is built — from the entry whose sql_id matched.
    init_mocked.assert_called_once_with(
        max_query_size=max_query_size,
        destination_host=destination_host,
        destination_port=destination_port,
        destination_host_username=destination_host_username,
        destination_host_password=destination_host_password,
        destination_hdfs_root_path=destination_hdfs_root_path,
        origin_db='test',
        target_table_name='test',
        sql_id='test',
        engine=engine,
    )
    import_sample_mocked.assert_called_once_with(
        create_temp_table=True,
        copy_files=None,
        validate_query=None,
        force_create_remote_table=None
    )
@mock.patch('marvin_python_toolbox.management.hive.read_config')
@mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.table_exists')
@mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.import_sample')
def test_hive_dataimport_with_config_force_false(import_sample_mocked, table_exists_mocked, read_config_mocked):
    """With force=False, hive_dataimport consults table_exists first and still
    imports when the table is absent."""
    table_exists_mocked.return_value = False
    read_config_mocked.return_value = [{
        'origin_db': 'test',
        'target_table_name': 'test',
        'origin_queue':'test',
        'origin_host':'test',
        'sample_sql':'test',
        'sql_id':'test'
    }]
    # Arguments irrelevant to this scenario are all None.
    ctx = sql_id = engine = \
        skip_remote_preparation = force_copy_files = validate =\
        force_remote = max_query_size = destination_port =\
        destination_host_username = destination_host_password = destination_hdfs_root_path = None
    force = False
    conf = '/path/to/conf'
    destination_host = 'test'
    # Build a real importer (its class methods are already patched above),
    # then make the HiveDataImporter constructor return this exact instance
    # so its attributes can be asserted against below.
    hdi = hive.HiveDataImporter(
        max_query_size=max_query_size,
        destination_host=destination_host,
        destination_port=destination_port,
        destination_host_username=destination_host_username,
        destination_host_password=destination_host_password,
        destination_hdfs_root_path=destination_hdfs_root_path,
        origin_db='test',
        target_table_name='test',
        engine=engine,
        sql_id='test',
        origin_host='test',
        origin_queue='test',
        sample_sql='test',
    )
    with mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter', return_value=hdi):
        hive.hive_dataimport(
            ctx, conf, sql_id, engine,
            skip_remote_preparation, force_copy_files, validate, force,
            force_remote, max_query_size, destination_host, destination_port,
            destination_host_username, destination_host_password, destination_hdfs_root_path
        )
        # force=False triggers the existence pre-check on the destination.
        table_exists_mocked.assert_called_once_with(
            host=hdi.destination_host, db=hdi.origin_db, table=hdi.target_table_name
        )
        import_sample_mocked.assert_called_once_with(
            create_temp_table=True,
            copy_files=None,
            validate_query=None,
            force_create_remote_table=None
        )
@mock.patch('marvin_python_toolbox.management.hive.json')
@mock.patch('marvin_python_toolbox.management.hive.os.path')
def test_read_config_with_existing_path(path_mocked, json_mocked):
    """An existing conf path is opened for reading and parsed with json.load."""
    path_mocked.exists.return_value = True
    path_mocked.join.return_value = 'test.conf'
    open_mock = mock.mock_open()
    with mock.patch('marvin_python_toolbox.management.hive.open', open_mock, create=True):
        hive.read_config("test.conf")
    open_mock.assert_called_once_with('test.conf', 'r')
    json_mocked.load.assert_called_once_with(open_mock())
@mock.patch('marvin_python_toolbox.management.hive.json')
@mock.patch('marvin_python_toolbox.management.hive.os.path')
def test_read_config_with_not_existing_path(path_mocked, json_mocked):
    """A missing conf path must be neither opened nor parsed."""
    path_mocked.exists.return_value = False
    path_mocked.join.return_value = 'test.conf'
    open_mock = mock.mock_open()
    with mock.patch('marvin_python_toolbox.management.hive.open', open_mock, create=True):
        hive.read_config("test.conf")
    open_mock.assert_not_called()
    json_mocked.load.assert_not_called()
class TestHiveDataImporter:
    """Unit tests for hive.HiveDataImporter, driven entirely through mocks.

    ``setup`` builds a fresh importer per test plus ``self.mock_methods``, a
    name -> mock.DEFAULT map fed to ``mock.patch.multiple`` to stub the whole
    import pipeline in one shot.
    """

    # NOTE(review): nose-style ``setup`` is deprecated in modern pytest
    # (``setup_method`` is the replacement) — kept as-is; verify against the
    # pinned pytest version before renaming.
    def setup(self):
        """Create the importer under test and the patch.multiple spec."""
        self.hdi = hive.HiveDataImporter(
            max_query_size=13,
            destination_host='test',
            destination_port=None,
            destination_host_username=None,
            destination_host_password=None,
            destination_hdfs_root_path='/tmp',
            origin_db='test',
            target_table_name='test',
            engine='test',
            sql_id='test',
            origin_host='test',
            origin_queue='test',
            sample_sql='test',
        )
        # 'validade_query' (sic) mirrors the production method name.
        self.mock_methods = {
            'get_createtable_ddl': mock.DEFAULT,
            'get_partitions': mock.DEFAULT,
            'has_partitions': mock.DEFAULT,
            'create_database': mock.DEFAULT,
            'table_exists': mock.DEFAULT,
            'drop_table': mock.DEFAULT,
            'create_table': mock.DEFAULT,
            'populate_table': mock.DEFAULT,
            'get_table_location': mock.DEFAULT,
            'generate_table_location': mock.DEFAULT,
            'hdfs_dist_copy': mock.DEFAULT,
            'create_external_table': mock.DEFAULT,
            'refresh_partitions': mock.DEFAULT,
            'drop_view': mock.DEFAULT,
            'create_view': mock.DEFAULT,
            'validade_query': mock.DEFAULT,
            'get_connection': mock.DEFAULT,
            'print_finish_step': mock.DEFAULT,
            'print_start_step': mock.DEFAULT
        }

    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.count_rows')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.get_connection')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.retrieve_data_sample')
    def test_validade_query(self, retrieve_mocked, connection_mocked, count_rows_mocked):
        """validade_query connects to the origin, counts rows of sample_sql,
        then retrieves a data sample over the same connection."""
        count_rows_mocked.return_value = 1
        connection_mocked.return_value = 'connection_mocked'
        retrieve_mocked.return_value = {'estimate_query_mean_per_line': 42}
        self.hdi.validade_query()
        connection_mocked.assert_called_once_with(
            host=self.hdi.origin_host,
            db=self.hdi.origin_db,
            queue=self.hdi.origin_queue
        )
        count_rows_mocked.assert_called_once_with(conn='connection_mocked', sql=self.hdi.sample_sql)
        retrieve_mocked.assert_called_once_with(conn='connection_mocked', full_table_name=self.hdi.full_table_name)

    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.show_log')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.get_connection')
    def test_table_exists_table_not_exists(self, connection_mocked, show_log_mocked):
        """An empty fetchall result means the table does not exist."""
        cursor = mock.MagicMock()
        conn = mock.MagicMock()
        conn.cursor.return_value = cursor
        cursor.fetchall.return_value = []
        connection_mocked.return_value = conn
        table_exists = self.hdi.table_exists(host='host', db='db', table='table')
        show_log_mocked.assert_called_once_with(cursor)
        assert table_exists is False

    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.show_log')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.get_connection')
    def test_table_exists_table_exists(self, connection_mocked, show_log_mocked):
        """A non-empty fetchall means the table exists; show_log runs twice."""
        cursor = mock.MagicMock()
        conn = mock.MagicMock()
        conn.cursor.return_value = cursor
        cursor.fetchall.return_value = ['test']
        connection_mocked.return_value = conn
        table_exists = self.hdi.table_exists(host='host', db='db', table='table')
        show_log_mocked.assert_has_calls([mock.call(cursor)] * 2)
        assert table_exists is True

    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.show_log')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.drop_table')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.delete_files')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.get_connection')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter._get_ssh_client')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.generate_table_location')
    def test_reset_remote_tables_without_valids_tables(self, tb_loc_mock, ssh_cli_mock, conn_mock,
                                                       delete_mock, drop_mock, log_mock):
        """With no temp tables listed, reset_remote_tables drops/deletes nothing."""
        cursor = mock.MagicMock()
        conn = mock.MagicMock()
        conn.cursor.return_value = cursor
        cursor.fetchall.return_value = []
        conn_mock.return_value = conn
        self.hdi.reset_remote_tables()
        conn_mock.assert_called_once_with(
            host=self.hdi.origin_host,
            db=self.hdi.temp_db_name,
            queue=self.hdi.origin_queue
        )
        log_mock.assert_called_once_with(cursor)
        drop_mock.assert_not_called()
        tb_loc_mock.assert_not_called()
        delete_mock.assert_not_called()
        ssh_cli_mock.assert_not_called()

    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.show_log')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.drop_table')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.delete_files')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.get_connection')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter._get_ssh_client')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.generate_table_location')
    def test_reset_remote_tables_with_valids_tables(self, tb_loc_mock, ssh_cli_mock,
                                                    conn_mock, delete_mock, drop_mock, log_mock):
        """Each listed temp table is dropped and its HDFS files deleted over SSH."""
        cursor = mock.MagicMock()
        conn = mock.MagicMock()
        conn.cursor.return_value = cursor
        # One table named 'test' in the temp database.
        cursor.fetchall.return_value = [['test']]
        conn_mock.return_value = conn
        tb_loc_mock.return_value = 'test'
        ssh_cli_mock.return_value = 'test'
        self.hdi.reset_remote_tables()
        conn_mock.assert_called_once_with(
            host=self.hdi.origin_host,
            db=self.hdi.temp_db_name,
            queue=self.hdi.origin_queue
        )
        log_mock.assert_called_once_with(cursor)
        # Dropped with the hard-coded temp-db prefix 'marvin.'.
        drop_mock.assert_called_once_with(conn=conn, table_name="marvin.test")
        tb_loc_mock.assert_called_once_with(
            self.hdi.destination_hdfs_root_path,
            self.hdi.origin_host,
            self.hdi.temp_db_name + '.db',
            "test"
        )
        delete_mock.assert_called_once_with('test', 'test')
        ssh_cli_mock.assert_called_once_with(
            self.hdi.origin_host,
            self.hdi.destination_host_username,
            self.hdi.destination_host_password
        )

    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.validade_query')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.get_connection')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.print_finish_step')
    def test_import_sample_with_invalid_query_and_flag_true_stop(self, finish_step_mock, conn_mock, val_query_mock):
        """A failed validation with validate_query=True aborts before connecting."""
        val_query_mock.return_value = False
        self.hdi.import_sample(validate_query=True)
        val_query_mock.assert_called_once_with()
        finish_step_mock.assert_called_once_with()
        conn_mock.assert_not_called()

    @mock.patch('marvin_python_toolbox.management.hive.print')
    def test_import_sample_with_invalid_query_and_flag_false_dont_stop(self, print_mocked):
        """validate_query=False skips validation and runs the full pipeline."""
        with mock.patch.multiple('marvin_python_toolbox.management.hive.HiveDataImporter',
                                 **self.mock_methods
                                 ) as mocks:
            self.hdi.import_sample(validate_query=False)
            # Full pipeline: 6 steps finished, 5 connections opened.
            assert mocks['print_finish_step'].call_count == 6
            assert mocks['get_connection'].call_count == 5
            mocks['validade_query'].assert_not_called()

    @mock.patch('marvin_python_toolbox.management.hive.print')
    def test_import_sample_with_partitions_stop(self, print_mocked):
        """A partitioned origin table stops the import after DDL inspection."""
        with mock.patch.multiple('marvin_python_toolbox.management.hive.HiveDataImporter',
                                 **self.mock_methods
                                 ) as mocks:
            conn = mock.MagicMock()
            mocks['has_partitions'].return_value = True
            mocks['get_connection'].return_value = conn
            self.hdi.import_sample(validate_query=True)
            assert mocks['get_connection'].call_count == 2
            mocks['get_createtable_ddl'].assert_called_once_with(
                conn=conn,
                origin_table_name=self.hdi.target_table_name,
                dest_table_name=self.hdi.temp_table_name
            )
            mocks['get_partitions'].assert_called_once_with(
                mocks['get_createtable_ddl'].return_value
            )
            mocks['create_database'].assert_not_called()

    @mock.patch('marvin_python_toolbox.management.hive.print')
    def test_import_sample_with_create_temp_table_false_dont_call_create_table(self, print_mocked):
        """create_temp_table=False must skip the whole temp-table branch."""
        with mock.patch.multiple('marvin_python_toolbox.management.hive.HiveDataImporter',
                                 **self.mock_methods
                                 ) as mocks:
            self.hdi.import_sample(create_temp_table=False)
            mocks['table_exists'].assert_not_called()
            mocks['drop_table'].assert_not_called()
            mocks['create_table'].assert_not_called()
            mocks['populate_table'].assert_not_called()

    @mock.patch('marvin_python_toolbox.management.hive.print')
    def test_import_sample_with_create_temp_table_true_call_create_table(self, print_mocked):
        """create_temp_table + force_create_remote_table recreate and populate it."""
        with mock.patch.multiple('marvin_python_toolbox.management.hive.HiveDataImporter',
                                 **self.mock_methods
                                 ) as mocks:
            mocks['has_partitions'].return_value = False
            self.hdi.import_sample(create_temp_table=True, force_create_remote_table=True)
            # Dropped twice (temp + remote), created and populated once each.
            assert mocks['drop_table'].call_count == 2
            assert mocks['create_table'].call_count == 1
            assert mocks['populate_table'].call_count == 1

    @mock.patch('marvin_python_toolbox.management.hive.print')
    def test_import_sample(self, print_mocked):
        """Default happy path: valid query, no partitions, full 6-step pipeline."""
        with mock.patch.multiple('marvin_python_toolbox.management.hive.HiveDataImporter',
                                 **self.mock_methods
                                 ) as mocks:
            mocks['validade_query'].return_value = True
            mocks['has_partitions'].return_value = False
            self.hdi.import_sample()
            assert mocks['print_finish_step'].call_count == 6
            assert mocks['get_connection'].call_count == 5

    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.clean_ddl')
    def test_get_createtable_ddl(self, clean_ddl_mocked):
        """DDL is read via SHOW CREATE TABLE, cleaned, and renamed to the dest."""
        cursor = mock.MagicMock()
        conn = mock.MagicMock()
        conn.cursor.return_value = cursor
        # SHOW CREATE TABLE yields the DDL as one row per line; lines get joined.
        cursor.fetchall.return_value = [['l1'], ['l2']]
        dll = mock.MagicMock()  # (sic) local name; holds the cleaned DDL mock
        clean_ddl_mocked.return_value = dll
        self.hdi.get_createtable_ddl(conn, 'marvin', 'test')
        cursor.execute.assert_called_once_with("SHOW CREATE TABLE marvin")
        clean_ddl_mocked.assert_called_once_with('l1l2', remove_formats=False, remove_general=True)
        dll.replace.assert_called_once_with('marvin', 'test')
        cursor.close.assert_called_once_with()

    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.show_log')
    def test_execute_db_command(self, show_log_mocked):
        """_execute_db_command runs the statement, logs, and closes the cursor."""
        cursor = mock.MagicMock()
        conn = mock.MagicMock()
        conn.cursor.return_value = cursor
        command = "bla bla bla"
        self.hdi._execute_db_command(conn, command)
        cursor.execute.assert_called_once_with(command)
        show_log_mocked.assert_called_once_with(cursor)
        cursor.close.assert_called_once_with()

    @mock.patch('marvin_python_toolbox.management.hive.hive')
    def test_get_connection(self, pyhive_mocked):
        """get_connection wires host/db/queue into pyhive's connect call."""
        host = 'test'
        self.hdi.get_connection(host, db='DEFAULT', queue='default')
        # NOTE(review): the leading space in ' hive.exec.dynamic.partition.mode'
        # mirrors the production configuration key — confirm it is intentional.
        pyhive_mocked.connect.assert_called_once_with(
            host=host, database='DEFAULT',
            configuration={'mapred.job.queue.name': 'default',
                           ' hive.exec.dynamic.partition.mode': 'nonstrict'}
        )

    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.show_log')
    def test_retrieve_data_sample(self, show_log_mocked):
        """retrieve_data_sample TABLESAMPLEs rows and splits 'table.col' headers."""
        cursor = mock.MagicMock()
        conn = mock.MagicMock()
        conn.cursor.return_value = cursor
        cursor.description = [('table.col', 'type')]
        cursor.fetchall.return_value = ['test']
        full_table_name = 'test'
        sample_limit = 10
        data = self.hdi.retrieve_data_sample(conn, full_table_name, sample_limit)
        sql = "SELECT * FROM {} TABLESAMPLE ({} ROWS)".format(full_table_name, sample_limit)
        cursor.execute.assert_called_once_with(sql)
        # 'table.col' is split on the dot into table and column names.
        assert data['data_header'][0]['col'] == 'col'
        assert data['data_header'][0]['table'] == 'table'
        assert data['data_header'][0]['type'] == 'type'
        assert data['total_lines'] == 1
        assert data['data'] == ['test']

    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.show_log')
    def test_count_rows(self, show_log_mocked):
        """count_rows rewrites the projection into SELECT COUNT(1) and fetches it."""
        cursor = mock.MagicMock()
        conn = mock.MagicMock()
        cursor.fetchone.return_value = [42]
        conn.cursor.return_value = cursor
        sql = "SELECT COL1, COL2 FROM TABLE"
        count = self.hdi.count_rows(conn, sql)
        assert count == 42
        cursor.execute.assert_called_once_with("SELECT COUNT(1) FROM TABLE")
        show_log_mocked.assert_called_once_with(cursor)
        cursor.close.assert_called_once_with()

    @mock.patch('marvin_python_toolbox.management.hive.logger')
    def test_show_log(self, logger_mocked):
        """show_log forwards each fetched log line to logger.debug."""
        cursor = mock.MagicMock()
        cursor.fetch_logs.return_value = ['log log log']
        self.hdi.show_log(cursor)
        logger_mocked.debug.assert_called_once_with('log log log')

    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.show_log')
    def test_save_data(self, show_log_mocked):
        """save_data builds an INSERT from the header and executemany's the rows."""
        cursor = mock.MagicMock()
        conn = mock.MagicMock()
        conn.cursor.return_value = cursor
        table = 'test'
        data = {
            'total_lines': 2,
            'data_header': [
                {'col': 'test_col_1'},
                {'col': 'test_col_2'},
            ],
            'data': [
                'header',
                'test_val_1',
                'test_val_2',
            ]
        }
        self.hdi.save_data(conn, table, data)
        dml = "INSERT INTO test (test_col_1, test_col_2) VALUES (%s, %s)"
        # The first 'data' row ('header') is skipped — only values are inserted.
        cursor.executemany.assert_called_once_with(dml, [('test_val_1',), ('test_val_2',)])
        show_log_mocked.assert_called_once_with(cursor)
        cursor.close.assert_called_once_with()

    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter._execute_db_command')
    def test_populate_table_with_partitions(self, exec_comm_mock):
        """Partition columns are folded into the INSERT OVERWRITE statement."""
        conn = None
        table_name = 'test'
        sql = 'bla bla bla'
        partitions = [{'col': 'test1'}, {'col': 'test2'}]
        self.hdi.populate_table(conn, table_name, partitions, sql)
        dml = "INSERT OVERWRITE TABLE test PARTITION (test1, test2) bla bla bla"
        exec_comm_mock.assert_called_once_with(conn, dml)

    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter._execute_db_command')
    def test_populate_table_without_partitions(self, exec_comm_mock):
        """Without partitions, no PARTITION clause is emitted."""
        conn = None
        table_name = 'test'
        sql = 'bla bla bla'
        partitions = []
        self.hdi.populate_table(conn, table_name, partitions, sql)
        dml = "INSERT OVERWRITE TABLE test bla bla bla"
        exec_comm_mock.assert_called_once_with(conn, dml)

    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter._execute_db_command')
    def test_create_view(self, exec_comm_mock):
        """create_view issues a CREATE VIEW ... AS SELECT * statement."""
        conn = None
        view_name = 'view_test'
        table_name = 'table_test'
        self.hdi.create_view(conn, view_name, table_name)
        dml = "CREATE VIEW {0} AS SELECT * FROM {1}".format(view_name, table_name)
        exec_comm_mock.assert_called_once_with(conn, dml)

    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter._execute_db_command')
    def test_refresh_partitions(self, exec_comm_mock):
        """refresh_partitions issues MSCK REPAIR TABLE for the table."""
        conn = None
        table_name = 'table_test'
        self.hdi.refresh_partitions(conn, table_name)
        sttmt = "MSCK REPAIR TABLE {0}".format(table_name)
        exec_comm_mock.assert_called_once_with(conn, sttmt)

    def test_get_table_location(self):
        """get_table_location parses DESCRIBE FORMATTED output for the location."""
        cursor = mock.MagicMock()
        conn = mock.MagicMock()
        cursor.fetchall.return_value = [[' location: ', ' hdfs://test ']]
        conn.cursor.return_value = cursor
        table_name = 'test'
        loc = self.hdi.get_table_location(conn, table_name)
        cursor.execute.assert_called_once_with("DESCRIBE FORMATTED test")
        # The hdfs:// scheme is rewritten to hftp:// (used for distcp reads).
        assert loc == 'hftp://test'

    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter._hdfs_commands')
    def test_delete_files(self, cmd_mocked):
        """delete_files issues a recursive hdfs rm over SSH."""
        ssh = 'ssh'
        url = 'test.com'
        self.hdi.delete_files(ssh, url)
        cmd = "hdfs dfs -rm -R 'test.com'"
        cmd_mocked.assert_called_once_with(ssh, cmd)

    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter._hdfs_commands')
    def test_copy_files(self, cmd_mocked):
        """copy_files issues a hadoop distcp --update over SSH."""
        ssh = 'ssh'
        origin = "/home/"
        dest = "/tmp/"
        self.hdi.copy_files(ssh, origin, dest)
        cmd = "hadoop distcp --update '/home/' '/tmp/'"
        cmd_mocked.assert_called_once_with(ssh, cmd)

    @mock.patch('marvin_python_toolbox.management.hive.logger')
    def test_hdfs_commands(self, logger_mocked):
        """_hdfs_commands runs the remote command and logs command/stdout/stderr."""
        i = mock.MagicMock()
        o = mock.MagicMock()
        e = mock.MagicMock()
        ssh = mock.MagicMock()
        o.readlines.return_value = 'output'
        e.readlines.return_value = 'error'
        ssh.exec_command.return_value = (i, o, e)
        cmd = "command"
        out, err = self.hdi._hdfs_commands(ssh, cmd)
        assert (out, err) == ('output', 'error')
        logger_mocked.debug.assert_any_call("Executing remote command: command")
        logger_mocked.debug.assert_any_call("output")
        logger_mocked.debug.assert_any_call("error")

    @mock.patch('marvin_python_toolbox.management.hive.AutoAddPolicy', spec=True)
    @mock.patch('marvin_python_toolbox.management.hive.SSHClient.connect')
    @mock.patch('marvin_python_toolbox.management.hive.SSHClient.set_missing_host_key_policy')
    def test_get_ssh_client(self, set_missing_mocked, connect_mocked, AutoAddPolicyMocked):
        """_get_ssh_client auto-adds host keys and connects with the credentials."""
        hdfs_host = 'hdfs://test.com'
        hdfs_port = '1234'
        username = 'user'
        password = 'pass'
        self.hdi._get_ssh_client(hdfs_host, hdfs_port, username, password)
        set_missing_mocked.assert_called_once_with(AutoAddPolicyMocked.return_value)
        connect_mocked.assert_called_once_with(
            hostname=hdfs_host, port=hdfs_port, username=username, password=password
        )

    @mock.patch('marvin_python_toolbox.management.hive.sys')
    @mock.patch('marvin_python_toolbox.management.hive.logger')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.copy_files')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.delete_files')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter._hdfs_commands')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter._get_ssh_client')
    def test_hdfs_dist_copy(self, ssh_cli_mock, hdfs_comm_mock, del_files_mock, copy_mock, logger_mock, sys_mock):
        """Happy path: matching file counts on both sides, no delete, no abort."""
        # Same count (42) returned for origin and dest listings.
        hdfs_comm_mock.return_value = (42, None)
        copy_mock.return_value = (None, None)
        ssh = mock.MagicMock()
        ssh_cli_mock.return_value = ssh
        force = False
        hdfs_host = 'hdfs://test.com'
        hdfs_port = 1234
        origin = '/home/'
        dest = '/tmp/'
        self.hdi.hdfs_dist_copy(force, hdfs_host, hdfs_port, origin, dest, username=None, password=None)
        ssh_cli_mock.assert_called_once_with(hdfs_host, hdfs_port, None, None)
        del_files_mock.assert_not_called()
        hdfs_comm_mock.assert_any_call(ssh, "hdfs dfs -ls -R '/home/' | grep -E '^-' | wc -l")
        hdfs_comm_mock.assert_any_call(ssh, "hdfs dfs -ls -R '/tmp/' | grep -E '^-' | wc -l")
        logger_mock.debug.assert_not_called()
        sys_mock.exit.assert_not_called()

    @mock.patch('marvin_python_toolbox.management.hive.sys')
    @mock.patch('marvin_python_toolbox.management.hive.logger')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.copy_files')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.delete_files')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter._hdfs_commands')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter._get_ssh_client')
    def test_hdfs_dist_copy_with_force(self, ssh_cli_mock, hdfs_comm_mock, del_files_mock, copy_mock, logger_mock, sys_mock):
        """force=True deletes the destination files before copying."""
        hdfs_comm_mock.return_value = (42, None)
        copy_mock.return_value = (None, None)
        ssh = mock.MagicMock()
        ssh_cli_mock.return_value = ssh
        force = True
        hdfs_host = 'hdfs://test.com'
        hdfs_port = 1234
        origin = '/home/'
        dest = '/tmp/'
        self.hdi.hdfs_dist_copy(force, hdfs_host, hdfs_port, origin, dest, username=None, password=None)
        ssh_cli_mock.assert_called_once_with(hdfs_host, hdfs_port, None, None)
        del_files_mock.assert_called_once_with(ssh, dest)
        hdfs_comm_mock.assert_any_call(ssh, "hdfs dfs -ls -R '/home/' | grep -E '^-' | wc -l")
        hdfs_comm_mock.assert_any_call(ssh, "hdfs dfs -ls -R '/tmp/' | grep -E '^-' | wc -l")
        logger_mock.debug.assert_not_called()
        sys_mock.exit.assert_not_called()

    @mock.patch('marvin_python_toolbox.management.hive.sys')
    @mock.patch('marvin_python_toolbox.management.hive.logger')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.copy_files')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.delete_files')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter._hdfs_commands')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter._get_ssh_client')
    def test_hdfs_dist_copy_error_copy(self, ssh_cli_mock, hdfs_comm_mock, del_files_mock, copy_mock, logger_mock, sys_mock):
        """Mismatched counts after a failed copy log the error and abort."""
        # Origin lists 42 files, destination only 13 -> counts diverge.
        hdfs_comm_mock.side_effect = [(42, None), (13, None)]
        copy_mock.return_value = (None, ['error'])
        ssh = mock.MagicMock()
        ssh_cli_mock.return_value = ssh
        force = False
        hdfs_host = 'hdfs://test.com'
        hdfs_port = 1234
        origin = '/home/'
        dest = '/tmp/'
        self.hdi.hdfs_dist_copy(force, hdfs_host, hdfs_port, origin, dest, username=None, password=None)
        ssh_cli_mock.assert_called_once_with(hdfs_host, hdfs_port, None, None)
        del_files_mock.assert_not_called()
        hdfs_comm_mock.assert_any_call(ssh, "hdfs dfs -ls -R '/home/' | grep -E '^-' | wc -l")
        hdfs_comm_mock.assert_any_call(ssh, "hdfs dfs -ls -R '/tmp/' | grep -E '^-' | wc -l")
        logger_mock.debug.assert_called_once_with('error')
        # "Stoping" (sic) mirrors the production exit message exactly.
        sys_mock.exit.assert_called_once_with("Stoping process!")

    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.clean_ddl')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.create_table')
    @mock.patch('marvin_python_toolbox.management.hive.HiveDataImporter.get_table_format')
    def test_create_external_table(self, table_formar_mock, create_table_mock, clean_ddl_mock):
        """create_external_table rewrites the DDL as EXTERNAL with format+location."""
        table_formar_mock.return_value = 'test'
        conn = None
        temp_table_name = 'temp'
        ddl = "CREATE TABLE bla bla bla"
        parquet_file_location = "/tmp/"
        clean_ddl_mock.return_value = ddl
        self.hdi.create_external_table(conn, temp_table_name, ddl, parquet_file_location)
        table_formar_mock.assert_called_once_with(ddl)
        clean_ddl_mock.assert_called_once_with(ddl, remove_formats=True, remove_general=False)
        ddl = "CREATE EXTERNAL TABLE bla bla bla STORED AS test LOCATION '/tmp/'"
        create_table_mock.assert_called_once_with(conn=conn, table_name=temp_table_name, ddl=ddl)