Merge pull request #10676 from Hannah-Jiang/docker_tag_patch

[BEAM-9084] cleaning up docker image tag
diff --git a/.test-infra/jenkins/CommonTestProperties.groovy b/.test-infra/jenkins/CommonTestProperties.groovy
index 39d6695..74a8bd0 100644
--- a/.test-infra/jenkins/CommonTestProperties.groovy
+++ b/.test-infra/jenkins/CommonTestProperties.groovy
@@ -29,7 +29,7 @@
         DATAFLOW("DataflowRunner"),
         SPARK("SparkRunner"),
         SPARK_STRUCTURED_STREAMING("SparkStructuredStreamingRunner"),
-        FLINK("TestFlinkRunner"),
+        FLINK("FlinkRunner"),
         DIRECT("DirectRunner"),
         PORTABLE("PortableRunner")
 
diff --git a/.test-infra/jenkins/job_PostCommit_Java_Nexmark_Flink.groovy b/.test-infra/jenkins/job_PostCommit_Java_Nexmark_Flink.groovy
index cbcd0ba..30cf5ee 100644
--- a/.test-infra/jenkins/job_PostCommit_Java_Nexmark_Flink.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Java_Nexmark_Flink.groovy
@@ -43,6 +43,8 @@
       switches('-Pnexmark.runner=":runners:flink:1.9"' +
               ' -Pnexmark.args="' +
               [NexmarkBigqueryProperties.nexmarkBigQueryArgs,
+              '--runner=FlinkRunner',
+              '--shutdownSourcesOnFinalWatermark=true',
               '--streaming=false',
               '--suite=SMOKE',
               '--streamTimeout=60' ,
@@ -58,6 +60,8 @@
       switches('-Pnexmark.runner=":runners:flink:1.9"' +
               ' -Pnexmark.args="' +
               [NexmarkBigqueryProperties.nexmarkBigQueryArgs,
+              '--runner=FlinkRunner',
+              '--shutdownSourcesOnFinalWatermark=true',
               '--streaming=true',
               '--suite=SMOKE',
               '--streamTimeout=60' ,
@@ -73,6 +77,8 @@
       switches('-Pnexmark.runner=":runners:flink:1.9"' +
               ' -Pnexmark.args="' +
               [NexmarkBigqueryProperties.nexmarkBigQueryArgs,
+              '--runner=FlinkRunner',
+              '--shutdownSourcesOnFinalWatermark=true',
               '--queryLanguage=sql',
               '--streaming=false',
               '--suite=SMOKE',
@@ -88,6 +94,8 @@
       switches('-Pnexmark.runner=":runners:flink:1.9"' +
               ' -Pnexmark.args="' +
               [NexmarkBigqueryProperties.nexmarkBigQueryArgs,
+              '--runner=FlinkRunner',
+              '--shutdownSourcesOnFinalWatermark=true',
               '--queryLanguage=sql',
               '--streaming=true',
               '--suite=SMOKE',
@@ -107,8 +115,9 @@
   commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 240)
 
   def final JOB_SPECIFIC_OPTIONS = [
-          'suite'        : 'SMOKE',
-          'streamTimeout': 60,
+          'suite' : 'SMOKE',
+          'streamTimeout' : 60,
+          'shutdownSourcesOnFinalWatermark' : true,
   ]
 
   Nexmark.standardJob(delegate, Runner.FLINK, SDK.JAVA, JOB_SPECIFIC_OPTIONS, TriggeringContext.PR)
diff --git a/learning/katas/python/Common Transforms/Aggregation/Count/tests.py b/learning/katas/python/Common Transforms/Aggregation/Count/tests.py
index a63ecd4..ccc96cb 100644
--- a/learning/katas/python/Common Transforms/Aggregation/Count/tests.py
+++ b/learning/katas/python/Common Transforms/Aggregation/Count/tests.py
@@ -14,7 +14,8 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_file_output
+from test_helper import failed, passed, get_file_output, \
+    test_is_not_empty, test_answer_placeholders_text_deleted
 
 
 def test_output():
@@ -29,5 +30,6 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_output()
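
Every kata tests.py in this patch applies the same substitution: the aggregate run_common_tests() entry point from the EduTools test_helper is replaced by explicit calls to the individual checks the katas need. A minimal sketch of the resulting pattern, assuming the usual test_helper semantics (the test_output body is illustrative only, not taken from any particular kata):

from test_helper import failed, passed, get_file_output, \
    test_is_not_empty, test_answer_placeholders_text_deleted


def test_output():
    # Each kata supplies its own assertion over the captured task output.
    output = get_file_output()
    if output:
        passed()
    else:
        failed('The task produced no output.')


if __name__ == '__main__':
    test_is_not_empty()
    test_answer_placeholders_text_deleted()
    test_output()
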
diff --git a/learning/katas/python/Common Transforms/Aggregation/Largest/tests.py b/learning/katas/python/Common Transforms/Aggregation/Largest/tests.py
index 1a89b9d..0fced6e 100644
--- a/learning/katas/python/Common Transforms/Aggregation/Largest/tests.py
+++ b/learning/katas/python/Common Transforms/Aggregation/Largest/tests.py
@@ -14,7 +14,8 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_file_output
+from test_helper import failed, passed, get_file_output, \
+    test_is_not_empty, test_answer_placeholders_text_deleted
 
 
 def test_output():
@@ -29,5 +30,6 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_output()
diff --git a/learning/katas/python/Common Transforms/Aggregation/Mean/tests.py b/learning/katas/python/Common Transforms/Aggregation/Mean/tests.py
index cba3205..9e99d39 100644
--- a/learning/katas/python/Common Transforms/Aggregation/Mean/tests.py
+++ b/learning/katas/python/Common Transforms/Aggregation/Mean/tests.py
@@ -14,7 +14,8 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_file_output
+from test_helper import failed, passed, get_file_output, \
+    test_is_not_empty, test_answer_placeholders_text_deleted
 
 
 def test_output():
@@ -29,5 +30,6 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_output()
diff --git a/learning/katas/python/Common Transforms/Aggregation/Smallest/tests.py b/learning/katas/python/Common Transforms/Aggregation/Smallest/tests.py
index 90d1223..5bfa80c 100644
--- a/learning/katas/python/Common Transforms/Aggregation/Smallest/tests.py
+++ b/learning/katas/python/Common Transforms/Aggregation/Smallest/tests.py
@@ -14,7 +14,8 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_file_output
+from test_helper import failed, passed, get_file_output, \
+    test_answer_placeholders_text_deleted, test_is_not_empty
 
 
 def test_output():
@@ -29,5 +30,6 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_output()
diff --git a/learning/katas/python/Common Transforms/Aggregation/Sum/tests.py b/learning/katas/python/Common Transforms/Aggregation/Sum/tests.py
index 9756b6d..e761cdf 100644
--- a/learning/katas/python/Common Transforms/Aggregation/Sum/tests.py
+++ b/learning/katas/python/Common Transforms/Aggregation/Sum/tests.py
@@ -14,7 +14,8 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_file_output
+from test_helper import failed, passed, get_file_output, \
+    test_is_not_empty, test_answer_placeholders_text_deleted
 
 
 def test_output():
@@ -29,5 +30,6 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_output()
diff --git a/learning/katas/python/Common Transforms/Filter/Filter/tests.py b/learning/katas/python/Common Transforms/Filter/Filter/tests.py
index 40e7837..03487d2 100644
--- a/learning/katas/python/Common Transforms/Filter/Filter/tests.py
+++ b/learning/katas/python/Common Transforms/Filter/Filter/tests.py
@@ -14,7 +14,9 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_answer_placeholders, get_file_output
+from test_helper import failed, passed, \
+    get_answer_placeholders, get_file_output, test_is_not_empty, \
+    test_answer_placeholders_text_deleted
 
 
 def test_filter():
@@ -39,6 +41,7 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_filter()
     test_output()
diff --git a/learning/katas/python/Common Transforms/Filter/ParDo/tests.py b/learning/katas/python/Common Transforms/Filter/ParDo/tests.py
index a91e9bc..b1d475b 100644
--- a/learning/katas/python/Common Transforms/Filter/ParDo/tests.py
+++ b/learning/katas/python/Common Transforms/Filter/ParDo/tests.py
@@ -14,7 +14,8 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_file_output
+from test_helper import failed, passed, get_file_output, \
+    test_is_not_empty, test_answer_placeholders_text_deleted
 
 
 def test_output():
@@ -29,5 +30,6 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_output()
diff --git a/learning/katas/python/Core Transforms/Branching/Branching/tests.py b/learning/katas/python/Core Transforms/Branching/Branching/tests.py
index 8278966..de1fea6 100644
--- a/learning/katas/python/Core Transforms/Branching/Branching/tests.py
+++ b/learning/katas/python/Core Transforms/Branching/Branching/tests.py
@@ -14,7 +14,8 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_file_output
+from test_helper import failed, passed, get_file_output, \
+    test_is_not_empty, test_answer_placeholders_text_deleted
 
 
 def test_output():
@@ -39,5 +40,6 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_output()
diff --git a/learning/katas/python/Core Transforms/CoGroupByKey/CoGroupByKey/tests.py b/learning/katas/python/Core Transforms/CoGroupByKey/CoGroupByKey/tests.py
index 59c8515..da12782 100644
--- a/learning/katas/python/Core Transforms/CoGroupByKey/CoGroupByKey/tests.py
+++ b/learning/katas/python/Core Transforms/CoGroupByKey/CoGroupByKey/tests.py
@@ -14,7 +14,8 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_file_output
+from test_helper import failed, passed, get_file_output, \
+    test_is_not_empty, test_answer_placeholders_text_deleted
 
 
 def test_output():
@@ -33,5 +34,6 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_output()
diff --git a/learning/katas/python/Core Transforms/Combine/Combine PerKey/tests.py b/learning/katas/python/Core Transforms/Combine/Combine PerKey/tests.py
index 7890e0a..e804283 100644
--- a/learning/katas/python/Core Transforms/Combine/Combine PerKey/tests.py
+++ b/learning/katas/python/Core Transforms/Combine/Combine PerKey/tests.py
@@ -14,7 +14,9 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_answer_placeholders, get_file_output
+from test_helper import failed, passed, \
+    get_answer_placeholders, get_file_output, test_is_not_empty, \
+    test_answer_placeholders_text_deleted
 
 
 def test_combine_placeholders():
@@ -35,7 +37,7 @@
     PLAYER_3 = 'Player 3'
 
     answers = [str((PLAYER_1, 115)), str((PLAYER_2, 85)), str((PLAYER_3, 25))]
-    print answers
+    print(answers)
 
     if all(num in output for num in answers):
         passed()
@@ -44,6 +46,7 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_combine_placeholders()
     test_output()
diff --git a/learning/katas/python/Core Transforms/Combine/CombineFn/tests.py b/learning/katas/python/Core Transforms/Combine/CombineFn/tests.py
index 6e8ff93..656e5b3 100644
--- a/learning/katas/python/Core Transforms/Combine/CombineFn/tests.py
+++ b/learning/katas/python/Core Transforms/Combine/CombineFn/tests.py
@@ -14,7 +14,9 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_answer_placeholders, get_file_output
+from test_helper import failed, passed, \
+    get_answer_placeholders, get_file_output, test_is_not_empty, \
+    test_answer_placeholders_text_deleted
 
 
 def test_combine_placeholders():
@@ -39,6 +41,7 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_combine_placeholders()
     test_output()
diff --git a/learning/katas/python/Core Transforms/Combine/Simple Function/tests.py b/learning/katas/python/Core Transforms/Combine/Simple Function/tests.py
index 70cb7f6..2d740d8 100644
--- a/learning/katas/python/Core Transforms/Combine/Simple Function/tests.py
+++ b/learning/katas/python/Core Transforms/Combine/Simple Function/tests.py
@@ -14,7 +14,9 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_answer_placeholders, get_file_output
+from test_helper import failed, passed, \
+    get_answer_placeholders, get_file_output, test_is_not_empty, \
+    test_answer_placeholders_text_deleted
 
 
 def test_combine_placeholders():
@@ -39,6 +41,7 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_combine_placeholders()
     test_output()
diff --git a/learning/katas/python/Core Transforms/Composite Transform/Composite Transform/tests.py b/learning/katas/python/Core Transforms/Composite Transform/Composite Transform/tests.py
index 24477d1..cc7db80 100644
--- a/learning/katas/python/Core Transforms/Composite Transform/Composite Transform/tests.py
+++ b/learning/katas/python/Core Transforms/Composite Transform/Composite Transform/tests.py
@@ -14,7 +14,9 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_answer_placeholders, get_file_output
+from test_helper import failed, passed, \
+    get_answer_placeholders, get_file_output, test_is_not_empty, \
+    test_answer_placeholders_text_deleted
 
 
 def test_composite_expand_method():
@@ -39,6 +41,7 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_composite_expand_method()
     test_output()
diff --git a/learning/katas/python/Core Transforms/Flatten/Flatten/tests.py b/learning/katas/python/Core Transforms/Flatten/Flatten/tests.py
index 285dc48..c2caa2e 100644
--- a/learning/katas/python/Core Transforms/Flatten/Flatten/tests.py
+++ b/learning/katas/python/Core Transforms/Flatten/Flatten/tests.py
@@ -14,7 +14,9 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_answer_placeholders, get_file_output
+from test_helper import failed, passed, \
+    get_answer_placeholders, get_file_output, test_is_not_empty, \
+    test_answer_placeholders_text_deleted
 
 
 def test_flatten():
@@ -39,6 +41,7 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_flatten()
     test_output()
diff --git a/learning/katas/python/Core Transforms/GroupByKey/GroupByKey/tests.py b/learning/katas/python/Core Transforms/GroupByKey/GroupByKey/tests.py
index afc8e36..8f9ffd5 100644
--- a/learning/katas/python/Core Transforms/GroupByKey/GroupByKey/tests.py
+++ b/learning/katas/python/Core Transforms/GroupByKey/GroupByKey/tests.py
@@ -14,7 +14,8 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_file_output
+from test_helper import failed, passed, get_file_output, \
+    test_is_not_empty, test_answer_placeholders_text_deleted
 
 
 def test_output():
@@ -31,5 +32,6 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_output()
diff --git a/learning/katas/python/Core Transforms/Map/FlatMap/tests.py b/learning/katas/python/Core Transforms/Map/FlatMap/tests.py
index 55bd104..6eaaa64 100644
--- a/learning/katas/python/Core Transforms/Map/FlatMap/tests.py
+++ b/learning/katas/python/Core Transforms/Map/FlatMap/tests.py
@@ -14,7 +14,9 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_answer_placeholders, get_file_output
+from test_helper import failed, passed, \
+    get_answer_placeholders, get_file_output, test_is_not_empty, \
+    test_answer_placeholders_text_deleted
 
 
 def test_flatmap():
@@ -39,6 +41,7 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_flatmap()
     test_output()
diff --git a/learning/katas/python/Core Transforms/Map/Map/tests.py b/learning/katas/python/Core Transforms/Map/Map/tests.py
index df1544f..52789ea 100644
--- a/learning/katas/python/Core Transforms/Map/Map/tests.py
+++ b/learning/katas/python/Core Transforms/Map/Map/tests.py
@@ -14,7 +14,9 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_answer_placeholders, get_file_output
+from test_helper import failed, passed, \
+    get_answer_placeholders, get_file_output, test_is_not_empty, \
+    test_answer_placeholders_text_deleted
 
 
 def test_map():
@@ -39,6 +41,7 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_map()
     test_output()
diff --git a/learning/katas/python/Core Transforms/Map/ParDo OneToMany/tests.py b/learning/katas/python/Core Transforms/Map/ParDo OneToMany/tests.py
index 8f9fe86..b934821 100644
--- a/learning/katas/python/Core Transforms/Map/ParDo OneToMany/tests.py
+++ b/learning/katas/python/Core Transforms/Map/ParDo OneToMany/tests.py
@@ -14,7 +14,9 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_answer_placeholders, get_file_output
+from test_helper import failed, passed, \
+    get_answer_placeholders, get_file_output, test_is_not_empty, \
+    test_answer_placeholders_text_deleted
 
 
 def test_dofn_process_method():
@@ -49,7 +51,8 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_dofn_process_method()
     test_pardo()
     test_output()
diff --git a/learning/katas/python/Core Transforms/Map/ParDo/tests.py b/learning/katas/python/Core Transforms/Map/ParDo/tests.py
index f6dd859..5591318 100644
--- a/learning/katas/python/Core Transforms/Map/ParDo/tests.py
+++ b/learning/katas/python/Core Transforms/Map/ParDo/tests.py
@@ -14,7 +14,9 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_answer_placeholders, get_file_output
+from test_helper import failed, passed, \
+    get_answer_placeholders, get_file_output, test_is_not_empty, \
+    test_answer_placeholders_text_deleted
 
 
 def test_dofn_process_method():
@@ -49,7 +51,8 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_dofn_process_method()
     test_pardo()
     test_output()
diff --git a/learning/katas/python/Core Transforms/Partition/Partition/tests.py b/learning/katas/python/Core Transforms/Partition/Partition/tests.py
index 6c5568a..bbeeaf7 100644
--- a/learning/katas/python/Core Transforms/Partition/Partition/tests.py
+++ b/learning/katas/python/Core Transforms/Partition/Partition/tests.py
@@ -14,7 +14,9 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_answer_placeholders, get_file_output
+from test_helper import failed, passed, \
+    get_answer_placeholders, get_file_output, test_is_not_empty, \
+    test_answer_placeholders_text_deleted
 
 
 def test_partition():
@@ -48,6 +50,7 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_partition()
     test_output()
diff --git a/learning/katas/python/Core Transforms/Side Input/Side Input/tests.py b/learning/katas/python/Core Transforms/Side Input/Side Input/tests.py
index 0d7146d..8fdd7da 100644
--- a/learning/katas/python/Core Transforms/Side Input/Side Input/tests.py
+++ b/learning/katas/python/Core Transforms/Side Input/Side Input/tests.py
@@ -22,8 +22,9 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, \
-    get_answer_placeholders, get_file_output
+from test_helper import failed, passed, \
+    get_answer_placeholders, get_file_output, test_is_not_empty, \
+    test_answer_placeholders_text_deleted
 
 
 def test_dofn_process_method():
@@ -64,7 +65,8 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_dofn_process_method()
     test_pardo()
     test_output()
diff --git a/learning/katas/python/Core Transforms/Side Output/Side Output/tests.py b/learning/katas/python/Core Transforms/Side Output/Side Output/tests.py
index b525cae..1af8439 100644
--- a/learning/katas/python/Core Transforms/Side Output/Side Output/tests.py
+++ b/learning/katas/python/Core Transforms/Side Output/Side Output/tests.py
@@ -14,8 +14,9 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, \
-    get_answer_placeholders, get_file_output
+from test_helper import failed, passed, \
+    get_answer_placeholders, get_file_output, test_is_not_empty, \
+    test_answer_placeholders_text_deleted
 
 
 def test_dofn_process_method():
@@ -59,7 +60,8 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_dofn_process_method()
     test_pardo()
     test_output()
diff --git a/learning/katas/python/Examples/Word Count/Word Count/task-info.yaml b/learning/katas/python/Examples/Word Count/Word Count/task-info.yaml
index 0eef4b3..435527d 100644
--- a/learning/katas/python/Examples/Word Count/Word Count/task-info.yaml
+++ b/learning/katas/python/Examples/Word Count/Word Count/task-info.yaml
@@ -23,7 +23,7 @@
   visible: true
   placeholders:
   - offset: 1021
-    length: 133
+    length: 136
     placeholder_text: TODO()
 - name: tests.py
   visible: false
diff --git a/learning/katas/python/Examples/Word Count/Word Count/task.html b/learning/katas/python/Examples/Word Count/Word Count/task.html
index a963aab..82ce81c 100644
--- a/learning/katas/python/Examples/Word Count/Word Count/task.html
+++ b/learning/katas/python/Examples/Word Count/Word Count/task.html
@@ -33,4 +33,8 @@
 <div class="hint">
   Refer to your katas above.
 </div>
+<div class="hint">
+  Use <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.MapTuple">
+  MapTuple</a> to unpack a key-value pair into separate function arguments.
+</div>
 </html>
diff --git a/learning/katas/python/Examples/Word Count/Word Count/task.py b/learning/katas/python/Examples/Word Count/Word Count/task.py
index dd1daeb..10b7cf8 100644
--- a/learning/katas/python/Examples/Word Count/Word Count/task.py
+++ b/learning/katas/python/Examples/Word Count/Word Count/task.py
@@ -29,7 +29,7 @@
 
    | beam.FlatMap(lambda sentence: sentence.split())
    | beam.combiners.Count.PerElement()
-   | beam.Map(lambda (k, v): k + ":" + str(v))
+   | beam.MapTuple(lambda k, v: k + ":" + str(v))
 
    | LogElements())
 
diff --git a/learning/katas/python/Examples/Word Count/Word Count/tests.py b/learning/katas/python/Examples/Word Count/Word Count/tests.py
index 2f0a085..16b7bf5 100644
--- a/learning/katas/python/Examples/Word Count/Word Count/tests.py
+++ b/learning/katas/python/Examples/Word Count/Word Count/tests.py
@@ -14,7 +14,8 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_file_output
+from test_helper import failed, passed, get_file_output, \
+    test_is_not_empty, test_answer_placeholders_text_deleted
 
 
 def test_output():
@@ -35,5 +36,6 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_output()
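
The Word Count task.py change above is the Python 3 migration in miniature: tuple parameter unpacking in lambdas was removed in Python 3 (PEP 3113), so beam.Map(lambda (k, v): ...) no longer parses, and beam.MapTuple unpacks each key-value element into separate arguments instead. A minimal runnable sketch of the difference (the pipeline contents are illustrative, not from the kata):

import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | beam.Create([('apple', 2), ('banana', 3)])
     # Python 2 only: beam.Map(lambda (k, v): k + ':' + str(v)) is a SyntaxError on Python 3.
     | beam.MapTuple(lambda k, v: k + ':' + str(v))  # unpacks each (k, v) into two arguments
     | beam.Map(print))
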
diff --git a/learning/katas/python/IO/Built-in IOs/Built-in IOs/task-info.yaml b/learning/katas/python/IO/Built-in IOs/Built-in IOs/task-info.yaml
index 45ce4ef..c164ed0 100644
--- a/learning/katas/python/IO/Built-in IOs/Built-in IOs/task-info.yaml
+++ b/learning/katas/python/IO/Built-in IOs/Built-in IOs/task-info.yaml
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-type: edu
+type: theory
 files:
 - name: task.py
   visible: true
diff --git a/learning/katas/python/IO/Built-in IOs/Built-in IOs/task.html b/learning/katas/python/IO/Built-in IOs/Built-in IOs/task.html
index 7d6cc8d..ef1b208 100644
--- a/learning/katas/python/IO/Built-in IOs/Built-in IOs/task.html
+++ b/learning/katas/python/IO/Built-in IOs/Built-in IOs/task.html
@@ -27,7 +27,6 @@
   Transforms</a> page for a list of the currently available I/O transforms.
 </p>
 <p>
-  <b>Note:</b> There is no kata for this task. Please click the "Check" button and
-  proceed to the next task.
+  <b>Note:</b> There is no kata for this task. Please proceed to the next task.
 </p>
 </html>
diff --git a/learning/katas/python/IO/TextIO/ReadFromText/tests.py b/learning/katas/python/IO/TextIO/ReadFromText/tests.py
index 413bda7..273aada 100644
--- a/learning/katas/python/IO/TextIO/ReadFromText/tests.py
+++ b/learning/katas/python/IO/TextIO/ReadFromText/tests.py
@@ -14,7 +14,9 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_answer_placeholders, get_file_output
+from test_helper import failed, passed, \
+    get_answer_placeholders, get_file_output, test_is_not_empty, \
+    test_answer_placeholders_text_deleted
 
 
 def test_readfromtext_method():
@@ -50,6 +52,7 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_readfromtext_method()
     test_output()
diff --git a/learning/katas/python/Introduction/Hello Beam/Hello Beam/tests.py b/learning/katas/python/Introduction/Hello Beam/Hello Beam/tests.py
index 3ee6516..33d45a6 100644
--- a/learning/katas/python/Introduction/Hello Beam/Hello Beam/tests.py
+++ b/learning/katas/python/Introduction/Hello Beam/Hello Beam/tests.py
@@ -14,7 +14,9 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-from test_helper import run_common_tests, failed, passed, get_answer_placeholders, get_file_output
+from test_helper import failed, passed, \
+    get_answer_placeholders, get_file_output, test_is_not_empty, \
+    test_answer_placeholders_text_deleted
 
 
 def test_answer_placeholders():
@@ -36,6 +38,7 @@
 
 
 if __name__ == '__main__':
-    run_common_tests()
+    test_is_not_empty()
+    test_answer_placeholders_text_deleted()
     test_answer_placeholders()
     test_output()
diff --git a/learning/katas/python/course-info.yaml b/learning/katas/python/course-info.yaml
index b14f13a..ede92f2 100644
--- a/learning/katas/python/course-info.yaml
+++ b/learning/katas/python/course-info.yaml
@@ -22,7 +22,7 @@
 summary: "This course provides a series of katas to get familiar with Apache Beam.\
   \ \n\nApache Beam website – https://beam.apache.org/"
 programming_language: Python
-programming_language_version: 2.7
+programming_language_version: 3.7
 content:
 - Introduction
 - Core Transforms
diff --git a/learning/katas/python/course-remote-info.yaml b/learning/katas/python/course-remote-info.yaml
index ed9c8a7..c26dd2e 100644
--- a/learning/katas/python/course-remote-info.yaml
+++ b/learning/katas/python/course-remote-info.yaml
@@ -1,2 +1,2 @@
 id: 54532
-update_date: Wed, 19 Jun 2019 10:36:17 UTC
+update_date: Mon, 20 Jan 2020 15:20:43 UTC
diff --git a/learning/katas/python/log_elements.py b/learning/katas/python/log_elements.py
index 471996e..ccff89b 100644
--- a/learning/katas/python/log_elements.py
+++ b/learning/katas/python/log_elements.py
@@ -26,7 +26,7 @@
             self.prefix = prefix
 
         def process(self, element, **kwargs):
-            print self.prefix + str(element)
+            print(self.prefix + str(element))
             yield element
 
     def __init__(self, label=None, prefix=''):
diff --git a/learning/katas/python/requirements.txt b/learning/katas/python/requirements.txt
index c53a288..3a79b42 100644
--- a/learning/katas/python/requirements.txt
+++ b/learning/katas/python/requirements.txt
@@ -14,5 +14,5 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-apache-beam==2.13.0
-apache-beam[test]==2.13.0
+apache-beam==2.17.0
+apache-beam[test]==2.17.0
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
index 090ee32..072dbc7 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
@@ -47,12 +47,7 @@
 
   /** Returns a timer for the given timestamp with a user specified payload. */
   public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, timestamp, payload);
-  }
-
-  /** Returns a timer for the given timestamp with a user specified payload and outputTimestamp. */
-  public static <T> Timer<T> of(Instant timestamp, Instant outputTimestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, outputTimestamp, payload);
+    return new AutoValue_Timer(timestamp, payload);
   }
 
   /**
@@ -63,9 +58,6 @@
    */
   public abstract Instant getTimestamp();
 
-  /* Returns the outputTimestamps  */
-  public abstract Instant getOutputTimestamp();
-
   /** A user supplied payload. */
   @Nullable
   public abstract T getPayload();
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 082cb28..0b41602 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -214,7 +214,7 @@
     }
 
     OnTimerArgumentProvider argumentProvider =
-        new OnTimerArgumentProvider(timerId, window, effectiveTimestamp, timeDomain);
+        new OnTimerArgumentProvider(timerId, window, timestamp, effectiveTimestamp, timeDomain);
     invoker.invokeOnTimer(timerId, timerFamilyId, argumentProvider);
   }
 
@@ -774,6 +774,7 @@
   private class OnTimerArgumentProvider extends DoFn<InputT, OutputT>.OnTimerContext
       implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
     private final BoundedWindow window;
+    private final Instant fireTimestamp;
     private final Instant timestamp;
     private final TimeDomain timeDomain;
     private final String timerId;
@@ -796,10 +797,15 @@
     }
 
     private OnTimerArgumentProvider(
-        String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+        String timerId,
+        BoundedWindow window,
+        Instant fireTimestamp,
+        Instant timestamp,
+        TimeDomain timeDomain) {
       fn.super();
       this.timerId = timerId;
       this.window = window;
+      this.fireTimestamp = fireTimestamp;
       this.timestamp = timestamp;
       this.timeDomain = timeDomain;
     }
@@ -810,6 +816,11 @@
     }
 
     @Override
+    public Instant fireTimestamp() {
+      return fireTimestamp;
+    }
+
+    @Override
     public BoundedWindow window() {
       return window;
     }
diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle
index 1722a6a..9817358 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -199,7 +199,6 @@
     maxParallelForks 2
     useJUnit {
       includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithOutputTimestamp'
       excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
       excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
       excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 1f7fe00..dbefa68 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -653,7 +653,10 @@
       Instant watermarkHold = keyedStateInternals.watermarkHold();
 
       long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold());
-
+      if (timerInternals.getWatermarkHoldMs() < Long.MAX_VALUE) {
+        combinedWatermarkHold =
+            Math.min(combinedWatermarkHold, timerInternals.getWatermarkHoldMs());
+      }
       long potentialOutputWatermark = Math.min(pushedBackInputWatermark, combinedWatermarkHold);
 
       if (potentialOutputWatermark > currentOutputWatermark) {
@@ -811,7 +814,7 @@
     // This is a user timer, so namespace must be WindowNamespace
     checkArgument(namespace instanceof WindowNamespace);
     BoundedWindow window = ((WindowNamespace) namespace).getWindow();
-    timerInternals.cleanupPendingTimer(timer.getNamespace());
+    timerInternals.cleanupPendingTimer(timer.getNamespace(), true);
     pushbackDoFnRunner.onTimer(
         timerData.getTimerId(),
         timerData.getTimerFamilyId(),
@@ -1084,6 +1087,8 @@
      */
     final MapState<String, TimerData> pendingTimersById;
 
+    long watermarkHoldMs = Long.MAX_VALUE;
+
     private FlinkTimerInternals() {
       MapStateDescriptor<String, TimerData> pendingTimersByIdStateDescriptor =
           new MapStateDescriptor<>(
@@ -1091,6 +1096,22 @@
       this.pendingTimersById = getKeyedStateStore().getMapState(pendingTimersByIdStateDescriptor);
     }
 
+    long getWatermarkHoldMs() {
+      return watermarkHoldMs;
+    }
+
+    void updateWatermarkHold() {
+      this.watermarkHoldMs = Long.MAX_VALUE;
+      try {
+        for (TimerData timerData : pendingTimersById.values()) {
+          this.watermarkHoldMs =
+              Math.min(timerData.getOutputTimestamp().getMillis(), this.watermarkHoldMs);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Exception while reading set of timers", e);
+      }
+    }
+
     @Override
     public void setTimer(
         StateNamespace namespace,
@@ -1118,6 +1139,7 @@
         // before we set the new one.
         cancelPendingTimerById(contextTimerId);
         registerTimer(timer, contextTimerId);
+        updateWatermarkHold();
       } catch (Exception e) {
         throw new RuntimeException("Failed to set timer", e);
       }
@@ -1142,13 +1164,16 @@
     private void cancelPendingTimerById(String contextTimerId) throws Exception {
       TimerData oldTimer = pendingTimersById.get(contextTimerId);
       if (oldTimer != null) {
-        deleteTimer(oldTimer);
+        deleteTimerInternal(oldTimer, false);
       }
     }
 
-    void cleanupPendingTimer(TimerData timer) {
+    void cleanupPendingTimer(TimerData timer, boolean updateWatermark) {
       try {
         pendingTimersById.remove(getContextTimerId(timer.getTimerId(), timer.getNamespace()));
+        if (updateWatermark) {
+          updateWatermarkHold();
+        }
       } catch (Exception e) {
         throw new RuntimeException("Failed to cleanup state with pending timers", e);
       }
@@ -1170,16 +1195,21 @@
     public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
       try {
         cancelPendingTimerById(getContextTimerId(timerId, namespace));
+        updateWatermarkHold();
       } catch (Exception e) {
         throw new RuntimeException("Failed to cancel timer", e);
       }
     }
 
     /** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
-    @Deprecated
     @Override
+    @Deprecated
     public void deleteTimer(TimerData timerKey) {
-      cleanupPendingTimer(timerKey);
+      deleteTimerInternal(timerKey, true);
+    }
+
+    void deleteTimerInternal(TimerData timerKey, boolean updateWatermark) {
+      cleanupPendingTimer(timerKey, true);
       long time = timerKey.getTimestamp().getMillis();
       switch (timerKey.getDomain()) {
         case EVENT_TIME:
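
The DoFnOperator change above adds the pending timers' output timestamps as one more watermark hold: the operator may not advance its output watermark past the smallest output timestamp of any pending timer, in addition to the existing state and pushback holds. A plain-Python sketch of that arithmetic (names and numbers are illustrative only):

def potential_output_watermark(input_watermark, state_hold, pushback_hold,
                               pending_timer_output_timestamps):
    # Mirrors the Java logic: the timer hold is the minimum output timestamp
    # over all pending timers (effectively +infinity when there are none).
    timer_hold = min(pending_timer_output_timestamps, default=float('inf'))
    combined_hold = min(state_hold, pushback_hold, timer_hold)
    return min(input_watermark, combined_hold)


# Two pending timers with output timestamps 40 and 75 hold the watermark at 40.
print(potential_output_watermark(100, float('inf'), float('inf'), [40, 75]))  # -> 40
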
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 23a2a4b..3af14ca 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -679,7 +679,7 @@
       WindowedValue<KV<Object, Timer>> timerValue =
           WindowedValue.of(
               KV.of(timerKey, Timer.of(timestamp, new byte[0])),
-              timestamp,
+              outputTimestamp,
               Collections.singleton(window),
               PaneInfo.NO_FIRING);
       try {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 4f8e0d7..e955c19 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -159,7 +159,7 @@
 
   @Override
   protected void fireTimer(InternalTimer<ByteBuffer, TimerInternals.TimerData> timer) {
-    timerInternals.cleanupPendingTimer(timer.getNamespace());
+    timerInternals.cleanupPendingTimer(timer.getNamespace(), true);
     if (timer.getNamespace().getDomain().equals(TimeDomain.EVENT_TIME)) {
       // ignore this, it can only be a state cleanup timers from StatefulDoFnRunner and ProcessFn
       // does its own state cleanup and should never set event-time timers.
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index e2058bb..8b4cb24 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -126,7 +126,7 @@
 
   @Override
   protected void fireTimer(InternalTimer<ByteBuffer, TimerData> timer) {
-    timerInternals.cleanupPendingTimer(timer.getNamespace());
+    timerInternals.cleanupPendingTimer(timer.getNamespace(), true);
     doFnRunner.processElement(
         WindowedValue.valueInGlobalWindow(
             KeyedWorkItems.timersWorkItem(
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index fa3f429..990ede6 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -39,7 +39,7 @@
   filter org.apache.tools.ant.filters.ReplaceTokens, tokens: [
     'dataflow.legacy_environment_major_version' : '7',
     'dataflow.fnapi_environment_major_version' : '7',
-    'dataflow.container_version' : 'beam-master-20191226'
+    'dataflow.container_version' : 'beam-master-20200123'
   ]
 }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
index e3d2277..89b2b7d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
@@ -120,7 +120,7 @@
 
         TimerInternals timerInternals = stepContext.namespacedToUser().timerInternals();
         timerInternals.setTimer(
-            namespace, timerId, "", timer.getTimestamp(), timer.getOutputTimestamp(), timeDomain);
+            namespace, timerId, "", timer.getTimestamp(), windowedValue.getTimestamp(), timeDomain);
 
         timerIdToKey.put(timerId, windowedValue.getValue().getKey());
         timerIdToPayload.put(timerId, timer.getPayload());
@@ -144,7 +144,7 @@
               KV.of(
                   timerIdToKey.get(timerData.getTimerId()),
                   Timer.of(timerData.getTimestamp(), timerIdToPayload.get(timerData.getTimerId()))),
-              timerData.getTimestamp(),
+              timerData.getOutputTimestamp(),
               Collections.singleton(window),
               PaneInfo.NO_FIRING);
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java
index 68a54d5..a8c0586 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java
@@ -234,7 +234,9 @@
     long testTimerOffset = 123456;
     // Arbitrary key.
     Object timer = timerBytes("X", testTimerOffset);
-    Object windowedTimer = WindowedValue.valueInGlobalWindow(timer);
+    Object windowedTimer =
+        WindowedValue.timestampedValueInGlobalWindow(
+            timer, BoundedWindow.TIMESTAMP_MIN_VALUE.plus(testTimerOffset));
 
     // Simulate the SDK Harness sending a timer element to the Runner Harness.
     assertTrue(timerReceiver.receive(timerOutputPCollection, windowedTimer));
@@ -349,10 +351,14 @@
     long testTimerOffset = 123456;
     // Arbitrary key.
     Object timer1 = timerBytes("X", testTimerOffset);
-    Object windowedTimer1 = WindowedValue.valueInGlobalWindow(timer1);
+    Object windowedTimer1 =
+        WindowedValue.timestampedValueInGlobalWindow(
+            timer1, BoundedWindow.TIMESTAMP_MIN_VALUE.plus(testTimerOffset));
 
     Object timer2 = timerBytes("Y", testTimerOffset);
-    Object windowedTimer2 = WindowedValue.valueInGlobalWindow(timer2);
+    Object windowedTimer2 =
+        WindowedValue.timestampedValueInGlobalWindow(
+            timer2, BoundedWindow.TIMESTAMP_MIN_VALUE.plus(testTimerOffset));
 
     // Simulate the SDK Harness sending a timer element to the Runner Harness.
     assertTrue(
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
index ddc379f..6776493 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
@@ -57,7 +57,6 @@
   private final List<JobMessage> messageHistory;
   private final List<Consumer<JobStateEvent>> stateObservers;
   private final List<Consumer<JobMessage>> messageObservers;
-
   private JobApi.MetricResults metrics;
   private PortablePipelineResult resultHandle;
   @Nullable private ListenableFuture<PortablePipelineResult> invocationFuture;
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
index f51072b..56b8179 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
@@ -139,10 +139,11 @@
     Preconditions.checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
     BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
     Instant timestamp = timer.getTimestamp();
+    Instant outputTimestamp = timer.getOutputTimestamp();
     WindowedValue<KV<Object, Timer>> timerValue =
         WindowedValue.of(
             KV.of(currentTimerKey, Timer.of(timestamp, new byte[0])),
-            timestamp,
+            outputTimestamp,
             Collections.singleton(window),
             PaneInfo.NO_FIRING);
     timerConsumer.accept(timer.getTimerId(), timerValue);
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 6ba0b84..365f679 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -1016,7 +1016,9 @@
                       @TimerId("event") Timer eventTimeTimer,
                       @TimerId("processing") Timer processingTimeTimer) {
                     context.output(KV.of("main" + context.element().getKey(), ""));
-                    eventTimeTimer.set(context.timestamp().plus(1L));
+                    eventTimeTimer
+                        .withOutputTimestamp(context.timestamp())
+                        .set(context.timestamp().plus(1L));
                     processingTimeTimer.offset(Duration.millis(2L));
                     processingTimeTimer.setRelative();
                   }
@@ -1027,7 +1029,9 @@
                       @TimerId("event") Timer eventTimeTimer,
                       @TimerId("processing") Timer processingTimeTimer) {
                     context.output(KV.of("event", ""));
-                    eventTimeTimer.set(context.timestamp().plus(11L));
+                    eventTimeTimer
+                        .withOutputTimestamp(context.timestamp())
+                        .set(context.timestamp().plus(11L));
                     processingTimeTimer.offset(Duration.millis(12L));
                     processingTimeTimer.setRelative();
                   }
@@ -1038,7 +1042,9 @@
                       @TimerId("event") Timer eventTimeTimer,
                       @TimerId("processing") Timer processingTimeTimer) {
                     context.output(KV.of("processing", ""));
-                    eventTimeTimer.set(context.timestamp().plus(21L));
+                    eventTimeTimer
+                        .withOutputTimestamp(context.timestamp())
+                        .set(context.timestamp().plus(21L));
                     processingTimeTimer.offset(Duration.millis(22L));
                     processingTimeTimer.setRelative();
                   }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
index 2d83c3c..8313d73 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
@@ -74,6 +74,8 @@
         }
       }
       LOG.info("Instantiated metrics accumulator: " + instance.value());
+    } else {
+      instance.reset();
     }
   }
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java
index f674ef9..e182cf0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java
@@ -51,6 +51,8 @@
         }
       }
       LOG.info("Instantiated metrics accumulator: " + instance.value());
+    } else {
+      instance.reset();
     }
   }
 
diff --git a/sdks/go/examples/build.gradle b/sdks/go/examples/build.gradle
index f7299f1..c040dd5 100644
--- a/sdks/go/examples/build.gradle
+++ b/sdks/go/examples/build.gradle
@@ -69,4 +69,9 @@
     go 'build -o ./build/bin/${GOOS}_${GOARCH}/wordcount github.com/apache/beam/sdks/go/examples/wordcount'
     go 'build -o ./build/bin/${GOOS}_${GOARCH}/yatzy github.com/apache/beam/sdks/go/examples/yatzy'
   }
+
+  // Ignore spurious vet errors during check for [BEAM-8992].
+  goVet {
+    continueOnFailure = true
+  }
 }
diff --git a/sdks/go/pkg/beam/core/metrics/metrics.go b/sdks/go/pkg/beam/core/metrics/metrics.go
index a29446b..a21f109 100644
--- a/sdks/go/pkg/beam/core/metrics/metrics.go
+++ b/sdks/go/pkg/beam/core/metrics/metrics.go
@@ -49,10 +49,13 @@
 import (
 	"context"
 	"fmt"
+	"hash/fnv"
 	"sort"
 	"sync"
+	"sync/atomic"
 	"time"
 
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 	"github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
 	"github.com/golang/protobuf/ptypes"
@@ -64,22 +67,57 @@
 // using metrics requires the PTransform have a context.Context
 // argument.
 
+// perBundle is a struct that retains per-transform counter sets.
+// TODO(lostluck): Migrate the exec package to use these to extract
+// metric data for export to runner, rather than the global store.
+type perBundle struct {
+	mu  sync.Mutex
+	css []*ptCounterSet
+}
+
+type nameHash uint64
+
+// ptCounterSet is the internal tracking struct for a single ptransform
+// in a single bundle for all counter types.
+type ptCounterSet struct {
+	// We store the user path access to the cells in metric type segregated
+	// maps. At present, caching the name hash,
+	counters      map[nameHash]*counter
+	distributions map[nameHash]*distribution
+	gauges        map[nameHash]*gauge
+}
+
 type ctxKey string
 
-const bundleKey ctxKey = "beam:bundle"
-const ptransformKey ctxKey = "beam:ptransform"
+const (
+	bundleKey     ctxKey = "beam:bundle"
+	ptransformKey ctxKey = "beam:ptransform"
+	counterSetKey ctxKey = "beam:counterset"
+)
 
 // beamCtx is a caching context for IDs necessary to place metric updates.
-//  Allocating contexts and searching for PTransformIDs for every element
+// Allocating contexts and searching for PTransformIDs for every element
 // is expensive, so we avoid it if possible.
 type beamCtx struct {
 	context.Context
 	bundleID, ptransformID string
+	bs                     *perBundle
+	cs                     *ptCounterSet
 }
 
-// Value lifts the beam contLift the keys value for faster lookups when not available.
+// Value implements context.Value for beamCtx, and lifts the
+// values for the metrics keys for faster lookups.
 func (ctx *beamCtx) Value(key interface{}) interface{} {
 	switch key {
+	case counterSetKey:
+		if ctx.cs == nil {
+			if cs := ctx.Context.Value(key); cs != nil {
+				ctx.cs = cs.(*ptCounterSet)
+			} else {
+				return nil
+			}
+		}
+		return ctx.cs
 	case bundleKey:
 		if ctx.bundleID == "" {
 			if id := ctx.Context.Value(key); id != nil {
@@ -102,24 +140,30 @@
 	return ctx.Context.Value(key)
 }
 
+func (ctx *beamCtx) String() string {
+	return fmt.Sprintf("beamCtx[%s;%s]", ctx.bundleID, ctx.ptransformID)
+}
+
 // SetBundleID sets the id of the current Bundle.
 func SetBundleID(ctx context.Context, id string) context.Context {
 	// Checking for *beamCtx is an optimization, so we don't dig deeply
 	// for ids if not necessary.
 	if bctx, ok := ctx.(*beamCtx); ok {
-		return &beamCtx{Context: bctx.Context, bundleID: id, ptransformID: bctx.ptransformID}
+		return &beamCtx{Context: bctx.Context, bundleID: id, bs: &perBundle{}, ptransformID: bctx.ptransformID}
 	}
-	return &beamCtx{Context: ctx, bundleID: id}
+	return &beamCtx{Context: ctx, bundleID: id, bs: &perBundle{}}
 }
 
 // SetPTransformID sets the id of the current PTransform.
+// Must only be called on a context returned by SetBundleID.
 func SetPTransformID(ctx context.Context, id string) context.Context {
 	// Checking for *beamCtx is an optimization, so we don't dig deeply
 	// for ids if not necessary.
 	if bctx, ok := ctx.(*beamCtx); ok {
-		return &beamCtx{Context: bctx.Context, bundleID: bctx.bundleID, ptransformID: id}
+		return &beamCtx{Context: bctx.Context, bundleID: bctx.bundleID, bs: bctx.bs, ptransformID: id}
 	}
-	return &beamCtx{Context: ctx, ptransformID: id}
+	panic(fmt.Sprintf("SetPTransformID called before SetBundleID for %v", id))
+	return nil // never runs.
 }
 
 const (
@@ -138,9 +182,55 @@
 	return key
 }
 
+func getCounterSet(ctx context.Context) *ptCounterSet {
+	if id := ctx.Value(counterSetKey); id != nil {
+		return id.(*ptCounterSet)
+	}
+	// It's not set anywhere and wasn't hoisted, so create it.
+	if bctx, ok := ctx.(*beamCtx); ok {
+		bctx.bs.mu.Lock()
+		cs := &ptCounterSet{
+			counters:      make(map[nameHash]*counter),
+			distributions: make(map[nameHash]*distribution),
+			gauges:        make(map[nameHash]*gauge),
+		}
+		bctx.bs.css = append(bctx.bs.css, cs)
+		bctx.cs = cs
+		bctx.bs.mu.Unlock()
+		return cs
+	}
+	panic("counterSet missing, beam isn't set up properly.")
+	return nil // never runs.
+}
+
+type kind uint8
+
+const (
+	kindUnknown kind = iota
+	kindSumCounter
+	kindDistribution
+	kindGauge
+)
+
+func (t kind) String() string {
+	switch t {
+	case kindSumCounter:
+		return "Counter"
+	case kindDistribution:
+		return "Distribution"
+	case kindGauge:
+		return "Gauge"
+	default:
+		panic(fmt.Sprintf("Unknown metric type value: %v", uint8(t)))
+	}
+}
+
 // userMetric knows how to convert it's value to a Metrics_User proto.
+// TODO(lostluck): Move proto translation to the harness package to
+// avoid the proto dependency outside of the harness.
 type userMetric interface {
 	toProto() *fnexecution_v1.Metrics_User
+	kind() kind
 }
 
 // name is a pair of strings identifying a specific metric.
@@ -159,43 +249,87 @@
 	return name{namespace: ns, name: n}
 }
 
+// We hash the name to a uint64 so we avoid using Go's native string hashing for
+// every use of a metric. uint64s have faster lookups than strings as a result.
+// Collisions are possible, but statistically unlikely as namespaces and names
+// are usually short enough to avoid this.
+var (
+	hasherMu sync.Mutex
+	hasher   = fnv.New64a()
+)
+
+func hashName(ns, n string) nameHash {
+	hasherMu.Lock()
+	hasher.Reset()
+	var buf [64]byte
+	b := buf[:]
+	hashString(ns, b)
+	hashString(n, b)
+	h := hasher.Sum64()
+	hasherMu.Unlock()
+	return nameHash(h)
+}
+
+// hashString hashes a string with the package level hasher
+// and requires posession of the hasherMu lock. The byte
+// slice is assumed to be backed by a [64]byte.
+func hashString(s string, b []byte) {
+	l := len(s)
+	i := 0
+	for len(s)-i > 64 {
+		n := i + 64
+		copy(b, s[i:n])
+		ioutilx.WriteUnsafe(hasher, b)
+		i = n
+	}
+	n := l - i
+	copy(b, s[i:])
+	ioutilx.WriteUnsafe(hasher, b[:n])
+}
+
 type key struct {
 	name               name
 	bundle, ptransform string
 }
 
 var (
-	// mu protects access to store
+	// mu protects access to the global store
 	mu sync.RWMutex
 	// store is a map of BundleIDs to PtransformIDs to userMetrics.
 	// it permits us to extract metric protos for runners per data Bundle, and
 	// per PTransform.
+	// TODO(lostluck): Migrate exec/plan.go to manage its own perBundle counter stores.
+	// Note: It is safe to read metrics from this map concurrently with user
+	// modification in bundles, as only a metric's first use modifies the map,
+	// and it only contains the resulting metric cells.
 	store = make(map[string]map[string]map[name]userMetric)
-
-	// We store the user path access to the cells in metric type segregated
-	// sync.Maps. Using sync.Maps lets metrics with disjoint keys have concurrent
-	// access to the cells, and using separate sync.Map per metric type
-	// simplifies code understanding, since each only contains a single type of
-	// cell.
-
-	countersMu      sync.RWMutex
-	counters        = make(map[key]*counter)
-	distributionsMu sync.RWMutex
-	distributions   = make(map[key]*distribution)
-	gaugesMu        sync.RWMutex
-	gauges          = make(map[key]*gauge)
 )
 
-// TODO(lostluck): 2018/03/05 Use a common internal beam now() instead, once that exists.
-var now = time.Now
+// storeMetric stores a metric away on its first use so it may be retrieved later on.
+// In the event of a name collision, storeMetric can panic, so it's prudent to release
+// locks if they are no longer required.
+func storeMetric(key key, m userMetric) {
+	mu.Lock()
+	defer mu.Unlock()
+	if _, ok := store[key.bundle]; !ok {
+		store[key.bundle] = make(map[string]map[name]userMetric)
+	}
+	if _, ok := store[key.bundle][key.ptransform]; !ok {
+		store[key.bundle][key.ptransform] = make(map[name]userMetric)
+	}
+	if ms, ok := store[key.bundle][key.ptransform][key.name]; ok {
+		if ms.kind() != m.kind() {
+			panic(fmt.Sprintf("metric name %s being reused for a second metric in a single PTransform", key.name))
+		}
+		return
+	}
+	store[key.bundle][key.ptransform][key.name] = m
+}
 
 // Counter is a simple counter for incrementing and decrementing a value.
 type Counter struct {
 	name name
-	// The following are a fast cache of the key and storage
-	mu sync.Mutex
-	k  key
-	c  *counter
+	hash nameHash
 }
 
 func (m *Counter) String() string {
@@ -204,38 +338,26 @@
 
 // NewCounter returns the Counter with the given namespace and name.
 func NewCounter(ns, n string) *Counter {
-	mn := newName(ns, n)
 	return &Counter{
-		name: mn,
+		name: newName(ns, n),
+		hash: hashName(ns, n),
 	}
 }
 
 // Inc increments the counter within the given PTransform context by v.
 func (m *Counter) Inc(ctx context.Context, v int64) {
-	key := getContextKey(ctx, m.name)
-	cs := &counter{
-		value: v,
-	}
-	m.mu.Lock()
-	if m.k == key {
-		m.c.inc(v)
-		m.mu.Unlock()
-		return
-	}
-	m.k = key
-	countersMu.Lock()
-	if c, loaded := counters[key]; loaded {
-		m.c = c
-		countersMu.Unlock()
-		m.mu.Unlock()
+	cs := getCounterSet(ctx)
+	if c, ok := cs.counters[m.hash]; ok {
 		c.inc(v)
 		return
 	}
-	m.c = cs
-	counters[key] = cs
-	countersMu.Unlock()
-	m.mu.Unlock()
-	storeMetric(key, cs)
+	// We're the first to create this metric!
+	c := &counter{
+		value: v,
+	}
+	cs.counters[m.hash] = c
+	key := getContextKey(ctx, m.name)
+	storeMetric(key, c)
 }
 
 // Dec decrements the counter within the given PTransform context by v.
@@ -246,13 +368,10 @@
 // counter is a metric cell for counter values.
 type counter struct {
 	value int64
-	mu    sync.Mutex
 }
 
 func (m *counter) inc(v int64) {
-	m.mu.Lock()
-	m.value += v
-	m.mu.Unlock()
+	atomic.AddInt64(&m.value, v)
 }
 
 func (m *counter) String() string {
@@ -262,25 +381,23 @@
 // toProto returns a Metrics_User populated with the Data messages, but not the name. The
 // caller needs to populate with the metric's name.
 func (m *counter) toProto() *fnexecution_v1.Metrics_User {
-	m.mu.Lock()
-	defer m.mu.Unlock()
 	return &fnexecution_v1.Metrics_User{
 		Data: &fnexecution_v1.Metrics_User_CounterData_{
 			CounterData: &fnexecution_v1.Metrics_User_CounterData{
-				Value: m.value,
+				Value: atomic.LoadInt64(&m.value),
 			},
 		},
 	}
 }
 
+func (m *counter) kind() kind {
+	return kindSumCounter
+}
+
 // Distribution is a simple distribution of values.
 type Distribution struct {
 	name name
-
-	// The following are a fast cache of the key and storage
-	mu sync.Mutex
-	k  key
-	d  *distribution
+	hash nameHash
 }
 
 func (m *Distribution) String() string {
@@ -289,41 +406,29 @@
 
 // NewDistribution returns the Distribution with the given namespace and name.
 func NewDistribution(ns, n string) *Distribution {
-	mn := newName(ns, n)
 	return &Distribution{
-		name: mn,
+		name: newName(ns, n),
+		hash: hashName(ns, n),
 	}
 }
 
 // Update updates the distribution within the given PTransform context with v.
 func (m *Distribution) Update(ctx context.Context, v int64) {
-	key := getContextKey(ctx, m.name)
-	ds := &distribution{
+	cs := getCounterSet(ctx)
+	if d, ok := cs.distributions[m.hash]; ok {
+		d.update(v)
+		return
+	}
+	// We're the first to create this metric!
+	d := &distribution{
 		count: 1,
 		sum:   v,
 		min:   v,
 		max:   v,
 	}
-	m.mu.Lock()
-	if m.k == key {
-		m.d.update(v)
-		m.mu.Unlock()
-		return
-	}
-	m.k = key
-	distributionsMu.Lock()
-	if d, loaded := distributions[key]; loaded {
-		m.d = d
-		distributionsMu.Unlock()
-		m.mu.Unlock()
-		d.update(v)
-		return
-	}
-	m.d = ds
-	distributions[key] = ds
-	distributionsMu.Unlock()
-	m.mu.Unlock()
-	storeMetric(key, ds)
+	cs.distributions[m.hash] = d
+	key := getContextKey(ctx, m.name)
+	storeMetric(key, d)
 }
 
 // distribution is a metric cell for distribution values.
@@ -366,14 +471,14 @@
 	}
 }
 
+func (m *distribution) kind() kind {
+	return kindDistribution
+}
+
 // Gauge is a time, value pair metric.
 type Gauge struct {
 	name name
-
-	// The following are a fast cache of the key and storage
-	mu sync.Mutex
-	k  key
-	g  *gauge
+	hash nameHash
 }
 
 func (m *Gauge) String() string {
@@ -382,57 +487,30 @@
 
 // NewGauge returns the Gauge with the given namespace and name.
 func NewGauge(ns, n string) *Gauge {
-	mn := newName(ns, n)
 	return &Gauge{
-		name: mn,
+		name: newName(ns, n),
+		hash: hashName(ns, n),
 	}
 }
 
+// TODO(lostluck): 2018/03/05 Use a common internal beam now() instead, once that exists.
+var now = time.Now
+
 // Set sets the gauge to the given value, and associates it with the current time on the clock.
 func (m *Gauge) Set(ctx context.Context, v int64) {
-	key := getContextKey(ctx, m.name)
-	gs := &gauge{
-		t: now(),
-		v: v,
-	}
-	m.mu.Lock()
-	if m.k == key {
-		m.g.set(v)
-		m.mu.Unlock()
-		return
-	}
-	m.k = key
-	gaugesMu.Lock()
-	if g, loaded := gauges[key]; loaded {
-		m.g = g
-		gaugesMu.Unlock()
-		m.mu.Unlock()
+	cs := getCounterSet(ctx)
+	if g, ok := cs.gauges[m.hash]; ok {
 		g.set(v)
 		return
 	}
-	m.g = gs
-	gauges[key] = gs
-	gaugesMu.Unlock()
-	m.mu.Unlock()
-	storeMetric(key, gs)
-}
-
-// storeMetric stores a metric away on its first use so it may be retrieved later on.
-// In the event of a name collision, storeMetric can panic, so it's prudent to release
-// locks if they are no longer required.
-func storeMetric(key key, m userMetric) {
-	mu.Lock()
-	defer mu.Unlock()
-	if _, ok := store[key.bundle]; !ok {
-		store[key.bundle] = make(map[string]map[name]userMetric)
+	// We're the first to create this metric!
+	g := &gauge{
+		t: now(),
+		v: v,
 	}
-	if _, ok := store[key.bundle][key.ptransform]; !ok {
-		store[key.bundle][key.ptransform] = make(map[name]userMetric)
-	}
-	if _, ok := store[key.bundle][key.ptransform][key.name]; ok {
-		panic(fmt.Sprintf("metric name %s being reused for a second metric in a single PTransform", key.name))
-	}
-	store[key.bundle][key.ptransform][key.name] = m
+	cs.gauges[m.hash] = g
+	key := getContextKey(ctx, m.name)
+	storeMetric(key, g)
 }
 
 // gauge is a metric cell for gauge values.
@@ -466,8 +544,12 @@
 	}
 }
 
+func (m *gauge) kind() kind {
+	return kindGauge
+}
+
 func (m *gauge) String() string {
-	return fmt.Sprintf("time: %s value: %d", m.t, m.v)
+	return fmt.Sprintf("%v time: %s value: %d", m.kind(), m.t, m.v)
 }
 
 // ToProto exports all collected metrics for the given BundleID and PTransform ID pair.
@@ -505,23 +587,19 @@
 func dumpTo(p func(format string, args ...interface{})) {
 	mu.RLock()
 	defer mu.RUnlock()
-	countersMu.RLock()
-	defer countersMu.RUnlock()
-	distributionsMu.RLock()
-	defer distributionsMu.RUnlock()
-	gaugesMu.RLock()
-	defer gaugesMu.RUnlock()
 	var bs []string
 	for b := range store {
 		bs = append(bs, b)
 	}
 	sort.Strings(bs)
+
 	for _, b := range bs {
 		var pts []string
 		for pt := range store[b] {
 			pts = append(pts, pt)
 		}
 		sort.Strings(pts)
+
 		for _, pt := range pts {
 			var ns []name
 			for n := range store[b][pt] {
@@ -537,17 +615,10 @@
 				return false
 			})
 			p("Bundle: %q - PTransformID: %q", b, pt)
+
 			for _, n := range ns {
-				key := key{name: n, bundle: b, ptransform: pt}
-				if m, ok := counters[key]; ok {
-					p("\t%s - %s", key.name, m)
-				}
-				if m, ok := distributions[key]; ok {
-					p("\t%s - %s", key.name, m)
-				}
-				if m, ok := gauges[key]; ok {
-					p("\t%s - %s", key.name, m)
-				}
+				m := store[b][pt][n]
+				p("\t%s - %s", n, m)
 			}
 		}
 	}
@@ -558,9 +629,6 @@
 func Clear() {
 	mu.Lock()
 	store = make(map[string]map[string]map[name]userMetric)
-	counters = make(map[key]*counter)
-	distributions = make(map[key]*distribution)
-	gauges = make(map[key]*gauge)
 	mu.Unlock()
 }
 
@@ -570,21 +638,6 @@
 	// No concurrency races since mu guards all access to store,
 	// and the metric cell sync.Maps are goroutine safe.
 	mu.Lock()
-	pts := store[b]
-	countersMu.Lock()
-	distributionsMu.Lock()
-	gaugesMu.Lock()
-	for pt, m := range pts {
-		for n := range m {
-			key := key{name: n, bundle: b, ptransform: pt}
-			delete(counters, key)
-			delete(distributions, key)
-			delete(gauges, key)
-		}
-	}
-	countersMu.Unlock()
-	distributionsMu.Unlock()
-	gaugesMu.Unlock()
 	delete(store, b)
 	mu.Unlock()
 }
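The net effect of the metrics.go changes above is that a user metric only touches the global store on its first use within a PTransform; later updates go through the hashed lookup in the bundle-local counter set, and SetPTransformID now panics unless SetBundleID was called first (which is why the test helpers below swap the call order). A minimal sketch of the intended call pattern, using only the exported API shown in the diff; the bundle and transform IDs are hypothetical, and in a real pipeline the harness establishes these contexts:

    package main

    import (
        "context"
        "fmt"

        "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
    )

    func main() {
        // The bundle context must be established before the PTransform context.
        ctx := metrics.SetBundleID(context.Background(), "bundle-1") // hypothetical IDs
        ctx = metrics.SetPTransformID(ctx, "transform-A")

        // The first Inc creates the bundle-local cell and registers it in the
        // global store; later Incs only hit the hashed counter-set lookup.
        calls := metrics.NewCounter("example", "calls")
        for i := 0; i < 3; i++ {
            calls.Inc(ctx, 1)
        }
        fmt.Println("incremented counter three times")
    }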
diff --git a/sdks/go/pkg/beam/core/metrics/metrics_test.go b/sdks/go/pkg/beam/core/metrics/metrics_test.go
index 3ef2de9..c2d62c0 100644
--- a/sdks/go/pkg/beam/core/metrics/metrics_test.go
+++ b/sdks/go/pkg/beam/core/metrics/metrics_test.go
@@ -25,194 +25,150 @@
 // bID is a bundleId to use in the tests, if nothing more specific is needed.
 const bID = "bID"
 
-// TestRobustness validates metrics not panicking if the context doesn't
-// have the bundle or transform ID.
-func TestRobustness(t *testing.T) {
-	m := NewCounter("Test", "myCount")
-	m.Inc(context.Background(), 3)
-	ptCtx := SetPTransformID(context.Background(), "MY_TRANSFORM")
-	m.Inc(ptCtx, 3)
-	bCtx := SetBundleID(context.Background(), bID)
-	m.Inc(bCtx, 3)
-}
-
-func TestBeamContext(t *testing.T) {
-	t.Run("ptransformID", func(t *testing.T) {
-		ptID := "MY_TRANSFORM"
-		ctx := SetPTransformID(context.Background(), ptID)
-		key := getContextKey(ctx, name{})
-		if key.bundle != bundleIDUnset {
-			t.Errorf("bundleId = %q, want %q", key.bundle, bundleIDUnset)
-		}
-		if key.ptransform != ptID {
-			t.Errorf("ptransformId = %q, want %q", key.ptransform, ptID)
-		}
-
-	})
-
-	t.Run("bundleID", func(t *testing.T) {
-		ctx := SetBundleID(context.Background(), bID)
-		key := getContextKey(ctx, name{})
-		if key.bundle != bID {
-			t.Errorf("bundleId = %q, want %q", key.bundle, bID)
-		}
-		if key.ptransform != ptransformIDUnset {
-			t.Errorf("ptransformId = %q, want %q", key.ptransform, ptransformIDUnset)
-		}
-	})
-}
-
 func ctxWith(b, pt string) context.Context {
 	ctx := context.Background()
-	ctx = SetPTransformID(ctx, pt)
 	ctx = SetBundleID(ctx, b)
+	ctx = SetPTransformID(ctx, pt)
 	return ctx
 }
 
 func TestCounter_Inc(t *testing.T) {
+	ctxA := ctxWith(bID, "A")
+	ctxB := ctxWith(bID, "B")
+
 	tests := []struct {
-		ns, n, key string // Counter name and PTransform context
-		inc        int64
-		value      int64 // Internal variable to check
+		ns, n string // Counter name
+		ctx   context.Context
+		inc   int64
+		value int64 // Internal variable to check
 	}{
-		{ns: "inc1", n: "count", key: "A", inc: 1, value: 1},
-		{ns: "inc1", n: "count", key: "A", inc: 1, value: 2},
-		{ns: "inc1", n: "ticker", key: "A", inc: 1, value: 1},
-		{ns: "inc1", n: "ticker", key: "A", inc: 2, value: 3},
-		{ns: "inc1", n: "count", key: "B", inc: 1, value: 1},
-		{ns: "inc1", n: "count", key: "B", inc: 1, value: 2},
-		{ns: "inc1", n: "ticker", key: "B", inc: 1, value: 1},
-		{ns: "inc1", n: "ticker", key: "B", inc: 2, value: 3},
-		{ns: "inc2", n: "count", key: "A", inc: 1, value: 1},
-		{ns: "inc2", n: "count", key: "A", inc: 1, value: 2},
-		{ns: "inc2", n: "ticker", key: "A", inc: 1, value: 1},
-		{ns: "inc2", n: "ticker", key: "A", inc: 2, value: 3},
-		{ns: "inc2", n: "count", key: "B", inc: 1, value: 1},
-		{ns: "inc2", n: "count", key: "B", inc: 1, value: 2},
-		{ns: "inc2", n: "ticker", key: "B", inc: 1, value: 1},
-		{ns: "inc2", n: "ticker", key: "B", inc: 2, value: 3},
+		{ns: "inc1", n: "count", ctx: ctxA, inc: 1, value: 1},
+		{ns: "inc1", n: "count", ctx: ctxA, inc: 1, value: 2},
+		{ns: "inc1", n: "ticker", ctx: ctxA, inc: 1, value: 1},
+		{ns: "inc1", n: "ticker", ctx: ctxA, inc: 2, value: 3},
+		{ns: "inc1", n: "count", ctx: ctxB, inc: 1, value: 1},
+		{ns: "inc1", n: "count", ctx: ctxB, inc: 1, value: 2},
+		{ns: "inc1", n: "ticker", ctx: ctxB, inc: 1, value: 1},
+		{ns: "inc1", n: "ticker", ctx: ctxB, inc: 2, value: 3},
+		{ns: "inc2", n: "count", ctx: ctxA, inc: 1, value: 1},
+		{ns: "inc2", n: "count", ctx: ctxA, inc: 1, value: 2},
+		{ns: "inc2", n: "ticker", ctx: ctxA, inc: 1, value: 1},
+		{ns: "inc2", n: "ticker", ctx: ctxA, inc: 2, value: 3},
+		{ns: "inc2", n: "count", ctx: ctxB, inc: 1, value: 1},
+		{ns: "inc2", n: "count", ctx: ctxB, inc: 1, value: 2},
+		{ns: "inc2", n: "ticker", ctx: ctxB, inc: 1, value: 1},
+		{ns: "inc2", n: "ticker", ctx: ctxB, inc: 2, value: 3},
 	}
 
 	for _, test := range tests {
-		t.Run(fmt.Sprintf("add %d to %s.%s[%q] value: %d", test.inc, test.ns, test.n, test.key, test.value),
+		t.Run(fmt.Sprintf("add %d to %s.%s[%v] value: %d", test.inc, test.ns, test.n, test.ctx, test.value),
 			func(t *testing.T) {
 				m := NewCounter(test.ns, test.n)
-				ctx := ctxWith(bID, test.key)
-				m.Inc(ctx, test.inc)
+				m.Inc(test.ctx, test.inc)
 
-				key := key{name: name{namespace: test.ns, name: test.n}, bundle: bID, ptransform: test.key}
-				countersMu.Lock()
-				c, ok := counters[key]
-				countersMu.Unlock()
-				if !ok {
-					t.Fatalf("Unable to find Counter for key %v", key)
-				}
+				cs := getCounterSet(test.ctx)
+				c := cs.counters[m.hash]
 				if got, want := c.value, test.value; got != want {
-					t.Errorf("GetCounter(%q,%q).Inc(%s, %d) c.value got %v, want %v", test.ns, test.n, test.key, test.inc, got, want)
+					t.Errorf("GetCounter(%q,%q).Inc(%v, %d) c.value got %v, want %v", test.ns, test.n, test.ctx, test.inc, got, want)
 				}
 			})
 	}
 }
 
 func TestCounter_Dec(t *testing.T) {
+	ctxA := ctxWith(bID, "A")
+	ctxB := ctxWith(bID, "B")
+
 	tests := []struct {
-		ns, n, key string // Counter name and PTransform context
-		dec        int64
-		value      int64 // Internal variable to check
+		ns, n string // Counter name
+		ctx   context.Context
+		dec   int64
+		value int64 // Internal variable to check
 	}{
-		{ns: "dec1", n: "count", key: "A", dec: 1, value: -1},
-		{ns: "dec1", n: "count", key: "A", dec: 1, value: -2},
-		{ns: "dec1", n: "ticker", key: "A", dec: 1, value: -1},
-		{ns: "dec1", n: "ticker", key: "A", dec: 2, value: -3},
-		{ns: "dec1", n: "count", key: "B", dec: 1, value: -1},
-		{ns: "dec1", n: "count", key: "B", dec: 1, value: -2},
-		{ns: "dec1", n: "ticker", key: "B", dec: 1, value: -1},
-		{ns: "dec1", n: "ticker", key: "B", dec: 2, value: -3},
-		{ns: "dec2", n: "count", key: "A", dec: 1, value: -1},
-		{ns: "dec2", n: "count", key: "A", dec: 1, value: -2},
-		{ns: "dec2", n: "ticker", key: "A", dec: 1, value: -1},
-		{ns: "dec2", n: "ticker", key: "A", dec: 2, value: -3},
-		{ns: "dec2", n: "count", key: "B", dec: 1, value: -1},
-		{ns: "dec2", n: "count", key: "B", dec: 1, value: -2},
-		{ns: "dec2", n: "ticker", key: "B", dec: 1, value: -1},
-		{ns: "dec2", n: "ticker", key: "B", dec: 2, value: -3},
+		{ns: "dec1", n: "count", ctx: ctxA, dec: 1, value: -1},
+		{ns: "dec1", n: "count", ctx: ctxA, dec: 1, value: -2},
+		{ns: "dec1", n: "ticker", ctx: ctxA, dec: 1, value: -1},
+		{ns: "dec1", n: "ticker", ctx: ctxA, dec: 2, value: -3},
+		{ns: "dec1", n: "count", ctx: ctxB, dec: 1, value: -1},
+		{ns: "dec1", n: "count", ctx: ctxB, dec: 1, value: -2},
+		{ns: "dec1", n: "ticker", ctx: ctxB, dec: 1, value: -1},
+		{ns: "dec1", n: "ticker", ctx: ctxB, dec: 2, value: -3},
+		{ns: "dec2", n: "count", ctx: ctxA, dec: 1, value: -1},
+		{ns: "dec2", n: "count", ctx: ctxA, dec: 1, value: -2},
+		{ns: "dec2", n: "ticker", ctx: ctxA, dec: 1, value: -1},
+		{ns: "dec2", n: "ticker", ctx: ctxA, dec: 2, value: -3},
+		{ns: "dec2", n: "count", ctx: ctxB, dec: 1, value: -1},
+		{ns: "dec2", n: "count", ctx: ctxB, dec: 1, value: -2},
+		{ns: "dec2", n: "ticker", ctx: ctxB, dec: 1, value: -1},
+		{ns: "dec2", n: "ticker", ctx: ctxB, dec: 2, value: -3},
 	}
 
 	for _, test := range tests {
-		t.Run(fmt.Sprintf("subtract %d to %s.%s[%q] value: %d", test.dec, test.ns, test.n, test.key, test.value),
+		t.Run(fmt.Sprintf("subtract %d to %s.%s[%v] value: %d", test.dec, test.ns, test.n, test.ctx, test.value),
 			func(t *testing.T) {
 				m := NewCounter(test.ns, test.n)
-				ctx := ctxWith(bID, test.key)
-				m.Dec(ctx, test.dec)
+				m.Dec(test.ctx, test.dec)
 
-				key := key{name: name{namespace: test.ns, name: test.n}, bundle: bID, ptransform: test.key}
-				countersMu.Lock()
-				c, ok := counters[key]
-				countersMu.Unlock()
-				if !ok {
-					t.Fatalf("Unable to find Counter for key %v", key)
-				}
+				cs := getCounterSet(test.ctx)
+				c := cs.counters[m.hash]
 				if got, want := c.value, test.value; got != want {
-					t.Errorf("GetCounter(%q,%q).Dec(%s, %d) c.value got %v, want %v", test.ns, test.n, test.key, test.dec, got, want)
+					t.Errorf("GetCounter(%q,%q).Dec(%v, %d) c.value got %v, want %v", test.ns, test.n, test.ctx, test.dec, got, want)
 				}
 			})
 	}
 }
 
 func TestDistribution_Update(t *testing.T) {
+	ctxA := ctxWith(bID, "A")
+	ctxB := ctxWith(bID, "B")
 	tests := []struct {
-		ns, n, key           string // Gauge name and PTransform context
+		ns, n                string // Distribution name
+		ctx                  context.Context
 		v                    int64
 		count, sum, min, max int64 // Internal variables to check
 	}{
-		{ns: "update1", n: "latency", key: "A", v: 1, count: 1, sum: 1, min: 1, max: 1},
-		{ns: "update1", n: "latency", key: "A", v: 1, count: 2, sum: 2, min: 1, max: 1},
-		{ns: "update1", n: "latency", key: "A", v: 1, count: 3, sum: 3, min: 1, max: 1},
-		{ns: "update1", n: "size", key: "A", v: 1, count: 1, sum: 1, min: 1, max: 1},
-		{ns: "update1", n: "size", key: "A", v: 2, count: 2, sum: 3, min: 1, max: 2},
-		{ns: "update1", n: "size", key: "A", v: 3, count: 3, sum: 6, min: 1, max: 3},
-		{ns: "update1", n: "size", key: "A", v: -4, count: 4, sum: 2, min: -4, max: 3},
-		{ns: "update1", n: "size", key: "A", v: 1, count: 5, sum: 3, min: -4, max: 3},
-		{ns: "update1", n: "latency", key: "B", v: 1, count: 1, sum: 1, min: 1, max: 1},
-		{ns: "update1", n: "latency", key: "B", v: 1, count: 2, sum: 2, min: 1, max: 1},
-		{ns: "update1", n: "size", key: "B", v: 1, count: 1, sum: 1, min: 1, max: 1},
-		{ns: "update1", n: "size", key: "B", v: 2, count: 2, sum: 3, min: 1, max: 2},
-		{ns: "update2", n: "latency", key: "A", v: 1, count: 1, sum: 1, min: 1, max: 1},
-		{ns: "update2", n: "latency", key: "A", v: 1, count: 2, sum: 2, min: 1, max: 1},
-		{ns: "update2", n: "size", key: "A", v: 1, count: 1, sum: 1, min: 1, max: 1},
-		{ns: "update2", n: "size", key: "A", v: 2, count: 2, sum: 3, min: 1, max: 2},
-		{ns: "update2", n: "latency", key: "B", v: 1, count: 1, sum: 1, min: 1, max: 1},
-		{ns: "update2", n: "latency", key: "B", v: 1, count: 2, sum: 2, min: 1, max: 1},
-		{ns: "update2", n: "size", key: "B", v: 1, count: 1, sum: 1, min: 1, max: 1},
-		{ns: "update2", n: "size", key: "B", v: 2, count: 2, sum: 3, min: 1, max: 2},
-		{ns: "update1", n: "size", key: "A", v: 1, count: 6, sum: 4, min: -4, max: 3},
+		{ns: "update1", n: "latency", ctx: ctxA, v: 1, count: 1, sum: 1, min: 1, max: 1},
+		{ns: "update1", n: "latency", ctx: ctxA, v: 1, count: 2, sum: 2, min: 1, max: 1},
+		{ns: "update1", n: "latency", ctx: ctxA, v: 1, count: 3, sum: 3, min: 1, max: 1},
+		{ns: "update1", n: "size", ctx: ctxA, v: 1, count: 1, sum: 1, min: 1, max: 1},
+		{ns: "update1", n: "size", ctx: ctxA, v: 2, count: 2, sum: 3, min: 1, max: 2},
+		{ns: "update1", n: "size", ctx: ctxA, v: 3, count: 3, sum: 6, min: 1, max: 3},
+		{ns: "update1", n: "size", ctx: ctxA, v: -4, count: 4, sum: 2, min: -4, max: 3},
+		{ns: "update1", n: "size", ctx: ctxA, v: 1, count: 5, sum: 3, min: -4, max: 3},
+		{ns: "update1", n: "latency", ctx: ctxB, v: 1, count: 1, sum: 1, min: 1, max: 1},
+		{ns: "update1", n: "latency", ctx: ctxB, v: 1, count: 2, sum: 2, min: 1, max: 1},
+		{ns: "update1", n: "size", ctx: ctxB, v: 1, count: 1, sum: 1, min: 1, max: 1},
+		{ns: "update1", n: "size", ctx: ctxB, v: 2, count: 2, sum: 3, min: 1, max: 2},
+		{ns: "update2", n: "latency", ctx: ctxA, v: 1, count: 1, sum: 1, min: 1, max: 1},
+		{ns: "update2", n: "latency", ctx: ctxA, v: 1, count: 2, sum: 2, min: 1, max: 1},
+		{ns: "update2", n: "size", ctx: ctxA, v: 1, count: 1, sum: 1, min: 1, max: 1},
+		{ns: "update2", n: "size", ctx: ctxA, v: 2, count: 2, sum: 3, min: 1, max: 2},
+		{ns: "update2", n: "latency", ctx: ctxB, v: 1, count: 1, sum: 1, min: 1, max: 1},
+		{ns: "update2", n: "latency", ctx: ctxB, v: 1, count: 2, sum: 2, min: 1, max: 1},
+		{ns: "update2", n: "size", ctx: ctxB, v: 1, count: 1, sum: 1, min: 1, max: 1},
+		{ns: "update2", n: "size", ctx: ctxB, v: 2, count: 2, sum: 3, min: 1, max: 2},
+		{ns: "update1", n: "size", ctx: ctxA, v: 1, count: 6, sum: 4, min: -4, max: 3},
 	}
 
 	for _, test := range tests {
-		t.Run(fmt.Sprintf("add %d to %s.%s[%q] count: %d sum: %d", test.v, test.ns, test.n, test.key, test.count, test.sum),
+		t.Run(fmt.Sprintf("add %d to %s.%s[%v] count: %d sum: %d", test.v, test.ns, test.n, test.ctx, test.count, test.sum),
 			func(t *testing.T) {
 				m := NewDistribution(test.ns, test.n)
-				ctx := ctxWith(bID, test.key)
-				m.Update(ctx, test.v)
+				m.Update(test.ctx, test.v)
 
-				key := key{name: name{namespace: test.ns, name: test.n}, bundle: bID, ptransform: test.key}
-				distributionsMu.Lock()
-				d, ok := distributions[key]
-				distributionsMu.Unlock()
-				if !ok {
-					t.Fatalf("Unable to find Distribution for key %v", key)
-				}
+				cs := getCounterSet(test.ctx)
+				d := cs.distributions[m.hash]
 				if got, want := d.count, test.count; got != want {
-					t.Errorf("GetDistribution(%q,%q).Update(%s, %d) d.count got %v, want %v", test.ns, test.n, test.key, test.v, got, want)
+					t.Errorf("GetDistribution(%q,%q).Update(%v, %d) d.count got %v, want %v", test.ns, test.n, test.ctx, test.v, got, want)
 				}
 				if got, want := d.sum, test.sum; got != want {
-					t.Errorf("GetDistribution(%q,%q).Update(%s, %d) d.sum got %v, want %v", test.ns, test.n, test.key, test.v, got, want)
+					t.Errorf("GetDistribution(%q,%q).Update(%v, %d) d.sum got %v, want %v", test.ns, test.n, test.ctx, test.v, got, want)
 				}
 				if got, want := d.min, test.min; got != want {
-					t.Errorf("GetDistribution(%q,%q).Update(%s, %d) d.min got %v, want %v", test.ns, test.n, test.key, test.v, got, want)
+					t.Errorf("GetDistribution(%q,%q).Update(%v, %d) d.min got %v, want %v", test.ns, test.n, test.ctx, test.v, got, want)
 				}
 				if got, want := d.max, test.max; got != want {
-					t.Errorf("GetDistribution(%q,%q).Update(%s, %d) d.max got %v, want %v", test.ns, test.n, test.key, test.v, got, want)
+					t.Errorf("GetDistribution(%q,%q).Update(%v, %d) d.max got %v, want %v", test.ns, test.n, test.ctx, test.v, got, want)
 				}
 			})
 	}
@@ -223,74 +179,51 @@
 }
 
 func TestGauge_Set(t *testing.T) {
+	ctxA := ctxWith(bID, "A")
+	ctxB := ctxWith(bID, "B")
 	tests := []struct {
-		ns, n, key string // Gauge name and PTransform context
-		v          int64
-		t          time.Time
+		ns, n string // Gauge name
+		ctx   context.Context
+		v     int64
+		t     time.Time
 	}{
-		{ns: "set1", n: "load", key: "A", v: 1, t: time.Unix(0, 0)},
-		{ns: "set1", n: "load", key: "A", v: 1, t: time.Unix(0, 0)},
-		{ns: "set1", n: "speed", key: "A", v: 1, t: time.Unix(0, 0)},
-		{ns: "set1", n: "speed", key: "A", v: 2, t: time.Unix(0, 0)},
-		{ns: "set1", n: "load", key: "B", v: 1, t: time.Unix(0, 0)},
-		{ns: "set1", n: "load", key: "B", v: 1, t: time.Unix(0, 0)},
-		{ns: "set1", n: "speed", key: "B", v: 1, t: time.Unix(0, 0)},
-		{ns: "set1", n: "speed", key: "B", v: 2, t: time.Unix(0, 0)},
-		{ns: "set2", n: "load", key: "A", v: 1, t: time.Unix(0, 0)},
-		{ns: "set2", n: "load", key: "A", v: 1, t: time.Unix(0, 0)},
-		{ns: "set2", n: "speed", key: "A", v: 1, t: time.Unix(0, 0)},
-		{ns: "set2", n: "speed", key: "A", v: 2, t: time.Unix(0, 0)},
-		{ns: "set2", n: "load", key: "B", v: 1, t: time.Unix(0, 0)},
-		{ns: "set2", n: "load", key: "B", v: 1, t: time.Unix(0, 0)},
-		{ns: "set2", n: "speed", key: "B", v: 1, t: time.Unix(0, 0)},
-		{ns: "set2", n: "speed", key: "B", v: 2, t: time.Unix(0, 0)},
+		{ns: "set1", n: "load", ctx: ctxA, v: 1, t: time.Unix(0, 0)},
+		{ns: "set1", n: "load", ctx: ctxA, v: 1, t: time.Unix(0, 0)},
+		{ns: "set1", n: "speed", ctx: ctxA, v: 1, t: time.Unix(0, 0)},
+		{ns: "set1", n: "speed", ctx: ctxA, v: 2, t: time.Unix(0, 0)},
+		{ns: "set1", n: "load", ctx: ctxB, v: 1, t: time.Unix(0, 0)},
+		{ns: "set1", n: "load", ctx: ctxB, v: 1, t: time.Unix(0, 0)},
+		{ns: "set1", n: "speed", ctx: ctxB, v: 1, t: time.Unix(0, 0)},
+		{ns: "set1", n: "speed", ctx: ctxB, v: 2, t: time.Unix(0, 0)},
+		{ns: "set2", n: "load", ctx: ctxA, v: 1, t: time.Unix(0, 0)},
+		{ns: "set2", n: "load", ctx: ctxA, v: 1, t: time.Unix(0, 0)},
+		{ns: "set2", n: "speed", ctx: ctxA, v: 1, t: time.Unix(0, 0)},
+		{ns: "set2", n: "speed", ctx: ctxA, v: 2, t: time.Unix(0, 0)},
+		{ns: "set2", n: "load", ctx: ctxB, v: 1, t: time.Unix(0, 0)},
+		{ns: "set2", n: "load", ctx: ctxB, v: 1, t: time.Unix(0, 0)},
+		{ns: "set2", n: "speed", ctx: ctxB, v: 1, t: time.Unix(0, 0)},
+		{ns: "set2", n: "speed", ctx: ctxB, v: 2, t: time.Unix(0, 0)},
 	}
 
 	for _, test := range tests {
-		t.Run(fmt.Sprintf("set %s.%s[%q] to %d at %v", test.ns, test.n, test.key, test.v, test.t),
+		t.Run(fmt.Sprintf("set %s.%s[%v] to %d at %v", test.ns, test.n, test.ctx, test.v, test.t),
 			func(t *testing.T) {
 				m := NewGauge(test.ns, test.n)
-				ctx := ctxWith(bID, test.key)
 				now = testclock(test.t)
-				m.Set(ctx, test.v)
+				m.Set(test.ctx, test.v)
 
-				key := key{name: name{namespace: test.ns, name: test.n}, bundle: bID, ptransform: test.key}
-				gaugesMu.Lock()
-				g, ok := gauges[key]
-				gaugesMu.Unlock()
-				if !ok {
-					t.Fatalf("Unable to find Gauge for key %v", key)
-				}
+				cs := getCounterSet(test.ctx)
+				g := cs.gauges[m.hash]
 				if got, want := g.v, test.v; got != want {
-					t.Errorf("GetGauge(%q,%q).Set(%s, %d) g.v got %v, want %v", test.ns, test.n, test.key, test.v, got, want)
+					t.Errorf("GetGauge(%q,%q).Set(%v, %d) g.v got %v, want %v", test.ns, test.n, test.ctx, test.v, got, want)
 				}
 				if got, want := g.t, test.t; got != want {
-					t.Errorf("GetGauge(%q,%q).Set(%s, %d) t.t got %v, want %v", test.ns, test.n, test.key, test.v, got, want)
+					t.Errorf("GetGauge(%q,%q).Set(%v, %d) t.t got %v, want %v", test.ns, test.n, test.ctx, test.v, got, want)
 				}
 			})
 	}
 }
 
-type metricType uint8
-
-const (
-	counterType metricType = iota
-	distributionType
-	gaugeType
-)
-
-func (t metricType) String() string {
-	switch t {
-	case counterType:
-		return "Counter"
-	case distributionType:
-		return "Distribution"
-	case gaugeType:
-		return "Gauge"
-	default:
-		panic(fmt.Sprintf("Unknown metric type value: %v", uint8(t)))
-	}
-}
 func TestNameCollisions(t *testing.T) {
 	ns, c, d, g := "collisions", "counter", "distribution", "gauge"
 	// Checks that user code panics if a counter attempts to be defined in the same PTransform
@@ -302,17 +235,17 @@
 	NewDistribution(ns, d).Update(ctxWith(bID, d), 1)
 	NewGauge(ns, g).Set(ctxWith(bID, g), 1)
 	tests := []struct {
-		existing, new metricType
+		existing, new kind
 	}{
-		{existing: counterType, new: counterType},
-		{existing: counterType, new: distributionType},
-		{existing: counterType, new: gaugeType},
-		{existing: distributionType, new: counterType},
-		{existing: distributionType, new: distributionType},
-		{existing: distributionType, new: gaugeType},
-		{existing: gaugeType, new: counterType},
-		{existing: gaugeType, new: distributionType},
-		{existing: gaugeType, new: gaugeType},
+		{existing: kindSumCounter, new: kindSumCounter},
+		{existing: kindSumCounter, new: kindDistribution},
+		{existing: kindSumCounter, new: kindGauge},
+		{existing: kindDistribution, new: kindSumCounter},
+		{existing: kindDistribution, new: kindDistribution},
+		{existing: kindDistribution, new: kindGauge},
+		{existing: kindGauge, new: kindSumCounter},
+		{existing: kindGauge, new: kindDistribution},
+		{existing: kindGauge, new: kindGauge},
 	}
 	for _, test := range tests {
 		t.Run(fmt.Sprintf("%s name collides with %s", test.existing, test.new),
@@ -325,26 +258,26 @@
 						}
 						t.Error("panic expected")
 					} else {
-						t.Log("reusing names is fine when the metrics the same type:", test.existing, test.new)
+						t.Log("reusing names is fine when the metric is the same type:", test.existing, test.new)
 					}
 				}()
 				var name string
 				switch test.existing {
-				case counterType:
+				case kindSumCounter:
 					name = c
-				case distributionType:
+				case kindDistribution:
 					name = d
-				case gaugeType:
+				case kindGauge:
 					name = g
 				default:
 					t.Fatalf("unknown existing metricType with value: %v", int(test.existing))
 				}
 				switch test.new {
-				case counterType:
+				case kindSumCounter:
 					NewCounter(ns, name).Inc(ctxWith(bID, name), 1)
-				case distributionType:
+				case kindDistribution:
 					NewDistribution(ns, name).Update(ctxWith(bID, name), 1)
-				case gaugeType:
+				case kindGauge:
 					NewGauge(ns, name).Set(ctxWith(bID, name), 1)
 				default:
 					t.Fatalf("unknown new metricType with value: %v", int(test.new))
@@ -398,14 +331,16 @@
 	}
 }
 
-// Run on @lostluck's desktop:
+// Run on @lostluck's desktop (2020/01/21) go1.13.4
 //
-// BenchmarkMetrics/counter_inplace-12         	 5000000	       243 ns/op	     128 B/op	       2 allocs/op
-// BenchmarkMetrics/distribution_inplace-12    	 5000000	       252 ns/op	     160 B/op	       2 allocs/op
-// BenchmarkMetrics/gauge_inplace-12           	 5000000	       266 ns/op	     160 B/op	       2 allocs/op
-// BenchmarkMetrics/counter_predeclared-12     	20000000	        74.3 ns/op	      16 B/op	       1 allocs/op
-// BenchmarkMetrics/distribution_predeclared-12         	20000000	        79.6 ns/op	      48 B/op	       1 allocs/op
-// BenchmarkMetrics/gauge_predeclared-12                	20000000	        92.9 ns/op	      48 B/op	       1 allocs/op
+// Allocs & bytes should be consistent within Go versions, but ns/op is relative to the running machine.
+//
+// BenchmarkMetrics/counter_inplace-12              4814373               243 ns/op              48 B/op          1 allocs/op
+// BenchmarkMetrics/distribution_inplace-12         4455957               273 ns/op              48 B/op          1 allocs/op
+// BenchmarkMetrics/gauge_inplace-12                4605908               265 ns/op              48 B/op          1 allocs/op
+// BenchmarkMetrics/counter_predeclared-12         75339600                15.5 ns/op             0 B/op          0 allocs/op
+// BenchmarkMetrics/distribution_predeclared-12            49202775                24.4 ns/op             0 B/op          0 allocs/op
+// BenchmarkMetrics/gauge_predeclared-12                   46614810                28.3 ns/op             0 B/op          0 allocs/op
 func BenchmarkMetrics(b *testing.B) {
 	Clear()
 	pt, c, d, g := "bench.bundle.data", "counter", "distribution", "gauge"
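The refreshed benchmark numbers above compare the *_inplace and *_predeclared styles: predeclaring the proxy hashes the name once, so each update is just the bundle-local lookup with no allocations, while constructing the proxy per call re-hashes the name and allocates. A rough sketch of the two styles being measured, assuming a context already prepared with SetBundleID and SetPTransformID; the package and metric names are illustrative:

    package metricsbench // hypothetical package, for illustration only

    import (
        "context"

        "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
    )

    // Predeclared: the Counter (name plus hash) is built once and reused.
    var calls = metrics.NewCounter("example", "calls")

    func processElement(ctx context.Context) {
        calls.Inc(ctx, 1)
    }

    // In-place: building the Counter on every call re-hashes the name and
    // allocates, which is what the *_inplace benchmark variants measure.
    func processElementInPlace(ctx context.Context) {
        metrics.NewCounter("example", "calls").Inc(ctx, 1)
    }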
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index 06131b7..5d50061 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -137,7 +137,7 @@
 	switch {
 	case size >= 0:
 		// Single chunk streams are fully read in and buffered in memory.
-		var buf []FullValue
+		buf := make([]FullValue, 0, size)
 		buf, err = readStreamToBuffer(cv, r, int64(size), buf)
 		if err != nil {
 			return nil, err
@@ -156,10 +156,12 @@
 			case chunk == 0: // End of stream, return buffer.
 				return &FixedReStream{Buf: buf}, nil
 			case chunk > 0: // Non-zero chunk, read that many elements from the stream, and buffer them.
-				buf, err = readStreamToBuffer(cv, r, chunk, buf)
+				chunkBuf := make([]FullValue, 0, chunk)
+				chunkBuf, err = readStreamToBuffer(cv, r, chunk, chunkBuf)
 				if err != nil {
 					return nil, err
 				}
+				buf = append(buf, chunkBuf...)
 			case chunk == -1: // State backed iterable!
 				chunk, err := coder.DecodeVarInt(r)
 				if err != nil {
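The datasource.go change above swaps a single growing buffer for per-chunk buffers that are sized up front and appended to the result. A generic sketch of that pattern rather than the Beam decoder itself; readChunks and readChunk are hypothetical names used only for illustration:

    package example

    // readChunks drains a sequence of chunks whose element counts are known
    // before each read, pre-allocating every chunk buffer so append never
    // reallocates mid-chunk.
    func readChunks(chunkSizes []int, readChunk func(dst []string) []string) []string {
        var out []string
        for _, n := range chunkSizes {
            buf := make([]string, 0, n) // capacity hint, as in the datasource change
            buf = readChunk(buf)
            out = append(out, buf...)
        }
        return out
    }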
diff --git a/sdks/go/pkg/beam/metrics.go b/sdks/go/pkg/beam/metrics.go
index 67da5b1..0d4c0bb 100644
--- a/sdks/go/pkg/beam/metrics.go
+++ b/sdks/go/pkg/beam/metrics.go
@@ -21,8 +21,15 @@
 	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
 )
 
+// Implementation Note: The wrapping of the embedded methods
+// is to allow better GoDocs for the methods on the proxy types.
+
 // Counter is a metric that can be incremented and decremented,
 // and is aggregated by the sum.
+//
+// Counters are safe to use in multiple bundles simultaneously, but
+// not generally threadsafe. Your DoFn needs to manage the thread
+// safety of Beam metrics for any additional concurrency it uses.
 type Counter struct {
 	*metrics.Counter
 }
@@ -44,6 +51,10 @@
 
 // Distribution is a metric that records various statistics about the distribution
 // of reported values.
+//
+// Distributions are safe to use in multiple bundles simultaneously, but
+// not generally threadsafe. Your DoFn needs to manage the thread
+// safety of Beam metrics for any additional concurrency it uses.
 type Distribution struct {
 	*metrics.Distribution
 }
@@ -60,6 +71,10 @@
 
 // Gauge is a metric that can have its new value set, and is aggregated by taking
 // the last reported value.
+//
+// Gauges are safe to use in multiple bundles simultaneously, but
+// not generally threadsafe. Your DoFn needs to manage the thread
+// safety of Beam metrics for any additional concurrency it uses.
 type Gauge struct {
 	*metrics.Gauge
 }
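The implementation note above refers to re-declaring the embedded methods on the proxy types so each gets its own GoDoc entry. A sketch of what that wrapping pattern looks like, assuming the core metrics package from earlier in this change; the method body is illustrative rather than a copy of the package's own:

    package beamdoc // hypothetical package, for illustration only

    import (
        "context"

        "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
    )

    // Counter is a proxy over the core metrics cell; embedding carries the
    // behavior over while the wrapper method carries the user-facing docs.
    type Counter struct {
        *metrics.Counter
    }

    // Inc increments the counter within the given PTransform context by v.
    func (c Counter) Inc(ctx context.Context, v int64) {
        c.Counter.Inc(ctx, v)
    }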
diff --git a/sdks/go/pkg/beam/metrics_test.go b/sdks/go/pkg/beam/metrics_test.go
index 11b4410..abd19e0 100644
--- a/sdks/go/pkg/beam/metrics_test.go
+++ b/sdks/go/pkg/beam/metrics_test.go
@@ -29,8 +29,8 @@
 
 func ctxWithPtransformID(id string) context.Context {
 	ctx := context.Background()
-	ctx = metrics.SetPTransformID(ctx, id)
 	ctx = metrics.SetBundleID(ctx, "exampleBundle")
+	ctx = metrics.SetPTransformID(ctx, id)
 	return ctx
 }
 
@@ -69,7 +69,7 @@
 
 func Example_metricsReusable() {
 
-	// Metrics can be used in multiple DoFns
+	// Metric proxies can be used in multiple DoFns
 	c := beam.NewCounter("example.reusable", "count")
 
 	extractWordsDofn := func(ctx context.Context, line string, emit func(string)) {
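The reworded comment ("Metric proxies can be used in multiple DoFns") reflects that the proxy value is just a name handle: the same Counter can be captured by several DoFns and each bundle still gets its own cell. A small sketch with the pipeline wiring omitted; the metric and function names are hypothetical:

    package example

    import (
        "context"
        "strings"

        "github.com/apache/beam/sdks/go/pkg/beam"
    )

    // One proxy value shared by two DoFns; the per-bundle cells stay separate.
    var lines = beam.NewCounter("example.reusable", "lines")

    func extractWordsFn(ctx context.Context, line string, emit func(string)) {
        lines.Inc(ctx, 1)
        for _, w := range strings.Fields(line) {
            emit(w)
        }
    }

    func forwardLinesFn(ctx context.Context, line string, emit func(string)) {
        lines.Inc(ctx, 1) // same Counter handle, reused across DoFns
        emit(line)
    }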
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 53ce058..61fb365 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -279,6 +279,9 @@
     /** Returns the timestamp of the current timer. */
     public abstract Instant timestamp();
 
+    /** Returns the firing timestamp of the current timer. */
+    public abstract Instant fireTimestamp();
+
     /** Returns the window in which the timer is firing. */
     public abstract BoundedWindow window();
 
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 51e68d3..455761f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -65,6 +65,7 @@
 import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.options.Default;
@@ -95,7 +96,6 @@
 import org.apache.beam.sdk.testing.UsesStatefulParDo;
 import org.apache.beam.sdk.testing.UsesStrictTimerOrdering;
 import org.apache.beam.sdk.testing.UsesTestStream;
-import org.apache.beam.sdk.testing.UsesTestStreamWithOutputTimestamp;
 import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime;
 import org.apache.beam.sdk.testing.UsesTimerMap;
 import org.apache.beam.sdk.testing.UsesTimersInParDo;
@@ -2867,29 +2867,28 @@
     public void testEventTimeTimerBounded() throws Exception {
       final String timerId = "foo";
 
-      DoFn<KV<String, Integer>, Integer> fn =
-          new DoFn<KV<String, Integer>, Integer>() {
+      DoFn<KV<String, Long>, Long> fn =
+          new DoFn<KV<String, Long>, Long>() {
 
             @TimerId(timerId)
             private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
             @ProcessElement
-            public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer> r) {
+            public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Long> r) {
               timer.offset(Duration.standardSeconds(1)).setRelative();
-              r.output(3);
+              r.output(3L);
             }
 
             @OnTimer(timerId)
-            public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> r) {
+            public void onTimer(TimeDomain timeDomain, OutputReceiver<Long> r) {
               if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
-                r.output(42);
+                r.output(42L);
               }
             }
           };
 
-      PCollection<Integer> output =
-          pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
-      PAssert.that(output).containsInAnyOrder(3, 42);
+      PCollection<Long> output = pipeline.apply(Create.of(KV.of("hello", 37L))).apply(ParDo.of(fn));
+      PAssert.that(output).containsInAnyOrder(3L, 42L);
       pipeline.run();
     }
 
@@ -2945,8 +2944,8 @@
     public void testEventTimeTimerAlignBounded() throws Exception {
       final String timerId = "foo";
 
-      DoFn<KV<String, Integer>, KV<Integer, Instant>> fn =
-          new DoFn<KV<String, Integer>, KV<Integer, Instant>>() {
+      DoFn<KV<String, Long>, KV<Long, Instant>> fn =
+          new DoFn<KV<String, Long>, KV<Long, Instant>>() {
 
             @TimerId(timerId)
             private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@@ -2955,24 +2954,23 @@
             public void processElement(
                 @TimerId(timerId) Timer timer,
                 @Timestamp Instant timestamp,
-                OutputReceiver<KV<Integer, Instant>> r) {
+                OutputReceiver<KV<Long, Instant>> r) {
               timer.align(Duration.standardSeconds(1)).offset(Duration.millis(1)).setRelative();
-              r.output(KV.of(3, timestamp));
+              r.output(KV.of(3L, timestamp));
             }
 
             @OnTimer(timerId)
-            public void onTimer(
-                @Timestamp Instant timestamp, OutputReceiver<KV<Integer, Instant>> r) {
-              r.output(KV.of(42, timestamp));
+            public void onTimer(@Timestamp Instant timestamp, OutputReceiver<KV<Long, Instant>> r) {
+              r.output(KV.of(42L, timestamp));
             }
           };
 
-      PCollection<KV<Integer, Instant>> output =
-          pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
+      PCollection<KV<Long, Instant>> output =
+          pipeline.apply(Create.of(KV.of("hello", 37L))).apply(ParDo.of(fn));
       PAssert.that(output)
           .containsInAnyOrder(
-              KV.of(3, BoundedWindow.TIMESTAMP_MIN_VALUE),
-              KV.of(42, BoundedWindow.TIMESTAMP_MIN_VALUE.plus(1774)));
+              KV.of(3L, BoundedWindow.TIMESTAMP_MIN_VALUE),
+              KV.of(42L, BoundedWindow.TIMESTAMP_MIN_VALUE.plus(1774)));
       pipeline.run();
     }
 
@@ -3039,28 +3037,27 @@
     public void testEventTimeTimerAbsolute() throws Exception {
       final String timerId = "foo";
 
-      DoFn<KV<String, Integer>, Integer> fn =
-          new DoFn<KV<String, Integer>, Integer>() {
+      DoFn<KV<String, Long>, Long> fn =
+          new DoFn<KV<String, Long>, Long>() {
 
             @TimerId(timerId)
             private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
             @ProcessElement
             public void processElement(
-                @TimerId(timerId) Timer timer, BoundedWindow window, OutputReceiver<Integer> r) {
+                @TimerId(timerId) Timer timer, BoundedWindow window, OutputReceiver<Long> r) {
               timer.set(window.maxTimestamp());
-              r.output(3);
+              r.output(3L);
             }
 
             @OnTimer(timerId)
-            public void onTimer(OutputReceiver<Integer> r) {
-              r.output(42);
+            public void onTimer(OutputReceiver<Long> r) {
+              r.output(42L);
             }
           };
 
-      PCollection<Integer> output =
-          pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
-      PAssert.that(output).containsInAnyOrder(3, 42);
+      PCollection<Long> output = pipeline.apply(Create.of(KV.of("hello", 37L))).apply(ParDo.of(fn));
+      PAssert.that(output).containsInAnyOrder(3L, 42L);
       pipeline.run();
     }
 
@@ -3126,10 +3123,10 @@
       final String stateId = "sizzle";
 
       final int offset = 5000;
-      final int timerOutput = 4093;
+      final long timerOutput = 4093;
 
-      DoFn<KV<String, Integer>, KV<String, Integer>> fn =
-          new DoFn<KV<String, Integer>, KV<String, Integer>>() {
+      DoFn<KV<String, Long>, KV<String, Long>> fn =
+          new DoFn<KV<String, Long>, KV<String, Long>>() {
 
             @TimerId(timerId)
             private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@@ -3152,21 +3149,21 @@
 
             @OnTimer(timerId)
             public void onTimer(
-                @StateId(stateId) ValueState<String> state, OutputReceiver<KV<String, Integer>> r) {
+                @StateId(stateId) ValueState<String> state, OutputReceiver<KV<String, Long>> r) {
               r.output(KV.of(state.read(), timerOutput));
             }
           };
 
       // Enough keys that we exercise interesting code paths
       int numKeys = 50;
-      List<KV<String, Integer>> input = new ArrayList<>();
-      List<KV<String, Integer>> expectedOutput = new ArrayList<>();
+      List<KV<String, Long>> input = new ArrayList<>();
+      List<KV<String, Long>> expectedOutput = new ArrayList<>();
 
-      for (Integer key = 0; key < numKeys; ++key) {
+      for (Long key = 0L; key < numKeys; ++key) {
         // Each key should have just one final output at GC time
         expectedOutput.add(KV.of(key.toString(), timerOutput));
 
-        for (int i = 0; i < 15; ++i) {
+        for (long i = 0; i < 15; ++i) {
           // Each input should be output with the offset added
           input.add(KV.of(key.toString(), i));
           expectedOutput.add(KV.of(key.toString(), i + offset));
@@ -3175,8 +3172,7 @@
 
       Collections.shuffle(input);
 
-      PCollection<KV<String, Integer>> output =
-          pipeline.apply(Create.of(input)).apply(ParDo.of(fn));
+      PCollection<KV<String, Long>> output = pipeline.apply(Create.of(input)).apply(ParDo.of(fn));
       PAssert.that(output).containsInAnyOrder(expectedOutput);
       pipeline.run();
     }
@@ -3666,10 +3662,10 @@
 
       PCollection<String> results =
           pipeline
-              .apply(Create.of(KV.of(0, 0)))
+              .apply(Create.of(KV.of(0L, 0L)))
               .apply(
                   ParDo.of(
-                      new DoFn<KV<Integer, Integer>, String>() {
+                      new DoFn<KV<Long, Long>, String>() {
                         @TimerId(timerId)
                         private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
@@ -3739,30 +3735,29 @@
       UsesStatefulParDo.class,
       UsesTimersInParDo.class,
       UsesTestStream.class,
-      UsesTestStreamWithOutputTimestamp.class
     })
     public void testOutputTimestamp() {
       final String timerId = "bar";
-      DoFn<KV<String, Integer>, KV<String, Integer>> fn1 =
-          new DoFn<KV<String, Integer>, KV<String, Integer>>() {
+      DoFn<KV<String, Long>, KV<String, Long>> fn1 =
+          new DoFn<KV<String, Long>, KV<String, Long>>() {
 
             @TimerId(timerId)
             private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
             @ProcessElement
             public void processElement(
-                @TimerId(timerId) Timer timer, OutputReceiver<KV<String, Integer>> o) {
+                @TimerId(timerId) Timer timer, OutputReceiver<KV<String, Long>> o) {
               timer.withOutputTimestamp(new Instant(5)).set(new Instant(10));
               // Output a message. This will cause the next DoFn to set a timer as well.
-              o.output(KV.of("foo", 100));
+              o.output(KV.of("foo", 100L));
             }
 
             @OnTimer(timerId)
             public void onTimer(OnTimerContext c, BoundedWindow w) {}
           };
 
-      DoFn<KV<String, Integer>, Integer> fn2 =
-          new DoFn<KV<String, Integer>, Integer>() {
+      DoFn<KV<String, Long>, Long> fn2 =
+          new DoFn<KV<String, Long>, Long>() {
 
             @TimerId(timerId)
             private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@@ -3787,27 +3782,27 @@
             @OnTimer(timerId)
             public void onTimer(
                 @StateId("timerFired") ValueState<Boolean> timerFiredState,
-                OutputReceiver<Integer> o) {
+                OutputReceiver<Long> o) {
               timerFiredState.write(true);
-              o.output(100);
+              o.output(100L);
             }
           };
 
-      TestStream<KV<String, Integer>> stream =
-          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+      TestStream<KV<String, Long>> stream =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))
               .advanceWatermarkTo(new Instant(0))
               // Cause fn2 to set a timer.
-              .addElements(KV.of("key", 1))
+              .addElements(KV.of("key", 1L))
               // Normally this would case fn2's timer to expire, but it shouldn't here because of
               // the output timestamp.
               .advanceWatermarkTo(new Instant(9))
               // If the timer fired, then this would case fn2 to fail with an assertion error.
-              .addElements(KV.of("key", 1))
+              .addElements(KV.of("key", 1L))
               .advanceWatermarkToInfinity();
-      PCollection<Integer> output =
+      PCollection<Long> output =
           pipeline.apply(stream).apply("first", ParDo.of(fn1)).apply("second", ParDo.of(fn2));
 
-      PAssert.that(output).containsInAnyOrder(100); // result output
+      PAssert.that(output).containsInAnyOrder(100L); // result output
       pipeline.run();
     }
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
index c3761bb..f5cb41d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
@@ -31,6 +31,7 @@
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.ByteString;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.NlsString;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
@@ -137,6 +138,12 @@
           throw new UnsupportedOperationException(
               String.format("Column type %s is not supported yet!", type));
       }
+    } else if (type.getTypeName().isPrimitiveType()) {
+      if (TypeName.BYTES.equals(type.getTypeName()) && rawObj instanceof ByteString) {
+        return ((ByteString) rawObj).getBytes();
+      }
+      throw new UnsupportedOperationException(
+          String.format("Column type %s is not supported yet!", type));
     }
     return rawObj;
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1Table.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1Table.java
new file mode 100644
index 0000000..92b9326
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1Table.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;
+
+import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.Query;
+import com.google.datastore.v1.Value;
+import com.google.datastore.v1.Value.ValueTypeCase;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Supplier;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+@Experimental
+class DataStoreV1Table extends SchemaBaseBeamTable implements Serializable {
+  public static final String KEY_FIELD_PROPERTY = "keyField";
+  @VisibleForTesting static final String DEFAULT_KEY_FIELD = "__key__";
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataStoreV1Table.class);
+  // Should match: `projectId/kind`.
+  private static final Pattern locationPattern = Pattern.compile("(?<projectId>.+)/(?<kind>.+)");
+  @VisibleForTesting final String keyField;
+  @VisibleForTesting final String projectId;
+  @VisibleForTesting final String kind;
+
+  DataStoreV1Table(Table table) {
+    super(table.getSchema());
+
+    // TODO: allow users to specify a name of the field to store a key value via TableProperties.
+    JSONObject properties = table.getProperties();
+    if (properties.containsKey(KEY_FIELD_PROPERTY)) {
+      String field = properties.getString(KEY_FIELD_PROPERTY);
+      checkArgument(
+          field != null && !field.isEmpty(), "'%s' property cannot be null or empty.", KEY_FIELD_PROPERTY);
+      keyField = field;
+    } else {
+      keyField = DEFAULT_KEY_FIELD;
+    }
+    // TODO: allow users to specify a namespace in a location string.
+    String location = table.getLocation();
+    checkArgument(location != null, "DataStoreV1 location must be set.");
+    Matcher matcher = locationPattern.matcher(location);
+    checkArgument(
+        matcher.matches(),
+        "DataStoreV1 location must be in the following format: 'projectId/kind'");
+
+    this.projectId = matcher.group("projectId");
+    this.kind = matcher.group("kind");
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    Query.Builder q = Query.newBuilder();
+    q.addKindBuilder().setName(kind);
+    Query query = q.build();
+
+    DatastoreV1.Read readInstance =
+        DatastoreIO.v1().read().withProjectId(projectId).withQuery(query);
+
+    return begin
+        .apply("Read Datastore Entities", readInstance)
+        .apply("Convert Datastore Entities to Rows", EntityToRow.create(getSchema(), keyField));
+  }
+
+  @Override
+  public POutput buildIOWriter(PCollection<Row> input) {
+    return input
+        .apply("Convert Rows to Datastore Entities", RowToEntity.create(keyField, kind))
+        .apply("Write Datastore Entities", DatastoreIO.v1().write().withProjectId(projectId));
+  }
+
+  @Override
+  public IsBounded isBounded() {
+    return IsBounded.BOUNDED;
+  }
+
+  @Override
+  public BeamTableStatistics getTableStatistics(PipelineOptions options) {
+    long count =
+        DatastoreIO.v1().read().withProjectId(projectId).getNumEntities(options, kind, null);
+
+    if (count < 0) {
+      return BeamTableStatistics.BOUNDED_UNKNOWN;
+    }
+
+    return BeamTableStatistics.createBoundedTableStatistics((double) count);
+  }
+
+  /**
+   * A {@code PTransform} to perform a conversion of {@code PCollection<Entity>} to {@code
+   * PCollection<Row>}.
+   */
+  public static class EntityToRow extends PTransform<PCollection<Entity>, PCollection<Row>> {
+    private final Schema schema;
+    private final String keyField;
+
+    private EntityToRow(Schema schema, String keyField) {
+      this.schema = schema;
+      this.keyField = keyField;
+
+      if (schema.getFieldNames().contains(keyField)) {
+        if (!schema.getField(keyField).getType().getTypeName().equals(TypeName.BYTES)) {
+          throw new IllegalStateException(
+              "Field `"
+                  + keyField
+                  + "` should of type `VARBINARY`. Please change the type or specify a field to store the KEY value.");
+        }
+        LOGGER.info("Entity KEY will be stored under `" + keyField + "` field.");
+      }
+    }
+
+    /**
+     * Create a PTransform instance.
+     *
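+     * <p>A minimal usage sketch (assumes {@code entities} is an existing {@code
+     * PCollection<Entity>}; the schema and field name are illustrative):
+     *
+     * <pre>{@code
+     * PCollection<Row> rows = entities.apply(EntityToRow.create(schema, "__key__"));
+     * }</pre>
+     *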
+     * @param schema {@code Schema} of the target row.
+     * @param keyField A name of the row field to store the {@code Key} in.
+     * @return {@code PTransform} instance for Entity to Row conversion.
+     */
+    public static EntityToRow create(Schema schema, String keyField) {
+      return new EntityToRow(schema, keyField);
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollection<Entity> input) {
+      return input.apply(ParDo.of(new EntityToRowConverter())).setRowSchema(schema);
+    }
+
+    @VisibleForTesting
+    class EntityToRowConverter extends DoFn<Entity, Row> {
+
+      @DoFn.ProcessElement
+      public void processElement(ProcessContext context) {
+        Entity entity = context.element();
+        ImmutableMap.Builder<String, Value> mapBuilder = ImmutableMap.builder();
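+        // Expose the entity Key under the configured key field, alongside the entity properties.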
+        mapBuilder.put(keyField, makeValue(entity.getKey()).build());
+        mapBuilder.putAll(entity.getPropertiesMap());
+
+        context.output(extractRowFromProperties(schema, mapBuilder.build()));
+      }
+
+      /**
+       * Convert DataStore {@code Value} to Beam type.
+       *
+       * @param currentFieldType Beam {@code Schema.FieldType} to convert to (used for {@code Row}
+       *     and {@code Array}).
+       * @param val DataStore {@code Value}.
+       * @return resulting Beam type.
+       */
+      private Object convertValueToObject(FieldType currentFieldType, Value val) {
+        ValueTypeCase typeCase = val.getValueTypeCase();
+
+        switch (typeCase) {
+          case NULL_VALUE:
+          case VALUETYPE_NOT_SET:
+            return null;
+          case BOOLEAN_VALUE:
+            return val.getBooleanValue();
+          case INTEGER_VALUE:
+            return val.getIntegerValue();
+          case DOUBLE_VALUE:
+            return val.getDoubleValue();
+          case TIMESTAMP_VALUE:
+            com.google.protobuf.Timestamp time = val.getTimestampValue();
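+            // Convert protobuf Timestamp (seconds + nanos) to epoch milliseconds.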
+            long millis = time.getSeconds() * 1000 + time.getNanos() / 1000000;
+            return Instant.ofEpochMilli(millis).toDateTime();
+          case STRING_VALUE:
+            return val.getStringValue();
+          case KEY_VALUE:
+            return val.getKeyValue().toByteArray();
+          case BLOB_VALUE:
+            return val.getBlobValue().toByteArray();
+          case ENTITY_VALUE:
+            // Recursive mapping for row type.
+            Schema rowSchema = currentFieldType.getRowSchema();
+            assert rowSchema != null;
+            Entity entity = val.getEntityValue();
+            return extractRowFromProperties(rowSchema, entity.getPropertiesMap());
+          case ARRAY_VALUE:
+            // Recursive mapping for collection type.
+            FieldType elementType = currentFieldType.getCollectionElementType();
+            List<Value> valueList = val.getArrayValue().getValuesList();
+            return valueList.stream()
+                .map(v -> convertValueToObject(elementType, v))
+                .collect(Collectors.toList());
+          case GEO_POINT_VALUE:
+          default:
+            throw new IllegalStateException(
+                "No conversion exists from type: "
+                    + val.getValueTypeCase().name()
+                    + " to Beam type.");
+        }
+      }
+
+      /**
+       * Converts all properties of an {@code Entity} to Beam {@code Row}.
+       *
+       * @param schema Target row {@code Schema}.
+       * @param values A map of property names and values.
+       * @return resulting Beam {@code Row}.
+       */
+      private Row extractRowFromProperties(Schema schema, Map<String, Value> values) {
+        Row.Builder builder = Row.withSchema(schema);
+        // There is no guarantee that the values will be in the same order as the schema.
+        // Maybe metadata:
+        // https://cloud.google.com/appengine/docs/standard/python/datastore/metadataqueries
+        // TODO: figure out in what order the elements are in (without relying on Beam schema).
+        for (Schema.Field field : schema.getFields()) {
+          Value val = values.get(field.getName());
+          builder.addValue(convertValueToObject(field.getType(), val));
+        }
+        return builder.build();
+      }
+    }
+  }
+
+  /**
+   * A {@code PTransform} to perform a conversion of {@code PCollection<Row>} to {@code
+   * PCollection<Entity>}.
+   */
+  public static class RowToEntity extends PTransform<PCollection<Row>, PCollection<Entity>> {
+    private final Supplier<String> keySupplier;
+    private final String kind;
+    private final String keyField;
+
+    private RowToEntity(Supplier<String> keySupplier, String kind, String keyField) {
+      this.keySupplier = keySupplier;
+      this.kind = kind;
+      this.keyField = keyField;
+    }
+
+    @Override
+    public PCollection<Entity> expand(PCollection<Row> input) {
+      boolean isFieldPresent = input.getSchema().getFieldNames().contains(keyField);
+      if (isFieldPresent) {
+        if (!input.getSchema().getField(keyField).getType().getTypeName().equals(TypeName.BYTES)) {
+          throw new IllegalStateException(
+              "Field `"
+                  + keyField
+                  + "` should of type `VARBINARY`. Please change the type or specify a field to write the KEY value from via TableProperties.");
+        }
+        LOGGER.info("Field to use as Entity KEY is set to: `" + keyField + "`.");
+      }
+      return input.apply(ParDo.of(new RowToEntityConverter(isFieldPresent)));
+    }
+
+    /**
+     * Create a PTransform instance.
+     *
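+     * <p>A minimal usage sketch (assumes {@code rows} is an existing {@code PCollection<Row>};
+     * the field and kind names are illustrative):
+     *
+     * <pre>{@code
+     * PCollection<Entity> entities = rows.apply(RowToEntity.create("__key__", "orders"));
+     * }</pre>
+     *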
+     * @param keyField Row field containing a serialized {@code Key}, must be set when using user
+     *     specified keys.
+     * @param kind DataStore `Kind` data will be written to (required when generating random {@code
+     *     Key}s).
+     * @return {@code PTransform} instance for Row to Entity conversion.
+     */
+    public static RowToEntity create(String keyField, String kind) {
+      return new RowToEntity(
+          (Supplier<String> & Serializable) () -> UUID.randomUUID().toString(), kind, keyField);
+    }
+
+    @VisibleForTesting
+    static RowToEntity createTest(String keyString, String keyField, String kind) {
+      return new RowToEntity((Supplier<String> & Serializable) () -> keyString, kind, keyField);
+    }
+
+    @VisibleForTesting
+    class RowToEntityConverter extends DoFn<Row, Entity> {
+      private final boolean useNonRandomKey;
+
+      RowToEntityConverter(boolean useNonRandomKey) {
+        super();
+        this.useNonRandomKey = useNonRandomKey;
+      }
+
+      @DoFn.ProcessElement
+      public void processElement(ProcessContext context) {
+        Row row = context.element();
+
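+        // Exclude the key field (if present) from the properties; the Key is set separately below.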
+        Schema schemaWithoutKeyField =
+            Schema.builder()
+                .addFields(
+                    row.getSchema().getFields().stream()
+                        .filter(field -> !field.getName().equals(keyField))
+                        .collect(Collectors.toList()))
+                .build();
+        Entity.Builder entityBuilder = constructEntityFromRow(schemaWithoutKeyField, row);
+        entityBuilder.setKey(constructKeyFromRow(row));
+
+        context.output(entityBuilder.build());
+      }
+
+      /**
+       * Converts an entire {@code Row} to an appropriate DataStore {@code Entity.Builder}.
+       *
+       * @param schema {@code Schema} describing the fields to write as entity properties.
+       * @param row {@code Row} to convert.
+       * @return resulting {@code Entity.Builder}.
+       */
+      private Entity.Builder constructEntityFromRow(Schema schema, Row row) {
+        Entity.Builder entityBuilder = Entity.newBuilder();
+        for (Schema.Field field : schema.getFields()) {
+          Value val = mapObjectToValue(row.getValue(field.getName()));
+          entityBuilder.putProperties(field.getName(), val);
+        }
+        return entityBuilder;
+      }
+
+      /**
+       * Creates a random {@code Key} for a {@code Row} when no keyField is present, or parses a
+       * user-specified {@code Key} from the keyField bytes when it is set.
+       *
+       * @param row {@code Row} to construct a key for.
+       * @return resulting {@code Key}.
+       */
+      private Key constructKeyFromRow(Row row) {
+        if (!useNonRandomKey) {
+          // When key field is not present - use key supplier to generate a random one.
+          return makeKey(kind, keySupplier.get()).build();
+        }
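+        // Otherwise the key field is expected to hold a serialized com.google.datastore.v1.Key.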
+        byte[] keyBytes = row.getBytes(keyField);
+        try {
+          return Key.parseFrom(keyBytes);
+        } catch (InvalidProtocolBufferException e) {
+          throw new IllegalStateException("Failed to parse DataStore key from bytes.");
+        }
+      }
+
+      /**
+       * Converts a {@code Row} value to an appropriate DataStore {@code Value} object.
+       *
+       * @param value {@code Row} value to convert.
+       * @throws IllegalStateException when no mapping function for object of given type exists.
+       * @return resulting {@code Value}.
+       */
+      private Value mapObjectToValue(Object value) {
+        if (value == null) {
+          return Value.newBuilder().build();
+        }
+
+        if (Boolean.class.equals(value.getClass())) {
+          return makeValue((Boolean) value).build();
+        } else if (Byte.class.equals(value.getClass())) {
+          return makeValue((Byte) value).build();
+        } else if (Long.class.equals(value.getClass())) {
+          return makeValue((Long) value).build();
+        } else if (Short.class.equals(value.getClass())) {
+          return makeValue((Short) value).build();
+        } else if (Integer.class.equals(value.getClass())) {
+          return makeValue((Integer) value).build();
+        } else if (Double.class.equals(value.getClass())) {
+          return makeValue((Double) value).build();
+        } else if (Float.class.equals(value.getClass())) {
+          return makeValue((Float) value).build();
+        } else if (String.class.equals(value.getClass())) {
+          return makeValue((String) value).build();
+        } else if (Instant.class.equals(value.getClass())) {
+          return makeValue(((Instant) value).toDate()).build();
+        } else if (byte[].class.equals(value.getClass())) {
+          return makeValue(ByteString.copyFrom((byte[]) value)).build();
+        } else if (value instanceof Row) {
+          // Recursive conversion to handle nested rows.
+          Row row = (Row) value;
+          return makeValue(constructEntityFromRow(row.getSchema(), row)).build();
+        } else if (value instanceof Collection) {
+          // Recursive to handle nested collections.
+          Collection<Object> collection = (Collection<Object>) value;
+          List<Value> arrayValues =
+              collection.stream().map(this::mapObjectToValue).collect(Collectors.toList());
+          return makeValue(arrayValues).build();
+        }
+        throw new IllegalStateException(
+            "No conversion exists from type: " + value.getClass() + " to DataStove Value.");
+      }
+    }
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1TableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1TableProvider.java
new file mode 100644
index 0000000..9ecf668
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1TableProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+
+/**
+ * {@link TableProvider} for {@link DataStoreV1Table}.
+ *
+ * <p>A sample of DataStoreV1Table table is:
+ *
+ * <pre>{@code
+ * CREATE TABLE ORDERS(
+ *   name VARCHAR,
+ *   favorite_color VARCHAR,
+ *   favorite_numbers ARRAY<INTEGER>
+ * )
+ * TYPE 'datastoreV1'
+ * LOCATION 'projectId/kind'
+ * }</pre>
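+ *
+ * <p>By default the entity {@code Key} is exposed through the {@code __key__} field; a different
+ * field can be chosen via the {@code keyField} table property, e.g. (assuming properties are
+ * supplied through the DDL {@code TBLPROPERTIES} clause):
+ *
+ * <pre>{@code
+ * TBLPROPERTIES '{"keyField": "order_key"}'
+ * }</pre>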
+ */
+@AutoService(TableProvider.class)
+public class DataStoreV1TableProvider extends InMemoryMetaTableProvider {
+
+  @Override
+  public String getTableType() {
+    return "datastoreV1";
+  }
+
+  @Override
+  public BeamSqlTable buildBeamSqlTable(Table table) {
+    return new DataStoreV1Table(table);
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/package-info.java
new file mode 100644
index 0000000..5c743e7
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Table schema for DataStore. */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java
new file mode 100644
index 0000000..3274846
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;
+
+import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
+import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+import static org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.VARBINARY;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.DATETIME;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.PartitionId;
+import com.google.datastore.v1.PropertyFilter.Operator;
+import com.google.datastore.v1.Query;
+import java.util.UUID;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.EntityToRow;
+import org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.RowToEntity;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.ByteString;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataStoreReadWriteIT {
+  private static final BigQueryOptions options =
+      TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+  private static final Schema SOURCE_SCHEMA =
+      Schema.builder()
+          .addNullableField("__key__", VARBINARY)
+          .addNullableField("content", STRING)
+          .build();
+  private static final Schema SOURCE_SCHEMA_WITHOUT_KEY =
+      Schema.builder().addNullableField("content", STRING).build();
+  private static final String KIND = "writereadtest";
+  private static final String KIND_ALL_TYPES = "writereadalltypestest";
+
+  @Rule public final TestPipeline writePipeline = TestPipeline.create();
+  @Rule public transient TestPipeline readPipeline = TestPipeline.create();
+
+  @Test
+  public void testDataStoreV1SqlWriteRead() {
+    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new DataStoreV1TableProvider());
+    String projectId = options.getProject();
+
+    String createTableStatement =
+        "CREATE EXTERNAL TABLE TEST( \n"
+            + "   `__key__` VARBINARY, \n"
+            + "   `content` VARCHAR \n"
+            + ") \n"
+            + "TYPE 'datastoreV1' \n"
+            + "LOCATION '"
+            + projectId
+            + "/"
+            + KIND
+            + "'";
+    sqlEnv.executeDdl(createTableStatement);
+
+    Key ancestor = makeKey(KIND, UUID.randomUUID().toString()).build();
+    Key itemKey = makeKey(ancestor, KIND, UUID.randomUUID().toString()).build();
+    String insertStatement =
+        "INSERT INTO TEST VALUES ( \n" + keyToSqlByteString(itemKey) + ", \n" + "'2000' \n" + ")";
+
+    BeamSqlRelUtils.toPCollection(writePipeline, sqlEnv.parseQuery(insertStatement));
+    writePipeline.run().waitUntilFinish();
+
+    String selectTableStatement = "SELECT * FROM TEST";
+    PCollection<Row> output =
+        BeamSqlRelUtils.toPCollection(readPipeline, sqlEnv.parseQuery(selectTableStatement));
+
+    assertThat(output.getSchema(), equalTo(SOURCE_SCHEMA));
+
+    PipelineResult.State state = readPipeline.run().waitUntilFinish(Duration.standardMinutes(5));
+    assertThat(state, equalTo(State.DONE));
+  }
+
+  @Test
+  public void testDataStoreV1SqlWriteRead_withoutKey() {
+    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new DataStoreV1TableProvider());
+    String projectId = options.getProject();
+
+    String createTableStatement =
+        "CREATE EXTERNAL TABLE TEST( \n"
+            + "   `content` VARCHAR \n"
+            + ") \n"
+            + "TYPE 'datastoreV1' \n"
+            + "LOCATION '"
+            + projectId
+            + "/"
+            + KIND
+            + "'";
+    sqlEnv.executeDdl(createTableStatement);
+
+    String insertStatement = "INSERT INTO TEST VALUES ( '3000' )";
+
+    BeamSqlRelUtils.toPCollection(writePipeline, sqlEnv.parseQuery(insertStatement));
+    writePipeline.run().waitUntilFinish();
+
+    String selectTableStatement = "SELECT * FROM TEST";
+    PCollection<Row> output =
+        BeamSqlRelUtils.toPCollection(readPipeline, sqlEnv.parseQuery(selectTableStatement));
+
+    assertThat(output.getSchema(), equalTo(SOURCE_SCHEMA_WITHOUT_KEY));
+
+    PipelineResult.State state = readPipeline.run().waitUntilFinish(Duration.standardMinutes(5));
+    assertThat(state, equalTo(State.DONE));
+  }
+
+  @Test
+  public void testWriteRead_viaCoreBeamIO() {
+    String projectId = options.getProject();
+    Key ancestor = makeKey(KIND, UUID.randomUUID().toString()).build();
+    Key itemKey =
+        makeKey(ancestor, KIND, UUID.randomUUID().toString())
+            .setPartitionId(PartitionId.newBuilder().setProjectId(projectId).build())
+            .build();
+    Row testWriteRow =
+        Row.withSchema(SOURCE_SCHEMA).addValues(itemKey.toByteArray(), "4000").build();
+
+    writePipeline
+        .apply(Create.of(testWriteRow).withRowSchema(SOURCE_SCHEMA))
+        .apply(RowToEntity.create("__key__", KIND))
+        .apply(DatastoreIO.v1().write().withProjectId(projectId));
+    writePipeline.run().waitUntilFinish();
+
+    Query.Builder query = Query.newBuilder();
+    query.addKindBuilder().setName(KIND);
+    query.setFilter(makeFilter("__key__", Operator.EQUAL, makeValue(itemKey)));
+
+    DatastoreV1.Read read =
+        DatastoreIO.v1().read().withProjectId(projectId).withQuery(query.build());
+    PCollection<Row> rowsRead =
+        readPipeline.apply(read).apply(EntityToRow.create(SOURCE_SCHEMA, "__key__"));
+
+    PAssert.that(rowsRead).containsInAnyOrder(testWriteRow);
+    readPipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testReadAllSupportedTypes() {
+    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new DataStoreV1TableProvider());
+    String projectId = options.getProject();
+
+    final Schema expectedSchema =
+        Schema.builder()
+            .addNullableField("__key__", VARBINARY)
+            .addNullableField("boolean", BOOLEAN)
+            .addNullableField("datetime", DATETIME)
+            // TODO: flattening of nested fields by Calcite causes some issues.
+            /*.addRowField("embeddedentity",
+            Schema.builder()
+                .addNullableField("property1", STRING)
+                .addNullableField("property2", INT64)
+                .build())*/
+            .addNullableField("floatingnumber", DOUBLE)
+            .addNullableField("integer", INT64)
+            .addNullableField("primitivearray", FieldType.array(STRING))
+            .addNullableField("string", STRING)
+            .addNullableField("text", STRING)
+            .build();
+
+    String createTableStatement =
+        "CREATE EXTERNAL TABLE TEST( \n"
+            + "   `__key__` VARBINARY, \n"
+            + "   `boolean` BOOLEAN, \n"
+            + "   `datetime` TIMESTAMP, \n"
+            // + "   `embeddedentity` ROW(`property1` VARCHAR, `property2` BIGINT), \n"
+            + "   `floatingnumber` DOUBLE, \n"
+            + "   `integer` BIGINT, \n"
+            + "   `primitivearray` ARRAY<VARCHAR>, \n"
+            + "   `string` VARCHAR, \n"
+            + "   `text` VARCHAR"
+            + ") \n"
+            + "TYPE 'datastoreV1' \n"
+            + "LOCATION '"
+            + projectId
+            + "/"
+            + KIND_ALL_TYPES
+            + "'";
+    sqlEnv.executeDdl(createTableStatement);
+
+    String selectTableStatement = "SELECT * FROM TEST";
+    PCollection<Row> output =
+        BeamSqlRelUtils.toPCollection(readPipeline, sqlEnv.parseQuery(selectTableStatement));
+
+    assertThat(output.getSchema(), equalTo(expectedSchema));
+
+    PipelineResult.State state = readPipeline.run().waitUntilFinish(Duration.standardMinutes(5));
+    assertThat(state, equalTo(State.DONE));
+  }
+
+  private static String keyToSqlByteString(Key key) {
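+    // Render the serialized Key as a SQL hexadecimal binary literal, e.g. X'0A0B...'.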
+    return "X'" + ByteString.toString(key.toByteArray(), 16) + "'";
+  }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableProviderTest.java
new file mode 100644
index 0000000..879add1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableProviderTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;
+
+import static org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.DEFAULT_KEY_FIELD;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.KEY_FIELD_PROPERTY;
+import static org.apache.beam.sdk.schemas.Schema.toSchema;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import com.alibaba.fastjson.JSON;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.Schema;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataStoreTableProviderTest {
+  private DataStoreV1TableProvider provider = new DataStoreV1TableProvider();
+
+  @Test
+  public void testGetTableType() {
+    assertEquals("datastoreV1", provider.getTableType());
+  }
+
+  @Test
+  public void testBuildBeamSqlTable() {
+    final String location = "projectId/batch_kind";
+    Table table = fakeTable("TEST", location);
+    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+
+    assertNotNull(sqlTable);
+    assertTrue(sqlTable instanceof DataStoreV1Table);
+
+    DataStoreV1Table datastoreTable = (DataStoreV1Table) sqlTable;
+    assertEquals("projectId", datastoreTable.projectId);
+    assertEquals("batch_kind", datastoreTable.kind);
+    assertEquals(DEFAULT_KEY_FIELD, datastoreTable.keyField);
+  }
+
+  @Test
+  public void testTableProperty() {
+    final String location = "projectId/batch_kind";
+    Table table =
+        fakeTableWithProperties("TEST", location, "{ " + KEY_FIELD_PROPERTY + ": \"field_name\" }");
+    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+
+    assertNotNull(sqlTable);
+    assertTrue(sqlTable instanceof DataStoreV1Table);
+
+    DataStoreV1Table datastoreTable = (DataStoreV1Table) sqlTable;
+    assertEquals("projectId", datastoreTable.projectId);
+    assertEquals("batch_kind", datastoreTable.kind);
+    assertEquals("field_name", datastoreTable.keyField);
+  }
+
+  @Test
+  public void testTableProperty_emptyValue_throwsException() {
+    final String location = "projectId/batch_kind";
+    Table table = fakeTableWithProperties("TEST", location, "{ " + KEY_FIELD_PROPERTY + ": \"\" }");
+    assertThrows(IllegalArgumentException.class, () -> provider.buildBeamSqlTable(table));
+  }
+
+  private static Table fakeTable(String name, String location) {
+    return Table.builder()
+        .name(name)
+        .comment(name + " table")
+        .location(location)
+        .schema(
+            Stream.of(
+                    Schema.Field.nullable("id", Schema.FieldType.INT32),
+                    Schema.Field.nullable("name", Schema.FieldType.STRING))
+                .collect(toSchema()))
+        .type("datastoreV1")
+        .build();
+  }
+
+  private static Table fakeTableWithProperties(String name, String location, String properties) {
+    return Table.builder()
+        .name(name)
+        .comment(name + " table")
+        .location(location)
+        .schema(
+            Stream.of(
+                    Schema.Field.nullable("id", Schema.FieldType.INT32),
+                    Schema.Field.nullable("name", Schema.FieldType.STRING))
+                .collect(toSchema()))
+        .type("datastoreV1")
+        .properties(JSON.parseObject(properties))
+        .build();
+  }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableTest.java
new file mode 100644
index 0000000..a682b2a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;
+
+import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+import static org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.VARBINARY;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.DEFAULT_KEY_FIELD;
+import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.BYTES;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.DATETIME;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.array;
+
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.Value;
+import com.google.protobuf.ByteString;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.EntityToRow;
+import org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.RowToEntity;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataStoreTableTest {
+  private static final String KIND = "kind";
+  private static final String UUID_VALUE = UUID.randomUUID().toString();
+  private static final Key.Builder KEY = makeKey(KIND, UUID_VALUE);
+  private static final DateTime DATE_TIME = parseTimestampWithUTCTimeZone("2018-05-28 20:17:40");
+
+  private static final Schema NESTED_ROW_SCHEMA =
+      Schema.builder().addNullableField("nestedLong", INT64).build();
+  private static final Schema SCHEMA =
+      Schema.builder()
+          .addNullableField("__key__", VARBINARY)
+          .addNullableField("long", INT64)
+          .addNullableField("bool", BOOLEAN)
+          .addNullableField("datetime", DATETIME)
+          .addNullableField("array", array(STRING))
+          .addNullableField("rowArray", array(FieldType.row(NESTED_ROW_SCHEMA)))
+          .addNullableField("double", DOUBLE)
+          .addNullableField("bytes", BYTES)
+          .addNullableField("string", CalciteUtils.CHAR)
+          .addNullableField("nullable", INT64)
+          .build();
+  private static final Entity NESTED_ENTITY =
+      Entity.newBuilder().putProperties("nestedLong", makeValue(Long.MIN_VALUE).build()).build();
+  private static final Entity ENTITY =
+      Entity.newBuilder()
+          .setKey(KEY)
+          .putProperties("long", makeValue(Long.MAX_VALUE).build())
+          .putProperties("bool", makeValue(true).build())
+          .putProperties("datetime", makeValue(DATE_TIME.toDate()).build())
+          .putProperties("array", makeValue(makeValue("string1"), makeValue("string2")).build())
+          .putProperties(
+              "rowArray",
+              makeValue(Collections.singletonList(makeValue(NESTED_ENTITY).build())).build())
+          .putProperties("double", makeValue(Double.MAX_VALUE).build())
+          .putProperties(
+              "bytes", makeValue(ByteString.copyFrom("hello", Charset.defaultCharset())).build())
+          .putProperties("string", makeValue("string").build())
+          .putProperties("nullable", Value.newBuilder().build())
+          .build();
+  private static final Row ROW =
+      row(
+          SCHEMA,
+          KEY.build().toByteArray(),
+          Long.MAX_VALUE,
+          true,
+          DATE_TIME,
+          Arrays.asList("string1", "string2"),
+          Collections.singletonList(row(NESTED_ROW_SCHEMA, Long.MIN_VALUE)),
+          Double.MAX_VALUE,
+          "hello".getBytes(Charset.defaultCharset()),
+          "string",
+          null);
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void testEntityToRowConverter() {
+    PCollection<Row> result =
+        pipeline.apply(Create.of(ENTITY)).apply(EntityToRow.create(SCHEMA, DEFAULT_KEY_FIELD));
+    PAssert.that(result).containsInAnyOrder(ROW);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testEntityToRowConverterWithoutKey() {
+    Schema schemaWithoutKey =
+        Schema.builder()
+            .addFields(
+                SCHEMA.getFields().stream()
+                    .filter(f -> !f.getName().equals("__key__"))
+                    .collect(Collectors.toList()))
+            .build();
+    Row rowWithoutKey =
+        Row.withSchema(schemaWithoutKey)
+            .addValues(
+                schemaWithoutKey.getFieldNames().stream()
+                    .map(ROW::getValue)
+                    .collect(Collectors.toList()))
+            .build();
+    PCollection<Row> result =
+        pipeline
+            .apply(Create.of(ENTITY))
+            .apply(EntityToRow.create(schemaWithoutKey, DEFAULT_KEY_FIELD));
+    PAssert.that(result).containsInAnyOrder(rowWithoutKey);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testRowToEntityConverter() {
+    PCollection<Entity> result =
+        pipeline
+            .apply(Create.of(ROW))
+            .setRowSchema(SCHEMA)
+            .apply(RowToEntity.createTest(UUID_VALUE, "__key__", KIND));
+    PAssert.that(result).containsInAnyOrder(ENTITY);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testRowToEntityConverterWithoutKey() {
+    Schema schemaWithoutKey =
+        Schema.builder()
+            .addFields(
+                SCHEMA.getFields().stream()
+                    .filter(f -> !f.getName().equals("__key__"))
+                    .collect(Collectors.toList()))
+            .build();
+    Row rowWithoutKey =
+        Row.withSchema(schemaWithoutKey)
+            .addValues(
+                schemaWithoutKey.getFieldNames().stream()
+                    .map(ROW::getValue)
+                    .collect(Collectors.toList()))
+            .build();
+    PCollection<Entity> result =
+        pipeline
+            .apply(Create.of(rowWithoutKey))
+            .setRowSchema(schemaWithoutKey)
+            .apply(RowToEntity.createTest(UUID_VALUE, "__key__", KIND));
+
+    PAssert.that(result).containsInAnyOrder(ENTITY);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  private static Row row(Schema schema, Object... values) {
+    return Row.withSchema(schema).addValues(values).build();
+  }
+}
diff --git a/sdks/java/extensions/sql/zetasql/build.gradle b/sdks/java/extensions/sql/zetasql/build.gradle
index 330209b..06b265e 100644
--- a/sdks/java/extensions/sql/zetasql/build.gradle
+++ b/sdks/java/extensions/sql/zetasql/build.gradle
@@ -25,7 +25,7 @@
 description = "Apache Beam :: SDKs :: Java :: Extensions :: SQL :: ZetaSQL"
 ext.summary = "ZetaSQL to Calcite translator"
 
-def zetasql_version = "2019.12.1"
+def zetasql_version = "2020.01.1"
 
 dependencies {
   compile project(":sdks:java:core")
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index f0d3ac3..fa83614 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -30,6 +30,7 @@
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import javax.annotation.Nullable;
 import org.apache.beam.fn.harness.control.BundleSplitListener;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
@@ -805,6 +806,7 @@
     private final Instant currentTimestamp;
     private final Duration allowedLateness;
     private final WindowedValue<?> currentElementOrTimer;
+    @Nullable private Instant currentOutputTimestamp;
 
     private Duration period = Duration.ZERO;
     private Duration offset = Duration.ZERO;
@@ -897,7 +899,16 @@
 
     @Override
     public org.apache.beam.sdk.state.Timer withOutputTimestamp(Instant outputTime) {
-      throw new UnsupportedOperationException("TODO: Add support for timers");
+      Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);
+      checkArgument(
+          !outputTime.isAfter(windowExpiry),
+          "Attempted to set timer with output timestamp %s but that is after"
+              + " the expiration of window %s",
+          outputTime,
+          windowExpiry);
+
+      this.currentOutputTimestamp = outputTime;
+      return this;
     }
 
     /**
@@ -919,7 +930,20 @@
       Collection<FnDataReceiver<WindowedValue<KV<Object, Timer>>>> consumers =
           (Collection) context.localNameToConsumer.get(timerId);
 
-      outputTo(consumers, currentElementOrTimer.withValue(KV.of(key, Timer.of(scheduledTime))));
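+      // If no output timestamp was set explicitly, default event-time timers to their scheduled
+      // firing time and other timers to the timestamp of the current element or timer.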
+      if (currentOutputTimestamp == null) {
+        if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
+          currentOutputTimestamp = scheduledTime;
+        } else {
+          currentOutputTimestamp = currentElementOrTimer.getTimestamp();
+        }
+      }
+      outputTo(
+          consumers,
+          WindowedValue.of(
+              KV.of(key, Timer.of(scheduledTime)),
+              currentOutputTimestamp,
+              currentElementOrTimer.getWindows(),
+              currentElementOrTimer.getPane()));
     }
   }
 
@@ -1320,6 +1344,11 @@
     }
 
     @Override
+    public Instant fireTimestamp() {
+      return currentTimer.getValue().getValue().getTimestamp();
+    }
+
+    @Override
     public Instant timestamp() {
       return currentTimer.getTimestamp();
     }
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 8bdce50..81fa48d 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -744,7 +744,7 @@
         @TimerId("processing") Timer processingTimeTimer) {
       context.output("main" + context.element().getKey() + Iterables.toString(bagState.read()));
       bagState.add(context.element().getValue());
-      eventTimeTimer.set(context.timestamp().plus(1L));
+      eventTimeTimer.withOutputTimestamp(context.timestamp()).set(context.timestamp().plus(1L));
       processingTimeTimer.offset(Duration.millis(2L));
       processingTimeTimer.setRelative();
     }
@@ -757,7 +757,9 @@
         @TimerId("processing") Timer processingTimeTimer) {
       context.output("event" + Iterables.toString(bagState.read()));
       bagState.add("event");
-      eventTimeTimer.set(context.timestamp().plus(11L));
+      eventTimeTimer
+          .withOutputTimestamp(context.timestamp())
+          .set(context.fireTimestamp().plus(11L));
       processingTimeTimer.offset(Duration.millis(12L));
       processingTimeTimer.setRelative();
     }
@@ -770,7 +772,7 @@
         @TimerId("processing") Timer processingTimeTimer) {
       context.output("processing" + Iterables.toString(bagState.read()));
       bagState.add("processing");
-      eventTimeTimer.set(context.timestamp().plus(21L));
+      eventTimeTimer.withOutputTimestamp(context.timestamp()).set(context.timestamp().plus(21L));
       processingTimeTimer.offset(Duration.millis(22L));
       processingTimeTimer.setRelative();
     }
@@ -929,9 +931,9 @@
             timerInGlobalWindow("Y", new Instant(1100L), new Instant(1101L)),
             timerInGlobalWindow("X", new Instant(1200L), new Instant(1201L)),
             timerInGlobalWindow("Y", new Instant(1300L), new Instant(1301L)),
-            timerInGlobalWindow("A", new Instant(1400L), new Instant(1411L)),
-            timerInGlobalWindow("B", new Instant(1500L), new Instant(1511L)),
-            timerInGlobalWindow("A", new Instant(1600L), new Instant(1611L)),
+            timerInGlobalWindow("A", new Instant(1400L), new Instant(2411L)),
+            timerInGlobalWindow("B", new Instant(1500L), new Instant(2511L)),
+            timerInGlobalWindow("A", new Instant(1600L), new Instant(2611L)),
             timerInGlobalWindow("X", new Instant(1700L), new Instant(1721L)),
             timerInGlobalWindow("C", new Instant(1800L), new Instant(1821L)),
             timerInGlobalWindow("B", new Instant(1900L), new Instant(1921L))));
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index acc40e1..f16c616 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -368,18 +368,9 @@
       return entity.getProperties().get("timestamp").getTimestampValue().getSeconds() * 1000000;
     }
 
-    /**
-     * Get the estimated size of the data returned by the given query.
-     *
-     * <p>Cloud Datastore provides no way to get a good estimate of how large the result of a query
-     * entity kind being queried, using the __Stat_Kind__ system table, assuming exactly 1 kind is
-     * specified in the query.
-     *
-     * <p>See https://cloud.google.com/datastore/docs/concepts/stats.
-     */
-    static long getEstimatedSizeBytes(Datastore datastore, Query query, @Nullable String namespace)
-        throws DatastoreException {
-      String ourKind = query.getKind(0).getName();
+    /** Retrieve latest table statistics for a given kind, namespace, and datastore. */
+    private static Entity getLatestTableStats(
+        String ourKind, @Nullable String namespace, Datastore datastore) throws DatastoreException {
       long latestTimestamp = queryLatestStatisticsTimestamp(datastore, namespace);
       LOG.info("Latest stats timestamp for kind {} is {}", ourKind, latestTimestamp);
 
@@ -406,7 +397,22 @@
         throw new NoSuchElementException(
             "Datastore statistics for kind " + ourKind + " unavailable");
       }
-      Entity entity = batch.getEntityResults(0).getEntity();
+      return batch.getEntityResults(0).getEntity();
+    }
+
+    /**
+     * Get the estimated size of the data returned by the given query.
+     *
+     * <p>Cloud Datastore provides no way to get a good estimate of how large the result of a query
+     * will be. As a rough approximation, we use the statistics of the entity kind being queried,
+     * from the __Stat_Kind__ system table, assuming exactly 1 kind is specified in the query.
+     *
+     * <p>See https://cloud.google.com/datastore/docs/concepts/stats.
+     */
+    static long getEstimatedSizeBytes(Datastore datastore, Query query, @Nullable String namespace)
+        throws DatastoreException {
+      String ourKind = query.getKind(0).getName();
+      Entity entity = getLatestTableStats(ourKind, namespace, datastore);
       return entity.getProperties().get("entity_bytes").getIntegerValue();
     }
 
@@ -600,6 +606,23 @@
       return toBuilder().setLocalhost(localhost).build();
     }
 
+    /** Returns the number of entities available for reading, or {@code -1} if it cannot be determined. */
+    public long getNumEntities(
+        PipelineOptions options, String ourKind, @Nullable String namespace) {
+      try {
+        V1Options v1Options = V1Options.from(getProjectId(), getNamespace(), getLocalhost());
+        V1DatastoreFactory datastoreFactory = new V1DatastoreFactory();
+        Datastore datastore =
+            datastoreFactory.getDatastore(
+                options, v1Options.getProjectId(), v1Options.getLocalhost());
+
+        Entity entity = getLatestTableStats(ourKind, namespace, datastore);
+        return entity.getProperties().get("count").getIntegerValue();
+      } catch (Exception e) {
+        return -1;
+      }
+    }
+
     @Override
     public PCollection<Entity> expand(PBegin input) {
       checkArgument(getProjectId() != null, "projectId provider cannot be null");
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 7b1be39..2dcd14c 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -81,6 +81,7 @@
 from apache_beam.runners import create_runner
 from apache_beam.transforms import ParDo
 from apache_beam.transforms import ptransform
+from apache_beam.transforms.core import RunnerAPIPTransformHolder
 from apache_beam.transforms.sideinputs import get_sideinput_index
 #from apache_beam.transforms import external
 from apache_beam.typehints import TypeCheckError
@@ -1050,7 +1051,9 @@
         for tag, id in proto.outputs.items()}
     # This annotation is expected by some runners.
     if proto.spec.urn == common_urns.primitives.PAR_DO.urn:
-      assert isinstance(result.transform, ParDo)
+      # TODO(BEAM-9168): Figure out what to do for RunnerAPIPTransformHolder.
+      assert isinstance(result.transform, (ParDo, RunnerAPIPTransformHolder)),\
+        type(result.transform)
       result.transform.output_tags = set(proto.outputs.keys()).difference(
           {'None'})
     if not result.parts:
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index bf1f761..f072e41 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -254,7 +254,7 @@
     # gets applied.
     self.producer = None  # type: Optional[AppliedPTransform]
     # Dictionary of PCollections already associated with tags.
-    self._pcolls = {}  # type: Dict[Optional[str], PValue]
+    self._pcolls = {}  # type: Dict[Optional[str], PCollection]
 
   def __str__(self):
     return '<%s>' % self._str_internal()
@@ -267,15 +267,15 @@
         self.__class__.__name__, self._main_tag, self._tags, self._transform)
 
   def __iter__(self):
-    # type: () -> Iterator[PValue]
-    """Iterates over tags returning for each call a (tag, pvalue) pair."""
+    # type: () -> Iterator[PCollection]
+    """Iterates over tags returning for each call a (tag, pcollection) pair."""
     if self._main_tag is not None:
       yield self[self._main_tag]
     for tag in self._tags:
       yield self[tag]
 
   def __getattr__(self, tag):
-    # type: (str) -> PValue
+    # type: (str) -> PCollection
     # Special methods which may be accessed before the object is
     # fully constructed (e.g. in unpickling).
     if tag[:2] == tag[-2:] == '__':
@@ -283,7 +283,7 @@
     return self[tag]
 
   def __getitem__(self, tag):
-    # type: (Union[int, str, None]) -> PValue
+    # type: (Union[int, str, None]) -> PCollection
     # Accept int tags so that we can look at Partition tags with the
     # same ints that we used in the partition function.
     # TODO(gildea): Consider requiring string-based tags everywhere.
@@ -304,7 +304,7 @@
     assert self.producer is not None
     if tag is not None:
       self._transform.output_tags.add(tag)
-      pcoll = PCollection(self._pipeline, tag=tag, element_type=typehints.Any)  # type: PValue
+      pcoll = PCollection(self._pipeline, tag=tag, element_type=typehints.Any)
       # Transfer the producer from the DoOutputsTuple to the resulting
       # PCollection.
       pcoll.producer = self.producer.parts[0]
@@ -315,7 +315,10 @@
         self.producer.add_output(pcoll, tag)
     else:
       # Main output is output of inner ParDo.
-      pcoll = self.producer.parts[0].outputs[None]
+      pval = self.producer.parts[0].outputs[None]
+      assert isinstance(pval, PCollection), (
+          "DoOutputsTuple should follow a ParDo.")
+      pcoll = pval
     self._pcolls[tag] = pcoll
     return pcoll
 
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index ada700c..762b2a2 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -302,7 +302,7 @@
     return SetPDoneVisitor(pipeline)
 
   @staticmethod
-  def side_input_visitor():
+  def side_input_visitor(use_unified_worker=False):
     # Imported here to avoid circular dependencies.
     # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.pipeline import PipelineVisitor
@@ -320,24 +320,32 @@
           for ix, side_input in enumerate(transform_node.side_inputs):
             access_pattern = side_input._side_input_data().access_pattern
             if access_pattern == common_urns.side_inputs.ITERABLE.urn:
-              # Add a map to ('', value) as Dataflow currently only handles
-              # keyed side inputs.
-              pipeline = side_input.pvalue.pipeline
-              new_side_input = _DataflowIterableSideInput(side_input)
-              new_side_input.pvalue = beam.pvalue.PCollection(
-                  pipeline,
-                  element_type=typehints.KV[
-                      bytes, side_input.pvalue.element_type],
-                  is_bounded=side_input.pvalue.is_bounded)
-              parent = transform_node.parent or pipeline._root_transform()
-              map_to_void_key = beam.pipeline.AppliedPTransform(
-                  pipeline,
-                  beam.Map(lambda x: (b'', x)),
-                  transform_node.full_label + '/MapToVoidKey%s' % ix,
-                  (side_input.pvalue,))
-              new_side_input.pvalue.producer = map_to_void_key
-              map_to_void_key.add_output(new_side_input.pvalue)
-              parent.add_part(map_to_void_key)
+              if use_unified_worker:
+                # Patch up the access pattern to appease Dataflow when using
+                # the UW and hardcode the output type to be Any since
+                # the Dataflow JSON and pipeline proto can differ in coders
+                # which leads to encoding/decoding issues within the runner.
+                side_input.pvalue.element_type = typehints.Any
+                new_side_input = _DataflowIterableSideInput(side_input)
+              else:
+                # Add a map to ('', value) as Dataflow currently only handles
+                # keyed side inputs when using the JRH.
+                pipeline = side_input.pvalue.pipeline
+                new_side_input = _DataflowIterableAsMultimapSideInput(
+                    side_input)
+                new_side_input.pvalue = beam.pvalue.PCollection(
+                    pipeline,
+                    element_type=typehints.KV[bytes,
+                                              side_input.pvalue.element_type],
+                    is_bounded=side_input.pvalue.is_bounded)
+                parent = transform_node.parent or pipeline._root_transform()
+                map_to_void_key = beam.pipeline.AppliedPTransform(
+                    pipeline, beam.Map(lambda x: (b'', x)),
+                    transform_node.full_label + '/MapToVoidKey%s' % ix,
+                    (side_input.pvalue,))
+                new_side_input.pvalue.producer = map_to_void_key
+                map_to_void_key.add_output(new_side_input.pvalue)
+                parent.add_part(map_to_void_key)
             elif access_pattern == common_urns.side_inputs.MULTIMAP.urn:
               # Ensure the input coder is a KV coder and patch up the
               # access pattern to appease Dataflow.
@@ -397,7 +405,8 @@
 
     # Convert all side inputs into a form acceptable to Dataflow.
     if apiclient._use_fnapi(options):
-      pipeline.visit(self.side_input_visitor())
+      pipeline.visit(
+          self.side_input_visitor(apiclient._use_unified_worker(options)))
 
     # Performing configured PTransform overrides.  Note that this is currently
     # done before Runner API serialization, since the new proto needs to contain
@@ -1320,12 +1329,12 @@
     return self._data
 
 
-class _DataflowIterableSideInput(_DataflowSideInput):
+class _DataflowIterableAsMultimapSideInput(_DataflowSideInput):
   """Wraps an iterable side input as dataflow-compatible side input."""
 
-  def __init__(self, iterable_side_input):
+  def __init__(self, side_input):
     # pylint: disable=protected-access
-    side_input_data = iterable_side_input._side_input_data()
+    side_input_data = side_input._side_input_data()
     assert (
         side_input_data.access_pattern == common_urns.side_inputs.ITERABLE.urn)
     iterable_view_fn = side_input_data.view_fn
@@ -1335,6 +1344,20 @@
         lambda multimap: iterable_view_fn(multimap[b'']))
 
 
+class _DataflowIterableSideInput(_DataflowSideInput):
+  """Wraps an iterable side input as dataflow-compatible side input."""
+
+  def __init__(self, side_input):
+    # pylint: disable=protected-access
+    self.pvalue = side_input.pvalue
+    side_input_data = side_input._side_input_data()
+    assert (
+        side_input_data.access_pattern == common_urns.side_inputs.ITERABLE.urn)
+    self._data = beam.pvalue.SideInputData(common_urns.side_inputs.ITERABLE.urn,
+                                           side_input_data.window_mapping_fn,
+                                           side_input_data.view_fn)
+
+
 class _DataflowMultimapSideInput(_DataflowSideInput):
   """Wraps a multimap side input as dataflow-compatible side input."""
 
diff --git a/website/src/contribute/release-guide.md b/website/src/contribute/release-guide.md
index d7c3a7c..05cdec6 100644
--- a/website/src/contribute/release-guide.md
+++ b/website/src/contribute/release-guide.md
@@ -125,7 +125,7 @@
 
 * Determine your Apache GPG Key and Key ID, as follows:
 
-      gpg --list-keys
+      gpg --list-sigs --keyid-format LONG
 
   This will list your GPG keys. One of these should reflect your Apache account, for example:
   
@@ -377,12 +377,15 @@
 After the release branch is cut you need to make sure it builds and has no significant issues that would block the creation of the release candidate.
 There are two ways to perform this verification: either run the automation script (recommended) or run all commands manually.
 
+! Dataflow tests will fail if the Dataflow worker container has not been created and published by this time. (This should be done by Google.)
+
 #### Run automation script (verify_release_build.sh)
 * Script: [verify_release_build.sh](https://github.com/apache/beam/blob/master/release/src/main/scripts/verify_release_build.sh)
 
 * Usage
  1. Create a personal access token from your Github account. See instructions [here](https://help.github.com/en/articles/creating-a-personal-access-token-for-the-command-line).
     It'll be used by the script for accessing the Github API.
+     You don't have to add any permissions to this token.
   1. Update required configurations listed in `RELEASE_BUILD_CONFIGS` in [script.config](https://github.com/apache/beam/blob/master/release/src/main/scripts/script.config)
   1. Then run
      ```
@@ -397,7 +400,9 @@
   1. Create a test PR against release branch;
 
 Jenkins job `beam_Release_Gradle_Build` basically runs `./gradlew build -PisRelease`.
-This only verifies that everything builds with unit tests passing. 
+This only verifies that everything builds with unit tests passing.
+
+You can refer to [this script](https://gist.github.com/Ardagan/13e6031e8d1c9ebbd3029bf365c1a517) to mass-comment on PRs.
 
 #### Verify the build succeeds
 
@@ -579,9 +584,6 @@
 * The script will:
   1. Run gradle release to create rc tag and push source release into github repo.
   1. Run gradle publish to push java artifacts into Maven staging repo.
-     
-     __NOTE__: In order to public staging artifacts, you need to goto the [staging repo](https://repository.apache.org/#stagingRepositories) to close the staging repository on Apache Nexus. 
-     When prompted for a description, enter “Apache Beam, version X, release candidate Y”.
   1. Stage source release into dist.apache.org dev [repo](https://dist.apache.org/repos/dist/dev/beam/).
   1. Stage, sign, and hash Python binaries into the dist.apache.org dev repo python directory.
   1. Stage SDK docker images to [https://hub.docker.com/u/apachebeam](https://hub.docker.com/u/apachebeam).
@@ -595,6 +597,9 @@
   1. Update last release download links in `website/src/get-started/downloads.md`.
   1. Update `website/src/.htaccess` to redirect to the new version.
   1. Build and stage python wheels.
+  1. Publish staging artifacts
+      1. Go to [Apache Nexus](https://repository.apache.org/#stagingRepositories) and close the staging repository.
+      1. When prompted for a description, enter “Apache Beam, version X, release candidate Y”.
 
 
 ### (Alternative) Run all steps manually
@@ -946,7 +951,7 @@
     
 If there are any issues found in the release candidate, reply on the vote thread to cancel the vote. There’s no need to wait 72 hours. Proceed to the `Fix Issues` step below and address the problem. However, some issues don’t require cancellation. For example, if an issue is found in the website pull request, just correct it on the spot and the vote can continue as-is.
 
-If there are no issues, reply on the vote thread to close the voting. Then, tally the votes in a separate email. Here’s an email template; please adjust as you see fit.
+If there are no issues, reply on the vote thread to close the voting. Then, tally the votes in a separate email thread. Here’s an email template; please adjust as you see fit.
 
     From: Release Manager
     To: dev@beam.apache.org
@@ -1237,7 +1242,7 @@
 
 ### Deploy artifacts to Maven Central Repository
 
-Use the Apache Nexus repository to release the staged binary artifacts to the Maven Central repository. In the `Staging Repositories` section, find the relevant release candidate `orgapachebeam-XXX` entry and click `Release`. Drop all other release candidates that are not being released.
+Use the [Apache Nexus repository manager](https://repository.apache.org/#stagingRepositories) to release the staged binary artifacts to the Maven Central repository. In the `Staging Repositories` section, find the relevant release candidate `orgapachebeam-XXX` entry and click `Release`. Drop all other release candidates that are not being released.
 __NOTE__: If you are using [GitHub two-factor authentication](https://help.github.com/articles/securing-your-account-with-two-factor-authentication-2fa/) and haven't configured HTTPS access,
 please follow [the guide](https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/) to configure command line access.
 
@@ -1248,6 +1253,9 @@
    delete the `.asc`, `.sha512`;
 3. Upload the new release `twine upload *` from the directory with the `.zip` and `.whl` files;
 
+[Installing twine](https://packaging.python.org/tutorials/packaging-projects/#uploading-the-distribution-archives): `pip install twine`. You can install twine in a [virtualenv](https://virtualenv.pypa.io/en/latest/) if preferred.
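+
+A minimal sketch of the upload flow inside a virtualenv (the environment path below is only an example):
+
+    python3 -m venv ~/beam-release-env
+    source ~/beam-release-env/bin/activate
+    pip install twine
+    # Run from the directory containing the .zip and .whl files:
+    twine upload *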
+
+
 ### Deploy source release to dist.apache.org
 
 Copy the source release from the `dev` repository to the `release` repository at `dist.apache.org` using Subversion.
@@ -1279,7 +1287,7 @@
 
 ### Merge website pull request
 
-Merge the website pull request to [list the release]({{ site.baseurl }}/get-started/downloads/), publish the [Python API reference manual](https://beam.apache.org/releases/pydoc/), and the [Java API reference manual](https://beam.apache.org/releases/javadoc/) created earlier.
+Merge the website pull request to [list the release]({{ site.baseurl }}/get-started/downloads/), publish the [Python API reference manual](https://beam.apache.org/releases/pydoc/), the [Java API reference manual](https://beam.apache.org/releases/javadoc/), and the blog post created earlier.
 
 ### Mark the version as released in JIRA
 
diff --git a/website/src/documentation/runners/direct.md b/website/src/documentation/runners/direct.md
index 2e763bf..acfc6bb 100644
--- a/website/src/documentation/runners/direct.md
+++ b/website/src/documentation/runners/direct.md
@@ -86,7 +86,7 @@
 
 Python [FnApiRunner](https://beam.apache.org/contribute/runner-guide/#the-fn-api) supports multi-threading and multi-processing mode.
 
-#### Setting parallelism
+<strong>Setting parallelism</strong>
 
 Number of threads or subprocesses is defined by setting the `direct_num_workers` option. There are several ways to set this option.
 
@@ -108,6 +108,23 @@
 pipeline_options.view_as(DirectOptions).direct_num_workers = 2
 ```
 
+
+
+<strong>Setting running mode</strong>
+
+Starting with Beam 2.19.0, the running mode can be set with the `direct_running_mode` option.
+`direct_running_mode` can be one of [`'in_memory'`, `'multi_threading'`, `'multi_processing'`].
+
+<b>in_memory</b>: Runner and workers communicate in memory (not through gRPC). This is the default mode.
+
+<b>multi_threading</b>: Runner and workers communicate through gRPC and each worker runs in a thread.
+
+<b>multi_processing</b>: Runner and workers communicate through gRPC and each worker runs in a subprocess.
+
+Like other options, `direct_running_mode` can be passed through the CLI or set with `PipelineOptions`.
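+
+For example, a minimal sketch of setting the mode programmatically (assuming `direct_running_mode` is exposed on `DirectOptions`, like `direct_num_workers` above):
+
+```
+from apache_beam.options.pipeline_options import DirectOptions, PipelineOptions
+
+# Use gRPC-based workers, one thread per worker (Beam 2.19.0+).
+pipeline_options = PipelineOptions()
+pipeline_options.view_as(DirectOptions).direct_running_mode = 'multi_threading'
+```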
+
+For versions before 2.19.0, the running mode should be set via `FnApiRunner()`. Please refer to the following examples.
+
 #### Running with multi-threading mode
 
 ```