blob: 03918818b705134ebbbd699dafa15bc3f52dde21 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*-------------------------------------------------------------------------
*
* cdbutil.c
* Internal utility support functions for Greenplum Database/PostgreSQL.
*
* NOTES
*
* - According to src/backend/executor/execHeapScan.c
* "tuples returned by heap_getnext() are pointers onto disk
* pages and were not created with palloc() and so should not
* be pfree()'d"
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
#include "funcapi.h"
#include "catalog/catquery.h"
#include "catalog/pg_type.h"
#include "nodes/makefuncs.h"
#include "parser/parse_oper.h"
#include "parser/parse_type.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "catalog/gp_configuration.h"
#include "catalog/gp_segment_config.h"
#include "cdb/cdbcat.h"
#include "cdb/cdbutil.h"
#include "cdb/cdblink.h"
#include "nodes/execnodes.h" /* Slice, SliceTable */
#include "cdb/cdbmotion.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbdisp.h"
#include "cdb/ml_ipc.h" /* listener_setup */
#include "cdb/cdbfts.h"
#include "libpq/ip.h"
#include "postmaster/checkpoint.h"
#include "catalog/indexing.h"
#include "utils/faultinjection.h"
#include "cdb/executormgr.h"
/*
* Helper Functions
*/
//static int CdbComponentDatabaseInfoCompare(const void *p1, const void *p2);
//static void getAddressesForDBid(CdbComponentDatabaseInfo *c, int elevel);
static HTAB *segment_ip_cache_htab = NULL;
static char *getDnsCachedAddress(char *name, int port, int elevel);
struct segment_ip_cache_entry {
char key[NAMEDATALEN];
char hostinfo[NI_MAXHOST];
};
typedef struct SegmentConfigurationIterator {
Relation gp_seg_config_rel;
HeapScanDesc gp_seg_config_scan;
} SegmentConfigurationIterator;
static Segment *
ParseSegmentConfigurationRow(SegmentConfigurationIterator *iterator, HeapTuple row)
{
Datum attr;
bool isNull;
Segment *segment = NULL;
segment = palloc0(sizeof(*segment));
attr = heap_getattr(row, Anum_gp_segment_configuration_hostname, RelationGetDescr(iterator->gp_seg_config_rel), &isNull);
segment->hostname = TextDatumGetCString(attr);
attr = heap_getattr(row, Anum_gp_segment_configuration_port, RelationGetDescr(iterator->gp_seg_config_rel), &isNull);
segment->port = DatumGetInt32(attr);
/* use role to determine */
attr = heap_getattr(row, Anum_gp_segment_configuration_role, RelationGetDescr(iterator->gp_seg_config_rel), &isNull);
char role = DatumGetChar(attr);
if (role == SEGMENT_ROLE_MASTER_CONFIG)
segment->master = true;
else if (role == SEGMENT_ROLE_STANDBY_CONFIG)
segment->standby = true;
/* Should remove the following code iff we have DNS module and in-memory FTS. */
attr = heap_getattr(row, Anum_gp_segment_configuration_status, RelationGetDescr(iterator->gp_seg_config_rel), &isNull);
segment->alive = (DatumGetChar(attr) == 'u') ? true : false;
attr = heap_getattr(row, Anum_gp_segment_configuration_address, RelationGetDescr(iterator->gp_seg_config_rel), &isNull);
char *address = TextDatumGetCString(attr);
segment->hostip = getDnsCachedAddress(address, segment->port, DEBUG1);
pfree(address);
return segment;
}
static void
InitIterateSegmentConfiguration(SegmentConfigurationIterator *iterator)
{
iterator->gp_seg_config_rel = heap_open(GpSegmentConfigRelationId, AccessShareLock);
iterator->gp_seg_config_scan = heap_beginscan(iterator->gp_seg_config_rel, SnapshotNow, 0, NULL);
}
static Segment *
IterateSegmentConfiguration(SegmentConfigurationIterator *iterator)
{
HeapTuple gp_seg_config_tuple;
while (HeapTupleIsValid(gp_seg_config_tuple = heap_getnext(iterator->gp_seg_config_scan, ForwardScanDirection)))
{
return ParseSegmentConfigurationRow(iterator, gp_seg_config_tuple);
}
/* No more data. */
heap_endscan(iterator->gp_seg_config_scan);
heap_close(iterator->gp_seg_config_rel, AccessShareLock);
return NULL;
}
List *
GetVirtualSegmentList(void)
{
#if 0
List *real_segments = GetSegmentList();
int real_segment_num = list_length(real_segments);
Segment *dest;
Segment *src;
src = linitial(real_segments);
dest = CopySegment(src);
dest->id = real_segment_num;
dest->dbid = real_segment_num + 1;
return lappend(real_segments, dest);
#else
return GetSegmentList();
#endif
}
Segment *
CopySegment(Segment *src, MemoryContext cxt)
{
MemoryContext valid_cxt = cxt ? cxt : CurrentMemoryContext;
Segment *dest = MemoryContextAllocZero(valid_cxt, sizeof(Segment));
memcpy(dest, src, sizeof(Segment));
/* memory leak risk! */
dest->hostname = MemoryContextStrdup(valid_cxt, src->hostname);
dest->hostip = MemoryContextStrdup(valid_cxt, src->hostip);
return dest;
}
void
FreeSegment(Segment *segment)
{
if (segment == NULL)
return;
if (segment->hostname)
{
pfree(segment->hostname);
}
pfree(segment);
}
Segment *
GetMasterSegment(void)
{
SegmentConfigurationIterator iterator;
Segment *current = NULL;
Segment *master = NULL;
InitIterateSegmentConfiguration(&iterator);
while ((current = IterateSegmentConfiguration(&iterator)))
{
if (!current->master)
{
pfree(current->hostname);
pfree(current);
}
else
{
master = current;
}
}
return master;
}
Segment *
GetStandbySegment(void)
{
SegmentConfigurationIterator iterator;
Segment *current = NULL;
Segment *standby = NULL;
InitIterateSegmentConfiguration(&iterator);
while ((current = IterateSegmentConfiguration(&iterator)))
{
if (!current->standby)
{
pfree(current->hostname);
pfree(current);
}
else
{
standby = current;
}
}
return standby;
}
List *
GetSegmentList(void)
{
List *segments = NIL;
SegmentConfigurationIterator iterator;
Segment *current = NULL;
InitIterateSegmentConfiguration(&iterator);
while ((current = IterateSegmentConfiguration(&iterator)))
{
if (current->master || current->standby)
continue;
segments = lappend(segments, current);
}
return segments;
}
/*
* getCdbComponentDatabases
*
*
* Storage for the SegmentInstances block and all subsidiary
* structures are allocated from the caller's context.
*/
CdbComponentDatabases *
getCdbComponentInfo(bool DNSLookupAsError)
{
#if 0
CdbComponentDatabaseInfo *pOld = NULL;
CdbComponentDatabases *component_databases = NULL;
Relation gp_seg_config_rel;
HeapTuple gp_seg_config_tuple = NULL;
HeapScanDesc gp_seg_config_scan;
/*
* Initial size for info arrays.
*/
int segment_array_size = 500;
int entry_array_size = 4; /* we currently support a max of 2 */
/*
* isNull and attr are used when getting the data for a specific column from a HeapTuple
*/
bool isNull;
Datum attr;
/*
* Local variables for fields from the rows of the tables that we are reading.
*/
char role;
char status = 0;
int i;
/*
* Allocate component_databases return structure and
* component_databases->segment_db_info array with an initial size
* of 128, and component_databases->entry_db_info with an initial
* size of 4. If necessary during row fetching, we grow these by
* doubling each time we run out.
*/
component_databases = palloc0(sizeof(CdbComponentDatabases));
component_databases->segment_db_info =
(CdbComponentDatabaseInfo *) palloc0(sizeof(CdbComponentDatabaseInfo) * segment_array_size);
component_databases->entry_db_info =
(CdbComponentDatabaseInfo *) palloc0(sizeof(CdbComponentDatabaseInfo) * entry_array_size);
gp_seg_config_rel = heap_open(GpSegmentConfigRelationId, AccessShareLock);
gp_seg_config_scan = heap_beginscan(gp_seg_config_rel, SnapshotNow, 0, NULL);
while (HeapTupleIsValid(gp_seg_config_tuple = heap_getnext(gp_seg_config_scan, ForwardScanDirection)))
{
/*
* Grab the fields that we need from gp_configuration. We do
* this first, because until we read them, we don't know
* whether this is an entry database row or a segment database
* row.
*/
CdbComponentDatabaseInfo *pRow;
/*
* dbid
*/
//attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_dbid, RelationGetDescr(gp_seg_config_rel), &isNull);
//Assert(!isNull);
//dbid = DatumGetInt16(attr);
/*
* content
*/
//attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_content, RelationGetDescr(gp_seg_config_rel), &isNull);
//Assert(!isNull);
//content = DatumGetInt16(attr);
/*
* role
*/
attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_role, RelationGetDescr(gp_seg_config_rel), &isNull);
Assert(!isNull);
role = DatumGetChar(attr);
/*
* preferred-role
*/
//attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_preferred_role, RelationGetDescr(gp_seg_config_rel), &isNull);
//Assert(!isNull);
//preferred_role = DatumGetChar(attr);
/*
* mode
*/
//attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_mode, RelationGetDescr(gp_seg_config_rel), &isNull);
//Assert(!isNull);
//mode = DatumGetChar(attr);
/*
* status
*/
attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_status, RelationGetDescr(gp_seg_config_rel), &isNull);
Assert(!isNull);
status = DatumGetChar(attr);
/*
* Determine which array to place this rows data in: entry or
* segment, based on the content field.
*/
if (role == SEGMENT_ROLE_PRIMARY)
{
/* if we have a dbid bigger than our array we'll have to grow the array. (MPP-2104) */
if (dbid >= segment_array_size || component_databases->total_segment_dbs >= segment_array_size)
{
/*
* Expand CdbComponentDatabaseInfo array if we've used up currently allocated space
*/
segment_array_size = Max((segment_array_size * 2), dbid * 2);
pOld = component_databases->segment_db_info;
component_databases->segment_db_info = (CdbComponentDatabaseInfo *)
repalloc(pOld, sizeof(CdbComponentDatabaseInfo) * segment_array_size);
}
pRow = &component_databases->segment_db_info[component_databases->total_segment_dbs];
component_databases->total_segment_dbs++;
}
else
{
if (component_databases->total_entry_dbs >= entry_array_size)
{
/*
* Expand CdbComponentDatabaseInfo array if we've used up currently allocated space
*/
entry_array_size *= 2;
pOld = component_databases->entry_db_info;
component_databases->entry_db_info = (CdbComponentDatabaseInfo *)
repalloc(pOld, sizeof(CdbComponentDatabaseInfo) * entry_array_size);
}
pRow = &component_databases->entry_db_info[component_databases->total_entry_dbs];
component_databases->total_entry_dbs++;
}
pRow->role = role;
pRow->status = status;
/*
* hostname
*/
attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_hostname, RelationGetDescr(gp_seg_config_rel), &isNull);
Assert(!isNull);
pRow->hostname = TextDatumGetCString(attr);
/*
* address
*/
attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_address, RelationGetDescr(gp_seg_config_rel), &isNull);
Assert(!isNull);
pRow->address = TextDatumGetCString(attr);
/*
* port
*/
attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_port, RelationGetDescr(gp_seg_config_rel), &isNull);
Assert(!isNull);
pRow->port = DatumGetInt32(attr);
getAddressesForDBid(pRow, DNSLookupAsError ? ERROR : LOG);
pRow->hostip = pRow->hostaddrs[0];
}
/*
* We're done with the catalog entries, cleanup them up, closing
* all the relations we opened.
*/
heap_endscan(gp_seg_config_scan);
heap_close(gp_seg_config_rel, AccessShareLock);
/*
* Validate that there exists at least one entry and one segment
* database in the configuration
*/
if (component_databases->total_segment_dbs == 0)
{
ereport(ERROR,
(errcode(ERRCODE_CARDINALITY_VIOLATION),
errmsg("Greenplum Database number of segment databases cannot be 0")));
}
if (component_databases->total_entry_dbs == 0)
{
ereport(ERROR,
(errcode(ERRCODE_CARDINALITY_VIOLATION),
errmsg("Greenplum Database number of entry databases cannot be 0")));
}
/*
* Now sort the data by segindex, isprimary desc
*/
qsort(component_databases->segment_db_info,
component_databases->total_segment_dbs, sizeof(CdbComponentDatabaseInfo),
CdbComponentDatabaseInfoCompare);
qsort(component_databases->entry_db_info,
component_databases->total_entry_dbs, sizeof(CdbComponentDatabaseInfo),
CdbComponentDatabaseInfoCompare);
/*
* Now count the number of distinct segindexes.
* Since it's sorted, this is easy.
*/
for (i = 0; i < component_databases->total_segment_dbs; i++)
{
if (i == 0 ||
(component_databases->segment_db_info[i].segindex != component_databases->segment_db_info[i - 1].segindex))
{
component_databases->total_segments++;
}
}
return component_databases;
#endif
return NULL;
}
/*
* getCdbComponentDatabases
*
*
* Storage for the SegmentInstances block and all subsidiary
* structures are allocated from the caller's context.
*/
CdbComponentDatabases *
getCdbComponentDatabases(void)
{
return getCdbComponentInfo(true);
}
/*
* freeCdbComponentDatabases
*
* Releases the storage occupied by the CdbComponentDatabases
* struct pointed to by the argument.
*/
void
freeCdbComponentDatabases(CdbComponentDatabases *pDBs)
{
int i;
if (pDBs == NULL)
return;
if (pDBs->segment_db_info != NULL)
{
for (i = 0; i < pDBs->total_segment_dbs; i++)
{
CdbComponentDatabaseInfo *cdi = &pDBs->segment_db_info[i];
freeCdbComponentDatabaseInfo(cdi);
}
pfree(pDBs->segment_db_info);
}
if (pDBs->entry_db_info != NULL)
{
for (i = 0; i < pDBs->total_entry_dbs; i++)
{
CdbComponentDatabaseInfo *cdi = &pDBs->entry_db_info[i];
freeCdbComponentDatabaseInfo(cdi);
}
pfree(pDBs->entry_db_info);
}
pfree(pDBs);
}
/*
* freeCdbComponentDatabaseInfo:
* Releases any storage allocated for members variables of a CdbComponentDatabaseInfo struct.
*/
void
freeCdbComponentDatabaseInfo(CdbComponentDatabaseInfo *cdi)
{
int i;
if (cdi == NULL)
return;
if (cdi->hostname != NULL)
pfree(cdi->hostname);
if (cdi->address != NULL)
pfree(cdi->address);
for (i=0; i < COMPONENT_DBS_MAX_ADDRS; i++)
{
if (cdi->hostaddrs[i] != NULL)
{
pfree(cdi->hostaddrs[i]);
cdi->hostaddrs[i] = NULL;
}
}
}
/*
* performs all necessary setup required for Greenplum Database mode.
*
* This includes cdblink_setup() and initializing the Motion Layer.
*/
void
cdb_setup(void)
{
elog(DEBUG1, "Initializing Greenplum components...");
cdblink_setup();
/* If gp_role is UTILITY, skip this call. */
if (Gp_role != GP_ROLE_UTILITY)
{
/* Initialize the Motion Layer IPC subsystem. */
InitMotionLayerIPC();
}
if (Gp_role == GP_ROLE_DISPATCH)
{
/* check mirrored entry db configuration */
buildMirrorQDDefinition();
if (isQDMirroringPendingCatchup())
{
/*
* Do a checkpoint to cause a checkpoint xlog record to
* to be written and force the QD mirror to get
* synchronized.
*/
RequestCheckpoint(true, false);
}
/*
* This call will generate a warning if master mirroring
* is not synchronized.
*/
QDMirroringWriteCheck();
}
}
/*
* performs all necessary cleanup required when leaving Greenplum
* Database mode. This is also called when the process exits.
*
* NOTE: the arguments to this function are here only so that we can
* register it with on_proc_exit(). These parameters should not
* be used since there are some callers to this that pass them
* as NULL.
*
*/
void
cdb_cleanup(int code __attribute__((unused)) , Datum arg __attribute__((unused)) )
{
elog(DEBUG1, "Cleaning up Greenplum components...");
executormgr_cleanup_env();
if (Gp_role != GP_ROLE_UTILITY)
{
/* shutdown our listener socket */
CleanUpMotionLayerIPC();
/* Move self out from CGroup. */
if (GetQEIndex() != -1)
{
OnMoveOutCGroupForQE();
}
}
}
/*
* CdbComponentDatabaseInfoCompare:
* A compare function for CdbComponentDatabaseInfo structs
* that compares based on , isprimary desc
* for use with qsort.
*/
#if 0
static int
CdbComponentDatabaseInfoCompare(const void *p1, const void *p2)
{
const CdbComponentDatabaseInfo *obj1 = (CdbComponentDatabaseInfo *) p1;
const CdbComponentDatabaseInfo *obj2 = (CdbComponentDatabaseInfo *) p2;
int cmp = obj1->segindex - obj2->segindex;
if (cmp == 0)
{
int obj2cmp=0;
int obj1cmp=0;
if (SEGMENT_IS_ACTIVE_PRIMARY(obj2))
obj2cmp = 1;
if (SEGMENT_IS_ACTIVE_PRIMARY(obj1))
obj1cmp = 1;
cmp = obj2cmp - obj1cmp;
}
return cmp;
}
#endif
/*
* We're going to sort interface-ids by priority. So we need a little
* struct, and a comparison function to hand-off to qsort().
*/
struct priority_iface {
int priority;
int interface_id;
};
/*
* iface_priority_compare() A compare function for interface-priority
* structs. for use with qsort.
*/
#if 0
static int
iface_priority_compare(const void *p1, const void *p2)
{
const struct priority_iface *obj1 = (struct priority_iface *) p1;
const struct priority_iface *obj2 = (struct priority_iface *) p2;
return (obj1->priority - obj2->priority);
}
#endif
/*
* Maintain a cache of names.
*
* The keys are all NAMEDATALEN long.
*/
static char *
getDnsCachedAddress(char *name, int port, int elevel)
{
struct segment_ip_cache_entry *e;
if (segment_ip_cache_htab == NULL)
{
HASHCTL hash_ctl;
MemSet(&hash_ctl, 0, sizeof(hash_ctl));
hash_ctl.keysize = NAMEDATALEN + 1;
hash_ctl.entrysize = sizeof(struct segment_ip_cache_entry);
segment_ip_cache_htab = hash_create("segment_dns_cache",
256,
&hash_ctl,
HASH_ELEM);
Assert(segment_ip_cache_htab != NULL);
}
e = (struct segment_ip_cache_entry *)hash_search(segment_ip_cache_htab,
name, HASH_FIND, NULL);
/* not in our cache, we've got to actually do the name lookup. */
if (e == NULL)
{
MemoryContext oldContext;
int ret;
char portNumberStr[32];
char *service;
struct addrinfo *addrs = NULL,
*addr;
struct addrinfo hint;
/* Initialize hint structure */
MemSet(&hint, 0, sizeof(hint));
hint.ai_socktype = SOCK_STREAM;
hint.ai_family = AF_UNSPEC;
snprintf(portNumberStr, sizeof(portNumberStr), "%d", port);
service = portNumberStr;
ret = pg_getaddrinfo_all(name, service, &hint, &addrs);
if (ret || !addrs)
{
if (addrs)
pg_freeaddrinfo_all(hint.ai_family, addrs);
ereport(elevel,
(errmsg("could not translate host name \"%s\", port \"%d\" to address: %s",
name, port, gai_strerror(ret))));
return NULL;
}
/* save in the cache context */
oldContext = MemoryContextSwitchTo(TopMemoryContext);
for (addr = addrs; addr; addr = addr->ai_next)
{
#ifdef HAVE_UNIX_SOCKETS
/* Ignore AF_UNIX sockets, if any are returned. */
if (addr->ai_family == AF_UNIX)
continue;
#endif
if (addr->ai_family == AF_INET) /* IPv4 address */
{
char hostinfo[NI_MAXHOST];
pg_getnameinfo_all((struct sockaddr_storage *)addr->ai_addr, addr->ai_addrlen,
hostinfo, sizeof(hostinfo),
NULL, 0,
NI_NUMERICHOST);
/* INSERT INTO OUR CACHE HTAB HERE */
e = (struct segment_ip_cache_entry *)hash_search(segment_ip_cache_htab,
name,
HASH_ENTER,
NULL);
Assert(e != NULL);
memcpy(e->hostinfo, hostinfo, sizeof(hostinfo));
break;
}
}
#ifdef HAVE_IPV6
/*
* IPv6 probably would work fine, we'd just need to make sure all the data structures are big enough for
* the IPv6 address. And on some broken systems, you can get an IPv6 address, but not be able to bind to it
* because IPv6 is disabled or missing in the kernel, so we'd only want to use the IPv6 address if there isn't
* an IPv4 address. All we really need to do is test this.
*/
if (e == NULL && addrs->ai_family == AF_INET6)
{
char hostinfo[NI_MAXHOST];
addr = addrs;
pg_getnameinfo_all((struct sockaddr_storage *)addr->ai_addr, addr->ai_addrlen,
hostinfo, sizeof(hostinfo),
NULL, 0,
NI_NUMERICHOST);
/* INSERT INTO OUR CACHE HTAB HERE */
e = (struct segment_ip_cache_entry *)hash_search(segment_ip_cache_htab,
name,
HASH_ENTER,
NULL);
Assert(e != NULL);
memcpy(e->hostinfo, hostinfo, sizeof(hostinfo));
}
#endif
MemoryContextSwitchTo(oldContext);
pg_freeaddrinfo_all(hint.ai_family, addrs);
}
/* return a pointer to our cache. */
return e->hostinfo;
}
/*
* getDnsAddress
*
* same as getDnsCachedAddress, but without caching. Looks like the
* non-cached version was used inline inside of cdbgang.c, and since
* it is needed now elsewhere, it is factored out to this routine.
*/
char *
getDnsAddress(char *hostname, int port, int elevel)
{
int ret;
char portNumberStr[32];
char *service;
char *result = NULL;
struct addrinfo *addrs = NULL,
*addr;
struct addrinfo hint;
/* Initialize hint structure */
MemSet(&hint, 0, sizeof(hint));
hint.ai_socktype = SOCK_STREAM;
hint.ai_family = AF_UNSPEC;
snprintf(portNumberStr, sizeof(portNumberStr), "%d", port);
service = portNumberStr;
ret = pg_getaddrinfo_all(hostname, service, &hint, &addrs);
if (ret || !addrs)
{
if (addrs)
pg_freeaddrinfo_all(hint.ai_family, addrs);
ereport(elevel,
(errmsg("could not translate host name \"%s\", port \"%d\" to address: %s",
hostname, port, gai_strerror(ret))));
}
for (addr = addrs; addr; addr = addr->ai_next)
{
#ifdef HAVE_UNIX_SOCKETS
/* Ignore AF_UNIX sockets, if any are returned. */
if (addr->ai_family == AF_UNIX)
continue;
#endif
if (addr->ai_family == AF_INET) /* IPv4 address */
{
char hostinfo[NI_MAXHOST];
/* Get a text representation of the IP address */
pg_getnameinfo_all((struct sockaddr_storage *) addr->ai_addr, addr->ai_addrlen,
hostinfo, sizeof(hostinfo),
NULL, 0,
NI_NUMERICHOST);
result = pstrdup(hostinfo);
break;
}
}
#ifdef HAVE_IPV6
/*
* IPv6 should would work fine, we'd just need to make sure all the data structures are big enough for
* the IPv6 address. And on some broken systems, you can get an IPv6 address, but not be able to bind to it
* because IPv6 is disabled or missing in the kernel, so we'd only want to use the IPv6 address if there isn't
* an IPv4 address. All we really need to do is test this.
*/
if (result == NULL && addrs->ai_family == AF_INET6)
{
char hostinfo[NI_MAXHOST];
addr = addrs;
/* Get a text representation of the IP address */
pg_getnameinfo_all((struct sockaddr_storage *) addr->ai_addr, addr->ai_addrlen,
hostinfo, sizeof(hostinfo),
NULL, 0,
NI_NUMERICHOST);
result = pstrdup(hostinfo);
}
#endif
pg_freeaddrinfo_all(hint.ai_family, addrs);
return result;
}
/*
* Given a component-db in the system, find the addresses at which it
* can be reached, appropriately populate the argument-structure, and
* maintain the ip-lookup-cache.
*
* We get all of the interface-ids, sort them in priority order, then
* go get their details ... and then make sure they're cached properly.
*/
#if 0
static void
getAddressesForDBid(CdbComponentDatabaseInfo *c, int elevel)
{
Relation gp_db_interface_rel;
Relation gp_interface_rel;
HeapTuple tuple;
ScanKeyData key;
SysScanDesc dbscan, ifacescan;
int j, i=0;
struct priority_iface *ifaces=NULL;
int iface_count, iface_max=0;
Datum attr;
bool isNull;
int dbid;
int iface_id;
int priority;
char *name;
Assert(c != NULL);
gp_db_interface_rel = heap_open(GpDbInterfacesRelationId, AccessShareLock);
/* CaQL UNDONE: no test coverage */
ScanKeyInit(&key, Anum_gp_db_interfaces_dbid,
BTEqualStrategyNumber, F_INT2EQ,
ObjectIdGetDatum(0));
dbscan = systable_beginscan(gp_db_interface_rel, GpDbInterfacesDbidIndexId,
true, SnapshotNow, 1, &key);
while (HeapTupleIsValid(tuple = systable_getnext(dbscan)))
{
i++;
if (i > iface_max)
{
/* allocate 8-more slots */
if (ifaces == NULL)
ifaces = palloc((iface_max + 8) * sizeof(struct priority_iface));
else
ifaces = repalloc(ifaces, (iface_max + 8) * sizeof(struct priority_iface));
memset(ifaces + iface_max, 0, 8 * sizeof(struct priority_iface));
iface_max += 8;
}
/* dbid is for sanity-check on scan condition only */
attr = heap_getattr(tuple, Anum_gp_db_interfaces_dbid, gp_db_interface_rel->rd_att, &isNull);
Assert(!isNull);
dbid = DatumGetInt16(attr);
//Assert(dbid == c->dbid);
attr = heap_getattr(tuple, Anum_gp_db_interfaces_interfaceid, gp_db_interface_rel->rd_att, &isNull);
Assert(!isNull);
iface_id = DatumGetInt16(attr);
attr = heap_getattr(tuple, Anum_gp_db_interfaces_priority, gp_db_interface_rel->rd_att, &isNull);
Assert(!isNull);
priority = DatumGetInt16(attr);
ifaces[i-1].priority = priority;
ifaces[i-1].interface_id = iface_id;
}
iface_count = i;
/* Finish up scan and close appendonly catalog. */
systable_endscan(dbscan);
heap_close(gp_db_interface_rel, AccessShareLock);
/* we now have the unsorted list, or an empty list. */
do
{
/* fallback to using hostname if our list is empty */
if (iface_count == 0)
break;
qsort(ifaces, iface_count, sizeof(struct priority_iface), iface_priority_compare);
/* we now have interfaces, sorted by priority. */
gp_interface_rel = heap_open(GpInterfacesRelationId, AccessShareLock);
j=0;
for (i=0; i < iface_count; i++)
{
int status=0;
/* CaQL UNDONE: no test coverage */
/* Start a new scan. */
ScanKeyInit(&key, Anum_gp_interfaces_interfaceid,
BTEqualStrategyNumber, F_INT2EQ,
ObjectIdGetDatum(ifaces[i].interface_id));
ifacescan = systable_beginscan(gp_interface_rel, GpInterfacesInterfaceidIndexId,
true, SnapshotNow, 1, &key);
tuple = systable_getnext(ifacescan);
Assert(HeapTupleIsValid(tuple));
/* iface_id is for sanity-check on scan condition only */
attr = heap_getattr(tuple, Anum_gp_interfaces_interfaceid, gp_interface_rel->rd_att, &isNull);
Assert(!isNull);
iface_id = DatumGetInt16(attr);
Assert(iface_id == ifaces[i].interface_id);
attr = heap_getattr(tuple, Anum_gp_interfaces_status, gp_interface_rel->rd_att, &isNull);
Assert(!isNull);
status = DatumGetInt16(attr);
/* if the status is "alive" use the interface. */
if (status == 1)
{
attr = heap_getattr(tuple, Anum_gp_interfaces_address, gp_interface_rel->rd_att, &isNull);
Assert(!isNull);
name = getDnsCachedAddress(DatumGetCString(attr), c->port, elevel);
if (name)
c->hostaddrs[j++] = pstrdup(name);
}
systable_endscan(ifacescan);
}
heap_close(gp_interface_rel, AccessShareLock);
/* fallback to using hostname if our list is empty */
if (j == 0)
break;
/* successfully retrieved at least one entry. */
return;
}
while (0);
/* fallback to using hostname */
memset(c->hostaddrs, 0, COMPONENT_DBS_MAX_ADDRS * sizeof(char *));
/*
* add an entry, using the first the "address" and then the
* "hostname" as fallback.
*/
name = getDnsCachedAddress(c->address, c->port, elevel);
if (name)
{
c->hostaddrs[0] = pstrdup(name);
return;
}
/* now the hostname. */
name = getDnsCachedAddress(c->hostname, c->port, elevel);
if (name)
{
c->hostaddrs[0] = pstrdup(name);
}
else
{
c->hostaddrs[0] = NULL;
}
return;
}
#endif
/*
* Given total number of primary segment databases and a number of
* segments to "skip" - this routine creates a boolean map (array) the
* size of total number of segments and randomly selects several
* entries (total number of total_to_skip) to be marked as
* "skipped". This is used for external tables with the 'gpfdist'
* protocol where we want to get a number of *random* segdbs to
* connect to a gpfdist client.
*
* Caller of this function should pfree skip_map when done with it.
*/
bool *
makeRandomSegMap(int total_primaries, int total_to_skip)
{
int randint; /* some random int representing a seg */
int skipped = 0; /* num segs already marked to be skipped */
bool *skip_map;
skip_map = (bool *) palloc(total_primaries * sizeof(bool));
MemSet(skip_map, false, total_primaries * sizeof(bool));
while (total_to_skip != skipped)
{
/*
* create a random int between 0 and (total_primaries - 1).
*
* NOTE that the lower and upper limits in cdb_randint() are
* inclusive so we take them into account. In reality the
* chance of those limits to get selected by the random
* generator is extremely small, so we may want to find a
* better random generator some time (not critical though).
*/
randint = cdb_randint(0, total_primaries - 1);
/*
* mark this random index 'true' in the skip map (marked to be
* skipped) unless it was already marked.
*/
if (skip_map[randint] == false)
{
skip_map[randint] = true;
skipped++;
}
}
return skip_map;
}
/*
* get dbinfo by registration_order
*/
CdbComponentDatabaseInfo *
registration_order_get_dbinfo(int32 order)
{
HeapTuple tuple;
Relation rel;
bool bOnly;
CdbComponentDatabaseInfo *i = NULL;
cqContext cqc;
if (!AmActiveMaster() && !AmStandbyMaster())
elog(ERROR, "registration_order_get_dbinfo() executed on execution segment");
rel = heap_open(GpSegmentConfigRelationId, AccessShareLock);
tuple = caql_getfirst_only(
caql_addrel(cqclr(&cqc), rel),
&bOnly,
cql("SELECT * FROM gp_segment_configuration "
" WHERE registration_order = :1 ",
Int32GetDatum(order)));
if (HeapTupleIsValid(tuple))
{
Datum attr;
bool isNull;
i = palloc(sizeof(CdbComponentDatabaseInfo));
/*
* role
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_role,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->role = DatumGetChar(attr);
/*
* status
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_status,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->status = DatumGetChar(attr);
/*
* hostname
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_hostname,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->hostname = TextDatumGetCString(attr);
/*
* address
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_address,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->address = TextDatumGetCString(attr);
/*
* port
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_port,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->port = DatumGetInt32(attr);
}
else
{
heap_close(rel, NoLock);
elog(ERROR, "could not find configuration entry for registration_order %c",
order);
}
heap_close(rel, NoLock);
return i;
}
/*
* get dbinfo by role
* There should be only one master in gp_segment_configuration table, one standby at most.
*/
CdbComponentDatabaseInfo *
role_get_dbinfo(char role)
{
HeapTuple tuple;
Relation rel;
cqContext cqc;
bool bOnly;
CdbComponentDatabaseInfo *i = NULL;
Assert(role == SEGMENT_ROLE_PRIMARY ||
role == SEGMENT_ROLE_MASTER_CONFIG ||
role == SEGMENT_ROLE_STANDBY_CONFIG);
/*
* Can only run on a master node, this restriction is due to the reliance
* on the gp_segment_configuration table. This may be able to be relaxed
* by switching to a different method of checking.
*/
if (!AmActiveMaster() && !AmStandbyMaster())
elog(ERROR, "role_get_dbinfo() executed on execution segment");
rel = heap_open(GpSegmentConfigRelationId, AccessShareLock);
tuple = caql_getfirst_only(
caql_addrel(cqclr(&cqc), rel),
&bOnly,
cql("SELECT * FROM gp_segment_configuration "
" WHERE role = :1 ",
CharGetDatum(role)));
if (HeapTupleIsValid(tuple))
{
Datum attr;
bool isNull;
i = palloc(sizeof(CdbComponentDatabaseInfo));
/*
* role
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_role,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->role = DatumGetChar(attr);
/*
* status
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_status,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->status = DatumGetChar(attr);
/*
* hostname
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_hostname,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->hostname = TextDatumGetCString(attr);
/*
* address
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_address,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->address = TextDatumGetCString(attr);
/*
* port
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_port,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->port = DatumGetInt32(attr);
}
else
{
elog(ERROR, "could not find configuration entry for role %c", role);
}
heap_close(rel, NoLock);
return i;
}