| /*------------------------------------------------------------------------- |
| * |
| * cdbutil.c |
| * Internal utility support functions for Apache Cloudberry/PostgreSQL. |
| * |
| * Portions Copyright (c) 2005-2011, Greenplum inc |
| * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates. |
| * |
| * |
| * IDENTIFICATION |
| * src/backend/cdb/cdbutil.c |
| * |
| * NOTES |
| * |
| * - According to src/backend/executor/execHeapScan.c |
| * "tuples returned by heap_getnext() are pointers onto disk |
| * pages and were not created with palloc() and so should not |
| * be pfree()'d" |
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include "postgres.h" |
| |
| #ifdef HAVE_SYS_RESOURCE_H |
| #include <sys/time.h> |
| #include <sys/resource.h> |
| #endif |
| |
| #include <sys/param.h> /* for MAXHOSTNAMELEN */ |
| |
| #include "access/genam.h" |
| #include "common/ip.h" |
| #include "nodes/makefuncs.h" |
| #include "utils/builtins.h" |
| #include "utils/fmgroids.h" |
| #include "utils/memutils.h" |
| #include "catalog/gp_id.h" |
| #include "catalog/indexing.h" |
| #include "cdb/cdbhash.h" |
| #include "cdb/cdbutil.h" |
| #include "cdb/cdbmotion.h" |
| #include "cdb/cdbvars.h" |
| #include "cdb/cdbgang.h" |
| #include "cdb/cdbdisp_query.h" |
| #include "cdb/ml_ipc.h" /* listener_setup */ |
| #include "cdb/cdbtm.h" |
| #include "libpq-fe.h" |
| #include "libpq-int.h" |
| #include "cdb/cdbconn.h" |
| #include "cdb/cdbfts.h" |
| #include "storage/ipc.h" |
| #include "storage/proc.h" |
| #include "postmaster/fts.h" |
| #include "postmaster/postmaster.h" |
| #include "catalog/namespace.h" |
| #include "utils/gpexpand.h" |
| #include "access/xact.h" |
| #include "common/hashfn.h" |
| #include "catalog/gp_indexing.h" |
| #include "utils/etcd.h" |
| #include "common/etcdutils.h" |
| |
| #include "catalog/gp_indexing.h" |
| |
| #define MAX_CACHED_1_GANGS 1 |
| #define INCR_COUNT(cdbinfo, arg) \ |
| (cdbinfo)->arg++; \ |
| (cdbinfo)->cdbs->arg++; |
| |
| #define DECR_COUNT(cdbinfo, arg) \ |
| (cdbinfo)->arg--; \ |
| (cdbinfo)->cdbs->arg--; \ |
| Assert((cdbinfo)->arg >= 0); \ |
| Assert((cdbinfo)->cdbs->arg >= 0); \ |
| |
| #define GPSEGCONFIGDUMPFILE "gpsegconfig_dump" |
| #define GPSEGCONFIGDUMPFILETMP "gpsegconfig_dump_tmp" |
| |
| MemoryContext CdbComponentsContext = NULL; |
| static CdbComponentDatabases *cdb_component_dbs = NULL; |
| |
| #ifdef USE_INTERNAL_FTS |
| |
| /* |
| * Helper Functions |
| */ |
| static CdbComponentDatabases *getCdbComponentInfo(void); |
| static void cleanupComponentIdleQEs(CdbComponentDatabaseInfo *cdi, bool includeWriter); |
| |
| static int CdbComponentDatabaseInfoCompare(const void *p1, const void *p2); |
| |
| static GpSegConfigEntry * readGpSegConfigFromCatalog(int *total_dbs); |
| static GpSegConfigEntry * readGpSegConfigFromFTSFiles(int *total_dbs); |
| static GpSegConfigEntry * readGpSegConfigFromFiles(int *total_dbs); |
| |
| static void getAddressesForDBid(GpSegConfigEntry *c, int elevel); |
| static HTAB *hostPrimaryCountHashTableInit(void); |
| |
| static int nextQEIdentifer(CdbComponentDatabases *cdbs); |
| |
| Datum gp_get_suboverflowed_backends(PG_FUNCTION_ARGS); |
| |
| static HTAB *segment_ip_cache_htab = NULL; |
| |
| int numsegmentsFromQD = -1; |
| |
| typedef struct SegIpEntry |
| { |
| char key[NAMEDATALEN]; |
| char hostinfo[NI_MAXHOST]; |
| } SegIpEntry; |
| |
| typedef struct HostPrimaryCountEntry |
| { |
| char hostname[MAXHOSTNAMELEN]; |
| int segmentCount; |
| } HostPrimaryCountEntry; |
| |
| /* |
| * Helper functions for fetching latest gp_segment_configuration outside of |
| * the transaction. |
| * |
| * In phase 2 of 2PC, current xact has been marked to TRANS_COMMIT/ABORT, |
| * COMMIT_PREPARED or ABORT_PREPARED DTM are performed, if they failed, |
| * dispatcher disconnect and destroy all gangs and fetch the latest segment |
| * configurations to do RETRY_COMMIT_PREPARED or RETRY_ABORT_PREPARED, |
| * however, postgres disallow catalog lookups outside of xacts. |
| * |
| * readGpSegConfigFromFTSFiles() notify FTS to dump the configs from catalog |
| * to a flat file and then read configurations from that file. |
| */ |
| static GpSegConfigEntry * |
| readGpSegConfigFromFTSFiles(int *total_dbs) |
| { |
| Assert(!IsTransactionState() && !IS_HOT_DR_CLUSTER()); |
| /* notify and wait FTS to finish a probe and update the dump file */ |
| FtsNotifyProber(); |
| return readGpSegConfigFromFiles(total_dbs); |
| } |
| |
| static GpSegConfigEntry * |
| readGpSegConfigFromFiles(int *total_dbs) |
| { |
| FILE *fd; |
| int idx = 0; |
| int array_size = 500; |
| GpSegConfigEntry *configs = NULL; |
| GpSegConfigEntry *config = NULL; |
| |
| char hostname[MAXHOSTNAMELEN]; |
| char address[MAXHOSTNAMELEN]; |
| char buf[MAXHOSTNAMELEN * 2 + 32]; |
| |
| fd = AllocateFile(GPSEGCONFIGDUMPFILE, "r"); |
| |
| if (!fd) |
| elog(ERROR, "could not open gp_segment_configutation dump file:%s:%m", GPSEGCONFIGDUMPFILE); |
| |
| configs = palloc0(sizeof (GpSegConfigEntry) * array_size); |
| |
| while (fgets(buf, sizeof(buf), fd)) |
| { |
| config = &configs[idx]; |
| |
| if (sscanf(buf, "%d %d %c %c %c %c %d %s %s", (int *)&config->dbid, (int *)&config->segindex, |
| &config->role, &config->preferred_role, &config->mode, &config->status, |
| &config->port, hostname, address) != GPSEGCONFIGNUMATTR) |
| { |
| FreeFile(fd); |
| elog(ERROR, "invalid data in gp_segment_configuration dump file: %s:%m", GPSEGCONFIGDUMPFILE); |
| } |
| |
| config->hostname = pstrdup(hostname); |
| config->address = pstrdup(address); |
| |
| idx++; |
| /* |
| * Expand CdbComponentDatabaseInfo array if we've used up |
| * currently allocated space |
| */ |
| if (idx >= array_size) |
| { |
| array_size = array_size * 2; |
| configs = (GpSegConfigEntry *) |
| repalloc(configs, sizeof(GpSegConfigEntry) * array_size); |
| } |
| } |
| |
| FreeFile(fd); |
| |
| *total_dbs = idx; |
| return configs; |
| } |
| |
| bool |
| checkGpSegConfigFtsFiles() |
| { |
| FILE *fd = AllocateFile(GPSEGCONFIGDUMPFILE, "r"); |
| |
| if (!fd) |
| return false; |
| |
| FreeFile(fd); |
| return true; |
| } |
| |
| /* |
| * writeGpSegConfigToFTSFiles() dump gp_segment_configuration to the file |
| * GPSEGCONFIGDUMPFILE, in $PGDATA, only FTS process can use this function. |
| * |
| * write contents to GPSEGCONFIGDUMPFILETMP first, then rename it to |
| * GPSEGCONFIGDUMPFILE, it makes lockless read and write concurrently. |
| */ |
| void |
| writeGpSegConfigToFTSFiles(void) |
| { |
| FILE *fd; |
| int idx = 0; |
| int total_dbs = 0; |
| GpSegConfigEntry *configs = NULL; |
| GpSegConfigEntry *config = NULL; |
| |
| Assert(IsTransactionState()); |
| Assert(am_ftsprobe); |
| |
| fd = AllocateFile(GPSEGCONFIGDUMPFILETMP, "w+"); |
| |
| if (!fd) |
| elog(ERROR, "could not create tmp file: %s: %m", GPSEGCONFIGDUMPFILETMP); |
| |
| configs = readGpSegConfigFromCatalog(&total_dbs); |
| |
| for (idx = 0; idx < total_dbs; idx++) |
| { |
| config = &configs[idx]; |
| |
| if (fprintf(fd, "%d %d %c %c %c %c %d %s %s\n", config->dbid, config->segindex, |
| config->role, config->preferred_role, config->mode, config->status, |
| config->port, config->hostname, config->address) < 0) |
| { |
| FreeFile(fd); |
| elog(ERROR, "could not dump gp_segment_configuration to file: %s: %m", GPSEGCONFIGDUMPFILE); |
| } |
| } |
| |
| FreeFile(fd); |
| |
| /* rename tmp file to permanent file */ |
| if (rename(GPSEGCONFIGDUMPFILETMP, GPSEGCONFIGDUMPFILE) != 0) |
| elog(ERROR, "could not rename file %s to file %s: %m", |
| GPSEGCONFIGDUMPFILETMP, GPSEGCONFIGDUMPFILE); |
| } |
| |
| static GpSegConfigEntry * |
| readGpSegConfigFromCatalog(int *total_dbs) |
| { |
| int idx = 0; |
| int array_size; |
| bool isNull; |
| Datum attr; |
| Oid warehouseid = InvalidOid; |
| Relation gp_seg_config_rel; |
| HeapTuple gp_seg_config_tuple = NULL; |
| SysScanDesc gp_seg_config_scan; |
| GpSegConfigEntry *configs; |
| GpSegConfigEntry *config; |
| |
| array_size = 500; |
| configs = palloc0(sizeof(GpSegConfigEntry) * array_size); |
| |
| gp_seg_config_rel = table_open(GpSegmentConfigRelationId, AccessShareLock); |
| gp_seg_config_scan = systable_beginscan(gp_seg_config_rel, InvalidOid, false, NULL, |
| 0, NULL); |
| |
| while (HeapTupleIsValid(gp_seg_config_tuple = systable_getnext(gp_seg_config_scan))) |
| { |
| /* warehouseid */ |
| attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_warehouseid, RelationGetDescr(gp_seg_config_rel), &isNull); |
| Assert(!isNull); |
| warehouseid = DatumGetObjectId(attr); |
| |
| /* content */ |
| attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_content, RelationGetDescr(gp_seg_config_rel), &isNull); |
| Assert(!isNull); |
| |
| if (warehouseid == GetCurrentWarehouseId() || DatumGetInt16(attr) == MASTER_CONTENT_ID) |
| { |
| config = &configs[idx]; |
| |
| /* dbid */ |
| attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_dbid, RelationGetDescr(gp_seg_config_rel), &isNull); |
| Assert(!isNull); |
| config->dbid = DatumGetInt16(attr); |
| |
| /* content */ |
| attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_content, RelationGetDescr(gp_seg_config_rel), &isNull); |
| Assert(!isNull); |
| config->segindex= DatumGetInt16(attr); |
| |
| /* role */ |
| attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_role, RelationGetDescr(gp_seg_config_rel), &isNull); |
| Assert(!isNull); |
| config->role = DatumGetChar(attr); |
| |
| /* preferred-role */ |
| attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_preferred_role, RelationGetDescr(gp_seg_config_rel), &isNull); |
| Assert(!isNull); |
| config->preferred_role = DatumGetChar(attr); |
| |
| /* mode */ |
| attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_mode, RelationGetDescr(gp_seg_config_rel), &isNull); |
| Assert(!isNull); |
| config->mode = DatumGetChar(attr); |
| |
| /* status */ |
| attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_status, RelationGetDescr(gp_seg_config_rel), &isNull); |
| Assert(!isNull); |
| config->status = DatumGetChar(attr); |
| |
| /* hostname */ |
| attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_hostname, RelationGetDescr(gp_seg_config_rel), &isNull); |
| Assert(!isNull); |
| config->hostname = TextDatumGetCString(attr); |
| |
| /* address */ |
| attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_address, RelationGetDescr(gp_seg_config_rel), &isNull); |
| Assert(!isNull); |
| config->address = TextDatumGetCString(attr); |
| |
| /* port */ |
| attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_port, RelationGetDescr(gp_seg_config_rel), &isNull); |
| Assert(!isNull); |
| config->port = DatumGetInt32(attr); |
| |
| /* datadir is not dumped*/ |
| |
| idx++; |
| |
| /* |
| * Expand CdbComponentDatabaseInfo array if we've used up |
| * currently allocated space |
| */ |
| if (idx >= array_size) |
| { |
| array_size = array_size * 2; |
| configs = (GpSegConfigEntry *) |
| repalloc(configs, sizeof(GpSegConfigEntry) * array_size); |
| } |
| } |
| } |
| |
| /* |
| * We're done with the catalog config, clean them up, closing all the |
| * relations we opened. |
| */ |
| systable_endscan(gp_seg_config_scan); |
| table_close(gp_seg_config_rel, AccessShareLock); |
| |
| *total_dbs = idx; |
| return configs; |
| } |
| |
| /* |
| * Internal function to initialize each component info |
| */ |
| static CdbComponentDatabases * |
| getCdbComponentInfo(void) |
| { |
| MemoryContext oldContext; |
| CdbComponentDatabaseInfo *cdbInfo; |
| CdbComponentDatabases *component_databases = NULL; |
| GpSegConfigEntry *configs; |
| |
| int i; |
| int x = 0; |
| int total_dbs = 0; |
| |
| bool found; |
| HostPrimaryCountEntry *hsEntry; |
| |
| if (!CdbComponentsContext) |
| CdbComponentsContext = AllocSetContextCreate(TopMemoryContext, "cdb components Context", |
| ALLOCSET_DEFAULT_MINSIZE, |
| ALLOCSET_DEFAULT_INITSIZE, |
| ALLOCSET_DEFAULT_MAXSIZE); |
| |
| oldContext = MemoryContextSwitchTo(CdbComponentsContext); |
| |
| HTAB *hostPrimaryCountHash = hostPrimaryCountHashTableInit(); |
| |
| if (EnableHotDR) |
| { |
| configs = readGpSegConfigFromFiles(&total_dbs); |
| } |
| else |
| { |
| if (IsTransactionState()) |
| configs = readGpSegConfigFromCatalog(&total_dbs); |
| else |
| configs = readGpSegConfigFromFTSFiles(&total_dbs); |
| } |
| |
| component_databases = palloc0(sizeof(CdbComponentDatabases)); |
| |
| component_databases->numActiveQEs = 0; |
| component_databases->numIdleQEs = 0; |
| component_databases->qeCounter = 0; |
| component_databases->freeCounterList = NIL; |
| |
| component_databases->segment_db_info = |
| (CdbComponentDatabaseInfo *) palloc0(sizeof(CdbComponentDatabaseInfo) * total_dbs); |
| |
| component_databases->entry_db_info = |
| (CdbComponentDatabaseInfo *) palloc0(sizeof(CdbComponentDatabaseInfo) * 2); |
| |
| for (i = 0; i < total_dbs; i++) |
| { |
| CdbComponentDatabaseInfo *pRow; |
| GpSegConfigEntry *config = &configs[i]; |
| |
| if (config->hostname == NULL || strlen(config->hostname) > MAXHOSTNAMELEN) |
| { |
| /* |
| * We should never reach here, but add sanity check |
| * The reason we check length is we find MAXHOSTNAMELEN might be |
| * smaller than the ones defined in /etc/hosts. Those are rare cases. |
| */ |
| elog(ERROR, |
| "Invalid length (%d) of hostname (%s)", |
| config->hostname == NULL ? 0 : (int) strlen(config->hostname), |
| config->hostname == NULL ? "" : config->hostname); |
| } |
| |
| /* lookup hostip/hostaddrs cache */ |
| config->hostip= NULL; |
| getAddressesForDBid(config, !am_ftsprobe? ERROR : LOG); |
| |
| /* |
| * We make sure we get a valid hostip for primary here, |
| * if hostip for mirrors can not be get, ignore the error. |
| */ |
| if (config->hostaddrs[0] == NULL && |
| config->role == GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY) |
| ereport(!am_ftsprobe ? ERROR : LOG, |
| (errcode(ERRCODE_CONNECTION_FAILURE), |
| errmsg("cannot resolve network address for dbid=%d", config->dbid))); |
| |
| if (config->hostaddrs[0] != NULL) |
| config->hostip = pstrdup(config->hostaddrs[0]); |
| AssertImply(config->hostip, strlen(config->hostip) <= INET6_ADDRSTRLEN); |
| |
| /* |
| * Determine which array to place this rows data in: entry or segment, |
| * based on the content field. |
| */ |
| if (config->segindex >= 0) |
| { |
| pRow = &component_databases->segment_db_info[component_databases->total_segment_dbs]; |
| component_databases->total_segment_dbs++; |
| } |
| else |
| { |
| pRow = &component_databases->entry_db_info[component_databases->total_entry_dbs]; |
| component_databases->total_entry_dbs++; |
| } |
| |
| pRow->cdbs = component_databases; |
| pRow->config = config; |
| pRow->freelist = NIL; |
| pRow->activelist = NIL; |
| pRow->numIdleQEs = 0; |
| pRow->numActiveQEs = 0; |
| |
| if (config->role != GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY) |
| continue; |
| |
| hsEntry = (HostPrimaryCountEntry *) hash_search(hostPrimaryCountHash, config->hostname, HASH_ENTER, &found); |
| if (found) |
| hsEntry->segmentCount++; |
| else |
| hsEntry->segmentCount = 1; |
| } |
| |
| /* |
| * Validate that there exists at least one entry and one segment database |
| * in the configuration |
| */ |
| /* |
| * In singlenode deployment, total_segment_dbs is zero and it should still work. |
| */ |
| if (component_databases->total_segment_dbs == 0 && !IS_SINGLENODE()) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_CARDINALITY_VIOLATION), |
| errmsg("number of segment databases cannot be 0"))); |
| } |
| if (component_databases->total_entry_dbs == 0) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_CARDINALITY_VIOLATION), |
| errmsg("number of entry databases cannot be 0"))); |
| } |
| |
| /* |
| * Now sort the data by segindex, isprimary desc |
| */ |
| qsort(component_databases->segment_db_info, |
| component_databases->total_segment_dbs, sizeof(CdbComponentDatabaseInfo), |
| CdbComponentDatabaseInfoCompare); |
| |
| qsort(component_databases->entry_db_info, |
| component_databases->total_entry_dbs, sizeof(CdbComponentDatabaseInfo), |
| CdbComponentDatabaseInfoCompare); |
| |
| /* |
| * Now count the number of distinct segindexes. Since it's sorted, this is |
| * easy. |
| */ |
| for (i = 0; i < component_databases->total_segment_dbs; i++) |
| { |
| if (i == 0 || |
| (component_databases->segment_db_info[i].config->segindex != component_databases->segment_db_info[i - 1].config->segindex)) |
| { |
| component_databases->total_segments++; |
| } |
| } |
| |
| /* |
| * Now validate that our identity is present in the entry databases |
| */ |
| for (i = 0; i < component_databases->total_entry_dbs; i++) |
| { |
| cdbInfo = &component_databases->entry_db_info[i]; |
| |
| if (cdbInfo->config->dbid == GpIdentity.dbid && cdbInfo->config->segindex == GpIdentity.segindex) |
| { |
| break; |
| } |
| } |
| if (i == component_databases->total_entry_dbs) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_DATA_EXCEPTION), |
| errmsg("cannot locate entry database"), |
| errdetail("Entry database represented by this db in gp_segment_configuration: dbid %d content %d", |
| GpIdentity.dbid, GpIdentity.segindex))); |
| } |
| |
| /* |
| * Now validate that the segindexes for the segment databases are between |
| * 0 and (numsegments - 1) inclusive, and that we hit them all. |
| * Since it's sorted, this is relatively easy. |
| */ |
| x = 0; |
| for (i = 0; i < component_databases->total_segments; i++) |
| { |
| int this_segindex = -1; |
| |
| while (x < component_databases->total_segment_dbs) |
| { |
| this_segindex = component_databases->segment_db_info[x].config->segindex; |
| if (this_segindex < i) |
| x++; |
| else if (this_segindex == i) |
| break; |
| else if (this_segindex > i) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_DATA_EXCEPTION), |
| errmsg("content values not valid in %s table", |
| GpSegmentConfigRelationName), |
| errdetail("Content values must be in the range 0 to %d inclusive.", |
| component_databases->total_segments - 1))); |
| } |
| } |
| if (this_segindex != i) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_DATA_EXCEPTION), |
| errmsg("content values not valid in %s table", |
| GpSegmentConfigRelationName), |
| errdetail("Content values must be in the range 0 to %d inclusive", |
| component_databases->total_segments - 1))); |
| } |
| } |
| |
| for (i = 0; i < component_databases->total_segment_dbs; i++) |
| { |
| cdbInfo = &component_databases->segment_db_info[i]; |
| |
| if (!IS_HOT_STANDBY_QD() && cdbInfo->config->role != GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY) |
| continue; |
| |
| hsEntry = (HostPrimaryCountEntry *) hash_search(hostPrimaryCountHash, cdbInfo->config->hostname, HASH_FIND, &found); |
| Assert(found || IS_HOT_STANDBY_QD()); |
| /* |
| * Standby and mirror entries can legitimately live on hosts that do not |
| * own any primary segments. In that case the lookup is absent and the |
| * count should be treated as zero instead of dereferencing a NULL entry. |
| */ |
| cdbInfo->hostPrimaryCount = found ? hsEntry->segmentCount : 0; |
| } |
| |
| for (i = 0; i < component_databases->total_entry_dbs; i++) |
| { |
| cdbInfo = &component_databases->entry_db_info[i]; |
| |
| if (!IS_HOT_STANDBY_QD() && cdbInfo->config->role != GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY) |
| continue; |
| |
| hsEntry = (HostPrimaryCountEntry *) hash_search(hostPrimaryCountHash, cdbInfo->config->hostname, HASH_FIND, &found); |
| Assert(found || IS_HOT_STANDBY_QD()); |
| /* |
| * Standby and mirror entries can legitimately live on hosts that do not |
| * own any primary segments. In that case the lookup is absent and the |
| * count should be treated as zero instead of dereferencing a NULL entry. |
| */ |
| cdbInfo->hostPrimaryCount = found ? hsEntry->segmentCount : 0; |
| } |
| |
| hash_destroy(hostPrimaryCountHash); |
| |
| MemoryContextSwitchTo(oldContext); |
| |
| return component_databases; |
| } |
| |
| /* |
| * Helper function to clean up the idle segdbs list of |
| * a segment component. |
| */ |
| static void |
| cleanupComponentIdleQEs(CdbComponentDatabaseInfo *cdi, bool includeWriter) |
| { |
| SegmentDatabaseDescriptor *segdbDesc; |
| MemoryContext oldContext; |
| ListCell *curItem; |
| |
| Assert(CdbComponentsContext); |
| oldContext = MemoryContextSwitchTo(CdbComponentsContext); |
| curItem = list_head(cdi->freelist); |
| |
| foreach(curItem, cdi->freelist) |
| { |
| segdbDesc = (SegmentDatabaseDescriptor *)lfirst(curItem); |
| Assert(segdbDesc); |
| |
| if (segdbDesc->isWriter && !includeWriter) |
| continue; |
| |
| cdi->freelist = foreach_delete_current(cdi->freelist, curItem); |
| DECR_COUNT(cdi, numIdleQEs); |
| |
| cdbconn_termSegmentDescriptor(segdbDesc); |
| } |
| |
| MemoryContextSwitchTo(oldContext); |
| } |
| |
| void |
| cdbcomponent_cleanupIdleQEs(bool includeWriter) |
| { |
| CdbComponentDatabases *cdbs; |
| int i; |
| |
| /* use cdb_component_dbs directly */ |
| cdbs = cdb_component_dbs; |
| |
| if (cdbs == NULL) |
| return; |
| |
| if (cdbs->segment_db_info != NULL) |
| { |
| for (i = 0; i < cdbs->total_segment_dbs; i++) |
| { |
| CdbComponentDatabaseInfo *cdi = &cdbs->segment_db_info[i]; |
| cleanupComponentIdleQEs(cdi, includeWriter); |
| } |
| } |
| |
| if (cdbs->entry_db_info != NULL) |
| { |
| for (i = 0; i < cdbs->total_entry_dbs; i++) |
| { |
| CdbComponentDatabaseInfo *cdi = &cdbs->entry_db_info[i]; |
| cleanupComponentIdleQEs(cdi, includeWriter); |
| } |
| } |
| |
| return; |
| } |
| |
| /* |
| * This function is called when a transaction is started and the snapshot of |
| * segments info will not changed until the end of transaction |
| */ |
| void |
| cdbcomponent_updateCdbComponents(void) |
| { |
| uint8 ftsVersion= getFtsVersion(); |
| int expandVersion = GetGpExpandVersion(); |
| |
| /* |
| * FTS takes responsibility for updating gp_segment_configuration, in each |
| * fts probe cycle, FTS firstly gets a copy of current configuration, then |
| * probe the segments based on it and finally free the copy in the end. In |
| * the probe stage, FTS might start/close transactions many times, so FTS |
| * should not update current copy of gp_segment_configuration when a new |
| * transaction is started. |
| */ |
| if (am_ftsprobe) |
| return; |
| |
| PG_TRY(); |
| { |
| if (cdb_component_dbs == NULL) |
| { |
| cdb_component_dbs = getCdbComponentInfo(); |
| cdb_component_dbs->fts_version = ftsVersion; |
| cdb_component_dbs->expand_version = GetGpExpandVersion(); |
| } |
| else if ((cdb_component_dbs->fts_version != ftsVersion || |
| cdb_component_dbs->expand_version != expandVersion)) |
| { |
| if (TempNamespaceOidIsValid()) |
| { |
| /* |
| * Do not update here, otherwise, temp files will be lost |
| * in segments; |
| */ |
| } |
| else |
| { |
| ELOG_DISPATCHER_DEBUG("FTS rescanned, get new component databases info."); |
| cdbcomponent_destroyCdbComponents(); |
| cdb_component_dbs = getCdbComponentInfo(); |
| cdb_component_dbs->fts_version = ftsVersion; |
| cdb_component_dbs->expand_version = expandVersion; |
| } |
| } |
| } |
| PG_CATCH(); |
| { |
| FtsNotifyProber(); |
| |
| PG_RE_THROW(); |
| } |
| PG_END_TRY(); |
| |
| Assert(cdb_component_dbs->numActiveQEs == 0); |
| } |
| |
| /* |
| * cdbcomponent_getCdbComponents |
| * |
| * |
| * Storage for the SegmentInstances block and all subsidiary |
| * structures are allocated from the caller's context. |
| */ |
| CdbComponentDatabases * |
| cdbcomponent_getCdbComponents() |
| { |
| PG_TRY(); |
| { |
| if (cdb_component_dbs == NULL) |
| { |
| cdb_component_dbs = getCdbComponentInfo(); |
| cdb_component_dbs->fts_version = getFtsVersion(); |
| cdb_component_dbs->expand_version = GetGpExpandVersion(); |
| } |
| } |
| PG_CATCH(); |
| { |
| if (Gp_role == GP_ROLE_DISPATCH) |
| FtsNotifyProber(); |
| |
| PG_RE_THROW(); |
| } |
| PG_END_TRY(); |
| |
| return cdb_component_dbs; |
| } |
| |
| /* |
| * cdbcomponet_destroyCdbComponents |
| * |
| * Disconnect and destroy all idle QEs, releases the memory |
| * occupied by the CdbComponentDatabases |
| * |
| * callers must clean up QEs used by dispatcher states. |
| */ |
| void |
| cdbcomponent_destroyCdbComponents(void) |
| { |
| /* caller must clean up all segdbs used by dispatcher states */ |
| Assert(!cdbcomponent_activeQEsExist()); |
| |
| hash_destroy(segment_ip_cache_htab); |
| segment_ip_cache_htab = NULL; |
| |
| /* disconnect and destroy idle QEs include writers */ |
| cdbcomponent_cleanupIdleQEs(true); |
| |
| /* delete the memory context */ |
| if (CdbComponentsContext) |
| MemoryContextDelete(CdbComponentsContext); |
| CdbComponentsContext = NULL; |
| cdb_component_dbs = NULL; |
| } |
| |
| /* |
| * Allocated a segdb |
| * |
| * If there is idle segdb in the freelist, return it, otherwise, initialize |
| * a new segdb. |
| * |
| * idle segdbs has an established connection with segment, but new segdb is |
| * not setup yet, callers need to establish the connection by themselves. |
| */ |
| SegmentDatabaseDescriptor * |
| cdbcomponent_allocateIdleQE(int contentId, SegmentType segmentType) |
| { |
| SegmentDatabaseDescriptor *segdbDesc = NULL; |
| CdbComponentDatabaseInfo *cdbinfo; |
| ListCell *curItem; |
| MemoryContext oldContext; |
| bool isWriter; |
| |
| cdbinfo = cdbcomponent_getComponentInfo(contentId); |
| |
| oldContext = MemoryContextSwitchTo(CdbComponentsContext); |
| |
| /* |
| * Always try to pop from the head. Make sure to push them back to head |
| * in cdbcomponent_recycleIdleQE(). |
| */ |
| foreach(curItem, cdbinfo->freelist) |
| { |
| SegmentDatabaseDescriptor *tmp = |
| (SegmentDatabaseDescriptor *)lfirst(curItem); |
| Assert(tmp); |
| |
| if ((segmentType == SEGMENTTYPE_EXPLICT_WRITER && !tmp->isWriter) || |
| (segmentType == SEGMENTTYPE_EXPLICT_READER && tmp->isWriter)) |
| continue; |
| |
| cdbinfo->freelist = foreach_delete_current(cdbinfo->freelist, curItem); |
| /* update numIdleQEs */ |
| DECR_COUNT(cdbinfo, numIdleQEs); |
| |
| segdbDesc = tmp; |
| break; |
| } |
| |
| if (!segdbDesc) |
| { |
| /* |
| * 1. for entrydb, it's never be writer. |
| * 2. for first QE, it must be a writer. |
| */ |
| isWriter = contentId == -1 ? false: (cdbinfo->numIdleQEs == 0 && cdbinfo->numActiveQEs == 0); |
| segdbDesc = cdbconn_createSegmentDescriptor(cdbinfo, nextQEIdentifer(cdbinfo->cdbs), isWriter); |
| } |
| |
| cdbconn_setQEIdentifier(segdbDesc, -1); |
| |
| cdbinfo->activelist = lcons(segdbDesc, cdbinfo->activelist); |
| INCR_COUNT(cdbinfo, numActiveQEs); |
| |
| MemoryContextSwitchTo(oldContext); |
| |
| return segdbDesc; |
| } |
| |
| static bool |
| cleanupQE(SegmentDatabaseDescriptor *segdbDesc) |
| { |
| Assert(segdbDesc != NULL); |
| |
| #ifdef FAULT_INJECTOR |
| if (SIMPLE_FAULT_INJECTOR("cleanup_qe") == FaultInjectorTypeSkip) |
| return false; |
| #endif |
| |
| /* |
| * if the process is in the middle of blowing up... then we don't do |
| * anything here. making libpq and other calls can definitely result in |
| * things getting HUNG. |
| */ |
| if (proc_exit_inprogress) |
| return false; |
| |
| if (cdbconn_isBadConnection(segdbDesc)) |
| return false; |
| |
| /* if segment is down, the gang can not be reused */ |
| if (FtsIsSegmentDown(segdbDesc->segment_database_info)) |
| return false; |
| |
| /* If a reader exceed the cached memory limitation, destroy it */ |
| if (!segdbDesc->isWriter && |
| (segdbDesc->conn->mop_high_watermark >> 20) > gp_vmem_protect_gang_cache_limit) |
| return false; |
| |
| /* Note, we cancel all "still running" queries */ |
| if (!cdbconn_discardResults(segdbDesc, 20)) |
| { |
| elog(LOG, "cleaning up seg%d while it is still busy", segdbDesc->segindex); |
| return false; |
| } |
| |
| /* QE is no longer associated with a slice. */ |
| cdbconn_setQEIdentifier(segdbDesc, /* slice index */ -1); |
| |
| return true; |
| } |
| |
| void |
| cdbcomponent_recycleIdleQE(SegmentDatabaseDescriptor *segdbDesc, bool forceDestroy) |
| { |
| CdbComponentDatabaseInfo *cdbinfo; |
| MemoryContext oldContext; |
| int maxLen; |
| bool isWriter; |
| |
| Assert(cdb_component_dbs); |
| Assert(CdbComponentsContext); |
| |
| cdbinfo = segdbDesc->segment_database_info; |
| isWriter = segdbDesc->isWriter; |
| |
| /* update num of active QEs */ |
| Assert(list_member_ptr(cdbinfo->activelist, segdbDesc)); |
| cdbinfo->activelist = list_delete_ptr(cdbinfo->activelist, segdbDesc); |
| DECR_COUNT(cdbinfo, numActiveQEs); |
| |
| oldContext = MemoryContextSwitchTo(CdbComponentsContext); |
| |
| if (forceDestroy || !cleanupQE(segdbDesc)) |
| goto destroy_segdb; |
| |
| /* If freelist length exceed gp_cached_gang_threshold, destroy it */ |
| maxLen = segdbDesc->segindex == -1 ? |
| MAX_CACHED_1_GANGS : gp_cached_gang_threshold; |
| if (!isWriter && list_length(cdbinfo->freelist) >= maxLen) |
| goto destroy_segdb; |
| |
| /* Recycle the QE, put it to freelist */ |
| if (isWriter) |
| { |
| /* writer is always the header of freelist */ |
| segdbDesc->segment_database_info->freelist = |
| lcons(segdbDesc, segdbDesc->segment_database_info->freelist); |
| } |
| else |
| { |
| int lastWriterPos = -1; |
| ListCell *cell; |
| |
| /* |
| * In cdbcomponent_allocateIdleQE() readers are always popped from the |
| * head, so to restore the original order we must pushed them back to |
| * the head, and keep in mind readers must be put after the writers. |
| */ |
| |
| for (cell = list_head(segdbDesc->segment_database_info->freelist); |
| cell && ((SegmentDatabaseDescriptor *) lfirst(cell))->isWriter; |
| lastWriterPos++, cell = lnext(segdbDesc->segment_database_info->freelist, cell)) ; |
| |
| if (lastWriterPos >= 0) |
| segdbDesc->segment_database_info->freelist = list_insert_nth(segdbDesc->segment_database_info->freelist, |
| lastWriterPos + 1, segdbDesc); |
| else |
| segdbDesc->segment_database_info->freelist = |
| lcons(segdbDesc, segdbDesc->segment_database_info->freelist); |
| } |
| |
| INCR_COUNT(cdbinfo, numIdleQEs); |
| |
| MemoryContextSwitchTo(oldContext); |
| |
| return; |
| |
| destroy_segdb: |
| |
| cdbconn_termSegmentDescriptor(segdbDesc); |
| |
| if (isWriter) |
| { |
| markCurrentGxactWriterGangLost(); |
| } |
| |
| MemoryContextSwitchTo(oldContext); |
| } |
| |
| static int |
| nextQEIdentifer(CdbComponentDatabases *cdbs) |
| { |
| int result; |
| |
| if (!cdbs->freeCounterList) |
| return cdbs->qeCounter++; |
| |
| result = linitial_int(cdbs->freeCounterList); |
| cdbs->freeCounterList = list_delete_first(cdbs->freeCounterList); |
| return result; |
| } |
| |
| bool |
| cdbcomponent_qesExist(void) |
| { |
| return !cdb_component_dbs ? false : |
| (cdb_component_dbs->numIdleQEs > 0 || cdb_component_dbs->numActiveQEs > 0); |
| } |
| |
| bool |
| cdbcomponent_activeQEsExist(void) |
| { |
| return !cdb_component_dbs ? false : cdb_component_dbs->numActiveQEs > 0; |
| } |
| |
| /* |
| * Find CdbComponentDatabaseInfo in the array by segment index. |
| */ |
| CdbComponentDatabaseInfo * |
| cdbcomponent_getComponentInfo(int contentId) |
| { |
| CdbComponentDatabaseInfo *cdbInfo = NULL; |
| CdbComponentDatabases *cdbs; |
| |
| cdbs = cdbcomponent_getCdbComponents(); |
| |
| if (contentId < -1 || contentId >= cdbs->total_segments) |
| ereport(FATAL, |
| (errcode(ERRCODE_DATA_EXCEPTION), |
| errmsg("unexpected content id %d, should be [-1, %d]", |
| contentId, cdbs->total_segments - 1))); |
| /* entry db */ |
| if (contentId == -1) |
| { |
| Assert(cdbs->total_entry_dbs == 1 || cdbs->total_entry_dbs == 2); |
| /* |
| * For a standby QD, get the last entry db which can be the first (on |
| * a replica cluster) or the second (on a mirrored cluster) entry. |
| */ |
| if (IS_HOT_STANDBY_QD()) |
| cdbInfo = &cdbs->entry_db_info[cdbs->total_entry_dbs - 1]; |
| else |
| cdbInfo = &cdbs->entry_db_info[0]; |
| |
| return cdbInfo; |
| } |
| |
| /* no mirror, segment_db_info is sorted by content id */ |
| if (cdbs->total_segment_dbs == cdbs->total_segments) |
| { |
| cdbInfo = &cdbs->segment_db_info[contentId]; |
| return cdbInfo; |
| } |
| |
| /* with mirror, segment_db_info is sorted by content id */ |
| if (cdbs->total_segment_dbs != cdbs->total_segments) |
| { |
| Assert(cdbs->total_segment_dbs == cdbs->total_segments * 2); |
| cdbInfo = &cdbs->segment_db_info[2 * contentId]; |
| |
| /* use the other segment if it is not what the QD wants */ |
| if ((IS_HOT_STANDBY_QD() && SEGMENT_IS_ACTIVE_PRIMARY(cdbInfo)) |
| || (!IS_HOT_STANDBY_QD() && !SEGMENT_IS_ACTIVE_PRIMARY(cdbInfo))) |
| cdbInfo = &cdbs->segment_db_info[2 * contentId + 1]; |
| |
| return cdbInfo; |
| } |
| |
| return cdbInfo; |
| } |
| |
| static void |
| ensureInterconnectAddress(void) |
| { |
| /* |
| * If the address type is wildcard, there is no need to populate an unicast |
| * address in interconnect_address. |
| */ |
| if (Gp_interconnect_address_type == INTERCONNECT_ADDRESS_TYPE_WILDCARD) |
| { |
| interconnect_address = NULL; |
| return; |
| } |
| |
| Assert(Gp_interconnect_address_type == INTERCONNECT_ADDRESS_TYPE_UNICAST); |
| |
| /* If the unicast address has already been assigned, exit early. */ |
| if (interconnect_address) |
| return; |
| |
| /* |
| * Retrieve the segment's gp_segment_configuration.address value, in order |
| * to setup interconnect_address |
| */ |
| |
| if (GpIdentity.segindex >= 0) |
| { |
| Assert(Gp_role == GP_ROLE_EXECUTE); |
| Assert(MyProcPort->laddr.addr.ss_family == AF_INET |
| || MyProcPort->laddr.addr.ss_family == AF_INET6); |
| /* |
| * We assume that the QD, using the address in gp_segment_configuration |
| * as its destination IP address, connects to the segment/QE. |
| * So, the local address in the PORT can be used for interconnect. |
| */ |
| char local_addr[NI_MAXHOST]; |
| getnameinfo((const struct sockaddr *)&MyProcPort->laddr.addr, |
| MyProcPort->laddr.salen, |
| local_addr, sizeof(local_addr), |
| NULL, 0, NI_NUMERICHOST); |
| interconnect_address = MemoryContextStrdup(TopMemoryContext, local_addr); |
| } |
| else if (Gp_role == GP_ROLE_DISPATCH) |
| { |
| /* |
| * Here, we can only retrieve the ADDRESS in gp_segment_configuration |
| * from `cdbcomponent*`. We couldn't get it in a way as the QEs. |
| */ |
| CdbComponentDatabaseInfo *qdInfo; |
| qdInfo = cdbcomponent_getComponentInfo(MASTER_CONTENT_ID); |
| interconnect_address = MemoryContextStrdup(TopMemoryContext, qdInfo->config->hostip); |
| } |
| else if (qdHostname && qdHostname[0] != '\0') |
| { |
| Assert(Gp_role == GP_ROLE_EXECUTE); |
| /* |
| * QE on the master can't get its interconnect address like that on the primary. |
| * The QD connects to its postmaster via the unix domain socket. |
| */ |
| interconnect_address = qdHostname; |
| } |
| Assert(interconnect_address && strlen(interconnect_address) > 0); |
| } |
| /* |
| * performs all necessary setup required for Apache Cloudberry mode. |
| * |
| * This includes cdblink_setup() and initializing the Motion Layer. |
| */ |
| void |
| cdb_setup(void) |
| { |
| elog(DEBUG1, "Initializing Cloudberry components..."); |
| |
| if (Gp_role != GP_ROLE_UTILITY && !IS_SINGLENODE()) |
| { |
| if (!CurrentMotionIPCLayer) { |
| ereport(ERROR, |
| (errmsg("Interconnect moudle have not been preloaded"), |
| errdetail("Please make sure interconnect is included in option shared_preload_libraries"))); |
| } |
| |
| ensureInterconnectAddress(); |
| /* Initialize the Motion Layer IPC subsystem. */ |
| CurrentMotionIPCLayer->InitMotionLayerIPC(); |
| } |
| |
| /* |
| * Backend process requires consistent state, it cannot proceed until |
| * dtx recovery process finish up the recovery of distributed transactions. |
| * |
| * Ignore background worker because bgworker_should_start_mpp() already did |
| * the check. |
| * |
| * Ignore if we are the standby coordinator started in hot standby mode. |
| * We don't expect dtx recovery to have finished, as dtx recovery is |
| * performed at the end of startup. In hot standby, we are recovering |
| * continuously and should allow queries much earlier. Since a hot standby |
| * won't proceed dtx, it is not required to wait for recovery of the dtx |
| * that has been prepared but not committed (i.e. to commit them); on the |
| * other hand, the recovery of any in-doubt transactions (i.e. not prepared) |
| * won't bother a hot standby either, just like they can be recovered in the |
| * background when a primary instance is running. |
| */ |
| if (!IsBackgroundWorker && |
| Gp_role == GP_ROLE_DISPATCH && |
| !*shmDtmStarted && |
| !IS_HOT_STANDBY_QD()) |
| { |
| ereport(FATAL, |
| (errcode(ERRCODE_CANNOT_CONNECT_NOW), |
| errmsg(POSTMASTER_IN_RECOVERY_MSG), |
| errdetail("waiting for distributed transaction recovery to complete"))); |
| } |
| } |
| |
| /* |
| * performs all necessary cleanup required when leaving Cloudberry |
| * Database mode. This is also called when the process exits. |
| * |
| * NOTE: the arguments to this function are here only so that we can |
| * register it with on_proc_exit(). These parameters should not |
| * be used since there are some callers to this that pass them |
| * as NULL. |
| * |
| */ |
| void |
| cdb_cleanup(int code pg_attribute_unused(), Datum arg |
| pg_attribute_unused()) |
| { |
| elog(DEBUG1, "Cleaning up Cloudberry components..."); |
| |
| DisconnectAndDestroyAllGangs(true); |
| |
| if (Gp_role == GP_ROLE_DISPATCH) |
| { |
| if (cdb_total_plans > 0) |
| { |
| elog(DEBUG1, "session dispatched %d plans %d slices (%f), largest plan %d", |
| cdb_total_plans, cdb_total_slices, |
| ((double) cdb_total_slices / (double) cdb_total_plans), |
| cdb_max_slices); |
| } |
| } |
| |
| if (Gp_role != GP_ROLE_UTILITY && !IS_SINGLENODE()) |
| { |
| /* shutdown our listener socket */ |
| CurrentMotionIPCLayer->CleanUpMotionLayerIPC(); |
| } |
| } |
| |
| /* |
| * CdbComponentDatabaseInfoCompare: |
| * A compare function for CdbComponentDatabaseInfo structs |
| * that compares based on , isprimary desc |
| * for use with qsort. |
| */ |
| static int |
| CdbComponentDatabaseInfoCompare(const void *p1, const void *p2) |
| { |
| const CdbComponentDatabaseInfo *obj1 = (CdbComponentDatabaseInfo *) p1; |
| const CdbComponentDatabaseInfo *obj2 = (CdbComponentDatabaseInfo *) p2; |
| |
| int cmp = obj1->config->segindex - obj2->config->segindex; |
| |
| if (cmp == 0) |
| { |
| int obj2cmp = 0; |
| int obj1cmp = 0; |
| |
| if (SEGMENT_IS_ACTIVE_PRIMARY(obj2)) |
| obj2cmp = 1; |
| |
| if (SEGMENT_IS_ACTIVE_PRIMARY(obj1)) |
| obj1cmp = 1; |
| |
| cmp = obj2cmp - obj1cmp; |
| } |
| |
| return cmp; |
| } |
| |
| /* |
| * Maintain a cache of names. |
| * |
| * The keys are all NAMEDATALEN long. |
| */ |
| static char * |
| getDnsCachedAddress(char *name, int port, int elevel, bool use_cache) |
| { |
| SegIpEntry *e = NULL; |
| char hostinfo[NI_MAXHOST]; |
| |
| if (use_cache) |
| { |
| if (segment_ip_cache_htab == NULL) |
| { |
| HASHCTL hash_ctl; |
| |
| MemSet(&hash_ctl, 0, sizeof(hash_ctl)); |
| |
| hash_ctl.keysize = NAMEDATALEN + 1; |
| hash_ctl.entrysize = sizeof(SegIpEntry); |
| |
| segment_ip_cache_htab = hash_create("segment_dns_cache", |
| 256, &hash_ctl, HASH_ELEM | HASH_STRINGS); |
| } |
| else |
| { |
| e = (SegIpEntry *) hash_search(segment_ip_cache_htab, |
| name, HASH_FIND, NULL); |
| if (e != NULL) |
| return e->hostinfo; |
| } |
| } |
| |
| /* |
| * The name is either not in our cache, or we've been instructed to not |
| * use the cache. Perform the name lookup. |
| */ |
| if (!use_cache || (use_cache && e == NULL)) |
| { |
| MemoryContext oldContext = NULL; |
| int ret; |
| char portNumberStr[32]; |
| char *service; |
| struct addrinfo *addrs = NULL, |
| *addr; |
| struct addrinfo hint; |
| |
| /* Initialize hint structure */ |
| MemSet(&hint, 0, sizeof(hint)); |
| hint.ai_socktype = SOCK_STREAM; |
| hint.ai_family = AF_UNSPEC; |
| |
| snprintf(portNumberStr, sizeof(portNumberStr), "%d", port); |
| service = portNumberStr; |
| |
| ret = pg_getaddrinfo_all(name, service, &hint, &addrs); |
| if (ret || !addrs) |
| { |
| if (addrs) |
| pg_freeaddrinfo_all(hint.ai_family, addrs); |
| |
| /* |
| * If a host name is unknown, whether it is an error depends on its role: |
| * - if it is a primary then it's an error; |
| * - if it is a mirror then it's just a warning; |
| * but we do not know the role information here, so always treat it as a |
| * warning, the callers should check the role and decide what to do. |
| */ |
| if (ret != EAI_FAIL && elevel == ERROR) |
| elevel = WARNING; |
| |
| ereport(elevel, |
| (errmsg("could not translate host name \"%s\", port \"%d\" to address: %s", |
| name, port, gai_strerror(ret)))); |
| |
| return NULL; |
| } |
| |
| /* save in the cache context */ |
| if (use_cache) |
| oldContext = MemoryContextSwitchTo(TopMemoryContext); |
| |
| hostinfo[0] = '\0'; |
| for (addr = addrs; addr; addr = addr->ai_next) |
| { |
| #ifdef HAVE_UNIX_SOCKETS |
| /* Ignore AF_UNIX sockets, if any are returned. */ |
| if (addr->ai_family == AF_UNIX) |
| continue; |
| #endif |
| if (addr->ai_family == AF_INET) /* IPv4 address */ |
| { |
| memset(hostinfo, 0, sizeof(hostinfo)); |
| pg_getnameinfo_all((struct sockaddr_storage *) addr->ai_addr, addr->ai_addrlen, |
| hostinfo, sizeof(hostinfo), |
| NULL, 0, |
| NI_NUMERICHOST); |
| |
| if (use_cache) |
| { |
| /* Insert into our cache htab */ |
| e = (SegIpEntry *) hash_search(segment_ip_cache_htab, |
| name, HASH_ENTER, NULL); |
| memcpy(e->hostinfo, hostinfo, sizeof(hostinfo)); |
| } |
| |
| break; |
| } |
| } |
| |
| #ifdef HAVE_IPV6 |
| |
| /* |
| * IPv6 probably would work fine, we'd just need to make sure all the |
| * data structures are big enough for the IPv6 address. And on some |
| * broken systems, you can get an IPv6 address, but not be able to |
| * bind to it because IPv6 is disabled or missing in the kernel, so |
| * we'd only want to use the IPv6 address if there isn't an IPv4 |
| * address. All we really need to do is test this. |
| */ |
| if (((!use_cache && !hostinfo[0]) || (use_cache && e == NULL)) |
| && addrs->ai_family == AF_INET6) |
| { |
| addr = addrs; |
| /* Get a text representation of the IP address */ |
| pg_getnameinfo_all((struct sockaddr_storage *) addr->ai_addr, addr->ai_addrlen, |
| hostinfo, sizeof(hostinfo), |
| NULL, 0, |
| NI_NUMERICHOST); |
| |
| if (use_cache) |
| { |
| /* Insert into our cache htab */ |
| e = (SegIpEntry *) hash_search(segment_ip_cache_htab, |
| name, HASH_ENTER, NULL); |
| memcpy(e->hostinfo, hostinfo, sizeof(hostinfo)); |
| } |
| } |
| #endif |
| |
| if (use_cache) |
| MemoryContextSwitchTo(oldContext); |
| |
| pg_freeaddrinfo_all(hint.ai_family, addrs); |
| } |
| |
| /* return a pointer to our cache. */ |
| if (use_cache) |
| return e->hostinfo; |
| |
| return pstrdup(hostinfo); |
| } |
| |
| /* |
| * getDnsAddress |
| * |
| * same as getDnsCachedAddress, but without using the cache. A non-cached |
| * version was used inline inside of cdbgang.c, and since it is needed now |
| * elsewhere, it is available externally. |
| */ |
| char * |
| getDnsAddress(char *hostname, int port, int elevel) |
| { |
| return getDnsCachedAddress(hostname, port, elevel, false); |
| } |
| |
| |
| /* |
| * Given a component-db in the system, find the addresses at which it |
| * can be reached, appropriately populate the argument-structure, and |
| * maintain the ip-lookup-cache. |
| */ |
| static void |
| getAddressesForDBid(GpSegConfigEntry *c, int elevel) |
| { |
| char *name; |
| |
| Assert(c != NULL); |
| |
| /* Use hostname */ |
| memset(c->hostaddrs, 0, COMPONENT_DBS_MAX_ADDRS * sizeof(char *)); |
| |
| #ifdef FAULT_INJECTOR |
| if (am_ftsprobe && |
| SIMPLE_FAULT_INJECTOR("get_dns_cached_address") == FaultInjectorTypeSkip) |
| { |
| /* inject a dns error for primary of segment 0 */ |
| if (c->segindex == 0 && |
| c->preferred_role == GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY) |
| { |
| c->address = pstrdup("dnserrordummyaddress"); |
| c->hostname = pstrdup("dnserrordummyaddress"); |
| } |
| } |
| #endif |
| |
| /* |
| * add an entry, using the first the "address" and then the "hostname" as |
| * fallback. |
| */ |
| name = getDnsCachedAddress(c->address, c->port, elevel, true); |
| |
| if (name) |
| { |
| c->hostaddrs[0] = pstrdup(name); |
| return; |
| } |
| |
| /* now the hostname. */ |
| name = getDnsCachedAddress(c->hostname, c->port, elevel, true); |
| if (name) |
| { |
| c->hostaddrs[0] = pstrdup(name); |
| } |
| else |
| { |
| c->hostaddrs[0] = NULL; |
| } |
| |
| return; |
| } |
| |
| /* |
| * hostPrimaryCountHashTableInit() |
| * Construct a hash table of HostPrimaryCountEntry |
| */ |
| static HTAB * |
| hostPrimaryCountHashTableInit(void) |
| { |
| HASHCTL info; |
| |
| /* Set key and entry sizes. */ |
| MemSet(&info, 0, sizeof(info)); |
| info.keysize = MAXHOSTNAMELEN; |
| info.entrysize = sizeof(HostPrimaryCountEntry); |
| |
| return hash_create("HostSegs", 32, &info, HASH_ELEM | HASH_STRINGS); |
| } |
| |
| /* |
| * Given total number of primary segment databases and a number of |
| * segments to "skip" - this routine creates a boolean map (array) the |
| * size of total number of segments and randomly selects several |
| * entries (total number of total_to_skip) to be marked as |
| * "skipped". This is used for external tables with the 'gpfdist' |
| * protocol where we want to get a number of *random* segdbs to |
| * connect to a gpfdist client. |
| * |
| * Caller of this function should pfree skip_map when done with it. |
| */ |
| bool * |
| makeRandomSegMap(int total_primaries, int total_to_skip) |
| { |
| int randint; /* some random int representing a seg */ |
| int skipped = 0; /* num segs already marked to be skipped */ |
| bool *skip_map; |
| |
| skip_map = (bool *) palloc(total_primaries * sizeof(bool)); |
| MemSet(skip_map, false, total_primaries * sizeof(bool)); |
| |
| while (total_to_skip != skipped) |
| { |
| /* |
| * create a random int between 0 and (total_primaries - 1). |
| */ |
| randint = cdbhashrandomseg(total_primaries); |
| |
| /* |
| * mark this random index 'true' in the skip map (marked to be |
| * skipped) unless it was already marked. |
| */ |
| if (skip_map[randint] == false) |
| { |
| skip_map[randint] = true; |
| skipped++; |
| } |
| } |
| |
| return skip_map; |
| } |
| |
| /* |
| * Determine the dbid for the master standby |
| */ |
| int16 |
| master_standby_dbid(void) |
| { |
| int16 dbid = 0; |
| HeapTuple tup; |
| Relation rel; |
| ScanKeyData scankey[2]; |
| SysScanDesc scan; |
| |
| /* |
| * Can only run on a master node, this restriction is due to the reliance |
| * on the gp_segment_configuration table. |
| */ |
| if (!IS_QUERY_DISPATCHER()) |
| elog(ERROR, "master_standby_dbid() executed on execution segment"); |
| |
| /* |
| * SELECT * FROM gp_segment_configuration WHERE content = -1 AND role = |
| * GP_SEGMENT_CONFIGURATION_ROLE_MIRROR |
| */ |
| rel = table_open(GpSegmentConfigRelationId, AccessShareLock); |
| ScanKeyInit(&scankey[0], |
| Anum_gp_segment_configuration_content, |
| BTEqualStrategyNumber, F_INT2EQ, |
| Int16GetDatum(-1)); |
| ScanKeyInit(&scankey[1], |
| Anum_gp_segment_configuration_role, |
| BTEqualStrategyNumber, F_CHAREQ, |
| CharGetDatum(GP_SEGMENT_CONFIGURATION_ROLE_MIRROR)); |
| /* no index */ |
| scan = systable_beginscan(rel, InvalidOid, false, |
| NULL, 2, scankey); |
| |
| tup = systable_getnext(scan); |
| |
| if (HeapTupleIsValid(tup)) |
| { |
| dbid = ((Form_gp_segment_configuration) GETSTRUCT(tup))->dbid; |
| /* We expect a single result, assert this */ |
| Assert(systable_getnext(scan) == NULL); |
| } |
| |
| systable_endscan(scan); |
| /* no need to hold the lock, it's a catalog */ |
| table_close(rel, AccessShareLock); |
| |
| return dbid; |
| } |
| |
| GpSegConfigEntry * |
| dbid_get_dbinfo(int16 dbid) |
| { |
| HeapTuple tuple; |
| Relation rel; |
| ScanKeyData scankey; |
| SysScanDesc scan; |
| GpSegConfigEntry *i = NULL; |
| |
| /* |
| * Can only run on a master node, this restriction is due to the reliance |
| * on the gp_segment_configuration table. This may be able to be relaxed |
| * by switching to a different method of checking. |
| */ |
| if (!IS_QUERY_DISPATCHER()) |
| elog(ERROR, "dbid_get_dbinfo() executed on execution segment"); |
| |
| rel = heap_open(GpSegmentConfigRelationId, AccessShareLock); |
| |
| /* SELECT * FROM gp_segment_configuration WHERE dbid = :1 */ |
| ScanKeyInit(&scankey, |
| Anum_gp_segment_configuration_dbid, |
| BTEqualStrategyNumber, F_INT2EQ, |
| Int16GetDatum(dbid)); |
| scan = systable_beginscan(rel, GpSegmentConfigDbidWarehouseIndexId, true, |
| NULL, 1, &scankey); |
| |
| while (HeapTupleIsValid(tuple = systable_getnext(scan))) |
| { |
| Datum attr; |
| bool isNull; |
| Oid warehouseid = InvalidOid; |
| |
| attr = heap_getattr(tuple, Anum_gp_segment_configuration_warehouseid, |
| RelationGetDescr(rel), &isNull); |
| Assert(!isNull); |
| warehouseid = DatumGetObjectId(attr); |
| |
| if (!OidIsValid(warehouseid) || warehouseid == GetCurrentWarehouseId()) |
| { |
| i = palloc(sizeof(GpSegConfigEntry)); |
| |
| /* |
| * dbid |
| */ |
| attr = heap_getattr(tuple, Anum_gp_segment_configuration_dbid, |
| RelationGetDescr(rel), &isNull); |
| Assert(!isNull); |
| i->dbid = DatumGetInt16(attr); |
| |
| /* |
| * content |
| */ |
| attr = heap_getattr(tuple, Anum_gp_segment_configuration_content, |
| RelationGetDescr(rel), &isNull); |
| Assert(!isNull); |
| i->segindex = DatumGetInt16(attr); |
| |
| /* |
| * role |
| */ |
| attr = heap_getattr(tuple, Anum_gp_segment_configuration_role, |
| RelationGetDescr(rel), &isNull); |
| Assert(!isNull); |
| i->role = DatumGetChar(attr); |
| |
| /* |
| * preferred-role |
| */ |
| attr = heap_getattr(tuple, |
| Anum_gp_segment_configuration_preferred_role, |
| RelationGetDescr(rel), &isNull); |
| Assert(!isNull); |
| i->preferred_role = DatumGetChar(attr); |
| |
| /* |
| * mode |
| */ |
| attr = heap_getattr(tuple, Anum_gp_segment_configuration_mode, |
| RelationGetDescr(rel), &isNull); |
| Assert(!isNull); |
| i->mode = DatumGetChar(attr); |
| |
| /* |
| * status |
| */ |
| attr = heap_getattr(tuple, Anum_gp_segment_configuration_status, |
| RelationGetDescr(rel), &isNull); |
| Assert(!isNull); |
| i->status = DatumGetChar(attr); |
| |
| /* |
| * hostname |
| */ |
| attr = heap_getattr(tuple, Anum_gp_segment_configuration_hostname, |
| RelationGetDescr(rel), &isNull); |
| Assert(!isNull); |
| i->hostname = TextDatumGetCString(attr); |
| |
| /* |
| * address |
| */ |
| attr = heap_getattr(tuple, Anum_gp_segment_configuration_address, |
| RelationGetDescr(rel), &isNull); |
| Assert(!isNull); |
| i->address = TextDatumGetCString(attr); |
| |
| /* |
| * port |
| */ |
| attr = heap_getattr(tuple, Anum_gp_segment_configuration_port, |
| RelationGetDescr(rel), &isNull); |
| Assert(!isNull); |
| i->port = DatumGetInt32(attr); |
| |
| break; |
| } |
| } |
| if (i == NULL) |
| { |
| elog(ERROR, "could not find configuration entry for dbid %i", dbid); |
| } |
| |
| systable_endscan(scan); |
| heap_close(rel, NoLock); |
| |
| return i; |
| } |
| |
| /* |
| * Obtain the dbid of a of a segment at a given segment index (i.e., content id) |
| * currently fulfilling the role specified. This means that the segment is |
| * really performing the role of primary or mirror, irrespective of their |
| * preferred role. |
| */ |
| int16 |
| contentid_get_dbid(int16 contentid, char role, bool getPreferredRoleNotCurrentRole) |
| { |
| int16 dbid = 0; |
| Relation rel; |
| ScanKeyData scankey[3]; |
| int nkeys = 2; |
| SysScanDesc scan; |
| HeapTuple tup; |
| |
| /* |
| * Can only run on a master node, this restriction is due to the reliance |
| * on the gp_segment_configuration table. This may be able to be relaxed |
| * by switching to a different method of checking. |
| */ |
| if (!IS_QUERY_DISPATCHER()) |
| elog(ERROR, "contentid_get_dbid() executed on execution segment"); |
| |
| rel = heap_open(GpSegmentConfigRelationId, AccessShareLock); |
| |
| /* XXX XXX: CHECK THIS XXX jic 2011/12/09 */ |
| if (getPreferredRoleNotCurrentRole) |
| { |
| /* |
| * SELECT * FROM gp_segment_configuration WHERE content = :1 AND |
| * preferred_role = :2 |
| */ |
| ScanKeyInit(&scankey[0], |
| Anum_gp_segment_configuration_content, |
| BTEqualStrategyNumber, F_INT2EQ, |
| Int16GetDatum(contentid)); |
| ScanKeyInit(&scankey[1], |
| Anum_gp_segment_configuration_preferred_role, |
| BTEqualStrategyNumber, F_CHAREQ, |
| CharGetDatum(role)); |
| if (contentid != MASTER_CONTENT_ID) |
| { |
| nkeys++; |
| ScanKeyInit(&scankey[2], |
| Anum_gp_segment_configuration_warehouseid, |
| BTEqualStrategyNumber, F_OIDEQ, |
| ObjectIdGetDatum(GetCurrentWarehouseId())); |
| } |
| |
| scan = systable_beginscan(rel, GpSegmentConfigContentPreferred_roleWarehouseIndexId, true, |
| NULL, nkeys, scankey); |
| } |
| else |
| { |
| /* |
| * SELECT * FROM gp_segment_configuration WHERE content = :1 AND role |
| * = :2 |
| */ |
| ScanKeyInit(&scankey[0], |
| Anum_gp_segment_configuration_content, |
| BTEqualStrategyNumber, F_INT2EQ, |
| Int16GetDatum(contentid)); |
| ScanKeyInit(&scankey[1], |
| Anum_gp_segment_configuration_role, |
| BTEqualStrategyNumber, F_CHAREQ, |
| CharGetDatum(role)); |
| if (contentid != MASTER_CONTENT_ID) |
| { |
| nkeys++; |
| ScanKeyInit(&scankey[2], |
| Anum_gp_segment_configuration_warehouseid, |
| BTEqualStrategyNumber, F_OIDEQ, |
| ObjectIdGetDatum(GetCurrentWarehouseId())); |
| } |
| /* no index */ |
| scan = systable_beginscan(rel, InvalidOid, false, |
| NULL, nkeys, scankey); |
| } |
| |
| tup = systable_getnext(scan); |
| if (HeapTupleIsValid(tup)) |
| { |
| dbid = ((Form_gp_segment_configuration) GETSTRUCT(tup))->dbid; |
| /* We expect a single result, assert this */ |
| Assert(systable_getnext(scan) == NULL); /* should be only 1 */ |
| } |
| |
| /* no need to hold the lock, it's a catalog */ |
| systable_endscan(scan); |
| heap_close(rel, AccessShareLock); |
| |
| return dbid; |
| } |
| |
| List * |
| cdbcomponent_getCdbComponentsList(void) |
| { |
| CdbComponentDatabases *cdbs; |
| List *segments = NIL; |
| int i; |
| |
| cdbs = cdbcomponent_getCdbComponents(); |
| |
| for (i = 0; i < cdbs->total_segments; i++) |
| { |
| segments = lappend_int(segments, i); |
| } |
| |
| return segments; |
| } |
| |
| |
| int16 |
| cdbcomponent_get_maxdbid(void) |
| { |
| Relation rel = table_open(GpSegmentConfigRelationId, AccessExclusiveLock); |
| int16 dbid = 0; |
| HeapTuple tuple; |
| SysScanDesc sscan; |
| |
| sscan = systable_beginscan(rel, InvalidOid, false, NULL, 0, NULL); |
| while ((tuple = systable_getnext(sscan)) != NULL) |
| { |
| dbid = Max(dbid, |
| ((Form_gp_segment_configuration) GETSTRUCT(tuple))->dbid); |
| } |
| systable_endscan(sscan); |
| table_close(rel, NoLock); |
| |
| return dbid; |
| } |
| |
| int16 |
| cdbcomponent_get_availableDbId(void) |
| { |
| /* |
| * Set up hash of used dbids. We use int32 here because int16 doesn't |
| * have a convenient hash and we can use casting below to check for |
| * overflow of int16 |
| */ |
| HASHCTL hash_ctl; |
| |
| memset(&hash_ctl, 0, sizeof(hash_ctl)); |
| hash_ctl.keysize = sizeof(int32); |
| hash_ctl.entrysize = sizeof(int32); |
| hash_ctl.hash = int32_hash; |
| HTAB *htab = hash_create("Temporary table of dbids", |
| 1024, |
| &hash_ctl, |
| HASH_ELEM | HASH_FUNCTION); |
| |
| /* scan GpSegmentConfigRelationId */ |
| Relation rel = heap_open(GpSegmentConfigRelationId, AccessExclusiveLock); |
| HeapTuple tuple; |
| SysScanDesc sscan; |
| |
| sscan = systable_beginscan(rel, InvalidOid, false, NULL, 0, NULL); |
| while ((tuple = systable_getnext(sscan)) != NULL) |
| { |
| int32 dbid = (int32) ((Form_gp_segment_configuration) GETSTRUCT(tuple))->dbid; |
| |
| (void) hash_search(htab, (void *) &dbid, HASH_ENTER, NULL); |
| } |
| systable_endscan(sscan); |
| |
| heap_close(rel, NoLock); |
| |
| /* search for available dbid */ |
| for (int32 dbid = 1;; dbid++) |
| { |
| if (dbid != (int16) dbid) |
| elog(ERROR, "unable to find available dbid"); |
| |
| if (hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL) |
| { |
| hash_destroy(htab); |
| return dbid; |
| } |
| } |
| } |
| |
| int16 |
| cdbcomponent_get_maxcontentid(void) |
| { |
| Relation rel = heap_open(GpSegmentConfigRelationId, AccessExclusiveLock); |
| int16 contentid = 0; |
| HeapTuple tuple; |
| SysScanDesc sscan; |
| |
| sscan = systable_beginscan(rel, InvalidOid, false, NULL, 0, NULL); |
| while ((tuple = systable_getnext(sscan)) != NULL) |
| { |
| contentid = Max(contentid, |
| ((Form_gp_segment_configuration) GETSTRUCT(tuple))->content); |
| } |
| systable_endscan(sscan); |
| heap_close(rel, NoLock); |
| |
| return contentid; |
| } |
| |
| /* |
| * return the number of total segments for current snapshot of |
| * segments info |
| */ |
| int |
| getgpsegmentCount(void) |
| { |
| /* 1 represents a singleton postgresql in utility mode */ |
| int32 numsegments = 1; |
| |
| if (Gp_role == GP_ROLE_DISPATCH) |
| numsegments = cdbcomponent_getCdbComponents()->total_segments; |
| else if (Gp_role == GP_ROLE_EXECUTE) |
| numsegments = numsegmentsFromQD; |
| /* |
| * If we are in 'Utility & Binary Upgrade' mode, it must be launched |
| * by the pg_upgrade, so we give it an correct numsegments to make |
| * sure the pg_upgrade can run normally. |
| * Only Utility QD process have the entire information in the |
| * gp_segment_configuration, so we count the segments count in this |
| * process. |
| */ |
| else if (Gp_role == GP_ROLE_UTILITY && |
| IsBinaryUpgrade && |
| IS_QUERY_DISPATCHER()) |
| { |
| /* |
| * While upgrading in single node mode, we will get zero total |
| * segment count, which will cause assertion error in makeGpPolicy. |
| * |
| * Segment number should be set to a value not less than 1 anyway. |
| */ |
| numsegments = Max(cdbcomponent_getCdbComponents()->total_segments, 1); |
| } |
| |
| return numsegments; |
| } |
| |
| /* |
| * IsOnConflictUpdate |
| * Return true if a plannedstmt is an upsert: insert ... on conflict do update |
| */ |
| bool |
| IsOnConflictUpdate(PlannedStmt *ps) |
| { |
| Plan *plan; |
| |
| if (ps == NULL || ps->commandType != CMD_INSERT) |
| return false; |
| |
| plan = ps->planTree; |
| |
| if (plan && IsA(plan, Motion)) |
| plan = outerPlan(plan); |
| |
| if (plan == NULL || !IsA(plan, ModifyTable)) |
| return false; |
| |
| return ((ModifyTable *)plan)->onConflictAction == ONCONFLICT_UPDATE; |
| } |
| |
| /* |
| * Avoid core file generation for this PANIC. It helps to avoid |
| * filling up disks during tests and also saves time. |
| */ |
| void |
| AvoidCorefileGeneration() |
| { |
| #if defined(HAVE_GETRLIMIT) && defined(RLIMIT_CORE) |
| struct rlimit lim; |
| getrlimit(RLIMIT_CORE, &lim); |
| lim.rlim_cur = 0; |
| if (setrlimit(RLIMIT_CORE, &lim) != 0) |
| { |
| int save_errno = errno; |
| |
| elog(NOTICE, |
| "setrlimit failed for RLIMIT_CORE soft limit to zero. errno: %d (%m).", |
| save_errno); |
| } |
| #endif |
| } |
| |
| PG_FUNCTION_INFO_V1(gp_get_suboverflowed_backends); |
| /* |
| * Find the backends where subtransaction overflowed. |
| */ |
| Datum |
| gp_get_suboverflowed_backends(PG_FUNCTION_ARGS) |
| { |
| int i; |
| ArrayBuildState *astate = NULL; |
| |
| LWLockAcquire(ProcArrayLock, LW_SHARED); |
| for (i = 0; i < ProcGlobal->allProcCount; i++) |
| { |
| if (ProcGlobal->subxidStates[i].overflowed) |
| astate = accumArrayResult(astate, |
| Int32GetDatum(ProcGlobal->allProcs[i].pid), |
| false, INT4OID, CurrentMemoryContext); |
| } |
| LWLockRelease(ProcArrayLock); |
| |
| if (astate) |
| PG_RETURN_DATUM(makeArrayResult(astate, |
| CurrentMemoryContext)); |
| else |
| PG_RETURN_NULL(); |
| } |
| |
| #else |
| bool am_ftshandler = false; |
| |
| |
| typedef struct CdbConfigsCache { |
| int counts; |
| int buff_size; |
| |
| char buff[SEGMENT_CONFIGURATION_ONE_LINE_LENGTH * SEGMENT_CONFIGURATION_CACHE_COUNTS]; |
| } CdbConfigsCache; |
| |
| static CdbConfigsCache * cdb_config_cache = NULL; |
| |
| Size |
| ShmemSegmentConfigsCacheSize(void) |
| { |
| return sizeof(CdbConfigsCache); |
| } |
| |
| void |
| ShmemSegmentConfigsCacheAllocation(void) |
| { |
| Size size = ShmemSegmentConfigsCacheSize(); |
| |
| cdb_config_cache = (CdbConfigsCache *) ShmemAlloc(size); |
| /* Mark all slots as empty */ |
| MemSet(cdb_config_cache, 0, size); |
| } |
| |
| void |
| ShmemSegmentConfigsCacheReset(void) |
| { |
| LWLockAcquire(CdbConfigCacheLock, LW_EXCLUSIVE); |
| if (!isSegmentConfigsCacheEnable()) |
| { |
| LWLockRelease(CdbConfigCacheLock); |
| return; |
| } |
| |
| MemSet(cdb_config_cache, 0, ShmemSegmentConfigsCacheSize()); |
| LWLockRelease(CdbConfigCacheLock); |
| } |
| |
| void |
| ShmemSegmentConfigsCacheForceFlush(void) |
| { |
| int total_dbs = 0; |
| |
| if (!isSegmentConfigsCacheEnable()) |
| return; |
| |
| ShmemSegmentConfigsCacheReset(); |
| GpSegConfigEntry * configs = readGpSegConfigFromETCDAllowNull(&total_dbs); |
| elog(DEBUG1, "Segment configs cache have been force flushed, total dbs=%d", total_dbs); |
| cleanGpSegConfigs(configs, total_dbs); |
| } |
| |
| static int |
| dumpSegConfigInfo(GpSegConfigEntry * config, char * buf) |
| { |
| return sprintf(buf, "%d %d %c %c %c %c %d %s %s %s\n", config->dbid, |
| config->segindex, config->role, config->preferred_role, |
| config->mode, config->status, config->port, |
| config->hostname, config->address, config->datadir); |
| } |
| |
| bool |
| isSegmentConfigsCacheEnable(void) |
| { |
| return cdb_config_cache && gp_etcd_enable_cache; |
| } |
| |
| bool |
| isSegmentConfigsCached(void) |
| { |
| return cdb_config_cache && \ |
| gp_etcd_enable_cache && \ |
| cdb_config_cache->counts != 0 && \ |
| cdb_config_cache->buff_size != 0; |
| } |
| |
| char * |
| readSegmentConfigsCache(int *total_dbs) |
| { |
| char *buff = NULL; |
| |
| LWLockAcquire(CdbConfigCacheLock, LW_EXCLUSIVE); |
| if (isSegmentConfigsCached()) |
| { |
| /* Can't use pstrdup here */ |
| buff = palloc(cdb_config_cache->buff_size + 1); |
| memcpy(buff, cdb_config_cache->buff, cdb_config_cache->buff_size); |
| *total_dbs = cdb_config_cache->counts; |
| } |
| else |
| *total_dbs = 0; |
| |
| LWLockRelease(CdbConfigCacheLock); |
| return buff; |
| } |
| |
| void |
| writeSegmentConfigsCacheBuff(char * config_buff, int buff_length, int total_dbs) |
| { |
| LWLockAcquire(CdbConfigCacheLock, LW_EXCLUSIVE); |
| if (!isSegmentConfigsCacheEnable()) |
| { |
| LWLockRelease(CdbConfigCacheLock); |
| return; |
| } |
| |
| /* FTS_TODO: support for cache expansion */ |
| if (buff_length >= SEGMENT_CONFIGURATION_ONE_LINE_LENGTH * SEGMENT_CONFIGURATION_CACHE_COUNTS) |
| { |
| gp_etcd_enable_cache = false; |
| LWLockRelease(CdbConfigCacheLock); |
| return; |
| } |
| |
| memcpy(cdb_config_cache->buff, config_buff, buff_length); |
| cdb_config_cache->buff_size = buff_length; |
| cdb_config_cache->counts = total_dbs; |
| LWLockRelease(CdbConfigCacheLock); |
| } |
| |
| /* |
| * Helper Functions |
| */ |
| static CdbComponentDatabases *getCdbComponentInfo(void); |
| static void cleanupComponentIdleQEs(CdbComponentDatabaseInfo *cdi, bool includeWriter); |
| |
| static int CdbComponentDatabaseInfoCompare(const void *p1, const void *p2); |
| |
| static void getAddressesForDBid(GpSegConfigEntry *c, int elevel); |
| static HTAB *hostSegsHashTableInit(void); |
| |
| static int nextQEIdentifer(CdbComponentDatabases *cdbs); |
| |
| static HTAB *segment_ip_cache_htab = NULL; |
| |
| int numsegmentsFromQD = -1; |
| |
| typedef struct SegIpEntry |
| { |
| char key[NAMEDATALEN]; |
| char hostinfo[NI_MAXHOST]; |
| } SegIpEntry; |
| |
| typedef struct HostSegsEntry |
| { |
| char hostip[INET6_ADDRSTRLEN]; |
| int segmentCount; |
| } HostSegsEntry; |
| |
| /* Fixed GUC */ |
| bool gp_etcd_enable_cache = true; |
| static etcdlib_t * static_etcdlib = NULL; |
| char *fts_dump_file_key = NULL; |
| char *fts_standby_promote_ready_key = NULL; |
| char *gp_etcd_account_id = NULL; |
| char *gp_etcd_cluster_id = NULL; |
| char *gp_etcd_namespace = NULL; |
| char *gp_etcd_endpoints = NULL; |
| |
| static etcdlib_endpoint_t etcd_endpoints[GP_ETCD_ENDPOINTS_NUM] = {0}; |
| static int etcd_endpoints_num = 0; |
| |
| static bool |
| etcdlib_init_config(etcdlib_endpoint_t *petcd_endpoints) |
| { |
| bool ret = false; |
| fts_dump_file_key = (char *)palloc0(sizeof(char)*GP_ETCD_KEY_LEN); |
| ret = generateGPSegConfigKey(fts_dump_file_key, gp_etcd_namespace, gp_etcd_account_id, gp_etcd_cluster_id, |
| gp_etcd_endpoints, petcd_endpoints, &etcd_endpoints_num); |
| if (!ret) |
| { |
| elog(ERROR, "failed to init ETCD config for fts dump key: %s", fts_dump_file_key); |
| return false; |
| } |
| |
| fts_standby_promote_ready_key = (char *)palloc0(sizeof(char)*GP_ETCD_KEY_LEN); |
| generateGPFtsPromoteReadyKey(fts_standby_promote_ready_key, gp_etcd_namespace, gp_etcd_account_id, gp_etcd_cluster_id); |
| if (fts_standby_promote_ready_key[0] == '\0') |
| { |
| elog(ERROR, "failed to init ETCD config for fts standby promote key: %s", fts_standby_promote_ready_key); |
| return false; |
| } |
| elog(LOG, "etcdlib_init_config successfully initialized with fts_dump_file_key: %s, fts_standby_promote_ready_key:%s", |
| fts_dump_file_key, fts_standby_promote_ready_key); |
| return true; |
| } |
| |
| static etcdlib_t * |
| get_etcdlib(void) |
| { |
| /* No need lock here |
| * Cause etcdlib will be inited after postmaster start |
| */ |
| if (unlikely(static_etcdlib == NULL)) |
| { |
| bool isEtcdKeyConfigured = false; |
| MemoryContext oldContext; |
| oldContext = MemoryContextSwitchTo(TopMemoryContext); |
| etcdlib_endpoint_t *petcd_endpoints = etcd_endpoints; |
| isEtcdKeyConfigured = etcdlib_init_config(petcd_endpoints); |
| Assert(isEtcdKeyConfigured != false); |
| static_etcdlib = etcdlib_create(petcd_endpoints, etcd_endpoints_num, 0); |
| if (!static_etcdlib) |
| ereport(ERROR, errmsg("Cannot connect to ETCD. [gp_etcd_host=%s, gp_etcd_port=%d]", static_etcdlib->host, static_etcdlib->port)); |
| MemoryContextSwitchTo(oldContext); |
| } |
| |
| return static_etcdlib; |
| } |
| |
| bool |
| setStandbyPromoteReady(bool standby_promote_ready) |
| { |
| etcdlib_t *etcdlib = NULL; |
| int rc = 0; |
| |
| etcdlib = get_etcdlib(); |
| |
| rc = etcdlib_set(etcdlib, fts_standby_promote_ready_key, |
| standby_promote_ready ? FTS_STANDBY_PROMOTE_READY : FTS_STANDBY_PROMOTE_NO_READY |
| , 0, false); |
| if (rc != 0) |
| { |
| elog(ERROR, |
| "fail to write standby_promote_ready info into ETCD. rc=%d", rc); |
| return false; |
| } |
| return true; |
| } |
| |
| bool |
| getStandbyPromoteReady(bool *standby_promote_ready) |
| { |
| etcdlib_t *etcdlib = NULL; |
| int rc = 0; |
| char *buff = NULL; |
| |
| etcdlib = get_etcdlib(); |
| |
| rc = etcdlib_get(etcdlib, fts_standby_promote_ready_key, &buff, NULL); |
| if (rc != 0 || !buff) |
| { |
| if (buff) |
| pfree(buff); |
| elog(ERROR, |
| "fail to read standby_promote_ready info into ETCD. rc=%d", rc); |
| return false; |
| } |
| |
| rc = strncmp(buff, FTS_STANDBY_PROMOTE_READY, FTS_STANDBY_PROMOTE_LEN); |
| |
| if (strlen(buff) != FTS_STANDBY_PROMOTE_LEN || |
| (rc != 0 |
| && strncmp(buff, FTS_STANDBY_PROMOTE_NO_READY, FTS_STANDBY_PROMOTE_LEN) != 0)) |
| { |
| pfree(buff); |
| elog(ERROR, "invalid standby_promote_ready in ETCD. value=%s", buff); |
| return false; |
| } |
| |
| *standby_promote_ready = rc == 0; |
| |
| pfree(buff); |
| return true; |
| } |
| |
| GpSegConfigEntry * |
| readGpSegConfigFromETCDAllowNull(int *total_dbs) |
| { |
| return readGpSegConfigFromETCD(total_dbs, true); |
| } |
| |
| GpSegConfigEntry * |
| readGpSegConfig(char * buff, int *total_dbs) |
| { |
| int idx = 0; |
| int array_size = SEGMENT_CONFIGURATION_ALLOC_COUNTS; |
| GpSegConfigEntry *configs = NULL; |
| GpSegConfigEntry *config = NULL; |
| int rc; |
| |
| char hostname[MAXHOSTNAMELEN]; |
| char address[MAXHOSTNAMELEN]; |
| char datadir[4096]; |
| |
| if (!buff) |
| ereport(ERROR, errmsg("invalid buffer which is NULL")); |
| |
| configs = palloc0(sizeof (GpSegConfigEntry) * array_size); |
| |
| char * ptr = strtok(buff, "\n"); |
| while (ptr) |
| { |
| |
| config = &configs[idx]; |
| |
| rc = sscanf(ptr, "%d %d %c %c %c %c %d %s %s %s", (int *)&config->dbid, (int *)&config->segindex, |
| &config->role, &config->preferred_role, &config->mode, &config->status, |
| &config->port, hostname, address, datadir); |
| |
| if (rc != GPSEGCONFIGNUMATTR) |
| { |
| cleanGpSegConfigs(configs, idx); |
| ereport(ERROR, |
| errmsg("cannot decode config info from ETCD. sscanf rc=%d", rc)); |
| } |
| |
| config->hostname = pstrdup(hostname); |
| config->address = pstrdup(address); |
| config->datadir = pstrdup(datadir); |
| |
| idx++; |
| |
| if (idx >= array_size) |
| { |
| array_size = array_size * 2; |
| configs = (GpSegConfigEntry *) |
| repalloc(configs, sizeof(GpSegConfigEntry) * array_size); |
| size_t expand_size = (sizeof(GpSegConfigEntry) * array_size / 2); |
| memset(configs + expand_size, 0, expand_size); |
| } |
| |
| ptr = strtok(NULL, "\n"); |
| } |
| |
| if (total_dbs) |
| *total_dbs = idx; |
| |
| return configs; |
| } |
| |
| GpSegConfigEntry * |
| readGpSegConfigFromETCD(int *total_dbs, bool allow_null) |
| { |
| char *buff = NULL; |
| char *buff_cpy = NULL; |
| int rc = 0; |
| GpSegConfigEntry *configs = NULL; |
| etcdlib_t *etcdlib = NULL; |
| bool cache_hit = false; |
| int total_dbs_parsed = 0; |
| |
| buff = readSegmentConfigsCache(total_dbs); |
| if (!buff) |
| { |
| etcdlib = get_etcdlib(); |
| rc = etcdlib_get(etcdlib, fts_dump_file_key, &buff, NULL); |
| if (rc != 0) |
| { |
| if (buff == NULL && allow_null) |
| { |
| *total_dbs = 0; |
| return configs; |
| } |
| else |
| ereport(ERROR, |
| errmsg("fail to get fts info from ETCD. rc=%d", rc)); |
| } |
| } |
| else |
| cache_hit = true; |
| |
| if (cache_hit) |
| { |
| configs = readGpSegConfig(buff, &total_dbs_parsed); |
| if (total_dbs_parsed != *total_dbs) |
| { |
| cleanGpSegConfigs(configs, total_dbs_parsed); |
| pfree(buff); |
| ereport(ERROR, |
| errmsg("segment configuration cache got some error. total_dbs_parsed=%d, cached_total_dbs=%d", total_dbs_parsed, *total_dbs)); |
| } |
| } |
| else |
| { |
| buff_cpy = pstrdup(buff); |
| configs = readGpSegConfig(buff_cpy, total_dbs); |
| writeSegmentConfigsCacheBuff(buff, strlen(buff) + 1, *total_dbs); |
| pfree(buff_cpy); |
| } |
| |
| pfree(buff); |
| return configs; |
| } |
| |
| void |
| cleanGpSegConfigs(GpSegConfigEntry *configs, int total_dbs) |
| { |
| if (configs) |
| { |
| GpSegConfigEntry *config; |
| for (int i = 0; i < total_dbs; i++) |
| { |
| config = &configs[i]; |
| if (config->hostname) |
| pfree(config->hostname); |
| if (config->address) |
| pfree(config->address); |
| if (config->datadir) |
| pfree(config->datadir); |
| } |
| |
| pfree(configs); |
| configs = NULL; |
| } |
| } |
| |
| static |
| void copyGpsegConfig(GpSegConfigEntry *src, GpSegConfigEntry *dst) |
| { |
| Assert(src); |
| Assert(dst); |
| |
| dst->dbid = src->dbid; |
| dst->segindex = src->segindex; |
| dst->role = src->role; |
| dst->preferred_role = src->preferred_role; |
| dst->mode = src->mode; |
| dst->status = src->status; |
| dst->port = src->port; |
| dst->hostname = pstrdup(src->hostname); |
| dst->address = pstrdup(src->address); |
| dst->datadir = pstrdup(src->datadir); |
| } |
| |
| |
| /* |
| * Master function for adding a new segment |
| */ |
| void |
| addSegment(GpSegConfigEntry *new_segment_information) |
| { |
| int dump_bytes = 0; |
| int rc = 0; |
| char* buff = NULL; |
| char* buff_ptr = NULL; |
| GpSegConfigEntry* configs = NULL; |
| GpSegConfigEntry* config = NULL; |
| int total_dbs; |
| Relation rel; |
| etcdlib_t *etcdlib; |
| |
| etcdlib = get_etcdlib(); |
| rel = heap_open(RelnameGetRelid(GpSegmentConfigRelationName), AccessShareLock); |
| |
| configs = readGpSegConfigFromETCD(&total_dbs, true); |
| |
| buff = palloc0(SEGMENT_CONFIGURATION_ONE_LINE_LENGTH * (total_dbs + 1)); |
| buff_ptr = buff; |
| |
| for (int index = 0; index < total_dbs; index++) |
| { |
| config = &configs[index]; |
| if (config->dbid == new_segment_information->dbid) |
| { |
| cleanGpSegConfigs(configs, total_dbs); |
| pfree(buff); |
| heap_close(rel, NoLock); |
| ereport(ERROR, |
| errmsg("already exist dbid=%d. should not add it twice.", new_segment_information->dbid)); |
| } |
| |
| dump_bytes = dumpSegConfigInfo(config, buff_ptr); |
| if (dump_bytes <= 0) |
| { |
| cleanGpSegConfigs(configs, total_dbs); |
| pfree(buff); |
| heap_close(rel, NoLock); |
| ereport(ERROR, |
| errmsg("fail to dump data info to string, dbid=%d, content=%d", |
| config->dbid, config->segindex)); |
| } |
| buff_ptr = buff_ptr + dump_bytes; |
| } |
| |
| cleanGpSegConfigs(configs, total_dbs); |
| |
| dump_bytes = dumpSegConfigInfo(new_segment_information, buff_ptr); |
| if (dump_bytes <= 0) |
| { |
| pfree(buff); |
| heap_close(rel, NoLock); |
| ereport(ERROR, |
| errmsg("fail to dump new add segment info to string, dbid=%d, content=%d", |
| new_segment_information->dbid, new_segment_information->segindex)); |
| } |
| |
| rc = etcdlib_set(etcdlib, fts_dump_file_key, buff, 0, false); |
| if (rc != 0) |
| { |
| pfree(buff); |
| heap_close(rel, NoLock); |
| ereport(ERROR, |
| errmsg("fail to write fts info into ETCD. rc=%d", rc)); |
| } |
| |
| writeSegmentConfigsCacheBuff(buff, strlen(buff) + 1, total_dbs + 1); |
| pfree(buff); |
| heap_close(rel, NoLock); |
| } |
| |
| /* |
| * Master function to remove a segment from all catalogs |
| */ |
| void |
| delSegment(int16 dbid) |
| { |
| int rc = 0; |
| int dump_bytes = 0; |
| char *rewrite_buf = NULL; |
| char *rewrite_ptr = NULL; |
| GpSegConfigEntry *configs = NULL; |
| GpSegConfigEntry *config = NULL; |
| int total_dbs = 0; |
| int index = 0; |
| int index_rm = -1; |
| Relation rel; |
| etcdlib_t *etcdlib; |
| |
| etcdlib = get_etcdlib(); |
| rel = heap_open(RelnameGetRelid(GpSegmentConfigRelationName), AccessShareLock); |
| |
| configs = readGpSegConfigFromETCD(&total_dbs, false); |
| |
| rewrite_buf = palloc0(SEGMENT_CONFIGURATION_ONE_LINE_LENGTH * (total_dbs - 1)); |
| rewrite_ptr = rewrite_buf; |
| for (index = 0; index < total_dbs; index++) |
| { |
| config = &configs[index]; |
| if (config->dbid == dbid) |
| index_rm = index; |
| else |
| { |
| dump_bytes = dumpSegConfigInfo(config, rewrite_ptr); |
| if (dump_bytes <= 0) |
| { |
| cleanGpSegConfigs(configs, total_dbs); |
| pfree(rewrite_buf); |
| heap_close(rel, NoLock); |
| ereport(ERROR, |
| errmsg("fail to dump data info to string, dbid=%d, content=%d", |
| config->dbid, config->segindex)); |
| } |
| |
| rewrite_ptr = rewrite_ptr + dump_bytes; |
| } |
| } |
| |
| cleanGpSegConfigs(configs, total_dbs); |
| |
| /* segment not found */ |
| if (index_rm == -1) |
| { |
| pfree(rewrite_buf); |
| heap_close(rel, NoLock); |
| ereport(ERROR, |
| errmsg("fail to find [dbid=%d] in FTS_DUMP_FILE_KEY", dbid)); |
| } |
| |
| rc = etcdlib_set(etcdlib, fts_dump_file_key, rewrite_buf, 0, false); |
| if (rc != 0) |
| { |
| pfree(rewrite_buf); |
| heap_close(rel, NoLock); |
| ereport(ERROR, |
| errmsg("fail to write fts info into ETCD. rc=%d", rc)); |
| } |
| |
| writeSegmentConfigsCacheBuff(rewrite_buf, strlen(rewrite_buf) + 1, total_dbs - 1); |
| |
| pfree(rewrite_buf); |
| heap_close(rel, NoLock); |
| } |
| |
| void |
| cleanSegments(void) |
| { |
| Relation rel; |
| etcdlib_t *etcdlib; |
| |
| etcdlib = get_etcdlib(); |
| rel = heap_open(RelnameGetRelid(GpSegmentConfigRelationName), AccessShareLock); |
| |
| etcdlib_del(etcdlib, fts_dump_file_key); |
| ShmemSegmentConfigsCacheReset(); |
| |
| heap_close(rel, NoLock); |
| } |
| |
| void |
| rewriteSegments(char *rewrite_string, bool allow_diff_db_counts) |
| { |
| int rc; |
| etcdlib_t *etcdlib = NULL; |
| GpSegConfigEntry *configs; |
| GpSegConfigEntry *tmp_configs; |
| int total_dbs; |
| int total_dbs_in_etcd; |
| char *rewrite_string_cpy; |
| Relation rel; |
| |
| etcdlib = get_etcdlib(); |
| rel = heap_open(RelnameGetRelid(GpSegmentConfigRelationName), AccessShareLock); |
| |
| rewrite_string_cpy = pstrdup(rewrite_string); |
| configs = readGpSegConfig(rewrite_string_cpy, &total_dbs); |
| |
| if (!allow_diff_db_counts) |
| { |
| tmp_configs = readGpSegConfigFromETCDAllowNull(&total_dbs_in_etcd); |
| cleanGpSegConfigs(tmp_configs, total_dbs_in_etcd); |
| |
| /* ff current segment counts not match the FTS results |
| * it may means that during `ftsprobe`, user additionally operate some tools(like gpinitstanby) |
| * which will make segment counts change. If force to override segment info here, |
| * may cause loss of some segment record. |
| */ |
| if (total_dbs_in_etcd != total_dbs) |
| { |
| pfree(rewrite_string_cpy); |
| heap_close(rel, NoLock); |
| elog(WARNING, "Ignored update from fts, Current segments counts changed."); |
| return; |
| } |
| } |
| |
| if (!configs || total_dbs == 0) |
| { |
| pfree(rewrite_string_cpy); |
| heap_close(rel, NoLock); |
| elog(WARNING, "fail to parse rewrite segment infos: \"%s\"", rewrite_string); |
| return; |
| } |
| |
| cleanGpSegConfigs(configs, total_dbs); |
| pfree(rewrite_string_cpy); |
| |
| rc = etcdlib_set(etcdlib, fts_dump_file_key, rewrite_string, 0, false); |
| if (rc != 0) |
| { |
| heap_close(rel, NoLock); |
| ereport(ERROR, |
| errmsg("fail to write fts info from ETCD. rc=%d", rc)); |
| } |
| |
| writeSegmentConfigsCacheBuff(rewrite_string, strlen(rewrite_string) + 1, total_dbs); |
| |
| heap_close(rel, NoLock); |
| } |
| |
| void |
| updateSegmentModeStatus(int16 dbid, char mode, char status) |
| { |
| int rc = 0; |
| int dump_bytes = 0; |
| etcdlib_t *etcdlib = NULL; |
| Relation rel; |
| int total_dbs = 0; |
| char *rewrite_buf = NULL; |
| char *rewrite_ptr = NULL; |
| GpSegConfigEntry *configs = NULL; |
| GpSegConfigEntry *config = NULL; |
| int index; |
| bool found = false; |
| |
| etcdlib = get_etcdlib(); |
| rel = heap_open(RelnameGetRelid(GpSegmentConfigRelationName), AccessShareLock); |
| |
| configs = readGpSegConfigFromETCD(&total_dbs, false); |
| |
| rewrite_buf = palloc0(SEGMENT_CONFIGURATION_ONE_LINE_LENGTH * (total_dbs - 1)); |
| rewrite_ptr = rewrite_buf; |
| for (index = 0; index < total_dbs; index++) |
| { |
| config = &configs[index]; |
| if (config->dbid == dbid) |
| { |
| found = true; |
| configs->mode = mode; |
| configs->status = status; |
| } |
| |
| dump_bytes = dumpSegConfigInfo(config, rewrite_ptr); |
| if (dump_bytes <= 0) |
| { |
| cleanGpSegConfigs(configs, total_dbs); |
| pfree(rewrite_buf); |
| heap_close(rel, NoLock); |
| ereport(ERROR, |
| errmsg("fail to dump data info to string, dbid=%d, content=%d", |
| config->dbid, config->segindex)); |
| } |
| |
| rewrite_ptr = rewrite_ptr + dump_bytes; |
| } |
| cleanGpSegConfigs(configs, total_dbs); |
| |
| if (!found) |
| { |
| pfree(rewrite_buf); |
| heap_close(rel, NoLock); |
| ereport(ERROR, |
| errmsg("fail to find [dbid=%d] in FTS_DUMP_FILE_KEY", dbid)); |
| } |
| |
| rc = etcdlib_set(etcdlib, fts_dump_file_key, rewrite_buf, 0, false); |
| if (rc != 0) |
| { |
| pfree(rewrite_buf); |
| heap_close(rel, NoLock); |
| ereport(ERROR, |
| errmsg("fail to write fts info into ETCD. rc=%d", rc)); |
| } |
| |
| writeSegmentConfigsCacheBuff(rewrite_buf, strlen(rewrite_buf) + 1, total_dbs); |
| |
| pfree(rewrite_buf); |
| heap_close(rel, NoLock); |
| } |
| |
| void |
| activateStandby(int16 standby_dbid, int16 primary_dbid) |
| { |
| int rc = 0; |
| int dump_bytes = 0; |
| char *rewrite_buf = NULL; |
| char *rewrite_ptr = NULL; |
| GpSegConfigEntry *configs = NULL; |
| GpSegConfigEntry *config = NULL; |
| int total_dbs; |
| int index; |
| int index_rm = -1; |
| bool found_standby = false; |
| etcdlib_t *etcdlib; |
| Relation rel; |
| |
| etcdlib = get_etcdlib(); |
| rel = heap_open(RelnameGetRelid(GpSegmentConfigRelationName), AccessShareLock); |
| |
| configs = readGpSegConfigFromETCD(&total_dbs, false); |
| |
| rewrite_buf = palloc0(SEGMENT_CONFIGURATION_ONE_LINE_LENGTH * (total_dbs - 1)); |
| rewrite_ptr = rewrite_buf; |
| for (index = 0; index < total_dbs; index++) |
| { |
| config = &configs[index]; |
| |
| if (config->dbid == primary_dbid) |
| { |
| index_rm = index; |
| continue; |
| } |
| |
| if (config->dbid == standby_dbid) |
| { |
| found_standby = true; |
| config->role = GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY; |
| config->preferred_role = GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY; |
| } |
| |
| dump_bytes = dumpSegConfigInfo(config, rewrite_ptr); |
| if (dump_bytes <= 0) |
| { |
| cleanGpSegConfigs(configs, total_dbs); |
| pfree(rewrite_buf); |
| heap_close(rel, NoLock); |
| ereport(ERROR, |
| errmsg("fail to dump data info to string, dbid=%d, content=%d", |
| config->dbid, config->segindex)); |
| } |
| |
| rewrite_ptr = rewrite_ptr + dump_bytes; |
| } |
| |
| cleanGpSegConfigs(configs, total_dbs); |
| |
| if (index_rm == -1 || !found_standby) |
| { |
| pfree(rewrite_buf); |
| heap_close(rel, NoLock); |
| ereport(ERROR, |
| errmsg("fail to find primary_dbid=%d or standby_dbid=%d in FTS_DUMP_FILE_KEY", |
| primary_dbid,standby_dbid)); |
| } |
| |
| rc = etcdlib_set(etcdlib, fts_dump_file_key, rewrite_buf, 0, false); |
| if (rc != 0) |
| { |
| pfree(rewrite_buf); |
| heap_close(rel, NoLock); |
| ereport(ERROR, |
| errmsg("fail to write fts info into ETCD. rc=%d", rc)); |
| } |
| |
| writeSegmentConfigsCacheBuff(rewrite_buf, strlen(rewrite_buf) + 1, total_dbs - 1); |
| |
| pfree(rewrite_buf); |
| heap_close(rel, NoLock); |
| } |
| |
| /* |
| * Internal function to initialize each component info |
| */ |
| static CdbComponentDatabases * |
| getCdbComponentInfo(void) |
| { |
| MemoryContext oldContext; |
| CdbComponentDatabaseInfo *cdbInfo; |
| CdbComponentDatabases *component_databases = NULL; |
| GpSegConfigEntry *configs; |
| |
| int i; |
| int x = 0; |
| int total_dbs = 0; |
| |
| bool found; |
| HostSegsEntry *hsEntry; |
| |
| if (!CdbComponentsContext) |
| CdbComponentsContext = AllocSetContextCreate(TopMemoryContext, "cdb components Context", |
| ALLOCSET_DEFAULT_MINSIZE, |
| ALLOCSET_DEFAULT_INITSIZE, |
| ALLOCSET_DEFAULT_MAXSIZE); |
| |
| oldContext = MemoryContextSwitchTo(CdbComponentsContext); |
| |
| HTAB *hostSegsHash = hostSegsHashTableInit(); |
| |
| configs = readGpSegConfigFromETCD(&total_dbs, false); |
| component_databases = palloc0(sizeof(CdbComponentDatabases)); |
| |
| component_databases->numActiveQEs = 0; |
| component_databases->numIdleQEs = 0; |
| component_databases->qeCounter = 0; |
| component_databases->freeCounterList = NIL; |
| |
| component_databases->segment_db_info = |
| (CdbComponentDatabaseInfo *) palloc0(sizeof(CdbComponentDatabaseInfo) * total_dbs); |
| |
| component_databases->entry_db_info = |
| (CdbComponentDatabaseInfo *) palloc0(sizeof(CdbComponentDatabaseInfo) * 2); |
| |
| for (i = 0; i < total_dbs; i++) |
| { |
| CdbComponentDatabaseInfo *pRow; |
| GpSegConfigEntry *config = &configs[i]; |
| |
| /* lookup hostip/hostaddrs cache */ |
| config->hostip= NULL; |
| getAddressesForDBid(config, ERROR); |
| |
| /* |
| * We make sure we get a valid hostip for primary here, |
| * if hostip for mirrors can not be get, ignore the error. |
| */ |
| if (config->hostaddrs[0] == NULL && |
| config->role == GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY) |
| ereport(ERROR, |
| (errcode(ERRCODE_CONNECTION_FAILURE), |
| errmsg("cannot resolve network address for dbid=%d", config->dbid))); |
| |
| if (config->hostaddrs[0] != NULL) |
| config->hostip = pstrdup(config->hostaddrs[0]); |
| AssertImply(config->hostip, strlen(config->hostip) <= INET6_ADDRSTRLEN); |
| |
| /* |
| * Determine which array to place this rows data in: entry or segment, |
| * based on the content field. |
| */ |
| if (config->segindex >= 0) |
| { |
| pRow = &component_databases->segment_db_info[component_databases->total_segment_dbs]; |
| component_databases->total_segment_dbs++; |
| } |
| else |
| { |
| pRow = &component_databases->entry_db_info[component_databases->total_entry_dbs]; |
| component_databases->total_entry_dbs++; |
| } |
| |
| pRow->cdbs = component_databases; |
| pRow->config = config; |
| pRow->freelist = NIL; |
| pRow->numIdleQEs = 0; |
| pRow->numActiveQEs = 0; |
| |
| if (config->role != GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY || config->hostip == NULL) |
| continue; |
| |
| hsEntry = (HostSegsEntry *) hash_search(hostSegsHash, config->hostip, HASH_ENTER, &found); |
| if (found) |
| hsEntry->segmentCount++; |
| else |
| hsEntry->segmentCount = 1; |
| } |
| |
| /* |
| * Validate that there exists at least one entry and one segment database |
| * in the configuration |
| */ |
| if (component_databases->total_segment_dbs == 0) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_CARDINALITY_VIOLATION), |
| errmsg("number of segment databases cannot be 0"))); |
| } |
| if (component_databases->total_entry_dbs == 0) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_CARDINALITY_VIOLATION), |
| errmsg("number of entry databases cannot be 0"))); |
| } |
| |
| /* |
| * Now sort the data by segindex, isprimary desc |
| */ |
| qsort(component_databases->segment_db_info, |
| component_databases->total_segment_dbs, sizeof(CdbComponentDatabaseInfo), |
| CdbComponentDatabaseInfoCompare); |
| |
| qsort(component_databases->entry_db_info, |
| component_databases->total_entry_dbs, sizeof(CdbComponentDatabaseInfo), |
| CdbComponentDatabaseInfoCompare); |
| |
| /* |
| * Now count the number of distinct segindexes. Since it's sorted, this is |
| * easy. |
| */ |
| for (i = 0; i < component_databases->total_segment_dbs; i++) |
| { |
| if (i == 0 || |
| (component_databases->segment_db_info[i].config->segindex != component_databases->segment_db_info[i - 1].config->segindex)) |
| { |
| component_databases->total_segments++; |
| } |
| } |
| |
| /* |
| * Now validate that our identity is present in the entry databases |
| */ |
| for (i = 0; i < component_databases->total_entry_dbs; i++) |
| { |
| cdbInfo = &component_databases->entry_db_info[i]; |
| |
| if (cdbInfo->config->dbid == GpIdentity.dbid && cdbInfo->config->segindex == GpIdentity.segindex) |
| { |
| break; |
| } |
| } |
| if (i == component_databases->total_entry_dbs) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_DATA_EXCEPTION), |
| errmsg("cannot locate entry database"), |
| errdetail("entry database represented by this db in gp_segment_configuration: dbid %d content %d", |
| GpIdentity.dbid, GpIdentity.segindex))); |
| } |
| |
| /* |
| * Now validate that the segindexes for the segment databases are between |
| * 0 and (numsegments - 1) inclusive, and that we hit them all. |
| * Since it's sorted, this is relatively easy. |
| */ |
| x = 0; |
| for (i = 0; i < component_databases->total_segments; i++) |
| { |
| int this_segindex = -1; |
| |
| while (x < component_databases->total_segment_dbs) |
| { |
| this_segindex = component_databases->segment_db_info[x].config->segindex; |
| if (this_segindex < i) |
| x++; |
| else if (this_segindex == i) |
| break; |
| else if (this_segindex > i) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_DATA_EXCEPTION), |
| errmsg("content values not valid in %s table", |
| GpSegmentConfigRelationName), |
| errdetail("content values must be in the range 0 to %d inclusive.", |
| component_databases->total_segments - 1))); |
| } |
| } |
| if (this_segindex != i) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_DATA_EXCEPTION), |
| errmsg("content values not valid in %s table", |
| GpSegmentConfigRelationName), |
| errdetail("content values must be in the range 0 to %d inclusive", |
| component_databases->total_segments - 1))); |
| } |
| } |
| |
| for (i = 0; i < component_databases->total_segment_dbs; i++) |
| { |
| cdbInfo = &component_databases->segment_db_info[i]; |
| |
| if (cdbInfo->config->role != GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY || cdbInfo->config->hostip == NULL) |
| continue; |
| |
| hsEntry = (HostSegsEntry *) hash_search(hostSegsHash, cdbInfo->config->hostip, HASH_FIND, &found); |
| Assert(found); |
| cdbInfo->hostPrimaryCount = hsEntry->segmentCount; |
| } |
| |
| for (i = 0; i < component_databases->total_entry_dbs; i++) |
| { |
| cdbInfo = &component_databases->entry_db_info[i]; |
| |
| if (cdbInfo->config->role != GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY || cdbInfo->config->hostip == NULL) |
| continue; |
| |
| hsEntry = (HostSegsEntry *) hash_search(hostSegsHash, cdbInfo->config->hostip, HASH_FIND, &found); |
| Assert(found); |
| cdbInfo->hostPrimaryCount = hsEntry->segmentCount; |
| } |
| |
| hash_destroy(hostSegsHash); |
| |
| MemoryContextSwitchTo(oldContext); |
| |
| return component_databases; |
| } |
| |
| /* |
| * Helper function to clean up the idle segdbs list of |
| * a segment component. |
| */ |
| static void |
| cleanupComponentIdleQEs(CdbComponentDatabaseInfo *cdi, bool includeWriter) |
| { |
| SegmentDatabaseDescriptor *segdbDesc; |
| MemoryContext oldContext; |
| ListCell *curItem; |
| |
| Assert(CdbComponentsContext); |
| oldContext = MemoryContextSwitchTo(CdbComponentsContext); |
| curItem = list_head(cdi->freelist); |
| |
| foreach(curItem, cdi->freelist) |
| { |
| segdbDesc = (SegmentDatabaseDescriptor *)lfirst(curItem); |
| Assert(segdbDesc); |
| |
| if (segdbDesc->isWriter && !includeWriter) |
| continue; |
| |
| cdi->freelist = foreach_delete_current(cdi->freelist, curItem); |
| DECR_COUNT(cdi, numIdleQEs); |
| |
| cdbconn_termSegmentDescriptor(segdbDesc); |
| } |
| |
| MemoryContextSwitchTo(oldContext); |
| } |
| |
| void |
| cdbcomponent_cleanupIdleQEs(bool includeWriter) |
| { |
| CdbComponentDatabases *cdbs; |
| int i; |
| |
| /* use cdb_component_dbs directly */ |
| cdbs = cdb_component_dbs; |
| |
| if (cdbs == NULL) |
| return; |
| |
| if (cdbs->segment_db_info != NULL) |
| { |
| for (i = 0; i < cdbs->total_segment_dbs; i++) |
| { |
| CdbComponentDatabaseInfo *cdi = &cdbs->segment_db_info[i]; |
| cleanupComponentIdleQEs(cdi, includeWriter); |
| } |
| } |
| |
| if (cdbs->entry_db_info != NULL) |
| { |
| for (i = 0; i < cdbs->total_entry_dbs; i++) |
| { |
| CdbComponentDatabaseInfo *cdi = &cdbs->entry_db_info[i]; |
| cleanupComponentIdleQEs(cdi, includeWriter); |
| } |
| } |
| |
| return; |
| } |
| |
| /* |
| * This function is called when a transaction is started and the snapshot of |
| * segments info will not changed until the end of transaction |
| */ |
| void |
| cdbcomponent_updateCdbComponents(void) |
| { |
| int expandVersion = GetGpExpandVersion(); |
| |
| PG_TRY(); |
| { |
| if (cdb_component_dbs == NULL) |
| { |
| cdb_component_dbs = getCdbComponentInfo(); |
| cdb_component_dbs->expand_version = GetGpExpandVersion(); |
| } |
| else if (cdb_component_dbs->expand_version != expandVersion) |
| { |
| if (TempNamespaceOidIsValid()) |
| { |
| /* |
| * Do not update here, otherwise, temp files will be lost |
| * in segments; |
| */ |
| } |
| else |
| { |
| ELOG_DISPATCHER_DEBUG("FTS rescanned, get new component databases info."); |
| cdbcomponent_destroyCdbComponents(); |
| cdb_component_dbs = getCdbComponentInfo(); |
| cdb_component_dbs->expand_version = expandVersion; |
| } |
| } |
| } |
| PG_CATCH(); |
| { |
| FtsNotifyProber(); |
| |
| PG_RE_THROW(); |
| } |
| PG_END_TRY(); |
| |
| Assert(cdb_component_dbs->numActiveQEs == 0); |
| } |
| |
| /* |
| * cdbcomponent_getCdbComponents |
| * |
| * |
| * Storage for the SegmentInstances block and all subsidiary |
| * structures are allocated from the caller's context. |
| */ |
| CdbComponentDatabases * |
| cdbcomponent_getCdbComponents() |
| { |
| PG_TRY(); |
| { |
| if (cdb_component_dbs == NULL) |
| { |
| cdb_component_dbs = getCdbComponentInfo(); |
| cdb_component_dbs->expand_version = GetGpExpandVersion(); |
| } |
| } |
| PG_CATCH(); |
| { |
| if (Gp_role == GP_ROLE_DISPATCH) |
| FtsNotifyProber(); |
| |
| PG_RE_THROW(); |
| } |
| PG_END_TRY(); |
| |
| return cdb_component_dbs; |
| } |
| |
| /* |
| * cdbcomponet_destroyCdbComponents |
| * |
| * Disconnect and destroy all idle QEs, releases the memory |
| * occupied by the CdbComponentDatabases |
| * |
| * callers must clean up QEs used by dispatcher states. |
| */ |
| void |
| cdbcomponent_destroyCdbComponents(void) |
| { |
| /* caller must clean up all segdbs used by dispatcher states */ |
| Assert(!cdbcomponent_activeQEsExist()); |
| |
| hash_destroy(segment_ip_cache_htab); |
| segment_ip_cache_htab = NULL; |
| |
| /* disconnect and destroy idle QEs include writers */ |
| cdbcomponent_cleanupIdleQEs(true); |
| |
| /* delete the memory context */ |
| if (CdbComponentsContext) |
| MemoryContextDelete(CdbComponentsContext); |
| CdbComponentsContext = NULL; |
| cdb_component_dbs = NULL; |
| } |
| |
| /* |
| * Allocated a segdb |
| * |
| * If there is idle segdb in the freelist, return it, otherwise, initialize |
| * a new segdb. |
| * |
| * idle segdbs has an established connection with segment, but new segdb is |
| * not setup yet, callers need to establish the connection by themselves. |
| */ |
| SegmentDatabaseDescriptor * |
| cdbcomponent_allocateIdleQE(int contentId, SegmentType segmentType) |
| { |
| SegmentDatabaseDescriptor *segdbDesc = NULL; |
| CdbComponentDatabaseInfo *cdbinfo; |
| ListCell *curItem; |
| MemoryContext oldContext; |
| bool isWriter; |
| |
| cdbinfo = cdbcomponent_getComponentInfo(contentId); |
| |
| oldContext = MemoryContextSwitchTo(CdbComponentsContext); |
| |
| /* |
| * Always try to pop from the head. Make sure to push them back to head |
| * in cdbcomponent_recycleIdleQE(). |
| */ |
| foreach(curItem, cdbinfo->freelist) |
| { |
| SegmentDatabaseDescriptor *tmp = |
| (SegmentDatabaseDescriptor *)lfirst(curItem); |
| Assert(tmp); |
| |
| if ((segmentType == SEGMENTTYPE_EXPLICT_WRITER && !tmp->isWriter) || |
| (segmentType == SEGMENTTYPE_EXPLICT_READER && tmp->isWriter)) |
| continue; |
| |
| cdbinfo->freelist = foreach_delete_current(cdbinfo->freelist, curItem); |
| /* update numIdleQEs */ |
| DECR_COUNT(cdbinfo, numIdleQEs); |
| |
| segdbDesc = tmp; |
| break; |
| } |
| |
| if (!segdbDesc) |
| { |
| /* |
| * 1. for entrydb, it's never be writer. |
| * 2. for first QE, it must be a writer. |
| */ |
| isWriter = contentId == -1 ? false: (cdbinfo->numIdleQEs == 0 && cdbinfo->numActiveQEs == 0); |
| segdbDesc = cdbconn_createSegmentDescriptor(cdbinfo, nextQEIdentifer(cdbinfo->cdbs), isWriter); |
| } |
| |
| cdbconn_setQEIdentifier(segdbDesc, -1); |
| |
| INCR_COUNT(cdbinfo, numActiveQEs); |
| |
| MemoryContextSwitchTo(oldContext); |
| |
| return segdbDesc; |
| } |
| |
| static bool |
| cleanupQE(SegmentDatabaseDescriptor *segdbDesc) |
| { |
| Assert(segdbDesc != NULL); |
| |
| #ifdef FAULT_INJECTOR |
| if (SIMPLE_FAULT_INJECTOR("cleanup_qe") == FaultInjectorTypeSkip) |
| return false; |
| #endif |
| |
| /* |
| * if the process is in the middle of blowing up... then we don't do |
| * anything here. making libpq and other calls can definitely result in |
| * things getting HUNG. |
| */ |
| if (proc_exit_inprogress) |
| return false; |
| |
| if (cdbconn_isBadConnection(segdbDesc)) |
| return false; |
| |
| /* if segment is down, the gang can not be reused */ |
| if (FtsIsSegmentDown(segdbDesc->segment_database_info)) |
| return false; |
| |
| /* If a reader exceed the cached memory limitation, destroy it */ |
| if (!segdbDesc->isWriter && |
| (segdbDesc->conn->mop_high_watermark >> 20) > gp_vmem_protect_gang_cache_limit) |
| return false; |
| |
| /* Note, we cancel all "still running" queries */ |
| if (!cdbconn_discardResults(segdbDesc, 20)) |
| { |
| elog(LOG, "cleaning up seg%d while it is still busy", segdbDesc->segindex); |
| return false; |
| } |
| |
| /* QE is no longer associated with a slice. */ |
| cdbconn_setQEIdentifier(segdbDesc, /* slice index */ -1); |
| |
| return true; |
| } |
| |
| void |
| cdbcomponent_recycleIdleQE(SegmentDatabaseDescriptor *segdbDesc, bool forceDestroy) |
| { |
| CdbComponentDatabaseInfo *cdbinfo; |
| MemoryContext oldContext; |
| int maxLen; |
| bool isWriter; |
| |
| Assert(cdb_component_dbs); |
| Assert(CdbComponentsContext); |
| |
| cdbinfo = segdbDesc->segment_database_info; |
| isWriter = segdbDesc->isWriter; |
| |
| /* update num of active QEs */ |
| DECR_COUNT(cdbinfo, numActiveQEs); |
| |
| oldContext = MemoryContextSwitchTo(CdbComponentsContext); |
| |
| if (forceDestroy || !cleanupQE(segdbDesc)) |
| goto destroy_segdb; |
| |
| /* If freelist length exceed gp_cached_gang_threshold, destroy it */ |
| maxLen = segdbDesc->segindex == -1 ? |
| MAX_CACHED_1_GANGS : gp_cached_gang_threshold; |
| if (!isWriter && list_length(cdbinfo->freelist) >= maxLen) |
| goto destroy_segdb; |
| |
| /* Recycle the QE, put it to freelist */ |
| if (isWriter) |
| { |
| /* writer is always the header of freelist */ |
| segdbDesc->segment_database_info->freelist = |
| lcons(segdbDesc, segdbDesc->segment_database_info->freelist); |
| } |
| else |
| { |
| int lastWriterPos = -1; |
| ListCell *cell; |
| |
| /* |
| * In cdbcomponent_allocateIdleQE() readers are always popped from the |
| * head, so to restore the original order we must pushed them back to |
| * the head, and keep in mind readers must be put after the writers. |
| */ |
| |
| for (cell = list_head(segdbDesc->segment_database_info->freelist); |
| cell && ((SegmentDatabaseDescriptor *) lfirst(cell))->isWriter; |
| lastWriterPos++, cell = lnext(segdbDesc->segment_database_info->freelist, cell)) ; |
| |
| if (lastWriterPos >= 0) |
| segdbDesc->segment_database_info->freelist = list_insert_nth(segdbDesc->segment_database_info->freelist, |
| lastWriterPos + 1, segdbDesc); |
| else |
| segdbDesc->segment_database_info->freelist = |
| lcons(segdbDesc, segdbDesc->segment_database_info->freelist); |
| } |
| |
| INCR_COUNT(cdbinfo, numIdleQEs); |
| |
| MemoryContextSwitchTo(oldContext); |
| |
| return; |
| |
| destroy_segdb: |
| |
| cdbconn_termSegmentDescriptor(segdbDesc); |
| |
| if (isWriter) |
| { |
| markCurrentGxactWriterGangLost(); |
| } |
| |
| MemoryContextSwitchTo(oldContext); |
| } |
| |
| static int |
| nextQEIdentifer(CdbComponentDatabases *cdbs) |
| { |
| int result; |
| |
| if (!cdbs->freeCounterList) |
| return cdbs->qeCounter++; |
| |
| result = linitial_int(cdbs->freeCounterList); |
| cdbs->freeCounterList = list_delete_first(cdbs->freeCounterList); |
| return result; |
| } |
| |
| bool |
| cdbcomponent_qesExist(void) |
| { |
| return !cdb_component_dbs ? false : |
| (cdb_component_dbs->numIdleQEs > 0 || cdb_component_dbs->numActiveQEs > 0); |
| } |
| |
| bool |
| cdbcomponent_activeQEsExist(void) |
| { |
| return !cdb_component_dbs ? false : cdb_component_dbs->numActiveQEs > 0; |
| } |
| |
| /* |
| * Find CdbComponentDatabaseInfo in the array by segment index. |
| */ |
| CdbComponentDatabaseInfo * |
| cdbcomponent_getComponentInfo(int contentId) |
| { |
| CdbComponentDatabaseInfo *cdbInfo = NULL; |
| CdbComponentDatabases *cdbs; |
| |
| cdbs = cdbcomponent_getCdbComponents(); |
| |
| if (contentId < -1 || contentId >= cdbs->total_segments) |
| ereport(FATAL, |
| (errcode(ERRCODE_DATA_EXCEPTION), |
| errmsg("unexpected content id %d, should be [-1, %d]", |
| contentId, cdbs->total_segments - 1))); |
| /* entry db */ |
| if (contentId == -1) |
| { |
| cdbInfo = &cdbs->entry_db_info[0]; |
| return cdbInfo; |
| } |
| |
| /* no mirror, segment_db_info is sorted by content id */ |
| if (cdbs->total_segment_dbs == cdbs->total_segments) |
| { |
| cdbInfo = &cdbs->segment_db_info[contentId]; |
| return cdbInfo; |
| } |
| |
| /* with mirror, segment_db_info is sorted by content id */ |
| if (cdbs->total_segment_dbs != cdbs->total_segments) |
| { |
| Assert(cdbs->total_segment_dbs == cdbs->total_segments * 2); |
| cdbInfo = &cdbs->segment_db_info[2 * contentId]; |
| |
| if (!SEGMENT_IS_ACTIVE_PRIMARY(cdbInfo)) |
| { |
| cdbInfo = &cdbs->segment_db_info[2 * contentId + 1]; |
| } |
| |
| return cdbInfo; |
| } |
| |
| return cdbInfo; |
| } |
| |
| static void |
| ensureInterconnectAddress(void) |
| { |
| if (interconnect_address) |
| return; |
| |
| if (GpIdentity.segindex >= 0) |
| { |
| Assert(Gp_role == GP_ROLE_EXECUTE); |
| Assert(MyProcPort != NULL); |
| Assert(MyProcPort->laddr.addr.ss_family == AF_INET |
| || MyProcPort->laddr.addr.ss_family == AF_INET6); |
| /* |
| * We assume that the QD, using the address in gp_segment_configuration |
| * as its destination IP address, connects to the segment/QE. |
| * So, the local address in the PORT can be used for interconnect. |
| */ |
| char local_addr[NI_MAXHOST]; |
| getnameinfo((const struct sockaddr *)&MyProcPort->laddr.addr, |
| MyProcPort->laddr.salen, |
| local_addr, sizeof(local_addr), |
| NULL, 0, NI_NUMERICHOST); |
| interconnect_address = MemoryContextStrdup(TopMemoryContext, local_addr); |
| } |
| else if (Gp_role == GP_ROLE_DISPATCH) |
| { |
| /* |
| * Here, we can only retrieve the ADDRESS in gp_segment_configuration |
| * from `cdbcomponent*`. We couldn't get it in a way as the QEs. |
| */ |
| CdbComponentDatabaseInfo *qdInfo; |
| qdInfo = cdbcomponent_getComponentInfo(MASTER_CONTENT_ID); |
| interconnect_address = MemoryContextStrdup(TopMemoryContext, qdInfo->config->hostip); |
| } |
| else if (qdHostname && qdHostname[0] != '\0') |
| { |
| Assert(Gp_role == GP_ROLE_EXECUTE); |
| /* |
| * QE on the master can't get its interconnect address like that on the primary. |
| * The QD connects to its postmaster via the unix domain socket. |
| */ |
| interconnect_address = qdHostname; |
| } |
| else |
| Assert(false); |
| } |
| /* |
| * performs all necessary setup required for Apache Cloudberry mode. |
| * |
| * This includes cdblink_setup() and initializing the Motion Layer. |
| */ |
| void |
| cdb_setup(void) |
| { |
| elog(DEBUG1, "Initializing Cloudberry components..."); |
| |
| if (Gp_role != GP_ROLE_UTILITY && !IS_SINGLENODE()) |
| { |
| if (!CurrentMotionIPCLayer) { |
| ereport(ERROR, |
| (errmsg("Interconnect moudle have not been preloaded"), |
| errdetail("Please make sure interconnect is included in option shared_preload_libraries"))); |
| } |
| ensureInterconnectAddress(); |
| /* Initialize the Motion Layer IPC subsystem. */ |
| CurrentMotionIPCLayer->InitMotionLayerIPC(); |
| } |
| |
| /* |
| * Backend process requires consistent state, it cannot proceed until |
| * dtx recovery process finish up the recovery of distributed transactions. |
| * |
| * Ignore background worker because bgworker_should_start_mpp() already did |
| * the check. |
| */ |
| if (!IsBackgroundWorker && |
| Gp_role == GP_ROLE_DISPATCH && |
| !*shmDtmStarted) |
| { |
| ereport(FATAL, |
| (errcode(ERRCODE_CANNOT_CONNECT_NOW), |
| errmsg(POSTMASTER_IN_RECOVERY_MSG), |
| errdetail("waiting for distributed transaction recovery to complete"))); |
| } |
| } |
| |
| /* |
| * performs all necessary cleanup required when leaving Cloudberry |
| * Database mode. This is also called when the process exits. |
| * |
| * NOTE: the arguments to this function are here only so that we can |
| * register it with on_proc_exit(). These parameters should not |
| * be used since there are some callers to this that pass them |
| * as NULL. |
| * |
| */ |
| void |
| cdb_cleanup(int code pg_attribute_unused(), Datum arg |
| pg_attribute_unused()) |
| { |
| elog(DEBUG1, "Cleaning up Cloudberry components..."); |
| |
| DisconnectAndDestroyAllGangs(true); |
| |
| if (Gp_role == GP_ROLE_DISPATCH) |
| { |
| if (cdb_total_plans > 0) |
| { |
| elog(DEBUG1, "session dispatched %d plans %d slices (%f), largest plan %d", |
| cdb_total_plans, cdb_total_slices, |
| ((double) cdb_total_slices / (double) cdb_total_plans), |
| cdb_max_slices); |
| } |
| } |
| |
| if (Gp_role != GP_ROLE_UTILITY && !IS_SINGLENODE()) |
| { |
| /* shutdown our listener socket */ |
| CurrentMotionIPCLayer->CleanUpMotionLayerIPC(); |
| } |
| } |
| |
| /* |
| * CdbComponentDatabaseInfoCompare: |
| * A compare function for CdbComponentDatabaseInfo structs |
| * that compares based on , isprimary desc |
| * for use with qsort. |
| */ |
| static int |
| CdbComponentDatabaseInfoCompare(const void *p1, const void *p2) |
| { |
| const CdbComponentDatabaseInfo *obj1 = (CdbComponentDatabaseInfo *) p1; |
| const CdbComponentDatabaseInfo *obj2 = (CdbComponentDatabaseInfo *) p2; |
| |
| int cmp = obj1->config->segindex - obj2->config->segindex; |
| |
| if (cmp == 0) |
| { |
| int obj2cmp = 0; |
| int obj1cmp = 0; |
| |
| if (SEGMENT_IS_ACTIVE_PRIMARY(obj2)) |
| obj2cmp = 1; |
| |
| if (SEGMENT_IS_ACTIVE_PRIMARY(obj1)) |
| obj1cmp = 1; |
| |
| cmp = obj2cmp - obj1cmp; |
| } |
| |
| return cmp; |
| } |
| |
| /* |
| * Maintain a cache of names. |
| * |
| * The keys are all NAMEDATALEN long. |
| */ |
| static char * |
| getDnsCachedAddress(char *name, int port, int elevel, bool use_cache) |
| { |
| SegIpEntry *e = NULL; |
| char hostinfo[NI_MAXHOST]; |
| |
| if (use_cache) |
| { |
| if (segment_ip_cache_htab == NULL) |
| { |
| HASHCTL hash_ctl; |
| |
| MemSet(&hash_ctl, 0, sizeof(hash_ctl)); |
| |
| hash_ctl.keysize = NAMEDATALEN + 1; |
| hash_ctl.entrysize = sizeof(SegIpEntry); |
| |
| segment_ip_cache_htab = hash_create("segment_dns_cache", |
| 256, &hash_ctl, HASH_ELEM | HASH_STRINGS); |
| } |
| else |
| { |
| e = (SegIpEntry *) hash_search(segment_ip_cache_htab, |
| name, HASH_FIND, NULL); |
| if (e != NULL) |
| return e->hostinfo; |
| } |
| } |
| |
| /* |
| * The name is either not in our cache, or we've been instructed to not |
| * use the cache. Perform the name lookup. |
| */ |
| if (!use_cache || (use_cache && e == NULL)) |
| { |
| MemoryContext oldContext = NULL; |
| int ret; |
| char portNumberStr[32]; |
| char *service; |
| struct addrinfo *addrs = NULL, |
| *addr; |
| struct addrinfo hint; |
| |
| /* Initialize hint structure */ |
| MemSet(&hint, 0, sizeof(hint)); |
| hint.ai_socktype = SOCK_STREAM; |
| hint.ai_family = AF_UNSPEC; |
| |
| snprintf(portNumberStr, sizeof(portNumberStr), "%d", port); |
| service = portNumberStr; |
| |
| ret = pg_getaddrinfo_all(name, service, &hint, &addrs); |
| if (ret || !addrs) |
| { |
| if (addrs) |
| pg_freeaddrinfo_all(hint.ai_family, addrs); |
| |
| /* |
| * If a host name is unknown, whether it is an error depends on its role: |
| * - if it is a primary then it's an error; |
| * - if it is a mirror then it's just a warning; |
| * but we do not know the role information here, so always treat it as a |
| * warning, the callers should check the role and decide what to do. |
| */ |
| if (ret != EAI_FAIL && elevel == ERROR) |
| elevel = WARNING; |
| |
| ereport(elevel, |
| (errmsg("could not translate host name \"%s\", port \"%d\" to address: %s", |
| name, port, gai_strerror(ret)))); |
| |
| return NULL; |
| } |
| |
| /* save in the cache context */ |
| if (use_cache) |
| oldContext = MemoryContextSwitchTo(TopMemoryContext); |
| |
| hostinfo[0] = '\0'; |
| for (addr = addrs; addr; addr = addr->ai_next) |
| { |
| #ifdef HAVE_UNIX_SOCKETS |
| /* Ignore AF_UNIX sockets, if any are returned. */ |
| if (addr->ai_family == AF_UNIX) |
| continue; |
| #endif |
| if (addr->ai_family == AF_INET) /* IPv4 address */ |
| { |
| memset(hostinfo, 0, sizeof(hostinfo)); |
| pg_getnameinfo_all((struct sockaddr_storage *) addr->ai_addr, addr->ai_addrlen, |
| hostinfo, sizeof(hostinfo), |
| NULL, 0, |
| NI_NUMERICHOST); |
| |
| if (use_cache) |
| { |
| /* Insert into our cache htab */ |
| e = (SegIpEntry *) hash_search(segment_ip_cache_htab, |
| name, HASH_ENTER, NULL); |
| memcpy(e->hostinfo, hostinfo, sizeof(hostinfo)); |
| } |
| |
| break; |
| } |
| } |
| |
| #ifdef HAVE_IPV6 |
| |
| /* |
| * IPv6 probably would work fine, we'd just need to make sure all the |
| * data structures are big enough for the IPv6 address. And on some |
| * broken systems, you can get an IPv6 address, but not be able to |
| * bind to it because IPv6 is disabled or missing in the kernel, so |
| * we'd only want to use the IPv6 address if there isn't an IPv4 |
| * address. All we really need to do is test this. |
| */ |
| if (((!use_cache && !hostinfo[0]) || (use_cache && e == NULL)) |
| && addrs->ai_family == AF_INET6) |
| { |
| addr = addrs; |
| /* Get a text representation of the IP address */ |
| pg_getnameinfo_all((struct sockaddr_storage *) addr->ai_addr, addr->ai_addrlen, |
| hostinfo, sizeof(hostinfo), |
| NULL, 0, |
| NI_NUMERICHOST); |
| |
| if (use_cache) |
| { |
| /* Insert into our cache htab */ |
| e = (SegIpEntry *) hash_search(segment_ip_cache_htab, |
| name, HASH_ENTER, NULL); |
| memcpy(e->hostinfo, hostinfo, sizeof(hostinfo)); |
| } |
| } |
| #endif |
| |
| if (use_cache) |
| MemoryContextSwitchTo(oldContext); |
| |
| pg_freeaddrinfo_all(hint.ai_family, addrs); |
| } |
| |
| /* return a pointer to our cache. */ |
| if (use_cache) |
| return e->hostinfo; |
| |
| return pstrdup(hostinfo); |
| } |
| |
| /* |
| * getDnsAddress |
| * |
| * same as getDnsCachedAddress, but without using the cache. A non-cached |
| * version was used inline inside of cdbgang.c, and since it is needed now |
| * elsewhere, it is available externally. |
| */ |
| char * |
| getDnsAddress(char *hostname, int port, int elevel) |
| { |
| return getDnsCachedAddress(hostname, port, elevel, false); |
| } |
| |
| |
| /* |
| * Given a component-db in the system, find the addresses at which it |
| * can be reached, appropriately populate the argument-structure, and |
| * maintain the ip-lookup-cache. |
| */ |
| static void |
| getAddressesForDBid(GpSegConfigEntry *c, int elevel) |
| { |
| char *name; |
| |
| Assert(c != NULL); |
| |
| /* Use hostname */ |
| memset(c->hostaddrs, 0, COMPONENT_DBS_MAX_ADDRS * sizeof(char *)); |
| |
| #ifdef FAULT_INJECTOR |
| if (SIMPLE_FAULT_INJECTOR("get_dns_cached_address") == FaultInjectorTypeSkip) |
| { |
| /* inject a dns error for primary of segment 0 */ |
| if (c->segindex == 0 && |
| c->preferred_role == GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY) |
| { |
| c->address = pstrdup("dnserrordummyaddress"); |
| c->hostname = pstrdup("dnserrordummyaddress"); |
| } |
| } |
| #endif |
| |
| /* |
| * add an entry, using the first the "address" and then the "hostname" as |
| * fallback. |
| */ |
| name = getDnsCachedAddress(c->address, c->port, elevel, true); |
| |
| if (name) |
| { |
| c->hostaddrs[0] = pstrdup(name); |
| return; |
| } |
| |
| /* now the hostname. */ |
| name = getDnsCachedAddress(c->hostname, c->port, elevel, true); |
| if (name) |
| { |
| c->hostaddrs[0] = pstrdup(name); |
| } |
| else |
| { |
| c->hostaddrs[0] = NULL; |
| } |
| |
| return; |
| } |
| |
| /* |
| * hostSegsHashTableInit() |
| * Construct a hash table of HostSegsEntry |
| */ |
| static HTAB * |
| hostSegsHashTableInit(void) |
| { |
| HASHCTL info; |
| |
| /* Set key and entry sizes. */ |
| MemSet(&info, 0, sizeof(info)); |
| info.keysize = INET6_ADDRSTRLEN; |
| info.entrysize = sizeof(HostSegsEntry); |
| |
| return hash_create("HostSegs", 32, &info, HASH_ELEM | HASH_STRINGS); |
| } |
| |
| /* |
| * Given total number of primary segment databases and a number of |
| * segments to "skip" - this routine creates a boolean map (array) the |
| * size of total number of segments and randomly selects several |
| * entries (total number of total_to_skip) to be marked as |
| * "skipped". This is used for external tables with the 'gpfdist' |
| * protocol where we want to get a number of *random* segdbs to |
| * connect to a gpfdist client. |
| * |
| * Caller of this function should pfree skip_map when done with it. |
| */ |
| bool * |
| makeRandomSegMap(int total_primaries, int total_to_skip) |
| { |
| int randint; /* some random int representing a seg */ |
| int skipped = 0; /* num segs already marked to be skipped */ |
| bool *skip_map; |
| |
| skip_map = (bool *) palloc(total_primaries * sizeof(bool)); |
| MemSet(skip_map, false, total_primaries * sizeof(bool)); |
| |
| while (total_to_skip != skipped) |
| { |
| /* |
| * create a random int between 0 and (total_primaries - 1). |
| */ |
| randint = cdbhashrandomseg(total_primaries); |
| |
| /* |
| * mark this random index 'true' in the skip map (marked to be |
| * skipped) unless it was already marked. |
| */ |
| if (skip_map[randint] == false) |
| { |
| skip_map[randint] = true; |
| skipped++; |
| } |
| } |
| |
| return skip_map; |
| } |
| |
| /* |
| * Determine the dbid for the master standby |
| */ |
| int16 |
| master_standby_dbid(void) |
| { |
| int total_dbs; |
| GpSegConfigEntry *configs = NULL; |
| GpSegConfigEntry *config = NULL; |
| int16 dbid = 0; |
| |
| /* |
| * Can only run on a master node, this restriction is due to the reliance |
| * on the gp_segment_configuration table. |
| */ |
| if (!IS_QUERY_DISPATCHER()) |
| elog(ERROR, "master_standby_dbid() executed on execution segment"); |
| |
| configs = readGpSegConfigFromETCD(&total_dbs, false); |
| |
| /* |
| * SELECT dbid FROM gp_segment_configuration WHERE content = -1 AND role = |
| * GP_SEGMENT_CONFIGURATION_ROLE_MIRROR |
| */ |
| for (int i = 0; i < total_dbs; i++) |
| { |
| config = &configs[i]; |
| if (config->role == GP_SEGMENT_CONFIGURATION_ROLE_MIRROR && config->segindex == -1) |
| { |
| dbid = config->dbid; |
| break; |
| } |
| } |
| |
| cleanGpSegConfigs(configs, total_dbs); |
| return dbid; |
| } |
| |
| GpSegConfigEntry * |
| dbid_get_dbinfo(int16 dbid) |
| { |
| int total_dbs; |
| GpSegConfigEntry *configs = NULL; |
| GpSegConfigEntry *config = NULL; |
| GpSegConfigEntry *config_rc = NULL; |
| |
| /* Should not call this method in TopMemoryContext |
| * It will make memory leak |
| */ |
| Assert(CurrentMemoryContext != TopMemoryContext); |
| |
| /* |
| * Can only run on a master node, this restriction is due to the reliance |
| * on the gp_segment_configuration table. This may be able to be relaxed |
| * by switching to a different method of checking. |
| */ |
| if (!IS_QUERY_DISPATCHER()) |
| elog(ERROR, "dbid_get_dbinfo() executed on execution segment"); |
| |
| configs = readGpSegConfigFromETCD(&total_dbs, false); |
| for (int i = 0; i < total_dbs; i++) |
| { |
| config = &configs[i]; |
| if (config->dbid == dbid) |
| { |
| config_rc = palloc0(sizeof(GpSegConfigEntry)); |
| copyGpsegConfig(config, config_rc); |
| break; |
| } |
| } |
| |
| cleanGpSegConfigs(configs, total_dbs); |
| return config_rc; |
| } |
| |
| /* |
| * Obtain the dbid of a of a segment at a given segment index (i.e., content id) |
| * currently fulfilling the role specified. This means that the segment is |
| * really performing the role of primary or mirror, irrespective of their |
| * preferred role. |
| */ |
| int16 |
| contentid_get_dbid(int16 contentid, char role, bool getPreferredRoleNotCurrentRole) |
| { |
| int16 dbid = 0; |
| int total_dbs; |
| GpSegConfigEntry *configs = NULL; |
| GpSegConfigEntry *config = NULL; |
| |
| /* |
| * Can only run on a master node, this restriction is due to the reliance |
| * on the gp_segment_configuration table. This may be able to be relaxed |
| * by switching to a different method of checking. |
| */ |
| if (!IS_QUERY_DISPATCHER()) |
| elog(ERROR, "contentid_get_dbid() executed on execution segment"); |
| |
| configs = readGpSegConfigFromETCD(&total_dbs, false); |
| |
| for (int i = 0; i < total_dbs; i++) |
| { |
| config = &configs[i]; |
| if (config->segindex == contentid && |
| (getPreferredRoleNotCurrentRole ? config->preferred_role == role : config->role == role)) |
| { |
| dbid = config->dbid; |
| break; |
| } |
| } |
| |
| cleanGpSegConfigs(configs, total_dbs); |
| return dbid; |
| } |
| |
| int16 |
| cdbcomponent_get_maxdbid(void) |
| { |
| int16 dbid = 0; |
| int total_dbs; |
| GpSegConfigEntry *configs = NULL; |
| GpSegConfigEntry *config = NULL; |
| |
| configs = readGpSegConfigFromETCD(&total_dbs, false); |
| |
| for (int i = 0; i < total_dbs; i++) |
| { |
| config = &configs[i]; |
| dbid = Max(dbid, config->dbid); |
| } |
| |
| cleanGpSegConfigs(configs, total_dbs); |
| return dbid; |
| } |
| |
| |
| int16 |
| cdbcomponent_get_availableDbId(void) |
| { |
| HASHCTL hash_ctl; |
| |
| memset(&hash_ctl, 0, sizeof(hash_ctl)); |
| hash_ctl.keysize = sizeof(int32); |
| hash_ctl.entrysize = sizeof(int32); |
| hash_ctl.hash = int32_hash; |
| HTAB *htab = hash_create("Temporary table of dbids", |
| 1024, |
| &hash_ctl, |
| HASH_ELEM | HASH_FUNCTION); |
| |
| /* scan gp_segment_configuration in ETCD */ |
| int total_dbs; |
| GpSegConfigEntry *configs = NULL; |
| GpSegConfigEntry *config = NULL; |
| |
| /* |
| * Set up hash of used dbids. We use int32 here because int16 doesn't |
| * have a convenient hash and we can use casting below to check for |
| * overflow of int16 |
| */ |
| int32 dbid = 0; |
| |
| configs = readGpSegConfigFromETCD(&total_dbs, false); |
| |
| for (int i = 0; i < total_dbs; i++) |
| { |
| config = &configs[i]; |
| dbid = config->dbid; |
| (void) hash_search(htab, (void *) &dbid, HASH_ENTER, NULL); |
| } |
| |
| /* search for available dbid */ |
| for (dbid = 1;; dbid++) |
| { |
| if (dbid != (int16) dbid) |
| elog(ERROR, "unable to find available dbid"); |
| |
| if (hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL) |
| { |
| hash_destroy(htab); |
| cleanGpSegConfigs(configs, total_dbs); |
| return (int16) dbid; |
| } |
| } |
| cleanGpSegConfigs(configs, total_dbs); |
| } |
| |
| int16 |
| cdbcomponent_get_maxcontentid(void) |
| { |
| int16 segindex = 0; |
| int total_dbs; |
| GpSegConfigEntry *configs = NULL; |
| GpSegConfigEntry *config = NULL; |
| |
| configs = readGpSegConfigFromETCD(&total_dbs, false); |
| |
| for (int i = 0; i < total_dbs; i++) |
| { |
| config = &configs[i]; |
| segindex = Max(segindex, config->segindex); |
| } |
| |
| cleanGpSegConfigs(configs, total_dbs); |
| return segindex; |
| } |
| |
| List * |
| cdbcomponent_getCdbComponentsList(void) |
| { |
| CdbComponentDatabases *cdbs; |
| List *segments = NIL; |
| int i; |
| |
| cdbs = cdbcomponent_getCdbComponents(); |
| |
| for (i = 0; i < cdbs->total_segments; i++) |
| { |
| segments = lappend_int(segments, i); |
| } |
| |
| return segments; |
| } |
| |
| /* |
| * return the number of total segments for current snapshot of |
| * segments info |
| */ |
| int |
| getgpsegmentCount(void) |
| { |
| /* 1 represents a singleton postgresql in utility mode */ |
| int32 numsegments = 1; |
| |
| if (Gp_role == GP_ROLE_DISPATCH) |
| numsegments = cdbcomponent_getCdbComponents()->total_segments; |
| else if (Gp_role == GP_ROLE_EXECUTE) |
| numsegments = numsegmentsFromQD; |
| /* |
| * If we are in 'Utility & Binary Upgrade' mode, it must be launched |
| * by the pg_upgrade, so we give it an correct numsegments to make |
| * sure the pg_upgrade can run normally. |
| * Only Utility QD process have the entire information in the |
| * gp_segment_configuration, so we count the segments count in this |
| * process. |
| */ |
| else if (Gp_role == GP_ROLE_UTILITY && |
| IsBinaryUpgrade && |
| IS_QUERY_DISPATCHER()) |
| { |
| numsegments = cdbcomponent_getCdbComponents()->total_segments; |
| } |
| |
| return numsegments; |
| } |
| |
| /* |
| * IsOnConflictUpdate |
| * Return true if a plannedstmt is an upsert: insert ... on conflict do update |
| */ |
| bool |
| IsOnConflictUpdate(PlannedStmt *ps) |
| { |
| Plan *plan; |
| |
| if (ps == NULL || ps->commandType != CMD_INSERT) |
| return false; |
| |
| plan = ps->planTree; |
| |
| if (plan && IsA(plan, Motion)) |
| plan = outerPlan(plan); |
| |
| if (plan == NULL || !IsA(plan, ModifyTable)) |
| return false; |
| |
| return ((ModifyTable *)plan)->onConflictAction == ONCONFLICT_UPDATE; |
| } |
| |
| /* |
| * Avoid core file generation for this PANIC. It helps to avoid |
| * filling up disks during tests and also saves time. |
| */ |
| void |
| AvoidCorefileGeneration() |
| { |
| #if defined(HAVE_GETRLIMIT) && defined(RLIMIT_CORE) |
| struct rlimit lim; |
| getrlimit(RLIMIT_CORE, &lim); |
| lim.rlim_cur = 0; |
| if (setrlimit(RLIMIT_CORE, &lim) != 0) |
| { |
| int save_errno = errno; |
| |
| elog(NOTICE, |
| "setrlimit failed for RLIMIT_CORE soft limit to zero. errno: %d (%m).", |
| save_errno); |
| } |
| #endif |
| } |
| |
| PG_FUNCTION_INFO_V1(gp_get_suboverflowed_backends); |
| /* |
| * Find the backends where subtransaction overflowed. |
| */ |
| Datum |
| gp_get_suboverflowed_backends(PG_FUNCTION_ARGS) |
| { |
| int i; |
| ArrayBuildState *astate = NULL; |
| |
| LWLockAcquire(ProcArrayLock, LW_SHARED); |
| for (i = 0; i < ProcGlobal->allProcCount; i++) |
| { |
| if (ProcGlobal->subxidStates[i].overflowed) |
| astate = accumArrayResult(astate, |
| Int32GetDatum(ProcGlobal->allProcs[i].pid), |
| false, INT4OID, CurrentMemoryContext); |
| } |
| LWLockRelease(ProcArrayLock); |
| |
| if (astate) |
| PG_RETURN_DATUM(makeArrayResult(astate, |
| CurrentMemoryContext)); |
| else |
| PG_RETURN_NULL(); |
| } |
| |
| #endif /* USE_INTERNAL_FTS */ |