Merge pull request #9324 [Release 2.15.0] [BEAM-7874], [BEAM-7873] Distributed FnApiRunner bugfixes
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index e9cf2e2..3ca2870 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -508,7 +508,7 @@
jaxb_api : "javax.xml.bind:jaxb-api:$jaxb_api_version",
joda_time : "joda-time:joda-time:2.10.1",
junit : "junit:junit:4.13-beta-1",
- kafka_2_11 : "org.apache.kafka:kafka_2.11:$kafka_version",
+ kafka : "org.apache.kafka:kafka_2.11:$kafka_version",
kafka_clients : "org.apache.kafka:kafka-clients:$kafka_version",
malhar_library : "org.apache.apex:malhar-library:$apex_malhar_version",
mockito_core : "org.mockito:mockito-core:1.10.19",
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 05b055b..4c48cb3 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -238,6 +238,14 @@
"Missing required values: " + Joiner.on(',').join(missing));
}
+ if (dataflowOptions.getRegion() == null) {
+ dataflowOptions.setRegion("us-central1");
+ LOG.warn(
+ "--region not set; will default to us-central1. Future releases of Beam will "
+ + "require the user to set the region explicitly. "
+ + "https://cloud.google.com/compute/docs/regions-zones/regions-zones");
+ }
+
PathValidator validator = dataflowOptions.getPathValidator();
String gcpTempLocation;
try {
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index 35df563..c035839 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -126,8 +126,8 @@
@Description(
"The Google Compute Engine region for creating Dataflow jobs. See "
+ "https://cloud.google.com/compute/docs/regions-zones/regions-zones for a list of valid "
- + "options. Default is up to the Dataflow service.")
- @Default.String("us-central1")
+ + "options. Currently defaults to us-central1, but future releases of Beam will "
+ + "require the user to set the region explicitly.")
String getRegion();
void setRegion(String region);
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
index 2355071..947d290 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
@@ -86,6 +86,7 @@
pipelineOptions.setWorkerId(WORKER_ID);
pipelineOptions.setGcpCredential(new TestCredential());
pipelineOptions.setDataflowClient(service);
+ pipelineOptions.setRegion("us-central1");
}
@Test
diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle
index c2e229a..a4e47f3 100644
--- a/runners/spark/build.gradle
+++ b/runners/spark/build.gradle
@@ -64,32 +64,29 @@
compile library.java.slf4j_api
compile library.java.joda_time
compile library.java.args4j
- compile "io.dropwizard.metrics:metrics-core:3.1.2"
- compile library.java.jackson_module_scala
provided library.java.spark_core
provided library.java.spark_streaming
provided library.java.spark_network_common
provided library.java.hadoop_common
- provided library.java.hadoop_mapreduce_client_core
- provided library.java.commons_compress
provided library.java.commons_lang3
provided library.java.commons_io_2x
provided library.java.hamcrest_core
provided library.java.hamcrest_library
- provided "org.apache.zookeeper:zookeeper:3.4.11"
- provided "org.scala-lang:scala-library:2.11.8"
provided "com.esotericsoftware.kryo:kryo:2.21"
+ runtimeOnly library.java.jackson_module_scala
+ runtimeOnly "org.scala-lang:scala-library:2.11.8"
testCompile project(":sdks:java:io:kafka")
testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
// SparkStateInternalsTest extends abstract StateInternalsTest
testCompile project(path: ":runners:core-java", configuration: "testRuntime")
testCompile project(":sdks:java:harness")
testCompile library.java.avro
+ testCompile library.java.kafka
testCompile library.java.kafka_clients
testCompile library.java.junit
testCompile library.java.mockito_core
testCompile library.java.jackson_dataformat_yaml
- testCompile "org.apache.kafka:kafka_2.11:0.11.0.1"
+ testCompile "org.apache.zookeeper:zookeeper:3.4.11"
validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
validatesRunner project(":sdks:java:io:hadoop-format")
validatesRunner project(":sdks:java:io:hadoop-format").sourceSets.test.output
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index b709664..921d829 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -54,7 +54,7 @@
maxNumWorkers = flag.Int64("max_num_workers", 0, "Maximum number of workers during scaling (optional).")
autoscalingAlgorithm = flag.String("autoscaling_algorithm", "", "Autoscaling mode to use (optional).")
zone = flag.String("zone", "", "GCP zone (optional)")
- region = flag.String("region", "us-central1", "GCP Region (optional)")
+ region = flag.String("region", "", "GCP Region (optional but encouraged)")
network = flag.String("network", "", "GCP network (optional)")
tempLocation = flag.String("temp_location", "", "Temp location (optional)")
machineType = flag.String("worker_machine_type", "", "GCE machine type (optional)")
@@ -90,6 +90,12 @@
if *stagingLocation == "" {
return errors.New("no GCS staging location specified. Use --staging_location=gs://<bucket>/<path>")
}
+ if *region == "" {
+ *region = "us-central1"
+ log.Warn(ctx, "--region not set; will default to us-central1. Future releases of Beam will "+
+ "require the user to set the region explicitly. "+
+ "https://cloud.google.com/compute/docs/regions-zones/regions-zones")
+ }
if *image == "" {
*image = getContainerImage(ctx)
}
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py
index c327e6a..80c66ae 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py
@@ -171,11 +171,16 @@
batch_size: the number of entities returned by fake datastore in one req.
"""
- # Test for both random long ids and string ids.
- id_or_name = [True, False]
+ # Test for random long ids, string ids, and a mix of both.
+ id_or_name = [True, False, None]
for id_type in id_or_name:
- entities = fake_datastore.create_entities(num_entities, id_type)
+ if id_type is None:
+ entities = fake_datastore.create_entities(num_entities, False)
+ entities.extend(fake_datastore.create_entities(num_entities, True))
+ num_entities *= 2
+ else:
+ entities = fake_datastore.create_entities(num_entities, id_type)
mock_datastore = MagicMock()
# Assign a fake run_query method as a side_effect to the mock.
mock_datastore.run_query.side_effect = \
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter.py b/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter.py
index 4f8be83..82a109f 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter.py
@@ -26,6 +26,9 @@
from builtins import range
from builtins import round
+from past.builtins import long
+from past.builtins import unicode
+
from apache_beam.io.gcp.datastore.v1new import types
__all__ = ['QuerySplitterError', 'SplitNotPossibleError', 'get_splits']
@@ -123,10 +126,59 @@
return scatter_query
+class IdOrName(object):
+  """Represents an ID or name of a Datastore key.
+
+ Implements sort ordering: by ID, then by name, keys with IDs before those
+ with names.
+ """
+ def __init__(self, id_or_name):
+ self.id_or_name = id_or_name
+ if isinstance(id_or_name, (str, unicode)):
+ self.id = None
+ self.name = id_or_name
+ elif isinstance(id_or_name, (int, long)):
+ self.id = id_or_name
+ self.name = None
+ else:
+ raise TypeError('Unexpected type of id_or_name: %s' % id_or_name)
+
+ def __lt__(self, other):
+ if not isinstance(other, IdOrName):
+ return super(IdOrName, self).__lt__(other)
+
+ if self.id is not None:
+ if other.id is None:
+ return True
+ else:
+ return self.id < other.id
+
+ if other.id is not None:
+ return False
+
+ return self.name < other.name
+
+ def __eq__(self, other):
+ if not isinstance(other, IdOrName):
+ return super(IdOrName, self).__eq__(other)
+ return self.id == other.id and self.name == other.name
+
+ def __hash__(self):
+    return hash((self.id, self.name))
+
+
def client_key_sort_key(client_key):
"""Key function for sorting lists of ``google.cloud.datastore.key.Key``."""
- return [client_key.project, client_key.namespace or ''] + [
- str(element) for element in client_key.flat_path]
+ sort_key = [client_key.project, client_key.namespace or '']
+ # A key path is made up of (kind, id_or_name) pairs. The last pair might be
+ # missing an id_or_name.
+ flat_path = list(client_key.flat_path)
+ while flat_path:
+ sort_key.append(flat_path.pop(0)) # kind
+ if flat_path:
+ sort_key.append(IdOrName(flat_path.pop(0)))
+
+ return sort_key
def _get_scatter_keys(client, query, num_splits):
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter_test.py
index 3e30859..7f3d1ed 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter_test.py
@@ -103,9 +103,16 @@
unused_batch_size: ignored in v1new since query results are entirely
handled by the Datastore client.
"""
- # Test for both random long ids and string ids.
- for id_or_name in [True, False]:
- client_entities = helper.create_client_entities(num_entities, id_or_name)
+ # Test for random long ids, string ids, and a mix of both.
+ for id_or_name in [True, False, None]:
+ if id_or_name is None:
+ client_entities = helper.create_client_entities(num_entities, False)
+ client_entities.extend(helper.create_client_entities(num_entities,
+ True))
+ num_entities *= 2
+ else:
+ client_entities = helper.create_client_entities(num_entities,
+ id_or_name)
mock_client = mock.MagicMock()
mock_client_query = mock.MagicMock()
@@ -154,6 +161,19 @@
if lt_key is None:
last_query_seen = True
+ def test_id_or_name(self):
+ id_ = query_splitter.IdOrName(1)
+ self.assertEqual(1, id_.id)
+ self.assertIsNone(id_.name)
+ name = query_splitter.IdOrName('1')
+ self.assertIsNone(name.id)
+ self.assertEqual('1', name.name)
+ self.assertEqual(query_splitter.IdOrName(1), query_splitter.IdOrName(1))
+ self.assertEqual(query_splitter.IdOrName('1'), query_splitter.IdOrName('1'))
+ self.assertLess(query_splitter.IdOrName(2), query_splitter.IdOrName('1'))
+ self.assertLess(query_splitter.IdOrName(1), query_splitter.IdOrName(2))
+ self.assertLess(query_splitter.IdOrName('1'), query_splitter.IdOrName('2'))
+
def test_client_key_sort_key(self):
k = key.Key('kind1', 1, project=self._PROJECT, namespace=self._NAMESPACE)
k2 = key.Key('kind2', 'a', parent=k)
@@ -165,6 +185,31 @@
keys.sort(key=query_splitter.client_key_sort_key)
self.assertEqual(expected_sort, keys)
+ def test_client_key_sort_key_ids(self):
+ k1 = key.Key('kind', 2, project=self._PROJECT)
+ k2 = key.Key('kind', 1, project=self._PROJECT)
+ keys = [k1, k2]
+ expected_sort = [k2, k1]
+ keys.sort(key=query_splitter.client_key_sort_key)
+ self.assertEqual(expected_sort, keys)
+
+ def test_client_key_sort_key_names(self):
+ k1 = key.Key('kind', '2', project=self._PROJECT)
+ k2 = key.Key('kind', '1', project=self._PROJECT)
+ keys = [k1, k2]
+ expected_sort = [k2, k1]
+ keys.sort(key=query_splitter.client_key_sort_key)
+ self.assertEqual(expected_sort, keys)
+
+ def test_client_key_sort_key_ids_vs_names(self):
+ # Keys with IDs always come before keys with names.
+ k1 = key.Key('kind', '1', project=self._PROJECT)
+ k2 = key.Key('kind', 2, project=self._PROJECT)
+ keys = [k1, k2]
+ expected_sort = [k2, k1]
+ keys.sort(key=query_splitter.client_key_sort_key)
+ self.assertEqual(expected_sort, keys)
+
# Hide base class from collection by nose.
del QuerySplitterTestBase
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index de92519..658978f 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -448,13 +448,12 @@
parser.add_argument('--temp_location',
default=None,
help='GCS path for saving temporary workflow jobs.')
- # The Cloud Dataflow service does not yet honor this setting. However, once
- # service support is added then users of this SDK will be able to control
- # the region. Default is up to the Dataflow service. See
+ # The Google Compute Engine region for creating Dataflow jobs. See
# https://cloud.google.com/compute/docs/regions-zones/regions-zones for a
- # list of valid options/
+ # list of valid options. Currently defaults to us-central1, but future
+ # releases of Beam will require the user to set the region explicitly.
parser.add_argument('--region',
- default='us-central1',
+ default=None,
help='The Google Compute Engine region for creating '
'Dataflow job.')
parser.add_argument('--service_account_email',
@@ -515,6 +514,15 @@
errors.append('--dataflow_job_file and --template_location '
'are mutually exclusive.')
+ if self.view_as(GoogleCloudOptions).region is None:
+ self.view_as(GoogleCloudOptions).region = 'us-central1'
+ runner = self.view_as(StandardOptions).runner
+ if runner == 'DataflowRunner' or runner == 'TestDataflowRunner':
+ logging.warning(
+ '--region not set; will default to us-central1. Future releases of '
+ 'Beam will require the user to set the region explicitly. '
+ 'https://cloud.google.com/compute/docs/regions-zones/regions-zones')
+
return errors