[BEAM-6627] Add size reporting to JdbcIOIT (#10267)
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
index c324c4d..56b7230 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
@@ -18,10 +18,12 @@
package org.apache.beam.sdk.io.common;
import java.sql.Connection;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.Date;
+import java.util.Optional;
import javax.sql.DataSource;
import org.postgresql.ds.PGSimpleDataSource;
@@ -79,6 +81,21 @@
options.getPostgresDatabaseName());
}
+ public static Optional<Long> getPostgresTableSize(DataSource dataSource, String tableName) {
+ try (Connection connection = dataSource.getConnection()) {
+ try (Statement statement = connection.createStatement()) {
+ ResultSet resultSet =
+ statement.executeQuery(String.format("select pg_relation_size('%s')", tableName));
+ if (resultSet.next()) {
+ return Optional.of(resultSet.getLong(1));
+ }
+ }
+ } catch (SQLException e) {
+ return Optional.empty();
+ }
+ return Optional.empty();
+ }
+
public static void createTableWithStatement(DataSource dataSource, String stmt)
throws SQLException {
try (Connection connection = dataSource.getConnection()) {
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
index 311cfe0..81b9799 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -24,6 +24,7 @@
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
@@ -83,6 +84,7 @@
private static String tableName;
private static String bigQueryDataset;
private static String bigQueryTable;
+ private static Long tableSize;
@Rule public TestPipeline pipelineWrite = TestPipeline.create();
@Rule public TestPipeline pipelineRead = TestPipeline.create();
@@ -97,6 +99,7 @@
dataSource = DatabaseTestHelper.getPostgresDataSource(options);
tableName = DatabaseTestHelper.getTestTableName("IT");
executeWithRetry(JdbcIOIT::createTable);
+ tableSize = DatabaseTestHelper.getPostgresTableSize(dataSource, tableName).orElse(0L);
}
private static void createTable() throws SQLException {
@@ -141,6 +144,9 @@
private Set<Function<MetricsReader, NamedTestResult>> getWriteMetricSuppliers(
String uuid, String timestamp) {
Set<Function<MetricsReader, NamedTestResult>> suppliers = new HashSet<>();
+ Optional<Long> postgresTableSize =
+ DatabaseTestHelper.getPostgresTableSize(dataSource, tableName);
+
suppliers.add(
reader -> {
long writeStart = reader.getStartTimeMetric("write_time");
@@ -148,6 +154,13 @@
return NamedTestResult.create(
uuid, timestamp, "write_time", (writeEnd - writeStart) / 1e3);
});
+
+ postgresTableSize.ifPresent(
+ tableFinalSize ->
+ suppliers.add(
+ ignore ->
+ NamedTestResult.create(
+ uuid, timestamp, "total_size", tableFinalSize - tableSize)));
return suppliers;
}