blob: be7825bfbbf49d39873fb498c6428bb8b5463923 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "postgres.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "executor/tuptable.h"
#include "nodes/execnodes.h"
#include "nodes/extensible.h"
#include "nodes/nodes.h"
#include "nodes/plannodes.h"
#include "rewrite/rewriteHandler.h"
#include "utils/rel.h"
#include "catalog/ag_label.h"
#include "executor/cypher_executor.h"
#include "executor/cypher_utils.h"
#include "nodes/cypher_nodes.h"
#include "utils/agtype.h"
#include "utils/graphid.h"
static void begin_cypher_create(CustomScanState *node, EState *estate,
int eflags);
static TupleTableSlot *exec_cypher_create(CustomScanState *node);
static void end_cypher_create(CustomScanState *node);
static void rescan_cypher_create(CustomScanState *node);
static void create_edge(cypher_create_custom_scan_state *css,
cypher_target_node *node, Datum prev_vertex_id,
ListCell *next, List *list);
static Datum create_vertex(cypher_create_custom_scan_state *css,
cypher_target_node *node, ListCell *next,
List *list);
static void process_pattern(cypher_create_custom_scan_state *css);
const CustomExecMethods cypher_create_exec_methods = {CREATE_SCAN_STATE_NAME,
begin_cypher_create,
exec_cypher_create,
end_cypher_create,
rescan_cypher_create,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL};
static void begin_cypher_create(CustomScanState *node, EState *estate,
int eflags)
{
cypher_create_custom_scan_state *css =
(cypher_create_custom_scan_state *)node;
ListCell *lc;
Plan *subplan;
Assert(list_length(css->cs->custom_plans) == 1);
subplan = linitial(css->cs->custom_plans);
node->ss.ps.lefttree = ExecInitNode(subplan, estate, eflags);
ExecAssignExprContext(estate, &node->ss.ps);
ExecInitScanTupleSlot(estate, &node->ss,
ExecGetResultType(node->ss.ps.lefttree),
&TTSOpsHeapTuple);
if (!CYPHER_CLAUSE_IS_TERMINAL(css->flags))
{
TupleDesc tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
ExecAssignProjectionInfo(&node->ss.ps, tupdesc);
}
foreach (lc, css->pattern)
{
cypher_create_path *path = lfirst(lc);
ListCell *lc2;
foreach (lc2, path->target_nodes)
{
cypher_target_node *cypher_node =
(cypher_target_node *)lfirst(lc2);
Relation rel;
if (!CYPHER_TARGET_NODE_INSERT_ENTITY(cypher_node->flags))
continue;
// Open relation and acquire a row exclusive lock.
rel = table_open(cypher_node->relid, RowExclusiveLock);
// Initialize resultRelInfo for the vertex
cypher_node->resultRelInfo = makeNode(ResultRelInfo);
InitResultRelInfo(cypher_node->resultRelInfo, rel,
list_length(estate->es_range_table), NULL,
estate->es_instrument);
// Open all indexes for the relation
ExecOpenIndices(cypher_node->resultRelInfo, false);
// Setup the relation's tuple slot
cypher_node->elemTupleSlot = table_slot_create(
rel, &estate->es_tupleTable);
if (cypher_node->id_expr != NULL)
{
cypher_node->id_expr_state =
ExecInitExpr(cypher_node->id_expr, (PlanState *)node);
}
if (cypher_node->prop_expr != NULL)
{
cypher_node->prop_expr_state = ExecInitExpr(cypher_node->prop_expr,
(PlanState *)node);
}
}
}
/*
* Postgres does not assign the es_output_cid in queries that do
* not write to disk, ie: SELECT commands. We need the command id
* for our clauses, and we may need to initialize it. We cannot use
* GetCurrentCommandId because there may be other cypher clauses
* that have modified the command id.
*/
if (estate->es_output_cid == 0)
{
estate->es_output_cid = estate->es_snapshot->curcid;
}
Increment_Estate_CommandId(estate);
}
/*
* CREATE the vertices and edges for a CREATE clause pattern.
*/
static void process_pattern(cypher_create_custom_scan_state *css)
{
ListCell *lc2;
foreach (lc2, css->pattern)
{
cypher_create_path *path = lfirst(lc2);
List *list = path->target_nodes;
ListCell *lc = list_head(list);
/*
* Create the first vertex. The create_vertex function will
* create the rest of the path, if necessary.
*/
create_vertex(css, lfirst(lc), lnext(list, lc), list);
/*
* If this path is a variable, take the list that was accumulated
* in the vertex/edge creation, create a path datum, and add to the
* scantuple slot.
*/
if (path->path_attr_num != InvalidAttrNumber)
{
TupleTableSlot *scantuple;
PlanState *ps;
Datum result;
ps = css->css.ss.ps.lefttree;
scantuple = ps->ps_ExprContext->ecxt_scantuple;
result = make_path(css->path_values);
scantuple->tts_values[path->path_attr_num - 1] = result;
scantuple->tts_isnull[path->path_attr_num - 1] = false;
}
css->path_values = NIL;
}
}
static TupleTableSlot *exec_cypher_create(CustomScanState *node)
{
cypher_create_custom_scan_state *css =
(cypher_create_custom_scan_state *)node;
EState *estate = css->css.ss.ps.state;
ExprContext *econtext = css->css.ss.ps.ps_ExprContext;
TupleTableSlot *slot;
bool terminal = CYPHER_CLAUSE_IS_TERMINAL(css->flags);
bool used = false;
/*
* If the CREATE clause was the final cypher clause written then we aren't
* returning anything from this result node. So the exec_cypher_create
* function will only be called once. Therefore we will process all tuples
* from the subtree at once.
*/
do
{
/*Process the subtree first */
Decrement_Estate_CommandId(estate)
slot = ExecProcNode(node->ss.ps.lefttree);
Increment_Estate_CommandId(estate)
/* break when there are no tuples */
if (TupIsNull(slot))
{
break;
}
/* setup the scantuple that the process_pattern needs */
econtext->ecxt_scantuple =
node->ss.ps.lefttree->ps_ProjInfo->pi_exprContext->ecxt_scantuple;
process_pattern(css);
/*
* This may not be necessary. If we have an empty pattern, nothing was
* inserted and the current command Id was not used. So, only flag it
* if there is a non empty pattern.
*/
if (list_length(css->pattern) > 0)
{
/* the current command Id has been used */
used = true;
}
} while (terminal);
/*
* If the current command Id wasn't used, nothing was inserted and we're
* done.
*/
if (!used)
{
return NULL;
}
/* update the current command Id */
CommandCounterIncrement();
/* if this was a terminal CREATE just return NULL */
if (terminal)
{
return NULL;
}
econtext->ecxt_scantuple = ExecProject(node->ss.ps.lefttree->ps_ProjInfo);
return ExecProject(node->ss.ps.ps_ProjInfo);
}
static void end_cypher_create(CustomScanState *node)
{
cypher_create_custom_scan_state *css =
(cypher_create_custom_scan_state *)node;
ListCell *lc;
// increment the command counter
CommandCounterIncrement();
ExecEndNode(node->ss.ps.lefttree);
foreach (lc, css->pattern)
{
cypher_create_path *path = lfirst(lc);
ListCell *lc2;
foreach (lc2, path->target_nodes)
{
cypher_target_node *cypher_node =
(cypher_target_node *)lfirst(lc2);
if (!CYPHER_TARGET_NODE_INSERT_ENTITY(cypher_node->flags))
{
continue;
}
// close all indices for the node
ExecCloseIndices(cypher_node->resultRelInfo);
// close the relation itself
table_close(cypher_node->resultRelInfo->ri_RelationDesc,
RowExclusiveLock);
}
}
}
static void rescan_cypher_create(CustomScanState *node)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cypher create clause cannot be rescanned"),
errhint("its unsafe to use joins in a query with a Cypher CREATE clause")));
}
Node *create_cypher_create_plan_state(CustomScan *cscan)
{
cypher_create_custom_scan_state *cypher_css =
palloc0(sizeof(cypher_create_custom_scan_state));
cypher_create_target_nodes *target_nodes;
char *serialized_data;
Const *c;
cypher_css->cs = cscan;
// get the serialized data structure from the Const and deserialize it.
c = linitial(cscan->custom_private);
serialized_data = (char *)c->constvalue;
target_nodes = stringToNode(serialized_data);
Assert(is_ag_node(target_nodes, cypher_create_target_nodes));
cypher_css->path_values = NIL;
cypher_css->pattern = target_nodes->paths;
cypher_css->flags = target_nodes->flags;
cypher_css->graph_oid = target_nodes->graph_oid;
cypher_css->css.ss.ps.type = T_CustomScanState;
cypher_css->css.methods = &cypher_create_exec_methods;
return (Node *)cypher_css;
}
/*
* Create the edge entity.
*/
static void create_edge(cypher_create_custom_scan_state *css,
cypher_target_node *node, Datum prev_vertex_id,
ListCell *next, List *list)
{
bool isNull;
EState *estate = css->css.ss.ps.state;
ExprContext *econtext = css->css.ss.ps.ps_ExprContext;
ResultRelInfo *resultRelInfo = node->resultRelInfo;
ResultRelInfo **old_estate_es_result_relations = NULL;
TupleTableSlot *elemTupleSlot = node->elemTupleSlot;
TupleTableSlot *scanTupleSlot = econtext->ecxt_scantuple;
Datum id;
Datum start_id, end_id, next_vertex_id;
List *prev_path = css->path_values;
Assert(node->type == LABEL_KIND_EDGE);
Assert(lfirst(next) != NULL);
/*
* Create the next vertex before creating the edge. We need the
* next vertex's id.
*/
css->path_values = NIL;
next_vertex_id = create_vertex(css, lfirst(next), lnext(list, next), list);
/*
* Set the start and end vertex ids
*/
if (node->dir == CYPHER_REL_DIR_RIGHT)
{
// create pattern (prev_vertex)-[edge]->(next_vertex)
start_id = prev_vertex_id;
end_id = next_vertex_id;
}
else if (node->dir == CYPHER_REL_DIR_LEFT)
{
// create pattern (prev_vertex)<-[edge]-(next_vertex)
start_id = next_vertex_id;
end_id = prev_vertex_id;
}
else
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("edge direction must be specified in a CREATE clause")));
}
/*
* Set estate's result relation to the vertex's result
* relation.
*
* Note: This obliterates what was their previously
*/
/* save the old result relation info */
old_estate_es_result_relations = estate->es_result_relations;
estate->es_result_relations = &resultRelInfo;
ExecClearTuple(elemTupleSlot);
// Graph Id for the edge
id = ExecEvalExpr(node->id_expr_state, econtext, &isNull);
elemTupleSlot->tts_values[edge_tuple_id] = id;
elemTupleSlot->tts_isnull[edge_tuple_id] = isNull;
// Graph id for the starting vertex
elemTupleSlot->tts_values[edge_tuple_start_id] = start_id;
elemTupleSlot->tts_isnull[edge_tuple_start_id] = false;
// Graph id for the ending vertex
elemTupleSlot->tts_values[edge_tuple_end_id] = end_id;
elemTupleSlot->tts_isnull[edge_tuple_end_id] = false;
// Edge's properties map
elemTupleSlot->tts_values[edge_tuple_properties] =
scanTupleSlot->tts_values[node->prop_attr_num];
elemTupleSlot->tts_isnull[edge_tuple_properties] =
scanTupleSlot->tts_isnull[node->prop_attr_num];
// Insert the new edge
insert_entity_tuple(resultRelInfo, elemTupleSlot, estate);
/* restore the old result relation info */
estate->es_result_relations = old_estate_es_result_relations;
/*
* When the edge is used by clauses higher in the execution tree
* we need to create an edge datum. When the edge is a variable,
* add to the scantuple slot. When the edge is part of a path
* variable, add to the list.
*/
if (CYPHER_TARGET_NODE_OUTPUT(node->flags))
{
PlanState *ps = css->css.ss.ps.lefttree;
TupleTableSlot *scantuple = ps->ps_ExprContext->ecxt_scantuple;
Datum result;
result = make_edge(
id, start_id, end_id, CStringGetDatum(node->label_name),
PointerGetDatum(scanTupleSlot->tts_values[node->prop_attr_num]));
if (CYPHER_TARGET_NODE_IN_PATH(node->flags))
{
prev_path = lappend(prev_path, DatumGetPointer(result));
css->path_values = list_concat(prev_path, css->path_values);
}
if (CYPHER_TARGET_NODE_IS_VARIABLE(node->flags))
{
scantuple->tts_values[node->tuple_position - 1] = result;
scantuple->tts_isnull[node->tuple_position - 1] = false;
}
}
}
/*
* Creates the vertex entity, returns the vertex's id in case the caller is
* the create_edge function.
*/
static Datum create_vertex(cypher_create_custom_scan_state *css,
cypher_target_node *node, ListCell *next, List *list)
{
bool isNull;
Datum id;
EState *estate = css->css.ss.ps.state;
ExprContext *econtext = css->css.ss.ps.ps_ExprContext;
ResultRelInfo *resultRelInfo = node->resultRelInfo;
TupleTableSlot *elemTupleSlot = node->elemTupleSlot;
TupleTableSlot *scanTupleSlot = econtext->ecxt_scantuple;
Assert(node->type == LABEL_KIND_VERTEX);
/*
* Vertices in a path might already exists. If they do get the id
* to pass to the edges before and after it. Otherwise, insert the
* new vertex into it's table and then pass the id along.
*/
if (CYPHER_TARGET_NODE_INSERT_ENTITY(node->flags))
{
ResultRelInfo **old_estate_es_result_relations = NULL;
/*
* Set estate's result relation to the vertex's result
* relation.
*
* Note: This obliterates what was their previously
*/
/* save the old result relation info */
old_estate_es_result_relations = estate->es_result_relations;
estate->es_result_relations = &resultRelInfo;
ExecClearTuple(elemTupleSlot);
// get the next graphid for this vertex.
id = ExecEvalExpr(node->id_expr_state, econtext, &isNull);
elemTupleSlot->tts_values[vertex_tuple_id] = id;
elemTupleSlot->tts_isnull[vertex_tuple_id] = isNull;
// get the properties for this vertex
elemTupleSlot->tts_values[vertex_tuple_properties] =
scanTupleSlot->tts_values[node->prop_attr_num];
elemTupleSlot->tts_isnull[vertex_tuple_properties] =
scanTupleSlot->tts_isnull[node->prop_attr_num];
// Insert the new vertex
insert_entity_tuple(resultRelInfo, elemTupleSlot, estate);
/* restore the old result relation info */
estate->es_result_relations = old_estate_es_result_relations;
/*
* When the vertex is used by clauses higher in the execution tree
* we need to create a vertex datum. When the vertex is a variable,
* add to the scantuple slot. When the vertex is part of a path
* variable, add to the list.
*/
if (CYPHER_TARGET_NODE_OUTPUT(node->flags))
{
TupleTableSlot *scantuple;
PlanState *ps;
Datum result;
ps = css->css.ss.ps.lefttree;
scantuple = ps->ps_ExprContext->ecxt_scantuple;
// make the vertex agtype
result = make_vertex(id, CStringGetDatum(node->label_name),
PointerGetDatum(scanTupleSlot->tts_values[node->prop_attr_num]));
// append to the path list
if (CYPHER_TARGET_NODE_IN_PATH(node->flags))
{
css->path_values = lappend(css->path_values,
DatumGetPointer(result));
}
/*
* Put the vertex in the correct spot in the scantuple, so parent
* execution nodes can reference the newly created variable.
*/
if (CYPHER_TARGET_NODE_IS_VARIABLE(node->flags))
{
scantuple->tts_values[node->tuple_position - 1] = result;
scantuple->tts_isnull[node->tuple_position - 1] = false;
}
}
}
else
{
agtype *a;
agtype_value *v;
agtype_value *id_value;
TupleTableSlot *scantuple;
PlanState *ps;
ps = css->css.ss.ps.lefttree;
scantuple = ps->ps_ExprContext->ecxt_scantuple;
// get the vertex agtype in the scanTupleSlot
a = DATUM_GET_AGTYPE_P(scantuple->tts_values[node->tuple_position - 1]);
// Convert to an agtype value
v = get_ith_agtype_value_from_container(&a->root, 0);
if (v->type != AGTV_VERTEX)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("agtype must resolve to a vertex")));
}
// extract the id agtype field
id_value = GET_AGTYPE_VALUE_OBJECT_VALUE(v, "id");
// extract the graphid and cast to a Datum
id = GRAPHID_GET_DATUM(id_value->val.int_value);
/*
* Its possible the variable has already been deleted. There are two
* ways this can happen. One is the query explicitly deleted the
* variable, the is_deleted flag will catch that. However, it is
* possible the user deleted the vertex using another variable name. We
* need to scan the table to find the vertex's current status relative
* to this CREATE clause. If the variable was initially created in this
* clause, we can skip this check, because the transaction system
* guarantees that nothing can happen to that tuple, as far as we are
* concerned with at this time.
*/
if (!SAFE_TO_SKIP_EXISTENCE_CHECK(node->flags))
{
if (!entity_exists(estate, css->graph_oid, DATUM_GET_GRAPHID(id)))
{
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("vertex assigned to variable %s was deleted",
node->variable_name)));
}
}
if (CYPHER_TARGET_NODE_IN_PATH(node->flags))
{
Datum vertex = scanTupleSlot->tts_values[node->tuple_position - 1];
css->path_values = lappend(css->path_values,
DatumGetPointer(vertex));
}
}
// If the path continues, create the next edge, passing the vertex's id.
if (next != NULL)
{
create_edge(css, lfirst(next), id, lnext(list, next), list);
}
return id;
}