Merge pull request #12480: [BEAM-10647] Fixes get_query_location bug in BigQueryWrapper

diff --git a/CHANGES.md b/CHANGES.md
index 66dc68c..c3b7959 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -70,6 +70,7 @@
 
 * Shared library for simplifying management of large shared objects added to Python SDK. Example use case is sharing a large TF model object across threads ([BEAM-10417](https://issues.apache.org/jira/browse/BEAM-10417)).
 * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle ([BEAM-8543](https://issues.apache.org/jira/browse/BEAM-8543)).
 * OnTimerContext should not create a new one when processing each element/timer in FnApiDoFnRunner ([BEAM-9839](https://issues.apache.org/jira/browse/BEAM-9839))
 
 ## Breaking Changes
diff --git a/build.gradle b/build.gradle
index 93ca237..7f93fe2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -247,7 +247,6 @@
 }
 
 task python2PostCommit() {
-  dependsOn ":sdks:python:test-suites:portable:py2:crossLanguagePythonJavaKafkaIOFlink"
   dependsOn ":sdks:python:test-suites:portable:py2:crossLanguageTests"
   dependsOn ":sdks:python:test-suites:dataflow:py2:postCommitIT"
   dependsOn ":sdks:python:test-suites:direct:py2:directRunnerIT"
@@ -275,7 +274,6 @@
 }
 
 task python38PostCommit() {
-  dependsOn ":sdks:python:test-suites:portable:py38:crossLanguagePythonJavaKafkaIOFlink"
   dependsOn ":sdks:python:test-suites:dataflow:py38:postCommitIT"
   dependsOn ":sdks:python:test-suites:direct:py38:postCommitIT"
   dependsOn ":sdks:python:test-suites:direct:py38:hdfsIntegrationTest"
diff --git a/examples/java/build.gradle b/examples/java/build.gradle
index 0c91573..eaad35f 100644
--- a/examples/java/build.gradle
+++ b/examples/java/build.gradle
@@ -52,6 +52,7 @@
   compile project(path: ":sdks:java:core", configuration: "shadow")
   compile project(":sdks:java:extensions:google-cloud-platform-core")
   compile project(":sdks:java:io:google-cloud-platform")
+  compile project(":sdks:java:extensions:ml")
   compile library.java.avro
   compile library.java.bigdataoss_util
   compile library.java.google_api_client
@@ -60,6 +61,7 @@
   compile library.java.google_auth_library_credentials
   compile library.java.google_auth_library_oauth2_http
   compile library.java.google_cloud_datastore_v1_proto_client
+  compile library.java.google_code_gson
   compile library.java.google_http_client
   compile library.java.joda_time
   compile library.java.proto_google_cloud_datastore_v1
@@ -67,6 +69,7 @@
   compile library.java.slf4j_jdk14
   runtime project(path: ":runners:direct-java", configuration: "shadow")
   testCompile project(":sdks:java:io:google-cloud-platform")
+  testCompile project(":sdks:java:extensions:ml")
   testCompile library.java.hamcrest_core
   testCompile library.java.hamcrest_library
   testCompile library.java.junit
diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index 3393e6d..cb55475 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -22,6 +22,14 @@
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
+import com.google.cloud.language.v1.AnnotateTextRequest;
+import com.google.cloud.language.v1.AnnotateTextResponse;
+import com.google.cloud.language.v1.Document;
+import com.google.cloud.language.v1.Entity;
+import com.google.cloud.language.v1.Sentence;
+import com.google.cloud.language.v1.Token;
+import com.google.gson.Gson;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Base64;
@@ -30,12 +38,14 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.extensions.ml.AnnotateText;
 import org.apache.beam.sdk.io.Compression;
 import org.apache.beam.sdk.io.FileIO;
 import org.apache.beam.sdk.io.GenerateSequence;
@@ -63,6 +73,7 @@
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.PeriodicImpulse;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.Watch;
@@ -984,4 +995,167 @@
       return result;
     }
   }
+
+  public static class NaturalLanguageIntegration {
+    private static final SerializableFunction<AnnotateTextResponse, List<Map<String, List<String>>>>
+        // [START NlpAnalyzeDependencyTree]
+        analyzeDependencyTree =
+            (SerializableFunction<AnnotateTextResponse, List<Map<String, List<String>>>>)
+                response -> {
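+                  // For each sentence, build an adjacency map from a head token's text to the
+                  // texts of the tokens that depend on it.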
+                  List<Map<String, List<String>>> adjacencyLists = new ArrayList<>();
+                  int index = 0;
+                  for (Sentence s : response.getSentencesList()) {
+                    Map<String, List<String>> adjacencyMap = new HashMap<>();
+                    int sentenceBegin = s.getText().getBeginOffset();
+                    int sentenceEnd = sentenceBegin + s.getText().getContent().length() - 1;
+                    while (index < response.getTokensCount()
+                        && response.getTokens(index).getText().getBeginOffset() <= sentenceEnd) {
+                      Token token = response.getTokensList().get(index);
+                      int headTokenIndex = token.getDependencyEdge().getHeadTokenIndex();
+                      String headTokenContent =
+                          response.getTokens(headTokenIndex).getText().getContent();
+                      List<String> adjacencyList =
+                          adjacencyMap.getOrDefault(headTokenContent, new ArrayList<>());
+                      adjacencyList.add(token.getText().getContent());
+                      adjacencyMap.put(headTokenContent, adjacencyList);
+                      index++;
+                    }
+                    adjacencyLists.add(adjacencyMap);
+                  }
+                  return adjacencyLists;
+                };
+    // [END NlpAnalyzeDependencyTree]
+
+    private static final SerializableFunction<? super AnnotateTextResponse, TextSentiments>
+        // [START NlpExtractSentiments]
+        extractSentiments =
+        (SerializableFunction<AnnotateTextResponse, TextSentiments>)
+            annotateTextResponse -> {
+              TextSentiments sentiments = new TextSentiments();
+              sentiments.setDocumentSentiment(
+                  annotateTextResponse.getDocumentSentiment().getMagnitude());
+              Map<String, Float> sentenceSentimentsMap =
+                  annotateTextResponse.getSentencesList().stream()
+                      .collect(
+                          Collectors.toMap(
+                              (Sentence s) -> s.getText().getContent(),
+                              (Sentence s) -> s.getSentiment().getMagnitude()));
+              sentiments.setSentenceSentiments(sentenceSentimentsMap);
+              return sentiments;
+            };
+    // [END NlpExtractSentiments]
+
+    private static final SerializableFunction<? super AnnotateTextResponse, Map<String, String>>
+        // [START NlpExtractEntities]
+        extractEntities =
+        (SerializableFunction<AnnotateTextResponse, Map<String, String>>)
+            annotateTextResponse ->
+                annotateTextResponse.getEntitiesList().stream()
+                    .collect(
+                        Collectors.toMap(Entity::getName, (Entity e) -> e.getType().toString()));
+    // [END NlpExtractEntities]
+
+    private static final SerializableFunction<? super Map<String, String>, String>
+        mapEntitiesToJson =
+            (SerializableFunction<Map<String, String>, String>)
+                item -> {
+                  StringBuilder builder = new StringBuilder("[");
+                  builder.append(
+                      item.entrySet().stream()
+                          .map(
+                              entry -> "{\"" + entry.getKey() + "\": \"" + entry.getValue() + "\"}")
+                          .collect(Collectors.joining(",")));
+                  builder.append("]");
+                  return builder.toString();
+                };
+
+    private static final SerializableFunction<List<Map<String, List<String>>>, String>
+        mapDependencyTreesToJson =
+            (SerializableFunction<List<Map<String, List<String>>>, String>)
+                tree -> {
+                  Gson gson = new Gson();
+                  return gson.toJson(tree);
+                };
+
+    public static void main(Pipeline p) {
+      // [START NlpAnalyzeText]
+      AnnotateTextRequest.Features features =
+          AnnotateTextRequest.Features.newBuilder()
+              .setExtractEntities(true)
+              .setExtractDocumentSentiment(true)
+              .setExtractEntitySentiment(true)
+              .setExtractSyntax(true)
+              .build();
+      AnnotateText annotateText = AnnotateText.newBuilder().setFeatures(features).build();
+
+      PCollection<AnnotateTextResponse> responses =
+          p.apply(
+                  Create.of(
+                      "My experience so far has been fantastic, "
+                          + "I\'d really recommend this product."))
+              .apply(
+                  MapElements.into(TypeDescriptor.of(Document.class))
+                      .via(
+                          (SerializableFunction<String, Document>)
+                              input ->
+                                  Document.newBuilder()
+                                      .setContent(input)
+                                      .setType(Document.Type.PLAIN_TEXT)
+                                      .build()))
+              .apply(annotateText);
+
+      responses
+          .apply(MapElements.into(TypeDescriptor.of(TextSentiments.class)).via(extractSentiments))
+          .apply(
+              MapElements.into(TypeDescriptors.strings())
+                  .via((SerializableFunction<TextSentiments, String>) TextSentiments::toJson))
+          .apply(TextIO.write().to("sentiments.txt"));
+
+      responses
+          .apply(
+              MapElements.into(
+                      TypeDescriptors.maps(TypeDescriptors.strings(), TypeDescriptors.strings()))
+                  .via(extractEntities))
+          .apply(MapElements.into(TypeDescriptors.strings()).via(mapEntitiesToJson))
+          .apply(TextIO.write().to("entities.txt"));
+
+      responses
+          .apply(
+              MapElements.into(
+                      TypeDescriptors.lists(
+                          TypeDescriptors.maps(
+                              TypeDescriptors.strings(),
+                              TypeDescriptors.lists(TypeDescriptors.strings()))))
+                  .via(analyzeDependencyTree))
+          .apply(MapElements.into(TypeDescriptors.strings()).via(mapDependencyTreesToJson))
+          .apply(TextIO.write().to("adjacency_list.txt"));
+      // [END NlpAnalyzeText]
+    }
+
+    private static class TextSentiments implements Serializable {
+      private Float documentSentiment;
+      private Map<String, Float> sentenceSentiments;
+
+      public void setSentenceSentiments(Map<String, Float> sentenceSentiments) {
+        this.sentenceSentiments = sentenceSentiments;
+      }
+
+      public Float getDocumentSentiment() {
+        return documentSentiment;
+      }
+
+      public void setDocumentSentiment(Float documentSentiment) {
+        this.documentSentiment = documentSentiment;
+      }
+
+      public Map<String, Float> getSentenceSentiments() {
+        return sentenceSentiments;
+      }
+
+      public String toJson() {
+        Gson gson = new Gson();
+        return gson.toJson(this);
+      }
+    }
+  }
 }
diff --git a/learning/katas/go/core_transforms/section-info.yaml b/learning/katas/go/core_transforms/section-info.yaml
index 32ae3aa..bcdab5d 100644
--- a/learning/katas/go/core_transforms/section-info.yaml
+++ b/learning/katas/go/core_transforms/section-info.yaml
@@ -28,3 +28,4 @@
 - additional_outputs
 - branching
 - composite
+- windowing
diff --git a/learning/katas/go/core_transforms/windowing/lesson-info.yaml b/learning/katas/go/core_transforms/windowing/lesson-info.yaml
new file mode 100644
index 0000000..2315b4c
--- /dev/null
+++ b/learning/katas/go/core_transforms/windowing/lesson-info.yaml
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+content:
+- windowing
diff --git a/learning/katas/go/core_transforms/windowing/lesson-remote-info.yaml b/learning/katas/go/core_transforms/windowing/lesson-remote-info.yaml
new file mode 100644
index 0000000..7a20a3a
--- /dev/null
+++ b/learning/katas/go/core_transforms/windowing/lesson-remote-info.yaml
@@ -0,0 +1,3 @@
+id: 387853
+update_date: Thu, 06 Aug 2020 17:53:20 UTC
+unit: 377026
diff --git a/learning/katas/go/core_transforms/windowing/windowing/cmd/main.go b/learning/katas/go/core_transforms/windowing/windowing/cmd/main.go
new file mode 100644
index 0000000..e1cfcf0
--- /dev/null
+++ b/learning/katas/go/core_transforms/windowing/windowing/cmd/main.go
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"beam.apache.org/learning/katas/core_transforms/windowing/windowing/pkg/common"
+	"beam.apache.org/learning/katas/core_transforms/windowing/windowing/pkg/task"
+	"context"
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/debug"
+)
+
+func main() {
+	ctx := context.Background()
+
+	p, s := beam.NewPipelineWithRoot()
+
+	input := common.CreateLines(s)
+
+	result := task.ApplyTransform(s, input)
+
+	output := beam.ParDo(s, func(commit task.Commit) string {
+		return commit.String()
+	}, result)
+
+	debug.Print(s, output)
+
+	err := beamx.Run(ctx, p)
+
+	if err != nil {
+		log.Exitf(context.Background(), "Failed to execute job: %v", err)
+	}
+}
diff --git a/learning/katas/go/core_transforms/windowing/windowing/pkg/common/input.go b/learning/katas/go/core_transforms/windowing/windowing/pkg/common/input.go
new file mode 100644
index 0000000..3353e2b
--- /dev/null
+++ b/learning/katas/go/core_transforms/windowing/windowing/pkg/common/input.go
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package common
+
+import (
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+	"time"
+)
+
+var (
+	lines = map[time.Time]string{
+		time.Date(2020, 7, 31, 15, 52, 5, 0, time.UTC):  "3c6c45924a Remove trailing whitespace from README",
+		time.Date(2020, 7, 31, 15, 59, 40, 0, time.UTC): "a52be99b62 Merge pull request #12443 from KevinGG/whitespace",
+		time.Date(2020, 7, 31, 16, 7, 36, 0, time.UTC):  "7c1772d13f Merge pull request #12439 from ibzib/beam-9199-1",
+		time.Date(2020, 7, 31, 16, 35, 41, 0, time.UTC): "d971ba13b8 Widen ranges for GCP libraries (#12198)",
+		time.Date(2020, 8, 1, 0, 7, 25, 0, time.UTC):    "875620111b Enable all Jenkins jobs triggering for committers (#12407)",
+	}
+)
+
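+// CreateLines returns a PCollection of commit lines, each timestamped with its commit time.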
+func CreateLines(s beam.Scope) beam.PCollection {
+	return beam.ParDo(s, timestampFn, beam.Impulse(s))
+}
+
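+// timestampFn emits every commit line with the commit's time as the element's event timestamp.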
+func timestampFn(_ []byte, emit func(beam.EventTime, string)) {
+	for timestamp, line := range lines {
+		emit(mtime.FromTime(timestamp), line)
+	}
+}
diff --git a/learning/katas/go/core_transforms/windowing/windowing/pkg/task/task.go b/learning/katas/go/core_transforms/windowing/windowing/pkg/task/task.go
new file mode 100644
index 0000000..1838578
--- /dev/null
+++ b/learning/katas/go/core_transforms/windowing/windowing/pkg/task/task.go
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package task
+
+import (
+	"fmt"
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
+	"time"
+)
+
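+// ApplyTransform applies an hourly fixed-window strategy to the input and emits a Commit for
+// each element, pairing it with its window's maximum timestamp.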
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+	windowed := beam.WindowInto(s, window.NewFixedWindows(time.Hour), input)
+	return beam.ParDo(s, timestampFn, windowed)
+}
+
+func timestampFn(iw beam.Window, et beam.EventTime, line string) Commit {
+	return Commit{
+		MaxTimestampWindow: toTime(iw.MaxTimestamp()),
+		EventTimestamp:     toTime(et),
+		Line:               line,
+	}
+}
+
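+// toTime converts a Beam EventTime, in milliseconds since the Unix epoch, to a time.Time.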
+func toTime(et beam.EventTime) time.Time {
+	return time.Unix(0, et.Milliseconds()*int64(time.Millisecond))
+}
+
+type Commit struct {
+	MaxTimestampWindow time.Time
+	EventTimestamp     time.Time
+	Line               string
+}
+
+func (c Commit) String() string {
+	return fmt.Sprintf("Window ending at: %v contains timestamp: %v for commit: \"%s\"",
+		c.MaxTimestampWindow.Format(time.Kitchen),
+		c.EventTimestamp.Format(time.Kitchen),
+		c.Line)
+}
+
diff --git a/learning/katas/go/core_transforms/windowing/windowing/task-info.yaml b/learning/katas/go/core_transforms/windowing/windowing/task-info.yaml
new file mode 100644
index 0000000..b0e38f7
--- /dev/null
+++ b/learning/katas/go/core_transforms/windowing/windowing/task-info.yaml
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+type: edu
+files:
+- name: test/task_test.go
+  visible: false
+- name: cmd/main.go
+  visible: true
+- name: pkg/common/input.go
+  visible: true
+- name: pkg/task/task.go
+  visible: true
+  placeholders:
+  - offset: 1030
+    length: 60
+    placeholder_text: TODO()
+  - offset: 1099
+    length: 36
+    placeholder_text: TODO()
+  - offset: 1156
+    length: 46
+    placeholder_text: TODO()
+  - offset: 1221
+    length: 121
+    placeholder_text: TODO()
diff --git a/learning/katas/go/core_transforms/windowing/windowing/task-remote-info.yaml b/learning/katas/go/core_transforms/windowing/windowing/task-remote-info.yaml
new file mode 100644
index 0000000..235e663
--- /dev/null
+++ b/learning/katas/go/core_transforms/windowing/windowing/task-remote-info.yaml
@@ -0,0 +1,2 @@
+id: 1464828
+update_date: Thu, 06 Aug 2020 17:53:26 UTC
diff --git a/learning/katas/go/core_transforms/windowing/windowing/task.md b/learning/katas/go/core_transforms/windowing/windowing/task.md
new file mode 100644
index 0000000..22444a6
--- /dev/null
+++ b/learning/katas/go/core_transforms/windowing/windowing/task.md
@@ -0,0 +1,77 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+# Windowing
+
+This lesson introduces the concept of windowed PCollection elements.  A window is a view into a set of data 
+with a fixed beginning and a fixed end.  In the Beam model, windowing subdivides a PCollection according to the
+timestamps of its individual elements.  An element can be part of one or more windows.
+
+A DoFn can request timestamp and windowing information about the element it is processing.  This information was 
+available in all the previous lessons as well; this lesson makes use of these parameters.  The simple dataset 
+contains five git commit messages and their timestamps from the 
+[Apache Beam public repository](https://github.com/apache/beam).  Timestamps have been applied to the PCollection
+input according to the date and time of each message.
+
+**Kata:** This lesson challenges you to apply an hourly fixed window to a PCollection.  You then 
+apply a ParDo to that windowed PCollection to produce a PCollection of Commit structs.  The
+Commit struct is provided for you.  You are encouraged to run the pipeline at cmd/main.go of this task 
+to visualize the windowing and timestamps.
+
+<div class="hint">
+    Use <a href="https://godoc.org/github.com/apache/beam/sdks/go/pkg/beam#ParDo">
+    beam.ParDo</a>
+    with a <a href="https://godoc.org/github.com/apache/beam/sdks/go/pkg/beam#hdr-DoFns">
+    DoFn</a> to accomplish this lesson.
+</div>
+
+<div class="hint">
+    Use <a href="https://godoc.org/github.com/apache/beam/sdks/go/pkg/beam#WindowInto">
+    beam.WindowInto</a>
+    with <a href="https://godoc.org/github.com/apache/beam/sdks/go/pkg/beam/core/graph/window#NewFixedWindows">
+    window.NewFixedWindows(time.Hour)</a>
+    on your PCollection input to apply an hourly windowing strategy to each element.
+</div>
+
+<div class="hint">
+    To access <a href="https://godoc.org/github.com/apache/beam/sdks/go/pkg/beam#Window">
+    beam.Window</a>
+    and <a href="https://godoc.org/github.com/apache/beam/sdks/go/pkg/beam#EventTime">
+    beam.EventTime</a> in your DoFn, add the parameters in the order shown below.
+
+```
+func doFn(iw beam.Window, et beam.EventTime, element X) Y {
+    // do something with iw, et and element to return Y
+}
+```
+</div>
+
+<div class="hint">
+    The Commit struct provided for you has a MaxTimestampWindow property that can be set from
+    <a href="https://godoc.org/github.com/apache/beam/sdks/go/pkg/beam#Window">
+    beam.Window</a>'s MaxTimestamp().
+</div>
+
+<div class="hint">
+    Refer to the Beam Programming Guide for additional information about 
+    <a href="https://beam.apache.org/documentation/programming-guide/#other-dofn-parameters">
+    additional DoFn parameters</a> and
+    <a href="https://beam.apache.org/documentation/programming-guide/#windowing">
+    windowing</a>.
+</div>
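+
+<div class="hint">
+    Putting the hints together, a minimal sketch of the transform, using the Commit struct and 
+    toTime helper already provided in pkg/task/task.go:
+
+```
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+    // Subdivide the input into fixed hourly windows by element timestamp.
+    windowed := beam.WindowInto(s, window.NewFixedWindows(time.Hour), input)
+    return beam.ParDo(s, timestampFn, windowed)
+}
+
+func timestampFn(iw beam.Window, et beam.EventTime, line string) Commit {
+    return Commit{
+        MaxTimestampWindow: toTime(iw.MaxTimestamp()),
+        EventTimestamp:     toTime(et),
+        Line:               line,
+    }
+}
+```
+</div>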
+
diff --git a/learning/katas/go/core_transforms/windowing/windowing/test/task_test.go b/learning/katas/go/core_transforms/windowing/windowing/test/task_test.go
new file mode 100644
index 0000000..8d4a2e2
--- /dev/null
+++ b/learning/katas/go/core_transforms/windowing/windowing/test/task_test.go
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package test
+
+import (
+	"beam.apache.org/learning/katas/core_transforms/windowing/windowing/pkg/common"
+	"beam.apache.org/learning/katas/core_transforms/windowing/windowing/pkg/task"
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
+	"github.com/google/go-cmp/cmp"
+	"testing"
+	"time"
+)
+
+func TestApplyTransform(t *testing.T) {
+	p, s := beam.NewPipelineWithRoot()
+	tests := []struct {
+		input beam.PCollection
+		want  []interface{}
+	}{
+		{
+			input: common.CreateLines(s),
+			want: []interface{}{
+				task.Commit{
+					MaxTimestampWindow: time.Unix(1596211199, 0),
+					EventTimestamp:     time.Unix(1596210725, 0),
+					Line:               "3c6c45924a Remove trailing whitespace from README",
+				},
+				task.Commit{
+					MaxTimestampWindow: time.Unix(1596211199, 0),
+					EventTimestamp:     time.Unix(1596211180, 0),
+					Line:               "a52be99b62 Merge pull request #12443 from KevinGG/whitespace",
+				},
+				task.Commit{
+					MaxTimestampWindow: time.Unix(1596214799, 0),
+					EventTimestamp:     time.Unix(1596211656, 0),
+					Line:               "7c1772d13f Merge pull request #12439 from ibzib/beam-9199-1",
+				},
+				task.Commit{
+					MaxTimestampWindow: time.Unix(1596214799, 0),
+					EventTimestamp:     time.Unix(1596213341, 0),
+					Line:               "d971ba13b8 Widen ranges for GCP libraries (#12198)",
+				},
+				task.Commit{
+					MaxTimestampWindow: time.Unix(1596243599, 0),
+					EventTimestamp:     time.Unix(1596240445, 0),
+					Line:               "875620111b Enable all Jenkins jobs triggering for committers (#12407)",
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		got := task.ApplyTransform(s, tt.input)
+		passert.Equals(s, got, tt.want...)
+		if err := ptest.Run(p); err != nil {
+			t.Error(err)
+		}
+	}
+}
diff --git a/learning/katas/go/course-remote-info.yaml b/learning/katas/go/course-remote-info.yaml
index 90e7821..e944389 100644
--- a/learning/katas/go/course-remote-info.yaml
+++ b/learning/katas/go/course-remote-info.yaml
@@ -1,2 +1,2 @@
 id: 70387
-update_date: Mon, 27 Jul 2020 20:44:48 UTC
+update_date: Wed, 29 Jul 2020 20:42:36 UTC
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
new file mode 100644
index 0000000..ac724b3
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
@@ -0,0 +1,269 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package schema contains utility functions for relating Go types and Beam Schemas.
+//
+// Not all Go types can be converted to schemas, because Go is more expressive
+// than Beam schemas. Just as not all Go types can be serialized, not all Beam
+// Schemas will have a conversion to Go types until the correct mechanism
+// exists in the SDK to handle them.
+//
+// While efforts will be made to keep conversions reversible, this will not be
+// possible in all instances. E.g. Go arrays as fields will be converted to
+// Beam Arrays, but a Beam Array type will map by default to a Go slice.
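+//
+// A minimal usage sketch (the pair struct here is hypothetical, purely for
+// illustration):
+//
+//	type pair struct {
+//		Name  string `beam:"name"`
+//		Count int64  `beam:"count"`
+//	}
+//
+//	sch, _ := FromType(reflect.TypeOf(pair{})) // Go type to Beam schema proto
+//	rt, _ := ToType(sch)                       // schema proto back to a Go struct type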
+package schema
+
+import (
+	"fmt"
+	"reflect"
+	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+// FromType returns a Beam Schema of the passed in type.
+// Returns an error if the type cannot be converted to a Schema.
+func FromType(ot reflect.Type) (*pipepb.Schema, error) {
+	t := ot // keep the original type for errors.
+	// The top level schema for a pointer to struct and the struct is the same.
+	if t.Kind() == reflect.Ptr {
+		t = t.Elem()
+	}
+	if t.Kind() != reflect.Struct {
+		return nil, errors.Errorf("cannot convert %v to schema. FromType only converts structs to schemas", ot)
+	}
+	return structToSchema(t)
+}
+
+func structToSchema(t reflect.Type) (*pipepb.Schema, error) {
+	fields := make([]*pipepb.Field, 0, t.NumField())
+	for i := 0; i < t.NumField(); i++ {
+		f, err := structFieldToField(t.Field(i))
+		if err != nil {
+			return nil, errors.Wrapf(err, "cannot convert field %v to schema", t.Field(i).Name)
+		}
+		fields = append(fields, f)
+	}
+	return &pipepb.Schema{
+		Fields: fields,
+	}, nil
+}
+
+func structFieldToField(sf reflect.StructField) (*pipepb.Field, error) {
+	name := sf.Name
+	if tag := sf.Tag.Get("beam"); tag != "" {
+		name, _ = parseTag(tag)
+	}
+	ftype, err := reflectTypeToFieldType(sf.Type)
+	if err != nil {
+		return nil, err
+	}
+	return &pipepb.Field{
+		Name: name,
+		Type: ftype,
+	}, nil
+}
+
+func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, error) {
+	var isPtr bool
+	t := ot
+	if t.Kind() == reflect.Ptr {
+		isPtr = true
+		t = t.Elem()
+	}
+	switch t.Kind() {
+	case reflect.Map:
+		kt, err := reflectTypeToFieldType(t.Key())
+		if err != nil {
+			return nil, errors.Wrapf(err, "unable to convert key of %v to schema field", ot)
+		}
+		vt, err := reflectTypeToFieldType(t.Elem())
+		if err != nil {
+			return nil, errors.Wrapf(err, "unable to convert value of %v to schema field", ot)
+		}
+		return &pipepb.FieldType{
+			Nullable: isPtr,
+			TypeInfo: &pipepb.FieldType_MapType{
+				MapType: &pipepb.MapType{
+					KeyType:   kt,
+					ValueType: vt,
+				},
+			},
+		}, nil
+	case reflect.Struct:
+		sch, err := structToSchema(t)
+		if err != nil {
+			return nil, errors.Wrapf(err, "unable to convert %v to schema field", ot)
+		}
+		return &pipepb.FieldType{
+			Nullable: isPtr,
+			TypeInfo: &pipepb.FieldType_RowType{
+				RowType: &pipepb.RowType{
+					Schema: sch,
+				},
+			},
+		}, nil
+	case reflect.Slice, reflect.Array:
+		// Special handling for []byte
+		if t == reflectx.ByteSlice {
+			return &pipepb.FieldType{
+				Nullable: isPtr,
+				TypeInfo: &pipepb.FieldType_AtomicType{
+					AtomicType: pipepb.AtomicType_BYTES,
+				},
+			}, nil
+		}
+		vt, err := reflectTypeToFieldType(t.Elem())
+		if err != nil {
+			return nil, errors.Wrapf(err, "unable to convert element type of %v to schema field", ot)
+		}
+		return &pipepb.FieldType{
+			Nullable: isPtr,
+			TypeInfo: &pipepb.FieldType_ArrayType{
+				ArrayType: &pipepb.ArrayType{
+					ElementType: vt,
+				},
+			},
+		}, nil
+	case reflect.Interface, reflect.Chan, reflect.UnsafePointer, reflect.Complex128, reflect.Complex64:
+		return nil, errors.Errorf("unable to convert unsupported type %v to schema", ot)
+	default: // must be an atomic type
+		if enum, ok := reflectTypeToAtomicTypeMap[t.Kind()]; ok {
+			return &pipepb.FieldType{
+				Nullable: isPtr,
+				TypeInfo: &pipepb.FieldType_AtomicType{
+					AtomicType: enum,
+				},
+			}, nil
+		}
+		return nil, errors.Errorf("unable to map %v to pipepb.AtomicType", t)
+	}
+}
+
+var reflectTypeToAtomicTypeMap = map[reflect.Kind]pipepb.AtomicType{
+	reflect.Uint8:   pipepb.AtomicType_BYTE,
+	reflect.Int16:   pipepb.AtomicType_INT16,
+	reflect.Int32:   pipepb.AtomicType_INT32,
+	reflect.Int64:   pipepb.AtomicType_INT64,
+	reflect.Int:     pipepb.AtomicType_INT64,
+	reflect.Float32: pipepb.AtomicType_FLOAT,
+	reflect.Float64: pipepb.AtomicType_DOUBLE,
+	reflect.String:  pipepb.AtomicType_STRING,
+	reflect.Bool:    pipepb.AtomicType_BOOLEAN,
+}
+
+// ToType returns a Go type of the passed in Schema.
+// Types returned by ToType are always of Struct kind.
+// Returns an error if the Schema cannot be converted to a type.
+func ToType(s *pipepb.Schema) (reflect.Type, error) {
+	fields := make([]reflect.StructField, 0, len(s.GetFields()))
+	for _, sf := range s.GetFields() {
+		rf, err := fieldToStructField(sf)
+		if err != nil {
+			return nil, errors.Wrapf(err, "cannot convert schema field %v to field", sf.GetName())
+		}
+		fields = append(fields, rf)
+	}
+	return reflect.StructOf(fields), nil
+}
+
+func fieldToStructField(sf *pipepb.Field) (reflect.StructField, error) {
+	name := sf.GetName()
+	rt, err := fieldTypeToReflectType(sf.GetType())
+	if err != nil {
+		return reflect.StructField{}, err
+	}
+	return reflect.StructField{
+		Name: strings.ToUpper(name[:1]) + name[1:], // Go field name must be capitalized for export and encoding.
+		Type: rt,
+		Tag:  reflect.StructTag(fmt.Sprintf("beam:\"%s\"", name)),
+	}, nil
+}
+
+var atomicTypeToReflectType = map[pipepb.AtomicType]reflect.Type{
+	pipepb.AtomicType_BYTE:    reflectx.Uint8,
+	pipepb.AtomicType_INT16:   reflectx.Int16,
+	pipepb.AtomicType_INT32:   reflectx.Int32,
+	pipepb.AtomicType_INT64:   reflectx.Int64,
+	pipepb.AtomicType_FLOAT:   reflectx.Float32,
+	pipepb.AtomicType_DOUBLE:  reflectx.Float64,
+	pipepb.AtomicType_STRING:  reflectx.String,
+	pipepb.AtomicType_BOOLEAN: reflectx.Bool,
+	pipepb.AtomicType_BYTES:   reflectx.ByteSlice,
+}
+
+func fieldTypeToReflectType(sft *pipepb.FieldType) (reflect.Type, error) {
+	var t reflect.Type
+	switch sft.GetTypeInfo().(type) {
+	case *pipepb.FieldType_AtomicType:
+		var ok bool
+		if t, ok = atomicTypeToReflectType[sft.GetAtomicType()]; !ok {
+			return nil, errors.Errorf("unknown atomic type: %v", sft.GetAtomicType())
+		}
+	case *pipepb.FieldType_ArrayType:
+		rt, err := fieldTypeToReflectType(sft.GetArrayType().GetElementType())
+		if err != nil {
+			return nil, errors.Wrap(err, "unable to convert array element type")
+		}
+		t = reflect.SliceOf(rt)
+	case *pipepb.FieldType_MapType:
+		kt, err := fieldTypeToReflectType(sft.GetMapType().GetKeyType())
+		if err != nil {
+			return nil, errors.Wrap(err, "unable to convert map key type")
+		}
+		vt, err := fieldTypeToReflectType(sft.GetMapType().GetValueType())
+		if err != nil {
+			return nil, errors.Wrap(err, "unable to convert map value type")
+		}
+		t = reflect.MapOf(kt, vt) // Panics for invalid map keys (slices/iterables)
+	case *pipepb.FieldType_RowType:
+		rt, err := ToType(sft.GetRowType().GetSchema())
+		if err != nil {
+			return nil, errors.Wrapf(err, "unable to convert row type: %v", sft.GetRowType().GetSchema().GetId())
+		}
+		t = rt
+	// case *pipepb.FieldType_IterableType:
+	// TODO(BEAM-9615): handle IterableTypes.
+
+	// case *pipepb.FieldType_LogicalType:
+	// TODO(BEAM-9615): handle LogicalTypes types.
+
+	// Logical types are for values that already have a more specialized user
+	// representation, such as time types or protocol buffers.
+	// They would be encoded with the schema encoding.
+
+	default:
+		return nil, errors.Errorf("unknown fieldtype: %T", sft.GetTypeInfo())
+	}
+	if sft.GetNullable() {
+		return reflect.PtrTo(t), nil
+	}
+	return t, nil
+}
+
+// parseTag splits a struct field's beam tag into its name and
+// comma-separated options.
+func parseTag(tag string) (string, options) {
+	if idx := strings.Index(tag, ","); idx != -1 {
+		return tag[:idx], options(tag[idx+1:])
+	}
+	return tag, options("")
+}
+
+type options string
+
+// TODO(BEAM-9615): implement looking up specific options from the tags.
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
new file mode 100644
index 0000000..f6b5ff7
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schema
+
+import (
+	"fmt"
+	"reflect"
+	"testing"
+
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"github.com/google/go-cmp/cmp"
+)
+
+func TestSchemaConversion(t *testing.T) {
+	tests := []struct {
+		st *pipepb.Schema
+		rt reflect.Type
+	}{
+		{
+			st: &pipepb.Schema{
+				Fields: []*pipepb.Field{
+					&pipepb.Field{
+						Name: "firstField",
+						Type: &pipepb.FieldType{
+							TypeInfo: &pipepb.FieldType_AtomicType{
+								AtomicType: pipepb.AtomicType_INT32,
+							},
+						},
+					},
+				},
+			},
+			rt: reflect.TypeOf(struct {
+				FirstField int32 `beam:"firstField"`
+			}{}),
+		}, {
+			st: &pipepb.Schema{
+				Fields: []*pipepb.Field{
+					&pipepb.Field{
+						Name: "stringField",
+						Type: &pipepb.FieldType{
+							TypeInfo: &pipepb.FieldType_AtomicType{
+								AtomicType: pipepb.AtomicType_STRING,
+							},
+						},
+					},
+					&pipepb.Field{
+						Name: "intPtrField",
+						Type: &pipepb.FieldType{
+							Nullable: true,
+							TypeInfo: &pipepb.FieldType_AtomicType{
+								AtomicType: pipepb.AtomicType_INT32,
+							},
+						},
+					},
+				},
+			},
+			rt: reflect.TypeOf(struct {
+				StringField string `beam:"stringField"`
+				IntPtrField *int32 `beam:"intPtrField"`
+			}{}),
+		}, {
+			st: &pipepb.Schema{
+				Fields: []*pipepb.Field{
+					&pipepb.Field{
+						Name: "cypher",
+						Type: &pipepb.FieldType{
+							TypeInfo: &pipepb.FieldType_MapType{
+								MapType: &pipepb.MapType{
+									KeyType: &pipepb.FieldType{
+										TypeInfo: &pipepb.FieldType_AtomicType{
+											AtomicType: pipepb.AtomicType_BOOLEAN,
+										},
+									},
+									ValueType: &pipepb.FieldType{
+										TypeInfo: &pipepb.FieldType_AtomicType{
+											AtomicType: pipepb.AtomicType_FLOAT,
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+			rt: reflect.TypeOf(struct {
+				Cypher map[bool]float32 `beam:"cypher"`
+			}{}),
+		}, {
+			st: &pipepb.Schema{
+				Fields: []*pipepb.Field{
+					&pipepb.Field{
+						Name: "wrapper",
+						Type: &pipepb.FieldType{
+							TypeInfo: &pipepb.FieldType_RowType{
+								RowType: &pipepb.RowType{
+									Schema: &pipepb.Schema{
+										Fields: []*pipepb.Field{{
+											Name: "threshold",
+											Type: &pipepb.FieldType{
+												TypeInfo: &pipepb.FieldType_AtomicType{
+													AtomicType: pipepb.AtomicType_DOUBLE,
+												},
+											},
+										},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+			rt: reflect.TypeOf(struct {
+				Wrapper struct {
+					Threshold float64 `beam:"threshold"`
+				} `beam:"wrapper"`
+			}{}),
+		}, {
+			st: &pipepb.Schema{
+				Fields: []*pipepb.Field{
+					&pipepb.Field{
+						Name: "payloads",
+						Type: &pipepb.FieldType{
+							TypeInfo: &pipepb.FieldType_ArrayType{
+								ArrayType: &pipepb.ArrayType{
+									ElementType: &pipepb.FieldType{
+										TypeInfo: &pipepb.FieldType_AtomicType{
+											AtomicType: pipepb.AtomicType_BYTES,
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+			rt: reflect.TypeOf(struct {
+				Payloads [][]byte `beam:"payloads"`
+			}{}),
+		},
+	}
+
+	for _, test := range tests {
+		test := test
+		t.Run(fmt.Sprintf("%v", test.rt), func(t *testing.T) {
+			{
+				got, err := ToType(test.st)
+				if err != nil {
+					t.Fatalf("error ToType(%v) = %v", test.st, err)
+				}
+
+				if d := cmp.Diff(reflect.New(test.rt).Elem().Interface(), reflect.New(got).Elem().Interface()); d != "" {
+					t.Errorf("diff (-want, +got): %v", d)
+				}
+			}
+			{
+				got, err := FromType(test.rt)
+				if err != nil {
+					t.Fatalf("error FromType(%v) = %v", test.rt, err)
+				}
+
+				if d := cmp.Diff(test.st, got); d != "" {
+					t.Errorf("diff (-want, +got): %v", d)
+				}
+
+			}
+		})
+	}
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java
new file mode 100644
index 0000000..148c726
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A datetime without a time-zone.
+ *
+ * <p>It cannot represent an instant on the time-line without additional information such as an
+ * offset or time-zone.
+ *
+ * <p>Its input type is a {@link LocalDateTime}, and its base type is a {@link Row} containing a
+ * Date field and a Time field. The Date field is the same as the base type of {@link Date}: a
+ * Long representing an incrementing count of days, where day 0 is 1970-01-01 (ISO). The Time
+ * field is the same as the base type of {@link Time}: a Long representing a count of nanoseconds
+ * within the day.
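+ *
+ * <p>For example, {@code LocalDateTime.of(1970, 1, 2, 12, 0)} converts to a {@link Row} with a
+ * Date field of {@code 1L} (one day after 1970-01-01) and a Time field of
+ * {@code 43200000000000L} (noon, in nanoseconds of the day).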
+ */
+public class DateTime implements Schema.LogicalType<LocalDateTime, Row> {
+  public static final String DATE_FIELD_NAME = "Date";
+  public static final String TIME_FIELD_NAME = "Time";
+  public static final Schema DATETIME_SCHEMA =
+      Schema.builder().addInt64Field(DATE_FIELD_NAME).addInt64Field(TIME_FIELD_NAME).build();
+
+  @Override
+  public String getIdentifier() {
+    return "beam:logical_type:datetime:v1";
+  }
+
+  // unused
+  @Override
+  public Schema.FieldType getArgumentType() {
+    return Schema.FieldType.STRING;
+  }
+
+  // unused
+  @Override
+  public String getArgument() {
+    return "";
+  }
+
+  @Override
+  public Schema.FieldType getBaseType() {
+    return Schema.FieldType.row(DATETIME_SCHEMA);
+  }
+
+  @Override
+  public Row toBaseType(LocalDateTime input) {
+    return input == null
+        ? null
+        : Row.withSchema(DATETIME_SCHEMA)
+            .addValues(input.toLocalDate().toEpochDay(), input.toLocalTime().toNanoOfDay())
+            .build();
+  }
+
+  @Override
+  public LocalDateTime toInputType(Row base) {
+    return base == null
+        ? null
+        : LocalDateTime.of(
+            LocalDate.ofEpochDay(base.getInt64(DATE_FIELD_NAME)),
+            LocalTime.ofNanoOfDay(base.getInt64(TIME_FIELD_NAME)));
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java
index d22b77a..ef6a68a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java
@@ -18,8 +18,10 @@
 package org.apache.beam.sdk.schemas.logicaltypes;
 
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
 import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.values.Row;
 
 /** Beam {@link org.apache.beam.sdk.schemas.Schema.LogicalType}s corresponding to SQL data types. */
 public class SqlTypes {
@@ -31,4 +33,7 @@
 
   /** Beam LogicalType corresponding to ZetaSQL/CalciteSQL TIME type. */
   public static final LogicalType<LocalTime, Long> TIME = new Time();
+
+  /** Beam LogicalType corresponding to ZetaSQL DATETIME type. */
+  public static final LogicalType<LocalDateTime, Row> DATETIME = new DateTime();
 }
diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
index 4604a00..42efbbd 100644
--- a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
+++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
@@ -39,7 +39,7 @@
           .put("BOOL", FieldType.BOOLEAN)
           .put("BYTES", FieldType.BYTES)
           .put("DATE", FieldType.logicalType(SqlTypes.DATE))
-          .put("DATETIME", FieldType.DATETIME)
+          .put("DATETIME", FieldType.logicalType(SqlTypes.DATETIME))
           .put("DOUBLE", FieldType.DOUBLE)
           .put("FLOAT", FieldType.DOUBLE)
           .put("FLOAT64", FieldType.DOUBLE)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java
index 3ef4d9f..b4a9d7e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java
@@ -124,7 +124,7 @@
 
   @Override
   public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
-    return CalciteUtils.sqlTypeWithAutoCast(typeFactory, method.getReturnType());
+    return CalciteUtils.sqlTypeWithAutoCast(typeFactory, method.getGenericReturnType());
   }
 
   @Override
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java
index 1d234b0..4452422 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java
@@ -48,6 +48,8 @@
     switch (typeName) {
       case TIME:
         return 6; // support microsecond time precision
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        return 6; // support microsecond datetime precision
       default:
         return super.getMaxPrecision(typeName);
     }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
index b6d606b..d9ad401 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
@@ -27,6 +27,7 @@
 import java.lang.reflect.Type;
 import java.math.BigDecimal;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.AbstractList;
 import java.util.AbstractMap;
@@ -35,13 +36,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamJavaTypeFactory;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeWithLocalTzType;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimestampWithLocalTzType;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.DateTime;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -92,6 +94,7 @@
 public class BeamCalcRel extends AbstractBeamCalcRel {
 
   private static final long NANOS_PER_MILLISECOND = 1000000L;
+  private static final long MILLIS_PER_DAY = 86400000L;
 
   private static final ParameterExpression outputSchemaParam =
       Expressions.parameter(Schema.class, "outputSchema");
@@ -344,6 +347,18 @@
         valueDateTime = Expressions.unbox(valueDateTime);
       }
       valueDateTime = Expressions.call(LocalDate.class, "ofEpochDay", valueDateTime);
+    } else if (CalciteUtils.TIMESTAMP_WITH_LOCAL_TZ.typesEqual(toType)
+        || CalciteUtils.NULLABLE_TIMESTAMP_WITH_LOCAL_TZ.typesEqual(toType)) {
+      // Convert TimeStamp_With_Local_TimeZone to LocalDateTime
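+      // The incoming value is milliseconds since the epoch: the quotient by MILLIS_PER_DAY
+      // yields the epoch day, and the remainder, scaled to nanoseconds, the time of day.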
+      Expression dateValue =
+          Expressions.divide(valueDateTime, Expressions.constant(MILLIS_PER_DAY));
+      Expression date = Expressions.call(LocalDate.class, "ofEpochDay", dateValue);
+      Expression timeValue =
+          Expressions.multiply(
+              Expressions.modulo(valueDateTime, Expressions.constant(MILLIS_PER_DAY)),
+              Expressions.constant(NANOS_PER_MILLISECOND));
+      Expression time = Expressions.call(LocalTime.class, "ofNanoOfDay", timeValue);
+      valueDateTime = Expressions.call(LocalDateTime.class, "of", date, time);
     } else {
       throw new UnsupportedOperationException("Unknown DateTime type " + toType);
     }
@@ -385,7 +400,7 @@
             .put(SqlTypes.DATE.getIdentifier(), Long.class)
             .put(SqlTypes.TIME.getIdentifier(), Long.class)
             .put(TimeWithLocalTzType.IDENTIFIER, ReadableInstant.class)
-            .put(TimestampWithLocalTzType.IDENTIFIER, ReadableInstant.class)
+            .put(SqlTypes.DATETIME.getIdentifier(), Row.class)
             .put(CharType.IDENTIFIER, String.class)
             .build();
 
@@ -442,6 +457,16 @@
               value, Expressions.divide(value, Expressions.constant(NANOS_PER_MILLISECOND)));
         } else if (SqlTypes.DATE.getIdentifier().equals(logicalId)) {
           return value;
+        } else if (SqlTypes.DATETIME.getIdentifier().equals(logicalId)) {
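+          // Recombine the base Row's Date field (epoch days) and Time field (nanoseconds of
+          // the day) into a single count of milliseconds since the epoch.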
+          Expression dateValue =
+              Expressions.call(value, "getInt64", Expressions.constant(DateTime.DATE_FIELD_NAME));
+          Expression timeValue =
+              Expressions.call(value, "getInt64", Expressions.constant(DateTime.TIME_FIELD_NAME));
+          Expression returnValue =
+              Expressions.add(
+                  Expressions.multiply(dateValue, Expressions.constant(MILLIS_PER_DAY)),
+                  Expressions.divide(timeValue, Expressions.constant(NANOS_PER_MILLISECOND)));
+          return nullOr(value, returnValue);
         } else if (!CharType.IDENTIFIER.equals(logicalId)) {
           throw new UnsupportedOperationException(
               "Unknown LogicalType " + type.getLogicalType().getIdentifier());
@@ -563,6 +588,8 @@
           || name.equals(DataContext.Variable.CURRENT_TIMESTAMP.camelName)
           || name.equals(DataContext.Variable.LOCAL_TIMESTAMP.camelName)) {
         return System.currentTimeMillis();
+      } else if (name.equals(Variable.TIME_ZONE.camelName)) {
+        return TimeZone.getDefault();
       }
       return null;
     }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index acb4ee1..6ded492 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.utils;
 
+import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.Date;
 import java.util.Map;
@@ -53,15 +54,6 @@
     }
   }
 
-  /** A LogicalType corresponding to TIMESTAMP_WITH_LOCAL_TIME_ZONE. */
-  public static class TimestampWithLocalTzType extends PassThroughLogicalType<Instant> {
-    public static final String IDENTIFIER = "SqlTimestampWithLocalTzType";
-
-    public TimestampWithLocalTzType() {
-      super(IDENTIFIER, FieldType.STRING, "", FieldType.DATETIME);
-    }
-  }
-
   /** A LogicalType corresponding to CHAR. */
   public static class CharType extends PassThroughLogicalType<String> {
     public static final String IDENTIFIER = "SqlCharType";
@@ -82,7 +74,7 @@
       return logicalId.equals(SqlTypes.DATE.getIdentifier())
           || logicalId.equals(SqlTypes.TIME.getIdentifier())
           || logicalId.equals(TimeWithLocalTzType.IDENTIFIER)
-          || logicalId.equals(TimestampWithLocalTzType.IDENTIFIER);
+          || logicalId.equals(SqlTypes.DATETIME.getIdentifier());
     }
     return false;
   }
@@ -121,8 +113,9 @@
       FieldType.logicalType(new TimeWithLocalTzType());
   public static final FieldType TIMESTAMP = FieldType.DATETIME;
   public static final FieldType NULLABLE_TIMESTAMP = FieldType.DATETIME.withNullable(true);
-  public static final FieldType TIMESTAMP_WITH_LOCAL_TZ =
-      FieldType.logicalType(new TimestampWithLocalTzType());
+  public static final FieldType TIMESTAMP_WITH_LOCAL_TZ = FieldType.logicalType(SqlTypes.DATETIME);
+  public static final FieldType NULLABLE_TIMESTAMP_WITH_LOCAL_TZ =
+      FieldType.logicalType(SqlTypes.DATETIME).withNullable(true);
 
   private static final BiMap<FieldType, SqlTypeName> BEAM_TO_CALCITE_TYPE_MAPPING =
       ImmutableBiMap.<FieldType, SqlTypeName>builder()
@@ -283,18 +276,26 @@
 
   /**
    * SQL-Java type mapping, with specified Beam rules: <br>
-   * 1. redirect {@link AbstractInstant} to {@link Date} so Calcite can recognize it.
+   * 1. redirect {@link AbstractInstant} to {@link Date} so Calcite can recognize it. <br>
+   * 2. For a parameterized List, use its element type to create a SQL array type.
    *
-   * @param rawType
-   * @return
+   * @param type the Java type of the field, possibly a parameterized List
+   * @return the corresponding Calcite RelDataType
    */
-  public static RelDataType sqlTypeWithAutoCast(RelDataTypeFactory typeFactory, Type rawType) {
+  public static RelDataType sqlTypeWithAutoCast(RelDataTypeFactory typeFactory, Type type) {
     // For Joda time types, return SQL type for java.util.Date.
-    if (rawType instanceof Class && AbstractInstant.class.isAssignableFrom((Class<?>) rawType)) {
+    if (type instanceof Class && AbstractInstant.class.isAssignableFrom((Class<?>) type)) {
       return typeFactory.createJavaType(Date.class);
-    } else if (rawType instanceof Class && ByteString.class.isAssignableFrom((Class<?>) rawType)) {
+    } else if (type instanceof Class && ByteString.class.isAssignableFrom((Class<?>) type)) {
       return typeFactory.createJavaType(byte[].class);
+    } else if (type instanceof ParameterizedType
+        && java.util.List.class.isAssignableFrom(
+            (Class<?>) ((ParameterizedType) type).getRawType())) {
+      ParameterizedType parameterizedType = (ParameterizedType) type;
+      Class<?> genericType = (Class<?>) parameterizedType.getActualTypeArguments()[0];
+      RelDataType collectionElementType = typeFactory.createJavaType(genericType);
+      return typeFactory.createArrayType(collectionElementType, UNLIMITED_ARRAY_SIZE);
     }
-    return typeFactory.createJavaType((Class) rawType);
+    return typeFactory.createJavaType((Class) type);
   }
 }
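
The new ParameterizedType branch is what lets a UDF declare a List return or argument type. A self-contained sketch of the same reflection steps (the eval method here is a hypothetical stand-in for a UDF):

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;

public class ListTypeProbe {
  public static List<Long> eval(Long i) { // hypothetical UDF-style method
    return Arrays.asList(i);
  }

  public static void main(String[] args) throws Exception {
    Type returnType = ListTypeProbe.class.getMethod("eval", Long.class).getGenericReturnType();
    if (returnType instanceof ParameterizedType
        && List.class.isAssignableFrom(
            (Class<?>) ((ParameterizedType) returnType).getRawType())) {
      Class<?> elementType =
          (Class<?>) ((ParameterizedType) returnType).getActualTypeArguments()[0];
      System.out.println(elementType); // class java.lang.Long -> mapped to an ARRAY of BIGINT
    }
  }
}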
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java
index 7aa5032..f107dc3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java
@@ -38,6 +38,7 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.BitString;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.TimestampString;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 public class BeamSqlUnparseContext extends SqlImplementor.SimpleContext {
@@ -69,8 +70,12 @@
   public SqlNode toSql(RexProgram program, RexNode rex) {
     if (rex.getKind().equals(SqlKind.LITERAL)) {
       final RexLiteral literal = (RexLiteral) rex;
-      SqlTypeFamily family = literal.getTypeName().getFamily();
-      if (SqlTypeFamily.BINARY.equals(family)) {
+      SqlTypeName name = literal.getTypeName();
+      SqlTypeFamily family = name.getFamily();
+      if (SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE.equals(name)) {
+        TimestampString timestampString = literal.getValueAs(TimestampString.class);
+        return new SqlDateTimeLiteral(timestampString, POS);
+      } else if (SqlTypeFamily.BINARY.equals(family)) {
         ByteString byteString = literal.getValueAs(ByteString.class);
         BitString bitString = BitString.createFromHexString(byteString.toString(16));
         return new SqlByteStringLiteral(bitString, POS);
@@ -92,6 +97,21 @@
     return super.toSql(program, rex);
   }
 
+  private static class SqlDateTimeLiteral extends SqlLiteral {
+
+    private final TimestampString timestampString;
+
+    SqlDateTimeLiteral(TimestampString timestampString, SqlParserPos pos) {
+      super(timestampString, SqlTypeName.TIMESTAMP, pos);
+      this.timestampString = timestampString;
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+      writer.literal("DATETIME '" + timestampString + "'");
+    }
+  }
+
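
SqlDateTimeLiteral exists so that a TIMESTAMP_WITH_LOCAL_TIME_ZONE literal round-trips to BigQuery's DATETIME literal syntax. A sketch of the emitted text, using plain (non-vendored) Calcite's TimestampString for illustration:

import org.apache.calcite.util.TimestampString;

public class DatetimeUnparseSketch {
  public static void main(String[] args) {
    TimestampString ts = new TimestampString(2008, 12, 25, 15, 30, 0).withNanos(123_456_000);
    System.out.println("DATETIME '" + ts + "'"); // DATETIME '2008-12-25 15:30:00.123456'
  }
}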
   private static class SqlByteStringLiteral extends SqlLiteral {
 
     SqlByteStringLiteral(BitString bytes, SqlParserPos pos) {
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
index 1b1641f..0519798 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.extensions.sql;
 
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -368,22 +369,135 @@
   }
 
   @Test
-  public void testNullDatetimeFields() {
+  public void testDatetimeFields() {
     Instant current = new Instant(1561671380000L); // Long value corresponds to 27/06/2019
 
     Schema dateTimeFieldSchema =
         Schema.builder()
             .addField("dateTimeField", FieldType.DATETIME)
             .addNullableField("nullableDateTimeField", FieldType.DATETIME)
-            .addField("timeTypeField", FieldType.logicalType(SqlTypes.TIME))
-            .addNullableField("nullableTimeTypeField", FieldType.logicalType(SqlTypes.TIME))
+            .build();
+
+    Row dateTimeRow = Row.withSchema(dateTimeFieldSchema).addValues(current, null).build();
+
+    PCollection<Row> outputRow =
+        pipeline
+            .apply(Create.of(dateTimeRow))
+            .setRowSchema(dateTimeFieldSchema)
+            .apply(
+                SqlTransform.query(
+                    "select EXTRACT(YEAR from dateTimeField) as yyyy, "
+                        + " EXTRACT(YEAR from nullableDateTimeField) as year_with_null, "
+                        + " EXTRACT(MONTH from dateTimeField) as mm, "
+                        + " EXTRACT(MONTH from nullableDateTimeField) as month_with_null "
+                        + " from PCOLLECTION"));
+
+    Schema outputRowSchema =
+        Schema.builder()
+            .addField("yyyy", FieldType.INT64)
+            .addNullableField("year_with_null", FieldType.INT64)
+            .addField("mm", FieldType.INT64)
+            .addNullableField("month_with_null", FieldType.INT64)
+            .build();
+
+    PAssert.that(outputRow)
+        .containsInAnyOrder(
+            Row.withSchema(outputRowSchema).addValues(2019L, null, 6L, null).build());
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testSqlLogicalTypeDateFields() {
+    Schema dateTimeFieldSchema =
+        Schema.builder()
             .addField("dateTypeField", FieldType.logicalType(SqlTypes.DATE))
             .addNullableField("nullableDateTypeField", FieldType.logicalType(SqlTypes.DATE))
             .build();
 
+    Row dateRow =
+        Row.withSchema(dateTimeFieldSchema).addValues(LocalDate.of(2019, 6, 27), null).build();
+
+    PCollection<Row> outputRow =
+        pipeline
+            .apply(Create.of(dateRow))
+            .setRowSchema(dateTimeFieldSchema)
+            .apply(
+                SqlTransform.query(
+                    "select EXTRACT(DAY from dateTypeField) as dd, "
+                        + " EXTRACT(DAY from nullableDateTypeField) as day_with_null, "
+                        + " dateTypeField + interval '1' day as date_with_day_added, "
+                        + " nullableDateTypeField + interval '1' day as day_added_with_null "
+                        + " from PCOLLECTION"));
+
+    Schema outputRowSchema =
+        Schema.builder()
+            .addField("dd", FieldType.INT64)
+            .addNullableField("day_with_null", FieldType.INT64)
+            .addField("date_with_day_added", FieldType.logicalType(SqlTypes.DATE))
+            .addNullableField("day_added_with_null", FieldType.logicalType(SqlTypes.DATE))
+            .build();
+
+    PAssert.that(outputRow)
+        .containsInAnyOrder(
+            Row.withSchema(outputRowSchema)
+                .addValues(27L, null, LocalDate.of(2019, 6, 28), null)
+                .build());
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testSqlLogicalTypeTimeFields() {
+    Schema dateTimeFieldSchema =
+        Schema.builder()
+            .addField("timeTypeField", FieldType.logicalType(SqlTypes.TIME))
+            .addNullableField("nullableTimeTypeField", FieldType.logicalType(SqlTypes.TIME))
+            .build();
+
+    Row timeRow =
+        Row.withSchema(dateTimeFieldSchema).addValues(LocalTime.of(1, 0, 0), null).build();
+
+    PCollection<Row> outputRow =
+        pipeline
+            .apply(Create.of(timeRow))
+            .setRowSchema(dateTimeFieldSchema)
+            .apply(
+                SqlTransform.query(
+                    "select timeTypeField + interval '1' hour as time_with_hour_added, "
+                        + " nullableTimeTypeField + interval '1' hour as hour_added_with_null, "
+                        + " timeTypeField - INTERVAL '60' SECOND as time_with_seconds_added, "
+                        + " nullableTimeTypeField - INTERVAL '60' SECOND as seconds_added_with_null "
+                        + " from PCOLLECTION"));
+
+    Schema outputRowSchema =
+        Schema.builder()
+            .addField("time_with_hour_added", FieldType.logicalType(SqlTypes.TIME))
+            .addNullableField("hour_added_with_null", FieldType.logicalType(SqlTypes.TIME))
+            .addField("time_with_seconds_added", FieldType.logicalType(SqlTypes.TIME))
+            .addNullableField("seconds_added_with_null", FieldType.logicalType(SqlTypes.TIME))
+            .build();
+
+    PAssert.that(outputRow)
+        .containsInAnyOrder(
+            Row.withSchema(outputRowSchema)
+                .addValues(LocalTime.of(2, 0, 0), null, LocalTime.of(0, 59, 0), null)
+                .build());
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testSqlLogicalTypeDatetimeFields() {
+    Schema dateTimeFieldSchema =
+        Schema.builder()
+            .addField("dateTimeField", FieldType.logicalType(SqlTypes.DATETIME))
+            .addNullableField("nullableDateTimeField", FieldType.logicalType(SqlTypes.DATETIME))
+            .build();
+
     Row dateTimeRow =
         Row.withSchema(dateTimeFieldSchema)
-            .addValues(current, null, LocalTime.of(1, 0, 0), null, LocalDate.of(2019, 6, 27), null)
+            .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0), null)
             .build();
 
     PCollection<Row> outputRow =
@@ -396,14 +510,14 @@
                         + " EXTRACT(YEAR from nullableDateTimeField) as year_with_null, "
                         + " EXTRACT(MONTH from dateTimeField) as mm, "
                         + " EXTRACT(MONTH from nullableDateTimeField) as month_with_null, "
-                        + " timeTypeField + interval '1' hour as time_with_hour_added, "
-                        + " nullableTimeTypeField + interval '1' hour as hour_added_with_null, "
-                        + " timeTypeField - INTERVAL '60' SECOND as time_with_seconds_added, "
-                        + " nullableTimeTypeField - INTERVAL '60' SECOND as seconds_added_with_null, "
-                        + " EXTRACT(DAY from dateTypeField) as dd, "
-                        + " EXTRACT(DAY from nullableDateTypeField) as day_with_null, "
-                        + " dateTypeField + interval '1' day as date_with_day_added, "
-                        + " nullableDateTypeField + interval '1' day as day_added_with_null "
+                        + " dateTimeField + interval '1' hour as time_with_hour_added, "
+                        + " nullableDateTimeField + interval '1' hour as hour_added_with_null, "
+                        + " dateTimeField - INTERVAL '60' SECOND as time_with_seconds_added, "
+                        + " nullableDateTimeField - INTERVAL '60' SECOND as seconds_added_with_null, "
+                        + " EXTRACT(DAY from dateTimeField) as dd, "
+                        + " EXTRACT(DAY from nullableDateTimeField) as day_with_null, "
+                        + " dateTimeField + interval '1' day as date_with_day_added, "
+                        + " nullableDateTimeField + interval '1' day as day_added_with_null "
                         + " from PCOLLECTION"));
 
     Schema outputRowSchema =
@@ -412,31 +526,31 @@
             .addNullableField("year_with_null", FieldType.INT64)
             .addField("mm", FieldType.INT64)
             .addNullableField("month_with_null", FieldType.INT64)
-            .addField("time_with_hour_added", FieldType.logicalType(SqlTypes.TIME))
-            .addNullableField("hour_added_with_null", FieldType.logicalType(SqlTypes.TIME))
-            .addField("time_with_seconds_added", FieldType.logicalType(SqlTypes.TIME))
-            .addNullableField("seconds_added_with_null", FieldType.logicalType(SqlTypes.TIME))
+            .addField("time_with_hour_added", FieldType.logicalType(SqlTypes.DATETIME))
+            .addNullableField("hour_added_with_null", FieldType.logicalType(SqlTypes.DATETIME))
+            .addField("time_with_seconds_added", FieldType.logicalType(SqlTypes.DATETIME))
+            .addNullableField("seconds_added_with_null", FieldType.logicalType(SqlTypes.DATETIME))
             .addField("dd", FieldType.INT64)
             .addNullableField("day_with_null", FieldType.INT64)
-            .addField("date_with_day_added", FieldType.logicalType(SqlTypes.DATE))
-            .addNullableField("day_added_with_null", FieldType.logicalType(SqlTypes.DATE))
+            .addField("date_with_day_added", FieldType.logicalType(SqlTypes.DATETIME))
+            .addNullableField("day_added_with_null", FieldType.logicalType(SqlTypes.DATETIME))
             .build();
 
     PAssert.that(outputRow)
         .containsInAnyOrder(
             Row.withSchema(outputRowSchema)
                 .addValues(
-                    2019L,
+                    2008L,
                     null,
-                    06L,
+                    12L,
                     null,
-                    LocalTime.of(2, 0, 0),
+                    LocalDateTime.of(2008, 12, 25, 16, 30, 0),
                     null,
-                    LocalTime.of(0, 59, 0),
+                    LocalDateTime.of(2008, 12, 25, 15, 29, 0),
                     null,
-                    27L,
+                    25L,
                     null,
-                    LocalDate.of(2019, 6, 28),
+                    LocalDateTime.of(2008, 12, 26, 15, 30, 0),
                     null)
                 .build());
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index 75e8a08..c2afc5d 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -23,6 +23,7 @@
 
 import com.google.auto.service.AutoService;
 import java.sql.Timestamp;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.stream.IntStream;
 import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable;
@@ -30,6 +31,7 @@
 import org.apache.beam.sdk.extensions.sql.meta.provider.UdfUdafProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -101,6 +103,29 @@
     pipeline.run().waitUntilFinish();
   }
 
+  @Test
+  public void testListUdf() throws Exception {
+    Schema resultType1 = Schema.builder().addArrayField("array_field", FieldType.INT64).build();
+    Row row1 = Row.withSchema(resultType1).addValue(Arrays.asList(1L)).build();
+    String sql1 = "SELECT test_array(1)";
+    PCollection<Row> result1 =
+        boundedInput1.apply(
+            "testArrayUdf",
+            SqlTransform.query(sql1).registerUdf("test_array", TestReturnTypeList.class));
+    PAssert.that(result1).containsInAnyOrder(row1);
+
+    Schema resultType2 = Schema.builder().addInt32Field("int_field").build();
+    Row row2 = Row.withSchema(resultType2).addValue(3).build();
+    String sql2 = "select array_length(ARRAY[1, 2, 3])";
+    PCollection<Row> result2 =
+        boundedInput1.apply(
+            "testArrayUdf2",
+            SqlTransform.query(sql2).registerUdf("array_length", TestListLength.class));
+    PAssert.that(result2).containsInAnyOrder(row2);
+
+    pipeline.run().waitUntilFinish();
+  }
+
   /** Test that an indirect subclass of a {@link CombineFn} works as a UDAF. BEAM-3777 */
   @Test
   public void testUdafMultiLevelDescendent() {
@@ -347,6 +372,20 @@
     }
   }
 
+  /** A UDF to test support of array as return type. */
+  public static final class TestReturnTypeList implements BeamSqlUdf {
+    public static java.util.List<Long> eval(Long i) {
+      return Arrays.asList(i);
+    }
+  }
+
+  /** A UDF to test support of array as argument type. */
+  public static final class TestListLength implements BeamSqlUdf {
+    public static Integer eval(java.util.List<Long> i) {
+      return i.size();
+    }
+  }
+
   /**
    * UDF to test support for {@link
    * org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.TableMacro}.
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java
index f0854bc..d4819fc 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java
@@ -19,6 +19,7 @@
 
 import java.math.BigDecimal;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
@@ -51,6 +52,7 @@
             .add("col_string_varchar", SqlTypeName.VARCHAR)
             .add("col_time", SqlTypeName.TIME)
             .add("col_date", SqlTypeName.DATE)
+            .add("col_timestamp_with_local_time_zone", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
             .add("col_timestamp", SqlTypeName.TIMESTAMP)
             .add("col_boolean", SqlTypeName.BOOLEAN)
             .build();
@@ -70,6 +72,7 @@
                 "hello",
                 LocalTime.now(),
                 LocalDate.now(),
+                LocalDateTime.now(),
                 DateTime.now().toInstant(),
                 true)
             .build();
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java
index cc25380..5186099 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java
@@ -20,11 +20,13 @@
 import com.google.zetasql.CivilTimeEncoder;
 import com.google.zetasql.Value;
 import io.grpc.Status;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.List;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.TimeUnit;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.DateString;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.TimeString;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.TimestampString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
@@ -161,10 +163,17 @@
     }
   }
 
-  public static TimeString convertTimeValueToTimeString(Value value) {
-    LocalTime localTime = CivilTimeEncoder.decodePacked64TimeNanosAsJavaTime(value.getTimeValue());
-    return new TimeString(localTime.getHour(), localTime.getMinute(), localTime.getSecond())
-        .withNanos(localTime.getNano());
+  public static TimestampString convertDateTimeValueToTimeStampString(Value value) {
+    LocalDateTime dateTime =
+        CivilTimeEncoder.decodePacked96DatetimeNanosAsJavaTime(value.getDatetimeValue());
+    return new TimestampString(
+            dateTime.getYear(),
+            dateTime.getMonthValue(),
+            dateTime.getDayOfMonth(),
+            dateTime.getHour(),
+            dateTime.getMinute(),
+            dateTime.getSecond())
+        .withNanos(dateTime.getNano());
   }
 
   // dates are represented as an int32 value, indicating the offset
@@ -174,6 +183,12 @@
     return DateString.fromDaysSinceEpoch(value.getDateValue());
   }
 
+  public static TimeString convertTimeValueToTimeString(Value value) {
+    LocalTime localTime = CivilTimeEncoder.decodePacked64TimeNanosAsJavaTime(value.getTimeValue());
+    return new TimeString(localTime.getHour(), localTime.getMinute(), localTime.getSecond())
+        .withNanos(localTime.getNano());
+  }
+
   public static Value parseDateToValue(String dateString) {
     DateTime dateTime = parseDate(dateString);
     return Value.createDateValue((int) (dateTime.getMillis() / MILLIS_PER_DAY));
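
For reference, the packed civil-time values decoded above follow ZetaSQL's documented bit layouts. A hedged sketch of the seconds-precision DATETIME packing as I understand it (from the least-significant bit: second:6, minute:6, hour:5, day:5, month:4, year:14); the real CivilTimeEncoder also validates field ranges:

import java.time.LocalDateTime;

public class PackedDatetimeSketch {
  static long encodePacked64DatetimeSeconds(LocalDateTime dt) {
    return ((long) dt.getYear() << 26)
        | ((long) dt.getMonthValue() << 22)
        | ((long) dt.getDayOfMonth() << 17)
        | ((long) dt.getHour() << 12)
        | ((long) dt.getMinute() << 6)
        | dt.getSecond();
  }

  public static void main(String[] args) {
    long packed = encodePacked64DatetimeSeconds(LocalDateTime.of(2008, 12, 25, 15, 30, 0));
    System.out.println(Long.toBinaryString(packed)); // bit fields, year down to second
  }
}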
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
index 6ccfb26..eb8d51f 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
@@ -197,23 +197,23 @@
 
           // Time functions
           FunctionSignatureId.FN_CURRENT_DATE, // current_date
-          // FunctionSignatureId.FN_CURRENT_DATETIME, // current_datetime
+          FunctionSignatureId.FN_CURRENT_DATETIME, // current_datetime
           FunctionSignatureId.FN_CURRENT_TIME, // current_time
           FunctionSignatureId.FN_CURRENT_TIMESTAMP, // current_timestamp
           FunctionSignatureId.FN_DATE_ADD_DATE, // date_add
-          // FunctionSignatureId.FN_DATETIME_ADD, // datetime_add
+          FunctionSignatureId.FN_DATETIME_ADD, // datetime_add
           FunctionSignatureId.FN_TIME_ADD, // time_add
           FunctionSignatureId.FN_TIMESTAMP_ADD, // timestamp_add
           FunctionSignatureId.FN_DATE_DIFF_DATE, // date_diff
-          // FunctionSignatureId.FN_DATETIME_DIFF, // datetime_diff
+          FunctionSignatureId.FN_DATETIME_DIFF, // datetime_diff
           FunctionSignatureId.FN_TIME_DIFF, // time_diff
           FunctionSignatureId.FN_TIMESTAMP_DIFF, // timestamp_diff
           FunctionSignatureId.FN_DATE_SUB_DATE, // date_sub
-          // FunctionSignatureId.FN_DATETIME_SUB, // datetime_sub
+          FunctionSignatureId.FN_DATETIME_SUB, // datetime_sub
           FunctionSignatureId.FN_TIME_SUB, // time_sub
           FunctionSignatureId.FN_TIMESTAMP_SUB, // timestamp_sub
           FunctionSignatureId.FN_DATE_TRUNC_DATE, // date_trunc
-          // FunctionSignatureId.FN_DATETIME_TRUNC, // datetime_trunc
+          FunctionSignatureId.FN_DATETIME_TRUNC, // datetime_trunc
           FunctionSignatureId.FN_TIME_TRUNC, // time_trunc
           FunctionSignatureId.FN_TIMESTAMP_TRUNC, // timestamp_trunc
           FunctionSignatureId.FN_DATE_FROM_UNIX_DATE, // date_from_unix_date
@@ -234,19 +234,18 @@
           FunctionSignatureId.FN_UNIX_MILLIS_FROM_TIMESTAMP,
           // FunctionSignatureId.FN_UNIX_MICROS_FROM_TIMESTAMP,
           FunctionSignatureId.FN_DATE_FROM_TIMESTAMP, // date
-          // FunctionSignatureId.FN_DATE_FROM_DATETIME, // date
+          FunctionSignatureId.FN_DATE_FROM_DATETIME, // date
           FunctionSignatureId.FN_DATE_FROM_YEAR_MONTH_DAY, // date
           FunctionSignatureId.FN_TIMESTAMP_FROM_STRING, // timestamp
           FunctionSignatureId.FN_TIMESTAMP_FROM_DATE, // timestamp
-          // FunctionSignatureId.FN_TIMESTAMP_FROM_DATETIME, // timestamp
+          FunctionSignatureId.FN_TIMESTAMP_FROM_DATETIME, // timestamp
           FunctionSignatureId.FN_TIME_FROM_HOUR_MINUTE_SECOND, // time
           FunctionSignatureId.FN_TIME_FROM_TIMESTAMP, // time
-          // FunctionSignatureId.FN_TIME_FROM_DATETIME, // time
-          // FunctionSignatureId.FN_DATETIME_FROM_DATE_AND_TIME, // datetime
-          // FunctionSignatureId.FN_DATETIME_FROM_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND, // datetime
-          // FunctionSignatureId.FN_DATETIME_FROM_TIMESTAMP, // datetime
-          // FunctionSignatureId.FN_DATETIME_FROM_DATE, // datetime
-
+          FunctionSignatureId.FN_TIME_FROM_DATETIME, // time
+          FunctionSignatureId.FN_DATETIME_FROM_DATE_AND_TIME, // datetime
+          FunctionSignatureId.FN_DATETIME_FROM_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND, // datetime
+          FunctionSignatureId.FN_DATETIME_FROM_TIMESTAMP, // datetime
+          FunctionSignatureId.FN_DATETIME_FROM_DATE, // datetime
           FunctionSignatureId.FN_STRING_FROM_TIMESTAMP, // string
 
           // Signatures for extracting date parts, taking a date/timestamp
@@ -258,23 +257,24 @@
 
           // Signatures specific to extracting the DATE date part from a DATETIME or a
           // TIMESTAMP.
-          // FunctionSignatureId.FN_EXTRACT_DATE_FROM_DATETIME, // $extract_date
+          FunctionSignatureId.FN_EXTRACT_DATE_FROM_DATETIME, // $extract_date
           FunctionSignatureId.FN_EXTRACT_DATE_FROM_TIMESTAMP, // $extract_date
 
           // Signatures specific to extracting the TIME date part from a DATETIME or a
           // TIMESTAMP.
-          // FunctionSignatureId.FN_EXTRACT_TIME_FROM_DATETIME, // $extract_time
+          FunctionSignatureId.FN_EXTRACT_TIME_FROM_DATETIME, // $extract_time
           FunctionSignatureId.FN_EXTRACT_TIME_FROM_TIMESTAMP, // $extract_time
 
           // Signature specific to extracting the DATETIME date part from a TIMESTAMP.
-          // FunctionSignatureId.FN_EXTRACT_DATETIME_FROM_TIMESTAMP, // $extract_datetime
+          FunctionSignatureId.FN_EXTRACT_DATETIME_FROM_TIMESTAMP, // $extract_datetime
 
+          // Signatures for formatting and parsing
           FunctionSignatureId.FN_FORMAT_DATE, // format_date
-          // FunctionSignatureId.FN_FORMAT_DATETIME, // format_datetime
+          FunctionSignatureId.FN_FORMAT_DATETIME, // format_datetime
           FunctionSignatureId.FN_FORMAT_TIME, // format_time
           FunctionSignatureId.FN_FORMAT_TIMESTAMP, // format_timestamp
           FunctionSignatureId.FN_PARSE_DATE, // parse_date
-          // FunctionSignatureId.FN_PARSE_DATETIME, // parse_datetime
+          FunctionSignatureId.FN_PARSE_DATETIME, // parse_datetime
           FunctionSignatureId.FN_PARSE_TIME, // parse_time
           FunctionSignatureId.FN_PARSE_TIMESTAMP, // parse_timestamp
 
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
index 073aa41..dbab34a 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
@@ -28,6 +28,7 @@
 import com.google.zetasql.ZetaSQLType.TypeKind;
 import java.math.BigDecimal;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.List;
@@ -36,6 +37,7 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.logicaltypes.DateTime;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath;
@@ -45,7 +47,6 @@
  * Utility methods for ZetaSQL <=> Beam translation.
  *
  * <p>Unsupported ZetaSQL types: INT32, UINT32, UINT64, FLOAT, ENUM, PROTO, GEOGRAPHY
- * TODO[BEAM-10238]: support ZetaSQL types: TIME, DATETIME, NUMERIC
  */
 @Internal
 public final class ZetaSqlBeamTranslationUtils {
@@ -106,6 +107,9 @@
     } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
       // Time type
       return TypeFactory.createSimpleType(TypeKind.TYPE_TIME);
+    } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+      // DateTime type
+      return TypeFactory.createSimpleType(TypeKind.TYPE_DATETIME);
     } else {
       throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
     }
@@ -184,6 +188,20 @@
       } else { // input type
         return Value.createTimeValue(CivilTimeEncoder.encodePacked64TimeNanos((LocalTime) object));
       }
+    } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+      // DateTime value
+      LocalDateTime datetime;
+      if (object instanceof Row) { // base type
+        datetime =
+            LocalDateTime.of(
+                LocalDate.ofEpochDay(((Row) object).getInt64(DateTime.DATE_FIELD_NAME)),
+                LocalTime.ofNanoOfDay(((Row) object).getInt64(DateTime.TIME_FIELD_NAME)));
+      } else { // input type
+        datetime = (LocalDateTime) object;
+      }
+      // TODO[BEAM-10611]: Create ZetaSQL Value.createDatetimeValue(LocalDateTime) function
+      return Value.createDatetimeValue(
+          CivilTimeEncoder.encodePacked64DatetimeSeconds(datetime), datetime.getNano());
     } else {
       throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
     }
@@ -208,6 +226,8 @@
         return FieldType.logicalType(SqlTypes.DATE).withNullable(true);
       case TYPE_TIME:
         return FieldType.logicalType(SqlTypes.TIME).withNullable(true);
+      case TYPE_DATETIME:
+        return FieldType.logicalType(SqlTypes.DATETIME).withNullable(true);
       case TYPE_TIMESTAMP:
         return FieldType.DATETIME.withNullable(true);
       case TYPE_ARRAY:
@@ -314,6 +334,9 @@
     } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
       // Time value
       return CivilTimeEncoder.decodePacked64TimeNanosAsJavaTime(value.getTimeValue());
+    } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+      // DateTime value
+      return CivilTimeEncoder.decodePacked96DatetimeNanosAsJavaTime(value.getDatetimeValue());
     } else {
       throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier);
     }
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
index d8394ab..81bc142 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
@@ -20,6 +20,7 @@
 import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BOOL;
 import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BYTES;
 import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATE;
+import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATETIME;
 import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DOUBLE;
 import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_INT64;
 import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_NUMERIC;
@@ -46,7 +47,6 @@
  * Utility methods for ZetaSQL <=> Calcite translation.
  *
  * <p>Unsupported ZetaSQL types: INT32, UINT32, UINT64, FLOAT, ENUM, PROTO, GEOGRAPHY
- * TODO[BEAM-10238]: support ZetaSQL types: TIME, DATETIME, NUMERIC
  */
 @Internal
 public final class ZetaSqlCalciteTranslationUtils {
@@ -72,6 +72,8 @@
         return TypeFactory.createSimpleType(TYPE_DATE);
       case TIME:
         return TypeFactory.createSimpleType(TYPE_TIME);
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        return TypeFactory.createSimpleType(TYPE_DATETIME);
       case TIMESTAMP:
         return TypeFactory.createSimpleType(TYPE_TIMESTAMP);
       case ARRAY:
@@ -107,6 +109,8 @@
         return SqlTypeName.DATE;
       case TYPE_TIME:
         return SqlTypeName.TIME;
+      case TYPE_DATETIME:
+        return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
       case TYPE_TIMESTAMP:
         // TODO: handle timestamp with time zone.
         return SqlTypeName.TIMESTAMP;
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
index 19f18cd..fd5651f 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
@@ -24,6 +24,7 @@
 import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_INT64;
 import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_STRING;
 import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_TIMESTAMP;
+import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.convertDateTimeValueToTimeStampString;
 import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.convertDateValueToDateString;
 import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.convertTimeValueToTimeString;
 import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.safeMicrosToMillis;
@@ -545,7 +546,7 @@
       case TYPE_TIMESTAMP:
       case TYPE_DATE:
       case TYPE_TIME:
-        // case TYPE_DATETIME:
+      case TYPE_DATETIME:
       case TYPE_BYTES:
       case TYPE_ARRAY:
       case TYPE_STRUCT:
@@ -709,7 +710,7 @@
       case TYPE_TIMESTAMP:
       case TYPE_DATE:
       case TYPE_TIME:
-        // case TYPE_DATETIME:
+      case TYPE_DATETIME:
       case TYPE_BYTES:
         ret = convertSimpleValueToRexNode(type.getKind(), value);
         break;
@@ -792,9 +793,7 @@
               rexBuilder()
                   .makeCall(
                       SqlOperators.createZetaSqlFunction(wrapperFun, returnType.getSqlTypeName()),
-                      ImmutableList.of(
-                          rexBuilder()
-                              .makeApproxLiteral(new BigDecimal(Math.random()), returnType)));
+                      rexBuilder().makeApproxLiteral(new BigDecimal(Math.random()), returnType));
           ;
         } else {
           ret =
@@ -823,12 +822,11 @@
                     SqlOperators.createZetaSqlFunction(
                         BeamBigQuerySqlDialect.NUMERIC_LITERAL_FUNCTION,
                         ZetaSqlCalciteTranslationUtils.toCalciteTypeName(kind)),
-                    ImmutableList.of(
-                        rexBuilder()
-                            .makeExactLiteral(
-                                value.getNumericValue(),
-                                ZetaSqlCalciteTranslationUtils.toSimpleRelDataType(
-                                    kind, rexBuilder()))));
+                    rexBuilder()
+                        .makeExactLiteral(
+                            value.getNumericValue(),
+                            ZetaSqlCalciteTranslationUtils.toSimpleRelDataType(
+                                kind, rexBuilder())));
         break;
       case TYPE_TIMESTAMP:
         ret =
@@ -850,6 +848,15 @@
        // TODO: this truncates micros to millis; should throw an exception instead.
         ret = rexBuilder().makeLiteral(convertTimeValueToTimeString(value), timeType, false);
         break;
+      case TYPE_DATETIME:
+        ret =
+            rexBuilder()
+                .makeTimestampWithLocalTimeZoneLiteral(
+                    convertDateTimeValueToTimeStampString(value),
+                    typeFactory()
+                        .getTypeSystem()
+                        .getMaxPrecision(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE));
+        break;
       case TYPE_BYTES:
         ret = rexBuilder().makeBinaryLiteral(new ByteString(value.getBytesValue().toByteArray()));
         break;
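
The new TYPE_DATETIME case builds a Calcite TIMESTAMP_WITH_LOCAL_TIME_ZONE literal from the converted TimestampString. A minimal sketch of that construction outside the converter, using plain (non-vendored) Calcite classes:

import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.TimestampString;

public class DatetimeLiteralSketch {
  public static void main(String[] args) {
    RexBuilder rexBuilder = new RexBuilder(new JavaTypeFactoryImpl());
    TimestampString ts = new TimestampString(2008, 12, 25, 15, 30, 0).withNanos(123_456_000);
    // Precision 6 keeps microseconds, mirroring the getMaxPrecision(...) call in the change above.
    RexNode literal = rexBuilder.makeTimestampWithLocalTimeZoneLiteral(ts, 6);
    System.out.println(literal); // digest like 2008-12-25 15:30:00.123456:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)
  }
}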
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/TableScanConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/TableScanConverter.java
index 25f40ed..9137b94 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/TableScanConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/TableScanConverter.java
@@ -17,17 +17,13 @@
  */
 package org.apache.beam.sdk.extensions.sql.zetasql.translation;
 
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATETIME;
 import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.zetasql.ZetaSQLType.TypeKind;
-import com.google.zetasql.resolvedast.ResolvedColumn;
 import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedTableScan;
 import java.util.List;
 import java.util.Properties;
 import org.apache.beam.sdk.extensions.sql.zetasql.TableResolution;
 import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.config.CalciteConnectionConfigImpl;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
@@ -44,16 +40,12 @@
 /** Converts table scan. */
 class TableScanConverter extends RelConverter<ResolvedTableScan> {
 
-  private static final ImmutableSet<TypeKind> UNSUPPORTED_DATA_TYPES =
-      ImmutableSet.of(TYPE_DATETIME);
-
   TableScanConverter(ConversionContext context) {
     super(context);
   }
 
   @Override
   public RelNode convert(ResolvedTableScan zetaNode, List<RelNode> inputs) {
-    checkTableScanSchema(zetaNode.getColumnList());
 
     List<String> tablePath = getTablePath(zetaNode.getTable());
 
@@ -115,15 +107,4 @@
       }
     };
   }
-
-  private void checkTableScanSchema(List<ResolvedColumn> columnList) {
-    if (columnList != null) {
-      for (ResolvedColumn resolvedColumn : columnList) {
-        if (UNSUPPORTED_DATA_TYPES.contains(resolvedColumn.getType().getKind())) {
-          throw new UnsupportedOperationException(
-              "Does not support " + UNSUPPORTED_DATA_TYPES + " types in source tables");
-        }
-      }
-    }
-  }
 }
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java
index a761d9f..2edb4d0 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java
@@ -21,6 +21,7 @@
 
 import java.nio.charset.StandardCharsets;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.Arrays;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
@@ -225,47 +226,59 @@
   public static final TestBoundedTable TABLE_EMPTY =
       TestBoundedTable.of(Schema.builder().addInt64Field("ColId").addStringField("Value").build());
 
-  private static final Schema TABLE_WTH_MAP_SCHEMA =
+  private static final Schema TABLE_WITH_MAP_SCHEMA =
       Schema.builder()
           .addMapField("map_field", FieldType.STRING, FieldType.STRING)
           .addRowField("row_field", structSchema)
           .build();
   public static final TestBoundedTable TABLE_WITH_MAP =
-      TestBoundedTable.of(TABLE_WTH_MAP_SCHEMA)
+      TestBoundedTable.of(TABLE_WITH_MAP_SCHEMA)
           .addRows(
               ImmutableMap.of("MAP_KEY_1", "MAP_VALUE_1"),
               Row.withSchema(structSchema).addValues(1L, "data1").build());
 
-  private static final Schema TABLE_WTH_DATE_SCHEMA =
+  private static final Schema TABLE_WITH_DATE_SCHEMA =
       Schema.builder()
           .addLogicalTypeField("date_field", SqlTypes.DATE)
           .addStringField("str_field")
           .build();
 
   public static final TestBoundedTable TABLE_WITH_DATE =
-      TestBoundedTable.of(TABLE_WTH_DATE_SCHEMA)
+      TestBoundedTable.of(TABLE_WITH_DATE_SCHEMA)
           .addRows(LocalDate.of(2008, 12, 25), "s")
           .addRows(LocalDate.of(2020, 4, 7), "s");
 
-  private static final Schema TABLE_WTH_TIME_SCHEMA =
+  private static final Schema TABLE_WITH_TIME_SCHEMA =
       Schema.builder()
           .addLogicalTypeField("time_field", SqlTypes.TIME)
           .addStringField("str_field")
           .build();
 
   public static final TestBoundedTable TABLE_WITH_TIME =
-      TestBoundedTable.of(TABLE_WTH_TIME_SCHEMA)
+      TestBoundedTable.of(TABLE_WITH_TIME_SCHEMA)
           .addRows(LocalTime.of(15, 30, 0), "s")
           .addRows(LocalTime.of(23, 35, 59), "s");
 
-  private static final Schema TABLE_WTH_NUMERIC_SCHEMA =
+  private static final Schema TABLE_WITH_NUMERIC_SCHEMA =
       Schema.builder().addDecimalField("numeric_field").addStringField("str_field").build();
+
   public static final TestBoundedTable TABLE_WITH_NUMERIC =
-      TestBoundedTable.of(TABLE_WTH_NUMERIC_SCHEMA)
+      TestBoundedTable.of(TABLE_WITH_NUMERIC_SCHEMA)
           .addRows(ZetaSqlTypesUtils.bigDecimalAsNumeric("123.4567"), "str1")
           .addRows(ZetaSqlTypesUtils.bigDecimalAsNumeric("765.4321"), "str2")
           .addRows(ZetaSqlTypesUtils.bigDecimalAsNumeric("-555.5555"), "str3");
 
+  private static final Schema TABLE_WITH_DATETIME_SCHEMA =
+      Schema.builder()
+          .addLogicalTypeField("datetime_field", SqlTypes.DATETIME)
+          .addStringField("str_field")
+          .build();
+
+  public static final TestBoundedTable TABLE_WITH_DATETIME =
+      TestBoundedTable.of(TABLE_WITH_DATETIME_SCHEMA)
+          .addRows(LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000), "s")
+          .addRows(LocalDateTime.of(2012, 10, 6, 11, 45, 0).withNano(987654000), "s");
+
   private static byte[] stringToBytes(String s) {
     return s.getBytes(StandardCharsets.UTF_8);
   }
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java
index 0510590..7b450fb 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java
@@ -28,6 +28,7 @@
 import com.google.zetasql.Value;
 import com.google.zetasql.ZetaSQLType.TypeKind;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.Arrays;
 import org.apache.beam.sdk.schemas.Schema;
@@ -54,12 +55,12 @@
           .addField("f_string", FieldType.STRING)
           .addField("f_bytes", FieldType.BYTES)
           .addLogicalTypeField("f_date", SqlTypes.DATE)
-          // .addLogicalTypeField("f_datetime", SqlTypes.DATETIME)
+          .addLogicalTypeField("f_datetime", SqlTypes.DATETIME)
           .addLogicalTypeField("f_time", SqlTypes.TIME)
           .addField("f_timestamp", FieldType.DATETIME)
           .addArrayField("f_array", FieldType.DOUBLE)
           .addRowField("f_struct", TEST_INNER_SCHEMA)
-          // .addLogicalTypeField("f_numeric", SqlTypes.NUMERIC)
+          .addField("f_numeric", FieldType.DECIMAL)
           .addNullableField("f_null", FieldType.INT64)
           .build();
 
@@ -83,10 +84,12 @@
               new StructField("f_string", TypeFactory.createSimpleType(TypeKind.TYPE_STRING)),
               new StructField("f_bytes", TypeFactory.createSimpleType(TypeKind.TYPE_BYTES)),
               new StructField("f_date", TypeFactory.createSimpleType(TypeKind.TYPE_DATE)),
+              new StructField("f_datetime", TypeFactory.createSimpleType(TypeKind.TYPE_DATETIME)),
               new StructField("f_time", TypeFactory.createSimpleType(TypeKind.TYPE_TIME)),
               new StructField("f_timestamp", TypeFactory.createSimpleType(TypeKind.TYPE_TIMESTAMP)),
               new StructField("f_array", TEST_INNER_ARRAY_TYPE),
               new StructField("f_struct", TEST_INNER_STRUCT_TYPE),
+              new StructField("f_numeric", TypeFactory.createSimpleType(TypeKind.TYPE_NUMERIC)),
               new StructField("f_null", TypeFactory.createSimpleType(TypeKind.TYPE_INT64))));
 
   private static final Row TEST_ROW =
@@ -97,10 +100,12 @@
           .addValue("Hello")
           .addValue(new byte[] {0x11, 0x22})
           .addValue(LocalDate.of(2020, 6, 4))
+          .addValue(LocalDateTime.of(2008, 12, 25, 15, 30, 0))
           .addValue(LocalTime.of(15, 30, 45))
           .addValue(Instant.ofEpochMilli(12345678L))
           .addArray(3.0, 6.5)
           .addValue(Row.withSchema(TEST_INNER_SCHEMA).addValues(0L, "world").build())
+          .addValue(ZetaSqlTypesUtils.bigDecimalAsNumeric("12346"))
           .addValue(null)
           .build();
 
@@ -114,6 +119,10 @@
               Value.createStringValue("Hello"),
               Value.createBytesValue(ByteString.copyFrom(new byte[] {0x11, 0x22})),
               Value.createDateValue((int) LocalDate.of(2020, 6, 4).toEpochDay()),
+              Value.createDatetimeValue(
+                  CivilTimeEncoder.encodePacked64DatetimeSeconds(
+                      LocalDateTime.of(2008, 12, 25, 15, 30, 0)),
+                  LocalDateTime.of(2008, 12, 25, 15, 30, 0).getNano()),
               Value.createTimeValue(
                   CivilTimeEncoder.encodePacked64TimeNanos(LocalTime.of(15, 30, 45))),
               Value.createTimestampValueFromUnixMicros(12345678000L),
@@ -123,6 +132,7 @@
               Value.createStructValue(
                   TEST_INNER_STRUCT_TYPE,
                   Arrays.asList(Value.createInt64Value(0L), Value.createStringValue("world"))),
+              Value.createNumericValue(ZetaSqlTypesUtils.bigDecimalAsNumeric("12346")),
               Value.createNullValue(TypeFactory.createSimpleType(TypeKind.TYPE_INT64))));
 
   @Test
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
index e9c51c7..d148012 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
@@ -2942,7 +2942,7 @@
   }
 
   @Test
-  @Ignore("BEAM-9515")
+  @Ignore("[BEAM-9515] ArrayScanToUncollectConverter Unnest does not support sub-queries")
   public void testUNNESTExpression() {
     String sql = "SELECT * FROM UNNEST(ARRAY(SELECT Value FROM KeyValue));";
     ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java
index 6d9ba67..483a9c1 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java
@@ -61,6 +61,7 @@
     testBoundedTableMap.put("table_with_date", TestInput.TABLE_WITH_DATE);
     testBoundedTableMap.put("table_with_time", TestInput.TABLE_WITH_TIME);
     testBoundedTableMap.put("table_with_numeric", TestInput.TABLE_WITH_NUMERIC);
+    testBoundedTableMap.put("table_with_datetime", TestInput.TABLE_WITH_DATETIME);
     testBoundedTableMap.put(
         "table_with_struct_ts_string", TestInput.TABLE_WITH_STRUCT_TIMESTAMP_STRING);
     testBoundedTableMap.put("streaming_sql_test_table_a", TestInput.STREAMING_SQL_TABLE_A);
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java
index 6789d63..109ca1e 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java
@@ -23,9 +23,11 @@
 import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseTimestampWithTimeZone;
 import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseTimestampWithUTCTimeZone;
 
+import com.google.zetasql.CivilTimeEncoder;
 import com.google.zetasql.Value;
 import com.google.zetasql.ZetaSQLType.TypeKind;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
@@ -40,7 +42,6 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.joda.time.Duration;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -148,7 +149,7 @@
             + "  EXTRACT(ISOYEAR FROM date) AS isoyear,\n"
             + "  EXTRACT(YEAR FROM date) AS year,\n"
             + "  EXTRACT(ISOWEEK FROM date) AS isoweek,\n"
-            // TODO[BEAM-9178]: Add tests for DATE_TRUNC and EXTRACT with "week with weekday" date
+            // TODO[BEAM-10606]: Add tests for DATE_TRUNC and EXTRACT with "week with weekday" date
             //  parts once they are supported
             // + "  EXTRACT(WEEK FROM date) AS week,\n"
             + "  EXTRACT(MONTH FROM date) AS month,\n"
@@ -219,6 +220,22 @@
   }
 
   @Test
+  public void testDateFromDateTime() {
+    String sql = "SELECT DATE(DATETIME '2008-12-25 15:30:00.123456')";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addLogicalTypeField("f_date", SqlTypes.DATE).build())
+                .addValues(LocalDate.of(2008, 12, 25))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
   public void testDateAdd() {
     String sql =
         "SELECT "
@@ -580,6 +597,22 @@
   }
 
   @Test
+  public void testTimeFromDateTime() {
+    String sql = "SELECT TIME(DATETIME '2008-12-25 15:30:00.123456')";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addLogicalTypeField("f_time", SqlTypes.TIME).build())
+                .addValues(LocalTime.of(15, 30, 0, 123456000))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
   public void testTimeAdd() {
     String sql =
         "SELECT "
@@ -753,13 +786,420 @@
   /////////////////////////////////////////////////////////////////////////////
 
   @Test
-  @Ignore("Does not support Datetime literal.")
-  public void testDatetimeLiteral() {
-    String sql = "SELECT DATETIME '2018-01-01 05:30:00.334'";
+  public void testDateTimeLiteral() {
+    String sql = "SELECT DATETIME '2008-12-25 15:30:00.123456'";
+
     ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Unsupported ResolvedLiteral type: DATETIME");
-    zetaSQLQueryPlanner.convertToBeamRel(sql);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+                .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeColumn() {
+    String sql = "SELECT FORMAT_DATETIME('%D %T %E6S', datetime_field) FROM table_with_datetime";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addStringField("f_datetime_str").build())
+                .addValues("12/25/08 15:30:00 00.123456")
+                .build(),
+            Row.withSchema(Schema.builder().addStringField("f_datetime_str").build())
+                .addValues("10/06/12 11:45:00 00.987654")
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testGroupByDateTime() {
+    String sql = "SELECT datetime_field, COUNT(*) FROM table_with_datetime GROUP BY datetime_field";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    final Schema schema =
+        Schema.builder()
+            .addLogicalTypeField("datetime_field", SqlTypes.DATETIME)
+            .addInt64Field("count")
+            .build();
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(schema)
+                .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000), 1L)
+                .build(),
+            Row.withSchema(schema)
+                .addValues(LocalDateTime.of(2012, 10, 6, 11, 45, 0).withNano(987654000), 1L)
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testAggregateOnDateTime() {
+    String sql = "SELECT MAX(datetime_field) FROM table_with_datetime GROUP BY str_field";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addLogicalTypeField("datetime_field", SqlTypes.DATETIME)
+                        .build())
+                .addValues(LocalDateTime.of(2012, 10, 6, 11, 45, 0).withNano(987654000))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  // TODO[BEAM-9166]: Add a test for CURRENT_DATETIME function ("SELECT CURRENT_DATETIME()")
+
+  @Test
+  public void testExtractFromDateTime() {
+    String sql =
+        "SELECT "
+            + "EXTRACT(YEAR FROM DATETIME '2008-12-25 15:30:00') as year, "
+            + "EXTRACT(QUARTER FROM DATETIME '2008-12-25 15:30:00') as quarter, "
+            + "EXTRACT(MONTH FROM DATETIME '2008-12-25 15:30:00') as month, "
+            // TODO[BEAM-10606]: Add tests for DATETIME_TRUNC and EXTRACT with "week with weekday"
+            //  date parts once they are supported
+            // + "EXTRACT(WEEK FROM DATETIME '2008-12-25 15:30:00') as week, "
+            + "EXTRACT(DAY FROM DATETIME '2008-12-25 15:30:00') as day, "
+            + "EXTRACT(DAYOFWEEK FROM DATETIME '2008-12-25 15:30:00') as dayofweek, "
+            + "EXTRACT(DAYOFYEAR FROM DATETIME '2008-12-25 15:30:00') as dayofyear, "
+            + "EXTRACT(HOUR FROM DATETIME '2008-12-25 15:30:00.123456') as hour, "
+            + "EXTRACT(MINUTE FROM DATETIME '2008-12-25 15:30:00.123456') as minute, "
+            + "EXTRACT(SECOND FROM DATETIME '2008-12-25 15:30:00.123456') as second, "
+            + "EXTRACT(MILLISECOND FROM DATETIME '2008-12-25 15:30:00.123456') as millisecond, "
+            + "EXTRACT(MICROSECOND FROM DATETIME '2008-12-25 15:30:00.123456') as microsecond, "
+            + "EXTRACT(DATE FROM DATETIME '2008-12-25 15:30:00.123456') as date, "
+            + "EXTRACT(TIME FROM DATETIME '2008-12-25 15:30:00.123456') as time ";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    final Schema schema =
+        Schema.builder()
+            .addInt64Field("year")
+            .addInt64Field("quarter")
+            .addInt64Field("month")
+            // .addInt64Field("week")
+            .addInt64Field("day")
+            .addInt64Field("dayofweek")
+            .addInt64Field("dayofyear")
+            .addInt64Field("hour")
+            .addInt64Field("minute")
+            .addInt64Field("second")
+            .addInt64Field("millisecond")
+            .addInt64Field("microsecond")
+            .addLogicalTypeField("date", SqlTypes.DATE)
+            .addLogicalTypeField("time", SqlTypes.TIME)
+            .build();
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(schema)
+                .addValues(
+                    2008L,
+                    4L,
+                    12L,
+                    // 52L,
+                    25L,
+                    5L,
+                    360L,
+                    15L,
+                    30L,
+                    0L,
+                    123L,
+                    123456L,
+                    LocalDate.of(2008, 12, 25),
+                    LocalTime.of(15, 30, 0, 123456000))
+                .build());
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeFromDateAndTime() {
+    String sql = "SELECT DATETIME(DATE '2008-12-25', TIME '15:30:00.123456')";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+                .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeFromDate() {
+    String sql = "SELECT DATETIME(DATE '2008-12-25')";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+                .addValues(LocalDateTime.of(2008, 12, 25, 0, 0, 0))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeFromYearMonthDayHourMinuteSecond() {
+    String sql = "SELECT DATETIME(2008, 12, 25, 15, 30, 0)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+                .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeFromTimestamp() {
+    String sql = "SELECT DATETIME(TIMESTAMP '2008-12-25 15:30:00+08', 'America/Los_Angeles')";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+                .addValues(LocalDateTime.of(2008, 12, 24, 23, 30, 0))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeAdd() {
+    String sql =
+        "SELECT "
+            + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MICROSECOND), "
+            + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MILLISECOND), "
+            + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 SECOND), "
+            + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MINUTE), "
+            + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 HOUR), "
+            + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 DAY), "
+            + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MONTH), "
+            + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 QUARTER), "
+            + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 YEAR) ";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addLogicalTypeField("f_time1", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time2", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time3", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time4", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time5", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time6", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time7", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time8", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time9", SqlTypes.DATETIME)
+                        .build())
+                .addValues(
+                    LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(10000),
+                    LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(10000000),
+                    LocalDateTime.of(2008, 12, 25, 15, 30, 10),
+                    LocalDateTime.of(2008, 12, 25, 15, 40, 0),
+                    LocalDateTime.of(2008, 12, 26, 1, 30, 0),
+                    LocalDateTime.of(2009, 1, 4, 15, 30, 0),
+                    LocalDateTime.of(2009, 10, 25, 15, 30, 0),
+                    LocalDateTime.of(2011, 6, 25, 15, 30, 0),
+                    LocalDateTime.of(2018, 12, 25, 15, 30, 0))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeAddWithParameter() {
+    String sql = "SELECT DATETIME_ADD(@p0, INTERVAL @p1 HOUR)";
+
+    LocalDateTime datetime = LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000);
+    ImmutableMap<String, Value> params =
+        ImmutableMap.of(
+            "p0",
+                Value.createDatetimeValue(
+                    CivilTimeEncoder.encodePacked64DatetimeSeconds(datetime), datetime.getNano()),
+            "p1", Value.createInt64Value(3L));
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+                .addValues(LocalDateTime.of(2008, 12, 25, 18, 30, 0).withNano(123456000))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeSub() {
+    String sql =
+        "SELECT "
+            + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MICROSECOND), "
+            + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MILLISECOND), "
+            + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 SECOND), "
+            + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MINUTE), "
+            + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 HOUR), "
+            + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 DAY), "
+            + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MONTH), "
+            + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 QUARTER), "
+            + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 YEAR) ";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addLogicalTypeField("f_time1", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time2", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time3", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time4", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time5", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time6", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time7", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time8", SqlTypes.DATETIME)
+                        .addLogicalTypeField("f_time9", SqlTypes.DATETIME)
+                        .build())
+                .addValues(
+                    LocalDateTime.of(2008, 12, 25, 15, 29, 59).withNano(999990000),
+                    LocalDateTime.of(2008, 12, 25, 15, 29, 59).withNano(990000000),
+                    LocalDateTime.of(2008, 12, 25, 15, 29, 50),
+                    LocalDateTime.of(2008, 12, 25, 15, 20, 0),
+                    LocalDateTime.of(2008, 12, 25, 5, 30, 0),
+                    LocalDateTime.of(2008, 12, 15, 15, 30, 0),
+                    LocalDateTime.of(2008, 2, 25, 15, 30, 0),
+                    LocalDateTime.of(2006, 6, 25, 15, 30, 0),
+                    LocalDateTime.of(1998, 12, 25, 15, 30, 0))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeDiff() {
+    String sql =
+        "SELECT DATETIME_DIFF(DATETIME '2008-12-25 15:30:00', DATETIME '2008-10-25 15:30:00', DAY)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addInt64Field("f_datetime_diff").build())
+                .addValues(61L)
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeDiffNegativeResult() {
+    String sql =
+        "SELECT DATETIME_DIFF(DATETIME '2008-10-25 15:30:00', DATETIME '2008-12-25 15:30:00', DAY)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addInt64Field("f_datetime_diff").build())
+                .addValues(-61L)
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDateTimeTrunc() {
+    String sql = "SELECT DATETIME_TRUNC(DATETIME '2008-12-25 15:30:00', HOUR)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addLogicalTypeField("f_datetime_trunc", SqlTypes.DATETIME)
+                        .build())
+                .addValues(LocalDateTime.of(2008, 12, 25, 15, 0, 0))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testFormatDateTime() {
+    String sql = "SELECT FORMAT_DATETIME('%D %T %E6S', DATETIME '2008-12-25 15:30:00.123456')";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addStringField("f_datetime_str").build())
+                .addValues("12/25/08 15:30:00 00.123456")
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testParseDateTime() {
+    String sql = "SELECT PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S', '2008-12-25 15:30:00.123456')";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build())
+                .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
 
   /////////////////////////////////////////////////////////////////////////////
@@ -846,7 +1286,7 @@
             + "  EXTRACT(ISOYEAR FROM timestamp) AS isoyear,\n"
             + "  EXTRACT(YEAR FROM timestamp) AS year,\n"
             + "  EXTRACT(ISOWEEK FROM timestamp) AS isoweek,\n"
-            // TODO[BEAM-9178]: Add tests for TIMESTAMP_TRUNC and EXTRACT with "week with weekday"
+            // TODO[BEAM-10606]: Add tests for TIMESTAMP_TRUNC and EXTRACT with "week with weekday"
             //  date parts once they are supported
             // + "  EXTRACT(WEEK FROM timestamp) AS week,\n"
             + "  EXTRACT(MONTH FROM timestamp) AS month,\n"
@@ -926,6 +1366,23 @@
   }
 
   @Test
+  public void testExtractDateTimeFromTimestamp() {
+    String sql = "SELECT EXTRACT(DATETIME FROM TIMESTAMP '2017-05-26 12:34:56')";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder().addLogicalTypeField("datetime", SqlTypes.DATETIME).build())
+                .addValues(LocalDateTime.of(2017, 5, 26, 12, 34, 56))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
   public void testExtractFromTimestampAtTimeZone() {
     String sql =
         "WITH Timestamps AS (\n"
@@ -1028,6 +1485,45 @@
   }
 
   @Test
+  public void testTimestampFromDateTime() {
+    String sql = "SELECT TIMESTAMP(DATETIME '2008-12-25 15:30:00')";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addDateTimeField("f_timestamp").build())
+                .addValues(parseTimestampWithTimeZone("2008-12-25 15:30:00+00"))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  // Tests that the default timezone is applied properly during query execution.
+  @Test
+  public void testTimestampFromDateTimeWithDefaultTimezoneSet() {
+    String sql = "SELECT TIMESTAMP(DATETIME '2008-12-25 15:30:00')";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    zetaSQLQueryPlanner.setDefaultTimezone("Asia/Shanghai");
+    pipeline
+        .getOptions()
+        .as(BeamSqlPipelineOptions.class)
+        .setZetaSqlDefaultTimezone("Asia/Shanghai");
+
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addDateTimeField("f_timestamp").build())
+                .addValues(parseTimestampWithTimeZone("2008-12-25 15:30:00+08"))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
   public void testTimestampAdd() {
     String sql =
         "SELECT "
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/CrossLanguageConfiguration.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/CrossLanguageConfiguration.java
deleted file mode 100644
index df0ad3e..0000000
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/CrossLanguageConfiguration.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.jdbc;
-
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-/** Abstract Parameters class to expose the Jdbc transforms to an external SDK. */
-public abstract class CrossLanguageConfiguration {
-  String driverClassName;
-  String jdbcUrl;
-  String username;
-  String password;
-  String connectionProperties;
-  Iterable<String> connectionInitSqls;
-
-  public void setDriverClassName(String driverClassName) {
-    this.driverClassName = driverClassName;
-  }
-
-  public void setJdbcUrl(String jdbcUrl) {
-    this.jdbcUrl = jdbcUrl;
-  }
-
-  public void setUsername(String username) {
-    this.username = username;
-  }
-
-  public void setPassword(String password) {
-    this.password = password;
-  }
-
-  public void setConnectionProperties(String connectionProperties) {
-    this.connectionProperties = connectionProperties;
-  }
-
-  public void setConnectionInitSqls(Iterable<String> connectionInitSqls) {
-    this.connectionInitSqls = connectionInitSqls;
-  }
-
-  protected JdbcIO.DataSourceConfiguration getDataSourceConfiguration() {
-    JdbcIO.DataSourceConfiguration dataSourceConfiguration =
-        JdbcIO.DataSourceConfiguration.create(driverClassName, jdbcUrl)
-            .withUsername(username)
-            .withPassword(password);
-
-    if (connectionProperties != null) {
-      dataSourceConfiguration =
-          dataSourceConfiguration.withConnectionProperties(connectionProperties);
-    }
-
-    if (connectionInitSqls != null) {
-      List<String> initSqls =
-          StreamSupport.stream(connectionInitSqls.spliterator(), false)
-              .collect(Collectors.toList());
-      dataSourceConfiguration = dataSourceConfiguration.withConnectionInitSqls(initSqls);
-    }
-    return dataSourceConfiguration;
-  }
-}
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadRowsRegistrar.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadRowsRegistrar.java
deleted file mode 100644
index 72c8b05..0000000
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadRowsRegistrar.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.jdbc;
-
-import com.google.auto.service.AutoService;
-import java.util.Map;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
-import org.apache.beam.sdk.io.jdbc.JdbcIO.DataSourceConfiguration;
-import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-
-/** Exposes {@link JdbcIO.ReadRows} as an external transform for cross-language usage. */
-@Experimental(Kind.PORTABILITY)
-@AutoService(ExternalTransformRegistrar.class)
-public class JdbcReadRowsRegistrar implements ExternalTransformRegistrar {
-
-  public static final String URN = "beam:external:java:jdbc:read_rows:v1";
-
-  @Override
-  public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
-    return ImmutableMap.of(URN, JdbcReadRowsRegistrar.Builder.class);
-  }
-
-  /** Parameters class to expose the Read transform to an external SDK. */
-  public static class ReadConfiguration extends CrossLanguageConfiguration {
-    private String query;
-    private Integer fetchSize;
-    private Boolean outputParallelization;
-
-    public void setOutputParallelization(Boolean outputParallelization) {
-      this.outputParallelization = outputParallelization;
-    }
-
-    public void setFetchSize(Integer fetchSize) {
-      this.fetchSize = fetchSize;
-    }
-
-    public void setQuery(String query) {
-      this.query = query;
-    }
-  }
-
-  public static class Builder
-      implements ExternalTransformBuilder<ReadConfiguration, PBegin, PCollection<Row>> {
-    @Override
-    public PTransform<PBegin, PCollection<Row>> buildExternal(ReadConfiguration configuration) {
-      DataSourceConfiguration dataSourceConfiguration = configuration.getDataSourceConfiguration();
-
-      JdbcIO.ReadRows readRows =
-          JdbcIO.readRows()
-              .withDataSourceConfiguration(dataSourceConfiguration)
-              .withQuery(configuration.query);
-
-      if (configuration.fetchSize != null) {
-        readRows = readRows.withFetchSize(configuration.fetchSize);
-      }
-      if (configuration.outputParallelization != null) {
-        readRows = readRows.withOutputParallelization(configuration.outputParallelization);
-      }
-      return readRows;
-    }
-  }
-}
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcTransformsRegistrar.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcTransformsRegistrar.java
new file mode 100644
index 0000000..cfa22d5
--- /dev/null
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcTransformsRegistrar.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import com.google.auto.service.AutoService;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.io.jdbc.JdbcIO.DataSourceConfiguration;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Exposes {@link JdbcIO.Write} and {@link JdbcIO.ReadRows} as external transforms for
+ * cross-language usage.
+ */
+@Experimental(Kind.PORTABILITY)
+@AutoService(ExternalTransformRegistrar.class)
+public class JdbcTransformsRegistrar implements ExternalTransformRegistrar {
+
+  public static final String READ_ROWS_URN = "beam:external:java:jdbc:read_rows:v1";
+  public static final String WRITE_URN = "beam:external:java:jdbc:write:v1";
+
+  @Override
+  public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
+    return ImmutableMap.of(READ_ROWS_URN, ReadRowsBuilder.class, WRITE_URN, WriteBuilder.class);
+  }
+
+  private static class CrossLanguageConfiguration {
+    String driverClassName;
+    String jdbcUrl;
+    String username;
+    String password;
+    String connectionProperties;
+    Iterable<String> connectionInitSqls;
+
+    public void setDriverClassName(String driverClassName) {
+      this.driverClassName = driverClassName;
+    }
+
+    public void setJdbcUrl(String jdbcUrl) {
+      this.jdbcUrl = jdbcUrl;
+    }
+
+    public void setUsername(String username) {
+      this.username = username;
+    }
+
+    public void setPassword(String password) {
+      this.password = password;
+    }
+
+    public void setConnectionProperties(String connectionProperties) {
+      this.connectionProperties = connectionProperties;
+    }
+
+    public void setConnectionInitSqls(Iterable<String> connectionInitSqls) {
+      this.connectionInitSqls = connectionInitSqls;
+    }
+
+    protected JdbcIO.DataSourceConfiguration getDataSourceConfiguration() {
+      JdbcIO.DataSourceConfiguration dataSourceConfiguration =
+          JdbcIO.DataSourceConfiguration.create(driverClassName, jdbcUrl)
+              .withUsername(username)
+              .withPassword(password);
+
+      if (connectionProperties != null) {
+        dataSourceConfiguration =
+            dataSourceConfiguration.withConnectionProperties(connectionProperties);
+      }
+
+      if (connectionInitSqls != null) {
+        List<String> initSqls =
+            StreamSupport.stream(connectionInitSqls.spliterator(), false)
+                .collect(Collectors.toList());
+        dataSourceConfiguration = dataSourceConfiguration.withConnectionInitSqls(initSqls);
+      }
+      return dataSourceConfiguration;
+    }
+  }
+
+  public static class ReadRowsBuilder
+      implements ExternalTransformBuilder<ReadRowsBuilder.Configuration, PBegin, PCollection<Row>> {
+
+    public static class Configuration extends CrossLanguageConfiguration {
+      private String query;
+      private Integer fetchSize;
+      private Boolean outputParallelization;
+
+      public void setOutputParallelization(Boolean outputParallelization) {
+        this.outputParallelization = outputParallelization;
+      }
+
+      public void setFetchSize(Integer fetchSize) {
+        this.fetchSize = fetchSize;
+      }
+
+      public void setQuery(String query) {
+        this.query = query;
+      }
+    }
+
+    @Override
+    public PTransform<PBegin, PCollection<Row>> buildExternal(Configuration configuration) {
+      JdbcIO.ReadRows readRows =
+          JdbcIO.readRows()
+              .withDataSourceConfiguration(configuration.getDataSourceConfiguration())
+              .withQuery(configuration.query);
+      if (configuration.fetchSize != null) {
+        readRows = readRows.withFetchSize(configuration.fetchSize);
+      }
+      if (configuration.outputParallelization != null) {
+        readRows = readRows.withOutputParallelization(configuration.outputParallelization);
+      }
+      return readRows;
+    }
+  }
+
+  public static class WriteBuilder
+      implements ExternalTransformBuilder<WriteBuilder.Configuration, PCollection<Row>, PDone> {
+
+    public static class Configuration extends CrossLanguageConfiguration {
+      private String statement;
+
+      public void setStatement(String statement) {
+        this.statement = statement;
+      }
+    }
+
+    @Override
+    public PTransform<PCollection<Row>, PDone> buildExternal(Configuration configuration) {
+      DataSourceConfiguration dataSourceConfiguration = configuration.getDataSourceConfiguration();
+
+      // TODO: BEAM-10396 use writeRows() when it's available
+      return JdbcIO.<Row>write()
+          .withDataSourceConfiguration(dataSourceConfiguration)
+          .withStatement(configuration.statement)
+          .withPreparedStatementSetter(new JdbcUtil.BeamRowPreparedStatementSetter());
+    }
+  }
+}
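
For reference, once the expansion service resolves READ_ROWS_URN, ReadRowsBuilder assembles the same transform one would wire directly in Java. A minimal sketch, with hypothetical driver, URL, credentials, and query:

    JdbcIO.ReadRows readRows =
        JdbcIO.readRows()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                        "org.postgresql.Driver", "jdbc:postgresql://localhost/testdb") // hypothetical
                    .withUsername("user") // hypothetical credentials
                    .withPassword("pass"))
            .withQuery("SELECT * FROM test_table") // hypothetical query
            .withFetchSize(100)
            .withOutputParallelization(true);
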
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteRegistrar.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteRegistrar.java
deleted file mode 100644
index 39997e0..0000000
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteRegistrar.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.jdbc;
-
-import com.google.auto.service.AutoService;
-import java.util.Map;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
-import org.apache.beam.sdk.io.jdbc.JdbcIO.DataSourceConfiguration;
-import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-
-/** Exposes {@link JdbcIO.Write} as an external transform for cross-language usage. */
-@Experimental(Kind.PORTABILITY)
-@AutoService(ExternalTransformRegistrar.class)
-public class JdbcWriteRegistrar implements ExternalTransformRegistrar {
-
-  public static final String URN = "beam:external:java:jdbc:write:v1";
-
-  @Override
-  public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
-    return ImmutableMap.of(URN, JdbcWriteRegistrar.Builder.class);
-  }
-
-  /** Parameters class to expose the Write transform to an external SDK. */
-  public static class WriteConfiguration extends CrossLanguageConfiguration {
-    private String statement;
-
-    public void setStatement(String statement) {
-      this.statement = statement;
-    }
-  }
-
-  public static class Builder
-      implements ExternalTransformBuilder<WriteConfiguration, PCollection<Row>, PDone> {
-    @Override
-    public PTransform<PCollection<Row>, PDone> buildExternal(WriteConfiguration configuration) {
-      DataSourceConfiguration dataSourceConfiguration = configuration.getDataSourceConfiguration();
-
-      // TODO: BEAM-10396 use writeRows() when it's available
-      return JdbcIO.<Row>write()
-          .withDataSourceConfiguration(dataSourceConfiguration)
-          .withStatement(configuration.statement)
-          .withPreparedStatementSetter(new JdbcUtil.BeamRowPreparedStatementSetter());
-    }
-  }
-}
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 13aabc8..08847f6 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -452,9 +452,15 @@
         // Set required defaults
         setTopicPartitions(Collections.emptyList());
         setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN);
-        setMaxNumRecords(Long.MAX_VALUE);
+        if (config.maxReadTime != null) {
+          setMaxReadTime(Duration.standardSeconds(config.maxReadTime));
+        }
+        setMaxNumRecords(config.maxNumRecords == null ? Long.MAX_VALUE : config.maxNumRecords);
         setCommitOffsetsInFinalizeEnabled(false);
         setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
+        if (config.startReadTime != null) {
+          setStartReadTime(Instant.ofEpochMilli(config.startReadTime));
+        }
         // We do not include Metadata until we can encode KafkaRecords cross-language
         return build().withoutMetadata();
       }
@@ -507,6 +513,9 @@
         private Iterable<String> topics;
         private String keyDeserializer;
         private String valueDeserializer;
+        private Long startReadTime;
+        private Long maxNumRecords;
+        private Long maxReadTime;
 
         public void setConsumerConfig(Iterable<KV<String, String>> consumerConfig) {
           this.consumerConfig = consumerConfig;
@@ -523,6 +532,18 @@
         public void setValueDeserializer(String valueDeserializer) {
           this.valueDeserializer = valueDeserializer;
         }
+
+        public void setStartReadTime(Long startReadTime) {
+          this.startReadTime = startReadTime;
+        }
+
+        public void setMaxNumRecords(Long maxNumRecords) {
+          this.maxNumRecords = maxNumRecords;
+        }
+
+        public void setMaxReadTime(Long maxReadTime) {
+          this.maxReadTime = maxReadTime;
+        }
       }
     }
 
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index d157c16..3e44c17 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -34,6 +34,7 @@
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.expansion.service.ExpansionService;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Impulse;
@@ -67,6 +68,7 @@
             .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
             .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
             .build();
+    Long startReadTime = 100L;
 
     ExternalTransforms.ExternalConfigurationPayload payload =
         ExternalTransforms.ExternalConfigurationPayload.newBuilder()
@@ -98,6 +100,12 @@
                     .addCoderUrn("beam:coder:string_utf8:v1")
                     .setPayload(ByteString.copyFrom(encodeString(valueDeserializer)))
                     .build())
+            .putConfiguration(
+                "start_read_time",
+                ExternalTransforms.ConfigValue.newBuilder()
+                    .addCoderUrn("beam:coder:varint:v1")
+                    .setPayload(ByteString.copyFrom(encodeLong(startReadTime)))
+                    .build())
             .build();
 
     RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
@@ -280,6 +288,12 @@
     return baos.toByteArray();
   }
 
+  private static byte[] encodeLong(Long number) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    VarLongCoder.of().encode(number, baos);
+    return baos.toByteArray();
+  }
+
   private static class TestStreamObserver<T> implements StreamObserver<T> {
 
     private T result;
diff --git a/sdks/python/apache_beam/dataframe/__init__.py b/sdks/python/apache_beam/dataframe/__init__.py
index 427fee1..9071a88 100644
--- a/sdks/python/apache_beam/dataframe/__init__.py
+++ b/sdks/python/apache_beam/dataframe/__init__.py
@@ -13,3 +13,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from __future__ import absolute_import
+
+from apache_beam.dataframe.expressions import allow_non_parallel_operations
diff --git a/sdks/python/apache_beam/dataframe/doctests.py b/sdks/python/apache_beam/dataframe/doctests.py
index 2d135ea..93f33d1 100644
--- a/sdks/python/apache_beam/dataframe/doctests.py
+++ b/sdks/python/apache_beam/dataframe/doctests.py
@@ -361,6 +361,8 @@
     original_doc_test_runner = doctest.DocTestRunner
     doctest.DocTestRunner = lambda **kwargs: BeamDataframeDoctestRunner(
         env, use_beam=use_beam, skip=skip, **kwargs)
-    return func(*args, extraglobs=extraglobs, optionflags=optionflags, **kwargs)
+    with expressions.allow_non_parallel_operations():
+      return func(
+          *args, extraglobs=extraglobs, optionflags=optionflags, **kwargs)
   finally:
     doctest.DocTestRunner = original_doc_test_runner
diff --git a/sdks/python/apache_beam/dataframe/expressions.py b/sdks/python/apache_beam/dataframe/expressions.py
index 376efb8..34e01cca 100644
--- a/sdks/python/apache_beam/dataframe/expressions.py
+++ b/sdks/python/apache_beam/dataframe/expressions.py
@@ -16,6 +16,9 @@
 
 from __future__ import absolute_import
 
+import contextlib
+import threading
+
 from typing import Any
 from typing import Callable
 from typing import Iterable
@@ -203,6 +206,11 @@
         be partitioned by index whenever all of its inputs are partitioned by
         index.
     """
+    if (not _get_allow_non_parallel() and
+        requires_partition_by == partitionings.Singleton()):
+      raise NonParallelOperation(
+          "Using non-parallel form of %s "
+          "outside of allow_non_parallel_operations block." % name)
     args = tuple(args)
     if proxy is None:
       proxy = func(*(arg.proxy() for arg in args))
@@ -236,3 +244,26 @@
       args,
       requires_partition_by=partitionings.Nothing(),
       preserves_partition_by=partitionings.Singleton())
+
+
+_ALLOW_NON_PARALLEL = threading.local()
+_ALLOW_NON_PARALLEL.value = False
+
+
+def _get_allow_non_parallel():
+  # threading.local attributes exist only on threads that set them; fall back
+  # to False on threads that never entered allow_non_parallel_operations.
+  return getattr(_ALLOW_NON_PARALLEL, 'value', False)
+
+
+@contextlib.contextmanager
+def allow_non_parallel_operations(allow=True):
+  old_value, _ALLOW_NON_PARALLEL.value = _ALLOW_NON_PARALLEL.value, allow
+  try:
+    yield
+  finally:
+    _ALLOW_NON_PARALLEL.value = old_value
+
+
+class NonParallelOperation(Exception):
+  pass
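
A minimal usage sketch of the new context manager (deferred_frame stands in for any deferred DataFrame; a whole-frame aggregation requires Singleton partitioning, mirroring the tests below):

    from apache_beam.dataframe import expressions

    with expressions.allow_non_parallel_operations():
      # Expressions requiring Singleton partitioning are permitted here.
      result = deferred_frame.agg(sum)

    # Outside the block, the same call raises expressions.NonParallelOperation.
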
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index 1383a52..773b3ba 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -21,6 +21,7 @@
 import numpy as np
 import pandas as pd
 
+import apache_beam as beam
 from apache_beam.dataframe import expressions
 from apache_beam.dataframe import frame_base
 from apache_beam.dataframe import frames  # pylint: disable=unused-import
@@ -81,5 +82,35 @@
     self._run_test(lambda df: df.loc[lambda df: df.A > 10], df)
 
 
+class AllowNonParallelTest(unittest.TestCase):
+  def _use_non_parallel_operation(self):
+    _ = frame_base.DeferredFrame.wrap(
+        expressions.PlaceholderExpression(pd.Series([1, 2, 3]))).replace(
+            'a', 'b', limit=1)
+
+  def test_disallow_non_parallel(self):
+    with self.assertRaises(expressions.NonParallelOperation):
+      self._use_non_parallel_operation()
+
+  def test_allow_non_parallel_in_context(self):
+    with beam.dataframe.allow_non_parallel_operations():
+      self._use_non_parallel_operation()
+
+  def test_allow_non_parallel_nesting(self):
+    # disallowed
+    with beam.dataframe.allow_non_parallel_operations():
+      # allowed
+      self._use_non_parallel_operation()
+      with beam.dataframe.allow_non_parallel_operations(False):
+        # disallowed again
+        with self.assertRaises(expressions.NonParallelOperation):
+          self._use_non_parallel_operation()
+      # allowed
+      self._use_non_parallel_operation()
+    # disallowed
+    with self.assertRaises(expressions.NonParallelOperation):
+      self._use_non_parallel_operation()
+
+
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/apache_beam/dataframe/transforms_test.py b/sdks/python/apache_beam/dataframe/transforms_test.py
index 81917cf..e010b71 100644
--- a/sdks/python/apache_beam/dataframe/transforms_test.py
+++ b/sdks/python/apache_beam/dataframe/transforms_test.py
@@ -81,8 +81,9 @@
         'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
         'Speed': [380., 370., 24., 26.]
     })
-    self.run_scenario(df, lambda df: df.groupby('Animal').sum())
-    self.run_scenario(df, lambda df: df.groupby('Animal').mean())
+    with expressions.allow_non_parallel_operations():
+      self.run_scenario(df, lambda df: df.groupby('Animal').sum())
+      self.run_scenario(df, lambda df: df.groupby('Animal').mean())
 
   def test_filter(self):
     df = pd.DataFrame({
@@ -95,19 +96,21 @@
         df, lambda df: df.set_index('Animal').filter(regex='F.*', axis='index'))
 
   def test_aggregate(self):
-    a = pd.DataFrame({'col': [1, 2, 3]})
-    self.run_scenario(a, lambda a: a.agg(sum))
-    self.run_scenario(a, lambda a: a.agg(['mean', 'min', 'max']))
+    with expressions.allow_non_parallel_operations():
+      a = pd.DataFrame({'col': [1, 2, 3]})
+      self.run_scenario(a, lambda a: a.agg(sum))
+      self.run_scenario(a, lambda a: a.agg(['mean', 'min', 'max']))
 
   def test_scalar(self):
-    a = pd.Series([1, 2, 6])
-    self.run_scenario(a, lambda a: a.agg(sum))
-    self.run_scenario(a, lambda a: a / a.agg(sum))
+    with expressions.allow_non_parallel_operations():
+      a = pd.Series([1, 2, 6])
+      self.run_scenario(a, lambda a: a.agg(sum))
+      self.run_scenario(a, lambda a: a / a.agg(sum))
 
-    # Tests scalar being used as an input to a downstream stage.
-    df = pd.DataFrame({'key': ['a', 'a', 'b'], 'val': [1, 2, 6]})
-    self.run_scenario(
-        df, lambda df: df.groupby('key').sum().val / df.val.agg(sum))
+      # Tests scalar being used as an input to a downstream stage.
+      df = pd.DataFrame({'key': ['a', 'a', 'b'], 'val': [1, 2, 6]})
+      self.run_scenario(
+          df, lambda df: df.groupby('key').sum().val / df.val.agg(sum))
 
   def test_input_output_polymorphism(self):
     one_series = pd.Series([1])
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test_py3.py b/sdks/python/apache_beam/examples/snippets/snippets_test_py3.py
index 0f0b668..5eb1d4b 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test_py3.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test_py3.py
@@ -96,6 +96,18 @@
       ids = numbers | 'to_id' >> beam.Map(my_fn)
       # [END type_hints_map_annotations]
 
+    # Example using an annotated PTransform.
+    with self.assertRaises(typehints.TypeCheckError):
+      # [START type_hints_ptransforms]
+      from apache_beam.pvalue import PCollection
+
+      class IntToStr(beam.PTransform):
+        def expand(self, pcoll: PCollection[int]) -> PCollection[str]:
+          return pcoll | beam.Map(lambda elem: str(elem))
+
+      ids = numbers | 'convert to str' >> IntToStr()
+      # [END type_hints_ptransforms]
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py
index 0dad234..cec1d9b 100644
--- a/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py
@@ -28,12 +28,17 @@
 import time
 import typing
 import unittest
+import uuid
 
 import apache_beam as beam
 from apache_beam.io.kafka import ReadFromKafka
 from apache_beam.io.kafka import WriteToKafka
 from apache_beam.metrics import Metrics
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+NUM_RECORDS = 1000
 
 
 class CrossLanguageKafkaIO(object):
@@ -47,7 +52,7 @@
     _ = (
         pipeline
         | 'Impulse' >> beam.Impulse()
-        | 'Generate' >> beam.FlatMap(lambda x: range(1000))  # pylint: disable=range-builtin-not-iterating
+        | 'Generate' >> beam.FlatMap(lambda x: range(NUM_RECORDS))  # pylint: disable=range-builtin-not-iterating
         | 'Reshuffle' >> beam.Reshuffle()
         | 'MakeKV' >> beam.Map(lambda x:
                                (b'', str(x).encode())).with_output_types(
@@ -57,8 +62,8 @@
             topic=self.topic,
             expansion_service=self.expansion_service))
 
-  def build_read_pipeline(self, pipeline):
-    _ = (
+  def build_read_pipeline(self, pipeline, max_num_records=None):
+    kafka_records = (
         pipeline
         | 'ReadFromKafka' >> ReadFromKafka(
             consumer_config={
@@ -66,7 +71,14 @@
                 'auto.offset.reset': 'earliest'
             },
             topics=[self.topic],
-            expansion_service=self.expansion_service)
+            max_num_records=max_num_records,
+            expansion_service=self.expansion_service))
+
+    if max_num_records:
+      return kafka_records
+
+    return (
+        kafka_records
         | 'Windowing' >> beam.WindowInto(
             beam.window.FixedWindows(300),
             trigger=beam.transforms.trigger.AfterProcessingTime(60),
@@ -86,6 +98,30 @@
     os.environ.get('LOCAL_KAFKA_JAR'),
     "LOCAL_KAFKA_JAR environment var is not provided.")
 class CrossLanguageKafkaIOTest(unittest.TestCase):
+  def test_kafkaio(self):
+    kafka_topic = 'xlang_kafkaio_test_{}'.format(uuid.uuid4())
+    local_kafka_jar = os.environ.get('LOCAL_KAFKA_JAR')
+    with self.local_kafka_service(local_kafka_jar) as kafka_port:
+      bootstrap_servers = '{}:{}'.format(
+          self.get_platform_localhost(), kafka_port)
+      pipeline_creator = CrossLanguageKafkaIO(bootstrap_servers, kafka_topic)
+
+      self.run_kafka_write(pipeline_creator)
+      self.run_kafka_read(pipeline_creator)
+
+  def run_kafka_write(self, pipeline_creator):
+    with TestPipeline() as pipeline:
+      pipeline.not_use_test_runner_api = True
+      pipeline_creator.build_write_pipeline(pipeline)
+
+  def run_kafka_read(self, pipeline_creator):
+    with TestPipeline() as pipeline:
+      pipeline.not_use_test_runner_api = True
+      result = pipeline_creator.build_read_pipeline(pipeline, NUM_RECORDS)
+      assert_that(
+          result,
+          equal_to([(b'', str(i).encode()) for i in range(NUM_RECORDS)]))
+
   def get_platform_localhost(self):
     if sys.platform == 'darwin':
       return 'host.docker.internal'
@@ -119,18 +155,6 @@
       if kafka_server:
         kafka_server.kill()
 
-  def test_kafkaio_write(self):
-    local_kafka_jar = os.environ.get('LOCAL_KAFKA_JAR')
-    with self.local_kafka_service(local_kafka_jar) as kafka_port:
-      p = TestPipeline()
-      p.not_use_test_runner_api = True
-      xlang_kafkaio = CrossLanguageKafkaIO(
-          '%s:%s' % (self.get_platform_localhost(), kafka_port),
-          'xlang_kafkaio_test')
-      xlang_kafkaio.build_write_pipeline(p)
-      job = p.run()
-      job.wait_until_finish()
-
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/io/kafka.py b/sdks/python/apache_beam/io/kafka.py
index 4336bec..dc75b73 100644
--- a/sdks/python/apache_beam/io/kafka.py
+++ b/sdks/python/apache_beam/io/kafka.py
@@ -97,6 +97,9 @@
         ('topics', typing.List[unicode]),
         ('key_deserializer', unicode),
         ('value_deserializer', unicode),
+        ('start_read_time', typing.Optional[int]),
+        ('max_num_records', typing.Optional[int]),
+        ('max_read_time', typing.Optional[int]),
     ])
 
 
@@ -125,24 +128,30 @@
       topics,
       key_deserializer=byte_array_deserializer,
       value_deserializer=byte_array_deserializer,
-      expansion_service=None):
+      start_read_time=None,
+      max_num_records=None,
+      max_read_time=None,
+      expansion_service=None,
+  ):
     """
     Initializes a read operation from Kafka.
 
     :param consumer_config: A dictionary containing the consumer configuration.
     :param topics: A list of topic strings.
     :param key_deserializer: A fully-qualified Java class name of a Kafka
-                             Deserializer for the topic's key, e.g.
-                             'org.apache.kafka.common.
-                             serialization.LongDeserializer'.
-                             Default: 'org.apache.kafka.common.
-                             serialization.ByteArrayDeserializer'.
+        Deserializer for the topic's key, e.g.
+        'org.apache.kafka.common.serialization.LongDeserializer'.
+        Default: 'org.apache.kafka.common.serialization.ByteArrayDeserializer'.
     :param value_deserializer: A fully-qualified Java class name of a Kafka
-                               Deserializer for the topic's value, e.g.
-                               'org.apache.kafka.common.
-                               serialization.LongDeserializer'.
-                               Default: 'org.apache.kafka.common.
-                               serialization.ByteArrayDeserializer'.
+        Deserializer for the topic's value, e.g.
+        'org.apache.kafka.common.serialization.LongDeserializer'.
+        Default: 'org.apache.kafka.common.serialization.ByteArrayDeserializer'.
+    :param start_read_time: A timestamp, in milliseconds since the epoch,
+        to use as the start offset for reading.
+    :param max_num_records: Maximum number of records to read. Mainly used
+        for tests and demo applications.
+    :param max_read_time: Maximum amount of time, in seconds, that the
+        transform executes. Mainly used for tests and demo applications.
     :param expansion_service: The address (host:port) of the ExpansionService.
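+
+    Example of a bounded read using the new options (a minimal sketch; the
+    bootstrap server address and topic name are placeholders)::
+
+      ReadFromKafka(
+          consumer_config={'bootstrap.servers': 'localhost:9092'},
+          topics=['my_topic'],
+          max_num_records=100,  # stop after reading 100 records
+          max_read_time=60)  # or after 60 seconds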
     """
     super(ReadFromKafka, self).__init__(
@@ -153,6 +162,9 @@
                 topics=topics,
                 key_deserializer=key_deserializer,
                 value_deserializer=value_deserializer,
+                max_num_records=max_num_records,
+                max_read_time=max_read_time,
+                start_read_time=start_read_time,
             )),
         expansion_service or default_io_expansion_service())
 
@@ -195,17 +207,13 @@
     :param producer_config: A dictionary containing the producer configuration.
     :param topic: A Kafka topic name.
     :param key_deserializer: A fully-qualified Java class name of a Kafka
-                             Serializer for the topic's key, e.g.
-                             'org.apache.kafka.common.
-                             serialization.LongSerializer'.
-                             Default: 'org.apache.kafka.common.
-                             serialization.ByteArraySerializer'.
+        Serializer for the topic's key, e.g.
+        'org.apache.kafka.common.serialization.LongSerializer'.
+        Default: 'org.apache.kafka.common.serialization.ByteArraySerializer'.
     :param value_deserializer: A fully-qualified Java class name of a Kafka
-                               Serializer for the topic's value, e.g.
-                               'org.apache.kafka.common.
-                               serialization.LongSerializer'.
-                               Default: 'org.apache.kafka.common.
-                               serialization.ByteArraySerializer'.
+        Serializer for the topic's value, e.g.
+        'org.apache.kafka.common.serialization.LongSerializer'.
+        Default: 'org.apache.kafka.common.serialization.ByteArraySerializer'.
     :param expansion_service: The address (host:port) of the ExpansionService.
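+
+    Example (a minimal sketch; the bootstrap server address and topic name
+    are placeholders)::
+
+      WriteToKafka(
+          producer_config={'bootstrap.servers': 'localhost:9092'},
+          topic='my_topic')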
     """
     super(WriteToKafka, self).__init__(
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 2ab121e..4a0e80f 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -68,13 +68,16 @@
 from apache_beam.internal import pickler
 from apache_beam.internal import util
 from apache_beam.portability import python_urns
+from apache_beam.pvalue import DoOutputsTuple
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.transforms.display import HasDisplayData
 from apache_beam.typehints import native_type_compatibility
 from apache_beam.typehints import typehints
+from apache_beam.typehints.decorators import IOTypeHints
 from apache_beam.typehints.decorators import TypeCheckError
 from apache_beam.typehints.decorators import WithTypeHints
 from apache_beam.typehints.decorators import get_signature
+from apache_beam.typehints.decorators import get_type_hints
 from apache_beam.typehints.decorators import getcallargs_forhints
 from apache_beam.typehints.trivial_inference import instance_to_type
 from apache_beam.typehints.typehints import validate_composite_type_param
@@ -350,6 +353,14 @@
     # type: () -> str
     return self.__class__.__name__
 
+  def default_type_hints(self):
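+    # Derive hints from the annotations on expand(), stripping the
+    # PCollection containers so that only element types remain.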
+    fn_type_hints = IOTypeHints.from_callable(self.expand)
+    if fn_type_hints is not None:
+      fn_type_hints = fn_type_hints.strip_pcoll()
+
+    # Prefer class decorator type hints for backwards compatibility.
+    return get_type_hints(self.__class__).with_defaults(fn_type_hints)
+
   def with_input_types(self, input_type_hint):
     """Annotates the input type of a :class:`PTransform` with a type-hint.
 
@@ -419,6 +430,8 @@
     root_hint = (
         arg_hints[0] if len(arg_hints) == 1 else arg_hints or kwarg_hints)
     for context, pvalue_, hint in _ZipPValues().visit(pvalueish, root_hint):
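+      # Tagged outputs (DoOutputsTuple) have no single element type to
+      # check against, so they are skipped here.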
+      if isinstance(pvalue_, DoOutputsTuple):
+        continue
       if pvalue_.element_type is None:
         # TODO(robertwb): It's a bug that we ever get here. (typecheck)
         continue
diff --git a/sdks/python/apache_beam/typehints/decorators.py b/sdks/python/apache_beam/typehints/decorators.py
index dad0a31..4cd7681 100644
--- a/sdks/python/apache_beam/typehints/decorators.py
+++ b/sdks/python/apache_beam/typehints/decorators.py
@@ -105,6 +105,7 @@
 from typing import Optional
 from typing import Tuple
 from typing import TypeVar
+from typing import Union
 
 from apache_beam.typehints import native_type_compatibility
 from apache_beam.typehints import typehints
@@ -378,6 +379,75 @@
         self.output_types and len(self.output_types[0]) == 1 and
         not self.output_types[1])
 
+  def strip_pcoll(self):
+    from apache_beam.pipeline import Pipeline
+    from apache_beam.pvalue import PBegin
+    from apache_beam.pvalue import PDone
+
+    return self.strip_pcoll_helper(self.input_types,
+                                   self._has_input_types,
+                                   'input_types',
+                                   [Pipeline, PBegin],
+                                   'This input type hint will be ignored '
+                                   'and not used for type-checking purposes. '
+                                   'Typically, input type hints for a '
+                                   'PTransform are single (or nested) types '
+                                   'wrapped by a PCollection, or PBegin.',
+                                   'strip_pcoll_input()').\
+                strip_pcoll_helper(self.output_types,
+                                   self.has_simple_output_type,
+                                   'output_types',
+                                   [PDone, None],
+                                   'This output type hint will be ignored '
+                                   'and not used for type-checking purposes. '
+                                   'Typically, output type hints for a '
+                                   'PTransform are single (or nested) types '
+                                   'wrapped by a PCollection, PDone, or None.',
+                                   'strip_pcoll_output()')
+
+  def strip_pcoll_helper(
+      self,
+      my_type,            # type: Any
+      has_my_type,        # type: Callable[[], bool]
+      my_key,             # type: str
+      special_containers,  # type: List[Union[PBegin, PDone, PCollection]]
+      error_str,          # type: str
+      source_str          # type: str
+      ):
+    # type: (...) -> IOTypeHints
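+    # Unwraps a single input or output hint: PCollection[T] becomes T, a
+    # bare special container (e.g. PCollection, PBegin, PDone) becomes Any,
+    # and any other hint is dropped with a warning since it cannot be used
+    # for type checking.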
+    from apache_beam.pvalue import PCollection
+
+    if not has_my_type() or not my_type or len(my_type[0]) != 1:
+      return self
+
+    my_type = my_type[0][0]
+
+    if isinstance(my_type, typehints.AnyTypeConstraint):
+      return self
+
+    special_containers += [PCollection]
+    kwarg_dict = {}
+
+    if (my_type not in special_containers and
+        getattr(my_type, '__origin__', None) != PCollection):
+      logging.warning(error_str + ' Got: %s instead.' % my_type)
+      kwarg_dict[my_key] = None
+      return self._replace(
+          origin=self._make_origin([self], tb=False, msg=[source_str]),
+          **kwarg_dict)
+
+    if (getattr(my_type, '__args__', -1) in [-1, None] or
+        len(my_type.__args__) == 0):
+      # e.g. PCollection (or PBegin/PDone)
+      kwarg_dict[my_key] = ((typehints.Any, ), {})
+    else:
+      # e.g. PCollection[type]
+      kwarg_dict[my_key] = ((convert_to_beam_type(my_type.__args__[0]), ), {})
+
+    return self._replace(
+        origin=self._make_origin([self], tb=False, msg=[source_str]),
+        **kwarg_dict)
+
   def strip_iterable(self):
     # type: () -> IOTypeHints
 
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test_py3.py b/sdks/python/apache_beam/typehints/typed_pipeline_test_py3.py
index 2016871..e12930d 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test_py3.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test_py3.py
@@ -22,6 +22,7 @@
 
 from __future__ import absolute_import
 
+import typing
 import unittest
 
 import apache_beam as beam
@@ -257,6 +258,135 @@
     result = [1, 2, 3] | beam.FlatMap(fn) | beam.Map(fn2)
     self.assertCountEqual([4, 6], result)
 
+  def test_typed_ptransform_with_no_error(self):
+    class StrToInt(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
+        return pcoll | beam.Map(lambda x: int(x))
+
+    class IntToStr(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    _ = ['1', '2', '3'] | StrToInt() | IntToStr()
+
+  def test_typed_ptransform_with_bad_typehints(self):
+    class StrToInt(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
+        return pcoll | beam.Map(lambda x: int(x))
+
+    class IntToStr(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    with self.assertRaisesRegex(typehints.TypeCheckError,
+                                "Input type hint violation at IntToStr: "
+                                "expected <class 'str'>, got <class 'int'>"):
+      _ = ['1', '2', '3'] | StrToInt() | IntToStr()
+
+  def test_typed_ptransform_with_bad_input(self):
+    class StrToInt(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
+        return pcoll | beam.Map(lambda x: int(x))
+
+    class IntToStr(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    with self.assertRaisesRegex(typehints.TypeCheckError,
+                                "Input type hint violation at StrToInt: "
+                                "expected <class 'str'>, got <class 'int'>"):
+      # Feed integers to a PTransform that expects strings
+      _ = [1, 2, 3] | StrToInt() | IntToStr()
+
+  def test_typed_ptransform_with_partial_typehints(self):
+    class StrToInt(beam.PTransform):
+      def expand(self, pcoll) -> beam.pvalue.PCollection[int]:
+        return pcoll | beam.Map(lambda x: int(x))
+
+    class IntToStr(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    # Feed integers to a PTransform that should expect strings
+    # but has no typehints so it expects any
+    _ = [1, 2, 3] | StrToInt() | IntToStr()
+
+  def test_typed_ptransform_with_bare_wrappers(self):
+    class StrToInt(beam.PTransform):
+      def expand(
+          self, pcoll: beam.pvalue.PCollection) -> beam.pvalue.PCollection:
+        return pcoll | beam.Map(lambda x: int(x))
+
+    class IntToStr(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    _ = [1, 2, 3] | StrToInt() | IntToStr()
+
+  def test_typed_ptransform_with_no_typehints(self):
+    class StrToInt(beam.PTransform):
+      def expand(self, pcoll):
+        return pcoll | beam.Map(lambda x: int(x))
+
+    class IntToStr(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    # Feed integers to a PTransform that should expect strings
+    # but has no typehints so it expects any
+    _ = [1, 2, 3] | StrToInt() | IntToStr()
+
+  def test_typed_ptransform_with_generic_annotations(self):
+    T = typing.TypeVar('T')
+
+    class IntToInt(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[T]) -> beam.pvalue.PCollection[T]:
+        return pcoll | beam.Map(lambda x: x)
+
+    class IntToStr(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[T]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    _ = [1, 2, 3] | IntToInt() | IntToStr()
+
+  def test_typed_ptransform_with_do_outputs_tuple_compiles(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, element: int, *args, **kwargs):
+        if element % 2:
+          yield beam.pvalue.TaggedOutput('odd', 1)
+        else:
+          yield beam.pvalue.TaggedOutput('even', 1)
+
+    class MyPTransform(beam.PTransform):
+      def expand(self, pcoll: beam.pvalue.PCollection[int]):
+        return pcoll | beam.ParDo(MyDoFn()).with_outputs('odd', 'even')
+
+    # This test fails if you remove the following line from ptransform.py
+    # if isinstance(pvalue_, DoOutputsTuple): continue
+    _ = [1, 2, 3] | MyPTransform()
+
 
 class AnnotationsTest(unittest.TestCase):
   def test_pardo_dofn(self):
diff --git a/sdks/python/apache_beam/typehints/typehints_test_py3.py b/sdks/python/apache_beam/typehints/typehints_test_py3.py
index a7c23f0..5a36330 100644
--- a/sdks/python/apache_beam/typehints/typehints_test_py3.py
+++ b/sdks/python/apache_beam/typehints/typehints_test_py3.py
@@ -23,11 +23,19 @@
 from __future__ import absolute_import
 from __future__ import print_function
 
+import typing
 import unittest
 
+import apache_beam.typehints.typehints as typehints
+from apache_beam import Map
+from apache_beam import PTransform
+from apache_beam.pvalue import PBegin
+from apache_beam.pvalue import PCollection
+from apache_beam.pvalue import PDone
 from apache_beam.transforms.core import DoFn
 from apache_beam.typehints import KV
 from apache_beam.typehints import Iterable
+from apache_beam.typehints.typehints import Any
 
 
 class TestParDoAnnotations(unittest.TestCase):
@@ -46,11 +54,221 @@
       def process(self, element: int) -> Iterable[str]:
         pass
 
-    print(MyDoFn().get_type_hints())
     th = MyDoFn().get_type_hints()
     self.assertEqual(th.input_types, ((int, ), {}))
     self.assertEqual(th.output_types, ((str, ), {}))
 
 
+class TestPTransformAnnotations(unittest.TestCase):
+  def test_pep484_annotations(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: PCollection[int]) -> PCollection[str]:
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((int, ), {}))
+    self.assertEqual(th.output_types, ((str, ), {}))
+
+  def test_annotations_without_input_pcollection_wrapper(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: int) -> PCollection[str]:
+        return pcoll | Map(lambda num: str(num))
+
+    error_str = (
+        r'This input type hint will be ignored and not used for '
+        r'type-checking purposes. Typically, input type hints for a '
+        r'PTransform are single (or nested) types wrapped by a '
+        r'PCollection, or PBegin. Got: {} instead.'.format(int))
+
+    with self.assertLogs(level='WARN') as log:
+      MyPTransform().get_type_hints()
+      self.assertIn(error_str, log.output[0])
+
+  def test_annotations_without_output_pcollection_wrapper(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: PCollection[int]) -> str:
+        return pcoll | Map(lambda num: str(num))
+
+    error_str = (
+        r'This output type hint will be ignored and not used for '
+        r'type-checking purposes. Typically, output type hints for a '
+        r'PTransform are single (or nested) types wrapped by a '
+        r'PCollection, PDone, or None. Got: {} instead.'.format(str))
+
+    with self.assertLogs(level='WARN') as log:
+      th = MyPTransform().get_type_hints()
+      self.assertIn(error_str, log.output[0])
+      self.assertEqual(th.input_types, ((int, ), {}))
+      self.assertEqual(th.output_types, None)
+
+  def test_annotations_without_input_internal_type(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: PCollection) -> PCollection[str]:
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((Any, ), {}))
+    self.assertEqual(th.output_types, ((str, ), {}))
+
+  def test_annotations_without_output_internal_type(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: PCollection[int]) -> PCollection:
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((int, ), {}))
+    self.assertEqual(th.output_types, ((Any, ), {}))
+
+  def test_annotations_without_any_internal_type(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: PCollection) -> PCollection:
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((Any, ), {}))
+    self.assertEqual(th.output_types, ((Any, ), {}))
+
+  def test_annotations_without_input_typehint(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll) -> PCollection[str]:
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((Any, ), {}))
+    self.assertEqual(th.output_types, ((str, ), {}))
+
+  def test_annotations_without_output_typehint(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: PCollection[int]):
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((int, ), {}))
+    self.assertEqual(th.output_types, ((Any, ), {}))
+
+  def test_annotations_without_any_typehints(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll):
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, None)
+    self.assertEqual(th.output_types, None)
+
+  def test_annotations_with_pbegin(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: PBegin):
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((Any, ), {}))
+    self.assertEqual(th.output_types, ((Any, ), {}))
+
+  def test_annotations_with_pdone(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll) -> PDone:
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((Any, ), {}))
+    self.assertEqual(th.output_types, ((Any, ), {}))
+
+  def test_annotations_with_none_input(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: None) -> PCollection[str]:
+        return pcoll | Map(lambda num: str(num))
+
+    error_str = (
+        r'This input type hint will be ignored and not used for '
+        r'type-checking purposes. Typically, input type hints for a '
+        r'PTransform are single (or nested) types wrapped by a '
+        r'PCollection, or PBegin. Got: {} instead.'.format(None))
+
+    with self.assertLogs(level='WARN') as log:
+      th = MyPTransform().get_type_hints()
+      self.assertIn(error_str, log.output[0])
+      self.assertEqual(th.input_types, None)
+      self.assertEqual(th.output_types, ((str, ), {}))
+
+  def test_annotations_with_none_output(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll) -> None:
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((Any, ), {}))
+    self.assertEqual(th.output_types, ((Any, ), {}))
+
+  def test_annotations_with_arbitrary_output(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll) -> str:
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((Any, ), {}))
+    self.assertEqual(th.output_types, None)
+
+  def test_annotations_with_arbitrary_input_and_output(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: int) -> str:
+        return pcoll | Map(lambda num: str(num))
+
+    input_error_str = (
+        r'This input type hint will be ignored and not used for '
+        r'type-checking purposes. Typically, input type hints for a '
+        r'PTransform are single (or nested) types wrapped by a '
+        r'PCollection, or PBegin. Got: {} instead.'.format(int))
+
+    output_error_str = (
+        r'This output type hint will be ignored and not used for '
+        r'type-checking purposes. Typically, output type hints for a '
+        r'PTransform are single (or nested) types wrapped by a '
+        r'PCollection, PDone, or None. Got: {} instead.'.format(str))
+
+    with self.assertLogs(level='WARN') as log:
+      th = MyPTransform().get_type_hints()
+      self.assertIn(input_error_str, log.output[0])
+      self.assertIn(output_error_str, log.output[1])
+      self.assertEqual(th.input_types, None)
+      self.assertEqual(th.output_types, None)
+
+  def test_typing_module_annotations_are_converted_to_beam_annotations(self):
+    class MyPTransform(PTransform):
+      def expand(
+          self, pcoll: PCollection[typing.Dict[str, str]]
+      ) -> PCollection[typing.Dict[str, str]]:
+        return pcoll
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((typehints.Dict[str, str], ), {}))
+    self.assertEqual(th.output_types, ((typehints.Dict[str, str], ), {}))
+
+  def test_nested_typing_annotations_are_converted_to_beam_annotations(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll:
+         PCollection[typing.Union[int, typing.Any, typing.Dict[str, float]]]) \
+      -> PCollection[typing.Union[int, typing.Any, typing.Dict[str, float]]]:
+        return pcoll
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(
+        th.input_types,
+        ((typehints.Union[int, typehints.Any, typehints.Dict[str,
+                                                             float]], ), {}))
+    self.assertEqual(
+        th.output_types,
+        ((typehints.Union[int, typehints.Any, typehints.Dict[str,
+                                                             float]], ), {}))
+
+  def test_mixed_annotations_are_converted_to_beam_annotations(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: typing.Any) -> typehints.Any:
+        return pcoll
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((typehints.Any, ), {}))
+    self.assertEqual(th.output_types, ((typehints.Any, ), {}))
+
+
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle
index 2e60afa..48312a6 100644
--- a/sdks/python/test-suites/portable/common.gradle
+++ b/sdks/python/test-suites/portable/common.gradle
@@ -101,38 +101,6 @@
   }
 }
 
-task crossLanguagePythonJavaKafkaIOFlink {
-  dependsOn 'setupVirtualenv'
-  dependsOn ':runners:flink:1.10:job-server:shadowJar'
-  dependsOn ":sdks:python:container:py${pythonVersionSuffix}:docker"
-  dependsOn ':sdks:java:container:docker'
-  dependsOn ':sdks:java:io:expansion-service:shadowJar'
-  dependsOn ':sdks:java:testing:kafka-service:buildTestKafkaServiceJar'
-
-  doLast {
-    def kafkaJar = project(":sdks:java:testing:kafka-service:").buildTestKafkaServiceJar.archivePath
-    def options = [
-            "--runner=FlinkRunner",
-            "--parallelism=2",
-            "--environment_type=DOCKER",
-            "--environment_cache_millis=10000",
-            "--experiment=beam_fn_api",
-    ]
-    exec {
-      environment "LOCAL_KAFKA_JAR", kafkaJar
-      executable 'sh'
-      args '-c', """
-          . ${envdir}/bin/activate \\
-          && cd ${pythonRootDir} \\
-          && pip install -e .[test] \\
-          && python setup.py nosetests \\
-              --tests apache_beam.io.external.xlang_kafkaio_it_test:CrossLanguageKafkaIOTest \\
-              --test-pipeline-options='${options.join(' ')}'
-          """
-    }
-  }
-}
-
 task createProcessWorker {
   dependsOn ':sdks:python:container:build'
   dependsOn 'setupVirtualenv'
@@ -223,12 +191,14 @@
           ':runners:flink:1.10:job-server:shadowJar',
           ':sdks:java:container:docker',
           ':sdks:java:io:expansion-service:shadowJar',
+          ':sdks:java:testing:kafka-service:buildTestKafkaServiceJar'
   ]
 
   doLast {
     def tests = [
             "apache_beam.io.gcp.bigquery_read_it_test",
             "apache_beam.io.external.xlang_jdbcio_it_test",
+            "apache_beam.io.external.xlang_kafkaio_it_test",
     ]
     def testOpts = ["--tests=${tests.join(',')}"]
     def cmdArgs = mapToArgString([
@@ -236,7 +206,9 @@
             "suite": "postCommitIT-flink-py${pythonVersionSuffix}",
             "pipeline_opts": "--runner=FlinkRunner --project=apache-beam-testing --environment_type=LOOPBACK --temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
     ])
+    def kafkaJar = project(":sdks:java:testing:kafka-service:").buildTestKafkaServiceJar.archivePath
     exec {
+      environment "LOCAL_KAFKA_JAR", kafkaJar
       executable 'sh'
       args '-c', ". ${envdir}/bin/activate && ${pythonRootDir}/scripts/run_integration_test.sh $cmdArgs"
     }
diff --git a/website/www/site/content/en/contribute/release-guide.md b/website/www/site/content/en/contribute/release-guide.md
index 87eb732..8599b41 100644
--- a/website/www/site/content/en/contribute/release-guide.md
+++ b/website/www/site/content/en/contribute/release-guide.md
@@ -37,6 +37,9 @@
 
 1. Decide to release
 1. Prepare for the release
+1. Investigate performance regressions
+1. Create a release branch
+1. Verify release branch
 1. Build a release candidate
 1. Vote on the release candidate
 1. During vote process, run validation tests
@@ -45,7 +48,7 @@
 1. Promote the release
 
 
-### Decide to release
+## 1. Decide to release
 
 Deciding to release and selecting a Release Manager is the first step of the release process. This is a consensus-based decision of the entire community.
 
@@ -60,7 +63,7 @@
 
 **********
 
-## 1. Prepare for the release
+## 2. Prepare for the release
 
 Before your first release, you should perform one-time configuration steps. This will set up your security keys for signing the release and access to various release repositories.
 
@@ -244,7 +247,23 @@
 **********
 
 
-## 2. Create a release branch in apache/beam repository
+## 3. Investigate performance regressions
+
+Check the Beam load tests for possible performance regressions. Measurements are available on [metrics.beam.apache.org](http://metrics.beam.apache.org).
+
+All Runners which publish data should be checked for the following, in both *batch* and *streaming* mode:
+
+- [ParDo](http://metrics.beam.apache.org/d/MOi-kf3Zk/pardo-load-tests) and [GBK](http://metrics.beam.apache.org/d/UYZ-oJ3Zk/gbk-load-test): Runtime, latency, checkpoint duration
+- [Nexmark](http://metrics.beam.apache.org/d/ahuaA_zGz/nexmark): Query runtime for all queries
+- [IO](http://metrics.beam.apache.org/d/bnlHKP3Wz/java-io-it-tests-dataflow): Runtime
+
+If regressions are found, the release branch can still be created, but the regressions should be investigated and fixed as part of the release process.
+The role of the release manager is to file JIRA issues for each regression with the 'Fix Version' set to the to-be-released version. The release manager
+oversees these just like any other JIRA issue marked with the 'Fix Version' of the release.
+
+The mailing list should be informed so that the regressions can be fixed in the course of the release.
+
+## 4. Create a release branch in apache/beam repository
 
 Attention: Only committers have permission to create a release branch in apache/beam.
 
@@ -367,7 +386,7 @@
 **********
 
 
-## 3. Verify release branch
+## 5. Verify release branch
 
 After the release branch is cut you need to make sure it builds and has no significant issues that would block the creation of the release candidate.
 There are 2 ways to perform this verification, either running automation script(recommended), or running all commands manually.
@@ -477,11 +496,16 @@
 
 * Description: Description of failure
 
+#### Inform the mailing list
+
+The dev@beam.apache.org mailing list should be informed about the release branch being cut. Along with this note,
+include a list of pending and to-be-triaged issues. Afterwards, the release manager and the Beam community can
+refine and update this list.
 
 **********
 
 
-## 4. Triage release-blocking issues in JIRA
+## 6. Triage release-blocking issues in JIRA
 
 There could be outstanding release-blocking issues, which should be triaged before proceeding to build a release candidate. We track them by assigning a specific `Fix version` field even before the issue resolved.
 
@@ -528,7 +552,7 @@
 **********
 
 
-## 5. Build a release candidate
+## 7. Build a release candidate
 
 ### Checklist before proceeding
 
@@ -599,7 +623,7 @@
 **********
 
 
-## 6. Prepare documents
+## 8. Prepare documents
 
 ### Update and Verify Javadoc
 
@@ -748,7 +772,7 @@
 **********
 
 
-## 7. Vote and validate release candidate
+## 9. Vote and validate release candidate
 
 Once you have built and individually reviewed the release candidate, please share it for the community-wide review. Please review foundation-wide [voting guidelines](http://www.apache.org/foundation/voting.html) for more information.
 
@@ -1074,7 +1098,7 @@
 **********
 
 
-## 8. Finalize the release
+## 10. Finalize the release
 
 Once the release candidate has been reviewed and approved by the community, the release should be finalized. This involves the final deployment of the release candidate to the release repositories, merging of the website changes, etc.
 
@@ -1163,7 +1187,7 @@
 **********
 
 
-## 9. Promote the release
+## 11. Promote the release
 
 Once the release has been finalized, the last step of the process is to promote the release within the project and beyond.
 
diff --git a/website/www/site/content/en/documentation/io/built-in/snowflake.md b/website/www/site/content/en/documentation/io/built-in/snowflake.md
new file mode 100644
index 0000000..078a373
--- /dev/null
+++ b/website/www/site/content/en/documentation/io/built-in/snowflake.md
@@ -0,0 +1,364 @@
+---
+title: "Apache Snowflake I/O connector"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+[Built-in I/O Transforms](/documentation/io/built-in/)
+
+# Snowflake I/O
+Pipeline options and general information about using and running Snowflake IO.
+
+## Authentication
+All authentication methods available for the Snowflake JDBC Driver can be used with the IO transforms:
+
+- Username and password
+- Key pair
+- OAuth token
+
+Credentials are passed via Pipeline options, which are used to instantiate `SnowflakeIO.DataSourceConfiguration`:
+{{< highlight java >}}
+SnowflakePipelineOptions options = PipelineOptionsFactory
+        .fromArgs(args)
+        .withValidation()
+        .as(SnowflakePipelineOptions.class);
+SnowflakeCredentials credentials = SnowflakeCredentialsFactory.of(options);
+
+SnowflakeIO.DataSourceConfiguration.create(credentials)
+        .(other DataSourceConfiguration options)
+{{< /highlight >}}
+### Username and password 
+To use username/password authentication in SnowflakeIO, invoke your pipeline with the following Pipeline options:
+{{< highlight >}}
+--username=<USERNAME> --password=<PASSWORD>
+{{< /highlight >}}
+### Key pair
+To use this authentication method, you must first generate a key pair and associate the public key with the Snowflake user that will connect using the IO transform. For instructions, see the [Snowflake documentation](https://docs.snowflake.com/en/user-guide/jdbc-configure.html).
+
+To use key pair authentication with SnowflakeIO, invoke your pipeline with the following Pipeline options:
+{{< highlight >}}
+--username=<USERNAME> --privateKeyPath=<PATH_TO_P8_FILE> --privateKeyPassphrase=<PASSWORD_FOR_KEY>
+{{< /highlight >}}
+
+### OAuth token
+SnowflakeIO also supports OAuth token authentication.
+
+**IMPORTANT**: SnowflakeIO requires a valid OAuth access token. It will neither be able to refresh the token nor obtain it using a web-based flow. For information on configuring an OAuth integration and obtaining the token, see the  [Snowflake documentation](https://docs.snowflake.com/en/user-guide/oauth-intro.html).
+
+Once you have the token, invoke your pipeline with the following Pipeline options:
+{{< highlight >}}
+--oauthToken=<TOKEN>
+{{< /highlight >}}
+## DataSource Configuration
+A DataSource configuration is required in both read and write objects to configure the Snowflake connection properties.
+### General usage
+Create the DataSource configuration:
+{{< highlight java >}}
+ SnowflakeIO.DataSourceConfiguration
+            .create(SnowflakeCredentialsFactory.of(options))
+            .withUrl(options.getUrl())
+            .withServerName(options.getServerName())
+            .withDatabase(options.getDatabase())
+            .withWarehouse(options.getWarehouse())
+            .withSchema(options.getSchema());
+{{< /highlight >}}
+The parameters are as follows:
+
+- `.withUrl(...)`
+  - JDBC-like URL for your Snowflake account, including account name and region, without any parameters.
+  - Example: `.withUrl("jdbc:snowflake://account.snowflakecomputing.com")`
+- `.withServerName(...)`
+  - Server Name - full server name with account, zone and domain.
+  - Example: `.withServerName("account.snowflakecomputing.com")`
+- `.withDatabase(...)`
+  - Name of the Snowflake database to use. 
+  - Example: `.withDatabase("MY_DATABASE")`
+- `.withWarehouse(...)`
+  - Name of the Snowflake warehouse to use. This parameter is optional. If no warehouse name is specified, the default warehouse for the user is used.
+  - Example: `.withWarehouse("MY_WAREHOUSE")`
+- `.withSchema(...)`
+  - Name of the schema in the database to use. This parameter is optional.
+  - Example: `.withSchema("PUBLIC")`
+
+
+**Note** - either `.withUrl(...)` or `.withServerName(...)` **is required**.
+
+## Pipeline options
+Use Beam’s [Pipeline options](https://beam.apache.org/releases/javadoc/2.17.0/org/apache/beam/sdk/options/PipelineOptions.html) to set options via the command line.
+### Snowflake Pipeline options
+The Snowflake IO library supports the following options, which can be passed via the [command line](https://beam.apache.org/documentation/io/built-in/snowflake/#running-main-command-with-pipeline-options) when a Pipeline uses them:
+
+`--url` Snowflake's JDBC-like URL, including account name and region, without any parameters.
+
+`--serverName` Full server name with account, zone and domain.
+
+`--username` Required for username/password and Private Key authentication.
+
+`--oauthToken` Required for OAuth authentication only.
+
+`--password` Required for username/password authentication only.
+
+`--privateKeyPath` Path to Private Key file. Required for Private Key authentication only.
+
+`--privateKeyPassphrase` Private Key's passphrase. Required for Private Key authentication only.
+
+`--stagingBucketName` External bucket path ending with `/`, e.g. `gs://bucket/`. Sub-directories are allowed.
+
+`--storageIntegrationName` Storage integration name.
+
+`--warehouse` Warehouse to use. Optional.
+
+`--database` Database name to connect to. Optional.
+
+`--schema` Schema to use. Optional.
+
+`--table` Table to use. Optional.
+
+`--query` Query to use. Optional.
+
+`--role` Role to use. Optional.
+
+`--authenticator` Authenticator to use. Optional.
+
+`--portNumber` Port number. Optional.
+
+`--loginTimeout` Login timeout. Optional.
+
+## Running pipelines on Dataflow
+By default, pipelines are run on [Direct Runner](https://beam.apache.org/documentation/runners/direct/) on your local machine. To run a pipeline on [Google Dataflow](https://cloud.google.com/dataflow/), you must provide the following Pipeline options:
+
+- `--runner=DataflowRunner`
+  - The Dataflow-specific runner.
+
+- `--project=<GCP PROJECT>`
+  - Name of the Google Cloud Platform project.
+
+- `--stagingBucketName=<GCS BUCKET NAME>`
+  - Google Cloud Storage bucket where the Beam files will be staged.
+
+- `--maxNumWorkers=5`
+  - (optional) Maximum number of workers.
+
+- `--appName=<JOB NAME>`
+  - (optional) Prefix for the job name in the Dataflow Dashboard.
+
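+For example, a Dataflow invocation might combine the options above as follows (all values are placeholders):
+
+{{< highlight >}}
+--runner=DataflowRunner --project=my-project --stagingBucketName=gs://my-bucket/ --maxNumWorkers=5 --appName=snowflake-example
+{{< /highlight >}}
+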
+More pipeline options for Dataflow can be found [here](https://beam.apache.org/releases/javadoc/2.17.0/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.html).
+
+**Note**: To properly authenticate with Google Cloud, please use [gcloud](https://cloud.google.com/sdk/gcloud/) or follow the [Google Cloud documentation](https://cloud.google.com/docs/authentication/).
+
+**Important**: Please acknowledge [Google Dataflow pricing](https://cloud.google.com/dataflow/pricing).
+
+## Writing to Snowflake tables
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/javadoc/2.17.0/org/apache/beam/sdk/values/PCollection.html) to your Snowflake database.
+### Batch write (from a bounded source)
+The basic `.write()` operation usage is as follows:
+{{< highlight java >}}
+data.apply(
+   SnowflakeIO.<type>write()
+       .withDataSourceConfiguration(dc)
+       .to("MY_TABLE")
+       .withStagingBucketName("BUCKET NAME")
+       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
+       .withUserDataMapper(mapper)
+)
+{{< /highlight >}}
+Replace `type` with the data type of the `PCollection` to write; for example, use `SnowflakeIO.<String>write()` for an input `PCollection` of Strings.
+
+All of the following parameters are required:
+
+- `.withDataSourceConfiguration()` Accepts a DataSourceConfiguration object.
+
+- `.to()` Accepts the target Snowflake table name.
+
+- `.withStagingBucketName()` Accepts a cloud bucket path ending with a slash.
+  - Example: `.withStagingBucketName("gs://mybucket/my/dir/")`
+
+- `.withStorageIntegrationName()` Accepts the name of a Snowflake storage integration object created according to the Snowflake documentation. Example:
+{{< highlight >}}
+CREATE OR REPLACE STORAGE INTEGRATION test_integration 
+TYPE = EXTERNAL_STAGE 
+STORAGE_PROVIDER = GCS 
+ENABLED = TRUE 
+STORAGE_ALLOWED_LOCATIONS = ('gcs://bucket/');
+{{< /highlight >}}
+Then:
+{{< highlight >}}
+.withStorageIntegrationName("test_integration")
+{{< /highlight >}}
+
+- `.withUserDataMapper()` Accepts the UserDataMapper function that will map a user's PCollection to an array of String values `(String[])`.
+
+**Note**:
+SnowflakeIO uses COPY statements behind the scenes to write (using [COPY to table](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html)). The CSV files that end up in Snowflake are saved under the `stagingBucketName` path.
+
+### UserDataMapper function
+The UserDataMapper function is required to map data from a PCollection to an array of String values before the `write()` operation saves the data to temporary .csv files. For example:
+{{< highlight java >}}
+public static SnowflakeIO.UserDataMapper<Long> getCsvMapper() {
+    return (SnowflakeIO.UserDataMapper<Long>) recordLine -> new String[] {recordLine.toString()};
+}
+{{< /highlight >}}
+
+### Additional write options
+#### Transformation query
+The `.withQueryTransformation()` option for the `write()` operation accepts a SQL query as a String value, which is performed while transferring the data staged in CSV files directly to the target Snowflake table. For information about the transformation SQL syntax, see the [Snowflake Documentation](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html#transformation-parameters).
+
+Usage:
+{{< highlight java >}}
+String query = "SELECT t.$1 from YOUR_TABLE;";
+data.apply(
+   SnowflakeIO.<~>write()
+       .withDataSourceConfiguration(dc)
+       .to("MY_TABLE")
+       .withStagingBucketName("BUCKET NAME")
+       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
+       .withUserDataMapper(mapper)
+       .withQueryTransformation(query)
+)
+{{< /highlight >}}
+
+#### Write disposition
+Define the write behaviour for the target table by specifying the `.withWriteDisposition(...)` option for the `write()` operation. The following values are supported:
+
+- APPEND - Default behaviour. Written data is added to the existing rows in the table.
+
+- EMPTY - The target table must be empty; otherwise, the write operation fails.
+
+- TRUNCATE - The write operation deletes all rows from the target table before writing to it.
+
+Example of usage:
+{{< highlight java >}}
+data.apply(
+   SnowflakeIO.<~>write()
+       .withDataSourceConfiguration(dc)
+       .to("MY_TABLE")
+       .withStagingBucketName("BUCKET NAME")
+       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
+       .withUserDataMapper(mapper)
+       .withWriteDisposition(TRUNCATE)
+)
+{{< /highlight >}}
+
+#### Create disposition
+The `.withCreateDisposition()` option defines the behaviour of the write operation if the target table does not exist. The following values are supported:
+
+- CREATE_IF_NEEDED - Default behaviour. The write operation checks whether the specified target table exists; if it does not, the write operation attempts to create the table. Specify the schema for the target table using the `.withTableSchema()` option.
+
+- CREATE_NEVER - The write operation fails if the target table does not exist.
+
+Usage:
+{{< highlight java >}}
+data.apply(
+   SnowflakeIO.<~>write()
+       .withDataSourceConfiguration(dc)
+       .to("MY_TABLE")
+       .withStagingBucketName("BUCKET NAME")
+       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
+       .withUserDataMapper(mapper)
+       .withCreateDisposition(CREATE_NEVER)
+)
+{{< /highlight >}}
+
+#### Table schema disposition
+When the `.withCreateDisposition()` option is set to `CREATE_IF_NEEDED`, the `.withTableSchema()` option enables specifying the schema for the created target table.
+A table schema is a list of `SFColumn` objects, each with a name and a type corresponding to a column in the table.
+
+Usage:
+{{< highlight java >}}
+SFTableSchema tableSchema =
+    new SFTableSchema(
+        SFColumn.of("my_date", new SFDate(), true),
+        new SFColumn("id", new SFNumber()),
+        SFColumn.of("name", new SFText(), true));
+
+data.apply(
+   SnowflakeIO.<~>write()
+       .withDataSourceConfiguration(dc)
+       .to("MY_TABLE")
+       .withStagingBucketName("BUCKET NAME")
+       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
+       .withUserDataMapper(mapper)
+       .withTableSchema(tableSchema)
+)
+{{< /highlight >}}
+## Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. The output of the read transform is a [PCollection](https://beam.apache.org/releases/javadoc/2.17.0/org/apache/beam/sdk/values/PCollection.html) of a user-defined data type.
+
+### General usage
+
+The basic `.read()` operation usage:
+{{< highlight java >}}
+PCollection<USER_DATA_TYPE> items = pipeline.apply(
+   SnowflakeIO.<USER_DATA_TYPE>read()
+       .withDataSourceConfiguration(dc)
+       .fromTable("MY_TABLE") // or .fromQuery("QUERY")
+       .withStagingBucketName("BUCKET NAME")
+       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
+       .withCsvMapper(mapper)
+       .withCoder(coder));
+{{< /highlight >}}
+All of the following parameters are required:
+
+- `.withDataSourceConfiguration(...)`
+  - Accepts a DataSourceConfiguration object.
+
+- `.fromTable(...) or .fromQuery(...)`
+  - Specifies a Snowflake table name or custom SQL query.
+
+- `.withStagingBucketName()`
+  - Accepts a cloud bucket name.
+
+- `.withStorageIntegrationName()`
+  - Accepts a name of a Snowflake storage integration object created according to Snowflake documentation. Example:
+{{< highlight >}}
+CREATE OR REPLACE STORAGE INTEGRATION test_integration 
+TYPE = EXTERNAL_STAGE 
+STORAGE_PROVIDER = GCS 
+ENABLED = TRUE 
+STORAGE_ALLOWED_LOCATIONS = ('gcs://bucket/');
+{{< /highlight >}}
+Then:
+{{< highlight >}}
+.withStorageIntegrationName("test_integration")
+{{< /highlight >}}
+
+- `.withCsvMapper(mapper)`
+  - Accepts a [CSVMapper](https://beam.apache.org/documentation/io/built-in/snowflake/#csvmapper) instance for mapping String[] to USER_DATA_TYPE.
+- `.withCoder(coder)`
+  - Accepts the [Coder](https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/coders/Coder.html) for USER_DATA_TYPE.
+
+**Note**:
+SnowflakeIO uses COPY statements behind the scenes to read (using [COPY to location](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)) files staged in cloud storage. The `stagingBucketName` is used as a temporary location for storing CSV files. Those temporary directories are named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and are removed automatically once the read operation finishes.
+
+### CSVMapper
+SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. 
+
+The CSVMapper’s job is to give the user the possibility to convert the array of Strings to a user-defined type, e.g. a `GenericRecord` for Avro or Parquet files, or a custom POJO.
+
+Example implementation of CsvMapper for GenericRecord:
+{{< highlight java >}}
+static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
+   return (SnowflakeIO.CsvMapper<GenericRecord>)
+           parts -> {
+               return new GenericRecordBuilder(PARQUET_SCHEMA)
+                       .set("ID", Long.valueOf(parts[0]))
+                       .set("NAME", parts[1])
+                       [...]
+                       .build();
+           };
+}
+{{< /highlight >}}
\ No newline at end of file
diff --git a/website/www/site/content/en/documentation/patterns/ai-platform.md b/website/www/site/content/en/documentation/patterns/ai-platform.md
index 905f895..b2a7b10 100644
--- a/website/www/site/content/en/documentation/patterns/ai-platform.md
+++ b/website/www/site/content/en/documentation/patterns/ai-platform.md
@@ -35,7 +35,7 @@
 {{< /highlight >}}
 
 {{< highlight java >}}
-// Java examples will be available on Beam 2.23 release.
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" NlpAnalyzeText >}}
 {{< /highlight >}}
 
 
@@ -79,7 +79,7 @@
 {{< /highlight >}}
 
 {{< highlight java >}}
-// Java examples will be available on Beam 2.23 release.
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" NlpExtractSentiments >}}
 {{< /highlight >}}
 
 The snippet loops over `sentences` and, for each sentence, extracts the sentiment score. 
@@ -99,7 +99,7 @@
 {{< /highlight >}}
 
 {{< highlight java >}}
-// Java examples will be available on Beam 2.23 release.
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" NlpExtractEntities >}}
 {{< /highlight >}}
 
 Entities can be found in `entities` attribute. Just like before, `entities` is a sequence, that's why list comprehension is a viable choice. The most tricky part is interpreting the types of entities. Natural Language API defines entity types as enum. In a response object, entity types are returned as integers. That's why a user has to instantiate `naturallanguageml.enums.Entity.Type` to access a human-readable name.
@@ -119,7 +119,7 @@
 {{< /highlight >}}
 
 {{< highlight java >}}
-// Java examples will be available on Beam 2.23 release.
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" NlpAnalyzeDependencyTree >}}
 {{< /highlight >}}
 
 The output is below. For better readability, indexes are replaced by text which they refer to:
diff --git a/website/www/site/content/en/documentation/sdks/python-type-safety.md b/website/www/site/content/en/documentation/sdks/python-type-safety.md
index 074460c..755f795 100644
--- a/website/www/site/content/en/documentation/sdks/python-type-safety.md
+++ b/website/www/site/content/en/documentation/sdks/python-type-safety.md
@@ -71,7 +71,7 @@
 Using Annotations has the added benefit of allowing use of a static type checker (such as mypy) to additionally type check your code.
 If you already use a type checker, using annotations instead of decorators reduces code duplication.
 However, annotations do not cover all the use cases that decorators and inline declarations do.
-Two such are the `expand` of a composite transform and lambda functions.
+For instance, they do not work for lambda functions.
 
 ### Declaring Type Hints Using Type Annotations
 
@@ -82,6 +82,7 @@
 Annotations are currently supported on:
 
  - `process()` methods on `DoFn` subclasses.
+ - `expand()` methods on `PTransform` subclasses.
  - Functions passed to: `ParDo`, `Map`, `FlatMap`, `Filter`.
 
 The following code declares an `int` input and a `str` output type hint on the `to_id` transform, using annotations on `my_fn`.
@@ -90,6 +91,15 @@
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test_py3.py" type_hints_map_annotations >}}
 {{< /highlight >}}
 
+The following code demonstrates how to use annotations on `PTransform` subclasses.
+A valid annotation is a `PCollection` that wraps an internal (nested) type, `PBegin`, `PDone`, or `None`.
+The example below declares type hints on a custom PTransform that takes a `PCollection[int]` input
+and outputs a `PCollection[str]`.
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test_py3.py" type_hints_ptransforms >}}
+{{< /highlight >}}
+
 The following code declares `int` input and output type hints on `filter_evens`, using annotations on `FilterEvensDoFn.process`.
 Since `process` returns a generator, the output type for a DoFn producing a `PCollection[int]` is annotated as `Iterable[int]` (`Generator[int, None, None]` would also work here).
 Beam will remove the outer iterable of the return type on the `DoFn.process` method and functions passed to `FlatMap` to deduce the element type of resulting PCollection .
@@ -182,6 +192,7 @@
 * `Iterable[T]`
 * `Iterator[T]`
 * `Generator[T]`
+* `PCollection[T]`
 
 **Note:** The `Tuple[T, U]` type hint is a tuple with a fixed number of heterogeneously typed elements, while the `Tuple[T, ...]` type hint is a tuple with a variable number of homogeneously typed elements.
 
diff --git a/website/www/site/content/en/documentation/transforms/python/aggregation/sum.md b/website/www/site/content/en/documentation/transforms/python/aggregation/sum.md
index 8d49f5b..a9aef38 100644
--- a/website/www/site/content/en/documentation/transforms/python/aggregation/sum.md
+++ b/website/www/site/content/en/documentation/transforms/python/aggregation/sum.md
@@ -17,7 +17,61 @@
 
 # Sum
 
-## Examples
-See [BEAM-7390](https://issues.apache.org/jira/browse/BEAM-7390) for updates. 
+{{< localstorage language language-py >}}
 
-## Related transforms
\ No newline at end of file
+{{< button-pydoc path="apache_beam.transforms.core" class="CombineGlobally" >}}
+
+Sums all the elements within each aggregation.
+
+## Examples
+
+In the following example, we create a pipeline with a `PCollection`.
+Then, we get the sum of all the element values in different ways.
+
+### Example 1: Sum of the elements in a PCollection
+
+We use `CombineGlobally()` to get the sum of all the element values from the *entire* `PCollection`.
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/sum.py" sum_globally >}}
+{{< /highlight >}}
+
+{{< paragraph class="notebook-skip" >}}
+Output:
+{{< /paragraph >}}
+
+{{< highlight class="notebook-skip" >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/sum_test.py" total >}}
+{{< /highlight >}}
+
+{{< buttons-code-snippet
+  py="sdks/python/apache_beam/examples/snippets/transforms/aggregation/sum.py" >}}
+
+### Example 2: Sum of the elements for each key
+
+We use `CombinePerKey()` to get the sum of all the element values for each unique key in a `PCollection` of key-values.
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/sum.py" sum_per_key >}}
+{{< /highlight >}}
+
+{{< paragraph class="notebook-skip" >}}
+Output:
+{{< /paragraph >}}
+
+{{< highlight class="notebook-skip" >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/sum_test.py" totals_per_key >}}
+{{< /highlight >}}
+
+{{< buttons-code-snippet
+  py="sdks/python/apache_beam/examples/snippets/transforms/aggregation/sum.py" >}}
+
+## Related transforms
+
+* [CombineGlobally](/documentation/transforms/python/aggregation/combineglobally)
+* [CombinePerKey](/documentation/transforms/python/aggregation/combineperkey)
+* [Max](/documentation/transforms/python/aggregation/max)
+* [Mean](/documentation/transforms/python/aggregation/mean)
+* [Min](/documentation/transforms/python/aggregation/min)
+
+{{< button-pydoc path="apache_beam.transforms.core" class="CombineGlobally" >}}
diff --git a/website/www/site/content/en/documentation/transforms/python/overview.md b/website/www/site/content/en/documentation/transforms/python/overview.md
index d648ac7..059abcb 100644
--- a/website/www/site/content/en/documentation/transforms/python/overview.md
+++ b/website/www/site/content/en/documentation/transforms/python/overview.md
@@ -60,7 +60,7 @@
   <tr><td><a href="/documentation/transforms/python/aggregation/mean">Mean</a></td><td>Computes the average within each aggregation.</td></tr>
   <tr><td><a href="/documentation/transforms/python/aggregation/min">Min</a></td><td>Gets the element with the minimum value within each aggregation.</td></tr>
   <tr><td><a href="/documentation/transforms/python/aggregation/sample">Sample</a></td><td>Randomly select some number of elements from each aggregation.</td></tr>
-  <tr><td>Sum</td><td>Not available.</td></tr>
+  <tr><td><a href="/documentation/transforms/python/aggregation/sum">Sum</a></td><td>Sums all the elements within each aggregation.</td></tr>
   <tr><td><a href="/documentation/transforms/python/aggregation/top">Top</a></td><td>Compute the largest element(s) in each aggregation.</td></tr>
 </table>
 
diff --git a/website/www/site/data/io_matrix.yaml b/website/www/site/data/io_matrix.yaml
index 5923a86..84637c6 100644
--- a/website/www/site/data/io_matrix.yaml
+++ b/website/www/site/data/io_matrix.yaml
@@ -305,6 +305,7 @@
             url: https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.datastore.v1new.datastoreio.html
       - transform: SnowflakeIO
         description: Experimental Transforms for reading from and writing to [Snowflake](https://www.snowflake.com/).
+        docs: /documentation/io/built-in/snowflake
         implementations:
           - language: java
             name: org.apache.beam.sdk.io.snowflake.SnowflakeIO
diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html
index c8d0da9..f715f0c 100644
--- a/website/www/site/layouts/partials/section-menu/en/documentation.html
+++ b/website/www/site/layouts/partials/section-menu/en/documentation.html
@@ -74,6 +74,7 @@
             <li><a href="/documentation/io/built-in/hadoop/">Hadoop Input/Output Format IO</a></li>
             <li><a href="/documentation/io/built-in/hcatalog/">HCatalog IO</a></li>
             <li><a href="/documentation/io/built-in/google-bigquery/">Google BigQuery I/O connector</a></li>
+            <li><a href="/documentation/io/built-in/snowflake/">Snowflake I/O connector</a></li>
           </ul>
        </li>
 
@@ -204,6 +205,7 @@
           <li><a href="/documentation/transforms/python/aggregation/mean/">Mean</a></li>
           <li><a href="/documentation/transforms/python/aggregation/min/">Min</a></li>
           <li><a href="/documentation/transforms/python/aggregation/sample/">Sample</a></li>
+          <li><a href="/documentation/transforms/python/aggregation/sum/">Sum</a></li>
           <li><a href="/documentation/transforms/python/aggregation/top/">Top</a></li>
         </ul>
       </li>