Merge pull request #14572: [BEAM-12118] Fix racy precondition exception in QueueingBeamFnDataClient
diff --git a/CHANGES.md b/CHANGES.md
index a94662f..489a9c6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -38,6 +38,8 @@
* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* Python Row objects are now sensitive to field order. So `Row(x=3, y=4)` is no
longer considered equal to `Row(y=4, x=3)` (BEAM-11929).
+* Kafka Beam SQL tables now ascribe meaning to the LOCATION field; previously
+ it was ignored if provided.
## Deprecations
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 0947ce7..1fa4f5e 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -520,7 +520,7 @@
google_auth_library_credentials : "com.google.auth:google-auth-library-credentials", // google_cloud_platform_libraries_bom sets version
google_auth_library_oauth2_http : "com.google.auth:google-auth-library-oauth2-http", // google_cloud_platform_libraries_bom sets version
google_cloud_bigquery : "com.google.cloud:google-cloud-bigquery", // google_cloud_platform_libraries_bom sets version
- google_cloud_bigquery_storage : "com.google.cloud:google-cloud-bigquerystorage:1.12.0",
+ google_cloud_bigquery_storage : "com.google.cloud:google-cloud-bigquerystorage:1.18.1",
google_cloud_bigtable_client_core : "com.google.cloud.bigtable:bigtable-client-core:1.16.0",
google_cloud_bigtable_emulator : "com.google.cloud:google-cloud-bigtable-emulator:0.125.2",
google_cloud_core : "com.google.cloud:google-cloud-core", // google_cloud_platform_libraries_bom sets version
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java
index f944572..9f32382 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java
@@ -55,13 +55,39 @@
value.clear();
}
- /** Increment the distribution by the given amount. */
+ /** Increment the count of the bucket containing the given value by 1. */
@Override
public void update(double value) {
this.value.record(value);
dirty.afterModification();
}
+ /**
+ * Increment all bucket counts in this histogram by the bucket counts specified in {@code other}.
+ */
+ public void update(HistogramCell other) {
+ this.value.update(other.value);
+ dirty.afterModification();
+ }
+
+ // TODO(BEAM-12103): Update this method to allow incrementing the infinite buckets as well,
+ // using 0 and length-1 as the bucketIndex, and remove the incTopBucketCount and
+ // incBottomBucketCount methods.
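+ /** Increment the count of the finite bucket at {@code bucketIndex} by {@code count}. */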
+ public void incBucketCount(int bucketIndex, long count) {
+ this.value.incBucketCount(bucketIndex, count);
+ dirty.afterModification();
+ }
+
+ public void incTopBucketCount(long count) {
+ this.value.incTopBucketCount(count);
+ dirty.afterModification();
+ }
+
+ public void incBottomBucketCount(long count) {
+ this.value.incBottomBucketCount(count);
+ dirty.afterModification();
+ }
+
@Override
public DirtyState getDirty() {
return dirty;
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index 40ea1b5..94ddc99 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -43,10 +43,10 @@
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
-import org.apache.beam.sdk.metrics.MetricsLogger;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -69,10 +69,10 @@
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
-public class MetricsContainerImpl implements Serializable, MetricsContainer, MetricsLogger {
+public class MetricsContainerImpl implements Serializable, MetricsContainer {
private static final Logger LOG = LoggerFactory.getLogger(MetricsContainerImpl.class);
- private final @Nullable String stepName;
+ protected final @Nullable String stepName;
private MetricsMap<MetricName, CounterCell> counters = new MetricsMap<>(CounterCell::new);
@@ -95,7 +95,6 @@
private Map<MetricKey, Optional<String>> shortIdsByMetricKey = new ConcurrentHashMap<>();
/** Reset the metrics. */
- @Override
public void reset() {
reset(counters);
reset(distributions);
@@ -376,6 +375,7 @@
updateCounters(counters, other.counters);
updateDistributions(distributions, other.distributions);
updateGauges(gauges, other.gauges);
+ updateHistograms(histograms, other.histograms);
}
private void updateForSumInt64Type(MonitoringInfo monitoringInfo) {
@@ -446,6 +446,16 @@
}
}
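+ // Add the bucket counts of each histogram in updates to the corresponding histogram in current.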
+ private void updateHistograms(
+ MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> current,
+ MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> updates) {
+ for (Map.Entry<KV<MetricName, HistogramData.BucketType>, HistogramCell> histogram :
+ updates.entries()) {
+ HistogramCell h = histogram.getValue();
+ current.get(histogram.getKey()).update(h);
+ }
+ }
+
@Override
public boolean equals(@Nullable Object object) {
if (object instanceof MetricsContainerImpl) {
@@ -455,7 +465,6 @@
&& Objects.equals(distributions, metricsContainerImpl.distributions)
&& Objects.equals(gauges, metricsContainerImpl.gauges);
}
-
return false;
}
@@ -466,21 +475,25 @@
/**
* Match a MetricName with a given metric filter. If the metric filter is null, the method always
- * returns true.
+ * returns true.
+ *
+ * <p>TODO(BEAM-10986): Consider making this use the MetricNameFilter and related classes.
*/
- private boolean matchMetricName(MetricName metricName, @Nullable Set<MetricName> metricFilter) {
- if (metricFilter == null) {
+ @VisibleForTesting
+ static boolean matchMetric(MetricName metricName, @Nullable Set<String> allowedMetricUrns) {
+ if (allowedMetricUrns == null) {
return true;
- } else {
- return metricFilter.contains(metricName);
}
+ if (metricName instanceof MonitoringInfoMetricName) {
+ return allowedMetricUrns.contains(((MonitoringInfoMetricName) metricName).getUrn());
+ }
+ return false;
}
+
/** Return a string representing the cumulative values of all metrics in this container. */
- @Override
- public String getCumulativeString(@Nullable Set<MetricName> metricFilter) {
+ public String getCumulativeString(@Nullable Set<String> allowedMetricUrns) {
StringBuilder message = new StringBuilder();
for (Map.Entry<MetricName, CounterCell> cell : counters.entries()) {
- if (!matchMetricName(cell.getKey(), metricFilter)) {
+ if (!matchMetric(cell.getKey(), allowedMetricUrns)) {
continue;
}
message.append(cell.getKey().toString());
@@ -489,7 +502,7 @@
message.append("\n");
}
for (Map.Entry<MetricName, DistributionCell> cell : distributions.entries()) {
- if (!matchMetricName(cell.getKey(), metricFilter)) {
+ if (!matchMetric(cell.getKey(), allowedMetricUrns)) {
continue;
}
message.append(cell.getKey().toString());
@@ -502,7 +515,7 @@
message.append("\n");
}
for (Map.Entry<MetricName, GaugeCell> cell : gauges.entries()) {
- if (!matchMetricName(cell.getKey(), metricFilter)) {
+ if (!matchMetric(cell.getKey(), allowedMetricUrns)) {
continue;
}
message.append(cell.getKey().toString());
@@ -513,7 +526,7 @@
}
for (Map.Entry<KV<MetricName, HistogramData.BucketType>, HistogramCell> cell :
histograms.entries()) {
- if (!matchMetricName(cell.getKey().getKey(), metricFilter)) {
+ if (!matchMetric(cell.getKey().getKey(), allowedMetricUrns)) {
continue;
}
message.append(cell.getKey().getKey().toString());
@@ -532,8 +545,43 @@
return message.toString();
}
- @Override
- public Logger getMetricLogger() {
- return LOG;
+ /**
+ * Returns a MetricsContainerImpl with the delta values between two MetricsContainerImpls. The
+ * purpose of this function is to print the changes made to the metrics within a window of time.
+ * The deltas of the counter and histogram bucket counts are calculated between curr and prev. The
+ * most recent value is used for gauges. Distribution metrics are dropped (as there is no
+ * meaningful way to calculate their delta). Returns curr if prev is null.
+ */
+ public static MetricsContainerImpl deltaContainer(
+ @Nullable MetricsContainerImpl prev, MetricsContainerImpl curr) {
+ if (prev == null) {
+ return curr;
+ }
+ MetricsContainerImpl deltaContainer = new MetricsContainerImpl(curr.stepName);
+ for (Map.Entry<MetricName, CounterCell> cell : curr.counters.entries()) {
+ Long prevValue = prev.counters.get(cell.getKey()).getCumulative();
+ Long currValue = cell.getValue().getCumulative();
+ deltaContainer.counters.get(cell.getKey()).inc(currValue - prevValue);
+ }
+ for (Map.Entry<MetricName, GaugeCell> cell : curr.gauges.entries()) {
+ // Simply take the most recent value for gauge, no need to count deltas.
+ deltaContainer.gauges.get(cell.getKey()).update(cell.getValue().getCumulative());
+ }
+ for (Map.Entry<KV<MetricName, HistogramData.BucketType>, HistogramCell> cell :
+ curr.histograms.entries()) {
+ HistogramData.BucketType bt = cell.getKey().getValue();
+ HistogramData prevValue = prev.histograms.get(cell.getKey()).getCumulative();
+ HistogramData currValue = cell.getValue().getCumulative();
+ HistogramCell deltaValueCell = deltaContainer.histograms.get(cell.getKey());
+ deltaValueCell.incBottomBucketCount(
+ currValue.getBottomBucketCount() - prevValue.getBottomBucketCount());
+ for (int i = 0; i < bt.getNumBuckets(); i++) {
+ Long bucketCountDelta = currValue.getCount(i) - prevValue.getCount(i);
+ deltaValueCell.incBucketCount(i, bucketCountDelta);
+ }
+ deltaValueCell.incTopBucketCount(
+ currValue.getTopBucketCount() - prevValue.getTopBucketCount());
+ }
+ return deltaContainer;
}
}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsLogger.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsLogger.java
new file mode 100644
index 0000000..c367382
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsLogger.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
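+/**
+ * A {@link MetricsContainerImpl} that can periodically log the deltas of its metrics since the
+ * last report, e.g. when installed as the process-wide metrics container.
+ */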
+@Experimental(Kind.METRICS)
+public class MetricsLogger extends MetricsContainerImpl {
+ private static final Logger LOG = LoggerFactory.getLogger(MetricsLogger.class);
+
+ Lock reportingLock = new ReentrantLock();
+ AtomicLong lastReportedMillis = new AtomicLong(System.currentTimeMillis());
+ @Nullable MetricsContainerImpl lastMetricsSnapshot = null;
+
+ public MetricsLogger(@Nullable String stepName) {
+ super(stepName);
+ }
+
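+ /**
+ * Builds a log message containing the metric deltas accumulated since the last snapshot, and
+ * advances the snapshot to the current cumulative values.
+ */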
+ public String generateLogMessage(
+ String header, Set<String> allowedMetricUrns, long lastReported) {
+ MetricsContainerImpl nextMetricsSnapshot = new MetricsContainerImpl(this.stepName);
+ nextMetricsSnapshot.update(this);
+ MetricsContainerImpl deltaContainer =
+ MetricsContainerImpl.deltaContainer(lastMetricsSnapshot, nextMetricsSnapshot);
+
+ StringBuilder logMessage = new StringBuilder();
+ logMessage.append(header);
+ logMessage.append(deltaContainer.getCumulativeString(allowedMetricUrns));
+ logMessage.append(String.format("(last reported at %s)%n", new Date(lastReported)));
+
+ lastMetricsSnapshot = nextMetricsSnapshot;
+ return logMessage.toString();
+ }
+
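+ // Logs the metric deltas if at least minimumLoggingFrequencyMillis have elapsed since the last
+ // report. Uses tryLock so that concurrent callers skip logging instead of blocking.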
+ public void tryLoggingMetrics(
+ String header, Set<String> allowedMetricUrns, long minimumLoggingFrequencyMillis) {
+
+ if (reportingLock.tryLock()) {
+ try {
+ long currentTimeMillis = System.currentTimeMillis();
+ long lastReported = lastReportedMillis.get();
+ if (currentTimeMillis - lastReported > minimumLoggingFrequencyMillis) {
+ LOG.info(generateLogMessage(header, allowedMetricUrns, lastReported));
+ lastReportedMillis.set(currentTimeMillis);
+ }
+ } finally {
+ reportingLock.unlock();
+ }
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+
+ @Override
+ public boolean equals(@Nullable Object object) {
+ if (object instanceof MetricsLogger) {
+ return super.equals(object);
+ }
+ return false;
+ }
+}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
index fa9ebc4..5161bb4 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
@@ -71,9 +71,20 @@
public static final String NAME = "NAME";
public static final String SERVICE = "SERVICE";
public static final String METHOD = "METHOD";
+ public static final String RESOURCE = "RESOURCE";
public static final String STATUS = "STATUS";
+ public static final String BIGQUERY_PROJECT_ID = "BIGQUERY_PROJECT_ID";
+ public static final String BIGQUERY_DATASET = "BIGQUERY_DATASET";
+ public static final String BIGQUERY_TABLE = "BIGQUERY_TABLE";
+ public static final String BIGQUERY_VIEW = "BIGQUERY_VIEW";
+ public static final String BIGQUERY_QUERY_NAME = "BIGQUERY_QUERY_NAME";
static {
+ // Note: One benefit of defining these strings above, instead of pulling them in from
+ // the proto files, is to ensure that this code will crash if the strings in the proto
+ // file are changed without this file being modified.
+ // That said, one should not change those strings either, as Runner Harnesses running old
+ // versions would not be able to understand the new label names.
checkArgument(PTRANSFORM.equals(extractLabel(MonitoringInfoLabels.TRANSFORM)));
checkArgument(PCOLLECTION.equals(extractLabel(MonitoringInfoLabels.PCOLLECTION)));
checkArgument(
@@ -84,7 +95,15 @@
checkArgument(NAME.equals(extractLabel(MonitoringInfoLabels.NAME)));
checkArgument(SERVICE.equals(extractLabel(MonitoringInfoLabels.SERVICE)));
checkArgument(METHOD.equals(extractLabel(MonitoringInfoLabels.METHOD)));
+ checkArgument(RESOURCE.equals(extractLabel(MonitoringInfoLabels.RESOURCE)));
checkArgument(STATUS.equals(extractLabel(MonitoringInfoLabels.STATUS)));
+ checkArgument(
+ BIGQUERY_PROJECT_ID.equals(extractLabel(MonitoringInfoLabels.BIGQUERY_PROJECT_ID)));
+ checkArgument(BIGQUERY_DATASET.equals(extractLabel(MonitoringInfoLabels.BIGQUERY_DATASET)));
+ checkArgument(BIGQUERY_TABLE.equals(extractLabel(MonitoringInfoLabels.BIGQUERY_TABLE)));
+ checkArgument(BIGQUERY_VIEW.equals(extractLabel(MonitoringInfoLabels.BIGQUERY_VIEW)));
+ checkArgument(
+ BIGQUERY_QUERY_NAME.equals(extractLabel(MonitoringInfoLabels.BIGQUERY_QUERY_NAME)));
}
}
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
index f04fcea..837432f 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
@@ -23,11 +23,18 @@
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.util.HistogramData;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -269,6 +276,61 @@
}
@Test
+ public void testDeltaCounters() {
+ MetricName cName = MetricName.named("namespace", "counter");
+ MetricName gName = MetricName.named("namespace", "gauge");
+ HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(0, 2, 5);
+ MetricName hName = MetricName.named("namespace", "histogram");
+
+ MetricsContainerImpl prevContainer = new MetricsContainerImpl(null);
+ prevContainer.getCounter(cName).inc(2L);
+ prevContainer.getGauge(gName).set(4L);
+ // Set bucket counts to: [1,1,1,0,0,0,1]
+ prevContainer.getHistogram(hName, bucketType).update(-1);
+ prevContainer.getHistogram(hName, bucketType).update(1);
+ prevContainer.getHistogram(hName, bucketType).update(3);
+ prevContainer.getHistogram(hName, bucketType).update(20);
+
+ MetricsContainerImpl nextContainer = new MetricsContainerImpl(null);
+ nextContainer.getCounter(cName).inc(9L);
+ nextContainer.getGauge(gName).set(8L);
+ // Set bucket counts to: [2,4,5,0,0,0,3]
+ nextContainer.getHistogram(hName, bucketType).update(-1);
+ nextContainer.getHistogram(hName, bucketType).update(-1);
+ for (int i = 0; i < 4; i++) {
+ nextContainer.getHistogram(hName, bucketType).update(1);
+ }
+ for (int i = 0; i < 5; i++) {
+ nextContainer.getHistogram(hName, bucketType).update(3);
+ }
+ nextContainer.getHistogram(hName, bucketType).update(20);
+ nextContainer.getHistogram(hName, bucketType).update(20);
+ nextContainer.getHistogram(hName, bucketType).update(20);
+
+ MetricsContainerImpl deltaContainer =
+ MetricsContainerImpl.deltaContainer(prevContainer, nextContainer);
+ // Expect counter value: 7 = 9 - 2
+ long cValue = deltaContainer.getCounter(cName).getCumulative();
+ assertEquals(7L, cValue);
+
+ // Expect gauge value: 8.
+ GaugeData gValue = deltaContainer.getGauge(gName).getCumulative();
+ assertEquals(8L, gValue.value());
+
+ // Expect bucket counts: [1,3,4,0,0,0,2]
+ assertEquals(
+ 1, deltaContainer.getHistogram(hName, bucketType).getCumulative().getBottomBucketCount());
+ long[] expectedBucketCounts = (new long[] {3, 4, 0, 0, 0});
+ for (int i = 0; i < expectedBucketCounts.length; i++) {
+ assertEquals(
+ expectedBucketCounts[i],
+ deltaContainer.getHistogram(hName, bucketType).getCumulative().getCount(i));
+ }
+ assertEquals(
+ 2, deltaContainer.getHistogram(hName, bucketType).getCumulative().getTopBucketCount());
+ }
+
+ @Test
public void testNotEquals() {
MetricsContainerImpl metricsContainerImpl = new MetricsContainerImpl("stepName");
@@ -293,4 +355,37 @@
Assert.assertNotEquals(metricsContainerImpl, differentGauges);
Assert.assertNotEquals(metricsContainerImpl.hashCode(), differentGauges.hashCode());
}
+
+ @Test
+ public void testMatchMetric() {
+ String urn = MonitoringInfoConstants.Urns.API_REQUEST_COUNT;
+ Map<String, String> labels = new HashMap<String, String>();
+ labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "MyPtransform");
+ labels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery");
+ labels.put(MonitoringInfoConstants.Labels.METHOD, "BigQueryBatchWrite");
+ labels.put(MonitoringInfoConstants.Labels.RESOURCE, "Resource");
+ labels.put(MonitoringInfoConstants.Labels.BIGQUERY_PROJECT_ID, "MyProject");
+ labels.put(MonitoringInfoConstants.Labels.BIGQUERY_DATASET, "MyDataset");
+ labels.put(MonitoringInfoConstants.Labels.BIGQUERY_TABLE, "MyTable");
+
+ // MonitoringInfoMetricName copies the labels, so it's safe to reuse this reference.
+ labels.put(MonitoringInfoConstants.Labels.STATUS, "ok");
+ MonitoringInfoMetricName okName = MonitoringInfoMetricName.named(urn, labels);
+ labels.put(MonitoringInfoConstants.Labels.STATUS, "not_found");
+ MonitoringInfoMetricName notFoundName = MonitoringInfoMetricName.named(urn, labels);
+
+ Set<String> allowedMetricUrns = new HashSet<String>();
+ allowedMetricUrns.add(MonitoringInfoConstants.Urns.API_REQUEST_COUNT);
+ assertTrue(MetricsContainerImpl.matchMetric(okName, allowedMetricUrns));
+ assertTrue(MetricsContainerImpl.matchMetric(notFoundName, allowedMetricUrns));
+
+ MetricName userMetricName = MetricName.named("namespace", "name");
+ assertFalse(MetricsContainerImpl.matchMetric(userMetricName, allowedMetricUrns));
+
+ MetricName elementCountName =
+ MonitoringInfoMetricName.named(
+ MonitoringInfoConstants.Urns.ELEMENT_COUNT,
+ Collections.singletonMap("name", "counter"));
+ assertFalse(MetricsContainerImpl.matchMetric(elementCountName, allowedMetricUrns));
+ }
}
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsLoggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsLoggerTest.java
new file mode 100644
index 0000000..a37a0c7
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsLoggerTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.util.HistogramData;
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+
+public class MetricsLoggerTest {
+
+ @Test
+ public void testGeneratedLogMessageShowsDeltas() {
+ MetricName cName =
+ MonitoringInfoMetricName.named(
+ MonitoringInfoConstants.Urns.ELEMENT_COUNT,
+ Collections.singletonMap("name", "counter"));
+ HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(0, 2, 5);
+ MetricName hName =
+ MonitoringInfoMetricName.named(
+ MonitoringInfoConstants.Urns.ELEMENT_COUNT,
+ Collections.singletonMap("name", "histogram"));
+
+ MetricsLogger logger = new MetricsLogger(null);
+ logger.getCounter(cName).inc(2L);
+ // Set bucket counts to: [0,1,1,0,0,0,0]
+ logger.getHistogram(hName, bucketType).update(1);
+ logger.getHistogram(hName, bucketType).update(3);
+
+ Set<String> allowedMetricUrns = new HashSet<String>();
+ allowedMetricUrns.add(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
+ String msg = logger.generateLogMessage("My Header: ", allowedMetricUrns, 0);
+ assertThat(msg, CoreMatchers.containsString("beam:metric:element_count:v1 {name=counter} = 2"));
+ assertThat(
+ msg,
+ CoreMatchers.containsString(
+ "{name=histogram} = {count: 2, p50: 2.000000, p90: 3.600000, p99: 3.960000}"));
+
+ logger.getCounter(cName).inc(3L);
+ // Set bucket counts to: [0,5,6,0,0,0,0]
+ // Which means a delta of: [0,4,5,0,0,0,0]
+ for (int i = 0; i < 4; i++) {
+ logger.getHistogram(hName, bucketType).update(1);
+ }
+ for (int i = 0; i < 5; i++) {
+ logger.getHistogram(hName, bucketType).update(3);
+ }
+ msg = logger.generateLogMessage("My Header: ", allowedMetricUrns, 0);
+ assertThat(msg, CoreMatchers.containsString("beam:metric:element_count:v1 {name=counter} = 3"));
+ assertThat(
+ msg,
+ CoreMatchers.containsString(
+ "{name=histogram} = {count: 9, p50: 2.200000, p90: 3.640000, p99: 3.964000}"));
+
+ logger.getCounter(cName).inc(4L);
+ // Set bucket counts to: [0,8,10,0,0,0,0]
+ // Which means a delta of: [0,3,4,0,0,0,0]
+ for (int i = 0; i < 3; i++) {
+ logger.getHistogram(hName, bucketType).update(1);
+ }
+ for (int i = 0; i < 4; i++) {
+ logger.getHistogram(hName, bucketType).update(3);
+ }
+ msg = logger.generateLogMessage("My Header: ", allowedMetricUrns, 0);
+ assertThat(
+ msg,
+ CoreMatchers.containsString(
+ "{name=histogram} = {count: 7, p50: 2.250000, p90: 3.650000, p99: 3.965000}"));
+ }
+}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 5a58612..bb5eccd 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -68,7 +68,7 @@
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
-import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.runners.core.metrics.MetricsLogger;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.internal.CustomSources;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
@@ -292,7 +292,9 @@
StreamingDataflowWorker worker =
StreamingDataflowWorker.fromDataflowWorkerHarnessOptions(options, sdkHarnessRegistry);
- MetricsEnvironment.setProcessWideContainer(new MetricsContainerImpl(null));
+ // Use the MetricsLogger container, which BigQueryIO uses to periodically log process-wide
+ // metrics.
+ MetricsEnvironment.setProcessWideContainer(new MetricsLogger(null));
JvmInitializers.runBeforeProcessing(options);
worker.startStatusPages();
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLogger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLogger.java
deleted file mode 100644
index f5af592..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLogger.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.metrics;
-
-import java.io.Serializable;
-import java.util.Date;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.slf4j.Logger;
-
-@Experimental(Kind.METRICS)
-public interface MetricsLogger extends Serializable {
- Lock REPORTING_LOCK = new ReentrantLock();
- AtomicLong LAST_REPORTED_MILLIS = new AtomicLong(System.currentTimeMillis());
-
- default void tryLoggingMetrics(
- String header,
- Set<MetricName> metricFilter,
- long minimumLoggingFrequencyMillis,
- boolean resetMetrics) {
- if (REPORTING_LOCK.tryLock()) {
- try {
- long currentTimeMillis = System.currentTimeMillis();
- long lastReported = LAST_REPORTED_MILLIS.get();
- if (currentTimeMillis - lastReported > minimumLoggingFrequencyMillis) {
- StringBuilder logMessage = new StringBuilder();
- logMessage.append(header);
- logMessage.append(getCumulativeString(metricFilter));
- if (resetMetrics) {
- reset();
- logMessage.append(String.format("(last reported at %s)%n", new Date(lastReported)));
- }
- getMetricLogger().info(logMessage.toString());
- LAST_REPORTED_MILLIS.set(currentTimeMillis);
- }
- } finally {
- REPORTING_LOCK.unlock();
- }
- }
- }
-
- Logger getMetricLogger();
-
- String getCumulativeString(Set<MetricName> metricFilter);
-
- void reset();
-}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
index 5bade78..28d4a38 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
@@ -40,8 +40,10 @@
private final BucketType bucketType;
+ // TODO(BEAM-12103): Remove numTopRecords and numBottomRecords
+ // and include those counters in the buckets array.
private long[] buckets;
- private long numOfRecords;
+ private long numBoundedBucketRecords;
private long numTopRecords;
private long numBottomRecords;
@@ -53,13 +55,18 @@
public HistogramData(BucketType bucketType) {
this.bucketType = bucketType;
this.buckets = new long[bucketType.getNumBuckets()];
- this.numOfRecords = 0;
+ this.numBoundedBucketRecords = 0;
this.numTopRecords = 0;
this.numBottomRecords = 0;
}
+ public BucketType getBucketType() {
+ return this.bucketType;
+ }
+
/**
- * Create a histogram with linear buckets.
+ * Create a histogram with linear buckets.
+ *
+ * <p>TODO(BEAM-12103): Update this function so that numBuckets defines the total number of
+ * buckets, including the infinite ones.
*
* @param start Lower bound of a starting bucket.
* @param width Bucket width. Smaller width implies a better resolution for percentile estimation.
@@ -77,9 +84,41 @@
}
}
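+ /**
+ * Increment this histogram's bucket counts by those in {@code other}. The update is skipped,
+ * with a warning, if the two histograms do not share a compatible bucket type.
+ */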
+ public synchronized void update(HistogramData other) {
+ synchronized (other) {
+ if (!this.bucketType.equals(other.bucketType)
+ || this.buckets.length != other.buckets.length) {
+ LOG.warn("Failed to update HistogramData from another with a different buckets");
+ return;
+ }
+
+ incTopBucketCount(other.numTopRecords);
+ incBottomBucketCount(other.numBottomRecords);
+ for (int i = 0; i < other.buckets.length; i++) {
+ incBucketCount(i, other.buckets[i]);
+ }
+ }
+ }
+
+ // TODO(BEAM-12103): Update this method to allow incrementing the infinite buckets as well,
+ // using 0 and length-1 as the bucketIndex, and remove the incTopBucketCount and
+ // incBottomBucketCount methods.
+ public synchronized void incBucketCount(int bucketIndex, long count) {
+ this.buckets[bucketIndex] += count;
+ this.numBoundedBucketRecords += count;
+ }
+
+ public synchronized void incTopBucketCount(long count) {
+ this.numTopRecords += count;
+ }
+
+ public synchronized void incBottomBucketCount(long count) {
+ this.numBottomRecords += count;
+ }
+
public synchronized void clear() {
this.buckets = new long[bucketType.getNumBuckets()];
- this.numOfRecords = 0;
+ this.numBoundedBucketRecords = 0;
this.numTopRecords = 0;
this.numBottomRecords = 0;
}
@@ -88,19 +127,17 @@
double rangeTo = bucketType.getRangeTo();
double rangeFrom = bucketType.getRangeFrom();
if (value >= rangeTo) {
- LOG.warn("record is out of upper bound {}: {}", rangeTo, value);
numTopRecords++;
} else if (value < rangeFrom) {
- LOG.warn("record is out of lower bound {}: {}", rangeFrom, value);
numBottomRecords++;
} else {
buckets[bucketType.getBucketIndex(value)]++;
- numOfRecords++;
+ numBoundedBucketRecords++;
}
}
public synchronized long getTotalCount() {
- return numOfRecords + numTopRecords + numBottomRecords;
+ return numBoundedBucketRecords + numTopRecords + numBottomRecords;
}
public synchronized String getPercentileString(String elemType, String unit) {
@@ -117,7 +154,8 @@
}
/**
- * Get the bucket count for the given bucketIndex.
+ * Get the bucket count for the given bucketIndex.
+ *
+ * <p>TODO(BEAM-12103): Update this function to allow indexing the -INF and INF buckets, using 0
+ * and length-1 as the bucketIndex.
*
* <p>This method does not guarantee the atomicity when sequentially accessing the multiple
* buckets i.e. other threads may alter the value between consecutive invocations. For summing the
@@ -130,6 +168,14 @@
return buckets[bucketIndex];
}
+ public synchronized long getTopBucketCount() {
+ return numTopRecords;
+ }
+
+ public synchronized long getBottomBucketCount() {
+ return numBottomRecords;
+ }
+
public double p99() {
return getLinearInterpolation(0.99);
}
@@ -232,5 +278,7 @@
public double getRangeTo() {
return getStart() + getNumBuckets() * getWidth();
}
+
+ // Note: equals() and hashCode() are implemented by AutoValue.
}
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index f51938b..b113dac 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -39,12 +39,14 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeFalse;
+import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Writer;
+import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -66,6 +68,9 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ToString;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -851,4 +856,121 @@
p.run();
}
}
+
+ /** Tests for TextSource class. */
+ @RunWith(JUnit4.class)
+ public static class TextSourceTest {
+ @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testRemoveUtf8BOM() throws Exception {
+ Path p1 = createTestFile("test_txt_ascii", Charset.forName("US-ASCII"), "1,p1", "2,p1");
+ Path p2 =
+ createTestFile(
+ "test_txt_utf8_no_bom",
+ Charset.forName("UTF-8"),
+ "1,p2-Japanese:テスト",
+ "2,p2-Japanese:テスト");
+ Path p3 =
+ createTestFile(
+ "test_txt_utf8_bom",
+ Charset.forName("UTF-8"),
+ "\uFEFF1,p3-テストBOM",
+ "\uFEFF2,p3-テストBOM");
+ PCollection<String> contents =
+ pipeline
+ .apply("Create", Create.of(p1.toString(), p2.toString(), p3.toString()))
+ .setCoder(StringUtf8Coder.of())
+ // PCollection<String>
+ .apply("Read file", new TextIOReadTest.TextSourceTest.TextFileReadTransform());
+ // PCollection<KV<String, String>>: tableName, line
+
+ // Validate that the BOM bytes (\uFEFF) at the beginning of the first line have been removed.
+ PAssert.that(contents)
+ .containsInAnyOrder(
+ "1,p1",
+ "2,p1",
+ "1,p2-Japanese:テスト",
+ "2,p2-Japanese:テスト",
+ "1,p3-テストBOM",
+ "\uFEFF2,p3-テストBOM");
+
+ pipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testPreserveNonBOMBytes() throws Exception {
+ // Contains \uFEFE, not UTF BOM.
+ Path p1 =
+ createTestFile(
+ "test_txt_utf_bom", Charset.forName("UTF-8"), "\uFEFE1,p1テスト", "\uFEFE2,p1テスト");
+ PCollection<String> contents =
+ pipeline
+ .apply("Create", Create.of(p1.toString()))
+ .setCoder(StringUtf8Coder.of())
+ // PCollection<String>
+ .apply("Read file", new TextIOReadTest.TextSourceTest.TextFileReadTransform());
+
+ PAssert.that(contents).containsInAnyOrder("\uFEFE1,p1テスト", "\uFEFE2,p1テスト");
+
+ pipeline.run();
+ }
+
+ private static class FileReadDoFn extends DoFn<FileIO.ReadableFile, String> {
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ FileIO.ReadableFile file = c.element();
+ ValueProvider<String> filenameProvider =
+ ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().getFilename());
+ // Create a TextSource, passing null as the delimiter to use the default
+ // delimiters ('\n', '\r', or '\r\n').
+ TextSource textSource = new TextSource(filenameProvider, null, null);
+ try {
+ BoundedSource.BoundedReader<String> reader =
+ textSource
+ .createForSubrangeOfFile(file.getMetadata(), 0, file.getMetadata().sizeBytes())
+ .createReader(c.getPipelineOptions());
+ for (boolean more = reader.start(); more; more = reader.advance()) {
+ c.output(reader.getCurrent());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Unable to readFile: " + file.getMetadata().resourceId().toString());
+ }
+ }
+ }
+
+ /** A transform that reads CSV file records. */
+ private static class TextFileReadTransform
+ extends PTransform<PCollection<String>, PCollection<String>> {
+ public TextFileReadTransform() {}
+
+ @Override
+ public PCollection<String> expand(PCollection<String> files) {
+ return files
+ // PCollection<String>
+ .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW))
+ // PCollection<Match.Metadata>
+ .apply(FileIO.readMatches())
+ // PCollection<FileIO.ReadableFile>
+ .apply("Read lines", ParDo.of(new TextIOReadTest.TextSourceTest.FileReadDoFn()));
+ // PCollection<String>: line
+ }
+ }
+
+ private Path createTestFile(String filename, Charset charset, String... lines)
+ throws IOException {
+ Path path = Files.createTempFile(filename, ".csv");
+ try (BufferedWriter writer = Files.newBufferedWriter(path, charset)) {
+ for (String line : lines) {
+ writer.write(line);
+ writer.write('\n');
+ }
+ }
+ return path;
+ }
+ }
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java
deleted file mode 100644
index 36a3f68..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.FileIO.ReadableFile;
-import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for TextSource class. */
-@RunWith(JUnit4.class)
-public class TextSourceTest {
- @Rule public transient TestPipeline pipeline = TestPipeline.create();
-
- @Test
- @Category(NeedsRunner.class)
- public void testRemoveUtf8BOM() throws Exception {
- Path p1 = createTestFile("test_txt_ascii", Charset.forName("US-ASCII"), "1,p1", "2,p1");
- Path p2 =
- createTestFile(
- "test_txt_utf8_no_bom",
- Charset.forName("UTF-8"),
- "1,p2-Japanese:テスト",
- "2,p2-Japanese:テスト");
- Path p3 =
- createTestFile(
- "test_txt_utf8_bom",
- Charset.forName("UTF-8"),
- "\uFEFF1,p3-テストBOM",
- "\uFEFF2,p3-テストBOM");
- PCollection<String> contents =
- pipeline
- .apply("Create", Create.of(p1.toString(), p2.toString(), p3.toString()))
- .setCoder(StringUtf8Coder.of())
- // PCollection<String>
- .apply("Read file", new TextFileReadTransform());
- // PCollection<KV<String, String>>: tableName, line
-
- // Validate that the BOM bytes (\uFEFF) at the beginning of the first line have been removed.
- PAssert.that(contents)
- .containsInAnyOrder(
- "1,p1",
- "2,p1",
- "1,p2-Japanese:テスト",
- "2,p2-Japanese:テスト",
- "1,p3-テストBOM",
- "\uFEFF2,p3-テストBOM");
-
- pipeline.run();
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testPreserveNonBOMBytes() throws Exception {
- // Contains \uFEFE, not UTF BOM.
- Path p1 =
- createTestFile(
- "test_txt_utf_bom", Charset.forName("UTF-8"), "\uFEFE1,p1テスト", "\uFEFE2,p1テスト");
- PCollection<String> contents =
- pipeline
- .apply("Create", Create.of(p1.toString()))
- .setCoder(StringUtf8Coder.of())
- // PCollection<String>
- .apply("Read file", new TextFileReadTransform());
-
- PAssert.that(contents).containsInAnyOrder("\uFEFE1,p1テスト", "\uFEFE2,p1テスト");
-
- pipeline.run();
- }
-
- private static class FileReadDoFn extends DoFn<ReadableFile, String> {
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- ReadableFile file = c.element();
- ValueProvider<String> filenameProvider =
- ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().getFilename());
- // Create a TextSource, passing null as the delimiter to use the default
- // delimiters ('\n', '\r', or '\r\n').
- TextSource textSource = new TextSource(filenameProvider, null, null);
- try {
- BoundedSource.BoundedReader<String> reader =
- textSource
- .createForSubrangeOfFile(file.getMetadata(), 0, file.getMetadata().sizeBytes())
- .createReader(c.getPipelineOptions());
- for (boolean more = reader.start(); more; more = reader.advance()) {
- c.output(reader.getCurrent());
- }
- } catch (IOException e) {
- throw new RuntimeException(
- "Unable to readFile: " + file.getMetadata().resourceId().toString());
- }
- }
- }
-
- /** A transform that reads CSV file records. */
- private static class TextFileReadTransform
- extends PTransform<PCollection<String>, PCollection<String>> {
- public TextFileReadTransform() {}
-
- @Override
- public PCollection<String> expand(PCollection<String> files) {
- return files
- // PCollection<String>
- .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW))
- // PCollection<Match.Metadata>
- .apply(FileIO.readMatches())
- // PCollection<FileIO.ReadableFile>
- .apply("Read lines", ParDo.of(new FileReadDoFn()));
- // PCollection<String>: line
- }
- }
-
- private Path createTestFile(String filename, Charset charset, String... lines)
- throws IOException {
- Path path = Files.createTempFile(filename, ".csv");
- try (BufferedWriter writer = Files.newBufferedWriter(path, charset)) {
- for (String line : lines) {
- writer.write(line);
- writer.write('\n');
- }
- }
- return path;
- }
-}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/HistogramDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/HistogramDataTest.java
index c86ccd8..a940778 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/HistogramDataTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/HistogramDataTest.java
@@ -21,8 +21,6 @@
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThrows;
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -30,14 +28,12 @@
/** Tests for {@link HistogramData}. */
@RunWith(JUnit4.class)
public class HistogramDataTest {
- @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(HistogramData.class);
@Test
public void testOutOfRangeWarning() {
HistogramData histogramData = HistogramData.linear(0, 20, 5);
histogramData.record(100);
assertThat(histogramData.getTotalCount(), equalTo(1L));
- expectedLogs.verifyWarn("out of upper bound");
}
@Test
@@ -149,4 +145,53 @@
assertThat(histogramData.getTotalCount(), equalTo(0L));
assertThat(histogramData.getCount(5), equalTo(0L));
}
+
+ @Test
+ public void testUpdateUsingDoublesAndCumulative() {
+ HistogramData data = HistogramData.linear(0, 2, 2);
+ data.record(-1); // to -Inf bucket
+ data.record(0); // bucket 0
+ data.record(1);
+ data.record(3); // bucket 1
+ data.record(4); // to Inf bucket
+ assertThat(data.getCount(0), equalTo(2L));
+ assertThat(data.getCount(1), equalTo(1L));
+ assertThat(data.getTotalCount(), equalTo(5L));
+
+ // Now try updating it with another HistogramData
+ HistogramData data2 = HistogramData.linear(0, 2, 2);
+ data2.record(-1); // to -Inf bucket
+ data2.record(-1); // to -Inf bucket
+ data2.record(0); // bucket 0
+ data2.record(0);
+ data2.record(1);
+ data2.record(1);
+ data2.record(3); // bucket 1
+ data2.record(3);
+ data2.record(4); // to Inf bucket
+ data2.record(4);
+ assertThat(data2.getCount(0), equalTo(4L));
+ assertThat(data2.getCount(1), equalTo(2L));
+ assertThat(data2.getTotalCount(), equalTo(10L));
+
+ data.update(data2);
+ assertThat(data.getCount(0), equalTo(6L));
+ assertThat(data.getCount(1), equalTo(3L));
+ assertThat(data.getTotalCount(), equalTo(15L));
+ }
+
+ @Test
+ public void testIncrementBucketCountByIndex() {
+ HistogramData data = HistogramData.linear(0, 2, 2);
+ data.incBottomBucketCount(1);
+ data.incBucketCount(0, 2);
+ data.incBucketCount(1, 3);
+ data.incTopBucketCount(4);
+
+ assertThat(data.getBottomBucketCount(), equalTo(1L));
+ assertThat(data.getCount(0), equalTo(2L));
+ assertThat(data.getCount(1), equalTo(3L));
+ assertThat(data.getTopBucketCount(), equalTo(4L));
+ assertThat(data.getTotalCount(), equalTo(10L));
+ }
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
index 0c7396d..74f4dba 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
@@ -19,11 +19,10 @@
import static org.apache.beam.sdk.extensions.sql.meta.provider.kafka.Schemas.PAYLOAD_FIELD;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
-import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.auto.service.AutoService;
-import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
@@ -33,6 +32,11 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.nullness.qual.Nullable;
/**
* Kafka table provider.
@@ -45,23 +49,62 @@
* NAME VARCHAR(127) COMMENT 'this is the name'
* )
* COMMENT 'this is the table orders'
- * LOCATION 'kafka://localhost:2181/brokers?topic=test'
- * TBLPROPERTIES '{"bootstrap.servers":"localhost:9092", "topics": ["topic1", "topic2"]}'
+ * TYPE kafka
+ * // Optional. One broker host:port pair to bootstrap with and a topic.
+ * // Only one topic overall may be provided for writing.
+ * LOCATION 'my.company.url.com:2181/topic1'
+ * // Extra bootstrap_servers and topics can be provided explicitly. These will be merged
+ * // with the server and topic in LOCATION.
+ * TBLPROPERTIES '{
+ * "bootstrap_servers": ["104.126.7.88:7743", "104.111.9.22:7743"],
+ * "topics": ["topic2", "topic3"]
+ * }'
* }</pre>
*/
@AutoService(TableProvider.class)
public class KafkaTableProvider extends InMemoryMetaTableProvider {
+ private static class ParsedLocation {
+ String brokerLocation = "";
+ String topic = "";
+ }
+
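+ // Parses a LOCATION of the form <broker bootstrap location>/<topic>, e.g.
+ // "my.company.url.com:2181/topic1" yields broker "my.company.url.com:2181" and topic "topic1".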
+ private static ParsedLocation parseLocation(String location) {
+ ParsedLocation parsed = new ParsedLocation();
+ List<String> split = Splitter.on('/').splitToList(location);
+ checkArgument(
+ split.size() >= 2,
+ "Location string `%s` invalid: must be <broker bootstrap location>/<topic>.",
+ location);
+ parsed.topic = Iterables.getLast(split);
+ parsed.brokerLocation = String.join("/", split.subList(0, split.size() - 1));
+ return parsed;
+ }
+
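+ // Combines the optional value derived from LOCATION with the list provided in TBLPROPERTIES,
+ // keeping the LOCATION-derived value first.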
+ private static List<String> mergeParam(Optional<String> initial, @Nullable List<Object> toMerge) {
+ ImmutableList.Builder<String> merged = ImmutableList.builder();
+ initial.ifPresent(merged::add);
+ if (toMerge != null) {
+ toMerge.forEach(o -> merged.add(o.toString()));
+ }
+ return merged.build();
+ }
+
@Override
public BeamSqlTable buildBeamSqlTable(Table table) {
Schema schema = table.getSchema();
-
JSONObject properties = table.getProperties();
- String bootstrapServers = properties.getString("bootstrap.servers");
- JSONArray topicsArr = properties.getJSONArray("topics");
- List<String> topics = new ArrayList<>(topicsArr.size());
- for (Object topic : topicsArr) {
- topics.add(topic.toString());
+
+ Optional<ParsedLocation> parsedLocation = Optional.empty();
+ if (!Strings.isNullOrEmpty(table.getLocation())) {
+ parsedLocation = Optional.of(parseLocation(checkArgumentNotNull(table.getLocation())));
}
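+    // Merge the topic and broker parsed from LOCATION with any provided via TBLPROPERTIES.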
+ List<String> topics =
+ mergeParam(parsedLocation.map(loc -> loc.topic), properties.getJSONArray("topics"));
+ List<String> allBootstrapServers =
+ mergeParam(
+ parsedLocation.map(loc -> loc.brokerLocation),
+ properties.getJSONArray("bootstrap_servers"));
+ String bootstrapServers = String.join(",", allBootstrapServers);
Optional<String> payloadFormat =
properties.containsKey("format")
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java
index 869c86b..fee04e4 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java
@@ -93,8 +93,8 @@
.name("kafka")
.type("kafka")
.schema(TEST_SCHEMA)
- .properties(
- JSON.parseObject("{ \"topics\": [ \"mytopic\" ], \"format\": \"avro\" }"))
+ .location("localhost/mytopic")
+ .properties(JSON.parseObject("{ \"format\": \"avro\" }"))
.build()));
}
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java
index 6f97c4e..d33665c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java
@@ -66,8 +66,8 @@
.name("kafka")
.type("kafka")
.schema(TEST_SCHEMA)
- .properties(
- JSON.parseObject("{ \"topics\": [ \"mytopic\" ], \"format\": \"json\" }"))
+ .location("localhost/mytopic")
+ .properties(JSON.parseObject("{ \"format\": \"json\" }"))
.build()));
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java
index ba90cf9..a75dded 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java
@@ -92,9 +92,10 @@
.name("kafka")
.type("kafka")
.schema(schema)
+ .location("localhost/mytopic")
.properties(
JSON.parseObject(
- "{ \"topics\": [ \"mytopic\" ], \"format\": \"proto\", \"protoClass\": \""
+ "{ \"format\": \"proto\", \"protoClass\": \""
+ PayloadMessages.TestMessage.class.getName()
+ "\" }"))
.build()));
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java
index 860bc34..958ca63 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java
@@ -98,9 +98,10 @@
.name("kafka")
.type("kafka")
.schema(schema)
+ .location("localhost/mytopic")
.properties(
JSON.parseObject(
- "{ \"topics\": [ \"mytopic\" ], \"format\": \"thrift\", \"thriftClass\": \""
+ "{ \"format\": \"thrift\", \"thriftClass\": \""
+ TestThriftMessage.class.getName()
+ "\", \"thriftProtocolFactoryClass\": \""
+ TCompactProtocol.Factory.class.getName()
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java
index 0546c7a..ec94b65 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java
@@ -19,6 +19,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import com.alibaba.fastjson.JSON;
import java.io.Serializable;
@@ -130,6 +131,13 @@
kafkaOptions = pipeline.getOptions().as(KafkaOptions.class);
kafkaOptions.setKafkaTopic(topic);
kafkaOptions.setKafkaBootstrapServerAddress(KAFKA_CONTAINER.getBootstrapServers());
+ checkArgument(
+ !KAFKA_CONTAINER.getBootstrapServers().contains(","),
+ "This integration test expects exactly one bootstrap server.");
+ }
+
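+ // Builds a LOCATION string of the form <bootstrap server>/<topic> from the test Kafka options.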
+ private static String buildLocation() {
+ return kafkaOptions.getKafkaBootstrapServerAddress() + "/" + kafkaOptions.getKafkaTopic();
}
@Test
@@ -139,7 +147,7 @@
Table.builder()
.name("kafka_table")
.comment("kafka table")
- .location("")
+ .location(buildLocation())
.schema(TEST_TABLE_SCHEMA)
.type("kafka")
.properties(JSON.parseObject(objectsProvider.getKafkaPropertiesString()))
@@ -165,9 +173,9 @@
+ "f_string VARCHAR NOT NULL \n"
+ ") \n"
+ "TYPE 'kafka' \n"
- + "LOCATION ''\n"
+ + "LOCATION '%s'\n"
+ "TBLPROPERTIES '%s'",
- objectsProvider.getKafkaPropertiesString());
+ buildLocation(), objectsProvider.getKafkaPropertiesString());
TableProvider tb = new KafkaTableProvider();
BeamSqlEnv env = BeamSqlEnv.inMemory(tb);
@@ -213,9 +221,9 @@
+ ">"
+ ") \n"
+ "TYPE 'kafka' \n"
- + "LOCATION ''\n"
+ + "LOCATION '%s'\n"
+ "TBLPROPERTIES '%s'",
- objectsProvider.getKafkaPropertiesString());
+ buildLocation(), objectsProvider.getKafkaPropertiesString());
TableProvider tb = new KafkaTableProvider();
BeamSqlEnv env = BeamSqlEnv.inMemory(tb);
@@ -365,11 +373,7 @@
protected String getKafkaPropertiesString() {
return "{ "
+ (getPayloadFormat() == null ? "" : "\"format\" : \"" + getPayloadFormat() + "\",")
- + "\"bootstrap.servers\" : \""
- + kafkaOptions.getKafkaBootstrapServerAddress()
- + "\",\"topics\":[\""
- + kafkaOptions.getKafkaTopic()
- + "\"] }";
+ + "}";
}
}
@@ -410,11 +414,6 @@
protected String getKafkaPropertiesString() {
return "{ "
+ "\"format\" : \"proto\","
- + "\"bootstrap.servers\" : \""
- + kafkaOptions.getKafkaBootstrapServerAddress()
- + "\",\"topics\":[\""
- + kafkaOptions.getKafkaTopic()
- + "\"],"
+ "\"protoClass\": \""
+ PayloadMessages.ItMessage.class.getName()
+ "\"}";
@@ -473,11 +472,6 @@
protected String getKafkaPropertiesString() {
return "{ "
+ "\"format\" : \"thrift\","
- + "\"bootstrap.servers\" : \""
- + kafkaOptions.getKafkaBootstrapServerAddress()
- + "\",\"topics\":[\""
- + kafkaOptions.getKafkaTopic()
- + "\"],"
+ "\"thriftClass\": \""
+ thriftClass.getName()
+ "\", \"thriftProtocolFactoryClass\": \""
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
index e7154d9..a2a663a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
@@ -23,12 +23,13 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
+import java.util.List;
import org.apache.beam.sdk.extensions.protobuf.PayloadMessages;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.io.thrift.payloads.SimpleThriftMessage;
import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.thrift.TBase;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
@@ -38,6 +39,8 @@
/** UnitTest for {@link KafkaTableProvider}. */
public class KafkaTableProviderTest {
private final KafkaTableProvider provider = new KafkaTableProvider();
+ private static final String LOCATION_BROKER = "104.126.7.88:7743";
+ private static final String LOCATION_TOPIC = "topic1";
@Test
public void testBuildBeamSqlCSVTable() {
@@ -47,9 +50,37 @@
assertNotNull(sqlTable);
assertTrue(sqlTable instanceof BeamKafkaCSVTable);
- BeamKafkaCSVTable csvTable = (BeamKafkaCSVTable) sqlTable;
- assertEquals("localhost:9092", csvTable.getBootstrapServers());
- assertEquals(ImmutableList.of("topic1", "topic2"), csvTable.getTopics());
+ BeamKafkaCSVTable kafkaTable = (BeamKafkaCSVTable) sqlTable;
+ assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+ assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
+ }
+
+ @Test
+ public void testBuildWithExtraServers() {
+ Table table =
+ mockTableWithExtraServers("hello", ImmutableList.of("localhost:1111", "localhost:2222"));
+ BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+
+ assertNotNull(sqlTable);
+ assertTrue(sqlTable instanceof BeamKafkaCSVTable);
+
+ BeamKafkaCSVTable kafkaTable = (BeamKafkaCSVTable) sqlTable;
+ assertEquals(
+ LOCATION_BROKER + ",localhost:1111,localhost:2222", kafkaTable.getBootstrapServers());
+ assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
+ }
+
+ @Test
+ public void testBuildWithExtraTopics() {
+ Table table = mockTableWithExtraTopics("hello", ImmutableList.of("topic2", "topic3"));
+ BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+
+ assertNotNull(sqlTable);
+ assertTrue(sqlTable instanceof BeamKafkaCSVTable);
+
+ BeamKafkaCSVTable kafkaTable = (BeamKafkaCSVTable) sqlTable;
+ assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+ assertEquals(ImmutableList.of(LOCATION_TOPIC, "topic2", "topic3"), kafkaTable.getTopics());
}
@Test
@@ -60,9 +91,9 @@
assertNotNull(sqlTable);
assertTrue(sqlTable instanceof BeamKafkaTable);
- BeamKafkaTable csvTable = (BeamKafkaTable) sqlTable;
- assertEquals("localhost:9092", csvTable.getBootstrapServers());
- assertEquals(ImmutableList.of("topic1", "topic2"), csvTable.getTopics());
+ BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+ assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+ assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
}
@Test
@@ -73,9 +104,9 @@
assertNotNull(sqlTable);
assertTrue(sqlTable instanceof BeamKafkaTable);
- BeamKafkaTable protoTable = (BeamKafkaTable) sqlTable;
- assertEquals("localhost:9092", protoTable.getBootstrapServers());
- assertEquals(ImmutableList.of("topic1", "topic2"), protoTable.getTopics());
+ BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+ assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+ assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
}
@Test
@@ -87,9 +118,9 @@
assertNotNull(sqlTable);
assertTrue(sqlTable instanceof BeamKafkaTable);
- BeamKafkaTable thriftTable = (BeamKafkaTable) sqlTable;
- assertEquals("localhost:9092", thriftTable.getBootstrapServers());
- assertEquals(ImmutableList.of("topic1", "topic2"), thriftTable.getTopics());
+ BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+ assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+ assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
}
@Test
@@ -100,9 +131,9 @@
assertNotNull(sqlTable);
assertTrue(sqlTable instanceof NestedPayloadKafkaTable);
- BeamKafkaTable thriftTable = (BeamKafkaTable) sqlTable;
- assertEquals("localhost:9092", thriftTable.getBootstrapServers());
- assertEquals(ImmutableList.of("topic1", "topic2"), thriftTable.getTopics());
+ BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+ assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+ assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
}
@Test
@@ -114,9 +145,9 @@
assertNotNull(sqlTable);
assertTrue(sqlTable instanceof NestedPayloadKafkaTable);
- BeamKafkaTable thriftTable = (BeamKafkaTable) sqlTable;
- assertEquals("localhost:9092", thriftTable.getBootstrapServers());
- assertEquals(ImmutableList.of("topic1", "topic2"), thriftTable.getTopics());
+ BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+ assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+ assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
}
@Test
@@ -125,48 +156,67 @@
}
private static Table mockTable(String name) {
- return mockTable(name, false, null, null, null, null);
+ return mockTable(name, false, null, null, null, null, null, null);
+ }
+
+ private static Table mockTableWithExtraServers(String name, List<String> extraBootstrapServers) {
+ return mockTable(name, false, extraBootstrapServers, null, null, null, null, null);
+ }
+
+ private static Table mockTableWithExtraTopics(String name, List<String> extraTopics) {
+ return mockTable(name, false, null, extraTopics, null, null, null, null);
}
private static Table mockTable(String name, String payloadFormat) {
- return mockTable(name, false, payloadFormat, null, null, null);
+ return mockTable(name, false, null, null, payloadFormat, null, null, null);
}
private static Table mockProtoTable(String name, Class<?> protoClass) {
- return mockTable(name, false, "proto", protoClass, null, null);
+ return mockTable(name, false, null, null, "proto", protoClass, null, null);
}
private static Table mockThriftTable(
String name,
Class<? extends TBase<?, ?>> thriftClass,
Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
- return mockTable(name, false, "thrift", null, thriftClass, thriftProtocolFactoryClass);
+ return mockTable(
+ name, false, null, null, "thrift", null, thriftClass, thriftProtocolFactoryClass);
}
private static Table mockNestedBytesTable(String name) {
- return mockTable(name, true, null, null, null, null);
+ return mockTable(name, true, null, null, null, null, null, null);
}
private static Table mockNestedThriftTable(
String name,
Class<? extends TBase<?, ?>> thriftClass,
Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
- return mockTable(name, true, "thrift", null, thriftClass, thriftProtocolFactoryClass);
+ return mockTable(
+ name, true, null, null, "thrift", null, thriftClass, thriftProtocolFactoryClass);
}
private static Table mockTable(
String name,
boolean isNested,
+ @Nullable List<String> extraBootstrapServers,
+ @Nullable List<String> extraTopics,
@Nullable String payloadFormat,
@Nullable Class<?> protoClass,
@Nullable Class<? extends TBase<?, ?>> thriftClass,
@Nullable Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
JSONObject properties = new JSONObject();
- properties.put("bootstrap.servers", "localhost:9092");
- JSONArray topics = new JSONArray();
- topics.add("topic1");
- topics.add("topic2");
- properties.put("topics", topics);
+
+ if (extraBootstrapServers != null) {
+ JSONArray bootstrapServers = new JSONArray();
+ bootstrapServers.addAll(extraBootstrapServers);
+ properties.put("bootstrap_servers", bootstrapServers);
+ }
+ if (extraTopics != null) {
+ JSONArray topics = new JSONArray();
+ topics.addAll(extraTopics);
+ properties.put("topics", topics);
+ }
+
if (payloadFormat != null) {
properties.put("format", payloadFormat);
}
@@ -197,7 +247,7 @@
return Table.builder()
.name(name)
.comment(name + " table")
- .location("kafka://localhost:2181/brokers?topic=test")
+ .location(LOCATION_BROKER + "/" + LOCATION_TOPIC)
.schema(schema)
.type("kafka")
.properties(properties)
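
For orientation, the convention these tests pin down is that `LOCATION` supplies the first bootstrap server and the first topic, and any `bootstrap_servers`/`topics` entries from `TBLPROPERTIES` are appended after them. A minimal, self-contained sketch of that merge order (the class and helper are hypothetical, not the provider's actual code):

```
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class KafkaLocationMergeSketch {
  /** Splits a "host:port/topic" LOCATION into {broker, topic}; illustrative only. */
  static String[] splitLocation(String location) {
    int slash = location.indexOf('/');
    if (slash < 0) {
      throw new IllegalArgumentException("Expected a LOCATION of the form host:port/topic");
    }
    return new String[] {location.substring(0, slash), location.substring(slash + 1)};
  }

  public static void main(String[] args) {
    String[] parts = splitLocation("104.126.7.88:7743/topic1");
    // LOCATION values come first; TBLPROPERTIES entries are appended after them.
    List<String> servers = new ArrayList<>(Arrays.asList(parts[0]));
    servers.addAll(Arrays.asList("localhost:1111", "localhost:2222")); // "bootstrap_servers"
    List<String> topics = new ArrayList<>(Arrays.asList(parts[1]));
    topics.addAll(Arrays.asList("topic2", "topic3")); // "topics"
    // Prints "104.126.7.88:7743,localhost:1111,localhost:2222 [topic1, topic2, topic3]",
    // matching the expectations in testBuildWithExtraServers and testBuildWithExtraTopics.
    System.out.println(String.join(",", servers) + " " + topics);
  }
}
```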
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
index e1bf02f..e5bba75 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
@@ -25,17 +25,15 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.beam.runners.core.metrics.MetricsLogger;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
-import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
-import org.apache.beam.sdk.metrics.MetricsLogger;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
@@ -56,7 +54,6 @@
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
@@ -80,7 +77,7 @@
private final boolean ignoreInsertIds;
private final SerializableFunction<ElementT, TableRow> toTableRow;
private final SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
- private final Set<MetricName> metricFilter;
+ private final Set<String> allowedMetricUrns;
/** Tracks bytes written, exposed as "ByteCount" Counter. */
private Counter byteCounter = SinkMetrics.bytesWritten();
@@ -109,7 +106,7 @@
this.ignoreInsertIds = ignoreInsertIds;
this.toTableRow = toTableRow;
this.toFailsafeTableRow = toFailsafeTableRow;
- this.metricFilter = getMetricFilter();
+ this.allowedMetricUrns = getAllowedMetricUrns();
this.batchViaStateful = false;
}
@@ -135,34 +132,14 @@
this.ignoreInsertIds = ignoreInsertIds;
this.toTableRow = toTableRow;
this.toFailsafeTableRow = toFailsafeTableRow;
- this.metricFilter = getMetricFilter();
+ this.allowedMetricUrns = getAllowedMetricUrns();
this.batchViaStateful = batchViaStateful;
}
- private static Set<MetricName> getMetricFilter() {
- ImmutableSet.Builder<MetricName> setBuilder = ImmutableSet.builder();
- setBuilder.add(
- MonitoringInfoMetricName.named(
- MonitoringInfoConstants.Urns.API_REQUEST_LATENCIES,
- BigQueryServicesImpl.API_METRIC_LABEL));
- for (String status : BigQueryServicesImpl.DatasetServiceImpl.CANONICAL_STATUS_MAP.values()) {
- setBuilder.add(
- MonitoringInfoMetricName.named(
- MonitoringInfoConstants.Urns.API_REQUEST_COUNT,
- ImmutableMap.<String, String>builder()
- .putAll(BigQueryServicesImpl.API_METRIC_LABEL)
- .put(MonitoringInfoConstants.Labels.STATUS, status)
- .build()));
- }
- setBuilder.add(
- MonitoringInfoMetricName.named(
- MonitoringInfoConstants.Urns.API_REQUEST_COUNT,
- ImmutableMap.<String, String>builder()
- .putAll(BigQueryServicesImpl.API_METRIC_LABEL)
- .put(
- MonitoringInfoConstants.Labels.STATUS,
- BigQueryServicesImpl.DatasetServiceImpl.CANONICAL_STATUS_UNKNOWN)
- .build()));
+ private static Set<String> getAllowedMetricUrns() {
+ ImmutableSet.Builder<String> setBuilder = ImmutableSet.builder();
+ setBuilder.add(MonitoringInfoConstants.Urns.API_REQUEST_COUNT);
+ setBuilder.add(MonitoringInfoConstants.Urns.API_REQUEST_LATENCIES);
return setBuilder.build();
}
@@ -394,10 +371,9 @@
if (processWideContainer instanceof MetricsLogger) {
MetricsLogger processWideMetricsLogger = (MetricsLogger) processWideContainer;
processWideMetricsLogger.tryLoggingMetrics(
- "BigQuery HTTP API Metrics: \n",
- metricFilter,
- options.getBqStreamingApiLoggingFrequencySec() * 1000L,
- true);
+ "API call Metrics: \n",
+ this.allowedMetricUrns,
+ options.getBqStreamingApiLoggingFrequencySec() * 1000L);
}
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 0bb5d2c..efc0fa9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -181,7 +181,8 @@
* Create an append client for a given Storage API write stream. The stream must be created
* first.
*/
- StreamAppendClient getStreamAppendClient(String streamName) throws Exception;
+ StreamAppendClient getStreamAppendClient(String streamName, Descriptor descriptor)
+ throws Exception;
/** Flush a given stream up to the given offset. The stream must have type BUFFERED. */
ApiFuture<FlushRowsResponse> flush(String streamName, long flushOffset)
@@ -200,8 +201,7 @@
/** An interface for appending records to a Storage API write stream. */
interface StreamAppendClient extends AutoCloseable {
/** Append rows to a Storage API write stream at the given offset. */
- ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows, Descriptor descriptor)
- throws Exception;
+ ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows) throws Exception;
/**
* Pin this object. If close() is called before all pins are removed, the underlying resources
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 8f7a097..290dca2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -63,7 +63,6 @@
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
-import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsResponse;
@@ -633,7 +632,7 @@
table.getTableReference().getProjectId(),
table.getTableReference().getDatasetId(),
table.getTableReference().getTableId(),
- TimeUnit.MILLISECONDS.toSeconds(RETRY_CREATE_TABLE_DURATION_MILLIS) / 60.0);
+ TimeUnit.MILLISECONDS.toMinutes(RETRY_CREATE_TABLE_DURATION_MILLIS));
retry = true;
}
continue;
@@ -1095,8 +1094,12 @@
}
@Override
- public StreamAppendClient getStreamAppendClient(String streamName) throws Exception {
- StreamWriterV2 streamWriter = StreamWriterV2.newBuilder(streamName).build();
+ public StreamAppendClient getStreamAppendClient(String streamName, Descriptor descriptor)
+ throws Exception {
+ ProtoSchema protoSchema =
+ ProtoSchema.newBuilder().setProtoDescriptor(descriptor.toProto()).build();
+ StreamWriterV2 streamWriter =
+ StreamWriterV2.newBuilder(streamName).setWriterSchema(protoSchema).build();
return new StreamAppendClient() {
private int pins = 0;
private boolean closed = false;
@@ -1136,20 +1139,9 @@
}
@Override
- public ApiFuture<AppendRowsResponse> appendRows(
- long offset, ProtoRows rows, Descriptor descriptor) throws Exception {
- final AppendRowsRequest.ProtoData data =
- AppendRowsRequest.ProtoData.newBuilder()
- .setWriterSchema(
- ProtoSchema.newBuilder().setProtoDescriptor(descriptor.toProto()).build())
- .setRows(rows)
- .build();
- AppendRowsRequest.Builder appendRequestBuilder =
- AppendRowsRequest.newBuilder().setProtoRows(data).setWriteStream(streamName);
- if (offset >= 0) {
- appendRequestBuilder = appendRequestBuilder.setOffset(Int64Value.of(offset));
- }
- return streamWriter.append(appendRequestBuilder.build());
+ public ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows)
+ throws Exception {
+ return streamWriter.append(rows, offset);
}
};
}
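
Combined with the interface change above, the caller-side pattern becomes: bind the proto descriptor once when the append client is created, then append rows without re-sending the schema. A rough sketch under the new signatures (lifecycle concerns such as pin()/close() are elided; this is not code from the PR):

```
import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.ProtoRows;
import com.google.protobuf.Descriptors.Descriptor;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StreamAppendClient;

class AppendPatternSketch {
  /** The descriptor is supplied once, at client creation, instead of on every append. */
  static ApiFuture<AppendRowsResponse> appendOnce(
      DatasetService datasetService,
      String streamName,
      Descriptor descriptor,
      long offset,
      ProtoRows rows)
      throws Exception {
    // Caller is responsible for closing the client; omitted for brevity.
    StreamAppendClient client = datasetService.getStreamAppendClient(streamName, descriptor);
    return client.appendRows(offset, rows); // no per-call descriptor anymore
  }
}
```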
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 6f21e8f..8f15121 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -167,7 +167,8 @@
.createWriteStream(tableUrn, Type.PENDING)
.getName();
this.streamAppendClient =
- Preconditions.checkNotNull(datasetService).getStreamAppendClient(streamName);
+ Preconditions.checkNotNull(datasetService)
+ .getStreamAppendClient(streamName, messageConverter.getSchemaDescriptor());
this.currentOffset = 0;
}
return streamAppendClient;
@@ -218,8 +219,7 @@
try {
long offset = currentOffset;
currentOffset += inserts.getSerializedRowsCount();
- return getWriteStream()
- .appendRows(offset, protoRows, messageConverter.getSchemaDescriptor());
+ return getWriteStream().appendRows(offset, protoRows);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index ca465dc..de04b12 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -333,6 +333,7 @@
MessageConverter<ElementT> messageConverter =
messageConverters.get(element.getKey().getKey(), dynamicDestinations);
Descriptor descriptor = messageConverter.getSchemaDescriptor();
+
// Each ProtoRows object contains at most 1MB of rows.
// TODO: Push messageFromTableRow up to top level. That way we can skip TableRow entirely if
// already proto or
@@ -381,7 +382,8 @@
}
String stream = getOrCreateStream(tableId, streamName, streamOffset, datasetService);
StreamAppendClient appendClient =
- APPEND_CLIENTS.get(stream, () -> datasetService.getStreamAppendClient(stream));
+ APPEND_CLIENTS.get(
+ stream, () -> datasetService.getStreamAppendClient(stream, descriptor));
for (AppendRowsContext context : contexts) {
context.streamName = stream;
appendClient.pin();
@@ -420,8 +422,8 @@
StreamAppendClient appendClient =
APPEND_CLIENTS.get(
context.streamName,
- () -> datasetService.getStreamAppendClient(context.streamName));
- return appendClient.appendRows(context.offset, protoRows, descriptor);
+ () -> datasetService.getStreamAppendClient(context.streamName, descriptor));
+ return appendClient.appendRows(context.offset, protoRows);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index b24bf16..a716586 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -415,11 +415,11 @@
}
@Override
- public StreamAppendClient getStreamAppendClient(String streamName) {
+ public StreamAppendClient getStreamAppendClient(String streamName, Descriptor descriptor) {
return new StreamAppendClient() {
@Override
- public ApiFuture<AppendRowsResponse> appendRows(
- long offset, ProtoRows rows, Descriptor descriptor) throws Exception {
+ public ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows)
+ throws Exception {
synchronized (tables) {
Stream stream = writeStreams.get(streamName);
if (stream == null) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 4877d86..286df98 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -1063,7 +1063,7 @@
verifyNotNull(ret.getTableReference());
expectedLogs.verifyInfo(
"Quota limit reached when creating table project:dataset.table, "
- + "retrying up to 5.0 minutes");
+ + "retrying up to 5 minutes");
}
/** Tests that {@link DatasetServiceImpl#insertAll} uses the supplied {@link ErrorContainer}. */
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index 78031a4..3a8010a 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -37,71 +37,109 @@
})
+def _get_deferred_args(*args):
+ return [
+ frame_base.DeferredFrame.wrap(
+ expressions.ConstantExpression(arg, arg[0:0])) for arg in args
+ ]
+
+
class DeferredFrameTest(unittest.TestCase):
- def _run_test(self, func, *args, distributed=True, expect_error=False):
- deferred_args = [
- frame_base.DeferredFrame.wrap(
- expressions.ConstantExpression(arg, arg[0:0])) for arg in args
- ]
+ def _run_error_test(self, func, *args):
+ """Verify that func(*args) raises the same exception in pandas and in Beam.
+
+ Note that for Beam this only checks for exceptions that are raised during
+ expression generation (i.e. construction time). Execution time exceptions
+ are not checked."""
+ deferred_args = _get_deferred_args(*args)
+
+ # Get expected error
try:
expected = func(*args)
except Exception as e:
- if not expect_error:
- raise
- expected = e
+ expected_error = e
else:
- if expect_error:
- raise AssertionError(
- "Expected an error but computing expected result successfully "
- f"returned: {expected}")
+ raise AssertionError(
+ "Expected an error, but executing with pandas successfully "
+ f"returned:\n{expected}")
- session_type = (
- expressions.PartitioningSession if distributed else expressions.Session)
+ # Get actual error
try:
- actual = session_type({}).evaluate(func(*deferred_args)._expr)
+ _ = func(*deferred_args)._expr
except Exception as e:
- if not expect_error:
- raise
actual = e
else:
- if expect_error:
- raise AssertionError(
- "Expected an error:\n{expected}\nbut successfully "
- f"returned:\n{actual}")
+ raise AssertionError(
+ f"Expected an error:\n{expected_error}\nbut Beam successfully "
+ "generated an expression.")
- if expect_error:
- if not isinstance(actual,
- type(expected)) or not str(actual) == str(expected):
- raise AssertionError(
- f'Expected {expected!r} to be raised, but got {actual!r}'
- ) from actual
+ # Verify
+ if (not isinstance(actual, type(expected_error)) or
+ not str(actual) == str(expected_error)):
+ raise AssertionError(
+ f'Expected {expected_error!r} to be raised, but got {actual!r}'
+ ) from actual
+
+ def _run_test(self, func, *args, distributed=True, nonparallel=False):
+ """Verify that func(*args) produces the same result in pandas and in Beam.
+
+ Args:
+ distributed (bool): Whether or not to use PartitioningSession to
+ simulate parallel execution.
+ nonparallel (bool): Whether or not this function contains a
+ non-parallelizable operation. If True, the expression will be
+ generated twice, once outside of an allow_non_parallel_operations
+ block (to verify NonParallelOperation is raised), and again inside
+ of an allow_non_parallel_operations block to actually generate an
+ expression to verify."""
+ # Compute expected value
+ expected = func(*args)
+
+ # Compute actual value
+ deferred_args = _get_deferred_args(*args)
+ if nonparallel:
+ # First run outside a nonparallel block to confirm this raises as expected
+ with self.assertRaises(expressions.NonParallelOperation):
+ _ = func(*deferred_args)
+
+ # Re-run in an allow non parallel block to get an expression to verify
+ with beam.dataframe.allow_non_parallel_operations():
+ expr = func(*deferred_args)._expr
else:
- if isinstance(expected, pd.core.generic.NDFrame):
- if distributed:
- if expected.index.is_unique:
- expected = expected.sort_index()
- actual = actual.sort_index()
- else:
- expected = expected.sort_values(list(expected.columns))
- actual = actual.sort_values(list(actual.columns))
+ expr = func(*deferred_args)._expr
- if isinstance(expected, pd.Series):
- pd.testing.assert_series_equal(expected, actual)
- elif isinstance(expected, pd.DataFrame):
- pd.testing.assert_frame_equal(expected, actual)
+ # Compute the result of the generated expression
+ session_type = (
+ expressions.PartitioningSession if distributed else expressions.Session)
+
+ actual = session_type({}).evaluate(expr)
+
+ # Verify
+ if isinstance(expected, pd.core.generic.NDFrame):
+ if distributed:
+ if expected.index.is_unique:
+ expected = expected.sort_index()
+ actual = actual.sort_index()
else:
- raise ValueError(
- f"Expected value is a {type(expected)},"
- "not a Series or DataFrame.")
+ expected = expected.sort_values(list(expected.columns))
+ actual = actual.sort_values(list(actual.columns))
+
+ if isinstance(expected, pd.Series):
+ pd.testing.assert_series_equal(expected, actual)
+ elif isinstance(expected, pd.DataFrame):
+ pd.testing.assert_frame_equal(expected, actual)
else:
- # Expectation is not a pandas object
- if isinstance(expected, float):
- cmp = lambda x: np.isclose(expected, x)
- else:
- cmp = expected.__eq__
- self.assertTrue(
- cmp(actual),
- 'Expected:\n\n%r\n\nActual:\n\n%r' % (expected, actual))
+ raise ValueError(
+ f"Expected value is a {type(expected)},"
+ "not a Series or DataFrame.")
+ else:
+ # Expectation is not a pandas object
+ if isinstance(expected, float):
+ cmp = lambda x: np.isclose(expected, x)
+ else:
+ cmp = expected.__eq__
+ self.assertTrue(
+ cmp(actual), 'Expected:\n\n%r\n\nActual:\n\n%r' % (expected, actual))
def test_series_arithmetic(self):
a = pd.Series([1, 2, 3])
@@ -246,14 +284,10 @@
self._run_test(lambda df: df.groupby('group').bar.sum(), df)
self._run_test(lambda df: df.groupby('group')['foo'].sum(), df)
self._run_test(lambda df: df.groupby('group')['baz'].sum(), df)
- self._run_test(
- lambda df: df.groupby('group')[['bar', 'baz']].bar.sum(),
- df,
- expect_error=True)
- self._run_test(
- lambda df: df.groupby('group')[['bat']].sum(), df, expect_error=True)
- self._run_test(
- lambda df: df.groupby('group').bat.sum(), df, expect_error=True)
+ self._run_error_test(
+ lambda df: df.groupby('group')[['bar', 'baz']].bar.sum(), df)
+ self._run_error_test(lambda df: df.groupby('group')[['bat']].sum(), df)
+ self._run_error_test(lambda df: df.groupby('group').bat.sum(), df)
self._run_test(lambda df: df.groupby('group').median(), df)
self._run_test(lambda df: df.groupby('group').foo.median(), df)
@@ -266,26 +300,19 @@
df = GROUPBY_DF
# non-existent projection column
- self._run_test(
- lambda df: df.groupby('group')[['bar', 'baz']].bar.median(),
- df,
- expect_error=True)
- self._run_test(
- lambda df: df.groupby('group')[['bad']].median(), df, expect_error=True)
+ self._run_error_test(
+ lambda df: df.groupby('group')[['bar', 'baz']].bar.median(), df)
+ self._run_error_test(lambda df: df.groupby('group')[['bad']].median(), df)
- self._run_test(
- lambda df: df.groupby('group').bad.median(), df, expect_error=True)
+ self._run_error_test(lambda df: df.groupby('group').bad.median(), df)
def test_groupby_errors_non_existent_label(self):
df = GROUPBY_DF
# non-existent grouping label
- self._run_test(
- lambda df: df.groupby(['really_bad', 'foo', 'bad']).foo.sum(),
- df,
- expect_error=True)
- self._run_test(
- lambda df: df.groupby('bad').foo.sum(), df, expect_error=True)
+ self._run_error_test(
+ lambda df: df.groupby(['really_bad', 'foo', 'bad']).foo.sum(), df)
+ self._run_error_test(lambda df: df.groupby('bad').foo.sum(), df)
def test_groupby_callable(self):
df = GROUPBY_DF
@@ -307,11 +334,9 @@
self._run_test(lambda df: df.set_index(['index1', 'index2'], drop=True), df)
self._run_test(lambda df: df.set_index('values'), df)
- self._run_test(lambda df: df.set_index('bad'), df, expect_error=True)
- self._run_test(
- lambda df: df.set_index(['index2', 'bad', 'really_bad']),
- df,
- expect_error=True)
+ self._run_error_test(lambda df: df.set_index('bad'), df)
+ self._run_error_test(
+ lambda df: df.set_index(['index2', 'bad', 'really_bad']), df)
def test_series_drop_ignore_errors(self):
midx = pd.MultiIndex(
@@ -397,22 +422,21 @@
df2 = pd.DataFrame({
'rkey': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
})
- with beam.dataframe.allow_non_parallel_operations():
- self._run_test(
- lambda df1,
- df2: df1.merge(df2, left_on='lkey', right_on='rkey').rename(
- index=lambda x: '*'),
- df1,
- df2)
- self._run_test(
- lambda df1,
- df2: df1.merge(
- df2,
- left_on='lkey',
- right_on='rkey',
- suffixes=('_left', '_right')).rename(index=lambda x: '*'),
- df1,
- df2)
+ self._run_test(
+ lambda df1,
+ df2: df1.merge(df2, left_on='lkey', right_on='rkey').rename(
+ index=lambda x: '*'),
+ df1,
+ df2,
+ nonparallel=True)
+ self._run_test(
+ lambda df1,
+ df2: df1.merge(
+ df2, left_on='lkey', right_on='rkey', suffixes=('_left', '_right')).
+ rename(index=lambda x: '*'),
+ df1,
+ df2,
+ nonparallel=True)
def test_merge_left_join(self):
# This is from the pandas doctests, but fails due to re-indexing being
@@ -420,12 +444,12 @@
df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2]})
df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4]})
- with beam.dataframe.allow_non_parallel_operations():
- self._run_test(
- lambda df1,
- df2: df1.merge(df2, how='left', on='a').rename(index=lambda x: '*'),
- df1,
- df2)
+ self._run_test(
+ lambda df1,
+ df2: df1.merge(df2, how='left', on='a').rename(index=lambda x: '*'),
+ df1,
+ df2,
+ nonparallel=True)
def test_merge_on_index(self):
# This is from the pandas doctests, but fails due to re-indexing being
@@ -436,12 +460,12 @@
df2 = pd.DataFrame({
'rkey': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
}).set_index('rkey')
- with beam.dataframe.allow_non_parallel_operations():
- self._run_test(
- lambda df1,
- df2: df1.merge(df2, left_index=True, right_index=True),
- df1,
- df2)
+
+ self._run_test(
+ lambda df1,
+ df2: df1.merge(df2, left_index=True, right_index=True),
+ df1,
+ df2)
def test_merge_same_key(self):
df1 = pd.DataFrame({
@@ -450,55 +474,58 @@
df2 = pd.DataFrame({
'key': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
})
- with beam.dataframe.allow_non_parallel_operations():
- self._run_test(
- lambda df1,
- df2: df1.merge(df2, on='key').rename(index=lambda x: '*'),
- df1,
- df2)
- self._run_test(
- lambda df1,
- df2: df1.merge(df2, on='key', suffixes=('_left', '_right')).rename(
- index=lambda x: '*'),
- df1,
- df2)
+ self._run_test(
+ lambda df1,
+ df2: df1.merge(df2, on='key').rename(index=lambda x: '*'),
+ df1,
+ df2,
+ nonparallel=True)
+ self._run_test(
+ lambda df1,
+ df2: df1.merge(df2, on='key', suffixes=('_left', '_right')).rename(
+ index=lambda x: '*'),
+ df1,
+ df2,
+ nonparallel=True)
def test_merge_same_key_doctest(self):
df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2]})
df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4]})
- with beam.dataframe.allow_non_parallel_operations():
- self._run_test(
- lambda df1,
- df2: df1.merge(df2, how='left', on='a').rename(index=lambda x: '*'),
- df1,
- df2)
- # Test without specifying 'on'
- self._run_test(
- lambda df1,
- df2: df1.merge(df2, how='left').rename(index=lambda x: '*'),
- df1,
- df2)
+ self._run_test(
+ lambda df1,
+ df2: df1.merge(df2, how='left', on='a').rename(index=lambda x: '*'),
+ df1,
+ df2,
+ nonparallel=True)
+ # Test without specifying 'on'
+ self._run_test(
+ lambda df1,
+ df2: df1.merge(df2, how='left').rename(index=lambda x: '*'),
+ df1,
+ df2,
+ nonparallel=True)
def test_merge_same_key_suffix_collision(self):
df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2], 'a_lsuffix': [5, 6]})
df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4], 'a_rsuffix': [7, 8]})
- with beam.dataframe.allow_non_parallel_operations():
- self._run_test(
- lambda df1,
- df2: df1.merge(
- df2, how='left', on='a', suffixes=('_lsuffix', '_rsuffix')).
- rename(index=lambda x: '*'),
- df1,
- df2)
- # Test without specifying 'on'
- self._run_test(
- lambda df1,
- df2: df1.merge(df2, how='left', suffixes=('_lsuffix', '_rsuffix')).
- rename(index=lambda x: '*'),
- df1,
- df2)
+ self._run_test(
+ lambda df1,
+ df2: df1.merge(
+ df2, how='left', on='a', suffixes=('_lsuffix', '_rsuffix')).rename(
+ index=lambda x: '*'),
+ df1,
+ df2,
+ nonparallel=True)
+ # Test without specifying 'on'
+ self._run_test(
+ lambda df1,
+ df2: df1.merge(df2, how='left', suffixes=('_lsuffix', '_rsuffix')).
+ rename(index=lambda x: '*'),
+ df1,
+ df2,
+ nonparallel=True)
def test_series_getitem(self):
s = pd.Series([x**2 for x in range(10)])
@@ -550,10 +577,9 @@
s = pd.Series(list(range(16)))
self._run_test(lambda s: s.agg('sum'), s)
self._run_test(lambda s: s.agg(['sum']), s)
- with beam.dataframe.allow_non_parallel_operations():
- self._run_test(lambda s: s.agg(['sum', 'mean']), s)
- self._run_test(lambda s: s.agg(['mean']), s)
- self._run_test(lambda s: s.agg('mean'), s)
+ self._run_test(lambda s: s.agg(['sum', 'mean']), s, nonparallel=True)
+ self._run_test(lambda s: s.agg(['mean']), s, nonparallel=True)
+ self._run_test(lambda s: s.agg('mean'), s, nonparallel=True)
def test_append_sort(self):
# yapf: disable
@@ -573,12 +599,20 @@
def test_dataframe_agg(self):
df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]})
self._run_test(lambda df: df.agg('sum'), df)
- with beam.dataframe.allow_non_parallel_operations():
- self._run_test(lambda df: df.agg(['sum', 'mean']), df)
- self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'sum'}), df)
- self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'mean'}), df)
- self._run_test(lambda df: df.agg({'A': ['sum', 'mean']}), df)
- self._run_test(lambda df: df.agg({'A': ['sum', 'mean'], 'B': 'min'}), df)
+ self._run_test(lambda df: df.agg(['sum', 'mean']), df, nonparallel=True)
+ self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'sum'}), df)
+ self._run_test(
+ lambda df: df.agg({
+ 'A': 'sum', 'B': 'mean'
+ }), df, nonparallel=True)
+ self._run_test(
+ lambda df: df.agg({'A': ['sum', 'mean']}), df, nonparallel=True)
+ self._run_test(
+ lambda df: df.agg({
+ 'A': ['sum', 'mean'], 'B': 'min'
+ }),
+ df,
+ nonparallel=True)
def test_smallest_largest(self):
df = pd.DataFrame({'A': [1, 1, 2, 2], 'B': [2, 3, 5, 7]})
@@ -622,9 +656,8 @@
df = df.set_index('B')
# TODO(BEAM-11190): These aggregations can be done in index partitions, but
# it will require a little more complex logic
- with beam.dataframe.allow_non_parallel_operations():
- self._run_test(lambda df: df.groupby(level=0).sum(), df)
- self._run_test(lambda df: df.groupby(level=0).mean(), df)
+ self._run_test(lambda df: df.groupby(level=0).sum(), df, nonparallel=True)
+ self._run_test(lambda df: df.groupby(level=0).mean(), df, nonparallel=True)
def test_dataframe_eval_query(self):
df = pd.DataFrame(np.random.randn(20, 3), columns=['a', 'b', 'c'])
@@ -686,6 +719,14 @@
r"df\.quantile\(q=0\.1, axis='columns'\)"):
self._run_test(lambda df: df.quantile([0.1, 0.5], axis='columns'), df)
+ @unittest.skipIf(PD_VERSION < (1, 1), "dropna added in pandas 1.1.0")
+ def test_groupby_count_na(self):
+ # Verify we can do a groupby.count() that doesn't drop NaN values
+ self._run_test(
+ lambda df: df.groupby('foo', dropna=True).bar.count(), GROUPBY_DF)
+ self._run_test(
+ lambda df: df.groupby('foo', dropna=False).bar.count(), GROUPBY_DF)
+
class AllowNonParallelTest(unittest.TestCase):
def _use_non_parallel_operation(self):
diff --git a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
index c456f2a..7e8c155 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
@@ -381,8 +381,17 @@
self._saved_pcoders[os.path.join(*labels)])
def cleanup(self):
+
if os.path.exists(self._cache_dir):
- shutil.rmtree(self._cache_dir)
+
+ def on_fail_to_cleanup(function, path, excinfo):
+ _LOGGER.warning(
+ 'Failed to clean up temporary files: %s. You may '
+ 'manually delete them if necessary. Error was: %s',
+ path,
+ excinfo)
+
+ shutil.rmtree(self._cache_dir, onerror=on_fail_to_cleanup)
self._saved_pcoders = {}
self._capture_sinks = {}
self._capture_keys = set()
diff --git a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
index 343ad6a..6354184 100644
--- a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
@@ -501,18 +501,23 @@
class SizeLimiter(Limiter):
def __init__(self, p):
self.pipeline = p
+ self._rm = None
+
+ def set_recording_manager(self, rm):
+ self._rm = rm
def is_triggered(self):
- rm = ie.current_env().get_recording_manager(self.pipeline)
- return rm.describe()['size'] > 0 if rm else False
+ return self._rm.describe()['size'] > 0 if self._rm else False
# Do the first recording to get the timestamp of the first time the fragment
# was run.
- rm = RecordingManager(p, test_limiters=[SizeLimiter(p)])
+ size_limiter = SizeLimiter(p)
+ rm = RecordingManager(p, test_limiters=[size_limiter])
+ size_limiter.set_recording_manager(rm)
self.assertEqual(rm.describe()['state'], PipelineState.STOPPED)
self.assertTrue(rm.record_pipeline())
- ie.current_env().set_recording_manager(rm, p)
+ # A recording is in progress, no need to start another one.
self.assertFalse(rm.record_pipeline())
for _ in range(60):
diff --git a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
index 59fedfd..f109726 100644
--- a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
+++ b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
@@ -344,21 +344,27 @@
### Syntax
+#### Nested mode
```
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(
event_timestamp TIMESTAMP,
- attributes MAP<VARCHAR, VARCHAR>,
- payload ROW<tableElement [, tableElement ]*>
+ attributes [MAP<VARCHAR, VARCHAR>, ARRAY<ROW<VARCHAR key, VARCHAR value>>],
+ payload [BYTES, ROW<tableElement [, tableElement ]*>]
)
TYPE pubsub
LOCATION 'projects/[PROJECT]/topics/[TOPIC]'
-TBLPROPERTIES '{
- "timestampAttributeKey": "key",
- "deadLetterQueue": "projects/[PROJECT]/topics/[TOPIC]",
- "format": "format"
-}'
```
+#### Flattened mode
+```
+CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(tableElement [, tableElement ]*)
+TYPE pubsub
+LOCATION 'projects/[PROJECT]/topics/[TOPIC]'
+```
+
+In nested mode, the following fields hold topic metadata. The presence of the
+`attributes` field triggers nested mode usage.
+
* `event_timestamp`: The event timestamp associated with the Pub/Sub message
by PubsubIO. It can be one of the following:
* Message publish time, which is provided by Pub/Sub. This is the default
@@ -376,6 +382,8 @@
`deadLetterQueue` field of the `tblProperties` blob. If no dead-letter queue
is specified in this case, an exception is thrown and the pipeline will
crash.
+
+
* `LOCATION`:
* `PROJECT`: ID of the Google Cloud Project
* `TOPIC`: The Pub/Sub topic name. A subscription will be created
@@ -390,15 +398,14 @@
payload was not parsed. If not specified, an exception is thrown for
parsing failures.
* `format`: Optional. Allows you to specify the Pubsub payload format.
- Possible values are {`json`, `avro`}. Defaults to `json`.
### Read Mode
-PubsubIO is currently limited to read access only.
+PubsubIO supports reading from topics by creating a new subscription.
### Write Mode
-Not supported. PubSubIO is currently limited to read access only in Beam SQL.
+PubsubIO supports writing to topics.
### Schema
@@ -411,13 +418,7 @@
### Supported Payload
-* JSON Objects (Default)
- * Beam only supports querying messages with payload containing JSON
- objects. Beam attempts to parse JSON to match the schema of the
- `payload` field.
-* Avro
- * An Avro schema is automatically generated from the specified schema of
- the `payload` field. It is used to parse incoming messages.
+* Pub/Sub supports [Generic Payload Handling](#generic-payload-handling).
### Example
@@ -427,38 +428,106 @@
LOCATION 'projects/testing-integration/topics/user-location'
```
+## Pub/Sub Lite
+
+### Syntax
+```
+CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(
+ publish_timestamp DATETIME,
+ event_timestamp DATETIME,
+ message_key BYTES,
+ attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
+ payload [BYTES, ROW<tableElement [, tableElement ]*>]
+)
+TYPE pubsublite
+// For writing
+LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/topics/[TOPIC]'
+// For reading
+LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/subscriptions/[SUBSCRIPTION]'
+```
+
+* `LOCATION`:
+ * `PROJECT`: ID of the Google Cloud Project
+ * `TOPIC`: The Pub/Sub Lite topic name.
+ * `SUBSCRIPTION`: The Pub/Sub Lite subscription name.
+ * `GCP-LOCATION`: The location for this Pub/Sub Lite topic or subscription.
+* `TBLPROPERTIES`:
+ * `timestampAttributeKey`: Optional. The key which contains the event
+ timestamp associated with the Pub/Sub message. If not specified, the
+ message publish timestamp is used as an event timestamp for
+ windowing/watermarking.
+ * `deadLetterQueue`: Optional, supports
+ [Generic DLQ Handling](#generic-dlq-handling)
+ * `format`: Optional. Allows you to specify the payload format.
+
+### Read Mode
+
+PubsubLiteIO supports reading from subscriptions.
+
+### Write Mode
+
+PubsubLiteIO supports writing to topics.
+
+### Supported Payload
+
+* Pub/Sub Lite supports [Generic Payload Handling](#generic-payload-handling).
+
+### Example
+
+```
+CREATE EXTERNAL TABLE locations (event_timestamp TIMESTAMP, attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>, payload ROW<id INTEGER, location VARCHAR>)
+TYPE pubsublite
+LOCATION 'projects/testing-integration/locations/us-central1-a/topics/user-location'
+```
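+
+For reading, the LOCATION instead names a subscription; a hypothetical
+counterpart to the example above:
+
+```
+CREATE EXTERNAL TABLE locations_feed (event_timestamp TIMESTAMP, attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>, payload ROW<id INTEGER, location VARCHAR>)
+TYPE pubsublite
+LOCATION 'projects/testing-integration/locations/us-central1-a/subscriptions/user-location-sub'
+```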
+
## Kafka
KafkaIO is experimental in Beam SQL.
### Syntax
+#### Flattened mode
```
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE kafka
-LOCATION 'kafka://localhost:2181/brokers'
+LOCATION 'my.company.url.com:2181/topic1'
TBLPROPERTIES '{
- "bootstrap.servers":"localhost:9092",
- "topics": ["topic1", "topic2"],
- "format": "avro"
- [, "protoClass": "com.example.ExampleMessage" ]
+ "bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"],
+ "topics": ["topic2", "topic3"],
+ "format": "json"
}'
```
-* `LOCATION`: The Kafka topic URL.
+#### Nested mode
+```
+CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (
+ event_timestamp DATETIME,
+ message_key BYTES,
+ headers ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
+ payload [BYTES, ROW<tableElement [, tableElement ]*>]
+)
+TYPE kafka
+LOCATION 'my.company.url.com:2181/topic1'
+TBLPROPERTIES '{
+ "bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"],
+ "topics": ["topic2", "topic3"],
+ "format": "json"
+}'
+```
+
+The presence of the `headers` field triggers nested mode usage.
+
+* `LOCATION`: A URL with the initial bootstrap broker to use and the initial
+ topic name provided as the path.
* `TBLPROPERTIES`:
- * `bootstrap.servers`: Optional. Allows you to specify the bootstrap
- server.
- * `topics`: Optional. Allows you to specify specific topics.
+ * `bootstrap_servers`: Optional. Allows you to specify additional
+ bootstrap servers, which are used in addition to the one in `LOCATION`.
+ * `topics`: Optional. Allows you to specify additional topics, which are
+ used in addition to the one in `LOCATION`.
* `format`: Optional. Allows you to specify the Kafka values format. Possible values are
- {`csv`, `avro`, `json`, `proto`, `thrift`}. Defaults to `csv`.
- * `protoClass`: Optional. Use only when `format` is equal to `proto`. Allows you to
- specify full protocol buffer java class name.
- * `thriftClass`: Optional. Use only when `format` is equal to `thrift`. Allows you to
- specify full thrift java class name.
- * `thriftProtocolFactoryClass`: Optional. Use only when `format` is equal to `thrift`.
- Allows you to specify full class name of the `TProtocolFactory` to use for thrift
- serialization.
+ {`csv`, `avro`, `json`, `proto`, `thrift`}. Defaults to `csv` in
+ flattened mode or `json` in nested mode. `csv` does not support nested
+ mode.
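+
+For example, a flattened-mode table reading proto-encoded values might look
+like this (names are illustrative; `protoClass` is described under
+[Generic Payload Handling](#generic-payload-handling)):
+
+```
+CREATE EXTERNAL TABLE proto_events (f_long BIGINT NOT NULL, f_string VARCHAR)
+TYPE kafka
+LOCATION 'my.company.url.com:2181/topic1'
+TBLPROPERTIES '{"format": "proto", "protoClass": "com.example.ExampleMessage"}'
+```
+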
### Read Mode
@@ -473,18 +542,8 @@
* CSV (default)
* Beam parses the messages, attempting to parse fields according to the
types specified in the schema.
-* Avro
- * An Avro schema is automatically generated from the specified field
- types. It is used to parse incoming messages and to format outgoing
- messages.
-* JSON Objects
- * Beam attempts to parse JSON to match the schema.
-* Protocol buffers
- * Fields in the schema have to match the fields of the given `protoClass`.
-* Thrift
- * Fields in the schema have to match the fields of the given `thriftClass`.
- * The `TProtocolFactory` used for thrift serialization must match the
- provided `thriftProtocolFactoryClass`.
+* Kafka supports all [Generic Payload Handling](#generic-payload-handling)
+ formats.
### Schema
@@ -582,3 +641,47 @@
TYPE text
LOCATION '/home/admin/orders'
```
+
+## Generic Payload Handling
+
+Certain data sources and sinks support generic payload handling. This handling
+parses a byte array payload field into a table schema. The following formats are
+supported by this handling. All require at least setting `"format": "<type>"`,
+and may require other properties.
+
+* `avro`: Avro
+ * An Avro schema is automatically generated from the specified field
+ types. It is used to parse incoming messages and to format outgoing
+ messages.
+* `json`: JSON Objects
+ * Beam attempts to parse the byte array as UTF-8 JSON to match the schema.
+* `proto`: Protocol Buffers
+ * Beam locates the equivalent Protocol Buffer class and uses it to parse
+ the payload.
+ * `protoClass`: Required. The proto class name to use. Must be built into
+ the deployed JAR.
+ * Fields in the schema have to match the fields of the given `protoClass`.
+* `thrift`: Thrift
+ * Fields in the schema have to match the fields of the given
+ `thriftClass`.
+ * `thriftClass`: Required. Allows you to specify full thrift java class
+ name. Must be built into the deployed JAR.
+ * `thriftProtocolFactoryClass`: Required. Allows you to specify full class
+ name of the `TProtocolFactory` to use for thrift serialization. Must be
+ built into the deployed JAR.
+ * The `TProtocolFactory` used for thrift serialization must match the
+ provided `thriftProtocolFactoryClass`.
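+
+As an illustration, a thrift-format table might carry properties like these
+(class names are hypothetical):
+
+```
+TBLPROPERTIES '{
+  "format": "thrift",
+  "thriftClass": "com.example.ExampleThriftMessage",
+  "thriftProtocolFactoryClass": "org.apache.thrift.protocol.TCompactProtocol$Factory"
+}'
+```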
+
+## Generic DLQ Handling
+
+Sources and sinks which support generic DLQ handling specify a parameter with
+the format `"<dlqParamName>": "[DLQ_KIND]:[DLQ_ID]"`. The following types of
+DLQ handling are supported:
+
+* `bigquery`: BigQuery
+ * DLQ_ID is the table spec for an output table with an "error" string
+ field and "payload" byte array field.
+* `pubsub`: Pub/Sub Topic
+ * DLQ_ID is the full path of the Pub/Sub Topic.
+* `pubsublite`: Pub/Sub Lite Topic
+ * DLQ_ID is the full path of the Pub/Sub Lite Topic.
\ No newline at end of file
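+
+For example, with a source whose DLQ parameter is named `deadLetterQueue`, a
+BigQuery-backed queue could be configured as follows (the table spec is
+illustrative):
+
+```
+TBLPROPERTIES '{"deadLetterQueue": "bigquery:my-project:my_dataset.my_dlq_table"}'
+```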