blob: f46e888e07b2b3ee7ce548af570932d41380db48 [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Line too long - pylint: disable=C0301
# Invalid name - pylint: disable=C0103
"""
ComputeCatalogUpdate.py
Used by updateSystemConfig() to compare the db state and
goal state of a gpArray containing the Greenplum segment
configruation details and computes appropriate changes.
"""
import copy
from gppylib.gplog import *
from gppylib.gparray import ROLE_PRIMARY, ROLE_MIRROR, MASTER_CONTENT_ID
from gppylib import gparray
logger = get_default_logger()
class ComputeCatalogUpdate:
"""
Helper class for GpConfigurationProvider.updateSystemConfig().
This computes seven lists of GpDb objects (commonly referenced as 'seg')
from a GpArray, reflecting the logical changes that need to be made
to the database catalog to make it match the system as defined.
The names of the lists are reasonably descriptive:
mirror_to_remove - to be removed (e.g. via gp_remove_segment_mirror())
primary_to_remove - to be removed (e.g. via gp_remove_segment())
primary_to_add - to be added (e.g. via gp_add_segment())
mirror_to_add - to be added (e.g. via gp_add_segment_mirror())
mirror_to_remove_and_add - change or force list requires this mirror
to be removed and then added back
segment_to_update - to be updated (e.g. via SQL)
segment_unchanged - needs no update (included for validation)
"""
def __init__(self, gpArray, forceMap, useUtilityMode, allowPrimary):
"""
This class just holds lists of objects in the underlying gpArray.
As such, it has no methods - the constructor does all the computation.
@param gpArray the array containing the goal and db segment states.
@param forceMap a map of dbid->True for mirrors for which we should force updating via remove/add
@param useUtilityMode True if the operations we're doing are expected to run via utility moed
@param allowPrimary True if caller authorizes add/remove primary operations (e.g. gpexpand)
"""
forceMap = forceMap or {}
self.useUtilityMode = useUtilityMode
self.allowPrimary = allowPrimary
# 'dbsegmap' reflects the current state of the catalog
self.dbsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getSegmentsAsLoadedFromDb()])
# 'goalsegmap' reflects the desired state of the catalog
self.goalsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getDbList(includeExpansionSegs=True)])
# find mirrors and primaries to remove
self.mirror_to_remove = [
seg for seg in self.dbsegmap.values() # segment in database
if seg.isSegmentMirror() # segment is a mirror
and (seg.getSegmentDbId() not in self.goalsegmap) # but not in goal configuration
]
self.debuglog("mirror_to_remove: %s", self.mirror_to_remove)
self.primary_to_remove = [
seg for seg in self.dbsegmap.values() # segment is database
if seg.isSegmentPrimary() # segment is a primary
and (seg.getSegmentDbId() not in self.goalsegmap) # but not in goal configuration
]
self.debuglog("primary_to_remove: %s", self.primary_to_remove)
# find primaries and mirrors to add
self.primary_to_add = [
seg for seg in self.goalsegmap.values() # segment in goal configuration
if seg.isSegmentPrimary() # segment is a primary
and (seg.getSegmentDbId() not in self.dbsegmap) # but not in the database
]
self.debuglog("primary_to_add: %s", self.primary_to_add)
self.mirror_to_add = [
seg for seg in self.goalsegmap.values() # segment in goal configuration
if seg.isSegmentMirror() # segment is a mirror
and (seg.getSegmentDbId() not in self.dbsegmap) # but not in the database
]
self.debuglog("mirror_to_add: %s", self.mirror_to_add)
# find segments to update
initial_segment_to_update = [
seg for seg in self.goalsegmap.values() # segment in goal configuration
if (seg.getSegmentDbId() in self.dbsegmap) # and also in the database
and (seg != self.dbsegmap[ seg.getSegmentDbId() ]) # but some attributes differ
]
self.debuglog("initial_segment_to_update: %s", initial_segment_to_update)
# create a map of the segments which we can't update in the
# ordinary way either because they were on the forceMap or
# they differ in an attribute other than mode, status or replication port
removeandaddmap = {}
for seg in initial_segment_to_update:
dbid = seg.getSegmentDbId()
if dbid in forceMap:
removeandaddmap[dbid] = seg
continue
# In GPSQL, no needs to remove the original segment, updating the hostname and address is enough.
if gpArray.getFaultStrategy() == gparray.FAULT_STRATEGY_NONE:
continue
if not seg.equalIgnoringModeAndStatusAndReplicationPort(self.dbsegmap[dbid]):
removeandaddmap[dbid] = seg
continue
# create list of mirrors to update via remove/add
self.mirror_to_remove_and_add = [seg for seg in removeandaddmap.values()]
self.debuglog("mirror_to_remove_and_add: %s", self.mirror_to_remove_and_add)
# find segments to update in the ordinary way
self.segment_to_update = [
seg for seg in initial_segment_to_update # segments to update
if seg.getSegmentDbId() not in removeandaddmap # that don't require remove/add
]
self.debuglog("segment_to_update: %s", self.segment_to_update)
# find segments that don't need change
self.segment_unchanged = [
seg for seg in self.goalsegmap.values() # segment in goal configuration
if (seg.getSegmentDbId() in self.dbsegmap) # and also in the database
and (seg == self.dbsegmap[ seg.getSegmentDbId() ]) # and attribtutes are all the same
]
self.debuglog("segment_unchanged: %s", self.segment_unchanged)
def final_segments(self):
"""
Generate a series of segments appearing in the final configuration.
"""
for seg in self.primary_to_add:
yield seg
for seg in self.mirror_to_add:
yield seg
for seg in self.mirror_to_remove_and_add:
yield seg
for seg in self.segment_to_update:
yield seg
for seg in self.segment_unchanged:
yield seg
def validate(self):
"""
Check that the operation and new configuration is valid.
"""
# Validate that we're not adding or removing primaries unless authorized
#
if not self.allowPrimary:
if len(self.primary_to_add) > 0:
p = self.primary_to_add[0]
raise Exception("Internal error: Operation may not add primary: %s" % repr(p))
if len(self.primary_to_remove) > 0:
p = self.primary_to_remove[0]
raise Exception("Internal error: Operation may not remove primary: %s" % repr(p))
# Validate that operations do not result in a contentid with a pair of segments in same preferred role
#
final = { ROLE_PRIMARY:{}, ROLE_MIRROR:{} }
for seg in self.final_segments():
subset = final[ seg.getSegmentPreferredRole() ]
other = subset.get( seg.getSegmentContentId() )
if other is not None:
raise Exception("Segments sharing a content id may not have same preferred role: %s and %s" % (repr(seg), repr(other)))
subset[ seg.getSegmentContentId() ] = seg
# Validate that if we have any mirrors, that all primaries have mirrors
# If there is only one standby in mirror, skip this check.
check_mirror_sanity = len(final[ ROLE_MIRROR ]) > 0 and len([ contentId for contentId in final[ ROLE_MIRROR] if contentId != MASTER_CONTENT_ID ]) > 0
if check_mirror_sanity:
for contentId in final[ ROLE_PRIMARY ]:
if contentId != MASTER_CONTENT_ID and final[ ROLE_MIRROR ].get( contentId ) is None:
seg = final[ ROLE_PRIMARY ][ contentId ]
raise Exception("Primary must have mirror when mirroring enabled: %s" % repr(seg))
# Validate that the remove/add list contains only qualified mirrors.
# In particular, we disallow remove/add of the master, standby or a primary.
#
for seg in self.mirror_to_remove_and_add:
originalSeg = self.dbsegmap.get(seg.getSegmentDbId())
# filespace and other core has changed, or it's a mirror and we are recovering full
# (in which case we want to call removeMirror and addMirror so we mark
# the primary as full-resyncing)
#
if seg.isSegmentMaster(current_role=True) or seg.isSegmentStandby(current_role=True):
#
# Assertion here -- user should not be allowed to change master/standby info.
#
raise Exception("Internal error: Can only change core details of segments, not masters" \
" (on segment %s) (seg %s vs original %s)" %
(seg.getSegmentDbId(), repr(seg), repr(originalSeg)))
if not seg.isSegmentMirror(current_role=True):
#
# Assertion here -- user should not be allowed to change primary info.
#
return
raise Exception("Internal error: Can only change core details of mirrors, not primaries" \
" (on segment %s) (seg %s vs original %s)" %
(seg.getSegmentDbId(), repr(seg), repr(originalSeg)))
if self.useUtilityMode:
raise Exception("Cannot change core details of mirrors in utility mode")
def debuglog(self, msg, seglist):
"""
Write debugging details about the specified segment list.
"""
logger.debug(msg % ("%s segments" % len(seglist)))
for seg in seglist:
logger.debug(msg % repr(seg))
# minimal test framework when run from command line
#
if __name__ == '__main__':
ROLE_PRIMARY = 'p'
ROLE_MIRROR = 'm'
MODE_NOT_INITIALIZED = ''
MODE_CHANGELOGGING = 'c'
MODE_SYNCHRONIZED = 's'
MODE_RESYNCHRONIZATION = 'r'
class GpDb:
def __init__(self,dbid,content,pref,mode='',curr=None,status='u',rport=0,misc=None):
self.dbid = dbid
self.content = content
self.preferred_role = pref
self.mode = mode
self.role = curr or pref
self.status = status
self.rport = rport
self.misc = misc
def getSegmentDbId(self): return self.dbid
def getSegmentContentId(self): return self.content
def getSegmentPreferredRole(self): return self.preferred_role
def getSegmentMode(self): return self.mode
def getSegmentRole(self): return self.role
def getSegmentStatus(self): return self.status
def getSegmentReplicationPort(self): return self.rport
def setSegmentMode(self,mode): self.mode = mode
def setSegmentStatus(self,status): self.status = status
def setSegmentReplicationPort(self,rport): self.rport = rport
def isSegmentPrimary(self, current_role=False):
role = self.role if current_role else self.preferred_role
return self.content >= 0 and role == ROLE_PRIMARY
def isSegmentMirror(self, current_role=False):
role = self.role if current_role else self.preferred_role
return self.content >= 0 and role == ROLE_MIRROR
def isSegmentMaster(self, current_role=False):
role = self.role if current_role else self.preferred_role
return self.content < 0 and role == ROLE_PRIMARY
def isSegmentStandby(self, current_role=False):
role = self.role if current_role else self.preferred_role
return self.content < 0 and role == ROLE_MIRROR
def __cmp__(self,other): return cmp(repr(self), repr(other))
def __repr__(s):
return '(%s,%s,%s,%s,%s,%s,%s,%s)' % (s.dbid, s.content, s.preferred_role, s.mode, s.role, s.status, s.rport, s.misc)
def equalIgnoringModeAndStatusAndReplicationPort(self, other):
tmp = copy.copy(self)
tmp.setSegmentMode( other.getSegmentMode() )
tmp.setSegmentStatus( other.getSegmentStatus() )
tmp.setSegmentReplicationPort( other.getSegmentReplicationPort() )
return tmp == other
class xxx:
def xxx():
print dbsegmap
print goalsegmap
print 'db not goal', [seg for seg in dbsegmap.values() if seg.getSegmentDbId() not in goalsegmap]
print 'goal not db', [seg for seg in goalsegmap.values() if seg.getSegmentDbId() not in dbsegmap]
class GpArray:
def __init__(s, forceMap=None, useUtilityMode=False, allowPrimary=True):
s.c = ComputeCatalogUpdate(s,forceMap,useUtilityMode,allowPrimary)
s.dump()
def dump(s):
print s.__class__.__name__, s.__class__.__doc__
s.c.validate()
print " -m", s.c.mirror_to_remove,
print " -p", s.c.primary_to_remove,
print " +p", s.c.primary_to_add,
print " +m", s.c.mirror_to_add,
print " +/-m", s.c.mirror_to_remove_and_add,
print " u", s.c.segment_to_update,
print " n", s.c.segment_unchanged
def __repr__(s):
return '<%s,%s>' % (s.getDbList(), s.getSegmentsAsLoadedFromDb())
class GpArrayBad(GpArray):
def __init__(s, forceMap=None, useUtilityMode=False, allowPrimary=True):
try:
GpArray.__init__(s,forceMap,useUtilityMode,allowPrimary)
print " ERROR: expected exception"
except Exception, e:
print " EXPECTED: ", str(e)
class GpArray1(GpArray):
"expect no change"
def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
GpArray1()
class GpArray1a(GpArray):
"expect update of mirror"
def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m','a')]
def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
GpArray1a()
class GpArray2(GpArray):
"expect add mirror"
def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p','a'), GpDb(2,1,'m','a')]
def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p')]
GpArray2()
class GpArray3(GpArray):
"expect remove mirror"
def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p')]
def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m','a')]
GpArray3()
class GpArray4(GpArray):
"expect add primary and mirror"
def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m'), GpDb(3,2,'p'), GpDb(4,2,'m')]
def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
GpArray4()
class GpArray5(GpArray):
"expect remove primary/mirror and add/primary mirror"
def getDbList(self,includeExpansionSegs): return [GpDb(3,2,'p'), GpDb(4,2,'m')]
def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m','a')]
GpArray5()
class GpArray6(GpArray):
"expect update via add/remove"
def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m',misc='x')]
GpArray6()
class GpArray7(GpArrayBad):
"can't rely on remove/add for primary"
def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p',misc='x'), GpDb(2,1,'m'), GpDb(3,2,'p'), GpDb(4,2,'m')]
def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
GpArray7()
class GpArray8(GpArrayBad):
"can't rely on remove/add for master"
def getDbList(self,includeExpansionSegs): return [GpDb(0,-1,'p',misc="x"), GpDb(1,1,'p'), GpDb(2,1,'m')]
def getSegmentsAsLoadedFromDb(self): return [GpDb(0,-1,'p'), GpDb(1,1,'p'), GpDb(2,1,'m')]
GpArray8()
class GpArray9(GpArrayBad):
"can't rely on remove/add for master"
def __init__(s): GpArrayBad.__init__(s,[0])
def getDbList(self,includeExpansionSegs): return [GpDb(0,-1,'p',rport=2), GpDb(1,1,'p'), GpDb(2,1,'m')]
def getSegmentsAsLoadedFromDb(self): return [GpDb(0,-1,'p'), GpDb(1,1,'p'), GpDb(2,1,'m')]
GpArray9()
class GpArray10(GpArray):
"expect update"
def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m',rport=2)]
def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
GpArray10()
class GpArray11(GpArray):
"expect update via add/remove"
def __init__(s): GpArray.__init__(s,[2])
def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m',rport=2)]
def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
GpArray11()
class GpArray12(GpArrayBad):
"can't add primary"
def __init__(s): GpArrayBad.__init__(s,allowPrimary=False)
def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m'), GpDb(3,2,'p'), GpDb(4,2,'m')]
def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
GpArray12()
class GpArray13(GpArrayBad):
"can't remove primary"
def __init__(s): GpArrayBad.__init__(s,allowPrimary=False)
def getDbList(self,includeExpansionSegs): return [GpDb(2,1,'m')]
def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
GpArray13()
class GpArray14(GpArrayBad):
"can't have pair in same preferred role"
def __init__(s): GpArrayBad.__init__(s)
def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'p')]
def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
GpArray14()
class GpArray15(GpArrayBad):
"can't have pair in same preferred role"
def __init__(s): GpArrayBad.__init__(s)
def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'m'), GpDb(2,1,'m')]
def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
GpArray15()
class GpArray16(GpArrayBad):
"all primaries must have mirrors when mirroring"
def __init__(s): GpArrayBad.__init__(s)
def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m'), GpDb(3,2,'p')]
def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
GpArray16()