| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "postgres.h" |
| |
| #include "access/heapam.h" |
| #include "access/htup_details.h" |
| #include "access/multixact.h" |
| #include "access/table.h" |
| #include "access/xact.h" |
| #include "executor/tuptable.h" |
| #include "nodes/execnodes.h" |
| #include "nodes/extensible.h" |
| #include "nodes/nodes.h" |
| #include "nodes/plannodes.h" |
| #include "parser/parsetree.h" |
| #include "storage/bufmgr.h" |
| #include "utils/rel.h" |
| #include "common/hashfn.h" |
| |
| #include "catalog/ag_label.h" |
| #include "executor/cypher_executor.h" |
| #include "executor/cypher_utils.h" |
| #include "nodes/cypher_nodes.h" |
| #include "utils/agtype.h" |
| #include "utils/graphid.h" |
| |
| static void begin_cypher_delete(CustomScanState *node, EState *estate, |
| int eflags); |
| static TupleTableSlot *exec_cypher_delete(CustomScanState *node); |
| static void end_cypher_delete(CustomScanState *node); |
| static void rescan_cypher_delete(CustomScanState *node); |
| |
| static void process_delete_list(CustomScanState *node); |
| |
| static void check_for_connected_edges(CustomScanState *node); |
| static agtype_value *extract_entity(CustomScanState *node, |
| TupleTableSlot *scanTupleSlot, |
| int entity_position); |
| static void delete_entity(EState *estate, ResultRelInfo *resultRelInfo, |
| HeapTuple tuple); |
| |
| const CustomExecMethods cypher_delete_exec_methods = {DELETE_SCAN_STATE_NAME, |
| begin_cypher_delete, |
| exec_cypher_delete, |
| end_cypher_delete, |
| rescan_cypher_delete, |
| NULL, |
| NULL, |
| NULL, |
| NULL, |
| NULL, |
| NULL, |
| NULL, |
| NULL}; |
| |
| /* |
| * Initialization at the beginning of execution. Setup the child node, |
| * setup its scan tuple slot and projection info, expression context, |
| * collect metadata about visible edges, and alter the commandid for |
| * the transaction. |
| */ |
| static void begin_cypher_delete(CustomScanState *node, EState *estate, |
| int eflags) |
| { |
| cypher_delete_custom_scan_state *css = |
| (cypher_delete_custom_scan_state *)node; |
| Plan *subplan; |
| HASHCTL hashctl; |
| |
| Assert(list_length(css->cs->custom_plans) == 1); |
| |
| // setup child |
| subplan = linitial(css->cs->custom_plans); |
| node->ss.ps.lefttree = ExecInitNode(subplan, estate, eflags); |
| |
| // setup expr context |
| ExecAssignExprContext(estate, &node->ss.ps); |
| |
| // setup scan tuple slot and projection info |
| ExecInitScanTupleSlot(estate, &node->ss, |
| ExecGetResultType(node->ss.ps.lefttree), |
| &TTSOpsHeapTuple); |
| |
| if (!CYPHER_CLAUSE_IS_TERMINAL(css->flags)) |
| { |
| TupleDesc tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor; |
| |
| ExecAssignProjectionInfo(&node->ss.ps, tupdesc); |
| } |
| |
| /* |
| * Get all the labels that are visible to this delete clause at this point |
| * in the transaction. To be used later when the delete clause finds |
| * vertices. |
| */ |
| css->edge_labels = get_all_edge_labels_per_graph(estate, css->delete_data->graph_oid); |
| |
| /* init vertex_id_htab */ |
| MemSet(&hashctl, 0, sizeof(hashctl)); |
| hashctl.keysize = sizeof(graphid); |
| hashctl.entrysize = |
| sizeof(graphid); // entries are not used, but entrysize must >= keysize |
| hashctl.hash = tag_hash; |
| css->vertex_id_htab = hash_create(DELETE_VERTEX_HTAB_NAME, |
| DELETE_VERTEX_HTAB_SIZE, &hashctl, |
| HASH_ELEM | HASH_FUNCTION); |
| |
| /* |
| * Postgres does not assign the es_output_cid in queries that do |
| * not write to disk, ie: SELECT commands. We need the command id |
| * for our clauses, and we may need to initialize it. We cannot use |
| * GetCurrentCommandId because there may be other cypher clauses |
| * that have modified the command id. |
| */ |
| if (estate->es_output_cid == 0) |
| estate->es_output_cid = estate->es_snapshot->curcid; |
| |
| Increment_Estate_CommandId(estate); |
| } |
| |
| /* |
| * Called once per tuple. If this is a terminal DELETE clause |
| * process everyone of its child tuple, otherwise process the |
| * next tuple. |
| */ |
| static TupleTableSlot *exec_cypher_delete(CustomScanState *node) |
| { |
| cypher_delete_custom_scan_state *css = |
| (cypher_delete_custom_scan_state *)node; |
| EState *estate = css->css.ss.ps.state; |
| ExprContext *econtext = css->css.ss.ps.ps_ExprContext; |
| TupleTableSlot *slot; |
| |
| if (CYPHER_CLAUSE_IS_TERMINAL(css->flags)) |
| { |
| /* |
| * If the DELETE clause was the final cypher clause written |
| * then we aren't returning anything from this result node. |
| * So the exec_cypher_delete function will only be called once. |
| * Therefore we will process all tuples from the subtree at once. |
| */ |
| while(true) |
| { |
| //Process the subtree first |
| Decrement_Estate_CommandId(estate) |
| slot = ExecProcNode(node->ss.ps.lefttree); |
| Increment_Estate_CommandId(estate) |
| |
| if (TupIsNull(slot)) |
| break; |
| |
| // setup the scantuple that the process_delete_list needs |
| econtext->ecxt_scantuple = |
| node->ss.ps.lefttree->ps_ProjInfo->pi_exprContext->ecxt_scantuple; |
| |
| process_delete_list(node); |
| } |
| |
| return NULL; |
| } |
| else |
| { |
| //Process the subtree first |
| Decrement_Estate_CommandId(estate) |
| slot = ExecProcNode(node->ss.ps.lefttree); |
| Increment_Estate_CommandId(estate) |
| |
| if (TupIsNull(slot)) |
| return NULL; |
| |
| // setup the scantuple that the process_delete_list needs |
| econtext->ecxt_scantuple = |
| node->ss.ps.lefttree->ps_ProjInfo->pi_exprContext->ecxt_scantuple; |
| |
| process_delete_list(node); |
| |
| econtext->ecxt_scantuple = |
| ExecProject(node->ss.ps.lefttree->ps_ProjInfo); |
| |
| return ExecProject(node->ss.ps.ps_ProjInfo); |
| } |
| } |
| |
| /* |
| * Called at the end of execution. Tell its child to |
| * end its execution. |
| */ |
| static void end_cypher_delete(CustomScanState *node) |
| { |
| check_for_connected_edges(node); |
| |
| hash_destroy(((cypher_delete_custom_scan_state *)node)->vertex_id_htab); |
| |
| ExecEndNode(node->ss.ps.lefttree); |
| } |
| |
| /* |
| * Used for rewinding the scan state and reprocessing the results. |
| * |
| * XXX: This is not currently supported. We need to find out |
| * when this function will be called and determine a process |
| * for allowing the Delete clause to run multiple times without |
| * redundant edits to the database. |
| */ |
| static void rescan_cypher_delete(CustomScanState *node) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("cypher DELETE clause cannot be rescanned"), |
| errhint("its unsafe to use joins in a query with a Cypher DELETE clause"))); |
| } |
| |
| /* |
| * Create the CustomScanState from the CustomScan and pass |
| * necessary metadata. |
| */ |
| Node *create_cypher_delete_plan_state(CustomScan *cscan) |
| { |
| cypher_delete_custom_scan_state *cypher_css = |
| palloc0(sizeof(cypher_delete_custom_scan_state)); |
| cypher_delete_information *delete_data; |
| char *serialized_data; |
| Const *c; |
| |
| cypher_css->cs = cscan; |
| |
| // get the serialized data structure from the Const and deserialize it. |
| c = linitial(cscan->custom_private); |
| serialized_data = (char *)c->constvalue; |
| delete_data = stringToNode(serialized_data); |
| |
| Assert(is_ag_node(delete_data, cypher_delete_information)); |
| |
| cypher_css->delete_data = delete_data; |
| cypher_css->flags = delete_data->flags; |
| |
| cypher_css->css.ss.ps.type = T_CustomScanState; |
| cypher_css->css.methods = &cypher_delete_exec_methods; |
| |
| return (Node *)cypher_css; |
| } |
| |
| /* |
| * Extract the vertex or edge to be deleted, perform some type checking to |
| * validate datum is an agtype vertex or edge. |
| */ |
| static agtype_value *extract_entity(CustomScanState *node, |
| TupleTableSlot *scanTupleSlot, |
| int entity_position) |
| { |
| agtype_value *original_entity_value; |
| agtype *original_entity; |
| TupleDesc tupleDescriptor; |
| |
| tupleDescriptor = scanTupleSlot->tts_tupleDescriptor; |
| |
| // type checking, make sure the entity is an agtype vertex or edge |
| if (tupleDescriptor->attrs[entity_position -1].atttypid != AGTYPEOID) |
| ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("DELETE clause can only delete agtype"))); |
| |
| original_entity = DATUM_GET_AGTYPE_P(scanTupleSlot->tts_values[entity_position - 1]); |
| original_entity_value = get_ith_agtype_value_from_container(&original_entity->root, 0); |
| |
| if (original_entity_value->type != AGTV_VERTEX && original_entity_value->type != AGTV_EDGE) |
| ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("DELETE clause can only delete vertices and edges"))); |
| |
| return original_entity_value; |
| } |
| |
| /* |
| * Try and delete the entity that is describe by the HeapTuple in the table |
| * described by the resultRelInfo. |
| */ |
| static void delete_entity(EState *estate, ResultRelInfo *resultRelInfo, |
| HeapTuple tuple) |
| { |
| ResultRelInfo **saved_resultRels; |
| LockTupleMode lockmode; |
| TM_FailureData hufd; |
| TM_Result lock_result; |
| TM_Result delete_result; |
| Buffer buffer; |
| |
| // Find the physical tuple, this variable is coming from |
| saved_resultRels = estate->es_result_relations; |
| estate->es_result_relations = &resultRelInfo; |
| |
| lockmode = ExecUpdateLockMode(estate, resultRelInfo); |
| |
| lock_result = heap_lock_tuple(resultRelInfo->ri_RelationDesc, tuple, |
| GetCurrentCommandId(false), lockmode, |
| LockWaitBlock, false, &buffer, &hufd); |
| |
| /* |
| * It is possible the entity may have already been deleted. If the tuple |
| * can be deleted, the lock result will be HeapTupleMayBeUpdated. If the |
| * tuple was already deleted by this DELETE clause, the result would be |
| * TM_SelfModified, if the result was deleted by a previous delete |
| * clause, the result will TM_Invisible. Throw an error if any |
| * other result was returned. |
| */ |
| if (lock_result == TM_Ok) |
| { |
| delete_result = heap_delete(resultRelInfo->ri_RelationDesc, |
| &tuple->t_self, GetCurrentCommandId(true), |
| estate->es_crosscheck_snapshot, true, &hufd, |
| false); |
| |
| /* |
| * Unlike locking, the heap_delete either succeeded |
| * HeapTupleMayBeUpdate, or it failed and returned any other result. |
| */ |
| switch (delete_result) |
| { |
| case TM_Ok: |
| break; |
| case TM_SelfModified: |
| ereport( |
| ERROR, |
| (errcode(ERRCODE_INTERNAL_ERROR), |
| errmsg( |
| "deleting the same entity more than once cannot happen"))); |
| /* ereport never gets here */ |
| break; |
| case TM_Updated: |
| ereport( |
| ERROR, |
| (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), |
| errmsg("could not serialize access due to concurrent update"))); |
| /* ereport never gets here */ |
| break; |
| default: |
| elog(ERROR, "Entity failed to be update"); |
| /* elog never gets here */ |
| break; |
| } |
| /* increment the command counter */ |
| CommandCounterIncrement(); |
| } |
| else if (lock_result != TM_Invisible && lock_result != TM_SelfModified) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_INTERNAL_ERROR), |
| errmsg("Entity could not be locked for updating"))); |
| |
| } |
| |
| ReleaseBuffer(buffer); |
| |
| estate->es_result_relations = saved_resultRels; |
| } |
| |
| /* |
| * After the delete's subtress has been processed, we then go through the list |
| * of variables to be deleted. |
| */ |
| static void process_delete_list(CustomScanState *node) |
| { |
| cypher_delete_custom_scan_state *css = |
| (cypher_delete_custom_scan_state *)node; |
| ListCell *lc; |
| ExprContext *econtext = css->css.ss.ps.ps_ExprContext; |
| TupleTableSlot *scanTupleSlot = econtext->ecxt_scantuple; |
| EState *estate = node->ss.ps.state; |
| |
| foreach(lc, css->delete_data->delete_items) |
| { |
| cypher_delete_item *item; |
| agtype_value *original_entity_value, *id, *label; |
| ScanKeyData scan_keys[1]; |
| TableScanDesc scan_desc; |
| ResultRelInfo *resultRelInfo; |
| HeapTuple heap_tuple; |
| char *label_name; |
| Integer *pos; |
| int entity_position; |
| |
| item = lfirst(lc); |
| |
| pos = item->entity_position; |
| entity_position = pos->ival; |
| |
| /* skip if the entity is null */ |
| if (scanTupleSlot->tts_isnull[entity_position - 1]) |
| continue; |
| |
| original_entity_value = extract_entity(node, scanTupleSlot, |
| entity_position); |
| |
| id = GET_AGTYPE_VALUE_OBJECT_VALUE(original_entity_value, "id"); |
| label = GET_AGTYPE_VALUE_OBJECT_VALUE(original_entity_value, "label"); |
| label_name = pnstrdup(label->val.string.val, label->val.string.len); |
| |
| resultRelInfo = create_entity_result_rel_info(estate, css->delete_data->graph_name, label_name); |
| |
| /* |
| * Setup the scan key to require the id field on-disc to match the |
| * entity's graphid. |
| */ |
| if (original_entity_value->type == AGTV_VERTEX) |
| { |
| ScanKeyInit(&scan_keys[0], Anum_ag_label_vertex_table_id, |
| BTEqualStrategyNumber, F_GRAPHIDEQ, |
| GRAPHID_GET_DATUM(id->val.int_value)); |
| } |
| else if (original_entity_value->type == AGTV_EDGE) |
| { |
| ScanKeyInit(&scan_keys[0], Anum_ag_label_edge_table_id, |
| BTEqualStrategyNumber, F_GRAPHIDEQ, |
| GRAPHID_GET_DATUM(id->val.int_value)); |
| } |
| else |
| { |
| ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("DELETE clause can only delete vertices and edges"))); |
| } |
| |
| /* |
| * Setup the scan description, with the correct snapshot and scan keys. |
| */ |
| estate->es_snapshot->curcid = GetCurrentCommandId(false); |
| estate->es_output_cid = GetCurrentCommandId(false); |
| scan_desc = table_beginscan(resultRelInfo->ri_RelationDesc, |
| estate->es_snapshot, 1, scan_keys); |
| |
| /* Retrieve the tuple. */ |
| heap_tuple = heap_getnext(scan_desc, ForwardScanDirection); |
| |
| /* |
| * If the heap tuple still exists (It wasn't deleted after this variable |
| * was created) we can delete it. Otherwise, its safe to skip this |
| * delete. |
| */ |
| if (!HeapTupleIsValid(heap_tuple)) |
| { |
| table_endscan(scan_desc); |
| destroy_entity_result_rel_info(resultRelInfo); |
| |
| continue; |
| } |
| |
| /* |
| * For vertices, we insert the vertex ID in the hashtable |
| * vertex_id_htab. This hashtable is used later to process |
| * connected edges. |
| */ |
| if (original_entity_value->type == AGTV_VERTEX) |
| { |
| bool found; |
| hash_search(css->vertex_id_htab, (void *)&(id->val.int_value), |
| HASH_ENTER, &found); |
| } |
| |
| /* At this point, we are ready to delete the node/vertex. */ |
| delete_entity(estate, resultRelInfo, heap_tuple); |
| |
| /* Close the scan and the relation. */ |
| table_endscan(scan_desc); |
| destroy_entity_result_rel_info(resultRelInfo); |
| } |
| } |
| |
| /* |
| * Scans the edge tables and checks if the deleted vertices are connected to |
| * any edge(s). For DETACH DELETE, the connected edges are deleted. Otherwise, |
| * an error is thrown. |
| */ |
| static void check_for_connected_edges(CustomScanState *node) |
| { |
| ListCell *lc; |
| cypher_delete_custom_scan_state *css = |
| (cypher_delete_custom_scan_state *)node; |
| EState *estate = css->css.ss.ps.state; |
| char *graph_name = css->delete_data->graph_name; |
| |
| /* scans each label from css->edge_labels */ |
| foreach (lc, css->edge_labels) |
| { |
| char *label_name = lfirst(lc); |
| ResultRelInfo *resultRelInfo; |
| TableScanDesc scan_desc; |
| HeapTuple tuple; |
| TupleTableSlot *slot; |
| |
| resultRelInfo = create_entity_result_rel_info(estate, graph_name, |
| label_name); |
| estate->es_snapshot->curcid = GetCurrentCommandId(false); |
| estate->es_output_cid = GetCurrentCommandId(false); |
| scan_desc = table_beginscan(resultRelInfo->ri_RelationDesc, |
| estate->es_snapshot, 0, NULL); |
| slot = ExecInitExtraTupleSlot( |
| estate, RelationGetDescr(resultRelInfo->ri_RelationDesc), |
| &TTSOpsHeapTuple); |
| |
| /* for each row */ |
| while (true) |
| { |
| graphid startid; |
| graphid endid; |
| bool isNull; |
| bool found_startid = false; |
| bool found_endid = false; |
| |
| tuple = heap_getnext(scan_desc, ForwardScanDirection); |
| |
| /* no more tuples to process, break and scan the next label. */ |
| if (!HeapTupleIsValid(tuple)) |
| { |
| break; |
| } |
| |
| ExecStoreHeapTuple(tuple, slot, false); |
| |
| startid = GRAPHID_GET_DATUM(slot_getattr( |
| slot, Anum_ag_label_edge_table_start_id, &isNull)); |
| endid = GRAPHID_GET_DATUM( |
| slot_getattr(slot, Anum_ag_label_edge_table_end_id, &isNull)); |
| |
| hash_search(css->vertex_id_htab, (void *)&startid, HASH_FIND, |
| &found_startid); |
| |
| if (!found_startid) |
| { |
| hash_search(css->vertex_id_htab, (void *)&endid, HASH_FIND, |
| &found_endid); |
| } |
| |
| if (found_startid || found_endid) |
| { |
| if (css->delete_data->detach) |
| { |
| delete_entity(estate, resultRelInfo, tuple); |
| } |
| else |
| { |
| ereport( |
| ERROR, |
| (errcode(ERRCODE_INTERNAL_ERROR), |
| errmsg( |
| "Cannot delete a vertex that has edge(s). " |
| "Delete the edge(s) first, or try DETACH DELETE."))); |
| } |
| } |
| } |
| |
| table_endscan(scan_desc); |
| destroy_entity_result_rel_info(resultRelInfo); |
| } |
| } |