Add support for customizing the max number of records per record batch (#27)
Closes GH-26
diff --git a/src/afs.cc b/src/afs.cc
index 79597a2..bb8b5fc 100644
--- a/src/afs.cc
+++ b/src/afs.cc
@@ -84,6 +84,9 @@
static const int SessionTimeoutDefault = 300;
static int SessionTimeout;
+static const int MaxNRowsPerRecordBatchDefault = 1 * 1024 * 1024; // Boot value passed to DefineCustomIntVariable() for arrow_flight_sql.max_n_rows_per_record_batch.
+static int MaxNRowsPerRecordBatch; // Backing variable of that setting; the data writer flushes a record batch every this many rows.
+
static volatile sig_atomic_t GotSIGTERM = false;
void afs_sigterm(SIGNAL_ARGS)
{
@@ -93,12 +96,21 @@
errno = errnoSaved;
}
+static volatile sig_atomic_t GotSIGHUP = false; // Set by afs_sighup(); the latch loops clear it and then call ProcessConfigFile(PGC_SIGHUP).
+void afs_sighup(SIGNAL_ARGS) // SIGHUP handler installed via pqsignal() in afs_executor(), afs_server() and afs_main().
+{
+ auto errnoSaved = errno; // Saved here and restored below so the handler leaves errno unchanged.
+ GotSIGHUP = true; // Only record the request; the configuration reload itself runs in the latch loop.
+ SetLatch(MyLatch); // Wake up the WaitLatch() loop so that it notices the flag.
+ errno = errnoSaved;
+}
+
static volatile sig_atomic_t GotSIGUSR1 = false;
void afs_sigusr1(SIGNAL_ARGS)
{
procsignal_sigusr1_handler(postgres_signal_arg);
- GotSIGUSR1 = true;
auto errnoSaved = errno;
+ GotSIGUSR1 = true; // Now set after errno is saved, in the same order as afs_sighup().
SetLatch(MyLatch);
errno = errnoSaved;
}
@@ -689,13 +701,7 @@
// Write schema only stream format data to return only schema.
ARROW_ASSIGN_OR_RAISE(auto writer,
arrow::ipc::MakeStreamWriter(&output, schema, option));
- // Build 1 null row to write schema.
- for (uint64_t iAttribute = 0; iAttribute < SPI_tuptable->tupdesc->natts;
- ++iAttribute)
- {
- auto arrayBuilder = builder->GetField(iAttribute);
- ARROW_RETURN_NOT_OK(arrayBuilder->AppendNull());
- }
+ // Build an empty record batch to write schema.
ARROW_ASSIGN_OR_RAISE(auto recordBatch, builder->Flush());
P("%s: %s: write: schema: WriteRecordBatch", Tag, tag_);
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
@@ -705,6 +711,7 @@
// Write another stream format data with record batches.
ARROW_ASSIGN_OR_RAISE(writer,
arrow::ipc::MakeStreamWriter(&output, schema, option));
+ bool needLastFlush = false;
for (uint64_t iTuple = 0; iTuple < SPI_processed; ++iTuple)
{
P("%s: %s: write: data: record batch: %d/%d",
@@ -739,10 +746,21 @@
ARROW_RETURN_NOT_OK(arrayBuilder->Append(DatumGetInt32(datum)));
}
}
+
+ if (((iTuple + 1) % MaxNRowsPerRecordBatch) == 0) {
+ ARROW_ASSIGN_OR_RAISE(recordBatch, builder->Flush());
+ P("%s: %s: write: data: WriteRecordBatch: %d/%d", Tag, tag_, iTuple, SPI_processed);
+ ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
+ needLastFlush = false;
+ } else {
+ needLastFlush = true;
+ }
}
- ARROW_ASSIGN_OR_RAISE(recordBatch, builder->Flush());
- P("%s: %s: write: data: WriteRecordBatch", Tag, tag_);
- ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
+ if (needLastFlush) {
+ ARROW_ASSIGN_OR_RAISE(recordBatch, builder->Flush());
+ P("%s: %s: write: data: WriteRecordBatch", Tag, tag_);
+ ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
+ }
P("%s: %s: write: data: Close", Tag, tag_);
ARROW_RETURN_NOT_OK(writer->Close());
return output.Close();
@@ -1002,6 +1020,12 @@
WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
+ if (GotSIGHUP)
+ {
+ GotSIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
if (GotSIGUSR1)
{
GotSIGUSR1 = false;
@@ -1022,6 +1046,7 @@
afs_executor(Datum arg)
{
pqsignal(SIGTERM, afs_sigterm);
+ pqsignal(SIGHUP, afs_sighup);
pqsignal(SIGUSR1, afs_sigusr1);
BackgroundWorkerUnblockSignals();
@@ -1045,6 +1070,12 @@
ResetLatch(MyLatch);
+ if (GotSIGHUP)
+ {
+ GotSIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
if (GotSIGUSR1)
{
GotSIGUSR1 = false;
@@ -1064,6 +1095,7 @@
afs_server(Datum arg)
{
pqsignal(SIGTERM, afs_sigterm);
+ pqsignal(SIGHUP, afs_sighup);
pqsignal(SIGUSR1, afs_sigusr1);
BackgroundWorkerUnblockSignals();
@@ -1088,6 +1120,7 @@
afs_main(Datum arg)
{
pqsignal(SIGTERM, afs_sigterm);
+ pqsignal(SIGHUP, afs_sighup);
pqsignal(SIGUSR1, afs_sigusr1);
BackgroundWorkerUnblockSignals();
@@ -1099,6 +1132,12 @@
WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
+ if (GotSIGHUP)
+ {
+ GotSIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
if (GotSIGUSR1)
{
GotSIGUSR1 = false;
@@ -1126,7 +1165,7 @@
(std::string("default: ") + URIDefault).c_str(),
&URI,
URIDefault,
- PGC_USERSET,
+ PGC_POSTMASTER,
0,
NULL,
NULL,
@@ -1140,12 +1179,25 @@
SessionTimeoutDefault,
-1,
INT_MAX,
- PGC_SIGHUP,
+ PGC_USERSET,
GUC_UNIT_S,
NULL,
NULL,
NULL);
+ DefineCustomIntVariable("arrow_flight_sql.max_n_rows_per_record_batch",
+ "The maximum number of rows per record batch.",
+ "The default is 1 * 1024 * 1024 rows.",
+ &MaxNRowsPerRecordBatch,
+ MaxNRowsPerRecordBatchDefault,
+ 1,
+ INT_MAX,
+ PGC_USERSET,
+ 0,
+ NULL,
+ NULL,
+ NULL);
+
PreviousShmemRequestHook = shmem_request_hook;
shmem_request_hook = afs_shmem_request_hook;