# blob: a96ca4b4cc41b3f6c24e9bb79efed7f07fa49f0a [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Used to inject faults into the file replication code
#
#
# THIS IMPORT MUST COME FIRST
#
# import mainUtils FIRST to get python version check
from gppylib.mainUtils import *
from optparse import Option, OptionGroup, OptionParser, OptionValueError, SUPPRESS_USAGE
from gppylib.gpparseopts import OptParser, OptChecker
from gppylib.utils import toNonNoneString
from gppylib import gplog
from gppylib import gparray
from gppylib.commands import base
from gppylib.commands import unix
from gppylib.commands import gp
from gppylib.commands import pg
from gppylib.db import catalog
from gppylib.db import dbconn
from gppylib.system import configurationInterface, fileSystemInterface, osInterface
from gppylib import pgconf
from gppylib.testold.testUtils import testOutput
from gppylib.system.environment import GpMasterEnvironment
# Module-level logger obtained from gplog; GpInjectFaultProgram below uses it
# for all of its progress and status messages.
logger = gplog.get_default_logger()
#-------------------------------------------------------------------------
class GpInjectFaultProgram:
    """
    Command-line program used to inject faults into the file replication code.

    The program works out which segments to target from the master's segment
    configuration, sends every target a "faultInject" transition message and,
    when running in sync mode, polls the targets with "getFaultInjectStatus"
    messages until none of them answers "Success: waitMore" any more.
    """

    # One entry per fault name accepted by --fault_name.  The entries are kept
    # in a tuple and joined with ", " by _buildFaultNameHelp() so that a
    # separator can never be forgotten between two of them.
    # NOTE(review): the names themselves (e.g. filerep_consumer_verificaton,
    # create_cdb_dispath_result_object) are kept exactly as they were;
    # presumably they must match the names known to the server -- confirm
    # before "fixing" their spelling.
    _FAULT_NAME_ENTRIES = (
        "postmaster (inject fault when new connection is accepted in postmaster)",
        "pg_control (inject fault when global/pg_control file is written)",
        "pg_xlog (inject fault when files in pg_xlog directory are written)",
        "start_prepare (inject fault during start prepare transaction)",
        "filerep_consumer (inject fault before data are processed, i.e. if mirror then before file operation is issued to file system, if primary then before mirror file operation is acknowledged to backend processes)",
        "filerep_consumer_verificaton (inject fault before ack verification data are processed on primary)",
        "filerep_change_tracking_compacting (inject fault when compacting change tracking log files)",
        "filerep_sender (inject fault before data are sent to network)",
        "filerep_receiver (inject fault after data are received from network)",
        "filerep_flush (inject fault before fsync is issued to file system)",
        "filerep_resync (inject fault while in resync when first relation is ready to be resynchronized)",
        "filerep_resync_in_progress (inject fault while resync is in progress)",
        "filerep_resync_worker (inject fault after write to mirror)",
        "filerep_resync_worker_read (inject fault before read required for resync)",
        "filerep_transition_to_resync (inject fault during transition to InResync before mirror re-create)",
        "filerep_transition_to_resync_mark_recreate (inject fault during transition to InResync before marking re-created)",
        "filerep_transition_to_resync_mark_completed (inject fault during transition to InResync before marking completed)",
        "filerep_transition_to_sync_begin (inject fault before transition to InSync begin)",
        "filerep_transition_to_sync (inject fault during transition to InSync)",
        "filerep_transition_to_sync_before_checkpoint (inject fault during transition to InSync before checkpoint is created)",
        "filerep_transition_to_sync_mark_completed (inject fault during transition to InSync before marking completed)",
        "filerep_transition_to_change_tracking (inject fault during transition to InChangeTracking)",
        "checkpoint (inject fault before checkpoint is taken)",
        "change_tracking_compacting_report (report if compacting is in progress)",
        "change_tracking_disable (inject fault before fsync to Change Tracking log files)",
        "transaction_abort_after_distributed_prepared (abort prepared transaction)",
        "transaction_commit_pass1_from_create_pending_to_created",
        "transaction_commit_pass1_from_drop_in_memory_to_drop_pending",
        "transaction_commit_pass1_from_aborting_create_needed_to_aborting_create",
        "transaction_abort_pass1_from_create_pending_to_aborting_create",
        "transaction_abort_pass1_from_aborting_create_needed_to_aborting_create",
        "transaction_commit_pass2_from_drop_in_memory_to_drop_pending",
        "transaction_commit_pass2_from_aborting_create_needed_to_aborting_create",
        "transaction_abort_pass2_from_create_pending_to_aborting_create",
        "transaction_abort_pass2_from_aborting_create_needed_to_aborting_create",
        "finish_prepared_transaction_commit_pass1_from_create_pending_to_created",
        "finish_prepared_transaction_commit_pass2_from_create_pending_to_created",
        "finish_prepared_transaction_abort_pass1_from_create_pending_to_aborting_create",
        "finish_prepared_transaction_abort_pass2_from_create_pending_to_aborting_create",
        "finish_prepared_transaction_commit_pass1_from_drop_in_memory_to_drop_pending",
        "finish_prepared_transaction_commit_pass2_from_drop_in_memory_to_drop_pending",
        "finish_prepared_transaction_commit_pass1_aborting_create_needed",
        "finish_prepared_transaction_commit_pass2_aborting_create_needed",
        "finish_prepared_transaction_abort_pass1_aborting_create_needed",
        "finish_prepared_transaction_abort_pass2_aborting_create_needed",
        "twophase_transaction_commit_prepared (inject fault before transaction commit is inserted in xlog)",
        "twophase_transaction_abort_prepared (inject fault before transaction abort is inserted in xlog)",
        "dtm_broadcast_prepare (inject fault after prepare broadcast)",
        "dtm_broadcast_commit_prepared (inject fault after commit broadcast)",
        "dtm_broadcast_abort_prepared (inject fault after abort broadcast)",
        "dtm_xlog_distributed_commit (inject fault after distributed commit was inserted in xlog)",
        "fault_before_pending_delete_relation_entry (inject fault before putting pending delete relation entry)",
        "fault_before_pending_delete_database_entry (inject fault before putting pending delete database entry)",
        "fault_before_pending_delete_tablespace_entry (inject fault before putting pending delete tablespace entry)",
        "fault_before_pending_delete_filespace_entry (inject fault before putting pending delete filespace entry)",
        "dtm_init (inject fault before initializing dtm)",
        "end_prepare_two_phase_sleep (inject sleep after two phase file creation)",
        "segment_transition_request (inject fault after segment receives state transition request)",
        "segment_probe_response (inject fault after segment is probed by FTS)",
        "sync_persistent_table (inject fault to sync persistent table to disk)",
        "xlog_insert (inject fault to skip insert record into xlog)",
        "local_tm_record_transaction_commit (inject fault for local transactions after transaction commit is recorded and flushed in xlog)",
        "malloc_failure (inject fault to simulate memory allocation failure)",
        "transaction_abort_failure (inject fault to simulate transaction abort failure)",
        "update_committed_eof_in_persistent_table (inject fault before committed EOF is updated in gp_persistent_relation_node for Append Only segment files)",
        "fault_during_exec_dynamic_table_scan (inject fault during scanning of a partition)",
        "internal_flush_error (inject an error during internal_flush)",
        "exec_simple_query_end_command (inject fault before EndCommand in exec_simple_query)",
        "multi_exec_hash_large_vmem (allocate large vmem using palloc inside MultiExecHash to attempt to exceed vmem limit)",
        "execsort_before_sorting (inject fault in nodeSort after receiving all tuples and before sorting)",
        "workfile_cleanup_set (inject fault in workfile manager cleanup set)",
        "execsort_mksort_mergeruns (inject fault in MKSort during the mergeruns phase)",
        "cdb_copy_start_after_dispatch (inject fault in cdbCopyStart after dispatch)",
        "fault_in_background_writer_main (inject fault in BackgroundWriterMain)",
        "exec_hashjoin_new_batch (inject fault before switching to a new batch in Hash Join)",
        "analyze_subxact_error (inject an error during analyze)",
        "opt_task_allocate_string_buffer (inject fault while allocating string buffer)",
        "runaway_cleanup (inject fault before starting the cleanup for a runaway query)",
        "connection_fail_after_gang_creation (inject fault after gang thread creation, set connection null)",
        "create_cdb_dispath_result_object (inject fault when create cdb dispatch result object, set out of memory)",
        "worker_manager_submit_job (inject fault when worker manager submit job, set error)",
        "fail_qe_after_connection (inject fault after connecting to QD, sleep to wait QE fail)",
        "fail_qe_when_do_query (inject fault when QE actually working, set error)",
        "fail_qe_when_begin_parquet_scan (inject fault when begin scan parquet table, set error)",
        "fail_qe_when_parquet_get_next (inject fault when get next, set error)",
        "all (affects all faults injected, used for 'status' and 'reset')",
    )

    def __init__(self, options):
        """
        Constructor.

        @param options the options as returned by the options parser
        """
        self.options = options

    def buildMessage(self):
        """
        Build the fault transition message.

        Fault options themselves are NOT validated by the client -- the server
        does that when the fault is sent.  In particular the strings are not
        checked for newlines; if they contain any (and so mess up the
        transition protocol) the server will error.

        @return the message text: "faultInject" followed by one option value
                per line
        """
        opts = self.options
        fields = [
            "faultInject",
            toNonNoneString(opts.faultName),
            toNonNoneString(opts.type),
            toNonNoneString(opts.ddlStatement),
            toNonNoneString(opts.databaseName),
            toNonNoneString(opts.tableName),
            toNonNoneString(opts.numOccurrences),
            toNonNoneString(opts.sleepTimeSeconds),
        ]
        return '\n'.join(fields)

    def buildGetStatusMessage(self):
        """
        Build the message that asks for the status of the fault.

        The fault name is not validated here, so the server may error.

        @return the message text: "getFaultInjectStatus" and the fault name,
                separated by a newline
        """
        return '\n'.join(["getFaultInjectStatus",
                          toNonNoneString(self.options.faultName)])

    def isMatchingRole(self, role, segment):
        """
        Check whether a segment has the requested role.

        @param role one of "primary", "mirror" or "primary_mirror"
        @param segment object whose getSegmentRole() returns 'p' or 'm'
        @return True if the segment matches the given role, False otherwise
        @raise ProgramArgumentValidationException if role is not one of the
               three accepted values
        """
        segmentRole = segment.getSegmentRole()
        if role == "primary":
            return segmentRole == 'p'
        if role == "mirror":
            return segmentRole == 'm'
        if role == "primary_mirror":
            return segmentRole in ('m', 'p')
        raise ProgramArgumentValidationException("Invalid role specified: %s" % role)

    @staticmethod
    def _validateTargetOptions(targetHost, targetRole, targetDbId):
        """
        Check that --host, --role and --seg_dbid were combined correctly.

        Exactly one of host and dbid must be given; a role is required with a
        host and forbidden with a dbid.

        @raise ProgramArgumentValidationException on any invalid combination
        """
        if targetHost is None and targetDbId is None:
            raise ProgramArgumentValidationException(
                "neither --host nor --seg_dbid specified. "
                "Exactly one should be specified.")
        if targetHost is not None and targetDbId is not None:
            raise ProgramArgumentValidationException(
                "both --host and --seg_dbid specified. "
                "Exactly one should be specified.")
        if targetHost is not None and targetRole is None:
            raise ProgramArgumentValidationException(
                "--role not specified when --host is specified. "
                "Role is required when targeting a host.")
        if targetDbId is not None and targetRole is not None:
            raise ProgramArgumentValidationException(
                "--role specified when --seg_dbid is specified. "
                "Role should not be specified when targeting a single dbid.")

    def loadTargetSegments(self):
        """
        Load the segments and filter to the ones we should target.

        @return the list of targeted segments (may be empty)
        @raise ProgramArgumentValidationException if the targeting options are
               inconsistent or --seg_dbid is not a number
        @raise ExceptionNoStackTraceNeeded if any targeted segment has a
               status other than 'u'
        """
        targetHost = self.options.targetHost
        targetRole = self.options.targetRole
        targetDbId = self.options.targetDbId
        self._validateTargetOptions(targetHost, targetRole, targetDbId)

        # Parse the dbid before talking to the master so that a bad value is
        # reported as an argument error instead of a ValueError stack trace.
        dbId = None
        if targetDbId is not None:
            try:
                dbId = int(targetDbId)
            except ValueError:
                raise ProgramArgumentValidationException(
                    "Invalid --seg_dbid value: %s. A numeric dbid is required." % targetDbId)

        #
        # load from master db
        #
        masterPort = self.options.masterPort
        if masterPort is None:
            gpEnv = GpMasterEnvironment(self.options.masterDataDirectory, False)
            masterPort = gpEnv.getMasterPort()
        conf = configurationInterface.getConfigurationProvider().initializeProvider(masterPort)
        gpArray = conf.loadSystemConfig(useUtilityMode=True)
        allSegments = gpArray.getDbList()

        #
        # prune gpArray according to filter settings
        #
        if dbId is not None:
            # a dbid is looked up in the complete db list, not only among the QEs
            segments = [seg for seg in allSegments if seg.getSegmentDbId() == dbId]
        else:
            segments = [seg for seg in allSegments if seg.isSegmentQE()]
            if targetHost is not None and targetHost != "ALL":
                segments = [seg for seg in segments if seg.getSegmentHostName() == targetHost]
        if targetRole is not None:
            segments = [seg for seg in segments if self.isMatchingRole(targetRole, seg)]

        # any targeted segment that is not marked up? Error out
        downSegments = [seg for seg in segments if seg.getSegmentStatus() != 'u']
        if downSegments:
            downSegStr = "\n Down Segment: "
            raise ExceptionNoStackTraceNeeded(
                "Unable to inject fault. At least one segment is marked as down in the database.%s%s" %
                (downSegStr, downSegStr.join([str(downSeg) for downSeg in downSegments])))
        return segments

    def getAndValidateIsSyncSetting(self):
        """
        Interpret the -m/--mode option.

        @return True for sync, False for async
        @raise ExceptionNoStackTraceNeeded for any other mode value
        """
        syncMode = self.options.syncMode
        if syncMode == "sync":
            return True
        if syncMode == "async":
            return False
        raise ExceptionNoStackTraceNeeded("Invalid -m, --mode option %s" % syncMode)

    def writeToTempFile(self, str):
        """
        Write a string to a named temporary file obtained from the file system
        provider and flush it so that other processes can read it by name.

        The parameter keeps its historical name (which shadows the builtin) so
        that keyword callers are not broken.

        @param str the text to write
        @return the still-open temporary file object; the caller must close it
        """
        inputFile = fileSystemInterface.getFileSystemProvider().createNamedTemporaryFile()
        inputFile.write(str)
        inputFile.flush()
        return inputFile

    def injectFaults(self, segments, messageText):
        """
        Send the fault message to every target segment, one after the other.

        @param segments the segments to inject the fault on
        @param messageText the message as produced by buildMessage()
        @raise ExceptionNoStackTraceNeeded if a segment's command returns a
               non-zero result code
        """
        inputFile = self.writeToTempFile(messageText)
        try:
            logger.info("Injecting fault on %d segment(s)", len(segments))
            testOutput("Injecting fault on %d segment(s)" % len(segments))

            # run the command in serial to each target
            for segment in segments:
                logger.info("Injecting fault on %s", segment)
                cmd = gp.SendFilerepTransitionMessage("Fault Injector", inputFile.name,
                                                      segment.getSegmentPort(), base.LOCAL,
                                                      segment.getSegmentHostName())
                # validation is switched off here; the result is checked below
                cmd.run(validateAfter=False)
                if cmd.results.rc != 0:
                    raise ExceptionNoStackTraceNeeded("Injection Failed: %s" % cmd.results.stderr)
                elif self.options.type == "status":
                    # server side prints nice success messages on status...so print it
                    statusText = cmd.results.stderr
                    if statusText.startswith("Success: "):
                        statusText = statusText.replace("Success: ", "", 1)
                    logger.info("%s", statusText)
        finally:
            # close the temporary file even when an injection fails
            inputFile.close()

    def waitForFaults(self, segments, statusQueryText):
        """
        Poll the segments until every one of them reports "Success: done".

        Segments answering "Success: waitMore" are asked again on the next
        pass; the pause between passes grows by a constant factor until it
        exceeds 7 seconds and then stays put.

        @param segments the segments to wait for (the list is not modified)
        @param statusQueryText the message as produced by buildGetStatusMessage()
        @raise Exception if a segment answers with anything else
        """
        inputFile = self.writeToTempFile(statusQueryText)
        try:
            pending = list(segments)
            sleepTimeSec = 0.115610199
            sleepTimeMultipler = 1.5  # sleepTimeSec * sleepTimeMultipler^11 ~= 10
            logger.info("Awaiting fault on %d segment(s)", len(pending))
            while pending:
                logger.info("Sleeping %.2f seconds ", sleepTimeSec)
                osInterface.getOsProvider().sleep(sleepTimeSec)

                segmentsForNextPass = []
                for segment in pending:
                    logger.info("Checking for fault completion on %s", segment)
                    cmd = gp.SendFilerepTransitionMessage.local("Fault Injector Status Check",
                                                                inputFile.name,
                                                                segment.getSegmentPort(),
                                                                segment.getSegmentHostName())
                    resultStr = cmd.results.stderr.strip()
                    if resultStr == "Success: waitMore":
                        segmentsForNextPass.append(segment)
                    elif resultStr != "Success: done":
                        raise Exception("Unexpected result from server %s" % resultStr)

                pending = segmentsForNextPass
                if sleepTimeSec <= 7:
                    sleepTimeSec = sleepTimeSec * sleepTimeMultipler
        finally:
            # close the temporary file even when a status check fails
            inputFile.close()

    def isSyncableFaultType(self):
        """
        @return False for the "reset" and "status" fault types (run() does not
                wait after sending those), True for every other type
        """
        faultType = self.options.type
        return faultType != "reset" and faultType != "status"

    ######
    def run(self):
        """
        Program entry point: validate the options, inject the fault on the
        target segments and, in sync mode, wait for it to complete.

        @return 0 on success
        @raise ProgramArgumentValidationException if both the master port and
               the master data directory were specified
        """
        if self.options.masterPort is not None and self.options.masterDataDirectory is not None:
            raise ProgramArgumentValidationException(
                "both master port and master data directory options specified;"
                " at most one should be specified, or specify none to use MASTER_DATA_DIRECTORY environment variable")

        isSync = self.getAndValidateIsSyncSetting()
        messageText = self.buildMessage()
        segments = self.loadTargetSegments()

        # inject, maybe wait
        self.injectFaults(segments, messageText)
        if isSync and self.isSyncableFaultType():
            statusQueryText = self.buildGetStatusMessage()
            self.waitForFaults(segments, statusQueryText)

        logger.info("DONE")
        return 0  # success -- exit code 0!

    def cleanup(self):
        """Nothing to clean up."""
        pass

    #-------------------------------------------------------------------------
    @staticmethod
    def _buildFaultNameHelp():
        """
        @return the help text of the --fault_name option: the prefix
                "fault name: " followed by all entries of _FAULT_NAME_ENTRIES
                separated by ", "
        """
        return "fault name: " + ", ".join(GpInjectFaultProgram._FAULT_NAME_ENTRIES)

    @staticmethod
    def createParser():
        """
        Create the command-line parser with all options of this program.

        @return the configured OptParser
        """
        description = ("""
        This utility is NOT SUPPORTED and is for internal-use only.
        Used to inject faults into the file replication code.
        """)

        helpText = ["\nReturn codes:\n0 - Fault injected\nnon-zero: Error or invalid options\n"]

        parser = OptParser(option_class=OptChecker,
                           description=' '.join(description.split()),
                           version='%prog version $Revision$')
        parser.setHelp(helpText)

        addStandardLoggingAndHelpOptions(parser, False)

        # these options are used to determine the target segments
        addTo = OptionGroup(parser, 'Target Segment Options: ')
        parser.add_option_group(addTo)
        addTo.add_option('-r', '--role', dest="targetRole", type='string', metavar="<role>",
                         help="Role of segments to target: primary, mirror, or primary_mirror")
        addTo.add_option("-s", "--seg_dbid", dest="targetDbId", type="string", metavar="<dbid>",
                         help="The segment dbid on which fault should be set and triggered.")
        addTo.add_option("-H", "--host", dest="targetHost", type="string", metavar="<host>",
                         help="The hostname on which fault should be set and triggered; pass ALL to target all hosts")

        addTo = OptionGroup(parser, 'Master Connection Options')
        parser.add_option_group(addTo)
        addMasterDirectoryOptionForSingleClusterProgram(addTo)
        addTo.add_option("-p", "--master_port", dest="masterPort", type="int", default=None,
                         metavar="<masterPort>",
                         help="DEPRECATED, use MASTER_DATA_DIRECTORY environment variable or -d option. "
                              "The port number of the master database on localhost, "
                              "used to fetch the segment configuration.")

        addTo = OptionGroup(parser, 'Client Polling Options: ')
        parser.add_option_group(addTo)
        addTo.add_option('-m', '--mode', dest="syncMode", type='string', default="async",
                         metavar="<syncMode>",
                         help="Synchronization mode : sync (client waits for fault to occur)"
                              " or async (client only sets fault request on server)")

        # these options are used to build the message for the segments
        addTo = OptionGroup(parser, 'Fault Options: ')
        parser.add_option_group(addTo)
        # NOTE(review): the spelling 'data_curruption' is kept as-is; presumably
        # it has to match the type name known to the server -- confirm.
        addTo.add_option('-y', '--type', dest="type", type='string', metavar="<type>",
                         help="fault type: sleep (insert sleep), fault (report fault to postmaster and fts prober), "
                              "fatal (inject FATAL error), panic (inject PANIC error), error (inject ERROR), "
                              "infinite_loop, data_curruption (corrupt data in memory and persistent media), "
                              "suspend (suspend execution), resume (resume execution that was suspended), "
                              "skip (inject skip i.e. skip checkpoint), "
                              "memory_full (all memory is consumed when injected), "
                              "reset (remove fault injection), status (report fault injection status), "
                              "panic_suppress (inject suppressed PANIC in critical section), "
                              "segv (inject a SEGV), "
                              "interrupt (inject an Interrupt) ")
        addTo.add_option("-z", "--sleep_time_s", dest="sleepTimeSeconds", type="int", default="10",
                         metavar="<sleepTime>",
                         help="For 'sleep' faults, the amount of time for the sleep. Defaults to %default. "
                              "Min Max Range is [0, 7200 sec] ")
        addTo.add_option('-f', '--fault_name', dest="faultName", type='string', metavar="<name>",
                         help=GpInjectFaultProgram._buildFaultNameHelp())
        addTo.add_option("-c", "--ddl_statement", dest="ddlStatement", type="string",
                         metavar="ddlStatement",
                         help="The DDL statement on which fault should be set and triggered "
                              "(i.e. create_database, drop_database, create_table, drop_table)")
        addTo.add_option("-D", "--database_name", dest="databaseName", type="string",
                         metavar="databaseName",
                         help="The database name on which fault should be set and triggered.")
        addTo.add_option("-t", "--table_name", dest="tableName", type="string",
                         metavar="tableName",
                         help="The table name on which fault should be set and triggered.")
        addTo.add_option("-o", "--occurrence", dest="numOccurrences", type="int", default=1,
                         metavar="numOccurrences",
                         help="The number of occurrence of the DDL statement with the database name "
                              "and the table name before fault is triggered. Defaults to %default. Max is 1000. "
                              "Fault is triggered always if set to '0'. ")

        parser.set_defaults()
        return parser

    @staticmethod
    def createProgram(options, args):
        """
        Create the program object for the parsed command line.

        @param options the options as returned by the options parser
        @param args the positional arguments left over by the parser
        @return a new GpInjectFaultProgram
        @raise ProgramArgumentValidationException if any positional argument
               was given
        """
        if args:
            raise ProgramArgumentValidationException(
                "too many arguments: only options may be specified")
        return GpInjectFaultProgram(options)