[BEAM-6627] Add size reporting to JdbcIOIT (#10267)


diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
index c324c4d..56b7230 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
@@ -18,10 +18,12 @@
 package org.apache.beam.sdk.io.common;
 
 import java.sql.Connection;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.Optional;
 import javax.sql.DataSource;
 import org.postgresql.ds.PGSimpleDataSource;
 
@@ -79,6 +81,21 @@
         options.getPostgresDatabaseName());
   }
 
+  public static Optional<Long> getPostgresTableSize(DataSource dataSource, String tableName) {
+    try (Connection connection = dataSource.getConnection()) {
+      try (Statement statement = connection.createStatement()) {
+        ResultSet resultSet =
+            statement.executeQuery(String.format("select pg_relation_size('%s')", tableName));
+        if (resultSet.next()) {
+          return Optional.of(resultSet.getLong(1));
+        }
+      }
+    } catch (SQLException e) {
+      return Optional.empty();
+    }
+    return Optional.empty();
+  }
+
   public static void createTableWithStatement(DataSource dataSource, String stmt)
       throws SQLException {
     try (Connection connection = dataSource.getConnection()) {
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
index 311cfe0..81b9799 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -24,6 +24,7 @@
 import java.sql.SQLException;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
@@ -83,6 +84,7 @@
   private static String tableName;
   private static String bigQueryDataset;
   private static String bigQueryTable;
+  private static Long tableSize;
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
@@ -97,6 +99,7 @@
     dataSource = DatabaseTestHelper.getPostgresDataSource(options);
     tableName = DatabaseTestHelper.getTestTableName("IT");
     executeWithRetry(JdbcIOIT::createTable);
+    tableSize = DatabaseTestHelper.getPostgresTableSize(dataSource, tableName).orElse(0L);
   }
 
   private static void createTable() throws SQLException {
@@ -141,6 +144,9 @@
   private Set<Function<MetricsReader, NamedTestResult>> getWriteMetricSuppliers(
       String uuid, String timestamp) {
     Set<Function<MetricsReader, NamedTestResult>> suppliers = new HashSet<>();
+    Optional<Long> postgresTableSize =
+        DatabaseTestHelper.getPostgresTableSize(dataSource, tableName);
+
     suppliers.add(
         reader -> {
           long writeStart = reader.getStartTimeMetric("write_time");
@@ -148,6 +154,13 @@
           return NamedTestResult.create(
               uuid, timestamp, "write_time", (writeEnd - writeStart) / 1e3);
         });
+
+    postgresTableSize.ifPresent(
+        tableFinalSize ->
+            suppliers.add(
+                ignore ->
+                    NamedTestResult.create(
+                        uuid, timestamp, "total_size", tableFinalSize - tableSize)));
     return suppliers;
   }