Add support for customizing the max number of records per record batch (#27)

Closes GH-26
diff --git a/src/afs.cc b/src/afs.cc
index 79597a2..bb8b5fc 100644
--- a/src/afs.cc
+++ b/src/afs.cc
@@ -84,6 +84,9 @@
 static const int SessionTimeoutDefault = 300;
 static int SessionTimeout;
 
+static const int MaxNRowsPerRecordBatchDefault = 1 * 1024 * 1024;
+static int MaxNRowsPerRecordBatch;
+
 static volatile sig_atomic_t GotSIGTERM = false;
 void afs_sigterm(SIGNAL_ARGS)
 {
@@ -93,12 +96,21 @@
 	errno = errnoSaved;
 }
 
+static volatile sig_atomic_t GotSIGHUP = false;
+void afs_sighup(SIGNAL_ARGS)
+{
+	auto errnoSaved = errno;
+	GotSIGHUP = true;
+	SetLatch(MyLatch);
+	errno = errnoSaved;
+}
+
 static volatile sig_atomic_t GotSIGUSR1 = false;
 void afs_sigusr1(SIGNAL_ARGS)
 {
 	procsignal_sigusr1_handler(postgres_signal_arg);
-	GotSIGUSR1 = true;
 	auto errnoSaved = errno;
+	GotSIGUSR1 = true;
 	SetLatch(MyLatch);
 	errno = errnoSaved;
 }
@@ -689,13 +701,7 @@
 		// Write schema only stream format data to return only schema.
 		ARROW_ASSIGN_OR_RAISE(auto writer,
 		                      arrow::ipc::MakeStreamWriter(&output, schema, option));
-		// Build 1 null row to write schema.
-		for (uint64_t iAttribute = 0; iAttribute < SPI_tuptable->tupdesc->natts;
-		     ++iAttribute)
-		{
-			auto arrayBuilder = builder->GetField(iAttribute);
-			ARROW_RETURN_NOT_OK(arrayBuilder->AppendNull());
-		}
+		// Build an empty record batch to write schema.
 		ARROW_ASSIGN_OR_RAISE(auto recordBatch, builder->Flush());
 		P("%s: %s: write: schema: WriteRecordBatch", Tag, tag_);
 		ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
@@ -705,6 +711,7 @@
 		// Write another stream format data with record batches.
 		ARROW_ASSIGN_OR_RAISE(writer,
 		                      arrow::ipc::MakeStreamWriter(&output, schema, option));
+		bool needLastFlush = false;
 		for (uint64_t iTuple = 0; iTuple < SPI_processed; ++iTuple)
 		{
 			P("%s: %s: write: data: record batch: %d/%d",
@@ -739,10 +746,21 @@
 					ARROW_RETURN_NOT_OK(arrayBuilder->Append(DatumGetInt32(datum)));
 				}
 			}
+
+			if (((iTuple + 1) % MaxNRowsPerRecordBatch) == 0) {
+				ARROW_ASSIGN_OR_RAISE(recordBatch, builder->Flush());
+				P("%s: %s: write: data: WriteRecordBatch: %d/%d", Tag, tag_, iTuple, SPI_processed);
+				ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
+				needLastFlush = false;
+			} else {
+				needLastFlush = true;
+			}
 		}
-		ARROW_ASSIGN_OR_RAISE(recordBatch, builder->Flush());
-		P("%s: %s: write: data: WriteRecordBatch", Tag, tag_);
-		ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
+		if (needLastFlush) {
+			ARROW_ASSIGN_OR_RAISE(recordBatch, builder->Flush());
+			P("%s: %s: write: data: WriteRecordBatch", Tag, tag_);
+			ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
+		}
 		P("%s: %s: write: data: Close", Tag, tag_);
 		ARROW_RETURN_NOT_OK(writer->Close());
 		return output.Close();
@@ -1002,6 +1020,12 @@
 		WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1, PG_WAIT_EXTENSION);
 		ResetLatch(MyLatch);
 
+		if (GotSIGHUP)
+		{
+			GotSIGHUP = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+
 		if (GotSIGUSR1)
 		{
 			GotSIGUSR1 = false;
@@ -1022,6 +1046,7 @@
 afs_executor(Datum arg)
 {
 	pqsignal(SIGTERM, afs_sigterm);
+	pqsignal(SIGHUP, afs_sighup);
 	pqsignal(SIGUSR1, afs_sigusr1);
 	BackgroundWorkerUnblockSignals();
 
@@ -1045,6 +1070,12 @@
 
 			ResetLatch(MyLatch);
 
+			if (GotSIGHUP)
+			{
+				GotSIGHUP = false;
+				ProcessConfigFile(PGC_SIGHUP);
+			}
+
 			if (GotSIGUSR1)
 			{
 				GotSIGUSR1 = false;
@@ -1064,6 +1095,7 @@
 afs_server(Datum arg)
 {
 	pqsignal(SIGTERM, afs_sigterm);
+	pqsignal(SIGHUP, afs_sighup);
 	pqsignal(SIGUSR1, afs_sigusr1);
 	BackgroundWorkerUnblockSignals();
 
@@ -1088,6 +1120,7 @@
 afs_main(Datum arg)
 {
 	pqsignal(SIGTERM, afs_sigterm);
+	pqsignal(SIGHUP, afs_sighup);
 	pqsignal(SIGUSR1, afs_sigusr1);
 	BackgroundWorkerUnblockSignals();
 
@@ -1099,6 +1132,12 @@
 			WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1, PG_WAIT_EXTENSION);
 			ResetLatch(MyLatch);
 
+			if (GotSIGHUP)
+			{
+				GotSIGHUP = false;
+				ProcessConfigFile(PGC_SIGHUP);
+			}
+
 			if (GotSIGUSR1)
 			{
 				GotSIGUSR1 = false;
@@ -1126,7 +1165,7 @@
 	                           (std::string("default: ") + URIDefault).c_str(),
 	                           &URI,
 	                           URIDefault,
-	                           PGC_USERSET,
+	                           PGC_POSTMASTER,
 	                           0,
 	                           NULL,
 	                           NULL,
@@ -1140,12 +1179,25 @@
 	                        SessionTimeoutDefault,
 	                        -1,
 	                        INT_MAX,
-	                        PGC_SIGHUP,
+	                        PGC_USERSET,
 	                        GUC_UNIT_S,
 	                        NULL,
 	                        NULL,
 	                        NULL);
 
+	DefineCustomIntVariable("arrow_flight_sql.max_n_rows_per_record_batch",
+	                        "The maximum number of rows per record batch.",
+	                        "The default is 1 * 1024 * 1024 rows.",
+	                        &MaxNRowsPerRecordBatch,
+	                        MaxNRowsPerRecordBatchDefault,
+	                        1,
+	                        INT_MAX,
+	                        PGC_USERSET,
+	                        0,
+	                        NULL,
+	                        NULL,
+	                        NULL);
+
 	PreviousShmemRequestHook = shmem_request_hook;
 	shmem_request_hook = afs_shmem_request_hook;