blob: 5a19fa9a0a5cfebc3b71c9a4a5f6570b713a5db9 [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This file defines the interface that can be used to fetch and update system
configuration information.
"""
import os, copy
from gppylib.gplog import *
from gppylib.utils import checkNotNone
from gppylib.system.configurationInterface import *
from gppylib.system.ComputeCatalogUpdate import ComputeCatalogUpdate
from gppylib.gparray import GpArray, GpDB, InvalidSegmentConfiguration
from gppylib import gparray
from gppylib.db import dbconn
from gppylib.commands.gp import get_local_db_mode
logger = get_default_logger()
class GpConfigurationProviderUsingGpdbCatalog(GpConfigurationProvider) :
"""
An implementation of GpConfigurationProvider will provide functionality to
fetch and update gpdb system configuration information (as stored in the
database)
Note that the client of this is assuming that the database data is not
changed by another party between the time segment data is loaded and when it
is updated
"""
def __init__(self):
self.__masterDbUrl = None
def initializeProvider( self, masterPort ) :
"""
Initialize the provider to get information from the given master db, if
it chooses to get its data from the database
returns self
"""
checkNotNone("masterPort", masterPort)
dbUrl = dbconn.DbURL(port=masterPort, dbname='template1')
self.__masterDbUrl = dbUrl
return self
def loadSystemConfig( self, useUtilityMode ) :
"""
Load all segment information from the configuration source.
Returns a new GpArray object
"""
# ensure initializeProvider() was called
checkNotNone("masterDbUrl", self.__masterDbUrl)
logger.info("Obtaining Segment details from master...")
array = GpArray.initFromCatalog(self.__masterDbUrl, useUtilityMode)
if get_local_db_mode(array.master.getSegmentDataDirectory()) != 'UTILITY':
logger.debug("Validating configuration...")
if not array.is_array_valid():
raise InvalidSegmentConfiguration(array)
return array
def sendPgElogFromMaster( self, msg, sendAlerts):
"""
Send a message from the master database using select pg_elog ...
"""
# ensure initializeProvider() was called
checkNotNone("masterDbUrl", self.__masterDbUrl)
conn = None
try:
conn = dbconn.connect(self.__masterDbUrl, utility=True)
dbconn.execSQL(conn, "SELECT GP_ELOG(" +
self.__toSqlCharValue(msg) + "," +
("true" if sendAlerts else "false") + ")")
finally:
if conn:
conn.close()
def updateSystemConfig( self, gpArray, textForConfigTable, dbIdToForceMirrorRemoveAdd, useUtilityMode, allowPrimary) :
"""
Update the configuration for the given segments in the underlying
configuration store to match the current values
Also resets any dirty bits on saved/updated objects
@param textForConfigTable label to be used when adding to segment configuration history
@param dbIdToForceMirrorRemoveAdd a map of dbid -> True for mirrors for which we should force updating the mirror
@param useUtilityMode True if the operations we're doing are expected to run via utility moed
@param allowPrimary True if caller authorizes add/remove primary operations (e.g. gpexpand)
"""
# ensure initializeProvider() was called
checkNotNone("masterDbUrl", self.__masterDbUrl)
logger.debug("Validating configuration changes...")
if not gpArray.is_array_valid():
logger.critical("Configuration is invalid")
raise InvalidSegmentConfiguration(gpArray)
conn = dbconn.connect(self.__masterDbUrl, useUtilityMode, allowSystemTableMods='dml')
dbconn.execSQL(conn, "BEGIN")
# compute what needs to be updated
update = ComputeCatalogUpdate(gpArray, dbIdToForceMirrorRemoveAdd, useUtilityMode, allowPrimary)
update.validate()
# put the mirrors in a map by content id so we can update them later
mirror_map = {}
for seg in update.mirror_to_add:
mirror_map[ seg.getSegmentContentId() ] = seg
# reset dbId of new primary and mirror segments to -1
# before invoking the operations which will assign them new ids
for seg in update.primary_to_add:
seg.setSegmentDbId(-1)
for seg in update.mirror_to_add:
seg.setSegmentDbId(-1)
# remove mirror segments (e.g. for gpexpand rollback)
for seg in update.mirror_to_remove:
self.__updateSystemConfigRemoveMirror(conn, seg, textForConfigTable)
# remove primary segments (e.g for gpexpand rollback)
for seg in update.primary_to_remove:
self.__updateSystemConfigRemovePrimary(conn, seg, textForConfigTable)
# add new primary segments
for seg in update.primary_to_add:
self.__updateSystemConfigAddPrimary(conn, gpArray, seg, textForConfigTable, mirror_map)
# add new mirror segments
for seg in update.mirror_to_add:
self.__updateSystemConfigAddMirror(conn, gpArray, seg, textForConfigTable)
# remove and add mirror segments necessitated by catalog attribute update
for seg in update.mirror_to_remove_and_add:
self.__updateSystemConfigRemoveAddMirror(conn, gpArray, seg, textForConfigTable)
# apply updates to existing segments
for seg in update.segment_to_update:
originalSeg = update.dbsegmap.get(seg.getSegmentDbId())
self.__updateSystemConfigUpdateSegment(conn, gpArray, seg, originalSeg, textForConfigTable)
# apply update to fault strategy
if gpArray.getStrategyAsLoadedFromDb() != gpArray.getFaultStrategy():
self.__updateSystemConfigFaultStrategy(conn, gpArray)
# commit changes
logger.debug("Committing configuration table changes")
dbconn.execSQL(conn, "COMMIT")
conn.close()
gpArray.setStrategyAsLoadedFromDb( [gpArray.getFaultStrategy()])
gpArray.setSegmentsAsLoadedFromDb([seg.copy() for seg in gpArray.getDbList()])
def __updateSystemConfigRemoveMirror(self, conn, seg, textForConfigTable):
"""
Remove a mirror segment currently in gp_segment_configuration
but not present in the goal configuration and record our action
in gp_configuration_history.
"""
dbId = seg.getSegmentDbId()
self.__callSegmentRemoveMirror(conn, seg)
self.__insertConfigHistory(conn, dbId, "%s: removed mirror segment configuration" % textForConfigTable)
def __updateSystemConfigRemovePrimary(self, conn, seg, textForConfigTable):
"""
Remove a primary segment currently in gp_segment_configuration
but not present in the goal configuration and record our action
in gp_configuration_history.
"""
dbId = seg.getSegmentDbId()
self.__callSegmentRemove(conn, seg)
self.__insertConfigHistory(conn, dbId, "%s: removed primary segment configuration" % textForConfigTable)
def __updateSystemConfigAddPrimary(self, conn, gpArray, seg, textForConfigTable, mirror_map):
"""
Add a primary segment specified in our goal configuration but
which is missing from the current gp_segment_configuration table
and record our action in gp_configuration_history.
"""
# lookup the mirror (if any) so that we may correct its content id
mirrorseg = mirror_map.get( seg.getSegmentContentId() )
# add the new segment
dbId = self.__callSegmentAdd(conn, gpArray, seg)
# update the segment mode, status and replication port
self.__updateSegmentModeStatus(conn, seg)
if gpArray.get_mirroring_enabled() == True:
self.__updateSegmentReplicationPort(conn, seg)
# get the newly added segment's content id
# MPP-12393 et al WARNING: there is an unusual side effect going on here.
# Although gp_add_segment() executed by __callSegmentAdd() above returns
# the dbId of the new row in gp_segment_configuration, the following
# select from gp_segment_configuration can return 0 rows if the updates
# done by __updateSegmentModeStatus() and/or __updateSegmentReplicationPort()
# are not done first. Don't change the order of these operations unless you
# understand why gp_add_segment() behaves as it does.
sql = "select content from pg_catalog.gp_segment_configuration where dbId = %s" % self.__toSqlIntValue(seg.getSegmentDbId())
logger.debug(sql)
sqlResult = self.__fetchSingleOutputRow(conn, sql)
contentId = int(sqlResult[0])
# Set the new content id for the primary as well the mirror if present.
seg.setSegmentContentId(contentId)
if mirrorseg is not None:
mirrorseg.setSegmentContentId(contentId)
self.__insertConfigHistory(conn, dbId, "%s: inserted primary segment configuration with contentid %s" % (textForConfigTable, contentId))
def __updateSystemConfigAddMirror(self, conn, gpArray, seg, textForConfigTable):
"""
Add a mirror segment specified in our goal configuration but
which is missing from the current gp_segment_configuration table
and record our action in gp_configuration_history.
"""
dbId = self.__callSegmentAddMirror(conn, gpArray, seg)
self.__updateSegmentModeStatus(conn, seg)
self.__insertConfigHistory(conn, dbId, "%s: inserted mirror segment configuration" % textForConfigTable)
def __updateSystemConfigRemoveAddMirror(self, conn, gpArray, seg, textForConfigTable):
"""
We've been asked to update the mirror in a manner that require
it to be removed and then re-added. Perform the tasks
and record our action in gp_configuration_history.
"""
origDbId = seg.getSegmentDbId()
self.__callSegmentRemoveMirror(conn, seg)
dbId = self.__callSegmentAddMirror(conn, gpArray, seg)
# now update mode/status since this is not done by gp_add_segment_mirror
self.__updateSegmentModeStatus(conn, seg)
self.__insertConfigHistory(conn, seg.getSegmentDbId(),
"%s: inserted segment configuration for full recovery or original dbid %s" \
% (textForConfigTable, origDbId))
def __updateSystemConfigUpdateSegment(self, conn, gpArray, seg, originalSeg, textForConfigTable):
# update mode and status
# when adding a mirror, the replication port may change as well
#
if gpArray.getFaultStrategy() == gparray.FAULT_STRATEGY_NONE:
what = "%s: segment hostname and address"
self.__updateSegmentAddress(conn, seg)
else:
what = "%s: segment mode and status"
self.__updateSegmentModeStatus(conn, seg)
if seg.getSegmentReplicationPort() != originalSeg.getSegmentReplicationPort():
what = "%s: segment mode, status, and replication port"
self.__updateSegmentReplicationPort(conn, seg)
self.__insertConfigHistory(conn, seg.getSegmentDbId(), what % textForConfigTable)
def __updateSystemConfigFaultStrategy(self, conn, gpArray):
"""
Update the fault strategy.
"""
fs = gpArray.getFaultStrategy()
sql = "UPDATE gp_fault_strategy\n SET fault_strategy = " + self.__toSqlCharValue(fs) + "\n"
logger.debug(sql)
dbconn.executeUpdateOrInsert(conn, sql, 1)
def __callSegmentRemoveMirror(self, conn, seg):
"""
Call gp_remove_segment_mirror() to remove the mirror.
"""
sql = "SELECT gp_remove_segment_mirror(%s::int2)" % (self.__toSqlIntValue(seg.getSegmentContentId()))
logger.debug(sql)
result = self.__fetchSingleOutputRow(conn, sql)
assert result[0] # must return True
def __callSegmentRemove(self, conn, seg):
"""
Call gp_remove_segment() to remove the primary.
"""
sql = "SELECT gp_remove_segment(%s::int2)" % (self.__toSqlIntValue(seg.getSegmentDbId()))
logger.debug(sql)
result = self.__fetchSingleOutputRow(conn, sql)
assert result[0]
def __callSegmentAdd(self, conn, gpArray, seg):
"""
Call gp_add_segment() to add the primary.
Return the new segment's dbid.
"""
logger.debug('callSegmentAdd %s' % repr(seg))
filespaceMapStr = self.__toSqlFilespaceMapStr(gpArray, seg)
sql = "SELECT gp_add_segment(%s, %s, %s, %s)" \
% (
self.__toSqlTextValue(seg.getSegmentHostName()),
self.__toSqlTextValue(seg.getSegmentAddress()),
self.__toSqlIntValue(seg.getSegmentPort()),
self.__toSqlTextValue(filespaceMapStr)
)
logger.debug(sql)
sqlResult = self.__fetchSingleOutputRow(conn, sql)
dbId = int(sqlResult[0])
seg.setSegmentDbId(dbId)
return dbId
def __callSegmentAddMirror(self, conn, gpArray, seg):
"""
Call gp_add_segment_mirror() to add the mirror.
Return the new segment's dbid.
"""
logger.debug('callSegmentAddMirror %s' % repr(seg))
filespaceMapStr = self.__toSqlFilespaceMapStr(gpArray, seg)
sql = "SELECT gp_add_segment_mirror(%s::int2, %s, %s, %s, %s, %s)" \
% (
self.__toSqlIntValue(seg.getSegmentContentId()),
self.__toSqlTextValue(seg.getSegmentHostName()),
self.__toSqlTextValue(seg.getSegmentAddress()),
self.__toSqlIntValue(seg.getSegmentPort()),
self.__toSqlIntValue(seg.getSegmentReplicationPort()),
self.__toSqlTextValue(filespaceMapStr)
)
logger.debug(sql)
sqlResult = self.__fetchSingleOutputRow(conn, sql)
dbId = int(sqlResult[0])
seg.setSegmentDbId(dbId)
return dbId
def __updateSegmentReplicationPort(self, conn, seg):
# run an update
sql = "UPDATE pg_catalog.gp_segment_configuration\n" + \
" SET\n" + \
" replication_port = " + self.__toSqlIntValue(seg.getSegmentReplicationPort()) + "\n" \
"WHERE dbid = " + self.__toSqlIntValue(seg.getSegmentDbId())
logger.debug(sql)
dbconn.executeUpdateOrInsert(conn, sql, 1)
def __updateSegmentModeStatus(self, conn, seg):
# run an update
sql = "UPDATE pg_catalog.gp_segment_configuration\n" + \
" SET\n" + \
" mode = " + self.__toSqlCharValue(seg.getSegmentMode()) + ",\n" \
" status = " + self.__toSqlCharValue(seg.getSegmentStatus()) + "\n" \
"WHERE dbid = " + self.__toSqlIntValue(seg.getSegmentDbId())
logger.debug(sql)
dbconn.executeUpdateOrInsert(conn, sql, 1)
def __updateSegmentAddress(self, conn, seg):
# run an update
sql = "UPDATE pg_catalog.gp_segment_configuration\n" + \
" SET\n" + \
" hostname = " + self.__toSqlCharValue(seg.getSegmentHostName()) + ",\n" \
" address = " + self.__toSqlCharValue(seg.getSegmentAddress()) + "\n" \
"WHERE dbid = " + self.__toSqlIntValue(seg.getSegmentDbId())
logger.debug(sql)
dbconn.executeUpdateOrInsert(conn, sql, 1)
def __fetchSingleOutputRow(self, conn, sql, retry=False):
"""
Execute specified SQL command and return what we expect to be a single row.
Raise an exception when more or fewer than one row is seen and when more
than one row is seen display up to 10 rows as logger warnings.
"""
cursor = dbconn.execSQL(conn, sql)
numrows = cursor.rowcount
numshown = 0
res = None
for row in cursor:
if numrows != 1:
#
# if we got back more than one row
# we print a few of the rows first
# instead of immediately raising an exception
#
numshown += 1
if numshown > 10:
break
logger.warning('>>> %s' % row)
else:
assert res is None
res = row
assert res is not None
cursor.close()
if numrows != 1:
raise Exception("SQL returned %d rows, not 1 as expected:\n%s" % (numrows, sql))
return res
def __insertConfigHistory(self, conn, dbId, msg ):
# now update change history
sql = "INSERT INTO gp_configuration_history (time, dbid, \"desc\") VALUES(\n" \
"now(),\n " + \
self.__toSqlIntValue(dbId) + ",\n " + \
self.__toSqlCharValue(msg) + "\n)"
logger.debug(sql)
dbconn.executeUpdateOrInsert(conn, sql, 1)
def __toSqlFilespaceMapStr(self, gpArray, seg):
"""
Return a string representation of the filespace map suitable
for inclusion into the call to gp_add_segment_mirror().
"""
filespaceArrayString = []
for fs in gpArray.getAllFilespaces():
path = seg.getSegmentFilespaces()[ fs.getOid() ]
filespaceArrayString.append("{%s,%s}" % \
(self.__toSqlArrayStringValue(fs.getName()), \
self.__toSqlArrayStringValue(path)))
filespaceMapStr = "{" + ",".join(filespaceArrayString) + "}"
return filespaceMapStr
def __toSqlIntValue(self, val):
if val is None:
return "null"
return str(val)
def __toSqlArrayStringValue(self, val):
if val is None:
return "null"
return '"' + val.replace('"','\\"').replace('\\','\\\\') + '"'
def __toSqlCharValue(self, val):
return self.__toSqlTextValue(val)
def __toSqlTextValue(self, val):
if val is None:
return "null"
return "'" + val.replace("'","''").replace('\\','\\\\') + "'"