# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#####################################################################
# This is a snapshot of dump.py from the 4.2 main branch as of March 5, 2015.
# It is used solely by analyzedb as part of porting analyzedb to HAWQ.
# We chose not to refresh dump.py because other utilities may rely on the
# old version of dump.py.
#####################################################################
import os
import tempfile
import time
from datetime import datetime
import shutil
import sys
import gppylib
import gppylib.operations.backup_utils as backup_utils
from gppylib import gplog
from gppylib.db import dbconn
from gppylib.db.dbconn import execSQL, execSQLForSingleton, UnexpectedRowsError
from gppylib.commands.base import Command, REMOTE, ExecutionError
from gppylib.commands.gp import Psql
from gppylib.commands.unix import getUserName, findCmdInPath, curr_platform, SUNOS
from gppylib.gparray import GpArray
from gppylib.mainUtils import ExceptionNoStackTraceNeeded
from gppylib.operations import Operation
from gppylib.operations.unix import CheckDir, CheckFile, ListFiles, ListFilesByPattern, MakeDir, RemoveFile, RemoveTree, RemoveRemoteTree
from gppylib.operations.utils import RemoteOperation, ParallelOperation
from gppylib.operations.backup_utils import write_lines_to_file, verify_lines_in_file, \
get_lines_from_file, DUMP_DIR, get_incremental_ts_from_report_file, \
generate_ao_state_filename, generate_co_state_filename, generate_report_filename,\
generate_increments_filename, generate_dirtytable_filename, generate_partition_list_filename, \
generate_pgstatlastoperation_filename, validate_timestamp, get_latest_report_timestamp, \
create_temp_file_from_list, execute_sql, generate_master_config_filename, \
generate_segment_config_filename, generate_global_prefix, generate_master_dbdump_prefix, \
generate_master_status_prefix, generate_seg_dbdump_prefix, generate_seg_status_prefix, \
generate_dbdump_prefix, generate_createdb_filename, generate_filter_filename, get_latest_full_dump_timestamp
logger = gplog.get_default_logger()
# MPP-15307
# DUMP_DATE dictates the db_dumps/ subdirectory to which gpcrondump will dump.
# It is computed just once to ensure different pieces of logic herein operate on the same subdirectory.
TIMESTAMP = datetime.now()
TIMESTAMP_KEY = TIMESTAMP.strftime("%Y%m%d%H%M%S")
DUMP_DATE = TIMESTAMP.strftime("%Y%m%d")
COMPRESSION_FACTOR = 12 # TODO: Where did 12 come from?
INJECT_GP_DUMP_FAILURE = None
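# Selects every user table that actually holds data: non-partitioned tables plus
# leaf partitions. Intermediate and root partition levels, external tables, and
# pg_temp_* schemas are excluded.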
GET_ALL_DATATABLES_SQL = """
SELECT ALLTABLES.oid, ALLTABLES.schemaname, ALLTABLES.tablename FROM
(SELECT c.oid, n.nspname AS schemaname, c.relname AS tablename FROM pg_class c, pg_namespace n
WHERE n.oid = c.relnamespace) as ALLTABLES,
(SELECT n.nspname AS schemaname, c.relname AS tablename
FROM pg_class c LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
LEFT JOIN pg_tablespace t ON t.oid = c.reltablespace
WHERE c.relkind = 'r'::"char" AND c.oid > 16384 AND (c.relnamespace > 16384 or n.nspname = 'public')
EXCEPT
((SELECT x.schemaname, x.partitiontablename FROM
(SELECT distinct schemaname, tablename, partitiontablename, partitionlevel FROM pg_partitions) as X,
(SELECT schemaname, tablename maxtable, max(partitionlevel) maxlevel FROM pg_partitions group by (tablename, schemaname)) as Y
WHERE x.schemaname = y.schemaname and x.tablename = Y.maxtable and x.partitionlevel != Y.maxlevel)
UNION (SELECT distinct schemaname, tablename FROM pg_partitions))) as DATATABLES
WHERE ALLTABLES.schemaname = DATATABLES.schemaname and ALLTABLES.tablename = DATATABLES.tablename AND ALLTABLES.oid not in (select reloid from pg_exttable) AND ALLTABLES.schemaname NOT LIKE 'pg_temp_%'
"""
GET_ALL_USER_TABLES_SQL = """
SELECT n.nspname AS schemaname, c.relname AS tablename
FROM pg_class c LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
LEFT JOIN pg_tablespace t ON t.oid = c.reltablespace
WHERE c.relkind = 'r'::"char" AND c.oid > 16384 AND (c.relnamespace > 16384 or n.nspname = 'public')
"""
GET_APPENDONLY_DATA_TABLE_INFO_SQL = """
SELECT ALL_DATA_TABLES.oid, ALL_DATA_TABLES.schemaname, ALL_DATA_TABLES.tablename, OUTER_PG_CLASS.relname as tupletable FROM
(%s) as ALL_DATA_TABLES, pg_appendonly, pg_class OUTER_PG_CLASS
WHERE ALL_DATA_TABLES.oid = pg_appendonly.relid
AND OUTER_PG_CLASS.oid = pg_appendonly.segrelid
""" % GET_ALL_DATATABLES_SQL
GET_ALL_AO_DATATABLES_SQL = """
%s AND pg_appendonly.columnstore = 'f'
""" % GET_APPENDONLY_DATA_TABLE_INFO_SQL
GET_ALL_CO_DATATABLES_SQL = """
%s AND pg_appendonly.columnstore = 't'
""" % GET_APPENDONLY_DATA_TABLE_INFO_SQL
GET_ALL_HEAP_DATATABLES_SQL = """
%s AND ALLTABLES.oid not in (SELECT relid from pg_appendonly)
""" % GET_ALL_DATATABLES_SQL
GET_ALL_AO_CO_DATATABLES_SQL = """
%s AND ALLTABLES.oid in (SELECT relid from pg_appendonly)
""" % GET_ALL_DATATABLES_SQL
GET_LAST_OPERATION_SQL = """
SELECT PGN.nspname, PGC.relname, objid, staactionname, stasubtype, statime FROM pg_stat_last_operation, pg_class PGC, pg_namespace PGN
WHERE objid = PGC.oid
AND PGC.relnamespace = PGN.oid
AND staactionname IN ('CREATE', 'ALTER', 'TRUNCATE')
AND objid IN (SELECT oid FROM (%s) as AOCODATATABLES)
ORDER BY objid, staactionname
""" % GET_ALL_AO_CO_DATATABLES_SQL
def generate_dump_timestamp(timestamp):
global TIMESTAMP
global TIMESTAMP_KEY
global DUMP_DATE
if timestamp is not None:
TIMESTAMP = timestamp
else:
TIMESTAMP = datetime.now()
TIMESTAMP_KEY = TIMESTAMP.strftime("%Y%m%d%H%M%S")
DUMP_DATE = TIMESTAMP.strftime("%Y%m%d")
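# The AO/CO "partition state" is a list of "schema, table, tupcount" lines for every
# append-only (row- or column-oriented) data table, taken from the pg_aoseg catalog.
# It is written to a state file and diffed on the next incremental backup.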
def get_ao_partition_state(master_port, dbname):
ao_partition_info = get_ao_partition_list(master_port, dbname)
ao_partition_list = get_partition_state(master_port, dbname, 'pg_aoseg', ao_partition_info)
return ao_partition_list
def get_co_partition_state(master_port, dbname):
co_partition_info = get_co_partition_list(master_port, dbname)
co_partition_list = get_partition_state(master_port, dbname, 'pg_aoseg', co_partition_info)
return co_partition_list
def validate_tuple_count(schema, tablename, cnt):
if not cnt:
return
if not cnt.isdigit():
raise Exception("Can not convert tuple count for table. Possibly exceeded backup max tuple count of 1 quadrillion rows per table for: '%s.%s' '%s'" % (schema, tablename, cnt))
if len(cnt) > 15:
raise Exception("Exceeded backup max tuple count of 1 quadrillion rows per table for: '%s.%s' '%s'" % (schema, tablename, cnt))
def get_partition_state(master_port, dbname, catalog_schema, partition_info):
partition_list = list()
dburl = dbconn.DbURL(port=master_port, dbname=dbname)
num_sqls = 0
with dbconn.connect(dburl) as conn:
for (oid, schemaname, partition_name, tupletable) in partition_info:
tuple_count_sql = "select to_char(sum(tupcount::bigint), '999999999999999999999') from %s.%s" % (catalog_schema, tupletable)
try:
tuple_count = execSQLForSingleton(conn, tuple_count_sql)
except Exception as e:
logger.error(str(e))
raise Exception('Query to obtain tuple count failed for the table: %s.%s having oid: %s and tuple table: %s' % (schemaname, partition_name, oid, tupletable))
num_sqls += 1
            if num_sqls == 1000: # The batch size of 1000 was chosen arbitrarily
logger.debug('Completed executing batch of 1000 tuple count SQLs')
conn.commit()
num_sqls = 0
if tuple_count:
tuple_count = tuple_count.strip()
validate_tuple_count(schemaname, partition_name, tuple_count)
partition_list.append('%s, %s, %s' %(schemaname, partition_name, tuple_count))
return partition_list
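# A table is considered metadata-dirty if its pg_stat_last_operation entries differ
# from those recorded at the previous (full or incremental) dump.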
def get_tables_with_dirty_metadata(master_datadir, backup_dir, full_timestamp, cur_pgstatoperations):
last_dump_timestamp = get_last_dump_timestamp(master_datadir, backup_dir, full_timestamp)
old_pgstatoperations_file = generate_pgstatlastoperation_filename(master_datadir, backup_dir, last_dump_timestamp)
old_pgstatoperations = get_lines_from_file(old_pgstatoperations_file)
old_pgstatoperations_dict = get_pgstatlastoperations_dict(old_pgstatoperations)
dirty_tables = compare_metadata(old_pgstatoperations_dict, cur_pgstatoperations)
return dirty_tables
def get_dirty_partition_tables(table_type, curr_state_partition_list, master_datadir, backup_dir, full_timestamp):
last_state_partition_list = get_last_state(table_type, master_datadir, backup_dir, full_timestamp)
last_state_dict = create_partition_dict(last_state_partition_list)
curr_state_dict = create_partition_dict(curr_state_partition_list)
return compare_dict(last_state_dict, curr_state_dict)
def get_last_state(table_type, master_datadir, backup_dir, full_timestamp):
last_ts = get_last_dump_timestamp(master_datadir, backup_dir, full_timestamp)
last_state_filename = get_filename_from_filetype(table_type, master_datadir, backup_dir, last_ts.strip())
if not os.path.isfile(last_state_filename):
raise Exception('%s state file does not exist: %s' % (table_type, last_state_filename))
return get_lines_from_file(last_state_filename)
def compare_metadata(old_pgstatoperations, cur_pgstatoperations):
diffs = set()
for operation in cur_pgstatoperations:
toks = operation.split(',')
if len(toks) != 6:
raise Exception('Wrong number of tokens in last_operation data for current backup: "%s"' % operation)
if (toks[2], toks[3]) not in old_pgstatoperations or old_pgstatoperations[(toks[2], toks[3])] != operation:
tname = '%s.%s' % (toks[0], toks[1])
diffs.add(tname)
return diffs
def get_pgstatlastoperations_dict(last_operations):
last_operations_dict = {}
for operation in last_operations:
toks = operation.split(',')
if len(toks) != 6:
raise Exception('Wrong number of tokens in last_operation data for last backup: "%s"' % operation)
last_operations_dict[(toks[2], toks[3])] = operation
return last_operations_dict
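# Return the timestamp of the most recent dump listed in the increments file, falling
# back to the full-dump timestamp when no increments file exists yet.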
def get_last_dump_timestamp(master_datadir, backup_dir, full_timestamp):
increments_filename = generate_increments_filename(master_datadir, backup_dir, full_timestamp)
if not os.path.isfile(increments_filename):
return full_timestamp
lines = get_lines_from_file(increments_filename)
if not lines:
raise Exception("increments file exists but is empty: '%s'" % increments_filename)
ts = lines[-1].strip()
if not validate_timestamp(ts):
raise Exception("get_last_dump_timestamp found invalid ts in file '%s': '%s'" % (increments_filename, ts))
return ts
def create_partition_dict(partition_list):
table_dict = dict()
for partition in partition_list:
fields = partition.split(',')
if len(fields) != 3:
raise Exception('Invalid state file format %s' % partition)
key = '%s.%s' % (fields[0].strip(), fields[1].strip())
table_dict[key] = fields[2].strip()
return table_dict
def compare_dict(last_dict, curr_dict):
diffkeys = set()
for k in curr_dict:
if k not in last_dict or (curr_dict[k] != last_dict[k]):
diffkeys.add(k)
return diffkeys
def get_filename_from_filetype(table_type, master_datadir, backup_dir, timestamp_key=None):
if timestamp_key is None:
timestamp_key = TIMESTAMP_KEY
if table_type == 'ao':
filename = generate_ao_state_filename(master_datadir, backup_dir, timestamp_key)
elif table_type == 'co':
filename = generate_co_state_filename(master_datadir, backup_dir, timestamp_key)
else:
raise Exception('Invalid table type %s provided. Supported table types ao/co.' % table_type)
return filename
def write_state_file(table_type, master_datadir, backup_dir, partition_list):
filename = get_filename_from_filetype(table_type, master_datadir, backup_dir)
write_lines_to_file(filename, partition_list)
verify_lines_in_file(filename, partition_list)
# return a list of dirty tables
def get_dirty_tables(master_port, dbname, master_datadir, backup_dir, fulldump_ts,
ao_partition_list, co_partition_list, last_operation_data):
dirty_heap_tables = get_dirty_heap_tables(master_port, dbname)
dirty_ao_tables = get_dirty_partition_tables('ao', ao_partition_list, master_datadir,
backup_dir, fulldump_ts)
dirty_co_tables = get_dirty_partition_tables('co', co_partition_list, master_datadir,
backup_dir, fulldump_ts)
dirty_metadata_set = get_tables_with_dirty_metadata(master_datadir, backup_dir,
fulldump_ts, last_operation_data)
return list(dirty_heap_tables | dirty_ao_tables | dirty_co_tables | dirty_metadata_set)
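# Heap tables have no recorded state to compare, so every user heap table is treated
# as dirty on an incremental backup; only AO/CO tables are tracked for changes.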
def get_dirty_heap_tables(master_port, dbname):
dirty_tables = set()
qresult = get_heap_partition_list(master_port, dbname)
for row in qresult:
if len(row) != 3:
raise Exception("Heap tables query returned rows with unexpected number of columns %d" % len(row))
tname = '%s.%s' % (row[1], row[2])
dirty_tables.add(tname)
return dirty_tables
def write_dirty_file_to_temp(dirty_tables):
return create_temp_file_from_list(dirty_tables, 'dirty_backup_list_')
def write_dirty_file(mdd, dirty_tables, backup_dir, timestamp_key=None):
if timestamp_key is None:
timestamp_key = TIMESTAMP_KEY
if dirty_tables is None:
return None
dirty_list_file = generate_dirtytable_filename(mdd, backup_dir, timestamp_key)
write_lines_to_file(dirty_list_file, dirty_tables)
verify_lines_in_file(dirty_list_file, dirty_tables)
return dirty_list_file
def get_heap_partition_list(master_port, dbname):
return execute_sql(GET_ALL_HEAP_DATATABLES_SQL, master_port, dbname)
def get_ao_partition_list(master_port, dbname):
partition_list = execute_sql(GET_ALL_AO_DATATABLES_SQL, master_port, dbname)
for line in partition_list:
if len(line) != 4:
raise Exception('Invalid results from query to get all AO tables: [%s]' % (','.join(line)))
return partition_list
def get_co_partition_list(master_port, dbname):
partition_list = execute_sql(GET_ALL_CO_DATATABLES_SQL, master_port, dbname)
for line in partition_list:
if len(line) != 4:
raise Exception('Invalid results from query to get all CO tables: [%s]' % (','.join(line)))
return partition_list
def get_partition_list(master_port, dbname):
return execute_sql(GET_ALL_DATATABLES_SQL, master_port, dbname)
def get_user_table_list(master_port, dbname):
return execute_sql(GET_ALL_USER_TABLES_SQL, master_port, dbname)
def get_last_operation_data(master_port, dbname):
# oid, action, subtype, timestamp
rows = execute_sql(GET_LAST_OPERATION_SQL, master_port, dbname)
data = []
for row in rows:
if len(row) != 6:
raise Exception("Invalid return from query in get_last_operation_data: % cols" % (len(row)))
line = "%s,%s,%d,%s,%s,%s" % (row[0], row[1], row[2], row[3], row[4], row[5])
data.append(line)
return data
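# Write the table list for this dump: if a filter file from the latest full dump
# exists it is copied verbatim, otherwise the list of all data-holding tables is
# generated from the catalog.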
def write_partition_list_file(master_datadir, backup_dir, timestamp_key, master_port, dbname):
filter_file = get_filter_file(dbname, master_datadir, backup_dir)
partition_list_file_name = generate_partition_list_filename(master_datadir, backup_dir, timestamp_key)
if filter_file:
shutil.copyfile(filter_file, partition_list_file_name)
verify_lines_in_file(partition_list_file_name, get_lines_from_file(filter_file))
else:
lines_to_write = get_partition_list(master_port, dbname)
partition_list = []
for line in lines_to_write:
if len(line) != 3:
raise Exception('Invalid results from query to get all tables: [%s]' % (','.join(line)))
partition_list.append("%s.%s" % (line[1], line[2]))
write_lines_to_file(partition_list_file_name, partition_list)
verify_lines_in_file(partition_list_file_name, partition_list)
def write_last_operation_file(master_datadir, backup_dir, rows, timestamp_key=None):
if timestamp_key is None:
timestamp_key = TIMESTAMP_KEY
filename = generate_pgstatlastoperation_filename(master_datadir, backup_dir, timestamp_key)
write_lines_to_file(filename, rows)
verify_lines_in_file(filename, rows)
def validate_current_timestamp(backup_dir, current=None):
if current is None:
current = TIMESTAMP_KEY
latest = get_latest_report_timestamp(backup_dir)
if not latest:
return
if latest >= current:
raise Exception('There is a future dated backup on the system preventing new backups')
def get_backup_dir(master_datadir, backup_dir):
if backup_dir:
return backup_dir
return master_datadir
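# For every partitioned table named in the filter file, add all of its child
# partitions to the filter file as well.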
def update_filter_file(dump_database, master_datadir, backup_dir, master_port):
filter_filename = get_filter_file(dump_database, master_datadir, backup_dir)
filter_tables = get_lines_from_file(filter_filename)
tables_sql = "SELECT DISTINCT schemaname||'.'||tablename FROM pg_partitions";
partitions_sql = "SELECT schemaname||'.'||partitiontablename FROM pg_partitions WHERE schemaname||'.'||tablename='%s';"
table_list = execute_sql(tables_sql, master_port, dump_database)
for table in table_list:
if table[0] in filter_tables:
partitions_list = execute_sql(partitions_sql % table[0], master_port, dump_database)
filter_tables.extend([x[0] for x in partitions_list])
write_lines_to_file(filter_filename, list(set(filter_tables)))
def get_filter_file(dump_database, master_datadir, backup_dir):
timestamp = get_latest_full_dump_timestamp(dump_database, get_backup_dir(master_datadir, backup_dir))
filter_file = generate_filter_filename(master_datadir, backup_dir, timestamp)
if os.path.isfile(filter_file):
return filter_file
return None
def filter_dirty_tables(dirty_tables, dump_database, master_datadir, backup_dir):
timestamp = get_latest_full_dump_timestamp(dump_database, get_backup_dir(master_datadir, backup_dir))
filter_file = get_filter_file(dump_database, master_datadir, backup_dir)
if filter_file:
tables_to_filter = get_lines_from_file(filter_file)
dirty_copy = dirty_tables[:]
for table in dirty_copy:
if table not in tables_to_filter:
dirty_tables.remove(table)
return dirty_tables
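# Validates the dump request, builds the gp_dump command line (adding filter, prefix,
# compression, encoding and incremental options as needed), runs it, and returns the
# start/end times and exit status.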
class DumpDatabase(Operation):
    # TODO: Very verbose constructor; room for improvement. In the parent constructor, we could use
    # kwargs to automatically take in all arguments and perhaps do some data type validation.
def __init__(self, dump_database, dump_schema, include_dump_tables, exclude_dump_tables, include_dump_tables_file, exclude_dump_tables_file, backup_dir, free_space_percent, compress, clear_catalog_dumps, encoding, output_options, batch_default, master_datadir, master_port, dump_dir, incremental=False):
self.dump_database = dump_database
self.dump_schema = dump_schema
self.include_dump_tables = include_dump_tables
self.exclude_dump_tables = exclude_dump_tables
self.include_dump_tables_file = include_dump_tables_file,
self.exclude_dump_tables_file = exclude_dump_tables_file,
self.backup_dir = backup_dir
self.free_space_percent = free_space_percent
self.compress = compress
self.clear_catalog_dumps = clear_catalog_dumps
self.encoding = encoding
self.output_options = output_options
self.batch_default = batch_default
self.master_datadir = master_datadir
self.master_port = master_port
self.dump_dir = dump_dir
self.incremental = incremental
def execute(self):
self.exclude_dump_tables = ValidateDumpDatabase(dump_database = self.dump_database,
dump_schema = self.dump_schema,
include_dump_tables = self.include_dump_tables,
exclude_dump_tables = self.exclude_dump_tables,
include_dump_tables_file = self.include_dump_tables_file[0],
exclude_dump_tables_file = self.exclude_dump_tables_file[0],
backup_dir = self.backup_dir,
free_space_percent = self.free_space_percent,
compress = self.compress,
batch_default = self.batch_default,
master_datadir = self.master_datadir,
master_port = self.master_port,
dump_dir = self.dump_dir,
incremental = self.incremental).run()
if self.incremental and backup_utils.dump_prefix \
and get_filter_file(self.dump_database, self.master_datadir, self.backup_dir):
filtered_dump_line = self.create_filtered_dump_string(getUserName(), DUMP_DATE, TIMESTAMP_KEY)
(start, end, rc) = self.perform_dump('Dump process', filtered_dump_line)
return self.create_dump_outcome(start, end, rc)
dump_line = self.create_dump_string(getUserName(), DUMP_DATE, TIMESTAMP_KEY)
(start, end, rc) = self.perform_dump('Dump process', dump_line)
if backup_utils.dump_prefix and self.include_dump_tables_file and not self.incremental:
self.create_filter_file()
return self.create_dump_outcome(start, end, rc)
def perform_dump(self, title, dump_line):
logger.info("%s command line %s" % (title, dump_line))
logger.info("Starting %s" % title)
start = TIMESTAMP
cmd = Command('Invoking gp_dump', dump_line)
cmd.run()
rc = cmd.get_results().rc
if INJECT_GP_DUMP_FAILURE is not None:
rc = INJECT_GP_DUMP_FAILURE
if rc != 0:
logger.warn("%s returned exit code %d" % (title, rc))
else:
logger.info("%s returned exit code 0" % title)
end = datetime.now()
return (start, end, rc)
    # If an include-table file (-t) was given, copy it verbatim to become the filter file.
    # If an exclude-table file (-T) was given, the filter file is the list of all user tables
    # minus the excluded ones.
    # In either case, the filter file contains the list of tables to include.
def create_filter_file(self):
filter_name = generate_filter_filename(self.master_datadir,
get_backup_dir(self.master_datadir, self.backup_dir),
TIMESTAMP_KEY)
if self.include_dump_tables_file[0]:
shutil.copyfile(self.include_dump_tables_file[0], filter_name)
verify_lines_in_file(filter_name, get_lines_from_file(self.include_dump_tables_file[0]))
elif self.exclude_dump_tables_file[0]:
filter = get_lines_from_file(self.exclude_dump_tables_file[0])
partitions = get_user_table_list(self.master_port, self.dump_database)
tables = []
for p in partitions:
tablename = '%s.%s' % (p[0], p[1])
if tablename not in filter:
tables.append(tablename)
write_lines_to_file(filter_name, tables)
logger.info('Creating filter file: %s' % filter_name)
def create_dump_outcome(self, start, end, rc):
return {'timestamp_start': start.strftime("%Y%m%d%H%M%S"),
'time_start': start.strftime("%H:%M:%S"),
'time_end': end.strftime("%H:%M:%S"),
'exit_status': rc}
def create_filtered_dump_string(self, user_name, dump_date, timestamp_key):
filter_filename = get_filter_file(self.dump_database, self.master_datadir, self.backup_dir)
dump_string = self.create_dump_string(user_name, dump_date, timestamp_key, filter_filename)
dump_string += ' --incremental-filter=%s' % filter_filename
return dump_string
def create_dump_string(self, user_name, dump_date, timestamp_key, filter_filename=None):
if self.backup_dir is not None:
dump_path = report_path = os.path.join(self.backup_dir, DUMP_DIR, dump_date)
else:
dump_path = os.path.join(self.dump_dir, dump_date)
report_path = os.path.join(self.master_datadir, self.dump_dir, dump_date)
if self.incremental:
if self.backup_dir:
report_path = dump_path
else:
report_path = os.path.join(self.master_datadir, dump_path)
dump_line = "gp_dump -p %d -U %s --gp-d=%s --gp-r=%s --gp-s=p --gp-k=%s --no-lock" % (self.master_port, user_name, dump_path, report_path, timestamp_key)
if self.clear_catalog_dumps:
dump_line += " -c"
if self.compress:
logger.info("Adding compression parameter")
dump_line += " --gp-c"
if self.encoding is not None:
logger.info("Adding encoding %s" % self.encoding)
dump_line += " --encoding=%s" % self.encoding
if self.incremental:
logger.info("Adding --incremental")
dump_line += " --incremental"
if backup_utils.dump_prefix:
logger.info("Adding --prefix")
dump_line += " --prefix=%s" % backup_utils.dump_prefix
logger.info('Adding --no-expand-children')
dump_line += " --no-expand-children"
"""
AK: Some ridiculous escaping here. I apologize.
These options get passed-through gp_dump to gp_dump_agent.
Commented out lines use escaping that would be reasonable, if gp_dump escaped properly.
"""
if self.dump_schema is not None:
logger.info("Adding schema name %s" % self.dump_schema)
dump_line += " -n \"\\\"%s\\\"\"" % self.dump_schema
#dump_line += " -n \"%s\"" % self.dump_schema
dump_line += " %s" % self.dump_database
for dump_table in self.include_dump_tables:
schema, table = dump_table.split('.')
dump_line += " --table=\"\\\"%s\\\"\".\"\\\"%s\\\"\"" % (schema, table)
#dump_line += " --table=\"%s\".\"%s\"" % (schema, table)
for dump_table in self.exclude_dump_tables:
schema, table = dump_table.split('.')
dump_line += " --exclude-table=\"\\\"%s\\\"\".\"\\\"%s\\\"\"" % (schema, table)
#dump_line += " --exclude-table=\"%s\".\"%s\"" % (schema, table)
if self.include_dump_tables_file[0] is not None:
dump_line += " --table-file=%s" % self.include_dump_tables_file
if self.exclude_dump_tables_file[0] is not None:
dump_line += " --exclude-table-file=%s" % self.exclude_dump_tables_file
for opt in self.output_options:
dump_line += " %s" % opt
return dump_line
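# Appends this incremental dump's timestamp to the increments file of the full dump it
# is based on; existing entries are validated against their report files first, and the
# append is verified afterwards.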
class CreateIncrementsFile(Operation):
def __init__(self, dump_database, full_timestamp, timestamp, master_datadir, backup_dir):
self.full_timestamp = full_timestamp
self.timestamp = timestamp
self.master_datadir = master_datadir
self.backup_dir = backup_dir
self.increments_filename = generate_increments_filename(master_datadir, backup_dir, full_timestamp)
self.orig_lines_in_file = []
self.dump_database = dump_database
def execute(self):
if os.path.isfile(self.increments_filename):
CreateIncrementsFile.validate_increments_file(self.dump_database, self.increments_filename, self.master_datadir, self.backup_dir)
self.orig_lines_in_file = get_lines_from_file(self.increments_filename)
with open(self.increments_filename, 'a') as fd:
fd.write('%s\n' % self.timestamp)
newlines_in_file = gppylib.operations.backup_utils.get_lines_from_file(self.increments_filename)
if len(newlines_in_file) < 1:
raise Exception("File not written to: %s" % self.increments_filename)
if newlines_in_file[-1].strip() != self.timestamp:
raise Exception("Timestamp '%s' not written to: %s" % (self.timestamp, self.increments_filename))
# remove the last line and the contents should be the same as before
del newlines_in_file[-1]
if self.orig_lines_in_file != newlines_in_file:
raise Exception("trouble adding timestamp '%s' to file '%s'" % (self.timestamp, self.increments_filename))
return (len(newlines_in_file) + 1)
@staticmethod
def validate_increments_file(dump_database, inc_file_name, master_data_dir, backup_dir):
tstamps = gppylib.operations.backup_utils.get_lines_from_file(inc_file_name)
for ts in tstamps:
ts = ts.strip()
if not ts:
continue
fn = generate_report_filename(master_data_dir, backup_dir, ts)
ts_in_rpt = None
try:
ts_in_rpt = get_incremental_ts_from_report_file(dump_database, fn)
except Exception as e:
logger.error(str(e))
if not ts_in_rpt:
raise Exception("Timestamp '%s' from increments file '%s' is not a valid increment" % (ts, inc_file_name))
class PostDumpDatabase(Operation):
def __init__(self, timestamp_start, compress, backup_dir, batch_default, master_datadir, master_port, dump_dir, incremental=False):
self.timestamp_start = timestamp_start
self.compress = compress
self.backup_dir = backup_dir
self.batch_default = batch_default
self.master_datadir = master_datadir
self.master_port = master_port
self.dump_dir = dump_dir
self.incremental = incremental
def get_report_dir(self, dump_date):
path = self.backup_dir if self.backup_dir is not None else self.master_datadir
path = os.path.join(path, self.dump_dir, dump_date)
return path
def execute(self):
report_dir = self.get_report_dir(DUMP_DATE)
reports = ListFilesByPattern(report_dir, "*gp_dump_*.rpt").run()
if not reports:
logger.error("Could not locate a report file on master.")
return {'exit_status': 2, 'timestamp': 'n/a'}
reports.sort(key=lambda x: int(x.split('_')[-1].split('.')[0]))
reports.reverse()
report = reports[0]
timestamp = report[-18:-4] # last 14 digits, just before .rpt
if int(timestamp) < int(self.timestamp_start):
logger.error("Could not locate the newly generated report %s file on master, for timestamp %s (%s)." % (timestamp, self.timestamp_start, reports))
return {'exit_status': 2, 'timestamp': 'n/a'}
logger.info("Timestamp key = %s" % timestamp)
# Check master dumps
path = self.backup_dir if self.backup_dir is not None else self.master_datadir
path = os.path.join(path, self.dump_dir, DUMP_DATE)
status_file = os.path.join(path, "%s%s" % (generate_master_status_prefix(), timestamp))
dump_file = os.path.join(path, "%s%s" % (generate_master_dbdump_prefix(), timestamp))
if self.compress: dump_file += ".gz"
try:
PostDumpSegment(status_file = status_file,
dump_file = dump_file).run()
except NoStatusFile, e:
logger.warn('Status file %s not found on master' % status_file)
return {'exit_status': 1, 'timestamp': timestamp}
except StatusFileError, e:
logger.warn('Status file %s on master indicates errors' % status_file)
return {'exit_status': 1, 'timestamp': timestamp}
except NoDumpFile, e:
logger.warn('Dump file %s not found on master' % dump_file)
return {'exit_status': 1, 'timestamp': timestamp}
else:
logger.info('Checked master status file and master dump file.')
# Perform similar checks for primary segments
operations = []
gparray = GpArray.initFromCatalog(dbconn.DbURL(port = self.master_port), utility=True)
segs = [seg for seg in gparray.getDbList() if seg.isSegmentPrimary(current_role=True)]
for seg in segs:
path = self.backup_dir if self.backup_dir is not None else seg.getSegmentDataDirectory()
path = os.path.join(path, self.dump_dir, DUMP_DATE)
status_file = os.path.join(path, "%s%d_%s" % (generate_seg_status_prefix(), seg.getSegmentDbId(), timestamp))
dump_file = os.path.join(path, "%s%d_%s" % (generate_seg_dbdump_prefix(), seg.getSegmentDbId(), timestamp))
if self.compress: dump_file += ".gz"
operations.append(RemoteOperation(PostDumpSegment(status_file = status_file,
dump_file = dump_file),
seg.getSegmentHostName()))
ParallelOperation(operations, self.batch_default).run()
success = 0
for remote in operations:
host = remote.host
status_file = remote.operation.status_file
dump_file = remote.operation.dump_file
try:
remote.get_ret()
except NoStatusFile, e:
logger.warn('Status file %s not found on %s' % (status_file, host))
except StatusFileError, e:
logger.warn('Status file %s on %s indicates errors' % (status_file, host))
except NoDumpFile, e:
logger.warn('Dump file %s not found on %s' % (dump_file, host))
else:
success += 1
if success < len(operations):
logger.warn("Dump was unsuccessful. %d segment(s) failed post-dump checks." % (len(operations) - success))
return {'exit_status': 1, 'timestamp': timestamp}
return {'exit_status': 0, 'timestamp': timestamp}
class PostDumpSegment(Operation):
def __init__(self, status_file, dump_file):
self.status_file = status_file
self.dump_file = dump_file
def execute(self):
# Ensure that status file exists
if not CheckFile(self.status_file).run():
logger.error('Could not locate status file: %s' % self.status_file)
raise NoStatusFile()
# Ensure that status file indicates successful dump
with open(self.status_file, 'r') as f:
for line in f:
if line.find("Finished successfully") != -1:
break
else:
logger.error("Status report file indicates errors: %s" % self.status_file)
for line in f:
logger.info(line)
logger.error("Status file contents dumped to log file")
raise StatusFileError()
# Ensure that dump file exists
if not os.path.exists(self.dump_file):
logger.error("Could not locate dump file: %s" % self.dump_file)
raise NoDumpFile()
class NoStatusFile(Exception): pass
class StatusFileError(Exception): pass
class NoDumpFile(Exception): pass
class ValidateDumpDatabase(Operation):
def __init__(self, dump_database, dump_schema, include_dump_tables, exclude_dump_tables,
include_dump_tables_file, exclude_dump_tables_file, backup_dir,
free_space_percent, compress, batch_default, master_datadir, master_port, dump_dir, incremental):
self.dump_database = dump_database
self.dump_schema = dump_schema
self.include_dump_tables = include_dump_tables
self.exclude_dump_tables = exclude_dump_tables
self.include_dump_tables_file = include_dump_tables_file
self.exclude_dump_tables_file = exclude_dump_tables_file
self.backup_dir = backup_dir
self.free_space_percent = free_space_percent
self.compress = compress
self.batch_default = batch_default
self.master_datadir = master_datadir
self.master_port = master_port
self.dump_dir = dump_dir
self.incremental = incremental
def execute(self):
ValidateDatabaseExists(database = self.dump_database,
master_port = self.master_port).run()
if self.dump_schema is not None:
ValidateSchemaExists(database = self.dump_database,
schema = self.dump_schema,
master_port = self.master_port).run()
ValidateCluster(master_port = self.master_port).run()
ValidateAllDumpDirs(backup_dir = self.backup_dir,
batch_default = self.batch_default,
master_datadir = self.master_datadir,
master_port = self.master_port,
dump_dir = self.dump_dir).run()
if not self.incremental:
self.exclude_dump_tables = ValidateDumpTargets(dump_database = self.dump_database,
dump_schema = self.dump_schema,
include_dump_tables = self.include_dump_tables,
exclude_dump_tables = self.exclude_dump_tables,
include_dump_tables_file = self.include_dump_tables_file,
exclude_dump_tables_file = self.exclude_dump_tables_file,
master_port = self.master_port).run()
if self.free_space_percent is not None:
logger.info('Validating disk space')
ValidateDiskSpace(free_space_percent = self.free_space_percent,
compress = self.compress,
dump_database = self.dump_database,
include_dump_tables = self.include_dump_tables,
batch_default = self.batch_default,
master_port = self.master_port).run()
return self.exclude_dump_tables
class ValidateDiskSpace(Operation):
# TODO: this doesn't take into account that multiple segments may be dumping to the same logical disk.
def __init__(self, free_space_percent, compress, dump_database, include_dump_tables, batch_default, master_port):
self.free_space_percent = free_space_percent
self.compress = compress
self.dump_database = dump_database
self.include_dump_tables = include_dump_tables
self.batch_default = batch_default
self.master_port = master_port
def execute(self):
operations = []
gparray = GpArray.initFromCatalog(dbconn.DbURL(port = self.master_port), utility=True)
segs = [seg for seg in gparray.getDbList() if seg.isSegmentPrimary(current_role=True)]
for seg in segs:
operations.append(RemoteOperation(ValidateSegDiskSpace(free_space_percent = self.free_space_percent,
compress = self.compress,
dump_database = self.dump_database,
include_dump_tables = self.include_dump_tables,
datadir = seg.getSegmentDataDirectory(),
segport = seg.getSegmentPort()),
seg.getSegmentHostName()))
ParallelOperation(operations, self.batch_default).run()
success = 0
for remote in operations:
host = remote.host
try:
remote.get_ret()
except NotEnoughDiskSpace, e:
logger.error("%s has insufficient disk space. [Need: %dK, Free %dK]" % (host, e.needed_space, e.free_space))
else:
success += 1
if success < len(operations):
raise ExceptionNoStackTraceNeeded("Cannot continue. %d segment(s) failed disk space checks" % (len(operations) - success))
class ValidateSegDiskSpace(Operation):
# TODO: this estimation of needed space needs work. it doesn't include schemas or exclusion tables.
def __init__(self, free_space_percent, compress, dump_database, include_dump_tables, datadir, segport):
self.free_space_percent = free_space_percent
self.compress = compress
self.dump_database = dump_database
self.include_dump_tables = include_dump_tables
self.datadir = datadir
self.segport = segport
def execute(self):
needed_space = 0
dburl = dbconn.DbURL(dbname=self.dump_database, port=self.segport)
conn = None
try:
conn = dbconn.connect(dburl, utility=True)
if self.include_dump_tables:
for dump_table in self.include_dump_tables:
needed_space += execSQLForSingleton(conn, "SELECT pg_relation_size('%s')/1024;" % dump_table)
else:
needed_space = execSQLForSingleton(conn, "SELECT pg_database_size('%s')/1024;" % self.dump_database)
finally:
if conn is not None:
conn.close()
if self.compress:
needed_space = needed_space / COMPRESSION_FACTOR
# get free available space
stat_res = os.statvfs(self.datadir);
free_space = (stat_res.f_bavail * stat_res.f_frsize) / 1024
if free_space == 0 or (free_space - needed_space) / free_space < self.free_space_percent / 100:
logger.error("Disk space: [Need: %dK, Free %dK]" % (needed_space, free_space))
raise NotEnoughDiskSpace(free_space, needed_space)
logger.info("Disk space: [Need: %dK, Free %dK]" % (needed_space, free_space))
class NotEnoughDiskSpace(Exception):
def __init__(self, free_space, needed_space):
self.free_space, self.needed_space = free_space, needed_space
Exception.__init__(self, free_space, needed_space)
class ValidateGpToolkit(Operation):
def __init__(self, database, master_port):
self.database = database
self.master_port = master_port
def execute(self):
dburl = dbconn.DbURL(dbname=self.database, port=self.master_port)
conn = None
try:
conn = dbconn.connect(dburl)
count = execSQLForSingleton(conn, "select count(*) from pg_class, pg_namespace where pg_namespace.nspname = 'gp_toolkit' and pg_class.relnamespace = pg_namespace.oid")
finally:
if conn is not None:
conn.close()
if count > 0:
logger.debug("gp_toolkit exists within database %s." % self.database)
return
logger.info("gp_toolkit not found. Installing...")
Psql('Installing gp_toolkit',
filename='$GPHOME/share/postgresql/gp_toolkit.sql',
database=self.database,
port=self.master_port).run(validateAfter=True)
class ValidateAllDumpDirs(Operation):
def __init__(self, backup_dir, batch_default, master_datadir, master_port, dump_dir):
self.backup_dir = backup_dir
self.batch_default = batch_default
self.master_datadir = master_datadir
self.master_port = master_port
self.dump_dir = dump_dir
def execute(self):
if self.backup_dir is not None:
master_dirs_to_check = [self.backup_dir, self.master_datadir]
else:
master_dirs_to_check = [self.master_datadir]
for dir in master_dirs_to_check:
try:
ValidateDumpDirs(dir, self.dump_dir).run()
except DumpDirCreateFailed, e:
raise ExceptionNoStackTraceNeeded('Could not create %s on master. Cannot continue.' % dir)
except DumpDirNotWritable, e:
raise ExceptionNoStackTraceNeeded('Could not write to %s on master. Cannot continue.' % dir)
else:
logger.info('Checked %s on master' % dir)
# Check backup target on segments (either master_datadir or backup_dir, if present)
operations = []
gparray = GpArray.initFromCatalog(dbconn.DbURL(port = self.master_port), utility=True)
segs = [seg for seg in gparray.getDbList() if seg.isSegmentPrimary(current_role=True)]
for seg in segs:
dir = self.backup_dir if self.backup_dir is not None else seg.getSegmentDataDirectory()
operations.append(RemoteOperation(ValidateDumpDirs(dir, self.dump_dir), seg.getSegmentHostName()))
ParallelOperation(operations, self.batch_default).run()
success = 0
for remote in operations:
dir = remote.operation.dir
host = remote.host
try:
remote.get_ret()
except DumpDirCreateFailed, e:
logger.error("Could not create %s on %s." % (dir, host))
except DumpDirNotWritable, e:
logger.error("Could not write to %s on %s." % (dir, host))
else:
success += 1
if success < len(operations):
raise ExceptionNoStackTraceNeeded("Cannot continue. %d segment(s) failed directory checks" % (len(operations) - success))
class ValidateDumpDirs(Operation):
def __init__(self, dir, dump_dir):
self.dir = dir
self.dump_dir = dump_dir
def execute(self):
path = os.path.join(self.dir, self.dump_dir, DUMP_DATE)
exists = CheckDir(path).run()
if exists:
logger.info("Directory %s exists" % path)
else:
logger.info("Directory %s not found, will try to create" % path)
try:
MakeDir(path).run()
except OSError, e:
logger.exception("Could not create directory %s" % path)
raise DumpDirCreateFailed()
else:
logger.info("Created %s" % path)
try:
with tempfile.TemporaryFile(dir=path) as f:
pass
except Exception, e:
logger.exception("Cannot write to %s" % path)
raise DumpDirNotWritable()
class DumpDirCreateFailed(Exception): pass
class DumpDirNotWritable(Exception): pass
class ValidateDumpTargets(Operation):
def __init__(self, dump_database, dump_schema, include_dump_tables, exclude_dump_tables,
include_dump_tables_file, exclude_dump_tables_file, master_port):
self.dump_database = dump_database
self.dump_schema = dump_schema
self.include_dump_tables = include_dump_tables
self.exclude_dump_tables = exclude_dump_tables
self.include_dump_tables_file = include_dump_tables_file
self.exclude_dump_tables_file = exclude_dump_tables_file
self.master_port = master_port
def execute(self):
if ((len(self.include_dump_tables) > 0 or (self.include_dump_tables_file is not None)) and
(len(self.exclude_dump_tables) > 0 or (self.exclude_dump_tables_file is not None))):
raise ExceptionNoStackTraceNeeded("Cannot use -t/--table-file and -T/--exclude-table-file options at same time")
elif len(self.include_dump_tables) > 0 or self.include_dump_tables_file is not None:
logger.info("Configuring for single-database, include-table dump")
ValidateIncludeTargets(dump_database = self.dump_database,
dump_schema = self.dump_schema,
include_dump_tables = self.include_dump_tables,
include_dump_tables_file = self.include_dump_tables_file,
master_port = self.master_port).run()
elif len(self.exclude_dump_tables) > 0 or self.exclude_dump_tables_file is not None:
logger.info("Configuring for single-database, exclude-table dump")
self.exclude_dump_tables = ValidateExcludeTargets(dump_database = self.dump_database,
dump_schema = self.dump_schema,
exclude_dump_tables = self.exclude_dump_tables,
exclude_dump_tables_file = self.exclude_dump_tables_file,
master_port = self.master_port).run()
else:
logger.info("Configuring for single database dump")
return self.exclude_dump_tables
class ValidateIncludeTargets(Operation):
def __init__(self, dump_database, dump_schema, include_dump_tables, include_dump_tables_file, master_port):
self.dump_database = dump_database
self.dump_schema = dump_schema
self.include_dump_tables = include_dump_tables
self.include_dump_tables_file = include_dump_tables_file
self.master_port = master_port
def execute(self):
dump_tables = []
for dump_table in self.include_dump_tables:
dump_tables.append(dump_table)
if self.include_dump_tables_file is not None:
include_file = open(self.include_dump_tables_file, 'rU')
if not include_file:
raise ExceptionNoStackTraceNeeded("Can't open file %s" % self.include_dump_tables_file);
for line in include_file:
dump_tables.append(line.strip('\n'));
include_file.close()
for dump_table in dump_tables:
if '.' not in dump_table:
raise ExceptionNoStackTraceNeeded("No schema name supplied for table %s" % dump_table)
schema, table = dump_table.split('.')
exists = CheckTableExists(schema = schema,
table = table,
database = self.dump_database,
master_port = self.master_port).run()
if not exists:
raise ExceptionNoStackTraceNeeded("Table %s does not exist in %s database" % (dump_table, self.dump_database))
if self.dump_schema is not None:
if self.dump_schema != schema:
raise ExceptionNoStackTraceNeeded("Schema name %s not same as schema on %s" % (self.dump_schema, dump_table))
class ValidateExcludeTargets(Operation):
def __init__(self, dump_database, dump_schema, exclude_dump_tables, exclude_dump_tables_file, master_port):
self.dump_database = dump_database
self.dump_schema = dump_schema
self.exclude_dump_tables = exclude_dump_tables
self.exclude_dump_tables_file = exclude_dump_tables_file
self.master_port = master_port
def execute(self):
rebuild_excludes = []
dump_tables = []
for dump_table in self.exclude_dump_tables:
dump_tables.append(dump_table)
if self.exclude_dump_tables_file is not None:
exclude_file = open(self.exclude_dump_tables_file, 'rU')
if not exclude_file:
raise ExceptionNoStackTraceNeeded("Can't open file %s" % self.exclude_dump_tables_file);
for line in exclude_file:
dump_tables.append(line.strip('\n'));
exclude_file.close()
for dump_table in dump_tables:
if '.' not in dump_table:
raise ExceptionNoStackTraceNeeded("No schema name supplied for exclude table %s" % dump_table)
schema, table = dump_table.split('.')
exists = CheckTableExists(schema = schema,
table = table,
database = self.dump_database,
master_port = self.master_port).run()
if exists:
if self.dump_schema != schema:
logger.info("Adding table %s to exclude list" % dump_table)
rebuild_excludes.append(dump_table)
else:
logger.warn("Schema dump request and exclude table %s not in that schema, ignoring" % dump_table)
else:
logger.warn("Exclude table %s does not exist in %s database, ignoring" % (dump_table, self.dump_database))
if len(rebuild_excludes) == 0:
logger.warn("All exclude table names have been removed due to issues, see log file")
return self.exclude_dump_tables
class ValidateDatabaseExists(Operation):
""" TODO: move this to gppylib.operations.common? """
def __init__(self, database, master_port):
self.master_port = master_port
self.database = database
def execute(self):
conn = None
try:
dburl = dbconn.DbURL(port = self.master_port )
conn = dbconn.connect(dburl)
count = execSQLForSingleton(conn, "select count(*) from pg_database where datname='%s';" % self.database)
if count == 0:
raise ExceptionNoStackTraceNeeded("Database %s does not exist." % self.database)
finally:
if conn is not None:
conn.close()
class ValidateSchemaExists(Operation):
""" TODO: move this to gppylib.operations.common? """
def __init__(self, database, schema, master_port):
self.database = database
self.schema = schema
self.master_port = master_port
def execute(self):
conn = None
try:
dburl = dbconn.DbURL(port = self.master_port, dbname = self.database)
conn = dbconn.connect(dburl)
count = execSQLForSingleton(conn, "select count(*) from pg_namespace where nspname='%s';" % self.schema)
if count == 0:
raise ExceptionNoStackTraceNeeded("Schema %s does not exist in database %s." % (self.schema, self.database))
finally:
if conn is not None:
conn.close()
class CheckTableExists(Operation):
all_tables = None
def __init__(self, database, schema, table, master_port):
self.database = database
self.schema = schema
self.table = table
self.master_port = master_port
if CheckTableExists.all_tables is None:
CheckTableExists.all_tables = set()
for (schema, table) in get_user_table_list(self.master_port, self.database):
CheckTableExists.all_tables.add((schema, table))
def execute(self):
if (self.schema, self.table) in CheckTableExists.all_tables:
return True
return False
class ValidateCluster(Operation):
def __init__(self, master_port):
self.master_port = master_port
def execute(self):
gparray = GpArray.initFromCatalog(dbconn.DbURL(port = self.master_port), utility=True)
failed_segs = [seg for seg in gparray.getDbList() if seg.isSegmentPrimary(current_role=True) and seg.isSegmentDown()]
if len(failed_segs) != 0:
logger.warn("Failed primary segment instances detected")
failed_dbids = [seg.getSegmentDbid() for seg in failed_segs]
raise ExceptionNoStackTraceNeeded("Detected failed segment(s) with dbid=%s" % ",".join(failed_dbids))
class UpdateHistoryTable(Operation):
HISTORY_TABLE = "public.gpcrondump_history"
def __init__(self, dump_database, time_start, time_end, options_list, timestamp, dump_exit_status, pseudo_exit_status, master_port):
self.dump_database = dump_database
self.time_start = time_start
self.time_end = time_end
self.options_list = options_list
self.timestamp = timestamp
self.dump_exit_status = dump_exit_status
self.pseudo_exit_status = pseudo_exit_status
self.master_port = master_port
def execute(self):
schema, table = UpdateHistoryTable.HISTORY_TABLE.split('.')
exists = CheckTableExists(database = self.dump_database,
schema = schema,
table = table,
master_port = self.master_port).run()
if not exists:
conn = None
CREATE_HISTORY_TABLE = """ create table %s (rec_date timestamp, start_time char(8), end_time char(8), options text, dump_key varchar(20), dump_exit_status smallint, script_exit_status smallint, exit_text varchar(10)) distributed by (rec_date); """ % UpdateHistoryTable.HISTORY_TABLE
try:
dburl = dbconn.DbURL(port=self.master_port, dbname=self.dump_database)
conn = dbconn.connect(dburl)
execSQL(conn, CREATE_HISTORY_TABLE)
conn.commit()
except Exception, e:
logger.exception("Unable to create %s in %s database" % (UpdateHistoryTable.HISTORY_TABLE, self.dump_database))
return
else:
logger.info("Created %s in %s database" % (UpdateHistoryTable.HISTORY_TABLE, self.dump_database))
finally:
if conn is not None:
conn.close()
translate_rc_to_msg = { 0: "COMPLETED", 1: "WARNING", 2: "FATAL" }
exit_msg = translate_rc_to_msg[self.pseudo_exit_status]
APPEND_HISTORY_TABLE = """ insert into %s values (now(), '%s', '%s', '%s', '%s', %d, %d, '%s'); """ % (UpdateHistoryTable.HISTORY_TABLE, self.time_start, self.time_end, self.options_list, self.timestamp, self.dump_exit_status, self.pseudo_exit_status, exit_msg)
conn = None
try:
dburl = dbconn.DbURL(port=self.master_port, dbname=self.dump_database)
conn = dbconn.connect(dburl)
execSQL(conn, APPEND_HISTORY_TABLE)
conn.commit()
except Exception, e:
logger.exception("Failed to insert record into %s in %s database" % (UpdateHistoryTable.HISTORY_TABLE, self.dump_database))
else:
logger.info("Inserted dump record into %s in %s database" % (UpdateHistoryTable.HISTORY_TABLE, self.dump_database))
finally:
if conn is not None:
conn.close()
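# Dumps global objects via "pg_dumpall -g --gp-syntax" into the global-objects file
# for this timestamp.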
class DumpGlobal(Operation):
def __init__(self, timestamp, master_datadir, master_port, backup_dir, dump_dir):
self.timestamp = timestamp
self.master_datadir = master_datadir
self.master_port = master_port
self.backup_dir = backup_dir
self.dump_dir = dump_dir
def execute(self):
logger.info("Commencing pg_catalog dump")
if self.backup_dir is not None:
global_file = os.path.join(self.backup_dir, DUMP_DIR, DUMP_DATE, "%s%s" % (generate_global_prefix(), self.timestamp))
else:
global_file = os.path.join(self.master_datadir, self.dump_dir, DUMP_DATE, "%s%s" % (generate_global_prefix(), self.timestamp))
Command('Dump global objects',
self.create_pgdump_command_line(self.master_port, global_file)).run(validateAfter=True)
def create_pgdump_command_line(self, master_port, global_file):
return "pg_dumpall -p %s -g --gp-syntax > %s" % (master_port, global_file)
class DumpConfig(Operation):
# TODO: Should we really just give up if one of the tars fails?
# TODO: WorkerPool
def __init__(self, backup_dir, master_datadir, master_port, dump_dir):
self.backup_dir = backup_dir
self.master_datadir = master_datadir
self.master_port = master_port
self.dump_dir = dump_dir
def execute(self):
timestamp = TIMESTAMP_KEY
config_backup_file = generate_master_config_filename(timestamp)
if self.backup_dir is not None:
path = os.path.join(self.backup_dir, DUMP_DIR, DUMP_DATE, config_backup_file)
else:
path = os.path.join(self.master_datadir, self.dump_dir, DUMP_DATE, config_backup_file)
logger.info("Dumping master config files")
Command("Dumping master configuration files",
"tar cf %s %s/*.conf" % (path, self.master_datadir)).run(validateAfter=True)
logger.info("Dumping segment config files")
gparray = GpArray.initFromCatalog(dbconn.DbURL(port = self.master_port), utility=True)
primaries = [seg for seg in gparray.getDbList() if seg.isSegmentPrimary(current_role=True)]
for seg in primaries:
config_backup_file = generate_segment_config_filename(seg.getSegmentDbId(), timestamp)
if self.backup_dir is not None:
path = os.path.join(self.backup_dir, DUMP_DIR, DUMP_DATE, config_backup_file)
else:
path = os.path.join(seg.getSegmentDataDirectory(), self.dump_dir, DUMP_DATE, config_backup_file)
host = seg.getSegmentHostName()
Command("Dumping segment config files",
"tar cf %s %s/*.conf" % (path, seg.getSegmentDataDirectory()),
ctxt=REMOTE,
remoteHost=host).run(validateAfter=True)
class DeleteCurrentDump(Operation):
def __init__(self, timestamp, master_datadir, master_port, dump_dir):
self.timestamp = timestamp
self.master_datadir = master_datadir
self.master_port = master_port
self.dump_dir = dump_dir
def execute(self):
try:
DeleteCurrentSegDump(self.timestamp, self.master_datadir, self.dump_dir).run()
except OSError, e:
logger.warn("Error encountered during deletion of %s on master" % self.timestamp)
gparray = GpArray.initFromCatalog(dbconn.DbURL(port = self.master_port), utility=True)
segs = [seg for seg in gparray.getDbList() if seg.isSegmentPrimary(current_role=True)]
for seg in segs:
try:
RemoteOperation(DeleteCurrentSegDump(self.timestamp, seg.getSegmentDataDirectory(), self.dump_dir),
seg.getSegmentHostName()).run()
except OSError, e:
logger.warn("Error encountered during deletion of %s on %s" % (self.timestamp, seg.getSegmentHostName()))
class DeleteCurrentSegDump(Operation):
""" TODO: Improve with grouping by host. """
def __init__(self, timestamp, datadir, dump_dir):
self.timestamp = timestamp
self.datadir = datadir
self.dump_dir = dump_dir
def execute(self):
path = os.path.join(self.datadir, self.dump_dir, DUMP_DATE)
filenames = ListFilesByPattern(path, "*%s*" % self.timestamp).run()
for filename in filenames:
RemoveFile(os.path.join(path, filename)).run()
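# Removes the oldest db_dumps/<date> directory from the master and from every
# primary segment.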
class DeleteOldestDumps(Operation):
# TODO: This Operation isn't consuming backup_dir. Should it?
def __init__(self, master_datadir, master_port, dump_dir):
self.master_datadir = master_datadir
self.master_port = master_port
self.dump_dir = dump_dir
def execute(self):
dburl = dbconn.DbURL(port=self.master_port)
old_dates = ListFiles(os.path.join(self.master_datadir, DUMP_DIR)).run()
try:
old_dates.remove(DUMP_DATE)
except ValueError, e: # DUMP_DATE was not found in old_dates
pass
if len(old_dates) == 0:
logger.info("No old backup sets to remove")
return
old_dates.sort()
old_date = old_dates[0]
        # This will avoid the problem where we might accidentally end up deleting local backup files
logger.info("Preparing to remove dump %s from all hosts" % old_date)
path = os.path.join(self.master_datadir, DUMP_DIR, old_date)
try:
RemoveTree(path).run()
except OSError, e:
logger.warn("Error encountered during deletion of %s" % path)
gparray = GpArray.initFromCatalog(dbconn.DbURL(port = self.master_port), utility=True)
primaries = [seg for seg in gparray.getDbList() if seg.isSegmentPrimary(current_role=True)]
for seg in primaries:
path = os.path.join(seg.getSegmentDataDirectory(), DUMP_DIR, old_date)
try:
RemoveRemoteTree(path, seg.getSegmentHostName()).run()
except ExecutionError, e:
logger.warn("Error encountered during deletion of %s on %s" % (path, seg.getSegmentHostName()))
return old_date
class VacuumDatabase(Operation):
# TODO: move this to gppylib.operations.common?
def __init__(self, database, master_port):
self.database = database
self.master_port = master_port
def execute(self):
conn = None
logger.info('Commencing vacuum of %s database, please wait' % self.database)
try:
dburl = dbconn.DbURL(port=self.master_port, dbname=self.database)
conn = dbconn.connect(dburl)
cursor = conn.cursor()
cursor.execute("commit") # hack to move drop stmt out of implied transaction
cursor.execute("vacuum")
cursor.close()
except Exception, e:
logger.exception('Error encountered with vacuum of %s database' % self.database)
else:
logger.info('Vacuum of %s completed without error' % self.database)
finally:
if conn is not None:
conn.close()
class MailDumpEvent(Operation):
def __init__(self, subject, message):
self.subject = subject
self.message = message
def execute(self):
if "HOME" not in os.environ or "GPHOME" not in os.environ:
logger.warn("Could not find mail_contacts file. Set $HOME and $GPHOME.")
return
mail_file = os.path.join(os.environ["GPHOME"], "bin", "mail_contacts")
home_mail_file = os.path.join(os.environ["HOME"], "mail_contacts")
contacts_file = None
if CheckFile(home_mail_file).run():
contacts_file = home_mail_file
elif CheckFile(mail_file).run():
contacts_file = mail_file
else:
logger.warn("Found neither %s nor %s" % (mail_file, home_mail_file))
logger.warn("Unable to send dump email notification")
logger.info("To enable email notification, create %s or %s containing required email addresses" % (mail_file, home_mail_file))
return
to_addrs = None
with open(contacts_file, 'r') as f:
to_addrs = [line.strip() for line in f]
MailEvent(subject = self.subject,
message = self.message,
to_addrs = to_addrs).run()
class MailEvent(Operation):
# TODO: move this to gppylib.operations.common?
def __init__(self, subject, message, to_addrs):
if isinstance(to_addrs, str):
to_addrs = [to_addrs]
self.subject = subject
self.message = message
self.to_addrs = to_addrs
def execute(self):
logger.info("Sending mail to %s" % ",".join(self.to_addrs))
cmd = "/bin/mailx" if curr_platform == SUNOS else findCmdInPath('mail')
Command('Sending email',
'echo "%s" | %s -s "%s" %s' % (self.message, cmd, self.subject, " ".join(self.to_addrs))).run(validateAfter=True)