#!/usr/bin/env python
# coding=utf-8
# Copyright [2019] [Apache Software Foundation]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
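#
# CLI helpers for sampling data from a remote Hive cluster into the Hive
# instance that ships with this toolbox: the sample SQL is materialized in a
# temporary table on the origin, its HDFS files are copied with distcp, and a
# local external table plus a view with the original table name are created on top.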
from __future__ import print_function
import click
import time
import os
import re
import sys
import json
from paramiko import SSHClient, AutoAddPolicy
from pyhive import hive
from slugify import slugify
import hashlib
from .._logging import get_logger
from .._compatibility import six
logger = get_logger('management.hive')
@click.group('hive')
def cli():
pass
@cli.command('hive-generateconf', help='Generate default configuration file')
@click.pass_context
def hive_generateconf_cli(ctx):
hive_generateconf(ctx)
def hive_generateconf(ctx):
default_conf = [{
"origin_host": "xxx_host_name",
"origin_db": "xxx_db_name",
"origin_queue": "marvin",
"target_table_name": "xxx_table_name",
"sample_sql": "SELECT * FROM XXX",
"sql_id": "1"
}]
with open('hive_dataimport.conf', 'w') as outfile:
json.dump(default_conf, outfile, indent=2)
print("Done!!!")
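# Example (assuming the toolbox exposes this group through the `marvin` CLI):
#   marvin hive-generateconf
# writes a hive_dataimport.conf template into the current directory.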
@cli.command('hive-resetremote', help='Drop all remote tables for the informed engine on the host')
@click.option('--host', '-h', default='marvin-hadoop')
@click.option('--queue', '-q', default='default')
@click.option('--engine', default=(os.path.relpath(".", "..")), help='Marvin engine name (default is the current folder)')
@click.pass_context
def hive_resetremote_cli(ctx, host, engine, queue):
hive_resetremote(ctx, host, engine, queue)
def hive_resetremote(ctx, host, engine, queue):
hdi = HiveDataImporter(
engine=engine,
origin_host=host,
origin_queue=queue,
origin_db=None,
target_table_name=None,
sample_sql=None,
max_query_size=None,
destination_host=None,
        destination_port=22,
destination_host_username='vagrant',
destination_host_password='vagrant',
destination_hdfs_root_path='/user/hive/warehouse/',
sql_id=None
)
hdi.reset_remote_tables()
@cli.command(
    'hive-dataimport',
    help='Export and import data samples from a Hive database to the Hive instance running in this toolbox, cloning the same data structure (db and table).')
@click.option('--destination-hdfs-root-path', '-hdfs', default='/user/hive/warehouse/')
@click.option('--destination-host-password', '-p', default='vagrant')
@click.option('--destination-host-username', '-u', default='vagrant')
@click.option('--destination-host', '-dh', default='marvin-hadoop')
@click.option('--destination-port', '-dp', default=22)
@click.option('--max-query-size', '-s', default=(50 * 1024 * 1024), help='Max query size in bytes')
@click.option('--force', is_flag=True, help='Force table creation even if the table already exists in the destination')
@click.option('--force-remote', is_flag=True, help='Force remote temp table creation even if the table already exists in the origin')
@click.option('--validate', is_flag=True, help='Validate the query sample')
@click.option('--force-copy-files', is_flag=True, help='Force the hdfs files copy procedure')
@click.option('--skip-remote-preparation', is_flag=True, help='Skip the creation of remote temp table')
@click.option('--engine', default=(os.path.relpath(".", "..")), help='Marvin engine name (default is the current folder)')
@click.option('--sql-id', '-q', help='If informed, the process will be applied exclusively to this sample sql')
@click.option('--conf', '-c', default='hive_dataimport.conf', help='Hive data import configuration file')
@click.pass_context
def hive_dataimport_cli(
ctx, conf, sql_id, engine, skip_remote_preparation, force_copy_files, validate, force,
force_remote, max_query_size, destination_host, destination_port, destination_host_username,
destination_host_password, destination_hdfs_root_path
):
hive_dataimport(
ctx, conf, sql_id, engine, skip_remote_preparation, force_copy_files, validate, force,
force_remote, max_query_size, destination_host, destination_port, destination_host_username,
destination_host_password, destination_hdfs_root_path
)
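# Example (hypothetical values; assumes the `marvin` CLI entry point):
#   marvin hive-dataimport --conf hive_dataimport.conf --sql-id 1 --validate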
def hive_dataimport(
ctx, conf, sql_id, engine, skip_remote_preparation, force_copy_files, validate, force,
force_remote, max_query_size, destination_host, destination_port, destination_host_username,
destination_host_password, destination_hdfs_root_path
):
initial_start_time = time.time()
confs = read_config(filename=conf)
if confs:
print(chr(27) + "[2J")
if sql_id:
confs = [x for x in confs if x['sql_id'] == sql_id]
for conf in confs:
hdi = HiveDataImporter(
max_query_size=max_query_size,
destination_host=destination_host,
destination_port=destination_port,
destination_host_username=destination_host_username,
destination_host_password=destination_host_password,
destination_hdfs_root_path=destination_hdfs_root_path,
engine=engine,
**conf)
if force:
table_exists = False
else:
table_exists = hdi.table_exists(host=hdi.destination_host, db=hdi.origin_db, table=hdi.target_table_name)
if not table_exists:
hdi.import_sample(
create_temp_table=(not skip_remote_preparation),
copy_files=force_copy_files,
validate_query=validate,
force_create_remote_table=force_remote,
)
else:
                print("Table {} already exists, skipping data import. Use the --force flag to force the data import".format(hdi.full_table_name))
print("Total Time : {:.2f}s".format(time.time() - initial_start_time))
print("\n")
def read_config(filename):
fname = os.path.join("", filename)
if os.path.exists(fname):
with open(fname, 'r') as fp:
return json.load(fp)
else:
        print("Configuration file {} doesn't exist...".format(filename))
return {}
class HiveDataImporter():
def __init__(
self, origin_host, origin_db, origin_queue, target_table_name, sample_sql, engine,
max_query_size, destination_host, destination_port, destination_host_username, destination_host_password,
destination_hdfs_root_path, sql_id
):
self.sql_id = sql_id
self.origin_host = origin_host
self.origin_db = origin_db
self.origin_queue = origin_queue
self.target_table_name = target_table_name
self.sample_sql = sample_sql
self.engine = engine
self.destination_host = destination_host
self.destination_port = destination_port
self.destination_host_username = destination_host_username
self.destination_host_password = destination_host_password
self.destination_hdfs_root_path = destination_hdfs_root_path
self.temp_db_name = 'marvin'
self.max_query_size = max_query_size
self.supported_format_types = {
'TextInputFormat': 'TEXTFILE',
'SequenceFileInputFormat': 'SEQUENCEFILE',
'OrcInputFormat': 'ORC',
'MapredParquetInputFormat': 'PARQUET',
'AvroContainerInputFormat': 'AVRO',
'RCFileInputFormat': 'RCFILE'
}
print("\n------------------------------------------------------------------------------")
print("Initializing process for sql_id [{}]:".format(self.sql_id))
print(" Origin -->")
print(" Host: [{}]".format(self.origin_host))
print(" DataBase: [{}]".format(self.origin_db))
print(" Table Name: [{}]".format(self.target_table_name))
print(" Sample SQL: [{}]".format(self.sample_sql))
print("\n")
print(" Destination -->")
print(" Host: [{}]".format(self.destination_host))
print(" DataBase: [{}]".format(self.origin_db))
print(" Table Name: [{}]".format(self.target_table_name))
print("\n")
    def validate_query(self):
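        # Estimates the size of the sample query result by multiplying its row
        # count by the mean bytes-per-row measured on a small data sample, and
        # accepts the query only if the estimate fits within max_query_size.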
# creating connections
print("Connecting with {} database on {} .. ".format(self.origin_db, self.origin_host))
conn_origin = self.get_connection(host=self.origin_host, db=self.origin_db, queue=self.origin_queue)
print("Counting sample sql ...")
total_rows = self.count_rows(conn=conn_origin, sql=self.sample_sql)
print("Found [{}] rows!".format(total_rows))
        print("Retrieving a data sample for query size estimation...")
data_sample = self.retrieve_data_sample(conn=conn_origin, full_table_name=self.full_table_name)
print("Calculated [{}] bytes per row!".format(data_sample['estimate_query_mean_per_line']))
estimated_size = data_sample['estimate_query_mean_per_line'] * total_rows
        print("Estimated query size is: {} bytes".format(estimated_size))
        print("Max permitted query size is: {} bytes".format(self.max_query_size))
return estimated_size <= self.max_query_size
def table_exists(self, host, db, table):
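        # Returns True only if both the database and the table are found via
        # SHOW DATABASES LIKE / SHOW TABLES LIKE on the given host.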
        print("Verifying if table {}.{} exists on {} ...".format(db, table, host))
local_conn = self.get_connection(host=host)
cursor = local_conn.cursor()
cursor.execute("SHOW DATABASES LIKE '{}'".format(db))
dbs = cursor.fetchall()
self.show_log(cursor)
if not len(dbs) == 1:
table_exists = False
else:
cursor.execute("USE {} ".format(db))
cursor.execute("SHOW TABLES LIKE '{}'".format(table))
tbs = cursor.fetchall()
self.show_log(cursor)
if not len(tbs) == 1:
table_exists = False
else:
table_exists = True
cursor.close()
return table_exists
def reset_remote_tables(self):
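        # Drops every remote temp table whose name starts with this engine's
        # prefix in the 'marvin' database and removes its HDFS directory.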
self.print_start_step(name="Reset Remote Tables for {}".format(self.temp_table_prefix), step_number=1, total_steps=1)
print("Connecting with {} database on {} .. ".format(self.temp_db_name, self.origin_host))
remote_temp_db_conn = self.get_connection(host=self.origin_host, db=self.temp_db_name, queue=self.origin_queue)
cursor = remote_temp_db_conn.cursor()
cursor.execute("SHOW TABLES LIKE '{}*'".format(self.temp_table_prefix))
tbs = cursor.fetchall()
self.show_log(cursor)
cursor.close()
valid_tbs = [tb[0] for tb in tbs]
if valid_tbs:
print("Found {} tables for deletion....".format(len(tbs)))
for tb in valid_tbs:
table_name = "{}.{}".format(self.temp_db_name, tb)
print("Dropping table {} on {} .. ".format(table_name, self.origin_host))
self.drop_table(conn=remote_temp_db_conn, table_name=table_name)
hdfs_location = self.generate_table_location(self.destination_hdfs_root_path, self.origin_host, self.temp_db_name + '.db', tb)
print("Removing hdfs files from {} .. ".format(hdfs_location))
                ssh = self._get_ssh_client(self.origin_host, self.destination_port, self.destination_host_username, self.destination_host_password)
self.delete_files(ssh, hdfs_location)
else:
            print("No tables found! Skipping the reset remote tables process!")
self.print_finish_step()
def print_finish_step(self):
print("\n STEP TAKES {:.4f} (seconds) ".format((time.time() - self.start_time)))
def print_start_step(self, name, step_number, total_steps):
print("\n------------------------------------------------------------------------------")
print("MARVIN DATA IMPORT - STEP ({}) of ({}) - [{}]".format(step_number, total_steps, name))
print("------------------------------------------------------------------------------\n")
self.start_time = time.time()
def import_sample(self, create_temp_table=True, copy_files=True, validate_query=True, force_create_remote_table=False):
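        # Full import pipeline: (1) validate the sample query, (2) fetch the
        # origin DDL and open connections, (3) materialize a remote temp table
        # from the sample SQL, (4) distcp its HDFS files to the local cluster,
        # (5) recreate it locally as an external table, (6) expose it through a
        # view named after the origin table.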
#
#################################################################################
# Step 1 - Query validation
self.print_start_step(name="Query Validation", step_number=1, total_steps=6)
        is_valid = self.validate_query() if validate_query else True
if not is_valid:
print("Informed sample query is not valid!")
self.print_finish_step()
return
self.print_finish_step()
#
##################################################################################
        # Step 2 - Testing remote connections and getting the table schema
self.print_start_step(name="Table Schema Achievement", step_number=2, total_steps=6)
# creating connections
print("Connecting with {} database on {} .. ".format(self.origin_db, self.origin_host))
conn_origin = self.get_connection(host=self.origin_host, db=self.origin_db, queue=self.origin_queue)
print("Connecting with {} database on {} .. ".format(self.temp_db_name, self.origin_host))
remote_temp_db_conn = self.get_connection(host=self.origin_host, db=self.temp_db_name, queue=self.origin_queue)
# getting ddl from real table
print("Getting DDL from {} table ".format(self.target_table_name))
ddl = self.get_createtable_ddl(conn=conn_origin, origin_table_name=self.target_table_name, dest_table_name=self.temp_table_name)
        # validating whether a partition column is used in the query's WHERE clause
        partitions = self.get_partitions(ddl)
        if validate_query and partitions and not self.has_partitions(self.sample_sql, [p['col'] for p in partitions]):
            print("The informed sample query doesn't filter on any partition column in the WHERE clause! Inform at least one partition.")
            print("To skip this validation, run without the --validate flag.")
self.print_finish_step()
return
print("Connecting with DEFAULT database on {} .. ".format(self.destination_host))
local_conn = self.get_connection(host=self.destination_host)
# creating databases if not exists
print("Creating database {} ...".format(self.origin_db))
self.create_database(conn=local_conn, db=self.origin_db)
print("Connecting with {} database on {} .. ".format(self.origin_db, self.destination_host))
local_conn = self.get_connection(host=self.destination_host, db=self.origin_db)
# creating databases if not exists
print("Creating database {} ...".format(self.temp_db_name))
self.create_database(conn=local_conn, db=self.temp_db_name)
print("Connecting with {} database on {} .. ".format(self.temp_db_name, self.destination_host))
local_temp_db_conn = self.get_connection(host=self.destination_host, db=self.temp_db_name)
self.print_finish_step()
#
##################################################################################
# Step 3 - Remote Table Preparation
self.print_start_step(name="Remote Table Preparation", step_number=3, total_steps=6)
if create_temp_table:
if force_create_remote_table:
remote_table_exists = False
else:
remote_table_exists = self.table_exists(host=self.origin_host, db=self.temp_db_name, table=self.temp_table_name)
            # verify if the remote table already exists
if not remote_table_exists:
print("Dropping table {} on {} .. ".format(self.full_temp_table_name, self.origin_host))
self.drop_table(conn=remote_temp_db_conn, table_name=self.full_temp_table_name)
print("Creating table {} on {} .. ".format(self.full_temp_table_name, self.origin_host))
self.create_table(conn=remote_temp_db_conn, table_name=self.full_temp_table_name, ddl=ddl)
# insert from select
print("Populating table {} on {} using informed sample sql.. ".format(self.full_temp_table_name, self.origin_host))
self.populate_table(conn=conn_origin, table_name=self.full_temp_table_name, partitions=partitions, sql=self.sample_sql)
else:
print("Table {} on {} already exists ...".format(self.full_temp_table_name, self.origin_host))
self.print_finish_step()
#
##################################################################################
# Step 4 - Copying remote hdfs files
self.print_start_step(name="Copying HDFS Files", step_number=4, total_steps=6)
# get temp location
print("Getting hdfs files location from {} table ...".format(self.full_temp_table_name))
temp_table_location = self.get_table_location(conn=remote_temp_db_conn, table_name=self.full_temp_table_name)
# copy hdfs files for local hdfs
external_table_location = self.generate_table_location(
host=self.destination_host,
root_path=self.destination_hdfs_root_path,
db_name=self.temp_db_name, table_name=self.temp_table_name)
print("Copying files from [{}] to [{}]".format(temp_table_location, external_table_location))
self.hdfs_dist_copy(force=copy_files,
hdfs_host=self.destination_host,
hdfs_port=self.destination_port,
origin=temp_table_location,
dest=external_table_location,
password=self.destination_host_password,
username=self.destination_host_username)
self.print_finish_step()
#
##################################################################################
# Step 5 - External table creation using hdfs files
self.print_start_step(name="Local Temporary Table Creation", step_number=5, total_steps=6)
# creating external table using parquet files in hdfs
print("Dropping temp table {} on {} .. ".format(self.full_temp_table_name, self.destination_host))
self.drop_table(conn=local_temp_db_conn, table_name=self.full_temp_table_name)
# create temp table
print("Creating temp table {} using imported hdfs files from [{}] ...".format(self.full_temp_table_name, external_table_location))
self.create_external_table(conn=local_temp_db_conn,
temp_table_name=self.full_temp_table_name,
ddl=ddl,
parquet_file_location=external_table_location)
print("Refreshing table {} partitions on {} ..".format(self.full_temp_table_name, self.destination_host))
self.refresh_partitions(conn=local_temp_db_conn, table_name=self.full_temp_table_name)
self.print_finish_step()
#
##################################################################################
# Step 6 - Destination table creation from external table
self.print_start_step(name="Table population", step_number=6, total_steps=6)
# create view
print("Dropping table view {} on {} .. ".format(self.full_table_name, self.destination_host))
self.drop_view(conn=local_conn, view_name=self.full_table_name)
        print("Creating table view {} on {} ... ".format(self.full_table_name, self.destination_host))
self.create_view(conn=local_conn, view_name=self.full_table_name, table_name=self.full_temp_table_name)
self.print_finish_step()
print("Procedure done!!!!")
@property
def temp_table_prefix(self):
return "{}".format(slugify(self.engine).replace('-', '_'))
@property
def temp_table_name(self):
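        # <engine>_<db>_<table>_<sha1 of the slugified sample SQL>, so each
        # distinct sample query gets its own temp table.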
return "{}_{}_{}_{}".format(
self.temp_table_prefix,
self.origin_db,
self.target_table_name,
hashlib.sha1(slugify(self.sample_sql).encode('utf-8')).hexdigest()
)
@property
def full_table_name(self):
return "{}.{}".format(self.origin_db, self.target_table_name)
@property
def full_temp_table_name(self):
return "{}.{}".format(self.temp_db_name, self.temp_table_name)
def generate_table_location(self, root_path, host, db_name, table_name):
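        # Builds the table's HDFS URL; assumes the NameNode RPC port is 8020.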
return "hdfs://{}:8020{}".format(host, os.path.join(root_path, db_name, table_name))
def clean_ddl(self, ddl, remove_formats=True, remove_general=True):
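        # Strips LOCATION, TBLPROPERTIES and SERDEPROPERTIES clauses (and, when
        # remove_formats is True, the STORED AS INPUTFORMAT/OUTPUTFORMAT
        # clauses) from a SHOW CREATE TABLE statement so it can be replayed on
        # another cluster.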
if remove_general:
# Removing LOCATION statement
            regex = r"(LOCATION\s+'(.*?)')"
            result = re.search(regex, ddl)
            ddl = ddl.replace(result.group(1), " ") if result else ddl
            # Removing TBLPROPERTIES statement
            regex = r"(TBLPROPERTIES\s+(.*?)\))"
            result = re.search(regex, ddl)
            ddl = ddl.replace(result.group(1), " ") if result else ddl
            # Removing WITH SERDEPROPERTIES statement
            regex = r"(WITH SERDEPROPERTIES\s+(.*?)\))"
            result = re.search(regex, ddl)
            ddl = ddl.replace(result.group(1), " ") if result else ddl
        if remove_formats:
            # Removing STORED AS INPUTFORMAT statement
            regex = r"(STORED AS INPUTFORMAT\s+'(.*?)')"
            result = re.search(regex, ddl)
            ddl = ddl.replace(result.group(1), " ") if result else ddl
            # Removing OUTPUTFORMAT statement
            regex = r"(OUTPUTFORMAT\s+'(.*?)')"
            result = re.search(regex, ddl)
            ddl = ddl.replace(result.group(1), " ") if result else ddl
return ddl
def get_table_format(self, ddl):
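        # Maps the InputFormat class named in the DDL to the matching
        # STORED AS keyword (e.g. MapredParquetInputFormat -> PARQUET).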
        regex = r"(STORED AS INPUTFORMAT\s+'(.*?)')"
result = re.search(regex, ddl)
input_format = result.group(2)
return self.supported_format_types[input_format.split(".")[-1]]
def get_database_info(self, ddl):
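        # Parses the `db`.`table` (or just `table`) identifier out of a
        # CREATE TABLE statement.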
        regex = r"CREATE TABLE `((.*?)\.)?(.*?)`\("
result = re.search(regex, ddl)
if result:
groups = result.groups()
if groups[0]:
# found db name
return {'db': groups[1], 'table': groups[2]}
else:
                return {'db': None, 'table': groups[2]}
return {'db': None, 'table': None}
def get_createtable_ddl(self, conn, origin_table_name, dest_table_name):
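        # Fetches the SHOW CREATE TABLE output, strips environment-specific
        # clauses and renames the table so the DDL can create the temp table.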
cursor = conn.cursor()
cursor.execute("SHOW CREATE TABLE " + origin_table_name)
_lines = [_line[0] for _line in cursor.fetchall()]
ddl = ''.join(_lines)
ddl = self.clean_ddl(ddl, remove_formats=False, remove_general=True)
ddl = ddl.replace(origin_table_name, dest_table_name)
cursor.close()
return ddl
def create_database(self, conn, db):
self._execute_db_command(conn, "CREATE DATABASE IF NOT EXISTS " + db)
def drop_table(self, conn, table_name):
self._execute_db_command(conn, 'DROP TABLE IF EXISTS ' + table_name)
def drop_view(self, conn, view_name):
        self._execute_db_command(conn, 'DROP VIEW IF EXISTS ' + view_name)
def create_table(self, conn, table_name, ddl):
self._execute_db_command(conn, ddl)
def _execute_db_command(self, conn, command):
cursor = conn.cursor()
cursor.execute(command)
self.show_log(cursor)
cursor.close()
def get_connection(self, host, db='DEFAULT', queue='default'):
return hive.connect(host=host,
database=db,
                            configuration={'mapred.job.queue.name': queue,
                                           'hive.exec.dynamic.partition.mode': 'nonstrict'})
def retrieve_data_sample(self, conn, full_table_name, sample_limit=100):
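        # Fetches up to sample_limit rows with TABLESAMPLE and uses
        # sys.getsizeof on the fetched rows to estimate the mean bytes per row.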
cursor = conn.cursor()
sql = "SELECT * FROM {} TABLESAMPLE ({} ROWS)".format(full_table_name, sample_limit)
cursor.execute(sql)
data_header = [{'col': line[0].split('.')[1], 'table': line[0].split('.')[0], 'type': line[1]} for line in cursor.description]
data = [row for row in cursor.fetchall()]
self.show_log(cursor)
cursor.close()
return {'data_header': data_header,
'total_lines': len(data),
'data': data,
'estimate_query_size': sys.getsizeof(data),
'estimate_query_mean_per_line': sys.getsizeof(data) / len(data)}
def count_rows(self, conn, sql):
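        # Rewrites the sample SQL as SELECT COUNT(1) from its last FROM clause onwards.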
cursor = conn.cursor()
cursor.execute("SELECT COUNT(1) " + sql[sql.upper().rfind("FROM"):])
size = cursor.fetchone()[0]
self.show_log(cursor)
cursor.close()
return size
def show_log(self, cursor):
for l in cursor.fetch_logs():
logger.debug(l)
def save_data(self, conn, table, data):
cursor = conn.cursor()
print('Inserting {} rows in {} table...'.format(data['total_lines'], table))
cols = [v['col'] for v in data['data_header']]
dml = "INSERT INTO {0} ({1}) VALUES ({2})".format(table, ", ".join(cols), ", ".join(['%s' for col in cols]))
cursor.executemany(dml, [(v,) for v in data['data'][1:10]])
self.show_log(cursor)
cursor.close()
def get_partitions(self, ddl):
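        # Parses the PARTITIONED BY clause of the DDL into a list of
        # {'col': ..., 'type': ...} dicts (empty list if not partitioned).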
        regex = r"(PARTITIONED BY\s+\((.*?)\))"
result = re.search(regex, ddl)
if result:
p_cols = result.group(2).strip().replace('`', '').split(",")
return [{'col': p_col.split()[0], 'type': p_col.split()[1]} for p_col in p_cols]
else:
return []
def has_partitions(self, sql, partitions):
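        # True when the WHERE clause of the sample SQL references at least one
        # of the given partition columns (simple case-insensitive regex check).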
regex = "WHERE(.*?)(" + "|".join(partitions).upper() + ")"
result = re.search(regex, sql.upper())
if result:
return True
else:
return False
def populate_table(self, conn, table_name, partitions, sql):
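        # INSERT OVERWRITE the temp table from the sample SQL, using dynamic
        # partitioning when the origin table is partitioned.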
partitions = [p['col'] for p in partitions]
partitions_statement = "PARTITION ({})".format(", ".join(partitions)) if partitions else ""
dml = "INSERT OVERWRITE TABLE {0} {1} {2}".format(table_name, partitions_statement, sql)
self._execute_db_command(conn, dml)
def create_view(self, conn, view_name, table_name):
dml = "CREATE VIEW {0} AS SELECT * FROM {1}".format(view_name, table_name)
self._execute_db_command(conn, dml)
def refresh_partitions(self, conn, table_name):
refresh_statement = "MSCK REPAIR TABLE {0}".format(table_name)
self._execute_db_command(conn, refresh_statement)
def get_table_location(self, conn, table_name):
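        # Reads the LOCATION from DESCRIBE FORMATTED and swaps hdfs:// for
        # hftp:// so the files can be read by distcp from the origin cluster.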
cursor = conn.cursor()
cursor.execute("DESCRIBE FORMATTED {}".format(table_name))
location = [key[1].strip() for key in cursor.fetchall() if key[0] and key[0].strip().upper() == 'LOCATION:']
location = location[0].replace('hdfs://', 'hftp://')
cursor.close()
return location
def delete_files(self, ssh, url):
cmd = "hdfs dfs -rm -R '{}'".format(url)
self._hdfs_commands(ssh, cmd)
def copy_files(self, ssh, origin, dest):
cmd = "hadoop distcp --update '{}' '{}'".format(origin, dest)
return self._hdfs_commands(ssh, cmd)
def _hdfs_commands(self, ssh, cmd):
logger.debug("Executing remote command: {}".format(cmd))
i, o, e = ssh.exec_command(cmd)
errors = e.readlines()
output = o.readlines()
logger.debug(output)
logger.debug(errors)
return output, errors
def _get_ssh_client(self, hdfs_host, hdfs_port, username, password):
ssh = SSHClient()
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect(hostname=hdfs_host, port=hdfs_port, username=username, password=password, )
return ssh
def hdfs_dist_copy(self, force, hdfs_host, hdfs_port, origin, dest, username=None, password=None):
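        # Runs `hadoop distcp` on the destination host over SSH and validates
        # the copy by comparing recursive file counts between origin and dest.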
# connecting with hdfs host
ssh = self._get_ssh_client(hdfs_host, hdfs_port, username, password)
if force:
            print("Removing old hdfs files from the destination before copying.")
            # delete files from dest
            self.delete_files(ssh, dest)
        else:
            print("Reusing existing hdfs files in the destination. If you need to copy the files again, use the --force-copy-files flag.")
# copy files from origin to destination
_, copy_errors = self.copy_files(ssh, origin, dest)
# validate copy
cmd_template = "hdfs dfs -ls -R '{}' | grep -E '^-' | wc -l"
cmd = cmd_template.format(origin)
result1, _ = self._hdfs_commands(ssh, cmd)
cmd = cmd_template.format(dest)
result2, _ = self._hdfs_commands(ssh, cmd)
if result1 == result2:
            print("Files ({}) successfully transferred!!".format(result1[0].strip() if result1 else "0"))
else:
            print("Errors during the hdfs file copy process!!")
            for e_l in copy_errors:
                logger.debug(e_l)
            sys.exit("Stopping process!")
def create_external_table(self, conn, temp_table_name, ddl, parquet_file_location):
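        # Rebuilds the cleaned DDL as CREATE EXTERNAL TABLE ... STORED AS
        # <format> LOCATION '<copied files>' so the local table reads the
        # imported HDFS files in place.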
format_type = self.get_table_format(ddl)
ddl = self.clean_ddl(ddl, remove_formats=True, remove_general=False)
ddl = ddl.replace("CREATE TABLE", "CREATE EXTERNAL TABLE")
ddl = "{} STORED AS {} LOCATION '{}'".format(ddl, format_type, parquet_file_location)
self.create_table(conn=conn, table_name=temp_table_name, ddl=ddl)