Merge pull request #9324 [Release 2.15.0] [BEAM-7874], [BEAM-7873] Distributed FnApiRunner bugfixes 

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index e9cf2e2..3ca2870 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -508,7 +508,7 @@
         jaxb_api                                    : "javax.xml.bind:jaxb-api:$jaxb_api_version",
         joda_time                                   : "joda-time:joda-time:2.10.1",
         junit                                       : "junit:junit:4.13-beta-1",
-        kafka_2_11                                  : "org.apache.kafka:kafka_2.11:$kafka_version",
+        kafka                                       : "org.apache.kafka:kafka_2.11:$kafka_version",
         kafka_clients                               : "org.apache.kafka:kafka-clients:$kafka_version",
         malhar_library                              : "org.apache.apex:malhar-library:$apex_malhar_version",
         mockito_core                                : "org.mockito:mockito-core:1.10.19",
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 05b055b..4c48cb3 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -238,6 +238,14 @@
           "Missing required values: " + Joiner.on(',').join(missing));
     }
 
+    if (dataflowOptions.getRegion() == null) {
+      dataflowOptions.setRegion("us-central1");
+      LOG.warn(
+          "--region not set; will default to us-central1. Future releases of Beam will "
+              + "require the user to set the region explicitly. "
+              + "https://cloud.google.com/compute/docs/regions-zones/regions-zones");
+    }
+
     PathValidator validator = dataflowOptions.getPathValidator();
     String gcpTempLocation;
     try {
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index 35df563..c035839 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -126,8 +126,8 @@
   @Description(
       "The Google Compute Engine region for creating Dataflow jobs. See "
           + "https://cloud.google.com/compute/docs/regions-zones/regions-zones for a list of valid "
-          + "options. Default is up to the Dataflow service.")
-  @Default.String("us-central1")
+          + "options. Currently defaults to us-central1, but future releases of Beam will "
+          + "require the user to set the region explicitly.")
   String getRegion();
 
   void setRegion(String region);
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
index 2355071..947d290 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
@@ -86,6 +86,7 @@
     pipelineOptions.setWorkerId(WORKER_ID);
     pipelineOptions.setGcpCredential(new TestCredential());
     pipelineOptions.setDataflowClient(service);
+    pipelineOptions.setRegion("us-central1");
   }
 
   @Test
diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle
index c2e229a..a4e47f3 100644
--- a/runners/spark/build.gradle
+++ b/runners/spark/build.gradle
@@ -64,32 +64,29 @@
   compile library.java.slf4j_api
   compile library.java.joda_time
   compile library.java.args4j
-  compile "io.dropwizard.metrics:metrics-core:3.1.2"
-  compile library.java.jackson_module_scala
   provided library.java.spark_core
   provided library.java.spark_streaming
   provided library.java.spark_network_common
   provided library.java.hadoop_common
-  provided library.java.hadoop_mapreduce_client_core
-  provided library.java.commons_compress
   provided library.java.commons_lang3
   provided library.java.commons_io_2x
   provided library.java.hamcrest_core
   provided library.java.hamcrest_library
-  provided "org.apache.zookeeper:zookeeper:3.4.11"
-  provided "org.scala-lang:scala-library:2.11.8"
   provided "com.esotericsoftware.kryo:kryo:2.21"
+  runtimeOnly library.java.jackson_module_scala
+  runtimeOnly "org.scala-lang:scala-library:2.11.8"
   testCompile project(":sdks:java:io:kafka")
   testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
   // SparkStateInternalsTest extends abstract StateInternalsTest
   testCompile project(path: ":runners:core-java", configuration: "testRuntime")
   testCompile project(":sdks:java:harness")
   testCompile library.java.avro
+  testCompile library.java.kafka
   testCompile library.java.kafka_clients
   testCompile library.java.junit
   testCompile library.java.mockito_core
   testCompile library.java.jackson_dataformat_yaml
-  testCompile "org.apache.kafka:kafka_2.11:0.11.0.1"
+  testCompile "org.apache.zookeeper:zookeeper:3.4.11"
   validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
   validatesRunner project(":sdks:java:io:hadoop-format")
   validatesRunner project(":sdks:java:io:hadoop-format").sourceSets.test.output
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index b709664..921d829 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -54,7 +54,7 @@
 	maxNumWorkers        = flag.Int64("max_num_workers", 0, "Maximum number of workers during scaling (optional).")
 	autoscalingAlgorithm = flag.String("autoscaling_algorithm", "", "Autoscaling mode to use (optional).")
 	zone                 = flag.String("zone", "", "GCP zone (optional)")
-	region               = flag.String("region", "us-central1", "GCP Region (optional)")
+	region               = flag.String("region", "", "GCP Region (optional but encouraged)")
 	network              = flag.String("network", "", "GCP network (optional)")
 	tempLocation         = flag.String("temp_location", "", "Temp location (optional)")
 	machineType          = flag.String("worker_machine_type", "", "GCE machine type (optional)")
@@ -90,6 +90,12 @@
 	if *stagingLocation == "" {
 		return errors.New("no GCS staging location specified. Use --staging_location=gs://<bucket>/<path>")
 	}
+	if *region == "" {
+		*region = "us-central1"
+		log.Warn(ctx, "--region not set; will default to us-central1. Future releases of Beam will "+
+			"require the user to set the region explicitly. "+
+			"https://cloud.google.com/compute/docs/regions-zones/regions-zones")
+	}
 	if *image == "" {
 		*image = getContainerImage(ctx)
 	}
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py
index c327e6a..80c66ae 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py
@@ -171,11 +171,16 @@
       batch_size: the number of entities returned by fake datastore in one req.
     """
 
-    # Test for both random long ids and string ids.
-    id_or_name = [True, False]
+    # Test for random long ids, string ids, and a mix of both.
+    id_or_name = [True, False, None]
 
     for id_type in id_or_name:
-      entities = fake_datastore.create_entities(num_entities, id_type)
+      if id_type is None:
+        entities = fake_datastore.create_entities(num_entities, False)
+        entities.extend(fake_datastore.create_entities(num_entities, True))
+        num_entities *= 2
+      else:
+        entities = fake_datastore.create_entities(num_entities, id_type)
       mock_datastore = MagicMock()
       # Assign a fake run_query method as a side_effect to the mock.
       mock_datastore.run_query.side_effect = \
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter.py b/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter.py
index 4f8be83..82a109f 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter.py
@@ -26,6 +26,9 @@
 from builtins import range
 from builtins import round
 
+from past.builtins import long
+from past.builtins import unicode
+
 from apache_beam.io.gcp.datastore.v1new import types
 
 __all__ = ['QuerySplitterError', 'SplitNotPossibleError', 'get_splits']
@@ -123,10 +126,59 @@
   return scatter_query
 
 
+class IdOrName(object):
+  """Represents an ID or name of a Datastore key.
+
+  Implements sort ordering: by ID, then by name, with keys that have IDs
+  sorting before those that have names.
+  """
+  def __init__(self, id_or_name):
+    self.id_or_name = id_or_name
+    if isinstance(id_or_name, (str, unicode)):
+      self.id = None
+      self.name = id_or_name
+    elif isinstance(id_or_name, (int, long)):
+      self.id = id_or_name
+      self.name = None
+    else:
+      raise TypeError('Unexpected type of id_or_name: %s' % id_or_name)
+
+  def __lt__(self, other):
+    if not isinstance(other, IdOrName):
+      return super(IdOrName, self).__lt__(other)
+
+    if self.id is not None:
+      if other.id is None:
+        return True
+      else:
+        return self.id < other.id
+
+    if other.id is not None:
+      return False
+
+    return self.name < other.name
+
+  def __eq__(self, other):
+    if not isinstance(other, IdOrName):
+      return super(IdOrName, self).__eq__(other)
+    return self.id == other.id and self.name == other.name
+
+  def __hash__(self):
+    return hash((self.id, self.name))
+
+
 def client_key_sort_key(client_key):
   """Key function for sorting lists of ``google.cloud.datastore.key.Key``."""
-  return [client_key.project, client_key.namespace or ''] + [
-      str(element) for element in client_key.flat_path]
+  sort_key = [client_key.project, client_key.namespace or '']
+  # A key path is made up of (kind, id_or_name) pairs. The last pair might be
+  # missing an id_or_name.
+  flat_path = list(client_key.flat_path)
+  while flat_path:
+    sort_key.append(flat_path.pop(0))  # kind
+    if flat_path:
+      sort_key.append(IdOrName(flat_path.pop(0)))
+
+  return sort_key
 
 
 def _get_scatter_keys(client, query, num_splits):
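
A quick illustration of the ordering the new IdOrName class implements -- a minimal
sketch, assuming the v1new query_splitter module above is importable; the values
are hypothetical:

    # Minimal sketch of the sort order added above (hypothetical values):
    # numeric IDs sort before string names, and each group sorts internally.
    from apache_beam.io.gcp.datastore.v1new import query_splitter

    keys = [query_splitter.IdOrName('b'), query_splitter.IdOrName(10),
            query_splitter.IdOrName('a'), query_splitter.IdOrName(2)]
    print([k.id_or_name for k in sorted(keys)])  # [2, 10, 'a', 'b']

This is the ordering client_key_sort_key relies on when comparing key paths
element by element, so keys whose paths mix IDs and names no longer compare
by their string representations.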
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter_test.py
index 3e30859..7f3d1ed 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter_test.py
@@ -103,9 +103,16 @@
       unused_batch_size: ignored in v1new since query results are entirely
         handled by the Datastore client.
     """
-    # Test for both random long ids and string ids.
-    for id_or_name in [True, False]:
-      client_entities = helper.create_client_entities(num_entities, id_or_name)
+    # Test for random long ids, string ids, and a mix of both.
+    for id_or_name in [True, False, None]:
+      if id_or_name is None:
+        client_entities = helper.create_client_entities(num_entities, False)
+        client_entities.extend(helper.create_client_entities(num_entities,
+                                                             True))
+        num_entities *= 2
+      else:
+        client_entities = helper.create_client_entities(num_entities,
+                                                        id_or_name)
 
       mock_client = mock.MagicMock()
       mock_client_query = mock.MagicMock()
@@ -154,6 +161,19 @@
           if lt_key is None:
             last_query_seen = True
 
+  def test_id_or_name(self):
+    id_ = query_splitter.IdOrName(1)
+    self.assertEqual(1, id_.id)
+    self.assertIsNone(id_.name)
+    name = query_splitter.IdOrName('1')
+    self.assertIsNone(name.id)
+    self.assertEqual('1', name.name)
+    self.assertEqual(query_splitter.IdOrName(1), query_splitter.IdOrName(1))
+    self.assertEqual(query_splitter.IdOrName('1'), query_splitter.IdOrName('1'))
+    self.assertLess(query_splitter.IdOrName(2), query_splitter.IdOrName('1'))
+    self.assertLess(query_splitter.IdOrName(1), query_splitter.IdOrName(2))
+    self.assertLess(query_splitter.IdOrName('1'), query_splitter.IdOrName('2'))
+
   def test_client_key_sort_key(self):
     k = key.Key('kind1', 1, project=self._PROJECT, namespace=self._NAMESPACE)
     k2 = key.Key('kind2', 'a', parent=k)
@@ -165,6 +185,31 @@
     keys.sort(key=query_splitter.client_key_sort_key)
     self.assertEqual(expected_sort, keys)
 
+  def test_client_key_sort_key_ids(self):
+    k1 = key.Key('kind', 2, project=self._PROJECT)
+    k2 = key.Key('kind', 1, project=self._PROJECT)
+    keys = [k1, k2]
+    expected_sort = [k2, k1]
+    keys.sort(key=query_splitter.client_key_sort_key)
+    self.assertEqual(expected_sort, keys)
+
+  def test_client_key_sort_key_names(self):
+    k1 = key.Key('kind', '2', project=self._PROJECT)
+    k2 = key.Key('kind', '1', project=self._PROJECT)
+    keys = [k1, k2]
+    expected_sort = [k2, k1]
+    keys.sort(key=query_splitter.client_key_sort_key)
+    self.assertEqual(expected_sort, keys)
+
+  def test_client_key_sort_key_ids_vs_names(self):
+    # Keys with IDs always come before keys with names.
+    k1 = key.Key('kind', '1', project=self._PROJECT)
+    k2 = key.Key('kind', 2, project=self._PROJECT)
+    keys = [k1, k2]
+    expected_sort = [k2, k1]
+    keys.sort(key=query_splitter.client_key_sort_key)
+    self.assertEqual(expected_sort, keys)
+
 
 # Hide base class from collection by nose.
 del QuerySplitterTestBase
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index de92519..658978f 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -448,13 +448,12 @@
     parser.add_argument('--temp_location',
                         default=None,
                         help='GCS path for saving temporary workflow jobs.')
-    # The Cloud Dataflow service does not yet honor this setting. However, once
-    # service support is added then users of this SDK will be able to control
-    # the region. Default is up to the Dataflow service. See
+    # The Google Compute Engine region for creating Dataflow jobs. See
     # https://cloud.google.com/compute/docs/regions-zones/regions-zones for a
-    # list of valid options/
+    # list of valid options. Currently defaults to us-central1, but future
+    # releases of Beam will require the user to set the region explicitly.
     parser.add_argument('--region',
-                        default='us-central1',
+                        default=None,
                         help='The Google Compute Engine region for creating '
                         'Dataflow job.')
     parser.add_argument('--service_account_email',
@@ -515,6 +514,15 @@
         errors.append('--dataflow_job_file and --template_location '
                       'are mutually exclusive.')
 
+    if self.view_as(GoogleCloudOptions).region is None:
+      self.view_as(GoogleCloudOptions).region = 'us-central1'
+      runner = self.view_as(StandardOptions).runner
+      if runner == 'DataflowRunner' or runner == 'TestDataflowRunner':
+        logging.warning(
+            '--region not set; will default to us-central1. Future releases of '
+            'Beam will require the user to set the region explicitly. '
+            'https://cloud.google.com/compute/docs/regions-zones/regions-zones')
+
     return errors
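
With the new behavior, a pipeline that does not pass --region falls back to
us-central1 and logs the warning above. A minimal sketch of setting the region
explicitly with the Python SDK, so the warning never fires ('my-project' is a
placeholder project ID):

    # Minimal sketch: pass --region explicitly instead of relying on the
    # us-central1 fallback ('my-project' is a placeholder).
    from apache_beam.options.pipeline_options import (
        GoogleCloudOptions, PipelineOptions)

    options = PipelineOptions(
        ['--runner=DataflowRunner', '--project=my-project',
         '--region=us-central1'])
    print(options.view_as(GoogleCloudOptions).region)  # us-central1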