Merge pull request #12471 [BEAM-9615] Add initial Schema to Go conversions.
[BEAM-9615] Add initial Schema to Go conversions.
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 3622764..82541ef 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -391,7 +391,7 @@
def gax_version = "1.54.0"
def generated_grpc_ga_version = "1.85.1"
def google_auth_version = "0.19.0"
- def google_clients_version = "1.30.9"
+ def google_clients_version = "1.30.10"
def google_cloud_bigdataoss_version = "2.1.3"
def google_cloud_core_version = "1.92.2"
def google_cloud_pubsublite_version = "0.1.6"
@@ -469,13 +469,13 @@
google_api_client_jackson2 : "com.google.api-client:google-api-client-jackson2:$google_clients_version",
google_api_client_java6 : "com.google.api-client:google-api-client-java6:$google_clients_version",
google_api_common : "com.google.api:api-common:1.8.1",
- google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20191211-$google_clients_version",
- google_api_services_clouddebugger : "com.google.apis:google-api-services-clouddebugger:v2-rev20200313-$google_clients_version",
- google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20200311-$google_clients_version",
- google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20200305-$google_clients_version",
- google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1beta1-rev20200525-$google_clients_version",
- google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20200312-$google_clients_version",
- google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20200226-$google_clients_version",
+ google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20200719-$google_clients_version",
+ google_api_services_clouddebugger : "com.google.apis:google-api-services-clouddebugger:v2-rev20200501-$google_clients_version",
+ google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20200720-$google_clients_version",
+ google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20200713-$google_clients_version",
+ google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1beta1-rev20200713-$google_clients_version",
+ google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20200713-$google_clients_version",
+ google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20200611-$google_clients_version",
google_auth_library_credentials : "com.google.auth:google-auth-library-credentials:$google_auth_version",
google_auth_library_oauth2_http : "com.google.auth:google-auth-library-oauth2-http:$google_auth_version",
google_cloud_bigquery : "com.google.cloud:google-cloud-bigquery:1.108.0",
diff --git a/examples/java/build.gradle b/examples/java/build.gradle
index 0c91573..eaad35f 100644
--- a/examples/java/build.gradle
+++ b/examples/java/build.gradle
@@ -52,6 +52,7 @@
compile project(path: ":sdks:java:core", configuration: "shadow")
compile project(":sdks:java:extensions:google-cloud-platform-core")
compile project(":sdks:java:io:google-cloud-platform")
+ compile project(":sdks:java:extensions:ml")
compile library.java.avro
compile library.java.bigdataoss_util
compile library.java.google_api_client
@@ -60,6 +61,7 @@
compile library.java.google_auth_library_credentials
compile library.java.google_auth_library_oauth2_http
compile library.java.google_cloud_datastore_v1_proto_client
+ compile library.java.google_code_gson
compile library.java.google_http_client
compile library.java.joda_time
compile library.java.proto_google_cloud_datastore_v1
@@ -67,6 +69,7 @@
compile library.java.slf4j_jdk14
runtime project(path: ":runners:direct-java", configuration: "shadow")
testCompile project(":sdks:java:io:google-cloud-platform")
+ testCompile project(":sdks:java:extensions:ml")
testCompile library.java.hamcrest_core
testCompile library.java.hamcrest_library
testCompile library.java.junit
diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index 3393e6d..cb55475 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -22,6 +22,14 @@
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
+import com.google.cloud.language.v1.AnnotateTextRequest;
+import com.google.cloud.language.v1.AnnotateTextResponse;
+import com.google.cloud.language.v1.Document;
+import com.google.cloud.language.v1.Entity;
+import com.google.cloud.language.v1.Sentence;
+import com.google.cloud.language.v1.Token;
+import com.google.gson.Gson;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
@@ -30,12 +38,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.extensions.ml.AnnotateText;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.GenerateSequence;
@@ -63,6 +73,7 @@
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.Watch;
@@ -984,4 +995,167 @@
return result;
}
}
+
+ public static class NaturalLanguageIntegration {
+ /**
+  * Builds, for every sentence of an annotated response, a map from a head token's text to the
+  * texts of the tokens whose dependency edge points at that head.
+  *
+  * <p>Tokens are consumed in order and attributed to a sentence while their begin offset does
+  * not exceed the offset of that sentence's last character.
+  */
+ private static final SerializableFunction<AnnotateTextResponse, List<Map<String, List<String>>>>
+     // [START NlpAnalyzeDependencyTree]
+     analyzeDependencyTree =
+         (SerializableFunction<AnnotateTextResponse, List<Map<String, List<String>>>>)
+             response -> {
+               List<Map<String, List<String>>> perSentence = new ArrayList<>();
+               // Shared across sentences: each token is visited exactly once.
+               int cursor = 0;
+               for (Sentence sentence : response.getSentencesList()) {
+                 Map<String, List<String>> childrenByHead = new HashMap<>();
+                 int firstOffset = sentence.getText().getBeginOffset();
+                 int lastOffset = firstOffset + sentence.getText().getContent().length() - 1;
+                 for (;
+                     cursor < response.getTokensCount()
+                         && response.getTokens(cursor).getText().getBeginOffset() <= lastOffset;
+                     cursor++) {
+                   Token token = response.getTokens(cursor);
+                   int headIndex = token.getDependencyEdge().getHeadTokenIndex();
+                   String headText = response.getTokens(headIndex).getText().getContent();
+                   childrenByHead
+                       .computeIfAbsent(headText, unused -> new ArrayList<>())
+                       .add(token.getText().getContent());
+                 }
+                 perSentence.add(childrenByHead);
+               }
+               return perSentence;
+             };
+ // [END NlpAnalyzeDependencyTree]
+
+ /**
+  * Extracts the document-level sentiment magnitude and a per-sentence magnitude map, keyed by
+  * sentence text, from an annotated response.
+  *
+  * <p>When the same sentence text occurs more than once, the magnitude of its first occurrence
+  * is kept. Without a merge function {@code Collectors.toMap} throws {@link
+  * IllegalStateException} on the duplicate key.
+  */
+ private static final SerializableFunction<? super AnnotateTextResponse, TextSentiments>
+     // [START NlpExtractSentiments]
+     extractSentiments =
+         (SerializableFunction<AnnotateTextResponse, TextSentiments>)
+             annotateTextResponse -> {
+               TextSentiments sentiments = new TextSentiments();
+               sentiments.setDocumentSentiment(
+                   annotateTextResponse.getDocumentSentiment().getMagnitude());
+               Map<String, Float> sentenceSentimentsMap =
+                   annotateTextResponse.getSentencesList().stream()
+                       .collect(
+                           Collectors.toMap(
+                               (Sentence s) -> s.getText().getContent(),
+                               (Sentence s) -> s.getSentiment().getMagnitude(),
+                               // Repeated sentence text: keep the first magnitude seen.
+                               (first, repeated) -> first));
+               sentiments.setSentenceSentiments(sentenceSentimentsMap);
+               return sentiments;
+             };
+ // [END NlpExtractSentiments]
+
+ /**
+  * Maps every entity name in an annotated response to the string form of its entity type.
+  *
+  * <p>If an entity name is reported more than once, the first type seen is kept. Without a merge
+  * function {@code Collectors.toMap} throws {@link IllegalStateException} on the duplicate key.
+  */
+ private static final SerializableFunction<? super AnnotateTextResponse, Map<String, String>>
+     // [START NlpExtractEntities]
+     extractEntities =
+         (SerializableFunction<AnnotateTextResponse, Map<String, String>>)
+             annotateTextResponse ->
+                 annotateTextResponse.getEntitiesList().stream()
+                     .collect(
+                         Collectors.toMap(
+                             Entity::getName,
+                             (Entity e) -> e.getType().toString(),
+                             // Same entity name reported twice: keep the first type seen.
+                             (first, repeated) -> first));
+ // [END NlpExtractEntities]
+
+ /**
+  * Renders an entity-name to entity-type map as a JSON array of single-key objects, e.g.
+  * {@code [{"name": "TYPE"},{"other": "TYPE"}]}.
+  *
+  * <p>Keys and values are serialized with Gson so that quotes, backslashes and control
+  * characters are escaped; plain string concatenation would emit invalid JSON for such input.
+  * Gson's default HTML-safe mode additionally writes characters such as {@code <} or {@code '}
+  * as unicode escapes, which is still valid JSON.
+  */
+ private static final SerializableFunction<? super Map<String, String>, String>
+     mapEntitiesToJson =
+         (SerializableFunction<Map<String, String>, String>)
+             item -> {
+               Gson gson = new Gson();
+               return item.entrySet().stream()
+                   .map(
+                       entry ->
+                           "{"
+                               + gson.toJson(entry.getKey())
+                               + ": "
+                               + gson.toJson(entry.getValue())
+                               + "}")
+                   .collect(Collectors.joining(",", "[", "]"));
+             };
+
+ /** Serializes the per-sentence adjacency maps to a JSON string using a fresh Gson instance. */
+ private static final SerializableFunction<List<Map<String, List<String>>>, String>
+     mapDependencyTreesToJson =
+         (SerializableFunction<List<Map<String, List<String>>>, String>)
+             adjacencyLists -> new Gson().toJson(adjacencyLists);
+
+ /**
+  * Wires the Natural Language snippet onto {@code p}: one sample sentence is wrapped in a
+  * plain-text {@code Document}, annotated with {@code AnnotateText}, and the responses are
+  * mapped to JSON strings for sentiments, entities and dependency trees, each handed to its own
+  * {@code TextIO.write()}.
+  *
+  * <p>This method only applies transforms; it does not call {@code p.run()}.
+  *
+  * @param p pipeline the transforms are applied to
+  */
+ public static void main(Pipeline p) {
+ // [START NlpAnalyzeText]
+ // Request entity, document-sentiment, entity-sentiment and syntax annotations.
+ AnnotateTextRequest.Features features =
+ AnnotateTextRequest.Features.newBuilder()
+ .setExtractEntities(true)
+ .setExtractDocumentSentiment(true)
+ .setExtractEntitySentiment(true)
+ .setExtractSyntax(true)
+ .build();
+ AnnotateText annotateText = AnnotateText.newBuilder().setFeatures(features).build();
+
+ // Wrap the sample sentence in a plain-text Document and annotate it.
+ PCollection<AnnotateTextResponse> responses =
+ p.apply(
+ Create.of(
+ "My experience so far has been fantastic, "
+ + "I\'d really recommend this product."))
+ .apply(
+ MapElements.into(TypeDescriptor.of(Document.class))
+ .via(
+ (SerializableFunction<String, Document>)
+ input ->
+ Document.newBuilder()
+ .setContent(input)
+ .setType(Document.Type.PLAIN_TEXT)
+ .build()))
+ .apply(annotateText);
+
+ // Branch 1: sentiments -> JSON -> "sentiments.txt".
+ responses
+ .apply(MapElements.into(TypeDescriptor.of(TextSentiments.class)).via(extractSentiments))
+ .apply(
+ MapElements.into(TypeDescriptors.strings())
+ .via((SerializableFunction<TextSentiments, String>) TextSentiments::toJson))
+ .apply(TextIO.write().to("sentiments.txt"));
+
+ // Branch 2: entity name/type map -> JSON -> "entities.txt".
+ responses
+ .apply(
+ MapElements.into(
+ TypeDescriptors.maps(TypeDescriptors.strings(), TypeDescriptors.strings()))
+ .via(extractEntities))
+ .apply(MapElements.into(TypeDescriptors.strings()).via(mapEntitiesToJson))
+ .apply(TextIO.write().to("entities.txt"));
+
+ // Branch 3: per-sentence dependency adjacency maps -> JSON -> "adjacency_list.txt".
+ responses
+ .apply(
+ MapElements.into(
+ TypeDescriptors.lists(
+ TypeDescriptors.maps(
+ TypeDescriptors.strings(),
+ TypeDescriptors.lists(TypeDescriptors.strings()))))
+ .via(analyzeDependencyTree))
+ .apply(MapElements.into(TypeDescriptors.strings()).via(mapDependencyTreesToJson))
+ .apply(TextIO.write().to("adjacency_list.txt"));
+ // [END NlpAnalyzeText]
+ }
+
+ /**
+  * Serializable holder for the document-level sentiment value and the per-sentence values keyed
+  * by sentence text; {@link #toJson()} renders the instance with Gson.
+  */
+ private static class TextSentiments implements Serializable {
+   // Field order is kept as-is: Gson serializes the fields of this object.
+   private Float documentSentiment;
+   private Map<String, Float> sentenceSentiments;
+
+   public Float getDocumentSentiment() {
+     return documentSentiment;
+   }
+
+   public void setDocumentSentiment(Float documentSentiment) {
+     this.documentSentiment = documentSentiment;
+   }
+
+   public Map<String, Float> getSentenceSentiments() {
+     return sentenceSentiments;
+   }
+
+   public void setSentenceSentiments(Map<String, Float> sentenceSentiments) {
+     this.sentenceSentiments = sentenceSentiments;
+   }
+
+   /** Returns this object serialized as a JSON string by a fresh Gson instance. */
+   public String toJson() {
+     return new Gson().toJson(this);
+   }
+ }
+ }
}
diff --git a/learning/katas/go/core_transforms/section-info.yaml b/learning/katas/go/core_transforms/section-info.yaml
index 32ae3aa..bcdab5d 100644
--- a/learning/katas/go/core_transforms/section-info.yaml
+++ b/learning/katas/go/core_transforms/section-info.yaml
@@ -28,3 +28,4 @@
- additional_outputs
- branching
- composite
+- windowing
diff --git a/learning/katas/go/core_transforms/windowing/lesson-info.yaml b/learning/katas/go/core_transforms/windowing/lesson-info.yaml
new file mode 100644
index 0000000..2315b4c
--- /dev/null
+++ b/learning/katas/go/core_transforms/windowing/lesson-info.yaml
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+content:
+- windowing
diff --git a/learning/katas/go/core_transforms/windowing/lesson-remote-info.yaml b/learning/katas/go/core_transforms/windowing/lesson-remote-info.yaml
new file mode 100644
index 0000000..7a20a3a
--- /dev/null
+++ b/learning/katas/go/core_transforms/windowing/lesson-remote-info.yaml
@@ -0,0 +1,3 @@
+id: 387853
+update_date: Thu, 06 Aug 2020 17:53:20 UTC
+unit: 377026
diff --git a/learning/katas/go/core_transforms/windowing/windowing/cmd/main.go b/learning/katas/go/core_transforms/windowing/windowing/cmd/main.go
new file mode 100644
index 0000000..e1cfcf0
--- /dev/null
+++ b/learning/katas/go/core_transforms/windowing/windowing/cmd/main.go
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "beam.apache.org/learning/katas/core_transforms/windowing/windowing/pkg/common"
+ "beam.apache.org/learning/katas/core_transforms/windowing/windowing/pkg/task"
+ "context"
+ "github.com/apache/beam/sdks/go/pkg/beam"
+ "github.com/apache/beam/sdks/go/pkg/beam/log"
+ "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+ "github.com/apache/beam/sdks/go/pkg/beam/x/debug"
+)
+
+// main builds the kata pipeline (timestamped commit lines, the task's transform, then a
+// string rendering of every Commit that is printed) and runs it.
+func main() {
+	p, s := beam.NewPipelineWithRoot()
+
+	commits := task.ApplyTransform(s, common.CreateLines(s))
+
+	rendered := beam.ParDo(s, func(c task.Commit) string {
+		return c.String()
+	}, commits)
+
+	debug.Print(s, rendered)
+
+	ctx := context.Background()
+	if err := beamx.Run(ctx, p); err != nil {
+		log.Exitf(context.Background(), "Failed to execute job: %v", err)
+	}
+}
diff --git a/learning/katas/go/core_transforms/windowing/windowing/pkg/common/input.go b/learning/katas/go/core_transforms/windowing/windowing/pkg/common/input.go
new file mode 100644
index 0000000..3353e2b
--- /dev/null
+++ b/learning/katas/go/core_transforms/windowing/windowing/pkg/common/input.go
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package common
+
+import (
+ "github.com/apache/beam/sdks/go/pkg/beam"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+ "time"
+)
+
+var (
+ // lines maps a UTC timestamp to a commit line; timestampFn emits each line with its
+ // key as the element's event time.
+ lines = map[time.Time]string{
+ time.Date(2020, 7, 31, 15, 52, 5, 0, time.UTC): "3c6c45924a Remove trailing whitespace from README",
+ time.Date(2020, 7, 31, 15, 59, 40, 0, time.UTC): "a52be99b62 Merge pull request #12443 from KevinGG/whitespace",
+ time.Date(2020, 7, 31, 16, 7, 36, 0, time.UTC): "7c1772d13f Merge pull request #12439 from ibzib/beam-9199-1",
+ time.Date(2020, 7, 31, 16, 35, 41, 0, time.UTC): "d971ba13b8 Widen ranges for GCP libraries (#12198)",
+ time.Date(2020, 8, 1, 0, 7, 25, 0, time.UTC): "875620111b Enable all Jenkins jobs triggering for committers (#12407)",
+ }
+)
+
+// CreateLines returns the PCollection produced by applying timestampFn to an impulse in
+// scope s, i.e. the commit lines of the lines table.
+func CreateLines(s beam.Scope) beam.PCollection {
+	impulse := beam.Impulse(s)
+	return beam.ParDo(s, timestampFn, impulse)
+}
+
+// timestampFn ignores its input and emits every entry of lines, using the entry's key as
+// the event time of the emitted commit line.
+func timestampFn(_ []byte, emit func(beam.EventTime, string)) {
+	for when, message := range lines {
+		eventTime := mtime.FromTime(when)
+		emit(eventTime, message)
+	}
+}
diff --git a/learning/katas/go/core_transforms/windowing/windowing/pkg/task/task.go b/learning/katas/go/core_transforms/windowing/windowing/pkg/task/task.go
new file mode 100644
index 0000000..1838578
--- /dev/null
+++ b/learning/katas/go/core_transforms/windowing/windowing/pkg/task/task.go
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package task
+
+import (
+ "fmt"
+ "github.com/apache/beam/sdks/go/pkg/beam"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
+ "time"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+ windowed := beam.WindowInto(s, window.NewFixedWindows(time.Hour), input)
+ return beam.ParDo(s, timestampFn, windowed)
+}
+
+func timestampFn(iw beam.Window, et beam.EventTime, line string) Commit {
+ return Commit{
+ MaxTimestampWindow: toTime(iw.MaxTimestamp()),
+ EventTimestamp: toTime(et),
+ Line: line,
+ }
+}
+
+// toTime converts a Beam event time to a time.Time built from the event time's
+// millisecond value expressed in nanoseconds.
+func toTime(et beam.EventTime) time.Time {
+	nanos := et.Milliseconds() * int64(time.Millisecond)
+	return time.Unix(0, nanos)
+}
+
+// Commit pairs a commit line with its event timestamp and the maximum timestamp of the
+// window the element was processed in.
+type Commit struct {
+ // MaxTimestampWindow is the window's MaxTimestamp() converted with toTime.
+ MaxTimestampWindow time.Time
+ // EventTimestamp is the element's event time converted with toTime.
+ EventTimestamp time.Time
+ // Line is the commit line itself.
+ Line string
+}
+
+// String renders the commit with its window's maximum timestamp and its event timestamp,
+// both formatted with time.Kitchen.
+func (c Commit) String() string {
+	windowEnd := c.MaxTimestampWindow.Format(time.Kitchen)
+	eventTime := c.EventTimestamp.Format(time.Kitchen)
+	return fmt.Sprintf("Window ending at: %v contains timestamp: %v for commit: \"%s\"",
+		windowEnd, eventTime, c.Line)
+}
+
diff --git a/learning/katas/go/core_transforms/windowing/windowing/task-info.yaml b/learning/katas/go/core_transforms/windowing/windowing/task-info.yaml
new file mode 100644
index 0000000..b0e38f7
--- /dev/null
+++ b/learning/katas/go/core_transforms/windowing/windowing/task-info.yaml
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+type: edu
+files:
+- name: test/task_test.go
+ visible: false
+- name: cmd/main.go
+ visible: true
+- name: pkg/common/input.go
+ visible: true
+- name: pkg/task/task.go
+ visible: true
+ placeholders:
+ - offset: 1030
+ length: 60
+ placeholder_text: TODO()
+ - offset: 1099
+ length: 36
+ placeholder_text: TODO()
+ - offset: 1156
+ length: 46
+ placeholder_text: TODO()
+ - offset: 1221
+ length: 121
+ placeholder_text: TODO()
diff --git a/learning/katas/go/core_transforms/windowing/windowing/task-remote-info.yaml b/learning/katas/go/core_transforms/windowing/windowing/task-remote-info.yaml
new file mode 100644
index 0000000..235e663
--- /dev/null
+++ b/learning/katas/go/core_transforms/windowing/windowing/task-remote-info.yaml
@@ -0,0 +1,2 @@
+id: 1464828
+update_date: Thu, 06 Aug 2020 17:53:26 UTC
diff --git a/learning/katas/go/core_transforms/windowing/windowing/task.md b/learning/katas/go/core_transforms/windowing/windowing/task.md
new file mode 100644
index 0000000..22444a6
--- /dev/null
+++ b/learning/katas/go/core_transforms/windowing/windowing/task.md
@@ -0,0 +1,77 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+# Windowing
+
+This lesson introduces the concept of windowed PCollection elements. A window is a view of a data set bounded by a
+fixed beginning and a fixed end. In the Beam model, windowing subdivides a PCollection according to the
+timestamps of its individual elements. An element can be a part of one or more windows.
+
+A DoFn can request timestamp and windowing information about the element it is processing. All the previous lessons
+had this information available as well. This lesson makes use of these parameters. The simple dataset
+has five git commit messages and their timestamps from the
+[Apache Beam public repository](https://github.com/apache/beam). Timestamps have been applied to this PCollection
+input according to the date and time of these messages.
+
+**Kata:** This lesson challenges you to apply an hourly fixed window to a PCollection. You are then to
+apply a ParDo to that hourly fixed windowed PCollection to produce a PCollection of a Commit struct. The
+Commit struct is provided for you. You are encouraged to run the pipeline at cmd/main.go of this task
+to visualize the windowing and timestamps.
+
+<div class="hint">
+ Use <a href="https://godoc.org/github.com/apache/beam/sdks/go/pkg/beam#ParDo">
+ beam.ParDo</a>
+ with a <a href="https://godoc.org/github.com/apache/beam/sdks/go/pkg/beam#hdr-DoFns">
+ DoFn</a> to accomplish this lesson.
+</div>
+
+<div class="hint">
+ Use <a href="https://godoc.org/github.com/apache/beam/sdks/go/pkg/beam#WindowInto">
+ beam.WindowInto</a>
+ with <a href="https://godoc.org/github.com/apache/beam/sdks/go/pkg/beam/core/graph/window#NewFixedWindows">
+ window.NewFixedWindows(time.Hour)</a>
+ on your PCollection input to apply an hourly windowing strategy to each element.
+</div>
+
+<div class="hint">
+ To access <a href="https://godoc.org/github.com/apache/beam/sdks/go/pkg/beam#Window">
+ beam.Window</a>
+ and <a href="https://godoc.org/github.com/apache/beam/sdks/go/pkg/beam#EventTime">
+ beam.EventTime</a> in your DoFn, declare them as parameters before the element, in the order shown below.
+
+```
+func doFn(iw beam.Window, et beam.EventTime, element X) Y {
+ // do something with iw, et and element to return Y
+}
+```
+</div>
+
+<div class="hint">
+ The Commit struct provided for you has a MaxTimestampWindow property that can be set from
+ <a href="https://godoc.org/github.com/apache/beam/sdks/go/pkg/beam#Window">
+ beam.Window</a>'s MaxTimestamp().
+</div>
+
+<div class="hint">
+ Refer to the Beam Programming Guide for additional information about
+ <a href="https://beam.apache.org/documentation/programming-guide/#other-dofn-parameters">
+ additional DoFn parameters</a> and
+ <a href="https://beam.apache.org/documentation/programming-guide/#windowing">
+ windowing</a>.
+</div>
+
diff --git a/learning/katas/go/core_transforms/windowing/windowing/test/task_test.go b/learning/katas/go/core_transforms/windowing/windowing/test/task_test.go
new file mode 100644
index 0000000..8d4a2e2
--- /dev/null
+++ b/learning/katas/go/core_transforms/windowing/windowing/test/task_test.go
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package test
+
+import (
+ "beam.apache.org/learning/katas/core_transforms/windowing/windowing/pkg/common"
+ "beam.apache.org/learning/katas/core_transforms/windowing/windowing/pkg/task"
+ "github.com/apache/beam/sdks/go/pkg/beam"
+ "github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
+ "github.com/google/go-cmp/cmp"
+ "testing"
+ "time"
+)
+
+// TestApplyTransform applies task.ApplyTransform to the kata input and runs the pipeline,
+// failing only if the pipeline run returns an error.
+func TestApplyTransform(t *testing.T) {
+ p, s := beam.NewPipelineWithRoot()
+ tests := []struct {
+ input beam.PCollection
+ want []interface{}
+ }{
+ {
+ input: common.CreateLines(s),
+ want: []interface{}{
+ task.Commit{
+ MaxTimestampWindow: time.Unix(1596211199, 0),
+ EventTimestamp: time.Unix(1596210725, 0),
+ Line: "3c6c45924a Remove trailing whitespace from README",
+ },
+ task.Commit{
+ MaxTimestampWindow: time.Unix(1596211199, 0),
+ EventTimestamp: time.Unix(1596211180, 0),
+ Line: "a52be99b62 Merge pull request #12443 from KevinGG/whitespace",
+ },
+ task.Commit{
+ MaxTimestampWindow: time.Unix(1596214799, 0),
+ EventTimestamp: time.Unix(1596211656, 0),
+ Line: "7c1772d13f Merge pull request #12439 from ibzib/beam-9199-1",
+ },
+ task.Commit{
+ MaxTimestampWindow: time.Unix(1596214799, 0),
+ EventTimestamp: time.Unix(1596213341, 0),
+ Line: "d971ba13b8 Widen ranges for GCP libraries (#12198)",
+ },
+ task.Commit{
+ MaxTimestampWindow: time.Unix(1596243599, 0),
+ EventTimestamp: time.Unix(1596240445, 0),
+ Line: "875620111b Enable all Jenkins jobs triggering for committers (#12407)",
+ },
+ },
+ },
+ }
+ for _, tt := range tests {
+ got := task.ApplyTransform(s, tt.input)
+ // NOTE(review): the boolean returned by cmp.Equal is discarded, and got is a
+ // beam.PCollection while tt.want is a []interface{}, so tt.want is never checked
+ // against the pipeline output. A pipeline-level assertion (presumably passert.Equals)
+ // looks intended - TODO confirm, and re-verify the whole-second MaxTimestampWindow
+ // expectations once the comparison is actually enforced.
+ cmp.Equal(got, tt.want)
+ if err := ptest.Run(p); err != nil {
+ t.Error(err)
+ }
+ }
+}
diff --git a/learning/katas/go/course-remote-info.yaml b/learning/katas/go/course-remote-info.yaml
index 90e7821..e944389 100644
--- a/learning/katas/go/course-remote-info.yaml
+++ b/learning/katas/go/course-remote-info.yaml
@@ -1,2 +1,2 @@
id: 70387
-update_date: Mon, 27 Jul 2020 20:44:48 UTC
+update_date: Wed, 29 Jul 2020 20:42:36 UTC
diff --git a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
index 80e4eb8..08b823d 100644
--- a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
+++ b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
@@ -347,7 +347,6 @@
urn: "beam:coder:row:v1"
# str: string, i32: int32, f64: float64, arr: array[string]
payload: "\n\t\n\x03str\x1a\x02\x10\x07\n\t\n\x03i32\x1a\x02\x10\x03\n\t\n\x03f64\x1a\x02\x10\x06\n\r\n\x03arr\x1a\x06\x1a\x04\n\x02\x10\x07\x12$4e5e554c-d4c1-4a5d-b5e1-f3293a6b9f05"
-nested: false
examples:
"\u0004\u0000\u0003foo\u00a9\u0046\u003f\u00b9\u0099\u0099\u0099\u0099\u0099\u009a\0\0\0\u0003\u0003foo\u0003bar\u0003baz": {str: "foo", i32: 9001, f64: "0.1", arr: ["foo", "bar", "baz"]}
@@ -357,7 +356,6 @@
urn: "beam:coder:row:v1"
# str: nullable string, i32: nullable int32, f64: nullable float64
payload: "\n\x0b\n\x03str\x1a\x04\x08\x01\x10\x07\n\x0b\n\x03i32\x1a\x04\x08\x01\x10\x03\n\x0b\n\x03f64\x1a\x04\x08\x01\x10\x06\x12$b20c6545-57af-4bc8-b2a9-51ace21c7393"
-nested: false
examples:
"\u0003\u0001\u0007": {str: null, i32: null, f64: null}
"\u0003\u0001\u0004\u0003foo\u00a9\u0046": {str: "foo", i32: 9001, f64: null}
@@ -380,7 +378,33 @@
urn: "beam:coder:row:v1"
# f_bool: boolean, f_bytes: nullable bytes
payload: "\n\x0c\n\x06f_bool\x1a\x02\x10\x08\n\x0f\n\x07f_bytes\x1a\x04\x08\x01\x10\t\x12$eea1b747-7571-43d3-aafa-9255afdceafb"
-nested: false
examples:
"\x02\x01\x02\x01": {f_bool: True, f_bytes: null}
"\x02\x00\x00\x04ab\x00c": {f_bool: False, f_bytes: "ab\0c"}
+
+---
+
+# Binary data generated with the python SDK:
+#
+# import typing
+# import apache_beam as beam
+# class Test(typing.NamedTuple):
+# f_map: typing.Mapping[str,int]
+# schema = beam.typehints.schemas.named_tuple_to_schema(Test)
+# coder = beam.coders.row_coder.RowCoder(schema)
+# print("payload = %s" % schema.SerializeToString())
+# examples = (Test(f_map={}),
+# Test(f_map={"foo": 9001, "bar": 9223372036854775807}),
+# Test(f_map={"everything": None, "is": None, "null!": None, "¯\_(ツ)_/¯": None}))
+# for example in examples:
+# print("example = %s" % coder.encode(example))
+coder:
+ urn: "beam:coder:row:v1"
+ # f_map: map<str, nullable int64>
+ payload: "\n\x15\n\x05f_map\x1a\x0c*\n\n\x02\x10\x07\x12\x04\x08\x01\x10\x04\x12$d8c8f969-14e6-457f-a8b5-62a1aec7f1cd"
+ # map ordering is non-deterministic
+ non_deterministic: True
+examples:
+ "\x01\x00\x00\x00\x00\x00": {f_map: {}}
+ "\x01\x00\x00\x00\x00\x02\x03foo\x01\xa9F\x03bar\x01\xff\xff\xff\xff\xff\xff\xff\xff\x7f": {f_map: {"foo": 9001, "bar": 9223372036854775807}}
+ "\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00": {f_map: {"everything":null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}}
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 3623790..8b1ce6b 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -855,10 +855,21 @@
// BOOLEAN: beam:coder:bool:v1
// BYTES: beam:coder:bytes:v1
// ArrayType: beam:coder:iterable:v1 (always has a known length)
- // MapType: not yet a standard coder (BEAM-7996)
+ // MapType: not a standard coder, specification defined below.
// RowType: beam:coder:row:v1
// LogicalType: Uses the coder for its representation.
//
+ // The MapType is encoded by:
+ // - An INT32 representing the size of the map (N)
+ // - Followed by N interleaved keys and values, encoded with their
+ // corresponding coder.
+ //
+ // Nullable types in container types (ArrayType, MapType) are encoded by:
+ // - A one byte null indicator, 0x00 for null values, or 0x01 for present
+ // values.
+ // - For present values the null indicator is followed by the value
+ // encoded with its corresponding coder.
+ //
// The payload for RowCoder is an instance of Schema.
// Components: None
// Experimental.
diff --git a/model/pipeline/src/main/proto/schema.proto b/model/pipeline/src/main/proto/schema.proto
index dcf75ca..bffa5f1 100644
--- a/model/pipeline/src/main/proto/schema.proto
+++ b/model/pipeline/src/main/proto/schema.proto
@@ -19,6 +19,9 @@
// ** Experimental **
// Protocol Buffers describing Beam Schemas, a portable representation for
// complex types.
+//
+// The primary application of Schema is as the payload for the standard coder
+// "beam:coder:row:v1", defined in beam_runner_api.proto
syntax = "proto3";
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
index 10221b8..2c4090e 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
@@ -21,7 +21,6 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.toImmutableList;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
@@ -43,6 +42,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -340,6 +340,10 @@
}
private static Object parseField(Object value, Schema.FieldType fieldType) {
+ if (value == null) {
+ return null;
+ }
+
switch (fieldType.getTypeName()) {
case BYTE:
return ((Number) value).byteValue();
@@ -366,14 +370,18 @@
.map((element) -> parseField(element, fieldType.getCollectionElementType()))
.collect(toImmutableList());
case MAP:
- Map<Object, Object> kvMap = (Map<Object, Object>) value;
- return kvMap.entrySet().stream()
- .collect(
- toImmutableMap(
- (pair) -> parseField(pair.getKey(), fieldType.getMapKeyType()),
- (pair) -> parseField(pair.getValue(), fieldType.getMapValueType())));
+ Map<Object, Object> kvMap = new HashMap<>();
+ ((Map<Object, Object>) value)
+ .entrySet().stream()
+ .forEach(
+ (entry) ->
+ kvMap.put(
+ parseField(entry.getKey(), fieldType.getMapKeyType()),
+ parseField(entry.getValue(), fieldType.getMapValueType())));
+ return kvMap;
case ROW:
- Map<String, Object> rowMap = (Map<String, Object>) value;
+ // Clone map so we don't mutate the underlying value
+ Map<String, Object> rowMap = new HashMap<>((Map<String, Object>) value);
Schema schema = fieldType.getRowSchema();
Row.Builder row = Row.withSchema(schema);
for (Schema.Field field : schema.getFields()) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java
new file mode 100644
index 0000000..148c726
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A datetime without a time-zone.
+ *
+ * <p>It cannot represent an instant on the time-line without additional information such as an
+ * offset or time-zone.
+ *
+ * <p>Its input type is a {@link LocalDateTime}, and its base type is a {@link Row} containing a
+ * Date field and a Time field. The Date field is the same as the base type of {@link Date}: a
+ * Long that represents an incrementing count of days where day 0 is 1970-01-01 (ISO). The Time
+ * field is the same as the base type of {@link Time}: a Long that represents a count of time in
+ * nanoseconds.
+ */
+public class DateTime implements Schema.LogicalType<LocalDateTime, Row> {
+ public static final String DATE_FIELD_NAME = "Date";
+ public static final String TIME_FIELD_NAME = "Time";
+ public static final Schema DATETIME_SCHEMA =
+ Schema.builder().addInt64Field(DATE_FIELD_NAME).addInt64Field(TIME_FIELD_NAME).build();
+
+ @Override
+ public String getIdentifier() {
+ return "beam:logical_type:datetime:v1";
+ }
+
+ // unused
+ @Override
+ public Schema.FieldType getArgumentType() {
+ return Schema.FieldType.STRING;
+ }
+
+ // unused
+ @Override
+ public String getArgument() {
+ return "";
+ }
+
+ @Override
+ public Schema.FieldType getBaseType() {
+ return Schema.FieldType.row(DATETIME_SCHEMA);
+ }
+
+ @Override
+ public Row toBaseType(LocalDateTime input) {
+ return input == null
+ ? null
+ : Row.withSchema(DATETIME_SCHEMA)
+ .addValues(input.toLocalDate().toEpochDay(), input.toLocalTime().toNanoOfDay())
+ .build();
+ }
+
+ @Override
+ public LocalDateTime toInputType(Row base) {
+ return base == null
+ ? null
+ : LocalDateTime.of(
+ LocalDate.ofEpochDay(base.getInt64(DATE_FIELD_NAME)),
+ LocalTime.ofNanoOfDay(base.getInt64(TIME_FIELD_NAME)));
+ }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java
index d22b77a..ef6a68a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java
@@ -18,8 +18,10 @@
package org.apache.beam.sdk.schemas.logicaltypes;
import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.time.LocalTime;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.values.Row;
/** Beam {@link org.apache.beam.sdk.schemas.Schema.LogicalType}s corresponding to SQL data types. */
public class SqlTypes {
@@ -31,4 +33,7 @@
/** Beam LogicalType corresponding to ZetaSQL/CalciteSQL TIME type. */
public static final LogicalType<LocalTime, Long> TIME = new Time();
+
+ /** Beam LogicalType corresponding to ZetaSQL DATETIME type. */
+ public static final LogicalType<LocalDateTime, Row> DATETIME = new DateTime();
}
diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
index 4604a00..42efbbd 100644
--- a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
+++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
@@ -39,7 +39,7 @@
.put("BOOL", FieldType.BOOLEAN)
.put("BYTES", FieldType.BYTES)
.put("DATE", FieldType.logicalType(SqlTypes.DATE))
- .put("DATETIME", FieldType.DATETIME)
+ .put("DATETIME", FieldType.logicalType(SqlTypes.DATETIME))
.put("DOUBLE", FieldType.DOUBLE)
.put("FLOAT", FieldType.DOUBLE)
.put("FLOAT64", FieldType.DOUBLE)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java
index 3ef4d9f..b4a9d7e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java
@@ -124,7 +124,7 @@
@Override
public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
- return CalciteUtils.sqlTypeWithAutoCast(typeFactory, method.getReturnType());
+ return CalciteUtils.sqlTypeWithAutoCast(typeFactory, method.getGenericReturnType());
}
@Override
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java
index 1d234b0..4452422 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java
@@ -48,6 +48,8 @@
switch (typeName) {
case TIME:
return 6; // support microsecond time precision
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return 6; // support microsecond datetime precision
default:
return super.getMaxPrecision(typeName);
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
index b6d606b..d9ad401 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
@@ -27,6 +27,7 @@
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.AbstractList;
import java.util.AbstractMap;
@@ -35,13 +36,14 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TimeZone;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamJavaTypeFactory;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeWithLocalTzType;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimestampWithLocalTzType;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.DateTime;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -92,6 +94,7 @@
public class BeamCalcRel extends AbstractBeamCalcRel {
private static final long NANOS_PER_MILLISECOND = 1000000L;
+ private static final long MILLIS_PER_DAY = 86400000L;
private static final ParameterExpression outputSchemaParam =
Expressions.parameter(Schema.class, "outputSchema");
@@ -344,6 +347,18 @@
valueDateTime = Expressions.unbox(valueDateTime);
}
valueDateTime = Expressions.call(LocalDate.class, "ofEpochDay", valueDateTime);
+ } else if (CalciteUtils.TIMESTAMP_WITH_LOCAL_TZ.typesEqual(toType)
+ || CalciteUtils.NULLABLE_TIMESTAMP_WITH_LOCAL_TZ.typesEqual(toType)) {
+ // Convert TimeStamp_With_Local_TimeZone to LocalDateTime
+ Expression dateValue =
+ Expressions.divide(valueDateTime, Expressions.constant(MILLIS_PER_DAY));
+ Expression date = Expressions.call(LocalDate.class, "ofEpochDay", dateValue);
+ Expression timeValue =
+ Expressions.multiply(
+ Expressions.modulo(valueDateTime, Expressions.constant(MILLIS_PER_DAY)),
+ Expressions.constant(NANOS_PER_MILLISECOND));
+ Expression time = Expressions.call(LocalTime.class, "ofNanoOfDay", timeValue);
+ valueDateTime = Expressions.call(LocalDateTime.class, "of", date, time);
} else {
throw new UnsupportedOperationException("Unknown DateTime type " + toType);
}
@@ -385,7 +400,7 @@
.put(SqlTypes.DATE.getIdentifier(), Long.class)
.put(SqlTypes.TIME.getIdentifier(), Long.class)
.put(TimeWithLocalTzType.IDENTIFIER, ReadableInstant.class)
- .put(TimestampWithLocalTzType.IDENTIFIER, ReadableInstant.class)
+ .put(SqlTypes.DATETIME.getIdentifier(), Row.class)
.put(CharType.IDENTIFIER, String.class)
.build();
@@ -442,6 +457,16 @@
value, Expressions.divide(value, Expressions.constant(NANOS_PER_MILLISECOND)));
} else if (SqlTypes.DATE.getIdentifier().equals(logicalId)) {
return value;
+ } else if (SqlTypes.DATETIME.getIdentifier().equals(logicalId)) {
+ Expression dateValue =
+ Expressions.call(value, "getInt64", Expressions.constant(DateTime.DATE_FIELD_NAME));
+ Expression timeValue =
+ Expressions.call(value, "getInt64", Expressions.constant(DateTime.TIME_FIELD_NAME));
+ Expression returnValue =
+ Expressions.add(
+ Expressions.multiply(dateValue, Expressions.constant(MILLIS_PER_DAY)),
+ Expressions.divide(timeValue, Expressions.constant(NANOS_PER_MILLISECOND)));
+ return nullOr(value, returnValue);
} else if (!CharType.IDENTIFIER.equals(logicalId)) {
throw new UnsupportedOperationException(
"Unknown LogicalType " + type.getLogicalType().getIdentifier());
@@ -563,6 +588,8 @@
|| name.equals(DataContext.Variable.CURRENT_TIMESTAMP.camelName)
|| name.equals(DataContext.Variable.LOCAL_TIMESTAMP.camelName)) {
return System.currentTimeMillis();
+ } else if (name.equals(Variable.TIME_ZONE.camelName)) {
+ return TimeZone.getDefault();
}
return null;
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index acb4ee1..6ded492 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.utils;
+import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Date;
import java.util.Map;
@@ -53,15 +54,6 @@
}
}
- /** A LogicalType corresponding to TIMESTAMP_WITH_LOCAL_TIME_ZONE. */
- public static class TimestampWithLocalTzType extends PassThroughLogicalType<Instant> {
- public static final String IDENTIFIER = "SqlTimestampWithLocalTzType";
-
- public TimestampWithLocalTzType() {
- super(IDENTIFIER, FieldType.STRING, "", FieldType.DATETIME);
- }
- }
-
/** A LogicalType corresponding to CHAR. */
public static class CharType extends PassThroughLogicalType<String> {
public static final String IDENTIFIER = "SqlCharType";
@@ -82,7 +74,7 @@
return logicalId.equals(SqlTypes.DATE.getIdentifier())
|| logicalId.equals(SqlTypes.TIME.getIdentifier())
|| logicalId.equals(TimeWithLocalTzType.IDENTIFIER)
- || logicalId.equals(TimestampWithLocalTzType.IDENTIFIER);
+ || logicalId.equals(SqlTypes.DATETIME.getIdentifier());
}
return false;
}
@@ -121,8 +113,9 @@
FieldType.logicalType(new TimeWithLocalTzType());
public static final FieldType TIMESTAMP = FieldType.DATETIME;
public static final FieldType NULLABLE_TIMESTAMP = FieldType.DATETIME.withNullable(true);
- public static final FieldType TIMESTAMP_WITH_LOCAL_TZ =
- FieldType.logicalType(new TimestampWithLocalTzType());
+ public static final FieldType TIMESTAMP_WITH_LOCAL_TZ = FieldType.logicalType(SqlTypes.DATETIME);
+ public static final FieldType NULLABLE_TIMESTAMP_WITH_LOCAL_TZ =
+ FieldType.logicalType(SqlTypes.DATETIME).withNullable(true);
private static final BiMap<FieldType, SqlTypeName> BEAM_TO_CALCITE_TYPE_MAPPING =
ImmutableBiMap.<FieldType, SqlTypeName>builder()
@@ -283,18 +276,26 @@
/**
* SQL-Java type mapping, with specified Beam rules: <br>
- * 1. redirect {@link AbstractInstant} to {@link Date} so Calcite can recognize it.
+ * 1. redirect {@link AbstractInstant} to {@link Date} so Calcite can recognize it. <br>
+ * 2. For a list, the component type is needed to create a Sql array type.
*
- * @param rawType
- * @return
+ * @param type
+ * @return Calcite RelDataType
*/
- public static RelDataType sqlTypeWithAutoCast(RelDataTypeFactory typeFactory, Type rawType) {
+ public static RelDataType sqlTypeWithAutoCast(RelDataTypeFactory typeFactory, Type type) {
// For Joda time types, return SQL type for java.util.Date.
- if (rawType instanceof Class && AbstractInstant.class.isAssignableFrom((Class<?>) rawType)) {
+ if (type instanceof Class && AbstractInstant.class.isAssignableFrom((Class<?>) type)) {
return typeFactory.createJavaType(Date.class);
- } else if (rawType instanceof Class && ByteString.class.isAssignableFrom((Class<?>) rawType)) {
+ } else if (type instanceof Class && ByteString.class.isAssignableFrom((Class<?>) type)) {
return typeFactory.createJavaType(byte[].class);
+ } else if (type instanceof ParameterizedType
+ && java.util.List.class.isAssignableFrom(
+ (Class<?>) ((ParameterizedType) type).getRawType())) {
+ ParameterizedType parameterizedType = (ParameterizedType) type;
+ Class<?> genericType = (Class<?>) parameterizedType.getActualTypeArguments()[0];
+ RelDataType collectionElementType = typeFactory.createJavaType(genericType);
+ return typeFactory.createArrayType(collectionElementType, UNLIMITED_ARRAY_SIZE);
}
- return typeFactory.createJavaType((Class) rawType);
+ return typeFactory.createJavaType((Class) type);
}
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java
index 7aa5032..f107dc3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java
@@ -38,6 +38,7 @@
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.BitString;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.TimestampString;
import org.checkerframework.checker.nullness.qual.Nullable;
public class BeamSqlUnparseContext extends SqlImplementor.SimpleContext {
@@ -69,8 +70,12 @@
public SqlNode toSql(RexProgram program, RexNode rex) {
if (rex.getKind().equals(SqlKind.LITERAL)) {
final RexLiteral literal = (RexLiteral) rex;
- SqlTypeFamily family = literal.getTypeName().getFamily();
- if (SqlTypeFamily.BINARY.equals(family)) {
+ SqlTypeName name = literal.getTypeName();
+ SqlTypeFamily family = name.getFamily();
+ if (SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE.equals(name)) {
+ TimestampString timestampString = literal.getValueAs(TimestampString.class);
+ return new SqlDateTimeLiteral(timestampString, POS);
+ } else if (SqlTypeFamily.BINARY.equals(family)) {
ByteString byteString = literal.getValueAs(ByteString.class);
BitString bitString = BitString.createFromHexString(byteString.toString(16));
return new SqlByteStringLiteral(bitString, POS);
@@ -92,6 +97,21 @@
return super.toSql(program, rex);
}
+ private static class SqlDateTimeLiteral extends SqlLiteral {
+
+ private final TimestampString timestampString;
+
+ SqlDateTimeLiteral(TimestampString timestampString, SqlParserPos pos) {
+ super(timestampString, SqlTypeName.TIMESTAMP, pos);
+ this.timestampString = timestampString;
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ writer.literal("DATETIME '" + timestampString.toString() + "'");
+ }
+ }
+
private static class SqlByteStringLiteral extends SqlLiteral {
SqlByteStringLiteral(BitString bytes, SqlParserPos pos) {
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
index 1b1641f..0519798 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.extensions.sql;
import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.HashMap;
@@ -368,22 +369,135 @@
}
@Test
- public void testNullDatetimeFields() {
+ public void testDatetimeFields() {
Instant current = new Instant(1561671380000L); // Long value corresponds to 27/06/2019
Schema dateTimeFieldSchema =
Schema.builder()
.addField("dateTimeField", FieldType.DATETIME)
.addNullableField("nullableDateTimeField", FieldType.DATETIME)
- .addField("timeTypeField", FieldType.logicalType(SqlTypes.TIME))
- .addNullableField("nullableTimeTypeField", FieldType.logicalType(SqlTypes.TIME))
+ .build();
+
+ Row dateTimeRow = Row.withSchema(dateTimeFieldSchema).addValues(current, null).build();
+
+ PCollection<Row> outputRow =
+ pipeline
+ .apply(Create.of(dateTimeRow))
+ .setRowSchema(dateTimeFieldSchema)
+ .apply(
+ SqlTransform.query(
+ "select EXTRACT(YEAR from dateTimeField) as yyyy, "
+ + " EXTRACT(YEAR from nullableDateTimeField) as year_with_null, "
+ + " EXTRACT(MONTH from dateTimeField) as mm, "
+ + " EXTRACT(MONTH from nullableDateTimeField) as month_with_null "
+ + " from PCOLLECTION"));
+
+ Schema outputRowSchema =
+ Schema.builder()
+ .addField("yyyy", FieldType.INT64)
+ .addNullableField("year_with_null", FieldType.INT64)
+ .addField("mm", FieldType.INT64)
+ .addNullableField("month_with_null", FieldType.INT64)
+ .build();
+
+ PAssert.that(outputRow)
+ .containsInAnyOrder(
+ Row.withSchema(outputRowSchema).addValues(2019L, null, 06L, null).build());
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+ }
+
+ @Test
+ public void testSqlLogicalTypeDateFields() {
+ Schema dateTimeFieldSchema =
+ Schema.builder()
.addField("dateTypeField", FieldType.logicalType(SqlTypes.DATE))
.addNullableField("nullableDateTypeField", FieldType.logicalType(SqlTypes.DATE))
.build();
+ Row dateRow =
+ Row.withSchema(dateTimeFieldSchema).addValues(LocalDate.of(2019, 6, 27), null).build();
+
+ PCollection<Row> outputRow =
+ pipeline
+ .apply(Create.of(dateRow))
+ .setRowSchema(dateTimeFieldSchema)
+ .apply(
+ SqlTransform.query(
+ "select EXTRACT(DAY from dateTypeField) as dd, "
+ + " EXTRACT(DAY from nullableDateTypeField) as day_with_null, "
+ + " dateTypeField + interval '1' day as date_with_day_added, "
+ + " nullableDateTypeField + interval '1' day as day_added_with_null "
+ + " from PCOLLECTION"));
+
+ Schema outputRowSchema =
+ Schema.builder()
+ .addField("dd", FieldType.INT64)
+ .addNullableField("day_with_null", FieldType.INT64)
+ .addField("date_with_day_added", FieldType.logicalType(SqlTypes.DATE))
+ .addNullableField("day_added_with_null", FieldType.logicalType(SqlTypes.DATE))
+ .build();
+
+ PAssert.that(outputRow)
+ .containsInAnyOrder(
+ Row.withSchema(outputRowSchema)
+ .addValues(27L, null, LocalDate.of(2019, 6, 28), null)
+ .build());
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+ }
+
+ @Test
+ public void testSqlLogicalTypeTimeFields() {
+ Schema dateTimeFieldSchema =
+ Schema.builder()
+ .addField("timeTypeField", FieldType.logicalType(SqlTypes.TIME))
+ .addNullableField("nullableTimeTypeField", FieldType.logicalType(SqlTypes.TIME))
+ .build();
+
+ Row timeRow =
+ Row.withSchema(dateTimeFieldSchema).addValues(LocalTime.of(1, 0, 0), null).build();
+
+ PCollection<Row> outputRow =
+ pipeline
+ .apply(Create.of(timeRow))
+ .setRowSchema(dateTimeFieldSchema)
+ .apply(
+ SqlTransform.query(
+ "select timeTypeField + interval '1' hour as time_with_hour_added, "
+ + " nullableTimeTypeField + interval '1' hour as hour_added_with_null, "
+ + " timeTypeField - INTERVAL '60' SECOND as time_with_seconds_added, "
+ + " nullableTimeTypeField - INTERVAL '60' SECOND as seconds_added_with_null "
+ + " from PCOLLECTION"));
+
+ Schema outputRowSchema =
+ Schema.builder()
+ .addField("time_with_hour_added", FieldType.logicalType(SqlTypes.TIME))
+ .addNullableField("hour_added_with_null", FieldType.logicalType(SqlTypes.TIME))
+ .addField("time_with_seconds_added", FieldType.logicalType(SqlTypes.TIME))
+ .addNullableField("seconds_added_with_null", FieldType.logicalType(SqlTypes.TIME))
+ .build();
+
+ PAssert.that(outputRow)
+ .containsInAnyOrder(
+ Row.withSchema(outputRowSchema)
+ .addValues(LocalTime.of(2, 0, 0), null, LocalTime.of(0, 59, 0), null)
+ .build());
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+ }
+
+ @Test
+ public void testSqlLogicalTypeDatetimeFields() {
+ Schema dateTimeFieldSchema =
+ Schema.builder()
+ .addField("dateTimeField", FieldType.logicalType(SqlTypes.DATETIME))
+ .addNullableField("nullableDateTimeField", FieldType.logicalType(SqlTypes.DATETIME))
+ .build();
+
Row dateTimeRow =
Row.withSchema(dateTimeFieldSchema)
- .addValues(current, null, LocalTime.of(1, 0, 0), null, LocalDate.of(2019, 6, 27), null)
+ .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0), null)
.build();
PCollection<Row> outputRow =
@@ -396,14 +510,14 @@
+ " EXTRACT(YEAR from nullableDateTimeField) as year_with_null, "
+ " EXTRACT(MONTH from dateTimeField) as mm, "
+ " EXTRACT(MONTH from nullableDateTimeField) as month_with_null, "
- + " timeTypeField + interval '1' hour as time_with_hour_added, "
- + " nullableTimeTypeField + interval '1' hour as hour_added_with_null, "
- + " timeTypeField - INTERVAL '60' SECOND as time_with_seconds_added, "
- + " nullableTimeTypeField - INTERVAL '60' SECOND as seconds_added_with_null, "
- + " EXTRACT(DAY from dateTypeField) as dd, "
- + " EXTRACT(DAY from nullableDateTypeField) as day_with_null, "
- + " dateTypeField + interval '1' day as date_with_day_added, "
- + " nullableDateTypeField + interval '1' day as day_added_with_null "
+ + " dateTimeField + interval '1' hour as time_with_hour_added, "
+ + " nullableDateTimeField + interval '1' hour as hour_added_with_null, "
+ + " dateTimeField - INTERVAL '60' SECOND as time_with_seconds_added, "
+ + " nullableDateTimeField - INTERVAL '60' SECOND as seconds_added_with_null, "
+ + " EXTRACT(DAY from dateTimeField) as dd, "
+ + " EXTRACT(DAY from nullableDateTimeField) as day_with_null, "
+ + " dateTimeField + interval '1' day as date_with_day_added, "
+ + " nullableDateTimeField + interval '1' day as day_added_with_null "
+ " from PCOLLECTION"));
Schema outputRowSchema =
@@ -412,31 +526,31 @@
.addNullableField("year_with_null", FieldType.INT64)
.addField("mm", FieldType.INT64)
.addNullableField("month_with_null", FieldType.INT64)
- .addField("time_with_hour_added", FieldType.logicalType(SqlTypes.TIME))
- .addNullableField("hour_added_with_null", FieldType.logicalType(SqlTypes.TIME))
- .addField("time_with_seconds_added", FieldType.logicalType(SqlTypes.TIME))
- .addNullableField("seconds_added_with_null", FieldType.logicalType(SqlTypes.TIME))
+ .addField("time_with_hour_added", FieldType.logicalType(SqlTypes.DATETIME))
+ .addNullableField("hour_added_with_null", FieldType.logicalType(SqlTypes.DATETIME))
+ .addField("time_with_seconds_added", FieldType.logicalType(SqlTypes.DATETIME))
+ .addNullableField("seconds_added_with_null", FieldType.logicalType(SqlTypes.DATETIME))
.addField("dd", FieldType.INT64)
.addNullableField("day_with_null", FieldType.INT64)
- .addField("date_with_day_added", FieldType.logicalType(SqlTypes.DATE))
- .addNullableField("day_added_with_null", FieldType.logicalType(SqlTypes.DATE))
+ .addField("date_with_day_added", FieldType.logicalType(SqlTypes.DATETIME))
+ .addNullableField("day_added_with_null", FieldType.logicalType(SqlTypes.DATETIME))
.build();
PAssert.that(outputRow)
.containsInAnyOrder(
Row.withSchema(outputRowSchema)
.addValues(
- 2019L,
+ 2008L,
null,
- 06L,
+ 12L,
null,
- LocalTime.of(2, 0, 0),
+ LocalDateTime.of(2008, 12, 25, 16, 30, 0),
null,
- LocalTime.of(0, 59, 0),
+ LocalDateTime.of(2008, 12, 25, 15, 29, 0),
null,
- 27L,
+ 25L,
null,
- LocalDate.of(2019, 6, 28),
+ LocalDateTime.of(2008, 12, 26, 15, 30, 0),
null)
.build());
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index 75e8a08..c2afc5d 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -23,6 +23,7 @@
import com.google.auto.service.AutoService;
import java.sql.Timestamp;
+import java.util.Arrays;
import java.util.Map;
import java.util.stream.IntStream;
import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable;
@@ -30,6 +31,7 @@
import org.apache.beam.sdk.extensions.sql.meta.provider.UdfUdafProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -101,6 +103,29 @@
pipeline.run().waitUntilFinish();
}
+ @Test
+ public void testListUdf() throws Exception {
+ Schema resultType1 = Schema.builder().addArrayField("array_field", FieldType.INT64).build();
+ Row row1 = Row.withSchema(resultType1).addValue(Arrays.asList(1L)).build();
+ String sql1 = "SELECT test_array(1)";
+ PCollection<Row> result1 =
+ boundedInput1.apply(
+ "testArrayUdf",
+ SqlTransform.query(sql1).registerUdf("test_array", TestReturnTypeList.class));
+ PAssert.that(result1).containsInAnyOrder(row1);
+
+ Schema resultType2 = Schema.builder().addInt32Field("int_field").build();
+ Row row2 = Row.withSchema(resultType2).addValue(3).build();
+ String sql2 = "select array_length(ARRAY[1, 2, 3])";
+ PCollection<Row> result2 =
+ boundedInput1.apply(
+ "testArrayUdf2",
+ SqlTransform.query(sql2).registerUdf("array_length", TestListLength.class));
+ PAssert.that(result2).containsInAnyOrder(row2);
+
+ pipeline.run().waitUntilFinish();
+ }
+
/** Test that an indirect subclass of a {@link CombineFn} works as a UDAF. BEAM-3777 */
@Test
public void testUdafMultiLevelDescendent() {
@@ -347,6 +372,20 @@
}
}
+ /** UDF that wraps its argument in a single-element list, to test array return types. */
+ public static final class TestReturnTypeList implements BeamSqlUdf {
+ public static java.util.List<Long> eval(Long i) {
+ return Arrays.asList(i);
+ }
+ }
+
+ /** UDF that returns the size of its list argument, to test array argument types. */
+ public static final class TestListLength implements BeamSqlUdf {
+ public static Integer eval(java.util.List<Long> i) {
+ return i.size();
+ }
+ }
+
/**
* UDF to test support for {@link
* org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.TableMacro}.
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java
index f0854bc..d4819fc 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java
@@ -19,6 +19,7 @@
import java.math.BigDecimal;
import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.time.LocalTime;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
@@ -51,6 +52,7 @@
.add("col_string_varchar", SqlTypeName.VARCHAR)
.add("col_time", SqlTypeName.TIME)
.add("col_date", SqlTypeName.DATE)
+ .add("col_timestamp_with_local_time_zone", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
.add("col_timestamp", SqlTypeName.TIMESTAMP)
.add("col_boolean", SqlTypeName.BOOLEAN)
.build();
@@ -70,6 +72,7 @@
"hello",
LocalTime.now(),
LocalDate.now(),
+ LocalDateTime.now(),
DateTime.now().toInstant(),
true)
.build();
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java
index cc25380..5186099 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java
@@ -20,11 +20,13 @@
import com.google.zetasql.CivilTimeEncoder;
import com.google.zetasql.Value;
import io.grpc.Status;
+import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.List;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.TimeUnit;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.DateString;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.TimeString;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.TimestampString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
@@ -161,10 +163,17 @@
}
}
- public static TimeString convertTimeValueToTimeString(Value value) {
- LocalTime localTime = CivilTimeEncoder.decodePacked64TimeNanosAsJavaTime(value.getTimeValue());
- return new TimeString(localTime.getHour(), localTime.getMinute(), localTime.getSecond())
- .withNanos(localTime.getNano());
+ /**
+  * Converts a ZetaSQL DATETIME {@link Value} into a {@link TimestampString}.
+  *
+  * <p>The packed datetime is decoded to a {@link LocalDateTime}; its date and time-of-day fields
+  * seed the string and the nanosecond fraction is appended afterwards.
+  *
+  * @param value the value whose {@code getDatetimeValue()} is decoded
+  * @return the equivalent timestamp string, including nanoseconds
+  */
+ public static TimestampString convertDateTimeValueToTimeStampString(Value value) {
+   LocalDateTime civil =
+       CivilTimeEncoder.decodePacked96DatetimeNanosAsJavaTime(value.getDatetimeValue());
+   TimestampString toTheSecond =
+       new TimestampString(
+           civil.getYear(),
+           civil.getMonthValue(),
+           civil.getDayOfMonth(),
+           civil.getHour(),
+           civil.getMinute(),
+           civil.getSecond());
+   return toTheSecond.withNanos(civil.getNano());
}
// dates are represented as an int32 value, indicating the offset
@@ -174,6 +183,12 @@
return DateString.fromDaysSinceEpoch(value.getDateValue());
}
+ /**
+  * Converts a ZetaSQL TIME {@link Value} into a {@link TimeString}.
+  *
+  * @param value the value whose {@code getTimeValue()} is decoded
+  * @return the equivalent time string, including nanoseconds
+  */
+ public static TimeString convertTimeValueToTimeString(Value value) {
+   LocalTime decoded = CivilTimeEncoder.decodePacked64TimeNanosAsJavaTime(value.getTimeValue());
+   TimeString toTheSecond =
+       new TimeString(decoded.getHour(), decoded.getMinute(), decoded.getSecond());
+   return toTheSecond.withNanos(decoded.getNano());
+ }
+
public static Value parseDateToValue(String dateString) {
DateTime dateTime = parseDate(dateString);
return Value.createDateValue((int) (dateTime.getMillis() / MILLIS_PER_DAY));
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
index 6ccfb26..eb8d51f 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
@@ -197,23 +197,23 @@
// Time functions
FunctionSignatureId.FN_CURRENT_DATE, // current_date
- // FunctionSignatureId.FN_CURRENT_DATETIME, // current_datetime
+ FunctionSignatureId.FN_CURRENT_DATETIME, // current_datetime
FunctionSignatureId.FN_CURRENT_TIME, // current_time
FunctionSignatureId.FN_CURRENT_TIMESTAMP, // current_timestamp
FunctionSignatureId.FN_DATE_ADD_DATE, // date_add
- // FunctionSignatureId.FN_DATETIME_ADD, // datetime_add
+ FunctionSignatureId.FN_DATETIME_ADD, // datetime_add
FunctionSignatureId.FN_TIME_ADD, // time_add
FunctionSignatureId.FN_TIMESTAMP_ADD, // timestamp_add
FunctionSignatureId.FN_DATE_DIFF_DATE, // date_diff
- // FunctionSignatureId.FN_DATETIME_DIFF, // datetime_diff
+ FunctionSignatureId.FN_DATETIME_DIFF, // datetime_diff
FunctionSignatureId.FN_TIME_DIFF, // time_diff
FunctionSignatureId.FN_TIMESTAMP_DIFF, // timestamp_diff
FunctionSignatureId.FN_DATE_SUB_DATE, // date_sub
- // FunctionSignatureId.FN_DATETIME_SUB, // datetime_sub
+ FunctionSignatureId.FN_DATETIME_SUB, // datetime_sub
FunctionSignatureId.FN_TIME_SUB, // time_sub
FunctionSignatureId.FN_TIMESTAMP_SUB, // timestamp_sub
FunctionSignatureId.FN_DATE_TRUNC_DATE, // date_trunc
- // FunctionSignatureId.FN_DATETIME_TRUNC, // datetime_trunc
+ FunctionSignatureId.FN_DATETIME_TRUNC, // datetime_trunc
FunctionSignatureId.FN_TIME_TRUNC, // time_trunc
FunctionSignatureId.FN_TIMESTAMP_TRUNC, // timestamp_trunc
FunctionSignatureId.FN_DATE_FROM_UNIX_DATE, // date_from_unix_date
@@ -234,19 +234,18 @@
FunctionSignatureId.FN_UNIX_MILLIS_FROM_TIMESTAMP,
// FunctionSignatureId.FN_UNIX_MICROS_FROM_TIMESTAMP,
FunctionSignatureId.FN_DATE_FROM_TIMESTAMP, // date
- // FunctionSignatureId.FN_DATE_FROM_DATETIME, // date
+ FunctionSignatureId.FN_DATE_FROM_DATETIME, // date
FunctionSignatureId.FN_DATE_FROM_YEAR_MONTH_DAY, // date
FunctionSignatureId.FN_TIMESTAMP_FROM_STRING, // timestamp
FunctionSignatureId.FN_TIMESTAMP_FROM_DATE, // timestamp
- // FunctionSignatureId.FN_TIMESTAMP_FROM_DATETIME, // timestamp
+ FunctionSignatureId.FN_TIMESTAMP_FROM_DATETIME, // timestamp
FunctionSignatureId.FN_TIME_FROM_HOUR_MINUTE_SECOND, // time
FunctionSignatureId.FN_TIME_FROM_TIMESTAMP, // time
- // FunctionSignatureId.FN_TIME_FROM_DATETIME, // time
- // FunctionSignatureId.FN_DATETIME_FROM_DATE_AND_TIME, // datetime
- // FunctionSignatureId.FN_DATETIME_FROM_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND, // datetime
- // FunctionSignatureId.FN_DATETIME_FROM_TIMESTAMP, // datetime
- // FunctionSignatureId.FN_DATETIME_FROM_DATE, // datetime
-
+ FunctionSignatureId.FN_TIME_FROM_DATETIME, // time
+ FunctionSignatureId.FN_DATETIME_FROM_DATE_AND_TIME, // datetime
+ FunctionSignatureId.FN_DATETIME_FROM_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND, // datetime
+ FunctionSignatureId.FN_DATETIME_FROM_TIMESTAMP, // datetime
+ FunctionSignatureId.FN_DATETIME_FROM_DATE, // datetime
FunctionSignatureId.FN_STRING_FROM_TIMESTAMP, // string
// Signatures for extracting date parts, taking a date/timestamp
@@ -258,23 +257,24 @@
// Signatures specific to extracting the DATE date part from a DATETIME or a
// TIMESTAMP.
- // FunctionSignatureId.FN_EXTRACT_DATE_FROM_DATETIME, // $extract_date
+ FunctionSignatureId.FN_EXTRACT_DATE_FROM_DATETIME, // $extract_date
FunctionSignatureId.FN_EXTRACT_DATE_FROM_TIMESTAMP, // $extract_date
// Signatures specific to extracting the TIME date part from a DATETIME or a
// TIMESTAMP.
- // FunctionSignatureId.FN_EXTRACT_TIME_FROM_DATETIME, // $extract_time
+ FunctionSignatureId.FN_EXTRACT_TIME_FROM_DATETIME, // $extract_time
FunctionSignatureId.FN_EXTRACT_TIME_FROM_TIMESTAMP, // $extract_time
// Signature specific to extracting the DATETIME date part from a TIMESTAMP.
- // FunctionSignatureId.FN_EXTRACT_DATETIME_FROM_TIMESTAMP, // $extract_datetime
+ FunctionSignatureId.FN_EXTRACT_DATETIME_FROM_TIMESTAMP, // $extract_datetime
+ // Signatures for formatting and parsing
FunctionSignatureId.FN_FORMAT_DATE, // format_date
- // FunctionSignatureId.FN_FORMAT_DATETIME, // format_datetime
+ FunctionSignatureId.FN_FORMAT_DATETIME, // format_datetime
FunctionSignatureId.FN_FORMAT_TIME, // format_time
FunctionSignatureId.FN_FORMAT_TIMESTAMP, // format_timestamp
FunctionSignatureId.FN_PARSE_DATE, // parse_date
- // FunctionSignatureId.FN_PARSE_DATETIME, // parse_datetime
+ FunctionSignatureId.FN_PARSE_DATETIME, // parse_datetime
FunctionSignatureId.FN_PARSE_TIME, // parse_time
FunctionSignatureId.FN_PARSE_TIMESTAMP, // parse_timestamp
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
index 073aa41..dbab34a 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
@@ -28,6 +28,7 @@
import com.google.zetasql.ZetaSQLType.TypeKind;
import java.math.BigDecimal;
import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
@@ -36,6 +37,7 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.logicaltypes.DateTime;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath;
@@ -45,7 +47,6 @@
* Utility methods for ZetaSQL <=> Beam translation.
*
* <p>Unsupported ZetaSQL types: INT32, UINT32, UINT64, FLOAT, ENUM, PROTO, GEOGRAPHY
- * TODO[BEAM-10238]: support ZetaSQL types: TIME, DATETIME, NUMERIC
*/
@Internal
public final class ZetaSqlBeamTranslationUtils {
@@ -106,6 +107,9 @@
} else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
// Time type
return TypeFactory.createSimpleType(TypeKind.TYPE_TIME);
+ } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+ // DateTime type
+ return TypeFactory.createSimpleType(TypeKind.TYPE_DATETIME);
} else {
throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
}
@@ -184,6 +188,20 @@
} else { // input type
return Value.createTimeValue(CivilTimeEncoder.encodePacked64TimeNanos((LocalTime) object));
}
+ } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+ // DateTime value
+ LocalDateTime datetime;
+ if (object instanceof Row) { // base type
+ datetime =
+ LocalDateTime.of(
+ LocalDate.ofEpochDay(((Row) object).getInt64(DateTime.DATE_FIELD_NAME)),
+ LocalTime.ofNanoOfDay(((Row) object).getInt64(DateTime.TIME_FIELD_NAME)));
+ } else { // input type
+ datetime = (LocalDateTime) object;
+ }
+ // TODO[BEAM-10611]: Create ZetaSQL Value.createDatetimeValue(LocalDateTime) function
+ return Value.createDatetimeValue(
+ CivilTimeEncoder.encodePacked64DatetimeSeconds(datetime), datetime.getNano());
} else {
throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
}
@@ -208,6 +226,8 @@
return FieldType.logicalType(SqlTypes.DATE).withNullable(true);
case TYPE_TIME:
return FieldType.logicalType(SqlTypes.TIME).withNullable(true);
+ case TYPE_DATETIME:
+ return FieldType.logicalType(SqlTypes.DATETIME).withNullable(true);
case TYPE_TIMESTAMP:
return FieldType.DATETIME.withNullable(true);
case TYPE_ARRAY:
@@ -314,6 +334,9 @@
} else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
// Time value
return CivilTimeEncoder.decodePacked64TimeNanosAsJavaTime(value.getTimeValue());
+ } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+ // DateTime value
+ return CivilTimeEncoder.decodePacked96DatetimeNanosAsJavaTime(value.getDatetimeValue());
} else {
throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
}
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
index d8394ab..81bc142 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
@@ -20,6 +20,7 @@
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BOOL;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BYTES;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATE;
+import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATETIME;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DOUBLE;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_INT64;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_NUMERIC;
@@ -46,7 +47,6 @@
* Utility methods for ZetaSQL <=> Calcite translation.
*
* <p>Unsupported ZetaSQL types: INT32, UINT32, UINT64, FLOAT, ENUM, PROTO, GEOGRAPHY
- * TODO[BEAM-10238]: support ZetaSQL types: TIME, DATETIME, NUMERIC
*/
@Internal
public final class ZetaSqlCalciteTranslationUtils {
@@ -72,6 +72,8 @@
return TypeFactory.createSimpleType(TYPE_DATE);
case TIME:
return TypeFactory.createSimpleType(TYPE_TIME);
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return TypeFactory.createSimpleType(TYPE_DATETIME);
case TIMESTAMP:
return TypeFactory.createSimpleType(TYPE_TIMESTAMP);
case ARRAY:
@@ -107,6 +109,8 @@
return SqlTypeName.DATE;
case TYPE_TIME:
return SqlTypeName.TIME;
+ case TYPE_DATETIME:
+ return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
case TYPE_TIMESTAMP:
// TODO: handle timestamp with time zone.
return SqlTypeName.TIMESTAMP;
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
index 19f18cd..fd5651f 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
@@ -24,6 +24,7 @@
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_INT64;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_STRING;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_TIMESTAMP;
+import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.convertDateTimeValueToTimeStampString;
import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.convertDateValueToDateString;
import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.convertTimeValueToTimeString;
import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.safeMicrosToMillis;
@@ -545,7 +546,7 @@
case TYPE_TIMESTAMP:
case TYPE_DATE:
case TYPE_TIME:
- // case TYPE_DATETIME:
+ case TYPE_DATETIME:
case TYPE_BYTES:
case TYPE_ARRAY:
case TYPE_STRUCT:
@@ -709,7 +710,7 @@
case TYPE_TIMESTAMP:
case TYPE_DATE:
case TYPE_TIME:
- // case TYPE_DATETIME:
+ case TYPE_DATETIME:
case TYPE_BYTES:
ret = convertSimpleValueToRexNode(type.getKind(), value);
break;
@@ -792,9 +793,7 @@
rexBuilder()
.makeCall(
SqlOperators.createZetaSqlFunction(wrapperFun, returnType.getSqlTypeName()),
- ImmutableList.of(
- rexBuilder()
- .makeApproxLiteral(new BigDecimal(Math.random()), returnType)));
+ rexBuilder().makeApproxLiteral(new BigDecimal(Math.random()), returnType));
;
} else {
ret =
@@ -823,12 +822,11 @@
SqlOperators.createZetaSqlFunction(
BeamBigQuerySqlDialect.NUMERIC_LITERAL_FUNCTION,
ZetaSqlCalciteTranslationUtils.toCalciteTypeName(kind)),
- ImmutableList.of(
- rexBuilder()
- .makeExactLiteral(
- value.getNumericValue(),
- ZetaSqlCalciteTranslationUtils.toSimpleRelDataType(
- kind, rexBuilder()))));
+ rexBuilder()
+ .makeExactLiteral(
+ value.getNumericValue(),
+ ZetaSqlCalciteTranslationUtils.toSimpleRelDataType(
+ kind, rexBuilder())));
break;
case TYPE_TIMESTAMP:
ret =
@@ -850,6 +848,15 @@
// TODO: Doing micro to millis truncation, need to throw exception.
ret = rexBuilder().makeLiteral(convertTimeValueToTimeString(value), timeType, false);
break;
+ case TYPE_DATETIME:
+ ret =
+ rexBuilder()
+ .makeTimestampWithLocalTimeZoneLiteral(
+ convertDateTimeValueToTimeStampString(value),
+ typeFactory()
+ .getTypeSystem()
+ .getMaxPrecision(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE));
+ break;
case TYPE_BYTES:
ret = rexBuilder().makeBinaryLiteral(new ByteString(value.getBytesValue().toByteArray()));
break;
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/TableScanConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/TableScanConverter.java
index 25f40ed..9137b94 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/TableScanConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/TableScanConverter.java
@@ -17,17 +17,13 @@
*/
package org.apache.beam.sdk.extensions.sql.zetasql.translation;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATETIME;
import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkNotNull;
-import com.google.zetasql.ZetaSQLType.TypeKind;
-import com.google.zetasql.resolvedast.ResolvedColumn;
import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedTableScan;
import java.util.List;
import java.util.Properties;
import org.apache.beam.sdk.extensions.sql.zetasql.TableResolution;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.jdbc.CalciteSchema;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
@@ -44,16 +40,12 @@
/** Converts table scan. */
class TableScanConverter extends RelConverter<ResolvedTableScan> {
- private static final ImmutableSet<TypeKind> UNSUPPORTED_DATA_TYPES =
- ImmutableSet.of(TYPE_DATETIME);
-
TableScanConverter(ConversionContext context) {
super(context);
}
@Override
public RelNode convert(ResolvedTableScan zetaNode, List<RelNode> inputs) {
- checkTableScanSchema(zetaNode.getColumnList());
List<String> tablePath = getTablePath(zetaNode.getTable());
@@ -115,15 +107,4 @@
}
};
}
-
- private void checkTableScanSchema(List<ResolvedColumn> columnList) {
- if (columnList != null) {
- for (ResolvedColumn resolvedColumn : columnList) {
- if (UNSUPPORTED_DATA_TYPES.contains(resolvedColumn.getType().getKind())) {
- throw new UnsupportedOperationException(
- "Does not support " + UNSUPPORTED_DATA_TYPES + " types in source tables");
- }
- }
- }
- }
}
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java
index a761d9f..2edb4d0 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java
@@ -21,6 +21,7 @@
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
@@ -225,47 +226,59 @@
public static final TestBoundedTable TABLE_EMPTY =
TestBoundedTable.of(Schema.builder().addInt64Field("ColId").addStringField("Value").build());
- private static final Schema TABLE_WTH_MAP_SCHEMA =
+ private static final Schema TABLE_WITH_MAP_SCHEMA =
Schema.builder()
.addMapField("map_field", FieldType.STRING, FieldType.STRING)
.addRowField("row_field", structSchema)
.build();
public static final TestBoundedTable TABLE_WITH_MAP =
- TestBoundedTable.of(TABLE_WTH_MAP_SCHEMA)
+ TestBoundedTable.of(TABLE_WITH_MAP_SCHEMA)
.addRows(
ImmutableMap.of("MAP_KEY_1", "MAP_VALUE_1"),
Row.withSchema(structSchema).addValues(1L, "data1").build());
- private static final Schema TABLE_WTH_DATE_SCHEMA =
+ private static final Schema TABLE_WITH_DATE_SCHEMA =
Schema.builder()
.addLogicalTypeField("date_field", SqlTypes.DATE)
.addStringField("str_field")
.build();
public static final TestBoundedTable TABLE_WITH_DATE =
- TestBoundedTable.of(TABLE_WTH_DATE_SCHEMA)
+ TestBoundedTable.of(TABLE_WITH_DATE_SCHEMA)
.addRows(LocalDate.of(2008, 12, 25), "s")
.addRows(LocalDate.of(2020, 4, 7), "s");
- private static final Schema TABLE_WTH_TIME_SCHEMA =
+ private static final Schema TABLE_WITH_TIME_SCHEMA =
Schema.builder()
.addLogicalTypeField("time_field", SqlTypes.TIME)
.addStringField("str_field")
.build();
public static final TestBoundedTable TABLE_WITH_TIME =
- TestBoundedTable.of(TABLE_WTH_TIME_SCHEMA)
+ TestBoundedTable.of(TABLE_WITH_TIME_SCHEMA)
.addRows(LocalTime.of(15, 30, 0), "s")
.addRows(LocalTime.of(23, 35, 59), "s");
- private static final Schema TABLE_WTH_NUMERIC_SCHEMA =
+ private static final Schema TABLE_WITH_NUMERIC_SCHEMA =
Schema.builder().addDecimalField("numeric_field").addStringField("str_field").build();
+
public static final TestBoundedTable TABLE_WITH_NUMERIC =
- TestBoundedTable.of(TABLE_WTH_NUMERIC_SCHEMA)
+ TestBoundedTable.of(TABLE_WITH_NUMERIC_SCHEMA)
.addRows(ZetaSqlTypesUtils.bigDecimalAsNumeric("123.4567"), "str1")
.addRows(ZetaSqlTypesUtils.bigDecimalAsNumeric("765.4321"), "str2")
.addRows(ZetaSqlTypesUtils.bigDecimalAsNumeric("-555.5555"), "str3");
+ /** Schema with a DATETIME logical-type column followed by a string column. */
+ private static final Schema TABLE_WITH_DATETIME_SCHEMA =
+     Schema.builder()
+         .addLogicalTypeField("datetime_field", SqlTypes.DATETIME)
+         .addStringField("str_field")
+         .build();
+
+ /** Two-row table whose DATETIME values carry a six-digit (microsecond) fraction. */
+ public static final TestBoundedTable TABLE_WITH_DATETIME =
+     TestBoundedTable.of(TABLE_WITH_DATETIME_SCHEMA)
+         .addRows(LocalDateTime.of(2008, 12, 25, 15, 30, 0, 123456000), "s")
+         .addRows(LocalDateTime.of(2012, 10, 6, 11, 45, 0, 987654000), "s");
+
private static byte[] stringToBytes(String s) {
return s.getBytes(StandardCharsets.UTF_8);
}
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java
index 0510590..7b450fb 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java
@@ -28,6 +28,7 @@
import com.google.zetasql.Value;
import com.google.zetasql.ZetaSQLType.TypeKind;
import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import org.apache.beam.sdk.schemas.Schema;
@@ -54,12 +55,12 @@
.addField("f_string", FieldType.STRING)
.addField("f_bytes", FieldType.BYTES)
.addLogicalTypeField("f_date", SqlTypes.DATE)
- // .addLogicalTypeField("f_datetime", SqlTypes.DATETIME)
+ .addLogicalTypeField("f_datetime", SqlTypes.DATETIME)
.addLogicalTypeField("f_time", SqlTypes.TIME)
.addField("f_timestamp", FieldType.DATETIME)
.addArrayField("f_array", FieldType.DOUBLE)
.addRowField("f_struct", TEST_INNER_SCHEMA)
- // .addLogicalTypeField("f_numeric", SqlTypes.NUMERIC)
+ .addField("f_numeric", FieldType.DECIMAL)
.addNullableField("f_null", FieldType.INT64)
.build();
@@ -83,10 +84,12 @@
new StructField("f_string", TypeFactory.createSimpleType(TypeKind.TYPE_STRING)),
new StructField("f_bytes", TypeFactory.createSimpleType(TypeKind.TYPE_BYTES)),
new StructField("f_date", TypeFactory.createSimpleType(TypeKind.TYPE_DATE)),
+ new StructField("f_datetime", TypeFactory.createSimpleType(TypeKind.TYPE_DATETIME)),
new StructField("f_time", TypeFactory.createSimpleType(TypeKind.TYPE_TIME)),
new StructField("f_timestamp", TypeFactory.createSimpleType(TypeKind.TYPE_TIMESTAMP)),
new StructField("f_array", TEST_INNER_ARRAY_TYPE),
new StructField("f_struct", TEST_INNER_STRUCT_TYPE),
+ new StructField("f_numeric", TypeFactory.createSimpleType(TypeKind.TYPE_NUMERIC)),
new StructField("f_null", TypeFactory.createSimpleType(TypeKind.TYPE_INT64))));
private static final Row TEST_ROW =
@@ -97,10 +100,12 @@
.addValue("Hello")
.addValue(new byte[] {0x11, 0x22})
.addValue(LocalDate.of(2020, 6, 4))
+ .addValue(LocalDateTime.of(2008, 12, 25, 15, 30, 0))
.addValue(LocalTime.of(15, 30, 45))
.addValue(Instant.ofEpochMilli(12345678L))
.addArray(3.0, 6.5)
.addValue(Row.withSchema(TEST_INNER_SCHEMA).addValues(0L, "world").build())
+ .addValue(ZetaSqlTypesUtils.bigDecimalAsNumeric("12346"))
.addValue(null)
.build();
@@ -114,6 +119,10 @@
Value.createStringValue("Hello"),
Value.createBytesValue(ByteString.copyFrom(new byte[] {0x11, 0x22})),
Value.createDateValue((int) LocalDate.of(2020, 6, 4).toEpochDay()),
+ Value.createDatetimeValue(
+ CivilTimeEncoder.encodePacked64DatetimeSeconds(
+ LocalDateTime.of(2008, 12, 25, 15, 30, 0)),
+ LocalDateTime.of(2008, 12, 25, 15, 30, 0).getNano()),
Value.createTimeValue(
CivilTimeEncoder.encodePacked64TimeNanos(LocalTime.of(15, 30, 45))),
Value.createTimestampValueFromUnixMicros(12345678000L),
@@ -123,6 +132,7 @@
Value.createStructValue(
TEST_INNER_STRUCT_TYPE,
Arrays.asList(Value.createInt64Value(0L), Value.createStringValue("world"))),
+ Value.createNumericValue(ZetaSqlTypesUtils.bigDecimalAsNumeric("12346")),
Value.createNullValue(TypeFactory.createSimpleType(TypeKind.TYPE_INT64))));
@Test
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
index e9c51c7..d148012 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
@@ -2942,7 +2942,7 @@
}
@Test
- @Ignore("BEAM-9515")
+ @Ignore("[BEAM-9515] ArrayScanToUncollectConverter Unnest does not support sub-queries")
public void testUNNESTExpression() {
String sql = "SELECT * FROM UNNEST(ARRAY(SELECT Value FROM KeyValue));";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java
index 6d9ba67..483a9c1 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java
@@ -61,6 +61,7 @@
testBoundedTableMap.put("table_with_date", TestInput.TABLE_WITH_DATE);
testBoundedTableMap.put("table_with_time", TestInput.TABLE_WITH_TIME);
testBoundedTableMap.put("table_with_numeric", TestInput.TABLE_WITH_NUMERIC);
+ testBoundedTableMap.put("table_with_datetime", TestInput.TABLE_WITH_DATETIME);
testBoundedTableMap.put(
"table_with_struct_ts_string", TestInput.TABLE_WITH_STRUCT_TIMESTAMP_STRING);
testBoundedTableMap.put("streaming_sql_test_table_a", TestInput.STREAMING_SQL_TABLE_A);
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java
index 6789d63..109ca1e 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java
@@ -23,9 +23,11 @@
import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseTimestampWithTimeZone;
import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseTimestampWithUTCTimeZone;
+import com.google.zetasql.CivilTimeEncoder;
import com.google.zetasql.Value;
import com.google.zetasql.ZetaSQLType.TypeKind;
import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.time.LocalTime;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
@@ -40,7 +42,6 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -148,7 +149,7 @@
+ " EXTRACT(ISOYEAR FROM date) AS isoyear,\n"
+ " EXTRACT(YEAR FROM date) AS year,\n"
+ " EXTRACT(ISOWEEK FROM date) AS isoweek,\n"
- // TODO[BEAM-9178]: Add tests for DATE_TRUNC and EXTRACT with "week with weekday" date
+ // TODO[BEAM-10606]: Add tests for DATE_TRUNC and EXTRACT with "week with weekday" date
// parts once they are supported
// + " EXTRACT(WEEK FROM date) AS week,\n"
+ " EXTRACT(MONTH FROM date) AS month,\n"
@@ -219,6 +220,22 @@
}
@Test
+ public void testDateFromDateTime() {
+   // DATE(<datetime>) is expected to yield only the calendar date of the DATETIME literal.
+   ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
+   BeamRelNode relNode =
+       planner.convertToBeamRel("SELECT DATE(DATETIME '2008-12-25 15:30:00.123456')");
+   PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+   Schema dateSchema = Schema.builder().addLogicalTypeField("f_date", SqlTypes.DATE).build();
+   Row expected = Row.withSchema(dateSchema).addValues(LocalDate.of(2008, 12, 25)).build();
+   PAssert.that(output).containsInAnyOrder(expected);
+
+   pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
public void testDateAdd() {
String sql =
"SELECT "
@@ -580,6 +597,22 @@
}
@Test
+ public void testTimeFromDateTime() {
+   // TIME(<datetime>) is expected to yield the time of day, fractional seconds included.
+   ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
+   BeamRelNode relNode =
+       planner.convertToBeamRel("SELECT TIME(DATETIME '2008-12-25 15:30:00.123456')");
+   PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+   Schema timeSchema = Schema.builder().addLogicalTypeField("f_time", SqlTypes.TIME).build();
+   Row expected =
+       Row.withSchema(timeSchema).addValues(LocalTime.of(15, 30, 0, 123456000)).build();
+   PAssert.that(output).containsInAnyOrder(expected);
+
+   pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
public void testTimeAdd() {
String sql =
"SELECT "
@@ -753,13 +786,420 @@
/////////////////////////////////////////////////////////////////////////////
@Test
- @Ignore("Does not support Datetime literal.")
- public void testDatetimeLiteral() {
- String sql = "SELECT DATETIME '2018-01-01 05:30:00.334'";
+ public void testDateTimeLiteral() {
+ String sql = "SELECT DATETIME '2008-12-25 15:30:00.123456'";
+
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- thrown.expect(RuntimeException.class);
- thrown.expectMessage("Unsupported ResolvedLiteral type: DATETIME");
- zetaSQLQueryPlanner.convertToBeamRel(sql);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(
+ Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+ .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000))
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testDateTimeColumn() {
+ String sql = "SELECT FORMAT_DATETIME('%D %T %E6S', datetime_field) FROM table_with_datetime";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(Schema.builder().addStringField("f_datetime_str").build())
+ .addValues("12/25/08 15:30:00 00.123456")
+ .build(),
+ Row.withSchema(Schema.builder().addStringField("f_datetime_str").build())
+ .addValues("10/06/12 11:45:00 00.987654")
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testGroupByDateTime() {
+ String sql = "SELECT datetime_field, COUNT(*) FROM table_with_datetime GROUP BY datetime_field";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ final Schema schema =
+ Schema.builder()
+ .addLogicalTypeField("datetime_field", SqlTypes.DATETIME)
+ .addInt64Field("count")
+ .build();
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(schema)
+ .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000), 1L)
+ .build(),
+ Row.withSchema(schema)
+ .addValues(LocalDateTime.of(2012, 10, 6, 11, 45, 0).withNano(987654000), 1L)
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testAggregateOnDateTime() {
+ String sql = "SELECT MAX(datetime_field) FROM table_with_datetime GROUP BY str_field";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(
+ Schema.builder()
+ .addLogicalTypeField("datetime_field", SqlTypes.DATETIME)
+ .build())
+ .addValues(LocalDateTime.of(2012, 10, 6, 11, 45, 0).withNano(987654000))
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ // TODO[BEAM-9166]: Add a test for CURRENT_DATETIME function ("SELECT CURRENT_DATETIME()")
+
+ @Test
+ public void testExtractFromDateTime() {
+ String sql =
+ "SELECT "
+ + "EXTRACT(YEAR FROM DATETIME '2008-12-25 15:30:00') as year, "
+ + "EXTRACT(QUARTER FROM DATETIME '2008-12-25 15:30:00') as quarter, "
+ + "EXTRACT(MONTH FROM DATETIME '2008-12-25 15:30:00') as month, "
+ // TODO[BEAM-10606]: Add tests for DATETIME_TRUNC and EXTRACT with "week with weekday"
+ // date parts once they are supported
+ // + "EXTRACT(WEEK FROM DATETIME '2008-12-25 15:30:00') as week, "
+ + "EXTRACT(DAY FROM DATETIME '2008-12-25 15:30:00') as day, "
+ + "EXTRACT(DAYOFWEEK FROM DATETIME '2008-12-25 15:30:00') as dayofweek, "
+ + "EXTRACT(DAYOFYEAR FROM DATETIME '2008-12-25 15:30:00') as dayofyear, "
+ + "EXTRACT(HOUR FROM DATETIME '2008-12-25 15:30:00.123456') as hour, "
+ + "EXTRACT(MINUTE FROM DATETIME '2008-12-25 15:30:00.123456') as minute, "
+ + "EXTRACT(SECOND FROM DATETIME '2008-12-25 15:30:00.123456') as second, "
+ + "EXTRACT(MILLISECOND FROM DATETIME '2008-12-25 15:30:00.123456') as millisecond, "
+ + "EXTRACT(MICROSECOND FROM DATETIME '2008-12-25 15:30:00.123456') as microsecond, "
+ + "EXTRACT(DATE FROM DATETIME '2008-12-25 15:30:00.123456') as date, "
+ + "EXTRACT(TIME FROM DATETIME '2008-12-25 15:30:00.123456') as time ";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ final Schema schema =
+ Schema.builder()
+ .addInt64Field("year")
+ .addInt64Field("quarter")
+ .addInt64Field("month")
+ // .addInt64Field("week")
+ .addInt64Field("day")
+ .addInt64Field("dayofweek")
+ .addInt64Field("dayofyear")
+ .addInt64Field("hour")
+ .addInt64Field("minute")
+ .addInt64Field("second")
+ .addInt64Field("millisecond")
+ .addInt64Field("microsecond")
+ .addLogicalTypeField("date", SqlTypes.DATE)
+ .addLogicalTypeField("time", SqlTypes.TIME)
+ .build();
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(schema)
+ .addValues(
+ 2008L,
+ 4L,
+ 12L,
+ // 52L,
+ 25L,
+ 5L,
+ 360L,
+ 15L,
+ 30L,
+ 0L,
+ 123L,
+ 123456L,
+ LocalDate.of(2008, 12, 25),
+ LocalTime.of(15, 30, 0, 123456000))
+ .build());
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testDateTimeFromDateAndTime() {
+ String sql = "SELECT DATETIME(DATE '2008-12-25', TIME '15:30:00.123456')";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(
+ Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+ .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000))
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testDateTimeFromDate() {
+ String sql = "SELECT DATETIME(DATE '2008-12-25')";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(
+ Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+ .addValues(LocalDateTime.of(2008, 12, 25, 0, 0, 0))
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testDateTimeFromYearMonthDayHourMinuteSecond() {
+ String sql = "SELECT DATETIME(2008, 12, 25, 15, 30, 0)";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(
+ Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+ .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0))
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testDateTimeFromTimestamp() {
+ String sql = "SELECT DATETIME(TIMESTAMP '2008-12-25 15:30:00+08', 'America/Los_Angeles')";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(
+ Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+ .addValues(LocalDateTime.of(2008, 12, 24, 23, 30, 0))
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testDateTimeAdd() {
+ String sql =
+ "SELECT "
+ + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MICROSECOND), "
+ + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MILLISECOND), "
+ + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 SECOND), "
+ + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MINUTE), "
+ + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 HOUR), "
+ + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 DAY), "
+ + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MONTH), "
+ + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 QUARTER), "
+ + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 YEAR) ";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(
+ Schema.builder()
+ .addLogicalTypeField("f_time1", SqlTypes.DATETIME)
+ .addLogicalTypeField("f_time2", SqlTypes.DATETIME)
+ .addLogicalTypeField("f_time3", SqlTypes.DATETIME)
+ .addLogicalTypeField("f_time4", SqlTypes.DATETIME)
+ .addLogicalTypeField("f_time5", SqlTypes.DATETIME)
+ .addLogicalTypeField("f_time6", SqlTypes.DATETIME)
+ .addLogicalTypeField("f_time7", SqlTypes.DATETIME)
+ .addLogicalTypeField("f_time8", SqlTypes.DATETIME)
+ .addLogicalTypeField("f_time9", SqlTypes.DATETIME)
+ .build())
+ .addValues(
+ LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(10000),
+ LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(10000000),
+ LocalDateTime.of(2008, 12, 25, 15, 30, 10),
+ LocalDateTime.of(2008, 12, 25, 15, 40, 0),
+ LocalDateTime.of(2008, 12, 26, 1, 30, 0),
+ LocalDateTime.of(2009, 1, 4, 15, 30, 0),
+ LocalDateTime.of(2009, 10, 25, 15, 30, 0),
+ LocalDateTime.of(2011, 6, 25, 15, 30, 0),
+ LocalDateTime.of(2018, 12, 25, 15, 30, 0))
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testDateTimeAddWithParameter() {
+ String sql = "SELECT DATETIME_ADD(@p0, INTERVAL @p1 HOUR)";
+
+ LocalDateTime datetime = LocalDateTime.of(2008, 12, 25, 15, 30, 00).withNano(123456000);
+ ImmutableMap<String, Value> params =
+ ImmutableMap.of(
+ "p0",
+ Value.createDatetimeValue(
+ CivilTimeEncoder.encodePacked64DatetimeSeconds(datetime), datetime.getNano()),
+ "p1", Value.createInt64Value(3L));
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(
+ Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+ .addValues(LocalDateTime.of(2008, 12, 25, 18, 30, 00).withNano(123456000))
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testDateTimeSub() {
+ String sql =
+ "SELECT "
+ + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MICROSECOND), "
+ + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MILLISECOND), "
+ + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 SECOND), "
+ + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MINUTE), "
+ + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 HOUR), "
+ + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 DAY), "
+ + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MONTH), "
+ + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 QUARTER), "
+ + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 YEAR) ";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(
+ Schema.builder()
+ .addLogicalTypeField("f_time1", SqlTypes.DATETIME)
+ .addLogicalTypeField("f_time2", SqlTypes.DATETIME)
+ .addLogicalTypeField("f_time3", SqlTypes.DATETIME)
+ .addLogicalTypeField("f_time4", SqlTypes.DATETIME)
+ .addLogicalTypeField("f_time5", SqlTypes.DATETIME)
+ .addLogicalTypeField("f_time6", SqlTypes.DATETIME)
+ .addLogicalTypeField("f_time7", SqlTypes.DATETIME)
+ .addLogicalTypeField("f_time8", SqlTypes.DATETIME)
+ .addLogicalTypeField("f_time9", SqlTypes.DATETIME)
+ .build())
+ .addValues(
+ LocalDateTime.of(2008, 12, 25, 15, 29, 59).withNano(999990000),
+ LocalDateTime.of(2008, 12, 25, 15, 29, 59).withNano(990000000),
+ LocalDateTime.of(2008, 12, 25, 15, 29, 50),
+ LocalDateTime.of(2008, 12, 25, 15, 20, 0),
+ LocalDateTime.of(2008, 12, 25, 5, 30, 0),
+ LocalDateTime.of(2008, 12, 15, 15, 30, 0),
+ LocalDateTime.of(2008, 2, 25, 15, 30, 0),
+ LocalDateTime.of(2006, 6, 25, 15, 30, 0),
+ LocalDateTime.of(1998, 12, 25, 15, 30, 0))
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testDateTimeDiff() {
+ String sql =
+ "SELECT DATETIME_DIFF(DATETIME '2008-12-25 15:30:00', DATETIME '2008-10-25 15:30:00', DAY)";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(Schema.builder().addInt64Field("f_datetime_diff").build())
+ .addValues(61L)
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testDateTimeDiffNegativeResult() {
+ String sql =
+ "SELECT DATETIME_DIFF(DATETIME '2008-10-25 15:30:00', DATETIME '2008-12-25 15:30:00', DAY)";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(Schema.builder().addInt64Field("f_datetime_diff").build())
+ .addValues(-61L)
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testDateTimeTrunc() {
+ String sql = "SELECT DATETIME_TRUNC(DATETIME '2008-12-25 15:30:00', HOUR)";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(
+ Schema.builder()
+ .addLogicalTypeField("f_datetime_trunc", SqlTypes.DATETIME)
+ .build())
+ .addValues(LocalDateTime.of(2008, 12, 25, 15, 0, 0))
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testFormatDateTime() {
+ String sql = "SELECT FORMAT_DATETIME('%D %T %E6S', DATETIME '2008-12-25 15:30:00.123456')";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(Schema.builder().addStringField("f_datetime_str").build())
+ .addValues("12/25/08 15:30:00 00.123456")
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testParseDateTime() {
+ String sql = "SELECT PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S', '2008-12-25 15:30:00.123456')";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(
+ Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+ .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000))
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/////////////////////////////////////////////////////////////////////////////
@@ -846,7 +1286,7 @@
+ " EXTRACT(ISOYEAR FROM timestamp) AS isoyear,\n"
+ " EXTRACT(YEAR FROM timestamp) AS year,\n"
+ " EXTRACT(ISOWEEK FROM timestamp) AS isoweek,\n"
- // TODO[BEAM-9178]: Add tests for TIMESTAMP_TRUNC and EXTRACT with "week with weekday"
+ // TODO[BEAM-10606]: Add tests for TIMESTAMP_TRUNC and EXTRACT with "week with weekday"
// date parts once they are supported
// + " EXTRACT(WEEK FROM timestamp) AS week,\n"
+ " EXTRACT(MONTH FROM timestamp) AS month,\n"
@@ -926,6 +1366,23 @@
}
@Test
+ public void testExtractDateTimeFromTimestamp() {
+ String sql = "SELECT EXTRACT(DATETIME FROM TIMESTAMP '2017-05-26 12:34:56')";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(
+ Schema.builder().addLogicalTypeField("datetime", SqlTypes.DATETIME).build())
+ .addValues(LocalDateTime.of(2017, 5, 26, 12, 34, 56))
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
public void testExtractFromTimestampAtTimeZone() {
String sql =
"WITH Timestamps AS (\n"
@@ -1028,6 +1485,45 @@
}
@Test
+ public void testTimestampFromDateTime() {
+ String sql = "SELECT TIMESTAMP(DATETIME '2008-12-25 15:30:00')";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(Schema.builder().addDateTimeField("f_timestamp").build())
+ .addValues(parseTimestampWithTimeZone("2008-12-25 15:30:00+00"))
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ // Verifies that the default timezone is applied properly during the query execution stage.
+ public void testTimestampFromDateTimeWithDefaultTimezoneSet() {
+ String sql = "SELECT TIMESTAMP(DATETIME '2008-12-25 15:30:00')";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ zetaSQLQueryPlanner.setDefaultTimezone("Asia/Shanghai");
+ pipeline
+ .getOptions()
+ .as(BeamSqlPipelineOptions.class)
+ .setZetaSqlDefaultTimezone("Asia/Shanghai");
+
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(Schema.builder().addDateTimeField("f_timestamp").build())
+ .addValues(parseTimestampWithTimeZone("2008-12-25 15:30:00+08"))
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
public void testTimestampAdd() {
String sql =
"SELECT "
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index 9521443..3f449d2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -129,7 +129,7 @@
new PubsubMessage().encodeData(outgoingMessage.message().getData().toByteArray());
pubsubMessage.setAttributes(getMessageAttributes(outgoingMessage));
if (!outgoingMessage.message().getOrderingKey().isEmpty()) {
- pubsubMessage.put("orderingKey", outgoingMessage.message().getOrderingKey());
+ pubsubMessage.setOrderingKey(outgoingMessage.message().getOrderingKey());
}
pubsubMessages.add(pubsubMessage);
}
@@ -156,6 +156,7 @@
}
@Override
+ @SuppressWarnings("ProtoFieldNullComparison")
public List<IncomingMessage> pull(
long requestTimeMsSinceEpoch,
SubscriptionPath subscription,
@@ -207,8 +208,12 @@
com.google.pubsub.v1.PubsubMessage.newBuilder();
protoMessage.setData(ByteString.copyFrom(elementBytes));
protoMessage.putAllAttributes(attributes);
- protoMessage.setOrderingKey(
- (String) pubsubMessage.getUnknownKeys().getOrDefault("orderingKey", ""));
+ // PubsubMessage uses `null` to represent no ordering key where we want a default of "".
+ if (pubsubMessage.getOrderingKey() != null) {
+ protoMessage.setOrderingKey(pubsubMessage.getOrderingKey());
+ } else {
+ protoMessage.setOrderingKey("");
+ }
incomingMessages.add(
IncomingMessage.of(
protoMessage.build(),
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
index aad9729..22c1cb1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
@@ -101,7 +101,7 @@
.setAttributes(
ImmutableMap.of(
TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME), ID_ATTRIBUTE, RECORD_ID))
- .set("orderingKey", ORDERING_KEY);
+ .setOrderingKey(ORDERING_KEY);
ReceivedMessage expectedReceivedMessage =
new ReceivedMessage().setMessage(expectedPubsubMessage).setAckId(ACK_ID);
PullResponse expectedResponse =
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 8fdc4a4..54c63e7 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -530,6 +530,92 @@
return 1
+class MapCoderImpl(StreamCoderImpl):
+ """For internal use only; no backwards-compatibility guarantees.
+
+ Note this implementation always uses nested context when encoding keys
+ and values. This differs from Java's MapCoder, which uses
+ nested=False if possible for the last value encoded.
+
+ This difference is acceptable because MapCoder is not standard. It is only
+ used in a standard context by RowCoder which always uses nested context for
+ attribute values.
+
+ A coder for typing.Mapping objects."""
+ def __init__(
+ self,
+ key_coder, # type: CoderImpl
+ value_coder # type: CoderImpl
+ ):
+ self._key_coder = key_coder
+ self._value_coder = value_coder
+
+ def encode_to_stream(self, dict_value, out, nested):
+ out.write_bigendian_int32(len(dict_value))
+ for key, value in dict_value.items():
+ # Note this implementation always uses nested context when encoding keys
+ # and values which differs from Java. See note in docstring.
+ self._key_coder.encode_to_stream(key, out, True)
+ self._value_coder.encode_to_stream(value, out, True)
+
+ def decode_from_stream(self, in_stream, nested):
+ size = in_stream.read_bigendian_int32()
+ result = {}
+ for _ in range(size):
+ # Note this implementation always uses nested context when decoding keys
+ # and values which differs from Java. See note in docstring.
+ key = self._key_coder.decode_from_stream(in_stream, True)
+ value = self._value_coder.decode_from_stream(in_stream, True)
+ result[key] = value
+
+ return result
+
+ def estimate_size(self, unused_value, nested=False):
+ estimate = 4 # 4 bytes for int32 size prefix
+ for key, value in unused_value.items():
+ estimate += self._key_coder.estimate_size(key, True)
+ estimate += self._value_coder.estimate_size(value, True)
+ return estimate
+
+
+class NullableCoderImpl(StreamCoderImpl):
+ """For internal use only; no backwards-compatibility guarantees.
+
+ A coder for typing.Optional objects."""
+
+ ENCODE_NULL = 0
+ ENCODE_PRESENT = 1
+
+ def __init__(
+ self,
+ value_coder # type: CoderImpl
+ ):
+ self._value_coder = value_coder
+
+ def encode_to_stream(self, value, out, nested):
+ if value is None:
+ out.write_byte(self.ENCODE_NULL)
+ else:
+ out.write_byte(self.ENCODE_PRESENT)
+ self._value_coder.encode_to_stream(value, out, nested)
+
+ def decode_from_stream(self, in_stream, nested):
+ null_indicator = in_stream.read_byte()
+ if null_indicator == self.ENCODE_NULL:
+ return None
+ elif null_indicator == self.ENCODE_PRESENT:
+ return self._value_coder.decode_from_stream(in_stream, nested)
+ else:
+ raise ValueError(
+ "Encountered unexpected value for null indicator: '%s'" %
+ null_indicator)
+
+ def estimate_size(self, unused_value, nested=False):
+ return 1 + (
+ self._value_coder.estimate_size(unused_value)
+ if unused_value is not None else 0)
+
+
class FloatCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""
def encode_to_stream(self, value, out, nested):
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 399a46d..b6aca0a 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -83,6 +83,8 @@
'FastPrimitivesCoder',
'FloatCoder',
'IterableCoder',
+ 'MapCoder',
+ 'NullableCoder',
'PickleCoder',
'ProtoCoder',
'SingletonCoder',
@@ -520,6 +522,57 @@
Coder.register_structured_urn(common_urns.coders.BOOL.urn, BooleanCoder)
+class MapCoder(FastCoder):
+ def __init__(self, key_coder, value_coder):
+ # type: (Coder, Coder) -> None
+ self._key_coder = key_coder
+ self._value_coder = value_coder
+
+ def _create_impl(self):
+ return coder_impl.MapCoderImpl(
+ self._key_coder.get_impl(), self._value_coder.get_impl())
+
+ def to_type_hint(self):
+ return typehints.Dict[self._key_coder.to_type_hint(),
+ self._value_coder.to_type_hint()]
+
+ def is_deterministic(self):
+ # type: () -> bool
+ # Map ordering is non-deterministic
+ return False
+
+ def __eq__(self, other):
+ return (
+ type(self) == type(other) and self._key_coder == other._key_coder and
+ self._value_coder == other._value_coder)
+
+ def __hash__(self):
+ return hash(type(self)) + hash(self._key_coder) + hash(self._value_coder)
+
+
+class NullableCoder(FastCoder):
+ def __init__(self, value_coder):
+ # type: (Coder) -> None
+ self._value_coder = value_coder
+
+ def _create_impl(self):
+ return coder_impl.NullableCoderImpl(self._value_coder.get_impl())
+
+ def to_type_hint(self):
+ return typehints.Optional[self._value_coder.to_type_hint()]
+
+ def is_deterministic(self):
+ # type: () -> bool
+ return self._value_coder.is_deterministic()
+
+ def __eq__(self, other):
+ return (
+ type(self) == type(other) and self._value_coder == other._value_coder)
+
+ def __hash__(self):
+ return hash(type(self)) + hash(self._value_coder)
+
+
class VarIntCoder(FastCoder):
"""Variable-length integer coder."""
def _create_impl(self):
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index f1a771a..e1ce23b 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -82,8 +82,8 @@
coders.ToBytesCoder
])
cls.seen_nested -= set([coders.ProtoCoder, CustomCoder])
- assert not standard - cls.seen
- assert not cls.seen_nested - standard
+ assert not standard - cls.seen, str(standard - cls.seen)
+ assert not cls.seen_nested - standard, str(cls.seen_nested - standard)
@classmethod
def _observe(cls, coder):
@@ -560,6 +560,16 @@
context=context,
test_size_estimation=False)
+ def test_nullable_coder(self):
+ self.check_coder(coders.NullableCoder(coders.VarIntCoder()), None, 2 * 64)
+
+ def test_map_coder(self):
+ self.check_coder(
+ coders.MapCoder(coders.VarIntCoder(), coders.StrUtf8Coder()), {
+ 1: "one", 300: "three hundred"
+ }, {}, {i: str(i)
+ for i in range(5000)})
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/coders/row_coder.py b/sdks/python/apache_beam/coders/row_coder.py
index 3ad880f..02c7f06 100644
--- a/sdks/python/apache_beam/coders/row_coder.py
+++ b/sdks/python/apache_beam/coders/row_coder.py
@@ -30,6 +30,8 @@
from apache_beam.coders.coders import FastCoder
from apache_beam.coders.coders import FloatCoder
from apache_beam.coders.coders import IterableCoder
+from apache_beam.coders.coders import MapCoder
+from apache_beam.coders.coders import NullableCoder
from apache_beam.coders.coders import StrUtf8Coder
from apache_beam.coders.coders import TupleCoder
from apache_beam.coders.coders import VarIntCoder
@@ -58,8 +60,9 @@
to encode/decode.
"""
self.schema = schema
+ # Use non-null coders because null values are represented separately
self.components = [
- RowCoder.coder_from_type(field.type) for field in self.schema.fields
+ _nonnull_coder_from_type(field.type) for field in self.schema.fields
]
def _create_impl(self):
@@ -102,32 +105,6 @@
# type: (bytes) -> RowCoder
return RowCoder(proto_utils.parse_Bytes(payload, schema_pb2.Schema))
- @staticmethod
- def coder_from_type(field_type):
- type_info = field_type.WhichOneof("type_info")
- if type_info == "atomic_type":
- if field_type.atomic_type in (schema_pb2.INT32, schema_pb2.INT64):
- return VarIntCoder()
- elif field_type.atomic_type == schema_pb2.DOUBLE:
- return FloatCoder()
- elif field_type.atomic_type == schema_pb2.STRING:
- return StrUtf8Coder()
- elif field_type.atomic_type == schema_pb2.BOOLEAN:
- return BooleanCoder()
- elif field_type.atomic_type == schema_pb2.BYTES:
- return BytesCoder()
- elif type_info == "array_type":
- return IterableCoder(
- RowCoder.coder_from_type(field_type.array_type.element_type))
- elif type_info == "row_type":
- return RowCoder(field_type.row_type.schema)
-
- # The Java SDK supports several more types, but the coders are not yet
- # standard, and are not implemented in Python.
- raise ValueError(
- "Encountered a type that is not currently supported by RowCoder: %s" %
- field_type)
-
def __reduce__(self):
# when pickling, use bytes representation of the schema. schema_pb2.Schema
# objects cannot be pickled.
@@ -137,6 +114,43 @@
typecoders.registry.register_coder(row_type.RowTypeConstraint, RowCoder)
+def _coder_from_type(field_type):
+ coder = _nonnull_coder_from_type(field_type)
+ if field_type.nullable:
+ return NullableCoder(coder)
+ else:
+ return coder
+
+
+def _nonnull_coder_from_type(field_type):
+ type_info = field_type.WhichOneof("type_info")
+ if type_info == "atomic_type":
+ if field_type.atomic_type in (schema_pb2.INT32, schema_pb2.INT64):
+ return VarIntCoder()
+ elif field_type.atomic_type == schema_pb2.DOUBLE:
+ return FloatCoder()
+ elif field_type.atomic_type == schema_pb2.STRING:
+ return StrUtf8Coder()
+ elif field_type.atomic_type == schema_pb2.BOOLEAN:
+ return BooleanCoder()
+ elif field_type.atomic_type == schema_pb2.BYTES:
+ return BytesCoder()
+ elif type_info == "array_type":
+ return IterableCoder(_coder_from_type(field_type.array_type.element_type))
+ elif type_info == "map_type":
+ return MapCoder(
+ _coder_from_type(field_type.map_type.key_type),
+ _coder_from_type(field_type.map_type.value_type))
+ elif type_info == "row_type":
+ return RowCoder(field_type.row_type.schema)
+
+ # The Java SDK supports several more types, but the coders are not yet
+ # standard, and are not implemented in Python.
+ raise ValueError(
+ "Encountered a type that is not currently supported by RowCoder: %s" %
+ field_type)
+
+
class RowCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""
SIZE_CODER = VarIntCoder().get_impl()
diff --git a/sdks/python/apache_beam/coders/row_coder_test.py b/sdks/python/apache_beam/coders/row_coder_test.py
index 65b1024..4277e58 100644
--- a/sdks/python/apache_beam/coders/row_coder_test.py
+++ b/sdks/python/apache_beam/coders/row_coder_test.py
@@ -45,14 +45,15 @@
("aliases", typing.List[unicode]),
("knows_javascript", bool),
# TODO(BEAM-7372): Use bytes instead of ByteString
- ("payload", typing.Optional[typing.ByteString])
+ ("payload", typing.Optional[typing.ByteString]),
+ ("custom_metadata", typing.Mapping[unicode, int])
])
coders_registry.register_coder(Person, RowCoder)
class RowCoderTest(unittest.TestCase):
- JON_SNOW = Person("Jon Snow", 23, None, ["crow", "wildling"], False, None)
+ JON_SNOW = Person("Jon Snow", 23, None, ["crow", "wildling"], False, None, {})
PEOPLE = [
JON_SNOW,
Person(
@@ -60,8 +61,9 @@
25,
"Westeros", ["Mother of Dragons"],
False,
- None),
- Person("Michael Bluth", 30, None, [], True, b"I've made a huge mistake")
+ None, {"dragons": 3}),
+ Person(
+ "Michael Bluth", 30, None, [], True, b"I've made a huge mistake", {})
]
def test_create_row_coder_from_named_tuple(self):
@@ -102,6 +104,15 @@
name="payload",
type=schema_pb2.FieldType(
atomic_type=schema_pb2.BYTES, nullable=True)),
+ schema_pb2.Field(
+ name="custom_metadata",
+ type=schema_pb2.FieldType(
+ map_type=schema_pb2.MapType(
+ key_type=schema_pb2.FieldType(
+ atomic_type=schema_pb2.STRING),
+ value_type=schema_pb2.FieldType(
+ atomic_type=schema_pb2.INT64),
+ ))),
])
coder = RowCoder(schema)
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index 4a13895..df88978 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -29,6 +29,7 @@
import sys
import unittest
from builtins import map
+from copy import deepcopy
from typing import Dict
from typing import Tuple
@@ -78,6 +79,13 @@
def value_parser_from_schema(schema):
def attribute_parser_from_type(type_):
+ parser = nonnull_attribute_parser_from_type(type_)
+ if type_.nullable:
+ return lambda x: None if x is None else parser(x)
+ else:
+ return parser
+
+ def nonnull_attribute_parser_from_type(type_):
# TODO: This should be exhaustive
type_info = type_.WhichOneof("type_info")
if type_info == "atomic_type":
@@ -89,8 +97,8 @@
element_parser = attribute_parser_from_type(type_.array_type.element_type)
return lambda x: list(map(element_parser, x))
elif type_info == "map_type":
- key_parser = attribute_parser_from_type(type_.array_type.key_type)
- value_parser = attribute_parser_from_type(type_.array_type.value_type)
+ key_parser = attribute_parser_from_type(type_.map_type.key_type)
+ value_parser = attribute_parser_from_type(type_.map_type.value_type)
return lambda x: dict(
(key_parser(k), value_parser(v)) for k, v in x.items())
@@ -101,6 +109,7 @@
def value_parser(x):
result = []
+ x = deepcopy(x)
for name, parser in parsers:
value = x.pop(name)
result.append(None if value is None else parser(value))
diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py
index 7780cd3..0b1c296 100644
--- a/sdks/python/apache_beam/dataframe/frame_base.py
+++ b/sdks/python/apache_beam/dataframe/frame_base.py
@@ -16,7 +16,9 @@
from __future__ import absolute_import
+import functools
import inspect
+import sys
from typing import Any
from typing import Callable
from typing import Dict
@@ -29,6 +31,18 @@
from apache_beam.dataframe import expressions
from apache_beam.dataframe import partitionings
+# pylint: disable=deprecated-method
+if sys.version_info < (3, ):
+ _getargspec = inspect.getargspec
+
+ def _unwrap(func):
+ while hasattr(func, '__wrapped__'):
+ func = func.__wrapped__
+ return func
+else:
+ _getargspec = inspect.getfullargspec
+ _unwrap = inspect.unwrap
+
class DeferredBase(object):
@@ -146,8 +160,7 @@
value = kwargs[key]
else:
try:
- # pylint: disable=deprecated-method
- ix = inspect.getargspec(func).args.index(key)
+ ix = _getargspec(func).args.index(key)
except ValueError:
# TODO: fix for delegation?
continue
@@ -226,6 +239,68 @@
return wrapper
+def maybe_inplace(func):
+ @functools.wraps(func)
+ def wrapper(self, inplace=False, **kwargs):
+ result = func(self, **kwargs)
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ return wrapper
+
+
+def args_to_kwargs(base_type):
+ def wrap(func):
+ arg_names = _getargspec(_unwrap(getattr(base_type, func.__name__))).args
+
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ for name, value in zip(arg_names, args):
+ if name in kwargs:
+ raise TypeError(
+ "%s() got multiple values for argument '%s'" %
+ (func.__name__, name))
+ kwargs[name] = value
+ return func(**kwargs)
+
+ return wrapper
+
+ return wrap
+
+
+def populate_defaults(base_type):
+ def wrap(func):
+ base_argspec = _getargspec(_unwrap(getattr(base_type, func.__name__)))
+ if not base_argspec.defaults:
+ return func
+
+ arg_to_default = dict(
+ zip(
+ base_argspec.args[-len(base_argspec.defaults):],
+ base_argspec.defaults))
+
+ unwrapped_func = _unwrap(func)
+ # args that do not have defaults in func, but do have defaults in base
+ func_argspec = _getargspec(unwrapped_func)
+ num_non_defaults = len(func_argspec.args) - len(func_argspec.defaults or ())
+ defaults_to_populate = set(
+ func_argspec.args[:num_non_defaults]).intersection(
+ arg_to_default.keys())
+
+ @functools.wraps(func)
+ def wrapper(**kwargs):
+ for name in defaults_to_populate:
+ if name not in kwargs:
+ kwargs[name] = arg_to_default[name]
+ return func(**kwargs)
+
+ return wrapper
+
+ return wrap
+
+
class WontImplementError(NotImplementedError):
"""An subclass of NotImplementedError to raise indicating that implementing
the given method is infeasible.
diff --git a/sdks/python/apache_beam/dataframe/frame_base_test.py b/sdks/python/apache_beam/dataframe/frame_base_test.py
index 392272c..b527da0 100644
--- a/sdks/python/apache_beam/dataframe/frame_base_test.py
+++ b/sdks/python/apache_beam/dataframe/frame_base_test.py
@@ -41,6 +41,59 @@
self.assertTrue(sub(x, b)._expr.evaluate_at(session).equals(a - b))
self.assertTrue(sub(a, y)._expr.evaluate_at(session).equals(a - b))
+ def test_maybe_inplace(self):
+ @frame_base.maybe_inplace
+ def add_one(frame):
+ return frame + 1
+
+ frames.DeferredSeries.add_one = add_one
+ original_expr = expressions.PlaceholderExpression(pd.Series([1, 2, 3]))
+ x = frames.DeferredSeries(original_expr)
+ x.add_one()
+ self.assertIs(x._expr, original_expr)
+ x.add_one(inplace=False)
+ self.assertIs(x._expr, original_expr)
+ x.add_one(inplace=True)
+ self.assertIsNot(x._expr, original_expr)
+
+ def test_args_to_kwargs(self):
+ class Base(object):
+ def func(self, a=1, b=2, c=3):
+ pass
+
+ class Proxy(object):
+ @frame_base.args_to_kwargs(Base)
+ def func(self, **kwargs):
+ return kwargs
+
+ proxy = Proxy()
+ # pylint: disable=too-many-function-args
+ self.assertEqual(proxy.func(), {})
+ self.assertEqual(proxy.func(100), {'a': 100})
+ self.assertEqual(proxy.func(2, 4, 6), {'a': 2, 'b': 4, 'c': 6})
+ self.assertEqual(proxy.func(2, c=6), {'a': 2, 'c': 6})
+ self.assertEqual(proxy.func(c=6, a=2), {'a': 2, 'c': 6})
+
+ def test_args_to_kwargs_populates_defaults(self):
+ class Base(object):
+ def func(self, a=1, b=2, c=3):
+ pass
+
+ class Proxy(object):
+ @frame_base.args_to_kwargs(Base)
+ @frame_base.populate_defaults(Base)
+ def func(self, a, c=1000, **kwargs):
+ return dict(kwargs, a=a, c=c)
+
+ proxy = Proxy()
+ # pylint: disable=too-many-function-args
+ self.assertEqual(proxy.func(), {'a': 1, 'c': 1000})
+ self.assertEqual(proxy.func(100), {'a': 100, 'c': 1000})
+ self.assertEqual(proxy.func(2, 4, 6), {'a': 2, 'b': 4, 'c': 6})
+ self.assertEqual(proxy.func(2, c=6), {'a': 2, 'c': 6})
+ self.assertEqual(proxy.func(c=6, a=2), {'a': 2, 'c': 6})
+ self.assertEqual(proxy.func(c=6), {'a': 1, 'c': 6})
+
if __name__ == '__main__':
unittest.main()
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 89e9154..9e2e97a 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -54,29 +54,20 @@
'order-sensitive')
diff = frame_base.wont_implement_method('order-sensitive')
- def replace(
- self,
- to_replace=None,
- value=None,
- inplace=False,
- limit=None,
- *args,
- **kwargs):
+ @frame_base.args_to_kwargs(pd.Series)
+ @frame_base.populate_defaults(pd.Series)
+ @frame_base.maybe_inplace
+ def replace(self, limit, **kwargs):
if limit is None:
requires_partition_by = partitionings.Nothing()
else:
requires_partition_by = partitionings.Singleton()
- result = frame_base.DeferredFrame.wrap(
+ return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'replace',
- lambda df: df.replace(
- to_replace, value, False, limit, *args, **kwargs), [self._expr],
+ lambda df: df.replace(limit=limit, **kwargs), [self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=requires_partition_by))
- if inplace:
- self._expr = result._expr
- else:
- return result
def unstack(self, *args, **kwargs):
raise frame_base.WontImplementError('non-deferred column values')
@@ -159,14 +150,15 @@
def loc(self):
return _DeferredLoc(self)
- def aggregate(self, *args, **kwargs):
- if 'axis' in kwargs and kwargs['axis'] is None:
- return self.agg(*args, **dict(kwargs, axis=1)).agg(
- *args, **dict(kwargs, axis=0))
+ @frame_base.args_to_kwargs(pd.DataFrame)
+ @frame_base.populate_defaults(pd.DataFrame)
+ def aggregate(self, axis, **kwargs):
+ if axis is None:
+ return self.agg(axis=1, **kwargs).agg(axis=0, **kwargs)
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'aggregate',
- lambda df: df.agg(*args, **kwargs),
+ lambda df: df.agg(axis=axis, **kwargs),
[self._expr],
# TODO(robertwb): Sub-aggregate when possible.
requires_partition_by=partitionings.Singleton()))
@@ -188,32 +180,22 @@
min = frame_base._associative_agg_method('min')
mode = frame_base._agg_method('mode')
- def dropna(
- self,
- axis=0,
- how='any',
- thresh=None,
- subset=None,
- inplace=False,
- *args,
- **kwargs):
+ @frame_base.args_to_kwargs(pd.DataFrame)
+ @frame_base.populate_defaults(pd.DataFrame)
+ @frame_base.maybe_inplace
+ def dropna(self, axis, **kwargs):
# TODO(robertwb): This is a common pattern. Generalize?
if axis == 1 or axis == 'columns':
requires_partition_by = partitionings.Singleton()
else:
requires_partition_by = partitionings.Nothing()
- result = frame_base.DeferredFrame.wrap(
+ return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'dropna',
- lambda df: df.dropna(
- axis, how, thresh, subset, False, *args, **kwargs),
+ lambda df: df.dropna(axis=axis, **kwargs),
[self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=requires_partition_by))
- if inplace:
- self._expr = result._expr
- else:
- return result
items = itertuples = iterrows = iteritems = frame_base.wont_implement_method(
'non-lazy')
@@ -223,13 +205,15 @@
prod = product = frame_base._associative_agg_method('prod')
- def quantile(self, q=0.5, axis=0, *args, **kwargs):
- if axis != 0:
+ @frame_base.args_to_kwargs(pd.DataFrame)
+ @frame_base.populate_defaults(pd.DataFrame)
+ def quantile(self, axis, **kwargs):
+ if axis == 1 or axis == 'columns':
raise frame_base.WontImplementError('non-deferred column values')
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'quantile',
- lambda df: df.quantile(q, axis, *args, **kwargs),
+ lambda df: df.quantile(axis=axis, **kwargs),
[self._expr],
#TODO(robertwb): Approximate quantiles?
requires_partition_by=partitionings.Singleton(),
@@ -237,28 +221,26 @@
query = frame_base._elementwise_method('query')
- def replace(self, to_replace=None,
- value=None,
- inplace=False,
- limit=None, *args, **kwargs):
+ @frame_base.args_to_kwargs(pd.DataFrame)
+ @frame_base.populate_defaults(pd.DataFrame)
+ @frame_base.maybe_inplace
+ def replace(self, limit, **kwargs):
if limit is None:
requires_partition_by = partitionings.Nothing()
else:
requires_partition_by = partitionings.Singleton()
- result = frame_base.DeferredFrame.wrap(
+ return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'replace',
- lambda df: df.replace(
- to_replace, value, False, limit, *args, **kwargs),
+ lambda df: df.replace(limit=limit, **kwargs),
[self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=requires_partition_by))
- if inplace:
- self._expr = result._expr
- else:
- return result
- def reset_index(self, level=None, drop=False, inplace=False, *args, **kwargs):
+ @frame_base.args_to_kwargs(pd.DataFrame)
+ @frame_base.populate_defaults(pd.DataFrame)
+ @frame_base.maybe_inplace
+ def reset_index(self, level, **kwargs):
if level is not None and not isinstance(level, (tuple, list)):
level = [level]
if level is None or len(level) == len(self._expr.proxy().index.levels):
@@ -266,22 +248,20 @@
requires_partition_by = partitionings.Singleton()
else:
requires_partition_by = partitionings.Nothing()
- result = frame_base.DeferredFrame.wrap(
+ return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'reset_index',
- lambda df: df.reset_index(level, drop, False, *args, **kwargs),
+ lambda df: df.reset_index(level=level, **kwargs),
[self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=requires_partition_by))
- if inplace:
- self._expr = result._expr
- else:
- return result
round = frame_base._elementwise_method('round')
select_dtypes = frame_base._elementwise_method('select_dtypes')
- def shift(self, periods=1, freq=None, axis=0, *args, **kwargs):
+ @frame_base.args_to_kwargs(pd.DataFrame)
+ @frame_base.populate_defaults(pd.DataFrame)
+ def shift(self, axis, **kwargs):
if axis == 1 or axis == 'columns':
requires_partition_by = partitionings.Nothing()
else:
@@ -289,7 +269,7 @@
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'shift',
- lambda df: df.shift(periods, freq, axis, *args, **kwargs),
+ lambda df: df.shift(axis=axis, **kwargs),
[self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=requires_partition_by))
@@ -298,24 +278,21 @@
def shape(self):
raise frame_base.WontImplementError('scalar value')
- def sort_values(
- self, by, axis=0, ascending=True, inplace=False, *args, **kwargs):
+ @frame_base.args_to_kwargs(pd.DataFrame)
+ @frame_base.populate_defaults(pd.DataFrame)
+ @frame_base.maybe_inplace
+ def sort_values(self, axis, **kwargs):
if axis == 1 or axis == 'columns':
requires_partition_by = partitionings.Nothing()
else:
requires_partition_by = partitionings.Singleton()
- result = frame_base.DeferredFrame.wrap(
+ return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'sort_values',
- lambda df: df.sort_values(
- by, axis, ascending, False, *args, **kwargs),
+ lambda df: df.sort_values(axis=axis, **kwargs),
[self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=requires_partition_by))
- if inplace:
- self._expr = result._expr
- else:
- return result
stack = frame_base._elementwise_method('stack')
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test_py3.py b/sdks/python/apache_beam/examples/snippets/snippets_test_py3.py
index 0f0b668..5eb1d4b 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test_py3.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test_py3.py
@@ -96,6 +96,18 @@
ids = numbers | 'to_id' >> beam.Map(my_fn)
# [END type_hints_map_annotations]
+ # Example using an annotated PTransform.
+ with self.assertRaises(typehints.TypeCheckError):
+ # [START type_hints_ptransforms]
+ from apache_beam.pvalue import PCollection
+
+ class IntToStr(beam.PTransform):
+ def expand(self, pcoll: PCollection[int]) -> PCollection[str]:
+ return pcoll | beam.Map(lambda elem: str(elem))
+
+ ids = numbers | 'convert to str' >> IntToStr()
+ # [END type_hints_ptransforms]
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 2ab121e..4a0e80f 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -68,13 +68,16 @@
from apache_beam.internal import pickler
from apache_beam.internal import util
from apache_beam.portability import python_urns
+from apache_beam.pvalue import DoOutputsTuple
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.transforms.display import HasDisplayData
from apache_beam.typehints import native_type_compatibility
from apache_beam.typehints import typehints
+from apache_beam.typehints.decorators import IOTypeHints
from apache_beam.typehints.decorators import TypeCheckError
from apache_beam.typehints.decorators import WithTypeHints
from apache_beam.typehints.decorators import get_signature
+from apache_beam.typehints.decorators import get_type_hints
from apache_beam.typehints.decorators import getcallargs_forhints
from apache_beam.typehints.trivial_inference import instance_to_type
from apache_beam.typehints.typehints import validate_composite_type_param
@@ -350,6 +353,14 @@
# type: () -> str
return self.__class__.__name__
+ def default_type_hints(self):
+ fn_type_hints = IOTypeHints.from_callable(self.expand)
+ if fn_type_hints is not None:
+ fn_type_hints = fn_type_hints.strip_pcoll()
+
+ # Prefer class decorator type hints for backwards compatibility.
+ return get_type_hints(self.__class__).with_defaults(fn_type_hints)
+
def with_input_types(self, input_type_hint):
"""Annotates the input type of a :class:`PTransform` with a type-hint.
@@ -419,6 +430,8 @@
root_hint = (
arg_hints[0] if len(arg_hints) == 1 else arg_hints or kwarg_hints)
for context, pvalue_, hint in _ZipPValues().visit(pvalueish, root_hint):
+ if isinstance(pvalue_, DoOutputsTuple):
+ continue
if pvalue_.element_type is None:
# TODO(robertwb): It's a bug that we ever get here. (typecheck)
continue
diff --git a/sdks/python/apache_beam/typehints/decorators.py b/sdks/python/apache_beam/typehints/decorators.py
index dad0a31..4cd7681 100644
--- a/sdks/python/apache_beam/typehints/decorators.py
+++ b/sdks/python/apache_beam/typehints/decorators.py
@@ -105,6 +105,7 @@
from typing import Optional
from typing import Tuple
from typing import TypeVar
+from typing import Union
from apache_beam.typehints import native_type_compatibility
from apache_beam.typehints import typehints
@@ -378,6 +379,75 @@
self.output_types and len(self.output_types[0]) == 1 and
not self.output_types[1])
+ def strip_pcoll(self):
+ from apache_beam.pipeline import Pipeline
+ from apache_beam.pvalue import PBegin
+ from apache_beam.pvalue import PDone
+
+ return self.strip_pcoll_helper(self.input_types,
+ self._has_input_types,
+ 'input_types',
+ [Pipeline, PBegin],
+ 'This input type hint will be ignored '
+ 'and not used for type-checking purposes. '
+ 'Typically, input type hints for a '
+ 'PTransform are single (or nested) types '
+ 'wrapped by a PCollection, or PBegin.',
+ 'strip_pcoll_input()').\
+ strip_pcoll_helper(self.output_types,
+ self.has_simple_output_type,
+ 'output_types',
+ [PDone, None],
+ 'This output type hint will be ignored '
+ 'and not used for type-checking purposes. '
+ 'Typically, output type hints for a '
+ 'PTransform are single (or nested) types '
+ 'wrapped by a PCollection, PDone, or None.',
+ 'strip_pcoll_output()')
+
+ def strip_pcoll_helper(
+ self,
+ my_type, # type: Any
+ has_my_type, # type: Callable[[], bool]
+ my_key, # type: str
+ special_containers, # type: List[Union[PBegin, PDone, PCollection]]
+ error_str, # type: str
+ source_str # type: str
+ ):
+ # type: (...) -> IOTypeHints
+ from apache_beam.pvalue import PCollection
+
+ if not has_my_type() or not my_type or len(my_type[0]) != 1:
+ return self
+
+ my_type = my_type[0][0]
+
+ if isinstance(my_type, typehints.AnyTypeConstraint):
+ return self
+
+ special_containers = special_containers + [PCollection]
+ kwarg_dict = {}
+
+ if (my_type not in special_containers and
+ getattr(my_type, '__origin__', None) != PCollection):
+ logging.warning(error_str + ' Got: %s instead.' % my_type)
+ kwarg_dict[my_key] = None
+ return self._replace(
+ origin=self._make_origin([self], tb=False, msg=[source_str]),
+ **kwarg_dict)
+
+ if (getattr(my_type, '__args__', -1) in [-1, None] or
+ len(my_type.__args__) == 0):
+ # e.g. PCollection (or PBegin/PDone)
+ kwarg_dict[my_key] = ((typehints.Any, ), {})
+ else:
+ # e.g. PCollection[type]
+ kwarg_dict[my_key] = ((convert_to_beam_type(my_type.__args__[0]), ), {})
+
+ return self._replace(
+ origin=self._make_origin([self], tb=False, msg=[source_str]),
+ **kwarg_dict)
+
def strip_iterable(self):
# type: () -> IOTypeHints
diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py
index 0cc513f..cb4cf01 100644
--- a/sdks/python/apache_beam/typehints/schemas.py
+++ b/sdks/python/apache_beam/typehints/schemas.py
@@ -178,6 +178,11 @@
return schema_pb2.FieldType(
array_type=schema_pb2.ArrayType(element_type=element_type))
+ elif _safe_issubclass(type_, Mapping):
+ key_type, value_type = map(typing_to_runner_api, _get_args(type_))
+ return schema_pb2.FieldType(
+ map_type=schema_pb2.MapType(key_type=key_type, value_type=value_type))
+
raise ValueError("Unsupported type: %s" % type_)
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test_py3.py b/sdks/python/apache_beam/typehints/typed_pipeline_test_py3.py
index 2016871..e12930d 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test_py3.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test_py3.py
@@ -22,6 +22,7 @@
from __future__ import absolute_import
+import typing
import unittest
import apache_beam as beam
@@ -257,6 +258,135 @@
result = [1, 2, 3] | beam.FlatMap(fn) | beam.Map(fn2)
self.assertCountEqual([4, 6], result)
+ def test_typed_ptransform_with_no_error(self):
+ class StrToInt(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
+ return pcoll | beam.Map(lambda x: int(x))
+
+ class IntToStr(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+ return pcoll | beam.Map(lambda x: str(x))
+
+ _ = ['1', '2', '3'] | StrToInt() | IntToStr()
+
+ def test_typed_ptransform_with_bad_typehints(self):
+ class StrToInt(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
+ return pcoll | beam.Map(lambda x: int(x))
+
+ class IntToStr(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[str]:
+ return pcoll | beam.Map(lambda x: str(x))
+
+ with self.assertRaisesRegex(typehints.TypeCheckError,
+ "Input type hint violation at IntToStr: "
+ "expected <class 'str'>, got <class 'int'>"):
+ _ = ['1', '2', '3'] | StrToInt() | IntToStr()
+
+ def test_typed_ptransform_with_bad_input(self):
+ class StrToInt(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
+ return pcoll | beam.Map(lambda x: int(x))
+
+ class IntToStr(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+ return pcoll | beam.Map(lambda x: str(x))
+
+ with self.assertRaisesRegex(typehints.TypeCheckError,
+ "Input type hint violation at StrToInt: "
+ "expected <class 'str'>, got <class 'int'>"):
+ # Feed integers to a PTransform that expects strings
+ _ = [1, 2, 3] | StrToInt() | IntToStr()
+
+ def test_typed_ptransform_with_partial_typehints(self):
+ class StrToInt(beam.PTransform):
+ def expand(self, pcoll) -> beam.pvalue.PCollection[int]:
+ return pcoll | beam.Map(lambda x: int(x))
+
+ class IntToStr(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+ return pcoll | beam.Map(lambda x: str(x))
+
+ # Feed integers to a PTransform that should expect strings
+ # but has no typehints so it expects any
+ _ = [1, 2, 3] | StrToInt() | IntToStr()
+
+ def test_typed_ptransform_with_bare_wrappers(self):
+ class StrToInt(beam.PTransform):
+ def expand(
+ self, pcoll: beam.pvalue.PCollection) -> beam.pvalue.PCollection:
+ return pcoll | beam.Map(lambda x: int(x))
+
+ class IntToStr(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+ return pcoll | beam.Map(lambda x: str(x))
+
+ _ = [1, 2, 3] | StrToInt() | IntToStr()
+
+ def test_typed_ptransform_with_no_typehints(self):
+ class StrToInt(beam.PTransform):
+ def expand(self, pcoll):
+ return pcoll | beam.Map(lambda x: int(x))
+
+ class IntToStr(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+ return pcoll | beam.Map(lambda x: str(x))
+
+ # Feed integers to a PTransform that should expect strings
+ # but has no typehints so it expects any
+ _ = [1, 2, 3] | StrToInt() | IntToStr()
+
+ def test_typed_ptransform_with_generic_annotations(self):
+ T = typing.TypeVar('T')
+
+ class IntToInt(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[T]) -> beam.pvalue.PCollection[T]:
+ return pcoll | beam.Map(lambda x: x)
+
+ class IntToStr(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[T]) -> beam.pvalue.PCollection[str]:
+ return pcoll | beam.Map(lambda x: str(x))
+
+ _ = [1, 2, 3] | IntToInt() | IntToStr()
+
+ def test_typed_ptransform_with_do_outputs_tuple_compiles(self):
+ class MyDoFn(beam.DoFn):
+ def process(self, element: int, *args, **kwargs):
+ if element % 2:
+ yield beam.pvalue.TaggedOutput('odd', 1)
+ else:
+ yield beam.pvalue.TaggedOutput('even', 1)
+
+ class MyPTransform(beam.PTransform):
+ def expand(self, pcoll: beam.pvalue.PCollection[int]):
+ return pcoll | beam.ParDo(MyDoFn()).with_outputs('odd', 'even')
+
+ # This test fails if you remove the following line from ptransform.py
+ # if isinstance(pvalue_, DoOutputsTuple): continue
+ _ = [1, 2, 3] | MyPTransform()
+
class AnnotationsTest(unittest.TestCase):
def test_pardo_dofn(self):
diff --git a/sdks/python/apache_beam/typehints/typehints_test_py3.py b/sdks/python/apache_beam/typehints/typehints_test_py3.py
index a7c23f0..5a36330 100644
--- a/sdks/python/apache_beam/typehints/typehints_test_py3.py
+++ b/sdks/python/apache_beam/typehints/typehints_test_py3.py
@@ -23,11 +23,19 @@
from __future__ import absolute_import
from __future__ import print_function
+import typing
import unittest
+import apache_beam.typehints.typehints as typehints
+from apache_beam import Map
+from apache_beam import PTransform
+from apache_beam.pvalue import PBegin
+from apache_beam.pvalue import PCollection
+from apache_beam.pvalue import PDone
from apache_beam.transforms.core import DoFn
from apache_beam.typehints import KV
from apache_beam.typehints import Iterable
+from apache_beam.typehints.typehints import Any
class TestParDoAnnotations(unittest.TestCase):
@@ -46,11 +54,221 @@
def process(self, element: int) -> Iterable[str]:
pass
- print(MyDoFn().get_type_hints())
th = MyDoFn().get_type_hints()
self.assertEqual(th.input_types, ((int, ), {}))
self.assertEqual(th.output_types, ((str, ), {}))
+class TestPTransformAnnotations(unittest.TestCase):
+ def test_pep484_annotations(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: PCollection[int]) -> PCollection[str]:
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((int, ), {}))
+ self.assertEqual(th.output_types, ((str, ), {}))
+
+ def test_annotations_without_input_pcollection_wrapper(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: int) -> PCollection[str]:
+ return pcoll | Map(lambda num: str(num))
+
+ error_str = (
+ r'This input type hint will be ignored and not used for '
+ r'type-checking purposes. Typically, input type hints for a '
+ r'PTransform are single (or nested) types wrapped by a '
+ r'PCollection, or PBegin. Got: {} instead.'.format(int))
+
+ with self.assertLogs(level='WARN') as log:
+ MyPTransform().get_type_hints()
+ self.assertIn(error_str, log.output[0])
+
+ def test_annotations_without_output_pcollection_wrapper(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: PCollection[int]) -> str:
+ return pcoll | Map(lambda num: str(num))
+
+ error_str = (
+ r'This output type hint will be ignored and not used for '
+ r'type-checking purposes. Typically, output type hints for a '
+ r'PTransform are single (or nested) types wrapped by a '
+ r'PCollection, PDone, or None. Got: {} instead.'.format(str))
+
+ with self.assertLogs(level='WARN') as log:
+ th = MyPTransform().get_type_hints()
+ self.assertIn(error_str, log.output[0])
+ self.assertEqual(th.input_types, ((int, ), {}))
+ self.assertEqual(th.output_types, None)
+
+ def test_annotations_without_input_internal_type(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: PCollection) -> PCollection[str]:
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((Any, ), {}))
+ self.assertEqual(th.output_types, ((str, ), {}))
+
+ def test_annotations_without_output_internal_type(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: PCollection[int]) -> PCollection:
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((int, ), {}))
+ self.assertEqual(th.output_types, ((Any, ), {}))
+
+ def test_annotations_without_any_internal_type(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: PCollection) -> PCollection:
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((Any, ), {}))
+ self.assertEqual(th.output_types, ((Any, ), {}))
+
+ def test_annotations_without_input_typehint(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll) -> PCollection[str]:
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((Any, ), {}))
+ self.assertEqual(th.output_types, ((str, ), {}))
+
+ def test_annotations_without_output_typehint(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: PCollection[int]):
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((int, ), {}))
+ self.assertEqual(th.output_types, ((Any, ), {}))
+
+ def test_annotations_without_any_typehints(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll):
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, None)
+ self.assertEqual(th.output_types, None)
+
+ def test_annotations_with_pbegin(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: PBegin):
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((Any, ), {}))
+ self.assertEqual(th.output_types, ((Any, ), {}))
+
+ def test_annotations_with_pdone(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll) -> PDone:
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((Any, ), {}))
+ self.assertEqual(th.output_types, ((Any, ), {}))
+
+ def test_annotations_with_none_input(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: None) -> PCollection[str]:
+ return pcoll | Map(lambda num: str(num))
+
+ error_str = (
+ r'This input type hint will be ignored and not used for '
+ r'type-checking purposes. Typically, input type hints for a '
+ r'PTransform are single (or nested) types wrapped by a '
+ r'PCollection, or PBegin. Got: {} instead.'.format(None))
+
+ with self.assertLogs(level='WARN') as log:
+ th = MyPTransform().get_type_hints()
+ self.assertIn(error_str, log.output[0])
+ self.assertEqual(th.input_types, None)
+ self.assertEqual(th.output_types, ((str, ), {}))
+
+ def test_annotations_with_none_output(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll) -> None:
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((Any, ), {}))
+ self.assertEqual(th.output_types, ((Any, ), {}))
+
+ def test_annotations_with_arbitrary_output(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll) -> str:
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((Any, ), {}))
+ self.assertEqual(th.output_types, None)
+
+ def test_annotations_with_arbitrary_input_and_output(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: int) -> str:
+ return pcoll | Map(lambda num: str(num))
+
+ input_error_str = (
+ r'This input type hint will be ignored and not used for '
+ r'type-checking purposes. Typically, input type hints for a '
+ r'PTransform are single (or nested) types wrapped by a '
+ r'PCollection, or PBegin. Got: {} instead.'.format(int))
+
+ output_error_str = (
+ r'This output type hint will be ignored and not used for '
+ r'type-checking purposes. Typically, output type hints for a '
+ r'PTransform are single (or nested) types wrapped by a '
+ r'PCollection, PDone, or None. Got: {} instead.'.format(str))
+
+ with self.assertLogs(level='WARN') as log:
+ th = MyPTransform().get_type_hints()
+ self.assertIn(input_error_str, log.output[0])
+ self.assertIn(output_error_str, log.output[1])
+ self.assertEqual(th.input_types, None)
+ self.assertEqual(th.output_types, None)
+
+ def test_typing_module_annotations_are_converted_to_beam_annotations(self):
+ class MyPTransform(PTransform):
+ def expand(
+ self, pcoll: PCollection[typing.Dict[str, str]]
+ ) -> PCollection[typing.Dict[str, str]]:
+ return pcoll
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((typehints.Dict[str, str], ), {}))
+ self.assertEqual(th.output_types, ((typehints.Dict[str, str], ), {}))
+
+ def test_nested_typing_annotations_are_converted_to_beam_annotations(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll:
+ PCollection[typing.Union[int, typing.Any, typing.Dict[str, float]]]) \
+ -> PCollection[typing.Union[int, typing.Any, typing.Dict[str, float]]]:
+ return pcoll
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(
+ th.input_types,
+ ((typehints.Union[int, typehints.Any, typehints.Dict[str,
+ float]], ), {}))
+ self.assertEqual(
+ th.output_types,
+ ((typehints.Union[int, typehints.Any, typehints.Dict[str,
+ float]], ), {}))
+
+ def test_mixed_annotations_are_converted_to_beam_annotations(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: typing.Any) -> typehints.Any:
+ return pcoll
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((typehints.Any, ), {}))
+ self.assertEqual(th.output_types, ((typehints.Any, ), {}))
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/website/www/site/content/en/documentation/io/built-in/snowflake.md b/website/www/site/content/en/documentation/io/built-in/snowflake.md
new file mode 100644
index 0000000..078a373
--- /dev/null
+++ b/website/www/site/content/en/documentation/io/built-in/snowflake.md
@@ -0,0 +1,364 @@
+---
+title: "Snowflake I/O connector"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+[Built-in I/O Transforms](/documentation/io/built-in/)
+
+# Snowflake I/O
+Pipeline options and general information about using and running Snowflake IO.
+
+## Authentication
+All authentication methods available for the Snowflake JDBC Driver are possible to use with the IO transforms:
+
+- Username and password
+- Key pair
+- OAuth token
+
+Credentials are passed via Pipeline options.
+
+These options are used to build the `SnowflakeCredentials` that instantiate `SnowflakeIO.DataSourceConfiguration`:
+{{< highlight java >}}
+SnowflakePipelineOptions options = PipelineOptionsFactory
+ .fromArgs(args)
+ .withValidation()
+ .as(SnowflakePipelineOptions.class);
+SnowflakeCredentials credentials = SnowflakeCredentialsFactory.of(options);
+
+SnowflakeIO.DataSourceConfiguration.create(credentials)
+ .(other DataSourceConfiguration options)
+{{< /highlight >}}
+### Username and password
+To use username/password authentication in SnowflakeIO, invoke your pipeline with the following Pipeline options:
+{{< highlight >}}
+--username=<USERNAME> --password=<PASSWORD>
+{{< /highlight >}}
+### Key pair
+To use this authentication method, you must first generate a key pair and associate the public key with the Snowflake user that will connect using the IO transform. For instructions, see the [Snowflake documentation](https://docs.snowflake.com/en/user-guide/jdbc-configure.html).
+
+To use key pair authentication with SnowflakeIO, invoke your pipeline with the following Pipeline options:
+{{< highlight >}}
+--username=<USERNAME> --privateKeyPath=<PATH_TO_P8_FILE> --privateKeyPassphrase=<PASSWORD_FOR_KEY>
+{{< /highlight >}}
+
+### OAuth token
+SnowflakeIO also supports OAuth token.
+
+**IMPORTANT**: SnowflakeIO requires a valid OAuth access token. It will neither be able to refresh the token nor obtain it using a web-based flow. For information on configuring an OAuth integration and obtaining the token, see the [Snowflake documentation](https://docs.snowflake.com/en/user-guide/oauth-intro.html).
+
+Once you have the token, invoke your pipeline with the following Pipeline options:
+{{< highlight >}}
+--oauthToken=<TOKEN>
+{{< /highlight >}}
+## DataSource Configuration
+DataSource configuration is required by both the read and write transforms; it configures the Snowflake connection properties used by the IO.
+### General usage
+Create the DataSource configuration:
+{{< highlight java >}}
+ SnowflakeIO.DataSourceConfiguration
+ .create(SnowflakeCredentialsFactory.of(options))
+ .withUrl(options.getUrl())
+ .withServerName(options.getServerName())
+ .withDatabase(options.getDatabase())
+ .withWarehouse(options.getWarehouse())
+ .withSchema(options.getSchema());
+{{< /highlight >}}
+Where parameters can be:
+
+- `.withUrl(...)`
+ - JDBC-like URL for your Snowflake account, including account name and region, without any parameters.
+ - Example: `.withUrl("jdbc:snowflake://account.snowflakecomputing.com")`
+- `.withServerName(...)`
+ - Server Name - full server name with account, zone and domain.
+ - Example: `.withServerName("account.snowflakecomputing.com")`
+- `.withDatabase(...)`
+ - Name of the Snowflake database to use.
+ - Example: `.withDatabase("MY_DATABASE")`
+- `.withWarehouse(...)`
+ - Name of the Snowflake warehouse to use. This parameter is optional. If no warehouse name is specified, the default warehouse for the user is used.
+ - Example: `.withWarehouse("MY_WAREHOUSE")`
+- `.withSchema(...)`
+ - Name of the schema in the database to use. This parameter is optional.
+ - Example: `.withSchema("PUBLIC")`
+
+
+**Note** - either `.withUrl(...)` or `.withServerName(...)` **is required**.
+
+## Pipeline options
+Use Beam’s [Pipeline options](https://beam.apache.org/releases/javadoc/2.17.0/org/apache/beam/sdk/options/PipelineOptions.html) to set options via the command line.
+### Snowflake Pipeline options
+The Snowflake IO library supports the following options, which can be passed via the [command line](https://beam.apache.org/documentation/io/built-in/snowflake/#running-main-command-with-pipeline-options) by default when a Pipeline uses them:
+
+`--url` Snowflake's JDBC-like url including account name and region without any parameters.
+
+`--serverName` Full server name with account, zone and domain.
+
+`--username` Required for username/password and Private Key authentication.
+
+`--oauthToken` Required for OAuth authentication only.
+
+`--password` Required for username/password authentication only.
+
+`--privateKeyPath` Path to Private Key file. Required for Private Key authentication only.
+
+`--privateKeyPassphrase` Private Key's passphrase. Required for Private Key authentication only.
+
+`--stagingBucketName` External bucket path ending with `/`, e.g. `gs://bucket/`. Sub-directories are allowed.
+
+`--storageIntegrationName` Storage integration name
+
+`--warehouse` Warehouse to use. Optional.
+
+`--database` Database name to connect to. Optional.
+
+`--schema` Schema to use. Optional.
+
+`--table` Table to use. Optional.
+
+`--query` Query to use. Optional.
+
+`--role` Role to use. Optional.
+
+`--authenticator` Authenticator to use. Optional.
+
+`--portNumber` Port number. Optional.
+
+`--loginTimeout` Login timeout. Optional.
+
+## Running pipelines on Dataflow
+By default, pipelines are run on [Direct Runner](https://beam.apache.org/documentation/runners/direct/) on your local machine. To run a pipeline on [Google Dataflow](https://cloud.google.com/dataflow/), you must provide the following Pipeline options:
+
+- `--runner=DataflowRunner`
+ - The Dataflow-specific runner.
+
+- `--project=<GCS PROJECT>`
+ - Name of the Google Cloud Platform project.
+
+- `--stagingBucketName=<GCS BUCKET NAME>`
+ - Google Cloud Storage bucket where the Beam files will be staged.
+
+- `--maxNumWorkers=5`
+ - (optional) Maximum number of workers.
+
+- `--appName=<JOB NAME>`
+ - (optional) Prefix for the job name in the Dataflow Dashboard.
+
+More pipeline options for Dataflow can be found [here](https://beam.apache.org/releases/javadoc/2.17.0/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.html).
+
+**Note**: To properly authenticate with Google Cloud, please use [gcloud](https://cloud.google.com/sdk/gcloud/) or follow the [Google Cloud documentation](https://cloud.google.com/docs/authentication/).
+
+**Important**: Please acknowledge [Google Dataflow pricing](https://cloud.google.com/dataflow/pricing).
+
+## Writing to Snowflake tables
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/javadoc/2.17.0/org/apache/beam/sdk/values/PCollection.html) to your Snowflake database.
+### Batch write (from a bounded source)
+The basic `.write()` operation usage is as follows:
+{{< highlight java >}}
+data.apply(
+ SnowflakeIO.<type>write()
+ .withDataSourceConfiguration(dc)
+ .to("MY_TABLE")
+ .withStagingBucketName("BUCKET NAME")
+ .withStorageIntegrationName("STORAGE INTEGRATION NAME")
+ .withUserDataMapper(mapper)
+)
+{{< /highlight >}}
+Replace `type` with the data type of the PCollection object to write; for example, `SnowflakeIO.<String>write()` for an input PCollection of Strings.
+
+All the below parameters are required:
+
+- `.withDataSourceConfiguration()` Accepts a DatasourceConfiguration object.
+
+- `.to()` Accepts the target Snowflake table name.
+
+- `.withStagingBucketName()` Accepts a cloud bucket path ending with a slash.
+ - Example: `.withStagingBucketName("gs://mybucket/my/dir/")`
+
+- `.withStorageIntegrationName()` Accepts the name of a Snowflake storage integration object created according to the Snowflake documentation. Example:
+{{< highlight >}}
+CREATE OR REPLACE STORAGE INTEGRATION test_integration
+TYPE = EXTERNAL_STAGE
+STORAGE_PROVIDER = GCS
+ENABLED = TRUE
+STORAGE_ALLOWED_LOCATIONS = ('gcs://bucket/');
+{{< /highlight >}}
+Then:
+{{< highlight >}}
+.withStorageIntegrationName(test_integration)
+{{< /highlight >}}
+
+- `.withUserDataMapper()` Accepts the UserDataMapper function that will map a user's PCollection to an array of String values `(String[])`.
+
+**Note**:
+SnowflakeIO uses COPY statements behind the scenes to write (using [COPY to table](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html)). StagingBucketName will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under the “stagingBucketName” path.
+
+### UserDataMapper function
+The UserDataMapper function is required to map data from a PCollection to an array of String values before the `write()` operation saves the data to temporary .csv files. For example:
+{{< highlight java >}}
+public static SnowflakeIO.UserDataMapper<Long> getCsvMapper() {
+ return (SnowflakeIO.UserDataMapper<Long>) recordLine -> new String[] {recordLine.toString()};
+}
+{{< /highlight >}}
+
+### Additional write options
+#### Transformation query
+The `.withQueryTransformation()` option for the `write()` operation accepts a SQL query as a String value, which will be performed while transferring data staged in CSV files directly to the target Snowflake table. For information about the transformation SQL syntax, see the [Snowflake Documentation](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html#transformation-parameters).
+
+Usage:
+{{< highlight java >}}
+String query = "SELECT t.$1 from YOUR_TABLE;";
+data.apply(
+ SnowflakeIO.<~>write()
+ .withDataSourceConfiguration(dc)
+ .to("MY_TABLE")
+ .withStagingBucketName("BUCKET NAME")
+ .withStorageIntegrationName("STORAGE INTEGRATION NAME")
+ .withUserDataMapper(mapper)
+ .withQueryTransformation(query)
+)
+{{< /highlight >}}
+
+#### Write disposition
+Define the write behaviour based on the table where data will be written to by specifying the `.withWriteDisposition(...)` option for the `write()` operation. The following values are supported:
+
+- APPEND - Default behaviour. Written data is added to the existing rows in the table,
+
+- EMPTY - The target table must be empty; otherwise, the write operation fails,
+
+- TRUNCATE - The write operation deletes all rows from the target table before writing to it.
+
+Example of usage:
+{{< highlight java >}}
+data.apply(
+ SnowflakeIO.<~>write()
+ .withDataSourceConfiguration(dc)
+ .to("MY_TABLE")
+ .withStagingBucketName("BUCKET NAME")
+ .withStorageIntegrationName("STORAGE INTEGRATION NAME")
+ .withUserDataMapper(mapper)
+ .withWriteDisposition(TRUNCATE)
+)
+{{< /highlight >}}
+
+#### Create disposition
+The `.withCreateDisposition()` option defines the behavior of the write operation if the target table does not exist. The following values are supported:
+
+- CREATE_IF_NEEDED - default behaviour. The write operation checks whether the specified target table exists; if it does not, the write operation attempts to create the table. Specify the schema for the target table using the `.withTableSchema()` option.
+
+- CREATE_NEVER - The write operation fails if the target table does not exist.
+
+Usage:
+{{< highlight java >}}
+data.apply(
+ SnowflakeIO.<~>write()
+ .withDataSourceConfiguration(dc)
+ .to("MY_TABLE")
+ .withStagingBucketName("BUCKET NAME")
+ .withStorageIntegrationName("STORAGE INTEGRATION NAME")
+ .withUserDataMapper(mapper)
+ .withCreateDisposition(CREATE_NEVER)
+)
+{{< /highlight >}}
+
+#### Table schema disposition
+When the `.withCreateDisposition()` option is set to `CREATE_IF_NEEDED`, the `.withTableSchema()` option enables specifying the schema for the created target table.
+A table schema is a list of `SFColumn` objects with name and type corresponding to column type for each column in the table.
+
+Usage:
+{{< highlight java >}}
+SFTableSchema tableSchema =
+ new SFTableSchema(
+ SFColumn.of("my_date", new SFDate(), true),
+ new SFColumn("id", new SFNumber()),
+ SFColumn.of("name", new SFText(), true));
+
+data.apply(
+ SnowflakeIO.<~>write()
+ .withDataSourceConfiguration(dc)
+ .to("MY_TABLE")
+ .withStagingBucketName("BUCKET NAME")
+ .withStorageIntegrationName("STORAGE INTEGRATION NAME")
+ .withUserDataMapper(mapper)
+ .withTableSchema(tableSchema)
+)
+{{< /highlight >}}
+## Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/javadoc/2.17.0/org/apache/beam/sdk/values/PCollection.html) of user-defined data type.
+
+### General usage
+
+The basic `.read()` operation usage:
+{{< highlight java >}}
+PCollection<USER_DATA_TYPE> items = pipeline.apply(
+ SnowflakeIO.<USER_DATA_TYPE>read()
+ .withDataSourceConfiguration(dc)
+ .fromTable("MY_TABLE") // or .fromQuery("QUERY")
+ .withStagingBucketName("BUCKET NAME")
+ .withStorageIntegrationName("STORAGE INTEGRATION NAME")
+ .withCsvMapper(mapper)
+ .withCoder(coder)
+);
+{{< /highlight >}}
+Where all below parameters are required:
+
+- `.withDataSourceConfiguration(...)`
+ - Accepts a DataSourceConfiguration object.
+
+- `.fromTable(...) or .fromQuery(...)`
+ - Specifies a Snowflake table name or custom SQL query.
+
+- `.withStagingBucketName()`
+ - Accepts a cloud bucket name.
+
+- `.withStorageIntegrationName()`
+ - Accepts a name of a Snowflake storage integration object created according to Snowflake documentation. Example:
+{{< highlight >}}
+CREATE OR REPLACE STORAGE INTEGRATION test_integration
+TYPE = EXTERNAL_STAGE
+STORAGE_PROVIDER = GCS
+ENABLED = TRUE
+STORAGE_ALLOWED_LOCATIONS = ('gcs://bucket/');
+{{< /highlight >}}
+Then:
+{{< highlight >}}
+.withStorageIntegrationName(test_integration)
+{{< /highlight >}}
+
+- `.withCsvMapper(mapper)`
+ - Accepts a [CSVMapper](https://beam.apache.org/documentation/io/built-in/snowflake/#csvmapper) instance for mapping String[] to USER_DATA_TYPE.
+- `.withCoder(coder)`
+ - Accepts the [Coder](https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/coders/Coder.html) for USER_DATA_TYPE.
+
+**Note**:
+SnowflakeIO uses COPY statements behind the scenes to read (using [COPY to location](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)) files staged in cloud storage. StagingBucketName will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once the Read operation finishes.
+
+### CSVMapper
+SnowflakeIO uses a [`COPY INTO <location>`](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library.
+
+The CSVMapper’s job is to give the user the possibility to convert the array of Strings to a user-defined type, e.g. GenericRecord for Avro or Parquet files, or a custom POJO.
+
+Example implementation of CsvMapper for GenericRecord:
+{{< highlight java >}}
+static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
+ return (SnowflakeIO.CsvMapper<GenericRecord>)
+ parts -> {
+ return new GenericRecordBuilder(PARQUET_SCHEMA)
+ .set("ID", Long.valueOf(parts[0]))
+ .set("NAME", parts[1])
+ [...]
+ .build();
+ };
+}
+{{< /highlight >}}
\ No newline at end of file
diff --git a/website/www/site/content/en/documentation/patterns/ai-platform.md b/website/www/site/content/en/documentation/patterns/ai-platform.md
index 905f895..b2a7b10 100644
--- a/website/www/site/content/en/documentation/patterns/ai-platform.md
+++ b/website/www/site/content/en/documentation/patterns/ai-platform.md
@@ -35,7 +35,7 @@
{{< /highlight >}}
{{< highlight java >}}
-// Java examples will be available on Beam 2.23 release.
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" NlpAnalyzeText >}}
{{< /highlight >}}
@@ -79,7 +79,7 @@
{{< /highlight >}}
{{< highlight java >}}
-// Java examples will be available on Beam 2.23 release.
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" NlpExtractSentiments >}}
{{< /highlight >}}
The snippet loops over `sentences` and, for each sentence, extracts the sentiment score.
@@ -99,7 +99,7 @@
{{< /highlight >}}
{{< highlight java >}}
-// Java examples will be available on Beam 2.23 release.
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" NlpExtractEntities >}}
{{< /highlight >}}
Entities can be found in `entities` attribute. Just like before, `entities` is a sequence, that's why list comprehension is a viable choice. The most tricky part is interpreting the types of entities. Natural Language API defines entity types as enum. In a response object, entity types are returned as integers. That's why a user has to instantiate `naturallanguageml.enums.Entity.Type` to access a human-readable name.
@@ -119,7 +119,7 @@
{{< /highlight >}}
{{< highlight java >}}
-// Java examples will be available on Beam 2.23 release.
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" NlpAnalyzeDependencyTree >}}
{{< /highlight >}}
The output is below. For better readability, indexes are replaced by text which they refer to:
diff --git a/website/www/site/content/en/documentation/sdks/python-type-safety.md b/website/www/site/content/en/documentation/sdks/python-type-safety.md
index 074460c..755f795 100644
--- a/website/www/site/content/en/documentation/sdks/python-type-safety.md
+++ b/website/www/site/content/en/documentation/sdks/python-type-safety.md
@@ -71,7 +71,7 @@
Using Annotations has the added benefit of allowing use of a static type checker (such as mypy) to additionally type check your code.
If you already use a type checker, using annotations instead of decorators reduces code duplication.
However, annotations do not cover all the use cases that decorators and inline declarations do.
-Two such are the `expand` of a composite transform and lambda functions.
+For instance, they do not work for lambda functions.
### Declaring Type Hints Using Type Annotations
@@ -82,6 +82,7 @@
Annotations are currently supported on:
- `process()` methods on `DoFn` subclasses.
+ - `expand()` methods on `PTransform` subclasses.
- Functions passed to: `ParDo`, `Map`, `FlatMap`, `Filter`.
The following code declares an `int` input and a `str` output type hint on the `to_id` transform, using annotations on `my_fn`.
@@ -90,6 +91,15 @@
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test_py3.py" type_hints_map_annotations >}}
{{< /highlight >}}
+The following code demonstrates how to use annotations on `PTransform` subclasses.
+A valid annotation is a `PCollection` that wraps an internal (nested) type, `PBegin`, `PDone`, or `None`.
+The following code declares type hints on a custom `PTransform` that takes a `PCollection[int]` input
+and outputs a `PCollection[str]`, using annotations.
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test_py3.py" type_hints_ptransforms >}}
+{{< /highlight >}}
+
The following code declares `int` input and output type hints on `filter_evens`, using annotations on `FilterEvensDoFn.process`.
Since `process` returns a generator, the output type for a DoFn producing a `PCollection[int]` is annotated as `Iterable[int]` (`Generator[int, None, None]` would also work here).
Beam will remove the outer iterable of the return type on the `DoFn.process` method and functions passed to `FlatMap` to deduce the element type of resulting PCollection .
@@ -182,6 +192,7 @@
* `Iterable[T]`
* `Iterator[T]`
* `Generator[T]`
+* `PCollection[T]`
**Note:** The `Tuple[T, U]` type hint is a tuple with a fixed number of heterogeneously typed elements, while the `Tuple[T, ...]` type hint is a tuple with a variable of homogeneously typed elements.
diff --git a/website/www/site/data/io_matrix.yaml b/website/www/site/data/io_matrix.yaml
index 5923a86..84637c6 100644
--- a/website/www/site/data/io_matrix.yaml
+++ b/website/www/site/data/io_matrix.yaml
@@ -305,6 +305,7 @@
url: https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.datastore.v1new.datastoreio.html
- transform: SnowflakeIO
description: Experimental Transforms for reading from and writing to [Snowflake](https://www.snowflake.com/).
+ docs: /documentation/io/built-in/snowflake
implementations:
- language: java
name: org.apache.beam.sdk.io.snowflake.SnowflakeIO
diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html
index c8d0da9..fd35a21bd 100644
--- a/website/www/site/layouts/partials/section-menu/en/documentation.html
+++ b/website/www/site/layouts/partials/section-menu/en/documentation.html
@@ -74,6 +74,7 @@
<li><a href="/documentation/io/built-in/hadoop/">Hadoop Input/Output Format IO</a></li>
<li><a href="/documentation/io/built-in/hcatalog/">HCatalog IO</a></li>
<li><a href="/documentation/io/built-in/google-bigquery/">Google BigQuery I/O connector</a></li>
+ <li><a href="/documentation/io/built-in/snowflake/">Snowflake I/O connector</a></li>
</ul>
</li>