| /*------------------------------------------------------------------------- |
| * |
| * external.c |
| * routines for getting external info from external table fdw. |
| * |
| * Portions Copyright (c) 2020-Present VMware, Inc. or its affiliates. |
| * |
| * IDENTIFICATION |
| * src/backend/access/external/external.c |
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include "postgres.h" |
| |
| #include <fstream/gfile.h> |
| |
| #include "access/external.h" |
| #include "access/reloptions.h" |
| #include "access/table.h" |
| #include "catalog/indexing.h" |
| #include "catalog/pg_foreign_server.h" |
| #include "catalog/pg_foreign_table.h" |
| #include "cdb/cdbsreh.h" |
| #include "cdb/cdbvars.h" |
| #include "commands/defrem.h" |
| #include "foreign/foreign.h" |
| #include "mb/pg_wchar.h" |
| #include "nodes/makefuncs.h" |
| #include "optimizer/optimizer.h" |
| #include "utils/fmgroids.h" |
| #include "utils/lsyscache.h" |
| #include "utils/uri.h" |
| |
| static List *create_external_scan_uri_list(ExtTableEntry *ext, bool *ismasteronly); |
| |
| void |
| gfile_printf_then_putc_newline(const char *format,...) |
| { |
| char *a; |
| va_list va; |
| int i; |
| |
| va_start(va, format); |
| i = vsnprintf(0, 0, format, va); |
| va_end(va); |
| |
| if (i < 0) |
| elog(NOTICE, "gfile_printf_then_putc_newline vsnprintf failed."); |
| else if (!(a = palloc(i + 1))) |
| elog(NOTICE, "gfile_printf_then_putc_newline palloc failed."); |
| else |
| { |
| va_start(va, format); |
| vsnprintf(a, i + 1, format, va); |
| va_end(va); |
| elog(NOTICE, "%s", a); |
| pfree(a); |
| } |
| } |
| |
| void * |
| gfile_malloc(size_t size) |
| { |
| return palloc(size); |
| } |
| |
| void |
| gfile_free(void *a) |
| { |
| pfree(a); |
| } |
| |
| /* Split the uris string which may contain escape */ |
| static char* |
| strsep_uri(char **uris) |
| { |
| char *index; |
| char *result; |
| |
| if ((index = *uris) == NULL) |
| return NULL; |
| if (*index == '\0') |
| return NULL; |
| |
| size_t len = strlen(index); |
| result = (char *)palloc(len + 1); |
| int j = 0; |
| for (;;) |
| { |
| if (*index == '\0') |
| { |
| result[j++] = '\0'; |
| *uris = index; |
| break; |
| } |
| /* If escape is found */ |
| else if (*index == '\\') |
| { |
| /* Check the next character after escape. */ |
| index++; |
| /* If it is a separator or another escape, skip the previous escape. */ |
| if (*index == '\\' || *index == '|') |
| { |
| result[j++] = *index; |
| } |
| /* This is only possible for previous version data without escape. |
| * If it is the end, continue and the next loop will handle it. |
| */ |
| else if (*index == '\0') |
| { |
| result[j++] = '\\'; |
| continue; |
| } |
| /* This is only possible for previous version data without escape. |
| * If it is a common char, keep the original format. |
| */ |
| else |
| { |
| result[j++] = '\\'; |
| result[j++] = *index; |
| } |
| index++; |
| } |
| /* For correct data, only delimiter have not escape before. */ |
| else if (*index == '|') |
| { |
| index++; |
| result[j++] = '\0'; |
| *uris = index; |
| break; |
| } |
| else |
| { |
| result[j++] = *index; |
| index++; |
| } |
| } |
| return result; |
| } |
| |
| /* transform the locations string to a list */ |
| List* |
| TokenizeLocationUris(char *uris) |
| { |
| char *uri = NULL; |
| List *result = NIL; |
| |
| Assert(uris != NULL); |
| |
| while ((uri = strsep_uri(&uris)) != NULL) |
| { |
| result = lappend(result, makeString(uri)); |
| } |
| |
| return result; |
| } |
| |
| /* |
| * Get the entry for an exttable relation (from pg_foreign_table) |
| */ |
| ExtTableEntry* |
| GetExtTableEntry(Oid relid) |
| { |
| ExtTableEntry *extentry; |
| |
| extentry = GetExtTableEntryIfExists(relid); |
| if (!extentry) |
| ereport(ERROR, |
| (errcode(ERRCODE_UNDEFINED_OBJECT), |
| errmsg("missing pg_foreign_table entry for relation \"%s\"", |
| get_rel_name(relid)))); |
| return extentry; |
| } |
| |
| /* |
| * Like GetExtTableEntry(Oid), but returns NULL instead of throwing |
| * an error if no pg_foreign_table entry is found. |
| */ |
| ExtTableEntry* |
| GetExtTableEntryIfExists(Oid relid) |
| { |
| ForeignTable *ft; |
| ExtTableEntry *extentry; |
| |
| /* do nothing if it's not an external table */ |
| if (!rel_is_external_table(relid)) |
| return NULL; |
| |
| ft = GetForeignTable(relid); |
| |
| /* options array is always populated, {} if no options set */ |
| if (ft->options == NULL) |
| elog(ERROR, "could not find options for external protocol"); |
| |
| extentry = GetExtFromForeignTableOptions(ft->options, relid); |
| |
| pfree(ft); |
| |
| return extentry; |
| } |
| |
| ExtTableEntry * |
| GetExtFromForeignTableOptions(List *ftoptons, Oid relid) |
| { |
| ExtTableEntry *extentry; |
| ListCell *lc; |
| List *entryOptions = NIL; |
| char *arg; |
| bool rejectlimit_found = false; |
| bool rejectlimittype_found = false; |
| bool logerrors_found = false; |
| bool encoding_found = false; |
| bool iswritable_found = false; |
| bool executeon_found = false; |
| |
| extentry = (ExtTableEntry *) palloc0(sizeof(ExtTableEntry)); |
| |
| foreach(lc, ftoptons) |
| { |
| DefElem *def = (DefElem *) lfirst(lc); |
| |
| if (pg_strcasecmp(def->defname, "location_uris") == 0) |
| { |
| extentry->urilocations = TokenizeLocationUris(defGetString(def)); |
| continue; |
| } |
| |
| if (pg_strcasecmp(def->defname, "execute_on") == 0) |
| { |
| extentry->execlocations = list_make1(makeString(defGetString(def))); |
| executeon_found = true; |
| continue; |
| } |
| |
| if (pg_strcasecmp(def->defname, "command") == 0) |
| { |
| extentry->command = defGetString(def); |
| continue; |
| } |
| |
| if (pg_strcasecmp(def->defname, "format_type") == 0) |
| { |
| arg = defGetString(def); |
| extentry->fmtcode = arg[0]; |
| continue; |
| } |
| |
| /* only CSV format needs this for ProcessCopyOptions(), will do it later */ |
| if (pg_strcasecmp(def->defname, "format") == 0) |
| { |
| continue; |
| } |
| |
| if (pg_strcasecmp(def->defname, "reject_limit") == 0) |
| { |
| extentry->rejectlimit = atoi(defGetString(def)); |
| rejectlimit_found = true; |
| continue; |
| } |
| |
| if (pg_strcasecmp(def->defname, "reject_limit_type") == 0) |
| { |
| arg = defGetString(def); |
| extentry->rejectlimittype = arg[0]; |
| rejectlimittype_found = true; |
| continue; |
| } |
| |
| if (pg_strcasecmp(def->defname, "log_errors") == 0) |
| { |
| arg = defGetString(def); |
| extentry->logerrors = arg[0]; |
| logerrors_found = true; |
| continue; |
| } |
| |
| if (pg_strcasecmp(def->defname, "encoding") == 0) |
| { |
| extentry->encoding = atoi(defGetString(def)); |
| encoding_found = true; |
| continue; |
| } |
| |
| if (pg_strcasecmp(def->defname, "is_writable") == 0) |
| { |
| extentry->iswritable = defGetBoolean(def); |
| iswritable_found = true; |
| continue; |
| } |
| |
| entryOptions = lappend(entryOptions, makeDefElem(def->defname, (Node *) makeString(pstrdup(defGetString(def))), -1)); |
| } |
| |
| /* If CSV format was chosen, make it visible to ProcessCopyOptions. */ |
| if (fmttype_is_csv(extentry->fmtcode)) |
| entryOptions = lappend(entryOptions, makeDefElem("format", (Node *) makeString("csv"), -1)); |
| |
| if (!executeon_found) |
| extentry->execlocations = list_make1(makeString("ALL_SEGMENTS")); |
| |
| if(!iswritable_found) |
| extentry->iswritable = false; |
| |
| if(!encoding_found) |
| extentry->encoding = GetDatabaseEncoding(); |
| |
| if(!logerrors_found) |
| extentry->logerrors = LOG_ERRORS_DISABLE; |
| |
| if (!rejectlimit_found) { |
| /* mark that no SREH requested */ |
| extentry->rejectlimit = -1; |
| } |
| |
| if (!rejectlimittype_found) |
| extentry->rejectlimittype = -1; |
| |
| extentry->options = entryOptions; |
| |
| return extentry; |
| } |
| |
| ExternalScanInfo * |
| MakeExternalScanInfo(ExtTableEntry *extEntry) |
| { |
| ExternalScanInfo *node = makeNode(ExternalScanInfo); |
| List *urilist; |
| bool ismasteronly = false; |
| bool islimitinrows = false; |
| int rejectlimit = -1; |
| char logerrors = LOG_ERRORS_DISABLE; |
| static uint32 scancounter = 0; |
| |
| if (extEntry->rejectlimit != -1) |
| { |
| /* |
| * single row error handling is requested, make sure reject limit and |
| * reject type are valid. |
| * |
| * NOTE: this should never happen unless somebody modified the catalog |
| * manually. We are just being pedantic here. |
| */ |
| VerifyRejectLimit(extEntry->rejectlimittype, extEntry->rejectlimit); |
| } |
| |
| /* assign Uris to segments. */ |
| urilist = create_external_scan_uri_list(extEntry, &ismasteronly); |
| |
| /* single row error handling */ |
| if (extEntry->rejectlimit != -1) |
| { |
| islimitinrows = (extEntry->rejectlimittype == 'r' ? true : false); |
| rejectlimit = extEntry->rejectlimit; |
| logerrors = extEntry->logerrors; |
| } |
| |
| node->uriList = urilist; |
| node->fmtType = extEntry->fmtcode; |
| node->isMasterOnly = ismasteronly; |
| node->rejLimit = rejectlimit; |
| node->rejLimitInRows = islimitinrows; |
| node->logErrors = logerrors; |
| node->encoding = extEntry->encoding; |
| node->scancounter = scancounter++; |
| node->extOptions = extEntry->options; |
| |
| return node; |
| } |
| |
| static List * |
| create_external_scan_uri_list(ExtTableEntry *ext, bool *ismasteronly) |
| { |
| ListCell *c; |
| List *modifiedloclist = NIL; |
| int i; |
| CdbComponentDatabases *db_info; |
| int total_primaries; |
| char **segdb_file_map; |
| |
| /* various processing flags */ |
| bool using_execute = false; /* true if EXECUTE is used */ |
| bool using_location; /* true if LOCATION is used */ |
| bool found_candidate = false; |
| bool found_match = false; |
| bool done = false; |
| List *filenames; |
| |
| /* gpfdist(s) or EXECUTE specific variables */ |
| int total_to_skip = 0; |
| int max_participants_allowed = 0; |
| int num_segs_participating = 0; |
| bool *skip_map = NULL; |
| bool should_skip_randomly = false; |
| |
| Uri *uri; |
| char *on_clause; |
| |
| *ismasteronly = false; |
| |
| /* is this an EXECUTE table or a LOCATION (URI) table */ |
| if (ext->command) |
| { |
| using_execute = true; |
| using_location = false; |
| } |
| else |
| { |
| using_execute = false; |
| using_location = true; |
| } |
| |
| /* is this an EXECUTE table or a LOCATION (URI) table */ |
| if (ext->command && !gp_external_enable_exec) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_FEATURE_NOT_CONFIGURED), /* any better errcode? */ |
| errmsg("using external tables with OS level commands (EXECUTE clause) is disabled"), |
| errhint("To enable set gp_external_enable_exec=on."))); |
| } |
| |
| /* various validations */ |
| if (ext->iswritable) |
| ereport(ERROR, |
| (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
| errmsg("cannot read from a WRITABLE external table"), |
| errhint("Create the table as READABLE instead."))); |
| |
| /* |
| * take a peek at the first URI so we know which protocol we'll deal with |
| */ |
| if (!using_execute) |
| { |
| char *first_uri_str; |
| |
| first_uri_str = strVal(linitial(ext->urilocations)); |
| uri = ParseExternalTableUri(first_uri_str); |
| } |
| else |
| uri = NULL; |
| |
| /* get the ON clause information, and restrict 'ON COORDINATOR' to custom |
| * protocols only */ |
| on_clause = (char *) strVal(linitial(ext->execlocations)); |
| if ((strcmp(on_clause, "COORDINATOR_ONLY") == 0) |
| && using_location && (uri->protocol != URI_CUSTOM)) { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), |
| errmsg("\'ON COORDINATOR\' is not supported by this protocol yet"))); |
| } |
| |
| /* get the total valid primary segdb count */ |
| db_info = cdbcomponent_getCdbComponents(); |
| total_primaries = 0; |
| for (i = 0; i < db_info->total_segment_dbs; i++) |
| { |
| CdbComponentDatabaseInfo *p = &db_info->segment_db_info[i]; |
| |
| if (SEGMENT_IS_ACTIVE_PRIMARY(p)) |
| total_primaries++; |
| } |
| /* In single node mode, we only have one primary which is coordinator. */ |
| if (IS_SINGLENODE()) |
| total_primaries = 1; |
| |
| /* |
| * initialize a file-to-segdb mapping. segdb_file_map string array indexes |
| * segindex and the entries are the external file path is assigned to this |
| * segment database. For example if segdb_file_map[2] has "/tmp/emp.1" then |
| * this file is assigned to primary segdb 2. if an entry has NULL then |
| * that segdb isn't assigned any file. |
| */ |
| segdb_file_map = (char **) palloc0(total_primaries * sizeof(char *)); |
| |
| /* |
| * Now we do the actual assignment of work to the segment databases (where |
| * work is either a URI to open or a command to execute). Due to the big |
| * differences between the different protocols we handle each one |
| * separately. Unfortunately this means some code duplication, but keeping |
| * this separation makes the code much more understandable and (even) more |
| * maintainable. |
| * |
| * Outline of the following code blocks (from simplest to most complex): |
| * (only one of these will get executed for a statement) |
| * |
| * 1) segment mapping for tables with LOCATION http:// or file:// . |
| * |
| * These two protocols are very similar in that they enforce a |
| * 1-URI:1-segdb relationship. The only difference between them is that |
| * file:// URI must be assigned to a segdb on a host that is local to that |
| * URI. |
| * |
| * 2) segment mapping for tables with LOCATION gpfdist(s):// or custom |
| * protocol |
| * |
| * This protocol is more complicated - in here we usually duplicate the |
| * user supplied gpfdist(s):// URIs until there is one available to every |
| * segdb. However, in some cases (as determined by gp_external_max_segs |
| * GUC) we don't want to use *all* segdbs but instead figure out how many |
| * and pick them randomly (this is mainly for better performance and |
| * resource mgmt). |
| * |
| * 3) segment mapping for tables with EXECUTE 'cmd' ON. |
| * |
| * In here we don't have URI's. We have a single command string and a |
| * specification of the segdb granularity it should get executed on (the |
| * ON clause). Depending on the ON clause specification we could go many |
| * different ways, for example: assign the command to all segdb, or one |
| * command per host, or assign to 5 random segments, etc... |
| */ |
| |
| /* (1) */ |
| if (using_location && (uri->protocol == URI_FILE || uri->protocol == URI_HTTP)) |
| { |
| /* |
| * extract file path and name from URI strings and assign them a |
| * primary segdb |
| */ |
| foreach(c, ext->urilocations) |
| { |
| const char *uri_str = (char *) strVal(lfirst(c)); |
| |
| uri = ParseExternalTableUri(uri_str); |
| |
| found_candidate = false; |
| found_match = false; |
| |
| /* |
| * look through our segment database list and try to find a |
| * database that can handle this uri. |
| */ |
| for (i = 0; i < db_info->total_segment_dbs && !found_match; i++) |
| { |
| CdbComponentDatabaseInfo *p = &db_info->segment_db_info[i]; |
| int segind = p->config->segindex; |
| |
| /* |
| * Assign mapping of external file to this segdb only if: |
| * 1) This segdb is a valid primary. |
| * 2) An external file wasn't already assigned to it. |
| * 3) If 'file' protocol, host of segdb and file must be |
| * the same. |
| * |
| * This logic also guarantees that file that appears first in |
| * the external location list for the same host gets assigned |
| * the segdb with the lowest index for this host. |
| */ |
| if (SEGMENT_IS_ACTIVE_PRIMARY(p)) |
| { |
| if (uri->protocol == URI_FILE) |
| { |
| if (pg_strcasecmp(uri->hostname, p->config->hostname) != 0 && pg_strcasecmp(uri->hostname, p->config->address) != 0) |
| continue; |
| } |
| |
| /* a valid primary segdb exist on this host */ |
| found_candidate = true; |
| |
| if (segdb_file_map[segind] == NULL) |
| { |
| /* segdb not taken yet. assign this URI to this segdb */ |
| segdb_file_map[segind] = pstrdup(uri_str); |
| found_match = true; |
| } |
| |
| /* |
| * too bad. this segdb already has an external source |
| * assigned |
| */ |
| } |
| } |
| |
| /* If we're in SingleNode mode, we can only handle one external location. */ |
| if (IS_SINGLENODE() && segdb_file_map[0] == NULL) |
| { |
| segdb_file_map[0] = pstrdup(uri_str); |
| found_match = true; |
| } |
| |
| /* |
| * We failed to find a segdb for this URI. |
| */ |
| if (!found_match) |
| { |
| if (uri->protocol == URI_FILE) |
| { |
| if (found_candidate) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TABLE_DEFINITION), |
| errmsg("could not assign a segment database for \"%s\"", |
| uri_str), |
| errdetail("There are more external files than primary segment databases on host \"%s\"", |
| uri->hostname))); |
| } |
| else |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TABLE_DEFINITION), |
| errmsg("could not assign a segment database for \"%s\"", |
| uri_str), |
| errdetail("There isn't a valid primary segment database on host \"%s\"", |
| uri->hostname))); |
| } |
| } |
| else /* HTTP */ |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TABLE_DEFINITION), |
| errmsg("could not assign a segment database for \"%s\"", |
| uri_str), |
| errdetail("There are more URIs than total primary segment databases"))); |
| } |
| } |
| } |
| |
| |
| } |
| /* (2) */ |
| else if (using_location && (uri->protocol == URI_GPFDIST || |
| uri->protocol == URI_GPFDISTS || |
| uri->protocol == URI_CUSTOM)) |
| { |
| if ((strcmp(on_clause, "COORDINATOR_ONLY") == 0 || Gp_role == GP_ROLE_UTILITY) && (uri->protocol == URI_CUSTOM)) |
| { |
| const char *uri_str = strVal(linitial(ext->urilocations)); |
| segdb_file_map[0] = pstrdup(uri_str); |
| *ismasteronly = true; |
| } |
| else |
| { |
| /* We currently don't support gpfdist in SINGLE NODE mode. */ |
| if (Gp_role == GP_ROLE_UTILITY) |
| { |
| ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("gpfdist is not supported in single node mode"))); |
| } |
| /* |
| * Re-write the location list for GPFDIST or GPFDISTS before mapping to segments. |
| * |
| * If we happen to be dealing with URI's with the 'gpfdist' (or 'gpfdists') protocol |
| * we do an extra step here. |
| * |
| * (*) We modify the urilocationlist so that every |
| * primary segdb will get a URI (therefore we duplicate the existing |
| * URI's until the list is of size = total_primaries). |
| * Example: 2 URIs, 7 total segdbs. |
| * Original LocationList: URI1->URI2 |
| * Modified LocationList: URI1->URI2->URI1->URI2->URI1->URI2->URI1 |
| * |
| * (**) We also make sure that we don't allocate more segdbs than |
| * (# of URIs x gp_external_max_segs). |
| * Example: 2 URIs, 7 total segdbs, gp_external_max_segs = 3 |
| * Original LocationList: URI1->URI2 |
| * Modified LocationList: URI1->URI2->URI1->URI2->URI1->URI2 (6 total). |
| * |
| * (***) In that case that we need to allocate only a subset of primary |
| * segdbs and not all we then also create a random map of segments to skip. |
| * Using the previous example a we create a map of 7 entries and need to |
| * randomly select 1 segdb to skip (7 - 6 = 1). so it may look like this: |
| * [F F T F F F F] - in which case we know to skip the 3rd segment only. |
| */ |
| |
| /* total num of segs that will participate in the external operation */ |
| num_segs_participating = total_primaries; |
| |
| /* max num of segs that are allowed to participate in the operation */ |
| if ((uri->protocol == URI_GPFDIST) || (uri->protocol == URI_GPFDISTS)) |
| { |
| max_participants_allowed = list_length(ext->urilocations) * |
| gp_external_max_segs; |
| } |
| else |
| { |
| /* |
| * for custom protocol, set max_participants_allowed to |
| * num_segs_participating so that assignment to segments will use |
| * all available segments |
| */ |
| max_participants_allowed = num_segs_participating; |
| } |
| |
| elog(DEBUG5, |
| "num_segs_participating = %d. max_participants_allowed = %d. number of URIs = %d", |
| num_segs_participating, max_participants_allowed, list_length(ext->urilocations)); |
| |
| /* see (**) above */ |
| if (num_segs_participating > max_participants_allowed) |
| { |
| total_to_skip = num_segs_participating - max_participants_allowed; |
| num_segs_participating = max_participants_allowed; |
| should_skip_randomly = true; |
| |
| elog(NOTICE, "External scan %s will utilize %d out " |
| "of %d segment databases", |
| (uri->protocol == URI_GPFDIST ? "from gpfdist(s) server" : "using custom protocol"), |
| num_segs_participating, |
| total_primaries); |
| } |
| |
| if (list_length(ext->urilocations) > num_segs_participating) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TABLE_DEFINITION), |
| errmsg("there are more external files (URLs) than primary segments that can read them"), |
| errdetail("Found %d URLs and %d primary segments.", |
| list_length(ext->urilocations), |
| num_segs_participating))); |
| |
| /* |
| * restart location list and fill in new list until number of |
| * locations equals the number of segments participating in this |
| * action (see (*) above for more details). |
| */ |
| while (!done) |
| { |
| foreach(c, ext->urilocations) |
| { |
| char *uri_str = (char *) strVal(lfirst(c)); |
| |
| /* append to a list of Value nodes, size nelems */ |
| modifiedloclist = lappend(modifiedloclist, makeString(pstrdup(uri_str))); |
| |
| if (list_length(modifiedloclist) == num_segs_participating) |
| { |
| done = true; |
| break; |
| } |
| |
| if (list_length(modifiedloclist) > num_segs_participating) |
| { |
| elog(ERROR, "External scan location list failed building distribution."); |
| } |
| } |
| } |
| |
| /* See (***) above for details */ |
| if (should_skip_randomly) |
| skip_map = makeRandomSegMap(total_primaries, total_to_skip); |
| |
| /* |
| * assign each URI from the new location list a primary segdb |
| */ |
| foreach(c, modifiedloclist) |
| { |
| const char *uri_str = strVal(lfirst(c)); |
| |
| uri = ParseExternalTableUri(uri_str); |
| |
| found_match = false; |
| |
| /* |
| * look through our segment database list and try to find a |
| * database that can handle this uri. |
| */ |
| for (i = 0; i < db_info->total_segment_dbs && !found_match; i++) |
| { |
| CdbComponentDatabaseInfo *p = &db_info->segment_db_info[i]; |
| int segind = p->config->segindex; |
| |
| /* |
| * Assign mapping of external file to this segdb only if: |
| * 1) This segdb is a valid primary. |
| * 2) An external file wasn't already assigned to it. |
| */ |
| if (SEGMENT_IS_ACTIVE_PRIMARY(p)) |
| { |
| /* |
| * skip this segdb if skip_map for this seg index tells us |
| * to skip it (set to 'true'). |
| */ |
| if (should_skip_randomly) |
| { |
| Assert(segind < total_primaries); |
| |
| if (skip_map[segind]) |
| continue; /* skip it */ |
| } |
| |
| if (segdb_file_map[segind] == NULL) |
| { |
| /* segdb not taken yet. assign this URI to this segdb */ |
| segdb_file_map[segind] = pstrdup(uri_str); |
| found_match = true; |
| } |
| |
| /* |
| * too bad. this segdb already has an external source |
| * assigned |
| */ |
| } |
| } |
| |
| /* We failed to find a segdb for this gpfdist(s) URI */ |
| if (!found_match) |
| { |
| /* should never happen */ |
| elog(LOG, |
| "external tables gpfdist(s) allocation error. " |
| "total_primaries: %d, num_segs_participating %d " |
| "max_participants_allowed %d, total_to_skip %d", |
| total_primaries, num_segs_participating, |
| max_participants_allowed, total_to_skip); |
| |
| elog(ERROR, |
| "internal error in createplan for external tables when trying to assign segments for gpfdist(s)"); |
| } |
| } |
| } |
| } |
| /* (3) */ |
| else if (using_execute) |
| { |
| const char *command = ext->command; |
| const char *prefix = "execute:"; |
| char *prefixed_command; |
| |
| /* build the command string for the executor - 'execute:command' */ |
| StringInfo buf = makeStringInfo(); |
| |
| appendStringInfo(buf, "%s%s", prefix, command); |
| prefixed_command = pstrdup(buf->data); |
| |
| pfree(buf->data); |
| pfree(buf); |
| buf = NULL; |
| |
| /* |
| * Now we handle each one of the ON locations separately: |
| * |
| * 1) all segs |
| * 2) one per host |
| * 3) all segs on host <foo> |
| * 4) seg <n> only |
| * 5) <n> random segs |
| * 6) master only |
| */ |
| if (strcmp(on_clause, "ALL_SEGMENTS") == 0) |
| { |
| /* all segments get a copy of the command to execute */ |
| |
| for (i = 0; i < db_info->total_segment_dbs; i++) |
| { |
| CdbComponentDatabaseInfo *p = &db_info->segment_db_info[i]; |
| int segind = p->config->segindex; |
| |
| if (SEGMENT_IS_ACTIVE_PRIMARY(p)) |
| segdb_file_map[segind] = pstrdup(prefixed_command); |
| } |
| |
| } |
| else if (strcmp(on_clause, "PER_HOST") == 0) |
| { |
| /* 1 seg per host */ |
| |
| List *visited_hosts = NIL; |
| ListCell *lc; |
| |
| for (i = 0; i < db_info->total_segment_dbs; i++) |
| { |
| CdbComponentDatabaseInfo *p = &db_info->segment_db_info[i]; |
| int segind = p->config->segindex; |
| |
| if (SEGMENT_IS_ACTIVE_PRIMARY(p)) |
| { |
| bool host_taken = false; |
| |
| foreach(lc, visited_hosts) |
| { |
| const char *hostname = strVal(lfirst(lc)); |
| |
| if (pg_strcasecmp(hostname, p->config->hostname) == 0) |
| { |
| host_taken = true; |
| break; |
| } |
| } |
| |
| /* |
| * if not assigned to a seg on this host before - do it |
| * now and add this hostname to the list so that we don't |
| * use segs on this host again. |
| */ |
| if (!host_taken) |
| { |
| segdb_file_map[segind] = pstrdup(prefixed_command); |
| visited_hosts = lappend(visited_hosts, |
| makeString(pstrdup(p->config->hostname))); |
| } |
| } |
| } |
| } |
| else if (strncmp(on_clause, "HOST:", strlen("HOST:")) == 0) |
| { |
| /* all segs on the specified host get copy of the command */ |
| char *hostname = on_clause + strlen("HOST:"); |
| bool match_found = false; |
| |
| for (i = 0; i < db_info->total_segment_dbs; i++) |
| { |
| CdbComponentDatabaseInfo *p = &db_info->segment_db_info[i]; |
| int segind = p->config->segindex; |
| |
| if (SEGMENT_IS_ACTIVE_PRIMARY(p) && |
| pg_strcasecmp(hostname, p->config->hostname) == 0) |
| { |
| segdb_file_map[segind] = pstrdup(prefixed_command); |
| match_found = true; |
| } |
| } |
| |
| if (!match_found) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TABLE_DEFINITION), |
| errmsg("could not assign a segment database for command \"%s\")", |
| command), |
| errdetail("No valid primary segment was found in the requested host name \"%s\".", |
| hostname))); |
| } |
| else if (strncmp(on_clause, "SEGMENT_ID:", strlen("SEGMENT_ID:")) == 0) |
| { |
| /* 1 seg with specified id gets a copy of the command */ |
| int target_segid = atoi(on_clause + strlen("SEGMENT_ID:")); |
| bool match_found = false; |
| |
| for (i = 0; i < db_info->total_segment_dbs; i++) |
| { |
| CdbComponentDatabaseInfo *p = &db_info->segment_db_info[i]; |
| int segind = p->config->segindex; |
| |
| if (SEGMENT_IS_ACTIVE_PRIMARY(p) && segind == target_segid) |
| { |
| segdb_file_map[segind] = pstrdup(prefixed_command); |
| match_found = true; |
| } |
| } |
| |
| if (!match_found) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TABLE_DEFINITION), |
| errmsg("could not assign a segment database for command \"%s\"", |
| command), |
| errdetail("The requested segment id %d is not a valid primary segment or doesn't exist in the database", |
| target_segid))); |
| } |
| else if (strncmp(on_clause, "TOTAL_SEGS:", strlen("TOTAL_SEGS:")) == 0) |
| { |
| /* total n segments selected randomly */ |
| |
| int num_segs_to_use = atoi(on_clause + strlen("TOTAL_SEGS:")); |
| |
| if (num_segs_to_use > total_primaries) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TABLE_DEFINITION), |
| errmsg("table defined with EXECUTE ON %d but there are only %d valid primary segments in the database", |
| num_segs_to_use, total_primaries))); |
| |
| total_to_skip = total_primaries - num_segs_to_use; |
| skip_map = makeRandomSegMap(total_primaries, total_to_skip); |
| |
| for (i = 0; i < db_info->total_segment_dbs; i++) |
| { |
| CdbComponentDatabaseInfo *p = &db_info->segment_db_info[i]; |
| int segind = p->config->segindex; |
| |
| if (SEGMENT_IS_ACTIVE_PRIMARY(p)) |
| { |
| Assert(segind < total_primaries); |
| if (skip_map[segind]) |
| continue; /* skip it */ |
| |
| segdb_file_map[segind] = pstrdup(prefixed_command); |
| } |
| } |
| } |
| else if (strcmp(on_clause, "COORDINATOR_ONLY") == 0) |
| { |
| /* |
| * store the command in first array entry and indicate that it is |
| * meant for the master segment (not seg o). |
| */ |
| segdb_file_map[0] = pstrdup(prefixed_command); |
| *ismasteronly = true; |
| } |
| else |
| { |
| elog(ERROR, "Internal error in createplan for external tables: got invalid ON clause code %s", |
| on_clause); |
| } |
| } |
| else |
| { |
| /* should never get here */ |
| elog(ERROR, "Internal error in createplan for external tables"); |
| } |
| |
| /* |
| * convert array map to a list so it can be serialized as part of the plan |
| */ |
| filenames = NIL; |
| for (i = 0; i < total_primaries; i++) |
| { |
| if (segdb_file_map[i] != NULL) |
| filenames = lappend(filenames, makeString(segdb_file_map[i])); |
| else |
| { |
| /* no file for this segdb. add a null entry */ |
| String *n = makeNode(String); |
| |
| filenames = lappend(filenames, n); |
| } |
| } |
| |
| return filenames; |
| } |