| /*------------------------------------------------------------------------- |
| * |
| * postgres_fdw.c |
| * Foreign-data wrapper for remote PostgreSQL servers |
| * |
| * Portions Copyright (c) 2012-2023, PostgreSQL Global Development Group |
| * |
| * IDENTIFICATION |
| * contrib/postgres_fdw/postgres_fdw.c |
| * |
| *------------------------------------------------------------------------- |
| */ |
| #include "postgres.h" |
| |
| #include <limits.h> |
| |
| #include "access/htup_details.h" |
| #include "access/sysattr.h" |
| #include "access/table.h" |
| #include "catalog/pg_class.h" |
| #include "catalog/pg_opfamily.h" |
| #include "commands/defrem.h" |
| #include "commands/explain.h" |
| #include "commands/vacuum.h" |
| #include "executor/execAsync.h" |
| #include "foreign/fdwapi.h" |
| #include "funcapi.h" |
| #include "miscadmin.h" |
| #include "nodes/makefuncs.h" |
| #include "nodes/nodeFuncs.h" |
| #include "optimizer/appendinfo.h" |
| #include "optimizer/clauses.h" |
| #include "optimizer/cost.h" |
| #include "optimizer/inherit.h" |
| #include "optimizer/optimizer.h" |
| #include "optimizer/pathnode.h" |
| #include "optimizer/paths.h" |
| #include "optimizer/planmain.h" |
| #include "optimizer/prep.h" |
| #include "optimizer/restrictinfo.h" |
| #include "optimizer/tlist.h" |
| #include "parser/parsetree.h" |
| #include "postgres_fdw.h" |
| #include "storage/latch.h" |
| #include "utils/builtins.h" |
| #include "utils/float.h" |
| #include "utils/guc.h" |
| #include "utils/lsyscache.h" |
| #include "utils/memutils.h" |
| #include "utils/rel.h" |
| #include "utils/sampling.h" |
| #include "utils/selfuncs.h" |
| |
/* Standard PostgreSQL loadable-module marker for this library. */
PG_MODULE_MAGIC;
| |
/* CPU cost charged, by default, for starting up a foreign query. */
#define DEFAULT_FDW_STARTUP_COST	100.0

/*
 * CPU cost charged, by default, for processing one row; this is on top of
 * cpu_tuple_cost.
 */
#define DEFAULT_FDW_TUPLE_COST		0.01

/* Without remote estimates, a sort is assumed to cost 20% extra. */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
| |
/*
 * Positions of the FDW-private items kept in fdw_private lists.
 *
 * Each enumerator of FdwScanPrivateIndex is the list position of one item,
 * so list_nth() fetches it directly.  The SELECT statement, for instance:
 *		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
 */
enum FdwScanPrivateIndex
{
	FdwScanPrivateSelectSql = 0,	/* String node: SQL to execute remotely */
	FdwScanPrivateRetrievedAttrs,	/* Integer list: attnums the SELECT
									 * retrieves */
	FdwScanPrivateFetchSize,		/* Integer: the desired fetch_size */

	/*
	 * String describing the join, i.e. the names of the relations being
	 * joined and the join types; added when the scan is a join.
	 */
	FdwScanPrivateRelations
};
| |
/*
 * Positions of the items kept in the fdw_private list of a ModifyTable node
 * that references a postgres_fdw foreign table.  In list order:
 *
 * 1) text of the INSERT/UPDATE/DELETE statement sent to the remote server
 * 2) integer list of target attribute numbers for INSERT/UPDATE
 *	  (NIL for a DELETE)
 * 3) length up to the end of the VALUES clause for INSERT
 *	  (-1 for a DELETE/UPDATE)
 * 4) boolean flag: does the remote query have a RETURNING clause?
 * 5) integer list of attribute numbers retrieved by RETURNING, if any
 */
enum FdwModifyPrivateIndex
{
	FdwModifyPrivateUpdateSql = 0,	/* String node: SQL to execute remotely */
	FdwModifyPrivateTargetAttnums,	/* Integer list: INSERT/UPDATE target
									 * attnums */
	FdwModifyPrivateLen,			/* Integer node: length up to the end of
									 * the VALUES clause */
	FdwModifyPrivateHasReturning,	/* Boolean node: has-returning flag */
	FdwModifyPrivateRetrievedAttrs	/* Integer list: attnums retrieved by
									 * RETURNING */
};
| |
/*
 * Positions of the items kept in the fdw_private list of a ForeignScan node
 * that modifies a foreign table directly.  In list order:
 *
 * 1) text of the UPDATE/DELETE statement sent to the remote server
 * 2) boolean flag: does the remote query have a RETURNING clause?
 * 3) integer list of attribute numbers retrieved by RETURNING, if any
 * 4) boolean flag: do we set the command es_processed?
 */
enum FdwDirectModifyPrivateIndex
{
	FdwDirectModifyPrivateUpdateSql = 0,	/* String node: SQL to execute
											 * remotely */
	FdwDirectModifyPrivateHasReturning,		/* Boolean node: has-returning
											 * flag */
	FdwDirectModifyPrivateRetrievedAttrs,	/* Integer list: attnums
											 * retrieved by RETURNING */
	FdwDirectModifyPrivateSetProcessed		/* Boolean node: set-processed
											 * flag */
};
| |
| /* |
| * Execution state of a foreign scan using postgres_fdw. |
| */ |
| typedef struct PgFdwScanState |
| { |
| Relation rel; /* relcache entry for the foreign table. NULL |
| * for a foreign join scan. */ |
| TupleDesc tupdesc; /* tuple descriptor of scan */ |
| AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ |
| |
| /* extracted fdw_private data */ |
| char *query; /* text of SELECT command */ |
| List *retrieved_attrs; /* list of retrieved attribute numbers */ |
| |
| /* for remote query execution */ |
| PGconn *conn; /* connection for the scan */ |
| PgFdwConnState *conn_state; /* extra per-connection state */ |
| unsigned int cursor_number; /* quasi-unique ID for my cursor */ |
| bool cursor_exists; /* have we created the cursor? */ |
| int numParams; /* number of parameters passed to query */ |
| FmgrInfo *param_flinfo; /* output conversion functions for them */ |
| List *param_exprs; /* executable expressions for param values */ |
| const char **param_values; /* textual values of query parameters */ |
| |
| /* for storing result tuples */ |
| HeapTuple *tuples; /* array of currently-retrieved tuples */ |
| int num_tuples; /* # of tuples in array */ |
| int next_tuple; /* index of next one to return */ |
| |
| /* batch-level state, for optimizing rewinds and avoiding useless fetch */ |
| int fetch_ct_2; /* Min(# of fetches done, 2) */ |
| bool eof_reached; /* true if last fetch reached EOF */ |
| |
| /* for asynchronous execution */ |
| bool async_capable; /* engage asynchronous-capable logic? */ |
| |
| /* working memory contexts */ |
| MemoryContext batch_cxt; /* context holding current batch of tuples */ |
| MemoryContext temp_cxt; /* context for per-tuple temporary data */ |
| |
| int fetch_size; /* number of tuples per fetch */ |
| } PgFdwScanState; |
| |
| /* |
| * Execution state of a foreign insert/update/delete operation. |
| */ |
| typedef struct PgFdwModifyState |
| { |
| Relation rel; /* relcache entry for the foreign table */ |
| AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ |
| |
| /* for remote query execution */ |
| PGconn *conn; /* connection for the scan */ |
| PgFdwConnState *conn_state; /* extra per-connection state */ |
| char *p_name; /* name of prepared statement, if created */ |
| |
| /* extracted fdw_private data */ |
| char *query; /* text of INSERT/UPDATE/DELETE command */ |
| char *orig_query; /* original text of INSERT command */ |
| List *target_attrs; /* list of target attribute numbers */ |
| int values_end; /* length up to the end of VALUES */ |
| int batch_size; /* value of FDW option "batch_size" */ |
| bool has_returning; /* is there a RETURNING clause? */ |
| List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ |
| |
| /* info about parameters for prepared statement */ |
| AttrNumber ctidAttno; /* attnum of input resjunk ctid column */ |
| int p_nums; /* number of parameters to transmit */ |
| FmgrInfo *p_flinfo; /* output conversion functions for them */ |
| |
| /* batch operation stuff */ |
| int num_slots; /* number of slots to insert */ |
| |
| /* working memory context */ |
| MemoryContext temp_cxt; /* context for per-tuple temporary data */ |
| |
| /* for update row movement if subplan result rel */ |
| struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if |
| * created */ |
| } PgFdwModifyState; |
| |
| /* |
| * Execution state of a foreign scan that modifies a foreign table directly. |
| */ |
| typedef struct PgFdwDirectModifyState |
| { |
| Relation rel; /* relcache entry for the foreign table */ |
| AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ |
| |
| /* extracted fdw_private data */ |
| char *query; /* text of UPDATE/DELETE command */ |
| bool has_returning; /* is there a RETURNING clause? */ |
| List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ |
| bool set_processed; /* do we set the command es_processed? */ |
| |
| /* for remote query execution */ |
| PGconn *conn; /* connection for the update */ |
| PgFdwConnState *conn_state; /* extra per-connection state */ |
| int numParams; /* number of parameters passed to query */ |
| FmgrInfo *param_flinfo; /* output conversion functions for them */ |
| List *param_exprs; /* executable expressions for param values */ |
| const char **param_values; /* textual values of query parameters */ |
| |
| /* for storing result tuples */ |
| PGresult *result; /* result for query */ |
| int num_tuples; /* # of result tuples */ |
| int next_tuple; /* index of next one to return */ |
| Relation resultRel; /* relcache entry for the target relation */ |
| AttrNumber *attnoMap; /* array of attnums of input user columns */ |
| AttrNumber ctidAttno; /* attnum of input ctid column */ |
| AttrNumber oidAttno; /* attnum of input oid column */ |
| bool hasSystemCols; /* are there system columns of resultRel? */ |
| |
| /* working memory context */ |
| MemoryContext temp_cxt; /* context for per-tuple temporary data */ |
| } PgFdwDirectModifyState; |
| |
| /* |
| * Workspace for analyzing a foreign table. |
| */ |
| typedef struct PgFdwAnalyzeState |
| { |
| Relation rel; /* relcache entry for the foreign table */ |
| AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ |
| List *retrieved_attrs; /* attr numbers retrieved by query */ |
| |
| /* collected sample rows */ |
| HeapTuple *rows; /* array of size targrows */ |
| int targrows; /* target # of sample rows */ |
| int numrows; /* # of sample rows collected */ |
| |
| /* for random sampling */ |
| double samplerows; /* # of rows fetched */ |
| double rowstoskip; /* # of rows to skip before next sample */ |
| ReservoirStateData rstate; /* state for reservoir sampling */ |
| |
| /* working memory contexts */ |
| MemoryContext anl_cxt; /* context for per-analyze lifespan data */ |
| MemoryContext temp_cxt; /* context for per-tuple temporary data */ |
| } PgFdwAnalyzeState; |
| |
/*
 * Positions of the items kept in the fdw_private list of a ForeignPath.
 * In list order:
 *
 * 1) boolean flag: does the remote query have the final sort?
 * 2) boolean flag: does the remote query have the LIMIT clause?
 */
enum FdwPathPrivateIndex
{
	FdwPathPrivateHasFinalSort = 0, /* Boolean node: has-final-sort flag */
	FdwPathPrivateHasLimit			/* Boolean node: has-limit flag */
};
| |
| /* Struct for extra information passed to estimate_path_cost_size() */ |
| typedef struct |
| { |
| PathTarget *target; |
| bool has_final_sort; |
| bool has_limit; |
| double limit_tuples; |
| int64 count_est; |
| int64 offset_est; |
| } PgFdwPathExtraData; |
| |
| /* |
| * Identify the attribute where data conversion fails. |
| */ |
| typedef struct ConversionLocation |
| { |
| AttrNumber cur_attno; /* attribute number being processed, or 0 */ |
| Relation rel; /* foreign table being processed, or NULL */ |
| ForeignScanState *fsstate; /* plan node being processed, or NULL */ |
| } ConversionLocation; |
| |
| /* Callback argument for ec_member_matches_foreign */ |
| typedef struct |
| { |
| Expr *current; /* current expr, or NULL if not yet found */ |
| List *already_used; /* expressions already dealt with */ |
| } ec_member_foreign_arg; |
| |
/*
 * SQL-callable functions defined in this file
 */
PG_FUNCTION_INFO_V1(postgres_fdw_handler);
| |
| /* |
| * FDW callback routines |
| */ |
| static void postgresGetForeignRelSize(PlannerInfo *root, |
| RelOptInfo *baserel, |
| Oid foreigntableid); |
| static void postgresGetForeignPaths(PlannerInfo *root, |
| RelOptInfo *baserel, |
| Oid foreigntableid); |
| static ForeignScan *postgresGetForeignPlan(PlannerInfo *root, |
| RelOptInfo *foreignrel, |
| Oid foreigntableid, |
| ForeignPath *best_path, |
| List *tlist, |
| List *scan_clauses, |
| Plan *outer_plan); |
| static void postgresBeginForeignScan(ForeignScanState *node, int eflags); |
| static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node); |
| static void postgresReScanForeignScan(ForeignScanState *node); |
| static void postgresEndForeignScan(ForeignScanState *node); |
| static void postgresAddForeignUpdateTargets(PlannerInfo *root, |
| Index rtindex, |
| RangeTblEntry *target_rte, |
| Relation target_relation); |
| static List *postgresPlanForeignModify(PlannerInfo *root, |
| ModifyTable *plan, |
| Index resultRelation, |
| int subplan_index); |
| static void postgresBeginForeignModify(ModifyTableState *mtstate, |
| ResultRelInfo *resultRelInfo, |
| List *fdw_private, |
| int subplan_index, |
| int eflags); |
| static TupleTableSlot *postgresExecForeignInsert(EState *estate, |
| ResultRelInfo *resultRelInfo, |
| TupleTableSlot *slot, |
| TupleTableSlot *planSlot); |
| static TupleTableSlot **postgresExecForeignBatchInsert(EState *estate, |
| ResultRelInfo *resultRelInfo, |
| TupleTableSlot **slots, |
| TupleTableSlot **planSlots, |
| int *numSlots); |
| static int postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo); |
| static TupleTableSlot *postgresExecForeignUpdate(EState *estate, |
| ResultRelInfo *resultRelInfo, |
| TupleTableSlot *slot, |
| TupleTableSlot *planSlot); |
| static TupleTableSlot *postgresExecForeignDelete(EState *estate, |
| ResultRelInfo *resultRelInfo, |
| TupleTableSlot *slot, |
| TupleTableSlot *planSlot); |
| static void postgresEndForeignModify(EState *estate, |
| ResultRelInfo *resultRelInfo); |
| static void postgresBeginForeignInsert(ModifyTableState *mtstate, |
| ResultRelInfo *resultRelInfo); |
| static void postgresEndForeignInsert(EState *estate, |
| ResultRelInfo *resultRelInfo); |
| static int postgresIsForeignRelUpdatable(Relation rel); |
| static bool postgresPlanDirectModify(PlannerInfo *root, |
| ModifyTable *plan, |
| Index resultRelation, |
| int subplan_index); |
| static void postgresBeginDirectModify(ForeignScanState *node, int eflags); |
| static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node); |
| static void postgresEndDirectModify(ForeignScanState *node); |
| static void postgresExplainForeignScan(ForeignScanState *node, |
| ExplainState *es); |
| static void postgresExplainForeignModify(ModifyTableState *mtstate, |
| ResultRelInfo *rinfo, |
| List *fdw_private, |
| int subplan_index, |
| ExplainState *es); |
| static void postgresExplainDirectModify(ForeignScanState *node, |
| ExplainState *es); |
| static void postgresExecForeignTruncate(List *rels, |
| DropBehavior behavior, |
| bool restart_seqs); |
| static bool postgresAnalyzeForeignTable(Relation relation, |
| AcquireSampleRowsFunc *func, |
| BlockNumber *totalpages); |
| static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, |
| Oid serverOid); |
| static void postgresGetForeignJoinPaths(PlannerInfo *root, |
| RelOptInfo *joinrel, |
| RelOptInfo *outerrel, |
| RelOptInfo *innerrel, |
| JoinType jointype, |
| JoinPathExtraData *extra); |
| static bool postgresRecheckForeignScan(ForeignScanState *node, |
| TupleTableSlot *slot); |
| static void postgresGetForeignUpperPaths(PlannerInfo *root, |
| UpperRelationKind stage, |
| RelOptInfo *input_rel, |
| RelOptInfo *output_rel, |
| void *extra); |
| static bool postgresIsForeignPathAsyncCapable(ForeignPath *path); |
| static void postgresForeignAsyncRequest(AsyncRequest *areq); |
| static void postgresForeignAsyncConfigureWait(AsyncRequest *areq); |
| static void postgresForeignAsyncNotify(AsyncRequest *areq); |
| |
| static int greenplumCheckIsCloudberry(UserMapping *user); |
| |
| /* |
| * Helper functions |
| */ |
| static void estimate_path_cost_size(PlannerInfo *root, |
| RelOptInfo *foreignrel, |
| List *param_join_conds, |
| List *pathkeys, |
| PgFdwPathExtraData *fpextra, |
| double *p_rows, int *p_width, |
| Cost *p_startup_cost, Cost *p_total_cost); |
| static void get_remote_estimate(const char *sql, |
| PGconn *conn, |
| double *rows, |
| int *width, |
| Cost *startup_cost, |
| Cost *total_cost); |
| static void adjust_foreign_grouping_path_cost(PlannerInfo *root, |
| List *pathkeys, |
| double retrieved_rows, |
| double width, |
| double limit_tuples, |
| Cost *p_startup_cost, |
| Cost *p_run_cost); |
| static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, |
| EquivalenceClass *ec, EquivalenceMember *em, |
| void *arg); |
| static void create_cursor(ForeignScanState *node); |
| static void fetch_more_data(ForeignScanState *node); |
| static void close_cursor(PGconn *conn, unsigned int cursor_number, |
| PgFdwConnState *conn_state); |
| static PgFdwModifyState *create_foreign_modify(EState *estate, |
| RangeTblEntry *rte, |
| ResultRelInfo *resultRelInfo, |
| CmdType operation, |
| Plan *subplan, |
| char *query, |
| List *target_attrs, |
| int values_end, |
| bool has_returning, |
| List *retrieved_attrs); |
| static TupleTableSlot **execute_foreign_modify(EState *estate, |
| ResultRelInfo *resultRelInfo, |
| CmdType operation, |
| TupleTableSlot **slots, |
| TupleTableSlot **planSlots, |
| int *numSlots); |
| static void prepare_foreign_modify(PgFdwModifyState *fmstate); |
| static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, |
| ItemPointer tupleid, |
| TupleTableSlot **slots, |
| int numSlots); |
| static void store_returning_result(PgFdwModifyState *fmstate, |
| TupleTableSlot *slot, PGresult *res); |
| static void finish_foreign_modify(PgFdwModifyState *fmstate); |
| static void deallocate_query(PgFdwModifyState *fmstate); |
| static List *build_remote_returning(Index rtindex, Relation rel, |
| List *returningList); |
| static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist); |
| static void execute_dml_stmt(ForeignScanState *node); |
| static TupleTableSlot *get_returning_data(ForeignScanState *node); |
| static void init_returning_filter(PgFdwDirectModifyState *dmstate, |
| List *fdw_scan_tlist, |
| Index rtindex); |
| static TupleTableSlot *apply_returning_filter(PgFdwDirectModifyState *dmstate, |
| ResultRelInfo *resultRelInfo, |
| TupleTableSlot *slot, |
| EState *estate); |
| static void prepare_query_params(PlanState *node, |
| List *fdw_exprs, |
| int numParams, |
| FmgrInfo **param_flinfo, |
| List **param_exprs, |
| const char ***param_values); |
| static void process_query_params(ExprContext *econtext, |
| FmgrInfo *param_flinfo, |
| List *param_exprs, |
| const char **param_values); |
| static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, |
| HeapTuple *rows, int targrows, |
| double *totalrows, |
| double *totaldeadrows); |
| static void analyze_row_processor(PGresult *res, int row, |
| PgFdwAnalyzeState *astate); |
| static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch); |
| static void fetch_more_data_begin(AsyncRequest *areq); |
| static void complete_pending_request(AsyncRequest *areq); |
| static HeapTuple make_tuple_from_result_row(PGresult *res, |
| int row, |
| Relation rel, |
| AttInMetadata *attinmeta, |
| List *retrieved_attrs, |
| ForeignScanState *fsstate, |
| MemoryContext temp_context); |
| static void conversion_error_callback(void *arg); |
| static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, |
| JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel, |
| JoinPathExtraData *extra); |
| static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, |
| Node *havingQual); |
| static List *get_useful_pathkeys_for_relation(PlannerInfo *root, |
| RelOptInfo *rel); |
| static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel); |
| static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel, |
| Path *epq_path); |
| static void add_foreign_grouping_paths(PlannerInfo *root, |
| RelOptInfo *input_rel, |
| RelOptInfo *grouped_rel, |
| GroupPathExtraData *extra); |
| static void add_foreign_ordered_paths(PlannerInfo *root, |
| RelOptInfo *input_rel, |
| RelOptInfo *ordered_rel); |
| static void add_foreign_final_paths(PlannerInfo *root, |
| RelOptInfo *input_rel, |
| RelOptInfo *final_rel, |
| FinalPathExtraData *extra); |
| static void apply_server_options(PgFdwRelationInfo *fpinfo); |
| static void apply_table_options(PgFdwRelationInfo *fpinfo); |
| static void merge_fdw_options(PgFdwRelationInfo *fpinfo, |
| const PgFdwRelationInfo *fpinfo_o, |
| const PgFdwRelationInfo *fpinfo_i); |
| static int get_batch_size_option(Relation rel); |
| |
| |
| /* |
| * Foreign-data wrapper handler function: return a struct with pointers |
| * to my callback routines. |
| */ |
| Datum |
| postgres_fdw_handler(PG_FUNCTION_ARGS) |
| { |
| FdwRoutine *routine = makeNode(FdwRoutine); |
| |
| /* Functions for scanning foreign tables */ |
| routine->GetForeignRelSize = postgresGetForeignRelSize; |
| routine->GetForeignPaths = postgresGetForeignPaths; |
| routine->GetForeignPlan = postgresGetForeignPlan; |
| routine->BeginForeignScan = postgresBeginForeignScan; |
| routine->IterateForeignScan = postgresIterateForeignScan; |
| routine->ReScanForeignScan = postgresReScanForeignScan; |
| routine->EndForeignScan = postgresEndForeignScan; |
| |
| /* Functions for updating foreign tables */ |
| routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets; |
| routine->PlanForeignModify = postgresPlanForeignModify; |
| routine->BeginForeignModify = postgresBeginForeignModify; |
| routine->ExecForeignInsert = postgresExecForeignInsert; |
| routine->ExecForeignBatchInsert = postgresExecForeignBatchInsert; |
| routine->GetForeignModifyBatchSize = postgresGetForeignModifyBatchSize; |
| routine->ExecForeignUpdate = postgresExecForeignUpdate; |
| routine->ExecForeignDelete = postgresExecForeignDelete; |
| routine->EndForeignModify = postgresEndForeignModify; |
| routine->BeginForeignInsert = postgresBeginForeignInsert; |
| routine->EndForeignInsert = postgresEndForeignInsert; |
| routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable; |
| routine->PlanDirectModify = postgresPlanDirectModify; |
| routine->BeginDirectModify = postgresBeginDirectModify; |
| routine->IterateDirectModify = postgresIterateDirectModify; |
| routine->EndDirectModify = postgresEndDirectModify; |
| |
| /* Function for EvalPlanQual rechecks */ |
| routine->RecheckForeignScan = postgresRecheckForeignScan; |
| /* Support functions for EXPLAIN */ |
| routine->ExplainForeignScan = postgresExplainForeignScan; |
| routine->ExplainForeignModify = postgresExplainForeignModify; |
| routine->ExplainDirectModify = postgresExplainDirectModify; |
| |
| /* Support function for TRUNCATE */ |
| routine->ExecForeignTruncate = postgresExecForeignTruncate; |
| |
| /* Support functions for ANALYZE */ |
| routine->AnalyzeForeignTable = postgresAnalyzeForeignTable; |
| |
| /* Support functions for IMPORT FOREIGN SCHEMA */ |
| routine->ImportForeignSchema = postgresImportForeignSchema; |
| |
| /* Support functions for join push-down */ |
| routine->GetForeignJoinPaths = postgresGetForeignJoinPaths; |
| |
| /* Support functions for upper relation push-down */ |
| routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; |
| |
| /* Support functions for asynchronous execution */ |
| routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable; |
| routine->ForeignAsyncRequest = postgresForeignAsyncRequest; |
| routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait; |
| routine->ForeignAsyncNotify = postgresForeignAsyncNotify; |
| |
| PG_RETURN_POINTER(routine); |
| } |
| |
| /* |
| * postgresGetForeignRelSize |
| * Estimate # of rows and width of the result of the scan |
| * |
| * We should consider the effect of all baserestrictinfo clauses here, but |
| * not any join clauses. |
| */ |
| static void |
| postgresGetForeignRelSize(PlannerInfo *root, |
| RelOptInfo *baserel, |
| Oid foreigntableid) |
| { |
| PgFdwRelationInfo *fpinfo; |
| ListCell *lc; |
| |
| /* |
| * We use PgFdwRelationInfo to pass various information to subsequent |
| * functions. |
| */ |
| fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); |
| baserel->fdw_private = (void *) fpinfo; |
| |
| /* Base foreign tables need to be pushed down always. */ |
| fpinfo->pushdown_safe = true; |
| |
| /* Look up foreign-table catalog info. */ |
| fpinfo->table = GetForeignTable(foreigntableid); |
| fpinfo->server = GetForeignServer(fpinfo->table->serverid); |
| |
| /* |
| * Extract user-settable option values. Note that per-table settings of |
| * use_remote_estimate, fetch_size and async_capable override per-server |
| * settings of them, respectively. |
| */ |
| fpinfo->use_remote_estimate = false; |
| fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST; |
| fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST; |
| fpinfo->shippable_extensions = NIL; |
| fpinfo->fetch_size = 100; |
| fpinfo->async_capable = false; |
| |
| apply_server_options(fpinfo); |
| apply_table_options(fpinfo); |
| |
| /* |
| * If the table or the server is configured to use remote estimates, |
| * identify which user to do remote access as during planning. This |
| * should match what ExecCheckPermissions() does. If we fail due to lack |
| * of permissions, the query would have failed at runtime anyway. |
| */ |
| if (fpinfo->use_remote_estimate) |
| { |
| Oid userid; |
| |
| userid = OidIsValid(baserel->userid) ? baserel->userid : GetUserId(); |
| fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid); |
| } |
| else |
| fpinfo->user = NULL; |
| |
| /* |
| * Identify which baserestrictinfo clauses can be sent to the remote |
| * server and which can't. |
| */ |
| classifyConditions(root, baserel, baserel->baserestrictinfo, |
| &fpinfo->remote_conds, &fpinfo->local_conds); |
| |
| /* |
| * Identify which attributes will need to be retrieved from the remote |
| * server. These include all attrs needed for joins or final output, plus |
| * all attrs used in the local_conds. (Note: if we end up using a |
| * parameterized scan, it's possible that some of the join clauses will be |
| * sent to the remote and thus we wouldn't really need to retrieve the |
| * columns used in them. Doesn't seem worth detecting that case though.) |
| */ |
| fpinfo->attrs_used = NULL; |
| pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid, |
| &fpinfo->attrs_used); |
| foreach(lc, fpinfo->local_conds) |
| { |
| RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); |
| |
| pull_varattnos((Node *) rinfo->clause, baserel->relid, |
| &fpinfo->attrs_used); |
| } |
| |
| /* |
| * Compute the selectivity and cost of the local_conds, so we don't have |
| * to do it over again for each path. The best we can do for these |
| * conditions is to estimate selectivity on the basis of local statistics. |
| */ |
| fpinfo->local_conds_sel = clauselist_selectivity(root, |
| fpinfo->local_conds, |
| baserel->relid, |
| JOIN_INNER, |
| NULL, |
| false); |
| |
| cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root); |
| |
| /* |
| * Set # of retrieved rows and cached relation costs to some negative |
| * value, so that we can detect when they are set to some sensible values, |
| * during one (usually the first) of the calls to estimate_path_cost_size. |
| */ |
| fpinfo->retrieved_rows = -1; |
| fpinfo->rel_startup_cost = -1; |
| fpinfo->rel_total_cost = -1; |
| |
| /* |
| * If the table or the server is configured to use remote estimates, |
| * connect to the foreign server and execute EXPLAIN to estimate the |
| * number of rows selected by the restriction clauses, as well as the |
| * average row width. Otherwise, estimate using whatever statistics we |
| * have locally, in a way similar to ordinary tables. |
| */ |
| if (fpinfo->use_remote_estimate) |
| { |
| /* |
| * Get cost/size estimates with help of remote server. Save the |
| * values in fpinfo so we don't need to do it again to generate the |
| * basic foreign path. |
| */ |
| estimate_path_cost_size(root, baserel, NIL, NIL, NULL, |
| &fpinfo->rows, &fpinfo->width, |
| &fpinfo->startup_cost, &fpinfo->total_cost); |
| |
| /* Report estimated baserel size to planner. */ |
| baserel->rows = fpinfo->rows; |
| baserel->reltarget->width = fpinfo->width; |
| } |
| else |
| { |
| /* |
| * If the foreign table has never been ANALYZEd, it will have |
| * reltuples < 0, meaning "unknown". We can't do much if we're not |
| * allowed to consult the remote server, but we can use a hack similar |
| * to plancat.c's treatment of empty relations: use a minimum size |
| * estimate of 10 pages, and divide by the column-datatype-based width |
| * estimate to get the corresponding number of tuples. |
| */ |
| if (baserel->tuples < 0) |
| { |
| baserel->pages = 10; |
| baserel->tuples = |
| (10 * BLCKSZ) / (baserel->reltarget->width + |
| MAXALIGN(SizeofHeapTupleHeader)); |
| } |
| |
| /* Estimate baserel size as best we can with local statistics. */ |
| set_baserel_size_estimates(root, baserel); |
| |
| /* Fill in basically-bogus cost estimates for use later. */ |
| estimate_path_cost_size(root, baserel, NIL, NIL, NULL, |
| &fpinfo->rows, &fpinfo->width, |
| &fpinfo->startup_cost, &fpinfo->total_cost); |
| } |
| |
| /* |
| * fpinfo->relation_name gets the numeric rangetable index of the foreign |
| * table RTE. (If this query gets EXPLAIN'd, we'll convert that to a |
| * human-readable string at that time.) |
| */ |
| fpinfo->relation_name = psprintf("%u", baserel->relid); |
| |
| /* No outer and inner relations. */ |
| fpinfo->make_outerrel_subquery = false; |
| fpinfo->make_innerrel_subquery = false; |
| fpinfo->lower_subquery_rels = NULL; |
| /* Set the relation index. */ |
| fpinfo->relation_index = baserel->relid; |
| } |
| |
| /* |
| * get_useful_ecs_for_relation |
| * Determine which EquivalenceClasses might be involved in useful |
| * orderings of this relation. |
| * |
| * This function is in some respects a mirror image of the core function |
| * pathkeys_useful_for_merging: for a regular table, we know what indexes |
| * we have and want to test whether any of them are useful. For a foreign |
| * table, we don't know what indexes are present on the remote side but |
| * want to speculate about which ones we'd like to use if they existed. |
| * |
| * This function returns a list of potentially-useful equivalence classes, |
| * but it does not guarantee that an EquivalenceMember exists which contains |
| * Vars only from the given relation. For example, given ft1 JOIN t1 ON |
| * ft1.x + t1.x = 0, this function will say that the equivalence class |
| * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and |
| * t1 is local (or on a different server), it will turn out that no useful |
| * ORDER BY clause can be generated. It's not our job to figure that out |
| * here; we're only interested in identifying relevant ECs. |
| */ |
| static List * |
| get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel) |
| { |
| List *useful_eclass_list = NIL; |
| ListCell *lc; |
| Relids relids; |
| |
| /* |
| * First, consider whether any active EC is potentially useful for a merge |
| * join against this relation. |
| */ |
| if (rel->has_eclass_joins) |
| { |
| foreach(lc, root->eq_classes) |
| { |
| EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc); |
| |
| if (eclass_useful_for_merging(root, cur_ec, rel)) |
| useful_eclass_list = lappend(useful_eclass_list, cur_ec); |
| } |
| } |
| |
| /* |
| * Next, consider whether there are any non-EC derivable join clauses that |
| * are merge-joinable. If the joininfo list is empty, we can exit |
| * quickly. |
| */ |
| if (rel->joininfo == NIL) |
| return useful_eclass_list; |
| |
| /* If this is a child rel, we must use the topmost parent rel to search. */ |
| if (IS_OTHER_REL(rel)) |
| { |
| Assert(!bms_is_empty(rel->top_parent_relids)); |
| relids = rel->top_parent_relids; |
| } |
| else |
| relids = rel->relids; |
| |
| /* Check each join clause in turn. */ |
| foreach(lc, rel->joininfo) |
| { |
| RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc); |
| |
| /* Consider only mergejoinable clauses */ |
| if (restrictinfo->mergeopfamilies == NIL) |
| continue; |
| |
| /* Make sure we've got canonical ECs. */ |
| update_mergeclause_eclasses(root, restrictinfo); |
| |
| /* |
| * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee |
| * that left_ec and right_ec will be initialized, per comments in |
| * distribute_qual_to_rels. |
| * |
| * We want to identify which side of this merge-joinable clause |
| * contains columns from the relation produced by this RelOptInfo. We |
| * test for overlap, not containment, because there could be extra |
| * relations on either side. For example, suppose we've got something |
| * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON |
| * A.y = D.y. The input rel might be the joinrel between A and B, and |
| * we'll consider the join clause A.y = D.y. relids contains a |
| * relation not involved in the join class (B) and the equivalence |
| * class for the left-hand side of the clause contains a relation not |
| * involved in the input rel (C). Despite the fact that we have only |
| * overlap and not containment in either direction, A.y is potentially |
| * useful as a sort column. |
| * |
| * Note that it's even possible that relids overlaps neither side of |
| * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x |
| * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list, |
| * but overlaps neither side of B. In that case, we just skip this |
| * join clause, since it doesn't suggest a useful sort order for this |
| * relation. |
| */ |
| if (bms_overlap(relids, restrictinfo->right_ec->ec_relids)) |
| useful_eclass_list = list_append_unique_ptr(useful_eclass_list, |
| restrictinfo->right_ec); |
| else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids)) |
| useful_eclass_list = list_append_unique_ptr(useful_eclass_list, |
| restrictinfo->left_ec); |
| } |
| |
| return useful_eclass_list; |
| } |
| |
| /* |
| * get_useful_pathkeys_for_relation |
| * Determine which orderings of a relation might be useful. |
| * |
| * Getting data in sorted order can be useful either because the requested |
| * order matches the final output ordering for the overall query we're |
| * planning, or because it enables an efficient merge join. Here, we try |
| * to figure out which pathkeys to consider. |
| */ |
| static List * |
| get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel) |
| { |
| List *useful_pathkeys_list = NIL; |
| List *useful_eclass_list; |
| PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private; |
| EquivalenceClass *query_ec = NULL; |
| ListCell *lc; |
| |
| /* |
| * Pushing the query_pathkeys to the remote server is always worth |
| * considering, because it might let us avoid a local sort. |
| */ |
| fpinfo->qp_is_pushdown_safe = false; |
| if (root->query_pathkeys) |
| { |
| bool query_pathkeys_ok = true; |
| |
| foreach(lc, root->query_pathkeys) |
| { |
| PathKey *pathkey = (PathKey *) lfirst(lc); |
| |
| /* |
| * The planner and executor don't have any clever strategy for |
| * taking data sorted by a prefix of the query's pathkeys and |
| * getting it to be sorted by all of those pathkeys. We'll just |
| * end up resorting the entire data set. So, unless we can push |
| * down all of the query pathkeys, forget it. |
| */ |
| if (!is_foreign_pathkey(root, rel, pathkey)) |
| { |
| query_pathkeys_ok = false; |
| break; |
| } |
| } |
| |
| if (query_pathkeys_ok) |
| { |
| useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys)); |
| fpinfo->qp_is_pushdown_safe = true; |
| } |
| } |
| |
| /* |
| * Even if we're not using remote estimates, having the remote side do the |
| * sort generally won't be any worse than doing it locally, and it might |
| * be much better if the remote side can generate data in the right order |
| * without needing a sort at all. However, what we're going to do next is |
| * try to generate pathkeys that seem promising for possible merge joins, |
| * and that's more speculative. A wrong choice might hurt quite a bit, so |
| * bail out if we can't use remote estimates. |
| */ |
| if (!fpinfo->use_remote_estimate) |
| return useful_pathkeys_list; |
| |
| /* Get the list of interesting EquivalenceClasses. */ |
| useful_eclass_list = get_useful_ecs_for_relation(root, rel); |
| |
| /* Extract unique EC for query, if any, so we don't consider it again. */ |
| if (list_length(root->query_pathkeys) == 1) |
| { |
| PathKey *query_pathkey = linitial(root->query_pathkeys); |
| |
| query_ec = query_pathkey->pk_eclass; |
| } |
| |
| /* |
| * As a heuristic, the only pathkeys we consider here are those of length |
| * one. It's surely possible to consider more, but since each one we |
| * choose to consider will generate a round-trip to the remote side, we |
| * need to be a bit cautious here. It would sure be nice to have a local |
| * cache of information about remote index definitions... |
| */ |
| foreach(lc, useful_eclass_list) |
| { |
| EquivalenceClass *cur_ec = lfirst(lc); |
| PathKey *pathkey; |
| |
| /* If redundant with what we did above, skip it. */ |
| if (cur_ec == query_ec) |
| continue; |
| |
| /* Can't push down the sort if the EC's opfamily is not shippable. */ |
| if (!is_shippable(linitial_oid(cur_ec->ec_opfamilies), |
| OperatorFamilyRelationId, fpinfo)) |
| continue; |
| |
| /* If no pushable expression for this rel, skip it. */ |
| if (find_em_for_rel(root, cur_ec, rel) == NULL) |
| continue; |
| |
| /* Looks like we can generate a pathkey, so let's do it. */ |
| pathkey = make_canonical_pathkey(root, cur_ec, |
| linitial_oid(cur_ec->ec_opfamilies), |
| BTLessStrategyNumber, |
| false); |
| useful_pathkeys_list = lappend(useful_pathkeys_list, |
| list_make1(pathkey)); |
| } |
| |
| return useful_pathkeys_list; |
| } |
| |
| /* |
| * postgresGetForeignPaths |
| * Create possible scan paths for a scan on the foreign table |
| */ |
| static void |
| postgresGetForeignPaths(PlannerInfo *root, |
| RelOptInfo *baserel, |
| Oid foreigntableid) |
| { |
| PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private; |
| ForeignPath *path; |
| List *ppi_list; |
| ListCell *lc; |
| |
| /* |
| * Create simplest ForeignScan path node and add it to baserel. This path |
| * corresponds to SeqScan path of regular tables (though depending on what |
| * baserestrict conditions we were able to send to remote, there might |
| * actually be an indexscan happening there). We already did all the work |
| * to estimate cost and size of this path. |
| * |
| * Although this path uses no join clauses, it could still have required |
| * parameterization due to LATERAL refs in its tlist. |
| */ |
| path = create_foreignscan_path(root, baserel, |
| NULL, /* default pathtarget */ |
| fpinfo->rows, |
| fpinfo->startup_cost, |
| fpinfo->total_cost, |
| NIL, /* no pathkeys */ |
| baserel->lateral_relids, |
| NULL, /* no extra plan */ |
| NIL); /* no fdw_private list */ |
| add_path(baserel, (Path *) path, root); |
| |
| /* Add paths with pathkeys */ |
| add_paths_with_pathkeys_for_rel(root, baserel, NULL); |
| |
| /* |
| * If we're not using remote estimates, stop here. We have no way to |
| * estimate whether any join clauses would be worth sending across, so |
| * don't bother building parameterized paths. |
| */ |
| if (!fpinfo->use_remote_estimate) |
| return; |
| |
| /* |
| * Thumb through all join clauses for the rel to identify which outer |
| * relations could supply one or more safe-to-send-to-remote join clauses. |
| * We'll build a parameterized path for each such outer relation. |
| * |
| * It's convenient to manage this by representing each candidate outer |
| * relation by the ParamPathInfo node for it. We can then use the |
| * ppi_clauses list in the ParamPathInfo node directly as a list of the |
| * interesting join clauses for that rel. This takes care of the |
| * possibility that there are multiple safe join clauses for such a rel, |
| * and also ensures that we account for unsafe join clauses that we'll |
| * still have to enforce locally (since the parameterized-path machinery |
| * insists that we handle all movable clauses). |
| */ |
| ppi_list = NIL; |
| foreach(lc, baserel->joininfo) |
| { |
| RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); |
| Relids required_outer; |
| ParamPathInfo *param_info; |
| |
| /* Check if clause can be moved to this rel */ |
| if (!join_clause_is_movable_to(rinfo, baserel)) |
| continue; |
| |
| /* See if it is safe to send to remote */ |
| if (!is_foreign_expr(root, baserel, rinfo->clause)) |
| continue; |
| |
| /* Calculate required outer rels for the resulting path */ |
| required_outer = bms_union(rinfo->clause_relids, |
| baserel->lateral_relids); |
| /* We do not want the foreign rel itself listed in required_outer */ |
| required_outer = bms_del_member(required_outer, baserel->relid); |
| |
| /* |
| * required_outer probably can't be empty here, but if it were, we |
| * couldn't make a parameterized path. |
| */ |
| if (bms_is_empty(required_outer)) |
| continue; |
| |
| /* Get the ParamPathInfo */ |
| param_info = get_baserel_parampathinfo(root, baserel, |
| required_outer); |
| Assert(param_info != NULL); |
| |
| /* |
| * Add it to list unless we already have it. Testing pointer equality |
| * is OK since get_baserel_parampathinfo won't make duplicates. |
| */ |
| ppi_list = list_append_unique_ptr(ppi_list, param_info); |
| } |
| |
| /* |
| * The above scan examined only "generic" join clauses, not those that |
| * were absorbed into EquivalenceClauses. See if we can make anything out |
| * of EquivalenceClauses. |
| */ |
| if (baserel->has_eclass_joins) |
| { |
| /* |
| * We repeatedly scan the eclass list looking for column references |
| * (or expressions) belonging to the foreign rel. Each time we find |
| * one, we generate a list of equivalence joinclauses for it, and then |
| * see if any are safe to send to the remote. Repeat till there are |
| * no more candidate EC members. |
| */ |
| ec_member_foreign_arg arg; |
| |
| arg.already_used = NIL; |
| for (;;) |
| { |
| List *clauses; |
| |
| /* Make clauses, skipping any that join to lateral_referencers */ |
| arg.current = NULL; |
| clauses = generate_implied_equalities_for_column(root, |
| baserel, |
| ec_member_matches_foreign, |
| (void *) &arg, |
| baserel->lateral_referencers); |
| |
| /* Done if there are no more expressions in the foreign rel */ |
| if (arg.current == NULL) |
| { |
| Assert(clauses == NIL); |
| break; |
| } |
| |
| /* Scan the extracted join clauses */ |
| foreach(lc, clauses) |
| { |
| RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); |
| Relids required_outer; |
| ParamPathInfo *param_info; |
| |
| /* Check if clause can be moved to this rel */ |
| if (!join_clause_is_movable_to(rinfo, baserel)) |
| continue; |
| |
| /* See if it is safe to send to remote */ |
| if (!is_foreign_expr(root, baserel, rinfo->clause)) |
| continue; |
| |
| /* Calculate required outer rels for the resulting path */ |
| required_outer = bms_union(rinfo->clause_relids, |
| baserel->lateral_relids); |
| required_outer = bms_del_member(required_outer, baserel->relid); |
| if (bms_is_empty(required_outer)) |
| continue; |
| |
| /* Get the ParamPathInfo */ |
| param_info = get_baserel_parampathinfo(root, baserel, |
| required_outer); |
| Assert(param_info != NULL); |
| |
| /* Add it to list unless we already have it */ |
| ppi_list = list_append_unique_ptr(ppi_list, param_info); |
| } |
| |
| /* Try again, now ignoring the expression we found this time */ |
| arg.already_used = lappend(arg.already_used, arg.current); |
| } |
| } |
| |
| /* |
| * Now build a path for each useful outer relation. |
| */ |
| foreach(lc, ppi_list) |
| { |
| ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc); |
| double rows; |
| int width; |
| Cost startup_cost; |
| Cost total_cost; |
| |
| /* Get a cost estimate from the remote */ |
| estimate_path_cost_size(root, baserel, |
| param_info->ppi_clauses, NIL, NULL, |
| &rows, &width, |
| &startup_cost, &total_cost); |
| |
| /* |
| * ppi_rows currently won't get looked at by anything, but still we |
| * may as well ensure that it matches our idea of the rowcount. |
| */ |
| param_info->ppi_rows = rows; |
| |
| /* Make the path */ |
| path = create_foreignscan_path(root, baserel, |
| NULL, /* default pathtarget */ |
| rows, |
| startup_cost, |
| total_cost, |
| NIL, /* no pathkeys */ |
| param_info->ppi_req_outer, |
| NULL, |
| NIL); /* no fdw_private list */ |
| add_path(baserel, (Path *) path, root); |
| } |
| } |
| |
| /* |
| * postgresGetForeignPlan |
| * Create ForeignScan plan node which implements selected best path |
| */ |
| static ForeignScan * |
| postgresGetForeignPlan(PlannerInfo *root, |
| RelOptInfo *foreignrel, |
| Oid foreigntableid, |
| ForeignPath *best_path, |
| List *tlist, |
| List *scan_clauses, |
| Plan *outer_plan) |
| { |
| PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; |
| Index scan_relid; |
| List *fdw_private; |
| List *remote_exprs = NIL; |
| List *local_exprs = NIL; |
| List *params_list = NIL; |
| List *fdw_scan_tlist = NIL; |
| List *fdw_recheck_quals = NIL; |
| List *retrieved_attrs; |
| StringInfoData sql; |
| bool has_final_sort = false; |
| bool has_limit = false; |
| ListCell *lc; |
| |
| /* |
| * Get FDW private data created by postgresGetForeignUpperPaths(), if any. |
| */ |
| if (best_path->fdw_private) |
| { |
| has_final_sort = boolVal(list_nth(best_path->fdw_private, |
| FdwPathPrivateHasFinalSort)); |
| has_limit = boolVal(list_nth(best_path->fdw_private, |
| FdwPathPrivateHasLimit)); |
| } |
| |
| if (IS_SIMPLE_REL(foreignrel)) |
| { |
| /* |
| * For base relations, set scan_relid as the relid of the relation. |
| */ |
| scan_relid = foreignrel->relid; |
| |
| /* |
| * In a base-relation scan, we must apply the given scan_clauses. |
| * |
| * Separate the scan_clauses into those that can be executed remotely |
| * and those that can't. baserestrictinfo clauses that were |
| * previously determined to be safe or unsafe by classifyConditions |
| * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything |
| * else in the scan_clauses list will be a join clause, which we have |
| * to check for remote-safety. |
| * |
| * Note: the join clauses we see here should be the exact same ones |
| * previously examined by postgresGetForeignPaths. Possibly it'd be |
| * worth passing forward the classification work done then, rather |
| * than repeating it here. |
| * |
| * This code must match "extract_actual_clauses(scan_clauses, false)" |
| * except for the additional decision about remote versus local |
| * execution. |
| */ |
| foreach(lc, scan_clauses) |
| { |
| RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); |
| |
| /* Ignore any pseudoconstants, they're dealt with elsewhere */ |
| if (rinfo->pseudoconstant) |
| continue; |
| |
| if (list_member_ptr(fpinfo->remote_conds, rinfo)) |
| remote_exprs = lappend(remote_exprs, rinfo->clause); |
| else if (list_member_ptr(fpinfo->local_conds, rinfo)) |
| local_exprs = lappend(local_exprs, rinfo->clause); |
| else if (is_foreign_expr(root, foreignrel, rinfo->clause)) |
| remote_exprs = lappend(remote_exprs, rinfo->clause); |
| else |
| local_exprs = lappend(local_exprs, rinfo->clause); |
| } |
| |
| /* |
| * For a base-relation scan, we have to support EPQ recheck, which |
| * should recheck all the remote quals. |
| */ |
| fdw_recheck_quals = remote_exprs; |
| } |
| else |
| { |
| /* |
| * Join relation or upper relation - set scan_relid to 0. |
| */ |
| scan_relid = 0; |
| |
| /* |
| * For a join rel, baserestrictinfo is NIL and we are not considering |
| * parameterization right now, so there should be no scan_clauses for |
| * a joinrel or an upper rel either. |
| */ |
| Assert(!scan_clauses); |
| |
| /* |
| * Instead we get the conditions to apply from the fdw_private |
| * structure. |
| */ |
| remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false); |
| local_exprs = extract_actual_clauses(fpinfo->local_conds, false); |
| |
| /* |
| * We leave fdw_recheck_quals empty in this case, since we never need |
| * to apply EPQ recheck clauses. In the case of a joinrel, EPQ |
| * recheck is handled elsewhere --- see postgresGetForeignJoinPaths(). |
| * If we're planning an upperrel (ie, remote grouping or aggregation) |
| * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be |
| * allowed, and indeed we *can't* put the remote clauses into |
| * fdw_recheck_quals because the unaggregated Vars won't be available |
| * locally. |
| */ |
| |
| /* Build the list of columns to be fetched from the foreign server. */ |
| fdw_scan_tlist = build_tlist_to_deparse(foreignrel); |
| |
| /* |
| * Ensure that the outer plan produces a tuple whose descriptor |
| * matches our scan tuple slot. Also, remove the local conditions |
| * from outer plan's quals, lest they be evaluated twice, once by the |
| * local plan and once by the scan. |
| */ |
| if (outer_plan) |
| { |
| /* |
| * Right now, we only consider grouping and aggregation beyond |
| * joins. Queries involving aggregates or grouping do not require |
| * EPQ mechanism, hence should not have an outer plan here. |
| */ |
| Assert(!IS_UPPER_REL(foreignrel)); |
| |
| /* |
| * First, update the plan's qual list if possible. In some cases |
| * the quals might be enforced below the topmost plan level, in |
| * which case we'll fail to remove them; it's not worth working |
| * harder than this. |
| */ |
| foreach(lc, local_exprs) |
| { |
| Node *qual = lfirst(lc); |
| |
| outer_plan->qual = list_delete(outer_plan->qual, qual); |
| |
| /* |
| * For an inner join the local conditions of foreign scan plan |
| * can be part of the joinquals as well. (They might also be |
| * in the mergequals or hashquals, but we can't touch those |
| * without breaking the plan.) |
| */ |
| if (IsA(outer_plan, NestLoop) || |
| IsA(outer_plan, MergeJoin) || |
| IsA(outer_plan, HashJoin)) |
| { |
| Join *join_plan = (Join *) outer_plan; |
| |
| if (join_plan->jointype == JOIN_INNER) |
| join_plan->joinqual = list_delete(join_plan->joinqual, |
| qual); |
| } |
| } |
| |
| /* |
| * Now fix the subplan's tlist --- this might result in inserting |
| * a Result node atop the plan tree. |
| */ |
| outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist, |
| best_path->path.parallel_safe); |
| } |
| } |
| |
| /* |
| * Build the query string to be sent for execution, and identify |
| * expressions to be sent as parameters. |
| */ |
| initStringInfo(&sql); |
| deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist, |
| remote_exprs, best_path->path.pathkeys, |
| has_final_sort, has_limit, false, |
| &retrieved_attrs, ¶ms_list); |
| |
| /* Remember remote_exprs for possible use by postgresPlanDirectModify */ |
| fpinfo->final_remote_exprs = remote_exprs; |
| |
| /* |
| * Build the fdw_private list that will be available to the executor. |
| * Items in the list must match order in enum FdwScanPrivateIndex. |
| */ |
| fdw_private = list_make3(makeString(sql.data), |
| retrieved_attrs, |
| makeInteger(fpinfo->fetch_size)); |
| if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel)) |
| fdw_private = lappend(fdw_private, |
| makeString(fpinfo->relation_name)); |
| |
| /* |
| * Create the ForeignScan node for the given relation. |
| * |
| * Note that the remote parameter expressions are stored in the fdw_exprs |
| * field of the finished plan node; we can't keep them in private state |
| * because then they wouldn't be subject to later planner processing. |
| */ |
| return make_foreignscan(tlist, |
| local_exprs, |
| scan_relid, |
| params_list, |
| fdw_private, |
| fdw_scan_tlist, |
| fdw_recheck_quals, |
| outer_plan); |
| } |
| |
| /* |
| * Construct a tuple descriptor for the scan tuples handled by a foreign join. |
| */ |
| static TupleDesc |
| get_tupdesc_for_join_scan_tuples(ForeignScanState *node) |
| { |
| ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; |
| EState *estate = node->ss.ps.state; |
| TupleDesc tupdesc; |
| |
| /* |
| * The core code has already set up a scan tuple slot based on |
| * fsplan->fdw_scan_tlist, and this slot's tupdesc is mostly good enough, |
| * but there's one case where it isn't. If we have any whole-row row |
| * identifier Vars, they may have vartype RECORD, and we need to replace |
| * that with the associated table's actual composite type. This ensures |
| * that when we read those ROW() expression values from the remote server, |
| * we can convert them to a composite type the local server knows. |
| */ |
| tupdesc = CreateTupleDescCopy(node->ss.ss_ScanTupleSlot->tts_tupleDescriptor); |
| for (int i = 0; i < tupdesc->natts; i++) |
| { |
| Form_pg_attribute att = TupleDescAttr(tupdesc, i); |
| Var *var; |
| RangeTblEntry *rte; |
| Oid reltype; |
| |
| /* Nothing to do if it's not a generic RECORD attribute */ |
| if (att->atttypid != RECORDOID || att->atttypmod >= 0) |
| continue; |
| |
| /* |
| * If we can't identify the referenced table, do nothing. This'll |
| * likely lead to failure later, but perhaps we can muddle through. |
| */ |
| var = (Var *) list_nth_node(TargetEntry, fsplan->fdw_scan_tlist, |
| i)->expr; |
| if (!IsA(var, Var) || var->varattno != 0) |
| continue; |
| rte = list_nth(estate->es_range_table, var->varno - 1); |
| if (rte->rtekind != RTE_RELATION) |
| continue; |
| reltype = get_rel_type_id(rte->relid); |
| if (!OidIsValid(reltype)) |
| continue; |
| att->atttypid = reltype; |
| /* shouldn't need to change anything else */ |
| } |
| return tupdesc; |
| } |
| |
| /* |
| * postgresBeginForeignScan |
| * Initiate an executor scan of a foreign PostgreSQL table. |
| */ |
| static void |
| postgresBeginForeignScan(ForeignScanState *node, int eflags) |
| { |
| ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; |
| EState *estate = node->ss.ps.state; |
| PgFdwScanState *fsstate; |
| RangeTblEntry *rte; |
| Oid userid; |
| ForeignTable *table; |
| UserMapping *user; |
| int rtindex; |
| int numParams; |
| |
| /* |
| * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. |
| */ |
| if (eflags & EXEC_FLAG_EXPLAIN_ONLY) |
| return; |
| |
| /* |
| * We'll save private state in node->fdw_state. |
| */ |
| fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState)); |
| node->fdw_state = (void *) fsstate; |
| |
| /* |
| * Identify which user to do the remote access as. This should match what |
| * ExecCheckPermissions() does. |
| */ |
| userid = OidIsValid(fsplan->checkAsUser) ? fsplan->checkAsUser : GetUserId(); |
| if (fsplan->scan.scanrelid > 0) |
| rtindex = fsplan->scan.scanrelid; |
| else |
| rtindex = bms_next_member(fsplan->fs_base_relids, -1); |
| rte = exec_rt_fetch(rtindex, estate); |
| |
| /* Get info about foreign table. */ |
| table = GetForeignTable(rte->relid); |
| user = GetUserMapping(userid, table->serverid); |
| |
| /* |
| * Get connection to the foreign server. Connection manager will |
| * establish new connection if necessary. |
| */ |
| fsstate->conn = GetConnection(user, false, &fsstate->conn_state); |
| |
| /* Assign a unique ID for my cursor */ |
| fsstate->cursor_number = GetCursorNumber(fsstate->conn); |
| fsstate->cursor_exists = false; |
| |
| /* Get private info created by planner functions. */ |
| fsstate->query = strVal(list_nth(fsplan->fdw_private, |
| FdwScanPrivateSelectSql)); |
| fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private, |
| FdwScanPrivateRetrievedAttrs); |
| fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private, |
| FdwScanPrivateFetchSize)); |
| |
| /* Create contexts for batches of tuples and per-tuple temp workspace. */ |
| fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, |
| "postgres_fdw tuple data", |
| ALLOCSET_DEFAULT_SIZES); |
| fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, |
| "postgres_fdw temporary data", |
| ALLOCSET_SMALL_SIZES); |
| |
| /* |
| * Get info we'll need for converting data fetched from the foreign server |
| * into local representation and error reporting during that process. |
| */ |
| if (fsplan->scan.scanrelid > 0) |
| { |
| fsstate->rel = node->ss.ss_currentRelation; |
| fsstate->tupdesc = RelationGetDescr(fsstate->rel); |
| } |
| else |
| { |
| fsstate->rel = NULL; |
| fsstate->tupdesc = get_tupdesc_for_join_scan_tuples(node); |
| } |
| |
| fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc); |
| |
| /* |
| * Prepare for processing of parameters used in remote query, if any. |
| */ |
| numParams = list_length(fsplan->fdw_exprs); |
| fsstate->numParams = numParams; |
| if (numParams > 0) |
| prepare_query_params((PlanState *) node, |
| fsplan->fdw_exprs, |
| numParams, |
| &fsstate->param_flinfo, |
| &fsstate->param_exprs, |
| &fsstate->param_values); |
| |
| /* Set the async-capable flag */ |
| fsstate->async_capable = node->ss.ps.async_capable; |
| } |
| |
| /* |
| * postgresIterateForeignScan |
| * Retrieve next row from the result set, or clear tuple slot to indicate |
| * EOF. |
| */ |
| static TupleTableSlot * |
| postgresIterateForeignScan(ForeignScanState *node) |
| { |
| PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; |
| TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; |
| |
| /* |
| * In sync mode, if this is the first call after Begin or ReScan, we need |
| * to create the cursor on the remote side. In async mode, we would have |
| * already created the cursor before we get here, even if this is the |
| * first call after Begin or ReScan. |
| */ |
| if (!fsstate->cursor_exists) |
| create_cursor(node); |
| |
| /* |
| * Get some more tuples, if we've run out. |
| */ |
| if (fsstate->next_tuple >= fsstate->num_tuples) |
| { |
| /* In async mode, just clear tuple slot. */ |
| if (fsstate->async_capable) |
| return ExecClearTuple(slot); |
| /* No point in another fetch if we already detected EOF, though. */ |
| if (!fsstate->eof_reached) |
| fetch_more_data(node); |
| /* If we didn't get any tuples, must be end of data. */ |
| if (fsstate->next_tuple >= fsstate->num_tuples) |
| return ExecClearTuple(slot); |
| } |
| |
| /* |
| * Return the next tuple. |
| */ |
| ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++], |
| slot, |
| false); |
| |
| return slot; |
| } |
| |
| /* |
| * postgresReScanForeignScan |
| * Restart the scan. |
| */ |
| static void |
| postgresReScanForeignScan(ForeignScanState *node) |
| { |
| PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; |
| char sql[64]; |
| PGresult *res; |
| |
| /* If we haven't created the cursor yet, nothing to do. */ |
| if (!fsstate->cursor_exists) |
| return; |
| |
| /* |
| <<<<<<< HEAD |
| * If the node is async-capable, and an asynchronous fetch for it has been |
| ======= |
| * If the node is async-capable, and an asynchronous fetch for it has |
| >>>>>>> REL_16_9 |
| * begun, the asynchronous fetch might not have yet completed. Check if |
| * the node is async-capable, and an asynchronous fetch for it is still in |
| * progress; if so, complete the asynchronous fetch before restarting the |
| * scan. |
| */ |
| if (fsstate->async_capable && |
| fsstate->conn_state->pendingAreq && |
| fsstate->conn_state->pendingAreq->requestee == (PlanState *) node) |
| fetch_more_data(node); |
| |
| /* |
| * If any internal parameters affecting this node have changed, we'd |
| * better destroy and recreate the cursor. Otherwise, if the remote |
| * server is v14 or older, rewinding it should be good enough; if not, |
| * rewind is only allowed for scrollable cursors, but we don't have a way |
| * to check the scrollability of it, so destroy and recreate it in any |
| * case. If we've only fetched zero or one batch, we needn't even rewind |
| * the cursor, just rescan what we have. |
| */ |
| if (node->ss.ps.chgParam != NULL) |
| { |
| fsstate->cursor_exists = false; |
| snprintf(sql, sizeof(sql), "CLOSE c%u", |
| fsstate->cursor_number); |
| } |
| else if (fsstate->fetch_ct_2 > 1) |
| { |
| if (PQserverVersion(fsstate->conn) < 150000) |
| snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u", |
| fsstate->cursor_number); |
| else |
| { |
| fsstate->cursor_exists = false; |
| snprintf(sql, sizeof(sql), "CLOSE c%u", |
| fsstate->cursor_number); |
| } |
| } |
| else |
| { |
| /* Easy: just rescan what we already have in memory, if anything */ |
| fsstate->next_tuple = 0; |
| return; |
| } |
| |
| /* |
| * We don't use a PG_TRY block here, so be careful not to throw error |
| * without releasing the PGresult. |
| */ |
| res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state); |
| if (PQresultStatus(res) != PGRES_COMMAND_OK) |
| pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); |
| PQclear(res); |
| |
| /* Now force a fresh FETCH. */ |
| fsstate->tuples = NULL; |
| fsstate->num_tuples = 0; |
| fsstate->next_tuple = 0; |
| fsstate->fetch_ct_2 = 0; |
| fsstate->eof_reached = false; |
| } |
| |
| /* |
| * postgresEndForeignScan |
| * Finish scanning foreign table and dispose objects used for this scan |
| */ |
| static void |
| postgresEndForeignScan(ForeignScanState *node) |
| { |
| PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; |
| |
| /* if fsstate is NULL, we are in EXPLAIN; nothing to do */ |
| if (fsstate == NULL) |
| return; |
| |
| /* Close the cursor if open, to prevent accumulation of cursors */ |
| if (fsstate->cursor_exists) |
| close_cursor(fsstate->conn, fsstate->cursor_number, |
| fsstate->conn_state); |
| |
| /* Release remote connection */ |
| ReleaseConnection(fsstate->conn); |
| fsstate->conn = NULL; |
| |
| /* MemoryContexts will be deleted automatically. */ |
| } |
| |
| /* |
| * postgresAddForeignUpdateTargets |
| * Add resjunk column(s) needed for update/delete on a foreign table |
| */ |
| static void |
| postgresAddForeignUpdateTargets(PlannerInfo *root, |
| Index rtindex, |
| RangeTblEntry *target_rte, |
| Relation target_relation) |
| { |
| Var *var; |
| |
| /* |
| * In postgres_fdw, what we need is the ctid, same as for a regular table. |
| */ |
| |
| /* Make a Var representing the desired value */ |
| var = makeVar(rtindex, |
| SelfItemPointerAttributeNumber, |
| TIDOID, |
| -1, |
| InvalidOid, |
| 0); |
| |
| /* Register it as a row-identity column needed by this target rel */ |
| add_row_identity_var(root, var, rtindex, "ctid"); |
| } |
| |
| /* |
| * postgresPlanForeignModify |
| * Plan an insert/update/delete operation on a foreign table |
| */ |
| static List * |
| postgresPlanForeignModify(PlannerInfo *root, |
| ModifyTable *plan, |
| Index resultRelation, |
| int subplan_index) |
| { |
| CmdType operation = plan->operation; |
| RangeTblEntry *rte = planner_rt_fetch(resultRelation, root); |
| Relation rel; |
| StringInfoData sql; |
| List *targetAttrs = NIL; |
| List *withCheckOptionList = NIL; |
| List *returningList = NIL; |
| List *retrieved_attrs = NIL; |
| bool doNothing = false; |
| int values_end_len = -1; |
| |
| initStringInfo(&sql); |
| |
| /* |
| * Core code already has some lock on each rel being planned, so we can |
| * use NoLock here. |
| */ |
| rel = table_open(rte->relid, NoLock); |
| |
| /* |
| * In an INSERT, we transmit all columns that are defined in the foreign |
| * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the |
| * foreign table, we transmit all columns like INSERT; else we transmit |
| * only columns that were explicitly targets of the UPDATE, so as to avoid |
| * unnecessary data transmission. (We can't do that for INSERT since we |
| * would miss sending default values for columns not listed in the source |
| * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since |
| * those triggers might change values for non-target columns, in which |
| * case we would miss sending changed values for those columns.) |
| */ |
| if (operation == CMD_INSERT || |
| (operation == CMD_UPDATE && |
| rel->trigdesc && |
| rel->trigdesc->trig_update_before_row)) |
| { |
| TupleDesc tupdesc = RelationGetDescr(rel); |
| int attnum; |
| |
| for (attnum = 1; attnum <= tupdesc->natts; attnum++) |
| { |
| Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); |
| |
| if (!attr->attisdropped) |
| targetAttrs = lappend_int(targetAttrs, attnum); |
| } |
| } |
| else if (operation == CMD_UPDATE) |
| { |
| int col; |
| RelOptInfo *rel = find_base_rel(root, resultRelation); |
| Bitmapset *allUpdatedCols = get_rel_all_updated_cols(root, rel); |
| |
| col = -1; |
| while ((col = bms_next_member(allUpdatedCols, col)) >= 0) |
| { |
| /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */ |
| AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber; |
| |
| if (attno <= InvalidAttrNumber) /* shouldn't happen */ |
| elog(ERROR, "system-column update is not supported"); |
| targetAttrs = lappend_int(targetAttrs, attno); |
| } |
| } |
| |
| /* |
| * Extract the relevant WITH CHECK OPTION list if any. |
| */ |
| if (plan->withCheckOptionLists) |
| withCheckOptionList = (List *) list_nth(plan->withCheckOptionLists, |
| subplan_index); |
| |
| /* |
| * Extract the relevant RETURNING list if any. |
| */ |
| if (plan->returningLists) |
| returningList = (List *) list_nth(plan->returningLists, subplan_index); |
| |
| /* |
| * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification |
| * should have already been rejected in the optimizer, as presently there |
| * is no way to recognize an arbiter index on a foreign table. Only DO |
| * NOTHING is supported without an inference specification. |
| */ |
| if (plan->onConflictAction == ONCONFLICT_NOTHING) |
| doNothing = true; |
| else if (plan->onConflictAction != ONCONFLICT_NONE) |
| elog(ERROR, "unexpected ON CONFLICT specification: %d", |
| (int) plan->onConflictAction); |
| |
| /* |
| * Construct the SQL command string. |
| */ |
| switch (operation) |
| { |
| case CMD_INSERT: |
| deparseInsertSql(&sql, rte, resultRelation, rel, |
| targetAttrs, doNothing, |
| withCheckOptionList, returningList, |
| &retrieved_attrs, &values_end_len); |
| break; |
| case CMD_UPDATE: |
| deparseUpdateSql(&sql, rte, resultRelation, rel, |
| targetAttrs, |
| withCheckOptionList, returningList, |
| &retrieved_attrs); |
| break; |
| case CMD_DELETE: |
| deparseDeleteSql(&sql, rte, resultRelation, rel, |
| returningList, |
| &retrieved_attrs); |
| break; |
| default: |
| elog(ERROR, "unexpected operation: %d", (int) operation); |
| break; |
| } |
| |
| table_close(rel, NoLock); |
| |
| /* |
| * Build the fdw_private list that will be available to the executor. |
| * Items in the list must match enum FdwModifyPrivateIndex, above. |
| */ |
| return list_make5(makeString(sql.data), |
| targetAttrs, |
| makeInteger(values_end_len), |
| makeBoolean((retrieved_attrs != NIL)), |
| retrieved_attrs); |
| } |
| |
| /* |
| * postgresBeginForeignModify |
| * Begin an insert/update/delete operation on a foreign table |
| */ |
| static void |
| postgresBeginForeignModify(ModifyTableState *mtstate, |
| ResultRelInfo *resultRelInfo, |
| List *fdw_private, |
| int subplan_index, |
| int eflags) |
| { |
| PgFdwModifyState *fmstate; |
| char *query; |
| List *target_attrs; |
| bool has_returning; |
| int values_end_len; |
| List *retrieved_attrs; |
| RangeTblEntry *rte; |
| |
| /* |
| * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState |
| * stays NULL. |
| */ |
| if (eflags & EXEC_FLAG_EXPLAIN_ONLY) |
| return; |
| |
| /* Deconstruct fdw_private data. */ |
| query = strVal(list_nth(fdw_private, |
| FdwModifyPrivateUpdateSql)); |
| target_attrs = (List *) list_nth(fdw_private, |
| FdwModifyPrivateTargetAttnums); |
| values_end_len = intVal(list_nth(fdw_private, |
| FdwModifyPrivateLen)); |
| has_returning = boolVal(list_nth(fdw_private, |
| FdwModifyPrivateHasReturning)); |
| retrieved_attrs = (List *) list_nth(fdw_private, |
| FdwModifyPrivateRetrievedAttrs); |
| |
| /* Find RTE. */ |
| rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex, |
| mtstate->ps.state); |
| |
| /* Construct an execution state. */ |
| fmstate = create_foreign_modify(mtstate->ps.state, |
| rte, |
| resultRelInfo, |
| mtstate->operation, |
| outerPlanState(mtstate)->plan, |
| query, |
| target_attrs, |
| values_end_len, |
| has_returning, |
| retrieved_attrs); |
| |
| resultRelInfo->ri_FdwState = fmstate; |
| } |
| |
| /* |
| * postgresExecForeignInsert |
| * Insert one row into a foreign table |
| */ |
| static TupleTableSlot * |
| postgresExecForeignInsert(EState *estate, |
| ResultRelInfo *resultRelInfo, |
| TupleTableSlot *slot, |
| TupleTableSlot *planSlot) |
| { |
| PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; |
| TupleTableSlot **rslot; |
| int numSlots = 1; |
| |
| /* |
| * If the fmstate has aux_fmstate set, use the aux_fmstate (see |
| * postgresBeginForeignInsert()) |
| */ |
| if (fmstate->aux_fmstate) |
| resultRelInfo->ri_FdwState = fmstate->aux_fmstate; |
| rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT, |
| &slot, &planSlot, &numSlots); |
| /* Revert that change */ |
| if (fmstate->aux_fmstate) |
| resultRelInfo->ri_FdwState = fmstate; |
| |
| return rslot ? *rslot : NULL; |
| } |
| |
| /* |
| * postgresExecForeignBatchInsert |
| * Insert multiple rows into a foreign table |
| */ |
| static TupleTableSlot ** |
| postgresExecForeignBatchInsert(EState *estate, |
| ResultRelInfo *resultRelInfo, |
| TupleTableSlot **slots, |
| TupleTableSlot **planSlots, |
| int *numSlots) |
| { |
| PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; |
| TupleTableSlot **rslot; |
| |
| /* |
| * If the fmstate has aux_fmstate set, use the aux_fmstate (see |
| * postgresBeginForeignInsert()) |
| */ |
| if (fmstate->aux_fmstate) |
| resultRelInfo->ri_FdwState = fmstate->aux_fmstate; |
| rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT, |
| slots, planSlots, numSlots); |
| /* Revert that change */ |
| if (fmstate->aux_fmstate) |
| resultRelInfo->ri_FdwState = fmstate; |
| |
| return rslot; |
| } |
| |
| /* |
| * postgresGetForeignModifyBatchSize |
| * Determine the maximum number of tuples that can be inserted in bulk |
| * |
| * Returns the batch size specified for server or table. When batching is not |
| * allowed (e.g. for tables with BEFORE/AFTER ROW triggers or with RETURNING |
| * clause), returns 1. |
| */ |
| static int |
| postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo) |
| { |
| int batch_size; |
| PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; |
| |
| /* should be called only once */ |
| Assert(resultRelInfo->ri_BatchSize == 0); |
| |
| /* |
| * Should never get called when the insert is being performed on a table |
| * that is also among the target relations of an UPDATE operation, because |
| * postgresBeginForeignInsert() currently rejects such insert attempts. |
| */ |
| Assert(fmstate == NULL || fmstate->aux_fmstate == NULL); |
| |
| /* |
| * In EXPLAIN without ANALYZE, ri_FdwState is NULL, so we have to lookup |
| * the option directly in server/table options. Otherwise just use the |
| * value we determined earlier. |
| */ |
| if (fmstate) |
| batch_size = fmstate->batch_size; |
| else |
| batch_size = get_batch_size_option(resultRelInfo->ri_RelationDesc); |
| |
| /* |
| <<<<<<< HEAD |
| * Disable batching when we have to use RETURNING or there are any |
| * BEFORE/AFTER ROW INSERT triggers on the foreign table. |
| ======= |
| * Disable batching when we have to use RETURNING, there are any |
| * BEFORE/AFTER ROW INSERT triggers on the foreign table, or there are any |
| * WITH CHECK OPTION constraints from parent views. |
| >>>>>>> REL_16_9 |
| * |
| * When there are any BEFORE ROW INSERT triggers on the table, we can't |
| * support it, because such triggers might query the table we're inserting |
| * into and act differently if the tuples that have already been processed |
| * and prepared for insertion are not there. |
| */ |
| if (resultRelInfo->ri_projectReturning != NULL || |
| resultRelInfo->ri_WithCheckOptions != NIL || |
| (resultRelInfo->ri_TrigDesc && |
| (resultRelInfo->ri_TrigDesc->trig_insert_before_row || |
| resultRelInfo->ri_TrigDesc->trig_insert_after_row))) |
| <<<<<<< HEAD |
| ======= |
| return 1; |
| |
| /* |
| * If the foreign table has no columns, disable batching as the INSERT |
| * syntax doesn't allow batching multiple empty rows into a zero-column |
| * table in a single statement. This is needed for COPY FROM, in which |
| * case fmstate must be non-NULL. |
| */ |
| if (fmstate && list_length(fmstate->target_attrs) == 0) |
| >>>>>>> REL_16_9 |
| return 1; |
| |
| /* |
| * Otherwise use the batch size specified for server/table. The number of |
| * parameters in a batch is limited to 65535 (uint16), so make sure we |
| * don't exceed this limit by using the maximum batch_size possible. |
| */ |
| if (fmstate && fmstate->p_nums > 0) |
| batch_size = Min(batch_size, PQ_QUERY_PARAM_MAX_LIMIT / fmstate->p_nums); |
| |
| return batch_size; |
| } |
| |
| /* |
| * postgresExecForeignUpdate |
| * Update one row in a foreign table |
| */ |
| static TupleTableSlot * |
| postgresExecForeignUpdate(EState *estate, |
| ResultRelInfo *resultRelInfo, |
| TupleTableSlot *slot, |
| TupleTableSlot *planSlot) |
| { |
| TupleTableSlot **rslot; |
| int numSlots = 1; |
| |
| rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE, |
| &slot, &planSlot, &numSlots); |
| |
| return rslot ? rslot[0] : NULL; |
| } |
| |
| /* |
| * postgresExecForeignDelete |
| * Delete one row from a foreign table |
| */ |
| static TupleTableSlot * |
| postgresExecForeignDelete(EState *estate, |
| ResultRelInfo *resultRelInfo, |
| TupleTableSlot *slot, |
| TupleTableSlot *planSlot) |
| { |
| TupleTableSlot **rslot; |
| int numSlots = 1; |
| |
| rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE, |
| &slot, &planSlot, &numSlots); |
| |
| return rslot ? rslot[0] : NULL; |
| } |
| |
| /* |
| * postgresEndForeignModify |
| * Finish an insert/update/delete operation on a foreign table |
| */ |
| static void |
| postgresEndForeignModify(EState *estate, |
| ResultRelInfo *resultRelInfo) |
| { |
| PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; |
| |
| /* If fmstate is NULL, we are in EXPLAIN; nothing to do */ |
| if (fmstate == NULL) |
| return; |
| |
| /* Destroy the execution state */ |
| finish_foreign_modify(fmstate); |
| } |
| |
| /* |
| * postgresBeginForeignInsert |
| * Begin an insert operation on a foreign table |
| */ |
| static void |
| postgresBeginForeignInsert(ModifyTableState *mtstate, |
| ResultRelInfo *resultRelInfo) |
| { |
| PgFdwModifyState *fmstate; |
| ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan); |
| EState *estate = mtstate->ps.state; |
| Index resultRelation; |
| Relation rel = resultRelInfo->ri_RelationDesc; |
| RangeTblEntry *rte; |
| TupleDesc tupdesc = RelationGetDescr(rel); |
| int attnum; |
| int values_end_len; |
| StringInfoData sql; |
| List *targetAttrs = NIL; |
| List *retrieved_attrs = NIL; |
| bool doNothing = false; |
| |
| /* |
| * If the foreign table we are about to insert routed rows into is also an |
| * UPDATE subplan result rel that will be updated later, proceeding with |
| * the INSERT will result in the later UPDATE incorrectly modifying those |
| * routed rows, so prevent the INSERT --- it would be nice if we could |
| * handle this case; but for now, throw an error for safety. |
| */ |
| if (plan && plan->operation == CMD_UPDATE && |
| (resultRelInfo->ri_usesFdwDirectModify || |
| resultRelInfo->ri_FdwState)) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("cannot route tuples into foreign table to be updated \"%s\"", |
| RelationGetRelationName(rel)))); |
| |
| initStringInfo(&sql); |
| |
| /* We transmit all columns that are defined in the foreign table. */ |
| for (attnum = 1; attnum <= tupdesc->natts; attnum++) |
| { |
| Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); |
| |
| if (!attr->attisdropped) |
| targetAttrs = lappend_int(targetAttrs, attnum); |
| } |
| |
| /* Check if we add the ON CONFLICT clause to the remote query. */ |
| if (plan) |
| { |
| OnConflictAction onConflictAction = plan->onConflictAction; |
| |
| /* We only support DO NOTHING without an inference specification. */ |
| if (onConflictAction == ONCONFLICT_NOTHING) |
| doNothing = true; |
| else if (onConflictAction != ONCONFLICT_NONE) |
| elog(ERROR, "unexpected ON CONFLICT specification: %d", |
| (int) onConflictAction); |
| } |
| |
| /* |
| * If the foreign table is a partition that doesn't have a corresponding |
| * RTE entry, we need to create a new RTE describing the foreign table for |
| * use by deparseInsertSql and create_foreign_modify() below, after first |
| * copying the parent's RTE and modifying some fields to describe the |
| * foreign partition to work on. However, if this is invoked by UPDATE, |
| * the existing RTE may already correspond to this partition if it is one |
| * of the UPDATE subplan target rels; in that case, we can just use the |
| * existing RTE as-is. |
| */ |
| if (resultRelInfo->ri_RangeTableIndex == 0) |
| { |
| ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo; |
| |
| rte = exec_rt_fetch(rootResultRelInfo->ri_RangeTableIndex, estate); |
| rte = copyObject(rte); |
| rte->relid = RelationGetRelid(rel); |
| rte->relkind = RELKIND_FOREIGN_TABLE; |
| |
| /* |
| * For UPDATE, we must use the RT index of the first subplan target |
| * rel's RTE, because the core code would have built expressions for |
| * the partition, such as RETURNING, using that RT index as varno of |
| * Vars contained in those expressions. |
| */ |
| if (plan && plan->operation == CMD_UPDATE && |
| rootResultRelInfo->ri_RangeTableIndex == plan->rootRelation) |
| resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex; |
| else |
| resultRelation = rootResultRelInfo->ri_RangeTableIndex; |
| } |
| else |
| { |
| resultRelation = resultRelInfo->ri_RangeTableIndex; |
| rte = exec_rt_fetch(resultRelation, estate); |
| } |
| |
| /* Construct the SQL command string. */ |
| deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, |
| resultRelInfo->ri_WithCheckOptions, |
| resultRelInfo->ri_returningList, |
| &retrieved_attrs, &values_end_len); |
| |
| /* Construct an execution state. */ |
| fmstate = create_foreign_modify(mtstate->ps.state, |
| rte, |
| resultRelInfo, |
| CMD_INSERT, |
| NULL, |
| sql.data, |
| targetAttrs, |
| values_end_len, |
| retrieved_attrs != NIL, |
| retrieved_attrs); |
| |
| /* |
| * If the given resultRelInfo already has PgFdwModifyState set, it means |
| * the foreign table is an UPDATE subplan result rel; in which case, store |
| * the resulting state into the aux_fmstate of the PgFdwModifyState. |
| */ |
| if (resultRelInfo->ri_FdwState) |
| { |
| Assert(plan && plan->operation == CMD_UPDATE); |
| Assert(resultRelInfo->ri_usesFdwDirectModify == false); |
| ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate; |
| } |
| else |
| resultRelInfo->ri_FdwState = fmstate; |
| } |
| |
| /* |
| * postgresEndForeignInsert |
| * Finish an insert operation on a foreign table |
| */ |
| static void |
| postgresEndForeignInsert(EState *estate, |
| ResultRelInfo *resultRelInfo) |
| { |
| PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; |
| |
| Assert(fmstate != NULL); |
| |
| /* |
| * If the fmstate has aux_fmstate set, get the aux_fmstate (see |
| * postgresBeginForeignInsert()) |
| */ |
| if (fmstate->aux_fmstate) |
| fmstate = fmstate->aux_fmstate; |
| |
| /* Destroy the execution state */ |
| finish_foreign_modify(fmstate); |
| } |
| |
| /* |
| * postgresIsForeignRelUpdatable |
| * Determine whether a foreign table supports INSERT, UPDATE and/or |
| * DELETE. |
| */ |
| static int |
| postgresIsForeignRelUpdatable(Relation rel) |
| { |
| bool updatable; |
| ForeignTable *table; |
| ForeignServer *server; |
| ListCell *lc; |
| |
| /* |
| * By default, all postgres_fdw foreign tables are assumed updatable. This |
| * can be overridden by a per-server setting, which in turn can be |
| * overridden by a per-table setting. |
| */ |
| updatable = true; |
| |
| table = GetForeignTable(RelationGetRelid(rel)); |
| server = GetForeignServer(table->serverid); |
| |
| foreach(lc, server->options) |
| { |
| DefElem *def = (DefElem *) lfirst(lc); |
| |
| if (strcmp(def->defname, "updatable") == 0) |
| updatable = defGetBoolean(def); |
| } |
| foreach(lc, table->options) |
| { |
| DefElem *def = (DefElem *) lfirst(lc); |
| |
| if (strcmp(def->defname, "updatable") == 0) |
| updatable = defGetBoolean(def); |
| } |
| |
| /* |
| * Currently "updatable" means support for INSERT, UPDATE and DELETE. |
| */ |
| if (!updatable) |
| return 0; |
| |
| /* |
| * Cloudberry only supports INSERT, because UPDATE/DELETE SELECT requires |
| * the hidden column gp_segment_id and the other "ModifyTable mixes |
| * distributed and entry-only tables" issue. |
| */ |
| UserMapping *user = GetUserMapping(rel->rd_rel->relowner, table->serverid); |
| if (greenplumCheckIsCloudberry(user)) |
| return (1 << CMD_INSERT); |
| else |
| return (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE); |
| } |
| |
| /* |
| * postgresRecheckForeignScan |
| * Execute a local join execution plan for a foreign join |
| */ |
| static bool |
| postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot) |
| { |
| Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid; |
| PlanState *outerPlan = outerPlanState(node); |
| TupleTableSlot *result; |
| |
| /* For base foreign relations, it suffices to set fdw_recheck_quals */ |
| if (scanrelid > 0) |
| return true; |
| |
| Assert(outerPlan != NULL); |
| |
| /* Execute a local join execution plan */ |
| result = ExecProcNode(outerPlan); |
| if (TupIsNull(result)) |
| return false; |
| |
| /* Store result in the given slot */ |
| ExecCopySlot(slot, result); |
| |
| return true; |
| } |
| |
| /* |
| * find_modifytable_subplan |
| * Helper routine for postgresPlanDirectModify to find the |
| * ModifyTable subplan node that scans the specified RTI. |
| * |
| * Returns NULL if the subplan couldn't be identified. That's not a fatal |
| * error condition, we just abandon trying to do the update directly. |
| */ |
| static ForeignScan * |
| find_modifytable_subplan(PlannerInfo *root, |
| ModifyTable *plan, |
| Index rtindex, |
| int subplan_index) |
| { |
| Plan *subplan = outerPlan(plan); |
| |
| /* |
| * The cases we support are (1) the desired ForeignScan is the immediate |
| * child of ModifyTable, or (2) it is the subplan_index'th child of an |
| * Append node that is the immediate child of ModifyTable. There is no |
| * point in looking further down, as that would mean that local joins are |
| * involved, so we can't do the update directly. |
| * |
| * There could be a Result atop the Append too, acting to compute the |
| * UPDATE targetlist values. We ignore that here; the tlist will be |
| * checked by our caller. |
| * |
| * In principle we could examine all the children of the Append, but it's |
| * currently unlikely that the core planner would generate such a plan |
| * with the children out-of-order. Moreover, such a search risks costing |
| * O(N^2) time when there are a lot of children. |
| */ |
| if (IsA(subplan, Append)) |
| { |
| Append *appendplan = (Append *) subplan; |
| |
| if (subplan_index < list_length(appendplan->appendplans)) |
| subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index); |
| } |
| else if (IsA(subplan, Result) && |
| outerPlan(subplan) != NULL && |
| IsA(outerPlan(subplan), Append)) |
| { |
| Append *appendplan = (Append *) outerPlan(subplan); |
| |
| if (subplan_index < list_length(appendplan->appendplans)) |
| subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index); |
| } |
| |
| /* Now, have we got a ForeignScan on the desired rel? */ |
| if (IsA(subplan, ForeignScan)) |
| { |
| ForeignScan *fscan = (ForeignScan *) subplan; |
| |
| if (bms_is_member(rtindex, fscan->fs_base_relids)) |
| return fscan; |
| } |
| |
| return NULL; |
| } |
| |
| /* |
| * postgresPlanDirectModify |
| * Consider a direct foreign table modification |
| * |
| * Decide whether it is safe to modify a foreign table directly, and if so, |
| * rewrite subplan accordingly. |
| */ |
| static bool |
| postgresPlanDirectModify(PlannerInfo *root, |
| ModifyTable *plan, |
| Index resultRelation, |
| int subplan_index) |
| { |
| CmdType operation = plan->operation; |
| RelOptInfo *foreignrel; |
| RangeTblEntry *rte; |
| PgFdwRelationInfo *fpinfo; |
| Relation rel; |
| StringInfoData sql; |
| ForeignScan *fscan; |
| List *processed_tlist = NIL; |
| List *targetAttrs = NIL; |
| List *remote_exprs; |
| List *params_list = NIL; |
| List *returningList = NIL; |
| List *retrieved_attrs = NIL; |
| |
| /* |
| * Decide whether it is safe to modify a foreign table directly. |
| */ |
| |
| /* |
| * The table modification must be an UPDATE or DELETE. |
| */ |
| if (operation != CMD_UPDATE && operation != CMD_DELETE) |
| return false; |
| |
| /* |
| * Try to locate the ForeignScan subplan that's scanning resultRelation. |
| */ |
| fscan = find_modifytable_subplan(root, plan, resultRelation, subplan_index); |
| if (!fscan) |
| return false; |
| |
| /* |
| * It's unsafe to modify a foreign table directly if there are any quals |
| * that should be evaluated locally. |
| */ |
| if (fscan->scan.plan.qual != NIL) |
| return false; |
| |
| /* Safe to fetch data about the target foreign rel */ |
| if (fscan->scan.scanrelid == 0) |
| { |
| foreignrel = find_join_rel(root, fscan->fs_relids); |
| /* We should have a rel for this foreign join. */ |
| Assert(foreignrel); |
| } |
| else |
| foreignrel = root->simple_rel_array[resultRelation]; |
| rte = root->simple_rte_array[resultRelation]; |
| fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; |
| |
| /* |
| * It's unsafe to update a foreign table directly, if any expressions to |
| * assign to the target columns are unsafe to evaluate remotely. |
| */ |
| if (operation == CMD_UPDATE) |
| { |
| ListCell *lc, |
| *lc2; |
| |
| /* |
| * The expressions of concern are the first N columns of the processed |
| * targetlist, where N is the length of the rel's update_colnos. |
| */ |
| get_translated_update_targetlist(root, resultRelation, |
| &processed_tlist, &targetAttrs); |
| forboth(lc, processed_tlist, lc2, targetAttrs) |
| { |
| TargetEntry *tle = lfirst_node(TargetEntry, lc); |
| AttrNumber attno = lfirst_int(lc2); |
| |
| /* update's new-value expressions shouldn't be resjunk */ |
| Assert(!tle->resjunk); |
| |
| if (attno <= InvalidAttrNumber) /* shouldn't happen */ |
| elog(ERROR, "system-column update is not supported"); |
| |
| if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr)) |
| return false; |
| } |
| } |
| |
| /* |
| * Ok, rewrite subplan so as to modify the foreign table directly. |
| */ |
| initStringInfo(&sql); |
| |
| /* |
| * Core code already has some lock on each rel being planned, so we can |
| * use NoLock here. |
| */ |
| rel = table_open(rte->relid, NoLock); |
| |
| /* |
| * Recall the qual clauses that must be evaluated remotely. (These are |
| * bare clauses not RestrictInfos, but deparse.c's appendConditions() |
| * doesn't care.) |
| */ |
| remote_exprs = fpinfo->final_remote_exprs; |
| |
| /* |
| * Extract the relevant RETURNING list if any. |
| */ |
| if (plan->returningLists) |
| { |
| returningList = (List *) list_nth(plan->returningLists, subplan_index); |
| |
| /* |
| * When performing an UPDATE/DELETE .. RETURNING on a join directly, |
| * we fetch from the foreign server any Vars specified in RETURNING |
| * that refer not only to the target relation but to non-target |
| * relations. So we'll deparse them into the RETURNING clause of the |
| * remote query; use a targetlist consisting of them instead, which |
| * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan |
| * node below. |
| */ |
| if (fscan->scan.scanrelid == 0) |
| returningList = build_remote_returning(resultRelation, rel, |
| returningList); |
| } |
| |
| /* |
| * Construct the SQL command string. |
| */ |
| switch (operation) |
| { |
| case CMD_UPDATE: |
| deparseDirectUpdateSql(&sql, root, resultRelation, rel, |
| foreignrel, |
| processed_tlist, |
| targetAttrs, |
| remote_exprs, ¶ms_list, |
| returningList, &retrieved_attrs); |
| break; |
| case CMD_DELETE: |
| deparseDirectDeleteSql(&sql, root, resultRelation, rel, |
| foreignrel, |
| remote_exprs, ¶ms_list, |
| returningList, &retrieved_attrs); |
| break; |
| default: |
| elog(ERROR, "unexpected operation: %d", (int) operation); |
| break; |
| } |
| |
| /* |
| * Update the operation and target relation info. |
| */ |
| fscan->operation = operation; |
| fscan->resultRelation = resultRelation; |
| |
| /* |
| * Update the fdw_exprs list that will be available to the executor. |
| */ |
| fscan->fdw_exprs = params_list; |
| |
| /* |
| * Update the fdw_private list that will be available to the executor. |
| * Items in the list must match enum FdwDirectModifyPrivateIndex, above. |
| */ |
| fscan->fdw_private = list_make4(makeString(sql.data), |
| makeBoolean((retrieved_attrs != NIL)), |
| retrieved_attrs, |
| makeBoolean(plan->canSetTag)); |
| |
| /* |
| * Update the foreign-join-related fields. |
| */ |
| if (fscan->scan.scanrelid == 0) |
| { |
| /* No need for the outer subplan. */ |
| fscan->scan.plan.lefttree = NULL; |
| |
| /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */ |
| if (returningList) |
| rebuild_fdw_scan_tlist(fscan, returningList); |
| } |
| |
| /* |
| * Finally, unset the async-capable flag if it is set, as we currently |
| * don't support asynchronous execution of direct modifications. |
| */ |
| if (fscan->scan.plan.async_capable) |
| fscan->scan.plan.async_capable = false; |
| |
| table_close(rel, NoLock); |
| return true; |
| } |
| |
| /* |
| * postgresBeginDirectModify |
| * Prepare a direct foreign table modification |
| */ |
| static void |
| postgresBeginDirectModify(ForeignScanState *node, int eflags) |
| { |
| ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; |
| EState *estate = node->ss.ps.state; |
| PgFdwDirectModifyState *dmstate; |
| Index rtindex; |
| Oid userid; |
| ForeignTable *table; |
| UserMapping *user; |
| int numParams; |
| |
| /* |
| * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. |
| */ |
| if (eflags & EXEC_FLAG_EXPLAIN_ONLY) |
| return; |
| |
| /* |
| * We'll save private state in node->fdw_state. |
| */ |
| dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState)); |
| node->fdw_state = (void *) dmstate; |
| |
| /* |
| * Identify which user to do the remote access as. This should match what |
| * ExecCheckPermissions() does. |
| */ |
| userid = OidIsValid(fsplan->checkAsUser) ? fsplan->checkAsUser : GetUserId(); |
| |
| /* Get info about foreign table. */ |
| rtindex = node->resultRelInfo->ri_RangeTableIndex; |
| if (fsplan->scan.scanrelid == 0) |
| dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags); |
| else |
| dmstate->rel = node->ss.ss_currentRelation; |
| table = GetForeignTable(RelationGetRelid(dmstate->rel)); |
| user = GetUserMapping(userid, table->serverid); |
| |
| /* |
| * Get connection to the foreign server. Connection manager will |
| * establish new connection if necessary. |
| */ |
| dmstate->conn = GetConnection(user, false, &dmstate->conn_state); |
| |
| /* Update the foreign-join-related fields. */ |
| if (fsplan->scan.scanrelid == 0) |
| { |
| /* Save info about foreign table. */ |
| dmstate->resultRel = dmstate->rel; |
| |
| /* |
| * Set dmstate->rel to NULL to teach get_returning_data() and |
| * make_tuple_from_result_row() that columns fetched from the remote |
| * server are described by fdw_scan_tlist of the foreign-scan plan |
| * node, not the tuple descriptor for the target relation. |
| */ |
| dmstate->rel = NULL; |
| } |
| |
| /* Initialize state variable */ |
| dmstate->num_tuples = -1; /* -1 means not set yet */ |
| |
| /* Get private info created by planner functions. */ |
| dmstate->query = strVal(list_nth(fsplan->fdw_private, |
| FdwDirectModifyPrivateUpdateSql)); |
| dmstate->has_returning = boolVal(list_nth(fsplan->fdw_private, |
| FdwDirectModifyPrivateHasReturning)); |
| dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private, |
| FdwDirectModifyPrivateRetrievedAttrs); |
| dmstate->set_processed = boolVal(list_nth(fsplan->fdw_private, |
| FdwDirectModifyPrivateSetProcessed)); |
| |
| /* Create context for per-tuple temp workspace. */ |
| dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, |
| "postgres_fdw temporary data", |
| ALLOCSET_SMALL_SIZES); |
| |
| /* Prepare for input conversion of RETURNING results. */ |
| if (dmstate->has_returning) |
| { |
| TupleDesc tupdesc; |
| |
| if (fsplan->scan.scanrelid == 0) |
| tupdesc = get_tupdesc_for_join_scan_tuples(node); |
| else |
| tupdesc = RelationGetDescr(dmstate->rel); |
| |
| dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); |
| |
| /* |
| * When performing an UPDATE/DELETE .. RETURNING on a join directly, |
| * initialize a filter to extract an updated/deleted tuple from a scan |
| * tuple. |
| */ |
| if (fsplan->scan.scanrelid == 0) |
| init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex); |
| } |
| |
| /* |
| * Prepare for processing of parameters used in remote query, if any. |
| */ |
| numParams = list_length(fsplan->fdw_exprs); |
| dmstate->numParams = numParams; |
| if (numParams > 0) |
| prepare_query_params((PlanState *) node, |
| fsplan->fdw_exprs, |
| numParams, |
| &dmstate->param_flinfo, |
| &dmstate->param_exprs, |
| &dmstate->param_values); |
| } |
| |
| /* |
| * postgresIterateDirectModify |
| * Execute a direct foreign table modification |
| */ |
| static TupleTableSlot * |
| postgresIterateDirectModify(ForeignScanState *node) |
| { |
| PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; |
| EState *estate = node->ss.ps.state; |
| ResultRelInfo *resultRelInfo = node->resultRelInfo; |
| |
| /* |
| * If this is the first call after Begin, execute the statement. |
| */ |
| if (dmstate->num_tuples == -1) |
| execute_dml_stmt(node); |
| |
| /* |
| * If the local query doesn't specify RETURNING, just clear tuple slot. |
| */ |
| if (!resultRelInfo->ri_projectReturning) |
| { |
| TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; |
| Instrumentation *instr = node->ss.ps.instrument; |
| |
| Assert(!dmstate->has_returning); |
| |
| /* Increment the command es_processed count if necessary. */ |
| if (dmstate->set_processed) |
| estate->es_processed += dmstate->num_tuples; |
| |
| /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */ |
| if (instr) |
| instr->tuplecount += dmstate->num_tuples; |
| |
| return ExecClearTuple(slot); |
| } |
| |
| /* |
| * Get the next RETURNING tuple. |
| */ |
| return get_returning_data(node); |
| } |
| |
| /* |
| * postgresEndDirectModify |
| * Finish a direct foreign table modification |
| */ |
| static void |
| postgresEndDirectModify(ForeignScanState *node) |
| { |
| PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; |
| |
| /* if dmstate is NULL, we are in EXPLAIN; nothing to do */ |
| if (dmstate == NULL) |
| return; |
| |
| /* Release PGresult */ |
| PQclear(dmstate->result); |
| |
| /* Release remote connection */ |
| ReleaseConnection(dmstate->conn); |
| dmstate->conn = NULL; |
| |
| /* MemoryContext will be deleted automatically. */ |
| } |
| |
| /* |
| * postgresExplainForeignScan |
| * Produce extra output for EXPLAIN of a ForeignScan on a foreign table |
| */ |
| static void |
| postgresExplainForeignScan(ForeignScanState *node, ExplainState *es) |
| { |
| ForeignScan *plan = castNode(ForeignScan, node->ss.ps.plan); |
| List *fdw_private = plan->fdw_private; |
| |
| /* |
| * Identify foreign scans that are really joins or upper relations. The |
| * input looks something like "(1) LEFT JOIN (2)", and we must replace the |
| * digit string(s), which are RT indexes, with the correct relation names. |
| * We do that here, not when the plan is created, because we can't know |
| * what aliases ruleutils.c will assign at plan creation time. |
| */ |
| if (list_length(fdw_private) > FdwScanPrivateRelations) |
| { |
| StringInfo relations; |
| char *rawrelations; |
| char *ptr; |
| int minrti, |
| rtoffset; |
| |
| rawrelations = strVal(list_nth(fdw_private, FdwScanPrivateRelations)); |
| |
| /* |
| * A difficulty with using a string representation of RT indexes is |
| * that setrefs.c won't update the string when flattening the |
| * rangetable. To find out what rtoffset was applied, identify the |
| * minimum RT index appearing in the string and compare it to the |
| * minimum member of plan->fs_base_relids. (We expect all the relids |
| * in the join will have been offset by the same amount; the Asserts |
| * below should catch it if that ever changes.) |
| */ |
| minrti = INT_MAX; |
| ptr = rawrelations; |
| while (*ptr) |
| { |
| if (isdigit((unsigned char) *ptr)) |
| { |
| int rti = strtol(ptr, &ptr, 10); |
| |
| if (rti < minrti) |
| minrti = rti; |
| } |
| else |
| ptr++; |
| } |
| rtoffset = bms_next_member(plan->fs_base_relids, -1) - minrti; |
| |
| /* Now we can translate the string */ |
| relations = makeStringInfo(); |
| ptr = rawrelations; |
| while (*ptr) |
| { |
| if (isdigit((unsigned char) *ptr)) |
| { |
| int rti = strtol(ptr, &ptr, 10); |
| RangeTblEntry *rte; |
| char *relname; |
| char *refname; |
| |
| rti += rtoffset; |
| Assert(bms_is_member(rti, plan->fs_base_relids)); |
| rte = rt_fetch(rti, es->rtable); |
| Assert(rte->rtekind == RTE_RELATION); |
| /* This logic should agree with explain.c's ExplainTargetRel */ |
| relname = get_rel_name(rte->relid); |
| if (es->verbose) |
| { |
| char *namespace; |
| |
| namespace = get_namespace_name_or_temp(get_rel_namespace(rte->relid)); |
| appendStringInfo(relations, "%s.%s", |
| quote_identifier(namespace), |
| quote_identifier(relname)); |
| } |
| else |
| appendStringInfoString(relations, |
| quote_identifier(relname)); |
| refname = (char *) list_nth(es->rtable_names, rti - 1); |
| if (refname == NULL) |
| refname = rte->eref->aliasname; |
| if (strcmp(refname, relname) != 0) |
| appendStringInfo(relations, " %s", |
| quote_identifier(refname)); |
| } |
| else |
| appendStringInfoChar(relations, *ptr++); |
| } |
| ExplainPropertyText("Relations", relations->data, es); |
| } |
| |
| /* |
| * Add remote query, when VERBOSE option is specified. |
| */ |
| if (es->verbose) |
| { |
| char *sql; |
| |
| sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); |
| ExplainPropertyText("Remote SQL", sql, es); |
| } |
| } |
| |
| /* |
| * postgresExplainForeignModify |
| * Produce extra output for EXPLAIN of a ModifyTable on a foreign table |
| */ |
| static void |
| postgresExplainForeignModify(ModifyTableState *mtstate, |
| ResultRelInfo *rinfo, |
| List *fdw_private, |
| int subplan_index, |
| ExplainState *es) |
| { |
| if (es->verbose) |
| { |
| char *sql = strVal(list_nth(fdw_private, |
| FdwModifyPrivateUpdateSql)); |
| |
| ExplainPropertyText("Remote SQL", sql, es); |
| |
| /* |
| * For INSERT we should always have batch size >= 1, but UPDATE and |
| * DELETE don't support batching so don't show the property. |
| */ |
| if (rinfo->ri_BatchSize > 0) |
| ExplainPropertyInteger("Batch Size", NULL, rinfo->ri_BatchSize, es); |
| } |
| } |
| |
| /* |
| * postgresExplainDirectModify |
| * Produce extra output for EXPLAIN of a ForeignScan that modifies a |
| * foreign table directly |
| */ |
| static void |
| postgresExplainDirectModify(ForeignScanState *node, ExplainState *es) |
| { |
| List *fdw_private; |
| char *sql; |
| |
| if (es->verbose) |
| { |
| fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; |
| sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql)); |
| ExplainPropertyText("Remote SQL", sql, es); |
| } |
| } |
| |
| /* |
| * postgresExecForeignTruncate |
| * Truncate one or more foreign tables |
| */ |
| static void |
| postgresExecForeignTruncate(List *rels, |
| DropBehavior behavior, |
| bool restart_seqs) |
| { |
| Oid serverid = InvalidOid; |
| UserMapping *user = NULL; |
| PGconn *conn = NULL; |
| StringInfoData sql; |
| ListCell *lc; |
| bool server_truncatable = true; |
| |
| /* |
| * By default, all postgres_fdw foreign tables are assumed truncatable. |
| * This can be overridden by a per-server setting, which in turn can be |
| * overridden by a per-table setting. |
| */ |
| foreach(lc, rels) |
| { |
| ForeignServer *server = NULL; |
| Relation rel = lfirst(lc); |
| ForeignTable *table = GetForeignTable(RelationGetRelid(rel)); |
| ListCell *cell; |
| bool truncatable; |
| |
| /* |
| * First time through, determine whether the foreign server allows |
| * truncates. Since all specified foreign tables are assumed to belong |
| * to the same foreign server, this result can be used for other |
| * foreign tables. |
| */ |
| if (!OidIsValid(serverid)) |
| { |
| serverid = table->serverid; |
| server = GetForeignServer(serverid); |
| |
| foreach(cell, server->options) |
| { |
| DefElem *defel = (DefElem *) lfirst(cell); |
| |
| if (strcmp(defel->defname, "truncatable") == 0) |
| { |
| server_truncatable = defGetBoolean(defel); |
| break; |
| } |
| } |
| } |
| |
| /* |
| * Confirm that all specified foreign tables belong to the same |
| * foreign server. |
| */ |
| Assert(table->serverid == serverid); |
| |
| /* Determine whether this foreign table allows truncations */ |
| truncatable = server_truncatable; |
| foreach(cell, table->options) |
| { |
| DefElem *defel = (DefElem *) lfirst(cell); |
| |
| if (strcmp(defel->defname, "truncatable") == 0) |
| { |
| truncatable = defGetBoolean(defel); |
| break; |
| } |
| } |
| |
| if (!truncatable) |
| ereport(ERROR, |
| (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| errmsg("foreign table \"%s\" does not allow truncates", |
| RelationGetRelationName(rel)))); |
| } |
| Assert(OidIsValid(serverid)); |
| |
| /* |
| * Get connection to the foreign server. Connection manager will |
| * establish new connection if necessary. |
| */ |
| user = GetUserMapping(GetUserId(), serverid); |
| conn = GetConnection(user, false, NULL); |
| |
| /* Construct the TRUNCATE command string */ |
| initStringInfo(&sql); |
| deparseTruncateSql(&sql, rels, behavior, restart_seqs); |
| |
| /* Issue the TRUNCATE command to remote server */ |
| do_sql_command(conn, sql.data); |
| |
| pfree(sql.data); |
| } |
| |
| /* |
| * estimate_path_cost_size |
| * Get cost and size estimates for a foreign scan on given foreign relation |
| * either a base relation or a join between foreign relations or an upper |
| * relation containing foreign relations. |
| * |
| * param_join_conds are the parameterization clauses with outer relations. |
| * pathkeys specify the expected sort order if any for given path being costed. |
| * fpextra specifies additional post-scan/join-processing steps such as the |
| * final sort and the LIMIT restriction. |
| * |
| * The function returns the cost and size estimates in p_rows, p_width, |
| * p_startup_cost and p_total_cost variables. |
| */ |
| static void |
| estimate_path_cost_size(PlannerInfo *root, |
| RelOptInfo *foreignrel, |
| List *param_join_conds, |
| List *pathkeys, |
| PgFdwPathExtraData *fpextra, |
| double *p_rows, int *p_width, |
| Cost *p_startup_cost, Cost *p_total_cost) |
| { |
| PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; |
| double rows; |
| double retrieved_rows; |
| int width; |
| Cost startup_cost; |
| Cost total_cost; |
| |
| /* Make sure the core code has set up the relation's reltarget */ |
| Assert(foreignrel->reltarget); |
| |
| /* |
| * If the table or the server is configured to use remote estimates, |
| * connect to the foreign server and execute EXPLAIN to estimate the |
| * number of rows selected by the restriction+join clauses. Otherwise, |
| * estimate rows using whatever statistics we have locally, in a way |
| * similar to ordinary tables. |
| */ |
| if (fpinfo->use_remote_estimate) |
| { |
| List *remote_param_join_conds; |
| List *local_param_join_conds; |
| StringInfoData sql; |
| PGconn *conn; |
| Selectivity local_sel; |
| QualCost local_cost; |
| List *fdw_scan_tlist = NIL; |
| List *remote_conds; |
| |
| /* Required only to be passed to deparseSelectStmtForRel */ |
| List *retrieved_attrs; |
| |
| /* |
| * param_join_conds might contain both clauses that are safe to send |
| * across, and clauses that aren't. |
| */ |
| classifyConditions(root, foreignrel, param_join_conds, |
| &remote_param_join_conds, &local_param_join_conds); |
| |
| /* Build the list of columns to be fetched from the foreign server. */ |
| if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel)) |
| fdw_scan_tlist = build_tlist_to_deparse(foreignrel); |
| else |
| fdw_scan_tlist = NIL; |
| |
| /* |
| * The complete list of remote conditions includes everything from |
| * baserestrictinfo plus any extra join_conds relevant to this |
| * particular path. |
| */ |
| remote_conds = list_concat(remote_param_join_conds, |
| fpinfo->remote_conds); |
| |
| /* |
| * Construct EXPLAIN query including the desired SELECT, FROM, and |
| * WHERE clauses. Params and other-relation Vars are replaced by dummy |
| * values, so don't request params_list. |
| */ |
| initStringInfo(&sql); |
| appendStringInfoString(&sql, "EXPLAIN "); |
| deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist, |
| remote_conds, pathkeys, |
| fpextra ? fpextra->has_final_sort : false, |
| fpextra ? fpextra->has_limit : false, |
| false, &retrieved_attrs, NULL); |
| |
| /* Get the remote estimate */ |
| conn = GetConnection(fpinfo->user, false, NULL); |
| get_remote_estimate(sql.data, conn, &rows, &width, |
| &startup_cost, &total_cost); |
| ReleaseConnection(conn); |
| |
| retrieved_rows = rows; |
| |
| /* Factor in the selectivity of the locally-checked quals */ |
| local_sel = clauselist_selectivity(root, |
| local_param_join_conds, |
| foreignrel->relid, |
| JOIN_INNER, |
| NULL, |
| false); |
| local_sel *= fpinfo->local_conds_sel; |
| |
| rows = clamp_row_est(rows * local_sel); |
| |
| /* Add in the eval cost of the locally-checked quals */ |
| startup_cost += fpinfo->local_conds_cost.startup; |
| total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; |
| cost_qual_eval(&local_cost, local_param_join_conds, root); |
| startup_cost += local_cost.startup; |
| total_cost += local_cost.per_tuple * retrieved_rows; |
| |
| /* |
| * Add in tlist eval cost for each output row. In case of an |
| * aggregate, some of the tlist expressions such as grouping |
| * expressions will be evaluated remotely, so adjust the costs. |
| */ |
| startup_cost += foreignrel->reltarget->cost.startup; |
| total_cost += foreignrel->reltarget->cost.startup; |
| total_cost += foreignrel->reltarget->cost.per_tuple * rows; |
| if (IS_UPPER_REL(foreignrel)) |
| { |
| QualCost tlist_cost; |
| |
| cost_qual_eval(&tlist_cost, fdw_scan_tlist, root); |
| startup_cost -= tlist_cost.startup; |
| total_cost -= tlist_cost.startup; |
| total_cost -= tlist_cost.per_tuple * rows; |
| } |
| } |
| else |
| { |
| Cost run_cost = 0; |
| |
| /* |
| * We don't support join conditions in this mode (hence, no |
| * parameterized paths can be made). |
| */ |
| Assert(param_join_conds == NIL); |
| |
| /* |
| * We will come here again and again with different set of pathkeys or |
| * additional post-scan/join-processing steps that caller wants to |
| * cost. We don't need to calculate the cost/size estimates for the |
| * underlying scan, join, or grouping each time. Instead, use those |
| * estimates if we have cached them already. |
| */ |
| if (fpinfo->rel_startup_cost >= 0 && fpinfo->rel_total_cost >= 0) |
| { |
| Assert(fpinfo->retrieved_rows >= 0); |
| |
| rows = fpinfo->rows; |
| retrieved_rows = fpinfo->retrieved_rows; |
| width = fpinfo->width; |
| startup_cost = fpinfo->rel_startup_cost; |
| run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost; |
| |
| /* |
| * If we estimate the costs of a foreign scan or a foreign join |
| * with additional post-scan/join-processing steps, the scan or |
| * join costs obtained from the cache wouldn't yet contain the |
| * eval costs for the final scan/join target, which would've been |
| * updated by apply_scanjoin_target_to_paths(); add the eval costs |
| * now. |
| */ |
| if (fpextra && !IS_UPPER_REL(foreignrel)) |
| { |
| /* Shouldn't get here unless we have LIMIT */ |
| Assert(fpextra->has_limit); |
| Assert(foreignrel->reloptkind == RELOPT_BASEREL || |
| foreignrel->reloptkind == RELOPT_JOINREL); |
| startup_cost += foreignrel->reltarget->cost.startup; |
| run_cost += foreignrel->reltarget->cost.per_tuple * rows; |
| } |
| } |
| else if (IS_JOIN_REL(foreignrel)) |
| { |
| PgFdwRelationInfo *fpinfo_i; |
| PgFdwRelationInfo *fpinfo_o; |
| QualCost join_cost; |
| QualCost remote_conds_cost; |
| double nrows; |
| |
| /* Use rows/width estimates made by the core code. */ |
| rows = foreignrel->rows; |
| width = foreignrel->reltarget->width; |
| |
| /* For join we expect inner and outer relations set */ |
| Assert(fpinfo->innerrel && fpinfo->outerrel); |
| |
| fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private; |
| fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private; |
| |
| /* Estimate of number of rows in cross product */ |
| nrows = fpinfo_i->rows * fpinfo_o->rows; |
| |
| /* |
| * Back into an estimate of the number of retrieved rows. Just in |
| * case this is nuts, clamp to at most nrows. |
| */ |
| retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel); |
| retrieved_rows = Min(retrieved_rows, nrows); |
| |
| /* |
| * The cost of foreign join is estimated as cost of generating |
| * rows for the joining relations + cost for applying quals on the |
| * rows. |
| */ |
| |
| /* |
| * Calculate the cost of clauses pushed down to the foreign server |
| */ |
| cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root); |
| /* Calculate the cost of applying join clauses */ |
| cost_qual_eval(&join_cost, fpinfo->joinclauses, root); |
| |
| /* |
| * Startup cost includes startup cost of joining relations and the |
| * startup cost for join and other clauses. We do not include the |
| * startup cost specific to join strategy (e.g. setting up hash |
| * tables) since we do not know what strategy the foreign server |
| * is going to use. |
| */ |
| startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost; |
| startup_cost += join_cost.startup; |
| startup_cost += remote_conds_cost.startup; |
| startup_cost += fpinfo->local_conds_cost.startup; |
| |
| /* |
| * Run time cost includes: |
| * |
| * 1. Run time cost (total_cost - startup_cost) of relations being |
| * joined |
| * |
| * 2. Run time cost of applying join clauses on the cross product |
| * of the joining relations. |
| * |
| * 3. Run time cost of applying pushed down other clauses on the |
| * result of join |
| * |
| * 4. Run time cost of applying nonpushable other clauses locally |
| * on the result fetched from the foreign server. |
| */ |
| run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost; |
| run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost; |
| run_cost += nrows * join_cost.per_tuple; |
| nrows = clamp_row_est(nrows * fpinfo->joinclause_sel); |
| run_cost += nrows * remote_conds_cost.per_tuple; |
| run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; |
| |
| /* Add in tlist eval cost for each output row */ |
| startup_cost += foreignrel->reltarget->cost.startup; |
| run_cost += foreignrel->reltarget->cost.per_tuple * rows; |
| } |
| else if (IS_UPPER_REL(foreignrel)) |
| { |
| RelOptInfo *outerrel = fpinfo->outerrel; |
| PgFdwRelationInfo *ofpinfo; |
| AggClauseCosts aggcosts; |
| double input_rows; |
| int numGroupCols; |
| double numGroups = 1; |
| |
| /* The upper relation should have its outer relation set */ |
| Assert(outerrel); |
| /* and that outer relation should have its reltarget set */ |
| Assert(outerrel->reltarget); |
| |
| /* |
| * This cost model is mixture of costing done for sorted and |
| * hashed aggregates in cost_agg(). We are not sure which |
| * strategy will be considered at remote side, thus for |
| * simplicity, we put all startup related costs in startup_cost |
| * and all finalization and run cost are added in total_cost. |
| */ |
| |
| ofpinfo = (PgFdwRelationInfo *) outerrel->fdw_private; |
| |
| /* Get rows from input rel */ |
| input_rows = ofpinfo->rows; |
| |
| /* Collect statistics about aggregates for estimating costs. */ |
| MemSet(&aggcosts, 0, sizeof(AggClauseCosts)); |
| if (root->parse->hasAggs) |
| { |
| get_agg_clause_costs(root, AGGSPLIT_SIMPLE, &aggcosts); |
| } |
| |
| /* Get number of grouping columns and possible number of groups */ |
| numGroupCols = list_length(root->processed_groupClause); |
| numGroups = estimate_num_groups(root, |
| get_sortgrouplist_exprs(root->processed_groupClause, |
| fpinfo->grouped_tlist), |
| input_rows, NULL, NULL); |
| |
| /* |
| * Get the retrieved_rows and rows estimates. If there are HAVING |
| * quals, account for their selectivity. |
| */ |
| if (root->hasHavingQual) |
| { |
| /* Factor in the selectivity of the remotely-checked quals */ |
| retrieved_rows = |
| clamp_row_est(numGroups * |
| clauselist_selectivity(root, |
| fpinfo->remote_conds, |
| 0, |
| JOIN_INNER, |
| NULL, |
| false)); |
| /* Factor in the selectivity of the locally-checked quals */ |
| rows = clamp_row_est(retrieved_rows * fpinfo->local_conds_sel); |
| } |
| else |
| { |
| rows = retrieved_rows = numGroups; |
| } |
| |
| /* Use width estimate made by the core code. */ |
| width = foreignrel->reltarget->width; |
| |
| /*----- |
| * Startup cost includes: |
| * 1. Startup cost for underneath input relation, adjusted for |
| * tlist replacement by apply_scanjoin_target_to_paths() |
| * 2. Cost of performing aggregation, per cost_agg() |
| *----- |
| */ |
| startup_cost = ofpinfo->rel_startup_cost; |
| startup_cost += outerrel->reltarget->cost.startup; |
| startup_cost += aggcosts.transCost.startup; |
| startup_cost += aggcosts.transCost.per_tuple * input_rows; |
| startup_cost += aggcosts.finalCost.startup; |
| startup_cost += (cpu_operator_cost * numGroupCols) * input_rows; |
| |
| /*----- |
| * Run time cost includes: |
| * 1. Run time cost of underneath input relation, adjusted for |
| * tlist replacement by apply_scanjoin_target_to_paths() |
| * 2. Run time cost of performing aggregation, per cost_agg() |
| *----- |
| */ |
| run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost; |
| run_cost += outerrel->reltarget->cost.per_tuple * input_rows; |
| run_cost += aggcosts.finalCost.per_tuple * numGroups; |
| run_cost += cpu_tuple_cost * numGroups; |
| |
| /* Account for the eval cost of HAVING quals, if any */ |
| if (root->hasHavingQual) |
| { |
| QualCost remote_cost; |
| |
| /* Add in the eval cost of the remotely-checked quals */ |
| cost_qual_eval(&remote_cost, fpinfo->remote_conds, root); |
| startup_cost += remote_cost.startup; |
| run_cost += remote_cost.per_tuple * numGroups; |
| /* Add in the eval cost of the locally-checked quals */ |
| startup_cost += fpinfo->local_conds_cost.startup; |
| run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; |
| } |
| |
| /* Add in tlist eval cost for each output row */ |
| startup_cost += foreignrel->reltarget->cost.startup; |
| run_cost += foreignrel->reltarget->cost.per_tuple * rows; |
| } |
| else |
| { |
| Cost cpu_per_tuple; |
| |
| /* Use rows/width estimates made by set_baserel_size_estimates. */ |
| rows = foreignrel->rows; |
| width = foreignrel->reltarget->width; |
| |
| /* |
| * Back into an estimate of the number of retrieved rows. Just in |
| * case this is nuts, clamp to at most foreignrel->tuples. |
| */ |
| retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel); |
| retrieved_rows = Min(retrieved_rows, foreignrel->tuples); |
| |
| /* |
| * Cost as though this were a seqscan, which is pessimistic. We |
| * effectively imagine the local_conds are being evaluated |
| * remotely, too. |
| */ |
| startup_cost = 0; |
| run_cost = 0; |
| run_cost += seq_page_cost * foreignrel->pages; |
| |
| startup_cost += foreignrel->baserestrictcost.startup; |
| cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple; |
| run_cost += cpu_per_tuple * foreignrel->tuples; |
| |
| /* Add in tlist eval cost for each output row */ |
| startup_cost += foreignrel->reltarget->cost.startup; |
| run_cost += foreignrel->reltarget->cost.per_tuple * rows; |
| } |
| |
| /* |
| * Without remote estimates, we have no real way to estimate the cost |
| * of generating sorted output. It could be free if the query plan |
| * the remote side would have chosen generates properly-sorted output |
| * anyway, but in most cases it will cost something. Estimate a value |
| * high enough that we won't pick the sorted path when the ordering |
| * isn't locally useful, but low enough that we'll err on the side of |
| * pushing down the ORDER BY clause when it's useful to do so. |
| */ |
| if (pathkeys != NIL) |
| { |
| if (IS_UPPER_REL(foreignrel)) |
| { |
| Assert(foreignrel->reloptkind == RELOPT_UPPER_REL && |
| fpinfo->stage == UPPERREL_GROUP_AGG); |
| adjust_foreign_grouping_path_cost(root, pathkeys, |
| retrieved_rows, width, |
| fpextra->limit_tuples, |
| &startup_cost, &run_cost); |
| } |
| else |
| { |
| startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER; |
| run_cost *= DEFAULT_FDW_SORT_MULTIPLIER; |
| } |
| } |
| |
| total_cost = startup_cost + run_cost; |
| |
| /* Adjust the cost estimates if we have LIMIT */ |
| if (fpextra && fpextra->has_limit) |
| { |
| adjust_limit_rows_costs(&rows, &startup_cost, &total_cost, |
| fpextra->offset_est, fpextra->count_est); |
| retrieved_rows = rows; |
| } |
| } |
| |
| /* |
| * If this includes the final sort step, the given target, which will be |
| * applied to the resulting path, might have different expressions from |
| * the foreignrel's reltarget (see make_sort_input_target()); adjust tlist |
| * eval costs. |
| */ |
| if (fpextra && fpextra->has_final_sort && |
| fpextra->target != foreignrel->reltarget) |
| { |
| QualCost oldcost = foreignrel->reltarget->cost; |
| QualCost newcost = fpextra->target->cost; |
| |
| startup_cost += newcost.startup - oldcost.startup; |
| total_cost += newcost.startup - oldcost.startup; |
| total_cost += (newcost.per_tuple - oldcost.per_tuple) * rows; |
| } |
| |
| /* |
| * Cache the retrieved rows and cost estimates for scans, joins, or |
| * groupings without any parameterization, pathkeys, or additional |
| * post-scan/join-processing steps, before adding the costs for |
| * transferring data from the foreign server. These estimates are useful |
| * for costing remote joins involving this relation or costing other |
| * remote operations on this relation such as remote sorts and remote |
| * LIMIT restrictions, when the costs can not be obtained from the foreign |
| * server. This function will be called at least once for every foreign |
| * relation without any parameterization, pathkeys, or additional |
| * post-scan/join-processing steps. |
| */ |
| if (pathkeys == NIL && param_join_conds == NIL && fpextra == NULL) |
| { |
| fpinfo->retrieved_rows = retrieved_rows; |
| fpinfo->rel_startup_cost = startup_cost; |
| fpinfo->rel_total_cost = total_cost; |
| } |
| |
| /* |
| * Add some additional cost factors to account for connection overhead |
| * (fdw_startup_cost), transferring data across the network |
| * (fdw_tuple_cost per retrieved row), and local manipulation of the data |
| * (cpu_tuple_cost per retrieved row). |
| */ |
| startup_cost += fpinfo->fdw_startup_cost; |
| total_cost += fpinfo->fdw_startup_cost; |
| total_cost += fpinfo->fdw_tuple_cost * retrieved_rows; |
| total_cost += cpu_tuple_cost * retrieved_rows; |
| |
| /* |
| * If we have LIMIT, we should prefer performing the restriction remotely |
| * rather than locally, as the former avoids extra row fetches from the |
| * remote that the latter might cause. But since the core code doesn't |
| * account for such fetches when estimating the costs of the local |
| * restriction (see create_limit_path()), there would be no difference |
| * between the costs of the local restriction and the costs of the remote |
| * restriction estimated above if we don't use remote estimates (except |
| * for the case where the foreignrel is a grouping relation, the given |
| * pathkeys is not NIL, and the effects of a bounded sort for that rel is |
| * accounted for in costing the remote restriction). Tweak the costs of |
| * the remote restriction to ensure we'll prefer it if LIMIT is a useful |
| * one. |
| */ |
| if (!fpinfo->use_remote_estimate && |
| fpextra && fpextra->has_limit && |
| fpextra->limit_tuples > 0 && |
| fpextra->limit_tuples < fpinfo->rows) |
| { |
| Assert(fpinfo->rows > 0); |
| total_cost -= (total_cost - startup_cost) * 0.05 * |
| (fpinfo->rows - fpextra->limit_tuples) / fpinfo->rows; |
| } |
| |
| /* Return results. */ |
| *p_rows = rows; |
| *p_width = width; |
| *p_startup_cost = startup_cost; |
| *p_total_cost = total_cost; |
| } |
| |
| /* |
| * Estimate costs of executing a SQL statement remotely. |
| * The given "sql" must be an EXPLAIN command. |
| */ |
| static void |
| get_remote_estimate(const char *sql, PGconn *conn, |
| double *rows, int *width, |
| Cost *startup_cost, Cost *total_cost) |
| { |
| PGresult *volatile res = NULL; |
| |
| /* PGresult must be released before leaving this function. */ |
| PG_TRY(); |
| { |
| char *line; |
| char *p; |
| int n; |
| |
| /* |
| * Execute EXPLAIN remotely. |
| */ |
| res = pgfdw_exec_query(conn, sql, NULL); |
| if (PQresultStatus(res) != PGRES_TUPLES_OK) |
| pgfdw_report_error(ERROR, res, conn, false, sql); |
| |
| /* |
| * Extract cost numbers for topmost plan node. Note we search for a |
| * left paren from the end of the line to avoid being confused by |
| * other uses of parentheses. |
| */ |
| line = PQgetvalue(res, 0, 0); |
| p = strrchr(line, '('); |
| if (p == NULL) |
| elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line); |
| n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)", |
| startup_cost, total_cost, rows, width); |
| if (n != 4) |
| elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line); |
| } |
| PG_FINALLY(); |
| { |
| PQclear(res); |
| } |
| PG_END_TRY(); |
| } |
| |
| /* |
| * Adjust the cost estimates of a foreign grouping path to include the cost of |
| * generating properly-sorted output. |
| */ |
| static void |
| adjust_foreign_grouping_path_cost(PlannerInfo *root, |
| List *pathkeys, |
| double retrieved_rows, |
| double width, |
| double limit_tuples, |
| Cost *p_startup_cost, |
| Cost *p_run_cost) |
| { |
| /* |
| * If the GROUP BY clause isn't sort-able, the plan chosen by the remote |
| * side is unlikely to generate properly-sorted output, so it would need |
| * an explicit sort; adjust the given costs with cost_sort(). Likewise, |
| * if the GROUP BY clause is sort-able but isn't a superset of the given |
| * pathkeys, adjust the costs with that function. Otherwise, adjust the |
| * costs by applying the same heuristic as for the scan or join case. |
| */ |
| if (!grouping_is_sortable(root->processed_groupClause) || |
| !pathkeys_contained_in(pathkeys, root->group_pathkeys)) |
| { |
| Path sort_path; /* dummy for result of cost_sort */ |
| |
| cost_sort(&sort_path, |
| root, |
| pathkeys, |
| *p_startup_cost + *p_run_cost, |
| retrieved_rows, |
| width, |
| 0.0, |
| work_mem, |
| limit_tuples); |
| |
| *p_startup_cost = sort_path.startup_cost; |
| *p_run_cost = sort_path.total_cost - sort_path.startup_cost; |
| } |
| else |
| { |
| /* |
| * The default extra cost seems too large for foreign-grouping cases; |
| * add 1/4th of that default. |
| */ |
| double sort_multiplier = 1.0 + (DEFAULT_FDW_SORT_MULTIPLIER |
| - 1.0) * 0.25; |
| |
| *p_startup_cost *= sort_multiplier; |
| *p_run_cost *= sort_multiplier; |
| } |
| } |
| |
| /* |
| * Detect whether we want to process an EquivalenceClass member. |
| * |
| * This is a callback for use by generate_implied_equalities_for_column. |
| */ |
| static bool |
| ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, |
| EquivalenceClass *ec, EquivalenceMember *em, |
| void *arg) |
| { |
| ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg; |
| Expr *expr = em->em_expr; |
| |
| /* |
| * If we've identified what we're processing in the current scan, we only |
| * want to match that expression. |
| */ |
| if (state->current != NULL) |
| return equal(expr, state->current); |
| |
| /* |
| * Otherwise, ignore anything we've already processed. |
| */ |
| if (list_member(state->already_used, expr)) |
| return false; |
| |
| /* This is the new target to process. */ |
| state->current = expr; |
| return true; |
| } |
| |
| /* |
| * Create cursor for node's query with current parameter values. |
| */ |
| static void |
| create_cursor(ForeignScanState *node) |
| { |
| PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; |
| ExprContext *econtext = node->ss.ps.ps_ExprContext; |
| int numParams = fsstate->numParams; |
| const char **values = fsstate->param_values; |
| PGconn *conn = fsstate->conn; |
| StringInfoData buf; |
| PGresult *res; |
| |
| /* First, process a pending asynchronous request, if any. */ |
| if (fsstate->conn_state->pendingAreq) |
| process_pending_request(fsstate->conn_state->pendingAreq); |
| |
| /* |
| * Construct array of query parameter values in text format. We do the |
| * conversions in the short-lived per-tuple context, so as not to cause a |
| * memory leak over repeated scans. |
| */ |
| if (numParams > 0) |
| { |
| MemoryContext oldcontext; |
| |
| oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); |
| |
| process_query_params(econtext, |
| fsstate->param_flinfo, |
| fsstate->param_exprs, |
| values); |
| |
| MemoryContextSwitchTo(oldcontext); |
| } |
| |
| /* Construct the DECLARE CURSOR command */ |
| initStringInfo(&buf); |
| appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s", |
| fsstate->cursor_number, fsstate->query); |
| |
| /* |
| * Notice that we pass NULL for paramTypes, thus forcing the remote server |
| * to infer types for all parameters. Since we explicitly cast every |
| * parameter (see deparse.c), the "inference" is trivial and will produce |
| * the desired result. This allows us to avoid assuming that the remote |
| * server has the same OIDs we do for the parameters' types. |
| */ |
| if (!PQsendQueryParams(conn, buf.data, numParams, |
| NULL, values, NULL, NULL, 0)) |
| pgfdw_report_error(ERROR, NULL, conn, false, buf.data); |
| |
| /* |
| * Get the result, and check for success. |
| * |
| * We don't use a PG_TRY block here, so be careful not to throw error |
| * without releasing the PGresult. |
| */ |
| res = pgfdw_get_result(conn, buf.data); |
| if (PQresultStatus(res) != PGRES_COMMAND_OK) |
| pgfdw_report_error(ERROR, res, conn, true, fsstate->query); |
| PQclear(res); |
| |
| /* Mark the cursor as created, and show no tuples have been retrieved */ |
| fsstate->cursor_exists = true; |
| fsstate->tuples = NULL; |
| fsstate->num_tuples = 0; |
| fsstate->next_tuple = 0; |
| fsstate->fetch_ct_2 = 0; |
| fsstate->eof_reached = false; |
| |
| /* Clean up */ |
| pfree(buf.data); |
| } |
| |
| /* |
|  * fetch_more_data |
|  *      Retrieve the next batch of rows from the node's cursor. |
|  * |
|  * The previous batch is discarded by resetting batch_cxt, and the new |
|  * tuples are built while that context is current.  For an async-capable |
|  * scan only the result of the already-sent query is collected; otherwise |
|  * a synchronous FETCH of fetch_size rows is issued here. |
|  */ |
| static void |
| fetch_more_data(ForeignScanState *node) |
| { |
|     PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; |
|     PGresult *volatile res = NULL; |
|     MemoryContext caller_cxt; |
| |
|     /* Throw away the previous batch; the new one is built in batch_cxt. */ |
|     fsstate->tuples = NULL; |
|     MemoryContextReset(fsstate->batch_cxt); |
|     caller_cxt = MemoryContextSwitchTo(fsstate->batch_cxt); |
| |
|     /* PG_FINALLY releases the PGresult on both normal and error exits. */ |
|     PG_TRY(); |
|     { |
|         PGconn *conn = fsstate->conn; |
|         const bool is_async = fsstate->async_capable; |
|         int ntuples; |
|         int row; |
| |
|         if (is_async) |
|         { |
|             Assert(fsstate->conn_state->pendingAreq); |
| |
|             /* |
|              * An earlier call to fetch_more_data_begin already sent the |
|              * query, so all that is left is to collect the result. |
|              */ |
|             res = pgfdw_get_result(conn, fsstate->query); |
|         } |
|         else |
|         { |
|             char fetch_sql[64]; |
| |
|             /* Regular synchronous fetch. */ |
|             snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u", |
|                      fsstate->fetch_size, fsstate->cursor_number); |
|             res = pgfdw_exec_query(conn, fetch_sql, fsstate->conn_state); |
|         } |
| |
|         /* On error, report the original query, not the FETCH. */ |
|         if (PQresultStatus(res) != PGRES_TUPLES_OK) |
|             pgfdw_report_error(ERROR, res, conn, false, fsstate->query); |
| |
|         /* The async request is finished; reset per-connection state. */ |
|         if (is_async) |
|             fsstate->conn_state->pendingAreq = NULL; |
| |
|         /* Convert the rows into HeapTuples. */ |
|         ntuples = PQntuples(res); |
|         fsstate->tuples = (HeapTuple *) palloc0(ntuples * sizeof(HeapTuple)); |
|         fsstate->num_tuples = ntuples; |
|         fsstate->next_tuple = 0; |
| |
|         for (row = 0; row < ntuples; row++) |
|         { |
|             Assert(IsA(node->ss.ps.plan, ForeignScan) || IsA(node->ss.ps.plan, DynamicForeignScan)); |
| |
|             fsstate->tuples[row] = |
|                 make_tuple_from_result_row(res, row, |
|                                            fsstate->rel, |
|                                            fsstate->attinmeta, |
|                                            fsstate->retrieved_attrs, |
|                                            node, |
|                                            fsstate->temp_cxt); |
|         } |
| |
|         /* fetch_ct_2 counts fetches but saturates at 2. */ |
|         if (fsstate->fetch_ct_2 < 2) |
|             fsstate->fetch_ct_2++; |
| |
|         /* A short batch means the cursor is exhausted. */ |
|         fsstate->eof_reached = (ntuples < fsstate->fetch_size); |
|     } |
|     PG_FINALLY(); |
|     { |
|         PQclear(res); |
|     } |
|     PG_END_TRY(); |
| |
|     MemoryContextSwitchTo(caller_cxt); |
| } |
| |
| /* |
|  * set_transmission_modes |
|  *      Temporarily force GUC settings so that data values are output in a |
|  *      form that is unambiguous to the remote server. |
|  * |
|  * Doing this once per row is expensive, but the settings cannot be left in |
|  * place between rows without risking an effect on user-visible |
|  * computations. |
|  * |
|  * The changes are made the way a function SET option would make them, so |
|  * they last only until the caller invokes reset_transmission_modes().  If |
|  * an error is thrown in between, guc.c undoes them. |
|  * |
|  * Returns the GUC nest level that must be handed back to |
|  * reset_transmission_modes(). |
|  */ |
| int |
| set_transmission_modes(void) |
| { |
|     const int guc_level = NewGUCNestLevel(); |
| |
|     /* |
|      * These values should match what pg_dump does.  See also |
|      * configure_remote_session in connection.c. |
|      */ |
|     if (DateStyle != USE_ISO_DATES) |
|     { |
|         (void) set_config_option("datestyle", "ISO", |
|                                  PGC_USERSET, PGC_S_SESSION, |
|                                  GUC_ACTION_SAVE, true, 0, false); |
|     } |
| |
|     if (IntervalStyle != INTSTYLE_POSTGRES) |
|     { |
|         (void) set_config_option("intervalstyle", "postgres", |
|                                  PGC_USERSET, PGC_S_SESSION, |
|                                  GUC_ACTION_SAVE, true, 0, false); |
|     } |
| |
|     if (extra_float_digits < 3) |
|     { |
|         (void) set_config_option("extra_float_digits", "3", |
|                                  PGC_USERSET, PGC_S_SESSION, |
|                                  GUC_ACTION_SAVE, true, 0, false); |
|     } |
| |
|     /* |
|      * Always restrict search_path as well, in case any regproc or similar |
|      * constants have to be printed. |
|      */ |
|     (void) set_config_option("search_path", "pg_catalog", |
|                              PGC_USERSET, PGC_S_SESSION, |
|                              GUC_ACTION_SAVE, true, 0, false); |
| |
|     return guc_level; |
| } |
| |
| /* |
|  * reset_transmission_modes |
|  *      Undo the GUC changes made by set_transmission_modes(). |
|  * |
|  * nestlevel is the value that set_transmission_modes() returned. |
|  */ |
| void |
| reset_transmission_modes(int nestlevel) |
| { |
|     const bool is_commit = true; |
| |
|     AtEOXact_GUC(is_commit, nestlevel); |
| } |
| |
| /* |
|  * close_cursor |
|  *      Send "CLOSE c<cursor_number>" over the given connection and raise |
|  *      an error unless the command succeeds. |
|  */ |
| static void |
| close_cursor(PGconn *conn, unsigned int cursor_number, |
|              PgFdwConnState *conn_state) |
| { |
|     char close_sql[64]; |
|     PGresult *result; |
| |
|     snprintf(close_sql, sizeof(close_sql), "CLOSE c%u", cursor_number); |
| |
|     /* |
|      * There is no PG_TRY block here, so no error may be thrown while the |
|      * PGresult is still unreleased. |
|      */ |
|     result = pgfdw_exec_query(conn, close_sql, conn_state); |
|     if (PQresultStatus(result) != PGRES_COMMAND_OK) |
|         pgfdw_report_error(ERROR, result, conn, true, close_sql); |
|     PQclear(result); |
| } |
| |
| /* |
|  * create_foreign_modify |
|  *      Build the execution state for a foreign insert/update/delete. |
|  * |
|  * Opens the remote connection, records the remote query and related plan |
|  * information, creates the per-tuple temporary context, and looks up the |
|  * output functions for every parameter that will be transmitted.  The |
|  * prepared statement itself is not created here (p_name stays NULL). |
|  */ |
| static PgFdwModifyState * |
| create_foreign_modify(EState *estate, |
|                       RangeTblEntry *rte, |
|                       ResultRelInfo *resultRelInfo, |
|                       CmdType operation, |
|                       Plan *subplan, |
|                       char *query, |
|                       List *target_attrs, |
|                       int values_end, |
|                       bool has_returning, |
|                       List *retrieved_attrs) |
| { |
|     Relation rel = resultRelInfo->ri_RelationDesc; |
|     TupleDesc tupdesc = RelationGetDescr(rel); |
|     const bool is_insert = (operation == CMD_INSERT); |
|     const bool is_update = (operation == CMD_UPDATE); |
|     const bool is_delete = (operation == CMD_DELETE); |
|     PgFdwModifyState *fmstate; |
|     ForeignTable *ftable; |
|     UserMapping *mapping; |
|     Oid remote_user; |
|     Oid outfunc; |
|     bool is_varlena; |
|     AttrNumber max_params; |
|     ListCell *cell; |
| |
|     /* Start with a zeroed state struct. */ |
|     fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState)); |
|     fmstate->rel = rel; |
| |
|     /* Work out which user the remote access is done as. */ |
|     remote_user = ExecGetResultRelCheckAsUser(resultRelInfo, estate); |
| |
|     /* Look up the foreign table and the matching user mapping. */ |
|     ftable = GetForeignTable(RelationGetRelid(rel)); |
|     mapping = GetUserMapping(remote_user, ftable->serverid); |
| |
|     /* Open connection; report that we'll create a prepared statement. */ |
|     fmstate->conn = GetConnection(mapping, true, &fmstate->conn_state); |
|     fmstate->p_name = NULL;     /* prepared statement not made yet */ |
| |
|     /* |
|      * Remote query information.  For INSERT we keep two private copies of |
|      * the query string: the current one and the original. |
|      */ |
|     if (is_insert) |
|     { |
|         fmstate->query = pstrdup(query); |
|         fmstate->orig_query = pstrdup(fmstate->query); |
|     } |
|     else |
|         fmstate->query = query; |
|     fmstate->target_attrs = target_attrs; |
|     fmstate->values_end = values_end; |
|     fmstate->has_returning = has_returning; |
|     fmstate->retrieved_attrs = retrieved_attrs; |
| |
|     /* Context for per-tuple temporary workspace. */ |
|     fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, |
|                                               "postgres_fdw temporary data", |
|                                               ALLOCSET_SMALL_SIZES); |
| |
|     /* Input conversion is needed only when RETURNING results come back. */ |
|     if (fmstate->has_returning) |
|         fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); |
| |
|     /* |
|      * Output conversion of prepared-statement parameters: one slot per |
|      * target attribute plus one more for a possible ctid. |
|      */ |
|     max_params = list_length(fmstate->target_attrs) + 1; |
|     fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * max_params); |
|     fmstate->p_nums = 0; |
| |
|     if (is_update || is_delete) |
|     { |
|         Assert(subplan != NULL); |
| |
|         /* Locate the ctid resjunk column in the subplan's result. */ |
|         fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist, |
|                                                           "ctid"); |
|         if (!AttributeNumberIsValid(fmstate->ctidAttno)) |
|             elog(ERROR, "could not find junk ctid column"); |
| |
|         /* ctid is the first transmittable parameter. */ |
|         getTypeOutputInfo(TIDOID, &outfunc, &is_varlena); |
|         fmgr_info(outfunc, &fmstate->p_flinfo[fmstate->p_nums]); |
|         fmstate->p_nums++; |
|     } |
| |
|     if (is_insert || is_update) |
|     { |
|         /* The remaining transmittable parameters, one per target column. */ |
|         foreach(cell, fmstate->target_attrs) |
|         { |
|             int attnum = lfirst_int(cell); |
|             Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); |
| |
|             Assert(!attr->attisdropped); |
| |
|             /* Generated columns are set to DEFAULT, so they send nothing. */ |
|             if (attr->attgenerated) |
|                 continue; |
| |
|             getTypeOutputInfo(attr->atttypid, &outfunc, &is_varlena); |
|             fmgr_info(outfunc, &fmstate->p_flinfo[fmstate->p_nums]); |
|             fmstate->p_nums++; |
|         } |
|     } |
| |
|     Assert(fmstate->p_nums <= max_params); |
| |
|     /* batch_size comes from the foreign server/table options. */ |
|     if (is_insert) |
|         fmstate->batch_size = get_batch_size_option(rel); |
| |
|     fmstate->num_slots = 1; |
| |
|     /* No auxiliary state yet. */ |
|     fmstate->aux_fmstate = NULL; |
| |
|     return fmstate; |
| } |
| |
| /* |
|  * execute_foreign_modify |
|  *      Run one foreign-table modification and fetch the RETURNING result, |
|  *      if any.  This is the common implementation behind |
|  *      postgresExecForeignInsert, postgresExecForeignBatchInsert, |
|  *      postgresExecForeignUpdate, and postgresExecForeignDelete. |
|  * |
|  * On return *numSlots holds the number of rows the remote side reported. |
|  * The result is the input slots array, or NULL when that number is not |
|  * positive. |
|  */ |
| static TupleTableSlot ** |
| execute_foreign_modify(EState *estate, |
|                        ResultRelInfo *resultRelInfo, |
|                        CmdType operation, |
|                        TupleTableSlot **slots, |
|                        TupleTableSlot **planSlots, |
|                        int *numSlots) |
| { |
|     PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; |
|     const bool uses_ctid = (operation == CMD_UPDATE || operation == CMD_DELETE); |
|     ItemPointer ctid = NULL; |
|     const char **param_text; |
|     PGresult *res; |
|     int affected; |
|     StringInfoData rebuilt; |
| |
|     /* Only INSERT, UPDATE and DELETE are handled here. */ |
|     Assert(operation == CMD_INSERT || |
|            operation == CMD_UPDATE || |
|            operation == CMD_DELETE); |
| |
|     /* Finish any pending asynchronous request on this connection first. */ |
|     if (fmstate->conn_state->pendingAreq) |
|         process_pending_request(fmstate->conn_state->pendingAreq); |
| |
|     /* |
|      * The current INSERT query may have been deparsed and prepared for a |
|      * different number of rows; if so, rebuild it for the right number. |
|      */ |
|     if (operation == CMD_INSERT && fmstate->num_slots != *numSlots) |
|     { |
|         /* Drop the prepared statement made for the old row count. */ |
|         if (fmstate->p_name) |
|             deallocate_query(fmstate); |
| |
|         /* Build INSERT string with numSlots records in its VALUES clause. */ |
|         initStringInfo(&rebuilt); |
|         rebuildInsertSql(&rebuilt, fmstate->rel, |
|                          fmstate->orig_query, fmstate->target_attrs, |
|                          fmstate->values_end, fmstate->p_nums, |
|                          *numSlots - 1); |
|         pfree(fmstate->query); |
|         fmstate->query = rebuilt.data; |
|         fmstate->num_slots = *numSlots; |
|     } |
| |
|     /* Create the remote prepared statement if it does not exist yet. */ |
|     if (!fmstate->p_name) |
|         prepare_foreign_modify(fmstate); |
| |
|     /* For UPDATE/DELETE, pick up the ctid passed up as a resjunk column. */ |
|     if (uses_ctid) |
|     { |
|         Datum ctid_datum; |
|         bool ctid_isnull; |
| |
|         ctid_datum = ExecGetJunkAttribute(planSlots[0], |
|                                           fmstate->ctidAttno, |
|                                           &ctid_isnull); |
|         /* shouldn't ever get a null result... */ |
|         if (ctid_isnull) |
|             elog(ERROR, "ctid is NULL"); |
|         ctid = (ItemPointer) DatumGetPointer(ctid_datum); |
|     } |
| |
|     /* Convert the prepared statement's parameters to text form. */ |
|     param_text = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots); |
| |
|     /* Execute the prepared statement. */ |
|     if (!PQsendQueryPrepared(fmstate->conn, |
|                              fmstate->p_name, |
|                              fmstate->p_nums * (*numSlots), |
|                              param_text, |
|                              NULL, |
|                              NULL, |
|                              0)) |
|         pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); |
| |
|     /* |
|      * Collect the result and check it. |
|      * |
|      * There is no PG_TRY block here, so no error may be thrown while the |
|      * PGresult is still unreleased. |
|      */ |
|     res = pgfdw_get_result(fmstate->conn, fmstate->query); |
| |
|     if (fmstate->has_returning) |
|     { |
|         if (PQresultStatus(res) != PGRES_TUPLES_OK) |
|             pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); |
| |
|         /* With RETURNING, the row count is the number of result tuples. */ |
|         Assert(*numSlots == 1); |
|         affected = PQntuples(res); |
|         if (affected > 0) |
|             store_returning_result(fmstate, slots[0], res); |
|     } |
|     else |
|     { |
|         if (PQresultStatus(res) != PGRES_COMMAND_OK) |
|             pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); |
| |
|         /* Otherwise take it from the command status. */ |
|         affected = atoi(PQcmdTuples(res)); |
|     } |
| |
|     /* Release the result and the per-call parameter strings. */ |
|     PQclear(res); |
| |
|     MemoryContextReset(fmstate->temp_cxt); |
| |
|     *numSlots = affected; |
| |
|     /* NULL tells the caller nothing was inserted/updated/deleted remotely. */ |
|     if (affected > 0) |
|         return slots; |
|     return NULL; |
| } |
| |
| /* |
|  * prepare_foreign_modify |
|  *      Create the remote prepared statement used to run fmstate->query, |
|  *      and record its name in fmstate->p_name. |
|  * |
|  * The caller is expected to have processed any pending asynchronous |
|  * request already, so that is not repeated here. |
|  */ |
| static void |
| prepare_foreign_modify(PgFdwModifyState *fmstate) |
| { |
|     char name_buf[NAMEDATALEN]; |
|     char *stmt_name; |
|     PGresult *result; |
| |
|     /* Pick the name for the prepared statement. */ |
|     snprintf(name_buf, sizeof(name_buf), "pgsql_fdw_prep_%u", |
|              GetPrepStmtNumber(fmstate->conn)); |
|     stmt_name = pstrdup(name_buf); |
| |
|     /* |
|      * Parameter types are deliberately left unspecified so that the remote |
|      * server derives them itself.  That sidesteps trouble if the remote |
|      * server uses different type OIDs than we do; the statements prepared |
|      * in this module are simple enough for the remote server to choose |
|      * correctly. |
|      */ |
|     if (!PQsendPrepare(fmstate->conn, |
|                        stmt_name, |
|                        fmstate->query, |
|                        0, |
|                        NULL)) |
|         pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); |
| |
|     /* |
|      * Collect the result and check it. |
|      * |
|      * There is no PG_TRY block here, so no error may be thrown while the |
|      * PGresult is still unreleased. |
|      */ |
|     result = pgfdw_get_result(fmstate->conn, fmstate->query); |
|     if (PQresultStatus(result) != PGRES_COMMAND_OK) |
|         pgfdw_report_error(ERROR, result, fmstate->conn, true, fmstate->query); |
|     PQclear(result); |
| |
|     /* A non-NULL p_name is what marks the prepare as done. */ |
|     fmstate->p_name = stmt_name; |
| } |
| |
| /* |
|  * convert_prep_stmt_params |
|  *      Build the array of text strings holding the parameter values. |
|  * |
|  * tupleid is the ctid to send, or NULL if none. |
|  * slots holds the tuples that supply the remaining parameters, or is NULL |
|  * if there are none; numSlots is the number of entries to use. |
|  * |
|  * Everything is allocated in temp_cxt; the caller should reset that |
|  * context once the values have been used. |
|  */ |
| static const char ** |
| convert_prep_stmt_params(PgFdwModifyState *fmstate, |
|                          ItemPointer tupleid, |
|                          TupleTableSlot **slots, |
|                          int numSlots) |
| { |
|     const bool have_ctid = (tupleid != NULL); |
|     const char **param_text; |
|     int out_idx = 0; |
|     int slot_no; |
|     int flinfo_idx; |
|     MemoryContext caller_cxt; |
| |
|     caller_cxt = MemoryContextSwitchTo(fmstate->temp_cxt); |
| |
|     param_text = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots); |
| |
|     /* ctid is provided only for UPDATE/DELETE, which don't allow batching */ |
|     Assert(!(have_ctid && numSlots > 1)); |
| |
|     /* When a ctid is in use it is always the first parameter. */ |
|     if (have_ctid) |
|     { |
|         Assert(numSlots == 1); |
|         /* don't need set_transmission_modes for TID output */ |
|         param_text[out_idx] = OutputFunctionCall(&fmstate->p_flinfo[out_idx], |
|                                                  PointerGetDatum(tupleid)); |
|         out_idx++; |
|     } |
| |
|     /* The remaining parameters are read from the slots. */ |
|     if (slots != NULL && fmstate->target_attrs != NIL) |
|     { |
|         TupleDesc tupdesc = RelationGetDescr(fmstate->rel); |
|         int guc_level; |
|         ListCell *cell; |
| |
|         guc_level = set_transmission_modes(); |
| |
|         for (slot_no = 0; slot_no < numSlots; slot_no++) |
|         { |
|             /* Output functions restart after the ctid entry for each slot. */ |
|             flinfo_idx = have_ctid ? 1 : 0; |
| |
|             foreach(cell, fmstate->target_attrs) |
|             { |
|                 int attnum = lfirst_int(cell); |
|                 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); |
|                 Datum attval; |
|                 bool attnull; |
| |
|                 /* Generated columns are set to DEFAULT, so they send nothing. */ |
|                 if (attr->attgenerated) |
|                     continue; |
| |
|                 attval = slot_getattr(slots[slot_no], attnum, &attnull); |
|                 if (!attnull) |
|                     param_text[out_idx] = OutputFunctionCall(&fmstate->p_flinfo[flinfo_idx], |
|                                                              attval); |
|                 else |
|                     param_text[out_idx] = NULL; |
|                 out_idx++; |
|                 flinfo_idx++; |
|             } |
|         } |
| |
|         reset_transmission_modes(guc_level); |
|     } |
| |
|     Assert(out_idx == fmstate->p_nums * numSlots); |
| |
|     MemoryContextSwitchTo(caller_cxt); |
| |
|     return param_text; |
| } |
| |
| /* |
|  * store_returning_result |
|  *      Put row 0 of a RETURNING result into the given slot. |
|  * |
|  * Callers have no PG_TRY block of their own, so if anything fails here the |
|  * PGresult is released before the error is re-thrown. |
|  */ |
| static void |
| store_returning_result(PgFdwModifyState *fmstate, |
|                        TupleTableSlot *slot, PGresult *res) |
| { |
|     PG_TRY(); |
|     { |
|         HeapTuple tuple = make_tuple_from_result_row(res, 0, |
|                                                      fmstate->rel, |
|                                                      fmstate->attinmeta, |
|                                                      fmstate->retrieved_attrs, |
|                                                      NULL, |
|                                                      fmstate->temp_cxt); |
| |
|         /* |
|          * The returning slot is not necessarily able to hold heap tuples |
|          * directly, so use the store routine that allows for conversion. |
|          */ |
|         ExecForceStoreHeapTuple(tuple, slot, true); |
|     } |
|     PG_CATCH(); |
|     { |
|         PQclear(res); |
|         PG_RE_THROW(); |
|     } |
|     PG_END_TRY(); |
| } |
| |
| /* |
|  * finish_foreign_modify |
|  *      Release what a foreign insert/update/delete operation holds: the |
|  *      remote prepared statement, if one exists, and the connection. |
|  */ |
| static void |
| finish_foreign_modify(PgFdwModifyState *fmstate) |
| { |
|     Assert(fmstate != NULL); |
| |
|     /* deallocate_query does nothing when no statement was prepared. */ |
|     deallocate_query(fmstate); |
| |
|     /* Hand the remote connection back and forget it. */ |
|     ReleaseConnection(fmstate->conn); |
|     fmstate->conn = NULL; |
| } |
| |
| /* |
|  * deallocate_query |
|  *      Drop the remote prepared statement belonging to a foreign |
|  *      insert/update/delete operation, and clear fmstate->p_name. |
|  * |
|  * Does nothing when no statement has been prepared. |
|  */ |
| static void |
| deallocate_query(PgFdwModifyState *fmstate) |
| { |
|     char dealloc_sql[64]; |
|     PGresult *result; |
| |
|     /* A NULL p_name means there is nothing to deallocate. */ |
|     if (fmstate->p_name == NULL) |
|         return; |
| |
|     snprintf(dealloc_sql, sizeof(dealloc_sql), "DEALLOCATE %s", fmstate->p_name); |
| |
|     /* |
|      * There is no PG_TRY block here, so no error may be thrown while the |
|      * PGresult is still unreleased. |
|      */ |
|     result = pgfdw_exec_query(fmstate->conn, dealloc_sql, fmstate->conn_state); |
|     if (PQresultStatus(result) != PGRES_COMMAND_OK) |
|         pgfdw_report_error(ERROR, result, fmstate->conn, true, dealloc_sql); |
|     PQclear(result); |
| |
|     pfree(fmstate->p_name); |
|     fmstate->p_name = NULL; |
| } |
| |
| /* |
|  * build_remote_returning |
|  *      Construct the RETURNING targetlist of a remote query that performs |
|  *      an UPDATE/DELETE .. RETURNING on a join directly. |
|  * |
|  * rtindex identifies the target relation, rel is that relation, and |
|  * returningList is the local RETURNING list the Vars are pulled from. |
|  */ |
| static List * |
| build_remote_returning(Index rtindex, Relation rel, List *returningList) |
| { |
|     List *result = NIL; |
|     List *vars; |
|     ListCell *cell; |
|     bool need_all_columns = false; |
| |
|     Assert(returningList); |
| |
|     vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS); |
| |
|     /* |
|      * A whole-row reference to the target relation means every column of |
|      * the relation is needed. |
|      */ |
|     foreach(cell, vars) |
|     { |
|         Var *var = (Var *) lfirst(cell); |
| |
|         if (!IsA(var, Var)) |
|             continue; |
| |
|         if (var->varno == rtindex && var->varattno == InvalidAttrNumber) |
|         { |
|             need_all_columns = true; |
|             break; |
|         } |
|     } |
| |
|     if (need_all_columns) |
|     { |
|         TupleDesc tupdesc = RelationGetDescr(rel); |
|         int attno; |
| |
|         for (attno = 1; attno <= tupdesc->natts; attno++) |
|         { |
|             Form_pg_attribute attr = TupleDescAttr(tupdesc, attno - 1); |
|             Var *colvar; |
| |
|             /* Dropped attributes are not returned. */ |
|             if (attr->attisdropped) |
|                 continue; |
| |
|             colvar = makeVar(rtindex, |
|                              attno, |
|                              attr->atttypid, |
|                              attr->atttypmod, |
|                              attr->attcollation, |
|                              0); |
| |
|             result = lappend(result, |
|                              makeTargetEntry((Expr *) colvar, |
|                                              list_length(result) + 1, |
|                                              NULL, |
|                                              false)); |
|         } |
|     } |
| |
|     /* Append whatever else the RETURNING list needs. */ |
|     foreach(cell, vars) |
|     { |
|         Var *var = (Var *) lfirst(cell); |
| |
|         /* |
|          * Whole-row references to the target relation are not needed, and |
|          * neither are its system columns other than ctid and oid, since |
|          * those are set locally. |
|          */ |
|         if (IsA(var, Var) && var->varno == rtindex) |
|         { |
|             const bool is_ctid = (var->varattno == SelfItemPointerAttributeNumber); |
| |
|             if (var->varattno <= InvalidAttrNumber && !is_ctid) |
|                 continue;       /* don't need it */ |
|         } |
| |
|         /* Skip anything that is in the list already. */ |
|         if (tlist_member((Expr *) var, result)) |
|             continue; |
| |
|         result = lappend(result, |
|                          makeTargetEntry((Expr *) var, |
|                                          list_length(result) + 1, |
|                                          NULL, |
|                                          false)); |
|     } |
| |
|     list_free(vars); |
| |
|     return result; |
| } |
| |
| /* |
|  * rebuild_fdw_scan_tlist |
|  *      Replace the fdw_scan_tlist of the given foreign-scan plan node with |
|  *      one built from the given tlist. |
|  * |
|  * The node's existing fdw_scan_tlist may hold columns that the given tlist |
|  * lacks: for example resjunk columns such as 'ctid' of the target relation |
|  * and 'wholerow' of non-target relations.  Those are appended to the tlist |
|  * so that it covers everything in the old fdw_scan_tlist; otherwise |
|  * setrefs.c would get confused. |
|  */ |
| static void |
| rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist) |
| { |
|     List *previous = fscan->fdw_scan_tlist; |
|     List *merged = tlist; |
|     ListCell *cell; |
| |
|     foreach(cell, previous) |
|     { |
|         TargetEntry *old_tle = (TargetEntry *) lfirst(cell); |
| |
|         /* Append only the expressions the new list does not have yet. */ |
|         if (!tlist_member(old_tle->expr, merged)) |
|         { |
|             merged = lappend(merged, |
|                              makeTargetEntry(old_tle->expr, |
|                                              list_length(merged) + 1, |
|                                              NULL, |
|                                              false)); |
|         } |
|     } |
| |
|     fscan->fdw_scan_tlist = merged; |
| } |
| |
| /* |
|  * execute_dml_stmt |
|  *      Run a direct UPDATE/DELETE statement on the remote server, keeping |
|  *      the PGresult in dmstate->result and the affected-row count in |
|  *      dmstate->num_tuples. |
|  */ |
| static void |
| execute_dml_stmt(ForeignScanState *node) |
| { |
|     PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; |
|     ExprContext *econtext = node->ss.ps.ps_ExprContext; |
|     const char **param_text = dmstate->param_values; |
|     int nparams = dmstate->numParams; |
| |
|     /* Finish any pending asynchronous request on this connection first. */ |
|     if (dmstate->conn_state->pendingAreq) |
|         process_pending_request(dmstate->conn_state->pendingAreq); |
| |
|     /* Fill in the query parameter values in text format. */ |
|     if (nparams > 0) |
|         process_query_params(econtext, |
|                              dmstate->param_flinfo, |
|                              dmstate->param_exprs, |
|                              param_text); |
| |
|     /* |
|      * NULL is passed for paramTypes, which makes the remote server infer |
|      * the type of every parameter.  Because every parameter is explicitly |
|      * cast (see deparse.c), that inference is trivial and gives the desired |
|      * result, and we avoid assuming that the remote server uses the same |
|      * OIDs as we do for the parameters' types. |
|      */ |
|     if (!PQsendQueryParams(dmstate->conn, dmstate->query, nparams, |
|                            NULL, param_text, NULL, NULL, 0)) |
|         pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query); |
| |
|     /* |
|      * Collect the result and check it. |
|      * |
|      * There is no PG_TRY block here, so no error may be thrown while the |
|      * PGresult is still unreleased. |
|      */ |
|     dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query); |
| |
|     if (dmstate->has_returning) |
|     { |
|         if (PQresultStatus(dmstate->result) != PGRES_TUPLES_OK) |
|             pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true, |
|                                dmstate->query); |
| |
|         /* With RETURNING, the row count is the number of result tuples. */ |
|         dmstate->num_tuples = PQntuples(dmstate->result); |
|     } |
|     else |
|     { |
|         if (PQresultStatus(dmstate->result) != PGRES_COMMAND_OK) |
|             pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true, |
|                                dmstate->query); |
| |
|         /* Otherwise take it from the command status. */ |
|         dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result)); |
|     } |
| } |
| |
| /* |
|  * get_returning_data |
|  *      Produce the next tuple for a RETURNING clause from the saved result |
|  *      of a direct modification. |
|  * |
|  * Returns the scan slot: cleared once all tuples have been consumed, |
|  * otherwise holding the next tuple.  The slot installed for evaluating the |
|  * local RETURNING list may be a different one (see below). |
|  */ |
| static TupleTableSlot * |
| get_returning_data(ForeignScanState *node) |
| { |
|     PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; |
|     ResultRelInfo *resultRelInfo = node->resultRelInfo; |
|     EState *estate = node->ss.ps.state; |
|     TupleTableSlot *scan_slot = node->ss.ss_ScanTupleSlot; |
|     TupleTableSlot *returning_slot; |
| |
|     Assert(resultRelInfo->ri_projectReturning); |
| |
|     /* Nothing left to return: must be end of data. */ |
|     if (dmstate->next_tuple >= dmstate->num_tuples) |
|         return ExecClearTuple(scan_slot); |
| |
|     /* Bump the command's es_processed count if that is our job. */ |
|     if (dmstate->set_processed) |
|         estate->es_processed += 1; |
| |
|     if (dmstate->has_returning) |
|     { |
|         /* |
|          * Callers have no PG_TRY block of their own, so release the |
|          * PGresult here before letting any error propagate. |
|          */ |
|         PG_TRY(); |
|         { |
|             HeapTuple tuple = make_tuple_from_result_row(dmstate->result, |
|                                                          dmstate->next_tuple, |
|                                                          dmstate->rel, |
|                                                          dmstate->attinmeta, |
|                                                          dmstate->retrieved_attrs, |
|                                                          node, |
|                                                          dmstate->temp_cxt); |
| |
|             ExecStoreHeapTuple(tuple, scan_slot, false); |
|         } |
|         PG_CATCH(); |
|         { |
|             PQclear(dmstate->result); |
|             PG_RE_THROW(); |
|         } |
|         PG_END_TRY(); |
| |
|         /* |
|          * Get the updated/deleted tuple: the scan tuple itself when rel is |
|          * set, otherwise the one extracted by the returning filter. |
|          */ |
|         if (dmstate->rel) |
|             returning_slot = scan_slot; |
|         else |
|             returning_slot = apply_returning_filter(dmstate, resultRelInfo, |
|                                                     scan_slot, estate); |
|     } |
|     else |
|     { |
|         /* |
|          * has_returning is false when the local query has a form such as |
|          * "UPDATE/DELETE .. RETURNING 1"; just emit a dummy tuple then. |
|          */ |
|         ExecStoreAllNullTuple(scan_slot); |
|         returning_slot = scan_slot; |
|     } |
|     dmstate->next_tuple++; |
| |
|     /* Make slot available for evaluation of the local query RETURNING list. */ |
|     resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = |
|         returning_slot; |
| |
|     return scan_slot; |
| } |
| |
| /* |
|  * init_returning_filter |
|  *      Set up the filter that extracts an updated/deleted tuple from a |
|  *      scan tuple. |
|  * |
|  * Fills in dmstate->attnoMap, dmstate->ctidAttno and |
|  * dmstate->hasSystemCols from the entries of fdw_scan_tlist that belong to |
|  * the target relation (rtindex) and are retrieved from the foreign server. |
|  */ |
| static void |
| init_returning_filter(PgFdwDirectModifyState *dmstate, |
|                       List *fdw_scan_tlist, |
|                       Index rtindex) |
| { |
|     TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel); |
|     ListCell *cell; |
|     int pos = 0; |
| |
|     /* |
|      * Work out how fdw_scan_tlist's entries map onto the result tuple's |
|      * attributes. |
|      * |
|      * attnoMap has one entry per attribute of the result tuple, giving the |
|      * 1-based index of that attribute's entry in fdw_scan_tlist.  Zero |
|      * means there is no such entry, so a NULL is needed in the result |
|      * tuple. |
|      * |
|      * Also get the indexes of the entries for ctid and oid if any. |
|      */ |
|     dmstate->attnoMap = (AttrNumber *) |
|         palloc0(resultTupType->natts * sizeof(AttrNumber)); |
| |
|     dmstate->ctidAttno = dmstate->oidAttno = 0; |
|     dmstate->hasSystemCols = false; |
| |
|     foreach(cell, fdw_scan_tlist) |
|     { |
|         TargetEntry *tle = (TargetEntry *) lfirst(cell); |
|         Var *var = (Var *) tle->expr; |
|         int attrno; |
| |
|         /* pos is the 1-based position of this entry in fdw_scan_tlist. */ |
|         pos++; |
| |
|         Assert(IsA(var, Var)); |
| |
|         /* |
|          * Only columns of the target relation that are retrieved from the |
|          * foreign server are of interest. |
|          */ |
|         if (var->varno != rtindex || |
|             !list_member_int(dmstate->retrieved_attrs, pos)) |
|             continue; |
| |
|         attrno = var->varattno; |
| |
|         if (attrno >= 0) |
|         { |
|             /* |
|              * Whole-row references to the target relation are not |
|              * retrieved, so this has to be an ordinary column. |
|              */ |
|             Assert(attrno > 0); |
| |
|             dmstate->attnoMap[attrno - 1] = pos; |
|             continue; |
|         } |
| |
|         /* We don't retrieve system columns other than ctid and oid. */ |
|         if (attrno == SelfItemPointerAttributeNumber) |
|             dmstate->ctidAttno = pos; |
|         else |
|             Assert(false); |
|         dmstate->hasSystemCols = true; |
|     } |
| } |
| |
| /* |
|  * apply_returning_filter |
|  *      Extract the updated/deleted tuple from a scan tuple and return it |
|  *      in the result relation's returning slot. |
|  * |
|  * Uses the attnoMap, ctidAttno and hasSystemCols fields of dmstate. |
|  */ |
| static TupleTableSlot * |
| apply_returning_filter(PgFdwDirectModifyState *dmstate, |
|                        ResultRelInfo *resultRelInfo, |
|                        TupleTableSlot *slot, |
|                        EState *estate) |
| { |
|     TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel); |
|     TupleTableSlot *resultSlot; |
|     Datum *src_values; |
|     bool *src_isnull; |
|     Datum *dst_values; |
|     bool *dst_isnull; |
|     int att; |
| |
|     /* The returning slot is where the result tuple gets stored. */ |
|     resultSlot = ExecGetReturningSlot(estate, resultRelInfo); |
| |
|     /* Deform the whole scan tuple. */ |
|     slot_getallattrs(slot); |
|     src_values = slot->tts_values; |
|     src_isnull = slot->tts_isnull; |
| |
|     /* Get ready to build the result tuple. */ |
|     ExecClearTuple(resultSlot); |
|     dst_values = resultSlot->tts_values; |
|     dst_isnull = resultSlot->tts_isnull; |
| |
|     /* Move each value into its proper field of the result tuple. */ |
|     for (att = 0; att < resultTupType->natts; att++) |
|     { |
|         int src = dmstate->attnoMap[att]; |
| |
|         if (src != 0) |
|         { |
|             dst_values[att] = src_values[src - 1]; |
|             dst_isnull[att] = src_isnull[src - 1]; |
|         } |
|         else |
|         { |
|             /* A zero map entry means this attribute comes back as NULL. */ |
|             dst_values[att] = (Datum) 0; |
|             dst_isnull[att] = true; |
|         } |
|     } |
| |
|     /* Build the virtual tuple. */ |
|     ExecStoreVirtualTuple(resultSlot); |
| |
|     /* |
|      * When system columns have to be returned, materialize a heap tuple in |
|      * the slot from the column values set above and install the system |
|      * columns in that tuple. |
|      */ |
|     if (dmstate->hasSystemCols) |
|     { |
|         HeapTuple resultTup = ExecFetchSlotHeapTuple(resultSlot, true, NULL); |
| |
|         /* ctid */ |
|         if (dmstate->ctidAttno) |
|         { |
|             ItemPointer ctid = |
|                 (ItemPointer) DatumGetPointer(src_values[dmstate->ctidAttno - 1]); |
| |
|             resultTup->t_self = *ctid; |
|         } |
| |
|         /* |
|          * And remaining columns |
|          * |
|          * Note: the target relation is currently not allowed to appear on |
|          * the nullable side of an outer join, so no system column would go |
|          * to NULL. |
|          * |
|          * Note: tableoid needs no care here because it will be initialized |
|          * in ExecProcessReturning(). |
|          */ |
|         HeapTupleHeaderSetXmin(resultTup->t_data, InvalidTransactionId); |
|         HeapTupleHeaderSetXmax(resultTup->t_data, InvalidTransactionId); |
|         HeapTupleHeaderSetCmin(resultTup->t_data, InvalidTransactionId); |
|     } |
| |
|     return resultSlot; |
| } |
| |
| /* |
|  * prepare_query_params |
|  *      Set up what is needed to process the parameters of a remote query. |
|  * |
|  * On return *param_flinfo holds one output-function lookup per parameter, |
|  * *param_exprs the initialized expression states for fdw_exprs, and |
|  * *param_values a zeroed array of numParams string pointers. |
|  */ |
| static void |
| prepare_query_params(PlanState *node, |
|                      List *fdw_exprs, |
|                      int numParams, |
|                      FmgrInfo **param_flinfo, |
|                      List **param_exprs, |
|                      const char ***param_values) |
| { |
|     ListCell *cell; |
|     int idx = 0; |
| |
|     Assert(numParams > 0); |
| |
|     /* Output conversion info for each parameter of the remote query. */ |
|     *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams); |
| |
|     foreach(cell, fdw_exprs) |
|     { |
|         Node *expr = (Node *) lfirst(cell); |
|         Oid outfunc; |
|         bool is_varlena; |
| |
|         getTypeOutputInfo(exprType(expr), &outfunc, &is_varlena); |
|         fmgr_info(outfunc, &(*param_flinfo)[idx]); |
|         idx++; |
|     } |
| |
|     /* |
|      * Prepare remote-parameter expressions for evaluation.  (Note: all of |
|      * these expressions are expected to be plain Params in practice, so |
|      * something cheaper than the full expression-eval machinery might be |
|      * possible.  The gain would probably be small, though, and it would |
|      * require postgres_fdw to know more than is desirable about Param |
|      * evaluation.) |
|      */ |
|     *param_exprs = ExecInitExprList(fdw_exprs, node); |
| |
|     /* Buffer for the text form of the query parameters. */ |
|     *param_values = (const char **) palloc0(numParams * sizeof(char *)); |
| } |
| |
| /* |
|  * process_query_params |
|  *      Construct array of query parameter values in text format. |
|  * |
|  * Each expression state in param_exprs is evaluated in econtext, and its |
|  * value is converted to text with the matching entry of param_flinfo. |
|  * The results are written to param_values in list order; a NULL parameter |
|  * value is stored as a NULL pointer. |
|  * |
|  * The conversions run between set_transmission_modes() and |
|  * reset_transmission_modes(). |
|  */ |
| static void |
| process_query_params(ExprContext *econtext, |
|                      FmgrInfo *param_flinfo, |
|                      List *param_exprs, |
|                      const char **param_values) |
| { |
|     int nestlevel; |
|     int i; |
|     ListCell *lc; |
| |
|     nestlevel = set_transmission_modes(); |
| |
|     i = 0; |
|     foreach(lc, param_exprs) |
|     { |
|         ExprState *expr_state = (ExprState *) lfirst(lc); |
|         Datum expr_value; |
|         bool isNull; |
| |
|         /* Evaluate the parameter expression */ |
|         expr_value = ExecEvalExpr(expr_state, econtext, &isNull); |
| |
|         /* |
|          * Get string representation of each parameter value by invoking |
|          * type-specific output function, unless the value is null. |
|          */ |
|         if (isNull) |
|             param_values[i] = NULL; |
|         else |
|             param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value); |
| |
|         i++; |
|     } |
| |
|     reset_transmission_modes(nestlevel); |
| } |
| |
| /* |
| * postgresAnalyzeForeignTable |
| * Test whether analyzing this foreign table is supported |
| */ |
| static bool |
| postgresAnalyzeForeignTable(Relation relation, |
| AcquireSampleRowsFunc *func, |
| BlockNumber *totalpages) |
| { |
| ForeignTable *table; |
| UserMapping *user; |
| PGconn *conn; |
| StringInfoData sql; |
| PGresult *volatile res = NULL; |
| |
| /* Return the row-analysis function pointer */ |
| *func = postgresAcquireSampleRowsFunc; |
| |
| /* |
| * Now we have to get the number of pages. It's annoying that the ANALYZE |
| * API requires us to return that now, because it forces some duplication |
| * of effort between this routine and postgresAcquireSampleRowsFunc. But |
| * it's probably not worth redefining that API at this point. |
| */ |
| |
| /* |
| * Get the connection to use. We do the remote access as the table's |
| * owner, even if the ANALYZE was started by some other user. |
| */ |
| table = GetForeignTable(RelationGetRelid(relation)); |
| user = GetUserMapping(relation->rd_rel->relowner, table->serverid); |
| conn = GetConnection(user, false, NULL); |
| |
| /* |
| * Construct command to get page count for relation. |
| */ |
| initStringInfo(&sql); |
| deparseAnalyzeSizeSql(&sql, relation); |
| |
| /* In what follows, do not risk leaking any PGresults. */ |
| PG_TRY(); |
| { |
| res = pgfdw_exec_query(conn, sql.data, NULL); |
| if (PQresultStatus(res) != PGRES_TUPLES_OK) |
| pgfdw_report_error(ERROR, res, conn, false, sql.data); |
| |
| if (PQntuples(res) != 1 || PQnfields(res) != 1) |
| elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query"); |
| *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10); |
| } |
| PG_FINALLY(); |
| { |
| PQclear(res); |
| } |
| PG_END_TRY(); |
| |
| ReleaseConnection(conn); |
| |
| return true; |
| } |
| |
| /* |
| * postgresGetAnalyzeInfoForForeignTable |
| * Count tuples in foreign table (just get pg_class.reltuples). |
| * |
| * can_tablesample determines if the remote relation supports acquiring the |
| * sample using TABLESAMPLE. |
| */ |
| static double |
| postgresGetAnalyzeInfoForForeignTable(Relation relation, bool *can_tablesample) |
| { |
| ForeignTable *table; |
| UserMapping *user; |
| PGconn *conn; |
| StringInfoData sql; |
| PGresult *volatile res = NULL; |
| volatile double reltuples = -1; |
| volatile char relkind = 0; |
| |
| /* assume the remote relation does not support TABLESAMPLE */ |
| *can_tablesample = false; |
| |
| /* |
| * Get the connection to use. We do the remote access as the table's |
| * owner, even if the ANALYZE was started by some other user. |
| */ |
| table = GetForeignTable(RelationGetRelid(relation)); |
| user = GetUserMapping(relation->rd_rel->relowner, table->serverid); |
| conn = GetConnection(user, false, NULL); |
| |
| /* |
| * Construct command to get page count for relation. |
| */ |
| initStringInfo(&sql); |
| deparseAnalyzeInfoSql(&sql, relation); |
| |
| /* In what follows, do not risk leaking any PGresults. */ |
| PG_TRY(); |
| { |
| res = pgfdw_exec_query(conn, sql.data, NULL); |
| if (PQresultStatus(res) != PGRES_TUPLES_OK) |
| pgfdw_report_error(ERROR, res, conn, false, sql.data); |
| |
| if (PQntuples(res) != 1 || PQnfields(res) != 2) |
| elog(ERROR, "unexpected result from deparseAnalyzeInfoSql query"); |
| reltuples = strtod(PQgetvalue(res, 0, 0), NULL); |
| relkind = *(PQgetvalue(res, 0, 1)); |
| } |
| PG_FINALLY(); |
| { |
| if (res) |
| PQclear(res); |
| } |
| PG_END_TRY(); |
| |
| ReleaseConnection(conn); |
| |
| /* TABLESAMPLE is supported only for regular tables and matviews */ |
| *can_tablesample = (relkind == RELKIND_RELATION || |
| relkind == RELKIND_MATVIEW || |
| relkind == RELKIND_PARTITIONED_TABLE); |
| |
| return reltuples; |
| } |
| |
| /* |
| * Acquire a random sample of rows from foreign table managed by postgres_fdw. |
| * |
| * Selected rows are returned in the caller-allocated array rows[], |
| * which must have at least targrows entries. |
| * The actual number of rows selected is returned as the function result. |
| * We also count the total number of rows in the table and return it into |
| * *totalrows. Note that *totaldeadrows is always set to 0. |
| * |
| * Note that the returned list of rows is not always in order by physical |
| * position in the table. Therefore, correlation estimates derived later |
| * may be meaningless, but it's OK because we don't use the estimates |
| * currently (the planner only pays attention to correlation for indexscans). |
| */ |
| static int |
| postgresAcquireSampleRowsFunc(Relation relation, int elevel, |
| HeapTuple *rows, int targrows, |
| double *totalrows, |
| double *totaldeadrows) |
| { |
| PgFdwAnalyzeState astate; |
| ForeignTable *table; |
| ForeignServer *server; |
| UserMapping *user; |
| PGconn *conn; |
| int server_version_num; |
| PgFdwSamplingMethod method = ANALYZE_SAMPLE_AUTO; /* auto is default */ |
| double sample_frac = -1.0; |
| double reltuples; |
| unsigned int cursor_number; |
| StringInfoData sql; |
| PGresult *volatile res = NULL; |
| ListCell *lc; |
| |
| /* Initialize workspace state */ |
| astate.rel = relation; |
| astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation)); |
| |
| astate.rows = rows; |
| astate.targrows = targrows; |
| astate.numrows = 0; |
| astate.samplerows = 0; |
| astate.rowstoskip = -1; /* -1 means not set yet */ |
| reservoir_init_selection_state(&astate.rstate, targrows); |
| |
| /* Remember ANALYZE context, and create a per-tuple temp context */ |
| astate.anl_cxt = CurrentMemoryContext; |
| astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext, |
| "postgres_fdw temporary data", |
| ALLOCSET_SMALL_SIZES); |
| |
| /* |
| * Get the connection to use. We do the remote access as the table's |
| * owner, even if the ANALYZE was started by some other user. |
| */ |
| table = GetForeignTable(RelationGetRelid(relation)); |
| server = GetForeignServer(table->serverid); |
| user = GetUserMapping(relation->rd_rel->relowner, table->serverid); |
| conn = GetConnection(user, false, NULL); |
| |
| /* We'll need server version, so fetch it now. */ |
| server_version_num = PQserverVersion(conn); |
| |
| /* |
| * What sampling method should we use? |
| */ |
| foreach(lc, server->options) |
| { |
| DefElem *def = (DefElem *) lfirst(lc); |
| |
| if (strcmp(def->defname, "analyze_sampling") == 0) |
| { |
| char *value = defGetString(def); |
| |
| if (strcmp(value, "off") == 0) |
| method = ANALYZE_SAMPLE_OFF; |
| else if (strcmp(value, "auto") == 0) |
| method = ANALYZE_SAMPLE_AUTO; |
| else if (strcmp(value, "random") == 0) |
| method = ANALYZE_SAMPLE_RANDOM; |
| else if (strcmp(value, "system") == 0) |
| method = ANALYZE_SAMPLE_SYSTEM; |
| else if (strcmp(value, "bernoulli") == 0) |
| method = ANALYZE_SAMPLE_BERNOULLI; |
| |
| break; |
| } |
| } |
| |
| foreach(lc, table->options) |
| { |
| DefElem *def = (DefElem *) lfirst(lc); |
| |
| if (strcmp(def->defname, "analyze_sampling") == 0) |
| { |
| char *value = defGetString(def); |
| |
| if (strcmp(value, "off") == 0) |
| method = ANALYZE_SAMPLE_OFF; |
| else if (strcmp(value, "auto") == 0) |
| method = ANALYZE_SAMPLE_AUTO; |
| else if (strcmp(value, "random") == 0) |
| method = ANALYZE_SAMPLE_RANDOM; |
| else if (strcmp(value, "system") == 0) |
| method = ANALYZE_SAMPLE_SYSTEM; |
| else if (strcmp(value, "bernoulli") == 0) |
| method = ANALYZE_SAMPLE_BERNOULLI; |
| |
| break; |
| } |
| } |
| |
| /* |
| * Error-out if explicitly required one of the TABLESAMPLE methods, but |
| * the server does not support it. |
| */ |
| if ((server_version_num < 95000) && |
| (method == ANALYZE_SAMPLE_SYSTEM || |
| method == ANALYZE_SAMPLE_BERNOULLI)) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("remote server does not support TABLESAMPLE feature"))); |
| |
| /* |
| * If we've decided to do remote sampling, calculate the sampling rate. We |
| * need to get the number of tuples from the remote server, but skip that |
| * network round-trip if not needed. |
| */ |
| if (method != ANALYZE_SAMPLE_OFF) |
| { |
| bool can_tablesample; |
| |
| reltuples = postgresGetAnalyzeInfoForForeignTable(relation, |
| &can_tablesample); |
| |
| /* |
| * Make sure we're not choosing TABLESAMPLE when the remote relation |
| * does not support that. But only do this for "auto" - if the user |
| * explicitly requested BERNOULLI/SYSTEM, it's better to fail. |
| */ |
| if (!can_tablesample && (method == ANALYZE_SAMPLE_AUTO)) |
| method = ANALYZE_SAMPLE_RANDOM; |
| |
| /* |
| * Remote's reltuples could be 0 or -1 if the table has never been |
| * vacuumed/analyzed. In that case, disable sampling after all. |
| */ |
| if ((reltuples <= 0) || (targrows >= reltuples)) |
| method = ANALYZE_SAMPLE_OFF; |
| else |
| { |
| /* |
| * All supported sampling methods require sampling rate, not |
| * target rows directly, so we calculate that using the remote |
| * reltuples value. That's imperfect, because it might be off a |
| * good deal, but that's not something we can (or should) address |
| * here. |
| * |
| * If reltuples is too low (i.e. when table grew), we'll end up |
| * sampling more rows - but then we'll apply the local sampling, |
| * so we get the expected sample size. This is the same outcome as |
| * without remote sampling. |
| * |
| * If reltuples is too high (e.g. after bulk DELETE), we will end |
| * up sampling too few rows. |
| * |
| * We can't really do much better here - we could try sampling a |
| * bit more rows, but we don't know how off the reltuples value is |
| * so how much is "a bit more"? |
| * |
| * Furthermore, the targrows value for partitions is determined |
| * based on table size (relpages), which can be off in different |
| * ways too. Adjusting the sampling rate here might make the issue |
| * worse. |
| */ |
| sample_frac = targrows / reltuples; |
| |
| /* |
| * We should never get sampling rate outside the valid range |
| * (between 0.0 and 1.0), because those cases should be covered by |
| * the previous branch that sets ANALYZE_SAMPLE_OFF. |
| */ |
| Assert(sample_frac >= 0.0 && sample_frac <= 1.0); |
| } |
| } |
| |
| /* |
| * For "auto" method, pick the one we believe is best. For servers with |
| * TABLESAMPLE support we pick BERNOULLI, for old servers we fall-back to |
| * random() to at least reduce network transfer. |
| */ |
| if (method == ANALYZE_SAMPLE_AUTO) |
| { |
| if (server_version_num < 95000) |
| method = ANALYZE_SAMPLE_RANDOM; |
| else |
| method = ANALYZE_SAMPLE_BERNOULLI; |
| } |
| |
| /* |
| * Construct cursor that retrieves whole rows from remote. |
| */ |
| cursor_number = GetCursorNumber(conn); |
| initStringInfo(&sql); |
| appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number); |
| |
| deparseAnalyzeSql(&sql, relation, method, sample_frac, &astate.retrieved_attrs); |
| |
| /* In what follows, do not risk leaking any PGresults. */ |
| PG_TRY(); |
| { |
| char fetch_sql[64]; |
| int fetch_size; |
| |
| res = pgfdw_exec_query(conn, sql.data, NULL); |
| if (PQresultStatus(res) != PGRES_COMMAND_OK) |
| pgfdw_report_error(ERROR, res, conn, false, sql.data); |
| PQclear(res); |
| res = NULL; |
| |
| /* |
| * Determine the fetch size. The default is arbitrary, but shouldn't |
| * be enormous. |
| */ |
| fetch_size = 100; |
| foreach(lc, server->options) |
| { |
| DefElem *def = (DefElem *) lfirst(lc); |
| |
| if (strcmp(def->defname, "fetch_size") == 0) |
| { |
| (void) parse_int(defGetString(def), &fetch_size, 0, NULL); |
| break; |
| } |
| } |
| foreach(lc, table->options) |
| { |
| DefElem *def = (DefElem *) lfirst(lc); |
| |
| if (strcmp(def->defname, "fetch_size") == 0) |
| { |
| (void) parse_int(defGetString(def), &fetch_size, 0, NULL); |
| break; |
| } |
| } |
| |
| /* Construct command to fetch rows from remote. */ |
| snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u", |
| fetch_size, cursor_number); |
| |
| /* Retrieve and process rows a batch at a time. */ |
| for (;;) |
| { |
| int numrows; |
| int i; |
| |
| /* Allow users to cancel long query */ |
| CHECK_FOR_INTERRUPTS(); |
| |
| /* |
| * XXX possible future improvement: if rowstoskip is large, we |
| * could issue a MOVE rather than physically fetching the rows, |
| * then just adjust rowstoskip and samplerows appropriately. |
| */ |
| |
| /* Fetch some rows */ |
| res = pgfdw_exec_query(conn, fetch_sql, NULL); |
| /* On error, report the original query, not the FETCH. */ |
| if (PQresultStatus(res) != PGRES_TUPLES_OK) |
| pgfdw_report_error(ERROR, res, conn, false, sql.data); |
| |
| /* Process whatever we got. */ |
| numrows = PQntuples(res); |
| for (i = 0; i < numrows; i++) |
| analyze_row_processor(res, i, &astate); |
| |
| PQclear(res); |
| res = NULL; |
| |
| /* Must be EOF if we didn't get all the rows requested. */ |
| if (numrows < fetch_size) |
| break; |
| } |
| |
| /* Close the cursor, just to be tidy. */ |
| close_cursor(conn, cursor_number, NULL); |
| } |
| PG_CATCH(); |
| { |
| PQclear(res); |
| PG_RE_THROW(); |
| } |
| PG_END_TRY(); |
| |
| ReleaseConnection(conn); |
| |
| /* We assume that we have no dead tuple. */ |
| *totaldeadrows = 0.0; |
| |
| /* |
| * Without sampling, we've retrieved all living tuples from foreign |
| * server, so report that as totalrows. Otherwise use the reltuples |
| * estimate we got from the remote side. |
| */ |
| if (method == ANALYZE_SAMPLE_OFF) |
| *totalrows = astate.samplerows; |
| else |
| *totalrows = reltuples; |
| |
| /* |
| * Emit some interesting relation info |
| */ |
| ereport(elevel, |
| (errmsg("\"%s\": table contains %.0f rows, %d rows in sample", |
| RelationGetRelationName(relation), |
| *totalrows, astate.numrows))); |
| |
| return astate.numrows; |
| } |
| |
| /* |
| * Collect sample rows from the result of query. |
| * - Use all tuples in sample until target # of samples are collected. |
| * - Subsequently, replace already-sampled tuples randomly. |
| */ |
| static void |
| analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate) |
| { |
| int targrows = astate->targrows; |
| int pos; /* array index to store tuple in */ |
| MemoryContext oldcontext; |
| |
| /* Always increment sample row counter. */ |
| astate->samplerows += 1; |
| |
| /* |
| * Determine the slot where this sample row should be stored. Set pos to |
| * negative value to indicate the row should be skipped. |
| */ |
| if (astate->numrows < targrows) |
| { |
| /* First targrows rows are always included into the sample */ |
| pos = astate->numrows++; |
| } |
| else |
| { |
| /* |
| * Now we start replacing tuples in the sample until we reach the end |
| * of the relation. Same algorithm as in acquire_sample_rows in |
| * analyze.c; see Jeff Vitter's paper. |
| */ |
| if (astate->rowstoskip < 0) |
| astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows); |
| |
| if (astate->rowstoskip <= 0) |
| { |
| /* Choose a random reservoir element to replace. */ |
| pos = (int) (targrows * sampler_random_fract(&astate->rstate.randstate)); |
| Assert(pos >= 0 && pos < targrows); |
| heap_freetuple(astate->rows[pos]); |
| } |
| else |
| { |
| /* Skip this tuple. */ |
| pos = -1; |
| } |
| |
| astate->rowstoskip -= 1; |
| } |
| |
| if (pos >= 0) |
| { |
| /* |
| * Create sample tuple from current result row, and store it in the |
| * position determined above. The tuple has to be created in anl_cxt. |
| */ |
| oldcontext = MemoryContextSwitchTo(astate->anl_cxt); |
| |
| astate->rows[pos] = make_tuple_from_result_row(res, row, |
| astate->rel, |
| astate->attinmeta, |
| astate->retrieved_attrs, |
| NULL, |
| astate->temp_cxt); |
| |
| MemoryContextSwitchTo(oldcontext); |
| } |
| } |
| |
| /* |
| * Import a foreign schema |
| */ |
| static List * |
| postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) |
| { |
| List *commands = NIL; |
| bool import_collate = true; |
| bool import_default = false; |
| bool import_generated = true; |
| bool import_not_null = true; |
| ForeignServer *server; |
| UserMapping *mapping; |
| PGconn *conn; |
| StringInfoData buf; |
| PGresult *volatile res = NULL; |
| int numrows, |
| i; |
| ListCell *lc; |
| |
| /* Parse statement options */ |
| foreach(lc, stmt->options) |
| { |
| DefElem *def = (DefElem *) lfirst(lc); |
| |
| if (strcmp(def->defname, "import_collate") == 0) |
| import_collate = defGetBoolean(def); |
| else if (strcmp(def->defname, "import_default") == 0) |
| import_default = defGetBoolean(def); |
| else if (strcmp(def->defname, "import_generated") == 0) |
| import_generated = defGetBoolean(def); |
| else if (strcmp(def->defname, "import_not_null") == 0) |
| import_not_null = defGetBoolean(def); |
| else |
| ereport(ERROR, |
| (errcode(ERRCODE_FDW_INVALID_OPTION_NAME), |
| errmsg("invalid option \"%s\"", def->defname))); |
| } |
| |
| /* |
| * Get connection to the foreign server. Connection manager will |
| * establish new connection if necessary. |
| */ |
| server = GetForeignServer(serverOid); |
| mapping = GetUserMapping(GetUserId(), server->serverid); |
| conn = GetConnection(mapping, false, NULL); |
| |
| /* Don't attempt to import collation if remote server hasn't got it */ |
| if (PQserverVersion(conn) < 90100) |
| import_collate = false; |
| |
| /* Create workspace for strings */ |
| initStringInfo(&buf); |
| |
| /* In what follows, do not risk leaking any PGresults. */ |
| PG_TRY(); |
| { |
| /* Check that the schema really exists */ |
| appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = "); |
| deparseStringLiteral(&buf, stmt->remote_schema); |
| |
| res = pgfdw_exec_query(conn, buf.data, NULL); |
| if (PQresultStatus(res) != PGRES_TUPLES_OK) |
| pgfdw_report_error(ERROR, res, conn, false, buf.data); |
| |
| if (PQntuples(res) != 1) |
| ereport(ERROR, |
| (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND), |
| errmsg("schema \"%s\" is not present on foreign server \"%s\"", |
| stmt->remote_schema, server->servername))); |
| |
| PQclear(res); |
| res = NULL; |
| resetStringInfo(&buf); |
| |
| /* |
| * Fetch all table data from this schema, possibly restricted by |
| * EXCEPT or LIMIT TO. (We don't actually need to pay any attention |
| * to EXCEPT/LIMIT TO here, because the core code will filter the |
| * statements we return according to those lists anyway. But it |
| * should save a few cycles to not process excluded tables in the |
| * first place.) |
| * |
| * Import table data for partitions only when they are explicitly |
| * specified in LIMIT TO clause. Otherwise ignore them and only |
| * include the definitions of the root partitioned tables to allow |
| * access to the complete remote data set locally in the schema |
| * imported. |
| * |
| * Note: because we run the connection with search_path restricted to |
| * pg_catalog, the format_type() and pg_get_expr() outputs will always |
| * include a schema name for types/functions in other schemas, which |
| * is what we want. |
| */ |
| appendStringInfoString(&buf, |
| "SELECT relname, " |
| " attname, " |
| " format_type(atttypid, atttypmod), " |
| <<<<<<< HEAD |
| " attnotnull, "); |
| ======= |
| " attnotnull, " |
| " pg_get_expr(adbin, adrelid), "); |
| >>>>>>> REL_16_9 |
| |
| /* Generated columns are supported since Postgres 12 */ |
| if (PQserverVersion(conn) >= 120000) |
| appendStringInfoString(&buf, |
| <<<<<<< HEAD |
| " attgenerated, " |
| " pg_get_expr(adbin, adrelid), "); |
| else |
| appendStringInfoString(&buf, |
| " NULL, " |
| " pg_get_expr(adbin, adrelid), "); |
| ======= |
| " attgenerated, "); |
| else |
| appendStringInfoString(&buf, |
| " NULL, "); |
| >>>>>>> REL_16_9 |
| |
| if (import_collate) |
| appendStringInfoString(&buf, |
| " collname, " |
| " collnsp.nspname "); |
| else |
| appendStringInfoString(&buf, |
| " NULL, NULL "); |
| |
| appendStringInfoString(&buf, |
| "FROM pg_class c " |
| " JOIN pg_namespace n ON " |
| " relnamespace = n.oid " |
| " LEFT JOIN pg_attribute a ON " |
| " attrelid = c.oid AND attnum > 0 " |
| " AND NOT attisdropped " |
| " LEFT JOIN pg_attrdef ad ON " |
| " adrelid = c.oid AND adnum = attnum "); |
| |
| if (import_collate) |
| appendStringInfoString(&buf, |
| " LEFT JOIN pg_collation coll ON " |
| " coll.oid = attcollation " |
| " LEFT JOIN pg_namespace collnsp ON " |
| " collnsp.oid = collnamespace "); |
| <<<<<<< HEAD |
| else |
| appendStringInfoString(&buf, |
| " NULL, NULL " |
| "FROM pg_class c " |
| " JOIN pg_namespace n ON " |
| " relnamespace = n.oid " |
| " LEFT JOIN pg_attribute a ON " |
| " attrelid = c.oid AND attnum > 0 " |
| " AND NOT attisdropped " |
| " LEFT JOIN pg_attrdef ad ON " |
| " adrelid = c.oid AND adnum = attnum "); |
| ======= |
| >>>>>>> REL_16_9 |
| |
| appendStringInfoString(&buf, |
| "WHERE c.relkind IN (" |
| CppAsString2(RELKIND_RELATION) "," |
| CppAsString2(RELKIND_VIEW) "," |
| CppAsString2(RELKIND_FOREIGN_TABLE) "," |
| CppAsString2(RELKIND_MATVIEW) "," |
| CppAsString2(RELKIND_DIRECTORY_TABLE) "," |
| CppAsString2(RELKIND_PARTITIONED_TABLE) ") " |
| " AND n.nspname = "); |
| deparseStringLiteral(&buf, stmt->remote_schema); |
| |
| /* Partitions are supported since Postgres 10 */ |
| if (PQserverVersion(conn) >= 100000 && |
| stmt->list_type != FDW_IMPORT_SCHEMA_LIMIT_TO) |
| appendStringInfoString(&buf, " AND NOT c.relispartition "); |
| |
| /* Apply restrictions for LIMIT TO and EXCEPT */ |
| if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO || |
| stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT) |
| { |
| bool first_item = true; |
| |
| appendStringInfoString(&buf, " AND c.relname "); |
| if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT) |
| appendStringInfoString(&buf, "NOT "); |
| appendStringInfoString(&buf, "IN ("); |
| |
| /* Append list of table names within IN clause */ |
| foreach(lc, stmt->table_list) |
| { |
| RangeVar *rv = (RangeVar *) lfirst(lc); |
| |
| if (first_item) |
| first_item = false; |
| else |
| appendStringInfoString(&buf, ", "); |
| deparseStringLiteral(&buf, rv->relname); |
| } |
| appendStringInfoChar(&buf, ')'); |
| } |
| |
| /* Append ORDER BY at the end of query to ensure output ordering */ |
| appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum"); |
| |
| /* Fetch the data */ |
| res = pgfdw_exec_query(conn, buf.data, NULL); |
| if (PQresultStatus(res) != PGRES_TUPLES_OK) |
| pgfdw_report_error(ERROR, res, conn, false, buf.data); |
| |
| /* Process results */ |
| numrows = PQntuples(res); |
| /* note: incrementation of i happens in inner loop's while() test */ |
| for (i = 0; i < numrows;) |
| { |
| char *tablename = PQgetvalue(res, i, 0); |
| bool first_item = true; |
| |
| resetStringInfo(&buf); |
| appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n", |
| quote_identifier(tablename)); |
| |
| /* Scan all rows for this table */ |
| do |
| { |
| char *attname; |
| char *typename; |
| char *attnotnull; |
| char *attgenerated; |
| char *attdefault; |
| char *collname; |
| char *collnamespace; |
| |
| /* If table has no columns, we'll see nulls here */ |
| if (PQgetisnull(res, i, 1)) |
| continue; |
| |
| attname = PQgetvalue(res, i, 1); |
| typename = PQgetvalue(res, i, 2); |
| attnotnull = PQgetvalue(res, i, 3); |
| attgenerated = PQgetisnull(res, i, 4) ? (char *) NULL : |
| PQgetvalue(res, i, 4); |
| <<<<<<< HEAD |
| attdefault = PQgetisnull(res, i, 5) ? (char *) NULL : |
| ======= |
| attgenerated = PQgetisnull(res, i, 5) ? (char *) NULL : |
| >>>>>>> REL_16_9 |
| PQgetvalue(res, i, 5); |
| collname = PQgetisnull(res, i, 6) ? (char *) NULL : |
| PQgetvalue(res, i, 6); |
| collnamespace = PQgetisnull(res, i, 7) ? (char *) NULL : |
| PQgetvalue(res, i, 7); |
| |
| if (first_item) |
| first_item = false; |
| else |
| appendStringInfoString(&buf, ",\n"); |
| |
| /* Print column name and type */ |
| appendStringInfo(&buf, " %s %s", |
| quote_identifier(attname), |
| typename); |
| |
| /* |
| * Add column_name option so that renaming the foreign table's |
| * column doesn't break the association to the underlying |
| * column. |
| */ |
| appendStringInfoString(&buf, " OPTIONS (column_name "); |
| deparseStringLiteral(&buf, attname); |
| appendStringInfoChar(&buf, ')'); |
| |
| /* Add COLLATE if needed */ |
| if (import_collate && collname != NULL && collnamespace != NULL) |
| appendStringInfo(&buf, " COLLATE %s.%s", |
| quote_identifier(collnamespace), |
| quote_identifier(collname)); |
| |
| /* Add DEFAULT if needed */ |
| if (import_default && attdefault != NULL && |
| (!attgenerated || !attgenerated[0])) |
| appendStringInfo(&buf, " DEFAULT %s", attdefault); |
| |
| /* Add GENERATED if needed */ |
| if (import_generated && attgenerated != NULL && |
| attgenerated[0] == ATTRIBUTE_GENERATED_STORED) |
| { |
| Assert(attdefault != NULL); |
| appendStringInfo(&buf, |
| " GENERATED ALWAYS AS (%s) STORED", |
| attdefault); |
| } |
| |
| /* Add NOT NULL if needed */ |
| if (import_not_null && attnotnull[0] == 't') |
| appendStringInfoString(&buf, " NOT NULL"); |
| } |
| while (++i < numrows && |
| strcmp(PQgetvalue(res, i, 0), tablename) == 0); |
| |
| /* |
| * Add server name and table-level options. We specify remote |
| * schema and table name as options (the latter to ensure that |
| * renaming the foreign table doesn't break the association). |
| */ |
| appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (", |
| quote_identifier(server->servername)); |
| |
| appendStringInfoString(&buf, "schema_name "); |
| deparseStringLiteral(&buf, stmt->remote_schema); |
| appendStringInfoString(&buf, ", table_name "); |
| deparseStringLiteral(&buf, tablename); |
| |
| appendStringInfoString(&buf, ");"); |
| |
| commands = lappend(commands, pstrdup(buf.data)); |
| } |
| } |
| PG_FINALLY(); |
| { |
| PQclear(res); |
| } |
| PG_END_TRY(); |
| |
| ReleaseConnection(conn); |
| |
| return commands; |
| } |
| |
| /* |
| * Assess whether the join between inner and outer relations can be pushed down |
| * to the foreign server. As a side effect, save information we obtain in this |
| * function to PgFdwRelationInfo passed in. |
| */ |
| static bool |
| foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, |
| RelOptInfo *outerrel, RelOptInfo *innerrel, |
| JoinPathExtraData *extra) |
| { |
| PgFdwRelationInfo *fpinfo; |
| PgFdwRelationInfo *fpinfo_o; |
| PgFdwRelationInfo *fpinfo_i; |
| ListCell *lc; |
| List *joinclauses; |
| |
| /* |
| * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins. |
| * Constructing queries representing SEMI and ANTI joins is hard, hence |
| * not considered right now. |
| */ |
| if (jointype != JOIN_INNER && jointype != JOIN_LEFT && |
| jointype != JOIN_RIGHT && jointype != JOIN_FULL) |
| return false; |
| |
| /* |
| * If either of the joining relations is marked as unsafe to pushdown, the |
| * join can not be pushed down. |
| */ |
| fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private; |
| fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private; |
| fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private; |
| if (!fpinfo_o || !fpinfo_o->pushdown_safe || |
| !fpinfo_i || !fpinfo_i->pushdown_safe) |
| return false; |
| |
| /* |
| * If joining relations have local conditions, those conditions are |
| * required to be applied before joining the relations. Hence the join can |
| * not be pushed down. |
| */ |
| if (fpinfo_o->local_conds || fpinfo_i->local_conds) |
| return false; |
| |
| /* |
| * Merge FDW options. We might be tempted to do this after we have deemed |
| * the foreign join to be OK. But we must do this beforehand so that we |
| * know which quals can be evaluated on the foreign server, which might |
| * depend on shippable_extensions. |
| */ |
| fpinfo->server = fpinfo_o->server; |
| merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i); |
| |
| /* |
| * Separate restrict list into join quals and pushed-down (other) quals. |
| * |
| * Join quals belonging to an outer join must all be shippable, else we |
| * cannot execute the join remotely. Add such quals to 'joinclauses'. |
| * |
| * Add other quals to fpinfo->remote_conds if they are shippable, else to |
| * fpinfo->local_conds. In an inner join it's okay to execute conditions |
| * either locally or remotely; the same is true for pushed-down conditions |
| * at an outer join. |
| * |
| * Note we might return failure after having already scribbled on |
| * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we |
| * won't consult those lists again if we deem the join unshippable. |
| */ |
| joinclauses = NIL; |
| foreach(lc, extra->restrictlist) |
| { |
| RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); |
| bool is_remote_clause = is_foreign_expr(root, joinrel, |
| rinfo->clause); |
| |
| if (IS_OUTER_JOIN(jointype) && |
| !RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids)) |
| { |
| if (!is_remote_clause) |
| return false; |
| joinclauses = lappend(joinclauses, rinfo); |
| } |
| else |
| { |
| if (is_remote_clause) |
| fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo); |
| else |
| fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo); |
| } |
| } |
| |
| /* |
| * deparseExplicitTargetList() isn't smart enough to handle anything other |
| * than a Var. In particular, if there's some PlaceHolderVar that would |
| * need to be evaluated within this join tree (because there's an upper |
| * reference to a quantity that may go to NULL as a result of an outer |
| * join), then we can't try to push the join down because we'll fail when |
| * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that |
| * needs to be evaluated *at the top* of this join tree is OK, because we |
| * can do that locally after fetching the results from the remote side. |
| */ |
| foreach(lc, root->placeholder_list) |
| { |
| PlaceHolderInfo *phinfo = lfirst(lc); |
| Relids relids; |
| |
| /* PlaceHolderInfo refers to parent relids, not child relids. */ |
| relids = IS_OTHER_REL(joinrel) ? |
| joinrel->top_parent_relids : joinrel->relids; |
| |
| if (bms_is_subset(phinfo->ph_eval_at, relids) && |
| bms_nonempty_difference(relids, phinfo->ph_eval_at)) |
| return false; |
| } |
| |
| /* Save the join clauses, for later use. */ |
| fpinfo->joinclauses = joinclauses; |
| |
| fpinfo->outerrel = outerrel; |
| fpinfo->innerrel = innerrel; |
| fpinfo->jointype = jointype; |
| |
| /* |
| * By default, both the input relations are not required to be deparsed as |
| * subqueries, but there might be some relations covered by the input |
| * relations that are required to be deparsed as subqueries, so save the |
| * relids of those relations for later use by the deparser. |
| */ |
| fpinfo->make_outerrel_subquery = false; |
| fpinfo->make_innerrel_subquery = false; |
| Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids)); |
| Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids)); |
| fpinfo->lower_subquery_rels = bms_union(fpinfo_o->lower_subquery_rels, |
| fpinfo_i->lower_subquery_rels); |
| |
| /* |
| * Pull the other remote conditions from the joining relations into join |
| * clauses or other remote clauses (remote_conds) of this relation |
| * wherever possible. This avoids building subqueries at every join step. |
| * |
| * For an inner join, clauses from both the relations are added to the |
| * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from |
| * the outer side are added to remote_conds since those can be evaluated |
| * after the join is evaluated. The clauses from inner side are added to |
| * the joinclauses, since they need to be evaluated while constructing the |
| * join. |
| * |
| * For a FULL OUTER JOIN, the other clauses from either relation can not |
| * be added to the joinclauses or remote_conds, since each relation acts |
| * as an outer relation for the other. |
| * |
| * The joining sides can not have local conditions, thus no need to test |
| * shippability of the clauses being pulled up. |
| */ |
| switch (jointype) |
| { |
| case JOIN_INNER: |
| fpinfo->remote_conds = list_concat(fpinfo->remote_conds, |
| fpinfo_i->remote_conds); |
| fpinfo->remote_conds = list_concat(fpinfo->remote_conds, |
| fpinfo_o->remote_conds); |
| break; |
| |
| case JOIN_LEFT: |
| fpinfo->joinclauses = list_concat(fpinfo->joinclauses, |
| fpinfo_i->remote_conds); |
| fpinfo->remote_conds = list_concat(fpinfo->remote_conds, |
| fpinfo_o->remote_conds); |
| break; |
| |
| case JOIN_RIGHT: |
| fpinfo->joinclauses = list_concat(fpinfo->joinclauses, |
| fpinfo_o->remote_conds); |
| fpinfo->remote_conds = list_concat(fpinfo->remote_conds, |
| fpinfo_i->remote_conds); |
| break; |
| |
| case JOIN_FULL: |
| |
| /* |
| * In this case, if any of the input relations has conditions, we |
| * need to deparse that relation as a subquery so that the |
| * conditions can be evaluated before the join. Remember it in |
| * the fpinfo of this relation so that the deparser can take |
| * appropriate action. Also, save the relids of base relations |
| * covered by that relation for later use by the deparser. |
| */ |
| if (fpinfo_o->remote_conds) |
| { |
| fpinfo->make_outerrel_subquery = true; |
| fpinfo->lower_subquery_rels = |
| bms_add_members(fpinfo->lower_subquery_rels, |
| outerrel->relids); |
| } |
| if (fpinfo_i->remote_conds) |
| { |
| fpinfo->make_innerrel_subquery = true; |
| fpinfo->lower_subquery_rels = |
| bms_add_members(fpinfo->lower_subquery_rels, |
| innerrel->relids); |
| } |
| break; |
| |
| default: |
| /* Should not happen, we have just checked this above */ |
| elog(ERROR, "unsupported join type %d", jointype); |
| } |
| |
| /* |
| * For an inner join, all restrictions can be treated alike. Treating the |
| * pushed down conditions as join conditions allows a top level full outer |
| * join to be deparsed without requiring subqueries. |
| */ |
| if (jointype == JOIN_INNER) |
| { |
| Assert(!fpinfo->joinclauses); |
| fpinfo->joinclauses = fpinfo->remote_conds; |
| fpinfo->remote_conds = NIL; |
| } |
| |
| /* Mark that this join can be pushed down safely */ |
| fpinfo->pushdown_safe = true; |
| |
| /* Get user mapping */ |
| if (fpinfo->use_remote_estimate) |
| { |
| if (fpinfo_o->use_remote_estimate) |
| fpinfo->user = fpinfo_o->user; |
| else |
| fpinfo->user = fpinfo_i->user; |
| } |
| else |
| fpinfo->user = NULL; |
| |
| /* |
| * Set # of retrieved rows and cached relation costs to some negative |
| * value, so that we can detect when they are set to some sensible values, |
| * during one (usually the first) of the calls to estimate_path_cost_size. |
| */ |
| fpinfo->retrieved_rows = -1; |
| fpinfo->rel_startup_cost = -1; |
| fpinfo->rel_total_cost = -1; |
| |
| /* |
| * Set the string describing this join relation to be used in EXPLAIN |
| * output of corresponding ForeignScan. Note that the decoration we add |
| * to the base relation names mustn't include any digits, or it'll confuse |
| * postgresExplainForeignScan. |
| */ |
| fpinfo->relation_name = psprintf("(%s) %s JOIN (%s)", |
| fpinfo_o->relation_name, |
| get_jointype_name(fpinfo->jointype), |
| fpinfo_i->relation_name); |
| |
| /* |
| * Set the relation index. This is defined as the position of this |
| * joinrel in the join_rel_list list plus the length of the rtable list. |
| * Note that since this joinrel is at the end of the join_rel_list list |
| * when we are called, we can get the position by list_length. |
| */ |
| Assert(fpinfo->relation_index == 0); /* shouldn't be set yet */ |
| fpinfo->relation_index = |
| list_length(root->parse->rtable) + list_length(root->join_rel_list); |
| |
| return true; |
| } |
| |
| static void |
| add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel, |
| Path *epq_path) |
| { |
| List *useful_pathkeys_list = NIL; /* List of all pathkeys */ |
| ListCell *lc; |
| |
| useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel); |
| |
| /* |
| * Before creating sorted paths, arrange for the passed-in EPQ path, if |
| * any, to return columns needed by the parent ForeignScan node so that |
| * they will propagate up through Sort nodes injected below, if necessary. |
| */ |
| if (epq_path != NULL && useful_pathkeys_list != NIL) |
| { |
| PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private; |
| PathTarget *target = copy_pathtarget(epq_path->pathtarget); |
| |
| /* Include columns required for evaluating PHVs in the tlist. */ |
| add_new_columns_to_pathtarget(target, |
| pull_var_clause((Node *) target->exprs, |
| PVC_RECURSE_PLACEHOLDERS)); |
| |
| /* Include columns required for evaluating the local conditions. */ |
| foreach(lc, fpinfo->local_conds) |
| { |
| RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); |
| |
| add_new_columns_to_pathtarget(target, |
| pull_var_clause((Node *) rinfo->clause, |
| PVC_RECURSE_PLACEHOLDERS)); |
| } |
| |
| /* |
| * If we have added any new columns, adjust the tlist of the EPQ path. |
| * |
| * Note: the plan created using this path will only be used to execute |
| * EPQ checks, where accuracy of the plan cost and width estimates |
| * would not be important, so we do not do set_pathtarget_cost_width() |
| * for the new pathtarget here. See also postgresGetForeignPlan(). |
| */ |
| if (list_length(target->exprs) > list_length(epq_path->pathtarget->exprs)) |
| { |
| /* The EPQ path is a join path, so it is projection-capable. */ |
| Assert(is_projection_capable_path(epq_path)); |
| |
| /* |
| * Use create_projection_path() here, so as to avoid modifying it |
| * in place. |
| */ |
| epq_path = (Path *) create_projection_path(root, |
| rel, |
| epq_path, |
| target); |
| } |
| } |
| |
| /* Create one path for each set of pathkeys we found above. */ |
| foreach(lc, useful_pathkeys_list) |
| { |
| double rows; |
| int width; |
| Cost startup_cost; |
| Cost total_cost; |
| List *useful_pathkeys = lfirst(lc); |
| Path *sorted_epq_path; |
| |
| estimate_path_cost_size(root, rel, NIL, useful_pathkeys, NULL, |
| &rows, &width, &startup_cost, &total_cost); |
| |
| /* |
| * The EPQ path must be at least as well sorted as the path itself, in |
| * case it gets used as input to a mergejoin. |
| */ |
| sorted_epq_path = epq_path; |
| if (sorted_epq_path != NULL && |
| !pathkeys_contained_in(useful_pathkeys, |
| sorted_epq_path->pathkeys)) |
| sorted_epq_path = (Path *) |
| create_sort_path(root, |
| rel, |
| sorted_epq_path, |
| useful_pathkeys, |
| -1.0); |
| |
| if (IS_SIMPLE_REL(rel)) |
| add_path(rel, (Path *) |
| create_foreignscan_path(root, rel, |
| NULL, |
| rows, |
| startup_cost, |
| total_cost, |
| useful_pathkeys, |
| rel->lateral_relids, |
| sorted_epq_path, |
| NIL), |
| root); |
| else |
| add_path(rel, (Path *) |
| create_foreign_join_path(root, rel, |
| NULL, |
| rows, |
| startup_cost, |
| total_cost, |
| useful_pathkeys, |
| rel->lateral_relids, |
| sorted_epq_path, |
| NIL), |
| root); |
| } |
| } |
| |
| /* |
| * Parse options from foreign server and apply them to fpinfo. |
| * |
| * New options might also require tweaking merge_fdw_options(). |
| */ |
| static void |
| apply_server_options(PgFdwRelationInfo *fpinfo) |
| { |
| ListCell *lc; |
| |
| foreach(lc, fpinfo->server->options) |
| { |
| DefElem *def = (DefElem *) lfirst(lc); |
| |
| if (strcmp(def->defname, "use_remote_estimate") == 0) |
| { |
| /* |
| * GPDB_13_MERGE_FIXME: For updable statement, different forked Backends by Master |
| * (QD and entrydb instances) |
| * will hold Exclusive lock on the same table, which causes lock hang issue. |
| * For Postgres, there is only one backend, and connnections have been shared, |
| * so the issue doesn't exist. |
| * |
| * For example, following query will hang: |
| * SELECT * |
| * FROM ft1, ft2, ft4, ft5, local_tbl |
| * WHERE ft1.c1 = ft2.c1 AND |
| * ft1.c2 = ft4.c1 AND |
| * ft1.c2 = ft5.c1 AND |
| * ft1.c2 = local_tbl.c1 AND |
| * ft1.c1 < 100 AND |
| * ft2.c1 < 100 FOR UPDATE; |
| */ |
| elog(WARNING, "fdw option 'use_remote_estimate' is not supported."); |
| fpinfo->use_remote_estimate = false; |
| } |
| else if (strcmp(def->defname, "fdw_startup_cost") == 0) |
| (void) parse_real(defGetString(def), &fpinfo->fdw_startup_cost, 0, |
| NULL); |
| else if (strcmp(def->defname, "fdw_tuple_cost") == 0) |
| (void) parse_real(defGetString(def), &fpinfo->fdw_tuple_cost, 0, |
| NULL); |
| else if (strcmp(def->defname, "extensions") == 0) |
| fpinfo->shippable_extensions = |
| ExtractExtensionList(defGetString(def), false); |
| else if (strcmp(def->defname, "fetch_size") == 0) |
| (void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL); |
| else if (strcmp(def->defname, "async_capable") == 0) |
| fpinfo->async_capable = defGetBoolean(def); |
| } |
| } |
| |
| /* |
| * Parse options from foreign table and apply them to fpinfo. |
| * |
| * New options might also require tweaking merge_fdw_options(). |
| */ |
| static void |
| apply_table_options(PgFdwRelationInfo *fpinfo) |
| { |
| ListCell *lc; |
| |
| foreach(lc, fpinfo->table->options) |
| { |
| DefElem *def = (DefElem *) lfirst(lc); |
| |
| if (strcmp(def->defname, "use_remote_estimate") == 0) |
| { |
| /* |
| * GPDB_13_MERGE_FIXME: For updable statement, different forked Backends by Master |
| * (QD and entrydb instances) |
| * will hold Exclusive lock on the same table, which causes lock hang issue. |
| * For Postgres, there is only one backend, and connnections have been shared, |
| * so the issue doesn't exist. |
| * |
| * For example, following query will hang: |
| * SELECT * |
| * FROM ft1, ft2, ft4, ft5, local_tbl |
| * WHERE ft1.c1 = ft2.c1 AND |
| * ft1.c2 = ft4.c1 AND |
| * ft1.c2 = ft5.c1 AND |
| * ft1.c2 = local_tbl.c1 AND |
| * ft1.c1 < 100 AND |
| * ft2.c1 < 100 FOR UPDATE; |
| */ |
| elog(WARNING, "fdw option 'use_remote_estimate' is not supported."); |
| fpinfo->use_remote_estimate = false; |
| } |
| else if (strcmp(def->defname, "fetch_size") == 0) |
| (void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL); |
| else if (strcmp(def->defname, "async_capable") == 0) |
| fpinfo->async_capable = defGetBoolean(def); |
| } |
| } |
| |
| /* |
| * Merge FDW options from input relations into a new set of options for a join |
| * or an upper rel. |
| * |
| * For a join relation, FDW-specific information about the inner and outer |
| * relations is provided using fpinfo_i and fpinfo_o. For an upper relation, |
| * fpinfo_o provides the information for the input relation; fpinfo_i is |
| * expected to NULL. |
| */ |
| static void |
| merge_fdw_options(PgFdwRelationInfo *fpinfo, |
| const PgFdwRelationInfo *fpinfo_o, |
| const PgFdwRelationInfo *fpinfo_i) |
| { |
| /* We must always have fpinfo_o. */ |
| Assert(fpinfo_o); |
| |
| /* fpinfo_i may be NULL, but if present the servers must both match. */ |
| Assert(!fpinfo_i || |
| fpinfo_i->server->serverid == fpinfo_o->server->serverid); |
| |
| /* |
| * Copy the server specific FDW options. (For a join, both relations come |
| * from the same server, so the server options should have the same value |
| * for both relations.) |
| */ |
| fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost; |
| fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost; |
| fpinfo->shippable_extensions = fpinfo_o->shippable_extensions; |
| fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate; |
| fpinfo->fetch_size = fpinfo_o->fetch_size; |
| fpinfo->async_capable = fpinfo_o->async_capable; |
| |
| /* Merge the table level options from either side of the join. */ |
| if (fpinfo_i) |
| { |
| /* |
| * We'll prefer to use remote estimates for this join if any table |
| * from either side of the join is using remote estimates. This is |
| * most likely going to be preferred since they're already willing to |
| * pay the price of a round trip to get the remote EXPLAIN. In any |
| * case it's not entirely clear how we might otherwise handle this |
| * best. |
| */ |
| fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate || |
| fpinfo_i->use_remote_estimate; |
| |
| /* |
| * Set fetch size to maximum of the joining sides, since we are |
| * expecting the rows returned by the join to be proportional to the |
| * relation sizes. |
| */ |
| fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size); |
| |
| /* |
| * We'll prefer to consider this join async-capable if any table from |
| * either side of the join is considered async-capable. This would be |
| * reasonable because in that case the foreign server would have its |
| * own resources to scan that table asynchronously, and the join could |
| * also be computed asynchronously using the resources. |
| */ |
| fpinfo->async_capable = fpinfo_o->async_capable || |
| fpinfo_i->async_capable; |
| } |
| } |
| |
| /* |
| * postgresGetForeignJoinPaths |
| * Add possible ForeignPath to joinrel, if join is safe to push down. |
| */ |
| static void |
| postgresGetForeignJoinPaths(PlannerInfo *root, |
| RelOptInfo *joinrel, |
| RelOptInfo *outerrel, |
| RelOptInfo *innerrel, |
| JoinType jointype, |
| JoinPathExtraData *extra) |
| { |
| PgFdwRelationInfo *fpinfo; |
| ForeignPath *joinpath; |
| double rows; |
| int width; |
| Cost startup_cost; |
| Cost total_cost; |
| Path *epq_path; /* Path to create plan to be executed when |
| * EvalPlanQual gets triggered. */ |
| |
| /* |
| * Skip if this join combination has been considered already. |
| */ |
| if (joinrel->fdw_private) |
| return; |
| |
| /* |
| * This code does not work for joins with lateral references, since those |
| * must have parameterized paths, which we don't generate yet. |
| */ |
| if (!bms_is_empty(joinrel->lateral_relids)) |
| return; |
| |
| /* |
| * Create unfinished PgFdwRelationInfo entry which is used to indicate |
| * that the join relation is already considered, so that we won't waste |
| * time in judging safety of join pushdown and adding the same paths again |
| * if found safe. Once we know that this join can be pushed down, we fill |
| * the entry. |
| */ |
| fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); |
| fpinfo->pushdown_safe = false; |
| joinrel->fdw_private = fpinfo; |
| /* attrs_used is only for base relations. */ |
| fpinfo->attrs_used = NULL; |
| |
| /* |
| * If there is a possibility that EvalPlanQual will be executed, we need |
| * to be able to reconstruct the row using scans of the base relations. |
| * GetExistingLocalJoinPath will find a suitable path for this purpose in |
| * the path list of the joinrel, if one exists. We must be careful to |
| * call it before adding any ForeignPath, since the ForeignPath might |
| * dominate the only suitable local path available. We also do it before |
| * calling foreign_join_ok(), since that function updates fpinfo and marks |
| * it as pushable if the join is found to be pushable. |
| */ |
| if (root->parse->commandType == CMD_DELETE || |
| root->parse->commandType == CMD_UPDATE || |
| root->rowMarks) |
| { |
| epq_path = GetExistingLocalJoinPath(joinrel); |
| if (!epq_path) |
| { |
| elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found"); |
| return; |
| } |
| } |
| else |
| epq_path = NULL; |
| |
| if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra)) |
| { |
| /* Free path required for EPQ if we copied one; we don't need it now */ |
| if (epq_path) |
| pfree(epq_path); |
| return; |
| } |
| |
| /* |
| * Compute the selectivity and cost of the local_conds, so we don't have |
| * to do it over again for each path. The best we can do for these |
| * conditions is to estimate selectivity on the basis of local statistics. |
| * The local conditions are applied after the join has been computed on |
| * the remote side like quals in WHERE clause, so pass jointype as |
| * JOIN_INNER. |
| */ |
| fpinfo->local_conds_sel = clauselist_selectivity(root, |
| fpinfo->local_conds, |
| 0, |
| JOIN_INNER, |
| NULL, |
| false); |
| cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root); |
| |
| /* |
| * If we are going to estimate costs locally, estimate the join clause |
| * selectivity here while we have special join info. |
| */ |
| if (!fpinfo->use_remote_estimate) |
| fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses, |
| 0, fpinfo->jointype, |
| extra->sjinfo, |
| false); |
| |
| /* Estimate costs for bare join relation */ |
| estimate_path_cost_size(root, joinrel, NIL, NIL, NULL, |
| &rows, &width, &startup_cost, &total_cost); |
| /* Now update this information in the joinrel */ |
| joinrel->rows = rows; |
| joinrel->reltarget->width = width; |
| fpinfo->rows = rows; |
| fpinfo->width = width; |
| fpinfo->startup_cost = startup_cost; |
| fpinfo->total_cost = total_cost; |
| |
| /* |
| * Create a new join path and add it to the joinrel which represents a |
| * join between foreign tables. |
| */ |
| joinpath = create_foreign_join_path(root, |
| joinrel, |
| NULL, /* default pathtarget */ |
| rows, |
| startup_cost, |
| total_cost, |
| NIL, /* no pathkeys */ |
| joinrel->lateral_relids, |
| epq_path, |
| NIL); /* no fdw_private */ |
| |
| /* Add generated path into joinrel by add_path(). */ |
| add_path(joinrel, (Path *) joinpath, root); |
| |
| /* Consider pathkeys for the join relation */ |
| add_paths_with_pathkeys_for_rel(root, joinrel, epq_path); |
| |
| /* XXX Consider parameterized paths for the join relation */ |
| } |
| |
| /* |
| * Assess whether the aggregation, grouping and having operations can be pushed |
| * down to the foreign server. As a side effect, save information we obtain in |
| * this function to PgFdwRelationInfo of the input relation. |
| */ |
| static bool |
| foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, |
| Node *havingQual) |
| { |
| Query *query = root->parse; |
| PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private; |
| PathTarget *grouping_target = grouped_rel->reltarget; |
| PgFdwRelationInfo *ofpinfo; |
| ListCell *lc; |
| int i; |
| List *tlist = NIL; |
| |
| /* We currently don't support pushing Grouping Sets. */ |
| if (query->groupingSets) |
| return false; |
| |
| /* Get the fpinfo of the underlying scan relation. */ |
| ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private; |
| |
| /* |
| * If underlying scan relation has any local conditions, those conditions |
| * are required to be applied before performing aggregation. Hence the |
| * aggregate cannot be pushed down. |
| */ |
| if (ofpinfo->local_conds) |
| return false; |
| |
| /* |
| * Examine grouping expressions, as well as other expressions we'd need to |
| * compute, and check whether they are safe to push down to the foreign |
| * server. All GROUP BY expressions will be part of the grouping target |
| * and thus there is no need to search for them separately. Add grouping |
| * expressions into target list which will be passed to foreign server. |
| * |
| * A tricky fine point is that we must not put any expression into the |
| * target list that is just a foreign param (that is, something that |
| * deparse.c would conclude has to be sent to the foreign server). If we |
| * do, the expression will also appear in the fdw_exprs list of the plan |
| * node, and setrefs.c will get confused and decide that the fdw_exprs |
| * entry is actually a reference to the fdw_scan_tlist entry, resulting in |
| * a broken plan. Somewhat oddly, it's OK if the expression contains such |
| * a node, as long as it's not at top level; then no match is possible. |
| */ |
| i = 0; |
| foreach(lc, grouping_target->exprs) |
| { |
| Expr *expr = (Expr *) lfirst(lc); |
| Index sgref = get_pathtarget_sortgroupref(grouping_target, i); |
| ListCell *l; |
| |
| /* |
| * Check whether this expression is part of GROUP BY clause. Note we |
| * check the whole GROUP BY clause not just processed_groupClause, |
| * because we will ship all of it, cf. appendGroupByClause. |
| */ |
| if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause)) |
| { |
| TargetEntry *tle; |
| |
| /* |
| * If any GROUP BY expression is not shippable, then we cannot |
| * push down aggregation to the foreign server. |
| */ |
| if (!is_foreign_expr(root, grouped_rel, expr)) |
| return false; |
| |
| /* |
| * If it would be a foreign param, we can't put it into the tlist, |
| * so we have to fail. |
| */ |
| if (is_foreign_param(root, grouped_rel, expr)) |
| return false; |
| |
| /* |
| * Pushable, so add to tlist. We need to create a TLE for this |
| * expression and apply the sortgroupref to it. We cannot use |
| * add_to_flat_tlist() here because that avoids making duplicate |
| * entries in the tlist. If there are duplicate entries with |
| * distinct sortgrouprefs, we have to duplicate that situation in |
| * the output tlist. |
| */ |
| tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false); |
| tle->ressortgroupref = sgref; |
| tlist = lappend(tlist, tle); |
| } |
| else |
| { |
| /* |
| * Non-grouping expression we need to compute. Can we ship it |
| * as-is to the foreign server? |
| */ |
| if (is_foreign_expr(root, grouped_rel, expr) && |
| !is_foreign_param(root, grouped_rel, expr)) |
| { |
| /* Yes, so add to tlist as-is; OK to suppress duplicates */ |
| tlist = add_to_flat_tlist(tlist, list_make1(expr)); |
| } |
| else |
| { |
| /* Not pushable as a whole; extract its Vars and aggregates */ |
| List *aggvars; |
| |
| aggvars = pull_var_clause((Node *) expr, |
| PVC_INCLUDE_AGGREGATES); |
| |
| /* |
| * If any aggregate expression is not shippable, then we |
| * cannot push down aggregation to the foreign server. (We |
| * don't have to check is_foreign_param, since that certainly |
| * won't return true for any such expression.) |
| */ |
| if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars)) |
| return false; |
| |
| /* |
| * Add aggregates, if any, into the targetlist. Plain Vars |
| * outside an aggregate can be ignored, because they should be |
| * either same as some GROUP BY column or part of some GROUP |
| * BY expression. In either case, they are already part of |
| * the targetlist and thus no need to add them again. In fact |
| * including plain Vars in the tlist when they do not match a |
| * GROUP BY column would cause the foreign server to complain |
| * that the shipped query is invalid. |
| */ |
| foreach(l, aggvars) |
| { |
| Expr *aggref = (Expr *) lfirst(l); |
| |
| if (IsA(aggref, Aggref)) |
| tlist = add_to_flat_tlist(tlist, list_make1(aggref)); |
| } |
| } |
| } |
| |
| i++; |
| } |
| |
| /* |
| * Classify the pushable and non-pushable HAVING clauses and save them in |
| * remote_conds and local_conds of the grouped rel's fpinfo. |
| */ |
| if (havingQual) |
| { |
| foreach(lc, (List *) havingQual) |
| { |
| Expr *expr = (Expr *) lfirst(lc); |
| RestrictInfo *rinfo; |
| |
| /* |
| * Currently, the core code doesn't wrap havingQuals in |
| * RestrictInfos, so we must make our own. |
| */ |
| Assert(!IsA(expr, RestrictInfo)); |
| rinfo = make_restrictinfo(root, |
| expr, |
| true, |
| false, |
| false, |
| false, |
| root->qual_security_level, |
| grouped_rel->relids, |
| NULL, |
| NULL); |
| if (is_foreign_expr(root, grouped_rel, expr)) |
| fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo); |
| else |
| fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo); |
| } |
| } |
| |
| /* |
| * If there are any local conditions, pull Vars and aggregates from it and |
| * check whether they are safe to pushdown or not. |
| */ |
| if (fpinfo->local_conds) |
| { |
| List *aggvars = NIL; |
| |
| foreach(lc, fpinfo->local_conds) |
| { |
| RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); |
| |
| aggvars = list_concat(aggvars, |
| pull_var_clause((Node *) rinfo->clause, |
| PVC_INCLUDE_AGGREGATES)); |
| } |
| |
| foreach(lc, aggvars) |
| { |
| Expr *expr = (Expr *) lfirst(lc); |
| |
| /* |
| * If aggregates within local conditions are not safe to push |
| * down, then we cannot push down the query. Vars are already |
| * part of GROUP BY clause which are checked above, so no need to |
| * access them again here. Again, we need not check |
| * is_foreign_param for a foreign aggregate. |
| */ |
| if (IsA(expr, Aggref)) |
| { |
| if (!is_foreign_expr(root, grouped_rel, expr)) |
| return false; |
| |
| tlist = add_to_flat_tlist(tlist, list_make1(expr)); |
| } |
| } |
| } |
| |
| /* Store generated targetlist */ |
| fpinfo->grouped_tlist = tlist; |
| |
| /* Safe to pushdown */ |
| fpinfo->pushdown_safe = true; |
| |
| /* |
| * Set # of retrieved rows and cached relation costs to some negative |
| * value, so that we can detect when they are set to some sensible values, |
| * during one (usually the first) of the calls to estimate_path_cost_size. |
| */ |
| fpinfo->retrieved_rows = -1; |
| fpinfo->rel_startup_cost = -1; |
| fpinfo->rel_total_cost = -1; |
| |
| /* |
| * Set the string describing this grouped relation to be used in EXPLAIN |
| * output of corresponding ForeignScan. Note that the decoration we add |
| * to the base relation name mustn't include any digits, or it'll confuse |
| * postgresExplainForeignScan. |
| */ |
| fpinfo->relation_name = psprintf("Aggregate on (%s)", |
| ofpinfo->relation_name); |
| |
| return true; |
| } |
| |
| /* |
| * postgresGetForeignUpperPaths |
| * Add paths for post-join operations like aggregation, grouping etc. if |
| * corresponding operations are safe to push down. |
| */ |
| static void |
| postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, |
| RelOptInfo *input_rel, RelOptInfo *output_rel, |
| void *extra) |
| { |
| PgFdwRelationInfo *fpinfo; |
| |
| /* |
| * If input rel is not safe to pushdown, then simply return as we cannot |
| * perform any post-join operations on the foreign server. |
| */ |
| if (!input_rel->fdw_private || |
| !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe) |
| return; |
| |
| /* Ignore stages we don't support; and skip any duplicate calls. */ |
| if ((stage != UPPERREL_GROUP_AGG && |
| stage != UPPERREL_ORDERED && |
| stage != UPPERREL_FINAL) || |
| output_rel->fdw_private) |
| return; |
| |
| fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); |
| fpinfo->pushdown_safe = false; |
| fpinfo->stage = stage; |
| output_rel->fdw_private = fpinfo; |
| |
| switch (stage) |
| { |
| case UPPERREL_GROUP_AGG: |
| add_foreign_grouping_paths(root, input_rel, output_rel, |
| (GroupPathExtraData *) extra); |
| break; |
| case UPPERREL_ORDERED: |
| add_foreign_ordered_paths(root, input_rel, output_rel); |
| break; |
| case UPPERREL_FINAL: |
| add_foreign_final_paths(root, input_rel, output_rel, |
| (FinalPathExtraData *) extra); |
| break; |
| default: |
| elog(ERROR, "unexpected upper relation: %d", (int) stage); |
| break; |
| } |
| } |
| |
| /* |
| * add_foreign_grouping_paths |
| * Add foreign path for grouping and/or aggregation. |
| * |
| * Given input_rel represents the underlying scan. The paths are added to the |
| * given grouped_rel. |
| */ |
| static void |
| add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, |
| RelOptInfo *grouped_rel, |
| GroupPathExtraData *extra) |
| { |
| Query *parse = root->parse; |
| PgFdwRelationInfo *ifpinfo = input_rel->fdw_private; |
| PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private; |
| ForeignPath *grouppath; |
| double rows; |
| int width; |
| Cost startup_cost; |
| Cost total_cost; |
| |
| /* Nothing to be done, if there is no grouping or aggregation required. */ |
| if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs && |
| !root->hasHavingQual) |
| return; |
| |
| Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE || |
| extra->patype == PARTITIONWISE_AGGREGATE_FULL); |
| |
| /* save the input_rel as outerrel in fpinfo */ |
| fpinfo->outerrel = input_rel; |
| |
| /* |
| * Copy foreign table, foreign server, user mapping, FDW options etc. |
| * details from the input relation's fpinfo. |
| */ |
| fpinfo->table = ifpinfo->table; |
| fpinfo->server = ifpinfo->server; |
| fpinfo->user = ifpinfo->user; |
| merge_fdw_options(fpinfo, ifpinfo, NULL); |
| |
| /* |
| * Assess if it is safe to push down aggregation and grouping. |
| * |
| * Use HAVING qual from extra. In case of child partition, it will have |
| * translated Vars. |
| */ |
| if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual)) |
| return; |
| |
| /* |
| * Compute the selectivity and cost of the local_conds, so we don't have |
| * to do it over again for each path. (Currently we create just a single |
| * path here, but in future it would be possible that we build more paths |
| * such as pre-sorted paths as in postgresGetForeignPaths and |
| * postgresGetForeignJoinPaths.) The best we can do for these conditions |
| * is to estimate selectivity on the basis of local statistics. |
| */ |
| fpinfo->local_conds_sel = clauselist_selectivity(root, |
| fpinfo->local_conds, |
| 0, |
| JOIN_INNER, |
| NULL, |
| false); |
| |
| cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root); |
| |
| /* Estimate the cost of push down */ |
| estimate_path_cost_size(root, grouped_rel, NIL, NIL, NULL, |
| &rows, &width, &startup_cost, &total_cost); |
| |
| /* Now update this information in the fpinfo */ |
| fpinfo->rows = rows; |
| fpinfo->width = width; |
| fpinfo->startup_cost = startup_cost; |
| fpinfo->total_cost = total_cost; |
| |
| /* Create and add foreign path to the grouping relation. */ |
| grouppath = create_foreign_upper_path(root, |
| grouped_rel, |
| grouped_rel->reltarget, |
| rows, |
| startup_cost, |
| total_cost, |
| NIL, /* no pathkeys */ |
| NULL, |
| NIL); /* no fdw_private */ |
| |
| /* Add generated path into grouped_rel by add_path(). */ |
| add_path(grouped_rel, (Path *) grouppath, root); |
| } |
| |
| /* |
| * add_foreign_ordered_paths |
| * Add foreign paths for performing the final sort remotely. |
| * |
| * Given input_rel contains the source-data Paths. The paths are added to the |
| * given ordered_rel. |
| */ |
| static void |
| add_foreign_ordered_paths(PlannerInfo *root, RelOptInfo *input_rel, |
| RelOptInfo *ordered_rel) |
| { |
| Query *parse = root->parse; |
| PgFdwRelationInfo *ifpinfo = input_rel->fdw_private; |
| PgFdwRelationInfo *fpinfo = ordered_rel->fdw_private; |
| PgFdwPathExtraData *fpextra; |
| double rows; |
| int width; |
| Cost startup_cost; |
| Cost total_cost; |
| List *fdw_private; |
| ForeignPath *ordered_path; |
| ListCell *lc; |
| |
| /* Shouldn't get here unless the query has ORDER BY */ |
| Assert(parse->sortClause); |
| |
| /* We don't support cases where there are any SRFs in the targetlist */ |
| if (parse->hasTargetSRFs) |
| return; |
| |
| /* Save the input_rel as outerrel in fpinfo */ |
| fpinfo->outerrel = input_rel; |
| |
| /* |
| * Copy foreign table, foreign server, user mapping, FDW options etc. |
| * details from the input relation's fpinfo. |
| */ |
| fpinfo->table = ifpinfo->table; |
| fpinfo->server = ifpinfo->server; |
| fpinfo->user = ifpinfo->user; |
| merge_fdw_options(fpinfo, ifpinfo, NULL); |
| |
| /* |
| * If the input_rel is a base or join relation, we would already have |
| * considered pushing down the final sort to the remote server when |
| * creating pre-sorted foreign paths for that relation, because the |
| * query_pathkeys is set to the root->sort_pathkeys in that case (see |
| * standard_qp_callback()). |
| */ |
| if (input_rel->reloptkind == RELOPT_BASEREL || |
| input_rel->reloptkind == RELOPT_JOINREL) |
| { |
| Assert(root->query_pathkeys == root->sort_pathkeys); |
| |
| /* Safe to push down if the query_pathkeys is safe to push down */ |
| fpinfo->pushdown_safe = ifpinfo->qp_is_pushdown_safe; |
| |
| return; |
| } |
| |
| /* The input_rel should be a grouping relation */ |
| Assert(input_rel->reloptkind == RELOPT_UPPER_REL && |
| ifpinfo->stage == UPPERREL_GROUP_AGG); |
| |
| /* |
| * We try to create a path below by extending a simple foreign path for |
| * the underlying grouping relation to perform the final sort remotely, |
| * which is stored into the fdw_private list of the resulting path. |
| */ |
| |
| /* Assess if it is safe to push down the final sort */ |
| foreach(lc, root->sort_pathkeys) |
| { |
| PathKey *pathkey = (PathKey *) lfirst(lc); |
| EquivalenceClass *pathkey_ec = pathkey->pk_eclass; |
| |
| /* |
| * is_foreign_expr would detect volatile expressions as well, but |
| * checking ec_has_volatile here saves some cycles. |
| */ |
| if (pathkey_ec->ec_has_volatile) |
| return; |
| |
| /* |
| * Can't push down the sort if pathkey's opfamily is not shippable. |
| */ |
| if (!is_shippable(pathkey->pk_opfamily, OperatorFamilyRelationId, |
| fpinfo)) |
| return; |
| |
| /* |
| * The EC must contain a shippable EM that is computed in input_rel's |
| * reltarget, else we can't push down the sort. |
| */ |
| if (find_em_for_rel_target(root, |
| pathkey_ec, |
| input_rel) == NULL) |
| return; |
| } |
| |
| /* Safe to push down */ |
| fpinfo->pushdown_safe = true; |
| |
| /* Construct PgFdwPathExtraData */ |
| fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData)); |
| fpextra->target = root->upper_targets[UPPERREL_ORDERED]; |
| fpextra->has_final_sort = true; |
| |
| /* Estimate the costs of performing the final sort remotely */ |
| estimate_path_cost_size(root, input_rel, NIL, root->sort_pathkeys, fpextra, |
| &rows, &width, &startup_cost, &total_cost); |
| |
| /* |
| * Build the fdw_private list that will be used by postgresGetForeignPlan. |
| * Items in the list must match order in enum FdwPathPrivateIndex. |
| */ |
| fdw_private = list_make2(makeBoolean(true), makeBoolean(false)); |
| |
| /* Create foreign ordering path */ |
| ordered_path = create_foreign_upper_path(root, |
| input_rel, |
| root->upper_targets[UPPERREL_ORDERED], |
| rows, |
| startup_cost, |
| total_cost, |
| root->sort_pathkeys, |
| NULL, /* no extra plan */ |
| fdw_private); |
| |
| /* and add it to the ordered_rel */ |
| add_path(ordered_rel, (Path *) ordered_path, root); |
| } |
| |
| /* |
| * add_foreign_final_paths |
| * Add foreign paths for performing the final processing remotely. |
| * |
| * Given input_rel contains the source-data Paths. The paths are added to the |
| * given final_rel. |
| */ |
| static void |
| add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel, |
| RelOptInfo *final_rel, |
| FinalPathExtraData *extra) |
| { |
| Query *parse = root->parse; |
| PgFdwRelationInfo *ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private; |
| PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) final_rel->fdw_private; |
| bool has_final_sort = false; |
| List *pathkeys = NIL; |
| PgFdwPathExtraData *fpextra; |
| bool save_use_remote_estimate = false; |
| double rows; |
| int width; |
| Cost startup_cost; |
| Cost total_cost; |
| List *fdw_private; |
| ForeignPath *final_path; |
| |
| /* |
| * Currently, we only support this for SELECT commands |
| */ |
| if (parse->commandType != CMD_SELECT) |
| return; |
| |
| /* |
| * No work if there is no FOR UPDATE/SHARE clause and if there is no need |
| * to add a LIMIT node |
| */ |
| if (!parse->rowMarks && !extra->limit_needed) |
| return; |
| |
| /* We don't support cases where there are any SRFs in the targetlist */ |
| if (parse->hasTargetSRFs) |
| return; |
| |
| /* Save the input_rel as outerrel in fpinfo */ |
| fpinfo->outerrel = input_rel; |
| |
| /* |
| * Copy foreign table, foreign server, user mapping, FDW options etc. |
| * details from the input relation's fpinfo. |
| */ |
| fpinfo->table = ifpinfo->table; |
| fpinfo->server = ifpinfo->server; |
| fpinfo->user = ifpinfo->user; |
| merge_fdw_options(fpinfo, ifpinfo, NULL); |
| |
| /* |
| * If there is no need to add a LIMIT node, there might be a ForeignPath |
| * in the input_rel's pathlist that implements all behavior of the query. |
| * Note: we would already have accounted for the query's FOR UPDATE/SHARE |
| * (if any) before we get here. |
| */ |
| if (!extra->limit_needed) |
| { |
| ListCell *lc; |
| |
| Assert(parse->rowMarks); |
| |
| /* |
| * Grouping and aggregation are not supported with FOR UPDATE/SHARE, |
| * so the input_rel should be a base, join, or ordered relation; and |
| * if it's an ordered relation, its input relation should be a base or |
| * join relation. |
| */ |
| Assert(input_rel->reloptkind == RELOPT_BASEREL || |
| input_rel->reloptkind == RELOPT_JOINREL || |
| (input_rel->reloptkind == RELOPT_UPPER_REL && |
| ifpinfo->stage == UPPERREL_ORDERED && |
| (ifpinfo->outerrel->reloptkind == RELOPT_BASEREL || |
| ifpinfo->outerrel->reloptkind == RELOPT_JOINREL))); |
| |
| foreach(lc, input_rel->pathlist) |
| { |
| Path *path = (Path *) lfirst(lc); |
| |
| /* |
| * apply_scanjoin_target_to_paths() uses create_projection_path() |
| * to adjust each of its input paths if needed, whereas |
| * create_ordered_paths() uses apply_projection_to_path() to do |
| * that. So the former might have put a ProjectionPath on top of |
| * the ForeignPath; look through ProjectionPath and see if the |
| * path underneath it is ForeignPath. |
| */ |
| if (IsA(path, ForeignPath) || |
| (IsA(path, ProjectionPath) && |
| IsA(((ProjectionPath *) path)->subpath, ForeignPath))) |
| { |
| /* |
| * Create foreign final path; this gets rid of a |
| * no-longer-needed outer plan (if any), which makes the |
| * EXPLAIN output look cleaner |
| */ |
| final_path = create_foreign_upper_path(root, |
| path->parent, |
| path->pathtarget, |
| path->rows, |
| path->startup_cost, |
| path->total_cost, |
| path->pathkeys, |
| NULL, /* no extra plan */ |
| NULL); /* no fdw_private */ |
| |
| /* and add it to the final_rel */ |
| add_path(final_rel, (Path *) final_path, root); |
| |
| /* Safe to push down */ |
| fpinfo->pushdown_safe = true; |
| |
| return; |
| } |
| } |
| |
| /* |
| * If we get here it means no ForeignPaths; since we would already |
| * have considered pushing down all operations for the query to the |
| * remote server, give up on it. |
| */ |
| return; |
| } |
| |
| Assert(extra->limit_needed); |
| |
| /* |
| * If the input_rel is an ordered relation, replace the input_rel with its |
| * input relation |
| */ |
| if (input_rel->reloptkind == RELOPT_UPPER_REL && |
| ifpinfo->stage == UPPERREL_ORDERED) |
| { |
| input_rel = ifpinfo->outerrel; |
| ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private; |
| has_final_sort = true; |
| pathkeys = root->sort_pathkeys; |
| } |
| |
| /* The input_rel should be a base, join, or grouping relation */ |
| Assert(input_rel->reloptkind == RELOPT_BASEREL || |
| input_rel->reloptkind == RELOPT_JOINREL || |
| (input_rel->reloptkind == RELOPT_UPPER_REL && |
| ifpinfo->stage == UPPERREL_GROUP_AGG)); |
| |
| /* |
| * We try to create a path below by extending a simple foreign path for |
| * the underlying base, join, or grouping relation to perform the final |
| * sort (if has_final_sort) and the LIMIT restriction remotely, which is |
| * stored into the fdw_private list of the resulting path. (We |
| * re-estimate the costs of sorting the underlying relation, if |
| * has_final_sort.) |
| */ |
| |
| /* |
| * Assess if it is safe to push down the LIMIT and OFFSET to the remote |
| * server |
| */ |
| |
| /* |
| * If the underlying relation has any local conditions, the LIMIT/OFFSET |
| * cannot be pushed down. |
| */ |
| if (ifpinfo->local_conds) |
| return; |
| |
| /* |
| * If the query has FETCH FIRST .. WITH TIES, 1) it must have ORDER BY as |
| * well, which is used to determine which additional rows tie for the last |
| * place in the result set, and 2) ORDER BY must already have been |
| * determined to be safe to push down before we get here. So in that case |
| * the FETCH clause is safe to push down with ORDER BY if the remote |
| * server is v13 or later, but if not, the remote query will fail entirely |
| * for lack of support for it. Since we do not currently have a way to do |
| * a remote-version check (without accessing the remote server), disable |
| * pushing the FETCH clause for now. |
| */ |
| if (parse->limitOption == LIMIT_OPTION_WITH_TIES) |
| return; |
| |
| /* |
| * Also, the LIMIT/OFFSET cannot be pushed down, if their expressions are |
| * not safe to remote. |
| */ |
| if (!is_foreign_expr(root, input_rel, (Expr *) parse->limitOffset) || |
| !is_foreign_expr(root, input_rel, (Expr *) parse->limitCount)) |
| return; |
| |
| /* Safe to push down */ |
| fpinfo->pushdown_safe = true; |
| |
| /* Construct PgFdwPathExtraData */ |
| fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData)); |
| fpextra->target = root->upper_targets[UPPERREL_FINAL]; |
| fpextra->has_final_sort = has_final_sort; |
| fpextra->has_limit = extra->limit_needed; |
| fpextra->limit_tuples = extra->limit_tuples; |
| fpextra->count_est = extra->count_est; |
| fpextra->offset_est = extra->offset_est; |
| |
| /* If mpp_execute = 'all segments', we need to adjust origin count_est and offset_est. */ |
| if (final_rel->exec_location == FTEXECLOCATION_ALL_SEGMENTS && fpextra->offset_est > 0) |
| { |
| if (fpextra->count_est > 0) |
| { |
| /* |
| * When both OFFSET and LIMIT clause are specified, |
| * we need to fetch tuples from 0 to limitOffset + limitCount from remote servers. |
| */ |
| fpextra->count_est += fpextra->offset_est; |
| } |
| else |
| { |
| /* |
| * When only OFFSET clasue is specified, |
| * we need to fetch all tuples from remote servers. |
| */ |
| fpextra->count_est = 0; |
| } |
| |
| fpextra->offset_est = 0; |
| } |
| |
| /* |
| * Estimate the costs of performing the final sort and the LIMIT |
| * restriction remotely. If has_final_sort is false, we wouldn't need to |
| * execute EXPLAIN anymore if use_remote_estimate, since the costs can be |
| * roughly estimated using the costs we already have for the underlying |
| * relation, in the same way as when use_remote_estimate is false. Since |
| * it's pretty expensive to execute EXPLAIN, force use_remote_estimate to |
| * false in that case. |
| */ |
| if (!fpextra->has_final_sort) |
| { |
| save_use_remote_estimate = ifpinfo->use_remote_estimate; |
| ifpinfo->use_remote_estimate = false; |
| } |
| estimate_path_cost_size(root, input_rel, NIL, pathkeys, fpextra, |
| &rows, &width, &startup_cost, &total_cost); |
| if (!fpextra->has_final_sort) |
| ifpinfo->use_remote_estimate = save_use_remote_estimate; |
| |
| /* |
| * Build the fdw_private list that will be used by postgresGetForeignPlan. |
| * Items in the list must match order in enum FdwPathPrivateIndex. |
| */ |
| fdw_private = list_make2(makeBoolean(has_final_sort), |
| makeBoolean(extra->limit_needed)); |
| |
| /* |
| * Create foreign final path; this gets rid of a no-longer-needed outer |
| * plan (if any), which makes the EXPLAIN output look cleaner |
| */ |
| final_path = create_foreign_upper_path(root, |
| input_rel, |
| root->upper_targets[UPPERREL_FINAL], |
| rows, |
| startup_cost, |
| total_cost, |
| pathkeys, |
| NULL, /* no extra plan */ |
| fdw_private); |
| |
| /* and add it to the final_rel */ |
| add_path(final_rel, (Path *) final_path, root); |
| } |
| |
| /* |
| * postgresIsForeignPathAsyncCapable |
| * Check whether a given ForeignPath node is async-capable. |
| */ |
| static bool |
| postgresIsForeignPathAsyncCapable(ForeignPath *path) |
| { |
| RelOptInfo *rel = ((Path *) path)->parent; |
| PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private; |
| |
| return fpinfo->async_capable; |
| } |
| |
| /* |
| * postgresForeignAsyncRequest |
| * Asynchronously request next tuple from a foreign PostgreSQL table. |
| */ |
| static void |
| postgresForeignAsyncRequest(AsyncRequest *areq) |
| { |
| produce_tuple_asynchronously(areq, true); |
| } |
| |
| /* |
| * postgresForeignAsyncConfigureWait |
| * Configure a file descriptor event for which we wish to wait. |
| */ |
| static void |
| postgresForeignAsyncConfigureWait(AsyncRequest *areq) |
| { |
| ForeignScanState *node = (ForeignScanState *) areq->requestee; |
| PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; |
| AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq; |
| AppendState *requestor = (AppendState *) areq->requestor; |
| WaitEventSet *set = requestor->as_eventset; |
| |
| /* This should not be called unless callback_pending */ |
| Assert(areq->callback_pending); |
| |
| /* |
| * If process_pending_request() has been invoked on the given request |
| * before we get here, we might have some tuples already; in which case |
| * complete the request |
| */ |
| if (fsstate->next_tuple < fsstate->num_tuples) |
| { |
| complete_pending_request(areq); |
| if (areq->request_complete) |
| return; |
| Assert(areq->callback_pending); |
| } |
| |
| /* We must have run out of tuples */ |
| Assert(fsstate->next_tuple >= fsstate->num_tuples); |
| |
| /* The core code would have registered postmaster death event */ |
| Assert(GetNumRegisteredWaitEvents(set) >= 1); |
| |
| /* Begin an asynchronous data fetch if not already done */ |
| if (!pendingAreq) |
| fetch_more_data_begin(areq); |
| else if (pendingAreq->requestor != areq->requestor) |
| { |
| /* |
| * This is the case when the in-process request was made by another |
| <<<<<<< HEAD |
| * Append. Note that it might be useless to process the request, |
| * because the query might not need tuples from that Append anymore. |
| * If there are any child subplans of the same parent that are ready |
| * for new requests, skip the given request. Likewise, if there are |
| * any configured events other than the postmaster death event, skip |
| * it. Otherwise, process the in-process request, then begin a fetch |
| * to configure the event below, because we might otherwise end up |
| * with no configured events other than the postmaster death event. |
| ======= |
| * Append. Note that it might be useless to process the request made |
| * by that Append, because the query might not need tuples from that |
| * Append anymore; so we avoid processing it to begin a fetch for the |
| * given request if possible. If there are any child subplans of the |
| * same parent that are ready for new requests, skip the given |
| * request. Likewise, if there are any configured events other than |
| * the postmaster death event, skip it. Otherwise, process the |
| * in-process request, then begin a fetch to configure the event |
| * below, because we might otherwise end up with no configured events |
| * other than the postmaster death event. |
| >>>>>>> REL_16_9 |
| */ |
| if (!bms_is_empty(requestor->as_needrequest)) |
| return; |
| if (GetNumRegisteredWaitEvents(set) > 1) |
| return; |
| process_pending_request(pendingAreq); |
| fetch_more_data_begin(areq); |
| } |
| else if (pendingAreq->requestee != areq->requestee) |
| { |
| /* |
| * This is the case when the in-process request was made by the same |
| * parent but for a different child. Since we configure only the |
| * event for the request made for that child, skip the given request. |
| */ |
| return; |
| } |
| else |
| Assert(pendingAreq == areq); |
| |
| AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn), |
| NULL, areq); |
| } |
| |
| /* |
| * postgresForeignAsyncNotify |
| * Fetch some more tuples from a file descriptor that becomes ready, |
| * requesting next tuple. |
| */ |
| static void |
| postgresForeignAsyncNotify(AsyncRequest *areq) |
| { |
| ForeignScanState *node = (ForeignScanState *) areq->requestee; |
| PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; |
| |
| /* The core code would have initialized the callback_pending flag */ |
| Assert(!areq->callback_pending); |
| |
| /* |
| * If process_pending_request() has been invoked on the given request |
| * before we get here, we might have some tuples already; in which case |
| * produce the next tuple |
| */ |
| if (fsstate->next_tuple < fsstate->num_tuples) |
| { |
| produce_tuple_asynchronously(areq, true); |
| return; |
| } |
| |
| /* We must have run out of tuples */ |
| Assert(fsstate->next_tuple >= fsstate->num_tuples); |
| |
| /* The request should be currently in-process */ |
| Assert(fsstate->conn_state->pendingAreq == areq); |
| |
| /* On error, report the original query, not the FETCH. */ |
| if (!PQconsumeInput(fsstate->conn)) |
| pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query); |
| |
| fetch_more_data(node); |
| |
| produce_tuple_asynchronously(areq, true); |
| } |
| |
| /* |
| * Asynchronously produce next tuple from a foreign PostgreSQL table. |
| */ |
| static void |
| produce_tuple_asynchronously(AsyncRequest *areq, bool fetch) |
| { |
| ForeignScanState *node = (ForeignScanState *) areq->requestee; |
| PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; |
| AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq; |
| TupleTableSlot *result; |
| |
| /* This should not be called if the request is currently in-process */ |
| Assert(areq != pendingAreq); |
| |
| /* Fetch some more tuples, if we've run out */ |
| if (fsstate->next_tuple >= fsstate->num_tuples) |
| { |
| /* No point in another fetch if we already detected EOF, though */ |
| if (!fsstate->eof_reached) |
| { |
| /* Mark the request as pending for a callback */ |
| ExecAsyncRequestPending(areq); |
| /* Begin another fetch if requested and if no pending request */ |
| if (fetch && !pendingAreq) |
| fetch_more_data_begin(areq); |
| } |
| else |
| { |
| /* There's nothing more to do; just return a NULL pointer */ |
| result = NULL; |
| /* Mark the request as complete */ |
| ExecAsyncRequestDone(areq, result); |
| } |
| return; |
| } |
| |
| /* Get a tuple from the ForeignScan node */ |
| result = areq->requestee->ExecProcNodeReal(areq->requestee); |
| if (!TupIsNull(result)) |
| { |
| /* Mark the request as complete */ |
| ExecAsyncRequestDone(areq, result); |
| return; |
| } |
| |
| /* We must have run out of tuples */ |
| Assert(fsstate->next_tuple >= fsstate->num_tuples); |
| |
| /* Fetch some more tuples, if we've not detected EOF yet */ |
| if (!fsstate->eof_reached) |
| { |
| /* Mark the request as pending for a callback */ |
| ExecAsyncRequestPending(areq); |
| /* Begin another fetch if requested and if no pending request */ |
| if (fetch && !pendingAreq) |
| fetch_more_data_begin(areq); |
| } |
| else |
| { |
| /* There's nothing more to do; just return a NULL pointer */ |
| result = NULL; |
| /* Mark the request as complete */ |
| ExecAsyncRequestDone(areq, result); |
| } |
| } |
| |
| /* |
| * Begin an asynchronous data fetch. |
| * |
| * Note: this function assumes there is no currently-in-progress asynchronous |
| * data fetch. |
| * |
| * Note: fetch_more_data must be called to fetch the result. |
| */ |
| static void |
| fetch_more_data_begin(AsyncRequest *areq) |
| { |
| ForeignScanState *node = (ForeignScanState *) areq->requestee; |
| PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; |
| char sql[64]; |
| |
| Assert(!fsstate->conn_state->pendingAreq); |
| |
| /* Create the cursor synchronously. */ |
| if (!fsstate->cursor_exists) |
| create_cursor(node); |
| |
| /* We will send this query, but not wait for the response. */ |
| snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", |
| fsstate->fetch_size, fsstate->cursor_number); |
| |
| if (!PQsendQuery(fsstate->conn, sql)) |
| pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query); |
| |
| /* Remember that the request is in process */ |
| fsstate->conn_state->pendingAreq = areq; |
| } |
| |
| /* |
| * Process a pending asynchronous request. |
| */ |
| void |
| process_pending_request(AsyncRequest *areq) |
| { |
| ForeignScanState *node = (ForeignScanState *) areq->requestee; |
| <<<<<<< HEAD |
| PgFdwScanState *fsstate PG_USED_FOR_ASSERTS_ONLY = (PgFdwScanState *) node->fdw_state; |
| ======= |
| PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; |
| >>>>>>> REL_16_9 |
| |
| /* The request would have been pending for a callback */ |
| Assert(areq->callback_pending); |
| |
| /* The request should be currently in-process */ |
| Assert(fsstate->conn_state->pendingAreq == areq); |
| |
| fetch_more_data(node); |
| |
| /* |
| * If we didn't get any tuples, must be end of data; complete the request |
| * now. Otherwise, we postpone completing the request until we are called |
| * from postgresForeignAsyncConfigureWait()/postgresForeignAsyncNotify(). |
| */ |
| if (fsstate->next_tuple >= fsstate->num_tuples) |
| { |
| /* Unlike AsyncNotify, we unset callback_pending ourselves */ |
| areq->callback_pending = false; |
| /* Mark the request as complete */ |
| ExecAsyncRequestDone(areq, NULL); |
| /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */ |
| ExecAsyncResponse(areq); |
| } |
| } |
| |
| /* |
| * Complete a pending asynchronous request. |
| */ |
| static void |
| complete_pending_request(AsyncRequest *areq) |
| { |
| /* The request would have been pending for a callback */ |
| Assert(areq->callback_pending); |
| |
| /* Unlike AsyncNotify, we unset callback_pending ourselves */ |
| areq->callback_pending = false; |
| |
| /* We begin a fetch afterwards if necessary; don't fetch */ |
| produce_tuple_asynchronously(areq, false); |
| |
| /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */ |
| ExecAsyncResponse(areq); |
| |
| /* Also, we do instrumentation ourselves, if required */ |
| if (areq->requestee->instrument) |
| InstrUpdateTupleCount(areq->requestee->instrument, |
| TupIsNull(areq->result) ? 0.0 : 1.0); |
| } |
| |
| /* |
| * Create a tuple from the specified row of the PGresult. |
| * |
| * rel is the local representation of the foreign table, attinmeta is |
| * conversion data for the rel's tupdesc, and retrieved_attrs is an |
| * integer list of the table column numbers present in the PGresult. |
| * fsstate is the ForeignScan plan node's execution state. |
| * temp_context is a working context that can be reset after each tuple. |
| * |
| * Note: either rel or fsstate, but not both, can be NULL. rel is NULL |
| * if we're processing a remote join, while fsstate is NULL in a non-query |
| * context such as ANALYZE, or if we're processing a non-scan query node. |
| */ |
| static HeapTuple |
| make_tuple_from_result_row(PGresult *res, |
| int row, |
| Relation rel, |
| AttInMetadata *attinmeta, |
| List *retrieved_attrs, |
| ForeignScanState *fsstate, |
| MemoryContext temp_context) |
| { |
| HeapTuple tuple; |
| TupleDesc tupdesc; |
| Datum *values; |
| bool *nulls; |
| ItemPointer ctid = NULL; |
| ConversionLocation errpos; |
| ErrorContextCallback errcallback; |
| MemoryContext oldcontext; |
| ListCell *lc; |
| int j; |
| |
| Assert(row < PQntuples(res)); |
| |
| /* |
| * Do the following work in a temp context that we reset after each tuple. |
| * This cleans up not only the data we have direct access to, but any |
| * cruft the I/O functions might leak. |
| */ |
| oldcontext = MemoryContextSwitchTo(temp_context); |
| |
| /* |
| * Get the tuple descriptor for the row. Use the rel's tupdesc if rel is |
| * provided, otherwise look to the scan node's ScanTupleSlot. |
| */ |
| if (rel) |
| tupdesc = RelationGetDescr(rel); |
| else |
| { |
| Assert(fsstate); |
| tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor; |
| } |
| |
| values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum)); |
| nulls = (bool *) palloc(tupdesc->natts * sizeof(bool)); |
| /* Initialize to nulls for any columns not present in result */ |
| memset(nulls, true, tupdesc->natts * sizeof(bool)); |
| |
| /* |
| * Set up and install callback to report where conversion error occurs. |
| */ |
| errpos.cur_attno = 0; |
| errpos.rel = rel; |
| errpos.fsstate = fsstate; |
| errcallback.callback = conversion_error_callback; |
| errcallback.arg = (void *) &errpos; |
| errcallback.previous = error_context_stack; |
| error_context_stack = &errcallback; |
| |
| /* |
| * i indexes columns in the relation, j indexes columns in the PGresult. |
| */ |
| j = 0; |
| foreach(lc, retrieved_attrs) |
| { |
| int i = lfirst_int(lc); |
| char *valstr; |
| |
| /* fetch next column's textual value */ |
| if (PQgetisnull(res, row, j)) |
| valstr = NULL; |
| else |
| valstr = PQgetvalue(res, row, j); |
| |
| /* |
| * convert value to internal representation |
| * |
| * Note: we ignore system columns other than ctid and oid in result |
| */ |
| errpos.cur_attno = i; |
| if (i > 0) |
| { |
| /* ordinary column */ |
| Assert(i <= tupdesc->natts); |
| nulls[i - 1] = (valstr == NULL); |
| /* Apply the input function even to nulls, to support domains */ |
| values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1], |
| valstr, |
| attinmeta->attioparams[i - 1], |
| attinmeta->atttypmods[i - 1]); |
| } |
| else if (i == SelfItemPointerAttributeNumber) |
| { |
| /* ctid */ |
| if (valstr != NULL) |
| { |
| Datum datum; |
| |
| datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr)); |
| ctid = (ItemPointer) DatumGetPointer(datum); |
| } |
| } |
| errpos.cur_attno = 0; |
| |
| j++; |
| } |
| |
| /* Uninstall error context callback. */ |
| error_context_stack = errcallback.previous; |
| |
| /* |
| * Check we got the expected number of columns. Note: j == 0 and |
| * PQnfields == 1 is expected, since deparse emits a NULL if no columns. |
| */ |
| if (j > 0 && j != PQnfields(res)) |
| elog(ERROR, "remote query result does not match the foreign table"); |
| |
| /* |
| * Build the result tuple in caller's memory context. |
| */ |
| MemoryContextSwitchTo(oldcontext); |
| |
| tuple = heap_form_tuple(tupdesc, values, nulls); |
| |
| /* |
| * If we have a CTID to return, install it in both t_self and t_ctid. |
| * t_self is the normal place, but if the tuple is converted to a |
| * composite Datum, t_self will be lost; setting t_ctid allows CTID to be |
| * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code). |
| */ |
| if (ctid) |
| tuple->t_self = tuple->t_data->t_ctid = *ctid; |
| |
| /* |
| * Stomp on the xmin, xmax, and cmin fields from the tuple created by |
| * heap_form_tuple. heap_form_tuple actually creates the tuple with |
| * DatumTupleFields, not HeapTupleFields, but the executor expects |
| * HeapTupleFields and will happily extract system columns on that |
| * assumption. If we don't do this then, for example, the tuple length |
| * ends up in the xmin field, which isn't what we want. |
| */ |
| HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId); |
| HeapTupleHeaderSetXmin(tuple->t_data, InvalidTransactionId); |
| HeapTupleHeaderSetCmin(tuple->t_data, InvalidTransactionId); |
| |
| /* Clean up */ |
| MemoryContextReset(temp_context); |
| |
| return tuple; |
| } |
| |
| /* |
| * Callback function which is called when error occurs during column value |
| * conversion. Print names of column and relation. |
| * |
| * Note that this function mustn't do any catalog lookups, since we are in |
| * an already-failed transaction. Fortunately, we can get the needed info |
| * from the relation or the query's rangetable instead. |
| */ |
| static void |
| conversion_error_callback(void *arg) |
| { |
| ConversionLocation *errpos = (ConversionLocation *) arg; |
| Relation rel = errpos->rel; |
| ForeignScanState *fsstate = errpos->fsstate; |
| const char *attname = NULL; |
| const char *relname = NULL; |
| bool is_wholerow = false; |
| |
| /* |
| * If we're in a scan node, always use aliases from the rangetable, for |
| * consistency between the simple-relation and remote-join cases. Look at |
| * the relation's tupdesc only if we're not in a scan node. |
| */ |
| if (fsstate) |
| { |
| /* ForeignScan case */ |
| ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan); |
| int varno = 0; |
| AttrNumber colno = 0; |
| |
| if (fsplan->scan.scanrelid > 0) |
| { |
| /* error occurred in a scan against a foreign table */ |
| varno = fsplan->scan.scanrelid; |
| colno = errpos->cur_attno; |
| } |
| else |
| { |
| /* error occurred in a scan against a foreign join */ |
| TargetEntry *tle; |
| |
| tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist, |
| errpos->cur_attno - 1); |
| |
| /* |
| * Target list can have Vars and expressions. For Vars, we can |
| * get some information, however for expressions we can't. Thus |
| * for expressions, just show generic context message. |
| */ |
| if (IsA(tle->expr, Var)) |
| { |
| Var *var = (Var *) tle->expr; |
| |
| varno = var->varno; |
| colno = var->varattno; |
| } |
| } |
| |
| if (varno > 0) |
| { |
| EState *estate = fsstate->ss.ps.state; |
| RangeTblEntry *rte = exec_rt_fetch(varno, estate); |
| |
| relname = rte->eref->aliasname; |
| |
| if (colno == 0) |
| is_wholerow = true; |
| else if (colno > 0 && colno <= list_length(rte->eref->colnames)) |
| attname = strVal(list_nth(rte->eref->colnames, colno - 1)); |
| else if (colno == SelfItemPointerAttributeNumber) |
| attname = "ctid"; |
| } |
| } |
| else if (rel) |
| { |
| /* Non-ForeignScan case (we should always have a rel here) */ |
| TupleDesc tupdesc = RelationGetDescr(rel); |
| |
| relname = RelationGetRelationName(rel); |
| if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts) |
| { |
| Form_pg_attribute attr = TupleDescAttr(tupdesc, |
| errpos->cur_attno - 1); |
| |
| attname = NameStr(attr->attname); |
| } |
| else if (errpos->cur_attno == SelfItemPointerAttributeNumber) |
| attname = "ctid"; |
| } |
| |
| if (relname && is_wholerow) |
| errcontext("whole-row reference to foreign table \"%s\"", relname); |
| else if (relname && attname) |
| errcontext("column \"%s\" of foreign table \"%s\"", attname, relname); |
| else |
| errcontext("processing expression at position %d in select list", |
| errpos->cur_attno); |
| } |
| |
| /* |
| * Given an EquivalenceClass and a foreign relation, find an EC member |
| * that can be used to sort the relation remotely according to a pathkey |
| * using this EC. |
| * |
| * If there is more than one suitable candidate, return an arbitrary |
| * one of them. If there is none, return NULL. |
| * |
| * This checks that the EC member expression uses only Vars from the given |
| * rel and is shippable. Caller must separately verify that the pathkey's |
| * ordering operator is shippable. |
| */ |
| EquivalenceMember * |
| find_em_for_rel(PlannerInfo *root, EquivalenceClass *ec, RelOptInfo *rel) |
| { |
| ListCell *lc; |
| |
| foreach(lc, ec->ec_members) |
| { |
| EquivalenceMember *em = (EquivalenceMember *) lfirst(lc); |
| |
| /* |
| * Note we require !bms_is_empty, else we'd accept constant |
| * expressions which are not suitable for the purpose. |
| */ |
| if (bms_is_subset(em->em_relids, rel->relids) && |
| !bms_is_empty(em->em_relids) && |
| is_foreign_expr(root, rel, em->em_expr)) |
| return em; |
| } |
| |
| return NULL; |
| } |
| |
| /* |
| * Find an EquivalenceClass member that is to be computed as a sort column |
| * in the given rel's reltarget, and is shippable. |
| * |
| * If there is more than one suitable candidate, return an arbitrary |
| * one of them. If there is none, return NULL. |
| * |
| * This checks that the EC member expression uses only Vars from the given |
| * rel and is shippable. Caller must separately verify that the pathkey's |
| * ordering operator is shippable. |
| */ |
| EquivalenceMember * |
| find_em_for_rel_target(PlannerInfo *root, EquivalenceClass *ec, |
| RelOptInfo *rel) |
| { |
| PathTarget *target = rel->reltarget; |
| ListCell *lc1; |
| int i; |
| |
| i = 0; |
| foreach(lc1, target->exprs) |
| { |
| Expr *expr = (Expr *) lfirst(lc1); |
| Index sgref = get_pathtarget_sortgroupref(target, i); |
| ListCell *lc2; |
| |
| /* Ignore non-sort expressions */ |
| if (sgref == 0 || |
| get_sortgroupref_clause_noerr(sgref, |
| root->parse->sortClause) == NULL) |
| { |
| i++; |
| continue; |
| } |
| |
| /* We ignore binary-compatible relabeling on both ends */ |
| while (expr && IsA(expr, RelabelType)) |
| expr = ((RelabelType *) expr)->arg; |
| |
| /* Locate an EquivalenceClass member matching this expr, if any */ |
| foreach(lc2, ec->ec_members) |
| { |
| EquivalenceMember *em = (EquivalenceMember *) lfirst(lc2); |
| Expr *em_expr; |
| |
| /* Don't match constants */ |
| if (em->em_is_const) |
| continue; |
| |
| /* Ignore child members */ |
| if (em->em_is_child) |
| continue; |
| |
| /* Match if same expression (after stripping relabel) */ |
| em_expr = em->em_expr; |
| while (em_expr && IsA(em_expr, RelabelType)) |
| em_expr = ((RelabelType *) em_expr)->arg; |
| |
| if (!equal(em_expr, expr)) |
| continue; |
| |
| /* Check that expression (including relabels!) is shippable */ |
| if (is_foreign_expr(root, rel, em->em_expr)) |
| return em; |
| } |
| |
| i++; |
| } |
| |
| return NULL; |
| } |
| |
| /* |
| * Determine batch size for a given foreign table. The option specified for |
| * a table has precedence. |
| */ |
| static int |
| get_batch_size_option(Relation rel) |
| { |
| Oid foreigntableid = RelationGetRelid(rel); |
| ForeignTable *table; |
| ForeignServer *server; |
| List *options; |
| ListCell *lc; |
| |
| /* we use 1 by default, which means "no batching" */ |
| int batch_size = 1; |
| |
| /* |
| * Load options for table and server. We append server options after table |
| * options, because table options take precedence. |
| */ |
| table = GetForeignTable(foreigntableid); |
| server = GetForeignServer(table->serverid); |
| |
| options = NIL; |
| options = list_concat(options, table->options); |
| options = list_concat(options, server->options); |
| |
| /* See if either table or server specifies batch_size. */ |
| foreach(lc, options) |
| { |
| DefElem *def = (DefElem *) lfirst(lc); |
| |
| if (strcmp(def->defname, "batch_size") == 0) |
| { |
| (void) parse_int(defGetString(def), &batch_size, 0, NULL); |
| break; |
| } |
| } |
| |
| return batch_size; |
| } |
| |
| static int |
| greenplumCheckIsCloudberry(UserMapping *user) |
| { |
| PGconn *conn; |
| PGresult *res; |
| int ret; |
| |
| char *query = "SELECT version()"; |
| |
| conn = GetConnection(user, false, NULL); |
| |
| res = pgfdw_exec_query(conn, query, NULL); |
| if (PQresultStatus(res) != PGRES_TUPLES_OK) |
| pgfdw_report_error(ERROR, res, conn, true, query); |
| |
| if (PQntuples(res) == 0) |
| pgfdw_report_error(ERROR, res, conn, true, query); |
| |
| ret = strstr(PQgetvalue(res, 0, 0), "Apache Cloudberry") ? 1 : 0; |
| |
| PQclear(res); |
| ReleaseConnection(conn); |
| |
| return ret; |
| } |