| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "postgres.h" |
| |
| #include "access/heapam.h" |
| #include "access/table.h" |
| #include "access/tableam.h" |
| #include "access/xact.h" |
| #include "catalog/indexing.h" |
| #include "catalog/pg_authid.h" |
| #include "executor/executor.h" |
| #include "miscadmin.h" |
| #include "nodes/parsenodes.h" |
| #include "parser/parse_relation.h" |
| #include "utils/acl.h" |
| #include "utils/json.h" |
| #include "utils/rel.h" |
| #include "utils/rls.h" |
| |
| #include "utils/load/ag_load_edges.h" |
| #include "utils/load/ag_load_labels.h" |
| #include "utils/load/age_load.h" |
| |
| static agtype_value *csv_value_to_agtype_value(char *csv_val); |
| static Oid get_or_create_graph(const Name graph_name); |
| static int32 get_or_create_label(Oid graph_oid, char *graph_name, |
| char *label_name, char label_kind); |
| static char *build_safe_filename(char *name); |
| static void check_file_read_permission(void); |
| static void check_table_permissions(Oid relid); |
| static void check_rls_for_load(Oid relid); |
| |
| #define AGE_BASE_CSV_DIRECTORY "/tmp/age/" |
| #define AGE_CSV_FILE_EXTENSION ".csv" |
| |
| /* |
| * Trim leading and trailing whitespace from a string. |
| * Returns a newly allocated string with whitespace removed. |
| * Returns empty string for NULL input. |
| */ |
| char *trim_whitespace(const char *str) |
| { |
| const char *start; |
| const char *end; |
| size_t len; |
| |
| if (str == NULL) |
| { |
| return pstrdup(""); |
| } |
| |
| /* Find first non-whitespace character */ |
| start = str; |
| while (*start && (*start == ' ' || *start == '\t' || |
| *start == '\n' || *start == '\r')) |
| { |
| start++; |
| } |
| |
| /* If string is all whitespace, return empty string */ |
| if (*start == '\0') |
| { |
| return pstrdup(""); |
| } |
| |
| /* Find last non-whitespace character */ |
| end = str + strlen(str) - 1; |
| while (end > start && (*end == ' ' || *end == '\t' || |
| *end == '\n' || *end == '\r')) |
| { |
| end--; |
| } |
| |
| /* Copy the trimmed string */ |
| len = end - start + 1; |
| return pnstrdup(start, len); |
| } |
| |
| static char *build_safe_filename(char *name) |
| { |
| int length; |
| char path[PATH_MAX]; |
| char *resolved; |
| |
| if (name == NULL) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("file name cannot be NULL"))); |
| |
| } |
| |
| length = strlen(name); |
| |
| if (length == 0) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("file name cannot be zero length"))); |
| |
| } |
| |
| snprintf(path, sizeof(path), "%s%s", AGE_BASE_CSV_DIRECTORY, name); |
| |
| resolved = realpath(path, NULL); |
| |
| if (resolved == NULL) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("File or path does not exist [%s]", path))); |
| } |
| |
| if (strncmp(resolved, AGE_BASE_CSV_DIRECTORY, |
| strlen(AGE_BASE_CSV_DIRECTORY)) != 0) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("You can only load files located in [%s].", |
| AGE_BASE_CSV_DIRECTORY))); |
| } |
| |
| length = strlen(resolved) - 4; |
| if (strncmp(resolved+length, AGE_CSV_FILE_EXTENSION, |
| strlen(AGE_CSV_FILE_EXTENSION)) != 0) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("You can only load files with extension [%s].", |
| AGE_CSV_FILE_EXTENSION))); |
| } |
| |
| return resolved; |
| } |
| |
| /* |
| * Check if the current user has permission to read server files. |
| * Only users with the pg_read_server_files role can load from files. |
| */ |
| static void check_file_read_permission(void) |
| { |
| if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_SERVER_FILES)) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), |
| errmsg("permission denied to LOAD from a file"), |
| errdetail("Only roles with privileges of the \"%s\" role may LOAD from a file.", |
| "pg_read_server_files"))); |
| } |
| } |
| |
| /* |
| * Check if the current user has INSERT permission on the target table. |
| */ |
| static void check_table_permissions(Oid relid) |
| { |
| AclResult aclresult; |
| |
| aclresult = pg_class_aclcheck(relid, GetUserId(), ACL_INSERT); |
| if (aclresult != ACLCHECK_OK) |
| { |
| aclcheck_error(aclresult, OBJECT_TABLE, get_rel_name(relid)); |
| } |
| } |
| |
| /* |
| * Check if RLS is enabled on the target table. |
| * CSV loading is not supported with row-level security. |
| */ |
| static void check_rls_for_load(Oid relid) |
| { |
| if (check_enable_rls(relid, InvalidOid, true) == RLS_ENABLED) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("LOAD from file is not supported with row-level security"), |
| errhint("Use Cypher CREATE clause instead."))); |
| } |
| } |
| |
| agtype *create_empty_agtype(void) |
| { |
| agtype* out; |
| agtype_in_state result; |
| |
| memset(&result, 0, sizeof(agtype_in_state)); |
| |
| result.res = push_agtype_value(&result.parse_state, WAGT_BEGIN_OBJECT, |
| NULL); |
| result.res = push_agtype_value(&result.parse_state, WAGT_END_OBJECT, NULL); |
| |
| out = agtype_value_to_agtype(result.res); |
| pfree_agtype_in_state(&result); |
| |
| return out; |
| } |
| |
| /* |
| * Converts the given csv value to an agtype_value. |
| * |
| * If csv_val is not a valid json, it is wrapped by double-quotes to make it a |
| * string value. Because agtype is jsonb-like, the token should be a valid |
| * json in order to be parsed into an agtype_value of appropriate type. |
| * Finally, agtype_value_from_cstring() is called for parsing. |
| */ |
| static agtype_value *csv_value_to_agtype_value(char *csv_val) |
| { |
| char *new_csv_val; |
| agtype_value *res; |
| |
| /* Handle NULL or empty input - return null agtype value */ |
| if (csv_val == NULL || csv_val[0] == '\0') |
| { |
| res = palloc(sizeof(agtype_value)); |
| res->type = AGTV_NULL; |
| return res; |
| } |
| |
| if (!json_validate(cstring_to_text(csv_val), false, false)) |
| { |
| /* wrap the string with double-quote */ |
| int oldlen; |
| int newlen; |
| |
| oldlen = strlen(csv_val); |
| newlen = oldlen + 2; /* +2 for double-quotes */ |
| new_csv_val = (char *)palloc(sizeof(char) * (newlen + 1)); |
| |
| new_csv_val[0] = '"'; |
| strncpy(&new_csv_val[1], csv_val, oldlen); |
| new_csv_val[oldlen + 1] = '"'; |
| new_csv_val[oldlen + 2] = '\0'; |
| } |
| else |
| { |
| new_csv_val = csv_val; |
| } |
| |
| res = agtype_value_from_cstring(new_csv_val, strlen(new_csv_val)); |
| |
| /* extract from top-level row scalar array */ |
| if (res->type == AGTV_ARRAY && res->val.array.raw_scalar) |
| { |
| res = &res->val.array.elems[0]; |
| } |
| |
| return res; |
| } |
| |
| agtype *create_agtype_from_list(char **header, char **fields, size_t fields_len, |
| int64 vertex_id, bool load_as_agtype) |
| { |
| agtype* out; |
| agtype_value* key_agtype; |
| agtype_value* value_agtype; |
| agtype_in_state result; |
| int i; |
| |
| memset(&result, 0, sizeof(agtype_in_state)); |
| |
| result.res = push_agtype_value(&result.parse_state, WAGT_BEGIN_OBJECT, |
| NULL); |
| |
| key_agtype = string_to_agtype_value("__id__"); |
| result.res = push_agtype_value(&result.parse_state, |
| WAGT_KEY, |
| key_agtype); |
| |
| value_agtype = integer_to_agtype_value(vertex_id); |
| result.res = push_agtype_value(&result.parse_state, |
| WAGT_VALUE, |
| value_agtype); |
| |
| for (i = 0; i<fields_len; i++) |
| { |
| char *trimmed_value; |
| |
| /* Skip empty header fields (e.g., from trailing commas) */ |
| if (header[i] == NULL || header[i][0] == '\0') |
| { |
| continue; |
| } |
| |
| key_agtype = string_to_agtype_value(header[i]); |
| result.res = push_agtype_value(&result.parse_state, |
| WAGT_KEY, |
| key_agtype); |
| |
| /* Trim whitespace from field value */ |
| trimmed_value = trim_whitespace(fields[i]); |
| |
| if (load_as_agtype) |
| { |
| value_agtype = csv_value_to_agtype_value(trimmed_value); |
| } |
| else |
| { |
| /* Handle empty field values */ |
| if (trimmed_value[0] == '\0') |
| { |
| value_agtype = palloc(sizeof(agtype_value)); |
| value_agtype->type = AGTV_STRING; |
| value_agtype->val.string.len = 0; |
| value_agtype->val.string.val = pstrdup(""); |
| } |
| else |
| { |
| value_agtype = string_to_agtype_value(trimmed_value); |
| } |
| } |
| |
| result.res = push_agtype_value(&result.parse_state, |
| WAGT_VALUE, |
| value_agtype); |
| } |
| |
| result.res = push_agtype_value(&result.parse_state, |
| WAGT_END_OBJECT, NULL); |
| |
| /* serialize it */ |
| out = agtype_value_to_agtype(result.res); |
| |
| /* now that it is serialized we can free the in memory structure */ |
| pfree_agtype_in_state(&result); |
| |
| return out; |
| } |
| |
| agtype* create_agtype_from_list_i(char **header, char **fields, |
| size_t fields_len, size_t start_index, |
| bool load_as_agtype) |
| { |
| agtype* out; |
| agtype_value* key_agtype; |
| agtype_value* value_agtype; |
| agtype_in_state result; |
| size_t i; |
| |
| if (start_index >= fields_len) |
| { |
| return create_empty_agtype(); |
| } |
| |
| memset(&result, 0, sizeof(agtype_in_state)); |
| |
| result.res = push_agtype_value(&result.parse_state, WAGT_BEGIN_OBJECT, |
| NULL); |
| |
| for (i = start_index; i < fields_len; i++) |
| { |
| char *trimmed_value; |
| |
| /* Skip empty header fields (e.g., from trailing commas) */ |
| if (header[i] == NULL || header[i][0] == '\0') |
| { |
| continue; |
| } |
| |
| key_agtype = string_to_agtype_value(header[i]); |
| result.res = push_agtype_value(&result.parse_state, |
| WAGT_KEY, |
| key_agtype); |
| |
| /* Trim whitespace from field value */ |
| trimmed_value = trim_whitespace(fields[i]); |
| |
| if (load_as_agtype) |
| { |
| value_agtype = csv_value_to_agtype_value(trimmed_value); |
| } |
| else |
| { |
| /* Handle empty field values */ |
| if (trimmed_value[0] == '\0') |
| { |
| value_agtype = palloc(sizeof(agtype_value)); |
| value_agtype->type = AGTV_STRING; |
| value_agtype->val.string.len = 0; |
| value_agtype->val.string.val = pstrdup(""); |
| } |
| else |
| { |
| value_agtype = string_to_agtype_value(trimmed_value); |
| } |
| } |
| |
| result.res = push_agtype_value(&result.parse_state, |
| WAGT_VALUE, |
| value_agtype); |
| } |
| |
| result.res = push_agtype_value(&result.parse_state, |
| WAGT_END_OBJECT, NULL); |
| |
| /* serialize it */ |
| out = agtype_value_to_agtype(result.res); |
| |
| /* now that it is serialized we can free the in memory structure */ |
| pfree_agtype_in_state(&result); |
| |
| return out; |
| } |
| |
| void insert_edge_simple(Oid graph_oid, char *label_name, graphid edge_id, |
| graphid start_id, graphid end_id, |
| agtype *edge_properties) |
| { |
| |
| Datum values[6]; |
| bool nulls[4] = {false, false, false, false}; |
| Relation label_relation; |
| HeapTuple tuple; |
| |
| /* Check if label provided exists as vertex label, then throw error */ |
| if (get_label_kind(label_name, graph_oid) == LABEL_KIND_VERTEX) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("label %s already exists as vertex label", label_name))); |
| } |
| |
| /* Open the relation */ |
| label_relation = table_open(get_label_relation(label_name, graph_oid), |
| RowExclusiveLock); |
| |
| /* Form the tuple */ |
| values[0] = GRAPHID_GET_DATUM(edge_id); |
| values[1] = GRAPHID_GET_DATUM(start_id); |
| values[2] = GRAPHID_GET_DATUM(end_id); |
| values[3] = AGTYPE_P_GET_DATUM((edge_properties)); |
| tuple = heap_form_tuple(RelationGetDescr(label_relation), |
| values, nulls); |
| |
| if (RelationGetForm(label_relation)->relhasindex) |
| { |
| /* |
| * CatalogTupleInsertWithInfo() is originally for PostgreSQL's |
| * catalog. However, it is used here for convenience. |
| */ |
| CatalogIndexState indstate = CatalogOpenIndexes(label_relation); |
| CatalogTupleInsertWithInfo(label_relation, tuple, indstate); |
| CatalogCloseIndexes(indstate); |
| } |
| else |
| { |
| heap_insert(label_relation, tuple, GetCurrentCommandId(true), |
| 0, NULL); |
| } |
| |
| /* Close the relation */ |
| table_close(label_relation, RowExclusiveLock); |
| CommandCounterIncrement(); |
| } |
| |
| void insert_vertex_simple(Oid graph_oid, char *label_name, graphid vertex_id, |
| agtype *vertex_properties) |
| { |
| |
| Datum values[2]; |
| bool nulls[2] = {false, false}; |
| Relation label_relation; |
| HeapTuple tuple; |
| |
| /* Check if label provided exists as edge label, then throw error */ |
| if (get_label_kind(label_name, graph_oid) == LABEL_KIND_EDGE) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("label %s already exists as edge label", |
| label_name))); |
| } |
| |
| /* Open the relation */ |
| label_relation = table_open(get_label_relation(label_name, graph_oid), |
| RowExclusiveLock); |
| |
| /* Form the tuple */ |
| values[0] = GRAPHID_GET_DATUM(vertex_id); |
| values[1] = AGTYPE_P_GET_DATUM((vertex_properties)); |
| tuple = heap_form_tuple(RelationGetDescr(label_relation), |
| values, nulls); |
| |
| if (RelationGetForm(label_relation)->relhasindex) |
| { |
| /* |
| * CatalogTupleInsertWithInfo() is originally for PostgreSQL's |
| * catalog. However, it is used here for convenience. |
| */ |
| CatalogIndexState indstate = CatalogOpenIndexes(label_relation); |
| CatalogTupleInsertWithInfo(label_relation, tuple, indstate); |
| CatalogCloseIndexes(indstate); |
| } |
| else |
| { |
| heap_insert(label_relation, tuple, GetCurrentCommandId(true), |
| 0, NULL); |
| } |
| |
| /* Close the relation */ |
| table_close(label_relation, RowExclusiveLock); |
| CommandCounterIncrement(); |
| } |
| |
| void insert_batch(batch_insert_state *batch_state) |
| { |
| List *result; |
| int i; |
| |
| /* Check constraints for each tuple before inserting */ |
| if (batch_state->resultRelInfo->ri_RelationDesc->rd_att->constr) |
| { |
| for (i = 0; i < batch_state->num_tuples; i++) |
| { |
| ExecConstraints(batch_state->resultRelInfo, |
| batch_state->slots[i], |
| batch_state->estate); |
| } |
| } |
| |
| /* Insert the tuples */ |
| heap_multi_insert(batch_state->resultRelInfo->ri_RelationDesc, |
| batch_state->slots, batch_state->num_tuples, |
| GetCurrentCommandId(true), |
| TABLE_INSERT_SKIP_FSM, /* Skip free space map for bulk */ |
| batch_state->bistate); /* Use bulk insert state */ |
| |
| /* Insert index entries for the tuples */ |
| if (batch_state->resultRelInfo->ri_NumIndices > 0) |
| { |
| for (i = 0; i < batch_state->num_tuples; i++) |
| { |
| result = ExecInsertIndexTuples(batch_state->resultRelInfo, |
| batch_state->slots[i], |
| batch_state->estate, false, |
| true, NULL, NIL, false); |
| |
| /* Check if the unique constraint is violated */ |
| if (list_length(result) != 0) |
| { |
| Datum id; |
| bool isnull; |
| |
| id = slot_getattr(batch_state->slots[i], 1, &isnull); |
| ereport(ERROR, (errmsg("Cannot insert duplicate vertex id: %ld", |
| DATUM_GET_GRAPHID(id)), |
| errhint("Entry id %ld is already used", |
| get_graphid_entry_id(id)))); |
| } |
| } |
| } |
| |
| CommandCounterIncrement(); |
| } |
| |
| PG_FUNCTION_INFO_V1(load_labels_from_file); |
| Datum load_labels_from_file(PG_FUNCTION_ARGS) |
| { |
| Name graph_name; |
| Name label_name; |
| text* file_name; |
| char* graph_name_str; |
| char* label_name_str; |
| char* file_path_str; |
| Oid graph_oid; |
| Oid label_relid; |
| int32 label_id; |
| bool id_field_exists; |
| bool load_as_agtype; |
| |
| if (PG_ARGISNULL(0)) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("graph name must not be NULL"))); |
| } |
| |
| if (PG_ARGISNULL(1)) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("label name must not be NULL"))); |
| } |
| |
| if (PG_ARGISNULL(2)) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("file path must not be NULL"))); |
| } |
| |
| /* Check file read permission first */ |
| check_file_read_permission(); |
| |
| graph_name = PG_GETARG_NAME(0); |
| label_name = PG_GETARG_NAME(1); |
| file_name = PG_GETARG_TEXT_P(2); |
| id_field_exists = PG_GETARG_BOOL(3); |
| load_as_agtype = PG_GETARG_BOOL(4); |
| |
| graph_name_str = NameStr(*graph_name); |
| label_name_str = NameStr(*label_name); |
| |
| if (strcmp(label_name_str, "") == 0) |
| { |
| label_name_str = AG_DEFAULT_LABEL_VERTEX; |
| } |
| |
| file_path_str = build_safe_filename(text_to_cstring(file_name)); |
| |
| graph_oid = get_or_create_graph(graph_name); |
| label_id = get_or_create_label(graph_oid, graph_name_str, |
| label_name_str, LABEL_KIND_VERTEX); |
| |
| /* Get the label relation and check permissions */ |
| label_relid = get_label_relation(label_name_str, graph_oid); |
| check_table_permissions(label_relid); |
| check_rls_for_load(label_relid); |
| |
| create_labels_from_csv_file(file_path_str, graph_name_str, graph_oid, |
| label_name_str, label_id, id_field_exists, |
| load_as_agtype); |
| |
| free(file_path_str); |
| |
| PG_RETURN_VOID(); |
| } |
| |
| PG_FUNCTION_INFO_V1(load_edges_from_file); |
| Datum load_edges_from_file(PG_FUNCTION_ARGS) |
| { |
| Name graph_name; |
| Name label_name; |
| text* file_name; |
| char* graph_name_str; |
| char* label_name_str; |
| char* file_path_str; |
| Oid graph_oid; |
| Oid label_relid; |
| int32 label_id; |
| bool load_as_agtype; |
| |
| if (PG_ARGISNULL(0)) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("graph name must not be NULL"))); |
| } |
| |
| if (PG_ARGISNULL(1)) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("label name must not be NULL"))); |
| } |
| |
| if (PG_ARGISNULL(2)) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("file path must not be NULL"))); |
| } |
| |
| /* Check file read permission first */ |
| check_file_read_permission(); |
| |
| graph_name = PG_GETARG_NAME(0); |
| label_name = PG_GETARG_NAME(1); |
| file_name = PG_GETARG_TEXT_P(2); |
| load_as_agtype = PG_GETARG_BOOL(3); |
| |
| graph_name_str = NameStr(*graph_name); |
| label_name_str = NameStr(*label_name); |
| |
| if (strcmp(label_name_str, "") == 0) |
| { |
| label_name_str = AG_DEFAULT_LABEL_EDGE; |
| } |
| |
| file_path_str = build_safe_filename(text_to_cstring(file_name)); |
| |
| graph_oid = get_or_create_graph(graph_name); |
| label_id = get_or_create_label(graph_oid, graph_name_str, |
| label_name_str, LABEL_KIND_EDGE); |
| |
| /* Get the label relation and check permissions */ |
| label_relid = get_label_relation(label_name_str, graph_oid); |
| check_table_permissions(label_relid); |
| check_rls_for_load(label_relid); |
| |
| create_edges_from_csv_file(file_path_str, graph_name_str, graph_oid, |
| label_name_str, label_id, load_as_agtype); |
| |
| free(file_path_str); |
| |
| PG_RETURN_VOID(); |
| } |
| |
| /* |
| * Helper function to create a graph if it does not exist. |
| * Just returns Oid of the graph if it already exists. |
| */ |
| static Oid get_or_create_graph(const Name graph_name) |
| { |
| Oid graph_oid; |
| char *graph_name_str; |
| |
| graph_name_str = NameStr(*graph_name); |
| graph_oid = get_graph_oid(graph_name_str); |
| |
| if (OidIsValid(graph_oid)) |
| { |
| return graph_oid; |
| } |
| |
| graph_oid = create_graph_internal(graph_name); |
| ereport(NOTICE, |
| (errmsg("graph \"%s\" has been created", NameStr(*graph_name)))); |
| |
| return graph_oid; |
| } |
| |
| /* |
| * Helper function to create a label if it does not exist. |
| * Just returns label_id of the label if it already exists. |
| */ |
| static int32 get_or_create_label(Oid graph_oid, char *graph_name, |
| char *label_name, char label_kind) |
| { |
| int32 label_id; |
| |
| label_id = get_label_id(label_name, graph_oid); |
| |
| /* Check if label exists */ |
| if (label_id_is_valid(label_id)) |
| { |
| char *label_kind_full = (label_kind == LABEL_KIND_VERTEX) |
| ? "vertex" : "edge"; |
| char opposite_label_kind = (label_kind == LABEL_KIND_VERTEX) |
| ? LABEL_KIND_EDGE : LABEL_KIND_VERTEX; |
| |
| /* If it exists, but as another label_kind, throw an error */ |
| if (get_label_kind(label_name, graph_oid) == opposite_label_kind) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("label \"%s\" already exists as %s label", |
| label_name, label_kind_full))); |
| } |
| } |
| else |
| { |
| /* Create a label */ |
| RangeVar *rv; |
| List *parent; |
| char *default_label = (label_kind == LABEL_KIND_VERTEX) |
| ? AG_DEFAULT_LABEL_VERTEX : AG_DEFAULT_LABEL_EDGE; |
| rv = get_label_range_var(graph_name, graph_oid, default_label); |
| parent = list_make1(rv); |
| |
| create_label(graph_name, label_name, label_kind, parent); |
| label_id = get_label_id(label_name, graph_oid); |
| |
| ereport(NOTICE, |
| (errmsg("VLabel \"%s\" has been created", label_name))); |
| } |
| |
| return label_id; |
| } |
| |
| /* |
| * Initialize the batch insert state. |
| */ |
| void init_batch_insert(batch_insert_state **batch_state, |
| char *label_name, Oid graph_oid) |
| { |
| Relation relation; |
| Oid relid; |
| EState *estate; |
| ResultRelInfo *resultRelInfo; |
| RangeTblEntry *rte; |
| RTEPermissionInfo *perminfo; |
| List *range_table = NIL; |
| List *perminfos = NIL; |
| int i; |
| |
| /* Get the relation OID */ |
| relid = get_label_relation(label_name, graph_oid); |
| |
| /* Initialize executor state */ |
| estate = CreateExecutorState(); |
| |
| /* Create range table entry for ExecConstraints */ |
| rte = makeNode(RangeTblEntry); |
| rte->rtekind = RTE_RELATION; |
| rte->relid = relid; |
| rte->relkind = RELKIND_RELATION; |
| rte->rellockmode = RowExclusiveLock; |
| rte->perminfoindex = 1; |
| range_table = list_make1(rte); |
| |
| /* Create permission info */ |
| perminfo = makeNode(RTEPermissionInfo); |
| perminfo->relid = relid; |
| perminfo->requiredPerms = ACL_INSERT; |
| perminfos = list_make1(perminfo); |
| |
| /* Initialize range table in executor state */ |
| ExecInitRangeTable(estate, range_table, perminfos, NULL); |
| |
| /* Initialize resultRelInfo - this opens the relation */ |
| resultRelInfo = makeNode(ResultRelInfo); |
| ExecInitResultRelation(estate, resultRelInfo, 1); |
| |
| /* Get relation from resultRelInfo (opened by ExecInitResultRelation) */ |
| relation = resultRelInfo->ri_RelationDesc; |
| |
| /* Open the indices */ |
| ExecOpenIndices(resultRelInfo, false); |
| |
| /* Initialize the batch insert state */ |
| *batch_state = (batch_insert_state *) palloc0(sizeof(batch_insert_state)); |
| (*batch_state)->slots = palloc(sizeof(TupleTableSlot *) * BATCH_SIZE); |
| (*batch_state)->estate = estate; |
| (*batch_state)->resultRelInfo = resultRelInfo; |
| (*batch_state)->num_tuples = 0; |
| (*batch_state)->buffered_bytes = 0; |
| (*batch_state)->bistate = GetBulkInsertState(); |
| |
| /* Create slots */ |
| for (i = 0; i < BATCH_SIZE; i++) |
| { |
| (*batch_state)->slots[i] = MakeSingleTupleTableSlot( |
| RelationGetDescr(relation), |
| &TTSOpsHeapTuple); |
| } |
| } |
| |
| /* |
| * Finish the batch insert for vertices. Insert the |
| * tuples remaining in the batch state and clean up. |
| */ |
| void finish_batch_insert(batch_insert_state **batch_state) |
| { |
| int i; |
| |
| if ((*batch_state)->num_tuples > 0) |
| { |
| insert_batch(*batch_state); |
| (*batch_state)->num_tuples = 0; |
| } |
| |
| /* Free slots */ |
| for (i = 0; i < BATCH_SIZE; i++) |
| { |
| ExecDropSingleTupleTableSlot((*batch_state)->slots[i]); |
| } |
| |
| /* Free BulkInsertState */ |
| FreeBulkInsertState((*batch_state)->bistate); |
| |
| /* Close result relations and range table relations */ |
| ExecCloseResultRelations((*batch_state)->estate); |
| ExecCloseRangeTableRelations((*batch_state)->estate); |
| |
| /* Clean up executor state */ |
| FreeExecutorState((*batch_state)->estate); |
| pfree((*batch_state)->slots); |
| pfree(*batch_state); |
| *batch_state = NULL; |
| } |