[BEAM-12769] Fix typo in test class name, CLass -> Class
diff --git a/model/pipeline/src/main/proto/metrics.proto b/model/pipeline/src/main/proto/metrics.proto
index 10166e1..8f819b6 100644
--- a/model/pipeline/src/main/proto/metrics.proto
+++ b/model/pipeline/src/main/proto/metrics.proto
@@ -417,6 +417,9 @@
GCS_PROJECT_ID = 17 [(label_props) = { name: "GCS_PROJECT_ID"}];
DATASTORE_PROJECT = 18 [(label_props) = { name: "DATASTORE_PROJECT" }];
DATASTORE_NAMESPACE = 19 [(label_props) = { name: "DATASTORE_NAMESPACE" }];
+ BIGTABLE_PROJECT_ID = 20 [(label_props) = { name: "BIGTABLE_PROJECT_ID"}];
+ INSTANCE_ID = 21 [(label_props) = { name: "INSTANCE_ID"}];
+ TABLE_ID = 22 [(label_props) = { name: "TABLE_ID"}];
}
// A set of key and value labels which define the scope of the metric. For
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiers.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiers.java
index 800413c..3133cc1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiers.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiers.java
@@ -33,6 +33,16 @@
projectId, datasetId, tableId);
}
+ public static String bigtableTableID(String project, String instance, String table) {
+ return String.format("projects/%s/instances/%s/tables/%s", project, instance, table);
+ }
+
+ public static String bigtableResource(String projectId, String instanceId, String tableId) {
+ return String.format(
+ "//bigtable.googleapis.com/projects/%s/instances/%s/tables/%s",
+ projectId, instanceId, tableId);
+ }
+
public static String datastoreResource(String projectId, String namespace) {
return String.format(
"//bigtable.googleapis.com/projects/%s/namespaces/%s", projectId, namespace);
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
index 88e55db..57ee58a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
@@ -80,6 +80,9 @@
public static final String BIGQUERY_QUERY_NAME = "BIGQUERY_QUERY_NAME";
public static final String DATASTORE_PROJECT = "DATASTORE_PROJECT";
public static final String DATASTORE_NAMESPACE = "DATASTORE_NAMESPACE";
+ public static final String BIGTABLE_PROJECT_ID = "BIGTABLE_PROJECT_ID";
+ public static final String INSTANCE_ID = "INSTANCE_ID";
+ public static final String TABLE_ID = "TABLE_ID";
static {
// Note: One benefit of defining these strings above, instead of pulling them in from
@@ -109,6 +112,10 @@
checkArgument(DATASTORE_PROJECT.equals(extractLabel(MonitoringInfoLabels.DATASTORE_PROJECT)));
checkArgument(
DATASTORE_NAMESPACE.equals(extractLabel(MonitoringInfoLabels.DATASTORE_NAMESPACE)));
+ checkArgument(
+ BIGTABLE_PROJECT_ID.equals(extractLabel(MonitoringInfoLabels.BIGTABLE_PROJECT_ID)));
+ checkArgument(INSTANCE_ID.equals(extractLabel(MonitoringInfoLabels.INSTANCE_ID)));
+ checkArgument(TABLE_ID.equals(extractLabel(MonitoringInfoLabels.TABLE_ID)));
}
}
diff --git a/sdks/go/examples/xlang/transforms.go b/sdks/go/examples/xlang/transforms.go
index acad214..3d410e8 100644
--- a/sdks/go/examples/xlang/transforms.go
+++ b/sdks/go/examples/xlang/transforms.go
@@ -20,7 +20,6 @@
"reflect"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
- "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
)
@@ -51,7 +50,7 @@
pl := beam.CrossLanguagePayload(prefixPayload{Data: prefix})
outT := beam.UnnamedOutput(typex.New(reflectx.String))
outs := beam.CrossLanguage(s, "beam:transforms:xlang:test:prefix", pl, addr, beam.UnnamedInput(col), outT)
- return outs[graph.UnnamedOutputTag]
+ return outs[beam.UnnamedOutputTag()]
}
func CoGroupByKey(s beam.Scope, addr string, col1, col2 beam.PCollection) beam.PCollection {
@@ -59,21 +58,21 @@
namedInputs := map[string]beam.PCollection{"col1": col1, "col2": col2}
outT := beam.UnnamedOutput(typex.NewCoGBK(typex.New(reflectx.Int64), typex.New(reflectx.String)))
outs := beam.CrossLanguage(s, "beam:transforms:xlang:test:cgbk", nil, addr, namedInputs, outT)
- return outs[graph.UnnamedOutputTag]
+ return outs[beam.UnnamedOutputTag()]
}
func CombinePerKey(s beam.Scope, addr string, col beam.PCollection) beam.PCollection {
s = s.Scope("XLangTest.CombinePerKey")
outT := beam.UnnamedOutput(typex.NewKV(typex.New(reflectx.String), typex.New(reflectx.Int64)))
outs := beam.CrossLanguage(s, "beam:transforms:xlang:test:compk", nil, addr, beam.UnnamedInput(col), outT)
- return outs[graph.UnnamedOutputTag]
+ return outs[beam.UnnamedOutputTag()]
}
func CombineGlobally(s beam.Scope, addr string, col beam.PCollection) beam.PCollection {
s = s.Scope("XLangTest.CombineGlobally")
outT := beam.UnnamedOutput(typex.New(reflectx.Int64))
outs := beam.CrossLanguage(s, "beam:transforms:xlang:test:comgl", nil, addr, beam.UnnamedInput(col), outT)
- return outs[graph.UnnamedOutputTag]
+ return outs[beam.UnnamedOutputTag()]
}
func Flatten(s beam.Scope, addr string, col1, col2 beam.PCollection) beam.PCollection {
@@ -81,14 +80,14 @@
namedInputs := map[string]beam.PCollection{"col1": col1, "col2": col2}
outT := beam.UnnamedOutput(typex.New(reflectx.Int64))
outs := beam.CrossLanguage(s, "beam:transforms:xlang:test:flatten", nil, addr, namedInputs, outT)
- return outs[graph.UnnamedOutputTag]
+ return outs[beam.UnnamedOutputTag()]
}
func GroupByKey(s beam.Scope, addr string, col beam.PCollection) beam.PCollection {
s = s.Scope("XLangTest.GroupByKey")
outT := beam.UnnamedOutput(typex.NewCoGBK(typex.New(reflectx.String), typex.New(reflectx.Int64)))
outs := beam.CrossLanguage(s, "beam:transforms:xlang:test:gbk", nil, addr, beam.UnnamedInput(col), outT)
- return outs[graph.UnnamedOutputTag]
+ return outs[beam.UnnamedOutputTag()]
}
func Multi(s beam.Scope, addr string, main1, main2, side beam.PCollection) (mainOut, sideOut beam.PCollection) {
@@ -112,5 +111,5 @@
s = s.Scope("XLang.Count")
outT := beam.UnnamedOutput(typex.NewKV(typex.New(reflectx.String), typex.New(reflectx.Int64)))
c := beam.CrossLanguage(s, "beam:transforms:xlang:count", nil, addr, beam.UnnamedInput(col), outT)
- return c[graph.UnnamedOutputTag]
+ return c[beam.UnnamedOutputTag()]
}
diff --git a/sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go b/sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go
index f2317f0..8781cff 100644
--- a/sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go
+++ b/sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go
@@ -48,7 +48,6 @@
"reflect"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
- "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
)
@@ -131,7 +130,7 @@
pl := beam.CrossLanguagePayload(rpl)
outT := beam.UnnamedOutput(typex.NewKV(typex.New(rcfg.key), typex.New(rcfg.val)))
out := beam.CrossLanguage(s, readURN, pl, addr, nil, outT)
- return out[graph.UnnamedOutputTag]
+ return out[beam.UnnamedOutputTag()]
}
type readOption func(*readConfig)
diff --git a/sdks/go/pkg/beam/xlang.go b/sdks/go/pkg/beam/xlang.go
index 4ec0162..b9c6ec7 100644
--- a/sdks/go/pkg/beam/xlang.go
+++ b/sdks/go/pkg/beam/xlang.go
@@ -22,32 +22,44 @@
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
)
-// xlang exposes an API to execute cross-language transforms within the Go SDK.
-// It is experimental and likely to change. It exposes convenient wrappers
-// around the core functions to pass in any combination of named/unnamed
-// inputs/outputs.
-
// UnnamedInput is a helper function for passing single unnamed inputs to
-// `beam.CrossLanguage`.
+// beam.CrossLanguage.
//
// Example:
-// beam.CrossLanguage(s, urn, payload, addr, UnnamedInput(input), outputs);
+// beam.CrossLanguage(s, urn, payload, addr, UnnamedInput(input), outputs)
func UnnamedInput(col PCollection) map[string]PCollection {
return map[string]PCollection{graph.UnnamedInputTag: col}
}
// UnnamedOutput is a helper function for passing single unnamed output types to
-// `beam.CrossLanguage`.
+// beam.CrossLanguage. The associated output can be accessed with beam.UnnamedOutputTag.
//
// Example:
-// beam.CrossLanguage(s, urn, payload, addr, inputs, UnnamedOutput(output));
+// resultMap := beam.CrossLanguage(s, urn, payload, addr, inputs, UnnamedOutput(output))
+// result := resultMap[beam.UnnamedOutputTag()]
func UnnamedOutput(t FullType) map[string]FullType {
return map[string]FullType{graph.UnnamedOutputTag: t}
}
-// CrossLanguagePayload encodes a native Go struct into a payload for
-// cross-language transforms. To find the expected structure of a payload,
-// consult the documentation in the SDK performing the expansion.
+// UnnamedOutputTag provides the output tag used for an output passed to beam.UnnamedOutput.
+// Needed to retrieve the unnamed output PCollection from the result of beam.CrossLanguage.
+func UnnamedOutputTag() string {
+ return graph.UnnamedOutputTag
+}
+
+// CrossLanguagePayload encodes a native Go struct into a payload for cross-language transforms.
+// Payloads are []byte encoded ExternalConfigurationPayload protobufs. In order to fill the
+// contents of the protobuf, the provided struct will be converted to a row encoded
+// representation with an accompanying schema, so the input struct must be compatible with schemas.
+//
+// See https://beam.apache.org/documentation/programming-guide/#schemas for basic information on
+// schemas, and pkg/beam/core/runtime/graphx/schema for details on schemas in the Go SDK.
+//
+// Example:
+// type stringPayload struct {
+// Data string
+// }
+// encodedPl := beam.CrossLanguagePayload(stringPayload{Data: "foo"})
func CrossLanguagePayload(pl interface{}) []byte {
bytes, err := xlangx.EncodeStructPayload(pl)
if err != nil {
@@ -56,8 +68,73 @@
return bytes
}
-// CrossLanguage executes a cross-language transform that uses named inputs and
-// returns named outputs.
+// CrossLanguage is a low-level transform for executing cross-language transforms written in other
+// SDKs. Because this is low-level, it is recommended to use one of the higher-level IO-specific
+// wrappers where available. These can be found in the pkg/beam/io/xlang subdirectory.
+// CrossLanguage is useful for executing cross-language transforms which do not have any existing
+// IO wrappers.
+//
+// Usage requires an address for an expansion service accessible during pipeline construction, a
+// URN identifying the desired transform, an optional payload with configuration information, and
+// input and output names. It outputs a map of named output PCollections.
+//
+// For more information on expansion services and other aspects of cross-language transforms in
+// general, refer to the Beam programming guide: https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines
+//
+// Payload
+//
+// Payloads are configuration data that some cross-language transforms require for expansion.
+// Consult the documentation of the transform in the source SDK to find out what payload data it
+// requires. If no payload is required, pass in nil.
+//
+// CrossLanguage accepts payloads as a []byte containing an encoded ExternalConfigurationPayload
+// protobuf. The helper function beam.CrossLanguagePayload is the recommended way to easily encode
+// a standard Go struct for use as a payload.
+//
+// Inputs and Outputs
+//
+// Like most transforms, any input PCollections must be provided. Unlike most transforms, output
+// types must be provided because Go cannot infer output types from external transforms.
+//
+// Inputs and outputs to a cross-language transform may be either named or unnamed. Named
+// inputs/outputs are used when there are more than one input/output, and are provided as maps with
+// names as keys. Unnamed inputs/outputs are used when there is only one, and a map can be quickly
+// constructed with the UnnamedInput and UnnamedOutput methods.
+//
+// An example of defining named inputs and outputs:
+//
+// namedInputs := map[string]beam.PCollection{"pcol1": pcol1, "pcol2": pcol2}
+// namedOutputTypes := map[string]typex.FullType{
+// "main": typex.New(reflectx.String),
+// "side": typex.New(reflectx.Int64),
+// }
+//
+// CrossLanguage outputs a map of PCollections with associated names. These names will match those
+// from provided named outputs. If the beam.UnnamedOutput method was used, the PCollection can be
+// retrieved with beam.UnnamedOutputTag().
+//
+// An example of retrieving named outputs from a call to CrossLanguage:
+//
+// outputs := beam.CrossLanguage(...)
+// mainPcol := outputs["main"]
+// sidePcol := outputs["side"]
+//
+// Example
+//
+// This example shows using CrossLanguage to execute the Prefix cross-language transform using an
+// expansion service running on localhost:8099. Prefix requires a payload containing a prefix to
+// prepend to every input string.
+//
+// type prefixPayload struct {
+// Data string
+// }
+// encodedPl := beam.CrossLanguagePayload(prefixPayload{Data: "foo"})
+// urn := "beam:transforms:xlang:test:prefix"
+// expansionAddr := "localhost:8099"
+// outputType := beam.UnnamedOutput(typex.New(reflectx.String))
+// input := beam.UnnamedInput(inputPcol)
+// outs := beam.CrossLanguage(s, urn, encodedPl, expansionAddr, input, outputType)
+// outPcol := outs[beam.UnnamedOutputTag()]
func CrossLanguage(
s Scope,
urn string,
@@ -86,7 +163,9 @@
return mapNodeToPCollection(namedOutputs)
}
-// TryCrossLanguage coordinates the core functions required to execute the cross-language transform
+// TryCrossLanguage coordinates the core functions required to execute the cross-language transform.
+// This is mainly intended for internal use. For the general-use entry point, see
+// beam.CrossLanguage.
func TryCrossLanguage(s Scope, ext *graph.ExternalTransform, ins []*graph.Inbound, outs []*graph.Outbound) (map[string]*graph.Node, error) {
// Adding an edge in the graph corresponding to the ExternalTransform
edge, isBoundedUpdater := graph.NewCrossLanguage(s.real, s.scope, ext, ins, outs)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
index 484fe3b..dfe797e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
@@ -86,7 +86,6 @@
private final SerializableFunction<ElementT, TableRow> toTableRow;
private final SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
private final Set<String> allowedMetricUrns;
- private @Nullable DatasetService datasetService;
/** Tracks bytes written, exposed as "ByteCount" Counter. */
private Counter byteCounter = SinkMetrics.bytesWritten();
@@ -222,6 +221,15 @@
/** The list of unique ids for each BigQuery table row. */
private transient Map<String, List<String>> uniqueIdsForTableRows;
+ private transient @Nullable DatasetService datasetService;
+
+ private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
+ if (datasetService == null) {
+ datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+ }
+ return datasetService;
+ }
+
/** Prepares a target BigQuery table. */
@StartBundle
public void startBundle() {
@@ -257,10 +265,10 @@
tableRows.entrySet()) {
TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey());
flushRows(
+ getDatasetService(options),
tableReference,
entry.getValue(),
uniqueIdsForTableRows.get(entry.getKey()),
- options,
failedInserts,
successfulInserts);
}
@@ -272,6 +280,18 @@
}
reportStreamingApiLogging(options);
}
+
+ @Teardown
+ public void onTeardown() {
+ try {
+ if (datasetService != null) {
+ datasetService.close();
+ datasetService = null;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
// The max duration input records are allowed to be buffered in the state, if using ViaStateful.
@@ -325,13 +345,22 @@
// shuffling.
private class InsertBatchedElements
extends DoFn<KV<ShardedKey<String>, Iterable<TableRowInfo<ElementT>>>, Void> {
+ private transient @Nullable DatasetService datasetService;
+
+ private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
+ if (datasetService == null) {
+ datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+ }
+ return datasetService;
+ }
+
@ProcessElement
public void processElement(
@Element KV<ShardedKey<String>, Iterable<TableRowInfo<ElementT>>> input,
BoundedWindow window,
ProcessContext context,
MultiOutputReceiver out)
- throws InterruptedException {
+ throws InterruptedException, IOException {
List<FailsafeValueInSingleWindow<TableRow, TableRow>> tableRows = new ArrayList<>();
List<String> uniqueIds = new ArrayList<>();
for (TableRowInfo<ElementT> row : input.getValue()) {
@@ -347,7 +376,13 @@
TableReference tableReference = BigQueryHelpers.parseTableSpec(input.getKey().getKey());
List<ValueInSingleWindow<ErrorT>> failedInserts = Lists.newArrayList();
List<ValueInSingleWindow<TableRow>> successfulInserts = Lists.newArrayList();
- flushRows(tableReference, tableRows, uniqueIds, options, failedInserts, successfulInserts);
+ flushRows(
+ getDatasetService(options),
+ tableReference,
+ tableRows,
+ uniqueIds,
+ failedInserts,
+ successfulInserts);
for (ValueInSingleWindow<ErrorT> row : failedInserts) {
out.get(failedOutputTag).output(row.getValue());
@@ -357,44 +392,43 @@
}
reportStreamingApiLogging(options);
}
- }
- @Teardown
- public void onTeardown() {
- try {
- if (datasetService != null) {
- datasetService.close();
- datasetService = null;
+ @Teardown
+ public void onTeardown() {
+ try {
+ if (datasetService != null) {
+ datasetService.close();
+ datasetService = null;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- } catch (Exception e) {
- throw new RuntimeException(e);
}
}
/** Writes the accumulated rows into BigQuery with streaming API. */
private void flushRows(
+ DatasetService datasetService,
TableReference tableReference,
List<FailsafeValueInSingleWindow<TableRow, TableRow>> tableRows,
List<String> uniqueIds,
- BigQueryOptions options,
List<ValueInSingleWindow<ErrorT>> failedInserts,
List<ValueInSingleWindow<TableRow>> successfulInserts)
throws InterruptedException {
if (!tableRows.isEmpty()) {
try {
long totalBytes =
- getDatasetService(options)
- .insertAll(
- tableReference,
- tableRows,
- uniqueIds,
- retryPolicy,
- failedInserts,
- errorContainer,
- skipInvalidRows,
- ignoreUnknownValues,
- ignoreInsertIds,
- successfulInserts);
+ datasetService.insertAll(
+ tableReference,
+ tableRows,
+ uniqueIds,
+ retryPolicy,
+ failedInserts,
+ errorContainer,
+ skipInvalidRows,
+ ignoreUnknownValues,
+ ignoreInsertIds,
+ successfulInserts);
byteCounter.inc(totalBytes);
} catch (IOException e) {
throw new RuntimeException(e);
@@ -402,13 +436,6 @@
}
}
- private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
- if (datasetService == null) {
- datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
- }
- return datasetService;
- }
-
private void reportStreamingApiLogging(BigQueryOptions options) {
MetricsContainer processWideContainer = MetricsEnvironment.getProcessWideContainer();
if (processWideContainer instanceof MetricsLogger) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 1d3d894..8b9b705 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -965,49 +965,53 @@
// earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.
// For these cases the withoutValidation method can be used to disable the check.
if (getValidate()) {
- if (table != null) {
- checkArgument(table.isAccessible(), "Cannot call validate if table is dynamically set.");
- }
- if (table != null && table.get().getProjectId() != null) {
- // Check for source table presence for early failure notification.
- DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
- BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
- BigQueryHelpers.verifyTablePresence(datasetService, table.get());
- } else if (getQuery() != null) {
- checkArgument(
- getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");
- JobService jobService = getBigQueryServices().getJobService(bqOptions);
- try {
- jobService.dryRunQuery(
- bqOptions.getBigQueryProject() == null
- ? bqOptions.getProject()
- : bqOptions.getBigQueryProject(),
- new JobConfigurationQuery()
- .setQuery(getQuery().get())
- .setFlattenResults(getFlattenResults())
- .setUseLegacySql(getUseLegacySql()),
- getQueryLocation());
- } catch (Exception e) {
- throw new IllegalArgumentException(
- String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
+ try (DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions)) {
+ if (table != null) {
+ checkArgument(
+ table.isAccessible(), "Cannot call validate if table is dynamically set.");
}
+ if (table != null && table.get().getProjectId() != null) {
+ // Check for source table presence for early failure notification.
+ BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
+ BigQueryHelpers.verifyTablePresence(datasetService, table.get());
+ } else if (getQuery() != null) {
+ checkArgument(
+ getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");
+ JobService jobService = getBigQueryServices().getJobService(bqOptions);
+ try {
+ jobService.dryRunQuery(
+ bqOptions.getBigQueryProject() == null
+ ? bqOptions.getProject()
+ : bqOptions.getBigQueryProject(),
+ new JobConfigurationQuery()
+ .setQuery(getQuery().get())
+ .setFlattenResults(getFlattenResults())
+ .setUseLegacySql(getUseLegacySql()),
+ getQueryLocation());
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
+ }
- DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
- // If the user provided a temp dataset, check if the dataset exists before launching the
- // query
- if (getQueryTempDataset() != null) {
- // The temp table is only used for dataset and project id validation, not for table name
- // validation
- TableReference tempTable =
- new TableReference()
- .setProjectId(
- bqOptions.getBigQueryProject() == null
- ? bqOptions.getProject()
- : bqOptions.getBigQueryProject())
- .setDatasetId(getQueryTempDataset())
- .setTableId("dummy table");
- BigQueryHelpers.verifyDatasetPresence(datasetService, tempTable);
+ // If the user provided a temp dataset, check if the dataset exists before launching the
+ // query
+ if (getQueryTempDataset() != null) {
+ // The temp table is only used for dataset and project id validation, not for table
+ // name
+ // validation
+ TableReference tempTable =
+ new TableReference()
+ .setProjectId(
+ bqOptions.getBigQueryProject() == null
+ ? bqOptions.getProject()
+ : bqOptions.getBigQueryProject())
+ .setDatasetId(getQueryTempDataset())
+ .setTableId("dummy table");
+ BigQueryHelpers.verifyDatasetPresence(datasetService, tempTable);
+ }
}
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
}
@@ -1401,15 +1405,17 @@
options.getJobName(), jobUuid, JobType.QUERY),
queryTempDataset);
- DatasetService datasetService = getBigQueryServices().getDatasetService(options);
- LOG.info("Deleting temporary table with query results {}", tempTable);
- datasetService.deleteTable(tempTable);
- // Delete dataset only if it was created by Beam
- boolean datasetCreatedByBeam = !queryTempDataset.isPresent();
- if (datasetCreatedByBeam) {
- LOG.info(
- "Deleting temporary dataset with query results {}", tempTable.getDatasetId());
- datasetService.deleteDataset(tempTable.getProjectId(), tempTable.getDatasetId());
+ try (DatasetService datasetService =
+ getBigQueryServices().getDatasetService(options)) {
+ LOG.info("Deleting temporary table with query results {}", tempTable);
+ datasetService.deleteTable(tempTable);
+ // Delete dataset only if it was created by Beam
+ boolean datasetCreatedByBeam = !queryTempDataset.isPresent();
+ if (datasetCreatedByBeam) {
+ LOG.info(
+ "Deleting temporary dataset with query results {}", tempTable.getDatasetId());
+ datasetService.deleteDataset(tempTable.getProjectId(), tempTable.getDatasetId());
+ }
}
}
};
@@ -2484,17 +2490,20 @@
// The user specified a table.
if (getJsonTableRef() != null && getJsonTableRef().isAccessible() && getValidate()) {
TableReference table = getTableWithDefaultProject(options).get();
- DatasetService datasetService = getBigQueryServices().getDatasetService(options);
- // Check for destination table presence and emptiness for early failure notification.
- // Note that a presence check can fail when the table or dataset is created by an earlier
- // stage of the pipeline. For these cases the #withoutValidation method can be used to
- // disable the check.
- BigQueryHelpers.verifyDatasetPresence(datasetService, table);
- if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
- BigQueryHelpers.verifyTablePresence(datasetService, table);
- }
- if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
- BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
+ try (DatasetService datasetService = getBigQueryServices().getDatasetService(options)) {
+ // Check for destination table presence and emptiness for early failure notification.
+ // Note that a presence check can fail when the table or dataset is created by an earlier
+ // stage of the pipeline. For these cases the #withoutValidation method can be used to
+ // disable the check.
+ BigQueryHelpers.verifyDatasetPresence(datasetService, table);
+ if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
+ BigQueryHelpers.verifyTablePresence(datasetService, table);
+ }
+ if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
+ BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index aeee3d2..eb9f8c4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -38,10 +38,14 @@
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.values.KV;
@@ -130,12 +134,40 @@
String tableNameSr =
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
+ HashMap<String, String> baseLabels = new HashMap<>();
+ baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+ baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+ baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.ReadRows");
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.RESOURCE,
+ GcpResourceIdentifiers.bigtableResource(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ source.getTableId().get()));
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, session.getOptions().getProjectId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.INSTANCE_ID, session.getOptions().getInstanceId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.TABLE_ID,
+ GcpResourceIdentifiers.bigtableTableID(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ source.getTableId().get()));
+ ServiceCallMetric serviceCallMetric =
+ new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
ReadRowsRequest.Builder requestB =
ReadRowsRequest.newBuilder().setRows(rowSet).setTableName(tableNameSr);
if (source.getRowFilter() != null) {
requestB.setFilter(source.getRowFilter());
}
- results = session.getDataClient().readRows(requestB.build());
+ try {
+ results = session.getDataClient().readRows(requestB.build());
+ serviceCallMetric.call("ok");
+ } catch (StatusRuntimeException e) {
+ serviceCallMetric.call(e.getStatus().getCode().value());
+ throw e;
+ }
return advance();
}
@@ -182,10 +214,12 @@
static class BigtableWriterImpl implements Writer {
private BigtableSession session;
private BulkMutation bulkMutation;
+ private BigtableTableName tableName;
BigtableWriterImpl(BigtableSession session, BigtableTableName tableName) {
this.session = session;
bulkMutation = session.createBulkMutation(tableName);
+ this.tableName = tableName;
}
@Override
@@ -231,6 +265,28 @@
.addAllMutations(record.getValue())
.build();
+ HashMap<String, String> baseLabels = new HashMap<>();
+ baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+ baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+ baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.MutateRows");
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.RESOURCE,
+ GcpResourceIdentifiers.bigtableResource(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ tableName.getTableId()));
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, session.getOptions().getProjectId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.INSTANCE_ID, session.getOptions().getInstanceId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.TABLE_ID,
+ GcpResourceIdentifiers.bigtableTableID(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ tableName.getTableId()));
+ ServiceCallMetric serviceCallMetric =
+ new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
CompletableFuture<MutateRowResponse> result = new CompletableFuture<>();
Futures.addCallback(
new VendoredListenableFutureAdapter<>(bulkMutation.add(request)),
@@ -238,10 +294,17 @@
@Override
public void onSuccess(MutateRowResponse mutateRowResponse) {
result.complete(mutateRowResponse);
+ serviceCallMetric.call("ok");
}
@Override
public void onFailure(Throwable throwable) {
+ if (throwable instanceof StatusRuntimeException) {
+ serviceCallMetric.call(
+ ((StatusRuntimeException) throwable).getStatus().getCode().value());
+ } else {
+ serviceCallMetric.call("unknown");
+ }
result.completeExceptionally(throwable);
}
},
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
index 69be079..b983cc3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.bigtable;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
@@ -42,9 +43,15 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.HashMap;
+import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -61,8 +68,12 @@
@RunWith(JUnit4.class)
public class BigtableServiceImplTest {
+ private static final String PROJECT_ID = "project";
+ private static final String INSTANCE_ID = "instance";
+ private static final String TABLE_ID = "table";
+
private static final BigtableTableName TABLE_NAME =
- new BigtableInstanceName("project", "instance").toTableName("table");
+ new BigtableInstanceName(PROJECT_ID, INSTANCE_ID).toTableName(TABLE_ID);
@Mock private BigtableSession mockSession;
@@ -76,10 +87,13 @@
public void setup() {
MockitoAnnotations.initMocks(this);
BigtableOptions options =
- new BigtableOptions.Builder().setProjectId("project").setInstanceId("instance").build();
+ new BigtableOptions.Builder().setProjectId(PROJECT_ID).setInstanceId(INSTANCE_ID).build();
when(mockSession.getOptions()).thenReturn(options);
when(mockSession.createBulkMutation(eq(TABLE_NAME))).thenReturn(mockBulkMutation);
when(mockSession.getDataClient()).thenReturn(mockBigtableDataClient);
+ // Set up the process-wide metrics container so the test can verify that metrics are set.
+ MetricsContainerImpl container = new MetricsContainerImpl(null);
+ MetricsEnvironment.setProcessWideContainer(container);
}
/**
@@ -94,7 +108,7 @@
ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
ByteKey end = ByteKey.copyFrom("b".getBytes(StandardCharsets.UTF_8));
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start, end)));
- when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of("table_name"));
+ when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
@SuppressWarnings("unchecked")
ResultScanner<Row> mockResultScanner = Mockito.mock(ResultScanner.class);
Row expectedRow = Row.newBuilder().setKey(ByteString.copyFromUtf8("a")).build();
@@ -109,6 +123,7 @@
underTest.close();
verify(mockResultScanner, times(1)).close();
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
}
/**
@@ -140,4 +155,27 @@
underTest.close();
verify(mockBulkMutation, times(1)).flush();
}
+
+ private void verifyMetricWasSet(String method, String status, long count) {
+ // Verify that the metric was reported with the expected labels and count.
+ HashMap<String, String> labels = new HashMap<>();
+ labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+ labels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+ labels.put(MonitoringInfoConstants.Labels.METHOD, method);
+ labels.put(
+ MonitoringInfoConstants.Labels.RESOURCE,
+ GcpResourceIdentifiers.bigtableResource(PROJECT_ID, INSTANCE_ID, TABLE_ID));
+ labels.put(MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, PROJECT_ID);
+ labels.put(MonitoringInfoConstants.Labels.INSTANCE_ID, INSTANCE_ID);
+ labels.put(
+ MonitoringInfoConstants.Labels.TABLE_ID,
+ GcpResourceIdentifiers.bigtableTableID(PROJECT_ID, INSTANCE_ID, TABLE_ID));
+ labels.put(MonitoringInfoConstants.Labels.STATUS, status);
+
+ MonitoringInfoMetricName name =
+ MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, labels);
+ MetricsContainerImpl container =
+ (MetricsContainerImpl) MetricsEnvironment.getProcessWideContainer();
+ assertEquals(count, (long) container.getCounter(name).getCumulative());
+ }
}
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index 886e56e..9ed0b25 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -208,8 +208,6 @@
def load_pcoder(self, *labels):
saved_pcoder = self._saved_pcoders.get(self._path(*labels), None)
- # TODO(BEAM-12506): Get rid of the SafeFastPrimitivesCoder for
- # WindowedValueHolder.
if saved_pcoder is None or isinstance(saved_pcoder,
coders.FastPrimitivesCoder):
return self._default_pcoder
diff --git a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
index 054c9a6..fc8a8aa 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
@@ -390,8 +390,6 @@
def load_pcoder(self, *labels):
saved_pcoder = self._saved_pcoders.get(
os.path.join(self._cache_dir, *labels), None)
- # TODO(BEAM-12506): Get rid of the SafeFastPrimitivesCoder for
- # WindowedValueHolder.
if saved_pcoder is None or isinstance(saved_pcoder,
coders.FastPrimitivesCoder):
return self._default_pcoder
diff --git a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
index cee3d34..1dc42e0 100644
--- a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
+++ b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
@@ -227,7 +227,11 @@
endpoint=test_stream_service.endpoint)
sql_source = {}
for tag, output in output_pcolls.items():
- sql_source[tag_to_name[tag]] = output
+ name = tag_to_name[tag]
+ # Must mark the element_type to avoid introducing pickled Python coder
+ # to the Java expansion service.
+ output.element_type = name_to_pcoll[name].element_type
+ sql_source[name] = output
return sql_source