This closes #3970
diff --git a/.test-infra/jenkins/job_beam_PreCommit_Python_MavenInstall.groovy b/.test-infra/jenkins/job_beam_PreCommit_Python_MavenInstall.groovy
index eae129b..feadb89 100644
--- a/.test-infra/jenkins/job_beam_PreCommit_Python_MavenInstall.groovy
+++ b/.test-infra/jenkins/job_beam_PreCommit_Python_MavenInstall.groovy
@@ -45,7 +45,7 @@
--batch-mode \
--errors \
--activate-profiles release,jenkins-precommit,direct-runner,dataflow-runner,spark-runner,flink-runner,apex-runner \
- --projects sdks/python \
+ --projects sdks/python,!sdks/python/container \
--also-make \
--also-make-dependents \
-D pullRequest=$ghprbPullId \
diff --git a/sdks/go/cmd/beamctl/cmd/root.go b/sdks/go/cmd/beamctl/cmd/root.go
index 53ee83c..a4e7945 100644
--- a/sdks/go/cmd/beamctl/cmd/root.go
+++ b/sdks/go/cmd/beamctl/cmd/root.go
@@ -27,6 +27,7 @@
)
var (
+ // RootCmd is the root for beamctl commands.
RootCmd = &cobra.Command{
Use: "beamctl",
Short: "Apache Beam command line client",
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index d010065..e48b58c 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -162,6 +162,14 @@
value=to_json_value(job_type)),
dataflow.Environment.VersionValue.AdditionalProperty(
key='major', value=to_json_value(environment_version))])
+ # TODO: Use enumerated type instead of strings for job types.
+ if job_type.startswith('FNAPI_'):
+ runner_harness_override = (
+ dependency.get_runner_harness_container_image())
+ if runner_harness_override:
+ self.debug_options.experiments = self.debug_options.experiments or []
+ self.debug_options.experiments.append(
+ 'runner_harness_container_image=' + runner_harness_override)
# Experiments
if self.debug_options.experiments:
for experiment in self.debug_options.experiments:
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index e7cced7..ecd6003 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -17,10 +17,12 @@
"""Unit tests for the apiclient module."""
import unittest
-from mock import Mock
+import mock
+import pkg_resources
from apache_beam.metrics.cells import DistributionData
from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.dataflow.internal import dependency
from apache_beam.runners.dataflow.internal.clients import dataflow
# Protect against environments where apitools library is not available.
@@ -102,7 +104,7 @@
def test_translate_means(self):
metric_update = dataflow.CounterUpdate()
- accumulator = Mock()
+ accumulator = mock.Mock()
accumulator.sum = 16
accumulator.count = 2
apiclient.MetricUpdateTranslators.translate_scalar_mean_int(accumulator,
@@ -142,6 +144,57 @@
env.proto.workerPools[0].ipConfiguration,
dataflow.WorkerPool.IpConfigurationValueValuesEnum.WORKER_IP_PRIVATE)
+ def test_harness_override_present_in_dataflow_distributions(self):
+ pipeline_options = PipelineOptions(
+ ['--temp_location', 'gs://any-location/temp', '--streaming'])
+ override = ''.join(
+ ['runner_harness_container_image=',
+ dependency.DATAFLOW_CONTAINER_IMAGE_REPOSITORY,
+ '/harness:2.2.0'])
+ distribution = pkg_resources.Distribution(version='2.2.0')
+ with mock.patch(
+ 'apache_beam.runners.dataflow.internal.dependency.pkg_resources'
+ '.get_distribution',
+ mock.MagicMock(return_value=distribution)):
+ env = apiclient.Environment([], #packages
+ pipeline_options,
+ '2.0.0') #any environment version
+ self.assertIn(override, env.proto.experiments)
+
+ @mock.patch('apache_beam.runners.dataflow.internal.dependency.'
+ 'beam_version.__version__', '2.2.0')
+ def test_harness_override_present_in_beam_releases(self):
+ pipeline_options = PipelineOptions(
+ ['--temp_location', 'gs://any-location/temp', '--streaming'])
+ override = ''.join(
+ ['runner_harness_container_image=',
+ dependency.DATAFLOW_CONTAINER_IMAGE_REPOSITORY,
+ '/harness:2.2.0'])
+ with mock.patch(
+ 'apache_beam.runners.dataflow.internal.dependency.pkg_resources'
+ '.get_distribution',
+ mock.Mock(side_effect=pkg_resources.DistributionNotFound())):
+ env = apiclient.Environment([], #packages
+ pipeline_options,
+ '2.0.0') #any environment version
+ self.assertIn(override, env.proto.experiments)
+
+ @mock.patch('apache_beam.runners.dataflow.internal.dependency.'
+ 'beam_version.__version__', '2.2.0-dev')
+ def test_harness_override_absent_in_unreleased_sdk(self):
+ pipeline_options = PipelineOptions(
+ ['--temp_location', 'gs://any-location/temp', '--streaming'])
+ with mock.patch(
+ 'apache_beam.runners.dataflow.internal.dependency.pkg_resources'
+ '.get_distribution',
+ mock.Mock(side_effect=pkg_resources.DistributionNotFound())):
+ env = apiclient.Environment([], #packages
+ pipeline_options,
+ '2.0.0') #any environment version
+ if env.proto.experiments:
+ for experiment in env.proto.experiments:
+ self.assertNotIn('runner_harness_container_image=', experiment)
+
if __name__ == '__main__':
unittest.main()
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index 383f8e9..c1edf7d 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -1,5 +1,3 @@
-
-#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -61,6 +59,8 @@
import sys
import tempfile
+import pkg_resources
+
from apache_beam import version as beam_version
from apache_beam.internal import pickler
from apache_beam.io.filesystems import FileSystems
@@ -97,6 +97,8 @@
GOOGLE_SDK_NAME = 'Google Cloud Dataflow SDK for Python'
BEAM_SDK_NAME = 'Apache Beam SDK for Python'
+DATAFLOW_CONTAINER_IMAGE_REPOSITORY = 'dataflow.gcr.io/v1beta3'
+
def _dependency_file_copy(from_path, to_path):
"""Copies a local file to a GCS file or vice versa."""
@@ -488,6 +490,29 @@
'type of location: %s' % sdk_remote_location)
+def get_runner_harness_container_image():
+ """For internal use only; no backwards-compatibility guarantees.
+
+ Returns:
+ str: Runner harness container image that shall be used by default
+ for current SDK version or None if the runner harness container image
+ bundled with the service shall be used.
+ """
+ try:
+ version = pkg_resources.get_distribution(GOOGLE_PACKAGE_NAME).version
+ # Pin runner harness for Dataflow releases.
+ return (DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/' + 'harness' + ':' +
+ version)
+ except pkg_resources.DistributionNotFound:
+ # Pin runner harness for BEAM releases.
+ if 'dev' not in beam_version.__version__:
+ return (DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/' + 'harness' + ':' +
+ beam_version.__version__)
+ # Don't pin runner harness for BEAM head so that we can notice
+ # potential incompatibility between runner and sdk harnesses.
+ return None
+
+
def get_default_container_image_for_current_sdk(job_type):
"""For internal use only; no backwards-compatibility guarantees.
@@ -517,17 +542,17 @@
current version of the SDK.
"""
# TODO(silviuc): Handle apache-beam versions when we have official releases.
- import pkg_resources as pkg
try:
- version = pkg.get_distribution(GOOGLE_PACKAGE_NAME).version
+ version = pkg_resources.get_distribution(GOOGLE_PACKAGE_NAME).version
# We drop any pre/post parts of the version and we keep only the X.Y.Z
# format. For instance the 0.3.0rc2 SDK version translates into 0.3.0.
- container_version = '%s.%s.%s' % pkg.parse_version(version)._version.release
+ container_version = (
+ '%s.%s.%s' % pkg_resources.parse_version(version)._version.release)
# We do, however, keep the ".dev" suffix if it is present.
if re.match(r'.*\.dev[0-9]*$', version):
container_version += '.dev'
return container_version
- except pkg.DistributionNotFound:
+ except pkg_resources.DistributionNotFound:
# This case covers Apache Beam end-to-end testing scenarios. All these tests
# will run with a special container version.
if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
@@ -540,12 +565,11 @@
"""For internal use only; no backwards-compatibility guarantees.
Returns name and version of SDK reported to Google Cloud Dataflow."""
- import pkg_resources as pkg
container_version = _get_required_container_version()
try:
- pkg.get_distribution(GOOGLE_PACKAGE_NAME)
+ pkg_resources.get_distribution(GOOGLE_PACKAGE_NAME)
return (GOOGLE_SDK_NAME, container_version)
- except pkg.DistributionNotFound:
+ except pkg_resources.DistributionNotFound:
return (BEAM_SDK_NAME, beam_version.__version__)
@@ -563,10 +587,9 @@
def _download_pypi_sdk_package(temp_dir):
"""Downloads SDK package from PyPI and returns path to local path."""
package_name = get_sdk_package_name()
- import pkg_resources as pkg
try:
- version = pkg.get_distribution(package_name).version
- except pkg.DistributionNotFound:
+ version = pkg_resources.get_distribution(package_name).version
+ except pkg_resources.DistributionNotFound:
raise RuntimeError('Please set --sdk_location command-line option '
'or install a valid {} distribution.'
.format(package_name))
diff --git a/sdks/python/container/Dockerfile b/sdks/python/container/Dockerfile
new file mode 100644
index 0000000..826e36c
--- /dev/null
+++ b/sdks/python/container/Dockerfile
@@ -0,0 +1,27 @@
+###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###############################################################################
+
+FROM python:2
+MAINTAINER "Apache Beam <dev@beam.apache.org>"
+
+# TODO(herohde): preinstall various packages for better startup
+# performance and reliability.
+
+ADD target/linux_amd64/boot /opt/apache/beam/
+
+ENTRYPOINT ["/opt/apache/beam/boot"]
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
new file mode 100644
index 0000000..18b9900
--- /dev/null
+++ b/sdks/python/container/boot.go
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// boot is the boot code for the Python SDK harness container. It is responsible
+// for retrieving and installing staged files and invoking python correctly.
+package main
+
+import (
+"context"
+"flag"
+"fmt"
+"log"
+"os"
+"path/filepath"
+"strings"
+
+"github.com/apache/beam/sdks/go/pkg/beam/artifact"
+"github.com/apache/beam/sdks/go/pkg/beam/provision"
+"github.com/apache/beam/sdks/go/pkg/beam/util/execx"
+"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+)
+
+var (
+ // Contract: https://s.apache.org/beam-fn-api-container-contract.
+
+ id = flag.String("id", "", "Local identifier (required).")
+ loggingEndpoint = flag.String("logging_endpoint", "", "Logging endpoint (required).")
+ artifactEndpoint = flag.String("artifact_endpoint", "", "Artifact endpoint (required).")
+ provisionEndpoint = flag.String("provision_endpoint", "", "Provision endpoint (required).")
+ controlEndpoint = flag.String("control_endpoint", "", "Control endpoint (required).")
+ semiPersistDir = flag.String("semi_persist_dir", "/tmp", "Local semi-persistent directory (optional).")
+)
+
+func main() {
+ flag.Parse()
+ if *id == "" {
+ log.Fatal("No id provided.")
+ }
+ if *loggingEndpoint == "" {
+ log.Fatal("No logging endpoint provided.")
+ }
+ if *artifactEndpoint == "" {
+ log.Fatal("No artifact endpoint provided.")
+ }
+ if *provisionEndpoint == "" {
+ log.Fatal("No provision endpoint provided.")
+ }
+ if *controlEndpoint == "" {
+ log.Fatal("No control endpoint provided.")
+ }
+
+ log.Printf("Initializing python harness: %v", strings.Join(os.Args, " "))
+
+ ctx := grpcx.WriteWorkerID(context.Background(), *id)
+
+ // (1) Obtain the pipeline options
+
+ info, err := provision.Info(ctx, *provisionEndpoint)
+ if err != nil {
+ log.Fatalf("Failed to obtain provisioning information: %v", err)
+ }
+ options, err := provision.ProtoToJSON(info.GetPipelineOptions())
+ if err != nil {
+ log.Fatalf("Failed to convert pipeline options: %v", err)
+ }
+
+ // (2) Retrieve and install the staged packages.
+
+ dir := filepath.Join(*semiPersistDir, "staged")
+
+ _, err = artifact.Materialize(ctx, *artifactEndpoint, dir)
+ if err != nil {
+ log.Fatalf("Failed to retrieve staged files: %v", err)
+ }
+
+ // TODO(herohde): the packages to install should be specified explicitly. It
+ // would also be possible to install the SDK in the Dockerfile.
+ if err := pipInstall(joinPaths(dir, "dataflow_python_sdk.tar[gcp]")); err != nil {
+ log.Fatalf("Failed to install SDK: %v", err)
+ }
+
+ // (3) Invoke python
+
+ os.Setenv("PIPELINE_OPTIONS", options)
+ os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", fmt.Sprintf("url: \"%v\"\n", *loggingEndpoint))
+ os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", fmt.Sprintf("url: \"%v\"\n", *controlEndpoint))
+
+ args := []string{
+ "-m",
+ "apache_beam.runners.worker.sdk_worker_main",
+ }
+ log.Printf("Executing: python %v", strings.Join(args, " "))
+
+ log.Fatalf("Python exited: %v", execx.Execute("python", args...))
+}
+
+// pipInstall runs pip install with the given args.
+func pipInstall(args []string) error {
+ return execx.Execute("pip", append([]string{"install"}, args...)...)
+}
+
+// joinPaths joins the dir to every artifact path. Each / in the path is
+// interpreted as a directory separator.
+func joinPaths(dir string, paths ...string) []string {
+ var ret []string
+ for _, p := range paths {
+ ret = append(ret, filepath.Join(dir, filepath.FromSlash(p)))
+ }
+ return ret
+}
\ No newline at end of file
diff --git a/sdks/python/container/pom.xml b/sdks/python/container/pom.xml
new file mode 100644
index 0000000..6be2885
--- /dev/null
+++ b/sdks/python/container/pom.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-python</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-sdks-python-container</artifactId>
+
+ <packaging>pom</packaging>
+
+ <name>Apache Beam :: SDKs :: Python :: Container</name>
+
+ <properties>
+ <!-- Add full path directory structure for 'go get' compatibility -->
+ <go.source.base>${project.basedir}/target/src</go.source.base>
+ <go.source.dir>${go.source.base}/github.com/apache/beam/sdks/go</go.source.dir>
+ </properties>
+
+ <build>
+ <sourceDirectory>${go.source.base}</sourceDirectory>
+ <plugins>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-go-cmd-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${go.source.base}/github.com/apache/beam/cmd/boot</outputDirectory>
+ <resources>
+ <resource>
+ <directory>.</directory>
+ <includes>
+ <include>*.go</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- CAVEAT: for latest shared files, run mvn install in sdks/go -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-dependency</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>unpack</goal>
+ </goals>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-go</artifactId>
+ <version>${project.version}</version>
+ <type>zip</type>
+ <classifier>pkg-sources</classifier>
+ <overWrite>true</overWrite>
+ <outputDirectory>${go.source.dir}</outputDirectory>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>com.igormaznitsa</groupId>
+ <artifactId>mvn-golang-wrapper</artifactId>
+ <executions>
+ <execution>
+ <id>go-get-imports</id>
+ <goals>
+ <goal>get</goal>
+ </goals>
+ <phase>compile</phase>
+ <configuration>
+ <packages>
+ <package>google.golang.org/grpc</package>
+ <package>golang.org/x/oauth2/google</package>
+ <package>google.golang.org/api/storage/v1</package>
+ </packages>
+ </configuration>
+ </execution>
+ <execution>
+ <id>go-build</id>
+ <goals>
+ <goal>build</goal>
+ </goals>
+ <phase>compile</phase>
+ <configuration>
+ <packages>
+ <package>github.com/apache/beam/cmd/boot</package>
+ </packages>
+ <resultName>boot</resultName>
+ </configuration>
+ </execution>
+ <execution>
+ <id>go-build-linux-amd64</id>
+ <goals>
+ <goal>build</goal>
+ </goals>
+ <phase>compile</phase>
+ <configuration>
+ <packages>
+ <package>github.com/apache/beam/cmd/boot</package>
+ </packages>
+ <resultName>linux_amd64/boot</resultName>
+ <targetArch>amd64</targetArch>
+ <targetOs>linux</targetOs>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>com.spotify</groupId>
+ <artifactId>dockerfile-maven-plugin</artifactId>
+ <configuration>
+ <repository>${docker-repository-root}/python</repository>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/sdks/python/pom.xml b/sdks/python/pom.xml
index 37f9529..b42fb70 100644
--- a/sdks/python/pom.xml
+++ b/sdks/python/pom.xml
@@ -32,6 +32,10 @@
<name>Apache Beam :: SDKs :: Python</name>
+ <modules>
+ <module>container</module>
+ </modules>
+
<properties>
<!-- python.interpreter.bin & python.pip.bin
is set dynamically by findSupportedPython.groovy -->
@@ -59,6 +63,7 @@
<groupId>org.codehaus.gmaven</groupId>
<artifactId>groovy-maven-plugin</artifactId>
<version>${groovy-maven-plugin.version}</version>
+ <inherited>false</inherited>
<executions>
<execution>
<id>find-supported-python-for-clean</id>
@@ -85,6 +90,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
+ <inherited>false</inherited>
<executions>
<execution>
<id>setuptools-clean</id>
@@ -143,7 +149,7 @@
<argument>--user</argument>
<argument>--upgrade</argument>
<argument>--ignore-installed</argument>
- <argument>detox</argument>
+ <argument>tox</argument>
</arguments>
<environmentVariables>
<PYTHONUSERBASE>${python.user.base}</PYTHONUSERBASE>
@@ -189,6 +195,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
+ <inherited>false</inherited>
<executions>
<execution>
<id>setuptools-test</id>
@@ -197,7 +204,7 @@
<goal>exec</goal>
</goals>
<configuration>
- <executable>${python.user.base}/bin/detox</executable>
+ <executable>${python.user.base}/bin/tox</executable>
<arguments>
<argument>-e</argument>
<argument>ALL</argument>