# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
from gppylib import gplog
from gppylib.commands.base import Command, WorkerPool
from gppylib.commands.gp import get_masterdatadir, SendFilerepVerifyMessage
from gppylib.commands.unix import Scp
from gppylib.db import dbconn
from gppylib.db.dbconn import UnexpectedRowsError
from gppylib.gparray import GpArray, FAULT_STRATEGY_LABELS, FAULT_STRATEGY_FILE_REPLICATION
from gppylib.mainUtils import ExceptionNoStackTraceNeeded
from gppylib.operations import Operation
from gppylib.operations.utils import ParallelOperation, RemoteOperation
from gppylib.operations.unix import CheckDir, MakeDir, RemoveTree
# TODO: None of these operations are using master_port to differentiate between various clusters
# Defer until a gpverify feature request comes in.
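# A typical flow, as driven by the gpverify utility: VerifyFilerep starts a verification,
# SuspendVerification/ResumeVerification/AbortVerification control it, FinalizeVerification
# (or FinalizeAllVerifications) collects and records the results, and CleanVerification
# (or CleanDoneVerifications) removes the artifacts once a verification is done.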
logger = gplog.get_default_logger()
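# VerifyFilerep: kicks off a new filerep verification. It checks that the token has not been
# used before, sends the gp_primarymirror verify messages to the target primaries, and records
# the new verification in gp_verification_history.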
class VerifyFilerep(Operation):
def __init__(self, content, token, full, verify_file, verify_dir, ignore_dir, ignore_file, results_level, batch_default):
self.content = content
self.token = token
self.full = full
self.verify_file = verify_file
self.verify_dir = verify_dir
self.ignore_dir = ignore_dir
self.ignore_file = ignore_file
self.results_level = results_level
self.batch_default = batch_default
def execute(self):
try:
ValidateVerificationEntry(token = self.token).run()
raise ExceptionNoStackTraceNeeded('Token "%s" has already been used. Use "gpverify --clean --token %s" to reclaim this token.' % (self.token, self.token))
except TokenNotFound:
pass
TriggerFilerepVerifyMessages(content = self.content,
token = self.token,
full = self.full,
verify_file = self.verify_file,
verify_dir = self.verify_dir,
ignore_dir = self.ignore_dir,
ignore_file = self.ignore_file,
results_level = self.results_level,
batch_default = self.batch_default).run()
verify_type = VerificationType.translate(self.full, self.verify_file, self.verify_dir)
verify_content = self.content if self.content is not None else -1
InsertVerificationEntry(token = self.token,
type = verify_type,
content = verify_content).run()
logger.info('Verification %s has started.' % self.token)
# TODO: If a particular segment's gp_primarymirror fails, then here we're considering that a failure and no
# record gets inserted into gp_verification_history. Subsequent invocations of VerifyFilerep will fail because
# verifications are already running on the other N - 1 segments.
# A gp_verification_history schema which supports multiple contents would solve this. This would allow per-content
# verifications to be tracked truthfully, as opposed to the current, lossy approach.
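# TriggerFilerepVerifyMessages: builds one SendFilerepVerifyMessage command per target primary
# and dispatches them through a WorkerPool; carries the verify/abort/suspend/resume flags used
# by the start, abort, suspend and resume operations in this module.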
class TriggerFilerepVerifyMessages(Operation):
def __init__(self, content, token, batch_default, full=None, verify_file=None, verify_dir=None,
abort=None, suspend=None, resume=None, ignore_dir=None, ignore_file=None,
results=None, results_level=None):
self.content = content
self.token = token
self.full = full
self.verify_file = verify_file
self.verify_dir = verify_dir
self.abort = abort
self.suspend = suspend
self.resume = resume
self.ignore_dir = ignore_dir
self.ignore_file = ignore_file
self.results = results
self.results_level = results_level
self.batch_default = batch_default
def execute(self):
"""
Sends the requested gp_primarymirror message to the backend process of each targeted segment.
"""
to_trigger = ValidateVerification(content = self.content).run()
logger.info('Sending gp_primarymirror requests...')
pool = WorkerPool(min(len(to_trigger), self.batch_default))
for pseg in to_trigger:
host, port = pseg.getSegmentHostName(), pseg.getSegmentPort()
cmd = SendFilerepVerifyMessage(name = 'verify %s' % host, host = host, port = port,
token = self.token,
full = self.full,
verify_file = self.verify_file,
verify_dir = self.verify_dir,
abort = self.abort,
suspend = self.suspend,
resume = self.resume,
ignore_dir = self.ignore_dir,
ignore_file = self.ignore_file,
results = self.results,
results_level = self.results_level)
logger.debug("Sending request to %s:%d" % (host, port))
pool.addCommand(cmd)
logger.info('Waiting for gp_primarymirror commands to complete...')
pool.wait_and_printdots(len(to_trigger))
for cmd in pool.getCompletedItems():
res = cmd.get_results()
if not res.wasSuccessful():
logger.error('Failed to send gp_primarymirror message to %s:%s' % (cmd.host, cmd.port))
logger.error('Error: %s' % res.stderr)
raise TriggerGpPrimaryMirrorFailure()
logger.info('gp_primarymirror messages have been triggered successfully.')
class TriggerGpPrimaryMirrorFailure(Exception): pass
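# ValidateVerification: resolves which segments a verification targets and fails fast if the
# cluster is not using file replication or the requested content ID does not exist.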
class ValidateVerification(Operation):
def __init__(self, content, primaries_only = True):
"""
content and primaries_only dictate which portions of the gparray should
be returned. Their effects are cumulative, meaning:
content = None and primaries_only = False => all QEs
content = None and primaries_only = True => all primary segments
content = x and primaries_only = False => all segments with content x
content = x and primaries_only = True => the primary with content x
"""
self.content = content if content >= 0 else None
self.primaries_only = primaries_only
def execute(self):
dburl = dbconn.DbURL()
gparray = GpArray.initFromCatalog(dburl)
my_fault_strategy = gparray.getFaultStrategy()
if my_fault_strategy != FAULT_STRATEGY_FILE_REPLICATION:
raise NoMirroringError('Fault strategy %s does not support mirror verification.' % FAULT_STRATEGY_LABELS[my_fault_strategy])
if self.content is not None:
contents = set( [seg.getSegmentContentId() for seg in gparray.getDbList()] )
if self.content not in contents:
raise InvalidContentIDError(self.content)
logger.info('Validating target contents...')
to_verify = [x for x in gparray.getDbList() if x.isSegmentQE()]
if self.content is not None:
to_verify = [x for x in to_verify if x.getSegmentContentId() == self.content]
if self.primaries_only:
to_verify = [x for x in to_verify if x.isSegmentPrimary(current_role=True)]
return to_verify
class NoMirroringError(Exception): pass
class InvalidContentIDError(Exception):
def __init__(self, content):
Exception.__init__(self, "%d is not a valid content ID." % content)
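# AbortVerification: sends an abort request for an unfinished verification and marks its
# gp_verification_history entry ABORTED and done.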
class AbortVerification(Operation):
def __init__(self, token, batch_default):
self.token = token
self.batch_default = batch_default
def execute(self):
entry = ValidateVerificationEntry(token = self.token).run()
if entry['verdone']:
raise WrongStateError("Only unfinished verification tasks may be aborted.")
TriggerFilerepVerifyMessages(content = entry['vercontent'],
batch_default = self.batch_default,
token = self.token,
abort = True).run()
UpdateVerificationEntry(token = self.token,
state = VerificationState.ABORTED,
done = True).run()
logger.info('Verification %s has been aborted.' % self.token)
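# SuspendVerification: pauses a RUNNING verification by sending a suspend request and marking
# the entry SUSPENDED.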
class SuspendVerification(Operation):
def __init__(self, token, batch_default):
self.token = token
self.batch_default = batch_default
def execute(self):
entry = ValidateVerificationEntry(token = self.token).run()
if entry['verstate'] != VerificationState.RUNNING:
raise WrongStateError("Only running verification tasks may be resumed.")
TriggerFilerepVerifyMessages(content = entry['vercontent'],
batch_default = self.batch_default,
token = self.token,
suspend = True).run()
UpdateVerificationEntry(token = self.token,
state = VerificationState.SUSPENDED).run()
logger.info('Verification %s has been suspended.' % self.token)
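# ResumeVerification: restarts a SUSPENDED verification by sending a resume request and marking
# the entry RUNNING again.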
class ResumeVerification(Operation):
def __init__(self, token, batch_default):
self.token = token
self.batch_default = batch_default
def execute(self):
entry = ValidateVerificationEntry(token = self.token).run()
if entry['verstate'] != VerificationState.SUSPENDED:
raise WrongStateError("Only suspended verification tasks may be resumed.")
TriggerFilerepVerifyMessages(content = entry['vercontent'],
batch_default = self.batch_default,
token = self.token,
resume = True).run()
UpdateVerificationEntry(token = self.token,
state = VerificationState.RUNNING).run()
logger.info('Verification %s has been resumed.' % self.token)
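# CleanDoneVerifications: runs CleanVerification for every finished entry in
# gp_verification_history.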
class CleanDoneVerifications(Operation):
def __init__(self, batch_default):
self.batch_default = batch_default
def execute(self):
tokens = []
entries = GetAllVerifications().run()
entries = [entry for entry in entries if entry['verdone']]
if len(entries) == 0:
logger.info("There are no completed verifications to clean up.")
return
for entry in entries:
tokens.append(entry['vertoken'])
CleanVerification(token = entry['vertoken'],
batch_default = self.batch_default).run()
logger.info("The following verifications have been cleaned up: %s" % ",".join(tokens))
class CleanVerification(Operation):
def __init__(self, token, batch_default):
self.token = token
self.batch_default = batch_default
def execute(self):
entry = ValidateVerificationEntry(token = self.token).run()
if not entry['verdone']:
raise WrongStateError("Only finished verification tasks may be cleaned up.")
path = os.path.join(get_masterdatadir(), 'pg_verify', self.token)
Command('cleanup', 'rm -rf %s' % path).run(validateAfter=True)
#RemoveTree(path).run()
to_clean = ValidateVerification(content = entry['vercontent'],
primaries_only = False).run()
pool = WorkerPool(min(len(to_clean), self.batch_default))
for seg in to_clean:
host = seg.getSegmentHostName()
path = os.path.join(seg.getSegmentDataDirectory(), 'pg_verify', "*%s*" % self.token)
cmd = Command('cleanup', 'rm -f %s' % path, remoteHost=host)
pool.addCommand(cmd)
logger.info('Waiting for clean commands to complete...')
pool.wait_and_printdots(len(to_clean))
for cmd in pool.getCompletedItems():
res = cmd.get_results()
if not res.wasSuccessful():
logger.error('Failed to run cleanup on %s' % cmd.host)
logger.error('Error: %s' % res.stderr)
raise CleanVerificationError()
RemoveVerificationEntry(token = self.token).run()
logger.info('Verification %s has been cleaned.' % self.token)
class CleanVerificationError(Exception): pass
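# FinalizeAllVerifications: runs FinalizeVerification for every known token and returns the
# updated entries.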
class FinalizeAllVerifications(Operation):
def __init__(self, batch_default):
self.batch_default = batch_default
def execute(self):
ret = []
entries = GetAllVerifications().run()
for entry in entries:
updated_entry = FinalizeVerification(token = entry['vertoken'],
batch_default = self.batch_default).run()
ret.append(updated_entry)
return ret
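# FinalizeVerification: checks each content's result file, gathers artifacts to the master once
# all contents have finished, and records the final state (SUCCEEDED, FAILED, or ABORTED) in
# gp_verification_history.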
class FinalizeVerification(Operation):
def __init__(self, token, batch_default):
self.token = token
self.batch_default = batch_default
def execute(self):
entry = ValidateVerificationEntry(token = self.token).run()
to_inspect = ValidateVerification(content = entry['vercontent']).run()
state_dict = ValidateCompletion(token = self.token,
to_validate = to_inspect,
batch_default = self.batch_default).run()
incomplete = state_dict[VerificationState.RUNNING]
if len(incomplete) > 0:
# TODO: --force to consolidate files despite ongoing
logger.error('One or more content verifications are still in progress: %s' % incomplete)
return entry
GatherResults(master_datadir = get_masterdatadir(),
content = entry['vercontent'],
token = self.token,
batch_default = self.batch_default).run()
state = VerificationState.SUCCEEDED
mismatch = False
aborted = state_dict[VerificationState.ABORTED]
failed = state_dict[VerificationState.FAILED]
if len(failed) > 0: # any FAILED trumps ABORTED
state = VerificationState.FAILED
mismatch = True
logger.warn('One or more contents for verification %s were marked FAILED: %s' % (self.token, failed))
elif len(aborted) > 0:
state = VerificationState.ABORTED
logger.warn('One or more contents for verification %s were marked ABORTED: %s' % (self.token, aborted))
else:
logger.info('Verification %s completed successfully' % self.token)
if not entry['verdone']:
UpdateVerificationEntry(token = self.token,
state = state,
mismatch = mismatch,
done = True).run()
entry.update({ 'vermismatch' : mismatch,
'verdone' : True,
'verstate' : state })
return entry
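# GatherResults: scp's each segment's pg_verify files for this token into
# <master_datadir>/pg_verify/<token>/<content>/<role> (skipped if that directory already exists)
# and concatenates the per-segment .fix files into a single fix file on the master.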
class GatherResults(Operation):
def __init__(self, master_datadir, token, content, batch_default):
self.master_datadir = master_datadir
self.token = token
self.content = content
self.batch_default = batch_default
def execute(self):
logger.info('Gathering results of verification %s...' % self.token)
to_gather = ValidateVerification(content = self.content,
primaries_only = False).run()
dest_base = os.path.join(self.master_datadir, 'pg_verify', self.token)
if CheckDir(dest_base).run():
# TODO: if end user has mucked around with artifacts on master, a regathering may
# be needed; perhaps, a --force option to accompany --results?
return
MakeDir(dest_base).run()
pool = WorkerPool(min(len(to_gather), self.batch_default))
for seg in to_gather:
host = seg.getSegmentHostName()
content = seg.getSegmentContentId()
role = seg.getSegmentRole()
src = os.path.join(seg.getSegmentDataDirectory(), "pg_verify", "*%s*" % self.token)
dest = os.path.join(dest_base, str(content), str(role))
MakeDir(dest).run()
cmd = Scp('consolidate', srcFile=src, srcHost=host, dstFile=dest)
pool.addCommand(cmd)
logger.info('Waiting for scp commands to complete...')
pool.wait_and_printdots(len(to_gather))
pool.check_results()
dest = os.path.join(dest_base, 'verification_%s.fix' % self.token)
with open(dest, 'w') as output:
for seg in to_gather:
content = seg.getSegmentContentId()
role = seg.getSegmentRole()
src = os.path.join(dest_base, str(content), str(role), 'verification_%s.fix' % self.token)
with open(src, 'r') as input:
output.writelines(input.readlines())
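# ValidateCompletion: inspects each target segment's result file remotely (via ValidateResultFile)
# and buckets the content IDs by verification state.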
class ValidateCompletion(Operation):
def __init__(self, token, to_validate, batch_default):
self.token = token
self.to_validate = to_validate
self.batch_default = batch_default
def execute(self):
state_dict = { VerificationState.RUNNING: [],
VerificationState.SUCCEEDED: [],
VerificationState.ABORTED: [],
VerificationState.FAILED: [] }
operations = []
for pseg in self.to_validate:
operations.append(RemoteOperation(ValidateResultFile(token = self.token,
datadir = pseg.getSegmentDataDirectory(),
content = pseg.getSegmentContentId()),
pseg.getSegmentHostName()))
ParallelOperation(operations, self.batch_default).run()
for remote in operations:
state = remote.get_ret()
state_dict[state].append(remote.operation.content)
return state_dict
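# ValidateResultFile: runs on the segment host; reads the first line of
# verification_<token>.result and maps it to a VerificationState, defaulting to RUNNING for any
# unrecognized phrase.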
class ValidateResultFile(Operation):
def __init__(self, token, datadir, content):
self.token = token
self.datadir = datadir
self.content = content
self.translate = { 'SUCCESS': VerificationState.SUCCEEDED,
'FAILED': VerificationState.FAILED,
'ABORT': VerificationState.ABORTED }
def execute(self):
path = os.path.join(self.datadir, "pg_verify", "verification_%s.result" % self.token)
with open(path, 'r') as f:
phrase = f.readline().strip()
return self.translate.get(phrase, VerificationState.RUNNING)
class VerificationType:
FULL, FILE, DIR = range(3)
lookup = ['FULL', 'FILE', 'DIR']
@staticmethod
def translate(full, file, dir):
type_tuple = (full, file, dir) # tuple is ordered to emulate the enum
type_flags = [int(bool(type)) for type in type_tuple] # list of 1's and 0's where 1's denote truth
if sum(type_flags) != 1:
raise InvalidVerificationType()
return type_flags.index(1) # the index of the single truthy flag corresponds to the desired enum value
class InvalidVerificationType(Exception): pass
# TODO: Replace these hand-rolled enums with a proper enum type (with __str__ support); see the PyPI enum package.
class VerificationState:
RUNNING, SUSPENDED, ABORTED, FAILED, SUCCEEDED = range(5)
lookup = [ 'RUNNING', 'SUSPENDED', 'ABORTED', 'FAILED', 'SUCCEEDED' ]
class WrongStateError(Exception): pass
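# ValidateVerificationEntry: looks up a token in gp_verification_history and returns its row as
# a dict; raises TokenNotFound if no such row exists.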
class ValidateVerificationEntry(Operation):
SELECT_VERIFICATION_ENTRY = """
select vertoken, vertype, vercontent, verstarttime, verstate, verdone, verendtime, vermismatch
from gp_verification_history where vertoken = '%s';
"""
def __init__(self, token):
self.token = token
def execute(self):
dburl = dbconn.DbURL()
query = self.SELECT_VERIFICATION_ENTRY % self.token
with dbconn.connect(dburl) as conn:
try:
tuple = dbconn.execSQLForSingletonRow(conn, query)
except UnexpectedRowsError, e:
if e.actual == 0:
raise TokenNotFound(self.token)
raise
# TODO: execSQL or pygresql should be able to do this for us
ret = { 'vertoken': tuple[0],
'vertype': tuple[1],
'vercontent': tuple[2],
'verstarttime': tuple[3],
'verstate': tuple[4],
'verdone': tuple[5],
'verendtime': tuple[6],
'vermismatch': tuple[7] }
logger.debug("ValidateVerificationEntry: %s" % ret)
return ret
class TokenNotFound(Exception):
def __init__(self, token):
Exception.__init__(self, "Token %s was not found." % token)
class InsertVerificationEntry(Operation):
INSERT_VERIFICATION_ENTRY = """
insert into gp_verification_history
(vertoken, vertype, vercontent, verstarttime, verstate, verdone, verendtime, vermismatch)
values ('%s', %d, %d, now(), %d, false, now(), false);
"""
def __init__(self, token, type, content):
self.token = token
self.type = type
self.content = content
def execute(self):
dburl = dbconn.DbURL()
query = self.INSERT_VERIFICATION_ENTRY % (self.token, self.type, self.content, VerificationState.RUNNING)
with dbconn.connect(dburl, allowSystemTableMods='dml') as conn:
dbconn.execSQL(conn, query)
conn.commit()
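# UpdateVerificationEntry: updates a token's state, done and mismatch flags, and stamps
# verendtime.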
class UpdateVerificationEntry(Operation):
UPDATE_VERIFICATION_ENTRY = "update gp_verification_history set verstate = %d, verdone = %s, vermismatch = %s, verendtime = now() where vertoken = '%s';"
def __init__(self, token, state, done=False, mismatch=False):
self.token = token
self.state = state
self.done = done
self.mismatch = mismatch
def execute(self):
dburl = dbconn.DbURL()
query = self.UPDATE_VERIFICATION_ENTRY % (self.state, self.done, self.mismatch, self.token)
with dbconn.connect(dburl, allowSystemTableMods='dml') as conn:
dbconn.execSQL(conn, query)
conn.commit()
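# RemoveVerificationEntry: deletes a token's row from gp_verification_history.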
class RemoveVerificationEntry(Operation):
REMOVE_VERIFICATION_ENTRY = "delete from gp_verification_history where vertoken = '%s';"
def __init__(self, token):
self.token = token
def execute(self):
dburl = dbconn.DbURL()
query = self.REMOVE_VERIFICATION_ENTRY % self.token
with dbconn.connect(dburl, allowSystemTableMods='dml') as conn:
dbconn.execSQL(conn, query)
conn.commit()
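# GetAllVerifications: returns every gp_verification_history row as a dict, ordered by start time.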
class GetAllVerifications(Operation):
SELECT_ALL_VERIFICATIONS = """
select vertoken, vertype, vercontent, verstarttime, verstate, verdone, verendtime, vermismatch
from gp_verification_history order by verstarttime ASC;
"""
def __init__(self): pass
def execute(self):
ret = []
dburl = dbconn.DbURL()
with dbconn.connect(dburl) as conn:
# TODO: improve execSQL APIs to avoid need to use cursor here for such a simple task
cursor = conn.cursor()
cursor.execute(self.SELECT_ALL_VERIFICATIONS)
res = cursor.fetchall()
cursor.close()
for tuple in res:
# TODO: execSQL or pygresql should be able to do this for us
ret.append({ 'vertoken': tuple[0],
'vertype': tuple[1],
'vercontent': tuple[2],
'verstarttime': tuple[3],
'verstate': tuple[4],
'verdone': tuple[5],
'verendtime': tuple[6],
'vermismatch': tuple[7] })
return ret