This closes #3970
diff --git a/.test-infra/jenkins/job_beam_PreCommit_Python_MavenInstall.groovy b/.test-infra/jenkins/job_beam_PreCommit_Python_MavenInstall.groovy
index eae129b..feadb89 100644
--- a/.test-infra/jenkins/job_beam_PreCommit_Python_MavenInstall.groovy
+++ b/.test-infra/jenkins/job_beam_PreCommit_Python_MavenInstall.groovy
@@ -45,7 +45,7 @@
     --batch-mode \
     --errors \
     --activate-profiles release,jenkins-precommit,direct-runner,dataflow-runner,spark-runner,flink-runner,apex-runner \
-    --projects sdks/python \
+    --projects sdks/python,!sdks/python/container \
     --also-make \
     --also-make-dependents \
     -D pullRequest=$ghprbPullId \
diff --git a/sdks/go/cmd/beamctl/cmd/root.go b/sdks/go/cmd/beamctl/cmd/root.go
index 53ee83c..a4e7945 100644
--- a/sdks/go/cmd/beamctl/cmd/root.go
+++ b/sdks/go/cmd/beamctl/cmd/root.go
@@ -27,6 +27,7 @@
 )
 
 var (
+	// RootCmd is the root for beamctl commands.
 	RootCmd = &cobra.Command{
 		Use:   "beamctl",
 		Short: "Apache Beam command line client",
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index d010065..e48b58c 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -162,6 +162,14 @@
             value=to_json_value(job_type)),
         dataflow.Environment.VersionValue.AdditionalProperty(
             key='major', value=to_json_value(environment_version))])
+    # TODO: Use enumerated type instead of strings for job types.
+    if job_type.startswith('FNAPI_'):
+      runner_harness_override = (
+          dependency.get_runner_harness_container_image())
+      if runner_harness_override:
+        self.debug_options.experiments = self.debug_options.experiments or []
+        self.debug_options.experiments.append(
+            'runner_harness_container_image=' + runner_harness_override)
     # Experiments
     if self.debug_options.experiments:
       for experiment in self.debug_options.experiments:
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index e7cced7..ecd6003 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -17,10 +17,12 @@
 """Unit tests for the apiclient module."""
 import unittest
 
-from mock import Mock
+import mock
+import pkg_resources
 
 from apache_beam.metrics.cells import DistributionData
 from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.dataflow.internal import dependency
 from apache_beam.runners.dataflow.internal.clients import dataflow
 
 # Protect against environments where apitools library is not available.
@@ -102,7 +104,7 @@
 
   def test_translate_means(self):
     metric_update = dataflow.CounterUpdate()
-    accumulator = Mock()
+    accumulator = mock.Mock()
     accumulator.sum = 16
     accumulator.count = 2
     apiclient.MetricUpdateTranslators.translate_scalar_mean_int(accumulator,
@@ -142,6 +144,57 @@
         env.proto.workerPools[0].ipConfiguration,
         dataflow.WorkerPool.IpConfigurationValueValuesEnum.WORKER_IP_PRIVATE)
 
+  def test_harness_override_present_in_dataflow_distributions(self):
+    pipeline_options = PipelineOptions(
+        ['--temp_location', 'gs://any-location/temp', '--streaming'])
+    override = ''.join(
+        ['runner_harness_container_image=',
+         dependency.DATAFLOW_CONTAINER_IMAGE_REPOSITORY,
+         '/harness:2.2.0'])
+    distribution = pkg_resources.Distribution(version='2.2.0')
+    with mock.patch(
+        'apache_beam.runners.dataflow.internal.dependency.pkg_resources'
+        '.get_distribution',
+        mock.MagicMock(return_value=distribution)):
+      env = apiclient.Environment([], #packages
+                                  pipeline_options,
+                                  '2.0.0') #any environment version
+      self.assertIn(override, env.proto.experiments)
+
+  @mock.patch('apache_beam.runners.dataflow.internal.dependency.'
+              'beam_version.__version__', '2.2.0')
+  def test_harness_override_present_in_beam_releases(self):
+    pipeline_options = PipelineOptions(
+        ['--temp_location', 'gs://any-location/temp', '--streaming'])
+    override = ''.join(
+        ['runner_harness_container_image=',
+         dependency.DATAFLOW_CONTAINER_IMAGE_REPOSITORY,
+         '/harness:2.2.0'])
+    with mock.patch(
+        'apache_beam.runners.dataflow.internal.dependency.pkg_resources'
+        '.get_distribution',
+        mock.Mock(side_effect=pkg_resources.DistributionNotFound())):
+      env = apiclient.Environment([], #packages
+                                  pipeline_options,
+                                  '2.0.0') #any environment version
+      self.assertIn(override, env.proto.experiments)
+
+  @mock.patch('apache_beam.runners.dataflow.internal.dependency.'
+              'beam_version.__version__', '2.2.0-dev')
+  def test_harness_override_absent_in_unreleased_sdk(self):
+    pipeline_options = PipelineOptions(
+        ['--temp_location', 'gs://any-location/temp', '--streaming'])
+    with mock.patch(
+        'apache_beam.runners.dataflow.internal.dependency.pkg_resources'
+        '.get_distribution',
+        mock.Mock(side_effect=pkg_resources.DistributionNotFound())):
+      env = apiclient.Environment([], #packages
+                                  pipeline_options,
+                                  '2.0.0') #any environment version
+      if env.proto.experiments:
+        for experiment in env.proto.experiments:
+          self.assertNotIn('runner_harness_container_image=', experiment)
+
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index 383f8e9..c1edf7d 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -1,5 +1,3 @@
-
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -61,6 +59,8 @@
 import sys
 import tempfile
 
+import pkg_resources
+
 from apache_beam import version as beam_version
 from apache_beam.internal import pickler
 from apache_beam.io.filesystems import FileSystems
@@ -97,6 +97,8 @@
 GOOGLE_SDK_NAME = 'Google Cloud Dataflow SDK for Python'
 BEAM_SDK_NAME = 'Apache Beam SDK for Python'
 
+DATAFLOW_CONTAINER_IMAGE_REPOSITORY = 'dataflow.gcr.io/v1beta3'
+
 
 def _dependency_file_copy(from_path, to_path):
   """Copies a local file to a GCS file or vice versa."""
@@ -488,6 +490,29 @@
         'type of location: %s' % sdk_remote_location)
 
 
+def get_runner_harness_container_image():
+  """For internal use only; no backwards-compatibility guarantees.
+
+   Returns:
+     str: Runner harness container image that shall be used by default
+       for current SDK version or None if the runner harness container image
+       bundled with the service shall be used.
+  """
+  try:
+    version = pkg_resources.get_distribution(GOOGLE_PACKAGE_NAME).version
+    # Pin runner harness for Dataflow releases.
+    return (DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/' + 'harness' + ':' +
+            version)
+  except pkg_resources.DistributionNotFound:
+    # Pin runner harness for BEAM releases.
+    if 'dev' not in beam_version.__version__:
+      return (DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/' + 'harness' + ':' +
+              beam_version.__version__)
+    # Don't pin runner harness for BEAM head so that we can notice
+    # potential incompatibility between runner and sdk harnesses.
+    return None
+
+
 def get_default_container_image_for_current_sdk(job_type):
   """For internal use only; no backwards-compatibility guarantees.
 
@@ -517,17 +542,17 @@
       current version of the SDK.
   """
   # TODO(silviuc): Handle apache-beam versions when we have official releases.
-  import pkg_resources as pkg
   try:
-    version = pkg.get_distribution(GOOGLE_PACKAGE_NAME).version
+    version = pkg_resources.get_distribution(GOOGLE_PACKAGE_NAME).version
     # We drop any pre/post parts of the version and we keep only the X.Y.Z
     # format.  For instance the 0.3.0rc2 SDK version translates into 0.3.0.
-    container_version = '%s.%s.%s' % pkg.parse_version(version)._version.release
+    container_version = (
+        '%s.%s.%s' % pkg_resources.parse_version(version)._version.release)
     # We do, however, keep the ".dev" suffix if it is present.
     if re.match(r'.*\.dev[0-9]*$', version):
       container_version += '.dev'
     return container_version
-  except pkg.DistributionNotFound:
+  except pkg_resources.DistributionNotFound:
     # This case covers Apache Beam end-to-end testing scenarios. All these tests
     # will run with a special container version.
     if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
@@ -540,12 +565,11 @@
   """For internal use only; no backwards-compatibility guarantees.
 
   Returns name and version of SDK reported to Google Cloud Dataflow."""
-  import pkg_resources as pkg
   container_version = _get_required_container_version()
   try:
-    pkg.get_distribution(GOOGLE_PACKAGE_NAME)
+    pkg_resources.get_distribution(GOOGLE_PACKAGE_NAME)
     return (GOOGLE_SDK_NAME, container_version)
-  except pkg.DistributionNotFound:
+  except pkg_resources.DistributionNotFound:
     return (BEAM_SDK_NAME, beam_version.__version__)
 
 
@@ -563,10 +587,9 @@
 def _download_pypi_sdk_package(temp_dir):
   """Downloads SDK package from PyPI and returns path to local path."""
   package_name = get_sdk_package_name()
-  import pkg_resources as pkg
   try:
-    version = pkg.get_distribution(package_name).version
-  except pkg.DistributionNotFound:
+    version = pkg_resources.get_distribution(package_name).version
+  except pkg_resources.DistributionNotFound:
     raise RuntimeError('Please set --sdk_location command-line option '
                        'or install a valid {} distribution.'
                        .format(package_name))
diff --git a/sdks/python/container/Dockerfile b/sdks/python/container/Dockerfile
new file mode 100644
index 0000000..826e36c
--- /dev/null
+++ b/sdks/python/container/Dockerfile
@@ -0,0 +1,27 @@
+###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+###############################################################################
+
+FROM python:2
+MAINTAINER "Apache Beam <dev@beam.apache.org>"
+
+# TODO(herohde): preinstall various packages for better startup
+# performance and reliability.
+
+ADD target/linux_amd64/boot /opt/apache/beam/
+
+ENTRYPOINT ["/opt/apache/beam/boot"]
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
new file mode 100644
index 0000000..18b9900
--- /dev/null
+++ b/sdks/python/container/boot.go
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// boot is the boot code for the Python SDK harness container. It is responsible
+// for retrieving and installing staged files and invoking python correctly.
+package main
+
+import (
+"context"
+"flag"
+"fmt"
+"log"
+"os"
+"path/filepath"
+"strings"
+
+"github.com/apache/beam/sdks/go/pkg/beam/artifact"
+"github.com/apache/beam/sdks/go/pkg/beam/provision"
+"github.com/apache/beam/sdks/go/pkg/beam/util/execx"
+"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+)
+
+var (
+	// Contract: https://s.apache.org/beam-fn-api-container-contract.
+
+	id                = flag.String("id", "", "Local identifier (required).")
+	loggingEndpoint   = flag.String("logging_endpoint", "", "Logging endpoint (required).")
+	artifactEndpoint  = flag.String("artifact_endpoint", "", "Artifact endpoint (required).")
+	provisionEndpoint = flag.String("provision_endpoint", "", "Provision endpoint (required).")
+	controlEndpoint   = flag.String("control_endpoint", "", "Control endpoint (required).")
+	semiPersistDir    = flag.String("semi_persist_dir", "/tmp", "Local semi-persistent directory (optional).")
+)
+
+func main() {
+	flag.Parse()
+	if *id == "" {
+		log.Fatal("No id provided.")
+	}
+	if *loggingEndpoint == "" {
+		log.Fatal("No logging endpoint provided.")
+	}
+	if *artifactEndpoint == "" {
+		log.Fatal("No artifact endpoint provided.")
+	}
+	if *provisionEndpoint == "" {
+		log.Fatal("No provision endpoint provided.")
+	}
+	if *controlEndpoint == "" {
+		log.Fatal("No control endpoint provided.")
+	}
+
+	log.Printf("Initializing python harness: %v", strings.Join(os.Args, " "))
+
+	ctx := grpcx.WriteWorkerID(context.Background(), *id)
+
+	// (1) Obtain the pipeline options
+
+	info, err := provision.Info(ctx, *provisionEndpoint)
+	if err != nil {
+		log.Fatalf("Failed to obtain provisioning information: %v", err)
+	}
+	options, err := provision.ProtoToJSON(info.GetPipelineOptions())
+	if err != nil {
+		log.Fatalf("Failed to convert pipeline options: %v", err)
+	}
+
+	// (2) Retrieve and install the staged packages.
+
+	dir := filepath.Join(*semiPersistDir, "staged")
+
+	_, err = artifact.Materialize(ctx, *artifactEndpoint, dir)
+	if err != nil {
+		log.Fatalf("Failed to retrieve staged files: %v", err)
+	}
+
+	// TODO(herohde): the packages to install should be specified explicitly. It
+	// would also be possible to install the SDK in the Dockerfile.
+	if err := pipInstall(joinPaths(dir, "dataflow_python_sdk.tar[gcp]")); err != nil {
+		log.Fatalf("Failed to install SDK: %v", err)
+	}
+
+	// (3) Invoke python
+
+	os.Setenv("PIPELINE_OPTIONS", options)
+	os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", fmt.Sprintf("url: \"%v\"\n", *loggingEndpoint))
+	os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", fmt.Sprintf("url: \"%v\"\n", *controlEndpoint))
+
+	args := []string{
+		"-m",
+		"apache_beam.runners.worker.sdk_worker_main",
+	}
+	log.Printf("Executing: python %v", strings.Join(args, " "))
+
+	log.Fatalf("Python exited: %v", execx.Execute("python", args...))
+}
+
+// pipInstall runs pip install with the given args.
+func pipInstall(args []string) error {
+	return execx.Execute("pip", append([]string{"install"}, args...)...)
+}
+
+// joinPaths joins the dir to every artifact path. Each / in the path is
+// interpreted as a directory separator.
+func joinPaths(dir string, paths ...string) []string {
+	var ret []string
+	for _, p := range paths {
+		ret = append(ret, filepath.Join(dir, filepath.FromSlash(p)))
+	}
+	return ret
+}
\ No newline at end of file
diff --git a/sdks/python/container/pom.xml b/sdks/python/container/pom.xml
new file mode 100644
index 0000000..6be2885
--- /dev/null
+++ b/sdks/python/container/pom.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-python</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-python-container</artifactId>
+
+  <packaging>pom</packaging>
+
+  <name>Apache Beam :: SDKs :: Python :: Container</name>
+
+  <properties>
+    <!-- Add full path directory structure for 'go get' compatibility -->
+    <go.source.base>${project.basedir}/target/src</go.source.base>
+    <go.source.dir>${go.source.base}/github.com/apache/beam/sdks/go</go.source.dir>
+  </properties>
+
+  <build>
+    <sourceDirectory>${go.source.base}</sourceDirectory>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-go-cmd-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${go.source.base}/github.com/apache/beam/cmd/boot</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>.</directory>
+                  <includes>
+                    <include>*.go</include>
+                  </includes>
+                  <filtering>false</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- CAVEAT: sources are unpacked from the installed beam-sdks-go artifact; run mvn install in sdks/go first to pick up the latest shared files -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependency</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>unpack</goal>
+            </goals>
+            <configuration>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>org.apache.beam</groupId>
+                  <artifactId>beam-sdks-go</artifactId>
+                  <version>${project.version}</version>
+                  <type>zip</type>
+                  <classifier>pkg-sources</classifier>
+                  <overWrite>true</overWrite>
+                  <outputDirectory>${go.source.dir}</outputDirectory>
+                </artifactItem>
+              </artifactItems>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>com.igormaznitsa</groupId>
+        <artifactId>mvn-golang-wrapper</artifactId>
+        <executions>
+          <execution>
+            <id>go-get-imports</id>
+            <goals>
+              <goal>get</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>google.golang.org/grpc</package>
+                <package>golang.org/x/oauth2/google</package>
+                <package>google.golang.org/api/storage/v1</package>
+              </packages>
+            </configuration>
+          </execution>
+          <execution>
+            <id>go-build</id>
+            <goals>
+              <goal>build</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>github.com/apache/beam/cmd/boot</package>
+              </packages>
+              <resultName>boot</resultName>
+            </configuration>
+          </execution>
+          <execution>
+            <id>go-build-linux-amd64</id>
+            <goals>
+              <goal>build</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>github.com/apache/beam/cmd/boot</package>
+              </packages>
+              <resultName>linux_amd64/boot</resultName>
+              <targetArch>amd64</targetArch>
+              <targetOs>linux</targetOs>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>com.spotify</groupId>
+        <artifactId>dockerfile-maven-plugin</artifactId>
+        <configuration>
+          <repository>${docker-repository-root}/python</repository>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/sdks/python/pom.xml b/sdks/python/pom.xml
index 37f9529..b42fb70 100644
--- a/sdks/python/pom.xml
+++ b/sdks/python/pom.xml
@@ -32,6 +32,10 @@
 
   <name>Apache Beam :: SDKs :: Python</name>
 
+  <modules>
+    <module>container</module>
+  </modules>
+
   <properties>
     <!-- python.interpreter.bin & python.pip.bin
          is set dynamically by findSupportedPython.groovy -->
@@ -59,6 +63,7 @@
         <groupId>org.codehaus.gmaven</groupId>
         <artifactId>groovy-maven-plugin</artifactId>
         <version>${groovy-maven-plugin.version}</version>
+        <inherited>false</inherited>
         <executions>
           <execution>
             <id>find-supported-python-for-clean</id>
@@ -85,6 +90,7 @@
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>exec-maven-plugin</artifactId>
+        <inherited>false</inherited>
         <executions>
           <execution>
             <id>setuptools-clean</id>
@@ -143,7 +149,7 @@
                 <argument>--user</argument>
                 <argument>--upgrade</argument>
                 <argument>--ignore-installed</argument>
-                <argument>detox</argument>
+                <argument>tox</argument>
               </arguments>
               <environmentVariables>
                 <PYTHONUSERBASE>${python.user.base}</PYTHONUSERBASE>
@@ -189,6 +195,7 @@
           <plugin>
             <groupId>org.codehaus.mojo</groupId>
             <artifactId>exec-maven-plugin</artifactId>
+            <inherited>false</inherited>
             <executions>
               <execution>
                 <id>setuptools-test</id>
@@ -197,7 +204,7 @@
                   <goal>exec</goal>
                 </goals>
                 <configuration>
-                  <executable>${python.user.base}/bin/detox</executable>
+                  <executable>${python.user.base}/bin/tox</executable>
                   <arguments>
                     <argument>-e</argument>
                     <argument>ALL</argument>