| /*------------------------------------------------------------------------- |
| * |
| * tstoreReceiver.c |
| * An implementation of DestReceiver that stores the result tuples in |
| * a Tuplestore. |
| * |
| * Optionally, we can force detoasting (but not decompression) of out-of-line |
| * toasted values. This is to support cursors WITH HOLD, which must retain |
| * data even if the underlying table is dropped. |
| * |
| * Also optionally, we can apply a tuple conversion map before storing. |
| * |
| * |
| * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group |
| * Portions Copyright (c) 1994, Regents of the University of California |
| * |
| * IDENTIFICATION |
| * src/backend/executor/tstoreReceiver.c |
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include "postgres.h" |
| |
| #include "access/heapam.h" |
| #include "access/heaptoast.h" |
| #include "access/detoast.h" |
| #include "access/tupconvert.h" |
| #include "executor/tstoreReceiver.h" |
| #include "executor/nodeShareInputScan.h" |
| |
| typedef struct |
| { |
| DestReceiver pub; |
| /* parameters: */ |
| Tuplestorestate *tstore; /* where to put the data */ |
| MemoryContext cxt; /* context containing tstore */ |
| bool detoast; /* were we told to detoast? */ |
| TupleDesc target_tupdesc; /* target tupdesc, or NULL if none */ |
| const char *map_failure_msg; /* tupdesc mapping failure message */ |
| /* workspace: */ |
| Datum *outvalues; /* values array for result tuple */ |
| Datum *tofree; /* temp values to be pfree'd */ |
| TupleConversionMap *tupmap; /* conversion map, if needed */ |
| TupleTableSlot *mapslot; /* slot for mapped tuples */ |
| } TStoreState; |
| |
| |
| static bool tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self); |
| static bool tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self); |
| static bool tstoreReceiveSlot_tupmap(TupleTableSlot *slot, DestReceiver *self); |
| |
| |
| /* |
| * Prepare to receive tuples from executor. |
| */ |
| static void |
| tstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo) |
| { |
| TStoreState *myState = (TStoreState *) self; |
| bool needtoast = false; |
| int natts = typeinfo->natts; |
| int i; |
| |
| /* Check if any columns require detoast work */ |
| if (myState->detoast) |
| { |
| for (i = 0; i < natts; i++) |
| { |
| Form_pg_attribute attr = TupleDescAttr(typeinfo, i); |
| |
| if (attr->attisdropped) |
| continue; |
| if (attr->attlen == -1) |
| { |
| needtoast = true; |
| break; |
| } |
| } |
| } |
| |
| /* Check if tuple conversion is needed */ |
| if (myState->target_tupdesc) |
| myState->tupmap = convert_tuples_by_position(typeinfo, |
| myState->target_tupdesc, |
| myState->map_failure_msg); |
| else |
| myState->tupmap = NULL; |
| |
| /* Set up appropriate callback */ |
| if (needtoast) |
| { |
| Assert(!myState->tupmap); |
| myState->pub.receiveSlot = tstoreReceiveSlot_detoast; |
| /* Create workspace */ |
| myState->outvalues = (Datum *) |
| MemoryContextAlloc(myState->cxt, natts * sizeof(Datum)); |
| myState->tofree = (Datum *) |
| MemoryContextAlloc(myState->cxt, natts * sizeof(Datum)); |
| myState->mapslot = NULL; |
| } |
| else if (myState->tupmap) |
| { |
| myState->pub.receiveSlot = tstoreReceiveSlot_tupmap; |
| myState->outvalues = NULL; |
| myState->tofree = NULL; |
| myState->mapslot = MakeSingleTupleTableSlot(myState->target_tupdesc, |
| &TTSOpsVirtual); |
| } |
| else |
| { |
| myState->pub.receiveSlot = tstoreReceiveSlot_notoast; |
| myState->outvalues = NULL; |
| myState->tofree = NULL; |
| myState->mapslot = NULL; |
| } |
| } |
| |
| /* |
| * Receive a tuple from the executor and store it in the tuplestore. |
| * This is for the easy case where we don't have to detoast nor map anything. |
| */ |
| static bool |
| tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self) |
| { |
| TStoreState *myState = (TStoreState *) self; |
| |
| tuplestore_puttupleslot(myState->tstore, slot); |
| |
| return true; |
| } |
| |
| /* |
| * Receive a tuple from the executor and store it in the tuplestore. |
| * This is for the case where we have to detoast any toasted values. |
| */ |
| static bool |
| tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self) |
| { |
| TStoreState *myState = (TStoreState *) self; |
| TupleDesc typeinfo = slot->tts_tupleDescriptor; |
| int natts = typeinfo->natts; |
| int nfree; |
| int i; |
| MemoryContext oldcxt; |
| |
| /* Make sure the tuple is fully deconstructed */ |
| slot_getallattrs(slot); |
| |
| /* |
| * Fetch back any out-of-line datums. We build the new datums array in |
| * myState->outvalues[] (but we can re-use the slot's isnull array). Also, |
| * remember the fetched values to free afterwards. |
| */ |
| nfree = 0; |
| for (i = 0; i < natts; i++) |
| { |
| Datum val = slot->tts_values[i]; |
| Form_pg_attribute attr = TupleDescAttr(typeinfo, i); |
| |
| if (!attr->attisdropped && attr->attlen == -1 && !slot->tts_isnull[i]) |
| { |
| if (VARATT_IS_EXTERNAL(DatumGetPointer(val))) |
| { |
| val = PointerGetDatum(detoast_external_attr((struct varlena *) |
| DatumGetPointer(val))); |
| myState->tofree[nfree++] = val; |
| } |
| } |
| |
| myState->outvalues[i] = val; |
| } |
| |
| /* |
| * Push the modified tuple into the tuplestore. |
| */ |
| oldcxt = MemoryContextSwitchTo(myState->cxt); |
| tuplestore_putvalues(myState->tstore, typeinfo, |
| myState->outvalues, slot->tts_isnull); |
| MemoryContextSwitchTo(oldcxt); |
| |
| /* And release any temporary detoasted values */ |
| for (i = 0; i < nfree; i++) |
| pfree(DatumGetPointer(myState->tofree[i])); |
| |
| return true; |
| } |
| |
| /* |
| * Receive a tuple from the executor and store it in the tuplestore. |
| * This is for the case where we must apply a tuple conversion map. |
| */ |
| static bool |
| tstoreReceiveSlot_tupmap(TupleTableSlot *slot, DestReceiver *self) |
| { |
| TStoreState *myState = (TStoreState *) self; |
| |
| execute_attr_map_slot(myState->tupmap->attrMap, slot, myState->mapslot); |
| tuplestore_puttupleslot(myState->tstore, myState->mapslot); |
| |
| return true; |
| } |
| |
| /* |
| * Clean up at end of an executor run |
| */ |
| static void |
| tstoreShutdownReceiver(DestReceiver *self) |
| { |
| TStoreState *myState = (TStoreState *) self; |
| |
| /* Release workspace if any */ |
| if (myState->outvalues) |
| pfree(myState->outvalues); |
| myState->outvalues = NULL; |
| if (myState->tofree) |
| pfree(myState->tofree); |
| myState->tofree = NULL; |
| if (myState->tupmap) |
| free_conversion_map(myState->tupmap); |
| myState->tupmap = NULL; |
| if (myState->mapslot) |
| ExecDropSingleTupleTableSlot(myState->mapslot); |
| myState->mapslot = NULL; |
| } |
| |
| /* |
| * Destroy receiver when done with it |
| */ |
| static void |
| tstoreDestroyReceiver(DestReceiver *self) |
| { |
| pfree(self); |
| } |
| |
| /* |
| * Initially create a DestReceiver object. |
| */ |
| DestReceiver * |
| CreateTuplestoreDestReceiver(void) |
| { |
| TStoreState *self = (TStoreState *) palloc0(sizeof(TStoreState)); |
| |
| self->pub.receiveSlot = tstoreReceiveSlot_notoast; /* might change */ |
| self->pub.rStartup = tstoreStartupReceiver; |
| self->pub.rShutdown = tstoreShutdownReceiver; |
| self->pub.rDestroy = tstoreDestroyReceiver; |
| self->pub.mydest = DestTuplestore; |
| |
| /* private fields will be set by SetTuplestoreDestReceiverParams */ |
| |
| return (DestReceiver *) self; |
| } |
| |
| /* |
| * Set parameters for a TuplestoreDestReceiver |
| * |
| * tStore: where to store the tuples |
| * tContext: memory context containing tStore |
| * detoast: forcibly detoast contained data? |
| * target_tupdesc: if not NULL, forcibly convert tuples to this rowtype |
| * map_failure_msg: error message to use if mapping to target_tupdesc fails |
| * |
| * We don't currently support both detoast and target_tupdesc at the same |
| * time, just because no existing caller needs that combination. |
| */ |
| void |
| SetTuplestoreDestReceiverParams(DestReceiver *self, |
| Tuplestorestate *tStore, |
| MemoryContext tContext, |
| bool detoast, |
| TupleDesc target_tupdesc, |
| const char *map_failure_msg) |
| { |
| TStoreState *myState = (TStoreState *) self; |
| |
| Assert(!(detoast && target_tupdesc)); |
| |
| Assert(myState->pub.mydest == DestTuplestore); |
| myState->tstore = tStore; |
| myState->cxt = tContext; |
| myState->detoast = detoast; |
| myState->target_tupdesc = target_tupdesc; |
| myState->map_failure_msg = map_failure_msg; |
| } |
| |
| /* |
| * PersistentTupleStoreReceiver |
| * |
| * This receiver stores tuples in a persistent manner, allowing the |
| * tuples to be retrieved later even after the transaction has been |
| * committed. |
| */ |
| typedef struct PersistentTupleStore |
| { |
| DestReceiver pub; /* base class */ |
| ResourceOwner owner; /* resource owner for this receiver */ |
| MemoryContext cxt; /* context containing tstore */ |
| Tuplestorestate *tstore; /* state for storing tuples */ |
| SharedFileSet *fileset; /* shared fileset for storing tuples */ |
| const char *filename; /* filename for storing tuples */ |
| bool detoast; /* were we told to detoast? */ |
| bool initfile; /* is this the first time to init file? */ |
| Datum *outvalues; /* values array for result tuple */ |
| Datum *tofree; /* temp values to be pfree'd */ |
| } PersistentTupleStore; |
| |
| |
| static bool |
| persistentTstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self) |
| { |
| PersistentTupleStore *myState = (PersistentTupleStore *) self; |
| |
| if (!myState->initfile) |
| { |
| /* init file */ |
| Assert(myState->fileset); |
| Assert(myState->filename); |
| tuplestore_make_sharedV2(myState->tstore, myState->fileset, myState->filename, myState->owner); |
| myState->initfile = true; |
| } |
| tuplestore_puttupleslot(myState->tstore, slot); |
| |
| return true; |
| } |
| |
| static bool |
| persistentTstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self) |
| { |
| PersistentTupleStore *myState = (PersistentTupleStore *) self; |
| TupleDesc typeinfo = slot->tts_tupleDescriptor; |
| int natts = typeinfo->natts; |
| int nfree; |
| int i; |
| MemoryContext oldcxt; |
| |
| if (!myState->initfile) |
| { |
| /* init file */ |
| Assert(myState->fileset); |
| Assert(myState->filename); |
| tuplestore_make_sharedV2(myState->tstore, myState->fileset, myState->filename, myState->owner); |
| myState->initfile = true; |
| } |
| |
| /* Make sure the tuple is fully deconstructed */ |
| slot_getallattrs(slot); |
| |
| /* |
| * Fetch back any out-of-line datums. We build the new datums array in |
| * myState->outvalues[] (but we can re-use the slot's isnull array). Also, |
| * remember the fetched values to free afterwards. |
| */ |
| nfree = 0; |
| for (i = 0; i < natts; i++) |
| { |
| Datum val = slot->tts_values[i]; |
| Form_pg_attribute attr = TupleDescAttr(typeinfo, i); |
| |
| if (!attr->attisdropped && attr->attlen == -1 && !slot->tts_isnull[i]) |
| { |
| if (VARATT_IS_EXTERNAL(DatumGetPointer(val))) |
| { |
| val = PointerGetDatum(detoast_external_attr((struct varlena *) |
| DatumGetPointer(val))); |
| myState->tofree[nfree++] = val; |
| } |
| } |
| |
| myState->outvalues[i] = val; |
| } |
| |
| /* |
| * Push the modified tuple into the tuplestore. |
| */ |
| oldcxt = MemoryContextSwitchTo(myState->cxt); |
| tuplestore_putvalues(myState->tstore, typeinfo, |
| myState->outvalues, slot->tts_isnull); |
| MemoryContextSwitchTo(oldcxt); |
| |
| /* And release any temporary detoasted values */ |
| for (i = 0; i < nfree; i++) |
| pfree(DatumGetPointer(myState->tofree[i])); |
| |
| return true; |
| } |
| |
| static void |
| persistentTstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo) |
| { |
| bool needtoast = false; |
| int natts = typeinfo->natts; |
| int i; |
| PersistentTupleStore *myState = (PersistentTupleStore *) self; |
| MemoryContext oldcxt; |
| |
| oldcxt = MemoryContextSwitchTo(myState->cxt); |
| /* create in-memory tuplestore */ |
| if (myState->tstore == NULL) |
| myState->tstore = tuplestore_begin_heap(true, false, work_mem); |
| MemoryContextSwitchTo(oldcxt); |
| |
| if (myState->fileset == NULL) |
| myState->fileset = get_shareinput_fileset(); |
| |
| /* create tuplestore on disk */ |
| Assert(myState->filename); |
| Assert(myState->owner); |
| |
| /* Check if any columns require detoast work */ |
| if (myState->detoast) |
| { |
| for (i = 0; i < natts; i++) |
| { |
| Form_pg_attribute attr = TupleDescAttr(typeinfo, i); |
| |
| if (attr->attisdropped) |
| continue; |
| if (attr->attlen == -1) |
| { |
| needtoast = true; |
| break; |
| } |
| } |
| } |
| |
| /* Set up appropriate callback */ |
| if (needtoast) |
| { |
| myState->pub.receiveSlot = persistentTstoreReceiveSlot_detoast; |
| /* Create workspace */ |
| myState->outvalues = (Datum *) |
| MemoryContextAlloc(myState->cxt, natts * sizeof(Datum)); |
| myState->tofree = (Datum *) |
| MemoryContextAlloc(myState->cxt, natts * sizeof(Datum)); |
| } |
| else |
| { |
| myState->pub.receiveSlot = persistentTstoreReceiveSlot_notoast; |
| myState->outvalues = NULL; |
| myState->tofree = NULL; |
| } |
| |
| } |
| |
| static void |
| persistentTstoreShutdownReceiver(DestReceiver *self) |
| { |
| PersistentTupleStore *myState = (PersistentTupleStore *) self; |
| |
| if (myState->initfile) |
| { |
| /* Freeze tuplestore to file */ |
| tuplestore_freeze(myState->tstore); |
| |
| /* Set file to temporary to release file as soon as possible. */ |
| tuplestore_set_flags(myState->tstore, true); |
| } |
| /* Release workspace if any */ |
| if (myState->outvalues) |
| pfree(myState->outvalues); |
| myState->outvalues = NULL; |
| if (myState->tofree) |
| pfree(myState->tofree); |
| myState->tofree = NULL; |
| } |
| |
| static void |
| persistentTstoreDestroyReceiver(DestReceiver *self) |
| { |
| pfree(self); |
| } |
| |
| DestReceiver * |
| CreatePersistentTstoreDestReceiver(void) |
| { |
| PersistentTupleStore *self = palloc0(sizeof(PersistentTupleStore)); |
| |
| self->pub.receiveSlot = persistentTstoreReceiveSlot_notoast; /* might change */ |
| self->pub.rStartup = persistentTstoreStartupReceiver; |
| self->pub.rShutdown = persistentTstoreShutdownReceiver; |
| self->pub.rDestroy = persistentTstoreDestroyReceiver; |
| self->pub.mydest = DestPersistentstore; |
| |
| return (DestReceiver *) self; |
| } |
| |
| void |
| SetPersistentTstoreDestReceiverParams(DestReceiver *self, |
| Tuplestorestate *tStore, |
| ResourceOwner owner, |
| MemoryContext tContext, |
| bool detoast, |
| const char *filename) |
| { |
| PersistentTupleStore *myState = (PersistentTupleStore *) self; |
| |
| Assert(myState->pub.mydest == DestPersistentstore); |
| myState->tstore = tStore; |
| myState->owner = owner; |
| myState->cxt = tContext; |
| myState->detoast = detoast; |
| myState->filename = filename; |
| myState->initfile = false; |
| } |