| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # Line too long - pylint: disable=C0301 |
| |
| import sys |
| import os |
| import errno |
| import shutil |
| import hashlib |
| import signal |
| import urlparse |
| |
| from gppylib.db import dbconn, catalog |
| from gppylib.db.dbconn import UnexpectedRowsError, executeUpdateOrInsert |
| from gppylib import gplog |
| from gppylib.commands.gp import GpStop, GpStart, get_local_db_mode, get_masterdatadir |
| from gppylib.commands.unix import Ping, RemoveDirectory, RemoveFiles, MakeDirectory |
| from gppylib.operations import Operation |
| from gppylib.operations.utils import RemoteOperation, ParallelOperation |
| from gppylib.operations.unix import CheckFile, CheckDir, ListFiles |
| from gppylib.gparray import GpArray |
| from hawqpylib.hawqarray import HAWQArray |
| from hawqpylib.hawqlib import local_ssh, HawqXMLParser, parse_hosts_file |
| |
| logger = gplog.get_default_logger() |
| |
| GP_TRANSACTION_FILES_FILESPACE = 'gp_transaction_files_filespace' |
| GP_TEMPORARY_FILES_FILESPACE = 'gp_temporary_files_filespace' |
| PG_SYSTEM_FILESPACE = 'pg_system' |
| MASTER_DATA_DIR = get_masterdatadir() |
| NUM_WORKERS = 16 #Number of simultaneous parallel process that are started by ParallelOperation |
| |
| class FileType: |
| """ |
| This class is to emulate an enum |
| """ |
| |
| TRANSACTION_FILES, TEMPORARY_FILES = range(2) |
| lookup = ['TRANSACTION_FILES', 'TEMPORARY_FILES'] |
| |
| class MoveFilespaceError(Exception): |
| def __init__(self, msg): |
| Exception.__init__(self, msg) |
| |
| class CheckFilespaceIsSame(Operation): |
| """ |
| Checks that the given filespace is different |
| from the one currently being used. |
| |
| @return: True if Filespace is different False otherwise |
| """ |
| |
| def __init__(self, gparray, filespace_name, file_type): |
| self.gparray = gparray |
| self.filespace_name = filespace_name |
| self.file_type = file_type |
| |
| def execute(self): |
| logger.info('Checking if filespace is same as current filespace') |
| |
| try: |
| if self.file_type == FileType.TRANSACTION_FILES: |
| filename = GP_TRANSACTION_FILES_FILESPACE |
| else: |
| filename = GP_TEMPORARY_FILES_FILESPACE |
| |
| pg_system_filespace_entries = GetFilespaceEntries(self.gparray, PG_SYSTEM_FILESPACE).run() |
| |
| file_path = os.path.join(pg_system_filespace_entries[0][2], filename) |
| |
| if self.filespace_name == 'pg_system' and not CheckFile(file_path).run(): |
| return True |
| |
| if CheckFile(file_path).run(): |
| with open(file_path) as file: |
| oid = int(file.readline().strip()) |
| if self.gparray.getFileSpaceName(oid) == self.filespace_name: |
| return True |
| except Exception, e: |
| raise MoveFilespaceError('Failed to check if filespace is same.') |
| |
| return False |
| |
| class CheckFilespace(Operation): |
| """ |
| Checks that the given filespace exists. |
| |
| @return: True if Filespace exists False otherwise |
| """ |
| SELECT_FILESPACENAME_QUERY = """ |
| select * from pg_filespace where fsname = \'%s\' |
| """ |
| |
| def __init__(self, filespace): |
| self.filespace = filespace |
| |
| def execute(self): |
| logger.info('Checking if filespace %s exists' % self.filespace) |
| |
| dburl = dbconn.DbURL() |
| query = self.SELECT_FILESPACENAME_QUERY % self.filespace |
| |
| logger.debug('Connecting to database') |
| with dbconn.connect(dburl, utility=True) as conn: |
| try: |
| logger.debug('Executing query %s' % query) |
| tuple = dbconn.execSQLForSingletonRow(conn, query) |
| except UnexpectedRowsError, e: |
| if e.actual == 0: |
| return False |
| raise |
| |
| return True |
| |
| class CheckSuperUser(Operation): |
| """ |
| Checks if the user running the gpfilespace utility |
| is a super user or not. |
| """ |
| def __init__(self, dburl): |
| self.dburl = dburl |
| |
| def execute(self): |
| is_super = None |
| conn = dbconn.connect(self.dburl, utility=True) |
| try: |
| q = "SELECT usesuper FROM pg_user where usename = user" |
| rows = catalog.basicSQLExec(conn, q) |
| is_super = rows[0][0] |
| except Exception, e: |
| raise MoveFilespaceError(str(e)) |
| finally: |
| conn.close() |
| |
| if not is_super: |
| return False |
| |
| return True |
| |
| class CheckConnectivity(Operation): |
| """ |
| Check if all hosts are reachable |
| in the cluster. |
| """ |
| def __init__(self, gparray): |
| self.gparray = gparray |
| |
| def execute(self): |
| logger.info('Checking for connectivity') |
| |
| host_set = self.gparray.getHostList() |
| |
| for host in host_set: |
| try: |
| Ping.local("pinging host %s" % host, host) |
| except Exception, e: |
| logger.error('Cannot connect to host %s: %s' % (host, e)) |
| return False |
| |
| return True |
| |
| class CheckFilespaceOidLocally(Operation): |
| """ |
| Returns the oid present in the flat files |
| """ |
| def __init__(self, flat_file_location): |
| self.flat_file_location = flat_file_location |
| |
| def execute(self): |
| logger.info('flat file location = %s' % self.flat_file_location) |
| if not CheckFile(self.flat_file_location).run(): |
| return -1 |
| |
| with open(self.flat_file_location, 'r') as file: |
| oid = file.readline().strip() |
| return oid |
| |
| class CheckFilespaceEntriesLocally(Operation): |
| """ |
| Check if filespace entries are valid |
| """ |
| def __init__(self, cur_filespace_entry, peer_filespace_entry, filespace_location, file_type): |
| self.cur_filespace_entry = cur_filespace_entry |
| self.peer_filespace_entry = peer_filespace_entry |
| self.filespace_location = filespace_location |
| self.file_type = file_type |
| |
| def execute(self): |
| if self.file_type == FileType.TRANSACTION_FILES: |
| flat_file = GP_TRANSACTION_FILES_FILESPACE |
| elif self.file_type == FileType.TEMPORARY_FILES: |
| flat_file = GP_TEMPORARY_FILES_FILESPACE |
| |
| is_consistent = True |
| flat_file_location = os.path.join(self.filespace_location, flat_file) |
| |
| #Default filespace pg_system |
| if not os.path.exists(flat_file_location): |
| return is_consistent |
| |
| if not os.path.exists(self.cur_filespace_entry[2]): |
| logger.info('path %s does not exist' % self.cur_filespace_entry[2]) |
| is_consistent = False |
| |
| #Read flat file entries and check if the directories exist |
| entries_set = set() |
| with open(flat_file_location) as file: |
| for line in file: |
| entries = line.split() |
| if len(entries) != 2: |
| continue |
| else: |
| logger.info('path = %s' % entries[1].strip()) |
| logger.info('dbid = %s' % entries[0].strip()) |
| entries_set.add(entries[0].strip() + ' ' + entries[1].strip()) |
| |
| required_set = set() |
| required_set.add(str(self.cur_filespace_entry[1]) + ' ' + self.cur_filespace_entry[2]) |
| |
| if self.peer_filespace_entry is not None: |
| required_set.add(str(self.peer_filespace_entry[1]) + ' ' + self.peer_filespace_entry[2]) |
| |
| if required_set != entries_set: |
| is_consistent = False |
| |
| return is_consistent |
| |
| class CheckFilespacePermissions(Operation): |
| """ |
| Checks if the filespace has the correct permissions |
| """ |
| def __init__(self, location): |
| self.location = location |
| |
| def execute(self): |
| |
| path = os.path.dirname(self.location) |
| |
| #In case of recoverseg, the segment directories will be |
| #created by the backend code. |
| if not os.path.exists(path): |
| path = os.path.dirname(path) |
| |
| self.location = os.path.join(path, os.path.basename(self.location)) |
| |
| try: |
| with open(self.location, 'w') as file: |
| pass |
| os.remove(self.location) |
| except IOError, e: |
| if e.errno == errno.EACCES: |
| return False |
| else: |
| raise |
| return True |
| |
| class CheckFilespaceConsistency(Operation): |
| """ |
| Checks that the filespaces for transaction and temporary |
| files are consistent on all segments |
| """ |
| def __init__(self, gparray, file_type): |
| self.gparray = gparray |
| self.file_type = file_type |
| |
| def execute(self): |
| logger.info('Checking for filespace consistency') |
| if self.file_type == FileType.TRANSACTION_FILES: |
| flat_file = GP_TRANSACTION_FILES_FILESPACE |
| elif self.file_type == FileType.TEMPORARY_FILES: |
| flat_file = GP_TEMPORARY_FILES_FILESPACE |
| |
| operations = [] |
| pg_system_fs_entries = GetFilespaceEntriesDict(GetFilespaceEntries(self.gparray, |
| PG_SYSTEM_FILESPACE).run()).run() |
| cur_filespace_entries = GetFilespaceEntriesDict(GetCurrentFilespaceEntries(self.gparray, |
| self.file_type).run()).run() |
| for seg in self.gparray.getDbList(): |
| flat_file_location = os.path.join(pg_system_fs_entries[seg.getSegmentDbId()][2], |
| flat_file) |
| logger.debug('flat file location = %s' % flat_file_location) |
| operations.append(RemoteOperation(CheckFilespaceOidLocally(flat_file_location), |
| seg.getSegmentHostName() |
| ) |
| ) |
| ParallelOperation(operations, NUM_WORKERS).run() |
| |
| try: |
| oid_set = set([int(op.get_ret()) for op in operations]) |
| except Exception, e: |
| logger.error('Invalid OID in flat file on host %s' % op.host) |
| return False |
| |
| logger.debug('%s oid set = %s' % (FileType.lookup[self.file_type], oid_set)) |
| |
| if len(oid_set) != 1: |
| logger.error('Multiple OIDs found in flat files') |
| return False |
| |
| #Verify that we have a valid oid and that |
| #the filespace exists in the catalog. |
| try: |
| oid = int(oid_set.pop()) |
| if oid == -1: |
| fs_name = 'pg_system' |
| else: |
| fs_name = self.gparray.getFileSpaceName(oid) |
| except Exception, e: |
| logger.error('%s OIDs are inconsistent for %s filespace' % (FileType.lookup[self.file_type], fs_name)) |
| return False |
| logger.info('%s OIDs are consistent for %s filespace' % (FileType.lookup[self.file_type], fs_name)) |
| |
| #Now check for the filespace entries |
| operation_list = [] |
| for seg in self.gparray.getDbList(): |
| cur_filespace_entry = cur_filespace_entries[seg.getSegmentDbId()] |
| peer_filespace_entry = get_peer_filespace_entry(cur_filespace_entries, seg.getSegmentDbId(), seg.getSegmentContentId(), self.gparray.getDbList()) |
| logger.debug('current_filespace_entry = %s' % str(cur_filespace_entry)) |
| logger.debug('peer_filespace_entry = %s' % str(peer_filespace_entry)) |
| operation_list.append(RemoteOperation(CheckFilespaceEntriesLocally(cur_filespace_entry, peer_filespace_entry, pg_system_fs_entries[seg.getSegmentDbId()][2], self.file_type), |
| seg.getSegmentHostName() |
| ) |
| ) |
| |
| ParallelOperation(operation_list, NUM_WORKERS).run() |
| |
| for operation in operation_list: |
| try: |
| if not operation.get_ret(): |
| logger.error('%s entries are inconsistent for %s filespace on host %s' % (FileType.lookup[self.file_type], fs_name, operation.host)) |
| return False |
| except Exception, e: |
| logger.error('%s entries are inconsistent for %s filespace on host %s' % (FileType.lookup[self.file_type], fs_name, operation.host)) |
| return False |
| |
| logger.info('%s entries are consistent for %s filespace' % (FileType.lookup[self.file_type], fs_name)) |
| |
| return True |
| |
| def get_peer_filespace_entry(filespace_entries, dbid, content_id, segments): |
| """ |
| Returns the filespace entry for a given |
| DbId corresponding to its peer. |
| i.e given primary, it returns the filespace |
| entry of the mirror, and given mirror, it |
| returns the filespace entry of the primary. |
| |
| @param: DbId of the primary segment |
| content id of the segment |
| @return: A tuple of the form |
| filespace oid, dbid, filespace directory |
| """ |
| logger.debug('Getting peer filespace entry for %s' % dbid) |
| for seg in segments: |
| if seg.getSegmentContentId() == content_id and seg.getSegmentDbId() != dbid: |
| return filespace_entries[seg.getSegmentDbId()] |
| |
| |
| class UpdateFlatFilesLocally(Operation): |
| """ |
| Update the flat files on primaries/mirrors locally |
| """ |
| def __init__(self, flat_file, filespace_oid, cur_filespace_entry, peer_filespace_entry): |
| self.flat_file = flat_file |
| self.filespace_oid = filespace_oid |
| self.cur_filespace_entry = cur_filespace_entry |
| self.peer_filespace_entry = peer_filespace_entry |
| |
| def execute(self): |
| logger.info('Updating flat files') |
| |
| #If flat file directory does not exist, we create it |
| if not os.path.exists(os.path.dirname(self.flat_file)): |
| os.mkdir(os.path.dirname(self.flat_file)) |
| |
| temp_flat_file = self.flat_file + '.tmp' |
| lines_to_write = str(self.filespace_oid) + '\n' +\ |
| str(self.cur_filespace_entry[1]) + ' ' + self.cur_filespace_entry[2] + '\n' |
| if self.peer_filespace_entry is not None: |
| lines_to_write += str(self.peer_filespace_entry[1]) + ' ' + self.peer_filespace_entry[2] + '\n' |
| |
| with open(temp_flat_file, 'w') as file: |
| file.write(lines_to_write) |
| |
| logger.debug('Wrote contents = %s' % lines_to_write) |
| |
| #Read back and check if what we've written is the same |
| with open(temp_flat_file) as file: |
| contents = file.read() |
| if contents != lines_to_write: |
| raise MoveFilespaceError('Failed to write contents to flat file %s' % lines_to_write) |
| |
| shutil.move(temp_flat_file, self.flat_file) |
| |
| if not os.path.exists(self.cur_filespace_entry[2]): |
| os.makedirs(self.cur_filespace_entry[2]) |
| |
| return self.flat_file + ' ' + contents |
| |
| class UpdateFlatFiles(Operation): |
| """ |
| Update the flat files on primaries separately |
| in case of addmirrors. |
| Update the flat files on the mirrors separately |
| in case of gpexpand. |
| """ |
| def __init__(self, gparray, primaries=True, expansion=False): |
| self.gparray = gparray |
| self.primaries = primaries |
| self.expansion = expansion |
| |
| def execute(self): |
| |
| #Obtain list of segments from gparray |
| if self.expansion: |
| db_list = self.gparray.getExpansionSegDbList() |
| else: |
| db_list = self.gparray.getDbList() |
| |
| if self.primaries: |
| segments = [seg for seg in db_list if seg.isSegmentPrimary()] |
| else: |
| segments = [seg for seg in db_list if seg.isSegmentMirror()] |
| |
| logger.debug('segment_list = %s' % self.gparray.getDbList()) |
| logger.debug('segments on which flat files will be updated = %s' % segments) |
| pg_system_filespace_entries = GetFilespaceEntriesDict(GetFilespaceEntries(self.gparray, |
| PG_SYSTEM_FILESPACE).run() |
| ).run() |
| transaction_flat_file = os.path.join(pg_system_filespace_entries[1][2], GP_TRANSACTION_FILES_FILESPACE) |
| if os.path.exists(transaction_flat_file): |
| logger.debug('Updating transaction flat files') |
| cur_filespace_entries = GetFilespaceEntriesDict(GetCurrentFilespaceEntries(self.gparray, FileType.TRANSACTION_FILES |
| ).run() |
| ).run() |
| operation_list = [] |
| for seg in segments: |
| filespace_oid = cur_filespace_entries[seg.getSegmentDbId()][0] |
| cur_filespace_entry = cur_filespace_entries[seg.getSegmentDbId()] |
| peer_filespace_entry = get_peer_filespace_entry(cur_filespace_entries, seg.getSegmentDbId(), seg.getSegmentContentId(), db_list) |
| logger.debug('cur_filespace_entry = %s' % str(cur_filespace_entry)) |
| logger.debug('peer_filespace_entry = %s' % str(peer_filespace_entry)) |
| flat_file = os.path.join(pg_system_filespace_entries[seg.getSegmentDbId()][2], |
| GP_TRANSACTION_FILES_FILESPACE) |
| operation_list.append(RemoteOperation(UpdateFlatFilesLocally(flat_file, |
| filespace_oid, |
| cur_filespace_entry, |
| peer_filespace_entry |
| ), |
| seg.getSegmentHostName()) |
| ) |
| |
| ParallelOperation(operation_list, NUM_WORKERS).run() |
| |
| try: |
| for operation in operation_list: |
| operation.get_ret() |
| except Exception, e: |
| raise MoveFilespaceError('Failed to update transaction flat file.') |
| |
| temporary_flat_file = os.path.join(pg_system_filespace_entries[1][2], GP_TEMPORARY_FILES_FILESPACE) |
| if os.path.exists(temporary_flat_file): |
| logger.debug('Updating temporary flat files') |
| cur_filespace_entries = GetFilespaceEntriesDict(GetCurrentFilespaceEntries(self.gparray, FileType.TEMPORARY_FILES |
| ).run() |
| ).run() |
| operation_list = [] |
| for seg in segments: |
| filespace_oid = cur_filespace_entries[seg.getSegmentDbId()][0] |
| cur_filespace_entry = cur_filespace_entries[seg.getSegmentDbId()] |
| peer_filespace_entry = get_peer_filespace_entry(cur_filespace_entries, seg.getSegmentDbId(), seg.getSegmentContentId(), db_list) |
| logger.debug('cur_filespace_entry = %s' % str(cur_filespace_entry)) |
| logger.debug('peer_filespace_entry = %s' % str(peer_filespace_entry)) |
| flat_file = os.path.join(pg_system_filespace_entries[seg.getSegmentDbId()][2], |
| GP_TEMPORARY_FILES_FILESPACE) |
| operation_list.append(RemoteOperation(UpdateFlatFilesLocally(flat_file, |
| filespace_oid, |
| cur_filespace_entry, |
| peer_filespace_entry |
| ), |
| seg.getSegmentHostName()) |
| ) |
| |
| ParallelOperation(operation_list, NUM_WORKERS).run() |
| |
| try: |
| for operation in operation_list: |
| operation.get_ret() |
| except Exception, e: |
| raise MoveFilespaceError('Failed to update temporary flat file.') |
| |
| class MoveTransFilespaceLocally(Operation): |
| """ |
| Move the Filespace to the new location for |
| transaction files. |
| This requires the pg_system filespace entry for a particular dbid |
| because that information is needed to write the gp_transaction_files_filespace |
| flat file. |
| |
| @param: filespace entries from the pg_filespace_entry table. |
| @return: None |
| """ |
| TRANSACTION_FILES_DIRS = ['pg_xlog', 'pg_multixact', 'pg_subtrans', 'pg_clog', |
| 'pg_distributedlog', 'pg_distributedxidmap'] |
| |
| def __init__(self, current_filespace_entry, new_filespace_name, new_filespace_entry, peer_filespace_entry, pg_system_filespace_entry, rollback=False): |
| self.new_filespace_name = new_filespace_name |
| self.new_filespace_entry = new_filespace_entry |
| self.peer_filespace_entry = peer_filespace_entry |
| self.current_filespace_entry = current_filespace_entry |
| self.pg_system_filespace_entry = pg_system_filespace_entry |
| self.rollback = rollback |
| |
| def get_md5(self, directory): |
| m = hashlib.md5() |
| |
| files_to_hash = [] |
| for root, dirs, filenames in os.walk(directory): |
| for filename in filenames: |
| file = os.path.join(root, filename) |
| files_to_hash.append(file) |
| |
| files_to_hash.sort() |
| |
| for file in files_to_hash: |
| with open(file, 'rb') as f: |
| while True: |
| data = f.read(128) |
| if not data: |
| break |
| m.update(data) |
| return m.hexdigest() |
| |
| def md5_check(self, src_dir, dst_dir): |
| src_md5 = self.get_md5(src_dir) |
| dst_md5 = self.get_md5(dst_dir) |
| |
| logger.info('md5 checksum for %s = %s' % (src_dir, src_md5)) |
| logger.info('md5 checksum for %s = %s' % (dst_dir, dst_md5)) |
| |
| if src_md5 != dst_md5: |
| raise MoveFilespaceError('MD5 Checksum failed for %s' % src_dir) |
| |
| def execute(self): |
| |
| #if the filespace directory is not present on the segment, |
| #We simply return. |
| if not CheckDir(self.current_filespace_entry[2]).run(): |
| return |
| |
| tmp_file = os.path.join(self.new_filespace_entry[2], 'tmp_file') |
| if not CheckFilespacePermissions(tmp_file).run(): |
| raise MoveFilespaceError('Invalid permissions for %s' % tmp_file) |
| |
| stats = os.statvfs(os.path.dirname(self.new_filespace_entry[2])) |
| free_bytes = stats.f_bfree * stats.f_frsize |
| logger.info('free_bytes for %s = %s' % (self.new_filespace_entry[2], free_bytes)) |
| if not free_bytes: |
| raise MoveFilespaceError('Insufficient diskspace for %s' % self.new_filespace_entry[2]) |
| |
| #1. Copy the contents of the filespace dirs from old filespace to new filespace |
| for directory in self.TRANSACTION_FILES_DIRS: |
| src_dir = os.path.join(self.current_filespace_entry[2], directory) |
| dst_dir = os.path.join(self.new_filespace_entry[2], directory) |
| |
| logger.info('Copying directories from %s to %s' % (src_dir, dst_dir)) |
| try: |
| #In case of a rollback, we need to retain the directories that |
| #already exist on the source segment. If it doesn't then we do a |
| #copy. |
| if self.rollback: |
| if CheckDir(src_dir).run() and not CheckDir(dst_dir).run(): |
| logger.info('copying %s' % src_dir) |
| shutil.copytree(src_dir, dst_dir) |
| try: |
| self.md5_check(src_dir, dst_dir) |
| except (IOError, OSError), e: |
| raise MoveFilespaceError('Failed to calculate md5 checksums !') |
| elif CheckDir(src_dir).run(): |
| #We use the src segment as the source of truth and copy everything |
| if CheckDir(dst_dir).run(): |
| shutil.rmtree(dst_dir) |
| |
| logger.info('copying %s' % src_dir) |
| shutil.copytree(src_dir, dst_dir) |
| |
| try: |
| self.md5_check(src_dir, dst_dir) |
| except (IOError, OSError), e: |
| raise MoveFilespaceError('Failed to calculate md5 checksums !') |
| |
| except (IOError, OSError), e: |
| logger.error('Failed to copy transaction files to new Filespace location.') |
| raise |
| |
| #2. Drop the directories in current filespace |
| for directory in self.TRANSACTION_FILES_DIRS: |
| src_dir = os.path.join(self.current_filespace_entry[2], directory) |
| try: |
| if CheckDir(src_dir).run(): |
| logger.info('Dropping dir %s' % src_dir) |
| shutil.rmtree(src_dir) |
| except (IOError, OSError), e: |
| logger.error('Failed to drop transaction files directories from current filespace.') |
| raise |
| |
| gp_transaction_files_filespace_path = os.path.join(self.pg_system_filespace_entry[2], GP_TRANSACTION_FILES_FILESPACE) |
| #If we are moving to default Filespace, then we need to delete the flat file |
| if self.new_filespace_name == PG_SYSTEM_FILESPACE: |
| if CheckFile(gp_transaction_files_filespace_path).run(): |
| os.remove(gp_transaction_files_filespace_path) |
| return |
| |
| #3. Write the dbid and directories into tmp directory |
| temp_gp_transaction_files_filespace_path = gp_transaction_files_filespace_path + '.tmp' |
| |
| logger.info('Writing filespace information into flat file') |
| with open(temp_gp_transaction_files_filespace_path, 'w') as file: |
| lines_to_write = str(self.new_filespace_entry[0]) + '\n' +\ |
| str(self.new_filespace_entry[1]) + ' ' + self.new_filespace_entry[2] + '\n' |
| #In case of system without mirrors |
| if self.peer_filespace_entry is not None: |
| lines_to_write += str(self.peer_filespace_entry[1]) + ' ' + self.peer_filespace_entry[2] + '\n' |
| |
| file.write(lines_to_write) |
| |
| #Read back and check the file contents |
| with open(temp_gp_transaction_files_filespace_path) as file: |
| contents = file.read() |
| if contents != lines_to_write: |
| raise MoveFilespaceError('Failed to write contents to flat file %s' % lines_to_write) |
| |
| #4. After the file copy has been completed successfully, |
| # copy over the tmp directory file |
| shutil.move(temp_gp_transaction_files_filespace_path, gp_transaction_files_filespace_path) |
| |
| class MoveTempFilespaceLocally(Operation): |
| """ |
| Move the Filespace to the new location for |
| temporary files. |
| |
| @return: None |
| """ |
| TEMPORARY_FILES_DIR = 'pgsql_tmp' |
| |
| def __init__(self, current_filespace_entry, new_filespace_name, new_filespace_entry, peer_filespace_entry, pg_system_filespace_entry, rollback=None): |
| self.new_filespace_name = new_filespace_name |
| self.new_filespace_entry = new_filespace_entry |
| self.peer_filespace_entry = peer_filespace_entry |
| self.current_filespace_entry = current_filespace_entry |
| self.pg_system_filespace_entry = pg_system_filespace_entry |
| self.rollback = rollback |
| |
| def execute(self): |
| |
| if not CheckDir(self.current_filespace_entry[2]).run(): |
| return |
| |
| tmp_file = os.path.join(self.new_filespace_entry[2], 'tmp_file') |
| if not CheckFilespacePermissions(tmp_file).run(): |
| raise MoveFilespaceError('Invalid permissions for %s' % tmp_file) |
| |
| stats = os.statvfs(os.path.dirname(self.new_filespace_entry[2])) |
| free_bytes = stats.f_bfree * stats.f_frsize |
| logger.info('free_bytes for %s = %s' % (self.new_filespace_entry[2], free_bytes)) |
| if not free_bytes: |
| raise MoveFilespaceError('Insufficient diskspace for %s' % self.new_filespace_entry[2]) |
| |
| #1. Drop the directories from old filespace location |
| #If the filespace is being moved from pg_system filespace |
| #for temp files will be slightly different as they will be stored under |
| #<filespace_dir>/base/<database_oid>/ |
| gp_temporary_files_filespace_path = os.path.join(self.pg_system_filespace_entry[2], GP_TEMPORARY_FILES_FILESPACE) |
| |
| if not CheckFile(gp_temporary_files_filespace_path).run(): |
| base_dir = os.path.join(self.current_filespace_entry[2], 'base') |
| |
| directories = [] |
| try: |
| directories = ListFiles(base_dir).run() |
| except Exception, e: |
| if self.rollback: |
| pass |
| else: |
| raise |
| |
| for directory in directories: |
| try: |
| dst_dir = os.path.join(base_dir, directory, self.TEMPORARY_FILES_DIR) |
| if CheckDir(dst_dir).run(): |
| logger.info('Dropping dir %s' % dst_dir) |
| shutil.rmtree(dst_dir) |
| except (IOError, OSError), e: |
| logger.error('Failed to delete temporary files') |
| raise |
| else: |
| #else if filespace has been moved before, the temp files directory will always be under |
| #<filespace_dir>/pgsql_tmp |
| dst_dir = os.path.join(self.current_filespace_entry[2], self.TEMPORARY_FILES_DIR) |
| try: |
| if CheckDir(dst_dir).run(): |
| logger.info('Dropping dir %s' %dst_dir) |
| shutil.rmtree(dst_dir) |
| except (IOError, OSError), e: |
| logger.error('Failed to delete temporary filespace directories') |
| raise |
| |
| #If we are moving to default Filespace, then we need to delete the flat file |
| if self.new_filespace_name == PG_SYSTEM_FILESPACE: |
| if CheckFile(gp_temporary_files_filespace_path).run(): |
| os.remove(gp_temporary_files_filespace_path) |
| return |
| |
| #2. Write the dbid and directories into tmp directory |
| temp_gp_temporary_files_filespace_path = gp_temporary_files_filespace_path + '.tmp' |
| |
| logger.info('Writing filespace information into flat file') |
| with open(temp_gp_temporary_files_filespace_path, 'w') as file: |
| lines_to_write = str(self.new_filespace_entry[0]) + '\n' +\ |
| str(self.new_filespace_entry[1]) + ' ' + self.new_filespace_entry[2] + '\n' |
| #In case of master, when there is no standby, peer will be None |
| if self.peer_filespace_entry is not None: |
| lines_to_write += str(self.peer_filespace_entry[1]) + ' ' + self.peer_filespace_entry[2] + '\n' |
| |
| file.write(lines_to_write) |
| |
| #Read back and check the contents |
| with open(temp_gp_temporary_files_filespace_path) as file: |
| contents = file.read() |
| if contents != lines_to_write: |
| raise MoveFilespaceError('Failed to write contents to flat file %s' % lines_to_write) |
| |
| #3. After the file copy has been completed successfully, |
| # copy over the tmp directory file |
| shutil.move(temp_gp_temporary_files_filespace_path, gp_temporary_files_filespace_path) |
| |
| class GetFilespaceEntries(Operation): |
| """ |
| Returns the pg_filespace_entry table contents for a given filespace. |
| Each entry will be of the form oid, dbid, filespace_directory |
| |
| @return: List containing the directories used by the filespace |
| """ |
| def __init__(self, gparray, filespace_name): |
| self.filespace_name = filespace_name |
| self.gparray = gparray |
| |
| def execute(self): |
| logger.debug('Obtaining filespace information for filespace %s' % self.filespace_name) |
| filespace_entries = [] |
| |
| filespaces = self.gparray.getFilespaces(includeSystemFilespace=True) |
| filespace_oid = None |
| for fs in filespaces: |
| if fs.getName() == self.filespace_name: |
| filespace_oid = fs.getOid() |
| |
| if filespace_oid is None: |
| raise MoveFilespaceError('Invalid filespace name.') |
| |
| for seg in self.gparray.getDbList(includeExpansionSegs=True): |
| filespace_entries.append((filespace_oid, seg.getSegmentDbId(), seg.getSegmentFilespaces()[filespace_oid])) |
| |
| return filespace_entries |
| |
| class GetCurrentFilespaceEntries(Operation): |
| """ |
| The results returned will contain the entries from |
| pg_filespace_entry table for the current filespace used |
| by transaction/temporary files. |
| """ |
| |
| def __init__(self, gparray, file_type): |
| self.file_type = file_type |
| self.gparray = gparray |
| |
| def execute(self): |
| logger.info('Obtaining current filespace entries used by %s' % FileType.lookup[self.file_type]) |
| filespace_entries = [] |
| default_filespace_entries = GetFilespaceEntries(self.gparray, PG_SYSTEM_FILESPACE).run() |
| flat_files_dir = default_filespace_entries[0][2] |
| flat_file = None |
| |
| if self.file_type == FileType.TRANSACTION_FILES: |
| flat_file = GP_TRANSACTION_FILES_FILESPACE |
| elif self.file_type == FileType.TEMPORARY_FILES: |
| flat_file = GP_TEMPORARY_FILES_FILESPACE |
| |
| gp_filespace_path = os.path.join(flat_files_dir, flat_file) |
| |
| try: |
| if CheckFile(gp_filespace_path).run(): |
| with open(gp_filespace_path) as file: |
| fs_oid = file.readline().strip() |
| filespace_name = self.gparray.getFileSpaceName(int(fs_oid)) |
| filespace_entries = GetFilespaceEntries(self.gparray, filespace_name).run() |
| except TypeError, e: |
| raise MoveFilespaceError('Invalid oid in flat file. %s' % str(e)) |
| except (IOError, OSError), e: |
| raise MoveFilespaceError('Failed to read flat file.%s' % str(e)) |
| |
| if len(filespace_entries) == 0: |
| filespace_entries = default_filespace_entries |
| |
| return filespace_entries |
| |
| class GetFilespaceEntriesDict(Operation): |
| """ |
| @param: A list of filespace entries |
| @return: A dict containing the directories for each |
| corresponding database id. |
| """ |
| |
| def __init__(self, filespace_entries): |
| self.filespace_entries = filespace_entries |
| |
| def execute(self): |
| logger.debug('Converting filespace entries into dict') |
| db_directories_dict = {} |
| |
| for entry in self.filespace_entries: |
| dbid = entry[1] |
| db_directories_dict[dbid] = entry |
| |
| return db_directories_dict |
| |
| class RollBackFilespaceChanges(Operation): |
| """ |
| This does the reverse operation of Move(). |
| Since the Move from cur -> new Filespace failed, |
| we do the reverse and move files back from new -> cur. |
| """ |
| def __init__(self, segments, file_type, cur_filespace_name, cur_filespace_entries, new_filespace_entries, pg_system_filespace_entries): |
| self.segments = segments |
| self.file_type = file_type |
| self.cur_filespace_name = cur_filespace_name |
| self.cur_filespace_entries = cur_filespace_entries |
| self.new_filespace_entries = new_filespace_entries |
| self.pg_system_filespace_entries = pg_system_filespace_entries |
| |
| def execute(self): |
| logger.info('Rolling back filespace changes ...') |
| operations = [] |
| for seg in self.segments: |
| logger.debug('Creating RemoteOperation for segment %s' % seg) |
| peer_filespace_entry = get_peer_filespace_entry(self.cur_filespace_entries, seg.getSegmentDbId(), seg.getSegmentContentId(), self.segments) |
| if self.file_type == FileType.TRANSACTION_FILES: |
| #Move from new -> cur |
| operations.append(RemoteOperation(MoveTransFilespaceLocally(self.new_filespace_entries[seg.getSegmentDbId()], |
| self.cur_filespace_name, |
| self.cur_filespace_entries[seg.getSegmentDbId()], |
| peer_filespace_entry, |
| self.pg_system_filespace_entries[seg.getSegmentDbId()], |
| rollback=True |
| ), |
| seg.getSegmentHostName() |
| ), |
| ) |
| elif self.file_type == FileType.TEMPORARY_FILES: |
| operations.append(RemoteOperation(MoveTempFilespaceLocally(self.new_filespace_entries[seg.getSegmentDbId()], |
| self.cur_filespace_name, |
| self.cur_filespace_entries[seg.getSegmentDbId()], |
| peer_filespace_entry, |
| self.pg_system_filespace_entries[seg.getSegmentDbId()], |
| rollback=True |
| ), |
| seg.getSegmentHostName() |
| ), |
| ) |
| |
| logger.debug('Running remote operations in parallel') |
| ParallelOperation(operations, NUM_WORKERS).run() |
| |
| logger.debug('Checking results of parallel operations') |
| for operation in operations: |
| operation.get_ret() |
| |
| class GetMoveOperationList(Operation): |
| """ |
| @return: A list containing a set of RemoteOperations |
| that are executed on remote hosts via ParallelOperation. |
| """ |
| def __init__(self, segments, file_type, new_filespace_name, new_filespace_entries, cur_filespace_entries, pg_system_filespace_entries): |
| self.segments = segments |
| self.file_type = file_type |
| self.new_filespace_name = new_filespace_name |
| self.new_filespace_entries = new_filespace_entries |
| self.cur_filespace_entries = cur_filespace_entries |
| self.pg_system_filespace_entries = pg_system_filespace_entries |
| |
| def execute(self): |
| logger.info('Creating RemoteOperations list') |
| operations = [] |
| for seg in self.segments: |
| logger.debug('segment_dbid = %s' % seg.getSegmentDbId()) |
| logger.debug('segmenthostname = %s' % seg.getSegmentHostName()) |
| logger.debug(self.new_filespace_entries[seg.getSegmentDbId()]) |
| logger.debug(self.cur_filespace_entries[seg.getSegmentDbId()]) |
| peer_filespace_entry = get_peer_filespace_entry(self.new_filespace_entries, seg.getSegmentDbId(), seg.getSegmentContentId(), self.segments) |
| if self.file_type == FileType.TRANSACTION_FILES: |
| operations.append(RemoteOperation(MoveTransFilespaceLocally(self.cur_filespace_entries[seg.getSegmentDbId()], |
| self.new_filespace_name, |
| self.new_filespace_entries[seg.getSegmentDbId()], |
| peer_filespace_entry, |
| self.pg_system_filespace_entries[seg.getSegmentDbId()] |
| ), |
| seg.getSegmentHostName())) |
| elif self.file_type == FileType.TEMPORARY_FILES: |
| operations.append(RemoteOperation(MoveTempFilespaceLocally(self.cur_filespace_entries[seg.getSegmentDbId()], |
| self.new_filespace_name, |
| self.new_filespace_entries[seg.getSegmentDbId()], |
| peer_filespace_entry, |
| self.pg_system_filespace_entries[seg.getSegmentDbId()] |
| ), |
| seg.getSegmentHostName())) |
| return operations |
| |
| class MoveFileSpaceLocation(Operation): |
| """ |
| Move filespace location to a new location on dfs |
| """ |
| |
| def __init__(self, fsname, location, user, pswd): |
| self.user = user |
| self.pswd = pswd |
| self.fsname = fsname |
| self.location = location |
| self.dburl = dbconn.DbURL(username=self.user, password=self.pswd) |
| |
| def start_master_only(self): |
| logger.info('Starting Database in upgrade mode') |
| cmd = 'hawq start master -U upgrade' |
| result = local_ssh(cmd, logger) |
| if result != 0: |
| logger.error("Failed to start HAWQ Database in upgrade mode") |
| return result |
| |
| def stop_master_only(self): |
| logger.info('Stopping HAWQ master') |
| cmd = 'hawq stop master -a' |
| result = local_ssh(cmd, logger) |
| if result != 0: |
| logger.error("Failed to stop HAWQ master") |
| return result |
| |
| def stop_database(self): |
| logger.info('Stopping HAWQ cluster') |
| cmd = 'hawq stop cluster -a' |
| result = local_ssh(cmd, logger) |
| if result != 0: |
| logger.error("Failed to stop HAWQ master") |
| return result |
| |
| def check_database_stopped(self): |
| try: |
| mode = get_local_db_mode(MASTER_DATA_DIR) |
| logger.info('Database was started in %s mode' % mode) |
| except Exception, e: |
| logger.info('Database might already be stopped.') |
| return True |
| |
| return False |
| |
| def prepare_tasks(self): |
| conn = dbconn.connect(dbconn.DbURL(username=self.user, password=self.pswd), utility=True) |
| |
| try: |
| # Check if the filespace exist and can be changed, get its oid |
| filespaceRow = dbconn.execSQL(conn, ''' |
| SELECT oid, fsfsys |
| FROM pg_filespace |
| WHERE fsname = '%s'; |
| ''' % self.fsname) |
| |
| if filespaceRow.rowcount != 1: |
| logger.error('Filespace %s does not exist' % self.fsname) |
| return None |
| |
| filespace = filespaceRow.fetchall()[0] |
| |
| if filespace[1] == 0: |
| logger.error('Filespace "%s" is not created on distributed file system' % self.fsname) |
| return None |
| |
| self.fsoid = filespace[0] |
| filespaceRow.close() |
| |
| #remove last slash |
| if self.location[-1] is '/': |
| self.location = self.location[:-1] |
| |
| queries = [] |
| #generate sql for pg_filespace_entry |
| uri = self.location |
| queries.append(''' |
| UPDATE pg_filespace_entry |
| SET fselocation = '%s' |
| WHERE fsefsoid = %d; |
| ''' %(uri, self.fsoid)) |
| |
| #get gp_persistent_filespace |
| rows = dbconn.execSQL(conn, ''' |
| SELECT location |
| FROM gp_persistent_filespace_node |
| WHERE filespace_oid = %d; |
| ''' % self.fsoid) |
| |
| #generate sql for gp_persistent_filespace |
| for row in rows: |
| uri = '%s' % (self.location) |
| |
| paddinglen = len(row[0]) |
| if len(uri) > paddinglen: |
| logger.error('target location "%s" is too long' % self.location) |
| return None |
| |
| queries.append(''' |
| UPDATE gp_persistent_filespace_node |
| SET location = '%s' |
| WHERE filespace_oid = %d; |
| ''' %(uri.ljust(paddinglen), self.fsoid)) |
| |
| return queries |
| |
| finally: |
| conn.close() |
| |
| |
| def update_filespace_locations(self, queries): |
| conn = dbconn.connect(dbconn.DbURL(username=self.user, password=self.pswd), utility=True, allowSystemTableMods='all') |
| |
| try: |
| logger.info('Updating catalog...') |
| |
| for query in queries: |
| logger.debug('executing ...%s' % query) |
| executeUpdateOrInsert(conn, query, 1) |
| |
| dbconn.execSQL(conn, "COMMIT") |
| logger.info('catalog updated successfully') |
| |
| finally: |
| conn.close() |
| |
| def execute(self): |
| try: |
| # Disable Ctrl+C |
| signal.signal(signal.SIGINT, signal.SIG_IGN) |
| |
| # StopGPDB - Check for GPDB connections first and fail if connections exist |
| if not self.check_database_stopped(): |
| logger.error('stop Database first') |
| sys.exit(1) |
| |
| # StartGPDB in Master only mode |
| self.start_master_only() |
| |
| try: |
| if not CheckSuperUser(self.dburl).run(): |
| raise MoveFilespaceError('gpfilespace requires database superuser privileges.') |
| |
| queries = self.prepare_tasks(); |
| |
| if queries is None: |
| self.stop_master_only() |
| sys.exit(1) |
| |
| except Exception, e: |
| self.stop_master_only() |
| raise |
| |
| try: |
| self.update_filespace_locations(queries); |
| except Exception, e: |
| logger.fatal("Fatal error happened when updating catalog, PLEASE RESTORE MASTER DATA DIRECTORY!") |
| raise |
| finally: |
| # Bring GPDB Offline |
| self.stop_master_only() |
| |
| finally: |
| # Enable Ctrl+C |
| signal.signal(signal.SIGINT, signal.default_int_handler) |
| |
| class MoveFilespace(Operation): |
| """ |
| Main class which configures the database to use a custom filespace |
| for temporary and transaction files. |
| """ |
| |
| def __init__(self, new_filespace_name, file_type, user, pswd): |
| self.user = user |
| self.pswd = pswd |
| self.new_filespace_name = new_filespace_name |
| self.file_type = file_type |
| |
| def start_master_only(self): |
| logger.info('Starting Greenplum Database in master only mode') |
| cmd = GpStart('Start Greenplum Database in master only mode', masterOnly=True) |
| cmd.run() |
| if cmd.get_results().rc != 0: |
| logger.error('Failed to start Greenplum Database in master only mode.') |
| cmd.validate() |
| |
| def stop_master_only(self): |
| logger.info('Stopping Greenplum Database in master only mode') |
| cmd = GpStop('Stop Greenplum Database in master only mode', masterOnly=True) |
| cmd.run() |
| if cmd.get_results().rc != 0: |
| logger.error('Failed to stop Greenplum Database in master only mode.') |
| cmd.validate() |
| |
| def start_database(self): |
| logger.info('Starting Greenplum Database') |
| cmd = GpStart('Start Greenplum Database') |
| cmd.run() |
| if cmd.get_results().rc != 0: |
| logger.error('Failed to start Greenplum Database.') |
| cmd.validate() |
| |
| def stop_database(self): |
| logger.info('Stopping Greenplum Database') |
| cmd = GpStop('Stop Greenplum Databse') |
| cmd.run() |
| if cmd.get_results().rc != 0: |
| logger.error('Failed to stop Greenplum Database.') |
| cmd.validate() |
| |
| def check_database_stopped(self): |
| try: |
| mode = get_local_db_mode(MASTER_DATA_DIR) |
| logger.info('Database was started in %s mode' % mode) |
| except Exception, e: |
| logger.info('Database might already be stopped.') |
| return True |
| |
| return False |
| |
| def execute(self): |
| |
| try: |
| #Disable Ctrl+C |
| signal.signal(signal.SIGINT,signal.SIG_IGN) |
| |
| #StopGPDB - Check for GPDB connections first and fail if connections exist |
| if not self.check_database_stopped(): |
| self.stop_database() |
| |
| #Restart the system |
| self.start_database() |
| self.stop_database() |
| |
| #StartGPDB in Master only mode |
| self.start_master_only() |
| |
| try: |
| |
| if not CheckSuperUser(dbconn.DbURL(username=self.user, password=self.pswd)).run(): |
| raise MoveFilespaceError('gpfilespace requires database superuser privileges.') |
| |
| gparray = GpArray.initFromCatalog(dbconn.DbURL(), utility=True) |
| |
| #CheckFilespace - Check if new filespace exists |
| if not CheckFilespace(self.new_filespace_name).run(): |
| raise MoveFilespaceError('Filespace %s does not exist' % self.new_filespace_name) |
| |
| #CheckFilespaceIsSame - Check if filespace is different from the old one |
| if CheckFilespaceIsSame(gparray, self.new_filespace_name, self.file_type).run(): |
| raise MoveFilespaceError('Filespace %s is same as the current filespace' % self.new_filespace_name) |
| except Exception, e: |
| #Bring GPDB Offline |
| self.stop_master_only() |
| #Restart the database |
| self.start_database() |
| raise |
| |
| #Bring GPDB Offline |
| if 'UTILITY' == get_local_db_mode(MASTER_DATA_DIR): |
| self.stop_master_only() |
| else: |
| raise MoveFilespaceError('Database state is invalid.') |
| |
| if not CheckConnectivity(gparray).run(): |
| raise MoveFilespaceError('Failed connectivity test') |
| |
| logger.info('Obtaining current filespace information') |
| #Find the filespace directory used for each segment |
| #query using pg_filespace, pg_filespace_entry, gp_segment_configuration |
| #If gp_transaction/temporary flat files exist and is not empty, then we know |
| #the filespace being used. Otherwise, we assume that it is the pg_system |
| #filespace by default |
| |
| cur_filespace_entries = GetFilespaceEntriesDict(GetCurrentFilespaceEntries(gparray, |
| self.file_type).run()).run() |
| new_filespace_entries = GetFilespaceEntriesDict(GetFilespaceEntries(gparray, |
| self.new_filespace_name).run()).run() |
| pg_system_filespace_entries = GetFilespaceEntriesDict(GetFilespaceEntries(gparray, |
| PG_SYSTEM_FILESPACE).run()).run() |
| |
| cur_filespace_name = gparray.getFileSpaceName(int(cur_filespace_entries[1][0])) |
| |
| logger.info('Obtaining segment information ...') |
| segments = gparray.getDbList() |
| |
| #MoveTemp/Trans files |
| |
| try: |
| operations = GetMoveOperationList(segments, |
| self.file_type, |
| self.new_filespace_name, |
| new_filespace_entries, |
| cur_filespace_entries, |
| pg_system_filespace_entries |
| ).run() |
| except Exception, e: |
| raise MoveFilespaceError('Failed to create operations list. %s' % str(e)) |
| |
| logger.info('Moving %s filespace from %s to %s ...' % (FileType.lookup[self.file_type], cur_filespace_name, self.new_filespace_name)) |
| |
| ParallelOperation(operations, NUM_WORKERS).run() |
| |
| try: |
| for operation in operations: |
| operation.get_ret() |
| except Exception, e: |
| logger.error('Failed to move files on remote host. %s' % str(e)) |
| try: |
| RollBackFilespaceChanges(segments, self.file_type, cur_filespace_name, cur_filespace_entries, new_filespace_entries, pg_system_filespace_entries).run() |
| except Exception, e: |
| raise MoveFilespaceError('Rollback Failed !') |
| |
| #Bring GPDB online in normal mode |
| self.start_database() |
| |
| finally: |
| #Enable Ctrl+C |
| signal.signal(signal.SIGINT,signal.default_int_handler) |
| |
| class CreateTempDirectories(Operation): |
| def __init__(self, data_dir): |
| self.directories_file = os.path.join(data_dir, 'gp_temporary_files_directories') |
| self.created_path_history = {} |
| |
| def execute(self): |
| # The temp directories file does not exist, nothing to do |
| if not os.path.exists(self.directories_file): |
| logger.info('temporary directory is set to default path') |
| return True |
| |
| logger.info('create temporary directories recorded in %s' % self.directories_file) |
| try: |
| with open(self.directories_file) as file: |
| for line in file: |
| path = line.strip() |
| # check the parent directories' permission |
| if os.path.exists(path): |
| # raise OSError('create temporary directory failed for %s(path already exists)' % path) |
| break |
| MakeDirectory.local('create temporary direcotry', path) |
| self.created_path_history[path] = True |
| |
| except Exception, e: |
| logger.error('create temporary directory failed. %s' % str(e)) |
| # cleanup the work, don't complain if this is an OSError. |
| for path in self.created_path_history: |
| try: |
| if self.created_path_history[path]: |
| RemoveFiles.local('remove temporary directory', path) |
| except Exception, e: |
| logger.error('temporary directory cannot be removed. %s' % str(e)) |
| return e |
| |
| return True |
| |
| class DeleteTempDirectories(Operation): |
| def __init__(self, data_dir): |
| self.directories_file = os.path.join(data_dir, 'gp_temporary_files_directories') |
| |
| def execute(self): |
| ret = True |
| # The temp directories file does not exist, nothing to do |
| if not os.path.exists(self.directories_file): |
| logger.info('temporary directory is set to default path') |
| return True |
| |
| logger.info('remove temporary directories recorded in %s' % self.directories_file) |
| with open(self.directories_file) as file: |
| for line in file: |
| path = line.strip() |
| try: |
| RemoveFiles.local('temporary directory cleanup', path) |
| except Exception, e: |
| logger.error('remove temporary directory failed. %s' % str(e)) |
| # return the exception |
| ret = e |
| |
| return ret |
| |
| def create_temporary_directories(host, dir): |
| logger.info('Try to create temporary directories') |
| op = RemoteOperation(CreateTempDirectories(dir), host) |
| op.run() |
| |
| try: |
| op.get_ret() |
| except Exception, e: |
| logger.error('create temporary directory failed. %s' % str(e)) |
| raise e |
| |
| def remove_temporary_directories(host, dir): |
| logger.info('Try to remove temporary directories') |
| op = RemoteOperation(DeleteTempDirectories(dir), host) |
| op.run() |
| |
| try: |
| op.get_ret() |
| except Exception, e: |
| logger.error('remove temporary directory failed. %s' % str(e)) |
| raise e |
| |
| def template_temporary_directories(data_dir, contentid): |
| directories_file = os.path.join(data_dir, 'gp_temporary_files_directories') |
| if not os.path.exists(directories_file): |
| logger.info('default temporary direcotry was used') |
| return |
| |
| directories = [] |
| with open(directories_file) as file: |
| for line in file: |
| path = line.strip() |
| # remove the contentid |
| contentid_pos = path.rfind(str(contentid)) |
| if contentid_pos == -1: |
| raise Exception('contentid is wrong in %s' % directories_file) |
| path = path[0:contentid_pos] |
| directories.append(path) |
| |
| os.remove(directories_file) |
| with open(directories_file, 'w' ) as file: |
| for d in directories: |
| file.write(d + '\n') |
| |
| |
| def update_temporary_directories(data_dir, contentid): |
| directories_file = os.path.join(data_dir, 'gp_temporary_files_directories') |
| if not os.path.exists(directories_file): |
| logger.info('default temporary direcotry was used') |
| return |
| |
| logger.info('updating temporary directory file') |
| directories = [] |
| with open(directories_file) as file: |
| for line in file: |
| path = line.strip() |
| # append the contentid |
| path = path + str(contentid) |
| directories.append(path) |
| |
| os.remove(directories_file) |
| with open(directories_file, 'w' ) as file: |
| for d in directories: |
| file.write(d + '\n') |
| |
| # create the temporary directories |
| for d in directories: |
| if os.path.exists(d): |
| logger.warn('temporary directory %s is already exists' % d) |
| |
| MakeDirectory.local('create temporary direcotry', d) |
| |