Merge pull request #21944 from [cherry-pick][release-2.40.0][21941] Fix no output timestamp case

[cherry-pick][release-2.40.0][21941] Fix no output timestamp case
diff --git a/CHANGES.md b/CHANGES.md
index 9cb2830..97fbd3f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -75,6 +75,7 @@
 
 * The Go Sdk now requires a minimum version of 1.18 in order to support generics ([BEAM-14347](https://issues.apache.org/jira/browse/BEAM-14347)).
 * synthetic.SourceConfig field types have changed to int64 from int for better compatibility with Flink's use of Logical types in Schemas (Go) ([BEAM-14173](https://issues.apache.org/jira/browse/BEAM-14173))
+* Default coder updated to compress sources used with `BoundedSourceAsSDFWrapperFn` and `UnboundedSourceAsSDFWrapper`.
 
 ## Deprecations
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index b300f31..4b81806 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -34,6 +34,7 @@
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.SnappyCoder;
 import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark.NoopCheckpointMark;
@@ -148,7 +149,7 @@
           .getPipeline()
           .apply(Impulse.create())
           .apply(ParDo.of(new OutputSingleSource<>(source)))
-          .setCoder(SerializableCoder.of(new TypeDescriptor<BoundedSource<T>>() {}))
+          .setCoder(SnappyCoder.of(SerializableCoder.of(new TypeDescriptor<BoundedSource<T>>() {})))
           .apply(ParDo.of(new BoundedSourceAsSDFWrapperFn<>()))
           .setCoder(source.getOutputCoder())
           .setTypeDescriptor(source.getOutputCoder().getEncodedTypeDescriptor());
@@ -216,7 +217,9 @@
               .apply(Impulse.create())
               .apply(ParDo.of(new OutputSingleSource<>(source)))
               .setCoder(
-                  SerializableCoder.of(new TypeDescriptor<UnboundedSource<T, CheckpointMark>>() {}))
+                  SnappyCoder.of(
+                      SerializableCoder.of(
+                          new TypeDescriptor<UnboundedSource<T, CheckpointMark>>() {})))
               .apply(ParDo.of(createUnboundedSdfWrapper()))
               .setCoder(ValueWithRecordIdCoder.of(source.getOutputCoder()));
 
@@ -314,7 +317,7 @@
 
     @GetRestrictionCoder
     public Coder<BoundedSourceT> restrictionCoder() {
-      return SerializableCoder.of(new TypeDescriptor<BoundedSourceT>() {});
+      return SnappyCoder.of(SerializableCoder.of(new TypeDescriptor<BoundedSourceT>() {}));
     }
 
     /**
@@ -600,9 +603,10 @@
 
     @GetRestrictionCoder
     public Coder<UnboundedSourceRestriction<OutputT, CheckpointT>> restrictionCoder() {
-      return new UnboundedSourceRestrictionCoder<>(
-          SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointT>>() {}),
-          NullableCoder.of(checkpointCoder));
+      return SnappyCoder.of(
+          new UnboundedSourceRestrictionCoder<>(
+              SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointT>>() {}),
+              NullableCoder.of(checkpointCoder)));
     }
 
     /**
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 3a37a36..0a62245 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -33,6 +33,7 @@
 import java.io.IOException;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -516,7 +517,9 @@
 
   static void verifyTableNotExistOrEmpty(DatasetService datasetService, TableReference tableRef) {
     try {
-      if (datasetService.getTable(tableRef) != null) {
+      if (datasetService.getTable(
+              tableRef, Collections.emptyList(), DatasetService.TableMetadataView.BASIC)
+          != null) {
         checkState(
             datasetService.isTableEmpty(tableRef),
             "BigQuery table is not empty: %s.",
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index dd4da2b..a0484cc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -111,6 +111,16 @@
 
   /** An interface to get, create and delete Cloud BigQuery datasets and tables. */
   public interface DatasetService extends AutoCloseable {
+
+    // Mirrors the TableMetadataView values of the BigQuery tables.get REST API, see
+    // https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get#TableMetadataView
+    enum TableMetadataView {
+      TABLE_METADATA_VIEW_UNSPECIFIED,
+      BASIC,
+      STORAGE_STATS,
+      FULL;
+    };
+
     /**
      * Gets the specified {@link Table} resource by table ID.
      *
@@ -123,6 +133,10 @@
     Table getTable(TableReference tableRef, List<String> selectedFields)
         throws InterruptedException, IOException;
 
+    @Nullable
+    Table getTable(TableReference tableRef, List<String> selectedFields, TableMetadataView view)
+        throws InterruptedException, IOException;
+
     /** Creates the specified table if it does not exist. */
     void createTable(Table table) throws InterruptedException, IOException;
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 50f3988..a50947a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -616,13 +616,24 @@
     @Override
     public @Nullable Table getTable(TableReference tableRef, List<String> selectedFields)
         throws IOException, InterruptedException {
-      return getTable(tableRef, selectedFields, createDefaultBackoff(), Sleeper.DEFAULT);
+      return getTable(tableRef, selectedFields, TableMetadataView.STORAGE_STATS);
+    }
+
+    @Override
+    public @Nullable Table getTable(
+        TableReference tableRef, List<String> selectedFields, TableMetadataView view)
+        throws IOException, InterruptedException {
+      return getTable(tableRef, selectedFields, view, createDefaultBackoff(), Sleeper.DEFAULT);
     }
 
     @VisibleForTesting
     @Nullable
     Table getTable(
-        TableReference ref, List<String> selectedFields, BackOff backoff, Sleeper sleeper)
+        TableReference ref,
+        List<String> selectedFields,
+        TableMetadataView view,
+        BackOff backoff,
+        Sleeper sleeper)
         throws IOException, InterruptedException {
       Tables.Get get =
           client
@@ -632,6 +643,9 @@
       if (!selectedFields.isEmpty()) {
         get.setSelectedFields(String.join(",", selectedFields));
       }
+      if (view != null) {
+        get.set("view", view.name());
+      }
       try {
         return executeWithRetries(
             get,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
index 403554b..426dfbb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
@@ -112,7 +112,9 @@
     tableReference.setTableId(BigQueryHelpers.stripPartitionDecorator(tableReference.getTableId()));
     try (DatasetService datasetService =
         bqServices.getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class))) {
-      if (datasetService.getTable(tableReference) == null) {
+      if (datasetService.getTable(
+              tableReference, Collections.emptyList(), DatasetService.TableMetadataView.BASIC)
+          == null) {
         TableSchema tableSchema = schemaSupplier.get();
         Preconditions.checkArgumentNotNull(
             tableSchema,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
index c7fca47..ba6063d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
@@ -22,6 +22,7 @@
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.auto.value.AutoValue;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
@@ -166,7 +167,12 @@
     if (!schemaHolder.isPresent()) {
       // Not initialized. Query the new schema with the monitor released and then update the cache.
       try {
-        @Nullable Table table = datasetService.getTable(tableReference);
+        // Requesting the BASIC view prevents the BQ backend from running calculations
+        // related to storage stats, which are not needed here.
+        @Nullable
+        Table table =
+            datasetService.getTable(
+                tableReference, Collections.emptyList(), DatasetService.TableMetadataView.BASIC);
         schemaHolder =
             Optional.ofNullable((table == null) ? null : SchemaHolder.of(table.getSchema(), 0));
       } catch (Exception e) {
@@ -298,7 +304,12 @@
     Map<String, TableSchema> schemas = Maps.newHashMapWithExpectedSize(tables.size());
     for (Map.Entry<String, Refresh> entry : tables.entrySet()) {
       TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey());
-      Table table = entry.getValue().getDatasetService().getTable(tableReference);
+      Table table =
+          entry
+              .getValue()
+              .getDatasetService()
+              .getTable(
+                  tableReference, Collections.emptyList(), DatasetService.TableMetadataView.BASIC);
       if (table == null) {
         throw new RuntimeException("Did not get value for table " + tableReference);
       }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 1b4d03d..213d649 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -173,6 +173,23 @@
   @Override
   public Table getTable(TableReference tableRef, @Nullable List<String> selectedFields)
       throws InterruptedException, IOException {
+    return getTable(tableRef, selectedFields, null);
+  }
+
+  @Override
+  public Table getTable(
+      TableReference tableRef,
+      @Nullable List<String> selectedFields,
+      @Nullable TableMetadataView view)
+      throws InterruptedException, IOException {
+    return getTableImpl(tableRef, selectedFields, view);
+  }
+
+  public Table getTableImpl(
+      TableReference tableRef,
+      @Nullable List<String> selectedFields,
+      @Nullable TableMetadataView view)
+      throws InterruptedException, IOException {
     synchronized (FakeDatasetService.class) {
       Map<String, TableContainer> dataset =
           tables.get(tableRef.getProjectId(), tableRef.getDatasetId());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 6007d6a..a57299b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -474,7 +474,7 @@
 
     Table table =
         datasetService.getTable(
-            tableRef, Collections.emptyList(), BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
+            tableRef, Collections.emptyList(), null, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
 
     assertEquals(testTable, table);
     verifyAllResponsesAreRead();
@@ -499,7 +499,7 @@
             .setTableId("tableId");
     Table table =
         datasetService.getTable(
-            tableRef, Collections.emptyList(), BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
+            tableRef, Collections.emptyList(), null, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
 
     assertNull(table);
     verifyAllResponsesAreRead();
@@ -526,7 +526,7 @@
         new BigQueryServicesImpl.DatasetServiceImpl(
             bigquery, null, PipelineOptionsFactory.create());
     datasetService.getTable(
-        tableRef, Collections.emptyList(), BackOff.STOP_BACKOFF, Sleeper.DEFAULT);
+        tableRef, Collections.emptyList(), null, BackOff.STOP_BACKOFF, Sleeper.DEFAULT);
   }
 
   @Test
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index 3895c8c..cef9391 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -125,6 +125,7 @@
     when(mockClient.tables()).thenReturn(mockTables);
     when(mockTables.get(anyString(), anyString(), anyString())).thenReturn(mockTablesGet);
     when(mockTablesGet.setPrettyPrint(false)).thenReturn(mockTablesGet);
+    when(mockTablesGet.set(anyString(), anyString())).thenReturn(mockTablesGet);
     when(mockTablesGet.execute()).thenReturn(table);
   }
 
@@ -132,6 +133,7 @@
     verify(mockClient).tables();
     verify(mockTables).get("project", "dataset", "table");
     verify(mockTablesGet, atLeastOnce()).setPrettyPrint(false);
+    verify(mockTablesGet, atLeastOnce()).set(anyString(), anyString());
     verify(mockTablesGet, atLeastOnce()).execute();
   }