Merge pull request #21944 from [cherry-pick][release-2.40.0][21941] Fix no output timestamp case
[cherry-pick][release-2.40.0][21941] Fix no output timestamp case
diff --git a/CHANGES.md b/CHANGES.md
index 9cb2830..97fbd3f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -75,6 +75,7 @@
* The Go Sdk now requires a minimum version of 1.18 in order to support generics ([BEAM-14347](https://issues.apache.org/jira/browse/BEAM-14347)).
* synthetic.SourceConfig field types have changed to int64 from int for better compatibility with Flink's use of Logical types in Schemas (Go) ([BEAM-14173](https://issues.apache.org/jira/browse/BEAM-14173))
+* The default coder now Snappy-compresses the serialized sources used with `BoundedSourceAsSDFWrapperFn` and `UnboundedSourceAsSDFWrapperFn`.
## Deprecations
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index b300f31..4b81806 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -34,6 +34,7 @@
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.SnappyCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark.NoopCheckpointMark;
@@ -148,7 +149,7 @@
.getPipeline()
.apply(Impulse.create())
.apply(ParDo.of(new OutputSingleSource<>(source)))
- .setCoder(SerializableCoder.of(new TypeDescriptor<BoundedSource<T>>() {}))
+ .setCoder(SnappyCoder.of(SerializableCoder.of(new TypeDescriptor<BoundedSource<T>>() {})))
.apply(ParDo.of(new BoundedSourceAsSDFWrapperFn<>()))
.setCoder(source.getOutputCoder())
.setTypeDescriptor(source.getOutputCoder().getEncodedTypeDescriptor());
@@ -216,7 +217,9 @@
.apply(Impulse.create())
.apply(ParDo.of(new OutputSingleSource<>(source)))
.setCoder(
- SerializableCoder.of(new TypeDescriptor<UnboundedSource<T, CheckpointMark>>() {}))
+ SnappyCoder.of(
+ SerializableCoder.of(
+ new TypeDescriptor<UnboundedSource<T, CheckpointMark>>() {})))
.apply(ParDo.of(createUnboundedSdfWrapper()))
.setCoder(ValueWithRecordIdCoder.of(source.getOutputCoder()));
@@ -314,7 +317,7 @@
@GetRestrictionCoder
public Coder<BoundedSourceT> restrictionCoder() {
- return SerializableCoder.of(new TypeDescriptor<BoundedSourceT>() {});
+ return SnappyCoder.of(SerializableCoder.of(new TypeDescriptor<BoundedSourceT>() {}));
}
/**
@@ -600,9 +603,10 @@
@GetRestrictionCoder
public Coder<UnboundedSourceRestriction<OutputT, CheckpointT>> restrictionCoder() {
- return new UnboundedSourceRestrictionCoder<>(
- SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointT>>() {}),
- NullableCoder.of(checkpointCoder));
+ return SnappyCoder.of(
+ new UnboundedSourceRestrictionCoder<>(
+ SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointT>>() {}),
+ NullableCoder.of(checkpointCoder)));
}
/**
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 3a37a36..0a62245 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -33,6 +33,7 @@
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -516,7 +517,9 @@
static void verifyTableNotExistOrEmpty(DatasetService datasetService, TableReference tableRef) {
try {
- if (datasetService.getTable(tableRef) != null) {
+ if (datasetService.getTable(
+ tableRef, Collections.emptyList(), DatasetService.TableMetadataView.BASIC)
+ != null) {
checkState(
datasetService.isTableEmpty(tableRef),
"BigQuery table is not empty: %s.",
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index dd4da2b..a0484cc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -111,6 +111,16 @@
/** An interface to get, create and delete Cloud BigQuery datasets and tables. */
public interface DatasetService extends AutoCloseable {
+
+ // Mirrors the values of the BigQuery tables.get `view` parameter, documented at
+ // https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get#TableMetadataView
+ enum TableMetadataView {
+ TABLE_METADATA_VIEW_UNSPECIFIED,
+ BASIC,
+ STORAGE_STATS,
+ FULL;
+ };
+
/**
* Gets the specified {@link Table} resource by table ID.
*
@@ -123,6 +133,10 @@
Table getTable(TableReference tableRef, List<String> selectedFields)
throws InterruptedException, IOException;
+ @Nullable
+ Table getTable(TableReference tableRef, List<String> selectedFields, TableMetadataView view)
+ throws InterruptedException, IOException;
+
/** Creates the specified table if it does not exist. */
void createTable(Table table) throws InterruptedException, IOException;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 50f3988..a50947a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -616,13 +616,24 @@
@Override
public @Nullable Table getTable(TableReference tableRef, List<String> selectedFields)
throws IOException, InterruptedException {
- return getTable(tableRef, selectedFields, createDefaultBackoff(), Sleeper.DEFAULT);
+ return getTable(tableRef, selectedFields, TableMetadataView.STORAGE_STATS);
+ }
+
+ @Override
+ public @Nullable Table getTable(
+ TableReference tableRef, List<String> selectedFields, TableMetadataView view)
+ throws IOException, InterruptedException {
+ return getTable(tableRef, selectedFields, view, createDefaultBackoff(), Sleeper.DEFAULT);
}
@VisibleForTesting
@Nullable
Table getTable(
- TableReference ref, List<String> selectedFields, BackOff backoff, Sleeper sleeper)
+ TableReference ref,
+ List<String> selectedFields,
+ TableMetadataView view,
+ BackOff backoff,
+ Sleeper sleeper)
throws IOException, InterruptedException {
Tables.Get get =
client
@@ -632,6 +643,9 @@
if (!selectedFields.isEmpty()) {
get.setSelectedFields(String.join(",", selectedFields));
}
+ if (view != null) {
+ get.set("view", view.name());
+ }
try {
return executeWithRetries(
get,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
index 403554b..426dfbb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
@@ -112,7 +112,9 @@
tableReference.setTableId(BigQueryHelpers.stripPartitionDecorator(tableReference.getTableId()));
try (DatasetService datasetService =
bqServices.getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class))) {
- if (datasetService.getTable(tableReference) == null) {
+ if (datasetService.getTable(
+ tableReference, Collections.emptyList(), DatasetService.TableMetadataView.BASIC)
+ == null) {
TableSchema tableSchema = schemaSupplier.get();
Preconditions.checkArgumentNotNull(
tableSchema,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
index c7fca47..ba6063d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
@@ -22,6 +22,7 @@
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.value.AutoValue;
import java.io.IOException;
+import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@@ -166,7 +167,12 @@
if (!schemaHolder.isPresent()) {
// Not initialized. Query the new schema with the monitor released and then update the cache.
try {
- @Nullable Table table = datasetService.getTable(tableReference);
+ // Requesting the BASIC view keeps the BigQuery backend from computing
+ // storage statistics, which are not needed here.
+ @Nullable
+ Table table =
+ datasetService.getTable(
+ tableReference, Collections.emptyList(), DatasetService.TableMetadataView.BASIC);
schemaHolder =
Optional.ofNullable((table == null) ? null : SchemaHolder.of(table.getSchema(), 0));
} catch (Exception e) {
@@ -298,7 +304,12 @@
Map<String, TableSchema> schemas = Maps.newHashMapWithExpectedSize(tables.size());
for (Map.Entry<String, Refresh> entry : tables.entrySet()) {
TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey());
- Table table = entry.getValue().getDatasetService().getTable(tableReference);
+ Table table =
+ entry
+ .getValue()
+ .getDatasetService()
+ .getTable(
+ tableReference, Collections.emptyList(), DatasetService.TableMetadataView.BASIC);
if (table == null) {
throw new RuntimeException("Did not get value for table " + tableReference);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 1b4d03d..213d649 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -173,6 +173,23 @@
@Override
public Table getTable(TableReference tableRef, @Nullable List<String> selectedFields)
throws InterruptedException, IOException {
+ return getTable(tableRef, selectedFields, null);
+ }
+
+ @Override
+ public Table getTable(
+ TableReference tableRef,
+ @Nullable List<String> selectedFields,
+ @Nullable TableMetadataView view)
+ throws InterruptedException, IOException {
+ return getTableImpl(tableRef, selectedFields, view);
+ }
+
+ public Table getTableImpl(
+ TableReference tableRef,
+ @Nullable List<String> selectedFields,
+ @Nullable TableMetadataView view)
+ throws InterruptedException, IOException {
synchronized (FakeDatasetService.class) {
Map<String, TableContainer> dataset =
tables.get(tableRef.getProjectId(), tableRef.getDatasetId());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 6007d6a..a57299b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -474,7 +474,7 @@
Table table =
datasetService.getTable(
- tableRef, Collections.emptyList(), BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
+ tableRef, Collections.emptyList(), null, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
assertEquals(testTable, table);
verifyAllResponsesAreRead();
@@ -499,7 +499,7 @@
.setTableId("tableId");
Table table =
datasetService.getTable(
- tableRef, Collections.emptyList(), BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
+ tableRef, Collections.emptyList(), null, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
assertNull(table);
verifyAllResponsesAreRead();
@@ -526,7 +526,7 @@
new BigQueryServicesImpl.DatasetServiceImpl(
bigquery, null, PipelineOptionsFactory.create());
datasetService.getTable(
- tableRef, Collections.emptyList(), BackOff.STOP_BACKOFF, Sleeper.DEFAULT);
+ tableRef, Collections.emptyList(), null, BackOff.STOP_BACKOFF, Sleeper.DEFAULT);
}
@Test
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index 3895c8c..cef9391 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -125,6 +125,7 @@
when(mockClient.tables()).thenReturn(mockTables);
when(mockTables.get(anyString(), anyString(), anyString())).thenReturn(mockTablesGet);
when(mockTablesGet.setPrettyPrint(false)).thenReturn(mockTablesGet);
+ when(mockTablesGet.set(anyString(), anyString())).thenReturn(mockTablesGet);
when(mockTablesGet.execute()).thenReturn(table);
}
@@ -132,6 +133,7 @@
verify(mockClient).tables();
verify(mockTables).get("project", "dataset", "table");
verify(mockTablesGet, atLeastOnce()).setPrettyPrint(false);
+ verify(mockTablesGet, atLeastOnce()).set(anyString(), anyString());
verify(mockTablesGet, atLeastOnce()).execute();
}